How-to: Run a Simple Apache Spark App in CDH 5

Getting started with Spark (now shipping inside CDH 5) is easy using this simple example.

(Editor’s note – this post has been updated to reflect CDH 5.1/Spark 1.0)

Apache Spark is a general-purpose, cluster computing framework that, like MapReduce in Apache Hadoop, offers powerful abstractions for processing large datasets. For various reasons pertaining to performance, functionality, and APIs, Spark is already becoming more popular than MapReduce for certain types of workloads. (For more background about Spark, read this post.)

In this how-to, you’ll learn how to write, compile, and run a simple Spark program written in Scala on CDH 5 (in which Spark ships and is supported by Cloudera). The full code for the example is hosted at


Our example app will be a souped-up version of WordCount, the classic MapReduce example. In our version, the goal is to learn the distribution of letters in the most popular words in our corpus. That is, we want to:

  1. Read an input set of text documents
  2. Count the number of times each word appears
  3. Filter out all words that show up fewer than a million times
  4. For the remaining set, count the number of times each letter occurs
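To make the four steps concrete before bringing Spark in, here is a minimal single-machine sketch in plain Java using `java.util.stream` on a small in-memory list. It is not Spark code. The class name `LocalLetterCount`, the sample documents, and the threshold of 2 are hypothetical. Like the Spark program in this post, it counts each letter once per distinct popular word, rather than weighting letters by how often the word appears.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Single-machine illustration of the four steps (not Spark code).
// LocalLetterCount and letterCounts are hypothetical names.
public class LocalLetterCount {

  static Map<Character, Long> letterCounts(List<String> documents, long threshold) {
    // Steps 1 and 2: split each document on spaces and count every word
    Map<String, Long> wordCounts = documents.stream()
        .flatMap(doc -> Arrays.stream(doc.split(" ")))
        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

    // Steps 3 and 4: keep words seen at least `threshold` times,
    // then count the letters of each surviving (distinct) word
    return wordCounts.entrySet().stream()
        .filter(e -> e.getValue() >= threshold)
        .flatMap(e -> e.getKey().chars().mapToObj(c -> (char) c))
        .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<String> docs = Arrays.asList("the cat sat", "the dog sat", "a cat ran");
    System.out.println(letterCounts(docs, 2)); // {a=2, c=1, e=1, h=1, s=1, t=3}
  }
}
```

This only works while every document fits in one JVM's memory. The Spark version expresses the same steps but distributes them across the cluster.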

In MapReduce, this would require two MapReduce jobs, as well as persisting the intermediate data to HDFS in between them. In contrast, in Spark, you can write a single job in about 90 percent fewer lines of code.

Our input is a huge text file where each line contains all the words in a document, stripped of punctuation. The full Scala program looks like this:

package com.cloudera.sparkwordcount

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
    val threshold = args(1).toInt

    // split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    // collect() is the action that triggers the whole computation
    println(charCounts.collect().mkString(", "))
    sc.stop()
  }
}

Spark uses “lazy evaluation”, meaning that transformations don’t execute on the cluster until an “action” operation is invoked. Examples of action operations are collect, which pulls data to the client, and saveAsTextFile, which writes data to a filesystem like HDFS.
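Java's own `java.util.stream` API offers a rough single-machine analogy for this laziness: intermediate operations such as `map` only describe work, and nothing runs until a terminal operation such as `collect`. The sketch below (class and method names are hypothetical) counts how many times the mapping function has run before and after the terminal operation. It is an analogy only; unlike a Spark RDD, a Java stream can be consumed just once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only (not Spark): java.util.stream intermediate operations are lazy,
// much as Spark transformations are. LazyDemo and countCalls are hypothetical names.
public class LazyDemo {

  // Returns {calls before the terminal operation, calls after it}.
  static int[] countCalls(List<String> words) {
    AtomicInteger calls = new AtomicInteger();

    // Like a transformation: this only describes the work to be done
    Stream<String> upper = words.stream()
        .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); });
    int before = calls.get();

    // Like an action: the terminal operation forces the pipeline to run
    upper.collect(Collectors.toList()); // result discarded; only the call count matters here
    int after = calls.get();
    return new int[] {before, after};
  }

  public static void main(String[] args) {
    int[] c = countCalls(Arrays.asList("spark", "hadoop", "cdh"));
    System.out.println("before: " + c[0] + ", after: " + c[1]); // before: 0, after: 3
  }
}
```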

It’s worth noting that in Spark, the definition of “reduce” is slightly different from that in MapReduce. In MapReduce, a reduce function call accepts all the records corresponding to a given key. In Spark, the function passed to reduce or reduceByKey accepts just two arguments at a time, so it should be associative and commutative; otherwise the result can depend on how the data happens to be partitioned. A positive consequence is that Spark knows it can always apply a combiner. Based on that definition, the Spark equivalent of MapReduce’s reduce is closer to a groupByKey followed by a map.
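The plain-Java sketch below (not Spark's implementation; `CombinerDemo`, `fold`, and `partitionedReduce` are hypothetical names) models why a two-argument reduce function needs to be associative. It reduces each "partition" locally, as a combiner would, and then reduces the partial results. Addition gives the same answer either way, while subtraction does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Single-machine model (not Spark's implementation) of reducing per partition,
// as a combiner would, and then merging the partial results.
// CombinerDemo, fold and partitionedReduce are hypothetical names.
public class CombinerDemo {

  // Apply a two-argument function left to right across one non-empty list.
  static int fold(List<Integer> values, BinaryOperator<Integer> f) {
    int acc = values.get(0);
    for (int i = 1; i < values.size(); i++) {
      acc = f.apply(acc, values.get(i));
    }
    return acc;
  }

  // Reduce each partition locally, then reduce the per-partition results.
  static int partitionedReduce(List<List<Integer>> partitions, BinaryOperator<Integer> f) {
    List<Integer> partials = new ArrayList<>();
    for (List<Integer> partition : partitions) {
      partials.add(fold(partition, f));
    }
    return fold(partials, f);
  }

  public static void main(String[] args) {
    List<Integer> all = Arrays.asList(1, 2, 3, 4, 5, 6);
    List<List<Integer>> parts = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));

    BinaryOperator<Integer> add = (a, b) -> a + b;      // associative
    BinaryOperator<Integer> subtract = (a, b) -> a - b; // not associative

    System.out.println(fold(all, add) + " vs " + partitionedReduce(parts, add));           // 21 vs 21
    System.out.println(fold(all, subtract) + " vs " + partitionedReduce(parts, subtract)); // -19 vs 3
  }
}
```

With this sample data, subtraction yields -19 for a single left-to-right fold but 3 when the two partitions are reduced first (-4 and -7) and then combined, which is the kind of partition-dependent result a non-associative function produces.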

For those more comfortable with Java, here’s the same program using Spark’s Java API:

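package com.cloudera.sparkwordcount;

import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
    final int threshold = Integer.parseInt(args[1]);

    // split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
          return Arrays.asList(s.split(" "));
        }
      });

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    // filter out words with fewer than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function<Tuple2<String, Integer>, Boolean>() {
        public Boolean call(Tuple2<String, Integer> tup) {
          return tup._2() >= threshold;
        }
      });

    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          ArrayList<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    // collect() is the action that triggers the whole computation
    System.out.println(charCounts.collect());
    sc.stop();
  }
}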

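Because Java 7 and earlier lack lambda expressions, the program has to use anonymous inner classes and is considerably more verbose (Spark 1.0 also accepts Java 8 lambdas, which shorten it considerably), but it still requires a fraction of the code needed in an equivalent MapReduce program.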


We’ll use Maven to compile our program. Maven expects a specific directory layout that tells it where to look for source files. Our Scala code goes under src/main/scala, and our Java code goes under src/main/java. That is, we place SparkWordCount.scala in the src/main/scala/com/cloudera/sparkwordcount directory and JavaWordCount.java in the src/main/java/com/cloudera/sparkwordcount directory.
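The directory path mirrors the package name. The tiny helper below is purely illustrative (`MavenLayout` and `sourcePath` are hypothetical names, not part of Maven). It maps a fully qualified class name, such as the one passed to spark-submit via `--class`, to its conventional source file location. The Scala compiler does not strictly require this layout, but following it keeps both source trees consistent.

```java
// Hypothetical helper illustrating Maven's default source layout;
// Maven itself does not expose such a method.
public class MavenLayout {

  // Map a fully qualified class name to its conventional source file path,
  // e.g. ("com.cloudera.sparkwordcount.SparkWordCount", "scala").
  static String sourcePath(String className, String language) {
    String relative = className.replace('.', '/');
    return "src/main/" + language + "/" + relative + "." + language;
  }

  public static void main(String[] args) {
    System.out.println(sourcePath("com.cloudera.sparkwordcount.SparkWordCount", "scala"));
    // src/main/scala/com/cloudera/sparkwordcount/SparkWordCount.scala
    System.out.println(sourcePath("com.cloudera.sparkwordcount.JavaWordCount", "java"));
    // src/main/java/com/cloudera/sparkwordcount/JavaWordCount.java
  }
}
```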

Maven also requires you to place a pom.xml file in the root of the project directory that tells it how to build the project. A few noteworthy excerpts are included below.

To compile Scala code, include:



which requires adding the scala-tools plugin repository:

    <name>Scala-tools Maven2 Repository</name>


Then, include Spark and Scala as dependencies:



Finally, to generate our app jar, simply run:

mvn package


It will show up in the target directory as sparkwordcount-0.0.1-SNAPSHOT.jar.


(Note: the following instructions only work with CDH 5.1/Spark 1.0 and later. To run against CDH 5.0/Spark 0.9, see the instructions here.)

Before running, place the input file into a directory on HDFS. The repository supplies an example input file in its data directory. To run the Spark program, we use the spark-submit script, passing the input path and the word-count threshold (here, 2) as the program’s two arguments:

spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local \
  target/sparkwordcount-0.0.1-SNAPSHOT.jar <input file> 2


This will run the application in a single local process. If the cluster is running a Spark standalone cluster manager, you can replace --master local with --master spark://<master host>:<master port>.

If the cluster is running YARN, you can replace --master local with --master yarn. Spark will determine the YARN ResourceManager’s address from the YARN configuration file.

The output of the program should look something like this:

(e,6), (f,1), (a,4), (t,2), (u,1), (r,2), (v,1), (b,1), (c,1), (h,1), (o,2), (l,1), (n,4), (p,2), (i,1)


Congratulations, you have just run a simple Spark application in CDH 5. Happy Sparking!

Sandy Ryza is a data scientist at Cloudera. He is an Apache Hadoop committer and recently led Cloudera’s Spark development.


7 Responses
  • Sriram / April 30, 2014 / 7:46 PM

    Hi Sandy

    Thanks for the detailed explanation. We have CDH 5 cluster which has Kerberos. Will spark work in a cluster with Kerberos?

  • Kosmaj / May 03, 2014 / 4:02 AM

    Thanks for a nice intro to Spark! I tried the sample on the latest CDH-5 quick VM (on Mac). Maven downloading took a while but it worked as expected. However, on the last line of the shell script I used the path of the directory containing all test files, not an individual file path.

  • Martin Tapp / May 07, 2014 / 10:31 AM

    In order to run on a Spark cluster (with spark://…), I had to set SPARK_CLASSPATH to point to my jar. Otherwise, I would get ClassNotFound exceptions.

  • Jim / May 14, 2014 / 12:57 PM

    Even setting the SPARK_CLASSPATH gives ClassNotFound exceptions. I am using CDH5 with Spark 0.9.0.

  • Andrew / June 20, 2014 / 9:02 AM

    Anyone manage to get the scala version of the example to work? I’m getting the ClassNotFound exception.

    Setting SPARK_CLASSPATH did nothing. CDH5 with Spark 0.9.0. And I’m building my jar with ‘package’ and compiling with Scala 2.10.3 (same version that Spark is using).

  • Sandy Ryza / June 22, 2014 / 11:43 PM

    The original version of this post had an error that it seems like a few people have run into – it didn’t mark the app jar for distribution to the executors, so running in a distributed setting would result in ClassNotFoundExceptions.

    The fix is to add the following to the launch command:

    Apologies for the confusion.

