Apache Flume – Architecture of Flume NG

This blog was originally posted on the Apache Blog: https://blogs.apache.org/flume/entry/flume_ng_architecture

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store. Flume is currently undergoing incubation at The Apache Software Foundation. More information on this project can be found at http://incubator.apache.org/flume. Flume NG is work related to new major revision of Flume and is the subject of this post.

Prior to entering the incubator, Flume saw incremental releases leading up to version 0.9.4. As Flume became adopted it became clear that certain design choices would need to be reworked in order to address problems reported in the field. The work necessary to make this change began a few months ago under the JIRA issue FLUME-728. This work currently resides on a separate branch by the name flume-728, and is informally referred to as Flume NG. At the time of writing this post Flume NG had gone through two internal milestones – NG Alpha 1, and NG Alpha 2 and a formal incubator release of Flume NG is in the works.

At a high-level, Flume NG uses a single-hop message delivery guarantee semantics to provide end-to-end reliability for the system. To accomplish this, certain new concepts have been incorporated into its design, while certain other existing concepts have been either redefined, reused or dropped completely.

In this blog post, I will describe the fundamental concepts incorporated in Flume NG and talk about it’s high-level architecture. This is a first in a series of blog posts by Flume team that will go into further details of it’s design and implementation.

Core Concepts

The purpose of Flume is to provide a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store. The architecture of Flume NG is based on a few concepts that together help achieve this objective. Some of these concepts have existed in the past implementation, but have changed drastically. Here is a summary of concepts that Flume NG introduces, redefines, or reuses from earlier implementation:

  • Event: A byte payload with optional string headers that represent the unit of data that Flume can transport from it’s point of origination to it’s final destination.
  • Flow: Movement of events from the point of origin to their final destination is considered a data flow, or simply flow. This is not a rigorous definition and is used only at a high level for description purposes.
  • Client: An interface implementation that operates at the point of origin of events and delivers them to a Flume agent. Clients typically operate in the process space of the application they are consuming data from. For example, Flume Log4j Appender is a client.
  • Agent: An independent process that hosts flume components such as sources, channels and sinks, and thus has the ability to receive, store and forward events to their next-hop destination.
  • Source: An interface implementation that can consume events delivered to it via a specific mechanism. For example, an Avro source is a source implementation that can be used to receive Avro events from clients or other agents in the flow. When a source receives an event, it hands it over to one or more channels.
  • Channel: A transient store for events, where events are delivered to the channel via sources operating within the agent. An event put in a channel stays in that channel until a sink removes it for further transport. An example of channel is the JDBC channel that uses a file-system backed embedded database to persist the events until they are removed by a sink. Channels play an important role in ensuring durability of the flows.
  • Sink: An interface implementation that can remove events from a channel and transmit them to the next agent in the flow, or to the event’s final destination. Sinks that transmit the event to it’s final destination are also known as terminal sinks. The Flume HDFS sink is an example of a terminal sink. Whereas the Flume Avro sink is an example of a regular sink that can transmit messages to other agents that are running an Avro source.

These concepts help in simplifying the architecture, implementation, configuration and deployment of Flume.

Flow Pipeline

A flow in Flume NG starts from the client. The client transmits the event to it’s next hop destination. This destination is an agent. More precisely, the destination is a source operating within the agent. The source receiving this event will then deliver it to one or more channels. The channels that receive the event are drained by one or more sinks operating within the same agent. If the sink is a regular sink, it will forward the event to it’s next-hop destination which will be another agent. If instead it is a terminal sink, it will forward the event to it’s final destination. Channels allow the decoupling of sources from sinks using the familiar producer-consumer model of data exchange. This allows sources and sinks to have different performance and runtime characteristics and yet be able to effectively use the physical resources available to the system.

Figure 1 below shows how the various components interact with each other within a flow pipeline.

Schematic showing logical components in a flow. The arrows represent the direction in which events travel across the system. This also illustrates how flows can fan-out by having one source write the event out to multiple=

Figure 1: Schematic showing logical components in a flow. The arrows represent the direction in which events travel across the system. This also illustrates how flows can fan-out by having one source write the event out to multiple channels.

By configuring a source to deliver the event to more than one channel, flows can fan-out to more than one destination. This is illustrated in Figure 1 where the source within the operating Agent writes the event out to two channels – Channel 1 and Channel 2.

Conversely, flows can be converged by having multiple sources operating within the same agent write to the same channel. A example of the physical layout of a converging flow is show in Figure 2 below.

A simple converging flow on Flume NG.

Figure 2: A simple converging flow on Flume NG.

Reliability and Failure Handling

Flume NG uses channel-based transactions to guarantee reliable message delivery. When a message moves from one agent to another, two transactions are started, one on the agent that delivers the event and the other on the agent that receives the event. In order for the sending agent to commit it’s transaction, it must receive success indication from the receiving agent. The receiving agent only returns a success indication if it’s own transaction commits properly first. This ensures guaranteed delivery semantics between the hops that the flow makes. Figure 3 below shows a sequence diagram that illustrates the relative scope and duration of the transactions operating within the two interacting agents.

Transactional exchange of events between agents.

Figure 3: Transactional exchange of events between agents.

This mechanism also forms the basis for failure handling in Flume NG. When a flow that passes through many different agents encounters a communication failure on any leg of the flow, the affected events start getting buffered at the last unaffected agent in the flow. If the failure is not resolved on time, this may lead to the failure of the last unaffected agent, which then would force the agent before it to start buffering the events. Eventually if the failure occurs when the client transmits the event to its first-hop destination, the failure will be reported back to the client which can then allow the application generating the events to take appropriate action.

On the other hand, if the failure is resolved before the first-hop agent fails, the buffered events in various agents downstream will start draining towards their destination. Eventually the flow will be restored to its original characteristic throughput levels. Figure 4 below illustrates a scenario where a flow comprising of two intermediary agents between the client and the central store go through a transient failure. The failure occurs between agent 2 and the central store, resulting in the events getting buffered at the agent 2 itself. Once the failing link has been restored to normal, the buffered events drain out to the central store and the flow is restored to its original throughput characteristics.

Failure handling in flows.

Figure 4: Failure handling in flows. In (a) the flow is normal and events can travel from the client to the central store. In (b) a communication failure occurs between Agent 2 and the event store resulting in events being buffered on Agent 2. In (c) the cause of failure was addressed and the flow was restored and any events buffered in Agent 2 were drained to the store.

Wrapping up

In this post I described the various concepts that are a part of Flume NG and its high-level architecture. This is first of a series of posts from the Flume team that will highlight the design and implementation of this system. In the meantime, if you need anymore information, please feel free to drop an email on the project’s user or developer lists, or alternatively file the appropriate JIRA issues. Your contribution in any form is welcome on the project.

Links:

Project Website: http://incubator.apache.org/flume/

Flume NG Getting Started Guide: https://cwiki.apache.org/confluence/display/FLUME/Getting+Started

Mailing Lists: http://incubator.apache.org/flume/mail-lists.html

Issue Tracking: https://issues.apache.org/jira/browse/FLUME

IRC Channel: #flume on irc.freenode.net

9 Responses
  • Tims / December 09, 2011 / 7:26 PM

    I’m pleased with these abstractions, this sounds like a great refactor. I especially like that the channel behaviour will make end-to-end flow’s failure handing clearer.

  • Saurabh / March 01, 2013 / 4:15 AM

    Arvind,

    Excellent Article. We have multiple flume agents talking to one flume agent ( acting as collector) and then that collector ingest the data into HBase.

    To handle higher velocity, we need to configure multiple collector agents.

    Flume NG has an option to configure Sink processor to manage Load balancing / Failover scenarios but documentation on http://flume.apache.org/FlumeUserGuide.html#flume-sink-processors is limited that it explaining only about required fields.

    Can you guide us what is the best way to load balance the traffic flowing from a group of agents to group of collectors?

    Thanks,
    Saurabh.

  • Mike Percy / March 13, 2013 / 1:09 PM

    Saurabh,

    Have you tried asking this question on the email lists?

    The way you can set up load balancing is to set up sinks and sink groups. For example:


    agent.sinkgroups = load-balancer-sink-group
    agent.sinks = avro-sink-1 avro-sink-2

    # Define sink group for load balancing
    agent.sinkgroups.load-balancer-sink-group.sinks = avro-sink-1 avro-sink-2
    agent.sinkgroups.load-balancer-sink-group.processor.type = load_balance
    agent.sinkgroups.load-balancer-sink-group.processor.selector = random

    # avro-sink-1 properties
    agent.sinks.avro-sink-1.type = avro
    agent.sinks.avro-sink-1.hostname = host1.example.com
    agent.sinks.avro-sink-1.port = 4040

    # avro-sink-2 properties
    agent.sinks.avro-sink-2.type = avro
    agent.sinks.avro-sink-2.hostname = host2.example.com
    agent.sinks.avro-sink-2.port = 4040

    Hope this helps. Please feel free to join the mailing lists cdh-user@cloudera.org via https://groups.google.com/a/cloudera.org/forum/?fromgroups#!forum/cdh-user and user@flume.apache.org via http://flume.apache.org/mailinglists.html for more information.

    Regards,
    Mike

Leave a comment


7 + = fifteen