Hello, Kite SDK 1.0

Cloudera Blog - Tue, 03/03/2015 - 23:09

The Kite project recently released a stable 1.0!

This milestone means that Kite’s data API and command-line tools are ready for long-term use.

The 1.0 data modules and API are no longer rapidly changing. From 1.0 on, Kite will be strict about breaking compatibility and will use semantic versioning to signal what compatibility guarantees you can expect from a given release. For example, breaking changes require increasing the major version number, so both minor and patch updates are binary compatible and safe to use without code changes.
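To make the rule concrete, here is a minimal Python sketch of the upgrade check that semantic versioning implies. It assumes plain `MAJOR.MINOR.PATCH` strings with no pre-release suffixes such as `-SNAPSHOT`. The helper names are hypothetical and are not part of Kite.

```python
# Hypothetical helpers illustrating semantic versioning; not part of Kite's API.

def parse_version(text):
    """Split a plain MAJOR.MINOR.PATCH string into a tuple of integers."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def is_safe_upgrade(current, candidate):
    """True if moving from `current` to `candidate` should need no code changes.

    Only a major-version bump may break compatibility, so any newer release
    with the same major number is considered safe.
    """
    cur = parse_version(current)
    cand = parse_version(candidate)
    return cand[0] == cur[0] and cand >= cur


print(is_safe_upgrade("1.0.0", "1.2.3"))  # True: minor/patch update
print(is_safe_upgrade("1.0.0", "2.0.0"))  # False: major bump may break code
```

Tuple comparison handles the ordering, so `1.10.0` correctly sorts after `1.9.0`.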

Kite provides some additional guarantees as well:

  • Kite’s command-line tool, kite-dataset, also follows semantic versioning. Changes that may break scripts will require updating the major version number.
  • Incompatible changes to Kite’s on-disk formats, Avro and Parquet, will also be signalled by a major version update.
  • Kite’s storage formats, metadata, and file layout are forward-compatible for at least one major version. You can update Kite, write to existing datasets, and roll back to the previous version safely.

We’re excited to get the Kite 1.0 release out. Now, Kite provides a great high-level API built around how you work with your data, and stability guarantees so you can be confident building on top of it.

Learn more about Kite:

Categories: Hadoop

How-to: Let Users Provision Apache Hadoop Clusters On-Demand

Cloudera Blog - Mon, 03/02/2015 - 16:51

Providing Hadoop-as-a-Service to your internal users can be a major operational advantage.

Cloudera Director (free to download and use) is designed for easy, on-demand provisioning of Apache Hadoop clusters in Amazon Web Services (AWS) environments, with support for other cloud environments in the works. It allows for provisioning clusters in accordance with the Cloudera AWS Reference Architecture.

At Cloudera, we use Cloudera Director internally to let our technical field team provision clusters on demand for demos, POCs, and testing. The following post describes the goals of Cloudera’s internal environment and how you can set up your own to meet similar requirements.


Our internal deployment has several goals:

  • Provide a way to provision Hadoop clusters on-demand in a self-service manner as and when the field team needs them
  • Use a common AWS account
  • Make clusters accessible from within Cloudera’s network, making them look like an extension to our data center
  • Enable clusters to access Amazon S3 and the internet, with aggregate bandwidth scaling in proportion to the cluster and instance sizes
  • Conform to Cloudera IT’s guidelines on AWS usage, which require all users to tag their instances with their usernames. All stray instances are terminated.

Ordinarily, you would set up the network context from the AWS management console, the AWS CLI client, or AWS Quickstart. As it stands today, AWS Quickstart does not meet our goals because it doesn’t create the VPN, IAM roles, and so on the way we want them; it works great if you don’t need the VPN setup and are starting afresh. We chose to set things up manually instead, as described below.

Setting Up a VPC

The first piece of that setup is the VPC, which is a one-time investment you can repurpose for other deployments that you might have in AWS with similar requirements. To do that, go into the VPC section of the console. It’ll look something like this:

As articulated in Cloudera’s reference architecture, three kinds of setups are possible: VPC with public subnet only, VPC with private subnet only, or VPC with private and public subnets. The choice depends on your connectivity and security requirements. If you are looking to transfer data between your cluster and S3, you’ll need to deploy your cluster in a public subnet so that it has public IP addresses and can interact with S3. Once you create your VPC, configure the route tables and virtual private gateways to link it back to your data center using a VPN or Direct Connect link. Instructions for setting up a VPN are available here, and instructions for setting up Direct Connect are available here.

Setting Up the Subnets

To have a cluster in AWS with S3 and internet access, the instances need public IP addresses. For the instances to be accessible from within your network, they need to be linked via VPN or Direct Connect. Create a subnet inside the VPC you just created; it will automatically be associated with the VPC’s main route table. The configuration will look something like the following:

You can leave the network ACLs (NACLs) at their defaults.
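The VPC is linked back to your data center over VPN or Direct Connect, so its address range must not overlap the on-premises network, and every subnet must fall inside the VPC range. The following Python sketch checks both conditions with the standard `ipaddress` module. All CIDR values are hypothetical placeholders, not Cloudera’s actual ranges.

```python
import ipaddress


def check_network_plan(vpc_cidr, subnet_cidrs, datacenter_cidrs):
    """Return a list of problems with a proposed VPC/subnet layout."""
    vpc = ipaddress.ip_network(vpc_cidr)
    problems = []
    # Every subnet has to be carved out of the VPC's own range.
    for cidr in subnet_cidrs:
        subnet = ipaddress.ip_network(cidr)
        if not subnet.subnet_of(vpc):
            problems.append(f"subnet {subnet} is outside VPC {vpc}")
    # The VPC must not collide with on-premises ranges reached over the VPN.
    for cidr in datacenter_cidrs:
        dc = ipaddress.ip_network(cidr)
        if vpc.overlaps(dc):
            problems.append(f"VPC {vpc} overlaps data center range {dc}")
    return problems


# Hypothetical ranges: a 10.20.0.0/16 VPC and a 10.0.0.0/16 corporate network.
print(check_network_plan("10.20.0.0/16", ["10.20.1.0/24"], ["10.0.0.0/16"]))  # []
```

An empty list means the layout passes both checks. `subnet_of` requires Python 3.7 or later.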

Setting Up Security Groups

To allow users to spin up clusters on demand in a self-service manner, you need every provisioned instance to adhere, at a bare minimum, to a set of ingress and egress rules. These rules allow outbound traffic to the internet and S3, and bidirectional traffic with Cloudera’s network. To do that, you can create a security group with the following rules:

Creating this security group by itself isn’t sufficient to force all instances to use it. You also have to disallow creation, modification, and deletion of security groups in this VPC, so that users can only use the one you created for them.
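The exact rule table is not reproduced here. The intent described above is that all outbound traffic is allowed and inbound traffic is allowed only from the corporate network, and that intent can be modeled as data. This Python sketch is a simplified model, not the EC2 API. It ignores ports and protocols, and the corporate CIDR `10.0.0.0/16` is a hypothetical placeholder. Like EC2 security groups, it contains only allow rules, so anything unmatched is denied.

```python
import ipaddress

# Hypothetical rule set mirroring the prose: egress anywhere, ingress only
# from the corporate network (assumed CIDR). Security groups have allow rules only.
RULES = {
    "egress": [ipaddress.ip_network("0.0.0.0/0")],     # internet and S3
    "ingress": [ipaddress.ip_network("10.0.0.0/16")],  # corporate network
}


def is_allowed(direction, peer_ip):
    """Return True if a new connection in `direction` with `peer_ip` is permitted.

    Security groups are stateful, so replies to an allowed connection are
    let through automatically; that part is not modeled here.
    """
    address = ipaddress.ip_address(peer_ip)
    return any(address in network for network in RULES[direction])


print(is_allowed("ingress", "10.0.2.3"))      # True: from the corporate network
print(is_allowed("ingress", "198.51.100.9"))  # False: no matching allow rule
print(is_allowed("egress", "198.51.100.9"))   # True: outbound is open
```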

Setting Up IAM Rules

Once you have the above setup configured, you don’t want users to be able to create, delete, or modify any of the configurations, especially the security groups, which are easy to modify unless restricted via IAM. To prevent that, you can set an IAM policy that restricts users from modifying anything in this environment, like the following:

{
  "Statement": [
    {
      "Effect": "Allow",
      "NotAction": "iam:*",
      "Resource": "*"
    },
    {
      "Effect": "Deny",
      "Action": "*",
      "Resource": "arn:aws:ec2:us-east-1:*:security-group/sg-dbc403bf"
    },
    {
      "Effect": "Deny",
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:AuthorizeSecurityGroupEgress",
        "ec2:RevokeSecurityGroupIngress",
        "ec2:RevokeSecurityGroupEgress",
        "ec2:DeleteSecurityGroup",
        "ec2:CreateSecurityGroup"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "ec2:Vpc": "arn:aws:ec2:us-east-1:007856030109:vpc/vpc-c2d650a7"
        }
      }
    }
  ]
}

This policy denies users any action on the specified security group, which you created as part of the network context. It also denies creating, deleting, or modifying any other security groups inside the VPC you created. That way, you can rest assured that you are not at risk of opening your network to more traffic than you intended.
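The policy works because, in IAM, an explicit `Deny` always overrides an `Allow`. The first statement grants everything except IAM actions, and the two `Deny` statements carve the security-group operations back out. The Python sketch below is a deliberately simplified model of that evaluation order, for illustration only. It supports `Action`/`NotAction`, `*` wildcards, and `StringEquals` conditions, and it is not how AWS actually implements policy evaluation. The account ID and ARNs are placeholders, and the action list in the third statement is abbreviated.

```python
from fnmatch import fnmatchcase


def _as_list(value):
    return value if isinstance(value, list) else [value]


def _matches(patterns, value):
    # IAM-style wildcards: "*" matches any run of characters (simplified).
    return any(fnmatchcase(value, pattern) for pattern in _as_list(patterns))


def _statement_applies(stmt, action, resource, context):
    if "Action" in stmt and not _matches(stmt["Action"], action):
        return False
    if "NotAction" in stmt and _matches(stmt["NotAction"], action):
        return False
    if not _matches(stmt.get("Resource", "*"), resource):
        return False
    for key, expected in stmt.get("Condition", {}).get("StringEquals", {}).items():
        if context.get(key) != expected:
            return False
    return True


def evaluate(policy, action, resource, context=None):
    """Return "Allow" or "Deny" for one request (simplified IAM semantics)."""
    context = context or {}
    applicable = [s for s in policy["Statement"]
                  if _statement_applies(s, action, resource, context)]
    if any(s["Effect"] == "Deny" for s in applicable):
        return "Deny"  # an explicit deny always wins
    if any(s["Effect"] == "Allow" for s in applicable):
        return "Allow"
    return "Deny"  # implicit deny when nothing allows the request


# Placeholder identifiers, not real resources.
VPC = "arn:aws:ec2:us-east-1:111122223333:vpc/vpc-example"
SG = "arn:aws:ec2:us-east-1:111122223333:security-group/sg-example"
POLICY = {"Statement": [
    {"Effect": "Allow", "NotAction": "iam:*", "Resource": "*"},
    {"Effect": "Deny", "Action": "*",
     "Resource": "arn:aws:ec2:us-east-1:*:security-group/sg-example"},
    {"Effect": "Deny",
     "Action": ["ec2:AuthorizeSecurityGroupIngress", "ec2:CreateSecurityGroup"],
     "Resource": "*", "Condition": {"StringEquals": {"ec2:Vpc": VPC}}},
]}

print(evaluate(POLICY, "ec2:RunInstances", "arn:aws:ec2:us-east-1:111122223333:instance/*"))  # Allow
print(evaluate(POLICY, "ec2:RevokeSecurityGroupEgress", SG))              # Deny
print(evaluate(POLICY, "ec2:CreateSecurityGroup", "*", {"ec2:Vpc": VPC}))  # Deny
print(evaluate(POLICY, "iam:CreateUser", "*"))                            # Deny
```

The last case shows why `NotAction` is useful here. IAM actions are never allowed at all, so users cannot grant themselves new permissions.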

Setting Up Cloudera Director

Cloudera Director setup instructions can be found in the documentation.

Once Cloudera Director is installed, you can create an environment inside it with the specified public subnet, so that clusters in that environment can access the internet and S3. You’ll have to share the environment’s .pem key file with all users, since keys are environment-specific. Once the environment is set up, you can create instance templates with appropriate instance types and tags based on your requirements. Cloudera tracks internal usage by tags, and every user marks their instances with tags. For that reason, we have every user create a template for themselves with their name in the tags.
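Usage is tracked by tags, so it helps to check periodically for instances that are missing an owner tag, because those are the stray instances that IT terminates. This Python sketch works on plain dictionaries shaped like the `Reservations`/`Instances`/`Tags` structure returned by EC2’s DescribeInstances call. The tag key `owner` and the instance IDs are hypothetical; substitute whatever key your templates use.

```python
def find_untagged(reservations, tag_key="owner"):
    """Return IDs of instances that lack a non-empty `tag_key` tag.

    `tag_key` is a hypothetical default; use the key your templates set.
    """
    untagged = []
    for reservation in reservations:
        for instance in reservation["Instances"]:
            # An instance with no tags at all may have no "Tags" entry.
            tags = {t["Key"]: t["Value"] for t in instance.get("Tags", [])}
            if not tags.get(tag_key):
                untagged.append(instance["InstanceId"])
    return untagged


sample = [{"Instances": [
    {"InstanceId": "i-0001", "Tags": [{"Key": "owner", "Value": "jdoe"}]},
    {"InstanceId": "i-0002", "Tags": [{"Key": "Name", "Value": "cm-host"}]},
    {"InstanceId": "i-0003"},
]}]
print(find_untagged(sample))  # ['i-0002', 'i-0003']
```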

Creating Clusters

Cloudera Director provides you with a web UI you can use to provision clusters. Click the “Add Cluster” button on the web UI to start the process.

You’ll create a Cloudera Manager instance first and then a cluster. You can also add a cluster to an existing Cloudera Manager instance, if you have one. The cluster configuration page looks like the following:

When you click “Continue,” Cloudera Director bootstraps your cluster, and the cluster becomes available through the Cloudera Manager instance to which you added it. That way, your users can provision clusters on demand for different purposes.

Billing and Metering

Currently, Cloudera Director and Cloudera Manager have no special billing or metering provisions. Pay-as-you-go pricing is available in lieu of annual subscription pricing. To track your usage, you can leverage EC2 instance tags and AWS detailed billing reports, which give you a breakdown of usage by tags and other filters. You can also use third-party tools such as CloudHealth and Cloudability to create the reports (which we do internally at Cloudera to track usage).
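As a sketch of the tag-based breakdown, the Python below sums cost per user from the rows of a billing export. The column names `user:owner` and `UnBlendedCost` are assumptions about the report layout, and the sample rows are made up. Check the headers of your own detailed billing report and adjust the names accordingly.

```python
import csv
import io
from collections import defaultdict
from decimal import Decimal


def cost_by_tag(csv_text, tag_column="user:owner", cost_column="UnBlendedCost"):
    """Sum the cost column per tag value; rows without the tag go to '(untagged)'.

    Both column names are assumptions; adjust them to your report's headers.
    """
    totals = defaultdict(Decimal)
    for row in csv.DictReader(io.StringIO(csv_text)):
        owner = row.get(tag_column) or "(untagged)"
        # Decimal avoids floating-point rounding when adding currency amounts.
        totals[owner] += Decimal(row[cost_column] or "0")
    return dict(totals)


# Made-up sample rows for illustration.
report = """ProductName,user:owner,UnBlendedCost
Amazon Elastic Compute Cloud,jdoe,1.25
Amazon Elastic Compute Cloud,asmith,0.50
Amazon Simple Storage Service,jdoe,0.10
Amazon Elastic Compute Cloud,,2.00
"""
print(cost_by_tag(report))
```

Grouping untagged spend under its own key makes stray, unattributed usage visible in the same report.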


In this post, you learned how to set up and use AWS and Cloudera Director to provide Hadoop as a service to your users. At Cloudera, we are committed to continually improving this experience and would love to hear your feedback about what’s working and what’s not in the Cloudera Director area of community.cloudera.com.

Amandeep Khurana is Principal Solutions Architect at Cloudera. He is a co-author of the Manning book, HBase in Action.

Categories: Hadoop

How-to: Do Real-Time Log Analytics with Apache Kafka, Cloudera Search, and Hue

Cloudera Blog - Fri, 02/27/2015 - 16:35

Cloudera recently announced formal support for Apache Kafka. This simple use case illustrates how to make web log analysis, powered in part by Kafka, one of your first steps in a pervasive analytics journey.

If you are not looking at your company’s operational logs, then you are at a competitive disadvantage in your industry. Web server logs, application logs, and system logs are all valuable sources of operational intelligence, uncovering potential revenue opportunities and helping drive down costs. Whether your firm is an advertising agency that analyzes clickstream logs for customer insight, or you are responsible for protecting the firm’s information assets by preventing cyber-security threats, you should strive to get the most value from your data as soon as possible.

In the past, it was cost-prohibitive to capture all logs, let alone implement systems that act on them intelligently in real time. Recently, however, the technology has matured considerably, and today the Apache Hadoop ecosystem has all the ingredients needed to capture events in real time, process them, and make intelligent decisions based on that information.

In this post, you will explore a sample implementation of a system that can capture Apache HTTP Server logs in real time, index them for searching, and make them available to other analytic apps as part of a “pervasive analytics” approach. This implementation is based on open source components such as Apache Flume, Apache Kafka, Hue, and Apache Solr.

Flume, Solr, Hue, and Kafka can all be easily installed using Cloudera Manager and parcels (the first three via the CDH parcel, and Kafka via its own parcel).


The high-level diagram below illustrates a simple setup that you can deploy in a matter of minutes. For our purposes, Apache web server log events originate in syslog. They are then forwarded to a Flume agent via the Flume Syslog Source. The Syslog Source sends them to the Kafka Channel, which in turn passes them to a MorphlineSolrSink. The MorphlineSolrSink parses the messages, converts them into Solr documents, and sends them to the Solr server. After the indexed documents appear in Solr, Hue’s Search application is used to search the indexes and to build and display multiple dashboards for various audiences.


Next, you will learn all the details behind the above.

Apache Logs Breakdown

Every time you start a new project that involves Solr, you must first understand your data and organize it into fields. Fortunately, Apache web server logs are easy enough to understand and relate to Solr documents. A sample of the logs can be found below:

    - - [15/Dec/2014:06:39:51 +0000] "GET /accounts/login/?next=/ HTTP/1.1" 302 460 "-" "Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)" 55006
    - - [15/Dec/2014:06:39:54 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519197 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 37789
    - - [15/Dec/2014:06:39:55 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519198 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 28120
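The field extraction can be sketched in Python with a regular expression for the Apache combined log format, followed by the extra trailing number that appears at the end of each sample line. The meaning of that number depends on the server’s LogFormat, so it is captured here under the hypothetical name `extra`. The client IP addresses were removed from the samples above, so the example uses the documentation address 203.0.113.7 as a stand-in. This only illustrates the mapping; in this pipeline the parsing is done by Morphlines, not Python.

```python
import re

# Combined log format plus the trailing number seen in the samples above.
# The trailing field's meaning depends on the server's LogFormat, so it is
# captured under the hypothetical name "extra".
LOG_PATTERN = re.compile(
    r'(?P<client_ip>\S+) \S+ \S+ '
    r'\[(?P<time>[^\]]+)\] '
    r'"(?P<request>[^"]*)" '
    r'(?P<code>\d{3}) (?P<bytes>\d+|-) '
    r'"(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
    r'(?: (?P<extra>\d+))?$'
)


def parse_line(line):
    """Return a dict keyed mostly by Solr field names, or None on no match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    record = match.groupdict()
    # "-" means no body was sent; store it as None instead of a number.
    record["bytes"] = None if record["bytes"] == "-" else int(record["bytes"])
    return record


# 203.0.113.7 is a documentation address standing in for the removed client IP.
sample = ('203.0.113.7 - - [15/Dec/2014:06:39:51 +0000] '
          '"GET /accounts/login/?next=/ HTTP/1.1" 302 460 "-" '
          '"Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)" 55006')
fields = parse_line(sample)
print(fields["client_ip"], fields["code"], fields["bytes"])  # 203.0.113.7 302 460
```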

The diagram below represents a simple view of how to organize raw Apache web server messages into Solr fields:

There’s Something About the Cloud

Cloudera Search, which integrates Solr with HDFS, is deployed in SolrCloud mode, with all the options and flexibility that come from integrating with the rest of the Hadoop ecosystem in CDH. Throughout this example, you will use the solrctl command to manage SolrCloud deployments. (For the full command reference, please click here.)

Let’s begin by generating template configuration files for Solr. The most important file, and the only one you need to update, is schema.xml, which Solr uses to define the fields in the collection, their types, and their indexing characteristics. The command below generates a conf directory with all the configuration files in the $HOME/accessCollection folder:

solrctl --zk localhost:2181/solr instancedir --generate $HOME/accessCollection

(Please note that --zk localhost:2181 should be replaced with the address and port of your own Apache ZooKeeper quorum.)

What follows is a brief overview of what was changed in the schema.xml file relative to the template generated above. The fields relevant to the Apache logs have to be defined in the schema file:

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="time" type="tdate" indexed="true" stored="true" />
<field name="record" type="text_general" indexed="true" stored="false" multiValued="true"/>
<field name="client_ip" type="string" indexed="true" stored="true" />
<field name="code" type="string" indexed="true" stored="true" />
<field name="user_agent" type="string" indexed="true" stored="true" />
<field name="protocol" type="string" indexed="true" stored="true" />
<field name="url" type="string" indexed="true" stored="true" />
<field name="request" type="string" indexed="true" stored="true" />
<field name="referer" type="string" indexed="true" stored="true" />
<field name="bytes" type="tint" indexed="true" stored="true" />
<field name="method" type="string" indexed="true" stored="true" />
<field name="extension" type="string" indexed="true" stored="true" />
<field name="app" type="string" indexed="true" stored="true" />
<field name="subapp" type="string" indexed="true" stored="true" />
<field name="device_family" type="string" indexed="true" stored="true" />
<field name="user_agent_major" type="string" indexed="true" stored="true" />
<field name="user_agent_family" type="string" indexed="true" stored="true" />
<field name="os_family" type="string" indexed="true" stored="true" />
<field name="os_major" type="string" indexed="true" stored="true" />
<field name="region_code" type="string" indexed="true" stored="true" />
<field name="country_code" type="string" indexed="true" stored="true" />
<field name="city" type="string" indexed="true" stored="true" />
<field name="latitude" type="float" indexed="true" stored="true" />
<field name="longitude" type="float" indexed="true" stored="true" />
<field name="country_name" type="string" indexed="true" stored="true" />
<field name="country_code3" type="string" indexed="true" stored="true" />

Although you are not using the id and _version_ fields in this application, Solr uses them internally for its own bookkeeping. Therefore, every collection must have them (as defined in the schema.xml file).

One very important concept in SolrCloud deployments is the notion of collections. A collection is a single index that spans multiple Solr Instances. For example, if your syslog index is distributed across multiple Solr Instances, they all add up to form one collection.

Let’s call our collection accessCollection and set it up using the commands below. The first command uploads all of the configurations into a ZooKeeper znode. The second command creates the collection in Solr, based on the configuration in ZooKeeper from the first command.

solrctl --zk localhost:2181/solr instancedir --create accessCollection $HOME/accessCollection
solrctl --zk localhost:2181/solr collection --create accessCollection -s 1

Again, replace --zk localhost:2181 with your own ZooKeeper quorum configuration in both statements.

Note that the -s 1 argument defines the number of shards. A shard is a very important concept in Solr: it refers to a slice of an index. For example, if you have a corpus of 1 million events, you may want to split it into two shards for scalability and improved query performance. The first shard might handle documents with ids from 1 to 500,000, and the second shard documents with ids from 500,001 to 1,000,000. (In practice, SolrCloud’s default router assigns documents to shards by a hash of the id rather than by contiguous id ranges.) Solr handles all this logic internally; you need only specify the number of shards you would like to create with the -s option.
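To illustrate hash-based routing, here is a Python sketch that splits a 32-bit hash space into equal, contiguous ranges, one per shard. It routes each document id to the shard whose range contains the id’s hash. It uses `zlib.crc32` purely as a stand-in. SolrCloud’s default router uses MurmurHash3 over the id, so the actual shard assignments will differ.

```python
import zlib

HASH_SPACE = 2 ** 32  # zlib.crc32 returns an unsigned 32-bit value


def shard_ranges(num_shards):
    """Split [0, 2**32) into `num_shards` contiguous, nearly equal ranges."""
    step = HASH_SPACE // num_shards
    bounds = [i * step for i in range(num_shards)] + [HASH_SPACE]
    return [(bounds[i], bounds[i + 1]) for i in range(num_shards)]


def route(doc_id, num_shards):
    """Return the index of the shard responsible for `doc_id`.

    crc32 is a stand-in hash; SolrCloud itself uses MurmurHash3.
    """
    h = zlib.crc32(doc_id.encode("utf-8"))
    for index, (low, high) in enumerate(shard_ranges(num_shards)):
        if low <= h < high:
            return index
    raise AssertionError("hash outside the hash space")


counts = [0, 0]
for n in range(1000):
    counts[route(f"event-{n}", 2)] += 1
print(counts)  # number of documents routed to each of the two shards
```

Because routing depends only on the id’s hash, any node can compute where a document belongs without a lookup table. This is also why the shard count has to be chosen up front.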

The number of shards will depend on many factors and should be determined carefully. The following table describes some of the considerations that should go into choosing the optimal number:

How Flume Met Kafka

Before you index the logs for searching, you need to collect them from the application servers.

Flume is a distributed system for collecting and processing log data. One of the main advantages of Flume is its large collection of sources and sinks. In many cases, Flume makes integration a no-brainer.

As previously described, our example uses Flume with Syslog Source to collect the log data from syslog, Kafka as a distributed and highly available channel to store the log data, and Solr sink with Morphlines to index the data and store it in Cloudera Search. All this can be done by properly configuring Flume, without writing a line of code. You can find the configuration file here.

There are three components in the configuration:

  • First, a syslog source, configured with the host and port to which it will bind.
    # Syslog Source Configuration
    tier1.sources.source1.type     = syslogtcp
    # the hostname that Flume Syslog source will be running on
    tier1.sources.source1.host     = localhost
    # the port that Flume Syslog source will listen on
    tier1.sources.source1.port     = 5040
  • Next, a Solr sink, configured with a configuration file that we’ll review in detail later.
    tier1.sinks.sink1.type          = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    tier1.sinks.sink1.morphlineFile = /apache_logs/latest/morphline.conf
  • Finally, a Kafka channel in between them.
    tier1.channels.channel1.type                = org.apache.flume.channel.kafka.KafkaChannel
    tier1.channels.channel1.transactionCapacity = 1000
    tier1.channels.channel1.brokerList          = kafkaf-2:9092,kafkaf-3:9092
    tier1.channels.channel1.topic               = channel1
    tier1.channels.channel1.zookeeperConnect    = kafkaf-1:2181
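The three snippets in the list above define the components only. A working agent file also has to declare the components and bind the source and the sink to the channel. As a sketch, the Python below renders a complete `tier1` properties file from the values shown. The declaration and binding keys (`tier1.sources`, `tier1.sources.source1.channels`, `tier1.sinks.sink1.channel`) follow standard Flume configuration syntax. The `render_agent` helper itself is hypothetical.

```python
def render_agent(agent, source, channel, sink):
    """Render a Flume properties file for one source -> channel -> sink flow.

    Hypothetical helper. Each argument after `agent` is a
    (name, {property: value}) pair.
    """
    (src_name, src_props), (ch_name, ch_props), (sink_name, sink_props) = source, channel, sink
    lines = [
        f"{agent}.sources = {src_name}",
        f"{agent}.channels = {ch_name}",
        f"{agent}.sinks = {sink_name}",
        # Wiring: sources take a "channels" list, sinks a single "channel".
        f"{agent}.sources.{src_name}.channels = {ch_name}",
        f"{agent}.sinks.{sink_name}.channel = {ch_name}",
    ]
    for kind, name, props in (("sources", src_name, src_props),
                              ("channels", ch_name, ch_props),
                              ("sinks", sink_name, sink_props)):
        for key, value in props.items():
            lines.append(f"{agent}.{kind}.{name}.{key} = {value}")
    return "\n".join(lines) + "\n"


config = render_agent(
    "tier1",
    ("source1", {"type": "syslogtcp", "host": "localhost", "port": 5040}),
    ("channel1", {
        "type": "org.apache.flume.channel.kafka.KafkaChannel",
        "transactionCapacity": 1000,
        "brokerList": "kafkaf-2:9092,kafkaf-3:9092",
        "topic": "channel1",
        "zookeeperConnect": "kafkaf-1:2181",
    }),
    ("sink1", {
        "type": "org.apache.flume.sink.solr.morphline.MorphlineSolrSink",
        "morphlineFile": "/apache_logs/latest/morphline.conf",
    }),
)
print(config)
```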

The Kafka channel requires two mandatory parameters:

  • Location of at least one, but preferably two or more Kafka brokers
  • Location of the ZooKeeper quorum that Kafka uses

There are also a few optional parameters:

  • topic – specifies which topic the channel will use. It’s important to set it correctly if you expect Flume to read data that other apps wrote to Kafka; the topic should match between the apps and the Kafka Channel configuration. The default topic is flume-channel.
  • groupId – if multiple Kafka channels share the same groupId and same topic, they will each get partial data from the topic. Thus you can use this setting to add scalability via multiple Flume agents, each with a Kafka channel configured with the same groupId. Or, if you need multiple channels that all receive all the data from Kafka (essentially duplicating all the data), you’ll want to use different groupIds or different topics. The default groupId is flume.
  • transactionCapacity – the number of events the channel processes in one transaction. Setting this parameter to a higher number can increase throughput, but also latency.
  • parseAsFlumeEvent – A setting of “true” assumes that all events in the Kafka topic were written by a Flume source or Flume client. Thus, the first time the channel starts, all events in the topic are read (subsequently, reading resumes from the last recorded position). A setting of “false” assumes that some other application wrote the events to Kafka, so they are not parsed as Flume events. In addition, only events written after the channel started are read (since the topic may already have a large history in it).
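The `groupId` behavior described in the list above follows from how Kafka consumer groups work. Within one group, each partition of a topic is consumed by exactly one member, while separate groups each receive every partition. The Python sketch below models this with a simple round-robin assignment. It illustrates the semantics rather than reproducing Kafka’s actual assignment code. The channel names, the second group name, and the partition count are hypothetical.

```python
from collections import defaultdict


def assign(partitions, members_by_group):
    """Assign topic partitions to consumers, round-robin within each group.

    Simplified model, not Kafka's assignor. Returns
    {group: {member: [partition, ...]}}.
    """
    result = {}
    for group, members in members_by_group.items():
        assignment = defaultdict(list)
        for index, partition in enumerate(partitions):
            # Each partition goes to exactly one member of the group.
            assignment[members[index % len(members)]].append(partition)
        result[group] = dict(assignment)
    return result


partitions = list(range(4))  # a hypothetical 4-partition topic

# Same groupId: the two channels split the data between them.
same_group = assign(partitions, {"flume": ["agentA.channel1", "agentB.channel1"]})
print(same_group)

# Different groupIds: each channel receives every partition (duplicated data).
two_groups = assign(partitions, {"flume": ["agentA.channel1"],
                                 "flume-copy": ["agentB.channel1"]})
print(two_groups)
```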

All this is nice if the data is arriving from syslog and going only to Solr by way of Morphlines, but in today’s enterprise IT, there are usually many different data sources. In many companies, applications write important events directly to Kafka without going through syslog or Log4J at all.

To get data from Kafka, parse it with Morphlines, and index it into Solr, you can use an almost identical configuration. The only changes required are:

  • Leave out the Syslog source.
  • When configuring the Kafka channel, specify: parseAsFlumeEvent = false.

These changes are necessary because the events are now written to Kafka by apps other than Flume. The source is therefore not needed (the Kafka channel delivers events from Kafka directly to the Solr sink), and the events in the channel can be any data type, not necessarily a FlumeEvent.

This configuration lets you use Cloudera Search to index and search any enterprise event that was written to Kafka (including logs, metrics, audit events, and so on).

ETL Coding with Kite Morphlines

A “morphline” is a rich configuration file that makes it easy to define a transformation chain that can consume any kind of data from any kind of data source, process that data, and load the results into a Hadoop component. Apache log parsing is achieved with the help of the Morphlines library, an open source framework available through the Kite SDK, that defines a transformation chain without a single line of code.

Our morphline configuration file will break down raw Apache logs and generate the Solr fields that will be used for indexing. The morphlines library will perform the following actions:

  • Read the logs with the readCSV command, using space as a separator
  • Use the split command to break up the request field into three parts: method, url, protocol
  • Use the split command to extract the app and subapp fields from the url field
  • Use the userAgent command to extract all of the device, OS, and user agent information
  • Use the geoIP and extractJsonPaths commands to retrieve geographic information such as country, region, city, latitude, and longitude by doing a lookup against an efficient in-memory MaxMind database (the databases therefore need to be downloaded from MaxMind first)
  • Generate a unique ID for every log with the generateUUID command
  • Convert the date/timestamp into a field that Solr will understand, with the convertTimestamp command
  • Drop all of the extra fields that we did not specify in schema.xml, with the sanitizeUnknownSolrFields command, and
  • Load the record into Solr, which writes the index to HDFS, with the loadSolr command
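To show what some of the steps in the list above do to a single record, here is a Python sketch of a subset of them:

  • splitting the request
  • deriving app and subapp from the URL
  • generating a UUID
  • converting the timestamp
  • dropping fields that are not in the schema

This is not Morphlines code, and it skips the userAgent and geoIP lookups. The rule that app and subapp are the first two URL path segments is an assumption based on the sample URLs (such as /pig/watch/). The output timestamp uses the UTC ISO 8601 form with a trailing Z that Solr date fields accept.

```python
import uuid
from datetime import datetime, timezone

# Subset of the schema.xml fields; a real setup would use the full schema.
SCHEMA_FIELDS = {"id", "time", "client_ip", "code", "bytes", "request", "method",
                 "url", "protocol", "app", "subapp", "referer", "user_agent"}


def transform(record):
    """Apply morphline-like steps to one parsed log record (illustrative only)."""
    doc = dict(record)
    # Like split: "GET /pig/watch/x HTTP/1.1" -> method, url, protocol.
    doc["method"], doc["url"], doc["protocol"] = doc["request"].split(" ", 2)
    # Like split: app and subapp from the first two path segments (assumed rule).
    segments = [s for s in doc["url"].split("?", 1)[0].split("/") if s]
    doc["app"] = segments[0] if segments else ""
    doc["subapp"] = segments[1] if len(segments) > 1 else ""
    # Like generateUUID: a random unique document id.
    doc["id"] = str(uuid.uuid4())
    # Like convertTimestamp: Apache time format -> UTC ISO 8601 with "Z".
    parsed = datetime.strptime(doc["time"], "%d/%b/%Y:%H:%M:%S %z")
    doc["time"] = parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Like sanitizeUnknownSolrFields: drop fields not declared in the schema.
    return {key: value for key, value in doc.items() if key in SCHEMA_FIELDS}


raw = {
    "client_ip": "203.0.113.7",  # placeholder; the sample IPs were removed
    "time": "15/Dec/2014:06:39:54 +0000",
    "request": "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python HTTP/1.1",
    "code": "200",
    "bytes": 719,
    "extra": "37789",  # not in the schema, so it is dropped
}
doc = transform(raw)
print(doc["app"], doc["subapp"], doc["time"])  # pig watch 2014-12-15T06:39:54Z
```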

When building this example, we initially used three morphlines commands to break up the Apache log event: readCSV, split, and split. Our intention was to make this post more generic and demonstrate how easily it can be adapted to different types of logs. However, the creators of the morphlines library have generously provided a number of predefined patterns for commonly used log formats, including Apache web server ones. What follows is an alternative way of reading the Apache log events and breaking them up into fields via morphlines:

{
  readLine {
    ignoreFirstLine : true
    commentPrefix : "#"
    charset : UTF-8
  }
}
{
  grok {
    dictionaryFiles : [target/test-classes/grok-dictionaries]
    expressions : {
      message : """<%{COMBINEDAPACHELOG:apache_log}>"""
    }
    extract : inplace
    findSubstrings : false
    addEmptyStrings : false
  }
}

Picture Perfect with Hue Dashboards

Now that your logs are indexed in near real time, you need a dashboard to search and drill into the events. The best tool for this job is Hue, the open source GUI for Hadoop, which comes preloaded with a Search application.


With just a few simple clicks, we can generate a nice dashboard and present it to the end user.

Start by clicking the Search->Indexes menu item, then click on Dashboards and Create. You will then see a new dashboard template window such as the one below; to get going, click the little pencil (Edit) button. 

Next, choose how to present the search results on the dashboard. For our demo, we chose Grid Layout, but if you are handy with HTML, you can choose an HTML layout and present the results in a sexier manner.

The next step is where all the fun begins: you can drag and drop different widgets on the screen and assign them to fields in the index. At the moment, Hue Search Dashboards support the following widgets:

  • Filter Bar
  • Marker Map
  • Text Facet
  • Pie Chart
  • Bar Chart
  • Line Chart
  • Tree
  • Heatmap
  • Timeline
  • Gradient Map

For a full reference on how to build your own dashboards, follow the links below:


For our Apache logs demo, we used pie charts to give users the ability to drill into Application, Region, and Operating System facets. Text facets allow users to drill into country and city. A timeline view provides a nice graphical view of when users accessed our website. Finally, a marker map visually displays geo locations from which users accessed our example website.


Although the main example in this post describes a use case involving Apache web server logs, you could just as easily use the same components for any type of log or event processing. For an information security use case, processing proxy and firewall logs in real time can go a long way toward stopping external attacks and preventing insider threats. For an insurance company, processing claims and making them searchable to adjusters and fraud analysts can decrease time to resolution.

Whatever the use case, the ongoing investment in pervasive analytics is key.

Gwen Shapira is a Software Engineer at Cloudera, working on the Data Ingest team.

Jeff Shmain is a Solutions Architect at Cloudera.

Categories: Hadoop

Download the Hive-on-Spark Beta

Cloudera Blog - Wed, 02/25/2015 - 14:51

A Hive-on-Spark beta is now available via CDH parcel. Give it a try!

The Hive-on-Spark project (HIVE-7292) is one of the most watched projects in Apache Hive history. It has attracted developers from across the ecosystem, including from organizations such as Intel, MapR, IBM, and Cloudera, and gained critical help from the Spark community.

Many anxious users have inquired about its availability in the last few months. Some users even built Hive-on-Spark from the branch code, tried it in their testing environments, and then provided us with valuable feedback. The team is thrilled to see this level of excitement and early adoption, and has been working around the clock to deliver the product at an accelerated pace.

Thanks to this hard work, significant progress has been made in the last six months. (The project is currently incubating in Cloudera Labs.) All major functionality is now in place, including different flavors of joins and integration with Spark, HiveServer2, and YARN, and the team has made initial but important investments in performance optimization, including split generation and grouping, supporting vectorization and cost-based optimization, and more. We are currently focused on running benchmarks, identifying and prototyping optimization areas such as dynamic partition pruning and table caching, and creating a roadmap for further performance enhancements for the near future.

Two months ago, we announced the availability of an Amazon Machine Image (AMI) for a hands-on experience. Today, we are even prouder to present a Hive-on-Spark beta via CDH parcel. You can download that parcel here. (Please note that in this beta release only HDFS, YARN, Apache ZooKeeper, and Hive are supported. Other components, such as Apache Pig and Impala, might not work as expected.) The “Getting Started” guide will help you get your Hive queries up and running on the Spark engine without much trouble.

We welcome your feedback. For assistance, please use user@hive.apache.org or the Cloudera Labs discussion board.

We will update you again when GA is available. Stay tuned!

Xuefu Zhang is a software engineer at Cloudera and a Hive PMC member.

Categories: Hadoop

Apache HBase 1.0 is Released

Cloudera Blog - Wed, 02/25/2015 - 00:20

The Cloudera HBase Team are proud to be members of Apache HBase’s model community and are currently AWOL, busy celebrating the release of the milestone Apache HBase 1.0. The following, from release manager Enis Soztutar, was published today in the ASF’s blog.


The Apache HBase community has released Apache HBase 1.0.0. Seven years in the making, it marks a major milestone in the Apache HBase project’s development, offers some exciting features and new APIs without sacrificing stability, and is both on-wire and on-disk compatible with HBase 0.98.x.


In this post, we look at the past, present, and future of the Apache HBase project.

Versions, versions, versions 

Before enumerating the feature details of this release, let’s take a journey into the past and see how the release numbers emerged. HBase started its life as a contrib project in a subdirectory of Apache Hadoop, circa 2007, and was released with Hadoop. Three years later, HBase became a standalone top-level Apache project. Because HBase depends on HDFS, the community ensured that HBase major versions were identical to and compatible with Hadoop’s major version numbers. For example, HBase 0.19.x worked with Hadoop 0.19.x, and so on.

However, the HBase community wanted to ensure that an HBase version could work with multiple Hadoop versions, not only with its matching major release number. Thus, a new naming scheme was invented in which releases would start at the close-to-1.0 major version of 0.90, as shown above in the timeline. We also adopted an even-odd release number convention, where releases with odd version numbers were “developer previews” and even-numbered releases were “stable” and ready for production. The stable release series included 0.90, 0.92, 0.94, 0.96, and 0.98. (See HBase Versioning for an overview.)
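The even-odd convention for the 0.9x line is simple enough to express as a rule. This Python sketch classifies a pre-1.0 HBase version string by its minor number. It deliberately covers only the 0.90 through 0.99 era described here, since 1.0 moves to semantic versioning. The function name is hypothetical.

```python
def release_kind(version):
    """Classify a 0.9x HBase version under the even-odd convention (hypothetical helper)."""
    major, minor = (int(part) for part in version.split(".")[:2])
    if major != 0 or not 90 <= minor <= 99:
        raise ValueError("the even-odd convention covers 0.90-0.99 only")
    # Even minor numbers were stable; odd ones were developer previews.
    return "stable" if minor % 2 == 0 else "developer preview"


for v in ("0.94.0", "0.96.0", "0.98.0", "0.99.0"):
    print(v, release_kind(v))
```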

After 0.98, we named the trunk version 0.99-SNAPSHOT, but we officially ran out of numbers! Levity aside, last year the HBase community agreed that the project had matured and stabilized enough that a 1.0.0 release was due. After three releases in the 0.99.x series of “developer previews” and six Apache HBase 1.0.0 release candidates, HBase 1.0.0 has now shipped! See the above diagram, courtesy of Lars George, for a timeline of releases. It shows each release line together with its support lifecycle, and any previous developer preview releases (0.99->1.0.0, for example).

HBase-1.0.0, start of a new era

The 1.0.0 release has three goals:

1) to lay a stable foundation for future 1.x releases;

2) to stabilize running HBase clusters and their clients; and

3) to make versioning and compatibility dimensions explicit.

Including the previous 0.99.x releases, 1.0.0 resolves over 1,500 JIRA issues. Some of the major changes are:

API reorganization and changes

HBase’s client-level API has evolved over the years. To simplify its semantics and make it more extensible and easier to use in the future, we revisited the API before 1.0. To that end, 1.0.0 introduces new APIs and deprecates some of the commonly used client-side APIs (HTableInterface, HTable, and HBaseAdmin).

We advise you to update your application to use the new style of APIs, since deprecated APIs will be removed in the future 2.x series of releases. For further guidance, please visit these two decks: http://www.slideshare.net/xefyr/apache-hbase-10-release and http://s.apache.org/hbase-1.0-api.

All client-side APIs are marked with the InterfaceAudience.Public annotation, which indicates whether a class or method is an official “client API” for HBase (see “11.1.1. HBase API Surface” in the HBase Reference Guide for more details on the audience annotations). Going forward, all 1.x releases are planned to be API-compatible for classes annotated as client public.

Read availability using timeline consistent region replicas

As part of phase 1, this release contains an experimental “Read availability using timeline consistent region replicas” feature: a region can be hosted on multiple region servers, with the additional replicas in read-only mode. One of the replicas for the region is the primary and accepts writes; the other replicas share the same data files. Read requests can be served by any replica of the region, with backup RPCs for high availability and timeline-consistency guarantees. See JIRA HBASE-10070 for more details.
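
The read path can be sketched as a small, single-threaded Python simulation. It illustrates the idea under simplifying assumptions and is not the HBase client: replica latencies, values, and the 10 ms primary timeout are made-up parameters, and the real client issues its RPCs concurrently. (In the HBase 1.0 Java client, a read opts in with Get.setConsistency(Consistency.TIMELINE), and Result.isStale() reports whether a secondary replica served it.)

```python
from dataclasses import dataclass


@dataclass
class Reply:
    replica_id: int    # 0 is the primary replica
    value: str
    arrival_ms: float  # simulated time at which the reply reaches the client
    stale: bool        # True when the reply came from a secondary replica


def timeline_get(latencies_ms, values, primary_timeout_ms=10.0):
    """Simulate one TIMELINE-consistency read with backup RPCs.

    Replica i answers latencies_ms[i] ms after it is asked and returns
    values[i]; replica 0 is the primary. The primary is asked at t=0, and
    the secondaries are asked only once primary_timeout_ms has passed
    without an answer. The earliest reply wins.
    """
    primary = Reply(0, values[0], latencies_ms[0], stale=False)
    if primary.arrival_ms <= primary_timeout_ms:
        return primary  # fast path: no backup RPCs are sent
    replies = [primary]
    for i in range(1, len(latencies_ms)):
        replies.append(Reply(i, values[i],
                             primary_timeout_ms + latencies_ms[i], stale=True))
    return min(replies, key=lambda reply: reply.arrival_ms)


# Healthy primary: the answer is current.
print(timeline_get([2.0, 1.0, 1.0], ["v2", "v1", "v2"]))
# Slow primary: a secondary answers first, possibly with an older value.
print(timeline_get([500.0, 5.0, 8.0], ["v2", "v1", "v2"]))
```

The stale flag is the key design point: timeline consistency trades guaranteed freshness for read availability, so callers that cannot tolerate older data should check it.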

Online config change and other forward ports from 0.89-fb branch

The 0.89-fb branch in Apache HBase was where Facebook used to post its changes. HBASE-12147 forward-ported the patches that enable reloading a subset of the server configuration without restarting the region servers.

Apart from the above, there are hundreds of performance improvements (an improved WAL pipeline using the LMAX Disruptor, multi-WAL, more off-heap usage, etc.), bug fixes, and other goodies, too many to list here. Check out the official release notes for a detailed overview. The release notes and the book also cover binary, source, and wire compatibility requirements; supported Hadoop and Java versions; upgrading from 0.94, 0.96, and 0.98; and other important details.

HBase-1.0.0 is also the start of using “semantic versioning” for HBase releases. In short, future HBase releases will have MAJOR.MINOR.PATCH version numbers with explicit compatibility semantics. The HBase book lists all the compatibility dimensions and what can be expected between different versions.
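
As a rough illustration of what a MAJOR.MINOR.PATCH scheme gives an operator, the Python sketch below classifies a version change by the first component that differs. It is a generic reading of semantic versioning, not HBase’s compatibility matrix: the HBase book spells out separately what each kind of release guarantees for client API, wire, and binary compatibility, so treat these labels only as a first filter.

```python
def parse_version(text):
    """Split 'MAJOR.MINOR.PATCH' into integers, e.g. '1.0.0' -> (1, 0, 0)."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def upgrade_kind(current, candidate):
    """Classify a version change by the first component that differs."""
    old, new = parse_version(current), parse_version(candidate)
    if new < old:
        return "downgrade"
    if new[0] != old[0]:
        return "major"  # compatibility may break
    if new[1] != old[1]:
        return "minor"
    if new[2] != old[2]:
        return "patch"
    return "same"


for target in ["1.0.1", "1.1.0", "2.0.0"]:
    print("1.0.0 ->", target, ":", upgrade_kind("1.0.0", target))
```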

What’s Next

We have marked HBase-1.0.0 as the next stable version of HBase, meaning that all new users should start with this version. However, because HBase is a database, we understand that switching to a newer version may take some time. We will continue to maintain and make 0.98.x releases until the user community is ready for its end of life. The 1.0.x releases, as well as the 1.1.0, 1.2.0, etc. release lines, are expected to come from their corresponding branches, while 2.0.0 and other major releases will follow when their time arrives.

Read replicas phase 2, per-column-family flush, procedure v2, and SSD storage for WAL or column family data are some of the upcoming features in the pipeline.


Finally, the HBase 1.0.0 release has come a long way, with contributions from a very large group of awesome people and hard work from committers and contributors. We would like to extend our thanks to our users and all who have contributed to HBase over the years.

Keep HBase’ing!

Categories: Hadoop

What’s New in Cloudera Director 1.1?

Cloudera Blog - Thu, 02/19/2015 - 14:17

Cloudera Director 1.1 introduces new features and improvements that provide more options for creating and managing cloud deployments of Apache Hadoop. Here are details about how they work.

Cloudera Director, which was released in October of 2014, delivers production-ready, self-service interaction with Apache Hadoop clusters in cloud environments. You can find background information about Cloudera Director’s purpose and fundamental features in our earlier introductory blog post and technical overview blog post.

The 1.1 release of Cloudera Director builds on the premiere release with new features and improvements to existing functionality. Let’s take a look at some of them.

On-Demand Shrinking of Clusters

As you use Hadoop clusters, you may find that you need to expand them to handle heavier workloads, and Cloudera Director 1.0 already lets you “grow” your clusters by allocating new instances and adding them to cluster roles.

Cloudera Director 1.1 now also lets you “shrink” your clusters by removing both compute and storage instances you no longer need and adjusting your cluster configuration accordingly. This completes Cloudera Director’s mechanism for managing the size of your clusters, letting you more easily manage resource usage in your cloud environment.

The Cloudera Director UI uses the grow and shrink capabilities to let you “repair” nodes in your clusters. When you tag a node for repair, Cloudera Director replaces it with a fresh one, handling all the cloud provider and configuration details for you.

On-Demand Database Creation

Cloudera Director 1.0 lets you configure Cloudera Manager and cluster services like Hive to use external databases hosted on separate database servers, instead of relying on the PostgreSQL database embedded within Cloudera Manager. Hosting databases on your own servers lets you implement the backup and availability strategies that may be required by your enterprise.

Cloudera Director 1.1 goes further with external database support by creating the necessary databases for you when you describe the databases using external database templates, either through the Cloudera Director server API or the client configuration file. Not only can Cloudera Director create the databases while bootstrapping your deployment and cluster, but it can also destroy them when you terminate your deployment and cluster.

In order for Cloudera Director to know where to create new databases, you also define the database servers that you would like to use. Cloudera Director uses the capabilities of Cloudera Manager to communicate with your database servers, meaning that Cloudera Director does not even need to have direct access to them for databases to be created.

Amazon RDS Integration

Cloudera Director 1.1 goes even further with external database support by adding support for Amazon Relational Database Service (RDS). Instead of defining database servers that already exist, you can define them in terms of RDS instance information, and Cloudera Director will allocate and bootstrap them for you while it works to bootstrap your new deployments.

Cloudera Director Database on MySQL

The server component of Cloudera Director 1.0 uses an embedded database by default for its own data. Cloudera Director 1.1 adds support for hosting this database on an external MySQL database server instead.

Cloudera Director API Client Libraries

The Cloudera Director server’s RESTful API exposes all of Cloudera Director’s features. With the Cloudera Director 1.1 release, Cloudera is offering client-side libraries implemented in Java and Python to make it easy for developers to integrate with Cloudera Director in just a few lines of code. The libraries are generated automatically from the API itself, so you know they cover every available service.

The client libraries are available from Cloudera’s GitHub account and are Apache-licensed so you can use them anywhere. Sample code using the clients is also available to help developers get started.

Faster Bootstrap of Cloudera Manager and Clusters

Cloudera Director 1.1 includes performance improvements over 1.0.

  • Cloudera Manager uses parcels for packaging and distributing CDH. Cloudera Director is now smarter about detecting when parcels are already available to Cloudera Manager locally, and avoids downloading them again.
  • If you install Cloudera Manager on your machine images (e.g., AMIs), Cloudera Director can detect that Cloudera Manager is already present and skip downloading and installing it.

You can combine these improvements by creating machine images with Cloudera Manager and CDH parcels already installed. Cloudera Director can take advantage of this work and shorten bootstrap time by avoiding expensive downloads.

Expanded Cloudera Director UI and Documentation

The Cloudera Director UI has been enhanced with lots of new features. Here’s just a sampling.

  • When you first visit the UI after server installation, a welcome page gives you the information you need to get started and then guides you through the bootstrap wizard.
  • Each environment’s tab lists its details, such as its cloud provider, SSH username, and links to all of its defined instance templates.
  • If an instance template is not being used by any deployments or clusters, you can edit it.
  • The UI now provides a health display for each of your clusters, and for each service in a cluster.
  • When defining a new cluster, you can customize the names and size of each group of instances in the cluster, and add and remove groups as well.
  • You can grow, shrink, and repair clusters through the UI.
  • All users can change their own passwords through the UI, while administrative users have full user management capability.
  • Many usability improvements make the UI easier to use.

Cloudera Director’s documentation has gotten a lot of attention. It’s now available in either PDF or HTML format, and you can reach it from within the UI or externally. Introductory sections, release notes, installation and configuration instructions, examples, and feature descriptions have all been expanded and improved. Of course, details on all the new features described in this post are included, too.


Cloudera Director 1.1 builds on the solid technical foundation of Cloudera Director 1.0 to provide more ways to build and manage Hadoop clusters in the cloud. We’re not done yet, though. Here are some improvements that we have planned.

  • Support for enabling high availability (HA) in clusters
  • Automation of adding Kerberos authentication through Cloudera Manager
  • A service provider interface for implementing support for new cloud providers
  • Exposing more Cloudera Director functionality through the UI

Thanks to everyone who has tried out Cloudera Director and for the feedback we’ve received so far. To try out Cloudera Director yourself, download it for free or try the Cloudera AWS Quick Start to get started right away.

Bill Havanki is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Deploy and Configure Apache Kafka in Cloudera Enterprise

Cloudera Blog - Wed, 02/18/2015 - 12:55

With Kafka now formally integrated with, and supported as part of, Cloudera Enterprise, what’s the best way to deploy and configure it?

Earlier today, Cloudera announced that, following an incubation period in Cloudera Labs, Apache Kafka is now fully integrated into Cloudera’s Big Data platform, Cloudera Enterprise (CDH + Cloudera Manager). Our customers have expressed strong interest in Kafka, and some are already running Kafka in production.

Kafka is a popular open source project, so there is already good awareness of its strengths and overall architecture. However, a common set of questions and concerns comes up when folks try to deploy Kafka in production for the first time. In this post, we will attempt to answer a few of those questions.


We assume you are familiar with the overall architecture of Kafka, and with the Kafka concepts of brokers, producers, consumers, topics, and partitions. If not, you can check out a previous blog post, “Kafka for Beginners,” and the Kafka documentation.

Hardware for Kafka

For optimal performance, Cloudera strongly recommends that production Kafka brokers be deployed on dedicated machines, separate from the machines on which the rest of your Apache Hadoop cluster runs. Kafka relies on dedicated disk access and a large pagecache for peak performance, and sharing nodes with Hadoop processes may interfere with its ability to fully leverage the pagecache.

Kafka is meant to run on industry-standard hardware. The specs for a Kafka broker machine will be similar to those of your Hadoop worker nodes. Although no minimum specification is required, Cloudera suggests machines that have at least:

  • Processor with four 2GHz cores
  • Six 7200 RPM SATA drives (JBOD or RAID10)
  • 32GB of RAM
  • 1Gb Ethernet

Cluster Sizing

The most accurate way to model your use case is to simulate the load you expect on your own hardware, and you can do that using the load-generation tools that ship with Kafka.

If you need to size your cluster without simulation, here are two simple rules of thumb:

  1. Size the cluster based on the amount of disk space required. This requirement can be computed as the estimated rate at which you receive data, multiplied by the required data retention period and by the replication factor (each replica is stored on a separate broker).
  2. Size the cluster based on your memory requirements. Assuming readers and writers are fairly evenly distributed across the brokers in your cluster, you can roughly estimate memory needs by assuming you want to be able to buffer at least 30 seconds of writes, and compute your memory need as write_throughput*30.
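
The two rules of thumb reduce to simple arithmetic, sketched below in Python. The workload (10 MB/s of incoming data, seven days of retention, replication factor 3) and the 4 TB of usable disk per broker are hypothetical numbers; the disk estimate multiplies by the replication factor because retention applies to every replica; and the memory figure is only the write_throughput*30 estimate, not a complete memory plan for a broker.

```python
import math

MB = 10**6
TB = 10**12


def disk_bytes_needed(ingest_bytes_per_sec, retention_sec, replication_factor):
    """Rule 1: ingest rate x retention period, stored once per replica."""
    return ingest_bytes_per_sec * retention_sec * replication_factor


def memory_bytes_needed(write_bytes_per_sec, buffer_sec=30):
    """Rule 2: enough memory to buffer buffer_sec seconds of writes."""
    return write_bytes_per_sec * buffer_sec


def brokers_for_disk(total_bytes, usable_bytes_per_broker):
    """Smallest broker count whose combined usable disk holds total_bytes."""
    return math.ceil(total_bytes / usable_bytes_per_broker)


# Hypothetical workload: 10 MB/s in, 7-day retention, replication factor 3.
week_sec = 7 * 24 * 3600
disk = disk_bytes_needed(10 * MB, week_sec, 3)
print("disk needed (TB):", disk / TB)
print("brokers with 4 TB usable each:", brokers_for_disk(disk, 4 * TB))
print("memory to buffer 30 s of writes (MB):", memory_bytes_needed(10 * MB) / MB)
```

When both rules apply, size for whichever demands more hardware, and prefer a simulation with Kafka's load-generation tools whenever you can run one.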

Replication, Partitions, and Leaders

Data written to Kafka is replicated for fault tolerance and durability, and Kafka allows users to set a separate replication factor for each topic. The replication factor controls how many brokers will replicate each message that is written. If you have a replication factor of three, then up to two brokers can fail before you will lose access to your data. Cloudera recommends using a replication factor of at least two, so that you can transparently bounce machines without interrupting data consumption. However, if you have stronger durability requirements, use a replication factor of three or above.

Topics in Kafka are partitioned, and each topic has a configurable partition count. The partition count controls how many logs the topic is sharded into. Producers spread a topic’s data across its partitions, for example in round-robin fashion or by hashing a message key. Each partition is replicated, and one replica of each partition is selected as the leader; all reads and writes go to this leader.
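
The two common ways of spreading messages over partitions can be sketched in a few lines of Python. This is an illustration, not Kafka’s partitioner code: the function names are hypothetical, and zlib.crc32 stands in for whatever hash function your Kafka client actually uses.

```python
import itertools
import zlib


def round_robin_partitions(num_partitions):
    """Endless partition ids 0, 1, ..., n-1, 0, 1, ... for unkeyed messages."""
    return itertools.cycle(range(num_partitions))


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Stable key -> partition mapping, so equal keys share one partition.

    zlib.crc32 is a stand-in; Kafka's partitioners use their own hash.
    """
    return zlib.crc32(key) % num_partitions


rr = round_robin_partitions(3)
print([next(rr) for _ in range(7)])       # spreads messages evenly
print(partition_for_key(b"user-42", 8))   # same partition on every call
```

Round-robin balances load best; key hashing gives up some balance in exchange for keeping all messages with the same key in one partition, and therefore in order.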

Here are some factors to consider while picking an appropriate partition count for a topic:

Within a consumer group, a partition can be read by only a single consumer. (However, a consumer can read many partitions.) Thus, if your partition count is lower than your consumer count, some consumers will not receive data for that topic. Hence, Cloudera recommends a partition count that is higher than the maximum number of simultaneous consumers of the data, so that each consumer receives data.
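
A toy Python model shows why consumers beyond the partition count sit idle. The round-robin dealing below is an assumption made for simplicity; Kafka’s own assignment strategies differ in detail, but they share the one-owner-per-partition rule within a group.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to the consumers of one group, round-robin.

    Each partition is owned by exactly one consumer, which is why surplus
    consumers end up with an empty list.
    """
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


print(assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"]))  # c4 gets nothing
print(assign_partitions(list(range(8)), ["c1", "c2", "c3", "c4"]))
```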

Similarly, Cloudera recommends a partition count that is higher than the number of brokers, so that the leader partitions are evenly distributed across brokers, thus distributing the read/write load. (Kafka distributes partitions randomly and evenly across brokers.) As a reference, many Cloudera customers have topics with tens or even hundreds of partitions each. However, note that Kafka needs to allocate memory for a message buffer per partition. If there are a large number of partitions, make sure Kafka starts with sufficient heap space (number of partitions multiplied by replica.fetch.max.bytes).
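
The heap rule of thumb is a single multiplication; the Python lines below only make the units explicit. The partition count of 500 and the 1 MiB value of replica.fetch.max.bytes are assumptions for illustration, so substitute the value from your own broker configuration.

```python
MIB = 2**20


def fetch_buffer_heap_bytes(num_partitions, replica_fetch_max_bytes):
    """Heap headroom for fetch buffers: partitions x replica.fetch.max.bytes."""
    return num_partitions * replica_fetch_max_bytes


# Hypothetical broker: 500 partitions, replica.fetch.max.bytes set to 1 MiB.
needed = fetch_buffer_heap_bytes(500, 1 * MIB)
print(f"reserve at least {needed // MIB} MiB of heap for fetch buffers")
```

Treat the result as a lower bound for this one use of memory rather than as the broker's total heap size.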

Message Size

Although Kafka’s maximum message size is configurable (through the broker’s message.max.bytes setting), Cloudera recommends writing messages that are no more than 1MB in size. Most customers see optimal throughput with messages ranging from 1 to 10 KB in size.

Settings for Guaranteed Message Delivery

Many use cases require reliable delivery of messages. Fortunately, it is easy to configure Kafka for zero data loss. But first, curious readers may like to understand the concept of an in-sync replica (commonly referred to as ISR): for a topic partition, an ISR is a follower replica that is caught up with the leader and is situated on a broker that is alive. Thus, if a leader replica is replaced by an ISR, there will be no loss of data. However, if a non-ISR replica becomes the leader, some data loss is possible, since it may not have the latest messages.

To ensure message delivery without data loss, the following settings are important:

  • While configuring a producer, set acks=-1 (request.required.acks=-1 in the older Scala producer). This setting ensures that a message is considered successfully delivered only after ALL the ISRs have acknowledged writing the message.
  • Set the topic-level configuration min.insync.replicas, which specifies the number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met and acks=-1, the producer will raise an exception.
  • Set the broker configuration parameter unclean.leader.election.enable to false. This setting essentially means you are prioritizing durability over availability, since Kafka will avoid electing a leader, and instead make the partition unavailable, if no ISR is available to become the next leader safely.

A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with request.required.acks set to -1. However, you can increase these numbers for stronger durability.
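
The interplay of these settings can be modeled in a short Python sketch. It is a simplified model, not broker code: the function and exception names are hypothetical (the real producer raises its own exception type), and the ISR is treated as a plain set of broker names. Note that in Kafka the ISR set also includes the leader itself, which is how isr_size is counted here.

```python
class NotEnoughReplicasError(Exception):
    """Stand-in for the error a producer sees when the ISR is too small."""


def accept_write(isr_size, acks, min_insync_replicas):
    """Decide whether a partition leader accepts a write.

    isr_size counts the leader plus the followers that are caught up.
    min.insync.replicas is enforced only when the producer uses acks=-1.
    """
    if acks == -1 and isr_size < min_insync_replicas:
        raise NotEnoughReplicasError(
            f"{isr_size} in-sync replica(s), need {min_insync_replicas}")
    return True


def elect_leader(live_replicas, isr, unclean_leader_election_enable):
    """Choose a new leader after the old one fails, or None if unavailable."""
    for replica in live_replicas:
        if replica in isr:
            return replica  # clean election: has every acknowledged message
    if unclean_leader_election_enable and live_replicas:
        return live_replicas[0]  # unclean election: messages may be lost
    return None  # durability over availability: partition stays offline


# The typical setup: replication factor 3, min.insync.replicas=2, acks=-1.
for isr_size in (3, 2, 1):
    try:
        accept_write(isr_size, acks=-1, min_insync_replicas=2)
        print(isr_size, "in sync: write accepted")
    except NotEnoughReplicasError as err:
        print(isr_size, "in sync: rejected,", err)
```

With these settings, the partition keeps accepting writes with one replica down and refuses them with two down, rather than acknowledging messages that only a single broker holds.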


As of today, Cloudera provides a Kafka Custom Service Descriptor (CSD) to enable easy deployment and administration of a Kafka cluster via Cloudera Manager. The CSD provides a granular, real-time view of the health of your Kafka brokers, along with reporting and diagnostic tools. This CSD is available via Cloudera’s downloads page.

We hope the Cloudera Manager CSD, along with the tips in this blog post, make it easy for you to get up and running with Kafka in production.

Anand Iyer is a product manager at Cloudera.

Categories: Hadoop

Understanding HDFS Recovery Processes (Part 1)

Cloudera Blog - Fri, 02/13/2015 - 17:41

Having a good grasp of HDFS recovery processes is important when running or moving toward production-ready Apache Hadoop.

An important design requirement of HDFS is to ensure continuous and correct operations to support production deployments. One particularly complex area is ensuring correctness of writes to HDFS in the presence of network and node failures, where the lease recovery, block recovery, and pipeline recovery processes come into play. Understanding when and why these recovery processes are called, along with what they do, can help users as well as developers understand the machinations of their HDFS cluster.

In this blog post, you will get an in-depth examination of these recovery processes. We’ll start with a quick introduction to the HDFS write pipeline and these recovery processes, explain the important concepts of block/replica states and generation stamps, then step through each recovery process. Finally, we’ll conclude by listing several relevant issues, both resolved and open.

This post is divided into two parts: Part 1 will examine the details of lease recovery and block recovery, and Part 2 will examine the details of pipeline recovery. Readers interested in learning more should refer to the design specification: Append, Hflush, and Read for implementation details.


In HDFS, files are divided into blocks, and file access follows multi-reader, single-writer semantics. To meet the fault-tolerance requirement, multiple replicas of a block are stored on different DataNodes. The number of replicas is called the replication factor. When a new file block is created, or an existing file is opened for append, the HDFS write operation creates a pipeline of DataNodes to receive and store the replicas. (The replication factor generally determines the number of DataNodes in the pipeline.) Subsequent writes to that block go through the pipeline (Figure 1).

Figure 1. HDFS Write Pipeline

For read operations, the client chooses one of the DataNodes holding copies of the block and requests a data transfer from it.

Below are two application scenarios highlighting the need for the fault-tolerance design requirement:

  • HBase’s Region Server (RS) writes to its WAL (Write Ahead Log), which is an HDFS file that helps to prevent data loss. If an RS goes down, a new one will be started and it will reconstruct the state of the predecessor RS by reading the WAL file. If the write pipeline was not finished when the RS died, then different DataNodes in the pipeline may not be in sync. HDFS must ensure that all of the necessary data is read from the WAL file to reconstruct the correct RS state.
  • When a Flume client is streaming data to an HDFS file, it must be able to write continuously, even if some DataNodes in the pipeline fail or stop responding.

Lease recovery, block recovery, and pipeline recovery come into play in this type of situation:

  • Before a client can write an HDFS file, it must obtain a lease, which is essentially a lock. This ensures the single-writer semantics. The lease must be renewed within a predefined period of time if the client wishes to keep writing. If a lease is not explicitly renewed or the client holding it dies, then it will expire. When this happens, HDFS will close the file and release the lease on behalf of the client so that other clients can write to the file. This process is called lease recovery.
  • If the last block of the file being written is not propagated to all DataNodes in the pipeline, then the amount of data written to different nodes may be different when lease recovery happens. Before lease recovery causes the file to be closed, it’s necessary to ensure that all replicas of the last block have the same length; this process is known as block recovery. Block recovery is only triggered during the lease recovery process, and lease recovery only triggers block recovery on the last block of a file if that block is not in COMPLETE state (defined in a later section).
  • During write pipeline operations, some DataNodes in the pipeline may fail. When this happens, the underlying write operations can’t just fail. Instead, HDFS will try to recover from the error to allow the pipeline to keep going and the client to continue to write to the file. The mechanism to recover from the pipeline error is called pipeline recovery.

The following sections will explain these processes in more detail.

Blocks, Replicas, and Their States

To differentiate between blocks in the context of the NameNode and blocks in the context of the DataNode, we will refer to the former as blocks, and the latter as replicas.

A replica in the DataNode context can be in one of the following states (see enum ReplicaState in org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • FINALIZED: when a replica is in this state, writing to the replica is finished and the data in the replica is "frozen" (the length is finalized), unless the replica is re-opened for append. All finalized replicas of a block with the same generation stamp (referred to as the GS and defined below) should have the same data. The GS of a finalized replica may be incremented as a result of recovery.
  • RBW (Replica Being Written): this is the state of any replica that is being written, whether the file was created for write or re-opened for append. An RBW replica is always the last block of an open file. The data is still being written to the replica and it is not yet finalized. The data (not necessarily all of it) of an RBW replica is visible to reader clients. If any failures occur, an attempt will be made to preserve the data in an RBW replica.
  • RWR (Replica Waiting to be Recovered): If a DataNode dies and restarts, all its RBW replicas will be changed to the RWR state. An RWR replica will either become outdated and therefore discarded, or will participate in lease recovery.
  • RUR (Replica Under Recovery): A non-TEMPORARY replica will be changed to the RUR state when it is participating in lease recovery.
  • TEMPORARY: a temporary replica is created for the purpose of block replication (either by replication monitor or cluster balancer). It’s similar to an RBW replica, except that its data is invisible to all reader clients. If the block replication fails, a TEMPORARY replica will be deleted.

A block in the NameNode context may be in one of the following states (see enum BlockUCState in org.apache.hadoop.hdfs.server.common.HdfsServerConstants.java):

  • UNDER_CONSTRUCTION: this is the state of a block while it is being written to. An UNDER_CONSTRUCTION block is the last block of an open file; its length and generation stamp are still mutable, and its data (not necessarily all of it) is visible to readers. An UNDER_CONSTRUCTION block in the NameNode keeps track of the write pipeline (the locations of valid RBW replicas), and the locations of its RWR replicas.
  • UNDER_RECOVERY: If the last block of a file is in UNDER_CONSTRUCTION state when the corresponding client’s lease expires, then it will be changed to UNDER_RECOVERY state when block recovery starts.
  • COMMITTED: COMMITTED means that a block’s data and generation stamp are no longer mutable (unless it is reopened for append), and there are fewer than the minimal-replication number of DataNodes that have reported FINALIZED replicas of the same GS/length. In order to service read requests, a COMMITTED block must keep track of the locations of RBW replicas, the GS and the length of its FINALIZED replicas. An UNDER_CONSTRUCTION block is changed to COMMITTED when the NameNode is asked by the client to add a new block to the file, or to close the file. If the last or the second-to-last block is in COMMITTED state, the file cannot be closed and the client has to retry.
  • COMPLETE: A COMMITTED block changes to COMPLETE when the NameNode has seen the minimum replication number of FINALIZED replicas of matching GS/length. A file can be closed only when all its blocks become COMPLETE. A block may be forced to the COMPLETE state even if it doesn’t have the minimal replication number of replicas, for example, when a client asks for a new block, and the previous block is not yet COMPLETE.

DataNodes persist a replica’s state to disk, but the NameNode doesn’t persist the block state to disk. When the NameNode restarts, it changes the state of the last block of any previously open file to the UNDER_CONSTRUCTION state, and the state of all the other blocks to COMPLETE.
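
The NameNode-side block states can be captured as a small transition table. The Python sketch below mirrors the state names of the BlockUCState enum but includes only the transitions described above; the event names are hypothetical labels, and anything the text does not cover (for instance, what happens after block recovery finishes) is left to the design document’s full diagram. It also encodes the close rule and the restart behavior just described.

```python
from enum import Enum


class BlockUCState(Enum):
    UNDER_CONSTRUCTION = "UNDER_CONSTRUCTION"
    UNDER_RECOVERY = "UNDER_RECOVERY"
    COMMITTED = "COMMITTED"
    COMPLETE = "COMPLETE"


# (current state, event) -> next state; event names are hypothetical labels.
TRANSITIONS = {
    (BlockUCState.UNDER_CONSTRUCTION, "block_recovery_started"):
        BlockUCState.UNDER_RECOVERY,
    (BlockUCState.UNDER_CONSTRUCTION, "add_block_or_close"):
        BlockUCState.COMMITTED,
    (BlockUCState.COMMITTED, "min_finalized_replicas_reported"):
        BlockUCState.COMPLETE,
    (BlockUCState.COMMITTED, "forced_complete"):
        BlockUCState.COMPLETE,
}


def next_state(state, event):
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state.name} on {event!r}") from None


def can_close(block_states):
    """A file can be closed only when every one of its blocks is COMPLETE."""
    return all(state is BlockUCState.COMPLETE for state in block_states)


def states_after_namenode_restart(block_count, file_was_open):
    """Block states are not persisted, so the NameNode rebuilds them."""
    states = [BlockUCState.COMPLETE] * block_count
    if file_was_open and block_count > 0:
        states[-1] = BlockUCState.UNDER_CONSTRUCTION
    return states


state = BlockUCState.UNDER_CONSTRUCTION
for event in ["add_block_or_close", "min_finalized_replicas_reported"]:
    state = next_state(state, event)
print(state.name)
```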

Simplified state transition diagrams of replica and block are shown in Figures 2 and 3. See the design document for more detailed ones.

Figure 2: Simplified Replica State Transition

Figure 3. Simplified Block State Transition

Generation Stamp

A GS is a monotonically increasing 8-byte number for each block that is maintained persistently by the NameNode. The GS for a block and replica (Design Specification: HDFS Append and Truncates) is introduced for the following purposes:

  • Detecting stale replica(s) of a block: that is, when the replica GS is older than the block GS, which might happen when, for example, an append operation is somehow skipped at the replica.
  • Detecting outdated replica(s) on DataNodes that have been dead for a long time and rejoin the cluster.

A new GS is needed when any of the following occur:

  • A new file is created
  • A client opens an existing file for append or truncate
  • A client encounters an error while writing data to the DataNode(s) and requests a new GS
  • The NameNode initiates lease recovery for a file
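
The staleness check can be sketched in Python. The class and helper names are hypothetical, Python integers stand in for the 8-byte stamp, and the scenario (a DataNode drops out of the pipeline, so the client requests a new GS for the remaining replicas) is a simplified instance of the third trigger listed above.

```python
class GenerationStampSource:
    """Hypothetical NameNode-side counter that only ever moves forward."""

    def __init__(self, last_issued=1000):
        self._last_issued = last_issued  # starting value is arbitrary

    def next_stamp(self):
        self._last_issued += 1
        return self._last_issued


def is_stale(replica_gs, block_gs):
    """A replica whose GS is older than the block's current GS is stale."""
    return replica_gs < block_gs


# A block is created; all three pipeline replicas record its GS.
gs = GenerationStampSource()
block_gs = gs.next_stamp()
replica_gs = {"dn1": block_gs, "dn2": block_gs, "dn3": block_gs}

# dn3 drops out mid-write; the client requests a new GS for the survivors.
block_gs = gs.next_stamp()
replica_gs["dn1"] = replica_gs["dn2"] = block_gs

print({dn: is_stale(stamp, block_gs) for dn, stamp in replica_gs.items()})
```

If dn3 later rejoins the cluster, its replica is recognizable as outdated from the GS alone, without comparing any data.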

Lease Recovery and Block Recovery

Lease Manager

The leases are managed by the lease manager at the NameNode. The NameNode tracks the files each client has open for write. It is not necessary for a client to enumerate each file it has opened for write when renewing leases. Instead, it periodically sends a single request to the NameNode to renew all of them at once. (The request is an org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto message, part of the RPC protocol between the HDFS client and the NameNode.)

Each NameNode manages a single HDFS namespace, each of which has a single lease manager to manage all the client leases associated with that namespace. Federated HDFS clusters may have multiple namespaces, each with its own lease manager.

The lease manager maintains a soft limit (1 minute) and hard limit (1 hour) for the expiration time (these limits are currently non-configurable), and all leases maintained by the lease manager abide by the same soft and hard limits. Before the soft limit expires, the client holding the lease of a file has exclusive write access to the file. If the soft limit expires and the client has not renewed the lease or closed the file (the lease of a file is released when the file is closed), another client can forcibly take over the lease. If the hard limit expires and the client has not renewed the lease, HDFS assumes that the client has quit and will automatically close the file on behalf of the client, thereby recovering the lease.

The fact that the lease of a file is held by one client does not prevent other clients from reading the file, and a file may have many concurrent readers, even while a client is writing to it.

Operations that the lease manager supports include:

  • Adding a lease for a client and path (if the client already has a lease, it adds the path to the lease, otherwise, it creates a new lease and adds the path to the lease)
  • Removing a lease for a client and path (if it’s the last path in the lease, it removes the lease)
  • Checking whether the soft and/or hard limits have expired, and
  • Renewing the lease for a given client.

The lease manager has a monitor thread that periodically (currently every 2 seconds) checks whether any lease has an expired hard limit, and if so, it will trigger the lease recovery process for the files in these leases.
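
The lease manager’s bookkeeping can be modeled with a toy Python class. It is a sketch under stated assumptions, not the NameNode’s LeaseManager: the class and method names are hypothetical, time is passed in as seconds rather than read from a clock, adding a path also counts as a renewal here, and the monitor thread’s two-second loop is reduced to a single monitor_tick call. The 1-minute and 1-hour limits are the values given above.

```python
from dataclasses import dataclass, field

SOFT_LIMIT_SEC = 60        # 1 minute
HARD_LIMIT_SEC = 60 * 60   # 1 hour


@dataclass
class Lease:
    last_renewed: float
    paths: set = field(default_factory=set)


class LeaseManager:
    """Toy lease manager for one namespace; times are passed in explicitly."""

    def __init__(self):
        self.leases = {}  # client name -> Lease

    def add_lease(self, client, path, now):
        lease = self.leases.setdefault(client, Lease(last_renewed=now))
        lease.paths.add(path)
        lease.last_renewed = now

    def remove_lease(self, client, path):
        lease = self.leases.get(client)
        if lease is None:
            return
        lease.paths.discard(path)
        if not lease.paths:  # last path gone: drop the whole lease
            del self.leases[client]

    def renew(self, client, now):
        """One renewal covers every file the client has open for write."""
        if client in self.leases:
            self.leases[client].last_renewed = now

    def soft_limit_expired(self, client, now):
        return now - self.leases[client].last_renewed > SOFT_LIMIT_SEC

    def hard_limit_expired(self, client, now):
        return now - self.leases[client].last_renewed > HARD_LIMIT_SEC

    def monitor_tick(self, now):
        """One pass of the monitor thread: clients whose hard limit expired."""
        return sorted(c for c in self.leases if self.hard_limit_expired(c, now))


lm = LeaseManager()
lm.add_lease("client-1", "/logs/a", now=0)
lm.add_lease("client-1", "/logs/b", now=0)
print(lm.soft_limit_expired("client-1", now=90))  # another client may take over
print(lm.monitor_tick(now=90), lm.monitor_tick(now=3601))
```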

An HDFS client renews its leases via the org.apache.hadoop.hdfs.LeaseRenewer class, which maintains a list of users and runs one thread per user per NameNode. This thread periodically checks in with the NameNode and renews all of the client’s leases when the lease period is half over.

(Note: An HDFS client is associated with only one NameNode; see the constructors of org.apache.hadoop.hdfs.DFSClient. If the same application wants to access different files managed by different NameNodes in a federated cluster, then one client needs to be created for each NameNode.)

Lease Recovery Process

The lease recovery process is triggered on the NameNode to recover leases for a given client, either by the monitor thread upon hard limit expiry, or when a client tries to take over the lease from another client after the soft limit expires. It checks each file open for write by the same client, performs block recovery for the file if the last block of the file is not in COMPLETE state, and closes the file. Block recovery of a file is only triggered when recovering the lease of a file.

Below is the lease recovery algorithm for a given file f. When a client dies, the same algorithm is applied to each file the client opened for write.

  1. Get the DataNodes which contain the last block of f.
  2. Assign one of the DataNodes as the primary DataNode p.
  3. p obtains a new generation stamp from the NameNode.
  4. p gets the block info from each DataNode.
  5. p computes the minimum block length.
  6. p updates the DataNodes that have a valid generation stamp with the new generation stamp and the minimum block length.
  7. p reports the update results to the NameNode.
  8. NameNode updates the BlockInfo.
  9. NameNode removes f’s lease (other writers can now obtain the lease for writing to f).
  10. NameNode commits the changes to the edit log.

Steps 3 through 7 are the block recovery parts of the algorithm. If a file needs block recovery, the NameNode picks a primary DataNode that has a replica of the last block of the file, and tells this DataNode to coordinate the block recovery work with other DataNodes. That DataNode reports back to the NameNode when it is done. The NameNode then updates its internal state of this block, removes the lease, and commits the change to the edit log.
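
The ten steps can be condensed into a Python sketch that makes the minimum-length rule concrete. It assumes the last block is not COMPLETE (otherwise no block recovery runs), picks the first valid replica as the primary, and treats a replica as valid when its GS is not older than the NameNode’s; the function names, the dictionaries standing in for NameNode state, and the edit-log tuple are all hypothetical. Real block recovery also takes replica states into account, so read this as an outline rather than the HDFS implementation.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    datanode: str
    gs: int      # generation stamp stored with this replica
    length: int  # bytes of the last block held by this DataNode


def recover_last_block(replicas, block_gs, new_gs):
    """Steps 2 and 4-7, as run by the primary DataNode p."""
    valid = [r for r in replicas if r.gs >= block_gs]  # drop stale replicas
    if not valid:
        raise RuntimeError("no replica with a valid generation stamp")
    primary = valid[0].datanode                         # step 2 (arbitrary pick)
    length = min(r.length for r in valid)               # step 5
    updated = [Replica(r.datanode, new_gs, length) for r in valid]  # step 6
    return primary, length, updated                     # step 7: report back


def recover_lease(path, leases, block_info, replicas, next_gs, edit_log):
    """NameNode side: replicas come from step 1; steps 3 and 8-10 are here."""
    new_gs = next_gs()                                  # step 3: fresh GS
    primary, length, updated = recover_last_block(
        replicas, block_info["gs"], new_gs)
    block_info["gs"], block_info["length"] = new_gs, length  # step 8
    leases.pop(path, None)                                    # step 9
    edit_log.append(("close", path, new_gs, length))          # step 10
    return primary, updated


replicas = [Replica("dn1", 5, 1000), Replica("dn2", 5, 800), Replica("dn3", 4, 1200)]
block = {"gs": 5, "length": 0}
leases = {"/tmp/f": "client-1"}
log = []
primary, updated = recover_lease("/tmp/f", leases, block, replicas, lambda: 6, log)
print(primary, block, leases, updated)
```

Truncating to the minimum length is the conservative choice: every byte that survives is present on all the recovered replicas, so they agree afterward. Note that the stale dn3 replica is excluded even though it holds the most bytes.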

Sometimes an administrator needs to forcibly recover the lease of a file before the hard limit expires. A CLI debug command is available for this purpose, starting with Hadoop 2.7 and CDH 5.3:

hdfs debug recoverLease [-path <path>] [-retries <num-retries>]


Lease recovery, block recovery, and pipeline recovery are essential to HDFS fault-tolerance. Together, they ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.

You should now have a better understanding of when and why lease recovery and block recovery are invoked, and what they do. In Part 2, we’ll explore pipeline recovery.

Yongjun Zhang is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Install and Use Cask Data Application Platform Alongside Impala

Cloudera Blog - Wed, 02/11/2015 - 14:10

Cloudera customers can now install, launch, and monitor CDAP directly from Cloudera Manager. This post from Nitin Motgi, Cask CTO, explains how.

Today, Cloudera and Cask are very happy to introduce the integration of Cloudera’s enterprise data hub (EDH) with the Cask Data Application Platform (CDAP). CDAP is an integrated platform for developers and organizations to build, deploy, and manage data applications on Apache Hadoop. This initial integration will enable CDAP to be installed, configured, and managed from within Cloudera Manager, a component of Cloudera Enterprise. Furthermore, it will simplify data ingestion for a variety of data sources, as well as enable interactive queries via Impala. Starting today, you can download and install CDAP directly from Cloudera’s downloads page.

In this post, you’ll learn how to get started by installing CDAP using Cloudera Manager. We have also created a video of this integration for further exploration.

Installing CDAP with Cloudera Manager

To install CDAP on a cluster managed by Cloudera Manager, we have provided a CSD (Custom Service Descriptor). This CSD tells Cloudera Manager where to download CDAP from and how to configure and run the CDAP services. To install the CDAP CSD, first drop the downloaded jar (from Cloudera’s downloads page) into the /opt/cloudera/csd directory on the Cloudera Manager server, then restart Cloudera Manager. See Cloudera’s documentation for full details on installing CSDs.

Once the CSD is installed, the first thing to do is download the CDAP parcel from the Hosts -> Parcels page. Note that by default, the CSD adds a Remote Parcel Repository URL for the latest version of CDAP at http://repository.cask.co/parcels/cdap/latest/. If desired, you can specify a particular version of CDAP instead, for example http://repository.cask.co/parcels/cdap/2.7/.

With a CDAP Parcel Repository URL configured, you will now see the CDAP Parcel available for download in the parcels page. From there, you can download, distribute, and activate the CDAP parcel on your cluster hosts.

Once the parcel is installed, the next step is to configure and start the CDAP services. Before doing so, note that there are a few additional outside requirements for running CDAP on your cluster. Please refer to the prerequisites section. Once all the prerequisites are satisfied, you can begin installing CDAP via the “Add Service” wizard.

The “Add Service” wizard will guide you through selecting the hosts you want to run CDAP on and customizing the configuration. A few things to note during the wizard:

  • CDAP consists of the Gateway/Router, Kafka, Master, and Web-App roles, and an optional Auth role. These can all be thought of as “master” services. We recommend installing all roles together on a host, with multiple hosts for redundancy. Additionally, there is a client “Gateway” role that can be installed on any host where you want to run CDAP client tools (such as cdap-cli).
  • There is an optional “Explore” capability for ad-hoc querying via Apache Hive. If you plan on using this, be sure to select the service dependency set containing Hive and check the “Explore Enabled” option on the configuration page.
  • If you are installing CDAP on a Kerberos-enabled cluster, you must select the “Kerberos Auth Enabled” checkbox on the configuration page.

Finally, sit back and watch as Cloudera Manager spins up your CDAP services! Once it completes, open the CDAP Console from the “Quick Links” on the CDAP Service overview page. For more details on the Cloudera Manager/CDAP integration, see the CDAP documentation.

Ingesting Data and Exploring It with Impala

Streams are the primary means of bringing data from external systems into CDAP in real time. They are ordered, time-partitioned sequences of data, usable for both real-time and batch collection and consumption. Using the CDAP Command Line Interface (CLI), you can easily create streams.

First, connect to your CDAP instance using the CLI:

> connect <hostname>:11015

Next, create a Stream:

> create stream trades

You can then add events to a Stream one by one:

> send stream trades 'NFLX,441.07,50'
> send stream trades 'AAPL,118.63,100'
> send stream trades 'GOOG,528.48,10'

Alternatively, you can add the entire contents of a file:

> load stream trades /my/path/trades.csv

Or you can use other tools or APIs to ingest data in real time or in batch. For more information on other ways of ingesting data into CDAP, refer to the CDAP documentation.

You can now examine the contents of your stream by executing a SQL query:

> execute 'select * from cdap_stream_trades limit 5'
+==================================================================================================================+
| cdap_stream_trades.ts: BIGINT | cdap_stream_trades.headers: map<string,string> | cdap_stream_trades.body: STRING |
+==================================================================================================================+
| 1422924257559                 | {}                                             | NFLX,441.07,50                  |
| 1422924261588                 | {}                                             | AAPL,118.63,100                 |
| 1422924265441                 | {}                                             | GOOG,528.48,10                  |
| 1422924291009                 | {"content.type":"text/csv"}                    | GOOG,538.53,18230               |
| 1422924291009                 | {"content.type":"text/csv"}                    | GOOG,538.17,100                 |
+==================================================================================================================+

You can also attach a schema to your stream to enable more powerful queries:

> set stream format trades csv 'ticker string, price double, trades int'
> execute 'select ticker, sum(price * trades) / 1000000 as millions from cdap_stream_trades group by ticker order by millions desc'
+=====================================+
| ticker: STRING | millions: DOUBLE   |
+=====================================+
| AAPL           | 3121.8966341143905 |
| NFLX           | 866.0789117408007  |
| GOOG           | 469.01340359839986 |
+=====================================+

On one of our test clusters, the query above took about two minutes to complete.

Data in CDAP is integrated with Apache Hive and the above query translates into a Hive query. As such, it will launch two MapReduce jobs in order to calculate the query results, which is why it takes minutes instead of seconds. To cut down query time, you can use Impala to query the data instead of Hive. Since Streams are written in a custom format, they cannot be directly queried through Impala. Instead, you can create an Adapter that regularly reads Stream events and writes those events into files on HDFS that can then be queried by Impala. You can also do this through the CLI:

> create stream-conversion adapter ticks_adapter on trades frequency 10m format csv schema "ticker string, price double, trades int"

This command creates an Adapter that runs every 10 minutes, reads the last 10 minutes of events from the Stream, and writes them to a file set that can be queried through Impala. Each time the Adapter runs, it spawns a MapReduce job that reads all events added in the past 10 minutes, writes each event to Avro-encoded files, and registers a new partition in the Hive Metastore.
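A rough way to picture each Adapter run is as a time-window filter over stream events. The sketch below is a simplified model, not CDAP's implementation. `events_in_window` is a hypothetical helper, and the half-open window [run time minus 10 minutes, run time) is an assumption about how the boundaries are handled. Three of the sample timestamps come from the stream query output earlier in this post.

```python
WINDOW_MS = 10 * 60 * 1000  # "frequency 10m" in milliseconds


def events_in_window(events, run_time_ms, window_ms=WINDOW_MS):
    """Return (timestamp, body) pairs in [run_time_ms - window_ms, run_time_ms).

    Simplified model of one Adapter run; the half-open window is an assumption.
    """
    start = run_time_ms - window_ms
    return [(ts, body) for ts, body in events if start <= ts < run_time_ms]


events = [
    (1422923000000, "OLD,1.00,1"),  # hypothetical event older than the window
    (1422924257559, "NFLX,441.07,50"),
    (1422924261588, "AAPL,118.63,100"),
    (1422924265441, "GOOG,528.48,10"),
]
batch = events_in_window(events, run_time_ms=1422924300000)
print(len(batch), "events would go into the next partition")
```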

You can then query the contents using Impala. On your cluster, use the Impala shell to connect to Impala:

$ impala-shell -i <impala-host>
> invalidate metadata;
> select ticker, sum(price * trades) / 1000000 as millions from cdap_user_trades_converted group by ticker order by millions desc;
+--------+-------------------+
| ticker | millions          |
+--------+-------------------+
| AAPL   | 3121.88477111439  |
| NFLX   | 866.0568582408006 |
| GOOG   | 469.0081187983999 |
+--------+-------------------+
Fetched 3 row(s) in 1.03s

Since you are using Impala, no MapReduce jobs are launched and the query comes back in a second!
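Whether the engine is Hive or Impala, both queries compute the same thing: for each ticker, the sum of price * trades divided by 1,000,000, sorted in descending order. The hypothetical `millions_by_ticker` function below reproduces that arithmetic in plain Python. It runs only on the three events sent earlier with send stream. The cluster results also include rows loaded from trades.csv, so the numbers here are much smaller.

```python
from collections import defaultdict


def millions_by_ticker(bodies):
    """Mirror of: select ticker, sum(price * trades) / 1000000 as millions
    ... group by ticker order by millions desc."""
    totals = defaultdict(float)
    for body in bodies:
        # Same layout as the attached schema: ticker string, price double, trades int
        ticker, price, trades = body.split(",")
        totals[ticker] += float(price) * int(trades)
    rows = [(ticker, total / 1_000_000) for ticker, total in totals.items()]
    return sorted(rows, key=lambda row: row[1], reverse=True)


for ticker, millions in millions_by_ticker(
    ["NFLX,441.07,50", "AAPL,118.63,100", "GOOG,528.48,10"]
):
    print(ticker, millions)
```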

Now that you have data in CDAP and can explore it, you can use CDAP to build real-time, batch, or combined real-time and batch data applications. For more information on building data applications with CDAP, visit http://docs.cask.co.


Categories: Hadoop

New in Cloudera Manager 5.3: Easier CDH Upgrades

Cloudera Blog - Tue, 02/10/2015 - 16:42

An improved upgrade wizard in Cloudera Manager 5.3 makes it easy to upgrade CDH on your clusters.

Upgrades can be hard, and any downtime to mission-critical workloads can have a direct impact on revenue. Upgrading the software that powers these workloads can often be an overwhelming and uncertain task that can create unpredictable issues. Apache Hadoop can be especially complex as it consists of dozens of components running across multiple machines. That’s why an enterprise-grade administration tool is necessary for running Hadoop in production, and is especially important when taking the upgrade plunge.

Cloudera Manager makes it easy to upgrade to the latest version of CDH. Not only does Cloudera Manager have a built-in upgrade wizard to make your CDH upgrades simple and predictable, it also features rolling-restart capability that enables zero-downtime upgrades under certain conditions.

This post illustrates how to leverage Cloudera Manager to upgrade your Cloudera cluster using the upgrade wizard, and also highlights some of the new features in Cloudera Enterprise 5.3.

Why a Wizard?

Upgrading can involve many steps that can depend on the services installed and the start/end versions. A wizard to upgrade across major versions (CDH 4 to CDH 5) has been available since Cloudera Manager 5. Cloudera Manager 5.3 introduces an enhanced CDH upgrade wizard that adds support for minor (CDH 5.x to CDH 5.y) and maintenance (CDH 5.b.x to CDH 5.b.y) version upgrades. The enhanced upgrade wizard performs service-specific upgrade steps that you would have had to run manually in the past.

Parcel and package installations are both supported by the CDH upgrade wizard. Using parcels is the preferred and recommended way, as packages must be manually installed, whereas parcels are installed by Cloudera Manager. Consider upgrading from packages to parcels so that the process is more automated, supports rolling upgrades, and provides an easier upgrade experience. (See this FAQ and this blog post to learn more about parcels.)

If you use parcels, have a Cloudera Enterprise license, and have enabled HDFS high availability, you can perform a rolling upgrade for non-major upgrades. This enables you to upgrade your cluster software and restart the upgraded services without incurring any cluster downtime. Note that it is not possible to perform a rolling upgrade from CDH 4 to CDH 5 because of incompatibilities between the two major versions.
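The rolling-upgrade conditions above amount to a simple predicate. The sketch below encodes them. `ClusterState` and `can_roll_upgrade` are illustrative names, not part of any Cloudera Manager API, and versions are modeled as (major, minor, maintenance) tuples.

```python
from dataclasses import dataclass


@dataclass
class ClusterState:
    uses_parcels: bool
    enterprise_license: bool
    hdfs_ha_enabled: bool


def can_roll_upgrade(state, current_cdh, target_cdh):
    """True if every stated condition for a rolling upgrade holds.

    Versions are (major, minor, maintenance) tuples, e.g. (5, 2, 1).
    """
    is_major_upgrade = target_cdh[0] != current_cdh[0]  # e.g. CDH 4 -> CDH 5
    return (
        state.uses_parcels
        and state.enterprise_license
        and state.hdfs_ha_enabled
        and not is_major_upgrade
    )


ready = ClusterState(uses_parcels=True, enterprise_license=True, hdfs_ha_enabled=True)
print(can_roll_upgrade(ready, (5, 2, 1), (5, 3, 0)))  # True: minor upgrade
print(can_roll_upgrade(ready, (4, 7, 1), (5, 3, 0)))  # False: major upgrade
```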

Running the Upgrade Wizard

The Cloudera Manager version must always be equal to or greater than the CDH version you upgrade to. For example, to upgrade to CDH 5.3, you must be on Cloudera Manager 5.3 or higher.
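The version rule above can be checked before you start. This is a hedged sketch. `cm_supports_cdh` is a hypothetical helper, and comparing only the major.minor part (so that Cloudera Manager 5.3.0 qualifies for CDH 5.3.1) is our reading of the rule. Confirm it against the upgrade documentation for your release.

```python
def parse_version(version):
    """'5.3.0' -> (5, 3, 0)"""
    return tuple(int(part) for part in version.split("."))


def cm_supports_cdh(cm_version, cdh_version):
    """Cloudera Manager must be at or above the target CDH version.

    Assumption: only major.minor is compared, so CM 5.3.0 can manage CDH 5.3.x.
    """
    return parse_version(cm_version)[:2] >= parse_version(cdh_version)[:2]


print(cm_supports_cdh("5.3.0", "5.3.1"))  # True: same minor release line
print(cm_supports_cdh("5.2.1", "5.3.0"))  # False: upgrade Cloudera Manager first
```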

  1. Log in to the Cloudera Manager Admin Console.
  2. To access the wizard, on the Home page, click the cluster’s drop-down menu and select “Upgrade Cluster.”

  3. Alternatively, you can launch the wizard from the Parcels page by first downloading and distributing a parcel to upgrade to, and then clicking the “Upgrade” button for that parcel.

  4. When starting from the cluster’s Upgrade option, if the option to pick between packages and parcels is provided, click the “Use Parcels” radio button. Select the CDH version.

    If there are no qualifying parcels, the location of the parcel repository will need to be added under “Parcel Configuration Settings.”

  5. The wizard now prompts you to back up existing databases. It also provides examples of additional steps to prepare your cluster for the upgrade. Read the upgrade documentation for a more complete list of actions to take at this stage before proceeding. Check “Yes” for all required actions to enable “Continue.”

  6. The wizard now performs consistency and health checks on all hosts in the cluster. This feature is particularly helpful if you have mismatched versions of packages across cluster hosts. If any problems are found, you will be prompted to fix these before continuing.

  7. The selected parcel is downloaded and distributed to all hosts.


  8. For major upgrades, the wizard will warn that the services are about to be shut down for the upgrade.

    For minor and maintenance upgrades, if you are using parcels and have HDFS high availability enabled, you will have the option to select “Rolling Upgrade” on this page. Supported services will undergo a rolling restart, while the rest will undergo a normal restart, with some downtime. Check “Rolling Upgrade” to proceed with this option.

    Until this point, you can exit and resume the wizard without impacting any running services.

  9. The Command Progress screen displays the results of the commands run by the wizard as it shuts down all services, activates the new parcel, upgrades services, deploys client configuration files, and restarts services.

    The service commands include upgrading HDFS metadata, upgrading the Apache Oozie database and installing ShareLib, upgrading the Apache Sqoop server and Hive Metastore, among other things.

  10. The Host Inspector runs to validate all hosts, as well as report CDH versions running on them.

  11. At the end of the wizard process, you are prompted to finalize the HDFS metadata upgrade. It is recommended at this stage to refer to the upgrade documentation for additional steps that might be relevant to your cluster configuration and upgrade path.

    For major (CDH 4 to CDH 5) upgrades, you have the option of importing your MapReduce configurations into your YARN service. Additional steps in the wizard will assist with this migration. On completion, Cloudera recommends that you review the YARN configurations for any additional tuning you might need.

  12. Your upgrade is now complete!

Next Steps

Cloudera Enterprise 5 provides additional enterprise-ready capabilities and marks the next step in the evolution of the Hadoop-based data management platform. Any enhancements are ineffective if the benefits of the enterprise data hub are not easily accessible to existing users. That’s why Cloudera has placed an increased emphasis on the upgrade experience, to make it easier to upgrade to the latest version of the software. The team will continue to work on making improvements to this experience.

To ensure the highest level of functionality and stability, consider upgrading to the most recent version of CDH.

Please refer to the upgrade documentation for more comprehensive details on using the CDH upgrade wizard. Also, register for the “Best Practices for Upgrading Hadoop in Production” webinar that will occur live on Feb. 12, 2015.

Jayita Bhojwani is a Software Engineer at Cloudera.

Vala Dormiani is a Product Manager at Cloudera.

Categories: Hadoop

Couchdoop: Couchbase Meets Apache Hadoop

Cloudera Blog - Mon, 02/09/2015 - 15:11

Thanks to Călin-Andrei Burloiu, Big Data Engineer at antivirus company Avira, and Radu Pastia, Senior Software Developer in the Big Data Team at Orange, for the guest post below about the Couchdoop connector for bringing Couchbase data into Hadoop.

Couchdoop is a Couchbase connector for Apache Hadoop, developed by Avira on CDH, that allows for easy, parallel data transfer between Couchbase and Hadoop storage engines. It includes a command-line tool, for simple tasks and prototyping, as well as a MapReduce library, for those who want to use Couchdoop directly in MapReduce jobs. Couchdoop works natively with CDH 5.x.
Couchdoop can help you:

  • Import documents from Couchbase to Hadoop storage (HDFS or Apache HBase)
  • Export documents from Hadoop storage to Couchbase
  • Batch-update existing Couchbase documents
  • Query Couchbase views to import only specific documents (daily imports for example)
  • Easily control performance by adjusting the degree of parallelism via MapReduce

In the remainder of this post, you’ll learn the main features of Couchdoop and explore a demo application.

Why Couchdoop?

In many Big Data applications, data is transferred from an “operational” tier containing a key-value store to an “analytical” tier containing Hadoop via Apache Flume or a queuing service such as Apache Kafka or RabbitMQ. However, this approach is not always possible or efficient. One example is when the events themselves are highly related (like a shopping session with several clicks and views) and could conveniently be grouped before being pushed to Hadoop. In cases where Couchbase serves as the operational tier, Couchdoop’s import feature comes in handy. Conversely, you can use Couchdoop’s export feature to move data computed with Hadoop into Couchbase for use in real-time applications.

The data collected by the operational tier can be imported into the analytical tier, where it is traditionally stored in HDFS. Using the tools provided by CDH, the data can then be processed and enriched for various use cases. One use case is ad hoc querying, which lets business people query the data in real time using Impala. Another is improving the user experience by using machine-learning algorithms to adapt the application to users’ needs. For this use case, both MapReduce and Apache Spark, which are included in CDH, can be used. (Spark comes with its own machine-learning library, MLlib.) Apache Mahout offers time-proven algorithms written in MapReduce as well as newer and faster implementations written in Spark. The output of the machine-learning algorithms can be exported to the operational tier using Couchdoop.

Importing in E-Commerce Websites

Real-time tier user events are typically streamed to Hadoop using a logging system, such as Flume, or a message queuing system, such as Kafka. So why would you import data with Couchdoop instead of streaming it? Because when your real-time application needs to keep a session with recent user activity, that session already lives in Couchbase and can be imported directly from there.

An e-commerce website is a typical example, because it needs to store the shopping cart and recently viewed products in order to recommend other products that the user might want to buy in that session. A batch process can be scheduled to use Couchdoop to incrementally import all user sessions from Couchbase to HDFS at the end of each day. The recommendation engine can then be retrained with the fresh data to adapt to users’ interests. Algorithms such as frequent-item-sets require all products from the same cart to be grouped together. If you used Flume to stream user events, you would need to create a job to group all events from the same session. With the Couchdoop import approach, all the data from a particular session is already in the same place, dramatically reducing the computational cost of data processing.
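The extra grouping work is easy to see in a small sketch. With per-event streaming, a job must first regroup events by session before frequent-item-set mining can count co-occurring products. With Couchdoop, each imported session document is already such a group. The event tuples and function names are hypothetical, and a production job would do this at scale in MapReduce or Spark rather than in memory.

```python
from collections import defaultdict
from itertools import combinations


def group_by_session(events):
    """Regroup streamed (session_id, product) events into one cart per session."""
    carts = defaultdict(set)
    for session_id, product in events:
        carts[session_id].add(product)
    return dict(carts)


def pair_support(carts):
    """Count how many carts contain each product pair (frequent-item-set input)."""
    counts = defaultdict(int)
    for products in carts.values():
        for pair in combinations(sorted(products), 2):
            counts[pair] += 1
    return dict(counts)


# Events arrive interleaved, as they would from a per-event stream.
events = [("s1", "laptop"), ("s2", "laptop"), ("s1", "mouse"),
          ("s2", "mouse"), ("s2", "bag")]
carts = group_by_session(events)  # the extra job that per-event streaming requires
print(pair_support(carts))
```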

Exporting in Avira Messaging Engine

One example of a back-end application that uses Couchdoop in production is the Avira Messaging Engine (AME), which is responsible for delivering in-product messages to our users. Those messages may be security alerts, announcements, or marketing campaigns. Because some messages may be irrelevant to some users, we are now building a marketing engine based on response modeling, which targets each user based on their profile. The best next message to deliver to a user is predicted in the analytical tier using CDH. Couchdoop’s export feature is used to publish the targeted messages for each user in Couchbase. Thus, our antivirus service can now request messages from our operational tier.


Learning to use a new tool is always easier by example. Here is a fictional but common use case that we’ll solve with Couchdoop:

Let’s say we have a very popular news website. Currently all users are shown the same content, but going forward we want to analyze user behavior and deliver personalized recommendations.

We’ll use Couchbase, Hadoop, and Couchdoop for this goal. This is what we’ll do:

  • Keep track of users by storing a unique ID in a cookie, and optionally asking them to authenticate
  • Record each session/visit as a Couchbase document; each page view will be added to a session until this session is ended (some time passes and the session expires – the visit is complete)
  • Move completed sessions to Hadoop using Couchdoop to import
  • Run our recommendation algorithm/tool in Hadoop (out of scope for this demo)
  • Load the recommendations into Couchbase to serve them to the website

Let’s go further into detail.

Preparing the Data

For each session ID we store a document in Couchbase that tracks user actions in that session:

Key: sid::20141201::johnny::1357902468
{
  "articles": [
    {
      "name": "hackers-target-biotech",
      "requestTime": "2014-12-01T22:04:15+00:00",
      "timeSpent": 247
    },
    {
      "name": "mexican-president-to-sell-mansion",
      "requestTime": "2014-12-01T22:08:25+00:00",
      "timeSpent": 26
    }
  ]
}

The above Couchbase document encodes the date in the key, 20141201 (December 1, 2014), and the sequence of articles read in the value. The time spent reading an article is very important feedback; the user may have requested an article because it seemed interesting but then stopped reading for whatever reason.

Couchdoop is scheduled to import user sessions in batch into HDFS. By analyzing millions of sessions we can detect sequence patterns, apply association rules (such as frequent-item-sets), and finally compute the recommendations for each user. Each user has its own recommendation document:

Key: rec::johnny
{
  "articles": [
    { "name": "enterprise-security-startup", "score": 0.92 },
    { "name": "the-internet-is-burning", "score": 0.78 }
  ]
}

As we plan to import sessions to Hadoop on a daily basis, we need to be able to retrieve all sessions created during any given day. We’ll create a Couchbase view for those documents, which will allow finding all session documents from a particular date. Furthermore, we will also randomly partition the data within each date to allow separation of input into smaller pieces, to leverage Hadoop’s parallelism. Couchdoop will create a Map task for each of these partitions.

The following JavaScript map function parses a document’s key, extracts the date, and emits it as a view key:

function (doc, meta) {
  // Retrieve only JSON documents.
  if (meta.type != "json") {
    return;
  }

  // Split each day into 64 smaller parts.
  var NUM_PARTITIONS = 64;

  // Use a regex to extract the date and the session ID from keys such as
  // sid::20141201::johnny::1357902468.
  var re = /sid::([\d]+)::([\w]+)::([\d]+)/;
  var fields = re.exec(meta.id);
  if (fields != null) {
    var date = fields[1];
    var sid = parseInt(fields[3], 10);
    // Compute a partition number from the session ID
    // (should provide enough randomness).
    var partition = sid % NUM_PARTITIONS;
    emit([date, partition], null);
  }
}

For the document with key sid::20141201::johnny::1357902468, the map function will emit ["20141201", 4], because 1357902468 mod 64 = 4. When we configure Couchdoop to import data from a Couchbase view, we need to specify a list of view keys. By default, each Map task retrieves data for one of the view keys. The list can be written out explicitly as a semicolon-separated list of JSON keys, e.g. ["20141201", 0];["20141201", 1];["20141201", 2];["20141201", 3], ..., or by using key ranges, e.g. ["20141201", ((0-63))].
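Because view functions run inside Couchbase, it can be convenient to check the key logic offline first. The Python port below is an illustrative helper, not part of Couchdoop. It matches keys with the sid:: prefix used by the session documents and reproduces the ["20141201", 4] result for the sample key.

```python
import re

NUM_PARTITIONS = 64
# Same pattern as the view function: sid::<date>::<user>::<session id>
SESSION_KEY = re.compile(r"sid::(\d+)::(\w+)::(\d+)")


def view_key(doc_id):
    """Return the [date, partition] key the map function emits, or None."""
    fields = SESSION_KEY.search(doc_id)  # like RegExp.exec: match anywhere
    if fields is None:
        return None
    date, _user, sid = fields.groups()
    return [date, int(sid) % NUM_PARTITIONS]


print(view_key("sid::20141201::johnny::1357902468"))  # ['20141201', 4]
print(view_key("rec::johnny"))  # None: recommendation docs are not indexed
```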

To prepare your Couchbase installation with sample data and to configure the view above, follow the Prerequisites section from the README of the GitHub demo project. The above map function should be defined in the sessions design document and the by_date view.

Now we need to move these sessions to HDFS, to serve as input for our recommender.


The following command will load all sessions for December 1, 2014, in HDFS:

hadoop jar target/couchdoop-*-job.jar import \
    --couchbase-urls http://couchbase.example.com:8091/pools \
    --couchbase-bucket users \
    --couchbase-password 'secret' \
    --couchbase-designdoc-name sessions \
    --couchbase-view-name by_date \
    --couchbase-view-keys '["20141201",((0-63))]' \
    --hadoop-mappers 4 \
    --output /website/sessions

The above command will start a MapReduce job.

Map tasks will connect to Couchbase using the provided URLs, bucket, and password, and will query the by_date view from the sessions design document. Parallelism is controlled with the --hadoop-mappers parameter: the 64 view keys will be divided among the 4 specified Map tasks, so each one will process 16 of them. If we don’t set the --hadoop-mappers parameter, the number of Map tasks will equal the number of view keys, in this case 64.
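The key-range notation and the split across Map tasks can be sketched as follows. Both helpers are hypothetical. `expand_view_keys` understands only the single ["<date>",((a-b))] form used in this example. The round-robin assignment in `split_among_mappers` is an assumption: Couchdoop may group keys differently, but with 64 keys and 4 mappers each task still gets 16.

```python
import re

RANGE_SPEC = re.compile(r'\["(\w+)",\(\((\d+)-(\d+)\)\)\]')


def expand_view_keys(spec):
    """Expand '["20141201",((0-63))]' into ["20141201", 0] ... ["20141201", 63]."""
    match = RANGE_SPEC.fullmatch(spec)
    if match is None:
        raise ValueError(f"unsupported key spec: {spec}")
    date, low, high = match.group(1), int(match.group(2)), int(match.group(3))
    return [[date, n] for n in range(low, high + 1)]  # the range is inclusive


def split_among_mappers(keys, mappers):
    """Deal keys round-robin to `mappers` tasks (assumed strategy)."""
    return [keys[i::mappers] for i in range(mappers)]


keys = expand_view_keys('["20141201",((0-63))]')
tasks = split_among_mappers(keys, 4)
print(len(keys), [len(task) for task in tasks])  # 64 [16, 16, 16, 16]
```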

The result will be written to HDFS in /website/sessions as TSV files with two columns, one for keys and the other for values from Couchbase. By default, Hadoop uses a tab as the separator between columns, but this can be changed by modifying the mapred.textoutputformat.separator property.
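Downstream code that reads these files only needs to split each line once on the separator. This is a minimal sketch. It assumes the default tab separator, keys that contain no tabs, and document values serialized as single-line JSON. `parse_import_line` is a hypothetical name.

```python
import json


def parse_import_line(line, separator="\t"):
    """Split one output line into (Couchbase key, parsed JSON document)."""
    # Split only once: assumes the key itself never contains the separator.
    key, value = line.rstrip("\n").split(separator, 1)
    return key, json.loads(value)


line = ('sid::20141201::johnny::1357902468\t'
        '{"articles": [{"name": "hackers-target-biotech", "timeSpent": 247}]}\n')
key, doc = parse_import_line(line)
print(key, doc["articles"][0]["timeSpent"])
```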


Assume that we have computed recommendations for each user and now have them stored in HDFS as key-value TSV files, formatted as Couchbase recommendation documents. To publish them to Couchbase, we can run the following on the command line:

hadoop jar target/couchdoop-*-job.jar export \
    --couchbase-urls http://avira5:8091/pools \
    --couchbase-bucket users \
    --couchbase-password 'secret' \
    --input /user/johnny/recommendations \
    --couchbase-operation ADD \
    --couchbase-expiry 3600

Each Hadoop task will connect to Couchbase Server by using the provided URLs, bucket, and password. We can use the --couchbase-operation parameter to choose one of the available Couchbase operations to be performed for each key-value pair:

  • ADD (puts a document if it does not exist)
  • SET (puts a document if it does not exist and overwrites it if it exists)
  • REPLACE (overwrites a document if it exists)
  • APPEND / PREPEND (appends / prepends a document’s value if it exists; these operations violate JSON semantics)
  • DELETE (removes a document by its key if it exists)
  • EXISTS (does not modify any document in Couchbase; it just uses Hadoop counters to count how many input keys exist in the bucket)
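An in-memory model can help when choosing an operation. The sketch below applies each operation to a Python dict standing in for a bucket. It is not the Couchbase client, it ignores expiry, and `apply_operation` is a hypothetical name. The raw string concatenation in APPEND and PREPEND shows why those operations break JSON documents.

```python
def apply_operation(bucket, op, key, value=None):
    """Apply one export operation to a dict standing in for a Couchbase bucket.

    Returns True if the operation took effect (for EXISTS: whether the key exists).
    """
    exists = key in bucket
    if op == "ADD":
        if exists:
            return False
        bucket[key] = value
    elif op == "SET":
        bucket[key] = value
    elif op == "REPLACE":
        if not exists:
            return False
        bucket[key] = value
    elif op in ("APPEND", "PREPEND"):
        if not exists:
            return False
        # Raw string concatenation: this is why these ops break JSON documents.
        bucket[key] = bucket[key] + value if op == "APPEND" else value + bucket[key]
    elif op == "DELETE":
        if not exists:
            return False
        del bucket[key]
    elif op == "EXISTS":
        return exists  # read-only; Couchdoop reports this through Hadoop counters
    else:
        raise ValueError(f"unknown operation: {op}")
    return True


bucket = {}
apply_operation(bucket, "ADD", "rec::johnny", '{"articles": []}')
print(apply_operation(bucket, "ADD", "rec::johnny", "{}"))  # False: key already exists
```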

We can optionally set an expiry for the stored documents. If we need more control, for example to specify a particular operation and expiry for each document, we can use the MapReduce API and write our own MapReduce jobs with Couchdoop.

Using Couchdoop as a Hadoop Library

In a real situation, one often needs a bit more functionality than what the Couchdoop command-line tool offers. For example, you might need to convert Couchbase JSON documents to other formats (like CSV, Avro, or Parquet) to match the expected format of the recommender. Our sample project illustrates exactly that, and also demos the use of Couchdoop as a library.

Next, let’s look in more detail at what we need to take care of. If we use Couchdoop as a library in our MapReduce job, the first thing we must confirm is that all configuration properties needed by Couchdoop are set in the Hadoop configuration object. These properties are the same as the ones used on the command line (as shown above), but use dots instead of dashes (for example, --couchbase-urls becomes couchbase.urls). You can set them as easily as:
conf.set("couchbase.urls", "http://couchbase.example.com:8091/pools");

You can also create an XML file with your configuration, similar to mapred-site.xml, and pass it with the -conf argument of the hadoop jar command-line tool. (This is what we did in the demo project with couchdoop-site.xml; check the README for a usage sample. See the wiki on Couchdoop’s GitHub page to learn more about configuration properties.)
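Because the property names follow a mechanical rule (dots instead of dashes), a configuration file can be generated from the command-line flags. This is a sketch under that assumption. `flag_to_property` and `to_hadoop_conf_xml` are hypothetical helpers, and the resulting names should be checked against the Couchdoop wiki. The output uses the standard Hadoop configuration/property layout found in files such as mapred-site.xml.

```python
import xml.etree.ElementTree as ET


def flag_to_property(flag):
    """'--couchbase-urls' -> 'couchbase.urls' (dots instead of dashes)."""
    return flag.lstrip("-").replace("-", ".")


def to_hadoop_conf_xml(props):
    """Render name/value pairs in Hadoop's <configuration>/<property> layout."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


flags = {
    "--couchbase-urls": "http://couchbase.example.com:8091/pools",
    "--couchbase-bucket": "users",
    "--couchbase-designdoc-name": "sessions",
    "--couchbase-view-name": "by_date",
}
print(to_hadoop_conf_xml({flag_to_property(f): v for f, v in flags.items()}))
```

The printed XML could be saved as a file such as couchdoop-site.xml and passed with -conf.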

Another important point before we go on: some dependencies of the Couchbase client may be incompatible with those of Hadoop (depending on your Hadoop distribution). Use the statement job.setUserClassesTakesPrecedence(true); to ensure that the Couchbase client functions correctly.

Now that everything is all set, we can have a look at the actual jobs.

  • Set the input format with job.setInputFormatClass(CouchbaseViewInputFormat.class);. This instructs Hadoop to read data from Couchbase. The properties you set earlier will be used to connect to Couchbase, use the correct view, and query the requested keys. For an example, check the ImportDriver class from the demo project.
  • Write a mapper that gets the key as Text and the value as ViewRow (com.couchbase.client.protocol.views.ViewRow). The ViewRow is a document extracted by the CouchbaseViewInputFormat from the given view. For an example, check the ImportMapper class.
  • Write a mapper that emits the key as String and the value as CouchbaseAction (com.avira.couchdoop.exp.CouchbaseAction). Check ExportMapper to see how you can do this.
  • Set the output key, value, and format classes. For an example, check ExportDriver from the demo project.

Potential Issues Using Couchbase Views

Couchdoop’s import feature relies on Couchbase views. Views are a powerful feature that lets you query for any document in the database, but using views in Couchbase is well known to affect bucket performance. Views are slow because they keep their index on disk and consume CPU each time a document is indexed. You can typically mitigate these issues by reserving about 20% of your memory for the operating system file cache and allocating a CPU core for each view design document. If views still don’t perform well, consider using the Couchbase Sqoop connector, which can import the whole Couchbase bucket into HDFS. However, you will give up incremental updates in exchange for better real-time performance. Alternatively, you can update your architecture to stream all your real-time application events to Hadoop via Flume or Kafka, as previously discussed.

Couchdoop and High Throughput

As our tests on Bigstep’s Full Metal Cloud show, by leveraging parallelism Couchdoop can achieve very high throughput and push Couchbase to its limits. See the results of the performance test here. On one hand, this saves time; on the other hand, it might affect the performance of your real-time production application, which will compete with Couchdoop for Couchbase resources. To overcome this issue, schedule Couchdoop imports/exports when the real-time application has lower traffic, or simply decrease Couchdoop’s level of parallelism.

Note: During testing, we noticed that when Couchdoop transfers data between many Hadoop workers and many Couchbase nodes, throughput is affected by network congestion. If you have many Couchbase nodes in the cluster, it might be better to use a lower level of parallelism and fewer Hadoop workers. Experiment with several levels of parallelism and choose the value that works best for you.


If you use both Couchbase and CDH, you will definitely find Couchdoop useful. If you are computing something with Hadoop in the analytical tier, Couchdoop’s export feature can help you publish the new data in Couchbase to make it available to the operational tier.

If you want to take incremental updates from Couchbase and process them in Hadoop, you should use Couchdoop’s import feature. The most common use case in this circumstance is when you want to incrementally import user sessions. But if you don’t require incremental updates and you only occasionally perform imports from Couchbase to Hadoop, the Couchbase Sqoop connector might be the better tool for you. If the data that needs to go to your analytical tier looks more like events and is not structured as sessions, or if Couchbase views cause you pain, you might find that Flume or Kafka works better for you.

Whatever you do, experiment before choosing a solution for production!


We built Couchdoop out of necessity while wrangling data at Avira, and we’d like to thank them for allowing us to open-source this project. Our thanks also go to Bigstep, which showed enthusiasm for Couchdoop and helped us test it and move the project forward.

Categories: Hadoop

Data Processing with Apache Crunch at Spotify

Cloudera Blog - Fri, 02/06/2015 - 17:15

Many thanks to David Whiting of Spotify for allowing us to re-publish the following Spotify Labs post about its Apache Crunch use cases.

(Note: Since this post was originally published in November 2014, many of the library functions described have been added into crunch-core, so they’ll soon be available to all Crunch users by default.)

All of our lovely Spotify users generate many terabytes of data every day. All the songs that are listened to, all the playlists you make, all the people you follow, and all the music you share. Somehow we need to organize, process, and aggregate all of this into meaningful information out the other side. Here are just a few of the things we need to get out of the data:

  • Reporting to record labels and rights holders so we can make sure everyone gets paid
  • Creating toplists of what is the most popular music right now
  • Getting feedback on how well different aspects of the product are working so we can improve the user experience
  • Powering our intelligent radio and discovery features

To store and process all this data we use Apache Hadoop, a framework for distributed storage and processing of huge amounts of data.

This is nothing new for us, of course; we’ve been using Hadoop here since 2009. In the past we’ve talked a lot about the tools we’ve developed to make it easy for developers and analysts to write data processing jobs using the MapReduce approach in Python (see Luigi). However, in 2013 it became apparent that our strategy of running Python code over the Hadoop Streaming API just wasn’t performing well enough. Jobs were slow and CPU-bound, and we had far too many runtime failures on the Hadoop cluster. Furthermore, the environment lacked higher-level abstractions that could have helped developers express their intent. Instead we saw lots of unnecessary code duplication, which was costly to maintain and debug. This coincided with a general move away from Python and towards Java as the language of choice for backend systems at Spotify, so we started looking for a better platform on which people could write their data processing code.

Enter Crunch…

In 2010, Google published a paper entitled FlumeJava: Easy, Efficient Data-Parallel Pipelines describing an abstraction for parallelisable data processing based on the notion of a lazily-evaluated "parallel collection" (PCollection) and various transformations that could be applied to it. Thanks to the work of Josh Wills at Cloudera and various other contributors from 2011 until the present day, a system based on this design, called Crunch, was implemented on top of Hadoop, making it available for everyone (including us) to use on top of their existing infrastructure.

Given our requirements at the time, we were attracted to Crunch for a number of reasons:

  • Type safety makes mistakes in your code much less likely, and such mistakes are very costly when running across a large Hadoop cluster
  • High performance, especially compared to the performance of Python over Hadoop Streaming
  • Higher-level abstractions such as filters, joins, and aggregations, instead of having to think of everything in terms of MapReduce
  • First-class Apache Avro support lets us work with strongly-typed data files with the ability to evolve schemas over time
  • Pluggable execution engines such as MapReduce and Apache Spark, which let us keep up to date with new technology advancements in the big data space without having to rewrite all of our pipelines with each increment
  • Simple, powerful testing using the supplied MemPipeline for fast in-memory unit tests

So how does this look in practice? Suppose we want to know how many tracks were played in each country on a given day:

public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(Example1StreamCountJob.class);
    PCollection<TrackPlayedMessage> trackPlayedMessages =
        pipeline.read(From.avroFile("/logs/track_played/2014-01-01", TrackPlayedMessage.class));

    trackPlayedMessages
        // Map each play to its country
        .parallelDo(new MapFn<TrackPlayedMessage, String>() {
            public String map(TrackPlayedMessage input) {
                return input.getCountry().toString();
            }
        }, strings())
        // Count plays per country, giving (country, count) pairs
        .count()
        // Format each pair as a tab-separated line
        .parallelDo(new MapFn<Pair<String, Long>, String>() {
            public String map(Pair<String, Long> input) {
                return input.first() + "\t" + input.second();
            }
        }, strings())
        .write(To.textFile("/my/output/path"));

    pipeline.done();
}

That’s all that is required. If you execute this code with hadoop jar ... then it’ll run a MapReduce job, reading from HDFS and writing back to HDFS at the end.

Now of course those anonymous inner classes are a bit ugly, but your IDE can usually hide them away or you can refactor them out to inner classes, and with any luck we’ll be able to do away with them altogether in the bright new promised land of Java 8. Either way, it’s an improvement on writing Java MapReduce jobs by hand.
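Because the job is just three steps (extract the country, count per country, format as text), its expected output is easy to reason about, for example when writing a MemPipeline test. The following plain-Java sketch computes the same result in memory using only the standard library; it is not Crunch code. `TrackPlayed` is a hypothetical stand-in for the `TrackPlayedMessage` Avro record, and the sorted output order is a choice made here for determinism, since the post does not specify the real job's output order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class StreamCountSketch {

    // Hypothetical stand-in for the TrackPlayedMessage Avro record.
    static final class TrackPlayed {
        final String country;
        final String trackId;

        TrackPlayed(String country, String trackId) {
            this.country = country;
            this.trackId = trackId;
        }
    }

    // Same three steps as the Crunch job: extract country, count per country, format as text.
    static List<String> countByCountry(List<TrackPlayed> plays) {
        // Extract the country of each play and count occurrences; TreeMap keeps keys sorted.
        Map<String, Long> counts = new TreeMap<>();
        for (TrackPlayed play : plays) {
            counts.merge(play.country, 1L, Long::sum);
        }
        // Format each (country, count) pair as a tab-separated line.
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            lines.add(entry.getKey() + "\t" + entry.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        List<TrackPlayed> plays = new ArrayList<>();
        plays.add(new TrackPlayed("SE", "t1"));
        plays.add(new TrackPlayed("US", "t2"));
        plays.add(new TrackPlayed("SE", "t3"));
        for (String line : countByCountry(plays)) {
            System.out.println(line);
        }
    }
}
```

Run with the sample data above, this prints one line per country (`SE` with 2 plays, `US` with 1), which is the shape of output you would assert on in a unit test of the real pipeline.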

Library Functions

One of the great things about this approach is that it lends itself really nicely to factoring out common higher-level operations for your use cases. We’ve been actively looking for patterns that people have implemented in several different pipelines and factoring them out into a library of tried-and-tested implementations that people can use from their code. We’ve given it the not-particularly-imaginative name of crunch-lib, and it’s open source and available on Maven Central, so you can take advantage of it too. Here are just a few of the things that are currently included:

  • Easy field extraction and keying for Avro records
  • Calculating percentiles and averages for numeric data
  • Generating toplists of the most common items in a data set

This means something fairly complex like finding a list of the top 10 tracks played in each country becomes as simple as this:

PTable<String, String> countryTrack =
    AvroCollections.extract(trackPlayed, "country", "play_track", tableOf(strings(), strings()));
PTable<String, Collection<Pair<Long, String>>> toplist = TopLists.topNYbyX(countryTrack, 10);
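To make the result type concrete, the following in-memory sketch approximates what a per-country toplist contains: a map from each country to at most N (count, track) pairs, highest count first. It only mirrors the `PTable<String, Collection<Pair<Long, String>>>` shape shown above; it is not crunch-lib's implementation, `CountedItem` stands in for Crunch's `Pair<Long, String>`, and breaking ties alphabetically by track is an assumption added for deterministic output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TopListSketch {

    // (count, item) pair standing in for Crunch's Pair<Long, String>.
    static final class CountedItem {
        final long count;
        final String item;

        CountedItem(long count, String item) {
            this.count = count;
            this.item = item;
        }

        @Override
        public String toString() {
            return "(" + count + ", " + item + ")";
        }
    }

    // Each row is {key, value}, e.g. {country, track}; returns the n most frequent values per key.
    static Map<String, List<CountedItem>> topNYbyX(List<String[]> rows, int n) {
        // Count how often each value occurs under each key.
        Map<String, Map<String, Long>> counts = new TreeMap<>();
        for (String[] row : rows) {
            counts.computeIfAbsent(row[0], k -> new HashMap<>()).merge(row[1], 1L, Long::sum);
        }
        Map<String, List<CountedItem>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Long>> byKey : counts.entrySet()) {
            List<CountedItem> items = new ArrayList<>();
            for (Map.Entry<String, Long> e : byKey.getValue().entrySet()) {
                items.add(new CountedItem(e.getValue(), e.getKey()));
            }
            // Highest count first; ties broken alphabetically (an assumption for determinism).
            items.sort((a, b) -> a.count != b.count
                    ? Long.compare(b.count, a.count)
                    : a.item.compareTo(b.item));
            result.put(byKey.getKey(), new ArrayList<>(items.subList(0, Math.min(n, items.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"SE", "trackA"},
                new String[] {"SE", "trackB"},
                new String[] {"SE", "trackA"},
                new String[] {"US", "trackC"});
        // prints {SE=[(2, trackA), (1, trackB)], US=[(1, trackC)]}
        System.out.println(topNYbyX(rows, 10));
    }
}
```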

Unifying Execution

The only remaining hurdle was how to integrate with all our other systems for scheduling and execution, and to do that we needed people to build their pipelines within some kind of predictable structure and command line interface. It’s no good having people reading and writing to arbitrary data sets within the job code itself if you want to manage the paths externally.

The solution we came up with was to remove the reading and writing altogether from the job code that developers write. Instead, developers just write an annotated method for data processing that takes its inputs and produces its outputs in PCollection form, and a general-purpose launcher executes these jobs using configuration from the command line.

The reading, writing and pipeline management steps are omitted from the job code, as these are performed by the common launching layer. This also has the additional benefit of enforcing a structure which is easily unit testable by passing in MemCollections and making assertions on the materialized output. The common launcher then allows this job to be run from the command line as follows:

hadoop jar my_shaded_jar.jar com.example.MyJob -PtrackPlayed=/logs/track_played/2014-01-01 -Poutput=/my/output/path

This Inversion of Control pattern means that this way of executing jobs works for any job, and makes it super easy to plug into other infrastructure (such as Luigi, which we still use for scheduling). It also means general purpose configuration (execution pools, non-standard memory requirements etc.) can be parsed at the common layer using familiar names, and eases the transition when ownership of pipelines is transferred between teams.
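The inversion of control described above can be illustrated with a small plain-Java sketch: a generic launcher parses `-Pname=value` arguments, checks that every input and output the job declares was supplied, does all reading and writing, and hands the job only data. This is a hypothetical illustration, since the post does not show Spotify's launcher or annotations: `DataJob`, `launch`, and the reader/writer hooks are invented for the example, and lists of strings stand in for PCollections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

class LauncherSketch {

    // Hypothetical job contract: the job declares named inputs/outputs and only transforms data.
    interface DataJob {
        List<String> inputNames();
        List<String> outputNames();
        Map<String, List<String>> process(Map<String, List<String>> inputs);
    }

    // Parses arguments of the form -Pname=value into an ordered name -> value map.
    static Map<String, String> parseParams(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (!arg.startsWith("-P") || eq < 3) {
                throw new IllegalArgumentException("Expected -Pname=value but got: " + arg);
            }
            params.put(arg.substring(2, eq), arg.substring(eq + 1));
        }
        return params;
    }

    // Generic launcher: binds paths from the command line, does all I/O, and calls the job.
    static void launch(DataJob job, String[] args,
                       Function<String, List<String>> reader,
                       BiConsumer<String, List<String>> writer) {
        Map<String, String> params = parseParams(args);
        List<String> required = new ArrayList<>(job.inputNames());
        required.addAll(job.outputNames());
        for (String name : required) {
            if (!params.containsKey(name)) {
                throw new IllegalArgumentException("Missing -P" + name + "=<path>");
            }
        }
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        for (String name : job.inputNames()) {
            inputs.put(name, reader.apply(params.get(name)));
        }
        Map<String, List<String>> outputs = job.process(inputs);
        for (String name : job.outputNames()) {
            writer.accept(params.get(name), outputs.get(name));
        }
    }

    // Example job: counts track-played records; it never sees a file path.
    static final class PlayCountJob implements DataJob {
        public List<String> inputNames() { return Collections.singletonList("trackPlayed"); }
        public List<String> outputNames() { return Collections.singletonList("output"); }
        public Map<String, List<String>> process(Map<String, List<String>> inputs) {
            Map<String, List<String>> out = new HashMap<>();
            out.put("output", Collections.singletonList(String.valueOf(inputs.get("trackPlayed").size())));
            return out;
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in for HDFS, similar in spirit to testing with MemCollections.
        Map<String, List<String>> fs = new HashMap<>();
        fs.put("/logs/track_played/2014-01-01", Arrays.asList("SE,t1", "US,t2"));
        launch(new PlayCountJob(),
                new String[] {"-PtrackPlayed=/logs/track_played/2014-01-01", "-Poutput=/my/output/path"},
                fs::get, fs::put);
        System.out.println(fs.get("/my/output/path"));   // prints [2]
    }
}
```

Because the job only transforms its inputs, the same class can be driven from a scheduler with real paths or from a unit test with an in-memory reader and writer.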


Crunch has been a huge boost for both our developer productivity and execution performance on Hadoop. I’d strongly recommend that anyone who is still writing MapReduce by hand have a look. Crunch might also be of interest to anyone frustrated with the current stability of Spark. It provides a similarly nice API that executes as good old-fashioned MapReduce, and there is even a subproject, Scrunch, that exposes a Scala API almost identical to the one provided by Spark. If you want to try it out, a great place to start is the Crunch Getting Started Guide.


Categories: Hadoop

Apache Hive 1.0.0 Has Been Released

Cloudera Blog - Wed, 02/04/2015 - 22:50

The Apache Hive PMC has recently voted to release Hive 1.0.0 (formerly known as Hive 0.14.1).

This release is recognition of the work the Apache Hive community has done over the past nine years and is continuing to do. The Apache Hive 1.0.0 release is a codebase that was expected to be released as 0.14.1 but the community felt it was time to move to a 1.x.y release naming structure.

As such, the code changes in 1.0.0 are small in scope. The two main changes are the beginning of a defined public API and the removal of HiveServer 1. Documentation of the public API has only just started and will continue in HIVE-9363.

Removal of HiveServer 1 is an important step towards making Hive enterprise-ready. HiveServer 2 was contributed to the Hive project by Cloudera two and a half years ago, and has enabled Hive to support JDBC, ODBC, and fine-grained authorization with Apache Sentry (incubating).

For CLI users, migrating to HiveServer2 will require migrating to Beeline. Cloudera and Intel have continued to invest heavily in Beeline and HiveServer2 to make the transition easier. We’ve added too many polish items to cover in any detail. One feature I am particularly excited about is retrieval of query logs via the JDBC API and Beeline query status, which is implemented using that API. This will make it easier for Hive developers to use Beeline to develop their future Hive jobs.

The next major release of Hive, 0.15.0, has been renamed to 1.1.0. The much anticipated Hive-on-Apache Spark work is the marquee feature expected in 1.1.0, and the release process has already started. (In this blog, we’ve covered the Hive-on-Spark project in detail, including the design, first demo, and a hands-on sandbox.)

As always, you can expect Cloudera to pull upstream Hive features into CDH when they are ready for our customers to run in production. Stay tuned.

Brock Noland is a Software Engineer at Cloudera, and a Hive PMC member.

Categories: Hadoop

How-to: Do Real-time Big Data Discovery using Cloudera Enterprise and Qlik Sense

Cloudera Blog - Wed, 02/04/2015 - 16:43

Thanks to Qlik for the post below about using Impala alongside Qlik Sense.

Cloudera and Qlik (a member of the Impala Accelerator Program) have revolutionized the delivery of insights and value to every business stakeholder, extending it from “small data” to something more powerful in the Big Data world and enabling users to combine Big Data and “small data” to yield actionable business insights.

In this post, you’ll learn how Qlik Sense interacts with HDFS via a Cloudera enterprise data hub. This post can also be used as a quick guide to conducting a fast, easy-to-set-up evaluation of Qlik Sense and Cloudera.


Qlik’s primary way to leverage CDH as a source is via ODBC. Qlik leverages the free Apache Hive ODBC connector and Impala ODBC connector, both of which have been certified with Qlik (see the Version Compatibility tab).

The Cloudera Connectors enable Qlik Sense to access Hadoop data by passing SQL queries to the underlying Impala or Hive engines. Data can be extracted either as a batch load (using Hive connector) or real-time via Qlik’s Direct Discovery feature (using the Impala connector). Within the context of a Qlik Sense application, the connections to Cloudera become a source that can co-exist with other sources (relational databases, flat files, and so on) within the associative model of the Qlik Sense application in question.


Cloudera’s QuickStart VM provides an easy way to get started with an evaluation of Qlik Sense and Cloudera. (The Impala ODBC connector will be needed as well.) After logging into the VM, use the Cloudera Manager console to confirm that all the services are running, specifically hdfs1, impala1, hue, and mapreduce1.

Next, use Hue, the open source GUI for Hadoop (see the bookmark in the browser integrated with the VM), to load data into HDFS. For example, the Tables Option allows the creation of new databases and tables.

Once the data has been either identified or loaded, Qlik Sense can connect to Cloudera by creating an ODBC connection using the Impala ODBC driver. The IP address of the Cloudera environment will be needed. For example, for the Cloudera QuickStart VM, the IP address can be found using the shell command ifconfig, which will show something like this:

Using the appropriate IP address and port number, create an ODBC DSN that uses the Impala ODBC connector to connect to Cloudera. Use the DSN to test the connectivity.

Analyzing Data

As mentioned previously, Qlik Sense can extract data from Cloudera into an application in one of three ways: a) in-memory as a batch load process, b) Direct Discovery as a real-time extraction, or c) Direct Discovery in hybrid mode, where part of the data model is loaded as part of a batch load process and the other part of the data model is kept real-time. This article will focus on options (b) and (c).

When a business discovery application is created, Qlik Sense will automatically create and capture the logic that will be used to query the data sources in question. When Direct Discovery is used, Qlik Sense determines which part of the data model resides in-memory and which part is real-time. Qlik Sense will use a specific syntax (DIRECT QUERY) for those constructs that are kept real-time. This syntax allows certain data elements not to be loaded into the data model but still have them available for query purposes from the user interface and to be combined for analysis with the elements that are in-memory.

When a Direct Discovery field is used in a chart, the corresponding SQL query runs on the external data source (Hadoop via the Impala connector). Upon completion, the result of the query is displayed in the chart. A common approach is to keep fact tables as Direct Discovery tables and load the rest of the data model into memory (the hybrid approach). This type of setup enables quick analysis of dimensional data for filtering and discovery purposes while keeping the fact data real-time.

When selections are made in the application, the associated data values of the Direct Discovery fields will be used as a filter for all other queries. With each selection, the Direct Discovery charts will be recalculated. It is possible to use calculation conditions to set a rule indicating when the chart should be calculated. Until that condition is met, Qlik will not run queries and the chart in question will not be calculated.

For the queries that represent the Direct Discovery part of the data model, Qlik will automatically use specific Qlik keywords. For example, the Direct Discovery statement will start with the keyword DIRECT QUERY. Then the statement will use other keywords such as: DIMENSION, MEASURE, DETAIL, and NATIVE.

  • The fields tagged as DIMENSION are loaded into memory as symbol tables so they can be used for quick selections and filtering purposes. The DIMENSION fields are considered part of the associative data model and are linked to other in-memory dimensions in the data model. When the application is loaded, Qlik loads only the unique values per field (the standard behavior for all in-memory fields too). This allows Qlik to use these columns to set up the associative links with the rest of the data model. If a DIMENSION field needs to be treated separately from the rest of the associative data model, use the keyword DETACH instead.
  • The fields tagged as MEASURE and DETAIL exist only in the source data table within Hadoop; they are not part of the in-memory data model. Qlik is aware of both the MEASURE and DETAIL fields at the “meta level,” so they can be used in charts and visualizations. However, the actual data of these fields resides only in the source. MEASURE fields used in a chart or visualization should be wrapped in an aggregation function (Sum, Count, Min, Max, Avg). DETAIL fields, on the other hand, are used for drilling purposes and are not involved in any chart expressions.
  • The “NATIVE” fields are displayed at the lowest level, without aggregation, within a chart. In the case of relational databases, it is also possible to execute the source database’s SQL functions within a Direct Discovery table by using the keyword NATIVE.

A typical Direct Discovery statement using all of these keywords looks like this:

DIRECT QUERY
    DIMENSION
        SalesPersonID,
        OrderDate,
        NATIVE('month([OrderDate])') as OrderMonth,
        NATIVE('Year([OrderDate])') as OrderYear
    MEASURE
        SubTotal,
        TaxAmt,
        TotalDue
    DETAIL
        DueDate,
        ShipDate,
        AccountNumber,
        CreditCardApprovalCode,
        rowguid,
        ModifiedDate
    DETACH
        SalesOrderID,
        CustomerID
FROM AdventureWorks.Sales.SalesOrderHeader;

Qlik’s Direct Discovery capability comes with multi-table support. In other words, a Direct Discovery table can be defined as the output of a join of multiple tables at the source. It is also possible to have more than one Direct Discovery table within the same application. Qlik Direct Discovery can be used against any SQL-compliant data source, such as:

  • Hadoop (i.e. Cloudera) via ODBC connectors
  • ODBC/OLEDB data sources such as commercial relational databases
  • SQL-based custom connectors (SAP SQL Connector, Custom QVX connectors for SQL compliant data stores, etc.)
  • Other SQL-based sources (SAP HANA/Parstream/HP Vertica)

Qlik Sense is very efficient at combining data from multiple data sources, so it can help organizations that need to complement the data residing in Hadoop with data residing in other sources. Although all users of an application with Direct Discovery tables share the same connection, Qlik Sense provides a mechanism for row-level security rules at the user level.

Caching is used to improve the overall execution time and user experience of Direct Discovery. When different users make the same types of selections, Qlik Sense serves the results from the cache rather than querying the source data. These cached result sets are shared across users.
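The sharing of cached result sets can be pictured with a small Java sketch: one cache instance, shared by all users of an application, keyed by the query text a selection generates, with entries expiring after a configurable time. This illustrates the general caching idea only; it is not Qlik's implementation, and the class, method names, and the sample query are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical shared result cache: identical queries from different users reuse one result.
class SharedQueryCache {

    private static final class Entry {
        final String result;
        final long expiresAtMillis;

        Entry(String result, long expiresAtMillis) {
            this.result = result;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMillis;                    // how long a result stays cached
    private final LongSupplier clock;                // injectable clock, e.g. System::currentTimeMillis
    private final Function<String, String> source;   // runs a query against the external source
    private int sourceQueries = 0;                   // number of queries that reached the source

    SharedQueryCache(long ttlMillis, LongSupplier clock, Function<String, String> source) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.source = source;
    }

    // Serves a fresh cached result for the same query text; otherwise queries the source and caches it.
    synchronized String query(String sql) {
        long now = clock.getAsLong();
        Entry cached = entries.get(sql);
        if (cached != null && now < cached.expiresAtMillis) {
            return cached.result;
        }
        String result = source.apply(sql);
        sourceQueries++;
        entries.put(sql, new Entry(result, now + ttlMillis));
        return result;
    }

    synchronized int sourceQueries() {
        return sourceQueries;
    }

    public static void main(String[] args) {
        SharedQueryCache cache = new SharedQueryCache(60_000, System::currentTimeMillis,
                sql -> "result of: " + sql);
        String q = "SELECT SUM(SubTotal) FROM SalesOrderHeader";
        cache.query(q);   // first user: goes to the source
        cache.query(q);   // second user, same selection: served from the cache
        System.out.println("queries sent to source: " + cache.sourceQueries());
    }
}
```

The time-to-live passed to the constructor plays the role of a cache-duration setting; injecting the clock keeps the expiry behavior testable.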

There are a number of parameters that can be used in a Qlik Sense application to control the exact behavior of Direct Discovery (how long cached results are kept in memory, the maximum number of parallel connections, the maximum number of rows that can be displayed at once when drilling down to DETAIL fields, and so on).

In terms of Qlik Sense Direct Discovery performance, it is important to keep the following considerations in mind:

  • Performance of Direct Discovery queries on Cloudera will be directly affected by network performance, cluster size, and memory available in data nodes.
  • It is highly recommended to use an efficient metastore database such as MySQL rather than Derby to improve the performance of concurrent Direct Discovery queries.
  • Direct Discovery queries running through Impala can be accelerated by using the Apache Parquet file format.

As you should have gathered from the above, Qlik Sense and Cloudera work well together and integrate easily to provide business discovery capabilities against Big Data in a visual way. Your final result could look something like this:

Good luck!

Categories: Hadoop

Got SQL? Xplain.io Joins Cloudera

Cloudera Blog - Tue, 02/03/2015 - 12:00

Xplain.io is now part of Cloudera. 

Fifteen months ago, Rituparna Agrawal and I incorporated Xplain.io in a small shed in my backyard. With intense focus on solving real customer problems, we built an eclectic and diverse team with skills across database internals, distributed systems, and customer-centric design.

Throughout Q4 2013, we interviewed more than 60 enterprise data architects and found that they were all overwhelmed with the choices available in modern data management. Today’s data architect is expected to design, deploy, and maintain data stores across newSQL, key-value, document-oriented, in-memory, and many other data models. The problem is incredibly widespread, yet no tools were available to choose the most efficient data store for existing workloads.

Our first “Aha!” moment was the realization that customers already had a great starting point to build their modern data stores. For multiple decades, enterprises have crafted SQL queries to build reporting, analytics, ETL, and other applications. What if we could analyze these queries to choose a modern data store best-suited to specific workloads?

Working on that question quickly revealed the second Aha!: enterprises have very little visibility into their existing queries’ behavior. Xplain.io’s first customer executes nearly 8.4 million (yes, million!) SQL queries annually against various data stores. This raises the question: How many of these queries have access patterns that could benefit from a new data model? The customer did not have a clear answer, and we saw an opportunity. Today, Xplain.io’s profiler is used to identify the most common data access patterns and Xplain.io’s transformation engine is used to generate the schema design for modern data stores such as Impala.

Then another interesting pattern began to emerge in our customer base: almost all of them wanted us to generate code for CDH. In the summer of 2014, we started mapping out an integrated product offering with Mike Olson and Charles Zedlewski at Cloudera. Once we started delving into the strategy of integrating Xplain.io into CDH, it dawned on us that building the product together would make for a much more powerful and useful tool for our customers.

On behalf of the team at Xplain, we would like to thank our Series A investor at Mayfield, partners, and customers for helping us come this far. We are incredibly excited to join forces with the whole Cloudera team to build best-in-class data management tools for enterprise data architects. Stay tuned to see what the future of this collaboration will bring.

Anupam Singh is the CEO and co-founder of Xplain.io.


Categories: Hadoop

This Month in the Ecosystem (January 2015)

Cloudera Blog - Mon, 02/02/2015 - 17:02

Welcome to our 16th edition of “This Month in the Ecosystem,” a digest of highlights from January 2015 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly). 

You may have noticed that this report went on hiatus for December 2014 due to a lack of a critical mass of news (plus, we realize that most of you are out of the loop until mid-January). It’s back with a vengeance, though:

  • Cloudera and Google announced new work to bring Apache Spark support to Google Cloud Dataflow, via the Dataflow SDK. This new Spark “runner” is now available in the form of a Cloudera Labs project.
  • Also released in Cloudera Labs: SparkOnHBase, an integration between Spark and Apache HBase.
  • Spotify described how Apache Crunch is becoming its main tool for building data pipelines, and the value of its in-house Crunch libraries (crunch-lib). 
  • Apache NiFi, a dataflow management and automation system, entered the Apache Incubator.
  • Transparent data encryption in HDFS became production-ready with the commit of upstream patches related to Cloudera’s release of CDH 5.3.
  • Netflix open-sourced its in-house UDFs for Hive and Pig, under the name Surus.
  • Apache Hive 0.14, Apache Sqoop 1.99.4, Apache Tez 0.6, and Apache Pig 0.14 were all released by their respective communities.
  • The Call for Papers for HBaseCon 2015 (May 7 in San Francisco), the Apache HBase community’s conference, opened. The CfP closes at midnight on Feb. 6, so don’t wait.
  • Apache Flink, Apache Drill, and Apache Falcon graduated into Top Level Projects.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

How-to: Use BIRT with Impala for Interactive Big Data Reporting

Cloudera Blog - Thu, 01/29/2015 - 16:44

Thanks to Michael Williams, BIRT Product Evangelist & Forums Manager at analytics software specialist Actuate Corp. (now OpenText), for the guest post below. Actuate is the primary builder and supporter of BIRT, a top-level project of the Eclipse Foundation.

The Actuate (now OpenText) products BIRT Designer Professional and BIRT iHub allow you to connect to multiple data sources to create and deliver meaningful visualizations securely, with scalability reaching millions of users and devices. And now, with Impala emerging as a standard Big Data query engine for many of Actuate’s customers, solid BIRT integration with Impala has become critical.

For that reason, Actuate was one of the initial companies to join the Impala Accelerator Program—a partner program that develops and certifies innovative applications on Impala. Actuate joined this program to provide an end-to-end data management and embedded analytics/data visualization solution on top of Impala, and ensure customers would have a seamless experience when they pair BIRT Designer Professional with a Cloudera Enterprise deployment.

In this post, you’ll learn how to use Actuate’s BIRT Designer Professional to grab data from Impala via a JDBC connection, which you can in turn use to create meaningful, interactive reports and dashboards that allow you to deliver new insights to your end users.


First things first: If you don’t have BIRT, you’ll need to download it. This article assumes that you already have data available through Impala. (Note: BIRT Designer Professional has a Cloudera-specific data source. With open source BIRT, you’d use the JDBC data source. For the purposes of this post, I’m using BIRT Designer Professional.)

The next thing you’ll need to do is grab the HiveServer2 jars from your CDH install and add them to your BIRT install in the folder:

 <BDPro location>/eclipse/plugins/org.eclipse.birt.report.data.oda.jdbc_4.2.3.v20131216-0430/drivers/

In the Cloudera VM I used, these were located at /opt/cloudera/parcels/CDH/lib/hive/lib/.

If you’ve already started BIRT, you’ll need to restart it at this point.

Creating a Project/Design

To create a new BIRT project, all you need to do is right-click in the Navigator in the lower left corner of the BIRT designer and choose New -> Project. Then, right-click on your newly created project and select New -> Report to create the report design. (For more information on the BIRT Designer, see the Design Guides section of the BIRT Developer Center’s Design Center.)

Creating the Connection

Now you’re ready to set up your connection. All you need to do is create a new data source in BIRT by right-clicking on Data Sources in the Data Explorer in the top left area of the designer, choosing New -> Data Source, and selecting the Cloudera Hive Data Source type.

Click Next and enter your connection information and test to make sure you’re able to connect to your server. To connect to Impala, you’d use a URL like jdbc:hive2://;auth=noSasl

With your connection made, you can now create a data set using this data source and write your HQL queries. To do this, right-click on Data Sets in your Data Explorer, choose New Data Set, select your data source from your list of sources, and select Next and then enter your query.
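If the connection test in BIRT fails, it can help to check the same kind of JDBC URL from a small standalone Java program. The sketch below is an assumption-laden helper, not part of BIRT: it assumes the HiveServer2 JDBC jars mentioned above (driver class `org.apache.hive.jdbc.HiveDriver`) are on the classpath, that Impala is listening on its default JDBC port, 21050, and that the cluster is unsecured (hence `auth=noSasl`). The host name is a placeholder you pass as an argument, and `SHOW TABLES` is just a simple test query.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class ImpalaJdbcCheck {

    // Builds a HiveServer2-style URL for an unsecured Impala daemon (auth=noSasl, as in the BIRT setup).
    static String impalaUrl(String host, int port) {
        return "jdbc:hive2://" + host + ":" + port + "/;auth=noSasl";
    }

    // Runs a query and prints the first column of each row.
    static void runQuery(String url, String sql) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // Placeholder host; 21050 is assumed to be Impala's JDBC (HiveServer2 protocol) port.
        String url = impalaUrl(args.length > 0 ? args[0] : "localhost", 21050);
        try {
            // Loading the driver explicitly helps with older drivers that don't self-register.
            Class.forName("org.apache.hive.jdbc.HiveDriver");
        } catch (ClassNotFoundException e) {
            System.err.println("Add the HiveServer2 JDBC jars to the classpath first.");
            return;
        }
        runQuery(url, "SHOW TABLES");
    }
}
```

If this program can list tables but BIRT cannot connect, the problem is more likely in the BIRT driver setup (for example, the jars copied into the drivers folder) than in Impala itself.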

What Now?

Now that you’re able to connect to your data, “It’s just BIRT.” Just as with any other data, you can do further computations and joins, create tables, charts, crosstabs, and so on to display your data in the way that is useful to your end users. Then you deploy the reports with BIRT iHub to make them accessible to your users, by using the standalone viewer or by embedding the BIRT content into your application.

For more information on BIRT and BIRT iHub, or if you have questions about using BIRT with Impala (or Cloudera Enterprise 5), see the BIRT Developer Center.


Categories: Hadoop

Tutorials at Strata + Hadoop World San Jose: Architecture, Hadoop Ops, Interactive SQL-on-Hadoop

Cloudera Blog - Tue, 01/27/2015 - 17:01

Strata + Hadoop World San Jose 2015 (Feb. 17-20) is a focal point for learning about production-izing Hadoop.

Strata + Hadoop World sessions have always been indispensable for learning about Hadoop internals, use cases, and admin best practices. When in-depth learning is needed, however (and deep dives are a necessity if you’re running Hadoop in production, or aspire to), tutorials are your ticket.

This year, tutorials span a range of topics that are central in today’s Hadoop conversation, including Apache Spark, real-time architecture, and data visualization. There are four tutorials in particular, however, to which I want to direct your attention due to their emphasis on successful production deployments.

Morning Tutorials

Interactive SQL-on-Hadoop (9am-12:30pm)

Led by Marcel Kornacker & John Russell

SQL users are the lifeblood of any organization’s data journey, because their adoption of Hadoop technologies is crucial for success. In this tutorial, attendees will learn many different ways to ingest data into HDFS so that it’s queryable in SQL through Impala, and explore all the kinds of tables (internal and external, file formats, partitioned and unpartitioned) and why one would use each one. They will also try many kinds of queries in real time, and understand what factors influence their performance.

Architectural Considerations for Hadoop Applications (aka Architecture Day, Part 1) (9am-12:30pm)

Led by Mark Grover, Jonathan Seidman, Gwen Shapira, and Ted Malaska

Implementing production-ready solutions with Hadoop requires understanding not just Hadoop, but a broad range of related projects in the Hadoop ecosystem such as Apache Hive, Apache Pig, Apache Oozie, Apache Sqoop, and Apache Flume. In this tutorial, attendees will walk through an end-to-end case study of a clickstream analytics engine to understand a concrete example of how to architect and implement a complete solution with Hadoop.

Afternoon Tutorials

Building an Apache Hadoop Data Application (aka Architecture Day, Part 2) (1:30-5pm)

Led by Tom White, Joey Echeverria, and Ryan Blue

In the second (afternoon) half of the Architecture Day tutorial, attendees will build a data application from the ground up. The application will ingest streaming user data (like web clicks) and, using tools and APIs in the Kite SDK, transform and store the data in Hadoop in a form that is readily consumable with Hadoop tools like Impala and Spark. Attendees will also learn how Kite SDK codifies the best practices from the Hadoop Architecture Day morning session.

Apache Hadoop Operations for Production Systems (1:30-5pm)

Led by Kathleen Ting, Philip Zeyliger, Philip Langdale, and Miklos Christine

Hadoop is emerging as the standard for big data processing and analytics. However, as usage of Hadoop clusters grows, so do the demands of managing and monitoring these systems. In this tutorial, attendees will get an overview of all phases of successfully managing Hadoop clusters, with an emphasis on production systems: installation, configuration management, service monitoring, troubleshooting, and support integration. We will review tooling capabilities and highlight the ones that have been most helpful to users, and share some of the lessons learned and best practices from users who depend on Hadoop as a business-critical system.

That’s a couple of really good potential tutorial schedules right there. To fortify your Strata + Hadoop World experience and make it as actionable as possible, strongly consider adding at least one to your agenda. (And if you want full, industrial-strength, bona fide training, Cloudera University will offer a four-day “Designing and Building Big Data Applications” class co-located with the conference. More info here.)

Categories: Hadoop

New in CDH 5.3: Apache Sentry Integration with HDFS

Cloudera Blog - Mon, 01/26/2015 - 16:39

Starting in CDH 5.3, Apache Sentry integration with HDFS saves admins a lot of work.

It’s been more than a year and a half since a couple of my colleagues here at Cloudera shipped the first version of Sentry (now Apache Sentry (incubating)). This project filled a huge security gap in the Apache Hadoop ecosystem by bringing truly secure and dependable fine-grained authorization, and it provided out-of-the-box integration for Apache Hive. Since then the project has grown significantly, adding support for Impala and Search and the wonderful Hue App, to name a few significant additions.

In order to provide a truly secure and centralized authorization mechanism, Sentry deployments have historically been set up so that all of Hive’s data and metadata are accessible only by HiveServer2, and every other user is cut out. This has been a pain point for Sqoop users, because Sqoop does not use the HiveServer2 interface. Hence users with a Sentry-secured Hive deployment were forced to split the import task into two steps: a plain HDFS import followed by manually loading the data into Hive.

With the inclusion of HDFS ACLs and the integration of Sentry into the Hive metastore in CDH 5.1, users were able to improve this situation and get the direct Hive import working again. However, this approach required manual administrator intervention to configure HDFS ACLs according to the Sentry configuration and needed a manual refresh to keep both systems in sync.

One of the large features included in the recently released CDH 5.3 is Sentry integration with HDFS, which enables customers to easily share data between Hive, Impala, and all the other Hadoop components that interact with HDFS (MapReduce, Spark, Pig, Sqoop, and so on) while ensuring that user access permissions only need to be set once and are uniformly enforced.

The rest of this post focuses on the example of using Sqoop together with this Sentry feature. Sqoop data can now be imported into Hive without any additional administrator intervention. Because Sentry policies (which tables a user can select from and which tables they can insert into) are exposed directly in HDFS, Sqoop reuses the same policies that have been configured via GRANT/REVOKE statements or the Hue Sentry App, and imports data into Hive without any trouble.
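To make the idea of exposing Sentry policies in HDFS more concrete, here is a minimal Python sketch of how database-level grants could be translated into HDFS-style ACL entries on a table directory. It is a toy model, not Sentry’s implementation or API. It assumes database-level grants only, Hive’s default warehouse layout under /user/hive/warehouse, and a mapping of SELECT to r-x, INSERT to -wx, and ALL to rwx. The names `ACTION_TO_PERM`, `warehouse_path`, `merge_perms`, and `acl_entries` are hypothetical.

```python
# Toy model only: ASSUMED mapping from Sentry actions to HDFS permission bits.
ACTION_TO_PERM = {"SELECT": "r-x", "INSERT": "-wx", "ALL": "rwx"}

WAREHOUSE = "/user/hive/warehouse"  # assumed default Hive warehouse directory


def warehouse_path(db: str, table: str) -> str:
    """Default Hive layout: tables of `default` sit directly under the
    warehouse dir; other databases live in <db>.db subdirectories."""
    if db == "default":
        return f"{WAREHOUSE}/{table}"
    return f"{WAREHOUSE}/{db}.db/{table}"


def merge_perms(a: str, b: str) -> str:
    """Union of two rwx-style strings, position by position."""
    return "".join(x if x != "-" else y for x, y in zip(a, b))


def acl_entries(db, table, role_grants, role_groups):
    """Hypothetical translation of database-level grants into
    "group:<name>:<perm>" ACL entries for one table directory.

    role_grants: role -> {database: action}
    role_groups: role -> set of groups that hold the role
    (`table` is unused because only database-level grants are modeled.)
    """
    perms = {}
    for role, grants in role_grants.items():
        action = grants.get(db)
        if action is None:
            continue  # this role has no privilege on the database
        for group in role_groups.get(role, ()):
            current = perms.get(group, "---")
            perms[group] = merge_perms(current, ACTION_TO_PERM[action])
    return sorted(f"group:{g}:{p}" for g, p in perms.items())


# Mirrors the example later in this post: ALL on `default` for jarcec's group.
grants = {"jarcec_role": {"default": "ALL"}}
groups = {"jarcec_role": {"jarcec"}}
print(warehouse_path("default", "text"))                  # /user/hive/warehouse/text
print(acl_entries("default", "text", grants, groups))     # ['group:jarcec:rwx']
```

Taking the union of permission bits means a group that receives SELECT and INSERT through two different roles ends up with rwx, the same as a single ALL grant.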


In order for Sqoop to seamlessly import into a Sentry-secured Hive instance, the Hadoop administrator needs to follow a few configuration steps to enable all the necessary features. First, your cluster needs to use the Sentry service as the backend for storing authorization metadata, rather than relying on the older policy files.

If you are already using the Sentry service and GRANT/REVOKE statements, you can skip directly to step 3.

  1. Make sure that the Sentry service is running on your cluster. It should appear in the cluster’s service list.

  2. Make sure that Hive is configured to use this service as the backend for Sentry metadata.

  3. Finally, enable HDFS integration with Sentry.

Example Sqoop Import

Let’s assume that we have user jarcec, who needs to import data into a Hive database named default. User jarcec is part of a group that is also called jarcec. (In real life, the group name doesn’t have to match the username, and that is fine.)

Without Sentry’s HDFS integration, the Hadoop administrator would have to step in and grant user jarcec write privileges on the directory /user/hive/warehouse or one of its subdirectories. With Sentry and HDFS integration, the administrator no longer needs to step in. Instead, Sqoop reuses the same authorization policies that have been configured through Hive SQL or via the Sentry Hue App. Let’s assume that user bc is jarcec’s manager and already has the privileges needed to grant privileges on the default database.

  1. bc starts by invoking beeline and connecting to HiveServer2:

     [bc@sqoopsentry-1 ~]$ beeline
     1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> !connect jdbc:hive2://sqoopsentry-1.vpc.cloudera.com:10000/default;principal=hive/sqoopsentry-1.vpc.cloudera.com@ENT.CLOUDERA.COM

  2. If user jarcec is not yet part of any role, we need to create a role for him:

     1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> CREATE ROLE jarcec_role;
     No rows affected (0.769 seconds)

  3. This new role, jarcec_role, needs to be granted to jarcec’s group, jarcec:

     1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> GRANT ROLE jarcec_role TO GROUP jarcec;
     No rows affected (0.651 seconds)

  4. Finally, bc can grant access to database default (or any other) to the role jarcec_role:

     1: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> GRANT ALL ON DATABASE default TO ROLE jarcec_role;
     No rows affected (0.16 seconds)
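The GRANT steps above follow a role-based chain: privileges are granted to roles, roles are granted to groups, and users pick up privileges through their group memberships. The following Python sketch is a hypothetical in-memory model of that chain, meant only for reasoning about why jarcec ends up with access. The `ToyAuthorizer` class and its method names are illustrative and are not part of Sentry.

```python
from collections import defaultdict


class ToyAuthorizer:
    """Toy model of Sentry-style role-based authorization (not Sentry's API)."""

    def __init__(self, user_groups):
        self.user_groups = user_groups        # user -> set of groups
        self.roles = set()
        self.role_groups = defaultdict(set)   # role -> groups holding it
        self.role_privs = defaultdict(set)    # role -> {(database, action)}

    def create_role(self, role):
        # Analogous to: CREATE ROLE <role>;
        self.roles.add(role)

    def grant_role(self, role, group):
        # Analogous to: GRANT ROLE <role> TO GROUP <group>;
        if role not in self.roles:
            raise KeyError(f"role {role!r} does not exist")
        self.role_groups[role].add(group)

    def grant_on_database(self, action, db, role):
        # Analogous to: GRANT <action> ON DATABASE <db> TO ROLE <role>;
        if role not in self.roles:
            raise KeyError(f"role {role!r} does not exist")
        self.role_privs[role].add((db, action))

    def is_allowed(self, user, action, db):
        """True if any role held by one of the user's groups grants
        `action` (or ALL) on database `db`."""
        groups = self.user_groups.get(user, set())
        for role in self.roles:
            if self.role_groups[role] & groups:
                for priv_db, priv_action in self.role_privs[role]:
                    if priv_db == db and priv_action in ("ALL", action):
                        return True
        return False


auth = ToyAuthorizer({"jarcec": {"jarcec"}, "alice": {"analysts"}})
auth.create_role("jarcec_role")
auth.grant_role("jarcec_role", "jarcec")
auth.grant_on_database("ALL", "default", "jarcec_role")
print(auth.is_allowed("jarcec", "INSERT", "default"))  # True
print(auth.is_allowed("alice", "SELECT", "default"))   # False
```

Modeling the grants this way shows why no per-user step is needed: adding another user to the jarcec group would give that user the same access without any new GRANT statement.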

By executing the steps above, user jarcec has been given the privilege to perform any action (insert or select) on all objects inside database default. That includes the ability to create new tables, insert data, or simply query existing tables. With those privileges, user jarcec can run the following Sqoop command just as he always has:

[jarcec@sqoopsentry-1 ~]$ sqoop import --connect jdbc:mysql://mysql.ent.cloudera.com/sqoop --username sqoop --password sqoop --table text --hive-import
14/12/14 15:37:38 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.3.0
…
14/12/14 15:38:58 INFO mapreduce.ImportJobBase: Transferred 249.7567 MB in 75.8448 seconds (3.293 MB/sec)
14/12/14 15:38:58 INFO mapreduce.ImportJobBase: Retrieved 1000000 records.
14/12/14 15:38:58 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `text` AS t LIMIT 1
14/12/14 15:38:58 INFO hive.HiveImport: Loading uploaded data into Hive
14/12/14 15:39:09 INFO hive.HiveImport: 14/12/14 15:39:09 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
14/12/14 15:39:09 INFO hive.HiveImport:
14/12/14 15:39:09 INFO hive.HiveImport: Logging initialized using configuration in jar:file:/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.26/jars/hive-common-0.13.1-cdh5.3.0.jar!/hive-log4j.properties
14/12/14 15:39:12 INFO hive.HiveImport: OK
14/12/14 15:39:12 INFO hive.HiveImport: Time taken: 1.079 seconds
14/12/14 15:39:12 INFO hive.HiveImport: Loading data to table default.text
14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00000
14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00001
14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00002
14/12/14 15:39:13 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00003
14/12/14 15:39:13 INFO hive.HiveImport: Table default.text stats: [numFiles=4, numRows=0, totalSize=261888896, rawDataSize=0]
14/12/14 15:39:13 INFO hive.HiveImport: OK
14/12/14 15:39:13 INFO hive.HiveImport: Time taken: 0.719 seconds
14/12/14 15:39:13 INFO hive.HiveImport: Hive import complete.
14/12/14 15:39:13 INFO hive.HiveImport: Export directory is not empty, keeping it.

jarcec can then confirm in beeline that the data has indeed been imported into Hive:

0: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> show tables from default;
+------------+--+
|  tab_name  |
+------------+--+
| text       |
+------------+--+
1 row selected (0.177 seconds)
0: jdbc:hive2://sqoopsentry-1.vpc.cloudera.co> select count(*) from text;
+----------+--+
|   _c0    |
+----------+--+
| 1000000  |
+----------+--+
1 row selected (72.188 seconds)

If Hive is configured to inherit permissions, you might notice that Sqoop will print out several warnings similar to this one:

14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. user=jarcec is not the owner of inode=part-m-00000

Because HDFS permissions do not need to be inherited when Sentry’s HDFS integration is enabled, you can safely ignore such messages.
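When scanning a long import log, it can help to set these benign warnings aside so that real problems stand out. Below is a minimal Python sketch, assuming the warning text matches the format shown in the log above exactly; `split_setfacl_warnings` is a hypothetical helper, not part of Sqoop or Hive, and the pattern may need adjusting for other versions.

```python
import re

# Pattern copied from the setfacl warning format in the log above (an assumption
# for other Hive/Sqoop versions).
SETFACL_RE = re.compile(
    r"setfacl: Permission denied\. user=(\S+) is not the owner of inode=(\S+)"
)


def split_setfacl_warnings(lines):
    """Split log lines into (benign, other).

    benign: (user, inode) pairs from setfacl ownership warnings, which can be
            ignored once Sentry's HDFS integration is enabled.
    other:  every remaining line, which still deserves a look.
    """
    benign, other = [], []
    for line in lines:
        match = SETFACL_RE.search(line)
        if match:
            benign.append((match.group(1), match.group(2)))
        else:
            other.append(line)
    return benign, other


log = [
    "14/12/14 15:39:12 INFO hive.HiveImport: Loading data to table default.text",
    "14/12/14 15:39:12 INFO hive.HiveImport: setfacl: Permission denied. "
    "user=jarcec is not the owner of inode=part-m-00000",
    "14/12/14 15:39:13 INFO hive.HiveImport: Hive import complete.",
]
benign, other = split_setfacl_warnings(log)
print(len(benign), len(other))  # 1 2
```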

That’s All, Folks!

The latest CDH release, version 5.3.0, brings many new features. All Sqoop users should particularly check out the Sentry integration with HDFS, as it enables simple and straightforward imports into Sentry-secured Hive deployments without the need to manually configure HDFS permissions. The same SQL interface that is used to grant access to databases and tables now determines who can import (or export) data into Hive!

Jarek Jarcec Cecho is an Engineering Manager at Cloudera, responsible for the Data Ingest team (see team blog). Jarcec is also a committer/PMC member for Sqoop, Apache Flume, Apache MRUnit, Apache DataFu (incubating), and Apache Sentry (incubating). He is also the co-author of the Apache Sqoop Cookbook.

Categories: Hadoop

Advanced Analytics with Apache Spark: The Book

Cloudera Blog - Fri, 01/23/2015 - 17:16

Authored by a substantial portion of Cloudera’s Data Science team (Sean Owen, Sandy Ryza, Uri Laserson, Josh Wills), Advanced Analytics with Spark (currently in Early Release from O’Reilly Media) is the newest addition to the pipeline of ecosystem books by Cloudera engineers. I talked to the authors recently.

Why did you decide to write this book?

We think it’s mostly to fill a gap between what a lot of people need to know to be productive with large-scale analytics on Apache Hadoop in 2015, and the resources that are out there. There are plenty of books on machine learning theory, and plenty of references covering how to use Hadoop ecosystem tools. However, there is not as much specifically targeting the overlap between the two, and focusing on use cases and examples rather than being a manual. So the book is a modest attempt to meet that need, which we see turn up frequently among customers and in the community.

Who is the intended reader?

The ideal reader is a data scientist or aspiring data scientist. “Data scientist” has come to mean quite a few things, but the book is targeted specifically at the subset who are interested in analysis on large datasets, and who are motivated to learn a bit about the software and mathematical underpinnings of doing so. It will be most useful for people who want to get their heads around the basics of machine learning but are more interested in its application than the theory.

Different chapters appeal to different levels of experience in different fields. For example, the second chapter, on record linkage, seeks to teach the basics of using Scala and Apache Spark to work with data, while the eighth chapter, on estimating financial risk through Monte Carlo simulation, assumes a basic understanding of probability and statistics.

What will readers learn, and how does it complement what they will learn from other titles on the market?

Readers ought to pick up the 20% of Spark that’s used 80% of the time in practice. It’s not a reference by any means; Learning Spark (also in Early Release at the time of this writing) is the “definitive” guide. Likewise, it gives enough machine-learning theory to use Spark as a tool for analytics correctly, but it is not a textbook or ML course. It is a good complement to, say, Coursera’s free online ML courses.

What makes Spark so different in this particular area? Why do people need to know about this?

The first couple of chapters of the book actually try to answer this question, and we think it comes down to a couple of things. Spark is just far more developer-friendly than its predecessor frameworks that process large datasets. Its rich library of operators makes expressing complex transformations easy, and the interactive environment it provides enables exploratory analysis. Spark also has primitives that open up many of the processing patterns required by machine-learning algorithms. It’s relevant for exploratory as well as operational analytics.

None of these capabilities are individually new, but having one platform that does a decent job at all of them is powerful. Its abstractions strike a nice balance between forcing the user to write programs that can scale to lots of data and allowing them to think about things at a high level.

Advanced Analytics with Spark is scheduled to be generally available by April 2015. Get a free signed Early Release from the authors at Strata + Hadoop World San Jose 2015!

Categories: Hadoop