Analyzing Twitter Data with Apache Hadoop, Part 2: Gathering Data with Flume

This is the second article in a series about analyzing Twitter data using some of the components of the Hadoop ecosystem available in CDH, Cloudera’s open-source distribution of Apache Hadoop and related projects. In the first article, you learned how to pull CDH components together into a single cohesive application, but to really appreciate the flexibility of each of these components, we need to dive deeper.

Every story has a beginning, and every data pipeline has a source. So, to build Hadoop applications, we need to get data from a source into HDFS.

Apache Flume is one way to bring data into HDFS using CDH. The Apache Flume website describes Flume as “a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.” At the most basic level, Flume enables applications to collect data from its origin and send it to a resting location, such as HDFS. At a slightly more detailed level, Flume achieves this goal by defining dataflows consisting of three primary structures: sources, channels and sinks. The pieces of data that flow through Flume are called events, and the processes that run the dataflow are called agents.

In the Twitter example, we used Flume to collect data from the Twitter Streaming API and forward it to HDFS. Looking closer at the Flume pipeline from that example, we come away with a system like this:

[Figure: Twitter Streaming API → Twitter source → memory channel → HDFS sink → HDFS]

In the rest of this post, we’ll take an in-depth look at the pieces of Flume that are used to build dataflows, and specifically, how they were used in the example.

Sources

A source is just what it sounds like: the part of Flume that connects to a source of data, and starts the data along its journey through a Flume dataflow. A source processes events and moves them along by sending them into a channel. Sources operate by gathering discrete pieces of data, translating the data into individual events, and then using the channel to process events one at a time, or as a batch.

Sources come in two flavors: event-driven or pollable. The difference between event-driven and pollable sources is how events are generated and processed. Event-driven sources typically receive events through mechanisms like callbacks or RPC calls. Pollable sources, in contrast, operate by polling for events every so often in a loop. Another good way to frame this differentiation is as a push-versus-pull model, where event-driven sources have events pushed to them, and pollable sources pull events from a generator.
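To make the push-versus-pull distinction concrete, here is a minimal sketch in plain Java. The names PushPullSketch, PushFeed, receivePushed and pollAll are hypothetical stand-ins rather than Flume classes, and a simple list plays the role of the channel.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-ins, not Flume classes: they only illustrate push versus pull. */
class PushPullSketch {

  /** Push model: the producer invokes registered callbacks whenever it has a new item. */
  static class PushFeed {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void register(Consumer<String> listener) {
      listeners.add(listener);
    }

    void emit(String item) {
      for (Consumer<String> listener : listeners) {
        listener.accept(item);
      }
    }
  }

  /** Event-driven style: register a callback, then let the feed push items to it. */
  static List<String> receivePushed(String... items) {
    List<String> channel = new ArrayList<>();
    PushFeed feed = new PushFeed();
    feed.register(channel::add);
    for (String item : items) {
      feed.emit(item);
    }
    return channel;
  }

  /** Pollable style: the source itself loops and asks the generator for the next item. */
  static List<String> pollAll(Iterator<String> generator) {
    List<String> channel = new ArrayList<>();
    while (generator.hasNext()) {
      channel.add(generator.next());
    }
    return channel;
  }
}
```

In both cases the items end up in the same place; what differs is who drives the loop. A callback-based API such as a streaming client fits the first shape naturally.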

Examining the TwitterSource

In our Twitter analysis example, we built a custom source called TwitterSource. To understand how sources operate more thoroughly, let’s look at how the TwitterSource was built. We can start with a very generic piece of boilerplate code:

/**
 * A template for a custom, configurable Flume source
 */
public class BoilerplateCustomFlumeSource extends AbstractSource
    implements EventDrivenSource, Configurable {

  /**
   * The initialization method for the Source. The context contains all the
   * Flume configuration info, and can be used to retrieve any configuration
   * values necessary to set up the Source.
   */
  @Override
  public void configure(Context context) {
    // Get config params with context.get* methods
    // Example: stringParam = context.getString("stringParamName")
  }

  /**
   * Start any dependent systems and begin processing events.
   */
  @Override
  public void start() {
    // For an event-driven source, the start method should spawn
    // a thread that will receive events and forward them to the
    // channel
    super.start();
  }

  /**
   * Stop processing events and shut any dependent systems down.
   */
  @Override
  public void stop() {
    super.stop();
  }
}
  

With this code, we have a configurable source that we can plug into Flume, although at this stage, it won’t do anything.

The start() method contains the bulk of the source’s logic. In the TwitterSource, the twitter4j library is used to get access to the Twitter Streaming API, using this block of code:

// The StatusListener is a twitter4j API, which can be added to a Twitter
// stream, and will execute callback methods every time a message comes in
// through the stream.
StatusListener listener = new StatusListener() {
  // The onStatus method is a callback executed when a new tweet comes in.
  public void onStatus(Status status) {
    Map<String, String> headers = new HashMap<String, String>();
    // The EventBuilder is used to build an event using the headers and
    // the raw JSON of a tweet
    headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
    Event event = EventBuilder.withBody(
        DataObjectFactory.getRawJSON(status).getBytes(), headers);

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException e) {
      // If we catch a channel exception, it’s likely that the memory channel
      // does not have a high enough capacity for our rate of throughput, and
      // we tried to put too many events in the channel. Error handling or
      // retry logic would go here.
      throw e;
    }
  }
       
  // This listener will ignore everything except for new tweets
  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
  public void onScrubGeo(long userId, long upToStatusId) {}
  public void onException(Exception ex) {}
};

The StatusListener implements a set of callbacks, one of which is called when a new tweet, represented by a Status object, is received. There are other callbacks available, but for the purposes of this source, we’re only concerned with new tweets. As can be seen in the TwitterSource, the StatusListener is created and registered in the start() method.

Looking a bit closer, we can pick out the code that actually builds an event out of a tweet:

headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
Event event = EventBuilder.withBody(
      DataObjectFactory.getRawJSON(status).getBytes(), headers);

The EventBuilder class takes a byte array and an optional set of headers, and creates an event. The source processes events as they come in and passes them along to the channel through its channel processor:

getChannelProcessor().processEvent(event);
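As a rough illustration of what such an event carries, the sketch below models it as a header map plus a byte array. SketchEvent and fromTweet are hypothetical names, not part of Flume or twitter4j. The one deliberate difference from the snippet above is that the charset is named explicitly, since getBytes() with no argument uses the platform default.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an event: string headers plus an opaque byte[] body. */
class SketchEvent {
  final Map<String, String> headers;
  final byte[] body;

  SketchEvent(Map<String, String> headers, byte[] body) {
    this.headers = headers;
    this.body = body;
  }

  /** Builds an event from raw tweet JSON and its creation time in epoch milliseconds. */
  static SketchEvent fromTweet(String rawJson, long createdAtMillis) {
    Map<String, String> headers = new HashMap<>();
    headers.put("timestamp", String.valueOf(createdAtMillis));
    // Naming the charset keeps the encoded bytes identical on every platform.
    return new SketchEvent(headers, rawJson.getBytes(StandardCharsets.UTF_8));
  }
}
```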

To connect to the Twitter APIs, we need access to some application-specific secrets. In the TwitterSource, these are variables like the consumerKey and consumerSecret, which are used to set up the Twitter stream:

twitterStream.setOAuthConsumer(consumerKey, consumerSecret);

So, where did the consumerKey and consumerSecret get defined? For this source, these variables are configuration parameters. Taking a look at the configure() method, we can see where the variables are defined:

consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);

The context object contains all the configuration parameters for the source, which can be pulled out and stored in instance variables using a variety of get accessors.
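A configure() method often does a little validation and parsing on top of the raw lookups. The sketch below is one way that might look, with a plain Map standing in for the Flume context. SourceConfigSketch, required and keywords are hypothetical helpers, and the comma-splitting of the keywords value is an assumption about how such a parameter could be handled, not a description of the TwitterSource itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; a plain Map stands in for the Flume context. */
class SourceConfigSketch {

  /** Returns the trimmed value for a required key, or fails fast with a clear message. */
  static String required(Map<String, String> context, String key) {
    String value = context.get(key);
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing required parameter: " + key);
    }
    return value.trim();
  }

  /** Splits a comma-separated "keywords" value into trimmed, non-empty terms. */
  static List<String> keywords(Map<String, String> context) {
    List<String> result = new ArrayList<>();
    String raw = context.getOrDefault("keywords", "");
    for (String term : raw.split(",")) {
      String trimmed = term.trim();
      if (!trimmed.isEmpty()) {
        result.add(trimmed);
      }
    }
    return result;
  }
}
```

Failing fast on a missing secret at configuration time tends to be easier to diagnose than an authentication error once the stream is already starting.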

With this code in place, the custom source will be able to process tweets as events. The next step is to define where those events should go and how they should get there.

Configuring the Flume Agent

Before we discuss how to actually configure a Flume agent, we need to know what a configuration looks like. For the Twitter analysis example, we used this configuration:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = [required]
TwitterAgent.sources.Twitter.consumerSecret = [required]
TwitterAgent.sources.Twitter.accessToken = [required]
TwitterAgent.sources.Twitter.accessTokenSecret = [required]
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

The first three lines name the agent’s sources, channels, and sinks; each object defined there is referenced by that name throughout the rest of the configuration. Most Flume configuration entries follow a format very similar to the configuration of log4j appenders: [agent_name].[object_type].[object_name].[parameter_name], where [object_type] is one of sources, channels, or sinks.
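The four-part key format can be checked with a small parser. This is only an illustrative sketch: ConfigKeySketch is a hypothetical class, not how Flume reads its configuration. It splits on the first three dots so that parameter names containing dots, such as hdfs.path, stay intact.

```java
/** Hypothetical parser for keys shaped like agent.objectType.objectName.parameter. */
class ConfigKeySketch {
  final String agent;
  final String objectType;
  final String objectName;
  final String parameter;

  ConfigKeySketch(String agent, String objectType, String objectName, String parameter) {
    this.agent = agent;
    this.objectType = objectType;
    this.objectName = objectName;
    this.parameter = parameter;
  }

  static ConfigKeySketch parse(String key) {
    // A limit of 4 keeps any further dots inside the parameter name.
    String[] parts = key.split("\\.", 4);
    if (parts.length < 4) {
      throw new IllegalArgumentException("Expected agent.type.name.parameter: " + key);
    }
    return new ConfigKeySketch(parts[0], parts[1], parts[2], parts[3]);
  }
}
```

For example, parsing TwitterAgent.sinks.HDFS.hdfs.path yields the agent TwitterAgent, the object type sinks, the object name HDFS, and the parameter hdfs.path. The shorter listing keys, such as TwitterAgent.sources, are rejected by this sketch because they have only two parts.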

Channels

Channels act as a pathway between the sources and sinks. Events are added to channels by sources, and later removed from the channels by sinks. Flume dataflows can actually support multiple channels, which enables more complicated dataflows, such as fanning out for replication purposes.

In the case of the Twitter example, we’ve defined a memory channel:

TwitterAgent.channels.MemChannel.type = memory

Memory channels use an in-memory queue to store events until they’re ready to be written to a sink. Memory channels are useful for dataflows that have a high throughput; however, since events are stored in memory in the channel, they may be lost if the agent experiences a failure. If the risk of data loss is not tolerable, this situation can be remedied by using a channel type that provides stronger guarantees of data durability, such as a FileChannel.
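The capacity setting matters because the queue is bounded: once it is full, a source can no longer add events until a sink drains some. The following sketch models that behavior with a standard Java bounded queue. BoundedChannelSketch is a hypothetical model, not Flume’s memory channel, and it ignores transactions entirely.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of a bounded in-memory channel; not Flume's implementation. */
class BoundedChannelSketch {
  private final BlockingQueue<String> queue;

  BoundedChannelSketch(int capacity) {
    queue = new ArrayBlockingQueue<>(capacity);
  }

  /** Source side: returns false instead of blocking when the channel is full. */
  boolean put(String event) {
    return queue.offer(event);
  }

  /** Sink side: removes the oldest event, or returns null when the channel is empty. */
  String take() {
    return queue.poll();
  }
}
```

With a capacity of 2, a third put is refused until a take frees a slot. That refusal corresponds to the situation the ChannelException comment in the TwitterSource describes.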

Sinks

The final piece of the Flume dataflow is the sink. Sinks take events and send them to a resting location or forward them on to another agent. In the Twitter example, we utilized an HDFS sink, which writes events to a configured location in HDFS.

The HDFS sink configuration we used does a number of things. First, it limits the size of the files with the rollCount parameter, so each file will contain at most 10,000 tweets (a file is also rolled after 600 seconds, per rollInterval, while setting rollSize to 0 turns off rolling by file size). It also retains the original data format by setting the fileType to DataStream and the writeFormat to Text, instead of storing the data as a SequenceFile or some other format. The most interesting piece, however, is the file path:

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/

The file path, as defined, uses escape sequences to specify that the files will end up in a series of directories for the year, month, day and hour during which the events occur. For example, an event that comes in at 3:00 PM on 9/20/2012 will end up in HDFS at hdfs://hadoop1:8020/user/flume/tweets/2012/09/20/15/.

Where does the timestamp information come from? If you’ll recall, we added a header to each event in the TwitterSource:

headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));

This timestamp header is used by Flume to determine the timestamp of the event, and is used to resolve the full path where the event should end up.
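The resolution step can be approximated in a few lines of Java. This is a sketch of the idea only, not the sink’s actual code: PathEscapeSketch and resolve are hypothetical names, only the four escapes used in this configuration are handled, and the time zone is passed in explicitly as an assumption.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/** Hypothetical illustration of expanding %Y/%m/%d/%H from a "timestamp" header. */
class PathEscapeSketch {

  /** The header value is epoch milliseconds stored as a string, as in the TwitterSource. */
  static String resolve(String pattern, Map<String, String> headers, ZoneId zone) {
    long millis = Long.parseLong(headers.get("timestamp"));
    ZonedDateTime time = Instant.ofEpochMilli(millis).atZone(zone);
    return pattern
        .replace("%Y", DateTimeFormatter.ofPattern("yyyy").format(time))
        .replace("%m", DateTimeFormatter.ofPattern("MM").format(time))
        .replace("%d", DateTimeFormatter.ofPattern("dd").format(time))
        .replace("%H", DateTimeFormatter.ofPattern("HH").format(time));
  }
}
```

Because the path is derived from the tweet’s creation time rather than from the time it is written, a tweet that arrives late still lands in the directory for the hour in which it was created.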

Starting the Agent

Now that we understand the configuration of our source, channel and sink, we need to start up the agent to get the dataflow running. Before we actually start the agent, we need to set the agent to have the appropriate name as defined in the configuration.

The file /etc/default/flume-ng-agent defines a single environment variable, FLUME_AGENT_NAME. In a production system, for simplicity, FLUME_AGENT_NAME will typically be set to the hostname of the machine on which the agent is running. In this case, however, we set it to TwitterAgent, and we’re ready to start up the process.

We can start the process by executing:

$ /etc/init.d/flume-ng-agent start

Once it’s going, we should start to see files showing up in our /user/flume/tweets directory:

natty@hadoop1:~/source/cdh-twitter-example$ hadoop fs -ls /user/flume/tweets/2012/09/20/05
  Found 2 items
  -rw-r--r--   3 flume hadoop   255070 2012-09-20 05:30 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893253
  -rw-r--r--   3 flume hadoop   538616 2012-09-20 05:39 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893254.tmp

As more events are processed, Flume writes to files in the appropriate directory. A temporary file, suffixed with .tmp, is the file currently being written to. That .tmp suffix is removed when Flume determines that the file contains enough events or enough time has passed to roll the file. Those thresholds are determined in the configuration of the HDFS sink, as we saw above, by the rollCount and rollInterval parameters, respectively.
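The two roll thresholds combine as an either-or rule, which the sketch below models. RollPolicySketch is a hypothetical class, and it simplifies in one important way: it checks the interval only when an event is written, whereas a real sink can roll a file on a timer even if no new events arrive. A threshold of 0 is treated as disabled.

```java
/** Hypothetical model of count-or-interval file rolling; not the HDFS sink's code. */
class RollPolicySketch {
  private final long rollCount;
  private final long rollIntervalMillis;
  private long eventsInFile = 0;
  private long fileOpenedAtMillis;

  RollPolicySketch(long rollCount, long rollIntervalSeconds, long nowMillis) {
    this.rollCount = rollCount;
    this.rollIntervalMillis = rollIntervalSeconds * 1000L;
    this.fileOpenedAtMillis = nowMillis;
  }

  /** Records one written event and reports whether the current file should be rolled. */
  boolean recordEventAndCheck(long nowMillis) {
    eventsInFile++;
    boolean countReached = rollCount > 0 && eventsInFile >= rollCount;
    boolean intervalElapsed = rollIntervalMillis > 0
        && nowMillis - fileOpenedAtMillis >= rollIntervalMillis;
    if (countReached || intervalElapsed) {
      // Start tracking a fresh file from this moment.
      eventsInFile = 0;
      fileOpenedAtMillis = nowMillis;
      return true;
    }
    return false;
  }
}
```

With the values from the configuration above (rollCount of 10000 and rollInterval of 600), a busy stream rolls on the count and a quiet one rolls on the interval.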

Conclusion

In this article, you’ve seen how to develop a custom source and process events from Twitter. The same approach can be used to build custom sources for other types of data. Also, we’ve looked at how to configure a basic, complete dataflow for Flume, to bring data into HDFS as it is generated. A distributed filesystem isn’t particularly useful unless you can get data into it, and Flume provides an efficient, reliable way to achieve just that.

In Part 3, we’ll examine how to query this data with Apache Hive.

Jon Natkins (@nattybnatkins) is a Software Engineer at Cloudera
