How-to: Scan Salted Apache HBase Tables with Region-Specific Key Ranges in MapReduce

Cloudera Blog - Wed, 06/24/2015 - 16:44

Thanks to Pengyu Wang, software developer at FINRA, for permission to republish this post.

Salted, pre-split Apache HBase tables are a proven, effective way to provide uniform workload distribution across RegionServers and prevent hot spots during bulk writes. In this design, a row key is made of a logical key with a salt prepended. One way of generating the salt is to take the hash code of the logical row key (a date, etc.) modulo n, the number of regions.

Salting Row Keys

For example, a table accepting data load on a daily basis might use logical row keys starting with a date, and we want to pre-split this table into 1,000 regions. In this case, we expect to generate 1,000 different salts. The salt can be generated, for example, as:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey

logicalKey = 2015-04-26|abc
rowKey = 893|2015-04-26|abc

The hashCode() output with modulo yields salt values distributed from “000” to “999”. With this key transform, the table is pre-split on the salt boundaries as it’s created. This distributes row volume uniformly when loading the HFiles with MapReduce bulk load, and it guarantees that row keys with the same salt fall into the same region.
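The salting scheme above can be written as a self-contained method; this is only an illustrative sketch, with String.format standing in for Commons Lang’s StringUtils.leftPad:

```java
// Illustrative, self-contained version of the salting scheme described above.
public class SaltedKey {
    static final int NUM_REGIONS = 1000; // matches the 1,000-way pre-split

    // Prepend a 3-digit salt derived from the logical key's hash code.
    public static String makeRowKey(String logicalKey) {
        int salt = Math.abs(logicalKey.hashCode() % NUM_REGIONS);
        return String.format("%03d", salt) + "|" + logicalKey;
    }

    public static void main(String[] args) {
        // Same logical key always lands in the same salted region.
        System.out.println(makeRowKey("2015-04-26|abc"));
    }
}
```

Because the salt is a pure function of the logical key, a writer and a later reader always derive the same salted row key.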

In many use cases, like data archiving, you need to scan or copy data over a particular logical key range (a date range) using a MapReduce job. Standard table MapReduce jobs are set up by providing the Scan instance with key range attributes.

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
    tablename,
    scan,
    DataScanMapper.class,
    ImmutableBytesWritable.class,
    KeyValue.class,
    job,
    true,
    TableInputFormat.class
);
…

However, setting up such a job becomes challenging for salted, pre-split tables: the start and stop row keys differ for each region, because each region has a unique salt, and we can’t specify multiple ranges on one Scan instance.

To solve this problem, we need to look into how table MapReduce works. Generally, the MapReduce framework creates one map task to read and process each input split; each split is generated by the InputFormat base class, in its getSplits() method.

In an HBase table MapReduce job, TableInputFormat is used as the InputFormat. In its implementation, the getSplits() method is overridden to retrieve the start and stop row keys from the Scan instance. When the start and stop row keys span multiple regions, the range is divided along region boundaries and a list of TableSplit objects covering the scan key range is returned. Rather than being based on HDFS blocks, TableSplits are based on regions. By overriding the getSplits() method, we can control the TableSplits.

Building Custom TableInputFormat

To change the behavior of the getSplits() method, a custom class extending TableInputFormat is required. The job of getSplits() here is to cover the logical key range in each region, constructing each region’s row key range with its unique salt. The HTable class provides the getStartEndKeys() method, which returns the start and end row keys for each region. From each start key, we parse the salt corresponding to that region.

Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {
    // The first 3 bytes are the salt; the first region's start key is empty, so apply "000"
    if (keys.getFirst()[i].length == 0) {
        regionSalt = "000";
    } else {
        regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
    }
    …
}

Job Configuration Passes Logical Key Range

TableInputFormat retrieves the start and stop keys from the Scan instance. Since we cannot use a Scan in our MapReduce job, we can use the Configuration instead to pass these two variables; the logical start and stop keys alone are sufficient (a variable could be a date or other business information). The getSplits() method takes a JobContext argument, from which the Configuration instance can be read as context.getConfiguration().

In MapReduce driver:

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

In Custom TableInputFormat:

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    conf = context.getConfiguration();
    String scanStart = conf.get("logical.scan.start");
    String scanStop = conf.get("logical.scan.stop");
    …
}

Reconstruct the Salted Key Range by Region

Now that we have the salt and logical start/stop key for each region, we can rebuild the actual row key range.

byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);

Creating a TableSplit for Each Region

With the row key range, we can now initialize a TableSplit instance for each region.

List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
    …
    byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
    byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
    InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
    splits.add(split);
}

One more thing to consider is data locality. The framework uses the location information in each input split to assign the map task to its local host. For our TableInputFormat, we use the method getTableRegionLocation() to retrieve the location of the region serving the row key.

This location is then passed to the TableSplit constructor, which assures that the mapper processing the table split runs on the same RegionServer. One method it relies on, DNS.reverseDns(), requires the address of the HBase name server; this attribute is stored in the configuration property “hbase.nameserver.address”.

this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
    HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
    InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
    String regionLocation;
    try {
        regionLocation = reverseDNS(regionAddress);
    } catch (NamingException e) {
        regionLocation = regionServerAddress.getHostname();
    }
    return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
        hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
        this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
}

The complete getSplits() method looks like this:

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    conf = context.getConfiguration();
    table = getHTable(conf);
    if (table == null) {
        throw new IOException("No table was provided.");
    }
    // Get the name server address; the default value is null.
    this.nameServer = conf.get("hbase.nameserver.address", null);
    String scanStart = conf.get("logical.scan.start");
    String scanStop = conf.get("logical.scan.stop");
    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
        throw new RuntimeException("At least one region is expected");
    }
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
        String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);
        String regionSalt = null;
        if (keys.getFirst()[i].length == 0) {
            regionSalt = "000";
        } else {
            regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
        }
        byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
        byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
        InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
        splits.add(split);
    }
    log.info("Total table splits: " + splits.size());
    return splits;
}

Use the Custom TableInputFormat in the MapReduce Driver

Now, in the table MapReduce job setup, we replace the TableInputFormat class with our custom build.

Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Setup the table mapper job */
TableMapReduceUtil.initTableMapperJob(
    tablename,
    scan,
    DataScanMapper.class,
    ImmutableBytesWritable.class,
    KeyValue.class,
    job,
    true,
    MultiRangeTableInputFormat.class
);

This custom TableInputFormat approach provides an efficient, scalable scan capability for HBase tables designed to use salting for balanced data loads. Because the scan bypasses all unrelated row keys, regardless of how big the table is, the scan’s complexity is limited only by the size of the target data. In most use cases, this guarantees relatively consistent processing time as the table grows.

Categories: Hadoop

Cloudera Navigator Encrypt Architecture: The Overview

Cloudera Blog - Thu, 06/11/2015 - 14:58

Cloudera Navigator Encrypt is a key security feature in production-deployed enterprise data hubs. This post explains how it works.

Cloudera Navigator Encrypt, which is integrated with Cloudera Navigator (the native, end-to-end governance solution for Apache Hadoop-based systems), provides massively scalable, high-performance encryption for critical Hadoop data. It utilizes industry-standard AES-256 encryption and provides a transparent layer between the application and filesystem. Navigator Encrypt also includes process-based access controls, allowing authorized Hadoop processes to access encrypted data while simultaneously preventing admins or super-users like root from accessing data that they don’t need to see.

In this post, I’ll provide an overview of key features as well as some information about how Navigator users can get started.


Navigator Encrypt is part of Cloudera’s overall encryption-at-rest solution, along with HDFS encryption—which operates at the HDFS folder level, enabling encryption to be applied only to HDFS folders where needed—and Navigator Key Trustee, which is a virtual safe-deposit box for managing encryption keys, certificates, and passwords.

Features of Navigator Encrypt include:

  • Advanced key management: Stores keys separate from the encrypted data to ensure a data breach does not also result in the loss of the cryptographic key
  • Transparent data encryption: Protects data at rest resulting in minimal performance impact. Requires no complex changes to databases, files, applications or storage
  • Process-based access controls: Restricts access to specific processes rather than by OS user
  • Encrypting and decryption of unstructured data: Secures personally identifiable information, intellectual property, log files, and any other sensitive data that could be considered damaging if exposed outside the business
  • Performance: Supports the Intel AES-NI cryptographic accelerator for enhanced performance in the encryption and decryption process
  • Compliance: Helps enable compliance with HIPAA-HITECH, PCI-DSS, FISMA, EU Data Protection Directive, and other data security regulations
  • Multi-distribution support: Supported on Debian, Ubuntu, CentOS, Red Hat, and SUSE
  • Simple installation: Distributed in RPM and DEB packages, as well as SUSE KMPs
  • Multiple mountpoints: Support for multiple encrypted mountpoints managed with individual keys

The security of cloud IT data depends on the safety of cryptographic keys, SSL certificates, database tokens, and other opaque objects. Navigator Encrypt also helps to protect these critical IT secrets from unauthorized access and helps meet strict data security regulations.

What can be secured? Supported assets include (but are not limited to):

  • Databases such as the Hive and Impala Metastore (PostgreSQL and MySQL)
  • Temporary files such as YARN containers
  • Log files of any CDH project or Cloudera Manager
  • Data directories like HDFS
  • Config files

Navigator Encrypt in Simple Steps

After installing Navigator Encrypt, two operations are required: navencrypt register and navencrypt-prepare. After that, you can encrypt data with the navencrypt-move command and add ACL rule(s) with the navencrypt acl command:

root@demo:~# navencrypt register -s SERVER -o ORG -a AUTH
root@demo:~# navencrypt-prepare /dev/xvdf /mnt/zncrypt
root@demo:~# navencrypt-move encrypt @data /var/lib/myapp /mnt/zncrypt
root@demo:~# navencrypt acl --add --rule="ALLOW @data * /usr/bin/myapp"

The commands above accomplish the following tasks (in order):

  • Registers Navigator Encrypt on a Key Trustee server for key storage
  • Configures a block-device or directory for transparent encryption (encryption keys are sent to Key Trustee)
  • Encrypts a directory on a specific navencrypt mount point
  • Edits the access control list

Navigator Encrypt depends on dmcrypt to perform the encryption. The following is a brief glossary of those terms as well as encryption keys related to Navigator Encrypt.

Master Key: the key the user knows and types when registering Navigator Encrypt with the Key Trustee. The master key can be in the form of a single passphrase, dual-passphrase, or RSA file. Uploaded to Key Trustee.

Mount Encryption Key (MEK): a key generated by Navigator Encrypt by openssl rand (default) or /dev/urandom (as an option). This key is generated when preparing a new mount point; every Navigator Encrypt-prepared mount point has a different MEK. Uploaded to Key Trustee.

dmcrypt Device Encryption Key (dmcrypt DEK): NOT managed by Navigator Encrypt, and is not uploaded to Key Trustee. dmcrypt saves the DEK on the header of the device and manages its DEKs locally.

dmcrypt Transparent Device-level Data Encryption: provides block-level encryption that is applied per device. Its cryptographic metadata is stored in the header of the block device, which means that there is only a single dmcrypt DEK (thus, it has good I/O performance). Performance is also improved when running on the AES-NI Intel chip.

The following diagram illustrates how a process (myapp) tries to access an encrypted directory named /mydata. dmcrypt stores its DEK in the header of the device already encrypted with MEK, and the data is written already encrypted with the dmcrypt DEK.

Process-Based Access Control List (ACL)

The ACL allows access only to specific authorized processes by storing process fingerprints—the sha256sum of each process’s executable file—for authentication. Rules are created to authorize access to specific files or directories through a category that works as an anchor. The ACL rules file is encrypted with the master key and is stored locally for quick access and updates.
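To make the fingerprint idea concrete, here is a sketch of computing the sha256sum of an executable in Java; the actual storage format of Navigator Encrypt’s ACL is internal, and the temp file below merely stands in for a real binary like /usr/bin/myapp:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the process-fingerprint idea: the ACL authenticates a process
// by the SHA-256 digest of its executable file, not by OS user.
public class ProcessFingerprint {
    public static String fingerprint(Path executable)
            throws IOException, NoSuchAlgorithmException {
        byte[] contents = Files.readAllBytes(executable);
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(contents);
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // A temp file stands in for a real process binary in this sketch.
        Path fake = Files.createTempFile("myapp", ".bin");
        Files.write(fake, new byte[]{1, 2, 3});
        System.out.println(fingerprint(fake)); // 64 hex characters
    }
}
```

Because the fingerprint covers the file contents, replacing the binary—even at the same path—changes the digest and fails authentication.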

An example of a rule is: "ALLOW @mydata * /usr/bin/myapp". The meaning of this rule is that the myapp process is allowed to access to any encrypted path (*) that was encrypted under the category @mydata.

Navigator Encrypt depends on the zncrypt-kernel-module, which serves as an interceptor of any I/O sent to an encrypted, managed path. The Linux module filename is zncryptfs.ko; it resides in the kernel stack, injecting filesystem hooks, and it also performs process authentication/authorization and provides an authentication cache for better performance. Since zncrypt-kernel-module behaves as an interceptor or filter and the I/O is not modified in any way, it supports any filesystem (ext3/ext4, xfs, etc.)

The following diagram shows myapp sending an ‘open()’ call that is intercepted by the zncrypt-kernel-module via an ‘open’ hook: the module reads the process fingerprint, the ACL authentication evaluates it, and if the authentication cache already has the fingerprint, access is allowed.

When an ACL rule is added, a master key is required to authenticate the user. At that point, the ACL rules file is updated as well as the zncrypt-kernel-module cache.

The next diagram illustrates the different stages of Navigator Encrypt: how the user adds a rule to allow myapp to access the encrypted data in category @mylogs, and adds another rule to allow myapp's process to access encrypted data in category @mydata. These two rules are loaded into the zncrypt-kernel-module cache after restart. The fingerprints are also added to the cache as well as the process name, path, and category.

Now that the cache has the fingerprints of the allowed process, any authenticated IO can be securely saved in the respective datastore.

The directory /mydata is encrypted under @mydata category and /mylogs directory is encrypted under @mylogs category under dmcrypt (block device encryption). The kernel module gets the fingerprint of the myapp process that tries to issue an IO and then compares its process fingerprint with the list of authorized fingerprints cache.

Storing Encryption Keys on Key Trustee

The master key and mount encryption keys are securely deposited in the Key Trustee. There is one MEK per mount point; the MEKs are also stored locally for offline recovery and quick access, and each local MEK is encrypted with the master key before being stored.
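The wrap-the-MEK-with-the-master-key pattern can be sketched with the JDK’s own crypto classes. This is only an illustration of envelope encryption under assumed key sizes and AES-GCM parameters, not Navigator Encrypt’s actual storage format:

```java
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the envelope pattern described above: a randomly generated
// mount encryption key (MEK) is wrapped with the master key before being
// stored locally. Navigator Encrypt's real on-disk format differs.
public class MekEnvelope {
    static final SecureRandom RNG = new SecureRandom();

    // Wrap: AES-GCM encrypt the MEK under the master key, prefixing the IV.
    public static byte[] wrap(byte[] masterKey, byte[] mek) throws Exception {
        byte[] iv = new byte[12];
        RNG.nextBytes(iv);
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(masterKey, "AES"),
               new GCMParameterSpec(128, iv));
        byte[] ct = c.doFinal(mek);
        byte[] out = new byte[iv.length + ct.length];
        System.arraycopy(iv, 0, out, 0, iv.length);
        System.arraycopy(ct, 0, out, iv.length, ct.length);
        return out;
    }

    // Unwrap: read the IV back off the front, then decrypt.
    public static byte[] unwrap(byte[] masterKey, byte[] blob) throws Exception {
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.DECRYPT_MODE, new SecretKeySpec(masterKey, "AES"),
               new GCMParameterSpec(128, blob, 0, 12));
        return c.doFinal(blob, 12, blob.length - 12);
    }
}
```

The point of the pattern is that the locally cached MEK is useless without the master key, which only exists in Key Trustee and in the user’s head (or RSA file).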

The connection between Navigator Encrypt and Key Trustee is secured with SSL certificates. Navigator Encrypt uses a Key Trustee C API installed locally that serves as an interface with the Key Trustee. Navigator Encrypt clients offer an extra security and authentication mechanism with GPG keys through the Key Trustee API.

The following diagram shows the communication process between Navigator Encrypt and Key Trustee. For example, the master key is encrypted with local GPG keys and, before being stored in the Key Trustee database, is encrypted again with the Key Trustee Server GPG keys. When the master key is fetched to perform any Navigator Encrypt operation, the Key Trustee server decrypts the stored key with its server GPG keys and sends it back to the client (in this case Navigator Encrypt), which decrypts the deposit with the local GPG keys so that the master key is readable. All of this communication occurs over SSL connections.

Next Steps

Navigator Encrypt has been running in production systems for approximately 5 years and has proven effective for encrypting sensitive data stored on Hadoop, Cassandra, MongoDB, and more across highly regulated industries.

With this blog post, Cloudera Navigator users should know what to do to get started. If you are not currently a Navigator user, you can download and use Cloudera Manager to install a 60-day trial.

Alex Gonzalez is a Software Engineer at Cloudera.

Categories: Hadoop

Inside Apache HBase’s New Support for MOBs

Cloudera Blog - Mon, 06/08/2015 - 15:52

Learn about the design decisions behind HBase’s new support for MOBs.

Apache HBase is a distributed, scalable, performant, consistent key value database that can store a variety of binary data types. It excels at storing many relatively small values (<10K), and providing low-latency reads and writes.

However, there is a growing demand for storing documents, images, and other moderate objects (MOBs) in HBase while maintaining low latency for reads and writes. One such use case is a bank that stores signed and scanned customer documents. As another example, transport agencies may want to store snapshots of traffic and moving cars. These MOBs are generally write-once.

Unfortunately, performance can degrade in situations where many moderately sized values (100KB to 10MB) are stored, due to the ever-increasing I/O pressure created by compactions. Consider the case where 1TB of photos from traffic cameras, each 1MB in size, are stored into HBase daily. Parts of the stored files are compacted multiple times via minor compactions and, eventually, data is rewritten by major compactions. As these MOBs accumulate, the I/O created by compactions slows the compactions further, blocks memstore flushing, and eventually blocks updates. A big MOB store also triggers frequent region splits, reducing the availability of the affected regions.

In order to address these drawbacks, Cloudera and Intel engineers have implemented MOB support in an HBase branch (hbase-11339: HBase MOB). This branch will be merged into the master branch for HBase 1.1 or 1.2, and it is already present and supported in CDH 5.4.x as well.

Operations on MOBs are usually write-intensive, with rare updates or deletes and relatively infrequent reads. MOBs are usually stored together with their metadata. Metadata relating to MOBs may include, for instance, car number, speed, and color. Metadata are very small relative to the MOBs. Metadata are usually accessed for analysis, while MOBs are usually randomly accessed only when they are explicitly requested with row keys.

Users want to read and write the MOBs in HBase with low latency in the same APIs, and want strong consistency, security, snapshot and HBase replication between clusters, and so on. To meet these goals, MOBs were moved out of the main I/O path of HBase and into a new I/O path.

In this post, you will learn about this design approach, and why it was selected.

Possible Approaches

There were a few possible approaches to this problem. The first approach we considered was to store MOBs in HBase with a tuned split and compaction policies—a bigger desiredMaxFileSize decreases the frequency of region split, and fewer or no compactions can avoid the write amplification penalty. That approach would improve write latency and throughput considerably. However, along with the increasing number of stored files, there would be too many opened readers in a single store, even more than what is allowed by the OS. As a result, a lot of memory would be consumed and read performance would degrade.

Another approach was to use an HBase + HDFS model to store the metadata and MOBs separately. In this model, a single file is linked by an entry in HBase. This is a client-side solution, and the transaction is controlled by the client—no HBase-side memory is consumed by MOBs. This approach would work for objects larger than 50MB, but for MOBs, many small files lead to inefficient HDFS usage, since the default block size in HDFS is 128MB.

For example, let’s say a NameNode has 48GB of memory and each file is 100KB with three replicas. Each file takes more than 300 bytes in memory, so a NameNode with 48GB memory can hold about 160 million files, which would limit us to only storing 16TB MOB files in total.
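The arithmetic behind those figures can be checked directly:

```java
// Back-of-the-envelope check for the NameNode capacity figures above.
public class NameNodeMath {
    static final long HEAP_BYTES = 48_000_000_000L; // 48GB of NameNode memory
    static final long BYTES_PER_FILE = 300;         // in-memory metadata per file (3 replicas)

    static long maxFiles() { return HEAP_BYTES / BYTES_PER_FILE; }

    static long totalMobBytes() { return maxFiles() * 100_000L; } // each file is 100KB

    public static void main(String[] args) {
        System.out.println(maxFiles());                           // 160000000 files
        System.out.println(totalMobBytes() / 1_000_000_000_000L); // 16 (TB)
    }
}
```

So a 48GB NameNode heap caps out at about 160 million files, and at 100KB each that is only about 16TB of MOB data in total.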

As an improvement, we could have assembled the small MOB files into bigger ones—that is, a file could hold multiple MOB entries—and stored the offset and length in the HBase table for fast reading. However, maintaining data consistency and managing deleted MOBs and small MOB files in compactions is difficult with this design.

Furthermore, if we were to use this approach, we’d have to consider new security policies, lose atomicity properties of writes, and potentially lose the backup and disaster recovery provided by replication and snapshots.

HBase MOB Design

In the end, because most of the concerns around storing MOBs in HBase involve the I/O created by compactions, the key was to move MOBs out of management by normal regions to avoid region splits and compactions there.

The HBase MOB design is similar to the HBase + HDFS approach in that the metadata and MOBs are stored separately. The difference lies in the server-side design: the memstore caches the MOBs before they are flushed to disk, the MOBs are written into an HFile called a “MOB file” on each flush, and each MOB file holds multiple entries rather than one HDFS file per MOB. MOB files are stored in a special region, and all reads and writes go through the current HBase APIs.

Write and Read

A MOB threshold is defined: if the value length of a cell is larger than this threshold, the cell is regarded as a MOB cell.

When MOB cells are updated in the regions, they are written to the WAL and memstore, just like normal cells. During a flush, the MOBs are flushed to MOB files, and the metadata and paths of the MOB files are flushed to store files. The data consistency and HBase replication features are native to this design.

MOB edits are larger than usual, so the corresponding I/O during sync is larger too, which can slow down WAL sync operations. If other regions share the same WAL, their write latency can be affected. However, if data consistency and durability are needed, the WAL is a must.

Cells are permitted to move between store files and MOB files during compactions by changing the threshold. The default threshold is 100KB.

As illustrated below, the cells that contain the paths of MOB files are called reference cells. The tags are retained in the cells, so we can continue to rely on the HBase security mechanism.

The reference cells carry reference tags that differentiate them from normal cells. A reference tag implies a MOB cell in a MOB file, so further resolution is needed when reading.

When reading, the store scanner opens scanners on the memstore and store files. If a reference cell is encountered, the scanner reads the file path from the cell value and seeks the same row key in that file. The block cache can be enabled for the MOB files during scans, which can accelerate seeking.

It is not necessary to open readers on all the MOB files; only one is needed when required. This random read is not impacted by the number of MOB files, so we don’t need to compact the MOB files over and over again once they are large enough.

The MOB filename is readable and comprises three parts: the MD5 of the start key, the latest date of the cells in the MOB file, and a UUID. The first part is derived from the start key of the region from which this MOB file was flushed. Usually, MOBs have a user-defined TTL, so you can find and delete expired MOB files by comparing the second part with the TTL.
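As an illustration of that three-part naming scheme — the exact separators and encoding in HBase’s implementation may differ, so treat this only as a sketch of the structure:

```java
import java.security.MessageDigest;
import java.util.UUID;

// Sketch of the three-part MOB file naming scheme described above:
// MD5(region start key) + latest cell date + a UUID.
public class MobFileName {
    public static String build(byte[] regionStartKey, String latestDate) throws Exception {
        byte[] md5 = MessageDigest.getInstance("MD5").digest(regionStartKey);
        StringBuilder hex = new StringBuilder();
        for (byte b : md5) {
            hex.append(String.format("%02x", b));
        }
        String uuid = UUID.randomUUID().toString().replace("-", "");
        // <32 hex chars><yyyyMMdd><32 hex chars>
        return hex + latestDate + uuid;
    }
}
```

The date component in the middle is what makes TTL-based cleanup possible without opening the file: a chore can compare it against the TTL and delete expired files by name alone.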


To be more friendly to snapshots, the MOB files are stored in a special dummy region, whereby snapshot, table export/clone, and archive all work as expected.

When taking a snapshot of a table, the MOB region is created in the snapshot and the existing MOB files are added to the manifest. When restoring the snapshot, file links are created in the MOB region.

Cleaning and Compactions

There are two situations when MOB files should be deleted: when the MOB file is expired, and when the MOB file is too small and should be merged into bigger ones to improve HDFS efficiency.

HBase MOB has a chore in master: it scans the MOB files, finds the expired ones determined by the date in the filename, and deletes them. Thus disk space is reclaimed periodically by aging off expired MOB files.

MOB files may be relatively small compared to an HDFS block if you write rows where only a few entries qualify as MOBs; also, there might be deleted cells. You need to drop the deleted cells and merge the small files into bigger ones to improve HDFS utilization. The MOB compactions compact only the small files; large files are not touched, which avoids repeatedly compacting large files.

Some other things to keep in mind:

  • Know which cells are deleted. In every HBase major compaction, the delete markers are written to a del file before they are dropped.
  • In the first step of MOB compactions, these del files are merged into bigger ones.
  • All the small MOB files are selected. If the number of selected files equals the number of existing MOB files, the compaction is regarded as a major one and is called an ALL_FILES compaction.
  • The selected files are partitioned by the start key and date in the filename. The small files in each partition are compacted with the del files so that deleted cells can be dropped; meanwhile, a new HFile with new reference cells is generated, the compactor commits the new MOB file, and then bulk loads this HFile into HBase.
  • After compactions in all partitions are finished, if an ALL_FILES compaction is involved, the del files are archived.

The life cycle of MOB files is illustrated below. Basically, they are created when memstore is flushed, and deleted by HFileCleaner from the filesystem when they are not referenced by the snapshot or expired in the archive.


In summary, the new HBase MOB design moves MOBs out of the main I/O path of HBase while retaining most security, compaction, and snapshotting features. It caters to the characteristics of operations in MOB, makes the write amplification of MOBs more predictable, and keeps low latencies in both reading and writing.

Jincheng Du is a Software Engineer at Intel and an HBase contributor.

Jon Hsieh is a Software Engineer at Cloudera and an HBase committer/PMC member. He is also the founder of Apache Flume, and a committer on Apache Sqoop.

Categories: Hadoop

New in CDH 5.4: Sensitive Data Redaction

Cloudera Blog - Wed, 06/03/2015 - 15:37

The best data protection strategy is to remove sensitive information from everyplace it’s not needed.

Have you ever wondered what sort of “sensitive” information might wind up in Apache Hadoop log files? For example, if you’re storing credit card numbers inside HDFS, might they ever “leak” into a log file outside of HDFS? What about SQL queries? If you have a query like select * from table where creditcard = '1234-5678-9012-3456', where is that query information ultimately stored?

This concern affects anyone managing a Hadoop cluster containing sensitive information. At Cloudera, we set out to address this problem through a new feature called Sensitive Data Redaction, and it’s now available starting in Cloudera Manager 5.4.0 when operating on a CDH 5.4.0 cluster.

Specifically, this feature addresses the “leakage” of sensitive information into channels unrelated to the flow of data—not the data stream itself. So, for example, Sensitive Data Redaction will get credit-card numbers out of log files and SQL queries, but it won’t touch credit-card numbers in the actual data returned from a SQL query, nor will it modify the stored data itself.


Our first step was to study the problem: load up a cluster with sensitive information, run queries, and see if we could find the sensitive data outside of the expected locations. In the case of log files, we found that the SQL queries themselves were written to several log files. Beyond the SQL queries, however, we did not observe any egregious offenders; developers seem to know that writing internal data to log files is a bad idea.

That’s the good news. The bad news is that the Hadoop ecosystem is really big, and there are doubtless many code paths and log messages that we didn’t exercise. Developers are also adding code to the system all the time, and future log messages might reveal sensitive data.

Looking more closely at how copies of SQL queries are distributed across the system was enlightening. Apache Hive writes a job configuration file that contains a copy of the query, and makes this configuration file available “pretty much everywhere.” Impala keeps queries and query plans around for debugging and record keeping and makes them available in the UI. Hue saves queries so they can be run again. This behavior makes perfect sense: users want to know what queries they have run, want to debug queries that went bad, and want information on currently running queries. When sensitive information is in the query itself, however, this helpfulness is suddenly much less helpful.

One way to tackle such “leakage” of sensitive data is to put log files in an encrypted filesystem such as that provided by Cloudera Navigator Encrypt. This strategy is reasonable and addresses compliance concerns, especially in the event that some users require the original queries.

Even so, this approach allows some users to see the log files in the clear, and the contents of the log files still make their way to the Cloudera Manager UI. In most cases, however, the original query strings are not strictly required, and the preferred solution is to simply remove (a.k.a. redact) the sensitive data entirely from places where it’s not needed.

The Approach

We decided to tackle redaction for log files as well as for SQL queries. Even though we observed little information leakage from log files, we decided that it would be better to be safe than sorry and apply redaction to all of them. We also wanted to protect against future log messages and code paths that we didn’t exercise. We therefore implemented “log redaction” code that plugs itself into the logging subsystem used by every component of Hadoop. This “log4j redactor” will inspect every log message as it’s generated, redact it, and pass it on to the normal logging stream.
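As a sketch of the idea (in Python rather than Java/log4j, and with a made-up rule), a redactor can hook into the logging subsystem as a filter that rewrites each message before it reaches any appender:

```python
import io
import logging
import re

class RedactingFilter(logging.Filter):
    """Inspect every log record as it's generated, redact it, and pass it
    on -- a Python stand-in for the log4j redactor described above."""
    def __init__(self, rules):
        super().__init__()
        self.rules = [(re.compile(p), r) for p, r in rules]

    def filter(self, record):
        msg = record.getMessage()
        for pattern, replacement in self.rules:
            msg = pattern.sub(replacement, msg)
        record.msg, record.args = msg, ()
        return True  # always pass the (now redacted) record on

stream = io.StringIO()
logger = logging.getLogger("redaction-demo")
logger.addHandler(logging.StreamHandler(stream))
logger.addFilter(RedactingFilter([(r"password=\S+", "password=REDACTED")]))
logger.warning("login attempt with password=hunter2")
print(stream.getvalue().strip())  # -> login attempt with password=REDACTED
```

Because the filter sits on the logger itself, every code path that logs through it is covered, including future log messages nobody has exercised yet.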

The other component of this effort was to redact SQL queries at their source, which required work in Hive, Impala, and Hue (the open source GUI for Hadoop components). In both Hive and Impala, as soon as the query is entered it is split into two: the original copy that’s used to actually run the query, and a redacted copy of the query that’s shown to anything external. In Hue, queries are redacted as they are saved.

Finally, Cloudera Manager makes this all easy to configure. The administrator is able to specify a set of rules for redaction in one place, click a button, and have the redaction rules take effect on everything throughout the cluster.

Configuring Redaction

Let’s see how this works in practice. In Cloudera Manager there are two new HDFS parameters, one to enable redaction and one to specify what to redact. Let’s say that I want to redact credit-card numbers. But because credit-card numbers alone make for a boring demo, let’s also say that I just read a Harry Potter book and would feel more comfortable if the name “Voldemort” were replaced by “He Who Shall Not Be Named.”

Redaction is an HDFS parameter that is applied to the whole cluster. The easiest way to find it is to simply search for ‘redact’ on the HDFS Configuration page:

Here there is no “Log and Query Redaction Policy” defined yet, and I’ve clicked on the + sign to add one. There are four presets in the UI, and it’s easy to create custom rules from scratch. For the credit-card numbers, I’ll select the first entry, “Credit Card numbers (with separator)”.

This action creates the first rule.

  • The “Description” is useful for record keeping and to remember what a rule is for, but has no impact on the actual redaction itself.
  • The “Trigger” field, if specified, limits the cases in which the “Search” field is applied. It’s a simple string match (not a regular expression), and if that string appears in the data to redact, the “Search” regular expression is applied. This is a performance optimization: string matching is much faster than regular expressions.
  • The most important parameter is the “Search” field. It’s a regular expression describing the thing to redact. The search pattern shown here is complicated: it matches four digits, a separator, four more digits, and so on, describing a credit-card number.
  • The final field, “Replace,” is what to put in the place of the text matched by “Search.”
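To make the interplay of these fields concrete, here is a hypothetical re-implementation of a rule’s logic in Python (not Cloudera Manager’s actual code; the patterns are illustrative):

```python
import re

def apply_rule(text, search, replace, trigger=None):
    # Trigger: a cheap substring check; the Search regex only runs if the
    # trigger string is present (string matching is much faster than regex).
    if trigger is not None and trigger not in text:
        return text
    return re.sub(search, replace, text)

# Hypothetical rules mirroring the two in this walkthrough.
rules = [
    (r"\d{4}[- ]\d{4}[- ]\d{4}[- ]\d{4}", "XXXX-XXXX-XXXX-XXXX", "-"),
    (r"Voldemort", "He Who Shall Not Be Named", None),
]
text = "Voldemort paid with 1234-5678-9012-3456"
for search, replace, trigger in rules:
    text = apply_rule(text, search, replace, trigger)
print(text)  # -> He Who Shall Not Be Named paid with XXXX-XXXX-XXXX-XXXX
```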

Let’s now click on the + sign and select a custom rule.

The fields start out blank; I filled them in so that instances of the name “Voldemort” are replaced with “He Who Shall Not Be Named”.

Now, for a really useful part of the UI: It’s possible to test the existing redaction rules against some text in order to be certain that they work as expected. This takes the guesswork out of making redaction rules; you can easily see how they work in action. Here’s an example:

The Test Redaction Rules box has my sample sentence to redact, and the Output box shows that Voldemort’s name and credit-card information have both been replaced with something much more comforting.

After checking the “Enable Log and Query Redaction” box (visible in the first screenshot) and a cluster restart, these redaction rules are propagated to the cluster. The easiest place to see them in effect is in Hue. For the purposes of this entirely contrived example, let’s say I have a table containing Harry Potter character credit-card information. Let’s try an Impala query inside Hue:

I typed the word “Voldemort” into the UI. After the query was executed, the notification “This query had some sensitive information removed when saved” appeared. Let’s check out the list of recent queries:

Hue replaced Voldemort’s name with “He Who Shall Not Be Named” in the list of recent queries. We can also see what Impala does with this query. Going to the Impala Daemon’s web UI, we see the same redaction taking place:

The same redaction holds true for Hive, for log files, and for any other place where this query might have appeared.


We hope that you find this functionality useful. Several teams across CDH came together to make this project happen, including those working on Cloudera Manager, Impala, Hive, Hue, packaging, QA, and docs.

Removing sensitive information from places where it’s not needed is the simplest and most effective data protection strategy. The Sensitive Data Redaction feature achieves that goal throughout CDH and provides an easy, intuitive UI in Cloudera Manager.

Michael Yoder is a Software Engineer at Cloudera.

Categories: Hadoop

Architectural Patterns for Near Real-Time Data Processing with Apache Hadoop

Cloudera Blog - Mon, 06/01/2015 - 17:25

Evaluating which streaming architectural pattern is the best match to your use case is a precondition for a successful production deployment.

The Apache Hadoop ecosystem has become a preferred platform for enterprises seeking to process and understand large-scale data in real time. Technologies like Apache Kafka, Apache Flume, Apache Spark, Apache Storm, and Apache Samza are increasingly pushing the envelope on what is possible. It is often tempting to bucket large-scale streaming use cases together but in reality they tend to break down into a few different architectural patterns, with different components of the ecosystem better suited for different problems.

In this post, I will outline the four major streaming patterns that we have encountered with customers running enterprise data hubs in production, and explain how to implement those patterns architecturally on Hadoop.

Streaming Patterns

The four basic streaming patterns (often used in tandem) are:

  • Stream ingestion: Involves low-latency persisting of events to HDFS, Apache HBase, and Apache Solr.
  • Near Real-Time (NRT) Event Processing with External Context: Takes actions like alerting, flagging, transforming, and filtering of events as they arrive. Actions might be taken based on sophisticated criteria, such as anomaly detection models. Common use cases, such as NRT fraud detection and recommendation, often demand low latencies under 100 milliseconds.
  • NRT Event Partitioned Processing:  Similar to NRT event processing, but deriving benefits from partitioning the data—like storing more relevant external information in memory. This pattern also requires processing latencies under 100 milliseconds.
  • Complex Topology for Aggregations or ML: The holy grail of stream processing: gets real-time answers from data with a complex and flexible set of operations. Here, because results often depend on windowed computations and require more active data, the focus shifts from ultra-low latency to functionality and accuracy.

In the following sections, we’ll get into recommended ways for implementing such patterns in a tested, proven, and maintainable way.

Streaming Ingestion

Traditionally, Flume has been the recommended system for streaming ingestion. Its large library of sources and sinks cover all the bases of what to consume and where to write. (For details about how to configure and manage Flume, Using Flume, the O’Reilly Media book by Cloudera Software Engineer/Flume PMC member Hari Shreedharan, is a great resource.)

Within the last year, Kafka has also become popular because of powerful features such as playback and replication. Because of the overlap between Flume’s and Kafka’s goals, their relationship is often confusing. How do they fit together? The answer is simple: Kafka is a pipe similar to Flume’s Channel abstraction, albeit a better pipe because of its support for the features mentioned above. One common approach is to use Flume for the source and sink, and Kafka for the pipe between them.

The diagram below illustrates how Kafka can serve as the UpStream Source of Data to Flume, the DownStream destination of Flume, or the Flume Channel.

The design illustrated below is massively scalable, battle hardened, centrally monitored through Cloudera Manager, fault tolerant, and supports replay.

One thing to note before we go to the next streaming architecture is how this design gracefully handles failure. The Flume Sinks pull from a Kafka Consumer Group. The Consumer Group tracks the Topic’s offsets with help from Apache ZooKeeper. If a Flume Sink is lost, the Kafka Consumer Group will redistribute the load to the remaining sinks. When the Flume Sink comes back up, the Consumer Group will redistribute again.
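This failover can be pictured with a toy round-robin assignment in Python (a simplification of Kafka’s actual consumer-group rebalancing protocol):

```python
def assign_partitions(partitions, consumers):
    """Round-robin partition assignment: when a consumer (Flume Sink)
    disappears, re-running the assignment spreads its partitions over
    the survivors -- a simplified picture of a group rebalance."""
    return {c: [p for i, p in enumerate(partitions)
                if consumers[i % len(consumers)] == c]
            for c in consumers}

partitions = list(range(6))
print(assign_partitions(partitions, ["sink-1", "sink-2", "sink-3"]))
# sink-1 goes down; the consumer group rebalances onto the remaining sinks
print(assign_partitions(partitions, ["sink-2", "sink-3"]))
```

In both cases every partition is still owned by exactly one live sink, which is why no data is stranded when a sink fails.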

NRT Event Processing with External Context

To reiterate, a common use case for this pattern is to look at events streaming in and make immediate decisions, either to transform the data or to take some sort of external action. The decision logic often depends on external profiles or metadata. An easy and scalable way to implement this approach is to add a Source or Sink Flume interceptor to your Kafka/Flume architecture.  With modest tuning, it’s not difficult to achieve latencies in the low milliseconds.

Flume Interceptors take events or batches of events and allow user code to modify or take actions based on them. The user code can interact with local memory or an external storage system like HBase to get profile information needed for decisions. HBase usually can give us our information in around 4-25 milliseconds depending on network, schema design, and configuration. You can also set up HBase in a way that it is never down or interrupted, even in the case of failure.
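A rough Python sketch of this enrichment pattern (the store here is a plain dict standing in for an HBase table, and the field names are invented):

```python
class ProfileEnricher:
    """Flume-interceptor-style enrichment: check local memory first,
    fall back to an external store (HBase in the article) on a miss."""
    def __init__(self, store):
        self.store = store   # stand-in for an HBase table
        self.cache = {}      # local memory: the fastest path

    def lookup(self, key):
        if key not in self.cache:
            # In practice this is the ~4-25 ms HBase round trip.
            self.cache[key] = self.store.get(key)
        return self.cache[key]

    def intercept(self, event):
        profile = self.lookup(event["user"])
        high_risk = profile is not None and profile.get("fraud_score", 0) > 0.9
        event["risk"] = "flag" if high_risk else "ok"
        return event

store = {"alice": {"fraud_score": 0.95}, "bob": {"fraud_score": 0.1}}
enricher = ProfileEnricher(store)
print(enricher.intercept({"user": "alice", "amount": 250}))
```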

Implementation requires nearly no coding beyond the application-specific logic in the interceptor. Cloudera Manager offers an intuitive UI for deploying this logic through parcels as well as hooking up, configuring, and monitoring the services.

NRT Partitioned Event Processing with External Context

In the architecture illustrated below (unpartitioned solution), you would need to call out frequently to HBase because external context relevant to particular events does not fit in local memory on the Flume interceptors.

However, if you define a key to partition your data, you can match incoming data to the subset of the context data that is relevant to it. If you partition the data 10 times, then you only need to hold 1/10th of the profiles in memory. HBase is fast, but local memory is faster. Kafka enables you to define a custom partitioner that it uses to split up your data.
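A stable hash partitioner is enough to illustrate the idea; in this hypothetical Python sketch, each consumer pre-loads only the profiles whose keys map to its partition:

```python
import zlib

NUM_PARTITIONS = 10

def partition_for(key: str) -> int:
    """Stable hash partitioner: the same profile key always maps to the
    same partition, so each consumer holds only its slice of the profiles."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

# Each consumer pre-loads just the profiles belonging to its partition.
profiles = {"user-%d" % i: {"score": i} for i in range(100)}
my_partition = 3
local_cache = {k: v for k, v in profiles.items()
               if partition_for(k) == my_partition}
print(len(local_cache), "of", len(profiles), "profiles held locally")
```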

Note that Flume is not strictly necessary here; the root solution is just a Kafka consumer. So, you could use just a consumer in YARN or a Map-only MapReduce application.

Complex Topology for Aggregations or ML

Up to this point, we have been exploring event-level operations. However, sometimes you need more complex operations like counts, averages, sessionization, or machine-learning model building that operate on batches of data. In this case, Spark Streaming is the ideal tool for several reasons:

  • It’s easy to develop compared to other tools.

    Spark’s rich and concise APIs make building out complex topologies easy.

  • Similar code for streaming and batch processing.

    With a few changes, the code for small batches in real time can be used for enormous batches offline. In addition to reducing code size, this approach reduces the time needed for testing and integration.

  • There’s one engine to know.

    There is a cost that goes into training staff on the quirks and internals of distributed processing engines. Standardizing on Spark consolidates this cost for both streaming and batch.

  • Micro-batching helps you scale reliably.

    Acknowledging at a batch level allows for more throughput and allows for solutions without the fear of a double-send. Micro-batching also helps with sending changes to HDFS or HBase in terms of performance at scale.

  • Hadoop ecosystem integration is baked in.

    Spark has deep integration with HDFS, HBase, and Kafka.

  • No risk of data loss.

    Thanks to the WAL and Kafka, Spark Streaming avoids data loss in case of failure.

  • It’s easy to debug and run.

    You can debug and step through your Spark Streaming code in a local IDE without a cluster. Plus, the code looks like normal functional programming code, so it doesn’t take much time for a Java or Scala developer to make the jump. (Python is also supported.)

  • Streaming is natively stateful.

    In Spark Streaming, state is a first-class citizen, meaning that it’s easy to write stateful streaming applications that are resilient to node failures.

  • As the de facto standard, Spark is getting long-term investment from across the ecosystem. 
    At the time of this writing, there were approximately 700 commits to Spark as a whole in the last 30 days—compared to other streaming frameworks such as Storm, with 15 commits during the same time.
  • You have access to ML libraries.
    Spark’s MLlib is becoming hugely popular and its functionality will only increase.
  • You can use SQL where needed.
    With Spark SQL, you can add SQL logic to your streaming application to reduce code complexity.
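To illustrate the kind of windowed computation mentioned earlier, here is a toy micro-batch aggregation in plain Python (Spark Streaming’s actual API differs; this only sketches the semantics of a sliding window over micro-batches):

```python
from collections import Counter

def process_microbatches(batches, window=2):
    """Windowed aggregation in the micro-batch style: for each new
    micro-batch, count events over the last `window` batches."""
    results = []
    for i in range(len(batches)):
        window_events = [e for b in batches[max(0, i - window + 1): i + 1]
                         for e in b]
        results.append(Counter(window_events))
    return results

batches = [["buy", "sell"], ["buy"], ["sell", "sell"]]
for counts in process_microbatches(batches):
    print(dict(counts))
```

Because the unit of work (and of acknowledgement) is a whole batch, the engine can retry or replay a batch on failure without per-event bookkeeping, which is the scaling benefit described above.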

There is a lot of power in streaming and several possible patterns, but as you have learned in this post, you can do really powerful things with minimal coding if you know which pattern matches up with your use case best.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Spark, Flume, and HBase, and a co-author of the O’Reilly book, Hadoop Applications Architecture.

Categories: Hadoop

Security, Hive-on-Spark, and Other Improvements in Apache Hive 1.2.0

Cloudera Blog - Fri, 05/29/2015 - 16:06

Apache Hive 1.2.0, although not a major release, contains significant improvements.

Recently, the Apache Hive community moved to a more frequent, incremental release schedule. So, a little while ago, we covered the Apache Hive 1.0.0 release and explained how it was renamed from 0.14.1 with only minor feature additions since 0.14.0.

Shortly thereafter, Apache Hive 1.1.0 was released (renamed from Apache Hive 0.15.0), which included more significant features—including Hive-on-Spark.

Last week, the community released Apache Hive 1.2.0. Although a narrower release than Hive 1.1.0, it nevertheless contains improvements in the following areas:

New Functionality
  • Support for Apache Spark 1.3 (HIVE-9726), enabling dynamic executor allocation and impersonation
  • Support for integration of Hive-on-Spark with Apache HBase (HIVE-10073)
  • Support for numeric partition columns with literals (HIVE-10313, HIVE-10307)
  • Support for Union Distinct (HIVE-9039)
  • Support for specifying column list in insert statement (HIVE-9481)
Performance and Optimizations

Security

Usability and Stability

For a larger but still incomplete list of features, improvements, and bug fixes, see the release notes. (Most of the Hive-on-Spark JIRAs are missing from the list.)

The most important improvements and fixes above (such as those involving security, for example) are already available in CDH 5.4.x releases. As another example, CDH users have been testing the Hive-on-Spark public beta since its first release, as well as improvements made to that beta in CDH 5.4.0.

We’re looking forward to working with the rest of the Apache Hive community to drive the project continually forward in the areas of SQL functionality, performance, security, and stability!

Xuefu Zhang is a Software Engineer at Cloudera and a PMC member of Apache Hive.


Categories: Hadoop

New in CDH 5.4: Apache HBase Request Throttling

Cloudera Blog - Thu, 05/28/2015 - 15:42

The following post about the new request throttling feature in HBase 1.1 (now shipping in CDH 5.4) originally published in the ASF blog. We re-publish it here for your convenience.

Running multiple workloads on HBase has always been challenging, especially  when trying to execute real-time workloads while concurrently running analytical jobs. One possible way to address this issue is to throttle analytical MR jobs so that real-time workloads are less affected.

A new QoS (quality of service) feature that Apache HBase 1.1 introduces is request-throttling, which controls the rate at which requests get handled by an HBase cluster. HBase typically treats all requests identically; however, the new throttling feature can be used to specify a maximum rate or bandwidth to override this behavior. The limit may be applied to requests originating from a particular user, or alternatively, to requests directed to a given table or a specified namespace.

The objective of this post is to evaluate the effectiveness of this feature and the overhead it might impose on a running HBase workload. The performance runs carried out showed that throttling works very well, by redirecting resources from a user whose workload is throttled to the workloads of other users, without incurring a significant overhead in the process.

Enabling Request Throttling

It is straightforward to enable the request-throttling feature: all that is necessary is to set the HBase configuration parameter hbase.quota.enabled to true. The related parameter hbase.quota.refresh.period specifies the time interval in milliseconds at which the RegionServer re-checks for any new restrictions that have been added.
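These two parameters go in hbase-site.xml; the values below are illustrative (check the defaults for your HBase version):

```xml
<!-- hbase-site.xml: enable request throttling (quotas) -->
<property>
  <name>hbase.quota.enabled</name>
  <value>true</value>
</property>
<property>
  <name>hbase.quota.refresh.period</name>
  <!-- re-check for new quota settings every 60 seconds -->
  <value>60000</value>
</property>
```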

The throttle can then be set from the HBase shell, like so:

hbase> set_quota TYPE => THROTTLE, USER => 'uname', LIMIT => '100req/sec'
hbase> set_quota TYPE => THROTTLE, TABLE => 'tbl', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns', LIMIT => 'NONE'

Test Setup

To evaluate how effectively HBase throttling worked, a YCSB workload was imposed on a 10-node cluster. There were 6 RegionServers and 2 master nodes. YCSB clients were run on the 4 nodes that were not running RegionServer processes. The client processes were initiated by two separate users, and the workload issued by one of them was throttled.

More details on the test setup follow.

HBase version:

HBase 0.98.6-cdh5.3.0-SNAPSHOT (HBASE-11598 was backported to this version)

CentOS release 6.4 (Final)
CPU sockets: 2
Physical cores per socket: 6
Total number of logical cores: 24
Number of disks: 12
Memory: 64GB
Number of RS: 6
Master nodes: 2  (for the Namenode, Zookeeper and HBase master)
Number of client nodes: 4
Number of rows: 1080M
Number of regions: 180
Row size: 1K
Threads per client: 40
Workload: read-only and scan
Key distribution: Zipfian
Run duration: 1 hour


An initial data set was first generated by running YCSB in its data generation mode. An HBase table was created with the table specifications above and pre-split. After all the data was inserted, the table was flushed, compacted and saved as a snapshot. This data set was used to prime the table for each run. Read-only and scan workloads were used to evaluate performance; this eliminates effects such as memstore flushes and compactions. One run with a long duration was carried out first to ensure the caches were warmed and that the runs yielded repeatable results.

For the purpose of these tests, the throttle was applied to the workload emanating from one user in a two-user scenario. There were four client machines used to impose identical read-only workloads. The client processes on two machines were run by the user “jenkins”, while those on the other two were run as a different user. The throttle was applied to the workload issued by this second user. There were two sets of runs, one with both users running read workloads and the second where the throttled user ran a scan workload. Typically, scans are long running and it can be desirable on occasion to de-prioritize them in favor of more real-time read or update workloads. In this case, the scan was for sets of 100 rows per YCSB operation.

For each run, the following steps were carried out:

  • Any existing YCSB-related table was dropped.

  • The initial data set was cloned from the snapshot.

  • The desired throttle setting was applied.

  • The desired workloads were imposed from the client machines.

  • Throughput and latency data was collected and is presented in the table below.

The throttle was applied at the start of the job (the command used was the first in the list shown in the “Enabling Request Throttling” section above). The hbase.quota.refresh.period property was set to under a minute so that the throttle took effect by the time test setup was finished.

The throttle option specifically tested here was the one to limit the number of requests (rather than the one to limit bandwidth).

Observations and Results

The throttling feature appears to work quite well. When applied interactively in the middle of a running workload, it goes into effect immediately after the quota refresh period and can be observed clearly in the throughput numbers put out by YCSB while the test is progressing. The table below has performance data from test runs indicating the impact of the throttle. For each row, the throughput and latency numbers are also shown in separate columns, one set for the “throttled” user (indicated by “T” for throttled) and the other for the “non-throttled” user (represented by “U” for un-throttled).

Read + Read Workload

As can be seen, when the throttle pressure is increased (by reducing the permitted throughput for user “T” from 2500 req/sec to 500 req/sec, as shown in column 1), the total throughput (column 2) stays around the same.  In other words, the cluster resources get redirected to benefit the non-throttled user, with the feature consuming no significant overhead. One possible outlier is the case where the throttle parameter is at its most restrictive (500 req/sec), where the total throughput is about 10% less than the maximum cluster throughput.

Correspondingly, the latency for the non-throttled user improves while that for the throttled user degrades. This is shown in the last two columns in the table.

The charts above show that the change in throughput is linear with the amount of throttling, for both the throttled and non-throttled user. With regard to latency, the change is generally linear, until the throttle becomes very restrictive; in this case, latency for the throttled user degrades substantially.

One point that should be noted is that, while the throttle parameter in req/sec is indeed correlated to the actual restriction in throughput as reported by YCSB (ops/sec) as seen by the trend in column 4, the actual figures differ. As user “T”’s throughput is restricted down from 2500 to 500 req/sec, the observed throughput goes down from 2500 ops/sec to 1136 ops/sec. Therefore, users should calibrate the throttle to their workload to determine the appropriate figure to use (either req/sec or MB/sec) in their case.

Read + Scan Workload

With the read/scan workload, similar results are observed as in the read/read workload. As the extent of throttling is increased for the long-running scan workload, the observed throughput decreases and latency increases. Conversely, the read workload benefits, displaying better throughput and improved latency. Again, the specific numeric value used to specify the throttle needs to be calibrated to the workload at hand. Since scans break down into a large number of read requests, the throttle parameter needs to be much higher than in the case with the read workload. Shown above is a log-linear chart of the impact on throughput of the two workloads when the extent of throttling is adjusted.


HBase request throttling is an effective and useful technique to handle multiple workloads, or even multi-tenant workloads on an HBase cluster. A cluster administrator can choose to throttle long-running or lower-priority workloads, knowing that RegionServer resources will get re-directed to the other workloads, without this feature imposing a significant overhead. By calibrating the throttle to the cluster and the workload, the desired performance can be achieved on clusters running multiple concurrent workloads.

Govind Kamat is a Performance Engineer at Cloudera, and an HBase contributor.

Categories: Hadoop

Impala Needs Your Contributions

Cloudera Blog - Fri, 05/22/2015 - 14:59

Your contributions, and a vibrant developer community, are important for Impala’s users. Read below to learn how to get involved.

From the moment that Cloudera announced it at Strata New York in 2012, Impala has been a 100% Apache-licensed open source project. All of Impala’s source code is available on GitHub—where nearly 500 users have forked the project for their own use—and we follow the same model as every other platform project at Cloudera: code changes are committed “upstream” first, and are then selected and backported to our release branches for CDH releases.

However, Impala was (and still is!) moving at an extraordinary pace, and we focused on meeting the feature and stability needs of our user community at the expense of making it easy for the developer community to contribute fixes and new features. After the release of Impala 2.0, which was a major milestone for the project, we’ve been working on incubating a developer community. Now we’re ready to more widely publicize our improvements, and invite more developers to come and build the state-of-the-art SQL query engine on Hadoop with us.

Since January 2015, we have moved more and more of Impala’s development out into the open, where members of the community can watch the progress of the JIRA issues that are important to them (including through the code review process), participate in technical discussion on the Impala developer mailing list, and can submit new patches for inclusion in Impala.

We are committed to making it easier for our developer community to work with Impala’s source code. We’ve recently released a developer environment Docker image that contains everything you need to start building and working with Impala. We have also posted the full set of generated developer documentation to help navigate the hundreds of classes and thousands of lines of code that make up Impala’s internals. We’re in the process of adding a Jenkins-based test server to make it easier for contributors to validate their patches—watch this space! Finally, we’ve gathered together some of the published papers on Impala, and the presentations that we’ve been giving at meetups and conferences around the world.

If you’d like to get involved in contributing to Impala, start with the presentation we gave at the March 2015 Impala Meetup in Cloudera’s Palo Alto office, which provides an overview of our contribution process:

Then check out our contribution guidelines, join the mailing list and get started! We’re excited to work with you.

Henry Robinson is a Software Engineer at Cloudera on the Impala team.

Marcel Kornacker is a tech lead at Cloudera and the founder and architect of Impala.

Categories: Hadoop

Graduating Apache Parquet

Cloudera Blog - Thu, 05/21/2015 - 20:41

The following post from Julien Le Dem, a tech lead at Twitter, originally appeared in the Twitter Engineering Blog. We bring it to you here for your convenience.

ASF, the Apache Software Foundation, recently announced the graduation of Apache Parquet, a columnar storage format for the Apache Hadoop ecosystem. At Twitter, we’re excited to be a founding member of the project.

Apache Parquet is built to work across programming languages, processing frameworks, data models and query engines including Apache Hive, Apache Drill, Impala and Presto.

At Twitter, Parquet has helped us scale by reducing storage requirements by at least one-third on large datasets, as well as improving scan and deserialization time. This has translated into hardware savings and reduced latency for accessing data. Furthermore, Parquet’s integration with so many tools creates opportunities and flexibility for query engines to help optimize performance.

Since we announced Parquet, these open source communities have integrated the project: Apache Crunch, Apache Drill, Apache Hive, Apache Pig, Apache Spark, Apache Tajo, Kite, Impala, Presto and Scalding.

What’s New?

The Parquet community just released version 1.7.0 with several new features and bug fixes. This update includes:

  • A new filter API for Java and DSL for Scala that uses statistics metadata to filter large batches of records without reading them
  • A memory manager that will scale down memory consumption to help avoid crashes
  • Improved MR and Spark job startup time
  • Better support for evolving schemas with type promotion when reading
  • More logical types for storing dates, times, and more
  • Improved compatibility between Hive, Avro and other object models

As usual, this release also includes many other bug fixes. We’d like to thank the community for reporting these and contributing fixes. Parquet 1.7.0 is now available for download.

Future Work

Although Parquet has graduated, there’s still plenty to do, and the Parquet community is planning some major changes to enable even more improvements.

First is updating the internals to work with the zero-copy read path in Hadoop, making reads even faster by not copying data into Parquet’s memory space. This will also enable Parquet to take advantage of HDFS read caching and should pave the way for significant performance improvements.

After moving to zero-copy reads, we plan to add a vectorized read API that will enable processing engines like Drill, Presto and Hive to save time by processing column data in batches before reconstructing records in memory, if at all.

We also plan to add more advanced statistics-based record filtering to Parquet. Statistics-based record filtering allows us to drop entire batches of data while reading only a small amount of metadata. For example, we’ll take advantage of dictionary-encoded columns and apply filters to batches of data by examining a column’s dictionary, and in cases where no dictionary is available, we plan to store a bloom filter in the metadata.
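The min/max case can be sketched in a few lines of Python (a simplification; Parquet applies this per row group using stored column statistics):

```python
def can_skip_batch(stats, predicate_min, predicate_max):
    """Drop a whole batch using only its min/max statistics metadata:
    if the query range doesn't overlap the batch's range, never read it."""
    return stats["max"] < predicate_min or stats["min"] > predicate_max

batches = [{"min": 0, "max": 99}, {"min": 100, "max": 199}, {"min": 200, "max": 299}]
# Query: value BETWEEN 120 AND 150 -- only the middle batch needs reading.
to_read = [b for b in batches if not can_skip_batch(b, 120, 150)]
print(to_read)  # -> [{'min': 100, 'max': 199}]
```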

Aside from performance, we’re working on adding POJO support in the Parquet Avro object model that works the same way Avro handles POJOs in avro-reflect. This will make it easier to use existing Java classes that aren’t based on one of the already-supported object models and enable applications that rely on avro-reflect to use Parquet as their data format.

Getting Involved

Parquet is an independent open source project at the ASF. To get involved, join the community mailing lists and any of the community hangouts the project holds. We welcome everyone to participate to make Parquet better and look forward to working with you in the open.


First we would like to thank Ryan Blue from Cloudera for helping craft parts of this post and the wider Parquet community for contributing to the project. Specifically, contributors from a number of organizations (Twitter, Netflix, Criteo, MapR, Stripe, Cloudera, AmpLab) contributed to this release. We’d also like to thank these people: Daniel Weeks, Zhenxiao Luo, Nezih Yigitbasi, Tongjie Chen, Mickael Lacour, Jacques Nadeau, Jason Altekruse, Parth Chandra, Colin Marc (@colinmarc), Avi Bryant (@avibryant), Ryan Blue, Marcel Kornacker, Nong Li (@nongli), Tom White (@tom_e_white), Sergio Pena, Matt Massie (@matt_massie), Tianshuo Deng, Julien Le Dem, Alex Levenson, Chris Aniszczyk and Lukas Nalezenec.

Categories: Hadoop

How-to: Read FIX Messages Using Apache Hive and Impala

Cloudera Blog - Thu, 05/21/2015 - 16:42

Learn how to read FIX message files directly with Hive, create a view to simplify user queries, and use a flattened Apache Parquet table to enable fast user queries with Impala.

The Financial Information eXchange (FIX) protocol is used widely by the financial services industry to communicate various trading-related activities. Each FIX message is a record that represents an action by a financial party, such as a new order or an execution report. As the raw point of truth for much of the trading activity of a financial firm, it makes sense that FIX messages are an obvious data source for analytics and reporting in Apache Hadoop.

Apache Hive, the versatile SQL engine that runs on Hadoop, can read FIX messages without any code or pre-processing. Thus users can easily load FIX message files into Hadoop and immediately start executing queries. Furthermore, with a simple data set conversion, users can also run their queries against Impala, the open source analytic database for Hadoop. Cloudera’s open platform lets you do both types of queries inside the Hue GUI so that a command-line session is not required. This blog post will show you how.

If you do not already have a Hadoop cluster but would like to run these steps, the Cloudera QuickStart VM allows you to run a pre-made single-node Hadoop cluster on your own computer. Or, use Cloudera Live to access a full Cloudera Enterprise cluster for a free two-week period.

Loading Data and Creating Tables

First, let’s review the structure of a FIX message.

FIX messages are formatted as a set of key-value pairs on a single line. Each key-value pair contains an attribute (known as a tag) of the message, such as the message type or order identifier. The key-value pairs are separated by the ASCII 001 “start of heading” character (commonly visualized as “^A”), and the keys and values are separated by the equal-sign character (=).

For example:
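The source post's sample message is not reproduced here, so the message below is a hypothetical illustration: a FIX 4.2 new-order message using the standard tags BeginString (8), MsgType (35), Symbol (55), and OrderQty (38), with `\u0001` as the ASCII 001 delimiter (often visualized as "^A"). A few lines of plain Java show how the key-value structure decomposes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FixParse {
    // Split a FIX message into a tag -> value map.
    // Pairs are separated by ASCII 001, and tag and value by '='.
    static Map<Integer, String> parse(String message) {
        Map<Integer, String> tags = new LinkedHashMap<>();
        for (String pair : message.split("\u0001")) {
            int eq = pair.indexOf('=');
            tags.put(Integer.parseInt(pair.substring(0, eq)), pair.substring(eq + 1));
        }
        return tags;
    }

    public static void main(String[] args) {
        // Hypothetical message; rendered with ^A it would read:
        // 8=FIX.4.2^A35=D^A55=AAPL^A38=100
        String msg = "8=FIX.4.2\u000135=D\u000155=AAPL\u000138=100";
        Map<Integer, String> tags = parse(msg);
        System.out.println(tags.get(55) + " " + tags.get(38)); // prints "AAPL 100"
    }
}
```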


Armed with one or more files containing FIX messages, one per line, you can push them into Hadoop by uploading them into an HDFS directory using the Hue File Browser:

You can create a Hive table over the top of the FIX message files using a Hive CREATE TABLE statement.

Hive is sufficiently flexible to allow each record to be expressed as a collection of key-value pairs, using the MAP data type. Thus, you can simply reference the key for a message and Hive will return the value. Although each key in the FIX message format is an integer (INT), the values can have varying data types, so you would use strings (STRING) to capture any value.

Delimiters are specified using the TERMINATED BY clause. You express the collection item delimiter as ASCII 001 using '\001', and the map key delimiter as the equals sign using '='. The default field delimiter is also ASCII 001, so you change it to the placeholder ASCII 002 ('\002') to avoid ambiguity. You are only defining one field in each FIX record (the single MAP collection of key-value pairs), so the field delimiter will never actually be found in the files.

An external Hive table is used, which maps the table to the location of the uploaded FIX files. Dropping an external table does not delete the underlying data.
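The original statement appears as an image in the source post. Here is a sketch of what such a statement could look like, given the delimiters described above and the fix_map table name that the later queries read from; the LOCATION is a hypothetical placeholder to replace with your own HDFS path:

```sql
-- Sketch only: replace the LOCATION with the HDFS directory holding your FIX files.
CREATE EXTERNAL TABLE fix_map (
  tag MAP<INT, STRING>                    -- one map of tag number -> value per message
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\002'             -- placeholder; never present in the files
  COLLECTION ITEMS TERMINATED BY '\001'   -- key-value pairs separated by ASCII 001
  MAP KEYS TERMINATED BY '='              -- tag and value separated by '='
STORED AS TEXTFILE
LOCATION '/user/hive/fix';                -- hypothetical HDFS path
```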


This statement can be run in the Hue Hive Editor after replacing the location with the HDFS path of the uploaded FIX files.


With the data loaded and the table created, you can start to query the FIX records. Use the single ‘tag’ column to reference the various tags of the messages.

This feature is powerful because you can reference any tag identifier without the need to pre-define it. The downside, however, is that you need to recall tag numbers to query the data.

You can make querying easier by first creating a Hive view over the table that renames the tag references, and then reading from the view instead of the table. This approach may also be a good opportunity to cast specific tags to specific types so that users don’t have to do that themselves.

CREATE VIEW fix_view AS SELECT tag[38] AS OrderQty , tag[55] AS Symbol , tag[60] AS TransactTime FROM fix_map; SELECT OrderQty , Symbol , TransactTime FROM fix_view;

(Find a pre-made view of all known tags here. Use them all, or edit the list to include only the desired tags. Tags referenced but not found in the data will return as NULL.)

Optimizing with Impala

At this point, you’re in a good spot: You can query the raw FIX message files and reference tags using sensible field names. You could stop there and functionally have all that you need, but if you want to go a bit further, you can improve query performance by converting the data into a more read-optimized format.

This process involves two transformations:

  • Converting the data from records of collections of key-value pairs to records of plain columns. This approach allows you to use Impala instead of Hive for user queries—which is much faster, but in this case, does not support the MAP data type. This conversion will also speed up Hive queries by de-serializing the MAP key-value pairs in advance.
  • Converting the data from plain text files into Apache Parquet files, which is a column-oriented binary format that considerably improves query performance over delimited text files.

You can use Hive to do both of these tasks in a single query, and most of the work has already been done by using the view created earlier:

CREATE TABLE fix STORED AS PARQUET AS SELECT tag[38] AS OrderQty , tag[55] AS Symbol , tag[60] AS TransactTime FROM fix_map;

Switching to the Hue Impala Editor, let Impala know about the new table via INVALIDATE METADATA fix; and query it like you did with the view. However, as “symbol” is a reserved word in Impala, you need to wrap it in back-ticks:

SELECT OrderQty , `Symbol` , TransactTime FROM fix;


In this post, you’ve learned how easy it is to use Hadoop to query FIX message datasets with SQL, and that Hive is a good choice for preparing and optimizing data while Impala is a good choice for running analytic queries. This same pattern can be applied to a wide variety of other datasets, which can all be integrated together in Hadoop to discover new insights about your enterprise. Happy querying!

Jeremy Beard is a Senior Solutions Architect at Cloudera.

Categories: Hadoop

How-to: Get Started with CDH on OpenStack with Sahara

Cloudera Blog - Mon, 05/18/2015 - 16:03

The recent OpenStack Kilo release adds many features to the Sahara project, which provides a simple means of provisioning an Apache Hadoop (or Spark) cluster on top of OpenStack. This how-to, from Intel Software Engineer Wei Ting Chen, explains how to use the Sahara CDH plugin with this new release.


This how-to assumes that OpenStack is already installed. If not, we recommend using Devstack to build a test OpenStack environment in a short time. (Note: Devstack is not recommended for use in a production environment. For production deployments, refer to the OpenStack Installation Guide.)

Sahara UI

You can use Horizon as a web-based interface to manage your Sahara environment. Log in to the Horizon website and look for the “Data Processing” link under the “Project” tab. You can also use the Sahara CLI to launch the commands in your OpenStack environment.

Login page

“Data Processing” link

Sahara CDH Plugin Configuration

Confirm that the CDH plugin has been enabled in the Sahara configuration. (Currently CDH 5.0 and CDH 5.3 are supported; CDH 5.4 will be supported in an upcoming Kilo release.) In the Kilo release, Sahara enables the CDH plugin by default. If you find there is no CDH plugin in your OpenStack environment, confirm that the plugins list (vanilla, hdp, cdh, spark, fake) in the Sahara configuration file (/etc/sahara/sahara.conf) includes cdh.

You can also confirm the plugin is installed via Horizon. Below is a screenshot of the Horizon page where you can confirm installed plugins.

Data processing plugins

Building a CDH Image

Before you start to use Sahara, you need to prepare an image with the Cloudera packages. The open source project sahara-image-elements provides scripts that create Sahara images. Below are the steps for building the image via this project.

Step 1: Check out the project from https://github.com/openstack/sahara-image-elements

# git clone https://github.com/openstack/sahara-image-elements

Step 2: Run this command and add some arguments to specify what OS distribution you would like to build. Here we use CentOS and CDH 5.3 as an example.

# sudo DIB_CDH_VERSION="5.3" bash sahara-image-elements/diskimage-create/diskimage-create.sh -p cloudera -i centos

This command will download a base image and pre-install all the required packages, including Cloudera Manager and the CDH packages, into the image. The image creation requires internet connectivity, and the entire process may take a long time depending on your internet speed and machine.

Currently Sahara can build Ubuntu and CentOS images. The Kilo release has been tested with CDH 5.0, 5.3, and 5.4. The Cloudera Manager package is downloaded into the image from the Cloudera website.

For more information, please refer to https://github.com/openstack/sahara-image-elements.

Uploading the Image with Glance

You need to upload and register the image with the Glance service in your OpenStack environment. These steps use Horizon to upload the image you created from the previous step. You can also use glance CLI to upload your own image.

Step 1: Click “Images” in “System.”
Step 2: Click “Create Image.”
Step 3: Click “Image File” from “Image Source.”
Step 4: Click “Choose File” from Image File and select the image you would like to upload from file explorer.
Step 5: Fill in all the required fields; for our example, we use QCOW2 as the image format. You can also select other options as per your needs.

Creating an image

Adding Tags to the Glance Image with Sahara

Next, you need to register the Glance image to Sahara and add tags to help Sahara recognize the images.

Step 1: Select “Image Registry.”
Step 2: Click “Register Image” and select an image to add a username and tags. (The username is a user account created by sahara-image-elements. By default, the Ubuntu user is “ubuntu” and the CentOS user is “cloud-user.” As for tags, add the related plugin and version tags to the image; multiple version tags can be added to one image.)

Registering an image

Provisioning a Cluster

Sahara provides two major features. It allows you to:

  • Provision a cluster in your OpenStack environment. You can use the Sahara UI to quickly select the services you want to launch in your cluster; there are many settings you can configure during cluster creation, such as anti-affinity.
  • Submit a job to the created cluster. The purpose is to help data scientists or application developers focus on development and ignore the provisioning process.
Provision a cluster using Guides

Using Guides walks you through the steps to launch a cluster.

Step 1. Select “Cluster Creation Guide” from “Data Processing Guides.”

Data processing Guides

Step 2: Select Plugin “Cloudera Plugin” and Version “5.3.”

Selecting plugin version

Step 3: Create templates and launch a cluster. After launching a cluster, you can check the “Cluster” tab to see the launched cluster.


Provision a cluster manually

You can also create your own cluster using the node group template:

Step 1: Create node group templates. A node group template is a basic template to provision a node. You can use this template to create your own custom node and select which service you would like to run. There are several settings you need to assign:

  • OpenStack Flavor: select a flavor size for this template.
  • Availability Zone: select an availability zone for this template.
  • Storage Location: select a storage location for HDFS (currently ephemeral and Cinder storage are supported).
  • Select Floating IP pool: optional; select a floating IP pool for this template.
  • Select Process: the CDH plugin supports most services in CDH 5.3/CDH 5.4. After selecting a process, the related tab appears at the top of the screen, where you can change the default parameters for that process.

Node group templates

Creating node group template (1)

Creating node group template (2)

Step 2: Create cluster templates. After creating node group templates, you need to create a cluster template by selecting multiple node group templates. Go to the “Node Groups” tab and select how many of each node group template you would like to run in this cluster template.

Cluster templates

Creating a cluster template

Step 3: Launch Cluster using Cluster Templates. Choose a cluster template and wait for the cluster status to become “Active.”

How to Run a Job

Run a job using Guides

Step 1: Select “Job Execution Guide” in “Guides.”
Step 2: Follow the steps to create the required templates.

Guided job execution

Run a job manually

Step 1: Create a Data Source. Select a data source type for your input and output files. Currently Sahara supports internal/external HDFS and Swift for the CDH plugin. If you select Swift, please provide your account information.

Creating a data source

Step 2: Create a Job Binary. Write your own binary and upload it via the Sahara UI. The supported storage types include internal databases and Swift.

Creating a job binary

Step 3: Create job templates. Create a job template using the job binary you created in the previous step. Choose which job type (Pig, Hive, MapReduce, or Java) you would like to run and choose the binary you uploaded previously; you can also add the related libraries on this page.

Creating a job template

Step 4: Run a job. Select a job template you would like to run and check the job status on the Jobs page. (If the job finishes without problems, the status will be “Success.” If a job fails, it will show “Killed.”) If you would like more information, you can check the cluster details to get the Cloudera Manager address by clicking the cluster name.


Finally, here are answers to some questions you may have.

  1. Is Apache Sentry integration available?
    Yes. However, although a Sentry service is on the service list, currently it needs to be configured manually. This is a limitation in the current Kilo release.
  2. Can I access Cloudera Manager directly?
    Yes, you can. Just click the cluster name on the Clusters page to see the Cloudera Manager address and the password for the admin user. For each cluster, the CDH plugin creates an individual password for security.
  3. Why is my Cluster status always “Waiting?”
    Please confirm all instances are reachable via ssh and ping. If not, please check your security group settings.
  4. How do I use external HDFS as a data source?
    You need to set this up manually and make sure there are no authentication issues between the compute nodes and the external HDFS.
  5. Does the CDH plugin support Hadoop’s data locality feature?
    No, currently the plugin does not support data locality.
  6. What if I create an invalid setting for the node group template?
    Before launching a cluster, a validation check confirms that your cluster template is valid. If you create an invalid cluster template, you will receive an error message when launching a cluster with it.
  7. Is there HA support for the CDH plugin?
    No, currently there is no HA support for a virtual cluster.
  8. How do I define a CDH template?
    A validation program runs when you launch a cluster template to verify that your template is workable. For more detail about the validation rules, see these docs; you can follow them when designing your cluster template.
  9. Can I configure the details for every service?
    Yes, you can. Check the sub-tab for each process; the sub-tab pops up when you select the process in the node group template.
  10. Can I scale up/down a running cluster?
    Yes; currently you may scale a NodeManager or DataNode up or down.


Categories: Hadoop

Scan Improvements in Apache HBase 1.1.0

Cloudera Blog - Fri, 05/15/2015 - 16:02

The following post, from Cloudera intern Jonathan Lawlor, originally appeared in the Apache Software Foundation’s blog.

Over the past few months there have been a variety of nice changes made to scanners in Apache HBase. This post focuses on two such changes, namely RPC chunking (HBASE-11544) and scanner heartbeat messages (HBASE-13090). Both of these changes address long-standing issues in the client-server scan protocol. Specifically, RPC chunking deals with how a server handles the scanning of very large rows, and scanner heartbeat messages allow scan operations to make progress even when aggressive server-side filtering makes result returns infrequent.


In order to discuss these issues, let’s first gain a general understanding of how scans currently work in HBase.

From an application’s point of view, a ResultScanner is the client-side source of all of the Results that an application asked for. When a client opens a scan, a ResultScanner is returned, and it is against this object that the client invokes next to fetch more data. ResultScanners handle all communication with the RegionServers involved in a scan, and the ResultScanner decides which Results to make visible to the application layer. While there are various implementations of the ResultScanner interface, all implementations use basically the same protocol to communicate with the server.

In order to retrieve Results from the server, the ResultScanner issues ScanRequests via RPCs to the different RegionServers involved in a scan. A client configures a ScanRequest by passing an appropriately set Scan instance when opening the scan, setting start/stop rows, caching limits, the maximum result size limit, and the filters to apply.

On the server side, there are three main components involved in constructing the ScanResponse that will be sent back to the client in answer to a ScanRequest:


The RSRpcServices is a service that lives on RegionServers and responds to incoming RPC requests, such as ScanRequests. During a scan, RSRpcServices is the server-side component responsible for constructing the ScanResponse that will be sent back to the client. RSRpcServices continues to scan in order to accumulate Results until the region is exhausted, the table is exhausted, or a scan limit is reached (such as the caching limit or max result size limit). In order to retrieve these Results, RSRpcServices must talk to a RegionScanner.


The RegionScanner is the server-side component responsible for scanning the different rows in the region. In order to scan through the rows in the region, the RegionScanner talks to one or more StoreScanner instances (one per column family in the row). If the row passes all filtering criteria, the RegionScanner returns the Cells for that row to RSRpcServices so that they can be used to form a Result to include in the ScanResponse.


The StoreScanner is the server side component responsible for scanning through the Cells in each column family.

When the client (i.e. ResultScanner) receives the ScanResponse back from the server, it can then decide whether or not scanning should continue. Since the client and the server communicate back and forth in chunks of Results, the client-side ResultScanner will cache all the Results it receives from the server. This allows the application to perform scans faster than the case where an RPC is required for each Result that the application sees.

RPC Chunking Why is it necessary?

Currently, the server sends back whole rows to the client (each Result contains all of the cells for that row). The max result size limit is only applied at row boundaries. After each full row is scanned, the size limit will be checked.

The problem with this approach is that it does not provide a granular enough restriction. Consider, for example, the scenario where each row being scanned is 100MB. This means that 100MB worth of data will be read between checks for the size limit. So, even in the case that the client has specified that the size limit is 1MB, 100MB worth of data will be read and then returned to the client.

This approach for respecting the size limit is problematic. First of all, it means that it is possible for the server to run out of memory and crash in the case that it must scan a large row. At the application level, you simply want to perform a scan without having to worry about crashing the RegionServer.

This scenario is also problematic because it means that we do not have fine-grained control over the network resources. The most efficient use of the network is to have the client and server talk back and forth in fixed-sized chunks.

Finally, this approach for respecting the size limit is a problem because it can lead to large, erratic allocations server-side playing havoc with GC.

Goal of the RPC Chunking solution

The goal of the RPC chunking solution was to:

  • Create a workflow that is more ‘regular’, and less likely to cause RegionServer distress
  • Use the network more efficiently
  • Avoid large garbage collections caused by allocating large blocks

Furthermore, we wanted the RPC chunking mechanism to be invisible to the application. The communication between the client and the server should not be a concern of the application. All the application wants is a Result for their scan. How that Result is retrieved is completely up to the protocol used between the client and the server.

Implementation Details

The first step in implementing this proposed RPC chunking method was to move the max result size limit to a place where it could be respected at a more granular level than on row boundaries. Thus, the max result size limit was moved down to the cell level within StoreScanner. This meant that now the max result size limit could be checked at the cell boundaries, and if in excess, the scan can return early.

The next step was to consider what would occur if the size limit was reached in the middle of a row. In this scenario, the Result sent back to the client will be marked as a “partial” Result. By marking the Result with this flag, the server communicates to the client that the Result is not a complete view of the row. Further Scan RPC’s must be made in order to retrieve the outstanding parts of the row before it can be presented to the application. This allows the server to send back partial Results to the client and then the client can handle combining these partials into “whole” row Results. This relieves the server of the burden of having to read the entire row at a time.
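The cell-level size check can be illustrated with a toy simulation in plain Java (no HBase dependency): accumulate cells until the running byte count reaches the limit, cut a chunk there, and note that every chunk except the last would be flagged partial. This is a sketch of the idea, not HBase's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {
    // Split one wide "row" (a list of cell sizes, in bytes) into chunks that
    // each stop once maxResultSize is reached; every chunk but the last
    // corresponds to a "partial" Result in the real protocol.
    static List<long[]> chunk(long[] cellSizes, long maxResultSize) {
        List<long[]> chunks = new ArrayList<>();  // each entry is {startCell, endCell}
        long running = 0;
        int start = 0;
        for (int i = 0; i < cellSizes.length; i++) {
            running += cellSizes[i];
            if (running >= maxResultSize) {       // limit reached mid-row: cut a chunk
                chunks.add(new long[]{start, i + 1});
                start = i + 1;
                running = 0;
            }
        }
        if (start < cellSizes.length) chunks.add(new long[]{start, cellSizes.length});
        return chunks;
    }

    public static void main(String[] args) {
        // A "100MB row" of ten 10MB cells, scanned with a 25MB size limit
        long[] cells = new long[10];
        java.util.Arrays.fill(cells, 10L * 1024 * 1024);
        List<long[]> chunks = chunk(cells, 25L * 1024 * 1024);
        System.out.println(chunks.size() + " chunks"); // prints "4 chunks"
    }
}
```

Instead of one 100MB allocation, the server ships four bounded chunks, which is exactly the "regular" workflow the design goals above describe.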

Finally, these changes were performed in a backwards compatible manner. The client must indicate in its ScanRequest that partial Results are supported. If that flag is not seen server side, the server will not send back partial results. Note that the configuration hbase.client.scanner.max.result.size can be used to change the default chunk size. By default this value is 2MB in HBase 1.1+.

An option (Scan#setAllowPartialResults) was also added so that an application can ask to see partial results as they are returned rather than wait on the aggregation of complete rows.
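On the client side, these pieces surface as a few calls on the Scan and Result objects. Here is a sketch against the HBase 1.1+ client API; it assumes an open Table named table and hbase-client on the classpath, so it is illustrative rather than standalone-runnable:

```java
Scan scan = new Scan();
scan.setMaxResultSize(2L * 1024 * 1024); // chunk size; defaults to 2MB in 1.1+
scan.setAllowPartialResults(true);       // opt in to seeing partial Results

try (ResultScanner scanner = table.getScanner(scan)) {
    for (Result result : scanner) {
        // With setAllowPartialResults(true), a Result may cover only part
        // of a row; isPartial() tells you whether more of the row follows.
        if (result.isPartial()) {
            // accumulate cells until a non-partial Result completes the row
        }
    }
}
```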

A consistent view of the row is maintained even when it is assembled from multiple partial RPCs, because the server-side scanning context keeps track of the outstanding MVCC read point and will not include in results any Cells written later.

Note that this feature will be available starting in HBase 1.1. All 1.1+ clients will chunk their responses.

Jonathan Lawlor, a computer science student at University of Waterloo and HBase contributor, recently completed an internship at Cloudera.

Categories: Hadoop

Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle

Cloudera Blog - Thu, 05/14/2015 - 16:16

Our thanks to Ilya Ganelin, Senior Data Engineer at Capital One Labs, for the guest post below about his hard-earned lessons from using Spark.

I started using Apache Spark in late 2014, learning it at the same time as I learned Scala, so I had to wrap my head around the various complexities of a new language as well as a new computational framework. This process was a great in-depth introduction to the world of Big Data (I previously worked as an electrical engineer for Boeing), and I very quickly found myself deep in the guts of Spark. The hands-on experience paid off; I now feel extremely comfortable with Spark as my go-to tool for a wide variety of data analytics tasks, but my journey here was no cakewalk.

Capital One’s original use case for Spark was to surface product recommendations for a set of 25 million users and 10 million products, one of the largest datasets available for this type of modeling. Moreover, we had the goal of considering all possible products, specifically to capture the “long tail” (AKA less frequently purchased items). That problem is even harder, since to generate all possible pairings of user and products, you’d have 250e12 combinations—which is more data than you can store in memory, or even on disk. Not only were we ingesting more data than Spark could readily handle, but we also had to step away from the standard use case for Spark (batch processing with RDDs) and actually decompose the process of generating recommendations into an iterative operation.

Learning how to properly configure Spark, and use its powerful API in a way that didn’t cause things to break, taught me a lot about its internals. I also learned that at the time, there really wasn’t a consolidated resource that explained how those pieces fit together. The end goal of Spark is to abstract away those internals so the end-user doesn’t have to worry about them, but at the time I was using it (and to some degree today), to write efficient and functional Spark code you need to know what’s going on under the hood. This blog post is intended to reveal just that: to teach the curious reader what’s happening, and to highlight some simple and tangible lessons for writing better Spark programs.

Note: this post is not intended as a ground-zero introduction. Rather, the reader should have some familiarity with Spark’s execution model and the basics of Spark’s API.

The Pieces

First, let’s review the key players in the Spark infrastructure. I won’t touch on all the components, but there are a few basics that I’ll cover. These are partitioning, caching, serialization, and the shuffle operation.


Spark’s basic abstraction is the Resilient Distributed Dataset, or RDD. The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that under the hood, you’re dealing with fragmented data. That fragmentation is what enables Spark to execute in parallel, and the level of fragmentation is a function of the number of partitions of your RDD. The number of partitions is important because a stage in Spark will operate on one partition at a time (and load the data in that partition into memory). Consequently, if you have fewer partitions than active stages, you will wind up under-utilizing your cluster. Furthermore, since with fewer partitions there’s more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, your performance may degrade as you take a greater hit from network and disk I/O. Ultimately this concept ties into Spark’s notion of parallelism and how you can tune it (see the discussion of tuning parallelism here) to optimize performance.
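As a sketch of working with partition counts explicitly (this assumes a live JavaSparkContext sc and a hypothetical input path; the 2-4 tasks per core multiplier is a common rule of thumb, not a hard rule):

```java
// Sketch: requires a live SparkContext; all numbers here are illustrative.
JavaRDD<String> lines = sc.textFile("hdfs:///data/input");  // hypothetical path
int totalCores = 16 * 8;            // e.g. 16 executors x 8 cores each
int target = totalCores * 3;        // 2-4 tasks per core is a common heuristic

System.out.println(lines.partitions().size());  // current partition count
JavaRDD<String> tuned = lines.repartition(target);   // triggers a full shuffle
// coalesce(n) shrinks the partition count without a shuffle:
JavaRDD<String> fewer = tuned.coalesce(target / 2);
```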


Spark is a big deal for two main reasons: first, as already mentioned, it has a really simple and useful API for complex processes. The other big thing is that unlike your standard MapReduce program, Spark lets you cache intermediate results in memory. Caching can dramatically improve performance if you have data structures that are used frequently, such as a lookup table or a matrix of scores in a machine-learning algorithm. Caching can also introduce problems since it will often require huge chunks of memory; tuning this process is its own challenge, but doing so can increase performance by several orders of magnitude.


In distributed computing, you generally want to avoid writing data back and forth because it’s expensive. Instead, the common paradigm is to bring your code to your data. This is why many frameworks are based on the JVM, which lets you execute code on the same machine as the data. Serialization is the process of translating this code into an ideally compressed format for efficient transfer over the network. By default, Spark uses the standard Java serializer. However, you can get much faster and more memory-efficient serialization using Kryo serialization. Switching to Kryo can reduce memory pressure on your cluster and improve stability.
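Enabling Kryo is a configuration change. A minimal sketch follows; the spark.serializer setting and registerKryoClasses are real Spark APIs, while the registered classes are hypothetical stand-ins for your own types:

```java
SparkConf conf = new SparkConf()
    .setAppName("kryo-example")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

// Registering classes up front lets Kryo write a small numeric ID
// instead of the full class name alongside each serialized object.
conf.registerKryoClasses(new Class<?>[]{ScoreMatrix.class, UserRecord.class}); // hypothetical classes
```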


Even though moving data is expensive, sometimes it’s necessary. For example, certain operations need to consolidate data on a single node so that it can be co-located in memory, such as when you want to perform a reduce operation across all values associated with a particular key of a key-value RDD (reduceByKey()). This expensive reorganization of data is known as the shuffle. The shuffle involves serialization as well as Akka, Spark’s internal messaging system, thereby consuming disk and network I/O while increasing memory pressure from garbage collection. Improperly configured Akka settings or serialization settings can cause problems depending on the size of your data. There is an excellent write-up of what happens during a shuffle in this Cloudera Engineering blog post.


Next, let’s delve into how these components all come together when you write a Spark program and, specifically, what lessons I’ve learned about tying all these pieces together.

Lesson 1: Spark Gorges on Memory

As I mentioned previously, part of Spark’s power is its ability to cache things in memory. The downside of this amazing utility is that when you use it, Spark transforms into a total memory hog. First, the JVM and YARN (if you’re using it) consume a significant amount of memory, leaving less than you expect for data movement and caching. Next, there’s metadata that accumulates on the driver as a byproduct of shuffle operations and becomes particularly problematic during long-running jobs (multi-day). Finally, Java or Scala classes may introduce hidden overhead in your RDDs. A 10-character Java string may actually consume as much as 60 bytes! To pour salt in the wound, actually tracking down the source of a problem can be nearly impossible since a Spark program may have logs distributed across the cluster, have hundreds of tasks executing per second, and errors may not always propagate all the way up the stack when exceptions are thrown.
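That 60-byte figure comes from JVM object overhead. A back-of-the-envelope estimate for a pre-Java-9 String (which wraps a char[]) can be computed directly; the exact header sizes vary by JVM and flags, so treat this as an approximation rather than a measurement:

```java
public class StringOverhead {
    // Rough estimate for a 64-bit JVM with compressed oops:
    // String object: 12-byte header + 4-byte array reference + 4-byte hash
    // field, padded to 8; char[]: 16-byte header + 2 bytes per char, padded.
    static long estimateBytes(int chars) {
        long stringObj = pad(12 + 4 + 4);
        long charArray = pad(16 + 2L * chars);
        return stringObj + charArray;
    }

    static long pad(long n) { return (n + 7) / 8 * 8; } // align to 8 bytes

    public static void main(String[] args) {
        // In the neighborhood of the ~60 bytes quoted above for 10 characters
        System.out.println(estimateBytes(10)); // prints 64
    }
}
```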

Thus, the first thing to do is to tame this unruly beast. For starters, it’s critical to partition wisely, to manage memory pressure as well as to ensure complete resource utilization. Next, you must always know your data—size, types, and how it’s distributed. This last bit is important since otherwise you may wind up with skewed distribution of data across partitions. A simple solution for this last problem is to use a custom partitioner. Last, as mentioned above, Kryo serialization is faster and more efficient.

To deal with the issue of accumulating metadata, you have two options. First, you can set the spark.cleaner.ttl parameter to trigger automatic cleanups. However, this will also wipe out any persisted RDDs and I found that it caused problems when trying to subsequently interact with HDFS. The other solution, which I ended up implementing in my case, is to simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up.

Lesson 2: Avoid Data Movement

In general, I found that avoiding shuffles and minimizing data transfers helped me write programs that ran faster and more reliably. Keep in mind: there are occasional cases when having an extra shuffle can help, such as when you have data that can’t be automatically partitioned into many partitions (see “When More Shuffles are Better” here.)

So, how does one avoid shipping data? The obvious answer is to avoid operations that trigger shuffles, like repartition and coalesce, ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Spark also provides two mechanisms in particular that help us here. Broadcast variables are read-only variables that are cached in memory locally on each machine, eliminating the need to ship a copy with every task. Using broadcast variables can also let you do efficient joins between large and small RDDs, or store a lookup table in memory that provides more efficient retrieval than doing an RDD lookup().
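As a sketch of the broadcast-join pattern—plain Python standing in for Spark, with all names invented; in real code the lookup table would be wrapped with sc.broadcast() and read via .value inside the task closure:

```python
# Small lookup table: in Spark this would be broadcast once per executor
# instead of being shipped with every task or shuffled for a join.
country_names = {"us": "United States", "de": "Germany", "jp": "Japan"}

# The "large RDD": (country_code, user) pairs, processed partition by partition.
users = [("us", "alice"), ("jp", "bob"), ("de", "carol"), ("us", "dave")]

def map_side_join(partition, lookup):
    # Each task consults its local broadcast copy; no shuffle is required.
    return [(user, lookup.get(code, "unknown")) for code, user in partition]

joined = map_side_join(users, country_names)
```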

Accumulators are a way to efficiently update a variable in parallel during execution. Accumulators differ from broadcast variables in that they may only be read by the driver process, but they allow Spark programs to efficiently aggregate results such as counters, sums, or generated lists. An important note about accumulators is that they’re not limited to basic types; you can accumulate any Accumulable class.
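A toy model of the accumulator contract (pure Python, names invented): tasks only add deltas, and only the driver reads the merged value—which mirrors the semantics sc.accumulator enforces:

```python
class Accumulator:
    # Minimal model of Spark's accumulator contract: tasks call add(),
    # and only the driver reads .value after the job completes.
    def __init__(self, zero, merge):
        self.value = zero
        self._merge = merge

    def add(self, delta):
        self.value = self._merge(self.value, delta)

bad_records = Accumulator(0, lambda a, b: a + b)

def parse(line, acc):
    try:
        return int(line)
    except ValueError:
        acc.add(1)  # count malformed input without polluting the data path
        return None

parsed = [parse(line, bad_records) for line in ["1", "oops", "3", "??"]]
```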

Lesson 3: Avoiding Data Movement is Hard

The challenge of using the above mechanisms is that, for example, to broadcast an RDD you need to first collect() it on the driver node. To accumulate the results of distributed execution you need to serialize that data back to the driver and aggregate it there. The upshot of this is that all of a sudden you’re increasing the memory pressure on the driver. Between collected RDDs, the persistent metadata discussed previously, and Accumulators, you may quickly run out of memory on the driver. You can, of course, increase the amount of memory allocated by setting spark.driver.memory, but that only works to a degree.

Above, I mentioned a few things: how a smaller number of partitions increases the size footprint of each partition, and how Spark uses Akka for messaging. If your 2GB RDD is only partitioned into 20 partitions, to serialize the data from one partition to another node, you’ll need to ship a 100MB chunk of data from each partition via Akka. But, by default, Akka’s buffer is only 10 MB! One can get around this by setting akka.frameSize, but this is another example of needing to fundamentally understand your data and how all these moving pieces come together.
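The arithmetic behind that example is worth spelling out (the 2GB/20-partition numbers are the hypothetical ones from the text; spark.akka.frameSize is the Spark 1.x setting that controls Akka’s frame size):

```python
# Why a 2GB RDD in 20 partitions trips Akka's default frame size.
rdd_bytes = 2 * 1024**3          # 2GB RDD
num_partitions = 20
partition_mb = rdd_bytes / num_partitions / 1024**2  # ~102.4MB per partition

default_frame_mb = 10            # Akka's default frame size
# Each partition is roughly 10x the frame size, so serialization fails
# unless spark.akka.frameSize is raised or the RDD is repartitioned.
needs_tuning = partition_mb > default_frame_mb
```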

Lesson 4: Speed!

The first three lessons deal primarily with stability. Next, I want to briefly describe how I got dramatic performance gains.

The first is fairly obvious: liberal use of caching. Of course, you don’t have infinite memory, so you must cache wisely, but in general, if you use some data twice, cache it. Unused RDDs that go out of scope will eventually be un-persisted automatically, but they may be released explicitly with unpersist().

Second, I found broadcast variables extremely useful. I use them regularly for large maps and lookup tables. They have a significant advantage over an RDD that is cached in memory, since a lookup in an RDD, even one cached in memory, will still be O(m), where m is the length of data in a single partition. In contrast, a broadcast hash map will have a lookup of O(1).
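A quick pure-Python illustration of that complexity gap (names invented; the list scan mimics what an RDD lookup must do within a partition, while the dict plays the role of the broadcast hash map):

```python
# 100,000 key-value pairs, once as a "partition" and once as a hash map.
records = [("key-%d" % i, i) for i in range(100000)]
as_map = dict(records)

def rdd_style_lookup(partition, key):
    # O(m): walk the whole partition looking for matching keys.
    return [v for k, v in partition if k == key]

scan_result = rdd_style_lookup(records, "key-99999")  # touches all 100k pairs
map_result = as_map["key-99999"]                      # O(1): one hash probe
```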

Finally, there are times when I had to help Spark parallelize its execution. In my case, I needed to perform a per-key operation between two RDDs—that is, select a key-value pair from the first and perform some computation with that data against the second RDD. In a naive implementation, Spark would process these keys one at a time, so I was of course getting terrible resource utilization on my cluster. I’d have only a few tasks running at a time and most of my executors weren’t used. Because I wasn’t using the RDD API to run this code, I couldn’t benefit from Spark’s built-in parallelism. The ultimate solution was simple: create a thread pool on the driver that would process multiple keys at the same time. Each of the threads would generate tasks that were submitted to YARN and now I could easily ramp up my CPU utilization.
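The driver-side thread pool can be sketched with concurrent.futures (process_key is a hypothetical stand-in; in the real job each call submitted Spark work against the second RDD rather than computing locally):

```python
from concurrent.futures import ThreadPoolExecutor

def process_key(key):
    # Stand-in for the per-key computation between the two RDDs.
    return key, sum(range(key))

keys = [10, 20, 30, 40]

# Keeping several per-key jobs in flight at once gives the cluster
# scheduler a steady stream of tasks instead of one key at a time.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = dict(pool.map(process_key, keys))
```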

The final lesson here is that Spark can only help you if you help it do so.


Ideally, the above concept breakdown and lessons (coupled with the provided documentation links) help clarify some of Spark’s underlying complexity. I’ve made liberal citations to the sources that helped me most when it came to understanding all these various mechanisms, as well as those that guided me through learning how to configure and use Spark’s more advanced features. I simply hope you find them as useful as I have!

Ilya is a roboticist turned data scientist. After a few years at the University of Michigan building self-discovering robots and another few years working on core DSP software with cell phones and radios at Boeing, he landed in the world of big data at Capital One Labs. He’s an active contributor to the core components of Spark with the goal of learning what it takes to build a next-generation distributed computing platform.

Categories: Hadoop

New in CDH 5.4: Hot-Swapping of HDFS DataNode Drives

Cloudera Blog - Mon, 05/11/2015 - 16:18

This new feature gives Hadoop admins the commonplace ability to replace failed DataNode drives without unscheduled downtime.

Hot swapping—the process of replacing system components without shutting down the system—is a common and important operation in modern, production-ready systems. Because disk failures are common in data centers, the ability to hot-swap hard drives is a supported feature in hardware and server operating systems such as Linux and Windows Server, and sysadmins routinely upgrade servers or replace faulty components without interrupting business-critical services.

Even so, historically, decommissioning an individual drive on an HDFS DataNode has not been possible in Apache Hadoop. Instead, to replace a bad drive or upgrade it with a larger one, the user had to decommission the entire DataNode. That process could take hours to complete, significantly affecting system availability and operational effectiveness.

Happily, in Apache Hadoop 2.6, HDFS has added support for hot-swapping disks on a DataNode; this feature was developed by Cloudera committers under the umbrella JIRA HDFS-1362. And starting in release 5.4.0, DataNode hot-swapping is supported in Cloudera Enterprise.

To ensure that the disk changes are persistent between DataNode restarts, the hot-swap drive feature is performed using the Reconfigurable framework introduced in HADOOP-7001. In summary, the reconfigurable framework enables HDFS daemons to change their configuration without a restart. So adding and/or removing disks on a DataNode involves two steps:

  1. Modifying the dfs.datanode.data.dir property in the DataNode configuration to reflect the disk changes
  2. Asking the DataNode to reload its configuration and apply the configuration changes

It’s worth mentioning that when adding new disks, if the data directory does not exist, the DataNode must have write access to create the directory. Moreover, because we anticipate that the most common use case for the hot-swap disk feature is to replace bad disks, the hot-swapping procedure does not attempt to move the blocks to other DataNodes, as is done when decommissioning a node. That being said, it is advisable to remove no more than N-1 disks simultaneously, where N is the system-wide replication factor. We also recommend running “hdfs fsck” after swapping a disk out.

In the future, adding a -safe flag to the reconfig command will advise the DataNode to relocate the blocks before swapping out the volume.

How to Hot Swap

Assume that you have a DataNode (dn1.example.com), which currently manages two data volumes (/dfs/dn1,/dfs/dn2). In the event of a disk failure on the disk where /dfs/dn2 is located, the sysadmin will want to replace this bad drive with a new drive. Moreover, this new drive will be mounted on the same directory /dfs/dn2:

From a user’s perspective, there are two ways to achieve this task:

From the Command Line
  1. Remove /dfs/dn2 from the DataNode configuration file (i.e., hdfs-site.xml).
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/dfs/dn1</value>
    </property>
  2. Ask the DataNode to reload its configurations and wait for this reconfiguration task to finish.
    $ hdfs dfsadmin -reconfig datanode dn1.example.com:50020 start
    # query the status of the reconfiguration task
    $ hdfs dfsadmin -reconfig datanode dn1.example.com:50020 status
    # Once the first line of the output reads as below, the reconfiguration task is complete.
    Reconfiguring status for DataNode[dn1.example.com:50020]: started at Tue Feb 10 15:09:52 PST 2015 and finished at Tue Feb 10 15:09:53 PST 2015.
    …
    SUCCESS: Change property dfs.datanode.data.dir
         From: "file:///dfs/dn1,file:///dfs/dn2"
         To: "file:///dfs/dn1"
    ...
  3. Unmount the disk from /dfs/dn2. Mount the new disk on /dfs/dn2.
  4. Add /dfs/dn2 back to the DataNode configuration file (hdfs-site.xml).
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/dfs/dn1,/dfs/dn2</value>
    </property>
  5. Run Step 2 again.
From Cloudera Manager

To make this process even easier, the DataNode Hot Swap Disk feature is supported in Cloudera Manager as of 5.4.0. To use it, follow these steps:

  1. Navigate to the Configuration page for the DataNode role in question.
  2. Edit the DataNode Data Directory configuration property: remove /dfs/dn2. Click “Save Changes.” Warning: Be sure to modify this property only for the DataNode role whose disk you are swapping. Do not modify the role group value for this property, as that will affect all DataNode roles in the group.

  3. Select “Refresh Data Directories” from the Actions menu in the upper right corner of the page (which should still be the configuration page for the DataNode role you are modifying).

  4. Confirm that the window that pops up refers to the correct DataNode role, and click Refresh Data Directories. This will run the reconfiguration task.

  5. Unmount the disk /dfs/dn2. Mount the new disk to /dfs/dn2.
  6. Reset the “DataNode Data Directory” property to add back the disk. Click “Save Changes,” as illustrated in Step 2.
  7. Repeat Steps 3 and 4.

To learn more details about hot swapping disks for DataNode, please see the official docs.


The DataNode Hot Swap Disk feature gives users a fine-grained tool for managing the health of their clusters. Moreover, Cloudera Manager offers the most convenient approach to monitor disk health and manage disk volumes as needed.

Lei Xu is a Software Engineer at Cloudera.

Categories: Hadoop

Apache Phoenix Joins Cloudera Labs

Cloudera Blog - Wed, 05/06/2015 - 20:23

We are happy to announce the inclusion of Apache Phoenix in Cloudera Labs.

Apache Phoenix is an efficient SQL skin for Apache HBase that has created a lot of buzz. Many companies are successfully using this technology, including Salesforce.com, where Phoenix first started.

With the news that Apache Phoenix integration with Cloudera’s platform has joined Cloudera Labs, let’s take a closer look at a few key questions surrounding Phoenix: What does it do? Why does anyone want to use it? How does it compare to existing solutions? Do the benefits justify replacing existing systems and infrastructure?

In this post, we’ll try to answer those questions by briefly introducing Phoenix and then discussing some of its unique features. I’ll also cover some use cases and compare Phoenix to existing solutions.

What is Apache Phoenix?

Phoenix adds SQL to HBase, the distributed, scalable, big data store built on Hadoop. Phoenix aims to ease HBase access by supporting SQL syntax and allowing inputs and outputs using standard JDBC APIs instead of HBase’s Java client APIs. It lets you perform all CRUD and DDL operations such as creating tables, inserting data, and querying data. SQL and JDBC reduce the amount of code users need to write, allow for performance optimizations that are transparent to the user, and open the door to leveraging and integrating lots of existing tooling.

Internally, Phoenix takes your SQL query, compiles it into a series of native HBase API calls, and pushes as much work as possible onto the cluster for parallel execution. It automatically creates a metadata repository that provides typed access to data stored in HBase tables. Phoenix’s direct use of the HBase API, along with coprocessors and custom filters, results in performance on the order of milliseconds for small queries, or seconds for tens of millions of rows.

Use Cases

Phoenix is good for fast HBase lookups. Its secondary indexing feature supports many SQL constructs and makes lookups via non-primary-key fields more efficient than full table scans. It simplifies the creation and management of typed row-centric data by providing composite row keys and by enforcing constraints on data when written using the Phoenix interfaces.

Phoenix provides a way to transparently salt the row key, which helps in avoiding the RegionServer hotspotting often caused by monotonically increasing rowkeys. It also provides multi-tenancy via a combination of multi-tenant tables and tenant-specific connections. With tenant-specific connections, tenants can only access data that belongs to them, and with multi-tenant tables, they can only see their own data in those tables and all data in regular tables.
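The salting idea can be sketched in a few lines (illustrative Python, not Phoenix’s internal implementation; Phoenix does the equivalent transparently when a table is declared with SALT_BUCKETS, and the byte-sum “hash” here is just a stand-in):

```python
SALT_BUCKETS = 4

def salted_key(rowkey):
    # Prepend hash(rowkey) % buckets so monotonically increasing row keys
    # fan out across regions instead of hammering a single RegionServer.
    salt = sum(rowkey.encode("utf-8")) % SALT_BUCKETS  # stand-in hash
    return "%d|%s" % (salt, rowkey)

keys = ["2015-05-06|0001", "2015-05-06|0002",
        "2015-05-06|0003", "2015-05-06|0004"]
salted = [salted_key(k) for k in keys]
```

A scan over a logical key range then has to be issued once per salt bucket, since each bucket holds its own contiguous slice of the salted keyspace.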

Regardless of these helpful features, Phoenix is not a drop-in RDBMS replacement. There are some limitations:

  • Phoenix doesn’t support cross-row transactions yet.
  • Its query optimizer and join mechanisms are less sophisticated than most COTS DBMSs.
  • As secondary indexes are implemented using a separate index table, they can get out of sync with the primary table (although perhaps only for very short periods). These indexes are therefore not fully ACID-compliant.
  • Multi-tenancy is constrained—internally, Phoenix uses a single HBase table.
Comparisons to Hive and Impala

The other well-known SQL alternatives to Phoenix on top of HBase are Apache Hive and Impala. There is significant overlap in the functionality provided by these products. For example, all of them follow SQL-like syntax and provide a JDBC driver.

Unlike Impala and Hive, however, Phoenix is intended to operate exclusively on HBase data; its design and implementation are heavily customized to leverage HBase features including coprocessors and skip scans.

Some other considerations include:

  • The main goal of Phoenix is to provide a high-performance relational database layer over HBase for low-latency applications. Impala’s primary focus is to enable interactive exploration of large data sets by providing high-performance, low-latency SQL queries on data stored in popular Hadoop file formats. Hive is mainly concerned with providing data warehouse infrastructure, especially for long-running batch-oriented tasks.
  • Phoenix is a good choice, for example, in CRUD applications where you need the scalability of HBase along with the facility of SQL access. In contrast, Impala is a better option for strictly analytic workloads and Hive is well suited for batch-oriented tasks like ETL.
  • Phoenix is comparatively lightweight since it doesn’t need an additional server.
  • Phoenix supports advanced functionality like multiple secondary-index implementations optimized for different workloads, flashback queries, and so on. Neither Impala nor Hive has any provision for supporting secondary index lookups yet.

The following table summarizes what we’ve discussed so far:

Future Work

The Phoenix project is investigating integration with transaction managers such as Tephra (from Cask). It is also trying to incorporate query optimizations based on the size and cardinality of the data. Although Phoenix supports tracing now, more work is needed to round out its monitoring and management capabilities.


Phoenix offers some unique functionality for a certain set of HBase use cases. You can begin playing with Phoenix by following the installation instructions (you will need HBase 1.0, which ships as part of CDH 5.4), and you can view the source code here.

As with everything else in Cloudera Labs, Phoenix integration is not supported yet and it’s for experimentation only. If you have any feedback or comments, let us know via the Cloudera Labs discussion forum.

Srikanth Srungarapu is a Software Engineer at Cloudera, and an HBase committer.

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Use Cases Track

Cloudera Blog - Fri, 05/01/2015 - 16:22

This year’s HBaseCon Use Cases track includes war stories about some of the world’s best examples of running Apache HBase in production.

As a final sneak preview leading up to the show next week, in this post I’ll give you a window into the HBaseCon 2015 (May 7 in San Francisco) Use Cases track.

Thanks, Program Committee!

  • “HBase @ Flipboard”

    Sang Chi, Jason Culverhouse, Matt Blair (Flipboard)

    Flipboard serves over 100 million users using heterogeneous results including user-generated content, interest profiles, algorithmically generated content, the social firehose, the friends graph, ads, and web/RSS crawlers. To personalize and serve these results in real time, Flipboard employs a variety of data models, access patterns, and configurations. This talk will present how some of these strategies are implemented using HBase.

  • “Graph Processing of Stock Market Order Flow in HBase on AWS”

    Aaron Carreras (FINRA)

    In this session, we will briefly cover the FINRA use case and then dive into our approach with a particular focus on how we leverage HBase on AWS. Among the topics covered will be our use of HBase Bulk Loading and ExportSnapShots for backup. We will also cover some lessons learned and experiences of running a persistent HBase cluster on AWS.

  • “Running ML Infrastructure on HBase” 

    Andrey Gusev (Sift Science)

    Sift Science uses online, large-scale machine learning to detect fraud for thousands of sites and hundreds of millions of users in real-time. This talk describes how we leverage HBase to power an ML infrastructure including how we train and build models, store and update model parameters online, and provide real-time predictions. The central pieces of the machine learning infrastructure and the tradeoffs we made to maximize performance will also be covered.

  • “Industrial Internet Case Study using HBase and TSDB”

    Shyam Nath, Arnab Guin (GE)

    This case study involves analysis of high-volume, continuous time-series aviation data from jet engines that consist of temperature, pressure, vibration and related parameters from the on-board sensors, joined with well-characterized slowly changing engine asset configuration data and other enterprise data for continuous engine diagnostics and analytics. This data is ingested via distributed fabric comprising transient containers, message queues and a columnar, compressed storage leveraging OpenTSDB over Apache HBase.

  • “S2Graph: A Large-scale Graph Database with HBase”

    Doyung Yoon, Taejin Chin (DaumKakao)

    As the operator of the dominant messenger application in South Korea, KakaoTalk has more than 170 million users, and our ever-growing graph has more than 10B edges and 200M vertices. This scale presents several technical challenges for storing and querying the graph data, but we have resolved them by creating a new distributed graph database with HBase. Here you’ll learn the methodology and architecture we used to solve the problems, compare it to another famous graph database, Titan, and explore the HBase issues we encountered.

  • “HBase @ CyberAgent”

    Toshihiro Suzuki, Hirotaka Kakishima (CyberAgent)

    CyberAgent is a leading Internet company in Japan focused on smartphone social communities and a game platform known as Ameba, which has 40M users. In this presentation, we will introduce how we use HBase for storing social graph data and as a basis for ad systems, user monitoring, log analysis, and recommendation systems.

  • “Blackbird Collections: In-situ Stream Processing in HBase”

    Ishan Chhabra, Nitin Aggarwal, Venkata Deepankar Duvvuru (Rocket Fuel)

    Blackbird is a large-scale object store built at Rocket Fuel, which stores 100+ TB of data and provides real-time access to 10 billion+ objects in 2-3 milliseconds at a rate of 1 million+ times per second. In this talk (an update from HBaseCon 2014), we will describe Blackbird’s comprehensive collections API and various examples of how it can be used to model collections like sets and maps, and aggregates on these collections like counters, etc. We will also illustrate the flexibility and power of the API by modeling custom collection types that are unique to the Rocket Fuel context.

  • “NRT Event Processing with Guaranteed Delivery of HTTP Callbacks”

    Alan Steckley (Salesforce.com), Poorna Chandra (Cask Data)

    At Salesforce, we are building a new service, code-named Webhooks, that enables our customers’ own systems to respond in near real-time to system events and customer behavioral actions from the Salesforce Marketing Cloud. The application should process millions of events per day to address the current needs and scale up to billions of events per day for future needs, so horizontal scalability is a primary concern. In this talk, we will discuss how Webhooks is built using HBase for data storage and Cask Data Application Platform (CDAP), an open source framework for building applications on Hadoop.

That concludes our sneak previews series. Better register really soon!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

How-to: Install Cloudera Navigator Encrypt 3.7.0 on SUSE 11 SP2 and SP3

Cloudera Blog - Thu, 04/30/2015 - 15:50

Installing Cloudera Navigator Encrypt on SUSE is a one-off process, but we have you covered with this how-to.

Cloudera Navigator Encrypt, which is integrated with Cloudera Navigator governance software, provides massively scalable, high-performance encryption for critical Apache Hadoop data. It leverages industry-standard AES-256 encryption and provides a transparent layer between the application and filesystem. Navigator Encrypt also includes process-based access controls, allowing authorized Hadoop processes to access encrypted data, while simultaneously preventing admins or super-users like root from accessing data that they don’t need to see.

Navigator Encrypt is distributed in two different packages: the kernel module, and the binaries (cli commands) and configuration files. Current supported distributions are debian-7-x64, rhel-5-x64, rhel-6-x64, sles-11-x64, ubuntu-12.04-x64, and ubuntu-14.04-x64. As SUSE has a specific way to build and distribute RPMs for any external kernel module, this post explains how to install Navigator Encrypt 3.7.0 specifically on SLES 11 SP2 and SP3.

Understanding KMPs

For nearly all platforms, the traditional way to install Navigator Encrypt and its kernel module is to issue:

zypper | apt-get | yum install navencrypt

or any package manager equivalent. In these cases, the Navigator Encrypt kernel module uses dkms to build the kernel module at installation time.

This strategy doesn’t work with SUSE, however, which doesn’t support dkms and which handles external kernel modules in a unique manner. Because the process to build the kernel module manually is tedious, the easiest way to install the kernel module is by distributing it already built.

Fortunately, SUSE provides a build tool (openSUSE Build Service, or OBS) that creates RPM packages containing the pre-built kernel module; this tool is free and can be found at build.opensuse.org. A SUSE package created with this tool is called a kernel module package (KMP). (To learn more about how to build KMPs, see the openSUSE build service user guide.)

For KMP names, SUSE recommends using a naming convention based on the company name and a short package name (example: cloudera-zncryptfs-kmp-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm). To clarify which packages belong to SP2 and SP3, Cloudera has renamed the KMP by adding “SPx” to the package name, as in: cloudera-zncryptfs-kmp-SP2-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm.

KMPs are designed to maintain compatibility among all kernel versions for a specific SUSE version (SP2, SP3, etc.). SUSE assures us that if there is a kernel upgrade the kABI symbols will not change; those symbols will have the same symbol version (checksum) for all the kernels supported for a specific SUSE Version (SP2, for example). Thus the same installed kernel module will work after the upgrade without the need to rebuild or upgrade it.

A Navigator Encrypt kernel module only needs to be re-installed when there is an upgrade from SP2 to SP3. The reason for that re-install is that the SP2 cloudera-zncryptfs kernel module is incompatible with SP3—instead, you would need to install the SP3 cloudera-zncryptfs KMP.

Navigator Encrypt for SUSE doesn’t have an implicit dependency on the zncrypt-kernel-module anymore, so it has to be installed independently and based on the kernel where it is going to run. Cloudera packages are named to make it easy to know which version to use; for example, the package cloudera-zncryptfs-kmp-SP2-default-3.4.2_3.0.13_0.27-15.1.x86_64.rpm corresponds to SP2 and cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-10.2.x86_64.rpm corresponds to SP3.

SUSE also maintains a list of supported kernels and their versions that you can use to verify compatibility between the KMP version and your installed kernel. Just select the service pack that interests you and then click on the “Kernel:” drop-down list to see all the kernels. (Note: the latest SP3 kernel update used slightly different numbers for i586 vs. x86_64 architectures. The update released for i586 was 3.0.101-0.42.1, and the update released for x86_64 was 3.0.101-0.46.1. This is why you see both in the list. In any case, for an SP3 x86_64 kernel, you will never see a 3.0.101-0.42.1 version. Navigator Encrypt only supports x86_64.)

Cloudera is a SUSE partner and thus cloudera-zncryptfs is part of the Solid Driver Program, ensuring kernel driver compatibility. To check if the SUSE kernel is tainted, look at the variable:

# cat /proc/sys/kernel/tainted
0

where 0 means the kernel is not tainted. However, because cloudera-zncryptfs is a supported kernel module, a tainted kernel is tagged with a specific value:

# cat /proc/sys/kernel/tainted
2847453646

(To learn more about tainted kernels, see here.)

When looking at the kernel module info, you will also see that it has a tag identifying the support as external to SUSE (“supported:  external”).

# modinfo zncryptfs.ko
filename:       zncryptfs.ko
license:        GPL v2
description:    zncryptfs (v3.4.2-0)
author:         Sergio Pena
srcversion:     6C5F956ACF58A518FF5DAC9
depends:
supported:      external
vermagic:       3.0.101-0.46-default SMP mod_unload modversions
parm:           zncryptfs_stats_enabled:Enable data statistics (int)
parm:           zncryptfs_acl_cache_size:Set ACL cache size (int)
parm:           zncryptfs_acl_verbose:Display more information on an access denied (int)
parm:           zncryptfs_debug:Display debug information (int)

Currently, Navigator Encrypt supports SUSE 11 SP2 and SP3. (SLES 11 SP1 is not supported, nor is SLES 12.) The Cloudera stable repo for SUSE/OpenSUSE can be found here.

Here is a list of cloudera-zncryptfs KMPs built for SP2:


And here is a list of cloudera-zncryptfs KMPs built for SP3:


What flavor is your kernel? It mostly depends on your hardware. Learn more about kernel flavors here.

Installation Process

Identify the cloudera-zncryptfs KMPs that you will install for SP2 or SP3. You also need to identify the flavor.

Add the cloudera archive as specified in the Navigator Encrypt user guide:

# vi /etc/zypp/repos.d/cloudera.repo
[cloudera_stable]
name=SLES $releasever - cloudera.com
baseurl=https://USER:PASSWORD@archive.gazzang.com/sles/stable/11
enabled=1
gpgcheck=1
gpgkey=http://archive.gazzang.com/gpg_gazzang.asc

# sudo rpm --import http://archive.gazzang.com/gpg_gazzang.asc

In this example, we are installing Navigator Encrypt for SLES 11 SP3. Let’s install its corresponding KMP:

# zypper install https://archive.gazzang.com/sles/stable/11/Packages/cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13.x86_64.rpm
Refreshing service 'nu_novell_com'.
Loading repository data...
Reading installed packages...
Resolving package dependencies...

The following package is going to be upgraded:
  cloudera-zncryptfs-kmp-default
The following package is not supported by its vendor:
  cloudera-zncryptfs-kmp-SP3-default
1 package to upgrade.
Overall download size: 265.0 KiB. No additional space will be used or freed after the operation.
Continue? [y/n/? shows all options] (y):
Retrieving package cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13.x86_64 (1/1), 265.0 KiB (1.5 MiB unpacked)
Installing: cloudera-zncryptfs-kmp-SP3-default-3.4.2_3.0.76_0.11-14.13 [done]

(Note: The previous command-line example is for the build revision number 14.13. That number might change, so please check the stable repo for the newest build.)

zncryptfs.ko is installed at /lib/modules/3.0.76-0.11-default/updates/zncryptfs.ko and it becomes a weak-update of your current kernel module.

The current kernel in this example is:

# uname -r
3.0.101-0.46-default

The weak-updates directory now contains a symlink to the installed kernel module:

# ll /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko
lrwxrwxrwx 1 root root 53 Feb 17 09:17 /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko -> /lib/modules/3.0.76-0.11-default/updates/zncryptfs.ko

This is a good moment to check for the modinfo of our installed kernel module:

# modinfo /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko
filename:       /lib/modules/3.0.101-0.46-default/weak-updates/updates/zncryptfs.ko
license:        GPL v2
description:    zncryptfs (v3.4.2-0)
author:         Sergio Pena
srcversion:     6C5F956ACF58A518FF5DAC9
depends:
supported:      external
vermagic:       3.0.76-0.11-default SMP mod_unload modversions
parm:           zncryptfs_stats_enabled:Enable data statistics (int)
parm:           zncryptfs_acl_cache_size:Set ACL cache size (int)
parm:           zncryptfs_acl_verbose:Display more information on an access denied (int)
parm:           zncryptfs_debug:Display debug information (int)

Our module is now ready to work on our installed kernel. Next, we can install the Navigator Encrypt binaries:

#zypper install navencrypt -y

You can register Navigator Encrypt against a Key Trustee server:

# navencrypt register -s keytrustee.server.net -o ORGANIZATION -a AUTH_CODE
Choose MASTER key type:
 1) Passphrase (single)
 2) Passphrase (dual)
 3) RSA private key
Select: 1
Type MASTER passphrase:
Verify MASTER passphrase:
Registering navencrypt v3.6.0 (r44)...
 * Registering keytrustee GPG keys on 'keytrustee.server.net'...
 * Uploading MASTER key...
 * Uploading CONTROL content...
 * Creating control file...
Navencrypt is now registered.

SUSE has a flag that allows external kernel modules to load. Set this flag to 1 as specified in the user guide:

# vi /etc/modprobe.d/unsupported-modules
#
# Every kernel module has a flag 'supported'. If this flag is not set loading
# this module will taint your kernel. You will not get much help with a kernel
# problem if your kernel is marked as tainted. In this case you firstly have
# to avoid loading of unsupported modules.
#
# Setting allow_unsupported_modules 1 enables loading of unsupported modules
# by modprobe, setting allow_unsupported_modules 0 disables it. This can
# be overridden using the --allow-unsupported-modules command line switch.
allow_unsupported_modules 1

If you don’t do that, you won’t be able to prepare any mount point, and a message like the following will appear:

# navencrypt-prepare /mnt/t1 /mnt/t1
The Navigator Encrypt kernel module is marked as unsupported on this kernel.
Please set allow_unsupported_modules to 1 in /etc/modprobe.d/unsupported-modules

Then, prepare a first mount point:

# navencrypt-prepare /mnt/t1 /mnt/t1
Type MASTER passphrase:
Encryption Type: eCryptfs
Cipher: aes
Key Size: 256
Random Interface: OpenSSL
Filesystem: ext3
Options:
Verifying MASTER key against KeyTrustee (wait a moment) ... OK
Generation Encryption Keys with OpenSSL ... OK
Registering Encryption Keys (wait a moment) ... OK
Mounting /mnt/t1 ... OK

Verify that it is actually mounted:

# mount
…
(rw,ecryptfs_sig=b3fc17166dc9f34c,ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_cipher=aes,ecryptfs_key_bytes=32,ecryptfs_enable_filename_crypto=n)
suse:~ #

Finally, run a quick encryption test, adding a universal ACL rule along the way:

# mkdir encrypted
# vi encrypted/file
# navencrypt-move encrypt @test encrypted/file /mnt/t1
Type MASTER passphrase:
Size to encrypt: 4 KB
Moving from: '/root/encrypted/file'
Moving to:   '/mnt/t1/test/root/encrypted/file'
100% [=======================================================>] [ 18 B] Done.
# cat encrypted/file
cat: encrypted/file: Permission denied
# navencrypt acl --add --rule="ALLOW @* * *"
Type MASTER passphrase:
1 rule(s) were added
# cat encrypted/file
encrypted content
# navencrypt-move decrypt encrypted/file
Type MASTER passphrase:
Size to decrypt: 12 KB
Moving from: '/mnt/t1/test/root/encrypted/file'
Moving to:   '/root/encrypted/file'
100% [=======================================================>] [ 18 B] Done.
# /etc/init.d/navencrypt-mount restart
Stopping navencrypt directories
 * Umounting /mnt/t1 ... done
 * Unloading module ... done
Starting navencrypt directories
 * Mounting /mnt/t1 ... done
suse:~ #

Congratulations, you have just installed Navigator Encrypt on SLES 11 SP3!

Alex Gonzalez is a Software Engineer at Cloudera.

Categories: Hadoop

How-to: Translate from MapReduce to Apache Spark (Part 2)

Cloudera Blog - Tue, 04/28/2015 - 16:39

The conclusion to this series covers Combiner-like aggregation functionality, counters, partitioning, and serialization.

Apache Spark is rising in popularity as an alternative to MapReduce, in large part due to its expressive API for complex data processing. A few months ago, my colleague Sean Owen wrote a post describing how to translate functionality from MapReduce into Spark, and in this post, I’ll extend that conversation to cover additional functionality.

To briefly reiterate, MapReduce was originally designed for batch Extract Transform Load (ETL) operations and massive log processing. MapReduce relies on processing key-value pairs in map and reduce phases. Each phase has the following actions:

  1. Map: Emits 0, 1, or more key-value pairs as output for every input.
  2. Shuffle: Groups key-value pairs with the same keys by shuffling data across the cluster’s network.
  3. Reduce: Operates on an iterable of values associated with each key, often performing some kind of aggregation.
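The three phases can be mimicked with plain Scala collections. The toy word count below is our own sketch (no Hadoop involved), with groupBy standing in for the network shuffle:

```scala
// Toy word count simulating MapReduce's three phases with plain Scala
// collections (the data and names here are illustrative, not from the post).
val lines = Seq("to be or", "not to be")

// Map: emit a (word, 1) pair for every word in every line
val mapped: Seq[(String, Int)] =
  lines.flatMap(_.split(" ").map(word => (word, 1)))

// Shuffle: group pairs by key (on a cluster this moves data over the network)
val shuffled: Map[String, Seq[(String, Int)]] = mapped.groupBy(_._1)

// Reduce: sum the counts associated with each key
val counts: Map[String, Int] =
  shuffled.map { case (word, pairs) => (word, pairs.map(_._2).sum) }
```

Here `counts` ends up as a map of each word to its frequency, the same result the distributed pipeline would compute.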

To perform complex operations, many Map and Reduce phases must be strung together. As MapReduce became more popular, its limitations with respect to complex and iterative operations became clear.

Spark provides a processing API based around Resilient Distributed Datasets (RDDs). You can create an RDD by reading in a file and then specifying the sequence of operations you want to perform on it, like parsing records, grouping by a key, and averaging an associated value. Spark allows you to specify two different types of operations on RDDs: transformations and actions. Transformations describe how to transform one data collection into another. Examples of transformations include map, flatMap, and groupByKey. Actions require that the computation be performed, like writing output to a file or printing a variable to screen.

Spark uses a lazy computation model, which means that computation does not get triggered until an action is called. Calling an action on an RDD triggers all necessary transformations to be performed. This lazy evaluation allows Spark to smartly combine operations and optimize performance.
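Scala’s own lazy views give a rough feel for this model (an analogy only, not Spark itself): transformations just build up a description of the work, and nothing runs until a terminal operation forces it:

```scala
// A rough analogy for Spark's lazy evaluation using a Scala lazy view.
var evaluations = 0

// "Transformation": builds a recipe; no element is computed yet
val recipe = (1 to 5).view.map { n => evaluations += 1; n * n }

assert(evaluations == 0)  // nothing has run so far

// "Action": forcing the view triggers the whole pipeline at once
val result = recipe.sum   // 1 + 4 + 9 + 16 + 25
```

The counter stays at zero until `sum` is called, at which point every element of the pipeline is evaluated exactly once.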

As an aid to the successful production deployment of a Spark cluster, in the rest of the blog post, we’ll explore how to reproduce functionality with which you may already be familiar from MapReduce in Spark. Specifically, we will cover combiner-like aggregation functionality, partitioning data, counter-like functionality, and the pluggable serialization frameworks involved.

reduceByKey vs Combiner

This simple Mapper featured in Sean’s blog post:

public class LineLengthMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}

…is part of a job that counts lines of text by their length. It’s simple, but inefficient: The Mapper writes a length and count of 1 for every line, which is then written to disk and shuffled across the network, just to be added up on the Reducer. If there are a million empty lines, then a million records representing “length 0, count 1” will be copied across the network, just to be collapsed into “length 0, count 1000000” by a Reducer like the one also presented last time:

public class LineLengthReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}

For this reason, MapReduce has the notion of a Combiner. A Combiner is an optimization that works like a Reducer—in fact, it must be an implementation of Reducer—that can combine multiple records on the Mapper side, before those records are written anywhere. It functions like a miniature Reducer preceding the shuffle. A Combiner must be commutative and associative, which means that its result must be the same no matter the order in which it combines records. In fact, LineLengthReducer itself could be applied as the Combiner in this MapReduce job, as well as being the Reducer.

Back to Spark. A terse and literal—but certainly not optimal—translation of LineLengthMapper and LineLengthReducer is:

linesRDD.mapPartitions { lines =>
  lines.map(line => (line.length, 1))
}.groupByKey().mapValues(_.sum)

The Mapper corresponds to mapPartitions, the shuffle to groupByKey, and the Reducer to the mapValues call. A likewise literal translation of the Combiner would inject its logic at the end of the Mapper’s analog, mapPartitions:

linesRDD.mapPartitions { lines =>
  val mapResult = lines.map(line => (line.length, 1))
  mapResult.toSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toIterator
}.groupByKey().mapValues(_.sum)

The new code uses Scala’s Collections API; these are not Spark operations. As mentioned previously, the new code actually implements exactly the same logic. It’s easy to see the resemblance in the expression of the two, since Spark mimics many of Scala’s APIs.

Still, it’s clunky. The essence of the operation is summing counts, and to know how to sum many counts it’s only necessary to know how to sum two counts, and apply that over and over until just one value is left. This is what a true reduce operation does: from a function that makes two values into one, it makes many values into one.

In fact, if just given the reduce function, Spark can intelligently apply it so as to get the effect of the Combiner and Reducer above all at once:

linesRDD.mapPartitions { lines =>
  lines.map(line => (line.length, 1))
}.reduceByKey(_ + _)

_ + _ is shorthand for a function of two arguments that returns their sum. This is a far more common way of expressing this operation in Spark, and under the hood, Spark will be able to apply the reduce function before and after a shuffle automatically. In fact, without the need to express the Combiner’s counterpart directly in the code, it’s also no longer necessary to express how to map an entire partition with mapPartitions, since it’s implied by expressing how to map an element at a time:

linesRDD.map(line => (line.length, 1)).reduceByKey(_ + _)

The upshot is that, when using Spark, you’re often automatically using the equivalent of a Combiner. For the interested, a few further notes:

  • reduceByKey is built on a more general operation in Spark, called combineByKey, which allows values to be transformed at the same time.
  • For those who really are counting values, there is an even more direct Spark method for this: linesRDD.map(_.length).countByValue()
  • And if speed is more important than accuracy, there is a much faster approximate version that relies on the HyperLogLog algorithm: linesRDD.map(_.length).countByValueApprox()
Partitioning and Grouping Data

Both Spark and MapReduce support partitioning of key-value data by key. How data is split into chunks and in turn tasks by the processing framework has a large effect on the performance of common data operations like joining disparate data sets or doing per-key aggregations.

In MapReduce, you can specify a partitioner that determines how key-value pairs are split up and organized amongst the reducers. A well-designed partitioner will approximately evenly distribute the records between the reducers. Both MapReduce and Spark use hash partitioning as their default partitioning strategy, though there are separate implementations for each. Hash partitioning assigns pairs to partitions based on the hash value of the key: in both frameworks, the partition a key-value pair is assigned to is determined by the key’s hashCode() modulo the number of partitions you are creating. The hope is that the hashing function will evenly distribute your keys in the hash-space, so that you end up with approximately evenly distributed data between partitions.
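That assignment rule can be sketched in plain Scala (our own helper, mirroring the non-negative modulo both frameworks effectively apply, not either framework’s actual code):

```scala
// Sketch of default hash partitioning: hashCode modulo partition count,
// with negative hashes wrapped back into the valid range.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod  // keep result in [0, numPartitions)
}

// Pairs with the same key always land in the same partition
val partition = hashPartition("2015-04-26|abc", 10)
```

Because the function is deterministic in the key, per-key grouping and joins can rely on co-located data.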

A common issue in distributed programs with per-key aggregation is seeing a long tail in the distribution of the number of records assigned to reducers, and having “straggler” reducers that take much more time to complete than the rest. You can often resolve this problem by specifying a different, potentially custom partitioner. To do this in MapReduce, you can define your own custom partitioner by extending Partitioner and specifying your custom Partitioner class in the job configuration. This can be done in the configuration file, or programmatically with conf.setPartitionerClass(MyPartitioner.class).
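As an illustration only (a plain-Scala stand-in for the Partitioner contract, with a made-up hot key), a skew-fighting partitioner might route one known heavy key to a dedicated partition and hash everything else across the rest:

```scala
// Illustrative skew-aware partitioner: a known hot key gets its own
// partition; all other keys are hashed over the remaining partitions.
class HotKeyPartitioner(hotKey: String, numPartitions: Int) {
  require(numPartitions >= 2)

  def getPartition(key: String): Int =
    if (key == hotKey) 0
    else {
      val mod = key.hashCode % (numPartitions - 1)
      1 + (if (mod < 0) mod + (numPartitions - 1) else mod)
    }
}

val p = new HotKeyPartitioner("2015-04-26", 8)
```

The same shape works for either framework: extend the framework’s Partitioner class and plug the class into the job or RDD configuration.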

In Spark, there are operations that benefit from partitioning, as well as operations that can modify partitioning. The following table explains what types of transformations can affect partitioning and how.



Counters

MapReduce allows you to count things that happen in your job, and then query that count later. To define custom counters in MapReduce, you first need to define an Enum that describes the counters you will track. Imagine you are using Jackson (org.codehaus.jackson) to parse JSON into a Plain Old Java Object (POJO) using a Jackson ObjectMapper. In doing so, you may encounter a JsonParseException or JsonMappingException, and you would like to track how many of each you see. So, you will create an enum that contains an element for each of these possible exceptions:

public static enum JsonErr {
  PARSE_ERROR,
  MAPPING_ERROR
}

Then, in the map method of your map class you would have

public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  try {
    /* Parse a string into a POJO */
  } catch (JsonParseException parseException) {
    context.getCounter(JsonErr.PARSE_ERROR).increment(1L);
  } catch (JsonMappingException mappingException) {
    context.getCounter(JsonErr.MAPPING_ERROR).increment(1L);
  }
}

All counters that get incremented during the course of a job will be reported to the JobTracker and displayed in the JobTracker Web UI, along with the default I/O counters. You can also access the counters from the MapReduce driver program, from the Job you create using your Configuration.

Spark exposes Accumulators, which can be used as counters, but more generally support any associative operation. Thus, you can go beyond incrementing and decrementing by integers toward summing arbitrary floating-point numbers—or even better, actually collecting samples of parsing errors you encounter.

If you were to do a literal translation of this parsing-error count to Spark it would look like:

/** Define accumulators. */
val parseErrorAccum = sc.accumulator(0, "JsonParseException")
val mappingErrorAccum = sc.accumulator(0, "JsonMappingException")

/** Parse records, incrementing the matching accumulator when an exception is thrown. */
def parse(line: String) = {
  try {
    /* Parse a String into a POJO */
  } catch {
    case e: JsonParseException => parseErrorAccum += 1
    case e: JsonMappingException => mappingErrorAccum += 1
  }
}

/** Parse records in a transformation. */
val parsedRdd = unparsedRdd.map(parse)

While there is currently no good way to count while performing a transformation, Spark’s Accumulators do provide useful functionality for collecting samples of parsing errors, something that is notably harder to do in MapReduce. A useful alternative strategy is reservoir sampling, which builds a fixed-size sample of the error messages associated with parsing errors.
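Reservoir sampling keeps a fixed-size, uniform sample of a stream of unknown length in a single pass. A minimal sketch (our own helper, not a Spark API):

```scala
import scala.util.Random

// Keep at most k items from a stream of unknown length; after the pass,
// every stream element had an equal chance of being in the sample.
def reservoirSample[T](stream: Iterator[T], k: Int, rng: Random): Vector[T] = {
  val reservoir = scala.collection.mutable.ArrayBuffer.empty[T]
  var seen = 0L
  for (item <- stream) {
    seen += 1
    if (reservoir.size < k) reservoir += item     // fill the reservoir first
    else {
      val j = (rng.nextDouble() * seen).toLong    // uniform index in [0, seen)
      if (j < k) reservoir(j.toInt) = item        // replace with probability k/seen
    }
  }
  reservoir.toVector
}

val sample = reservoirSample((1 to 1000).iterator, 10, new Random(42))
```

Applied to error messages, the reservoir holds a small, representative set of failures you can inspect after the job, instead of just a count.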

Important Caveat About Accumulators

Now, you should be careful about how and when you use Accumulators. In MapReduce, increment actions on a counter executed during a task that later fails will not be counted toward the final value. MapReduce is careful to count correctly even when tasks fail or speculative execution occurs.

In Spark, the behavior of accumulators requires careful attention. It is strongly recommended that accumulators only be used in an action. Accumulators incremented in an action are guaranteed to only be incremented once. Accumulators incremented in a transformation can have their values incremented multiple times if a task or job stage is ever rerun, which is unexpected behavior for most users.

In the example below, an RDD is created and then mapped over while an accumulator is incremented. Since Spark uses a lazy evaluation model, these RDDs are only computed once an action is invoked and a value is required to be returned. Calling another action on myRdd2 requires that the preceding steps in the workflow are recomputed, incrementing the accumulator again.

scala> val myrdd = sc.parallelize(List(1,2,3))
myrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> val myaccum = sc.accumulator(0, "accum")
myaccum: org.apache.spark.Accumulator[Int] = 0

scala> val myRdd2 = myrdd.map(x => myaccum += 1)
myRdd2: org.apache.spark.rdd.RDD[Unit] = MappedRDD[1] at map at <console>:16

scala> myRdd2.collect
res0: Array[Unit] = Array((), (), ())

scala> myaccum
res1: org.apache.spark.Accumulator[Int] = 3

scala> myRdd2.collect
res2: Array[Unit] = Array((), (), ())

scala> myaccum
res3: org.apache.spark.Accumulator[Int] = 6

Beyond situations where Spark’s lazy evaluation model causes transformations to be reapplied and accumulators incremented, it is possible that tasks getting rerun because of a partial failure will cause accumulators to be incremented again. The semantics of accumulators are distinctly not a once-and-only-once (aka counting) model.

The problem with the example of counting parsing errors above is that it is possible for the job to complete successfully with no explicit errors, but the numeric results may not be valid—and it would be difficult to know either way. One of the most common uses of counters in MapReduce is parsing records and simultaneously counting errors, and unfortunately there is no way to reliably count using accumulators in Spark.

Serialization Frameworks

MapReduce and Spark both need to be able to take objects in the JVM and serialize them into a binary representation to be sent across the network when shuffling data. MapReduce uses a pluggable serialization framework, which allows users to specify their own implementation(s) of org.apache.hadoop.io.serializer.Serialization by setting io.serialization in the Hadoop configuration if they wish to use a custom serializer. Writable serialization, along with Avro’s specific and reflection-based serializers, is configured as the default.

Similarly, Spark has a pluggable serialization system that can be configured by setting the spark.serializer variable in the Spark configuration to a class that extends org.apache.spark.serializer.Serializer. By default, Spark uses Java serialization, which works out of the box but is not as fast as other serialization methods. Spark can be configured to use the much faster Kryo serialization protocol by setting spark.serializer to org.apache.spark.serializer.KryoSerializer and setting spark.kryo.registrator to the class of your own custom registrator, if you have one. To get the best performance out of Kryo, you should register your classes with a KryoRegistrator ahead of time, and configure Spark to use that particular registrator.

If you wanted to use Kryo for serialization and register a User class for speed, you would define your registrator like this:

import com.esotericsoftware.kryo.Kryo
import com.mycompany.model.User
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[User])
  }
}

You would then set spark.serializer to org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator to com.mycompany.myproject.MyKryoRegistrator. It is worth noting that if you are working with Avro objects, you will also need to specify the AvroSerializer class to serialize and deserialize them. You would modify the registrator code like so:

import com.esotericsoftware.kryo.Kryo
import com.mycompany.model.UserAvroRecord
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[UserAvroRecord], new AvroSerializer[UserAvroRecord]())
  }
}

Note: while the data sent across the network using Spark will be serialized with the serializer you specify in the configuration, the closures of tasks will be serialized with Java serialization. This means anything in the closures of your tasks must be serializable, or you will get a TaskNotSerializableException.

For Spark to operate on the data in your RDD, it must be able to serialize the function you specify in map, flatMap, or combineByKey on the driver node, ship that serialized function to the worker nodes, deserialize it on the worker nodes, and execute it on the data. This is always done with Java serialization, which means you can’t easily have Avro objects in the closure of a function in Spark, because Avro objects were not Java-serializable until version 1.8.


As you hopefully observed, there are similarities but also important differences between MapReduce and Spark with respect to combiner-like aggregation functionality, partitioning, counters, and pluggable serialization frameworks. Understanding these nuances can help ensure that your Spark deployment is a long-term success.

Juliet Hougland is a Data Scientist at Cloudera.

Categories: Hadoop

Sneak Preview: HBaseCon 2015 Ecosystem Track

Cloudera Blog - Fri, 04/24/2015 - 17:03

This year’s HBaseCon Ecosystem track covers projects that are complementary to HBase (with a focus on SQL) such as Apache Phoenix, Apache Kylin, and Trafodion.

In this post, I’ll give you a window into HBaseCon 2015’s (May 7 in San Francisco) Ecosystem track.

Thanks, Program Committee!

  • “HBase as an IoT Stream Analytics Platform for Parkinson’s Disease Research”

    Ido Karavany (Intel)

    In this session, you will learn about a solution developed in partnership between Intel and the Michael J. Fox Foundation to enable breakthroughs in Parkinson’s disease (PD) research by leveraging wearable sensors and smartphones to monitor PD patients’ motor movements 24/7. We’ll elaborate on how we’re using HBase for time-series data storage and integrating it with various stream, batch, and interactive technologies. We’ll also review our efforts to create an interactive querying solution over HBase.

  • “Apache Phoenix: The Evolution of a Relational Database Layer over HBase”

    James Taylor (Salesforce.com), Maryann Xue (Intel)

    Phoenix has evolved to become a full-fledged relational database layer over HBase data. We’ll discuss the fundamental principles of how Phoenix pushes computation to the server and why this leads to performance that enables direct support of low-latency applications, along with some major new features. Next, we’ll outline our approach for transaction support in Phoenix, a work in progress, and discuss the pros and cons of the various approaches. Lastly, we’ll examine the current means of integrating Phoenix with the rest of the Hadoop ecosystem.

  • “Analyzing HBase Data with Apache Hive”

    Swarnim Kulkarni (Cerner), Brock Noland (StreamSets), Nick Dimiduk (Hortonworks)

    Hive/HBase integration extends the familiar analytical tooling of Hive to cover online data stored in HBase. This talk will walk through the architecture for Hive/HBase integration with a focus on some of the latest improvements such as Hive over HBase Snapshots, HBase filter pushdown, and composite keys. These changes improve the performance of the Hive/HBase integration and expose HBase to a larger audience of business users. We’ll also discuss our future plans for HBase integration.

  • “Apache Kylin: Extreme OLAP Engine for Hadoop” 

    Seshu Adunuthula (eBay)

    Kylin is an open source distributed analytics engine contributed by eBay that provides a SQL interface and OLAP on Hadoop supporting extremely large datasets. Kylin’s pre-built MOLAP cubes (stored in HBase), distributed architecture, and high concurrency help users analyze multidimensional queries via SQL and other BI tools. During this session, you’ll learn how Kylin uses HBase’s key-value store to serve SQL queries with relational schema.

  • “Optimizing HBase for the Cloud in Microsoft Azure HDInsight”

    Maxim Lukiyanov (Microsoft)

    Microsoft Azure’s Hadoop cloud service, HDInsight, offers Hadoop, Storm, and HBase as fully managed clusters. In this talk, you’ll explore the architecture of HBase clusters in Azure, which is optimized for the cloud, and a set of unique challenges and advantages that come with that architecture. We’ll also talk about common patterns and use cases utilizing HBase on Azure.

  • “Warcbase: Scaling ‘Out’ and ‘Down’ HBase for Web Archiving”

    Jimmy Lin (University of Maryland)

    Web archiving initiatives around the world capture ephemeral web content to preserve our collective digital memory. However, that requires scalable, responsive tools that support exploration and discovery of captured content. Here you’ll learn about why Warcbase, an open-source platform for managing web archives built on HBase, is one such tool. It provides a flexible data model for storing and managing raw content as well as metadata and extracted knowledge, tightly integrates with Hadoop for analytics and data processing, and relies on HBase for storage infrastructure.

  • “Trafodion: Integrating Operational SQL into HBase”

    Anoop Sharma, Rohit Jain (HP)

    Trafodion, open sourced by HP, reflects 20+ years of investment in a full-fledged RDBMS built on Tandem’s OLTP heritage and geared towards a wide set of mixed query workloads. In this talk, we will discuss how HP integrated Trafodion with HBase to take full advantage of the Trafodion database engine and the HBase storage engine, covering 3-tier architecture, storage, salting/partitioning, data movement, and more.

  • “SQL-on-HBase Smackdown: Panel”

    Julian Hyde (Hortonworks), Rohit Jain (HP), Dr. Ricardo Jimenez-Peris (LeanXcale), John Leach (Splice Machine), Jacques Nadeau (MapR), James Taylor (Salesforce.com)

    Nothing is hotter than SQL-on-Hadoop, and now SQL-on-HBase is fast approaching equal hotness status. Here, developers and architects deeply involved in this effort will discuss the work done so far across the ecosystem and the work still to be done.

Getting interested? Wait until you see the Use Cases sneak preview next week!

Thank you to our sponsors – Bloomberg, Cask Data, Hortonworks, MapR, Pepperdata, Salesforce.com (Gold); Facebook, Google (Silver); Apache Software Foundation, The Hive Think Tank (Community); O’Reilly Media (Media) — without which HBaseCon would be impossible!

Categories: Hadoop

New Cloudera Search Training: Learn Powerful Techniques for Full-Text Search on an EDH

Cloudera Blog - Fri, 04/24/2015 - 15:43

Cloudera Search combines the speed of Apache Solr with the scalability of CDH. Our newest training course covers this exciting technology in depth, from indexing to user interfaces, and is ideal for developers, analysts, and engineers who want to learn how to effectively search both structured and unstructured data at scale.

At nearly 10 years old, Apache Hadoop already has an interesting history. Some of you may know that it was inspired by the Google File System and MapReduce papers, which detailed how the search giant was able to store and process vast amounts of data. Search was the original Big Data application, and, in fact, Hadoop itself was a spinoff of a project designed to create a reliable, scalable system to index data using one of Doug Cutting’s other creations: Apache Lucene.

Fortunately, many of Hadoop’s early developers and users had the vision to see that a system that offers scalable, cost-effective data storage and processing would become increasingly important in a world that generates data at ever-increasing rates. Hadoop ultimately became the center of an entire ecosystem of tools that made it easier to ingest, process, and analyze a variety of data. Concurrently, Yonik Seeley was creating what eventually became Apache Solr, which is also based on Apache Lucene. Cloudera Search brings these technologies together to enable full-text search at scale.

What is search, exactly? Many people picture Google’s home page, simply offering a text box for you to enter search terms and a button that submits your query. That’s certainly one application of search — and one that helps to illustrate how Cloudera Search can help users to find relevant information regardless of their technical ability — but it’s not the only one. In fact, search also encompasses analytical capabilities that allow you to explore your data interactively, as you can see below:

Economics and technology have converged to usher in the era of the enterprise data hub. As organizations become more data-driven, it’s more important than ever for decision makers to identify important trends. Cloudera Search is an ideal tool for this, and it’s often used for data discovery and exploration. Yet the ability to quickly drill down and find a specific result also makes it a good choice for detecting anomalies. Rather than finding a Web site as one might with internet search, an enterprise user would use Cloudera Search to analyze customer satisfaction through social media posts, identify a failing device using network sensor data, or uncover intrusion attempts hidden away in firewall logs.

Although our Designing and Building Big Data Applications course does include a section on Cloudera Search, many students wanted to cover Search in greater depth after they discovered how they could apply it in their own organizations. That’s why I’m pleased to announce the availability of Cloudera Search Training, a three-day course dedicated to showing you how to ingest, index, and query data at scale using this versatile technology.

If you’d like to learn more about this course, I invite you to join me for a webinar on Tuesday, April 28th. We’ll cover:

  • What Cloudera Search is and how it enables data discovery and analytics
  • How to perform indexing of data from various sources and in various formats
  • Who is best suited to attend the course and what prior knowledge you should have
  • The benefits Search delivers as part of an enterprise data hub

Register now for the webinar.

Tom Wheeler is a Senior Curriculum Developer for Cloudera University.

Categories: Hadoop