Streaming Data into Apache HBase using Apache Flume

The following post was originally published via blog.apache.org; we are re-publishing it here.

Apache Flume was conceived as a fault-tolerant ingest system for the Apache Hadoop ecosystem. Flume comes packaged with an HDFS Sink which can be used to write events into HDFS, and two different implementations of HBase sinks to write events into Apache HBase. You can read about the basic architecture of Apache Flume 1.x in this blog post. You can also read about how Flume’s File Channel persists events and still provides extremely high performance in an earlier blog post. In this article, we will explore how to configure Flume to write events into HBase, and write custom serializers to write events into HBase in a format of the user’s choice.

Data is stored in HBase as tables. Each table has one or more column families, and each column family has one or more columns. HBase stores all columns in a column family in physical proximity. Each row is identified by a key known as the row key. To insert data into HBase, the table name, column family, column name and row key have to be specified. More details on the HBase data model can be found in the HBase documentation.
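As a rough sketch of this addressing model, each cell in a table can be located by its row key, column family, and column. The following is a plain Java illustration (a nested-map stand-in, not the actual HBase client API), reusing the table, column family, and column names that appear in the sample configuration later in this post:

```java
import java.util.Map;
import java.util.TreeMap;

public class HBaseModelSketch {
    // One table: row key -> (column family -> (column -> value))
    static Map<String, Map<String, Map<String, String>>> transactions = new TreeMap<>();

    static void put(String rowKey, String family, String column, String value) {
        transactions.computeIfAbsent(rowKey, k -> new TreeMap<>())
                    .computeIfAbsent(family, k -> new TreeMap<>())
                    .put(column, value);
    }

    public static void main(String[] args) {
        // Insert requires table, column family, column, and row key -- the same
        // four coordinates the Flume HBase sinks need in their configuration.
        put("row-001", "clients", "charges", "42.50");
        System.out.println(transactions.get("row-001").get("clients").get("charges"));
    }
}
```

Columns within the same family ("clients" here) would be stored in physical proximity by HBase itself; the nested map only mirrors the logical addressing.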

Flume has two HBase sinks: the HBaseSink (org.apache.flume.sink.hbase.HBaseSink) and the AsyncHBaseSink (org.apache.flume.sink.hbase.AsyncHBaseSink). These two sinks will eventually converge to similar functionality, but currently each has some advantages over the other:

  • The AsyncHBaseSink currently gives better performance than the HBaseSink, primarily because it makes non-blocking calls to HBase.
  • The HBaseSink will soon support secure HBase clusters (FLUME-1626) and the new HBase IPC which was introduced in HBase 0.96.

The configuration for both of these sinks is very similar. A sample configuration is shown below:

  #Use the AsyncHBaseSink
  host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
  #Use the HBaseSink
  #host1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
  host1.sinks.sink1.channel = ch1
  host1.sinks.sink1.table = transactions
  host1.sinks.sink1.columnFamily = clients
  host1.sinks.sink1.column = charges
  host1.sinks.sink1.batchSize = 5000
  #Use the SimpleAsyncHbaseEventSerializer that comes with Flume
  host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
  #Use the SimpleHbaseEventSerializer that comes with Flume
  #host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
  host1.sinks.sink1.serializer.incrementColumn = icol
  host1.channels.ch1.type=memory
  

In the above config, the “table” parameter specifies the HBase table the sink writes to – in this case, “transactions”; the “columnFamily” parameter specifies the column family in that table to insert the data into, in this case, “clients”; and the “column” parameter specifies the column in the column family to write to, in this case, “charges”. Apart from this, the sink requires a channel to be configured, like all other Flume sinks. The other interesting configuration parameters are “serializer” and the “serializer.*” parameters. The two sinks use different interfaces for the serializer. In both cases, the serializer is a class that converts the Flume event into an HBase-friendly format. This piece of code that “translates” the events is usually specific to the schema used by the user’s HBase cluster, and is usually implemented by the user. All configuration parameters passed in as “serializer.*” are passed through to the serializer, and can be used to set up any internal state the serializer needs.

In the case of the HBaseSink, the serializer converts a Flume event into one or more HBase Puts and/or Increments. The serializer must implement the HbaseEventSerializer interface. The serializer is instantiated when the sink is started by the Flume configuration framework. For each event processed by the sink, the sink calls the initialize method on the serializer. The serializer must “translate” the Flume event into HBase Puts and Increments, which are returned by the getActions and getIncrements methods. These Puts and Increments are then sent over the wire to the HBase cluster. When the sink stops, this instance of the serializer is closed by the HBaseSink.

The AsyncHBaseSink’s serializer must implement AsyncHbaseEventSerializer. In this case, the initialize method is called once by the sink, when it starts up. For every event, the sink calls the setEvent method and then calls the getActions and getIncrements methods – similar to the HBaseSink. When the sink is stopped, the serializer’s cleanUp method is called. Notice that the methods do not return the standard HBase Puts and Increments, but PutRequest and AtomicIncrementRequest from the asynchbase API. These are roughly equivalent to the HBase Puts and Increments respectively, with some differences.
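The call sequence described above can be sketched as follows. This is only an illustration of the order in which the AsyncHBaseSink drives its serializer, not Flume’s actual sink code:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    public static void main(String[] args) {
        // Record the order of serializer calls for a run of two events.
        List<String> calls = new ArrayList<>();
        calls.add("initialize");          // once, when the sink starts
        for (String event : new String[] {"ev1", "ev2"}) {
            calls.add("setEvent");        // once per event
            calls.add("getActions");      // returns List<PutRequest>
            calls.add("getIncrements");   // returns List<AtomicIncrementRequest>
        }
        calls.add("cleanUp");             // once, when the sink stops
        System.out.println(String.join(" -> ", calls));
    }
}
```

Contrast this with the HBaseSink, whose serializer has its initialize method called once per event rather than once at startup.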
 
An example of such a serializer is below.

  /**
   * A serializer for the AsyncHBaseSink, which splits the event body into
   * multiple columns and inserts them into a row whose key is available in
   * the headers.
   */
  public class SplittingSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] colFam;
    private Event currentEvent;
    private byte[][] columnNames;
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
    private byte[] currentRowKey;
    private final byte[] eventCountCol = "eventCount".getBytes();

    @Override
    public void initialize(byte[] table, byte[] cf) {
      this.table = table;
      this.colFam = cf;
    }

    @Override
    public void setEvent(Event event) {
      // Set the event and verify that the rowKey is not present
      this.currentEvent = event;
      String rowKeyStr = currentEvent.getHeaders().get("rowKey");
      if (rowKeyStr == null) {
        throw new FlumeException("No row key found in headers!");
      }
      currentRowKey = rowKeyStr.getBytes();
    }

    @Override
    public List<PutRequest> getActions() {
      // Split the event body and get the values for the columns
      String eventStr = new String(currentEvent.getBody());
      String[] cols = eventStr.split(",");
      puts.clear();
      for (int i = 0; i < cols.length; i++) {
        //Generate a PutRequest for each column.
        PutRequest req = new PutRequest(table, currentRowKey, colFam,
                columnNames[i], cols[i].getBytes());
        puts.add(req);
      }
      return puts;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
      incs.clear();
      //Increment the number of events received
      incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
      return incs;
    }

    @Override
    public void cleanUp() {
      table = null;
      colFam = null;
      currentEvent = null;
      columnNames = null;
      currentRowKey = null;
    }

    @Override
    public void configure(Context context) {
      // Read the comma-separated column names from the configuration and
      // store them in the columnNames field used by getActions
      String cols = context.getString("columns");
      String[] names = cols.split(",");
      columnNames = new byte[names.length][];
      int i = 0;
      for (String name : names) {
        columnNames[i++] = name.getBytes();
      }
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }
  }
  

This serializer splits the event body on a comma delimiter and writes each token to a different column. The row key is taken from the event’s “rowKey” header. Each time an event is received, a counter is also incremented to keep track of the total number of events received.

This serializer can be set up with the following configuration:

  host1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
  host1.sinks.sink1.channel = ch1
  host1.sinks.sink1.table = transactions
  host1.sinks.sink1.columnFamily = clients
  host1.sinks.sink1.batchSize = 5000
  #The serializer to use
  host1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SplittingSerializer
  #List of columns each event writes to.
  host1.sinks.sink1.serializer.columns = charges,date,priority
  

Internals of the HBaseSink and AsyncHBaseSink

The HBaseSink uses the HBase HTable API to write events out to HBase. HTable supports batching of Puts, but only HBase 0.92+ supports batching of Increments. Currently, the HBaseSink is single-threaded and calls the serializer to get the Puts and Increments once per event it processes. Puts and Increments are sent to HBase via blocking calls, which means the next event is read and passed to the serializer only once the current event has been successfully written to HBase. Each transaction consists of at most the number of events specified by the batchSize property in the configuration. Like all other Flume sinks, if any of these events fails to be written successfully, the sink retries the entire transaction.

On the other hand, the AsyncHBaseSink uses the asynchbase API, and sends out events asynchronously to HBase. The AsyncHBaseSink, in the same way as the HBase sink, generates Puts and Increments for each event. Once the Puts and Increments are generated, the sink sends them out immediately to HBase and moves on to process the next event. Success or failure is handled through callbacks. Again, each transaction consists of at most the number of events specified by the batchSize configuration parameter. The sink waits until either success callbacks are received for all the events sent, or at least one error callback is received. If an error callback is received, the entire transaction is retried, in true Flume style.
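The transaction behavior described above can be sketched as a simple loop: take up to batchSize events, attempt to write them all, and on any failure re-send the whole batch. The following is a self-contained simulation; the writeAll stub and its simulated transient failures are hypothetical stand-ins, not Flume code:

```java
import java.util.Arrays;
import java.util.List;

public class BatchRetrySketch {
    // Simulate two transient HBase failures before writes start succeeding.
    static int failuresLeft = 2;

    // Stand-in for sending a batch of Puts/Increments to HBase.
    static boolean writeAll(List<String> batch) {
        if (failuresLeft > 0) { failuresLeft--; return false; }
        return true;
    }

    // Deliver events in transactions of up to batchSize; on failure,
    // the entire transaction (the whole batch) is retried.
    static int deliver(List<String> events, int batchSize) {
        int delivered = 0;
        for (int start = 0; start < events.size(); start += batchSize) {
            List<String> batch = events.subList(start, Math.min(start + batchSize, events.size()));
            boolean committed = false;
            while (!committed) {
                committed = writeAll(batch);  // on false, the whole batch is re-sent
            }
            delivered += batch.size();
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(Arrays.asList("e1", "e2", "e3", "e4", "e5"), 2));
    }
}
```

Note that the first batch is sent three times before it commits; this is exactly the at-least-once behavior discussed in the next section.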

A Word of Caution

As you can see, if HBase reports a failure to write even one Put or Increment, the entire transaction is retried – this is how Flume’s at-least-once semantics work, and most Flume sinks operate in the same way. In the case of HBase Increments, this means it is possible for the same event to cause a counter to be incremented more than once, which is something to keep in mind while using Flume to perform Increments. Also, if the serializer is not idempotent, the same event can cause multiple different Puts to be written to HBase. Imagine a case where each event represents a credit card transaction. If the same event can generate different Puts each time, it is possible that HBase would end up with multiple records of the same transaction, which is probably not desired.
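One common way to keep Puts idempotent is to derive the row key deterministically from the event itself, so that a retried event overwrites the same cells instead of creating new rows. A minimal sketch, using a hypothetical helper (not part of Flume) that hashes the event body:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class DeterministicRowKey {
    // Hypothetical helper: derive a stable row key from the event body.
    // A retried event produces the same key, so its Put is an overwrite.
    static String rowKeyFor(byte[] eventBody) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        byte[] digest = md.digest(eventBody);
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        byte[] body = "txn-1001,19.99,card-42".getBytes(StandardCharsets.UTF_8);
        // The same event body always yields the same row key.
        System.out.println(rowKeyFor(body).equals(rowKeyFor(body)));
    }
}
```

Increments cannot be made idempotent this way, which is why double counting remains a risk whenever a transaction is retried.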

The AsyncHBaseSink is known to give better performance than the HBaseSink primarily because of the non-blocking nature of the underlying API it uses. The HBase community is working on improving the HBase client API to improve its performance, which would vastly improve the HBaseSink performance.

Conclusion

Flume is an excellent tool for writing events out to the different storage systems in the Hadoop ecosystem, including HBase. The HBase sinks provide the functionality to write data to HBase in your own schema and allow the user to “map” the Flume event to HBase data.

Hari Shreedharan is a Software Engineer at Cloudera, and an Apache Flume committer.
