Making Apache Spark Easier to Use in Java with Java 8

Our thanks to Prashant Sharma and Matei Zaharia of Databricks for their permission to re-publish the post below about future Java 8 support in Apache Spark. Spark is now generally available inside CDH 5.

One of Apache Spark‘s main goals is to make big data applications easier to write. Spark has always had concise APIs in Scala and Python, but its Java API was verbose due to the lack of function expressions. With the addition of lambda expressions in Java 8, we’ve updated Spark’s API to transparently support these expressions, while staying compatible with old versions of Java. This new support will be available in Spark 1.0.

A Few Examples

The following examples show how Java 8 makes code more concise. In our first example, we search a log file for lines that contain “error”, using Spark’s filter and count operations. The code is simple to write, but passing a Function object to filter is clunky:

Java 7 search example:

JavaRDD<String> lines = sc.textFile("hdfs://log.txt").filter(
  new Function<String, Boolean>() {
    public Boolean call(String s) {
      return s.contains("error");
    }
});
long numErrors = lines.count();

 

(If you’re new to Spark, JavaRDD is a distributed collection of objects, in this case lines of text in a file. We can apply operations to these objects that will automatically be parallelized across a cluster.)

With Java 8, we can replace the Function object with an inline function expression, making the code a lot cleaner:

Java 8 search example:

JavaRDD<String> lines = sc.textFile("hdfs://log.txt")
                          .filter(s -> s.contains("error"));
long numErrors = lines.count();

 

The gains become even bigger for longer programs. For instance, the program below implements Word Count, by taking a file (read as a collection of lines), splitting each line into multiple words, then counting the words with a reduce function.

Java 7 word count:

JavaRDD<String> lines = sc.textFile("hdfs://log.txt");

// Map each line to multiple words
JavaRDD<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
      return Arrays.asList(line.split(" "));
    }
});

// Turn the words into (word, 1) pairs
JavaPairRDD<String, Integer> ones = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String w) {
      return new Tuple2<String, Integer>(w, 1);
    }
});

// Group up and add the pairs by key to produce counts
JavaPairRDD<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
});

counts.saveAsTextFile("hdfs://counts.txt");

 

With Java 8, we can write this program in just a few lines:

Java 8 word count:

JavaRDD<String> lines = sc.textFile("hdfs://log.txt");
JavaRDD<String> words =
    lines.flatMap(line -> Arrays.asList(line.split(" ")));
JavaPairRDD<String, Integer> counts =
    words.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
         .reduceByKey((x, y) -> x + y);
counts.saveAsTextFile("hdfs://counts.txt");

 

We are very excited to offer this functionality, as it opens up the simple, concise programming style that Scala and Python Spark users are familiar with to a much broader set of developers.

Availability

Java 8 lambda support will be available in Spark 1.0, which will be released in early May. Although using this syntax requires Java 8, Spark 1.0 will still support older versions of Java through the old form of the API. Lambda expressions are simply a shorthand for anonymous inner classes, so the same API can be used in any Java version.

Prashant Sharma is a committer on the Spark project. Matei Zaharia is the creator of Spark and the CTO of Databricks.


Spark Summit 2014 is coming (June 30 – July 2)! Register here to get 20% off the regular conference price.

Filed under:

No Responses

Leave a comment


one + = 6