How Treato Analyzes Health-related Social Media Big Data with Hadoop and HBase
- by Assaf Yardeni
- May 03, 2012
This is a guest post by Assaf Yardeni, Head of R&D for Treato, an online social healthcare solution, headquartered in Israel.
Three years ago I joined Treato, a social healthcare analysis firm, to help treato.com scale up to its present capability. Treato is a new source for healthcare information where health-related user-generated content (UGC) from the Internet is aggregated and organized into usable insights for patients, physicians and other healthcare professionals. With oceans of patient-written health-related information available on the Web, and more being published each day, Treato needs to be able to collect and process vast amounts of data. Treato is Big Data par excellence, and my job has been to bring Treato to this stage.
Before the Hadoop era
When I arrived at Treato, the team had already developed a Microsoft-based prototype that could organize a limited amount of health-related UGC into relevant insights, as a proof of concept. The system would:
- Crawl the Web and fetch raw HTML sources,
- Extract the user-generated content (i.e., users’ posts) out of the raw sources,
- Extract concepts from the posts and index them,
- Execute semantic analysis on the posts using natural language processing (NLP) algorithms,
- and calculate statistics.
The prototype proved the initial hypothesis: relevant medical insights can be found in social media if you know how to analyze it. We collected tens of millions of individual social media posts from dozens of websites. We had a handful of text analysis algorithms and could only process a couple million posts per day, but the results were impressive. We found that we were able to identify side effects through social media long before the FDA or pharmaceutical companies issued initial warnings about them. For example, when we looked at the discussions about Singulair, an asthma medication, we found that almost half of the user-generated content discussed mental disorders. When we looked back through the historical data, we learned that this would have been identifiable in our data four years before the official warning.
In order to gain even more health-related insights, we knew we needed a solution that could crawl and process a larger quantity of data – larger by an order of magnitude. That was the point at which Web scale joined the game. In order to collect massive amounts of posts, we needed to add thousands of data sources. And, of course, all the data we collected would need to be analyzed.
Dealing with a few dozen websites was difficult and costly, but we were able to scale up our Microsoft code to handle collection from several hundred sites, and could process around 250 million posts. We were running a few old IBM boxes that did the collection work and had developed a job manager that administered crawling and fetching tasks. Different servers ran the indexing and the stats calculations, and we had developed a distributed job manager to direct task executions. Different servers were used for serving the data. We didn’t have any storage solution, and all of the boxes worked with local drives.
Besides the fact that administering the process was hell, it was expensive in terms of CPU, network and input/output (I/O); e.g., after each stage, the data needed to be moved to a different server for the next stage. In addition, our job manager didn’t deal with failures; every time a task failed we needed to handle it manually. Needless to say, supporting collection and analysis of thousands of websites would have been impossible using this approach.
Looking at scale
At the beginning of 2010, we started searching for solutions that could support the capabilities we wanted. The requirements included:
- Reliable and scalable storage.
- Reliable and scalable processing infrastructure.
- Search engine capabilities (for retrieving posts) with high availability (HA).
- Scalable real-time store for retrieving stats, with HA.
We wanted the ability to periodically reprocess the data in a timely manner, so new algorithms or other analysis improvements would take effect on all historical data.
We wanted to know how much it costs to deal with X number of posts, and to be able to scale according to this formula.
We wanted a technology and architecture that would scale with the business.
We searched for answers to questions such as: “How does Google do it?” and it didn’t take too long to find Google’s papers, documentation on Hadoop and MapReduce, and so on.
We started digging deeper in these areas. After a short investigation, it was clear that the Hadoop Distributed File System (HDFS) would support our storage demands, and MapReduce would be a good fit for the processing infrastructure.
First Hadoop cluster in the lab
While looking for Hadoop distributions, I encountered Cloudera’s Distribution including Apache Hadoop (CDH). However, I decided to start with a manual installation, since this usually helps me better understand how things work. We started a pilot, setting up a two-node cluster on Linux boxes. The first installation was done entirely by hand, using the binaries downloaded from Apache and carefully configuring the system. This process was ugly: I needed to download all sorts of binaries from different sources, deal with networking issues, exchange SSH keys between the nodes, format the file system and apply all sorts of OS tweaks.
We started testing the behavior of the new technology, first with some simple WordCount and pi calculations, and then we quickly wrote MapReduce (Java) code that did parts of our processing and tested it on real HTML sources. The little cluster just worked: I was able to submit jobs and monitor them, and I tested recovery from task failures, the crash of a node, and so on.
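For readers new to the model, the WordCount test mentioned above can be sketched in a few lines. This is a single-process Python simulation of the map, shuffle and reduce phases, not the Java MapReduce code described in this post; all function names are illustrative assumptions.

```python
from collections import defaultdict
from itertools import chain


def map_phase(line):
    """Mapper: emit a (word, 1) pair for every word in one input line."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle(pairs):
    """Group values by key, as the framework does between map and reduce."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(word, counts):
    """Reducer: sum the counts collected for one word."""
    return word, sum(counts)


def word_count(lines):
    """Run map, shuffle and reduce in sequence over an iterable of lines."""
    mapped = chain.from_iterable(map_phase(line) for line in lines)
    return dict(reduce_phase(w, c) for w, c in shuffle(mapped).items())
```

On a real cluster the mappers and reducers run in parallel on different nodes, and the shuffle moves data over the network; the logic per key stays the same.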
Next, I wanted to see how this Hadoop solution scaled. To do this, I installed an additional box and added it to our little Hadoop cluster. It was awesome: after adding the new slave to the cluster, everything was transparent. Suddenly we had more capacity on the file system and more horsepower for processing. The job submission was the same as before; the job submitter (Hadoop client) didn’t even know that the cluster had changed, but simply got the results quicker. We were able to crunch some numbers and got a dollar-per-post cost.
So, the evaluation was great, but there was still the awful installation and maintenance process. That’s when we started to test Cloudera’s Distribution including Apache Hadoop; I think it was version 2 of CDH back then. We re-installed our little cluster from scratch using this Hadoop distribution. The installation process was much easier, and the documentation helped. The setup took only a couple of hours. (CDH3 takes less than an hour.)
After we found a good package, we wanted to set up a bigger cluster for prototyping, and deeper tests and evaluations. Amazon seemed to be the perfect place for that. Using CDH we set up a 10-node cluster of small instances on EC2. This was used for performance evaluation, and the processing rate was about 10M-20M posts per day, approximately 6 times higher than the performance of our pre-Hadoop solution.
We decided to go with Hadoop. This was a dramatic decision, as we took a company with a Microsoft-oriented development team and ported all the code into Java, all the while adopting a new and very complicated technology stack. This actually meant starting implementation from the beginning, opening a new integrated development environment (IDE) and starting to code from scratch.
In order to reduce risks and avoid critical mistakes, we searched for someone who had “been there, done that” so we could learn from them and validate our overall planned new architecture. Cloudera was our first choice; it made sense to go with a company that has multiple setups behind it, some of which are very large clusters. Cloudera sent Solutions Architect Lars George to our offices for two days, and we gave him our suggested design in advance. We felt lucky to have Lars, an HBase committer and author of HBase: The Definitive Guide, as our consultant, since HBase was one of the core technologies we were using.
For the first implementation phase, we decided to go with HDFS, MapReduce and HBase. Our in-house-developed crawlers use HBase as the store for the list of URLs to be fetched; this table needs to be able to scale to billions of rows. The fetcher (the component in charge of fetching the raw HTML sources) gets the URL queues out of HBase, runs HTTP requests, and stores the raw HTML sources in large files on top of HDFS (a few gigabytes per file). Neither the crawler nor the fetcher uses a relational database or any other kind of store except HDFS and HBase. These two components are network and I/O intensive, but CPU is not much of an issue.
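The fetcher loop described above can be illustrated with a small sketch. It is only a model under stated assumptions: a plain dict stands in for the HBase URL table, any writable text file stands in for a large HDFS file, and the one-JSON-record-per-line layout, the field names and the `fetch_batch` and `http_get` names are all hypothetical rather than Treato’s actual format or code.

```python
import json
import time


def fetch_batch(url_queue, http_get, out_file, batch_size=100):
    """Pop up to batch_size URLs, fetch each, append one JSON line per page.

    url_queue : dict mapping row key -> URL (stand-in for the HBase table)
    http_get  : callable taking a URL and returning the HTML as a string
    out_file  : writable text file (stand-in for a large HDFS file)
    """
    fetched = 0
    # HBase scans return rows in sorted row-key order, so sort the keys here.
    for row_key in sorted(url_queue)[:batch_size]:
        url = url_queue.pop(row_key)
        record = {
            "url": url,
            "fetched_at": int(time.time()),
            "html": http_get(url),
        }
        # json.dumps escapes newlines, so each source stays on a single line.
        out_file.write(json.dumps(record) + "\n")
        fetched += 1
    return fetched
```

Passing `http_get` in as a parameter keeps the sketch free of network code and makes the loop easy to test with a fake fetch function.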
Next comes the processing. Each line in the HDFS files contains an HTML source and metadata related to this source. For each directory of files in HDFS, the following processing jobs need to be executed:
- Turn the unstructured HTML into a list of post entities (content, timestamp, etc.)
- Each post needs to be processed as follows:
  - Index key terms – extract medical concepts out of the post content, using Treato’s extensive knowledge base
  - Execute text analysis algorithms
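As a rough illustration of the indexing step, the sketch below looks up known terms in each post and builds an inverted index from concept to post IDs. The three-entry `KNOWLEDGE_BASE`, the concept ID scheme and the function names are hypothetical stand-ins; the real knowledge base and matching logic are far more extensive and are not described in this post.

```python
import re
from collections import defaultdict

# Hypothetical miniature knowledge base: surface term -> concept id.
KNOWLEDGE_BASE = {
    "singulair": "drug:singulair",
    "asthma": "condition:asthma",
    "headache": "symptom:headache",
}


def extract_concepts(text, knowledge_base=KNOWLEDGE_BASE):
    """Return the set of concept ids whose term appears as a word in text."""
    words = set(re.findall(r"[a-z]+", text.lower()))
    return {concept for term, concept in knowledge_base.items() if term in words}


def build_index(posts, knowledge_base=KNOWLEDGE_BASE):
    """Map each concept id to the sorted ids of the posts that mention it."""
    index = defaultdict(list)
    for post_id, text in sorted(posts.items()):
        for concept in extract_concepts(text, knowledge_base):
            index[concept].append(post_id)
    return dict(index)
```

Exact word matching is the simplest possible choice here; it misses misspellings, synonyms and multi-word terms.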
During this process, many database queries and updates are needed. For example, each post retrieved may potentially already exist in our system, and of course we don’t want to add a duplicate post to our system, nor invest processing power on documents we already have. In order to accomplish this, we need to calculate a hash for each post, and then check it against a database containing all of the existing hashes. For this purpose HBase works perfectly in terms of both latency and load.
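The duplicate check can be sketched as follows. This is a minimal illustration, not our production code: a Python set stands in for the HBase table of known hashes, and the choice of SHA-1, the whitespace and case normalisation, and the function names are all assumptions.

```python
import hashlib


def post_hash(content):
    """Hash the normalised post text (SHA-1 and the normalisation are assumptions)."""
    normalised = " ".join(content.split()).lower()
    return hashlib.sha1(normalised.encode("utf-8")).hexdigest()


def filter_new_posts(posts, seen_hashes):
    """Return only posts whose hash is not yet in seen_hashes, recording them.

    seen_hashes is a set standing in for the HBase table of known hashes.
    """
    new_posts = []
    for content in posts:
        digest = post_hash(content)
        if digest not in seen_hashes:
            seen_hashes.add(digest)
            new_posts.append(content)
    return new_posts
```

With the hash used as the row key, the membership test becomes a single lookup by key in HBase, which is the access pattern the store handles well.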
After the design phase, we started implementation. All R&D teams worked on porting their code into Java, and our Ops team worked on planning the data center (we decided on a co-located data center setup).
For the initial setup, our Hadoop cluster comprised 11 boxes. Two were NameNodes in an active/passive mode (one was on standby for manual failover in case the active NameNode failed). Nine were slaves hosting DataNode, TaskTracker and RegionServer daemons. In addition, we had three boxes running ZooKeeper services.
The new system was capable of analyzing 50M posts per day. This was a significant performance improvement. In addition, it was reasonable to maintain, it was reliable, and it worked quite smoothly. Of course, there were bumps in the road, but in the end we managed to overcome them all.
We have continued to improve and expand the solution, and today we can process 150 to 200 million user posts per day. In subsequent blog posts, I will share in greater detail our system design, use of HBase, and cluster architecture.