Getting Started with Big Data Architecture

Categories: Hadoop

What does a “Big Data engineer” do, and what does “Big Data architecture” look like? In this post, you’ll get answers to both questions.

Apache Hadoop has come a long way in its relatively short lifespan. From its beginnings as a reliable storage pool with integrated batch processing using the scalable, parallelizable (though inherently sequential) MapReduce framework, we have witnessed the recent additions of real-time (interactive) components like Impala for interactive SQL queries and integration with Apache Solr as a search engine for free-form text exploration.

Getting started is now also a lot easier: Just install CDH, and all the Hadoop ecosystem components are at your disposal. But after installation, where do you go from there? What is a good first use case? How do you ask those “bigger questions”?

Having worked with more customers running Hadoop in production than any other vendor, Cloudera’s field technical services team has seen more than its fair share of these use cases. Although they obviously vary by industry and application, there is a common theme: the presence of Big Data architecture.

In this post, you’ll get a whirlwind tour of that architecture based on what we’ve seen at customer sites over the past couple of years, and get some tips/initial advice about building your own as the foundation for an enterprise data hub.

Big Data Architecture

Big Data architecture is premised on a skill set for developing reliable, scalable, completely automated data pipelines. That skill set requires profound knowledge of every layer in the stack, beginning with cluster design and spanning everything from Hadoop tuning to setting up the top chain responsible for processing the data. The following diagram shows the complexity of the stack, as well as how data pipeline engineering touches every part of it.

The main detail here is that data pipelines take raw data and convert it into insight (or value). Along the way, the Big Data engineer has to make decisions about what happens to the data, how it is stored in the cluster, how access is granted internally, what tools to use to process the data, and eventually the manner of providing access to the outside world. The latter could be BI or other analytic tools, the former (for the processing) are likely tools such as Impala or Apache Spark. The people who design and/or implement such architecture I refer to as Big Data engineers.

In the remainder of this post, you’ll learn about the various components in the stack and their role in creating data pipelines.

Cluster Planning

Cluster planning is a “chicken-and-egg” problem, as cluster design is inherently driven by the use-case(s) running later on, and often the use case is not yet clear. Most vendors, including Cloudera, have a reference architecture guideline to help you select the proper class of machines. (For Cloudera certified partners, see the online listing.)

In general, the current recommended machines are dual CPU with 4 to 8 cores each, at least 48GB of RAM up to 512GB (for low latency analytical workloads where lots of data is cached), at least 6 HDDs (hard disk drives), up to 12 or larger for storage heavy configurations, and otherwise standard rack-mountable 19″ servers. Sometimes we also see SSD (solid state drive) setups for low-latency use cases, although the results are not as dramatic as one would assume. (Please test carefully.)

When in doubt, you can always try the public (or private) cloud services first, and once you know your requirements better, you can move things around. If you do so, be generous about machine size for getting comparable results with bare-metal hardware – remember, you are in a shared environment and need to factor-in competing loads and slower data connections (network and virtualized storage).

Ingress

After you have spun up your cluster, you have to decide how to load data. In practice there are two main approaches: batch and event-driven. The former is appropriate for file and structured data, while the latter is appropriate for most near-real-time events such as log or transactional data.

Batch Ingest

Let me start with the more straightforward case: ingesting data from structured data sources (for example, an RDBMS). The weapon of choice is universally Apache Sqoop, which allows you to move data into Hadoop from RDBMSs. You can select partial (column projection and row selection) or full data sets and do full or (given some requirements) incremental transfers. Sqoop uses MapReduce as its workhorse and employs default JDBC drivers for many database systems—or, if necessary, specialized drivers that speed up the data transfer.

The more complex batch ingest method is file loading. Here there are many ways to achieve that but none are really established. In fact, when possible, it is better to switch to the event ingest explained below to avoid bulk loading of files. The matter is complicated by the location of the files (on site or remote), as well as the API to load them (the HDFS put command being the simplest one; there are also REST based APIs with WebHDFS and HttpFS).

But how can you reliably ingest files without human intervention as demanded by Big Data architecture? I have yet to see a solution here, and for now can only point to bespoke custom scripting (Bash, Python, Java and so on) or the vast Hadoop partner field, which has lots to offer on the data integration topic.

On their own, these tools are one-off jobs only—they get invoked and do their work. What is missing is automatic ingest so that the data pipeline is constantly processing data. We’ll pick that up in the “Productionization” section below.

Event Ingest

For event-based ingest there is Apache Flume, which allows you to define a redundant, failsafe network of so-called agents that transport event records from a generating system to the consuming one. The latter might be HDFS, but it can also be Spark or HBase, or a combination of both.

Flume has been battle-tested at large user clusters and allows you to reliably deliver data to where it is needed. The tricky part is to configure the Flume topology and the agents correctly. The agents need to be able to buffer enough data on persistent media so that all anticipated “normal” server failures are covered. Also, tuning the batch sizes of events that are sent between agents is vital to achieve either higher throughput or lower latencies (faster message delivery).

Staging

Once the data has arrived in Hadoop as a whole, there remains the task of staging it for processing. This is not just about storing it somewhere, but rather storing it in the right format, with the right size, and the right access mask.

Storage Formats

The right data format depends on the subsequent use case. Whether the application is batch or real-time is again relevant, but so is whether the format retains the full fidelity of data and is open source (i.e. can be used by more than one tool in the processing stage).

For batch, container file formats, including the venerable SequenceFile and Avro formats, are both useful and popular. As for the analytical, real-time application, the new rising star is Apache Parquet (incubating), which similar to columnar databases lays out the data in columns with built-in structure and compression (e.g. skip NULL values) that allow you to very efficiently scan very large data sets (assuming a selective query pattern).

In addition to the file format, you should also strongly consider encoding and compression formats because the best I/O in Big Data is the one you are not doing. Compression is always a good thing for reducing I/O while loading more data with fewer bytes being moved around. The proper approach is driven by CPU-versus-compression ratio trade-offs, because the better a codec compresses, the more CPU it usually needs. Thus, the data we see is almost always compressed with the Snappy codec, which is super-fast and lightweight yet offers decent compression ratios. For historical data, BZip2 or something similar is often used.

It is also important to think about what happens with your data over time. You might want to implement policies that rewrite older data into different file or compression formats, so that you make better use of the available cluster capacity. As data ages and is accessed less often, it is worthwhile to trade back the compression ratio against CPU usage. Here there are no incumbent tools to help you out, and in the field I often see rather custom solutions (scripting again)… or none at all (which is not good).

Data Partitioning

As you land data, there is another important aspect to consider: how you partition or, more generally, size data. For starters, Hadoop is good at managing fewer very large files. You do not want to design an architecture that lands many small files in HDFS and then be surprised when the NameNode starts to perform badly. You can, of course, land small files, but you would need to implement an ETL stage (or rather TL as no extract is needed) that combines smaller files into larger ones.

While you are transforming files as they arrive, the next step is to split them into decent chunks for later processing. This is usually done using partitions on HDFS. In HBase the partitioning is implicit as it divides data into regions of contiguous rows, sorted by their row key; it splits and rebalances as it goes along. For HDFS, you have to plan ahead of time—you might need to sample data and explore its structure to decide what is best for you. The rule of thumb, though, is for partitions to span at least a decent amount of data worth processing without creating the small-file problem mentioned above. I would advise you to start with a partition amounting to at least 1GB in a single file, and knowing the size of the total dataset, tune this up to even larger sizes. So for very large datasets in the hundreds of TBs and up, I would have each file in a partition be 10GB, or even 100GB or more.

One final note: make sure the file format supports splitting the files into smaller blocks for parallel processing. The above suggested container formats usually do that, but you might want to double check (look for splittable support). If not, you can end up with suboptimal performance across the cluster because a single reader has to process a single large file (that is, your parallelism rate drops considerably).

Access Control

The last part you have to consider is what we call information architecture (IA), which addresses the need to lay out the data in such a way that multiple teams can work safely on a shared cluster—also referred to as multi-tenancy.

It is not enough to have each job read from one directory and emit to another. If you share a cluster across departments, you need to devise a concise access schema that controls tightly (and possibly supports auditing of) who has access to what data. The IA is where these rules are defined—for example, by using user groups and other HDFS features (see the new extended ACLs features in HDFS or Apache Sentry) to map business units into owners of data. With that, you can further define a plan on how data is read from storage during processing and pushed through the various stages of the data processing pipeline.

One way to handle proper processing is to create a time-stamped directory for every running job and then within a further directory structure for incoming (for example from a previous job), currently being processed, and final (as well as permanently failed) files. This ensures that jobs can run in parallel without overwriting each other’s data mid-flight.

We won’t cover this issue in detail here, but IA should also account for data backups (for disaster recovery or load balancing). You need a strategy for moving data across multiple clusters or even data centers.

Data Processing

Thus far you have learned about landing and staging the incoming data. The next step is automatically processing it as part of the data pipeline.

Data Transformation

This is the part mentioned above, i.e. where you process existing data, for example, to transform it into other file formats or other compression algorithms. Just because you transform your data doesn’t mean you need to lose any of its detail: this is not your typical ETL which is often lossy, but rather an optional step to increase the effectiveness of your cluster. Plan to do whatever is needed for staging, which might also extend to rewriting data over time (or based on changing requirements). You could, for example, employ heuristics that check how often and in what way data is used and change its layout over time.

Analytics

The more interesting part of processing is the analytics done on top of the staged data. Here we see the venerable MapReduce—now rejuvenated on top of YARN—as well as other frameworks, such as Spark or Apache Giraph. On top of that layer there are other abstractions in use, notably Apache Crunch and Cascading.

The currently most hyped topic is machine learning, wherein you build mathematical models for recommendations or clustering/classification of incoming new data—for example, to do risk assessment, fraud detection, or spam filtering. The more “mundane” tasks in analysis, such as building aggregations and reporting data, are still very common. In fact, the latter is more than 90% of the use cases we see, with the former being an emerging area.

Either way, after prototyping the algorithm and approach, you have to convert it into an automated workflow.

Egress and Querying

As for providing access to the data, you need to find one that covers all types of users, from novices to experts. The access spans from the ubiquitous Search using Apache Solr, to JDBC interfaces that SQL users and BI tools can use, all the way to low-level APIs—and eventually, raw file access. Regardless of the access method, the data is never copied nor siloed into lesser data structures: all these tools work on the single source of truth represented as the full-fidelity files in HDFS or key values in HBase. Whether you use Impala or Hive to issue SQL commands, the Kite SDK to read files, or process data with interactive Spark, you are always working on the same copy of data.

In fact, that’s what makes Hadoop so powerful, as it removes the need to move data around and transform it to “yet another (lesser) schema”. The integration of Hadoop into the enterprise IT landscape with Kerberos authentication, role-based authorization, and log-based auditing completes the picture.

Data Pipelines

Before we can automate, we have to combine the tools described above into more complex data pipelines. There are two main types of such pipelines: micro and macro.

Micro-pipelines are streamlined helpers that allow you to abstract (and therefore simplify) parts of the larger processing. Tools for this purpose include Morphlines (see “Introducing Morphlines: The Easy Way to Build and Integrate ETL Apps for Hadoop” or details), Crunch, and Cascading. Morphlines tie together smaller processing steps applied to each record or data pair as it flows through the processing. That lets you build tested, reusable processing sub-steps, used for example to cleanse data or enhance its metadata for later steps.

In contrast, Crunch and Cascading define an abstraction layer on top of the processing, where you deal with data points. You define how data is consumed, routed, processed, and emitted, which translates into one or more processing job on MapReduce and/or Spark. But a Crunch or Cascading “meta” job can further be combined to yet more complex workflows, which is usually done in macro-pipelines.

Apache Oozie is one of those macro-pipelines tools. It defines workflows as directed, acyclic graphs (DAGs) that have control and action elements, where the former influences how the flow proceeds and the latter what has to be done for each step. Oozie also has a server component that tracks the running flows and measures to handle their completion (or termination).

As with single jobs or micro-pipelines, a “workflow” is not automated but rather just a definition of work. It has to be invoked manually to start the flow processing. This is where another part of Oozie, the coordinators, come in. Oozie coordinators help define the time or frequency a workflow should run, and/or the dependencies to other workflows and data sources. With this feature, you can define the missing link in automating processing.

Productioniziation

We have closed the loop above and now data pipelines can run in an automated fashion, consuming and producing data as needed. But for a Big Data engineer, I argue there is one more piece to the puzzle: production-ization.

Superficially, it sounds like a trivial task since the hard work has been “done.” In practice, this last step is a challenge because it spans the entire stack and requires careful release planning, with proper testing (QA), staging, and deployment phases. It also includes operating the data pipelines, which means monitoring, reporting, and alerting. Finally, insights about the performance of the pipelines might trigger cluster changes, from hardware to configuration settings.

There are some tools that help you along the way. For example, Cloudera Manager can track cluster utilization and job performance, and Cloudera Navigator can define data lifecycle rules, including metadata about the source and lineage of data. A Big Data engineer is still needed to fill in the gaps, while maintaining the entire pipeline in production.

The following diagram adds the discussed tools and concepts to the data pipeline architecture:

Support

Please do consider the option of having constant support for your Hadoop cluster and data pipelines in production (but also in development). The complexity of running a cluster in such a mode is not trivial and can cause considerable delays when things go wrong. Cloudera Manager will help you tremendously to reduce the time to discover a root cause for a problem, and often you can apply a fix yourself. But there are also many issue we have seen in practice that require a Hadoop engineer to lend a helping hand. Obviously Cloudera has such a team of engineers, in fact a multilayered, dedicated team, which can help you solve any problem you might face.

Conclusion

While Hadoop has grown tremendously, there are still functional gaps for putting data pipelines into production easily, so skilled Big Data engineers are needed. Demand for these engineers is high and expected to grow, and Cloudera’s new “Designing and Building Big Data Applications” training course can teach you the skills you will need to excel in this role.

The Hadoop ecosystem, helpfully, offers most of the tools needed to build and automate these pipelines based on business rules—testing and deploying pipelines is easier with proper tooling support, while operating the same pipelines in production can be equally automated and transparent. As time moves on, missing functionality will be provided either by Cloudera, by third-party vendors, or as part of the open source ecosystem.

In a future world, we will be able to point Hadoop to a source, internal or external, batch or streaming, and press an “Implement Pipeline” button. The initial parameters will be assumed, and further learned and adjusted as needed, resulting in data being laid out for the current use case, be it interactive or automated (or both).

We can dream. In the meantime, happy Hadoop-ing!

Lars George is Cloudera’s EMEA Chief Architect, an HBase committer and PMC member, and the author of O’Reilly’s HBase: The Definitive Guide.

Facebooktwittergoogle_pluslinkedinmailFacebooktwittergoogle_pluslinkedinmail

9 responses on “Getting Started with Big Data Architecture

  1. Naveen

    Thanks Lars, nice big picture of all stages of data processing. is there no concept of master data? also, when morphlines do their job, it creates a new copy in HDFS, which i guess is used for analytics? i also thought all the processing is immutable , no updates or rewrites? any advise?

    1. Lars George Post author

      Hey Naveen, that is exactly the point: the notion of Master Data is something that is completely user defined. Hadoop does not impose anything. You need to devise a strategy for your use-case/company based on a global information architecture. And that is what I tried to explain, an Hadoop Big Data Engineer is partly (or often mostly) responsible to ensure how data flows through the system. Third party support (such as our own Navigator) will close this gap over time, but for the time being there is work needed. As for Morphlines, they are executed inline during processing and do not (necessarily) create new files, but transform data in a reusable manner. How you use the data is up to you. And yes, once you write data it becomes immutable. So transforming the data depending on what you do with it is quite normal. Does that help?

  2. xuxiong

    Hi Lars,

    ” The rule of thumb, though, is for partitions to span at least a decent amount of data worth processing without creating the small-file problem mentioned above. I would advise you to start with a partition amounting to at least 1GB in a single file, and knowing the size of the total dataset, tune this up to even larger sizes. ”

    I’m not sure what ‘partition’ means in the context, is it the hdfs data block?

  3. MGR

    @ XUXIONG

    I think Lars was probably refering to partitions in the context of Hive and Impala. Both Hive and Impala use straight HDFS immutable files for querying. So by partitioning them in the way Lars explained can help you avoid scanning the entire dataset while doing SQL queries.

    But partitioning can still be used in the context of MapReduce/Spark. For example: your log data is ingested continuously and you have a data pipeline that processes the land in a particular directory every 15/30/60 minutes.

    If you have all the data landing in the same directory -for example:
    /dev/data/landingdir/file1
    /dev/data/landingdir/file2 ….
    /dev/data/landingdir/filen

    This setup is considered non-paritioned, and if we use this as the input directory for the MapReduce/Spark job, it will process all the files and puts burden on the MR/Spark job to filter out only the files that arrived in the last 15/30/60 minutes and output them. This is usually a bad practice and the job runtime lengths can get very large as number of files in the directory increase every few minutes.

    Another way to organize the data is in a partitioned fashion. For example:

    /dev/data/landingdir/timestamp1/file1
    /dev/data/landingdir/timestamp1/file2
    /dev/data/landingdir/timestamp1+30/file1
    /dev/data/landingdir/timestamp1+30/file2 …..

    Now we can specify the MR/Spark to pick the files that are in the latest partition and just process them. The job runtime is much faster and should be predictable if the average number&size of files ingested into the system every few minutes.

    Hope this helps!

  4. Mich Talebzadeh

    Hi,
    Thank you for the article. My query is with regard to use of Solid State Disks (SSD) for Name nodes and data nodes. Since Name node holds meta data information about blocks and their distribution, it is essential that the metadata is available pretty fast so caching would be ideal. This is where large amount of RAM helps on the host that NameNode is running. In addition, I would agree that name node files on SSD will perform much better. The name node reads are going to be pretty random access so reads from SSD devices is going to improve performance.

    With regard to data nodes, we are talking about few writes (say bulk-insert) and many more reads. Let us compare SSD response compared to magnetic spinning hard disks (HDD) for bulk inserts. Although not on HDFS, I did tests using Oracle and SAP ASE RDBMS for bulk inserts and full table scans respectively as described below

    SSD versus HDD for Bulk Inserts

    SSD faired reasonably better compared to HDD but not that much faster. The initial seek time for SSD was faster but the rest of the writes were sequential extent after extent which would not much different from what HDD does.

    In summary:

    • First seek time for positioning the disk on the first available space will be much faster for SSD than HDD
    • The rest of the writes will have marginal improvements with SSD over HDD
    • The performance gains I got was twice faster for SSD compared to HDD for bulk inserts

    SSD versus HDD for full table scans

    solid state devices are typically twice as fast that is it. When the query results in table scans, the gain is not that much. The reason being that HDD is reading sequentially from blocks and the seek time impact is considerably less compared to random reads.

    In summary Solid State Disks are best suited for random access via index scan as seek time disappears and accessing any part of the disks is just as fast. When the query results in table scans, the gain is not that much. I gather in Hadoop practically all block reads are going to be sequential so having Solid State Disks for data nodes may not pay.

    The results were publish In Oracle’s SELECT and SAP ASE’s ISUG magazine.

    HTH,

    Mich

  5. Gautam

    The whole purpose of hadoop is to enable store enormous of data at optimum economy and still provide superior performance. JBOD when work in parallel in distributed environment often outperform RAID disks and not only that the whole idea of having larger block size on top of smaller OS block (4K) is to make seek time insignificant compared to transfer rate of data in a disk. Hence solid state disk even being faster in a commodity hardware based scalable distributed environment it may not fit. regarding namenode caches whole namespace in memory hence we rather use good memory size rather improving disk.

  6. Prakash

    Hi ,
    I am Retail planner / brand manager , with Fashion technology graduation and post graduation in MBA . I am purely from non technical back ground , but I am aspiring to get inside big data analytics , hence would like to know few things .

    1. To be a Data scientist is it mandate to have computer science graduation or any body can take up by doing certifications ?
    2. Is there any eligibility criteria for taking up test and getting certification ?
    3. Will this certification help me get job in Retail Big Data Analytic .?

    Best regards
    Prakash

  7. Prasad

    Hi,

    We are trying to implement new a bigdata pletform starting from a scratch. From an information architecture point of view, i would like to know how is folder schema and its corresponding Hcatlogs are created. Here are folders i am thinking of

    /data/staging///files — DataAcquisition (Hcatalog) : Place holder for raw files
    /data/integration/ — DataIntegration (Hcatalog) : Placeholder for integrated data
    /data/presentaion/ — DataPresentation (Hcatalog) : Placeholder for holding presentation data structures (flat tables/dimensions/facts etc…)

    /datalab — Datalab (Hcatlog) : Placeholder for research and discovery analysis for data scientists

    Please peruse and suggest better way of organising hadoop folder structures and hive databases.

    Kind Regards
    Prasad