How-to: Analyze Twitter Data with Apache Hadoop

Categories: CDH Data Ingestion Flume General Hive How-to Oozie

Social media has gained immense popularity with marketing teams, and Twitter is an effective tool for a company to get people excited about its products. Twitter makes it easy to engage users and communicate directly with them, and in turn, users can provide word-of-mouth marketing for companies by discussing the products. Given limited resources, and knowing they may not be able to talk directly to everyone they want to target, marketing departments can be more efficient by being selective about whom they reach out to.

In this post, we’ll learn how we can use Apache Flume, Apache HDFS, Apache Oozie, and Apache Hive to design an end-to-end data pipeline that will enable us to analyze Twitter data. This will be the first post in a series. The posts to follow will describe, in more depth, how each component is involved and how the custom code operates. All the code and instructions necessary to reproduce this pipeline are available on the Cloudera GitHub.

Who is Influential?

To understand whom we should target, let’s take a step back and try to understand the mechanics of Twitter. A user – let’s call him Joe – follows a set of people, and has a set of followers. When Joe sends an update out, that update is seen by all of his followers. Joe can also retweet other users’ updates. A retweet is a repost of an update, much like you might forward an email. If Joe sees a tweet from Sue, and retweets it, all of Joe’s followers see Sue’s tweet, even if they don’t follow Sue. Through retweets, messages can get passed much further than just the followers of the person who sent the original tweet. Knowing that, we can try to engage users whose updates tend to generate lots of retweets. Since Twitter tracks retweet counts for all tweets, we can find the users we’re looking for by analyzing Twitter data.

Now we know the question we want to ask: Which Twitter users get the most retweets? Who is influential within our industry?

How Do We Answer These Questions?

SQL queries can be used to answer this question: We want to look at which users are responsible for the most retweets, in descending order of most retweeted. However, querying Twitter data in a traditional RDBMS is inconvenient, since the Twitter Streaming API outputs tweets in a JSON format which can be arbitrarily complex. In the Hadoop ecosystem, the Hive project provides a query interface which can be used to query data that resides in HDFS. The query language looks very similar to SQL, but allows us to easily model complex types, so we can easily query the type of data we have. Seems like a good place to start. So how do we get Twitter data into Hive? First, we need to get Twitter data into HDFS, and then we’ll be able to tell Hive where the data resides and how to read it.

The diagram above shows a high-level view of how some of the CDH (Cloudera’s Distribution Including Apache Hadoop) components can be pieced together to build the data pipeline we need to answer the questions we have. The rest of this post will describe how these components interact and the purposes they each serve.

Gathering Data with Apache Flume

The Twitter Streaming API will give us a constant stream of tweets coming from the service. One option would be to use a simple utility like curl to access the API and then periodically load the files. However, this would require us to write code to control where the data goes in HDFS, and if we have a secure cluster, we will have to integrate with security mechanisms. It will be much simpler to use components within CDH to automatically move the files from the API to HDFS, without our manual intervention.

Apache Flume is a data ingestion system that is configured by defining endpoints in a data flow called sources and sinks. In Flume, each individual piece of data (tweets, in our case) is called an event; sources produce events, and send the events through a channel, which connects the source to the sink. The sink then writes the events out to a predefined location. Flume supports some standard data sources, such as syslog or netcat. For this use case, we’ll need to design a custom source that accesses the Twitter Streaming API, and sends the tweets through a channel to a sink that writes to HDFS files. Additionally, we can use the custom source to filter the tweets on a set of search keywords to help identify relevant tweets, rather than a pure sample of the entire Twitter firehose. The custom Flume source code can be found here.
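As a rough sketch of how those pieces fit together, a flume.conf for this pipeline might look like the following (the agent, channel, and source class names here are illustrative assumptions, not the exact configuration in the repository):

```properties
# Define the flow: custom Twitter source -> memory channel -> HDFS sink
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Custom source that connects to the Twitter Streaming API and filters on keywords
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics

# Sink that writes events into hourly directories on HDFS
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream

# In-memory channel connecting the source to the sink
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
```

Note how the hourly directory layout in hdfs.path lines up with the hourly partitioning scheme discussed below.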

Partition Management with Oozie

Once we have the Twitter data loaded into HDFS, we can stage it for querying by creating an external table in Hive. Using an external table will allow us to query the table without moving the data from the location where it ends up in HDFS. To ensure scalability, as we add more and more data, we’ll need to also partition the table. A partitioned table allows us to prune the files that we read when querying, which results in better performance when dealing with large data sets. However, the Twitter API will continue to stream tweets and Flume will perpetually create new files. We can automate the periodic process of adding partitions to our table as the new data comes in.

Apache Oozie is a workflow coordination system that can be used to solve this problem. Oozie is an extremely flexible system for designing job workflows, which can be scheduled to run based on a set of criteria. We can configure the workflow to run an ALTER TABLE command that adds a partition containing the last hour’s worth of data into Hive, and we can instruct the workflow to occur every hour. This will ensure that we’re always looking at up-to-date data.
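The hourly step that the workflow runs might look something like this (the table name, partition column, and paths are assumptions for illustration):

```sql
-- Add the partition covering the hour that just finished.
-- IF NOT EXISTS makes the step safe to re-run if the workflow is retried.
ALTER TABLE tweets ADD IF NOT EXISTS
  PARTITION (datehour = 2013021509)
  LOCATION '/user/flume/tweets/2013/02/15/09';
```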

The configuration files for the Oozie workflow are located here.

Querying Complex Data with Hive

Before we can query the data, we need to ensure that the Hive table can properly interpret the JSON data. By default, Hive expects that input files use a delimited row format, but our Twitter data is in a JSON format, which will not work with the defaults. This is actually one of Hive’s biggest strengths. Hive allows us to flexibly define, and redefine, how the data is represented on disk. The schema is only really enforced when we read the data, and we can use the Hive SerDe interface to specify how to interpret what we’ve loaded.

SerDe stands for Serializer and Deserializer, which are interfaces that tell Hive how it should translate the data into something that Hive can process. In particular, the Deserializer interface is used when we read data off of disk, and converts the data into objects that Hive knows how to manipulate. We can write a custom SerDe that reads the JSON data in and translates the objects for Hive. Once that’s put into place, we can start querying. The JSON SerDe code can be found here. The SerDe will take a tweet in JSON form, like the following:
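(The sample below is an illustrative, heavily abbreviated reconstruction; real Streaming API records carry many more fields.)

```json
{
  "retweeted_status": {
    "text": "Checking out Apache Flume...",
    "user": { "screen_name": "cloudera" },
    "retweet_count": 25
  },
  "entities": {
    "hashtags": [ { "text": "hadoop" }, { "text": "bigdata" } ]
  },
  "text": "RT @cloudera: Checking out Apache Flume...",
  "user": { "screen_name": "natty", "friends_count": 111 }
}
```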

and translate the JSON entities into queryable columns:
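A table definition using such a SerDe might look like this sketch (the SerDe class name and the exact column set are assumptions, and the structs are abbreviated):

```sql
CREATE EXTERNAL TABLE tweets (
  id BIGINT,
  created_at STRING,
  text STRING,
  `user` STRUCT<screen_name:STRING, friends_count:INT>,
  retweeted_status STRUCT<
    text:STRING,
    `user`:STRUCT<screen_name:STRING>,
    retweet_count:INT>,
  entities STRUCT<hashtags:ARRAY<STRUCT<text:STRING>>>
)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
```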

which will result in:
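A sketch of the effect: the nested JSON entities become ordinary queryable columns, addressable with dot and bracket notation (the column names below assume the table exposes user, retweeted_status, and entities structs):

```sql
SELECT `user`.screen_name,              -- struct fields via dot notation
       retweeted_status.retweet_count,
       entities.hashtags[0].text        -- array elements via bracket notation
FROM tweets;
```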

We’ve now managed to put together an end-to-end system, which gathers data from the Twitter Streaming API, sends the tweets to files on HDFS through Flume, and uses Oozie to periodically load the files into Hive, where we can query the raw JSON data, through the use of a Hive SerDe.

Some Results

In my own testing, I let Flume collect data for about three days, filtering on a set of keywords:
hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

The collected data was about half a GB of JSON data, and here is an example of what a tweet looks like. The data has some structure, but certain fields may or may not exist. The retweeted_status field, for example, will only be present if the tweet was a retweet. Additionally, some of the fields may be arbitrarily complex. The hashtags field is an array of all the hashtags present in the tweet, but most RDBMSs do not support arrays as a column type. This semi-structured quality makes the data very difficult to query in a traditional RDBMS. Hive can handle it much more gracefully.

The query below will find usernames, and the number of retweets they have generated across all the tweets that we have data for:
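Here is a sketch of such a query, assuming the tweets table exposes a retweeted_status struct as nested columns (names are illustrative, not necessarily the exact query from the repository):

```sql
SELECT t.retweeted_screen_name,
       SUM(t.retweets) AS total_retweets,
       COUNT(*)        AS tweet_count
FROM (
  -- Collapse to one row per retweeted tweet, keeping its highest observed retweet count
  SELECT retweeted_status.`user`.screen_name AS retweeted_screen_name,
         retweeted_status.text,
         MAX(retweeted_status.retweet_count) AS retweets
  FROM tweets
  GROUP BY retweeted_status.`user`.screen_name, retweeted_status.text
) t
GROUP BY t.retweeted_screen_name
ORDER BY total_retweets DESC
LIMIT 10;
```

The inner query takes the maximum retweet count per tweet because the streaming data may contain the same retweeted status multiple times with an increasing counter.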

For the few days of data, I found that these were the most retweeted users for the industry:

From these results, we can see whose tweets are getting heard by the widest audience, and also determine whether these people are communicating on a regular basis or not. We can use this information to more carefully target our messaging in order to get them talking about our products, which, in turn, will get other people talking about our products.


In this post we’ve seen how we can take some of the components of CDH and combine them to create an end-to-end data management system. This same architecture could be used for a variety of applications designed to look at Twitter data, such as identifying spam accounts or identifying clusters of keywords. Taking the system even further, the general architecture can be reused across numerous domains: by plugging in different Flume sources and Hive SerDes, it can be customized for many other use cases, like analyzing web logs, to give an example. Grab the code, and give it a shot yourself.

Jon Natkins (@nattybnatkins) is a Software Engineer at Cloudera, where he has worked on Cloudera Manager and Hue, and has contributed to a variety of projects in the Apache Hadoop ecosystem. Prior to Cloudera, Jon wrangled databases at Vertica. He holds an Sc.B in Computer Science from Brown University.


22 responses on “How-to: Analyze Twitter Data with Apache Hadoop”

  1. Jef Spicoli

    Seems like a lot of work. Also, since Twitter is realtime, I notice you’re periodically using Oozie to load files into Hive so they can be queried. Can you do this in real-time? If not, that’s unfortunate.

    1. Jon Natkins Post author

      As far as I can tell, the links are working…Could you double check if they’re still broken, and if so, show me the link you’re trying to access?

  2. Jon Natkins Post author

    Hey Jeff,

    You could definitely do this without the hour delay if you loaded all the data into a single directory and had a non-partitioned table. The advantage of partitioning the table is that you can take advantage of Hive’s partition pruning functionality, which will speed up queries that only need to look at particular periods of time. If that’s not something you need, the non-partitioned approach would work just as well.


  3. Jean-Pierre

    Hi Jon.

    Interesting post. We had a similar approach using Flume to aggregate log information from our realtime application. We realized that we would end up with a huge amount of very, very small files on HDFS, produced by the Flume HDFS sink. Since HDFS is not designed for small blocks, we thought that’s not a good idea and didn’t proceed with this approach.

    I would like to know how you do the house keeping on HDFS? Do you flush the files written by HDFS sink once you have a partition in the table?

    What about the HDFS mantra: write once, read forever?

    From my point of view it should be possible to aggregate a certain amount of tweets (events) within Flume before they’re written to disk, to achieve a reasonable block size. But, that’s again never realtime.

    What about using HBase as the sink and running Pig jobs to analyse the data? Besides the fact that this approach is again not realtime (because Pig is batch oriented), one could remove Oozie from the architecture.


    1. Jon Natkins Post author

      Hi Jean-Pierre,

      A few thoughts: I hear you on the small files issue. I didn’t really address that in this post, but one tactic could be to combine the files for an hour’s data before adding the partition to Hive. That way, you’d end up with a smaller number of files. This isn’t so dissimilar from what HBase does during a major compaction. The other option is to increase the batch size for the Flume source, so that it writes files that are larger. However, with both of these, you’re still going to have trouble achieving truly real-time results. Using HBase may be the best option, since it will manage the files for you, and you can still potentially query HBase using Hive.


  4. Jay

    Natty, interesting approach. I’ve been trying to do something similar. Tell me something: why not just use Flume decorators to convert the Twitter JSON to a more friendly format? Also, isn’t Flume overkill for data collection when you are opening just a single connection to the Twitter Streaming API?

  5. Jay

    Also, about your comment on how Flume should batch more records before writing to HDFS – I don’t think that would be optimal – If I read the flume doc correctly, Flume is tuned for a data transfer size of 32KB.

    1. Jon Natkins Post author


      You could convert the JSON to something else, but the point of the post was to show how you could avoid having to do any post-processing by using the Hive SerDe interface. As for Flume being overkill, I don’t think it’s overkill at all. This is a great use case for Flume. We have data coming in from one place, and we need to get it somewhere else. This is precisely what Flume is for.

      As for the data transfer size, I think I may have misspoken slightly. You can tune the size of the flushes to HDFS by changing the batch size on the HDFS sink. The roll count can be used to control the number of events written to a particular file.
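      In flume.conf terms, that tuning might look like the following (the agent and sink names are illustrative; these are standard Flume NG HDFS sink properties):

```properties
# Flush to HDFS every 1,000 events; start a new file every 10,000 events.
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
# Disable size- and time-based rolling so rollCount alone governs file boundaries.
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 0
```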


  6. R. Klopman

    This is a very useful tutorial. I had, however, a problem with the software package: I got compilation errors because of an unimplemented abstract function and the missing symbol ‘setIncludeEntities’.

    Twitter4j has moved to versions 3.x and the function FilterQuery.setIncludeEntities has disappeared. I made a quick fix by changing in pom.xml the version property of twitter4j to 2.2.6 instead of [2.2,). In the pom.xml there are also references to CDH4.0.1, while we currently are at later versions.

    I would appreciate it if the software could be upgraded to twitter4j 3.x, and if you could tell me where to set the right Hadoop versions.

  7. Allwin Fernandez

    Interesting post. However, when I try to execute the Oozie workflow as mentioned, I am getting the error “variable [wfInput] cannot be resolved”. Please help me to resolve it.

  8. Dan Sandler

    This sample Twitter agent is a great way to learn the Hadoop stack. Once the example works (it takes time) the flume Twitter Agent will stream tweets in real-time to your HDFS.

    With regards to the real-time question at the top of the post, once the tweets land on your HDFS, there may be a 4-minute lag before the Oozie workflow adds a new partition to the external table. Such latency may disqualify the app as real-time, but understand this lag is a worst case scenario (i.e. it only happens when the Oozie workflow adds a partition to the external table) and I believe the 4 minutes in between Oozie job execution is configurable (admittedly I have taken the defaults).

    Once the Oozie workflow adds a partition (the application adds a partition every time an hourly subdirectory appears on HDFS), any tweet appearing in that hour will be IMMEDIATELY visible in the external table BECAUSE it is an external table.

    The Flume portion of the example is extremely impressive. I tweeted from my Twitter account, and before I could type the “hadoop fs -cat … | grep -i” command on the tweet file, I saw my tweet on my local HDFS. So what I was seeing was near real-time streaming from Twitter to HDFS.

    Regarding the error “variable [wfInput] cannot be resolved”, this happens when the Oozie workflow tries to roll forward one hour AND that hour (or, more accurately, the specified time increment) doesn’t exist on HDFS.

    For instance, I have the following parameters. Let’s say Flume is streaming the tweets to my HDFS folder, /user/flume/tweets/*. If I have Flume running from 8AM-9AM, but then I pause Flume from 10AM-11AM and resume from 12PM-20PM, then I will get the error “variable [wfInput] cannot be resolved” when Oozie tries to add the partitions for 10AM (aka 2013011710) and 11AM (aka 2013011711) to the external table TWEETS.


    Hopefully this makes sense, it took me a while to understand what was happening and to recognize the “wfInput” error as a soft error (at least in my case).

  9. Dan Sandler

    I made a few changes from above, the what/why of these changes are listed below. For me, this gets the example closer to real-time, and you can get even closer by tweaking the flume.conf parameters (specifically hdfs.rollCount, hdfs.batchSize, or hdfs.rollInterval).

    1) I removed the following tags (i.e. readyIndicator) from coord-app.xml. I didn’t want to wait 1 hour for the logs to roll forward

    2) I “hid” the in-process Flume.nnnnnnnnnnnnnn.tmp files from MapReduce. Otherwise you’ll get an exception. To avoid this exception:
    * I implemented the workaround referenced in!topic/cdh-user/FWH80lehYxk. I created a Java class (code below) to specify a pathFilter for .tmp files, and I created a JAR we can reference in hive-site.xml.
    * I placed the JAR in the following property in hive-site.xml and made reference to the pathFilter class, thereby ensuring that any Hive connections will exclude the Flume temp files. I also copied this JAR to /usr/lib/hadoop and /usr/lib/hdfs



    3) I bounced the hive servers
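    For reference, the hive-site.xml additions described in step 2 might look like this (the JAR path and file name are illustrative; the property names follow the linked workaround thread):

```xml
<!-- Make the JAR containing the PathFilter visible to Hive -->
<property>
  <name>hive.aux.jars.path</name>
  <value>file:///usr/lib/hadoop/TwitterUtil.jar</value>
</property>
<!-- Apply the filter to all MapReduce input paths Hive reads -->
<property>
  <name>mapred.input.pathFilter.class</name>
  <value>com.twitter.util.FileFilterExcludeTmpFiles</value>
</property>
```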

    I tested this several times, first without the Oozie workflow running. Basically, I renamed an existing Flume file so it became a .tmp file, and I ran a count(*) on the tweets table. As expected, the total tweet count decreased by the number of tweets in the Flume file.

    I then started the oozie workflow and as the workflow added partitions to the table (as expected), the Flume agent wrote to the .tmp files. I queried the tweets table as it happened and fortunately, the .tmp file was excluded from the query.

    Here is the Java code (BTW I am not a Java developer, the code is lifted nearly verbatim from!topic/cdh-user/FWH80lehYxk).

    Hope this helps.


    package com.twitter.util;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Excludes Flume's in-flight .tmp files (plus _- and .-prefixed files) from query input.
    public class FileFilterExcludeTmpFiles implements PathFilter {
        public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".") && !name.endsWith(".tmp");
        }
    }

  10. Robin Brouwer


    Like some others above, I’m getting “variable [wfInput] cannot be resolved” when running the Oozie workflow. Can anybody help me resolve this issue?

    Dan Sandler in some posts above suggests: “…this happens when the Oozie workflow tries to roll forward one hour AND that hour (or more accurately the specified time increment) doesn’t exist on HDFS…”. However, flume is running, the right folders/files exist on HDFS and the timings all seem right.

    Any help much appreciated.

  11. Pranesh Pandurangan

    Can you explain where you do each of these steps? The README in the GitHub repo is not clear on which steps happen on the server with Cloudera Manager, and which ones happen on the client nodes.

  12. Rajiv

    I am new to this. A few basic questions:
    1) In this approach, does the same data get stored twice: once in HDFS and once in the external table in Hive?
    2) Is the data stored in Hive in JSON format?
    3) When we run a query in Hive, does it call the appropriate SerDe interface to interpret the data? Is this transparent to the user of the query?

  13. Kartik

    Thank you for the GitHub tutorial. It is very helpful for beginners like me to get started on using Hadoop for analyses.
    I have been working on the tutorial for about a month, and I seem to be able to execute all the steps given in the tutorial on a Cloudera VM. But I have not been able to get tweets to start streaming to the Flume directory in HDFS. Are there any checks that I can do to make sure I have done everything right?
    Is there a way I can verify that my Flume configuration is correct?
    Any help would be appreciated.


  14. venu


    I downloaded the Cloudera QuickStart VM. I am unable to work out the Oozie and Flume pieces; please send me a good tutorial with examples.


  15. Eralp


    Thank you for the tutorial. I am conducting a project with my class this semester, and we are basically focusing on compliance improvements on social media. For our prototype, we are planning to use Twitter. Everything looks good so far, but instead of finding who has got the most retweets, we are planning to address some keywords that could create a potential problem, and filter on them. Is there any chance we can do this by modifying the existing code in this tutorial? Any help will be appreciated greatly.