Grouping Related Trends with Hadoop and Hive
(guest blog post by Pete Skomoroch)
In a previous post, I outlined how to build a basic trend tracking site called trendingtopics.org with Cloudera’s Distribution for Hadoop and Hive. TrendingTopics uses Hadoop to identify the top articles trending on Wikipedia and displays related news stories and charts. The data powering the site was pulled from an Amazon EBS Wikipedia Public Dataset containing 8 months of hourly pageview logfiles. In addition to the pageview logs, the EBS data volume also includes the full text content and link graph for all articles. This post will use that link graph data to build a new feature for our site: grouping related articles together under a single “lead trend” to ensure the homepage isn’t dominated by a single news story.
Finding Related Trends Using Wikipedia Link Graph Data
For several weeks this summer, the death of Michael Jackson dominated the news and drove a large number of pageviews on Wikipedia. The hourly data in the following chart was downloaded from trendingtopics.org during the last week in June:
This burst of interest in the Jackson family, Michael’s albums, and his medical conditions pushed most topics unrelated to MJ off the front page of our site:
Ideally, TrendingTopics would continue to show a range of stories when this type of event happens. Automated news sites like Techmeme or Google News display clusters of rising articles across a number of topics to provide a better mix of content on the homepage. For example, Techmeme shows a ranked list of related blog posts that link back to the lead article for each story:
We can build a simple version of this functionality with Hadoop by combining the article trend estimates we computed in the previous post with the Wikipedia link graph. Wikipedia provides a periodic database dump which includes a file with the outgoing pagelinks for each article. The June data dump includes a 12GB pagelinks file named “enwiki-20090618-pagelinks.sql” containing ~13K SQL insert statements:
INSERT INTO `pagelinks` VALUES (5588,0,'Pinar_del_Río'),(5588,0,'Planned_economy'),...
INSERT INTO `pagelinks` VALUES (5845,0,'Attorney_General_of_Colombia'),(5845,0,'Auditor_General_of_Colombia'),...
INSERT INTO `pagelinks` VALUES (6187,0,'Wallraf-Richartz_Museum'),(6187,0,'Wangen_im_Allgäu'),...
...
The first step in our data preparation uses a Hadoop Streaming job to convert the MediaWiki SQL insert format into a tab-delimited text file ready for further processing with Hive. The job uses a Python mapper called “parse_links.py”, which applies a regular expression to extract the graph edges from each SQL statement and emits a tab-delimited record when the link’s namespace is 0 (indicating the target page is an article).
import sys, re

insert_regex = re.compile(r"INSERT INTO `pagelinks` VALUES (.*);")
row_regex = re.compile(r"(.*),(.*),'(.*)'")

for line in sys.stdin:
    match = insert_regex.match(line.strip())
    if match is not None:
        data = match.group(1)
        # strip the outer parens, then split the row tuples
        rows = data[1:-1].split("),(")
        for row in rows:
            row_match = row_regex.match(row)
            if row_match is not None:
                # >>> row_match.groups()
                # ('12', '0', 'Anti-statism')
                # i.e. page_id, pl_namespace, pl_title
                page_id, pl_namespace, pl_title = row_match.groups()
                if pl_namespace == '0':
                    sys.stdout.write('%s\t%s\n' % (page_id, pl_title))
You can test this script at the Unix command line on a single machine before running it on Hadoop. Here we pipe a sample of the file into the mapper script and then examine the results with grep. The first column of the resulting output is the Wikipedia page_id, and the second column contains the titles of the articles that page links to.
$ head -35 enwiki-20090618-pagelinks.sql | ./parse_links.py > links.txt
$ grep 'Super' links.txt | more
303     Super_Outbreak
303     Talladega_Superspeedway
324     Super_Bowl
594     Tarquinius_Superbus
615     List_of_Super_Bowl_champions
615     List_of_Super_Bowl_records
615     Super_Bowl
615     Super_Bowl_XIX
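For reference, the full run over the pagelinks file can be launched as a map-only streaming job. The invocation below is a sketch: the streaming jar location and the input/output paths are assumptions that will vary with your cluster layout.

```shell
# Hypothetical invocation -- jar path and input/output locations will differ per setup.
# Zero reduce tasks means the mapper output is written directly, one file per map task.
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-*-streaming.jar \
  -input wikidump/enwiki-20090618-pagelinks.sql \
  -output links \
  -mapper 'python parse_links.py' \
  -file parse_links.py \
  -numReduceTasks 0
```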
We ran this script on the entire pagelinks file using a temporary Cloudera Hadoop cluster on Amazon EC2, and saved the output of the conversion process in an Amazon S3 folder called “links”. To work with these tab delimited link files on a new Hadoop cluster, we use distcp on the master node to pull the output data into HDFS from S3. We also pull in an archived export of our trendingtopics “pages” data which contains the page_id -> title mapping and monthly trend score for each article:
$ hadoop distcp s3n://trendingtopics/wikidump/links links
$ hadoop fs -rmr links/_distcp_logs*
$ hadoop distcp s3n://trendingtopics/archive/20090816/pages pages
$ hadoop fs -rmr pages/_distcp_logs*
We will use a set of Hive queries on the link graph data to generate a ranked list of the top trending articles linking back to each Wikipedia article. Once our data is in HDFS, we can start the Hive CLI on the Hadoop master node:

$ hive
In the Hive shell, we create a “links” table and load the raw data from HDFS:
hive> CREATE TABLE links (
        page_id BIGINT,
        pl_title STRING)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      STORED AS TEXTFILE;
hive> LOAD DATA INPATH 'links' OVERWRITE INTO TABLE links;
Now we have the “outlink” data for each page in Hive, but we want to reverse the keys so that we can find all trending page titles which link back to a given page_id. We can map the original page_id’s to article titles and pull in the trend data using a Hive JOIN:
hive> CREATE TABLE temp_backlinks (
        pl_title STRING,
        page_id BIGINT,
        bl_title STRING,
        monthly_trend DOUBLE)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\001'
      STORED AS TEXTFILE;
hive> INSERT OVERWRITE TABLE temp_backlinks
      SELECT links.pl_title, pages.page_id, pages.redirect_title, pages.monthly_trend
      FROM links JOIN pages ON (pages.page_id = links.page_id);
A second join is used to convert the original outlink titles to page_ids for use in our web application’s MySQL database:
hive> CREATE TABLE backlinks (
        page_id BIGINT,
        bl_title STRING,
        monthly_trend DOUBLE)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\001'
      STORED AS TEXTFILE;
hive> INSERT OVERWRITE TABLE backlinks
      SELECT pages.page_id, temp_backlinks.bl_title, temp_backlinks.monthly_trend
      FROM pages JOIN temp_backlinks ON (pages.redirect_title = temp_backlinks.pl_title);
The final stage of processing involves generating an array of the top trending backlink urls for each Wikipedia page_id. Later on, we can export the entire lookup table “backlinks_reduced” to MySQL for use in the trendingtopics Rails app. We use a Hive transform written in Python to rank the backlinks and concatenate the top 10 titles into a comma delimited text string for each page_id:
hive> CREATE TABLE backlinks_reduced (
        page_id BIGINT,
        backlinks STRING)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\001'
      STORED AS TEXTFILE;
hive> add FILE /mnt/hive_backlink_mapper.py;
hive> add FILE /mnt/hive_backlink_reducer.py;
hive> FROM (
        FROM backlinks
        MAP backlinks.page_id, backlinks.bl_title, backlinks.monthly_trend
        USING 'python hive_backlink_mapper.py'
        AS key, bl_title, score
        CLUSTER BY key) map_output
      INSERT OVERWRITE TABLE backlinks_reduced
      REDUCE map_output.key, map_output.bl_title, map_output.score
      USING 'python hive_backlink_reducer.py'
      AS page_id, backlinks;
The streaming mapper and reducer we used are shown below:
import sys

for line in sys.stdin:
    try:
        page_id, bl_title, score = line.strip().split("\t")
        sys.stdout.write('%s\t%s\t%s\n' % (page_id, bl_title, score))
    except:
        # just skip possible bad rows for now
        pass
import sys

def top_backlinks(backlinks, scores):
    # return top 10 backlinks based on score metric (trend)
    scorevals, links = zip(*sorted(zip(scores, backlinks)))
    toplinks = list(links)
    toplinks.reverse()
    backlink_str = '[%s]' % ','.join(toplinks[:10])
    return backlink_str

# For each page, emit backlinks sorted by score desc
last_page, backlinks, scores = None, [], []
for line in sys.stdin:
    try:
        (page, backlink, score) = line.strip().split("\t")
        if last_page != page and last_page is not None:
            backlink_string = top_backlinks(backlinks, scores)
            print "%s\t%s" % (last_page, backlink_string)
            backlinks = []
            scores = []
        last_page = page
        backlinks.append(backlink)
        scores.append(float(score))
    except:
        pass

backlink_string = top_backlinks(backlinks, scores)
print "%s\t%s" % (last_page, backlink_string)
To examine the results, we run a join query against the pages table to find the top trending backlinks for the Wikipedia article on “Swine Flu”:
hive> SELECT backlinks_reduced.backlinks
      FROM backlinks_reduced JOIN pages
      ON (pages.page_id = backlinks_reduced.page_id)
      WHERE redirect_title = 'Swine_influenza';

[Influenza_A_virus_subtype_H1N1,1918_flu_pandemic,Guillain-Barré_syndrome,
Oseltamivir,Influenza_treatment,Influenza_vaccine,Zanamivir,
Orthomyxoviridae,Pig,Amantadine]
This looks like a reasonable list of related pages. Here are a few more trending topics from the same time period:
Julia Child [Julie_Powell,Julie_&_Julia,August_13,August_15,Meryl_Streep, Mastering_the_Art_of_French_Cooking,Le_Cordon_Bleu,Amy_Adams, Simone_Beck,The_French_Chef]
John Hughes [The_Breakfast_Club,Molly_Ringwald,Sixteen_Candles,Pretty_in_Pink, Weird_Science_(film),Some_Kind_of_Wonderful_(film),Curly_Sue, Home_Alone_3,Uncle_Buck,Home_Alone_(film)]
Quentin Tarantino [Inglourious_Basterds,Machete_(film),Pulp_Fiction_(film), Eli_Roth,Bruce_Willis,Rosario_Dawson,Robert_Rodriguez, Grindhouse_(film),Julie_Dreyfus,Death_Proof]
At this point, we have what looks like a reasonable approach for generating related links with relatively little effort. The Rails application can apply filters to remove titles like “August_15” from each list and display the related links alongside each trend.
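As a sketch of that filtering step (the function name and date patterns here are hypothetical, not the actual Rails code), a simple date-title filter might look like:

```python
import re

# Hypothetical filter: drop bare calendar-date titles like "August_15"
# or "2009" from a backlink list before display.
MONTHS = ('January', 'February', 'March', 'April', 'May', 'June', 'July',
          'August', 'September', 'October', 'November', 'December')
DATE_RE = re.compile(r'^(?:%s)_\d{1,2}$|^\d{4}$' % '|'.join(MONTHS))

def filter_backlinks(titles):
    """Return only the titles that do not look like bare calendar dates."""
    return [t for t in titles if not DATE_RE.match(t)]
```

For example, `filter_backlinks(['Julie_Powell', 'August_13', 'Meryl_Streep'])` keeps just the two non-date titles.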
Here are a few ideas for next steps with the Wikipedia link graph data and Hadoop:
- Show related articles by using the backlink ids as a feature vector to compute article similarity
- Use PageRank to filter articles displayed in trends (see Running PageRank on Wikipedia)
- Compute TFIDF weights for Wikipedia articles and incorporate it into the similarity calculation
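The first idea above can be sketched quickly: treating each article’s backlink ids as a binary feature vector, the overlap between two sets (Jaccard similarity) gives a cheap relatedness score. This is only an illustration under the assumption that backlink ids are already available per article:

```python
def jaccard(backlinks_a, backlinks_b):
    """Jaccard similarity between two articles' backlink id sets."""
    a, b = set(backlinks_a), set(backlinks_b)
    if not a and not b:
        return 0.0
    return len(a & b) / float(len(a | b))
```

A more refined version would weight each backlink by its TFIDF score, per the third idea, rather than treating all backlinks equally.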
If you are interested in digging deeper into the link data with Python and Hadoop, you should also check out Viraj Bhat and Jake Hofman’s talk “Cool Development Projects at Yahoo!: Automatic Tuning and Social Graph Analysis” next week at Hadoop World NYC. I’ll also be giving a talk that afternoon called “Building Data Intensive Apps: A closer look at TrendingTopics.org”.