HBaseCon 2015: Call for Papers and Early Bird Registration

Cloudera Blog - Fri, 12/19/2014 - 16:54

HBaseCon 2015 is ON, people! Mark Thursday, May 7, in your calendars.

If you’re a developer in Silicon Valley, you probably already know that since its debut in 2012, HBaseCon has been one of the best developer community conferences out there. If you’re not, this is a great opportunity to learn that for yourself: HBaseCon 2015 will occur on Thurs., May 7, 2015, at the Westin St. Francis on Union Square in San Francisco.

If you’re new to HBase or HBaseCon, the following FAQs should give you a good overview:

What is Apache HBase?

HBase is the Apache Hadoop database: a distributed, scalable data store that provides random, realtime read/write access to Big Data. The HBase community independently works within the Apache Software Foundation to provide HBase software under the permissive Apache license.

What is HBaseCon?

HBaseCon is the premier community event for Apache HBase contributors, developers, admins, and users of all skill levels. The event is hosted and organized by Cloudera, with the Program Committee including leaders from across the HBase community (including employees of Cask, Cloudera, Facebook, Google, Hortonworks, and Salesforce.com). In past iterations (2012-2014), employees of companies like Nielsen, Bloomberg LP, Optimizely, Intel, Facebook, Flurry, Google, Groupon, Twitter, Pinterest, Opower, Xiaomi, Yahoo!, Ancestry.com, and many others have presented about their HBase production use cases.

To get an idea of the conference experience, explore presentations from previous years, and view photos from HBaseCon 2014.

Why should I go to HBaseCon?

HBaseCon is the only place in the world where you’ll find virtually every HBase committer and power-user under one roof during a single day. Basically, if you have any serious interest in HBase at all, missing HBaseCon is unthinkable! (Plus, we always have a great party.)

What should I present at HBaseCon?

The Program Committee is looking for talks about war stories/case studies, internals, development and admin/devops best practices, and futures. Anything you can share that will help others run HBase in production successfully is appreciated.

How much does it cost to attend?

Early Bird registration: $375 (through Feb. 1, 2015); Standard registration: $425

So think about that session proposal over the holidays and send it over to the Program Committee for review. But don’t think about it for TOO long; the CFP (via hbasecon.com) closes at midnight on Feb. 6.

If you’re happy with the idea of just being an attendee, you’ll save $50 if you register during the Early Bird period (ends soon).

Categories: Hadoop

New in Cloudera Labs: SparkOnHBase

Cloudera Blog - Thu, 12/18/2014 - 21:06

Apache Spark is making a huge impact across our industry, changing the way we think about batch processing and stream processing. However, as we progressively migrate from MapReduce toward Spark, we shouldn’t have to “give up” anything. One of those capabilities we need to retain is the ability to interact with Apache HBase.

In this post, we will share the work being done in Cloudera Labs to make integrating Spark and HBase super-easy in the form of the SparkOnHBase project. (As with everything else in Cloudera Labs, SparkOnHBase is not supported and there is no timetable for possible support in the future; it’s for experimentation only.) You’ll learn common patterns of HBase integration with Spark and see Scala and Java examples for each. (It may be helpful to have the SparkOnHBase repository open as you read along.)

HBase and Batch Processing Patterns

Before we get into the coolness of Spark, let’s define some powerful usage patterns around HBase interactions with batch processing. This discussion is necessary because when I talk to customers that are new to HBase, they often tell me they have heard that HBase and MapReduce should never be used together.

In fact, although there are valid use cases for having an HBase cluster that is isolated from MapReduce for low-SLA reasons, there are also use cases where the combination of MapReduce and HBase is the right approach. Here are just a couple of examples:

  • Massive operations on tree/DAG/graph structures stored in HBase
  • Interaction with a store or table that is in constant change, with MapReduce or Impala

SparkOnHBase Design

We experimented with many designs for how Spark and HBase integration should work and ended up focusing on a few goals:

  • Make HBase connections seamless.
  • Make Kerberos integration seamless.
  • Create RDDs from Scan operations, or from an existing RDD that is used to generate Get commands.
  • Take any RDD and allow any combination of HBase operations to be done.
  • Provide simple methods for common operations while allowing unrestricted, unknown advanced operation through the API.
  • Support Scala and Java.
  • Support Spark and Spark Streaming with a similar API.

These goals led us to a design that took a couple of notes from the GraphX API in Spark. For example, in SparkOnHBase there is an object called HBaseContext. This class has a constructor that takes HBase configuration information and then once constructed, allows you to do a bunch of operations on it. For example, you can:

  • Create an RDD/DStream from a Scan
  • Put/Delete the contents of an RDD/DStream into HBase
  • Create an RDD/DStream from Gets created from the contents of another RDD/DStream
  • Take the contents of an RDD/DStream and do any operation if an HConnection was handed to you in the worker process

Let’s walk through a code example so you can get an idea of how easy and powerful this API can be. First, we create an RDD, connect to HBase, and put the contents of that RDD into HBase.

// Nothing to see here, just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)

// This is making an RDD of
// (RowKey, Array[(columnFamily, columnQualifier, value)])
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
  (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
  (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
  (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
  (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
))

// Create the HBase config like you normally would, then
// pass the HBase config and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)

// Now pass the RDD, the table name, a function that will convert an RDD record to a Put,
// and finally a flag indicating whether the Puts should be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](
  rdd,
  tableName,
  // This function is really important because it allows our source RDD to have data of any type,
  // and also because Puts are not serializable
  (putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3))
    put
  },
  true)

Now every partition of that RDD will execute in parallel (in different threads in a number of Spark workers across the cluster)—kind of like what would have happened if we did Puts in a mapper or reducer task.

One thing to note is that the same rules apply when working with HBase from MapReduce or Spark in terms of Put and Get performance. If you have Puts that are not partitioned, a Put batch will most likely get sent to each RegionServer, which will result in fewer records per RegionServer per batch. The image below illustrates how this would look with six RegionServers; imagine if you had 100 of them (it would be 16.7x worse)!
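The back-of-envelope arithmetic behind that claim can be sketched in a few lines. (This is a hypothetical model in Python, not SparkOnHBase code; the function name is made up for illustration.)

```python
# Hypothetical model of Put batching. With randomly distributed row keys, a
# batch of Puts fans out to every RegionServer, so each server sees only
# batch_size / num_regionservers records per RPC. Partitioning by region
# first means each task's batch lands on a single RegionServer.

def records_per_server_per_batch(batch_size, num_regionservers, partitioned):
    """Average records a single RegionServer receives in one batch RPC."""
    if partitioned:
        return batch_size                       # whole batch hits one server
    return batch_size / num_regionservers       # batch spread across all servers

batch = 100
print(records_per_server_per_batch(batch, 6, partitioned=False))    # ~16.7 per server
print(records_per_server_per_batch(batch, 100, partitioned=False))  # 1 per server: 16.7x worse
print(records_per_server_per_batch(batch, 100, partitioned=True))   # 100 per server
```

The larger the cluster, the smaller each unpartitioned batch becomes per server, which is exactly why partitioning first pays off.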

Now let’s look at that same diagram if we used Spark to partition first before talking to HBase.


Next, we’ll quickly explore just three code examples to illustrate how you can do different types of operations. (A Put example would look almost exactly like a delete, checkPut, checkDelete, or increment example.)

The big difference in a get example would be the fact that we are producing a new RDD from an existing one. Think of it as a “Spark map function.”

// Create some fake data
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1")),
  …
  (Bytes.toBytes("6")),
  (Bytes.toBytes("7"))))

// Make the HBaseContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)

// This is the method we are going to focus on
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
  tableName,  // The table we want to get from
  2,          // Get list batch size. Set this somewhere under 1000
  rdd,        // RDD that holds the records that will turn into Gets
  record => { // Function that will turn a given record into a Get
    new Get(record)
  },
  (result: Result) => { // Function that will take a given Result and return a serializable object
    val it = result.list().iterator()
    val b = new StringBuilder
    b.append(Bytes.toString(result.getRow()) + ":")
    while (it.hasNext()) {
      val kv = it.next()
      val q = Bytes.toString(kv.getQualifier())
      if (q.equals("counter")) {
        b.append("(" + q + "," + Bytes.toLong(kv.getValue()) + ")")
      } else {
        b.append("(" + q + "," + Bytes.toString(kv.getValue()) + ")")
      }
    }
    b.toString
  })

Now, let’s say your interaction with HBase is more complex than straight Gets or Puts—a case where you want to say, “Just give me an HConnection and leave me alone.” Well, HBaseContext has map, mapPartition, foreach, and foreachPartition methods just for you.

Here’s an example of the foreachPartition in Java.

JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

// Create some fake data
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));

JavaRDD<byte[]> rdd = jsc.parallelize(list);

// This foreachPartition will allow us to do anything we want with an HConnection.
// It takes two parameters:
// - the input RDD
// - a VoidFunction that will get an Iterator and the HConnection. The Iterator will
//   have all the records in this partition
hbaseContext.foreachPartition(rdd,
  new VoidFunction<Tuple2<Iterator<byte[]>, HConnection>>() {
    public void call(Tuple2<Iterator<byte[]>, HConnection> t) throws Exception {
      // We can get the table outside of the loop
      HTableInterface table1 = t._2().getTable(Bytes.toBytes("Foo"));

      Iterator<byte[]> it = t._1();

      // Go through every record, getting it from HBase;
      // if it isn't there, then put it there. Not a great real-world example, but an example
      while (it.hasNext()) {
        byte[] b = it.next();
        Result r = table1.get(new Get(b));
        if (!r.getExists()) {
          table1.put(new Put(b));
        }
      }

      // Close the table outside of the loop
      table1.close();
    }
  });

The last example to talk about is creating an RDD from a scan:

val sc = new SparkContext(sparkConf)

val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

val hbaseContext = new HBaseContext(sc, conf)

var scan = new Scan()
scan.setCaching(100)

var getRdd = hbaseContext.hbaseRDD(tableName, scan)

This code will execute a scan just like MapReduce would do with the table input format and populate the resulting RDD with records of type (RowKey, List[(columnFamily, columnQualifier, Value)]). If you don’t like that record type, then just use the overloaded hbaseRDD method, which takes a record conversion function for changing it to whatever you like.


SparkOnHBase has been tested on a number of clusters with Spark and Spark Streaming; give it a look and let us know your feedback via the Cloudera Labs discussion group. The hope is that this project and others like it will help us blend the goodness from different Hadoop ecosystem components to help solve bigger problems.


Special thanks to the people that helped me make SparkOnHBase: Tathagata Das (TD), Mark Grover, Michael Stack, Sandy Ryza, Kevin O’Dell, Jean-Marc Spaggiari, Matteo Bertozzi, and Jeff Lord.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Apache Spark, and a co-author of the O’Reilly book, Hadoop Applications Architecture.


The Top 10 Posts of 2014 from the Cloudera Engineering Blog

Cloudera Blog - Thu, 12/18/2014 - 17:20

Our “Top 10” list of blog posts published during a calendar year is a crowd favorite (see the 2013 version here), in particular because it serves as informal, crowdsourced research about popular interests. Page views don’t lie (although skew for publishing date—clearly, posts that publish earlier in the year have pole position—has to be taken into account).

In 2014, a strong interest in various new components that bring real time or near-real time capabilities to the Apache Hadoop ecosystem is apparent. And we’re particularly proud that the most popular post was authored by a non-employee.

  1. How-to: Create a Simple Hadoop Cluster with VirtualBox
    by Christian Javet
    Explains how to set up a CDH-based Hadoop cluster in less than an hour using VirtualBox and Cloudera Manager.
  2. Why Apache Spark is a Crossover Hit for Data Scientists
    by Sean Owen
    An explanation of why Spark is a compelling multi-purpose platform for use cases that span investigative, as well as operational, analytics. 
  3. How-to: Run a Simple Spark App in CDH 5
    by Sandy Ryza
    Helps you get started with Spark using a simple example.
  4. New SQL Choices in the Apache Hadoop Ecosystem: Why Impala Continues to Lead
    by Justin Erickson, Marcel Kornacker & Dileep Kumar
    Open benchmark testing of Impala 1.3 demonstrates performance leadership compared to alternatives (by 950% or more), while providing greater query throughput and with a far smaller CPU footprint.
  5. Apache Kafka for Beginners
    by Gwen Shapira & Jeff Holoman
    When used in the right way and for the right use case, Kafka has unique attributes that make it a highly attractive option for data integration.
  6. Apache Hadoop YARN: Avoiding 6 Time-Consuming “Gotchas”
    by Jeff Bean
    Understanding some key differences between MR1 and MR2/YARN will make your migration much easier.
  7. Impala Performance Update: Now Reaching DBMS-Class Speed
    by Justin Erickson, Greg Rahn, Marcel Kornacker & Yanpei Chen
    As of release 1.1.1, Impala’s speed beat the fastest SQL-on-Hadoop alternatives–including a popular analytic DBMS running on its own proprietary data store.
  8. The Truth About MapReduce Performance on SSDs
    by Karthik Kambatla & Yanpei Chen
    It turns out that cost-per-performance, not cost-per-capacity, is the better metric for evaluating the true value of SSDs. (See the session on this topic at Strata+Hadoop World San Jose in Feb. 2015!)
  9. How-to: Translate from MapReduce to Spark
    by Sean Owen
    The key to getting the most out of Spark is to understand the differences between its RDD API and the original Mapper and Reducer API.
  10. How-to: Write and Run Apache Giraph Jobs on Hadoop
    by Mirko Kämpf
    Explains how to create a test environment for writing and testing Giraph jobs, or just for playing around with Giraph and small sample datasets.

Based on the above, a significant number of you are at least exploring Apache Spark as an eventual replacement for MapReduce, as well as tracking Impala’s progress as the standard analytic database for Apache Hadoop. What will next year bring, do you think? 

Justin Kestelyn is Cloudera’s developer outreach director.


Hands-on Hive-on-Spark in the AWS Cloud

Cloudera Blog - Tue, 12/16/2014 - 20:03

Interested in Hive-on-Spark progress? This new AMI gives you a hands-on experience.

Nearly one year ago, the Apache Hadoop community began to embrace Apache Spark as a powerful batch-processing engine. Today, many organizations and projects are augmenting their Hadoop capabilities with Spark. As part of this shift, the Apache Hive community is working to add Spark as an execution engine for Hive. The Hive-on-Spark work is being tracked by HIVE-7292, which is one of the most popular JIRAs in the Hadoop ecosystem. Furthermore, three weeks ago, the Hive-on-Spark team offered the first demo of Hive on Spark.

Since that demo, we have made tremendous progress, finishing Map Join (HIVE-7613) and Bucket Map Join (HIVE-8638), integrating with HiveServer2 (HIVE-8993), and, importantly, integrating our Spark Client (HIVE-8548, aka Remote Spark Context). Remote Spark Context is important because it’s not possible to have multiple SparkContexts within a single process. The RSC API allows us to run the SparkContext on the server in a container while utilizing the Spark API on the client (in this case, HiveServer2), which reduces resource utilization on an already burdened component.

Many users have proactively started using the Spark branch and providing feedback. Today, we’d like to offer you the first chance to try Hive-on-Spark yourself. As this work is under active development, for most users, we do not recommend that you attempt to run this code outside of the packaged Amazon Machine Image (AMI) provided. The AMI ami-35ffed70 (named hos-demo-4) is available in us-west-1; we recommend an instance of m3.large or larger.

Once logged in as ubuntu, change to the hive user (sudo su - hive) and you will be greeted with instructions on how to start Hive on Spark. Pre-loaded on the AMI is a small TPC-DS dataset and some sample queries. Users are strongly encouraged to load their own sample datasets and try their own queries. We are hoping not only to showcase our progress delivering Hive-on-Spark but also to help find areas for improvement early. As such, if you find any issues, please email hos-ami@cloudera.org and the cross-vendor team will do its best to investigate the issue.

Despite spanning the globe, the cross-company engineering teams have become close. The team members would like to thank our employers for sponsoring this project: MapR, Intel, IBM, and Cloudera.

Rui Li is a software engineer at Intel and a contributor to Hive.

Na Yang is a staff software engineer at MapR and a contributor to Hive.

Brock Noland is an engineering manager at Cloudera and a Hive PMC member.




5 Pitfalls of Benchmarking Big Data Systems

Cloudera Blog - Fri, 12/12/2014 - 17:31

Benchmarking Big Data systems is nontrivial. Avoid these traps!

Here at Cloudera, we know how hard it is to get reliable performance benchmarking results. Benchmarking matters because one of the defining characteristics of Big Data systems is the ability to process large datasets faster. “How large” and “how fast” drive technology choices, purchasing decisions, and cluster operations. Even with the best intentions, performance benchmarking is fraught with pitfalls—easy to get numbers, hard to tell if they are sound.

Below we list five common pitfalls and illustrate them with internal and customer-based stories. They offer a behind-the-scenes look at our engineering and review processes that allow us to produce rigorous benchmark results. These stories illustrate important principles of conducting your own performance benchmarking, or assessing others’ results.

Pitfall 1: Comparing Apples to Oranges

We often run two tests, expecting only one parameter to change, while in fact many parameters changed and a comparison is impossible – in other words, we “compare apples to oranges.”

CDH 5.0.0 was the first release of our software distribution with YARN and MapReduce 2 (MR2) as the default MapReduce execution framework. To their credit, our partners and customers did performance benchmarking on their own when they considered whether to upgrade. Many partners and customers initially reported a performance regression from MapReduce 1 (MR1) in earlier versions of CDH to YARN and MapReduce2 in CDH 5.0.0.

What actually happened was that a straightforward benchmark ended up comparing two different things—comparing apples to oranges. Two technical issues led to this comparison discrepancy.

One issue was that TeraSort, a limited yet popular benchmark, changed between MR1 and MR2. To reflect rule changes in the GraySort benchmark on which it is based, the data generated by the TeraSort included with MR2 is less compressible. A valid comparison would use the same version of TeraSort for both releases, because map-output compression is enabled by default as a performance optimization in CDH. Otherwise, MR1 will have an unfair advantage by using more compressible data.
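The effect of compressibility is easy to demonstrate with plain Python and zlib (an illustration only; this is not the actual TeraSort data generator, and the inputs are made up):

```python
# Hypothetical illustration of why data compressibility skews benchmarks.
# When map-output compression is enabled, the bytes actually written and
# shuffled depend heavily on how compressible the generated data is.
import os
import zlib

size = 100_000
repetitive = b"ABCDEFGH" * (size // 8)  # highly compressible input
random_ish = os.urandom(size)           # nearly incompressible input

compressed_rep = len(zlib.compress(repetitive))
compressed_rnd = len(zlib.compress(random_ish))

# The repetitive input shrinks to a tiny fraction of its size; the random
# input barely shrinks, so the "same" 100KB job moves far more bytes.
print(compressed_rep, compressed_rnd)
```

Two runs that generate data of different compressibility are therefore measuring different amounts of I/O, not different framework performance.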

Another issue was the replacement of the “task slot” concept in MR1 with the “container” concept in MR2. YARN has several configuration parameters that affected how many containers will be run on each node. A valid comparison would set these configurations such that there is the same degree of parallel processing between MR1 and MR2. Otherwise, depending on whether hardware is over or under-committed, either MR1 or MR2 will have the advantage.
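One way to sanity-check such configurations is a quick calculation of concurrent tasks per node under each framework. (A hedged sketch with made-up example numbers; real container counts also depend on scheduler and minimum-allocation settings.)

```python
# Hypothetical comparison of per-node parallelism: MR1 fixes it with slot
# counts, while YARN derives it from node resources divided by per-container
# requests. A fair benchmark should make the two come out roughly equal.

def mr1_tasks_per_node(map_slots, reduce_slots):
    return map_slots + reduce_slots

def yarn_containers_per_node(node_mem_mb, node_vcores,
                             container_mem_mb, container_vcores):
    # A node can run only as many containers as both memory and vcores allow.
    return min(node_mem_mb // container_mem_mb, node_vcores // container_vcores)

# Example: 24GB of YARN memory and 12 vcores per node, with 2GB / 1 vcore per
# container, yields 12 containers -- matching 8 map + 4 reduce slots in MR1.
print(mr1_tasks_per_node(8, 4))                      # 12
print(yarn_containers_per_node(24576, 12, 2048, 1))  # 12
```

If the two numbers diverge, one framework is simply being allowed to do more work at once, and the comparison is apples to oranges.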

We committed these pitfalls ourselves in the early days of ensuring MR1 and MR2 performance parity. We regularly compared MR1 and MR2 performance on our nightly CDH builds, and the “regression” was caught the very first time we did this comparison. Our MapReduce and Performance Engineering teams collaborated to both identify the code changes, and understand what makes a valid performance comparison. This effort culminated in MR2 shipped in CDH 5.0.0 at performance parity with MR1.

Pitfall 2: Not Testing at Scale

“Scale” for big data systems can mean data scale, concurrency scale (number of jobs and number of tasks per job), cluster scale (number of nodes/racks), or node scale (per node hardware size). Failing to test “at scale” for any of these dimensions can lead to surprising behavior for your production clusters.

It is illustrative to look at another aspect of our efforts to drive MR2 to performance parity with MR1. We wanted to verify that MR2 and MR1 perform at parity when a large number of jobs are running. We ran SWIM, which submits many jobs concurrently over hours or even days, simulating the workload logged on actual production clusters. The first runs of SWIM on MR2 revealed a live-lock issue where the jobs would appear as submitted, but none of them would make any progress. What happened is that all available resources were allocated to the Application Masters, leaving no room for the actual tasks.

This issue escaped detection in our other scale tests that covered a range of data, cluster, and node scales. The live-lock occurs only when all the containers in a cluster are taken up by Application Masters. On a cluster of non-trivial size, this means hundreds or even thousands of concurrent jobs. SWIM is specifically designed to reveal such issues by replaying production workloads with their original level of concurrency and load variation over time. In this case, we found a critical issue before our customers ever hit it.
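The failure mode itself is easy to model. (A hypothetical sketch, not SWIM or YARN code; the function and numbers are illustrative.)

```python
# Hypothetical model of the Application Master live-lock described above.
# Each submitted job first needs a container for its AM; if concurrent jobs
# consume every container, none are left for actual tasks and nothing progresses.

def task_containers_available(total_containers, concurrent_jobs,
                              am_containers_per_job=1):
    ams = min(concurrent_jobs * am_containers_per_job, total_containers)
    return total_containers - ams

print(task_containers_available(1000, 50))    # 950 containers left for tasks
print(task_containers_available(1000, 1000))  # 0 left: live-lock, no job progresses
```

This also shows why the bug only surfaced at high concurrency: with modest job counts there is always headroom for task containers.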

Pitfall 3: Believing in Miracles

If something is too good to be true, it’s probably not true. This means we should always have a model of what performance should be, so that we can tell if a performance improvement is expected, or too good to be true.

Here are some recent “miracles” we have had to debunk for ourselves and for our customers:

  • A customer came to us and declared that Impala performs more than 1000x better than its existing data warehouse system, and wanted us to help it set up a new cluster to handle a growing production workload. The 1000x difference is orders of magnitude larger than our own measurements, and immediately made us skeptical. Following much discussion, we realized that the customer was comparing very simple queries running on a proof-of-concept Impala cluster versus complex queries running on a heavily loaded production system. We helped the customer do an apples-to-apples comparison, yet it turns out Impala still has an advantage. We left the customer with realistic plans for how to grow its data management systems.
  • A customer asked us to run Apache Sqoop in several configurations, with the intent of finding the configuration leading to the best export performance. Among other tests we compared the performance of loading data to new partitions through Oracle Database’s direct path writes, to loading the same data through normal inserts. We normally expect direct path writes to be significantly faster since they bypass the normally busy buffer-cache and redo log subsystems, writing data blocks directly to storage. In this test, the normal inserts were 3 times faster than the direct path writes. Quick investigation revealed that Sqoop was exporting data to an otherwise idle Oracle cluster with over 300GB of memory dedicated to the buffer cache. Loading data into memory in a server with no contention is obviously faster than writing the same data to disk. We explained the results to the customer and recommended repeating the tests on a cluster with realistic workloads.
  • A customer asked us for comment on a Hadoop sort benchmark result in the trade press. The result is more than 100x faster than what we found internally. We took a look at the benchmark report and very quickly found that the data size being tested is considerably smaller than the available memory in the cluster. In other words, a knowledgeable operator would be able to configure Hadoop in a way that the sort takes place completely in memory. This approach departs from the common practice of configuring sort with data size much greater than total cluster memory. So the more-than-100x gap comes from the inherent hardware difference between memory and disk IO, rather than a difference between two software systems.

The ability to identify miracles requires us having models of expected performance beyond just a “gut feeling”. These models can come from prior results, or an understanding of where the system bottlenecks should be. Benchmarking without such models would give you a lot of numbers but not a lot of meaning.
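As a concrete example of such a model, a disk-bound sort has a runtime floor set by aggregate disk bandwidth. (A hedged back-of-envelope sketch with assumed hardware numbers, not a measurement.)

```python
# Hypothetical "miracle detector" for an out-of-core sort. A disk-bound sort
# must read and write the data on each pass (e.g., map/shuffle plus merge),
# so aggregate disk bandwidth puts a hard floor on the runtime.

def min_sort_seconds(data_bytes, nodes, disks_per_node, disk_mb_per_s, passes=2):
    aggregate_bw = nodes * disks_per_node * disk_mb_per_s * 1e6  # bytes/sec
    return passes * 2 * data_bytes / aggregate_bw  # each pass reads and writes

# Assumed cluster: 100TB sorted on 100 nodes, 12 disks/node at 100MB/s each.
expected = min_sort_seconds(100e12, 100, 12, 100)
print(round(expected))  # about 3,333 seconds (~55 minutes)
```

A reported result far below this floor means the data fit in memory (or the test measured something else entirely), not that the software is 100x faster.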

Pitfall 4: Using Unrealistic Benchmarks

Biased benchmarks are benchmarks where workload, hardware, or presentation choices are made without regard to the expected requirements of customers. Rather, these choices are meant to highlight the capabilities of the vendor performing the benchmark.

Here are specific warning signs of a biased benchmark:

  • Misleading workloads: A vendor runs benchmarks on 100GB of data when the system is marketed as a “Big Data” system designed for 100TB datasets, or uses a transactional workload to test a system with mostly analytical use cases. TeraSort, for example, has specific characteristics and stresses a very specific subset of the processing subsystem. It is not necessarily a good benchmark to evaluate how the system will scale for other workloads, although it is a useful first step in comparing different hardware configurations.

At Cloudera, TeraSort is only one job in our MapReduce performance benchmarking suite. We run all jobs in the suite at different kinds of scale, beyond just large data size. (See Pitfall 2 above.)

  • Premium hardware: Vendors often improve their numbers by using hardware not typically used in production: solid-state drives (SSDs) when customers more commonly use hard disk drives (HDDs), or types of SSDs not available in the general market. The Transaction Processing Performance Council’s TPC-C benchmark allows the use of hardware that is not yet generally available, provided that availability dates are published. It is wise to check whether the hardware choices make the results irrelevant when using benchmarks for purchasing decisions.

At Cloudera, we have explored MapReduce performance on SSDs. We were very conscious of SSDs’ prevalence in the market compared with HDDs. This prompted us to suggest that our hardware partners track SSD performance-per-cost in addition to the more commonly cited capacity-per-cost. The importance of the performance-per-cost metric represents a key insight from the study.

  • Cherry-picking queries or jobs: The vendor picks very specific queries out of a standard benchmark but can’t explain the choice with objective criteria that are relevant to customers (or worse, doesn’t even disclose that a choice was made!).

At Cloudera, many of our past Impala performance results used 20 queries derived from the TPC – Decision Support (TPC-DS) benchmark. These queries were chosen over a year ago, and cover interactive, reporting, and deep analytic use cases. At the time, it was a major improvement over a frequently cited set of queries that were constructed without empirical backing from actual customer use cases. The 20 queries also represent a step forward from our own prior efforts using queries derived from TPC-H. Both TPC-H and TPC-DS are backed by customer surveys from vendors in the TPC consortium, with TPC-H considered to be the less demanding benchmark. We have kept the set of 20 queries derived from TPC-DS to help ourselves compare against our own prior results, and we are well aware they are less than the full set of 99 queries in the official TPC-DS. Look for our future posts in this space.

To an extent all commercial benchmarks are suspect of bias, since they are performed or commissioned by a specific vendor to market their products. Vendors can enhance their own credibility by being transparent about the limits of their own work. Customers can hold vendors accountable by understanding their own workload and have a conversation with vendors about whether a product addresses their specific use case.

Pitfall 5. Communicating Results Poorly

Poorly communicated results detract from otherwise good performance benchmarking projects. Here at Cloudera, we check all external-facing benchmarking communications for the following:

  1. Whether we selected a benchmark that
    1. Is unbiased (see Pitfall 4 above),
    2. Exercises workloads relevant to actual customers, and
    3. Scales across data size, concurrency level, cluster size, and node size.
  2. Whether we reported sufficient information for industry peers to assess the significance of the result, and to reproduce the tests if needed. This requires reporting
    1. The benchmark we used and why we used it,
    2. The performance metrics we measured and how we measured them,
    3. The hardware used and the software tuning applied.

One more aspect of a good benchmarking report is whether the results have been independently verified or audited. The purpose of an independent audit is to have the above checks done by someone other than the organization that does the benchmarking study. Benchmarking results that passed independent audit are more likely to be communicated clearly and completely.

There are several gold standards for audit and verification practices established before the rise of Big Data:

  • Dedicated auditors: The TPC uses dedicated auditors. Each auditor is certified to audit a particular benchmark only after passing a test designed by the working group who initially specified that benchmark.
  • Validation kits and fair-use rules: The Standard Performance Evaluation Corporation (SPEC) uses a combination of validation checks built into benchmarking kits, fair-use rules governing how the results should be communicated, and review by the SPEC organization, which encompasses many industry peers of the test sponsor.
  • Peer review: The official Sort Benchmark has new entries reviewed by past winners. This incentivizes the winners to “hand over the torch” only if new entries are sufficiently rigorous.

There are not yet any widely accepted audit and verification processes for Big Data. The need for complete and neutral benchmarking results is sometimes diluted by the need to stand out in the trade press. However, the past year has seen a phenomenal growth in the level of performance knowledge in the customer base and the broader community. Every vendor benchmark is now audited by customers and industry peers. This is why we always conduct and communicate our performance benchmarking in a rigorous and open manner.


Performance benchmarking is hard. When done well, benchmarks can guide us as well as the community. We close this post with anecdotes of benchmarking mistakes the authors committed early in their careers. After all, anyone can make benchmarking errors, and everyone can learn from them.

Gwen Shapira is on the Platform Engineering team at Cloudera. She once ran a database performance benchmark on a proof-of-concept 5-node cluster. When she was asked what would be the performance for a 50-node production cluster, she multiplied the 5-node performance numbers by 10x. The production cluster blew up, hitting network bottlenecks not revealed at the proof-of-concept scale. Lesson: Testing is better than extrapolating.

Yanpei Chen is on the Performance Engineering team at Cloudera. He ran his first Hadoop benchmark as a grad student at UC Berkeley, where he accidentally mounted HDFS on the departmental network filer. He took down the filer for all EECS professors, staff, and students, and received hate mail from the system administrators for a week. Lesson: Run your benchmarks in a way that doesn’t disrupt production systems.

Categories: Hadoop

The Impala Cookbook

Cloudera Blog - Wed, 12/10/2014 - 16:01

Impala, the open source MPP analytic database for Apache Hadoop, is now firmly entrenched in the Big Data mainstream. How do we know this? For one, Impala is now the standard against which alternatives measure themselves, based on a proliferation of new benchmark testing. Furthermore, Impala has been adopted by multiple vendors as their solution for letting customers do exploratory analysis on Big Data, natively and in place (without the need for redundant architecture or ETL). Also significant, we’re seeing the emergence of best practices and patterns out of customer experiences.

In an effort to streamline deployments and shorten the path to success, Cloudera’s Impala team has compiled a “cookbook” based on those experiences, covering:

  • Physical and Schema Design
  • Memory Usage
  • Cluster Sizing and Hardware Recommendations
  • Benchmarking
  • Multi-tenancy Best Practices
  • Query Tuning Basics
  • Interaction with Apache Hive, Apache Sentry, and Apache Parquet

By using these recommendations, Impala users will be assured of proper configuration, sizing, management, and measurement practices to provide an optimal experience. Happy cooking!

Categories: Hadoop

Progress Report: Community Contributions to Parquet

Cloudera Blog - Tue, 12/09/2014 - 21:05

Community contributions to Parquet are increasing in parallel with its adoption. Here are some of the highlights.

Apache Parquet (incubating), the open source, general-purpose columnar storage format for Apache Hadoop, was co-founded only 18 months ago by Cloudera and Twitter. Since that time, its rapid adoption by multiple platform vendors and communities has made it a de facto standard for this purpose.

Most of Cloudera’s recent contributions have focused on fixing bugs reported by its growing number of users. Thanks to this work and contributions by others in the Parquet community (including employees from companies like Criteo, Stripe, Netflix, MapR, Salesforce.com, and others), the format is becoming more stable and mature with each release.

In this post, you’ll learn about just a few of these awesome community contributions.


parquet-tools is a command-line utility that was contributed by engineers from ARRIS Inc. earlier this year. You can download it as a standalone tarball or use the copy included in CDH 5.2 and later. This handy utility lets you view the schema for a Parquet file, cat the content, or take a closer look at the encoding details of individual columns, all the way down to the page level. For example:

blue@work:~$ parquet-tools schema favorite_books.parquet
message Book {
  required binary isbn (UTF8);
  required binary title (UTF8);
  required int32 length_pages;
  required binary author (UTF8);
}

blue@work:~$ parquet-tools cat favorite_books.parquet
isbn = 860-1200653809
title = Pride and Prejudice and Zombies
length_pages = 320
author = Jane Austen & Seth Grahame-Smith

isbn = 978-0394800011
title = The Cat in the Hat
length_pages = 61
author = Dr. Seuss

blue@work:~$ parquet-tools dump --disable-data favorite_books.parquet
row group 0
-------------------------------------------------------------------------------------
isbn:          BINARY SNAPPY DO:0 FPO:4   SZ:54/53/0.98 VC:2 ENC:BIT_PACKED,PLAIN
title:         BINARY SNAPPY DO:0 FPO:58  SZ:73/74/1.01 VC:2 ENC:BIT_PACKED,PLAIN
length_pages:  INT32  SNAPPY DO:0 FPO:131 SZ:27/25/0.93 VC:2 ENC:BIT_PACKED,PLAIN
author:        BINARY SNAPPY DO:0 FPO:158 SZ:68/66/0.97 VC:2 ENC:BIT_PACKED,PLAIN

    isbn TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:36 VC:2

    title TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:57 VC:2

    length_pages TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:8 VC:2

    author TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:49 VC:2


parquet-protobuf, contributed by Lukas Nalezenec, adds support for storing protobuf objects directly in Parquet without using a translation step to another object model. It works just like the existing parquet-avro and parquet-thrift libraries, making it possible to move existing application code to a column-oriented format without substantial rewrites. Using parquet-protobuf is as easy as this:

// Write protobuf records (of class cls) directly to a Parquet file.
ProtoParquetWriter writer = new ProtoParquetWriter(
    new Path("/tmp/test.parquet"), cls);
for (MessageOrBuilder record : records) {
  writer.write(record);
}
writer.close();

Filter2 API

The filter2 API, contributed by Twitter engineers, adds a DSL to easily express predicates that are applied to records. It can also serialize the predicates so there’s no longer a need to write custom code that pulls filter values out of a Configuration.

// foo == 10 || bar <= 17.0
IntColumn foo = intColumn("foo");
DoubleColumn bar = doubleColumn("x.y.bar");
FilterPredicate pred = or(eq(foo, 10), ltEq(bar, 17.0));


The Parquet community has grown to include more than 50 contributors, not including the work done by the Apache Spark and Apache Hive communities for native support. We welcome any and all new contributors to make this vibrant community even stronger!

Ryan Blue is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.2: Improvements for Running Multiple Workloads on a Single HBase Cluster

Cloudera Blog - Tue, 12/09/2014 - 16:57

These new Apache HBase features in CDH 5.2 make multi-tenant environments easier to manage.

Historically, Apache HBase has treated all tables, users, and workloads with equal weight. This approach is sufficient for a single workload, but when multiple users and multiple workloads are applied to the same cluster or table, conflicts can arise. Fortunately, starting with HBase in CDH 5.2 (HBase 0.98 + backports), workloads and users can now be prioritized.

One can categorize the approaches to this multi-tenancy problem in three ways:

  • Physical isolation or partitioning – each application or workload operates on its own table and each table can be assigned to a set of machines.
  • Scheduling – applications and workloads are scheduled based on access patterns or time and resources needed to complete.
  • Quotas – ad-hoc queries on a table shared with other applications can be rate-limited.

In this post, I’ll explain three new HBase mechanisms (see umbrella JIRA HBASE-10994 – HBase Multitenancy) focused on enabling some of the approaches above.


In a multi-tenant environment, it is useful to enforce manual limits that prevent users from abusing the system. (A simple example is: “Let MyApp run as fast as possible and limit all the other users to 100 requests per second.”)

The new throttling feature in CDH 5.2 (HBASE-11598 – Add rpc throttling) allows an admin to enforce a limit on the number of requests, or the amount of data, per unit of time for a specified user, table, or namespace. Some examples are:

  • Throttle Table A to X req/min
  • Throttle Namespace B to Y req/hour
  • Throttle User K on Table Z to KZ MB/sec

An admin can also change the throttle at runtime. The change will propagate after the quota refresh period has expired, which defaults to 5 minutes. This value is configurable by modifying the hbase.quota.refresh.period property in hbase-site.xml. In future releases, a notification will be sent to apply the changes instantly.
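For example, to shorten the refresh period to one minute, the property can be set in hbase-site.xml (the value is in milliseconds; the 1-minute value below is purely illustrative):

```xml
<!-- hbase-site.xml: shorten the quota refresh period to 1 minute.
     The value is in milliseconds; 300000 (5 minutes) is the default. -->
<property>
  <name>hbase.quota.refresh.period</name>
  <value>60000</value>
</property>
```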

In the chart below, you can see an example of the results of throttling. 

Initially, User 1 and User 2 are unlimited; then the admin decides that the User 1 job is more important and throttles the User 2 job, reducing contention with the User 1 requests.

The shell allows you to specify the limit in a descriptive way (e.g. LIMIT => 10req/sec or LIMIT => 50M/sec). To remove the limit, use LIMIT => NONE.


$ hbase shell
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE
hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, TABLE => 't1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => NONE

You can also place a global limit and exclude a user or a table from the limit by applying the GLOBAL_BYPASS property. Consider a situation with a production workload and many ad-hoc workloads. You can choose to set a limit for all the workloads except the production one, reducing the impact of the ad-hoc queries on the production workload.

$ hbase shell
hbase> set_quota NAMESPACE => 'ns1', LIMIT => '100req/min'
hbase> set_quota USER => 'u1', GLOBAL_BYPASS => true

Note that the throttle is always enforced: even when the production workload is inactive, the ad-hoc requests are all throttled.

Request Queues

Assuming no throttling policy is in place, when the RegionServer receives multiple requests, they are now placed into a queue waiting for a free execution slot (HBASE-6721 – RegionServer Group based Assignment).

The simplest queue is a FIFO queue, which means that each request has to wait for the completion of all the requests in the queue before it. And, as you can see from the picture below, fast/interactive queries can get stuck behind large requests. (To keep the example simple, let’s assume that there is a single executor.)

One solution would be to divide the large requests into small requests and interleave each chunk with other requests, allowing multiple requests to make progress. The current infrastructure doesn’t allow that; however, if you can guess how long a request will take to be served, you can reorder requests, pushing the long requests to the end of the queue and allowing short requests to jump in front of longer ones. At some point you still have to execute the large requests, and new requests will then queue behind them. Because those new requests are short, the result is not as bad as the FIFO case, but it is still suboptimal compared to the solution described above, where large requests are split into multiple smaller requests.

Deprioritizing Long-running Scanners

Along the lines of what we described above, CDH 5.2 has a “fifo” queue and a new queue type called “deadline”, configurable by setting the hbase.ipc.server.callqueue.type property (HBASE-10993 – Deprioritize long-running scanners). Currently there is no way to estimate how long each request may take, so de-prioritization only affects scans and is based on the number of “next” calls a scan request has made. This assumes that when you are doing a full table scan, your job is probably not that interactive, so if there are concurrent requests, you can delay long-running scans up to a limit tunable by setting the hbase.ipc.server.queue.max.call.delay property. The slope of the delay is calculated as the square root of (numNextCall * weight), where the weight is configurable by setting the hbase.ipc.server.scan.vtime.weight property.
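That delay policy can be sketched as follows (illustrative Python, not HBase’s actual implementation; the cap value here is arbitrary):

```python
import math

def scan_request_delay(num_next_calls, weight=1.0, max_call_delay=10000):
    """Illustrative model of the "deadline" queue's deprioritization:
    the delay applied to a scan grows as sqrt(numNextCall * weight)
    (cf. hbase.ipc.server.scan.vtime.weight) and is capped by a maximum
    (cf. hbase.ipc.server.queue.max.call.delay). Not HBase source code."""
    return min(math.sqrt(num_next_calls * weight), max_call_delay)
```

A scan that has issued many “next” calls gets pushed further and further back, but never beyond the configured cap, so even a huge scan eventually runs.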

Multiple-Typed Queues

Another way you can prioritize/deprioritize different kinds of requests is by having a specified number of dedicated handlers and queues. That way you can segregate the scan requests in a single queue with a single handler, and all the other available queues can service short Get requests.

Currently, some static tuning options are available to adjust the ipc queues/handlers based on the type of workload. This approach is an interim first step that will eventually allow you to change the settings at runtime as you do for throttling, and to enable dynamically adjusting values based on the load.

Multiple Queues

To avoid contention and separate different kinds of requests, a new property, hbase.ipc.server.callqueue.handler.factor, allows admins to increase the number of queues and decide how many handlers share the same queue (HBASE-11355 – Multiple Queues / Read-Write Queues).

Having more queues, such as one queue per handler, reduces contention when adding a task to a queue or selecting it from a queue. The trade-off is that if you have some queues with long-running tasks, a handler may end up waiting to execute from that queue rather than stealing from another queue which has waiting tasks.
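As a rough illustration of how a factor maps handlers to queues (a sketch of the behavior described above, not HBase source code):

```python
def num_call_queues(handler_count, handler_factor):
    """Map a handler factor (cf. hbase.ipc.server.callqueue.handler.factor)
    to a queue count: 0 means one shared queue, 1 means one queue per
    handler, and values in between divide handlers among fewer queues.
    Illustrative only."""
    return max(1, round(handler_count * handler_factor))
```

With 30 handlers, a factor of 0 yields one shared queue, a factor of 1 yields 30 queues, and a factor of 0.1 yields 3 queues of roughly 10 handlers each.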

Read and Write

With multiple queues, you can now divide read and write requests, giving more priority (queues) to one or the other type. Use the hbase.ipc.server.callqueue.read.ratio property to choose to serve more reads or writes (HBASE-11724 Short-Reads/Long-Reads Queues).

Similar to the read/write split, you can split gets and scans by tuning the hbase.ipc.server.callqueue.scan.ratio to give more priority to gets or to scans. The chart below shows the effect of the settings.

A scan ratio of 0.1 will give more queues/handlers to the incoming gets, which means that more of them can be processed at the same time and that fewer scans can be executed concurrently. A value of 0.9 will give more queues/handlers to scans, so the number of scan requests served will increase and the number of gets will decrease.
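One way to picture the split (illustrative Python; HBase’s exact rounding may differ):

```python
def split_call_queues(total_queues, read_ratio, scan_ratio):
    """Partition call queues into write, get, and scan groups in the
    spirit of hbase.ipc.server.callqueue.read.ratio and
    hbase.ipc.server.callqueue.scan.ratio. Illustrative only."""
    read_queues = max(1, int(total_queues * read_ratio))
    write_queues = total_queues - read_queues
    scan_queues = max(1, int(read_queues * scan_ratio)) if scan_ratio > 0 else 0
    get_queues = read_queues - scan_queues
    return {"write": write_queues, "get": get_queues, "scan": scan_queues}
```

For example, 10 queues with a read ratio of 0.5 and a scan ratio of 0.1 would dedicate 5 queues to writes, 4 to gets, and 1 to scans.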

Future Work

Aside from addressing the current limitations mentioned above (static conf, unsplittable large requests, and so on) and doing things like limiting the number of tables that a user can create or using the namespaces more, a couple of major new features on the roadmap will further improve interaction between multiple workloads:

  • Per-user queues: Instead of a global setting for the system, a more advanced way to schedule requests is to allow each user to have its own “scheduling policy” allowing each user to define priorities for each table, and allowing each table to define request-types priorities. This would be administered in a similar way to throttling.
  • Cost-based scheduling: Request execution can take advantage of the known system state to prioritize and optimize scheduling. For example, one could prioritize requests that are known to be served from cache, prefer concurrent execution of requests that are hitting two different disks, prioritize requests that are known to be short, and so on.
  • Isolation/partitioning: Separating workloads onto different machines is useful in situations where the admin understands the workload of each table and how to manually separate them. The basic idea is to reserve enough resources to run everything smoothly. (The only way to achieve that today is to set up one cluster per use case.)

Based on the above, you should now understand how to improve the interaction between different workloads using this new functionality. Note however that these features are only down payments on what will become more robust functionality in future releases.

Matteo Bertozzi is a Software Engineer at Cloudera and an HBase committer/PMC member.

Categories: Hadoop

This Month in the Ecosystem (November 2014)

Cloudera Blog - Mon, 12/08/2014 - 21:52

Welcome to our 15th edition of “This Month in the Ecosystem,” a digest of highlights from November 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

November was busy, even accounting for the US Thanksgiving holiday:

  • LinkedIn was busy: it open-sourced Cubert, a framework for building data pipelines that includes a new scripting language and runtime. It also described Gobblin, an internal data ingestion system to which it intends to migrate some of its large ingestion processes. 
  • Spotify described how it uses Apache Crunch.
  • Strata + Hadoop World Barcelona 2014 convened, with 1,000+ in attendance. You can watch recorded keynotes here.
  • Pinterest described PinAnalytics, its in-house platform for Big Data analytics that utilizes Apache HBase.
  • Cloudera and Intel described the progress of BigBench, a new industrywide effort to create a sorely needed Big Data benchmark.
  • DataTorrent announced the formation of a new project, KOYA, to add YARN support to Apache Kafka.
  • Cask described Tephra, an open source project that adds “complete” transaction support to HBase (as a first step; support for other NoSQL data stores to be added later).
  • Apache Hadoop 2.6, Apache Hive 0.14, and Apache Pig 0.14 were all released by their respective Apache communities.
  • Apache Drill graduated into a Top Level Project.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

For Apache Hadoop, The POODLE Attack Has Lost Its Bite

Cloudera Blog - Wed, 12/03/2014 - 16:32

A significant vulnerability affecting the entire Apache Hadoop ecosystem has now been patched. What was involved?

By now, you may have heard about the POODLE (Padding Oracle On Downgraded Legacy Encryption) attack on TLS (Transport Layer Security). This attack combines a cryptographic flaw in the obsolete SSLv3 protocol with the ability of an attacker to downgrade TLS connections to use that protocol. The result is that an active attacker on the same network as the victim can potentially decrypt parts of an otherwise encrypted channel. The only immediately workable fix has been to disable the SSLv3 protocol entirely.

POODLE sent many technical people scrambling. Web servers needed configuration changes, software projects using TLS needed to change default behavior, and web browsers moved to phase out SSLv3 support. Cloudera has also taken action.

This blog post provides an overview of the POODLE vulnerability, discusses its impact on Apache Hadoop, and describes the fixes Cloudera pushed forward across the ecosystem.

What is POODLE?

Let’s begin with some background about SSL/TLS terminology: SSL (Secure Sockets Layer) is the former name for what is today called TLS. Between SSLv3 and TLSv1, the protocol was renamed. Even though this happened 15 years ago, the SSL name has stuck around. And even though SSLv3 has long been obsolete, and has been known to have other, lesser, vulnerabilities, its retirement has been drawn out due to the desire to provide backward compatibility for the sake of a smooth user experience.

In the meantime, SSLv3 has been replaced by TLSv1, TLSv1.1, and TLSv1.2. Under normal circumstances, the strongest protocol version that both sides support is negotiated at the start of the connection. However, an active attacker can introduce errors into this negotiation and force a fallback into the weakest protocol version: SSLv3.

POODLE—the attack on SSLv3—was discovered by Bodo Möller, Thai Duong, and Krzysztof Kotowicz at Google. Their report describes how the SSLv3 protocol can be tortured to reveal otherwise encrypted information, one byte at a time. Using the vulnerability, the researchers were able to extract an average of one byte for every 256 SSLv3 connection attempts. This might not sound bad to non-crypto-geeks, but attackers can realistically use it to retrieve session cookies: strings that identify a user in a secure session. If you have a session cookie for someone logged into say, Gmail, you can then gain access to his or her Gmail account.

The attack itself is an excellent piece of work. If you’re interested in more details, I can highly recommend this Imperial Violet blog post and this blog post by Matthew Green. The Wikipedia article on TLS has a huge amount of general information.

Leashing POODLE

One common thread between the Hadoop community and the security research community is the habit of devising creative project names; a clever acronym or portmanteau seems to be at least as valuable as the code or exploits themselves. In that spirit I bring you HADOODLE: fixes for POODLE across the Hadoop ecosystem.

As you all know, the Hadoop platform isn’t one project; rather, it’s a confederation of many different projects, all interoperating and cooperating in the same environment. The word “ecosystem” is overused, but describes the situation perfectly. This can be a great thing, because it lets multiple different groups solve a variety of problems independently of each other. It’s also a great model for fast-paced innovation. However, it can be problematic for the sort of pervasive changes required by security vulnerabilities.

The loose confederation of projects means that there are several different web servers and other usages of TLS. Rounding up and fixing POODLE meant educating and coordinating changes among 12 different projects comprising five different types of web servers and three programming languages. While conceptually simple, “turning off SSLv3” was done slightly differently for each of these technologies and required an awareness of TLS idiosyncrasies. Beyond the full system-level tests (provided by services like Apache Bigtop), every component of the ecosystem needed to be individually scrutinized to ensure that POODLE was really fixed. All in all, it was a fair amount of work.

I’m happy to say that Cloudera took up the challenge, and today we’re able to announce patches for every current release of CDH and Cloudera Manager. Cloudera engineers contributed the following HADOODLE fixes upstream:

Cloudera also has fixes for Apache HBase, Impala, and Cloudera Manager. Every component not mentioned does not yet support TLS and hence is not vulnerable.

The table below shows the releases where the POODLE fixes are first available.

This issue is also described in our Technical Service Bulletin #37 (TSB-37). If you run a secure Hadoop cluster, I strongly recommend upgrading to the appropriate patch release above. 

Michael Yoder is a Software Engineer at Cloudera.

Categories: Hadoop

Tuning Java Garbage Collection for HBase

Cloudera Blog - Tue, 12/02/2014 - 17:40

This guest post from Intel Java performance architect Eric Kaczmarek (originally published here) explores how to tune Java garbage collection (GC) for Apache HBase focusing on 100% YCSB reads.

Apache HBase is an Apache open source project offering NoSQL data storage. Often used together with HDFS, HBase is widely used across the world. Well-known users include Facebook, Twitter, Yahoo, and more. From the developer’s perspective, HBase is a “distributed, versioned, non-relational database modeled after Google’s Bigtable, a distributed storage system for structured data”. HBase can easily handle very high throughput by either scaling up (i.e., deployment on a larger server) or scaling out (i.e., deployment on more servers).

From a user’s point of view, the latency of each individual query matters very much. As we work with users to test, tune, and optimize HBase workloads, we now encounter a significant number who really care about 99th-percentile operation latency: a round trip, from client request to the response back to the client, all within 100 milliseconds.

Several factors contribute to variation in latency. One of the most devastating and unpredictable latency intruders is the Java Virtual Machine’s (JVM’s) “stop the world” pauses for garbage collection (memory clean-up).

To address that, we ran some experiments using the G1 (Garbage-First) collector in Oracle jdk7u21 and jdk7u60. The server system we used was based on Intel Xeon Ivy Bridge EP processors with Hyper-Threading (40 logical processors). It had 256GB DDR3-1600 RAM and three 400GB SSDs as local storage. This small setup contained one master and one slave, configured on a single node with the load appropriately scaled. We used HBase version 0.98.1 and the local filesystem for HFile storage. The HBase test table was configured with 400 million rows and was 580GB in size. We used the default HBase heap strategy: 40% for BlockCache, 40% for Memstore. YCSB was used to drive 600 worker threads sending requests to the HBase server.

The following chart shows jdk7u21 running 100% reads for one hour using -XX:+UseG1GC -Xms100g -Xmx100g -XX:MaxGCPauseMillis=100. We specified the garbage collector to use, the heap size, and the desired garbage collection (GC) “stop the world” pause time.

Figure 1: Wild swings in GC Pause time

In this case, we got wildly swinging GC pauses. The GC pause had a range from 7 milliseconds to 5 full seconds after an initial spike that reached as high as 17.5 seconds.

The following chart shows more details, during steady state:

Figure 2: GC pause details, during steady state

Figure 2 tells us the GC pauses actually come in three different groups: (1) between 1 and 1.5 seconds; (2) between 0.007 and 0.5 seconds; (3) spikes between 1.5 and 5 seconds. This was very strange, so we tested the most recently released jdk7u60 to see if the data would be any different:

We ran the same 100% read tests using exactly the same JVM parameters: -XX:+UseG1GC -Xms100g -Xmx100g -XX:MaxGCPauseMillis=100.

Figure 3: Greatly improved handling of pause time spikes

Jdk7u60 greatly improved G1’s ability to handle pause-time spikes after the initial spike during the settling-down stage. Jdk7u60 made 1,029 young and mixed GCs during a one-hour run, with a GC about every 3.5 seconds. Jdk7u21 made 286 GCs, with a GC about every 12.6 seconds. Jdk7u60 was able to keep pause times between 0.302 and 1 second without major spikes.

Figure 4, below, gives us a closer look at 150 GC pauses during steady state:

Figure 4: Better, but not good enough

During steady state, jdk7u60 was able to keep the average pause time around 369 milliseconds. That was much better than jdk7u21, but it still did not meet our requirement of 100 milliseconds given by -XX:MaxGCPauseMillis=100.

To determine what else we could do to get our 100-millisecond pause time, we needed to understand more about the behavior of the JVM’s memory management and the G1 (Garbage-First) garbage collector. The following figures show how G1 works on Young generation collection.

Figure 5: Slide from the 2012 JavaOne presentation by Charlie Hunt and Monica Beckwith: “G1 Garbage Collector Performance Tuning”

When the JVM starts, based on the JVM launching parameters, it asks the operating system to allocate a big continuous memory chunk to host the JVM’s heap. That memory chunk is partitioned by the JVM into regions.

Figure 6: Slide from the 2012 JavaOne presentation by Charlie Hunt and Monica Beckwith: “G1 Garbage Collector Performance Tuning”

As Figure 6 shows, every object that the Java program allocates using the Java API first comes to the Eden space in the Young generation on the left. After a while, the Eden becomes full, and a Young generation GC is triggered. Objects that still are referenced (i.e., “alive”) are copied to Survivor space. When objects survive several GCs in the Young generation, they get promoted to the Old generation space.

When Young GC happens, the Java application’s threads are stopped in order to safely mark and copy live objects. These stops are the notorious “stop-the-world” GC pauses, which make the applications non-responding until the pauses are over.

Figure 7: Slide from the 2012 JavaOne presentation by Charlie Hunt and Monica Beckwith: “G1 Garbage Collector Performance Tuning”

The Old generation also can become crowded. At a certain level, controlled by -XX:InitiatingHeapOccupancyPercent (where the default is 45% of total heap), a mixed GC is triggered. It collects both the Young gen and the Old gen. The mixed GC pauses are controlled by how long the Young gen takes to clean up when the mixed GC happens.

So we can see that in G1, the “stop the world” GC pauses are dominated by how fast G1 can mark and copy live objects out of the Eden space. With this in mind, we will analyze how the HBase memory allocation pattern can help us tune G1 GC to reach our desired 100-millisecond pause.

In HBase, there are two in-memory structures that consume most of its heap: the BlockCache, which caches HBase file blocks for read operations, and the Memstore, which caches the latest updates.

Figure 8: In HBase, two in-memory structures consume most of its heap.

The default implementation of HBase’s BlockCache is the LruBlockCache, which simply uses a large byte array to host all the HBase blocks. When blocks are “evicted,” the reference to that block’s Java object is removed, allowing the GC to reclaim the memory.
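The eviction idea can be sketched with a toy LRU cache (illustrative Python only; HBase’s LruBlockCache is a far more sophisticated Java implementation, and this class is a hypothetical stand-in):

```python
from collections import OrderedDict

class TinyLruBlockCache:
    """Toy LRU cache illustrating the eviction idea described above:
    dropping the last reference to a block is what lets the garbage
    collector reclaim its memory. Not HBase's LruBlockCache."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.blocks = OrderedDict()   # insertion/access order = recency

    def cache_block(self, key, block):
        self.blocks[key] = block
        self.blocks.move_to_end(key)              # mark as most recent
        while len(self.blocks) > self.capacity:
            self.blocks.popitem(last=False)       # evict least recently used

    def get_block(self, key):
        if key not in self.blocks:
            return None
        self.blocks.move_to_end(key)              # touch on read
        return self.blocks[key]
```

Once `popitem` drops the reference, the block becomes garbage; in the JVM the analogous eviction makes the block collectible on a future Young or mixed GC.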

New objects forming the LruBlockCache and Memstore go to the Eden space of the Young generation first. If they live long enough (i.e., if they are not evicted from the LruBlockCache or flushed out of the Memstore), then after several Young-generation GCs they make their way to the Old generation of the Java heap. When the Old generation’s free space is less than a given threshold (InitiatingHeapOccupancyPercent to start with), mixed GC kicks in and clears out some dead objects in the Old generation, copies live objects from the Young gen, and recalculates the Young gen’s Eden and the Old gen’s HeapOccupancyPercent. Eventually, when HeapOccupancyPercent reaches a certain level, a full GC happens, causing huge “stop the world” pauses to clean up all dead objects inside the Old gen.

After studying the GC log produced by “-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy”, we noticed that HeapOccupancyPercent never grew large enough to induce a full GC during the HBase 100% read test. The GC pauses we saw were dominated by Young gen “stop the world” pauses and the increasing reference processing over time.

Upon completing that analysis, we made three groups of changes in the default G1 GC setting:

  1. Use -XX:+ParallelRefProcEnabled

    When this flag is turned on, GC uses multiple threads to process the increasing references during Young and mixed GC. With this flag for HBase, the GC remarking time is reduced by 75%, and overall GC pause time is reduced by 30%.

  2. Set -XX:-ResizePLAB and -XX:ParallelGCThreads to 8 + (logical processors - 8) * 5/8

    Promotion Local Allocation Buffers (PLABs) are used during Young collection. Multiple threads are used. Each thread may need to allocate space for objects being copied either in Survivor or Old space. PLABs are required to avoid competition of threads for shared data structures that manage free memory. Each GC thread has one PLAB for Survivor space and one for Old space. We would like to stop resizing PLABs to avoid the large communication cost among GC threads, as well as variations during each GC.

    We would like to fix the number of GC threads to be the size calculated by 8+(logical processors-8)(5/8). This formula was recently recommended by Oracle.

    With both settings, we are able to see smoother GC pauses during the run.

  3. Change -XX:G1NewSizePercent default from 5 to 1 for 100GB heap

    Based on the output from -XX:+PrintGCDetails and -XX:+PrintAdaptiveSizePolicy, we noticed the reason for G1’s failure to meet our desired 100-millisecond pause time was the time it took to process Eden: G1 took an average of 369 milliseconds to empty 5GB of Eden during our tests. We then reduced the Eden size, via the -XX:G1NewSizePercent=<positive integer> flag, from 5 down to 1. With this change, we saw GC pause time reduced to 100 milliseconds.

From this experiment, we found that G1’s speed to clean Eden is about 1GB per 100 milliseconds, or 10GB per second, for the HBase setup that we used.

Based on that speed, we can set -XX:G1NewSizePercent=<positive integer> so the Eden size can be kept around 1GB. For example:

  • 32GB heap, -XX:G1NewSizePercent=3
  • 64GB heap, -XX:G1NewSizePercent=2
  • 100GB and above heap, -XX:G1NewSizePercent=1

So our final command-line options for the HRegionServer are:

  • -XX:+UseG1GC
  • -Xms100g -Xmx100g (heap size used in our tests)
  • -XX:MaxGCPauseMillis=100 (desired GC pause time in tests)
  • -XX:+ParallelRefProcEnabled
  • -XX:-ResizePLAB
  • -XX:ParallelGCThreads=28 (from 8+(40-8)(5/8))
  • -XX:G1NewSizePercent=1
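The two derived values above (the Eden percentage and the GC thread count) can be sanity-checked with a few lines of shell arithmetic. This is a sketch using the values from our test setup (100GB heap, 40 logical processors); the round-to-nearest rule for G1NewSizePercent is our assumption, chosen because it reproduces the per-heap-size suggestions above:

```shell
#!/bin/sh
# Values from the test setup in this post; adjust for your own hardware.
HEAP_GB=100        # -Xms/-Xmx in gigabytes
LOGICAL_CPUS=40    # logical processors on the RegionServer host

# Target ~1GB of Eden: percent = round(100 / heap-in-GB), floored at 1.
# (Adding HEAP_GB/2 before integer division rounds to the nearest integer.)
NEW_SIZE_PCT=$(( (100 + HEAP_GB / 2) / HEAP_GB ))
if [ "$NEW_SIZE_PCT" -lt 1 ]; then NEW_SIZE_PCT=1; fi

# Oracle's recommended GC thread count: 8 + (logical processors - 8) * 5/8
GC_THREADS=$(( 8 + (LOGICAL_CPUS - 8) * 5 / 8 ))

echo "-XX:G1NewSizePercent=${NEW_SIZE_PCT} -XX:ParallelGCThreads=${GC_THREADS}"
```

For the 32GB and 64GB heaps listed above, the same rounding gives 3 and 2 respectively.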

Here is the GC pause time chart for running the 100% read operation for 1 hour:

Figure 9: The highest initial settling spikes were reduced by more than half.

In this chart, even the highest initial settling spikes were reduced, from 3.792 seconds to 1.684 seconds, and most of the initial spikes were under 1 second. After settling, GC was able to keep pause time around 100 milliseconds.

The chart below compares jdk7u60 runs with and without tuning, during steady state:

Figure 10: jdk7u60 runs with and without tuning, during steady state.

The simple GC tuning described above yields ideal GC pause times around 100 milliseconds, with an average of 106 milliseconds and a standard deviation of 7 milliseconds.


HBase is a response-time-critical application that requires GC pause time to be predictable and manageable. With Oracle jdk7u60, based on the GC information reported by -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy, we are able to tune the GC pause time down to our desired 100 milliseconds.

Eric Kaczmarek is a Java performance architect in Intel’s Software Solution Group. He leads the effort at Intel to enable and optimize Big Data frameworks (Hadoop, HBase, Spark, Cassandra) for Intel platforms.

Software and workloads used in performance tests may have been optimized for performance only on Intel microprocessors. Performance tests, such as SYSmark and MobileMark, are measured using specific computer systems, components, software, operations and functions. Any change to any of those factors may cause the results to vary. You should consult other information and performance tests to assist you in fully evaluating your contemplated purchases, including the performance of that product when combined with other products.

Intel processor numbers are not a measure of performance. Processor numbers differentiate features within each processor family, not across different processor families. Go to: http://www.intel.com/products/processor_number.

Copyright 2014 Intel Corp. Intel, the Intel logo and Xeon are trademarks of Intel Corporation in the U.S. and/or other countries.

Categories: Hadoop

Apache Hadoop 2.6 is Released

Cloudera Blog - Mon, 12/01/2014 - 18:02

The Apache Hadoop community has voted to release Hadoop 2.6. Congrats to all contributors!

This new release contains a variety of improvements, particularly in the storage layer and in YARN. We’re particularly excited about the encryption-at-rest feature in HDFS!

HDFS


  • Heterogeneous Storage Tiers – Phase 2
  • HDFS-5682 - Application APIs for heterogeneous storage
  • HDFS-7228 - SSD storage tier
  • HDFS-5851 - Memory as a storage tier (beta)
  • HDFS-6584 - Support for Archival Storage
  • HDFS-6134 - Transparent data at rest encryption (beta)
  • HDFS-2856 - Operating secure DataNode without requiring root access
  • HDFS-6740 - Hot swap drive: support add/remove data node volumes without restarting data node (beta)
  • HDFS-6606 - AES support for faster wire encryption

YARN

  • YARN-896 - Support for long-running services in YARN
    • YARN-913 - Service Registry for applications
  • YARN-666 - Support for rolling upgrades
    • YARN-556 - Work-preserving restarts of ResourceManager
    • YARN-1336 - Container-preserving restart of NodeManager
  • YARN-796 - Support node labels during scheduling
  • YARN-1051 - Support for time-based resource reservations in Capacity Scheduler (beta)
  • YARN-1492 - Global, shared cache for application artifacts (beta)
  • YARN-1964 - Support running of applications natively in Docker containers (alpha)

See the Hadoop 2.6.0 Release Notes for details.

Justin Kestelyn is Cloudera’s community outreach director.

Categories: Hadoop

BigBench: Toward An Industry-Standard Benchmark for Big Data Analytics

Cloudera Blog - Tue, 11/25/2014 - 17:49

Learn about BigBench, the new industrywide effort to create a sorely needed Big Data benchmark.

Benchmarking Big Data systems is an open problem. To address this concern, numerous hardware and software vendors are working together to create a comprehensive end-to-end Big Data benchmark suite called BigBench. BigBench builds upon and borrows elements from existing benchmarking efforts in the Big Data space (such as YCSB, TPCx-HS, GridMix, PigMix, HiBench, Big Data Benchmark, and TPC-DS). Intel and Cloudera, along with other industry partners, are working to define and implement extensions to BigBench 1.0. (A TPC proposal for BigBench 2.0 is in the works.)

BigBench Overview

BigBench is a specification-based benchmark with an open-source reference implementation kit, which sets it apart from its predecessors. As a specification-based benchmark, it would be technology-agnostic and provide the necessary formalism and flexibility to support multiple implementations. As a “kit”, it would lower the barrier of entry to benchmarking by providing a readily available reference implementation as a starting point. As open source, it would allow multiple implementations to co-exist in one place and be reused by different vendors, while providing the consistency needed for meaningful comparisons.

The BigBench specification comprises two key components: a data model specification, and a workload/query specification. The structured part of the BigBench data model is adopted from the TPC-DS data model depicting a product retailer, which sells products to customers via physical and online stores. BigBench’s schema uses the data of the store and web sales distribution channel and augments it with semi-structured and unstructured data as shown in Figure 1.  


Figure 1: BigBench data model specification

The data model specification is implemented by a data generator based on an extension of PDGF. Plugins for PDGF enable data generation for an arbitrary schema. Using the BigBench plugin, data can be generated for all three parts of the schema: structured, semi-structured, and unstructured.

The BigBench 1.0 workload specification consists of 30 queries/workloads. Ten of these queries were taken from the TPC-DS workload and run against the structured part of the schema. The remaining 20 were adapted from a McKinsey report on Big Data use cases and opportunities; seven of these run against the semi-structured portion and five against the unstructured portion of the schema. The reference implementation of the workload specification is available here.

BigBench 1.0 specification includes a set of metrics (focused around execution time calculation) and multiple execution modes. The metrics can be reported for the end-to-end execution pipeline as well as each individual workload/query. The benchmark also defines a model for submitting concurrent workload streams in parallel, which can be extended to simulate the multi-user scenario.

BigBench Extensions: Toward BigBench 2.0

BigBench has some way to go before it can be declared complete. A work-in-progress paper about BigBench, co-authored by various industry and academic experts, discusses the reference implementation, community feedback on what was done well, and shortcomings of the 1.0 specification and implementation. These concerns are addressed in the form of proposed extensions for BigBench 2.0, some of which are described below.

The current specification, while representative of a wide variety of big data use cases, falls short of being complete—primarily because it is structured-data-intensive, with some representation for semi-structured and unstructured content (which also gets formatted into structured data before being processed). By adding more procedural and analytic workloads that perform complex operations directly on unstructured data, the lopsidedness can be eliminated. This approach is also closely tied to the data specification model, which currently doesn’t state the rate of input data generation and refresh, thereby excluding streaming workloads from the current specification.

The specification also needs to be extended to enforce that all file formats demonstrate sufficient flexibility to be created, read, and written from multiple popular data processing engines (MapReduce, Apache Spark, Apache Hive, and so on). This capability would ensure that all data is immediately query-able with no ETL or format-conversion delays.  

The current set of metrics excludes performance per cost. As our experiments show, this metric is critical for comparing software systems in the presence of hardware diversity. Performance subject to failures is another important metric currently missing in the specification. On the implementation side, the kit needs to be enhanced with implementation of existing and proposed queries over popular data processing engines, such as Impala, Spark, and MapReduce in addition to Hive.

Results for BigBench

Intel has evaluated a subset of BigBench 1.0 workloads against multiple hardware configurations. The goals of these experiments were to:

  • Validate a reference implementation of BigBench 1.0 on the latest Intel hardware using CDH (CDH 5.1)
  • Understand the price/performance trade-off for different configurations
  • Highlight price/performance as the key metric

We selected four queries (2, 16, 20, 30), each representing a different processing model (for query descriptions, please see https://github.com/intel-hadoop/Big-Bench). A common input dataset of 2TB was used. Input data sizes for each of the four workloads (2, 16, 20, 30) were 244GB, 206GB, 144GB, and 244GB respectively. We used Intel’s Performance Analysis Tool (PAT) to collect and analyze data. (PAT automates benchmark execution and data collection and makes use of Linux performance counters.)

The table below shows the three different configurations under experimentation. Each of the three clusters contained 10 nodes. For Configuration 1, we selected Intel Xeon E5 2680 v3 with 12 cores, 30MB cache, 2.5GHz (3.3 GHz @Turbo) and operating TDP of 120W. 800GB DC3500 series SSD was used as the boot drive. For primary storage, 12x 2TB SATA drives were used.

In Configuration 2, Intel Xeon E5 2680 v3 was replaced with E5 2697 v3 (14 cores, 35MB cache, 2.6GHz with 3.6 GHz@Turbo, and 145W TDP). A 2TB Intel DC3700 SSD was added as primary storage alongside the hard-disk drives. Concerns with regard to endurance and affordability of a large capacity NAND drive have prevented customers from embracing SSD technology for their Hadoop clusters; this Cloudera blog post explains the merits of using SSD for performance and the drawbacks in terms of cost. However, SSD technology has advanced significantly since the blog was published. For example, capacity and endurance have improved and the price of Intel DC3600 SSD is now one-third the price of SSD reported in that post.

In Configuration 3, we replaced Intel Xeon E5 2697 v3 with Intel Xeon E5 2699 v3 (18 cores, 45MB cache, 2.3GHz with 3.6 Ghz@Turbo, and 145W TDP). The hard-disk drives were replaced with a second SSD drive. Memory was increased from 128GB to 192GB.

As shown in the table above, Configuration 2 costs 1.5x of Configuration 1, and Configuration 3 costs around 2x of Configuration 1. A summary of results from the experiments is shown in Figure 2.

Figure 2: BigBench testing results

For workloads #16 and #30, the performance gains from Configurations 2 and 3 are strictly proportional to the cost of the hardware: the customer pays 2x and gets 2x performance, so the performance-per-dollar ratio is close to 1. For workloads #2 and #20, however, the performance-per-dollar ratio is less than 1. From these results we can conclude that Configuration 1 is a good choice in all cases—whereas the scaled-up Configurations 2 and 3 make sense for certain types of workloads, especially those that are disk-IO intensive.

We monitored CPU utilization, disk bandwidth, network IO and memory usage for each workload (using PAT).  Most of the gains come from the use of SSDs. In the presence of SSDs, the workloads tend to become CPU-bound at which point an increase in CPU cores and frequency starts to help. For in-memory data processing engines (Spark and Impala), an increase in memory size is likely to be the most important factor. We hope to cover that issue in a future study.


BigBench is an industrywide effort to create a comprehensive and standardized Big Data benchmark. Intel and Cloudera are working on defining and implementing extensions to BigBench (in the form of BigBench 2.0). A preliminary validation against multiple cluster configurations using the latest Intel hardware shows that, from a price-performance viewpoint, scaled-up configurations (that use SSDs and high-end Intel processors) are beneficial for workloads that are disk-IO bound.


BigBench is a joint effort with partners in industry and academia. The authors would like to thank Chaitan Baru, Milind Bhandarkar, Alain Crolotte, Carlo Curino, Manuel Danisch, Michael Frank, Ahmed Ghazal, Minqing Hu, Hans-Arno Jacobsen, Huang Jie, Dileep Kumar, Raghu Nambiar, Meikel Poess, Francois Raab, Tilmann Rabl, Kai Sachs, Saptak Sen, Lan Yi, and Choonhan Youn. We invite the rest of the community to participate in the development of the BigBench 2.0 kit.

Bhaskar Gowda is a Systems Engineer at Intel Corp.

Nishkam Ravi is a Software Engineer at Cloudera.

Software and workloads used in performance tests may have been optimized for performance only on Intel microprocessors. Performance tests, such as SYSmark and MobileMark, are measured using specific computer systems, components, software, operations and functions. Any change to any of those factors may cause the results to vary. You should consult other information and performance tests to assist you in fully evaluating your contemplated purchases, including the performance of that product when combined with other products.

Intel processor numbers are not a measure of performance. Processor numbers differentiate features within each processor family, not across different processor families. Go to: http://www.intel.com/products/processor_number.

Intel, the Intel logo and Xeon are trademarks of Intel Corporation in the U.S. and/or other countries.

Categories: Hadoop

Apache Hive on Apache Spark: The First Demo

Cloudera Blog - Fri, 11/21/2014 - 13:06

The community effort to make Apache Spark an execution engine for Apache Hive is making solid progress.

Apache Spark is quickly becoming the programmatic successor to MapReduce for data processing on Apache Hadoop. Over the course of its short history, it has become one of the most popular projects in the Hadoop ecosystem, and is now supported by multiple industry vendors—ensuring its status as an emerging standard.

Two months ago, Cloudera, Databricks, IBM, Intel, MapR, and others came together to port Apache Hive and the other batch processing engines to Spark. In October at Strata + Hadoop World New York, Hive-on-Spark project lead Xuefu Zhang shared the project status and provided a demo of our work. The same week at the Bay Area Hadoop User Group, Szehon Ho discussed the project and demoed the work completed. Additionally, Xuefu and Suhas Satish will be speaking about Hive on Spark at the Bay Area Hive User Group on Dec. 3.

The community has committed more than 140 changes to the Spark branch as part of HIVE-7292 – Hive on Spark. We are proud to say that queries are now functionally able to run, as you can see in the demo below of a multi-node Hive-on-Spark query (query 28 from TPC-DS with a scale factor of 20 on a TPC-DS derived dataset).

This demo is intended to illustrate our progress toward porting Hive to Spark, not to compare Hive-on-Spark performance versus other engines. The Hive-on-Spark team is now focused on additional join strategies, like Map-side joins, statistics, job monitoring, and other operational aspects. As these pieces come together, we’ll then shift our focus to the performance tuning and optimization needed prior to general release.

The Hive and Spark communities have worked closely together to make this effort possible. In order to support Hive on Spark, Spark developers have provided enhancements including a MapReduce-style shuffle transformation, removal of Guava from the public API, and improvements to the Java version of the Spark API. Among other enhancements in progress, the Spark community is working hard to provide elastic scaling within a Spark application. (Elastic Spark application scaling is a favorite request from longtime Spark users.) Given all the enhancements Hive on Spark is driving within Spark, the Hive-on-Spark project is turning out to be beneficial for both communities.

We look forward to providing another update in a few months. Until then, please enjoy the demo video!

Finally, a big thanks to the project team members: Chao Sun, Chengxiang Li, Chinna Rao Lalam, Jimmy Xiang, Marcelo Vanzin, Na Yang, Reynold Xin, Rui Li, Sandy Ryza, Suhas Satish, Szehon Ho, Thomas Friedrich, Venki Korukanti, and Xuefu Zhang.

Brock Noland is a Software Engineer at Cloudera and an Apache Hive committer/PMC member.

Categories: Hadoop

Guidelines for Installing CDH Packages on Unsupported Operating Systems

Cloudera Blog - Tue, 11/18/2014 - 17:04

Installing CDH on newer unsupported operating systems (such as Ubuntu 13.04 and later) can lead to conflicts. These guidelines will help you avoid them.

Some of the more recently released operating systems that bundle portions of the Apache Hadoop stack in their respective distro repositories can conflict with software from Cloudera repositories. Consequently, when you set up CDH for installation on such an OS, you may end up picking up packages with the same name from the OS’s distribution instead of Cloudera’s distribution. Package installation may succeed, but using the installed packages may lead to unforeseen errors. 

If you are manually installing (via apt-get, yum, or Puppet) packages for Ubuntu 14.04 with CDH 5.2.0 or later (which is supported by Cloudera), this issue does not pertain to you—Cloudera Manager takes care of the necessary steps to avoid conflicts. Furthermore, if you are using CDH parcels instead of packages for any release or OS, conflicts are similarly not an issue.

If, however, you are either:

  • Manually installing CDH packages (any release) on a newer unsupported OS (such as Ubuntu 13.04, Ubuntu 13.10, Debian 7.5, Fedora 19, and Fedora 20–refer to the CDH 5 Requirements and Supported Versions guide for an up-to-date list of supported OSs), or
  • Manually installing CDH packages for a release earlier than CDH 5.2.0 on Ubuntu 14.04

then you should find the following good-faith installation guidelines helpful.

(Note: If you are installing CDH 5.2 packages manually on a supported OS like Ubuntu 14.04, the documentation lists the necessary steps you need to take. However, you may still find this blog post useful as background reading.)

The Problem in Action

As explained above, if you are mixing-and-matching packages between distributions, you may easily end up with misleading errors.

For example, here is an error when running hbase shell on Ubuntu 13.04 with CDH 5. In this case, the zookeeper package is installed from the OS repositories (in this case, Ubuntu 13.04) whereas the hbase package is installed from CDH.

$ hbase shell
2014-09-08 13:27:51,017 INFO  [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.98.1-cdh5.1.2, rUnknown, Mon Aug 25 19:33:59 PDT 2014

hbase(main):001:0> status
ERROR: cannot link Java class org.apache.hadoop.hbase.client.HBaseAdmin, probable missing dependency: org/apache/zookeeper/KeeperException

Does My OS Have This Problem?

In the table below, you will find the mapping of various operating systems and the conflicting packages.

Red means a conflict exists: Installing packages today on the OS would install some package(s) from the OS repo instead of the CDH repo, or worse, have a mix of packages from OS and CDH repo. The value of the field represents the package that will be installed from the OS. For example, OS zookeeper refers to the fact that the zookeeper package would be installed from the OS instead of the CDH repository, which will cause issues.

Orange means no conflict currently exists but that one could arise if the OS repo decides to bump up or change the package version.

If you are using a problematic OS, you will find the solution in the next section.

(Note: Even though Ubuntu 14.04 is listed as a “problematic” OS in the above table, the solution described below is already implemented in Cloudera Manager and described in the documentation. You don’t have to do anything extra if you are using Cloudera Manager or simply following the documentation.)


The best way to fix this problem is to ensure that all the packages come from the same CDH repository. The OS repository is added by default, and it’s usually not a good idea to disable it. You can, however, set the priority of the CDH repo higher than that of the default OS repo. Consequently, if a package with the same name exists in both the CDH repo and the default OS repo, the package from the CDH repository takes precedence over the one in the OS repository, regardless of which has the higher version. This concept is generally referred to as pinning.

For Debian-based OSs (e.g. Ubuntu 13.04, Ubuntu 13.10, Debian 7.5)

Create a file at /etc/apt/preferences.d/cloudera.pref with the following contents:

Package: *
Pin: release o=Cloudera, l=Cloudera
Pin-Priority: 501

No apt-get update is required after creating this file.

For those curious about this solution, the default priority of packages is 500. By creating the file above, you provide a higher priority of 501 to any package that has origin specified as “Cloudera” (o=Cloudera) and is coming from Cloudera’s repo (l=Cloudera), which does the trick.
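If you are applying this pin across many hosts, writing the file can be wrapped in a small helper. This is our sketch, not Cloudera tooling; the destination path is parameterized only so the function can be exercised outside /etc:

```shell
#!/bin/sh
# Write the Cloudera apt pin: priority 501 outranks the default of 500
# that ordinary OS repositories receive.
write_cloudera_pin() {
  # $1: destination file (normally /etc/apt/preferences.d/cloudera.pref)
  cat > "$1" <<'EOF'
Package: *
Pin: release o=Cloudera, l=Cloudera
Pin-Priority: 501
EOF
}
```

Usage: `write_cloudera_pin /etc/apt/preferences.d/cloudera.pref`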

For RPM-based OSs (such as Fedora 19 and Fedora 20)

Install the yum-plugin-priorities package by running:

sudo yum install yum-plugin-priorities

This package enables us to use yum priorities, which you will see in the next step.

Then, edit the relevant cloudera-cdh*.repo file under /etc/yum.repos.d/ and add this line at the bottom of that file:

priority = 98

The default priority for all repositories (including the OS repository) is 99, and a lower value means higher precedence in RHEL/CentOS. By setting the priority to 98, we give the Cloudera repository precedence over the OS repository.
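Editing each repo file by hand scales poorly across a fleet of machines, so the edit can also be scripted. A sketch follows; the glob matches the cloudera-cdh*.repo naming used above, and the directory is parameterized only for testability (normally it would be /etc/yum.repos.d):

```shell
#!/bin/sh
# Append "priority = 98" to every Cloudera CDH repo file that does not
# already set a priority (for yum, a lower number means higher precedence).
add_cdh_priority() {
  # $1: directory holding the repo files (normally /etc/yum.repos.d)
  for repo in "$1"/cloudera-cdh*.repo; do
    [ -e "$repo" ] || continue          # glob matched nothing
    if ! grep -q '^priority' "$repo"; then
      printf 'priority = 98\n' >> "$repo"
    fi
  done
}
```

The `grep` guard makes the script idempotent: running it twice does not append a second priority line.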

For OSs Not on the List

In general, you will have a problem if the OS repository and the CDH repository provide a package with the same name. The most common conflicting packages are zookeeper and hadoop-client, so as a start you need to ascertain whether there is more than one repository delivering those packages.

On a Debian-based system, you can run something like apt-cache policy zookeeper. That command will list all the repositories where the package zookeeper is available. For example, here is the result of running apt-cache policy zookeeper on Ubuntu 13.04:

root@ip-172-26-1-209:/etc/apt/sources.list.d# apt-cache policy zookeeper
zookeeper:
  Installed: (none)
  Candidate: 3.4.5+dfsg-1~exp2
  Version table:
     3.4.5+dfsg-1~exp2 0
        500 http://us-west-1.ec2.archive.ubuntu.com/ubuntu/ raring/universe amd64 Packages
     3.4.5+cdh5.1.0+28-1.cdh5.1.0.p0.208~precise-cdh5.1.0 0
        500 http://archive.cloudera.com/cdh5/ubuntu/precise/amd64/cdh/ precise-cdh5/contrib amd64 Packages

As you can see, the package zookeeper is available from two repositories: Ubuntu’s Raring Universe Repository and the CDH repository. So, you have a problem.
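One way to script this check is to count how many repository source lines `apt-cache policy` reports for the package; more than one means a potential conflict. A sketch (the helper name is ours; it simply counts the URL-bearing lines in the output format shown above):

```shell
#!/bin/sh
# Reads `apt-cache policy <package>` output on stdin and prints the number
# of repository source lines (each such line carries a repository URL).
count_package_repos() {
  grep -c '://'
}

# Example: apt-cache policy zookeeper | count_package_repos
# A result greater than 1 means the package is offered by multiple repos.
```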

On a yum-based system, you can run something like yum whatprovides hadoop-client. That command will list all the repositories where the hadoop-client package is available. For example, here is the result from Fedora 20:

$ yum whatprovides hadoop-client
Loaded plugins: priorities
cloudera-cdh4
hadoop-client-2.0.0+1604-1.cdh4.7.0.p0.17.el6.x86_64 : Hadoop client side dependencies
Repo : cloudera-cdh4
hadoop-client-2.2.0-1.fc20.noarch : Libraries for Hadoop clients
Repo : fedora
hadoop-client-2.2.0-5.fc20.noarch : Libraries for Apache Hadoop clients
Repo : updates

As you can see, the package hadoop-client is available from multiple repositories: Fedora 20 repositories and the CDH repository. Again, that’s a problem.


Managing package repositories that deliver conflicting packages can be tricky. You have to take the above steps on affected operating systems to avoid any conflicts.

To re-iterate, this issue is mostly contained to the manual use of packages on unsupported OSs:

  • If you are using parcels, you don’t have to worry about such problems. On top of that you get easy rolling upgrades.
  • If you are installing packages via Cloudera Manager, you don’t have to worry about such problems since Cloudera Manager takes care of pinning.
  • If the preceding points don’t apply to you, follow the instructions in this blog post to ensure there are no conflicts between CDH and OS packages.

Mark Grover is a Software Engineer on Cloudera Engineering’s Packaging and Integration team, an Apache Bigtop committer, and a co-author of the O’Reilly Media book, Hadoop Application Architectures.

Categories: Hadoop

How Apache Sqoop 1.4.5 Improves Oracle Database/Apache Hadoop Integration

Cloudera Blog - Fri, 11/14/2014 - 16:44

Thanks to Guy Harrison of Dell Inc. for the guest post below about time-tested performance optimizations for connecting Oracle Database with Apache Hadoop that are now available in Apache Sqoop 1.4.5 and later.

Back in 2009, I attended a presentation by a Cloudera employee named Aaron Kimball at the MySQL User Conference in which he unveiled a new tool for moving data from relational databases into Hadoop. This tool was to become, of course, the now very widely known and beloved Sqoop!

As someone who had worked with relational databases—particularly Oracle Database—for more than 20 years and who was becoming very interested in Hadoop, Sqoop immediately piqued my interest. A general-purpose tool for moving data between RDBMS and Hadoop obviously seemed important, but experience had already taught me that generic database access methods are rarely optimal for a specific database because each RDBMS implements its own fast-path access methods to gain competitive advantage. It instantly occurred to me that using these fast-path methods was clearly the key to making Sqoop not just a good general-purpose tool but also a high-performance solution.

For MySQL, Aaron had already integrated a fast-path export based on the mysqldump utility. I wondered if my team at Quest Software (now part of Dell Software) could create a fast-path access method for Oracle. 

OraOop Beginnings

When we looked at Sqoop, we found a few significant opportunities for optimization. The most significant involved how Sqoop distributes work across mapper tasks. When parallelizing across a large table, Sqoop uses primary-key ranges to break up the work. Each mapper runs a query to grab a range of primary key values. This approach can lead to a few undesirable outcomes in Oracle Database, for example:

  • The Oracle optimizer uses cost-based algorithms to decide between primary key index lookups and full table scans. Sometimes, Oracle will execute each primary key range lookup by a full table scan—resulting in multiple scans hitting the Oracle database simultaneously.
  • Oracle Database may even decide to use its own parallel query capability to parallelize individual mapper tasks—increasing the load on the database and disturbing the relative load created by each mapper.
  • In Oracle Database, the physical location of data is not normally dictated by primary key—so a physical block on disk may contain data required by multiple mappers. Consequently, individual blocks on disk would each be accessed multiple times, creating excessive IO.
  • When the Oracle Database optimizer decides to use index scans, the RDBMS will load the resulting data into its buffer cache—driving out blocks required for normal database operation with blocks that are only needed by Sqoop.
  • Primary keys are often not uniformly distributed: as older rows are deleted, the older primary key ranges become sparse. Sqoop’s algorithms could not take this into account and often one mapper would have many more rows to process than another. 

Thus for our initial implementation of a Sqoop connector (informally called “OraOop”), we had the following design goals:

  • Partition data to the mappers based on physical storage characteristics so that each mapper is allocated a completely distinct set of physical blocks and no physical block is read more than once.
  • Make sure each mapper receives an equal amount of work.
  • Bypass Oracle Database parallelism and the Oracle buffer cache.
  • Neither require nor use indexes.
  • Use Oracle “direct path” IO.

The first release of OraOop in 2010 achieved these goals, and as Sqoop added bi-directional transfer capabilities, we performed similar optimizations for moving data from Hadoop to Oracle, including exploiting the Oracle direct-path insert capability, utilizing Oracle partitions to maximize parallelism, and adding a MERGE capability that simultaneously updates and inserts into the target Oracle tables.

Despite the free availability of our Apache-licensed connector, many Sqoop users understandably continued to use default Sqoop, sometimes with disappointing outcomes. So in early 2014 Dell, Cloudera, and others in the Apache Sqoop community collaborated to bring the OraOop code directly into core Sqoop. We are all very happy to see this functionality now fully integrated into Sqoop, starting with release 1.4.5 (packaged in CDH 5.1 and later).

Assuming you use the direct=true clause on your Sqoop command line, all the optimizations outlined above will be employed. (Use of this clause does require that the Oracle account have the privileges required to read the Oracle extent information; it is not supported for views or index-organized tables.) There is no longer any need to install an additional Oracle connector.
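For example, a direct-mode import might look like the following (the connection string, username, and table name here are placeholders of our invention, not values from the post):

```
sqoop import \
  --direct \
  --connect jdbc:oracle:thin:@//oradb.example.com:1521/ORCL \
  --username SQOOP_DEMO -P \
  --table SALES_HISTORY \
  --target-dir /data/sales_history \
  --num-mappers 8
```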

Performance Results

The chart below illustrates a typical reduction in Oracle Database overhead when using the new Sqoop 1.4.5 -direct=true setting (import of a 310GB, 1-billion row table). The test platform had the following characteristics:

  • 9-node cluster on AWS EC2 (1 x NameNode/JobTracker, 8 x DataNode/TaskTracker), each with 8 CPUs and 15GB RAM, running CDH 5.1
  • 1 32-CPU database server with 60GB RAM, running Oracle Database
  • 10Gb Ethernet on the database server, 1Gb Ethernet on the Hadoop nodes

As you can see, overall elapsed time was reduced by 83%, database time (the total time consumed by all database operations) by 90%, and IO requests by 99%! The next chart shows the scalability profile of the direct mode compared to the non-direct mode (the only option available for Oracle Database transfers with Sqoop prior to 1.4.5).

We can draw these conclusions:

  • When data is un-clustered (arranged on disk in effectively random order), the non-direct mode scales very poorly (blue line): the index-range scans do not result in contiguous IO requests on disk, so each mapper performs a great deal of duplicate IO. As the number of mappers increases, this IO load overwhelms the database and elapsed time degrades.
  • When data is highly clustered (green line), performance scales better because each mapper’s index-range scan accesses distinct blocks of data on disk.
  • The new Sqoop direct mode (red line) is unaffected by the clustering of primary keys because it always reads blocks directly from disk in physically contiguous ranges. The absolute improvement in elapsed time varies, but the direct mode is generally 5 to 20 times faster than the non-direct mode.

As you can see, the performance benefits of these optimizations, and the reduction in load on the Oracle side, are substantial. If you are performing data transfers between Oracle Database and Hadoop, we encourage you to try out the new Sqoop 1.4.5 direct mode.

Guy Harrison (@guyharrison) is an Executive Director of Research and Development at the Dell Software Group. Guy is the author of five books and many articles on database and data management and writes a monthly column for Database Trends and Applications.

Categories: Hadoop

The Story of the Cloudera Engineering Hackathon (2014 Edition)

Cloudera Blog - Tue, 11/11/2014 - 22:07

Cloudera’s culture is premised on innovation and teamwork, and there’s no better example of them in action than our internal hackathon.

Cloudera Engineering doubled down on its “hackathon” tradition last week, with this year’s edition taking an around-the-clock approach thanks to the HQ building upgrade since the 2013 edition (just look at all that space!).

This year, Cloudera software engineers had 24 straight hours to conceive, build, and present their hacks to a panel of celebrity judges. Thanks to a steady supply of solid- and liquid-state fuel, stimulation, and all-around great attitude, pet projects like OYA (“Oozie squeezing the hell out of YARN”), Cloudera Vacuum, HiveKa (Apache Hive-on-Apache Kafka), YARN-on-Mesos, and the Cloudera Data Science Sports Division saw the light of day, and there was much rejoicing.

And now, the Cloudera Engineering Hackathon 2014 in pictures (most photos by Jarcec Cecho):

 The tone was set early as peeps arrived:

Caffeine as a Service (CaaS) is always an important ingredient:

The ingredients didn’t stop at caffeine though:

With tanks full, it was time to start hacking…

And hacking…

And hacking some more (SO serious).

I’m pretty sure this photo was taken at 2 or 3am (but not by me!). Is it obvious?

The presentations were grueling for everyone involved:

With our extra-intimidating judges involved, how could they not be?

But when all was said and done, the winners were ecstatic:


And that’s the story of the Cloudera Engineering Hackathon (2014 Edition).

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

How Cerner Uses CDH with Apache Kafka

Cloudera Blog - Tue, 11/11/2014 - 16:52

Our thanks to Micah Whitacre, a senior software architect on Cerner Corp.’s Big Data Platforms team, for the post below about Cerner’s use case for CDH + Apache Kafka. (Kafka integration with CDH is currently incubating in Cloudera Labs.)

Over the years, Cerner Corp., a leading Healthcare IT provider, has utilized several of the core technologies available in CDH, Cloudera’s software platform containing Apache Hadoop and related projects—including HDFS, Apache HBase, Apache Crunch, Apache Hive, and Apache Oozie. Building upon those technologies, we have been able to architect solutions to handle our diverse ingestion and processing requirements.

At various points, however, we reached certain scalability limits and perhaps even abused the intent of certain technologies, causing us to look for better options. By adopting Apache Kafka, Cerner has been able to solidify our core infrastructure, utilizing those technologies as they were intended.

One of the early challenges Cerner faced when building our initial processing infrastructure was moving from batch-oriented processing to technologies that could handle a streaming near-real-time system. Building upon the concepts in Google’s Percolator paper, we built a similar infrastructure on top of HBase. Listeners interested in data of specific types and from specific sources would register interest in data written to a given table. For each write performed, a notification for each applicable listener would be written to a corresponding notification table. Listeners would continuously scan a small set of rows on the notification table looking for new data to process, deleting the notification when complete.
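The pattern is easier to see in miniature. Here is a conceptual sketch (not Cerner’s actual code) of the notification-table approach, with Python dicts standing in for HBase tables:

```python
# Percolator-style notifications: every write fans out one notification per
# registered listener; listeners scan for notifications and delete them
# once the corresponding payload has been processed.
data_table = {}
notification_table = {}                    # row_key -> set of pending listener names
listeners = {"indexer": [], "auditor": []} # listener name -> payloads it has seen

def write(row_key, payload):
    data_table[row_key] = payload
    for name in listeners:                 # fan out one notification per listener
        notification_table.setdefault(row_key, set()).add(name)

def poll(listener_name):
    processed = []
    for row_key in list(notification_table):   # "scan" the notification table
        pending = notification_table[row_key]
        if listener_name in pending:
            listeners[listener_name].append(data_table[row_key])
            pending.discard(listener_name)     # "delete" the notification
            if not pending:
                del notification_table[row_key]
            processed.append(row_key)
    return processed
```

Every poll is a scan, and every completed notification is a delete, which is exactly the read/delete churn that later forced the frequent compactions described below.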

Our low-latency processing infrastructure worked well for a time, but quickly reached scalability limits rooted in its use of HBase. Listener scan performance would degrade without frequent compactions to remove deleted notifications, and during those frequent compactions performance would degrade further, causing severe drops in processing throughput. Processing required frequent reads from HBase to retrieve the notification, the payload, and often supporting information from other HBase tables. The high number of reads would often contend with writes from our processing infrastructure, which was writing transformed payloads and additional notifications for downstream listeners. The I/O contention and the compaction needs required careful management to distribute the load across the cluster, often segregating the notification tables on isolated RegionServers.

Adopting Kafka was a natural fit for reading and writing notifications. Instead of scanning rows in HBase, a listener would process messages off of a Kafka topic, updating its offset as notifications were successfully processed. 

Kafka’s natural separation of producers and consumers eliminated contention at the HBase RegionServer due to the high number of notification read and write operations. Kafka’s consumer offset tracking helped to eliminate the need for notification deletes, and replaying notifications became as simple as resetting the offset in Kafka. Offloading the highly transient data from HBase greatly reduced unnecessary overhead from compactions and high I/O. 
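A toy model (illustrative Python, not Cerner’s code or the real Kafka API) shows why the switch helped: notifications live in an append-only log, each consumer tracks only an offset, nothing is ever deleted, and replay is just a matter of rewinding that offset.

```python
log = []                       # stands in for a Kafka topic partition
offsets = {"indexer": 0}       # per-consumer committed offsets

def publish(notification):
    log.append(notification)

def consume(consumer):
    batch = log[offsets[consumer]:]   # read everything past the committed offset
    offsets[consumer] = len(log)      # "commit" after successful processing
    return batch

def replay(consumer, offset=0):
    offsets[consumer] = offset        # replaying = resetting the offset

publish("row-1")
publish("row-2")
first = consume("indexer")     # processes both notifications
replay("indexer")              # rewind to reprocess from the beginning
again = consume("indexer")     # same notifications again, with no deletes involved
```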

Building upon the success of Kafka-based notifications, Cerner then explored using Kafka to simplify and streamline data ingestion. Cerner systems ingest data from multiple disparate sources and systems, many of them external to our data centers. The “Collector,” a secured HTTP endpoint, identifies and namespaces the data before it is persisted. Prior to utilizing Kafka, our data ingestion infrastructure targeted a single data store, such as an HBase cluster.

The system satisfied our initial use cases, but as our processing needs changed, so did the complexity of our data ingestion infrastructure. Data often needed to be ingested into multiple clusters in near real time, and not all data needed the random read/write functionality of HBase.

Utilizing Kafka in our ingestion platform helped provide a durable staging area, giving us a natural way to broadcast the ingested data to multiple destinations. The collector process stayed simple by persisting data into Kafka topics, segregated by source. Pushing data to Kafka resulted in a noticeable improvement as the uploading processes were no longer subject to intermittent performance degradations due to compaction or region splitting with HBase.

After data lands in Kafka, Apache Storm topologies push it to the consuming clusters independently. Kafka and Storm allow the collector process to remain simple by eliminating the need to deal with multiple writes or the performance impact of the slowest downstream system. Storm’s at-least-once delivery guarantee is acceptable because persistence of the data is idempotent.

The separation that Kafka provides also allows us to aggregate the data for processing as necessary. Some medical data feeds produce a high volume of small payloads that only need to be processed through batch methods such as MapReduce. LinkedIn’s Camus project allows our ingestion platform to persist batches of small payloads within Kafka topics into larger files in HDFS for processing. In fact, all the data we ingest into Kafka is archived into HDFS as Kite SDK Datasets using the Camus project. This approach gives us the ability to perform further analytics and processing that do not require low-latency processing on that data. Archiving the data also provides a recovery mechanism in case data delivery lags beyond the topic retention policies of Kafka.
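The batching idea can be sketched as follows; the function name and the size threshold are illustrative, not Camus’s actual interface:

```python
# Roll many small payloads into fewer, larger files before landing in HDFS.
# The 128 MB default is an illustrative target, roughly one HDFS block.
def roll_into_files(payloads, target_bytes=128 * 1024 * 1024):
    files, current, size = [], [], 0
    for payload in payloads:
        current.append(payload)
        size += len(payload)
        if size >= target_bytes:     # close the file once it is "big enough"
            files.append(current)
            current, size = [], 0
    if current:                      # flush whatever is left at the end
        files.append(current)
    return files
```

Rolling small payloads into fewer large files keeps file-count and NameNode overhead down, which is why batch archiving pairs naturally with high-volume, small-message topics.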

Cerner’s use of Kafka for ingesting data will allow us to continue to experiment and evolve our data processing infrastructure when new use cases are discovered. Technologies such as Spark Streaming, Apache Samza (incubating), and Apache Flume can be explored as alternatives or additions to the current infrastructure. Cerner can prototype Lambda and Kappa architectures for multiple solutions independently without affecting the processes producing data. As Kafka’s multi-tenancy capabilities develop, Cerner can also look to simplify some of its data persistence needs, eliminating the need to push to downstream HBase clusters. 

Overall, Kafka will play a key role in Cerner’s infrastructure for large-scale distributed processing and be a nice companion to our existing investments in Hadoop and HBase.

Micah Whitacre (@mkwhit) is a senior software architect on Cerner Corp.’s Big Data Platforms team, and an Apache Crunch committer.

Categories: Hadoop

Where to Find Cloudera Tech Talks (Through End of 2014)

Cloudera Blog - Mon, 11/10/2014 - 16:52

Find Cloudera tech talks in Seattle, Las Vegas, London, Madrid, Budapest, Barcelona, Washington DC, Toronto, and other cities through the end of 2014.

Below please find our regularly scheduled quarterly update about where to find tech talks by Cloudera employees—this time, for the remaining dates of 2014. Note that this list will be continually curated during the period; complete logistical information may not be available yet. And remember, many of these talks are in “free” venues (no cost of entry).

As always, we’re standing by to assist your meetup by providing speakers, sponsorships, and schwag!

  • Nov. 9-14, Seattle (USENIX LISA14): Gwen Shapira and Yanpei Chen on big data benchmarking; Yanpei and Karthik Kambatla on Hadoop-on-SSD; Kate Ting and Jon Hsieh on Hadoop ops
  • Nov. 11-14, Las Vegas (AWS re:Invent): Amandeep Khurana on Hadoop/cloud best practices
  • Nov. 11-14, Washington, DC (Lucene/Solr Revolution 2014): Romain Rigaux on Hue’s Search app; Mark Miller on Solr-on-HDFS; Greg Chanan on secure search
  • Nov. 17, London (Hadoop Users Group UK): Mark Grover and Ted Malaska on Hadoop apps architecture
  • Nov. 17, Madrid (Big Data Spain): Sean Owen on Spark-based anomaly detection; Yanpei Chen and Gwen Shapira on big data benchmarking; Mark Grover and Jonathan Seidman on Hadoop apps architecture; Enrico Berti on Hue
  • Nov. 17-21, Budapest (ApacheCon Europe): Dima Spivak on Hadoop-on-Docker; Colin McCabe on native clients for HDFS and HDFS optimization; Xuefu Zhang on security
  • Nov. 18, Budapest (Big Data Budapest): Colin McCabe on HDFS optimization
  • Nov. 19, Bellevue, Wash. (Seattle Spark Meetup): Jain Ranganathan on Spark/CDH integration
  • Nov. 19-21, Barcelona (Strata+Hadoop World Barcelona): multiple Cloudera speakers on Impala, Spark, genomics, Hadoop apps architecture, and more
  • Dec. 2, Menlo Park, Calif. (Bay Area Spark Meetup): TBD
  • Dec. 5, Dulles, Va. (DevIgnition 2014): Doug Cutting on the Hadoop ecosystem
  • Dec. 18, Louisville, Ky. (Louisville BI & Big Data Meetup): TBD
  • Dec. 18, Toronto, Ont. (Toronto HUG): Mark Grover on Hadoop apps architecture


Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

This Month in the Ecosystem (October 2014)

Cloudera Blog - Fri, 11/07/2014 - 18:34

Welcome to our 14th edition of “This Month in the Ecosystem,” a digest of highlights from October 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

  • Approximately 5,000 people converged at Javits Center for Strata + Hadoop World New York 2014, where Cloudera and O’Reilly Media announced an extended partnership in which Strata + Hadoop World events will be held around the world, 4x per year. Next stops: San Jose (Feb. 17-20), Barcelona (Nov. 19-21) and London (May 5-7).
  • Cloudera announced the formation of Cloudera Labs, an incubator for ecosystem innovations that are candidates for shipping inside CDH. Among its first projects is CDH integration with Apache Kafka.
  • Via its Cloudera Live program, Cloudera also made a new end-to-end Apache Hadoop tutorial available to Hadoop evaluators that utilizes a full cloud-based demo cluster and sample data (free access for two weeks). Tableau and Zoomdata flavors are also provided.
  • eBay has open sourced its Kylin SQL-on-Hadoop framework. Remember when SQL was “dead”?
  • MapR announced that it will add Apache Spark support to Apache Drill. Furthermore, several other Big Data related vendors announced transitions away from MapReduce in their platforms and toward Spark. The Spark invasion continues!
  • Cloudera and Microsoft announced that Cloudera Enterprise will become certified on Azure and that deeper integration with the Microsoft stack is forthcoming.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop