Thanks to Jonathan Natkins, a field engineer from StreamSets, for the guest post below about using StreamSets Data Collector—open source, GUI-driven ingest technology for developing and operating data pipelines with a minimum of code—and Cloudera Search and HUE to build a real-time search environment.
As pressure mounts on data engineers to deliver more data from more sources in less time, StreamSets Data Collector can serve as a linchpin in the data management process, helping them simplify ingest pipeline development and operations across the rapidly evolving ecosystem of big data tools and technology. In this post, we’ll create a pipeline for ingesting loan data to show you how to use StreamSets Data Collector and Cloudera Search to build a real-time search environment.
StreamSets is an open source, Apache-licensed system for building continuous ingestion pipelines. The StreamSets Data Collector provides ETL-upon-ingest capabilities while enabling custom-code-free integration between a wide variety of source data systems (like relational databases, Amazon S3, or flat files) and destination systems within the Hadoop ecosystem. StreamSets is easy to install via downloadable tarballs or using the Cloudera Manager Custom Service Descriptor (CSD).
Lending Club is a company that provides peer-to-peer loans. A user can request a loan, and then the loans are crowd-funded by investors. Peer-to-peer lending has become an extremely hot space, especially as similar platforms like Kickstarter have gained traction. Investors take on huge amounts of risk, however, if they invest in loans that they don’t understand well. Fortunately, Lending Club provides publicly available data about the loans it issues, as well as the current performance and returns. Using StreamSets and Cloudera Search, one can leverage this data to better understand how to find loans in which it’s worth investing.
Unfortunately, Lending Club doesn’t provide a truly live feed, but we can simulate it easily using StreamSets and Apache Kafka. We’ll leverage StreamSets to load data from flat files into Kafka, and then use StreamSets again to consume the data from Kafka and send it to Cloudera Search and HDFS.
For the sake of brevity, the data files have been downloaded to a server running a StreamSets Data Collector, and some minor processing has been done to remove a one-line preamble from the top of each of the CSV files.
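That preamble-stripping step is trivial to reproduce. The sketch below (filenames and the preamble text are hypothetical, not taken from the actual Lending Club files) drops the first line of a CSV payload so that the header row comes first:

```python
import csv
import io

def strip_preamble(raw_csv: str) -> str:
    # Drop the first line so the CSV header row is the first thing parsers see.
    return raw_csv.split("\n", 1)[1]

raw = ("Notes offered by Prospectus (hypothetical preamble line)\n"
       "id,loan_amnt,addr_state\n"
       "1,5000,CA\n")
rows = list(csv.DictReader(io.StringIO(strip_preamble(raw))))
```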
Loading the Loan Data into Kafka
Kafka is a high-throughput message-queueing system that has become widely used for building publish-subscribe systems within the Apache Hadoop ecosystem. A major benefit of using Kafka as an intermediate datastore is that it makes it very easy to replay ingestion and analysis, and significantly easier to consume datasets across multiple applications. However, a common challenge with Kafka is that its primary methods of producing and consuming data require writing custom code against its APIs.
We can use StreamSets to graphically build a pipeline that will load data into a Kafka topic. We can also use this pipeline to do a little work to canonicalize our data format. Generally, it is a best practice to have a common data format within a Hadoop deployment for ease of building follow-on applications, and for this deployment, we’ve chosen JSON. One benefit JSON gives us over CSV is that CSV files depend heavily on the ordering of columns; converting to JSON will help us avoid any potential column-ordering issues later on.
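To see why JSON sidesteps the column-ordering problem, here is a minimal sketch of the CSV-to-JSON conversion the pipeline performs (the field names are illustrative, not the exact Lending Club headers). Each emitted object carries its own field names, so downstream consumers never depend on column position:

```python
import csv
import io
import json

def delimited_to_json(csv_text: str):
    # DictReader keys each value by its header name, so the emitted JSON
    # records are self-describing and immune to column reordering.
    for row in csv.DictReader(io.StringIO(csv_text)):
        yield json.dumps(row)

sample = "loan_amnt,addr_state,loan_status\n5000,CA,Fully Paid\n"
records = list(delimited_to_json(sample))
```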
Configuring a StreamSets Pipeline
StreamSets pipelines avoid custom code by providing general-purpose connectors that are configuration-driven. StreamSets Data Collectors may have many pipelines, and each pipeline has a single data origin, but may have one or more destinations. To load the loan data into Kafka, we will build a very simple pipeline that has a Directory origin and a Kafka destination.
A key concept in StreamSets is the idea of the StreamSets Data Collector (SDC) Record. When data is read into a pipeline, it is parsed into an SDC Record. Having a common record format within the pipeline enables transformations to be built in a generic fashion, so that they can operate on any record that comes through, regardless of schema. When the data is sent to a destination, it is serialized to a target data format (when applicable).
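To make the idea concrete, we can model an SDC Record loosely as a dictionary of fields (a simplification of the real record model, which also carries header metadata). A transformation written this way is schema-agnostic: it operates on whichever fields happen to be present, so the same logic handles records of any shape:

```python
def trim_string_fields(record: dict) -> dict:
    # Schema-agnostic transformation: trims whatever string fields the
    # record happens to contain, leaving all other values untouched.
    return {k: v.strip() if isinstance(v, str) else v
            for k, v in record.items()}

# Two records with completely different schemas flow through the same logic.
accepted = {"loan_amnt": " 5000 ", "purpose": "car "}
rejected = {"Amount Requested": "1000 ", "Risk_Score": 693}
```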
For this initial pipeline, the Directory origin will be configured to read in Delimited data with a header line, and the Kafka destination will be configured to output JSON data. We will also specify the location of the files to ingest, and the Kafka topic to which we send the data.
We must also configure how to handle errors in the pipeline. A record may be classified as an error for many different reasons: perhaps data came in the wrong format, or a field required for a transformation was missing. When a record is marked as an error, it is sent off to an error destination, which could be another pipeline or a Kafka topic. If you’re not sure what to do with error records, they can always be discarded.
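The routing decision can be sketched as follows (a toy model of the behavior, assuming a "required field is missing" error condition; the real Data Collector handles this declaratively through configuration, not user code). Bad records are diverted to an error sink rather than halting the pipeline:

```python
def route_record(record: dict, required_fields, good: list, errors: list) -> None:
    # A record missing any required field is sent to the error sink;
    # everything else continues on to the destination.
    missing = [f for f in required_fields if f not in record]
    if missing:
        errors.append({"record": record, "reason": f"missing fields: {missing}"})
    else:
        good.append(record)

good, errors = [], []
route_record({"loan_amnt": "5000", "addr_state": "CA"}, ["loan_amnt"], good, errors)
route_record({"addr_state": "NY"}, ["loan_amnt"], good, errors)
```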
Handling Varying Record Types and Preparing Data for Search
On the other side of Kafka, we’ll use another StreamSets pipeline to consume data from the Kafka topic and build up an index in Cloudera Search. Data often arrives in a less-than-pristine format, and it’s frequently necessary to do some pre-processing or transformation to get it into a consumption-ready shape.
StreamSets can be used to perform row-oriented transformations as the data is ingested. A good way to think about the types of transformations that StreamSets can handle is to think of a pipeline as a continuous map-only job. For this example, the pipeline has been designed to perform a handful of transformation operations.
One interesting challenge is that the accepted and rejected loan files that came from Lending Club have different schemas. All the data is in CSV format, but accepted loans have upwards of 50 fields, while rejected loans only have nine. Since StreamSets parses each record individually, we don’t have to make any changes to the pipeline to handle the different record types. However, one transformation we’ll put in place is to canonicalize some of the field names between the two record types, using a Field Renamer processor. This will allow us to perform transformations on semantically identical fields, regardless of the schema.
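The effect of the Field Renamer can be sketched like this (the specific column names in the mapping are illustrative assumptions; the real header names should be checked against the downloaded files). After renaming, both record types expose the same canonical field names:

```python
# Hypothetical mapping from rejected-loan column names to their
# accepted-loan equivalents.
RENAMES = {
    "Amount Requested": "loan_amnt",
    "State": "addr_state",
}

def canonicalize(record: dict) -> dict:
    # Rename any field found in the mapping; pass all others through as-is.
    return {RENAMES.get(k, k): v for k, v in record.items()}

rejected = {"Amount Requested": "1000", "State": "NY"}
accepted = {"loan_amnt": "5000", "addr_state": "CA"}
```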
Another type of transformation and data enrichment that this pipeline handles is mapping from a zip code to a latitude/longitude pair. Occasionally, when it is necessary to build proprietary logic or some other complex transformation into a pipeline, it makes sense to use some of the extensibility capabilities of StreamSets to fulfill those needs. In the case of this pipeline, we’ve downloaded a mapping dictionary available online and written a Python script to do the lookup and create some additional fields to store the latitude and longitude data.
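A minimal sketch of that lookup script might look like the following (the coordinate table here is a two-entry toy stand-in for the full downloaded dictionary, and the field names are assumptions):

```python
# Toy lookup table keyed by 3-digit zip prefix; the real pipeline loads a
# complete zip-to-coordinates dictionary downloaded from the web.
ZIP_PREFIX_TO_COORDS = {
    "941": (37.77, -122.42),
    "100": (40.75, -73.99),
}

def add_coordinates(record: dict) -> dict:
    # Look up the zip prefix and, when found, attach latitude/longitude
    # fields for use by the Search index's geospatial features.
    prefix = str(record.get("zip_code", ""))[:3]
    coords = ZIP_PREFIX_TO_COORDS.get(prefix)
    if coords:
        record["latitude"], record["longitude"] = coords
    return record

enriched = add_coordinates({"zip_code": "941xx", "loan_amnt": "5000"})
```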
Finally, we’ve separated out the accepted and rejected loans, with accepted loans going to Cloudera Search and rejected loans being archived on HDFS. Notably, the HDFS location can be parameterized with field values or timestamps, which can make the HDFS destination useful for loading data into partitioned Apache Hive tables.
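The path templating works roughly like this (a sketch under assumed field and directory names, not the Data Collector's actual expression syntax): record fields and timestamps are substituted into the target path, which lines up naturally with a Hive table partitioned on the same keys.

```python
from datetime import datetime, timezone

def hdfs_partition_path(record: dict, base: str = "/data/loans/rejected") -> str:
    # Substitute a record field and the arrival timestamp into the output
    # path, matching a Hive table partitioned by state/year/month.
    ts = datetime.now(timezone.utc)
    state = record.get("addr_state", "UNKNOWN")
    return f"{base}/state={state}/year={ts:%Y}/month={ts:%m}"

path = hdfs_partition_path({"addr_state": "CA"})
```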
Starting Up the Pipelines and Getting Some Results
Once the two StreamSets pipelines are started, data will start to flow into the configured Cloudera Search index.
As data arrives, we can use HUE to build dashboards on the index, and get some more information about these loans. In this dashboard, we’ve plotted the number of loans being issued from each state, as well as a comparison between income brackets and the status of the loan (paid off, delinquent, etc.). We can use this information, along with the rest of the data that we’re continuously ingesting, to make better loan investment decisions.
The Hadoop ecosystem has a wide array of tools and technologies for building solutions. In this post, you’ve learned how to piece together complementary ingestion technologies like StreamSets and Kafka to bring data in real time to analytics and search infrastructure like Solr, and finally to visualize that data with HUE. The combination of these technologies provides an end-to-end solution that enables data scientists and analysts to better serve themselves, and to get faster access to the data that is critical to them.