Couchdoop: Couchbase Meets Apache Hadoop


Thanks to Călin-Andrei Burloiu, Big Data Engineer at antivirus company Avira, and Radu Pastia, Senior Software Developer in the Big Data Team at Orange, for the guest post below about the Couchdoop connector for bringing Couchbase data into Hadoop.

Couchdoop is a Couchbase connector for Apache Hadoop, developed by Avira on CDH, that allows for easy, parallel data transfer between Couchbase and Hadoop storage engines. It includes a command-line tool, for simple tasks and prototyping, as well as a MapReduce library, for those who want to use Couchdoop directly in MapReduce jobs. Couchdoop works natively with CDH 5.x.
Couchdoop can help you:

  • Import documents from Couchbase to Hadoop storage (HDFS or Apache HBase)
  • Export documents from Hadoop storage to Couchbase
  • Batch-update existing Couchbase documents
  • Query Couchbase views to import only specific documents (daily imports for example)
  • Easily control performance by adjusting the degree of parallelism via MapReduce

In the remainder of this post, you’ll learn the main features of Couchdoop and explore a demo application.

Why Couchdoop?

In many Big Data applications, data is transferred from an “operational” tier containing a key-value store to an “analytical” tier containing Hadoop via Apache Flume or a queuing service such as Apache Kafka or RabbitMQ. However, this approach is not always possible or efficient, such as when the events themselves are highly related (like a shopping session with several clicks and views) and could be conveniently grouped before being pushed to Hadoop. In those cases where Couchbase serves as the operational tier, Couchdoop’s import feature comes in handy. Conversely, you can use Couchdoop’s export feature to move data computed with Hadoop into Couchbase for use in real-time applications.

The data collected by the operational tier can be imported into the analytical tier, where it is typically stored in HDFS. Using the tools provided by CDH, the data can be processed and enhanced for various use cases. One use case is ad hoc querying, which allows business people to query the data in real time using Impala. Another is improving user experience by using machine-learning algorithms to adapt the application to users’ needs. For this use case, both MapReduce and Apache Spark, which are included in CDH, can be used. (Spark comes with its own machine-learning library, MLlib.) Apache Mahout offers time-proven algorithms written in MapReduce as well as newer and faster implementations written in Spark. The outcome of the machine-learning algorithms can then be exported to the operational tier using Couchdoop.

Importing in E-Commerce Websites

User events from the real-time tier are typically streamed to Hadoop by using a logging system, such as Flume, or a message queuing system, such as Kafka. So why would you import data with Couchdoop instead of streaming it? Because importing directly from Couchbase pays off when your real-time application already keeps a session of recent user activity.

An e-commerce website is a typical example, because it needs to store the shopping cart and recently viewed products in order to recommend other products the user might want to buy in that session. A batch process can be scheduled to use Couchdoop to incrementally import all user sessions from Couchbase to HDFS at the end of each day. The recommendation engine can then be retrained with the fresh data to adapt to users’ interests. Algorithms such as frequent item sets require all products from the same cart to be grouped together. If you were to use Flume to stream user events, you would need to create a job to group all events from the same session. With the Couchdoop import approach, all the data from a particular session is already in the same place, dramatically reducing the computational cost of processing.

Exporting in Avira Messaging Engine

One example of a back-end application that uses Couchdoop in production is the Avira Messaging Engine (AME), which is responsible for delivering in-product messages to our users. Those messages may be security alerts, announcements, and marketing campaigns. Because some messages may be irrelevant for some users, we are now building a marketing engine based on response modeling, which targets each user based on their profile. The best next message to deliver to a user is predicted in the analytical tier using CDH. Couchdoop’s export feature is used to publish targeted messages for each user in Couchbase. Thus, our antivirus service is now able to request messages from our operational tier.

Demo

Learning to use a new tool is always easier via example. Here is a fictional but common use case that we’ll solve with Couchdoop:

Let’s say we have a very popular news website. Currently all users are shown the same content, but going forward we want to analyze user behavior and deliver personalized recommendations.

We’ll use Couchbase, Hadoop, and Couchdoop for this goal. This is what we’ll do:

  • Keep track of users by storing a unique ID in a cookie, and optionally asking them to authenticate
  • Record each session/visit as a Couchbase document; each page view will be added to a session until this session is ended (some time passes and the session expires – the visit is complete)
  • Move completed sessions to Hadoop using Couchdoop’s import feature
  • Run our recommendation algorithm/tool in Hadoop (out of scope for this demo)
  • Load the recommendations into Couchbase to serve them to the website

Let’s go into more detail.

Preparing the Data

For each session ID we store a document in Couchbase that tracks user actions in that session:
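A session document along these lines captures the idea; the key format comes from the view example below, while the field names inside the value are purely illustrative assumptions:

    sid::20141201::johnny::1357902468
    {
        "user": "johnny",
        "articles": [
            {"article_id": "article-0123", "seconds_read": 210},
            {"article_id": "article-0456", "seconds_read": 35},
            {"article_id": "article-0789", "seconds_read": 120}
        ]
    }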

The above Couchbase document encodes the date in the key, 20141201 (December 1, 2014), and the sequence of articles read in the value. The time spent reading an article is very important feedback; the user may have requested an article because it seemed interesting but then stopped reading for whatever reason.

Couchdoop is scheduled to import user sessions in batches into HDFS. By analyzing millions of sessions we can detect sequence patterns, apply association rules (such as frequent item sets), and finally compute the recommendations for each user. Each user has their own recommendation document:
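A recommendation document could look roughly as follows; the key and the field names are again illustrative assumptions rather than a format required by Couchdoop:

    recommendations::johnny
    {
        "user": "johnny",
        "articles": ["article-0654", "article-0321", "article-0987"]
    }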

As we plan to import sessions to Hadoop on a daily basis, we need to be able to retrieve all sessions created during any given day. We’ll create a Couchbase view for those documents, which will allow us to find all session documents from a particular date. Furthermore, we will randomly partition the data within each date so the input can be split into smaller pieces, leveraging Hadoop’s parallelism. Couchdoop will create a Map task for each of these partitions.

The following JavaScript map function parses a document’s key, extracts the date, and emits it as a view key:
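A map function along these lines is consistent with the example that follows; the partitioning scheme (taking the numeric suffix of the key modulo 64) is our assumption:

    function (doc, meta) {
        // Session keys look like "sid::20141201::johnny::1357902468":
        // a "sid" prefix, the date, the user, and a numeric suffix.
        var parts = meta.id.split("::");
        if (parts[0] == "sid" && parts.length == 4) {
            var date = parts[1];
            // Derive one of 64 partitions so that Couchdoop can split the
            // input for this date across several Map tasks.
            var partition = parseInt(parts[3], 10) % 64;
            emit([date, partition], null);
        }
    }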

For the document with key sid::20141201::johnny::1357902468, the map function will emit ["20141201", 4]. When we configure Couchdoop to import data from a Couchbase view, we need to specify a list of view keys; each map task will retrieve data with one of the view keys. The list can be expressed sequentially as a semicolon-separated JSON list, e.g. ["20141201", 0];["20141201", 1];["20141201", 2];["20141201", 3], ... or by using key ranges, e.g. ["20141201", ((0-63))].

To prepare your Couchbase installation with sample data and to configure the view above, follow the Prerequisites section from the README of the GitHub demo project. The above map function should be defined as the by_date view in the sessions design document.

Now we need to move these sessions to HDFS, to serve as input for our recommender.

Importing

The following command will load all sessions for December 1, 2014, into HDFS:
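The command looks roughly like this; apart from --hadoop-mappers, which is discussed below, the flag names are assumptions modeled on Couchdoop’s configuration properties, so check the project README for the precise spelling:

    hadoop jar couchdoop-<version>-job.jar import \
        --couchbase-urls http://couchbase.example.com:8091/pools \
        --couchbase-bucket sessions_bucket \
        --couchbase-password 'secret' \
        --couchbase-designdoc-name sessions \
        --couchbase-view-name by_date \
        --couchbase-view-keys '["20141201", ((0-63))]' \
        --output /website/sessions \
        --hadoop-mappers 4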

The above command will start a MapReduce job.

Map tasks will connect to Couchbase by using the provided URLs, bucket, and password. The by_date view from the sessions design document will be queried. Parallelism is controlled with the --hadoop-mappers parameter. The 64 view keys will be divided among the 4 specified Map tasks, so each one will process 16 of them. If we don’t set the --hadoop-mappers parameter, the number of Map tasks will be equal to the number of view keys, in this case 64.

The result will be written to HDFS in /website/sessions as TSV files with two columns: one for the keys and one for the values from Couchbase. By default, Hadoop uses a tab as the separator between columns, but this can be changed by modifying the mapred.textoutputformat.separator property.
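For example, when driving the job from your own code (as described in the library section below), switching to comma-separated output is a one-line change:

    conf.set("mapred.textoutputformat.separator", ",");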

Exporting

We assume that we have computed recommendations for each user and we now have them stored as key-value TSV files in HDFS, formatted as Couchbase recommendation documents. In order to publish them to Couchbase, we can run the following in the command line:
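Again, a rough sketch of the command; apart from --couchbase-operation, the flag names and the HDFS input path are assumptions:

    hadoop jar couchdoop-<version>-job.jar export \
        --couchbase-urls http://couchbase.example.com:8091/pools \
        --couchbase-bucket recommendations_bucket \
        --couchbase-password 'secret' \
        --input /website/recommendations \
        --couchbase-operation SET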

Each Hadoop task will connect to Couchbase Server by using the provided URLs, bucket, and password. We can use the --couchbase-operation parameter to choose one of the available Couchbase operations to be performed for each key-value pair:

  • ADD (puts a document if it does not exist)
  • SET (puts a document if it does not exist and overwrites it if it exists)
  • REPLACE (overwrites a document if it exists)
  • APPEND / PREPEND (appends / prepends a document’s value if it exists; these operations violate JSON semantics)
  • DELETE (removes a document by its key if it exists)
  • EXISTS (does not modify any document from Couchbase, it just uses Hadoop counters to count how many input keys exist in the bucket).

We can optionally set an expiry for the stored documents. If we need more control, such as specifying a particular operation and expiry for each document, we can use the MapReduce API and write our own MapReduce jobs with Couchdoop.

Using Couchdoop as a Hadoop Library

In a real situation, one often needs a bit more functionality than what the Couchdoop command-line tool offers. For example, you might need to convert Couchbase JSON documents to other formats (like CSV, Avro, or Parquet) to match the expected format of the recommender. Our sample project illustrates exactly that, and also demos the use of Couchdoop as a library.

Next, let’s look in a bit more detail at what we need to take care of. If we use Couchdoop as a library in our MapReduce job, the first thing we must ensure is that all configuration properties needed by Couchdoop are set in the Hadoop configuration object. These properties are the same as the ones used on the command line (as shown above) but use dots instead of dashes (for example, --couchbase-urls becomes couchbase.urls). You can set them as easily as:
conf.set("couchbase.urls", "http://couchbase.example.com:8091/pools");

You can conveniently create an XML file with your configuration, similar to mapred-site.xml, and pass it with the -conf argument of the hadoop jar command-line tool. (This is what we did in the demo project in couchdoop-site.xml; check the README for a usage sample. Check out the wiki on Couchdoop’s GitHub page to learn more about configuration properties.)
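Such a file follows the standard Hadoop configuration format; here is a minimal sketch with placeholder values (couchbase.urls is shown above, while the other property names simply follow the dash-to-dot convention and should be checked against the wiki):

    <?xml version="1.0"?>
    <configuration>
        <property>
            <name>couchbase.urls</name>
            <value>http://couchbase.example.com:8091/pools</value>
        </property>
        <property>
            <name>couchbase.bucket</name>
            <value>sessions_bucket</value>
        </property>
        <property>
            <name>couchbase.password</name>
            <value>secret</value>
        </property>
    </configuration>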

Another important topic before we go on is that the Couchbase client has dependencies that may be incompatible with those of Hadoop (depending on your Hadoop distribution). Use the statement job.setUserClassesTakesPrecedence(true); to ensure that the Couchbase client functions correctly.

Now that everything is all set, we can have a look at the actual jobs.

Importing
  • Set the input format with job.setInputFormatClass(CouchbaseViewInputFormat.class);. This setting instructs Hadoop to read data from Couchbase. The properties you set earlier will be used to connect to Couchbase, use the correct view, and query the requested keys. For an example, check the ImportDriver class from the demo project.
  • Write a mapper that gets the key as Text and the value as ViewRow (com.couchbase.client.protocol.views.ViewRow). The ViewRow is a row extracted by the CouchbaseViewInputFormat from the given view. For an example, check the ImportMapper class, or see the sketch right after this list.
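For illustration, a minimal import mapper might look like the sketch below; the output types and the pass-through logic are our own choices rather than something mandated by Couchdoop:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import com.couchbase.client.protocol.views.ViewRow;

    public class SessionImportMapper extends Mapper<Text, ViewRow, Text, Text> {

        @Override
        protected void map(Text key, ViewRow row, Context context)
                throws IOException, InterruptedException {
            // Each ViewRow is one row returned by the by_date view; forward
            // the document ID and the row's value as key-value text output.
            String value = (row.getValue() == null) ? "" : row.getValue();
            context.write(new Text(row.getId()), new Text(value));
        }
    }
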
Exporting
  • Write a mapper that emits the key as String and the value as CouchbaseAction (com.avira.couchdoop.exp.CouchbaseAction). Check ExportMapper to see how you can do this, or see the sketch after this list.
  • Set the output key, value, and format classes:
    job.setOutputKeyClass(String.class);
    job.setOutputValueClass(CouchbaseAction.class);
    job.setOutputFormatClass(CouchbaseOutputFormat.class);

     

    For an example, check ExportDriver from the demo project.
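For illustration, a mapper that turns the TSV files produced earlier into SET actions could look like the sketch below; the TextInputFormat-style input types and the CouchbaseAction.createSetAction factory are assumptions, so adapt them to the demo project’s actual code:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import com.avira.couchdoop.exp.CouchbaseAction;

    public class RecommendationExportMapper
            extends Mapper<LongWritable, Text, String, CouchbaseAction> {

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // Input lines are TSV: <Couchbase key> TAB <JSON document>.
            String[] columns = line.toString().split("\t", 2);
            if (columns.length == 2) {
                // Assumes a factory for SET actions; adjust this call if the
                // CouchbaseAction API differs.
                context.write(columns[0], CouchbaseAction.createSetAction(columns[1]));
            }
        }
    }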

Potential Issues

Using Couchbase Views

Couchdoop’s import feature relies on Couchbase views. Views are a powerful feature that allows you to query for any document in the database, but it is well known that using them can affect bucket performance. Views are slow because they keep their index on disk and consume CPU each time a document is indexed. You typically overcome these issues by reserving about 20% of your memory for the operating system file cache and allocating a CPU core for each view design document. If the views still don’t work well, consider using the Couchbase Sqoop connector, which can import the whole Couchbase bucket into HDFS; however, you will trade the advantage of incremental imports for relief on real-time performance. Alternatively, you can update your architecture to stream all your real-time application events to Hadoop via Flume or Kafka, as previously discussed.

Couchdoop and High Throughput

As our tests on Bigstep’s Full Metal Cloud show, by leveraging parallelism Couchdoop can achieve very high throughput and push Couchbase to the limit. See the results of the performance test here. On one hand, this saves time; on the other hand, it might affect the performance of your real-time production application, which will compete with Couchdoop for Couchbase resources. To overcome this issue, schedule Couchdoop imports/exports when the real-time application has lower traffic, or simply decrease Couchdoop’s level of parallelism.

Note: During testing, we noticed that when Couchdoop transfers data between many Hadoop workers and many Couchbase nodes, throughput is affected by network congestion. If you have many Couchbase nodes in the cluster, it might be better to lower the level of parallelism and use fewer Hadoop workers. Experiment with different levels of parallelism and choose the value that works best for you.

Conclusion

If you use both Couchbase and CDH, you will definitely find Couchdoop useful. If you are computing something with Hadoop in the analytical tier, Couchdoop’s export feature can help you publish the new data in Couchbase to make it available to the operational tier.

If you want to take incremental updates from Couchbase and process them in Hadoop, you should use Couchdoop’s import feature. The most common use case in this circumstance is when you want to incrementally import user sessions. But if you don’t require incremental updates and you only occasionally perform imports from Couchbase to Hadoop, the Couchbase Sqoop connector might be the better tool for you. If the data that needs to go to your analytical tier looks more like events and is not structured as sessions, or if Couchbase views cause you pain, you might find that Flume or Kafka works better for you.

Whatever you do, experiment before choosing a solution for production!

Acknowledgements

We built Couchdoop out of necessity while wrangling data at Avira, and we’d like to thank them for allowing us to open-source this project. Our thanks also to Bigstep, which showed excitement toward Couchdoop and helped us test it and move forward with the project.
