Hadoop

Advanced Analytics with Apache Spark: The Book

Cloudera Blog - Fri, 01/23/2015 - 17:16

Authored by a substantial portion of Cloudera’s Data Science team (Sean Owen, Sandy Ryza, Uri Laserson, Josh Wills), Advanced Analytics with Spark (currently in Early Release from O’Reilly Media) is the newest addition to the pipeline of ecosystem books by Cloudera engineers. I talked to the authors recently.

Why did you decide to write this book?

We think it’s mostly to fill a gap between what a lot of people need to know to be productive with large-scale analytics on Apache Hadoop in 2015, and the resources that are out there. There are plenty of books on machine learning theory, and plenty of references covering how to use Hadoop ecosystem tools. However, there is not as much specifically targeting the overlap between the two, and focusing on use cases and examples rather than being a manual. So the book is a modest attempt to meet that need, which we see turn up frequently among customers and in the community.

Who is the intended reader?

The ideal reader is a data scientist or aspiring data scientist. “Data scientist” has come to mean quite a few things, but the book is targeted specifically at the subset who are interested in analysis on large datasets, and who are motivated to learn a bit about the software and mathematical underpinnings of doing so. It will be most useful for people who want to get their heads around the basics of machine learning but are more interested in its application than the theory.

Different chapters appeal to different levels of experience in different fields. For example, the second chapter, on record linkage, seeks to teach the basics of using Scala and Apache Spark to work with data, while the eighth chapter, on estimating financial risk through Monte Carlo simulation, assumes a basic understanding of probability and statistics.

What will readers learn, and how does it complement what they will learn from other titles on the market?

Readers ought to pick up the 20% of Spark that’s used 80% of the time in practice. It’s not a reference by any means; Learning Spark (also in Early Release at the time of this writing) is the “definitive” guide. Likewise, it gives enough machine-learning theory to use Spark as a tool for analytics correctly but is not a textbook or ML course. It still complements, say, Coursera’s free online ML courses.

What makes Spark so different in this particular area? Why do people need to know about this?

The first couple chapters of the book actually try to answer this question, and we think it comes down to a couple of things. Spark is just far more developer-friendly than its predecessor frameworks that process large datasets. Its rich library of operators makes expressing complex transformations easy, and the interactive environment it provides enables exploratory analysis. Spark also has primitives that open up many of the processing patterns required by machine-learning algorithms. It’s relevant for exploratory as well as operational analytics.

None of these capabilities are individually new, but having one platform that does a decent job at all of them is powerful. Its abstractions strike a nice balance between forcing users to write programs that can scale to lots of data and allowing them to think about things at a high level.

Advanced Analytics with Spark is scheduled to be generally available by April 2015. Get a free signed Early Release from the authors at Strata + Hadoop World San Jose 2015!

Categories: Hadoop

New in Cloudera Labs: Google Cloud Dataflow on Apache Spark

Cloudera Blog - Tue, 01/20/2015 - 16:00

Cloudera and Google are collaborating to bring Google Cloud Dataflow to Apache Spark users (and vice-versa). This new project is now incubating in Cloudera Labs!

“The future is already here—it’s just not evenly distributed.” —William Gibson

For the past decade, a lot of the future has been concentrated at Google’s headquarters in Mountain View. Because of the scale of its operations, Google usually bumped up against the limitations of the current state-of-the-art before anyone else, and was required to come up with its own solutions to the problems it encountered. From time to time, it would publish its solutions, either in the form of open source software projects like Guava or protocol buffers, or as research papers that would challenge and inspire the broader academic and open source communities. Open source projects like Apache Hadoop, Apache HBase, and Apache Parquet (incubating) were all inspired by research papers that Google published about their internal data management systems.

With the release of Cloud Dataflow, Google is leveraging its cloud computing infrastructure to provide a service that developers can use to execute their own batch and streaming data pipelines. Cloud Dataflow is a descendant of the FlumeJava (PDF) batch processing engine (which served as inspiration for both Apache Crunch and Apache Spark, the new standard for data processing on Hadoop) that has been extended to support stream processing using ideas from Google’s MillWheel project. Even better, Google has released the Dataflow SDK as an Apache-licensed project that can support alternative backends, and Cloudera was pleased to collaborate with our friends at Google on a version of Dataflow that runs on Apache Spark. This new Dataflow “runner,” which allows users to target a Dataflow pipeline for execution on Spark, has now joined Cloudera Labs as an incubating project (as usual, for testing and experimentation only).

One of the most compelling aspects of Cloud Dataflow is its approach to one of the most difficult problems facing data engineers: how to develop pipeline logic that can execute in both batch and streaming contexts. Although the lambda architecture, best represented by Twitter’s Summingbird project, has been the recommended approach for the last few years, Jay Kreps wrote a blog post in July 2014 that argued for a “kappa architecture” based on a streaming-oriented execution model built on top of Apache Kafka. Cloud Dataflow ends up between these two extremes: the streaming execution engine has strong consistency guarantees and provides a windowing model that is even more advanced than the one in Spark Streaming, but there is still a distinct batch execution engine that is capable of performing additional optimizations to pipelines that do not process streaming data. Crucially, the client APIs for the batch and stream processing engines are identical, so any operation that can be performed in one context can also be performed in the other, and moving a pipeline from batch mode to streaming mode should be as seamless as possible.

You can begin constructing your own Dataflow pipelines for local execution by downloading the SDK and reading the getting started guide (see also the StackOverflow tag for Dataflow). Instructions for setting up the Spark runner for Dataflow are in the README on our GitHub repository, along with a simple example pipeline you can set up and run. Note that the Spark runner currently requires Apache Spark version 1.2, which ships as part of CDH 5.3.0, and currently only supports batch pipelines as we work on extending Spark Streaming to support all of the windowing functionality provided by Dataflow.

Enjoy! To provide feedback about the new Spark runner for Dataflow, use our Cloudera Labs discussion forum.

Josh Wills is Cloudera’s Senior Director of Data Science.

Categories: Hadoop

How-to: Deploy Apache Hadoop Clusters Like a Boss

Cloudera Blog - Fri, 01/16/2015 - 17:20

Learn how to set up a Hadoop cluster in a way that maximizes successful production-ization of Hadoop and minimizes ongoing, long-term adjustments.

Previously, we published some recommendations on selecting new hardware for Apache Hadoop deployments. That post covered some important ideas regarding cluster planning and deployment such as workload profiling and general recommendations for CPU, disk, and memory allocations. In this post, we’ll provide some best practices and guidelines for the next part of the implementation process: configuring the machines once they arrive. Between the two posts, you’ll have a great head start toward production-izing Hadoop.

Specifically, we’ll cover some important decisions you must make to ensure network, disks, and hosts are configured correctly. We’ll also explain how disks and services should be laid out to be utilized efficiently and minimize problems as your data sets scale.

After following this guide, you’ll have the network, operating system, and storage layers of your cluster configured correctly and ready for production workloads.

Networking: May All Your SYNs Be Forgiven

Hostname Resolution, DNS, and FQDNs

A Hadoop Java process such as the DataNode gets the hostname of the host on which it is running and then does a lookup to determine the IP address. It then uses this IP to determine the canonical name as stored in DNS or /etc/hosts. Each host must be able to perform a forward lookup on its own hostname and a reverse lookup using its own IP address. Furthermore, all hosts in the cluster need to resolve other hosts. You can verify that forward and reverse lookups are configured correctly using the Linux host command.

$ host `hostname`
bp101.cloudera.com has address 10.20.195.121
$ host 10.20.195.121
121.195.20.10.in-addr.arpa domain name pointer bp101.cloudera.com

Cloudera Manager uses a quick Python command to test proper resolution.

$ python -c 'import socket; print socket.getfqdn(), socket.gethostbyname(socket.getfqdn())'

While it is tempting to rely on /etc/hosts for this step, we recommend using DNS instead. DNS is much less error-prone than the hosts file and makes changes easier to implement down the line. Hostnames should be set to the fully-qualified domain name (FQDN). Note that FQDNs are required for enabling security features such as Kerberos and TLS encryption. You can verify this with:

$ hostname --fqdn
bp101.cloudera.com

If you do use /etc/hosts, ensure that each entry is listed in the appropriate order: IP address, then FQDN, then any short names or aliases.

192.168.2.1 bp101.cloudera.com bp101 master1
192.168.2.2 bp102.cloudera.com bp102 master2

Name Service Caching

Hadoop makes extensive use of network-based services such as DNS, NIS, and LDAP. To help weather network hiccups, alleviate stress on shared infrastructure, and improve the latency of name resolution, it can be helpful to enable the name server cache daemon (nscd). nscd caches the results of both local and remote calls in memory, often avoiding a latent round-trip to the network. In most cases you can enable nscd, let it work, and leave it alone. If you’re running Red Hat SSSD, you’ll need to modify the nscd configuration; with SSSD enabled, don’t use nscd to cache passwd, group, or netgroup information.
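If you are combining nscd with SSSD, a minimal sketch of the relevant /etc/nscd.conf settings (illustrative only; exact defaults vary by distribution) looks like this—keep hosts caching on and leave user and group lookups to SSSD:

# /etc/nscd.conf excerpt: cache hosts, defer passwd/group/netgroup to SSSD
enable-cache    hosts       yes
enable-cache    passwd      no
enable-cache    group       no
enable-cache    netgroup    no

$ chkconfig nscd on
$ service nscd start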

Link Aggregation

Also known as NIC bonding or NIC teaming, this refers to combining network interfaces for increased throughput or redundancy. Exact settings will depend on your environment. 

There are many different ways to bond interfaces. Typically, we recommend bonding for throughput as opposed to availability, but that tradeoff will depend greatly on the number of interfaces and internal network policies. Misconfigured NIC bonding is one of the most common sources of Cloudera support cases. We typically recommend standing up the cluster and verifying that everything works before enabling bonding, which makes any issues you encounter easier to troubleshoot.
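For illustration only—interface names, addresses, and the bonding mode are placeholders that will differ in your environment—an LACP-style bond on a RHEL/CentOS 6 host might be sketched as:

# /etc/sysconfig/network-scripts/ifcfg-bond0
DEVICE=bond0
BOOTPROTO=none
ONBOOT=yes
IPADDR=192.168.2.1
NETMASK=255.255.255.0
BONDING_OPTS="mode=802.3ad miimon=100"

# /etc/sysconfig/network-scripts/ifcfg-eth0 (repeat for each slave interface)
DEVICE=eth0
BOOTPROTO=none
ONBOOT=yes
MASTER=bond0
SLAVE=yes

Note that mode=802.3ad (LACP) requires matching configuration on the switch; verify plain connectivity before and after enabling the bond.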

Multi-Homed Networks

Another frequently asked question is whether Hadoop can handle interfaces on separate networks. The HDFS docs have some information, and logically it makes sense to separate the network of the Hadoop nodes from a “management” network. However, in our experience, multi-homed networks can be tricky to configure and support. The pain stems from Hadoop integrating with a large ecosystem of components that all have their own network and port-binding settings. New components can lack the ability to bind to specific networks or wildcard addresses depending on your setup. It can be advantageous to first set up your network without multi-homing to avoid trouble and keep your cluster on the same network. Once you are certain everything is set up properly, then go back and add in the management network.

VLAN

VLANs are not required, but they can make things easier from the network perspective. It is recommended to move to a dedicated switching infrastructure for production deployments, as much for the benefit of other traffic on the network as anything else. Then make sure all of the Hadoop traffic is on one VLAN for ease of troubleshooting and isolation.

Operating System (OS)

Cloudera Manager does a good job of identifying known and common issues in the OS configuration, but double-check the following:

IPTables

Some customers disable IPTables completely in their initial cluster setup. Doing so makes things easier from an administration perspective, of course, but also introduces some risk. Depending on the sensitivity of data in your cluster, you may wish to enable IPTables. Hadoop requires many ports for communication across its numerous ecosystem components, but our documentation will help you navigate this.

SELinux

It is challenging to construct an SELinux policy that governs all the different components in the Hadoop ecosystem, so most of our customers run with SELinux disabled. If you are interested in running SELinux, verify that it is on a supported OS version. We recommend enabling only permissive mode initially so that you can capture the output needed to define a policy that meets your needs.
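For example, to check the current mode and switch to permissive on a RHEL-style system (a sketch; adjust paths and tooling for your distribution):

$ getenforce
Enforcing
$ setenforce 0        # permissive until the next reboot
$ sed -i 's/^SELINUX=enforcing/SELINUX=permissive/' /etc/selinux/config    # persist across reboots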

Swappiness

The traditional recommendation for worker nodes was to set swappiness (vm.swappiness) to 0. However, this behavior changed in newer kernels and we now recommend setting this to 1. (This post has more details.)

$ sysctl vm.swappiness=1
$ echo "vm.swappiness = 1" >> /etc/sysctl.conf

Limits

The default file handle limit (aka ulimit) of 1024 on most distributions is likely not high enough. Cloudera Manager will fix this issue, but if you aren’t running Cloudera Manager, be aware of this fact. Cloudera Manager will not alter users’ limits outside of Hadoop’s default limits. Nevertheless, it is still beneficial to raise the global limits to 64k.
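If you are managing limits by hand, a sketch of /etc/security/limits.conf entries follows; the exact service accounts (hdfs, mapred, hbase, and so on) depend on which components you install:

hdfs    -    nofile    65536
hdfs    -    nproc     65536
mapred  -    nofile    65536
mapred  -    nproc     65536
hbase   -    nofile    65536
hbase   -    nproc     65536

$ ulimit -Sn    # verify the soft limit for the current user
$ ulimit -Hn    # verify the hard limit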

Transparent Huge Pages (THP)

Most Linux platforms supported by CDH 5 include a feature called Transparent Huge Page compaction, which interacts poorly with Hadoop workloads and can seriously degrade performance. Red Hat claims the bug was patched in versions 6.4 and later, but remnants remain that can cause performance issues. We recommend disabling defrag until further testing can be done.

Red Hat/CentOS: /sys/kernel/mm/redhat_transparent_hugepage/defrag
Ubuntu/Debian, OEL, SLES: /sys/kernel/mm/transparent_hugepage/defrag

$ echo 'never' > defrag_file_pathname

Remember to add this command to /etc/rc.local to make it persistent across reboots.
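On Red Hat/CentOS, for example, that can be a one-liner (adjust the defrag path for your distribution):

$ echo 'echo never > /sys/kernel/mm/redhat_transparent_hugepage/defrag' >> /etc/rc.local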

Time

Make sure you enable NTP on all of your hosts.
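On RHEL/CentOS 6, for example, a minimal sketch looks like the following; point every host at the same set of time sources:

$ yum install -y ntp
$ chkconfig ntpd on
$ service ntpd start
$ ntpq -p    # confirm peers are reachable and offsets are small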

Storage

Properly configuring the storage for your cluster is one of the most important initial steps. Failure to do so correctly will lead to pain down the road as changing the configuration can be invasive and typically requires a complete redo of the current storage layer.

OS, Log Drives and Data Drives

Typical 2U machines come equipped with between 16 and 24 drive bays for dedicated data drives, and some number of drives (usually two) dedicated for the operating system and logs. Hadoop was designed with a simple principle: “hardware fails.”  As such, it will sustain a disk, node, or even rack failure. (This principle really starts to take hold at massive scale but let’s face it: if you are reading this blog, you probably aren’t at Google or Facebook.) 

Even at normal-person scale (fewer than 4,000 nodes), Hadoop survives hardware failure like a boss but it makes sense to build in a few extra redundancies to reduce these failures. As a general guideline, we recommend using RAID-1 (mirroring) for OS drives to help keep the data nodes ticking a little longer in the event of losing an OS drive. Although this step is not absolutely necessary, in smaller clusters the loss of one node could lead to a significant loss in computing power.

The other drives should be deployed in a JBOD (“Just a Bunch Of Disks”) configuration with individually mounted ext4 partitions on systems running RHEL6+, Debian 7.x, or SLES11+. In some hardware profiles, individual RAID-0 volumes must be used when a RAID controller is mandatory for that particular machine build. This approach will have the same effect as mounting the drives as individual spindles.

There are some mount options that can be useful. These are covered well in Hadoop Operations and by Alex Moundalexis, but echoed here.

Root Reserved Space

By default, both ext3 and ext4 reserve 5% of the blocks on a given filesystem for the root user. This reserve isn’t needed for HDFS data directories, however, and you can set it to zero when creating the partition (with mkfs) or afterward (with tune2fs):

$ mkfs.ext4 -m 0 /dev/sdb1
$ tune2fs -m 0 /dev/sdb1

File Access Time

Linux filesystems maintain metadata that records when each file was last accessed—thus, even reads result in a write to disk. This timestamp is called atime and should be disabled on drives configured for Hadoop. Set it via mount option in /etc/fstab:

/dev/sdb1    /data1    ext4    defaults,noatime    0    0

and apply it without a reboot:

mount -o remount /data1

Directory Permissions

This is a minor point, but you should consider changing the permissions on your directories to 700 before you mount data drives. That way, if the drives become unmounted, processes writing to these directories will not fill up the OS mount.
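A minimal sketch of that sequence for a single data mount, reusing the /data1 example above:

$ mkdir -p /data1
$ chmod 700 /data1
$ mount /data1      # assumes a matching /etc/fstab entry
$ ls -ld /data1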

LVM, RAID or JBOD

We are frequently asked whether a JBOD configuration, RAID configuration, or LVM configuration is required. The entire Hadoop ecosystem was created with a JBOD configuration in mind. HDFS is a write-once filesystem designed for large files and long sequential reads. This design plays well with stand-alone SATA drives, as they get the best performance with sequential reads. In summary, whereas RAID is typically used to add redundancy to an existing system, HDFS already has that built in. In fact, using a RAID system with Hadoop can negatively affect performance.

Both RAID-5 and RAID-6 add parity bits into the RAID stripes. These parity bits have to be written and read during standard operations and add significant overhead. Standalone SATA drives will write/read continuously without having to worry about parity bits, since they don’t exist. In contrast, HDFS takes advantage of having numerous individual mount points and can allow individual drives/volumes to fail before the node goes down—which is HDFS’s not-so-secret sauce for parallelizing I/O. Setting the drives up in RAID-5 or RAID-6 arrays will create a single array or a couple of very large arrays of mount points, depending on the drive configuration. These RAID arrays undermine HDFS’s built-in data protection and the data locality of map tasks, and slow down sequential reads.

RAID arrays will also affect other systems that expect numerous mount points. Impala, for example, spins up a thread per spindle in the system, which will perform favorably in a JBOD environment vs. a large single RAID group. For the same reasons, configuring your Hadoop drives under LVM is neither necessary nor recommended.

Deploying Heterogeneously

Many customers purchase new hardware in regular cycles; adding new generations of computing resources makes sense as data volumes and workloads increase. For such environments containing heterogeneous disk, memory, or CPU configurations, Cloudera Manager provides Role Groups, which allow the administrator to specify memory, YARN container, and Cgroup settings per node or per group of nodes.

While Hadoop can certainly run with mixed hardware specs, we recommend keeping worker-node configurations homogeneous, if possible. In distributed computing environments, workloads are distributed amongst nodes and optimizing for local data access is preferred. Nodes configured with fewer computing resources can become a bottleneck, and running with a mixed hardware configuration could lead to a wider variation in SLA windows. There are a few things to consider:

  • Mixed spindle configuration - HDFS block placement by default works in a round-robin fashion across all the directories specified by dfs.data.dir. If you have, for example, a node with six 1.2TB drives and six 600GB drives, you will fill up the smaller drives more quickly, leading to volume imbalance. Using the Available Space policy requires additional configuration (see the example configuration after this list), and in this scenario I/O-bound workloads could be affected because you might be writing to only a subset of your disks. Understand the implications of deploying drives in this fashion in advance. Furthermore, if you deploy nodes with more overall storage, remember that HDFS balances by percentage.
  • Mixed memory configuration – Mixing available memory in worker nodes can be problematic as it does require additional configuration.
  • Mixed CPU configuration - Same concept; jobs can be limited by the slowest CPU, effectively negating the benefit of newer or additional cores.
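For reference, here is a sketch of the hdfs-site.xml settings involved in switching to the Available Space volume-choosing policy; confirm the property names and defaults against the documentation for your Hadoop/CDH release:

<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
  <!-- volumes whose free space differs by less than this many bytes are treated as balanced -->
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold</name>
  <value>10737418240</value>
</property>
<property>
  <!-- fraction of new block allocations directed to the volumes with more available space -->
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
  <value>0.75</value>
</property>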

It is important to be cognizant of the points above, but remember that Cloudera Manager can help with allocating resources to different hosts, allowing you to easily manage and optimize your configuration.

Cloudera Manager Like A Boss

We highly recommend using Cloudera Manager to manage your Hadoop cluster. Cloudera Manager offers many valuable features to make life much easier. The Cloudera Manager documentation is pretty clear on this, but in order to stamp out any ambiguity, below are the high-level steps to do a production-ready Hadoop deployment with Cloudera Manager.

  1. Set up an external database and pre-create the schemas needed for your deployment.
    create database amon DEFAULT CHARACTER SET utf8;
    grant all on amon.* TO 'amon'@'%' IDENTIFIED BY 'amon_password';
    create database rman DEFAULT CHARACTER SET utf8;
    grant all on rman.* TO 'rman'@'%' IDENTIFIED BY 'rman_password';
    create database metastore DEFAULT CHARACTER SET utf8;
    grant all on metastore.* TO 'metastore'@'%' IDENTIFIED BY 'metastore_password';
    create database nav DEFAULT CHARACTER SET utf8;
    grant all on nav.* TO 'nav'@'%' IDENTIFIED BY 'nav_password';
    create database sentry DEFAULT CHARACTER SET utf8;
    grant all on sentry.* TO 'sentry'@'%' IDENTIFIED BY 'sentry_password';

    (Please change the passwords in the examples above!)

  2. Install the cloudera-manager-server and cloudera-manager-daemons packages per documentation.
    yum install cloudera-manager-server cloudera-manager-daemons
  3. Run the scm_prepare_database.sh script specific to your database type.
    /usr/share/cmf/schema/scm_prepare_database.sh mysql -h cm-db-host.cloudera.com -utemp -ptemp --scm-host cm-db-host.cloudera.com scm scm scm
  4. Start the Cloudera Manager Server and follow the wizard from that point forward, as shown below.
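A sketch of that last step (the Cloudera Manager web UI listens on port 7180 by default):

$ sudo service cloudera-scm-server start
# then browse to http://<cm-server-host>:7180 and log in (default credentials: admin/admin) to run the wizard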

This is the simplest way to install Cloudera Manager and will get you started with a production-ready deployment in under 20 minutes.

You Play It Out: Services Layout Guide

Given a Cloudera Manager-based deployment, the diagrams below present a rational way to lay out service roles across the cluster in most configurations.

In larger clusters (50+ nodes), a move to five management nodes might be required, with dedicated nodes for the ResourceManager and NameNode pairs. Further, it is not uncommon to use an external database for Cloudera Manager, the Hive Metastore, and so on, and additional HiveServer2 or HMS services could be deployed as well.

We recommend 128GB per management node and 256-512GB for worker nodes. Memory is relatively inexpensive, and as computation engines increasingly rely on in-memory execution, the additional memory will be put to good use.

Diving a little deeper, the following charts depict the appropriate disk mappings to the various service storage components.

We specify the use of an LVM here for the Cloudera Manager databases but RAID 0 is an option, as well.

Conclusion

Setting up a Hadoop cluster is relatively straightforward once armed with the appropriate knowledge. Take the extra time to procure the right infrastructure and configure it correctly from the start. Following the guidelines described above will give you the best chance for success in your Hadoop deployment and you can avoid fussing with configuration, allowing you to focus your time on solving real business problems—like a boss.

Look for upcoming posts on security and resource management best practices.

Jeff Holoman and Kevin O’Dell are System Engineers at Cloudera.

Categories: Hadoop

Improving Sort Performance in Apache Spark: It’s a Double

Cloudera Blog - Wed, 01/14/2015 - 16:47

Cloudera and Intel engineers are collaborating to make Spark’s shuffle process more scalable and reliable. Here are the details about the approach’s design.

What separates computation engines like MapReduce and Apache Spark from embarrassingly parallel systems is their support for “all-to-all” operations. As distributed engines, MapReduce and Spark operate on sub-slices of a dataset partitioned across the cluster. Many operations process single data-points at a time and can be carried out fully within each partition. All-to-all operations must consider the dataset as a whole; the contents of each output record can depend on records that come from many different partitions. In Spark, groupByKey, sortByKey, and reduceByKey are popular examples of these types of operations.

In these distributed computation engines, the “shuffle” refers to the repartitioning and aggregation of data during an all-to-all operation. Understandably, most performance, scalability, and reliability issues that we observe in production Spark deployments occur within the shuffle.

Cloudera and Intel engineers are collaborating on work to augment Spark’s shuffle so that it can handle large datasets more quickly and more reliably. While Spark has advantages over MapReduce in many respects, it is still catching up on scalability and reliability. We’ve borrowed concepts from the battle-tested MapReduce shuffle implementation to improve shuffle operations that output sorted data.

In this post, we’ll survey the workings of the current Spark shuffle implementation, our proposed changes, and how they improve its performance. Work is in progress upstream at SPARK-2926.

Current State of Affairs

A shuffle involves two sets of tasks: tasks from the stage producing the shuffle data and tasks from the stage consuming it. For historical reasons, the tasks writing out shuffle data are known as “map tasks” and the tasks reading the shuffle data are known as “reduce tasks.” These roles are with respect to a particular shuffle within a job. A task might be a reduce task in one shuffle where it’s reading data, and then a map task for the next shuffle, where it’s writing data to be consumed by a subsequent stage.

The MapReduce and Spark shuffles use a “pull” model. Every map task writes out data to local disk, and then the reduce tasks make remote requests to fetch that data. As shuffles sit underneath all-to-all operations, any map task may have some set of records that are meant for any reduce task. The job of the map side of the shuffle is to write out records in such a way that all records headed for the same reduce task are grouped next to each other for easy fetching.

Spark’s original shuffle (“hash-based shuffle”) implementation accomplished this goal by opening a file in each map task for each reduce task. This approach has a simplicity advantage but runs into a few issues. For example, Spark must either use lots of memory holding a buffer over each file or incur lots of random disk I/O. Furthermore, if M and R are the number of map and reduce tasks in a shuffle, hash-based shuffle requires a total of M * R intermediate files. Shuffle consolidation work reduced this to C * R intermediate files, where C is the number of map tasks that can run at the same time. But even with this change, users would often run into the “Too many open files” ulimit when running jobs with non-trivial numbers of reducers.


Single map task in hash-based shuffle


Single map task in sort-based shuffle

To further improve the scalability and performance of shuffle, starting with release 1.1, Spark introduced a “sort-based shuffle” implementation that is similar to the map-side approach used by MapReduce. In this implementation, the map output records from each task are kept in memory until they can’t fit. At that point they are sorted by the reduce task for which they are destined and then spilled to a single file. If this process occurs multiple times within a task, the spilled segments are merged later.
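If you want to be explicit about which implementation a job uses (sort-based became the default in Spark 1.2), a minimal spark-defaults.conf sketch is shown below; the same property can also be passed to spark-submit with --conf.

# spark-defaults.conf
spark.shuffle.manager    sort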

On the reduce side, a set of threads are responsible for fetching the remote map output blocks. As each block comes in, its records are deserialized and passed into a data structure appropriate to the all-to-all operation being carried out. For aggregation operations like groupByKey, reduceByKey, and aggregateByKey, the records are passed into an ExternalAppendOnlyMap, which is essentially a hash map that can spill to disk when it overflows memory. For ordering operations like sortByKey, records are passed into an ExternalSorter, which sorts them, possibly spilling to disk, and returns an iterator over them in sorted order.

Full Sort-based Shuffle

There are two disadvantages to the approaches described above:

  • Each Spark reduce task holds many deserialized records in memory at once. Large numbers of Java objects put pressure on the JVM’s garbage collector, which can lead to slowdowns and pauses. They also take up more memory than their serialized versions, meaning that Spark must spill earlier and more often, incurring more disk I/O. Furthermore, it’s difficult to determine the memory footprint of deserialized objects with 100% accuracy, so holding more of them opens up more possibilities for out-of-memory errors.
  • When conducting an operation that requires sorting the records within partitions, we end up sorting the same data twice: first by partition in the mapper, and then by key in the reducer.

Our change sorts the records by key within the partitions on the map side as well. Then, on the reduce side, we need only to merge the sorted blocks coming in from each map task. We can store the blocks in memory in serialized form and deserialize a record at a time as we merge. Thus the maximum number of deserialized records in memory at any time is the number of blocks we’re merging together.


Single map task in full sort-based shuffle

A single reduce task can receive blocks from thousands of map tasks. To make this many-way merge efficient, especially in the case where the data does not fit in memory, we introduce a tiered merger. When we need to merge many on-disk blocks, the tiered merger conducts merges on subsets of the blocks to minimize disk seeking. This tiered merge is applicable to the merge steps inside ExternalAppendOnlyMap and ExternalSorter as well, but we haven’t yet modified them to take advantage of it.

High-Performance Merging

For each task, a set of threads are responsible for concurrently fetching shuffle data. A per-task memory pool of 48MB is used as a landing spot for fetched data.

The SortShuffleReader we are introducing is responsible for taking blocks from there and exposing an iterator over [Key, Value] pairs to the user code.

Spark maintains a main shuffle memory zone shared across all tasks whose default size is 20% of the full executor heap. As blocks come in, the SortShuffleReader tries to acquire shuffle memory from this main zone for them. Serialized blocks fill up in memory until an attempt to acquire memory fails. At this point, we need to spill data to disk to free up space. The SortShuffleReader merges all the in-memory blocks (well, not actually all; sometimes it’s better to spill only a few) into a single sorted on-disk file. As blocks pile up on disk, a background thread monitors them and merges them into larger sorted on-disk blocks if necessary. The “final merge” that feeds the iterator passed to user code merges the final set of on-disk blocks with any blocks remaining in memory.

How do we decide when an intermediate disk-to-disk merge is necessary? The spark.shuffle.maxMergeFactor property (defaulting to 100) controls the maximum number of on-disk blocks that may be merged at once. When the number of on-disk blocks exceeds this limit the background thread runs a merge to bring this number down (but not immediately; more details in the code). In deciding how many blocks to merge, the thread first minimizes the number of merges it carries out, and, within that number, tries to merge as few blocks as possible. Consequently, if spark.shuffle.maxMergeFactor is 100 and the final number of on-disk blocks is 110, it only merges 11 blocks together, which puts the final number of on-disk blocks at exactly 100. Merging any fewer blocks would require an additional merge, and merging any more blocks would result in unnecessary disk I/O.



Tiered merge with maxMergeWidth=4. Each rectangle is an on-disk segment. Three segments are merged into a single segment and then the final four segments are merged into the iterator that’s fed to the next operation.

Performance Comparison with sortByKey

We tested using SparkPerf‘s sort-by-key workload to assess the performance impact of our change. We chose two different dataset sizes to compare the performance gain of our change when memory is sufficient to hold all the shuffle data and when it is not.

Spark’s sortByKey transformation results in two jobs and three stages.

  • Sample stage: Sample the data to create a range-partitioner that will result in an even partitioning.
  • “Map” stage: Write the data to the destination shuffle buckets for the reduce stage.
  • “Reduce” stage: Fetch the related shuffle output and merge/sort the specific partition of the dataset.

The benchmarks were conducted on a 6-node cluster. Each executor had 24 cores and 36GB of memory. The large dataset had 20 billion records, which, compressed, occupied 409.8GB on HDFS. The small dataset had 2 billion records, which, compressed, occupied 15.9GB on HDFS. Each record was a key-value pair of two 10-character strings. The sort was conducted over 1000 partitions in both cases. The charts of running time with each stage and total jobs are shown below:


Large dataset (lower is better)


Small dataset (lower is better)

The sample stage time remains the same because it doesn’t involve a shuffle. In the map stage, because our improvement now sorts data by key within each partition, the stage running time increases (by 37% for the large dataset and 27% for the small dataset). However, the extra time is more than compensated for in the reduce stage, which now only needs to merge the sorted data. The duration of the reduce stage is reduced by over 66% on both of the datasets, leading to a 27% total speedup on the large dataset and 17% total speedup on the small dataset.

What’s Next?

SPARK-2926 is one of a few planned improvements to Spark’s shuffle machinery. In particular, there are many ways that the shuffle can manage its memory better:

  • SPARK-4550 tracks storing memory-buffered map output data as raw bytes instead of Java objects. This work will allow map output data to take up less space and thus result in fewer spills, as well as faster comparisons on raw bytes.
  • SPARK-4452 tracks allocating memory more carefully between different shuffle data structures that make use of it, as well as giving no-longer needed memory back sooner.
  • SPARK-3461 tracks streaming over the values that correspond to a particular key when passing on the results of a groupBy or join, instead of loading them all into memory at once.

Sandy Ryza is a Data Scientist at Cloudera, a Hadoop committer, and a Spark contributor. He is a co-author of Advanced Analytics with Spark.

Saisai (Jerry) Shao is a Software Engineer at Intel and a Spark contributor.

Categories: Hadoop

Using Apache Sqoop for Load Testing

Cloudera Blog - Mon, 01/12/2015 - 17:05

Our thanks to Montrial Harrell, Enterprise Architect for the State of Indiana, for the guest post below.

Recently, the State of Indiana has begun to focus on how enterprise data management can help our state’s government operate more efficiently and improve the lives of our residents. With that goal in mind, I began this journey just like everyone else I know: with an interest in learning more about Apache Hadoop.

I started learning Hadoop via a virtual server onto which I installed CDH and worked through a few online tutorials. Then, I learned a little more by reading blogs and documentation, and by trial and error.

Eventually, I decided to experiment with a classic Hadoop use case: extract, load, and transform (ELT). In most cases, ELT allows you to offload some resource-intensive data transforms in favor of Hadoop’s MPP-like functionality, thereby cutting resource usage on the current ETL server at a relatively low cost. This functionality is in part delivered via the Hadoop ecosystem project called Apache Sqoop.

Preparing for Sqoop

In preparing to use Sqoop, I found that there are two versions inside CDH. The classic version, called Sqoop 1, has a command line interface (CLI) and you store drivers for it in /var/lib/sqoop. If you are going to use Apache Oozie jobs that reference Sqoop, you also need to store your driver in /user/oozie/share/lib/sqoop.

The more recent Sqoop engine is called Sqoop 2. Sqoop 2 uses the path /var/lib/sqoop2 for drivers and is the method employed when you access Sqoop Transfer located under the “Data Browsers” menu in Hue. There are differences between the two Sqoop engines that are better explained by Cloudera documentation, but suffice to say here that I chose to use Sqoop 1.

The steps below are based on Sqoop 1 only; should you want to follow along, you should install the JDBC drivers first (docs).

Preparing for a SQL Sqoop Job

Here are the steps I followed to run my first Sqoop job using Microsoft SQL Server as the source:

  1. First, I downloaded the Microsoft JDBC Driver 4.0 tar file from the Microsoft Download Center.
  2. Next, I unpacked the tar file using the command tar -zxvf <tar file name>.
  3. I copied the driver to the path /var/lib/sqoop on the machine where I would run the Sqoop command. (If CDH is installed via a parcel, you’ll need to create this directory first.)
  4. I created a test user on the SQL Server instance with read-only permission to my test database.
  5. Next, I logged into the test Hadoop cluster.
  6. I ran the below command to import a single SQL database table.

sqoop import --connect "jdbc:sqlserver://xx.xx.xx.xx:3464;databaseName=testing" --username testingusername -P --table testtable --hive-import -m 24

  • --connect = specifies the JDBC connect string, which contains the server name, port (if needed), and database name. Please note the double quotes around the connection string.
  • --username = database login name
  • -P = prompt user for database login password
  • --table = table you wish to import
  • --hive-import = import table into Hive
  • -m = number of map tasks (parallel processes) to use to perform the import

Finally, I logged into Hue and verified that the table imported successfully.
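A quick sanity check is also possible from the command line; a sketch using the testtable example above:

$ hive -e "show tables;"
$ hive -e "select count(*) from testtable;"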

Conclusion

After testing Sqoop and becoming more familiar with its features, flexibility, and ease of use along with Apache Hive, Impala, and Apache Flume, my team and I are very excited. During our testing, I have been able to test load 65GB of data from a SQL table into Hive in 4 minutes in our testing environment and run new transformation test code in minutes as opposed to hours. This is not to say that there have not been bumps along the way, but it does confirm that this platform has definite advantages, and we intend to use it as part of our standard enterprise tool set going forward.

Categories: Hadoop

Where to Find Cloudera Tech Talks (through March 2015)

Cloudera Blog - Fri, 01/09/2015 - 17:36

Find Cloudera tech talks in Austin, London, Washington DC, Zurich, and other cities through March 2015.

Below please find our regularly scheduled quarterly update about where to find tech talks by Cloudera employees—this time, through the first quarter of calendar year 2015. Note that this list will be continually curated during the period; complete logistical information may not be available yet. And remember, many of these talks are in “free” venues (no cost of entry).

As always, we’re standing by to assist your meetup by providing speakers, sponsorships, and schwag!

Date | City | Venue | Speaker(s)
Jan. 10 | Austin, Tex. | Data Day Texas | Wes McKinney on PyData, Joey Echeverria on Kite SDK, Hari Shreedharan on Spark Streaming, Mark Grover on Hadoop app architecture
Jan. 16 | Santa Clara, Calif. | Big Data Bootcamp | Daniel Templeton on common pitfalls in MR development
Jan. 19 | Zurich | Swiss Big Data User Group | James Kinley on Hadoop cluster deployment
Jan. 20 | Santa Clara, Calif. | Bay Area Apache Sentry Meetup | Prasad Majumdar on what’s new
Jan. 21 | Palo Alto, Calif. | SFBay Apache Lucene/Solr Meetup | Wolfgang Hoschek on Solr-on-Spark
Feb. 4 | Austin, Tex. | Austin Data Geeks | Sean Busbey on Apache Accumulo
Feb. 10 | Portland, Ore. | Portland Big Data User Group | Juliet Hougland on data science for fantasy football
Feb. 12 | San Francisco | SF Data Mining | Juliet Hougland on data science for fantasy football
Feb. 16 | Santa Clara, Calif. | USENIX FAST | Ryan Blue on building data apps with Kite SDK
Feb. 18-20 | San Jose, Calif. | Strata + Hadoop World San Jose | John Russell (Impala), Tom White/Joey Echeverria/Ryan Blue (Kite SDK), Mark Grover/Ted Malaska/Jon Seidman/Gwen Shapira (architecture), and Phil Zeyliger/Phil Landale/Kate Ting (Hadoop ops) with tutorials; Joey Echeverria also on Hadoop security, Yanpei Chen/Karthik Kambatla on MR/SSD performance, Marcel Kornacker on Impala, Kate Ting/Miklos Christine on YARN, Woody Christie on Hadoop hardware, Aaron Myers/Daniel Templeton with Hadoop puzzlers, and Xuefu Zhang on Hive-on-Spark
Feb. 18 | San Francisco | SF Hadoop Users | Charles Lamb on HDFS encryption
Feb. 25 | Tyson’s Corner, Va. | Cloudera Federal Forum | Eddie Garcia on Hadoop security, Juliet Hougland on Spark, Eva Andreasson on Solr+Hadoop
Feb. 25 | Sunnyvale, Calif. | The Hive Big Data Think Tank | Jai Ranganathan on Lambda architecture (panelist)
March 11 | Seattle, Wash. | Seattle Spark Meetup | Hari Shreedharan on Spark Streaming
March 26 | London | QCon London | Sean Owen on Spark

 

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

New Advanced Analytics and Data Wrangling Tutorials on Cloudera Live

Cloudera Blog - Thu, 01/08/2015 - 21:01

A new Spark tutorial and Trifacta deployment option make Cloudera Live even more useful for getting started with Apache Hadoop.

When it comes to learning Hadoop and CDH (Cloudera’s open source platform including Hadoop), there is no better place to start than Cloudera Live (cloudera.com/live). With a quick, one-button deployment option, Cloudera Live launches a four-node Cloudera cluster that you can learn and experiment in, free, for two weeks. To help plan and extend the capabilities of your cluster, we also offer various partner deployments. Building on the addition of interactive tutorials and Tableau and Zoomdata integration, we have added a new tutorial on Apache Spark and a new Trifacta partner deployment.

One of the most popular tools in the Hadoop ecosystem is Apache Spark. This easy-to-use, general-purpose framework is extensible across multiple use cases – including batch processing, iterative advanced analytics, and real-time stream processing. With support and development from multiple industry vendors and partner tools, Spark has quickly become a standard within Hadoop.

The new tutorial, “Relationship Strength Analytics Using Spark,” walks you through the basics of Spark and how you can use the same, unified enterprise data hub to launch into advanced analytics. Using the example of product relationships, it shows how to discover which products are commonly viewed together, how to optimize product campaigns for better sales, and how to uncover other insights about product relationships to help build advanced recommendations.

In addition to the Spark tutorial, we have also added another partner deployment. One of the key strengths of implementing an enterprise data hub is its ability to integrate with other popular tools that you may already be using or would like to implement. One such tool is Trifacta. Trifacta lets you easily transform raw, complex data into clean and structured formats for analysis, so you can get more value from your data faster. With the new Trifacta deployment on Cloudera Live, you get the full functionality of Cloudera’s platform, along with an integrated trial of the Trifacta Data Transformation Platform to help you wrangle a variety of complex data.

Categories: Hadoop

New in CDH 5.3: Transparent Encryption in HDFS

Cloudera Blog - Wed, 01/07/2015 - 16:48

Support for transparent, end-to-end encryption in HDFS is now available and production-ready (and shipping inside CDH 5.3 and later). Here’s how it works.

Apache Hadoop 2.6 adds support for transparent encryption to HDFS. Once configured, data read from and written to specified HDFS directories will be transparently encrypted and decrypted, without requiring any changes to user application code. This encryption is also end-to-end, meaning that data can only be encrypted and decrypted by the client. HDFS itself never handles unencrypted data or data encryption keys. All these characteristics improve security, and HDFS encryption can be an important part of an organization-wide data protection story.

Cloudera’s HDFS and Cloudera Navigator Key Trustee (formerly Gazzang zTrustee) engineering teams did this work under HDFS-6134 in collaboration with engineers at Intel as an extension of earlier Project Rhino work. In this post, we’ll explain how it works, and how to use it.

Background

In a traditional data management software/hardware stack, encryption can be performed at different layers, each with different pros and cons:

  • Application-level encryption is the most secure and flexible one. The application has ultimate control over what is encrypted and can precisely reflect the requirements of the user. However, writing applications to handle encryption is difficult. It also relies on the application supporting encryption, which may rule out this approach with many applications already in use by an organization. If integrating encryption in the application isn’t done well, security can be compromised (keys or credentials can be exposed).
  • Database-level encryption is similar to application-level encryption. Most database vendors offer some form of encryption; however, database encryption often comes with performance trade-offs. One example is that indexes cannot be encrypted.
  • Filesystem-level encryption offers high performance, application transparency, and is typically easy to deploy. However, it can’t model some application-level policies. For instance, multi-tenant applications might require per-user encryption. A database might require different encryption settings for each column stored within a single file.
  • Disk-level encryption is easy to deploy and fast but also quite inflexible. In practice, it protects only against physical theft.

HDFS transparent encryption sits between database- and filesystem-level encryption in this stack. This approach has multiple benefits:

  • HDFS encryption can provide good performance and existing Hadoop applications can run transparently on encrypted data.
  • HDFS encryption prevents attacks at the filesystem-level and below (so-called “OS-level attacks”). The operating system and disk only interact with encrypted bytes because the data is already encrypted by HDFS.
  • Data is encrypted all the way to the client, securing data both when it is at rest and in transit.
  • Key management is handled externally from HDFS, with its own set of per-key ACLs controlling access. Crucially, this approach allows a separation of duties: the key administrator does not have to be the HDFS administrator, and acts as another policy evaluation point.

This type of encryption can be an important part of a certification process for industrial or governmental regulatory compliance. And that requirement is important in industries like healthcare (HIPAA), card payment (PCI DSS), and the US government (FISMA).

Design

Securely implementing encryption in HDFS presents some unique challenges. As a distributed filesystem, performance and scalability are primary concerns. Data also needs to be encrypted while being transferred over the network. Finally, as HDFS is typically run as a multi-user system, we need to be careful not to expose sensitive information to other users of the system, particularly admins who might have HDFS superuser access or root shell access to cluster machines.

With transparent encryption being a primary goal, all the above needs to happen without requiring any changes to user application code. Encryption also needs to support the standard HDFS access methods, including WebHDFS, HttpFS, FUSE, and NFS.

Key Management via the Key Management Server

Integrating HDFS with an enterprise keystore, such as Cloudera Navigator Key Trustee, was an important design goal. However, most keystores are not designed for the request rates driven by Hadoop workloads, and also do not expose a standardized API. For these reasons, we developed the Hadoop Key Management Server (KMS).

Figure 1: High-level architecture of how HDFS clients and the NameNode interact with an enterprise keystore through the Hadoop Key Management Server

The KMS acts as a proxy between clients on the cluster and a backing keystore, exposing Hadoop’s KeyProvider interface to clients via a REST API. The KMS was designed such that any keystore with the required functionality can be plugged in with a little integration effort.

Note that the KMS doesn’t itself store keys (other than temporarily in its cache). It’s up to the enterprise keystore to be the authoritative storage for keys, and to ensure that keys can never be lost—as a lost key is equivalent to destruction of data. For production use, two or more redundant enterprise key stores should be deployed.

The KMS also supports a range of ACLs that control access to keys and key operations on a granular basis. This feature can be used, for instance, to only grant users access to certain keys, or to restrict the NameNode and DataNode from accessing key material entirely. Information on how to configure the KMS is available in our documentation.
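As an illustration only (check the exact property names against the KMS documentation for your release), a kms-acls.xml excerpt restricting key access might look like:

<property>
  <name>hadoop.kms.blacklist.DECRYPT_EEK</name>
  <value>hdfs</value>
  <description>Prevent the HDFS superuser from decrypting encrypted data encryption keys.</description>
</property>
<property>
  <name>key.acl.myKey.DECRYPT_EEK</name>
  <value>myuser mygroup</value>
  <description>Only myuser, or members of mygroup, may decrypt EDEKs for the key named myKey.</description>
</property>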

Accessing Data Within an Encryption Zone

This new architecture introduces the concept of an encryption zone (EZ), which is a directory in HDFS whose contents will be automatically encrypted on write and decrypted on read. Encryption zones always start off empty, and renames are not supported into or out of EZs. Consequently, an EZ’s entire contents are always encrypted.

Figure 2: Interaction among encryption zone keys (EZ keys), data encryption keys (DEKs), encrypted data encryption keys (EDEKs), files, and encrypted files

When an EZ is created, the administrator specifies an encryption zone key (EZ Key) that is already stored in the backing keystore. The EZ Key encrypts the data encryption keys (DEKs) that are used in turn to encrypt each file. DEKs are encrypted with the EZ key to form an encrypted data encryption key (EDEK), which is stored on the NameNode via an extended attribute on the file (1).

To encrypt a file, the client retrieves a new EDEK from the NameNode, and then asks the KMS to decrypt it with the corresponding EZ key. This step results in a DEK (2), which the client can use to encrypt their data (3).

To decrypt a file, the client needs to again decrypt the file’s EDEK with the EZ key to get the DEK (2). Then, the client reads the encrypted data and decrypts it with the DEK (4).

Figures 3 & 4: The flow of events required to write a new file to an encryption zone

The above diagrams describe the process of writing a new encrypted file in more detail. One important detail is the per-EZ EDEK cache on the NameNode, which is populated in the background. This approach obviates having to call the KMS to create a new EDEK on each create call. Furthermore, note that the EZ key is never directly handled by HDFS, as generating and decrypting EDEKs happens on the KMS.

For more in-depth implementation details, please refer to the design document posted on HDFS-6134 and follow-on work at HDFS-6891.

With regard to security, there are a few things worth mentioning:

  • First, the encryption is end-to-end. Encryption and decryption happens on the client, so unencrypted data is never available to HDFS and data is encrypted when it is at-rest as well as in-flight. This approach limits the need for the user to trust the system, and precludes threats such as an attacker walking away with DataNode hard drives or setting up a network sniffer.
  • Second, HDFS never directly handles or stores sensitive key material (DEKs or EZ keys)—thus compromising the HDFS daemons themselves does not leak any sensitive material. Encryption keys are stored separately on the keystore (persistently) and KMS (cached in-memory). A secure environment will also typically separate the roles of the KMS/keystore administrator and the HDFS administrator, meaning that no single person will have access to all the encrypted data and also all the encryption keys. KMS ACLs will also be configured to only allow end-users access to key material.
  • Finally, since each file is encrypted with a unique DEK and each EZ can have a different key, the potential damage from a single rogue user is limited. A rogue user can only access EDEKs and ciphertext of files for which they have HDFS permissions, and can only decrypt EDEKs for which they have KMS permissions. Their ability to access plaintext is limited to the intersection of the two. In a secure setup, both sets of permissions will be heavily restricted.

Configuration and New APIs

Interacting with the KMS and creating encryption zones requires the use of two new CLI commands: hadoop key and hdfs crypto. A fuller explanation is available in our documentation, but here’s a short example snippet for how you might quickly get started on a dev cluster.

As a normal user, create a new encryption key:

$ hadoop key create myKey

As the superuser, create a new empty directory anywhere in the HDFS namespace and make it an encryption zone:

$ sudo -u hdfs hadoop fs -mkdir /zone
$ sudo -u hdfs hdfs crypto -createZone -keyName myKey -path /zone

Change its ownership to the normal user:

$ sudo -u hdfs hadoop fs -chown myuser:myuser /zone

As the normal user, put a file in, read it out:

$ hadoop fs -put helloWorld /zone
$ hadoop fs -cat /zone/helloWorld
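To confirm the setup, you can list keys and zones (listing zones requires superuser privileges); a quick sketch with the expected output:

$ hadoop key list
myKey
$ sudo -u hdfs hdfs crypto -listZones
/zone  myKey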

Performance

Currently, AES-CTR is the only supported encryption algorithm and can be used either with a 128- or 256-bit encryption key (when the unlimited strength JCE is installed). A very important optimization was making use of hardware acceleration in OpenSSL 1.0.1e using the AES-NI instruction set, which can be an order of magnitude faster than software implementations of AES. With AES-NI, our preliminary performance evaluation with TestDFSIO shows negligible overhead for writes and only a minor impact on reads (~7.5%) with datasets larger than memory.
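To sanity-check whether your hosts can take advantage of this, a rough sketch follows (output will vary by system and Hadoop build):

$ grep -m1 -o aes /proc/cpuinfo     # non-empty output means the CPU supports AES-NI
aes
$ hadoop checknative 2>&1 | grep openssl    # confirms Hadoop loaded a native OpenSSL library
openssl: true /usr/lib64/libcrypto.so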

The cipher suite was designed to be pluggable. Adding support for additional cipher suites like AES-GCM that provide authenticated encryption is future work.

Conclusion

Transparent encryption in HDFS enables new use cases for Hadoop, particularly in high-security environments with regulatory requirements. This encryption is end-to-end, meaning that data is encrypted both at-rest and in-flight; encryption and decryption can only be done by the client. HDFS and HDFS administrators never have access to sensitive key material or unencrypted plaintext, further enhancing security. Furthermore, when using AES-NI optimizations, encryption imposes only a minor performance overhead on read and write throughput.

Acknowledgements

Transparent encryption for HDFS was developed upstream as part of a community effort involving contributors from multiple companies. The HDFS aspects of this feature were developed primarily by the authors of this blog post: Charles Lamb, Yi Liu, and Andrew Wang. Alejandro Abdelnur was instrumental in overseeing the overall design. Alejandro and Arun Suresh were responsible for implementation of the KMS, as well as encryption-related work within MapReduce. Mat Crocker and Anthony Young-Garner were responsible for integrating Navigator Key Trustee with the KMS.

Charles Lamb is a Software Engineer at Cloudera, primarily working on HDFS.

Yi Liu is a Senior Process Engineer at Intel and a Hadoop committer.

Andrew Wang is a Software Engineer at Cloudera, and a Hadoop committer/PMC member.

Learn more about Project Rhino—work done and work still to be completed—in this live Webinar on Thurs., Jan. 29, at 10am PT.

Categories: Hadoop

How-to: Ingest Data Quickly Using the Kite CLI

Cloudera Blog - Tue, 12/30/2014 - 17:27

Thanks to Ben Harden of CapTech for allowing us to re-publish the post below.

Getting delimited flat file data ingested into Apache Hadoop and ready for use is a tedious task, especially when you want to take advantage of file compression, partitioning and performance gains you get from using the Avro and Parquet file formats. 

In general, you have to go through the following steps to move data from a local file system to HDFS.

  • Move data into HDFS. If you have a raw file, you can use the command line; if you’re pulling from a relational source, I recommend using a tool like Apache Sqoop to easily land the data and automatically create a schema and Hive table.
  • Describe and document your schema in an Avro-compatible JSON schema. If you ingested data using Sqoop, you’re in luck because the schema is already available to you in the Hive Metastore. If not, you need to create the schema definition by hand.
  • Define the partitioning strategy.
  • Write a program to convert your data to Avro or Parquet.
  • Using the schema created in step 2 and the file created in step 4, you can now create a table in Hive and use HQL to view the data.

Going through those steps to ingest a large amount of new data can get time consuming and very tedious. Fortunately, the Kite SDK and its associated command-line interface (CLI) make the process much easier.

I’m not a Java developer, so I opted to use the CLI to bulk-load my data into HDFS and expose it via Hive. In this example, I used a comma-delimited set of 25 baseball statistics data files, with data dating back to 1893.

Here are the steps I went through to quickly ingest this data into HDFS using Kite.

  1. Install the Kite CLI by running the following command:

[cloudera@quickstart ~]$ curl http://central.maven.org/maven2/org/kitesdk/kite-tools/0.17.0/kite-tools-0.17.0-binary.jar -o kite-dataset
[cloudera@quickstart ~]$ chmod +x kite-dataset

  2. Create a folder called ingest and download and unzip the baseball statistics data into the folder:

mkdir ingest
cd ingest
unzip lahman-csv_2014-02-14.zip

  3. Create the following shell script and name it ingestHive.sh:

#!/bin/bash
FILES=/home/cloudera/ingest/*.csv
for f in $FILES
do
  filename=`basename $f`
  name=`echo $f | cut -f1 -d'.'`
  echo "************* Start Processing $name ********************"
  echo "./kite-dataset csv-schema $name.csv --class `basename $name` -o $name.avsc"
  ./kite-dataset csv-schema $name.csv --class `basename $name` -o $name.avsc
  echo "./kite-dataset create `basename $name` --schema $name.avsc"
  ./kite-dataset create `basename $name` --schema $name.avsc
  echo "./kite-dataset csv-import $name.csv `basename $name`"
  ./kite-dataset csv-import $name.csv `basename $name`
  echo "************* End Processing $name ********************"
done

  4. Change the script to be executable and run it:

chmod 777 ingestHive.sh
./ingestHive.sh

  5. All data is now ingested into HDFS in compressed Avro format, and tables are created in Hive.
  6. We can confirm that the tables exist in Hive by running:

[cloudera@quickstart ~]$ hive -e "show tables;"
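
From there, a quick query confirms the data is usable. The table name below is only an example; actual names are derived from the CSV file names in the download:

[cloudera@quickstart ~]$ hive -e "SELECT COUNT(*) FROM batting;"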

I can use the same technique to just define a schema and load data into HDFS directly, without creating a Hive table. This is useful if the processing I want to do will not require the Hive Metastore. As an example, I modified the above script to create a set of Parquet files in HDFS:

#!/bin/bash
FILES=/home/cloudera/ingest/*.csv
for f in $FILES
do
  filename=`basename $f`
  name=`echo $f | cut -f1 -d'.'`
  echo "************* Processing $name ********************"
  echo "./kite-dataset csv-schema $name.csv --class `basename $name` -o $name.avsc"
  ./kite-dataset csv-schema $name.csv --class `basename $name` -o $name.avsc
  echo "./kite-dataset create dataset:hdfs:/user/cloudera/baseball/`basename $name` --schema $name.avsc --format parquet"
  ./kite-dataset create dataset:hdfs:/user/cloudera/baseball/`basename $name` --schema $name.avsc --format parquet
  echo "./kite-dataset csv-import $name.csv dataset:hdfs:/user/cloudera/baseball/`basename $name`"
  ./kite-dataset csv-import $name.csv dataset:hdfs:/user/cloudera/baseball/`basename $name`
  echo "************* Processing $name ********************"
done

As you can see, using Kite makes the process of ingesting, converting, and publishing to Hive easy. A fairly simple ingest engine could be built using the above techniques to monitor files landing on an edge node and, as they are received, automatically ingest, convert, partition, and publish the data to the Hive Metastore and HDFS; a rough sketch of that idea follows.
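
The sketch below is not production code: it assumes the inotify-tools package is installed, uses placeholder directory names, and simply reruns the same Kite commands whenever a new CSV file lands:

#!/bin/bash
# Hypothetical watcher; directory names are examples only
LANDING=/home/cloudera/landing
INGEST=/home/cloudera/ingest

inotifywait -m -e close_write --format '%f' "$LANDING" | while read f
do
  case "$f" in
    *.csv)
      # Move the new file into the ingest area, then run the Kite steps from above
      mv "$LANDING/$f" "$INGEST/$f"
      name=$INGEST/`basename "$f" .csv`
      ./kite-dataset csv-schema "$name.csv" --class `basename $name` -o "$name.avsc"
      ./kite-dataset create `basename $name` --schema "$name.avsc"
      ./kite-dataset csv-import "$name.csv" `basename $name`
      ;;
  esac
done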

Ben Harden leads the Big Data Practice at CapTech and has over 17 years of enterprise software development experience including project management, requirements gathering, functional design, technical design, development, training, testing and system implementation.

Categories: Hadoop

Cloudera Enterprise 5.3 is Released

Cloudera Blog - Tue, 12/23/2014 - 16:59

We’re pleased to announce the release of Cloudera Enterprise 5.3 (comprising CDH 5.3, Cloudera Manager 5.3, and Cloudera Navigator 2.2).

This release continues the drumbeat for security functionality in particular, with HDFS encryption (jointly developed with Intel under Project Rhino) now recommended for production use. This feature alone should justify upgrades for security-minded users (and an improved CDH upgrade wizard makes that process easier).

Here are some of the highlights (incomplete; see the respective Release Notes for CDH, Cloudera Manager, and Cloudera Navigator for full lists of features and fixes):

Security
  • Folder-level HDFS encryption (in addition to storage, management, and access to encryption zone keys) is now a production-ready feature (HDFS-6134). This feature integrates with Navigator Key Trustee so that encryption keys can be securely stored separately from the data, with all the enterprise access and audit controls required to pass most security compliance audits such as PCI.
  • The Cloudera Manager Agent can now be run as a single configured user when running as root is not permitted.
  • In Apache Sentry (incubating), data can now be shared across Impala, Apache Hive, Search, and other access methods such as MapReduce using only Sentry permissions.
  • A Sentry bug that affected CDH 5.2 upgrades has been patched (SENTRY-500).
Data Management and Governance
  • In Cloudera Navigator 2.2, policies are now generally available and enabled by default. Policies let you set, monitor and enforce data curation rules, retention guidelines, and access permissions. They also let you notify partner products, such as profiling and data preparation tools, whenever there are relevant changes to metadata.
  • Navigator 2.2’s REST API now supports user-defined relations. Using these new APIs, you can augment Navigator’s automatically generated lineage with your own column-level lineage. This is particularly useful for custom MapReduce jobs that run on structured data sources.
  • Navigator 2.2 also features many top-requested enhancements, including metadata search auto-suggest and a number of other usability improvements.
Cloud Deployments
  • Cloudera Enterprise 5.3 is now a first-class citizen with respect to deployments on Microsoft Azure.
  • Apache Hadoop gets a new S3-native filesystem for improved performance on AWS (HADOOP-10400).
Real-Time Architecture
  • Apache Flume now includes an Apache Kafka Channel for tighter Kafka-Flume integration (FLUME-2500).
  • Apache HBase performance is significantly improved thanks to updated defaults (HBASE-2611, HBASE-12529).
New or Updated Open Source Components
  • Apache Spark 1.2
  • Hue 3.7
  • Impala 2.1

Other notables: Oracle JDK 1.8 is now supported, Impala now does incremental computation of table and column statistics (IMPALA-1122), and Apache Avro has new date, time, timestamp, and duration binary types (AVRO-739).
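
For example, incremental stats can be triggered from impala-shell with a single statement (the table name here is hypothetical):

$ impala-shell -q "COMPUTE INCREMENTAL STATS my_table"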

Over the next few weeks, we’ll publish blog posts that cover some of these features in detail. In the meantime:

As always, we value your feedback; please provide any comments and suggestions through our community forums. You can also file bugs via issues.cloudera.org.

Categories: Hadoop

HBaseCon 2015: Call for Papers and Early Bird Registration

Cloudera Blog - Fri, 12/19/2014 - 16:54

HBaseCon 2015 is ON, people! Book Thursday, May 7, in your calendars.

If you’re a developer in Silicon Valley, you probably already know that since its debut in 2012, HBaseCon has been one of the best developer community conferences out there. If you’re not, this is a great opportunity to learn that for yourself: HBaseCon 2015 will occur on Thurs., May 7, 2015, at the Westin St. Francis on Union Square in San Francisco.

If you’re new to HBase or HBaseCon, the following FAQs should give you a good overview:

What is Apache HBase?

HBase is the Apache Hadoop database: a distributed, scalable, data store that provides random, realtime read/write access to Big Data. The HBase community independently works within the Apache Software Foundation to provide HBase software under the permissive Apache license.

What is HBaseCon?

HBaseCon is the premier community event for Apache HBase contributors, developers, admins, and users of all skill levels. The event is hosted and organized by Cloudera, with the Program Committee including leaders from across the HBase community (including employees of Cask, Cloudera, Facebook, Google, Hortonworks, and Salesforce.com). In past iterations (2012-2014), employees of companies like Nielsen, Bloomberg LP, Optimizely, Intel, Facebook, Flurry, Google, Groupon, Twitter, Pinterest, Opower, Xiaomi, Yahoo!, Ancestry.com, and many others have presented about their HBase production use cases.

To get an idea of the conference experience, explore presentations from previous years, and view photos from HBaseCon 2014.

Why should I go to HBaseCon?

HBaseCon is the only place in the world where you’ll find virtually every HBase committer and power-user under one roof during a single day. Basically, if you have any serious interest in HBase at all, missing HBaseCon is unthinkable! (Plus, we always have a great party.)

What should I present at HBaseCon?

The Program Committee is looking for talks about war stories/case studies, internals, development and admin/devops best practices, and futures. Anything you can share that will help others run HBase in production successfully is appreciated.

How much does it cost to attend?

Early Bird registration: $375 (through Feb. 1, 2015); Standard registration: $425

So think about that session proposal over the holidays and send it over to the Program Committee for review. But don’t think about it for TOO long; the CfP (via hbasecon.com) closes at midnight on Feb. 6.

If you’re happy with the idea of just being an attendee, you’ll save $50 if you register during the Early Bird period (ends soon).

Categories: Hadoop

New in Cloudera Labs: SparkOnHBase

Cloudera Blog - Thu, 12/18/2014 - 21:06

Apache Spark is making a huge impact across our industry, changing the way we think about batch processing and stream processing. However, as we progressively migrate from MapReduce toward Spark, we shouldn’t have to “give up” anything. One of those capabilities we need to retain is the ability to interact with Apache HBase.

In this post, we will share the work being done in Cloudera Labs to make integrating Spark and HBase super-easy in the form of the SparkOnHBase project. (As with everything else in Cloudera Labs, SparkOnHBase is not supported and there is no timetable for possible support in the future; it’s for experimentation only.) You’ll learn common patterns of HBase integration with Spark and see Scala and Java examples for each. (It may be helpful to have the SparkOnHBase repository open as you read along.)

HBase and Batch Processing Patterns

Before we get into the coolness of Spark, let’s define some powerful usage patterns around HBase interactions with batch processing. This discussion is necessary because when I talk to customers who are new to HBase, they often tell me they have heard that HBase and MapReduce should never be used together.

In fact, although there are valid use cases for having an HBase cluster that is isolated from MapReduce for low-SLA reasons, there are also use cases where the combination of MapReduce and HBase is the right approach. Here are just a couple of examples:

  • Massive operations on tree/DAG/graph structures stored in HBase
  • Interaction, via MapReduce or Impala, with a store or table that is in constant change
SparkOnHBase Design

We experimented with many designs for how Spark and HBase integration should work and ended up focusing on a few goals:

  • Make HBase connections seamless.
  • Make Kerberos integration seamless.
  • Create RDDs through Scan actions, or from an existing RDD that is used to generate Get commands.
  • Take any RDD and allow any combination of HBase operations to be done.
  • Provide simple methods for common operations while allowing unrestricted, unknown advanced operations through the API.
  • Support Scala and Java.
  • Support Spark and Spark Streaming with a similar API.

These goals led us to a design that took a couple of notes from the GraphX API in Spark. For example, in SparkOnHBase there is an object called HBaseContext. This class has a constructor that takes HBase configuration information and then once constructed, allows you to do a bunch of operations on it. For example, you can:

  • Create RDD/DStream from a Scan
  • Put/Delete the contents of a RDD/DStream into HBase
  • Create a RDD/DStream from gets created from the contents of a RDD/DStream
  • Take the contents of a RDD/DStream and do any operation if a HConnection was handed to you in the worker process

Let’s walk through a code example so you can get an idea of how easy and powerful this API can be. First, we create an RDD, connect to HBase, and put the contents of that RDD into HBase.

// Nothing to see here just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)

// This is making a RDD of
// (RowKey, columnFamily, columnQualifier, value)
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
  (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
  (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
  (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
  (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
))

// Create the HBase config like you normally would, then
// pass the HBase configs and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)

// Now give the rdd, table name, and a function that will convert a RDD record to a put,
// and finally a flag if you want the puts to be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](
  rdd,
  tableName,
  // This function is really important because it allows our source RDD to have data of any type
  // Also because puts are not serializable
  (putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3))
    put
  },
  true)

Now every partition of that RDD will execute in parallel (in different threads in a number of Spark workers across the cluster)—kind of like what would have happened if we did Puts in a mapper or reducer task.

One thing to note is that the same rules apply when working with HBase from MapReduce or Spark in terms of Put and Get performance. If you have Puts that are not partitioned, a Put batch will most likely get sent to each RegionServer, which will result in fewer records per RegionServer per batch. The image below illustrates how this would look with six RegionServers; imagine if you had 100 of them (it would be 16.7x worse)!

Now let’s look at that same diagram if we used Spark to partition first before talking to HBase.

Examples

Next, we’ll quickly explore just three code examples to illustrate how you can do different types of operations. (A Put example would look almost exactly like a delete, checkPut, checkDelete, or increment example.)

The big difference in a get example would be the fact that we are producing a new RDD from an existing one. Think of it as a “Spark map function.”

// Create some fake data
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1")),
  …
  (Bytes.toBytes("6")),
  (Bytes.toBytes("7"))))

// Make HBaseContext
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)

// This is the method we are going to focus on
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
  tableName, // The table we want to get from
  2,         // Get list batch size. Set this somewhere under 1000
  rdd,       // RDD that holds records that will turn into Gets
  record => {
    // Function that will take a given record to a Get
    new Get(record)
  },
  (result: Result) => {
    // Function that will take a given result and return a serializable object
    val it = result.list().iterator()
    val b = new StringBuilder
    b.append(Bytes.toString(result.getRow()) + ":")
    while (it.hasNext()) {
      val kv = it.next()
      val q = Bytes.toString(kv.getQualifier())
      if (q.equals("counter")) {
        b.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toLong(kv.getValue()) + ")")
      } else {
        b.append("(" + Bytes.toString(kv.getQualifier()) + "," + Bytes.toString(kv.getValue()) + ")")
      }
    }
    b.toString
  })

Now, let’s say your interaction with HBase is more complex than straight Gets or Puts—a case where you want to say, “Just give me an HConnection and leave me alone.” Well, HBaseContext has map, mapPartition, foreach, and foreachPartition methods just for you.

Here’s an example of the foreachPartition in Java.

JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

// Create some fake data
List<byte[]> list = new ArrayList<byte[]>();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));

JavaRDD<byte[]> rdd = jsc.parallelize(list);

// This foreachPartition will allow us to do anything we want with a HConnection
// It takes two parameters:
// - input RDD
// - a VoidFunction that will get an Iterator and the HConnection. The Iterator will
//   have all the records in this partition
hbaseContext.foreachPartition(rdd,
    new VoidFunction<Tuple2<Iterator<byte[]>, HConnection>>() {
  public void call(Tuple2<Iterator<byte[]>, HConnection> t) throws Exception {
    // We can get the table outside of the loop
    HTableInterface table1 = t._2().getTable(Bytes.toBytes("Foo"));
    Iterator<byte[]> it = t._1();

    // Go through every record, getting it from HBase;
    // if it isn't there then put it there. Not a great real-world example, but an example
    while (it.hasNext()) {
      byte[] b = it.next();
      Result r = table1.get(new Get(b));
      if (!r.getExists()) {
        table1.put(new Put(b));
      }
    }

    // Close the table outside of the loop
    table1.close();
  }
});

The last example to talk about is creating an RDD from a Scan:

val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"))
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
val hbaseContext = new HBaseContext(sc, conf)

var scan = new Scan()
scan.setCaching(100)

var getRdd = hbaseContext.hbaseRDD(tableName, scan)

This code will execute a scan just like MapReduce would do with the table input format and populate the resulting RDD with records of type (RowKey, List[(columnFamily, columnQualifier, Value)]). If you don’t like that record type, then just use the hbaseRDD method overload that takes a record conversion function, for changing it to whatever you like.

Conclusion

SparkOnHBase has been tested on a number of clusters with Spark and Spark Streaming; give it a look and let us know your feedback via the Cloudera Labs discussion group. The hope is that this project and others like it will help us blend the goodness from different Hadoop ecosystem components to help solve bigger problems.

Acknowledgements

Special thanks to the people that helped me make SparkOnHBase: Tathagata Das (TD), Mark Grover, Michael Stack, Sandy Ryza, Kevin O’Dell, Jean-Marc Spaggiari, Matteo Bertozzi, and Jeff Lord.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Apache Spark, and a co-author of the O’Reilly book, Hadoop Application Architectures.

Categories: Hadoop

The Top 10 Posts of 2014 from the Cloudera Engineering Blog

Cloudera Blog - Thu, 12/18/2014 - 17:20

Our “Top 10” list of blog posts published during a calendar year is a crowd favorite (see the 2013 version here), in particular because it serves as informal, crowdsourced research about popular interests. Page views don’t lie (although skew for publishing date—clearly, posts that publish earlier in the year have pole position—has to be taken into account).

In 2014, a strong interest in various new components that bring real time or near-real time capabilities to the Apache Hadoop ecosystem is apparent. And we’re particularly proud that the most popular post was authored by a non-employee.

  1. How-to: Create a Simple Hadoop Cluster with VirtualBox
    by Christian Javet
    Explains how to set up a CDH-based Hadoop cluster in less than an hour using VirtualBox and Cloudera Manager.
  2. Why Apache Spark is a Crossover Hit for Data Scientists
    by Sean Owen
    An explanation of why Spark is a compelling multi-purpose platform for use cases that span investigative, as well as operational, analytics. 
  3. How-to: Run a Simple Spark App in CDH 5
    by Sandy Ryza
    Helps you get started with Spark using a simple example.
  4. New SQL Choices in the Apache Hadoop Ecosystem: Why Impala Continues to Lead
    by Justin Erickson, Marcel Kornacker & Dileep Kumar
    Open benchmark testing of Impala 1.3 demonstrates performance leadership compared to alternatives (by 950% or more), while providing greater query throughput and with a far smaller CPU footprint.
  5. Apache Kafka for Beginners
    by Gwen Shapira & Jeff Holoman
    When used in the right way and for the right use case, Kafka has unique attributes that make it a highly attractive option for data integration.
  6. Apache Hadoop YARN: Avoiding 6 Time-Consuming “Gotchas”
    by Jeff Bean
    Understanding some key differences between MR1 and MR2/YARN will make your migration much easier.
  7. Impala Performance Update: Now Reaching DBMS-Class Speed
    by Justin Erickson, Greg Rahn, Marcel Kornacker & Yanpei Chen
    As of release 1.1.1, Impala’s speed beat the fastest SQL-on-Hadoop alternatives–including a popular analytic DBMS running on its own proprietary data store.
  8. The Truth About MapReduce Performance on SSDs
    by Karthik Kambatla & Yanpei Chen
    It turns out that cost-per-performance, not cost-per-capacity, is the better metric for evaluating the true value of SSDs. (See the session on this topic at Strata+Hadoop World San Jose in Feb. 2015!)
  9. How-to: Translate from MapReduce to Spark
    by Sean Owen
    The key to getting the most out of Spark is to understand the differences between its RDD API and the original Mapper and Reducer API.
  10. How-to: Write and Run Apache Giraph Jobs on Hadoop
    by Mirko Kämpf
    Explains how to create a test environment for writing and testing Giraph jobs, or just for playing around with Giraph and small sample datasets.

Based on the above, a significant number of you are at least exploring Apache Spark as an eventual replacement for MapReduce, as well as tracking Impala’s progress as the standard analytic database for Apache Hadoop. What will next year bring, do you think? 

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Hands-on Hive-on-Spark in the AWS Cloud

Cloudera Blog - Tue, 12/16/2014 - 20:03

Interested in Hive-on-Spark progress? This new AMI gives you a hands-on experience.

Nearly one year ago, the Apache Hadoop community began to embrace Apache Spark as a powerful batch-processing engine. Today, many organizations and projects are augmenting their Hadoop capabilities with Spark. As part of this shift, the Apache Hive community is working to add Spark as an execution engine for Hive. The Hive-on-Spark work is being tracked by HIVE-7292 which is one of the most popular JIRAs in the Hadoop ecosystem. Furthermore, three weeks ago, the Hive-on-Spark team offered the first demo of Hive on Spark.

Since that demo, we have made tremendous progress, having finished Map Join (HIVE-7613) and Bucket Map Join (HIVE-8638), integrated with HiveServer2 (HIVE-8993), and, importantly, integrated our Spark Client (HIVE-8548, aka Remote Spark Context). Remote Spark Context is important because it’s not possible to have multiple SparkContexts within a single process. The RSC API allows us to run the SparkContext on the server in a container while utilizing the Spark API on the client—in this case HiveServer2, which reduces resource utilization on an already burdened component.

Many users have proactively started using the Spark branch and providing feedback. Today, we’d like to offer you the first chance to try Hive-on-Spark yourself. As this work is under active development, for most users we do not recommend that you attempt to run this code outside of the packaged Amazon Machine Image (AMI) provided. The AMI ami-35ffed70 (named hos-demo-4) is available in us-west-1; we recommend an instance of m3.large or larger.

Once logged in as ubuntu, change to the hive user (sudo su - hive) and you will be greeted with instructions on how to start Hive on Spark. Pre-loaded on the AMI is a small TPC-DS dataset along with some sample queries. Users are strongly encouraged to load their own sample datasets and try their own queries. We are hoping not only to showcase our progress delivering Hive-on-Spark but also to help find areas of improvement early. As such, if you find any issues, please email hos-ami@cloudera.org and the cross-vendor team will do its best to investigate the issue.
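
For reference, connecting looks roughly like the following; the key file and hostname are placeholders for your own instance details:

$ ssh -i my-key.pem ubuntu@ec2-xx-xx-xx-xx.us-west-1.compute.amazonaws.com
ubuntu@ip-10-0-0-1:~$ sudo su - hive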

Despite spanning the globe, the cross-company engineering teams have become close. The team members would like to thank our employers for sponsoring this project: MapR, Intel, IBM, and Cloudera.

Rui Li is a software engineer at Intel and a contributor to Hive.

Na Yang is a staff software engineer at MapR and a contributor to Hive.

Brock Noland is an engineering manager at Cloudera and a Hive PMC member.

 

 

Categories: Hadoop

5 Pitfalls of Benchmarking Big Data Systems

Cloudera Blog - Fri, 12/12/2014 - 17:31

Benchmarking Big Data systems is nontrivial. Avoid these traps!

Here at Cloudera, we know how hard it is to get reliable performance benchmarking results. Benchmarking matters because one of the defining characteristics of Big Data systems is the ability to process large datasets faster. “How large” and “how fast” drive technology choices, purchasing decisions, and cluster operations. Even with the best intentions, performance benchmarking is fraught with pitfalls—easy to get numbers, hard to tell if they are sound.

Below we list five common pitfalls and illustrate them with internal and customer-based stories. They offer a behind-the-scenes look at our engineering and review processes that allow us to produce rigorous benchmark results. These stories illustrate important principles of conducting your own performance benchmarking, or assessing others’ results.

Pitfall 1: Comparing Apples to Oranges

We often run two tests, expecting only one parameter to change, while in fact many parameters changed and a comparison is impossible – in other words, we “compare apples to oranges.”

CDH 5.0.0 was the first release of our software distribution with YARN and MapReduce 2 (MR2) as the default MapReduce execution framework. To their credit, our partners and customers did performance benchmarking on their own when they considered whether to upgrade. Many partners and customers initially reported a performance regression from MapReduce 1 (MR1) in earlier versions of CDH to YARN and MapReduce2 in CDH 5.0.0.

What actually happened was that a straightforward benchmark ended up comparing two different things—comparing apples to oranges. Two technical issues led to this comparison discrepancy.

One issue was that TeraSort, a limited yet popular benchmark, changed between MR1 and MR2. To reflect rule changes in the GraySort benchmark on which it is based, the data generated by the TeraSort included with MR2 is less compressible. A valid comparison would use the same version of TeraSort for both releases, because map-output compression is enabled by default as a performance optimization in CDH. Otherwise, MR1 will have an unfair advantage by using more compressible data.

Another issue was the replacement of the “task slot” concept in MR1 with the “container” concept in MR2. YARN has several configuration parameters that affected how many containers will be run on each node. A valid comparison would set these configurations such that there is the same degree of parallel processing between MR1 and MR2. Otherwise, depending on whether hardware is over or under-committed, either MR1 or MR2 will have the advantage.

We fell into these pitfalls ourselves in the early days of ensuring MR1 and MR2 performance parity. We regularly compared MR1 and MR2 performance on our nightly CDH builds, and the “regression” was caught the very first time we did this comparison. Our MapReduce and Performance Engineering teams collaborated to identify the code changes and to understand what makes a valid performance comparison. This effort culminated in MR2 shipping in CDH 5.0.0 at performance parity with MR1.

Pitfall 2: Not Testing at Scale

“Scale” for big data systems can mean data scale, concurrency scale (number of jobs and number of tasks per job), cluster scale (number of nodes/racks), or node scale (per node hardware size). Failing to test “at scale” for any of these dimensions can lead to surprising behavior for your production clusters.

It is illustrative to look at another aspect of our efforts to drive MR2 to performance parity with MR1. We wanted to verify that MR2 and MR1 perform at parity when a large number of jobs are running. We ran SWIM, which submits many jobs concurrently over hours or even days, simulating the workload logged on actual production clusters. The first runs of SWIM on MR2 revealed a live-lock issue where the jobs would appear as submitted, but none of them would make any progress. What happened is that all available resources were allocated to the Application Masters, leaving no room for the actual tasks.

This issue escaped detection in our other scale tests that covered a range of data, cluster, and node scales. The live-lock occurs only when all the containers in a cluster are taken up by Application Masters. On a cluster of non-trivial size, this means hundreds or even thousands of concurrent jobs. SWIM is specifically designed to reveal such issues by replaying production workloads with their original level of concurrency and load variation over time. In this case, we found a critical issue before our customers ever hit it.

Pitfall 3: Believing in Miracles

If something is too good to be true, it’s probably not true. This means we should always have a model of what performance should be, so that we can tell if a performance improvement is expected, or too good to be true.

Here are some recent “miracles” we have had to debunk for ourselves and for our customers:

  • A customer came to us and declared that Impala performs more than 1000x better than its existing data warehouse system, and wanted us to help it set up a new cluster to handle a growing production workload. The 1000x difference is orders of magnitude larger than our own measurements, and immediately made us skeptical. Following much discussion, we realized that the customer was comparing very simple queries running on a proof-of-concept Impala cluster versus complex queries running on a heavily loaded production system. We helped the customer do an apples-to-apples comparison, yet it turns out Impala still has an advantage. We left the customer with realistic plans for how to grow its data management systems.
  • A customer asked us to run Apache Sqoop in several configurations, with the intent of finding the configuration leading to the best export performance. Among other tests we compared the performance of loading data to new partitions through Oracle Database’s direct path writes, to loading the same data through normal inserts. We normally expect direct path writes to be significantly faster since they bypass the normally busy buffer-cache and redo log subsystems, writing data blocks directly to storage. In this test, the normal inserts were 3 times faster than the direct path writes. Quick investigation revealed that Sqoop was exporting data to an otherwise idle Oracle cluster with over 300GB of memory dedicated to the buffer cache. Loading data into memory in a server with no contention is obviously faster than writing the same data to disk. We explained the results to the customer and recommended repeating the tests on a cluster with realistic workloads.
  • A customer asked us for comment on a Hadoop sort benchmark result in the trade press. The result is more than 100x faster than what we found internally. We took a look at the benchmark report and very quickly found that the data size being tested is considerably smaller than the available memory in the cluster. In other words, a knowledgeable operator would be able to configure Hadoop in a way that the sort takes place completely in memory. This approach departs from the common practice of configuring sort with data size much greater than total cluster memory. So the more-than-100x gap comes from the inherent hardware difference between memory and disk IO, rather than a difference between two software systems.

The ability to identify miracles requires having models of expected performance beyond just a “gut feeling.” These models can come from prior results or from an understanding of where the system bottlenecks should be. Benchmarking without such models will give you a lot of numbers but not a lot of meaning.

Pitfall 4: Using Unrealistic Benchmarks

Biased benchmarks are benchmarks where the workload, hardware, or presentation is chosen without regard to the expected requirements of the customers. Rather, these choices are meant to highlight the capabilities of the vendor performing the benchmark.

Here are specific warning signs of a biased benchmark:

  • Misleading workloads: A vendor runs benchmarks on 100GB of data when the system is marketed as a “Big Data” system designed for 100TB datasets, or uses a transactional workload to test a system with mostly analytical use cases. TeraSort, for example, has specific characteristics and stresses a very specific subset of the processing subsystem. It is not necessarily a good benchmark for evaluating how the system will scale for other workloads, although it is a useful first step in comparing different hardware configurations.

At Cloudera, Terasort is only one job in our MapReduce performance benchmarking suite. We run all jobs in the suite under different meanings of scale beyond just large data size. (See Pitfall 2 above.)

  • Premium hardware: Vendors often improve their numbers by using hardware not typically used in production: solid-state drives (SSDs) when the customers more commonly use hard disk drives (HDDs), or types of SSDs not available in the general market. The TPC-C benchmark from the Transaction Processing Performance Council allows the use of hardware that is not yet generally available, provided that availability dates are published. It is wise to check whether the hardware choices make the results irrelevant when using benchmarks for purchasing decisions.

At Cloudera, we have explored MapReduce performance on SSDs. We were very conscious of SSDs’ prevalence in the market compared with HDDs. This prompted us to suggest that our hardware partners track SSD performance-per-cost in addition to the more commonly cited capacity-per-cost. The importance of the performance-per-cost metric represents a key insight from the study.

  • Cherry-picking queries or jobs: The vendor picks very specific queries out of a standard benchmark but can’t explain the choice with objective criteria that are relevant to the customers (or worse, doesn’t even disclose that a choice was made!)

At Cloudera, many of our past Impala performance results used 20 queries derived from the TPC – Decision Support (TPC-DS) benchmark. These queries were chosen over a year ago, and cover interactive, reporting, and deep analytic use cases. At the time, it was a major improvement over a frequently cited set of queries that were constructed without empirical backing from actual customer use cases. The 20 queries also represent a step forward from our own prior efforts using queries derived from TPC-H. Both TPC-H and TPC-DS are backed by customer surveys from vendors in the TPC consortium, with TPC-H considered to be the less demanding benchmark. We have kept the set of 20 queries derived from TPC-DS to help ourselves compare against our own prior results, and we are well aware they are less than the full set of 99 queries in the official TPC-DS. Look for our future posts in this space.

To an extent, all commercial benchmarks are open to suspicion of bias, since they are performed or commissioned by a specific vendor to market its products. Vendors can enhance their own credibility by being transparent about the limits of their own work. Customers can hold vendors accountable by understanding their own workloads and having a conversation with vendors about whether a product addresses their specific use case.

Pitfall 5. Communicating Results Poorly

Poorly communicated results detract from otherwise good performance benchmarking projects. Here at Cloudera, we check all external-facing benchmarking communications for the following:

  1. Whether we selected a benchmark that
    1. Is unbiased (see Pitfall 4 above),
    2. Exercises workloads relevant to actual customers, and
    3. Scales across data size, concurrency level, cluster size, and node size.
  2. Whether we reported sufficient information for industry peers to assess the significance of the result, and to reproduce the tests if needed. This requires reporting
    1. The benchmark we used and why we used it,
    2. The performance metrics we measured and how we measured them,
    3. The hardware used and the software tuning applied.

One more aspect of a good benchmarking report is whether the results have been independently verified or audited. The purpose of an independent audit is to have the above checks done by someone other than the organization that does the benchmarking study. Benchmarking results that passed independent audit are more likely to be communicated clearly and completely.

There are several gold standards for audit and verification practices established before the rise of Big Data:

  • Dedicated auditors: The TPC uses dedicated auditors. Each auditor is certified to audit a particular benchmark only after passing a test designed by the working group who initially specified that benchmark.
  • Validation kits and fair-use rules: The Standard Performance Evaluation Corporation (SPEC) uses a combination of validation checks built into benchmarking kits, fair-use rules governing how the results should be communicated, and review by the SPEC organization, which encompasses many industry peers of the test sponsor.
  • Peer review: The official Sort Benchmark has new entries reviewed by past winners. This incentivizes the winners to “hand over the torch” only if new entries are sufficiently rigorous.

There are not yet any widely accepted audit and verification processes for Big Data. The need for complete and neutral benchmarking results is sometimes diluted by the need to stand out in the trade press. However, the past year has seen a phenomenal growth in the level of performance knowledge in the customer base and the broader community. Every vendor benchmark is now audited by customers and industry peers. This is why we always conduct and communicate our performance benchmarking in a rigorous and open manner.

Closing

Performance benchmarking is hard. When done well, benchmarks can guide us as well as the community. We close this post with anecdotes of the authors’ benchmarking mistakes, committed early in their careers. After all, anyone can make benchmarking errors, and everyone can learn from them.

Gwen Shapira is on the Platform Engineering team at Cloudera. She once ran a database performance benchmark on a proof-of-concept 5-node cluster. When she was asked what would be the performance for a 50-node production cluster, she multiplied the 5-node performance numbers by 10x. The production cluster blew up, hitting network bottlenecks not revealed at the proof-of-concept scale. Lesson: Testing is better than extrapolating.

Yanpei Chen is on the Performance Engineering team at Cloudera. He ran his first Hadoop benchmark as a grad student at UC Berkeley, where he accidentally mounted HDFS on the departmental network filer. He took down the filer for all EECS professors, staff, and students, and received hate mail from the system administrators for a week. Lesson: Run your benchmarks in a way that doesn’t disrupt production systems.

Categories: Hadoop

The Impala Cookbook

Cloudera Blog - Wed, 12/10/2014 - 16:01

Impala, the open source MPP analytic database for Apache Hadoop, is now firmly entrenched in the Big Data mainstream. How do we know this? For one, Impala is now the standard against which alternatives measure themselves, based on a proliferation of new benchmark testing. Furthermore, Impala has been adopted by multiple vendors as their solution for letting customers do exploratory analysis on Big Data, natively and in place (without the need for redundant architecture or ETL). Also significant, we’re seeing the emergence of best practices and patterns out of customer experiences.

As an effort to streamline deployments and shorten the path to success, Cloudera’s Impala team has compiled a “cookbook” based on those experiences, covering:

  • Physical and Schema Design
  • Memory Usage
  • Cluster Sizing and Hardware Recommendations
  • Benchmarking
  • Multi-tenancy Best Practices
  • Query Tuning Basics
  • Interaction with Apache Hive, Apache Sentry, and Apache Parquet

By using these recommendations, Impala users will be assured of proper configuration, sizing, management, and measurement practices to provide an optimal experience. Happy cooking!

Categories: Hadoop

Progress Report: Community Contributions to Parquet

Cloudera Blog - Tue, 12/09/2014 - 21:05

Community contributions to Parquet are increasing in parallel with its adoption. Here are some of the highlights.

Apache Parquet (incubating), the open source, general-purpose columnar storage format for Apache Hadoop, was co-founded only 18 months ago by Cloudera and Twitter. Since that time, its rapid adoption by multiple platform vendors and communities has made it a de facto standard for this purpose.

Most of Cloudera’s recent contributions have focused on fixing bugs reported by Parquet’s growing number of users. Thanks to this work and contributions by others in the Parquet community (including employees from companies like Criteo, Stripe, Netflix, MapR, Salesforce.com, and others), the format is becoming more stable and mature with each release.

In this post, you’ll learn about just a few of these awesome community contributions.

parquet-tools

parquet-tools is a command-line utility that was contributed by engineers from ARRIS Inc. earlier this year. You can download it as a standalone tarball or use the copy included in CDH 5.2 and later. This handy utility lets you view the schema for a Parquet file, cat the content, or take a closer look at the encoding details of individual columns, all the way down to the page level. For example:

blue@work:~$ parquet-tools schema favorite_books.parquet
message Book {
  required binary isbn (UTF8);
  required binary title (UTF8);
  required int32 length_pages;
  required binary author (UTF8);
}

blue@work:~$ parquet-tools cat favorite_books.parquet
isbn = 860-1200653809
title = Pride and Prejudice and Zombies
length_pages = 320
author = Jane Austen & Seth Grahame-Smith

isbn = 978-0394800011
title = The Cat in the Hat
length_pages = 61
author = Dr. Seuss

blue@work:~$ parquet-tools dump --disable-data favorite_books.parquet
row group 0
-------------------------------------------------------------------------------------
isbn:          BINARY SNAPPY DO:0 FPO:4 SZ:54/53/0.98 VC:2 ENC:BIT_PACKED,PLAIN
title:         BINARY SNAPPY DO:0 FPO:58 SZ:73/74/1.01 VC:2 ENC:BIT_PACKED,PLAIN
length_pages:  INT32 SNAPPY DO:0 FPO:131 SZ:27/25/0.93 VC:2 ENC:BIT_PACKED,PLAIN
author:        BINARY SNAPPY DO:0 FPO:158 SZ:68/66/0.97 VC:2 ENC:BIT_PACKED,PLAIN

    isbn TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:36 VC:2

    title TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:57 VC:2

    length_pages TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:8 VC:2

    author TV=2 RL=0 DL=0
    -------------------------------------------------------------------------------------
    page 0:  DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN SZ:49 VC:2

parquet-protobuf

parquet-protobuf, contributed by Lukas Nalezenec, adds support for storing protobuf objects directly in Parquet without using a translation step to another object model. It works just like the existing parquet-avro and parquet-thrift libraries, making it possible to move existing application code to a column-oriented format without substantial rewrites. Using parquet-protobuf is as easy as this:

ProtoParquetWriter writer = new ProtoParquetWriter(
    new Path("/tmp/test.parquet"), cls);

for (MessageOrBuilder record : records) {
  writer.write(record);
}

writer.close();

Filter2 API

The filter2 API, contributed by Twitter engineers, adds a DSL to easily express predicates that are applied to records. It can also serialize the predicates so there’s no longer a need to write custom code that pulls filter values out of a Configuration.

// foo == 10 || bar <= 17.0
IntColumn foo = intColumn("foo");
DoubleColumn bar = doubleColumn("x.y.bar");
FilterPredicate pred = or(eq(foo, 10), ltEq(bar, 17.0));

Conclusion

The Parquet community has grown to include more than 50 contributors, not including the work done by the Apache Spark and Apache Hive communities for native support. We welcome any and all new contributors to make this vibrant community even stronger!

Ryan Blue is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.2: Improvements for Running Multiple Workloads on a Single HBase Cluster

Cloudera Blog - Tue, 12/09/2014 - 16:57

These new Apache HBase features in CDH 5.2 make multi-tenant environments easier to manage.

Historically, Apache HBase treats all tables, users, and workloads with equal weight. This approach is sufficient for a single workload, but when multiple users and multiple workloads are applied to the same cluster or table, conflicts can arise. Fortunately, starting with HBase in CDH 5.2 (HBase 0.98 + backports), workloads and users can now be prioritized.

One can categorize the approaches to this multi-tenancy problem in three ways:

  • Physical isolation or partitioning – each application or workload operates on its own table and each table can be assigned to a set of machines.
  • Scheduling – applications and workloads are scheduled based on access patterns or time and resources needed to complete.
  • Quotas – by limiting ad-hoc queries, a table can be shared with other applications.

In this post, I’ll explain three new HBase mechanisms (see umbrella JIRA HBASE-10994 – HBase Multitenancy) focused on enabling some of the approaches above.

Throttling

In a multi-tenant environment, it is useful to enforce manual limits that prevent users from abusing the system. (A simple example is: “Let MyApp run as fast as possible and limit all the other users to 100 request per second.”)

The new throttling feature in CDH 5.2 (HBASE-11598 – Add rpc throttling) allows an admin to enforce a limit on the number of requests per unit of time, or on the amount of data per unit of time, for a specified user, table, or namespace. Some examples are:

  • Throttle Table A to X req/min
  • Throttle Namespace B to Y req/hour
  • Throttle User K on Table Z to KZ MB/sec

An admin can also change the throttle at runtime. The change will propagate after the quota refresh period has expired, which at the moment has a default refresh period of 5 minutes. This value is configurable by modifying the hbase.quota.refresh.period property in hbase-site.xml. In future releases, a notification will be sent to apply the changes instantly.

In the chart below, you can see an example of the results of throttling. 

Initially, User 1 and User2 are unlimited and then the admin decides that the User 1 job is more important and throttles the User 2 job, reducing contention with the User 1 requests.

The shell allows you to specify the limit in a descriptive way (e.g. LIMIT => 10req/sec or LIMIT => 50M/sec). To remove the limit, use LIMIT => NONE.

Examples:

$ hbase shell
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE
hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns1', LIMIT => '10req/sec'
hbase> set_quota TYPE => THROTTLE, TABLE => 't1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => NONE

You can also place a global limit and exclude a user or a table from the limit by applying the GLOBAL_BYPASS property. Consider a situation with a production workload and many ad-hoc workloads. You can choose to set a limit for all the workloads except the production one, reducing the impact of the ad-hoc queries on the production workload.

$ hbase shell
hbase> set_quota NAMESPACE => 'ns1', LIMIT => '100req/min'
hbase> set_quota USER => 'u1', GLOBAL_BYPASS => true

Note that the throttle is always enforced; even when the production workload is currently inactive, the ad-hoc requests are all throttled.

Request Queues

Assuming no throttling policy is in place, when the RegionServer receives multiple requests they are now placed into a queue waiting for a free execution slot (HBASE-6721 – RegionServer Group based Assignment).

The simplest queue is a FIFO queue, which means that each request has to wait for the completion of all the requests in the queue before it. And, as you can see from the picture below, fast/interactive queries can get stuck behind large requests. (To keep the example simple, let’s assume that there is a single executor.)

One solution would be to divide the large requests into small requests and interleave each chunk with other requests, allowing multiple requests to make progress. The current infrastructure doesn’t allow that; however, if you are able to guess how long a request will take to be served, you can reorder requests—pushing the long requests to the end of the queue and allowing short requests to jump in front of longer ones. At some point you have to execute the large requests and prioritize the new requests behind large requests. However, the short requests will be newer, so the result is not as bad as the FIFO case but still suboptimal compared to the solution described above where large requests are split into multiple smaller requests.

Deprioritizing Long-running Scanners

Along the line of what we described above, CDH 5.2 has a “fifo” queue and a new queue type called “deadline” configurable by setting the hbase.ipc.server.callqueue.type property (HBASE-10993 – Deprioritize long-running scanners). Currently there is no way to estimate how long each request may take, so de-prioritization only affects scans and is based on the number of “next” calls a scan request did. This assumes that when you are doing a full table scan, your job is probably not that interactive, so if there are concurrent requests you can delay long-running scans up to a limit tunable by setting the hbase.ipc.server.queue.max.call.delay property. The slope of the delay is calculated by a simple square root of (numNextCall * weight) where the weight is configurable by setting the hbase.ipc.server.scan.vtime.weight property.

Multiple-Typed Queues

Another way you can prioritize/deprioritize different kinds of requests is by having a specified number of dedicated handlers and queues. That way you can segregate the scan requests in a single queue with a single handler, and all the other available queues can service short Get requests.

Currently, some static tuning options are available to adjust the ipc queues/handlers based on the type of workload. This approach is an interim first step that will eventually allow you to change the settings at runtime as you do for throttling, and to enable dynamically adjusting values based on the load.

Multiple Queues

To avoid contention and separate different kinds of requests, a new property, hbase.ipc.server.callqueue.handler.factor, allows admins to increase the number of queues and decide how many handlers share the same queue (HBASE-11355 – Multiple Queues / Read-Write Queues).

Having more queues, such as one queue per handler, reduces contention when adding a task to a queue or selecting it from a queue. The trade-off is that if you have some queues with long-running tasks, a handler may end up waiting to execute from that queue rather than stealing from another queue which has waiting tasks.

Read and Write

With multiple queues, you can now divide read and write requests, giving more priority (queues) to one or the other type. Use the hbase.ipc.server.callqueue.read.ratio property to choose to serve more reads or writes (HBASE-11724 Short-Reads/Long-Reads Queues).

Similar to the read/write split, you can split gets and scans by tuning the hbase.ipc.server.callqueue.scan.ratio to give more priority to gets or to scans. The chart below shows the effect of the settings.

A scan ratio 0.1 will give more queue/handlers to the incoming gets, which means that more of them can be processed at the same time and that fewer scans can be executed at the same time. A value of 0.9 will give more queue/handlers to scans so the number of scan request served will increase and the number of gets will decrease.

Future Work

Aside from addressing the current limitations mentioned above (static conf, unsplittable large requests, and so on) and doing things like limiting the number of tables that a user can create or using the namespaces more, a couple of major new features on the roadmap will further improve interaction between multiple workloads:

  • Per-user queues: Instead of a global setting for the system, a more advanced way to schedule requests is to allow each user to have its own “scheduling policy” allowing each user to define priorities for each table, and allowing each table to define request-types priorities. This would be administered in a similar way to throttling.
  • Cost-based scheduling: Request execution can take advantage of the known system state to prioritize and optimize scheduling. For example, one could prioritize requests that are known to be served from cache, prefer concurrent execution of requests that are hitting two different disks, prioritize requests that are known to be short, and so on.
  • Isolation/partitioning: Separating workloads onto different machines is useful when the admin understands the workload of each table and how to manually separate them. The basic idea is to reserve enough resources to run everything smoothly. (The only way to achieve that today is to set up one cluster per use case.)

Conclusion

Based on the above, you should now understand how to improve the interaction between different workloads using this new functionality. Note, however, that these features are only down payments on more robust functionality to come in future releases.

Matteo Bertozzi is a Software Engineer at Cloudera and an HBase committer/PMC member.

Categories: Hadoop

This Month in the Ecosystem (November 2014)

Cloudera Blog - Mon, 12/08/2014 - 21:52

Welcome to our 15th edition of “This Month in the Ecosystem,” a digest of highlights from November 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

November was busy, even accounting for the US Thanksgiving holiday:

  • LinkedIn was busy: it open-sourced Cubert, a framework for building data pipelines that includes a new scripting language and runtime. It also described Gobblin, an internal data ingestion system to which it intends to migrate some of its large ingestion processes. 
  • Spotify described how it uses Apache Crunch.
  • Strata + Hadoop World Barcelona 2014 convened, with 1,000+ in attendance. You can watch recorded keynotes here.
  • Pinterest described PinAnalytics, its in-house platform for Big Data analytics that utilizes Apache HBase.
  • Cloudera and Intel described the progress of BigBench, a new industrywide effort to create a sorely needed Big Data benchmark.
  • DataTorrent announced the formation of a new project, KOYA, to add YARN support to Apache Kafka.
  • Cask described Tephra, an open source project that adds “complete” transaction support to HBase (HBase support is a first step; support for other NoSQL data stores will be added later).
  • Apache Hadoop 2.6, Apache Hive 0.14, and Apache Pig 0.14 were all released by their respective Apache communities.
  • Apache Drill graduated to a Top-Level Project.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

For Apache Hadoop, The POODLE Attack Has Lost Its Bite

Cloudera Blog - Wed, 12/03/2014 - 16:32

A significant vulnerability affecting the entire Apache Hadoop ecosystem has now been patched. What was involved?

By now, you may have heard about the POODLE (Padding Oracle On Downgraded Legacy Encryption) attack on TLS (Transport Layer Security). This attack combines a cryptographic flaw in the obsolete SSLv3 protocol with the ability of an attacker to downgrade TLS connections to use that protocol. The result is that an active attacker on the same network as the victim can potentially decrypt parts of an otherwise encrypted channel. The only immediately workable fix has been to disable the SSLv3 protocol entirely.

POODLE sent many technical people scrambling. Web servers needed configuration changes, software projects using TLS needed to change default behavior, and web browsers moved to phase out SSLv3 support. Cloudera has also taken action.

This blog post provides an overview of the POODLE vulnerability, discusses its impact on Apache Hadoop, and describes the fixes Cloudera pushed forward across the ecosystem.

What is POODLE?

Let’s begin with some background about SSL/TLS terminology: SSL (Secure Sockets Layer) is the former name for what is today called TLS. Between SSLv3 and TLSv1, the protocol was renamed. Even though this happened 15 years ago, the SSL name has stuck around. And even though SSLv3 has long been obsolete, and has been known to have other, lesser, vulnerabilities, its retirement has been drawn out due to the desire to provide backward compatibility for the sake of a smooth user experience.

In the meantime, SSLv3 has been replaced by TLSv1, TLSv1.1, and TLSv1.2. Under normal circumstances, the strongest protocol version that both sides support is negotiated at the start of the connection. However, an active attacker can introduce errors into this negotiation and force a fallback into the weakest protocol version: SSLv3.

POODLE—the attack on SSLv3—was discovered by Bodo Möller, Thai Duong, and Krzysztof Kotowicz at Google. Their report describes how the SSLv3 protocol can be tortured to reveal otherwise encrypted information, one byte at a time. Using the vulnerability, the researchers were able to extract an average of one byte for every 256 SSLv3 connection attempts. This might not sound bad to non-crypto-geeks, but attackers can realistically use it to retrieve session cookies: strings that identify a user in a secure session. If you have a session cookie for someone logged into, say, Gmail, you can then gain access to his or her Gmail account.

The attack itself is an excellent piece of work. If you’re interested in more details, I can highly recommend this Imperial Violet blog post and this blog post by Matthew Green. The Wikipedia article on TLS has a huge amount of general information.

Leashing POODLE

One common thread between the Hadoop community and the security research community is the habit of devising creative project names; a clever acronym or portmanteau seems to be at least as valuable as the code or exploits themselves. In that spirit I bring you HADOODLE: fixes for POODLE across the Hadoop ecosystem.

As you all know, the Hadoop platform isn’t one project; rather, it’s a confederation of many different projects, all interoperating and cooperating in the same environment. The word “ecosystem” is overused, but describes the situation perfectly. This can be a great thing, because it lets multiple different groups solve a variety of problems independently of each other. It’s also a great model for fast-paced innovation. However, it can be problematic for the sort of pervasive changes required by security vulnerabilities.

The loose confederation of projects means that there are several different web servers and other usages of TLS. Rounding up and fixing POODLE meant educating and coordinating changes among 12 different projects comprising five different types of web servers and three programming languages. While conceptually simple, “turning off SSLv3” is done slightly differently for each of these technologies and required an awareness of TLS idiosyncrasies. Beyond the full system-level tests (provided by services like Apache Bigtop), every component of the ecosystem needed to be individually scrutinized to ensure that POODLE was really fixed. All in all, it was a fair amount of work.
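
The mechanics differ for each web server and language involved, but as one hedged Java-side illustration (not the actual patches that went upstream), restricting the protocols a JSSE server socket will negotiate looks roughly like this; the port number is just an example:

    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLServerSocket;
    import javax.net.ssl.SSLServerSocketFactory;

    public class DisableSslv3 {
      public static void main(String[] args) throws Exception {
        // Obtain the default SSL context and its server socket factory.
        SSLContext ctx = SSLContext.getDefault();
        SSLServerSocketFactory factory = ctx.getServerSocketFactory();

        // Port 8443 is an arbitrary example.
        try (SSLServerSocket serverSocket = (SSLServerSocket) factory.createServerSocket(8443)) {
          // Enable only TLS versions; SSLv3 is deliberately left out of the list.
          serverSocket.setEnabledProtocols(new String[] {"TLSv1", "TLSv1.1", "TLSv1.2"});
          // ... accept and handle connections as usual ...
        }
      }
    }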

I’m happy to say that Cloudera took up the challenge, and today we’re able to announce patches for every current release of CDH and Cloudera Manager. Cloudera engineers contributed the following HADOODLE fixes upstream:

Cloudera also has fixes for Apache HBase, Impala, and Cloudera Manager. Every component not mentioned does not yet support TLS and hence is not vulnerable.

The table below shows the releases where the POODLE fixes are first available.

This issue is also described in our Technical Service Bulletin #37 (TSB-37). If you run a secure Hadoop cluster, I strongly recommend upgrading to the appropriate patch release above. 

Michael Yoder is a Software Engineer at Cloudera.

Categories: Hadoop
