New in CDH 5.2: Apache Sentry Delegated GRANT and REVOKE

Cloudera Blog - Fri, 10/17/2014 - 15:13

This new feature, jointly developed by Cloudera and Intel engineers, makes management of role-based security much easier in Apache Hive, Impala, and Hue.

Apache Sentry (incubating) provides centralized authorization for services and applications in the Apache Hadoop ecosystem, allowing administrators to set up granular, role-based protection on resources, and to review them in one place. Previously, only designated Sentry administrators could GRANT and REVOKE privileges on an authorizable object. In Apache Sentry 1.5.0 (shipping inside CDH 5.2), we have implemented a new feature (SENTRY-327) that allows admin users to delegate the GRANT privilege to other users using WITH GRANT OPTION. If a user has the GRANT OPTION privilege on a specific resource, the user can now grant the GRANT privilege to other users on the same resource. Apache Hive, Impala, and Hue have all been updated to take advantage of this new Sentry functionality.

In this post, we’ll provide an overview of how this new feature works.

Delegating GRANT/REVOKE Privileges

You can use Hive or Impala to grant privileges using the GRANT ... WITH GRANT OPTION SQL statement:

GRANT
  priv_type [, priv_type ] ...
  ON table_or_view_name
  TO principal_specification [, principal_specification] ...
  [WITH GRANT OPTION];

Note: Impala currently only supports granting/revoking a single privilege at a time (IMPALA-1341).

When WITH GRANT OPTION is specified, the command will give members of the target role privileges to issue their own GRANT statements. Initially, only a pre-defined set of Sentry admin users can issue GRANT statements.

For example, the following commands will create a new role, sales_dept, and provide members of the role the GRANT OPTION privilege on database salesdb:

USE salesdb;
CREATE ROLE sales_dept;
GRANT ROLE sales_dept TO GROUP sales_grp;
GRANT ALL ON DATABASE salesdb TO ROLE sales_dept WITH GRANT OPTION;

This will give users belonging to the sales_dept role the ability to grant equivalent or lesser privileges (privileges on salesdb or tables under salesdb) to other roles. This includes the ability to grant privileges WITH GRANT OPTION.

Thus, a user who belongs to the sales_dept role will now have privileges to execute commands such as:

GRANT ALL ON TABLE marketing TO ROLE marketing_dept;
GRANT SELECT ON DATABASE salesdb TO ROLE marketing_dept;

The GRANT OPTION privilege also allows for granting the GRANT OPTION to other roles. For example, the following will grant the GRANT OPTION privilege to role marketing_dept, which will give members of that role the ability to grant it to other roles:

GRANT SELECT ON DATABASE salesdb TO ROLE marketing_dept WITH GRANT OPTION;

Viewing Granted Privileges

When managing role privileges, you can determine which privileges have been granted to a role, and whether a privilege was granted using WITH GRANT OPTION, with the SHOW GRANT ROLE statement:

SHOW GRANT ROLE role_name;

This statement returns all privileges granted to a role by all users. It can be executed by admin users or by any user who currently belongs to the role.

An example from Impala is shown below. The statement returns similar results in Hive:

+----------+----------+-----------+-----------+--------------+-----------------+
| scope    | database | table     | privilege | grant_option | create_time     |
+----------+----------+-----------+-----------+--------------+-----------------+
| TABLE    | salesdb  | marketing | ALL       | false        | Wed, Oct 01 ... |
| DATABASE | salesdb  |           | SELECT    | true         | Wed, Oct 01 ... |
+----------+----------+-----------+-----------+--------------+-----------------+

Revoking the GRANT privilege

If a user has the GRANT OPTION privilege, they can also revoke privileges from roles. The Impala and Hive syntax for REVOKE is:

REVOKE [GRANT OPTION FOR]
  priv_type [, priv_type ] ...
  ON table_or_view_name
  FROM principal_specification [, principal_specification] ... ;

To revoke only the grant option from a privilege, the GRANT OPTION FOR clause can be added to a REVOKE statement. When this clause is specified, the target privilege will be preserved, but users in the role will no longer be allowed to issue GRANT statements.
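As a sketch using the roles from the earlier examples (Impala syntax), a user holding the grant option on salesdb could strip marketing_dept of its ability to re-grant while leaving the underlying privilege in place:

```sql
-- Removes only the grant option; the SELECT privilege itself is preserved
REVOKE GRANT OPTION FOR SELECT ON DATABASE salesdb FROM ROLE marketing_dept;
```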

Hive does not currently support the GRANT OPTION FOR clause; the REVOKE command without this clause will always revoke all privileges (those granted with and without WITH GRANT OPTION). For example, suppose a role named sales_dept was granted SELECT and INSERT privileges on table marketing:

USE salesdb;
GRANT SELECT, INSERT ON TABLE marketing TO ROLE sales_dept;

The following REVOKE will remove only the INSERT privilege on the table marketing, preserving the SELECT privilege:

REVOKE INSERT ON TABLE marketing FROM ROLE sales_dept;

Furthermore, we support the revocation of child privileges when executing the REVOKE command. To revoke all privileges on the database salesdb, along with all privileges granted on all child tables:

REVOKE ALL ON DATABASE salesdb FROM ROLE sales_dept;

Future Work

The Hive integration with Sentry is based on Hive 0.13, which does not support the GRANT OPTION FOR clause in the Hive revoke command. In Hive 0.14.0, this syntax is supported and the grant option for a privilege can be removed while still keeping the privilege using REVOKE. (For more information, see SENTRY-473.)

Impala syntax will also be enhanced to match the Hive syntax for granting/revoking multiple privileges to/from multiple roles in a single statement (IMPALA-1341).


This feature is co-developed by Intel and Cloudera. Many thanks to everyone who participated in this work (listed in alphabetical order):

  • Arun Suresh
  • Dapeng Sun
  • Haifeng Chen
  • Lenni Kuff
  • Prasad Mujumdar
  • Sravya Tirukkovalur
  • Xiaomeng Huang

Xiaomeng Huang is a Software Engineer at Intel.

Lenni Kuff is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.2: More SQL Functionality and Compatibility for Impala 2.0

Cloudera Blog - Thu, 10/16/2014 - 12:40

Impala 2.0 is the most SQL-complete/SQL-compatible release yet.

As we reported in the most recent roadmap update (“What’s Next for Impala: Focus on Advanced SQL Functionality”), more complete SQL functionality (and better SQL compatibility with other vendor extensions) is a major theme in Impala 2.0.

In this post, we’ll describe the highlights (not a complete list) and provide links to the docs that drill down into these functions.

Analytic (Window) Functions

Analytic functions (aka window functions) are a special category of built-in functions. Analytic functions are frequently used in fields such as finance and science to provide trend, outlier, and bucketed analysis for large data sets. You might also see the term “window functions” in database literature, referring to the interval (the “window”) to which the function call applies, particularly when the query includes a ROWS clause.

Like aggregate functions, analytic functions examine the contents of multiple input rows to compute each output value. However, rather than being limited to one result value per GROUP BY group, they operate on sliding windows where the input rows are ordered and grouped using flexible conditions expressed through an OVER() clause.
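As a minimal sketch (the table and column names are hypothetical), the following ranks each salesperson's revenue within a region using an OVER() clause with partitioning and ordering:

```sql
-- Hypothetical sales table: one row per salesperson
SELECT region,
       name,
       revenue,
       RANK() OVER (PARTITION BY region ORDER BY revenue DESC) AS revenue_rank
FROM sales;
```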

Impala 2.0 now supports the following analytic query clauses and pure analytic functions:

  • OVER Clause
  • Window Clause
  • DENSE_RANK() Function
  • FIRST_VALUE() Function
  • LAG() Function
  • LAST_VALUE() Function
  • LEAD() Function
  • RANK() Function
  • ROW_NUMBER() Function

See the docs for more details about these functions.

New Data Types

New data types in Impala 2.0 provide greater compatibility with source code from traditional database systems:

  • VARCHAR is like the STRING data type, but with a maximum length. See VARCHAR Data Type for details.
  • CHAR is like the STRING data type, but with a precise length. Short values are padded with spaces on the right. See CHAR Data Type for details.
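As a rough illustration (table and column names invented for the example), the two types differ mainly in how length is enforced:

```sql
-- CHAR(2): exactly 2 characters; shorter values are right-padded with spaces
-- VARCHAR(100): up to 100 characters, no padding
CREATE TABLE customers (
  country_code CHAR(2),
  name         VARCHAR(100)
);
```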

Subquery Enhancements

Impala 2.0 also supports a number of subquery enhancements including:

  • Subqueries in the WHERE clause (for example, with the IN operator).
  • EXISTS and NOT EXISTS operators (always used in conjunction with subqueries).
  • IN and NOT IN operators where the right-hand side is a subquery returning a result set, rather than just a hardcoded list of values.
  • Uncorrelated subqueries let you compare against one or more values for equality, IN, and EXISTS comparisons. For example, you might use WHERE clauses such as WHERE column = (SELECT MAX(some_other_column) FROM table) or WHERE column IN (SELECT some_other_column FROM table WHERE conditions).
  • Correlated subqueries let you cross-reference values from the outer query block and the subquery.
  • Scalar subqueries let you substitute the result of single-value aggregate functions such as MAX(), MIN(), COUNT(), or AVG(), where you would normally use a numeric value in a WHERE clause.
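A couple of hedged sketches (the orders and customers tables are hypothetical) combining these enhancements, one uncorrelated scalar subquery and one IN subquery:

```sql
-- Scalar subquery: compare against a single aggregated value
SELECT * FROM orders
WHERE amount = (SELECT MAX(amount) FROM orders);

-- IN subquery: the right-hand side is a result set, not a hardcoded list
SELECT * FROM customers
WHERE id IN (SELECT customer_id FROM orders WHERE amount > 1000);
```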

See the docs for more details.

SQL Operations That Spill to Disk

Certain memory-intensive operations now write temporary data to disk (known as “spilling to disk”) when Impala is close to exceeding its memory limit for a particular node.

For example, when large tables are joined, Impala keeps the distinct values of the join columns from one table in memory, to compare them to incoming values from the other table. When a query uses a GROUP BY clause for columns with millions or billions of distinct values, Impala keeps a similar number of temporary results in memory, to accumulate the aggregate results for each value in the group. When a large result set is sorted by the ORDER BY clause, each node sorts its portion of the result set in memory. The DISTINCT and UNION operators also build in-memory data structures to represent all values found so far, to eliminate duplicates as the query progresses.

The result is a query that completes successfully, rather than failing with an out-of-memory error. The tradeoff is decreased performance due to the extra disk I/O to write the temporary data and read it back in. Thus, while this feature improves reliability and reduces memory usage, you should optimize your queries, system parameters, and hardware configuration to make spilling rare.
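As a sketch of how this interacts with tuning: the per-node memory available to a query is governed by the MEM_LIMIT query option (the value below is illustrative, not a recommendation):

```sql
-- Illustrative: cap queries in this session at roughly 2 GB per node;
-- memory-intensive operators that hit the cap may spill to disk
SET MEM_LIMIT=2g;
```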

See the docs for more details.

More SQL on the Way

The features above are just a few of the most notable highlights in Impala 2.0, which also includes additional SQL functionality such as vendor-specific extensions like DECODE and DATE_PART.

Stay tuned to this blog for information about SQL functionality in future releases.

John Russell is the technical writer for Impala, and the author of Getting Started with Impala (O’Reilly Media).

Categories: Hadoop

Introducing Cloudera Labs: An Open Look into Cloudera Engineering R&D

Cloudera Blog - Wed, 10/15/2014 - 14:19

Cloudera Labs contains ecosystem innovations that one day may bring developers more functionality or productivity in CDH.

Since its inception, one of the defining characteristics of Apache Hadoop has been its ability to evolve/reinvent and thrive at the same time. For example, two years ago, nobody could have predicted that the formative MapReduce engine, one of the cornerstones of “original” Hadoop, would be marginalized or even replaced. Yet today, that appears to be happening via Apache Spark, with Hadoop becoming the stronger for it. Similarly, we’ve seen other relatively new components, like Impala, Apache Parquet (incubating), and Apache Sentry (also incubating), become widely adopted in relatively short order.

This unique characteristic requires Cloudera to be highly sensitive to new activity at the “edges” of the ecosystem — in other words, to be vigilant for the abrupt arrival of new developer requirements, and new components or features that meet them. (In fact, Cloudera employees are often the creators of such solutions.) When there is sufficient market interest and customer success with them seems assured, these new components often join the Cloudera platform as shipping product.

Today, we are announcing a new program that externalizes this thought process: Cloudera Labs (cloudera.com/labs). Cloudera Labs is a virtual container for innovations being incubated within Cloudera Engineering, with the goal of bringing more use cases, productivity, or other types of value to developers by constantly exploring new solutions for their problems. Although Labs initiatives are not supported or intended for production use, you may find them interesting for experimentation or personal projects, and we encourage your feedback about their usefulness to you. (Note that inclusion in Cloudera Labs is not a precondition for productization, either.)

Apache Kafka is among the “charter members” of this program. Since its origin as proprietary LinkedIn infrastructure just a couple years ago for highly scalable and resilient real-time data transport, it’s now one of the hottest projects associated with Hadoop. To stimulate feedback about Kafka’s role in enterprise data hubs, today we are making a Kafka-Cloudera Labs parcel (unsupported) available for installation.

Other initial Labs projects include:

  • Exhibit
    Exhibit is a library of Apache Hive UDFs that usefully let you treat array fields within a Hive row as if they were “mini-tables” and then execute SQL statements against them for deeper analysis.
  • Hive-on-Spark Integration
    A broad community effort is underway to bring Apache Spark-based data processing to Apache Hive, reducing query latency considerably and allowing IT to further standardize on Spark for data processing.
  • Impyla
    Impyla is a Python (2.6 and 2.7) client for Impala, the open source MPP query engine for Hadoop. It communicates with Impala using the same standard protocol as ODBC/JDBC drivers.
  • Oryx
    Oryx, a project jointly spearheaded by Cloudera Engineering and Intel, provides simple, real-time infrastructure for large-scale machine learning/predictive analytics applications.
  • RecordBreaker
    RecordBreaker, a project jointly developed by Hadoop co-founder Mike Cafarella and Cloudera, automatically turns your text-formatted data into structured Avro data, dramatically reducing data prep time.

As time goes on, and some of the projects potentially graduate into CDH components (or otherwise remain as Labs projects), more names will join the list. And of course, we’re always interested in hearing your suggestions for new Labs projects.

Categories: Hadoop

Cloudera Enterprise 5.2 is Released

Cloudera Blog - Tue, 10/14/2014 - 20:01

Cloudera Enterprise 5.2 contains new functionality for security, cloud deployments, and real-time architectures, and support for the latest open source component releases and partner technologies.

We’re pleased to announce the release of Cloudera Enterprise 5.2 (comprising CDH 5.2, Cloudera Manager 5.2, Cloudera Director 1.0, and Cloudera Navigator 2.1).

This release reflects our continuing investments in Cloudera Enterprise’s main focus areas, including security, integration with the partner ecosystem, and support for the latest innovations in the open source platform (including Impala 2.0, its most significant release yet, and Apache Hive 0.13.1). It also includes a new product, Cloudera Director, that streamlines deployment and management of enterprise-grade Hadoop clusters in cloud environments; new component releases for building real-time applications; and new support for significant partner technologies like EMC Isilon. Furthermore, this release ships the first results of joint engineering with Intel, including WITH GRANT OPTION for Hive and Impala and performance optimizations for MapReduce.

Here are some of the highlights (incomplete; see the respective Release Notes for CDH, Cloudera Manager, and Cloudera Navigator for full lists of features and fixes):

Security
  • Via Apache Sentry (incubating) 1.4, GRANT and REVOKE statements in Impala and Hive can now include WITH GRANT OPTION, for delegation of granting and revoking privileges (joint work with Intel under Project Rhino).
  • Hue has a new Sentry UI that supports policy management for visually creating/editing roles in Sentry and permissions on files in HDFS.
  • Kerberos authentication is now supported in Apache Accumulo.
  • In Impala, authentication can now be done through a combination of Kerberos and LDAP.

Data Management and Governance
  • Cloudera Navigator 2.1 features a brand-new auditing UI that is unified with lineage and discovery, so you now have access to all Navigator functionality from a single interface.
  • Navigator 2.1 includes role-based access control so you can restrict access to auditing, metadata and policy management capabilities.
  • We’re also shipping a beta policy engine in Navigator 2.1. Targeted to GA by year-end, the policy engine allows you to set up rules and notifications so you can classify data as it arrives and integrate with data preparation and profiling tools. Try it out and let us know what you think!
  • And we’ve added lots of top-requested enhancements, such as Sentry auditing for Impala and integration with Hue.

Cloud Deployment
  • Cloudera Director is a simple and reliable way to deploy, scale, and manage Hadoop in the cloud (initially for AWS) in an enterprise-grade fashion. It’s free to download and use, and supported by default for Cloudera Enterprise customers. See the User Guide for more details.

Real-Time Architecture
  • Re-base on Apache HBase 0.98.6
  • Re-base on Apache Spark/Streaming 1.1
  • Re-base on Impala 2.0
  • Apache Sqoop now supports import into Apache Parquet (incubating) file format
  • Apache Kafka integration with CDH is now incubating in Cloudera Labs; a Kafka-Cloudera Labs parcel (unsupported) is available for installation. Integration with Flume via special Source and Sink implementations has also been provided.

Impala 2.0
  • Disk-based query processing: enables large queries to “spill to disk” if their in-memory structures are larger than the currently available memory. (Note that this feature only uses disk for the portion that doesn’t fit in the available memory.)
  • Greater SQL compatibility: SQL 2003 analytic window functions, support for legacy data types (such as CHAR and VARCHAR), better compliance with SQL standards (WHERE, EXISTS, IN), and additional vendor-specific SQL extensions.

New Open Source Releases and Certifications

Cloudera Enterprise 5.2 includes multiple new component releases:

  • Apache Avro 1.7.6
  • Apache Crunch 0.11
  • Apache Hadoop 2.5
  • Apache HBase 0.98.6
  • Apache Hive 0.13.1
  • Apache Parquet (incubating) 1.5 / Parquet-format 2.1.0
  • Apache Sentry (incubating) 1.4
  • Apache Spark 1.1
  • Apache Sqoop 1.4.5
  • Impala 2.0
  • Kite SDK 0.15.0

…with new certifications on:

  • Filesystems: EMC Isilon
  • OSs: Ubuntu 14.04 (Trusty)
  • Java: Oracle JDK1.7.0_67

Over the next few weeks, we’ll publish blog posts that cover some of these and other new features in detail. In the meantime:

As always, we value your feedback; please provide any comments and suggestions through our community forums. You can also file bugs via issues.cloudera.org.

Categories: Hadoop

How SQOOP-1272 Can Help You Move Big Data from Mainframe to Apache Hadoop

Cloudera Blog - Fri, 10/10/2014 - 21:52

Thanks to M. Asokan, Chief Architect at Syncsort, for the guest post below.

Apache Sqoop provides a framework to move data between HDFS and relational databases in a parallel fashion using Hadoop’s MR framework. As Hadoop becomes more popular in enterprises, there is a growing need to move data from non-relational sources like mainframe datasets to Hadoop. Following are possible reasons for this:

  • HDFS is used simply as an archival medium for historical data living on the mainframe. It is cost effective to store data in HDFS.
  • Organizations want to move some processing workloads to Hadoop to free up CPU cycles on the mainframe.

Regardless, there was no solution available in Sqoop to allow users to easily move data from mainframe to Hadoop. We at Syncsort wanted to fill that void and started discussions with some Sqoop committers at Cloudera. They were very excited and receptive to the idea of Syncsort making an open source contribution to Sqoop. We decided that this contribution would go into Sqoop version 1.x since it is still widely used. We raised a Sqoop JIRA ticket SQOOP-1272 to work towards that.

Brief Introduction to Mainframe Datasets

Unlike UNIX where a file consists of a sequence of bytes, files on mainframe operating systems contain structured data. The term “datasets” is used to refer to “files.” (Hereafter, we will use “datasets” when we refer to mainframe files.) Datasets have associated access methods and record formats at the OS level. The OS provides APIs to access records from datasets. Most mainframes run an FTP server to provide remote access to datasets. The FTP server accesses records using these APIs and passes them to clients. A set of sequential (an access method) datasets can be stored under something similar to a directory called Partitioned Data Set (PDS) on the mainframe. FTP is not the only way to access datasets on the mainframe remotely. There are proprietary software products like IBM’s Connect:Direct for remote access to datasets on a mainframe.

Functional Specification

Once we decided to make a contribution, I started working on the functional specification with the Syncsort engineering team. In terms of functionality, we wanted to start with small, but fundamental steps to accomplishing the objective of mainframe connectivity via Sqoop. Our goals were very modest:

  1. Connect to the mainframe using open source software.
  2. Support datasets that can be accessed using sequential access methods.

The first goal was a no-brainer. Sqoop being an Apache project, we decided to use the Apache commons-net library for the FTP client. The second goal follows from the first, since the mainframe FTP server supports transferring sequential datasets only. After further discussions with the Sqoop team at Cloudera, we decided to support transferring a set of sequential datasets in a partitioned dataset. The transfer will happen in parallel. Each dataset will be stored as a separate HDFS file, whether the target is HDFS or Hive/HCatalog. The sequential datasets are transferred to Hadoop in FTP text mode, and each record in the datasets will be treated as one text field in the target.

In essence, a new import tool called import-mainframe would be added to Sqoop. In the --connect option, the user can specify a mainframe host name; in the --dataset option, the user can specify the PDS name. For more details, readers can refer to the design document online.
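Putting the options together, a hypothetical invocation might look like the following (the host, user, and dataset names are invented; --username, -P, and --target-dir are standard Sqoop options, shown here as an assumption about how they combine with the new tool):

```shell
# Hypothetical: import all sequential datasets in PDS MYUSER.SALES
# from a mainframe FTP server into HDFS, one file per dataset
sqoop import-mainframe \
  --connect mainframe.example.com \
  --dataset MYUSER.SALES \
  --username myuser -P \
  --target-dir /data/sales
```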


Now that the functionality was defined, I started working with the Syncsort engineering team on the design. I posted an overall design document in the JIRA for feedback from Sqoop committers. We went through a couple of iterations and agreed on it.

At the top level, the design involves implementing a new Sqoop tool class (MainframeImportTool) and a new connection manager class (MainframeManager). If you dig a little deeper, there are supporting classes such as a mainframe-specific Mapper implementation (MainframeDatasetImportMapper), an InputFormat implementation (MainframeDatasetInputFormat), an InputSplit implementation (MainframeDatasetInputSplit), a RecordReader implementation (MainframeDatasetRecordReader), and so on.


Next came the most difficult part: the actual implementation. Members of Syncsort engineering played a vital role in the detailed design and implementation. Since it is impossible to connect to a real mainframe in the Apache testing environment, we decided to contribute unit tests based on Java mock objects. It was amazing to discover that Sqoop had never used mock objects for testing before! We ran end-to-end tests with a real mainframe in-house to verify the correctness of the implementation.

Patch Submission and Review Process

Once we were satisfied with the implementation and testing at Syncsort, I posted a patch to SQOOP-1272 for review. I was advised to post the patch to the Sqoop Review Board. I have worked on a handful of Apache Hadoop JIRAs before; most of the comments and discussions on a patch happen in the JIRA itself, so going through the Review Board was a new experience for me. After some initial getting-used-to, I liked it very much since it kept most of the noise off of the JIRA, and the review comments were easy to address. There was a suggestion to add documentation for this new feature, so I started digging through the Sqoop documentation files. I had to learn the documentation format, and with some trial and error I was able to understand it, though it took a while to update the documentation. Despite busy schedules at work and a summer vacation interrupting my work on the JIRA, the review process moved steadily. After a few cycles of review, the code and documentation got better and better. Finally, on Sept. 10, 2014, the patch to the JIRA was committed by Venkat. SQOOP-1272 will go into official Sqoop release 1.4.6.


The design and implementation of SQOOP-1272 allow anyone to extend the functionalities. Users can extend MainframeManager class to provide more complex functionalities while making use of the current implementation.

Syncsort’s DMX-h

Syncsort’s flagship Hadoop ETL product, DMX-h, provides enterprise-grade mainframe connectivity with an easy-to-use graphical user interface. It has been enhanced so that it can be invoked from Sqoop. In particular, it supports the following features:

  • Ability to specify complex COBOL copybooks to define mainframe record layouts.
  • Ability to transfer a mainframe dataset “as is” in binary form to archive a golden copy in Hadoop.
  • Ability to transfer VSAM (Virtual Storage Access Method) datasets.

Please refer to this page for more details.


I think this contribution was a group effort and many individuals contributed to make it possible. In particular, I would like to thank Jarcec Cecho, Venkat Ranganathan, and Gwen Shapira for all their review comments and suggestions. I want to thank Syncsort engineer Jixiang Li who did the heavy lifting of the implementation and Yunkai Huang for creating the unit tests.

Categories: Hadoop

Using Impala, Amazon EMR, and Tableau to Analyze and Visualize Data

Cloudera Blog - Wed, 10/08/2014 - 15:41

Our thanks to AWS Solutions Architect Rahul Bhartia for allowing us to republish his post below.

Apache Hadoop provides a great ecosystem of tools for extracting value from data in various formats and sizes. Originally focused on large-batch processing with tools like MapReduce, Apache Pig, and Apache Hive, Hadoop now provides many tools for running interactive queries on your data, such as Impala, Drill, and Presto. This post shows you how to use Amazon Elastic MapReduce (Amazon EMR) to analyze a data set available on Amazon Simple Storage Service (Amazon S3) and then use Tableau with Impala to visualize the data.

Amazon Elastic MapReduce

Amazon EMR is a web service that makes it easy to quickly and cost-effectively process vast amounts of data. Amazon EMR uses Apache Hadoop, an open source framework, to distribute and process your data across a resizable cluster of Amazon Elastic Compute Cloud (Amazon EC2) instances.


Impala is an open source tool in the Hadoop ecosystem and is available on EMR for interactive, ad hoc querying using SQL syntax. Instead of using a MapReduce engine like Hive, Impala leverages a massively parallel processing (MPP) engine similar to what’s used in traditional relational database management systems (RDBMS), which allows it to achieve faster query response times.

While both Impala and Hive provide SQL-like capabilities and can share the same Metastore (a repository for tables and partitions metadata), they each play a distinct role in the Hadoop ecosystem. Compared to Impala, Hive typically has much higher query response times. This makes it inefficient to use with interactive data analysis tools like Tableau. However, Impala uses significant memory resources, and the cluster’s available memory places a constraint on how much data any query can consume. Hive is not limited in the same way and can process larger data sets with the same hardware, making it better for ETL workloads on large datasets.


Tableau Software is a business intelligence solution that integrates data analysis and reports into a continuous visual analysis process that is easy to learn and use. Tableau’s software delivers fast analytics, visualization and business intelligence and connects directly to AWS services and many other sources. The recent version of Tableau Desktop enables connection to Hive or Impala running on Amazon EMR via the ODBC driver for Amazon EMR. You can contact Tableau to find out how to activate Amazon EMR as a data source.

In this blog post, we’ll demonstrate Amazon EMR enabled as a data source in Tableau and connect to Impala to create an interactive visualization.

Using Amazon EMR to Analyze Google Books n-grams

The Google Books n-gram data set is freely available via the AWS Public Data Sets on Amazon S3. N-grams are fixed-size tuples of items. In this case, the items are words extracted from the Google Books corpus. The “n” specifies the number of elements in the tuple, so a 5-gram contains five words or characters.

Apache Hadoop traditionally works with HDFS, but it also supports the use of Amazon S3 as a file system. Impala currently requires data to be on HDFS while Hive supports direct querying of data on Amazon S3.

The Google Books n-gram data set is already in a Hadoop-friendly file format with sizes up to 2.2 TB. The data set files are stored in the SequenceFile format with block-level LZO compression. The sequence file key is the row number of the dataset stored as a LongWritable and the value is the raw data stored as TextWritable.

The SequenceFile format with block-level LZO compression requires further transformation because Impala cannot create them or insert data into them; it can only query LZO-compressed Text tables. Hive, which supports SequenceFile format with block-level LZO compression and querying for external data in Amazon S3, is an easy choice as the tool for transforming our data into an Impala-supported format onto HDFS.

Starting an Amazon EMR Cluster

First, we’ll launch an Amazon EMR cluster with Hive and Impala installed.

  1. Launch the Amazon EMR cluster using the AWS CLI. If you’ve never used CLI before, AWS provides instructions for installing and configuring it.

    The statement below launches the EMR cluster using the AWS CLI and returns the unique identifier for your cluster.

    aws emr create-cluster --name ImpalaCluster --ami-version 3.1.0 \
      --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium \
        InstanceGroupType=CORE,InstanceCount=2,InstanceType=m1.medium \
      --ec2-attributes KeyName=keyPairName,AvailabilityZone=availabilityZone \
      --applications Name=Hive,Name=Impala --no-auto-terminate

    Note: Replace the strings keyPairName and availabilityZone with appropriate values before running the statement above. You’ll also need to replace the string j-XXXXXXXXXXXX in some of the next steps below with the unique identifier the statement above returns.

  2. The cluster should be ready in 5-10 minutes (its state will become “Waiting.”) To check the status of the cluster as it is initializing, run the following command:

    aws emr describe-cluster --cluster-id j-XXXXXXXXXXXX --query 'Cluster.Status.State' --output text

  3. Once your cluster enters the “WAITING” state, you can connect to the master node by using the command below.

    aws emr ssh --cluster-id j-XXXXXXXXXXXX --key-pair-file keyFilePath

    Note: Replace the string keyFilePath with the path to the private key file.

Creating the External Table from data in Amazon S3

Outside data sources are referenced in Amazon EMR by creating an EXTERNAL TABLE. This simply creates a reference to the data; no data is moved yet.

  1. Once logged into the master node, start the Hive shell:

    $ hive

  2. Define the source using a CREATE EXTERNAL TABLE statement. For this example, we will use only the English 1-grams dataset.

    hive> CREATE EXTERNAL TABLE eng_1M_1gram(token STRING, year INT, frequency INT, pages INT, books INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS SEQUENCEFILE LOCATION 's3://datasets.elasticmapreduce/ngrams/books/20090715/eng-1M/1gram';

Creating a Replica Table in HDFS

We’ll create a replica table to store the results on HDFS, as required for Impala. For the replica table we’ll use Parquet instead of the SequenceFile format. Parquet is a column-oriented binary file format designed to be highly efficient for large-scale queries.

  1. Create the replica table in Hive:

    hive> CREATE TABLE eng_1M_1gram_paraquet(token STRING, year INT, frequency INT, pages INT, books INT) ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' STORED AS inputformat 'parquet.hive.DeprecatedParquetInputFormat' outputformat 'parquet.hive.DeprecatedParquetOutputFormat';

  2. Adjust the mapred.min.split.size setting, because the data is stored in Amazon S3 as a single file.

    hive> set mapred.min.split.size=134217728;

    This setting tells Hive to split the file into pieces of at least 128 MB for processing. This prevents you from using only one mapper when processing the data, as this wouldn’t take advantage of the distributed nature of MapReduce.

  3. Insert data into this table using a select query. We’ll read from the raw data table and insert into this new table.

    hive> INSERT OVERWRITE TABLE eng_1M_1gram_paraquet SELECT lower(token), year, frequency, pages, books FROM eng_1M_1gram WHERE year >= 1890 AND token REGEXP "^[A-Za-z+'-]+";

    This query also illustrates a typical use of Hive: transforming your data so that it’s easier to query downstream with tools like Tableau. In the query, we first filter out data from before 1890, which has a lower number of publications, and use a regular expression to accept only n-grams composed of letters and commonly used punctuation. We also lowercase the n-grams using the built-in lower() function to store the data in a unified format and simplify querying in later stages.

  4. Once the step above completes, quit Hive.
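
As a side note, the mapred.min.split.size value used in step 2 is simply 128 MB expressed in bytes:

```shell
# 128 MB in bytes -- the value passed to mapred.min.split.size in step 2
echo $((128 * 1024 * 1024))   # prints 134217728
```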
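
One detail about the REGEXP filter in step 3: Hive’s REGEXP performs a substring match, so the pattern accepts any token that begins with letters or the listed punctuation. You can preview its behavior with grep -E, which handles this pattern the same way (the sample tokens below are made up for illustration):

```shell
# Tokens starting with a letter, +, ', or - pass the filter;
# tokens starting with digits or other symbols are dropped.
printf '%s\n' "computer" "don't" "3.14" "_foo" | grep -E "^[A-Za-z+'-]+"
# prints:
# computer
# don't
```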

Making the Replica Table Available in Impala

We’ll use the same Metastore here for both Impala and Hive. This requires the metadata to be updated in Impala before the table is available for Impala queries. The INVALIDATE METADATA statement marks the metadata as stale and makes Impala reload the associated metadata before a query proceeds.

  1. Log into Impala.

    $ impala-shell

  2. Invalidate the metadata so that Impala picks up the new table.

    impala> invalidate metadata;

  3. Exit the Impala shell and close the SSH connection to the Amazon EMR cluster.

Using Tableau to Visualize the Data from Impala

For the next steps, you’ll need Tableau Desktop installed on a Windows or MacOSX machine. If you don’t have a Tableau installation, you can launch an Amazon EC2 instance and install Tableau there for the purposes of this post. You’ll also need the keys to enable Amazon EMR as a data connection in Tableau; these can be obtained by contacting Tableau.

  1. Install the ODBC driver, required for connecting Tableau Desktop to Impala on Amazon EMR, on the machine running Tableau Desktop.

    1. Download the drivers. 
    2. Unzip the downloaded file. This should create a folder named "ImpalaODBC."
    3. Navigate to the required package for installing the driver.

      Windows: ImpalaODBC\Windows\SimbaImpalaODBC64.msi

      MacOSX: ImpalaODBC/SimbaImpalaODBC.dmg

    4. Run the package above and follow the prompts to install the ODBC driver.
  2. Modify the Amazon EMR cluster’s Master Security Group so Tableau can connect with the Impala server running on the master node of the Amazon EMR cluster.

    1. Click the Amazon EC2 tab in the AWS Management Console to open the Amazon EC2 console.
    2. In the navigation pane, select Security Groups under Network and Security.
    3. In the Security Groups list, select ElasticMapReduce-master.
    4. In the lower pane, click the Inbound tab.
    5. In the Port Range field type 21050. Leave the default value in the Source field.
    6. Click Add Rule, and then click Apply Rule Changes.
  3. Follow the steps as directed by Tableau to enable Amazon EMR as a data connection option in Tableau. Clicking Amazon EMR should then show a connection page similar to the image below.

  4. Fill in the public DNS name of the master node in the Server field above and click Connect. You can find the DNS name using the command below:

    aws emr describe-cluster --cluster-id j-XXXXXXXXXXXX --query 'Cluster.MasterPublicDnsName' --output text

  5. On the next screen, select default from the schema drop-down, drag the table named "eng_1m_1gram_paraquet" onto the upper left panel as in the image below, and click Go to Worksheet.

  6. This opens a Tableau workbook with the Dimensions and Measures automatically populated. Now we can use Tableau with Impala running on Amazon EMR.
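
Incidentally, the security-group change in step 2 above can also be made from the AWS CLI instead of the console. A sketch, assuming the default EMR master group is named ElasticMapReduce-master; in practice, restrict --cidr to your own address range rather than opening the port to 0.0.0.0/0:

```shell
# Open the Impala frontend port (21050) in the EMR master security group.
open_impala_port() {
  aws ec2 authorize-security-group-ingress \
    --group-name ElasticMapReduce-master \
    --protocol tcp --port 21050 --cidr 0.0.0.0/0
}
```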

Video Demonstrations

Creating Interactive Visualizations

This video demonstrates the steps for creating some interactive visualizations using Tableau. First, we create a trend-line of books published per year.

Creating a Filter

This video demonstrates the steps for creating a filter that lets users choose a specific 1-gram for the trend-line. The sudden increase around 1905 for the 1-gram ‘computer’ is quite interesting. If you have a hypothesis for the cause of this, please leave a comment below.

Terminating the Amazon EMR Cluster

Once you’ve completed the steps above, terminate the Amazon EMR cluster from the AWS console or via the CLI using the command below:

aws emr terminate-cluster --cluster-id j-XXXXXXXXXXXX

If you launched an Amazon EC2 instance for the purpose of this demo, don’t forget to terminate the instance.
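
Both cleanup steps can be combined into a small script that terminates the cluster and confirms the state change (cluster_state is our own helper wrapping the describe-cluster query, and the sketch assumes a configured AWS CLI):

```shell
# Terminate the cluster, then poll until it reports TERMINATED.
cluster_state() {
  aws emr describe-cluster --cluster-id "$1" --query 'Cluster.Status.State' --output text
}

terminate_and_confirm() {
  aws emr terminate-cluster --cluster-id "$1"
  until [ "$(cluster_state "$1")" = "TERMINATED" ]; do
    sleep 30
  done
  echo "Cluster $1 terminated"
}
```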


In this post, we looked at a way to quickly visualize your data in Amazon S3 using Amazon EMR, Impala and Tableau. We also saw how Impala provides the response time needed while using Tableau’s data exploration feature so that you can quickly explore data and shift views on the fly to follow your train of thought. You can use the same pattern to quickly analyze any data on Amazon S3, including AWS CloudTrail logs or Amazon S3 access logs.

Categories: Hadoop

The Definitive "Getting Started" Tutorial for Apache Hadoop + Your Own Demo Cluster

Cloudera Blog - Mon, 10/06/2014 - 21:04

Using this new tutorial alongside Cloudera Live is now the fastest, easiest, and most hands-on way to get started with Hadoop.

At Cloudera, developer enablement is one of our most important objectives. One only has to look at examples from history (Java or SQL, for example) to know that knowledge fuels the ecosystem. That objective is what drives initiatives such as our community forums, the Cloudera QuickStart VM, and this blog itself.

Today, we are providing what we believe is a model for Hadoop developer enablement going forward: a definitive end-to-end tutorial and free, cloud-based demo cluster and sample data for hands-on exercises, via the Cloudera Live program.

When Cloudera Live was launched in April 2014, it initially contained a read-only environment where users could experiment with CDH, our open source platform containing the Hadoop stack, for a few hours. Today, we are launching a new interactive version (hosted by GoGrid) in which you can use pre-loaded datasets or your own data, and which is available to you for free for two weeks. Furthermore, the environment is available in two other flavors—with Tableau or Zoomdata included—so you can test-drive CDH and Cloudera Manager alongside familiar BI tools, too.

Now, back to that tutorial:

To There and Back

Most Hadoop tutorials take a piecemeal approach: they either focus on one or two components, or at best a segment of the end-to-end process (just data ingestion, just batch processing, or just analytics). Furthermore, few if any provide a business context that makes the exercise pragmatic.

This new tutorial closes both gaps. It takes the reader through the complete Hadoop data lifecycle—from data ingestion through interactive data discovery—and does so while emphasizing the business questions concerned: What products do customers view on the Web, what do they like to buy, and is there a relationship between the two?

Getting those answers is a task that organizations with traditional infrastructure have been doing for years. However, the ones that bought into Hadoop do the same thing at greater scale, at lower cost, and on the same storage substrate (with no ETL, that is) upon which many other types of analysis can be done.

To learn how to do that, in this tutorial (and assuming you are using our sample dataset) you will:

  • Load relational and clickstream data into HDFS (via Apache Sqoop and Apache Flume respectively)
  • Use Apache Avro to serialize/prepare that data for analysis
  • Create Apache Hive tables
  • Query those tables using Hive or Impala (via the Hue GUI)
  • Index the clickstream data using Flume, Cloudera Search, and Morphlines, and expose a search GUI for business users/analysts

Go Live

We think that even on its own, this tutorial will be a huge help to developers of all skill levels—and with Cloudera Live in the mix as a demo backend for doing the hands-on exercises, it’s almost irresistible.

If you have any comments or encounter a roadblock, let us know about it in this discussion forum.

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

This Month in the Ecosystem (September 2014)

Cloudera Blog - Thu, 10/02/2014 - 20:17

Welcome to our 13th edition of “This Month in the Ecosystem,” a digest of highlights from September 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

  • Cloudera confirmed the acquisition of DataPad’s technology assets and team. Wes McKinney, the creator of the Python-based Pandas data analysis framework, and Chang She, a major contributor to Pandas, will join Cloudera Engineering.
  • The results of new benchmarking testing of Impala 1.4 versus Presto, Spark SQL, and Hive-on-Tez were published, with Impala in the lead by 13x to 27x (and having expanded that lead since the last round of testing).
  • A community meetups schedule for Strata+Hadoop World was published — more than a dozen are planned during the run of NYC DataWeek.
  • Early releases of two new O’Reilly Media books, Getting Started with Impala and Hadoop Security, became available. (Click here to see a list of ecosystem books.)
  • Continuuity has open-sourced its Reactor platform and is now operating under a new moniker: Cask.
  • Apache Storm graduated from the ASF Incubator and is now a Top Level Project.
  • Apache Hadoop source code management has moved from SVN to git. In this article, Cloudera Software Engineer/Hadoop PMC member Karthik Kambatla (and other PMC members) explain the reasoning.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Here’s Your Getting Started with Impala Book

Cloudera Blog - Mon, 09/29/2014 - 14:08

Getting Started with Impala (now in early release)—another book in the Hadoop ecosystem books canon—is indispensable for people who want to get familiar with Impala, the open source MPP query engine for Apache Hadoop. We spoke with its author, Impala docs writer John Russell, about the book’s origin and mission.

Why did you decide to write this book?

I wanted to do some long-form tutorials, discuss anti-patterns, and other kinds of things that you don’t often see in official documentation. The focus on SQL coding let me go deep on certain features rather than covering everything. I could demonstrate several ways of tackling the same problem and discuss the pros and cons of each approach. In the official docs, I try to optimize for Google searchers who want to jump into any page and quickly find the right answer.

Who is the intended reader?

Anyone who knows their way around a database, and is interested to learn how those SQL and data modelling skills translate to the world of Big Data and data science. I really focus on the SQL side of things from a developer perspective, which could be that of a data analyst, data scientist, someone writing a business intelligence application, or a student hoping to go into one of those fields.

What will readers learn?

How not to be intimidated when confronted with large volumes of data. I go through different ways to get data into Impala, organize it, and optimize it for queries. I figure once you’ve joined a billion-row table with a million-row table, that’s a good confidence builder. I want you to understand the reasons why query X performs better than query Y, so that when you encounter your own unique situation, you’ll be able to pull the right arrow from your quiver.

I’ve tried to distill all the gotchas and misunderstandings I’ve encountered when transitioning from one database platform to another, to help make readers comfortable in a heterogeneous environment. In my experience, data scientists can easily spend 75% of their time on SQL queries. At a data-oriented company, they might be involved with half a dozen different SQL-oriented systems.

Although I don’t cover administration-related aspects in detail, I provide tips to help developers design their schemas and code their SQL in DBA-friendly ways.

What are some particularly interesting things about Impala that most people don’t know?

That Impala doesn’t have all that many knobs to turn while doing performance tuning. Yet each one can have a dramatic impact on performance and scalability. The key to happiness is often one crucial SQL statement like COMPUTE STATS, or even just a CREATE TABLE clause for partitioning or file format. Also, I think it’s fascinating that many aspects have a kind of “donut hole” where the choices on the extreme ends matter, but you can ignore the ones in the middle.

For example, I focus mostly on the file formats that provide maximum convenience or maximum query speed, and skip over the ones that fall somewhere in between. To illustrate performance and distributed queries, I’ll use tables that are tiny or huge. Tables somewhere in the middle (with sizes like 1MB, 10MB, or 100MB) are basically all the same from a Big Data perspective.

Do you foresee this book having future editions, and if so, what do you think they would add?

I’m expecting plenty of interesting topics for future editions. The new features in Impala 2.0 and beyond open up new use cases for other kinds of queries, ETL techniques, and porting tips. As people explore more and more Impala features, they’ll find opportunities to trade-off between more convenience and better performance. All good subjects for more tutorials and deep dives!

Meet John in person/get a signed copy in the Cloudera booth at Strata + Hadoop World, at 4pm ET on Fri., Oct. 15.

Categories: Hadoop

Secrets of Cloudera Support: Using OpenStack to Shorten Time-to-Resolution

Cloudera Blog - Wed, 09/24/2014 - 15:39

Automating the creation of short-lived clusters for testing purposes frees our support engineers to spend more time on customer issues.

The first step for any support engineer is often to replicate the customer’s environment in order to identify the problem or issue. Given the complexity of Cloudera customer environments, reproducing a specific issue is often quite difficult, as a customer’s problem might only surface in an environment with specific versions of Cloudera Enterprise (CDH + Cloudera Manager), configuration settings, certain number of nodes, or the structure of the dataset itself. Even with Cloudera Manager’s awesome setup wizards, setting up Apache Hadoop can be quite time consuming, as the software was never designed with ephemeral clusters in mind.

For these reasons, until recently, a significant amount of our support engineers’ time was spent creating virtual machines, installing specific Cloudera Manager and/or CDH versions, and setting up the services installed on the customer’s cluster. To make matters worse, engineers had to use their own laptops to run four-node Hadoop clusters, which was not only taxing on the machine’s resources but often forced an engineer to reproduce one issue at a time—which is incredibly inefficient as support engineers are very rarely working just a single ticket. This approach required a careful balance of time management and swift case resolutions to keep things moving smoothly.

In recognition of this problem, Cloudera Support determined that the first step toward streamlining how engineers reproduce issues (and thus reducing resolution time for our customers overall) was to give them a fast, scalable way to deploy ephemeral instances, so that they could install the necessary software on any number of nodes without having to run them locally on their machines.

In the remainder of this post, we’ll explain how OpenStack was the ideal choice for a self-service tool that meets these goals.

On OpenStack

Using OpenStack, a user clicks a button and seconds later gets a brand-new virtual machine on which to install software and run tests, and which can be torn down when finished. Countless companies use the OpenStack core to build internal cloud infrastructure, for various reasons:

  • Scale-out architecture—more nodes equals more resources
  • Incredibly active community—contributions from hundreds of companies
  • API centric—anything can be scripted
  • Lightning-fast instance spin-up—brand-new instances in under a minute

After considering all the major OpenStack distributions, we chose to use Red Hat’s RDO for the reasons listed below:

  • Simple yet powerful install procedure
  • Great documentation
  • 100% open source core; no lock-in
  • Production-quality updates
  • Optional enterprise support
  • Very approachable community (for questions, comments, bugs)

Within two weeks of deploying what we now call Support Lab, we already had the majority of our support staff using it and providing great feedback and suggestions.

Adding Cloudera Manager

Very quickly we realized that we needed to ride this momentum, so we started the next phase of the project: to fully automate the deployment of instances as well as Cloudera Enterprise. The goals were simple:

  • Abstract the creation of instances in OpenStack
  • Fully bootstrap the installation of Cloudera Enterprise
  • Allow the user to mix-and-match CDH/Cloudera Manager versions
  • Only install services defined by the user (HDFS, Impala, HBase, and so on)
  • Make the cluster size configurable (1 node => n nodes)

By utilizing the Python APIs of both OpenStack and Cloudera Manager, we were able to create a simple web application to completely orchestrate all the steps an engineer would normally take to set up a cluster manually. Below is a screenshot of what this piece of automation looks like to the user.

After clicking the Deploy button, the user is brought to the following screen where they can monitor the status of the deployment. This particular deployment took only 16 minutes to stand up a fully functioning CDH 5.1 Apache HBase cluster, a process that would normally take hours of an engineer’s time. In addition to the monitoring page shown below, the user is also notified via HipChat so they can redirect their attention to something more important during the actual deployment:

And just like that, the engineer has a brand-new temporary cluster to use for testing.


Overall, OpenStack met all of our expectations for automating cluster deployments, and as our team grows, we’ll continue to invest in our automation infrastructure. Many thanks to Red Hat and the countless other companies that contribute to the project—we can say for sure that our support staff (and customers) appreciate it.

Richard Saltzer is a developer on Cloudera Support’s tools team.

Categories: Hadoop

New Benchmarks for SQL-on-Hadoop: Impala 1.4 Widens the Performance Gap

Cloudera Blog - Mon, 09/22/2014 - 15:31

With 1.4, Impala’s performance lead over the SQL-on-Hadoop ecosystem gets wider, especially under multi-user load.

As noted in our recent post about the Impala 2.x roadmap (“What’s Next for Impala: Focus on Advanced SQL Functionality”), Impala’s ecosystem momentum continues to accelerate, with nearly 1 million downloads since the GA of 1.0, deployment by most of Cloudera’s enterprise data hub customers, and adoption by MapR, Amazon, and Oracle as a shipping product. Furthermore, in the past few months, independent sources such as IBM Research have confirmed that “Impala’s database-like architecture provides significant performance gains, compared to Hive’s MapReduce- or Tez-based runtime.”

Cloudera’s performance engineering team recently completed a new round of benchmark testing based on Impala 1.4 and the most recent stable releases of the major SQL engine options for the Apache Hadoop platform (see previous results for 1.1 here, and for 1.3 here). As you’ll see in the results below, Impala has extended its performance lead since our last post, especially under multi-user load:

  • For single-user queries, Impala is up to 13x faster than alternatives, and 6.7x faster on average.
  • For multi-user queries, the gap widens: Impala is up to 27.4x faster than alternatives, and 18x faster on average — or nearly three times faster on average for multi-user queries than for single-user ones.

Now, let’s review the details.


As always, all tests were run on precisely the same 21-node cluster. This time around we ran on a smaller (64GB per node) memory footprint across all engines to correct a misperception that Impala only works well with lots of memory (Impala performed similarly well on larger memory clusters):

  • 2 processors, 12 cores, Intel Xeon CPU E5-2630L 0 at 2.00GHz
  • 12 disk drives at 932GB each (one for the OS, the rest for HDFS)
  • 64GB memory

Comparative Set

This time, we dropped Shark from the comparative set as it has since been retired in favor of the broad-based community initiative to speed up batch processing with Hive-on-Spark. We also added the current version of Spark SQL to the mix for those interested:

  • Impala 1.4.0
  • Hive-on-Tez: The final phase of the 18-month Stinger initiative (aka Hive 0.13)
  • Spark SQL 1.1: A new project that enables developers to make inline calls to SQL for sampling, filtering, and aggregations as part of a Spark data processing application
  • Presto 0.74: Facebook’s query engine project

Testing Methodology

  • Just like last time, to ensure a realistic Hadoop workload with representative data-size-per-node, queries were run on a 15TB scale-factor dataset across 21 nodes.
  • We ran precisely the same open decision-support benchmark derived from TPC-DS described in our previous rounds of testing (with queries categorized into Interactive, Reporting, and Analytics buckets).
  • Due to the lack of a cost-based optimizer in all tested engines except Impala, we tested all engines with queries that had been converted to SQL-92 style joins (the same modifications used in previous rounds of testing). For consistency, we ran those same queries against Impala — although Impala produces identical results without these modifications.
  • We selected the most optimal file formats across all engines, consistently using Snappy compression to ensure apples-to-apples comparisons. Furthermore, each engine was assessed on a file format that ensured the best possible performance and a fair, consistent comparison: Impala on Apache Parquet (incubating), Hive-on-Tez on ORC, Presto on RCFile, and Spark SQL on Parquet.
  • The standard rigorous testing techniques (multiple runs, tuning, and so on) were used for each of the engines involved.

Results: Single User

Impala outperformed all alternatives on single-user workloads across all queries run. Impala’s performance advantage ranged from 2.1x to 13.0x and was 6.7x on average. This result indicates that Impala’s performance advantage is actually widening over time, as the gap was 4.8x on average in our previous round of testing. The widening is most pronounced when compared to “Stinger”/Hive-on-Tez (from an average of 4.9x to 9x) and Presto (from an average of 5.3x to 7.5x):

Next, let’s turn to multi-user results. We view performance under concurrent load as the more meaningful metric for real-world comparison.

Results: Multiple Users

We re-ran the same Interactive queries as the previous post, running 10 users at the same time. Our experience was the same as the past several posts: Impala’s performance advantage widens under concurrent load. Impala’s average advantage widens from 6.7x to 18.7x when going from single user to concurrent user workloads. Advantages varied from 10.6x to 27.4x depending on the comparison. This is a bigger advantage than in our previous round of testing where our average advantage was 13x. Again, Impala is widening the gap:

Note that Impala’s speed under 10-user load was nearly half that under single-user load — whereas the average across the alternatives was just one-fifth that under single-user load. We attribute that advantage to Impala’s extremely efficient query engine, which demonstrated query throughput 8x to 22x better than alternatives (14x better on average):


Our vision for Impala is for it to become the most performant, compatible, and usable native analytic SQL engine for Hadoop. These results demonstrate Impala’s widening performance lead over alternatives, especially under multi-user workloads. We have a number of exciting performance and efficiency improvements planned so stay tuned for additional benchmark posts. (As usual, we encourage you to independently verify these results by running your own benchmarks based on the open toolkit.)

While Impala’s performance advantage has widened, the team has been hard at work adding to Impala’s functionality. Later this year, Impala 2.0 will ship with a great many feature additions including commonly requested ANSI SQL functionality (such as analytic window functions and subqueries), additional datatypes, and additional popular vendor-specific SQL extensions.

Impala occupies a unique position in the Hadoop open source ecosystem today by being the only SQL engine that offers:

  • Low-latency, feature-rich SQL for BI users
  • Ability to handle highly-concurrent workloads
  • Efficient resource usage in a shared workload environment (via YARN)
  • Open formats for accessing any data from any native Hadoop engine
  • Low lock-in multi-vendor support, and
  • Broad ISV support

As always, we welcome your comments and feedback!

Justin Erickson is Director of Product Management at Cloudera.

Marcel Kornacker is Impala’s architect and the Impala tech lead at Cloudera.

Dileep Kumar is a Performance Engineer at Cloudera.

David Rorke is a Performance Engineer at Cloudera.

Categories: Hadoop

Community Meetups during Strata + Hadoop World 2014

Cloudera Blog - Fri, 09/19/2014 - 15:54

The meetup opportunities during the conference week are more expansive than ever — spanning Impala, Spark, HBase, Kafka, and more.

Strata + Hadoop World 2014 is a kaleidoscope of experiences for attendees, and those experiences aren’t contained within the conference center’s walls. For example, the meetups that occur during the conf week (which is concurrent with NYC DataWeek) are a virtual track for developers — and with Strata + Hadoop World being bigger than ever, so is the scope of that track.

To follow is a list of the meetups currently planned (which may expand as more groups power-up); its richness is an apt reflection of the ecosystem’s current disposition:

Monday, Oct. 13
  • SQL NYC (5:30pm)

    Marcel Kornacker on “The Next Wave of SQL”: Using Hadoop and Impala to build analytic apps

Tuesday, Oct. 14

Wednesday, Oct. 15
  • NYC Cloudera User Group (3:45pm – attendees only; onsite)

    Kostas Sakellis on Cloudera Manager’s Extension Framework, Amandeep Khurana on Hadoop best practices on AWS

  • Apache Kafka NYC (6pm)

    Agenda TBD

  • Sqoop User Meetup (6:30pm)

    Sunil Sitaula on EDH offloading, Abe Elmahrek on refactoring Sqoop2 for general data transfer

  • Hive User Group Meeting (6:30pm)

    Xuefu Zhang on Hive-on-Spark, Prasad Mujumdar on Apache Sentry, and several others

  • HBase NYC (7pm)

    Nick Dimiduk on HBase 1.0, Masatake Iwasaki on HTrace, Lars George on HBase cluster sizing.

Thursday, Oct. 16

We’ll see you at some of these things, yes?

Categories: Hadoop

How Impala Supports Mixed Workloads in Multi-User Environments

Cloudera Blog - Wed, 09/17/2014 - 16:29

Our thanks to Melanie Imhof, Jonas Looser, Thierry Musy, and Kurt Stockinger of the Zurich University of Applied Science in Switzerland for the post below about their research into the query performance of Impala for mixed workloads.

Recently, we were approached by an industry partner to research and create a blueprint for a new Big Data, near real-time, query processing architecture that would replace its current architecture based on a popular open source database system.

The goal of our research was to find a new, cost-effective solution that could accomplish the following:

  • Perform analytic queries for terabyte-scale data sets with response times of only a few seconds
  • Guarantee the above response times even with increasing data set sizes
  • Guarantee the above response times even when concurrent users increase

In the remainder of this post, we will explain how we identified Impala as the best solution for these requirements, and refer you to the detailed results of our research.

Use Case Details

The current system manages dozens of different data sets being accessed by hundreds of users from various international locations. Moreover, the database feeds multiple web-based business intelligence reports for its customers that are updated on a regular basis. In other words, the architecture can be considered as a near-real-time data warehouse/business intelligence environment that enables advanced analytics for customers.

Furthermore, the data size as well as the user base that concurrently access the system have been increasing considerably — with data size increasing by up to 1,000x to be accessed by hundreds of users. Hence, the new architecture needed to be re-designed to cover new, multi-user workloads.

In addition to the requirements explained above, the chosen solution would also allow for relatively easy migration from the current open source database approach to the new architecture. Moreover, the existing business intelligence reports for the end-users should be migrated with minimal impact.

Given these requirements, we made the following strategic decisions for our architecture choice:

  • We ruled out a traditional database system. The use case is a classic business intelligence problem with near-real-time requirements. However, given that our solution was supposed to be large-scale and also cost effective, we ruled out traditional commercial database systems.
  • We ruled out Apache Hive. Although the presence of large data sets with the potential to scale by a factor of 10, 100, or 1,000 initially made Hive an option, our requirement for near-real-time query processing eventually ruled it out.

In contrast, as we explained in the introduction, we found that Impala could equivalently meet our partner’s needs for a near-real-time BI system at higher scale and lower cost.

Research Details

The goal of our experiments was to go beyond the popular TPC-DS benchmark for evaluating decision-support workloads. Rather, our focus was on multi-dimensional point, range, and aggregation queries under consideration of concurrent user access for a real-world workload that is typical for decision-support systems.

In summary, we found that in a multi-user, multi-node environment, Impala’s query response time increases linearly with the number of concurrent users, and that query response time is equally distributed across all users. However, when we also simulated the time needed for concurrent users to interact with query results by adding “sleep” time, the response time dropped to the expected optimal execution time of a single user.

The details of our evaluation can be found in this technical report.

This work was funded as an applied research project/proof of concept by LinkResearch Tools.

Melanie Imhof is a research associate and a Ph.D. candidate at Zurich University of Applied Sciences and the University of Neuchâtel, Switzerland. She is interested in information retrieval, machine learning and related areas. She has obtained her M.Sc. in computer science from ETH Zurich with focus on machine learning and computer vision.

Jonas Looser is research associate at Zurich University of Applied Sciences (ZHAW). He has received a B.Sc. in computer science from ZHAW and has been working in different data science projects at ZHAW. His research interests are in data warehousing and business intelligence.

Thierry Musy is a research associate at Zurich University of Applied Sciences (ZHAW) in the areas of information retrieval, data science and big data. He works on joint research projects at ZHAW where cutting edge technologies are applied both in academia and industry. He has earned a B.Sc. in computer science at ZHAW and has worked in leading positions in the ICT and the financial sector.

Dr. Kurt Stockinger is an associate professor of computer science at Zurich University of Applied Sciences, Switzerland. His research interests are in data science with focus on big data, data warehousing, business intelligence and advanced database technology. He is also on the Advisory Board of Callista Group AG. Previously Kurt worked at Credit Suisse in Zurich, at Lawrence Berkeley National Laboratory in Berkeley, California, as well as at CERN in Geneva. He holds a Ph.D. in computer science from CERN/University of Vienna.

Categories: Hadoop

How-to: Install CDH on Mac OSX 10.9 Mavericks

Cloudera Blog - Tue, 09/16/2014 - 16:35

This overview will cover the basic tarball setup for your Mac.

If you’re an engineer building applications on CDH and becoming familiar with all the rich features for designing the next big solution, it becomes essential to have a native Mac OSX install. Sure, you may argue that your MBP with its four-core, hyper-threaded i7, SSD, and 16GB of DDR3 memory is sufficient for spinning up a VM, and in most instances (such as using a VM for a quick demo) you’re right. However, when experimenting with a slightly heavier, more resource-intensive workload, you’ll want to explore a native install.

In this post, I will cover setup of a few basic dependencies and the necessities to run HDFS, MapReduce with YARN, Apache ZooKeeper, and Apache HBase. It should be used as a guideline for getting your local CDH box set up, with the objective of enabling you to build and run applications on the Apache Hadoop stack.

Note: This process is not supported and thus you should be comfortable as a self-supporting sysadmin. With that in mind, the configurations throughout this guideline are suggested for your default bash shell environment that can be set in your ~/.profile.


Install the Java version that is supported for the CDH version you are installing. In my case for CDH 5.1, I’ve installed JDK 1.7 u67. Historically the JDK for Mac OSX was only available from Apple, but since JDK 1.7, it’s available directly through Oracle’s Java downloads. Download the .dmg (in the example below, jdk-7u67-macosx-x64.dmg) and install it.

Verify and configure the installation:

Old Java path: /System/Library/Frameworks/JavaVM.framework/Home
New Java path: /Library/Java/JavaVirtualMachines/jdk1.7.0_67.jdk/Contents/Home

export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.7.0_67.jdk/Contents/Home"

Note: You’ll notice that after installing the Oracle JDK, the original path used to manage versioning, /System/Library/Frameworks/JavaVM.framework/Versions, is not updated, and you now have control to manage your versions independently.

Enable ssh on your mac by turning on remote login. You can find this option under your toolbar’s Apple icon > System Preferences > Sharing.

  1. Check the box for Remote Login to enable the service. 
  2. Allow access for: “Only these users: Administrators”

    Note: In this same window, you can modify your computer’s hostname.

Enable password-less ssh login to localhost for MRv1 and HBase. 

  1. Open your terminal.
  2. Generate an rsa or dsa key.
    1. ssh-keygen -t rsa -P ""
    2. Continue through the key generator prompts (use default options).
  3. Test: ssh localhost

Another toolkit I admire is Homebrew, a package manager for OSX. While Xcode developer command-line tools are great, the savvy naming conventions and ease of use of Homebrew get the job done in a fun way. 

I haven’t needed Homebrew for much else than for installing dependencies required for building native Snappy libraries for Mac OSX and ease of install of MySQL for Hive. Snappy is commonly used within HBase, HDFS, and MapReduce for compression and decompression.


Finally, the easy part: The CDH tarballs are very nicely packaged and easily downloadable from Cloudera’s repository. I’ve downloaded tarballs for CDH 5.1.0.

Download and explode the tarballs in a lib directory where you can manage the latest versions with a simple symlink, as shown below. Do not use Mac OSX’s “Make Alias” feature (a Finder alias is not a true symlink and will not be followed by command-line tools); instead, use the command line: ln -s source_file target_file.

  • /Users/jordanh/cloudera/
  • cdh5.1/
    • hadoop -> /Users/jordanh/cloudera/lib/hadoop-2.3.0-cdh5.1.0
    • hbase -> /Users/jordanh/cloudera/lib/hbase-0.98.1-cdh5.1.0
    • hive -> /Users/jordanh/cloudera/lib/hive-0.12.0-cdh5.1.0
    • zookeeper -> /Users/jordanh/cloudera/lib/zookeeper-3.4.5-cdh4.7.0
  • ops/
    • dn
    • logs/hadoop, logs/hbase, logs/yarn
    • nn/
    • pids
    • tmp/
    • zk/

You’ll notice above that you’ve created a handful of directories under a folder named ops. You’ll use them later to customize the configuration of the essential components for running Hadoop. Set your environment properties according to the paths where you’ve exploded your tarballs. 

~/.profile:

CDH="cdh5.1"
export HADOOP_HOME="/Users/jordanh/cloudera/${CDH}/hadoop"
export HBASE_HOME="/Users/jordanh/cloudera/${CDH}/hbase"
export HIVE_HOME="/Users/jordanh/cloudera/${CDH}/hive"
export HCAT_HOME="/Users/jordanh/cloudera/${CDH}/hive/hcatalog"
export PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${ZK_HOME}/bin:${HBASE_HOME}/bin:${HIVE_HOME}/bin:${HCAT_HOME}/bin:${M2_HOME}/bin:${ANT_HOME}/bin:${PATH}
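As a quick sanity check before relying on these variables, a small script can confirm that each home directory actually exists where ~/.profile says it does. This is purely illustrative and not part of the original setup; the missing_homes helper and the paths are hypothetical:

```python
import os

# Hypothetical helper: map each environment variable from ~/.profile to its
# expected path and report anything that does not exist on disk yet.
def missing_homes(homes):
    """Return the variable names whose configured path is not a directory."""
    return [name for name, path in homes.items() if not os.path.isdir(path)]

cdh = "cdh5.1"
base = os.path.expanduser("~/cloudera")
homes = {
    "HADOOP_HOME": os.path.join(base, cdh, "hadoop"),
    "HBASE_HOME": os.path.join(base, cdh, "hbase"),
    "HIVE_HOME": os.path.join(base, cdh, "hive"),
    "HCAT_HOME": os.path.join(base, cdh, "hive", "hcatalog"),
}
for name in missing_homes(homes):
    print("warning: %s points at a missing directory" % name)
```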

Update your main Hadoop configuration files, as shown in the sample files below. You can also download all files referenced in this post directly from here.

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:8020</value>
    <description>The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation. The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The uri's authority is used to determine the host, port, etc. for a filesystem.</description>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/Users/jordanh/cloudera/ops/tmp/hadoop-${user.name}</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
    <description>A comma-separated list of the compression codec classes that can be used for compression/decompression. In addition to any classes specified with this property (which take precedence), codec classes on the classpath are discovered using a Java ServiceLoader.</description>
  </property>
</configuration>


<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/Users/jordanh/cloudera/ops/nn</value>
    <description>Determines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy.</description>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/Users/jordanh/cloudera/ops/dn/</value>
    <description>Determines where on the local filesystem a DFS data node should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices. Directories that do not exist are ignored.</description>
  </property>
  <property>
    <name>dfs.datanode.http.address</name>
    <value>localhost:50075</value>
    <description>The datanode http server address and port. If the port is 0 then the server will start on a free port.</description>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified in create time.</description>
  </property>
</configuration>

The YARN and MRv2 configuration and setup below are adapted from the CDH 5 installation docs. I will not digress into the specifics of each property or the orchestration and details of how YARN and MRv2 operate, but there’s some great information that my colleague Sandy has already shared for developers and admins.

Be sure to make the necessary adjustments per your system’s memory and CPU constraints; these parameters directly affect your machine’s performance when you execute jobs.
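To see how the yarn-site.xml values interact, here is a rough back-of-the-envelope calculation (a sketch, not from the original post): with 8192 MB and 4 vcores per NodeManager and minimum allocations of 1024 MB and 1 vcore, the vcores end up being the binding constraint.

```python
def max_containers(node_mb, node_vcores, min_alloc_mb, min_alloc_vcores):
    """Upper bound on concurrent minimum-size containers on one NodeManager."""
    return min(node_mb // min_alloc_mb, node_vcores // min_alloc_vcores)

# Values from the yarn-site.xml used in this guide:
print(max_containers(8192, 4, 1024, 1))  # memory allows 8, vcores allow 4 -> 4
```

Real scheduling is more involved (the scheduler rounds requests and containers vary in size), but this gives a feel for why the memory and vcore settings should be tuned together.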

Next, edit the following files as shown.

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    <description>the valid service name should only contain a-zA-Z0-9_ and can not start with numbers</description>
  </property>
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
    <description>Whether to enable log aggregation</description>
  </property>
  <property>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>hdfs://localhost:8020/tmp/yarn-logs</value>
    <description>Where to aggregate logs to.</description>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>8192</value>
    <description>Amount of physical memory, in MB, that can be allocated for containers.</description>
  </property>
  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>4</value>
    <description>Number of CPU cores that can be allocated for containers.</description>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
    <description>The minimum allocation for every container request at the RM, in MBs. Memory requests lower than this won't take effect, and the specified value will get allocated at minimum.</description>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>2048</value>
    <description>The maximum allocation for every container request at the RM, in MBs. Memory requests higher than this won't take effect, and will get capped to this value.</description>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-vcores</name>
    <value>1</value>
    <description>The minimum allocation for every container request at the RM, in terms of virtual CPU cores. Requests lower than this won't take effect, and the specified value will get allocated the minimum.</description>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>2</value>
    <description>The maximum allocation for every container request at the RM, in terms of virtual CPU cores. Requests higher than this won't take effect, and will get capped to this value.</description>
  </property>
</configuration>


<configuration>
  <property>
    <name>mapreduce.jobtracker.address</name>
    <value>localhost:8021</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.done-dir</name>
    <value>/tmp/job-history/</value>
    <description></description>
  </property>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    <description>The runtime framework for executing MapReduce jobs. Can be one of local, classic or yarn.</description>
  </property>
  <property>
    <name>mapreduce.map.cpu.vcores</name>
    <value>1</value>
    <description>The number of virtual cores required for each map task.</description>
  </property>
  <property>
    <name>mapreduce.reduce.cpu.vcores</name>
    <value>1</value>
    <description>The number of virtual cores required for each reduce task.</description>
  </property>
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>1024</value>
    <description>Larger resource limit for maps.</description>
  </property>
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>1024</value>
    <description>Larger resource limit for reduces.</description>
  </property>
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx768m</value>
    <description>Heap-size for child jvms of maps.</description>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx768m</value>
    <description>Heap-size for child jvms of reduces.</description>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>1024</value>
    <description>The amount of memory the MR AppMaster needs.</description>
  </property>
</configuration>
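One relationship worth checking in the mapred-site.xml settings: the child JVM heap (-Xmx768m) must fit inside the container size (mapreduce.map.memory.mb = 1024) with room left for non-heap overhead such as thread stacks and direct buffers. A tiny illustrative check; the 20% headroom figure is an assumption, not from the post:

```python
def heap_fits(container_mb, xmx_mb, headroom=0.2):
    """True if the child JVM heap leaves at least `headroom` of the
    container for non-heap overhead (stacks, metaspace, direct buffers)."""
    return xmx_mb <= container_mb * (1 - headroom)

print(heap_fits(1024, 768))  # 768 MB heap in a 1024 MB container -> True
```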


# Where log files are stored. $HADOOP_HOME/logs by default.
export HADOOP_LOG_DIR="/Users/jordanh/cloudera/ops/logs/hadoop"
export YARN_LOG_DIR="/Users/jordanh/cloudera/ops/logs/yarn"
# The directory where pid files are stored when processes run as daemons. /tmp by default.
export HADOOP_PID_DIR="/Users/jordanh/cloudera/ops/pids"
export YARN_PID_DIR=${HADOOP_PID_DIR}

You can configure HBase to run without separately downloading Apache ZooKeeper: HBase bundles ZooKeeper and can run it either as a separate instance or in standalone mode within a single JVM. For ease of use, configuration, and management, I recommend using the bundled ZooKeeper (in distributed or standalone mode) rather than a separately downloaded ZooKeeper tarball.

The primary difference with configuration between running HBase in distributed or standalone mode is with the hbase.cluster.distributed property in hbase-site.xml. Set the property to false for launching HBase in standalone mode or true to spin up separate instances for services such as HBase’s ZooKeeper and RegionServer. Update the following configurations for HBase as specified to run it per this type of configuration.

Note regarding hbase-site.xml: Property hbase.cluster.distributed is set to false by default and will launch in standalone mode. Also, hbase.zookeeper.quorum is set to localhost by default and does not need to be overridden in our scenario.

<configuration>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are false for standalone mode and true for distributed mode. If false, startup will run all HBase and ZooKeeper daemons together in the one JVM.</description>
  </property>
  <property>
    <name>hbase.tmp.dir</name>
    <value>/Users/jordanh/cloudera/ops/tmp/hbase-${user.name}</value>
    <description>Temporary directory on the local filesystem. Change this setting to point to a location more permanent than '/tmp' (The '/tmp' directory is often cleared on machine restart).</description>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/Users/jordanh/cloudera/ops/zk</value>
    <description>Property from ZooKeeper's config zoo.cfg. The directory where the snapshot is stored.</description>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:8020/hbase</value>
    <description>The directory shared by region servers and into which HBase persists. The URL should be 'fully-qualified' to include the filesystem scheme. For example, to specify the HDFS directory '/hbase' where the HDFS instance's namenode is running at namenode.example.org on port 9000, set this value to: hdfs://namenode.example.org:9000/hbase. By default HBase writes into /tmp. Change this configuration else all data will be lost on machine restart.</description>
  </property>
</configuration>
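Since switching between standalone and distributed mode comes down to editing one property, the toggle could even be scripted with Python's standard library. This is a hypothetical sketch (set_property and the inline XML are illustrative, not part of the post):

```python
import xml.etree.ElementTree as ET

def set_property(xml_text, name, value):
    """Return the config XML with the named property's value replaced."""
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            prop.find("value").text = value
            return ET.tostring(root, encoding="unicode")
    raise KeyError(name)

site = """<configuration>
  <property><name>hbase.cluster.distributed</name><value>true</value></property>
</configuration>"""

# Flip to standalone mode:
updated = set_property(site, "hbase.cluster.distributed", "false")
print("<value>false</value>" in updated)
```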

Note regarding $HBASE_HOME/conf/hbase-env.sh: By default HBASE_MANAGES_ZK is set as true and is listed below only for explicit definition.

# Where log files are stored. $HBASE_HOME/logs by default.
export HBASE_LOG_DIR="/Users/jordanh/cloudera/ops/logs/hbase"
# The directory where pid files are stored. /tmp by default.
export HBASE_PID_DIR="/Users/jordanh/cloudera/ops/pids"
# Tell HBase whether it should manage its own instance of ZooKeeper or not.
export HBASE_MANAGES_ZK=true

Pulling it All Together

By now, you should have accomplished setting up HDFS, YARN, and HBase. Hadoop setup and configuration is quite tedious, much less managing it over time (thus Cloudera Manager, which is unfortunately not available for Macs).

These are the bare essentials for getting your local machine ready for running MapReduce jobs and building applications on HBase. In the next few steps, we will start/stop the services and provide examples to ensure each service is operating correctly. The steps are listed in initialization order to adhere to dependencies; the order can be reversed when halting the services.

Service HDFS

NameNode
format:  hdfs namenode -format

start:  hdfs namenode

stop:  Ctrl-C

url:  http://localhost:50070/dfshealth.html

DataNode
start:  hdfs datanode

stop:  Ctrl-C

url:  http://localhost:50075/browseDirectory.jsp?dir=%2F&nnaddr=

Test HDFS
hadoop fs -mkdir /tmp

hadoop fs -put /path/to/local/file.txt /tmp/

hadoop fs -cat /tmp/file.txt

Service YARN

ResourceManager
start:  yarn resourcemanager

stop:  Ctrl-C

url:  http://localhost:8088/cluster

NodeManager
start:  yarn nodemanager

stop:  Ctrl-C

url:  http://localhost:8042/node

MapReduce Job History Server

start:  mapred historyserver (or mr-jobhistory-daemon.sh start historyserver)

stop:  Ctrl-C (or mr-jobhistory-daemon.sh stop historyserver)

url:  http://localhost:19888/jobhistory/app

Test Vanilla YARN Application

hadoop jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0-cdh5.1.0.jar -appname DistributedShell -jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0-cdh5.1.0.jar -shell_command "ps wwaxr -o pid,stat,%cpu,time,command | head -10" -num_containers 2 -master_memory 1024

Test DFS I/O
hadoop org.apache.hadoop.fs.TestDFSIO -write -nrFiles 5 -size 1GB
hadoop org.apache.hadoop.fs.TestDFSIO -read -nrFiles 5 -size 1GB

Test MRv2 YARN Terasort/Teragen

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.3.0-cdh5.1.0.jar teragen 100000000 /tmp/eval/teragen
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.3.0-cdh5.1.0.jar terasort /tmp/eval/teragen /tmp/eval/terasort

Test MRv2 YARN Pi

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.3.0-cdh5.1.0.jar pi 100 100

Service HBase

HBase Master/RegionServer/ZooKeeper

start:  start-hbase.sh

stop:  stop-hbase.sh

logs:  /Users/jordanh/cloudera/ops/logs/hbase/

url:  http://localhost:60010/master-status

Test HBase Shell
hbase shell

create 'URL_HITS', {NAME=>'HOURLY'},{NAME=>'DAILY'},{NAME=>'YEARLY'}
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'HOURLY:2014090110', '10'
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'HOURLY:2014090111', '5'
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'HOURLY:2014090112', '30'
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'HOURLY:2014090113', '80'
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'HOURLY:2014090114', '7'
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'DAILY:20140901', '10012'
put 'URL_HITS', 'com.cloudera.blog.osx.localinstall', 'YEARLY:2014', '93310101'
scan 'URL_HITS'

Kite SDK Test

Get familiar with the Kite SDK by trying out this example, which loads data into HDFS and then HBase. Note that there are a few common issues on OSX that may surface when running through the Kite SDK example. They can be easily resolved with additional setup/config as specified below.

Problem:  NoClassDefFoundError: org/apache/hadoop/hive/metastore/api/NoSuchObjectException

Resolution:  Fix your classpath by making sure to set HIVE_HOME and HCAT_HOME in your environment.

export HIVE_HOME="/Users/jordanh/cloudera/${CDH}/hive"
export HCAT_HOME="/Users/jordanh/cloudera/${CDH}/hive/hcatalog"

Problem:  InvocationTargetException Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path

Resolution:  Snappy libraries are not compiled for Mac OSX out of the box. A Snappy Java port was introduced in CDH 5 and will likely need to be recompiled on your machine.

git clone https://github.com/xerial/snappy-java.git
cd snappy-java
make
cp target/snappy-java- $HADOOP_HOME/share/hadoop/common/lib/asnappy-java-

Landing Page

Creating a landing page will help consolidate all the HTTP addresses of the services that you’re running. Please note that localhost can be replaced with your local hostname (such as jakuza-mbp.local).

Service Apache HTTPD

start: sudo -s launchctl load -w /System/Library/LaunchDaemons/org.apache.httpd.plist

stop: sudo -s launchctl unload -w /System/Library/LaunchDaemons/org.apache.httpd.plist

logs: /var/log/apache2/

url: http://localhost/index.html

Create index.html (edit /Library/WebServer/Documents/index.html, which you can download here).

It will look something like this:



With this guide, you should have a locally running Hadoop cluster with HDFS, MapReduce, and HBase. These are the core components of Hadoop, and a good initial foundation for building and prototyping your applications locally.

I hope this will be a good starting point on your dev box to try out more ways to build your products, whether they are data pipelines, analytics, machine learning, search and exploration, or more, on the Hadoop stack. 

Jordan Hambleton is a Solutions Architect at Cloudera.


Apache Kafka for Beginners

Cloudera Blog - Fri, 09/12/2014 - 18:10

When used in the right way and for the right use case, Kafka has unique attributes that make it a highly attractive option for data integration.

Apache Kafka is creating a lot of buzz these days. While LinkedIn, where Kafka originated, is its best-known user, many other companies are successfully using this technology.

So now that the word is out, it seems the world wants to know: What does it do? Why does everyone want to use it? How is it better than existing solutions? Do the benefits justify replacing existing systems and infrastructure?
In this post, we’ll try to answer those questions. We’ll begin by briefly introducing Kafka, and then demonstrate some of Kafka’s unique features by walking through an example scenario. We’ll also cover some additional use cases and compare Kafka to existing solutions.

What is Kafka?

Kafka is one of those systems that is very simple to describe at a high level, but has an incredible depth of technical detail when you dig deeper. The Kafka documentation does an excellent job of explaining the many design and implementation subtleties in the system, so we will not attempt to explain them all here. In summary, Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable.

Like many publish-subscribe messaging systems, Kafka maintains feeds of messages in topics. Producers write data to topics and consumers read from topics. Since Kafka is a distributed system, topics are partitioned and replicated across multiple nodes.

Messages are simply byte arrays, and developers can use them to store any object in any format, with String, JSON, and Avro the most common. It is possible to attach a key to each message, in which case the producer guarantees that all messages with the same key will arrive at the same partition. When consuming from a topic, it is possible to configure a consumer group with multiple consumers. Each consumer in a consumer group will read messages from a unique subset of partitions in each topic it subscribes to, so each message is delivered to one consumer in the group, and all messages with the same key arrive at the same consumer.
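The key-to-partition contract described above can be illustrated with a toy sketch. This is not Kafka's actual partitioner or group-assignment protocol; the partition count and consumer names are made up:

```python
import hashlib

# Toy model: 6 partitions for one topic, 3 consumers in one group.
NUM_PARTITIONS = 6
CONSUMERS = ["server-a", "server-b", "server-c"]

def partition_for(key):
    """Hash the message key to a partition (illustrative, not Kafka's partitioner)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

def consumer_for(partition):
    """Static round-robin assignment of partitions to group members."""
    return CONSUMERS[partition % len(CONSUMERS)]

p = partition_for("user-42")
assert partition_for("user-42") == p   # same key always maps to the same partition
print("partition %d handled by %s" % (p, consumer_for(p)))
```

Because the key fully determines the partition, and each partition is read by exactly one consumer in the group, all messages for a given key end up at the same consumer in order.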

What makes Kafka unique is that Kafka treats each topic partition as a log (an ordered set of messages). Each message in a partition is assigned a unique offset. Kafka does not attempt to track which messages were read by each consumer and retain only unread messages; rather, Kafka retains all messages for a set amount of time, and consumers are responsible for tracking their location in each log. Consequently, Kafka can support a large number of consumers and retain large amounts of data with very little overhead.
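The log abstraction described above can be sketched in a few lines (an illustrative in-memory model, not Kafka's implementation): the broker only appends and serves offset ranges, while each consumer tracks its own position:

```python
class PartitionLog:
    """Minimal model of one topic partition: an ordered, offset-indexed log."""

    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1          # offset of the new message

    def read(self, offset, max_messages=10):
        # The broker serves a range; it does not remember who read what.
        return self.messages[offset:offset + max_messages]

log = PartitionLog()
for m in ("m0", "m1", "m2"):
    log.append(m)

position = 0                                   # consumer-tracked, not broker-tracked
batch = log.read(position)
position += len(batch)
print(batch, position)  # ['m0', 'm1', 'm2'] 3
```

Any number of consumers can hold their own `position` into the same log, which is why adding consumers costs the broker almost nothing.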

Next, let’s look at how Kafka’s unique properties are applied in a specific use case.

Kafka at Work

Suppose we are developing a massive multiplayer online game. In these games, players cooperate and compete with each other in a virtual world. Players often trade with each other, exchanging game items and money, so as game developers it is important for us to make sure players don’t cheat: Trades will be flagged if the trade amount is significantly larger than normal for the player and if the IP the player is logged in with is different than the IP used for the last 20 games. In addition to flagging trades in real time, we also want to load the data into Apache Hadoop, where our data scientists can use it to train and test new algorithms.

For the real-time event flagging, it will be best if we can reach the decision quickly based on data that is cached on the game server memory, at least for our most active players. Our system has multiple game servers and the data set that includes the last 20 logins and last 20 trades for each player can fit in the memory we have, if we partition it between our game servers.

Our game servers have to perform two distinct roles: The first is to accept and propagate user actions, and the second is to process trade information in real time and flag suspicious events. To perform the second role effectively, we want the whole history of trade events for each user to reside in the memory of a single server. This means we have to pass messages between the servers, since the server that accepts the user action may not have that user’s trade history. To keep the roles loosely coupled, we use Kafka to pass messages between the servers, as you’ll see below.

Kafka has several features that make it a good fit for our requirements: scalability, data partitioning, low latency, and the ability to handle a large number of diverse consumers. We have configured Kafka with a single topic for logins and trades. The reason we need a single topic is to make sure that trades arrive in our system only after we already have information about the login (so we can make sure the gamer logged in from his usual IP). Kafka maintains order within a topic, but not between topics.

When a user logs in or makes a trade, the accepting server immediately sends the event into Kafka. We send messages with the user id as the key, and the event as the value. This guarantees that all trades and logins from the same user arrive at the same Kafka partition. Each event-processing server runs a Kafka consumer, each of which is configured to be part of the same group; this way, each server reads data from a few Kafka partitions, and all the data about a particular user arrives at the same event-processing server (which can be different from the accepting server). When the event-processing server reads a user trade from Kafka, it adds the event to the user’s event history cached in local memory. It can then access the user’s event history from the local cache and flag suspicious events without additional network or disk overhead.
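The event-processing side described above can be sketched as follows; the window size, threshold factor, and names are illustrative assumptions, not from the post:

```python
from collections import defaultdict, deque

# Keep the last 20 trades per user in local memory; flag a trade that is
# far above the user's recent average. 20 and 5.0 are illustrative values.
WINDOW, FACTOR = 20, 5.0
history = defaultdict(lambda: deque(maxlen=WINDOW))

def process_trade(user_id, amount):
    """Return True if this trade looks suspicious given the user's history."""
    past = history[user_id]
    suspicious = bool(past) and amount > FACTOR * (sum(past) / len(past))
    past.append(amount)
    return suspicious

for amt in [10, 12, 9, 11]:
    process_trade("user-42", amt)
print(process_trade("user-42", 500))  # far above the running average -> True
```

In the real system, `process_trade` would be driven by the Kafka consumer loop, and the keyed partitioning guarantees that all of a given user's trades reach the server holding that user's `history` entry.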

It’s important to note that we create a partition per event-processing server, or per core on the event-processing servers for a multi-threaded approach. (Keep in mind that Kafka was mostly tested with fewer than 10,000 partitions for all the topics in the cluster in total, and therefore we do not attempt to create a partition per user.)

This may sound like a circuitous way to handle an event: Send it from the game server to Kafka, read it from another game server, and only then process it. However, this design decouples the two roles and allows us to manage capacity for each role as required. In addition, the approach does not add significant delay, as Kafka is designed for high throughput and low latency; even a small three-node cluster can process close to a million events per second with an average latency of 3ms.

When the server flags an event as suspicious, it sends the flagged event into a new Kafka topic—for example, Alerts—where alert servers and dashboards pick it up. Meanwhile, a separate process reads data from the Events and Alerts topics and writes them to Hadoop for further analysis.

Because Kafka does not track acknowledgements and messages per consumer, it can handle many thousands of consumers with very little performance impact. Kafka even handles batch consumers (processes that wake up once an hour to consume all new messages from a queue) without affecting system throughput or latency.

Additional Use Cases

As this simple example demonstrates, Kafka works well as a traditional message broker as well as a method of ingesting events into Hadoop.

Here are some other common uses for Kafka:

  • Website activity tracking: The web application sends events such as page views and searches to Kafka, where they become available for real-time processing, dashboards, and offline analytics in Hadoop.
  • Operational metrics: Alerting and reporting on operational metrics. One particularly fun example is having Kafka producers and consumers occasionally publish their message counts to a special Kafka topic; a service can be used to compare counts and alert if data loss occurs.
  • Log aggregation: Kafka can be used across an organization to collect logs from multiple services and make them available in standard format to multiple consumers, including Hadoop and Apache Solr.
  • Stream processing: A framework such as Spark Streaming reads data from a topic, processes it and writes processed data to a new topic where it becomes available for users and applications. Kafka’s strong durability is also very useful in the context of stream processing.
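The operational-metrics idea from the list above (producers and consumers publishing message counts so a checker can detect loss) can be sketched like this; the audit function and interval keys are hypothetical:

```python
def audit(produced_counts, consumed_counts):
    """Compare per-interval message counts and report any shortfall.

    Returns a list of (interval, missing_count) pairs where fewer messages
    were consumed than produced, which may indicate data loss.
    """
    alerts = []
    for interval, produced in produced_counts.items():
        consumed = consumed_counts.get(interval, 0)
        if consumed < produced:
            alerts.append((interval, produced - consumed))
    return alerts

print(audit({"10:00": 1000, "10:01": 1000},
            {"10:00": 1000, "10:01": 990}))  # [('10:01', 10)]
```

In practice both sides would publish their counts to a dedicated Kafka topic and a small service would run this comparison continuously.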

Other systems serve many of those use cases, but none of them do them all. ActiveMQ and RabbitMQ are very popular message broker systems, and Apache Flume is traditionally used to ingest events, logs, and metrics into Hadoop.

Kafka and Its Alternatives

We can’t speak much about message brokers, but data ingest for Hadoop is a problem we understand very well.

First, it is interesting to note that Kafka started out as a way to make data ingest into Hadoop easier. When there are multiple data sources and destinations involved, writing a separate data pipeline for each source and destination pairing quickly evolves into an unmaintainable mess. Kafka helped LinkedIn standardize the data pipelines and allowed getting data out of each system once and into each system once, significantly reducing pipeline complexity and cost of operation.

Jay Kreps, Kafka’s architect at LinkedIn, describes this familiar problem well in a blog post:

My own involvement in this started around 2008 after we had shipped our key-value store. My next project was to try to get a working Hadoop setup going, and move some of our recommendation processes there. Having little experience in this area, we naturally budgeted a few weeks for getting data in and out, and the rest of our time for implementing fancy prediction algorithms. So began a long slog.

Diffs versus Flume

There is significant overlap in the functions of Flume and Kafka. Here are some considerations when evaluating the two systems.

  • Kafka is very much a general-purpose system. You can have many producers and many consumers sharing multiple topics. In contrast, Flume is a special-purpose tool designed to send data to HDFS and HBase. It has specific optimizations for HDFS and it integrates with Hadoop’s security. As a result, Cloudera recommends using Kafka if the data will be consumed by multiple applications, and Flume if the data is designated for Hadoop.
  • Those of you familiar with Flume know that Flume has many built-in sources and sinks. Kafka, however, has a significantly smaller ecosystem of producers and consumers, and the ones that exist are not well supported by the Kafka community. Hopefully this situation will improve in the future, but for now: Use Kafka if you are prepared to code your own producers and consumers. Use Flume if the existing Flume sources and sinks match your requirements and you prefer a system that can be set up without any development.
  • Flume can process data in-flight using interceptors. These can be very useful for data masking or filtering. Kafka requires an external stream processing system for that.
  • Both Kafka and Flume are reliable systems that, with proper configuration, can guarantee zero data loss. However, Flume does not replicate events. As a result, even when using the reliable file channel, if a node running a Flume agent crashes, you will lose access to the events in the channel until you recover the disks. Use Kafka if you need an ingest pipeline with very high availability.
  • Flume and Kafka can work quite well together. If your design requires streaming data from Kafka to Hadoop, using a Flume agent with Kafka source to read the data makes sense: You don’t have to implement your own consumer, you get all the benefits of Flume’s integration with HDFS and HBase, you have Cloudera Manager monitoring the consumer and you can even add an interceptor and do some stream processing on the way.
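That last pattern can be sketched as a Flume agent configuration. The following properties file is a minimal, illustrative sketch only: the agent name, topic, ZooKeeper address, and paths are placeholders, and the Kafka source class assumes a Flume build that ships it (such as the one in CDH 5.2):

```properties
# Illustrative only: Kafka source -> durable file channel -> HDFS sink.
tier1.sources  = kafka-src
tier1.channels = file-ch
tier1.sinks    = hdfs-snk

tier1.sources.kafka-src.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.kafka-src.zookeeperConnect = zk1.example.com:2181
tier1.sources.kafka-src.topic = events
tier1.sources.kafka-src.channels = file-ch

tier1.channels.file-ch.type = file
tier1.channels.file-ch.checkpointDir = /var/lib/flume/checkpoint
tier1.channels.file-ch.dataDirs = /var/lib/flume/data

tier1.sinks.hdfs-snk.type = hdfs
tier1.sinks.hdfs-snk.channel = file-ch
tier1.sinks.hdfs-snk.hdfs.path = /data/events/%Y-%m-%d
tier1.sinks.hdfs-snk.hdfs.fileType = DataStream
# Use the agent's clock for the %Y-%m-%d escapes in the path above.
tier1.sinks.hdfs-snk.hdfs.useLocalTimeStamp = true
```

An interceptor could additionally be attached to the source to filter or enrich events in flight, as mentioned above.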

As you can see, Kafka has a unique design that makes it very useful for solving a wide range of architectural challenges. It is important to make sure you use the right approach for your use case and use it correctly to ensure high throughput, low latency, high availability, and no loss of data.

Gwen Shapira is a Software Engineer at Cloudera, and a Kafka contributor. Jeff Holoman is a Systems Engineer at Cloudera.

Categories: Hadoop

Getting Started with Big Data Architecture

Cloudera Blog - Wed, 09/10/2014 - 14:12

What does a “Big Data engineer” do, and what does “Big Data architecture” look like? In this post, you’ll get answers to both questions.

Apache Hadoop has come a long way in its relatively short lifespan. From its beginnings as a reliable storage pool with integrated batch processing using the scalable, parallelizable (though inherently sequential) MapReduce framework, we have witnessed the recent additions of real-time (interactive) components like Impala for interactive SQL queries and integration with Apache Solr as a search engine for free-form text exploration.

Getting started is now also a lot easier: Just install CDH, and all the Hadoop ecosystem components are at your disposal. But after installation, where do you go from there? What is a good first use case? How do you ask those “bigger questions”?

Having worked with more customers running Hadoop in production than any other vendor, Cloudera’s field technical services team has seen more than its fair share of these use cases. Although they obviously vary by industry and application, there is a common theme: the presence of Big Data architecture.

In this post, you’ll get a whirlwind tour of that architecture based on what we’ve seen at customer sites over the past couple of years, and get some tips/initial advice about building your own as the foundation for an enterprise data hub.

Big Data Architecture

Big Data architecture is premised on a skill set for developing reliable, scalable, completely automated data pipelines. That skill set requires profound knowledge of every layer in the stack, beginning with cluster design and spanning everything from Hadoop tuning to setting up the top chain responsible for processing the data. The following diagram shows the complexity of the stack, as well as how data pipeline engineering touches every part of it.

The main detail here is that data pipelines take raw data and convert it into insight (or value). Along the way, the Big Data engineer has to make decisions about what happens to the data, how it is stored in the cluster, how access is granted internally, what tools to use to process the data, and eventually the manner of providing access to the outside world. The last could be BI or other analytic tools, while the processing itself is likely done with tools such as Impala or Apache Spark. I refer to the people who design and/or implement such architecture as Big Data engineers.

In the remainder of this post, you’ll learn about the various components in the stack and their role in creating data pipelines.

Cluster Planning

Cluster planning is a “chicken-and-egg” problem, as cluster design is inherently driven by the use-case(s) running later on, and often the use case is not yet clear. Most vendors, including Cloudera, have a reference architecture guideline to help you select the proper class of machines. (For Cloudera certified partners, see the online listing.)

In general, the current recommended machines are dual CPU with 4 to 8 cores each, at least 48GB of RAM (up to 512GB for low-latency analytical workloads where lots of data is cached), at least 6 HDDs (hard disk drives) and up to 12 or more for storage-heavy configurations, and otherwise standard rack-mountable 19″ servers. Sometimes we also see SSD (solid-state drive) setups for low-latency use cases, although the results are not as dramatic as one would assume. (Please test carefully.)

When in doubt, you can always try the public (or private) cloud services first, and once you know your requirements better, you can move things around. If you do so, be generous about machine size to get results comparable to bare-metal hardware – remember, you are in a shared environment and need to factor in competing loads and slower data connections (network and virtualized storage).

Data Ingest

After you have spun up your cluster, you have to decide how to load data. In practice there are two main approaches: batch and event-driven. The former is appropriate for file and structured data, while the latter is appropriate for most near-real-time events such as log or transactional data.

Batch Ingest

Let me start with the more straightforward case: ingesting data from structured data sources (for example, an RDBMS). The weapon of choice is universally Apache Sqoop, which allows you to move data into Hadoop from RDBMSs. You can select partial (column projection and row selection) or full data sets and do full or (given some requirements) incremental transfers. Sqoop uses MapReduce as its workhorse and employs default JDBC drivers for many database systems—or, if necessary, specialized drivers that speed up the data transfer.
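As a sketch of what such a transfer looks like, here is a hypothetical incremental Sqoop import; the connection string, credentials, table, and column names are all placeholders:

```shell
# Hypothetical example: incremental import of an "orders" table into HDFS.
sqoop import \
  --connect jdbc:mysql://db.example.com/sales \
  --username ingest --password-file /user/ingest/.db-password \
  --table orders \
  --columns "order_id,customer_id,amount,updated_at" \
  --incremental lastmodified --check-column updated_at \
  --last-value "2014-09-01 00:00:00" \
  --target-dir /data/staging/orders \
  --as-avrodatafile \
  --num-mappers 4
```

The --num-mappers flag controls how many parallel MapReduce tasks pull data, which you would tune to what the source database can sustain.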

The more complex batch ingest method is file loading. There are many ways to achieve this, but none is really established. In fact, when possible, it is better to switch to the event ingest explained below and avoid bulk loading of files altogether. The matter is complicated by the location of the files (on site or remote), as well as the API used to load them (the HDFS put command being the simplest one; there are also REST-based APIs with WebHDFS and HttpFS).

But how can you reliably ingest files without human intervention, as demanded by Big Data architecture? I have yet to see a complete solution here, and for now can only point to bespoke scripting (Bash, Python, Java, and so on) or the vast Hadoop partner field, which has lots to offer on the data integration topic.
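As an illustration of the kind of bespoke scripting involved, the sketch below selects files that are ready to load, using the common heuristic of skipping temporary suffixes and recently modified files. The suffix and age threshold are assumptions, and the actual upload (for example, an hdfs dfs -put) is left as a comment:

```python
import os
import time

def files_ready_for_ingest(landing_dir, min_age_seconds=60):
    """Return files that look complete: no temporary suffix and not
    modified within the last `min_age_seconds` (a common heuristic for
    detecting files that are still being written)."""
    ready = []
    now = time.time()
    for name in sorted(os.listdir(landing_dir)):
        path = os.path.join(landing_dir, name)
        if not os.path.isfile(path) or name.endswith(".tmp"):
            continue
        if now - os.path.getmtime(path) >= min_age_seconds:
            ready.append(path)
    return ready

# In a real pipeline, each ready file would then be pushed to HDFS,
# e.g. via `hdfs dfs -put <file> /data/landing/` or the WebHDFS REST API,
# and moved to an archive directory on success.
```

A scheduler (cron, or an Oozie coordinator as discussed later) would invoke such a script periodically.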

On their own, these tools are one-off jobs only—they get invoked and do their work. What is missing is automatic ingest so that the data pipeline is constantly processing data. We’ll pick that up in the “Productionization” section below.

Event Ingest

For event-based ingest there is Apache Flume, which allows you to define a redundant, failsafe network of so-called agents that transport event records from a generating system to a consuming one. The latter might be HDFS, but it can also be Spark or HBase, or a combination of them.

Flume has been battle-tested in large production clusters and allows you to reliably deliver data to where it is needed. The tricky part is configuring the Flume topology and the agents correctly. The agents need to be able to buffer enough data on persistent media so that all anticipated “normal” server failures are covered. Also, tuning the batch sizes of events that are sent between agents is vital to achieving either higher throughput or lower latencies (faster message delivery).

Data Staging

Once the data has arrived in Hadoop as a whole, there remains the task of staging it for processing. This is not just about storing it somewhere, but rather storing it in the right format, with the right size, and the right access mask.

Storage Formats

The right data format depends on the subsequent use case. Whether the application is batch or real-time is again relevant, but so is whether the format retains the full fidelity of data and is open source (i.e. can be used by more than one tool in the processing stage).

For batch, container file formats, including the venerable SequenceFile and Avro formats, are both useful and popular. As for analytical, real-time applications, the new rising star is Apache Parquet (incubating), which, similar to columnar databases, lays out the data in columns with built-in structure and compression (e.g., skipping NULL values), allowing you to scan very large data sets efficiently (assuming a selective query pattern).

In addition to the file format, you should also strongly consider encoding and compression formats, because the best I/O in Big Data is the I/O you are not doing. Compression reduces I/O, letting you move more data with fewer bytes. The proper approach is driven by CPU-versus-compression-ratio trade-offs, because the better a codec compresses, the more CPU it usually needs. Thus, the data we see is almost always compressed with the Snappy codec, which is super fast and lightweight yet offers decent compression ratios. For historical data, BZip2 or something similar is often used.
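The trade-off is easy to demonstrate with the codecs in the Python standard library. Snappy itself is not in the standard library, so zlib at a low compression level stands in here for the fast-and-light end of the spectrum, with bz2 at the high-ratio, high-CPU end:

```python
import bz2
import time
import zlib

# Repetitive, log-like payload; real ratios depend heavily on the data.
data = b"2014-09-10 12:00:00 INFO event=page_view user=12345\n" * 20000

for name, compress in [("zlib level 1 (fast, lighter)", lambda d: zlib.compress(d, 1)),
                       ("bz2 (slow, high ratio)", bz2.compress)]:
    start = time.time()
    out = compress(data)
    print("%-30s %8d -> %7d bytes (%.3fs)"
          % (name, len(data), len(out), time.time() - start))
```

Running this on your own representative data samples is the only reliable way to pick a codec for a given workload.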

It is also important to think about what happens with your data over time. You might want to implement policies that rewrite older data into different file or compression formats, so that you make better use of the available cluster capacity. As data ages and is accessed less often, it is worthwhile to trade back the compression ratio against CPU usage. Here there are no incumbent tools to help you out, and in the field I often see rather custom solutions (scripting again)… or none at all (which is not good).

Data Partitioning

As you land data, there is another important aspect to consider: how you partition or, more generally, size data. For starters, Hadoop is much better at managing a small number of very large files than many small ones. You do not want to design an architecture that lands many small files in HDFS and then be surprised when the NameNode starts to perform badly. You can, of course, land small files, but you would need to implement an ETL stage (or rather a TL stage, as no extract is needed) that combines smaller files into larger ones.

While you are transforming files as they arrive, the next step is to split them into decent chunks for later processing. This is usually done using partitions on HDFS. In HBase the partitioning is implicit as it divides data into regions of contiguous rows, sorted by their row key; it splits and rebalances as it goes along. For HDFS, you have to plan ahead of time—you might need to sample data and explore its structure to decide what is best for you. The rule of thumb, though, is for partitions to span at least a decent amount of data worth processing without creating the small-file problem mentioned above. I would advise you to start with a partition amounting to at least 1GB in a single file, and knowing the size of the total dataset, tune this up to even larger sizes. So for very large datasets in the hundreds of TBs and up, I would have each file in a partition be 10GB, or even 100GB or more.
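As a toy illustration of both points, here is how a date-based partition layout and the "files of a target size" rule of thumb might be computed. The path scheme and the 1GB default are assumptions taken from the discussion above, not fixed conventions:

```python
def partition_path(base, table, ts):
    """Hypothetical time-based partition layout: one directory per day."""
    return "%s/%s/year=%04d/month=%02d/day=%02d" % (
        base, table, ts.year, ts.month, ts.day)

def files_per_partition(partition_bytes, target_file_bytes=1 << 30):
    """How many output files to aim for so that each is roughly the
    target size (>= 1GB per file as a starting rule of thumb)."""
    return max(1, partition_bytes // target_file_bytes)
```

For a 5GB daily partition this suggests five 1GB files; for very large datasets you would raise the target file size instead, as discussed above.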

One final note: make sure the file format supports splitting the files into smaller blocks for parallel processing. The above suggested container formats usually do that, but you might want to double check (look for splittable support). If not, you can end up with suboptimal performance across the cluster because a single reader has to process a single large file (that is, your parallelism rate drops considerably).

Access Control

The last part you have to consider is what we call information architecture (IA), which addresses the need to lay out the data in such a way that multiple teams can work safely on a shared cluster—also referred to as multi-tenancy.

It is not enough to have each job read from one directory and emit to another. If you share a cluster across departments, you need to devise a concise access schema that controls tightly (and possibly supports auditing of) who has access to what data. The IA is where these rules are defined—for example, by using user groups and other HDFS features (see the new extended ACLs features in HDFS or Apache Sentry) to map business units into owners of data. With that, you can further define a plan on how data is read from storage during processing and pushed through the various stages of the data processing pipeline.
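As a hypothetical example of such a mapping, the extended ACLs in HDFS can grant a second group read access to a directory owned by another team; the group and path names below are placeholders:

```shell
# Give the sales group read/traverse access to marketing's shared data...
hdfs dfs -setfacl -m group:sales:r-x /data/marketing/shared
# ...and make new files and subdirectories inherit the same access.
hdfs dfs -setfacl -m default:group:sales:r-x /data/marketing/shared
# Verify the resulting ACL.
hdfs dfs -getfacl /data/marketing/shared
```

Alternatively, Sentry can manage equivalent permissions at the level of databases and tables rather than raw paths.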

One way to handle proper processing is to create a time-stamped directory for every running job and then, within it, a further directory structure for incoming files (for example, from a previous job), files currently being processed, and final (as well as permanently failed) files. This ensures that jobs can run in parallel without overwriting each other’s data mid-flight.
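A minimal sketch of that layout follows; the stage names and timestamp format are assumptions, not a standard:

```python
import os
from datetime import datetime

STAGES = ("incoming", "processing", "final", "failed")

def create_job_dirs(base, job_name, now=None):
    """Create a time-stamped working area for one pipeline run, with a
    separate directory per stage so parallel runs never collide."""
    now = now or datetime.utcnow()
    run_dir = os.path.join(base, job_name, now.strftime("%Y%m%d-%H%M%S"))
    for stage in STAGES:
        os.makedirs(os.path.join(run_dir, stage))
    return run_dir
```

In production the same structure would be created in HDFS rather than on a local filesystem, but the idea is identical.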

We won’t cover this issue in detail here, but IA should also account for data backups (for disaster recovery or load balancing). You need a strategy for moving data across multiple clusters or even data centers.

Data Processing

Thus far you have learned about landing and staging the incoming data. The next step is automatically processing it as part of the data pipeline.

Data Transformation

This is the part mentioned above, i.e., where you process existing data, for example, to transform it into other file formats or apply other compression algorithms. Just because you transform your data doesn’t mean you need to lose any of its detail: this is not your typical (often lossy) ETL, but rather an optional step to increase the effectiveness of your cluster. Plan to do whatever is needed for staging, which might also extend to rewriting data over time (or based on changing requirements). You could, for example, employ heuristics that check how often and in what way data is used and change its layout over time.


The more interesting part of processing is the analytics done on top of the staged data. Here we see the venerable MapReduce—now rejuvenated on top of YARN—as well as other frameworks, such as Spark or Apache Giraph. On top of that layer there are other abstractions in use, notably Apache Crunch and Cascading.

The currently most hyped topic is machine learning, wherein you build mathematical models for recommendations or clustering/classification of incoming new data—for example, to do risk assessment, fraud detection, or spam filtering. The more “mundane” tasks in analysis, such as building aggregations and reporting data, are still very common. In fact, the latter is more than 90% of the use cases we see, with the former being an emerging area.

Either way, after prototyping the algorithm and approach, you have to convert it into an automated workflow.

Egress and Querying

As for providing access to the data, you need methods that cover all types of users, from novices to experts. The access spans from ubiquitous Search using Apache Solr, to JDBC interfaces that SQL users and BI tools can use, all the way to low-level APIs—and eventually, raw file access. Regardless of the access method, the data is never copied or siloed into lesser data structures: all these tools work on the single source of truth represented as the full-fidelity files in HDFS or key-values in HBase. Whether you use Impala or Hive to issue SQL commands, the Kite SDK to read files, or process data with interactive Spark, you are always working on the same copy of data.

In fact, that’s what makes Hadoop so powerful, as it removes the need to move data around and transform it to “yet another (lesser) schema”. The integration of Hadoop into the enterprise IT landscape with Kerberos authentication, role-based authorization, and log-based auditing completes the picture.

Data Pipelines

Before we can automate, we have to combine the tools described above into more complex data pipelines. There are two main types of such pipelines: micro and macro.

Micro-pipelines are streamlined helpers that allow you to abstract (and therefore simplify) parts of the larger processing. Tools for this purpose include Morphlines (see “Introducing Morphlines: The Easy Way to Build and Integrate ETL Apps for Hadoop” for details), Crunch, and Cascading. Morphlines tie together smaller processing steps applied to each record or data pair as it flows through the processing. That lets you build tested, reusable processing sub-steps, used for example to cleanse data or enhance its metadata for later steps.

In contrast, Crunch and Cascading define an abstraction layer on top of the processing, where you deal with data points. You define how data is consumed, routed, processed, and emitted, which translates into one or more processing jobs on MapReduce and/or Spark. A Crunch or Cascading “meta” job can be further combined into yet more complex workflows, which is usually done in macro-pipelines.

Apache Oozie is one of those macro-pipeline tools. It defines workflows as directed acyclic graphs (DAGs) that have control and action elements, where the former influence how the flow proceeds and the latter define what has to be done at each step. Oozie also has a server component that tracks the running flows and handles their completion (or termination).

As with single jobs or micro-pipelines, a “workflow” is not automated but rather just a definition of work. It has to be invoked manually to start the flow processing. This is where another part of Oozie, the coordinators, comes in. Oozie coordinators help define the time or frequency at which a workflow should run, and/or its dependencies on other workflows and data sources. With this feature, you can supply the missing link in automating processing.
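For illustration, a coordinator that runs a workflow once per day, as soon as that day's input partition exists, might look like the following sketch (the names, paths, and dates are placeholders):

```xml
<!-- Illustrative Oozie coordinator: run a workflow daily once the
     day's input partition has landed. -->
<coordinator-app name="daily-aggregate" frequency="${coord:days(1)}"
                 start="2014-09-01T00:00Z" end="2015-09-01T00:00Z"
                 timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
  <datasets>
    <dataset name="events" frequency="${coord:days(1)}"
             initial-instance="2014-09-01T00:00Z" timezone="UTC">
      <uri-template>hdfs:///data/events/${YEAR}-${MONTH}-${DAY}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="events">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <app-path>hdfs:///apps/daily-aggregate/workflow.xml</app-path>
    </workflow>
  </action>
</coordinator-app>
```

The data dependency in input-events is what turns a manually invoked workflow into an automatically triggered one.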

Productionization

We have closed the loop above, and now data pipelines can run in an automated fashion, consuming and producing data as needed. But for a Big Data engineer, I argue there is one more piece to the puzzle: productionization.

Superficially, it sounds like a trivial task since the hard work has been “done.” In practice, this last step is a challenge because it spans the entire stack and requires careful release planning, with proper testing (QA), staging, and deployment phases. It also includes operating the data pipelines, which means monitoring, reporting, and alerting. Finally, insights about the performance of the pipelines might trigger cluster changes, from hardware to configuration settings.

There are some tools that help you along the way. For example, Cloudera Manager can track cluster utilization and job performance, and Cloudera Navigator can define data lifecycle rules, including metadata about the source and lineage of data. A Big Data engineer is still needed to fill in the gaps, while maintaining the entire pipeline in production.

The following diagram adds the discussed tools and concepts to the data pipeline architecture:


Please do consider the option of having constant support for your Hadoop cluster and data pipelines in production (but also in development). The complexity of running a cluster in such a mode is not trivial and can cause considerable delays when things go wrong. Cloudera Manager will help you tremendously in reducing the time to discover the root cause of a problem, and often you can apply a fix yourself. But there are also many issues we have seen in practice that require a Hadoop engineer to lend a helping hand. Obviously Cloudera has such a team of engineers, in fact a multilayered, dedicated team, which can help you solve any problem you might face.


While Hadoop has grown tremendously, there are still functional gaps for putting data pipelines into production easily, so skilled Big Data engineers are needed. Demand for these engineers is high and expected to grow, and Cloudera’s new “Designing and Building Big Data Applications” training course can teach you the skills you will need to excel in this role.

The Hadoop ecosystem, helpfully, offers most of the tools needed to build and automate these pipelines based on business rules—testing and deploying pipelines is easier with proper tooling support, while operating the same pipelines in production can be equally automated and transparent. As time moves on, missing functionality will be provided either by Cloudera, by third-party vendors, or as part of the open source ecosystem.

In a future world, we will be able to point Hadoop to a source, internal or external, batch or streaming, and press an “Implement Pipeline” button. The initial parameters will be assumed, and further learned and adjusted as needed, resulting in data being laid out for the current use case, be it interactive or automated (or both).

We can dream. In the meantime, happy Hadoop-ing!

Lars George is Cloudera’s EMEA Chief Architect, an HBase committer and PMC member, and the author of O’Reilly’s HBase: The Definitive Guide.

Categories: Hadoop

The Early Release Books Keep Coming: This Time, Hadoop Security

Cloudera Blog - Mon, 09/08/2014 - 15:33

Hadoop Security is the latest book in the Hadoop ecosystem books canon.

We are thrilled to announce the availability of the early release of Hadoop Security, a new book about security in the Apache Hadoop ecosystem published by O’Reilly Media. The early release contains two chapters on System Architecture and Securing Data Ingest and is available in O’Reilly’s catalog and in Safari Books.

The goal of the book is to serve the experienced security architect who has been tasked with integrating Hadoop into a larger enterprise security context. System and application administrators will also benefit from a thorough treatment of the risks inherent in deploying Hadoop in production and the associated how and why of Hadoop security.

As Hadoop continues to mature and become ever more widely adopted, material must become specialized for the security architects tasked with ensuring new applications meet corporate and regulatory policies. While it is up to operations staff to deploy and maintain the system, they won’t be responsible for determining what policies their systems must adhere to. Hadoop is mature enough that dedicated security professionals need a reference to navigate the complexities of security on such a massive scale. Additionally, security professionals must be able to keep up with the array of activity in the Hadoop security landscape as exemplified by new projects like Apache Sentry (incubating) and cross-project initiatives such as Project Rhino.

Security architects aren’t interested in how to write a MapReduce job or how HDFS splits files into data blocks; they care about where data is going and who will be able to access it. Their focus is on putting into practice the policies and standards necessary to keep their data secure. As more corporations turn to Hadoop to store and process their most valuable data, the risks associated with a potential breach of those systems increase exponentially. Without a thorough treatment of the subject, organizations will delay deployments or resort to siloed systems that increase capital and operating costs.

The first chapter available is on the System Architecture where Hadoop is deployed. It goes into the different options for deployment: in-house, cloud, and managed. The chapter also covers how major components of the Hadoop stack get laid out physically from both a server perspective and a network perspective. It gives a security architect the necessary background to put the overall security architecture of a Hadoop deployment into context.

The second available chapter, on Securing Data Ingest, covers the basics of Confidentiality, Integrity, and Availability (CIA) and applies them to feeding your cluster with data from external systems. In particular, the two most common data ingest tools, Apache Flume and Apache Sqoop, are evaluated for their support of CIA. The chapter details the motivation for securing your ingest pipeline and provides ample information and examples on how to configure these tools for your specific needs. The chapter also puts the security of your Hadoop data ingest flow into the broader context of your enterprise architecture.

We encourage you to take a look and get involved early. Security is a complex topic and it never hurts to get a jump start on it. We’re also eagerly awaiting feedback. We would never have come this far without the help of some extremely kind reviewers. You can also expect more chapters to come in the coming months. We’ll continue to provide summaries on this blog as we release new content so you know what to expect.

Ben Spivey is a Solutions Architect at Cloudera, and Joey Echeverria is a Software Engineer at Cloudera.

Categories: Hadoop

This Month in the Ecosystem (August 2014)

Cloudera Blog - Fri, 09/05/2014 - 18:09

Welcome to our 12th (first annual!) edition of “This Month in the Ecosystem,” a digest of highlights from August 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

  • Developers at Sigmoid Analytics described the Spork project, with the goal of giving Apache Pig access to Apache Spark as a data processing backend. Efforts to do the same for other ecosystem components are also underway (such as the Apache Mahout community’s movement of Mahout’s item-based collaborative filtering recommender to Spark).
  • The Transaction Processing Council (TPC) announced a new Big Data performance benchmark called TPCx-HS, largely based on TeraSort. To learn more about why TPCx-HS is a good first step toward establishing a useful benchmark in this area, read this interview with TPC-DS architect Francois Raab and Cloudera Performance Engineer Yanpei Chen.
  • The GlusterFS community announced that CDH 5 has been successfully tested on that filesystem. Similar work is underway for Impala specifically.
  • Apache Hadoop 2.5 was released. It includes extended file attributes for HDFS (HDFS-2006), a new feature that is explained in detail here.
  • An early release of the new O’Reilly Media book, Using Flume, became available. The author, Hari Shreedharan, is a Software Engineer at Cloudera and an Apache Flume committer/PMC member.
  • Kite SDK 0.16 was released.
  • Adobe Research open sourced Spindle, its web analytics processing system based on Spark, Apache Parquet (incubating), and CDH 4.7.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Pig is Flying: Apache Pig on Apache Spark

Cloudera Blog - Thu, 09/04/2014 - 16:10

Our thanks to Mayur Rustagi (@mayur_rustagi), CTO at Sigmoid Analytics, for allowing us to re-publish his post about the Spork (Pig-on-Spark) project below. (Related: Read about the ongoing upstream to bring Spark-based data processing to Hive here.)

Analysts can talk about data insights all day (and night), but the reality is that 70% of all data analyst time goes into data processing and not analysis. At Sigmoid Analytics, we want to streamline this data processing pipeline so that analysts can truly focus on value generation and not data preparation.

We focus our efforts on three simple initiatives:

  • Make data processing more powerful
  • Make data processing more simple
  • Make data processing 100x faster than before

As a data mashing platform, the first key initiative is to combine the power and simplicity of Apache Pig on Apache Spark, making existing ETL pipelines 100x faster than before. We do that via a unique mix of our operator toolkit, called DataDoctor, and Spark.

DataDoctor is a high-level operator DSL on top of Spark. It has frameworks for non-symmetrical joins, sorting, grouping, and embedding native Spark functions. It hides a lot of complexity and makes it simple to implement data operators used in applications like Pig and Apache Hive on Spark.

For the uninitiated, Spark is open source Big Data infrastructure that enables distributed fault-tolerant in-memory computation. As the kernel for the distributed computation, it empowers developers to write testable, readable, and powerful Big Data applications in a number of languages including Python, Java, and Scala.

How Can I Get Started?

As a user of Apache Pig, the migration effort starts and ends with

pig -x spark


All your existing UDFs, Pig scripts, and data loaders will work out of the box on Spark — which means you can write simpler, easier-to-develop-and-manage data pipelines on Spark. The Pig REPL is a simple way to speed up your data processing on Spark without any coding, compiling, or development effort. What’s more, you have thousands of Pig UDFs to choose from to bootstrap your ETL process on Spark.
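For instance, an unmodified Pig script like the hypothetical one below should behave the same under pig -x spark as it does under MapReduce (the paths and schema are made up for illustration):

```pig
-- events.pig: count events per action type.
raw    = LOAD '/data/events' USING PigStorage('\t')
         AS (user:chararray, action:chararray);
byact  = GROUP raw BY action;
counts = FOREACH byact GENERATE group AS action, COUNT(raw) AS n;
STORE counts INTO '/data/event_counts';
```

You would run it with pig -x spark events.pig, with no changes to the script itself.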

High-Level Design

Pig operates in a similar manner to Big Data applications like Hive and Cascading. It has a query language quite akin to SQL that allows analysts and developers to design and write data flows. The query language is translated into a “logical plan” that is further translated into a “physical plan” containing operators. Those operators are then run on the designated execution engine (MapReduce, Apache Tez, and now Spark). There are a whole bunch of details around tracking progress, handling errors, and so on that I will skip here.

Query Planning

Query planning on Spark will vary significantly from MapReduce, as Spark handles data wrangling in a much more optimized way. Further, query planning can benefit greatly from the ongoing work on Catalyst inside Spark. At this moment, we have simply introduced a SparkPlanner that undertakes the conversion from logical to physical plan for Pig. Databricks is working actively to enable Catalyst to handle much of the operator optimization that will plug into SparkPlanner in the near future. Longer term, we plan to rely on Spark itself for logical plan generation. An early version of this integration has been prototyped in partnership with Databricks.

Pig Launcher

Pig Core hands off Spark execution to SparkLauncher with the physical plan. SparkLauncher creates a SparkContext providing all the Pig dependency jars and Pig itself.

SparkLauncher gets an MR plan object created from the physical plan. At this point, we override all the Pig operators with DataDoctor operators recursively in the whole plan. Two iterations are performed over the plan: one that looks at the store operations and recursively travels down the execution tree, and a second that does a breadth-first traversal over the plan and calls convert on each of the operators.

The base class of converters in DataDoctor is the POConverter class, which defines the abstract method convert, called during plan execution.

Interesting Operators
  • LoadOperator: An RDD is created for the data that can be used for subsequent transformations. LoadConverter helps load data from HDFS using the Spark API with parameters initialized from POLoad operator.
  • StoreOperator: This operator is useful for saving the end results or some intermediate data whenever required. StoreConverter is used to save data to HDFS with parameters from POStore operator.
  • Local rearrange: LocalRearrangeConverter directly passes data to POLocalRearrangeConverter, which in turn transforms data into the required format. This happens through the Spark map API. The local rearrange operator is part of the COGROUP implementation. It has an embedded physical plan that generates tuples of the form (grpKey, (indexed input Tuple)).
  • Global rearrange: GlobalRearrangeConverter is used in case of a groupBy operation or a join operation; the converter method uses groupBy and map APIs from Spark to achieve that. In the case of a groupBy operation, results are converted into the form (key, Iterator(values)). In the case of a COGROUP operation, results are in the form (index, key, value).
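As a local, non-Spark illustration of the grouping shape described above (plain Java collections standing in for RDDs; the class and method names here are ours, not Pig's), grouping (key, value) pairs produces (key, list-of-values) results:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByShapeDemo {
  // Emulates the global-rearrange/groupBy result shape locally:
  // every value is filed under its key, yielding (key, values) pairs.
  static Map<String, List<String>> groupByKey(List<Map.Entry<String, String>> pairs) {
    Map<String, List<String>> grouped = new TreeMap<>();  // TreeMap: deterministic key order
    for (Map.Entry<String, String> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    // (key, value) pairs as a local rearrange might emit them.
    System.out.println(groupByKey(List.of(
        Map.entry("a", "1"), Map.entry("b", "2"), Map.entry("a", "3"))));
    // {a=[1, 3], b=[2]}
  }
}
```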

You can catch the finer details of the migration plan in PIG-4059, or give Pig on Spark a go at our GitHub repo. We know it's not perfect, so you can file issues there as well while we get the Apache JIRA into shape.


I am happy to announce that we have passed 100% of end-to-end test cases on Pig, which means all of your Pig code should already run pretty smoothly. Once this work is merged into the Pig repository, you will be able to get builds directly from the Pig website as well.

All this would not have been possible without the hard work from many organizations and people: Praveen R (Sigmoid Analytics), Akhil Das (Sigmoid Analytics), Kamal Banga (Sigmoid Analytics), Anish Haldiya (Sigmoid Analytics), Mayur Rustagi (Sigmoid Analytics), Amit Kumar Behera (Sigmoid Analytics), Mahesh Kalakoti (Sigmoid Analytics), Julien Le Dem (Twitter),  Bill Graham (Twitter), Dmitriy Ryaboy (Twitter), Aniket Mokashi (Google), and Greg Owen (Databricks).

Future Plans

Finally, as we merge this work into Apache Pig, we are focusing on the following enhancements to further improve the speed of Pig on Spark:

  • Cache Operator: Adding a new operator to explicitly hint Spark to cache certain datasets for faster execution
  • Storage Hints: Allowing users to specify the storage location of datasets in Spark for better control of memory
  • YARN and Mesos Support: Adding resource manager support for more global deployment and support

Mayur Rustagi has four years of experience in building end-to-end architecture for big data applications. He is currently the CTO of Sigmoid Analytics, which is focused on Real Time Streaming & ETL solutions on Apache Spark.


Categories: Hadoop

How-to: Translate from MapReduce to Apache Spark

Cloudera Blog - Tue, 09/02/2014 - 15:46

The key to getting the most out of Spark is to understand the differences between its RDD API and the original Mapper and Reducer API.

Venerable MapReduce has been Apache Hadoop‘s workhorse computation paradigm since its inception. It is ideal for the kinds of work for which Hadoop was originally designed: large-scale log processing and batch-oriented ETL (extract-transform-load) operations.

As Hadoop’s usage has broadened, it has become clear that MapReduce is not the best framework for all computations. Hadoop has made room for alternative architectures by extracting resource management into its own first-class component, YARN. And so, projects like Impala have been able to use new, specialized non-MapReduce architectures to add interactive SQL capability to the platform, for example.

Today, Apache Spark is another such alternative, and is said by many to succeed MapReduce as Hadoop’s general-purpose computation paradigm. But if MapReduce has been so useful, how can it suddenly be replaced? After all, there is still plenty of ETL-like work to be done on Hadoop, even if the platform now has other real-time capabilities as well.

Thankfully, it’s entirely possible to re-implement MapReduce-like computations in Spark. They can be simpler to maintain, and in some cases faster, thanks to Spark’s ability to optimize away spilling to disk. For MapReduce, re-implementation on Spark is a homecoming. Spark, after all, mimics Scala‘s functional programming style and APIs. And the very idea of MapReduce comes from the functional programming language LISP.

Although Spark’s primary abstraction, the RDD (Resilient Distributed Dataset), plainly exposes map() and reduce() operations, these are not the direct analog of Hadoop’s Mapper or Reducer APIs. This is often a stumbling block for developers looking to move Mapper and Reducer classes to Spark equivalents.

Viewed in comparison with classic functional-language implementations of map() and reduce() in Scala or Spark, the Mapper and Reducer APIs in Hadoop are actually both more flexible and more complex as a result. These differences may not even be apparent to developers accustomed to MapReduce, but the following behaviors are specific to Hadoop’s implementation rather than the idea of MapReduce in the abstract:

  • Mappers and Reducers always use key-value pairs as input and output.
  • A Reducer reduces values per key only.
  • A Mapper or Reducer may emit 0, 1 or more key-value pairs for every input.
  • Mappers and Reducers may emit any arbitrary keys or values, not just subsets or transformations of those in the input.
  • Mapper and Reducer objects have a lifecycle that spans many map() and reduce() calls. They support a setup() and cleanup() method, which can be used to take actions before or after a batch of records is processed.

This post will briefly demonstrate how to recreate each of these within Spark — and also show that it’s not necessarily desirable to literally translate a Mapper and Reducer!

Key-Value Pairs as Tuples

Let’s say we need to compute the length of each line in a large text input, and report the count of lines by line length. In Hadoop MapReduce, this begins with a Mapper that produces key-value pairs in which the line length is the key, and count of 1 is the value:

public class LineLengthMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}


It’s worth noting that Mappers and Reducers only operate on key-value pairs. So the input to LineLengthMapper, provided by a TextInputFormat, is actually a pair containing the line as value, with position within the file thrown in as a key, for fun. (It’s rarely used, but, something has to be the key.)

The Spark equivalent is:

lines.map(line => (line.length, 1))


In Spark, the input is an RDD of Strings only, not of key-value pairs. Spark’s representation of a key-value pair is a Scala tuple, created with the (a,b) syntax shown above. The result of the map() operation above is an RDD of (Int,Int) tuples. When an RDD contains tuples, it gains more methods, such as reduceByKey(), which will be essential to reproducing MapReduce behavior.

Reducer and reduce() versus reduceByKey()

To produce a count of line lengths, it’s necessary to sum the counts per length in a Reducer:

public class LineLengthReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}


The equivalent of the Mapper and Reducer above together is a one-liner in Spark:

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)


Spark’s RDD API has a reduce() method, but it will reduce the entire set of key-value pairs to one single value. This is not what Hadoop MapReduce does. Instead, Reducers reduce all values for a key and emit a key along with the reduced value. reduceByKey() is the closer analog. But, that is not even the most direct equivalent in Spark; see groupByKey() below.

It is worth pointing out here that a Reducer’s reduce() method receives a stream of many values, and produces 0, 1 or more results. reduceByKey(), in contrast, accepts a function that turns exactly two values into exactly one — here, a simple addition function that maps two numbers to their sum. This associative function can be used to reduce many values to one for the caller. It is a simpler, narrower API for reducing values by key than what a Reducer exposes.
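To make the contrast concrete, here is a local, plain-Java emulation of reduceByKey (our own sketch over ordinary collections, not Spark's implementation): all values for a key are folded two at a time with an associative function.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

public class ReduceByKeyDemo {
  // Fold all values per key with an associative two-argument function,
  // the way Spark's reduceByKey does across a partitioned dataset.
  static Map<Integer, Integer> reduceByKey(List<Map.Entry<Integer, Integer>> pairs,
                                           BinaryOperator<Integer> fn) {
    Map<Integer, Integer> result = new TreeMap<>();
    for (Map.Entry<Integer, Integer> pair : pairs) {
      // merge() inserts the value for a new key, or combines it with
      // the existing value using fn: exactly two values in, one out.
      result.merge(pair.getKey(), pair.getValue(), fn);
    }
    return result;
  }

  public static void main(String[] args) {
    // (lineLength, 1) pairs from three lines of lengths 3, 5, and 3.
    List<Map.Entry<Integer, Integer>> pairs = List.of(
        Map.entry(3, 1), Map.entry(5, 1), Map.entry(3, 1));
    System.out.println(reduceByKey(pairs, Integer::sum)); // {3=2, 5=1}
  }
}
```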

Mapper and map() versus flatMap()

Now, instead consider counting the occurrences of only words beginning with an uppercase character. For each line of text in the input, a Mapper might emit 0, 1 or many key-value pairs:

public class CountUppercaseMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    for (String word : line.toString().split(" ")) {
      if (Character.isUpperCase(word.charAt(0))) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}


The equivalent in Spark is:

lines.flatMap(
  _.split(" ")
   .filter(word => Character.isUpperCase(word(0)))
   .map(word => (word, 1))
)


map() will not suffice here, because map() must produce exactly one output per input, but unlike before, one line needs to yield potentially many outputs. Again, the map() function in Spark is simpler and narrower compared to what the Mapper API supports.

The solution in Spark is to first map each line to an array of output values. The array may be empty, or have many values. Merely map()-ing lines to arrays would produce an RDD of arrays as the result, when the result should be the contents of those arrays. The result needs to be “flattened” afterward, and flatMap() does exactly this. Here, the array of words in the line is filtered and converted into tuples inside the function. In a case like this, it’s flatMap() that’s required to emulate such a Mapper, not map().
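The same map-then-flatten behavior can be sketched locally with Java streams (a plain-Java stand-in for the Spark calls above, not Spark itself):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapDemo {
  // Emulates lines.flatMap(...): each line yields 0, 1, or many (word, 1)
  // pairs for words starting with an uppercase character, and flatMap
  // flattens the per-line results into one collection.
  static List<Map.Entry<String, Integer>> uppercaseWordPairs(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))
        .filter(word -> !word.isEmpty() && Character.isUpperCase(word.charAt(0)))
        .map(word -> Map.entry(word, 1))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(uppercaseWordPairs(List.of("Hello world", "Both Upper")));
    // [Hello=1, Both=1, Upper=1]
  }
}
```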


It’s simple to write a Reducer that then adds up the counts for each word, as before. And in Spark, again, reduceByKey() could be used to sum counts per word. But what if for some reason the output has to contain the word in all uppercase, along with a count? In MapReduce, that’s:

public class CountUppercaseReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
}


But reduceByKey() by itself doesn’t quite work in Spark, since it preserves the original key. To emulate this in Spark, something even more like the Reducer API is needed. Recall that Reducer’s reduce() method receives a key and Iterable of values, and then emits some transformation of those. groupByKey() and a subsequent map() can achieve this:

... .groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }


groupByKey() merely collects all values for a key together, and does not apply a reduce function. From there, any transformation can be applied to the key and Iterable of values. Here, the key is transformed to uppercase, and the values are directly summed.

Be careful! groupByKey() works, but also collects all values for a key into memory. If a key is associated with many values, a worker could run out of memory. Although this is the most direct analog of a Reducer, it’s not necessarily the best choice in all cases. For example, Spark could have simply transformed the keys after a call to reduceByKey():

... .reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total) }


It’s better to let Spark manage the reduction rather than ask it to collect all values just for us to manually sum them.

setup() and cleanup()

In MapReduce, a Mapper and Reducer can declare a setup() method, called before any input is processed, to perhaps allocate an expensive resource like a database connection, and a cleanup() method to release the resource:

public class SetupCleanupMapper extends Mapper {
  private Connection dbConnection;
  @Override
  protected void setup(Context context) {
    dbConnection = ...;
  }
  ...
  @Override
  protected void cleanup(Context context) {
    dbConnection.close();
  }
}


The Spark map() and flatMap() methods only operate on one input at a time though, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:

val dbConnection = ...
lines.map(... dbConnection.createStatement(...) ...)
dbConnection.close() // Wrong!


However, this fails for several reasons:

  • It puts the object dbConnection into the map function’s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.
  • map() is a transformation, rather than an action, and is lazily evaluated. The connection can’t be closed immediately here.
  • Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies.

In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark — it’s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It’s like a “bulk map” method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values.
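A local sketch of the "bulk map" idea (plain Java over an ordinary Iterator; class and method names are ours, not Spark's): because the function sees a whole Iterator, a resource can be set up once before the loop and released once after it, rather than per element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class MapPartitionsDemo {
  // A stand-in for an expensive resource like a database connection.
  static class FakeConnection {
    String lookup(String key) { return key.toUpperCase(); }
    void close() { /* release the resource */ }
  }

  // mapPartitions-style processing: one setup and one cleanup per
  // Iterator, not per element. (This version evaluates eagerly for
  // simplicity; real Spark iterators are lazy, which is exactly why
  // cleanup is tricky, as the post goes on to explain.)
  static List<String> mapPartition(Iterator<String> values) {
    FakeConnection conn = new FakeConnection(); // setup, once per partition
    List<String> out = new ArrayList<>();
    while (values.hasNext()) {
      out.add(conn.lookup(values.next()));
    }
    conn.close(); // cleanup, once per partition
    return out;
  }

  public static void main(String[] args) {
    System.out.println(mapPartition(List.of("a", "b").iterator())); // [A, B]
  }
}
```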

Adding setup code is simple; adding cleanup code is harder because it remains difficult to detect when the transformed iterator has been fully evaluated. For example, this does not work:

lines.mapPartitions { valueIterator =>
  val dbConnection = ... // OK
  val transformedIterator = valueIterator.map(... dbConnection ...)
  dbConnection.close() // Still wrong! May not have evaluated iterator
  transformedIterator
}


A more complete formulation (HT Tobias Pfeiffer) is roughly:

lines.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    Iterator[...]()
  } else {
    val dbConnection = ...
    valueIterator.map { item =>
      val transformedItem = ...
      if (!valueIterator.hasNext) {
        dbConnection.close()
      }
      transformedItem
    }
  }
}


Although decidedly less elegant than previous translations, it can be done.

There is no flatMapPartitions() method. However, the same effect can be achieved by calling mapPartitions(), followed by a call to flatMap(a => a) to flatten.
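Locally, that "mapPartitions() followed by flatMap(a => a)" trick can be sketched with Java streams (our own stand-in for the Spark calls, not Spark's API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapPartitionsDemo {
  // Each "partition" (a list of lines) maps to one list of outputs
  // (the mapPartitions step); flatMap(List::stream) then plays the
  // role of Spark's flatMap(a => a), flattening the per-partition lists.
  static List<String> flatMapPartitions(List<List<String>> partitions) {
    return partitions.stream()
        .map(part -> part.stream()                           // one partition at a time
            .flatMap(line -> Arrays.stream(line.split(" "))) // many outputs per line
            .collect(Collectors.toList()))
        .flatMap(List::stream)                               // flatten: flatMap(a => a)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(flatMapPartitions(
        List.of(List.of("a b", "c"), List.of("d")))); // [a, b, c, d]
  }
}
```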

The equivalent of a Reducer with setup() and cleanup() is just a groupByKey() followed by a mapPartitions() call like the one above. Take note of the caveat about using groupByKey() above, though.

But Wait, There’s More

MapReduce developers will point out that there is yet more to the API that hasn’t been mentioned yet:

  • MapReduce supports a special type of Reducer, called a Combiner, that can reduce shuffled data size from a Mapper.
  • It also supports custom partitioning via a Partitioner, and custom grouping for purposes of the Reducer via grouping Comparator.
  • The Context objects give access to a Counter API for accumulating statistics.
  • A Reducer always sees keys in sorted order within its lifecycle.
  • MapReduce has its own Writable serialization scheme.
  • Mappers and Reducers can emit multiple outputs at once.
  • MapReduce alone has tens of tuning parameters.

There are ways to implement or port these concepts into Spark, using APIs like the Accumulator, methods like groupBy() and the partitioner argument in various of these methods, Java or Kryo serialization, caching, and more. To keep this post brief, the remainder will be left to a follow-up post.

The concepts in MapReduce haven’t stopped being useful. They simply now have a different, and potentially more powerful, implementation on Hadoop, in a functional language that better matches their functional roots. Understanding the differences between Spark’s RDD API and the original Mapper and Reducer APIs helps developers better understand how all of them truly work, and how to use Spark’s counterparts to best advantage.

Sean Owen is Director of Data Science at Cloudera, an Apache Mahout committer/PMC member, and a Spark contributor.

Categories: Hadoop