How-to: Do Apache Flume Performance Tuning (Part 1)

The post below was originally published via blogs.apache.org and is republished here for your reading pleasure.

This is Part 1 in a series of articles about tuning the performance of Apache Flume, a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of event data.

To kick off this series, I’d like to discuss some important Flume concepts that come into play when tuning your Flume flows for maximum performance: the channel and the transaction batch size.

Setting Up a Data Flow

Imagine you want to take a heavy stream of user activity from your application server logs and store it on your Hadoop cluster for analytics. If you have a large deployment of application servers, you would likely want to build a fan-in architecture, where you are sending data from many nodes to relatively fewer nodes.

If you are sending these user events one at a time, each time waiting for acknowledgment that it was delivered, your throughput may be limited by network latency. Naturally, you would want to batch up the events into larger transactions, so that you amortize the latency of the acknowledgment over a larger number of events and therefore get more throughput.

Channels

So, what happens if the storage tier goes down momentarily, as in the case of a network partition? What happens to the events if a Flume agent machine crashes? We still want to be able to serve our users on the application tier and retain our data somehow. In order to accomplish this, we need a buffering mechanism on each agent that allows it to store events in the case of downstream failures or slowdowns. In Flume, the channel is what persists events at each hop in the flow. Below is a diagram that illustrates where the channel sits in the architecture of a Flume agent.
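
In configuration terms, the channel is simply the named buffer that a source writes into and a sink drains. The minimal sketch below (the agent name, component names, port, and paths are all hypothetical) shows how that wiring looks in a Flume agent’s properties file:

  # Minimal single-agent sketch: an Avro source feeding an HDFS sink
  # through one channel. All names, ports, and paths are hypothetical.
  agent1.sources = avro-in
  agent1.channels = ch1
  agent1.sinks = hdfs-out

  # The source puts events into the channel.
  agent1.sources.avro-in.type = avro
  agent1.sources.avro-in.bind = 0.0.0.0
  agent1.sources.avro-in.port = 4141
  agent1.sources.avro-in.channels = ch1

  # The sink takes events back out of the same channel.
  agent1.sinks.hdfs-out.type = hdfs
  agent1.sinks.hdfs-out.hdfs.path = hdfs://namenode/flume/events
  agent1.sinks.hdfs-out.channel = ch1

  # The channel is the buffer between them; choosing its type (file vs.
  # memory) is the subject of the next section.
  agent1.channels.ch1.type = file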

Memory Channel vs. File Channel

An important decision to make when designing your Flume flow is what type of channel you want to use. At the time of this writing, the two recommended channels are the file channel and the memory channel. The file channel is a durable channel, as it persists all events that are stored in it to disk. So, even if the Java virtual machine is killed, or the operating system crashes or reboots, events that were not successfully transferred to the next agent in the pipeline will still be there when the Flume agent is restarted. The memory channel is a volatile channel, as it buffers events in memory only: if the Java process dies, any events stored in the memory channel are lost. Naturally, the memory channel also exhibits very low put/take latencies compared to the file channel, even for a batch size of 1. Since the number of events that can be stored is limited by available RAM, its ability to buffer events in the case of temporary downstream failure is quite limited. The file channel, on the other hand, has far superior buffering capability due to utilizing cheap, abundant hard disk space.
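
To make that tradeoff concrete, here is a sketch of how each channel type is declared; the directory paths and capacities below are hypothetical starting points, not recommendations:

  # Option A: a durable file channel. Events survive a JVM or OS crash
  # because they are persisted under the checkpoint and data directories.
  agent1.channels.ch1.type = file
  agent1.channels.ch1.checkpointDir = /var/lib/flume-ng/checkpoint
  agent1.channels.ch1.dataDirs = /var/lib/flume-ng/data
  # Maximum number of events buffered in the channel.
  agent1.channels.ch1.capacity = 1000000
  # Maximum number of events per transaction (see batching below).
  agent1.channels.ch1.transactionCapacity = 10000

  # Option B: a volatile memory channel (use one or the other for a given
  # channel). Lower put/take latency, but events are lost if the Java
  # process dies, and buffering is limited by available RAM.
  agent1.channels.ch1.type = memory
  agent1.channels.ch1.capacity = 100000
  agent1.channels.ch1.transactionCapacity = 1000

Note that dataDirs accepts a comma-separated list of directories; spreading them across separate physical disks can improve file channel throughput.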

Flume Event Batching

As mentioned earlier, Flume can batch events. The batch size is the maximum number of events that a sink or client will attempt to take from a channel in a single transaction. Tuning the batch size trades throughput vs. latency and duplication under failure. With a small batch size, throughput decreases, but the risk of event duplication is reduced if a failure were to occur. With a large batch size, you get much better throughput, but increased latency, and in the case of a transaction failure, the number of possible duplicates increases.

Transactions are a critical concept in Flume, because the delivery and durability guarantees made by channels only take effect at the end of each successful transaction. For example, when a source receives or generates an event, in order to store that event into a channel a transaction must first be opened on that channel. Within the transaction, the source puts up to the batch size number of events into the channel, and on success commits the transaction. A sink must go through the same process of operating within a transaction when taking events from a channel.

Batch size is configured at the sink level. The larger the batch, the faster the channels operate (but there is a caveat). In the case of the file channel, this speed difference is because all buffers are flushed and then synced to disk when each transaction is committed. A disk sync is a time-consuming operation (it may take several milliseconds), but it is required to ensure the durability of the data. Likewise, with the memory channel, there is a memory synchronization step when each transaction is committed. Of course, memory synchronization is much faster than a disk sync. For more information on the inner workings of the file channel, please see Brock Noland’s article about the Apache Flume File Channel.

The downside of using a large batch size is that if there is some type of failure in the middle of a transaction, such as a downstream host or network failure, there is a possibility of duplicates being created. So, for example, if you set your batch size to 1000, and the machine you are writing to goes offline, duplicates may be generated in groups of up to 1000. This may occur in special cases, for example, if the events got written to the downstream machine but then the connection failed before it could acknowledge that it had received them. However, duplicate events will only appear in exceptional circumstances.

Choosing a Batch Size

To squeeze all the performance possible out of a Flume system, batch sizes should be tuned with care through experimentation. While I will get into this in more detail in the follow-up to this post, here are some rules of thumb for selecting batch sizes.

  1. Start off with batch sizes equal to the sum of the batch sizes of the input streams coming into that tier. For example, if you have a downstream Flume agent running an Avro source with 10 upstream agents sending events via Avro sinks using a batch size of 100 each, consider starting that downstream agent with a batch size of 1,000. Tune / experiment from there.
  2. If you find yourself setting the batch size very high (say, higher than 10,000), then consider adding another sink instead, in order to increase the parallelism (each sink typically runs on its own thread). Say you were going to use one HDFS sink with a batch size of 20,000. Experiment with using 2 HDFS sinks with batch sizes of 5,000 or 10,000 to see if that helps more (see the sketch after this list).
  3. Prefer the lowest batch size that gives you acceptable performance.
  4. Monitor the steady-state channel sizes to get more tuning insight (more on this in the next article).
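
As an illustration of rule 2, here is a sketch with hypothetical names and paths: both sinks drain the same channel, and each sink runs on its own thread, so the work of writing to HDFS proceeds in parallel.

  # Instead of one HDFS sink with hdfs.batchSize = 20000, try two sinks
  # reading from the same channel with smaller batches.
  agent1.sinks = hdfs-out1 hdfs-out2

  agent1.sinks.hdfs-out1.type = hdfs
  agent1.sinks.hdfs-out1.channel = ch1
  agent1.sinks.hdfs-out1.hdfs.path = hdfs://namenode/flume/events
  # Distinct file prefixes keep the two sinks from colliding on file names.
  agent1.sinks.hdfs-out1.hdfs.filePrefix = events-s1
  agent1.sinks.hdfs-out1.hdfs.batchSize = 10000

  agent1.sinks.hdfs-out2.type = hdfs
  agent1.sinks.hdfs-out2.channel = ch1
  agent1.sinks.hdfs-out2.hdfs.path = hdfs://namenode/flume/events
  agent1.sinks.hdfs-out2.hdfs.filePrefix = events-s2
  agent1.sinks.hdfs-out2.hdfs.batchSize = 10000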

Different Batch Sizes in Different Situations

Due to the performance / duplication tradeoff of the batch size parameter, I often see varying batch size settings depending on the use case. When using Flume to write to HBase via the HBase sink (for example, to increment counters), a smaller batch size on the order of 100 is often used to limit the impact of hiccups in the system. On the other hand, with an HDFS sink, in order to get maximum throughput, I see people running with batch sizes of 1,000 or even 10,000, since they can typically run MapReduce jobs to de-duplicate the data at processing time. Note that when writing large events with large batch sizes to HDFS, other parameters often need to be increased as well. One such parameter is hdfs.callTimeout, which may be increased to 60 seconds or more to account for the long tail of occasional higher-latency calls to HDFS.
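
For illustration, the two ends of that spectrum might look something like the sketch below; the table name, column family, path, and agent names are hypothetical, and the timeout is in milliseconds:

  # HBase sink incrementing counters: a small batch limits the number of
  # possible duplicate increments if a transaction fails.
  agent1.sinks.hbase-out.type = hbase
  agent1.sinks.hbase-out.channel = ch1
  agent1.sinks.hbase-out.table = user_activity
  agent1.sinks.hbase-out.columnFamily = c
  agent1.sinks.hbase-out.batchSize = 100

  # HDFS sink tuned for throughput: a large batch, plus a longer call
  # timeout to ride out the occasional slow call to HDFS.
  agent2.sinks.hdfs-out.type = hdfs
  agent2.sinks.hdfs-out.channel = ch1
  agent2.sinks.hdfs-out.hdfs.path = hdfs://namenode/flume/events
  agent2.sinks.hdfs-out.hdfs.batchSize = 10000
  agent2.sinks.hdfs-out.hdfs.callTimeout = 60000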

At the other end of the spectrum, in cases where batching events at the application (Flume client) tier is not possible, the memory channel is often used at the collector-tier Flume agent (or a localhost agent on the app servers) to get acceptable performance with a batch size of 1, while using larger batch sizes and file channels in the downstream agents in order to get most of the benefits of durability there. For the best performance, however, all tiers including the client/application tier would perform some level of batching. (Please see above diagram for an illustration of the tiers referenced in this scenario.)
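
A sketch of that layout follows (hostnames, ports, and capacities are hypothetical): a local agent next to the application accepts unbatched puts into a memory channel, then re-batches on the way to a downstream agent backed by a file channel.

  # Tier 1: agent co-located with the app server. Clients put events with
  # a batch size of 1; the memory channel keeps put latency acceptable.
  local.sources = avro-in
  local.channels = mem
  local.sinks = avro-fwd

  local.sources.avro-in.type = avro
  local.sources.avro-in.bind = 127.0.0.1
  local.sources.avro-in.port = 4141
  local.sources.avro-in.channels = mem

  local.channels.mem.type = memory
  local.channels.mem.capacity = 100000
  local.channels.mem.transactionCapacity = 1000

  # The forwarding sink re-batches: up to 1000 events per transaction are
  # sent to the downstream agent, where a durable file channel takes over.
  local.sinks.avro-fwd.type = avro
  local.sinks.avro-fwd.channel = mem
  local.sinks.avro-fwd.hostname = collector.example.com
  local.sinks.avro-fwd.port = 4545
  local.sinks.avro-fwd.batch-size = 1000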

Configuration Parameters and Gotchas

The actual parameter used to set the batch size varies between sinks, but for most sinks it’s simply called batchSize or batch-size. For the HDFS sink, it’s actually called hdfs.batchSize for historical reasons; I recommend setting hdfs.txnEventMax to the same value as hdfs.batchSize for simplicity. Historically, in the HDFS sink, the number of events taken in a single transaction could differ from the number of events written to HDFS before a sync() operation; in practice, there is little reason these should not be set to the same value.

One potentially confusing gotcha when tuning the batch size on a sink is that it must be less than or equal to the transactionCapacity set on the corresponding channel. The transactionCapacity should be set to the value of the largest batch size that will be used to store or remove events from that channel.
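
Putting those parameters together, here is a sketch of a correctly matched HDFS sink and file channel (the names and path are hypothetical, and hdfs.txnEventMax applies to the Flume releases current as of this post):

  # Sink: take up to 1000 events per transaction, and write the same
  # number to HDFS before syncing.
  agent1.sinks.hdfs-out.type = hdfs
  agent1.sinks.hdfs-out.channel = ch1
  agent1.sinks.hdfs-out.hdfs.path = hdfs://namenode/flume/events
  agent1.sinks.hdfs-out.hdfs.batchSize = 1000
  agent1.sinks.hdfs-out.hdfs.txnEventMax = 1000

  # Channel: transactionCapacity must be >= the largest batch size used by
  # any source or sink attached to this channel.
  agent1.channels.ch1.type = file
  agent1.channels.ch1.capacity = 1000000
  agent1.channels.ch1.transactionCapacity = 1000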

tl;dr: Below is a “cheat sheet” batch size tuning summary for your convenience. Please note that these are just starting points for tuning.

Sink Type        Config parameter                    Typical value
Avro             batch-size                          100
HDFS             hdfs.batchSize, hdfs.txnEventMax    1000
HBaseSink        batchSize                           100
AsyncHBaseSink   batchSize                           100

That’s all I have space for in one blog post. Please leave feedback and questions in the comments. Look for another post in the future with tips on using Flume’s monitoring capabilities to take advantage of important information which can aid you in your quest for optimum performance.

Mike Percy is a software engineer at Cloudera and an Apache Flume committer.

7 Responses
  • Matt / February 12, 2013 / 11:30 AM

    While the FileChannel persists data that has been placed in the queue before it shuts down, it still allows for data loss. For example, if you are using tail on a log file and the Flume agent processing this data goes down, you will lose all data that is new to the log and has not been placed in the queue until the process is restarted. Your topology above may solve this problem as one process. My question is: will this cause redundant data?

    • Mike Percy (@mike_percy) / February 15, 2013 / 3:39 PM

      Hi Matt,
      Good question. First off, when you say “tail on a log file”, are you referring to using the exec source with “tail -F” as the command? If so, the exec source documentation at http://archive.cloudera.com/cdh4/cdh/4/flume-ng/FlumeUserGuide.html#exec-source clearly warns that it is not a reliable source. If you need reliable delivery of events, I would not recommend using the exec source. Similar issues (inability to acknowledge receipt or failure) arise with protocols like syslog and netcat, simply because the creators of those protocols did not design them to be 100% reliable… they are essentially “fire and forget” protocols. If you drop events while using such protocols, it’s really just a limitation of the protocol itself.

      You probably want to consider using an Avro Source or another reliable source. In the topology pictured above, I am primarily referring to using a reliable client, such as the RPC client library, from your app server to talk to an Avro source. With all reliable sources, the source guarantees that the data is committed to the channel before it responds to the client that the event put request has been successful.

      We continue to add new features and reliable sources to Flume in each CDH release, so if the Avro RPC protocol doesn’t work for you now, keep an eye out for the HTTP source, which is going into CDH 4.2 Flume. HTTP can also be a reliable protocol, because we can return the HTTP 200 status code for success, HTTP 5xx for failures, etc.

      Hope this helps.

  • Avinash / August 08, 2013 / 1:36 AM

    Hi Mike,

    Recently I started learning Flume. While learning, I tried different combinations of channel capacities and sink batch sizes. I have used the HDFS sink and tried to set the batch size to a large number so that I can get file sizes on HDFS of more than 64 MB.
    However, during this I found that if I use a large batch size, the performance of the Flume agent decreases.

    One possible combination that I tried is as follows:
    1 SINK, 1 CHANNEL with capacity around 64MB and 6 sinks for 1 channel with batch size around 6710886.
    My question is: if we use a larger batch size to transfer data, will it decrease performance?

    • Mike Percy (@mike_percy) / January 05, 2014 / 12:45 PM

      At some point, increasing the batch size will give you diminishing returns in terms of performance improvement. Could it hurt performance? Well, if your batch sizes are huge and you don’t increase your sink timeout (e.g., the HDFS sink’s hdfs.callTimeout), then you will get timeouts and retries, which will hurt performance overall.

  • James / December 19, 2013 / 10:27 AM

    Part #2 please =)

  • anirudh / January 05, 2014 / 2:11 AM

    Hi Mike,

    I recently started using Flume for streaming Twitter data, based on the Cloudera example for Twitter data analysis.

    However, what I am seeing is that when I run the Flume agent, I get tweet files of the expected size, somewhere around 80 KB, but after some time I see multiple duplicate files of large size, 4 to 5 MB.

    The agent logs are reporting a transaction error and suggest committing more frequently. I guess I need to pull down my batch size.

    I am using the same configuration file as specified in the example.

    Regards
    Anirudh
