Apache Hive on Apache Spark: Motivations and Design Principles

Cloudera Blog - Tue, 07/01/2014 - 16:54

Two of the most vibrant communities in the Apache Hadoop ecosystem are now working together to bring users a Hive-on-Spark option that combines the best elements of both.

Apache Hive is a popular SQL interface for batch processing and ETL using Apache Hadoop. Until recently, MapReduce was the only execution engine in the Hadoop ecosystem, and Hive queries could only run on MapReduce. But today, alternative execution engines to MapReduce are available — such as Apache Spark and Apache Tez (incubating).

Although Spark is relatively new to the Hadoop ecosystem, its adoption has been meteoric. An open-source data analytics cluster computing framework, Spark is built outside of Hadoop’s two-stage MapReduce paradigm but runs on top of HDFS. Because of its successful approach, Spark has quickly gained momentum and become established as an attractive choice for the future of data processing in Hadoop.

In this post, you’ll get an overview of the motivations and technical details behind some very exciting news for Spark and Hive users: the fact that the Hive and Spark communities are joining forces to collaboratively introduce Spark as a new execution engine option for Hive, alongside MapReduce and Tez (see HIVE-7292).

Motivation and Approach

Here are the two main motivations for enabling Hive to run on Spark:

  • To improve the Hive user experience
    Hive queries will run faster, thereby improving user experience. Furthermore, users will have access to a robust, non-MR execution engine that has already proven itself to be a leading option for data processing as well as streaming, and which is among the most active projects across all of Apache from contributor and commit standpoints.
  • To streamline operational management for Spark shops
    Hive-on-Spark will be very valuable for users who are already using Spark for other data-processing and machine-learning needs. Standardizing on one execution back end is also convenient for operational management, making it easier to debug issues and create enhancements.

Superficially, this project’s goals look similar to those of Shark or Spark SQL, which are separate projects that reuse the Hive front end to run queries using Spark. However, this design adds Spark into Hive, parallel to MapReduce and Tez, as another backend execution engine. Thus, existing Hive jobs continue to run as-is transparently on Spark.

The key advantage of this approach is to leverage all the existing integration on top of Hive, including ODBC/JDBC, auditing, authorization, and monitoring. Another advantage is that it will have no impact on Hive’s existing code path and thus no functional or performance effects. Users choosing to run Hive on either MapReduce or Tez will have the same functionality and code paths they have today — thus, the Hive user community will be in the great position of being able to choose among MapReduce, Tez, or Spark as a back end. In addition, maintenance costs will be minimized so the Hive community needn’t make specialized investments for Spark.

Meanwhile, users opting for Spark as the execution engine will automatically have all the rich functional features that Hive provides. Future features (such as new data types, UDFs, logical optimization, and so on) added to Hive should become automatically available to those users, without any customization work needed in Hive’s Spark execution engine.

Overall Functionality

To use Spark as an execution engine in Hive, you would set the following:

set hive.execution.engine=spark;

The default value for this configuration is still “mr”. Hive will continue to work on MapReduce as-is on clusters that don’t have Spark on them. When Spark is configured as Hive’s execution engine, a few new configuration variables will be introduced, such as the master URL of the Spark cluster.
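
A minimal session setup might then look like the following. Everything except hive.execution.engine is illustrative here: the exact names of the Spark-related properties (such as the master URL setting) are still being defined by the project.

```
set hive.execution.engine=spark;
set spark.master=spark://master-host:7077;
```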

The new execution engine should support all Hive queries without any modification. Query results should be functionally equivalent to those from either MapReduce or Tez. In addition, existing functionality related to the execution engine should also be available, including the following:

  • Hive will display a task execution plan similar to the one the explain command displays for MapReduce and Tez.
  • Hive will give appropriate feedback to the user about progress and completion status of the query when running queries on Spark.
  • The user will be able to get statistics and diagnostic information as before (counters, logs, and debug info on the console).

Hive-Level Design

As noted previously, this project takes a different approach from that of Shark: SQL semantics will not be implemented using Spark’s primitives, but rather with MapReduce ones that will be executed in Spark.

The main work to implement the Spark execution engine for Hive has two components: query planning, where the Hive operator plan produced by the semantic analyzer is translated into a task plan that Spark can execute; and query execution, where the generated Spark plan is executed in the Spark cluster. There are other miscellaneous yet indispensable functional pieces involving monitoring, counters, statistics, and so on, but for brevity, we will only address the main design considerations here.

Query Planning

Currently, for a given user query, Hive’s semantic analyzer generates an operator plan that comprises a graph of logical operators such as TableScanOperator, ReduceSink, FileSink, GroupByOperator, and so on. MapReduceCompiler compiles a graph of MapReduceTasks and other helper tasks (such as MoveTask) from the logical operator plan. Tez behaves similarly, yet generates a TezTask that combines what would otherwise be multiple MapReduce tasks into a single Tez task.

For Spark, we will introduce SparkCompiler, parallel to MapReduceCompiler and TezCompiler. Its main responsibility is to compile the Hive logical operator plan into a plan that can be executed on Spark. Thus, we will have SparkTask, depicting a job that will be executed in a Spark cluster, and SparkWork, describing the plan of a Spark task. SparkCompiler therefore translates Hive’s operator plan into a SparkWork instance. During task plan generation, SparkCompiler may also perform physical optimizations suitable for Spark.
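
To make the planning step concrete, here is a toy sketch in plain Java of how a compiler might split a logical operator chain into work units at each ReduceSink boundary. The class and method names are illustrative only; apart from the operator names mentioned above, nothing here is Hive’s actual API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of query planning: split the logical operator chain at each
// ReduceSink (shuffle) boundary, the way SparkCompiler derives map-side
// and reduce-side work units from Hive's operator plan.
public class SparkCompilerSketch {

    static List<List<String>> compile(List<String> operatorChain) {
        List<List<String>> work = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : operatorChain) {
            current.add(op);
            if (op.equals("ReduceSink")) {   // a shuffle boundary ends a work unit
                work.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) work.add(current);
        return work;
    }

    public static void main(String[] args) {
        List<String> plan = Arrays.asList(
            "TableScan", "Filter", "ReduceSink",  // map-side chain
            "GroupBy", "FileSink");               // reduce-side chain
        // prints [[TableScan, Filter, ReduceSink], [GroupBy, FileSink]]
        System.out.println(compile(plan));
    }
}
```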

Job Execution

A SparkTask instance can be executed by Hive’s task execution framework in the same way as for other tasks. Internally, the SparkTask.execute() method will make RDDs and functions out of a SparkWork instance, and submit the execution to the Spark cluster via a Spark client.

Once the Spark work is submitted to the Spark cluster, the Spark client will continue to monitor the job execution and report progress. A Spark job can be monitored via SparkListener APIs.

With SparkListener APIs, we will add a SparkJobMonitor class that handles printing of status as well as reporting the final result. This class provides similar functions as HadoopJobExecHelper used for MapReduce processing, or TezJobMonitor used for Tez job processing, and will also retrieve and print the top-level exception thrown at execution time in case of job failure.

Spark job submission is done via a SparkContext object that’s instantiated with the user’s configuration. When a SparkTask is executed by Hive, such a context object is created in the current user session. With the context object, RDDs corresponding to Hive tables are created, and MapFunction and ReduceFunction (more details below) are built from Hive’s SparkWork and applied to the RDDs. Job execution is triggered by applying a foreach() action on the RDDs with a dummy function.

Main Design Considerations

Hive’s operator plan is based on the MapReduce paradigm, and traditionally a query’s execution comprises a list of MapReduce jobs. Each MapReduce job consists of map-side processing starting from Hive’s ExecMapper and reduce-side processing starting from ExecReducer, with MapReduce providing inherent shuffling, sorting, and grouping between the two sides. The input to the whole processing pipeline is the set of folders and files corresponding to the table.

Because we will reuse Hive’s operator plan but perform the same data processing in Spark, the execution plan will be built in Spark constructs such as RDD, function, and transformation. This approach is outlined below.
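
As a rough illustration of that flow, here is a toy word count in plain Java: the "map" stage stands in for MapFunction over an RDD, and the "shuffle plus reduce" stage stands in for the grouping and ReduceFunction described below. This is a conceptual sketch, not Spark or Hive code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy map -> shuffle/group -> reduce pipeline. Plain Java collections
// stand in for RDDs and transformations; illustration only.
public class MapReduceFlowSketch {

    // "MapFunction": each input row emits (word, 1) pairs.
    static List<Map.Entry<String, Integer>> mapPhase(List<String> rows) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String row : rows)
            for (String word : row.split(" "))
                out.add(Map.entry(word, 1));
        return out;
    }

    // "Shuffle + ReduceFunction": group by key, then sum each group.
    static Map<String, Integer> reducePhase(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs)
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        List<String> table = Arrays.asList("a b", "b c");
        System.out.println(reducePhase(mapPhase(table))); // prints {a=1, b=2, c=1}
    }
}
```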

Table as RDD

A Hive table is simply a bunch of files and folders on HDFS. Spark primitives are applied to RDDs. Thus, naturally, Hive tables will be treated as RDDs in the Spark execution engine.

MapFunction

The above-mentioned MapFunction will be made from MapWork; specifically, from the operator chain starting at the ExecMapper.map() method. The ExecMapper class implements the MapReduce Mapper interface, but its implementation in Hive contains some code that can be reused for Spark. Therefore, we will extract the common code into a separate class, MapperDriver, to be shared by MapReduce as well as Spark.

ReduceFunction

Similarly, ReduceFunction will be made from the ReduceWork instance in SparkWork. To Spark, ReduceFunction is no different from MapFunction, but the function’s implementation will differ, being made from the operator chain starting at ExecReducer.reduce(). Also, because some code in ExecReducer can be reused, we will extract the common code into a separate class, ReducerDriver, to be shared by both MapReduce and Spark.

Shuffle, Group, and Sort

While this functionality comes for “free” along with MapReduce, we will need to provide an equivalent for Spark. Fortunately, Spark provides a few transformations that are suitable for replacing MapReduce’s shuffle capability, such as partitionBy, groupByKey, and sortByKey. The partitionBy transformation does pure shuffling (no grouping or sorting), groupByKey does shuffling and grouping, and sortByKey does shuffling plus sorting. Therefore, for each ReduceSinkOperator in SparkWork, we will need to inject one of these transformations.
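
The distinction between the three shuffle flavors can be sketched in plain Java. This is a toy model of the semantics only, not Spark’s implementation, and the helper names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy semantics of Spark's shuffle transformations over (key, value) pairs.
public class ShuffleSketch {

    // groupByKey: shuffle + group; no ordering guarantee on the keys.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs)
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        return groups;
    }

    // sortByKey: shuffle + total order on the keys (what SQL ORDER BY needs).
    static SortedMap<String, List<Integer>> sortByKey(List<Map.Entry<String, Integer>> pairs) {
        return new TreeMap<>(groupByKey(pairs));
    }

    // partitionBy: pure shuffle; route each key to a partition without
    // grouping or sorting within the partition.
    static List<List<Map.Entry<String, Integer>>> partitionBy(
            List<Map.Entry<String, Integer>> pairs, int numPartitions) {
        List<List<Map.Entry<String, Integer>>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (Map.Entry<String, Integer> p : pairs)
            parts.get(Math.floorMod(p.getKey().hashCode(), numPartitions)).add(p);
        return parts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = List.of(
            Map.entry("b", 1), Map.entry("a", 2), Map.entry("b", 3));
        System.out.println(sortByKey(pairs)); // prints {a=[2], b=[1, 3]}
    }
}
```

Because Hive’s groupBy does not need sorted keys, the engine can pick groupByKey and skip the sort that MapReduce would always pay for.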

Having the capability to selectively choose the exact shuffling behavior provides opportunities for optimization. For instance, Hive’s groupBy doesn’t require the key to be sorted, but MapReduce does. In contrast, in Spark, one can choose sortByKey only if key order is important (such as for SQL ORDER BY).

Multiple Reduce Stages

Whenever a query has multiple ReduceSinkOperator instances, Hive will break the plan apart and submit one MR job per sink. All the MR jobs in this chain need to be scheduled one-by-one, and each job has to re-read the output of the previous job from HDFS and shuffle it. In Spark, this step is unnecessary: multiple map functions and reduce functions can be concatenated. For each ReduceSinkOperator, a proper shuffle transformation needs to be injected as explained above.


Based on the above, you will likely recognize that although Hive on Spark is simple and clean in terms of functionality and design, the implementation will take some time. Therefore, the community will take a phased approach, with all basic functionality delivered in Phase 1 and optimizations and improvements ongoing over a longer period of time. (The precise number of phases, and what each will entail, is still under discussion.)

Most important, the Hive and Spark communities will work together closely to achieve this technical vision and resolve any obstacles that might arise — with the end result being the availability to Hive users of an execution engine that improves performance as well as unifies batch and stream processing.

We invite you to follow our progress, on which we will offer periodic updates!

Xuefu Zhang is a Software Engineer at Cloudera and a Hive PMC member.

Categories: Hadoop

Why Extended Attributes are Coming to HDFS

Cloudera Blog - Fri, 06/27/2014 - 15:00

Extended attributes in HDFS will facilitate at-rest encryption for Project Rhino, but they have many other uses, too.

Many mainstream Linux filesystems implement extended attributes, which let you associate metadata with a file or directory beyond common “fixed” attributes like filesize, permissions, modification dates, and so on. Extended attributes are key/value pairs in which the values are optional; generally, the key and value sizes are limited to some implementation-specific limit. A filesystem that implements extended attributes also provides system calls and shell commands to get, list, set, and remove attributes (and values) to/from a file or directory.

Recently, my Intel colleague Yi Liu led the implementation of extended attributes for HDFS (HDFS-2006). This work is largely motivated by Cloudera and Intel contributions to bringing at-rest encryption to Apache Hadoop (HDFS-6134; also see this post) under Project Rhino – extended attributes will be the mechanism for associating encryption key metadata with files and encryption zones — but it’s easy to imagine lots of other places where they could be useful.

For instance, you might want to store a document’s author and subject in something like user.author=cwl and user.subject=HDFS. You could store a file checksum in an attribute called user.checksum. Even just comments about a particular file or directory can be saved in an extended attribute.

In this post, you’ll learn some of the details of this feature from an HDFS user’s point of view.

Inside Extended Attributes

Extended attribute keys are java.lang.Strings and the values are byte[]s. By default, there is a maximum of 32 extended attribute key/value pairs per file or directory, and the (default) maximum combined length of an attribute’s name and value is 16,384 bytes. You can configure these two limits with the dfs.namenode.fs-limits.max-xattrs-per-inode and dfs.namenode.fs-limits.max-xattr-size config parameters.
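
As a hedged example, raising those limits would be done in hdfs-site.xml along these lines (the property names come from the text above; the values are illustrative):

```xml
<property>
  <name>dfs.namenode.fs-limits.max-xattrs-per-inode</name>
  <value>64</value>
</property>
<property>
  <name>dfs.namenode.fs-limits.max-xattr-size</name>
  <value>32768</value>
</property>
```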

Every extended attribute name must include a namespace prefix, and just as in the Linux filesystem implementations, there are four extended attribute namespaces: user, trusted, system, and security. The system and security namespaces are for HDFS internal use only; only the HDFS superuser can access the trusted namespace. So, user extended attributes will generally reside in the user namespace (for example, “user.myXAttrkey”). Namespaces are case-insensitive and extended attribute names are case-sensitive.

Extended attributes can be accessed using the hdfs dfs command. To set an extended attribute on a file or directory, use the -setfattr subcommand. For example,

hdfs dfs -setfattr -n 'user.myXAttr' -v someValue /foo


You can replace a value with the same command (and a new value) and you can delete an extended attribute with the -x option. The usage message shows the format. Setting an extended attribute does not change the file’s modification time.

To examine the value of a particular extended attribute or to list all the extended attributes for a file or directory, use the hdfs dfs -getfattr subcommand. There are options to recursively descend through a subtree and specify a format to write them out (text, hex, or base64). For example, to scan all the extended attributes for /foo, use the -d option:

hdfs dfs -getfattr -d /foo
# file: /foo
user.myXAttr='someValue'


The org.apache.hadoop.fs.FileSystem class has been enhanced with methods to set, get, and remove extended attributes from a path.

package org.apache.hadoop.fs;

public class FileSystem {
  /* Create or replace an extended attribute. */
  public void setXAttr(Path path, String name, byte[] value) throws IOException;

  /* Get an extended attribute. */
  public byte[] getXAttr(Path path, String name) throws IOException;

  /* Get multiple extended attributes. */
  public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException;

  /* Remove an extended attribute. */
  public void removeXAttr(Path path, String name) throws IOException;
}


Current Status

Extended attributes are currently committed on the upstream trunk and branch-2 and will be included in CDH 5.2 and Apache Hadoop 2.5. They are enabled by default (you can disable them with the dfs.namenode.xattrs.enabled configuration option) and there is no overhead if you don’t use them.

Extended attributes are also upward compatible, so you can use an older client with a newer NameNode version. Try them out!

Charles Lamb is a Software Engineer at Cloudera, currently working on HDFS.


Where to Find Cloudera Tech Talks (Through September 2014)

Cloudera Blog - Wed, 06/25/2014 - 16:12

Find Cloudera tech talks in Texas, Oregon, Washington DC, Illinois, Georgia, Japan, and across the SF Bay Area during the next calendar quarter.

Below please find our regularly scheduled quarterly update about where to find tech talks by Cloudera employees – this time, for the third calendar quarter of 2014 (July through September; traditionally, the least active quarter of the year). Note that this list will be continually curated during the period; complete logistical information may not be available yet. And remember, many of these talks are in “free” venues (no cost of entry).

As always, we’re standing by to assist your meetup by providing speakers, sponsorships, and schwag!

  • June 30-July 1, San Francisco (Spark Summit): Sandy Ryza on Apache Spark-on-YARN, and on doing end-to-end analytics with Spark
  • July TBD, Austin (Austin Data Geeks): Sean Busbey on Apache Accumulo
  • July 7, San Francisco (Spark Happy Hour): No speakers, just sponsoring
  • July 8, Tokyo (Hadoop Conference Japan): Tatsuo Kawasaki on Hue, Daisuke Kobayashi on security, Sho Shimauchi on Impala
  • July 15, Palo Alto, Calif. (SF Bay Lucene/Solr Meetup): Mark Miller on SolrCloud replica failover and automated testing
  • July 16, Irvine, Calif. (OC Big Data Meetup): Greg Chanan on Solr+Apache Hadoop
  • July 21, San Francisco (GraphLab Conference): Josh Wills on “What Comes After the Star Schema?”
  • July 22, Washington DC (DC Area Spark Interactive): Ted Malaska (panelist) on Spark use cases
  • July 20-23, Portland, Ore. (OSCON): Gwen Shapira on using R for analytics on Hadoop (+ leads a Hadoop BOF)
  • July 23, Portland, Ore. (Portland Big Data User Group): Speaker TBD on Impala
  • July 23, SF Bay Area, venue TBD (The Hive Think Tank): Eddie Garcia and Alejandro Abdelnur on enterprise security for Hadoop
  • Aug. 13, San Diego, Calif. (SD Big Data Meetup): Maxime Dumas on Impala
  • Aug. 20, San Jose, Calif. (NoSQL Now!): Aleks Shulman on Apache HBase development
  • Sept. 15, Champaign, Ill. (U-CHUG): Doug Cutting on the future of data
  • Sept. 19, Atlanta (MLconf Atlanta): Sandy Ryza on end-to-end analytics with Spark
  • Sept. 28-Oct. 2, San Francisco (JavaOne): Aleks Shulman on HBase development, Daniel Templeton with a Hadoop HOL

Justin Kestelyn is Cloudera’s developer outreach director.


How-to: Create an IntelliJ IDEA Project for Apache Hadoop

Cloudera Blog - Tue, 06/24/2014 - 15:07

Prefer IntelliJ IDEA over Eclipse? We’ve got you covered: learn how to get ready to contribute to Apache Hadoop via an IntelliJ project.

It’s generally useful to have an IDE at your disposal when you’re developing and debugging code. When I first started working on HDFS, I used Eclipse, but I’ve recently switched to JetBrains’ IntelliJ IDEA (specifically, version 13.1 Community Edition).

My main motivation was the ease of project setup in the face of Maven and Google Protocol Buffers (used in HDFS). The latter is an issue because the code generated by protoc ends up in one of the target subdirectories, which can be a configuration headache. The problem is not that Eclipse can’t handle getting these files into the classpath — it’s that in my personal experience, configuration is cumbersome and it takes more time to set up a new project whenever I make a new clone. Conversely, IntelliJ’s import maven project functionality seems to “just work”.

In this how-to, you’ll learn a few simple steps I use to create a new IntelliJ project for writing and debugging your code for Hadoop. (For Eclipse users, there is a similar post available here.) It assumes you already know how to clone, build, and run in a Hadoop repository using mvn, git, and so on. These instructions have been tested with the Hadoop upstream trunk (github.com/apache/hadoop-common.git).

  1. Make a clone of a Hadoop repository (or use an existing sandbox). There should be a hadoop-common directory at the top level when you’re finished.
  2. Do a clean and a full build using the mvn command in the CLI. I use mvn clean install, but you should do whatever suits you.
  3. In the .../bin/idea.properties file, set idea.max.intellisense.filesize=10000.
  4. Start IntelliJ.
  5. Select File > Import Project…
  6. A “Select File or Directory” to Import wizard screen will pop up. Select the hadoop-common directory from your repository clone. Click OK.
  7. An “Import Project” wizard screen will appear. Check “Import project from external model” and select “Maven”. Click Next.

  8. On the next screen, you don’t need to select any options. Click Next.


  9. On the next wizard screen, you do not need to select any profiles. Click Next.
  10. On the next wizard screen, org.apache.hadoop.hadoop-main:N.M.P-SNAPSHOT will already be selected. Click Next.
  11. On the next wizard screen, ensure that IntelliJ is pointing at a JDK 7 SDK (download JDK 7 here) for the JDK home path. Click Next.

  12. On the next wizard screen, give your project a project name. I typically set the “Project file location” to a sibling of the hadoop-common directory, but that’s optional. Click Finish.
  13. IntelliJ will then tell you: “New projects can either be opened in a new window or replace the project in the existing window. How would you like to open the project?” I typically select “New Window” because it lets me keep different projects in different windows. Select one or the other.
  14. IntelliJ then imports the project.
  15. You can check that the project builds OK using Build > Rebuild Project. In the “Messages” panel I typically use the little icon on the left side of that window to Hide Warnings.

You’re ready for action. It’s easy to set up multiple projects that refer to different clones: just repeat the same steps above. Click on Window and you can switch between them.

A typical workflow for me is to edit either in my favorite editor or in IntelliJ. If the former, IntelliJ is very good about updating the source that it shows you. After editing, I’ll do Build > Rebuild Project, work out any compilation errors, and then either use the Debug or Run buttons. I like the fact that IntelliJ doesn’t make me mess around with Run/Debug configurations.

Some of my favorite commands and keystrokes (generally configurable) are:

  • c-sh-N (Navigate > File), which lets me easily search for a file and open it
  • c-F2 and c-F5 (stop the currently running process, and debug failed unit tests)
  • In the Run menu, F7 (step into), F8 (step over), Shift-F8 (step out), and F9 (resume program) while in the debugger
  • In the View > Tool Windows menu: Alt-1 (Project) and Alt-7 (Structure)

If you ever run into an error that says you need to add webapps/hdfs to your classpath (has happened to me when running some HDFS unit tests), taking the following steps should fix it (credit goes to this post from Stack Overflow):

  1. Select File > Project Structure…
  2. Click on Modules under “Project Settings.”
  3. Select the hadoop-hdfs project.
  4. Select the Dependencies tab.

  5. Click the + sign on right side and select “Jars or directories.”

  6. From your clone hierarchy, select the .../hadoop-common/hadoop-hdfs-project/hadoop-hdfs/target directory. Click OK.
  7. Check all of the listed directories (classes, jar directory, source archive directory). Click OK.
  8. Click OK (again).

Congratulations, you are now ready to contribute to Hadoop via an IntelliJ project!

Charles Lamb is a Software Engineer at Cloudera, currently working on HDFS.


How-to: Install a Virtual Apache Hadoop Cluster with Vagrant and Cloudera Manager

Cloudera Blog - Fri, 06/20/2014 - 15:25

It’s been a while since we provided a how-to for this purpose. Thanks, Daan Debie (@DaanDebie), for allowing us to re-publish the instructions below (for CDH 5)!

I recently started as a Big Data Engineer at The New Motion. While researching our best options for running an Apache Hadoop cluster, I wanted to try out some of the features available in the newest version of Cloudera’s Hadoop distribution: CDH 5. Of course I could’ve downloaded the QuickStart VM, but I rather wanted to run a virtual cluster, making use of the 16GB of RAM my shiny new 15″ Retina Macbook Pro has ;)

There are some tutorials, and repositories available for installing a local virtualized cluster, but none of them did what I wanted to do: install the bare cluster using Vagrant, and install the Hadoop stack using the Cloudera Manager. So I created a simple Vagrant setup myself. You can find it here.

Setting up the Virtual Machines

As per the instructions from the GitHub repo:

Depending on the hardware of your computer, installation will probably take between 15 and 25 minutes.

First install VirtualBox and Vagrant.

Install the Vagrant Hostmanager plugin.

$ vagrant plugin install vagrant-hostmanager


Clone this repository.

$ git clone https://github.com/DandyDev/virtual-hadoop-cluster.git


Provision the bare cluster. It will ask you to enter your password, so it can modify your /etc/hosts file for easy access in your browser. It uses the vagrant-hostmanager plugin installed above to do this.

$ cd virtual-hadoop-cluster
$ vagrant up


Now we can install the Hadoop stack.

Installing Hadoop and Related Components
  1. Surf to: http://vm-cluster-node1:7180.
  2. Login with admin/admin.
  3. Select Cloudera Express and click Continue twice.
  4. On the page where you have to specify hosts, enter the following: vm-cluster-node[1-4] and click Search. Four nodes should pop up and be selected. Click Continue.
  5. On the next page (“Cluster Installation > Select Repository”), leave everything as is and click Continue.
  6. On the next page (“Cluster Installation > Configure Java Encryption”) I’d advise to tick the box, but only if your country allows it. Click Continue.
  7. On this page do the following:
    • Login To All Hosts As: Another user -> enter vagrant
    • In the two password fields enter: vagrant
    • Click Continue.
  8. Wait for Cloudera Manager to install the prerequisites… and click Continue.
  9. Wait for Cloudera Manager to download and distribute the CDH packages… and click Continue.
  10. Wait while the installer is inspecting the hosts, and Run Again if you encounter any (serious) errors (I got some that went away the second time). After this, click Finish.
  11. For now, we’ll install everything but HBase. You can add HBase later, but it’s quite taxing for the virtual cluster. So on the “Cluster Setup” page, choose “Custom Services” and select the following: HDFS, Hive, Hue, Impala, Oozie, Solr, Spark, Sqoop2, YARN, and ZooKeeper. Click Continue.
  12. On the next page, you can select what services end up on what nodes. Usually Cloudera Manager chooses the best configuration here, but you can change it if you want. For now, click Continue.
  13. On the “Database Setup” page, leave it on “Use Embedded Database.” Click Test Connection (it says it will skip this step) and click Continue.
  14. Click Continue on the “Review Changes” step. Cloudera Manager will now try to configure and start all services.

And you’re done! Have fun experimenting with Hadoop!


Meet the Data Scientist: Sandy Ryza

Cloudera Blog - Thu, 06/19/2014 - 15:53

Meet Sandy Ryza (@SandySifting), the newest member of Cloudera’s data science team. See Sandy present at Spark Summit 2014 (June 30-July 1 in San Francisco; register here for a 20% discount).

What is your definition of a “data scientist”?

To put it in boring terms, data scientists are people who find that the bulk of the work for testing their hypotheses lies in manipulating quantities of information – AKA, “San Franciscans with calculators.” 

About what parts of the field are you excited in particular?

The proliferation (or “Cambrian Explosion”, as Josh Wills would say) of tools and frameworks for munging and analyzing massive data. A few years ago, we really only had MapReduce for dealing with data at a large scale. Since then, a bunch of players have written engines that can handle more of the data transformation patterns required for complex analytics. I think eventually the market will need to standardize on two or three, and I’m curious to see how it plays out.

When it comes to datasets, I’m always trying to find time to play around with traffic data. CalTrans releases the measurements they collect from loop detectors on freeways all over California every 30 seconds, so that’s my go-to whenever I want to try out a new tool or technique.

You previously worked as a software engineer (and are an Apache Hadoop committer). How are the two roles different?

At Cloudera, “data scientist” is a much more external facing role. We don’t have massive data problems of our own, but our customers have a huge variety. I get to spend time understanding these problems, helping out with them, and thinking about how to make our platform a good place to solve them.

That said, my job is still very heavy on the engineering. I spend half of my time contributing to Apache Spark and some more of it writing code for customer engagements. It’s hard to get away from distributed systems over here.  

Josh Wills says data scientists would be more accurately called “data janitors”. Do you agree?

At this point, thanks to the insistence of Josh and others, I think the statement that data science boils down to a bunch of data munging is mostly an accepted truism. But this fact is probably just a stronger endorsement of the term. Anybody who’s done “real” science knows that it’s not exactly about flying around, splashing nature with elegant statistical sprinkles, and “deriving insights”. (Not that I’ve done any real science.) If data science is 20% statistics and 80% engineering, and science is 1% inspiration and 99% perspiration, I’m just happy I get to save on my deodorant bill. 

You’re presenting at Spark Summit 2014. What are your topics, and why should people attend?

I’ll be giving a couple talks. One is about Spark-on-YARN: a deep dive into how the Spark and YARN resource management models intersect, along with some practical advice on how to operate them on your own cluster. The other is about Oryx, MLlib, and machine learning infrastructure. I’ll be talking about what it means to put a machine learning model into production and our plan to base Oryx, an open source project that combines model-serving and model building, on Spark’s streaming and ML libraries.

What is your advice for aspiring data scientists?

This might apply to any job, but I think a good data scientist needs to work on being a good translator and a good teacher. Besides memorizing obscure R syntax and trying to convince their superiors that implementing a Dynamic Bayesian net is a good idea, a data scientist spends most of their time striving to understand problems in the language of the people who have them. For a statistical model or data pipeline to be worth anything, its creator needs to be able to articulate its limitations and the problem it solves. 

Interested in becoming a data scientist yourself? Explore Cloudera’s Intro to Data Science training or Data Science certification program.


Project Rhino Goal: At-Rest Encryption for Apache Hadoop

Cloudera Blog - Tue, 06/17/2014 - 15:59

An update on community efforts to bring at-rest encryption to HDFS — a major theme of Project Rhino.

Encryption is a key requirement for many privacy and security-sensitive industries, including healthcare (HIPAA regulations), card payments (PCI DSS regulations), and the US government (FISMA regulations).

Although network encryption has been provided in the Apache Hadoop platform for some time (since Hadoop 2.0.2-alpha/CDH 4.1), at-rest encryption, the encryption of data stored on persistent storage such as disk, is not. To meet that requirement in the platform, Cloudera and Intel are working with the rest of the Hadoop community under the umbrella of Project Rhino — an effort to bring a comprehensive security framework for data protection to Hadoop, which also now includes Apache Sentry (incubating) — to implement at-rest encryption for HDFS (HDFS-6134 and HADOOP-10150).

With this work, encryption and decryption will be transparent: existing Hadoop applications will be able to work with encrypted data without modifications. Data will be encrypted with configurable ciphers and cipher modes, allowing users to choose an appropriate level of confidentiality. Because encryption is being implemented directly in HDFS, the full spectrum of HDFS access methods and file formats will be supported.

Design Principles

At-rest encryption for HDFS centers around the concept of an encryption zone, which is a directory where all the contents are encrypted with a unique encryption key. When accessing data within an encryption zone, HDFS clients will transparently fetch the encryption key from the cluster key server to encrypt and decrypt data. Encryption keys can also be rolled in the event of compromise or because of corporate security policy. Subsequent files will be encrypted with the new encryption key, while existing files can be rewritten by the user to be encrypted with the new key.

Access to encrypted data is dependent on two things: appropriate HDFS-level filesystem permissions (i.e. Unix-style permissions and access control lists) as well as appropriate permissions on the key server for the encryption key. This two-fold scheme has a number of nice properties. Through the use of HDFS ACLs, users and administrators can granularly control data access. However, because key server permissions are also required, compromising HDFS is insufficient to gain access to unencrypted data. Importantly, this means that even HDFS administrators do not have full access to unencrypted data on the cluster.
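The two-fold scheme amounts to a simple conjunction of permission checks. Here is a toy Python model of that logic; the function and data-structure names are invented for illustration and are not the actual HDFS implementation:

```python
# Toy model of the two-fold access check described above: a client can read
# plaintext only if it passes BOTH the HDFS permission check and the
# key-server permission check. All names here are illustrative only.

def can_read_plaintext(user, hdfs_acl, key_acl, path, key_name):
    """Both the filesystem ACL and the key-server ACL must grant access."""
    has_fs_access = user in hdfs_acl.get(path, set())
    has_key_access = user in key_acl.get(key_name, set())
    return has_fs_access and has_key_access

hdfs_acl = {"/secure/report.csv": {"alice", "hdfs-admin"}}
key_acl = {"zone-key-1": {"alice"}}

# alice holds both permissions; the HDFS admin holds only the filesystem one,
# so compromising HDFS alone is not enough to read unencrypted data.
print(can_read_plaintext("alice", hdfs_acl, key_acl, "/secure/report.csv", "zone-key-1"))       # True
print(can_read_plaintext("hdfs-admin", hdfs_acl, key_acl, "/secure/report.csv", "zone-key-1"))  # False
```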

A critical part of this vision is the cluster key server (for example, Cloudera Navigator Key Trustee). Because we foresee customer deployments with hundreds to thousands of encryption zones, an enterprise-grade key server needs to be secure, reliable, and easy to use. In highly regulated industries, robust key management is as valid a requirement as at-rest encryption itself.

An equally important part of this vision is support for hardware-accelerated encryption. Encryption is a business-critical need, but it can be unusable if it carries a significant performance penalty. This requirement is addressed by other Rhino-related contributions from Intel (HADOOP-10693), which will provide highly optimized libraries using the AES-NI instructions available on Intel processors. By using these Intel libraries, HDFS will be able to provide access to encrypted data at hardware speeds with minimal performance impact.


At-rest encryption has been a major objective for Rhino since the effort’s inception. With the new Cloudera-Intel relationship — and the addition of even more engineering resources at our new Center for Security Excellence — we expect at-rest encryption to be among the first committed features to arise from Rhino, and among the first such features to ship inside Cloudera Enterprise 5.x.

Andrew Wang and Charles Lamb are Software Engineers on the HDFS team at Cloudera. Andrew is also a Hadoop PMC member.

To learn more about comprehensive, compliance-ready security for the enterprise, register for the upcoming webinar, “Compliance-Ready Hadoop,” on June 19 at 10am PT.

Categories: Hadoop

How-to: Easily Do Rolling Upgrades with Cloudera Manager

Cloudera Blog - Thu, 06/12/2014 - 16:07

Unique across all options, Cloudera Manager makes it easy to do what would otherwise be a disruptive operation for operators and users.

For the increasing number of customers that rely on enterprise data hubs (EDHs) for business-critical applications, it is imperative to minimize or eliminate downtime — thus, Cloudera has focused intently on making software upgrades a routine, non-disruptive operation for EDH administrators and users.

With Cloudera Manager 4.6 and later, it is extremely easy to do minor version upgrades (for example, from CDH 4.3 to CDH 4.6) with zero downtime and no service interruptions during the process. The parcels binary packaging format is the key to this process: parcels allow administrators to keep multiple versions of EDH software on their clusters simultaneously, and make transitions across versions seamless.

There are two steps involved in doing a rolling upgrade:

  1. Distributing and activating a newer version of the relevant parcel (described in detail here)
  2. Doing a rolling restart of the existing cluster, which I will explain in the remainder of this relatively brief post
Doing the Rolling Restart

To start the rolling restart operation, go to the Actions menu of your cluster in Cloudera Manager and select Rolling Restart. (Having a highly available NameNode is a pre-requisite for the rolling restart operation because it ensures that all data is available throughout the operation.)

Selecting Rolling Restart will take you to a pop-up where you can specify the parameters for the operation:

The important parameters in the dialog box are as follows:

  • Services to restart: Here you specify which services should be restarted with no downtime. Any service that does not have a single point of access for clients (including HDFS, Apache HBase, MapReduce, Apache Oozie, and Apache ZooKeeper) is eligible for rolling restart, because Cloudera Manager can ensure that clients face no service interruptions while the operation is in progress. This setting is required, and the form provides further options for selecting which roles to restart from the selected services.
    • Roles to include: Here you can specify if you want to restart only worker roles (DataNode, TaskTracker, and RegionServer), only non-worker roles (NameNode, JobTracker, HBase Master, ZooKeeper Server, and so on), or all roles. This setting offers the flexibility to pick the order in which the roles are restarted.
    • Role filters: These filters let you restart only the specific roles that have a new configuration or a new software version. One common use case is resuming the rolling restart operation if it fails partway through due to host failures. If that happens, you can trigger rolling restart again with the appropriate role filter, and it will resume restarting the roles that were not yet restarted after the software upgrade or configuration change. The set of roles on which the rolling restart operation is performed is the intersection of “Roles to include” and the selected role filter.
  • Batch size: Here you can specify how many hosts to restart at a time. Typically, you would choose 5-10% of the size of the cluster here if you have multiple racks. If you select a higher batch size, the overall rolling restart operation will finish sooner, but the cluster performance will be reduced during the operation as there would be fewer worker roles active at any given time. If you have a single rack, then the batch size should be left as 1.

Once you’ve specified the parameters, click Confirm, which will take you to the Command Details page for the command that is consequently triggered.

The command will restart the selected services and their roles in proper order to ensure zero downtime for EDH users. The rolling restart operation itself comprises several steps that occur in the background:

  • Master rolling restart: Highly available master roles like the NameNode and JobTracker, along with other auxiliary roles in the cluster, are restarted without interrupting services for EDH consumers. To make this possible, one of the masters remains available and active throughout the operation.
  • Worker rolling restart: This step restarts the worker roles of each service in a way that does not affect running jobs or clients accessing data in the EDH. The roles are restarted rack by rack. By default, Hadoop maintains replicas of every block on at least two racks (provided there are multiple racks), which ensures that all data remains available to clients. Within each rack, hosts are grouped alphabetically into batches of the specified size, and then the roles on them are restarted. For some roles a simple restart is sufficient, while others must be decommissioned to ensure that another worker role is servicing the client.
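The rack-by-rack, batched ordering above can be sketched in a few lines of Python. This is an illustrative model of the scheduling order only, not Cloudera Manager's implementation; the function name and data shapes are invented for the example:

```python
# Sketch of the worker rolling-restart ordering described above: hosts are
# restarted rack by rack; within a rack they are sorted alphabetically and
# grouped into batches of the configured size.

def rolling_restart_batches(hosts_by_rack, batch_size=1):
    """Return (rack, batch-of-hosts) pairs in the order they would restart."""
    batches = []
    for rack in sorted(hosts_by_rack):
        hosts = sorted(hosts_by_rack[rack])  # alphabetical within the rack
        for i in range(0, len(hosts), batch_size):
            batches.append((rack, hosts[i:i + batch_size]))
    return batches

cluster = {"rack1": ["node-c", "node-a", "node-b"], "rack2": ["node-d", "node-e"]}
for rack, batch in rolling_restart_batches(cluster, batch_size=2):
    print(rack, batch)
# rack1 ['node-a', 'node-b']
# rack1 ['node-c']
# rack2 ['node-d', 'node-e']
```

Because only one rack is in flight at a time, the block replicas on the other rack keep all data readable throughout.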

The operation can take a little while to finish depending on cluster size and the chosen batch size. Should a host failure occur, Cloudera Manager lets users easily recover and resume the operation using the role filters described above.


As you can see, what would otherwise be a complex operation for doing a software upgrade with no downtime is made extremely easy by Cloudera Manager — and can be done without ever leaving the UI. More information about rolling upgrades can be found in Cloudera Manager documentation.

Vikram Srivastava is a Software Engineer at Cloudera.

Categories: Hadoop

This Month in the Ecosystem (May 2014)

Cloudera Blog - Mon, 06/09/2014 - 16:21

Welcome to our ninth edition of “This Month in the Ecosystem,” a digest of highlights from May/early June 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

More good news!

  • Hadoop Summit San Jose 2014 wrapped up. Every attendee will have a different lens on the experience, but for me, the main takeaway was the increasingly mainstream presence of the enterprise juggernaut called Apache Hadoop. Clearly, Hadoop has earned a permanent place in the data center alongside the incumbents. (For further thoughts from Hadoop’s pioneers, watch this video of a pre-event Hive Think Tank panel on the topic, “Beyond MapReduce.”)
  • As further support for the observation above, Cloudera’s new acquisition of Gazzang (as a complement to the platform work being led by Intel and Cloudera under Project Rhino) puts an exclamation point on the acute need for comprehensive security around production Hadoop deployments: Perimeter and access control, auditing and lineage, and data protection/encryption/key management. Clearly, organizations in regulated industries are taking Hadoop very, very seriously — and they have the most stringent requirements of all.
  • New benchmark testing from Cloudera revealed the current state of SQL-on-Hadoop technology across the ecosystem: Impala, Apache Hive, Presto, and Shark. (Preview: Impala performance leads by 950% or more under multiuser workload, with much higher CPU efficiency.)
  • Apache Spark 1.0 was released, signifying an important milestone for that rapidly growing effort (currently the most active project in the ecosystem, based on number and diversity of contributors).
  • Parquet, the general-purpose columnar storage format for Hadoop, became an Apache Incubator project. Over the past couple of years, only Spark rivals it for rapidity of adoption.
  • Black Duck Software published the results of its annual Future of Open Source survey. Survey says? Eighty percent of respondents choose OSS for its quality, and half of all corporations are expected to contribute to and adopt some form of OSS in 2015.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Capacity Planning with Big Data and Cloudera Manager

Cloudera Blog - Fri, 06/06/2014 - 15:42

Thanks to Bill Podell, VP Big Data and BI Practice, MBI Solutions, for the guest post below.

Capacity planning has long been a critical component of successful production implementations. Today, Big Data calls for a particularly deep understanding of capacity management, because resource utilization explodes as business users, analysts, and data scientists jump onboard to analyze and use newly found data. The resource impact can escalate very quickly, causing poor load and/or response times. The typical result is throwing more hardware at the problem without any understanding of what impact the new hardware will actually have. Better yet, be proactive and know about the problem before it even occurs!

In this post, I’ll offer an overview of how MBI Solutions built its Capacity Planning and Forecasting solution to utilize the performance metrics captured by Cloudera Manager. This solution provides a broad range of models/predictive analytics including: automated time forecasting models, extrapolation models, “what-if” scenarios, and optimal allocation models.

The Inside Story

As noted above, the application is written specifically, and certified, to use the Cloudera Manager API (versions 4 and 5) to connect and extract hundreds of performance-related metrics captured on a continuous basis. It is non-intrusive with respect to the data on your system: it does not read or interface with specific data elements. The extracted metrics are configurable, allowing customizations based upon utilization and the specifics of your own environment.

The application can run locally or externally, typically once a day. Once metric collection is complete, it connects to an FTP site and uploads the data to be loaded into the MBI Capacity Planning engine. Loading the data creates a hierarchy of the Hadoop schema, as seen below:

Each hierarchy level is drillable and exposes all the metrics extracted from Cloudera Manager. More than 100 metrics are used to compile the capacity planning reports at all the different levels. As an example:

Thresholds can be incorporated at all levels so that alerts are generated automatically as different scenarios are built to project future load capacity based upon the current rate of growth. This feature helps users quickly understand which resources, if any, will be at risk 30, 60, and 90 days out, so they can be addressed well before crossing the threshold.

Below is an example of threshold monitoring. In this case, the monitor is set at 75 percent CPU utilization, and the forecasted model shows that in the next three months, CPU utilization based upon the previous three months will only hit 30 percent:

Below is an example of predictive analytics based on a potential upgrade in CPU and the predicted effect on system utilization:


Capacity planning is a requirement for any and all production systems, and for systems involving Big Data, where the makeup of the system changes constantly through growth in data, users, and analytic processes, it is particularly important. The solution described above, thanks to integration with Cloudera Manager via its API, makes that capability real.

Categories: Hadoop

How-to: Use Kite SDK to Easily Store and Configure Data in Apache Hadoop

Cloudera Blog - Wed, 06/04/2014 - 15:46

Organizing your data inside Hadoop doesn’t have to be hard — Kite SDK helps you try out new data configurations quickly in either HDFS or HBase.

Kite SDK is a Cloudera-sponsored open source project that makes it easier for you to build applications on top of Apache Hadoop. Its premise is that you shouldn’t need to know how Hadoop works to build your application on it, even though that’s an unfortunately common requirement today (because the Hadoop APIs are low-level; all you get is a filesystem and whatever else you can dream up — well, code up).

The goal of Kite’s Data module (which is used by Spring XD’s HDFS Dataset sink to store payload data in Apache Avro or Apache Parquet [incubating] format) is to simplify this process by adding a higher-level compatibility layer. The result is much more similar to how you would work with a traditional database: you describe the records you need to store, provide some basic information about how queries will look, and start streaming records into a data set.

Kite’s API lets you work in terms of data sets, views of data, and records rather than low-level storage. And interacting with a dataset is the same whether it’s stored as data files in HDFS or as records in a collection of Apache HBase cells.

In this post, you’ll learn how to use Kite to store a CSV data set in HDFS as well as Apache HBase (using the same commands), without knowing low-level details like file splits, formats, or serialization!

Using the Kite CLI

Until recently, Kite’s data support could be used only as an API. But version 0.13.0 introduced a command-line interface (CLI) that handles mundane tasks so that you can spend your time solving more interesting business problems.

To get started, download the command-line interface jar. All the dependencies you’ll need when running on a cluster are included in the executable jar. You’ll use the Cloudera QuickStart VM to run the commands; you can download it and follow along if you don’t already have a cluster. If you’re following along, download the jar and make it executable using these commands:

curl -L http://tiny.cloudera.com/dataset-0.14.0 -o dataset
chmod +x dataset


First, run the dataset command without any arguments. You’ll see an extended version of this message, which outlines the commands and what they do:

[cloudera@localhost ~]$ ./dataset
Usage: dataset [options] [command] [command options]
  Options:
    -v, --verbose, --debug
       Print extra debugging information
  Commands:
    help
       Retrieves details on the functions of other commands


Next, you are going to use a few of these commands to create both datasets. The example data is a set of movie ratings from GroupLens. You’ll be using the small set with about 17,000 records, but the process is the same for any size dataset.

First, download and unzip the archive, and then rename the file u.item to movies.psv to make it easier to keep track of the relevant content. This is a pipe-separated file with general information on a collection of movies.

cp ml-100k/u.item movies.psv


Show the first line of the file to see what this data looks like:

[cloudera@localhost ~]$ head -n 1 movies.psv
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0


To load this data, you need to tell Kite what it looks like using a schema file. A schema is like a table description and tells Kite what fields are in the data and the type of each field. You can have Kite generate that schema by inspecting the data, but you need to tell it what the fields are because the movies data has no header.

Open the movies.psv file with your favorite editor and add a header line with names for the first few columns. The additional data without headers will be ignored. Your data file should look like this:

[cloudera@localhost ~]$ head -n 2 movies.psv
id|title|release_date|video_release_date|imdb_url
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0


Now you can use Kite’s csv-schema command to generate a schema. This requires a couple of extra arguments besides the sample data file. You use --class to pass a name for the record schema being created, and --delimiter to set the field separator character, which is a pipe. (Note that Kite has inspected the sample data and identified that the id field is a long. It also assumes that all the fields may be null, and allows null for each type as well.)

[cloudera@localhost ~]$ ./dataset csv-schema --class Movie movies.psv --delimiter '|'
{
  "type" : "record",
  "name" : "Movie",
  "doc" : "Schema generated by Kite",
  "fields" : [ {
    "name" : "id",
    "type" : [ "null", "long" ]
  }, {
    "name" : "title",
    "type" : [ "null", "string" ]
  }, {
    "name" : "release_date",
    "type" : [ "null", "string" ]
  }, {
    "name" : "video_release_date",
    "type" : [ "null", "string" ]
  }, {
    "name" : "imdb_url",
    "type" : [ "null", "string" ]
  } ] 
}
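Kite's actual inference logic is more involved, but the core idea — read field names from the header, guess each type from a sample value, and make every field nullable — can be sketched in a few lines of Python. This is an illustrative approximation, not Kite's implementation:

```python
import json

# Minimal sketch of header-driven schema inference, in the spirit of the
# csv-schema command: names come from the header, types are guessed from
# the first data row, and every field is made nullable.

def infer_schema(header_line, data_line, record_name, delimiter="|"):
    names = header_line.strip().split(delimiter)
    values = data_line.strip().split(delimiter)  # extra unnamed columns are ignored by zip
    fields = []
    for name, value in zip(names, values):
        try:
            int(value)
            inferred = "long"
        except ValueError:
            inferred = "string"
        fields.append({"name": name, "type": ["null", inferred]})
    return {"type": "record", "name": record_name, "fields": fields}

header = "id|title|release_date|video_release_date|imdb_url"
row = "1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)"
print(json.dumps(infer_schema(header, row, "Movie"), indent=2))
```

Note how the empty `video_release_date` value falls through to the nullable string type, matching the behavior shown above.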


Save this schema to a file so you can pass it to Kite when you create the movies dataset. The --output option will save it directly to HDFS.

[cloudera@localhost ~]$ ./dataset csv-schema --class Movie movies.psv --delimiter '|' --output hdfs:/user/cloudera/schemas/movie.avsc


Creating the Dataset

Now that you have a description of the movie data, you’re ready to create a dataset. You will use the create command, passing a name for the dataset, movies, and the path to the newly created schema.

[cloudera@localhost ~]$ ./dataset create movies --schema hdfs:/user/cloudera/schemas/movie.avsc


To confirm that it worked, let’s view the schema of the dataset we just created using the schema command.

[cloudera@localhost ~]$ ./dataset schema movies
{
  "type" : "record",
  "name" : "Movie",
  "doc" : "Schema generated by Kite",
  ...
}


You should see the same schema that you just created. You’re ready to load some data using the csv-import command. Because you’ve already configured the schema, Kite knows how to store the data, so you just need to tell Kite that the file is pipe-delimited just like when you inspected it to build a schema.

[cloudera@localhost ~]$ ./dataset csv-import movies.psv movies --delimiter '|'
Added 1682 records to dataset "movies"


Now you have a data set of 1,682 movies! The data is available to Apache Hive and Impala, and you can use the show command to list the first 10:

[cloudera@localhost ~]$ ./dataset show movies
{"id": 1, "title": "Toy Story (1995)", "release_date": "01-Jan-1995", "video_release_date": "", "imdb_url": "http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)"}
{"id": 2, "title": "GoldenEye (1995)", "release_date": "01-Jan-1995", "video_release_date": "", "imdb_url": "http://us.imdb.com/M/title-exact?GoldenEye%20(1995)"}
{"id": 3, "title": "Four Rooms (1995)", "release_date": "01-Jan-1995", "video_release_date": "", "imdb_url": "http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)"}


Moving to HBase

So far, you’ve created a dataset stored in HDFS, but that’s not very new. Lots of tools can do that, but Kite is different because tools built with Kite can work equally well with other storage engines. Next, you will build the same dataset in HBase, with just a little extra configuration.

Creating the movies dataset in HBase uses the same commands, but you need two extra bits of configuration: a column mapping and a partition strategy. A column mapping tells Kite where to store record fields in HBase, which stores values in cells that are identified by column family and qualifier. Here is an example column mapping that stores the “title” field for each movie as a column with family “m” and qualifier “title”:

{ "source" : "title", "type" : "column", "family": "m", "qualifier": "title" }
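To see what such a mapping does, here is a toy Python sketch that routes record fields into HBase-style (family, qualifier) cells. The mapping format follows the example above; the function itself is invented for illustration and is not part of Kite:

```python
# Toy illustration of a column mapping: each mapping entry names a source
# field in the record and the HBase column family/qualifier it maps to.

def map_record_to_cells(record, mappings):
    """Return a dict of (family, qualifier) -> value cells for one record."""
    cells = {}
    for m in mappings:
        if m["type"] == "column":
            cells[(m["family"], m["qualifier"])] = record[m["source"]]
    return cells

mappings = [
    {"source": "title", "type": "column", "family": "m", "qualifier": "title"},
    {"source": "release_date", "type": "column", "family": "m", "qualifier": "release_date"},
]
movie = {"id": 1, "title": "Toy Story (1995)", "release_date": "01-Jan-1995"}
print(map_record_to_cells(movie, mappings))
# {('m', 'title'): 'Toy Story (1995)', ('m', 'release_date'): '01-Jan-1995'}
```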


Each column needs a mapping in the schema’s “mapping” section. Kite doesn’t help generate this configuration yet, so you can download a finished version.

curl -L http://tiny.cloudera.com/movie-hbase.avsc -o movie-hbase.avsc


Next, a partition strategy is a configuration file that tells Kite how to store records. Kite uses the partition strategy to build a storage key for each record, and then organizes the dataset by those keys. For HDFS, keys identify the directory, or partition, where the data is stored, just like Hive. In HBase, a key uniquely identifies a record, and HBase groups them into files as needed.

Kite can also help create partition strategies using the partition-config command. It turns a series of source:type pairs into a formatted strategy. For this example, though, you are just using one field in the storage keys, the record id, copied into the key.

[cloudera@localhost ~]$ ./dataset partition-config id:copy -s movie-hbase.avsc
[ {
  "source" : "id",
  "type" : "identity",
  "name" : "id_copy"
} ]


Because the source data for a partition is required, the movie-hbase.avsc file contains one other minor change: the id field no longer allows null.
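The partition strategy's job — deriving a storage key from a record — can be illustrated with a short Python sketch. Only the "identity" (copy) type from the example above is modeled, and the function is invented for illustration:

```python
# Sketch of how a partition strategy turns a record into a storage key:
# each strategy entry names a source field and how to derive the key part.

def build_key(record, strategy):
    """Build a storage-key tuple from a record using an identity strategy."""
    key = []
    for part in strategy:
        if part["type"] == "identity":
            key.append(record[part["source"]])  # copy the field value into the key
    return tuple(key)

strategy = [{"source": "id", "type": "identity", "name": "id_copy"}]
print(build_key({"id": 42, "title": "Some Movie"}, strategy))  # (42,)
```

In HDFS this key would select a partition directory; in HBase it uniquely identifies the record's row.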

Use --output again to save this to the local filesystem as id.json.

[cloudera@localhost ~]$ ./dataset partition-config id:copy -s movie-hbase.avsc -o id.json


Loading the Data into HBase

Now that you have a schema with column mappings and a partition strategy file, you are ready to use the create command and load data. The only difference is that you add --partition-by to pass the partition config and --use-hbase to store data in HBase.

[cloudera@localhost ~]$ ./dataset create movies --use-hbase -s movie-hbase.avsc --partition-by id.json
[cloudera@localhost ~]$ ./dataset csv-import movies.psv movies --delimiter '|' --use-hbase
Added 1682 records to dataset "movies"
[cloudera@localhost ~]$ ./dataset show movies --use-hbase
{"id": 1, "title": "Toy Story (1995)", "release_date": "01-Jan-1995", "video_release_date": "", "imdb_url": "http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)"}
{"id": 2, "title": "GoldenEye (1995)", "release_date": "01-Jan-1995", "video_release_date": "", "imdb_url": "http://us.imdb.com/M/title-exact?GoldenEye%20(1995)"}
{"id": 3, "title": "Four Rooms (1995)", "release_date": "01-Jan-1995", "video_release_date": "", "imdb_url": "http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)"}



One of the hardest problems when moving data to Hadoop is figuring out how you want to organize it. What I’ve just demonstrated — loading data into both Hive and HBase with variations of the same commands — helps solve that problem by enabling you to try out new configurations quickly, even between very different storage systems. That’s more time to spend running performance tests, rather than debugging proof-of-concept code.

Ryan Blue is a Software Engineer at Cloudera, currently working on the Kite SDK.

Categories: Hadoop

Apache Spark 1.0 is Released

Cloudera Blog - Fri, 05/30/2014 - 17:56

Spark 1.0 is the project's biggest release yet, with a long list of new features for enterprise customers.

Congratulations to the Apache Spark community for today’s release of Spark 1.0, which includes contributions from more than 100 people (including Cloudera’s own Diana Carroll, Mark Grover, Ted Malaska, Sean Owen, Sandy Ryza, and Marcelo Vanzin). We think this release is an important milestone in the continuing rapid uptake of Spark by enterprises — which is supported by Cloudera via Cloudera Enterprise 5 — as a modern, general-purpose processing engine for Apache Hadoop.

Spark 1.0 contains, among other things:

  • History Server, for improved monitoring capabilities
  • Improvements to MLlib (sparse vector support)
  • Improvements to Apache Avro integration
  • Support for Java 8 and lambda expressions
  • Simplified job submission to YARN cluster
  • Spark Streaming integration with Kerberos
  • Authentication of all Spark communications
  • Introduction of Spark SQL (alpha)
  • Unified application configuration and submission through spark-submit
  • PySpark on YARN support

(You’ll find more details about these features in the Release Notes. You can also read more from Databricks, here.)

Spark 1.0 will be packaged inside Cloudera’s forthcoming CDH 5.1 release, and will also be available as a Cloudera Manager 5.1 parcel.

Fire it up!

Justin Kestelyn is Cloudera’s developer outreach director.

Spark Summit 2014 is coming (June 30 – July 2)! Register here to get 20% off the regular conference price.

Categories: Hadoop

Apache Spark Resource Management and YARN App Models

Cloudera Blog - Fri, 05/30/2014 - 14:41

A concise look at the differences between how Spark and MapReduce manage cluster resources under YARN

The most popular Apache YARN application after MapReduce itself is Apache Spark. At Cloudera, we have worked hard to stabilize Spark-on-YARN (SPARK-1101), and CDH 5.0.0 added support for Spark on YARN clusters.

In this post, you’ll learn about the differences between the Spark and MapReduce architectures, why you should care, and how they run on the YARN cluster ResourceManager.


In MapReduce, the highest-level unit of computation is a job. The system loads the data, applies a map function, shuffles it, applies a reduce function, and writes it back out to persistent storage. Spark has a similar job concept (although a job can consist of more stages than just a single map and reduce), but it also has a higher-level construct called an “application,” which can run multiple jobs, in sequence or in parallel.

Spark application architecture

For those familiar with the Spark API, an application corresponds to an instance of the SparkContext class. An application can be used for a single batch job, an interactive session with multiple jobs spaced apart, or a long-lived server continually satisfying requests. Unlike MapReduce, an application will have processes, called Executors, running on the cluster on its behalf even when it’s not running any jobs. This approach enables data storage in memory for quick access, as well as lightning-fast task startup time.


MapReduce runs each task in its own process. When a task completes, the process goes away. In Spark, many tasks can run concurrently in a single process, and this process sticks around for the lifetime of the Spark application, even when no jobs are running.

The advantage of this model, as mentioned above, is speed: Tasks can start up very quickly and process in-memory data. The disadvantage is coarser-grained resource management. As the number of executors for an app is fixed and each executor has a fixed allotment of resources, an app takes up the same amount of resources for the full duration that it’s running. (When YARN supports container resizing, we plan to take advantage of it in Spark to acquire and give back resources dynamically.)
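A rough way to see the coarse-grained trade-off: because the executor count and per-executor allotment are fixed, an application's footprint is simply their product for its whole lifetime, whether or not a job is running. The function and numbers below are invented for the example:

```python
# Illustration of the coarse-grained resource model described above: a Spark
# app's footprint is fixed at (executors x per-executor allotment) for its
# entire lifetime, even while no jobs are running.

def spark_app_footprint(num_executors, cores_per_executor, mem_gb_per_executor):
    """Total cluster resources held by the application."""
    return {
        "cores": num_executors * cores_per_executor,
        "memory_gb": num_executors * mem_gb_per_executor,
    }

print(spark_app_footprint(10, 4, 8))  # {'cores': 40, 'memory_gb': 80}
```

A MapReduce job, by contrast, only holds resources for tasks that are actually running, releasing each task's process as it completes.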

Active Driver

To manage the job flow and schedule tasks, Spark relies on an active driver process. Typically, this driver process is the same as the client process used to initiate the job, although in YARN mode (covered later) the driver can run on the cluster. In contrast, in MapReduce the client process can go away and the job continues running. In Hadoop 1.x, the JobTracker was responsible for task scheduling; in Hadoop 2.x, the MapReduce application master took over this responsibility.

Pluggable Resource Management

Spark supports pluggable cluster management. The cluster manager is responsible for starting executor processes, and Spark application writers do not need to worry about which cluster manager Spark is running against.

Spark supports YARN, Mesos, and its own “standalone” cluster manager. All three of these frameworks have two components: a central master service (the YARN ResourceManager, Mesos master, or Spark standalone master) that decides which applications get to run executor processes, as well as where and when they run; and a slave service running on every node (the YARN NodeManager, Mesos slave, or Spark standalone slave) that actually starts the executor processes and may also monitor their liveness and resource consumption.

Why Run on YARN?

Using YARN as Spark’s cluster manager confers a few benefits over Spark standalone and Mesos:

  • YARN allows you to dynamically share and centrally configure the same pool of cluster resources between all frameworks that run on YARN. You can throw your entire cluster at a MapReduce job, then use some of it on an Impala query and the rest on a Spark application, without any changes in configuration.
  • You can take advantage of all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • Spark standalone mode requires each application to run an executor on every node in the cluster, whereas with YARN, you choose the number of executors to use.
  • Finally, YARN is the only cluster manager for Spark that supports security. With YARN, Spark can run against Kerberized Hadoop clusters and uses secure authentication between its processes.
Running on YARN

When running Spark on YARN, each Spark executor runs as a YARN container. Where MapReduce schedules a container and fires up a JVM for each task, Spark hosts multiple tasks within the same container. This approach enables several orders of magnitude faster task startup time.

Spark supports two modes for running on YARN, “yarn-cluster” mode and “yarn-client” mode.  Broadly, yarn-cluster mode makes sense for production jobs, while yarn-client mode makes sense for interactive and debugging uses where you want to see your application’s output immediately.

Understanding the difference requires an understanding of YARN’s Application Master concept. In YARN, each application instance has an Application Master process, which is the first container started for that application. The Application Master is responsible for requesting resources from the ResourceManager and, when allocated them, telling NodeManagers to start containers on its behalf. Application Masters obviate the need for an active client — the process starting the application can go away and coordination continues from a process managed by YARN running on the cluster.

In yarn-cluster mode, the driver runs in the Application Master. This means that the same process is responsible for both driving the application and requesting resources from YARN, and this process runs inside a YARN container. The client that starts the app doesn’t need to stick around for its entire lifetime.

yarn-cluster mode

The yarn-cluster mode, however, is not well suited to using Spark interactively. Spark applications that require user input, like spark-shell and PySpark, need the Spark driver to run inside the client process that initiates the Spark application. In yarn-client mode, the Application Master is merely present to request executor containers from YARN. The client communicates with those containers to schedule work after they start:

yarn-client mode

This table offers a concise list of differences between these modes:

                            yarn-cluster           yarn-client
  Driver runs in            Application Master     Client
  Who requests resources    Application Master     Application Master
  Who starts executors      YARN NodeManager       YARN NodeManager
  Supports Spark shell      No                     Yes

Key Concepts in Summary
  • Application: This may be a single job, a sequence of jobs, a long-running service issuing new commands as needed or an interactive exploration session.
  • Spark Driver: The Spark driver is the process running the SparkContext (which represents the application session). This driver is responsible for converting the application into a directed graph of individual steps to execute on the cluster. There is one driver per application.
  • Spark Application Master: The Spark Application Master is responsible for negotiating resource requests made by the driver with YARN and finding a suitable set of hosts/containers in which to run the Spark applications. There is one Application Master per application.
  • Spark Executor: A single JVM instance on a node that serves a single Spark application. An executor runs multiple tasks over its lifetime, and multiple tasks concurrently. A node may have several Spark executors, and many nodes run Spark executors for each client application.
  • Spark Task: A Spark Task represents a unit of work on a partition of a distributed dataset. 

Sandy Ryza is a data scientist at Cloudera, and an Apache Hadoop committer.

Spark Summit 2014 is coming (June 30 – July 2)! Register here to get 20% off the regular conference price.

Categories: Hadoop

New SQL Choices in the Apache Hadoop Ecosystem: Why Impala Continues to Lead

Cloudera Blog - Thu, 05/29/2014 - 19:23

Impala continues to demonstrate performance leadership compared to alternatives (by 950% or more), while providing greater query throughput and with a far smaller CPU footprint.

In our previous post from January 2014, we reported that Impala had achieved query performance over Apache Hadoop equivalent to that of an analytic DBMS over its own proprietary storage system. We believed this was an important milestone because Impala’s objective has been to support a high-quality BI experience on Hadoop data, not to produce a “faster Apache Hive.” An enterprise-quality BI experience requires low latency and high concurrency (among other things), so surpassing a well-known proprietary MPP DBMS in these areas was important evidence of progress.
In the past nine months, we’ve all seen additional public validation that the original technical design for Hive, while effective for batch processing, was a dead end for BI workloads. Recent examples include the launch of Facebook’s Presto engine (Facebook was the inventor and is the world’s largest user of Hive), the emergence of Shark (Hive running on the Apache Spark DAG), and the “Stinger” initiative (Hive running on the Apache Tez [incubating] DAG).
Given the introduction of a number of new SQL-on-Hadoop implementations, it seemed like a good time to do a roundup of the latest versions of each engine to see how they differ. We find that Impala maintains a significant performance advantage over the various other open source alternatives, ranging from 5x to 23x depending on the workload and the implementations that are compared. This advantage is due to some inherent design differences among the various systems, which we’ll explain below. Impala’s advantage is strongest for multi-user workloads, which arguably is the most relevant measure for users evaluating their options for BI use cases.

Configuration

Cluster

All tests were run on precisely the same cluster, which was torn down between runs to ensure fair comparisons. The cluster comprised 21 nodes, each equipped with:

  • 2 processors, 12 cores, Intel Xeon CPU E5-2630L 0 at 2.00GHz
  • 12 disk drives at 932GB each (one for the OS, the rest for HDFS)
  • 384GB of memory
Comparative Set
  • Impala 1.3.0
  • Hive-on-Tez: The final phase of the 18-month Stinger initiative (aka Hive 0.13)
  • Shark 0.9.2: A port of Hive from UC Berkeley AMPLab that is architecturally similar to Hive-on-Tez, but based on Spark instead of Tez. Shark testing was done on a native in-memory dataset (RDD) as well as HDFS.
  • Presto 0.60: Facebook’s query engine project
Queries
  • To ensure a realistic Hadoop workload with representative data-size-per-node, queries were run on a 15TB scale-factor dataset across 20 nodes.
  • We ran precisely the same open decision-support benchmark derived from TPC-DS described in our previous testing (with queries categorized into Interactive, Reporting, and Deep Analytics buckets).
  • Due to the lack of a cost-based optimizer and predicate propagation in all tested engines except Impala, we ran the same queries that had been converted to SQL-92-style joins in the previous testing, and also manually propagated predicates where semantically equivalent. For consistency, we ran those same queries against Impala, although Impala produces identical results without these modifications.
  • In the case of Shark, manual query hints were needed in addition to the modifications above to complete the query runs. Furthermore, Shark required more memory than available in the cluster to run the Reporting and Deep Analytics queries on RDDs (and thus those queries could not be completed).
  • We selected comparable file formats across all engines, consistently using Snappy compression to ensure apples-to-apples comparisons. Furthermore, each engine was tested on a file format that ensures the best possible performance and a fair, consistent comparison: Impala on Apache Parquet (incubating), Hive-on-Tez on ORC, Presto on RCFile, and Shark on ORC. (Note that native support for Parquet in Shark as well as Presto is forthcoming.)
  • Standard methodical testing techniques (multiple runs, tuning, and so on) were used for each of the engines involved.
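The manual predicate propagation mentioned above can be sketched in miniature (tables, columns, and values here are invented for illustration):

```python
# Sketch of manual predicate propagation: a filter written against one join
# input is copied to the semantically equivalent column of the other input,
# so both sides shrink before the join. A cost-based optimizer does this
# automatically; here we do it by hand on invented data.
sales = [(2012, "A", 10), (2013, "B", 20), (2013, "C", 5)]  # (year, item, qty)
dates = [(2012, "Q1"), (2013, "Q1"), (2014, "Q2")]          # (year, quarter)

keep_year = 2013  # predicate originally written only against sales.year

filtered_sales = [r for r in sales if r[0] == keep_year]
filtered_dates = [r for r in dates if r[0] == keep_year]    # propagated copy

joined = [(y, item, qty, q)
          for (y, item, qty) in filtered_sales
          for (dy, q) in filtered_dates
          if y == dy]
print(joined)  # [(2013, 'B', 20, 'Q1'), (2013, 'C', 5, 'Q1')]
```

Without the propagated copy, the full `dates` table would feed the join; placing the predicate incorrectly, as the text notes, can even change the results.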
Results

Single User

Impala on Parquet was the performance leader by a substantial margin, running on average 5x faster than its next best alternative (Shark 0.9.2).

(Note: The results are not shown here, but the queries were also run on Impala/RCFile as a direct comparison to Presto/RCFile — and performance was consistently 20-30% slower than that of Impala/Parquet.)

The two Hive-on-DAG implementations produced similar results, which is consistent with what one would expect given that they have highly similar designs. Presto is the youngest implementation of the four and is held back by the fact that it runs on RCFile, a much less effective columnar format than Parquet. We look forward to re-running these benchmarks in a few months when Presto runs on Parquet.

Although these results are exciting in themselves, as previously explained, we believe that measuring latency under a multi-user workload is a more valuable metric — because you would very rarely, if ever, commit your entire cluster to a single query at a time.

Multiple Users

In this test of a concurrent workload, we ran seven Interactive queries (q42, q52, q55, q63, q68, q73, q98) 10 times concurrently. To prevent all processes from running the same queries at the same time, queries were run back-to-back in randomized order. Furthermore, because we could not run the full query set for Shark on RDDs, we used only the partition necessary for the Interactive queries to do the single-user and 10-user comparisons.

In this run, Impala widened its performance advantage, performing 9.5x better than the next best alternative:

Throughput and Hardware Utilization

In the above chart you can see that under the (simulated) load of 10 concurrent users, Impala slows down by 1.9x, whereas for other SQL implementations, query performance slows by 2.6x – 8.6x under the same load. This performance difference translates into quality of experience as perceived by the BI user.

We also measured total throughput, or how many queries the system could process in a given hour — which has an impact on the quantity of hardware required to run a SQL workload at a targeted performance level. This metric is a big influence on TCO, where the carrying cost of hardware is typically two-thirds of the TCO of a Hadoop system.
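The link between per-query slowdown and throughput is simple arithmetic; the sketch below uses the 1.9x and 8.6x factors from the text with an invented single-user latency:

```python
# How per-query slowdown under concurrency maps to throughput (queries/hour).
# The single-user latency is invented; the slowdown factors are from the text.
single_user_secs = 10.0   # hypothetical single-user query latency
users = 10

def queries_per_hour(slowdown):
    latency = single_user_secs * slowdown  # per-query latency under load
    return users * 3600.0 / latency        # all concurrent streams combined

print(queries_per_hour(1.9))  # ~1895 queries/hour
print(queries_per_hour(8.6))  # ~419 queries/hour, i.e. ~4.5x more hardware
                              # needed for the same target throughput
```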

It’s perhaps surprising that Shark running on data cached as RDDs produced slightly slower single-user queries than Shark running directly on HDFS; because the data in HDFS was already in memory (local cache), the RDDs only added overhead. This disparity will widen over time now that HDFS supports in-memory reads (HDFS-4949), which are more efficient than the OS buffer cache. In addition, in-memory writes are planned for an upcoming HDFS release (HDFS-5851). (In the coming months, we plan to re-run these benchmarks using updated versions and with HDFS caching configured.)

CPU efficiency explains how Impala is able to provide lower latency and higher throughput than the alternatives, and why a native high-performance MPP query engine offers benefits that just porting Hive onto a DAG (either Tez or Spark) does not. While utilizing a DAG removes additional I/O costs beyond the initial scan of the data, most of the performance and concurrency gains come from the CPU efficiency of the query engine itself.

Beyond Performance

Based on these results, Impala not only outperforms its nearest competitors, but also proves itself to be a more robust system that requires less manual tuning:

  • The other systems required significant rewrites of the original queries in order to run, while Impala could run the original as well as modified queries.
    Deep knowledge about how to rewrite SQL statements was required to ensure a head-to-head comparison across the non-Impala systems and to avoid even slower response times and, in some cases, outright query failures. For most users of applications or BI tools, such manual rewriting of queries is highly undesirable, if not impossible.
    In contrast, Impala’s cost-based optimizer and predicate propagation capability allow it to run the queries in the original SQL-89 form of the TPC-DS-derived benchmark or the modified versions with identical performance. Manual predicate propagation in particular is often challenging for users; traditional databases provide automatic propagation similar to Impala’s, and incorrect placements can lead to wrong results.
  • Some systems require manual tuning of the JVM’s garbage collection parameters.
    Presto in particular required manual tuning of Java garbage collection in order to achieve its results. Likewise, Shark’s inability to run without manual query hints was partially due to its dependence on JVM memory management. And Tez either needs more startup time for smaller queries when running them in separate containers, or runs into similar challenges when reusing containers.
    Impala’s query execution, however, is written in native code, which not only leads to greater performance and CPU efficiency as demonstrated above, but also offers a more stable multi-user service similar to traditional MPP query engines.

In summary, these new results demonstrate that Impala achieves better concurrent latency than its competitors while providing high query throughput, and with a far smaller CPU footprint. Furthermore, of the entire comparative set, only Impala was able to run the queries in their original SQL-89-style join format without modification.

The results above help demonstrate that despite significant engineering investments into alternatives, Impala uniquely delivers on the requirements for BI and SQL analytics by combining:

  • Interactive SQL
  • Ability to handle highly-concurrent workloads
  • Efficient resource usage (so Impala is a “good citizen” in a shared workload environment)
  • Open formats for accessing any data
  • Multi-vendor support (from Cloudera, MapR, and Amazon) to avoid lock-in, and
  • Broad ISV support

As usual, we invite you to do the same testing for yourselves using our openly published benchmark kit — any and all feedback is welcome and appreciated. We’ll bring you more news over time as Impala continues to hit its milestones!

Justin Erickson is Director of Product Management at Cloudera.

Marcel Kornacker is Impala’s architect and the Impala tech lead at Cloudera.

Dileep Kumar is a Performance Engineer at Cloudera.


How-to: Manage Time-Dependent Multilayer Networks in Apache Hadoop

Cloudera Blog - Tue, 05/27/2014 - 14:22

Using an appropriate network representation and the right tool set are the key factors in successfully merging structured and time-series data for analysis.

In Part 1 of this series, you took your first steps for using Apache Giraph, the highly scalable graph-processing system, alongside Apache Hadoop. In this installment, you’ll explore a general use case for analyzing time-dependent, Big Data graphs using data from multiple sources. You’ll learn how to generate random large graphs and small-world networks using Giraph – as well as play with several parameters to probe the limits of your cluster.

As you’ll see, using an appropriate network representation, and the right tools for the job (such as Gephi for visualization, a Hadoop-Gephi connector, and CDH), can make this process much easier. (Note: Although neither Giraph nor Gephi are currently supported/shipping inside CDH, you may find this exercise interesting for learning, testing, and development purposes.)

The Use Case

Since Hadoop is a relatively recent innovation, attracting more and more attention (and even a lot of money), I was interested in the question: What can Wikipedia tell us about the emerging Hadoop market?

Surprisingly, no single Wikipedia page on this topic exists. A quick search for the terms “Hadoop” and “market”, however, shows 49 results. (That is, 49 Wikipedia pages contain both words.) I did not consider non-English pages, which introduces a strong, unquantified bias. Nobody is equipped to analyze all topic pages in every language, so I had to define a purely data-driven approach.

The solution is simple: Wikipedia pages, like other webpages, can easily be ranked with the classic PageRank algorithm, which is implemented in Giraph. But to do that, we would need the full page graph for every page in Wikipedia – and that is definitely not an option. This raises the question: What is a large graph? Or rather, how large are “large graphs”?
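For reference, the algorithm itself fits in a few lines. The sketch below is a plain power iteration on a toy graph (Giraph computes the same quantity vertex by vertex across supersteps); the toy links and the damping factor 0.85 are the usual textbook choices, not values from this post:

```python
# Minimal PageRank sketch: plain power iteration with damping d over a dict of
# out-links. Toy graph; no dangling nodes, so total rank mass stays 1.0.
def pagerank(links, d=0.85, iters=50):
    n = len(links)
    pr = {v: 1.0 / n for v in links}
    for _ in range(iters):
        nxt = {v: (1.0 - d) / n for v in links}
        for v, outs in links.items():
            for w in outs:                    # spread rank along out-links
                nxt[w] += d * pr[v] / len(outs)
        pr = nxt
    return pr

toy = {"A": ["B", "C"], "B": ["C"], "C": ["A"], "D": ["C"]}
ranks = pagerank(toy)
print(max(ranks, key=ranks.get))  # "C": the node with the most in-links wins
```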

Large Graphs and Efficient Representation

To quantify the size of a graph represented in a format suitable for Apache Giraph, one either specifies the amount of memory needed to store the graph in RAM or on disk (which is interesting for long-term storage), or counts the number of nodes and edges. Either approach will reveal something about possible limitations, whether on one machine or on a cluster, because in all cases resources are limited. Let’s look at two examples:

  • Wikipedia has about 30,000,000 pages (as of January 2014), and if the average number of links per page is assumed to be around eight, which was (according to S. N. Dorogovtsev, p.8) the estimated number of hyperlinks per page in the WWW in 1999, we would need around 230MB to store each link as a byte. To track the dynamics of such a network over five years based on daily snapshots of the network links, one would need around 407GB, but that’s without content (no words, images, edit history, or Java objects). Instead, we could calculate a correlation matrix, which contains the correlation link strength for all node pairs, from multiple node properties, but that would require around 410TB per snapshot (and 145PB for a full year of them!).
  • The social graph used by Facebook has about 1 billion input vectors with 100 features. Facebook claims, among other things, that its “performance and scalability on a 1-trillion edge social graph is two orders of magnitude beyond that scale of other public benchmarks.”
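The back-of-the-envelope figures in the first bullet can be reproduced, assuming one byte per stored link or matrix cell, an upper-triangle correlation matrix, and binary units:

```python
# Reproducing the Wikipedia capacity estimates above (1 byte per stored link
# or correlation cell; binary MiB/GiB/TiB/PiB units).
pages = 30_000_000
links_per_page = 8

link_bytes = pages * links_per_page           # one byte per link
print(round(link_bytes / 2**20))              # 229 MiB  ("around 230MB")

snapshots = 365 * 5                           # daily snapshots for five years
print(round(link_bytes * snapshots / 2**30))  # 408 GiB  ("around 407GB")

corr_bytes = pages * pages // 2               # upper triangle, all node pairs
print(round(corr_bytes / 2**40))              # 409 TiB  ("around 410TB")
print(round(corr_bytes * 365 / 2**50))        # 146 PiB  ("145PB" per year)
```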

These massive numbers are cause for concern with respect to representation — especially in Hadoop, where data is distributed across many nodes. Whether the graph is stored as a file in Apache Hive, or even in Apache HBase tables, this consideration is critical because not all file formats can be split.

Usually a graph is represented as a matrix. If a link has more than one property, or the nodes have more than one interaction mode (multiple connectivity), tensor representation is necessary. Sometimes the nodes are of different types; such networks are called n-partite networks. If multiple link types between the nodes are possible, they are called multiplex networks, in which each individual link type defines a single network layer. Another option is a “hyper-graph” containing hyper-edges, which are characterized by the three or more nodes they tie together.

Other appropriate representations, like adjacency lists or node and edge lists, are available. Such graph representations differ from the fundamental matrix representation. They often have a more efficient memory footprint; compressed link lists, for example, do not contain any values for non-existent links, whereas in a matrix even a link strength of zero requires a stored value. Node identifiers can be encoded efficiently, for example with Huffman coding. Instead of the complete, sometimes quite long, node name (like a URI, which is often used in semantic graphs), only a number is used during computation, which also lowers memory requirements. In other cases, as in the tabular representation of a vertex cut used in GraphX on Apache Spark, the data model contains additional information about the neighborhood of a node.
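A tiny sketch of these trade-offs (the URIs and edges are invented): a dense matrix stores a cell for every node pair, zeros included, while a link list stores only existing links, and long node names can be dictionary-encoded to small integers:

```python
# Representation trade-off in miniature: dense matrix cells vs. a compressed
# link list, plus dictionary encoding of long URI node names to integers.
uris = ["http://example.org/page/%d" % i for i in range(4)]  # invented URIs
edges = [(uris[0], uris[1]), (uris[1], uris[2]), (uris[3], uris[0])]

ids = {u: i for i, u in enumerate(uris)}      # node name -> small integer
encoded = [(ids[a], ids[b]) for a, b in edges]

matrix_cells = len(uris) ** 2                 # 16 stored values, mostly zeros
print(matrix_cells, len(encoded), encoded)    # 16 3 [(0, 1), (1, 2), (3, 0)]
```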

This brings us back to our example. We want to analyze how information about the Hadoop market is covered or represented in Wikipedia. Instead of working with the full page graph, I downloaded the neighborhood graphs shown in Figure 1, in which panel a) presents each language in an individual color and panel b) highlights the local neighborhood. This means that all pages in the same language, like the initial page in English, are shown in group LN (green), while all remaining pages in all other languages are shown in the global neighborhood GN (blue).

Figure 1. Preparation of interconnected local networks for multilingual Wikipedia analysis.

The dark-colored nodes have a special meaning: The green one is our initial page, called central node CN. This node has so-called inter-wiki links to nodes in other languages, which describe the same semantic concept. All dark nodes form the core and all nodes in light colors are the “hull” of the local network.

I personally like the comparison with the model of an atom. Just as multiple atoms form molecules, several such local networks form a neighborhood network.

Such an intermediate result is visualized with Gephi in Figure 2. The neighborhood network for “Hadoop market” consists of manually selected pages (a set of 26 central nodes) and also includes their local and global neighborhoods, which means all direct linked pages in all Wikipedia languages are available.

Figure 2. The local neighborhood network for Wikipedia pages representing the Hadoop market shows three major clusters: the green nodes on the top are all pages about open-source software projects. Companies in the Hadoop market are shown in the middle, and a cluster that contains more fundamental topics, like Java programming, is shown at the bottom.

This approach has two major advantages:

  • We can reduce the amount of data we have to handle without losing the embedding of the pages, and
  • We can easily separate and compare sub-data sets by language for language-dependent analysis.

But we have to be careful: a PageRank computation on this graph would give a different result compared to the result obtained from a full Wikipedia link graph. Comparing the global (full page graph) and local ranking (combined neighborhood graph) would tell us how the extraction of the local neighborhood network influences the results. This process is similar to an external disturbance of the system — one can expect that the absolute values of each node PageRank will differ in both networks, but the ranked list of PageRanks should not differ much (otherwise, the results may not be meaningful).
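That sanity check can be made concrete by comparing the two ranked lists directly. The sketch below uses invented scores; only the rank order matters:

```python
# Compare two rankings (e.g., full-graph vs. neighborhood-graph PageRank) by
# average rank displacement (a normalized Spearman footrule). Scores invented.
def avg_rank_shift(a, b):
    rank = lambda s: {n: i for i, n in enumerate(sorted(s, key=s.get, reverse=True))}
    ra, rb = rank(a), rank(b)
    return sum(abs(ra[n] - rb[n]) for n in ra) / len(ra)

global_pr = {"Hadoop": 0.30, "Java": 0.25, "Solr": 0.20, "Hive": 0.15, "Pig": 0.10}
local_pr  = {"Hadoop": 0.28, "Solr": 0.24, "Java": 0.22, "Hive": 0.16, "Pig": 0.10}

print(avg_rank_shift(global_pr, local_pr))  # 0.4, a small shift: the extraction
                                            # barely disturbed the ranking
```

A large value here would be the warning sign described above: the neighborhood extraction changed the ranking itself, not just the absolute PageRank values.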

Now we have an approach that allows efficient extraction of sub-data sets. (Network nodes like Wikipedia pages have many different facets.) We can measure the amount of information on each page (text volume or number of words or sentences), the number of images, or links to other pages. Even a more dynamic view is possible; we just have to create correlation networks from user activity data, like access time series or edit history. All this data should be part of the data set, even if it is not used every time in every computation.

Visualization of Time-Dependent Multilayer Graphs

Before we get into more Giraph-related coding details, we need to add some tools to our graph-analysis workbench for quickly analyzing intermediate results and handling multilayer graphs. (Fortunately, such multilayer networks can be stored in Hive tables using partitions for each layer or for each temporal snapshot.)

We need multiple tables or multiple queries to extract edge and node data separately from one table. One sub-data set contains all the node properties of a neighborhood network. The links are also organized in multiple sub-data sets, one per link type. We can also store all links in just one table, but in this case each link type needs a classifier for filtering the list. (In the case of time-dependent networks, I recommend partitioned tables – this makes loading data into Gephi much faster.)
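The single-table variant can be sketched with an in-memory SQLite stand-in for Hive (the schema, layer names, and values are invented): all link types live in one edge table, and a `layer` classifier column filters a single network layer, much as a Hive partition per layer would:

```python
# One edge table for all network layers; the 'layer' classifier column filters
# a single link type, mimicking a Hive partition per layer. Invented schema.
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE edges (src TEXT, dst TEXT, layer TEXT, weight REAL)")
db.executemany("INSERT INTO edges VALUES (?, ?, ?, ?)", [
    ("Hadoop", "Java", "pagelinks", 1.0),
    ("Hadoop", "Solr", "pagelinks", 1.0),
    ("Hadoop", "Solr", "access_corr", 0.8),  # correlation layer (access data)
])
layer = db.execute(
    "SELECT src, dst FROM edges WHERE layer = ?", ("pagelinks",)).fetchall()
print(layer)  # [('Hadoop', 'Java'), ('Hadoop', 'Solr')]
```

With partitioned tables, the filter becomes partition pruning instead of a scan, which is why loading a single layer into Gephi is so much faster.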

Figure 3. Gephi visualization of a correlation network.

Both features — semantic graph metadata management and “one-click” import from Hadoop — are implemented in the Gephi-Hadoop-Connector (see Figure 4). The Hive Metastore already stores some details about the data in our tables, like the table name, column names and types, the SerDe with its relevant parameters, and partitions. But in many research projects, one has to track yet more data that describes the meaning of a given table or even a column. Either for selection of filter criteria or for consistency checks, such information is highly relevant.

Figure 4. Metadata management for multiple time-dependent network layers is done via the Etosha plugin, which allows a comprehensive description of each single layer.

Here, we will store all facts about a data set as a page in a Semantic Media Wiki (SMW). This page allows human interaction, collaboration, and even machine access in parallel. Semantic annotations tell the Gephi-Hadoop-Connector all details about the multilayer network, such as which layer belongs to what network and which Hive or Impala query is required to load this data set from Hadoop. This information is retrieved via the query API of the SMW. (A future version of the Etosha Semantic Metastore, or ESM, will work with any type of triple store, which allows a more generic approach based on SPARQL queries.)

Our data-driven market study starts with an extraction of sub-data sets, which are defined by the local neighborhood graphs for a set of manually selected central nodes. We extract the access-rate and edit-event time series for those pages and calculate some characteristic values, like the local relevance index, based on access-rate data. This is shown in Figure 5, and the global relevance index for each page is shown in Figure 6. (Some more details about the analysis procedure are shown in this conference poster, presented during the Spring 2014 meeting of the DPG in Germany.)

Figure 5. Hadoop attracted measurably more attention continuously from 2009 until its “public” breakthrough in 2011 in the English Wikipedia.

Figure 6. The increase in attention people give to Hadoop-related topics depends strongly on the language. In non-English Wikipedia projects, the Hadoop market is not as well recognized as in the English Wikipedia, but the increasing trend is clearly visible for Apache Solr (orange) and Hadoop (violet).

To summarize, our graph-analysis toolbox now contains:

  • Giraph 1.0
  • A CDH 4 development cluster in pseudo-distributed mode. We store data in Hive tables and the JDBC connector allows fast data access via Impala.
  • Gephi for visualization, on an external client machine. With the Gephi toolkit library, we can load and manipulate any graph file without showing a GUI. If we have to process a very large number of relatively small graphs, we can use this library within a map-only MapReduce job, but for large-scale processing, we will use Giraph.

Now, we can focus on practical things. First, we have to prepare the analysis workbench and then generate test data sets.

Two InputFormats for random graph generation are available in Giraph. Let’s calculate the PageRank for some random sample graphs.

Build and Deploy Giraph 1.1.0

1. Clone giraphl from Github into directory /home/cloudera/GIRAPH.

mkdir GIRAPH
cd GIRAPH
git clone https://github.com/kamir/giraphl.git
cd giraphl/bin
gedit bsg.sh


2. Check and, if necessary, modify the settings:

#####################################################
# Cloudera Quickstart VM C5 # current VERSION !!!!!!
#############################
CDHV=5.0.0
USER=cloudera
HADOOP_MAPRED_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce
EXJARS=hadoop-mapreduce-examples-2.2.0-cdh5.0.0-beta-2.jar
GIRAPH_HOME=/usr/local/giraph
JTADDRESS=
ZKSERVER=
ZKPORT=2181


3. Run the “bootstrap giraph script” in ./giraphl/bin/bsg.sh:

./bsg.sh deploy


4. The build procedure will take some time. After about three to 10 minutes you should see the result:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache Giraph Parent .............................. SUCCESS [4.174s]
[INFO] Apache Giraph Core ................................ SUCCESS [29.513s]
[INFO] Apache Giraph Examples ............................ SUCCESS [14.484s]
[INFO] Apache Giraph Accumulo I/O ........................ SUCCESS [14.126s]
[INFO] Apache Giraph HBase I/O ........................... SUCCESS [14.439s]
[INFO] Apache Giraph HCatalog I/O ........................ SUCCESS [20.972s]
[INFO] Apache Giraph Hive I/O ............................ SUCCESS [22.264s]
[INFO] Apache Giraph Gora I/O ............................ SUCCESS [16.835s]
[INFO] Apache Giraph Rexster I/O ......................... SUCCESS [0.089s]
[INFO] Apache Giraph Rexster Kibble ...................... SUCCESS [3.227s]
[INFO] Apache Giraph Rexster I/O Formats ................. SUCCESS [13.861s]
[INFO] Apache Giraph Distribution ........................ SUCCESS [31.572s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3:06.224s
[INFO] Finished at: Mon Apr 07 20:43:43 PDT 2014
[INFO] Final Memory: 118M/848M
[INFO] ------------------------------------------------------------------------


5. Start a Giraph benchmark with the following command:

hadoop jar giraph-ex.jar org.apache.giraph.benchmark.PageRankBenchmark -Dgiraph.zkList= -Dmapreduce.jobtracker.address= -libjars giraph-core.jar -e 1 -s 3 -v -V 50 -w 1


and it should produce output comparable to this:

14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:java.compiler=
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:os.version=2.6.32-220.el6.x86_64
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:user.name=cloudera
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/cloudera
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:user.dir=/usr/local/giraph
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Initiating client connection, connectString= sessionTimeout=60000 watcher=org.apache.giraph.job.JobProgressTracker@3a1d1cf7
14/04/07 23:39:33 INFO mapreduce.Job: Running job: job_1396880118210_0033
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost.localdomain/ Will not attempt to authenticate using SASL (unknown error)
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: Socket connection established to localhost.localdomain/, initiating session
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost.localdomain/, sessionid = 0x1453c8ace24080c, negotiated timeout = 60000
14/04/07 23:39:33 INFO job.JobProgressTracker: Data from 1 workers - Compute superstep 3: 50 out of 50 vertices computed; 1 out of 1 partitions computed; min free memory on worker 1 - 91.06MB, average 91.06MB
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: EventThread shut down
14/04/07 23:39:33 INFO zookeeper.ZooKeeper: Session: 0x1453c8ace24080c closed
14/04/07 23:39:34 INFO mapreduce.Job: Job job_1396880118210_0033 running in uber mode : false
14/04/07 23:39:34 INFO mapreduce.Job:  map 50% reduce 0%
14/04/07 23:39:36 INFO mapreduce.Job:  map 100% reduce 0%
14/04/07 23:39:42 INFO mapreduce.Job: Job job_1396880118210_0033 completed successfully
14/04/07 23:39:42 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=182040
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=88
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=2
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=1
    Job Counters
        Launched map tasks=2
        Other local map tasks=2
        Total time spent by all maps in occupied slots (ms)=7168000
        Total time spent by all reduces in occupied slots (ms)=0
    Map-Reduce Framework
        Map input records=2
        Map output records=0
        Input split bytes=88
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=110
        CPU time spent (ms)=1130
        Physical memory (bytes) snapshot=370827264
        Virtual memory (bytes) snapshot=1842810880
        Total committed heap usage (bytes)=300941312
    Giraph Stats
        Aggregate edges=50
        Aggregate finished vertices=50
        Aggregate sent message message bytes=1275
        Aggregate sent messages=150
        Aggregate vertices=50
        Current master task partition=0
        Current workers=1
        Last checkpointed superstep=0
        Sent message bytes=0
        Sent messages=0
        Superstep=4
    Giraph Timers
        Initialize (ms)=1867
        Input superstep (ms)=192
        Setup (ms)=25
        Shutdown (ms)=9942
        Superstep 0 PageRankComputation (ms)=56
        Superstep 1 PageRankComputation (ms)=87
        Superstep 2 PageRankComputation (ms)=38
        Superstep 3 PageRankComputation (ms)=36
        Total (ms)=10377
    Zookeeper base path
        /_hadoopBsp/job_1396880118210_0033=0
    Zookeeper halt node
        /_hadoopBsp/job_1396880118210_0033/_haltComputation=0
    Zookeeper server:port
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=0


Congratulations, you have successfully built Giraph (branch: trunk) and tested it for CDH!

Generating Random Graphs with Giraph

Graph classification can be based on measurable properties. "How ordered or how random is the graph structure?" is an important question in graph analysis, and many researchers investigate the degree distribution of a given graph. Two networks with very different properties are shown in Figures 8 and 9, together with their corresponding degree distributions.

Giraph calculates several properties of a given graph, which is normally loaded before the computation starts; however, the graph can also be generated on the fly. Table 1 shows two VertexInputFormats that can be used for random graph generation.

Table 1. VertexInputFormats are used as “on the fly” graph-generators.

Figure 8. Small-world network generated with WattsStrogatzVertexInputFormat

Figure 9. Random graph generated with PseudoRandomVertexInputFormat

Such a generated graph can be analyzed with any of the many available algorithms. If the graph should only be stored, use the IdentityComputation together with an appropriate output format (such as the GraphvizOutputFormat).
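As a sketch of that storage-only path, the command below combines the generator input format from the PageRank examples later in this section with IdentityComputation. Note this is an assumption-laden sketch: the package path of IdentityComputation and the exact runner arguments may differ in your Giraph build.

```shell
# Hypothetical sketch: generate a random graph and write it out unchanged as DOT.
# IdentityComputation passes each vertex through without computing anything;
# its package path below is an assumption and may differ in your Giraph build.
hadoop jar $GEX org.apache.giraph.GiraphRunner \
  org.apache.giraph.examples.IdentityComputation \
  -vif org.apache.giraph.io.formats.PseudoRandomVertexInputFormat \
  -ca giraph.pseudoRandomInputFormat.aggregateVertices=60 \
  -ca giraph.pseudoRandomInputFormat.edgesPerVertex=3 \
  -ca giraph.pseudoRandomInputFormat.localEdgesMinRatio=0.2 \
  -vof org.apache.giraph.io.formats.GraphvizOutputFormat \
  -op /user/$USER/goutput/identity_$NOW -w 1
```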

Because of a limitation in Gephi, it is necessary to install Graphviz to plot graphs defined in DOT format. (OmniGraffle is another option; it can import a subset of DOT data.) In CentOS, the Graphviz tools are installed via:

sudo yum install graphviz


The following two examples illustrate how the graphs in Figures 8 and 9 can be created with Giraph. Both commands are available as functions in the giraphl bsg.sh script. For Giraph library paths, the variables GEX and GEC are defined as follows:

GEX=/usr/local/giraph/giraph-examples/target/giraph-examples-1.1.0-SNAPSHOT-for-hadoop-2.2.0-jar-with-dependencies.jar
GEC=/usr/local/giraph/giraph-core/target/giraph-1.1.0-SNAPSHOT-for-hadoop-2.2.0-jar-with-dependencies.jar


Generate and calculate PageRank for a random network, store it in DOT format:

########################################################################
# RUN PAGERANK on generated graph (PseudoRandom generator InputFormat)
# and store for visualization in Graphviz
########################################################################
hadoop jar $GEX org.apache.giraph.GiraphRunner \
  -Dgiraph.zkList=$ZKSERVER:$ZKPORT \
  -Dmapreduce.jobtracker.address=$JTADDRESS \
  -libjars giraph-core.jar \
  org.apache.giraph.examples.SimplePageRankComputation2 \
  -mc org.apache.giraph.examples.SimplePageRankComputation2$SimplePageRankMasterCompute2 \
  -wc org.apache.giraph.examples.SimplePageRankComputation2$SimplePageRankWorkerContext2 \
  -vif org.apache.giraph.io.formats.PseudoRandomVertexInputFormat \
  -ca giraph.pseudoRandomInputFormat.aggregateVertices=60 \
  -ca giraph.pseudoRandomInputFormat.edgesPerVertex=3 \
  -ca giraph.pseudoRandomInputFormat.localEdgesMinRatio=0.2 \
  -vof org.apache.giraph.io.formats.GraphvizOutputFormat \
  -op /user/$USER/goutput/graphviz_pseudo_$NOW -w 4
hadoop fs -getmerge /user/$USER/goutput/graphviz_pseudo_$NOW graphviz_pseudo_$NOW.dot
dot -Tps graphviz_pseudo_$NOW.dot -o graph_pseudo_$NOW.ps
#######################################################


Generate and calculate PageRank for a small-world network, store it in DOT format:

#######################################################
# RUN PAGERANK on generated WattsStrogatzGraph => needs modified sample code
# and store for visualization in Graphviz
#######################################################
hadoop jar $GEX org.apache.giraph.GiraphRunner \
  -Dgiraph.zkList=$ZKSERVER:$ZKPORT \
  -Dmapreduce.jobtracker.address=$JTADDRESS \
  -libjars $GEC \
  org.apache.giraph.examples.SimplePageRankComputation2 \
  -mc org.apache.giraph.examples.SimplePageRankComputation2$SimplePageRankMasterCompute2 \
  -wc org.apache.giraph.examples.SimplePageRankComputation2$SimplePageRankWorkerContext2 \
  -vif org.apache.giraph.io.formats.WattsStrogatzVertexInputFormat \
  -vof org.apache.giraph.io.formats.GraphvizOutputFormat \
  -ca wattsStrogatz.aggregateVertices=160 \
  -ca wattsStrogatz.edgesPerVertex=4 \
  -ca wattsStrogatz.beta=0.2 \
  -ca wattsStrogatz.seed=1 \
  -op /user/$USER/goutput/graphviz_watts_$NOW -w 4
hadoop fs -getmerge /user/$USER/goutput/graphviz_watts_$NOW graphviz_watts_$NOW.dot
dot -Tps graphviz_watts_$NOW.dot -o graph_ws_$NOW.ps
#######################################################



You have now learned how to manage time-dependent multilayer networks in Hadoop using Giraph and other tools. To conclude this series, we will compute the correlation matrix from access-rate time series using Apache Crunch, and then calculate PageRank with both Giraph and GraphLab to compare results for static networks, correlation networks, and dependency networks, created with two competing open source graph-processing tools that integrate well into the Hadoop ecosystem.

See you soon, and have fun with Giraph!

Mirko Kämpf is the lead instructor for the Cloudera Administrator Training for Apache Hadoop for Cloudera University.

Categories: Hadoop

Congratulations to Parquet, Now an Apache Incubator Project

Cloudera Blog - Fri, 05/23/2014 - 14:04

Yesterday, Parquet was accepted into the Apache Incubator. Congratulations to all the contributors to what will eventually become Apache Parquet!

In its relatively short lifetime (co-founded by Twitter and Cloudera in July 2013), Parquet has already become the de facto standard for columnar storage of Apache Hadoop data — with native support in Impala, Apache Hive, Apache Pig, Apache Spark, MapReduce, Apache Tajo, Apache Drill, Apache Crunch, and Cascading (and forthcoming in Presto and Shark). Parquet adoption is also broad-based, with employees of the following companies (partial list) actively contributing:

  • ARRIS Enterprises
  • Cloudera
  • Criteo 
  • Netflix
  • Salesforce.com
  • Seznam
  • Stripe
  • Twitter
  • UC Berkeley AMPLab

…and Parquet is commercially supported by Cloudera, IBM, MapR, and Pivotal!

With this news, I thought it would be a good time to recap our coverage of Parquet to date for those of you in catch-up mode:

Congrats again, Parquet People! 

 Justin Kestelyn is Cloudera’s developer outreach director.


How-to: Configure JDBC Connections in Secure Apache Hadoop Environments

Cloudera Blog - Wed, 05/21/2014 - 15:31

Learn how HiveServer, Apache Sentry, and Impala help make Hadoop play nicely with BI tools when Kerberos is involved.

In 2010, I wrote a simple pair of blog entries outlining the general considerations behind using Apache Hadoop with BI tools. The Cloudera partner ecosystem has positively exploded since then, and the technology has matured as well. Today, if JDBC is involved, all the pieces needed to expose Hadoop data through familiar BI tools are available:

  • HiveServer2 in Apache Hive, which provides multi-user support
  • Cloudera Impala, which provides interactive SQL capabilities on data in HDFS
  • Apache Sentry (incubating), which provides a standard mechanism for finer-grained security and access control over that data

Getting these pieces to work together seamlessly in a Kerberos-secured environment is still tricky. The most common approach is to try to connect to the cluster using Kerberos authentication, which has some challenges that make working with Hadoop different than other JDBC sources:

Client-side challenges

  • JDBC connections require a Kerberos session initiation. This introduces a host of complexities, such as the administrative overheads of setting up two-way cross-realm trust between LDAP and the Kerberos Key Distribution Center, or configuring your LDAP server to authenticate against the KDC.
  • JDBC connections to Apache Hive and Impala require referencing the Kerberos principal of the system user running the HiveServer2 or Impala daemon, in addition to the user credentials, and that system needs to be resolvable by the client's DNS. This creates a usability problem for the end user and is a non-starter if the client is on the other side of a DMZ.

Server-side challenges

  • You need all the cluster users in the KDC, or
  • You need one-way cross-realm trust set up between the cluster KDC and Active Directory or LDAP controller, and
  • You need user accounts available on every node in the cluster.

Furthermore, people often have questions about how to get JDBC working with Cloudera’s platform: which JDBC driver to use, what files and folders are needed on the client classpath (and where to get them), and the best way to test and debug a JDBC connection.

In this how-to, I’ll address these challenges and answer those questions, as well. (More information about tools that use ODBC is coming soon.)

LDAP Authentication from HiveServer2 or Impalad

Although it’s documented, the LDAP support in HiveServer2 and Impala deserves special emphasis: When HiveServer2 and Impala are configured with LDAP support, they allow users to connect to Kerberized clusters, with Sentry enabled and with no special client-side configuration.

Specifically, this setup is ideal because:

  • HiveServer2 handles authentication against LDAP and runs without impersonation enabled, so it requires no additional user management on the cluster aside from a Sentry policy file. So, setting up a test/certification environment is easy.
  • It requires no initialization of a Kerberos session or a kinit on the client side, reducing errors and complexity.
  • It requires no changes of the JDBC string on the client side, reducing errors and complexity.
  • If all you’re doing is reading and writing to Hive or Impala, this setup requires no additional users or shell access on the cluster.
  • Kerberos setup is easy because you don’t have to worry about how to set up cross-realm trust between the cluster’s KDC and the LDAP server.

Next, let’s walk through an end-to-end example of setting up such an environment.

Getting it Working

You will be setting up the following:

  • CDH 5 cluster
  • Linux client system
  • Windows client system with JDK 7, Eclipse installed, and Active Directory
  • The CDH Windows client package (see link to follow)
Setting up the CDH 5 Cluster with JDBC

First, you need a cluster. (Any cluster will do, but for test/cert clusters, I like to use the AWS m2.4xlarge instance because it’s a decent approximation of a Hadoop node.) Before running the Cloudera Manager installer, run this script to prepare the nodes. The script isn’t required, but it saves time and ensures that Cloudera Manager comes up healthy by moving the necessary directories to instance storage, giving them enough space for health checks. From Cloudera Manager, install Hive, Impala, and Hue.

The Hue sample setup loads the sample tables you’ll be using.

Cross-platform JDBC in an Unsecure Environment

Next, spin up two additional machines for client purposes. One system is CentOS running Cloudera Manager agent and has a gateway role assigned to it. This is a baseline client system.

The other is running Windows 2012 Server and is unknown to Cloudera Manager. This will be your Active Directory server as well as your Windows JDBC client machine. The environment now looks like this:

For JDBC client testing, you’ll use two tools: Beeline and an arbitrary JDBC client. Beeline alone isn’t sufficient because it’s installed by Cloudera Manager and configured with the correct classpath and configuration, which doesn’t help debug JDBC connectivity from systems not managed by Cloudera Manager. The one you’re using here is similar to the JDBC client hosted at Apache with the following tweaks:

  • It queries the sample tables initialized by Hue.
  • It doesn’t create or load a table, making iteration easier.
  • It accepts JDBC URL, username, and password information on the command line – which lets you iterate without rebuilding the code.

The test cycle is to run queries over JDBC from Beeline first. This will help rule out client and server misconfigurations as well as operator error with connection strings. When the Beeline instance on a gateway node managed by Cloudera Manager can successfully query over JDBC, you can attempt to run the same query from the arbitrary client on the Windows machine.

Getting JDBC working on Windows requires you to have your classpath set up properly, including select Hive/JDBC client JARs as well as cluster configuration files. Cloudera doesn’t package CDH for Windows, so it’s cumbersome for customers and partners to know which JARs and files to include on the classpath. To that end, I put together this CDH JDBC client package, which contains everything you need from CDH in order to make a JDBC connection. Included in the client package is the winutils.exe file, built using a free version of Microsoft Visual Studio from my Windows 2012 server, following these directions but on the source tarball of CDH. Furthermore, it’s important to generate the client configurations from Cloudera Manager and include them in your classpath. The simplest thing to do is to unzip the client configs into the same directory as the client package.

Setting Up Active Directory

Now is a good time to set up Active Directory on the Windows server. Add two users: user1@cloudera.com and user2@cloudera.com.

Testing JDBC

After the sample tables have been set up in Hue, SSH to the Linux gateway and run Beeline:

For Hive testing:

0: jdbc:hive2://ip-10-187-41-35.ec2.internal:> !connect jdbc:hive2://ip-10-171-6-67.ec2.internal:10000/default


And start running a test:

select * from sample_07 where salary > 40000;


Also test Impala:

1: beeline> !connect jdbc:hive2://ip-10-187-41-35.ec2.internal:21050/;auth=noSasl


And run the same query (it’ll be a lot faster… because Impala):

select * from sample_07 where salary > 40000;


When Hive and Impala both return success, you’ve established a baseline with JDBC and beeline.

JDBC from Windows

From the Windows server, install JDK 7 and Eclipse. Place the Windows client package on the machine somewhere, and add its contents to the Eclipse project build path along with the unpacked client configurations from Cloudera Manager. Set HADOOP_HOME to the location of the client package. Create Eclipse “run configurations” for connecting to Hive and to Impala.


When both run configurations return success, you’ve established a baseline with a generic JDBC client on a non-Cloudera managed Windows machine.

Now you can start securing the cluster.

Enabling Kerberos Security

First, SSH to the Cloudera Manager node, and begin to enable Kerberos security. For test/dev purposes, Eric Sammer’s krb-bootstrap script is a great place to start. This installs a KDC for you, generates the principals from Cloudera Manager, and puts them in the right place. It also gives you a krb5.conf file to distribute to the rest of the cluster.

Go ahead and install krb5-workstation on every node in the cluster, and then distribute the krb5.conf file clusterwide. This sets you up to follow the documentation, starting with Step 6.
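That distribution step can be sketched as a small loop, assuming passwordless SSH from the Cloudera Manager node and a hypothetical one-host-per-line file named cluster-hosts.txt:

```shell
# Hypothetical helper: install the Kerberos client tools and push krb5.conf
# to every node listed in cluster-hosts.txt (an assumed host list, one per line).
for host in $(cat cluster-hosts.txt); do
  ssh "$host" 'sudo yum install -y krb5-workstation'
  scp /etc/krb5.conf "$host":/tmp/krb5.conf
  ssh "$host" 'sudo mv /tmp/krb5.conf /etc/krb5.conf'
done
```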

At this point you should have a cluster with Kerberos security enabled. The JDBC client setup we’ve previously tested is now broken. In order to connect with Hive and Impala:

1. kinit as the principal of the user that you’ll be using
2. Modify the JDBC connection string to refer to Kerberos security
3. Ensure the user you’re authenticating exists on every node in the cluster

(There’s still no table-level security as provided by Sentry.)

Your Linux client with Beeline looks like this after a kinit:

0: jdbc:hive2://ip-10-187-41-35.ec2.internal:> !connect jdbc:hive2://ip-10-171-6-67.ec2.internal:10000/default;principal=hive/ip-10-171-6-67.ec2.internal@CLOUDERA
Connecting to jdbc:hive2://ip-10-171-6-67.ec2.internal:10000/default;principal=hive/ip-10-171-6-67.ec2.internal@CLOUDERA
Enter username for jdbc:hive2://ip-10-171-6-67.ec2.internal:10000/default;principal=hive/ip-10-171-6-67.ec2.internal@CLOUDERA: ec2-user
Enter password for jdbc:hive2://ip-10-171-6-67.ec2.internal:10000/default;principal=hive/ip-10-171-6-67.ec2.internal@CLOUDERA: ***
Connected to: Apache Hive (version 0.12.0-cdh5.0.0)
Driver: Hive JDBC (version 0.12.0-cdh5.0.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
1: jdbc:hive2://ip-10-171-6-67.ec2.internal:1>


With Impala it looks largely the same:

beeline> !connect jdbc:hive2://ip-10-187-41-35.ec2.internal:21050/;principal=impala/ip-10-187-41-35.ec2.internal@CLOUDERA
scan complete in 4ms
Connecting to jdbc:hive2://ip-10-187-41-35.ec2.internal:21050/;principal=impala/ip-10-187-41-35.ec2.internal@CLOUDERA
Enter username for jdbc:hive2://ip-10-187-41-35.ec2.internal:21050/;principal=impala/ip-10-187-41-35.ec2.internal@CLOUDERA: ec2-user
Enter password for jdbc:hive2://ip-10-187-41-35.ec2.internal:21050/;principal=impala/ip-10-187-41-35.ec2.internal@CLOUDERA: ***
Connected to: Impala (version cdh5-1.3.0)
Driver: Hive JDBC (version 0.12.0-cdh5.0.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ


At this point, you can go get client side authentication going with Windows, but the goal is to make Impala and HiveServer2 authenticate against an AD server running on Windows. You don’t want to have to change any client configuration; rather, you want to authenticate user1 and user2 from LDAP, and have HiveServer2 and/or Impala to apply the correct rules in Sentry. This is accomplished by adding the hive.server2.authentication and hive.server2.authentication.ldap.url entries to the HiveServer2 role in Cloudera Manager:
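Expressed as hive-site.xml properties (set via the HiveServer2 configuration safety valve in Cloudera Manager; the LDAP URL below is a placeholder for your own AD server), those two entries look roughly like:

```xml
<property>
  <name>hive.server2.authentication</name>
  <value>LDAP</value>
</property>
<property>
  <name>hive.server2.authentication.ldap.url</name>
  <value>ldap://your-ad-server.cloudera.com</value>
</property>
```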

To enable LDAP authentication for Impala, you do it on the impalad command line:
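A sketch of the relevant impalad startup flags (the LDAP URI is a placeholder; verify the flag names against your Impala version’s documentation):

```shell
# Hypothetical values: point --ldap_uri at your Active Directory server.
--enable_ldap_auth
--ldap_uri=ldap://your-ad-server.cloudera.com
```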

You should now have authentication via LDAP; test it from Beeline by trying to login to Hive or Impala as an Active Directory user:

5: jdbc:hive2://ip-10-232-28-131.ec2.internal> !connect jdbc:hive2://ip-10-232-28-131.ec2.internal:21050
Connecting to jdbc:hive2://ip-10-232-28-131.ec2.internal:21050
Enter username for jdbc:hive2://ip-10-232-28-131.ec2.internal:21050: user1@cloudera.com
Enter password for jdbc:hive2://ip-10-232-28-131.ec2.internal:21050: *********
0: jdbc:hive2://ip-10-232-28-131.ec2.internal> select * from sample_07 limit 10;
+----------+--------------------------------------+------------+---------+
| code     | description                          | total_emp  | salary  |
+----------+--------------------------------------+------------+---------+
| 00-0000  | All Occupations                      | 134354250  | 40690   |
| 11-0000  | Management occupations               | 6003930    | 96150   |
| 11-1011  | Chief executives                     | 299160     | 151370  |
| 11-1021  | General and operations managers      | 1655410    | 103780  |
| 11-1031  | Legislators                          | 61110      | 33880   |
| 11-2011  | Advertising and promotions managers  | 36300      | 91100   |
| 11-2021  | Marketing managers                   | 165240     | 113400  |
| 11-2022  | Sales managers                       | 322170     | 106790  |
| 11-2031  | Public relations managers            | 47210      | 97170   |
| 11-3011  | Administrative services managers     | 239360     | 76370   |
+----------+--------------------------------------+------------+---------+
10 rows selected (1.184 seconds)


You should now be able to add a run configuration for the AD user on the Windows system.

Enabling Sentry

Now, let’s enable Sentry. You’ll create a rule where user1@cloudera.com is an admin user who can do everything, but user2@cloudera.com is a read-only user.

To enable Sentry for HiveServer2 and Impala, check the Sentry authentication box for each service in Cloudera Manager. Also, be sure to disable HiveServer2 Impersonation. When Sentry is enabled, all jobs run as a super user and then Sentry determines access based on the given credentials.

Let’s also tell Sentry to get the user group information from the policy file and not from the Linux authentication system, which in this example has no reference to the Active Directory users:
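In configuration terms, that corresponds to pointing Sentry at the file-based group mapping, roughly as below (the property name follows Sentry’s policy-file provider documentation; verify it for your version):

```xml
<property>
  <name>hive.sentry.provider</name>
  <value>org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider</value>
</property>
```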

For our example, let’s use a policy file that says user1@cloudera.com is an admin, but user2@cloudera.com has read-only access. In this example, the policy file contains duplicate entries with the long and short names because of IMPALA-956.

[users]
user1@cloudera.com=default_admin, sample_reader, admin
user1=default_admin, sample_reader, admin
user2@cloudera.com=sample_reader
user2=sample_reader

[groups]
default_admin = default_admin_role
sample_reader = sample_reader_role
admin = admin_role

[roles]
# can read both sample tables
sample_reader_role = server=server1->db=default->table=sample_07->action=select, server=server1->db=default->table=sample_08->action=select
# implies everything on server1, default db
default_admin_role = server=server1->db=default
# implies everything on server1
admin_role = server=server1


At this point, you should be able to experiment with beeline or your own custom JDBC client. When authenticated as user2, you shouldn’t be able to create tables.
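For example, a quick negative test from Beeline (the table name should_fail is hypothetical): connect as user2 and attempt a write, which Sentry should reject while reads still succeed.

```shell
# Hypothetical check: connect as the read-only user and try a write.
beeline -u "jdbc:hive2://ip-10-232-28-131.ec2.internal:21050" -n user2@cloudera.com -p '<password>'
# In the session:
#   SELECT * FROM sample_07 LIMIT 10;    -- allowed by sample_reader_role
#   CREATE TABLE should_fail (id INT);   -- denied: no write privilege in the policy file
```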

Debugging Sentry

Sentry logs all the facts that lead up to an authorization decision at the debug level. Therefore, if you do not understand why Sentry is denying access, the best way to debug is to temporarily turn on debug logging by adding log4j.logger.org.apache.sentry=DEBUG to the log4j.properties file for your service. Specifically, you want to look for exceptions and messages such as:

FilePermission server..., RequestPermission server...., result [true|false]


which indicate each evaluation Sentry makes. The FilePermission comes from the policy file, while the RequestPermission is the privilege required for the query. A RequestPermission is checked against all appropriate FilePermissions until a match is found. If no matching privilege is found, Sentry returns false ("access denied").


Although the process above is a little involved, the end result is a setup that demonstrates Sentry, Kerberos Security, and JDBC access to Hive and Impala without adding any users to the cluster, and without initiating Kerberos sessions on the client side.

Secure clusters set up this way will accept JDBC connections from clients that are completely unaware of Kerberos. We encourage all partners and customers wanting to test or certify in Kerberos secured environments to take this approach.

Jeff Bean has been at Cloudera since 2010, serving on our Solutions Architect Team, Support Team, Training Team, and more recently, Partner Engineering Team.


How-to: Convert Existing Data into Parquet

Cloudera Blog - Mon, 05/19/2014 - 13:47

Learn how to convert your data to the Parquet columnar format to get big performance gains.

Using a columnar storage format for your data offers significant performance advantages for a large subset of real-world queries. (Click here for a great introduction.)

Last year, Cloudera, in collaboration with Twitter and others, released a new Apache Hadoop-friendly, binary, columnar file format called Parquet. (Parquet was recently proposed for the ASF Incubator.) In this post, you will get an introduction to converting your existing data into Parquet format, both with and without Hadoop.

Implementation Details

The Parquet format is described here. However, it is unlikely that you’ll actually need this repository. Rather, the code you’ll need is the set of Hadoop connectors that you can find here.

The underlying implementation for writing data as Parquet requires a subclass of parquet.hadoop.api.WriteSupport that knows how to take an in-memory object and write Parquet primitives through parquet.io.api.RecordConsumer. Currently, there are several WriteSupport implementations, including ThriftWriteSupport, AvroWriteSupport, and ProtoWriteSupport, with more on the way.

These WriteSupport implementations are then wrapped as ParquetWriter objects or ParquetOutputFormat objects for writing as standalone programs or through the Hadoop MapReduce framework, respectively.

Next, I will demonstrate writing Avro objects to Parquet files.

Non-Hadoop (Standalone) Writer

Here is the basic outline for the program:

// load your Avro schema
Schema avroSchema = new Schema.Parser().parse(in);

// generate the corresponding Parquet schema
MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);

// create a WriteSupport object to serialize your Avro objects
AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);

// choose compression scheme
CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

// set Parquet file block size and page size values
int blockSize = 256 * 1024 * 1024;
int pageSize = 64 * 1024;

Path outputPath = new Path(outputFilename);

// the ParquetWriter object that will consume Avro GenericRecords
ParquetWriter<GenericRecord> parquetWriter = new ParquetWriter<GenericRecord>(
    outputPath, writeSupport, compressionCodecName, blockSize, pageSize);

for (GenericRecord record : SourceOfRecords) {
  parquetWriter.write(record);
}

// flush buffered pages and finalize the Parquet footer
parquetWriter.close();


Alternatively, we could have just generated an AvroParquetWriter like so:

ParquetWriter<GenericRecord> parquetWriter = new AvroParquetWriter<GenericRecord>(
    outputPath, avroSchema, compressionCodecName, blockSize, pageSize);


but the first version will generalize to other WriteSupports.

Hadoop MapReduce Writer

In this case you must use the ParquetOutputFormat classes. The driver for a MapReduce job that converts Avro to Parquet:

public class Avro2Parquet extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    // all paths in HDFS
    // path to Avro schema file (.avsc)
    Path schemaPath = new Path(args[0]);
    Path inputPath = new Path(args[1]);
    Path outputPath = new Path(args[2]);

    Job job = new Job(getConf());
    job.setJarByClass(getClass());
    Configuration conf = job.getConfiguration();

    // read in the Avro schema
    FileSystem fs = FileSystem.get(conf);
    InputStream in = fs.open(schemaPath);
    Schema avroSchema = new Schema.Parser().parse(in);

    // point to input data
    FileInputFormat.addInputPath(job, inputPath);
    job.setInputFormatClass(AvroKeyInputFormat.class);

    // set the output format
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setOutputPath(job, outputPath);
    AvroParquetOutputFormat.setSchema(job, avroSchema);
    AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    AvroParquetOutputFormat.setCompressOutput(job, true);

    // set a large block size to ensure a single row group. see discussion
    AvroParquetOutputFormat.setBlockSize(job, 500 * 1024 * 1024);

    job.setMapperClass(Avro2ParquetMapper.class);
    job.setNumReduceTasks(0);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Avro2Parquet(), args);
    System.exit(exitCode);
  }
}


The mapper class is extremely simple:

public class Avro2ParquetMapper
    extends Mapper<AvroKey<GenericRecord>, NullWritable, Void, GenericRecord> {

  @Override
  protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    context.write(null, key.datum());
  }
}


You can find the code for the MapReduce Avro-to-Parquet converter here.

Notes on Compression

The Parquet specification allows you to specify a separate encoding/compression scheme for each column individually. However, this feature is not yet implemented on the write path. Currently, choosing a compression scheme will apply the same compression to each column (which should still be an improvement over row-major formats, since each column is still stored and compressed separately). As a general rule, we recommend Snappy compression as a good balance between size and CPU cost.

Notes on Block/Page Size

The Parquet specification allows you to specify block (row group) and page sizes. The page size refers to the amount of uncompressed data for a single column that is read before it is compressed as a unit and buffered in memory to be written out as a “page”. In principle, the larger the page size, the better the compression should be, though the 1MB default size already starts to achieve diminishing returns. The block size refers to the amount of compressed data that should be buffered in memory (comprising multiple pages from different columns) before a row group is written out to disk. Larger block sizes require more memory to buffer the data; the 128MB default size also shows good performance in our experience.

Impala prefers that Parquet files contain a single row group (aka a “block”) in order to maximize the amount of data that is stored contiguously on disk. Separately, given a single row group per file, Impala prefers that the entire Parquet file fit into an HDFS block, in order to avoid network I/O. To achieve that goal with MapReduce, each map must write only a single row group. Set the HDFS block size to a number that is greater than the size of the total Parquet output from a single input split — that is, if the HDFS block size is 128MB, and assuming no compression and that rewriting the data doesn’t change the total size significantly, then the Parquet block size should be set slightly smaller than 128MB. The only concern here is that the output for the entire input split must be buffered in memory before writing it to disk.
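As a hedged sketch of that sizing rule (the JAR name and HDFS paths are hypothetical, and this assumes a driver that leaves the Parquet block size configurable rather than hardcoding it as the example driver above does), the properties can be passed at job launch:

```shell
# Hypothetical invocation of the Avro-to-Parquet job from this post.
# HDFS blocks for the output are 256MB; Parquet row groups are slightly
# smaller (240MB), so each output file's single row group fits in one HDFS block.
hadoop jar avro2parquet.jar Avro2Parquet \
  -D dfs.blocksize=268435456 \
  -D parquet.block.size=251658240 \
  /schemas/events.avsc /data/events_avro /data/events_parquet
```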

You should now have a good understanding of how to convert your data to Parquet format. The performance gains are substantial.

Uri Laserson (@laserson) is a data scientist at Cloudera. Questions/comments are welcome. Thanks to the Impala team, Tom White, and Julien Le Dem (Twitter) for help getting up-and-running with Parquet.



Meet the Data Scientist: Alan Paulsen

Cloudera Blog - Fri, 05/16/2014 - 18:32

Meet Alan Paulsen, among the first to earn the CCP: Data Scientist distinction.

Big Data success requires professionals who can prove their mastery with the tools and techniques of the Apache Hadoop stack. However, experts predict a major shortage of advanced analytics skills over the next few years. At Cloudera, we’re drawing on our industry leadership and early corpus of real-world experience to address the Big Data talent gap with the Cloudera Certified Professional (CCP) program.

As part of this blog series, we’ll introduce the proud few who have earned the CCP: Data Scientist distinction. Featured today is CCP-04, Alan Paulsen. You can start on your own journey to data science and CCP:DS with Cloudera’s new Data Science Challenge on Detecting Anomalies in Medicare Claims.

What’s your current role?

I am the Senior Hadoop Specialist at Wargaming.net, creators of the massively popular World of Tanks (78 million players) and World of Warplanes. I’m part of the Global Business Intelligence team located in Austin, Texas. My responsibilities include architecting and developing Hadoop/Big Data solutions and handing them off to our talented groups of data scientists and analysts located in Texas, Belarus, and around the world. 

Prior to taking CCP:DS, what was your experience with Big Data, Hadoop, and data science?

I come from a more traditional data engineer and data warehousing background, having learned both Ralph Kimball’s and Bill Inmon’s methodologies. I’ve always gravitated more towards software development, so Hadoop was a natural path for me to take when I encountered it a few years ago. It’s great to be able to dive into the code and see how things work.

Cloudera’s open-source distribution made it easy to get started with the Hadoop ecosystem, and I began learning while getting my hands dirty with large data sets. The openness and ease-of-use of CDH with Cloudera Manager also made it easy to go down the rabbit hole and, invariably, dive really deep into the myriad applications of Hadoop and its related projects. 

As far as data science, I’ve had to wear many hats in the past, but mathematics and statistics have always been weak points for me. When I saw the Cloudera Certified Professional: Data Scientist program, I knew this was just the thing to get me moving out of the Danger Zone, face my challenges head-on, and take my skills to the next level.

What’s most interesting about data science, and what made you want to become a data scientist?

I love data, engineering, and exploration! What better way to combine all of this than with data science? Hadoop can almost be like a magic hat at times, and being able to use that power to exact real change with my work is very gratifying. This extends beyond my professional work and into my personal life as well. Data is everywhere, and we now have the tools available to make sense of it all. 

Here at Wargaming.net, we are performing cutting-edge analysis on user telemetry data to enhance enjoyment and increase player satisfaction. The volume, variety, and velocity of information we see is very exciting, and presents the company with a unique opportunity to both learn from our customers and convert those insights into better experiences for our users.  

How did you prepare for the Data Science Essentials exam and CCP:DS? What advice would you give to aspiring data scientists?

As I mentioned earlier, my biggest area for improvement was mathematics, so I broke out the statistics and linear algebra textbooks. I also reviewed Cloudera’s handy CCP:DS study guide and spent plenty of time at the library with the recommended reading. Mahout in Action was a great book for bridging the gap between machine learning and Hadoop; it’s co-authored by Sean Owen, who also co-authored the current Data Science Challenge. After the exam, I found and took the Coursera course on machine learning, which was absolutely fantastic. I have also heard good things about Cloudera’s Data Analyst Training and Introduction to Data Science: Building Recommender Systems.

Regarding the lab, my advice is that you make sure to allocate your time appropriately. I started late and wound up working six weekends in a row. Commit to doing the lab and start it right from the beginning.

Since becoming a CCP:DS in November 2013, what has changed in your career and/or in your life?

I started a new position in December 2013 after earning a place in the first class of CCP:DS. However, the biggest change has been the way I approach problems now that I have a better understanding of data science principles. I think there is a big learning curve in getting started, but once I got over the initial hump, it became much easier to advance my understanding and skillset.

Another exciting aspect of my entrance into this community is that I am also in a much better position to expand and scale the work of other data scientists into Hadoop and Big Data.

Why should aspiring data scientists consider taking CCP:DS?

Honestly, it’s fun! Yes, you can learn a lot from reading, tutorials, and case studies. However, getting your hands dirty is truly the best way to learn. Without the Data Science Challenge, it would have been tough getting involved in a data science project, especially coupled with Hadoop. Cloudera provides the data set, environment, problem set (not too rigid), and interaction as a mock customer. 

The industry recognition that comes with being a CCP:DS is just icing on the cake! 
