Indexing Files via Solr and Java MapReduce

Several weeks ago, I set out to demonstrate how easily Solr and MapReduce can be integrated. I was unable to find a simple yet comprehensive primer on combining the two technologies, so I decided to write one.

What follows is my bare-bones tutorial on getting Solr up and running to index each word of the complete works of Shakespeare. Note: Special thanks to Sematext for looking over the Solr bits and making sure they are sane. Check them out if you’re going to be doing a lot of work with Solr, ElasticSearch, or search in general and want to bring in the experts.

First things first

I got started by creating a new CentOS 6 virtual machine. You can pick a different flavor of Linux if that suits you; Hadoop should run fine on any of them (though the commonly recommended distributions are SuSE, Ubuntu/Debian, and RedHat/CentOS).

If you are fine with CentOS and want to skip some of the manual labor here, you can download a pre-loaded virtual machine from the Cloudera Downloads section that already includes an installation of Sun Java 6u21 and CDH3u3. You can then skip ahead to installing Solr and downloading the sample data, as outlined below.

If you are proceeding with a fresh (virtual) machine, start by disabling SELinux (if applicable) and enabling sshd. On CentOS 6, that is done with the following commands:

[user@localhost ~]$ sudo chkconfig --level 2345 sshd on
[user@localhost ~]$ sudo /etc/init.d/sshd start
[user@localhost ~]$ sudo vim /etc/selinux/config [set SELINUX=disabled]

On that machine, download and install the Sun/Oracle JDK, CDH, and Solr (the same pieces we validate below). Once the JDK is installed, point /usr/bin/java at the new installation:

[user@localhost ~]$ sudo rm /usr/bin/java
[user@localhost ~]$ sudo ln -s /usr/java/jdk1.6.0_26/jre/bin/java \
 /usr/bin/java

You can validate that all of the pieces are installed and running correctly by doing the following:

  • Java
[user@localhost ~]$ java -version
java version "1.6.0_21"
Java(TM) SE Runtime Environment (build 1.6.0_21-b06)
Java HotSpot(TM) 64-Bit Server VM (build 17.0-b16, mixed mode)
  • Solr
[user@localhost ~]$ cd <solr_install_dir>/example
[user@localhost example]$ java -jar start.jar

The goal here is to get up and running quickly, so I am opting to use the Solr example configuration. It is also worth noting that, when run this way, the Solr server starts with the default JVM heap size, which I believe is the smaller of 1/4 of system memory or 1GB.
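
If you want to give Solr more headroom than that default, you can pass an explicit heap size when starting the example server (the 1GB value below is purely illustrative):

[user@localhost example]$ java -Xmx1g -jar start.jar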

Now, you should be able to access the Solr administration GUI (one of the niceties of Solr!) via a web browser inside your VM with the address: http://localhost:8983/solr/admin

  • Hadoop

You can validate that Hadoop is installed and running successfully by navigating in your VM’s browser to the Cloudera Manager UI at http://localhost:7180, logging in as admin/admin, and seeing the following:

A Healthy Hadoop (pseudo)cluster

You will probably want to export the client configuration XML files (this can be done with a single click in Cloudera Manager; see the Generate Client Configuration button), copy them to /usr/lib/hadoop/conf, and then copy the sample text into HDFS:

[user@localhost ~]$ hadoop fs -put <shakespeare> shakespeare
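
You can confirm that the files landed in HDFS with a quick listing:

[user@localhost ~]$ hadoop fs -ls shakespeare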

Creating the indexing code

I have some history with Lucene from a past life, so the high-level functionality of Solr was familiar to me. In a nutshell, you index content from Java code by creating a SolrInputDocument, which represents a single entity to index (generally a file or document), and calling .addField() to attach the fields you would later like to search.
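
To make that concrete, here is a minimal standalone sketch (outside of MapReduce, assuming the example Solr instance from above is running at http://localhost:8983/solr; the class name and the id/text values are made up for illustration):

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SingleDocExample {
  public static void main(String[] args) throws Exception {
    // Talk to the example Solr instance started earlier
    SolrServer server = new CommonsHttpSolrServer("http://localhost:8983/solr");

    // One SolrInputDocument per entity to index; addField() attaches searchable fields
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "example 0 0");
    doc.addField("text", "HAMLET");

    server.add(doc);
    server.commit();  // make the document visible to searches
  }
}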

The driver code for the indexer is very simple: it takes input file path(s) from the command line and runs the mapper on the files it finds. Note that it will accept a directory and parse all of the files within it.

public class IndexDriver extends Configured implements Tool {     

  public static void main(String[] args) throws Exception {
    //TODO: Add some checks here to validate the input path
    int exitCode = ToolRunner.run(new Configuration(),
     new IndexDriver(), args);
    System.exit(exitCode);
  }

  @Override
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), IndexDriver.class);
    conf.setJobName("Index Builder - Adam S @ Cloudera");
    conf.setSpeculativeExecution(false);

    // Set Input and Output paths
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // Use TextInputFormat
    conf.setInputFormat(TextInputFormat.class);

    // Mapper has no output
    conf.setMapperClass(IndexMapper.class);
    conf.setMapOutputKeyClass(NullWritable.class);
    conf.setMapOutputValueClass(NullWritable.class);
    conf.setNumReduceTasks(0);
    JobClient.runJob(conf);
    return 0;
  }
}

The Map code is where things get more interesting. A couple notes before we proceed:

A Solr server may be used in two ways:

  1. Via embedding a Solr server object within your Java code using EmbeddedSolrServer
  2. Via HTTP requests, using the class CommonsHttpSolrServer with a URL (in our case, http://localhost:8983/solr)

In what follows, I elected to go with StreamingUpdateSolrServer, which is a subclass of CommonsHttpSolrServer. More comments on that choice towards the end.
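
As a small sketch of option 2 and the streaming subclass used below (the embedded flavor needs a local Solr home and core configuration, so it is omitted here; the URL, queue size of 100, and thread count of 5 match what the mapper uses):

import java.net.MalformedURLException;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;

public class SolrServerFlavors {
  public static void main(String[] args) throws MalformedURLException {
    String url = "http://localhost:8983/solr";

    // Option 2 as-is: each add() is sent as its own HTTP request
    SolrServer httpServer = new CommonsHttpSolrServer(url);

    // Streaming subclass: buffers up to 100 documents per request
    // and sends them from 5 background threads
    SolrServer streamingServer = new StreamingUpdateSolrServer(url, 100, 5);
  }
}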

I will assume from here on that the reader has some familiarity with the MapReduce programming paradigm. The salient point for us is that we will use a map-only job to read through each file in the input we provide and index our chosen fields. Taking the path of least resistance, I relied on the fact that each line of text becomes its own key/value pair when the input is read as TextInputFormat, and I chose to index the following fields:

  1. As a unique identifier for each word, I concatenated the filename, the line’s offset within the file (conveniently provided to the map code as the key because we are using TextInputFormat), and the position of the word on that line
  2. The word itself
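
For example, the third word on the first line of Troilus and Cressida ends up with the id “troilusandcressida 0 2”, as you can see in the sample query output later in this post.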

The Solr server obeys field definitions (field names, data types, uniqueness, and so on) as dictated by a schema file. For this example, running Solr as described above, the schema is defined in <solr_install_dir>/example/solr/conf/schema.xml.

Since I chose to index two distinct fields, the relevant entries in the schema are:

<field name="id" type="string" indexed="true" \
 stored="true" required="true" />
<field name="text" type="text_general" indexed="true" \
 stored="true" multiValued="true"/>

Without further ado, then, the code looks like the following:

public class IndexMapper extends MapReduceBase implements
 Mapper <LongWritable, Text, NullWritable, NullWritable> {
  private StreamingUpdateSolrServer server = null;
  private SolrInputDocument thisDoc = new SolrInputDocument();
  private String fileName;
  private StringTokenizer st = null;
  private int lineCounter = 0;

  @Override
  public void configure(JobConf job) {
    String url = "http://localhost:8983/solr";
    // Keep just the input file's name (everything after the last path separator)
    fileName = job.get("map.input.file").substring(
      job.get("map.input.file").lastIndexOf(
      System.getProperty("file.separator")) + 1);
    try {
      // Buffer up to 100 documents per request, sent by 5 background threads
      server = new StreamingUpdateSolrServer(url, 100, 5);
    } catch (MalformedURLException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void map(LongWritable key, Text val,
   OutputCollector <NullWritable, NullWritable> output,
   Reporter reporter) throws IOException {

    st = new StringTokenizer(val.toString());
    lineCounter = 0;
    // One Solr document per word: id = "<filename> <line offset> <position on line>"
    while (st.hasMoreTokens()) {
      thisDoc = new SolrInputDocument();
      thisDoc.addField("id", fileName + " "
       + key.toString() + " " + lineCounter++);
      thisDoc.addField("text", st.nextToken());
      try {
        server.add(thisDoc);
      } catch (SolrServerException e) {
        e.printStackTrace();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

  @Override
  public void close() throws IOException {
    try {
      // Flush any buffered documents and make them searchable
      server.commit();
    } catch (SolrServerException e) {
      e.printStackTrace();
    }
  }
}

Compile the code however you see fit (I am old school and still use Ant), and the job is ready to run!
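
If you do not have a build file handy, a rough manual equivalent looks something like the following (the source file names and jar paths are hypothetical; adjust them to match your CDH and Solr installations, and add any other jars the compiler asks for):

[user@localhost SolrTest]$ mkdir classes
[user@localhost SolrTest]$ javac -classpath \
 /usr/lib/hadoop/hadoop-core.jar:<solr_install_dir>/dist/apache-solr-solrj-3.5.0.jar \
 -d classes IndexDriver.java IndexMapper.java
[user@localhost SolrTest]$ jar cf solrtest.jar -C classes .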

To index all of the comedies, you can run the job with the compiled jar file as follows. Note that you must tell Hadoop to include the SolrJ jar at runtime via -libjars:

[user@localhost SolrTest]$ hadoop jar solrtest.jar \
 -libjars <solr_install_dir>/dist/apache-solr-solrj-3.5.0.jar \
 shakespeare/comedies shakespeare_output

If you then query the Solr server (via the web GUI at http://localhost:8983/solr/admin; the default query of *:* works well for a quick test), you should see something like the following:

<response>
 <lst name="responseHeader">
  <int name="status">0</int>
  <int name="QTime">35</int>
  <lst name="params">
    <str name="indent">on</str>
    <str name="start">0</str>
    <str name="q">*:*</str>
    <str name="version">2.2</str>
    <str name="rows">10</str>
  </lst>
 </lst>
<result name="response" numFound="377452" start="0">
<doc>
 <str name="id">troilusandcressida 0 0</str>
 <arr name="text">
  <str>TROILUS</str>
 </arr>
</doc>
<doc>
 <str name="id">troilusandcressida 0 1</str>
 <arr name="text">
  <str>AND</str>
 </arr>
</doc>
<doc>
 <str name="id">troilusandcressida 0 2</str>
 <arr name="text">
  <str>CRESSIDA</str>
 </arr>
</doc>
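
If you prefer the command line to the web GUI, the same match-all query (using the q, rows, and indent parameters shown in the response header above) can also be issued with curl against the example server:

[user@localhost ~]$ curl "http://localhost:8983/solr/select?q=*:*&rows=10&indent=on"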

Further Tuning/Investigation Opportunities

Performance implications of StreamingUpdateSolrServer, and the possibility of using EmbeddedSolrServer: What are the optimal tuning parameters for the number of threads and the batch size when using StreamingUpdateSolrServer? More investigation could be done here. It is also possible to use an EmbeddedSolrServer (per the Rackspace case study in Hadoop: The Definitive Guide), though it adds some maintenance overhead to create the indexes in a distributed fashion and then re-combine them later. I opted for StreamingUpdateSolrServer because I believe it is simpler to get up and running in a small test environment.
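
For anyone who wants to experiment, the knobs in question are simply the last two constructor arguments; a quick sketch (the class name and values below are illustrative, not recommendations):

import java.net.MalformedURLException;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;

public class TunedServerSketch {
  public static void main(String[] args) throws MalformedURLException {
    // queueSize = documents buffered per request; threadCount = background senders.
    // Larger values mean fewer, bigger requests at the cost of more memory.
    SolrServer server = new StreamingUpdateSolrServer(
        "http://localhost:8983/solr",
        1000,  // queueSize (the mapper above uses 100)
        10);   // threadCount (the mapper above uses 5)
  }
}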

How to minimize memory requirements in the map code: I haven’t been a full-time Java developer in many years, so there are almost certainly things I’m missing about minimizing the memory overhead of the objects used in the map code. Since map() is called for each line of input, it is critical to keep this code as lean as possible. One tip I came across is to use (mutable) org.apache.hadoop.io.Text objects rather than (immutable) Strings. I tried to keep new object creation to a minimum in this example map code, but the point is worth noting for other exercises.
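
As a small illustration of that tip (not something the job above actually requires), a single mutable Text instance can be reused across records instead of allocating a new object for every token:

import org.apache.hadoop.io.Text;

public class TextReuseSketch {
  // One Text instance reused for every record, rather than a new object per token
  private final Text buffer = new Text();

  public Text toText(String token) {
    buffer.set(token);  // overwrites the backing bytes; no new object allocated
    return buffer;
  }

  public static void main(String[] args) {
    TextReuseSketch sketch = new TextReuseSketch();
    System.out.println(sketch.toText("HAMLET"));
    System.out.println(sketch.toText("OPHELIA"));  // same Text instance, new contents
  }
}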

10 Responses
  • Pere / March 02, 2012 / 8:15 AM

    Hi,
    Good article.
    In the article you mention that using EmbeddedSolr “adds some maintenance overhead to create the indexes in a distributed fashion and then later re-combine”. I would like to point out that it has some advantages as well. On one hand, it allows you to create the index in a completely distributed fashion. Using Map and Reduce phases allows you to shuffle your documents (preparing them better for a sharded environment). It is a batch indexing strategy that completely decouples the frontend from the backend, boosting the performance of both parts. Finally, it allows you to deploy the index atomically, leveraging Solr’s core swap and rollback mechanisms. We previously published an article on our blog that explains how to build “frontend views” in a completely distributed way using this philosophy: http://www.datasalt.com/2011/10/front-end-view-generation-with-hadoop/ . We thought this would be relevant to the article.

    Cheers.

  • terrence pietrondi / March 26, 2012 / 8:24 PM

    In this example, what purpose does “shakespeare_output” serve when running the job since the output is the solr document?

  • Adam Smieszny / March 27, 2012 / 5:16 AM

    @Terrence Pietrondi:
    Hi Terrence. The shakespeare_output directory in HDFS will hold some metadata generated by MapReduce; in particular, a file named _SUCCESS that indicates the job completed successfully. It will also contain files named part-0000x, which are empty because we don’t emit anything from our code.

  • terrence pietrondi / March 30, 2012 / 10:11 AM

    What are the suggested specs for the Cloudera training VM image? When attempting this tutorial, the process ran out of memory. I gave it two processors and 4080 MB of memory. My host machine is Mac OS X 10.6.8 with a 2.66 GHz Intel Core 2 Duo and 8 GB of memory.

  • Dan Brown / April 01, 2012 / 8:22 AM

    A while back we had the need to generate indexes via M/R (Solr 1.3). I’ve documented the details here: http://www.likethecolor.com/2010/09/26/using-hadoop-to-create-solr-indexes/ Thought it might be useful to others here. I would love any suggestions. Thanks.

  • Adam Smieszny / April 01, 2012 / 8:39 AM

    @ Terrence Pietrondi:
    Hi Terrence. If you find that the process runs out of memory, most likely the issue doesn’t have to do with the VM specs (what you have sounds like plenty of CPU and RAM available).
    Rather, it probably requires increasing the memory allocated for the MapReduce processes.
    If you are using Cloudera Manager, modifying this configuration is quite simple. Please view the following video for more information:
    http://www.cloudera.com/resource/cloudera-manager-service-and-configuration-management-part-1/

  • terrence pietrondi / April 06, 2012 / 7:59 AM

    I was able to get the demo running after rebooting the VMware image. For anyone looking, here is the build file I used:

    https://github.com/tepietrondi/oddsandendsrandomcode/blob/master/cloudera-hadoop-solr-test/build.xml

  • Pravin / June 10, 2012 / 11:18 AM

    Very useful article.

    Thanks for sharing.
