About Apache Flume FileChannel

The post below was originally published via blogs.apache.org and is republished here for your reading pleasure.

This blog post is about Apache Flume’s FileChannel. Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

FileChannel is a persistent Flume channel that supports encryption and writing to multiple disks in parallel.

Overview

When using Flume, each flow has a Source, Channel, and Sink. A typical example would be a webserver writing events to a Source via RPC (e.g. Avro Source), the Source writing to a MemoryChannel, and an HDFS Sink consuming the events and writing them to HDFS.
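
Concretely, such a flow might be wired together with an agent configuration along the following lines (the agent name, component names, port, and HDFS path here are invented for the example):

    agent1.sources = avro-src
    agent1.channels = mem-ch
    agent1.sinks = hdfs-sink

    # Avro Source receiving events from clients over RPC
    agent1.sources.avro-src.type = avro
    agent1.sources.avro-src.bind = 0.0.0.0
    agent1.sources.avro-src.port = 4141
    agent1.sources.avro-src.channels = mem-ch

    # MemoryChannel buffering events between the source and the sink
    agent1.channels.mem-ch.type = memory

    # HDFS Sink draining the channel into HDFS
    agent1.sinks.hdfs-sink.type = hdfs
    agent1.sinks.hdfs-sink.channel = mem-ch
    agent1.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/events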

MemoryChannel provides high throughput but loses data in the event of a crash or loss of power. As such, a persistent Channel was needed. FileChannel was implemented in FLUME-1085. Its goal is to provide a reliable, high-throughput channel: FileChannel guarantees that once a transaction is committed, no data will be lost due to a subsequent crash or loss of power.

It’s important to note that FileChannel does not replicate data itself. As such, it is only as reliable as the underlying disks. Users who choose FileChannel for its durability should take this into account when purchasing and configuring hardware. The underlying disks should be RAID, SAN, or similar.
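
As a configuration sketch, swapping the MemoryChannel above for a FileChannel spread across multiple disks might look like this (the paths are invented for the example):

    agent1.channels.file-ch.type = file
    # The checkpoint directory should ideally sit on its own disk
    agent1.channels.file-ch.checkpointDir = /disk3/flume/checkpoint
    # Each data directory should live on a separate physical disk
    agent1.channels.file-ch.dataDirs = /disk1/flume/data,/disk2/flume/data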

Many systems trade a small amount of data loss (for example, fsyncing from memory to disk every few seconds) for higher throughput. The Flume team decided on a different approach with FileChannel. Flume is a transactional system, and multiple events can be Put or Taken in a single transaction. The batch size can be used to control throughput: using large batch sizes, Flume can move data through a flow with no data loss and high throughput. The batch size is completely controlled by the client. This is an approach users of RDBMSs will be familiar with.

A Flume transaction consists of either Puts or Takes, but not both, and ends with either a commit or a rollback. Each transaction implements both Put and Take methods; Sources do Puts onto the channel and Sinks do Takes from the channel.
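
As a sketch of those semantics using Flume’s public Channel and Transaction interfaces (the batch size and event bodies are invented, and in a real deployment the framework drives sources and sinks rather than code like this):

    import java.nio.charset.StandardCharsets;

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;
    import org.apache.flume.event.EventBuilder;

    public class TransactionSketch {

      // Source side: a batch of Puts that commits or rolls back as a unit.
      static void putBatch(Channel channel, int batchSize) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
          for (int i = 0; i < batchSize; i++) {
            Event event = EventBuilder.withBody(
                ("event-" + i).getBytes(StandardCharsets.UTF_8));
            channel.put(event);
          }
          tx.commit(); // FileChannel fsyncs here; committed events survive a crash
        } catch (RuntimeException e) {
          tx.rollback(); // none of the batch becomes visible to the sink
          throw e;
        } finally {
          tx.close();
        }
      }

      // Sink side: a batch of Takes from the same channel.
      static void takeBatch(Channel channel, int batchSize) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
          for (int i = 0; i < batchSize; i++) {
            Event event = channel.take(); // null when the channel is empty
            if (event == null) {
              break;
            }
            // ... deliver the event downstream here ...
          }
          tx.commit(); // the taken events are now permanently consumed
        } catch (RuntimeException e) {
          tx.rollback(); // the taken events remain on the channel
          throw e;
        } finally {
          tx.close();
        }
      }
    }

Note how the batch size bounds the work per commit: larger batches amortize the cost of the fsync that FileChannel performs on each commit.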

Design

FileChannel is based on a write-ahead log (WAL) in addition to an in-memory queue. Each transaction is written to the WAL based on the transaction type (Take or Put), and the queue is modified accordingly. Each time a transaction is committed, fsync is called on the appropriate file to ensure the data is actually on disk, and a pointer to each committed event is placed on the queue. The queue serves just like any other queue: it manages what is yet to be consumed by the sink. During a take, a pointer is removed from the queue; the event itself is then read directly from the WAL. Given the large amount of RAM available today, it’s very common for that read to be served from the operating system’s file cache.
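
A heavily simplified sketch of that commit path follows. The class, fields, and record format here are invented for illustration and are not FileChannel’s actual internals; note that the java.nio FileChannel below is unrelated to Flume’s channel of the same name:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel; // unrelated to Flume's FileChannel
    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical sketch of the commit path; not FileChannel's actual internals.
    public class WalSketch {
      private final FileChannel walFile;                    // current WAL data file
      private final Queue<Long> queue = new ArrayDeque<>(); // pointers (offsets) to events

      WalSketch(FileChannel walFile) {
        this.walFile = walFile;
      }

      // Append one Put record; the returned offset is the event's "pointer".
      // The caller collects these and hands them to commit().
      long appendPut(long txId, long seqNo, byte[] eventData) throws IOException {
        long offset = walFile.position();
        walFile.write(encodeRecord(txId, seqNo, eventData));
        return offset;
      }

      // Commit: write a commit record, force it to disk, then publish pointers.
      void commit(long txId, long seqNo, Iterable<Long> pointers) throws IOException {
        walFile.write(encodeRecord(txId, seqNo, new byte[0])); // commit marker
        walFile.force(true); // the fsync: committed data now survives crash/power loss
        for (long pointer : pointers) {
          queue.add(pointer); // events become visible to Takes only after the fsync
        }
      }

      private static ByteBuffer encodeRecord(long txId, long seqNo, byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 4 + data.length);
        buf.putLong(txId).putLong(seqNo).putInt(data.length).put(data);
        buf.flip();
        return buf;
      }
    }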

After a crash, the WAL can be replayed to place the queue in the same state it was in immediately before the crash, so that no committed transactions are lost. Replaying WALs can be time consuming, so the queue itself is written to disk periodically. Writing the queue to disk is called a checkpoint. After a crash, the queue is loaded from disk and then only transactions committed after the queue was saved to disk are replayed, significantly reducing the amount of WAL that must be read.
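
In sketch form (invented names and record format; the real replay code is considerably more involved), recovery then looks roughly like this:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;

    // Hypothetical replay sketch; names are invented for illustration.
    public class ReplaySketch {

      record WalRecord(long txId, long seqNo, Type type, long pointer) {
        enum Type { PUT, TAKE, COMMIT, ROLLBACK }
      }

      // Start from the checkpointed queue, then apply only WAL records newer
      // than the checkpoint, and only once their transaction commits.
      static Queue<Long> replay(Queue<Long> checkpointQueue, long checkpointSeqNo,
                                List<WalRecord> wal) {
        Queue<Long> queue = new ArrayDeque<>(checkpointQueue);
        Map<Long, List<WalRecord>> pending = new HashMap<>();
        for (WalRecord r : wal) {
          if (r.seqNo() <= checkpointSeqNo) {
            continue; // already reflected in the checkpoint
          }
          switch (r.type()) {
            case PUT, TAKE -> pending
                .computeIfAbsent(r.txId(), k -> new ArrayList<>())
                .add(r);
            case ROLLBACK -> pending.remove(r.txId()); // discard uncommitted work
            case COMMIT -> {
              for (WalRecord op : pending.getOrDefault(r.txId(), List.of())) {
                if (op.type() == WalRecord.Type.PUT) {
                  queue.add(op.pointer());    // committed Put: event becomes visible
                } else {
                  queue.remove(op.pointer()); // committed Take: event is consumed
                }
              }
              pending.remove(r.txId());
            }
          }
        }
        return queue;
      }
    }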

For example, a channel holding two events, “a” and “b”, written and committed in a single Put transaction, can be pictured like this:
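
    WAL:    seq 1 | tx 1 | Put "a"
            seq 2 | tx 1 | Put "b"
            seq 3 | tx 1 | Commit
    Queue:  [pointer to "a", pointer to "b"]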

The WAL contains three important items: the transaction id, sequence number, and event data. Each transaction has a unique transaction id, and each event has a unique sequence number. The transaction id is used simply to group events into a transaction, while the sequence number is used when replaying logs. In the above example, the transaction id is 1 and the sequence numbers are 1, 2, and 3.

When the queue is saved to disk – a checkpoint – the sequence number is incremented and saved as well. At restart, the queue is first loaded from disk and then any WAL entries with a sequence number greater than the queue’s are replayed. During the checkpoint operation the channel is locked so that no Put or Take operations can alter its state; allowing modification of the queue during the checkpoint would result in an inconsistent snapshot of the queue being stored on disk.
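
A minimal sketch of that checkpoint step, again with invented names (the queue is represented as an array of event pointers, matching the memory-mapped layout mentioned under Implementation below):

    import java.nio.MappedByteBuffer;

    // Hypothetical checkpoint sketch; FileChannel's real code differs.
    public class CheckpointSketch {
      private final Object lock = new Object(); // Puts/Takes also synchronize on this
      private final MappedByteBuffer checkpointBuffer; // memory-mapped checkpoint file
      private long sequenceNumber;

      CheckpointSketch(MappedByteBuffer checkpointBuffer, long initialSeqNo) {
        this.checkpointBuffer = checkpointBuffer;
        this.sequenceNumber = initialSeqNo;
      }

      void checkpoint(long[] queuePointers) {
        synchronized (lock) { // blocks Puts and Takes, so the snapshot is consistent
          sequenceNumber++;   // the checkpoint itself consumes a sequence number
          checkpointBuffer.clear();
          checkpointBuffer.putLong(sequenceNumber);
          for (long pointer : queuePointers) {
            checkpointBuffer.putLong(pointer);
          }
          checkpointBuffer.force(); // flush the mapped pages to disk
        }
      }
    }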

In the example queue above, a checkpoint occurs after the commit of transaction 1, resulting in the queue being saved to disk with both events (“a” and “b”) and a sequence number of 4.

After that point, event “a” is Taken from the queue in transaction 2:
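
    WAL:        seq 1 | tx 1 | Put "a"
                seq 2 | tx 1 | Put "b"
                seq 3 | tx 1 | Commit
                seq 5 | tx 2 | Take "a"
    Checkpoint: queue = ["a", "b"], sequence number 4
    Queue:      [pointer to "b"]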

If a crash occurs, the queue checkpoint is read from disk. Note that since the checkpoint occurred before transaction 2, both events “a” and “b” are present on that checkpointed queue. Then the WAL is read and any committed transaction with a sequence number greater than 4 is applied, resulting in “a” being removed from the queue.

Two cases are not covered by the design described so far: Takes and Puts that are in progress at the time the checkpoint occurs. Assume the checkpoint occurred instead after the Take of “a”:
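
    WAL:        seq 1 | tx 1 | Put "a"
                seq 2 | tx 1 | Put "b"
                seq 3 | tx 1 | Commit
                seq 4 | tx 2 | Take "a"   (transaction 2 still open)
    Checkpoint: queue = ["b"], sequence number 5
    Queue:      [pointer to "b"]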

If a crash occurred at this point, under the design described above, event “b” would be on the queue, and on replay any WAL entry with a sequence number greater than 5 would be replayed. The Rollback for transaction 2 would be replayed, but the Take for transaction 2 would not be. As such, “a” would never be placed back on the queue, resulting in data loss. A similar scenario plays out for Puts. For this reason, when a queue checkpoint occurs, transactions which are still in progress are also written out, so that this scenario can be handled appropriately.
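
One way to picture that bookkeeping, as a sketch with invented names: the checkpoint also persists, per open transaction, the Puts and Takes in flight, and replay reconciles them against the transaction’s eventual outcome in the WAL.

    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    // Hypothetical sketch of inflight handling at replay; names are invented.
    public class InflightSketch {

      record Inflight(long txId, long pointer, boolean isTake) {}

      // Entries recorded at checkpoint time for transactions that were still open.
      // committedTxIds holds transactions whose Commit appears later in the WAL.
      // Note: per the Design section, an inflight Take's pointer is already
      // absent from the checkpointed queue, and an inflight Put's pointer is
      // not yet on it.
      static void applyInflight(List<Inflight> inflight, Set<Long> committedTxIds,
                                Queue<Long> queue) {
        for (Inflight entry : inflight) {
          boolean committed = committedTxIds.contains(entry.txId());
          if (entry.isTake() && !committed) {
            queue.add(entry.pointer()); // undo the Take: the event is still available
          } else if (!entry.isTake() && committed) {
            queue.add(entry.pointer()); // the Put was committed after the checkpoint
          }
          // otherwise the checkpointed queue already reflects the right outcome
        }
      }
    }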

Implementation

FileChannel is stored in the flume-file-channel module of the Flume project and its Java package name is org.apache.flume.channel.file. The queue described above is named FlumeEventQueue and the WAL is named Log. The queue itself is a circular array backed by a memory-mapped file, while the WAL is a set of files written to and read from using the LogFile class and its subclasses.

Conclusion

FileChannel provides Flume users with durability in the face of hardware, software, and environmental failures while preserving high throughput. It is the recommended channel for most topologies where both aspects are important.

Brock Noland is a software engineer at Cloudera.

6 Responses
  • ram / September 28, 2012 / 12:43 AM

    1. How is the file channel different from any other db transaction log / commit design?
    2. Did you consider going down the path of just writing the actual data sequentially to a file instead of a transaction log / commit design (similar to Kafka)?

  • Brock Noland / September 28, 2012 / 7:42 AM

    1) It is similar to a db transaction log, only it supports a queue as opposed to a table. There is a JDBC Channel which embeds a database, but we have found its performance to be quite poor.

    2) From what I understand about Kafka, the design is similar. We do just write the data sequentially to a file, never updating it, and then when someone needs an event we read it directly out of the file. The base assumption is the same as Kafka’s: that the data is likely in the OS file cache.

  • Alex Baranau / September 28, 2012 / 3:33 PM

    Nice post!

    On the third picture (where the checkpoint has sequence number 5) – why allow creating a checkpoint while some transaction is in progress? Does it make sense (if it is possible) to create checkpoints only in between transactions? (i.e. so that no transactions are caught in the middle)

  • Brock Noland / September 29, 2012 / 7:47 AM

    Hi,

    That would be ideal; however, since the amount of time a transaction is open is controlled by sinks and sources, a transaction could be open for a long time. As such, the checkpoint would block from the time the first client finished its transaction until the last client finished, plus the time it actually takes to write out a checkpoint.

    Brock

  • Hari Shreedharan / October 01, 2012 / 11:33 AM

    Just adding to what Brock said: during a checkpoint, events in transactions in flight are also serialized, so they can be recovered when the channel starts up – no data is lost even if the channel was in the middle of a transaction during the checkpoint. These “inflight” events are replayed if and only if the specific transaction was actually committed.

  • Brock Noland / October 01, 2012 / 11:39 AM

    Yes, I could have been more clear on that. I only said “For this reason, when a queue checkpoint occurs, transactions which are still in progress are also written out so that this scenario can be handled appropriately.”
