Hadoop

Apache Kafka for Beginners

Cloudera Blog - Fri, 09/12/2014 - 18:10

When used in the right way and for the right use case, Kafka has unique attributes that make it a highly attractive option for data integration.

Apache Kafka is creating a lot of buzz these days. While LinkedIn, where Kafka originated, is the best-known user, many other companies are successfully using this technology.

So now that the word is out, it seems the world wants to know: What does it do? Why does everyone want to use it? How is it better than existing solutions? Do the benefits justify replacing existing systems and infrastructure?
 
In this post, we’ll try to answer those questions. We’ll begin by briefly introducing Kafka, and then demonstrate some of Kafka’s unique features by walking through an example scenario. We’ll also cover some additional use cases and compare Kafka to existing solutions.

What is Kafka?

Kafka is one of those systems that is very simple to describe at a high level, but has an incredible depth of technical detail when you dig deeper. The Kafka documentation does an excellent job of explaining the many design and implementation subtleties in the system, so we will not attempt to explain them all here. In summary, Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable.

Like many publish-subscribe messaging systems, Kafka maintains feeds of messages in topics. Producers write data to topics and consumers read from topics. Since Kafka is a distributed system, topics are partitioned and replicated across multiple nodes.

Messages are simply byte arrays, and developers can use them to store any object in any format – with String, JSON, and Avro the most common. It is possible to attach a key to each message, in which case the producer guarantees that all messages with the same key will arrive at the same partition. When consuming from a topic, it is possible to configure a consumer group with multiple consumers. Each consumer in a consumer group reads messages from a unique subset of partitions in each topic it subscribes to, so each message is delivered to one consumer in the group, and all messages with the same key arrive at the same consumer.

What makes Kafka unique is that it treats each topic partition as a log (an ordered set of messages). Each message in a partition is assigned a unique offset. Kafka does not attempt to track which messages were read by each consumer and retain only unread messages; rather, it retains all messages for a set amount of time, and consumers are responsible for tracking their location (offset) in each log. Consequently, Kafka can support a large number of consumers and retain large amounts of data with very little overhead.

Next, let’s look at how Kafka’s unique properties are applied in a specific use case.

Kafka at Work

Suppose we are developing a massive multiplayer online game. In these games, players cooperate and compete with each other in a virtual world. Players often trade with each other, exchanging game items and money, so as game developers it is important to make sure players don’t cheat: Trades will be flagged if the trade amount is significantly larger than normal for the player and if the IP the player is logged in with is different from the IP used for the last 20 games. In addition to flagging trades in real time, we also want to load the data into Apache Hadoop, where our data scientists can use it to train and test new algorithms.

For the real-time event flagging, it is best if we can reach a decision quickly, based on data that is cached in the game server’s memory, at least for our most active players. Our system has multiple game servers, and the data set that includes the last 20 logins and last 20 trades for each player can fit in the memory we have if we partition it across our game servers.

Our game servers have to perform two distinct roles: the first is to accept and propagate user actions, and the second is to process trade information in real time and flag suspicious events. To perform the second role effectively, we want the whole history of trade events for each user to reside in the memory of a single server. This means we have to pass messages between the servers, since the server that accepts a user action may not have that player’s trade history. To keep the roles loosely coupled, we use Kafka to pass messages between the servers, as you’ll see below.

Kafka has several features that make it a good fit for our requirements: scalability, data partitioning, low latency, and the ability to handle a large number of diverse consumers. We have configured Kafka with a single topic for both logins and trades. The reason we need a single topic is to make sure that trades arrive in our system after we already have information about the login (so we can check that the gamer logged in from his usual IP). Kafka preserves message order within a partition (and, because we key messages by user as described below, within each user’s event stream), but not across topics.

When a user logs in or makes a trade, the accepting server immediately sends the event into Kafka. We send messages with the user id as the key and the event as the value. This guarantees that all trades and logins from the same user arrive at the same Kafka partition. Each event-processing server runs a Kafka consumer, each of which is configured to be part of the same consumer group—this way, each server reads data from a few Kafka partitions, and all the data about a particular user arrives at the same event-processing server (which can be different from the accepting server). When the event-processing server reads a user trade from Kafka, it adds the event to the user’s event history that it caches in local memory. It can then access the user’s event history from the local cache and flag suspicious events without additional network or disk overhead.
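
For illustration, here is a minimal sketch of the producing side, written in Scala against Kafka’s Java producer client; the broker list, topic name, user id, and payload are assumptions made for this sketch, not part of the design described above.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092,broker2:9092") // assumed broker list
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
val userId = "player-42"                                    // hypothetical user id
val event  = """{"type":"trade","amount":50000}"""          // hypothetical event payload
// Keying by user id routes all of this user's logins and trades to the same partition.
producer.send(new ProducerRecord[String, String]("events", userId, event))
producer.close()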

It’s important to note that we create a partition per event-processing server, or per core on the event-processing servers for a multi-threaded approach. (Keep in mind that Kafka was mostly tested with fewer than 10,000 partitions for all the topics in the cluster in total, and therefore we do not attempt to create a partition per user.)

This may sound like a circuitous way to handle an event: send it from the game server to Kafka, read it from another game server, and only then process it. However, this design decouples the two roles and allows us to manage capacity for each role as required. In addition, the approach does not add significant latency, as Kafka is designed for high throughput and low latency; even a small three-node cluster can process close to a million events per second with an average latency of 3ms.

When the server flags an event as suspicious, it sends the flagged event into a new Kafka topic—for example, Alerts—where alert servers and dashboards pick it up. Meanwhile, a separate process reads data from the Events and Alerts topics and writes them to Hadoop for further analysis.

Because Kafka does not track acknowledgements and messages per consumer, it can handle many thousands of consumers with very little performance impact. Kafka even handles batch consumers—processes that wake up once an hour to consume all new messages from a queue—without affecting system throughput or latency.

Additional Use Cases

As this simple example demonstrates, Kafka works well as a traditional message broker as well as a method of ingesting events into Hadoop.

Here are some other common uses for Kafka:

  • Website activity tracking: The web application sends events such as page views and searches to Kafka, where they become available for real-time processing, dashboards, and offline analytics in Hadoop.
  • Operational metrics: Alerting and reporting on operational metrics. One particularly fun example is having Kafka producers and consumers occasionally publish their message counts to a special Kafka topic; a service can be used to compare counts and alert if data loss occurs.
  • Log aggregation: Kafka can be used across an organization to collect logs from multiple services and make them available in standard format to multiple consumers, including Hadoop and Apache Solr.
  • Stream processing: A framework such as Spark Streaming reads data from a topic, processes it and writes processed data to a new topic where it becomes available for users and applications. Kafka’s strong durability is also very useful in the context of stream processing.

Other systems serve many of those use cases, but none of them do them all. ActiveMQ and RabbitMQ are very popular message broker systems, and Apache Flume is traditionally used to ingest events, logs, and metrics into Hadoop.

Kafka and Its Alternatives

We can’t speak much about message brokers, but data ingest for Hadoop is a problem we understand very well.

First, it is interesting to note that Kafka started out as a way to make data ingest into Hadoop easier. When multiple data sources and destinations are involved, writing a separate data pipeline for each source-and-destination pairing quickly evolves into an unmaintainable mess. Kafka helped LinkedIn standardize its data pipelines and allowed getting data out of each system once and into each system once, significantly reducing pipeline complexity and cost of operation.

Jay Kreps, Kafka’s architect at LinkedIn, describes this familiar problem well in a blog post:

My own involvement in this started around 2008 after we had shipped our key-value store. My next project was to try to get a working Hadoop setup going, and move some of our recommendation processes there. Having little experience in this area, we naturally budgeted a few weeks for getting data in and out, and the rest of our time for implementing fancy prediction algorithms. So began a long slog.

Differences Versus Flume

There is significant overlap in the functions of Flume and Kafka. Here are some considerations when evaluating the two systems.

  • Kafka is very much a general-purpose system. You can have many producers and many consumers sharing multiple topics. In contrast, Flume is a special-purpose tool designed to send data to HDFS and HBase. It has specific optimizations for HDFS and it integrates with Hadoop’s security. As a result, Cloudera recommends using Kafka if the data will be consumed by multiple applications, and Flume if the data is designated for Hadoop.
  • Those of you familiar with Flume know that it has many built-in sources and sinks. Kafka, however, has a significantly smaller ecosystem of producers and consumers, and these are not well supported by the Kafka community. Hopefully this situation will improve in the future, but for now: use Kafka if you are prepared to code your own producers and consumers; use Flume if the existing Flume sources and sinks match your requirements and you prefer a system that can be set up without any development.
  • Flume can process data in-flight using interceptors. These can be very useful for data masking or filtering. Kafka requires an external stream processing system for that.
  • Both Kafka and Flume are reliable systems that, with proper configuration, can guarantee zero data loss. However, Flume does not replicate events. As a result, even when using the reliable file channel, if a node with a Flume agent crashes, you will lose access to the events in the channel until you recover the disks. Use Kafka if you need an ingest pipeline with very high availability.
  • Flume and Kafka can work quite well together. If your design requires streaming data from Kafka to Hadoop, using a Flume agent with a Kafka source to read the data makes sense: you don’t have to implement your own consumer; you get all the benefits of Flume’s integration with HDFS and HBase; you have Cloudera Manager monitoring the consumer; and you can even add an interceptor and do some stream processing on the way.

Conclusion

As you can see, Kafka has a unique design that makes it very useful for solving a wide range of architectural challenges. It is important to make sure you use the right approach for your use case and use it correctly to ensure high throughput, low latency, high availability, and no loss of data.

Gwen Shapira is a Software Engineer at Cloudera, and a Kafka contributor. Jeff Holoman is a Systems Engineer at Cloudera.

Categories: Hadoop

Getting Started with Big Data Architecture

Cloudera Blog - Wed, 09/10/2014 - 14:12

What does a “Big Data engineer” do, and what does “Big Data architecture” look like? In this post, you’ll get answers to both questions.

Apache Hadoop has come a long way in its relatively short lifespan. From its beginnings as a reliable storage pool with integrated batch processing using the scalable, parallelizable (though inherently sequential) MapReduce framework, we have witnessed the recent additions of real-time (interactive) components like Impala for interactive SQL queries and integration with Apache Solr as a search engine for free-form text exploration.

Getting started is now also a lot easier: Just install CDH, and all the Hadoop ecosystem components are at your disposal. But after installation, where do you go from there? What is a good first use case? How do you ask those “bigger questions”?

Having worked with more customers running Hadoop in production than any other vendor, Cloudera’s field technical services team has seen more than its fair share of these use cases. Although they obviously vary by industry and application, there is a common theme: the presence of Big Data architecture.

In this post, you’ll get a whirlwind tour of that architecture based on what we’ve seen at customer sites over the past couple of years, and get some tips/initial advice about building your own as the foundation for an enterprise data hub.

Big Data Architecture

Big Data architecture is premised on a skill set for developing reliable, scalable, completely automated data pipelines. That skill set requires profound knowledge of every layer in the stack, beginning with cluster design and spanning everything from Hadoop tuning to setting up the tool chain responsible for processing the data. The following diagram shows the complexity of the stack, as well as how data pipeline engineering touches every part of it.

The main point here is that data pipelines take raw data and convert it into insight (or value). Along the way, the Big Data engineer has to make decisions about what happens to the data, how it is stored in the cluster, how access is granted internally, what tools to use to process the data, and eventually how to provide access to the outside world. Access for the outside world could be through BI or other analytic tools, while the processing itself will likely be done with tools such as Impala or Apache Spark. I refer to the people who design and/or implement such an architecture as Big Data engineers.

In the remainder of this post, you’ll learn about the various components in the stack and their role in creating data pipelines.

Cluster Planning

Cluster planning is a “chicken-and-egg” problem, as cluster design is inherently driven by the use-case(s) running later on, and often the use case is not yet clear. Most vendors, including Cloudera, have a reference architecture guideline to help you select the proper class of machines. (For Cloudera certified partners, see the online listing.)

In general, the currently recommended machines are dual-CPU with 4 to 8 cores each; at least 48GB of RAM, up to 512GB for low-latency analytical workloads where lots of data is cached; at least 6 HDDs (hard disk drives), up to 12 or more for storage-heavy configurations; and otherwise standard rack-mountable 19″ servers. Sometimes we also see SSD (solid-state drive) setups for low-latency use cases, although the results are not as dramatic as one would assume. (Please test carefully.)

When in doubt, you can always try the public (or private) cloud services first; once you know your requirements better, you can move things around. If you do so, be generous about machine size to get results comparable with bare-metal hardware – remember, you are in a shared environment and need to factor in competing loads and slower data connections (network and virtualized storage).

Ingress

After you have spun up your cluster, you have to decide how to load data. In practice there are two main approaches: batch and event-driven. The former is appropriate for file and structured data, while the latter is appropriate for most near-real-time events such as log or transactional data.

Batch Ingest

Let me start with the more straightforward case: ingesting data from structured data sources (for example, an RDBMS). The weapon of choice is universally Apache Sqoop, which allows you to move data into Hadoop from RDBMSs. You can select partial (column projection and row selection) or full data sets and do full or (given some requirements) incremental transfers. Sqoop uses MapReduce as its workhorse and employs default JDBC drivers for many database systems—or, if necessary, specialized drivers that speed up the data transfer.

The more complex batch ingest method is file loading. There are many ways to achieve this, but none is really established. In fact, when possible, it is better to switch to the event ingest explained below and avoid bulk loading of files. The matter is complicated by the location of the files (on-site or remote), as well as the API used to load them (the HDFS put command being the simplest; there are also REST-based APIs with WebHDFS and HttpFS).

But how can you reliably ingest files without human intervention, as demanded by Big Data architecture? I have yet to see a solution here, and for now can only point to custom scripting (Bash, Python, Java, and so on) or the vast Hadoop partner field, which has a lot to offer on the data integration topic.

On their own, these tools are one-off jobs only—they get invoked and do their work. What is missing is automatic ingest so that the data pipeline is constantly processing data. We’ll pick that up in the “Productionization” section below.

Event Ingest

For event-based ingest there is Apache Flume, which allows you to define a redundant, failsafe network of so-called agents that transport event records from a generating system to a consuming one. The consuming system might be HDFS, but it can also be Spark or HBase, or a combination of these.

Flume has been battle-tested in large production clusters and allows you to reliably deliver data to where it is needed. The tricky part is configuring the Flume topology and the agents correctly. The agents need to be able to buffer enough data on persistent media so that all anticipated “normal” server failures are covered. Also, tuning the batch sizes of events that are sent between agents is vital for achieving either higher throughput or lower latencies (faster message delivery).

Staging

Once the data has arrived in Hadoop, there remains the task of staging it for processing. This is not just about storing it somewhere, but rather storing it in the right format, at the right size, and with the right access mask.

Storage Formats

The right data format depends on the subsequent use case. Whether the application is batch or real-time is again relevant, but so is whether the format retains the full fidelity of the data and is open (i.e., can be used by more than one tool in the processing stage).

For batch, container file formats, including the venerable SequenceFile and Avro formats, are both useful and popular. For analytical, real-time applications, the new rising star is Apache Parquet (incubating), which, similar to columnar databases, lays out the data in columns with built-in structure and compression (e.g., skipping NULL values), allowing you to scan very large data sets very efficiently (assuming a selective query pattern).

In addition to the file format, you should also strongly consider encoding and compression formats, because the best I/O in Big Data is the one you are not doing. Compression reduces I/O, letting you load more data while moving fewer bytes around. The proper choice is driven by the CPU-versus-compression-ratio trade-off, because the better a codec compresses, the more CPU it usually needs. Thus, the data we see is almost always compressed with the Snappy codec, which is super-fast and lightweight yet offers decent compression ratios. For historical data, BZip2 or something similar is often used.

It is also important to think about what happens to your data over time. You might want to implement policies that rewrite older data into different file or compression formats, so that you make better use of the available cluster capacity. As data ages and is accessed less often, it is worthwhile to trade compression ratio back against CPU usage. There are no incumbent tools to help you out here, and in the field I often see rather custom solutions (scripting again)… or none at all (which is not good).
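
To make that idea concrete, here is a minimal sketch of such an age-off rewrite, using Spark’s RDD API simply because it is compact; the paths and the codec choice are assumptions for illustration, and any batch tool on the cluster could do the same job.

import org.apache.hadoop.io.compress.BZip2Codec
import org.apache.spark.{SparkConf, SparkContext}

object ArchiveRewrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("archive-rewrite"))
    // Re-read last year's events (originally written with a fast codec such as Snappy)...
    val events2013 = sc.textFile("hdfs:///data/events/2013/*")
    // ...and rewrite them with BZip2, trading CPU for a much better compression ratio;
    // bzip2 output also remains splittable for later batch jobs.
    events2013.saveAsTextFile("hdfs:///archive/events/2013", classOf[BZip2Codec])
    sc.stop()
  }
}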

Data Partitioning

As you land data, there is another important aspect to consider: how you partition or, more generally, size data. For starters, Hadoop is much better at managing a smaller number of very large files than a large number of small ones. You do not want to design an architecture that lands many small files in HDFS and then be surprised when the NameNode starts to perform badly. You can, of course, land small files, but then you need to implement an ETL stage (or rather a TL stage, as no extract is needed) that combines smaller files into larger ones.

While you are transforming files as they arrive anyway, the next step is to split them into reasonably sized chunks for later processing. This is usually done using partitions on HDFS. In HBase, partitioning is implicit: it divides data into regions of contiguous rows, sorted by their row key, splitting and rebalancing as it goes along. For HDFS, you have to plan ahead of time—you might need to sample data and explore its structure to decide what is best for you. The rule of thumb, though, is for each partition to span a decent amount of data worth processing without creating the small-file problem mentioned above. I would advise you to start with partitions of at least 1GB per file and, knowing the size of the total dataset, tune this up to even larger sizes. So for very large datasets in the hundreds of terabytes and up, I would have each file in a partition be 10GB, or even 100GB or more.

One final note: make sure the file format you choose supports splitting files into smaller blocks for parallel processing. The container formats suggested above usually do, but you might want to double-check (look for splittable support). If not, you can end up with suboptimal performance across the cluster, because a single reader has to process a single large file (that is, your parallelism drops considerably).

Access Control

The last part you have to consider is what we call information architecture (IA), which addresses the need to lay out the data in such a way that multiple teams can work safely on a shared cluster—also referred to as multi-tenancy.

It is not enough to have each job read from one directory and emit to another. If you share a cluster across departments, you need to devise a concise access schema that tightly controls (and possibly audits) who has access to what data. The IA is where these rules are defined—for example, by using user groups and other HDFS features (see the new extended ACL feature in HDFS or Apache Sentry) to map business units onto owners of data. With that, you can further define a plan for how data is read from storage during processing and pushed through the various stages of the data processing pipeline.

One way to handle processing properly is to create a time-stamped directory for every running job and, within it, a further directory structure for incoming files (for example, from a previous job), files currently being processed, and final (as well as permanently failed) files. This ensures that jobs can run in parallel without overwriting each other’s data mid-flight.

We won’t cover this issue in detail here, but IA should also account for data backups (for disaster recovery or load balancing). You need a strategy for moving data across multiple clusters or even data centers.

Data Processing

Thus far you have learned about landing and staging the incoming data. The next step is automatically processing it as part of the data pipeline.

Data Transformation

This is the step mentioned above in which you process existing data, for example to transform it into other file formats or compression codecs. Just because you transform your data doesn’t mean you need to lose any of its detail: this is not your typical ETL, which is often lossy, but rather an optional step to increase the effectiveness of your cluster. Plan to do whatever is needed for staging, which might also extend to rewriting data over time (or based on changing requirements). You could, for example, employ heuristics that check how often and in what way data is used, and change its layout over time.

Analytics

The more interesting part of processing is the analytics done on top of the staged data. Here we see the venerable MapReduce—now rejuvenated on top of YARN—as well as other frameworks, such as Spark or Apache Giraph. On top of that layer there are other abstractions in use, notably Apache Crunch and Cascading.

The most hyped topic currently is machine learning, wherein you build mathematical models for recommendations or for clustering/classification of incoming new data—for example, to do risk assessment, fraud detection, or spam filtering. The more “mundane” tasks in analysis, such as building aggregations and reporting data, are still very common. In fact, the latter make up more than 90% of the use cases we see, with the former being an emerging area.

Either way, after prototyping the algorithm and approach, you have to convert it into an automated workflow.

Egress and Querying

As for providing access to the data, you need an approach that covers all types of users, from novices to experts. The options span from ubiquitous search using Apache Solr, to JDBC interfaces that SQL users and BI tools can use, all the way to low-level APIs—and, eventually, raw file access. Regardless of the access method, the data is never copied or siloed into lesser data structures: all these tools work on the single source of truth represented by the full-fidelity files in HDFS or the key-values in HBase. Whether you use Impala or Hive to issue SQL commands, the Kite SDK to read files, or interactive Spark to process data, you are always working on the same copy of the data.
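
As a small, hedged illustration of that single-copy idea, the same HDFS directory a Hive or Impala table points at can be read directly by a Spark job; the path and the filter below are assumptions made purely for the example.

import org.apache.spark.{SparkConf, SparkContext}

object SameCopyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("same-copy-example"))
    // Hypothetical path backing a Hive/Impala table; Spark reads the very same files.
    val trades = sc.textFile("hdfs:///data/warehouse/trades")
    println("Flagged trades: " + trades.filter(_.contains("FLAGGED")).count())
    sc.stop()
  }
}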

In fact, that’s what makes Hadoop so powerful, as it removes the need to move data around and transform it to “yet another (lesser) schema”. The integration of Hadoop into the enterprise IT landscape with Kerberos authentication, role-based authorization, and log-based auditing completes the picture.

Data Pipelines

Before we can automate, we have to combine the tools described above into more complex data pipelines. There are two main types of such pipelines: micro and macro.

Micro-pipelines are streamlined helpers that allow you to abstract (and therefore simplify) parts of the larger processing. Tools for this purpose include Morphlines (see “Introducing Morphlines: The Easy Way to Build and Integrate ETL Apps for Hadoop” for details), Crunch, and Cascading. Morphlines tie together smaller processing steps applied to each record or data pair as it flows through the processing. That lets you build tested, reusable processing sub-steps, used for example to cleanse data or enhance its metadata for later steps.

In contrast, Crunch and Cascading define an abstraction layer on top of the processing, where you deal with data points. You define how data is consumed, routed, processed, and emitted, which translates into one or more processing jobs on MapReduce and/or Spark. Crunch or Cascading “meta” jobs can be further combined into yet more complex workflows, which is usually done in macro-pipelines.

Apache Oozie is one of those macro-pipeline tools. It defines workflows as directed acyclic graphs (DAGs) that have control and action elements, where the former influence how the flow proceeds and the latter define what has to be done in each step. Oozie also has a server component that tracks the running flows and takes measures to handle their completion (or termination).

As with single jobs or micro-pipelines, a “workflow” is not automated but rather just a definition of work. It has to be invoked manually to start the flow processing. This is where another part of Oozie, coordinators, comes in. Oozie coordinators define the time or frequency at which a workflow should run, and/or its dependencies on other workflows and data sources. With this feature, you can supply the missing link in automating processing.

Productionization

We have closed the loop above, and data pipelines can now run in an automated fashion, consuming and producing data as needed. But for a Big Data engineer, I argue there is one more piece to the puzzle: productionization.

Superficially, it sounds like a trivial task since the hard work has been “done.” In practice, this last step is a challenge because it spans the entire stack and requires careful release planning, with proper testing (QA), staging, and deployment phases. It also includes operating the data pipelines, which means monitoring, reporting, and alerting. Finally, insights about the performance of the pipelines might trigger cluster changes, from hardware to configuration settings.

There are some tools that help you along the way. For example, Cloudera Manager can track cluster utilization and job performance, and Cloudera Navigator can define data lifecycle rules, including metadata about the source and lineage of data. A Big Data engineer is still needed to fill in the gaps, while maintaining the entire pipeline in production.

The following diagram adds the discussed tools and concepts to the data pipeline architecture:

Support

Please do consider the option of having ongoing support for your Hadoop cluster and data pipelines in production (but also in development). The complexity of running a cluster in such a mode is not trivial and can cause considerable delays when things go wrong. Cloudera Manager will help you tremendously in reducing the time to discover the root cause of a problem, and often you can apply a fix yourself. But there are also many issues we have seen in practice that require a Hadoop engineer to lend a helping hand. Cloudera, of course, has such a team of engineers (in fact, a multilayered, dedicated one) that can help you solve any problem you might face.

Conclusion

While Hadoop has grown tremendously, there are still functional gaps for putting data pipelines into production easily, so skilled Big Data engineers are needed. Demand for these engineers is high and expected to grow, and Cloudera’s new “Designing and Building Big Data Applications” training course can teach you the skills you will need to excel in this role.

The Hadoop ecosystem, helpfully, offers most of the tools needed to build and automate these pipelines based on business rules—testing and deploying pipelines is easier with proper tooling support, while operating the same pipelines in production can be equally automated and transparent. As time moves on, missing functionality will be provided either by Cloudera, by third-party vendors, or as part of the open source ecosystem.

In a future world, we will be able to point Hadoop to a source, internal or external, batch or streaming, and press an “Implement Pipeline” button. The initial parameters will be assumed, and further learned and adjusted as needed, resulting in data being laid out for the current use case, be it interactive or automated (or both).

We can dream. In the meantime, happy Hadoop-ing!

Lars George is Cloudera’s EMEA Chief Architect, an HBase committer and PMC member, and the author of O’Reilly’s HBase: The Definitive Guide.

Categories: Hadoop

The Early Release Books Keep Coming: This Time, Hadoop Security

Cloudera Blog - Mon, 09/08/2014 - 15:33

Hadoop Security is the latest addition to the canon of Hadoop ecosystem books.

We are thrilled to announce the availability of the early release of Hadoop Security, a new book about security in the Apache Hadoop ecosystem published by O’Reilly Media. The early release contains two chapters on System Architecture and Securing Data Ingest and is available in O’Reilly’s catalog and in Safari Books.

The goal of the book is to serve the experienced security architect who has been tasked with integrating Hadoop into a larger enterprise security context. System and application administrators will also benefit from a thorough treatment of the risks inherent in deploying Hadoop in production and the associated how and why of Hadoop security.

As Hadoop continues to mature and become ever more widely adopted, material must become specialized for the security architects tasked with ensuring new applications meet corporate and regulatory policies. While it is up to operations staff to deploy and maintain the system, they won’t be responsible for determining what policies their systems must adhere to. Hadoop is mature enough that dedicated security professionals need a reference to navigate the complexities of security on such a massive scale. Additionally, security professionals must be able to keep up with the array of activity in the Hadoop security landscape as exemplified by new projects like Apache Sentry (incubating) and cross-project initiatives such as Project Rhino.

Security architects aren’t interested in how to write a MapReduce job or how HDFS splits files into data blocks; they care about where data is going and who will be able to access it. Their focus is on putting into practice the policies and standards necessary to keep their data secure. As more corporations turn to Hadoop to store and process their most valuable data, the risks of a potential breach of those systems increase exponentially. Without a thorough treatment of the subject, organizations will delay deployments or resort to siloed systems that increase capital and operating costs.

The first available chapter covers the system architecture in which Hadoop is deployed. It goes into the different options for deployment: in-house, cloud, and managed. The chapter also covers how the major components of the Hadoop stack are laid out physically, from both a server and a network perspective. It gives a security architect the necessary background to put the overall security architecture of a Hadoop deployment into context.

The second available chapter, on securing data ingest, covers the basics of confidentiality, integrity, and availability (CIA) and applies them to feeding your cluster with data from external systems. In particular, the two most common data ingest tools, Apache Flume and Apache Sqoop, are evaluated for their support of CIA. The chapter details the motivation for securing your ingest pipeline and provides ample information and examples on how to configure these tools for your specific needs. It also puts the security of your Hadoop data ingest flow into the broader context of your enterprise architecture.

We encourage you to take a look and get involved early. Security is a complex topic, and it never hurts to get a jump start on it. We’re also eagerly awaiting feedback; we would never have come this far without the help of some extremely kind reviewers. You can expect more chapters in the coming months, and we’ll continue to provide summaries on this blog as we release new content so you know what to expect.

Ben Spivey is a Solutions Architect at Cloudera, and Joey Echeverria is a Software Engineer at Cloudera.

Categories: Hadoop

This Month in the Ecosystem (August 2014)

Cloudera Blog - Fri, 09/05/2014 - 18:09

Welcome to our 12th (first-anniversary!) edition of “This Month in the Ecosystem,” a digest of highlights from August 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

  • Developers at Sigmoid Analytics described the Spork project, with the goal of giving Apache Pig access to Apache Spark as a data processing backend. Efforts to do the same for other ecosystem components are also underway (such as the Apache Mahout community’s movement of Mahout’s item-based collaborative filtering recommender to Spark).
  • The Transaction Processing Performance Council (TPC) announced a new Big Data performance benchmark called TPCx-HS, largely based on TeraSort. To learn more about why TPCx-HS is a good first step toward establishing a useful benchmark in this area, read this interview with TPC-DS architect Francois Raab and Cloudera Performance Engineer Yanpei Chen.
  • The GlusterFS community announced that CDH 5 has been successfully tested on that filesystem. Similar work is underway for Impala specifically.
  • Apache Hadoop 2.5 was released. It includes extended file attributes for HDFS (HDFS-2006), a new feature that is explained in detail here.
  • An early release of the new O’Reilly Media book, Using Flume, became available. The author, Hari Shreedharan, is a Software Engineer at Cloudera and an Apache Flume committer/PMC member.
  • Kite SDK 0.16 was released.
  • Adobe Research open sourced Spindle, its web analytics processing system based on Spark, Apache Parquet (incubating), and CDH 4.7.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Pig is Flying: Apache Pig on Apache Spark

Cloudera Blog - Thu, 09/04/2014 - 16:10

Our thanks to Mayur Rustagi (@mayur_rustagi), CTO at Sigmoid Analytics, for allowing us to re-publish his post about the Spork (Pig-on-Spark) project below. (Related: Read about the ongoing upstream effort to bring Spark-based data processing to Hive here.)

Analysts can talk about data insights all day (and night), but the reality is that 70% of all data analyst time goes into data processing and not analysis. At Sigmoid Analytics, we want to streamline this data processing pipeline so that analysts can truly focus on value generation and not data preparation.

We focus our efforts on three simple initiatives:

  • Make data processing more powerful
  • Make data processing more simple
  • Make data processing 100x faster than before

As a data mashing platform, the first key initiative is to combine the power and simplicity of Apache Pig on Apache Spark, making existing ETL pipelines 100x faster than before. We do that via a unique mix of our operator toolkit, called DataDoctor, and Spark.

DataDoctor is a high-level operator DSL on top of Spark. It has frameworks for non-symmetrical joins, sorting, grouping, and embedding native Spark functions. It hides a lot of complexity and makes it simple to implement data operators used in applications like Pig and Apache Hive on Spark.

For the uninitiated, Spark is open source Big Data infrastructure that enables distributed fault-tolerant in-memory computation. As the kernel for the distributed computation, it empowers developers to write testable, readable, and powerful Big Data applications in a number of languages including Python, Java, and Scala.

How Can I Get Started?

For a user of Apache Pig, the migration effort starts and ends with:

pig -x spark

 

All your existing UDFs, Pig scripts, and data loaders will work out of the box on Spark — which means you can write simpler, easier-to-develop-and-manage data pipelines on Spark. The Pig REPL is a simple way to speed up your data processing on Spark without any coding, compiling, or development effort. What’s more, you have thousands of Pig UDFs to choose from to bootstrap your ETL process on Spark.

High-Level Design

Pig operates in a similar manner to other Big Data applications like Hive and Cascading. It has a query language quite akin to SQL that allows analysts and developers to design and write data flows. The query language is translated into a “logical plan” that is further translated into a “physical plan” containing operators. Those operators are then run on the designated execution engine (MapReduce, Apache Tez, and now Spark). There are a whole bunch of details around tracking progress, handling errors, and so on that I will skip here.

Query Planning

Query planning on Spark will vary significantly from MapReduce, as Spark handles data wrangling in a much more optimized way. Further, query planning can benefit greatly from the ongoing effort on Catalyst inside Spark. At this moment, we have simply introduced a SparkPlanner that undertakes the conversion from logical to physical plan for Pig. Databricks is working actively to enable Catalyst to handle many of the operator optimizations that will plug into SparkPlanner in the near future. Longer term, we plan to rely on Spark itself for logical plan generation. An early version of this integration has been prototyped in partnership with Databricks.

Pig Launcher

Pig Core hands off Spark execution to SparkLauncher with the physical plan. SparkLauncher creates a SparkContext providing all the Pig dependency jars and Pig itself.

SparkLauncher gets an MR plan object created from the physical plan. At this point, we recursively override all the Pig operators in the plan with DataDoctor operators. Two iterations are performed over the plan — one that looks at the store operations and recursively travels down the execution tree, and a second that does a breadth-first traversal over the plan and calls convert on each of the operators.

The base class of converters in DataDoctor is the POConverter class, which defines the abstract method convert, called during plan execution.

Interesting Operators
  • LoadOperator: An RDD is created for the data that can be used for subsequent transformations. LoadConverter helps load data from HDFS using the Spark API with parameters initialized from POLoad operator.
  • StoreOperator: This operator is useful for saving the end results or some intermediate data whenever required. StoreConverter is used to save data to HDFS with parameters from POStore operator.
  • Local rearrange: LocalRearrangeConverter directly passes data to POLocalRearrangeConverter, which in turn transforms data into the required format. This happens through the Spark map API. The local rearrange operator is part of the COGROUP implementation. It has an embedded physical plan that generates tuples of the form (grpKey, (indexed input Tuple)).
  • Global rearrange: GlobalRearrangeConverter is used in case of a groupBy operation or a join operation; the converter method uses groupBy and map APIs from Spark to achieve that. In the case of a groupBy operation, results are converted into the form (key, Iterator(values)). In the case of a COGROUP operation, results are in the form (index, key, value).

You can catch the finer details of the migration plan in PIG-4059, or give Pig on Spark a go at our GitHub repo. We know it’s not perfect, so you can file issues there as well while we get the Apache JIRA into shape.

Status

I am happy to announce that we have passed 100% of Pig’s end-to-end test cases, which means all your Pig code should already run pretty smoothly. Once this work is merged into the Pig repository, you will be able to get builds directly from the Pig website as well.

All this would not have been possible without the hard work from many organizations and people: Praveen R (Sigmoid Analytics), Akhil Das (Sigmoid Analytics), Kamal Banga (Sigmoid Analytics), Anish Haldiya (Sigmoid Analytics), Mayur Rustagi (Sigmoid Analytics), Amit Kumar Behera (Sigmoid Analytics), Mahesh Kalakoti (Sigmoid Analytics), Julien Le Dem (Twitter),  Bill Graham (Twitter), Dmitriy Ryaboy (Twitter), Aniket Mokashi (Google), and Greg Owen (Databricks).

Future Plans

Finally, as we merge to Apache Pig, we are focusing on the following enhancements to further improve the speed on Pig:

  • Cache Operator: Adding a new operator to explicitly hint Spark to cache certain datasets for faster execution
  • Storage Hints: Allowing users to specify the storage location of datasets in Spark for better control of memory
  • YARN and Mesos Support: Adding resource manager support for more global deployment and support

Mayur Rustagi has four years of experience in building end-to-end architecture for big data applications. He is currently the CTO of Sigmoid Analytics, which is focused on Real Time Streaming & ETL solutions on Apache Spark.

 

Categories: Hadoop

How-to: Translate from MapReduce to Apache Spark

Cloudera Blog - Tue, 09/02/2014 - 15:46

The key to getting the most out of Spark is to understand the differences between its RDD API and the original Mapper and Reducer API.

Venerable MapReduce has been Apache Hadoop‘s workhorse computation paradigm since its inception. It is ideal for the kinds of work for which Hadoop was originally designed: large-scale log processing and batch-oriented ETL (extract-transform-load) operations.

As Hadoop’s usage has broadened, it has become clear that MapReduce is not the best framework for all computations. Hadoop has made room for alternative architectures by extracting resource management into its own first-class component, YARN. And so, projects like Impala have been able to use new, specialized non-MapReduce architectures to add interactive SQL capability to the platform, for example.

Today, Apache Spark is another such alternative, and is said by many to succeed MapReduce as Hadoop’s general-purpose computation paradigm. But if MapReduce has been so useful, how can it suddenly be replaced? After all, there is still plenty of ETL-like work to be done on Hadoop, even if the platform now has other real-time capabilities as well.

Thankfully, it’s entirely possible to re-implement MapReduce-like computations in Spark. They can be simpler to maintain, and in some cases faster, thanks to Spark’s ability to optimize away spilling to disk. For MapReduce, re-implementation on Spark is a homecoming. Spark, after all, mimics Scala‘s functional programming style and APIs. And the very idea of MapReduce comes from the functional programming language LISP.

Although Spark’s primary abstraction, the RDD (Resilient Distributed Dataset), plainly exposes map() and reduce() operations, these are not the direct analog of Hadoop’s Mapper or Reducer APIs. This is often a stumbling block for developers looking to move Mapper and Reducer classes to Spark equivalents.

Viewed in comparison with classic functional-language implementations of map() and reduce() in Scala or Spark, the Mapper and Reducer APIs in Hadoop are actually both more flexible and more complex as a result. These differences may not even be apparent to developers accustomed to MapReduce, but the following behaviors are specific to Hadoop’s implementation rather than to the idea of MapReduce in the abstract:

  • Mappers and Reducers always use key-value pairs as input and output.
  • A Reducer reduces values per key only.
  • A Mapper or Reducer may emit 0, 1 or more key-value pairs for every input.
  • Mappers and Reducers may emit any arbitrary keys or values, not just subsets or transformations of those in the input.
  • Mapper and Reducer objects have a lifecycle that spans many map() and reduce() calls. They support a setup() and cleanup() method, which can be used to take actions before or after a batch of records is processed.

This post will briefly demonstrate how to recreate each of these within Spark — and also show that it’s not necessarily desirable to literally translate a Mapper and Reducer!

Key-Value Pairs as Tuples

Let’s say we need to compute the length of each line in a large text input, and report the count of lines by line length. In Hadoop MapReduce, this begins with a Mapper that produces key-value pairs in which the line length is the key, and count of 1 is the value:

public class LineLengthMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}

 

It’s worth noting that Mappers and Reducers only operate on key-value pairs. So the input to LineLengthMapper, provided by a TextInputFormat, is actually a pair containing the line as the value, with the position within the file thrown in as a key, for fun. (It’s rarely used, but something has to be the key.)

The Spark equivalent is:

lines.map(line => (line.length, 1))

 

In Spark, the input is an RDD of Strings only, not of key-value pairs. Spark’s representation of a key-value pair is a Scala tuple, created with the (a,b) syntax shown above. The result of the map() operation above is an RDD of (Int,Int) tuples. When an RDD contains tuples, it gains more methods, such as reduceByKey(), which will be essential to reproducing MapReduce behavior.

Reducer and reduce() versus reduceByKey()

To produce a count of line lengths, it’s necessary to sum the counts per length in a Reducer:

public class LineLengthReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}

 

The equivalent of the Mapper and Reducer above together is a one-liner in Spark:

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

 

Spark’s RDD API has a reduce() method, but it will reduce the entire set of key-value pairs to one single value. This is not what Hadoop MapReduce does. Instead, Reducers reduce all values for a key and emit a key along with the reduced value. reduceByKey() is the closer analog. But, that is not even the most direct equivalent in Spark; see groupByKey() below.

It is worth pointing out here that a Reducer’s reduce() method receives a stream of many values, and produces 0, 1 or more results. reduceByKey(), in contrast, accepts a function that turns exactly two values into exactly one — here, a simple addition function that maps two numbers to their sum. This associative function can be used to reduce many values to one for the caller. It is a simpler, narrower API for reducing values by key than what a Reducer exposes.

Mapper and map() versus flatMap()

Now, instead consider counting the occurrences of only words beginning with an uppercase character. For each line of text in the input, a Mapper might emit 0, 1 or many key-value pairs:

public class CountUppercaseMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    for (String word : line.toString().split(" ")) {
      if (Character.isUpperCase(word.charAt(0))) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}

 

The equivalent in Spark is:

lines.flatMap( _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1)) )

 

map() will not suffice here, because map() must produce exactly one output per input, but unlike before, one line needs to yield potentially many outputs. Again, the map() function in Spark is simpler and narrower compared to what the Mapper API supports.

The solution in Spark is to first map each line to an array of output values. The array may be empty, or have many values. Merely map()-ing lines to arrays would produce an RDD of arrays as the result, when the result should be the contents of those arrays. The result needs to be “flattened” afterward, and flatMap() does exactly this. Here, the array of words in the line is filtered and converted into tuples inside the function. In a case like this, it’s flatMap() that’s required to emulate such a Mapper, not map().

groupByKey()

It’s simple to write a Reducer that then adds up the counts for each word, as before. And in Spark, again, reduceByKey() could be used to sum counts per word. But what if for some reason the output has to contain the word in all uppercase, along with a count? In MapReduce, that’s:

public class CountUppercaseReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
}

 

But reduceByKey() by itself doesn’t quite work in Spark, since it preserves the original key. To emulate this in Spark, something even more like the Reducer API is needed. Recall that Reducer’s reduce() method receives a key and Iterable of values, and then emits some transformation of those. groupByKey() and a subsequent map() can achieve this:

... .groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }

 

groupByKey() merely collects all values for a key together, and does not apply a reduce function. From there, any transformation can be applied to the key and Iterable of values. Here, the key is transformed to uppercase, and the values are directly summed.

Be careful! groupByKey() works, but also collects all values for a key into memory. If a key is associated to many values, a worker could run out of memory. Although this is the most direct analog of a Reducer, it’s not necessarily the best choice in all cases. For example, Spark could have simply transformed the keys after a call to reduceByKey:

... .reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total) }

 

It’s better to let Spark manage the reduction rather than ask it to collect all values just for us to manually sum them.

setup() and cleanup()

In MapReduce, a Mapper and Reducer can declare a setup() method, called before any input is processed, to perhaps allocate an expensive resource like a database connection, and a cleanup() method to release the resource:

public class SetupCleanupMapper extends Mapper {
  private Connection dbConnection;

  @Override
  protected void setup(Context context) {
    dbConnection = ...;
  }

  ...

  @Override
  protected void cleanup(Context context) {
    dbConnection.close();
  }
}

 

The Spark map() and flatMap() methods only operate on one input at a time though, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:

val dbConnection = ...
lines.map(... dbConnection.createStatement(...) ...)
dbConnection.close() // Wrong!

 

However, this fails for several reasons:

  • It puts the object dbConnection into the map function’s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.
  • map() is a transformation, rather than an action, and is lazily evaluated. The connection can’t be closed immediately here.
  • Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies.

In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark — it’s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It’s like a “bulk map” method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values.

Adding setup code is simple; adding cleanup code is harder because it remains difficult to detect when the transformed iterator has been fully evaluated. For example, this does not work:

lines.mapPartitions { valueIterator =>
  val dbConnection = ... // OK
  val transformedIterator = valueIterator.map(... dbConnection ...)
  dbConnection.close() // Still wrong! May not have evaluated iterator
  transformedIterator
}

 

A more complete formulation (HT Tobias Pfeiffer) is roughly:

lines.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    Iterator[...]()
  } else {
    val dbConnection = ...
    valueIterator.map { item =>
      val transformedItem = ...
      if (!valueIterator.hasNext) {
        dbConnection.close()
      }
      transformedItem
    }
  }
}

 

Although decidedly less elegant than previous translations, it can be done.

There is no flatMapPartitions() method. However, the same effect can be achieved by calling mapPartitions(), followed by a call to flatMap(a => a) to flatten.
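
A minimal sketch of that combination follows; the word-splitting logic is just a stand-in for whatever per-record work you need.

// Emulating a hypothetical flatMapPartitions(): each partition's iterator is mapped
// to an Iterator of collections, and flatMap(a => a) then flattens the collections.
val flattened = lines.mapPartitions { valueIterator =>
  // per-partition setup could go here, as in the previous example
  valueIterator.map(line => line.split(" ").toSeq)   // an Iterator of Seq[String]...
}.flatMap(a => a)                                    // ...flattened into one RDD of words

In practice this pattern is only worth the extra step when you also need the per-partition setup and cleanup behavior described above; otherwise a plain flatMap() is simpler.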

The equivalent of a Reducer with setup() and cleanup() is just a groupByKey() followed by a mapPartitions() call like the one above. Take note of the caveat about using groupByKey() above, though.

But Wait, There’s More

MapReduce developers will point out that there is yet more to the API that hasn’t been mentioned yet:

  • MapReduce supports a special type of Reducer, called a Combiner, that can reduce shuffled data size from a Mapper.
  • It also supports custom partitioning via a Partitioner, and custom grouping for the Reducer via a grouping Comparator.
  • The Context objects give access to a Counter API for accumulating statistics.
  • A Reducer always sees keys in sorted order within its lifecycle.
  • MapReduce has its own Writable serialization scheme.
  • Mappers and Reducers can emit multiple outputs at once.
  • MapReduce alone has tens of tuning parameters.

There are ways to implement or port these concepts into Spark, using APIs like Accumulator, methods like groupBy() and the partitioner argument available on several of these methods, Java or Kryo serialization, caching, and more. To keep this post brief, the remainder will be left to a follow-up post.
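As a small taste of one such port, a MapReduce-style Counter can be approximated with an accumulator. The following is only a minimal sketch, assuming the SparkContext sc and the lines RDD[String] from the earlier examples (Spark 1.x accumulator API):

// count empty lines as a side effect of an action, much like a Counter
val emptyLines = sc.accumulator(0)
lines.foreach { line =>
  if (line.trim.isEmpty) emptyLines += 1
}
println("Empty lines: " + emptyLines.value)

Like a Counter, the accumulator is incremented on the executors and its aggregated value is read back on the driver once the action completes.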

The concepts in MapReduce haven’t stopped being useful. They just now have a different, and potentially more powerful, implementation on Hadoop, in a functional language that better matches their functional roots. Understanding the differences between Spark’s RDD API and the original Mapper and Reducer APIs helps developers better understand how all of them truly work and how to use Spark’s counterparts to best advantage.

Sean Owen is Director of Data Science at Cloudera, an Apache Mahout committer/PMC member, and a Spark contributor.

Categories: Hadoop

Building Lambda Architecture with Spark Streaming

Cloudera Blog - Fri, 08/29/2014 - 15:28

The versatility of Apache Spark’s API for both batch/ETL and streaming workloads brings the promise of lambda architecture to the real world.

Few things help you concentrate like a last-minute change to a major project.

One time, after working with a customer for three weeks to design and implement a proof-of-concept data ingest pipeline, the customer’s chief architect told us:

You know, I really like the design – I like how data is validated on arrival. I like how we store the raw data to allow for exploratory analysis while giving the business analysts pre-computed aggregates for faster response times. I like how we automatically handle data that arrives late and changes to the data structure or algorithms.

But, he continued, I really wish there was a real-time component here. There is a one-hour delay between the point when data is collected until it’s available in our dashboards. I understand that this is to improve efficiency and protect us from unclean data. But for some of our use cases, being able to react immediately to new data is more important than being 100% certain of data validity.

Can we quickly add a real-time component to the POC? It will make the results much more impressive for our users.

Without directly articulating it, the architect was referring to what we call the lambda architecture – originally proposed by Nathan Marz – which usually combines batch and real-time components. One often needs both because data arriving in real-time has inherent issues: there is no guarantee that each event will arrive exactly once, so there may be duplicates that will add noise to the data. Data that arrives late due to network or server instability also routinely causes problems. The lambda architecture handles these issues by processing the data twice — once in the real-time view, and a second time in the batch process – to give you one view that is fast, and one that is reliable.

Why Spark?

But this approach comes with a cost: you’ll have to implement and maintain the same business logic in two different systems. For example, if your batch system is implemented with Apache Hive or Apache Pig and your real-time system is implemented with Apache Storm, you need to write and maintain the same aggregates in SQL and in Java. As Jay Kreps noted in his article “Questioning the Lambda Architecture,” this situation very quickly becomes a maintenance nightmare.

Had we implemented the customer’s POC system in Hive, I would have had to tell him: “No, there is not enough time left to re-implement our entire aggregation logic in Storm.” But fortunately, we were using Apache Spark, not Hive, for the customer’s aggregation logic.

Spark is well known as a framework for machine learning, but it is quite capable for ETL tasks as well. Spark has clean and easy-to-use APIs (far more readable and with less boilerplate code than MapReduce), and its REPL interface allows for fast prototyping of logic with business users. Obviously, no one complains when the aggregates execute significantly faster than they would with MapReduce.

But the biggest advantage Spark gave us in this case was Spark Streaming, which allowed us to re-use the same aggregates we wrote for our batch application on a real-time data stream. We didn’t need to re-implement the business logic, nor test and maintain a second code base. As a result, we could rapidly deploy a real-time component in the limited time left — and impress not just the users but also the developers and their management.

DIY

Here’s a quick and simple example of how this was done. (For simplicity, only the most important steps are included.) You can see the complete source code here.

  1. First, we wrote a function to implement the business logic. In this example, we want to count the number of errors per day in a collection of log events. Each log event comprises a date and time, followed by a log level, the logging process, and the actual message:

     14/08/07 19:19:26 INFO Executor: Finished task ID 11

    To count the number of errors per day, we need to filter by the log level and then count the number of messages for each day:

    def countErrors(rdd: RDD[String]): RDD[(String, Int)] = {
      rdd
        .filter(_.contains("ERROR"))      // Keep "ERROR" lines
        .map(s => (s.split(" ")(0), 1))   // Return tuple with date & count
        .reduceByKey(_ + _)               // Sum counts for each date
    }

    In the function we keep only the lines that contain “ERROR”, then use map to make the first word in each line (the date) the key. Then we run reduceByKey to count the number of errors for each day.

    As you can see, the function transforms one RDD into another. RDDs are Spark’s main data structure – essentially partitioned, resilient distributed collections. Spark hides the complexity of handling distributed collections from us, and we can work with them as we would with any other collection.

  2. We can use this function in a Spark ETL process to read data from HDFS to an RDD, count errors, and save the results to HDFS:

     

    val sc = new SparkContext(conf)
    val lines = sc.textFile(...)
    val errCount = countErrors(lines)
    errCount.saveAsTextFile(...)

    In this example we initialized a SparkContext to execute our code within a Spark cluster. (Note that this is not necessary if you use the Spark REPL, where the SparkContext is initialized automatically.) Once the SparkContext is initialized, we use it to read lines from a file into an RDD and then execute our error count function and save the result back to a file.

    The paths passed to sc.textFile and errCount.saveAsTextFile can point to files in HDFS (using hdfs://…), in the local filesystem, in Amazon S3, and so on.

  3. Now, suppose we can’t wait an entire day for the error counts, and need to publish updated results every minute during the day. We don’t have to re-implement the aggregation — we can just reuse it in our streaming code:

     

    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Create the DStream from data sent over the network
    val dStream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    // Count the errors in each RDD in the stream
    val errCountStream = dStream.transform(rdd => ErrorCount.countErrors(rdd))

    // Print out the current error count
    errCountStream.foreachRDD(rdd => {
      System.out.println("Errors this minute:%d".format(rdd.first()._2))
    })

    // Create a stream with the running error count
    val stateStream = errCountStream.updateStateByKey[Int](updateFunc)

    // Print the running error count
    stateStream.foreachRDD(rdd => {
      System.out.println("Errors today:%d".format(rdd.first()._2))
    })

    Once again, we are initializing a context – this time, a StreamingContext. The StreamingContext takes a stream of events (in this case from a network socket; a production architecture would use a reliable service like Apache Kafka instead) and turns them into a stream of RDDs.

    Each RDD represents a micro-batch of the stream. The duration of each micro-batch is configurable (in this case, 60-second batches), and serves to balance throughput (larger batches) against latency (smaller batches).

    We use transform on the DStream to apply our countErrors function to each RDD of lines from the stream, turning it into an RDD of (date, errorCount) pairs.

    For each RDD we output the error count for this specific batch, and use the same RDD to update a stream with running totals of the counts. We use this stream to print the running totals.

For simplicity you could print the output to the screen, but you can also save it to HDFS, Apache HBase, or Kafka, where real-time applications and users can consume it.
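For example, persisting each micro-batch of results to HDFS could look roughly like the sketch below (the paths are hypothetical; writing to HBase or Kafka would instead go through foreachRDD with the appropriate client):

// save each micro-batch of per-minute counts and running totals as text files
errCountStream.saveAsTextFiles("hdfs://namenode:8020/stats/errors-per-minute")
stateStream.saveAsTextFiles("hdfs://namenode:8020/stats/errors-running-total")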

Conclusion

To recap: Spark Streaming lets you implement your business logic function once, and then reuse the code in a batch ETL process as well as a streaming process. In the customer engagement I described previously, this versatility allowed us to very quickly implement (within hours) a real-time layer to complement the batch-processing one, impress users and management with a snazzy demo, and make our flight home. But it’s not just a short-term POC win. In the long term, our architecture will require less maintenance overhead and have a lower risk of errors resulting from duplicate code bases.

Acknowledgements

Thanks to Hari Shreedharan, Ted Malaska, Grant Henke, and Sean Owen for their valuable input and feedback.

Gwen Shapira is a Software Engineer (and former Solutions Architect) at Cloudera. She is also a co-author of the forthcoming book Hadoop Application Architectures from O’Reilly Media.

Categories: Hadoop

Bayesian Machine Learning on Apache Spark

Cloudera Blog - Wed, 08/27/2014 - 14:51

Markov Chain Monte Carlo methods are another example of useful statistical computation for Big Data that is capably enabled by Apache Spark.

During my internship at Cloudera, I have been working on integrating PyMC with Apache Spark. PyMC is an open source Python package that allows users to easily apply Bayesian machine learning methods to their data, while Spark is a new, general framework for distributed computing on Hadoop. Together, they provide a scalable framework for Markov Chain Monte Carlo (MCMC) methods. In this blog post, I am going to describe my work on distributing large-scale graphical models and MCMC computation.

Markov Chain Monte Carlo Methods

MCMC methods are a set of widely-used algorithms in Bayesian inference. These methods mainly aim to approximate an intractable posterior function by random sampling, and their applications can be found in fields like physics, system simulation, econometrics, and machine learning. Andrieu et al. [2] describe how MCMC is useful for computing integrals or optimizing functions in large-dimensional spaces, which in the context of machine learning translates into parameter estimation, prediction, and model selection. As the community is transitioning from processing medium-scale data to Big Data, being able to analyze large dimensional data is becoming more significant than ever.

There are ongoing efforts in academia to implement MCMC methods in distributed and parallel settings. During my internship at Cloudera, I tried to adapt the PyMC framework to Spark in order to run multiple MCMC chains on distributed data, while maintaining PyMC’s convenient abstractions for computing on probabilistic graphical models (PGMs).

In lieu of covering the theoretical background of MCMC methods, below are a few useful resources for interested readers:

  • Bayesian Reasoning and Machine Learning by David Barber has a chapter on Approximate Sampling
  • Christophe Andrieu et al. have written an introductory tutorial (pdf) on MCMC methods that covers most of the MCMC algorithms
  • Dr. Daphne Koller offers an online course on Coursera, Probabilistic Graphical Models, which also covers the Gibbs Sampler and the Metropolis-Hastings Algorithm
  • Dr. A. Taylan Cemgil has prepared very useful lecture notes (pdf) for his Monte Carlo methods course
PyMC

PyMC is a widely used Python package that provides a library of tools for defining Bayesian statistical models and applying approximate inference techniques to them. It provides a well-designed abstraction for defining statistical models and for analyzing the sampled data after the sampling process.

To introduce the PyMC user API, we will follow the great introductory example provided in PyMC’s User’s Guide: analyzing the coal mining dataset [1]. The dataset contains the number of coal-mining disasters in England from 1851 to 1962. What we would like to find out from this dataset is the year in which a dramatic change occurred in the number of deaths caused by these disasters.

This dataset is composed of the number of events (deaths from coal mine disasters) that occur each year in the range. It is natural to model the probability distribution of the number of events with a Poisson distribution. The switchpoint, which is assumed to be distributed uniformly along the year domain, divides the data into two regions where each region uses a different mean for its Poisson distribution. Furthermore, it is assumed that both of the means follow an Exponential distribution.

Defining the parameters of the model is easy and straightforward (the original code is in disaster_model.py):

from pymc import *
from numpy import array, empty
from numpy.random import randint

disasters_array = array([4, 5, 4, 0, 1, 4, 3, 4, 0, 6, 3, 3, 4, 0, 2, 6,
                         3, 3, 5, 4, 5, 3, 1, 4, 4, 1, 5, 5, 3, 4, 2, 5,
                         2, 2, 3, 4, 2, 1, 3, 2, 2, 1, 1, 1, 1, 3, 0, 0,
                         1, 0, 1, 1, 0, 0, 3, 1, 0, 3, 2, 2, 0, 1, 1, 1,
                         0, 1, 0, 1, 0, 0, 0, 2, 1, 0, 0, 0, 1, 1, 0, 2,
                         3, 3, 1, 1, 2, 1, 1, 1, 1, 2, 4, 2, 0, 0, 1, 4,
                         0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1])

switchpoint = DiscreteUniform(
    'switchpoint', lower=0, upper=110, doc='Switchpoint[year]')

early_mean = Exponential('early_mean', beta=1.)
late_mean = Exponential('late_mean', beta=1.)

@deterministic(plot=False)
def rate(s=switchpoint, e=early_mean, l=late_mean):
    ''' Concatenate Poisson means '''
    out = empty(len(disasters_array))
    out[:s] = e
    out[s:] = l
    return out

disasters = Poisson('disasters', mu=rate, value=disasters_array, observed=True)

The model defines four stochastic variables

  1. switchpoint: the year that the change occurred; Uniform distribution
  2. early_mean: the average deaths per year before the change; Exponential distribution
  3. late_mean: the average deaths per year after the change; Exponential distribution
  4. disasters: this is the variable that generates the disasters data conditioned on switchpoint, early_mean, and late_mean; Poisson distribution

and a single deterministic variable, rate, which takes on the value of the early or late rate value depending on the switchpoint value.

After defining the model, we can learn the posterior distribution for switchpoint by sampling from the model:

m = MCMC([disasters, rate, switchpoint, early_mean, late_mean])
m.sample(500, burn=100)

The Python code above performs the inference by using an MCMC algorithm to draw 500 samples from our model (where the first 100 samples are ignored as a “burn-in period”). We can view the samples like so:

m.trace('switchpoint')[:]

 

Although PyMC holds the sampled values in memory by default, it also provides other “backend” options: “txt”, “pickle”, “sqlite”, and “hdf5”. For instance, one can save the sampled data to a text file:

m = MCMC([disasters, rate, switchpoint, early_mean, late_mean],
         db='txt', dbname='path/to/text_file')
m.sample(500, burn=100)

The best way to dive deeper into PyMC is to read the package’s User’s Guide.  Additionally, the examples folder in the source code contains very useful introductory code to start with.

Apache Spark

Apache Spark is a popular distributed computing framework that was started as a research project at UC Berkeley’s AMPLab [6]. The most prominent difference between Spark and the MapReduce paradigm is that Spark can keep the data in distributed memory. Because MCMC methods consist of iterative computations, Spark would be the most suitable platform for running MCMC jobs in parallel.

For further information, Sean Owen’s blog post explains why Spark is a great platform for data scientists.

PyMC on Spark

By its design, PyMC runs on a single computer, which limits the size of the data that can be analyzed. This summer, I’ve been working on integrating PyMC and Spark, and providing users a PyMC-based abstraction to run their MCMC algorithms distributed on Spark. The development version of this project can be found under the public repository pymc.

In order to accomplish the above goal, I started with implementing another backend option, HDFS, to enable users to save their traces to HDFS as txt files. The only dependency for the HDFS backend is PyWebHDFS, which can be installed via pip.

If we follow the disaster model example, we can save the results to HDFS as follows:

import pymc as pm
# an alternative way to load the same model:
from pymc.examples import disaster_model

# The HDFS path to save the traces (without the leading '/')
dbname = 'user/test/results'

# HDFS Configuration
host = 'localhost'
port = '50070'
user_name = 'test'

# Create and save data to HDFS
M = pm.MCMC(disaster_model, db='hdfs', dbname=dbname,
            host=host, port=port, user_name=user_name)
M.sample(100)
M.db.close()

# Load data from HDFS
db = pm.database.hdfs.load(dirname=dbname, host=host, port=port,
                           user_name=user_name)
print db.trace('early_mean')[:]

In the above example, computation is done on a local machine and the data is sent over the network to HDFS for reads and writes. Instead, we would like to achieve distributed computation for big models that can’t fit on, or would be slow to run on, a single machine (MCMC methods are largely CPU bound). Therefore, I have implemented DistributedMCMC, a class which fits distributed models, along with distributed_spark, a backend class for storing the samples (analogous to other PyMC backends).

The DistributedMCMC class works by running MCMC chains locally on partitions of data with periodic global synchronizations. It can be instantiated with custom

  • PyMC models
  • Step methods
  • Data-preprocessing functions to avoid recomputing derived data on each iteration
  • Factory functions that create broadcasted global parameters
  • Storage functions for serializing sampled data to HDFS rather than storing them in memory (vital for large sampled data)

The purpose of the global parameter is to enable the executors to remain synchronized across nodes and to ensure that the samples are being drawn from the true posterior. The number of local steps to take before every synchronization is also parameterized as local_iter. Since the sampling procedure is paused to synchronize the distributed models, a global update is an expensive operation. Therefore, the user should set the local_iter as high as their model allows, though most of the distributed models require frequent synchronizations.

To illustrate the distributed model better, let’s review an example.

Topic Modeling with MCMC

Given a set of documents and number of topics, topic modeling is a collection of statistical modeling algorithms that clusters the documents according to their topics. Examples of such models are probabilistic latent semantic indexing, non-negative matrix factorization, latent Dirichlet allocation (LDA), and hierarchical Dirichlet processes. The idea behind topic modeling is to consider topics as a probability distribution over words and documents as a mixture of topics. In this section, I will dive into the LDA model I implemented with PyMC and Spark. Feel free to skip ahead to the next section.

Latent Dirichlet Allocation

LDA was first proposed in a paper by David M. Blei, Andrew Y. Ng, and Michael I. Jordan in 2003 [3]. As defined in the paper, the terms are:

  • φ_{t,w} is the probability of topic t generating word w
  • θ_{d,t} is the probability of sampling topic t for document d
  • x_{d,i} is the ith word in the dth document
  • z_{d,i} is the topic assignment for the ith word in the dth document

LDA is a generative model which assumes the following generative process:

  • φ_t ~ Dirichlet(β)
  • θ_d ~ Dirichlet(α)
  • z_{d,i} ~ Categorical(θ_d)
  • x_{d,i} ~ Categorical(φ_{z_{d,i}})

Since LDA is a generative model, one can model a corpus of documents as well as generate some documents given the parameters.

For a more detailed interpretation of LDA, Probabilistic Topic Models by Mark Steyvers and Tom Griffiths is a very informative article.

Collapsed Gibbs Sampling

We can apply any MCMC sampling algorithm to a corpus of documents to extract the hidden topics; however, a typical LDA model contains a large number of nodes, which can hurt the convergence rate. To prevent slow convergence, Steyvers and Griffiths [4] proposed a Collapsed Gibbs Sampler for LDA, in which the latent variables φ and θ are integrated out and the topic for each word in the corpus is sampled conditioned on all other variables. Therefore, instead of the regular Metropolis-Hastings method, I implemented the collapsed Gibbs sampler for the LDA model. Griffiths and Steyvers have shown [3] that the conditional probability of sampling topic t as z_{d,i}, given all other topic assignments, is

P(z_{d,i} = t | z_{-(d,i)}, x) ∝ (n^{WT}_{w,t} + β) / (Σ_{w'} n^{WT}_{w',t} + Wβ) · (n^{DT}_{d,t} + α) / (Σ_{t'} n^{DT}_{d,t'} + Tα)

where n^{WT} is the matrix of word-topic counts excluding the current instance, n^{DT} is the matrix of document-topic counts not including the current topic assignment, W is the vocabulary size, and T is the number of topics.

Approximate Distributed LDA

The approximate distributed LDA (AD-LDA) model was first proposed by Newman et al. [5] and takes advantage of the weak dependency between topic assignments to different words to distribute the Gibbs sampling updates. After partitioning the corpus across the executors, the algorithm is composed of two parts:

  • Sample topics locally on each executor using the collapsed Gibbs sampler
  • Synchronize the word-topic counts

We don’t synchronize document-topic counts because each document is local to a single executor. On the other hand, the same vocabulary is used by all executors, so they need to synchronize the word-topic counts after a specified number of iterations of the local Gibbs sampler. The synchronization is performed by combining word-topic counts in a reducer (as illustrated in [5]), followed by broadcasting the updated word-topic counts to the executors using the SparkContext.broadcast() method.

Distributed LDA on Spark with PyMC

I’ve implemented the AD-LDA algorithm in the CollapsedDistributedLDA module. Because the code is involved, I will only outline the structure here.

The code begins by instantiating a DistributedMCMC object.  The supplied model_function will be run on each executor, returning an MCMC object that locally runs the MCMC using the instantiated stochastic variable, z (initialized randomly).  The supplied step_function defines the collapsed Gibbs sampling step method. Synchronization between the executors is achieved with the global_update function, in which a reducer combines the word-topic counts in each of the executors and then distributes the new matrix.

NIPS Dataset

We will be extracting topics from the NIPS data set. NIPS is the Conference on Neural Information Processing Systems, focused on machine learning and held annually. Sam Roweis has applied OCR techniques to the scanned papers that were published in the proceedings and made the raw data available here.

I have prepared a Python script to preprocess the corpus, which filters out stopwords (words that occur frequently but carry no semantic information, like “a”, “the”, “in”, “on”, etc.), applies lemmatization to the words, and removes infrequent words.

Results

The experiments were conducted on a cluster with 6 machines, and 12 executors on each of the machines. The total_partitions parameter was set to 72 (6 nodes x 12 executors) to maximize the parallelism of the DistributedMCMC class. As suggested in [4], alpha and beta were set to 0.1 and 0.01 respectively.

A total of 100 topics were extracted using the AD-LDA algorithm. Below are five random topics, each with the 10 words that have the highest probabilities in the corresponding topic-word distributions, φ:

Topic #14: mixture, em, density, p, data, estimation, likelihood, model, x, parameter
Topic #52: network, model, learning, neural, input, function, set, figure, training, system
Topic #47: rule, fuzzy, cell, extraction, symbolic, domain, knowledge, group, expert, shavlik
Topic #78: radio, objective, packet, optimization, channel, power, signal, multiscale, application, scale
Topic #96: matching, graph, match, object, point, objective, correspondence, constraint, matrix, distance

As illustrated in the table above, the topics formed by the LDA algorithm make sense: topic #14 is about maximum likelihood estimation, and topic #52 contains terms related to neural nets.

Besides inspecting the multinomial distribution over words for each topic, one can also compute the similarity between two documents, say document i and document j, by computing the Kullback-Leibler divergence of their corresponding topic distributions, KL(θ_i, θ_j) [4].
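Concretely, the KL divergence between the topic distributions of documents i and j is

KL(θ_i ‖ θ_j) = Σ_t θ_{i,t} · log( θ_{i,t} / θ_{j,t} ),

and because KL divergence is asymmetric, a symmetrized form such as (KL(θ_i ‖ θ_j) + KL(θ_j ‖ θ_i)) / 2 is commonly used as the document similarity measure.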

Replicating the Experiments

Here are some additional notes for those who want to replicate the experiments and run the AD-LDA model using PyMC on Spark.

  1. Install NLTK and the WordNet and Stopword corpora:

     import nltk
     nltk.download()

    and select the relevant corpora from the popup.

  2. Clone the repository from https://github.com/mertterzihan/pymc (make sure you’re on the pyspark branch, which should be the default) and compile it by running the following commands:

     python setup.py config_fc --fcompiler gfortran build
     python setup.py install
     python setupegg.py bdist_egg

    The final step creates an egg file that will be distributed to the workers with Spark’s broadcast method.

  3. Download the NIPS dataset from here and then run preprocess_nips.py to preprocess the raw data. Make sure that the parent_folder and destination_file variables point to the folder that contains the raw data and to the desired output file location, respectively. This script requires NLTK and the WordNet and Stopword corpora to be downloaded.
  4. Make sure you run Spark >= 1.0.2.  Earlier versions throw an exception when trying to run PyMC. Check out these instructions for running a custom version of Spark on YARN.
  5. Inside the PySpark IPython shell, copy and paste CollapsedDistributedLDA. Make sure to change the path variable (location of the NIPS dataset), the egg file path (location of egg file formed after compiling PyMC), and path variable in the save_traces method. You can change the number of partitions, number of topics, number of local iterations (total iterations of Gibbs sampler between two consecutive global updates), and number of total iterations to obtain various results to find the best set of parameters.
Future Work

Future work would include implementing other large scale graphical models and sampling algorithms, and experimenting on a wide range of datasets. Moreover, I’ve been in contact with the authors of PyMC regarding contributing the distributed MCMC code to the project.

Acknowledgments

I would like to thank my manager Josh Wills, my mentor Uri Laserson, Sean Owen, and Sandy Ryza for their invaluable guidance throughout my project. My internship at Cloudera has been a great experience, and I can’t thank them enough for this wonderful opportunity.

References

[1] R. G. Jarrett. A note on the intervals between coal mining disasters. Biometrika, 66:191-193, 1979.
[2] C. Andrieu, N. de Freitas, A. Doucet, and M. I. Jordan. An Introduction to MCMC for Machine Learning. Machine Learning, 50(1-2): 5-43, January 2003.
[3] D. M. Blei, A. Y. Ng, and M. I. Jordan. Latent Dirichlet Allocation. Journal of Machine Learning Research, 3:993-1022, March 2003.
[4] M. Steyvers, and T. Griffiths. Probabilistic Topic Models. Handbook of Latent Semantic Analysis, 427(7):424-440, 2007.
[5] D. Newman, A. Asuncion, P. Smyth, and M. Welling. Distributed Inference for Latent Dirichlet Allocation. Advances in Neural Information Processing Systems, 1081-1088, 2007.
[6] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: Cluster Computing with Working Sets. Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 10:10, 2010.
[7] D. Barber. Bayesian Reasoning and Machine Learning. Cambridge University Press, 2012.

Mert Terzihan is a summer intern on the Data Science team at Cloudera.

Categories: Hadoop

What’s Next for Impala: Focus on Advanced SQL Functionality

Cloudera Blog - Tue, 08/26/2014 - 15:27

Impala 2.0 will add much more complete SQL functionality to what is already the fastest SQL-on-Hadoop solution available.

In September 2013, we provided a roadmap for Impala — the open source MPP SQL query engine for Apache Hadoop, which was on release 1.1 at the time — that documented planned functionality through release 2.0 and beyond.

Impala is now on release 1.4, with many major features delivered since our previous roadmap update, and adoption is at an all-time high: it’s been downloaded by 10,000 unique organizations since January 2013, is in use by most of Cloudera’s enterprise data hub customers, and is shipped by MapR, Amazon, and inside the Oracle Big Data Appliance in addition to Cloudera. For these reasons, it seems like a good time to elaborate on the 2.x roadmap.

First, let’s recap what has been delivered since 1.1. Then, we’ll follow with a list of the substantial new features, mainly in the area of SQL functionality, planned for Impala 2.0 and a few of the features beyond.

Delivered Thus Far

Impala 1.2 (Shipped Oct. 2013)
  • UDFs and extensibility – enables users to add their own custom functionality; Impala will support existing Hive Java UDFs as well as high-performance native UDFs and UDAFs
  • Automatic metadata refresh – enables new tables and data to seamlessly be available for Impala queries as they are added, without having to issue a manual refresh on each Impala node
  • Cost-based join order optimization – frees the user from having to guess the correct join order
  • Additional authentication mechanisms – including the ability to specify Active Directory username/passwords in addition to the already supported Kerberos authentication

Impala 1.3 (Shipped May 2014)
  • Admission Control – allows prioritization and queueing of queries within Impala
  • Preview of YARN-integrated resource manager (CDH 5.0) — allows prioritization of workloads at a finer granularity than the service-level isolation currently provided in Cloudera Manager
  • Improved memory consumption at higher scale – allows for greater multi-user concurrency with lower memory footprints
Impala 1.4 (Shipped July 2014)
  • In-memory HDFS caching (CDH 5.1 or higher) via Impala DDL – allows access to frequently accessed Hadoop data at in-memory speeds
  • DECIMAL data type – allows Impala to query fixed-precision numeric data
  • Faster COMPUTE STATS – 5x faster statistics capture than previous releases
  • Additional built-ins from traditional databases – easier migration with some common SQL language extensions like statistics functions such as TRUNC and EXTRACT
  • ORDER BY without LIMIT clauses – allows easier migration of existing queries without having to fit in memory or requiring LIMIT clauses
  • Improved performance for selective joins – improvements in such queries by over 2x compared to previous versions of Impala
  • Enhanced, production-ready, YARN-integrated resource manager (CDH 5.1 and later)
To Be Delivered by Impala 2.x

Impala 2.0, scheduled for release by the end of 2014, is the most significant milestone since GA. It will add the most popular SQL analytic language features on top of what has already been demonstrated to be not only the fastest SQL-on-Hadoop solution (by at least 950% compared to Shark, “Stinger,” and Presto) but, more important, one that has been documented by multiple customers as performing on the same level as traditional MPP query engines yet doing so on Hadoop-native data sets. Essentially, the Impala 2.0 milestone marks the point at which Hadoop users will get the “whole package”: the expected SQL support and performance of commercial MPP-query engines, running natively on Hadoop.

Impala 2.0 (Ships in Fall 2014)
  • SQL 2003-compliant analytic window functions (aggregation OVER PARTITION, RANK, LEAD, LAG, NTILE, and so on) – to provide more advanced SQL analytic capabilities
  • External joins and aggregations using disk – enables operations to spill to disk if their internal state exceeds the aggregate memory size
  • Subqueries inside WHERE clauses
  • Incremental statistics – only run statistics on the new or changed data for even faster statistics computations
  • Additional data types – including VARCHAR, CHAR
  • Additional built-in functions – enables easier migration of custom language extensions for users of traditional SQL engines
Impala 2.1 and Beyond (Ships in 2015)
  • Nested data – enables queries on complex nested structures including maps, structs, and arrays (early 2015)
  • MERGE statement – enables merging in updates into existing tables
  • Additional analytic SQL functionality – ROLLUP, CUBE, and GROUPING SET
  • SQL SET operators – MINUS, INTERSECT
  • Apache HBase CRUD – allows use of Impala for inserts and updates into HBase
  • UDTFs (user-defined table functions) – for more advanced user functions and extensibility
  • Intra-node parallelized aggregations and joins – to provide even faster joins and aggregations on top of the performance gains of Impala
  • Parquet enhancements – continued performance gains including index pages
  • Amazon S3 integration
Conclusion

From the outset, we described the Impala journey as one that would take its users beyond the limits of what they thought Hadoop could do by offering the performance and SQL capabilities of traditional analytic DBMSs natively on Hadoop data. The functionality delivered thus far has certainly done that in terms of performance, and with the features planned for Impala 2.0, we’re confident it will do the same with respect to SQL functionality.

As we’ve written before, thanks to these features, Impala uniquely delivers on requirements for BI and SQL analytics in enterprise data hubs by blending:

  • Low-latency queries for a BI user experience
  • Ability to handle highly-concurrent workloads
  • Efficient resource usage in a shared workload environment (via YARN)
  • Open formats for accessing any data from any native Hadoop engine
  • Multi-vendor support to avoid lock-in, and
  • Broad ISV support

As always, we welcome your comments and feedback!

Justin Erickson is Director of Product Management at Cloudera.

Marcel Kornacker is Impala’s architect and the Impala tech lead at Cloudera.

Categories: Hadoop

Improving Query Performance Using Partitioning in Apache Hive

Cloudera Blog - Fri, 08/22/2014 - 16:02

Our thanks to Quaero, for allowing us to re-publish the post below about its experiences using partitioning in Apache Hive.

In this post, we will talk about how we can use the partitioning features available in Hive to improve performance of Hive queries.

Partitions

Hive is a good tool for performing queries on large datasets, especially datasets that require full table scans. But quite often there are instances where users need to filter the data on specific column values. Generally, Hive users know the domain of the data they deal with. With this knowledge, they can identify frequently queried columns with low cardinality that can be used to organize the data using Hive’s partitioning feature. In non-partitioned tables, Hive would have to read all the files in a table’s data directory and then apply filters to them. This is slow and expensive—especially for large tables.

The concept of partitioning is not new for folks who are familiar with relational databases. Partitions are essentially horizontal slices of data which allow larger sets of data to be separated into more manageable chunks. In Hive, partitioning is supported for both managed and external tables in the table definition as seen below.

CREATE TABLE REGISTRATION_DATA (
  userid BIGINT,
  First_Name STRING,
  Last_Name STRING,
  address1 STRING,
  address2 STRING,
  city STRING,
  zip_code STRING,
  state STRING
)
PARTITIONED BY (
  REGION STRING,
  COUNTRY STRING
)

As you can see, multi-column partitioning is supported (REGION/COUNTRY). You do not need to include the partition columns in the list of regular columns, and you can still use them in your query projections. The PARTITIONED BY clause lets Hive alter the way it manages the underlying structures of the table’s data directory. For a non-partitioned table, all the data files are written directly into the table’s data directory. For partitioned tables, subdirectories are created under the table’s data directory for each unique value of a partition column. If the table is partitioned on multiple columns, Hive creates nested subdirectories based on the order of the partition columns in the table definition. For instance, for the registration data table above, the subdirectories will look like the example below.

/quaero.db/registration-data/region=South America/country=BR
/quaero.db/registration-data/region=South America/country=ME
/quaero.db/registration-data/region=North America/country=US
/quaero.db/registration-data/region=North America/country=CA

When a partitioned table is queried with one or both partition columns in the WHERE clause (or other filter criteria), Hive effectively performs partition elimination, scanning only the data directories that are needed. If no partition columns are used in the filter, then all the directories are scanned (a full table scan) and partitioning has no effect.

Pointers

A few things to keep in mind when using partitioning:

  • It’s important to consider the cardinality of the column that will be partitioned on. Selecting a column with high cardinality will result in fragmentation of data and put strain on the name node to manage all the underlying structures in HDFS.
  • Do not over-partition the data. With too many small partitions, the task of recursively scanning the directories becomes more expensive than a full table scan of the table.
  • Partitioning columns should be selected such that they result in roughly similar-sized partitions, in order to prevent a single long-running task from holding things up.
  • If hive.exec.dynamic.partition.mode is set to strict, then you need to do at least one static partition. In non-strict mode, all partitions are allowed to be dynamic.
  • If your partitioned table is very large, you could block any full table scan queries by putting Hive into strict mode using the set hive.mapred.mode=strict command. In this mode, when users submit a query that would result in a full table scan (i.e. queries without any partitioned columns) an error is issued.

Quaero’s data management platform (QDMP) uses partitioning extensively and we have greatly benefited from it. QDMP uses an identifier called “dataset instance id” to identify a chunk of data that flows through the system. The system also uses this column to collect stats about the data as it passes through various phases. Partitioning the tables of QDMP on this column, along with using Apache Parquet (incubating) as the storage format, helped us cut down the run times of our workflows by almost half.

This improvement was especially evident in the case of tables that were holding large historical data — prior to partitioning, a full table scan of these tables was done in order to collect the stats. Partitioning also enabled us to selectively expire portions of data without having to rebuild the table. In addition, we also partitioned our embedded analytics tables that are frequently queried upon by analytics team members. In this case, we selected the candidate columns for partitioning after analyzing the data query patterns.

In conclusion, in our experience, using Hive partitioning in the right context and on appropriate columns will help a data management platform be much more efficient.

Categories: Hadoop

The New Apache Flume Book is in Early Release

Cloudera Blog - Wed, 08/20/2014 - 15:56

Congratulations to Hari Shreedharan, Cloudera software engineer and Apache Flume committer/PMC member, for the early release of his new O’Reilly Media book, Using Flume: Stream Data into HDFS and HBase. It’s the seventh Hadoop ecosystem book so far that was authored by a current or former Cloudera employee (but who’s counting?).

Why did you decide to write this book?

I have been working on Apache Flume for the past two years, and have been actively responding to user and developer queries on the developer and user lists on Apache and Cloudera. Even though Flume and its components are pretty well documented, I realized that having a book that documented each component in detail and explained end-to-end deployment would really help users. There were a lot of lessons that I learned over the years building Flume and working with customers who have deployed Flume on thousands of servers. I felt that a book on Flume would be a good place to share these lessons.

Who should read this book?

The book is essentially meant for operations engineers who are planning to deploy or have already deployed Flume, and for developers who want to build custom Flume components for their specific use cases.

Most sections of the book cover the configuration and operational aspects of Flume that can help operations engineers deploy and configure Flume. I have tried to share most of the lessons I learnt helping customers and users deploy and configure Flume in production.

Flume is highly customizable. This allows developers who want to customize Flume to write their own plugins. In this book, I describe, with examples, how to implement plugins for various Flume components.

What are your favorite things about Flume that you want people to know?

Flume is extremely flexible by design. Literally every major component in a Flume agent is pluggable, and users can deploy their own implementations. This leads to a wide variety of use cases that even we, as developers, did not expect to see. Custom formats, use-case-specific event modification, lightweight processing, and so on can be done easily in Flume by simply dropping in plugins.

What are some other things that the Flume community can do to make Hadoop data ingestion easier?

One of the things I hope will be added to Flume is a centralized configuration mechanism that allows the user to deploy the configuration in one place rather than on every single machine. Cloudera Manager added this functionality some time back, but it would still be nice to see this happen within Flume itself. There is work going on in the Apache community to integrate this feature into Flume (FLUME-1491). Once this gets committed, Flume configuration will become much easier.

Categories: Hadoop

Big Data Benchmarks: Toward Real-Life Use Cases

Cloudera Blog - Tue, 08/19/2014 - 15:49

The Transaction Processing Council (TPC), working with Cloudera, recently announced the new TPCx-HS benchmark, a good first step toward providing a Big Data benchmark.

In this interview by Roberto Zicari with Francois Raab, the original author of the TPC-C Benchmark, and Yanpei Chen, a Performance Engineer at Cloudera, the interviewees share their thoughts on the next step for benchmarks that reflect real-world use cases.

This interview was originally published at ODBMS.org; thanks to Roberto for his permission to republish.

There have been a number of attempts at constructing big data benchmarks. None of them has yet gained wide recognition and usage. Why?

Yanpei: Many big data benchmarks are just like big data systems – new, and with room to improve and grow. In more detail, big data systems:

  • rapidly evolve, so it’s important to define performance in ways that matter for end customers.
  • consist of many interdependent components, so it’s difficult to measure performance in a reliable fashion.
  • service diverse business needs using diverse implementations, so benchmarks need to accommodate different system implementations.

Francois: It’s unlikely that a big data benchmark will gain wide recognition until a clear “playing field” has emerged and focused the competitive pressure. There are three phases in the evolution of a new technology. First, the technology is introduced and applied to a wide array of solutions without a proven return on investment. Next, a “killer app” emerges from the early adopters and its rapid growth draws all the vendors into competing on a common playing field. Lastly, some technologies emerge as clear winners in the race and the market start to consolidate around a few dominant vendors. Big data has not entered the second phase yet.

Is it possible to build a truly representative big data benchmark?

Yanpei: Absolutely! To me, the rise of “big data” in part comes from our increased ability to instrument, measure, and ultimately derive value from large scale systems – technology systems, financial systems, medical systems, or physical systems touching day-to-day life. Big data systems, as a special case of technology systems, also deal with ever increasing instrumentation and measurement. Over time, I am absolutely confident that we will increase our understanding of big data systems, and with it, improve the quality of our big data benchmarks.

Cloudera’s broad customers base gives us visibility into big data deployments across telecom, banking, retail, manufacturing, media, government, healthcare, and many other industry sectors. We’re in a great position to identify representative use cases. [Editor's note: Cloudera is actively working with the TPC on Big Data benchmarks that include SQL-on-Hadoop and other compute frameworks.]

Francois: A benchmark is a somewhat abstract (i.e. simplified) model of a real life scenario. The question we face today is to identify a scenario that Fortune 500 companies would widely recognize as relevant to their operations and vital to their competitive survival. Once that critical mass has been reached it will quickly spread to the entire commercial data processing landscape and a successful big data benchmark will be built based on that scenario.

How would you define a Big Data Benchmark ?

Yanpei: The key properties of good big data benchmarks are a re-cast of the same properties for benchmarks of more established systems.

A good big data benchmark should be representative of real-life use cases; it should generate performance insights immediately relevant to diverse and evolving big data use cases. The benchmark should also be scalable; it should stress big data systems today, as well as the vastly improved systems in the future. The benchmark should be portable, meaning it should accommodate systems with different implementations that achieve the same end-goal. The benchmark should also be verifiable, in that the results can be checked by independent auditors if needed, and end-users can reproduce on their own systems the winning configurations and result.

Can you give some examples of Successful Benchmarks ?

Yanpei: My co-author Francois was a lead contributor to TPC-C, a very successful benchmark for online transactional processing (OLTP). He can share other examples.

Francois: The success of a benchmark can be measured by its number of published results and by its longevity over shifts in the underlying technologies. By that measure TPC-C and TPC-H are leading the field. While it can be argued that they have lost relevance over their two decades lifetime, they still encapsulate critical elements at the core of the application domains they represent (transaction processing and decision support).

One of the main purposes of a benchmark is to evaluate and contrast the merits of various implementations of the same set of requirements. How do you do this with Big Data?

Yanpei: You construct benchmarks that are portable. In other words, you specify implementation-independent requirements.

Best illustrated by example – TPC-C. TPC-C specifies five operations – New Order, Payment, Delivery, Order-Status, and Stock-Level. It also describes the interdependencies between these operations. For example, every New Order will be accompanied by Payment, but only one in ten New Orders will trigger an Order Status. TPC-C describes the load that the system under test should handle – many concurrent operations arriving in randomized order with randomized inter-arrival time, but at controlled relative frequencies. TPC-C also specifies the initial content of all the datasets, as well as how the content grows over the execution of the benchmark. This is an implementation-independent set of requirements – “handle these operations on these data sets.” The underlying system could be a relational database, or a key-value store like HBase.

Francois: Benchmarks can be defined one of two ways: by creating a kit to be deployed on technology specific platforms or by specifying a set of technology agnostic requirements to be implemented at will. Because big data has first emerged from the MapReduce paradigm, we have seen a number of technology centric benchmarks (also called component benchmarks) that put a narrow focus on one or more components of a predefined solution. But we should soon expect to see a big data application emerge as the new must-have in commercial data centers.

In a recent position paper you argued for building future big data benchmarks using what you call a “functional workload model.” What is it?

Francois: We introduced a couple of terms in that position paper to highlight the core concepts underlying representative, scalable, portable, and verifiable big data benchmarks.

The “functional workload model” is a way to specify such benchmarks. It contains three things – the “functions of abstraction,” the load pattern serviced by the system, and the data sets being acted upon.

“Functions of abstraction” describes “what is being computed” without specifying “how the computation should be done.” The intent is an abstract, functional description that allows the benchmark to be portable across systems of different compute paradigms. “What is being computed” should be justified by empirical evidence, either system traces or industry-wide surveys, with emphasis on identifying the common computation goals.

The load pattern describes “what is the serviced load” without specifying “how it is serviced.” It outlines the execution frequency, distribution, arrival rate, bursts and averages over time of each individual function of abstraction.

The data sets describe “what is the data and the relationships within the data” without specifying “how it is represented.” It is in terms of the structure and interdependence between data elements, initial size and contents, how it evolves over the course of the workload execution, and how it is expected to scale with the system size and load volume.

These concepts help us routinely identify shortcomings in haphazardly specified benchmarks. For example, some of the most often-cited big data benchmarks contain artificial functions of abstraction that do not match any common use cases. Or, a multi-job, multi-query load pattern is missing altogether, or the data sets are represented in unrealistic formats that inflate performance advantages.

Why did you select TPC-C as a starting point for your work?

Yanpei: Because TPC-C already has a functional workload model within its specification. And because Francois wrote TPC-C.

Francois: The functional workload model is the underlying structure on which TPC-C was built. Subsequent TPC benchmarks, like TPC-H and TPC-E, were also built based on a functional workload model.

How does your functional workload model compares with TPC-C ?

Yanpei: TPC-C already uses the functional workload concept.

For your functions of abstractions concept to be useful, it must be applicable to different types of big data systems. Two important examples are relational databases and MapReduce. How do you do that? How does your work compare with other MapReduce-Specific Benchmarks ?

Yanpei: Best illustrated by example.

Suppose we discover that sorting data is a common operation in real-life production use cases. We would then define “sort” as a function of abstraction. We would define it in the same fashion as the official Sort Benchmark – the input data is of size X, format Y, and the system is asked to produce output sorted by order Z.

A relational database implementation could do, say, “insert into TABLE … ” followed by “select * from TABLE ordered by COLUMN”. A MapReduce implementation would use the IdentityMapper and IdentityReducer, and rely on the implicit shuffle-sort in MapReduce.

This is obvious for sort, because the sort operation has traditionally been defined in a system-independent way. In contrast, many of the existing MapReduce and relational database performance measurement tools are specified in ways that do not translate across different types of systems. The many SQL-on-Hadoop systems are fast removing that boundary. The functions-of-abstraction concept allows us to understand use cases at a level above any SQL-only or Hadoop-only specification.

What are in your opinion the Emerging Big Data Application Domains?

Francois: Everyone wants to figure out which application domain will become the big data killer app. Today, no commercial data center can live without on-line transaction processing or without decision support systems. Which big data application will become indispensable tomorrow? That is the million dollar question! Once we know that, a standard big data benchmark will soon follow.

Yanpei: The maturation of the Hadoop platform has been relentless. Its role has changed as the platform has gotten more secure, more reliable, more powerful, and (especially) more real-time. It’s no longer a system used for just big batch jobs. Instead, it has become the first place that data lands. It scales and it can store anything – no data need be discarded. It’s used to pre-process data before delivering it to an enterprise data warehouse, a document repository, an analytic engine, a CRM or ERP application, or other specialized system. Most significantly, it has begun to take over some of the work previously done by those traditional platforms, because it can do real-time search and analysis on the data directly, in place, and without further Extract-Transform-Load (ETL).

This leads to the emergence of the enterprise data hub (EDH), a new architecture to complement existing investments and help put data at the center of an organisation’s business. An enterprise data hub allows storage of any amount and type of data, for as long as is needed, and makes it accessible in any way needed. Additional necessary attributes of EDHs include: It’s Secure and Compliant, offering perimeter security and encryption, plus fine-grained (row and column-level), role-based access controls over data, just like a data warehouse. It’s Governed, enabling users to do data discovery, data auditing, and data lineage, thus understanding what data is in their EDH and how the data are used. It’s Unified and Manageable, providing native high-availability, fault-tolerance, self-healing storage, automated replication, and disaster recovery, as well as advanced workload management capabilities to enable multiple specialist systems to analyze the same data set. And it’s Open, ensuring that customers are not locked in to any particular vendor’s license agreement, that you can choose what tools to use with your EDH, and nobody can hold your data or applications hostage.

The emergence of EDHs poses both challenges and opportunities for defining big data benchmarks. As Francois alluded to, the representative scenarios typically involve application domains whose performance has traditionally been measured separately, as is the case for on-line transaction processing and decision support systems. How to define and measure performance for such concurrent application domains presents both a challenge and an opportunity. Further, to compare different EDHs, it becomes necessary to quantify characteristics that were previously yes/no checks – which is the more secure EDH? the better governed? the more unified and more manageable? the more open? How to quantify such characteristics will stretch our performance thinking and measurement methodology into new territory.

Future work?

Yanpei: We have a Performance Engineering Team at Cloudera. We insist on systematic, fair, and repeatable tests both for our internal performance assessment and competitive studies. We are also engaged with community efforts to define big data benchmarks. Look for our future posts on Cloudera Engineering’s Blog!

Further Reading:

Francois Raab is a recognized, award-winning expert in the field of performance engineering, benchmark design, and system testing. He is the original author of the TPC-C Benchmark, the most successful industry-standard measure of OLTP performance. He was also co-author of The Benchmark Handbook (pub. Morgan Kaufmann). He is currently the President of InfoSizing, Inc.

Yanpei Chen is a member of the Performance Engineering Team at Cloudera, where he works on internal and competitive performance measurement and optimization. His work touches upon multiple interconnected computation frameworks, including Cloudera Search, Impala, Apache Hadoop, Apache HBase, and Apache Hive. He is the lead author of the Statistical Workload Injector for MapReduce (SWIM), an open source tool that allows someone to synthesize and replay MapReduce production workloads.

 

Categories: Hadoop

Running CDH 5 on GlusterFS 3.3

Cloudera Blog - Mon, 08/18/2014 - 16:45

The following post was written by Jay Vyas (@jayunit100) and originally published in the Gluster.org Community.

I have recently spent some time getting Cloudera’s CDH 5 distribution of Apache Hadoop to work on GlusterFS 3.3 using Distributed Replicated 2 Volumes. This is made possible by the fact that Apache Hadoop has a pluggable filesystem architecture that allows the computational components within the CDH 5 distribution to be configured to use alternative filesystems to HDFS. In this case, one can configure CDH 5 to use the Hadoop FileSystem plugin for GlusterFS (glusterfs-hadoop), which allows it to run on GlusterFS 3.3. I’ve provided a diagram below that illustrates the CDH 5 core processes and how they interact with GlusterFS.

Running a Single CDH 5 Deployment on One or More GlusterFS Volumes

Given that the CDH 5 distribution is comprised of other components besides YARN and MapReduce, I used the Apache Bigtop System Testing Framework to explicitly validate that Apache Sqoop, Apache Flume, Apache Pig, Apache Hive, Apache Oozie, Apache Mahout, Apache ZooKeeper, Apache Solr and Apache HBase also ran successfully.

Work is Still in Progress to Enable the Use of Impala

If you would like to participate in accelerating the work on Impala, please reach out to us on the Gluster mailing list.

Implementation details for this solution and the specific setup required for all the components are available on the glusterfs-hadoop project wiki. If you have additional questions, feel free to reach out to me on FreeNode (IRC handle jayunit100), @jayunit100 on twitter, or via the Gluster mailing list.

Categories: Hadoop

How-to: Count Events Like a Data Scientist

Cloudera Blog - Mon, 08/18/2014 - 15:05

The ability to quickly and accurately count complex events is a legitimate business advantage.

In our work as data scientists, we spend most of our time counting things. It is the foundational skill that is used in data cleansing, reporting, feature engineering, and simple-but-effective machine learning models like Naive Bayes classifiers. Hilary Mason has a quote about the benefits of counting that I love:

Understand that what big data really means is to be able to count things in data sets of any size, rapidly. And the advantage we get from that is not a technical advantage…it’s actually a cognitive advantage.

Learning how to count things quickly and accurately is both incredibly powerful and surprisingly challenging. To illustrate this, let’s walk through a classic problem that shows you how to think about counting the way a data scientist does. In order to make this example as broadly accessible as possible, I’m going to do every step of this analysis via SQL, instead of with a programming language like Python or Java.

Your First Data Science Project: Analyzing Misspelled Queries

Let’s say that you have just been hired as a data scientist to work on a new search engine for mobile applications. Users have been complaining that it is difficult to find the apps that they want to install:

  • They have a hard time typing the name of the apps on the phone’s keyboard without making a mistake or having autocorrect “fix” the spelling of a word.
  • Some of the apps have unusual names that don’t have a single obvious spelling.
  • The most popular apps inspire copycats with slightly different names, and it can be hard to identify the “real” app.

Your job is to analyze the impact of these misspelled or mistyped queries and then come up with some practical ways to help our users find the apps they want. Fortunately, the developers of the search engine have been keeping careful logs of all the queries and app installs that users have done, and these logs are stored in Hadoop and available for querying:

> DESCRIBE searches;
event_id: bigint
account_id: bigint
query: string
tstamp_sec: bigint
(some other columns)

> DESCRIBE installs;
event_id: bigint
account_id: bigint
app_id: bigint
search_event_id: bigint
(some other columns)

 

Note that both of the tables define an event_id field, which is a unique identifier for every logged record. Both tables also contain information about the account_id that issued the query, which identifies a particular user of our search engine. The searches table also has information about the query that was issued and the time of day (in seconds) that the query was received, while the installs table contains an app_id field for the application that was installed as well as a search_event_id field that can link an install event to the search event that generated it.

Let’s start by finding out how often people perform a search without installing an app. One way to do that is to find out how many event_ids in the searches table do not appear in the search_event_id column of the installs table:

> SELECT SUM(CAST(b.search_event_id IS NULL AS INT)) as good_searches
  FROM searches a
  LEFT JOIN installs b ON a.event_id = b.search_event_id;

 

While this is useful, it isn’t exactly what we want to know, since there can be lots of reasons why a user didn’t install an app after doing a search. They may have just been browsing for apps, or there could have been an app they wanted but it was too expensive for them to buy, or the app they found could have been fairly large and they wanted to wait until they were connected to a WiFi network to download it. We want to be able to distinguish mistyped queries from these other kinds of search events.

The Unreasonable Effectiveness of Counting

The trick is to realize that there isn’t any single event in our logs that identifies a misspelled query; rather, it’s a combination of events that relate to each other in a certain way:

  1. A user types in a query, gets back a list of results, and doesn’t install any of them.
  2. A few seconds later, the user types in a slightly different query, gets back a new list of results, and installs at least one of the apps in the list.

In our logs, that would be a search event that did not have an associated install event, followed by a search event that did have an associated install event. As an added benefit, we can assume that the query in the second search event is a spell correction of the first one, and we can count how often certain query pairs occur in order to build a powerful spell correction model.

It’s relatively easy to understand this pattern based on our own experience as searchers, but it’s more difficult to formulate a high-performance SQL query that will allow us to accurately count these events. The main challenge is that we need to analyze the relationship between multiple search events that are in the same table, and a naive approach to doing this requires a very slow and very resource-intensive self-join. Our job as data scientists is to figure out ways to make counting complex events fast, and if we have a strong command of SQL, one way of doing that is to use analytic SQL functions like LAG and LEAD in order to analyze the relationship between sequential rows in the same table:

-- Create a table that has a column to indicate whether a search
-- resulted in an install.
> CREATE TABLE search_installs AS
  SELECT a.*, b.search_event_id IS NOT NULL AS installed
  FROM searches a
  LEFT JOIN installs b ON a.event_id = b.search_event_id;

-- Analyze sequential searches by account_id.
> CREATE TABLE spell_correction_candidates AS
  SELECT query qw, installed iw, tstamp_sec tsw,
         LEAD(query) OVER w qr,
         LEAD(installed) OVER w ir,
         LEAD(tstamp_sec) OVER w tsr
  FROM search_installs
  WINDOW w AS (PARTITION BY account_id ORDER BY event_id);

-- Aggregate query pairs over the sequential rows under the condition that
-- the first query didn't result in an install, the second one did,
-- and the two queries occurred close together in time (under 10 seconds apart).
> SELECT qw, qr, count(*) cnt
  FROM spell_correction_candidates
  WHERE iw = false AND ir = true AND tsr - tsw < 10
  GROUP BY qw, qr;

 

The first query in this sequence is just a variation on our earlier LEFT JOIN that adds an indicator column for whether or not a search resulted in an install. We then perform our analytic SQL query using the LEAD function to get the query, installed, and tstamp_sec values from the next row in the sequence, partitioned by account_id and ordered by event_id. Finally, we run a simple filter-and-aggregation query over the results of the analytic SQL query to count how frequently particular query pairs appear as spell-correction candidates.

From Data Analyst to Data Scientist

Using the analytic SQL query is a definite improvement in our model, but it still has some limitations that we would like to overcome. Right now, we’re only counting the queries that immediately precede another one as spell correction candidates, but it’s likely that a user often makes multiple incorrect queries before they enter the query that leads to the app they want. What we would really like to do is consider all of the queries that a user issued in some window of time before the successful query as potential spell correction candidates. Unfortunately, expressing this idea with analytic SQL functions is difficult, because it’s not clear how many rows we would need to lag/lead within each partition in order to identify all of the candidate queries.

Right now, the data for each user is stored in separate rows within separate tables, but if we were to restructure the data into a single table, in which each row contained all of the information about a single user — every search and every install they did — then we would have all of the information we needed within each row. The resulting table’s schema would look something like this:

> DESCRIBE sessions;
account_id: bigint
search_events: array<struct<event_id: bigint, query: string, tstamp_sec: bigint>>
install_events: array<struct<event_id: bigint, search_event_id: bigint, app_id: bigint>>

I call these “supernova schemas” because they look like a star schema where the fact tables have collapsed into one of the dimensions. It’s not immediately obvious that the supernova will help us solve our counting problem, because HiveQL does not have extensive built-in functions for working with arrays of structs. At this point, most data scientists would turn to programming languages like Java or Python in order to complete this analysis. But my commitment was to do every step of the analysis in SQL, so I wrote an extension to Hive that allows us to treat the arrays of structs within each row as if they were tiny database tables that can be queried via SQL. The resulting query looks like this:

-- Execute the spell correction analysis.
SELECT qw, qr, count(*) as cnt
FROM sessions
LATERAL VIEW WITHIN(
  "SELECT bad.query qw, good.query qr
   FROM t1 as bad, t1 as good
   WHERE bad.tstamp_sec < good.tstamp_sec
     AND good.tstamp_sec - bad.tstamp_sec < 30
     AND bad.event_id NOT IN (SELECT search_event_id FROM t2)
     AND good.event_id IN (SELECT search_event_id FROM t2)",
  search_events, install_events)
GROUP BY qw, qr;

 

The WITHIN function is a Hive table generating function, and we’re using Hive’s LATERAL VIEW syntax to express the aggregation of the results of the function across all of the rows in the table. Each of the arrays that is passed in after the nested SQL query gets assigned a position-based alias, so search_events is t1 and install_events is t2. Inside of the WITHIN function, we’re using the newly incubating Apache Optiq project to execute a self-join on the (tiny) search_events array in order to identify all of the candidate spell correction queries within a 30-second time window.

I decided to call this project Exhibit, and I hope that it is a useful way for data analysts who are making the transition to data science to work effectively with complex, nested records. I talked about building and using supernova schemas to solve other data science problems at MidwestIO and the GraphLab Conference.

Data Science Superpowers

Although this particular analysis focused on analyzing mistyped and misspelled queries, the form of this analysis occurs in lots of different problem domains. For example, you could use these counting techniques to identify usability problems with your website by analyzing visits that are followed by calls to a customer care center. A hospital could analyze treatment records to identify events that lead to certain negative health outcomes in patients like sepsis. Telecommunications providers can analyze sequences of dropped phone calls to identify locations that have poor network coverage.

The freedom to structure data in a way that makes it easy to quickly and accurately count complex events is my favorite data science superpower. As Hilary said, this isn’t simply a technical advantage, it’s a cognitive one: it allows me to understand the way that my customers experience my business, and then use that understanding to find new ways to improve that experience and create value.

Josh Wills is Cloudera’s Senior Director of Data Science.

Categories: Hadoop

Apache Hadoop 2.5.0 is Released

Cloudera Blog - Fri, 08/15/2014 - 15:33

The Apache Hadoop community has voted to release Apache Hadoop 2.5.0.

Apache Hadoop 2.5.0 is a minor release in the 2.x release line that includes a number of major features and improvements.

More details can be found in the documentation and release notes.

The next minor release (2.6.0) is expected to include some major features as well, including transparent encryption in HDFS along with a key management server,  work-preserving restarts of all YARN daemons, and others. Refer to the roadmap for a full, updated list.

Currently, Hadoop 2.5 is scheduled to ship inside CDH 5.2 (in late 2014).

Karthik Kambatla is Software Engineer at Cloudera and a Hadoop committer.

Categories: Hadoop

How-to: Use IPython Notebook with Apache Spark

Cloudera Blog - Thu, 08/14/2014 - 15:30

IPython Notebook and Spark’s Python API are a powerful combination for data science.

The developers of Apache Spark have given thoughtful consideration to Python as a language of choice for data analysis. They have developed the PySpark API for working with RDDs in Python, and further support using the powerful IPython shell instead of the built-in Python REPL.

The developers of IPython have invested considerable effort in building the IPython Notebook, a system inspired by Mathematica that allows you to create “executable documents”. IPython Notebooks can integrate formatted text (Markdown), executable code (Python), mathematical formulae (LaTeX), and graphics/visualizations (matplotlib) into a single document that captures the flow of an exploration and can be exported as a formatted report or an executable script. There are many good write-ups elsewhere on why IPython Notebooks can improve your productivity.
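As a rough illustration (not taken from this post), a single code cell can mix executable Python with an inline matplotlib figure; this sketch assumes NumPy and matplotlib are installed alongside IPython:

%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

# Plot one period of a sine wave; the figure renders directly below the cell.
x = np.linspace(0, 2 * np.pi, 100)
plt.plot(x, np.sin(x))
plt.title("sin(x)")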

Here I will describe how to set up IPython Notebook to work smoothly with PySpark, allowing a data scientist to document the history of her exploration while taking advantage of the scalability of Spark and Apache Hadoop.

Software Prerequisites
  • IPython: I used IPython 1.x, since I’m running Python 2.6 on CentOS 6. This required me to install a few extra dependencies, like Jinja2, ZeroMQ, pyzmq, and Tornado, to enable the notebook functionality, as detailed in the IPython docs. These requirements apply only to the node on which IPython Notebook (and therefore the PySpark driver) will be running.
  • PySpark: I used the CDH-installed PySpark (1.x) running through YARN-client mode, which is our recommended method on CDH 5.1. It’s easy to use a custom Spark (or any commit from the repo) through YARN as well. Finally, this will also work with Spark standalone mode.
IPython Configuration

This installation workflow loosely follows the one contributed by Fernando Perez here. This should be performed on the machine where the IPython Notebook will be executed, typically one of the Hadoop nodes.

First create an IPython profile for use with PySpark.

ipython profile create pyspark

 

This should have created the profile directory ~/.ipython/profile_pyspark/. Edit the file ~/.ipython/profile_pyspark/ipython_notebook_config.py to have:

c = get_config()
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8880  # or whatever you want; be aware of conflicts with CDH

 

If you want a password prompt as well, first generate a password for the notebook app:

python -c 'from IPython.lib import passwd; print passwd()' > ~/.ipython/profile_pyspark/nbpasswd.txt

 

and set the following in the same .../ipython_notebook_config.py file you just edited:

import os

# Expand ~ so open() can find the file.
PWDFILE = os.path.expanduser('~/.ipython/profile_pyspark/nbpasswd.txt')
c.NotebookApp.password = open(PWDFILE).read().strip()

 

Finally, create the file ~/.ipython/profile_pyspark/startup/00-pyspark-setup.py with the following contents:

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

 

Starting IPython Notebook with PySpark

IPython Notebook should be run on a machine from which PySpark would normally be launched, typically one of the Hadoop nodes.

First, make sure the following environment variables are set:

# for the CDH-installed Spark
export SPARK_HOME='/opt/cloudera/parcels/CDH/lib/spark'

# this is where you specify all the options you would normally add after bin/pyspark
export PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 --executor-memory 10g --executor-cores 5'

 

Note that you must set whatever other environment variables you want to get Spark running the way you desire. For example, the settings above are consistent with running the CDH-installed Spark in YARN-client mode. If you wanted to run your own custom Spark, you could build it, put the JAR on HDFS, set the SPARK_JAR environment variable, along with any other necessary parameters.
For example, see here for running a custom Spark on YARN.

Finally, decide from what directory to run the IPython Notebook. This directory will contain the .ipynb files that represent the different notebooks that can be served. See the IPython docs for more information. From this directory, execute:

ipython notebook --profile=pyspark

 

Note that if you just want to serve the notebooks without initializing Spark, you can start IPython Notebook using a profile that does not execute the shell.py script in the startup file.

Example Session

At this point, the IPython Notebook server should be running. Point your browser to http://my.host.com:8880/, which should open up the main access point to the available notebooks. This should look something like this:

This will show the list of possible .ipynb files to serve. If it is empty (because this is the first time you’re running it) you can create a new notebook, which will also create a new .ipynb file. As an example, here is a screenshot from a session that uses PySpark to analyze the GDELT event data set:

The full .ipynb file can be obtained as a GitHub gist.
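As a point of reference, here is a minimal sketch (not taken from the gist) of the kind of cell you could run once the notebook is up: the 00-pyspark-setup.py startup script leaves a SparkContext named sc in the session, and the HDFS path below is purely hypothetical.

# Word count over a hypothetical text file in HDFS, using the sc created at startup.
lines = sc.textFile("hdfs:///user/me/sample.txt")
words = lines.flatMap(lambda line: line.split())
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.takeOrdered(10, key=lambda kv: -kv[1])  # ten most frequent words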

The notebook itself can be viewed (but not executed) using the public IPython Notebook Viewer.

Uri Laserson (@laserson) is a data scientist at Cloudera.

Categories: Hadoop

New in CDH 5.1: HDFS Read Caching

Cloudera Blog - Mon, 08/11/2014 - 16:01

Applications using HDFS, such as Impala, will be able to read data up to 59x faster thanks to this new feature.

Server memory capacity and bandwidth have increased dramatically over the last few years. Beefier servers make in-memory computation quite attractive, since a lot of interesting data sets can fit into cluster memory, and memory is orders of magnitude faster than disk.

For the latest release of CDH 5.1, Cloudera contributed a read caching feature to HDFS to allow applications in the Apache Hadoop ecosystem to take full advantage of the potential of in-memory computation (HDFS-4949). By using caching, we’ve seen a speedup of up to 59x compared to reading from disk, and up to 3x compared to reading from page cache.

We’ll cover performance evaluation in more detail in a future blog post. Here, we’ll focus on the motivation and design of HDFS caching.

Motivation

A form of memory caching is already present on each HDFS DataNode: the operating system page cache. The page cache automatically caches recently accessed data on the local filesystem. Because of the page cache, reading the same file more than once will often result in a dramatic speedup. However, the OS page cache falls short when considered in the setting of a distributed system.

One issue is the lack of global information about the in-memory state of each node. Given the choice of multiple HDFS replicas from which to read some data, an application is unable to schedule its tasks for cache-locality. Since the application is forced to schedule its tasks blindly, performance suffers.


When a data analyst runs a query, the application scheduler chooses one of the three block replica locations and runs its task there, which pulls the replica into the page cache (A). However, if the analyst runs the same query again, the scheduler has no way of knowing which replica is in the page cache, and thus no way to place its task for cache locality (B).

Another issue is the page cache’s replacement algorithm, which is a modified version of “least-recently used” eviction. LRU-like algorithms are susceptible to large scans that wipe out the existing contents of the cache. This happens quite commonly on shared Hadoop clusters.

Consider a data analyst running interactive queries on a memory-sized working set: If a large I/O-heavy MapReduce job runs at the same time, it will evict the data analyst’s working set from the page cache, leading to poor interactive performance. Without application-level knowledge of which dataset to keep in memory, the page cache can do no better for mixed workloads. Finally, although reading data from the page cache is faster than disk, it is still inefficient compared to reading directly from memory (so-called zero-copy reads).

Another source of inefficiency is checksum verification. These checksums are intended to catch disk and network errors, and can theoretically be skipped if the client is reading from local in-memory data that has already been checksummed. However, skipping redundant checksumming safely is impossible with the page cache since there’s no way to guarantee that a read is coming from memory. By fixing these two issues, we were able to improve read performance by up to 3x compared to reading from page cache.

Architecture

The above issues resulted in the following three design requirements:

  1. Global knowledge of cluster cache state, so tasks can be scheduled for cache locality
  2. Global control over cluster cache state, for predictable performance for mixed workloads
  3. Pinning of data in local caches, to enable zero-copy reads and skipping checksums

Based on these requirements, we decided to add centralized cache management to the NameNode.


Example of an HDFS client caching a file: First, it sends a cache directive asking the NameNode to cache the file. The NameNode chooses some DataNodes to cache the requested file, with cache commands piggy-backed on the DataNode heartbeat. DataNodes respond with a cache report when the data is successfully cached.

Caching is explicit and user-driven. When a user wants something cached, they express their intent by creating a cache directive on the NameNode. A cache directive specifies the desired path to cache (meaning a file or directory in HDFS), a desired cache replication factor (up to the file’s replication factor), and the cache pool for the directive (used to enforce quotas on memory use). The system does not automatically manage cache directives, so it’s up to users to manage their outstanding cache directives based on their usage patterns.

Assuming that this cache directive is valid, the NameNode will attempt to cache said data. It will select cache locations from the set of DataNodes with the data on disk, and ask them to cache the data by piggy-backing a cache command on the DataNode heartbeat reply. This is the same way block replication and invalidation commands are sent.

When a DataNode receives a cache command, it pulls the desired data into its local cache by using mmap() and mlock() methods and then verifies its checksums. This series of operations guarantees that the data will remain resident in memory, and that it is safe to read without further checksum verification. Using the mmap() and mlock() methods has the advantage of storing the data off-heap, so large amounts of data can be cached without affecting garbage collection.

Because mlock() takes advantage of the OS page cache, if the block is already held there, we don’t need to copy it. The disadvantage of mlock is that the block must already exist in the filesystem before it can be locked in memory. So we cannot cache replicas on nodes that don’t have the replica already on disk.

DataNodes periodically send cache reports to the NameNode, which contain the state of their local cache. As soon as the NameNode knows that a block has been successfully cached on a DataNode, application schedulers can query the NameNode for this information and use it to schedule tasks for cache-locality.

Zero-copy Reads

Zero-copy read (ZCR) is the final step in efforts to improve the efficiency of the HDFS read path. Copies are one of the most obvious sources of inefficiency; the more time spent copying data, the fewer CPU cycles are left for useful work. ZCR is theoretically optimal in this regard, hence the name “zero-copy.”

The standard HDFS remote read path copies data from the kernel into the DataNode prior to sending it on to the DFSClient via a TCP socket. Short-circuit local reads eliminate this copy by “short-circuiting” the trip through the DataNode. Instead, the client simply reads the block file directly from the local filesystem.

However, even when using short-circuit reads, the DFSClient still needs to copy the data from the kernel page cache into the client’s address space. ZCR, implemented in HDFS-4953, allows us to avoid that copy. Instead of copying, we use the mmap() system call to map the block from the page cache directly into the client’s address space. ZCR also avoids the context-switch overhead of repeated invocations of the read system call, which can be significant.

However, mmap() has some disadvantages. One difficulty is handling I/O errors. If a read() system call encounters an I/O error, it simply returns an error code. Accessing a memory-mapped segment can’t return an error, so any error results in a SIGBUS signal instead. Unless a signal handler has been installed, the calling process is terminated.

Fortunately, if a client is reading data that is cached by HDFS, it will never hit an I/O error (and thus never get a SIGBUS) — because the data is pinned in memory with mlock(). This approach lets us safely do ZCR without worrying about unexpected program termination. The client can also skip checksum verification when reading cached data, as the data is already checksummed by the datanode when it’s cached.

The ZCR API is described in HDFS-5191. In addition to a Java API, there is also a C API that allows applications such as Impala to take full advantage of zero-copy reads.

Example CLI usage

Here’s a simple example of creating a new cache pool and adding a cache directive for a file. This example assumes you’ve already configured your cluster correctly according to the official documentation.

$ hadoop fs -put myfile /
$ # Add a new cache pool and cache directive
$ hdfs cacheadmin -addPool testPool
Successfully added cache pool testPool.
$ hdfs cacheadmin -addDirective -path /myfile -pool testPool
Added cache directive 1
$ # Wait for a minute or two for the NameNode to gather all datanode cache
$ # statistics. 512 of 512 bytes of our file should be cached.
$ hdfs cacheadmin -listPools -stats testPool
Found 1 result.
NAME OWNER GROUP MODE LIMIT MAXTTL BYTES_NEEDED BYTES_CACHED BYTES_OVERLIMIT FILES_NEEDED FILES_CACHED
testPool andrew andrew rwxr-xr-x unlimited never 512 512 0 1
$ # Look at the datanode stats; see that our DN is using 1 page of cache
$ hdfs dfsadmin -report
...<snip>...
Live datanodes (1):
...<snip>...
Configured Cache Capacity: 64000 (62.50 KB)
Cache Used: 4096 (4 KB)
Cache Remaining: 59904 (58.50 KB)
Cache Used%: 6.40%
Cache Remaining%: 93.60%

 

Future Work

There are a number of further improvements we’d like to explore. For example, a current limitation of the system is that users need to manually specify what files and directories should be cached. Instead, HDFS could automatically manage what is cached based on workload patterns or hints.

Another potential improvement would be to extend HDFS caching to output files as well as input files. One potential use case for this so-called write-caching is for intermediate stages of a multi-job pipeline. Write-caching could avoid writing to disk at all, if durability is not required. This avenue of development is being pursued in HDFS-5851.

Conclusion

Due to increasing memory capacity, many interesting working sets are able to fit in aggregate cluster memory. By using HDFS centralized cache management, applications can take advantage of the performance benefits of in-memory computation. Cluster cache state is aggregated and controlled by the NameNode, allowing application schedulers to place their tasks for cache locality. Explicit pinning of datasets allows users to isolate their working sets from other users on shared clusters. Finally, the new zero-copy read API offers substantially improved I/O performance by allowing clients to safely skip the overhead of checksumming and the read() syscall.

In a follow-up post, we’ll analyze the performance of HDFS caching using a number of micro and macro benchmarks. Stay tuned!

Colin McCabe and Andrew Wang are both Software Engineers at Cloudera, and Hadoop committers/PMC members.

Categories: Hadoop

This Month in the Ecosystem (July 2014)

Cloudera Blog - Fri, 08/08/2014 - 15:35

Welcome to our 11th edition of “This Month in the Ecosystem,” a digest of highlights from July 2014 (never intended to be comprehensive; for that, see the excellent Hadoop Weekly).

  • An early release of the new O’Reilly Media book, Hadoop Application Architectures, became available. This one is sure to become standard bookshelf material. (Look for signed copies at Strata + Hadoop World!)
  • Continuuity introduced Tephra, an open source transaction engine for Apache HBase. According to Continuuity, Tephra “utilizes the key features of HBase to make transactional capabilities available without sacrificing overall performance.”
  • eBay open sourced its Apache Pig framework, which goes by the charming name Oink. Its architects say that Oink provides a UI for submitting jobs, offers QoS, and abstracts the user from cluster configuration.
  • New developer training for Apache Spark became available from Cloudera. For more background on the curriculum, read this.
  • Spring XD 1.0 became generally available, and includes support for CDH 4 among other major platforms. Spring XD utilizes the Kite SDK Data module for storage of some serialized data.

That’s all for this month, folks!

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Progress Report: Cloudera Community Forums After One Year

Cloudera Blog - Tue, 08/05/2014 - 16:00

Cloudera Community forums are proving their value as an important contributor to a rich user experience.

It’s been almost exactly one year since the debut of the Cloudera Community forums. In addition to doing the birthday shout-out, I thought it would be interesting to bring you up to date on adoption and usage patterns.

Launched in response to candid feedback from our customers, use of these forums has been steadily growing, with thousands of registered members today and many more joining each month. They contain hundreds of solved threads that have been viewed well over 1 million times, proving the value they add to other Apache Hadoop users.

The one-year-anniversary numbers tell the full story:

  • Logins to date: 64,574
  • Posts to date: 7,402
  • Sessions to date: 1,862,154
  • Page views to date: 2,633,211
  • “Solution” views to date: 1,249,207 (almost half of all page views!)
  • Average time on site per session: about 15 minutes
  • Percentage of topics with at least one response: 68%

Just look at those pretty trend lines:

 Green: message views; Orange: page views; Purple: user sessions

Top 5 search keywords:

  1. ec2
  2. hbase
  3. impala
  4. spark
  5. aws 

Top 5 most active users to date (non-employees) based on overall engagement – special thanks and congrats to you folks, you get a T-shirt!:

  1. Avin
  2. beth.klein
  3. jtrav
  4. Andre Araujo 
  5. james.sirota 

There’s still work to do, however. For example, we need to do a better job encouraging users to give “kudos” and mark correct answers as “solutions.” Doing so ensures that the most helpful users are recognized for their contributions, and makes it easier for people with similar issues to find the answers they need. And although the percentage of topics with a response is pretty high, we’d be happier with 100%.

All in all, we hope that you’ll give these forums a try, if you haven’t already — we believe that they’re an important part of a rich user experience, so we’re going to continue to invest in their (and your) success.

Justin Kestelyn is Cloudera’s developer outreach director.

Categories: Hadoop

Meet the Engineer: Sravya Tirukkovalur

Cloudera Blog - Fri, 08/01/2014 - 16:19

Meet Sravya Tirukkovalur (@sravsatuluri), a Software Engineer working on Apache Hadoop security at Cloudera.

What do you do at Cloudera, and in which Apache projects are you involved?

I am a software engineer here at Cloudera, working on the security aspects of the platform. I specifically work on, and am an active contributor to, the Apache Sentry (incubating) project, which is part of the Project Rhino effort with Intel to bring comprehensive security for data protection to Hadoop. I am also a committer and a PPMC member of the project.

Sentry is a system for enforcing fine-grained, role-based authorization to data and metadata stored on a Hadoop cluster. It currently integrates with Apache Hive, Impala, and Apache Solr to provide seamless authorization control.

Why do you enjoy your job?

It has been a great experience so far. Learning something new every day, working with some of the brightest minds in the industry, working in open source — and moreover, working with a fun-loving, dedicated team — is very rewarding. I also really like the fact that Cloudera encourages public knowledge sharing, and thus engineers get to talk at various meetups and conferences about the work they are doing.

What is your favorite thing about Apache Hadoop?

As Cloudera rightly says in one sentence: It lets you “ask bigger questions”. If you think about how much data we produce every day versus how much we actually process, it is astonishing to imagine how many ways the world could benefit if we had the software capabilities to easily store, process, measure, and learn from all of it. And with more and more new datasets becoming available daily, it is very important for the software to evolve rapidly in terms of scale, performance, usability, and security.

I think this rapidity of software development in the Hadoop ecosystem is only possible because of the open source community, and I am very glad to be a part of that community as well as working with the leader, Cloudera. 

What is your advice for someone who is interested in participating in any open source project for the first time?

There are numerous, high-impact, and interesting open source projects out there. I think it would be best to pick a couple of projects that interest you the most (even better, if you are already using that project). Subscribe to the users@ and dev@ mailing lists, follow the activity, and start using the project. Most of the projects have newbie JIRAs, which are relatively self-contained bug fixes. These are the ideal candidates to get started contributing.

You should feel completely free to file bugs and contribute patches when you hit problems. And each project also has a “How to contribute” page with detailed instructions for new contributors.

At what age did you become interested in programming, and why?

I started coding in C when I was around 17, and I instantly felt like I had earned supernatural powers to do much more than I could with limited resources like physical strength and time. Coming from a sports background (tennis), I used to train my body for six hours a day — so the return on investment (in terms of time and energy) I saw in programming was very, very exciting!

In tennis, once you reach the point of having good technique, all you need to do is keep your body in the best possible condition (which is no easy task) and experiment with your gaming strategy. But in programming, I can’t imagine a time where I will not have anything new to learn. That is something that keeps me excited every day. 

Categories: Hadoop
