Hadoop

New in CDH 5.4: Apache HBase Request Throttling

Cloudera Blog - Thu, 05/28/2015 - 15:42

The following post about the new request throttling feature in HBase 1.1 (now shipping in CDH 5.4) originally published in the ASF blog. We re-publish it here for your convenience.

Running multiple workloads on HBase has always been challenging, especially  when trying to execute real-time workloads while concurrently running analytical jobs. One possible way to address this issue is to throttle analytical MR jobs so that real-time workloads are less affected.

A new QoS (quality of service) feature that Apache HBase 1.1 introduces is request-throttling, which controls the rate at which requests get handled by a HBase cluster. HBase typically treats all requests identically; however, the new throttling feature can be used to specify a maximum rate or bandwidth to override this behavior. The limit may be applied to a requests originating from a particular user, or alternatively, to requests directed to a given table or a specified namespace.

The objective of this post is to evaluate the effectiveness of this feature and the overhead it might impose on a running HBase workload. The performance runs carried out showed that throttling works very well, by redirecting resources from a user whose workload is throttled to the workloads of other users, without incurring a significant overhead in the process.

Enabling Request Throttling

It is straightforward to enable the request-throttling feature—all that is necessary is to set the HBase configuration parameter hbase.quota.enabled to true. The related parameter hbase.quota.refresh.period specifies the time interval in milliseconds that that regionserver should re-check for any new restrictions that have been added.

The throttle can then be set from the HBase shell, like so:

hbase> set_quota TYPE => THROTTLE, USER => 'uname', LIMIT => '100req/sec' hbase> set_quota TYPE => THROTTLE, TABLE => 'tbl', LIMIT => '10M/sec' hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns', LIMIT => 'NONE'

Test Setup

To evaluate how effectively HBase throttling worked, a YCSB workload was imposed on a 10 node cluster. There were 6 RegionServers and 2 master nodes. YCSB clients were run on the 4 nodes that were not running RegionServer processes. The client processes were initiated by two separate users and the workload issued by one of them was throttled.

More details on the test setup follow.

HBase version:

HBase 0.98.6-cdh5.3.0-SNAPSHOT (HBASE-11598 was backported to this version)

Configuration:
CentOS release 6.4 (Final)
CPU sockets: 2
Physical cores per socket: 6
Total number of logical cores: 24
Number of disks: 12
Memory: 64GB
Number of RS: 6
Master nodes: 2  (for the Namenode, Zookeeper and HBase master)
Number of client nodes: 4
Number of rows: 1080M
Number of regions: 180
Row size: 1K
Threads per client: 40
Workload: read-only and scan
Key distribution: Zipfian
Run duration: 1 hour

Procedure

An initial data set was first generated by running YCSB in its data generation mode. A HBase table was created with the table specifications above and pre-split. After all the data was inserted, the table was flushed, compacted and saved as a snapshot. This data set was used to prime the table for each run. Read-only and scan workloads were used to evaluate performance; this eliminates effects such as memstore flushes and compactions. One run with a long duration was carried out first to ensure the caches were warmed and that the runs yielded repeatable results.

For the purpose of these tests, the throttle was applied to the workload emanating from one user in a two-user scenario. There were four client machines used to impose identical read-only workloads. The client processes on two machines were run by the user “jenkins”, while those on the other two were run as a different user. The throttle was applied to the workload issued by this second user. There were two sets of runs, one with both users running read workloads and the second where the throttled user ran a scan workload. Typically, scans are long running and it can be desirable on occasion to de-prioritize them in favor of more real-time read or update workloads. In this case, the scan was for sets of 100 rows per YCSB operation.

For each run, the following steps were carried out:

  • Any existing YCSB-related table was dropped.

  • The initial data set was cloned from the snapshot.

  • The desired throttle setting was applied.

  • The desired workloads were imposed from the client machines.

  • Throughput and latency data was collected and is presented in the table below.

The throttle was applied at the start of the job (the command used was the first in the list shown in the “Enabling Request Throttling” section above). The hbase.quota.refresh.period property was set to under a minute so that the throttle took effect by the time test setup was finished.

The throttle option specifically tested here was the one to limit the number of requests (rather than the one to limit bandwidth).

Observations and Results

The throttling feature appears to work quite well. When applied interactively in the middle of a running workload, it goes into effect immediately after the the quota refresh period and can be observed clearly in the throughput numbers put out by YCSB while the test is progressing. The table below has performance data from test runs indicating the impact of the throttle. For each row, the throughput and latency numbers are also shown in separate columns, one set for the “throttled” user (indicated by “T” for throttled) and the other for the “non-throttled” user (represented by “U” for un-throttled).

Read + Read Workload

As can be seen, when the throttle pressure is increased (by reducing the permitted throughput for user “T” from 2500 req/sec to 500 req/sec, as shown in column 1), the total throughput (column 2) stays around the same.  In other words, the cluster resources get redirected to benefit the non-throttled user, with the feature consuming no significant overhead. One possible outlier is the case where the throttle parameter is at its most restrictive (500 req/sec), where the total throughput is about 10% less than the maximum cluster throughput.

Correspondingly, the latency for the non-throttled user improves while that for the throttled user degrades. This is shown in the last two columns in the table.

The charts above show that the change in throughput is linear with the amount of throttling, for both the throttled and non-throttled user. With regard to latency, the change is generally linear, until the throttle becomes very restrictive; in this case, latency for the throttled user degrades substantially.

One point that should be noted is that, while the throttle parameter in req/sec is indeed correlated to the actual restriction in throughput as reported by YCSB (ops/sec) as seen by the trend in column 4, the actual figures differ. As user “T”’s throughput is restricted down from 2500 to 500 req/sec, the observed throughput goes down from 2500 ops/sec to 1136 ops/sec. Therefore, users should calibrate the throttle to their workload to determine the appropriate figure to use (either req/sec or MB/sec) in their case.

Read + Scan Workload

With the read/scan workload, similar results are observed as in the read/read workload. As the extent of throttling is increased for the long-running scan workload, the observed throughput decreases and latency increases. Conversely, the read workload benefits. displaying better throughput and improved latency. Again, the specific numeric value used to specify the throttle needs to be calibrated to the workload at hand. Since scans break down into a large number of read requests, the throttle parameter needs to be much higher than in the case with the read workload. Shown above is a log-linear chart of the impact on throughput of the two workloads when the extent of throttling is adjusted.

Conclusion

HBase request throttling is an effective and useful technique to handle multiple workloads, or even multi-tenant workloads on an HBase cluster. A cluster administrator can choose to throttle long-running or lower-priority workloads, knowing that RegionServer resources will get re-directed to the other workloads, without this feature imposing a significant overhead. By calibrating the throttle to the cluster and the workload, the desired performance can be achieved on clusters running multiple concurrent workloads.

Govind Kamat is a Performance Engineer at Cloudera, and an HBase contributor.

Categories: Hadoop

Impala Needs Your Contributions

Cloudera Blog - Fri, 05/22/2015 - 14:59

Your contributions, and a vibrant developer community, are important for Impala’s users. Read below to learn how to get involved.

From the moment that Cloudera announced it at Strata New York in 2012, Impala has been an 100% Apache-licensed open source project. All of Impala’s source code is available on GitHub—where nearly 500 users have forked the project for their own use—and we follow the same model as every other platform project at Cloudera: code changes are committed “upstream” first, and are then selected and backported to our release branches for CDH releases.

However, Impala was (and still is!) moving at an extraordinary pace, and we focused on meeting the feature and stability needs of our user community at the expense of making it easy for the developer community to contribute fixes and new features. After the release of Impala 2.0, which was a major milestone for the project, we’ve been working on incubating a developer community. Now we’re ready to more widely publicize our improvements, and invite more developers to come and build the state-of-the-art SQL query engine on Hadoop with us.

Since January 2015, we have moved more and more of Impala’s development out into the open, where members of the community can watch the progress of the JIRA issues that are important to them (including through the code review process), participate in technical discussion on the Impala developer mailing list, and can submit new patches for inclusion in Impala.

We are committed to making it easier for our developer community to work with Impala’s source code. We’ve recently released a developer environment Docker image that contains everything you need to start building and working with Impala. We have also posted the full set of generated developer documentation to help navigate the hundreds of classes and thousands of lines of code that make up Impala’s internals. We’re in the process of adding a Jenkins-based test server to make it easier for contributors to validate their patches—watch this space! Finally, we’ve gathered together some of the published papers on Impala, and the presentations that we’ve been giving at meetups and conferences around the world.

If you’d like to get involved in contributing to Impala, start with the presentation we gave at the March 2015 Impala Meetup in Cloudera’s Palo Alto office, which provides an overview of our contribution process:

Then check out our contribution guidelines, join the mailing list and get started! We’re excited to work with you.

Henry Robinson is a Software Engineer at Cloudera on the Impala team.

Marcel Kornacker is a tech lead at Cloudera and the founder and architect of Impala.

Categories: Hadoop

Graduating Apache Parquet

Cloudera Blog - Thu, 05/21/2015 - 20:41

The following post from Julien Le Dem, a tech lead at Twitter, originally appeared in the Twitter Engineering Blog. We bring it to you here for your convenience.

ASF, the Apache Software Foundation, recently announced the graduation of Apache Parquet, a columnar storage format for the Apache Hadoop ecosystem. At Twitter, we’re excited to be a founding member of the project.

Apache Parquet is built to work across programming languages, processing frameworks, data models and query engines including Apache Hive, Apache Drill, Impala and Presto.

At Twitter, Parquet has helped us scale by reducing storage requirements by at least one-third on large datasets, as well as improving scan and deserialization time. This has translated into hardware savings and reduced latency for accessing data. Furthermore, Parquet’s integration with so many tools creates opportunities and flexibility for query engines to help optimize performance.

Since we announced Parquet, these open source communities have integrated the project: Apache Crunch, Apache Drill, Apache Hive, Apache Pig, Apache Spark, Apache Tajo, Kite, Impala, Presto and Scalding.

What’s New?

The Parquet community just released version 1.7.0 with several new features and bug fixes. This update includes:

  • A new filter API for Java and DSL for Scala that uses statistics metadata to filter large batches of records without reading them
  • A memory manager that will scale down memory consumption to help avoid crashes
  • Improved MR and Spark job startup time
  • Better support for evolving schemas with type promotion when reading
  • More logical types for storing dates, times, and more
  • Improved compatibility between Hive, Avro and other object models

As usual, this release also includes many other bug fixes. We’d like to thank the community for reporting these and contributing fixes. Parquet 1.7.0 is now available for download.

Future Work

Although Parquet has graduated, there’s still plenty to do, and the Parquet community is planning some major changes to enable even more improvements.

First is updating the internals to work with the zero-copy read path in Hadoop, making reads even faster by not copying data into Parquet’s memory space. This will also enable Parquet to take advantage of HDFS read caching and should pave the way for significant performance improvements.

After moving to zero-copy reads, we plan to add a vectorized read API that will enable processing engines like Drill, Presto and Hive to save time by processing column data in batches before reconstructing records in memory, if at all.

We also plan to add more advanced statistics based record filtering to Parquet. Statistics based record filtering allows us to drop entire batches of data with only reading a small amount of metadata). For example, we’ll take advantage of dictionary encoded columns and apply filters to batches of data by examining a column’s dictionary, and in cases where no dictionary is available, we plan to store a bloom filter in the metadata.

Aside from performance, we’re working on adding POJO support in the Parquet Avro object model that works the same way Avro handles POJOs in avro-reflect. This will make it easier to use existing Java classes that aren’t based on one of the already-supported object models and enable applications that rely on avro-reflect to use Parquet as their data format.

Getting Involved

Parquet is an independent open source project at the ASF. To get involved, join the community mailing lists and any of the community hangouts the project holds. We welcome everyone to participate to make Parquet better and look forward to working with you in the open.

Acknowledgements

First we would like to thank Ryan Blue from Cloudera for helping craft parts of this post and the wider Parquet community for contributing to the project. Specifically, contributors from a number of organizations (Twitter, Netflix, Criteo, MapR, Stripe, Cloudera, AmpLab) contributed to this release. We’d also like to thank these people: Daniel Weeks, Zhenxiao Luo, Nezih Yigitbasi, Tongjie Chen, Mickael Lacour, Jacques Nadeau, Jason Altekruse, Parth Chandra, Colin Marc (@colinmarc), Avi Bryant (@avibryant), Ryan Blue, Marcel Kornacker, Nong Li (@nongli), Tom White (@tom_e_white), Sergio Pena, Matt Massie (@matt_massie), Tianshuo Deng, Julien Le Dem, Alex Levenson, Chris Aniszczyk and Lukas Nalezenec.

Categories: Hadoop

How-to: Read FIX Messages Using Apache Hive and Impala

Cloudera Blog - Thu, 05/21/2015 - 16:42

Learn how to read FIX message files directly with Hive, create a view to simplify user queries, and use a flattened Apache Parquet table to enable fast user queries with Impala.

The Financial Information eXchange (FIX) protocol is used widely by the financial services industry to communicate various trading-related activities. Each FIX message is a record that represents an action by a financial party, such as a new order or an execution report. As the raw point of truth for much of the trading activity of a financial firm, it makes sense that FIX messages are an obvious data source for analytics and reporting in Apache Hadoop.

Apache Hive, the versatile SQL engine that runs on Hadoop, can read FIX messages without any code or pre-processing. Thus users can easily load FIX message files into Hadoop and immediately start executing queries. Furthermore, with a simple data set conversion, users can also run their queries against Impala, the open source analytic database for Hadoop. Cloudera’s open platform lets you do both types of queries inside the Hue GUI so that a command-line session is not required. This blog post will show you how.

If you do not already have a Hadoop cluster but would like to run these steps, the Cloudera QuickStart VM allows you to run a pre-made single-node Hadoop cluster on your own computer. Or, use Cloudera Live to access a full Cloudera Enterprise cluster for a free two-week period.

Loading Data and Creating Tables

First, let’s review the structure of a FIX message.

FIX messages are formatted as a set of key-value pairs on a single line. Each key-value pair contains an attribute (known as a tag) of the message, such as the message type or order identifier. The key-value pairs are separated by the ASCII 001 “start of heading” character (commonly visualized as “^A”), and the keys and values are separated by the equal-sign character (=).

For example:

8=FIX.4.2^A9=145^A35=D^A34=4^A49=ABC_DEFG01^A52=20090323-15:40:29^A56=CCG^A115=XYZ^A11=NF0542/03232009^A54=1^A38=100^A55=CVS^A40=1^A59=0^A47=A^A60=20090323-15:40:29^A21=1^A207=N^A10=139^A

Armed with one or more files containing FIX messages, one per line, you can push them into Hadoop by uploading them into an HDFS directory using the Hue File Browser:

You can create a Hive table over the top of the FIX message files using a Hive CREATE TABLE statement.

Hive is sufficiently flexible to allow expression of each record as a collection of key-value pairs, using the MAP data type. Thus, you can simply reference the key for a message and Hive will return the value. Although key in the FIX message format is an integer (INT), the values can have varying data types, so you would use strings (STRING) to capture any value.

Delimiters are specified using the TERMINATED BY clause. You express the collection item delimiter as ASCII 001 using ‘1’, and the map key delimiter as the equals sign using ‘=’. The default field delimiter is ASCII 001, so you change it to the placeholder ASCII 002 to avoid ambiguity. You are only defining one field in each FIX record—the single MAP collection of key-value pairs—so the field delimiter will not be found in the files.

An external Hive table is used and this maps the table to the location of the uploaded FIX files. Dropping an external table does not delete the underlying data.

CREATE EXTERNAL TABLE fix_map (tag MAP<INT, STRING>) ROW FORMAT DELIMITED COLLECTION ITEMS TERMINATED BY '1' FIELDS TERMINATED BY '2' MAP KEYS TERMINATED BY '=' LOCATION '/user/jeremy/fix/';

This statement can be run in the Hue Hive Editor after replacing the location with the HDFS path of the uploaded FIX files.

Queries

With the data loaded and the table created, you can start to query the FIX records. Use the single ‘tag’ column to reference the various tags of the messages.

This feature is powerful because you can reference any tag identifier without the need to pre-define it. The downside, however, is that you need to recall tag numbers to query the data.

You can make querying easier by first creating a Hive view over the table that renames the tag references, and then reading from the view instead of the table. This approach may also be a good opportunity to cast specific tags to specific types so that users don’t have to do that themselves.

CREATE VIEW fix_view AS SELECT tag[38] AS OrderQty , tag[55] AS Symbol , tag[60] AS TransactTime FROM fix_map; SELECT OrderQty , Symbol , TransactTime FROM fix_view;

(Find a pre-made view of all known tags here. Use them all, or edit the list to include only the desired tags. Tags referenced but not found in the data will return as NULL.)

Optimizing with Impala

At this point, you’re in a good spot: You can query the raw FIX message files and reference tags using sensible field names. You could stop there and functionally have all that you need, but if you want to go a bit further, you can improve query performance by converting the data into a more read-optimized format.

This process involves two transformations:

  • Converting the data from records of collections of key-value pairs to records of plain columns. This approach allows you to use Impala instead of Hive for user queries—which is much faster, but in this case, does not support the MAP data type. This conversion will also speed up Hive queries by de-serializing the MAP key-value pairs in advance.
  • Converting the data from plain text files into Apache Parquet files, which is a column-oriented binary format that considerably improves query performance over delimited text files.

You can use Hive to do both of these tasks in a single query, and most of the work has already been done by using the view created earlier:

CREATE TABLE fix STORED AS PARQUET AS SELECT tag[38] AS OrderQty , tag[55] AS Symbol , tag[60] AS TransactTime FROM fix_map;

Switching to the Hue Impala Editor, let Impala know about the new table via INVALIDATE METADATA fix; and query it like you did with the view. However, as “symbol” is a reserved word in Impala, you need to wrap it in back-ticks:

SELECT OrderQty , `Symbol` , TransactTime FROM fix;

Conclusion

In this post, you’ve learned how easy it is use Hadoop to query FIX message datasets with SQL, and that Hive is a good choice for preparing and optimizing data while Impala is a good choice for running analytic queries. This same pattern can be applied to a wide variety of other datasets, which can all be integrated together in Hadoop to discover new insights about your enterprise. Happy querying!

Jeremy Beard is a Senior Solutions Architect at Cloudera.

Categories: Hadoop

How-to: Get Started with CDH on OpenStack with Sahara

Cloudera Blog - Mon, 05/18/2015 - 16:03

The recent OpenStack Kilo release adds many features to the Sahara project, which provides a simple means of provisioning an Apache Hadoop (or Spark) cluster on top of OpenStack. This how-to, from Intel Software Engineer Wei Ting Chen, explains how to use the Sahara CDH plugin with this new release.

Prerequisites

This how-to assumes that OpenStack is already installed. If not, we recommend using Devstack to build a test OpenStack environment in a short time. (Note: Devstack is not recommended for use in a production environment. For production deployments, refer to the OpenStack Installation Guide.)

Sahara UI

You can use Horizon as a web-based interface to manage your Sahara environment. Please login to the Horizon website and look for the “Data Processing” link under the “Project” tab. You can also use Sahara CLI to launch the commands in your OpenStack environment.


Login page


“Data Processing” link

Sahara CDH Plugin Configuration

Confirm the CDH plugin has already been enabled in the Sahara configuration. (Currently CDH 5.0 and CDH 5.3 are supported; CDH 5.4 will be supported in an upcoming Kilo release.) In the Kilo release, Sahara enables the CDH plugin by default. If you find there is no CDH plugin in your OpenStack environment, confirm the CDH plugins (vanilla, hdp, cdh, spark, fake) are installed in the Sahara configuration file (/etc/sahara/sahara.conf).

You can also confirm the plugin is installed via Horizon. Below is a screenshot of the Horizon page where you can confirm installed plugins.


Data processing plugins

Building a CDH Image

Before you start to use Sahara, you need to prepare one image with Cloudera packages. The open source project sahara-image-elements is a script that creates Sahara images. Below are the steps for building the image via this project.

Step 1: Checkout the project from https://github.com/openstack/sahara-image-elements

# git clone https://github.com/openstack/sahara-image-elements

Step 2: Run this command and add some arguments to specify what OS distribution you would like to build. Here we use CentOS and CDH 5.3 as an example.

# sudo DIB_CDH_VERSION=”5.3” bash sahara-image-elements/diskimage-create/diskimage-create.sh -p cloudera -i centos

This command will download a base image and start to pre-install all the required package including Cloudera Manager and the CDH package into the image. The image creation requires internet connectivity. The entire process may take a long time depending on your internet speed and machines.

Currently Sahara can support building “Ubuntu” and “CentOS” images. The Kilo release has been tested with CDH “5.0”, “5.3”, abd “5.4”. Cloudera Manager package will be downloaded into the image from the Cloudera website.

For more information, please refer to https://github.com/openstack/sahara-image-elements.

Uploading the Image with Glance

You need to upload and register the image with the Glance service in your OpenStack environment. These steps use Horizon to upload the image you created from the previous step. You can also use glance CLI to upload your own image.

Step 1: Click “Images” in “System.”
Step 2: Click “Create Image.”
Step 3: Click “Image File” from “Image Source.”
Step 4: Click “Choose File” from Image File and select the image you would like to upload from file explorer.
Step 5: Fill in all the required fields; for our example, we use QCOW2 as the image format. You can also select other options as per your needs.


Creating an image

Adding Tags to the Glance Image with Sahara

Next, you need to register the Glance image to Sahara and add tags to help Sahara recognize the images.

Step 1: Select “Image Registry.”
Step 2: Click “Register Image” and select an image to add username and tags. (The username is a user account creating from sahara-image-elements. By default, the ubuntu user is “ubuntu” and the centos user is “cloud-user.” As for tags, add the related plugin and version tags to the image. It supports adding of multiple version tags in one image.)


Registering an image

Provisioning a Cluster

Sahara provides two major features. It allows you to:

  • Provision a cluster in your Openstack environment. You can use the Sahara UI to quickly select the services you want to launch in your cluster; there are many settings you can configure during the cluster creation, such as for anti-affinity.
  • Submit a job to the created cluster. The purpose is to help data scientists or application developers focus on development and ignore the provisioning process.
Provision a cluster using Guides

Using Guides can help you following the steps to launch a cluster.

Step 1. Select “Cluster Creation Guide” from “Data Processing Guides.”


Data processing Guides

Step 2: Select Plugin “Cloudera Plugin” and Version “5.3.”


Selecting plugin version

Step 3: Create templates and launch a cluster. After launching a cluster, you can check the “Cluster” tab to see the launched cluster.


Clusters

Provision a cluster manually

You can also create your own cluster using the node group template:

Step 1: Create node group templates. A node group template is a basic template to provision a node. You can use this template to create your own custom node and select which service you would like to run. There are several settings you need to assign:

  • OpenStack Flavor: select a flavor size for this template.
  • Availability Zone: select an availability zone for this template.
  • Storage Location:  select a storage location to put HDFS (currently supports ephemeral and cinder).
  • Select Floating IP pool: Optional; select a floating ip for this template.
  • Select Process: CDH Plugin support most services in CDH 5.3/CDH 5.4. After selecting a process, you can find the related tab on top of screen and you can change the default parameter for this process.


Node group templates


Creating node group template (1)


Creating node group template (2)

Step 2: Create cluster templates. After creating node group template, you need to create a cluster template by selecting multiple node group template. Go to the “Node Groups” tab and select how many node group templates you would like to run in this cluster template.


Cluster templates


Creating a cluster template

Step 3: Launch Cluster using Cluster Templates. Choose a cluster template and wait for the cluster status to become “Active.”

How to Run a Job Run a job using Guides

Step 1: Select “Job Execution Guide” in “Guides.”
Step 2: Follow the steps to create the required templates.


Guided job execution

Run a job manually

Step 1: Create a Data Source. Select a data source type to put your input and output file. Currently Sahara supports internal/external HDFS and Swift for the CDH plugin. If you select Swift, please provide your account information.


Creating a data source

Step 2: Create a Job Binary. Write your own binary and upload it via the Sahara UI. The supported storage type include internal databases and Swift.


Creating a job binary

Step 3: Create job templates. Create a job template using the job binary you created in previous step. Choose which job type (pig, hive, mapreduce, and java) you would like to run and choose the binary you uploaded previously; you can also add the related libraries in this page.


Creating a job template

Step 4: Run a job. Select a job template you would like to run and check job status on the Jobs page. (If the job finishes without problems, the status will be “Success.” If a job fails, it will show “Killed.”) If you would like to get more information, you can check the cluster details to get Cloudera Manager address by clicking the cluster name.

FAQ

Finally, here are answers to some questions you may have.

  1. Is Apache Sentry integratIon available?
    Yes. However, although a Sentry service is on the service list, currently it needs to be configured manually. This is a limitation in the current Kilo release.
  2. Can I access Cloudera Manager directly?
    Yes, you can. Just click “Cluster Name” on the Clusters page and you can see the Cloudera Manager address and the password for the user – admin. For each cluster, the CDH plugin will create an individual password for security.
  3. Why is my Cluster status always “Waiting?”
    Please confirm al instances are reachable by using ssh and ping. If not, please check your security group settings.
  4. How do I use external HDFS as a data source?
    You need to setup this up manually and make sure there are no authentication issues between computing nodes and external HDFS.
  5. Does the CDH plugin support Hadoop’s data locality feature?
    No, currently the plugin does not support data locality.
  6. What if I create an invalid setting for the node group template?
    Before launching a cluster, there will be a validation warning to confirm your cluster template is valid. If you create an invalid cluster template, you will receive an invalid message when you are launching a cluster using this template.
  7. Is there HA support for CDH plugin?
    No, currently there is no HA support for a virtual cluster.
  8. How to define a CDH template?
    There is a validation program when you launch a cluster template to verify your template is workable. For more detail about the validation rule, see these docs. And you can follow the validation rule to design your cluster template.
  9. Can I configure the details for every services?
    Yes, you can. Please check the sub-tab of process; the sub-tab will pop-up when you select the process in the node group template.
  10. Can I scale up/down a running cluster?
    Yes, currently you may scale up/down a Node Manager or Data Node.

Enjoy!

Categories: Hadoop

Scan Improvements in Apache HBase 1.1.0

Cloudera Blog - Fri, 05/15/2015 - 16:02

The following post, from Cloudera intern Jonathan Lawlor, originally appeared in the Apache Software Foundation’s blog.

Over the past few months there have a been a variety of nice changes made to scanners in Apache HBase. This post focuses on two such changes, namely RPC chunking (HBASE-11544) and scanner heartbeat messages (HBASE-13090). Both of these changes address long standing issues in the client-server scan protocol. Specifically, RPC chunking deals with how a server handles the scanning of very large rows and scanner heartbeat messages allow scan operations to progress even when aggressive server-side filtering makes infrequent result returns.

Background

In order to discuss these issues, let’s first gain a general understanding of how scans currently work in HBase.

From an application’s point of view, a ResultScanner is the client side source of all of the Results that an application asked for. When a client opens a scan, it’s a ResultScanner that is returned and it is against this object that the client invokes next to fetch more data. ResultScanners handle all communication with the RegionServers involved in a scan and the ResultScanner decides which Results to make visible to the application layer. While there are various implementations of the ResultScanner interface, all implementations use basically the same protocol to communicate with the server.

In order to retrieve Results from the server, the ResultScanner will issue ScanRequests via RPC’s to the different RegionServers involved in a scan. A client configures a ScanRequest by passing an appropriately set Scan instance when opening the scan setting start/stop rows, caching limits, the maximum result size limit, and the filters to apply.

On the server side, there are three main components involved in constructing the ScanResponse that will be sent back to the client in answer to a ScanRequest:

RSRpcService

The RSRpcService is a service that lives on RegionServers that can respond to incoming RPC requests, such as ScanRequests. During a scan, the RSRpcServices is the server side component that is responsible for constructing the ScanResponse that will be sent back to the client. The RSRpcServices continues to scan in order to accumulate Results until the region is exhausted, the table is exhausted, or a scan limit is reached (such as the caching limit or max result size limit). In order to retrieve these Results, the RSRpcServices must talk to a RegionScanner

RegionScanner

The RegionScanner is the server side component responsible for scanning the different rows in the region. In order to scan through the rows in the region, the RegionScanner will talk to a one or more different instances of StoreScanners (one per column family in the row). If the row passes all filtering criteria, the RegionScanner will return the Cells for that row to the RSRpcServices so that they can be used to form a Result to include in the ScanResponse.

StoreScanner

The StoreScanner is the server side component responsible for scanning through the Cells in each column family.

When the client (i.e. ResultScanner) receives the ScanResponse back from the server, it can then decide whether or not scanning should continue. Since the client and the server communicate back and forth in chunks of Results, the client-side ResultScanner will cache all the Results it receives from the server. This allows the application to perform scans faster than the case where an RPC is required for each Result that the application sees.

RPC Chunking Why is it necessary?

Currently, the server sends back whole rows to the client (each Result contains all of the cells for that row). The max result size limit is only applied at row boundaries. After each full row is scanned, the size limit will be checked.

The problem with this approach is that it does not provide a granular enough restriction. Consider, for example, the scenario where each row being scanned is 100MB. This means that 100MB worth of data will be read between checks for the size limit. So, even in the case that the client has specified that the size limit is 1MB, 100MB worth of data will be read and then returned to the client.

This approach for respecting the size limit is problematic. First of all, it means that it is possible for the server to run out of memory and crash in the case that it must scan a large row. At the application level, you simply want to perform a scan without having to worry about crashing the RegionServer.

This scenario is also problematic because it means that we do not have fine-grained control over the network resources. The most efficient use of the network is to have the client and server talk back and forth in fixed-sized chunks.

Finally, this approach for respecting the size limit is a problem because it can lead to large, erratic allocations server-side playing havoc with GC.

Goal of the RPC Chunking solution

The goal of the RPC chunking solution was to:

  • Create a workflow that is more ‘regular’, and less likely to cause RegionServer distress
  • Use the network more efficiently
  • Avoid large garbage collections caused by allocating large blocks

Furthermore, we wanted the RPC chunking mechanism to be invisible to the application. The communication between the client and the server should not be a concern of the application. All the application wants is a Result for their scan. How that Result is retrieved is completely up to the protocol used between the client and the server.

Implementation Details

The first step in implementing this proposed RPC chunking method was to move the max result size limit to a place where it could be respected at a more granular level than on row boundaries. Thus, the max result size limit was moved down to the cell level within StoreScanner. This meant that now the max result size limit could be checked at the cell boundaries, and if in excess, the scan can return early.

The next step was to consider what would occur if the size limit was reached in the middle of a row. In this scenario, the Result sent back to the client will be marked as a “partial” Result. By marking the Result with this flag, the server communicates to the client that the Result is not a complete view of the row. Further Scan RPC’s must be made in order to retrieve the outstanding parts of the row before it can be presented to the application. This allows the server to send back partial Results to the client and then the client can handle combining these partials into “whole” row Results. This relieves the server of the burden of having to read the entire row at a time.

Finally, these changes were performed in a backwards compatible manner. The client must indicate in its ScanRequest that partial Results are supported. If that flag is not seen server side, the server will not send back partial results. Note that the configuration hbase.client.scanner.max.result.size can be used to change the default chunk size. By default this value is 2MB in HBase 1.1+.

An option (Scan#setAllowPartialResults) was also added so that an application can ask to see partial results as they are returned rather than wait on the aggregation of complete rows.

A consistent view on the row is maintained even though a row is the result of multiple RPC partials because the running context server-side keeps account of the outstanding mvcc read point and will not include in results Cells written later.

Note that this feature will be available starting in HBase 1.1. All 1.1+ clients will chunk their responses.

Jonathan Lawlor, a computer science student at University of Waterloo and HBase contributor, recently completed an internship at Cloudera.

Categories: Hadoop

Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle

Cloudera Blog - Thu, 05/14/2015 - 16:16

Our thanks to Ilya Ganelin, Senior Data Engineer at Capital One Labs, for the guest post below about his hard-earned lessons from using Spark.

I started using Apache Spark in late 2014, learning it at the same time as I learned Scala, so I had to wrap my head around the various complexities of a new language as well as a new computational framework. This process was a great in-depth introduction to the world of Big Data (I previously worked as an electrical engineer for Boeing), and I very quickly found myself deep in the guts of Spark. The hands-on experience paid off; I now feel extremely comfortable with Spark as my go-to tool for a wide variety of data analytics tasks, but my journey here was no cakewalk.

Capital One’s original use case for Spark was to surface product recommendations for a set of 25 million users and 10 million products, one of the largest datasets available for this type of modeling. Moreover, we had the goal of considering all possible products, specifically to capture the “long tail” (AKA less frequently purchased items). That problem is even harder, since to generate all possible pairings of user and products, you’d have 250e12 combinations—which is more data than you can store in memory, or even on disk. Not only were we ingesting more data than Spark could readily handle, but we also had to step away from the standard use case for Spark (batch processing with RDDs) and actually decompose the process of generating recommendations into an iterative operation.

Learning how to properly configure Spark, and use its powerful API in a way that didn’t cause things to break, taught me a lot about its internals. I also learned that at the time, there really wasn’t a consolidated resource that explained how those pieces fit together. The end goal of Spark is to abstract away those internals so the end-user doesn’t have to worry about them, but at the time I was using it (and to some degree today), to write efficient and functional Spark code you need to know what’s going on under the hood. This blog post is intended to reveal just that: to teach the curious reader what’s happening, and to highlight some simple and tangible lessons for writing better Spark programs.

Note: this post is not intended as a ground-zero introduction. Rather, the reader should have some familiarity with Spark’s execution model and the basics of Spark’s API.

The Pieces

First, let’s review the key players in the Spark infrastructure. I won’t touch on all the components, but there are a few basics that I’ll cover. These are partitioning, caching, serialization, and the shuffle operation.

Partitions

Spark’s basic abstraction is the Resilient Distributed Dataset, or RDD. The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that under the hood, you’re dealing with fragmented data. That fragmentation is what enables Spark to execute in parallel, and the level of fragmentation is a function of the number of partitions of your RDD. The number of partitions is important because a stage in Spark will operate on one partition at a time (and load the data in that partition into memory). Consequently, if you have fewer partitions than active stages, you will wind up under-utilizing your cluster. Furthermore, since with fewer partitions there’s more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, your performance may degrade as you take a greater hit from network and disk I/O. Ultimately this concept ties into Spark’s notion of parallelism and how you can tune it (see the discussion of tuning parallelism here) to optimize performance.

Caching

Spark is a big deal for two main reasons: first, as already mentioned, it has a really simple and useful API for complex processes. The other big thing is that unlike your standard MapReduce program, Spark lets you cache intermediate results in memory. Caching can dramatically improve performance if you have data structures that are used frequently, such as a lookup table or a matrix of scores in a machine-learning algorithm. Caching can also introduce problems since it will often require huge chunks of memory; tuning this process is its own challenge, but doing so can increase performance by several orders of magnitude.

Serialization

In distributed computing, you generally want to avoid writing data back and forth because it’s expensive. Instead, the common paradigm is to bring your code to your data. This is why many frameworks are based on the JVM, which lets you execute code on the same machine as the data. Serialization is the process of translating this code into an ideally compressed format for efficient transfer over the network. By default, Spark uses the standard Java serializer. However, you can get much faster and more memory-efficient serialization using Kryo serialization. Switching to Kryo can reduce memory pressure on your cluster and improve stability.

Shuffle

Even though moving data is expensive, sometimes it’s necessary. For example, certain operations need to consolidate data on a single node so that it can be co-located in memory, such as when you want to perform a reduce operation across all values associated with a particular key of a key-value RDD (reduceByKey()). This expensive reorganization of data is known as the shuffle. The shuffle involves serialization as well as Akka, Spark’s internal messaging system, thereby consuming disk and network I/O while increasing memory pressure from garbage collection. Improperly configured Akka settings or serialization settings can cause problems depending on the size of your data. There is an excellent write-up of what happens during a shuffle in this Cloudera Engineering blog post.

Lessons

Next, let’s delve into how these components all come together when you write a Spark program and, specifically, what lessons I’ve learned about tying all these pieces together.

Lesson 1: Spark Gorges on Memory

As I mentioned previously, part of Spark’s power is its ability to cache things in memory. The downside of this amazing utility is that when you use it, Spark transforms into a total memory hog. First, the JVM and YARN (if you’re using it) consume a significant amount of memory, leaving less than you expect for data movement and caching. Next, there’s metadata that accumulates on the driver as a byproduct of shuffle operations and becomes particularly problematic during long-running jobs (multi-day). Finally, Java or Scala classes may introduce hidden overhead in your RDDs. A 10-character Java string may actually consume as much as 60 bytes! To pour salt in the wound, actually tracking down the source of a problem can be nearly impossible since a Spark program may have logs distributed across the cluster, have hundreds of tasks executing per second, and errors may not always propagate all the way up the stack when exceptions are thrown.

Thus, the first thing to do is to tame this unruly beast. For starters, it’s critical to partition wisely, to manage memory pressure as well as to ensure complete resource utilization. Next, you must always know your data—size, types, and how it’s distributed. This last bit is important since otherwise you may wind up with skewed distribution of data across partitions. A simple solution for this last problem is to use a custom partitioner. Last, as mentioned above, Kryo serialization is faster and more efficient.

To deal with the issue of accumulating metadata, you have two options. First, you can set the spark.cleaner.ttl parameter to trigger automatic cleanups. However, this will also wipe out any persisted RDDs and I found that it caused problems when trying to subsequently interact with HDFS. The other solution, which I ended up implementing in my case, is to simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up.

Lesson 2: Avoid Data Movement

In general, I found that avoiding shuffles and minimizing data transfers helped me write programs that ran faster and executed more reliability. Keep in mind: there are occasional cases when having an extra shuffle can help, such as when you have data that can’t be automatically partitioned into many partitions (see “When More Shuffles are Better” here.)

So, how does one avoid shipping data? The obvious answer is to avoid operations that trigger shuffles like repartition and coalesce, ByKey operations (except forcounting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Spark also provides two mechanisms in particular that help us here. Broadcast variables are read-only variables that are cached in-memory locally on each machine and eliminate the need to ship copies of it with every task. Using broadcast variables can also let you do efficient joins between large and small RDDs or store a lookup table in memory that provides more efficient retrieval than doing an RDD lookup().

Accumulators are a way to efficiently update a variable in parallel during execution. Accumulators differ from broadcast variables in that they may only be read from on the driver process, but they allow Spark programs to efficiently aggregate results such counters, sums, or generated lists. An important note about Accumulators is that they’re not limited to basic types; you can accumulate any Accumulable classes.

Lesson 3: Avoiding Data Movement is Hard

The challenge of using the above mechanisms is that, for example, to broadcast an RDD you need to first collect() it on the driver node. To accumulate the results of distributed execution you need to serialize that data back to the driver and aggregate it there. The upshot of this is that all of a sudden you’re increasing the memory pressure on the driver. Between collected RDDs, the persistent metadata discussed previously, and Accumulators, you may quickly run out of memory on the driver. You can, of course, increase the amount of memory allocated by setting spark.driver.memory, but that only works to a degree.

Above, I mentioned a few things: how a smaller number of partitions increases the size footprint of each partition, and how Spark uses Akka for messaging. If your 2GB RDD is only partitioned into 20 partitions, to serialize the data from one partition to another node, you’ll need to ship a 100MB chunk of data from each partition via Akka. But, by default, Akka’s buffer is only 10 MB! One can get around this by setting akka.frameSize, but this is another example of needing to fundamentally understand your data and how all these moving pieces come together.

Lesson 4: Speed!

The first three lessons deal primarily with stability. Next, I want to briefly describe how I got dramatic performance gains.

The first is fairly obvious: liberal use of caching. Of course, you don’t have infinite memory so you must cache wisely, but in general, if you use some data twice, cache it. Also, unused RDDs that go out of scope will be automatically un-persisted but they may be explicitly released with unpersist().

Second, I found broadcast variables extremely useful. I use them regularly for large maps and lookup tables. They have a significant advantage over an RDD that is cached in memory since a lookup in an RDD, even one cached in memory, will still be O(m), where m is the length of data in a single partition. In contrast, a broadcasted hash map will have a lookup of O(1).

Finally, there are times when I had to help Spark parallelize its execution. In my case, I needed to perform a per-key operation between two RDDs—that is, select a key-value pair from the first and perform some computation with that data against the second RDD. In a naive implementation, Spark would process these keys one at a time, so I was of course getting terrible resource utilization on my cluster. I’d have only a few tasks running at a time and most of my executors weren’t used. Because I wasn’t using the RDD API to run this code, I couldn’t benefit from Spark’s built-in parallelism. The ultimate solution was simple: create a thread pool on the driver that would process multiple keys at the same time. Each of the threads would generate tasks that were submitted to YARN and now I could easily ramp up my CPU utilization.

The final lesson here is that Spark can only help you so long as you help it to do so.

Conclusion

Ideally, the above concept breakdown and lessons (coupled with the provided documentation links) help clarify some of Spark’s underlying complexity. I’ve made liberal citations to the sources that helped me most when it came to understanding all these various mechanisms, as well as those that guided me through learning how to configure and use Spark’s more advanced features. I simply hope you find them as useful as I have!

Ilya is a roboticist turned data scientist. After a few years at the University of Michigan building self-discovering robots and another few years working on core DSP software with cell phones and radios at Boeing, he landed in the world of big data at Capital One Labs. He’s an active contributor to the core components of Spark with the goal of learning what it takes to build a next-generation distributed computing platform.

Categories: Hadoop

New in CDH 5.4: Hot-Swapping of HDFS DataNode Drives

Cloudera Blog - Mon, 05/11/2015 - 16:18

This new feature gives Hadoop admins the commonplace ability to replace failed DataNode drives without unscheduled downtime.

Hot swapping—the process of replacing system components without shutting down the system—is a common and important operation in modern, production-ready systems. Because disk failures are common in data centers, the ability to hot-swap hard drives is a supported feature in hardware and server operating systems such as Linux and Windows Server, and sysadmins routinely upgrade servers or replace a faulty components without interrupting business-critical services.

Even so, historically, decommissioning an individual drive on an HDFS DataNode has not been possible in Apache Hadoop. Instead, to replace a bad drive or upgrade it with a larger one, the user had to decommission the entire DataNode. That process could take hours to complete, significantly affecting system availability and operational effectiveness.

Happily, in Apache Hadoop 2.6, HDFS has added support for hot-swapping disks on a DataNode; this feature was developed by Cloudera committers under the umbrella JIRA HDFS-1362. And starting in release 5.4.0, DataNode hot-swapping is supported in Cloudera Enterprise.

To ensure that the disk changes are persistent between DataNode restarts, the hot-swap drive feature is performed using the Reconfigurable framework introduced in HADOOP-7001. In summary, the reconfigurable framework enables HDFS daemons changing configurations without restarting the daemon. So adding and/or removing disks on a DataNode involves two steps:

  1. Modifying the dfs.datanode.data.dir property in the DataNode configuration to reflect the disk changes
  2. Asking the DataNode to reload its configuration and apply the configuration changes

It’s worth mentioning that when adding new disks, if the data directory does not exist, the DataNode must have write access to create the directory. Moreover, because we anticipate that the most common use case for the hot-swap disk feature is to replace bad disks, the hot-swapping procedure does not attempt to move the blocks to other DataNodes, as is done when decommissioning a node. That being said, it is advisable to remove no more than N-1 disks simultaneously, where N is the system-wide replication factor. We also recommend running “hdfs fsck” after swapping a disk out.

In the future, adding a -safe flag to the reconfig command will advise the DataNode to relocate the blocks before swapping out the volume.

How to Hot Swap

Assume that you have a DataNode (dn1.example.com), which currently manages two data volumes (/dfs/dn1,/dfs/dn2). In the event of a disk failure on the disk where /dfs/dn2 is located, the sysadmin will want to replace this bad drive with a new drive. Moreover, this new drive will be mounted on the same directory /dfs/dn2:

From a user’s perspective, there are two ways to achieve this task:

From the Command Line
  1. Remove /dfs/dn2 from the DataNode configuration file (i.e., hfds-site.xml).
    <pre class="code"> <property>   <name>dfs.datanode.data.dir</name>   <value>/dfs/dn1</value> </property>
  2. Ask the DataNode to reload its configurations and wait for this reconfiguration task to finish.
    $ hdfs dfsadmin -reconfig datanode dn1.example.com:50020 start # query the status of the reconfiguration task $ hdfs dfsadmin -reconfig datanode dn1.example.com:50020 status # Once the first line of the outputs shows as below, the reconfiguration task is completed. Reconfiguring status for DataNode[dn1.example.com:50020]: started at Tue Feb 10 15:09:52 PST 2015 and finished at Tue Feb 10 15:09:53 PST 2015. … SUCCESS: Change property dfs.datanode.data.dir      From: "file:///dfs/dn1,file:///dfs/dn2"      To: "file:///dfs/dn1" ...
  3. Unmount the disk from /dfs/dn2. Mount the new disk on /dfs/dn2.
  4. Add /dfs/dn2 back to the DataNode configuration file (hdfs-site.xml).
      <property>   <name>dfs.datanode.data.dir</name>   <value>/dfs/dn1,/dfs/dn2</value> </property>
  5. Run Step 2 again.
From Cloudera Manager

To make this process even easier, the DataNode Hot Swap Disk feature is supported in Cloudera Manager as of 5.4.0. To use it, follow these steps:

  1. Navigate to the Configuration page for the DataNode role in question.
  2. Edit the DataNode Data Directory configuration property: remove /dfs/dn2. Click “Save Changes.” Warning: Be sure to modify this property only for the DataNode role whose disk you are swapping. Do not modify the role group value for this property, as that will affect all DataNode roles in the group.

  3. Select “Refresh Data Directories” from the Actions menu in the upper right corner of the page (which should still be the configuration page for the DataNode role you are modifying).

  4. Confirm that the window that pops up refers to the correct DataNode role, and click Refresh Data Directories. This will run the reconfiguration task.

  5. Unmount the disk /dfs/dn2. Mount the new disk to /dfs/dn2.
  6. Reset the “DataNode Data Directory” property to add back the disk. Click “Save Changes,” as illustrated in Step 2.
  7. Repeat Steps 3 and 4.

To learn more details about hot swapping disks for DataNode, please see the official docs.

Conclusion

The DataNode Hot Swap Disk feature gives users a fine-grained tool for managing the health of their clusters. Moreover, Cloudera Manager offers the most convenient approach to monitor disk health and manage disk volumes as needed.

Lei Xu is a Software Engineer at Cloudera.

Categories: Hadoop

Apache Phoenix Joins Cloudera Labs

Cloudera Blog - Wed, 05/06/2015 - 20:23

We are happy to announce the inclusion of Apache Phoenix in Cloudera Labs.

Apache Phoenix is an efficient SQL skin for Apache HBase that has created a lot of buzz. Many companies are successfully using this technology, including Salesforce.com, where Phoenix first started.

With the news that Apache Phoenix integration with Cloudera’s platform has joined Cloudera Labs, let’s take a closer look at a few key questions surrounding Phoenix: What does it do? Why does anyone want to use it? How does it compare to existing solutions? Do the benefits justify replacing existing systems and infrastructure?

In this post, we’ll try to answers those questions by briefly introducing Phoenix and then discussing some of its unique features. I’ll also cover some use cases and compare Phoenix to existing solutions.

What is Apache Phoenix?

Phoenix adds SQL to HBase, the distributed, scalable, big data store built on Hadoop. Phoenix aims to ease HBase access by supporting SQL syntax and allowing inputs and outputs using standard JDBC APIs instead of HBase’s Java client APIs. It lets you perform all CRUD and DDL operations such as creating tables, inserting data, and querying data. SQL and JDBC reduce the amount of code users need to write, allow for performance optimizations that are transparent to the user, and opens the door to leverage and integrate lots of existing tooling.

Internally, Phoenix takes your SQL query, compiles it into a series of native HBase API calls, and pushes as much work as possible onto the cluster for parallel execution. It automatically creates a metadata repository that provides typed access to data stored in HBase tables. Phoenix’s direct use of the HBase API, along with coprocessors and custom filters, results in performance on the order of milliseconds for small queries, or seconds for tens of millions of rows.

Use Cases

Phoenix is good for fast HBase lookups. Its secondary indexing feature supports many SQL constructs and make lookup via non-primary key fields more efficient than full table scans. It simplifies the creation and management of typed row-centric data by providing composite row keys and by enforcing constraints on data when written using the Phoenix interfaces.

Phoenix provides a way to transparently salt the row key, which helps in avoiding the RegionServer hotspotting often caused by monotonically increasing rowkeys. It also provides multi-tenancy via a combination of multi-tenant tables and tenant-specific connections. With tenant-specific connections, tenants can only access data that belongs to them, and with multi-tenant tables, they can only see their own data in those tables and all data in regular tables.

Regardless of these helpful features, Phoenix is not a drop-in RDBMS replacement. There are some limitations:

  • Phoenix doesn’t support cross-row transactions yet.
  • Its query optimizer and join mechanisms are less sophisticated than most COTS DBMSs.
  • As secondary indexes are implemented using a separate index table, they can get out of sync with the primary table (although perhaps only for very short periods.) These indexes are therefore not fully-ACID compliant.
  • Multi-tenancy is constrained—internally, Phoenix uses a single HBase table.
Comparisons to Hive and Impala

The other well known SQL alternatives to Phoenix on top of HBase are Apache Hive and Impala. There is significant overlap in the functionality provided by these products. For example, all of them follow SQL-like syntax and provide a JDBC driver.

Unlike Impala and Hive, however, Phoenix is intended to operate exclusively on HBase data; its design and implementation are heavily customized to leverage HBase features including coprocessors and skip scans.

Some other considerations include:

  • The main goal of Phoenix is to provide a high-performance relational database layer over HBase for low-latency applications. Impala’s primary focus is to enable interactive exploration of large data sets by providing high-performance, low-latency SQL queries on data stored in popular Hadoop file formats. Hive is mainly concerned with providing data warehouse infrastructure, especially for long-running batch-oriented tasks.
  • Phoenix is a good choice, for example, in CRUD applications where you need the scalability of HBase along with the facility of SQL access. In contrast, Impala is a better option for strictly analytic workloads and Hive is well suited for batch-oriented tasks like ETL.
  • Phoenix is comparatively lightweight since it doesn’t need an additional server.
  • Phoenix supports advanced functionality like multiple secondary-index implementations optimized for different workloads, flashback queries, and so on. Neither Impala nor Hive have any provision for supporting secondary index lookups yet.

The following table summarizes what we’ve discussed so far:

Future Work

The Phoenix project is investigating integration with transaction managers such as Tephra (from Cask). It is also trying to incorporate query optimizations based on the size and cardinality of the data. Although Phoenix supports tracing now, more work is needed to round out its monitoring and management capabilities.

Conclusion

Phoenix offers some unique functionality for a certain set of HBase use cases. You can begin playing with Phoenix by following the installation instructions (you will need HBase 1.0, which ships as part of CDH 5.4), and you can view the source code here

As with everything else in Cloudera Labs, Phoenix integration is not supported yet and it’s for experimentation only. If you have any feedback or comments, let us know via the Cloudera Labs discussion forum.

Srikanth Srungarapu is a Software Engineer at Cloudera, and an HBase committer.

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Use Cases Track

Cloudera Blog - Fri, 05/01/2015 - 16:22

This year’s HBaseCon Use Cases track includes war stories about some of the world’s best examples of running Apache HBase in production.

As a final sneak preview leading up to the show next week, in this post, I’ll give you a window into the HBaseCon 2015′s (May 7 in San Francisco) Use Cases track.

Thanks, Program Committee!

  • “HBase @ Flipboard”

    Sang Chi, Jason Culverhouse, Matt Blair (Flipboard)

    Flipboard services over 100 million users using heterogenous results including user generated content, interest profile, algorithmically generated content, social firehose, friends graph, ads, and web/rss crawlers. To personalize and serve these results in real time, Flipboard employs a variety of data models, access patterns and configuration. This talk will present how some of these strategies are implemented using HBase.

  • “Graph Processing of Stock Market Order Flow in HBase on AWS”

    Aaron Carreras (FINRA)

    In this session, we will briefly cover the FINRA use case and then dive into our approach with a particular focus on how we leverage HBase on AWS. Among the topics covered will be our use of HBase Bulk Loading and ExportSnapShots for backup. We will also cover some lessons learned and experiences of running a persistent HBase cluster on AWS.

  • “Running ML Infrastructure on HBase” 

    Andrey Gusev (Sift Science)

    Sift Science uses online, large-scale machine learning to detect fraud for thousands of sites and hundreds of millions of users in real-time. This talk describes how we leverage HBase to power an ML infrastructure including how we train and build models, store and update model parameters online, and provide real-time predictions. The central pieces of the machine learning infrastructure and the tradeoffs we made to maximize performance will also be covered.

  • “Industrial Internet Case Study using HBase and TSDB”

    Shyam Nath, Arnab Guin (GE)

    This case study involves analysis of high-volume, continuous time-series aviation data from jet engines that consist of temperature, pressure, vibration and related parameters from the on-board sensors, joined with well-characterized slowly changing engine asset configuration data and other enterprise data for continuous engine diagnostics and analytics. This data is ingested via distributed fabric comprising transient containers, message queues and a columnar, compressed storage leveraging OpenTSDB over Apache HBase.

  • “S2Graph: A Large-scale Graph Database with HBase”

    Doyung Yoon, Taejin Chin (DaumKakao)

    As the operator of the dominant messenger application in South Korea, KakaoTalk has more than 170 million users, and our ever-growing graph has more than 10B edges and 200M vertices. This scale presents several technical challenges for storing and querying the graph data, but we have resolved them by creating a new distributed graph database with HBase. Here you’ll learn the methodology and architecture we used to solve the problems, compare it another famous graph database, Titan, and explore the HBase issues we encountered.

  • “HBase @ CyberAgent”

    Toshihiro Suzuki, Hirotaka Kakishima (CyberAgent)

    CyberAgent is a leading Internet company in Japan focused on smartphone social communities and a game platform known as Ameba, which has 40M users. In this presentation, we will introduce how we use HBase for storing social graph data and as a basis for ad systems, user monitoring, log analysis, and recommendation systems.

  • “Blackbird Collections: In-situ Stream Processing in HBase”

    Ishan Chhabra, Nitin Aggarwal, Venkata Deepankar Duvvuru (Rocket Fuel)

    Blackbird is a large-scale object store built at Rocket Fuel, which stores 100+ TB of data and provides real time access to 10 billion+ objects in a 2-3 milliseconds at a rate of 1 million+ times per second. In this talk (an update from HBaseCon 2014), we will describe Blackbird’s comprehensive collections API and various examples of how it can be used to model collections like sets, maps, and aggregates on these collections like counters, etc. We will also illustrate the flexibility and power of the API by modeling custom collection types that are unique to the Rocket Fuel context.

  • “NRT Event Processing with Guaranteed Delivery of HTTP Callbacks”

    Alan Steckley (Salesforce.com), Poorna Chandra (Cask Data)

    At Salesforce, we are building a new service, code-named Webhooks, that enables our customers’ own systems to respond in near real-time to system events and customer behavioral actions from the Salesforce Marketing Cloud. The application should process millions of events per day to address the current needs and scale up to billions of events per day for future needs, so horizontal scalability is a primary concern. In this talk, we will discuss how Webhooks is built using HBase for data storage and Cask Data Application Platform (CDAP), an open source framework for building applications on Hadoop.

That concludes our sneak previews series. Better register really soon!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

How-to: Install Cloudera Navigator Encrypt 3.7.0 on SUSE 11 SP2 and SP3

Cloudera Blog - Thu, 04/30/2015 - 15:50

Installing Cloudera Navigator Encrypt on SUSE is a one-off process, but we have you covered with this how-to.

Cloudera Navigator Encrypt, which is integrated with Cloudera Navigator governance software, provides massively scalable, high-performance encryption for critical Apache Hadoop data. It leverages industry-standard AES-256 encryption and provides a transparent layer between the application and filesystem. Navigator Encrypt also includes process-based access controls, allowing authorized Hadoop processes to access encrypted data, while simultaneously preventing admins or super-users like root from accessing data that they don’t need to see.

Navigator Encrypt is distributed in two different packages: the kernel module, and the binaries (cli commands) and configuration files. Current supported distributions are debian-7-x64, rhel-5-x64, rhel-6-x64, sles-11-x64, ubuntu-12.04-x64, and ubuntu-14.04-x64. As SUSE has a specific way to build and distribute RPMs for any external kernel module, this post explains how to install Navigator Encrypt 3.7.0 specifically on SLES 11 SP2 and SP3.

Understanding KMPs

For nearly all platforms, the traditional way to install Navigator Encrypt and its kernel module is to issue:

zypper | apt-get | yum install navencrypt

or any package manager equivalent. In these cases, the Navigator Encrypt kernel module uses dkms to build the kernel module at installation time.

This strategy doesn’t work with SUSE, however, which doesn’t support dkms and which handles external kernel modules in a unique manner. Because the process to build the kernel module manually is tedious, the easiest way to install the kernel module is by distributing it already built.

Fortunately, SUSE provides a build tool (openSUSE Build Service, or OBS) that creates RPM packages containing the pre-built kernel module; this tool is free and can be found at build.opensuse.org. A SUSE package created with this tool is called a kernel module package (KMP). (To learn more about how to build KMPs, see the openSUSE build service user guide.)

For KMP names, SUSE recommends using a naming convention based on the company name and a short package name (example: cloudera-zncryptfs-kmp-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm). To clarify which packages belong to SP2 and SP3, Cloudera has renamed the KMP by adding “SPx” to the package name, as in: cloudera-zncryptfs-kmp-SP2-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm.

KMPs are designed to maintain compatibility among all kernel versions for a specific SUSE version (SP2, SP3, etc.). SUSE assures us that if there is a kernel upgrade the kABI symbols will not change; those symbols will have the same symbol version (checksum) for all the kernels supported for a specific SUSE Version (SP2, for example). Thus the same installed kernel module will work after the upgrade without the need to rebuild or upgrade it.

A Navigator Encrypt kernel module only needs to be re-installed when there is an upgrade from SP2 to SP3. The reason for that re-install is that the SP2 cloudera-zncryptfs kernel module is incompatible with SP3—instead, you would need to install the SP3 cloudera-zncryptfs KMP.

Navigator Encrypt for SUSE doesn’t have an implicit dependency on the zncrypt-kernel-module anymore, so it has to be installed independently and based on the kernel where it is going to run. Cloudera packages are named to make it easy to know which version to use; for example, the package cloudera-zncryptfs-kmp-SP2-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm corresponds to SP2 and cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-10.2.x86_64.rpm corresponds to SP3.

SUSE also maintains a list of supported kernels and their versions that you can use to verify compatibility between the KMP version and your installed kernel. Just select the service pack that interests you and then click on the “Kernel:” drop-down list to see all the kernels. (Note: the latest SP3 kernel update used slightly different numbers for i586 vs. x86_64 architectures. The update released for i586 was 3.0.101-0.42.1, and the update released for x86_64 was 3.0.101-0.46.1. This is why you see both in the list. Anyway, for an SP3 kernel x86_x64, you will never see a 3.0.101-0.42.1 version. Navigator Encrypt only supports x86_64.)

Cloudera is a SUSE partner and thus cloudera-zncryptfs is part of the Solid Driver Program, ensuring kernel driver compatibility. To check if the SUSE kernel is tainted, look at the variable:

#cat /proc/sys/kernel/tainted 0

where 0 means the kernel is not tainted. However, because cloudera-zncryptfs is a supported kernel module, a tainted kernel is tagged with a specific value:

#cat /proc/sys/kernel/tainted< 2847453646

(To learn more about a tainted kernel here.)

When looking at the kernel module info, you will also see that it has a tag identifying the support as external to SUSE (“supported:  external”).

# modinfo zncryptfs.ko filename:    zncryptfs.ko license:     GPL v2 description: zncryptfs (v3.4.2-0) author:      Sergio Pena srcversion:  6C5F956ACF58A518FF5DAC9 depends:        supported:   external vermagic:    3.0.101-0.46-default SMP mod_unload modversions parm:        zncryptfs_stats_enabled:Enable data statistics (int) parm:        zncryptfs_acl_cache_size:Set ACL cache size (int) parm:        zncryptfs_acl_verbose:Display more information on an access denied (int) parm:        zncryptfs_debug:Display debug information (int)

Currently, Navigator Encrypt supports SUSE 11 SP2 and SP3. (SLES 11 SP1 is not supported, nor is SLES 12.) The Cloudera stable repo for SUSE/OpenSUSE can be found at here.

Here is a list of cloudera-zncryptfs KMPs built for SP2:

cloudera-zncryptfs-kmp-SP2-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm
cloudera-zncryptfs-kmp-SP2-xen-3.4.2_3.0.13_0.27-15.1.x86_64.rpm    
cloudera-zncryptfs-kmp-SP2-ec2-3.4.2_3.0.13_0.27-15.1.x86_64.rpm    

And here is a list of cloudera-zncryptfs KMPs built for SP3:

cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-10.2.x86_64.rpm
cloudera-zncryptfs-kmp-SP3-xen-3.4.2_3.0.76_0.11-10.2.x86_64.rpm
cloudera-zncryptfs-kmp-SP3-ec2-3.4.2_3.0.76_0.11-10.2.x86_64.rpm

What flavor is your kernel? It mostly depends on your hardware. Learn more about kernel flavors here.

Installation Process

Identify the cloudera-zncryptfs KMPs that you will install for SP2 or SP3. You also need to identify the flavor.

Add the cloudera archive as specified in the Navigator Encrypt user guide:

 # vi /etc/zypp/repos.d/cloudera.repo [cloudera_stable] name=SLES $releasever - cloudera.com baseurl=https://USER:PASSWORD@archive.gazzang.com/sles/stable/11 enabled=1 gpgcheck=1 gpgkey=http://archive.gazzang.com/gpg_gazzang.asc  # sudo rpm --import http://archive.gazzang.com/gpg_gazzang.asc

In this example, we are installing Navigator Encrypt for SLES 11 SP3. Let’s install its corresponding KMP:

#zypper install https://archive.gazzang.com/sles/stable/11/Packages/cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13.x86_64.rpm Refreshing service 'nu_novell_com'. Loading repository data... Reading installed packages... Resolving package dependencies... The following package is going to be upgraded:   cloudera-zncryptfs-kmp-default The following package is not supported by its vendor:   cloudera-zncryptfs-kmp-SP3-default 1 package to upgrade. Overall download size: 265.0 KiB. No additional space will be used or freed after the operation. Continue? [y/n/? shows all options] (y): Retrieving package cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13.x86_64 (1/1), 265.0 KiB (1.5 MiB unpacked) Retrieving package cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13.x86_64 (1/1), 265.0 KiB (1.5 MiB unpacked) Installing: cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13 [done]

(Note: The previous command-line example is for the build revision number 14.13. That number might change, so please check the stable repo for the newest build.)

zncryptfs.ko is installed at /lib/modules/3.0.76-0.11-default/updates/zncryptfs.ko and it becomes a weak-update of your current kernel module.

Current kernel module in this example is:

# uname -r 3.0.101-0.46-default

The weak-update has now a symlink to the kernel module installed:

# ll /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko lrwxrwxrwx 1 root root 53 Feb 17 09:17 /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko -> /lib/modules/3.0.76-0.11-default/updates/zncryptfs.ko

This is a good moment to check for the modinfo of our installed kernel module:

# modinfo /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko filename: /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko license: GPL v2 description: zncryptfs (v3.4.2-0) author: Sergio Pena srcversion: 6C5F956ACF58A518FF5DAC9 depends: supported: external vermagic: 3.0.76-0.11-default SMP mod_unload modversions parm: zncryptfs_stats_enabled:Enable data statistics (int) parm: zncryptfs_acl_cache_size:Set ACL cache size (int) parm: zncryptfs_acl_verbose:Display more information on an access denied (int) parm: zncryptfs_debug:Display debug information (int)

Our module is now ready to work on our installed kernel. Next, we can install the Navigator Encrypt binaries:

#zypper install navencrypt -y

You can register Navigator Encrypt against a Key Trustee server:

# navencrypt register -s keytrustee.server.net -o ORGANIZATION -a AUTH_CODE Choose MASTER key type: 1) Passphrase (single) 2) Passphrase (dual) 3) RSA private key Select: 1 Type MASTER passphrase: Verify MASTER passphrase: Registering navencrypt v3.6.0 (r44)... * Registering keytrustee GPG keys on 'keytrustee.server.net'... * Uploading MASTER key... * Uploading CONTROL content... * Creating control file... Navencrypt is now registered.

SUSE has a flag that allows external kernel modules to load. Set this flag to 1 as specified in the user guide:

# vi /etc/modprobe.d/unsupported-modules # # Every kernel module has a flag 'supported'. If this flag is not set loading # this module will taint your kernel. You will not get much help with a kernel # problem if your kernel is marked as tainted. In this case you firstly have # to avoid loading of unsupported modules. # # Setting allow_unsupported_modules 1 enables loading of unsupported modules # by modprobe, setting allow_unsupported_modules 0 disables it. This can # be overridden using the --allow-unsupported-modules command line switch. allow_unsupported_modules 1

If you don’t do that, you won’t be able to prepare any mount point, and a message like the following will appear:

# navencrypt-prepare /mnt/t1 /mnt/t1 The Navigator Encrypt kernel module is marked as unsupported on this kernel. Please set allow_unsupported_modules to 1 in /etc/modprobe.d/unsupported-modules

Then, prepare a first mount point:

# navencrypt-prepare /mnt/t1 /mnt/t1 Type MASTER passphrase: Encryption Type: eCryptfs Cipher: aes Key Size: 256 Random Interface: OpenSSL Filesystem: ext3 Options: Verifying MASTER key against KeyTrustee (wait a moment) ... OK Generation Encryption Keys with OpenSSL ... OK Registering Encryption Keys (wait a moment) ... OK Mounting /mnt/t1 ... OK

Verify that it is actually mounted:

# mount … (rw,ecryptfs_sig=b3fc17166dc9f34c,ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_enable_filename_crypto=n) suse:~ #

A quick encryption test adding a universal rule can be done:

# mkdir encrypted # vi encrypted/file # navencrypt-move encrypt @test encrypted/file /mnt/t1 Type MASTER passphrase: Size to encrypt: 4 KB Moving from: '/root/encrypted/file' Moving to: '/mnt/t1/test/root/encrypted/file' 100% [=======================================================>] [ 18 B] Done. # cat encrypted/file cat: encrypted/file: Permission denied # navencrypt acl --add --rule="ALLOW @* * *" Type MASTER passphrase: 1 rule(s) were added # cat encrypted/file encrypted content # navencrypt-move decrypt encrypted/file Type MASTER passphrase: Size to decrypt: 12 KB Moving from: '/mnt/t1/test/root/encrypted/file' Moving to: '/root/encrypted/file' 100% [=======================================================>] [ 18 B] Done. # /etc/init.d/navencrypt-mount restart Stopping navencrypt directories * Umounting /mnt/t1 ... done * Unloading module ... done Starting navencrypt directories * Mounting /mnt/t1 ... done suse:~ #

Congratulations, you have just installed Navigator Encrypt on SLES 11 SP3!

Alex Gonzalez is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Translate from MapReduce to Apache Spark (Part 2)

Cloudera Blog - Tue, 04/28/2015 - 16:39

The conclusion to this series covers Combiner-like aggregation functionality, counters, partitioning, and serialization.

Apache Spark is rising in popularity as an alternative to MapReduce, in a large part due to its expressive API for complex data processing. A few months ago, my colleague, Sean Owen wrote a post describing how to translate functionality from MapReduce into Spark, and in this post, I’ll extend that conversation to cover additional functionality.

To briefly reiterate, MapReduce was originally designed for batch Extract Transform Load (ETL) operations and massive log processing. MapReduce relies on processing key-value pairs in map and reduce phases. Each phase has the following actions:

  1. Map: Emits 0, 1, or more key-values pairs as output for every input.
  2. Shuffle: Groups key-value pairs with the same keys by shuffling data across the cluster’s network.
  3. Reduce: Operates on an iterable of values associated with each key, often performing some kind of aggregation.

To perform complex operations, many Map and Reduce phases must be strung together. As MapReduce became more popular, its limitations with respect to complex and iterative operations became clear.

Spark provides a processing API based around Resilient Distributed Datasets (RDDs.) You can create an RDD by reading in a file and then specifying the sequence of operations you want to perform on it, like parsing records, grouping by a key, and averaging an associated value. Spark allows you to specify two different types of operations on RDDs: transformations and actions. Transformations describe how to transform one data collection into another. Examples of transformations include map, flatMap, and groupByKey. Actions require that the computation be performed, like writing output to a file or printing a variable to screen.

Spark uses a lazy computation model, which means that computation does not get triggered until an action is called. Calling an action on an RDD triggers all necessary transformations to be performed. This lazy evaluation allows Spark to smartly combine operations and optimize performance.

As an aid to the successful production deployment of a Spark cluster, in the rest of the blog post, we’ll explore how to reproduce functionality with which you may already be familiar from MapReduce in Spark. Specifically, we will cover combiner-like aggregation functionality, partitioning data, counter-like functionality, and the pluggable serialization frameworks involved.

reduceByKey vs Combiner

This simple Mapper featured in Sean’s blog post:

public class LineLengthMapper     extends Mapper {   @Override   protected void map(LongWritable lineNumber, Text line, Context context)       throws IOException, InterruptedException {     context.write(new IntWritable(line.getLength()), new IntWritable(1));   } }

…is part of a job that counts lines of text by their length. It’s simple, but inefficient: The Mapper writes a length and count of 1 for every line, which is then written to disk and shuffled across the network, just to be added up on the Reducer. If there are a million empty lines, then a million records representing “length 0, count 1” will be copied across the network, just to be collapsed into “length 0, count 1000000” by a Reducer like the one also presented last time:

public class LineLengthReducer     extends Reducer {   @Override   protected void reduce(IntWritable length, Iterable counts, Context context)       throws IOException, InterruptedException {     int sum = 0;     for (IntWritable count : counts) {       sum += count.get();     }     context.write(length, new IntWritable(sum));   } }

For this reason, MapReduce has the notion of a Combiner. A Combiner is an optimization that works like a Reducer—in fact, it must be an implementation of Reducer—that can combine multiple records on the Mapper side, before those records are written anywhere. It functions like a miniature Reducer preceding the shuffle. A Combiner must be commutative and associative, which means that its result must be the same no matter the order in which it combines records. In fact, LineLengthReducer itself could be applied as the Combiner in this MapReduce job, as well as being the Reducer.

Back to Spark. A terse and literal—but certainly not optimal—translation of LineLengthMapper and LineLengthReducer is:

linesRDD.mapPartitions { lines => lines.map(line => (line.length, 1)) }.groupByKey().mapValues(_.sum)

The Mapper corresponds to mapPartitions, the shuffle to groupByKey, and the Reducer to the mapValues call. A likewise literal translation of the Combiner would inject its logic at the end of the Mapper’s analog, mapPartitions:

linesRDD.mapPartitions { lines =>   val mapResult = lines.map(line => (line.length, 1))   mapResult.toSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toIterator }.groupByKey().mapValues(_.sum)

The new code uses Scala’s Collections API; these are not Spark operations. As mentioned previously, the new code actually implements exactly the same logic. It’s easy to see the resemblance in the expression of the two, since Spark mimics many of Scala’s APIs.

Still, it’s clunky. The essence of the operation is summing counts, and to know how to sum many counts it’s only necessary to know how to sum two counts, and apply that over and over until just one value is left. This is what a true reduce operation does: from a function that makes two values into one, it makes many values into one.

In fact, if just given the reduce function, Spark can intelligently apply it so as to get the effect of the Combiner and Reducer above all at once:

linesRDD.mapPartitions { lines =>   val mapResult = lines.map(line => (line.length, 1)) }.reduceByKey(_ + _)

_ + _ is shorthand for a function of two arguments that returns their sum. This is a far more common way of expressing this operation in Spark, and under the hood, Spark will be able to apply the reduce function before and after a shuffle automatically. In fact, without the need to express the Combiner’s counterpart directly in the code, it’s also no longer necessary to express how to map an entire partition with mapPartitions, since it’s implied by expressing how to map an element at a time:

linesRDD.map(line => (line.length, 1)).reduceByKey(_ + _)

The upshot is that, when using Spark, you’re often automatically using the equivalent of a Combiner. For the interested, a few further notes:

  • reduceByKey is built on a more general operation in Spark, called combineByKey, which allows values to be transformed at the same time.
  • For those who really are counting values, there is an even more direct Spark method for this: linesRDD.map(_.length).countByValue()
  • And if speed is more important than accuracy, there is a much faster approximate version that relies on the HyperLogLog algorithm: linesRDD.map(_.length).countByValueApprox()
Partitioning and Grouping Data

Both Spark and MapReduce support partitioning of key-value data by key. How data is split into chunks and in turn tasks by the processing framework has a large effect on the performance of common data operations like joining disparate data sets or doing per-key aggregations.

In MapReduce, you can specify a partitioner that determines how key-value pairs are split up and organized amongst the reducers. A well-designed partitioner will approximately evenly distribute the records between the reducers. Both MapReduce and Spark use hash partitioning as their default partitioning strategy, though there are separate implementations for MapReduce and Spark. Hash partitioning works by assigning pairs to partitions based on the hash value of the key. In MapReduce and Spark the partition a key-value pair is assigned to the hashCode() method modulo the number of partitions you are creating. The hope is that the hashing function will evenly distribute your keys in the hash-space and you should end up with approximately evenly distributed data between partitions.

A common issue in distributed programs with per-key aggregation is seeing a long tail in the distribution of the number of records assigned to reducers, and having “straggler” reducers that take much more time to complete than the rest. You can often resolve this problem by specifying a different, potentially custom partitioner. To do this in MapReduce, you can define your own customer partitioner by extending Partitioner and specifying your custom Partitioning class in the job configuration. This can be done in the configuration file, or programmatically with conf.setPartitionerClass(MyPartitioner.class).

In Spark, there are operations that benefit from partitioning as well as operations can modify partitioning. The following table explains what types of transformations can affect partitioning and how.

 

Counters

MapReduce allows you to count things that happen in your job, and then query that count later. To define customer counters in MapReduce, you first need to define an Enum that describes the counters you will track. Imagine you are using Jackson (org.codehaus.jackson) to parse JSON into a Plain Old Java Object (POJO) using a jackson ObjectMapper. In doing so, you may encounter a JsonParseException or JsonMappingException, and you would like to track how many of each you see. So, you will create an enum that contains an element for both of these possible exceptions:

public static enum JsonErr {       PARSE_ERROR,       MAPPING_ERROR }

Then, in the map method of your map class you would have

public void map(LongWritable key, Text value, Context context)     throws IOException, InterruptedException {    try { /* Parse a string into a POJO */    } catch (JsonParseException parseException) { context.getCounter(JsonErr.PARSE_ERROR).increment(1L)    } catch (JsonMappingException mappingException) { context.getCounter(JsonErr.MAPPING_ERROR).increment(1L);    } }

All counters that get incremented during the course of a job will be reported to the JobTracker and displayed in the JobTracker Web UI, along with the default I/O counters. You can also access the counters from the MapReduce driver program, from the Job you create using your Configuration.

Spark exposes Accumulators, which can be used as counters, but more generally support any associative operation. Thus, you can go beyond incrementing and decrementing by integers toward summing arbitrary floating-point numbers—or even better, actually collecting samples of parsing errors you encounter.

If you were to do a literal translation of this parsing-error count to Spark it would look like:

/** Define accumulators. */ val parseErrorAccum = sc.accumulator(0, "JsonParseException") val mappingErrorAccum = sc.accumulator(0, "JsonMappingException") /** Define a function for parsing records and incrementing accumulators when exceptions are thrown. */ def parse(line: String) = {   try { /* Parse a String into a POJO */   } catch {     case e: JsonParseException => mapErr += 1     case e: JsonMappingException => parseError += 1   } } /** Parse records in a transformation.*/ val parsedRdd = unparsedRdd.map(parse)

While there does not currently exist a good way to count while performing a transformation, Spark’s Accumulators do provide useful functionality for creating samples of parsing errors. This is very notably something that is more difficult to do in MapReduce. An alternative and useful strategy to take is to instead use reservoir sampling to create a sample of error messages associated with parsing errors.

Important Caveat About Accumulators

Now, you should be careful about how and when you use Accumulators. In MapReduce, increment actions on a counter executed during a task that later fails will not be counted toward the final value. MapReduce is careful to count correctly even when tasks fail or speculative execution occurs.

In Spark, the behavior of accumulators requires careful attention. It is strongly recommended that accumulators only be used in an action. Accumulators incremented in an action are guaranteed to only be incremented once. Accumulators incremented in a transformation can have their values incremented multiple times if a task or job stage is ever rerun, which is unexpected behavior for most users.

In the example below, an RDD is created and then mapped over while an accumulator is incremented. Since Spark uses a lazy evaluation model, these RDDs are only computed once an action is invoked and a value is required to be returned. Calling another action on myRdd2 requires that the preceding steps in the workflow are recomputed, incrementing the accumulator again.

scala> val myrdd = sc.parallelize(List(1,2,3)) myrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :12 scala> val myaccum = sc.accumulator(0, "accum") myaccum: org.apache.spark.Accumulator[Int] = 0 scala> val myRdd2 = myrdd.map(x => myaccum += 1) myRdd2: org.apache.spark.rdd.RDD[Unit] = MappedRDD[1] at map at :16 scala> myRdd2.collect res0: Array[Unit] = Array((), (), ()) scala> myaccum res1: org.apache.spark.Accumulator[Int] = 3 scala> myRdd2.collect res2: Array[Unit] = Array((), (), ()) scala> myaccum res3: org.apache.spark.Accumulator[Int] = 6

Beyond situations where Spark’s lazy evaluation model causes transformations to be reapplied and accumulators incremented, it is possible that tasks getting rerun because of a partial failure will cause accumulators to be incremented again. The semantics of accumulators are distinctly not a once-and-only-once (aka counting) model.

The problem with the example of counting parsing errors above is that it is possible for the job to complete successfully with no explicit errors, but the numeric results may not be valid—and it would be difficult to know either way. One of the most common uses of counters in MapReduce is parsing records and simultaneously counting errors, and unfortunately there is no way to reliably count using accumulators in Spark.

Serialization Frameworks

MapReduce and Spark both need to be able to take objects in the JVM and serialize them into a binary representation to be sent across the network when shuffling data. MapReduce uses a pluggable serialization framework, which allows users to specify their own implementation(s) of org.apache.hadoop.io.serializer.Serialization by setting io.serialization in the Hadoop configuration if they wish to use a custom serializer. HadoopWritable and Avro specific and reflection-based serializers are configured as the default supported serializations.

Similarly, Spark has a pluggable serialization system that can be configured by setting the spark.serializer variable in the Spark configuration to a class that extends org.apache.spark.serializer.Serializer. By default, Spark uses Java Serialization that works out of the box but is not as fast as other serialization methods. Spark can be configured to use the much faster Kryo Serialization protocol by setting spark.serializer to org.apache.spark.serializer.KryoSerializer and setting spark.kryo.registrator to the class of your own custom registrator, if you have one. In order to get the best performance out of Kryo, you should register the classes with a KryoRegistrator ahead of time, and configure Spark to use your particular Kryo registrator.

If you wanted to use Kryo for serialization and register a User class for speed, you would define your registrator like this.

import com.mycompany.model.User import org.apache.spark.serializer.KryoRegistrator class MyKryoRegistrator extends KryoRegistrator {   override def registerClasses(kryo: Kryo) {     kryo.register(classOf[User])   } }

You would then set spark.serializer to spark.KryoSerializer and spark.kryo.registrator to com.mycompany.myproject.MyKryoRegistrator. It is worth noting that if you are working with Avro objects, you will also need to specify the AvroSerializer class to serialize and deserialize. You would modify our Registrator code like so:

import com.mycompany.model.UserAvroRecord import org.apache.spark.serializer.KryoRegistrator class MyKryoRegistrator extends KryoRegistrator {   override def registerClasses(kryo: Kryo) {     kryo.register(classOf[UserAvroRecord], new AvroSerializer[UserAvroRecord]())   }

Note: while the data sent across the network using Spark will be serialized with the serializer you specify in the configuration, the closures of tasks will be serialized with Java serialization. This means anything in the closures of your tasks must be serializable, or you will get a TaskNotSerializableException.

For Spark to operate on the data in your RDD it must be able to serialize the function you specify in map, flatMap, combineByKey on the driver node, ship that serialized function to the worker nodes, deserialize it on the worker nodes, and execute it on the data. This is always done with Java Serialization, which means you can’t easily have Avro objects in the closure of function in Spark because Avro objects have not been Java serializable up until version 1.8.

Conclusion

As you hopefully observed, there are similarities but also important differences between MapReduce and Spark with respect to combiner-like aggregation functionality, partitioning, counters, and pluggable serialization frameworks. Understanding these nuances can help ensure that your Spark deployment is a long-term success.

Juliet Hougland is a Data Scientist at Cloudera.

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Ecosystem Track

Cloudera Blog - Fri, 04/24/2015 - 17:03

This year’s HBaseCon Ecosystem track covers projects that are complementary to HBase (with a focus on SQL) such as Apache Phoenix, Apache Kylin, and Trafodion.

In this post, I’ll give you a window into the HBaseCon 2015′s (May 7 in San Francisco) Ecosystem track.

Thanks, Program Committee!

  • “HBase as an IoT Stream Analytics Platform for Parkinson’s Disease Research”

    Ido Karavany (Trafodion)

    In this session, you will learn about a solution developed in partnership between Intel and the Michael J. Fox foundation to enable breakthroughs in Parkinson’s disease (PD) research, by leveraging wearable sensors and smartphone to monitor PD patient’s motor movements 24/7. We’ll elaborate on how we’re using HBase for time-series data storage and integrating it with various stream, batch, and interactive technologies. We’ll also review our efforts to create an interactive querying solution over HBase.

  • “Apache Phoenix: The Evolution of a Relational Database Layer over HBase”

    James Taylor (Salesforce.com), Maryann Xue (Intel)

    Phoenix has evolved to become a full-fledged relational database layer over HBase data. We’ll discuss the fundamental principles of how Phoenix pushes the computation to the server and why this leads to performance enabling direct support of low-latency applications, along with some major new features. Next, we’ll outline our approach for transaction support in Phoenix, a work in-progress, and discuss the pros and cons of the various approaches. Lastly, we’ll examine the current means of integrating Phoenix with the rest of the Hadoop ecosystem.

  • “Analyzing HBase Data with Apache Hive”

    Swarnim Kulkarni (Cerner), Brock Noland (StreamSets), Nick Dimiduk (Hortonworks)

    Hive/HBase integration extends the familiar analytical tooling of Hive to cover online data stored in HBase. This talk will walk through the architecture for Hive/HBase integration with a focus on some of the latest improvements such as Hive over HBase Snapshots, HBase filter pushdown, and composite keys. These changes improve the performance of the Hive/HBase integration and expose HBase to a larger audience of business users. We’ll also discuss our future plans for HBase integration.

  • “Apache Kylin: Extreme OLAP Engine for Hadoop” 

    Seshu Adunuthula (eBay)

    Kylin is an open source distributed analytics engine contributed by eBay that provides a SQL interface and OLAP on Hadoop supporting extremely large datasets. Kylin’s pre-built MOLAP cubes (stored in HBase), distributed architecture, and high concurrency helps users analyze multidimensional queries via SQL and other BI tools. During this session, you’ll learn how Kylin uses HBase’s key-value store to serve SQL queries with relational schema.

  • “Optimizing HBase for the Cloud in Microsoft Azure HDInsight”

    Maxim Lukiyanov (Microsoft)

    Microsuft Azure’s Hadoop cloud service, HDInsight, offers Hadoop, Storm, and HBase as fully managed clusters. In this talk, you’ll explore the architecture of HBase clusters in Azure, which is optimized for the cloud, and a set of unique challenges and advantages that come with that architecture. We’ll also talk about common patterns and use cases utilizing HBase on Azure.

  • “Warcbase: Scaling ‘Out’ and ‘Down’ HBase for Web Archiving”

    Jimmy Lin (University of Maryland)

    Web archiving initiatives around the world capture ephemeral web content to preserve our collective digital memory. However, that requires scalable, responsive tools that support exploration and discovery of captured content. Here you’ll learn about why Warcbase, an open-source platform for managing web archives built on HBase, is one such tool. It provides a flexible data model for storing and managing raw content as well as metadata and extracted knowledge, tightly integrates with Hadoop for analytics and data processing, and relies on HBase for storage infrastructure.

  • “Trafodion: Integrating Operational SQL into HBase”

    Anoop Sharma, Rohot Jain (HP)

    Trafodion, open sourced by HP, reflects 20+ years of investment in a full-fledged RDBMS built on Tandem’s OLTP heritage and geared towards a wide set of mixed query workloads. In this talk, we will discuss how HP integrated Trafodion with HBase to take full advantage of the Trafodion database engine and the HBase storage engine, covering 3-tier architecture, storage, salting/partitioning, data movement, and more.

  • “SQL-on-HBase Smackdown: Panel”

    Julian Hyde (Hortonworks), Rohit Jain (HP), Dr. Ricardo Jimenez-Peris (LeanXscale), Joen Leach (Splice Machine), Jacques Nadeau (MapR), James Taylor (Salesforce.com)

    Nothing is hotter than SQL-on-Hadoop, and now SQL-on-HBase is fast approaching equal hotness status. In this panel, a panel of developers and architects deeply involved in this effort will discuss the work done so far across the ecosystem and the work still to be done.

Getting interested? Wait until you see the Use Cases sneak preview next week!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

New Cloudera Search Training: Learn Powerful Techniques for Full-Text Search on an EDH

Cloudera Blog - Fri, 04/24/2015 - 15:43

Cloudera Search combines the speed of Apache Solr with the scalability of CDH. Our newest training course covers this exciting technology in depth, from indexing to user interfaces, and is ideal for developers, analysts, and engineers who want to learn how to effectively search both structured and unstructured data at scale.

Despite being nearly 10 years old, Apache Hadoop already has an interesting history. Some of you may know that it was inspired by the Google File System and MapReduce papers, which detailed how the search giant was able to store and process vast amounts of data. Search was the original Big Data application, and, in fact, Hadoop itself was a spinoff of a project designed to create a reliable, scalable system to index data using one of Doug Cutting’s other creations: Apache Lucene.

Fortunately, many of Hadoop’s early developers and users had the vision to see that a system that offers scalable, cost-effective data storage and processing would become increasingly important in a world that generates data at ever-increasing rates. Hadoop ultimately became the center of an entire ecosystem of tools that made it easier to ingest, process, and analyze a variety of data. Concurrently, Yonik Seeley was creating what eventually became Apache Solr, which is also based on Apache Lucene. Cloudera Search brings these technologies together to enable full-text search at scale.

What is search, exactly? Many people picture Google’s home page, simply offering a text box for you to enter search terms and a button that submits your query. That’s certainly one application of search — and one that helps to illustrate how Cloudera Search can help users to find relevant information regardless of their technical ability — but it’s not the only one. In fact, search also encompasses analytical capabilities that allow you to explore your data interactively, as you can see below:

Economics and technology have converged to usher in the era of the enterprise data hub. As organizations become more data-driven, it’s more important than ever for decision makers to identify important trends. Cloudera Search is an ideal tool for this, and it’s often used for data discovery and exploration. Yet the ability to quickly drill down and find a specific result also makes it a good choice for detecting anomalies. Rather than finding a Web site as one might with internet search, an enterprise user would use Cloudera Search to analyze customer satisfaction through social media posts, identify a failing device using network sensor data, or uncover intrusion attempts hidden away in firewall logs.

Although our Designing and Building Big Data Applications course does include a section on Cloudera Search, many students wanted to cover Search in greater depth after they discovered how they could apply it in their own organizations. That’s why I’m pleased to announce the availability of Cloudera Search Training, a three-day course dedicated to showing you how to ingest, index, and query data at scale using this versatile technology.

If you’d like to learn more about this course, I invite you to join me for a webinar on Tuesday, April 28th. We’ll cover:

  • What Cloudera Search is and how it enables data discovery and analytics
  • How to perform indexing of data from various sources and in various formats
  • Who is best suited to attend the course and what prior knowledge you should have
  • The benefits Search delivers as part of an enterprise data hub

Register now for the webinar.

Tom Wheeler is a Senior Curriculum Developer for Cloudera University.

Categories: Hadoop

Cloudera Enterprise 5.4 is Released

Cloudera Blog - Thu, 04/23/2015 - 23:37

We’re pleased to announce the release of Cloudera Enterprise 5.4 (comprising CDH 5.4, Cloudera Manager 5.4, and Cloudera Navigator 2.3).

Cloudera Enterprise 5.4 (Release Notes) reflects critical investments in a production-ready customer experience through  governance, security, performance and deployment flexibility in cloud environments. It also includes support for a significant number of updated open standard components–including Apache Spark 1.3, Apache Kafka 1.3, Impala 2.2, and Apache HBase 1.0 (as well as unsupported beta releases of Hive-on-Spark data processing and OpenStack deployments).

Recently Cloudera made the upgrade process considerably easier via an improved CDH upgrade wizard; see details about that wizard here and best practices here. (Note: Due to metadata format changes in Apache Hadoop 2.6, upgrading to CDH 5.4.0 and later from any earlier release requires an HDFS metadata upgrade, as well.)

Here are some of the highlights (incomplete; see the respective Release Notes for CDH, Cloudera Manager, and Cloudera Navigator for full lists of features and fixes):

Security
  • SSL and Kerberos support in Apache Flume for the Thrift source and sink.
  • SSL support across Cloudera Search (Solr and all integrations with CDH).
  • Cluster-wide redaction of sensitive data in logs is now possible.
  • HBase impersonation in Hue allows your client to authenticate to HBase as any user, and to re-authenticate at any time.
  • Solr metadata stored in ZooKeeper can now be protected by ZooKeeper ACLs.
  • Kerberos is now supported for Apache Sqoop2.
Performance
  • Includes beta release of Hive-on-Spark as an option for improved Hive data processing performance (unsupported).
  • MultiWAL support for HBase RegionServers allows you to increase throughput when a region writes to the write-ahead log (WAL).
  • You can now store medium-sized objects (MOBs) up to 10MB in size directly in HBase while maintaining read and write performance.
  • A new Kafka connector for Spark Streaming avoids the need for the HDFS WAL.
  • Hue pages render much faster.
Data Management and Governance
  • Expanded coverage in Cloudera Navigator
    • Impala (CDH 5.4 and higher) lineage
    • Cloudera Search (CDH 5.4 and higher) auditing
    • Auditing of Navigator activity, such as audit views, metadata searches, and policy editing
    • Avro and Parquet schema inference
  • Platform enhancements
    • Redesigned metadata search provides autocomplete, faster filtering, and saved searches
    • SAML for single sign-on
Cloud Deployments
  • OpenStack deployments are now possible as an unsupported beta.
  • HBase support on Microsoft Azure.
Real-Time Architecture
  • Apache Kafka 1.3 installs by default and is supported for production use.
  • Spark Streaming now has a receiver-less “direct” connector for Kafka.
New or Updated Open Source Components
  • Apache Hadoop 2.6
  • Apache HBase 1.0
  • Apache Hive 1.1
  • Apache Kafka 1.3
  • Apache Oozie 4.1
  • Apache Solr 4.10.3
  • Apache Spark 1.3
  • Hue 3.7
  • Impala 2.2
  • Kite SDK 1.0
New/Updated OS & Java Support
  • RHEL 6.6/CentOS 6.6/OEL 6.6 (UEK3)
  • JDK8u40, JDK7u75

Over the next few weeks, we’ll publish blog posts that cover some of these features in detail. In the meantime:

As always, we value your feedback; please provide any comments and suggestions through our community forums. You can also file bugs via issues.cloudera.org.

Categories: Hadoop

Text Mining with Impala

Cloudera Blog - Wed, 04/22/2015 - 16:03

Thanks to Torsten Kilias and Alexander Löser of the Beuth University of Applied Sciences in Berlin for the following guest post about their INDREX project and its integration with Impala for integrated management of textual and relational data.

Textual data is a core source of information in the enterprise. Example demands arise from sales departments (monitor and identify leads), human resources (identify professionals with capabilities in ‘xyz’), market research (campaign monitoring from the social web), product development (incorporate feedback from customers), and the medical domain (anamnesis).

In this post, we describe In-Database Relation Extraction (INDREX), a system that transforms text data into relational data with Impala (the open source analytic database for Apache Hadoop), with low overhead needed for extraction, linking, and organization. Read this paper for complete details about our approach and our implementation.

Introduction to INDREX

Currently, mature systems exist for either managing relational data (such as Impala) or for extracting information from text (such as the Stanford CoreNLP). As a result, the user must ship data between systems. Moreover, transforming textual data in a relational representation requires “glue” and development time to bind different system landscapes and data models seamlessly. Finally, domain-specific relation extraction is an iterative task. It requires the user to continuously adopt extraction rules and semantics in both, the extraction system and the database system. As a result, many projects that combine textual data with existing relational data may likely fail and/or will become infeasible.

In contrast, INDREX is a single system for managing textual and relational data together. The figure below shows two possible system configurations for INDREX; both read and transform textual data into generic relations in an ETL approach and store results in base tables in the Parquet file format. Once generic relations are loaded, the INDREX user defines queries on base tables to extract higher-level semantics or to join them with other relational data.

The first configuration imitates state-of-the-art batch processing systems for text mining, such as Stanford Core NLP, IBM’s System-T, or GATE. These systems use HDFS for integrating text data and loading relational data from an RDBMS. The user poses queries against HDFS-based data with proprietary query languages or by writing transformation rules with user-defined functions in languages like Pig Latin, Java, or JAQL.

The second configuration utilizes Impala and INDREX. This approach permits users to describe relation extraction tasks, such as linking entities in documents to relational data, within a single system and with SQL. For providing this powerful functionality, INDREX extends Impala with a set of white-box user-defined functions that enable corpus-wide transformations from sentences into relations. As a result:

  • The user can utilize structured data from tables in Impala to adapt extraction rules to the target domain
  • No additional system is needed for relation extraction, and
  • INDREX can benefit from the full power of Impala’s built-in indexing, query optimization, and security model

Figure 1 shows the initial ETL (Steps 1, 2, and 3) and compares two implementations. Both setups read text data from HDFS sequence files (Step 1) and process the same base linguistic operations (Step 2). The system writes the output, generic domain independent relations, in highly compressed HDFS Parquet files (Step 3). The user refines these generic relations into domain-specific relations in an iterative process. [One implementation uses MapReduce on Hadoop (Step 4.1) and writes results into HDFS sequence files (Step 5.1).] The user analyzes the result from HDFS (Step 6) and may refine the query. INDREX permits also a much faster approach in Impala (Step 4.2) where the user inspects results (Step 5.2) and refines the query.
This approach benefits from optimizing query workflows in Impala.

Data Model

Each document, such as an email or a web page, comprises one or more sequence of characters. For instance, we denote the interval of characters from the last sentence with the type document:sentence. We call a sequence of characters attached with a semantic meaning a span. Our data model is based on spans and permits the user (or the application) to map multiple spans to an n-ary relation. For example, the relation PersonCareerAge(Person, Position, Age) requires to map spans to three attribute values. Our model is flexible enough to hold additional important structures for common relation extraction tasks, such as extracting document structures, shallow or deep syntactic structures, or structures for resolving entities within and across documents (see details in our paper).  

In practice, either a batch-based ETL process, or an ad-hoc query, may generate such spans.

The diagram above illustrates an example corpus of three documents. Each document consists of a single sentence. The figure shows the output after common language-specific annotations, such as tokenization, part-of-speech (POS) tagging, phrases chunking, dependencies, constituencies (between the phrases), OIE-relationship candidates, entities, and semantic relationships. Each horizontal bar represents an annotation with one span and each arrow an annotation with multiple spans.

The semantic relationship PersonCareerAge is an annotation with three spans. The spans “Torsten Kilias” in document 26 and “Torsten” in document 27 are connected by a cross-document co-reference annotation. The query in the right-bottom corner shows a join between a table, including positions about persons in an organization, and spans about relations of these persons that have been extracted from the three documents shown in the diagram.

Joining Text with Tables and Extracting Domain-Dependent Relations

Domain-dependent operations combine base features from above with existing entities from structured data (also called entity linkage). The user may add domain specific rules for extracting relationship or attribute types or attribute value ranges. In INDREX, these domain-specific operations are part of an iterative query process and are based on SQL. (Recall the example corpus, in which the user joins spans with persons and their positions from an existing table.)

The example query below in Impala demonstrates the extraction of the binary relationship type PersonCareer(Person, Position). After an initial training, writing these queries is as simple as writing SQL queries in Impala.

Aggregation and Group By Queries from Text Data

INDREX supports aggregations, such as MIN(), MAX(), SUM(), or AVG(), directly from text data and within Impala.

The query above shows the process of extracting a person’s age. In our simple example, the user expresses the age argument to the person as apposition, using a comma. The query applies span and consolidation operators (explained in our paper) and finally shows a distribution of age information for persons appearing in the text.   

The query below shows a simple person-age extractor. In addition, it provides a distribution for age-group information in a corpus. The outer SELECT statement computes the aggregation for each age-group observation, grouped and ordered by age. The nested SELECT statement projects two strings: the person and the age from conditions in the WHERE clause that implement the heuristic of a syntactic apposition, in this case represented by the pattern

.

Experimental Results

For a dataset, we chose the Reuters Corpus Volume 1 (RCV1), which is a standard evaluation corpus in the information retrieval community. It contains information approximately 800k news from 1997 and shows characteristic Zipfian distributions for text data and derived annotations: Only few relations appear frequent on many documents while most relations only appear in a few documents. (See our work here for a detailed analysis.) Querying such typical distributions for text data requires Impala’s optimizations for selective queries, parallel query execution, or data partitioning schemes. Overall, the raw corpus is approximately 2.5GB in size. 

Our generic relation extraction stack created more than 2,500 annotations per document on average or roughly 2 billion annotations for our 800k documents. After annotating the corpus with the Stanford CoreNLP pipeline and ClausIE, the annotated corpus achieved a size of approximately 107GB. Overall, we could observe that our linguistic base annotations increase the raw data by nearly two orders of magnitudes.

The testing compared two configurations, INDREX on Apache Pig and INDREX on Impala:

  • INDREX on Hadoop + Pig
    We ran Hadoop 2.3.0 and Pig 0.12 on a cluster with nine nodes. Each node consists of 24 AMD cores, each with 2.4 Ghz, 256GB RAM, and 24 disks. Overall, we had 9 x 24 cores available. The cluster runs the Ubuntu 12.0 operating system and CDH 5.1. We assigned Hadoop up to 200 cores for Map tasks and 100 cores for Reduce tasks. Each task consumes up to 8GB RAM. Reduce tasks start if 90 percent of the map tasks are complete.
  • INDREX on Impala
    We tested Impala version 1.2.3 on the same machine setup and can leverage up to 200 multiple cores too. Impala uses a pipeline-based query processing approach and retrieves main memory only on demand, such as for building in-memory hash tables to store intermediate results from an aggregation query. Profiling Impala revealed that the system did not retrieve more than 10GB during query execution. We implemented INDREX on top of Impala as so-called macros, which are basically white-box UDF implementations.   
  • Queries and measurements
    We investigated (1) the feasibility if INDREX permits the user to execute common query scenarios for text mining tasks, and, (2) the performance of INDREX for common query types. Our set of 27 benchmark queries included typical operations for text mining workflows, such as point and range selections at various attribute selectivities, local joins within the same sentence or the same document, joins with external domain data, and queries using user defined table generating functions (UDTs) or user defined aggregation functions (UDAs), as well as global aggregation queries across individual documents. (See queries and details in the appendix of this paper.).

The outcome is that Impala outperforms Hadoop/Pig by nearly two orders of magnitude for text mining query workflows.

The left chart above shows query execution times for both systems over all queries. We observe a similar runtime for Pig Latin across all queries. Each query in Pig reads tuples from disc. Furthermore, Pig always executes a full table scan over these tuples and, by design, always stores query results back to the distributed file system.

For Impala, however, we observed nearly two orders of magnitude faster query execution times because Impala conducts various optimizations that also benefit our iterative text mining workflows. For understanding this impressive performance, see the details for each individual operation.

The right chart compares the aggregated runtime for individual operations, such as complex and local queries (including local joins), joins with external data, aggregations, and selections.

Conclusion

In a business setting, there are significant costs for information extraction, including the labor cost of developing or adapting extractors for a particular business problem, and the throughput required by the system. Our extensive evaluations show that INDREX can return results for most text mining queries within a few seconds in Impala. Most operations on text data are embarrassingly parallel, such as local and join operations on a single sentence or operations on a single document.

Future work on INDREX will contain approaches that drastically simplify writing queries, such as semi-supervised learning approaches for entity-linkage tasks. Other directions are instant query refinements and instant results; see also our slide deck about INDREX.

INDREX is a project of the DATEXIS.COM research group of the Beuth University of Applied Sciences Berlin, Germany. The team conducts research at the intersection of Database Systems and Text-based Information Systems. Alexander Löser leads the DATEXIS group. His previous stations include HP Labs Bristol, the IBM Almaden Research Center, and the research division of the SAP AG. Torsten Kilias is a second-year PhD student with Alexander and conducts work in the area of scalable in-database text mining.

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Development & Internals Track

Cloudera Blog - Fri, 04/17/2015 - 16:23

This year’s HBaseCon Development & Internals track covers new features in HBase 1.0, what’s to come in 2.0, best practices for tuning, and more.

In this post, I’ll give you a window into the HBaseCon 2015′s (May 7 in San Francisco) Development & Internals track.

Thanks, Program Committee!

  • “Meet HBase 1.0″

    Enis Söztutar (Hortonworks) & Solomon Duskis (Google)

    HBase 1.0 is the new stable major release, and the start of “semantic versioned” releases. We will cover new features, changes in behavior and requirements, source/binary and wire compatibility details, and upgrading. We’ll also dive deep into the new standardized client API in 1.0, which establishes a separation of concerns, encapsulates what is needed from how it’s delivered, and guarantees future compatibility while freeing the implementation to evolve.

  • “HBase 2.0 and Beyond: Panel”

    Matteo Bertozzi (Cloudera), Sean Busbey (Cloudera), Jingcheng Du (Intel), Lars Hofhansl (Salesforce.com), Jon Hsieh (Cloudera), Enis Söztutar (Hortonworks) & Jimmy Xiang (Cloudera) (Pinterest)

    Now that you’ve seen Base 1.0, what’s ahead in HBase 2.0, and beyond—and why? Find out from this panel of people who have designed and/or are working on 2.0 features.

  • “HBase Performance Tuning”

    Lars Hofhansl (Salesforce.com)

    At Salesforce, we have deployed many thousands of HBase/HDFS servers, and learned a lot about tuning during this process. This talk will walk you through the many relevant HBase, HDFS, Apache ZooKeeper, Java/GC, and Operating System configuration options and provides guidelines about which options to use in what situation, and how they relate to each other.

  • “Solving HBase Performance Problems with Apache HTrace” 

    Abraham Elmahrek & Colin McCabe (Cloudera)

    HTrace is a new Apache incubator project which makes it much easier to diagnose and detect performance problems in HBase. It provides a unified view of the performance of requests, following them from their origin in the HBase client, through the HBase region servers, and finally into HDFS. System administrators can use a central web interface to query and view aggregate performance information for the whole cluster. This talk will cover the motivations for creating HTrace, its design, and some examples of how HTrace can help diagnose real-world HBase problems.

  • “HBase and Spark”

    Ted Malaska (Cloudera)

    In this session, learn how to build an Apache Spark or Spark Streaming application that can interact with HBase. In addition, you’ll walk through how to implement common, real-world batch design patterns to optimize for performance and scale.

  • “Events @ Box: Using HBase as a Message Queue”

    David Mackenzie (Box)

    Box’s /events API powers our desktop sync experience and provides users with a realtime, guaranteed-delivery event stream. To do that, we use HBase to store and serve a separate message queue for each of 30+ million users. Learn how we implemented queue semantics, were able to replicate our queues between clusters to enable transparent client failover, and why we chose to build a queueing system on top of HBase.

  • “State of HBase Docs and How to Contribute”

    Misty Stanley-Jones (Cloudera)

    In this session, learn about the move to Asciidoc in HBase docs, some of the other notable changes lately, and things we’ve done to make it easier for you to contribute to the docs.

  • “Reusable Data Access Patterns with CDAP Datasets”

    Gary Helmling (Cask Data)

    In this talk, you’ll learn about Datasets, part of the open source Cask Data Application Platform (CDAP), which provide reusable implementations of common data access patterns. We will also look at how Datasets provide a set of common services that extend the capabilities of HBase: global transactions for multi-row or multi-table updates, read-less increments for write-optimized counters, and support for combined batch and real-time access.

Getting interested? Wait until you see the Ecosystem sneak preview next week!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

Using Apache Parquet at AppNexus

Cloudera Blog - Wed, 04/15/2015 - 16:10

Thanks to Chen Song, Data Team Lead at AppNexus, for allowing us to republish the following post about his company’s use case for Apache Parquet (incubating at this writing), the open standard for columnar storage across the Apache Hadoop ecosystem.

At AppNexus, over 2MM log events are ingested into our data pipeline every second. Log records are sent from upstream systems in the form of Protobuf messages. Raw logs are compressed in Snappy when stored on HDFS. That said, even with compression, this still leads to over 25TB of log data collected every day. On top of logs, we also have hundreds of MapReduce jobs that process and generate aggregated data. Collectively, we store petabytes of data in our primary Hadoop cluster.

Parquet (incubating at the time of writing) is a columnar storage format in the Hadoop ecosystem. Compared to a traditional row oriented format, it is much more efficient in storage and has better query performance. Parquet is widely used in the Hadoop world for analytics workloads by many query engines. Among them are engines on top of Hadoop, such as Apache HiveImpala, and systems that go beyond MapReduce to improve performance (Apache SparkPresto).

Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression. It is especially good for queries which read particular columns from a “wide” (with many columns) table, since only needed columns are read and IO is minimized. 

Data is important but storage costs money, not to mention the processing resources at this scale, including CPU, network IO, etc. With limited hardware provided, we started looking at member COGS since end of last year and would like to figure out a way to serve on-growing data needs. We think Parquet is the choice as it serves both needs, efficient and performant in both storage and processing.

The benchmark here measures some key performance metrics, including disk usage, resource usage, and query performance. We did comparisons on two storage formats:

  • Sequence Files with Protobuf compressed by Snappy
  • Parquet Files compressed by Snappy

(Note: This benchmark is not intended to provide a performance comparison among query engines out on the market. For now, we have chosen Hive as the simple query engine and we used default configurations for Hive without optimization, with the goal to make the results reproducible.)

DataSet

For testing, we picked one of our biggest tables, aggregated_impressions. Rows in this table represent impressions, callbacks and auctions joined at transactions level. It is now the biggest table stored in our Hadoop cluster, which currently takes 270TB of HDFS storage (810TB in raw storage after three replications), and serves as the primary source of data for most of the higher-level aggregated tables.

On HDFS, files for this table are stored in data blocks of size 128MB(134217728 bytes).

Disk Storage

We compare HDFS usage of a single day for this table in different storage formats and break down in hours. The result is shown in the following chart.

Query Performance

Query performance for Parquet tables really depends on the number of columns needed to process in SELECT, WHERE, and GROUP BY clauses of the query. Say the table has n columns, and m columns are needed in the query, we denote column ration t as determined by m / n. The smaller the ratio t is, the bigger performance improvement you will see for that query on Parquet tables than regular row-oriented tables.

We picked three common used queries on aggregated_impressions and measure their execution times and resource consumptions. The same set of queries were run against data stored in the two data formats, Parquet and Sequence with Protobuf.

  1. The first query does a simple aggregation for the table aggregated_impressions on one particular field. The only dimension used here is imp_type which has a low cardinality of 10. The aggregation generates metrics on double_monetary_field1 and double_monetary_field2.

  2. The second query is pulled from one of our hive jobs, agg_high_dimesion_data. The query performs a more complex, high cardinality aggregation on multiple different dimensions and metrics.

  3. The third query is a common type ad hoc query to find some sparse records in the massive dataset. Sometimes, such type of query may produce no output.

For all 3 tests, Hive queries ran with the same configuration.

  • Block size is 134217728 bytes
  • Hive max split size is 256000000 bytes
Comparison over MR Metrics Query 1

SELECT imp_type, Sum(double_monetary_field1), Sum(double_monetary_field1) FROM aggregated_impressions WHERE dh = '${hour}' GROUP BY imp_type;

Query 2

INSERT overwrite TABLE *agg_high_dimesion_data* partition ( dy='2015', dm='2015-03', dd='2015-03-30', dh='${hour}' ) SELECT date_trunc('hour',date_time) AS ymdh, bl.aid, bl.cgid, bl.cid, bl.crid, bl.pid, bl.vid, CASE WHEN int_field1 &lt; 20 THEN int_field1 ELSE 20 END AS int_field1_cal, CASE ... // A large CASE WHEN statement END AS int_field2_cal, sum(bl.isI) AS i_count, sum(bl.isC) AS c_count, sum(pc_number) AS pc_count, sum(bl.pv_number) AS pv_count, to_numeric(sum(double_field)/1000, '%18.6f') AS double_field FROM ( SELECT * FROM aggregated_impressions WHERE date_time &gt;= '${hour}:00:00' AND date_time &lt;= '${hour}:59:59' AND dh = '${hour}' ) bl GROUP BY date_trunc('hour',date_time) , bl.aid, bl.cgid, bl.cid, bl.crid, bl.pid, bl.vid, CASE WHEN some_frequency &lt; 20 THEN some_frequency ELSE 20 END , CASE ... // A large CASE WHEN statement END

Query 3

SELECT auction_id_64, imp_type, date_time FROM aggregated_impressions WHERE aid=1234567 AND id1=1111 AND id2=4567 AND dh = '${hour}' ORDER BY date_time ASC limit 25;

For all three tests, queries on Parquet table needed only half as many maps as those on the Protobuf table. We created several plots below that break down some of the key MapReduce job performance metrics.

  • Total CPU Time

  • Average Map Time

As you can see from the above charts, Parquet really excels when the query is on sparse data or low cardinality in column selection. The CPU usage can be reduced by more than 98% in total and the total memory consumption is cut by at least half. To summarize, Parquet is a better technology in storage and computation resources. At the time of this writing, we have migrated a few of our biggest tables on Parquet.

Upstream log data comes in as Protobuf. After ingestion, we validate this data before loading to HDFS and also whenever jobs read any Protobuf messages from HDFS. This validation framework, along with our entire MapReduce job API framework are tied to the Protobuf data format. 

Categories: Hadoop

"Hadoop: The Definitive Guide" is Now a 4th Edition

Cloudera Blog - Mon, 04/13/2015 - 14:23

Apache Hadoop ecosystem, time to celebrate! The much-anticipated, significantly updated 4th edition of Tom White’s classic O’Reilly Media book, Hadoop: The Definitive Guide, is now available.

The Hadoop ecosystem has changed a lot since the 3rd edition. How are those changes reflected in the new edition?

The core of the book is about the core Apache Hadoop project, and since the 3rd edition, Hadoop 2 has stabilized and become the Hadoop runtime that most people are using. The 3rd edition actually covered both Hadoop 1 (based on the JobTracker) and Hadoop 2 (based on YARN), which made things a bit awkward at times since it flipped between the two and had to describe the differences. Only Hadoop 2 is covered in the 4th edition, which simplifies things considerably. The YARN material has been expanded and now has a whole chapter devoted to it.

This update is the biggest since the 1st edition, and in response to reader feedback, I reorganized the chapters to simplify the flow. The new edition is broken into parts (I. Hadoop Fundamentals, II. MapReduce, III. Hadoop Operations, IV. Related Projects, V. Case Studies), and includes a diagram to show possible pathways through the book (on p. 17).

The Hadoop ecosystem has been growing faster with each new edition, which makes it impossible to cover everything; even if I wanted to, there wouldn’t be enough space. The book is aimed primarily at users doing data processing, so in this edition I added two new chapters about processing frameworks (Apache Spark and Apache Crunch), one on data formats (Apache Parquet, incubating at this writing) and one on data ingestion (Apache Flume).

I’m also really pleased with the two new case studies in this edition: one about how Hadoop is used to manage records in a healthcare system (by Ryan Brush and Micah Whitacre), and one on building big data genomics pipelines (by Matt Massie).

Based on those changes, what do you want readers to learn?

I think the core Hadoop features are still important to understand—things like how HDFS stores files in blocks, how MapReduce input splits work, how YARN schedules work across nodes in the cluster. These ideas provide the foundation for learning how components covered in later chapters take advantage of these features. For example, Spark uses MapReduce input formats for reading and writing data efficiently, and it can run on YARN.

Beyond that, we’ve seen how the Hadoop platform as a whole has become even more powerful and flexible, and the new chapters reflect some of these new capabilities, such as iterative processing with Spark.

In a nutshell, what does your research process/methodology look like?

I think the two main things that readers want from a book like this are: 1) good examples for each component, and 2) an explanation of how the component in question works. Examples are important since they are concrete and allow readers to start using and exploring the system. In addition, a good mental model is important for understanding how the system works so users can reason about it, and extend the examples to cover their own use cases.

There’s a Martin Gardner quote that I cite in the book, and which sums up my approach to writing about technology: “Beyond calculus, I am lost. That was the secret of my column’s success. It took me so long to understand what I was writing about that I knew how to write in a way most readers would understand.”

I find that there’s really no substitute for reading the code to understand how a component works. I spend a lot of time writing small examples to test how different aspects of the component work. A few of these are turned into examples for the book. I also spend a lot of time reading JIRAs to understand the motivation for features, their design, and how they relate to other features. Finally, I’m very lucky to have access to a talented group of reviewers who work on Hadoop projects. Their feedback has undoubtedly improved the book.

Nothing can be completely “definitive.” What is good complementary material for this book?

The goal of my book is to explain how the component parts of Hadoop and its ecosystem work and how to use them—the nuts and bolts, as it were. What it doesn’t do is explain how to tie all the pieces together to build applications. For this I recommend Hadoop Application Architectures by Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira, which explains how to select Hadoop components and use them to build a data application. For building machine-learning applications, I like Advanced Analytics with Spark by Sandy Ryza, Uri Laserson, Sean Owen, and Josh Wills.

My book has some material for Hadoop administrators, but Eric Sammer’s Hadoop Operations (2nd edition forthcoming) goes into a lot more depth. There are also books for most of the Hadoop components that go into more depth than mine.

It’s really gratifying to see the large number books coming out in the Hadoop and big data space.

Do you have a 5th edition in you?

I like to think so, but I’m not sure my family would agree (yet)!

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Operations Track

Cloudera Blog - Fri, 04/10/2015 - 13:57

This year’s HBaseCon Operations track features some of the world’s largest and most impressive operators.

In this post, I’ll give you a window into the HBaseCon 2015′s (May 7 in San Francisco) Operations track.

Thanks, Program Committee!

  • “HBase Operations in a Flurry”

    Rahul Gidwani & Ian Friedman (Yahoo!)

    With multiple clusters of 1,000+ nodes replicated across multiple data centers, Flurry has learned many operational lessons over the years. In this talk, you’ll explore the challenges of maintaining and scaling Flurry’s cluster, how we monitor, and how we diagnose and address potential problems. 

  • ” HBase at Scale in an Online and High-Demand Environment”

    Jeremy Carroll & Tian Ying-Chang (Pinterest)

    Pinterest runs 38 different HBase clusters in production, doing a lot of different types of work—with some doing up to 5 million operations per second. In this talk, you’ll get details about how we do capacity planning, maintenance tasks such as online automated rolling compaction, configuration management, and monitoring.

  • “OpenTSDB and AsyncHBase Update”

    Chris Larsen (Yahoo!) & Benoît Sigoure (Arista Networks)

    OpenTSDB continues to scale along with HBase. A number of updates have been implemented to push writes over 2 million data points a second. Here we will discuss about HBase schema improvements, including salting, random UI assignment, and using append operations instead of puts. You’ll also get AsyncHBase development updates about rate limiting, statistics, and security. 

  • “Multitenancy in HBase: Learnings from Yahoo!” 

    Francis Liu & Vandana Ayyalasomayajula (Yahoo!)

    Since 2013, thanks to a combination of deployment and HBase enhancements, Yahoo! has successfully supported a diverse set of tenants in a single HBase cluster. Here you’ll learn how this approach makes it feasible to support small and large tenants cost-effectively, minimizes operational overhead, and provides a lot of flexibility.

  • “Smooth Operators: Panel”

    Shaohui Liu & Jianwei Cui (Xiaomi)

    In this session, you will learn the work Xiaomi has done to improve the availability and stability of our HBase clusters, including cross-site data and service backup and a coordinated compaction framework. You’ll also learn about the Themis framework, which supports cross-row transactions on HBase based on Google’s percolator algorithm, and its usage in Xiaomi’s applications.

  • “DeathStar: Easy, Dynamic, Multi-tenant HBase via YARN”

    Cosmin Lehene (Adobe)

    Adobe has packaged HBase in Docker containers and uses Marathon and Mesos to schedule them—allowing us to decouple the RegionServer from the host, express resource requirements declaratively, and open the door for unassisted real-time deployments, elastic (up and down) real-time scalability, and more. In this talk, you’ll hear what we’ve learned and explain why this approach could fundamentally change HBase operations.

  • “Elastic HBase on Mesos”

    Nitin Aggarwal & Ishan Chhabra (Rocket Fuel)

    In this talk, you’ll learn how Rocket Fuel has developed various HBase access patterns and multi-tenancy scenarios and the role of DeathStar, an in-house solution built on top of Apache Slider and YARN. We’ll cover how we use a single YARN cluster to host multiple smaller and highly customized HBase clusters, and how dynamic provisioning and elastic scaling are made possible in this model.

    Getting interested? Wait until you see the Development & Internals sneak preview next week!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

Pages