Checklist for Painless Upgrades to CDH 5

Cloudera Blog - Thu, 03/26/2015 - 15:57

Following these best practices can make your upgrade path to CDH 5 relatively free of obstacles.

Upgrading the software that powers mission-critical workloads can be challenging in any circumstance. In the case of CDH, however, Cloudera Manager makes upgrades easy, and the built-in Upgrade Wizard, available with Cloudera Manager 5, further simplifies the upgrade process. The wizard performs service-specific upgrade steps that, previously, you had to run manually, and also features a rolling restart capability that reduces downtime for minor and maintenance version upgrades. (Please refer to this blog post or webinar to learn more about the Upgrade Wizard).

As you prepare to upgrade your cluster, keep this checklist of some of Cloudera’s best practices and additional recommendations in mind. Please note that this information is complementary to, not a replacement for, the comprehensive upgrade documentation.

Backing Up Databases

You will need to take backups prior to the upgrade. Ideally, you already have procedures in place to back up your databases periodically. Prior to upgrading, be sure to:

  1. Back up the Cloudera Manager server and management databases that store configuration, monitoring, and reporting data. (These include the databases that contain all the information about what services you have configured, their role assignments, all configuration history, commands, users, and running processes.)
  2. Back up all databases (if you don’t already have regularly scheduled backup procedures), including the Apache Hive Metastore Server, Apache Sentry server (contains authorization metadata), Cloudera Navigator Audit Server (contains auditing information), Cloudera Navigator Metadata Server (contains authorization, policies, and audit report metadata), Apache Sqoop Metastore, Hue, and Apache Oozie.
  3. Back up the NameNode metadata by locating the NameNode Data Directories in the HDFS service and backing up one of the listed directories (you only need to back up one directory if more than one is listed).
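As a sketch of step 3 above, the following script archives one NameNode data directory into a timestamped tarball (the paths are hypothetical; check the NameNode Data Directories property in your HDFS service for the real location, and quiesce the NameNode first so the image is consistent):

```python
import os
import tarfile
import time

def backup_namenode_dir(name_dir, backup_root):
    """Archive one NameNode data directory into a timestamped tarball.

    If several directories are listed, they hold identical copies of the
    metadata, so archiving any one of them is enough.
    """
    os.makedirs(backup_root, exist_ok=True)
    stamp = time.strftime("%Y%m%d-%H%M%S")
    archive = os.path.join(backup_root, "namenode-meta-%s.tar.gz" % stamp)
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(name_dir, arcname=os.path.basename(name_dir))
    return archive

# e.g. backup_namenode_dir("/dfs/nn", "/backup/namenode")
# ("/dfs/nn" is a typical but hypothetical NameNode data directory)
```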

Note: Cloudera Manager provides an easy-to-use solution for enabling Backup and Disaster Recovery (BDR), with the key capabilities fully integrated into the Cloudera Manager Admin Console. It is also automated and fault tolerant.

Cloudera Manager makes it easy to manage data stored in HDFS and accessed through Hive. You can define your backup and disaster recovery policies and apply them across services. You can select the key datasets that are critical to your business, schedule and track the progress of data replication jobs, and get notified when a replication job fails. Replication can be set up on files or directories in the case of HDFS and on tables in the case of Hive. Hive metastore information is also replicated which means that table definitions are updated. (Please refer to the BDR documentation for more details.)

A separate Disaster Recovery cluster is not required for a safe upgrade but the Backup and Disaster Recovery capability in Cloudera Manager can ease the upgrade process by ensuring the critical parts of your infrastructure are backed up before you take the upgrade plunge.

Recommended Practices for Upgrading to CDH 5
  1. Create a fine-grained, step-by-step production plan for critical upgrades (using the Upgrade Documentation as a reference).
  2. Document the current deployment by chronicling the existing cluster environment and dependencies, including:
    • The current CDH and Cloudera Manager versions that are installed
    • All third-party tools that interact with the cluster
    • The databases for Cloudera Manager, Hive, Oozie, and Hue
    • Important job performance metrics so pre-upgrade baselines are well defined
  3. Test the production upgrade plan in a non-production environment (e.g., a sandbox or test environment) so you can update the plan if there are unexpected outcomes. Doing so also allows you to:
    • Test job compatibility with the new version
    • Run performance tests
  4. Upgrade to Cloudera Manager 5 before upgrading to CDH 5.
    • Ensure the Cloudera Manager minor version is equal to or greater than the target CDH minor version—the Cloudera Manager version must always be equal to or greater than the CDH version to which you upgrade.
  5. Reserve a maintenance window with enough time allotted to perform all steps.
    • For a major upgrade on production clusters, Cloudera recommends allocating up to a full-day maintenance window to perform the upgrade (but time is dependent on the number of hosts, the amount of Hadoop experience, and the particular hardware). Note that it is not possible to perform a rolling upgrade from CDH 4 to CDH 5 (major upgrade) due to incompatibilities between the two major versions.
  6. Maintain your own local Cloudera Manager and CDH package/parcel repositories to protect against external repositories being unavailable.
    • Read the reference documentation for details on how to create a local Yum repository, or
    • Pre-download a parcel to a local parcel repository on the Cloudera Manager server, where it is available for distribution to the other nodes in any of your clusters managed by this Cloudera Manager server. You can have multiple parcels for a given product downloaded to your Cloudera Manager server. Once a parcel has been downloaded to the server, it will be available for distribution on all clusters managed by the server. (Note: Parcel and package installations are equally supported by the Upgrade Wizard. Using parcels is the preferred and recommended way, as packages must be manually installed, whereas parcels are installed by Cloudera Manager. See this FAQ and this blog post to learn more about parcels.)
  7. Ensure there are no Oozie workflows in RUNNING or SUSPENDED status as the Oozie database upgrade will fail and you will have to reinstall CDH 4 to complete or kill those running workflows. (Note: When upgrading from CDH 4 to CDH 5, the Oozie upgrade can take a very long time. You can reduce this time by reducing the amount of history Oozie retains; see the documentation.)
  8. Import MapReduce configurations to YARN as part of the Upgrade Wizard. (Note: If you do not import configurations during the upgrade, you can import them manually at a later time. In addition to importing configuration settings, the import process configures services to use YARN as the computation framework instead of MapReduce (MRv1), and it overwrites existing YARN configuration and role assignments.)
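To sketch the Oozie check in step 7, a small script can ask the Oozie server how many workflows are still in RUNNING or SUSPENDED status before you begin (the endpoint shape follows Oozie's v1 web-services API; verify the URL and response fields against your Oozie version's documentation):

```python
import json
import urllib.request

def count_workflows(oozie_url, status):
    """Return how many workflows the Oozie server reports in the given
    status, using the v1 jobs endpoint with a status filter."""
    url = "%s/v1/jobs?jobtype=wf&filter=status%%3D%s" % (oozie_url.rstrip("/"), status)
    with urllib.request.urlopen(url) as resp:
        return parse_total(resp.read().decode("utf-8"))

def parse_total(payload):
    # the jobs listing carries an overall 'total' alongside 'workflows'
    return json.loads(payload).get("total", 0)

# Before upgrading, both counts should be zero, e.g.:
#   count_workflows("http://oozie-host:11000/oozie", "RUNNING")
#   count_workflows("http://oozie-host:11000/oozie", "SUSPENDED")
```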

These recommendations and notable points to address when planning an upgrade to a Cloudera cluster are intended to complement the upgrade documentation that is provided for Cloudera Manager and CDH. As mentioned, Cloudera Manager streamlines the upgrade process and strives to prevent job failures by making upgrades simple and predictable—which is especially necessary for production clusters.

Cloudera’s enterprise data hub is constantly evolving with more production-ready capabilities and innovative tools. To ensure the highest level of functionality and stability, consider upgrading to the most recent version of CDH.

Categories: Hadoop

How Edmunds.com Used Spark Streaming to Build a Near Real-Time Dashboard

Cloudera Blog - Tue, 03/24/2015 - 16:15

Thanks to Sam Shuster, Software Engineer at Edmunds.com, for the guest post below about his company’s use case for Spark Streaming, SparkOnHBase, and Morphlines.

Every year, the Super Bowl brings parties, food and hopefully a great game to appease everyone’s football appetites until the fall. For an event that draws around 114 million viewers, with larger numbers each year, Americans have also grown accustomed to commercials with production budgets on par with television shows and entertainment value that tries to rival even the game itself.

Some of the big spenders every year are car manufacturers. Edmunds.com is a car shopping website where people from all across the nation come to read reviews, compare prices, and in general get help in all matters car related. Because of Edmunds’ place as a destination for automobile news and facts, Super Bowl car commercials do indeed mean traffic spikes to make and model specific pages.

For the last few years, business analysts at Edmunds have used this unique opportunity of unfettered advertising cause and effect as a way to create awareness by sending out updates during the game to interested parties concerning how the commercials have affected site traffic for particular make and model pages. Unfortunately, in the past, these updates have been restricted to hourly granularities with an additional hour delay. Furthermore, as this data was not available in an easy-to-use dashboard, manual processing was needed to visualize the data.

As our team began to transition from MapReduce to Apache Spark, we saw this use case as a perfect opportunity to explore a solution via Spark Streaming. The goal was to build a near real-time dashboard that would provide both unique visitor and page view counts per make and make/model that could be engineered in a couple of weeks.

The Plan

Here is our prototype architecture (by no means optimal), which takes Apache Web Server log lines describing user event information, aggregates them by visitor id and make/model page information to get unique visitor and page view counts, and finally outputs this information to be visualized by a near real-time dashboard.

Why Flume?

As we were already using Apache Flume for ingesting our logs, using a Flume Spark Streaming sink was an obvious choice. In addition to being more reliable, the polling sink does not require the Flume agent to be restarted every time the Spark Streaming job is restarted.

Unfortunately, due to the limitations of our media-tier Flume agent configuration, which currently spools the log files on five-minute intervals, we are limited to a streaming micro-batch size of five minutes as well. Eventually, the goal is to change the production media Flume agents to tail the logs so that data flows at a constant rate. Every other stage of our architecture could easily handle a batch size of 30 seconds or less, so the only reason we have not been able to break five minutes is that our team does not control this Flume agent’s configuration.

Why Spark Streaming?

While we have had success using a Flume Solr sink combined with custom Morphlines to go directly from Flume to Apache Solr for log lines without aggregation, here we needed something able to perform complicated aggregations quickly, which is why Spark Streaming was necessary.

Why HBase and Lily?

At this point, as the end goal for this application was Banana (the dashboard tool that reads directly from Solr) you might wonder why we decided to include Apache HBase and Lily HBase Indexer as added complications to an already fairly lengthy pipeline. There were a couple of reasons:

  • The existence of the Cloudera Labs project, SparkOnHBase. (This blog post explains more about the project and how to incorporate it into your Spark jobs.) This library provides an easy-to-use interface for connecting Spark batch and streaming jobs to HBase. Writing directly to Solr would have required an entirely new library, with functionality very similar to what already exists in the SparkOnHBase project.
  • Our existing processing ecosystem features HBase as an important data sink/source and ideally we would want streaming data to be available in HBase for other processes.
  • Having HBase as an intermediate data store means that we have more flexibility if we ever decide to change our front-end dashboard.
  • Finally, in the event Solr was to crash, HBase has all data replicated and Lily could be used to repopulate Solr.
Why Solr and Banana?

A main reason for including Solr is that it exposes the data in a fashion that makes it easily accessible to others through a REST interface and queries.

As for dashboards, we did briefly consider other tools like Graphite, but found that for this initial prototype the flexibility, ease of use, and customizability of Banana were perfect for our use case and our lack of expertise in the area. Plus, Banana is free.


We want to calculate two different metrics:

  • Page View Counts
  • Unique Visitor Counts

We want to compute the above metrics for:

  • Every make
  • Every model

Finally, we want to have two time windows:

  • Cumulative count that refreshes at midnight eastern time
  • Count that is on the scale of the micro batch size of the streaming process

While eventually Edmunds will want to aggregate data using windowing functions to obtain counts for other time periods (hourly, for example), for this prototype we restricted ourselves to aggregations for every micro-batch plus a cumulative count, computed using the updateStateByKey function. While the cumulative statistic can be trivially computed as the sum of the micro-batch values for page views, unique visitors require that this cumulative count be computed separately.
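Outside of Spark, the interplay of the two time windows can be modeled as a fold over micro-batches. This small sketch (with hypothetical data, not Edmunds' pipeline) mirrors what updateStateByKey does for the cumulative page-view count:

```python
from collections import Counter

def update_state(batch_counts, state):
    """Fold one micro-batch's per-key counts into the running state, the
    way updateStateByKey merges new values with the previous state."""
    new_state = Counter(state)
    new_state.update(batch_counts)
    return new_state

# Three hypothetical five-minute micro-batches of page views per make
batches = [
    Counter({"lexus": 150, "kia": 40}),
    Counter({"lexus": 5500, "kia": 900}),  # a commercial airs
    Counter({"lexus": 300, "kia": 120}),
]

cumulative = Counter()
for batch in batches:
    cumulative = update_state(batch, cumulative)
# The per-batch metric is each batch itself; the cumulative metric is the
# running state, which a midnight reset would clear for the next day.
```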

Saving every visitor to determine uniqueness for a 24-hour time period would be resource intensive, so we decided to use the Twitter Algebird implementation of an approximate streaming algorithm called HyperLogLog (HLL). The state stored by the DStream call is thus the HLL object itself, which represents an approximate set of the visitors seen so far as well as the cardinality of that set. For those of you who want to know how close the approximation came to the actual data: for a 24-bit HLL we had an average error of 0.016% and a standard deviation of 0.106%, so it performed very well while taking up a fixed, small memory cost for each make and make/model count.
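To illustrate why HLL is attractive here (this is a from-scratch toy for intuition, not the Algebird implementation the job actually uses), a minimal HyperLogLog keeps only a small register array per counter yet estimates cardinality within a few percent:

```python
import hashlib
import math

class HyperLogLog:
    """Toy HyperLogLog: hash each item, use the first p bits to pick a
    register, and keep the maximum leading-zero rank seen per register."""

    def __init__(self, p=10):
        self.p = p
        self.m = 1 << p               # number of registers
        self.registers = [0] * self.m

    def add(self, item):
        h = int.from_bytes(hashlib.md5(item.encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)                  # register index
        rest = h & ((1 << (64 - self.p)) - 1)     # remaining bits
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def cardinality(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        est = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if est <= 2.5 * self.m and zeros:         # small-range correction
            est = self.m * math.log(self.m / zeros)
        return est
```

Adding the same visitor id twice never changes a register, which is what makes the running state safe to update every batch; the expected relative error is roughly 1.04/sqrt(m), about 3% for 1,024 registers.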

Here is an example flow of the aggregation of the unique visitors metrics, which ends with outputting two DStreams for the two different time windows:

Working with HBase

While performing scans on HBase is easy to do using the Spark API, doing Puts, Gets, and other useful operations on HBase is much trickier. The SparkOnHBase library provides a simple API that abstracts much of the lower-level work required to achieve those operations. This specific streaming job uses the streamBulkPut method. For every count, we put a row key comprised of the make, model, data point, aggregation type, and timestamp. The reason for including the timestamp in the row key itself is so that every value is written to its own row without using versioning. Finally, we also put these values under different qualifiers for each of the attributes so that the Lily Indexer Morphline can easily transform those values into the Solr fields without having to parse the row key.
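As an illustration of that row-key layout (the separator, field order, and sample values here are assumptions for the sketch, not Edmunds' actual schema), each put might compose its key like this:

```python
def make_row_key(make, model, data_point, agg_type, timestamp_ms):
    """Compose a row key from make, model, data point, aggregation type,
    and timestamp, so every count lands in its own row and no cell
    versioning is needed."""
    parts = [make, model or "EMPTY", data_point, agg_type, str(timestamp_ms)]
    return "|".join(parts)

key = make_row_key("kia", "sorento", "uniqueVisitors", "UVCURR", 1422849000000)
# The same attributes are also written under their own column qualifiers,
# so the Lily Indexer Morphline never has to parse this key.
```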

Lily Indexer Morphline and Solr Schema

Here is a snippet of the Morphline used by Lily Indexer to transform the HBase data into the Solr schema. The format of the Solr schema was primarily chosen based on the limitations of Banana.

extractHBaseCells {
    mappings : [
        {
            inputColumn : "data:count"
            outputField : "count"
            type : long
            source : value
        },
        {
            inputColumn : "data:timestamp"
            outputField : "timestamp"
            type : string
            source : value
        },

Morphlines is an open source framework (inside Kite SDK) with the sole intent to make ETL processes as painless to create, and as highly configurable, as possible. Basically, a Morphline file provides a sequence of commands that you wish to apply to your data and that are powerful enough to even fully replace such things as log processing applications (which we have done with a different Flume -> Morphlines -> Solr process).

Morphlines in this pipeline are used by Lily Indexer as the logic for how to transform HBase values into Solr values. For example, the first mapping above says that the input column is in the column family called data and the qualifier count and it is the value. This field is to be put into the Solr field count.

<field name="timestamp" type="date" indexed="true" stored="true" />
<field name="count" type="long" indexed="false" stored="true"/>
<field name="valueType" type="string" indexed="true" stored="true"/>
<field name="aggregationType" type="string" indexed="true" stored="true"/>
<field name="make" type="string" indexed="true" stored="true" default="EMPTY"/>
<field name="makeModel" type="string" indexed="true" stored="true" default="EMPTY"/>
<field name="id" type="string" indexed="true" stored="true"/>

Banana then can make time-range queries to the Solr server based on the above schema to create time-series plots. The below plot shows page views per minute between 14:00 and 21:00 PST on Jan. 25, 2015, for each make.

Page views per minute per make on a normal Sunday. This is the Banana panel (histogram) that supports grouping by a separate Solr field (in this case, make).

This is the query that generates the above plot (note timestamps are actually in GMT):

q=*%3A*&df=id&wt=json&rows=100000&fq=timestamp:[2015-02-01T22:00:00.000Z%20TO%202015-02-02T05:00:00.000Z]&fl=timestamp count&group=true&group.field=make&group.limit=100000&fq=aggregationType:PVCURR -make:EMPTY
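Built programmatically (a hypothetical sketch; the parameter names and values are taken from the query string above), the same request might be assembled like this:

```python
from urllib.parse import urlencode

def banana_style_query(start, end, agg_type):
    """Build the Solr query parameters for a grouped time-range query,
    mirroring the request Banana issues for the plot above."""
    params = [
        ("q", "*:*"),
        ("df", "id"),
        ("wt", "json"),
        ("rows", "100000"),
        ("fq", "timestamp:[%s TO %s]" % (start, end)),
        ("fl", "timestamp count"),
        ("group", "true"),
        ("group.field", "make"),
        ("group.limit", "100000"),
        ("fq", "aggregationType:%s -make:EMPTY" % agg_type),
    ]
    return urlencode(params)

qs = banana_style_query("2015-02-01T22:00:00.000Z",
                        "2015-02-02T05:00:00.000Z", "PVCURR")
```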

You then can choose how often you wish Banana to query Solr for updates and voila, you have your streaming near real-time dashboard!


So you might be skeptical that we would actually see detectable differences to make and make model specific pages during the Super Bowl. Would people really visit Edmunds.com while the game is happening? Well, compare the previous snapshot to the snapshot below, which is for 16:00 to 21:00 on Super Bowl Sunday 2015 (on Feb. 1).

Super Bowl Sunday page views per minute. Note the peaks, which blow the normal noise maximum of 450 out of the water!

Super Bowl XLIX took place from 3:30pm PST to 7:30pm PST, and as you can see, compared to normal site traffic there are huge spikes, up to around 5,500 page views for Lexus near halftime. This compares to around 150 page views on a normal Sunday evening. No statistical tests are needed to determine whether those are significant increases!

Let’s look at a specific instance. At around 5:55pm (unfortunately we do not have exact times of when the commercials aired), there was a Kia Sorento commercial that featured Pierce Brosnan in a James Bond spoof. Kia is a much less popular make on Edmunds.com in general, so we see a much smaller increase in unique visitors to Kia pages – going up to around 1,049 unique visitors at its peak at 6:10pm. This commercial, however, meant that Kia Sorento finished as the third-most viewed model for the day.

Kia unique visitors per 5 minutes. Notice the peak at 6:10 to 1049 unique visitors.

Kia Sorento unique visitors per 5 minutes. Note the increase at 6:10 to 923 unique visitors.


Kia Sorento cumulative unique visitors. Note the huge increase at 6:10 which helped Kia Sorento finish as third-most visited model at Edmunds.com on Super Bowl Sunday.


As I hopefully have demonstrated, the Spark Streaming prototype was a success and satisfied all of our requirements: it presents near real-time updates of unique visitors and page views for make and make/model pages on Edmunds.com. The fact that the system was put together in a tight timeframe and was reliable enough to be used on a live Super Bowl campaign is a testament to the conciseness and relative ease of the new way of thinking that is Spark. So, what are you waiting for?


How-to: Quickly Configure Kerberos for Your Apache Hadoop Cluster

Cloudera Blog - Mon, 03/23/2015 - 16:12

Use the scripts and screenshots below to configure a Kerberized cluster in minutes.

Kerberos is the foundation of securing your Apache Hadoop cluster. With Kerberos enabled, user authentication is required. Once users are authenticated, you can use projects like Apache Sentry for role-based access control via GRANT/REVOKE statements. 

Taming the three-headed dog that guards the gates of Hades is challenging, so Cloudera has put significant effort into making this process easier in Hadoop-based enterprise data hubs. In this post, you’ll learn how to stand up a one-node cluster with Kerberos enforcing user authentication, using the Cloudera QuickStart VM as a demo environment.

If you want to read the product documentation, it’s available here. You should consider this reference material; I’d suggest reading it later to understand more details about what the scripts do.


You need the following downloads to follow along.

Initial Configuration

Before you start the QuickStart VM, increase the memory allocation to 8GB RAM and increase the number of CPUs to two. You can get by with a little less RAM, but we will have everything including the Kerberos server running on one node.

Start up the VM and activate Cloudera Manager as shown here:

Give this script some time to run; it has to restart the cluster.

KDC Install and Setup Script

The script goKerberos_beforeCM.sh does all the setup work for the Kerberos server and the appropriate configuration parameters. The comments are designed to explain what is going on inline. (Do not copy and paste this script! It contains unprintable characters that are pretending to be spaces. Rather, download it.)

#!/bin/bash
# (c) copyright 2014 martin lurie sample code not supported
# reminder to activate CM in the quickstart
echo Activate CM in the quickstart vmware image
echo Hit enter when you are ready to proceed
# pause until the user hits enter
read foo
# for debugging - set -x
# fix the permissions in the quickstart vm
# may not be an issue in later versions of the vm
# this fixes the following error
# failed to start File /etc/hadoop must not be world
# or group writable, but is 775
# File /etc must not be world or group writable, but is 775
#
# run this as root
# to become root
# sudo su -
cd /root
chmod 755 /etc
chmod 755 /etc/hadoop
# install the kerberos components
yum install -y krb5-server
yum install -y openldap-clients
yum -y install krb5-workstation
# update the config files for the realm name and hostname
# in the quickstart VM
# notice the -i.xxx for sed will create an automatic backup
# of the file before making edits in place
#
# set the Realm
# this would normally be YOURCOMPANY.COM
# in this case the hostname is quickstart.cloudera
# so the equivalent domain name is CLOUDERA
sed -i.orig 's/EXAMPLE.COM/CLOUDERA/g' /etc/krb5.conf
# set the hostname for the kerberos server
sed -i.m1 's/kerberos.example.com/quickstart.cloudera/g' /etc/krb5.conf
# change domain name to cloudera
sed -i.m2 's/example.com/cloudera/g' /etc/krb5.conf
# download UnlimitedJCEPolicyJDK7.zip from Oracle into
# the /root directory
# we will use this for full strength 256 bit encryption
mkdir jce
cd jce
unzip ../UnlimitedJCEPolicyJDK7.zip
# save the original jar files
cp /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/local_policy.jar local_policy.jar.orig
cp /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/US_export_policy.jar US_export_policy.jar.orig
# copy the new jars into place
cp /root/jce/UnlimitedJCEPolicy/local_policy.jar /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/local_policy.jar
cp /root/jce/UnlimitedJCEPolicy/US_export_policy.jar /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/US_export_policy.jar
# now create the kerberos database
# type in cloudera at the password prompt
echo suggested password is cloudera
kdb5_util create -s
# update the kdc.conf file
sed -i.orig 's/EXAMPLE.COM/CLOUDERA/g' /var/kerberos/krb5kdc/kdc.conf
# this will add a line to the file with ticket life
sed -i.m1 '/dict_file/a max_life = 1d' /var/kerberos/krb5kdc/kdc.conf
# add a max renewable life
sed -i.m2 '/dict_file/a max_renewable_life = 7d' /var/kerberos/krb5kdc/kdc.conf
# indent the two new lines in the file
sed -i.m3 's/^max_/ max_/' /var/kerberos/krb5kdc/kdc.conf
# the acl file needs to be updated so the */admin
# is enabled with admin privileges
sed -i 's/EXAMPLE.COM/CLOUDERA/' /var/kerberos/krb5kdc/kadm5.acl
# The kerberos authorization tickets need to be renewable
# if not the Hue service will show bad (red) status
# and the Hue "Kerberos Ticket Renewer" will not start
# the error message in the log will look like this:
# kt_renewer ERROR Couldn't renew
# kerberos ticket in
# order to work around Kerberos 1.8.1 issue.
# Please check that the ticket for 'hue/quickstart.cloudera'
# is still renewable
# update the kdc.conf file to allow renewable
sed -i.m3 '/supported_enctypes/a default_principal_flags = +renewable, +forwardable' /var/kerberos/krb5kdc/kdc.conf
# fix the indenting
sed -i.m4 's/^default_principal_flags/ default_principal_flags/' /var/kerberos/krb5kdc/kdc.conf
# There is an additional error message you may encounter
# this requires an update to the krbtgt principal
# 5:39:59 PM ERROR kt_renewer
# Couldn't renew kerberos ticket in order to work around
# Kerberos 1.8.1 issue. Please check that the ticket
# for 'hue/quickstart.cloudera' is still renewable:
# $ kinit -f -c /tmp/hue_krb5_ccache
# If the 'renew until' date is the same as the 'valid starting'
# date, the ticket cannot be renewed. Please check your
# KDC configuration, and the ticket renewal policy
# (maxrenewlife) for the 'hue/quickstart.cloudera'
# and 'krbtgt' principals.
#
# we need a running server and admin service to make this update
service krb5kdc start
service kadmin start
kadmin.local <<eoj
modprinc -maxrenewlife 1week krbtgt/CLOUDERA@CLOUDERA
eoj
# now just add a few user principals
# kadmin: addprinc -pw <Password>
# cloudera-scm/admin@YOUR-LOCAL-REALM.COM
# add the admin user that CM will use to provision
# kerberos in the cluster
kadmin.local <<eoj
addprinc -pw cloudera cloudera-scm/admin@CLOUDERA
modprinc -maxrenewlife 1week cloudera-scm/admin@CLOUDERA
eoj
# add the hdfs principal so you have a superuser for hdfs
kadmin.local <<eoj
addprinc -pw cloudera hdfs@CLOUDERA
eoj
# add a cloudera principal for the standard user
# in the Cloudera Quickstart VM
kadmin.local <<eoj
addprinc -pw cloudera cloudera@CLOUDERA
eoj
# test the server by authenticating as the CM admin user
# enter the password cloudera when you are prompted
echo use kinit to get a valid ticket to access the cluster
kinit cloudera-scm/admin@CLOUDERA
# once you have a valid ticket you can see the
# characteristics of the ticket with klist -e
# you will see the encryption type which you will
# need for a screen in the wizard, for example
# Etype (skey, tkt): aes256-cts-hmac-sha1-96
klist -e
# to see the contents of the files cat them
cat /var/kerberos/krb5kdc/kdc.conf
cat /var/kerberos/krb5kdc/kadm5.acl
cat /etc/krb5.conf

The files will look like this:

[root@quickstart ~]# cat /var/kerberos/krb5kdc/kdc.conf
[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88
[realms]
 CLOUDERA = {
  #master_key_type = aes256-cts
  acl_file = /var/kerberos/krb5kdc/kadm5.acl
  dict_file = /usr/share/dict/words
  max_renewable_life = 7d
  max_life = 1d
  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
  supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
  default_principal_flags = +renewable, +forwardable
 }
[root@quickstart ~]# cat /var/kerberos/krb5kdc/kadm5.acl
*/admin@CLOUDERA *
[root@quickstart ~]# cat /etc/krb5.conf
[libdefaults]
 default_realm = CLOUDERA
 dns_lookup_kdc = false
 dns_lookup_realm = false
 ticket_lifetime = 86400
 renew_lifetime = 604800
 forwardable = true
 default_tgs_enctypes = aes256-cts-hmac-sha1-96
 default_tkt_enctypes = aes256-cts-hmac-sha1-96
 permitted_enctypes = aes256-cts-hmac-sha1-96
 udp_preference_limit = 1
[realms]
 CLOUDERA = {
  kdc = quickstart.cloudera
  admin_server = quickstart.cloudera
 }

Cloudera Manager Kerberos Wizard

After running the script, you now have a working Kerberos server and can secure the Hadoop cluster. The wizard will do most of the heavy lifting; you just have to fill in a few values.

To start, log into Cloudera Manager by going to http://quickstart.cloudera:7180 in your browser. The userid is cloudera and the password is cloudera. (Almost needless to say, but never use “cloudera” as a password in a real-world setting.)

There are lots of productivity tools here for managing the cluster but ignore them for now and head straight for the Administration > Kerberos wizard as shown in the next screenshot.

Click on the “Enable Kerberos” button.

The four checklist items were all completed by the script you’ve already run. Check off each item and select “Continue.”

The Kerberos Wizard needs to know the details of what the script configured. Fill in the entries as follows:

  • KDC Server Host: quickstart.cloudera
  • Kerberos Security Realm: CLOUDERA
  • Kerberos Encryption Types: aes256-cts-hmac-sha1-96

Click “Continue.”

Do you want Cloudera Manager to manage the krb5.conf files in your cluster? Remember, the whole point of this blog post is to make Kerberos easier. So, please check “Yes” and then select “Continue.”

The Kerberos Wizard is going to create Kerberos principals for the different services in the cluster. To do that it needs a Kerberos Administrator ID. The ID created is: cloudera-scm/admin@CLOUDERA.

The screen shot shows how to enter this information. Recall the password is: cloudera.

The next screen provides good news. It lets you know that the wizard was able to successfully authenticate. 

OK, you’re ready to let the Kerberos Wizard do its work. Since this is a VM, you can safely select “I’m ready to restart the cluster now” and then click “Continue.” You now have time to go get a coffee or other beverage of your choice.

How long does that take? Just let it work.

Congrats, you are now running a Hadoop cluster secured with Kerberos.

Kerberos is Enabled. Now What?

The old method of su - hdfs will no longer provide administrator access to the HDFS filesystem. Here is how you become the hdfs user with Kerberos:

kinit hdfs@CLOUDERA

Now validate you can do hdfs user things:

hadoop fs -mkdir /eraseme
hadoop fs -rmdir /eraseme

Next, invalidate the Kerberos token so as not to break anything:

kdestroy
The min.user.id parameter needs to be fixed per the message below:

Requested user cloudera is not whitelisted and has id 501, which is below the minimum allowed 1000

This is the error message you get without fixing min.user.id:

Application initialization failed (exitCode=255) with output: Requested user cloudera is not whitelisted and has id 501, which is below the minimum allowed 1000

Save the changes shown above and restart the YARN service. Now validate that the cloudera user can use the cluster:

kinit cloudera@CLOUDERA
hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar pi 10 10000

If you forget to kinit before trying to use the cluster you’ll get the errors below. The simple fix is to use kinit with the principal you wish to use.

# force the error to occur by eliminating the ticket with kdestroy
[cloudera@quickstart ~]$ kdestroy
[cloudera@quickstart ~]$ hadoop fs -ls
15/01/12 08:21:33 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:33 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:33 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
ls: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "quickstart.cloudera/"; destination host is: "quickstart.cloudera":8020;
[cloudera@quickstart ~]$ hadoop fs -put /etc/hosts myhosts
15/01/12 08:21:47 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:47 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:47 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
put: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "quickstart.cloudera/"; destination host is: "quickstart.cloudera":8020;

Congratulations, you have a running Kerberos cluster!

Marty Lurie is a Systems Engineer at Cloudera.

Categories: Hadoop

Converting Apache Avro Data to Parquet Format in Apache Hadoop

Cloudera Blog - Fri, 03/20/2015 - 16:13

Thanks to Big Data Solutions Architect Matthieu Lieber for allowing us to republish the post below.

A customer of mine wants the best of both worlds: to keep working with his existing Apache Avro data, with all of the advantages that it confers, while also gaining the predicate push-down features that Parquet provides. How to reconcile the two?

For more information about combining these formats, see this.

For a quick recap on Avro, see my previous post. While you are at it, see why Apache Avro is currently the gold standard in the industry.

What we are going to demonstrate here: how to take advantage of existing tools to convert our existing Avro format into Apache Parquet (incubating at the time of this writing), and make sure we can query that transformed data.

Parquet Data

First let’s try to convert text data to Parquet, and read it back. Fortunately, there is already some code from Cloudera to do this in MapReduce.

The code from Cloudera at https://github.com/cloudera/parquet-examples, with its doc here, lets you read and write Parquet data. Let’s try it.

First, let’s create some Parquet data as input. We will use Hive for this, by directly converting Text data into Parquet.

Parquet Conversion
    1. Let’s create an example CSV file, and create a text table (here, just two columns of integers) in HDFS pointing to it:


      create table mycsvtable (x int, y int)
        row format delimited FIELDS TERMINATED BY ','
        STORED AS TEXTFILE;
      LOAD DATA LOCAL INPATH '/home/cloudera/test/' OVERWRITE INTO TABLE mycsvtable;

    2. Create a Parquet table in Hive, and convert the data to it:

      create table myparquettable (a INT, b INT)
        STORED AS PARQUET
        LOCATION '/tmp/data';
      insert overwrite table myparquettable select * from mycsvtable;

    3. To build the code in an IDE such as Eclipse, you will need to add the relevant Hadoop and Parquet libraries to the project’s build path. Then export the code as a JAR (File->Export as Runnable JAR) and run it outside of Eclipse (otherwise, Hadoop security issues prevent the code from running).
    4. Run the program (you could also use java instead of hadoop if you copy the data from HDFS to local disk). The arguments are the Parquet input path and the CSV output path. We just want to ensure that we can read the Parquet data and display it.

      $ sudo hadoop jar ./testparquet.jar hdfs:///home/cloudera/test/data/000000_0 hdfs:///home/cloudera/test/dataparquet

      See the result (csv file):

      $ more test/dataparquet2/part-m-00000
      1,2
      3,4
      5,6

Avro Data Conversion

Avro Data Example

Let’s get an Avro data example working, from this post.

Avro Data Generation

Interestingly, Hive doesn’t let you load/convert CSV data into Avro as we did in the Parquet example.

Let’s walk through an example of creating an Avro schema and generating some data. Let’s use this example, with this twitter.avsc schema:

{
  "type" : "record",
  "name" : "twitter_schema",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc" : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc" : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc" : "Unix epoch time in seconds"
  } ],
  "doc" : "A basic schema for storing Twitter messages"
}

and some data in twitter.json:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }

We will convert the data (in JSON) into binary Avro format.

$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro

Transformation from Avro to Parquet Storage Format

So essentially we use the best of both worlds: we take advantage of Avro’s object model and serialization format, and combine it with Parquet’s columnar storage format.

First we will reuse our Avro data that was created earlier.

          1. We will then take advantage of this code to convert the Avro data to Parquet data. This is a map-only job that simply sets up the right input and output format according to what we want.
          2. After compilation, let’s run the script on our existing Avro data:


            $  hadoop jar avro2parquet.jar hdfs:///user/cloudera/twitter.avsc hdfs:///user/cloudera/inputdir hdfs:///user/cloudera/outputdir

            We get:

            $ hadoop fs -ls /user/cloudera/outputdir
            Found 3 items
            -rw-r--r--   /user/cloudera/outputdir2/_SUCCESS
            -rw-r--r--   1 cloudera cloudera   /user/cloudera/outputdir2/_metadata
            -rw-r--r--   1 cloudera cloudera   /user/cloudera/outputdir2/part-m-00000.snappy.parquet

            Note that the Avro schema is converted directly to a Parquet-compatible format.


          3. Now let’s test our result in Hive. We first create a Parquet table (note the simple syntax in Hive 0.14+), then point to the data we just created via a LOAD command, and finally query our converted data directly.
            hive> create table tweets_parquet (username string, tweet string, timestamp bigint)
                > STORED AS PARQUET;
            OK
            hive> load data inpath '/user/cloudera/outputdir/part-m-00000.snappy.parquet'
                > overwrite into table tweets_parquet;
            Loading data to table default.tweets_parquet
            chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/tweets_parquet/part-m-00000.snappy.parquet': User does not belong to hive
            Table default.tweets_parquet stats: [numFiles=1, numRows=0, totalSize=1075, rawDataSize=0]
            OK
            Time taken: 6.712 seconds
            hive> select * from tweets_parquet;
            OK
            miguno      Rock: Nerf paper, scissors is fine.   1366150681
            BlizzardCS  Works as intended. Terran is IMBA.    1366154481
            Time taken: 1.107 seconds, Fetched: 2 row(s)

            Parquet with Avro

            Let’s verify our Parquet schema now that it is converted; note that the schema still refers to Avro:

            $ hadoop parquet.tools.Main schema outputdir/part-m-00000.snappy.parquet
            message com.miguno.avro.Tweet {
              required binary username (UTF8);
              required binary tweet (UTF8);
              required int64 timestamp;
            }

            $ hadoop parquet.tools.Main meta outputdir/part-m-00000.snappy.parquet
            creator:     parquet-mr
            extra:       avro.schema = {"type":"record","name":"Tweet","namespace"

            file schema: com.miguno.avro.Tweet
            ------------------------------------------------------
            username:    REQUIRED BINARY O:UTF8 R:0 D:0
            tweet:       REQUIRED BINARY O:UTF8 R:0 D:0
            timestamp:   REQUIRED INT64 R:0 D:0

            row group 1: RC:2 TS:297
            ------------------------------------------------------
            username:    BINARY SNAPPY DO:0 FPO:4 SZ:67/65/0.97 VC:2 ENC:PLAIN,BIT_PACKED
            tweet:       BINARY SNAPPY DO:0 FPO:71 SZ:176/175/0.99 VC:2 ENC:PLAIN
            timestamp:   INT64 SNAPPY DO:0 FPO:247 SZ:59/57/0.97 VC:2 ENC:PLAIN,BIT_PACKED
            $

That concludes our exercise! Let me know if you have additional questions.


Categories: Hadoop

How-to: Build Re-usable Spark Programs using Spark Shell and Maven

Cloudera Blog - Tue, 03/17/2015 - 14:57

Set up your own, or even a shared, environment for doing interactive analysis of time-series data.

Although software engineering offers several methods and approaches to produce robust and reliable components, a more lightweight and flexible approach is required for data analysts—who do not build “products” per se but still need high-quality tools and components. Thus, recently, I tried to find a way to re-use existing libraries and datasets stored already in HDFS with Apache Spark.

The use case involved information flow analysis based on time series and network data. In this use case, all measured data (primary data) is stored in time series buckets, which are Hadoop SequenceFiles with keys of type Text and values of type VectorWritable (from Apache Mahout 0.9). In my testing, I found the Spark shell to be a useful tool for doing interactive data analysis for this purpose, especially since the code involved can be modularized, re-used, and even shared.

In this post, you’ll learn how to set up your own, or even a shared, environment for doing interactive data analysis of time series within the Spark shell. Instead of developing an application, you will use Scala code snippets and third-party libraries to create reusable Spark modules.

What is a Spark Module?

Using existing Java classes inside the Spark shell requires a solid deployment procedure and some dependency management. In addition to the Scala Simple Build tool (sbt), Apache Maven is really useful, too.

Figure 1: For simple and reliable usage of Java classes and complete third-party libraries, we define a Spark Module as a self-contained artifact created by Maven. This module can easily be shared by multiple users.

For this use case, you will need to create a single jar file containing all dependencies. In some cases, it is also really helpful to provide some library wrapper tools. Such helper classes should be well tested and documented. That way, you can achieve a kind of decoupling between data analysis and software development tasks.

Next, let’s go through the steps of creating this artifact.

Set Up a New Maven Project

First, confirm you have Java 1.7 and Maven 3 installed. Create a new directory for your projects and use Maven to prepare a project directory.

$> mvn archetype:generate \
     -DarchetypeGroupId=org.apache.maven.archetypes \
     -DgroupId=org.etosha \
     -DartifactId=infodynamics \
     -Dfilter=org.apache.maven.archetypes:maven-archetype-quickstart

Maven will ask:

Choose archetype:
1: remote -> org.apache.maven.archetypes:maven-archetype-quickstart (An archetype which contains a sample Maven project.)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 1:
Choose org.apache.maven.archetypes:maven-archetype-quickstart version:
1: 1.0-alpha-1
2: 1.0-alpha-2
3: 1.0-alpha-3
4: 1.0-alpha-4
5: 1.0
6: 1.1

Select (6) to work with Spark 1.1 locally or another number according to the settings for your cluster.

Now, add some dependencies to the automatically generated POM file.

<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-math</artifactId>
  <version>0.9</version>
  <scope>provided</scope> <!-- use scope jar to ship it as part of your module -->
</dependency>
<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-core</artifactId>
  <version>0.9</version>
  <scope>provided</scope> <!-- use scope jar to ship it as part of your module -->
</dependency>

Mahout 0.9 libraries are also dependencies of Spark, so you will need to add the scope “provided” to the dependency section; otherwise, Maven will bundle the library and add all its classes to your final single jar file. (As our time-series buckets are SequenceFiles and contain objects of type VectorWritable, they require this version of Mahout.)

Another reason to package third-party libraries is for creating charts inside the Spark shell. If you have Gnuplot, it is really easy to plot results with the Scalaplot library. Just add this dependency definition to your pom.xml file and you are ready to plot:

<dependency>
  <groupId>org.sameersingh.scalaplot</groupId>
  <artifactId>scalaplot</artifactId>
  <version>0.0.3</version>
</dependency>

In this specific scenario the plan is to do some interactive time-series analysis within the Spark shell. First, you’ll want to evaluate the datasets and algorithms. You have to learn more about the domain before a custom application can be built and deployed. Finally, you can use Apache Oozie actions to execute the code but even in this case all third-party libraries have to be available as one artifact.

It is worthwhile to invest some minutes in building such a single jar file with all dependencies, especially for projects that are more than just a hack, and to share this artifact among all the data scientists in your group.

But what about libraries that are not available in Maven Central, such as those on SourceForge or Google Code?

Download and Deploy a Third-Party-Library as Part of a Spark Module

You’ll need to prepare a location for all third-party libraries that are not available via Maven Central but are required in this particular project.

$> mkdir libs
$> cd libs

Now download the required artifacts, e.g. the JIDT library from Google Code, and decompress the zip file:

$> wget http://information-dynamics-toolkit.googlecode.com/files/infodynamics-jar-0.1.4.zip
$> unzip infodynamics-jar-0.1.4.zip

Maven can deploy the artifact for you using the mvn deploy:deploy-file goal:

$> mvn deploy:deploy-file \
     -Durl=file:///home/training/.m2/repository \
     -Dfile=libs/infodynamics.jar \
     -DgroupId=org.etosha \
     -DartifactId=EXT.infodynamics \
     -Dpackaging=jar \
     -Dversion=0.1.4

Now, you are ready to add this locally available library to the dependencies section of the POM file of the new Spark Module project:

<dependency>
  <groupId>org.etosha</groupId>
  <artifactId>EXT.infodynamics</artifactId>
  <version>0.1.4</version>
</dependency>

The next step is to add the Maven Assembly Plugin to the plugins-section in the pom.xml file. It manages the merge procedure for all available JAR files during the build.

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>MyMAINClass</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>

Use the above build snippet and place it inside the project section.

Now you are ready to run the Maven build.

$> mvn clean assembly:single

The result will be a single JAR file with defined libraries built in. The file is located in the target directory. As a next step, run the Spark shell and test the settings.

Run and Test the Single-JAR Spark-Module

To run Spark in interactive mode via Spark shell, just define a variable with the name ADD_JARS. If more than one jar file should be added, use a comma-separated list of paths. Now run the Spark shell with this command:

$> ADD_JARS=target/infodynamics-1.0-SNAPSHOT.jar spark-shell

A quick validation can be done via the Web UI of the Spark shell application, which is available on port 4040: open http://localhost:4040/environment/ in a browser to confirm the settings.

Figure 2: Validation of Spark environment settings. Jar files that are available to the Spark nodes are shown in the marked field. All additional paths have to be specified in the property spark.jars.

Another test can now be done inside the Spark shell: just import some of the required Java classes, such as the MatrixUtils, from the third-party library. You just have to type:

scala> import infodynamics.utils.MatrixUtils;

At this point, you may well wonder how to save the Scala code you entered into the Spark shell. After a successful interactive session, you can simply extract your input from the Spark shell history. The Spark shell logs all commands in a file called .spark_history in the user’s home directory. Within a Linux terminal, you run the tail command to preserve the latest commands before you go on.

$> tail -n 5 .spark_history > mySession1.scala

This command allows us to preserve the commands in a simple reusable script, or as a base for further development in an IDE. Now, you can run this script file containing your Scala functions and custom code just by using the :load command. Inside the Spark shell, you enter:

scala> :load mySession1.scala

And don’t forget to share your code! If you want to publish this module via Github, you can quickly follow the instructions here.

Because visual investigation is an important time saver, let’s add the scalaplot library to this module. Now you can easily create some simple charts from the variables stored in the Spark shell. Because this post is not about RDDs and working with large datasets but rather about preparing the stage, follow the steps from the scalaplot documentation to plot a simple sine wave.

scala> import org.sameersingh.scalaplot.Implicits._
scala> val x = 0.0 until 2.0 * math.Pi by 0.1
scala> output(GUI, plot(x ->(math.sin(_), math.cos(_))))

If your system shows a window with two waves now and no error messages appear, you are done for today.

Congratulations, the Spark shell is now aware of your project libraries, including the plotting tools and the ”legacy” libraries containing the data types used in your SequenceFiles, all bundled in your first Spark module!


In this post, you learned how to manage and use external dependencies (especially Java libraries) and project-specific artifacts in the Spark shell. Now it is really easy to share and distribute the modules within your data analyst working group.

Mirko Kämpf is the lead instructor for the Cloudera Administrator Training for Apache Hadoop for Cloudera University.

Categories: Hadoop

Exactly-once Spark Streaming from Apache Kafka

Cloudera Blog - Mon, 03/16/2015 - 14:39

Thanks to Cody Koeninger, Senior Software Engineer at Kixer, for the guest post below about Apache Kafka integration points in Apache Spark 1.3. Spark 1.3 will ship in CDH 5.4.

The new release of Apache Spark, 1.3, includes new experimental RDD and DStream implementations for reading data from Apache Kafka. As the primary author of those features, I’d like to explain their implementation and usage. You may be interested if you would benefit from:

  • More uniform usage of Spark cluster resources when consuming from Kafka
  • Control of message delivery semantics
  • Delivery guarantees without reliance on a write-ahead log in HDFS
  • Access to message metadata

I’ll assume you’re familiar with the Spark Streaming docs and Kafka docs. All code examples are in Scala, but there are Java-friendly methods in the API.

Basic Usage

The new API for both Kafka RDD and DStream is in the spark-streaming-kafka artifact.

SBT dependency:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"

Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

To read from Kafka in a Spark Streaming job, use KafkaUtils.createDirectStream:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf, Seconds(60))
// hostname:port for Kafka brokers, not Zookeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
val topics = Set("sometopic", "anothertopic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

The call to createDirectStream returns a stream of tuples formed from each Kafka message’s key and value. The exposed return type is InputDStream[(K, V)], where K and V in this case are both String. The private implementation is DirectKafkaInputDStream. There are other overloads of createDirectStream that allow you to access message metadata, and to specify the exact per-topic-and-partition starting offsets.

To read from Kafka in a non-streaming Spark job, use KafkaUtils.createRDD:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf)
// hostname:port for Kafka brokers, not Zookeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
val offsetRanges = Array(
  OffsetRange("sometopic", 0, 110, 220),
  OffsetRange("sometopic", 1, 100, 313),
  OffsetRange("anothertopic", 0, 456, 789)
)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

The call to createRDD returns a single RDD of (key, value) tuples for each Kafka message in the specified batch of offset ranges. The exposed return type is RDD[(K, V)], the private implementation is KafkaRDD. There are other overloads of createRDD that allow you to access message metadata, and to specify the current per-topic-and-partition Kafka leaders.


DirectKafkaInputDStream is a stream of batches. Each batch corresponds to a KafkaRDD. Each partition of the KafkaRDD corresponds to an OffsetRange. Most of this implementation is private, but it’s still useful to understand.


An OffsetRange represents the lower and upper boundaries for a particular sequence of messages in a given Kafka topic and partition. The following data structure:

OffsetRange("visits", 2, 300, 310)

identifies the 10 messages from offset 300 (inclusive) until offset 310 (exclusive) in partition 2 of the “visits” topic. Note that it does not actually contain the contents of the messages, it’s just a way of identifying the range.

Also note that because Kafka ordering is only defined on a per-partition basis, the messages referred to by

OffsetRange("visits", 3, 300, 310)

may be from a completely different time period; even though the offsets are the same as above, the partition is different.
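To make the half-open interval concrete, here is a quick Python sketch (a hypothetical model of the semantics just described, not Spark’s actual Scala class):

```python
# Hypothetical model of an OffsetRange, for illustration only: it identifies
# a slice of one Kafka topic-partition; it holds no message contents.
from dataclasses import dataclass

@dataclass(frozen=True)
class OffsetRange:
    topic: str
    partition: int
    from_offset: int    # inclusive lower bound
    until_offset: int   # exclusive upper bound

    def count(self) -> int:
        """Number of messages the range identifies."""
        return self.until_offset - self.from_offset

r = OffsetRange("visits", 2, 300, 310)
print(r.count())  # 10
```

Two ranges with the same offsets but different partitions, as in the example above, identify entirely unrelated sets of messages.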


Recall that an RDD is defined by:

  • A method to divide the work into partitions (getPartitions).
  • A method to do the work for a given partition (compute).
  • A list of parent RDDs. KafkaRDD is an input, not a transformation, so it has no parents.
  • Optionally, a partitioner defining how keys are hashed. KafkaRDD doesn’t define one.
  • Optionally, a list of preferred hosts for a given partition, in order to push computation to where the data is (getPreferredLocations).

The KafkaRDD constructor takes an array of OffsetRanges and a map with the current leader host and port for each Kafka topic and partition. The reason for the separation of leader info is to allow for the KafkaUtils.createRDD convenience constructor that doesn’t require you to know the leaders. In that case, createRDD will do the Kafka API metadata calls necessary to find the current leaders, using the list of hosts specified in metadata.broker.list as the initial points of contact. That initial lookup will happen once, in the Spark driver process.

The getPartitions method of KafkaRDD takes each OffsetRange in the array and turns it into an RDD partition by adding the leader’s host and port info. The important thing to notice here is there is a 1:1 correspondence between Kafka partition and RDD partition. This means the degree of Spark parallelism (at least for reading messages) will be directly tied to the degree of Kafka parallelism.

The getPreferredLocations method uses the Kafka leader for the given partition as the preferred host. I don’t run my Spark executors on the same hosts as Kafka, so if you do, let me know how this works out for you.

The compute method runs in the Spark executor processes. It uses a Kafka SimpleConsumer to connect to the leader for the given topic and partition, then makes repeated fetch requests to read messages for the specified range of offsets.

Each message is converted using the messageHandler argument to the constructor. messageHandler is a function from Kafka MessageAndMetadata to a user-defined type, with the default being a tuple of key and value. In most cases, it’s more efficient to access topic and offset metadata on a per-partition basis (see the discussion of HasOffsetRanges below), but if you really need to associate each message with its offset, you can do so.
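The messageHandler idea can be sketched in Python (MessageAndMetadata below is a hypothetical stand-in for Kafka’s class of that name, and both handler functions are illustrative, not Spark’s API):

```python
from collections import namedtuple

# Hypothetical stand-in for Kafka's MessageAndMetadata, for illustration only.
MessageAndMetadata = namedtuple("MessageAndMetadata",
                                "topic partition offset key message")

def default_handler(mmd):
    # models the default behavior: a (key, value) tuple per message
    return (mmd.key, mmd.message)

def offset_handler(mmd):
    # a custom handler that keeps each message's offset alongside its value
    return (mmd.offset, mmd.message)

mmd = MessageAndMetadata("visits", 2, 300, "k1", "v1")
print(default_handler(mmd))  # ('k1', 'v1')
print(offset_handler(mmd))   # (300, 'v1')
```

A per-message handler like offset_handler is the shape to reach for only when each message really needs its own offset; otherwise the per-partition approach via HasOffsetRanges is cheaper.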

The key point to notice about compute is that, because offset ranges are defined in advance on the driver, then read directly from Kafka by executors, the messages returned by a particular KafkaRDD are deterministic. There is no important state maintained on the executors, and no notion of committing read offsets to Apache ZooKeeper, as there is with prior solutions that used the Kafka high-level consumer.

Because the compute operation is deterministic, it is in general safe to re-try a task if it fails. If a Kafka leader is lost, for instance, the compute method will just sleep for the amount of time defined by the refresh.leader.backoff.ms Kafka param, then fail the task and let the normal Spark task retry mechanism handle it. On subsequent attempts after the first, the new leader will be looked up on the executor as part of the compute method.


The KafkaRDD returned by KafkaUtils.createRDD is usable in batch jobs if you have existing code to obtain and manage offsets. In most cases however, you’ll probably be using KafkaUtils.createDirectStream, which returns a DirectKafkaInputDStream. Similar to an RDD, a DStream is defined by:

  • A list of parent DStreams. Again, this is an input DStream, not a transformation, so it has no parents.
  • A time interval at which the stream will generate batches. This stream uses the interval of the streaming context.
  • A method to generate an RDD for a given time interval (compute)

The compute method runs on the driver. It connects to the leader for each topic and partition, not to read messages, but just to get the latest available offset. It then defines a KafkaRDD with offset ranges spanning from the ending point of the last batch until the latest leader offsets.

To define the starting point of the very first batch, you can either specify exact offsets per TopicAndPartition, or use the Kafka parameter auto.offset.reset, which may be set to “largest” or “smallest” (defaults to “largest”). For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch.
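Putting those two paragraphs together, the driver-side planning step can be sketched as follows (a hypothetical Python model; next_batch_ranges is an illustrative name, not Spark’s internal API):

```python
# Hypothetical sketch of per-batch planning: each new batch spans from where
# the last batch ended up to the latest leader offset, optionally clamped by
# a per-partition cap (the idea behind spark.streaming.kafka.maxRatePerPartition).
def next_batch_ranges(last_offsets, latest_offsets, max_per_partition=None):
    ranges = []
    for tp, from_offset in last_offsets.items():
        until_offset = latest_offsets[tp]
        if max_per_partition is not None:
            until_offset = min(until_offset, from_offset + max_per_partition)
        topic, partition = tp
        ranges.append((topic, partition, from_offset, until_offset))
    return ranges

last = {("visits", 0): 300, ("visits", 1): 500}
latest = {("visits", 0): 1000, ("visits", 1): 510}
print(next_batch_ranges(last, latest, max_per_partition=100))
# [('visits', 0, 300, 400), ('visits', 1, 500, 510)]
```

In the example, partition 0 is clamped to 100 messages while partition 1 only has 10 available, so the two partitions produce differently sized tasks within the same batch.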

Once the KafkaRDD for a given time interval is defined, it executes exactly as described above for the batch usage case. Unlike prior Kafka DStream implementations, there is no long-running receiver task that occupies a core per stream regardless of what the message volume is. For our use cases at Kixer, it’s common to have important but low-volume topics in the same job as high-volume topics. With the direct stream, the low-volume partitions result in smaller tasks that finish quickly and free up that node to process other partitions in the batch. It’s a pretty big win to have uniform cluster usage while still keeping topics logically separate.

A significant difference from the batch use case is that there is some important state that varies over time, namely the offset ranges generated at each time interval. Executor or Kafka leader failure isn’t a big deal, as discussed above, but if the driver fails, offset ranges will be lost, unless stored somewhere. I’ll discuss this in more detail under Delivery Semantics below, but you basically have three choices:

  1. Don’t worry about it if you don’t care about lost or duplicated messages, and just restart the stream from the earliest or latest offset.
  2. Checkpoint the stream, in which case the offset ranges (not the messages, just the offset range definitions) will be stored in the checkpoint.
  3. Store the offset ranges yourself, and provide the correct starting offsets when restarting the stream.

Again, no consumer offsets are stored in ZooKeeper. If you want interop with existing Kafka monitoring tools that talk to ZK directly, you’ll need to store the offsets into ZK yourself (this doesn’t mean it needs to be your system of record for offsets, you can just duplicate them there).

Note that because Kafka is being treated as a durable store of messages, not a transient network source, you don’t need to duplicate messages into HDFS for error recovery. This design does have some implications, however. The first is that you can’t read messages that no longer exist in Kafka, so make sure your retention is adequate. The second is that you can’t read messages that don’t exist in Kafka yet. To put it another way, the consumers on the executors aren’t polling for new messages, the driver is just periodically checking with the leaders at every batch interval, so there is some inherent latency.


One other implementation detail is a public interface, HasOffsetRanges, with a single method returning an array of OffsetRange. KafkaRDD implements this interface, allowing you to obtain topic and offset information on a per-partition basis.

val stream = KafkaUtils.createDirectStream(...)
...
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get a collection of offset ranges
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    // index to get the correct offset range for the rdd partition we're working on
    val osr: OffsetRange = offsets(i)
    // get any needed data from the offset range
    val topic = osr.topic
    val kafkaPartitionId = osr.partition
    val begin = osr.fromOffset
    val end = osr.untilOffset
    ...

The reason for this layer of indirection is because the static type used by DStream methods like foreachRDD and transform is just RDD, not the type of the underlying (and in this case, private) implementation. Because the DStream returned by createDirectStream generates batches of KafkaRDD, you can safely cast to HasOffsetRanges. Also note that because of the 1:1 correspondence between offset ranges and rdd partitions, the indexes of the rdd partitions correspond to the indexes into the array returned by offsetRanges.

Delivery Semantics

First, understand the Kafka docs on delivery semantics. If you’ve already read them, go read them again. In short: consumer delivery semantics are up to you, not Kafka.

Second, understand that Spark does not guarantee exactly-once semantics for output actions. When the Spark streaming guide talks about exactly-once, it’s only referring to a given item in an RDD being included in a calculated value once, in a purely functional sense. Any side-effecting output operations (i.e. anything you do in foreachRDD to save the result) may be repeated, because any stage of the process might fail and be retried.

Third, understand that Spark checkpoints may not be recoverable, for instance in cases where you need to change the application code in order to get the stream restarted. This situation may improve by 1.4, but be aware that it is an issue. I’ve been bitten by it before; you may be too. Any place I mention “checkpoint the stream” as an option, consider the risk involved. Also note that any windowing transformations are going to rely on checkpointing anyway.

Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.

Given all that, how do you obtain the equivalent of the semantics you want?


At-most-once

This could be useful in cases where you’re sending results to something that isn’t a system of record, you don’t want duplicates, and it’s not worth the hassle of ensuring that messages don’t get lost. An example might be sending summary statistics over UDP, since it’s an unreliable protocol to begin with.

To get at-most-once semantics, do all of the following:

  1. Set spark.task.maxFailures to 1, so the job dies as soon as a task fails.
  2. Make sure spark.speculation is false (the default), so multiple copies of tasks don’t get speculatively run.
  3. When the job dies, start the stream back up using the Kafka param auto.offset.reset set to “largest”, so it will skip to the current end of the log.

This will mean you lose messages on restart, but at least they shouldn’t get replayed. Probably. Test this carefully if it’s actually important to you that a message never gets repeated, because it’s not a common use case, and I’m not providing example code for it.
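To make the tradeoff concrete, here is a toy simulation in plain Python (not Spark or Kafka code; an in-memory list stands in for the Kafka log) showing what restarting at the largest offset does:

```python
# Toy model: a list stands in for the Kafka log, an integer for the consumer offset.
log = ["m1", "m2", "m3"]
processed = []

# Process two messages, then "crash" before processing the rest.
for offset in range(2):
    processed.append(log[offset])

# More messages arrive while the job is down.
log += ["m4", "m5"]

# Restarting with the equivalent of auto.offset.reset = "largest" skips
# to the current end of the log rather than resuming where we left off.
offset = len(log)

assert processed == ["m1", "m2"]              # m3 through m5 were lost
assert len(processed) == len(set(processed))  # but nothing was replayed
```

Losing m3 through m5 is exactly the at-most-once tradeoff: no duplicates, at the cost of dropped messages.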


At-least-once semantics

You’re okay with duplicate messages, but not okay with losing messages. An example of this might be sending internal email alerts on relatively rare occurrences in the stream. Getting duplicate critical alerts in a short time frame is much better than not getting them at all.

Basic options here are either:

  • Checkpoint the stream, or
  • Restart the job with auto.offset.reset set to smallest. This will replay the whole log from the beginning of your retention, so you’d better have relatively short retention or really be ok with duplicate messages.

Checkpointing the stream serves as the basis of the next option, so see the example code for it.
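For contrast, here is a similar toy simulation in plain Python (not real consumer code) of restarting with auto.offset.reset set to smallest:

```python
from collections import Counter

# Toy model: replaying the whole retained log after a crash.
log = ["m1", "m2", "m3", "m4"]
delivered = []

# The first run crashes after delivering two messages, before recording progress.
delivered.extend(log[:2])

# Restart with the equivalent of auto.offset.reset = "smallest":
# the entire retained log is replayed from the beginning.
delivered.extend(log)

counts = Counter(delivered)
assert all(counts[m] >= 1 for m in log)  # nothing is lost...
assert counts["m1"] == 2                 # ...but early messages are duplicated
```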

Exactly-once using idempotent writes

Idempotent writes make duplicate messages safe, turning at-least-once into the equivalent of exactly-once. The typical way of doing this is by having a unique key of some kind (either embedded in the message, or using topic/partition/offset as the key), and storing the results according to that key. Relying on a per-message unique key means this is useful for transforming or filtering individually valuable messages, less so for aggregating multiple messages.

There’s a complete sample of this idea at IdempotentExample.scala. It’s using Postgres for the sake of consistency with the next example, but any storage system that allows for unique keys could be used.

The important points here are that the schema is set up with a unique key and a rule to allow for no-op duplicate inserts. For this example, the message body is being used as the unique key, but any appropriate key could be used.

stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // make sure connection pool is set up on the executor before writing
    SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.foreach { case (key, msg) =>
      DB.autoCommit { implicit session =>
        // the unique key for idempotency is just the text of the message itself, for example purposes
        sql"insert into idem_data(msg) values (${msg})".update.apply
      }
    }
  }
}

In the case of a failure, the above output action can safely be retried. Checkpointing the stream ensures that offset ranges are saved as they are generated. Checkpointing is accomplished in the usual way: define a function that configures the streaming context (ssc) and sets up the stream, then call

ssc.checkpoint(checkpointDirectory)

before returning the ssc. See the Streaming Guide for more details.
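If the no-op-on-duplicate behavior is unfamiliar, here is a minimal illustration using Python's sqlite3 in place of Postgres (same idea, not the example's actual code): the unique key makes a retried batch harmless.

```python
import sqlite3

# The message body is the unique key, as in the example above.
conn = sqlite3.connect(":memory:")
conn.execute("create table idem_data (msg text primary key)")

def save_partition(messages):
    # "insert or ignore" is sqlite's no-op duplicate insert; the Postgres
    # schema in the example achieves the same effect with a rule.
    for msg in messages:
        conn.execute("insert or ignore into idem_data(msg) values (?)", (msg,))
    conn.commit()

batch = ["a", "b", "c"]
save_partition(batch)   # first attempt
save_partition(batch)   # retry after a presumed failure: safe to repeat

rows = conn.execute("select count(*) from idem_data").fetchone()[0]
assert rows == 3        # the retry inserted nothing new
```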

Exactly-once using transactional writes

For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics, and is straightforward to use even for aggregations.

TransactionalExample.scala is a complete Spark job implementing this idea. It’s using Postgres, but any data store that has transactional semantics could be used.

The first important point is that the stream is started using the last successfully committed offsets as the beginning point. This allows for failure recovery:

// begin from the offsets committed to the database
val fromOffsets = DB.readOnly { implicit session =>
  sql"select topic, part, off from txn_offsets".
    map { resultSet =>
      TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
    }.list.apply().toMap
}

val stream: InputDStream[Long] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](
  ssc, kafkaParams, fromOffsets,
  // we're just going to count messages, don't care about the contents, so convert each message to a 1
  (mmd: MessageAndMetadata[String, String]) => 1L)

For the very first time the job is run, the table can be pre-loaded with appropriate starting offsets.

The example accesses offset ranges on a per-partition basis, as mentioned in the discussion of HasOffsetRanges above. The important thing to notice about mapPartitionsWithIndex is that it’s a transformation, and there is no equivalent foreachPartitionWithIndex action. RDD transformations are generally lazy, so unless you add an output action of some kind, Spark will never schedule the job to actually do anything. Calling foreach on the RDD with an empty body is sufficient. Also, notice that some iterator methods, such as map, are lazy. If you’re setting up transient state, like a network or database connection, by the time the map is fully forced the connection may already be closed. In that case, be sure to instead use methods like foreach, that eagerly consume the iterator.

rdd.mapPartitionsWithIndex { (i, iter) =>
  // set up some connection
  iter.foreach {
    // use the connection
  }
  // close the connection
  Iterator.empty
}.foreach {
  // Without an action, the job won't get scheduled, so empty foreach to force it
  // This is a little awkward, but there is no foreachPartitionWithIndex method on rdds
  (_: Nothing) => ()
}
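The laziness pitfall is easy to reproduce outside Spark. Here is a plain-Python sketch (with a hypothetical Connection class) of why a lazy map over an iterator fails when the connection is closed before the iterator is forced:

```python
# Illustration of the lazy-iterator pitfall described above (plain Python, not Spark).
class Connection:
    def __init__(self):
        self.open = True
    def send(self, x):
        if not self.open:
            raise RuntimeError("connection already closed")
        return x
    def close(self):
        self.open = False

def bad_partition_handler(iter_):
    conn = Connection()
    out = map(conn.send, iter_)  # lazy: nothing has been sent yet
    conn.close()
    return out                   # forcing this later will fail

def good_partition_handler(iter_):
    conn = Connection()
    sent = [conn.send(x) for x in iter_]  # eager: fully consumed before close
    conn.close()
    return sent

assert good_partition_handler([1, 2]) == [1, 2]
try:
    list(bad_partition_handler([1, 2]))  # the lazy map is forced only here
    failed = False
except RuntimeError:
    failed = True
assert failed
```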

The final thing to notice about the example is that it’s important to ensure that saving the results and saving the offsets either both succeed, or both fail. Storing offsets should fail if the prior committed offset doesn’t equal the beginning of the current offset range; this prevents gaps or repeats. Kafka semantics ensure that there aren’t gaps in messages within a range of offsets (if you’re especially concerned, you could verify by comparing the size of the offset range to the number of messages).

// localTx is transactional; if the metric update or offset update fails, neither will be committed
DB.localTx { implicit session =>
  // store metric data
  val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
    where topic = ${osr.topic}
  """.update.apply()
  if (metricRows != 1) {
    throw new Exception("...")
  }
  // store offsets
  val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
    where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
  """.update.apply()
  if (offsetRows != 1) {
    throw new Exception("...")
  }
}

The example code is throwing an exception, which will result in a transaction rollback. Other failure-handling strategies may be appropriate, as long as they result in a transaction rollback as well.
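To see the offset check and the rollback working together, here is a self-contained Python sketch using sqlite3 instead of Postgres (illustrative only; the table and column names mirror the example):

```python
import sqlite3

# Toy version of the transactional pattern above, using sqlite3.
conn = sqlite3.connect(":memory:")
conn.execute("create table txn_data (topic text primary key, metric int)")
conn.execute("create table txn_offsets (topic text primary key, off int)")
conn.execute("insert into txn_data values ('t', 0)")
conn.execute("insert into txn_offsets values ('t', 0)")

def commit_batch(topic, metric, from_off, until_off):
    try:
        with conn:  # one transaction: both updates commit, or neither does
            conn.execute("update txn_data set metric = metric + ? where topic = ?",
                         (metric, topic))
            cur = conn.execute(
                "update txn_offsets set off = ? where topic = ? and off = ?",
                (until_off, topic, from_off))
            if cur.rowcount != 1:  # prior offset didn't match: gap or repeat
                raise Exception("offset mismatch")
        return True
    except Exception:
        return False

assert commit_batch("t", 5, 0, 10)      # normal commit
assert not commit_batch("t", 5, 0, 10)  # retry of the same range: rejected
# the rejected retry rolled back, so the metric was counted exactly once
assert conn.execute("select metric from txn_data").fetchone()[0] == 5
```

The retried range is rejected because the stored offset no longer matches the beginning of the range, and the rollback means the metric is counted exactly once.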

Future Improvements

Although this feature is considered experimental for Spark 1.3, the underlying KafkaRDD design has been in production at Kixer for months. It’s currently handling billions of messages per day, in batch sizes ranging from 2 seconds to 5 minutes. That being said, there are known areas for improvement (and probably a few unknown ones as well).

  • Connection pooling. Currently, Kafka consumer connections are created as needed; pooling should help efficiency. Hopefully this can be implemented in a way that integrates nicely with ongoing work towards a Kafka producer API in Spark.
  • Kafka metadata API. The class for interacting with Kafka is currently private, meaning you’ll need to duplicate some of that work if you want low-level access to Kafka metadata. This is partly because the Kafka consumer offset API is a moving target right now. If this code proves to be stable, it would be nice to have a user-facing API for interacting with Kafka metadata.
  • Batch generation policies. Right now, rate-limiting is the only tuning available for how the next batch in the stream is defined. We have some use cases that involve larger tweaks, such as a fixed time delay. A flexible way of defining batch generation policies might be useful.

If there are other improvements you can think of, please let me know.

Categories: Hadoop

How Testing Supports Production-Ready Security in Cloudera Search

Cloudera Blog - Fri, 03/13/2015 - 16:18

Security architecture is complex, but these testing strategies help Cloudera customers rely on production-ready results.

Among other things, good security requires user authentication and that authenticated users and services be granted access to those things (and only those things) that they’re authorized to use. Across Apache Hadoop and Apache Solr (which ships in CDH and powers Cloudera Search), authentication is accomplished using Kerberos and SPNego over HTTP and authorization is accomplished using Apache Sentry (the emerging standard for role-based fine grain access control, currently incubating at the ASF).

The interactions among Kerberos, Sentry, and different system configs, OSs, and environments are complicated, and for production-ready applications, they require a variety of tests to ensure that security works as expected. In this post, you’ll learn how Cloudera uses a range of testing methodologies to help ensure that security operations work as expected for Cloudera Search customers—many of which are in highly regulated industries.

Native Security in Solr

This diagram illustrates the execution cycle of a sample Solr request in a secure environment. Incoming HTTP requests must first complete Kerberos authentication. If authentication fails, the web server returns an HTTP “401 Unauthorized” error to the user, thereby restricting Solr access. If authentication succeeds, Solr forwards the request to Sentry for authorization. Sentry grants or denies access based on which user is requesting access, the request type, and the existing permissions defined for the user.

This feature, called index-level security, provides control over collections using QUERY, UPDATE, and ALL permissions. If authorization fails, Solr returns an HTTP “401 Unauthorized” error. Otherwise, the request is processed by Solr.

In addition to index-level security, Solr supports document-level security via Sentry, which enforces the fine-grain access control of which Solr documents can be viewed by which users.

Testing Types

There are two primary sets of test cases for validating Sentry-Solr integration: Unit tests and integration tests. Integration tests are further divided into mini-cluster integration tests and real hardware integration tests.

Unit Testing Sentry and Solr

Unit tests run in a virtualized cluster spun up on the test JVM and help validate the functionality of a specific method or class. These tests are very important in isolating failures to a specific module before the error touches multiple layers (modules), which makes it hard to debug. Unit tests are important because they provide good code coverage and help catch bugs sooner in the development cycle. Cloudera runs the entire suite of unit tests before deploying every new project build. This helps to ensure that there are no regressions from recent check-ins and helps verify code stability.

Integration Testing for Sentry + Solr

Integration tests evaluate the end-to-end functionality of the product, mainly to mimic end-user scenarios. These tests are very important for verifying the overall functionality of the system, from the integration of multiple sub-modules to inter-component interaction (Sentry <-> Solr), exercising all these pieces in one place.

There are two places where integration tests are run: One is in the mini-cluster spun up in the JVM, and the other is on real distributed clusters. Both methods have advantages: Issues found using tests on a single JVM can be easier to debug, whereas end-to-end user scenarios run on real distributed clusters may identify issues that would not manifest on a single JVM.

Integration Testing in a Single JVM

Sentry, being a critical piece in security, requires exhaustive test coverage across all possible user scenarios to avoid corner cases that bypass security. This characteristic requires running SolrCloud (multiple shards and replicas) and Sentry in a single environment. Solr has an existing test framework for testing SolrCloud features (see AbstractFullDistribZkTestBase), but using it requires pulling in the entire Solr test hierarchy, which may not be required. The other downside of this approach is that Solr dependencies may conflict with other projects. To avoid this issue and to make testing more pluggable, Cloudera developed the MiniSolrCloudCluster (SOLR-5865) test framework, which separates out the SolrCloud functionality and thus makes it easier for new projects to use and test SolrCloud in a single JVM.

The tests developed on MiniSolrCloudCluster framework cover extensive use cases by auto-generating users with all possible permissions and sending out requests as a particular user to make sure the Solr responses are being matched with the expected output. For example, for three existing access specifiers QUERY, UPDATE, ALL, the test framework generates eight possible users. Each user would have one of the eight possible permissions and each user would issue both QUERY and UPDATE requests to Solr. The tests then compare to verify that Sentry’s granting access or unauthorized error matches the expected response. This framework provides us with a good understanding and baseline of Sentry behavior in a real Solr cluster scenario.
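The user generation amounts to enumerating the power set of the access specifiers. A sketch of that idea in Python (the expect_allowed helper is a simplification of Sentry's actual checks):

```python
from itertools import chain, combinations

# One test user per subset of access specifiers.
specifiers = ["QUERY", "UPDATE", "ALL"]
subsets = list(chain.from_iterable(
    combinations(specifiers, k) for k in range(len(specifiers) + 1)))

assert len(subsets) == 8  # 2^3 permission combinations, hence eight users

def expect_allowed(perms, request):
    # simplified: a user may issue `request` if it holds that permission or ALL
    return "ALL" in perms or request in perms

assert expect_allowed(("QUERY",), "QUERY")
assert not expect_allowed(("QUERY",), "UPDATE")
assert expect_allowed(("ALL",), "UPDATE")
```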

Integration Testing in a Distributed Cluster

One caveat to the above approach is that neither MiniSolrCloudCluster nor the existing Solr test framework supports Kerberos authentication at the time of this writing. To address this issue, Cloudera developed a non-Kerberos authentication layer that modifies the request to look like it successfully passed Kerberos authentication. Although this approach enables us to test Sentry extensively with Solr, it also bypasses testing Kerberos authentication itself, a critical element of our security solution. In a production deployment, the end user must still use Kerberos to log into the system and run commands via Sentry. This critical piece is covered in our integration testing on real distributed clusters.

The other main difference between real clusters and MiniSolrCloudCluster is that with the latter, all the required processes, such as Sentry, Solr, and Apache ZooKeeper, run in a single JVM. Although this approach is good for testing locally, we still want a clear picture of how the system behaves when all the processes have their own JVMs and are completely distributed across multiple nodes in the cluster.

The next and final line of defense is to run the integration tests on real clusters of varied sizes, configurations, and OSs. Running the suite of integration tests on real clusters has many clear advantages, like the ability to:

  • Catch packaging issues (such as errors with RPMs and DEBs) that fall out when building the product
  • Mimic the end-user deployment scenario (running on a wide range of OSs)
  • Cover use cases not covered by previous rounds of testing (running the tests in a fully secure Kerberos environment, which MiniSolrCloudCluster cannot do)
  • Scale out the cluster to an arbitrary size and check the performance implications
  • Run the system under different configurations, such as NameNode HA or vanilla (simple secure) systems
  • Induce failure scenarios and monitor how the system recovers
  • Run longer-running tests that ingest lots of data (as memory is a constraint when running MiniSolrCloudCluster in a JVM)

Testing on real clusters is done by first creating a large number of Linux users, groups, Kerberos principals, and keytabs for those users, which is completely missing in MiniSolrCloudCluster. We then define a Sentry policy file that maps roles to groups and privileges to roles.

We run a subset of the most common end-to-end scenarios, as MiniSolrCloudCluster already runs the exhaustive suite of tests. These tests are run against the real cluster, which includes “kinit”ing as a user and sending a QUERY/UPDATE request to Solr. In this cycle, the user first authenticates with Solr using Kerberos; Sentry then scrutinizes the incoming request based on the authenticated user.

Because MiniSolrCloudCluster runs in a JVM, it has to be created at the beginning of every test cycle, which results in loss of state. In real clusters, however, state is preserved because the clusters are longer running. This provides an environment for running the same set of tests multiple times and noticing problems that only show up over time (such as memory leaks, long GC pauses, and degradation in the average turnaround time of a single request).


You should now understand the different levels of validation done by Cloudera for Search and Sentry integration. We welcome any other suggestions or contributions to the existing Sentry test suite. The source code can be found here.

Cloudera Search and Sentry are available for download as part of CDH and come with extensive documentation. If you have any questions, please contact us at the Cloudera Search Forum or the Search mailing list.

Vamsee Yarlagadda is a Software Engineer at Cloudera and an Apache Sentry (incubating) committer.

Categories: Hadoop

Understanding HDFS Recovery Processes (Part 2)

Cloudera Blog - Wed, 03/11/2015 - 16:12

Having a good grasp of HDFS recovery processes is important when running or moving toward production-ready Apache Hadoop. In the conclusion to this two-part post, pipeline recovery is explained.

An important design requirement of HDFS is to ensure continuous and correct operations that support production deployments. For that reason, it’s important for operators to understand how HDFS recovery processes work. In Part 1 of this post, we looked at lease recovery and block recovery. Now, in Part 2, we explore pipeline recovery.

All three recovery processes are essential for HDFS fault tolerance. Together, they help to ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.


In HDFS, files are divided into blocks, and file access follows multi-reader, single-writer semantics. To meet the fault-tolerance requirement, multiple replicas of a block are stored on different DataNodes. The number of replicas is called the replication factor. When a new file block is created, or an existing file is opened for append, the HDFS write operation creates a pipeline of DataNodes to receive and store the replicas (the replication factor generally determines the number of DataNodes in the pipeline). Subsequent writes to that block go through the pipeline (Figure 1).

Figure 1. HDFS Write Pipeline

For read operations the client chooses one of the DataNodes holding copies of the block and requests a data transfer from it.

For a deeper dive into this background information, read Part 1 of this post.

Pipeline Recovery

The Write Pipeline

When an HDFS client writes to a file, the data is written as sequential blocks. To write or construct a block, HDFS breaks the block into packets (not actually network packets but rather messages; the term packet refers to the class that embodies these messages) and propagates them to the DataNodes in the write pipeline, as shown in Figure 2.

Figure 2. HDFS Write Pipeline Stages

There are three stages of a write pipeline:

  1. Pipeline setup. The client sends a Write_Block request along the pipeline and the last DataNode sends an acknowledgement back. After receiving the acknowledgement, the pipeline is ready for writing.
  2. Data streaming. The data is sent through the pipeline in packets. The client buffers the data until a packet is filled up, and then sends the packet to the pipeline. If the client calls hflush(), then even if a packet is not full, it will nevertheless be sent to the pipeline and the next packet will not be sent until the acknowledgement of the previous hflush’ed packet is received by the client.
  3. Close (finalize the replica and shutdown the pipeline). The client waits until all packets have been acknowledged and then sends a close request. All DataNodes in the pipeline change the corresponding replica into the FINALIZED state and report back to the NameNode. The NameNode then changes the block’s state to COMPLETE if at least the configured minimum replication number of DataNodes reported a FINALIZED state of their corresponding replicas.
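The close-stage rule can be modeled as a small decision function (a simplified sketch, not HDFS code; the state names follow the description above):

```python
# Simplified model: a block becomes COMPLETE once at least the configured
# minimum number of replicas report FINALIZED.
def block_state(replica_states, min_replication=1):
    finalized = sum(1 for s in replica_states if s == "FINALIZED")
    return "COMPLETE" if finalized >= min_replication else "COMMITTED"

assert block_state(["FINALIZED", "RBW", "RBW"]) == "COMPLETE"
assert block_state(["RBW", "RBW", "RBW"]) == "COMMITTED"
assert block_state(["FINALIZED", "FINALIZED", "RBW"], min_replication=2) == "COMPLETE"
```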
Pipeline Recovery

Pipeline recovery is initiated when one or more DataNodes in the pipeline encounter an error in any of the three stages while a block is being written.

Recovery from Pipeline Setup Failure

  1. If the pipeline was created for a new block, the client abandons the block and asks the NameNode for a new block and a new list of DataNodes. The pipeline is reinitialized for the new block.
  2. If the pipeline was created to append to a block, the client rebuilds the pipeline with the remaining DataNodes and increments the block’s generation stamp.

Recovery from Data Streaming Failure

  1. When a DataNode in the pipeline detects an error (for example, a checksum error or a failure to write to disk), that DataNode takes itself out of the pipeline by closing all of its TCP/IP connections. If the data is deemed not corrupted, it also writes buffered data to the relevant block and checksum (METADATA) files.
  2. When the client detects the failure, it stops sending data to the pipeline, and reconstructs a new pipeline using the remaining good DataNodes. As a result, all replicas of the block are bumped up to a new GS.
  3. The client resumes sending data packets with this new GS. If the data sent has already been received by some of the DataNodes, they just ignore the packet and pass it downstream in the pipeline.

Recovery from Close Failure

  1. When the client detects a failure in the close state, it rebuilds the pipeline with the remaining DataNodes. Each DataNode bumps up the block’s GS and finalizes the replica if it’s not finalized yet.

When one DataNode is bad, it removes itself from the pipeline. During the pipeline recovery process, the client may need to rebuild a new pipeline with the remaining DataNodes. (It may or may not replace bad DataNodes with new DataNodes, depending on the DataNode replacement policy described in the next section.) The replication monitor will take care of replicating the block to satisfy the configured replication factor.

DataNode Replacement Policy upon Failure

There are four configurable policies regarding whether to add additional DataNodes to replace the bad ones when setting up a pipeline for recovery with the remaining DataNodes:

  1. DISABLE: Disables DataNode replacement and throws an error (at the server); this acts like NEVER at the client.
  2. NEVER: Never replace a DataNode when a pipeline fails (generally not a desirable action).
  3. DEFAULT: Replace based on the following conditions:
    • Let r be the configured replication number.
    • Let n be the number of existing replica DataNodes.
    • Add a new DataNode only if r >= 3 and EITHER (1) floor(r/2) >= n, OR (2) r > n and the block is hflushed/appended.
  4. ALWAYS: Always add a new DataNode when an existing DataNode failed. This fails if a DataNode can’t be replaced.
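The DEFAULT conditions are compact enough to express directly. Here is an illustrative Python version of the decision (the function name is made up):

```python
# The DEFAULT replacement policy conditions, as a small decision function.
def add_replacement_datanode(r, n, hflushed_or_appended):
    """r = configured replication number, n = existing replica DataNodes."""
    return r >= 3 and (r // 2 >= n or (r > n and hflushed_or_appended))

# replication 3, one good DataNode left: floor(3/2) = 1 >= 1, so replace
assert add_replacement_datanode(3, 1, False)
# replication 3, two good DataNodes, plain write: neither condition holds
assert not add_replacement_datanode(3, 2, False)
# ...but an hflushed/appended block with r > n does trigger replacement
assert add_replacement_datanode(3, 2, True)
# replication below 3 never triggers replacement
assert not add_replacement_datanode(2, 1, True)
```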

To disable using any of these policies, you can set the following configuration property to false (the default is true):

dfs.client.block.write.replace-datanode-on-failure.enable

When enabled, the default policy is DEFAULT. The following config property changes the policy:

dfs.client.block.write.replace-datanode-on-failure.policy

When using DEFAULT or ALWAYS, if only one DataNode succeeds in the pipeline, the recovery will never succeed and the client will not be able to perform the write. This problem is addressed with this configuration property:

dfs.client.block.write.replace-datanode-on-failure.best-effort

which defaults to false. With the default setting, the client will keep trying until the specified policy is satisfied. When this property is set to true, even if the specified policy can’t be satisfied (for example, there is only one DataNode that succeeds in the pipeline, which is less than the policy requirement), the client will still be allowed to continue to write.

Some Solved Issues
  • HDFS-5016 details a deadlock scenario in pipeline recovery that causes a DataNode to be marked dead (duplicates HDFS-3655 “Datanode recoverRbw could hang sometime” and HDFS-4851 “Deadlock in pipeline recovery”). Here’s what happens: while the recovery is ongoing, it causes some relevant threads to wait for each other, thus deadlocking. Since the FSDataset lock is held in this deadlock, the heartbeat thread and data transceiver threads are blocked waiting on it. The solution is to introduce a timeout mechanism to break the deadlock.
  • HDFS-4882 reports a case in which the NameNode’s LeaseManager loops forever in checkLeases. When the hard limit expires, LeaseManager tries to recover the lease; if the second-to-last block is COMMITTED and the last block is COMPLETE, internalReleaseLease() returns without releasing the lease, so LeaseManager keeps trying to release the same lease in an infinite loop. Since FSNamesystem.writeLock is held in the loop, this essentially makes the NameNode unresponsive. The fix is to only try releasing a lease periodically rather than continuously.
  • HDFS-5557 details a case in which write pipeline recovery for the last packet in the block may cause valid replicas to be rejected because of an incorrect GS recorded when handling a block report. The worst case is that all good replicas are rejected and a bad one is accepted. In this case, the corresponding block will get completed, but the data cannot be read until the next full block report containing one of the valid replicas is received. The solution is to fix the GS recording.
  • HDFS-5558 reports a case in which the LeaseManager monitor thread can crash if the last block is COMPLETE but the second-to-last block is not. If a file’s last and second-to-last blocks are both not in the COMPLETE state and an attempt is made to close the file, the last block may change to COMPLETE but the second-to-last one might not. If this condition persists and the file is abandoned, LeaseManager will try to recover the lease and do block recovery on the block. But internalReleaseLease() will fail with an invalid-cast exception for this kind of file. The solution is to ensure the second-to-last block is in the COMPLETE state before closing the file.
Known Open Issues
  • With the introduction of dfs.client.block.write.replace-datanode-on-failure.best-effort, a client will be able to continue to write even if there is only one DataNode. When this happens, a block may have only one replica, and if anything happens to this single copy before it is replicated, data loss will occur. To alleviate the problem, HDFS-6867 proposes a background thread to do the pipeline recovery while the client is writing to the single replica.
  • HDFS-4504 details the case where DFSOutputStream#close doesn’t always release resources (such as leases). In some cases, DFSOutputStream#close can throw an IOException. One example is if there is a pipeline error and then pipeline recovery fails. Unfortunately, in this case, some of the resources used by the DFSOutputStream are leaked. One particularly important resource is file leases.

    So, it’s possible for a long-lived HDFS client, such as Apache Flume, to write many blocks to a file but then fail to close it. However, the LeaseRenewer thread inside the client will continue to renew the lease for the “undead” file. Future attempts to close the file will just re-throw the previous exception, and no progress can be made by the client.

  • HDFS-6937 details a pipeline recovery issue due to a checksum error. The data on the middle DataNode (assuming a replication factor of 3) is somehow corrupted but not detected. The last DataNode discovers the checksum error and takes itself out of the pipeline. The recovery process keeps trying to replace the last DataNode in the pipeline with a new one, as well as replicating the data from the middle DataNode to the new one. Each time, the replication fails due to a checksum error (because of the corrupted replica on the middle DataNode), and the new DataNode is marked as bad and thrown away, even though it isn’t really bad. Eventually the recovery fails after exhausting all the DataNodes.
  • HDFS-7342 reports a case in which lease recovery cannot succeed when the second-to-last block is COMMITTED and the last block is COMPLETE. One suggested solution is to force the lease to be recovered, which is similar to how the case where the last block is COMMITTED is handled. One can see that HDFS-7342, HDFS-4882, and HDFS-5558 are related in that the second-to-last block is in the COMMITTED state. The subtlety of the issue is still under investigation.

Lease recovery, block recovery, and pipeline recovery are all essential for HDFS fault tolerance. Together, they ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.

Hopefully, after reading these posts, you have a better understanding of when and why these processes are invoked, and what they do. If you are interested in learning more, you can read through some of the links including the design specification, JIRAs referenced here, or the relevant code.

Yongjun Zhang is a Software Engineer at Cloudera, and a Hadoop committer.

Categories: Hadoop

How-to: Tune Your Apache Spark Jobs (Part 1)

Cloudera Blog - Mon, 03/09/2015 - 16:29

Learn techniques for tuning your Apache Spark jobs for optimal efficiency.

(Editor’s note: Sandy presents on “Estimating Financial Risk with Spark” at Spark Summit East on March 18.)

When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Understanding Spark at this level is vital for writing Spark programs. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding Spark at this level is vital for writing good Spark programs, and of course by good, I mean fast. To write a Spark program that will execute efficiently, it is very, very helpful to understand Spark’s underlying execution model.

In this post, you’ll learn the basics of how Spark programs are actually executed on a cluster. Then, you’ll get some practical recommendations about what Spark’s execution model means for writing efficient programs.

How Spark Executes Your Program

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running, although dynamic resource allocation changes that for the latter. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime. Deploying these processes on the cluster is up to the cluster manager in use (YARN, Mesos, or Spark Standalone), but the driver and executor themselves exist in every Spark application.

At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan. This plan starts with the farthest-back RDDs (that is, those that depend on no other RDDs or reference already-cached data) and culminates in the final RDD required to produce the action’s results.

The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

What determines whether data needs to be shuffled? Recall that an RDD comprises a fixed number of partitions, each of which comprises a number of records. For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent. Operations like coalesce can result in a task processing multiple input partitions, but the transformation is still considered narrow because the input records used to compute any single output record can still only reside in a limited subset of the partitions.

However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.

For example, consider the following code:

sc.textFile("someFile.txt").
  map(mapFunc).
  flatMap(flatMapFunc).
  filter(filterFunc).
  count()

It executes a single action, which depends on a sequence of transformations on an RDD derived from a text file. This code would execute in a single stage, because none of the outputs of these three operations depend on data that can come from different partitions than their inputs.

In contrast, this code finds how many times each character appears in all the words that appear more than 1,000 times in a text file.

val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
  reduceByKey(_ + _)
charCounts.collect()

This process would break down into three stages. The reduceByKey operations result in stage boundaries, because computing their outputs requires repartitioning the data by keys.

Here is a more complicated transformation graph including a join transformation with multiple dependencies.

The pink boxes show the resulting stage graph used to execute it.

At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.

Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. We’ll delve deeper into how to tune this number in a later section.

Picking the Right Operators

When trying to accomplish something with Spark, a developer can usually choose from many arrangements of actions and transformations that will produce the same results. However, not all these arrangements will result in the same performance: avoiding common pitfalls and picking the right arrangement can make a world of difference in an application’s performance. A few rules and insights will help you orient yourself when these choices come up.

Recent work in SPARK-5097 began stabilizing SchemaRDD, which will open up Spark’s Catalyst optimizer to programmers using Spark’s core APIs, allowing Spark to make some higher-level choices about which operators to use. When SchemaRDD becomes a stable component, users will be shielded from needing to make some of these decisions.

The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled. This is because shuffles are fairly expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these operations are equal, however, and a few of the most common performance pitfalls for novice Spark developers arise from picking the wrong one:

  • Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
  • Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey:
    rdd.map(kv => (kv._1, Set(kv._2)))
      .reduceByKey(_ ++ _)

    This code results in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently:

    val zero = collection.mutable.Set[String]()
    rdd.aggregateByKey(zero)(
      (set, v) => set += v,
      (set1, set2) => set1 ++= set2)

  • Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
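To make the first pitfall concrete, here is a small pure-Python sketch (not Spark code; the partition contents are made up) of the shuffle accounting: groupByKey ships every record across the network, while reduceByKey first collapses each partition's records for a key into a single partial sum.

```python
from collections import defaultdict

# Two input partitions of (key, value) records; contents are made up.
partitions = [
    [("a", 1), ("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]

# groupByKey-style: every record crosses the network as-is.
group_shuffled = sum(len(p) for p in partitions)

# reduceByKey-style: combine locally first; only one partial sum per key
# per partition crosses the network.
reduce_shuffled = 0
merged = defaultdict(int)
for part in partitions:
    local = defaultdict(int)
    for k, v in part:
        local[k] += v                # map-side (local) combining
    reduce_shuffled += len(local)    # records actually shuffled
    for k, v in local.items():
        merged[k] += v               # reduce side merges partial sums

print(group_shuffled, reduce_shuffled, dict(merged))
```

With this toy input, seven records would be shuffled under groupByKey but only four partial sums under reduceByKey, and both approaches produce the same final counts.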
When Shuffles Don’t Happen

It’s also useful to be aware of the cases in which the above transformations will not result in shuffles. Spark knows to avoid a shuffle when a previous transformation has already partitioned the data according to the same partitioner. Consider the following flow:

rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

Because no partitioner is passed to reduceByKey, the default partitioner will be used, resulting in rdd1 and rdd2 both being hash-partitioned. These two reduceByKeys will result in two shuffles. If the RDDs have the same number of partitions, the join will require no additional shuffling. Because the RDDs are partitioned identically, the set of keys in any single partition of rdd1 can only show up in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 will depend only on the contents of a single partition in rdd1 and single partition in rdd2, and a third shuffle is not required.
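To see why identical partitioning makes a third shuffle unnecessary, here is a pure-Python sketch (not Spark code; partition_of stands in for Spark's default HashPartitioner, and the datasets are made up). With the same partitioner and partition count on both sides, a given key always lands at the same partition index, so co-indexed partitions can be joined independently.

```python
NUM_PARTITIONS = 3

def partition_of(key, n=NUM_PARTITIONS):
    # Stand-in for a hash partitioner; Python's hash is deterministic
    # within a single process, which is all this sketch relies on.
    return hash(key) % n

rdd1 = {"user%d" % i: i for i in range(10)}       # e.g. one reduceByKey output
rdd2 = {"user%d" % i: i * i for i in range(10)}   # same keys, same partitioner

def partition_items(d, n=NUM_PARTITIONS):
    parts = [dict() for _ in range(n)]
    for k, v in d.items():
        parts[partition_of(k, n)][k] = v
    return parts

parts1, parts2 = partition_items(rdd1), partition_items(rdd2)

# Join co-indexed partitions independently: partition i of the output
# depends only on partition i of each input, so no data needs to move.
joined = {}
for p1, p2 in zip(parts1, parts2):
    for k in p1:
        if k in p2:
            joined[k] = (p1[k], p2[k])
```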

For example, if someRdd has four partitions, someOtherRdd has two partitions, and both the reduceByKeys use three partitions, the set of tasks that execute would look like:

What if rdd1 and rdd2 use different partitioners, or use the default (hash) partitioner with different numbers of partitions? In that case, only one of the RDDs (the one with the smaller number of partitions) will need to be reshuffled for the join.

Same transformations, same inputs, different number of partitions:

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.
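The broadcast-join idea can be sketched in plain Python (this is illustrative, not Spark code; the datasets are made up). The small side becomes a hash table available on every executor, and the large side is joined with a simple map, so it is never shuffled.

```python
small = {"us": "United States", "fr": "France"}   # small side: broadcast table
large = [("us", 10), ("fr", 20), ("de", 30)]      # large side: stays in place

# Each "task" does hash-table lookups against its local copy of `small`;
# this is an inner join, so keys missing from the table are dropped.
joined = [(code, count, small[code]) for code, count in large if code in small]
print(joined)
```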

When More Shuffles are Better

There is an occasional exception to the rule of minimizing the number of shuffles. An extra shuffle can be advantageous to performance when it increases parallelism. For example, if your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions (which will trigger a shuffle) after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.

Another instance of this exception can arise when using the reduce or aggregate action to aggregate data into the driver. When aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results together. To lighten the load on the driver, one can first use reduceByKey or aggregateByKey to carry out a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values within each partition are merged with each other in parallel, before sending their results to the driver for a final round of aggregation. Take a look at treeReduce and treeAggregate for examples of how to do that. (Note that in 1.2, the most recent version at the time of this writing, these are marked as developer APIs, but SPARK-5430 seeks to add stable versions of them in core.)

This trick is especially useful when the aggregation is already grouped by a key. For example, consider an app that wants to count the occurrences of each word in a corpus and pull the results into the driver as a map.  One approach, which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform the count in a fully distributed way, and then simply collectAsMap the results to the driver.
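The two word-count approaches above can be sketched in plain Python (illustrative only, not Spark code; the partition contents are made up). Both produce identical counts; the difference is where the merge work happens.

```python
from collections import Counter

# Three partitions of words; contents are made up.
partitions = [["a", "b", "a"], ["b", "c"], ["a", "c", "c"]]

# aggregate-style: build one local map per partition, then merge all the
# maps serially at the driver (the driver becomes the bottleneck).
driver_map = Counter()
for part in partitions:
    driver_map.update(Counter(part))

# aggregateByKey-style: per-key counts finish fully distributed; the driver
# only collects the final (word, count) pairs, as with collectAsMap.
distributed = Counter(w for part in partitions for w in part)
```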

Secondary Sort

Another important capability to be aware of is the repartitionAndSortWithinPartitions transformation. It’s a transformation that sounds arcane, but seems to come up in all sorts of strange situations. This transformation pushes sorting down into the shuffle machinery, where large amounts of data can be spilled efficiently and sorting can be combined with other operations.

For example, Apache Hive on Spark uses this transformation inside its join implementation. It also acts as a vital building block in the secondary sort pattern, in which you want to both group records by key and then, when iterating over the values that correspond to a key, have them show up in a particular order. This issue comes up in algorithms that need to group events by user and then analyze the events for each user based on the order they occurred in time. Taking advantage of repartitionAndSortWithinPartitions to do secondary sort currently requires a bit of legwork on the part of the user, but SPARK-3655 will simplify things vastly.
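The secondary sort pattern can be sketched in plain Python (illustrative only, not Spark code; the event data is made up): partition by user, then sort each partition by the composite (user, timestamp) key, so each user's events come back grouped and in time order.

```python
# (user, timestamp, event) records; contents are made up.
events = [("bob", 3, "click"), ("ann", 1, "login"),
          ("bob", 1, "login"), ("ann", 2, "click")]

NUM_PARTITIONS = 2

def partition_of(user):
    return hash(user) % NUM_PARTITIONS   # stand-in for a hash partitioner

# Repartition by user...
parts = [[] for _ in range(NUM_PARTITIONS)]
for user, ts, what in events:
    parts[partition_of(user)].append((user, ts, what))

# ...and sort within each partition by (user, timestamp). In Spark, this
# sort happens inside the shuffle machinery rather than in user code.
for p in parts:
    p.sort(key=lambda e: (e[0], e[1]))

# Each user's events are now contiguous and in time order.
ordered = [e for p in parts for e in p]
```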


You should now have a good understanding of the basic factors involved in creating a performance-efficient Spark program! In Part 2, we’ll cover tuning resource requests, parallelism, and data structures.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Hadoop committer, and an Apache Spark contributor. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

Categories: Hadoop

This Month in the Ecosystem (February 2015)

Cloudera Blog - Thu, 03/05/2015 - 20:02

Welcome to our 17th edition of “This Month in the Ecosystem,” a digest of highlights from February 2015 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly). 

Wow, a ton of news for such a short month:

  • 4,500 people converged for the first Strata + Hadoop World San Jose, ever (with a special appearance by Barack Obama, no less). Slides and session video are available.
  • After years of development by a huge and diverse community, Apache HBase 1.0 was released. (Learn more about it at HBaseCon 2015.)
  • Apache Hive 1.0.0 (formerly 0.14.1) was also released by its community.
  • Cloudera announced formal support for Apache Kafka alongside CDH.
  • Pivotal and Hortonworks announced the Open Data Platform initiative, which Cloudera and MapR have declined to join.
  • Project Myriad, an integration of YARN and Mesos spearheaded by MapR and eBay, was proposed for the Apache Incubator.
  • Kite SDK 1.0 was released. The Kite API is now stable!
  • A Hive-on-Spark beta was released by Cloudera. (Only HDFS, YARN, Apache ZooKeeper, and Apache Hive are supported thus far.)
  • Cloudera also announced a strategic partnership with Cask, and integration of the latter’s CDAP offering with Cloudera Enterprise.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Calculating CVA with Apache Spark

Cloudera Blog - Wed, 03/04/2015 - 18:03

Thanks to Matthew Dixon, principal consultant at Quiota LLC and Professor of Analytics at the University of San Francisco, and Mohammad Zubair, Professor of Computer Science at Old Dominion University, for this guest post that demonstrates how to easily deploy exposure calculations on Apache Spark for in-memory analytics on scenario data.

Since the 2007 global financial crisis, financial institutions have measured the risks of over-the-counter (OTC) products more accurately. It is now standard practice for institutions to adjust derivative prices for the risk of the counter-party’s, or one’s own, default by means of credit or debit valuation adjustments (CVA/DVA).

Calculating the CVA of a portfolio typically requires Monte-Carlo simulation with a large number of scenarios. The computation and storage requirement for what-if scenario analysis grows significantly when the portfolio is large and contains a wide array of financial instruments across multiple asset classes. To handle that complexity, distributed computing platforms offer trade-offs across performance, flexibility, modularity, and maintainability of programming infrastructure.

In this post, you will learn how to efficiently deploy exposure calculations on Apache Spark for in-memory analytics on scenario data. This example application exemplifies the flexibility, maintainability, and scalability provided by Spark. Applications with cyclic dataflow graphs, such as counter-party credit risk analytics, are well suited for Spark because the scenario data can be kept in memory for fast what-if scenario and statistical analysis. (To see another example of Spark’s benefits in the financial calculations area, in this case for calculating VaR, see this post.)

Estimating CVA on Spark

The computational complexity of estimating the CVA is significant and beyond the capability of a single workstation. The number of calculations to estimate CVA of a portfolio is given by:

Number of Instruments × Number of time intervals × Number of scenarios

A calculation here refers to calculating the price of an instrument for a scenario at some time interval. Consider, for example, a CVA calculation on a portfolio of 10,000 instruments, with average maturity of 10 years, and 2,000 market scenarios generated every three months, resulting in 800 million calculations. The time to price an instrument can vary depending on the instrument. If we assume on average that it takes 100 microseconds to price an instrument on a single workstation, it will take a total of roughly 22 hours to perform all the calculations.
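Spelling out the formula with these assumptions (10,000 instruments, 40 quarterly intervals over a 10-year average maturity, 2,000 scenarios, 100 microseconds per pricing):

```python
instruments = 10000    # portfolio size
intervals = 10 * 4     # 10-year average maturity, one scenario set per quarter
scenarios = 2000

# Number of Instruments x Number of time intervals x Number of scenarios
calculations = instruments * intervals * scenarios

hours = calculations * 100e-6 / 3600   # at 100 microseconds per pricing
print(calculations, hours)
```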

A number of banks use customized parallel and distributed computing platforms to perform these calculations in a reasonable time. Often, banks need to hold all the instrument prices in memory to calculate various statistics. This requirement results in large memory consumption, so current proprietary solutions are expensive and hard to scale in a fault-tolerant way.

Alternatively, Spark can be programmed to calculate the CVA of a portfolio over a cluster of thousands of cheap commodity nodes using high-level languages such as Scala and Python, thus making it an attractive platform for prototyping and live risk estimates. The key benefit of using Spark over other distributed implementations such as MPI, OpenMP, and CUDA is that it allows the computations to scale to a large number of nodes reliably where failure of nodes is managed by the framework. Furthermore, Spark can hold all the price results of a large portfolio simulation in memory across thousands of nodes in support of calculating various statistics on demand. 

This benefit, along with the ease of implementation, comes at the expense of some performance loss. However, we can minimize this performance loss by using, say, the numpy/scipy packages in Python, which are built on BLAS and LAPACK routines.

Implementing CVA in Python on Spark

For demonstration purposes we consider a portfolio of NI vanilla swaps with NT time intervals and NS simulations. The total number of price calculations then is given by NI × NT × NS. Our implementation on Spark is based on Python code that utilizes the QuantLib package to value a vanilla swap.

(Figure courtesy of Giovanni Cesari, UBS)

Next, we will briefly outline this computation on Spark.

1. Create a distributed collection (RDD), randArrayRDD, of random numbers of size NS × NT using the Apache Spark Python MLlib API:

sc = SparkContext(appName='CVA')
randArrayRDD = RandomRDDs.normalVectorRDD(sc, NS, NT, numPartitions=NP, seed=1L)

2. Call a map function that processes the RDD in parallel and collect the pricing at the driver.

pmat = randArrayRDD.map(lambda p: (value_swap(p, NI))).collect()
pmat = np.array(pmat)
pmat = pmat.reshape((NS, NI, NT))

The map function works on a row of RDD of random numbers in two stages:

(a) Construct NT discount curves, crvVec, one for each time interval, using the single-factor Hull-White short-rate model.

for iT in xrange(1, NT):
    crvDate = Dates[iT]
    crvDates = [crvDate] + [crvDate + Period(k, Years) for k in xrange(1, NTenors)]
    crvDiscounts = [1.0] + [A(T[iT], T[iT]+k) * exp(-B(T[iT], T[iT]+k) * rvec[iT])
                            for k in xrange(1, NTenors)]
    crvVec[iT] = DiscountCurve(crvDates, crvDiscounts, Actual360(), TARGET())

(b) For each discount curve, value NI swaps at NT time intervals in the future to construct a price matrix, spmat, of size NI × NT for each simulation.

for iT in xrange(len(T)):
    Settings.instance().evaluationDate = Dates[iT]
    allDates = list(floatingSchedule)
    fixingdates = [index.fixingDate(floatingSchedule[iDate])
                   for iDate in xrange(len(allDates))
                   if index.fixingDate(floatingSchedule[iDate]) <= Dates[iT]]
    if fixingdates:
        for date in fixingdates[:-1]:
            try: index.addFixing(date, 0.0)
            except: pass
        try: index.addFixing(fixingdates[-1], rmean[iT])
        except: pass
    discountTermStructure = RelinkableYieldTermStructureHandle()
    swapEngine = DiscountingSwapEngine(discountTermStructure)
    swap1.setPricingEngine(swapEngine)
    crv = crvVec[iT]
    discountTermStructure.linkTo(crv)
    forecastTermStructure.linkTo(crv)
    npvVec[nI][iT] = swap1.NPV()

This price matrix, pmat, has price distribution information for all swaps in the portfolio and can be used to calculate various exposure measures such as potential future exposure (PFE) and the expected positive exposure (EPE). The CVA can be estimated using the simulated estimate of the EPE.

EE = np.sum(npvMat, axis=1)
EE = np.mean(EE, axis=0)
sum = 0
for i in xrange(NT-1):
    sum += 0.5 * (EE[i]*crvToday.discount(T[i]) + EE[i+1]*crvToday.discount(T[i+1])) * \
           (exp(-S*T[i]/(1.0-R)) - exp(-S*T[i+1]/(1.0-R)))
CVA = (1.0-R) * sum

Note that if you do not want to preserve the price distribution for different swaps, you can simply aggregate prices of different swaps at each worker before sending it to the driver—thereby reducing the communication cost. To do that, modify step 2(b).

For each yield curve constructed earlier, value all NI swaps to create a price matrix of size NI × NT:

for nI in xrange(NI):
    fixedSchedule = Schedule(startDate, maturity, Period("6m"), TARGET(),
                             ModifiedFollowing, ModifiedFollowing,
                             DateGeneration.Forward, False)
    floatingSchedule = Schedule(startDate, maturity, Period("6m"), TARGET(),
                                ModifiedFollowing, ModifiedFollowing,
                                DateGeneration.Forward, False)
    swap1 = VanillaSwap(VanillaSwap.Receiver, 1000000, fixedSchedule, 0.05,
                        Actual360(), floatingSchedule, index, 0, Actual360())
    for iT in xrange(len(T)):
        Settings.instance().evaluationDate = Dates[iT]
        allDates = list(floatingSchedule)
        fixingdates = [index.fixingDate(floatingSchedule[iDate])
                       for iDate in xrange(len(allDates))
                       if index.fixingDate(floatingSchedule[iDate]) <= Dates[iT]]
        if fixingdates:
            for date in fixingdates[:-1]:
                try: index.addFixing(date, 0.0)
                except: pass
            try: index.addFixing(fixingdates[-1], rmean[iT])
            except: pass
        discountTermStructure = RelinkableYieldTermStructureHandle()
        swapEngine = DiscountingSwapEngine(discountTermStructure)
        swap1.setPricingEngine(swapEngine)
        crv = crvVec[iT]
        discountTermStructure.linkTo(crv)
        forecastTermStructure.linkTo(crv)
        spmat[nI][iT] = swap1.NPV()
spmat = np.array(spmat)
spmat[spmat < 0] = 0
spmat = np.sum(spmat, axis=0)

This results in simplification of computation of the mean exposure and the CVA calculations at the driver. Next, create a distributed collection (RDD) of random numbers of size NS × NT:

randArrayRDD = RandomRDDs.normalVectorRDD(sc, NS, NT, numPartitions=NP, seed=1L)

Value each swap in parallel, where one unit of work is valuing a swap under all scenarios in a row of the random matrix.

pmat=randArrayRDD.map(lambda p: (value_swap(p, NI))).collect()

Collect the valuations at the driver.

pmat = np.array(pmat)
pmat = pmat.reshape((NS, NT))
EE = np.sum(pmat, axis=1)
EE = np.mean(EE, axis=0)

Calculate the CVA.

sum = 0
for i in xrange(NT-1):
    sum += 0.5 * (EE[i]*crvToday.discount(T[i]) + EE[i+1]*crvToday.discount(T[i+1])) * \
           (exp(-S*T[i]/(1.0-R)) - exp(-S*T[i+1]/(1.0-R)))
CVA = (1.0-R) * sum


We have demonstrated how Spark can be programmed to calculate the CVA of a portfolio over a cluster of thousands of cheap commodity nodes using Python, thus making it an attractive platform for prototyping and live risk estimates.

Categories: Hadoop

Hello, Kite SDK 1.0

Cloudera Blog - Tue, 03/03/2015 - 23:09

The Kite project recently released a stable 1.0!

This milestone means that Kite’s data API and command-line tools are ready for long-term use.

The 1.0 data modules and API are no longer rapidly changing. From 1.0 on, Kite will be strict about breaking compatibility and will use semantic versioning to signal what compatibility guarantees you can expect from a given release. For example, breaking changes require increasing the major version number, so both minor and patch updates are safe to use without code changes and are binary-compatible.
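The compatibility rule described above can be expressed as a tiny illustrative check (this is a generic semver sketch, not part of Kite): an upgrade preserves API compatibility exactly when the major version is unchanged.

```python
def compatible(installed, upgrade):
    """True when moving from `installed` to `upgrade` preserves API
    compatibility under semantic versioning: the major version (the first
    component of a MAJOR.MINOR.PATCH string) must be unchanged."""
    return installed.split(".")[0] == upgrade.split(".")[0]

# Minor and patch updates are safe; a major bump may break code.
print(compatible("1.0.0", "1.2.3"), compatible("1.2.3", "2.0.0"))
```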

Kite provides some additional guarantees as well:

  • Kite’s command-line tool, kite-dataset, also follows semantic versioning. Changes that may break scripts will require updating the major version number.
  • Incompatible changes to Kite’s on-disk formats, Avro and Parquet, will also be signalled by a major version update.
  • Kite’s storage formats, metadata, and file layout are forward-compatible for at least one major version. You can update Kite, write to existing datasets, and roll back to the previous version safely.

We’re excited to get the Kite 1.0 release out. Now, Kite provides a great high-level API built around how you work with your data, and stability guarantees so you can be confident building on top of it.

Learn more about Kite:

Categories: Hadoop

How-to: Let Users Provision Apache Hadoop Clusters On-Demand

Cloudera Blog - Mon, 03/02/2015 - 16:51

Providing Hadoop-as-a-Service to your internal users can be a major operational advantage.

Cloudera Director (free to download and use) is designed for easy, on-demand provisioning of Apache Hadoop clusters in Amazon Web Services (AWS) environments, with support for other cloud environments in the works. It allows for provisioning clusters in accordance with the Cloudera AWS Reference Architecture.

At Cloudera, Cloudera Director is used internally to enable our technical field to provision clusters on-demand for demos, POCs, and testing purposes. The following post describes the goals of Cloudera’s internal environment, and how you can set up your own to meet similar requirements.


Our internal deployment has several goals:

  • Provide a way to provision Hadoop clusters on-demand in a self-service manner as and when the field team needs them
  • Use a common AWS account
  • Make clusters accessible from within Cloudera’s network, making them look like an extension to our data center
  • Enable clusters to access Amazon S3 and the internet with the aggregate bandwidth scaling proportional to the cluster and instance sizes
  • Conform to Cloudera IT’s guidelines on AWS usage, which require all users to tag their instances with their usernames; untagged (stray) instances are terminated.

Ordinarily, you would set up the network context from the AWS management console, the AWS CLI client, or using AWS Quickstart. As it stands today, AWS Quickstart does not meet our goals because it doesn’t create the VPN, IAM roles, and so on the way we want them; it works great if you don’t need the VPN setup and are starting afresh. We chose to set things up manually instead, and the process is described below.

Setting Up a VPC

The first piece of that setup is the VPC, which is a one-time investment you can repurpose for other deployments that you might have in AWS with similar requirements. To do that, go into the VPC section of the console. It’ll look something like this:

As articulated in Cloudera’s reference architecture, three kinds of setups are possible: VPC with public subnet only, VPC with private subnet only, or VPC with private and public subnets. The choice depends on what kind of connectivity and security requirements you have. If you are looking to transfer data between your cluster and S3, you’ll need to deploy your cluster in a public subnet so it has public IP addresses and can interact with S3. Once you create your VPC, configure the route tables and virtual gateways to link it back to your data center using a VPN or Direct Connect link. Instructions for setting up a VPN are available here, and instructions for setting up Direct Connect are available here.

Setting Up the Subnets

To have a cluster in AWS that has S3 and internet access, you need the instances to have public IP addresses. For the instances to be accessible from within your network, you need them to be linked via VPN or Direct Connect. Create a subnet inside the VPC you just created. It’ll automatically populate the route table from the VPC. The configurations would look something like the following:

You can leave the NACLs to default.

Setting Up Security Groups

To allow users to spin up clusters on-demand in a self-service manner, you need every provisioned instance to adhere to a set of ingress and egress rules at the bare minimum. These rules would allow outbound traffic to the internet and S3 and bidirectional traffic from Cloudera’s network. To do that, you can create a security group with the following rules:

Creating these security groups isn’t by itself sufficient to force all instances to use them. You also have to disallow creation, modification, and deletion of security groups in this VPC, so that users can only use the one you created for them.

Setting Up IAM Rules

Once you have the above setup configured, you don’t want users to be able to create, delete, or modify any of the configurations, especially security groups. If not restricted via IAM rules, security groups are easy to modify. To prevent that, you can set IAM rules restricting users from modifying anything in this environment, with a rule like the following:

{
    "Statement": [
        {
            "Effect": "Allow",
            "NotAction": "iam:*",
            "Resource": "*"
        },
        {
            "Effect": "Deny",
            "Action": "*",
            "Resource": "arn:aws:ec2:us-east-1:*:security-group/sg-dbc403bf"
        },
        {
            "Effect": "Deny",
            "Action": [
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:RevokeSecurityGroupIngress",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:DeleteSecurityGroup",
                "ec2:CreateSecurityGroup"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "ec2:Vpc": "arn:aws:ec2:us-east-1:007856030109:vpc/vpc-c2d650a7"
                }
            }
        }
    ]
}

This rule disallows users from performing any action on the specified security group that you have created as a part of the network context as well as creating, deleting, or modifying any other security groups inside the VPC that you’ve created. That way, you can rest assured you are not running at the risk of opening your network to more traffic than you intended.

Setting Up Cloudera Director

Cloudera Director setup instructions can be found in the documentation.

Once installed, you can create an environment inside Director with the specified public subnet so clusters in that environment can access the internet and S3. You’ll have to share the pem file for this environment with all users, since keys are environment specific. Once the environment is set up, you can create instance templates with appropriate instance type configurations and tags based on your requirements. Cloudera tracks internal usage based on tags, and every user marks their instances via tags. For that reason, we have every user create a template for themselves with their name in the tags.

Creating Clusters

Cloudera Director provides you with a web UI you can use to provision clusters. Click the “Add Cluster” button on the web UI to start the process.

You’ll create a Cloudera Manager instance first and then a cluster. You can also add a cluster to an existing Cloudera Manager instance, if you have one. The cluster configuration page looks like the following:

On clicking “Continue,” Cloudera Director will bootstrap your cluster and it’ll become available through the Cloudera Manager instance to which you added the cluster. That way, your users can provision clusters on-demand for different purposes.

Billing and Metering

Currently, there is no special billing and metering provision in Cloudera Director or Cloudera Manager. Pay-as-you-go pricing is available in lieu of annual subscription pricing. To track your usage, you can leverage the EC2 instance tags and detailed billing that’ll give you a breakdown of usage based on tags and other filters. You can also use third-party tools such as Cloudhealth and Cloudability to create the reports (which we do internally at Cloudera to track usage).


In this post, you learned how to set up and use AWS and Cloudera Director to provide Hadoop as a service to your users. At Cloudera, we are committed to continually improving this experience and would love to hear your feedback about what’s working and what’s not in the Cloudera Director area of community.cloudera.com.

Amandeep Khurana is Principal Solutions Architect at Cloudera. He is a co-author of the Manning book, HBase in Action.

Categories: Hadoop

How-to: Do Real-Time Log Analytics with Apache Kafka, Cloudera Search, and Hue

Cloudera Blog - Fri, 02/27/2015 - 16:35

Cloudera recently announced formal support for Apache Kafka. This simple use case illustrates how to make web log analysis, powered in part by Kafka, one of your first steps in a pervasive analytics journey.

If you are not looking at your company’s operational logs, then you are at a competitive disadvantage in your industry. Web server logs, application logs, and system logs are all valuable sources of operational intelligence, uncovering potential revenue opportunities and helping drive down costs. Whether your firm is an advertising agency that analyzes clickstream logs for customer insight, or you are responsible for protecting the firm’s information assets by preventing cyber-security threats, you should strive to get the most value from your data as soon as possible.

In the past, it was cost-prohibitive to capture all logs, let alone implement systems that act on them intelligently in real time. Recently, however, the technology has matured quite a bit, and today we have all the right ingredients in the Apache Hadoop ecosystem to capture events in real time, process them, and make intelligent decisions based on that information.

In this post, you will explore a sample implementation of a system that can capture Apache HTTP Server logs in real time, index them for searching, and make them available to other analytic apps as part of a “pervasive analytics” approach. This implementation is based on open source components such as Apache Flume, Apache Kafka, Hue, and Apache Solr.

Flume, Solr, Hue, and Kafka can all be easily installed using Cloudera Manager and parcels (the first three via the CDH parcel, and Kafka via its own parcel).


The high-level diagram below illustrates a simple setup that you can deploy in a matter of minutes. For our purposes, Apache web server log events originate in syslog. They are then forwarded to a Flume agent via the Flume syslog source. The syslog source sends them to a Kafka channel, which in turn passes them to a MorphlineSolrSink. The sink parses the messages, converts them into Solr documents, and sends them to the Solr server. After the indexed documents appear in Solr, Hue’s Search application is used to search the indexes and to build and display multiple unique dashboards for various audiences.


Next, you will learn all the details behind the above.

Apache Logs Breakdown

Every time you start a new project that involves Solr, you must first understand your data and organize it into fields. Fortunately, Apache web server logs are easy enough to understand and relate to Solr documents. A sample of the logs can be found below:

- - [15/Dec/2014:06:39:51 +0000] "GET /accounts/login/?next=/ HTTP/1.1" 302 460 "-" "Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)" 55006
- - [15/Dec/2014:06:39:54 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519197 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 37789
- - [15/Dec/2014:06:39:55 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519198 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 28120
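As a concrete sketch of that mapping, the snippet below pulls the main fields out of one such line with Python's re module. The regular expression is an illustrative approximation of the Combined Log Format (plus the trailing response-time field seen in the samples), not the parser used later in the pipeline, and the client IP is a made-up documentation address:

```python
import re

# Approximate Combined Log Format, plus an optional trailing
# response-time field as in the sample lines above.
LOG_PATTERN = re.compile(
    r'(?P<client_ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<url>\S+) (?P<protocol>\S+)" '
    r'(?P<code>\d{3}) (?P<bytes>\d+|-) "(?P<referer>[^"]*)" '
    r'"(?P<user_agent>[^"]*)"(?: (?P<response_time>\d+))?')

def parse_log_line(line):
    """Return a dict of field name -> value, or None if no match."""
    m = LOG_PATTERN.match(line)
    return m.groupdict() if m else None

# Hypothetical client IP (203.0.113.42); the samples above have theirs stripped.
sample = ('203.0.113.42 - - [15/Dec/2014:06:39:51 +0000] '
          '"GET /accounts/login/?next=/ HTTP/1.1" 302 460 "-" '
          '"Mozilla/5.0+(compatible; UptimeRobot/2.0; '
          'http://www.uptimerobot.com/)" 55006')

fields = parse_log_line(sample)
```

The group names deliberately mirror the Solr field names defined in schema.xml, so a matched record maps straight onto a Solr document.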

The diagram below represents a simple view of how to organize raw Apache web server messages into Solr fields:

There’s Something About the Cloud

Cloudera Search, which integrates Solr with HDFS, is deployed in SolrCloud mode, with all the options and flexibility that come with integrating with the rest of the Hadoop ecosystem in CDH. Throughout this example, you will use the solrctl command to manage SolrCloud deployments. (For the full command reference, please click here.)

Let’s begin by generating template configuration files for Solr. The most important, and the only, file to update is schema.xml, which is used in Solr to define the fields in the collection, their types, and their indexing characteristics. The command below generates a conf directory with all configuration files in the $HOME/accessCollection folder:

solrctl --zk localhost:2181/solr instancedir --generate $HOME/accessCollection

(Please note that --zk localhost:2181/solr should be replaced with the address and port of your own Apache ZooKeeper quorum.)

Next, edit the generated schema.xml file. What follows is a brief overview of what was changed from the template generated above. The fields relevant to the Apache logs have to be defined in the schema file:

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="time" type="tdate" indexed="true" stored="true" />
<field name="record" type="text_general" indexed="true" stored="false" multiValued="true"/>
<field name="client_ip" type="string" indexed="true" stored="true" />
<field name="code" type="string" indexed="true" stored="true" />
<field name="user_agent" type="string" indexed="true" stored="true" />
<field name="protocol" type="string" indexed="true" stored="true" />
<field name="url" type="string" indexed="true" stored="true" />
<field name="request" type="string" indexed="true" stored="true" />
<field name="referer" type="string" indexed="true" stored="true" />
<field name="bytes" type="tint" indexed="true" stored="true" />
<field name="method" type="string" indexed="true" stored="true" />
<field name="extension" type="string" indexed="true" stored="true" />
<field name="app" type="string" indexed="true" stored="true" />
<field name="subapp" type="string" indexed="true" stored="true" />
<field name="device_family" type="string" indexed="true" stored="true" />
<field name="user_agent_major" type="string" indexed="true" stored="true" />
<field name="user_agent_family" type="string" indexed="true" stored="true" />
<field name="os_family" type="string" indexed="true" stored="true" />
<field name="os_major" type="string" indexed="true" stored="true" />
<field name="region_code" type="string" indexed="true" stored="true" />
<field name="country_code" type="string" indexed="true" stored="true" />
<field name="city" type="string" indexed="true" stored="true" />
<field name="latitude" type="float" indexed="true" stored="true" />
<field name="longitude" type="float" indexed="true" stored="true" />
<field name="country_name" type="string" indexed="true" stored="true" />
<field name="country_code3" type="string" indexed="true" stored="true" />

Although you are not using the id and _version_ fields in this application, Solr uses them internally for its own bookkeeping. Therefore, every collection must have them (as defined in the schema.xml file).

One very important concept in SolrCloud deployments is the notion of collections. A collection is a single index that spans multiple Solr Instances. For example, if your syslog index is distributed across multiple Solr Instances, they all add up to form one collection.

Let’s call our collection accessCollection and set it up using the commands below. The first command uploads all of the configurations into a ZooKeeper znode. The second command creates the collection in Solr, based on the configuration in ZooKeeper from the first command.

solrctl --zk localhost:2181/solr instancedir --create accessCollection $HOME/accessCollection
solrctl --zk localhost:2181/solr collection --create accessCollection -s 1

Again, replace --zk localhost:2181/solr with your own ZooKeeper quorum configuration in both statements.

Note that the -s 1 argument defines the number of shards. A shard is a very important concept in Solr that refers to a slice of an index. For example, if you have a corpus of 1 million events, you may want to split it into two shards for scalability and improved query performance. The first shard might handle all the documents that have an id between 0 and 500,000, and the second shard the documents with an id between 500,000 and 1,000,000. Solr handles all this logic internally; you need only specify the number of shards you would like to create with the -s option.
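The id-range example can be sketched as a routing function. Note that this is only to illustrate the idea of partitioning an index; Solr's default compositeId router actually hashes the document id rather than assigning contiguous ranges:

```python
def shard_for(doc_id, num_shards, corpus_size=1000000):
    # Contiguous-range routing as in the example above: with two shards
    # over 1M documents, ids 0-499,999 land on shard 0 and
    # ids 500,000-999,999 on shard 1.
    return min(doc_id * num_shards // corpus_size, num_shards - 1)
```

Each shard is a self-contained Lucene index, so a query fans out to every shard and the results are merged, which is why more shards can improve query latency on large corpora.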

The number of shards will depend on many factors and should be determined carefully. The following table describes some of the considerations that should go into choosing the optimal number:

How Flume Met Kafka

Before you index the logs for searching, you need to collect them from the application servers.

Flume is a distributed system for collecting and processing log data. One of the main advantages of Flume is its large collection of sources and sinks. In many cases, Flume makes integration a no-brainer.

As previously described, our example uses Flume with Syslog Source to collect the log data from syslog, Kafka as a distributed and highly available channel to store the log data, and Solr sink with Morphlines to index the data and store it in Cloudera Search. All this can be done by properly configuring Flume, without writing a line of code. You can find the configuration file here.

There are three components in the configuration:

  • First, a syslog source, configured with the host and port to which it will bind.
    # Syslog Source Configuration
    tier1.sources.source1.type = syslogtcp
    # the hostname that Flume Syslog source will be running on
    tier1.sources.source1.host = localhost
    # the port that Flume Syslog source will listen on
    tier1.sources.source1.port = 5040
  • Next, a Solr sink, configured with a configuration file that we’ll review in detail later.
    tier1.sinks.sink1.type          = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    tier1.sinks.sink1.morphlineFile = /apache_logs/latest/morphline.conf
  • Finally, a Kafka channel in between them.
    tier1.channels.channel1.type                = org.apache.flume.channel.kafka.KafkaChannel
    tier1.channels.channel1.transactionCapacity = 1000
    tier1.channels.channel1.brokerList          = kafkaf-2:9092,kafkaf-3:9092
    tier1.channels.channel1.topic               = channel1
    tier1.channels.channel1.zookeeperConnect    = kafkaf-1:2181

The Kafka channel requires two mandatory parameters:

  • Location of at least one, but preferably two or more Kafka brokers
  • Location of the ZooKeeper quorum that Kafka uses

There are also a few optional parameters:

  • topic – specifies which topic the channel will use. It’s important to set it correctly if you expect Flume to read data that other apps wrote to Kafka; the topic should match between the apps and the Kafka Channel configuration. The default topic is flume-channel.
  • groupId – if multiple Kafka channels share the same groupId and same topic, they will each get partial data from the topic. Thus you can use this setting to add scalability via multiple Flume agents, each with a Kafka channel configured with the same groupId. Or, if you need multiple channels that all receive all the data from Kafka (essentially duplicating all the data), you’ll want to use different groupIds or different topics. The default groupId is flume.
  • transactionCapacity – the number of events the channel processes in one transaction. Setting this parameter to a higher number can increase throughput, but it increases latency as well.
  • parseAsFlumeEvent – A setting of “true” assumes that all events in the Kafka topic were written by a Flume source or Flume client. Thus, the first time the channel starts, all events in the topic are read (subsequently, reading resumes from the last recorded position). A setting of “false” assumes that some other application wrote the events to Kafka, and thus they are not parsed as Flume events. In addition, only events written after the channel started are read (since the topic may already have a large history in it).
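The groupId semantics above can be illustrated with a toy partition-assignment model. This is a deliberate simplification of Kafka's actual consumer-rebalancing protocol, and the group and channel names are made up for the example:

```python
def assign_partitions(partitions, channels_by_group):
    # Within one consumer group, partitions are divided among that
    # group's channels; each distinct group independently receives
    # every partition (i.e., a full copy of the data).
    assignment = {}
    for group, channels in channels_by_group.items():
        for i, partition in enumerate(partitions):
            member = channels[i % len(channels)]
            assignment.setdefault((group, member), []).append(partition)
    return assignment

# Two channels sharing groupId "flume" split a 4-partition topic for
# scalability; a third channel in its own hypothetical "audit" group
# receives all partitions, duplicating the data.
a = assign_partitions([0, 1, 2, 3],
                      {"flume": ["ch1", "ch2"], "audit": ["ch3"]})
```

This is why sharing a groupId scales out consumption, while distinct groupIds (or topics) fan the same data out to multiple destinations.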

All this is nice if the data is arriving from syslog and going only to Solr by way of Morphlines, but in today’s enterprise IT, there are usually many different data sources. In many companies, applications write important events directly to Kafka without going through syslog or Log4J at all.

To get data from Kafka, parse it with Morphlines, and index it into Solr, you can use an almost identical configuration. The only changes required are:

  • Leave out the Syslog source.
  • When configuring the Kafka channel, specify: parseAsFlumeEvent = false.

These changes are necessary because the events are now written to Kafka by apps other than Flume, so the source is not necessary (Kafka channel will get events from Kafka to SolrSink) and the events in the channel can be any data type, not necessarily a FlumeEvent.

This configuration allows you to index and search, using Cloudera Search, any enterprise event that was written to Kafka (including logs, metrics, audit events, and so on).

ETL Coding with Kite Morphlines

A “morphline” is a rich configuration file that makes it easy to define a transformation chain that can consume any kind of data from any kind of data source, process it, and load the results into a Hadoop component. Apache log parsing is achieved with the help of the Morphlines library, an open source framework available through the Kite SDK, which defines a transformation chain without a single line of code.

Our morphline configuration file will break down raw Apache logs and generate the Solr fields that will be used for indexing. The Morphlines library will perform the following actions:

  • Read the logs with the readCSV command, using space as a separator
  • Use the split command to break up the request field into three parts: method, url, protocol
  • Use the split command to extract app and subapp fields from the url field
  • Use the userAgent command to extract all of the device, OS, and user agent information
  • Use the geoIP and extractJsonPaths commands to retrieve geo coordinates such as country, region, city, latitude, and longitude by doing a lookup against an efficient in-memory MaxMind database (which therefore needs to be downloaded from MaxMind)
  • Generate unique ID for every log with the generateUUID command
  • Convert the date/timestamp into a field that Solr will understand, with the convertTimestamp command
  • Drop all of the extra fields that we did not specify in schema.xml, with the sanitizeUnknownSolrFields command, and
  • Load the record into Solr for HDFS write, with the loadSolr command
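The core of the chain above can be approximated in plain Python. The sketch below reproduces the two split steps plus the UUID and timestamp handling; the geoIP and userAgent lookups are omitted since they require the MaxMind database and a user-agent parser, and the field layout is assumed from the samples earlier:

```python
import uuid
from datetime import datetime, timezone

def transform(record):
    # split #1: request -> method, url, protocol
    record["method"], record["url"], record["protocol"] = \
        record.pop("request").split(" ")
    # split #2: url -> app, subapp ("/pig/watch/..." -> "pig", "watch")
    parts = record["url"].lstrip("/").split("/")
    record["app"] = parts[0] if parts else ""
    record["subapp"] = parts[1] if len(parts) > 1 else ""
    # generateUUID: a unique id for every log event
    record["id"] = str(uuid.uuid4())
    # convertTimestamp: Apache's timestamp into ISO 8601 for Solr's tdate
    ts = datetime.strptime(record.pop("timestamp"),
                           "%d/%b/%Y:%H:%M:%S %z")
    record["time"] = ts.astimezone(timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ")
    return record

rec = transform({"request": "GET /pig/watch/0000365 HTTP/1.1",
                 "timestamp": "15/Dec/2014:06:39:54 +0000"})
```

In the real pipeline, all of this is declared in morphline.conf rather than coded, which is the point of the Morphlines approach.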

When building this example, we initially used three morphline commands to break up the Apache log event: readCSV, split, split. Our intention was to make this blog post more generic and demonstrate how easily it can be adapted to all different types of logs. However, the creators of the Morphlines library have generously provided a number of pre-defined patterns for commonly used log formats, including Apache web server ones. What follows is an alternative way of reading the Apache log events and breaking them up into fields via morphlines:

{
  readLine {
    ignoreFirstLine : true
    commentPrefix : "#"
    charset : UTF-8
  }
}
{
  grok {
    dictionaryFiles : [target/test-classes/grok-dictionaries]
    expressions : {
      message : """<%{COMBINEDAPACHELOG:apache_log}>"""
    }
    extract : inplace
    findSubstrings : false
    addEmptyStrings : false
  }
}

Picture Perfect with Hue Dashboards

Now that your logs are indexed in near real time, you need a dashboard to search and drill into the events. The best tool for this job is Hue, the open source GUI for Hadoop, which comes preloaded with a Search application.


With just a few simple clicks, we can generate a nice dashboard, and present it to the end user. 

Start by clicking the Search->Indexes menu item, then click on Dashboards and Create. You will then see a new dashboard template window such as the one below; to get going, click the little pencil (Edit) button. 

Next, choose how to present the search results on the dashboard. For our demo, we chose Grid Layout, but if you are handy with HTML you can choose an HTML layout and present the results in a sexier manner.

The next step is where all the fun begins: you can drag and drop different widgets onto the screen and assign them to the fields in the index. At the moment, Hue Search dashboards support the following widgets:

  • Filter Bar
  • Marker Map
  • Text Facet
  • Pie Chart
  • Bar Chart
  • Line Chart
  • Tree
  • Heatmap
  • Timeline
  • Gradient Map

For full reference of how to build your own dashboards, follow the links below:


For our Apache logs demo, we used pie charts to give users the ability to drill into Application, Region, and Operating System facets. Text facets allow users to drill into country and city. A timeline view provides a nice graphical view of when users accessed our website. Finally, a marker map visually displays geo locations from which users accessed our example website.


Although the main example in this post describes a use case involving Apache web server logs, you could just as easily use the same components for any type of log/event processing. For an information security use case, processing proxy and firewall logs in real time can go a long way toward stopping external attacks and preventing insider threats. For an insurance company, processing claims and making them searchable to adjusters and fraud analysts can decrease time to resolution.

Whatever the use case, the ongoing investment in pervasive analytics is key.

Gwen Shapira is a Software Engineer at Cloudera, working on the Data Ingest team.

Jeff Shmain is a Solutions Architect at Cloudera.

Categories: Hadoop

Download the Hive-on-Spark Beta

Cloudera Blog - Wed, 02/25/2015 - 14:51

A Hive-on-Spark beta is now available via CDH parcel. Give it a try!

The Hive-on-Spark project (HIVE-7292) is one of the most watched projects in Apache Hive history. It has attracted developers from across the ecosystem, including from organizations such as Intel, MapR, IBM, and Cloudera, and gained critical help from the Spark community.

Many anxious users have inquired about its availability in the last few months. Some users even built Hive-on-Spark from the branch code and tried it in their testing environments, and then provided us valuable feedback. The team is thrilled to see this level of excitement and early adoption, and has been working around the clock to deliver the product at an accelerated pace.

Thanks to this hard work, significant progress has been made in the last six months. (The project is currently incubating in Cloudera Labs.) All major functionality is now in place, including different flavors of joins and integration with Spark, HiveServer2, and YARN, and the team has made initial but important investments in performance optimization, including split generation and grouping, supporting vectorization and cost-based optimization, and more. We are currently focused on running benchmarks, identifying and prototyping optimization areas such as dynamic partition pruning and table caching, and creating a roadmap for further performance enhancements for the near future.

Two months ago, we announced the availability of an Amazon Machine Image (AMI) for a hands-on experience. Today, we are even more proud to present a Hive-on-Spark beta via CDH parcel. You can download that parcel here. (Please note that in this beta release only HDFS, YARN, Apache ZooKeeper, and Hive are supported. Other components, such as Apache Pig and Impala, might not work as expected.) The “Getting Started” guide will help you get your Hive queries up and running on the Spark engine without much trouble.

We welcome your feedback. For assistance, please use user@hive.apache.org or the Cloudera Labs discussion board.

We will update you again when GA is available. Stay tuned!

Xuefu Zhang is a software engineer at Cloudera and a Hive PMC member.

Categories: Hadoop

Apache HBase 1.0 is Released

Cloudera Blog - Wed, 02/25/2015 - 00:20

The Cloudera HBase Team are proud to be members of Apache HBase’s model community and are currently AWOL, busy celebrating the release of the milestone Apache HBase 1.0. The following, from release manager Enis Soztutar, was published today in the ASF’s blog.


The Apache HBase community has released Apache HBase 1.0.0. Seven years in the making, it marks a major milestone in the Apache HBase project’s development, offers some exciting features and new APIs without sacrificing stability, and is both on-wire and on-disk compatible with HBase 0.98.x.


In this blog post, we look at the past, present, and future of the Apache HBase project.

Versions, versions, versions 

Before enumerating feature details of this release let’s take a journey into the past and how release numbers emerged. HBase started its life as a contrib project in a subdirectory of Apache Hadoop, circa 2007, and released with Hadoop. Three years later, HBase became a standalone top-level Apache project. Because HBase depends on HDFS, the community ensured that HBase major versions were identical and compatible with Hadoop’s major version numbers. For example, HBase 0.19.x worked with Hadoop 0.19.x, and so on.

However, the HBase community wanted to ensure that an HBase version could work with multiple Hadoop versions, not only with the matching major release number. Thus, a new naming scheme was invented where the releases would start at the close-to-1.0 major version of 0.90, as shown above in the timeline. We also took on an even-odd release number convention, where releases with odd version numbers were “developer previews” and even-numbered releases were “stable” and ready for production. The stable release series included 0.90, 0.92, 0.94, 0.96, and 0.98. (See HBase Versioning for an overview.)

After 0.98, we named the trunk version 0.99-SNAPSHOT, but we officially ran out of numbers! Levity aside, last year the HBase community agreed that the project had matured and stabilized enough that a 1.0.0 release was due. After three releases in the 0.99.x series of “developer previews” and six Apache HBase 1.0.0 release candidates, HBase 1.0.0 has now shipped! See the above diagram, courtesy of Lars George, for a timeline of releases. It shows each release line together with its support lifecycle and any previous developer-preview releases (0.99 -> 1.0.0, for example).

HBase-1.0.0, start of a new era

The 1.0.0 release has three goals:

1) to lay a stable foundation for future 1.x releases;

2) to stabilize running HBase cluster and its clients; and

3) to make versioning and compatibility dimensions explicit.

Including previous 0.99.x releases, 1.0.0 contains over 1,500 resolved JIRAs. Some of the major changes are:

API reorganization and changes

HBase’s client-level API has evolved over the years. To simplify the semantics and to make the API extensible and easier to use in the future, we revisited it before 1.0. To that end, 1.0.0 introduces new APIs and deprecates some of the commonly used client-side APIs (HTableInterface, HTable, and HBaseAdmin).

We advise you to update your application to use the new style of APIs, since deprecated APIs will be removed in the future 2.x series of releases. For further guidance, please visit these two decks: http://www.slideshare.net/xefyr/apache-hbase-10-release and http://s.apache.org/hbase-1.0-api.

All client-side APIs are marked with the InterfaceAudience.Public annotation, indicating whether a class/method is an official “client API” for HBase (see “11.1.1. HBase API Surface” in the HBase Refguide for more details on the audience annotations). Going forward, all 1.x releases are planned to be API compatible for classes annotated as client public.

Read availability using timeline consistent region replicas

As part of phase 1, this release contains an experimental “Read availability using timeline consistent region replicas” feature. That is, a region can be hosted in multiple region servers in read-only mode. One of the replicas for the region will be primary, accepting writes, and other replicas will share the same data files. Read requests can be done against any replica for the region with backup RPCs for high availability with timeline consistency guarantees. See JIRA HBASE-10070 for more details.

Online config change and other forward ports from 0.89-fb branch

The 0.89-fb branch in Apache HBase was where Facebook used to post their changes. JIRA HBASE-12147 forward-ported the patches that enabled reloading a subset of the server configuration without having to restart the region servers.

Apart from the above, there are hundreds of improvements: performance work (an improved WAL pipeline using the Disruptor, multi-WAL, more off-heap usage, and so on), bug fixes, and other goodies too numerous to list here. Check out the official release notes for a detailed overview. The release notes and the book also cover binary, source, and wire compatibility requirements, supported Hadoop and Java versions, upgrading from the 0.94, 0.96, and 0.98 versions, and other important details.

HBase-1.0.0 is also the start of using “semantic versioning” for HBase releases. In short, future HBase releases will have MAJOR.MINOR.PATCH version with the explicit semantics for compatibility. The HBase book contains all the dimensions for compatibility and what can be expected between different versions.
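As a sketch of what those semantics buy you, compatibility questions can be answered mechanically from the version numbers alone. The helper below is an illustrative simplification (HBase's full compatibility matrix, described in the book, has more dimensions than this):

```python
def parse(version):
    # "MAJOR.MINOR.PATCH" -> (major, minor, patch) as integers
    major, minor, patch = (int(x) for x in version.split("."))
    return major, minor, patch

def client_compatible(server_version, client_version):
    # Public client APIs are planned to stay compatible across all
    # 1.x releases; a major-version bump (e.g. 2.0.0) may remove
    # deprecated APIs, so only same-major pairs are assumed safe here.
    return parse(server_version)[0] == parse(client_version)[0]
```

Under this convention, upgrading from 1.0.0 to any 1.x release should not break a client, while moving to 2.0.0 requires reviewing deprecated API usage.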

What’s Next

We have marked HBase-1.0.0 as the next stable version of HBase, meaning that all new users should start using this version. However, as a database, we understand that switching to a newer version might take some time. We will continue to maintain and make 0.98.x releases until the user community is ready for its end of life. The 1.0.x releases, as well as the 1.1.0, 1.2.0, and later lines, are expected to be released from their corresponding branches, while 2.0.0 and other major releases will follow when their time arrives.

Read replicas phase 2, per-column-family flush, procedure v2, and SSD for WAL or column-family data are some of the upcoming features in the pipeline.


Finally, the HBase 1.0.0 release has come a long way, with contributions from a very large group of awesome people and hard work from committers and contributors. We would like to extend our thanks to our users and all who have contributed to HBase over the years.

Keep HBase’ing!

Categories: Hadoop

What’s New in Cloudera Director 1.1?

Cloudera Blog - Thu, 02/19/2015 - 14:17

Cloudera Director 1.1 introduces new features and improvements that provide more options for creating and managing cloud deployments of Apache Hadoop. Here are details about how they work.

Cloudera Director, which was released in October of 2014, delivers production-ready, self-service interaction with Apache Hadoop clusters in cloud environments. You can find background information about Cloudera Director’s purpose and fundamental features in our earlier introductory blog post and technical overview blog post.

The 1.1 release of Cloudera Director builds on the initial release with new features and improvements to existing functionality. Let’s take a look at some of them.

On-Demand Shrinking of Clusters

As you use Hadoop clusters, you may find that you need to expand them to handle heavier workloads, and Cloudera Director 1.0 already lets you “grow” your clusters by allocating new instances and adding them to cluster roles.

Cloudera Director 1.1 now also lets you “shrink” your clusters by removing both compute and storage instances you no longer need and adjusting your cluster configuration accordingly. This completes Cloudera Director’s mechanism for managing the size of your clusters, letting you more easily manage resource usage in your cloud environment.

The Cloudera Director UI uses the grow and shrink capabilities to let you “repair” nodes in your clusters. When you simply tag a node for repair, Cloudera Director replaces it with a fresh one, handling all the cloud provider and configuration details for you.

On-Demand Database Creation

Cloudera Director 1.0 lets you configure Cloudera Manager and cluster services like Hive to use external databases hosted on separate database servers, instead of relying on the PostgreSQL database embedded within Cloudera Manager. Hosting databases on your own servers lets you implement the backup and availability strategies that may be required by your enterprise.

Cloudera Director 1.1 goes further with external database support by creating the necessary databases for you when you describe the databases using external database templates, either through the Cloudera Director server API or the client configuration file. Not only can Cloudera Director create the databases while bootstrapping your deployment and cluster, but it can also destroy them when you terminate your deployment and cluster.

In order for Cloudera Director to know where to create new databases, you also define the database servers that you would like to use. Cloudera Director uses the capabilities of Cloudera Manager to communicate with your database servers, meaning that Cloudera Director does not even need to have direct access to them for databases to be created.

Amazon RDS Integration

Cloudera Director 1.1 goes even further with external database support by adding support for Amazon Relational Database Service (RDS). Instead of defining database servers that already exist, you can define them in terms of RDS instance information, and Cloudera Director will allocate and bootstrap them for you while it works to bootstrap your new deployments.

Cloudera Director Database on MySQL

The server component of Cloudera Director 1.0 uses an embedded database by default for its own data. Cloudera Director 1.1 adds support for hosting this database instead on an external MySQL database server.

Cloudera Director API Client Libraries

The Cloudera Director server’s RESTful API exposes all of Cloudera Director’s features. With the Cloudera Director 1.1 release, Cloudera is offering client-side libraries implemented in Java and Python to make it easy for developers to integrate with Cloudera Director in just a few lines of code. The libraries are automatically generated directly from the API itself, so you know they cover each and every service available.

The client libraries are available from Cloudera’s GitHub account and are Apache-licensed, so you can use them anywhere. Sample code using the clients is also available to help developers get started.

Faster Bootstrap of Cloudera Manager and Clusters

Cloudera Director 1.1 includes performance improvements beyond 1.0.

  • Cloudera Manager uses parcels for packaging and distributing CDH. Cloudera Director is now smarter about detecting when parcels are already available to Cloudera Manager locally, and avoids downloading them again.
  • If you install Cloudera Manager on your machine images (e.g., AMIs), Cloudera Director can detect that Cloudera Manager is already present and skip downloading it and installing it.

You can combine these improvements by creating machine images with Cloudera Manager and CDH parcels already installed. Cloudera Director can take advantage of this work and shorten bootstrap time by avoiding expensive downloads.

Expanded Cloudera Director UI and Documentation

The Cloudera Director UI has been enhanced with lots of new features. Here’s just a sampling.

  • When you first visit the UI after server installation, a welcome page gives you the information you need to get started and then guides you through the bootstrap wizard.
  • Each environment’s tab lists its details, such as its cloud provider, SSH username, and links to all of its defined instance templates.
  • If an instance template is not being used by any deployments or clusters, you can edit it.
  • The UI now provides a health display for each of your clusters, and for each service in a cluster.
  • When defining a new cluster, you can customize the names and size of each group of instances in the cluster, and add and remove groups as well.
  • You can grow, shrink, and repair clusters through the UI.
  • All users can change their own passwords through the UI, while administrative users have full user management capability.
  • Many usability improvements make the UI easier to use.

Cloudera Director’s documentation has gotten a lot of attention. It’s now available either in PDF or HTML formats, and you can get to it either from within the UI or externally. Introductory sections, release notes, installation and configuration instructions, examples, and feature descriptions have all been expanded and improved. Of course, details on all the new features described in this post are included, too.


Cloudera Director 1.1 builds on the solid technical foundation of Cloudera Director 1.0 to provide more ways to build and manage Hadoop clusters in the cloud. We’re not done yet, though. Here are some improvements that we have planned.

  • Support for enabling high availability (HA) in clusters
  • Automation of adding Kerberos authentication through Cloudera Manager
  • A service provider interface for implementing support for new cloud providers
  • Exposing more Cloudera Director functionality through the UI

Thanks to everyone who has tried out Cloudera Director and for the feedback we’ve received so far. To try out Cloudera Director yourself, download it for free or try the Cloudera AWS Quick Start to get started right away.

Bill Havanki is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Deploy and Configure Apache Kafka in Cloudera Enterprise

Cloudera Blog - Wed, 02/18/2015 - 12:55

With Kafka now formally integrated with, and supported as part of, Cloudera Enterprise, what’s the best way to deploy and configure it?

Earlier today, Cloudera announced that, following an incubation period in Cloudera Labs, Apache Kafka is now fully integrated into Cloudera’s Big Data platform, Cloudera Enterprise (CDH + Cloudera Manager). Our customers have expressed strong interest in Kafka, and some are already running Kafka in production.

Kafka is a popular open source project so there is already good awareness of its strengths and overall architecture. However, a common set of questions and concerns come up when folks try to deploy Kafka in production for the first time. In this post, we will attempt to answer a few of those questions.


We assume you are familiar with the overall architecture of Kafka, and with the Kafka concepts of brokers, producers, consumers, topics, and partitions. If not, you can check out a previous blog post, “Kafka for Beginners,” and the Kafka documentation.

Hardware for Kafka

For optimal performance, Cloudera strongly recommends that production Kafka brokers be deployed on dedicated machines, separate from the machines on which the rest of your Apache Hadoop cluster runs. Kafka relies on dedicated disk access and large pagecache for peak performance, and sharing nodes with Hadoop processes may interfere with its ability to fully leverage the pagecache.

Kafka is meant to run on industry-standard hardware. The machine specs for a Kafka broker will be similar to those of your Hadoop worker nodes. Though no minimum specification is required, Cloudera suggests machines that have at least:

  • Processor with four 2Ghz cores
  • Six 7200 RPM SATA drives (JBOD or RAID10)
  • 32GB of RAM
  • 1Gb Ethernet
Cluster Sizing

The most accurate way to model your use case is to simulate the load you expect on your own hardware, and you can do that using the load-generation tools that ship with Kafka.

If you need to size your cluster without simulation, here are two simple rules of thumb:

  1. Size the cluster based on the amount of disk space required. This requirement can be computed from the estimated rate at which you get data multiplied by the required data retention period.
  2. Size the cluster based on your memory requirements. Assuming readers and writers are fairly evenly distributed across the brokers in your cluster, you can roughly estimate memory needs by assuming you want to be able to buffer for at least 30 seconds, and computing your memory need as write_throughput*30.
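As a back-of-the-envelope sketch, the two rules above can be turned into a quick shell calculation. The throughput, retention, and replication figures below are made-up assumptions for illustration, not recommendations:

```shell
# Hypothetical workload figures -- substitute your own estimates.
WRITE_MBPS=50        # sustained write throughput, in MB/s (assumed)
RETENTION_DAYS=7     # required data retention period (assumed)
REPLICATION=3        # topic replication factor (assumed)

# Rule 1: disk = write rate * retention period * replication factor
DISK_TB=$(( WRITE_MBPS * 86400 * RETENTION_DAYS * REPLICATION / 1000000 ))
echo "Approximate cluster-wide disk: ${DISK_TB} TB"

# Rule 2: memory = enough pagecache to buffer ~30 seconds of writes
MEM_GB=$(( WRITE_MBPS * 30 / 1000 ))
echo "Approximate buffering memory: ${MEM_GB} GB"
```

With these sample numbers, the sketch suggests roughly 90 TB of raw disk across the cluster and about 1 GB of buffering memory; real sizing should still be validated with Kafka’s load-generation tools.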
Replication, Partitions, and Leaders

Data written to Kafka is replicated for fault tolerance and durability, and Kafka allows users to set a separate replication factor for each topic. The replication factor controls how many brokers will replicate each message that is written. If you have a replication factor of three, then up to two brokers can fail before you will lose access to your data. Cloudera recommends using a replication factor of at least two, so that you can transparently bounce machines without interrupting data consumption. However, if you have stronger durability requirements, use a replication factor of three or above.

Topics in Kafka are partitioned, and each topic has a configurable partition count. The partition count controls how many logs the topic will be sharded into. Producers assign data to partitions in round-robin fashion, to spread the data belonging to a topic among its partitions. Each partition is of course replicated, but one replica of each partition is selected as the leader partition, and all reads and writes go to this lead partition.

Here are some factors to consider while picking an appropriate partition count for a topic:

A partition can only be read by a single consumer within a consumer group. (However, a consumer can read many partitions.) Thus, if your partition count is lower than your consumer count, some consumers will not receive data for that topic. Hence, Cloudera recommends a partition count that is higher than the maximum number of simultaneous consumers of the data, so that each consumer receives data.

Similarly, Cloudera recommends a partition count that is higher than the number of brokers, so that the leader partitions are evenly distributed across brokers, thus distributing the read/write load. (Kafka performs random and even distribution of partitions across brokers.) As a reference, many Cloudera customers have topics with tens or even hundreds of partitions each. However, note that Kafka will need to allocate memory for a message buffer per partition. If there are a large number of partitions, make sure Kafka starts with sufficient heap space (number of partitions multiplied by replica.fetch.max.bytes).
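That heap-space caveat can be sanity-checked with the same kind of rough arithmetic. The partition count below is an assumption; 1MB is the default value of replica.fetch.max.bytes:

```shell
# Rough lower bound on broker buffer memory:
#   partitions hosted on the broker * replica.fetch.max.bytes
PARTITIONS_PER_BROKER=200     # leader + follower partitions on one broker (assumed)
FETCH_MAX_BYTES=1048576       # replica.fetch.max.bytes default (1MB)

BUFFER_MB=$(( PARTITIONS_PER_BROKER * FETCH_MAX_BYTES / 1024 / 1024 ))
echo "Replica fetch buffers alone need about ${BUFFER_MB} MB of heap"
```

A broker hosting 200 partitions would need at least 200MB of heap just for these buffers, before accounting for anything else the JVM does.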

Message Size

Though there is no maximum message size enforced by Kafka, Cloudera recommends writing messages that are no more than 1MB in size. Most customers see optimal throughput with messages ranging from 1-10 KB in size.

Settings for Guaranteed Message Delivery

Many use cases require reliable delivery of messages. Fortunately, it is easy to configure Kafka for zero data loss. But first, curious readers may like to understand the concept of an in-sync replica (commonly referred to as ISR): For a topic partition, an ISR is a follower replica that is caught-up with the leader partition, and is situated on a broker that is alive. Thus, if a leader replica is replaced by an ISR, there will be no loss of data. However, if a non-ISR replica is made a leader partition, some data loss is possible since it may not have the latest messages.

To ensure message delivery without data loss, the following settings are important:

  • While configuring a Producer, set acks=-1. This setting ensures that a message is considered to be successfully delivered only after ALL the ISRs have acknowledged writing the message.
  • Set the topic level configuration min.insync.replicas, which specifies the number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, and acks=-1, the producer will raise an exception.
  • Set the broker configuration param unclean.leader.election.enable to false. This setting essentially means you are prioritizing durability over availability since Kafka would avoid electing a leader, and instead make the partition unavailable, if no ISR is available to become the next leader safely.

A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with request.required.acks set to -1. However, you can increase these numbers for stronger durability.
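As a sketch of how these settings fit together on the command line: the host names, topic name, and partition count below are placeholders, and flag spellings should be checked against your Kafka version.

```shell
# Create a topic whose writes survive a single broker failure:
# replication factor 3, at least 2 in-sync replicas per write.
kafka-topics --create --zookeeper zk1:2181 \
    --topic payments --partitions 12 --replication-factor 3 \
    --config min.insync.replicas=2

# Produce with acks required from all in-sync replicas (-1 means "all").
kafka-console-producer --broker-list broker1:9092 \
    --topic payments --request-required-acks -1

# And in each broker's server.properties, prefer durability over
# availability by disabling unclean leader election:
#   unclean.leader.election.enable=false
```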


As of today, Cloudera provides a Kafka Custom Service Descriptor (CSD) to enable easy deployment and administration of a Kafka cluster via Cloudera Manager. The CSD provides a granular, real-time view of the health of your Kafka brokers, along with reporting and diagnostic tools. This CSD is available via Cloudera’s downloads page.

We hope the Cloudera Manager CSD, along with the tips in this blog post, make it easy for you to get up and running with Kafka in production.

Anand Iyer is a product manager at Cloudera.

Categories: Hadoop

Understanding HDFS Recovery Processes (Part 1)

Cloudera Blog - Fri, 02/13/2015 - 17:41

Having a good grasp of HDFS recovery processes is important when running or moving toward production-ready Apache Hadoop.

An important design requirement of HDFS is to ensure continuous and correct operations to support production deployments. One particularly complex area is ensuring correctness of writes to HDFS in the presence of network and node failures, where the lease recovery, block recovery, and pipeline recovery processes come into play. Understanding when and why these recovery processes are called, along with what they do, can help users as well as developers understand the machinations of their HDFS cluster.

In this blog post, you will get an in-depth examination of these recovery processes. We’ll start with a quick introduction to the HDFS write pipeline and these recovery processes, explain the important concepts of block/replica states and generation stamps, then step through each recovery process. Finally, we’ll conclude by listing several relevant issues, both resolved and open.

This post is divided into two parts: Part 1 will examine the details of lease recovery and block recovery, and Part 2 will examine the details of pipeline recovery. Readers interested in learning more should refer to the design specification, Append, Hflush, and Read, for implementation details.


In HDFS, files are divided into blocks, and file access follows multi-reader, single-writer semantics. To meet the fault-tolerance requirement, multiple replicas of a block are stored on different DataNodes. The number of replicas is called the replication factor. When a new file block is created, or an existing file is opened for append, the HDFS write operation creates a pipeline of DataNodes to receive and store the replicas. (The replication factor generally determines the number of DataNodes in the pipeline.) Subsequent writes to that block go through the pipeline (Figure 1).

Figure 1. HDFS Write Pipeline

For read operations the client chooses one of the DataNodes holding copies of the block and requests a data transfer from it.

Below are two application scenarios highlighting the need for the fault-tolerance design requirement:

  • HBase’s Region Server (RS) writes to its WAL (Write Ahead Log), which is an HDFS file that helps to prevent data loss. If an RS goes down, a new one will be started and it will reconstruct the state of its predecessor by reading the WAL file. If the write pipeline was not finished when the RS died, then different DataNodes in the pipeline may not be in sync. HDFS must ensure that all of the necessary data is read from the WAL file to reconstruct the correct RS state.
  • When a Flume client is streaming data to an HDFS file, it must be able to write continuously, even if some DataNodes in the pipeline fail or stop responding.

Lease recovery, block recovery, and pipeline recovery come into play in this type of situation:

  • Before a client can write an HDFS file, it must obtain a lease, which is essentially a lock. This ensures the single-writer semantics. The lease must be renewed within a predefined period of time if the client wishes to keep writing. If a lease is not explicitly renewed or the client holding it dies, then it will expire. When this happens, HDFS will close the file and release the lease on behalf of the client so that other clients can write to the file. This process is called lease recovery.
  • If the last block of the file being written is not propagated to all DataNodes in the pipeline, then the amount of data written to different nodes may be different when lease recovery happens. Before lease recovery causes the file to be closed, it’s necessary to ensure that all replicas of the last block have the same length; this process is known as block recovery. Block recovery is only triggered during the lease recovery process, and lease recovery only triggers block recovery on the last block of a file if that block is not in COMPLETE state (defined in later section).
  • During write pipeline operations, some DataNodes in the pipeline may fail. When this happens, the underlying write operations can’t just fail. Instead, HDFS will try to recover from the error to allow the pipeline to keep going and the client to continue to write to the file. The mechanism to recover from the pipeline error is called pipeline recovery.

The following sections will explain these processes in more details.

Blocks, Replicas, and Their States

To differentiate between blocks in the context of the NameNode and blocks in the context of the DataNode, we will refer to the former as blocks, and the latter as replicas.

A replica in the DataNode context can be in one of the following states (see enum ReplicaState in org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • FINALIZED: when a replica is in this state, writing to the replica is finished and the data in the replica is "frozen" (the length is finalized), unless the replica is re-opened for append. All finalized replicas of a block with the same generation stamp (referred to as the GS and defined below) should have the same data. The GS of a finalized replica may be incremented as a result of recovery.
  • RBW (Replica Being Written): this is the state of any replica that is being written, whether the file was created for write or re-opened for append. An RBW replica is always the last block of an open file. The data is still being written to the replica and it is not yet finalized. The data (not necessarily all of it) of an RBW replica is visible to reader clients. If any failures occur, an attempt will be made to preserve the data in an RBW replica.
  • RWR (Replica Waiting to be Recovered): If a DataNode dies and restarts, all its RBW replicas will be changed to the RWR state. An RWR replica will either become outdated and therefore discarded, or will participate in lease recovery.
  • RUR (Replica Under Recovery): A non-TEMPORARY replica will be changed to the RUR state when it is participating in lease recovery.
  • TEMPORARY: a temporary replica is created for the purpose of block replication (either by replication monitor or cluster balancer). It’s similar to an RBW replica, except that its data is invisible to all reader clients. If the block replication fails, a TEMPORARY replica will be deleted.

A block in the NameNode context may be in one of the following states (see enum BlockUCState in org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • UNDER_CONSTRUCTION: this is the state when it is being written to. An UNDER_CONSTRUCTION block is the last block of an open file; its length and generation stamp are still mutable, and its data (not necessarily all of it) is visible to readers. An UNDER_CONSTRUCTION block in the NameNode keeps track of the write pipeline (the locations of valid RBW replicas), and the locations of its RWR replicas.
  • UNDER_RECOVERY: If the last block of a file is in UNDER_CONSTRUCTION state when the corresponding client’s lease expires, then it will be changed to UNDER_RECOVERY state when block recovery starts.
  • COMMITTED: COMMITTED means that a block’s data and generation stamp are no longer mutable (unless it is reopened for append), and fewer than the minimal-replication number of DataNodes have reported FINALIZED replicas of the same GS/length. In order to service read requests, a COMMITTED block must keep track of the locations of its RBW replicas, and the GS and length of its FINALIZED replicas. An UNDER_CONSTRUCTION block changes to COMMITTED when the NameNode is asked by the client to add a new block to the file, or to close the file. If the last block or the second-to-last block is in the COMMITTED state, the file cannot be closed and the client has to retry.
  • COMPLETE: A COMMITTED block changes to COMPLETE when the NameNode has seen the minimum replication number of FINALIZED replicas of matching GS/length. A file can be closed only when all its blocks become COMPLETE. A block may be forced to the COMPLETE state even if it doesn’t have the minimal replication number of replicas, for example, when a client asks for a new block, and the previous block is not yet COMPLETE.

DataNodes persist a replica’s state to disk, but the NameNode doesn’t persist the block state to disk. When the NameNode restarts, it changes the state of the last block of any previously open file to the UNDER_CONSTRUCTION state, and the state of all the other blocks to COMPLETE.

Simplified state transition diagrams of replica and block are shown in Figures 2 and 3. See the design document for more detailed ones.

Figure 2: Simplified Replica State Transition

Figure 3. Simplified Block State Transition

Generation Stamp

A GS is a monotonically increasing 8-byte number for each block that is maintained persistently by the NameNode. The GS for a block and replica (Design Specification: HDFS Append and Truncates) is introduced for the following purposes:

  • Detecting stale replica(s) of a block: that is, when the replica GS is older than the block GS, which might happen when, for example, an append operation is somehow skipped at the replica.
  • Detecting outdated replica(s) on DataNodes that have been dead for a long time and then rejoin the cluster.

A new GS is needed when any of the following occur:

  • A new file is created
  • A client opens an existing file for append or truncate
  • A client encounters an error while writing data to the DataNode(s) and requests a new GS
  • The NameNode initiates lease recovery for a file
Lease Recovery and Block Recovery

Lease Manager

The leases are managed by the lease manager at the NameNode. The NameNode tracks the files each client has open for write. It is not necessary for a client to enumerate each file it has opened for write when renewing leases. Instead, it periodically sends a single request to the NameNode to renew all of them at once. (The request is an org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto message, part of the RPC protocol between the HDFS client and the NameNode.)

Each NameNode manages a single HDFS namespace, each of which has a single lease manager to manage all the client leases associated with that namespace. Federated HDFS clusters may have multiple namespaces, each with its own lease manager.

The lease manager maintains a soft limit (1 minute) and hard limit (1 hour) for the expiration time (these limits are currently non-configurable), and all leases maintained by the lease manager abide by the same soft and hard limits. Before the soft limit expires, the client holding the lease of a file has exclusive write access to the file. If the soft limit expires and the client has not renewed the lease or closed the file (the lease of a file is released when the file is closed), another client can forcibly take over the lease. If the hard limit expires and the client has not renewed the lease, HDFS assumes that the client has quit and will automatically close the file on behalf of the client, thereby recovering the lease.

The fact that the lease of a file is held by one client does not prevent other clients from reading the file, and a file may have many concurrent readers, even while a client is writing to it.

Operations that the lease manager supports include:

  • Adding a lease for a client and path (if the client already has a lease, it adds the path to the lease, otherwise, it creates a new lease and adds the path to the lease)
  • Removing a lease for a client and path (if it’s the last path in the lease, it removes the lease)
  • Checking whether the soft and/or hard limits have expired, and
  • Renewing the lease for a given client.

The lease manager has a monitor thread that periodically (currently every 2 seconds) checks whether any lease has an expired hard limit, and if so, it will trigger the lease recovery process for the files in these leases.

An HDFS client renews its leases via the org.apache.hadoop.hdfs.LeaseRenewer class, which maintains a list of users and runs one thread per user per NameNode. This thread periodically checks in with the NameNode and renews all of the client’s leases when the lease period is half over.

(Note: An HDFS client is only associated with one NameNode; see the constructors of org.apache.hadoop.hdfs.DFSClient. If the same application wants to access different files managed by different NameNodes in a federated cluster, then one client needs to be created for each NameNode.)

Lease Recovery Process

The lease recovery process is triggered on the NameNode to recover leases for a given client, either by the monitor thread upon hard limit expiry, or when a client tries to take over the lease from another client after the soft limit expires. It checks each file open for write by that client, performs block recovery for the file if the last block of the file is not in the COMPLETE state, and closes the file. Block recovery of a file is only triggered when recovering the lease of the file.

Below is the lease recovery algorithm for a given file f. When a client dies, the same algorithm is applied to each file the client opened for write.

  1. Get the DataNodes which contain the last block of f.
  2. Assign one of the DataNodes as the primary DataNode p.
  3. p obtains a new generation stamp from the NameNode.
  4. p gets the block info from each DataNode.
  5. p computes the minimum block length.
  6. p updates the DataNodes that have a valid generation stamp with the new generation stamp and the minimum block length.
  7. p reports the update results to the NameNode.
  8. The NameNode updates the BlockInfo.
  9. The NameNode removes f’s lease (other writers can now obtain the lease for writing to f).
  10. The NameNode commits the changes to the edit log.

Steps 3 through 7 are the block recovery parts of the algorithm. If a file needs block recovery, the NameNode picks a primary DataNode that has a replica of the last block of the file, and tells this DataNode to coordinate the block recovery work with other DataNodes. That DataNode reports back to the NameNode when it is done. The NameNode then updates its internal state of this block, removes the lease, and commits the change to edit log.

Sometimes, an administrator needs to forcibly recover the lease of a file before the hard limit expires. A CLI debug command is available (starting from Hadoop release 2.7 and CDH 5.3) for this purpose:

hdfs debug recoverLease [-path <path>] [-retries <num-retries>]
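For example, to forcibly recover the lease on an abandoned HBase WAL file (the path below is a placeholder, not a real file on your cluster):

```shell
hdfs debug recoverLease -path /hbase/WALs/rs1.example.com/wal.1234 -retries 3
```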


Lease recovery, block recovery, and pipeline recovery are essential to HDFS fault-tolerance. Together, they ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.

You should now have a better understanding of when and why lease recovery and block recovery are invoked, and what they do. In Part 2, we’ll explore pipeline recovery.

Yongjun Zhang is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Install and Use Cask Data Application Platform Alongside Impala

Cloudera Blog - Wed, 02/11/2015 - 14:10

Cloudera customers can now install, launch, and monitor CDAP directly from Cloudera Manager. This post from Nitin Motgi, Cask CTO, explains how.

Today, Cloudera and Cask are very happy to introduce the integration of Cloudera’s enterprise data hub (EDH) with the Cask Data Application Platform (CDAP). CDAP is an integrated platform for developers and organizations to build, deploy, and manage data applications on Apache Hadoop. This initial integration will enable CDAP to be installed, configured, and managed from within Cloudera Manager, a component of Cloudera Enterprise. Furthermore, it will simplify data ingestion for a variety of data sources, as well as enable interactive queries via Impala. Starting today, you can download and install CDAP directly from Cloudera’s downloads page.

In this post, you’ll learn how to get started by installing CDAP using Cloudera Manager. We have also created a video of this integration for further exploration.

Installing CDAP with Cloudera Manager

To install CDAP on a cluster managed by Cloudera Manager, we have provided a CSD (Custom Service Descriptor). This CSD gives Cloudera Manager the required information on where to download CDAP from and how to configure and run CDAP services. To install CDAP CSD, first drop the downloaded jar (from Cloudera’s download page) into the /opt/cloudera/csd directory on the Cloudera Manager server and restart Cloudera Manager. Cloudera’s full documentation on installing CSDs can be found here.

Once the CSD is installed, the first thing you want to do is download the CDAP parcel from the Hosts -> Parcels page. Note that by default, the CSD adds a Remote Parcel Repository URL for the latest version of CDAP at http://repository.cask.co/parcels/cdap/latest/. If desired, you can specify a particular version of CDAP, for example http://repository.cask.co/parcels/cdap/2.7/.

With a CDAP Parcel Repository URL configured, you will now see the CDAP Parcel available for download in the parcels page. From there, you can download, distribute, and activate the CDAP parcel on your cluster hosts.

Once the parcel is installed, the next step is to configure and start the CDAP services. Before doing so, note that there are a few additional outside requirements for running CDAP on your cluster. Please refer to the prerequisites section. Once all the prerequisites are satisfied, you can begin installing CDAP via the “Add Service” wizard.

The “Add Service” wizard will guide you through selecting the hosts you want to run CDAP on and customizing the configuration. A few things to note during the wizard:

  • CDAP consists of the Gateway/Router, Kafka, Master, and Web-App roles, and an optional Auth role. These services can all be thought of as “master” services. We recommend installing all roles together on a host, with multiple hosts for redundancy. Additionally, there is a client “Gateway” role that can be installed on any host where it is desired to run CDAP client tools (such as cdap-cli).
  • There is an optional “Explore” capability for ad-hoc querying via Apache Hive. If you plan on using this, be sure to select the service dependency set containing Hive and check the “Explore Enabled” option on the configuration page.
  • If you are installing CDAP on a Kerberos-enabled cluster, you must select the “Kerberos Auth Enabled” checkbox on the configuration page.

Finally, sit back and watch as Cloudera Manager spins up your CDAP services! Once it completes, check out the CDAP Console from the “Quick Links” on the CDAP Service overview page. For more details on Cloudera Manager/CDAP integration, please click here.

Ingesting Data and Exploring It with Impala

Streams are the primary means of bringing data from external systems into CDAP in real-time. They are ordered, time-partitioned sequences of data, usable for both real-time and batch collection and consumption of data. Using the CDAP Command Line Interface (CLI), you can easily create streams.

First, connect to your CDAP instance using the CLI:

> connect <hostname>:11015

Next, create a Stream:

> create stream trades

You can then add events to a Stream one by one:

> send stream trades 'NFLX,441.07,50'
> send stream trades 'AAPL,118.63,100'
> send stream trades 'GOOG,528.48,10'

Alternatively, you can add the entire contents of a file:

> load stream trades /my/path/trades.csv

Or you can use other tools or APIs available to ingest data in real-time or batch. For more information on other ways of ingesting data into CDAP, please refer to the docs here.
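If you prefer not to use the CLI, the same stream operations are also exposed over CDAP’s REST interface. The sketch below is an assumption for illustration: the endpoint paths and port mirror the v2 REST API and the CLI connect example above, but you should verify them against the CDAP documentation for your version.

```shell
# Create the stream (idempotent), then send a single CSV event to it.
curl -X PUT  http://cdap-host:11015/v2/streams/trades
curl -X POST http://cdap-host:11015/v2/streams/trades -d 'NFLX,441.07,50'
```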

You can now examine the contents of your stream by executing a SQL query:

> execute 'select * from cdap_stream_trades limit 5'
+===============================================================================================================+
| cdap_stream_trades.ts: BIGINT | cdap_stream_trades.headers: map<string,string> | cdap_stream_trades.body: STRING |
+===============================================================================================================+
| 1422924257559                 | {}                                             | NFLX,441.07,50                  |
| 1422924261588                 | {}                                             | AAPL,118.63,100                 |
| 1422924265441                 | {}                                             | GOOG,528.48,10                  |
| 1422924291009                 | {"content.type":"text/csv"}                    | GOOG,538.53,18230               |
| 1422924291009                 | {"content.type":"text/csv"}                    | GOOG,538.17,100                 |
+===============================================================================================================+

You can also attach a schema to your stream to enable more powerful queries:

> set stream format trades csv 'ticker string, price double, trades int'
> execute 'select ticker, sum(price * trades) / 1000000 as millions from cdap_stream_trades group by ticker order by millions desc'
+=====================================+
| ticker: STRING | millions: DOUBLE   |
+=====================================+
| AAPL           | 3121.8966341143905 |
| NFLX           | 866.0789117408007  |
| GOOG           | 469.01340359839986 |
+=====================================+

On one of our test clusters, the query above took just about two minutes to complete.

Data in CDAP is integrated with Apache Hive and the above query translates into a Hive query. As such, it will launch two MapReduce jobs in order to calculate the query results, which is why it takes minutes instead of seconds. To cut down query time, you can use Impala to query the data instead of Hive. Since Streams are written in a custom format, they cannot be directly queried through Impala. Instead, you can create an Adapter that regularly reads Stream events and writes those events into files on HDFS that can then be queried by Impala. You can also do this through the CLI:

> create stream-conversion adapter ticks_adapter on trades frequency 10m format csv schema "ticker string, price double, trades int"

This command will create an Adapter that runs every 10 minutes, reads the last 10 minutes of events from the Stream, and writes them to a file set that can be queried through Impala. The next time the Adapter runs, it will spawn a MapReduce job that reads all events added in the past 10 minutes, writes each event to Avro encoded files, and registers a new partition in the Hive Metastore.

You can then query the contents using Impala. On your cluster, use the Impala shell to connect to Impala:

$ impala-shell -i <impala-host>
> invalidate metadata
> select ticker, sum(price * trades) / 1000000 as millions from cdap_user_trades_converted group by ticker order by millions desc
+--------+-------------------+
| ticker | millions          |
+--------+-------------------+
| AAPL   | 3121.88477111439  |
| NFLX   | 866.0568582408006 |
| GOOG   | 469.0081187983999 |
+--------+-------------------+
Fetched 3 row(s) in 1.03s

Since you are using Impala, no MapReduce jobs are launched and the query comes back in a second!

Now that you have data in CDAP and are able to explore it, you can use CDAP to build real-time, batch, or combined real-time and batch data applications. For more information on how to build data applications using CDAP, please visit http://docs.cask.co.


Categories: Hadoop