Applying Parallel Prediction to Big Data
- by Dan McClary
- October 05, 2012
This guest post is provided by Dan McClary, Principal Product Manager for Big Data and Hadoop at Oracle.
One of the constants in discussions around Big Data is the desire for richer analytics and models. However, for those who don’t have a deep background in statistics or machine learning, it can be difficult to know which techniques to apply, let alone which data to apply them to. Moreover, how can we leverage the power of Apache Hadoop to operationalize the model-building process? In this post we’re going to look at a simple way of applying well-known machine learning techniques to big datasets. We’ll use Pig and Hadoop to quickly parallelize a standalone machine-learning program written in Jython.
I’d like to predict the weather. Heck, we all would – there’s personal and business value in knowing the likelihood of sun, rain, or snow. Do I need an umbrella? Can I sell more umbrellas? Better yet, groups like the National Climatic Data Center offer public access to weather data stretching back to the 1930s. I’ve got a question I want to answer and some big data with which to do it. On first reaction, because I want to do machine learning on data stored in HDFS, I might be tempted to reach for a massively scalable machine learning library like Mahout.
For the problem at hand, that may be overkill and we can get it solved in an easier way, without understanding Mahout. Something becomes apparent on thinking about the problem: I don’t want my climate model for San Francisco to include the weather data from Providence, RI. Weather is a local problem and we want to model it locally. Therefore what we need is many models across different subsets of data. For the purpose of example, I’d like to model the weather on a state-by-state basis. But if I have to build 50 models sequentially, tomorrow’s weather will have happened before I’ve got a national forecast. Fortunately, this is an area where Pig shines.
Parallel Pipelines with Pig
We want to build models by state, but unfortunately NCDC’s global surface temperature data doesn’t come tagged by state. Instead, what we have is day-over-day data organized by station ID number. NCDC provides a map from station ID to locations, but we’ll need to join it to our weather data. However, the number of stations is much, much smaller than the number of temperature observations — in fact, it will fit into a modest amount of memory.
Pig’s JOIN operator allows us to specify join behavior when we understand our bags are of uneven sizes. In this case, we can use the “replicated” directive with the JOIN operator to force all but the first bag to reside in memory.
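To make the idea concrete, here is a minimal plain-Python sketch of what a replicated join does conceptually: the small relation is loaded into an in-memory lookup table, and the large relation is streamed past it. This is not Pig's implementation; the function name, the station IDs, and the sample rows are made up for illustration.

```python
def replicated_join(observations, stations):
    """Join each observation to its station using an in-memory lookup table."""
    # Build the lookup once; this is the small relation held in memory.
    station_by_id = {}
    for stn, state, lat, lon in stations:
        station_by_id[stn] = (state, lat, lon)

    # Stream the large relation and emit only rows that find a match (inner join).
    for usaf, day, temp, dewp, weather in observations:
        match = station_by_id.get(usaf)
        if match is not None:
            state, lat, lon = match
            yield (state, lat, lon, day, temp, dewp, weather)


# Made-up sample rows, for illustration only.
stations = [(724940, "CA", 37.62, -122.37), (725070, "RI", 41.72, -71.43)]
observations = [
    (724940, 5, 55.1, 48.0, "Sunny"),
    (725070, 5, 31.4, 20.2, "Snow"),
    (999999, 5, 60.0, 50.0, "Rain"),  # no matching station, so it is dropped
]
joined = list(replicated_join(observations, stations))
```

Because the lookup table never has to be shuffled across the cluster, each worker can join its slice of the observations independently, which is why the approach only makes sense when one side is small.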
Pig’s GROUP operator allows us to quickly organize our dataset by a feature (e.g., state) resulting in an outer bag for which each group is a training set. Once the data’s organized, dispatching our model-building code is as simple as using Pig’s FOREACH operator. That accomplished, we’re going to need some model-building code. Pig alone isn’t suited to this task, but its ability to leverage scripting languages in the JVM makes it easy to tap into a wealth of machine learning methods.
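The GROUP-then-FOREACH pattern can be sketched in plain Python as follows. This is only a single-machine analogy under simplified assumptions: `group_by_state` and `majority_label` are hypothetical names, the rows are made up, and the "model" is just the most common label in each group, standing in for real model-building code.

```python
from collections import defaultdict


def group_by_state(rows):
    """Mimic GROUP ... BY state: collect rows into one list per state."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)
    return groups


def majority_label(state, training_rows):
    """Placeholder 'model': the most common weather label in the group."""
    counts = {}
    for row in training_rows:
        label = row[-1]
        counts[label] = counts.get(label, 0) + 1
    # Sorting first makes tie-breaking deterministic.
    best = max(sorted(counts), key=lambda label: counts[label])
    return (state, best)


# Made-up rows of (state, temp, weather), for illustration only.
rows = [
    ("CA", 55.1, "Sunny"),
    ("CA", 52.0, "Sunny"),
    ("CA", 49.0, "Fog"),
    ("RI", 31.4, "Snow"),
]

# Mimic FOREACH ... GENERATE: apply the model builder to every group.
models = sorted(majority_label(state, group)
                for state, group in group_by_state(rows).items())
```

In Pig the groups are independent of one another, so the per-group function can run on many workers at once; the loop above is the sequential version of the same idea.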
Using Jython and Weka for Model Building
Decision trees are one of the fundamental techniques in machine learning and are applicable to a wide set of industry problems (e.g., advertisement targeting, quality evaluation). I’d like to build a simple C4.5 tree for each state’s weather. While I won’t get into the details of C4.5 here, the concept is simple: we want to organize our tree so that as we travel from the root to the leaves our decisions are ordered by the amount of information that can be gleaned from that feature. For example, if one of our features is latitude, then the model for California might place that decision near the top of the tree. Why? Because northern and southern California have very different climates and latitude tells us more about weather outcome across California than, say, the wind speed.
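To give a feel for "the amount of information that can be gleaned from a feature," here is a small sketch that computes the information gain of a threshold split. It is a simplification: C4.5 itself ranks splits by gain ratio, which normalizes this quantity, and the toy rows below are invented purely to mirror the latitude-versus-wind-speed example.

```python
import math
from collections import Counter


def entropy(labels):
    """Shannon entropy (in bits) of a list of class labels."""
    total = float(len(labels))
    return -sum((n / total) * math.log(n / total, 2)
                for n in Counter(labels).values())


def information_gain(rows, feature_index, threshold):
    """Entropy reduction from splitting rows on feature <= threshold."""
    labels = [row[-1] for row in rows]
    left = [row[-1] for row in rows if row[feature_index] <= threshold]
    right = [row[-1] for row in rows if row[feature_index] > threshold]
    if not left or not right:
        return 0.0  # a split that separates nothing tells us nothing
    weighted = (len(left) * entropy(left)
                + len(right) * entropy(right)) / float(len(rows))
    return entropy(labels) - weighted


# Invented rows of (latitude, wind_speed, weather), for illustration only.
toy_rows = [
    (33.9, 5.0, "Sunny"),
    (34.1, 12.0, "Sunny"),
    (40.8, 6.0, "Rain"),
    (41.0, 11.0, "Rain"),
]
gain_latitude = information_gain(toy_rows, 0, 37.0)
gain_wind = information_gain(toy_rows, 1, 8.0)
```

In this toy data the latitude split separates the two outcomes perfectly while the wind-speed split separates nothing, so a tree builder would put the latitude decision nearer the root.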
As I mentioned above, Pig isn’t suitable for constructing a C4.5 tree. Besides, C4.5’s been around since the 1990s; someone has surely open-sourced an implementation of it. In fact, the Weka machine-learning library contains a very good Java implementation of C4.5 called J48. Since Pig can register JARs for use as UDFs, it should be easy enough for us to route our FOREACH call into Weka.
However, I’m in a hurry, and may want to try out a bunch of different modeling techniques. I don’t want to write and package Java code if I can just write Python instead. Fortunately, Pig has support for Jython UDFs built in. All we need to do is make sure Jython is on the Pig classpath and make sure our code knows where to find our UDF. It looks a little like this:
REGISTER weka.jar;
REGISTER 'c45_udf.py' USING jython AS c45;
And we’ll send the bags of training data to Jython like this:
models = FOREACH training_groups GENERATE c45.build_instances(group, training_data);
But what does the Jython code do to leverage Weka? The code needs to accomplish a few things:
- Import the necessary Weka classes
- Define an output schema so that Pig understands how the resulting tuples are structured
- Transform the input data into a Weka dataset and build a C4.5 tree
- Turn the model into data that can be used to test or make predictions in the future and return it to Pig
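As a rough illustration of the third step, the sketch below shows one way the bag might be prepared before any Weka objects are created. It assumes the bag arrives as a list of (state, lat, lon, day, temp, dewp, weather) tuples, matching the fields the Pig script generates. The helper name `split_training_bag` is hypothetical, and the Weka calls themselves are deliberately left out.

```python
def split_training_bag(dataset):
    """Split a bag of (state, lat, lon, day, temp, dewp, weather) tuples
    into numeric feature rows, class labels, and the distinct class values."""
    features, labels = [], []
    for row in dataset:
        # Assumption: rows with missing fields are skipped rather than imputed.
        if any(value is None for value in row):
            continue
        # Drop the leading state and trailing label; the rest are numeric.
        features.append([float(value) for value in row[1:-1]])
        labels.append(row[-1])
    # A nominal class attribute needs the full set of possible labels.
    class_values = sorted(set(labels))
    return features, labels, class_values


# Made-up bag, for illustration only.
sample_bag = [
    ("CA", 37.62, -122.37, 5, 55.1, 48.0, "Sunny"),
    ("CA", 37.62, -122.37, 6, 51.0, 49.5, "Fog"),
    ("CA", 37.62, -122.37, 7, None, 47.0, "Sunny"),  # skipped: missing temp
]
features, labels, class_values = split_training_bag(sample_bag)
```

From here, the feature rows and class values would be handed to Weka to build the dataset and train the tree.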
Importing the classes and defining the output schema are simple:
import sys
sys.path += ["/usr/lib/jython/Lib","/usr/lib/jython/Lib/site-packages"]

import weka.core.Instances as Instances
import weka.core.Instance as Instance
import weka.core.FastVector as FastVector
import weka.core.Attribute as Attribute
import weka.classifiers.trees.J48 as J48

@outputSchema("state:chararray, model:chararray")
def build_instances(state,dataset):
That output decorator tells Pig what to expect in the return tuple.
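If you want to try a UDF outside of Pig, one option is a stand-in decorator. The sketch below assumes the decorator only needs to record the schema string on the function; it is a local test aid, not Pig's own code, and the function body is a placeholder rather than the real model-building logic.

```python
def outputSchema(schema):
    """Stand-in decorator: attach the schema string to the function."""
    def wrap(func):
        func.outputSchema = schema
        return func
    return wrap


@outputSchema("state:chararray, model:chararray")
def build_instances(state, dataset):
    # Placeholder body; the real UDF would train a J48 tree here.
    return (state, "model for %d rows" % len(dataset))


# The returned tuple has two fields, matching the two fields in the schema.
result = build_instances("CA", [("row1",), ("row2",)])
```

The point of the exercise is to check that the tuple you return has the same number and order of fields as the schema string declares.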
The mechanics of transforming the input data into Weka vectors and training a model are less easily summarized, so you can find a code sample here:
REGISTER weka.jar;
REGISTER 'c45_udf.py' USING jython AS c45;

rmf /user/oracle/weather/models

weather_obs = LOAD '/user/oracle/weather/cleaned_history' using PigStorage('\u0001')
    as (usaf:int, wban:int, year:int, month:int, day:int, temp:float, dewp:float, weather:chararray);

stations = LOAD '/user/oracle/weather/stations' USING PigStorage()
    as (stn:int, wban:int, country:chararray, state:chararray, lat:float, lon:float);

observations = JOIN weather_obs BY usaf, stations BY stn using 'replicated';

training_data = FOREACH observations GENERATE state, lat, lon, day, temp, dewp, weather;

training_groups = GROUP training_data BY state;

models = FOREACH training_groups GENERATE c45.build_instances(group, training_data);

STORE models INTO '/user/oracle/weather/models' USING PigStorage();
…or read about integrating Weka and Jython here.
Once we’ve done this, we end up with trees like this one for California:
lon <= -124.160004
|   dewp <= 49.299999
|   |   temp <= 50.599998
|   |   |   dewp <= 45.700001
|   |   |   |   temp <= 42.299999
|   |   |   |   |   day <= 6: Fog (2.0)
|   |   |   |   |   day > 6: Sunny (18.0/2.0)
|   |   |   |   temp > 42.299999
|   |   |   |   |   dewp <= 44.299999
|   |   |   |   |   |   temp <= 42.599998: Rain (2.0)
|   |   |   |   |   |   temp > 42.599998: Sunny (156.0/38.0)
|   |   |   |   |   dewp > 44.299999
|   |   |   |   |   |   temp <= 50.299999
|   |   |   |   |   |   |   dewp <= 44.599998: Rain (10.0)
|   |   |   |   |   |   |   dewp > 44.599998
|   |   |   |   |   |   |   |   day <= 18: Sunny (8.0)
|   |   |   |   |   |   |   |   day > 18: Rain (4.0)
|   |   |   |   |   |   temp > 50.299999: Sunny (4.0)
...
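Reading a printed tree is easier once you see it as nested if/else tests. The sketch below hand-translates only the branches shown above into plain Python. The function name `predict_fragment` is hypothetical, and any observation that falls into an elided branch returns None rather than a guess. In Weka's output, a leaf such as "Sunny (18.0/2.0)" means 18 training instances reached that leaf and 2 of them were misclassified.

```python
def predict_fragment(lon, dewp, temp, day):
    """Follow the printed branches of the California tree by hand."""
    on_printed_path = (lon <= -124.160004 and dewp <= 49.299999
                       and temp <= 50.599998 and dewp <= 45.700001)
    if not on_printed_path:
        return None  # this branch was elided in the printed tree

    if temp <= 42.299999:
        return "Fog" if day <= 6 else "Sunny"

    # From here on, temp > 42.299999.
    if dewp <= 44.299999:
        return "Rain" if temp <= 42.599998 else "Sunny"

    # From here on, dewp > 44.299999.
    if temp <= 50.299999:
        if dewp <= 44.599998:
            return "Rain"
        return "Sunny" if day <= 18 else "Rain"
    return "Sunny"


# Made-up observations, for illustration only.
early_cold_day = predict_fragment(lon=-124.2, dewp=40.0, temp=41.0, day=3)
late_mild_day = predict_fragment(lon=-124.2, dewp=45.0, temp=48.0, day=20)
inland_day = predict_fragment(lon=-120.0, dewp=40.0, temp=41.0, day=3)
```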
While my code includes some half-baked packaging of the model, consider serialization an exercise left to the reader.
This model snippet doesn’t tell me much, other than perhaps that it’s always sunny in California. But what we have here is more than just a fun example of how to play with public data using Pig and Python; rather it’s a simple methodology for applying existing modeling approaches to Big Data. By adding in your own aggregation logic and modeling code, you can get up and running with analytics on Hadoop with very little effort.