What does a “Big Data engineer” do, and what does “Big Data architecture” look like? In this post, you’ll get answers to both questions.
Apache Hadoop has come a long way in its relatively short lifespan. From its beginnings as a reliable storage pool with integrated batch processing using the scalable, parallelizable (though inherently sequential) MapReduce framework, we have witnessed the recent additions of real-time (interactive) components like Impala for interactive SQL queries and integration with Apache Solr as a search engine for free-form text exploration.
Getting started is now also a lot easier: Just install CDH, and all the Hadoop ecosystem components are at your disposal. But after installation, where do you go from there? What is a good first use case? How do you ask those “bigger questions”?
Having worked with more customers running Hadoop in production than any other vendor, Cloudera’s field technical services team has seen more than its fair share of these use cases. Although they obviously vary by industry and application, there is a common theme: the presence of Big Data architecture.
In this post, you’ll get a whirlwind tour of that architecture based on what we’ve seen at customer sites over the past couple of years, and get some tips/initial advice about building your own as the foundation for an enterprise data hub.
Big Data Architecture
Big Data architecture is premised on a skill set for developing reliable, scalable, completely automated data pipelines. That skill set requires profound knowledge of every layer in the stack, beginning with cluster design and spanning everything from Hadoop tuning to setting up the tool chain responsible for processing the data. The following diagram shows the complexity of the stack, as well as how data pipeline engineering touches every part of it.
The main detail here is that data pipelines take raw data and convert it into insight (or value). Along the way, the Big Data engineer has to make decisions about what happens to the data, how it is stored in the cluster, how access is granted internally, what tools to use to process the data, and eventually the manner of providing access to the outside world. Processing is likely done with tools such as Impala or Apache Spark, while outside access could be through BI or other analytic tools. I refer to the people who design and/or implement such architecture as Big Data engineers.
In the remainder of this post, you’ll learn about the various components in the stack and their role in creating data pipelines.
Cluster planning is a “chicken-and-egg” problem, as cluster design is inherently driven by the use-case(s) running later on, and often the use case is not yet clear. Most vendors, including Cloudera, have a reference architecture guideline to help you select the proper class of machines. (For Cloudera certified partners, see the online listing.)
In general, the current recommended machines are dual CPU with 4 to 8 cores each, at least 48GB of RAM up to 512GB (for low latency analytical workloads where lots of data is cached), at least 6 HDDs (hard disk drives), up to 12 or larger for storage heavy configurations, and otherwise standard rack-mountable 19″ servers. Sometimes we also see SSD (solid state drive) setups for low-latency use cases, although the results are not as dramatic as one would assume. (Please test carefully.)
When in doubt, you can always try the public (or private) cloud services first, and once you know your requirements better, you can move things around. If you do so, be generous about machine size to get results comparable with bare-metal hardware. Remember, you are in a shared environment and need to factor in competing loads and slower data connections (network and virtualized storage).
After you have spun up your cluster, you have to decide how to load data. In practice there are two main approaches: batch and event-driven. The former is appropriate for file and structured data, while the latter is appropriate for most near-real-time events such as log or transactional data.
Let me start with the more straightforward case: ingesting data from structured data sources (for example, an RDBMS). The weapon of choice is universally Apache Sqoop, which allows you to move data into Hadoop from RDBMSs. You can select partial (column projection and row selection) or full data sets and do full or (given some requirements) incremental transfers. Sqoop uses MapReduce as its workhorse and employs default JDBC drivers for many database systems—or, if necessary, specialized drivers that speed up the data transfer.
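To make the partial and incremental options concrete, here is a minimal sketch that assembles a `sqoop import` command line in Python. The JDBC URL, table, columns, and target directory are made-up values for illustration; the flags used are standard Sqoop 1 import options, but check them against the Sqoop version you actually run.

```python
from typing import List, Optional


def build_sqoop_import(
    jdbc_url: str,
    table: str,
    target_dir: str,
    columns: Optional[List[str]] = None,
    where: Optional[str] = None,
    check_column: Optional[str] = None,
    last_value: Optional[str] = None,
) -> List[str]:
    """Build the argument list for a `sqoop import` invocation."""
    cmd = ["sqoop", "import", "--connect", jdbc_url,
           "--table", table, "--target-dir", target_dir]
    if columns:
        # Column projection: transfer only the listed columns.
        cmd += ["--columns", ",".join(columns)]
    if where:
        # Row selection: transfer only rows matching the predicate.
        cmd += ["--where", where]
    if check_column is not None:
        # Incremental transfer: only rows whose check column is past last_value.
        cmd += ["--incremental", "append", "--check-column", check_column]
        if last_value is not None:
            cmd += ["--last-value", last_value]
    return cmd


# Hypothetical source database and HDFS target, for illustration only.
example = build_sqoop_import(
    "jdbc:mysql://db.example.com/sales",
    "orders",
    "/data/raw/orders",
    columns=["id", "amount"],
    where="amount > 0",
    check_column="id",
    last_value="1000",
)
print(" ".join(example))
```

The list form can be handed to `subprocess.run` on a machine where Sqoop is installed; building it as data first also makes the command easy to log and test.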
The more complex batch ingest method is file loading. There are many ways to do this, but none is really established. In fact, when possible, it is better to switch to the event ingest explained below to avoid bulk loading of files. The matter is complicated by the location of the files (on site or remote), as well as the API to load them (the HDFS put command being the simplest one; there are also REST-based APIs with WebHDFS and HttpFS).
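As a rough illustration of the REST route, the sketch below builds the URL for a WebHDFS file-create request. The host name, port, user, and path are assumptions for the example. Note that WebHDFS file creation is a two-step exchange: the first PUT goes to the NameNode, which answers with a redirect to a DataNode, and the file content is sent in a second PUT to that location. Only the URL construction is shown here.

```python
from urllib.parse import quote, urlencode


def webhdfs_create_url(host: str, port: int, hdfs_path: str,
                       user: str, overwrite: bool = False) -> str:
    """Return the NameNode URL for the first step of a WebHDFS CREATE."""
    if not hdfs_path.startswith("/"):
        raise ValueError("hdfs_path must be absolute")
    query = urlencode({
        "op": "CREATE",
        "overwrite": str(overwrite).lower(),
        "user.name": user,
    })
    # quote() keeps "/" intact and escapes characters that are unsafe in a URL path.
    return f"http://{host}:{port}/webhdfs/v1{quote(hdfs_path)}?{query}"


# Hypothetical NameNode address and landing path.
url = webhdfs_create_url("namenode.example.com", 50070,
                         "/data/incoming/events.log", "etl")
print(url)
```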
But how can you reliably ingest files without human intervention as demanded by Big Data architecture? I have yet to see a solution here, and for now can only point to bespoke custom scripting (Bash, Python, Java and so on) or the vast Hadoop partner field, which has lots to offer on the data integration topic.
On their own, these tools are one-off jobs only—they get invoked and do their work. What is missing is automatic ingest so that the data pipeline is constantly processing data. We’ll pick that up in the “Productionization” section below.
For event-based ingest there is Apache Flume, which allows you to define a redundant, failsafe network of so-called agents that transport event records from a generating system to the consuming one. The latter might be HDFS, but it can also be Spark or HBase, or a combination of both.
Flume has been battle-tested at large user clusters and allows you to reliably deliver data to where it is needed. The tricky part is to configure the Flume topology and the agents correctly. The agents need to be able to buffer enough data on persistent media so that all anticipated “normal” server failures are covered. Also, tuning the batch sizes of events that are sent between agents is vital to achieve either higher throughput or lower latencies (faster message delivery).
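The buffering and batch-size trade-offs come down to simple arithmetic. The following is a back-of-the-envelope sketch, not a Flume API: the event rate, outage duration, and headroom factor are assumed inputs you would replace with your own measurements.

```python
import math


def required_channel_capacity(events_per_second: float,
                              outage_seconds: float,
                              headroom: float = 1.5) -> int:
    """Events an agent must be able to buffer to ride out a downstream outage."""
    return math.ceil(events_per_second * outage_seconds * headroom)


def batch_fill_seconds(batch_size: int, events_per_second: float) -> float:
    """Worst-case delay added if an agent waits for a full batch before sending."""
    return batch_size / events_per_second


# Assumed workload: 2,000 events/s, and a 10-minute outage we want to survive.
capacity = required_channel_capacity(2000, 600)
delay = batch_fill_seconds(1000, 2000)
print(f"buffer at least {capacity} events; a 1000-event batch adds up to {delay}s")
```

Larger batches amortize per-transfer overhead and raise throughput, while smaller batches shorten the time an event waits before it moves on, which is the latency side of the trade-off.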
Once the data has arrived in Hadoop as a whole, there remains the task of staging it for processing. This is not just about storing it somewhere, but rather storing it in the right format, with the right size, and the right access mask.
The right data format depends on the subsequent use case. Whether the application is batch or real-time is again relevant, but so is whether the format retains the full fidelity of data and is open source (i.e. can be used by more than one tool in the processing stage).
For batch, container file formats, including the venerable SequenceFile and Avro formats, are both useful and popular. As for the analytical, real-time application, the new rising star is Apache Parquet (incubating), which, similar to columnar databases, lays out the data in columns with built-in structure and compression (e.g. skipping NULL values) that allow you to scan very large data sets very efficiently (assuming a selective query pattern).
In addition to the file format, you should also strongly consider encoding and compression formats because the best I/O in Big Data is the one you are not doing. Compression reduces I/O: you load more data while moving fewer bytes around. The proper approach is driven by CPU-versus-compression ratio trade-offs, because the better a codec compresses, the more CPU it usually needs. Thus, the data we see is almost always compressed with the Snappy codec, which is super-fast and lightweight yet offers decent compression ratios. For historical data, BZip2 or something similar is often used.
It is also important to think about what happens with your data over time. You might want to implement policies that rewrite older data into different file or compression formats, so that you make better use of the available cluster capacity. As data ages and is accessed less often, it is worthwhile to spend more CPU in exchange for a better compression ratio. Here there are no incumbent tools to help you out, and in the field I often see rather custom solutions (scripting again)… or none at all (which is not good).
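You can get a feel for the CPU-versus-ratio trade-off with nothing but the Python standard library. Snappy is not part of the standard library, so this sketch uses zlib at its fastest level as a stand-in for a lightweight codec and bz2 as the denser one; the synthetic log lines are made up, and the numbers will differ on real data.

```python
import bz2
import time
import zlib


def measure(name, compress, decompress, payload):
    """Compress once, verify the round trip, and report ratio and elapsed time."""
    start = time.perf_counter()
    packed = compress(payload)
    elapsed = time.perf_counter() - start
    if decompress(packed) != payload:
        raise RuntimeError(f"{name}: round trip failed")
    return {"codec": name, "ratio": len(payload) / len(packed), "seconds": elapsed}


# Synthetic, repetitive log data (an assumption; substitute a sample of your own).
lines = [f"2014-09-01T12:00:{i % 60:02d} host{i % 17} GET /item/{i} 200\n"
         for i in range(20000)]
payload = "".join(lines).encode("utf-8")

results = [
    measure("zlib level 1 (fast)", lambda b: zlib.compress(b, 1), zlib.decompress, payload),
    measure("bz2 level 9 (dense)", lambda b: bz2.compress(b, 9), bz2.decompress, payload),
]
for r in results:
    print(f"{r['codec']}: ratio {r['ratio']:.1f}x in {r['seconds'] * 1000:.1f} ms")
```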
As you land data, there is another important aspect to consider: how you partition or, more generally, size data. For starters, Hadoop is good at managing fewer very large files. You do not want to design an architecture that lands many small files in HDFS and then be surprised when the NameNode starts to perform badly. You can, of course, land small files, but you would need to implement an ETL stage (or rather TL as no extract is needed) that combines smaller files into larger ones.
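The combining stage can start from something as simple as a greedy plan that groups small files until a target size is reached. This is an illustrative sketch only: the file names and sizes are invented, and the actual merge (reading the inputs and writing one larger file) is left out.

```python
from typing import List, Tuple


def plan_compaction(files: List[Tuple[str, int]], target_bytes: int) -> List[List[str]]:
    """Group (path, size) pairs into batches of roughly target_bytes each."""
    batches: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for path, size in sorted(files):
        # Close the current batch if adding this file would overshoot the target.
        if current and current_size + size > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        batches.append(current)
    return batches


# Hypothetical landing directory contents: (file name, size in bytes).
small_files = [("a", 400), ("b", 400), ("c", 400), ("d", 100)]
plan = plan_compaction(small_files, target_bytes=1000)
print(plan)
```

Each inner list becomes one output file, so the number of objects the NameNode has to track drops from the number of input files to the number of batches.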
While you are transforming files as they arrive, the next step is to split them into decent chunks for later processing. This is usually done using partitions on HDFS. In HBase the partitioning is implicit as it divides data into regions of contiguous rows, sorted by their row key; it splits and rebalances as it goes along. For HDFS, you have to plan ahead of time—you might need to sample data and explore its structure to decide what is best for you. The rule of thumb, though, is for partitions to span at least a decent amount of data worth processing without creating the small-file problem mentioned above. I would advise you to start with a partition amounting to at least 1GB in a single file, and knowing the size of the total dataset, tune this up to even larger sizes. So for very large datasets in the hundreds of TBs and up, I would have each file in a partition be 10GB, or even 100GB or more.
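The rule of thumb above can be written down as a small helper. The thresholds are my reading of the guidance (1GB files by default, 10GB once the dataset reaches the hundreds of TBs) and are assumptions to tune, not fixed limits.

```python
import math

GB = 1024 ** 3
TB = 1024 ** 4


def suggested_file_bytes(total_dataset_bytes: int) -> int:
    """Pick a target file size from the total dataset size (assumed thresholds)."""
    if total_dataset_bytes >= 100 * TB:
        return 10 * GB
    return 1 * GB


def files_per_partition(partition_bytes: int, target_file_bytes: int) -> int:
    """Number of files a partition should be written as, never fewer than one."""
    return max(1, math.ceil(partition_bytes / target_file_bytes))


# Example: a 5TB dataset with 25GB daily partitions.
target = suggested_file_bytes(5 * TB)
count = files_per_partition(25 * GB, target)
print(f"target file size {target // GB}GB, {count} files per partition")
```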
One final note: make sure the file format supports splitting the files into smaller blocks for parallel processing. The above suggested container formats usually do that, but you might want to double check (look for splittable support). If not, you can end up with suboptimal performance across the cluster because a single reader has to process a single large file (that is, your parallelism rate drops considerably).
The last part you have to consider is what we call information architecture (IA), which addresses the need to lay out the data in such a way that multiple teams can work safely on a shared cluster—also referred to as multi-tenancy.
It is not enough to have each job read from one directory and emit to another. If you share a cluster across departments, you need to devise a concise access schema that controls tightly (and possibly supports auditing of) who has access to what data. The IA is where these rules are defined—for example, by using user groups and other HDFS features (see the new extended ACLs features in HDFS or Apache Sentry) to map business units into owners of data. With that, you can further define a plan on how data is read from storage during processing and pushed through the various stages of the data processing pipeline.
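One way to keep such rules reviewable is to hold the mapping from directories to owning groups as data and generate the ACL commands from it. The sketch below emits `hdfs dfs -setfacl -m` invocations; the directory and group names are hypothetical, and it assumes extended ACLs are enabled on the cluster.

```python
import re
from typing import Dict, List

_PERMS = re.compile(r"^[r-][w-][x-]$")


def setfacl_commands(owners: Dict[str, Dict[str, str]]) -> List[List[str]]:
    """Turn {path: {group: perms}} into `hdfs dfs -setfacl -m` argument lists."""
    commands = []
    for path, groups in sorted(owners.items()):
        entries = []
        for group, perms in sorted(groups.items()):
            if not _PERMS.match(perms):
                raise ValueError(f"bad permission string: {perms!r}")
            entries.append(f"group:{group}:{perms}")
        commands.append(["hdfs", "dfs", "-setfacl", "-m", ",".join(entries), path])
    return commands


# Hypothetical business units mapped to the data they own or may read.
layout = {
    "/data/finance": {"finance": "rwx", "audit": "r-x"},
    "/data/marketing": {"marketing": "rwx"},
}
acl_commands = setfacl_commands(layout)
for command in acl_commands:
    print(" ".join(command))
```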
One way to handle proper processing is to create a time-stamped directory for every running job and then, within it, a further directory structure for incoming (for example, from a previous job), currently being processed, and final (as well as permanently failed) files. This ensures that jobs can run in parallel without overwriting each other’s data mid-flight.
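A layout like this is easy to generate consistently. The sketch below derives the per-run directories from a base path, a job name, and the start time; the stage names, base path, and timestamp format are illustrative choices, not a convention prescribed by Hadoop.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import Dict

STAGES = ("incoming", "processing", "final", "failed")


def job_directories(base: str, job_name: str,
                    started_at: datetime) -> Dict[str, PurePosixPath]:
    """Return one directory per stage under a time-stamped run directory."""
    run_dir = PurePosixPath(base) / job_name / started_at.strftime("%Y%m%dT%H%M%SZ")
    return {stage: run_dir / stage for stage in STAGES}


# Hypothetical job started at 12:30 UTC on 1 September 2014.
dirs = job_directories("/data/pipeline", "sessionize",
                       datetime(2014, 9, 1, 12, 30, 0, tzinfo=timezone.utc))
for stage, path in dirs.items():
    print(stage, path)
```

Because every run gets its own time-stamped root, two runs of the same job never share an incoming or final directory.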
We won’t cover this issue in detail here, but IA should also account for data backups (for disaster recovery or load balancing). You need a strategy for moving data across multiple clusters or even data centers.
Thus far you have learned about landing and staging the incoming data. The next step is automatically processing it as part of the data pipeline.
This is the part mentioned above, i.e. where you process existing data, for example, to transform it into other file formats or other compression algorithms. Just because you transform your data doesn’t mean you need to lose any of its detail: this is not your typical ETL which is often lossy, but rather an optional step to increase the effectiveness of your cluster. Plan to do whatever is needed for staging, which might also extend to rewriting data over time (or based on changing requirements). You could, for example, employ heuristics that check how often and in what way data is used and change its layout over time.
The more interesting part of processing is the analytics done on top of the staged data. Here we see the venerable MapReduce—now rejuvenated on top of YARN—as well as other frameworks, such as Spark or Apache Giraph. On top of that layer there are other abstractions in use, notably Apache Crunch and Cascading.
The currently most hyped topic is machine learning, wherein you build mathematical models for recommendations or clustering/classification of incoming new data—for example, to do risk assessment, fraud detection, or spam filtering. The more “mundane” tasks in analysis, such as building aggregations and reporting data, are still very common. In fact, the latter is more than 90% of the use cases we see, with the former being an emerging area.
Either way, after prototyping the algorithm and approach, you have to convert it into an automated workflow.
Egress and Querying
As for providing access to the data, you need to find an approach that covers all types of users, from novices to experts. Access ranges from the ubiquitous search using Apache Solr, to JDBC interfaces that SQL users and BI tools can use, all the way to low-level APIs and, eventually, raw file access. Regardless of the access method, the data is never copied or siloed into lesser data structures: all these tools work on the single source of truth represented as the full-fidelity files in HDFS or key values in HBase. Whether you use Impala or Hive to issue SQL commands, the Kite SDK to read files, or process data with interactive Spark, you are always working on the same copy of data.
In fact, that’s what makes Hadoop so powerful, as it removes the need to move data around and transform it to “yet another (lesser) schema”. The integration of Hadoop into the enterprise IT landscape with Kerberos authentication, role-based authorization, and log-based auditing completes the picture.
Before we can automate, we have to combine the tools described above into more complex data pipelines. There are two main types of such pipelines: micro and macro.
Micro-pipelines are streamlined helpers that allow you to abstract (and therefore simplify) parts of the larger processing. Tools for this purpose include Morphlines (see “Introducing Morphlines: The Easy Way to Build and Integrate ETL Apps for Hadoop” for details), Crunch, and Cascading. Morphlines tie together smaller processing steps applied to each record or data pair as it flows through the processing. That lets you build tested, reusable processing sub-steps, used for example to cleanse data or enhance its metadata for later steps.
In contrast, Crunch and Cascading define an abstraction layer on top of the processing, where you deal with data points. You define how data is consumed, routed, processed, and emitted, which translates into one or more processing jobs on MapReduce and/or Spark. A Crunch or Cascading “meta” job can be combined further into yet more complex workflows, which is usually done in macro-pipelines.
Apache Oozie is one of those macro-pipeline tools. It defines workflows as directed acyclic graphs (DAGs) that have control and action elements, where the former influence how the flow proceeds and the latter define what has to be done for each step. Oozie also has a server component that tracks the running flows and takes measures to handle their completion (or termination).
As with single jobs or micro-pipelines, a “workflow” is not automated but rather just a definition of work. It has to be invoked manually to start the flow processing. This is where another part of Oozie, the coordinators, comes in. Oozie coordinators help define the time or frequency at which a workflow should run, and/or its dependencies on other workflows and data sources. With this feature, you can define the missing link in automating processing.
Productionization
We have closed the loop above and now data pipelines can run in an automated fashion, consuming and producing data as needed. But for a Big Data engineer, I argue there is one more piece to the puzzle: productionization.
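The idea of chaining small per-record steps can be shown in plain Python. To be clear, this is an analogy and not the Morphlines API (Morphlines are configured declaratively and run on the JVM); the step names and the record fields are invented for the example.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Optional

Record = Dict[str, str]
Step = Callable[[Record], Optional[Record]]


def strip_whitespace(record: Record) -> Optional[Record]:
    """Cleansing step: trim every field value."""
    return {key: value.strip() for key, value in record.items()}


def drop_without_user(record: Record) -> Optional[Record]:
    """Filtering step: returning None drops the record from the flow."""
    return record if record.get("user") else None


def tag_source(record: Record) -> Optional[Record]:
    """Enrichment step: add metadata for later stages."""
    return {**record, "source": "weblog"}


def run_chain(records: Iterable[Record], steps: List[Step]) -> Iterator[Record]:
    """Push each record through the steps in order; stop early if one drops it."""
    for record in records:
        current: Optional[Record] = record
        for step in steps:
            current = step(current)
            if current is None:
                break
        else:
            yield current


cleaned = list(run_chain(
    [{"user": " ann "}, {"user": "   "}],
    [strip_whitespace, drop_without_user, tag_source],
))
print(cleaned)
```

Each step is a small function that can be unit-tested on its own and reused across chains, which is the property the micro-pipeline tools are after.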
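To see why the graph has to be acyclic, consider how a valid execution order is derived from the dependencies between actions. The sketch below uses Python's standard `graphlib` module (Python 3.9 or later) rather than Oozie itself, whose workflows are defined in XML; the action names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def execution_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return the actions in an order that respects every dependency.

    `dependencies` maps each action to the set of actions it waits for.
    Raises CycleError if the graph contains a cycle.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical workflow: two branches fan out after cleansing and join at the report.
workflow = {
    "ingest": set(),
    "cleanse": {"ingest"},
    "aggregate": {"cleanse"},
    "index": {"cleanse"},
    "report": {"aggregate", "index"},
}
order = execution_order(workflow)
print(order)
```

With a cycle in the graph there is no order in which every action's prerequisites finish first, so the sorter raises `CycleError` instead of returning a plan.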
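The two kinds of triggers, time and data availability, can be modeled in a few lines. This is a conceptual sketch of the idea, not Oozie's coordinator syntax: the daily frequency and the convention that inputs are identified by a date string are assumptions for the example.

```python
from datetime import datetime, timedelta
from typing import List, Set


def nominal_times(start: datetime, frequency: timedelta, now: datetime) -> List[datetime]:
    """All scheduled run times from `start` up to and including `now`."""
    times = []
    current = start
    while current <= now:
        times.append(current)
        current += frequency
    return times


def ready_runs(times: List[datetime], available_inputs: Set[str]) -> List[datetime]:
    """Keep only the runs whose input data (keyed by date) has already landed."""
    return [t for t in times if t.strftime("%Y-%m-%d") in available_inputs]


# A daily workflow starting 1 September 2014; the input for 2 September is still missing.
scheduled = nominal_times(datetime(2014, 9, 1), timedelta(days=1), datetime(2014, 9, 3, 6))
runnable = ready_runs(scheduled, {"2014-09-01", "2014-09-03"})
print([t.date().isoformat() for t in runnable])
```

A run that is due but whose input has not arrived simply waits, which is what lets the pipeline keep pace with the data without anyone starting jobs by hand.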
Superficially, it sounds like a trivial task since the hard work has been “done.” In practice, this last step is a challenge because it spans the entire stack and requires careful release planning, with proper testing (QA), staging, and deployment phases. It also includes operating the data pipelines, which means monitoring, reporting, and alerting. Finally, insights about the performance of the pipelines might trigger cluster changes, from hardware to configuration settings.
There are some tools that help you along the way. For example, Cloudera Manager can track cluster utilization and job performance, and Cloudera Navigator can define data lifecycle rules, including metadata about the source and lineage of data. A Big Data engineer is still needed to fill in the gaps, while maintaining the entire pipeline in production.
The following diagram adds the discussed tools and concepts to the data pipeline architecture:
Please do consider the option of having constant support for your Hadoop cluster and data pipelines in production (but also in development). The complexity of running a cluster in such a mode is not trivial and can cause considerable delays when things go wrong. Cloudera Manager will help you tremendously to reduce the time to discover a root cause for a problem, and often you can apply a fix yourself. But there are also many issues we have seen in practice that require a Hadoop engineer to lend a helping hand. Cloudera has such a team of engineers (in fact a multilayered, dedicated team) that can help you solve any problem you might face.
While Hadoop has grown tremendously, there are still functional gaps for putting data pipelines into production easily, so skilled Big Data engineers are needed. Demand for these engineers is high and expected to grow, and Cloudera’s new “Designing and Building Big Data Applications” training course can teach you the skills you will need to excel in this role.
The Hadoop ecosystem, helpfully, offers most of the tools needed to build and automate these pipelines based on business rules—testing and deploying pipelines is easier with proper tooling support, while operating the same pipelines in production can be equally automated and transparent. As time moves on, missing functionality will be provided either by Cloudera, by third-party vendors, or as part of the open source ecosystem.
In a future world, we will be able to point Hadoop to a source, internal or external, batch or streaming, and press an “Implement Pipeline” button. The initial parameters will be assumed, and further learned and adjusted as needed, resulting in data being laid out for the current use case, be it interactive or automated (or both).
We can dream. In the meantime, happy Hadoop-ing!
Lars George is Cloudera’s EMEA Chief Architect, an HBase committer and PMC member, and the author of O’Reilly’s HBase: The Definitive Guide.