Email Indexing Using Cloudera Search

Why would any company be interested in searching through its vast trove of email? A better question is: Why wouldn’t everybody be interested? 

Email has become the most widespread method of communication we have, so there is much value to be extracted by making all emails searchable and readily available for further analysis. Common use cases include fraud detection, customer sentiment and churn analysis, and lawsuit prevention, and that is just the tip of the iceberg. Every company can extract tremendous value based on its own business needs.

A little over a year ago we described how to archive and index emails using HDFS and Apache Solr. At that time, however, searching and analyzing emails were still relatively cumbersome and technically challenging tasks. Document indexing automation has come a long way since then; with the recent introduction of Cloudera Search in particular, it is now easier than ever to extract value from the corpus of available information.

In this post, you’ll learn how to set up Apache Flume for near-real-time indexing and MapReduce for batch indexing of email documents. Note that although this post focuses on email data, there is no reason why the same concepts could not be applied to instant messages, voice transcripts, or any other data (both structured and unstructured). 

Where to Start

Every document that you would like to make searchable must live in an index. In addition, each document must be broken into fields so that searches can target specific parts of it.

At a high level, the relationship is simple: an index contains documents, and each document is made up of fields.

The first thing to do is to break down the emails into fields that are of particular interest to the use case. These fields will be used for building the indexes and facets. To understand the fields of an email, we will use the MIME 1.0 standard, which is defined here. Below is the text of a sample email, with the relevant fields called out.
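(The original post reproduced a message from a public archive; the abbreviated sample below is illustrative, with made-up names and addresses, but it uses the same header fields we will index.)

Message-ID: <983459712.1075855378110.JavaMail@mailhub>
Date: Wed, 13 Dec 2000 08:22:00 -0800
From: jane.doe@example.com
To: john.smith@example.com
Subject: Re: Q4 forecast review
Cc: fp.team@example.com
Mime-Version: 1.0
X-Folder: \Jane_Doe_Dec2000\Notes Folders\Sent
X-Origin: DOE-J
X-FileName: jdoe.nsf

John, here are the updated Q4 numbers we discussed. Let me know if
anything looks off before tomorrow's review.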

Cloudera Search is based on Solr, which includes multiple other components such as Apache Lucene, SolrCloud, Apache Tika, and Solr Cell. Let’s begin by editing Solr’s schema.xml file to incorporate the fields we have identified in the previous exercise. Throughout this document, you will extensively use the solrctl command to manage SolrCloud deployments. (See the full command reference here.)

The command below will generate a template schema.xml file that you will update to fit our email use case.  

$ solrctl --zk localhost:2181/solr instancedir --generate $HOME/emailSearchConfig

 

(Please note that --zk localhost:2181 should be replaced with the address and port of your own Apache ZooKeeper quorum.)
 
Next, edit the schema.xml file (which can be found here). What follows is a brief overview of what was changed from the template generated above. 

First, completely replace the <fields> and <fieldType> sections with the ones below, including the additional "to" field. Email addresses need to be tokenized differently from the rest of the text, so we create an email_general field type that uses the solr.UAX29URLEmailTokenizerFactory tokenizer. Similarly, a new field type, names_general, is used for fields that contain names. Date fields use the tdate field type, which allows for easy date-range faceting. All other fields are simply set up as text_general, which is supplied in the default Solr schema.xml file.
 

<fields>
    <field name="message_id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="date" type="tdate" indexed="true" stored="true"/>
    <field name="from" type="email_general" indexed="true" stored="true"/>
    <field name="to" type="email_general" indexed="true" stored="true"/>
    <field name="subject" type="text_general" indexed="true" stored="true"/>
    <field name="from_names" type="names_general" indexed="true" stored="true"/>
    <field name="to_names" type="names_general" indexed="true" stored="true"/>
    <field name="cc" type="email_general" indexed="true" stored="true"/>
    <field name="bcc" type="text_general" indexed="true" stored="true"/>
    <field name="cc_names" type="names_general" indexed="true" stored="true"/>
    <field name="bcc_names" type="names_general" indexed="true" stored="true" />
    <field name="x_folder" type="text_general" indexed="true" stored="false" />
    <field name="x_origin" type="text_general" indexed="true" stored="false" />
    <field name="x_filename" type="text_general" indexed="true" stored="false"/>
    <field name="message" type="text_general" indexed="false" stored="false"/>
    <field name="body" type="text_general" indexed="true" stored="true"/>
    <field name="text" type="text_general" indexed="false" stored="false" />
    <field name="_version_" type="long" indexed="true" stored="true"/>
  </fields>
  <uniqueKey>message_id</uniqueKey>

<fieldType name="names_general" class="solr.TextField" >
    <analyzer type="index">
      <charFilter class="solr.HTMLStripCharFilterFactory" />
      <tokenizer class="solr.StandardTokenizerFactory"/>
      <filter class="solr.LowerCaseFilterFactory"/>
    </analyzer>
    <analyzer type="query">
      <charFilter class="solr.HTMLStripCharFilterFactory" />
      <tokenizer class="solr.StandardTokenizerFactory"/>
      <filter class="solr.LowerCaseFilterFactory"/>
    </analyzer>
  </fieldType>
  <fieldType name="email_general" class="solr.TextField" >
    <analyzer type="index">
      <tokenizer class="solr.UAX29URLEmailTokenizerFactory" />
      <filter class="solr.LowerCaseFilterFactory"/>
    </analyzer>
    <analyzer type="query">
      <tokenizer class="solr.UAX29URLEmailTokenizerFactory" />
      <filter class="solr.LowerCaseFilterFactory"/>
    </analyzer>
  </fieldType>

 

One very important concept in SolrCloud deployments is the notion of collections. A “collection” is a single index that spans multiple Solr Instances. For example, if your email index is distributed across two Solr Instances, they all add up to form one collection. Let’s call our collection email_collection and set it up using the commands below:

$ solrctl --zk localhost:2181/solr instancedir --create email_collection $HOME/emailSearchConfig
$ solrctl --zk localhost:2181/solr collection --create email_collection -s 2

Again, replace --zk localhost:2181 with your own ZooKeeper quorum configuration in both statements.
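If you want to confirm what was registered, solrctl can also list the instance directories and collections known to ZooKeeper (assuming your version of solrctl supports the --list action):

$ solrctl --zk localhost:2181/solr instancedir --list
$ solrctl --zk localhost:2181/solr collection --list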
 
Note that the "-s 2" argument defines the number of shards. A "shard" is a very important concept in Solr and refers to a slice of an index. For example, if you have a corpus of 1 million emails, you may want to split it into two shards for scalability and improved query performance: the first shard handles all the documents with a message-id between 0 and 500,000, and the second handles documents with a message-id between 500,000 and 1,000,000. This logic is all handled by Solr internally; you need only specify the number of shards you would like to create with the -s option.

The optimal number of shards depends on many factors, such as the total size of the corpus, expected query latency, document growth rate, and the hardware available, and should be determined carefully.

How Email Parsing Happens

Email parsing is achieved with the help of Cloudera Morphlines. The Morphlines library is an open source framework, available through the Cloudera Development Kit (CDK), that lets you define a transformation chain without writing a single line of code.

The transformation chain is defined in a Morphlines configuration file; we will define it once and use it later for both batch and near-real-time indexing.

The email morphline will perform the following actions:

  1. Read the email events with the readMultiLine command
  2. Break up the unstructured text into fields with the grok command
  3. If Message-ID is missing from the email, generate it with the generateUUID command
  4. Convert the date/timestamp into a field that Solr will understand, with the convertTimestamp command
  5. Drop all of the extra fields that we did not specify in schema.xml, with the sanitizeUnknownSolrFields command
  6. Load the record into Solr for the HDFS write, with the loadSolr command

To view the full Morphline configuration file for this example, please visit this link.
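As a point of reference, here is a minimal sketch of what such a configuration might look like. It is not the full file from the link above: the grok expression is abbreviated to a single header, the date format covers only the common RFC 822 case, and the collection name and ZooKeeper address are the assumptions used earlier in this post.

# Send documents to the email_collection created earlier (zkHost is an assumption)
SOLR_LOCATOR : {
  collection : email_collection
  zkHost : "localhost:2181/solr"
}

morphlines : [
  {
    id : emailMorphline
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]
    commands : [
      # 1. Fold every line that does not start a new message into the previous event
      { readMultiLine { regex : "^Message-ID:.*", negate : true, what : previous, charset : UTF-8 } }

      # 2. Extract fields from the raw text (abbreviated; the real file extracts many more)
      { grok {
          expressions : { message : """Message-ID: <(?<message_id>[^>]+)>""" }
          findSubstrings : true
        } }

      # 3. Generate a message_id if the email did not carry one
      { generateUUID { field : message_id, preserveExisting : true } }

      # 4. Normalize the Date header into the format Solr expects
      { convertTimestamp {
          field : date
          inputFormats : ["EEE, d MMM yyyy HH:mm:ss Z"]
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        } }

      # 5. Drop any field that is not declared in schema.xml
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }

      # 6. Hand the record to Solr
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]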

Which Tool to Use for Indexing

Cloudera Search can index emails in various ways. In a near-real-time scenario, Cloudera Search uses Flume to index messages on their way into HDFS. As the data passes through Flume, relevant fields are extracted using MorphlineSolrSink; Solr then indexes the event and writes the indexes to HDFS. For batch-oriented scenarios, Cloudera Search provides MapReduceIndexerTool, which reads the data out of HDFS, builds the indexes, and writes them back out to HDFS. The tool has multiple options, including one that merges the generated indexes into live Solr servers.

The appropriate indexing method will be dictated entirely by your use case and data ingestion strategy. For example, if you are doing customer churn analysis, you may initiate a periodic search query for words that signify negative sentiment. As emails from customers come into the system, you may need to persist and index them right away; in that case, the indexing must be done in near real time, with the data ingested through a Flume agent. But suppose you have five years of emails already stored in HDFS? If you need to index them, the only scalable option is to run MapReduceIndexerTool.

In some use cases, both methods may be needed. For example, if most emails are indexed in near real time but you later identify a new field that was not indexed before, you may need to go back and run MapReduceIndexerTool to reindex the whole corpus. (There is an HBase near-real-time indexer as well, but we'll address that in a future post.)

A few questions can help you identify the optimal indexing tool: Is the data already at rest in HDFS? Do new documents need to be searchable within seconds or minutes of arrival? Might you need to reindex the full corpus after a schema change?

How to Run Batch Indexing

The flow of the MapReduceIndexerTool is straightforward:

  1. Read the HDFS directory containing the files that need to be indexed.
  2. Pass them through the morphline, which performs the following transformation chain:
    1. Break the text up into fields with the grok command
    2. Convert the date field into a datetime format that Solr understands
    3. Drop all of the extra fields that were not specified in the schema.xml file
    4. Generate the indexes and store them in HDFS
    5. Merge the indexes into live Solr servers

To run the MapReduceIndexerTool, execute the following command:

$ hadoop jar <cloudera search directory>/contrib/mr/search-mr-0.9.2-cdh4.3.0-SNAPSHOT-job.jar \
    org.apache.solr.hadoop.MapReduceIndexerTool \
    --morphline-file <morphlines file> \
    --output-dir <hdfs URI for indexes> \
    --go-live \
    --zk-host clust2:2181/solr \
    --collection email_collection \
    <HDFS URI with the files to index>

A few notes:

  • <cloudera search directory> is the directory where the Search tools are installed. If you installed via packages, this would be /usr/lib/solr; if you installed via parcels, it is the directory where the Search parcel was deployed.
  • <morphlines file> is the location of the morphline configuration file that was created above.
  • The --go-live option will actually merge the indexes into the live Solr servers.

For a full description of the above command, read this doc.

How to Index in Near-Real Time Using Flume

The flow for near-real-time indexing looks more complex than batch indexing, but it is just as simple:

  1. As email files are dropped into the spooling directory, Flume picks them up.
  2. Flume replicates them into two different memory channels.
  3. The first channel is read by an HDFS sink and rolled directly into HDFS.
  4. The second channel is read by MorphlineSolrSink, which performs the same transformation chain described previously:
    1. Break the text up into fields with the grok command
    2. Convert the date field into a datetime format that Solr understands
    3. Drop all of the extra fields that were not specified in the schema.xml file
    4. Generate the indexes and send them to Solr

One powerful feature of Cloudera Search is that the morphline file, which encodes the transformation logic, is identical in both examples.

For completeness, here is the section of the Flume configuration that defines the Solr sink:

# SOLR Sink
tier1.sinks.solrSink.type=org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.solrSink.channel=solrChannel
tier1.sinks.solrSink.morphlineFile=/tmp/morphline.conf
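
For broader context, a fuller agent definition might look like the sketch below. The source, channel, and sink names, the spool directory, and the HDFS path are illustrative assumptions; Flume's default replicating channel selector is what copies each event into both channels, matching the flow described above.

# Illustrative agent layout (names and paths are assumptions)
tier1.sources  = emailSource
tier1.channels = hdfsChannel solrChannel
tier1.sinks    = hdfsSink solrSink

# Spooling directory source; the default replicating selector
# sends every event to both channels
tier1.sources.emailSource.type = spooldir
tier1.sources.emailSource.spoolDir = /flume/email-spool
tier1.sources.emailSource.channels = hdfsChannel solrChannel

tier1.channels.hdfsChannel.type = memory
tier1.channels.solrChannel.type = memory

# First channel: roll the raw emails directly into HDFS
tier1.sinks.hdfsSink.type = hdfs
tier1.sinks.hdfsSink.channel = hdfsChannel
tier1.sinks.hdfsSink.hdfs.path = /user/flume/emails
tier1.sinks.hdfsSink.hdfs.fileType = DataStream

# Second channel: run each event through the morphline and index it
tier1.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.solrSink.channel = solrChannel
tier1.sinks.solrSink.morphlineFile = /tmp/morphline.conf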

 

How to Search and View Indexed Data

Once the data is indexed and accessible by Cloudera Search, there are many ways for users to interact with it, including the Solr GUI or Hue's Search application, which provides a rich, configurable interface with faceting, date ranges, and much more.
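You can also query the collection directly over HTTP. For example, the request below returns messages whose body mentions "forecast", faceted by sender; the host and port are assumptions (8983 is Solr's default), and the query values are purely illustrative:

$ curl 'http://localhost:8983/solr/email_collection/select?q=body:forecast&facet=true&facet.field=from&wt=json'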


[Screenshot: the Solr GUI]

[Screenshot: Hue's Search app]

I could spend a significant amount of time describing Search visualization tools, but that is a matter for another post. 

Conclusion

To summarize, here are the important steps an enterprise can take to get more value from its email data:

  • Decide which email fields are to be extracted and indexed.
  • Use Morphlines to help with transformations. (No coding required!)
  • Use MapReduceIndexerTool to index the emails in HDFS.
  • Use Flume with MorphlineSolrSink to index the emails in near real time.

The big news is that with Cloudera Search, indexing and searching various types of data, over both small and large data sets, has become much easier and more flexible. We have all learned to search the internet for information using popular tools like Google and Yahoo!. Now that capability is available for the data in your Hadoop and HBase environments, through a more integrated and unified process: you can find your data and process it on the same platform, without having to switch context or move it around.

Jeff Shmain is a solution architect at Cloudera.
