How-to: Prepare Your Apache Hadoop Cluster for PySpark Jobs

Categories: CDH, Hadoop, How-to, Spark

Proper configuration of your Python environment is a critical pre-condition for using Apache Spark’s Python API.

One of the most enticing aspects of Apache Spark for data scientists is the APIs it provides in non-JVM languages: Python (via PySpark) and R (via SparkR). These language bindings have generated a lot of excitement for a few reasons: most data scientists find writing Java or Scala a drag, they already know Python or R, and they don't want to learn a new language just to write code for distributed computing. Most important, these languages already have a rich variety of numerical libraries with a statistical, machine learning, or optimization focus.

Like everything in engineering, there are tradeoffs to be made when picking these non-JVM languages for your Spark code. Java offers advantages like platform independence by running inside the JVM, self-contained packaging of code and its dependencies into JAR files, and higher performance because Spark itself runs in the JVM. If you choose to use Python, you lose these advantages. In particular, managing dependencies and making them available for PySpark jobs on a cluster can be a pain. In this blog post, I will explain what your options are.

To determine what dependencies are required on the cluster ahead of time, it is important to understand where different parts of Spark code get executed and how computation is distributed on the cluster. Spark orchestrates its operations via the driver program. The driver program initializes a SparkContext, in which you define your data transformations and actions, e.g. map, flatMap, and filter. When the driver program is run, the Spark framework initializes executor processes on the worker nodes, which then process your data across the cluster.

Self-contained Dependency

If the Python transformations you define use any third-party libraries, like NumPy or nltk, then the Spark executors will need access to those libraries when they execute your code on the remote worker nodes. A common situation is one where we have our own custom Python package that contains functionality we would like to apply to each element of our RDD. A simple example of this is illustrated below. I assume a SparkContext is already initialized as sc, as in the PySpark shell.

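Here is a minimal sketch of that example (my.special.package is a placeholder for your own package, and sc is the already-initialized SparkContext):

    def import_my_special_package(x):
        # Importing inside the function means the import happens on the
        # executor that processes this element, not only on the driver.
        import my.special.package
        return x

    int_rdd = sc.parallelize([1, 2, 3, 4])
    int_rdd.map(lambda x: import_my_special_package(x)).collect()
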
After creating a SparkContext, you create a simple RDD of four elements and call it int_rdd. Then you apply the function import_my_special_package to every element of int_rdd. This function just imports my.special.package and then returns the original argument passed to it. Calling it has the same effect as using classes or functions defined in my.special.package, because Spark requires that each executor be able to import my.special.package whenever its functionality is needed.

If you only need a single file inside my.special.package, you can direct Spark to make it available to all executors by using the --py-files option in your spark-submit command and specifying the local path to the file. You can also do this programmatically with the sc.addPyFile() function. If you use functionality from a package that spans multiple files, you will be better off making an *.egg for the package, as the --py-files flag also accepts a path to an egg file. (Caveat: if your package depends on compiled code and the machines in your cluster have CPU architectures different from the machine on which you compile the egg, this will not work.)

In short, if you have a self-contained dependency, there are two ways you can make the required Python code available to your executors (an example follows the list below):

  • If you only depend on a single file, you can use the --py-files command line option or programmatically add the file to the SparkContext with sc.addPyFile(path), specifying the local path to that Python file.
  • If you depend on a self-contained module (meaning a module with no other dependencies), you can create an egg or zip file of that module and use either the --py-files command line option or sc.addPyFile(path), specifying the local path to that egg or zip file.
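
For illustration, the two approaches might look roughly like this (file and path names are placeholders):

    # Ship a single Python file (or an egg/zip of a self-contained module)
    # along with the job at submit time:
    spark-submit --py-files /local/path/to/my_module.py my_pyspark_job.py

    # Or add it from the driver program before it is needed on the executors:
    sc.addPyFile('/local/path/to/my_module.egg')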

Complex Dependency

If the operations you want to apply in a distributed way rely on complex packages that themselves have many dependencies, you have a real challenge. Let’s take the simple snippet below as an example:

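A sketch of that snippet (again assuming sc is an existing SparkContext):

    def import_pandas(x):
        # pandas must be importable on whichever executor processes this element.
        import pandas
        return x

    int_rdd = sc.parallelize([1, 2, 3, 4])
    int_rdd.map(lambda x: import_pandas(x)).collect()
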
Again, all we are doing is importing pandas. Pandas depends on NumPy, SciPy, and many other packages. We have a problem here because we cannot make an egg that contains all of the required dependencies. Instead, we need to have the required Python environment already set up on the cluster and the Spark executors configured to use that Python environment.

Now, let’s consider our options for making these libraries available. While pandas is too complex for us to just distribute a *.py file containing the functionality we need, we could theoretically create an *.egg for it and try shipping it off to executors with the --py-files option on the command line or sc.addPyFile() on a SparkContext. A major issue with this approach is that *.egg files for packages containing native code (which most numerically oriented Python packages do) must be compiled for the specific machine they will run on.

Anyone doing distributed computing with commodity hardware must assume that the underlying hardware is potentially heterogeneous. A Python egg built on a client machine will be specific to the client’s CPU architecture because of the required C compilation. Distributing an egg for a complex, compiled package like NumPy, SciPy, or pandas is a brittle solution that is likely to fail on most clusters, at least eventually. This means we should prefer the alternative approach: have our required Python packages already installed on each node of the cluster and specify the path to the Python binaries for the worker nodes to use.

As long as the Python installations you want to use are in a consistent location on your cluster, you can set the PYSPARK_PYTHON environment variable to the path of that Python executable, and Spark will use it as the Python installation for your executors. You can set this environment variable on a per-session basis by executing the following line on the command line:

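For example (the path is a placeholder for wherever the desired Python lives on your worker nodes):

    export PYSPARK_PYTHON=/path/to/python
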
If you would like to consistently use this PYSPARK_PYTHON definition, you can add that line to your spark-env.sh. In CDH, this script is located at /etc/spark/conf/spark-env.sh (and at /opt/cloudera/parcels/CDH/lib/spark/conf if you’re using parcels). If you set PYSPARK_PYTHON in spark-env.sh, you should first check that users have not already set this environment variable, with lines like the following:

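A sketch of such a check (again with a placeholder path) sets PYSPARK_PYTHON only if the user has not already defined it:

    if [ -z "${PYSPARK_PYTHON}" ]; then
      export PYSPARK_PYTHON=/path/to/python
    fi
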
If you have complex dependencies like pandas or SciPy, you can create the required Python environment on each node of your cluster and set PYSPARK_PYTHON to the path to the associated Python executable.

Installing and Maintaining Python Environments

Installing and maintaining Python environments on a cluster is not an easy task, but it is the best way to make full use of the Python package ecosystem with PySpark. In the best possible world, you have a good relationship with your local sysadmin and they are able and willing to set up a virtualenv or install the Anaconda distribution of Python on every node of your cluster, with your required dependencies. If you are a data scientist responsible for administering your own cluster, you may need to get creative about setting up your required Python environment on your cluster. If you have sysadmin or devops support for your cluster, use it! They are professionals who know what they are doing. If you are on your own, the following, somewhat fragile, instructions may be useful to you.

If you aren’t yet worrying about long-term maintainability and just need to get a Python environment set up yourself, you can take the less maintainable path of setting up virtual environments on your cluster by executing commands on each machine using Cluster SSH, Parallel SSH, or Fabric.

As an example, I provide instructions for setting up a standard data stack (including SciPy, NumPy, scikit-learn, and pandas) in a virtualenv on a CentOS 6/RHEL 6 system, assuming you have logged into every node in your cluster using cluster ssh and each node has Python and pip installed. (Note that you may need sudo access in order to install operating system packages, such as LAPACK and BLAS.)

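The commands below are a sketch of that setup; the environment location and the system package names are representative and may need adjusting for your systems:

    # System-level build dependencies (this step needs sudo).
    sudo yum install -y gcc gcc-gfortran python-devel lapack-devel blas-devel

    # Create a virtualenv in the same location on every node, then activate it
    # so that the pip installs below go into the new environment.
    sudo pip install virtualenv
    virtualenv /path/to/mynewenv
    source /path/to/mynewenv/bin/activate

    pip install numpy
    pip install scipy
    pip install scikit-learn
    pip install pandas

    deactivate
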
Once you have a virtualenv set up in a uniform location on each node of your cluster, you can use it as the Python executable for your Spark executors by setting the PYSPARK_PYTHON environment variable to /path/to/mynewenv/bin/python.

This is not particularly simple or easily maintainable. In a follow-up post I will discuss other options for creating and maintaining Python environments on a CDH cluster.

Acknowledgements

Thanks to Uri Laserson for his invaluable feedback on the blog post. Additional thanks to Sean Owen, Sandy Ryza, Mark Grover, Alex Moundalexis, and Stephanie Bodoff.

Juliet is a Data Scientist at Cloudera, and contributor/committer/maintainer for the Sparkling Pandas project. Juliet was the technical editor for Learning Spark by Karau et al. and Advanced Analytics with Spark by Ryza et al.


16 responses on “How-to: Prepare Your Apache Hadoop Cluster for PySpark Jobs”

  1. Jon

    Thanks for this. It would be interesting to hear your thoughts on how to organize deployment of different virtualenvs to cluster nodes. This comes up when clients on a single cluster have requirements for e.g. different versions of pandas or similar libraries.

  2. Dmitry

    Hi Juliet,

    Thank you, this is a very helpful post. I have a question though – assuming I have a homogeneous environment, can I just create a Python installation on a network share and mount it to all the nodes? Do you think it will work?

  3. Pete

    Yes would also be interested in ways to support multiple virtualenvs on a shared cluster. Any thoughts on this?

  4. Harry

    Hi Juliet,

    Thanks for this very clear explanation. You’ve got me thinking of several follow up questions:

    1. Does this problem go away if one uses a cloud-based cluster?

    2. There must be a similar problem using R on Spark. Are there similar solutions?

    3. Does Scala enjoy a home field advantage, because the libraries are already included in Spark?

  5. darkblue_b

    hmm, if all your compute nodes are the same kind of linux, then pip works well enough; just bootstrap the pip tool and then execute a few lines using your favorite method, on each node. Isn’t pip standard in Python 3x now? https://www.python.org/dev/peps/pep-0453/

    Interestingly, the Jupyter project is solving similar problems. Their solutions are worth looking into a bit.

    I think that taking time to consider and discuss “heterogeneous environments” is a distraction, best as an addendum to the unfortunate researcher who is trying to do that..

  6. Juliet Hougland

    Jon and Pete,
    Managing multiple environments on a cluster is a major challenge. This is where manually managing environments (e.g. with clusterssh and virtualenv) can quickly get unmanageable on a cluster. Continuum has an enterprise product called Anaconda Cluster that enables management.

    Dmitry,
    Let me repeat your question to make sure I understand. You are asking: “Is it possible to make a Python interpreter available to PySpark via network-attached storage?” The answer is “maybe, if your cluster is small.” NAS systems are meant to provide access to data, so the Python processes launched by PySpark will still need to run on the hosts in your cluster. If you really want to use NAS, you can use it to help distribute the Python bits to your nodes, but you should be careful: some dependencies are compiled, and NAS systems have a limit to the number of clients they can support. In general, I wouldn’t recommend doing it this way.

    Harry,
    1. This problem still exists in a cloud-based infrastructure. It is possible to configure your setup scripts or configuration files to set up the Python environments you require. After this blog post, my colleague Mark Brooks sent me an example of one of his Cloudera Director configuration files that downloads and installs Python and the needed dependencies. Essentially, you specify a bootstrap script that installs Python and pip installs the required dependencies.
    2. I am less familiar with R’s dependency management options. I do know that one available option is to use Anaconda/Anaconda Cluster as it is able to manage R environments.
    3. Scala and Java both have an advantage because of the JVM tooling and build systems. When you declare a dependency in a pom.xml file or via SBT, your build tool has the ability to package the dependencies with the jar. You can build these jars and specify them in your spark-submit command.

    darkblue_b,
    1. What I describe in this post is how to do that on a large cluster.
    2. Jupyter does not solve similar problems.
    3. Heterogeneous environments are extremely common in production environments.

  7. Warren

    Hi Juliet,
    There’s a mistake in your Complex Dependency example. The map doesn’t actually get executed because int_rdd.collect() won’t invoke the map that’s defined in the previous line.

    For a simple test, it should be:
    int_rdd.map(lambda x: import_pandas(x)).collect()

    Unfortunately, the import fails because pandas can’t locate some required C extensions.

  8. Eric

    Great post Juliet,
    I’m interested to read your next post and to hear how/if things will change with the new distribution of CDH 5.5 which contains many updates including, I believe, Spark 1.5.

  9. Uday Menon

    Hi Juliet,

    Thank you for your post. I’m glad you recommend the Anaconda python distribution when setting up virtualenvs. That is exactly the setup we have on our cluster of 5 nodes running CDH 5.4.3. Our spark-submit job runs fine in local mode but throws connection reset errors in yarn-client mode.

    We are working with Cloudera support on this issue (Case#: 79415; that’s how I even saw your post – they sent it to me). The first thing we tried was to set PYSPARK_PYTHON to the path to the associated Python executable in our virtualenv. That however resulted in Access Denied errors. Without this setting we get the Connection Reset error mentioned above.

    Any ideas on what might be causing this?

    Thanks
    Uday

  10. ph

    I think the suggestion of what to put in spark-env.sh is wrong:
    if [ -n "${PYSPARK_PYTHON}" ]; then
    export PYSPARK_PYTHON=
    fi
    This code says if PYSPARK_PYTHON is NOT empty, then export the default. Don’t we want if it IS empty? Like:
    if [ -z "${PYSPARK_PYTHON}" ]; then
    export PYSPARK_PYTHON=
    fi
    I wasted a day chasing this. A good lesson to not just copy and paste from blogs.

  11. Chris Cornelison

    So I guess I don’t understand the point of the --py-files option in spark-submit???
    If I zip up the contents of my virtualenv’s site-packages directory, can’t the Spark framework deliver this to the executors and make sure it is on the PYTHON_PATH?
    So far in my experiments, the zip file gets copied, but it doesn’t appear to get uncompressed (i.e. missing modules in import statements).

    Thanks,
    Chris

  12. Matt

    Before doing pip installs, you should activate the virtualenv. In the example, virtualenv is not activated. This would lead packages to get installed in the current environment.

    virtualenv <env>
    source <env>/bin/activate
    pip install numpy
    pip install scipy
    pip freeze # to list all packages installed on <env>
    deactivate # to deactivate the virtualenv