Using Apache Parquet at AppNexus

Cloudera Blog - Wed, 04/15/2015 - 16:10

Thanks to Chen Song, Data Team Lead at AppNexus, for allowing us to republish the following post about his company’s use case for Apache Parquet (incubating at this writing), the open standard for columnar storage across the Apache Hadoop ecosystem.

At AppNexus, over 2MM log events are ingested into our data pipeline every second. Log records are sent from upstream systems in the form of Protobuf messages. Raw logs are compressed with Snappy when stored on HDFS. Even with compression, this still leads to over 25TB of log data collected every day. On top of logs, we also have hundreds of MapReduce jobs that process and generate aggregated data. Collectively, we store petabytes of data in our primary Hadoop cluster.

Parquet (incubating at the time of writing) is a columnar storage format for the Hadoop ecosystem. Compared to a traditional row-oriented format, it is much more efficient in storage and offers better query performance. Parquet is widely used in the Hadoop world for analytics workloads by many query engines. Among them are engines on top of Hadoop, such as Apache Hive and Impala, and systems that go beyond MapReduce to improve performance (Apache Spark, Presto).

Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression. It is especially good for queries which read particular columns from a “wide” (with many columns) table, since only needed columns are read and IO is minimized. 
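To make the I/O argument concrete, here is a pure-Python toy (not Parquet itself; all table and column names are invented for the example) that contrasts a row-oriented and a column-oriented layout of the same table:

```python
# Toy illustration: the same table stored row-wise and column-wise,
# showing how a columnar layout lets a query touch only the columns
# it needs. The names here are made up for the example.

rows = [
    {"imp_type": 1, "aid": 10, "cost": 0.5, "region": "US"},
    {"imp_type": 2, "aid": 11, "cost": 0.7, "region": "EU"},
    {"imp_type": 1, "aid": 12, "cost": 0.2, "region": "US"},
]

# Row-oriented storage: the values of one record are adjacent.
row_store = [list(r.values()) for r in rows]

# Column-oriented storage: the values of one column are adjacent.
columns = {k: [r[k] for r in rows] for k in rows[0]}

# A query that needs only "imp_type" and "cost" reads 2 of 4 columns
# from the column store...
needed = ["imp_type", "cost"]
cells_read_columnar = sum(len(columns[c]) for c in needed)

# ...but must scan every cell of every row in the row store.
cells_read_row = sum(len(r) for r in row_store)

print(cells_read_columnar, cells_read_row)  # 6 vs 12: half the cells touched
```

In a real Parquet file the adjacent column values additionally compress better, which is where the storage savings below come from.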

Data is important, but storage costs money, not to mention the processing resources required at this scale (CPU, network I/O, and so on). With limited hardware available, we started looking at our cost of goods sold (COGS) at the end of last year to figure out a way to serve ever-growing data needs. We think Parquet is the right choice, as it is efficient and performant in both storage and processing.

The benchmark here measures some key performance metrics, including disk usage, resource usage, and query performance. We did comparisons on two storage formats:

  • Sequence Files with Protobuf compressed by Snappy
  • Parquet Files compressed by Snappy

(Note: This benchmark is not intended to provide a performance comparison among query engines out on the market. For now, we have chosen Hive as the simple query engine and we used default configurations for Hive without optimization, with the goal to make the results reproducible.)


For testing, we picked one of our biggest tables, aggregated_impressions. Rows in this table represent impressions, callbacks, and auctions joined at the transaction level. It is currently the biggest table stored in our Hadoop cluster, taking 270TB of HDFS storage (810TB of raw storage after three-way replication), and serves as the primary source of data for most of the higher-level aggregated tables.

On HDFS, files for this table are stored in data blocks of size 128MB (134217728 bytes).

Disk Storage

We compared one day of HDFS usage for this table in the two storage formats, broken down by hour. The result is shown in the following chart.

Query Performance

Query performance for Parquet tables depends heavily on the number of columns that need to be processed in the SELECT, WHERE, and GROUP BY clauses of the query. Say the table has n columns and a query needs m of them; we define the column ratio t = m / n. The smaller the ratio t is, the bigger the performance improvement you will see for that query on Parquet tables compared to regular row-oriented tables.

We picked three commonly used queries on aggregated_impressions and measured their execution times and resource consumption. The same set of queries was run against data stored in the two data formats, Parquet and Sequence with Protobuf.

  1. The first query does a simple aggregation for the table aggregated_impressions on one particular field. The only dimension used here is imp_type which has a low cardinality of 10. The aggregation generates metrics on double_monetary_field1 and double_monetary_field2.

  2. The second query is pulled from one of our hive jobs, agg_high_dimesion_data. The query performs a more complex, high cardinality aggregation on multiple different dimensions and metrics.

  3. The third query is a common type of ad hoc query that looks for a few sparse records in a massive dataset. Such queries may sometimes produce no output at all.

For all 3 tests, Hive queries ran with the same configuration.

  • Block size is 134217728 bytes
  • Hive max split size is 256000000 bytes
Comparison over MR Metrics

Query 1

SELECT imp_type,
       SUM(double_monetary_field1),
       SUM(double_monetary_field2)
FROM aggregated_impressions
WHERE dh = '${hour}'
GROUP BY imp_type;

Query 2

INSERT OVERWRITE TABLE agg_high_dimesion_data
PARTITION (dy='2015', dm='2015-03', dd='2015-03-30', dh='${hour}')
SELECT date_trunc('hour', date_time) AS ymdh,
       bl.aid, bl.cgid, bl.cid, bl.crid, bl.pid, bl.vid,
       CASE WHEN int_field1 < 20 THEN int_field1 ELSE 20 END AS int_field1_cal,
       CASE ... // A large CASE WHEN statement
       END AS int_field2_cal,
       SUM(bl.isI) AS i_count,
       SUM(bl.isC) AS c_count,
       SUM(pc_number) AS pc_count,
       SUM(bl.pv_number) AS pv_count,
       to_numeric(SUM(double_field)/1000, '%18.6f') AS double_field
FROM (
    SELECT * FROM aggregated_impressions
    WHERE date_time >= '${hour}:00:00'
      AND date_time <= '${hour}:59:59'
      AND dh = '${hour}'
) bl
GROUP BY date_trunc('hour', date_time),
         bl.aid, bl.cgid, bl.cid, bl.crid, bl.pid, bl.vid,
         CASE WHEN some_frequency < 20 THEN some_frequency ELSE 20 END,
         CASE ... // A large CASE WHEN statement
         END

Query 3

SELECT auction_id_64, imp_type, date_time
FROM aggregated_impressions
WHERE aid = 1234567 AND id1 = 1111 AND id2 = 4567 AND dh = '${hour}'
ORDER BY date_time ASC
LIMIT 25;

For all three tests, queries on Parquet table needed only half as many maps as those on the Protobuf table. We created several plots below that break down some of the key MapReduce job performance metrics.

  • Total CPU Time

  • Average Map Time

As you can see from the above charts, Parquet really excels when a query touches sparse data or only a small fraction of the columns. CPU usage can be reduced by more than 98% in total, and total memory consumption is cut by at least half. To summarize, Parquet is the better technology for both storage and computation resources. At the time of this writing, we have migrated a few of our biggest tables to Parquet.

Upstream log data comes in as Protobuf. After ingestion, we validate this data before loading it to HDFS, and also whenever jobs read any Protobuf messages from HDFS. This validation framework, along with our entire MapReduce job API framework, is tied to the Protobuf data format.

Categories: Hadoop

"Hadoop: The Definitive Guide" is Now a 4th Edition

Cloudera Blog - Mon, 04/13/2015 - 14:23

Apache Hadoop ecosystem, time to celebrate! The much-anticipated, significantly updated 4th edition of Tom White’s classic O’Reilly Media book, Hadoop: The Definitive Guide, is now available.

The Hadoop ecosystem has changed a lot since the 3rd edition. How are those changes reflected in the new edition?

The core of the book is about the core Apache Hadoop project, and since the 3rd edition, Hadoop 2 has stabilized and become the Hadoop runtime that most people are using. The 3rd edition actually covered both Hadoop 1 (based on the JobTracker) and Hadoop 2 (based on YARN), which made things a bit awkward at times since it flipped between the two and had to describe the differences. Only Hadoop 2 is covered in the 4th edition, which simplifies things considerably. The YARN material has been expanded and now has a whole chapter devoted to it.

This update is the biggest since the 1st edition, and in response to reader feedback, I reorganized the chapters to simplify the flow. The new edition is broken into parts (I. Hadoop Fundamentals, II. MapReduce, III. Hadoop Operations, IV. Related Projects, V. Case Studies), and includes a diagram to show possible pathways through the book (on p. 17).

The Hadoop ecosystem has been growing faster with each new edition, which makes it impossible to cover everything; even if I wanted to, there wouldn’t be enough space. The book is aimed primarily at users doing data processing, so in this edition I added two new chapters about processing frameworks (Apache Spark and Apache Crunch), one on data formats (Apache Parquet, incubating at this writing) and one on data ingestion (Apache Flume).

I’m also really pleased with the two new case studies in this edition: one about how Hadoop is used to manage records in a healthcare system (by Ryan Brush and Micah Whitacre), and one on building big data genomics pipelines (by Matt Massie).

Based on those changes, what do you want readers to learn?

I think the core Hadoop features are still important to understand—things like how HDFS stores files in blocks, how MapReduce input splits work, how YARN schedules work across nodes in the cluster. These ideas provide the foundation for learning how components covered in later chapters take advantage of these features. For example, Spark uses MapReduce input formats for reading and writing data efficiently, and it can run on YARN.

Beyond that, we’ve seen how the Hadoop platform as a whole has become even more powerful and flexible, and the new chapters reflect some of these new capabilities, such as iterative processing with Spark.

In a nutshell, what does your research process/methodology look like?

I think the two main things that readers want from a book like this are: 1) good examples for each component, and 2) an explanation of how the component in question works. Examples are important since they are concrete and allow readers to start using and exploring the system. In addition, a good mental model is important for understanding how the system works so users can reason about it, and extend the examples to cover their own use cases.

There’s a Martin Gardner quote that I cite in the book, and which sums up my approach to writing about technology: “Beyond calculus, I am lost. That was the secret of my column’s success. It took me so long to understand what I was writing about that I knew how to write in a way most readers would understand.”

I find that there’s really no substitute for reading the code to understand how a component works. I spend a lot of time writing small examples to test how different aspects of the component work. A few of these are turned into examples for the book. I also spend a lot of time reading JIRAs to understand the motivation for features, their design, and how they relate to other features. Finally, I’m very lucky to have access to a talented group of reviewers who work on Hadoop projects. Their feedback has undoubtedly improved the book.

Nothing can be completely “definitive.” What is good complementary material for this book?

The goal of my book is to explain how the component parts of Hadoop and its ecosystem work and how to use them—the nuts and bolts, as it were. What it doesn’t do is explain how to tie all the pieces together to build applications. For this I recommend Hadoop Application Architectures by Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira, which explains how to select Hadoop components and use them to build a data application. For building machine-learning applications, I like Advanced Analytics with Spark by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills.

My book has some material for Hadoop administrators, but Eric Sammer’s Hadoop Operations (2nd edition forthcoming) goes into a lot more depth. There are also books for most of the Hadoop components that go into more depth than mine.

It’s really gratifying to see the large number of books coming out in the Hadoop and big data space.

Do you have a 5th edition in you?

I like to think so, but I’m not sure my family would agree (yet)!

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Operations Track

Cloudera Blog - Fri, 04/10/2015 - 13:57

This year’s HBaseCon Operations track features some of the world’s largest and most impressive operators.

In this post, I’ll give you a window into the HBaseCon 2015 (May 7 in San Francisco) Operations track.

Thanks, Program Committee!

  • “HBase Operations in a Flurry”

    Rahul Gidwani & Ian Friedman (Yahoo!)

    With multiple clusters of 1,000+ nodes replicated across multiple data centers, Flurry has learned many operational lessons over the years. In this talk, you’ll explore the challenges of maintaining and scaling Flurry’s cluster, how we monitor, and how we diagnose and address potential problems. 

  • “HBase at Scale in an Online and High-Demand Environment”

    Jeremy Carroll & Tian Ying-Chang (Pinterest)

    Pinterest runs 38 different HBase clusters in production, doing a lot of different types of work—with some doing up to 5 million operations per second. In this talk, you’ll get details about how we do capacity planning, maintenance tasks such as online automated rolling compaction, configuration management, and monitoring.

  • “OpenTSDB and AsyncHBase Update”

    Chris Larsen (Yahoo!) & Benoît Sigoure (Arista Networks)

    OpenTSDB continues to scale along with HBase. A number of updates have been implemented to push writes over 2 million data points a second. Here we will discuss HBase schema improvements, including salting, random UID assignment, and using append operations instead of puts. You’ll also get AsyncHBase development updates about rate limiting, statistics, and security.

  • “Multitenancy in HBase: Learnings from Yahoo!” 

    Francis Liu & Vandana Ayyalasomayajula (Yahoo!)

    Since 2013, thanks to a combination of deployment and HBase enhancements, Yahoo! has successfully supported a diverse set of tenants in a single HBase cluster. Here you’ll learn how this approach makes it feasible to support small and large tenants cost-effectively, minimizes operational overhead, and provides a lot of flexibility.

  • “Smooth Operators: Panel”

    Shaohui Liu & Jianwei Cui (Xiaomi)

    In this session, you will learn the work Xiaomi has done to improve the availability and stability of our HBase clusters, including cross-site data and service backup and a coordinated compaction framework. You’ll also learn about the Themis framework, which supports cross-row transactions on HBase based on Google’s percolator algorithm, and its usage in Xiaomi’s applications.

  • “Elastic HBase on Mesos”

    Cosmin Lehene (Adobe)

    Adobe has packaged HBase in Docker containers and uses Marathon and Mesos to schedule them—allowing us to decouple the RegionServer from the host, express resource requirements declaratively, and open the door for unassisted real-time deployments, elastic (up and down) real-time scalability, and more. In this talk, you’ll hear what we’ve learned and why this approach could fundamentally change HBase operations.

  • “DeathStar: Easy, Dynamic, Multi-tenant HBase via YARN”

    Nitin Aggarwal & Ishan Chhabra (Rocket Fuel)

    In this talk, you’ll learn how Rocket Fuel handles various HBase access patterns and multi-tenancy scenarios with DeathStar, an in-house solution built on top of Apache Slider and YARN. We’ll cover how we use a single YARN cluster to host multiple smaller and highly customized HBase clusters, and how dynamic provisioning and elastic scaling are made possible in this model.

Getting interested? Wait until you see the Development & Internals sneak preview next week!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

How-to: Install Hue on a Mac

Cloudera Blog - Tue, 04/07/2015 - 19:48

You might already have all the prerequisites installed, but we are going to show how to start from a fresh Yosemite (10.10) install and end up with Hue running on your Mac in almost no time!

We are going to be using the official Quickstart VM from Cloudera, which already packs all the Apache Hadoop ecosystem components your Hue will talk to. If you don’t have the latest version already downloaded and running, please visit this link and choose the version that suits you best.

In the meanwhile, let’s set up your Mac!

Step 1: Clone the Hue repository

To clone the Hue GitHub repository, you need git installed on your system. Git (plus a ton of other tools) is included in the Xcode command line tools. To install them, open Terminal and type

xcode-select --install

In the dialog, choose “Install”. If Terminal shows the message “xcode-select: error: command line tools are already installed, use "Software Update" to install updates”, you are almost good to go already.

From Terminal, navigate to the directory where you keep all your projects and run

git clone https://github.com/cloudera/hue.git

You now have the Hue source code on your Mac.

Step 2: Install Java

The build process uses Java to run. A quick way to get to the right download URL from Oracle is to run from the terminal:

java -version

and then click the “More info” button in the dialog that appears. On Oracle’s website, accept the license and choose the Mac OS X JDK link. After the DMG has been downloaded, open it and double-click the installation package. Now, if we return to the terminal and type again:

java -version

we will see the version of the freshly installed JDK. At the time of writing, 1.8.0_40.

Step 3: Install the pre-requisites

Hue uses several libraries that are not included in the Xcode command line tools, so we will need to install those too. To do that, we will use Homebrew, the fantastic open source package manager for Mac OS X. Install it from the terminal with

ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

You will need to enter your password to continue. Then, as suggested by the installation script, run

brew doctor

If you already have Homebrew installed, just update it running

brew update

First, we need to install Maven 3

brew install maven

And then MySQL, to have its development libraries available

brew install mysql

Step 4: Compile and Configure Hue

Now that we are all set with the requirements we can compile Hue by running

make apps

from the Hue folder that was created by the git clone in Step 1. After a while, if everything goes as planned, you should see a final build message like “N static files copied to …”.

Hue comes with a default configuration file that points all the services to the local machine. Since we are using a VM for this purpose, we will need to change several configuration lines. For your convenience, we have the file readily available here.

Just copy this file over to your hue/desktop/conf folder!

Step 5: Configure your /etc/hosts

The last thing we should do is start the Quickstart VM and get its IP address. (You can launch the terminal inside the VM and run ‘ifconfig’ for that.) Then, on your Mac, edit the hosts file with

sudo vi /etc/hosts

and add a line with the IP you got from the VM followed by quickstart.cloudera. Save and you are good to go!
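For reference, the entry should look something like this (192.168.56.101 is a made-up example address; substitute whatever ifconfig reported inside your VM):

```
# /etc/hosts on the Mac
192.168.56.101   quickstart.cloudera
```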

Step 6: Run!

What you have to do on Terminal from the Hue folder is just

./build/env/hue runserver

And point your browser to http://localhost:8000! Go and write a new app now!

As usual feel free to comment on the hue-user list, in the Hue discussion forum, or @gethue.

Categories: Hadoop

Sneak Preview: HBaseCon 2015 General Session

Cloudera Blog - Thu, 04/02/2015 - 20:10

As is its tradition, this year’s HBaseCon General Session includes keynotes about the world’s most awesome HBase deployments.

It’s Spring, which also means that it’s HBaseCon season—the time when the Apache HBase community gathers for its annual ritual.

Specifically, when HBaseCon 2015 arrives on May 7 in San Francisco, it will do so with a loud bang: Yet again, the agenda contains presentations from the developers and operators of some of the world’s most impressive production HBase deployments, including those in use at Adobe, Cerner, Dropbox, eBay, Facebook, FINRA, Pinterest, Salesforce.com, Xiaomi, Yahoo!, and more.  (For a taste of what HBaseCon is like, see the photos and presentations from 2014.)

Over the next few weeks, as is my custom, I’ll be bringing you sneak previews of session content (across Operations, Development & Internals, Ecosystem, and Use Cases tracks) accepted by the Program Committee. In this post, we’ll begin with the General Session:

  • “The State of HBase”

    Andrew Purtell (Salesforce.com), Enis Söztutar (Hortonworks), Michael Stack (Cloudera)

    With HBase hitting the 1.0 mark and adoption/production use cases continuing to grow, it’s been an exciting year since last we met at HBaseCon 2014. What is the state of HBase and its ecosystem today, and where does it go from here?

  • “Bigtable and HBase: Storing the World’s Data”

    Carter Page (Google)

    HBase is based on Bigtable, designed over a decade ago at Google. This architecture is the choice for mind-bendingly large datasets both inside and outside of Google because it scales structured data unlike anything else. As an update to his 2014 keynote, Carter will talk about how Google has continued to improve Bigtable to meet ever-larger datasets and more demanding performance requirements.

  • “Zen: A Graph Data Model on HBase”

    Raghavendra Prabhu & Xun Liu (Pinterest)

    Zen is a storage service built at Pinterest that offers a graph data model on top of HBase and potentially other storage backends. In this talk, RVP and Xun will go over the design motivation for Zen and describe its internals including the API, type system, and HBase backend. 

  • “The Evolution of HBase @ Bloomberg” 

    Matthew Hunt & Sudarshan Kadambi (Bloomberg)

    Learn the evolution and consolidation of Bloomberg’s core infrastructure around fewer, faster, and simpler systems, and the role HBase plays within that effort. You’ll also hear about HBase modifications to accommodate the “medium data” use case and get a preview of what’s to come.

As it was last year, for HBase learners, the General Session will be preceded by an optional “Apache HBase: Just the Basics” session from Jesse Anderson.

Interested yet? If not, next week, we’ll offer a preview of the Operations track.

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

How-to: Tune Your Apache Spark Jobs (Part 2)

Cloudera Blog - Mon, 03/30/2015 - 15:50

In the conclusion to this series, learn how resource tuning, parallelism, and data representation affect Spark job performance.

In this post, we’ll finish what we started in “How to Tune Your Apache Spark Jobs (Part 1)”. I’ll try to cover pretty much everything you could care to know about making a Spark program run fast. In particular, you’ll learn about resource tuning, or configuring Spark to take advantage of everything the cluster has to offer. Then we’ll move to tuning parallelism, the most difficult as well as most important parameter in job performance. Finally, you’ll learn about representing the data itself, in the on-disk form which Spark will read (spoiler alert: use Apache Avro or Apache Parquet) as well as the in-memory format it takes as it’s cached or moves through the system.

Tuning Resource Allocation

The Spark user list is a litany of questions to the effect of “I have a 500-node cluster, but when I run my application, I see only two tasks executing at a time. HALP.” Given the number of parameters that control Spark’s resource utilization, these questions aren’t unfair, but in this section you’ll learn how to squeeze every last bit of juice out of your cluster. The recommendations and configurations here differ a little bit between Spark’s cluster managers (YARN, Mesos, and Spark Standalone), but we’re going to focus only on YARN, which Cloudera recommends to all users.

For some background on what it looks like to run Spark on YARN, check out my post on this topic.

The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.

Every Spark executor in an application has the same fixed number of cores and same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property. The cores property controls the number of concurrent tasks an executor can run: --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.
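To make the two mechanisms concrete, here is a sketch of both ways to set these properties (the 5-core/4g values, com.example.MyApp, and myapp.jar are arbitrary examples, not recommendations):

```
# On the command line:
spark-submit --executor-cores 5 --executor-memory 4g \
    --class com.example.MyApp myapp.jar

# Or equivalently in spark-defaults.conf:
spark.executor.cores   5
spark.executor.memory  4g
```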

The --num-executors command-line flag or spark.executor.instances configuration property control the number of executors requested. Starting in CDH 5.4/Spark 1.3, you will be able to avoid setting this property by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and free up executors when idle.

It’s also important to think about how the resources requested by Spark will fit into what YARN has available. The relevant YARN properties are:

  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.

Asking for five executor cores will result in a request to YARN for five virtual cores. The memory requested from YARN is a little more complex, for a couple of reasons:

  • --executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
  • YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.
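The two bullets above can be combined into one calculation. The following sketch is our own (the function name is not a Spark API, and the 1024MB/512MB rounding values are the common YARN scheduler defaults, which your cluster may override):

```python
# Sketch of how the full per-executor memory request to YARN is derived.
# All values are in megabytes.

def yarn_memory_request(executor_memory_mb,
                        min_allocation_mb=1024,   # yarn.scheduler.minimum-allocation-mb
                        increment_mb=512):        # yarn.scheduler.increment-allocation-mb
    # Off-heap overhead default: max(384, .07 * spark.executor.memory)
    overhead = max(384, 0.07 * executor_memory_mb)
    requested = executor_memory_mb + overhead
    # YARN rounds the request up to the minimum allocation, then to the
    # next multiple of the increment.
    requested = max(requested, min_allocation_mb)
    increments = -(-requested // increment_mb)  # ceiling division
    return int(increments * increment_mb)

# A 4GB heap becomes a 4480MB request, rounded up to the next 512MB step.
print(yarn_memory_request(4096))  # 4608
```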

The following (not to scale with defaults) shows the hierarchy of memory properties in Spark and YARN:

And if that weren’t enough to think about, a few final concerns when sizing Spark executors:

  • The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.
  • Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor.
  • I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
  • Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07 = 1.47.  21 – 1.47 ~ 19.
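The sizing arithmetic above can be written out as a small function (our own sketch, not a Spark utility; it bakes in the one-core/1GB reservation for daemons, the single application-master slot, and the 7% memory overhead from the worked example):

```python
# Suggest --num-executors / --executor-cores / --executor-memory for a
# cluster of identical NodeManager hosts, following the heuristics above.

def size_executors(nodes, cores_per_node, mem_per_node_gb,
                   cores_per_executor=5, overhead_fraction=0.07):
    # Leave one core and 1GB per node for the OS and Hadoop daemons.
    usable_cores = cores_per_node - 1
    usable_mem_gb = mem_per_node_gb - 1
    execs_per_node = usable_cores // cores_per_executor
    # One executor slot across the cluster is given up to the app master.
    num_executors = nodes * execs_per_node - 1
    # Split node memory between executors, then carve out the overhead.
    mem_per_exec = usable_mem_gb / execs_per_node
    executor_memory_gb = int(mem_per_exec - mem_per_exec * overhead_fraction)
    return num_executors, cores_per_executor, executor_memory_gb

# The six-node, 16-core, 64GB example from the text:
print(size_executors(6, 16, 64))  # (17, 5, 19)
```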
Tuning Parallelism

Spark, as you have likely figured out by this point, is a parallel processing engine. What is maybe less obvious is that Spark is not a “magic” parallel processing engine, and is limited in its ability to figure out the optimal amount of parallelism. Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.

How is this number determined? The way Spark groups RDDs into stages is described in the previous post. (As a quick reminder, transformations like repartition and reduceByKey induce stage boundaries.) The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple of exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ numbers of partitions, and cartesian creates an RDD with their product.
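These partition-count rules can be written out explicitly. The function below is our own illustration of the arithmetic, not Spark code:

```python
# Partition count of a child RDD given its parents' partition counts:
# most transformations inherit the parent's count; coalesce caps it,
# union sums the parents' counts, cartesian multiplies them.

def partitions_after(op, parent_counts, target=None):
    if op == "coalesce":          # fewer partitions than the parent
        return min(parent_counts[0], target)
    if op == "union":             # sum of the parents' counts
        return sum(parent_counts)
    if op == "cartesian":         # product of the parents' counts
        product = 1
        for count in parent_counts:
            product *= count
        return product
    return parent_counts[0]       # default: same as the parent

print(partitions_after("union", [8, 4]))      # 12
print(partitions_after("cartesian", [8, 4]))  # 32
print(partitions_after("coalesce", [8], 3))   # 3
```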

What about RDDs with no parents? RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that’s used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.

To determine the number of partitions in an RDD, you can always call rdd.partitions().size().

The primary concern is that the number of tasks will be too small. If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available. 

A small number of tasks also mean that more memory pressure is placed on any aggregation operations that occur in each task. Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger.

When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue. First, holding many records in these data structures puts pressure on garbage collection, which can lead to pauses down the line. Second, when the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting. This overhead during large shuffles is probably the number one cause of job stalls I have seen at Cloudera customers.

So how do you increase the number of partitions? If the stage in question is reading from Hadoop, your options are:

  • Use the repartition transformation, which will trigger a shuffle.
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.

If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as

val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)

What should “X” be? The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving. 

There is also a more principled way of calculating X, but it’s difficult to apply a priori because some of the quantities are difficult to calculate. I’m including it here not because it’s recommended for daily use, but because it helps with understanding what’s going on. The main goal is to run enough tasks so that the data destined for each task fits in the memory available to that task.

The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively.
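For example, with the default fractions and a hypothetical 4 GB executor running two cores, the formula works out as follows (a sketch in Python; the numbers are made up):

```python
def shuffle_memory_per_task(executor_memory, cores,
                            memory_fraction=0.2, safety_fraction=0.8):
    """Approximate memory available to each task's shuffle data structures,
    per the formula above. Defaults match Spark's default fractions."""
    return executor_memory * memory_fraction * safety_fraction / cores

gb = 1024 ** 3
per_task = shuffle_memory_per_task(4 * gb, cores=2)
# roughly 0.32 GB of shuffle memory per task
```

So a 4 GB executor with two cores leaves each task only about a third of a gigabyte for its shuffle hashmaps and buffers.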

The in-memory size of the total shuffle data is harder to determine. The closest heuristic is to find the ratio between the Shuffle Spill (Memory) metric and the Shuffle Spill (Disk) metric for a stage that has already run, and then multiply the total shuffle write by that ratio. Note that the estimate can be somewhat off if the stage is doing a reduction, because aggregation shrinks the data before it is spilled.

Then round up a bit, because too many partitions is usually better than too few partitions.
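Putting the heuristic together with the memory formula, here is a back-of-the-envelope sketch (all stage metrics below are invented for illustration):

```python
import math

gb = 1024 ** 3

# Stage metrics read off the Spark UI for a previous run (illustrative values).
shuffle_write = 20 * gb     # Shuffle Write for the stage
spill_memory = 90 * gb      # Shuffle Spill (Memory)
spill_disk = 30 * gb        # Shuffle Spill (Disk)

# Estimated in-memory size of the shuffle data: total shuffle write scaled
# by the observed memory-to-disk inflation ratio.
in_memory_total = shuffle_write * (spill_memory / spill_disk)

# Memory available per task, from the formula above
# (4 GB executors, two cores, default fractions).
per_task = 4 * gb * 0.2 * 0.8 / 2

# Enough partitions that each task's share of the shuffle data fits in memory.
num_partitions = math.ceil(in_memory_total / per_task)
print(num_partitions)  # 188 for these made-up numbers
```

With these numbers, 60 GB of estimated in-memory shuffle data divided by about 0.32 GB per task suggests roughly 188 partitions, before rounding up further for safety.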

In fact, when in doubt, it’s almost always better to err on the side of a larger number of tasks (and thus partitions). This advice is in contrast to recommendations for MapReduce, which requires you to be more conservative with the number of tasks. The difference stems from the fact that MapReduce has a high startup overhead for tasks, while Spark does not.

Slimming Down Your Data Structures

Data flows through Spark in the form of records. A record has two representations: a deserialized Java object representation and a serialized binary representation. In general, Spark uses the deserialized representation for records in memory and the serialized representation for records stored on disk or being transferred over the network. There is work planned to store some in-memory shuffle data in serialized form.
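The distinction is easy to see in any language; as a rough illustration, here Python objects and pickle stand in for Java objects and Spark's serializers:

```python
import pickle
import sys

# A record as a live object vs. its serialized byte form. The live object
# pays for per-object headers and hash-table slack; the byte stream doesn't.
record = {"make": "Lexus", "model": "RX", "views": 5500}
serialized = pickle.dumps(record)

# The serialized form is opaque bytes until deserialized back into an object.
round_tripped = pickle.loads(serialized)
```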

The spark.serializer property controls the serializer that’s used to convert between these two representations. The Kryo serializer, org.apache.spark.serializer.KryoSerializer, is the preferred option. It is unfortunately not the default, because of some instabilities in Kryo during earlier versions of Spark and a desire not to break compatibility, but the Kryo serializer should always be used.

The footprint of your records in these two representations has a massive impact on Spark performance. It’s worthwhile to review the data types that get passed around and look for places to trim some fat.

Bloated deserialized objects will result in Spark spilling data to disk more often and reduce the number of deserialized records Spark can cache (e.g. at the MEMORY storage level). The Spark tuning guide has a great section on slimming these down.

Bloated serialized objects will result in greater disk and network I/O, as well as reduce the number of serialized records Spark can cache (e.g. at the MEMORY_SER storage level). The main action item here is to make sure to register any custom classes you define and pass around using the SparkConf#registerKryoClasses API.
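The equivalent settings can also be supplied in spark-defaults.conf; the class names below are placeholders for your own record types:

```properties
spark.serializer               org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister   com.mycompany.MyRecord,com.mycompany.MyKey
```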

Data Formats

Whenever you have the power to make the decision about how data is stored on disk, use an extensible binary format like Avro, Parquet, Thrift, or Protobuf. Pick one of these formats and stick to it. To be clear, when one talks about using Avro, Thrift, or Protobuf on Hadoop, they mean that each record is an Avro/Thrift/Protobuf struct stored in a sequence file. JSON is just not worth it.

Every time you consider storing lots of data in JSON, think about the conflicts that will be started in the Middle East, the beautiful rivers that will be dammed in Canada, or the radioactive fallout from the nuclear plants that will be built in the American heartland to power the CPU cycles spent parsing your files over and over and over again. Also, try to learn people skills so that you can convince your peers and superiors to do this, too.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Hadoop committer, and an Apache Spark contributor. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

Categories: Hadoop

Checklist for Painless Upgrades to CDH 5

Cloudera Blog - Thu, 03/26/2015 - 15:57

Following these best practices can make your upgrade path to CDH 5 relatively free of obstacles.

Upgrading the software that powers mission-critical workloads can be challenging in any circumstance. In the case of CDH, however, Cloudera Manager makes upgrades easy, and the built-in Upgrade Wizard, available with Cloudera Manager 5, further simplifies the upgrade process. The wizard performs service-specific upgrade steps that, previously, you had to run manually, and also features a rolling restart capability that reduces downtime for minor and maintenance version upgrades. (Please refer to this blog post or webinar to learn more about the Upgrade Wizard).

As you prepare to upgrade your cluster, keep this checklist of some of Cloudera’s best practices and additional recommendations in mind. Please note that this information is complementary to, not a replacement for, the comprehensive upgrade documentation.

Backing Up Databases

You will need to take backups prior to the upgrade. (Ideally, you already have procedures in place to back up your databases periodically.) Prior to upgrading, be sure to:

  1. Back up the Cloudera Manager server and management databases that store configuration, monitoring, and reporting data. (These include the databases that contain all the information about what services you have configured, their role assignments, all configuration history, commands, users, and running processes.)
  2. Back up all databases (if you don’t already have regularly scheduled backup procedures), including the Apache Hive Metastore Server, Apache Sentry server (contains authorization metadata), Cloudera Navigator Audit Server (contains auditing information), Cloudera Navigator Metadata Server (contains authorization, policies, and audit report metadata), Apache Sqoop Metastore, Hue, Apache Oozie, and Apache Sqoop.
  3. Back up NameNode metadata by locating the NameNode Data Directories in the HDFS service and backing up a listed directory. (You only need to back up one directory if more than one is listed.)

Note: Cloudera Manager provides an integrated, easy-to-use management solution for Backup and Disaster Recovery, and the key capabilities are built directly into the Cloudera Manager Admin Console. It is also automated and fault tolerant.

Cloudera Manager makes it easy to manage data stored in HDFS and accessed through Hive. You can define your backup and disaster recovery policies and apply them across services. You can select the key datasets that are critical to your business, schedule and track the progress of data replication jobs, and get notified when a replication job fails. Replication can be set up on files or directories in the case of HDFS and on tables in the case of Hive. Hive metastore information is also replicated which means that table definitions are updated. (Please refer to the BDR documentation for more details.)

A separate Disaster Recovery cluster is not required for a safe upgrade but the Backup and Disaster Recovery capability in Cloudera Manager can ease the upgrade process by ensuring the critical parts of your infrastructure are backed up before you take the upgrade plunge.

Recommended Practices for Upgrading to CDH 5
  1. Create a fine-grained, step-by-step production plan for critical upgrades (using the Upgrade Documentation as a reference).
  2. Document the current deployment by chronicling the existing cluster environment and dependencies, including
    • The current CDH and Cloudera Manager versions that are installed
    • All third-party tools that interact with the cluster
    • The databases for Cloudera Manager, Hive, Oozie, and Hue
    • Important job performance metrics so pre-upgrade baselines are well defined
  3. Test the production upgrade plan in a non-production environment (e.g. sandbox or test environment) so you can update the plan if there are unexpected outcomes. It also allows you to:
    • Test job compatibility with the new version
    • Run performance tests
  4. Upgrade to Cloudera Manager 5 before upgrading to CDH 5.
    • Ensure the Cloudera Manager minor version is equal to or greater than the target CDH minor version—the Cloudera Manager version must always be equal to or greater than the CDH version to which you upgrade.
  5. Reserve a maintenance window with enough time allotted to perform all steps.
    • For a major upgrade on production clusters, Cloudera recommends allocating up to a full-day maintenance window to perform the upgrade (but time is dependent on the number of hosts, the amount of Hadoop experience, and the particular hardware). Note that it is not possible to perform a rolling upgrade from CDH 4 to CDH 5 (major upgrade) due to incompatibilities between the two major versions.
  6. Maintain your own local Cloudera Manager and CDH package/parcel repositories to protect against external repositories being unavailable.
    • Read the reference documentation for details on how to create a local Yum repository, or
    • Pre-download a parcel to a local parcel repository on the Cloudera Manager server, where it is available for distribution to the other nodes in any of your clusters managed by this Cloudera Manager server. You can have multiple parcels for a given product downloaded to your Cloudera Manager server. Once a parcel has been downloaded to the server, it will be available for distribution on all clusters managed by the server. (Note: Parcel and package installations are equally supported by the Upgrade Wizard. Using parcels is the preferred and recommended way, as packages must be manually installed, whereas parcels are installed by Cloudera Manager. See this FAQ and this blog post to learn more about parcels.)
  7. Ensure there are no Oozie workflows in RUNNING or SUSPENDED status as the Oozie database upgrade will fail and you will have to reinstall CDH 4 to complete or kill those running workflows. (Note: When upgrading from CDH 4 to CDH 5, the Oozie upgrade can take a very long time. You can reduce this time by reducing the amount of history Oozie retains; see the documentation.)
  8. Import MapReduce configurations to YARN as part of the Upgrade Wizard. (Note: If you do not import configurations during upgrade, you can manually import the configurations at a later time. In addition to importing configuration settings, the import process will configure services to use YARN as the MapReduce computation framework instead of MapReduce and overwrites existing YARN configuration and role assignments.)

These recommendations and notable points to address when planning an upgrade to a Cloudera cluster are intended to complement the upgrade documentation that is provided for Cloudera Manager and CDH. As mentioned, Cloudera Manager streamlines the upgrade process and strives to prevent job failures by making upgrades simple and predictable—which is especially necessary for production clusters.

Cloudera’s enterprise data hub is constantly evolving with more production-ready capabilities and innovative tools. To ensure the highest level of functionality and stability, consider upgrading to the most recent version of CDH.


How Edmunds.com Used Spark Streaming to Build a Near Real-Time Dashboard

Cloudera Blog - Tue, 03/24/2015 - 16:15

Thanks to Sam Shuster, Software Engineer at Edmunds.com, for the guest post below about his company’s use case for Spark Streaming, SparkOnHBase, and Morphlines.

Every year, the Super Bowl brings parties, food, and hopefully a great game to appease everyone’s football appetite until the fall. For an event that draws around 114 million viewers, with larger numbers each year, Americans have also grown accustomed to commercials with production budgets on par with television shows and entertainment value that tries to rival even the game itself.

Some of the big spenders every year are car manufacturers. Edmunds.com is a car shopping website where people from all across the nation come to read reviews, compare prices, and in general get help in all matters car related. Because of Edmunds’ place as a destination for automobile news and facts, Super Bowl car commercials do indeed mean traffic spikes to make and model specific pages.

For the last few years, business analysts at Edmunds have used this unique opportunity of unfettered advertising cause and effect as a way to create awareness by sending out updates during the game to interested parties concerning how the commercials have affected site traffic for particular make and model pages. Unfortunately, in the past, these updates have been restricted to hourly granularities with an additional hour delay. Furthermore, as this data was not available in an easy-to-use dashboard, manual processing was needed to visualize the data.

As our team began to transition from MapReduce to Apache Spark, we saw this use case as a perfect opportunity to explore a solution via Spark Streaming. The goal was to build a near real-time dashboard that would provide both unique visitor and page view counts per make and make/model that could be engineered in a couple of weeks.

The Plan

Here is our prototype architecture (by no means optimal). It takes Apache Web Server log lines describing user events, aggregates them by visitor ID and make/model page to produce unique-visitor and page-view counts, and finally outputs that information to a near real-time dashboard for visualization.

Why Flume?

As we were already using Apache Flume for ingesting our logs, using a Flume Spark Streaming sink was an obvious choice. In addition to being more reliable, the polling sink does not require the Flume agent to be restarted every time the Spark Streaming job is restarted.

Unfortunately, due to the limitations of our media-tier Flume agent configuration, which currently spools the log files on five-minute intervals, we are limited to a streaming micro-batch size of five minutes as well. Eventually, the goal is to change the production media-tier Flume agents to tail the logs so that data flows at a constant rate. Every other stage of our architecture could easily handle a batch size of 30 seconds or less; the only reason we have not been able to get below five minutes is that our team does not control this Flume agent’s configuration.

Why Spark Streaming?

While we have had success using a Flume Solr sink combined with custom Morphlines to go directly from Flume to Apache Solr for log lines without aggregates, we needed something that could perform complicated aggregations quickly, which is why Spark Streaming was necessary.

Why HBase and Lily?

At this point, as the end goal for this application was Banana (the dashboard tool that reads directly from Solr) you might wonder why we decided to include Apache HBase and Lily HBase Indexer as added complications to an already fairly lengthy pipeline. There were a couple of reasons:

  • The existence of the Cloudera Labs project, SparkOnHBase. (This blog post explains more about the project and how to incorporate it into your Spark jobs.) This library provides an easy-to-use interface for connecting Spark batch and streaming jobs to HBase. Writing directly to Solr would have required an entirely new library, with functionality very similar to what already exists in the SparkOnHBase project.
  • Our existing processing ecosystem features HBase as an important data sink/source and ideally we would want streaming data to be available in HBase for other processes.
  • Having HBase as an intermediate data store means that we have more flexibility if we ever decide to change our front-end dashboard.
  • Finally, in the event Solr were to crash, HBase has all data replicated and Lily could be used to repopulate Solr.

Why Solr and Banana?

A main reason for wishing to include Solr is that it exposes the data through a REST interface and queries, making it easily accessible to others.

As for dashboards, we did briefly consider other tools like Graphite but found that for this initial prototype that the flexibility, ease of use, and customizability of Banana was perfect for our use case and lack of expertise in the area. Plus, Banana is free.


We want to calculate two different metrics:

  • Page View Counts
  • Unique Visitor Counts

We want to compute the above metrics for:

  • Every make
  • Every model

Finally, we want to have two time windows:

  • Cumulative count that refreshes at midnight eastern time
  • Count that is on the scale of the micro batch size of the streaming process

While eventually Edmunds will want to aggregate data using windowing functions to obtain counts for other time periods (hourly, for example), for this prototype we restricted ourselves to aggregations over each micro batch plus a cumulative count maintained with the updateStateByKey function. While the cumulative statistic for page views can be trivially computed as the sum of the micro-batch values, unique visitors require that the cumulative count be computed separately.

Saving every visitor to determine uniqueness over a 24-hour period would be resource intensive, so we decided to use the Twitter Algebird implementation of an approximate streaming algorithm called HyperLogLog. The state stored by the DStream call is thus the HLL object itself, which represents an approximate set of the visitors seen so far as well as the cardinality of that set. For those of you who want to know how close the approximation came to the actual data: for a 24-bit HLL we had an average error of 0.016% and a standard deviation of 0.106%, so it performed very well while taking up a fixed, small memory cost for each make and make/model count.
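To make the idea concrete, here is a toy HyperLogLog in Python. The production job used Algebird's Scala implementation; this sketch only illustrates the algorithm, and the precision parameter and names below are invented:

```python
import hashlib
import math

class HyperLogLog:
    """Minimal HyperLogLog sketch: fixed-size registers, approximate cardinality."""

    def __init__(self, p=12):
        self.p = p                 # precision: 2**p registers
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, item):
        # 64-bit hash of the item (md5 truncated, for determinism)
        digest = hashlib.md5(str(item).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")
        idx = h >> (64 - self.p)                   # top p bits pick a register
        rest = h & ((1 << (64 - self.p)) - 1)      # remaining bits
        # rank = position of the leftmost 1-bit in the remaining bits
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def cardinality(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        estimate = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        if estimate <= 2.5 * self.m:               # small-range correction
            zeros = self.registers.count(0)
            if zeros:
                estimate = self.m * math.log(self.m / zeros)
        return int(round(estimate))

hll = HyperLogLog(p=12)
for i in range(10000):
    hll.add("visitor-%d" % i)
# hll.cardinality() comes out close to 10,000, using only 4,096 registers
```

Note how re-adding a visitor never changes the registers, which is exactly why the sketch counts unique visitors rather than page views.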

Here is an example flow of the aggregation of the unique visitors metrics, which ends with outputting two DStreams for the two different time windows:

Working with HBase

While performing scans on HBase is easy using the Spark API, doing Puts, Gets, and other useful operations on HBase is much trickier. The SparkOnHBase library provides a simple API that abstracts many of the lower-level operations required to achieve those operations. This specific streaming job uses the streamBulkPut method. For every count, we put a row key that is comprised of the make, model, data point, aggregation type, and timestamp. The reason for including the timestamp in the row key itself is that every value is written to its own row without using versioning. Finally, we also put these values under different qualifiers for each of the attributes so that the Lily Indexer Morphline can easily transform those values into the Solr fields without having to parse the row key.
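A sketch of that row-key scheme (the delimiter and field order here are assumptions; the post only states which attributes go into the key):

```python
def row_key(make, model, data_point, aggregation_type, timestamp):
    """Compose a row key from the five attributes described above.

    Because the timestamp is part of the key, every count lands in its own
    row, so no HBase cell versioning is needed."""
    return "|".join([make, model, data_point, aggregation_type, str(timestamp)])

key = row_key("kia", "sorento", "unique_visitors", "CURR", 1422850500)
```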

Lily Indexer Morphline and Solr Schema

Here is a snippet of the Morphline used by Lily Indexer to transform the HBase data into the Solr schema. The format of the Solr schema was primarily chosen based on the limitations of Banana.

extractHBaseCells {
  mappings : [
    {
      inputColumn : "data:count"
      outputField : "count"
      type : long
      source : value
    },
    {
      inputColumn : "data:timestamp"
      outputField : "timestamp"
      type : string
      source : value
    },

Morphlines is an open source framework (inside Kite SDK) with the sole intent to make ETL processes as painless to create, and as highly configurable, as possible. Basically, a Morphline file provides a sequence of commands that you wish to apply to your data and that are powerful enough to even fully replace such things as log processing applications (which we have done with a different Flume -> Morphlines -> Solr process).

Morphlines in this pipeline are used by Lily Indexer as the logic for how to transform HBase values into Solr values. For example, the first mapping above says that the input comes from the column family data under the qualifier count, and that the cell value (rather than the row key) should be extracted. That field is put into the Solr field count.

<field name="timestamp" type="date" indexed="true" stored="true"/>
<field name="count" type="long" indexed="false" stored="true"/>
<field name="valueType" type="string" indexed="true" stored="true"/>
<field name="aggregationType" type="string" indexed="true" stored="true"/>
<field name="make" type="string" indexed="true" stored="true" default="EMPTY"/>
<field name="makeModel" type="string" indexed="true" stored="true" default="EMPTY"/>
<field name="id" type="string" indexed="true" stored="true"/>

Banana then can make time-range queries to the Solr server based on the above schema to create time-series plots. The plot below shows page views per minute from 14:00 to 21:00 PST on Jan. 25, 2015, for each make.

Page views per minute per make on a normal Sunday. This is the Banana panel (histogram) that supports grouping by a separate Solr field (in this case, make).

This is the query that generates the above plot (note timestamps are actually in GMT):

q=*%3A*&df=id&wt=json&rows=100000&fq=timestamp:[2015-02-01T22:00:00.000Z%20TO%202015-02-02T05:00:00.000Z]&fl=timestamp count&group=true&group.field=make&group.limit=100000&fq=aggregationType:PVCURR -make:EMPTY
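The same query can be assembled programmatically; in this Python sketch the Solr host and collection name are assumptions, while the parameters mirror the query above and the field names come from the schema shown earlier:

```python
from urllib.parse import urlencode

# A list of pairs (not a dict) because fq is repeated: one filter for the
# time range, one for the aggregation type.
params = [
    ("q", "*:*"),
    ("df", "id"),
    ("wt", "json"),
    ("rows", 100000),
    ("fq", "timestamp:[2015-02-01T22:00:00.000Z TO 2015-02-02T05:00:00.000Z]"),
    ("fq", "aggregationType:PVCURR -make:EMPTY"),
    ("fl", "timestamp count"),
    ("group", "true"),
    ("group.field", "make"),
    ("group.limit", 100000),
]
# Hypothetical host and collection; only the query string matters here.
url = "http://solr-host:8983/solr/counts/select?" + urlencode(params)
```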

You then can choose how often you wish Banana to query Solr for updates and voila, you have your streaming near real-time dashboard!


So you might be skeptical that we would actually see detectable differences in traffic to make and make/model specific pages during the Super Bowl. Would people really visit Edmunds.com while the game is happening? Well, compare the previous snapshot to the snapshot below, which covers 16:00 to 21:00 on Super Bowl Sunday 2015 (Feb. 1).

Super Bowl Sunday page views per minute. Note the peaks, which blow the normal noise maximum of 450 out of the water!

Super Bowl XLIX took place from 3:30pm PST to 7:30pm PST and as you can see as compared to normal site traffic there are huge spikes up to around 5500 page views for Lexus near half time. This is as compared to around 150 page views on a normal Sunday evening. No statistical tests are needed to determine if those are significant increases!

Let’s look at a specific instance. At around 5:55pm (unfortunately we do not have exact times of when the commercials aired), there was a Kia Sorento commercial that featured Pierce Brosnan in a James Bond spoof. Kia is a much less popular make on Edmunds.com in general, so we see a much smaller increase in unique visitors to Kia pages – going up to around 1,049 unique visitors at its peak at 6:10pm. This commercial, however, meant that Kia Sorento finished as the third-most viewed model for the day.

Kia unique visitors per 5 minutes. Notice the peak of 1,049 unique visitors at 6:10.

Kia Sorento unique visitors per 5 minutes. Note the increase at 6:10 to 923 unique visitors.


Kia Sorento cumulative unique visitors. Note the huge increase at 6:10 which helped Kia Sorento finish as third-most visited model at Edmunds.com on Super Bowl Sunday.


As I hope I have demonstrated, the Spark Streaming prototype was a success and satisfied all of our requirements: it presents near real-time updates of unique visitors and page views for make and make/model pages on Edmunds.com. The fact that the system could be put together in a tight timeframe and was reliable enough to be used on a live Super Bowl campaign is a testament to the conciseness and relative ease of the new way of thinking that is Spark. So, what are you waiting for?


How-to: Quickly Configure Kerberos for Your Apache Hadoop Cluster

Cloudera Blog - Mon, 03/23/2015 - 16:12

Use the scripts and screenshots below to configure a Kerberized cluster in minutes.

Kerberos is the foundation of securing your Apache Hadoop cluster. With Kerberos enabled, user authentication is required. Once users are authenticated, you can use projects like Apache Sentry for role-based access control via GRANT/REVOKE statements. 

Taming the three-headed dog that guards the gates of Hades is challenging, so Cloudera has put significant effort into making this process easier in Hadoop-based enterprise data hubs. In this post, you’ll learn how to stand up a one-node cluster with Kerberos enforcing user authentication, using the Cloudera QuickStart VM as a demo environment.

If you want to read the product documentation, it’s available here. You should consider this reference material; I’d suggest reading it later to understand more details about what the scripts do.


You need the following downloads to follow along.

Initial Configuration

Before you start the QuickStart VM, increase the memory allocation to 8GB RAM and increase the number of CPUs to two. You can get by with a little less RAM, but we will have everything including the Kerberos server running on one node.

Start up the VM and activate Cloudera Manager as shown here:

Give this script some time to run; it has to restart the cluster.

KDC Install and Setup Script

The script goKerberos_beforeCM.sh does all the setup work for the Kerberos server and the appropriate configuration parameters. The comments are designed to explain what is going on inline. (Do not copy and paste this script! It contains unprintable characters that are pretending to be spaces. Rather, download it.)

#!/bin/bash # (c) copyright 2014 martin lurie sample code not supported # reminder to activate CM in the quickstart echo Activate CM in the quickstart vmware image echo Hit enter when you are ready to proceed # pause until the user hits enter read foo # for debugging - set -x # fix the permissions in the quickstart vm # may not be an issue in later versions of the vm # this fixes the following error # failed to start File /etc/hadoop must not be world # or group writable, but is 775 # File /etc must not be world or group writable, but is 775 # # run this as root # to become root # sudo su - cd /root chmod 755 /etc chmod 755 /etc/hadoop # install the kerberos components yum install -y krb5-server yum install -y openldap-clients yum -y install krb5-workstation # update the config files for the realm name and hostname # in the quickstart VM # notice the -i.xxx for sed will create an automatic backup # of the file before making edits in place # # set the Realm # this would normally be YOURCOMPANY.COM # in this case the hostname is quickstart.cloudera # so the equivalent domain name is CLOUDERA sed -i.orig 's/EXAMPLE.COM/CLOUDERA/g' /etc/krb5.conf # set the hostname for the kerberos server sed -i.m1 's/kerberos.example.com/quickstart.cloudera/g' /etc/krb5.conf # change domain name to cloudera sed -i.m2 's/example.com/cloudera/g' /etc/krb5.conf # download UnlimitedJCEPolicyJDK7.zip from Oracle into # the /root directory # we will use this for full strength 256 bit encryption mkdir jce cd jce unzip ../UnlimitedJCEPolicyJDK7.zip # save the original jar files cp /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/local_policy.jar local_policy.jar.orig cp /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/US_export_policy.jar US_export_policy.jar.orig # copy the new jars into place cp /root/jce/UnlimitedJCEPolicy/local_policy.jar /usr/java/jdk1.7.0_67-cloudera/jre/lib/security/local_policy.jar cp /root/jce/UnlimitedJCEPolicy/US_export_policy.jar 
/usr/java/jdk1.7.0_67-cloudera/jre/lib/security/US_export_policy.jar

# now create the kerberos database
# type in cloudera at the password prompt
echo suggested password is cloudera
kdb5_util create -s

# update the kdc.conf file
sed -i.orig 's/EXAMPLE.COM/CLOUDERA/g' /var/kerberos/krb5kdc/kdc.conf
# this will add a line to the file with ticket life
sed -i.m1 '/dict_file/a max_life = 1d' /var/kerberos/krb5kdc/kdc.conf
# add a max renewable life
sed -i.m2 '/dict_file/a max_renewable_life = 7d' /var/kerberos/krb5kdc/kdc.conf
# indent the two new lines in the file
sed -i.m3 's/^max_/ max_/' /var/kerberos/krb5kdc/kdc.conf

# the acl file needs to be updated so the */admin
# principal is enabled with admin privileges
sed -i 's/EXAMPLE.COM/CLOUDERA/' /var/kerberos/krb5kdc/kadm5.acl

# The Kerberos authorization tickets need to be renewable.
# If not, the Hue service will show bad (red) status and the
# Hue "Kerberos Ticket Renewer" will not start. The error
# message in the log will look like this:
#   kt_renewer ERROR Couldn't renew kerberos ticket in
#   order to work around Kerberos 1.8.1 issue.
#   Please check that the ticket for 'hue/quickstart.cloudera'
#   is still renewable

# update the kdc.conf file to allow renewable tickets
sed -i.m4 '/supported_enctypes/a default_principal_flags = +renewable, +forwardable' /var/kerberos/krb5kdc/kdc.conf
# fix the indenting
sed -i.m5 's/^default_principal_flags/ default_principal_flags/' /var/kerberos/krb5kdc/kdc.conf

# There is an additional error message you may encounter;
# it requires an update to the krbtgt principal:
#   5:39:59 PM ERROR kt_renewer
#   Couldn't renew kerberos ticket in order to work around
#   Kerberos 1.8.1 issue. Please check that the ticket
#   for 'hue/quickstart.cloudera' is still renewable:
#     $ kinit -f -c /tmp/hue_krb5_ccache
#   If the 'renew until' date is the same as the 'valid starting'
#   date, the ticket cannot be renewed. Please check your
#   KDC configuration, and the ticket renewal policy
#   (maxrenewlife) for the 'hue/quickstart.cloudera'
#   and 'krbtgt' principals.

# we need a running server and admin service to make this update
service krb5kdc start
service kadmin start
kadmin.local <<eoj
modprinc -maxrenewlife 1week krbtgt/CLOUDERA@CLOUDERA
eoj

# now just add a few user principals
# kadmin: addprinc -pw <Password> cloudera-scm/admin@YOUR-LOCAL-REALM.COM

# add the admin user that CM will use to provision
# kerberos in the cluster
kadmin.local <<eoj
addprinc -pw cloudera cloudera-scm/admin@CLOUDERA
modprinc -maxrenewlife 1week cloudera-scm/admin@CLOUDERA
eoj

# add the hdfs principal so you have a superuser for hdfs
kadmin.local <<eoj
addprinc -pw cloudera hdfs@CLOUDERA
eoj

# add a cloudera principal for the standard user
# in the Cloudera Quickstart VM
kadmin.local <<eoj
addprinc -pw cloudera cloudera@CLOUDERA
eoj

# test the server by authenticating as the CM admin user
# enter the password cloudera when you are prompted
echo use kinit to get a valid ticket to access the cluster
kinit cloudera-scm/admin@CLOUDERA

# once you have a valid ticket you can see the
# characteristics of the ticket with klist -e
# you will see the encryption type, which you will
# need for a screen in the wizard, for example:
#   Etype (skey, tkt): aes256-cts-hmac-sha1-96
klist -e

# to see the contents of the files, cat them
cat /var/kerberos/krb5kdc/kdc.conf
cat /var/kerberos/krb5kdc/kadm5.acl
cat /etc/krb5.conf

The files will look like this:

[root@quickstart ~]# cat /var/kerberos/krb5kdc/kdc.conf
[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88

[realms]
 CLOUDERA = {
  #master_key_type = aes256-cts
  acl_file = /var/kerberos/krb5kdc/kadm5.acl
  dict_file = /usr/share/dict/words
  max_renewable_life = 7d
  max_life = 1d
  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
  supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal
  default_principal_flags = +renewable, +forwardable
 }

[root@quickstart ~]# cat /var/kerberos/krb5kdc/kadm5.acl
*/admin@CLOUDERA *

[root@quickstart ~]# cat /etc/krb5.conf
[libdefaults]
 default_realm = CLOUDERA
 dns_lookup_kdc = false
 dns_lookup_realm = false
 ticket_lifetime = 86400
 renew_lifetime = 604800
 forwardable = true
 default_tgs_enctypes = aes256-cts-hmac-sha1-96
 default_tkt_enctypes = aes256-cts-hmac-sha1-96
 permitted_enctypes = aes256-cts-hmac-sha1-96
 udp_preference_limit = 1

[realms]
 CLOUDERA = {
  kdc = quickstart.cloudera
  admin_server = quickstart.cloudera
 }
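Note that the client-side lifetimes in /etc/krb5.conf are expressed in seconds, while kdc.conf uses day units; the two sets of values must agree. A minimal sanity-check sketch (a hypothetical helper, not part of the setup script):

```python
# Hypothetical sanity check: krb5.conf gives lifetimes in seconds,
# kdc.conf uses "1d"/"7d" day units. Verify the two files agree.
DAY = 24 * 60 * 60  # 86400 seconds

krb5_conf = {"ticket_lifetime": 86400, "renew_lifetime": 604800}
kdc_conf = {"max_life": "1d", "max_renewable_life": "7d"}

def days(value):
    """Parse a kdc.conf duration like '7d' into seconds."""
    assert value.endswith("d")
    return int(value[:-1]) * DAY

assert krb5_conf["ticket_lifetime"] == days(kdc_conf["max_life"])            # 1 day
assert krb5_conf["renew_lifetime"] == days(kdc_conf["max_renewable_life"])   # 7 days
print("lifetimes consistent")
```

If you change max_life or max_renewable_life in kdc.conf, adjust ticket_lifetime and renew_lifetime in krb5.conf to match.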

Cloudera Manager Kerberos Wizard

After running the script, you now have a working Kerberos server and can secure the Hadoop cluster. The wizard will do most of the heavy lifting; you just have to fill in a few values.

To start, log into Cloudera Manager by going to http://quickstart.cloudera:7180 in your browser. The user ID is cloudera and the password is cloudera. (Almost needless to say, but never use "cloudera" as a password in a real-world setting.)

There are lots of productivity tools here for managing the cluster but ignore them for now and head straight for the Administration > Kerberos wizard as shown in the next screenshot.

Click on the “Enable Kerberos” button.

The four checklist items were all completed by the script you’ve already run. Check off each item and select “Continue.”

The Kerberos Wizard needs to know the details of what the script configured. Fill in the entries as follows:

  • KDC Server Host: quickstart.cloudera
  • Kerberos Security Realm: CLOUDERA
  • Kerberos Encryption Types: aes256-cts-hmac-sha1-96

Click “Continue.”

Do you want Cloudera Manager to manage the krb5.conf files in your cluster? Remember, the whole point of this blog post is to make Kerberos easier. So, please check “Yes” and then select “Continue.”

The Kerberos Wizard is going to create Kerberos principals for the different services in the cluster. To do that it needs a Kerberos Administrator ID. The ID created is: cloudera-scm/admin@CLOUDERA.

The screen shot shows how to enter this information. Recall the password is: cloudera.

The next screen provides good news. It lets you know that the wizard was able to successfully authenticate. 

OK, you’re ready to let the Kerberos Wizard do its work. Since this is a VM, you can safely select “I’m ready to restart the cluster now” and then click “Continue.” You now have time to go get a coffee or other beverage of your choice.

How long does that take? Just let it work.

Congrats, you are now running a Hadoop cluster secured with Kerberos.

Kerberos is Enabled. Now What?

The old method of su - hdfs will no longer provide administrator access to the HDFS filesystem. Here is how you become the hdfs user with Kerberos:

kinit hdfs@CLOUDERA

Now validate you can do hdfs user things:

hadoop fs -mkdir /eraseme
hadoop fs -rmdir /eraseme

Next, invalidate the Kerberos ticket so as not to break anything:

kdestroy

The YARN min.user.id parameter also needs to be fixed: the cloudera user has id 501, while YARN by default refuses to run containers for users with an id below 1000. Lower the minimum (for example, to 500) in the YARN configuration in Cloudera Manager.

This is the error message you get without fixing min.user.id:

Application initialization failed (exitCode=255) with output: Requested user cloudera is not whitelisted and has id 501, which is below the minimum allowed 1000

Save the changes shown above and restart the YARN service. Now validate that the cloudera user can use the cluster:

kinit cloudera@CLOUDERA
hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar pi 10 10000

If you forget to kinit before trying to use the cluster you’ll get the errors below. The simple fix is to use kinit with the principal you wish to use.

# force the error to occur by eliminating the ticket with kdestroy
[cloudera@quickstart ~]$ kdestroy
[cloudera@quickstart ~]$ hadoop fs -ls
15/01/12 08:21:33 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:33 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:33 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
ls: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "quickstart.cloudera/"; destination host is: "quickstart.cloudera":8020;
[cloudera@quickstart ~]$ hadoop fs -put /etc/hosts myhosts
15/01/12 08:21:47 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:47 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
15/01/12 08:21:47 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:KERBEROS) cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
put: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "quickstart.cloudera/"; destination host is: "quickstart.cloudera":8020;

Congratulations, you have a running Kerberos cluster!

Marty Lurie is a Systems Engineer at Cloudera.

Categories: Hadoop

Converting Apache Avro Data to Parquet Format in Apache Hadoop

Cloudera Blog - Fri, 03/20/2015 - 16:13

Thanks to Big Data Solutions Architect Matthieu Lieber for allowing us to republish the post below.

A customer of mine wants to take advantage of both worlds: work with his existing Apache Avro data, with all of the advantages that it confers, but take advantage of the predicate push-down features that Parquet provides. How to reconcile the two?

For more information about combining these formats, see this.

For a quick recap on Avro, see my previous post. While you are at it, see why Apache Avro is currently the gold standard in the industry.

What we are going to demonstrate here: how to take advantage of existing tools to convert our existing Avro format into Apache Parquet (incubating at the time of this writing), and make sure we can query that transformed data.

Parquet Data

First, let's try to convert text data to Parquet and read it back. Fortunately, Cloudera already provides MapReduce code to do this.

The code from Cloudera (https://github.com/cloudera/parquet-examples), along with its accompanying documentation, lets you read and write Parquet data. Let's try this.

First, let’s create some Parquet data as input. We will use Hive for this, by directly converting Text data into Parquet.

Parquet Conversion
    1. Let’s create a csv data example, and create a text table (here, just 2 columns of integers) in HDFS pointing to it:


      create table mycsvtable (x int, y int)
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        STORED AS TEXTFILE;

      LOAD DATA LOCAL INPATH '/home/cloudera/test/' OVERWRITE INTO TABLE mycsvtable;

    2. Create a Parquet table in Hive, and convert the data to it:

      create table myparquettable (a INT, b INT)
        STORED AS PARQUET
        LOCATION '/tmp/data';

      insert overwrite table myparquettable
        select * from mycsvtable;

    3. To build the code, you will need to add the relevant Hadoop and Parquet libraries to the project in your IDE (say, Eclipse), so that all of the required libraries are on the build path. Then export the code as a runnable JAR (File -> Export as Runnable JAR) and run it outside of Eclipse (otherwise, Hadoop security issues prevent you from running the code).
    4. Run the program (you could also run java instead of Hadoop if you copy the data from hdfs to local disk). The arguments are: inputData as Parquet / outputData as csv. We just want to ensure that we can read the Parquet data and display it.

      $ sudo hadoop jar ./testparquet.jar hdfs:///home/cloudera/test/data/000000_0 hdfs:///home/cloudera/test/dataparquet

      See result: (csv file):

      $ more test/dataparquet/part-m-00000
      1,2
      3,4
      5,6

Avro Data Conversion

Avro Data Example

Let’s get some Avro data example working, from this post.

Avro Data Generation

Interestingly, Hive doesn't let you load/convert csv data into Avro as we did in the Parquet example.

Let’s walk through an example of creating an Avro schema with its IDL, and generating some data. Let’s use this example, with this twitter.avsc schema:

{
  "type" : "record",
  "name" : "twitter_schema",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc" : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc" : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc" : "Unix epoch time in seconds"
  } ],
  "doc" : "A basic schema for storing Twitter messages"
}

and some data in twitter.json:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }

We will convert the data (in Json) into binary Avro format.

$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro

Transformation from Avro to Parquet Storage Format

So, essentially, we use the best of both worlds: we take advantage of the Avro object model and serialization format, and combine them with the columnar storage format of Parquet.

First we will reuse our Avro data that was created earlier.

          1. We will then take advantage of this code to convert the Avro data to Parquet data. This is a map-only job that simply sets up the right input and output format according to what we want.
          2. After compilation, let’s run the script on our existing Avro data:


            $  hadoop jar avro2parquet.jar hdfs:///user/cloudera/twitter.avsc hdfs:///user/cloudera/inputdir hdfs:///user/cloudera/outputdir

            We get:

            $ hadoop fs -ls /user/cloudera/outputdir
            Found 3 items
            -rw-r--r--   1 cloudera cloudera   /user/cloudera/outputdir/_SUCCESS
            -rw-r--r--   1 cloudera cloudera   /user/cloudera/outputdir/_metadata
            -rw-r--r--   1 cloudera cloudera   /user/cloudera/outputdir/part-m-00000.snappy.parquet

            Note that the Avro schema is converted directly to a Parquet-compatible format.


          3. Now let's test our result in Hive. We first create a Parquet table (note the simple syntax in Hive 0.14+), then point to the data we just created via a LOAD command, and finally query our converted data directly.

            hive> create table tweets_parquet (username string, tweet string, timestamp bigint) STORED AS PARQUET;
            OK
            hive> load data inpath '/user/cloudera/outputdir/part-m-00000.snappy.parquet' overwrite into table tweets_parquet;
            Loading data to table default.tweets_parquet
            chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/tweets_parquet/part-m-00000.snappy.parquet': User does not belong to hive
            Table default.tweets_parquet stats: [numFiles=1, numRows=0, totalSize=1075, rawDataSize=0]
            OK
            Time taken: 6.712 seconds
            hive> select * from tweets_parquet;
            OK
            miguno      Rock: Nerf paper, scissors is fine.    1366150681
            BlizzardCS  Works as intended. Terran is IMBA.     1366154481
            Time taken: 1.107 seconds, Fetched: 2 row(s)

            Parquet with Avro

            Let's verify our Parquet schema now that it is converted; note that the schema still refers to Avro:

            $ hadoop parquet.tools.Main schema outputdir/part-m-00000.snappy.parquet
            message com.miguno.avro.Tweet {
              required binary username (UTF8);
              required binary tweet (UTF8);
              required int64 timestamp;
            }

            $ hadoop parquet.tools.Main meta outputdir/part-m-00000.snappy.parquet
            creator:     parquet-mr
            extra:       avro.schema = {"type":"record","name":"Tweet","namespace"

            file schema: com.miguno.avro.Tweet
            ------------------------------------------------------
            username:    REQUIRED BINARY O:UTF8 R:0 D:0
            tweet:       REQUIRED BINARY O:UTF8 R:0 D:0
            timestamp:   REQUIRED INT64 R:0 D:0

            row group 1: RC:2 TS:297
            ------------------------------------------------------
            username:    BINARY SNAPPY DO:0 FPO:4 SZ:67/65/0.97 VC:2 ENC:PLAIN,BIT_PACKED
            tweet:       BINARY SNAPPY DO:0 FPO:71 SZ:176/175/0.99 VC:2 ENC:PLAIN
            timestamp:   INT64 SNAPPY DO:0 FPO:247 SZ:59/57/0.97 VC:2 ENC:PLAIN,BIT_PACKED
            $
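The type mapping visible in that schema dump can be sketched as follows. This is a hypothetical illustration of how simple Avro scalar types map to Parquet primitive types (the authoritative logic lives in parquet-avro's AvroSchemaConverter), matching the twitter.avsc fields used here:

```python
# Hypothetical sketch of how simple Avro scalar types map to Parquet
# primitive types; real conversions are done by parquet-avro's
# AvroSchemaConverter, not this table.
AVRO_TO_PARQUET = {
    "string": "binary (UTF8)",  # Avro strings become UTF8-annotated byte arrays
    "long": "int64",
    "int": "int32",
    "boolean": "boolean",
    "float": "float",
    "double": "double",
    "bytes": "binary",
}

def convert_fields(avro_fields):
    """Map each (name, avro_type) pair to its Parquet primitive type."""
    return {name: AVRO_TO_PARQUET[t] for name, t in avro_fields}

schema = convert_fields([
    ("username", "string"),
    ("tweet", "string"),
    ("timestamp", "long"),
])
print(schema)
# {'username': 'binary (UTF8)', 'tweet': 'binary (UTF8)', 'timestamp': 'int64'}
```

This agrees with the parquet-tools output above: both string fields become required binary (UTF8) and the long timestamp becomes int64.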

That concludes our exercise! Let me know if you have additional questions.


Categories: Hadoop

How-to: Build Re-usable Spark Programs using Spark Shell and Maven

Cloudera Blog - Tue, 03/17/2015 - 14:57

Set up your own, or even a shared, environment for doing interactive analysis of time-series data.

Although software engineering offers several methods and approaches to produce robust and reliable components, a more lightweight and flexible approach is required for data analysts, who do not build “products” per se but still need high-quality tools and components. Thus, I recently looked for a way to re-use, with Apache Spark, existing libraries and datasets already stored in HDFS.

The use case involved information flow analysis based on time series and network data. In this use case, all measured data (primary data) is stored in time series buckets, which are Hadoop SequenceFiles with keys of type Text and values of type VectorWritable (from Apache Mahout 0.9). In my testing, I found the Spark shell to be a useful tool for doing interactive data analysis for this purpose, especially since the code involved can be modularized, re-used, and even shared.

In this post, you’ll learn how to set up your own, or even a shared, environment for doing interactive data analysis of time series within the Spark shell. Instead of developing an application, you will use Scala code snippets and third-party libraries to create reusable Spark modules.

What is a Spark Module?

Using existing Java classes inside the Spark shell requires a solid deployment procedure and some dependency management. In addition to the Scala Simple Build tool (sbt), Apache Maven is really useful, too.

Figure 1: For simple and reliable usage of Java classes and complete third-party libraries, we define a Spark Module as a self-contained artifact created by Maven. This module can easily be shared by multiple users.

For this use case, you will need to create a single jar file containing all dependencies. In some cases, it is also really helpful to provide some library wrapper tools. Such helper classes should be well tested and documented. That way, you can achieve a kind of decoupling between data-analysis and software-development tasks.

Next, let’s go through the steps of creating this artifact.

Set Up a New Maven Project

First, confirm you have Java 1.7 and Maven 3 installed. Create a new directory for your projects and use Maven to prepare a project directory.

$> mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=org.etosha -DartifactId=infodynamics -Dfilter=org.apache.maven.archetypes:maven-archetype-quickstart

Maven will ask:

Choose archetype:
1: remote -> org.apache.maven.archetypes:maven-archetype-quickstart (An archetype which contains a sample Maven project.)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 1:
Choose org.apache.maven.archetypes:maven-archetype-quickstart version:
1: 1.0-alpha-1
2: 1.0-alpha-2
3: 1.0-alpha-3
4: 1.0-alpha-4
5: 1.0
6: 1.1

Select (6) to work with Spark 1.1 locally or another number according to the settings for your cluster.

Now, add some dependencies to the automatically generated POM file.

<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-math</artifactId>
  <version>0.9</version>
  <scope>provided</scope> <!-- use scope jar to ship it as part of your module -->
</dependency>
<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-core</artifactId>
  <version>0.9</version>
  <scope>provided</scope> <!-- use scope jar to ship it as part of your module -->
</dependency>

Mahout 0.9 libraries are also dependencies of Spark so you will need to add the scope “provided” to the dependency section—otherwise, Maven will load the library and all classes will be added to your final single jar file. (As our time-series buckets are SequenceFiles and contain objects of type VectorWritable, they require this version of Mahout.)
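To make the effect of the "provided" scope concrete, here is a minimal sketch (hypothetical names, not part of the Maven build itself) of what ends up inside the assembled single jar:

```python
# Hypothetical sketch of what the Maven Assembly Plugin packs into the
# single jar: "provided" dependencies are assumed to already be on the
# cluster classpath (Spark ships Mahout 0.9) and are left out.
deps = [
    ("mahout-math", "provided"),
    ("mahout-core", "provided"),
    ("scalaplot", "compile"),        # default scope: bundled into the jar
    ("EXT.infodynamics", "compile"),
]

bundled = [name for name, scope in deps if scope != "provided"]
print(bundled)  # ['scalaplot', 'EXT.infodynamics']
```

Changing a dependency's scope from "provided" to the default ("compile") is therefore how you choose to ship it as part of your module.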

Another reason to package third-party libraries is for creating charts inside the Spark shell. If you have Gnuplot, it is really easy to plot results with the Scalaplot library. Just add this dependency definition to your pom.xml file and you are ready to plot:

<dependency>
  <groupId>org.sameersingh.scalaplot</groupId>
  <artifactId>scalaplot</artifactId>
  <version>0.0.3</version>
</dependency>

In this specific scenario the plan is to do some interactive time-series analysis within the Spark shell. First, you’ll want to evaluate the datasets and algorithms. You have to learn more about the domain before a custom application can be built and deployed. Finally, you can use Apache Oozie actions to execute the code but even in this case all third-party libraries have to be available as one artifact.

It is worthwhile to invest some minutes in building such a single jar file—especially for projects that are more than just a hack—with all dependencies and to share this artifact among all the data scientists in your group.

But what about libraries that are not available in Maven Central–such as those on Sourceforge or Google Code?

Download and Deploy a Third-Party-Library as Part of a Spark Module

You’ll need to prepare a location for all third-party libraries that are not available via Maven Central but are required in this particular project.

$> mkdir libs
$> cd libs

Now download the required artifacts, e.g. the JIDT library from Google Code, and decompress the zip file:

$> wget http://information-dynamics-toolkit.googlecode.com/files/infodynamics-jar-0.1.4.zip
$> unzip infodynamics-jar-0.1.4.zip

Maven can deploy the artifact for you using the mvn deploy:deploy-file goal:

$> mvn deploy:deploy-file \
     -Durl=file:///home/training/.m2/repository \
     -Dfile=libs/infodynamics.jar \
     -DgroupId=org.etosha \
     -DartifactId=EXT.infodynamics \
     -Dpackaging=jar \
     -Dversion=0.1.4
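The coordinates above determine where the jar lands in the local repository. A small sketch of Maven's repository layout rule (the helper is hypothetical; the layout rule itself is standard Maven):

```python
# Hypothetical sketch of Maven's standard repository layout: dots in the
# groupId become path separators, followed by artifactId/version/
# artifactId-version.jar.
def repo_path(repo, group_id, artifact_id, version):
    return "%s/%s/%s/%s/%s-%s.jar" % (
        repo, group_id.replace(".", "/"), artifact_id, version,
        artifact_id, version)

path = repo_path("/home/training/.m2/repository",
                 "org.etosha", "EXT.infodynamics", "0.1.4")
print(path)
# /home/training/.m2/repository/org/etosha/EXT.infodynamics/0.1.4/EXT.infodynamics-0.1.4.jar
```

Checking that this file exists after the deploy step is a quick way to confirm the artifact was installed correctly.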

Now, you are ready to add this locally available library to the dependencies section of the POM file of the new Spark Module project:

<dependency>
  <groupId>org.etosha</groupId>
  <artifactId>EXT.infodynamics</artifactId>
  <version>0.1.4</version>
</dependency>

The next step is to add the Maven Assembly Plugin to the plugins-section in the pom.xml file. It manages the merge procedure for all available JAR files during the build.

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>MyMAINClass</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
</build>

Use the above build snippet and place it inside the project section.

Now you are ready to run the Maven build.

$> mvn clean assembly:single

The result will be a single JAR file with defined libraries built in. The file is located in the target directory. As a next step, run the Spark shell and test the settings.

Run and Test the Single-JAR Spark-Module

To run Spark in interactive mode via Spark shell, just define a variable with the name ADD_JARS. If more than one jar file should be added, use a comma-separated list of paths. Now run the Spark shell with this command:

$> ADD_JARS=target/infodynamics-SNAPSHOT-1.0.jar spark-shell

A fast validation can be done via the Web UI of the Spark shell application. It is available on port 4040, so open this URL http://localhost:4040/environment/ in a browser for validation.

Figure 2: Validation of Spark environment settings. Jar files that are available to the Spark nodes are shown in the marked field. One has to specify all additional paths in the property spark.jars.

Another test can now be done inside the Spark shell: just import some of the required Java classes, such as the MatrixUtils, from the third-party library. You just have to type:

scala> import infodynamics.utils.MatrixUtils;

At this point, you may well wonder how to save the Scala code you entered into the Spark shell. After a successful interactive session, you can simply extract your input from the Spark shell history. The Spark shell logs all commands in a file called .spark_history in the user's home directory. Within a Linux terminal, you run the tail command to conserve the latest commands before you go on.

$> tail -n 5 .spark_history > mySession1.scala

This command allows us to conserve the commands in a simple reusable script or as a base for further development in an IDE. Now, you can run this script file containing your Scala functions and custom code just by using the :load command. Inside the Spark shell you enter:

scala> :load mySession1.scala

And don’t forget to share your code! If you want to publish this module via Github, you can quickly follow the instructions here.

Because visual investigation is an important time saver, let’s add the scalaplot library to this module. Now you can easily create some simple charts from the variables stored in the Spark shell. Because this post is not about RDDs and working with large datasets but rather about preparing the stage, follow the steps from the scalaplot documentation to plot a simple sine wave.

scala> import org.sameersingh.scalaplot.Implicits._
scala> val x = 0.0 until 2.0 * math.Pi by 0.1
scala> output(GUI, plot(x ->(math.sin(_), math.cos(_))))

If your system shows a window with two waves now and no error messages appear, you are done for today.

Congratulations, the Spark shell is now aware of your project libraries, including the plotting tools and the ”legacy” libraries containing the data types used in your SequenceFiles, all bundled in your first Spark module!


In this post, you learned how to manage and use external dependencies (especially to Java libraries) and project specific artifacts in the Spark shell. Now it is really easy to share and distribute the modules within your data analyst working group.

Mirko Kämpf is the lead instructor for the Cloudera Administrator Training for Apache Hadoop for Cloudera University.

Categories: Hadoop

Exactly-once Spark Streaming from Apache Kafka

Cloudera Blog - Mon, 03/16/2015 - 14:39

Thanks to Cody Koeninger, Senior Software Engineer at Kixer, for the guest post below about Apache Kafka integration points in Apache Spark 1.3. Spark 1.3 will ship in CDH 5.4.

The new release of Apache Spark, 1.3, includes new experimental RDD and DStream implementations for reading data from Apache Kafka. As the primary author of those features, I’d like to explain their implementation and usage. You may be interested if you would benefit from:

  • More uniform usage of Spark cluster resources when consuming from Kafka
  • Control of message delivery semantics
  • Delivery guarantees without reliance on a write-ahead log in HDFS
  • Access to message metadata

I’ll assume you’re familiar with the Spark Streaming docs and Kafka docs. All code examples are in Scala, but there are Java-friendly methods in the API.

Basic Usage

The new API for both Kafka RDD and DStream is in the spark-streaming-kafka artifact.

SBT dependency:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"

Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

To read from Kafka in a Spark Streaming job, use KafkaUtils.createDirectStream:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf, Seconds(60))

// hostname:port for Kafka brokers, not Zookeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
val topics = Set("sometopic", "anothertopic")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

The call to createDirectStream returns a stream of tuples formed from each Kafka message’s key and value. The exposed return type is InputDStream[(K, V)], where K and V in this case are both String. The private implementation is DirectKafkaInputDStream. There are other overloads of createDirectStream that allow you to access message metadata, and to specify the exact per-topic-and-partition starting offsets.

To read from Kafka in a non-streaming Spark job, use KafkaUtils.createRDD:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf)

// hostname:port for Kafka brokers, not Zookeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
val offsetRanges = Array(
  OffsetRange("sometopic", 0, 110, 220),
  OffsetRange("sometopic", 1, 100, 313),
  OffsetRange("anothertopic", 0, 456, 789)
)

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

The call to createRDD returns a single RDD of (key, value) tuples for each Kafka message in the specified batch of offset ranges. The exposed return type is RDD[(K, V)], the private implementation is KafkaRDD. There are other overloads of createRDD that allow you to access message metadata, and to specify the current per-topic-and-partition Kafka leaders.


DirectKafkaInputDStream is a stream of batches. Each batch corresponds to a KafkaRDD. Each partition of the KafkaRDD corresponds to an OffsetRange. Most of this implementation is private, but it’s still useful to understand.


An OffsetRange represents the lower and upper boundaries for a particular sequence of messages in a given Kafka topic and partition. The following data structure:

OffsetRange("visits", 2, 300, 310)

identifies the 10 messages from offset 300 (inclusive) until offset 310 (exclusive) in partition 2 of the “visits” topic. Note that it does not actually contain the contents of the messages, it’s just a way of identifying the range.
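The half-open interval semantics can be sketched in a few lines (a language-neutral illustration with hypothetical helper names, not the actual Scala class):

```python
# Hypothetical sketch of OffsetRange's half-open interval semantics:
# fromOffset is inclusive, untilOffset is exclusive, and the range only
# identifies messages; it never holds their contents.
from collections import namedtuple

OffsetRange = namedtuple(
    "OffsetRange", ["topic", "partition", "from_offset", "until_offset"])

def message_count(r):
    """Number of messages identified by the range."""
    return r.until_offset - r.from_offset

r = OffsetRange("visits", 2, 300, 310)
print(message_count(r))                         # 10 messages
print(list(range(r.from_offset, r.until_offset))[0])   # first offset read: 300
print(list(range(r.from_offset, r.until_offset))[-1])  # last offset read: 309
```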

Also note that because Kafka ordering is only defined on a per-partition basis, the messages referred to by

OffsetRange("visits", 3, 300, 310)

may be from a completely different time period; even though the offsets are the same as above, the partition is different.


Recall that an RDD is defined by:

  • A method to divide the work into partitions (getPartitions).
  • A method to do the work for a given partition (compute).
  • A list of parent RDDs. KafkaRDD is an input, not a transformation, so it has no parents.
  • Optionally, a partitioner defining how keys are hashed. KafkaRDD doesn’t define one.
  • Optionally, a list of preferred hosts for a given partition, in order to push computation to where the data is (getPreferredLocations).

The KafkaRDD constructor takes an array of OffsetRanges and a map with the current leader host and port for each Kafka topic and partition. The reason for the separation of leader info is to allow for the KafkaUtils.createRDD convenience constructor that doesn’t require you to know the leaders. In that case, createRDD will do the Kafka API metadata calls necessary to find the current leaders, using the list of hosts specified in metadata.broker.list as the initial points of contact. That initial lookup will happen once, in the Spark driver process.

The getPartitions method of KafkaRDD takes each OffsetRange in the array and turns it into an RDD partition by adding the leader’s host and port info. The important thing to notice here is there is a 1:1 correspondence between Kafka partition and RDD partition. This means the degree of Spark parallelism (at least for reading messages) will be directly tied to the degree of Kafka parallelism.
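To make the 1:1 correspondence concrete, here is a sketch (hypothetical data and helper names) of how an array of offset ranges determines the RDD partitions, and hence the read parallelism:

```python
# Hypothetical sketch of getPartitions: each OffsetRange becomes exactly
# one RDD partition, so Spark's read parallelism equals the number of
# Kafka topic-partitions being consumed.
offset_ranges = [
    ("sometopic", 0, 110, 220),
    ("sometopic", 1, 100, 313),
    ("anothertopic", 0, 456, 789),
]

# Current leader (host:port) per topic-partition; hypothetical values.
leaders = {
    ("sometopic", 0): "broker1:9092",
    ("sometopic", 1): "broker2:9092",
    ("anothertopic", 0): "broker1:9092",
}

# One RDD partition per range, indexed in order, annotated with its leader.
rdd_partitions = [
    {"index": i, "range": r, "leader": leaders[(r[0], r[1])]}
    for i, r in enumerate(offset_ranges)
]
print(len(rdd_partitions))  # 3, same as the number of Kafka partitions
```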

The getPreferredLocations method uses the Kafka leader for the given partition as the preferred host. I don’t run my Spark executors on the same hosts as Kafka, so if you do, let me know how this works out for you.

The compute method runs in the Spark executor processes. It uses a Kafka SimpleConsumer to connect to the leader for the given topic and partition, then makes repeated fetch requests to read messages for the specified range of offsets.

Each message is converted using the messageHandler argument to the constructor. messageHandler is a function from Kafka MessageAndMetadata to a user-defined type, with the default being a tuple of key and value. In most cases, it’s more efficient to access topic and offset metadata on a per-partition basis (see the discussion of HasOffsetRanges below), but if you really need to associate each message with its offset, you can do so.

The key point to notice about compute is that, because offset ranges are defined in advance on the driver, then read directly from Kafka by executors, the messages returned by a particular KafkaRDD are deterministic. There is no important state maintained on the executors, and no notion of committing read offsets to Apache ZooKeeper, as there is with prior solutions that used the Kafka high-level consumer.

Because the compute operation is deterministic, it is in general safe to re-try a task if it fails. If a Kafka leader is lost, for instance, the compute method will just sleep for the amount of time defined by the refresh.leader.backoff.ms Kafka param, then fail the task and let the normal Spark task retry mechanism handle it. On subsequent attempts after the first, the new leader will be looked up on the executor as part of the compute method.


The KafkaRDD returned by KafkaUtils.createRDD is usable in batch jobs if you have existing code to obtain and manage offsets. In most cases however, you’ll probably be using KafkaUtils.createDirectStream, which returns a DirectKafkaInputDStream. Similar to an RDD, a DStream is defined by:

  • A list of parent DStreams. Again, this is an input DStream, not a transformation, so it has no parents.
  • A time interval at which the stream will generate batches. This stream uses the interval of the streaming context.
  • A method to generate an RDD for a given time interval (compute)

The compute method runs on the driver. It connects to the leader for each topic and partition, not to read messages, but just to get the latest available offset. It then defines a KafkaRDD with offset ranges spanning from the ending point of the last batch until the latest leader offsets.

To define the starting point of the very first batch, you can either specify exact offsets per TopicAndPartition, or use the Kafka parameter auto.offset.reset, which may be set to “largest” or “smallest” (defaults to “largest”). For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch.
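As a back-of-the-envelope sketch of what spark.streaming.kafka.maxRatePerPartition implies for batch sizes (the numbers below are hypothetical examples, not recommendations):

```python
# Hypothetical back-of-the-envelope: the upper bound on messages in a
# single batch under spark.streaming.kafka.maxRatePerPartition.
max_rate_per_partition = 1000   # messages/second/partition (example value)
batch_interval_seconds = 60     # the StreamingContext batch interval
num_partitions = 3              # total Kafka partitions across subscribed topics

max_messages_per_batch = (
    max_rate_per_partition * batch_interval_seconds * num_partitions)
print(max_messages_per_batch)   # 180000
```

Because the bound is per partition, adding Kafka partitions raises the total batch ceiling proportionally.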

Once the KafkaRDD for a given time interval is defined, it executes exactly as described above for the batch usage case. Unlike prior Kafka DStream implementations, there is no long-running receiver task that occupies a core per stream regardless of what the message volume is. For our use cases at Kixer, it’s common to have important but low-volume topics in the same job as high-volume topics. With the direct stream, the low-volume partitions result in smaller tasks that finish quickly and free up that node to process other partitions in the batch. It’s a pretty big win to have uniform cluster usage while still keeping topics logically separate.

A significant difference from the batch use case is that there is some important state that varies over time, namely the offset ranges generated at each time interval. Executor or Kafka leader failure isn’t a big deal, as discussed above, but if the driver fails, offset ranges will be lost, unless stored somewhere. I’ll discuss this in more detail under Delivery Semantics below, but you basically have three choices:

  1. Don’t worry about it if you don’t care about lost or duplicated messages, and just restart the stream from the earliest or latest offset.
  2. Checkpoint the stream, in which case the offset ranges (not the messages, just the offset range definitions) will be stored in the checkpoint.
  3. Store the offset ranges yourself, and provide the correct starting offsets when restarting the stream.

Again, no consumer offsets are stored in ZooKeeper. If you want interop with existing Kafka monitoring tools that talk to ZK directly, you’ll need to store the offsets into ZK yourself (this doesn’t mean it needs to be your system of record for offsets, you can just duplicate them there).

Note that because Kafka is being treated as a durable store of messages, not a transient network source, you don’t need to duplicate messages into HDFS for error recovery. This design does have some implications, however. The first is that you can’t read messages that no longer exist in Kafka, so make sure your retention is adequate. The second is that you can’t read messages that don’t exist in Kafka yet. To put it another way, the consumers on the executors aren’t polling for new messages, the driver is just periodically checking with the leaders at every batch interval, so there is some inherent latency.


One other implementation detail is a public interface, HasOffsetRanges, with a single method returning an array of OffsetRange. KafkaRDD implements this interface, allowing you to obtain topic and offset information on a per-partition basis.

val stream = KafkaUtils.createDirectStream(...)
...
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get a collection of offset ranges
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    // index to get the correct offset range for the rdd partition we're working on
    val osr: OffsetRange = offsets(i)
    // get any needed data from the offset range
    val topic = osr.topic
    val kafkaPartitionId = osr.partition
    val begin = osr.fromOffset
    val end = osr.untilOffset
    ...

The reason for this layer of indirection is that the static type used by DStream methods like foreachRDD and transform is just RDD, not the type of the underlying (and in this case, private) implementation. Because the DStream returned by createDirectStream generates batches of KafkaRDD, you can safely cast to HasOffsetRanges. Also note that because of the 1:1 correspondence between offset ranges and RDD partitions, the indexes of the RDD partitions correspond to the indexes into the array returned by offsetRanges.

Delivery Semantics

First, understand the Kafka docs on delivery semantics. If you’ve already read them, go read them again. In short: consumer delivery semantics are up to you, not Kafka.

Second, understand that Spark does not guarantee exactly-once semantics for output actions. When the Spark streaming guide talks about exactly-once, it’s only referring to a given item in an RDD being included in a calculated value once, in a purely functional sense. Any side-effecting output operations (i.e. anything you do in foreachRDD to save the result) may be repeated, because any stage of the process might fail and be retried.

Third, understand that Spark checkpoints may not be recoverable, for instance in cases where you need to change the application code in order to get the stream restarted. This situation may improve in Spark 1.4, but be aware that it is an issue. I’ve been bitten by it before, and you may be too. Anywhere I mention “checkpoint the stream” as an option, consider the risk involved. Also note that any windowing transformations are going to rely on checkpointing anyway.

Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.

Given all that, how do you obtain the equivalent of the semantics you want?

At-most-once semantics
This could be useful in cases where you’re sending results to something that isn’t a system of record, you don’t want duplicates, and it’s not worth the hassle of ensuring that messages don’t get lost. An example might be sending summary statistics over UDP, since it’s an unreliable protocol to begin with.

To get at-most-once semantics, do all of the following:

  1. Set spark.task.maxFailures to 1, so the job dies as soon as a task fails.
  2. Make sure spark.speculation is false (the default), so multiple copies of tasks don’t get speculatively run.
  3. When the job dies, start the stream back up using the Kafka param auto.offset.reset set to “largest”, so it will skip to the current end of the log.

This will mean you lose messages on restart, but at least they shouldn’t get replayed. Probably. Test this carefully if it’s actually important to you that a message never gets repeated, because it’s not a common use case, and I’m not providing example code for it.
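As a sketch, the first two settings could go in spark-defaults.conf (these are real Spark property names; auto.offset.reset belongs in the Kafka params passed to createDirectStream, not here):

```
spark.task.maxFailures   1
spark.speculation        false
```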

At-least-once semantics
You’re okay with duplicate messages, but not okay with losing messages. An example of this might be sending internal email alerts on relatively rare occurrences in the stream. Getting duplicate critical alerts in a short time frame is much better than not getting them at all.

Basic options here are either:

  • Checkpoint the stream, or
  • Restart the job with auto.offset.reset set to smallest. This will replay the whole log from the beginning of your retention, so you’d better have relatively short retention or really be ok with duplicate messages.

Checkpointing the stream serves as the basis of the next option, so see the example code for it.

Exactly-once using idempotent writes

Idempotent writes make duplicate messages safe, turning at-least-once into the equivalent of exactly-once. The typical way of doing this is by having a unique key of some kind (either embedded in the message, or using topic/partition/offset as the key), and storing the results according to that key. Relying on a per-message unique key means this is useful for transforming or filtering individually valuable messages, less so for aggregating multiple messages.

There’s a complete sample of this idea at IdempotentExample.scala. It’s using Postgres for the sake of consistency with the next example, but any storage system that allows for unique keys could be used.

The important points here are that the schema is set up with a unique key and a rule to allow for no-op duplicate inserts. For this example, the message body is being used as the unique key, but any appropriate key could be used.

stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // make sure connection pool is set up on the executor before writing
    SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.foreach { case (key, msg) =>
      DB.autoCommit { implicit session =>
        // the unique key for idempotency is just the text of the message itself, for example purposes
        sql"insert into idem_data(msg) values (${msg})".update.apply
      }
    }
  }
}
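The Scala example needs a running job and database. As a self-contained sketch of the same idempotency idea (Python with SQLite; the table name is borrowed from the example, and INSERT OR IGNORE stands in for the Postgres no-op rule), replaying a batch leaves the stored results unchanged:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE idem_data (msg TEXT PRIMARY KEY)")

def write_batch(messages):
    # duplicate keys become no-ops, so a retried output action is harmless
    conn.executemany("INSERT OR IGNORE INTO idem_data(msg) VALUES (?)",
                     [(m,) for m in messages])
    conn.commit()

batch = ["a", "b", "c"]
write_batch(batch)
write_batch(batch)  # simulate a retried output action replaying the same batch
count = conn.execute("SELECT count(*) FROM idem_data").fetchone()[0]
assert count == 3  # no duplicates despite the replay
```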

In the case of a failure, the above output action can safely be retried. Checkpointing the stream ensures that offset ranges are saved as they are generated. Checkpointing is accomplished in the usual way: define a function that configures the streaming context (ssc) and sets up the stream, then call

ssc.checkpoint(checkpointDir)

before returning the ssc. See the Streaming Guide for more details.

Exactly-once using transactional writes

For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics, and is straightforward to use even for aggregations.

TransactionalExample.scala is a complete Spark job implementing this idea. It’s using Postgres, but any data store that has transactional semantics could be used.

The first important point is that the stream is started using the last successfully committed offsets as the beginning point. This allows for failure recovery:

// begin from the offsets committed to the database
val fromOffsets = DB.readOnly { implicit session =>
  sql"select topic, part, off from txn_offsets".
    map { resultSet =>
      TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
    }.list.apply().toMap
}

val stream: InputDStream[Long] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](
  ssc, kafkaParams, fromOffsets,
  // we're just going to count messages, don't care about the contents, so convert each message to a 1
  (mmd: MessageAndMetadata[String, String]) => 1L)

For the very first time the job is run, the table can be pre-loaded with appropriate starting offsets.

The example accesses offset ranges on a per-partition basis, as mentioned in the discussion of HasOffsetRanges above. The important thing to notice about mapPartitionsWithIndex is that it’s a transformation, and there is no equivalent foreachPartitionWithIndex action. RDD transformations are generally lazy, so unless you add an output action of some kind, Spark will never schedule the job to actually do anything. Calling foreach on the RDD with an empty body is sufficient. Also, notice that some iterator methods, such as map, are lazy. If you’re setting up transient state, like a network or database connection, by the time the map is fully forced the connection may already be closed. In that case, be sure to instead use methods like foreach, that eagerly consume the iterator.

rdd.mapPartitionsWithIndex { (i, iter) =>
  // set up some connection
  iter.foreach {
    // use the connection
  }
  // close the connection
  Iterator.empty
}.foreach {
  // Without an action, the job won't get scheduled, so empty foreach to force it
  // This is a little awkward, but there is no foreachPartitionWithIndex method on rdds
  (_: Nothing) => ()
}

The final thing to notice about the example is that it’s important to ensure that saving the results and saving the offsets either both succeed, or both fail. Storing offsets should fail if the prior committed offset doesn’t equal the beginning of the current offset range; this prevents gaps or repeats. Kafka semantics ensure that there aren’t gaps in messages within a range of offsets (if you’re especially concerned, you could verify by comparing the size of the offset range to the number of messages).

// localTx is transactional, if metric update or offset update fails, neither will be committed
DB.localTx { implicit session =>
  // store metric data
  val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
    where topic = ${osr.topic}
  """.update.apply()
  if (metricRows != 1) {
    throw new Exception("...")
  }
  // store offsets
  val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
    where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
  """.update.apply()
  if (offsetRows != 1) {
    throw new Exception("...")
  }
}

The example code is throwing an exception, which will result in a transaction rollback. Other failure-handling strategies may be appropriate, as long as they result in a transaction rollback as well.
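For a runnable illustration of that invariant, here is a Python/SQLite sketch mirroring the txn_data and txn_offsets tables above (not the actual example code): results and offsets commit or roll back together, and a starting offset that doesn't match the last committed one aborts the transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE txn_data (topic TEXT PRIMARY KEY, metric INTEGER)")
conn.execute("CREATE TABLE txn_offsets (topic TEXT, part INTEGER, off INTEGER)")
conn.execute("INSERT INTO txn_data VALUES ('t', 0)")
conn.execute("INSERT INTO txn_offsets VALUES ('t', 0, 100)")
conn.commit()

def commit_batch(topic, part, from_off, until_off, metric):
    try:
        conn.execute("UPDATE txn_data SET metric = metric + ? WHERE topic = ?",
                     (metric, topic))
        # the "off = from_off" predicate rejects repeated or skipped offset ranges
        cur = conn.execute(
            "UPDATE txn_offsets SET off = ? WHERE topic = ? AND part = ? AND off = ?",
            (until_off, topic, part, from_off))
        if cur.rowcount != 1:
            raise Exception("repeated or skipped offset range")
        conn.commit()
    except Exception:
        conn.rollback()  # neither the metric nor the offset update survives
        raise

commit_batch("t", 0, 100, 150, metric=50)        # first delivery succeeds
try:
    commit_batch("t", 0, 100, 150, metric=50)    # replay: offset check fails
except Exception:
    pass
assert conn.execute("SELECT metric FROM txn_data").fetchone()[0] == 50
```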

Future Improvements

Although this feature is considered experimental for Spark 1.3, the underlying KafkaRDD design has been in production at Kixer for months. It’s currently handling billions of messages per day, in batch sizes ranging from 2 seconds to 5 minutes. That being said, there are known areas for improvement (and probably a few unknown ones as well).

  • Connection pooling. Currently, Kafka consumer connections are created as needed; pooling should help efficiency. Hopefully this can be implemented in a way that integrates nicely with ongoing work towards a Kafka producer API in Spark.
  • Kafka metadata API. The class for interacting with Kafka is currently private, meaning you’ll need to duplicate some of that work if you want low-level access to Kafka metadata. This is partly because the Kafka consumer offset API is a moving target right now. If this code proves to be stable, it would be nice to have a user-facing API for interacting with Kafka metadata.
  • Batch generation policies. Right now, rate-limiting is the only tuning available for how the next batch in the stream is defined. We have some use cases that involve larger tweaks, such as a fixed time delay. A flexible way of defining batch generation policies might be useful.

If there are other improvements you can think of, please let me know.

Categories: Hadoop

How Testing Supports Production-Ready Security in Cloudera Search

Cloudera Blog - Fri, 03/13/2015 - 16:18

Security architecture is complex, but these testing strategies help Cloudera customers rely on production-ready results.

Among other things, good security requires user authentication and that authenticated users and services be granted access to those things (and only those things) that they’re authorized to use. Across Apache Hadoop and Apache Solr (which ships in CDH and powers Cloudera Search), authentication is accomplished using Kerberos and SPNego over HTTP and authorization is accomplished using Apache Sentry (the emerging standard for role-based fine grain access control, currently incubating at the ASF).

The interactions among Kerberos, Sentry, and different system configs, OSs, and environments are complicated, and for production-ready applications, they require a variety of tests to ensure that security works as expected. In this post, you’ll learn how Cloudera uses a range of testing methodologies to help ensure that security operations work as expected for Cloudera Search customers—many of which are in highly regulated industries.

Native Security in Solr

This diagram illustrates the execution cycle of a sample Solr request in a secure environment. Incoming HTTP requests must first complete Kerberos authentication. If authentication fails, the web server returns an HTTP “401 Unauthorized” error to the user, thereby restricting Solr access. If authentication succeeds, Solr forwards the request to Sentry for authorization. Sentry grants or denies access based on which user is requesting access, the request type, and the existing permissions defined for the user.

This feature, called index-level security, provides control over collections using QUERY, UPDATE, and ALL permissions. If authorization fails, Solr returns an HTTP “401 Unauthorized” error. Otherwise, the request is processed by Solr.
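That request flow can be condensed into a small gate function (an illustrative Python sketch with hypothetical names; the real checks live in the Solr web layer and Sentry):

```python
# Sketch of the authentication -> authorization gate for a Solr request.
PERMISSIONS = {"alice": {"QUERY"}, "bob": {"QUERY", "UPDATE"}}  # hypothetical users

def handle_request(user, authenticated, request_type):
    if not authenticated:                               # Kerberos/SPNego step
        return "401 Unauthorized"
    granted = PERMISSIONS.get(user, set())
    if request_type in granted or "ALL" in granted:     # Sentry step
        return "200 OK"
    return "401 Unauthorized"   # per the post, authorization failure is also a 401

assert handle_request("alice", True, "QUERY") == "200 OK"
assert handle_request("alice", True, "UPDATE") == "401 Unauthorized"
assert handle_request("alice", False, "QUERY") == "401 Unauthorized"
```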

In addition to index-level security, Solr supports document-level security via Sentry, which enforces the fine-grain access control of which Solr documents can be viewed by which users.

Testing Types

There are two primary sets of test cases for validating Sentry-Solr integration: Unit tests and integration tests. Integration tests are further divided into mini-cluster integration tests and real hardware integration tests.

Unit Testing Sentry and Solr

Unit tests run in a virtualized cluster spun up on the test JVM and help validate the functionality of a specific method or class. These tests are very important in isolating failures to a specific module before the error touches multiple layers (modules), which makes it hard to debug. Unit tests are important because they provide good code coverage and help catch bugs sooner in the development cycle. Cloudera runs the entire suite of unit tests before deploying every new project build. This helps to ensure that there are no regressions from recent check-ins and helps verify code stability.

Integration Testing for Sentry + Solr

Integration tests evaluate end-to-end functionality of the product, mainly to mimic end-user scenarios. These tests are very important for verifying the overall functionality of the system, from the integration of multiple sub-modules to inter-component interaction (Sentry <-> Solr), exercising all these pieces in one place.

There are two places where integration tests are run: One is in the mini-cluster spun up in the JVM, and the other is on real distributed clusters. Both methods have advantages: Issues found using tests on a single JVM can be easier to debug, whereas end-to-end user scenarios run on real distributed clusters may identify issues that would not manifest on a single JVM.

Integration Testing in a Single JVM

Sentry, being a critical piece in security, requires exhaustive test coverage across all possible user scenarios to avoid corner cases that bypass security. This characteristic requires running SolrCloud (multiple shards and replicas) and Sentry in a single environment. Solr has an existing test framework for testing SolrCloud features (see AbstractFullDistribZkTestBase), but using it requires pulling in the entire Solr test hierarchy, which may not be required. The other downside of this approach is that Solr dependencies may conflict with other projects. To avoid this issue and to make testing more pluggable, Cloudera developed the MiniSolrCloudCluster (SOLR-5865) test framework, which separates out the SolrCloud functionality and thus makes it easier for new projects to use and test SolrCloud in a single JVM.

The tests developed on the MiniSolrCloudCluster framework cover extensive use cases by auto-generating users with all possible permissions and sending out requests as a particular user to make sure the Solr responses match the expected output. For example, for the three existing access specifiers (QUERY, UPDATE, ALL), the test framework generates eight possible users, one for each possible combination of permissions, and each user issues both QUERY and UPDATE requests to Solr. The tests then verify that Sentry’s response (access granted or an unauthorized error) matches the expected one. This framework provides us with a good understanding and baseline of Sentry behavior in a real Solr cluster scenario.
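The eight users correspond to the subsets of the three access specifiers. A sketch of that enumeration and of the expected response (hypothetical Python, not the actual test framework):

```python
from itertools import combinations

SPECIFIERS = ["QUERY", "UPDATE", "ALL"]

# one test user per subset of {QUERY, UPDATE, ALL}, including the empty set
subsets = [set(c) for r in range(len(SPECIFIERS) + 1)
           for c in combinations(SPECIFIERS, r)]
assert len(subsets) == 8  # 2^3 permission combinations

def expected_response(perms, request_type):
    # what the test harness would expect Sentry to decide for this user
    return "ok" if request_type in perms or "ALL" in perms else "unauthorized"

assert expected_response({"QUERY"}, "UPDATE") == "unauthorized"
assert expected_response({"ALL"}, "UPDATE") == "ok"
```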

Integration Testing in a Distributed Cluster

One caveat to the above approach is that neither MiniSolrCloudCluster nor the existing Solr test framework supports Kerberos authentication at the time of this writing. To address this issue, Cloudera developed a non-Kerberos authentication layer that modifies the request to look like it successfully passed Kerberos authentication. Although this approach enables us to test Sentry extensively with Solr, it also bypasses testing Kerberos authentication, a critical element of our security solution. In a production deployment, the end user must still use Kerberos to log into the system and run commands via Sentry. This critical piece is covered in our integration testing on real distributed clusters.

The other main difference between real clusters and MiniSolrCloudCluster is that with the latter, all the required processes, like Sentry, Solr, and Apache ZooKeeper, run in a single JVM. Although this approach is good for testing locally, we still want a clear picture of how the system behaves when all the processes have their own JVMs and are completely distributed across multiple nodes in the cluster.

The next and final line of defense is to run the integration tests on real clusters of varied sizes, configurations, and OSs. Running the suite of integration tests on real clusters has many clear advantages, like the ability to:

  • Catch packaging issues (like errors with RPMs, DEBs, etc.) that fall out when building the product
  • Mimic the end-user deployment scenario (running on wide range of OSs)
  • Cover use cases not covered by previous rounds of testing (running the tests in fully secure Kerberos environment that hasn’t been covered by MiniSolrCloudCluster)
  • Scale out the cluster to an arbitrary size and check the performance implications
  • Run the system under different configurations, such as NameNode HA or vanilla (simple secure) systems
  • Induce failure scenarios and monitor how the system recovers
  • Run longer-running tests that ingest lots of data (as memory is a constraint when running MiniSolrCloudCluster in JVM)

Testing on real clusters is done by first creating a large number of Linux users, groups, Kerberos principals, and keytabs for the above users, which is completely missing in MiniSolrCloudCluster. We then define a Sentry policy file with roles to groups and privileges to roles mapping.

We run a subset of the most common end-to-end scenarios, as MiniSolrCloudCluster already runs the exhaustive suite of tests. These tests are run against the real cluster, which includes running kinit as a user and sending a QUERY/UPDATE request to Solr. In this cycle, the user first authenticates with Solr using Kerberos, and Sentry then scrutinizes the incoming request based on the authenticated user.

Because MiniSolrCloudCluster runs in a JVM, it has to be created at the beginning of every test cycle, which results in loss of state. In real clusters, however, state is preserved because the clusters are longer-running. This provides us with an environment for running the same set of tests multiple times and observing problems that emerge over time (such as memory leaks, long GC pauses, and the average turnaround time of a single request).

Conclusion
You should now understand the different levels of validation done by Cloudera for Search and Sentry integration. We welcome any other suggestions or contributions to the existing Sentry test suite. The source code can be found here.

Cloudera Search and Sentry are available for download as part of CDH and come with extensive documentation. If you have any questions, please contact us at the Cloudera Search Forum or Search mailing list.

Vamsee Yarlagadda is a Software Engineer at Cloudera and an Apache Sentry (incubating) committer.

Categories: Hadoop

Understanding HDFS Recovery Processes (Part 2)

Cloudera Blog - Wed, 03/11/2015 - 16:12

Having a good grasp of HDFS recovery processes is important when running or moving toward production-ready Apache Hadoop. In the conclusion to this two-part post, pipeline recovery is explained.

An important design requirement of HDFS is to ensure continuous and correct operations that support production deployments. For that reason, it’s important for operators to understand how HDFS recovery processes work. In Part 1 of this post, we looked at lease recovery and block recovery. Now, in Part 2, we explore pipeline recovery.

All three recovery processes are essential for HDFS fault tolerance. Together, they help to ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.


In HDFS, files are divided into blocks, and file access follows multi-reader, single-writer semantics. To meet the fault-tolerance requirement, multiple replicas of a block are stored on different DataNodes. The number of replicas is called the replication factor. When a new file block is created, or an existing file is opened for append, the HDFS write operation creates a pipeline of DataNodes to receive and store the replicas (the replication factor generally determines the number of DataNodes in the pipeline). Subsequent writes to that block go through the pipeline (Figure 1).

Figure 1. HDFS Write Pipeline

For read operations the client chooses one of the DataNodes holding copies of the block and requests a data transfer from it.

For a deeper dive into this background information, read Part 1 of this post.

Pipeline Recovery The Write Pipeline

When an HDFS client writes to file, the data is written as sequential blocks. To write or construct a block, HDFS breaks the block into packets (not actually network packets but rather messages; the term packets refers to the class which embodies these messages), and propagates them to the DataNodes in the write pipeline, as shown in Figure 2.

Figure 2. HDFS Write Pipeline Stages

There are three stages of a write pipeline:

  1. Pipeline setup. The client sends a Write_Block request along the pipeline and the last DataNode sends an acknowledgement back. After receiving the acknowledgement, the pipeline is ready for writing.
  2. Data streaming. The data is sent through the pipeline in packets. The client buffers the data until a packet is filled up, and then sends the packet to the pipeline. If the client calls hflush(), then even if a packet is not full, it will nevertheless be sent to the pipeline and the next packet will not be sent until the acknowledgement of the previous hflush’ed packet is received by the client.
  3. Close (finalize the replica and shutdown the pipeline). The client waits until all packets have been acknowledged and then sends a close request. All DataNodes in the pipeline change the corresponding replica into the FINALIZED state and report back to the NameNode. The NameNode then changes the block’s state to COMPLETE if at least the configured minimum replication number of DataNodes reported a FINALIZED state of their corresponding replicas.
Pipeline Recovery

Pipeline recovery is initiated when one or more DataNodes in the pipeline encounter an error in any of the three stages while a block is being written.

Recovery from Pipeline Setup Failure

  1. If the pipeline was created for a new block, the client abandons the block and asks the NameNode for a new block and a new list of DataNodes. The pipeline is reinitialized for the new block.
  2. If the pipeline was created to append to a block, the client rebuilds the pipeline with the remaining DataNodes and increments the block’s generation stamp.

Recovery from Data Streaming Failure

  1. When a DataNode in the pipeline detects an error (for example, a checksum error or a failure to write to disk), that DataNode takes itself out of the pipeline by closing all TCP/IP connections. If the data is not deemed corrupted, it also writes buffered data to the relevant block and checksum (METADATA) files.
  2. When the client detects the failure, it stops sending data to the pipeline, and reconstructs a new pipeline using the remaining good DataNodes. As a result, all replicas of the block are bumped up to a new generation stamp (GS).
  3. The client resumes sending data packets with this new GS. If the data sent has already been received by some of the DataNodes, they just ignore the packet and pass it downstream in the pipeline.

Recovery from Close Failure

  1. When the client detects a failure in the close state, it rebuilds the pipeline with the remaining DataNodes. Each DataNode bumps up the block’s GS and finalizes the replica if it’s not finalized yet.

When one DataNode is bad, it removes itself from the pipeline. During the pipeline recovery process, the client may need to rebuild a new pipeline with the remaining DataNodes. (It may or may not replace bad DataNodes with new DataNodes, depending on the DataNode replacement policy described in the next section.) The replication monitor will take care of replicating the block to satisfy the configured replication factor.

DataNode Replacement Policy upon Failure

There are four configurable policies regarding whether to add additional DataNodes to replace the bad ones when setting up a pipeline for recovery with the remaining DataNodes:

  1. DISABLE: Disables DataNode replacement and throws an error (at the server); this acts like NEVER at the client.
  2. NEVER: Never replace a DataNode when a pipeline fails (generally not a desirable action).
  3. DEFAULT: Replace based on the following conditions:
    • Let r be the configured replication factor.
    • Let n be the number of existing replica DataNodes.
    • Add a new DataNode only if r >= 3 and EITHER
      (1) floor(r/2) >= n; OR
      (2) r > n and the block is hflushed/appended.
  4. ALWAYS: Always add a new DataNode when an existing DataNode failed. This fails if a DataNode can’t be replaced.
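The DEFAULT conditions can be written out as a small predicate (a sketch of the rules above, not the actual HDFS source):

```python
def default_policy_adds_datanode(r, n, hflushed_or_appended):
    """r: configured replication factor; n: number of remaining replica DataNodes.
    Returns True if the DEFAULT policy adds a replacement DataNode."""
    if r < 3:
        return False
    # replace when half (or more) of the replicas are gone, or when the block
    # has been hflushed/appended and any replica is missing
    return (r // 2 >= n) or (r > n and hflushed_or_appended)

# replication 3, one DataNode lost after an hflush -> replace
assert default_policy_adds_datanode(3, 2, True) is True
# replication 3, two DataNodes lost -> floor(3/2) >= 1, replace regardless
assert default_policy_adds_datanode(3, 1, False) is True
# replication 2 -> DEFAULT never adds a replacement
assert default_policy_adds_datanode(2, 1, True) is False
```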

To disable using any of these policies, you can set the following configuration property to false (the default is true):

dfs.client.block.write.replace-datanode-on-failure.enable
When enabled, the default policy is DEFAULT. The following config property changes the policy:

dfs.client.block.write.replace-datanode-on-failure.policy
When using DEFAULT or ALWAYS, if only one DataNode succeeds in the pipeline, the recovery will never succeed and the client will not be able to perform the write. This problem is addressed with this configuration property:

dfs.client.block.write.replace-datanode-on-failure.best-effort
which defaults to false. With the default setting, the client will keep trying until the specified policy is satisfied. When this property is set to true, even if the specified policy can’t be satisfied (for example, there is only one DataNode that succeeds in the pipeline, which is less than the policy requirement), the client will still be allowed to continue to write.

Some Solved Issues
  • HDFS-5016 details a deadlock scenario in pipeline recovery that causes a DataNode to be marked dead (it duplicates HDFS-3655 “Datanode recoverRbw could hang sometime” and HDFS-4851 “Deadlock in pipeline recovery”). Here’s what happens: while recovery is ongoing, some relevant threads end up waiting for each other, thus deadlocking. Since the FSDataset lock is held in this deadlock, the heartbeat thread and data transceiver threads are blocked waiting on it. The solution is to introduce a timeout mechanism to break the deadlock.
  • HDFS-4882 reports a case in which the NameNode’s LeaseManager keeps looping forever in checkLeases. When the hard limit expires, the LeaseManager tries to recover the lease; if the second-to-last block is COMMITTED and the last block is COMPLETE, internalReleaseLease() returns without releasing the lease, and the LeaseManager keeps trying to release the same lease, resulting in an infinite loop. Since the FSNamesystem.writeLock is held in the loop, this essentially makes the NameNode unresponsive. The fix is to try releasing a lease only periodically rather than continuously.
  • HDFS-5557 details a case in which write pipeline recovery for the last packet in the block may cause rejection of valid replicas because of incorrect GS recording when handling block report. The worst case is that all good replicas will be rejected and a bad one is accepted. In this case, the corresponding block will get completed, but the data cannot be read until the next full block report containing one of the valid replicas is received. The solution is to fix the GS recording.
  • HDFS-5558 reports a case in which the LeaseManager monitor thread can crash if the last block is COMPLETE but the second-to-last block is not. If a file has its last and second-to-last blocks not in the COMPLETE state and an attempt is made to close the file, the last block may change to COMPLETE but the second-to-last one might not. If this condition lasts long and the file is abandoned, the LeaseManager will try to recover the lease and do block recovery on the block, but internalReleaseLease() will fail with an invalid cast exception for this kind of file. The solution is to ensure the second-to-last block is in the COMPLETE state before closing the file.
Known Open Issues
  • With the introduction of dfs.client.block.write.replace-datanode-on-failure.best-effort, a client will be able to continue to write even if there is only one DataNode. When this happens, a block may have only one replica, and if anything happens to this single copy before it is replicated, data loss will occur. To alleviate the problem, HDFS-6867 proposes a background thread to do the pipeline recovery while the client is writing to the single replica.
  • HDFS-4504 details the case where DFSOutputStream#close doesn’t always release resources (such as leases). In some cases, DFSOutputStream#close can throw an IOException. One example is if there is a pipeline error and then pipeline recovery fails. Unfortunately, in this case, some of the resources used by the DFSOutputStream are leaked. One particularly important such resource is file leases.

    So, it’s possible for a long-lived HDFS client, such as Apache Flume, to write many blocks to a file but then fail to close it. However, the LeaseRenewer thread inside the client will continue to renew the lease for the “undead” file. Future attempts to close the file will just re-throw the previous exception, and no progress can be made by the client.

  • HDFS-6937 details a pipeline recovery issue due to a checksum error. The data on the middle DataNode (assuming a replication factor of 3) is somehow corrupted, but the corruption goes undetected. The last DataNode discovers the checksum error and takes itself out of the pipeline. The recovery process keeps trying to replace the last DataNode in the pipeline with a new one, and to replicate the data from the middle DataNode to the new one. Each time, the replication fails with a checksum error (because of the corrupted replica on the middle DataNode), and the new DataNode is marked as bad and thrown away, even though it isn’t really bad. Eventually the recovery fails after exhausting all the DataNodes.
  • HDFS-7342 reports a case in which lease recovery cannot succeed when the second-to-last block is COMMITTED and the last block is COMPLETE. One suggested solution is to force the lease to be recovered, similar to how the case of a COMMITTED last block is handled. Note that HDFS-7342, HDFS-4882, and HDFS-5558 are related in that in all three the second-to-last block is in the COMMITTED state. The subtlety of the issue is still under investigation.

Lease recovery, block recovery, and pipeline recovery are all essential for HDFS fault tolerance. Together, they ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.

Hopefully, after reading these posts, you have a better understanding of when and why these processes are invoked, and what they do. If you are interested in learning more, you can read through some of the links including the design specification, JIRAs referenced here, or the relevant code.

Yongjun Zhang is a Software Engineer at Cloudera, and a Hadoop committer.

Categories: Hadoop

How-to: Tune Your Apache Spark Jobs (Part 1)

Cloudera Blog - Mon, 03/09/2015 - 16:29

Learn techniques for tuning your Apache Spark jobs for optimal efficiency.

(Editor’s note: Sandy presents on “Estimating Financial Risk with Spark” at Spark Summit East on March 18.)

When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Understanding Spark at this level is vital for writing Spark programs. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding Spark at this level is vital for writing good Spark programs, and of course by good, I mean fast. To write a Spark program that will execute efficiently, it is very, very helpful to understand Spark’s underlying execution model.

In this post, you’ll learn the basics of how Spark programs are actually executed on a cluster. Then, you’ll get some practical recommendations about what Spark’s execution model means for writing efficient programs.

How Spark Executes Your Program

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running, although dynamic resource allocation changes that for the latter. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime. Deploying these processes on the cluster is up to the cluster manager in use (YARN, Mesos, or Spark Standalone), but the driver and executor themselves exist in every Spark application.

At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan. This plan starts with the farthest-back RDDs (that is, those that depend on no other RDDs, or that reference already-cached data) and culminates in the final RDD required to produce the action’s results.

The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

What determines whether data needs to be shuffled? Recall that an RDD comprises a fixed number of partitions, each of which comprises a number of records. For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent. Operations like coalesce can result in a task processing multiple input partitions, but the transformation is still considered narrow because the input records used to compute any single output record can still only reside in a limited subset of the partitions.

However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.

For example, consider the following code:

sc.textFile("someFile.txt")
  .map(mapFunc)
  .flatMap(flatMapFunc)
  .filter(filterFunc)
  .count()

It executes a single action, which depends on a sequence of transformations on an RDD derived from a text file. This code would execute in a single stage, because none of the outputs of these three operations depend on data that can come from different partitions than their inputs.

In contrast, this code finds how many times each character appears in all the words that appear more than 1,000 times in a text file.

val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1))
  .reduceByKey(_ + _)
charCounts.collect()

This process would break down into three stages. The reduceByKey operations result in stage boundaries, because computing their outputs requires repartitioning the data by keys.

Here is a more complicated transformation graph including a join transformation with multiple dependencies.

The pink boxes show the resulting stage graph used to execute it.

At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.

Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. We’ll delve deeper into how to tune this number in a later section.

Picking the Right Operators

When trying to accomplish something with Spark, a developer can usually choose from many arrangements of actions and transformations that will produce the same results. However, not all these arrangements will result in the same performance: avoiding common pitfalls and picking the right arrangement can make a world of difference in an application’s performance. A few rules and insights will help you orient yourself when these choices come up.

Recent work in SPARK-5097 began stabilizing SchemaRDD, which will open up Spark’s Catalyst optimizer to programmers using Spark’s core APIs, allowing Spark to make some higher-level choices about which operators to use. When SchemaRDD becomes a stable component, users will be shielded from needing to make some of these decisions.

The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled. This is because shuffles are fairly expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these operations are equal, however, and a few of the most common performance pitfalls for novice Spark developers arise from picking the wrong one:

  • Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
  • Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey:
    rdd.map(kv => (kv._1, Set(kv._2)))
      .reduceByKey(_ ++ _)

    This code results in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently:

    val zero = collection.mutable.Set[String]()
    rdd.aggregateByKey(zero)(
      (set, v) => set += v,
      (set1, set2) => set1 ++= set2)

  • Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
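The first rule above is easy to see in plain Python, modeling partitions as lists (a sketch with invented data, no Spark involved): combining locally first shuffles one record per key per partition instead of every raw record.

```python
from collections import defaultdict

# Two input partitions of (key, value) pairs (hypothetical data).
partitions = [
    [("a", 1), ("b", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]

# groupByKey-style: every record crosses the network before summing.
shuffled_records = sum(len(p) for p in partitions)  # 6 records shuffled

# reduceByKey-style: compute local sums per key in each partition first,
# then shuffle only one record per (partition, key) pair.
local_sums = []
for part in partitions:
    acc = defaultdict(int)
    for k, v in part:
        acc[k] += v
    local_sums.append(list(acc.items()))
combined_records = sum(len(p) for p in local_sums)  # 4 records shuffled

# Merging the local sums yields the same totals either way.
totals = defaultdict(int)
for part in local_sums:
    for k, v in part:
        totals[k] += v
print(shuffled_records, combined_records, dict(totals))
```

Either path produces the same per-key totals, but the combine-first path moves 4 records instead of 6 here; on real data the gap grows with the number of duplicate keys per partition.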
When Shuffles Don’t Happen

It’s also useful to be aware of the cases in which the above transformations will not result in shuffles. Spark knows to avoid a shuffle when a previous transformation has already partitioned the data according to the same partitioner. Consider the following flow:

rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

Because no partitioner is passed to reduceByKey, the default partitioner will be used, resulting in rdd1 and rdd2 both hash-partitioned. These two reduceByKeys will result in two shuffles. If the RDDs have the same number of partitions, the join will require no additional shuffling. Because the RDDs are partitioned identically, the set of keys in any single partition of rdd1 can only show up in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 will depend only on the contents of a single partition in rdd1 and single partition in rdd2, and a third shuffle is not required.
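Why identical partitioning makes a third shuffle unnecessary can be sketched with a toy hash partitioner in plain Python (invented data; this illustrates the idea, not Spark's actual Partitioner implementation):

```python
def hash_partition(pairs, num_partitions):
    """Assign each (key, value) pair to a partition by hashing the key."""
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts

# Both "RDDs" use the same partitioner and the same partition count.
rdd1 = hash_partition([("x", 1), ("y", 2), ("z", 3)], 3)
rdd2 = hash_partition([("x", 10), ("y", 20), ("z", 30)], 3)

def partition_of(parts, key):
    """Return the index of the partition holding the given key."""
    return next(i for i, p in enumerate(parts) if any(k == key for k, _ in p))

# Any given key lands at the same partition index on both sides, so the
# join can proceed partition-by-partition with no further shuffle.
same_side = all(partition_of(rdd1, k) == partition_of(rdd2, k)
                for k in ("x", "y", "z"))
print(same_side)  # True
```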

For example, if someRdd has four partitions, someOtherRdd has two partitions, and both the reduceByKeys use three partitions, the set of tasks that execute would look like:

What if rdd1 and rdd2 use different partitioners, or use the default (hash) partitioner with different numbers of partitions? In that case, only one of the RDDs (the one with the fewer partitions) will need to be reshuffled for the join.

Same transformations, same inputs, different number of partitions:

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.
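The broadcast-join idea reduces to a map-side hash lookup, sketched here in plain Python (names and data are invented; in real Spark code the table would be wrapped in sc.broadcast and referenced inside a map over the large RDD):

```python
# Small dataset: fits in memory, loaded into a hash table "on the driver"
# and shipped to every executor.
users = {"u1": "US", "u2": "DE"}

# Large dataset: stays partitioned; each task only does local dict lookups,
# so the large side never needs to be shuffled.
purchases = [("u1", 9.99), ("u2", 5.00), ("u1", 1.50)]

joined = [(user, amount, users.get(user)) for user, amount in purchases]
print(joined)
```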

When More Shuffles are Better

There is an occasional exception to the rule of minimizing the number of shuffles. An extra shuffle can be advantageous to performance when it increases parallelism. For example, if your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions (which will trigger a shuffle) after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.

Another instance of this exception can arise when using the reduce or aggregate action to aggregate data into the driver. When aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results together. To loosen the load on the driver, one can first use reduceByKey or aggregateByKey to carry out a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values within each partition are merged with each other in parallel, before sending their results to the driver for a final round of aggregation. Take a look at treeReduce and treeAggregate for examples of how to do that. (Note that in 1.2, the most recent version at the time of this writing, these are marked as developer APIs, but SPARK-5430 seeks to add stable versions of them in core.)

This trick is especially useful when the aggregation is already grouped by a key. For example, consider an app that wants to count the occurrences of each word in a corpus and pull the results into the driver as a map.  One approach, which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform the count in a fully distributed way, and then simply collectAsMap the results to the driver.
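The two-level aggregation described above can be sketched in plain Python: rather than merging every partition's map on the driver, partitions are first merged pairwise "in the cluster" (a sketch of the idea behind treeReduce/treeAggregate with invented data, not Spark's implementation):

```python
from collections import Counter

# Per-partition word-count maps (hypothetical results from 4 partitions).
partition_counts = [
    Counter({"spark": 2, "hdfs": 1}),
    Counter({"spark": 1}),
    Counter({"hdfs": 3, "yarn": 1}),
    Counter({"yarn": 2}),
]

def merge_pair(a, b):
    return a + b  # Counter addition merges the two maps

# Round 1 ("distributed"): merge partitions pairwise, away from the driver.
round1 = [merge_pair(partition_counts[i], partition_counts[i + 1])
          for i in range(0, len(partition_counts), 2)]

# Round 2 ("driver"): the driver merges only 2 maps instead of 4.
total = merge_pair(round1[0], round1[1])
print(dict(total))
```

With thousands of partitions, the intermediate rounds keep the driver from becoming the single-threaded bottleneck.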

Secondary Sort

Another important capability to be aware of is the repartitionAndSortWithinPartitions transformation. It’s a transformation that sounds arcane, but seems to come up in all sorts of strange situations. This transformation pushes sorting down into the shuffle machinery, where large amounts of data can be spilled efficiently and sorting can be combined with other operations.

For example, Apache Hive on Spark uses this transformation inside its join implementation. It also acts as a vital building block in the secondary sort pattern, in which you want to both group records by key and then, when iterating over the values that correspond to a key, have them show up in a particular order. This issue comes up in algorithms that need to group events by user and then analyze the events for each user based on the order they occurred in time. Taking advantage of repartitionAndSortWithinPartitions to do secondary sort currently requires a bit of legwork on the part of the user, but SPARK-3655 will simplify things vastly.
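The secondary sort pattern itself is easy to sketch in plain Python: partition by user only, then sort each partition by the composite key (user, timestamp), so one pass yields each user's events grouped and time-ordered (hypothetical event data; with Spark this sorting is pushed into the shuffle via repartitionAndSortWithinPartitions):

```python
events = [("bob", 3, "click"), ("ann", 1, "view"),
          ("bob", 1, "view"), ("ann", 2, "click")]

num_partitions = 2
parts = [[] for _ in range(num_partitions)]
for user, ts, action in events:
    # Partition on the user only, so all of a user's events co-locate...
    parts[hash(user) % num_partitions].append((user, ts, action))

for p in parts:
    # ...but sort within each partition on the composite (user, timestamp) key.
    p.sort(key=lambda e: (e[0], e[1]))

# Iterating a partition now yields each user's events in time order.
ordered = [e for p in parts for e in p]
```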


You should now have a good understanding of the basic factors involved in creating a performance-efficient Spark program! In Part 2, we’ll cover tuning resource requests, parallelism, and data structures.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Hadoop committer, and an Apache Spark contributor. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

This Month in the Ecosystem (February 2015)

Cloudera Blog - Thu, 03/05/2015 - 20:02

Welcome to our 17th edition of “This Month in the Ecosystem,” a digest of highlights from February 2015 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly). 

Wow, a ton of news for such a short month:

  • 4,500 people converged for the first Strata + Hadoop World San Jose, ever (with a special appearance by Barack Obama, no less). Slides and session video are available.
  • After years of development by a huge and diverse community, Apache HBase 1.0 was released. (Learn more about it at HBaseCon 2015.)
  • Apache Hive 1.0.0 (formerly 0.14.1) was also released by its community.
  • Cloudera announced formal support for Apache Kafka alongside CDH.
  • Pivotal and Hortonworks announced the Open Data Platform initiative, which Cloudera and MapR have declined to join.
  • Project Myriad, an integration of YARN and Mesos spearheaded by MapR and eBay, was proposed for the Apache Incubator.
  • Kite SDK 1.0 was released. The Kite API is now stable!
  • A Hive-on-Spark beta was released by Cloudera. (Only HDFS, YARN, Apache ZooKeeper, and Apache Hive are supported thus far.)
  • Cloudera also announced a strategic partnership with Cask, and integration of the latter’s CDAP offering with Cloudera Enterprise.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Calculating CVA with Apache Spark

Cloudera Blog - Wed, 03/04/2015 - 18:03

Thanks to Matthew Dixon, principal consultant at Quiota LLC and Professor of Analytics at the University of San Francisco, and Mohammad Zubair, Professor of Computer Science at Old Dominion University, for this guest post that demonstrates how to easily deploy exposure calculations on Apache Spark for in-memory analytics on scenario data.

Since the 2007 global financial crisis, financial institutions now more accurately measure the risks of over-the-counter (OTC) products. It is now standard practice for institutions to adjust derivative prices for the risk of the counter-party’s, or one’s own, default by means of credit or debit valuation adjustments (CVA/DVA).

Calculating the CVA of a portfolio typically requires Monte-Carlo simulation with a large number of scenarios. The computation and storage requirement for what-if scenario analysis grows significantly when the portfolio is large and contains a wide array of financial instruments across multiple asset classes. To handle that complexity, distributed computing platforms offer trade-offs across performance, flexibility, modularity, and maintainability of programming infrastructure.

For example, in this post, you will learn how to efficiently deploy exposure calculations on Apache Spark for in-memory analytics on scenario data. This example application exemplifies the flexibility, maintainability, and scalability provided by Spark. Applications with cyclic dataflow graphs, such as counter-party credit risk analytics, are well suited for Spark because the scenario data can be kept in memory for fast what-if scenario and statistical analysis. (To see another example of Spark’s benefits in the financial calculations area—in this case, for calculating VaR—see this post.)

Estimating CVA on Spark

The computational complexity of estimating the CVA is significant and beyond the capability of a single workstation. The number of calculations to estimate CVA of a portfolio is given by:

Number of Instruments × Number of time intervals × Number of scenarios

A calculation here refers to calculating the price of an instrument for a scenario at some time interval. Consider, for example, a CVA calculation on a portfolio of 10,000 instruments, with average maturity of 10 years, and 2,000 market scenarios generated every three months—resulting in 8 billion calculations. The time to price an instrument can vary depending on the instrument. If we assume on average that it takes 100 microseconds to price an instrument on a single workstation, it will take a total of 220 hours to perform all the calculations.
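Taking the post's figures at face value (8 billion pricings at an average of 100 microseconds each, both assumptions stated above), the single-workstation estimate checks out:

```python
calculations = 8e9         # total pricings in the example above
seconds_per_calc = 100e-6  # assumed average time to price one instrument
hours = calculations * seconds_per_calc / 3600
print(round(hours))  # 222, in line with the roughly 220 hours quoted above
```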

A number of banks use customized parallel and distributed computing platforms to perform these calculations in a reasonable time. Banks often need to hold the prices of all instruments in memory to calculate various statistics. This requirement results in large memory consumption, so the current proprietary solutions are expensive and hard to scale with respect to fault tolerance.

Alternatively, Spark can be programmed to calculate the CVA of a portfolio over a cluster of thousands of cheap commodity nodes using high-level languages such as Scala and Python, thus making it an attractive platform for prototyping and live risk estimates. The key benefit of using Spark over other distributed implementations such as MPI, OpenMP, and CUDA is that it allows the computations to scale to a large number of nodes reliably where failure of nodes is managed by the framework. Furthermore, Spark can hold all the price results of a large portfolio simulation in memory across thousands of nodes in support of calculating various statistics on demand. 

This benefit, along with the ease of implementation, comes at the expense of some performance loss. However, we can minimize this loss by using, say, the numpy/scipy packages in Python, which are built on optimized BLAS and LAPACK routines.

Implementing CVA in Python on Spark

For demonstration purposes we consider a portfolio of NI vanilla swaps with NT time intervals and NS simulations. The total number of price calculations then is given by NI × NT × NS. Our implementation on Spark is based on Python code that utilizes the QuantLib package to value a vanilla swap.

(Figure courtesy of Giovanni Cesari, UBS)

Next, we will briefly outline this computation on Spark.

1. Create a distributed collection (RDD), randArrayRDD, of random numbers of size NS × NT using the Apache Spark Python MLlib API:

sc = SparkContext(appName='CVA')
randArrayRDD = RandomRDDs.normalVectorRDD(sc, NS, NT, numPartitions=NP, seed=1L)

2. Call a map function that processes the RDD in parallel and collect the pricing at the driver.

pmat = randArrayRDD.map(lambda p: (value_swap(p, NI))).collect()
pmat = np.array(pmat)
pmat = pmat.reshape((NS, NI, NT))

The map function works on a row of RDD of random numbers in two stages:

(a) Construct NT discount curves, crvVec, one for each time interval, using the single-factor Hull-White short-rate model.

for iT in xrange(1, NT):
    crvDate = Dates[iT]
    crvDates = [crvDate] + [crvDate + Period(k, Years) for k in xrange(1, NTenors)]
    crvDiscounts = [1.0] + [A(T[iT], T[iT]+k) * exp(-B(T[iT], T[iT]+k) * rvec[iT])
                            for k in xrange(1, NTenors)]
    crvVec[iT] = DiscountCurve(crvDates, crvDiscounts, Actual360(), TARGET())

(b) For each discount curve, value NI swaps at NT time intervals in the future to construct a price matrix, spmat, of size NI × NT for each simulation.

for iT in xrange(len(T)):
    Settings.instance().evaluationDate = Dates[iT]
    allDates = list(floatingSchedule)
    fixingdates = [index.fixingDate(floatingSchedule[iDate]) for iDate in xrange(len(allDates))
                   if index.fixingDate(floatingSchedule[iDate]) <= Dates[iT]]
    if fixingdates:
        for date in fixingdates[:-1]:
            try:
                index.addFixing(date, 0.0)
            except:
                pass
        try:
            index.addFixing(fixingdates[-1], rmean[iT])
        except:
            pass
    discountTermStructure = RelinkableYieldTermStructureHandle()
    swapEngine = DiscountingSwapEngine(discountTermStructure)
    swap1.setPricingEngine(swapEngine)
    crv = crvVec[iT]
    discountTermStructure.linkTo(crv)
    forecastTermStructure.linkTo(crv)
    npvVec[nI][iT] = swap1.NPV()

This price matrix, pmat, has price distribution information for all swaps in the portfolio and can be used to calculate various exposure measures such as potential future exposure (PFE) and the expected positive exposure (EPE). The CVA can be estimated using the simulated estimate of the EPE.

EE = np.sum(npvMat, axis=1)
EE = np.mean(EE, axis=0)
sum = 0
for i in xrange(NT-1):
    sum += 0.5*(EE[i]*crvToday.discount(T[i]) + EE[i+1]*crvToday.discount(T[i+1])) * \
           (exp(-S*T[i]/(1.0-R)) - exp(-S*T[i+1]/(1.0-R)))
CVA = (1.0-R)*sum

Note that if you do not want to preserve the price distribution for different swaps, you can simply aggregate the prices of different swaps at each worker before sending them to the driver, thereby reducing the communication cost. To do that, modify step 2(b).

For each yield curve constructed earlier, value all NI swaps to create a price matrix of size NI × NT:

for nI in xrange(NI):
    fixedSchedule = Schedule(startDate, maturity, Period("6m"), TARGET(),
                             ModifiedFollowing, ModifiedFollowing, DateGeneration.Forward, False)
    floatingSchedule = Schedule(startDate, maturity, Period("6m"), TARGET(),
                                ModifiedFollowing, ModifiedFollowing, DateGeneration.Forward, False)
    swap1 = VanillaSwap(VanillaSwap.Receiver, 1000000, fixedSchedule, 0.05,
                        Actual360(), floatingSchedule, index, 0, Actual360())
    for iT in xrange(len(T)):
        Settings.instance().evaluationDate = Dates[iT]
        allDates = list(floatingSchedule)
        fixingdates = [index.fixingDate(floatingSchedule[iDate]) for iDate in xrange(len(allDates))
                       if index.fixingDate(floatingSchedule[iDate]) <= Dates[iT]]
        if fixingdates:
            for date in fixingdates[:-1]:
                try:
                    index.addFixing(date, 0.0)
                except:
                    pass
            try:
                index.addFixing(fixingdates[-1], rmean[iT])
            except:
                pass
        discountTermStructure = RelinkableYieldTermStructureHandle()
        swapEngine = DiscountingSwapEngine(discountTermStructure)
        swap1.setPricingEngine(swapEngine)
        crv = crvVec[iT]
        discountTermStructure.linkTo(crv)
        forecastTermStructure.linkTo(crv)
        spmat[nI][iT] = swap1.NPV()
spmat = np.array(spmat)
spmat[spmat < 0] = 0
spmat = np.sum(spmat, axis=0)

This simplifies the computation of the mean exposure and the CVA at the driver. Next, create a distributed collection (RDD) of random numbers of size NS × NT:

randArrayRDD = RandomRDDs.normalVectorRDD(sc, NS, NT, numPartitions=NP, seed=1L)

Value each swap in parallel, where one unit of work is valuing a swap under all scenarios in a row of the random matrix.

pmat=randArrayRDD.map(lambda p: (value_swap(p, NI))).collect()

Collect the valuations at the driver.

pmat = np.array(pmat)
pmat = pmat.reshape((NS, NT))
EE = np.sum(pmat, axis=1)
EE = np.mean(EE, axis=0)

Calculate the CVA.

for i in xrange(NT-1):
    sum += 0.5*(EE[i]*crvToday.discount(T[i]) + EE[i+1]*crvToday.discount(T[i+1])) * \
           (exp(-S*T[i]/(1.0-R)) - exp(-S*T[i+1]/(1.0-R)))
CVA = (1.0-R)*sum


We have demonstrated how Spark can be programmed to calculate the CVA of a portfolio over a cluster of thousands of cheap commodity nodes using Python, thus making it an attractive platform for prototyping and live risk estimates.

Hello, Kite SDK 1.0

Cloudera Blog - Tue, 03/03/2015 - 23:09

The Kite project recently released a stable 1.0!

This milestone means that Kite’s data API and command-line tools are ready for long-term use.

The 1.0 data modules and API are no longer rapidly changing. From 1.0 on, Kite will be strict about breaking compatibility and will use semantic versioning to signal what compatibility guarantees you can expect from a given release. For example, breaking changes require increasing the major version number, so both minor and patch updates are safe to use without code changes and are binary-compatible.
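That guarantee can be illustrated with a toy version check in Python (an illustration of semantic versioning in general, not code from Kite):

```python
def safe_to_upgrade(current, candidate):
    """Under semantic versioning, an upgrade is source- and binary-safe
    as long as the major version number is unchanged."""
    return current.split(".")[0] == candidate.split(".")[0]

assert safe_to_upgrade("1.0.0", "1.2.3")      # minor/patch bump: safe
assert not safe_to_upgrade("1.2.3", "2.0.0")  # major bump: may break
```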

Kite provides some additional guarantees as well:

  • Kite’s command-line tool, kite-dataset, also follows semantic versioning. Changes that may break scripts will require updating the major version number.
  • Incompatible changes to Kite’s on-disk formats, Avro and Parquet, will also be signalled by a major version update.
  • Kite’s storage formats, metadata, and file layout are forward-compatible for at least one major version. You can update Kite, write to existing datasets, and roll back to the previous version safely.

We’re excited to get the Kite 1.0 release out. Now, Kite provides a great high-level API built around how you work with your data, and stability guarantees so you can be confident building on top of it.

Learn more about Kite:

How-to: Let Users Provision Apache Hadoop Clusters On-Demand

Cloudera Blog - Mon, 03/02/2015 - 16:51

Providing Hadoop-as-a-Service to your internal users can be a major operational advantage.

Cloudera Director (free to download and use) is designed for easy, on-demand provisioning of Apache Hadoop clusters in Amazon Web Services (AWS) environments, with support for other cloud environments in the works. It allows for provisioning clusters in accordance with the Cloudera AWS Reference Architecture.

At Cloudera, Cloudera Director is used internally to enable our technical field to provision clusters on-demand for demos, POCs, and testing purposes. The following post describes the goals of Cloudera’s internal environment, and how you can set up your own to meet similar requirements.


Our internal deployment has several goals:

  • Provide a way to provision Hadoop clusters on-demand in a self-service manner as and when the field team needs them
  • Use a common AWS account
  • Make clusters accessible from within Cloudera’s network, making them look like an extension to our data center
  • Enable clusters to access Amazon S3 and the internet with the aggregate bandwidth scaling proportional to the cluster and instance sizes
  • Conform to Cloudera IT’s guidelines on AWS usage, which require all users to tag their instances with their usernames; all stray instances are terminated.

Ordinarily, you would set up the network context from the AWS management console, the AWS CLI client, or using AWS Quickstart. As it stands today, AWS Quickstart does not meet our goals because it doesn’t create the VPN, IAM roles, and so on the way we want them (it works great if you don’t need the VPN setup and are starting afresh). We chose to set things up manually instead, and the process is described below.

Setting Up a VPC

The first piece of that setup is the VPC, which is a one-time investment you can repurpose for other deployments that you might have in AWS with similar requirements. To do that, go into the VPC section of the console. It’ll look something like this:

As articulated in Cloudera’s reference architecture, three kinds of setups are possible: VPC with public subnet only, VPC with private subnet only, or VPC with private and public subnets. The choice depends on what kind of connectivity and security requirements you have. If you are looking to transfer data between your cluster and S3, you’ll need to deploy your cluster in a public subnet so it has public IP addresses and can interact with S3. Once you create your VPC, configure the route tables and virtual gateways to link it back to your data center using a VPN or Direct Connect link. Instructions for setting up a VPN are available here and for setting up Direct Connect are available here.

Setting Up the Subnets

To have a cluster in AWS that has S3 and internet access, you need the instances to have public IP addresses. For the instances to be accessible from within your network, you need them to be linked via VPN or Direct Connect. Create a subnet inside the VPC you just created. It’ll automatically populate the route table from the VPC. The configurations would look something like the following:

You can leave the NACLs at their defaults.

Setting Up Security Groups

To allow users to spin up clusters on-demand in a self-service manner, you need every provisioned instance to adhere to a set of ingress and egress rules at the bare minimum. These rules would allow outbound traffic to the internet and S3 and bidirectional traffic from Cloudera’s network. To do that, you can create a security group with the following rules:

Creating these security groups by themselves isn’t sufficient to enforce their use on all instances. You also have to disallow creation, modification, and deletion of security groups in this VPC, so that users can use only the ones you created for them.

Setting Up IAM Rules

Once you have the above setup configured, you don’t want users to be able to create, delete, or modify any of the configurations, especially security groups. If not restricted via IAM rules, security groups are easy to modify. For that, you can set IAM rules restricting users from modifying anything in this environment with a rule like the following:

{
  "Statement": [
    {
      "Effect": "Allow",
      "NotAction": "iam:*",
      "Resource": "*"
    },
    {
      "Effect": "Deny",
      "Action": "*",
      "Resource": "arn:aws:ec2:us-east-1:*:security-group/sg-dbc403bf"
    },
    {
      "Effect": "Deny",
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:AuthorizeSecurityGroupEgress",
        "ec2:RevokeSecurityGroupIngress",
        "ec2:RevokeSecurityGroupEgress",
        "ec2:DeleteSecurityGroup",
        "ec2:CreateSecurityGroup"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "ec2:Vpc": "arn:aws:ec2:us-east-1:007856030109:vpc/vpc-c2d650a7"
        }
      }
    }
  ]
}

This rule disallows users from performing any action on the specified security group that you have created as a part of the network context as well as creating, deleting, or modifying any other security groups inside the VPC that you’ve created. That way, you can rest assured you are not running at the risk of opening your network to more traffic than you intended.

Setting Up Cloudera Director

Cloudera Director setup instructions can be found in the documentation.

Once installed, you can create an environment inside Director with the specified public subnet so clusters in that environment can access the internet and S3. You’ll have to share the pem file for this environment with all users, since keys are environment specific. Once the environment is set up, you can create instance templates with appropriate instance type configurations and tags based on your requirements. Cloudera tracks internal usage via tags, so we have every user create a template for themselves with their name in the tags.

Creating Clusters

Cloudera Director provides you with a web UI you can use to provision clusters. Click the “Add Cluster” button on the web UI to start the process.

You’ll create a Cloudera Manager instance first and then a cluster. You can also add a cluster to an existing Cloudera Manager instance, if you have one. The cluster configuration page looks like the following:

On clicking “Continue,” Cloudera Director will bootstrap your cluster and it’ll become available through the Cloudera Manager instance to which you added the cluster. That way, your users can provision clusters on-demand for different purposes.

Billing and Metering

Currently, there is no special billing and metering provision in Cloudera Director or Cloudera Manager. Pay-as-you-go pricing is available in lieu of annual subscription pricing. To track your usage, you can leverage the EC2 instance tags and detailed billing that’ll give you a breakdown of usage based on tags and other filters. You can also use third-party tools such as Cloudhealth and Cloudability to create the reports (which we do internally at Cloudera to track usage).
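As an illustration of tracking usage through tags, the sketch below aggregates costs per user tag from billing line items. The field names are illustrative assumptions, not the actual AWS detailed-billing schema (where user-defined tags appear under their own column names).

```python
# Hypothetical sketch: aggregate billing line items by a "user" tag.
# Field names are illustrative; AWS detailed billing reports use their
# own column names for user-defined tags.
from collections import defaultdict

def cost_by_tag(line_items, tag="user"):
    """Sum costs per tag value; untagged items go into an 'untagged' bucket."""
    totals = defaultdict(float)
    for item in line_items:
        totals[item.get(tag, "untagged")] += item["cost"]
    return dict(totals)

items = [
    {"user": "akhurana", "cost": 12.50},
    {"user": "akhurana", "cost": 3.25},
    {"cost": 7.00},  # no tag -> bucketed as "untagged"
]
print(cost_by_tag(items))
```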


In this post, you learned how to set up and use AWS and Cloudera Director to provide Hadoop as a service to your users. At Cloudera, we are committed to continually improving this experience and would love to hear your feedback about what’s working and what’s not in the Cloudera Director area of community.cloudera.com.

Amandeep Khurana is Principal Solutions Architect at Cloudera. He is a co-author of the Manning book, HBase in Action.

Categories: Hadoop

How-to: Do Real-Time Log Analytics with Apache Kafka, Cloudera Search, and Hue

Cloudera Blog - Fri, 02/27/2015 - 16:35

Cloudera recently announced formal support for Apache Kafka. This simple use case illustrates how to make web log analysis, powered in part by Kafka, one of your first steps in a pervasive analytics journey.

If you are not looking at your company’s operational logs, then you are at a competitive disadvantage in your industry. Web server logs, application logs, and system logs are all valuable sources of operational intelligence, uncovering potential revenue opportunities and helping drive down costs. Whether your firm is an advertising agency that analyzes clickstream logs for customer insight, or you are responsible for protecting the firm’s information assets by preventing cyber-security threats, you should strive to get the most value from your data as soon as possible.

In the past, it was cost-prohibitive to capture all logs, let alone implement systems that could act on them intelligently in real time. Recently, however, the technology has matured quite a bit and, today, we have all the right ingredients we need in the Apache Hadoop ecosystem to capture events in real time, process them, and make intelligent decisions based on that information.

In this post, you will explore a sample implementation of a system that can capture Apache HTTP Server logs in real time, index them for searching, and make them available to other analytic apps as part of a “pervasive analytics” approach. This implementation is based on open source components such as Apache Flume, Apache Kafka, Hue, and Apache Solr.

Flume, Solr, Hue, and Kafka can all be easily installed using Cloudera Manager and parcels (the first three via the CDH parcel, and Kafka via its own parcel).


The high-level diagram below illustrates a simple setup that you can deploy in a matter of minutes. For our purposes, Apache web server log events originate in syslog. They are then forwarded to a Flume Agent via the Flume Syslog Source. The Syslog Source sends them to a Kafka Channel, which in turn passes them to a MorphlineSolrSink. The sink parses the messages, converts them into Solr documents, and sends them to the Solr Server. After the indexed documents appear in Solr, Hue’s Search Application is utilized to search the indexes and build and display multiple unique dashboards for various audiences.


Next, you will learn all the details behind the above.

Apache Logs Breakdown

Every time you start a new project that involves Solr, you must first understand your data and organize it into fields. Fortunately, Apache web server logs are easy enough to understand and relate to Solr documents. A sample of the logs can be found below:

- - [15/Dec/2014:06:39:51 +0000] "GET /accounts/login/?next=/ HTTP/1.1" 302 460 "-" "Mozilla/5.0+(compatible; UptimeRobot/2.0; http://www.uptimerobot.com/)" 55006
- - [15/Dec/2014:06:39:54 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519197 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 37789
- - [15/Dec/2014:06:39:55 +0000] "GET /pig/watch/0000365-141119075018336-oozie-oozi-W?format=python&_=1418625519198 HTTP/1.1" 200 719 "http://demo.gethue.com/pig/#logs" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)" 28120

The diagram below represents a simple view of how to organize raw Apache web server messages into Solr fields:
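To make the mapping concrete, here is a hedged Python sketch (not the morphline used later in this post) that pulls a few Solr-style fields out of one combined-format log line; the regex and field names are illustrative:

```python
# Sketch: parse an Apache combined-format log line into Solr-style fields.
# The regex and field names are illustrative; the post's actual pipeline
# does this with a morphline, not Python.
import re

LOG_RE = re.compile(
    r'\[(?P<time>[^\]]+)\] '           # timestamp
    r'"(?P<method>\S+) (?P<url>\S+) (?P<protocol>\S+)" '
    r'(?P<code>\d{3}) (?P<bytes>\d+)'  # status and response size
)

line = ('- - [15/Dec/2014:06:39:51 +0000] "GET /accounts/login/?next=/ '
        'HTTP/1.1" 302 460 "-" "Mozilla/5.0" 55006')

doc = LOG_RE.search(line).groupdict()
print(doc["method"], doc["url"], doc["code"])
```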

There’s Something About the Cloud

Cloudera Search, which integrates Solr with HDFS, is deployed in SolrCloud mode with all the options and flexibility that come with integrating with the rest of the Hadoop ecosystem in CDH. Throughout this example, you will use the solrctl command to manage SolrCloud deployments. (For the full command reference, please click here.)

Let’s begin by generating template configuration files for Solr. The most important, and the only file to update, is schema.xml, which is used in Solr to define the fields in the collection, their types, and their indexing characteristics. The command below generates a conf directory with all configuration files in the $HOME/accessCollection folder:

solrctl --zk localhost:2181/solr instancedir --generate $HOME/accessCollection

(Please note that --zk localhost:2181 should be replaced with the address and port of your own Apache ZooKeeper quorum.)

Next, edit the schema.xml file. What follows is a brief overview of what was changed from the template generated above. The fields relevant to the Apache logs have to be defined in the schema file:

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="_version_" type="long" indexed="true" stored="true"/>
<field name="time" type="tdate" indexed="true" stored="true" />
<field name="record" type="text_general" indexed="true" stored="false" multiValued="true"/>
<field name="client_ip" type="string" indexed="true" stored="true" />
<field name="code" type="string" indexed="true" stored="true" />
<field name="user_agent" type="string" indexed="true" stored="true" />
<field name="protocol" type="string" indexed="true" stored="true" />
<field name="url" type="string" indexed="true" stored="true" />
<field name="request" type="string" indexed="true" stored="true" />
<field name="referer" type="string" indexed="true" stored="true" />
<field name="bytes" type="tint" indexed="true" stored="true" />
<field name="method" type="string" indexed="true" stored="true" />
<field name="extension" type="string" indexed="true" stored="true" />
<field name="app" type="string" indexed="true" stored="true" />
<field name="subapp" type="string" indexed="true" stored="true" />
<field name="device_family" type="string" indexed="true" stored="true" />
<field name="user_agent_major" type="string" indexed="true" stored="true" />
<field name="user_agent_family" type="string" indexed="true" stored="true" />
<field name="os_family" type="string" indexed="true" stored="true" />
<field name="os_major" type="string" indexed="true" stored="true" />
<field name="region_code" type="string" indexed="true" stored="true" />
<field name="country_code" type="string" indexed="true" stored="true" />
<field name="city" type="string" indexed="true" stored="true" />
<field name="latitude" type="float" indexed="true" stored="true" />
<field name="longitude" type="float" indexed="true" stored="true" />
<field name="country_name" type="string" indexed="true" stored="true" />
<field name="country_code3" type="string" indexed="true" stored="true" />

Although you are not using the id and _version_ fields in this application, Solr uses them internally for its own bookkeeping. Therefore, every collection must have them (as defined in the schema.xml file).

One very important concept in SolrCloud deployments is the notion of collections. A collection is a single index that spans multiple Solr Instances. For example, if your syslog index is distributed across multiple Solr Instances, they all add up to form one collection.

Let’s call our collection accessCollection and set it up using the commands below. The first command uploads all of the configurations into a ZooKeeper znode. The second command creates the collection in Solr, based on the configuration in ZooKeeper from the first command.

solrctl --zk localhost:2181/solr instancedir --create accessCollection $HOME/accessCollection
solrctl --zk localhost:2181/solr collection --create accessCollection -s 1

Again, replace --zk localhost:2181 with your own ZooKeeper quorum configuration in both statements.

Note that the -s 1 argument defines the number of shards. A shard is a very important concept in Solr that refers to a slice of an index. For example, if you have a corpus of 1 million events, you may want to split it into two shards for scalability and improved query performance. The first shard might handle all the documents that have an id between 0-500,000, and the second shard will handle documents with message id between 500,000-1,000,000. Solr handles all this logic internally; you need only specify the number of shards you would like to create with the -s option.
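The range-based split described above can be sketched in a few lines of Python. This is illustrative only: real SolrCloud routing hashes the document id (the compositeId router) rather than using fixed ranges.

```python
# Sketch of range-based routing across 2 shards, mirroring the example in
# the text. Real SolrCloud routing hashes the id instead of using ranges.
def shard_for(doc_id, num_docs=1_000_000, num_shards=2):
    """Map a numeric id in [0, num_docs) to a shard index."""
    return doc_id * num_shards // num_docs

print(shard_for(123_456))   # falls in the 0-500,000 range -> shard 0
print(shard_for(987_654))   # falls in the 500,000-1,000,000 range -> shard 1
```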

The number of shards will depend on many factors and should be determined carefully. The following table describes some of the considerations that should go into choosing the optimal number:

How Flume Met Kafka

Before you index the logs for searching, you need to collect them from the application servers.

Flume is a distributed system for collecting and processing log data. One of the main advantages of Flume is its large collection of sources and sinks. In many cases, Flume makes integration a no-brainer.

As previously described, our example uses Flume with Syslog Source to collect the log data from syslog, Kafka as a distributed and highly available channel to store the log data, and Solr sink with Morphlines to index the data and store it in Cloudera Search. All this can be done by properly configuring Flume, without writing a line of code. You can find the configuration file here.

There are three components in the configuration:

  • First, a syslog source, configured with the host and port to which it will bind.
    # Syslog Source Configuration
    tier1.sources.source1.type = syslogtcp
    # the hostname that Flume Syslog source will be running on
    tier1.sources.source1.host = localhost
    # the port that Flume Syslog source will listen on
    tier1.sources.source1.port = 5040
  • Next, a Solr sink, configured with a configuration file that we’ll review in detail later.
    tier1.sinks.sink1.type          = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    tier1.sinks.sink1.morphlineFile = /apache_logs/latest/morphline.conf
  • Finally, a Kafka channel in between them.
    tier1.channels.channel1.type                = org.apache.flume.channel.kafka.KafkaChannel
    tier1.channels.channel1.transactionCapacity = 1000
    tier1.channels.channel1.brokerList          = kafkaf-2:9092,kafkaf-3:9092
    tier1.channels.channel1.topic               = channel1
    tier1.channels.channel1.zookeeperConnect    = kafkaf-1:2181

The Kafka channel requires two mandatory parameters:

  • Location of at least one, but preferably two or more Kafka brokers
  • Location of the ZooKeeper quorum that Kafka uses

There are also a few optional parameters:

  • topic – specifies which topic the channel will use. It’s important to set it correctly if you expect Flume to read data that other apps wrote to Kafka; the topic should match between the apps and the Kafka Channel configuration. The default topic is flume-channel.
  • groupId – if multiple Kafka channels share the same groupId and same topic, they will each get partial data from the topic. Thus you can use this setting to add scalability via multiple Flume agents, each with a Kafka channel configured with the same groupId. Or, if you need multiple channels that all receive all the data from Kafka (essentially duplicating all the data), you’ll want to use different groupIds or different topics. The default groupId is flume.
  • transactionCapacity – the number of events the channel processes in one transaction. Setting this parameter to a higher number can increase throughput, but it increases latency as well.
  • parseAsFlumeEvent – A setting of “true” assumes that all events in the Kafka topic were written by a Flume source or Flume client. Thus the first time the channel starts, all events in the topic are read (subsequently, only from the last recorded position). A setting of “false” assumes that some other application wrote the events to Kafka, and thus they’re not parsed as Flume events. In addition, only events written after the channel started are read (since the topic may already have a large history in it).
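The groupId semantics above can be illustrated with a small simulation (plain Python, not the Kafka client): consumers sharing a group split the topic's partitions among themselves, so each gets partial data, while a second group would receive its own full copy.

```python
# Sketch: how Kafka divides partitions among consumers in the same group.
# This mimics round-robin assignment; the real broker-side protocol is
# more involved, but the splitting behavior is the same idea.
def assign_partitions(partitions, consumers):
    """Round-robin partitions across the consumers of one group."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Two Flume agents with Kafka channels sharing groupId "flume":
print(assign_partitions([0, 1, 2, 3], ["agent1", "agent2"]))
# each agent consumes only its own partitions (partial data each)
```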

All this is nice if the data is arriving from syslog and going only to Solr by way of Morphlines, but in today’s enterprise IT, there are usually many different data sources. In many companies, applications write important events directly to Kafka without going through syslog or Log4J at all.

To get data from Kafka, parse it with Morphlines, and index it into Solr, you can use an almost identical configuration. The only changes required are:

  • Leave out the Syslog source.
  • When configuring the Kafka channel, specify: parseAsFlumeEvent = false.

These changes are necessary because the events are now written to Kafka by apps other than Flume, so the source is not necessary (Kafka channel will get events from Kafka to SolrSink) and the events in the channel can be any data type, not necessarily a FlumeEvent.
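Putting those two changes together, a Kafka-to-Solr agent configuration might look like the following sketch. The agent, topic, broker, and path names are assumptions carried over from the earlier snippets, not a tested configuration:

```
# Hypothetical Kafka -> Morphline -> Solr agent: no source needed,
# the Kafka channel feeds the sink directly.
tier1.channels.channel1.type              = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList        = kafkaf-2:9092,kafkaf-3:9092
tier1.channels.channel1.topic             = app_events
tier1.channels.channel1.zookeeperConnect  = kafkaf-1:2181
tier1.channels.channel1.parseAsFlumeEvent = false

tier1.sinks.sink1.type          = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.morphlineFile = /apache_logs/latest/morphline.conf
tier1.sinks.sink1.channel       = channel1
```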

This configuration allows you to index and search, using Cloudera Search, any enterprise event written to Kafka (including logs, metrics, audit events, and so on).

ETL Coding with Kite Morphlines

A “morphline” is a rich configuration file that makes it easy to define a transformation chain that can consume any kind of data from any kind of data source, process that data, and load the results into a Hadoop component. Apache log parsing is achieved with the help of the Morphlines library, an open source framework available through the Kite SDK, which lets you define that transformation chain without a single line of code.

Our morphline configuration file will break down raw Apache logs and generate the Solr fields that will be used for indexing. The morphlines library will perform the following actions:

  • Read the logs with the readCSV command, using space as a separator
  • Use the split command to break up request field into three parts:  method, url, protocol
  • Use the split command to extract app and subapp fields from the url field
  • Use the userAgent command to extract all of the device, OS, and user agent information
  • Use the geoIP and extractJsonPaths commands to retrieve geo coordinates such as country, region, city, latitude, and longitude by doing a lookup against an efficient in-memory Maxmind database (the databases must first be downloaded from Maxmind)
  • Generate unique ID for every log with the generateUUID command
  • Convert the date/timestamp into a field that Solr will understand, with the convertTimestamp command
  • Drop all of the extra fields that we did not specify in schema.xml, with the sanitizeUnknownSolrFields command, and
  • Load the record into Solr for HDFS write, with the loadSolr command
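Sketched as a morphline, the first few steps above might begin like this. This fragment is abbreviated and untested; the column names and morphline id are assumptions for illustration:

```
# Hypothetical morphline fragment covering a few of the steps listed above.
morphlines : [
  {
    id : accessLogsMorphline
    importCommands : ["org.kitesdk.**"]
    commands : [
      { readCSV { separator : " ", columns : [client_ip,C1,C2,time,tz,request,code,bytes,referer,user_agent,duration] } }
      { split { inputField : request, outputFields : [method, url, protocol], separator : " " } }
      { generateUUID { field : id } }
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]
```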

When building this example, we initially used three morphlines commands to break up the Apache log event: readCSV, split, split. Our intention was to make this blog more generic and demonstrate how easily it can be adapted to many different types of logs. However, the creators of the morphlines library have generously provided a number of pre-defined patterns for commonly used log formats, including Apache web server ones. What follows is an alternative way of reading the Apache log events and breaking them up into fields via morphlines:

{
  readLine {
    ignoreFirstLine : true
    commentPrefix : "#"
    charset : UTF-8
  }
}
{
  grok {
    dictionaryFiles : [target/test-classes/grok-dictionaries]
    expressions : {
      message : """<%{COMBINEDAPACHELOG:apache_log}>"""
    }
    extract : inplace
    findSubstrings : false
    addEmptyStrings : false
  }
}

Picture Perfect with Hue Dashboards

Now that your logs are indexed in near real time, you need a dashboard to search and drill into the events. The best tool for this job is Hue, the open source GUI for Hadoop, which comes preloaded with a Search application.


With just a few simple clicks, we can generate a nice dashboard, and present it to the end user. 

Start by clicking the Search->Indexes menu item, then click on Dashboards and Create. You will then see a new dashboard template window such as the one below; to get going, click the little pencil (Edit) button. 

Next, choose how to present the search results on the dashboard. For our demo, we chose Grid Layout, but if you are handy with HTML you can choose an HTML layout and present the results in a more polished manner.

The next step is where all the fun begins: You can drag and drop different widgets on the screen and assign them to the fields in the index. At the moment, Hue Search Dashboards support the following widgets:

  • Filter Bar
  • Marker Map
  • Text Facet
  • Pie Chart
  • Bar Chart
  • Line Chart
  • Tree
  • Heatmap
  • Timeline
  • Gradient Map

For a full reference on how to build your own dashboards, follow the links below:


For our Apache logs demo, we used pie charts to give users the ability to drill into Application, Region, and Operating System facets. Text facets allow users to drill into country and city. A timeline view provides a nice graphical view of when users accessed our website. Finally, a marker map visually displays geo locations from which users accessed our example website.


Although the main example in this post describes a use case involving Apache web server logs, you could just as easily use the same components for any type of log/event processing. For an information security use case, processing proxy and firewall logs in real time can go a long way toward stopping external attacks and preventing insider threats. For an insurance company, processing claims and making them searchable to adjusters and fraud analysts can decrease time to resolution.

Whatever the use case, the ongoing investment in pervasive analytics is key.

Gwen Shapira is a Software Engineer at Cloudera, working on the Data Ingest team.

Jeff Shmain is a Solutions Architect at Cloudera.

Categories: Hadoop