How-to: Use HBase Bulk Loading, and Why

Apache HBase is all about giving you random, real-time, read/write access to your Big Data, but how do you efficiently get that data into HBase in the first place? Intuitively, a new user will try to do that via the client APIs or by using a MapReduce job with TableOutputFormat, but those approaches are problematic, as you will learn below. Instead, the HBase bulk loading feature is much easier to use and can insert the same amount of data more quickly.

This blog post will introduce the basic concepts of the bulk loading feature, present two use cases, and walk through two examples.

Overview of Bulk Loading

If you have any of these symptoms, bulk loading is probably the right choice for you:

  • You needed to tweak your MemStores to use most of the memory.
  • You needed to either use bigger WALs or bypass them entirely.
  • Your compaction and flush queues are in the hundreds.
  • Your GC is out of control because your inserts are in the MB range.
  • Your latency goes outside your SLA when you import data.

Most of those symptoms are commonly referred to as “growing pains.” Using bulk loading can help you avoid them.

In HBase-speak, bulk loading is the process of preparing and loading HFiles (HBase’s own file format) directly into the RegionServers, thus bypassing the write path and obviating those issues entirely. This process is similar to ETL and looks like this:

1. Extract the data from a source, typically text files or another database. HBase doesn’t manage this part of the process. In other words, you cannot tell HBase to prepare HFiles by reading directly from MySQL — rather, you have to do it by your own means. For example, you could run mysqldump on a table and upload the resulting files to HDFS or just grab your Apache HTTP log files. In any case, your data needs to be in HDFS before the next step.

2. Transform the data into HFiles. This step requires a MapReduce job and for most input types you will have to write the Mapper yourself. The job will need to emit the row key as the Key, and either a KeyValue, a Put, or a Delete as the Value. The Reducer is handled by HBase; you configure it using HFileOutputFormat.configureIncrementalLoad() and it does the following:

  • Inspects the table to configure a total order partitioner
  • Uploads the partitions file to the cluster and adds it to the DistributedCache
  • Sets the number of reduce tasks to match the current number of regions
  • Sets the output key/value class to match HFileOutputFormat’s requirements
  • Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)

At this stage, one HFile will be created per region in the output folder. Keep in mind that the input data is almost completely re-written, so you will need at least twice as much free disk space as the size of the original data set. For example, for a 100GB mysqldump you should have at least 200GB of available disk space in HDFS. You can delete the dump file at the end of the process.
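
To make the transform step more concrete, below is a minimal sketch of such a job for a hypothetical comma-separated word,count file, written against the 0.94-era APIs used in this post. The class names (BulkLoadDriver, WordCountMapper) and the "f:count" column are illustrative choices, not something HBase prescribes; the important pieces are the Mapper’s output types and the single call to HFileOutputFormat.configureIncrementalLoad().

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  // Parses "word,count" lines and emits the row key plus one KeyValue.
  static class WordCountMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    private static final byte[] FAMILY = Bytes.toBytes("f");
    private static final byte[] QUALIFIER = Bytes.toBytes("count");

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split(",");
      if (fields.length != 2) {
        return; // skip malformed lines
      }
      byte[] rowKey = Bytes.toBytes(fields[0]);
      KeyValue kv = new KeyValue(rowKey, FAMILY, QUALIFIER, Bytes.toBytes(fields[1]));
      context.write(new ImmutableBytesWritable(rowKey), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    // args: <input dir in HDFS> <output dir for HFiles> <table name>
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "prepare-hfiles");
    job.setJarByClass(BulkLoadDriver.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(WordCountMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Lets HBase set up the total order partitioner, the sorting reducer,
    // and the number of reduce tasks based on the table's current regions.
    HTable table = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}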

3. Load the files into HBase by telling the RegionServers where to find them. This is the easiest step. It requires using LoadIncrementalHFiles (more commonly known as the completebulkload tool): pass it a URL that locates the files in HDFS and it will load each file into the relevant region via the RegionServer that serves it. In the event that a region was split after the files were created, the tool automatically splits the HFile according to the new boundaries. This process isn’t very efficient, so if your table is currently being written to by other processes, it’s best to load the files as soon as the transform step is done.
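
For completeness, the same step can also be triggered from Java instead of the command line. Here is a minimal sketch, assuming a placeholder table named "mytable" and HFiles sitting under /user/cloudera/output:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder table name and HFile directory; adjust to your own setup.
    HTable table = new HTable(conf, "mytable");
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path("/user/cloudera/output"), table);
    table.close();
  }
}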

To summarize the process: the data flows from the original source into HDFS, and from there the RegionServers simply move the files into their regions’ directories.

Use Cases

Original dataset load: All users migrating from another datastore should consider this use case. First, you have to go through the exercise of designing the table schema and then create the table itself, pre-split. The split points have to take into consideration the row-key distribution and the number of RegionServers. I recommend reading my colleague Lars George’s presentation on advanced schema design for any serious use case.

The advantage here is that it is much faster to write the files directly than going through the RegionServer’s write path (writing to both the MemStore and the WAL) and then eventually flushing, compacting, and so on. It also means you don’t have to tune your cluster for a write-heavy workload and then tune it again for your normal workload.

Incremental load: Let’s say that you have some dataset currently being served by HBase, but now you need to import more data in batch from a third party or you have a nightly job that generates a few gigabytes that you need to insert. It’s probably not as large as the dataset that HBase is already serving, but it might affect your latency’s 95th percentile. Going through the normal write path will have the adverse effect of triggering more flushes and compactions during the import than normal. This additional IO stress will compete with your latency-sensitive queries.

Examples

You can use the following examples in your own Hadoop cluster, but the instructions are provided for the Cloudera QuickStart VM, which bundles a single-node cluster, a guest OS, and sample data and examples into a virtual machine appliance for your desktop.

Once you start the VM, tell it, via the web interface that will automatically open, to deploy CDH and then make sure that the HBase service is also started.

Built-in TSV Bulk Loader

HBase ships with a MR job that can read a delimiter-separated values file and output directly into an HBase table or create HFiles for bulk loading. Here we are going to:

  1. Get the sample data and upload it to HDFS.
  2. Run the ImportTsv job to transform the file into multiple HFiles according to a pre-configured table.
  3. Prepare and load the files in HBase.

The first step is to open a console and use the following command to get sample data:

curl -O https://people.apache.org/~jdcryans/word_count.csv

I created this file by running a word count on the original manuscript of this very blog post and then outputting the result in CSV format, without any column titles. Now, upload the file to HDFS:

hdfs dfs -put word_count.csv

The extraction part of the bulk load now being complete, you need to transform the file. First you need to design the table. To keep things simple, call it “wordcount” — the row keys will be the words themselves and the only column will contain the count, in a family that we’ll call “f”. The best practice when creating a table is to split it according to the row key distribution but for this example we’ll just create five regions with split points spread evenly across the key space. Open the hbase shell:

hbase shell

And run the following command to create the table:

create 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}

The four split points will generate five regions, where the first region starts with an empty row key. To get better split points you could also do a quick analysis to see how the words are truly distributed, but I’ll leave that up to you.
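
If you do want to base the split points on the data instead of guessing, a quick local analysis is enough. The sketch below is an illustrative helper (not part of HBase): it reads word_count.csv, sorts the keys, and prints four boundaries that divide them into five roughly equal buckets, which you could paste into the create command above.

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SplitPointFinder {
  public static void main(String[] args) throws Exception {
    // Read the row keys (first CSV column) from a local copy of the file.
    List<String> keys = new ArrayList<String>();
    BufferedReader reader = new BufferedReader(new FileReader("word_count.csv"));
    String line;
    while ((line = reader.readLine()) != null) {
      keys.add(line.split(",")[0]);
    }
    reader.close();

    // Sort the keys and print the boundaries of five equal-sized buckets.
    Collections.sort(keys);
    int regions = 5;
    for (int i = 1; i < regions; i++) {
      System.out.println("Split point " + i + ": " + keys.get(i * keys.size() / regions));
    }
  }
}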

If you point your VM’s browser to http://localhost:60010/ you will see our newly created table and its five regions all assigned to the RegionServer.

Now it’s time to do the heavy lifting. Invoking the HBase jar on the command line with the “hadoop” script will show a list of available tools. The one we want is called importtsv and has the following usage:

hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv
 ERROR: Wrong number of arguments: 0
 Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

The command line we are going to use is the following one:

hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv \
  -Dimporttsv.separator=, \
  -Dimporttsv.bulk.output=output \
  -Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

Here’s a rundown of the different configuration elements:

  • -Dimporttsv.separator=, specifies that the separator is a comma.
  • -Dimporttsv.bulk.output=output is a relative path to where the HFiles will be written. Since your user on the VM is “cloudera” by default, it means the files will be in /user/cloudera/output. Skipping this option will make the job write directly to HBase.
  • -Dimporttsv.columns=HBASE_ROW_KEY,f:count is a list of all the columns contained in this file. The row key needs to be identified using the all-caps HBASE_ROW_KEY string; otherwise it won’t start the job. (I decided to use the qualifier “count” but it could be anything else.)

The job should complete within a minute, given the small input size. Note that five Reducers are running, one per region. Here’s the result on HDFS:

-rw-r--r--   3 cloudera cloudera         4265   2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b
-rw-r--r--   3 cloudera cloudera         3163   2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36
-rw-r--r--   3 cloudera cloudera         2487   2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2
-rw-r--r--   3 cloudera cloudera         2961   2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914
-rw-r--r--   3 cloudera cloudera         1336   2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f

As you can see, the files currently belong to the user “cloudera”. In order to load them we need to change the owner to “hbase” or HBase won’t have the permission to move the files. Run the following command:

sudo -u hdfs hdfs dfs -chown -R hbase:hbase /user/cloudera/output

For the final step, we need to use the completebulkload tool, pointing it to where the files are and which table we are loading into:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount

Going back into the HBase shell, you can run the count command that will show you how many rows were loaded. If you forgot to chown, the command will hang.
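
If you prefer to verify the load from Java rather than the shell, a simple client-side scan does roughly what the count command does. This is just a sketch assuming the "wordcount" table created earlier; for large tables, the RowCounter MapReduce job that ships with HBase is a better fit.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

public class CountRows {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "wordcount");
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter()); // only fetch one cell per row
    ResultScanner scanner = table.getScanner(scan);
    long rows = 0;
    while (scanner.next() != null) {
      rows++;
    }
    scanner.close();
    table.close();
    System.out.println("Rows in wordcount: " + rows);
  }
}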

Custom MR Job

The TSV bulk loader is good for prototyping, but because it interprets everything as strings and doesn’t support manipulating the fields at transformation time, you will end up having to write your own MR job. My colleague James Kinley, who works as a solutions architect in Europe, wrote such a job that we’re going to use for our next example. The data for the job contains public Facebook and Twitter messages related to the 2010 NBA Finals (game 1) between the Lakers and the Celtics. You can find the code here. (The Quick Start VM comes with git and maven installed so you can clone the repository on it.)

Looking at the Driver class, the most important bits are the following:

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
…
// Auto configure partitioner and reducer
HFileOutputFormat.configureIncrementalLoad(job, hTable);

First, your Mapper needs to output an ImmutableBytesWritable that contains the row key, and the output value can be either a KeyValue, a Put, or a Delete. The second snippet shows how to configure the Reducer; it is in fact completely handled by HFileOutputFormat.configureIncrementalLoad() as described in the “Transform” section previously.

The HBaseKVMapper class only contains the Mapper that respects the configured output key and values:

public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

In order to run it you’ll need to compile the project using maven and grab the data files following the links in the README. (It also contains the shell script to create the table.) Before starting the job, don’t forget to upload the files to HDFS and to set your classpath to be aware of HBase because you’re not going to use its jar this time:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*

You’ll be able to start the job using a command line similar to this one:

hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar \
  com.cloudera.examples.hbase.bulkimport.Driver \
  -libjars /home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar,/home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar \
  RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010

As you can see, the job’s dependencies have to be added separately. Finally, you can load the files by first changing their owner and then running the completebulkload tool:

sudo -u hdfs hdfs dfs -chown -R hbase:hbase /user/cloudera/output2
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010

Potential Issues

Recently deleted data reappearing. This issue happens when a Delete is inserted via a bulk load and is major compacted while the corresponding Put is still in a MemStore. The data will be considered deleted when the Delete is in an HFile but, once it’s removed during the compaction, the Put will become visible again. If you have such a use case, consider configuring your column families to keep the deleted cells with KEEP_DELETED_CELLS in the shell or HColumnDescriptor.setKeepDeletedCells().
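
For reference, here is a minimal sketch of flipping that setting from Java, using the "wordcount" table and "f" family from the earlier example as placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class KeepDeletedCells {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    // Fetch the existing family descriptor so its other settings are preserved.
    HTableDescriptor tableDesc = admin.getTableDescriptor(Bytes.toBytes("wordcount"));
    HColumnDescriptor family = tableDesc.getFamily(Bytes.toBytes("f"));
    family.setKeepDeletedCells(true);

    // Schema changes require the table to be disabled unless online
    // schema updates are enabled on the cluster.
    admin.disableTable("wordcount");
    admin.modifyColumn("wordcount", family);
    admin.enableTable("wordcount");
    admin.close();
  }
}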

Bulk-loaded data cannot be overwritten by another bulk load. This issue occurs when two bulk-loaded HFiles loaded at different times try to write a different value in the same cell, meaning that they have the same row key, family, qualifier, and timestamp. The result is that the first inserted value will be returned instead of the second one. This bug will be fixed in HBase 0.96.0 and CDH 5 (the next CDH major version) and work is being done in HBASE-8521 for the 0.94 branch and CDH 4.

Bulk loading triggers major compactions. This issue comes up when you’re doing incremental bulk loads and there are enough bulk-loaded files to trigger a minor compaction (the default threshold being 3). The HFiles are loaded with a sequence number set to 0, so they get picked up first when the RegionServer is selecting files for a compaction, and due to a bug it will also select all the remaining files. This issue will seriously affect those who already have big regions (multiple GBs) or who bulk load often (every few hours or more frequently), as a lot of data will be compacted. HBase 0.96.0 has the proper fix and so will CDH 5; HBASE-8521 fixes the issue in 0.94, as the bulk-loaded HFiles are now assigned a proper sequence number. HBASE-8283, a smarter compaction-selection algorithm, can be enabled with hbase.hstore.useExploringCompation starting in 0.94.9 and CDH 4.4.0 to mitigate the issue.

Bulk-loaded data isn’t replicated. As bulk loading bypasses the write path, the WAL doesn’t get written to as part of the process. Replication works by reading the WAL files, so it won’t see the bulk-loaded data – and the same goes for edits that use Put.setWriteToWAL(false). One way to handle that is to ship the raw files or the HFiles to the other cluster and do the other processing there.

Conclusion

The goal of this blog post was to introduce you to Apache HBase bulk loading’s basic concepts. We explained how the process is like doing ETL, and that it is much better for big data sets than using the normal API since it bypasses the write path. The two examples were included to show how simple TSV files can be bulk loaded to HBase and how to write your own Mapper for other data formats.

Now you can try doing the same using a graphical user interface via Hue.

 

Jean-Daniel Cryans