Hadoop

BigBench: Toward An Industry-Standard Benchmark for Big Data Analytics

Cloudera Blog - Tue, 11/25/2014 - 17:49

Learn about BigBench, the new industrywide effort to create a sorely needed Big Data benchmark.

Benchmarking Big Data systems is an open problem. To address this concern, numerous hardware and software vendors are working together to create a comprehensive end-to-end big data benchmark suite called BigBench. BigBench builds upon and borrows elements from existing benchmarking efforts in the Big Data space (such as YCSB, TPCx-HS, GridMix, PigMix, HiBench, Big Data Benchmark, and TPC-DS). Intel and Cloudera, along with other industry partners, are working to define and implement extensions to BigBench 1.0. (A TPC proposal for BigBench 2.0 is in the works.)

BigBench Overview

BigBench is a specification-based benchmark with an open-source reference implementation kit, which sets it apart from its predecessors. As a specification-based benchmark, it would be technology-agnostic and provide the necessary formalism and flexibility to support multiple implementations. As a “kit”, it would lower the barrier to entry by providing a readily available reference implementation as a starting point. As open source, it would allow multiple implementations to co-exist in one place and be reused by different vendors, while providing the consistency needed for meaningful comparisons.

The BigBench specification comprises two key components: a data model specification, and a workload/query specification. The structured part of the BigBench data model is adopted from the TPC-DS data model depicting a product retailer, which sells products to customers via physical and online stores. BigBench’s schema uses the data of the store and web sales distribution channel and augments it with semi-structured and unstructured data as shown in Figure 1.  

 

Figure 1: BigBench data model specification

The data model specification is implemented by a data generator, which is based on an extension of PDGF. Plugins for PDGF enable data generation for an arbitrary schema. Using the BigBench plugin, data can be generated for all three parts of the schema: structured, semi-structured, and unstructured.

The BigBench 1.0 workload specification consists of 30 queries/workloads. Ten of these queries have been taken from the TPC-DS workload and run against the structured part of the schema. The remaining 20 were adapted from a McKinsey report on Big Data use cases and opportunities: seven of these run against the semi-structured portion, five run against the unstructured portion, and the rest run against the structured portion of the schema. The reference implementation of the workload specification is available here.

The BigBench 1.0 specification includes a set of metrics (focused on execution-time calculation) and multiple execution modes. The metrics can be reported for the end-to-end execution pipeline as well as for each individual workload/query. The benchmark also defines a model for submitting concurrent workload streams in parallel, which can be extended to simulate multi-user scenarios.

BigBench Extensions: Toward BigBench 2.0

BigBench has some ways to go before it can be declared complete. A work-in-progress paper about BigBench, co-authored by various industry and academic experts, discusses the reference implementation, community feedback on what was done well, and shortcomings of the 1.0 specification and implementation. These concerns are addressed in the form of proposed extensions for BigBench 2.0, some of which are described below.

The current specification, while representative of a wide variety of big data use cases, falls short of being complete—primarily because it is structured-data-intensive, with some representation for semi-structured and unstructured content (which also gets formatted into structured data before being processed). By adding more procedural and analytic workloads that perform complex operations directly on unstructured data, the lopsidedness can be eliminated. This approach is also closely tied to the data specification model, which currently doesn’t state the rate of input data generation and refresh, thereby excluding streaming workloads from the current specification.

The specification also needs to be extended to enforce that all file formats demonstrate sufficient flexibility to be created, read, and written from multiple popular data processing engines (MapReduce, Apache Spark, Apache Hive, and so on). This capability would ensure that all data is immediately query-able with no ETL or format-conversion delays.  

The current set of metrics excludes performance per cost. As our experiments show, this metric is critical for comparing software systems in the presence of hardware diversity. Performance subject to failures is another important metric currently missing from the specification. On the implementation side, the kit needs to be enhanced with implementations of existing and proposed queries over popular data processing engines such as Impala, Spark, and MapReduce, in addition to Hive.

Results for BigBench

Intel has evaluated a subset of BigBench 1.0 workloads against multiple hardware configurations. The goals of these experiments were to:

  • Validate a reference implementation of BigBench 1.0 on the latest Intel hardware using CDH 5.1
  • Understand the price/performance trade-off for different configurations
  • Highlight price/performance as the key metric

We selected four queries (2, 16, 20, 30), each representing a different processing model (for query descriptions, please see https://github.com/intel-hadoop/Big-Bench). A common input dataset of 2TB was used. Input data sizes for each of the four workloads (2, 16, 20, 30) were 244GB, 206GB, 144GB, and 244GB respectively. We used Intel’s Performance Analysis Tool (PAT) to collect and analyze data. (PAT automates benchmark execution and data collection and makes use of Linux performance counters.)

The table below shows the three different configurations under experimentation. Each of the three clusters contained 10 nodes. For Configuration 1, we selected the Intel Xeon E5 2680 v3 with 12 cores, 30MB cache, 2.5GHz (3.3GHz @Turbo), and an operating TDP of 120W. An 800GB DC3500 series SSD was used as the boot drive. For primary storage, 12x 2TB SATA drives were used.

In Configuration 2, the Intel Xeon E5 2680 v3 was replaced with the E5 2697 v3 (14 cores, 35MB cache, 2.6GHz with 3.6GHz @Turbo, and 145W TDP). A 2TB Intel DC3700 SSD was added as primary storage alongside the hard-disk drives. Concerns about the endurance and affordability of large-capacity NAND drives have prevented customers from embracing SSD technology for their Hadoop clusters; this Cloudera blog post explains the merits of using SSDs for performance and the drawbacks in terms of cost. However, SSD technology has advanced significantly since that post was published. For example, capacity and endurance have improved, and the Intel DC3600 SSD now costs one-third of the price of the SSD reported in that post.

In Configuration 3, we replaced the Intel Xeon E5 2697 v3 with the Intel Xeon E5 2699 v3 (18 cores, 45MB cache, 2.3GHz with 3.6GHz @Turbo, and 145W TDP). The hard-disk drives were replaced with a second SSD, and memory was increased from 128GB to 192GB.

As shown in the table above, Configuration 2 costs 1.5x as much as Configuration 1, and Configuration 3 costs around 2x as much. A summary of results from the experiments is shown in Figure 2.

Figure 2: BigBench testing results

For workloads #16 and #30, the performance gains from Configurations 2 and 3 are strictly proportional to the cost of the hardware: the customer has to pay 2x to get 2x the performance, so the performance-per-dollar ratio is close to 1. For workloads #2 and #20, however, the performance-per-dollar ratio is less than 1. From these results we can conclude that Configuration 1 is a good choice in all cases, whereas the scaled-up Configurations 2 and 3 make sense only for certain types of workloads, especially those that are disk-IO intensive.
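
To make the metric concrete with purely hypothetical numbers (not taken from Figure 2): if a scaled-up cluster costs 2x as much as the baseline and completes a workload in half the time (a 2x speedup), its performance per dollar is 2 / 2 = 1, the break-even point; if that same 2x-cost cluster delivers only a 1.5x speedup, performance per dollar falls to 1.5 / 2 = 0.75, and the baseline configuration is the better buy.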

We monitored CPU utilization, disk bandwidth, network IO, and memory usage for each workload (using PAT). Most of the gains come from the use of SSDs. With SSDs in place, the workloads tend to become CPU-bound, at which point an increase in CPU cores and frequency starts to help. For in-memory data processing engines (Spark and Impala), an increase in memory size is likely to be the most important factor. We hope to cover that issue in a future study.

Summary

BigBench is an industrywide effort to create a comprehensive and standardized Big Data benchmark. Intel and Cloudera are working on defining and implementing extensions to BigBench (in the form of BigBench 2.0). A preliminary validation against multiple cluster configurations using the latest Intel hardware shows that, from a price-performance viewpoint, scaled-up configurations (that use SSDs and high-end Intel processors) are beneficial for workloads that are disk-IO bound.

Acknowledgements

BigBench is a joint effort with partners in industry and academia. The authors would like to thank Chaitan Baru, Milind Bhandarkar, Alain Crolotte, Carlo Curino, Manuel Danisch, Michael Frank, Ahmed Ghazal, Minqing Hu, Hans-Arno Jacobsen, Huang Jie, Dileep Kumar, Raghu Nambiar, Meikel Poess, Francois Raab, Tilmann Rabl, Kai Sachs, Saptak Sen, Lan Yi and Choonhan Youn. We invite the rest of the community to participate in the development of the BigBench 2.0 kit.

Bhaskar Gowda is a Systems Engineer at Intel Corp.

Nishkam Ravi is a Software Engineer at Cloudera.

Software and workloads used in performance tests may have been optimized for performance only on Intel microprocessors. Performance tests, such as SYSmark and MobileMark, are measured using specific computer systems, components, software, operations and functions. Any change to any of those factors may cause the results to vary. You should consult other information and performance tests to assist you in fully evaluating your contemplated purchases, including the performance of that product when combined with other products.

Intel processor numbers are not a measure of performance. Processor numbers differentiate features within each processor family, not across different processor families. Go to: http://www.intel.com/products/processor_number.

Intel, the Intel logo and Xeon are trademarks of Intel Corporation in the U.S. and/or other countries.

Categories: Hadoop

Apache Hive on Apache Spark: The First Demo

Cloudera Blog - Fri, 11/21/2014 - 13:06

The community effort to make Apache Spark an execution engine for Apache Hive is making solid progress.

Apache Spark is quickly becoming the programmatic successor to MapReduce for data processing on Apache Hadoop. Over the course of its short history, it has become one of the most popular projects in the Hadoop ecosystem, and is now supported by multiple industry vendors—ensuring its status as an emerging standard.

Two months ago, Cloudera, Databricks, IBM, Intel, MapR, and others came together to port Apache Hive and other batch processing engines to Spark. In October at Strata + Hadoop World New York, Hive on Spark project lead Xuefu Zhang shared the project status and provided a demo of our work. The same week at the Bay Area Hadoop User Group, Szehon Ho discussed the project and demoed the work completed. Additionally, Xuefu and Suhas Satish will be speaking about Hive on Spark at the Bay Area Hive User Group on Dec. 3.

The community has committed more than 140 changes to the Spark branch as part of HIVE-7292 – Hive on Spark. We are proud to say that queries are now functionally able to run, as you can see in the demo below of a multi-node Hive-on-Spark query (query 28 from TPC-DS with a scale factor of 20 on a TPC-DS derived dataset).

This demo is intended to illustrate our progress toward porting Hive to Spark, not to compare Hive-on-Spark performance versus other engines. The Hive-on-Spark team is now focused on additional join strategies, like Map-side joins, statistics, job monitoring, and other operational aspects. As these pieces come together, we’ll then shift our focus to the performance tuning and optimization needed prior to general release.
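
For readers who want to experiment once the work lands, switching engines is a session-level Hive setting. A minimal sketch, assuming a build of the HIVE-7292 Spark branch and a TPC-DS-derived table (the table and column names below are illustrative only):

-- From the Hive shell or Beeline, against a build of the Spark branch
set hive.execution.engine=spark;   -- revert with: set hive.execution.engine=mr;
SELECT ss_store_sk, count(*) AS cnt
FROM store_sales
GROUP BY ss_store_sk;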

The Hive and Spark communities have worked closely together to make this effort possible. In order to support Hive on Spark, Spark developers have provided enhancements including a MapReduce-style shuffle transformation, removal of Guava from the public API, and improvements to the Java version of the Spark API. Among other enhancements in progress, the Spark community is working hard to provide elastic scaling within a Spark application. (Elastic Spark application scaling is a favorite request from longtime Spark users.) Given all the enhancements Hive on Spark is driving within Spark, the Hive-on-Spark project is turning out to be beneficial for both communities.

We look forward to providing another update in a few months. Until then, please enjoy the demo video!

Finally, a big thanks to the project team members: Chao Sun, Chengxiang Li, Chinna Rao Lalam, Jimmy Xiang, Marcelo Vanzin, Na Yang, Reynold Xin, Rui Li, Sandy Ryza, Suhas Satish, Szehon Ho, Thomas Friedrich, Venki Korukanti, and Xuefu Zhang.

Brock Noland is a Software Engineer at Cloudera and an Apache Hive committer/PMC member.

Categories: Hadoop

Guidelines for Installing CDH Packages on Unsupported Operating Systems

Cloudera Blog - Tue, 11/18/2014 - 17:04

Installing CDH on newer unsupported operating systems (such as Ubuntu 13.04 and later) can lead to conflicts. These guidelines will help you avoid them.

Some of the more recently released operating systems that bundle portions of the Apache Hadoop stack in their respective distro repositories can conflict with software from Cloudera repositories. Consequently, when you set up CDH for installation on such an OS, you may end up picking up packages with the same name from the OS’s distribution instead of Cloudera’s distribution. Package installation may succeed, but using the installed packages may lead to unforeseen errors. 

If you are manually installing (via apt-get, yum, or Puppet) packages for Ubuntu 14.04 with CDH 5.2.0 or later (which is supported by Cloudera), this issue does not pertain to you—Cloudera Manager takes care of the necessary steps to avoid conflicts. Furthermore, if you are using CDH parcels instead of packages for any release or OS, conflicts are similarly not an issue.

If, however, you are either:

  • Manually installing CDH packages (any release) on a newer unsupported OS (such as Ubuntu 13.04, Ubuntu 13.10, Debian 7.5, Fedora 19, or Fedora 20; refer to the CDH 5 Requirements and Supported Versions guide for an up-to-date list of supported OSs), or
  • Manually installing CDH packages for a release earlier than CDH 5.2.0 on Ubuntu 14.04

then you should find the following good-faith installation guidelines helpful.

(Note: If you are installing CDH 5.2 packages manually on a supported OS like Ubuntu 14.04, the documentation lists the necessary steps you need to take. However, you may still find this blog post useful as background reading.)

The Problem in Action

As explained above, if you are mixing-and-matching packages between distributions, you may easily end up with misleading errors.

For example, here is an error when running hbase shell on Ubuntu 13.04 with CDH 5. In this example, the zookeeper package is installed from the OS repositories (Ubuntu 13.04) whereas the hbase package is installed from CDH.

$ hbase shell
2014-09-08 13:27:51,017 INFO  [main] Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.98.1-cdh5.1.2, rUnknown, Mon Aug 25 19:33:59 PDT 2014

hbase(main):001:0> status
ERROR: cannot link Java class org.apache.hadoop.hbase.client.HBaseAdmin, probable missing dependency: org/apache/zookeeper/KeeperException

Does My OS Have This Problem?

In the table below, you will find the mapping of various operating systems and the conflicting packages.

Red means a conflict exists: installing packages today on that OS would pull some package(s) from the OS repo instead of the CDH repo, or worse, result in a mix of packages from the OS and CDH repos. The value of the field represents the package that will be installed from the OS. For example, "OS zookeeper" means that the zookeeper package would be installed from the OS instead of the CDH repository, which will cause issues.

Orange means no conflict currently exists but that one could arise if the OS repo decides to bump up or change the package version.

If you are using a problematic OS, you will find the solution in the next section.

(Note: Even though Ubuntu 14.04 is listed as a “problematic” OS in the above table, the solution described below is already implemented in Cloudera Manager and described in the documentation. You don’t have to do anything extra if you are using Cloudera Manager or simply following the documentation.)

Solution

The best way to fix this problem is to ensure that all the packages come from the same CDH repository. The OS repository is added by default, and it’s usually not a good idea to disable it. You can, however, set the priority of the CDH repo to be higher than that of the default OS repo. Consequently, if there is a package with the same name in the CDH repo and the default OS repo, the package from the CDH repository takes precedence over the one in the OS repository, regardless of which one has the higher version. This concept is generally referred to as pinning.

For Debian-based OSs (e.g. Ubuntu 13.04, Ubuntu 13.10, Debian 7.5)

Create a file at /etc/apt/preferences.d/cloudera.pref with the following contents:

Package: *
Pin: release o=Cloudera, l=Cloudera
Pin-Priority: 501

No apt-get update is required after creating this file.

For those curious about this solution, the default priority of packages is 500. By creating the file above, you provide a higher priority of 501 to any package that has origin specified as “Cloudera” (o=Cloudera) and is coming from Cloudera’s repo (l=Cloudera), which does the trick.
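
As a quick, optional sanity check of the pin (zookeeper is just one example of an overlapping package):

$ apt-cache policy zookeeper
# With the preferences file in place, the CDH build should now be listed as the
# install Candidate, with the Cloudera repository at priority 501 and the OS
# repository at the default 500.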

For RPM-based OSs (such as Fedora 19 and Fedora 20)

Install the yum-plugin-priorities package by running:

sudo yum install yum-plugin-priorities

This package enables us to use yum priorities, which you will see in the next step.

Then, edit the relevant cloudera-cdh*.repo file under /etc/yum.repos.d/ and add this line at the bottom of that file:

priority = 98

The default priority for all repositories (including the OS repository) is 99. Lower priority takes more precedence in RHEL/CentOS. By setting the priority to 98, we give the Cloudera repository higher precedence than the OS repository.
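
For illustration, an edited repo file might end up looking roughly like this; the repository name, baseurl, and gpgkey below are examples only, so keep whatever your existing cloudera-cdh*.repo file already contains and simply append the priority line:

[cloudera-cdh5]
name=Cloudera's Distribution for Hadoop, Version 5
baseurl=https://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5/
gpgkey=https://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
gpgcheck=1
priority = 98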

For OSs Not on the List

In general, you will have a problem if the OS repository and the CDH repository provide a package with the same name. The most common conflicting packages are zookeeper and hadoop-client, so as a start you need to ascertain whether there is more than one repository delivering those packages.

On a Debian-based system, you can run something like apt-cache policy zookeeper. That command will list all the repositories where the package zookeeper is available. For example, here is the result of running apt-cache policy zookeeper on Ubuntu 13.04:

root@ip-172-26-1-209:/etc/apt/sources.list.d# apt-cache policy zookeeper
zookeeper:
  Installed: (none)
  Candidate: 3.4.5+dfsg-1~exp2
  Version table:
     3.4.5+dfsg-1~exp2 0
        500 http://us-west-1.ec2.archive.ubuntu.com/ubuntu/ raring/universe amd64 Packages
     3.4.5+cdh5.1.0+28-1.cdh5.1.0.p0.208~precise-cdh5.1.0 0
        500 http://archive.cloudera.com/cdh5/ubuntu/precise/amd64/cdh/ precise-cdh5/contrib amd64 Packages

As you can see, the package zookeeper is available from two repositories: Ubuntu’s Raring Universe Repository and the CDH repository. So, you have a problem.

On a yum-based system, you can run something like yum whatprovides hadoop-client. That command will list all the repositories where the hadoop-client package is available. For example, here is the result from Fedora 20:

$ yum whatprovides hadoop-client
Loaded plugins: priorities
cloudera-cdh4
hadoop-client-2.0.0+1604-1.cdh4.7.0.p0.17.el6.x86_64 : Hadoop client side dependencies
Repo        : cloudera-cdh4
hadoop-client-2.2.0-1.fc20.noarch : Libraries for Hadoop clients
Repo        : fedora
hadoop-client-2.2.0-5.fc20.noarch : Libraries for Apache Hadoop clients
Repo        : updates

As you can see, the package hadoop-client is available from multiple repositories: the Fedora 20 repositories (fedora and updates) and the CDH repository. Again, that’s a problem.

Conclusion

Managing package repositories that deliver conflicting packages can be tricky. You have to take the above steps on affected operating systems to avoid any conflicts.

To reiterate, this issue is mostly confined to the manual use of packages on unsupported OSs:

  • If you are using parcels, you don’t have to worry about such problems. On top of that you get easy rolling upgrades.
  • If you are installing packages via Cloudera Manager, you don’t have to worry about such problems since Cloudera Manager takes care of pinning.
  • If the preceding points don’t apply to you, follow the instructions in this blog post to ensure there are no conflicts between CDH and OS packages.

Mark Grover is a Software Engineer on Cloudera Engineering’s Packaging and Integration team, an Apache Bigtop committer, and a co-author of the O’Reilly Media book, Hadoop Application Architectures.

Categories: Hadoop

How Apache Sqoop 1.4.5 Improves Oracle Database/Apache Hadoop Integration

Cloudera Blog - Fri, 11/14/2014 - 16:44

Thanks to Guy Harrison of Dell Inc. for the guest post below about time-tested performance optimizations for connecting Oracle Database with Apache Hadoop that are now available in Apache Sqoop 1.4.5 and later.

Back in 2009, I attended a presentation by a Cloudera employee named Aaron Kimball at the MySQL User Conference in which he unveiled a new tool for moving data from relational databases into Hadoop. This tool was to become, of course, the now very widely known and beloved Sqoop!

As someone who had worked with relational databases—particularly Oracle Database—for more than 20 years and who was becoming very interested in Hadoop, Sqoop immediately piqued my interest. A general-purpose tool for moving data between RDBMS and Hadoop obviously seemed important, but experience had already taught me that generic database access methods are rarely optimal for a specific database because each RDBMS implements its own fast-path access methods to gain competitive advantage. It instantly occurred to me that using these fast-path methods was clearly the key to making Sqoop not just a good general-purpose tool but also a high-performance solution.

For MySQL, Aaron had already integrated a fast-path export based on the mysqldump utility. I wondered if my team at Quest Software (now part of Dell Software) could create a fast-path access method for Oracle. 

OraOop Beginnings

When we looked at Sqoop, we found a few significant opportunities for optimization. The most significant involved how Sqoop distributes work across mapper tasks. When parallelizing across a large table, Sqoop uses primary-key ranges to break up the work. Each mapper runs a query to grab a range of primary key values. This approach can lead to a few undesirable outcomes in Oracle Database, for example:

  • The Oracle optimizer uses cost-based algorithms to decide between primary key index lookups and full table scans. Sometimes, Oracle will execute each primary key range lookup as a full table scan—resulting in multiple scans hitting the Oracle database simultaneously.
  • Oracle Database may even decide to use its own parallel query capability to parallelize individual mapper tasks—increasing the load on the database and disturbing the relative load created by each mapper.
  • In Oracle Database, the physical location of data is not normally dictated by primary key—so a physical block on disk may contain data required by multiple mappers. Consequently, individual blocks on disk would each be accessed multiple times, creating excessive IO.
  • When the Oracle Database optimizer decides to use index scans, the RDBMS will load the resulting data into its buffer cache—driving out blocks required for normal database operation with blocks that are only needed by Sqoop.
  • Primary keys are often not uniformly distributed: as older rows are deleted, the older primary key ranges become sparse. Sqoop’s algorithms could not take this into account, and often one mapper would have many more rows to process than another.

Thus for our initial implementation of a Sqoop connector (informally called “OraOop”), we had the following design goals:

  • Partition data to the mappers based on physical storage characteristics so that each mapper is allocated a completely distinct set of physical blocks and no physical block is read more than once.
  • Make sure each mapper receives an equal amount of work.
  • Bypass Oracle Database parallelism and the Oracle buffer cache.
  • Neither require nor use indexes.
  • Use Oracle “direct path” IO.

The first release of OraOop in 2010 achieved these goals, and as Sqoop added bi-directional transfer capabilities, we performed similar optimizations for moving data from Hadoop to Oracle, including exploiting the Oracle direct-path insert capability, utilizing Oracle partitions to maximize parallelism, and adding a MERGE capability that simultaneously updates and inserts into the target Oracle tables.

Despite the free availability of our Apache-licensed connector, many Sqoop users understandably continued to use default Sqoop, sometimes with disappointing outcomes. So in early 2014 Dell, Cloudera, and others in the Apache Sqoop community collaborated to bring the OraOop code directly into core Sqoop. We are all very happy to see this functionality now fully integrated into Sqoop, starting with release 1.4.5 (packaged in CDH 5.1 and later).

Assuming you use the direct=true clause on your Sqoop command line, all the optimizations outlined above will be employed. (Use of this clause does require that the Oracle account have the privileges required to read the Oracle extent information; it is not supported for views or index-organized tables.) There is no longer any need to install an additional Oracle connector.
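
As a rough sketch of what that looks like on the command line (the connection string, credentials, table, and paths below are placeholders; in Sqoop’s option syntax the fast path is enabled with the --direct flag):

sqoop import \
  --direct \
  --connect jdbc:oracle:thin:@//oracledb.example.com:1521/ORCL \
  --username SQOOP_DEMO \
  --password-file /user/sqoop/.oracle_password \
  --table TRANSACTIONS \
  --target-dir /data/oracle/transactions \
  --num-mappers 8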

Performance Results

The chart below illustrates a typical reduction in Oracle Database overhead when using the new Sqoop 1.4.5 -direct=true setting (import of a 310GB, 1-billion row table). The test platform had the following characteristics:

  • 9-node cluster on AWS EC2 (1 x NameNode/JobTracker, 8 x DataNode/TaskTracker), each with 8 CPUs and 15GB RAM, running CDH 5.1
  • 1 32-CPU database server with 60GB RAM, running Oracle Database 12.1.0.2.0
  • 10GB Ethernet on the database server, 1GB Ethernet on the Hadoop nodes

As you can see, overall elapsed time was reduced by 83%, database time (total time consumed by all database operations) was reduced by 90%, and IO requests were reduced by 99%! The next chart shows the scalability profile for the direct=true option compared to direct=false (the only option available for Oracle Database transfers with Sqoop prior to 1.4.5).

We can draw these conclusions:

  • When data is un-clustered (arranged on disk in effectively random order), the direct=false mode scales very poorly (blue line) because the index-range scans do not result in contiguous IO requests on disk—resulting in a lot of duplicate IO by each mapper.
  • As the number of mappers increases, this IO load clobbers the database and elapsed time degrades. When data is highly clustered (green line), performance scales better because each mapper’s index-range scan accesses distinct blocks of data on disk.
  • The new Sqoop direct mode (red line) is unaffected by the clustering of primary keys because it always reads blocks directly from disk in physically contiguous ranges. The absolute improvement in elapsed time varies, but is generally from 5-20 times greater than the non-direct mode.

Conclusion

As you can see, the performance benefits of these optimizations, and the reduction in load on the Oracle side, are extremely significant. If you are performing data transfers between Oracle Database and Hadoop, we encourage you to try out the new Sqoop 1.4.5 direct mode.

Guy Harrison (@guyharrison) is an Executive Director of Research and Development at the Dell Software Group. Guy is the author of five books and many articles on database and data management and writes a monthly column for Database Trends and Applications.

Categories: Hadoop

The Story of the Cloudera Engineering Hackathon (2014 Edition)

Cloudera Blog - Tue, 11/11/2014 - 22:07

Cloudera’s culture is premised on innovation and teamwork, and there’s no better example of them in action than our internal hackathon.

Cloudera Engineering doubled down on its “hackathon” tradition last week, with this year’s edition taking an around-the-clock approach thanks to the HQ building upgrade since the 2013 edition (just look at all that space!).

This year, Cloudera software engineers had 24 straight hours to conceive, build, and present their hacks to a panel of celebrity judges. Thanks to a steady supply of solid- and liquid-state fuel, stimulation, and all-around great attitude, pet projects like OYA (“Oozie squeezing the hell out of YARN”), Cloudera Vacuum, HiveKa (Apache Hive-on-Apache Kafka), YARN-on-Mesos, and the Cloudera Data Science Sports Division saw the light of day, and there was much rejoicing.

And now, the Cloudera Engineering Hackathon 2014 in pictures (most photos by Jarcec Cecho):

 The tone was set early as peeps arrived:

Caffeine as a Service (CaaS) is always an important ingredient:

The ingredients didn’t stop at caffeine though:

With tanks full, it was time to start hacking…

And hacking…

And hacking some more (SO serious).

I’m pretty sure this photo was taken at 2 or 3am (but not by me!), is it obvious?

The presentations were grueling for everyone involved:

With our extra-intimidating judges involved, how could they not be?

But when all was said and done, the winners were ecstatic:

 

And that’s the story of the Cloudera Engineering Hackathon (2014 Edition).

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

How Cerner Uses CDH with Apache Kafka

Cloudera Blog - Tue, 11/11/2014 - 16:52

Our thanks to Micah Whitacre, a senior software architect on Cerner Corp.’s Big Data Platforms team, for the post below about Cerner’s use case for CDH + Apache Kafka. (Kafka integration with CDH is currently incubating in Cloudera Labs.)

Over the years, Cerner Corp., a leading Healthcare IT provider, has utilized several of the core technologies available in CDH, Cloudera’s software platform containing Apache Hadoop and related projects—including HDFS, Apache HBase, Apache Crunch, Apache Hive, and Apache Oozie. Building upon those technologies, we have been able to architect solutions to handle our diverse ingestion and processing requirements.

At various points, however, we reached certain scalability limits and perhaps even abused the intent of certain technologies, causing us to look for better options. By adopting Apache Kafka, Cerner has been able to solidify our core infrastructure, utilizing those technologies as they were intended.

One of the early challenges Cerner faced when building our initial processing infrastructure was moving from batch-oriented processing to technologies that could handle a streaming near-real-time system. Building upon the concepts in Google’s Percolator paper, we built a similar infrastructure on top of HBase. Listeners interested in data of specific types and from specific sources would register interest in data written to a given table. For each write performed, a notification for each applicable listener would be written to a corresponding notification table. Listeners would continuously scan a small set of rows on the notification table looking for new data to process, deleting the notification when complete.

Our low-latency processing infrastructure worked well for a time but quickly reached scalability limits based on its use of HBase. Listener scan performance would degrade without frequent compactions to remove deleted notifications, yet during those frequent compactions performance would also degrade, causing severe drops in processing throughput. Processing would require frequent reads from HBase to retrieve the notification, the payload, and often supporting information from other HBase tables. The high number of reads would often contend with writes done by our processing infrastructure, which was writing transformed payloads and additional notifications for downstream listeners. The I/O contention and the compaction needs required careful management to distribute the load across the cluster, often segregating the notification tables on isolated region servers.

Adopting Kafka was a natural fit for reading and writing notifications. Instead of scanning rows in HBase, a listener would process messages off of a Kafka topic, updating its offset as notifications were successfully processed. 

Kafka’s natural separation of producers and consumers eliminated contention at the HBase RegionServer due to the high number of notification read and write operations. Kafka’s consumer offset tracking helped to eliminate the need for notification deletes, and replaying notifications became as simple as resetting the offset in Kafka. Offloading the highly transient data from HBase greatly reduced unnecessary overhead from compactions and high I/O. 

Building upon the success of Kafka-based notifications, Cerner then explored using Kafka to simplify and streamline data ingestion. Cerner systems ingest data from multiple disparate sources and systems. Many of these sources are external to our data centers. The “Collector,” a secured HTTP endpoint, will identify and namespace the data before it is persisted into HBase. Prior to utilizing Kafka, our data ingestion infrastructure targeted a single data store such as an HBase cluster. 

The system satisfied our initial use cases but as our processing needs changed, so did the complexity of our data ingestion infrastructure. Data would often need to be ingested into multiple clusters in near real time, and not all data needed the random read/write functionality of HBase.

Utilizing Kafka in our ingestion platform helped provide a durable staging area, giving us a natural way to broadcast the ingested data to multiple destinations. The collector process stayed simple by persisting data into Kafka topics, segregated by source. Pushing data to Kafka resulted in a noticeable improvement as the uploading processes were no longer subject to intermittent performance degradations due to compaction or region splitting with HBase.

After data lands in Kafka, Apache Storm topologies push data to consuming clusters independently. Kafka and Storm allow the collector process to remain simple by eliminating the need to deal with multiple writes or the performance influence of the slowest downstream system. Storm’s at least once guarantee of delivering the data is acceptable because persistence of the data is idempotent.

The separation that Kafka provides also allows us to aggregate the data for processing as necessary. Some medical data feeds produce a high volume of small payloads that only need to be processed through batch methods such as MapReduce. LinkedIn’s Camus project allows our ingestion platform to persist batches of small payloads within Kafka topics into larger files in HDFS for processing. In fact, all the data we ingest into Kafka is archived into HDFS as Kite SDK Datasets using the Camus project. This approach gives us the ability to perform further analytics and processing that do not require low-latency processing on that data. Archiving the data also provides a recovery mechanism in case data delivery lags beyond the topic retention policies of Kafka.
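
For reference, Camus runs as a MapReduce job driven by a properties file; a minimal sketch of launching it might look like the following (the jar name and properties path are placeholders, and this reflects the upstream Camus project rather than Cerner’s exact setup):

hadoop jar camus-example-shaded.jar \
  com.linkedin.camus.etl.kafka.CamusJob -P camus.properties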

Cerner’s use of Kafka for ingesting data will allow us to continue to experiment and evolve our data processing infrastructure when new use cases are discovered. Technologies such as Spark Streaming, Apache Samza (incubating), and Apache Flume can be explored as alternatives or additions to the current infrastructure. Cerner can prototype Lambda and Kappa architectures for multiple solutions independently without affecting the processes producing data. As Kafka’s multi-tenancy capabilities develop, Cerner can also look to simplify some of its data persistence needs, eliminating the need to push to downstream HBase clusters. 

Overall, Kafka will play a key role in Cerner’s infrastructure for large-scale distributed processing and be a nice companion to our existing investments in Hadoop and HBase.

Micah Whitacre (@mkwhit) is a senior software architect on Cerner Corp.’s Big Data Platforms team, and an Apache Crunch committer.

Categories: Hadoop

Where to Find Cloudera Tech Talks (Through End of 2014)

Cloudera Blog - Mon, 11/10/2014 - 16:52

Find Cloudera tech talks in Seattle, Las Vegas, London, Madrid, Budapest, Barcelona, Washington DC, Toronto, and other cities through the end of 2014.

Below please find our regularly scheduled quarterly update about where to find tech talks by Cloudera employees—this time, for the remaining dates of 2014. Note that this list will be continually curated during the period; complete logistical information may not be available yet. And remember, many of these talks are in “free” venues (no cost of entry).

As always, we’re standing by to assist your meetup by providing speakers, sponsorships, and schwag!

Date | City | Venue | Speaker(s)

Nov. 9-14 | Seattle | USENIX LISA14 | Gwen Shapira and Yanpei Chen on big data benchmarking; Yanpei and Karthik Kambatla on Hadoop-on-SSD; Kate Ting and Jon Hsieh on Hadoop ops
Nov. 11-14 | Las Vegas | AWS re:Invent | Amandeep Khurana on Hadoop/cloud best practices
Nov. 11-14 | Washington, DC | Lucene/Solr Revolution 2014 | Romain Rigaux on Hue’s Search app; Mark Miller on Solr-on-HDFS; Greg Chanan on secure search
Nov. 17 | London | Hadoop Users Group UK | Mark Grover and Ted Malaska on Hadoop apps architecture
Nov. 17 | Madrid | Big Data Spain | Sean Owen on Spark-based anomaly detection; Yanpei Chen and Gwen Shapira on big data benchmarking; Mark Grover and Jonathan Seidman on Hadoop apps architecture; Enrico Berti on Hue
Nov. 18 | Budapest | Big Data Budapest | Colin McCabe on HDFS optimization
Nov. 19 | Bellevue, Wash. | Seattle Spark Meetup | Jain Ranganathan on Spark/CDH integration
Nov. 19-21 | Barcelona | Strata+Hadoop World Barcelona | Multiple Cloudera speakers: Impala, Spark, genomics, Hadoop apps architecture, and more
Nov. 17-21 | Budapest | ApacheCon Europe | Dima Spivak on Hadoop-on-Docker; Colin McCabe on native clients for HDFS and HDFS optimization; Xuefu Zhang on security
Dec. 2 | Menlo Park, Calif. | Bay Area Spark Meetup | TBD
Dec. 5 | Dulles, Va. | DevIgnition 2014 | Doug Cutting on the Hadoop ecosystem
Dec. 18 | Louisville, Ky. | Louisville BI & Big Data Meetup | TBD
Dec. 18 | Toronto, Ont. | Toronto HUG | Mark Grover on Hadoop apps architecture

 

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

This Month in the Ecosystem (October 2014)

Cloudera Blog - Fri, 11/07/2014 - 18:34

Welcome to our 14th edition of “This Month in the Ecosystem,” a digest of highlights from October 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

  • Approximately 5,000 people converged at Javits Center for Strata + Hadoop World New York 2014, where Cloudera and O’Reilly Media announced an extended partnership in which Strata + Hadoop World events will be held around the world, 4x per year. Next stops: San Jose (Feb. 17-20), Barcelona (Nov. 19-21) and London (May 5-7).
  • Cloudera announced the formation of Cloudera Labs, an incubator for ecosystem innovations that are candidates for shipping inside CDH. Among its first projects is CDH integration with Apache Kafka.
  • Via its Cloudera Live program, Cloudera also made a new end-to-end Apache Hadoop tutorial available to Hadoop evaluators that utilizes a full cloud-based demo cluster and sample data (free access for two weeks). Tableau and Zoomdata flavors are also provided.
  • eBay has open sourced its Kylin SQL-on-Hadoop framework. Remember when SQL was “dead?”
  • MapR announced that it will add Apache Spark support to Apache Drill. Furthermore, several other Big Data related vendors announced transitions away from MapReduce in their platforms and toward Spark. The Spark invasion continues!
  • Cloudera and Microsoft announced that Cloudera Enterprise will become certified on Azure and that deeper integration with the Microsoft stack is forthcoming.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Flafka: Apache Flume Meets Apache Kafka for Event Processing

Cloudera Blog - Thu, 11/06/2014 - 19:10

The new integration between Flume and Kafka offers sub-second-latency event processing without the need for dedicated infrastructure.

In this previous post you learned some Apache Kafka basics and explored a scenario for using Kafka in an online application. This post takes you a step further and highlights the integration of Kafka with Apache Hadoop, demonstrating both a basic ingestion capability as well as how different open-source components can be easily combined to create a near-real time stream processing workflow using Kafka, Apache Flume, and Hadoop. (Kafka integration with CDH is currently incubating in Cloudera Labs.)

The Case for Flafka

One key feature of Kafka is its functional simplicity. While there is a lot of sophisticated engineering under the covers, Kafka’s general functionality is relatively straightforward. Part of this simplicity comes from its independence from any other applications (excepting Apache ZooKeeper). As a consequence however, the responsibility is on the developer to write code to either produce or consume messages from Kafka. While there are a number of Kafka clients that support this process, for the most part custom coding is required.

Cloudera engineers and other open source community members have recently committed code for Kafka-Flume integration, informally called “Flafka,” to the Flume project. Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of data from many different sources to a centralized data store. Flume provides a tested, production-hardened framework for implementing ingest and real-time processing pipelines. Using the new Flafka source and sink, now available in CDH 5.2, Flume can both read and write messages with Kafka.

Flume can act as both a consumer (above) and a producer (below) for Kafka.

Flume-Kafka integration offers the following functionality that Kafka, absent custom coding, does not.

  • Producers – Use Flume sources to write to Kafka
  • Consumers – Write to Flume sinks reading from Kafka
  • A combination of the above
  • In-flight transformations and processing

This functionality expands your ability to utilize all the features of Flume, such as bucketing, event modification/routing, Kite SDK Morphline integration, and NRT indexing with Cloudera Search.

Next, we’ll walk you through an example application using the ingestion of credit-card data as the use case. All example code and configuration info involved are available here. A detailed walkthrough of the setup and example code is in the readme.

Example: Transaction Ingest

Assume that you are ingesting transaction data from a card processing system, and want to pull the transactions directly from Kafka and write them into HDFS.

The record simply contains a UUID for a transaction_id, a dummy credit-card number, timestamp, amount, and store_id for the transaction.

888fc23a-5361-11e4-b76d-22000ada828b|4916177742705110|2014-10-14 01:18:29|67.88|1433
888fdb26-5361-11e4-b76d-22000ada828b|4929011455520|2014-10-14 01:18:29|45.22|886
888ff1e2-5361-11e4-b76d-22000ada828b|4532623020656|2014-10-14 01:18:29|27.14|681
88900c72-5361-11e4-b76d-22000ada828b|4024007162856600|2014-10-14 01:18:29|34.63|577

To import this data directly into HDFS, you could use the following Flume configuration.

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks = hdfs-sink-1

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = flume1.ent.cloudera.com:2181/kafka
flume1.sources.kafka-source-1.topic = flume.txn
flume1.sources.kafka-source-1.batchSize = 100
flume1.sources.kafka-source-1.channels = hdfs-channel-1

flume1.channels.hdfs-channel-1.type = memory

flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-1.hdfs.rollCount=100
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000

This configuration defines an agent using the Kafka Source and a standard HDFS sink. Connecting to Kafka from Flume is as simple as setting the topic, ZooKeeper server, and channel. Your generated transactions will be persisted to HDFS with no coding necessary.

The Kafka Source allows for a number of different configuration options.

Property (default): Description

type* (no default): Must be set to org.apache.flume.source.kafka.KafkaSource

topic* (no default): The Kafka topic from which this source reads messages. Flume supports only one topic per source.

zookeeperConnect* (no default): The URI of the ZooKeeper server or quorum used by Kafka. This URI can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181, zk02.example.com:2181, zk03.example.com:2181). If you have created a path in ZooKeeper for storing Kafka data, specify the path in the last entry in the list (for example, zk01.example.com:2181, zk02.example.com:2181, zk03.example.com:2181/kafka). Use the /kafka ZooKeeper path for Cloudera Labs Kafka, because it is created automatically at installation.

batchSize (default 1000): The maximum number of messages that can be written to a channel in a single batch.

batchDurationMillis (default 1000): The maximum time (in ms) before a batch is written to the channel. The batch is written when the batchSize limit or batchDurationMillis limit is reached, whichever comes first.

consumer.timeout.ms (default 10): kafka.consumer.timeout.ms (polling interval for new data for a batch)

auto.commit.enabled (default false): If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.

groupId (default flume): The unique identifier of the Kafka consumer group. Set the same groupId in all sources to indicate that they belong to the same consumer group.

*Required

Any other properties you need to pass to the Kafka consumer can be supplied by using the kafka. prefix.
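
For example, the following lines would pass two standard Kafka 0.8 consumer settings straight through to the underlying consumer (the values are illustrative, not recommendations):

flume1.sources.kafka-source-1.kafka.auto.offset.reset = smallest
flume1.sources.kafka-source-1.kafka.fetch.message.max.bytes = 2097152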

You can declare the batch size in one of two ways: by specifying the size of the batch in terms of number of events (batchSize), or as a number of milliseconds (batchDurationMillis) to wait while receiving events from Kafka. In this manner, latency-based SLAs can be maintained for lower-volume flows.

Note: With any real-time ingestion or processing system there is a tradeoff involved between throughput and single-event processing latency. There is some overhead in processing a batch of events; and so by decreasing the batch size, this overhead is incurred more frequently. Furthermore, events wait until the batch size is attained so per-event latency can suffer. You should experiment with different batch sizes to attain the proper latency and throughput SLAs.

By default, Flume uses the groupId “flume” when reading from Kafka. Adding multiple Flume sources with the same groupId means that each Flume agent gets a subset of the messages, which can increase throughput. It is best to have any other consumers outside of Flume use a separate groupId so as to avoid message loss.
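
Concretely, scaling out reads of the flume.txn topic could look like the following on each additional agent (the group name is a made-up example); remember to give any non-Flume consumer of the same topic a different groupId:

# Set on every Flume agent that should share the partitions of flume.txn
flume1.sources.kafka-source-1.groupId = txn-ingest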

Example: Event Processing During Ingest

Let’s take our example further and assume that you not only want to use Hadoop for a long-term persistence layer, but would also like to build a pipeline for performing arbitrary event processing. Flume provides a key component called the interceptor, part of the Flume extensibility model. Interceptors have the following characteristics; they can:

  • Inspect events as they pass between source and channel
  • Modify or drop events as required
  • Be chained together to form a processing pipeline
  • Execute any custom code within the event processing

You can use Flume interceptors to do a variety of processing against incoming events as they pass through the system. In this example, you’ll be calculating a simple “Travel Score” to attempt to identify whether a banking customer is traveling while using their debit card. The exact use case is fabricated, but the architecture can be used to apply virtually any online model or scoring while returning results in sub-second times. Other uses of the interceptor could include:

  • Inspecting the content of the message for proper routing to a particular location such as by geo region
  • Calculating a streaming TopN list
  • Callout to a machine learning serving layer
  • Event enrichment / augmentation
  • In-flight data masking

Thus you can essentially deploy a Hadoop-enabled Kafka consumer group with built-in metrics and manageability via Cloudera Manager—as any Java code, such as a Spring Integration or Apache Camel flow, can be dropped into the interceptor.

(Note: For complex stream processing use cases, Spark Streaming provides the most flexible and feature rich execution engine. Flume Interceptors provide a great way to process events with very low latency and minimal complexity. For per-event response latencies under 50 ms, building a custom application is the right choice.)

To do any meaningful processing of the event as it arrives, you need to enrich the incoming transaction with information from your other systems. For that, call Apache HBase to get additional values related to the transaction, and modify the record to reflect the results of the processing performed by the interceptor.

Now you can write your event directly to HDFS as before or back to Kafka, where the event could be picked up by other systems or for more comprehensive stream processing. In this case, you’ll return it directly back to Kafka so that the authorization result can be immediately returned to the client.

The updated Flume configuration looks like this:

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks = kafka-sink-1

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = kafka1.ent.cloudera.com:2181/kafka
flume1.sources.kafka-source-1.topic = flume.txn
flume1.sources.kafka-source-1.batchSize = 5
flume1.sources.kafka-source-1.batchDurationMillis = 200
flume1.sources.kafka-source-1.channels = hdfs-channel-1
flume1.sources.kafka-source-1.interceptors = int-1
flume1.sources.kafka-source-1.interceptors.int-1.type=cloudera.se.fraud.demo.flume.interceptor.FraudEventInterceptor$Builder
flume1.sources.kafka-source-1.interceptors.int-1.threadNum = 200

flume1.channels.hdfs-channel-1.type = memory

flume1.sinks.kafka-sink-1.channel = hdfs-channel-1
flume1.sinks.kafka-sink-1.type = org.apache.flume.sink.kafka.KafkaSink
flume1.sinks.kafka-sink-1.batchSize = 5
flume1.sinks.kafka-sink-1.brokerList = kafka1.ent.cloudera.com:9092
flume1.sinks.kafka-sink-1.topic = flume.auths

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 10000

Configuring the Flafka sink is as easy as configuring the source, with just a few declarations needed. The interceptor also just needs a few lines for configuration. After configuration is done, place the project jar in the Flume classpath, restart, and the pipeline is ready to go.

Like the source, the sink also supports passing configs to use in the Kafka producer by using the kafka. prefix. The sink supports the following:

Property (default): Description

type* (no default): Must be set to org.apache.flume.sink.kafka.KafkaSink

brokerList* (no default): The brokers the Kafka sink uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for HA.

topic (default default-flume-topic): The Kafka topic to which messages are published by default. If the event header contains a topic field, the event is published to the designated topic, overriding the configured topic.

batchSize (default 100): The number of messages to process in a single batch. Specifying a larger batchSize can improve throughput and increase latency.

requiredAcks (default 1): The number of replicas that must acknowledge a message before it is written successfully. Possible values are 0 (do not wait for an acknowledgement), 1 (wait for the leader to acknowledge only) and -1 (wait for all replicas to acknowledge). To avoid potential loss of data in case of a leader failure, set this to -1.

*Required

Furthermore, the sink supports the addition of per-event topic and key headers as set in the interceptor. As mentioned previously, if the source of the message is the Kafka source, the topic header will be set to the topic of the Flume source.
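
One more durability note: if losing events on a Kafka leader failure is unacceptable for your pipeline, tighten the acknowledgement setting from the table above, for example:

flume1.sinks.kafka-sink-1.requiredAcks = -1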

In testing this simple scenario, we were able to achieve sub-150ms latency with one Flume agent, one Kafka partition, and one broker on a small 3-node m2.2xlarge cluster in AWS.

Flume’s Kafka Channel

The recent commit of FLUME-2500 introduces Kafka as a channel in Flume in addition to the traditional file and memory channels. This functionality will be available in CDH 5.3/Flume 1.6, and provides the ability to:

  • Write to Hadoop directly from Kafka without using a source
  • Be used as a reliable and highly available channel for any source/sink combination

The Flume memory channel does not protect against data loss in the event of agent failure, and when using the file channel, any data in a channel not yet written to a sink will be unavailable until the agent is recovered. The Kafka channel addresses both of these limitations.

Utilizing a Flume source allows you to use interceptors and selectors before writing to Kafka, but the channel can also be utilized on its own, for example to write to Hadoop directly from Kafka without a Flume source.

Building on our example to instead use the Kafka channel, the configuration might look like this:

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = kafka-channel-1
flume1.sinks = hdfs-sink-1

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = kafka1.ent.cloudera.com:2181/kafka
flume1.sources.kafka-source-1.topic = flume.txn
flume1.sources.kafka-source-1.batchSize = 5
flume1.sources.kafka-source-1.batchDurationMillis = 200
flume1.sources.kafka-source-1.channels = kafka-channel-1
flume1.sources.kafka-source-1.interceptors = int-1
flume1.sources.kafka-source-1.interceptors.int-1.type=cloudera.se.fraud.demo.flume.interceptor.FraudEventInterceptor$Builder
flume1.sources.kafka-source-1.interceptors.int-1.threadNum = 200

flume1.channels.kafka-channel-1.type = org.apache.flume.channel.kafka.KafkaChannel
flume1.channels.kafka-channel-1.brokerList = kafka1.ent.cloudera.com:9092
flume1.channels.kafka-channel-1.topic = flume.auths
flume1.channels.kafka-channel-1.zookeeperConnect = kafka1.ent.cloudera.com:2181/kafka

flume1.sinks.hdfs-sink-1.channel = kafka-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-1.hdfs.rollCount=100
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0

# Specify the capacity of the channel.
flume1.channels.kafka-channel-1.capacity = 10000
flume1.channels.kafka-channel-1.transactionCapacity = 10000

Using this configuration, your enriched transaction would go directly to Kafka and then on to HDFS using the HDFS sink.

The Kafka channel implements both a Kafka consumer and producer and is configured as follows.

  • type* (no default): Must be set to org.apache.flume.channel.kafka.KafkaChannel.
  • brokerList* (no default): The brokers the Kafka channel uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for HA.
  • zookeeperConnect* (no default): The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181). If you have created a path in ZooKeeper for storing Kafka data, specify the path in the last entry in the list (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/kafka). Use the /kafka ZooKeeper path for Cloudera Labs Kafka, because it is created automatically at installation.
  • topic (default: flume-channel): The Kafka topic the channel will use.
  • groupId (default: flume): The consumer group ID the channel uses to register with Kafka.
  • parseAsFlumeEvent (default: true): Set to true if a Flume source is writing to the channel and expects Avro datums with the FlumeEvent schema (org.apache.flume.source.avro.AvroFlumeEvent) in the channel. Set to false if other producers are writing to the topic that the channel is using.
  • readSmallestOffset (default: false): If true, the channel reads all data in the topic; if false, it reads only data written after the channel has started. Only relevant when parseAsFlumeEvent is false.
  • consumer.timeout.ms (default: 100): Maps to Kafka’s consumer.timeout.ms, the polling interval when writing to the sink.

*Required

As with the source and sink, other Kafka properties can be overridden by supplying the kafka. prefix.
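
For example, you might pass extra settings through to the channel's Kafka consumer and producer like this; the two property names shown are standard Kafka 0.8 settings and are included only for illustration.

# Allow larger messages to be fetched by the channel's consumer.
flume1.channels.kafka-channel-1.kafka.fetch.message.max.bytes = 2097152
# Compress messages written by the channel's producer.
flume1.channels.kafka-channel-1.kafka.compression.codec = snappy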

When parseAsFlumeEvent is set to true, any other consumers reading from the channel will need the FlumeEvent class mentioned in the table above, because the channel serializes each event as an Avro FlumeEvent. To provide reliability, you should configure multiple agents with the same topic and groupId for the channel so that when an agent fails, other agents can remove data from the channel. The producer mode is always set to sync (requiredAcks -1), and auto.commit.enabled is always overridden to false.

As the Kafka sink and the Kafka channel provide overlapping functionality, our recommendations are as follows:

  • If you are ingesting from Kafka to Hadoop and need the capabilities of an interceptor or selector, use the Kafka source with a file or Kafka channel and whatever standard Flume sink you require.
  • If you want to ingest directly from Kafka to HDFS, the Kafka channel by itself is recommended.
  • For writing events from Flume to Kafka, the Kafka channel is recommended.
  • If you can’t wait until CDH 5.3/Flume 1.6, the Kafka sink provides this functionality today.

Conclusion

Flafka provides a lot of flexibility in pipeline architecture. The right combination of options will depend on your requirements.

We hope this post demonstrates the ease of use of Flafka, and that implementing fairly sophisticated event processing with sub-second latencies doesn’t necessarily require a dedicated stream-processing system.

Gwen Shapira is a Software Engineer at Cloudera, and a Kafka contributor.

Jeff Holoman is a Systems Engineer at Cloudera.

Categories: Hadoop

NoSQL in a Hadoop World

Cloudera Blog - Wed, 11/05/2014 - 16:51

The number of powerful data query tools in the Apache Hadoop ecosystem can be confusing, but understanding a few simple things about your needs usually makes the choice easy. 

Ah, the good old days. I recall vividly that in 2007, I was faced with storing 1 billion XML documents and making them accessible as well as searchable. On a shoestring budget, I had few choices: build something on my own (it was all the rage back then, and still is), use an existing open source database like PostgreSQL or MySQL, or try this thing that Google had built successfully and that was now implemented in open source under the Apache umbrella: Hadoop.

So I bet on Hadoop, and on Apache HBase in due course, because I could not store that many small files in HDFS directly, nor combine and maintain them there. I probably had the first-ever HBase production cluster online in 2008, since the other users were still running development or non-commercial clusters.

In 2014, the list of tools at your disposal seems endless. Seemingly every other month, a new framework that solves the mother of all problems is announced—but luckily, the pace at which they join the Hadoop ecosystem is rather stable. We see evolution rather than revolution; those new projects have to prove themselves before being deemed a candidate for inclusion. But even within the Hadoop stack, we now have enough choices that in my role as one of Cloudera’s chief architects, I have been asked many times how to implement specific use cases, with features such as: 

  • Random reads and writes only
  • Random access, but also high throughput sequential scans
  • Analytical queries that are mostly scans
  • Interactive vs. batch-oriented use of data
  • Slowly changing dimensions (SCD) in an OLAP-like setup

In the past, either MapReduce or HBase would cover every one of these use cases. The former was the workhorse for anything batch oriented that needed high-sequential throughput as fast as disks could spin. The latter was the answer for anything else, because it was impractical to rewrite the typically very large files in HDFS for just a few small updates. 

Sure, if those updates were rather rare, one could (and did) build purely MapReduce-based solutions, using ETL-style workflows that merged changes as part of the overall data pipeline. But for truly random access to data, and the ability to serve it, there was only HBase, the Hadoop database. Then, during Strata+Hadoop World 2012, everything changed.

MPP Query Engines

Cloudera announced Impala at that conference in October 2012 and shipped it in early 2013. Now, you have an SQL-based query engine that can query data stored natively in HDFS (and also in HBase, but that is a different topic I will address soon). With Impala, you can query data similar to commercial MPP databases; all the servers in a cluster work together to receive the user query, distribute the work amongst them, read data locally at raw disk speeds, and stream the results back to the user, without ever materializing intermediate data or spinning up new OS processes like MapReduce does. It puts MapReduce into a batch-oriented corner, and lets standard BI tools connect directly with Hadoop data.

However, Impala also raises a slew of new questions about where HBase fits. Could Impala replace HBase? Probably not, as it still deals with immutable files that were staged by ETL workflows higher up the data ingest and processing pipeline (also see this earlier post). In practice, I often end up in situations where the customer is really trying to figure out where one starts and the other ends. I call this process the “Trough of Indecision”:

The primary driver here is what you need to achieve: high-throughput sequential scans, or random access to data that you also need to keep current along the way? 

Those are the most obvious choices. But what if you need random access but also sequential scans? Or scans that are as fast as possible, but with the ability to update the data? That’s where the decisions get harder.

The SCD Problem

In the relational world, and especially the analytical OLAP one, there is a modeling technique referred to as “slowly changing dimensions” (SCD). (You can get much more info on this from one of Cloudera’s webinars, held by the one-and-only Ralph Kimball.) Suffice it to say that you lay out data in a relational database in a way that allows you to update dimension tables over time. Those are then JOINed (the SQL operation) with the fact tables when a report needs to be generated. If you move this data over into Hadoop, and especially HDFS, you have many choices for engineering a suitable solution. (Again, please see this post for more details.) Typically, you would land the data and transform it into the Apache Parquet file format. Often you pre-materialize the final results so that reading them does not involve any heavy lifting, given the cost of the necessary I/O at scale.

On the HBase side, you can store the data differently as well, since you have the power to embed or nest dependent entities into the main record. This is different from the star schema that you may retain in the HDFS version of the data. You can also create a very flexible schema that allows you to cover many aspects of usually normalized data structures.

How could you handle the SCD problem in either HDFS with Parquet format, or with HBase as a row-based, random access store? Both give you the ability to update the data—either rewrite the immutable data files, or update columns directly—so why use either? 

Amdahl’s Law–Or, the Cost of Resurrection

With HBase, there is an inherent cost to converting data into its binary representation and back again. I have run some tests recently, and the bottom line is that schema design in HBase is extremely important. It is basically Amdahl’s Law at play, which says that the sequential part of an algorithm defines the overall performance of an operation. For HBase (and this also applies to any other data store that handles small data points), that sequential part is the deserialization of the cells, aka the key-value pairs. It represents a fixed cost, while the variable cost is based on how large the cell is (for loading and copying it in memory).

For a very small cell, the fixed cost dominates. Once you are dealing with large cells, the fixed cost is small in comparison. In other words, small cells are faster on a cells-per-second basis, but slower on a MB/sec basis. In HBase, it is not “rows per second” but “cells per second” that matters. Note: Keeping fewer versions around also should increase scan performance linearly. The diagram shows the difference of row vs. cell based access and how versions matter.

You have a choice to store every data point separately or combine them into larger entities, for example an Apache Avro record, to improve scan performance. Of course, this puts the onus of decoding the larger record on the application. 
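
As an illustration only (not code from any of the projects mentioned), here is a minimal Java sketch of the two options, using a hypothetical customer table with column family "d". The per-field version writes many small cells; the packed version writes one larger Avro-encoded cell that the application must decode itself on read.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CellPackingSketch {

  private static final byte[] CF = Bytes.toBytes("d");

  // Option 1: one small cell per data point. Maximum random-access flexibility,
  // but the fixed per-cell deserialization cost dominates during scans.
  static Put perFieldPut(String rowKey, String name, String city) {
    Put put = new Put(Bytes.toBytes(rowKey));
    put.add(CF, Bytes.toBytes("name"), Bytes.toBytes(name));
    put.add(CF, Bytes.toBytes("city"), Bytes.toBytes(city));
    return put;
  }

  // Option 2: pack the entity into one larger Avro-encoded cell. Fewer, bigger
  // cells scan faster, but the application has to decode the record itself.
  static Put packedPut(String rowKey, String name, String city) throws IOException {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("name", name);
    record.put("city", city);

    // Serialize the record to Avro binary form.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();

    Put put = new Put(Bytes.toBytes(rowKey));
    put.add(CF, Bytes.toBytes("record"), out.toByteArray());
    return put;
  }
}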

For HDFS, you do not have the freedom to access data based on specific keys, as HBase offers. Rather, you have to scan some amount of files to find the information you are looking for. To reduce the overall I/O, you typically partition the data so that specific directories contain data for a given selector. This is usually a column in the data that has a decent cardinality: not too many values, not too few. You want each partition to end up a few GB in size. Or, more generically, the partition size should not get you into the small-files problem mentioned previously. Ideally, you have one file in each directory that is N x the HDFS block size in length and is splittable for parallelism. 

Conversely, it is much easier to store as much data as you want in HDFS than it is in HBase. The latter has to maintain the aforementioned index of cell keys, and that is a cost factor: the index needs Java heap space, just as the small-files issue does. In the end, memory is a scarce resource that you can spend one way or another, a tradeoff that needs to be handled carefully. Data in HDFS can be read as fast as the I/O permits, while HBase has to do the additional work of reconstituting the cell structures (even if only at the lowest level). 

Write Amplification Woes

Both HDFS and HBase now face another issue: write amplification. If you update data in HDFS, you need to write a new delta file and eventually merge those into the augmented original, creating a new file with the updated data. The delta and original file are then deleted. 

That is pretty much exactly what HBase compactions do: rewrite files asynchronously to keep them fresh. Delta files here are the so-called flush files, and the original files are the older HFiles from previous flush or compaction runs. For Hive, this is implemented under HIVE-5317, though that work is aimed mostly at slow-changing data, while HBase also suits fast-changing data.

You can tune the I/O in HBase by changing the compaction settings. There are ways to compact more aggressively, causing more background I/O, or to tone down the compactions to leave more HFiles in HDFS and incur less “hidden” cost. The drawback of the latter is that you have to read data from more files, and if your use case does not allow you to do some sort of grouping of data, then reads will be slower and more memory is used to hold the block data. (In other words, you will see churn on the HBase block cache.)

With HDFS, you can do the same, but only manually. You need to create a temporary table, then UNION ALL the delta with the original file, and finally swap out the file. This needs to be automated using Oozie and run at the appropriate times. The overarching question is, when is this approach “good enough”?
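
As a rough sketch of that manual merge (the table names are hypothetical, and a real SCD merge would typically deduplicate rather than blindly append), the Impala or Hive statements might look like this:

-- Build a merged copy of the base table plus the accumulated delta.
CREATE TABLE sales_merged STORED AS PARQUET AS
  SELECT * FROM sales_base
  UNION ALL
  SELECT * FROM sales_delta;

-- Swap the merged table in for the original, then clean up.
DROP TABLE sales_base;
ALTER TABLE sales_merged RENAME TO sales_base;
DROP TABLE sales_delta;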

Appropriate Use Cases for Impala vs. HBase

So, having seen that storage is in the end just physics (you have to convert and ship data during reads and writes), how does that concept translate to Impala and HBase? The following diagram tries to gauge the various use cases:

  • Complete random access
  • Random access with scans
  • Hybrid of the above
  • High-throughput analytical scans

You can see that with Impala you get the best throughput; it simply has fewer moving parts, and its lower granularity (see the notes on Amdahl’s Law above) makes for the best I/O performance. HBase, in contrast, gives you the most freedom to access data randomly. In the middle, in the aforementioned trough, you have a choice. In practice, I start with the left-hand side (Impala first). Only when I have to guarantee random updates in addition to low latency do I tilt toward the HBase side. 

Single Source of Truth

So how on earth do you select across these choices? What advantages and disadvantages does each have?

One way to judge is the total cost of ownership (TCO): how often do you have to duplicate the data to make each tool work? A few examples:

  • Spark and Impala read data natively off HDFS, so no extra copy is needed. (Well, they might need one, depending on mixed use cases that require different storage formats to work most efficiently.)
  • Search and the NoSQL faction need a copy of some or all data. For Search you have to build separate indexes, which usually does not mean duplicating existing indexes but rather enabling indexing in the first place.
  • All the tools need memory to optimize recurring access to data. HDFS can cache data, HBase has the block cache, Search stores index structures, and Spark can pin intermediate data in memory for fast iterations. The more you spend, the more you get.
  • Some NoSQL solutions trade latency for efficiency; for example, Apache Cassandra dilutes the total cluster memory by the read factor (the R in “R + W > N” needed to achieve consistency), as it has to read blocks of data on that many servers. 

So cost, or really TCO, is an often-underestimated factor. Many decisions in organizations around the world have been ultimately made based on exactly that: it is not that one solution is better, much faster, or easier to manage than the other. It is simply the bottom line at work, and at a certain scale, the cost can be prohibitive. 

Copying data multiple times is an issue when you need that very data in various forms. This varies across use cases within the same framework (say, Parquet or Avro file formats with Impala) or APIs and access patterns (random access in HBase versus large scans in Impala/Spark, or full-text search in Solr). Disks are much more affordable than memory (by a factor of 400 or more), so in practice, we often opt for duplicating data on disk, but not in memory.

Recent efforts in HDFS also point to this conclusion: you can now pin hot datasets in memory, and in the future you will be able to share this read-only data between OS processes without any further copying. Impala and Spark, in particular, will benefit from that. The use cases for HBase and Search are still quite distinct from the other two, but HBase snapshots, and the ability to read them directly from HDFS, show that there is a connection, even if it is still in the form of a rather wobbly suspension bridge.

NoSQL Defiance

So where does NoSQL sit in the Hadoop world? It certainly is going strong. But Impala, Spark, and Search are cutting into the data cake with a vengeance. They close the gap toward the stretch area where NoSQL is out of its comfort zone and batch processing is simply too slow. We now span the bridge from batch, to near real-time, to interactive data access. Doing the math on how “expensive” (in comparison), flexible, and fast each option is will guide you toward the proper selection for you.

Lars George is Cloudera’s EMEA Chief Architect, an HBase committer and PMC member, and the author of O’Reilly’s HBase: The Definitive Guide.

Categories: Hadoop

How-to: Do Near-Real Time Sessionization with Spark Streaming and Apache Hadoop

Cloudera Blog - Mon, 11/03/2014 - 17:34

This Spark Streaming use case is a great example of how near-real-time processing can be brought to Hadoop.

Spark Streaming is one of the most interesting components within the Apache Spark stack. With Spark Streaming, you can create data pipelines that process streamed data using the same API that you use for processing batch-loaded data. Furthermore, Spark Streaming’s “micro-batching” approach provides decent resiliency should a job fail for some reason.

In this post, I will demonstrate and walk you through some common and advanced Spark Streaming functionality via a use case: doing near-real-time sessionization of Website events, loading stats about that activity into Apache HBase, and then populating graphs in your preferred BI tool for analysis. (Sessionization refers to the capture of all clickstream activity within the timeframe of a single visitor’s Website session.) You can find the code for this demo here.

A system like this one can be super-useful for understanding visitor behavior (whether human or machine). With some additional work, it can also be designed to contain windowing patterns for detecting possible fraud in an asynchronous manner.

Spark Streaming Code

The main class to look at in our example is:

com.cloudera.sa.example.sparkstreaming.sessionization.SessionizeData

Let’s look at this code in sections (ignoring lines 1-59, which contain imports and other uninteresting stuff).

Lines 60 to 112: Setting up Spark Streaming

These lines are our pretty basic start for Spark Streaming with an option to receive data from HDFS or a socket. I’ve added some verbose comments to help you understand the code if you are new to Spark Streaming. (I’m not going to go into great detail here because we’re still in the boilerplate-code zone.)

//This is just creating a Spark Config object. I don't do much here but
//add the app name. There are tons of options to put into the Spark config,
//but none are needed for this simple example.
val sparkConf = new SparkConf().
  setAppName("SessionizeData " + args(0)).
  set("spark.cleaner.ttl", "120000")

//These two lines will get us our SparkContext and our StreamingContext.
//These objects have all the root functionality we need to get started.
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))

//Here we are loading our HBase Configuration object. This will have
//all the information needed to connect to our HBase cluster.
//There is nothing different here from when you normally interact with HBase.
val conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

//This is a HBaseContext object. This is a nice abstraction that will hide
//any complex HBase stuff from us so we can focus on our business case.
//HBaseContext is from the SparkOnHBase project, which can be found at
// https://github.com/tmalaska/SparkOnHBase
val hbaseContext = new HBaseContext(sc, conf);

//This creates a reference to our root DStream. DStreams are like RDDs but
//with the context of being in the micro-batch world. I set this to null now
//because I later give the option of populating this data from HDFS or from
//a socket. There is no reason this could not also be populated by Kafka,
//Flume, an MQ system, or anything else. I just focused on these because
//they are the easiest to set up.
var lines: DStream[String] = null

//Options for data load. Will be adding Kafka and Flume at some point.
if (args(0).equals("socket")) {
  val host = args(FIXED_ARGS);
  val port = args(FIXED_ARGS + 1);

  println("host:" + host)
  println("port:" + Integer.parseInt(port))

  //Simple example of how you set up a receiver from a socket stream
  lines = ssc.socketTextStream(host, port.toInt)
} else if (args(0).equals("newFile")) {
  val directory = args(FIXED_ARGS)
  println("directory:" + directory)

  //Simple example of how you set up a receiver from a HDFS folder
  lines = ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t: Path) => true, true).map(_._2.toString)
} else {
  throw new RuntimeException("bad input type")
}

Lines 114 to 124: String Parsing

Here’s where the Streaming with Spark begins. Look at the following four lines:

val ipKeyLines = lines.map[(String, (Long, Long, String))](eventRecord => {
  //Get the time and ip address out of the original event
  val time = dateFormat.parse(
    eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']'))).
    getTime()
  val ipAddress = eventRecord.substring(0, eventRecord.indexOf(' '))

  //We return the time twice because we will use the first as the start time
  //and the second as the end time
  (ipAddress, (time, time, eventRecord))
})

The first command above is doing a map function on the “lines” DStream object and parsing the original events to separate out the IP address, timestamp, and event body. For those new to Spark Streaming, a DStream holds a batch of records to be processed. These records are populated by the receiver object, which was defined previously, and this map function produces another DStream within this micro-batch to store the transformed records for additional processing.

There are a couple of things to note when looking at a Spark Streaming diagram like the one above:

  • Each micro-batch is fired after the number of seconds defined when constructing your StreamingContext
  • The receiver is always populating the future RDDs for the next micro-batch
  • Older RDDs from past micro-batches will be cleaned up and discarded

Lines 126 to 135: Making Sessions

Now that we have the IP addresses and times broken out from the web log, it’s time to build sessions. The following code does the session building by first clumping events within the micro-batch, and then reducing those clumps with sessions in the stateful DStream.  

val latestSessionInfo = ipKeyLines.
  map[(String, (Long, Long, Long))](a => {
    //transform to (ipAddress, (time, time, counter))
    (a._1, (a._2._1, a._2._2, 1))
  }).
  reduceByKey((a, b) => {
    //transform to (ipAddress, (lowestStartTime, MaxFinishTime, sumOfCounter))
    (Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3)
  }).
  updateStateByKey(updateStatbyOfSessions)

Here’s an example of how the records will be reduced within the micro-batch:  

With the session ranges joined within the micro-batch, we can use the super-cool updateStateByKey functionality, which will do a join/reduce-like operation with a DStream from the micro-batch before the active one. The diagram below illustrates how this process looks in terms of DStreams over time.

Now let’s dig into the updateStatbyOfSessions function, which is defined at the bottom of the file. This code (note the verbose comments) contains a lot of the magic that makes sessionization happen in a micro-batch continuous mode.  

/**
 * This function will be called for the union of keys in the Reduce DStream
 * with the active sessions from the last micro-batch, with the ipAddress
 * being the key.
 *
 * The goal is to produce a stateful RDD that has all the active
 * sessions. So we add new sessions, remove sessions that have timed
 * out, and extend sessions that are still going.
 */
def updateStatbyOfSessions(
    //(sessionStartTime, sessionFinishTime, countOfEvents)
    a: Seq[(Long, Long, Long)],
    //(sessionStartTime, sessionFinishTime, countOfEvents, isNewSession)
    b: Option[(Long, Long, Long, Boolean)]
  ): Option[(Long, Long, Long, Boolean)] = {

  //This function returns an Option value.
  //If we want to delete the value we can return None.

  //The value contains four parts:
  //(startTime, endTime, countOfEvents, isNewSession)
  var result: Option[(Long, Long, Long, Boolean)] = null

  //These if statements say that if we didn't get a new event for
  //this session's ip address for longer than the session
  //timeout + the batch time, then it is safe to remove this key value
  //from the future stateful DStream.
  if (a.size == 0) {
    if (System.currentTimeMillis() - b.get._2 > SESSION_TIMEOUT + 11000) {
      result = None
    } else {
      if (b.get._4 == false) {
        result = b
      } else {
        result = Some((b.get._1, b.get._2, b.get._3, false))
      }
    }
  }

  //Because we used the reduce function before this function, we are
  //only ever going to get at most one event in the Sequence.
  a.foreach(c => {
    if (b.isEmpty) {
      //If there was no value in the stateful DStream then just add it
      //as new, with true for being a new session.
      result = Some((c._1, c._2, c._3, true))
    } else {
      if (c._1 - b.get._2 < SESSION_TIMEOUT) {
        //If the session from the stateful DStream has not timed out,
        //then extend the session.
        result = Some((
          Math.min(c._1, b.get._1), //newStartTime
          Math.max(c._2, b.get._2), //newFinishTime
          b.get._3 + c._3,          //newSumOfEvents
          false                     //This is not a new session
        ))
      } else {
        //Otherwise replace the old session with a new one.
        result = Some((
          c._1,     //newStartTime
          c._2,     //newFinishTime
          b.get._3, //newSumOfEvents
          true      //new session
        ))
      }
    }
  })
  result
}

There’s a lot going on in this code, and in many ways, it’s the most complex part of the whole job. To summarize, it tracks active sessions so you know if you are continuing an existing session or starting a new one.

Lines 126 to 207: Counting and HBase

This section is where most of the counting happens. There is a lot of repetition here, so let’s walk through just one count example and then the steps that will allow us to put the generated counts in the same record for storage in HBase.  

val onlyActiveSessions = latestSessionInfo.filter(t => System.currentTimeMillis() - t._2._2 < SESSION_TIMEOUT)
…
val newSessionCount = onlyActiveSessions.filter(t => {
    //is the session newer than the last micro-batch,
    //and is the boolean saying this is a new session true
    (System.currentTimeMillis() - t._2._2 < 11000 && t._2._4)
  }).
  count.
  map[HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))

In short, the code above is filtering all but the active sessions, counting them, and putting that final count record into a single entity HashMap. It uses the HashMap as a container, so we can call the following reduce function after all the counts are done to put them all into a single record. (I’m sure there are better ways to do that, but this approach works just fine.)

Next, the following code takes all those HashMaps and puts all their values in one HashMap.

val allCounts = newSessionCount.
  union(totalSessionCount).
  union(totals).
  union(totalEventsCount).
  union(deadSessionsCount).
  union(totalSessionEventCount).
  reduce((a, b) => b ++ a)

Interacting with HBase through Spark Streaming is super simple with HBaseContext. All you have to do is supply the DStream containing the HashMaps and a function to convert each one to a Put object. 

hbaseContext.streamBulkPut[HashMap[String, Long]](
  allCounts,   //The input DStream of count records
  hTableName,  //The name of the table we want to put to
  (t) => {
    //Here we are converting our input record into a put.
    //The rowKey is C for Count plus a backward-counting time so the newest
    //count shows up first in HBase's sorted order.
    val put = new Put(Bytes.toBytes("C." + (Long.MaxValue - System.currentTimeMillis())))
    //We iterate through the HashMap to make all the columns with their counts.
    t.foreach(kv => put.add(Bytes.toBytes(hFamily), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2.toString)))
    put
  },
  false)

Now, with this information in HBase, you can wrap it in an Apache Hive table and then execute a query through your favorite BI tool to get graphs like the following, which will refresh on every micro-batch.
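
For example, a Hive external table over the counts could be declared roughly as follows. The HBase table name, column family, and qualifiers here are hypothetical stand-ins for the values the job actually uses (hTableName, hFamily, and the count column names such as NEW_SESSION_COUNTS).

-- Counts were written as strings by the job, so STRING columns are used here.
CREATE EXTERNAL TABLE session_counts (
  row_key STRING,
  new_sessions STRING,
  total_sessions STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,c:ns,c:ts")
TBLPROPERTIES ("hbase.table.name" = "sessionCounts");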

Lines 209 to 215: Writing to HDFS

The final task is to join the active session information with the event data and then persist the events to HDFS with the starting time of the session.  

//Persist to HDFS
ipKeyLines.join(onlyActiveSessions).
  map(t => {
    //Session root start time | Event message
    dateFormat.format(new Date(t._2._2._1)) + "\t" + t._2._1._3
  }).
  saveAsTextFiles(outputDir + "/session", "txt")

Conclusion

I hope you come away from this example feeling like a lot of work was done with just a little bit of code, because it was. Imagine what else you can do with this pattern and the ability to interact with HBase and HDFS so easily within Spark Streaming.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Apache Spark, and a co-author of the O’Reilly book, Hadoop Application Architectures.
 
 

Categories: Hadoop

Inside Cloudera Director

Cloudera Blog - Thu, 10/30/2014 - 16:49

With Cloudera Director, cloud deployments of Apache Hadoop are now as enterprise-ready as on-premise ones. Here’s the technology behind it.

As part of the recent Cloudera Enterprise 5.2 release, we unveiled Cloudera Director, a new product that delivers enterprise-class, self-service interaction with Hadoop clusters in cloud environments. (Cloudera Director is free to download and use, but commercial support requires a Cloudera Enterprise subscription.) It provides a centralized administrative view for cloud deployments and lets end users provision and scale clusters themselves using automated, repeatable, managed processes. To summarize, the same enterprise-grade capabilities that are available with on-premise deployments are now also available for cloud deployments. (For an overview of and motivation for Cloudera Director, please check out this blog post.)

In this post, you’ll learn about some of the technology behind Cloudera Director and why one would use it.

Data Model

From the outset, Cloudera Director was designed to be cloud-neutral, which translates to support for different cloud providers (as well as both private and public clouds). Its data model is therefore abstracted away from the specific architecture of any single provider. Here are some of the key concepts in that model.

  • Environment – An environment maps to a cloud provider. Each environment has a unique name that you provide, and can contain configuration data and SSH credentials specific to your provider account. Today, Cloudera Director supports Amazon Web Services (AWS), but support for more providers is planned for the future.
  • Instance – An instance represents a computing resource that you provision from the cloud provider. An instance is generated from an instance template, which gives specifications such as memory size, storage capacity and type, and operating system. Under AWS, for example, details such as the EC2 instance type and AMI describe the instance.
  • Deployment – A deployment maps to an instance of Cloudera Manager, Cloudera’s management application for Hadoop and enterprise data hubs. A deployment is hosted by an environment and resides on an instance that you specify.
  • Cluster – A cluster defines the instances that run the components of your enterprise data hub, such as HDFS, YARN, Apache HBase, Apache Hive, and Impala. Each cluster that you define is created and managed by a deployment. So, after Cloudera Director has readied a cluster, Cloudera Manager capabilities such as monitoring, security configuration, and auditing are available right away. You may host multiple clusters under a single deployment.

Cloudera Director works as the interface to your cloud provider (environment) by working with the provider-specific API to create, replicate, and terminate deployments and clusters. This approach lets you interact with cloud-hosted clusters just as you would with on-premise clusters, while benefiting from the advantages of cloud computing.

Server API

Cloudera Director includes a server component that you can use as a central location for your administrators and users to manage cloud deployments. The server is designed around an API that provides access to the complete set of capabilities Cloudera Director has to offer.

The Cloudera Director API is designed using RESTful principles, using JSON as a data interchange format. Service requests and responses are served over HTTP, with TLS as an option. Documentation on the API, generated using Swagger, is hosted on the server itself. The Swagger API console includes live forms that developers can use to explore, design, and troubleshoot their work.

Clients can interact with the API to manage environments, deployments, clusters, instance templates, and users of Cloudera Director. Calls are made to create, read, update, and delete each of these items, and Cloudera Director handles the details.

By default, the server enforces user authentication and authorization using a simple internal user database. Clients can use HTTP basic authentication or access a specific “login” service to authenticate and go on to make more calls. The API itself gives administrators access to tailor user accounts and add new ones to fit their needs.

User Interface

The Cloudera Director server hosts a UI available through your browser. The UI dashboard shows you at a glance the set of environments that are available and a list of deployments (Cloudera Manager instances) and clusters managed by each deployment. From the dashboard, you can perform many of the same actions that are available through the API.

In fact, a key design feature of the UI is that it relies completely on the API to work. That means that developers can be sure they have access to the full range of capabilities in Cloudera Director when they code to the API. It also ensures that users working through the UI and the API are acting on the same cluster information.

The UI does offer some features over and above the API that make working with Cloudera Director easier:

  • Wizards are integrated into the UI to make the process of defining new environments, deployments, and clusters straightforward.
  • Special wizards are also available for the higher-level tasks of adding compute nodes to a cluster (“growing” a cluster) and cloning a cluster from an existing one.
  • The UI performs some client-side validations to help guide users.

Client

In addition to the server component, Cloudera Director provides a client tool. The client provides many of the same capabilities as the server but in a standalone form; you can use the client to stand up, check status on, update, and terminate clusters. The client is a good choice for integrating with scripts, build servers, and other automated tools.

The client gets its cluster definitions from a configuration file written using the HOCON data format, which is based on JSON. The configuration file is a blueprint that completely describes the environment, deployment, and instances for a cluster. Because the configuration file is plain text, it is amenable to being stored in version control systems for tracking and control purposes.

While the client can be used on its own without the server, it can also ask the server to stand up a new cluster using its bootstrap-remote command. As with the user interface, the client uses the server API for this operation, meaning that the new deployment information is integrated into the server.

Conclusion

The diagram below illustrates how components in Cloudera Director interact. While AWS is the cloud provider shown, the picture would look very similar with any other cloud provider.

As you have now learned, Cloudera Director is built on a solid technical foundation for managing the enterprise data hub on your cloud provider. Future releases will build on this foundation with new and expanded features, including:

  • Support for more cloud providers
  • Expanded automation of typical deployment setups
  • More self-service capabilities
  • Pre-built API clients for various programming languages

To try out Cloudera Director, download it for free or try the Cloudera AWS Quick Start to get started right away.

Bill Havanki is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Write Apache Hadoop Applications on OpenShift with Kite SDK

Cloudera Blog - Mon, 10/27/2014 - 16:17

The combination of OpenShift and Kite SDK turns out to be an effective one for developing and testing Apache Hadoop applications.

At Cloudera, our engineers develop a variety of applications on top of Hadoop to solve our own data needs (here and here). More recently, we’ve started to look at streamlining our development process by using a PaaS (Platform-as-a-Service) for some of these applications. Having single-click deployment and updates to consistent development environments lets us onboard new developers more quickly, and helps ensure that code is written and tested along patterns that will ensure high quality.

The PaaS we’ve chosen is Red Hat OpenShift. OpenShift is an open hybrid solution, which means it’s open source and can be either self-hosted or accessed as a hosted solution. To date, we’ve been able to prototype our examples in the hosted solution, but are moving toward self-hosting on our internal OpenStack infrastructure.

Kite: An API for Data

The Kite SDK is an open source toolkit for building Hadoop applications. The data module provides APIs and tools for working with datasets without the developer having to know about low-level details such as file formats, layout on disk, and so on. Kite provides two main implementations of the dataset API: one for Apache Hive (where data is stored in HDFS) and one for Apache HBase.

We have written two Kite applications that run on OpenShift. The first (the Kite OpenShift Logging Example) uses a Hive dataset, and the second (the Kite OpenShift Spring HBase Example) uses an HBase dataset.

In the remainder of this post, we’ll describe the development lifecycle for these applications. To learn more about using the Kite APIs and tools to write Hadoop applications, consult the Kite SDK documentation, the Kite Examples, and the Kite OpenShift Example code (referenced in the previous paragraph).

Development Miniclusters

One of the challenges of developing Hadoop applications is the slow development cycle caused by the overhead of configuring a Hadoop cluster on which to do testing. For example, setting up a data pipeline involves a lot of manual steps such as configuring Apache Flume agents and setting up HDFS permissions. Running a Hadoop cluster in a local VM (like Cloudera’s QuickStart VM) is a lot more convenient, but there are still multiple configuration steps to run through.

The approach that projects in the Hadoop ecosystem take for internal tests is to use miniclusters—lightweight clusters that are fully functional yet run within the same JVM as the test code. They start up quickly and can be configured programmatically. However, being an internal Hadoop test artifact, miniclusters aren’t very approachable for developers who want to write applications that run on Hadoop. In addition, they are not easy to use together, so hooking up a Hive minicluster to an HDFS minicluster, for example, requires some knowledge of the internals of both.

We decided that it would be very useful to have a cohesive minicluster that can run multiple services simultaneously, so we set about wiring the various component miniclusters together. At the time of this writing, our minicluster can run services for HDFS, Apache ZooKeeper, HBase, Hive (both Metastore and HiveServer2), and Flume.

Starting the minicluster using the Java API is straightforward and uses a builder pattern:

MiniCluster miniCluster = new MiniCluster.Builder()
  .addService(HdfsService.class)
  .addService(HiveService.class)
  .addService(FlumeService.class)
    .flumeConfiguration("resource:flume.properties")
    .flumeAgentName("tier1")
  .clean(true).build();
miniCluster.start();

This snippet starts a minicluster running HDFS, Hive, and Flume. The Flume agent called tier1 is configured using the flume.properties file from the classpath.

The HDFS and Hive services do not need any configuration by default, but it is easy to configure them by placing the standard hdfs-site.xml and hive-site.xml files on the classpath.

A minicluster stores its working state in a local directory (/tmp/kite-minicluster by default, overridden via the workDir() method). The clean() method on the builder instructs the minicluster to start afresh by clearing the working directory before starting.
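
For example, building on the snippet above and using the workDir() and clean() builder methods just described (the directory path here is only an illustration), you could relocate the working state and force a fresh start like this:

// A small variation on the earlier example; the path is hypothetical.
MiniCluster miniCluster = new MiniCluster.Builder()
    .workDir("/data/kite-minicluster")  // instead of the default /tmp/kite-minicluster
    .clean(true)                        // wipe the working directory before starting
    .addService(HdfsService.class)
    .addService(HiveService.class)
    .build();
miniCluster.start();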

You can see an example of using a minicluster embedded in a web app in the Kite OpenShift Logging Example. The web app uses a ServletContextListener to start and stop the minicluster as a part of the webapp’s lifecycle. The main part of the web app uses Kite’s dataset API to write data into Hadoop using Flume and then displays the data on a web page using the Hive JDBC driver.

The Minicluster CLI

Kite has a CLI for running the minicluster, which can be useful for quickly spinning up a cluster locally during development. The CLI is also used by the OpenShift Hadoop cartridge described below.

The following CLI invocation is equivalent to the Java snippet above:

./kite-minicluster run hdfs hive flume \
  --flume-configuration /path/to/flume.properties \
  --flume-agent-name tier1 \
  --clean

The Minicluster on OpenShift

A PaaS provides the infrastructure required for applications to run. This could include web frameworks, databases, monitoring services, and more. In the OpenShift world, the container through which these components are deployed is called a cartridge. To allow a database to be deployed to OpenShift, a cartridge must exist that is capable of installing, configuring, and running that database. The Kite GitHub repository now includes an OpenShift Minicluster cartridge.

Using the Kite minicluster cartridge is a great alternative to embedding the minicluster in an application. One large benefit is faster deployment times of the application, which is crucial for maintaining an agile development workflow. Another benefit is that you can run multiple applications without having dependencies on the application running the embedded minicluster.

You can see an example Spring MVC web application using the Kite minicluster cartridge deployed to OpenShift. The application has a Maven build profile called openshift that will set the connection settings to environment variable values that the Kite minicluster cartridge exports on the OpenShift system. The OpenShift application deployment workflow will build the application locally using the openshift build profile by default.

The biggest challenge in getting the minicluster to run on OpenShift was making it compatible with the IP and port binding restrictions that OpenShift puts in place. Applications running in OpenShift can only bind to a private IP address that can be found in an environment variable on the system; binding to the wildcard address (0.0.0.0) or localhost (127.0.0.1) produces a “permission denied” error. The various component miniclusters that the Kite minicluster wraps are sometimes hardcoded with configurations that can’t be overridden, and the IP address to which they bind is one such hardcoded configuration that proved problematic for running in OpenShift.

We were able to get around these restrictions with a variety of techniques. One includes creating a Hadoop configuration implementation that forces the IP bind configuration to be the value we set it to, even when the internal minicluster code tries to override it. In other cases we had to re-implement some of the component minicluster code.

After running the minicluster with the command ./kite-minicluster run hdfs zk hbase -b 192.168.0.135, you will find that all sockets opened for listening in the process will be bound to the IP 192.168.0.135.

Trying the Examples on OpenShift

To try out the web example in OpenShift, you first need to sign up for an OpenShift account and install the client tools. Due to the resource requirements of the minicluster, you need to have an account that allows you to run larger instances. (This is not possible with the free account.) Next, create a web application called “snapshot” with the following commands:

$ rhc app create snapshot https://raw.githubusercontent.com/kite-sdk/kite-minicluster-openshift-cartridge/master/metadata/manifest.yml jbossews-2.0 -g large
$ rhc cartridge storage jbossews-2.0 --app snapshot --set 4

The app create command creates a barebones web application with the Kite minicluster running as a cartridge for the web application to access. It will also create a git repository on your local host in the directory from which you ran the command. When you add and push your web application code to this git repository, it will be deployed to the environment. The cartridge storage command increases the amount of storage space to which the web application has access; it needs to be raised so there is enough space to pull in the dependencies at build time.

Next, add your code to the repository and push it to the server. Do that with the following commands:

$ cd snapshot
$ git remote add upstream -m master \
    git://github.com/kite-sdk/kite-spring-hbase-example.git
$ git pull -s recursive -X theirs upstream master
$ git push origin master

First move into the git repository, and then add the kite-spring-hbase-example repository as an upstream remote repo. This step will allow you to merge that code into the empty repo with the git pull command. Finally, push that code to the OpenShift server, which will cause OpenShift to build and deploy the code you pushed.

Those of you who have experience with OpenShift may be wondering why you can’t just add the URL of the example repository to the app create command in step 1, which would tell OpenShift to use that repository as the base. The answer to that question is that Kite and Hadoop have many dependencies that need to be downloaded by Maven when building, and the app create command has a timeout of ~230 seconds. That isn’t enough time to download all the dependencies when the application is built for the first time, causing the command to fail. So, this limitation necessitates the workaround of adding the code to the empty repository as explained above.

Conclusion

The two examples that are compatible with OpenShift (logging and web) have directions in their project READMEs that detail how to deploy them. These examples are a great way to get started with both Kite and OpenShift, and provide a solid foundation on which you can build your own applications.

Until now, the routine steps for beginning development on top of the Hadoop stack required either an existing (and properly configured) Hadoop cluster or the installation of a VM on a developer’s local workstation. Either approach takes both resources and time to bootstrap properly. The combination of Kite’s minicluster and OpenShift provides a much lower barrier to entry to developing your own applications on top of this powerful stack.

Adam Warrington is an Engineering Manager on the customer operations team at Cloudera.

Tom White is a Software Engineer at Cloudera, a committer/PMC member for Apache Hadoop and multiple other Hadoop ecosystem projects, and the author of the popular O’Reilly Media book, Hadoop: The Definitive Guide.

Categories: Hadoop

New in CDH 5.2: New Security App and More in Hue

Cloudera Blog - Fri, 10/24/2014 - 16:49

Thanks to new improvements in Hue, CDH 5.2 offers the best GUI yet for using Hadoop.

CDH 5.2 includes important new usability functionality via Hue, the open source GUI that makes Apache Hadoop easy to use. In addition to shipping a brand-new app for managing security permissions, this release is particularly feature-packed, and is becoming a great complement to BI tools from Cloudera partners like Tableau, MicroStrategy, and Zoomdata because a more usable Hadoop translates into better BI overall across your organization! 

In the rest of this post, we’ll offer an overview of these improvements.

Security

To support the growth of the Apache Sentry (incubating) project and make it easier to secure your cluster, CDH 5.2 Hue contains a new Security app. Sentry privileges determine which Apache Hive/Impala databases and tables a user can see or modify. The Security app lets you create/edit/delete roles and privileges directly from your browser (there is no sentry-provider.ini file to edit anymore), and to take advantage of Sentry’s new WITH GRANT OPTION functionality. (Learn more here.)

Search

Hue’s initial Search dashboards introduced new ways to quickly explore a lot of data by dragging & dropping graphical widgets and leveraging Apache Solr capabilities. In CDH 5.2, the new application is greatly improved. For example, the top menu bar is re-organized to split widgets displaying records or facets, new Heatmap and Tree widgets let you explore data in 2D or n-Dimensions, the new Marker Map is great for automatically plotting the result rows on a leaflet, and index fields can now have their terms and stats retrieved in a single click. (Learn more here.) 

Apache Oozie

Hue’s Oozie dashboard got a few improvements to make Oozie job management less tedious: page display is faster, you can now suspend/kill/resume jobs and rerun failed coordinator actions in bulk, and there’s a new Metrics feature. (Learn more here.)

File Browser

A lot of exciting work has been done on File Browser to provide the best user experience possible, including:

  • Drag & drop uploading
  • Quick links to most recently used paths (up to 10 most recent)
  • Cleaner, more streamlined actions bar
  • New Actions Context menu
  • Improved Copy/Move Modals

and more.

Query Editors

The query editors for Impala and Hive are now more secure and convenient to use thanks to new support for LDAP passthrough, SSL encryption with HiveServer2, and automatic query timeout. Plus, there are pretty new graphs.

And Don’t Forget…

Kerberos support in the Apache HBase app, Solr Indexer improvements (picks up the Apache ZooKeeper config and gives a hint if pointing to the wrong Solr), a “kill application button” for YARN in Job Browser, and more SDK functionality for building your own apps.

Look for even more in the next CDH release!

 

Categories: Hadoop

New in CDH 5.2: Impala Authentication with LDAP and Kerberos

Cloudera Blog - Thu, 10/23/2014 - 16:21

Impala authentication can now be handled by a combination of LDAP and Kerberos. Here’s why, and how.

Impala, the open source analytic database for Apache Hadoop, supports authentication—the act of proving you are who you say you are—using both Kerberos and LDAP. Kerberos has been supported since release 1.0, LDAP support was added more recently, and with CDH 5.2, you can use both at the same time.

Using LDAP and Kerberos together provides significant value; Kerberos remains the core authentication protocol and is always used when Impala daemons connect to each other and to the Hadoop cluster. However, Kerberos can require more maintenance to support. LDAP is ubiquitous across the enterprise and is commonly utilized by client applications connecting to Impala via ODBC and JDBC drivers. A mix of the two therefore frequently makes sense.

This table demonstrates the various combinations and their use cases:

In this post, I’ll explain why and how to set-up Impala authentication using a combination of LDAP and Kerberos.

Kerberos

Kerberos remains the primary authentication mechanism for Apache Hadoop. A little Kerberos terminology will help with the discussion to follow.

  • A principal is some Kerberos entity, like a person or a daemon process. For our purposes, a principal looks like name/hostname@realm for daemon processes, or just name@realm for users.
  • The name field can be associated with a process, like “impala”, or it can be a username, like “myoder”.
  • The hostname field can be the fully qualified name of the machine, or the Hadoop-specific magic _HOST string, which is auto-replaced with the fully qualified hostname.
  • The realm is similar to (but not necessarily the same as) a DNS domain. 

Kerberos principals can prove that they are who they say that they are by either supplying a password (if the principal is a human) or by providing a “keytab” file. Impala daemons need a keytab file, which must be well protected: anyone who can read that keytab file can impersonate the Impala daemons.
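
A couple of standard commands help here (the paths and principal names below are hypothetical): klist can confirm which principals a keytab actually contains, and restrictive file permissions keep it protected.

# List the principals (and key timestamps) stored in the keytab
klist -kt /etc/impala/conf/impala.keytab
# Restrict the keytab to the user that runs the Impala daemons
chown impala:impala /etc/impala/conf/impala.keytab
chmod 400 /etc/impala/conf/impala.keytab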

Basic Kerberos support in Impala is straightforward: supply the following arguments, and the daemons will use the given principal and the keys in the keytab file to take on the identity of that principal for all communication.

  • --principal=impala/hostname@realm and
  • --keytab_file=/full/path/to/keytab

There is another wrinkle if the Impala daemon (impalad) is sitting behind a load balancer. When the clients running queries go through the load balancer (a proxy) the client is expecting the impalad to have a principal that’s the same as the name of the load balancer. So the impalad has to use a principal matching the name of the proxy when it services these external queries, but will need to use a principal matching its actual host name when doing back-end communication between daemons. The new flags to the impalad in this case would be:

  • --principal=impala/proxy-hostname@realm
  • --be_principal=impala/actual-hostname@realm
  • --keytab_file=/full/path/to/keytab

The first --principal specifies what principal to use when the impalad services external queries, and the --be_principal specifies the principal for when the impalad is doing back-end communication. Keys for both of these principals must reside in the same keytab file.

Debugging Kerberos

Kerberos is an elegant protocol, but practical implementations are not always very helpful if something goes wrong. The top two things to check in case of failure are:

  • Time. Kerberos is dependent on synchronized clocks, so it is a best practice to install and use NTP (the Network Time Protocol) on all machines dealing with Kerberos.
  • DNS. Make sure that your hostnames are fully qualified and that forward (name->IP) and reverse (IP->name) DNS lookups are correct.

Beyond that, it is possible to set two environment variables that will give you Kerberos debugging information. The output may be a little overwhelming, but frequently it will point the way to a solution.

  • KRB5_TRACE=/full/path/to/trace/output.log: This environment variable will instruct all Kerberos clients and utilities to print debug output to the named file.
  • JAVA_TOOL_OPTIONS=-Dsun.security.krb5.debug=true: This environment variable is passed to the impala daemons, which in turn pass it to the internal java component.

In CDH 5.2 and later you can also supply the --krb5_debug_file parameter, which will turn on Kerberos debugging and write the output to the given file. You can supply it in Cloudera Manager via the Impala Configuration “Service-Wide” -> “Advanced” -> “Impala Command Line Argument Advanced Configuration Snippet” parameters. (Environment variables like those above can be supplied in the adjacent “Impala Service Environment Advanced Configuration Snippet” parameters.) It also goes without saying that Google is your friend when debugging problems with Kerberos.
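
For example, to capture a trace of a manual login attempt using the same keytab the daemons use (the principal, realm, and paths below are hypothetical):

# Write a detailed Kerberos trace while obtaining a ticket from the keytab
KRB5_TRACE=/tmp/krb5_trace.log kinit -kt /etc/impala/conf/impala.keytab impala/host01.example.com@EXAMPLE.COM
# Check the resulting ticket, or read the trace if the kinit failed
klist
less /tmp/krb5_trace.log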

Kerberos Flags

The Cloudera documentation for Kerberos and Impala covers this in greater detail, but these are the basic flags:

LDAP

Kerberos is great, but it does require that the end user have a valid Kerberos credential, which is not practical in many environments—because every user who interacts with Impala and the Hadoop cluster must have a Kerberos principal configured. For organizations that use Active Directory to manage user accounts, it can be onerous to create corresponding user accounts for each user in an MIT Kerberos realm. Many corporate environments use the LDAP protocol instead, where clients authenticate themselves using their username and password.

When configured to use LDAP, think of the impalad as an LDAP proxy: the client (the Impala shell, ODBC, JDBC, Hue, and so on) sends its username and password to the impalad, and the impalad takes the username and password and sends them to the LDAP server in an attempt to log in. In LDAP terminology, the impalad issues an LDAP "bind" operation. If the LDAP server returns success for the login attempt, the impalad accepts the connection.

LDAP is only used to authenticate external clients, such as the Impala shell, ODBC, JDBC, and Hue. All other back-end authentication is handled by Kerberos.

LDAP Configurations

LDAP is complicated (and powerful) because it is so flexible; there are many ways to configure LDAP entities and authenticate those entities. In general, every person in LDAP has a Distinguished Name, or DN, which can be considered the username or principal according to LDAP. 

Let’s examine how users are set up for two different LDAP servers.  The first user is named "Test1 Person" and resides in Windows 2008 Active Directory.

# Test1 Person, Users, ad.sec.cloudera.com
dn: CN=Test1 Person,CN=Users,DC=ad,DC=sec,DC=cloudera,DC=com
cn: Test1 Person
sAMAccountName: test1
userPrincipalName: test1@ad.sec.cloudera.com

The second is me: the entry for user myoder, residing in an OpenLDAP server:

# myoder, People, cloudera.com
dn: uid=myoder,ou=People,dc=cloudera,dc=com
cn: Michael Yoder
uid: myoder
homeDirectory: /home/myoder

Many items have been removed from the above for simplicity. Let’s note some of the similarities and differences in these two accounts:

  • DN: The first line after the comment is the DN. This is the primary identifying string for one LDAP account. The name starts out specific (CN=Test1 Person and uid=myoder) and works outward to the more general; DC=cloudera,DC=com corresponds to cloudera.com. The two are quite different: the AD entry has a human name in the first field (CN=Test1 Person), while the OpenLDAP entry has an account username (uid=myoder).
  • CN: The Common Name. For AD, it’s the same as in the DN; for OpenLDAP it’s the human name, which is not the uid in the DN.
  • sAMAccountName: This AD-only entry is a legacy form of a username. Despite being deprecated it is widely used and documented.
  • userPrincipalName: This AD-only entry, by convention, should map to the user’s email name. It will usually look like this: sAMAccountName@fully.qualified.domain.com. This is the modern Active Directory username and is widely used.

There is an additional interesting implementation detail about AD. Normally, authentication in LDAP is based on the DN. With AD, several items are tried in turn:

  • First, the DN
  • userPrincipalName
  • sAMAccountName + "@" + the DNS domain name
  • Netbios domain name + "\" + the sAMAccountName
  • And several other somewhat more obscure mechanisms (see the link above)

LDAP and the Impalad

Given all these differences, it is fortunate that the Impala daemon provides several mechanisms to address the variety of LDAP configurations out there. First, let’s start simple:

  • --enable_ldap_auth must be set, and
  • --ldap_uri=ldap://ldapserver.your.company.com needs to be specified.

With just those set, the username given to the impalad (by the Impala shell, JDBC, ODBC, and so on) is passed straight through to the LDAP server unaltered. This approach works well for AD if the user name is fully qualified, like test1@ad.sec.cloudera.com: it will match either the userPrincipalName or the sAMAccountName plus the DNS domain name.

It’s also possible to set up the impalad so that the domain (ad.sec.cloudera.com in this case) is automatically added to the username, by setting --ldap_domain=ad.sec.cloudera.com as an argument to the impalad. Now when a client username like "test1" comes in, the impalad appends that domain name so that the result passed to AD becomes test1@ad.sec.cloudera.com. This behavior can be a convenience for your users.
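Putting the Active Directory case together, the relevant impalad arguments might look like the following sketch; the LDAP server host name is a placeholder, and the domain is the one from the example above.

  # Hypothetical LDAP configuration against Active Directory
  impalad \
      --enable_ldap_auth \
      --ldap_uri=ldap://ldapserver.ad.sec.cloudera.com \
      --ldap_domain=ad.sec.cloudera.com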

So far, things are working smoothly for AD. But what about other LDAP directories, like OpenLDAP? It doesn’t have the sAMAccountName or userPrincipalName attributes; instead, we have to authenticate directly against the DN. Users aren’t going to know their LDAP DN!

Fortunately, the impalad has parameters for this scenario, too. The --ldap_baseDN=X parameter is used to convert the username into the LDAP DN, so that the resulting DN looks like uid=username,X. For example, if --ldap_baseDN=ou=People,dc=cloudera,dc=com, and the username passed in is "myoder", the resulting query passed to LDAP will look like uid=myoder,ou=People,dc=cloudera,dc=com—which does indeed match the DN of user myoder above. Presto!

For maximum flexibility, it’s also possible to specify an arbitrary mapping from usernames into a DN via the --ldap_bind_pattern string. The idea is that the string specified must have a placeholder named #UID inside it, and that #UID is replaced with the username. For example, you could mimic the behavior of --ldap_baseDN by specifying --ldap_bind_pattern=uid=#UID,ou=People,dc=cloudera,dc=com. When the username of "myoder" comes in, it replaces the #UID, and we’ll get the same string as above. This option should be used when more control over the DN is needed.
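For an OpenLDAP directory like the one above, the corresponding impalad arguments might look like this sketch. Either --ldap_baseDN or --ldap_bind_pattern would be used, not both, and the server host name is a placeholder.

  # Hypothetical OpenLDAP configuration using the base DN...
  impalad \
      --enable_ldap_auth \
      --ldap_uri=ldap://ldapserver.cloudera.com \
      --ldap_baseDN=ou=People,dc=cloudera,dc=com

  # ...or, equivalently, using an explicit bind pattern
  impalad \
      --enable_ldap_auth \
      --ldap_uri=ldap://ldapserver.cloudera.com \
      --ldap_bind_pattern=uid=#UID,ou=People,dc=cloudera,dc=com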

LDAP and TLS

When using LDAP, the username and password are sent over the connection to the LDAP server in the clear. This means that without any other sort of protection, anyone can see the password traveling over the wire. To prevent that, you must protect the connection with TLS (Transport Layer Security, formerly known as SSL). There are two different connections to protect: between the client and the impalad, and between the impalad and the LDAP server.

TLS Between the Client and the Impalad

Authentication for TLS connections is done with certificates, so the impalad (as a TLS server) will need its own certificate. The impalad presents this certificate to clients in order to prove that it really is the impalad. In order to supply this certificate, the impalad must be started with --ssl_server_certificate=/full/path/to/impalad-cert.pem and --ssl_private_key=/full/path/to/impalad-key.pem.

Now clients must use TLS to talk to the impalad. In the impala shell, you accomplish that with the --ssl and --ca_cert=/full/path/to/ca-certificate.pem arguments. The --ca_cert argument specifies the certificate that signed the --ssl_server_certificate above. For ODBC connections, consult the documentation for the Cloudera ODBC driver for Impala; it offers a thorough description of the settings required for certificates, authentication, and TLS.

Frankly, using TLS between the Impala clients and the impalad is a good idea regardless of whether or not LDAP is being used. Otherwise, your queries, and the results of those queries, go over the wire in the clear.

TLS Between the Impalad and the LDAP Server

There are two ways to turn on TLS with the LDAP Server:

  • Supply --ldap_tls as an argument to the impalad. The connection takes place over the usual LDAP port, but once the connection is made the impalad issues a STARTTLS request, which upgrades the connection to use TLS on that same port.
  • Supply a URI starting with ldaps://. This uses a different port (636 by default) than ldap:// (389 by default).

Finally, the connection to the LDAP server needs its own authentication; this way, you know that the impalad is talking to the correct LDAP server and you’re not giving your passwords to a rogue man-in-the-middle. You’ll need to pass --ldap_ca_certificate to the impalad to specify the location of the certificate that signed the LDAP server’s certificate.
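In flag form, the two options above might look like the following sketch; server names and paths are placeholders, and the other impalad flags are elided.

  # Option 1: STARTTLS over the standard LDAP port
  impalad ... \
      --ldap_uri=ldap://ldapserver.your.company.com \
      --ldap_tls \
      --ldap_ca_certificate=/full/path/to/ldap-ca-cert.pem

  # Option 2: LDAPS on its dedicated port
  impalad ... \
      --ldap_uri=ldaps://ldapserver.your.company.com \
      --ldap_ca_certificate=/full/path/to/ldap-ca-cert.pem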

LDAP Flags

The Cloudera documentation for LDAP and Impala contains much of this information, and the documentation for TLS between the Impala client and the Impala daemon is required reading as well. In Cloudera Manager, you can set these flags in the Impala Configuration under the “Service-Wide” -> “Security” menu; flags without a dedicated setting must be specified in the “Service-Wide” -> “Advanced” -> “Impala Command Line Argument Advanced Configuration Snippet” parameters.

To summarize the flags discussed above:

  • --enable_ldap_auth: turns on LDAP authentication for external clients.
  • --ldap_uri: the URI of the LDAP server (ldap:// or ldaps://).
  • --ldap_domain: a domain appended to the client username before it is passed to AD.
  • --ldap_baseDN: converts the client username into an LDAP DN of the form uid=username,baseDN.
  • --ldap_bind_pattern: an arbitrary mapping from the username to a DN, using the #UID placeholder.
  • --ldap_tls: upgrades the connection to the LDAP server to TLS via STARTTLS.
  • --ldap_ca_certificate: the certificate that signed the LDAP server’s certificate.
  • --ssl_server_certificate and --ssl_private_key: the impalad’s own certificate and private key for TLS with clients.

Bringing it All Together

Correctly implementing authentication in the most secure manner possible results in quite a lot of flags being passed to the Impala daemons. Here is an example invocation of the impalad (minus other flags), assuming that we want to enable both Kerberos and LDAP authentication:

impalad --enable_ldap_auth \
    --ldap_uri=ldap://ldapserver.your.company.com \
    --ldap_tls \
    --ldap_ca_certificate=/full/path/to/certs/ldap-ca-cert.pem \
    --ssl_server_certificate=/full/path/to/certs/impala-cert.pem \
    --ssl_private_key=/full/path/to/certs/impala-key.pem \
    --principal=impala/_HOST@EXAMPLE.COM \
    --keytab_file=/full/path/to/keytab

Connecting from the impala shell might look like this:

impala-shell.sh --ssl \
    --ca_cert=/full/path/to/cert/impala-ca-cert.pem \
    -k

when authenticating with Kerberos, or

impala-shell.sh --ssl \
    --ca_cert=/full/path/to/cert/impala-ca-cert.pem \
    -l -u myoder@cloudera.com

when authenticating with LDAP.

Michael Yoder is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.2: Apache Sentry Delegated GRANT and REVOKE

Cloudera Blog - Fri, 10/17/2014 - 15:13

This new feature, jointly developed by Cloudera and Intel engineers, makes management of role-based security much easier in Apache Hive, Impala, and Hue.

Apache Sentry (incubating) provides centralized authorization for services and applications in the Apache Hadoop ecosystem, allowing administrators to set up granular, role-based protection on resources and to review them in one place. Previously, only designated Sentry admin users could GRANT and REVOKE privileges on an authorizable object. In Apache Sentry 1.5.0 (shipping inside CDH 5.2), we have implemented a new feature (SENTRY-327) that allows admin users to delegate the GRANT privilege to other users using WITH GRANT OPTION. If a user has the GRANT OPTION privilege on a specific resource, that user can now grant the GRANT privilege to other users on the same resource. Apache Hive, Impala, and Hue have all been updated to take advantage of this new Sentry functionality.

In this post, we’ll provide an overview of how this new feature works.

Delegating GRANT/REVOKE Privileges

You can use Hive or Impala to grant privileges using the GRANT ... WITH GRANT OPTION SQL statement:

GRANT
  priv_type [, priv_type ] ...
  ON table_or_view_name
  TO principal_specification [, principal_specification] ...
  [WITH GRANT OPTION];

Note: Impala currently only supports granting/revoking a single privilege at a time (IMPALA-1341).

When WITH GRANT OPTION is specified, the command will give members of the target role privileges to issue their own GRANT statements. Initially, only a pre-defined set of Sentry admin users can issue GRANT statements.

For example, the following commands will create a new role, sales_dept, and provide members of the role the GRANT OPTION privilege on database salesdb:

USE salesdb;
CREATE ROLE sales_dept;
GRANT ROLE sales_dept TO GROUP sales_grp;
GRANT ALL ON DATABASE salesdb TO ROLE sales_dept WITH GRANT OPTION;

This will give users belonging to the sales_dept role the ability to grant equivalent or lesser privileges (privileges on salesdb or on tables under salesdb) to other roles, including the ability to grant those privileges WITH GRANT OPTION.

Thus, a user who belongs to the sales_dept role will now have privileges to execute commands such as:

GRANT ALL ON TABLE marketing TO ROLE marketing_dept;
GRANT SELECT ON DATABASE salesdb TO ROLE marketing_dept;

The GRANT OPTION privilege also allows for granting the GRANT OPTION to other roles. For example, the following will grant the GRANT OPTION privilege to role marketing_dept, which will give members of that role the ability to grant it to other roles:

GRANT SELECT ON DATABASE salesdb TO ROLE marketing_dept WITH GRANT OPTION;

Viewing Granted Privileges

When managing role privileges, you can determine which privileges have been granted to a role and whether the privilege was granted using WITH GRANT OPTION, using:

SHOW GRANT ROLE <roleName>;

This statement returns all privileges granted to a role by all users. It can be executed by admin users or by any user who currently belongs to the role.

An example from Impala is shown below. The statement returns similar results in Hive:

+----------+----------+-----------+-----------+--------------+-----------------+
| scope    | database | table     | privilege | grant_option | create_time     |
+----------+----------+-----------+-----------+--------------+-----------------+
| TABLE    | salesdb  | marketing | ALL       | false        | Wed, Oct 01 ... |
| DATABASE | salesdb  |           | SELECT    | true         | Wed, Oct 01 ... |
+----------+----------+-----------+-----------+--------------+-----------------+

Revoking the GRANT privilege

If a user has the GRANT OPTION privilege, they can also revoke privileges from roles. The Impala and Hive syntax for REVOKE is:

REVOKE [GRANT OPTION FOR]
  priv_type [, priv_type ] ...
  ON table_or_view_name
  FROM principal_specification [, principal_specification] ... ;

To revoke only the grant option from a privilege, the GRANT OPTION FOR clause can be added to a REVOKE statement. When this clause is specified, the target privilege will be preserved, but users in the role will no longer be allowed to issue GRANT statements.
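For instance, following the syntax above, a statement like the sketch below would remove only the grant option on salesdb from the sales_dept role while leaving its ALL privilege intact (illustrative only; note the Hive caveat that follows):

REVOKE GRANT OPTION FOR ALL ON DATABASE salesdb FROM ROLE sales_dept;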

Hive does not currently support the GRANT OPTION FOR clause, but the REVOKE command without this clause will always revoke all privileges (those granted with and without WITH GRANT OPTION). For example, if a role named sales_dept was granted SELECT and INSERT privileges on table marketing:

USE salesdb;
GRANT SELECT, INSERT ON TABLE marketing TO ROLE sales_dept;

The following REVOKE will remove only the INSERT privilege on the table marketing, preserving the SELECT privilege:

REVOKE INSERT ON TABLE marketing FROM ROLE sales_dept;

Furthermore, we support the revocation of child privileges when executing the REVOKE command. To revoke all privileges on the database salesdb along with all privileges granted on all child tables:

REVOKE ALL ON DATABASE salesdb FROM ROLE sales_dept;

Future Work

The Hive integration with Sentry is based on Hive 0.13, which does not support the GRANT OPTION FOR clause in the Hive revoke command. In Hive 0.14.0, this syntax is supported and the grant option for a privilege can be removed while still keeping the privilege using REVOKE. (For more information, see SENTRY-473.)

Impala syntax will also be enhanced to match the Hive syntax for granting/revoking multiple privileges to/from multiple roles in a single statement (IMPALA-1341).

Acknowledgments

This feature is co-developed by Intel and Cloudera. Many thanks to everyone who participated in this work (listed in alphabetical order):

  • Arun Suresh
  • Dapeng Sun
  • Haifeng Chen
  • Lenni Kuff
  • Prasad Mujumdar
  • Sravya Tirukkovalur
  • Xiaomeng Huang

Xiaomeng Huang is a Software Engineer at Intel.

Lenni Kuff is a Software Engineer at Cloudera.

Categories: Hadoop

New in CDH 5.2: More SQL Functionality and Compatibility for Impala 2.0

Cloudera Blog - Thu, 10/16/2014 - 12:40

Impala 2.0 is the most SQL-complete/SQL-compatible release yet.

As we reported in the most recent roadmap update (“What’s Next for Impala: Focus on Advanced SQL Functionality”), more complete SQL functionality (and better SQL compatibility with other vendor extensions) is a major theme in Impala 2.0.

In this post, we’ll describe the highlights (not a complete list) and provide links to the docs that drill down on these functions.

Analytic (Window) Functions

Analytic functions (aka window functions) are a special category of built-in functions. Analytic functions are frequently used in fields such as finance and science to provide trend, outlier, and bucketed analysis for large data sets. You might also see the term “window functions” in database literature, referring to the interval (the “window”) to which the function call applies, particularly when the query includes a ROWS clause.

Like aggregate functions, analytic functions examine the contents of multiple input rows to compute each output value. However, rather than being limited to one result value per GROUP BY group, they operate on sliding windows where the input rows are ordered and grouped using flexible conditions expressed through an OVER() clause.

Impala 2.0 now supports the following analytic query clauses and pure analytic functions:

  • OVER Clause
  • Window Clause
  • DENSE_RANK() Function
  • FIRST_VALUE() Function
  • LAG() Function
  • LAST_VALUE() Function
  • LEAD() Function
  • RANK() Function
  • ROW_NUMBER() Function

See the docs for more details about these functions.
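As a simple illustration of the OVER() clause, the following sketch (against a hypothetical sales table with store_id, sale_date, and amount columns) ranks each store’s sales from largest to smallest:

-- Hypothetical example: rank each store's sales by amount
SELECT
  store_id,
  sale_date,
  amount,
  ROW_NUMBER() OVER (PARTITION BY store_id ORDER BY amount DESC) AS sales_rank
FROM sales;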

New Data Types

New data types in Impala 2.0 provide greater compatibility with source code from traditional database systems:

  • VARCHAR is like the STRING data type, but with a maximum length. See VARCHAR Data Type for details.
  • CHAR is like the STRING data type, but with a precise length. Short values are padded with spaces on the right. See CHAR Data Type for details.
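For example, both types can appear in a table definition much as they would in a traditional database; the sketch below uses hypothetical table and column names.

-- Hypothetical table using the new fixed- and variable-length character types
CREATE TABLE customer_codes (
  country_code CHAR(2),      -- padded with trailing spaces to exactly 2 characters
  customer_name VARCHAR(100) -- up to 100 characters
);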

Subquery Enhancements

Impala 2.0 also supports a number of subquery enhancements including:

  • Subqueries in the WHERE clause (for example, with the IN operator).
  • EXISTS and NOT EXISTS operators (always used in conjunction with subqueries).
  • IN and NOT IN operators where the comparison is against the result set of a subquery, not just a hardcoded list of values.
  • Uncorrelated subqueries let you compare against one or more values for equality, IN, and EXISTS comparisons. For example, you might use WHERE clauses such as WHERE column = (SELECT MAX(some_other_column) FROM table) or WHERE column IN (SELECT some_other_column FROM table WHERE conditions).
  • Correlated subqueries let you cross-reference values from the outer query block and the subquery.
  • Scalar subqueries let you substitute the result of single-value aggregate functions such as MAX(), MIN(), COUNT(), or AVG(), where you would normally use a numeric value in a WHERE clause.

See the docs for more details.
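The sketch below shows two of these forms together, using hypothetical customers, regions, and orders tables: an uncorrelated IN subquery and a correlated EXISTS subquery.

-- Hypothetical uncorrelated subquery: customers located in the busiest regions
SELECT c.customer_id, c.name
FROM customers c
WHERE c.region_id IN (SELECT region_id FROM regions WHERE population > 1000000);

-- Hypothetical correlated subquery: customers that have at least one order
SELECT c.customer_id, c.name
FROM customers c
WHERE EXISTS (SELECT 1 FROM orders o WHERE o.customer_id = c.customer_id);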

SQL Operations That Spill to Disk

Certain memory-intensive operations now write temporary data to disk (known as “spilling to disk”) when Impala is close to exceeding its memory limit for a particular node.

For example, when large tables are joined, Impala keeps the distinct values of the join columns from one table in memory, to compare them to incoming values from the other table. When a query uses a GROUP BY clause for columns with millions or billions of distinct values, Impala keeps a similar number of temporary results in memory, to accumulate the aggregate results for each value in the group. When a large result set is sorted by the ORDER BY clause, each node sorts its portion of the result set in memory. The DISTINCT and UNION operators also build in-memory data structures to represent all values found so far, to eliminate duplicates as the query progresses.

The result is a query that completes successfully, rather than failing with an out-of-memory error. The tradeoff is decreased performance due to the extra disk I/O to write the temporary data and read it back in. Thus, while this feature improves reliability and reduces memory usage, you should optimize your queries, system parameters, and hardware configuration to make spilling rare.
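For instance, a query that joins two large tables and then aggregates and sorts the result, like the hypothetical sketch below, combines several of the memory-intensive operations described above and is the kind of statement that may now spill to disk rather than fail:

-- Hypothetical large join + aggregation + sort that may spill to disk
SELECT c.region_id, COUNT(DISTINCT o.order_id) AS orders, SUM(o.amount) AS total
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
GROUP BY c.region_id
ORDER BY total DESC;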

See the docs for more details.

More SQL on the Way

The features above are just a few of the most notable highlights in Impala 2.0, which also includes additional SQL functionality such as vendor-specific extensions like DECODE and DATE_PART.

Stay tuned to this blog for information about SQL functionality in future releases.

John Russell is the technical writer for Impala, and the author of Getting Started with Impala (O’Reilly Media).

Categories: Hadoop

Introducing Cloudera Labs: An Open Look into Cloudera Engineering R&D

Cloudera Blog - Wed, 10/15/2014 - 14:19

Cloudera Labs contains ecosystem innovations that one day may bring developers more functionality or productivity in CDH.

Since its inception, one of the defining characteristics of Apache Hadoop has been its ability to evolve/reinvent and thrive at the same time. For example, two years ago, nobody could have predicted that the formative MapReduce engine, one of the cornerstones of “original” Hadoop, would be marginalized or even replaced. Yet today, that appears to be happening via Apache Spark, with Hadoop becoming the stronger for it. Similarly, we’ve seen other relatively new components, like Impala, Apache Parquet (incubating), and Apache Sentry (also incubating), become widely adopted in relatively short order.

This unique characteristic requires Cloudera to be highly sensitive to new activity at the “edges” of the ecosystem — in other words, to be vigilant for the abrupt arrival of new developer requirements, and new components or features that meet them. (In fact, Cloudera employees are often the creators of such solutions.) When there is sufficient market interest and customer success with them seems assured, these new components often join the Cloudera platform as shipping product.

Today, we are announcing a new program that externalizes this thought process: Cloudera Labs (cloudera.com/labs). Cloudera Labs is a virtual container for innovations being incubated within Cloudera Engineering, with the goal of bringing more use cases, productivity, or other types of value to developers by constantly exploring new solutions for their problems. Although Labs initiatives are not supported or intended for production use, you may find them interesting for experimentation or personal projects, and we encourage your feedback about their usefulness to you. (Note that inclusion in Cloudera Labs is not a precondition for productization, either.)

Apache Kafka is among the “charter members” of this program. Since its origin just a couple of years ago as proprietary LinkedIn infrastructure for highly scalable and resilient real-time data transport, it has become one of the hottest projects associated with Hadoop. To stimulate feedback about Kafka’s role in enterprise data hubs, today we are making a Kafka-Cloudera Labs parcel (unsupported) available for installation.

Other initial Labs projects include:

  • Exhibit
    Exhibit is a library of Apache Hive UDFs that usefully let you treat array fields within a Hive row as if they were “mini-tables” and then execute SQL statements against them for deeper analysis.
  • Hive-on-Spark Integration
    A broad community effort is underway to bring Apache Spark-based data processing to Apache Hive, reducing query latency considerably and allowing IT to further standardize on Spark for data processing.
  • Impyla
    Impyla is a Python (2.6 and 2.7) client for Impala, the open source MPP query engine for Hadoop. It communicates with Impala using the same standard protocol as ODBC/JDBC drivers.
  • Oryx
    Oryx, a project jointly spearheaded by Cloudera Engineering and Intel, provides simple, real-time infrastructure for large-scale machine learning/predictive analytics applications.
  • RecordBreaker
    RecordBreaker, a project jointly developed by Hadoop co-founder Mike Cafarella and Cloudera, automatically turns your text-formatted data into structured Avro data–dramatically reducing data prep time.

As time goes on, and some of the projects potentially graduate into CDH components (or otherwise remain as Labs projects), more names will join the list. And of course, we’re always interested in hearing your suggestions for new Labs projects.

Categories: Hadoop

Cloudera Enterprise 5.2 is Released

Cloudera Blog - Tue, 10/14/2014 - 20:01

Cloudera Enterprise 5.2 contains new functionality for security, cloud deployments, and real-time architectures, and support for the latest open source component releases and partner technologies.

We’re pleased to announce the release of Cloudera Enterprise 5.2 (comprising CDH 5.2, Cloudera Manager 5.2, Cloudera Director 1.0, and Cloudera Navigator 2.1).

This release reflects our continuing investments in Cloudera Enterprise’s main focus areas, including security, integration with the partner ecosystem, and support for the latest innovations in the open source platform (including Impala 2.0, its most significant release yet, and Apache Hive 0.13.1). It also includes a new product, Cloudera Director, that streamlines deployment and management of enterprise-grade Hadoop clusters in cloud environments; new component releases for building real-time applications; and new support for significant partner technologies like EMC Isilon. Furthermore, this release ships the first results of joint engineering with Intel, including WITH GRANT OPTION for Hive and Impala and performance optimizations for MapReduce.

Here are some of the highlights (incomplete; see the respective Release Notes for CDH, Cloudera Manager, and Cloudera Navigator for full lists of features and fixes):

Security
  • Via Apache Sentry (incubating) 1.4, GRANT and REVOKE statements in Impala and Hive can now include WITH GRANT OPTION, for delegation of granting and revoking privileges (joint work with Intel under Project Rhino).
  • Hue has a new Sentry UI that supports policy management for visually creating/editing roles in Sentry and permissions on files in HDFS.
  • Kerberos authentication is now supported in Apache Accumulo.
  • In Impala, authentication can now be done through a combination of Kerberos and LDAP.
Data Management and Governance
  • Cloudera Navigator 2.1 features a brand-new auditing UI that is unified with lineage and discovery, so you now have access to all Navigator functionality from a single interface.
  • Navigator 2.1 includes role-based access control so you can restrict access to auditing, metadata and policy management capabilities.
  • We’re also shipping a beta policy engine in Navigator 2.1. Targeted to GA by year-end, the policy engine allows you to set up rules and notifications so you can classify data as it arrives and integrate with data preparation and profiling tools. Try it out and let us know what you think!
  • And we’ve added lots of top-requested enhancements, such as Sentry auditing for Impala and integration with Hue.
Cloud Deployment
  • Cloudera Director is a simple and reliable way to deploy, scale, and manage Hadoop in the cloud (initially for AWS) in an enterprise-grade fashion. It’s free to download and use, and supported by default for Cloudera Enterprise customers. See the User Guide for more details.
Real-Time Architecture
  • Re-base on Apache HBase 0.98.6
  • Re-base on Apache Spark/Streaming 1.1
  • Re-base on Impala 2.0
  • Apache Sqoop now supports import into Apache Parquet (incubating) file format
  • Apache Kafka integration with CDH is now incubating in Cloudera Labs; a Kafka-Cloudera Labs parcel (unsupported) is available for installation. Integration with Flume via a special Source and Sink has also been provided.
Impala 2.0
  • Disk-based query processing: enables large queries to “spill to disk” if their in-memory structures are larger than the currently available memory. (Note that this feature only uses disk for the portion that doesn’t fit in the available memory.)
  • Greater SQL compatibility: SQL 2003 analytic window functions, support for legacy data types (such as CHAR and VARCHAR), better compliance with SQL standards (WHERE, EXISTS, IN), and additional vendor-specific SQL extensions.
New Open Source Releases and Certifications

Cloudera Enterprise 5.2 includes multiple new component releases:

  • Apache Avro 1.7.6
  • Apache Crunch 0.11
  • Apache Hadoop 2.5
  • Apache HBase 0.98.6
  • Apache Hive 0.13.1
  • Apache Parquet (incubating) 1.5 / Parquet-format 2.1.0
  • Apache Sentry (incubating) 1.4
  • Apache Spark 1.1
  • Apache Sqoop 1.4.5
  • Impala 2.0
  • Kite SDK 0.15.0

…with new certifications on:

  • Filesystems: EMC Isilon
  • OSs: Ubuntu 14.04 (Trusty)
  • Java: Oracle JDK1.7.0_67

Over the next few weeks, we’ll publish blog posts that cover some of these and other new features in detail. In the meantime:

As always, we value your feedback; please provide any comments and suggestions through our community forums. You can also file bugs via issues.cloudera.org.

Categories: Hadoop

How SQOOP-1272 Can Help You Move Big Data from Mainframe to Apache Hadoop

Cloudera Blog - Fri, 10/10/2014 - 21:52

Thanks to M. Asokan, Chief Architect at Syncsort, for the guest post below.

Apache Sqoop provides a framework to move data between HDFS and relational databases in a parallel fashion using Hadoop’s MR framework. As Hadoop becomes more popular in enterprises, there is a growing need to move data from non-relational sources like mainframe datasets to Hadoop. Following are possible reasons for this:

  • HDFS is used simply as an archival medium for historical data living on the mainframe. It is cost effective to store data in HDFS.
  • Organizations want to move some processing workloads to Hadoop to free up CPU cycles on the mainframe.

Regardless, there was no solution available in Sqoop to allow users to easily move data from mainframe to Hadoop. We at Syncsort wanted to fill that void and started discussions with some Sqoop committers at Cloudera. They were very excited and receptive to the idea of Syncsort making an open source contribution to Sqoop. We decided that this contribution would go into Sqoop version 1.x since it is still widely used. We raised a Sqoop JIRA ticket SQOOP-1272 to work towards that.

Brief Introduction to Mainframe Datasets

Unlike UNIX where a file consists of a sequence of bytes, files on mainframe operating systems contain structured data. The term “datasets” is used to refer to “files.” (Hereafter, we will use “datasets” when we refer to mainframe files.) Datasets have associated access methods and record formats at the OS level. The OS provides APIs to access records from datasets. Most mainframes run an FTP server to provide remote access to datasets. The FTP server accesses records using these APIs and passes them to clients. A set of sequential (an access method) datasets can be stored under something similar to a directory called Partitioned Data Set (PDS) on the mainframe. FTP is not the only way to access datasets on the mainframe remotely. There are proprietary software products like IBM’s Connect:Direct for remote access to datasets on a mainframe.

Functional Specification

Once we decided to make a contribution, I started working on the functional specification with the Syncsort engineering team. In terms of functionality, we wanted to start with small but fundamental steps toward the objective of mainframe connectivity via Sqoop. Our goals were very modest:

  1. Connect to the mainframe using open source software.
  2. Support datasets that can be accessed using sequential access methods.

The first goal was a no-brainer: Sqoop being an Apache project, we decided to use the Apache commons-net library for the FTP client. The second goal follows from the first, since the mainframe FTP server supports transferring only sequential datasets. On further discussion with the Sqoop team at Cloudera, we decided to support transferring a set of sequential datasets in a partitioned dataset. The transfer happens in parallel, and each dataset is stored as a separate HDFS file when the target is HDFS or Hive/HCatalog. The sequential datasets are transferred to Hadoop in FTP text mode, and each record in a dataset is treated as one text field in the target.

In essence, a new import tool called import-mainframe would be added to Sqoop. In the --connect option, the user can specify a mainframe host name. In the --dataset option, the user can specify the PDS name. For more details, readers can refer to the design document online.
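A hypothetical invocation might look like the sketch below. Only --connect and --dataset come from the description above; the host name, PDS name, and the remaining options are assumptions based on Sqoop’s common import arguments.

  # Hypothetical: import every sequential dataset in the PDS MYUSER.SALES
  # as text files under /data/mainframe/sales in HDFS
  sqoop import-mainframe \
      --connect mainframe.example.com \
      --dataset MYUSER.SALES \
      --target-dir /data/mainframe/sales \
      --username myuser -P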

Design

Now that the functionality was defined, I started working with the Syncsort engineering team on the design. I posted an overall design document in the JIRA for feedback from Sqoop committers. We went through a couple of iterations and agreed on it.

At the top level, the design involves implementing a new Sqoop tool class (MainframeImportTool) and a new connection manager class (MainframeManager). If you dig a little deeper, there are supporting classes such as a mainframe-specific Mapper implementation (MainframeDatasetImportMapper), an InputFormat implementation (MainframeDatasetInputFormat), an InputSplit implementation (MainframeDatasetInputSplit), a RecordReader implementation (MainframeDatasetRecordReader), and so on.

Implementation

Next came the most difficult part: the actual implementation. Members of Syncsort engineering played a vital role in the detailed design and implementation. Since it is impossible to connect to a real mainframe in the Apache testing environment, we decided to contribute unit tests based on Java mock objects. It was amazing to discover that Sqoop had never used mock objects for testing before! We ran end-to-end tests against a real mainframe in-house to verify the correctness of the implementation.

Patch Submission and Review Process

Once we were satisfied with the implementation and testing at Syncsort, I posted a patch to SQOOP-1272 for review, and was advised to post it to the Sqoop Review Board. I have worked on a handful of Apache Hadoop JIRAs before, where most of the comments and discussions on a patch happen in the JIRA itself, so going through the Review Board was a new experience for me. After some initial getting used to, I liked it very much, since it kept most of the noise off the JIRA and made the review comments easy to address. There was a suggestion to add documentation for this new feature, so I started digging through the Sqoop documentation files; I had to learn the documentation format, and with some trial and error it took a while to update the documentation. Despite busy schedules at work and a summer vacation interrupting my work on the JIRA, the review process moved along steadily. After a few cycles of review, the code and documentation got better and better. Finally, on Sept. 10, 2014, the patch was committed by Venkat. SQOOP-1272 will go into the official Sqoop release 1.4.6.

Extensibility

The design and implementation of SQOOP-1272 allow anyone to extend the functionalities. Users can extend MainframeManager class to provide more complex functionalities while making use of the current implementation.

Syncsort’s DMX-h

Syncsort’s flagship Hadoop ETL product, DMX-h, provides enterprise-grade mainframe connectivity with an easy-to-use graphical user interface. It has been enhanced so that it can be invoked from Sqoop. In particular, it supports the following features:

  • Ability to specify complex COBOL copybooks to define mainframe record layouts.
  • Ability to transfer a mainframe dataset “as is” in binary form to archive a golden copy in Hadoop.
  • Ability to transfer VSAM (Virtual Storage Access Method) datasets.

Please refer to this page for more details.

Acknowledgements

I think this contribution was a group effort and many individuals contributed to make it possible. In particular, I would like to thank Jarcec Cecho, Venkat Ranganathan, and Gwen Shapira for all their review comments and suggestions. I want to thank Syncsort engineer Jixiang Li who did the heavy lifting of the implementation and Yunkai Huang for creating the unit tests.

Categories: Hadoop
