How-to: Run a Simple Apache Spark App in CDH 5

Getting started with Spark (now shipping inside CDH 5) is easy using this simple example.

(Editor’s note – this post has been updated to reflect CDH 5.1/Spark 1.0)

Apache Spark is a general-purpose cluster computing framework that, like MapReduce in Apache Hadoop, offers powerful abstractions for processing large datasets. For various reasons pertaining to performance, functionality, and APIs, Spark is already becoming more popular than MapReduce for certain types of workloads. (For more background about Spark, read this post.)

In this how-to, you’ll learn how to write, compile, and run a simple Spark program written in Scala on CDH 5 (in which Spark ships and is supported by Cloudera). The full code for the example is hosted at https://github.com/sryza/simplesparkapp.

Writing

Our example app will be a souped-up version of WordCount, the classic MapReduce example. In our version, the goal is to learn the distribution of letters in the most popular words in our corpus. That is, we want to:

  1. Read an input set of text documents
  2. Count the number of times each word appears
  3. Filter out all words that show up less than a million times
  4. For the remaining set, count the number of times each letter occurs

In MapReduce, this would require two MapReduce jobs, as well as persisting the intermediate data to HDFS between them. In contrast, in Spark, you can write a single job in about 90 percent fewer lines of code.

Our input is a huge text file where each line contains all the words in a document, stripped of punctuation. The full Scala program looks like this:

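A sketch of the program, close to the version in the repository (it takes the input path and the occurrence threshold as command-line arguments rather than hard-coding the one-million cutoff):

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit conversions for pair RDDs (reduceByKey, etc.)

    object SparkWordCount {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
        val threshold = args(1).toInt

        // split each document (one per line) into words
        val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

        // count the occurrences of each word
        val wordCounts = tokenized.map(word => (word, 1)).reduceByKey(_ + _)

        // filter out words that appear fewer than threshold times
        val filtered = wordCounts.filter { case (_, count) => count >= threshold }

        // for the remaining words, count the occurrences of each character
        val charCounts = filtered
          .flatMap { case (word, _) => word.toCharArray }
          .map(c => (c, 1))
          .reduceByKey(_ + _)

        System.out.println(charCounts.collect().mkString(", "))
      }
    }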

Spark uses “lazy evaluation”, meaning that transformations don’t execute on the cluster until an “action” operation is invoked. Examples of action operations are collect, which pulls data to the client, and saveAsTextFile, which writes data to a filesystem like HDFS.
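For example (using the SparkContext sc from the program above; the paths are placeholders):

    val lines = sc.textFile("hdfs:///tmp/input")    // transformation: nothing runs yet
    val upper = lines.map(_.toUpperCase)            // still lazy, just building the lineage
    upper.saveAsTextFile("hdfs:///tmp/output")      // action: the job actually executes here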

It’s worth noting that in Spark, the definition of “reduce” is slightly different from the one in MapReduce. In MapReduce, a reduce function call accepts all the records corresponding to a given key. In Spark, the function passed to a reduce or reduceByKey call accepts just two arguments at a time, so if it isn’t associative, bad things will happen. A positive consequence is that Spark knows it can always apply a combiner. Under that definition, the closest Spark equivalent of MapReduce’s reduce is a groupByKey followed by a map over each group’s values.
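For example, per-word counting works with reduceByKey because addition is associative (words here stands for any RDD of strings):

    // Spark may combine partial sums in any order, including map-side as a combiner
    val counts = words.map(word => (word, 1)).reduceByKey((a, b) => a + b)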

For those more comfortable with Java, here’s the same program using Spark’s Java API:

 

Because Java doesn’t support anonymous functions, the program is considerably more verbose, but it still requires a fraction of the code needed in an equivalent MapReduce program.

Compiling

We’ll use Maven to compile our program. Maven expects a specific directory layout that informs it where to look for source files. Our Scala code goes under src/main/scala, and our Java code goes under src/main/java. That is, we place SparkWordCount.scala in the src/main/scala/com/cloudera/sparkwordcount directory and JavaWordCount.java in the src/main/java/com/cloudera/sparkwordcount directory.
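With the repository checked out, the layout looks like this:

    simplesparkapp/
        pom.xml
        src/main/scala/com/cloudera/sparkwordcount/SparkWordCount.scala
        src/main/java/com/cloudera/sparkwordcount/JavaWordCount.java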

Maven also requires you to place a pom.xml file in the root of the project directory that tells it how to build the project. A few noteworthy excerpts are included below.

To compile Scala code, include:

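a maven-scala-plugin entry in the build plugins section, along these lines (the plugin version shown is illustrative; the pom.xml in the repository is the reference):

    <plugin>
      <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <version>2.15.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>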

which requires adding the scala-tools plugin repository:

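for example:

    <pluginRepositories>
      <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
      </pluginRepository>
    </pluginRepositories>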

Then, include Spark and Scala as dependencies:

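For example (the versions shown are illustrative for CDH 5.1; the CDH build of spark-core is resolved from the Cloudera Maven repository, https://repository.cloudera.com/artifactory/cloudera-repos/, which needs to be declared as a repository in the pom.xml):

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.0.0-cdh5.1.0</version>
        <!-- provided: spark-submit supplies Spark on the cluster at runtime -->
        <scope>provided</scope>
      </dependency>
    </dependencies>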

Finally, to generate our app jar, simply run:

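    mvn package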

It will show up in the target directory as sparkwordcount-0.0.1-SNAPSHOT.jar.

Running

(Note: the following instructions only work with CDH 5.1/Spark 1.0 and later. To run against CDH 5.0/Spark 0.9, see the instructions here.)

Before running, place the input file into a directory on HDFS. The repository supplies an example input file in its data directory. To run the Spark program, we use the spark-submit script:

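With the example input copied into HDFS (the file and directory names below are placeholders):

    # copy the example input into HDFS
    hadoop fs -put data/inputfile.txt /user/hdfs/input.txt

    # run locally; the last two arguments are the input path and the word-count threshold
    spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
      --master local \
      target/sparkwordcount-0.0.1-SNAPSHOT.jar \
      /user/hdfs/input.txt 2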

This will run the application in a single local process. If the cluster is running a Spark standalone cluster manager, you can replace --master local with --master spark://<master host>:<master port>.

If the cluster is running YARN, you can replace --master local with --master yarn-client (or --master yarn-cluster to run the driver inside the cluster). Spark will determine the YARN ResourceManager’s address from the YARN configuration.
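For example, in client mode (input path as above):

    spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
      --master yarn-client \
      target/sparkwordcount-0.0.1-SNAPSHOT.jar \
      /user/hdfs/input.txt 2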

The output of the program should look something like this:

 

Congratulations, you have just run a simple Spark application in CDH 5. Happy Sparking!

Sandy Ryza is a data scientist at Cloudera. He is an Apache Hadoop committer and recently led Cloudera’s Spark development.


11 Responses
  • Sriram / April 30, 2014 / 7:46 PM

    Hi Sandy

    Thanks for the detailed explanation. We have a CDH 5 cluster which has Kerberos. Will Spark work in a cluster with Kerberos?

  • Kosmaj / May 03, 2014 / 4:02 AM

    Thanks for a nice intro to Spark! I tried the sample on the latest CDH-5 quick VM (on Mac). Maven downloading took a while but it worked as expected. However, on the last line of the shell script I used the path of the directory containing all test files, not an individual file path.

  • Martin Tapp / May 07, 2014 / 10:31 AM

    In order to run on a Spark cluster (with spark://…), I had to set SPARK_CLASSPATH to point to my jar. Otherwise, I would get ClassNotFound exceptions.

  • Jim / May 14, 2014 / 12:57 PM

    Even setting the SPARK_CLASSPATH gives ClassNotFound exceptions. I am using CDH 5 with Spark 0.9.0.

  • Andrew / June 20, 2014 / 9:02 AM

    Anyone manage to get the scala version of the example to work? I’m getting the ClassNotFound exception.

    Setting SPARK_CLASSPATH did nothing. CDH 5 with Spark 0.9.0. And I’m building my jar with ‘package’ and compiling with Scala 2.10.3 (the same version that Spark is using).

  • Sandy Ryza / June 22, 2014 / 11:43 PM

    The original version of this post had an error that a few people seem to have run into: it didn’t mark the app jar for distribution to the executors, so running in a distributed setting would result in ClassNotFoundExceptions.

    The fix is to add the following to the launch command:
    -Dspark.jars=target/sparkwordcount-0.0.1-SNAPSHOT.jar

    Apologies for the confusion.

  • Mike / August 26, 2014 / 7:53 PM

    I ran into a java.io.IOException: No FileSystem for scheme: hdfs

    Anyone had the same issue? please advise.

    14/08/27 02:44:52 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140827024452-0008
    Exception in thread “main” java.lang.RuntimeException: java.io.IOException: No FileSystem for scheme: hdfs
    at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:657)
    at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:389)
    at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
    at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
    at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala:145)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala:145)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:145)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:168)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
    at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:420)
    at com.cloudera.sparkwordcount.SparkWordCount$.main(SparkWordCount.scala:20)
    at com.cloudera.sparkwordcount.SparkWordCount.main(SparkWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.IOException: No FileSystem for scheme: hdfs
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2385)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
    at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:653)
    … 39 more

    • Justin Kestelyn (@kestelyn) / August 27, 2014 / 7:51 AM

      Mike, best if you post this issue in the “Spark” area at community.cloudera.com.

  • S.K.Nair / September 01, 2014 / 3:03 PM

    Hi,
    I ran into a couple of issues while trying this using CDH5.1. The following steps helped me with those errors.

    [root@quickstart cloudera]# find / -name mvn
    /usr/local/apache-maven/apache-maven-3.0.4/bin/mvn

    sudo su hdfs
    PATH=/usr/local/apache-maven/apache-maven-3.0.4/bin:${PATH}
    M2_HOME=/usr/local/apache-maven/apache-maven-3.0.4/bin/mvn
    export JAVA_HOME=/usr/java/jdk1.7.0_55-cloudera/

    mvn archetype:generate
    ……….
    [INFO] Parameter: groupId, Value: com.cloudera.sparkwordcount
    [INFO] Parameter: artifactId, Value: sparkwordcount
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.cloudera.sparkwordcount
    [INFO] Parameter: packageInPathFormat, Value: com/cloudera/sparkwordcount
    [INFO] Parameter: package, Value: com.cloudera.sparkwordcount
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: groupId, Value: com.cloudera.sparkwordcount
    [INFO] Parameter: artifactId, Value: sparkwordcount
    [INFO] project created from Archetype in dir:

    update the pom.xml with file downloaded from https://raw.githubusercontent.com/sryza/simplesparkapp/master/pom.xml
    remove App.scala and place SparkWordCount.scala in src/main/scala/com/cloudera/sparkwordcount
    SparkWordCount.scala can be downloaded as referred to in http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/

    run mvn package command
    bash-4.1$ mvn package
    [INFO] Scanning for projects…
    [INFO]

    ………………..**************DOWNLOADS for some extended time ************..
    ……

    bash-4.1$ cd sparkwordcount
    place a sample file in ../../data/sample.txt
    hadoop fs -put ../../data/sample.txt hdfs://quickstart.cloudera:8020/sample.txt
    spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar ../../sample.txt 2

    And got the output

  • Bin / September 23, 2014 / 6:34 PM

    Running this example, we got the following exception. When we check the jar file, it does contain org/slf4j/impl/StaticLoggerBinder.class. Any idea?

    Exception in thread “main” java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
    at org.apache.spark.Logging$class.initializeLogging(Logging.scala:114)
    at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:106)
    at org.apache.spark.Logging$class.log(Logging.scala:45)
    at org.apache.spark.util.Utils$.log(Utils.scala:49)
    at org.apache.spark.Logging$class.logWarning(Logging.scala:70)
    at org.apache.spark.util.Utils$.logWarning(Utils.scala:49)
