New in CDH 5.1: Hue’s Improved Search App

Cloudera Blog - 2 hours 11 min ago

Hue 3.6 (now packaged in CDH 5.1) has brought the second version of the Search App up to even higher standards. The user experience has been greatly improved, as the app now provides a very easy way to build custom dashboards and visualizations.

Below is a video demo-ing how to interactively explore some real Apache log data coming from the live Hue demo at cloudera.com/live. In just a few clicks, you can look for pages with errors, find the most popular Hue apps, identify the top Web browsers, or inspect user traffic on a gradient colored world map:

The main features in the new app include:

  • Dynamic interface updating in live
  • Drag-and-drop dashboard builder
  • Text, Timeline, Pie, Line, Bar, Map, Filters, Grid and HTML widgets
  • Solr Index creation wizard from a file

More is on the roadmap, such as integration with other Hue apps like Hive/HBase, export/import of results to Hadoop, and more data types to plot.

This tutorial explains how to index the Apache Log into Solr and start doing your own analytics. In the meantime, feel free to give the search dashboards a try via Hue 3.6 in CDH 5.1!

As usual, we welcome any feedback via @gethuehue-user, or our community discussion forum.

Categories: Hadoop

What’s New in Kite SDK 0.15.0?

Cloudera Blog - Tue, 07/29/2014 - 16:42

Kite SDK’s new release contains new improvements that make working with data easier.

Recently, Kite SDK, the open source toolset that helps developers build systems on the Apache Hadoop ecosystem, became a 0.15.0. In this post, you’ll get an overview of several new features and bug fixes.

Working with Datasets by URI

The new Datasets class lets you work with datasets based on individual dataset URIs. Previously, you had to open a dataset repository even if you only needed to do one action, like load a dataset. In 0.15.0, the right repository will automatically be used based on the dataset URI, so there is no need to work with repositories directly for most applications.

The old way:

DatasetRepository repo = DatasetRepositories.open("repo:hdfs:/datasets"); Dataset<GenericRecord> events = repo.load("events");


The new way, 0.15.0 and later:

Dataset<GenericRecord> events = Datasets.load("dataset:hdfs:/datasets/events");


An added benefit of using a dataset URI rather than a repository URI is fewer configuration options. The Kite command-line tool now accepts a dataset URI in place dataset names and repository options like --use-hdfs and --directory. The repository still defaults to Apache Hive if just names are used.

The old way:

$ ./dataset create users --use-hdfs --directory /datasets


The new way, 0.15.0 and later:

$ ./dataset create dataset:hdfs:/datasets/users


Dataset URIs are defined by the dataset implementations, but are mostly made from adding a dataset name to the repository URI and changing the prefix to “dataset”. Here are the basic URI patterns:

  • Local FS – dataset:file:/<path>/<dataset-name>
  • HDFS – dataset:hdfs:/<path>/<dataset-name>
  • Hive (external) – dataset:hive:/<path>/<dataset-name>
  • Hive (managed) – dataset:hive?dataset=<dataset-name>
  • HBase – dataset:hbase:<zk-hosts>/<dataset-name>

This release also includes experimental support for view URIs, which will be expanded in the next release to support Apache Oozie integration.

Improved Configuration for MR and Apache Crunch Jobs

The MapReduce input and output formats now use dataset (or view) URIs and a configuration builder that is easier to read:

Dataset eventsBeforeToday = Datasets.load("dataset:hive?dataset=events") .toBefore("timestamp", startOfToday()); DatasetKeyInputFormat.configure(mrJob).readFrom(eventsBeforeToday); DatasetKeyOutputFormat.configure(mrJob).writeTo("dataset:hive?dataset=sessions");


Crunch support received a similar update:

PCollection<Record> logs = getPipeline().read( CrunchDatasets.asSource("view:hdfs:/datasets/logs?level=WARN")); Command-line tool additions


We’ve added a copy command to the CLI tool that can be used to bulk copy one dataset or view into another. By default, it compacts the dataset into one output file per partition, but can be used to do map-only copies. Using dataset URIs, the tool will copy between any two datasets, including datasets stored in HBase.

# copy a Hive table into HBase $ ./dataset copy ratings dataset:hbase:zk1/ratings --no-compaction


CSV imports will now use the same copy task and can import the data in parallel if the source CSV files are in HDFS.

Parent POM for Kite Applications

Kite 0.15.0 includes a Maven POM file that can be used to reduce the annoyance of managing Hadoop dependencies in Maven projects. You can add it to your project by adding it as a parent POM. Then, your project will inherit a consistent set of dependencies for Kite, Hadoop, and other integrated projects from the Kite application POM, and will be updated when you change your Kite version.

<parent> <groupId>org.kitesdk</groupId> <artifactId>kite-app-parent-cdh4</artifactId> <version>0.15.0</version> </parent>


The application parent POM also configures Kite and Apache Avro / Maven integration plugins that you can turn on by adding a four-line plugin entry to your build.

<build> <plugins> <!-- compile schemas in src/main/avro to specific records --> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> </plugin> </plugins> </build>


The POMs for the Kite examples now use the Kite parent POM, and are much smaller as a result. The demo application’s POM is a good example of using Kite to manage Hadoop dependencies, and adding just the application-specific dependencies in the app’s POM.

Java Class Hints

We’ve added Java class arguments to the load, create, and update methods in the API that return datasets. The current DatasetRepository behavior hasn’t changed, but in the Datasets API, the class argument is needed any time you use specific or reflected objects.

Dataset<GenericRecord> generics = Datasets.load("dataset:hdfs:/data/events"); Dataset<Event> events = Datasets.load("dataset:hdfs:/data/events", Event.class);


This is needed for Kite to ensure it can produce the requested class or throw a helpful error message. Before this fix, Kite would happily produce a generic record if it can’t load your specific class, which causes a confusing ClassCastException somewhere in your code instead of telling you what went wrong.

The new class hint argument also makes it possible for you to request a generic object even if your specific class is available and it fixes a class loading bug that caused Avro to incorrectly load generic objects.

More Docs and Tutorials

The last addition this release is a new user guide on kitesdk.org, where we’re adding new tutorials and background articles. We’ve also updated the examples for the new features, which is a great place to learn more about Kite.

Also, watch this technical webinar on-demand to learn more about working with datasets in Kite.

Ryan Blue is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.1: Apache Spark 1.0

Cloudera Blog - Mon, 07/28/2014 - 17:03

Spark 1.0 reflects a lot of hard work from a very diverse community.

Cloudera’s latest platform release, CDH 5.1, includes Apache Spark 1.0, a milestone release for the Spark project that locks down APIs for Spark’s core functionality. The release reflects the work of hundreds of contributors (including our own Diana Carroll, Mark Grover, Ted Malaska, Colin McCabe, Sean Owen, Hari Shreedharan, Marcelo Vanzin, and me).

In this post, we’ll describe some of Spark 1.0’s changes and new features that are relevant to CDH users.

API Incompatibilities

In anticipation of some features coming down the pipe, the release includes a few incompatible changes that will enable Spark to avoid breaking compatibility in the future. Most applications will require a recompile to run against Spark 1.0, and some will require changes in source code.

There are two changes in the core Scala API:

  • The cogroup and groupByKey operators now return Iterators over their values instead of Seqs. This change means that the set of values corresponding to a particular key need not all reside in memory at the same time.
  • SparkContext.jarOfClass now returns Option[String] instead of Seq[String].

Spark’s Java APIs were updated to accommodate Java 8 lambdas. Details on these changes are available under the Java section here.

The MLLib API contains a set of changes that allows it to support sparse and dense vectors in a unified way. Details on these changes are available here. (Note that MLLib is still a beta component, meaning that its APIs may change in the future.)

Details on the future-proofing of Spark streaming APIs are available here.


Most Spark programming examples focus on spark-shell, but prior to 1.0, users who wanted to submit compiled Spark applications to a cluster found it to be a convoluted process requiring guess-and-check, different invocations depending on the cluster manager and deploy mode, and oodles of boilerplate. Spark 1.0 includes spark-submit, a command that abstracts across the variety of deploy modes that Spark supports and takes care of assembling the classpath for you. A sample invocation:

spark-submit --class com.yourcompany.MainClass --deploy-mode cluster --master yarn appjar.jar apparg1 apparg2


Avro Support

We fixed a couple critical Apache Avro bugs that were preventing Spark from reading and writing Avro data. Stay tuned for a future post explaining best practices on interacting with Avro and Apache Parquet (incubating) data from Spark.

PySpark on YARN

One of the remaining items in Spark on YARN compared to other cluster managers was lack of PySpark support. Spark 1.0 allows you to launch PySpark apps against YARN clusters. PySpark currently only works in yarn-client mode. Starting a PySpark shell against a YARN installation is as simple as running:

MASTER=yarn-client pyspark


and running a PySpark script is as simple as running:

spark-submit --master yarn yourscript.py apparg1 apparg1


Spark History Server

A common complaint with Spark has been that the per-application UI, which displays task metrics and other useful information, disappears after an app completes. That leaves users in a rut when trying to debug failures. Instead, Spark 1.0 offers a History Server that displays information about applications after they have completed. Cloudera Manager provides easy setup and configuration of this daemon.

Spark SQL

Spark SQL, which deserves a blog post of its own, is a new Spark component that allows you to run SQL statements inside of a Spark application that manipulate and produce RDDs. Due to its immaturity and alpha component status, Cloudera does not currently offer commercial support for Spark SQL. However, we bundle it with our distribution so that users can try it out.

A Note on Stability

While we at Cloudera are quite bullish on Spark, it’s important the acknowledge that even its core components are not yet as stable as many of the more mature Hadoop ecosystem components. The 1.0 mark does not mean that Spark is now bug-free and ready to replace all your production MapReduce uses — but it does mean that people building apps on top of Spark core should be safe from surprises in future releases. Existing APIs will maintain compatibility, existing deploy modes will remain supported, and the general architecture will remain the same.


Cloudera engineers are working hard to make Spark more stable, easier to use, easier to debug, and easier to manage. Expect future releases to greater robustness, enhanced scalability, and deeper insight into what is going on inside a Spark application, both while running and after it has completed.

Sandy Ryza is a data scientist at Cloudera, and an Apache Hadoop committer.

Categories: Hadoop

New in Cloudera Manager 5.1: Direct Active Directory Integration for Kerberos Authentication

Cloudera Blog - Fri, 07/25/2014 - 15:28

With this new release, setting up a separate MIT KDC for cluster authentication services is no longer necessary.

Kerberos (initially developed by MIT in the 1980s) has been adopted by every major component of the Apache Hadoop ecosystem. Consequently, Kerberos has become an integral part of the security infrastructure for the enterprise data hub (EDH).

Until recently, the preferred architecture was to configure your Hadoop cluster to connect directly to an MIT key distribution center (KDC) for authentication services. However, many enterprises already use Active Directory (which has built-in support for Kerberos) in their environments for authentication. Thus, to use Active Directory with Hadoop, those organizations would typically need to set up their Kerberos KDC for one-way trust with the Active Directory KDC.

This task, however, is not easy; configuring and managing a MIT KDC can involve substantial overhead. Fortunately, Cloudera has long provided the simplest experience for enabling Kerberos on a Hadoop cluster, and with Cloudera Manager 5.1 (download), is now the only vendor that lets users easily integrate with an Active Directory KDC — thereby eliminating the need to have a separate MIT KDC and further reducing complexity and removing opportunities for security misconfiguration. Note that this functionality is in addition to the existing feature in Cloudera Manager for managing clusters with MIT KDC (with and without one-way trust to Active Directory) that has been present for more than three years now.

Cloudera Manager 5.1 has also added a new wizard that takes users through all the steps involved with enabling Kerberos for Hadoop clusters, including generating and deploying Kerberos client configuration (krb5.conf), configuring CDH components, and generating the principals needed by all the processes running in the cluster.

In the remainder of this blog post, you’ll learn how to use that wizard to set up a Kerberized cluster using an Active Directory KDC directly.

Before Starting the Wizard

Cloudera Manager requires an Account Manager user that has privileges to create other accounts in Active Directory. For our example here, presume the existence of an Organizational Unit (OU) where we keep all the accounts needed by the cluster. This also ensures that the Account Manager user doesn’t create any accounts outside of the OU. The OU is called “edhCluster” and Account Manager user is called “edh-account-manager.”

You can use the Active Directory Delegate Control wizard to grant this user permission to create other users by checking the option to “Create, delete and manage user accounts” as shown in the screenshot below.

Creating the Account Manager User

Next step is to install additional packages required for the direct-to-AD setup:

  1. Install OpenLDAP utilities (openldap-clients on RHEL/Centos) on the host of Cloudera Manager server.
  2. Install Kerberos client (krb5-workstation on RHEL/Centos) on all hosts of the cluster.

Now you are ready to enter the wizard that will enable Kerberos for your cluster.

Walking Through the Wizard

The wizard is triggered by going to the Actions menu of your cluster on the Home page of the Cloudera Manager UI as shown below.

Triggering the Enable Kerberos Wizard

The first page of the wizard contains a checklist to inform you about the prerequisites of setting up a Kerberized cluster:

The Welcome Page

The next page has fields to provide information about your KDC. In this example, I’ve selected Active Directory as the KDC type and specified the hostname of the domain controller in KDC Server Host field. Additionally, you’d need to provide the OU ( ou=edhCluster,dc=ent,dc=cloudera,dc=com, for example) where all the accounts will be created and the Kerberos realm you would like to use for the cluster. If you want Cloudera Manager to generate and deploy Kerberos client configs (krb5.conf) on your cluster instead of doing it manually, you should check “Manage krb5.conf” through Cloudera Manager. You can also optionally provide an account prefix that will be added to all the accounts created by Cloudera Manager to easily identify them. In the example below, the prefix is set as edhCluster.

Enter KDC information

The advanced Kerberos client configuration page is next, which you’d typically use if you have a complex setup that involves cross-realm trust. If you are going to have a simple direct-to-Active Directory setup only, you can move on to the next section. 

Advanced krb5.conf configuration (can be skipped for Direct-to-AD setup)

Next, you are going to enter the username and password for the Account Manager user (edh-account-manager) you created in Active Directory before entering the wizard. Cloudera Manager will generate an encrypted keytab with the credentials and use it whenever it needs to create new accounts.

Enter Account Manager credentials

The final piece of information you need to provide is the privileged ports that are needed by HDFS DataNodes in a secure cluster. The wizard recommends defaults that you can use. 

Enter privileged ports used by DataNodes

That’s it! Now Cloudera Manager starts a workflow that would run all the steps involved in enabling Kerberos for the cluster as shown below. Once the workflow is finished, only the users present in Active Directory (in any OU) will be able to authenticate to the cluster.

The final workflow

New Active Directory Accounts

Below is a screenshot of the OU after Cloudera Manager has created all the accounts needed by these processes. As you can see, all the auto-generated accounts have the prefix edhCluster in the display names. One important point to note is that Cloudera Manager sets randomly generated passwords for all the accounts. These passwords are also stored in encrypted keytabs and used when starting a process on the cluster (that is, the passwords themselves are unknown even to Cloudera Manager once the account is created).

Accounts created by Cloudera Manager

Once the initial setup has been done for enabling Kerberos for the cluster, Cloudera Manager will also automatically create any new accounts that are needed when new hosts are added to your cluster or when you’re adding a new service.

Next Steps

Now that you have added Kerberos based authentication to your cluster, you can further add authorizations for the data present in the cluster using Apache Sentry (incubating). Cloudera Manager also supports configuring LDAP group mappings for Hadoop.


As you can see, Cloudera Manager makes direct-to-AD Kerberos setup extremely simple and you can secure your cluster without having to worry about complex MIT KDC configuration and management. Learn more about configuring security using Cloudera Manager in the Security Guide.

Vikram Srivastava is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.1: Document-level Security for Cloudera Search

Cloudera Blog - Wed, 07/23/2014 - 15:24

Cloudera Search now supports fine-grain access control via document-level security provided by Apache Sentry.

In my previous blog post, you learned about index-level security in Apache Sentry (incubating) and Cloudera Search. Although index-level security is effective when the access control requirements for documents in a collection are homogenous, often administrators want to restrict access to certain subsets of documents in a collection.

For example, consider a simple hierarchy of increasingly restrictive security classifications: confidential, secret, and top-secret, and a user with access to view confidential and secret documents querying the corpus. Without document-level security, this query becomes unnecessarily complex. Consider two possible implementations:

  • You could store the confidential and secret documents in non-intersecting collections. That would require complexity at the application or client level to query multiple collections and to combine and score the results:

    • You could duplicate and store the confidential documents with the secret ones in a single collection. That would reduce the application-layer complexity, but add storage overhead and complexity associated with keeping multiple copies of documents in sync:

    In contrast, document-level security, integrated via Sentry and now shipping in CDH 5.1, provides an out-of-the-box solution to this problem without adding extra complexity at the application/client layer or significant storage overhead. In this post, you’ll learn how it works. (Note: only access control is addressed here; other security requirements such as encryption are out of scope.)

    Document-Level Security Model

    You may recall from my previous post that a Sentry policy file specifies the following sections:

    • [groups]: maps a Hadoop group to its set of Sentry roles
    • [roles]: maps a Sentry role to its set of privileges (such as QUERY access on a collection “logs”)

    A simple policy file specification giving every user of the hadoop group “ops” the ability to query collection “logs” would look like this:

    [groups] # Assigns each Hadoop group to its set of roles ops = ops_role [roles] ops_role = collection = logs->action=Query,


    In document-level security, the Sentry role names are used as the authorization tokens that specify the set of roles that can view certain documents. The authorization tokens are specified in the individual Apache Solr documents, rather than in the Sentry policy file with the index-level permissions. This separation is done for a couple of reasons:

    • There are many more documents than collections; specifying thousands or millions of document-level permissions per collection in a single policy file would not scale.
    • Because the tokens are indexed in the Solr documents themselves, we can use Solr’s built-in filtering capabilities to efficiently enforce authorization requirements.

    The filtering works by having a Solr SearchComponent intercept the query and append a FilterQuery as part of the following process:

    A few important considerations to note here:

    • Document-level authorization does not supersede index-level authorization; if a user has the ability to view a document according to document-level security rules, but not according to index-level security rules, the request will be rejected.
    • The document-level component adds a FilterQuery with all of the user’s roles OR’ed together (a slight simplification of the actual FilterQuery used). Thus, to be able to view the document, the document must contain at least one of the user’s roles in the authorization token field. The name of the token field (called “authField” in the image above) is configurable.
    • Because multiple FilterQuerys work together as an intersection, a malicious user can’t avoid the document-level filter by specify his/her own trivial FilterQuery (such as fq=*:*)
    • Using a FilterQuery is efficient, because Solr caches previously used FilterQuerys. Thus, when a user makes repeated queries on a collection with document-level security enabled, we only pay the cost of constructing the filter on the first query and use the cached filter on subsequent requests
    Enabling Document-Level Security

    By default, document-level security is disabled to maintain backward compatibility with prior versions of Cloudera Search.  Enabling the feature for a collection involves small modifications to the default solrconfig.xml configuration file:

    <searchComponent name="queryDocAuthorization" class="org.apache.solr.handler.component.QueryDocAuthorizationComponent" > <!-- Set to true to enable document-level authorization --> <bool name="enabled">false</bool> <!-- Field where the auth tokens are stored in the document --> <str name="sentryAuthField">sentry_auth</str> ... </searchComponent>


    Simply changed enabled from “false” to “true” and if desired, change the sentryAuthField field. Then, upload the configuration and create the collection using Solrctl.

    Integration with the Hue Search App

    As with index-level security, document-level security is already integrated with the Hue Search App via secure impersonation in order to provide an intuitive and extensible end-user application.


    CDH 5.1 brings fine-grain access control to Cloudera Search via the integration of Sentry’s document-level security features.  Document-level security handles complex security requirements, while being simple to setup and efficient to use.

    Cloudera Search is available for download with extensive documentation. If you have any questions, please contact us at the Cloudera Search Forum.

    Gregory Chanan is a Software Engineer at Cloudera, and an Apache HBase committer.

Categories: Hadoop

New Apache Spark Developer Training: Beyond the Basics

Cloudera Blog - Mon, 07/21/2014 - 14:54

While the new Spark Developer training from Cloudera University is valuable for developers who are new to Big Data, it’s also a great call for MapReduce veterans.

When I set out to learn Apache Spark (which ships inside Cloudera’s open source platform) about six months ago, I started where many other people do: by following the various online tutorials available from UC Berkeley’s AMPLab, the creators of Spark. I quickly developed an appreciation for the elegant, easy-to-use API and super-fast results, and was eager to learn more.

Unfortunately, that proved harder than I expected. It was easy to pick up the basic syntax for working with Spark’s core abstraction, Resilient Distributed Datasets (RDDs). But in order to be able to use Spark to solve real-world problems, I needed a deeper understanding, I needed realistic examples, guidance on best practices, and of course, lots and lots of practice.

There is a wealth of information on the internet — videos, tutorials, academic papers, and a terrific active user community — but I had to spend a lot of time weeding through it to find what I need. Because Spark is so new, and changing so fast, it was hard to know on which information I could rely.

So, I set out to build a training course that goes beyond the basics, and give developers what they need to start using Spark to solve their Big Data problems: Cloudera Developer Training for Apache Spark.

Beyond WordCount

Understanding how Spark operates under the hood is the key to writing efficient code that best takes advantage of Spark’s built-in features. The course certainly covers the basics, like how to create and operate on RDDs, but it quickly goes beyond them. For instance, you’ll explore how Spark constructs Directed Acyclic Graphs, or DAGs, in order to execute tasks in parallel. You’ll learn how “narrow” operations like maps are pipelined together on a single node, whereas “wide” operations like grouping and reducing require shuffling results between cluster nodes:

Armed with this knowledge, you’ll learn techniques for minimizing expensive network shuffling like using shared variables, and favoring reduce operations (which reduce the data locally before shuffling across the network) over grouping (which shuffles all the data across the network).

Focus on Performance and Best Practices

Spark’s ability to distribute, cache, and process data in-memory offers huge advantages over MapReduce for important data processing jobs such as graph analysis and machine learning.  This course focuses on best practices for taking advantage of Spark’s capabilities  — such as how, when, and why to cache data in-memory, on local disk, or in HDFS. You will also learn about common performance bottlenecks and how to diagnose performance issues using the Spark Application Web UI. You will get hands-on experience solving performance problems using shared variables, checkpointing, and repartitioning.

Practice, Practice, Practice

I know from nearly 20 years of educating software developers that the only way to learn a new technology is to practice applying it – not just typing in commands as instructed, but by actually having to apply knowledge to real-world problems. And when I’m learning a new technology, I learn as much through my mistakes as my successes.

I wrote the course exercises to start with simple step-by-step instructions, and then moved on to challenge participants to think about how to solve realistic data processing problems like text-mining log files, correlating data from different sources in a variety of formats, and implementing analysis algorithms.

No Experience Necessary

I designed this course to be equally useful whether you are brand new to Big Data processing or an old hand at MapReduce and related technologies. Code examples and exercise solutions are available as either Python or Scala, so the only course requirement is experience developing applications in one of those two languages. We cover key related technologies such as cluster management, distributed file storage, and functional programming, with pointers to additional material for further study provided.

The growth and popularity of Spark over the last year has been huge, and adoption is accelerating; soon Spark will overtake MapReduce as the dominant technology for Big Data processing and analysis. This course is a great way for developers to get up to speed quickly and start using Spark to build faster, more flexible, easier-to-use applications.

Learn More

If you’d like to learn more, Cloudera is hosting a free webinar introducing Cloudera Developer Training for Apache Spark on Wed., July 23, at 10am PT/1pm ET. It will cover more about the course’s objectives, outline, prerequisites, and technical and business benefits, including a portion of the full training, plus Q&A. Register now!

You can also enroll in the full Spark Developer course by visiting Cloudera University. Public classes start in August and are currently scheduled in Redwood City, Calif., and Austin, with more class dates coming soon. Private training for your team is also available at your location and according to your schedule, so contact us for more information.

Diana Carroll is a curriculum designer for Cloudera University.

Categories: Hadoop

Cloudera Enterprise 5.1 is Now Available

Cloudera Blog - Thu, 07/17/2014 - 20:30

Cloudera Enterprise’s newest release contains important new security and performance features, and offers support for the latest innovations in the open source platform.

We’re pleased to announce the release of Cloudera Enterprise 5.1 (comprising CDH 5.1, Cloudera Manager 5.1, and Cloudera Navigator 2.0).

Cloudera Enterprise 5, released April 2014, was a milestone for users in terms of security, performance, and support for the latest community-driven innovations, and this update includes significant new investments in those areas, as well as a host of bug fixes. Here are some of the highlights (incomplete; see the Release Notes for CDH, Cloudera Manager, and Cloudera Navigator for a full list of features and fixes):

  • HDFS now includes support for access control lists (ACLs).
  • Apache Sentry (incubating) now supports GRANT/REVOKE statements.
  • Cloudera Search now supports document-level security (via Sentry).
  • Cloudera Manager has several new security-related features, such as ability to manage/deploy Kerberos configurations, integrate Kerberos with Active Directory, and manage Hadoop/CDH SSL-related configurations.
  • Spark Streaming is now integrated with Kerberos.
  • Cloudera Navigator 2.0 now provides comprehensive metadata, lineage, and auditing support across enterprise data hubs. Navigator also now includes enterprise-grade encryption and key management via Navigator Encrypt and Navigator Key Trustee, respectively.
  • Impala now utilizes HDFS caching for improved performance.
  • Impala queries and COMPUTE STATS statements are significantly faster.
  • HBase has improved write performance for WAL.
Support for the Latest Open Source Innovations

Cloudera Enterprise 5.1 is re-based on the latest stable component releases, including:

  • Apache Crunch 0.10
  • Apache Flume 1.5.0
  • Apache HBase 0.98.1
  • Apache Mahout  0.9.0
  • Apache Sentry (incubating) 1.3
  • Apache Spark 1.0
  • HBase Lily Indexer 1.5
  • Hue 3.6
  • Impala 1.4

…with new platform support for:

  • RHEL 6.5/CentOS 6.5
  • OEL 6.5 with UEK 2 and UEK3
  • MySQL 5.6
  • PostgreSQL 9.2 

Furthermore, this release contains a number of enhancements in the areas of resource management (now supports three different modes of Impala RM via YARN), and SQL support (DECIMAL support across Apache Hive, Impala, Apache Avro, Apache Parquet [incubating], and text file formats; plus ORDER BY without LIMIT in Impala).

Over the next few weeks, we’ll be publishing blog posts that cover a number of these new features in detail. In the meantime:

As always, we value your feedback; please provide any comments and suggestions through our community forums. You can also file bugs via issues.cloudera.org.

Categories: Hadoop

Jay Kreps, Apache Kafka Architect, Visits Cloudera

Cloudera Blog - Wed, 07/16/2014 - 21:42

It was good to see Jay Kreps (@jaykreps), the LinkedIn engineer who is the tech lead for that company’s online data infrastructure, visit Cloudera Engineering yesterday to spread the good word about Apache Kafka.

Kafka, of course, was originally developed inside LinkedIn and entered the Apache Incubator in 2011. Today, it is being widely adopted as a pub/sub framework that works at massive scale (and which is commonly used to write to Apache Hadoop clusters, and even data warehouses).

Perhaps the most interesting thing about Kafka is its treatment of the venerable commit log as its inspiring abstraction. As Jay puts it, the log is “the natural data structure for handling data flow between systems” — and he describes that approach is “pub/sub done right” (much more detail about this important concept here).

In his talk, Jay first described the objectives that brought Kafka into being, including the need to build data pipelines across a highly heterogeneous environment (and for highly heterogeneous data), and without ending up with a spaghetti diagram of specialized, one/off connections. The requirements involved will not be unfamiliar to most of this blog’s readers: fast, filesystem-like performance; built-in failover; and an inherently distributed architecture (including replication, partitioning, and so on). LinkedIn explored several COTS alternatives, all of which failed to tick at least one of these checkboxes, before going homegrown.

Today, Kafka is the dataflow glue that binds LinkedIn’s data infrastructure, keeping its online systems Espresso, Voldemort, graph databases, search, and so on in sync with offline ones (Hadoop and the enterprise data warehouse) — with 7 million messages written per second, and 35 million read per second.

Jay kindly agreed to let us share his presentation with you, and here it is:

Thanks Jay, we really appreciate your visit!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

The New Hadoop Application Architectures Book is Here!

Cloudera Blog - Tue, 07/15/2014 - 15:50

There’s an important new addition coming to the Apache Hadoop book ecosystem. It’s now in early release!

We are very happy to announce that the new Apache Hadoop book we have been writing for O’Reilly Media, Hadoop Application Architectures, is now available as an early release! It contains the first two chapters and can be found in O’Reilly’s Catalog and via Safari.        

The goal of this book is to give developers and architects guidance on architecting end-to-end solutions using Hadoop and tools in the ecosystem. We have split the book into two broad sections: the first section discusses various considerations for designing applications, and the second section describes the architectures of some of the most common applications of Hadoop and their architecture, thereby applying the considerations learned in the previous section.

The two chapters that are now available concentrate on design considerations for data modeling and data movement in Hadoop. For example, have you ever wondered:

  • Should your application store data in HDFS or Apache HBase?
  • If HDFS, in what format should you store your data? What compression codec should you use? What should your HDFS directories be called, which users should own them? What should be your partitioning columns? In general, what are the best practices for designing your HDFS schema?
  • If HBase, how can you best design your HBase schema?
  • What’s the best way to store and access metadata in Hadoop? What types of metadata are involved?
  • What are the considerations for designing schema for SQL-on-Hadoop (Apache Hive, Impala, HCatalog) tables?

In Chapter 1 – Data Modeling, we discuss considerations for above and many other questions to guide you with data modeling for your application.

And, if you have ever wondered:

  • How much latency is OK for your end users – a few seconds, minutes, or hours? How does the latency change the complexity of your design?
  • Which tools should you use for ingesting data into your cluster — file copy, Apache Flume, Apache Sqoop, Apache Kafka – and why?
  • Which tools should you use for egress of data out of your cluster — file copy, Sqoop, and so on?
  • Should you ingest or egress incrementally or overwrite it on every run? When using Flume, what kinds of sources, channels, sinks should you use?
  • When using Sqoop, how do you choose a split-by column, and tune your Sqoop import?
  • When using Kafka, how do you integrate Kafka with Hadoop and the rest of its ecosystem? 

Then Chapter 2 – Data Movement, is for you.

As you may have noticed, the questions above are fairly broad, and the answers rely heavily on understanding your application and its use case. So, we provide a very holistic set of considerations, and offer recommendations based on those considerations when designing your application.

We encourage you to check us out, get involved early, and explore the answers to the above questions. And, of course, we always value your feedback – whether it’s about errata and improvements or topics that you’d like to learn more about.

The work we have done so far wouldn’t have been possible without the encouragement, support, and reviews of many people. Thanks to all our reviewers thus far!

Mark Grover is a Software Engineer at Cloudera, an Apache Bigtop committer, Apache Sentry (incubating) PMC member, and  contributor to Hive, Flume, and Sqoop.

Ted Malaska is a Solutions Architect at Cloudera, and a contributor to Apache Avro, Flume, Apache Pig, and Hadoop.

Jonathan Seidman is a Solutions Architect on the Partner Engineering team at Cloudera.

Gwen Shapira is a Solutions Architect at Cloudera.

Categories: Hadoop

Estimating Financial Risk with Apache Spark

Cloudera Blog - Mon, 07/14/2014 - 15:13

Learn how Spark facilitates the calculation of computationally-intensive statistics such as VaR via the Monte Carlo method.

Under reasonable circumstances, how much money can you expect to lose? The financial statistic value at risk (VaR) seeks to answer this question. Since its development on Wall Street soon after the stock market crash of 1987, VaR has been widely adopted across the financial services industry. Some organizations report the statistic to satisfy regulations, some use it to better understand the risk characteristics of large portfolios, and others compute it before executing trades to help make informed and immediate decisions.

For reasons that we will delve into later, reaching an accurate estimate of VaR can be a computationally expensive process. The most advanced approaches involve Monte Carlo simulations, a class of algorithms that seek to compute quantities through repeated random sampling. When we can’t derive a closed form for a probability distribution analytically, we can often estimate the shape it takes by repeatedly sampling the simpler processes that compose it and seeing how the results perform in aggregate.

Several Cloudera customers in the financial services industry have independently approached us about calculating VaR as well as doing more general analysis of financial risk on our platform. Working on engagements with a few different institutions, we’ve had the chance to codify some best practices for analyzing risk in the Apache Hadoop ecosystem. In particular, we’ve helped stand up these calculations on Apache Spark, the general-purpose distributed computation framework with support for in-memory analytics that is being rapidly adopted across multiple industries. Some customers have come across Spark on their own, while we’ve recommended it to others and found it to be a good fit.

In this post you will learn the background on VaR and approaches to calculating it, and get a reference implementation for using a Monte Carlo method that leverages Spark to run many simulations in parallel. The post focuses on simplicity over the best possible model, but describe the points where steps can be extended with more complex techniques or domain knowledge.

Example code along with a small sample dataset is available here.

Background on VaR

VaR is a simple measure of an investment’s risk that tries to provide a reasonable estimate of maximum probable loss over a particular time period. A VaR calculation considers a given confidence level: a VaR of US$1 million with a 95% confidence level means that we believe our investment stands only a 5% chance of losing more than US$1 million over the time period.

A few different methods are employed to calculate VaR:

  • Variance-covariance: The simplest and least computationally expensive approach, the variance-covariance method derives a solution analytically by relying on simplifying assumptions about the probability distributions in the model.
  • Historical simulation: Historical simulation tries to directly extrapolate risk from historical data. A drawback of this method is that historical data can be limited and fails to include “what-ifs”. The history we have for the instruments in our portfolio may lack market collapses, but we might wish to model what happens to our portfolio in these situations. Techniques exist for making historical simulations robust to these issues, such as introducing “shocks” into the data, but we won’t cover them in this post.
  • Monte Carlo simulation: This method, covered in this post, tries to avoid some of the assumptions in the methods described above. In its most general form, this method:
    1. Defines a relationship between market conditions and each instrument’s returns
    2. Poses “trials” consisting of random market conditions
    3. Calculates the portfolio loss for each trial, and uses the aggregated trial data to build up a profile of the portfolio’s risk characteristics.

It is of course worth mentioning that the Monte Carlo method isn’t perfect. The models for generating trial conditions and for inferring instrument performance from them must make simplifying assumptions, and the distribution that comes out won’t be more accurate than these models going in.

The Model

A Monte Carlo risk model typically phrases each instrument’s return in terms of a set of market factors (such as the value of indexes like the S&P 500, the US GDP, or currency exchange rates). In our simulation, we’ll use a simple linear model: instrument returns are calculated as a weighted sum of the values of the market factors. We choose different weights for each instrument for each market factor. We can fit the model for each instrument with a regression using historical data. On top of this, we’ll allow our instruments to have some optionality – each can be parameterized with a minimum and maximum value. This adds an element of non-linearity that the variance-covariance method has trouble handling, but which the Monte Carlo method can take in stride.

It’s also worth mentioning that we could have chosen a more complicated model, perhaps incorporating domain specific knowledge.  While the per-instrument model-fitting step of the computation is a good fit for Spark as well, we’ll leave it out here for the sake of brevity.

Now that we have our model for calculating instrument losses from market factors, we need a process for simulating the behavior of market factors. A simple assumption is that each market factor follows a normal distribution. To capture the fact that market factors are often correlated – when NASDAQ is down, the Dow is likely to be suffering as well – we can use a multivariate normal distribution with a non-diagonal covariance matrix. As above, we could have chosen a more complicated method of simulating the market or assumed a different distribution for each market factor, perhaps one with a fatter tail.

To summarize, trial conditions are drawn from a multivariate normal distribution:

The value of a particular instrument in a particular trial is the dot product of the trial conditions and the instrument’s factor weightswi bounded by the instrument’s minimum and maximum value, ni and xi:

The portfolio’s value for the trial is the sum of all instrument values for that trial:

Running on Spark

A drawback of the Monte Carlo method is its computational intensity; getting accurate results for a large portfolio can require many trials, and simulating each trial can be computationally involved. This is where Spark comes in.

Spark allows you to express parallel computations across many machines using simple operators. A Spark job consists of a set of transformations on parallel collections. We simply pass in Scala (or Java or Python) functions and Spark handles distributing the computation across the cluster.  It is also fault tolerant, so if any machines or processes fail while the computation is running, we don’t need to restart from scratch.

A general sketch of our computation looks like:

  1. Broadcast our instrument data to every node on the cluster. While a large portfolio might consist of millions of instruments, the most memory this should take is in the 10s of gigabytes, which is easily enough to fit into main memory on modern machines.
  2. Create a parallel collection (RDD) of seeds for our random number generators.
  3. Create a new parallel collection of portfolio values under random trial conditions by applying a function to each seed that generates a set of random trial conditions, applies them to each instrument to calculate the its value under those conditions, and then sums over all instruments.
  4. Find the boundary between the bottom 5% of trial values and the rest.
  5. Subtract the portfolio at this boundary from the current value to find the value at risk.

The following is Scala code for calculating the trial values. The trialValues function takes all our instruments and a number of trials to run, and spits out an array containing the portfolio values for each trial.

def trialValues(seed: Long, numTrials: Int, instruments: Seq[Instrument], factorMeans: Array[Double], factorCovariances: Array[Array[Double]]): Seq[Double] = { val rand = new MersenneTwister(seed) val multivariateNormal = new MultivariateNormalDistribution(rand, factorMeans, factorCovariances) val trialValues = new Array[Double](numTrials) for (i <- 0 until numTrials) { val trial = multivariateNormal.sample() trialValues(i) = trialValue(trial, instruments) } trialValues } def trialValue(trial: Array[Double], instruments: Seq[Instrument]): Double = { var totalValue = 0.0 for (instrument <- instruments) { totalValue += instrumentTrialValue(instrument, trial) } totalValue } def instrumentTrialValue(instrument: Instrument, trial: Array[Double]): Double = { var instrumentTrialValue = 0.0 var i = 0 while (i < trial.length) { instrumentTrialValue += trial(i) * instrument.factorWeights(i) i += 1 } Math.min(Math.max(instrumentTrialValue, instrument.minValue), instrument.maxValue) }


Note that we could have written this code in Java or Python as well. The Spark code that runs it on a cluster is:

val broadcastInstruments = sc.broadcast(instruments) val seeds = (seed until seed + parallelism) val seedRdd = sc.parallelize(seeds, parallelism) val trialsRdd = seedRdd.flatMap(trialValues(_, numTrials / parallelism, broadcastInstruments.value, factorMeans, factorCovariances))


We now have a collection of simulated losses over many trials. To calculate VaR, we want to see what happens at the bottom 5%. We can compute this with:

val varFivePercent = trialsRdd.takeOrdered(numTrials / 20).last


Of course, we don’t have to stop at calculating VaR — our simulation contains much more information about the risk characteristics of the portfolio. For example, we can home in on the trials for which our portfolio performs the worst and determine which instruments and market factors are the most to blame. Spark is also useful for computing these rollups and aggregations. We can also use kernel-density estimation to visualize the probability distribution our simulated portfolio values take; the sample repository contains a simple implementation that will soon go into Spark’s built-in machine learning library, MLLib, itself. 

Using our toy data with four instruments, three factors, and a million trials, we observe the following risk profile (chart generated by pulling the density results into a spreadsheet):

Other Considerations Interactive Analysis

So far we’ve presented the computation as a batch job; however, Spark also supports interactive settings. For example, analysts and traders might wish to see what happens when they tweak model parameters, filter the set of instruments considered to those matching some particular criteria, or add a trade that they’re about to execute into the mix. After broadcasting, Spark will keep the set of instruments in memory on every machine in the cluster, making them available for servicing interactive queries. If filtering on particular instrument attributes is routinely done, it might make sense to store the instruments as a map indexed by those attributes.

Huge Instrument Data

While it’s rare for a single portfolio to be too large to fit in entirety on every machine, working with huge portfolios composed of instruments like small business loans might require splitting up the portfolio data across machines.

Spark handles this use case as well; here’s a general sketch of how it looks:

val sc = new SparkContext(...) val instrumentsRdd = sc.parallelize(instruments, numPartitions) val fragmentedTrialsRdd = instrumentsRdd.mapPartitions(trialValues(seed, _, numTrials, modelParameters)) val trialsRdd = fragmentedTrialsRdd.sumByKey() def trialReturns(seed: Long, instruments: Iterator[Instrument], numTrials: Int, modelParameters: YourModelParameters): Iterator[(Int, Double)] = { // Compute the value of the given subset of instruments for all trials, // and emit tuples of (trialId, value) }


Essentially we invert the parallelism so that each task computes the value of a subset of instruments under every trial, instead of the value of every instrument under a subset of trials.

Other Distributed Processing Frameworks

Thus far we’ve hyped Spark for distributed processing, without comparing it to other distributed processing frameworks. (For example, why not use MPI and take advantage of the speed of C++?) The primary reason for preferring Spark is the balance it strikes between flexibility and ease of use. Its programming model allows describing parallel computations with far fewer lines of code, and its Scala and Python REPLs allow experimenting with models interactively. Spark combines the potential for rapid prototyping with the strong performance characteristics of the JVM that matter when the time comes to put a model into production. Furthermore, deeper analyses often involve operations on the full instrument-trial loss matrix, which, on decently sized portfolios, can run into the terabytes. A fault-tolerant, data-parallel framework like Spark can handle data at this volume.

That said, only highly optimized native code can squeeze the last bit of juice out of each CPU. Most numerical computations boil down to matrix and vector operations. The bulk of the computation from the model in this post lies in computing dot products — using Java and Python frameworks like JBlas and numpy pushes these linear-algebra operations down to optimized Fortran. Existing C++ libraries can be accessed through JNI.

I’ve elided these optimizations here for the sake of simplicity, but they’re worth considering when building a production application.


In this post, we’ve demonstrated what it looks like to use Spark for Monte Carlo simulations in financial applications.  VaR is the first step at answering the larger question: What is the shape of the probability distribution that governs my investments’ returns, how sensitive are my returns, and to what market factors are they most sensitive? Spark provides a flexible framework for leveraging the processing power of compute clusters to answer these questions.

Sandy Ryza is a data scientist at Cloudera, a Hadoop committer, and a Spark contributor.

Categories: Hadoop

This Month in the Ecosystem (June 2014)

Cloudera Blog - Fri, 07/11/2014 - 14:01

Welcome to our 10th edition of “This Month in the Ecosystem,” a digest of highlights from June 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

Pretty busy for early Summer:

  • A design document has gone upstream describing new work to make Apache Hive run on Apache Spark for a data processing backend. This effort is the first step in a collaborative broader effort by Cloudera, Databricks, IBM, Intel, and MapR to make Spark the data processing standard for the entire Apache Hadoop ecosystem, starting with Hive.
  • The above was announced by Cloudera’s Mike Olson at Spark Summit 2014, which appeared to have doubled in size over the 2013 edition!
  • Facebook described HydraBase, its internal HBase implementation, for the first time in a broad public manner. (HydraBase was also the subject of a keynote session at HBaseCon 2014.) Later in June, Facebook also described how it uses HDFS in combination with RAID concepts.
  • Cloudera and Intel described their intention to bring comprehensive enterprise-class security to Hadoop under Project Rhino — with one of the most recent steps being the contribution of code to support at-rest encryption in HDFS.
  • The first Accumulo Summit convened in Maryland, and there was much rejoicing. Presentations are here.
  • Presentations and recordings from HBaseCon 2014 were released to all.
  • Yet Another General Data Processing Platform for Hadoop (YAGDPPH), Apache Flink (previously Stratosphere), entered the Apache incubator.
  • Apache Tez graduated into a Top Level Project.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Jeff Dean’s Talk at Cloudera

Cloudera Blog - Wed, 07/09/2014 - 16:03

Google’s Jeff Dean — among the original architects of MapReduce, Bigtable, and Spanner — revealed some fascinating facts about Google’s internal environment at Cloudera HQ recently.

Earlier this week, we were pleased to welcome Google Senior Fellow Jeff Dean to Cloudera’s Palo Alto HQ to give an overview of some of his group’s current research. Jeff has a peerless pedigree in distributed computing circles, having been deeply involved in the design and implementation of Google’s original advertising serving system, MapReduce, Bigtable, Spanner, and a host of other projects.


Jeff’s presentation had two main parts:

  • First, a discussion about Google’s efforts to bring classic fault tolerance principles to online services — to “create a predictably responsive whole out of less-predictable parts”, in his phrasing. This effort is born out of the fact that the growing size and complexity of Google’s infrastructure, and/or increases in usage, create an ever-higher risk of increased latency if left unchecked. Instead, Google strives for “tail-tolerant” systems that reduce that risk through the application of well known techniques.
  • Second, an explanation of how Google is banking on the venerable neural network model to make “deep machine learning” a general abstraction across image, audio, and text processing. (In Dean’s view, the original neural networks of the late 1980s/early 1990s failed to meet expectations because of a lack of computational power as well as inadequate amounts of data for model-training purposes, neither of which are issues for Google today.) Dean predicts that neural nets will make a big comeback in the machine-learning area — and with Google’s track record of influence, that’s likely to be a self-fulfilling prophecy.

Jeff also spent some time addressing how MapReduce, Bigtable, and other familiar technologies are being used at Google today. For example, Jeff told us that more than 1 million MR jobs run at Google daily, although use of the native API was largely dropped in favor of FlumeJava (the inspiration for Apache Crunch) and other abstractions some time ago.

It’s hard to imagine a more prestigious computer scientist inside these walls. Thanks, Jeff!

Justin Kestelyn is Cloudera’s developer outreach director.


Categories: Hadoop

Apache Hive on Apache Spark: Motivations and Design Principles

Cloudera Blog - Tue, 07/01/2014 - 16:54

Two of the most vibrant communities in the Apache Hadoop ecosystem are now working together to bring users a Hive-on-Spark option that combines the best elements of both.

Apache Hive is a popular SQL interface for batch processing and ETL using Apache Hadoop. Until recently, MapReduce was the only execution engine in the Hadoop ecosystem, and Hive queries could only run on MapReduce. But today, alternative execution engines to MapReduce are available — such as Apache Spark and Apache Tez (incubating).

Although Spark is relatively new to the Hadoop ecosystem, its adoption has been meteoric. An open-source data analytics cluster computing framework, Spark is built outside of Hadoop’s two-stage MapReduce paradigm but runs on top of HDFS. Because of its successful approach, Spark has quickly gained momentum and become established as an attractive choice for the future of data processing in Hadoop.

In this post, you’ll get an overview of the motivations and technical details behind some very exciting news for Spark and Hive users: the fact that the Hive and Spark communities are joining forces to collaboratively introduce Spark as a new execution engine option for Hive, alongside MapReduce and Tez (see HIVE-7292).

Motivation and Approach

Here are the two main motivations for enabling Hive to run on Spark:

  • To improve the Hive user experience
    Hive queries will run faster, thereby improving user experience. Furthermore, users will have access to a robust, non-MR execution engine that has already proven itself to be a leading option for data processing as well as streaming, and which is among the most active projects across all of Apache from contributor and commit standpoints.
  • To streamline operational management for Spark shops
    Hive-on-Spark will be very valuable for users who are already using Spark for other data-processing and machine-learning needs. Standardizing on one execution back end is also convenient for operational management, making it easier to debug issues and create enhancements.

Superficially, this project’s goals look similar to those of Shark or Spark SQL, which are separate projects that reuse the Hive front end to run queries using Spark. However, this design adds Spark into Hive, parallel to MapReduce and Tez, as another backend execution engine. Thus, existing Hive jobs continue to run as-is transparently on Spark.

The key advantage of this approach is to leverage all the existing integration on top of Hive, including ODBC/JDBC, auditing, authorization, and monitoring. Another advantage is that it will have no impact on Hive’s existing code path and thus no functional or performance effects. Users choosing to run Hive on either MapReduce or Tez will have the same functionality and code paths they have today — thus, the Hive user community will be in the great position of being able to choose among MapReduce, Tez, or Spark as a back end. In addition, maintenance costs will be minimized so the Hive community needn’t make specialized investments for Spark.

Meanwhile, users opting for Spark as the execution engine will automatically have all the rich functional features that Hive provides. Future features (such as new data types, UDFs, logical optimization, and so on ) added to Hive should become automatically available to those users, without any customization work to be done in Hive’s Spark execution engine.

Overall Functionality

To use Spark as an execution engine in Hive, you would set the following:

set hive.execution.engine=spark;

The default value for this configuration is still “mr”. Hive will continue to work on MapReduce as-is on clusters that don’t have Spark on them. When Spark is configured as Hive’s execution, a few configuration variables will be introduced, such as the master URL of the Spark cluster.

The new execution engine should support all Hive queries without any modification. Query results should be functionally equivalent to those from either MapReduce or Tez. In addition, existing functionality related to the execution engine should also be available, including the following:

  • Hive will display a task execution plan that’s similar to that being displayed by the explain command for MapReduce and Tez.     
  • Hive will give appropriate feedback to the user about progress and completion status of the query when running queries on Spark.
  • The user will be able to get statistics and diagnostic information as before (counters, logs, and debug info on the console).
Hive-Level Design

As noted previously, this project takes a different approach from that of Shark in that SQL semantics will be not implemented using Spark’s primitives, but rather MapReduce ones that will be executed in Spark.

The main work to implement the Spark execution engine for Hive has two components: query planning, where Hive operator plan from semantic analyzer is further translated a task plan that Spark can execute; and query execution, where the generated Spark plan is executed in the Spark cluster. There are other miscellaneous yet indispensable functional pieces involving monitoring, counters, statistics, and so on, but for brevity, we will only address the main design considerations here.

Query Planning

Currently, for a given user query, Hive’s semantic analyzer generates an operator plan that comprises a graph of logical operators such as TableScanOperator, ReduceSink, FileSink, GroupByOperator, and so on. MapReduceCompiler compiles a graph of MapReduceTasks and other helper tasks (such as MoveTask) from the logical operator plan. Tez behaves similarly, yet generates a TezTask that combines otherwise multiple MapReduce tasks into a single Tez task.

For Spark, we will introduce SparkCompiler parallel to MapReduceCompiler and TezCompiler. Its main responsibility is to compile from the Hive logical operator plan a plan that can be executed on Spark. Thus, we will have SparkTask, depicting a job that will be executed in a Spark cluster, and SparkWork, describing the plan of a Spark task. Thus, SparkCompiler translates a Hive’s operator plan into a SparkWork instance. During the task plan generation, SparkCompiler may also perform physical optimizations suitable for Spark.   

Job Execution

A SparkTask instance can be executed by Hive’s task execution framework in the same way as for other tasks. Internally, the SparkTask.execute() method will make RDDs and functions out of a SparkWork instance, and submit the execution to the Spark cluster via a Spark client.

Once the Spark work is submitted to the Spark cluster, the Spark client will continue to monitor the job execution and report progress. A Spark job can be monitored via SparkListener APIs.

With SparkListener APIs, we will add a SparkJobMonitor class that handles printing of status as well as reporting the final result. This class provides similar functions as HadoopJobExecHelper used for MapReduce processing, or TezJobMonitor used for Tez job processing, and will also retrieve and print the top-level exception thrown at execution time in case of job failure.

Spark job submission is done via a SparkContext object that’s instantiated with the user’s configuration. When a SparkTask is executed by Hive, such a context object is created in the current user session. With the context object, RDDs corresponding to Hive tables are created and MapFunction and ReduceFunction (more details below) are built from Hive’s SparkWork and applied to the RDDs. Job execution is triggered by applying a foreach() transformation on the RDDs with a dummy function.

Main Design Considerations

Hive’s operator plan is based on MapReduce paradigm, and traditionally, a query’s execution contains a list of MapReduce jobs. Each MapReduce job consists of map-side processing starting from Hive’s ExecMapper and reduce-side processing starting from ExecReducer, and MapReduce provides inherent shuffling, sorting, and grouping between the map-side and the reduce-side. The input to the whole processing pipeline are the folders and files corresponding to the table.

Because we will reuse Hive’s operator plan but perform the same data processing in Spark, the execution plan will be built in Spark constructs such as RDD, function, and transformation. This approach is outlined below.

Table as RDD

A Hive table is simply a bunch of files and folders on HDFS. Spark primitives are applied to RDDs. Thus, naturally, Hive tables will be treated as RDDs in the Spark execution engine.


The above mentioned MapFunction will be made from MapWork; specifically, the operator chain starting from ExecMapper.map() method. ExecMapper class implements MapReduce Mapper interface, but the implementation in Hive contains some code that can be reused for Spark. Therefore, we will extract the common code into a separate class, MapperDriver, to be shared by MapReduce as well as Spark.


Similarly, ReduceFunction will be made of ReduceWork instance from SparkWork. To Spark, ReduceFunction is no different than MapFunction, but the function’s implementation will be different, and made of the operator chain starting from ExecReducer.reduce(). Also, because some code in ExecReducer will be reused, we will extract the common code into a separate class, ReducerDriver, for sharing by both MapReduce and Spark.

Shuffle, Group, and Sort

While this functionality comes for “free” along with MapReduce, we will need to provide an equivalent for Spark. Fortunately, Spark provides a few transformations that are suitable for replacing MapReduce’s shuffle capability, such as partitionBy, groupByKey, and sortByKey. Transformation partitionBy does pure shuffling (no grouping or sorting), groupByKey does shuffling and grouping, and sortByKey() does shuffling plus sorting. Therefore, for each ReduceSinkOperator in SparkWork, we will need to inject one of the transformations.

Having the capability to selectively choose the exact shuffling behavior provides opportunities for optimization. For instance, Hive’s groupBy doesn’t require the key to be sorted, but MapReduce does. In contrast, in Spark, one can choose sortByKey only if key order is important (such as for SQL ORDER BY).

Multiple Reduce Stages

Whenever a query has multiple ReduceSinkOperator instances, Hive will break the plan apart and submit one MR job per sink. All the MR jobs in this chain need to be scheduled one-by-one, and each job has to re-read the output of the previous job from HDFS and shuffle it. In Spark, this step is unnecessary: multiple map functions and reduce functions can be concatenated. For each ReduceSinkOperator, a proper shuffle transformation needs to be injected as explained above.


Based on the above, you will likely recognize that although Hive on Spark is simple and clean in terms of functionality and design, the implementation will take some time. Therefore, the community will take a phased approach, with all basic functionality delivered in a Phase 1 and optimization and improvements ongoing over a longer period of time. (Precise number of phases and what each will entail are under discussion.)

Most important, the Hive and Spark communities will work together closely to achieve this technical vision and resolve any obstacles that might arise — with the end result being the availability to Hive users of an execution engine that improves performance as well as unifies batch and stream processing.

We invite you to follow our progress, on which we will offer periodic updates!

Xuefu Zhang is a Software Engineer at Cloudera and a Hive PMC member.

Categories: Hadoop

Why Extended Attributes are Coming to HDFS

Cloudera Blog - Fri, 06/27/2014 - 15:00

Extended attributes in HDFS will facilitate at-rest encryption for Project Rhino, but they have many other uses, too.

Many mainstream Linux filesystems implement extended attributes, which let you associate metadata with a file or directory beyond common “fixed” attributes like filesize, permissions, modification dates, and so on. Extended attributes are key/value pairs in which the values are optional; generally, the key and value sizes are limited to some implementation-specific limit. A filesystem that implements extended attributes also provides system calls and shell commands to get, list, set, and remove attributes (and values) to/from a file or directory.

Recently, my Intel colleague Yi Liu led the implementation of extended attributes for HDFS (HDFS-2006). This work is largely motivated by Cloudera and Intel contributions to bringing at-rest encryption to Apache Hadoop (HDFS-6134; also see this post) under Project Rhino – extended attributes will be the mechanism for associating encryption key metadata with files and encryption zones — but it’s easy to imagine lots of other places where they could be useful.

For instance, you might want to store a document’s author and subject in sometime like user.author=cwl and user.subject=HDFS. You could store a file checksum in an attribute called user.checksum. Even just comments about a particular file or directory can be saved in an extended attribute.

In this post, you’ll learn some of the details of this feature from an HDFS user’s point of view.

Inside Extended Attributes

Extended attribute keys are java.lang.Strings and the values are byte[]s. By default, there is a maximum of 32 extended attribute key/value pairs per file or directory, and the (default) maximum size of the combined lengths of name and value is 16,384. You can configure these two limits with the dfs.namenode.fs-limits.max-xattrs-per-inode and dfs.namenode.fs-limits.max-xattr-size config parameters.

Every extended attribute name must include a namespace prefix, and just like in the Linux filesystem implementations, there are four extended attribute namespaces: user, trusted, system, and security. The system and security namespaces are for HDFS internal use only; only the HDFS super user can access trusted namespace. So, user extended attributes will generally reside in the user namespace (for example, “user.myXAttrkey”). Namespaces are case-insensitive and extended attribute names are case-sensitive.

Extended attributes can be accessed using the hdfs dfs command. To set an extended attribute on a file or directory, use the -setfattr subcommand. For example,

hdfs dfs -setfattr -n 'user.myXAttr' -v someValue /foo


You can replace a value with the same command (and a new value) and you can delete an extended attribute with the -x option. The usage message shows the format. Setting an extended attribute does not change the file’s modification time.

To examine the value of a particular extended attribute or to list all the extended attributes for a file or directory, use the hdfs dfs –getfattr subcommand. There are options to recursively descend through a subtree and specify a format to write them out (text, hex, or base64). For example, to scan all the extended attributes for /foo, use the -d option:

hdfs dfs -getfattr -d /foo # file: /foo user.myXAttr='someValue'


The org.apache.hadoop.fs.FileSystem class has been enhanced with methods to set, get, and remove extended attributes from a path.

package org.apache.hadoop.fs; public class FileSystem { /* Create or replace an extended attribute. */ public void setXAttr(Path path, String name, byte[] value) throws IOException; /* get an extended attribute. */ public byte[] getXAttr(Path path, String name) throws IOException; /* get multiple extended attributes. */ public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException; /* Remove an extended attribute. */ public void removeXAttr(Path path, String name) throws IOException; }


Current Status

Extended attributes are currently committed on the upstream trunk and branch-2 and will be included in CDH 5.2 and Apache Hadoop 2.5. They are enabled by default (you can disable them with the dfs.namenode.xattrs.enabled configuration option) and there is no overhead if you don’t use them.

Extended attributes are also upward compatible, so you can use an older client with a newer NameNode version. Try them out!

Charles Lamb is a Software Engineer at Cloudera, currently working on HDFS.

Categories: Hadoop

Where to Find Cloudera Tech Talks (Through September 2014)

Cloudera Blog - Wed, 06/25/2014 - 16:12

Find Cloudera tech talks in Texas, Oregon, Washington DC, Illinois, Georgia, Japan, and across the SF Bay Area during the next calendar quarter.

Below please find our regularly scheduled quarterly update about where to find tech talks by Cloudera employees – this time, for the third calendar quarter of 2014 (July through September; traditionally, the least active quarter of the year). Note that this list will be continually curated during the period; complete logistical information may not be available yet. And remember, many of these talks are in “free” venues (no cost of entry).

As always, we’re standing by to assist your meetup by providing speakers, sponsorships, and schwag!

Date City Venue Speaker(s) June 30-July 1 San Francisco Spark Summit Sandy Ryza on Apache Spark-on-YARN, and on doing end-to-end analytics with Spark July TBD Austin Austin Data Geeks Sean Busbey on Apache Accumulo July 7 San Francisco Spark Happy Hour No speakers, just sponsoring July 8 Tokyo Hadoop Conference Japan Tatsuo Kawasaki on Hue, Daisuke Kobayashi on security, Sho Shimauchi on Impala July 15 Palo Alto, Calif. SF Bay Lucene/Solr Meetup Mark Miller on SolrCloud replica failover and automated testing July 16 Irvine, Calif. OC Big Data Meetup Greg Chanan on Solr+Apache Hadoop July 21 San Francisco GraphLab Conference Josh Wills on ” What Comes After the Star Schema?” July 22 Washington DC DC Area Spark Interactive Ted Malaska (panelist) on Spark use cases July 20-23 Portland, Ore. OSCON Gwen Shapira on using R for analytics on Hadoop (+ leads a Hadoop BOF) July 23 Portland, Ore. Portland Big Data User Group Speaker TBD on Impala July 23 SF Bay Area TBD The Hive Think Tank Eddie Garcia and Alejandro Abdelnur on enterprise security for Hadoop Aug. 13 San Diego, Calif. SD Big Data Meetup Maxime Dumas on Impala Aug. 20 San Jose, Calif. NoSQL Now! Aleks Shulman on Apache HBase development Sept. 15 Champaign, Ill. U-CHUG Doug Cutting on the future of data Sept. 19 Atlanta MLconf Atlanta Sandy Ryza on end-to-end analytics with Spark Sept. 28-Oct. 2 San Francisco JavaOne Aleks Shulman on HBase development, Daniel Templeton with a Hadoop HOL

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

How-to: Create an IntelliJ IDEA Project for Apache Hadoop

Cloudera Blog - Tue, 06/24/2014 - 15:07

Prefer IntelliJ IDEA over Eclipse? We’ve got you covered: learn how to get ready to contribute to Apache Hadoop via an IntelliJ project.

It’s generally useful to have an IDE at your disposal when you’re developing and debugging code. When I first started working on HDFS, I used Eclipse, but I’ve recently switched to JetBrains’ IntelliJ IDEA (specifically, version 13.1 Community Edition).

My main motivation was the ease of project setup in the face of Maven and Google Protocol Buffers (used in HDFS). The latter is an issue because the code generated by protoc ends up in one of the target subdirectories, which can be a configuration headache. The problem is not that Eclipse can’t handle getting these files into the classpath — it’s that in my personal experience, configuration is cumbersome and it takes more time to set up a new project whenever I make a new clone. Conversely, IntelliJ’s import maven project functionality seems to “just work”.

In this how-to, you’ll learn a few simple steps I use to create a new IntelliJ project for writing and debugging your code for Hadoop. (For Eclipse users, there is a similar post available here.) It assumes you already know how to clone, build, and run in a Hadoop repository using mvn, git, and so on. These instructions have been tested with the Hadoop upstream trunk (github.com/apache/hadoop-common.git).

  1. Make a clone of a Hadoop repository (or use an existing sandbox). There should be a hadoop-common directory at the top level when you’re finished.
  2. Do a clean and a full build using the mvn command in the CLI. I use mvn clean install, but you should do whatever suits you.
  3. In .../bin/idea.properties file, set idea.max.intellisense.filesize=10000.
  4. Start IntelliJ.
  5. Select File > Import Project…
  6. A “Select File or Directory” to Import wizard screen will pop up. Select the hadoop-common directory from your repository clone. Click OK.
  7. An “Import Project” wizard screen will appear. Check “Import project from external model” and select “Maven”. Click Next.

  8. On the next screen, you don’t need to select any options. Click Next


  9. On the next wizard screen, you do not need to select any profiles. Click Next.
  10. On the next wizard screen, org.apache.hadoop.hadoop-main:N.M.P-SNAPSHOT will already be selected. Click Next.
  11. On the next wizard screen, ensure that IntelliJ is pointing at a JDK 7 SDK (download JDK 7 here) for the JDK home path. Click Next.

  12. On the next wizard screen, give your project a project name. I typically set the “Project file location” to a sibling of the hadoop-common directory, but that’s optional. Click Finish.
  13. IntelliJ will then tell you: “New projects can either be opened in a new window or replace the project in the existing window. How would you like to open the project?” I typically select “New Window” because it lets me different projects in different panels. Select one or the other.
  14. IntelliJ then imports the project.
  15. You can check that the project builds OK using Build > Rebuild Project. In the “Messages” panel I typically use the little icon on the left side of that window to Hide Warnings.

You’re ready for action. It’s easy to set up multiple projects that refer to different clones: just repeat the same steps above. Click on Window and you can switch between them.

A typical workflow for me is to edit either in my favorite editor or in IntelliJ. If the former, IntelliJ is very good about updating the source that it shows you. After editing, I’ll do Build > Rebuild Project, work out any compilation errors, and then either use the Debug or Run buttons. I like the fact that IntelliJ doesn’t make me mess around with Run/Debug configurations.

Some of my favorite commands and keystrokes (generally configurable) are:

  • c-sh-N (Navigate > File), which lets me easily search for a file and open it
  • c-F2 and c-F5 (stop the currently running process, and debug failed unit tests)
  • In the Run menu, F7 (step into), F8 (step over), Shift-F8 (step out), and F9 (resume program) while in the debugger
  • In the View > Tool Windows menu: Alt-1 (Project) and Alt-7 (Structure)

If you ever run into an error that says you need to add webapps/hdfs to your classpath (has happened to me when running some HDFS unit tests), taking the following steps should fix it (credit goes to this post from Stack Overflow):

  1. Select File > Project Structure…
  2. Click on Modules under “Project Settings.”
  3. Select the hadoop-hdfs project.
  4. Select the Dependencies tab.

  5. Click the + sign on right side and select “Jars or directories.”

  6. From your clone hierarchy, select the .../hadoop-common/hadoop-hdfs-project/hadoop-hdfs/target directory. Click OK.
  7. Check all of the / directories (classes, jar directory, source archive directory). Click OK.
  8. Click OK (again).

Congratulations, you are now ready to contribute to Hadoop via an IntelliJ project!

Charles Lamb is a Software Engineer at Cloudera, currently working on HDFS.

Categories: Hadoop

How-to: Install a Virtual Apache Hadoop Cluster with Vagrant and Cloudera Manager

Cloudera Blog - Fri, 06/20/2014 - 15:25

It’s been a while since we provided a how-to for this purpose. Thanks, Daan Debie (@DaanDebie), for allowing us to re-publish the instructions below (for CDH 5)!

I recently started as a Big Data Engineer at The New Motion. While researching our best options for running an Apache Hadoop cluster, I wanted to try out some of the features available in the newest version of Cloudera’s Hadoop distribution: CDH 5. Of course I could’ve downloaded the QuickStart VM, but I rather wanted to run a virtual cluster, making use of the 16GB of RAM my shiny new 15″ Retina Macbook Pro has ;)

There are some tutorials, and repositories available for installing a local virtualized cluster, but none of them did what I wanted to do: install the bare cluster using Vagrant, and install the Hadoop stack using the Cloudera Manager. So I created a simple Vagrant setup myself. You can find it here.

Setting up the Virtual Machines

As per the instructions from the Gitub repo:

Depending on the hardware of your computer, installation will probably take between 15 and 25 minutes.

First install VirtualBox and Vagrant.

Install the Vagrant Hostmanager plugin.

$ vagrant plugin install vagrant-hostmanager


Clone this repository.

$ git clone https://github.com/DandyDev/virtual-hadoop-cluster.git


Provision the bare cluster. It will ask you to enter your password, so it can modify your /etc/hosts file for easy access in your browser. It uses the

$ cd virtual-hadoop-cluster $ vagrant up


Now we can install the Hadoop stack.

Installing Hadoop and Related Components
  1. Surf to: http://vm-cluster-node1:7180.
  2. Login with admin/admin.
  3. Select Cloudera Express and click Continue twice.
  4. On the page where you have to specifiy hosts, enter the following: vm-cluster-node[1-4] and click Search. Four nodes should pop up and be selected. Click Continue.
  5. On the next page (“Cluster Installation > Select Repository”), leave everything as is and click Continue.
  6. On the next page (“Cluster Installation > Configure Java Encryption”) I’d advise to tick the box, but only if your country allows it. Click Continue.
  7. On this page do the following:
    • Login To All Hosts As: Another user -> enter vagrant
    • In the two password fields enter: vagrant
    • Click Continue.
  8. Wait for Cloudera Manager to install the prerequisites… and click Continue.
  9. Wait for Cloudera Manager to download and distribute the CDH packages… and click Continue.
  10. Wait while the installer is inspecting the hosts, and Run Again if you encounter any (serious) errors (I got some that went away the second time). After this, click Finish.
  11. For now, we’ll install everything but HBase. You can add HBase later, but it’s quite taxing for the virtual cluster. So on the “Cluster Setup” page, choose “Custom Services” and select the following: HDFS, Hive, Hue, Impala, Oozie, Solr, Spark, Sqoop2, YARN and ZooKeeper. Click Continue.
  12. On the next page, you can select what services end up on what nodes. Usually Cloudera Manager chooses the best configuration here, but you can change it if you want. For now, click Continue.
  13. On the “Database Setup” page, leave it on “Use Embedded Database.” Click Test Connection (it says it will skip this step) and click Continue.
  14. Click Continue on the “Review Changes” step. Cloudera Manager will now try to configure and start all services.

And you’re Done!. Have fun experimenting with Hadoop!

Categories: Hadoop

Meet the Data Scientist: Sandy Ryza

Cloudera Blog - Thu, 06/19/2014 - 15:53

Meet Sandy Ryza (@SandySifting), the newest member of Cloudera’s data science team. See Sandy present at Spark Summit 2014 (June 30-July 1 in San Francisco; register here for a 20% discount).

What is your definition of a “data scientist”?

To put it in boring terms, data scientists are people who find that the bulk of the work for testing their hypotheses lies in manipulating quantities of information – AKA, “San Franciscans with calculators.” 

About what parts of the field are you excited in particular?

The proliferation (or “Cambrian Explosion”, as Josh Wills would say) of tools and frameworks for munging and analyzing massive data. A few years ago, we really only had MapReduce for dealing with data at a large scale. Since then, a bunch of players have written engines that can handle more of the data transformation patterns required for complex analytics. I think eventually the market will need standardize on two or three, and I’m curious to see how it plays out.

When it comes to datasets, I’m always trying to find time to play around with traffic data. CalTrans releases the measurements they collect from loop detectors on freeways all over California every 30 seconds, so that’s my go-to whenever I want to try out a new tool or technique.

You previously worked as a software engineer (and are an Apache Hadoop committer). How are the two roles different?

At Cloudera, “data scientist” is a much more external facing role. We don’t have massive data problems of our own, but our customers have a huge variety. I get to spend time understanding these problems, helping out with them, and thinking about how to make our platform a good place to solve them.

That said, my job is still very heavy on the engineering. I spend half of my time contributing to Apache Spark and some more of it writing code for customer engagements. It’s hard to get away from distributed systems over here.  

Josh Wills says data scientists would be more accurately called “data janitors”. Do you agree?

At this point, thanks to the insistence of Josh and others, I think the statement that data science boils down to a bunch of data munging is mostly an accepted truism. But this fact is probably just a stronger endorsement of the term. Anybody who’s done “real” science knows that it’s not exactly about flying around, splashing nature with elegant statistical sprinkles, and “deriving insights”. (Not that I’ve done any real science.) If data science is 20% statistics and 80% engineering, and science is 1% inspiration and 99% perspiration, I’m just happy I get to save on my deodorant bill. 

You’re presenting at Spark Summit 2014. What are your topics, and why should people attend?

I’ll be giving a couple talks. One is about Spark-on-YARN: a deep dive into how the Spark and YARN resource management models intersect, along with some practical advice on how to operate them on your own cluster. The other is about Oryx, MLLib, and machine learning infrastructure. I’ll be talking about what it means to put a machine learning model into production and our plan to base Oryx, an open source project that combines model-serving and model building, on Spark’s streaming and ML libraries.

What is your advice for aspiring data scientists?

This might apply to any job, but I think a good data scientist needs to work on being a good translator and a good teacher. Besides memorizing obscure R syntax and trying to convince their superiors that implementing a Dynamic Bayesian net is a good idea, a data scientist spends most of their time striving to understand problems in the language of the people who have them. For a statistical model or data pipeline to be worth anything, its creator needs to be able to articulate its limitations and the problem it solves. 

Interested in becoming a data scientist yourself? Explore Cloudera’s Intro to Data Science training or Data Science certification program.

Categories: Hadoop

Project Rhino Goal: At-Rest Encryption for Apache Hadoop

Cloudera Blog - Tue, 06/17/2014 - 15:59

An update on community efforts to bring at-rest encryption to HDFS — a major theme of Project Rhino.

Encryption is a key requirement for many privacy and security-sensitive industries, including healthcare (HIPAA regulations), card payments (PCI DSS regulations), and the US government (FISMA regulations).

Although network encryption has been provided in the Apache Hadoop platform for some time (since Hadoop 2.02-alpha/CDH 4.1), at-rest encryption, the encryption of data stored on persistent storage such as disk, is not. To meet that requirement in the platform, Cloudera and Intel are working with the rest of the Hadoop community under the umbrella of Project Rhino — an effort to bring a comprehensive security framework for data protection to Hadoop, which also now includes Apache Sentry (incubating) — to implement at-rest encryption for HDFS (HDFS-6134 and HADOOP-10150).

With this work, encryption and decryption will be transparent: existing Hadoop applications will be able to work with encrypted data without modifications. Data will be encrypted with configurable ciphers and cipher modes, allowing users to choose an appropriate level of confidentiality. Because encryption is being implemented directly in HDFS, the full spectrum of HDFS access methods and file formats will be supported.

Design Principles

At-rest encryption for HDFS centers around the concept of an encryption zone, which is a directory where all the contents are encrypted with a unique encryption key. When accessing data within an encryption zone, HDFS clients will transparently fetch the encryption key from the cluster key server to encrypt and decrypt data. Encryption keys can also be rolled in the event of compromise or because of corporate security policy. Subsequent files will be encrypted with the new encryption key, while existing files can be rewritten by the user to be encrypted with the new key.

Access to encrypted data is dependent on two things: appropriate HDFS-level filesystem permissions (i.e. Unix-style permissions and access control lists) as well as appropriate permissions on the key server for the encryption key. This two-fold scheme has a number of nice properties. Through the use of HDFS ACLs, users and administrators can granularly control data access. However, because key server permissions are also required, compromising HDFS is insufficient to gain access to unencrypted data. Importantly, this means that even HDFS administrators do not have full access to unencrypted data on the cluster.

A critical part of this vision is the cluster key server (for example, Cloudera Navigator Key Trustee). Since we foresee customer deployments with hundreds to thousands of encryption zones, an enterprise-grade key server needs to be secure, reliable, and easy-to-use. In highly-regulated industries, robust key management is as valid a requirement as at-rest encryption itself.

An equally important part of this vision is support for hardware-accelerated encryption. Encryption is a business-critical need, but it can be unusable if it carries a significant performance penalty. This requirement is addressed by other Rhino-related contributions from Intel (HDFS-10693), which will provide highly optimized libraries using AES-NI instructions available on Intel processors. By using these Intel libraries, HDFS will be able to provide access to encrypted data at hardware speeds with minimal performance impact.


At-rest encryption has been a major objective for Rhino since the effort’s inception. With the new Cloudera-Intel relationship — and the addition of even more engineering resources at our new Center for Security Excellence — we expect at-rest encryption to be among the first committed features to arise from Rhino, and among the first such features to ship inside Cloudera Enterprise 5.x.

Andrew Wang and Charles Lamb are Software Engineers on the HDFS team at Cloudera. Andrew is also a Hadoop PMC member.

To learn more about comprehensive, compliance-ready security for the enterprise, register for the upcoming webinar, “Compliance-Ready Hadoop,” on June 19 at 10am PT.

Categories: Hadoop

How-to: Easily Do Rolling Upgrades with Cloudera Manager

Cloudera Blog - Thu, 06/12/2014 - 16:07

Unique across all options, Cloudera Manager makes it easy to do what would otherwise be a disruptive operation for operators and users.

For the increasing number of customers that rely on enterprise data hubs (EDHs) for business-critical applications, it is imperative to minimize or eliminate downtime — thus, Cloudera has focused intently on making software upgrades a routine, non-disruptive operation for EDH administrators and users.

With Cloudera Manager 4.6 and later, it is extremely easy to do minor version upgrades (from CDH 4.3 to CDH 4.6) with zero downtime, and with no service interruptions during the process. The parcels binary packaging format is the key to this process: Parcels allow administrators to keep multiple versions of EDH software on their clusters simultaneously, and make transitions across versions seamless.

There are two steps involved in doing a rolling upgrade:

  1. Distributing and activating a newer version of the relevant parcel (described in detail here)
  2. Doing a rolling restart of the existing cluster, which I will explain in the remainder of this relatively brief post
Doing the Rolling Restart

To start the rolling restart operation, go to the Actions menu of your cluster in Cloudera Manager and select Rolling Restart. (Having a highly available NameNode is a pre-requisite for the rolling restart operation because it ensures that all data is available throughout the operation.)

Selecting Rolling Restart will take you to a pop-up where you can specify the parameters for the operation:

The important parameters in the dialog box are as follows:

  • Services to restart: Here you specify which services should be restarted with no downtime. Any service that does not have a single point of access for the clients (including HDFS, Apache HBase, Apache MapReduce, Apache Oozie, and Apache ZooKeeper) is eligible for rolling restart because we can ensure that clients do not face any service interruptions while the operation is going on. This setting is required and the form provides more options to select what roles to restart from the selected services
    • Roles to include: Here you can specify if you want to restart only worker roles (DataNode, TaskTracker, and RegionServer), only non-worker roles (NameNode, JobTracker, HBase Master, ZooKeeper Server, and so on), or all roles. This setting offers the flexibility to pick the order in which the roles are restarted.
    • Role filters: These filters are present to allow users the convenience to only restart the specific roles that have a new configuration or a new software version. One common use case for these role filters is to resume the rolling restart operation if it fails due to host failures in the middle of the operation. If that happens, you can trigger rolling restart again with the appropriate role filter and it will resume restarting roles that haven’t been restarted after software upgrade or configuration changes. The list of roles on which the rolling restart operation is performed is an intersection of “Roles to include” and the selected role filter.
  • Batch size: Here you can specify how many hosts to restart at a time. Typically, you would choose 5-10% of the size of the cluster here if you have multiple racks. If you select a higher batch size, the overall rolling restart operation will finish sooner, but the cluster performance will be reduced during the operation as there would be fewer worker roles active at any given time. If you have a single rack, then the batch size should be left as 1.

Once you’ve specified the parameters, click Confirm, which will take you to the Command Details page for the command that is consequently triggered.

The command will restart the selected services and their roles in proper order to ensure zero downtime for EDH users. The rolling restart operation itself comprises several steps that occur in the background:

  • Master rolling restart: Highly available master roles like NameNode and JobTracker, along with other auxiliary roles in the cluster, are restarted without interrupting services to EDH consumers. To do so, one of the masters is always available and active during the operation.
  • Worker rolling restart: This step involves restarting the worker roles of each service properly to not affect any running jobs or clients accessing data in EDH. The roles are restarted rack-by-rack. By default, Hadoop maintains replicas of every block on at least two racks (provided there are multiple racks), which ensures that all data is always available to the clients. Within each rack, the hosts are grouped in specified batch sizes alphabetically and then roles are restarted on them. For some roles, a simple restart is sufficient, while others must be decommissioned to ensure that another worker role is servicing the client.

The operation can take a little while to finish depending on cluster size and the chosen batch size. Should a host failure occur, Cloudera Manager lets users easily recover and resume  the operation using the role filters described above.


As you can see, what would otherwise be a complex operation for doing a software upgrade with no downtime is made extremely easy by Cloudera Manager — and can be done without ever leaving the UI. More information about rolling upgrades can be found in Cloudera Manager documentation.

Vikram Srivastava is a Software Engineer at Cloudera.

Categories: Hadoop