How-to: Process Data using Morphlines (in Kite SDK)

Our thanks to Janos Matyas, CTO and Founder of SequenceIQ, for the guest post below about his company’s use case for Morphlines (part of the Kite SDK).

SequenceIQ has an Apache Hadoop-based platform and API that consume and ingest various types of data from different sources to offer predictive analytics and actionable insights. Our datasets include structured and unstructured data, log files, and communication records, and they require constant refining, cleaning, and transformation.

These datasets come from different sources (industry-standard and proprietary adapters, Apache Flume, MQTT, iBeacon, and so on), so we need a flexible, embeddable framework to support our ETL process chain. Hello, Morphlines! (As you may know, originally the Morphlines library was developed as part of Cloudera Search; eventually, it graduated into the Kite SDK as a general-purpose framework.)

To define a Morphline transformation chain, you describe the steps in a configuration file, and the framework then turns that configuration into an in-memory container of transformation commands. Commands perform tasks such as transforming, loading, parsing, and processing records, and they can be linked into a processing chain.

In this blog post, I’ll demonstrate such an ETL process chain containing custom Morphlines commands (defined via config file and Java), and use the framework within MapReduce jobs and Flume. As the sample use case for ETL with Morphlines, we picked a publicly available “million song” dataset from Last.fm. The raw data consists of one JSON file/entry per track; each entry is a dictionary with a small set of keys.

 
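As a rough illustration (the field names come from the public Last.fm dataset description; the values here are placeholders), a raw track entry looks something like this:

{
  "track_id": "TRXXXXXXXXXXXXXXXX",
  "title": "Some Track Title",
  "artist": "Some Artist",
  "timestamp": "2011-08-15 10:00:00",
  "tags": [["rock", "100"], ["indie", "40"]],
  "similars": [["TRYYYYYYYYYYYYYYYY", 0.98], ["TRZZZZZZZZZZZZZZZZ", 0.08]]
}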

The keys are not always available for each object.

 

During the ETL steps, we will use two process chains with built-in, as well as custom, commands (and you will learn two different ways to write the latter):

Import Data into the Hadoop Cluster

The initial data import flow from an external source into Hadoop/HDFS is the following:

During the import phase, set up a Flume Interceptor with a Morphlines configuration file to load data into HDFS. The load process uses a custom Morphlines Java command to do a preliminary ETL process on the data (selecting songs before and after a given date). For this process, you can use a simple custom Java command, called LatestSongCommand, below:

 
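A minimal sketch of such a command, following the usual Kite Morphlines CommandBuilder pattern (the command name latestSong, the timestamp field, the minDate parameter, and the package name are illustrative assumptions):

package com.sequenceiq.samples.morphline; // hypothetical package

import java.util.Collection;
import java.util.Collections;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;

import com.typesafe.config.Config;

public final class LatestSongCommandBuilder implements CommandBuilder {

  @Override
  public Collection<String> getNames() {
    return Collections.singletonList("latestSong"); // name used in the morphline config
  }

  @Override
  public Command build(Config config, Command parent, Command child, MorphlineContext context) {
    return new LatestSongCommand(this, config, parent, child, context);
  }

  // Keeps only the songs whose "timestamp" field is newer than a configurable cut-off date.
  private static final class LatestSongCommand extends AbstractCommand {

    private final String minDate;

    public LatestSongCommand(CommandBuilder builder, Config config, Command parent,
                             Command child, MorphlineContext context) {
      super(builder, config, parent, child, context);
      this.minDate = getConfigs().getString(config, "minDate", "1970-01-01");
      validateArguments();
    }

    @Override
    protected boolean doProcess(Record record) {
      Object timestamp = record.getFirstValue("timestamp");
      if (timestamp == null || timestamp.toString().compareTo(minDate) < 0) {
        return true; // drop older songs silently; do not pass them downstream
      }
      return super.doProcess(record); // forward matching songs to the next command in the chain
    }
  }
}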

To configure your new Morphline command:

 
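Registering the command boils down to listing its package in importCommands and then invoking it by name. A sketch of such a morphline config (the morphline id, paths, and package globs are illustrative):

morphlines : [
  {
    id : lastfmImport
    # pick up the built-in Kite commands plus our own (hypothetical) package
    importCommands : ["org.kitesdk.**", "com.sequenceiq.**"]

    commands : [
      # parse the raw JSON track entry from the event body
      { readJson {} }

      # promote the timestamp from the parsed JSON into a record field
      { extractJsonPaths { flatten : true, paths : { timestamp : /timestamp } } }

      # the custom command from above: keep only songs newer than the cut-off date
      { latestSong { minDate : "2010-01-01" } }
    ]
  }
]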

The data flows through the Flume agent, the Morphlines commands are applied to the records, and the Flume sink receives the cleaned data.

Note that when using Morphlines with the Flume HdfsSink, configuring a custom data serializer for the HdfsSink is often handy, as Flume by default persists the body but not the headers. For your convenience, we have written a Flume serializer for Morphlines (CustomLastfmHeaderAndBodyTextEventSerializer) that will retain the same input data format, with the ETL commands applied.
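Hooking this into Flume happens in the agent configuration; a hedged sketch (the agent, source, sink, and path names are made up, and the serializer package is a guess) could look like this:

# run the morphline against every incoming event
agent.sources.lastfmSource.interceptors = morphlineInterceptor
agent.sources.lastfmSource.interceptors.morphlineInterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.lastfmSource.interceptors.morphlineInterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
agent.sources.lastfmSource.interceptors.morphlineInterceptor.morphlineId = lastfmImport

# persist the cleaned records to HDFS, keeping the original body format
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs:///user/lastfm/input
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.serializer = com.sequenceiq.samples.flume.CustomLastfmHeaderAndBodyTextEventSerializer$Builder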

Post-process the Entries

Once the data is imported into HDFS, you can post-process it and apply additional ETL steps. The flow used in this example is the following:

The post-processing ETL uses a chain of Morphline commands set up in a configuration file. Each track in the Last.fm dataset has similar tracks associated with it, which are the results of precomputed song-level similarities. The custom Morphlines command lets you select only the entries where the similarity is less than, equal to, or over the filter value (in our case, over 0.1).

 
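A hedged sketch of such a config (the morphline id, the JSON paths, and the inline Java are illustrative):

morphlines : [
  {
    id : similarSongs
    importCommands : ["org.kitesdk.**"]

    commands : [
      # parse the JSON track entry
      { readJson {} }

      # pull the fields of interest out of the parsed JSON tree
      {
        extractJsonPaths {
          flatten : false
          paths : {
            track_id : /track_id
            similars : /similars
          }
        }
      }

      # custom command written inline in Java and compiled by the framework at runtime
      {
        java {
          imports : "import com.fasterxml.jackson.databind.JsonNode;"
          code : """
            // keep the entry only if at least one similar track scores above the 0.1 threshold
            JsonNode similars = (JsonNode) record.getFirstValue("similars");
            if (similars != null) {
              for (JsonNode pair : similars) {
                if (pair.get(1).asDouble() > 0.1) {
                  return child.process(record);
                }
              }
            }
            return true; // drop entries without a sufficiently similar track
            """
        }
      }
    ]
  }
]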

In this Morphline config file there are two default commands (readJson, extractJsonPaths) and one custom command (written in Java). As you can see, custom Morphline commands can be defined in the config file as well, so you don’t have to compile them or write a separate Java class beforehand. This is an extremely useful feature; using the JavaBuilder class, the framework compiles the commands at runtime.

Using Morphlines from a MapReduce job is straightforward. During the setup phase of the MapReduce job you build a context and a Morphline — that’s it.

 
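A hedged sketch of that setup code, assuming the morphline file and id are passed in through the job configuration (the mapper class, property names, and output types are illustrative):

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Fields;

public class MorphlineMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

  private Command morphline;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // build a Morphline context and compile the morphline once per mapper;
    // the RecordEmitter (shown further below) becomes the final child of the command chain
    MorphlineContext morphlineContext = new MorphlineContext.Builder().build();
    File morphlineFile = new File(context.getConfiguration().get("morphline.file"));
    String morphlineId = context.getConfiguration().get("morphline.id");
    morphline = new Compiler().compile(morphlineFile, morphlineId, morphlineContext,
        new RecordEmitter(context));
  }

  // map() and the RecordEmitter inner class are shown below
}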

Also, we have created a RecordEmitter inner class in the mapper:

 
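A hedged sketch of what such an inner class could look like (assuming the mapper emits Text keys with null values, as the context.write(line, null) call mentioned below suggests; which record field to emit is also an assumption):

// Final child of the morphline chain: turns every processed Record back into mapper output.
// Lives inside the mapper class sketched above, so Context refers to the mapper's context type.
private final class RecordEmitter implements Command {

  private final Text line = new Text();
  private final Context context;

  private RecordEmitter(Context context) {
    this.context = context;
  }

  @Override
  public void notify(Record notification) {
    // lifecycle notifications (begin/commit transaction, shutdown, ...) are not needed here
  }

  @Override
  public boolean process(Record record) {
    line.set(String.valueOf(record.getFirstValue(Fields.ATTACHMENT_BODY)));
    try {
      context.write(line, null); // write the processed record out to HDFS
    } catch (IOException | InterruptedException e) {
      return false; // signal failure back to the morphline
    }
    return true;
  }

  @Override
  public Command getParent() {
    return null; // the emitter is the end of the chain
  }
}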

The compile method used in the setup code:

 
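Its signature in the Kite SDK (org.kitesdk.morphline.base.Compiler) is along these lines:

public Command compile(File morphlineFile, String morphlineId,
                       MorphlineContext morphlineContext, Command finalChild, Config... overrides)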

takes an important parameter called finalChild, which in this case is the RecordEmitter. The returned command will feed records into finalChild; if this parameter is not provided, a DropRecord command is assigned automatically. (In Flume, a Collector command serves this role so that no transformed record is lost.) The only thing left is to extract the processed record and write the results to HDFS; the RecordEmitter serves this purpose by calling context.write(line, null).

Once the morphline is created, you can now process the records:

 
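A hedged sketch of the corresponding map() method (the counter name is made up); each input line is wrapped into a Record and pushed through the compiled morphline:

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  // readJson expects the raw bytes in the attachment body field
  Record record = new Record();
  record.put(Fields.ATTACHMENT_BODY, value.copyBytes());
  if (!morphline.process(record)) {
    context.getCounter("morphline", "failedRecords").increment(1); // record was dropped or failed
  }
}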

Testing

Morphlines has a nice test framework built into the SDK; your unit tests can extend the abstract AbstractMorphlineTest class, so you can test your custom-built commands the same way Morphlines tests the built-in ones. (Use our LatestSongCommandTest test case as a reference.)
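A hedged sketch of such a test (the config path, field value, and assertions are illustrative; morphline, collector, createMorphline(), and startSession() are helpers provided by AbstractMorphlineTest):

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.kitesdk.morphline.api.Record;

public class LatestSongCommandTest extends AbstractMorphlineTest {

  @Test
  public void oldSongsAreDropped() throws Exception {
    // load a test morphline that uses the custom latestSong command (hypothetical config path)
    morphline = createMorphline("test-morphlines/latestSong");

    Record oldSong = new Record();
    oldSong.put("timestamp", "2001-01-01 00:00:00");

    startSession();
    assertTrue(morphline.process(oldSong));          // processing succeeds...
    assertEquals(0, collector.getRecords().size());  // ...but the old song never reaches the collector
  }
}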

Building and Running

You can get the code from our GitHub page and build the project with Maven (mvn clean install). Download the Last.fm sample dataset from S3 and save it to your computer. (Alternatively, you can use our other Morphlines example to process/ETL files directly from an S3 bucket.) Start Flume using the following configuration, making sure you change the input and output folders accordingly. Once the data is processed and available on HDFS, you can run the second ETL process, this time using Morphlines from a MapReduce job:

 
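A hedged sketch of the commands involved (the agent name, jar, driver class, and paths are made up; adjust them to your environment):

# build the samples
mvn clean install

# import phase: start the Flume agent with the configuration above
flume-ng agent --name agent --conf conf --conf-file flume-lastfm.conf

# post-processing phase: run the MapReduce job once the cleaned data is on HDFS
hadoop jar morphline-samples.jar com.sequenceiq.samples.MorphlineJob \
    /user/lastfm/input /user/lastfm/output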

Conclusion

As you can see, embedding Morphlines in your application and using it is very easy. The increasing number of built-in commands will satisfy most needs, but the framework offers flexible ways to write custom commands, as well.

Happy Morphlining!
