Tracking Trends with Hadoop and Hive on EC2


At Cloudera, we frequently work with leading Hadoop developers to produce guest blog posts of general interest to the community. We started a project with Pete Skomoroch a while back, and we were so impressed with his work that we’ve decided to bring Pete on as a regular guest blogger. Pete can show you how to do some pretty amazing things with Hadoop, Pig and Hive, and has a particular bias towards Amazon EC2. With that, I’m happy to welcome Pete to the blog, and hope you enjoy his first post as much as we did. -Christophe

The trendingtopics site was built by Data Wrangling to demonstrate how Hadoop and Amazon EC2 can be used with Rails to power a data-driven website. This post gives an overview of how the site was put together and shows some basic approaches for finding trends in log data with Hive. The source code for trendingtopics is available on GitHub, and a tutorial on the Cloudera site describes many of the data processing steps in greater detail.

The trendingtopics Rails application identifies recent trends on the web by periodically launching an EC2 cluster running Cloudera’s Distribution for Hadoop to process Wikipedia log files.  The cluster runs a Hive batch job that analyzes hourly pageview statistics for millions of Wikipedia articles, and then loads the resulting trend parameters into the application’s MySQL database.


Application Features

  • Ranked list of the most significant trends over the last 30 days along with total pageviews
  • Ranked list of rising articles that have been trending in the last 24 hours
  • Daily time series charts and sparklines for over 2.5 million Wikipedia articles
  • Autocomplete functionality and search results ranked by article trend score
  • Category based trends for People or Companies

How Hadoop is Used in the Application

  • Cleaning raw log data and joining article title strings with Wikipedia page IDs
  • Aggregating hourly time series data for daily pageview charts and sparklines
  • Running periodic trend estimation jobs and regressions
  • Generating statistics to power search autocomplete and the ranking of search results

Preprocessing the Raw Data

The daily pageview charts on the site were created by running an initial MapReduce job on 1TB of hourly traffic logs collected from Wikipedia’s squid proxy by Domas Mituzas.  We made the first seven months of this hourly log data available as an Amazon Public Data Set which covers the period from October 1, 2008 to April 30, 2009.  We run a cron job on the server to fetch the latest log files every hour and store a copy on Amazon S3 for processing by Hadoop.

The log files are named with the date and time of collection. Individual hourly files are around 55 MB when compressed, so eight months of compressed data takes up about 300 GB of space.

Each line in a pagecount file has four fields: projectcode, pagename, pageviews, and bytes:

$ grep '^en Barack' pagecounts-20090521-100001
en Barack 8 1240112
en Barack%20Obama 1 1167
en Barack_H._Obama 1 142802
en Barack_H_Obama 3 428946
en Barack_H_Obama_Jr. 2 285780
en Barack_Hussein_Obama,_Junior 2 285606
en Barack_O%27Bama 1 142796
en Barack_Obama 701 139248439
en Barack_Obama%27s_first_100_days 2 143181
en Barack_Obama,_Jr 2 285755

Many records in the log file are actually Wikipedia redirects that point to other articles in the Wikipedia “Pages” table. As part of our Hadoop processing, we clean the page names and perform a join in Hive against the contents of a MySQL redirect table to find the true Wikipedia page_id for each page title.
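
To make the record format and the title cleaning concrete, here is a minimal Python sketch. The function names and the exact normalization rule (percent-decoding, then underscores for spaces) are assumptions for illustration, not the code used by trendingtopics; the redirect join itself happens in Hive and is not shown.

```python
from urllib.parse import unquote


def parse_pagecount_line(line):
    """Split one pagecount record into (project, title, pageviews, bytes).

    Returns None for records that do not have four well-formed fields.
    """
    fields = line.rstrip("\n").split(" ")
    if len(fields) != 4:
        return None
    project, raw_title, views, size = fields
    try:
        return project, raw_title, int(views), int(size)
    except ValueError:
        return None


def clean_title(raw_title):
    """Hypothetical cleaning rule: percent-decode, then use underscores for spaces."""
    return unquote(raw_title).replace(" ", "_")


record = parse_pagecount_line("en Barack%20Obama 1 1167")
print(record, clean_title(record[1]))
```

With a rule like this, variants such as Barack%20Obama and Barack_Obama collapse to the same title before the redirect lookup.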

Aggregating the Hourly Data

For the initial log file text normalization and filtering, we used a simple Hadoop Streaming job with Python. The first MapReduce pass restricts pageviews to a subset of English Wikipedia pages, filters out bad records, and then sums hourly pageviews keyed by article-date. Note that the date is not in the raw log data itself; it is part of the filename. You could fetch the input file name in Java via job.get("map.input.file"), but Hadoop also exposes job parameters to streaming scripts as environment variables, with the dots replaced by underscores. For example, in Python you can access the file name as follows:

import os
filepath = os.environ["map_input_file"]  # full path of the file this mapper is reading
filename = os.path.split(filepath)[-1]   # e.g. pagecounts-20090521-100001
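
Building on that snippet, a first-pass mapper might look roughly like the following. This is a minimal sketch under stated assumptions: the helper names, the regular expression, and the tab-separated output layout are hypothetical, and the real job applies more filtering than shown here.

```python
import os
import re

# Filenames in the post look like pagecounts-20090521-100001 (date, then time).
FILENAME_RE = re.compile(r"pagecounts-(\d{8})-(\d{6})")


def date_from_filename(filename):
    """Return the YYYYMMDD part of a pagecounts filename, or None if it does not match."""
    match = FILENAME_RE.search(filename)
    return match.group(1) if match else None


def map_lines(lines, date, project="en"):
    """Yield (title, date, pageviews) for well-formed records of one project."""
    for line in lines:
        fields = line.split()
        if len(fields) != 4 or fields[0] != project:
            continue  # wrong project or malformed record
        title, views = fields[1], fields[2]
        if not views.isdigit():
            continue
        yield title, date, int(views)


# Local demonstration; under Hadoop Streaming the lines would come from stdin
# and map_input_file would be set by the framework.
sample = ["en Barack_Obama 701 139248439", "de Barack_Obama 12 3456", "garbage"]
filepath = os.environ.get("map_input_file", "/logs/pagecounts-20090521-100001")
date = date_from_filename(os.path.split(filepath)[-1])
for title, day, views in map_lines(sample, date):
    print("%s\t%s\t%d" % (title, day, views))
```

A reducer can then sum the emitted pageviews per article and date.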

Our second MapReduce pass maps the daily aggregations by article name:

Barack_Obama 20090422 129
Barack_Obama 20090419 143
Barack_Obama 20090421 163
Barack_Obama 20090420 152

These records are merged at the reducers to generate a daily time series for each article in serialized JSON format.
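
As an illustration of that merge step, here is a minimal reducer-style sketch in Python. The JSON layout (parallel "dates" and "pageviews" lists) and the function name are assumptions; the post does not specify the exact serialized format.

```python
import json
from itertools import groupby


def merge_daily_records(lines):
    """Merge 'title date pageviews' lines into one JSON time series per title.

    Assumes the lines arrive grouped by title, as they would after the
    MapReduce shuffle and sort.
    """
    rows = (line.split() for line in lines)
    for title, group in groupby(rows, key=lambda row: row[0]):
        # Sort each article's points chronologically before serializing.
        points = sorted((date, int(views)) for _, date, views in group)
        series = {
            "dates": [date for date, _ in points],
            "pageviews": [views for _, views in points],
        }
        yield title, json.dumps(series)


records = [
    "Barack_Obama 20090422 129",
    "Barack_Obama 20090419 143",
    "Barack_Obama 20090421 163",
    "Barack_Obama 20090420 152",
]
for title, payload in merge_daily_records(records):
    print(title, payload)
```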




These records are joined with the Wikipedia page ID using Hive, and the resulting output is loaded into MySQL, where it is indexed for fast lookups by the web application. For the first version of the site, we chose a simple key-value store approach where the time series for a single article is stored in JSON format. This format is simple to work with using R or Python and can be loaded quickly by the dynamic charts on the website.
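
The key-value idea can be sketched in a few lines of Python. To keep the example self-contained it uses an in-memory SQLite database as a stand-in for MySQL; the table name, column names, and page ID are hypothetical.

```python
import json
import sqlite3

# In-memory SQLite stands in for the application's MySQL database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_timelines (page_id INTEGER PRIMARY KEY, series TEXT NOT NULL)"
)


def store_series(conn, page_id, dates, pageviews):
    """Store one article's whole time series as a single JSON value keyed by page_id."""
    payload = json.dumps({"dates": dates, "pageviews": pageviews})
    conn.execute(
        "INSERT OR REPLACE INTO daily_timelines (page_id, series) VALUES (?, ?)",
        (page_id, payload),
    )


def load_series(conn, page_id):
    """Fetch and decode the series for one page, or return None if it is absent."""
    row = conn.execute(
        "SELECT series FROM daily_timelines WHERE page_id = ?", (page_id,)
    ).fetchone()
    return json.loads(row[0]) if row else None


# page_id 1 is a placeholder, not a real Wikipedia page ID.
store_series(conn, 1, ["20090419", "20090420"], [143, 152])
print(load_series(conn, 1))
```

The primary key on page_id is what makes each chart a single indexed lookup, at the cost of rewriting the whole JSON value whenever a new day is appended.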

Daily Trend Estimation

After the historical timeline aggregation was complete, we began running a daily batch job in Hive on EC2 to aggregate recent log data and detect topics trending over the previous 24 hours.  This MapReduce job only operates on the last 30 days of data for trend estimation, so it is less resource intensive than a pass over the full Wikipedia log dataset.

For MapReduce operations that go beyond the basic cleaning and aggregation of data, using higher level tools, such as Hive, can accelerate the development process. Hive supports a range of SQL-like operators, random sampling, bucketing of tables, and calling of external functions through custom MapReduce scripts.

To find trending articles, we load the latest timeline data into Hive and run a HiveQL query that applies a simple Python trend estimation algorithm to all 2.5 million Wikipedia articles:

The file looks something like this:

The nice thing about this approach is that analysts who use Python or R can easily substitute more advanced algorithms by replacing the calc_daily_trend method.  We describe the trend estimation process in more detail in the tutorial on building data intensive web applications with Hadoop.
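
To show what swapping in a different estimator could look like, here is a minimal Python sketch of a function with the same name. The formula is an assumption chosen for illustration (the latest day's departure from the earlier average, scaled by the log of total volume) and is not the estimator used by trendingtopics.

```python
import math


def calc_daily_trend(pageviews):
    """Score how sharply the most recent day departs from the earlier average.

    Illustrative formula only: (latest day - mean of earlier days), scaled by
    log(1 + total pageviews) so that very small articles do not dominate.
    """
    if len(pageviews) < 2:
        return 0.0  # not enough history to compare against
    latest = pageviews[-1]
    baseline = sum(pageviews[:-1]) / (len(pageviews) - 1)
    total = sum(pageviews)
    return (latest - baseline) * math.log(1.0 + total)


print(calc_daily_trend([143, 152, 163, 129]))
print(calc_daily_trend([10, 12, 11, 90]))
```

Any replacement only needs to keep the same input (a series of daily counts) and return a single score that the query can rank on.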

Running the Batch Jobs with Hadoop and Hive on EC2

One of the advantages of using the Cloudera Distribution for Hadoop on EC2 is that it gives you full control over the configuration of your Hadoop cluster and operating system environment. To run batch jobs for trendingtopics, we use the Cloudera command line tools within cron or Rake tasks to launch Hadoop clusters on EC2:

./hadoop-ec2 launch-cluster --user-packages 'r-base python-simplejson git-core s3cmd' my-hadoop-cluster 10

By using a combination of command line options and simple configuration files, you can install packages and modify settings to meet the needs of your specific MapReduce jobs.  If you have any additional commands you want to run immediately after your Hadoop EC2 cluster launches, then you can add them to the bottom of the file

For trendingtopics, we added a few lines of code to fetch the trendingtopics source code from GitHub and start our daily batch job:

Running the Rails Application

If you want to explore the application further, you can check out the code on GitHub. Note that the latest version of the trendingtopics code is in the “experimental” branch.

Dependencies for local development:

  • Ruby (1.8.7)
  • Ruby Gems (1.3.1)
  • Capistrano (v2.5.5)
  • Rails (2.3.2)

Running locally in development mode:

Fetch the trending topics source code:

git clone git://

Navigate to the root of the source code directory and create the necessary configuration files from the provided examples:

$ cd trendingtopics
$ cp config/config.yml.example config/config.yml
$ cp config/database.yml.example config/database.yml

Run the normal Rails gem installations to get any missing dependencies:

$ rake gems:install

Note that we also use the following plugins (already included in /vendor):

  • autocomplete
  • annotated-timeline
  • gc4r (modified a bit for custom sparklines)

Create the database:

$ rake db:create
$ rake db:migrate

Populate the app with 2 months of demo data from 100 wiki articles:

$ rake db:develop

Launch the Rails app itself:

$ script/server
=> Booting Mongrel
=> Rails 2.3.2 application starting on
=> Call with -d to detach
=> Ctrl-C to shutdown server

Navigate to http://localhost:3000/ to access the application.

Deploying the Rails App to EC2

The Rails app and MySQL database are deployed to Amazon EC2 using Paul Dowman’s EC2 on Rails.  To deploy the application to EC2, first install the EC2 on Rails gem as described at

$ sudo gem install ec2onrails

Find the AMI ID of the latest 32-bit EC2 on Rails image (in our case this was ami-5394733a):

$ cap ec2onrails:ami_ids

Launch an instance of the latest EC2 on Rails AMI and note the returned instance address from ec2-describe-instances. It will be something like

$ ec2-run-instances ami-5394733a -k gsg-keypair
$ ec2-describe-instances

Fetch the trendingtopics source code from Github as shown in the previous section. Create the necessary configuration files from the examples provided and edit them, filling in your EC2 instance address information, keypairs, and other configuration details as indicated in the comments of each file. See the ec2onrails documentation or source code for more details on each setting.

$ cp config/deploy.rb.example config/deploy.rb
$ cp config/s3.yml.example config/s3.yml
$ cp config/config.yml.example config/config.yml
$ cp config/database.yml.example config/database.yml

Be sure to substitute your own AWS key and secret key in both config.yml and s3.yml (you can leave these out of s3.yml and ec2onrails will still work – it just won’t back up MySQL or the log files).

aws_secret_access_key: YYVUYVIUBIBI
aws_access_key_id: BBKBBOUjbkj/BBOUBOBJKBjbjbboubuBUB

Deploy the app to your launched EC2 instance with Capistrano (this will take several minutes).

$ cap ec2onrails:setup
$ cap deploy:cold

You should now be able to access your app from a web browser or as a web service at the URL of the instance you provided in deploy.rb: You can also SSH into your running EC2 instance as usual with your keypairs to debug any issues. See the EC2 on Rails forums for more help with debugging.

To redeploy the app after making changes to the base trending topics code, just do the usual cap deploy:

$ cap deploy

No data will be populated in the production-deployed app until you run the included Hadoop jobs and import the resulting data to MySQL. To test the deployment, you can use Capistrano to run the db:develop task on the EC2 server; just wipe the dev data before loading real production data.

Possible Next Steps with Hadoop and Trending Topics

  • Implement alternate trend-detection algorithms
  • Use the Cloudera EBS (beta) to persist the timeline data in HDFS between runs, appending each hour of data to partitioned Hive tables
  • Explore using partitions with Hive for fast time window queries
  • Generate smaller representative sample datasets for R&D with Hive bucket sampling
  • Use Wikipedia text content to find trends for individual words and phrases
  • Use the Wikipedia link graph dataset to show related articles for each trend
  • Find correlated Wikipedia articles based on page views

10 responses on “Tracking Trends with Hadoop and Hive on EC2”

  1. Otis Gospodnetic

    Interesting stuff. Did I stumble upon a bug, though?
    Note how Wikipedia has pages for Simpy and SimPy.
    I can find both at if I search for “simpy”.
    Then, when I click on “Simpy”, I end up on , which looks correct. But note how the pageview chart is actually for SimPy (not Simpy)!

    Is this a bug?

  2. nikhil

    Great article guys, thanks for sharing. Time and again, I find that Cloudera has the best tutorials and articles about everything Hadoop :)