How To: Use Oozie Shell and Java Actions

Categories: General How-to Oozie Pig

Ed. Note (Oct. 16, 2015): This post has been updated for CDH 5.x; some external links have been updated as well.

Apache Oozie, the workflow coordinator for Apache Hadoop, has actions for running MapReduce, Apache Hive, Apache Pig, Apache Sqoop, and Distcp jobs; it also has a Shell action and a Java action. These last two actions allow us to execute any arbitrary shell command or Java code, respectively.

In this blog post, we’ll look at an example use case and see how to use both the Shell and Java actions in more detail. Please follow along below; you can get a copy of the full project at Cloudera’s GitHub as well. This how-to assumes some basic familiarity with Oozie.

Example Use Case

Suppose we’d like to design a workflow that determines which earthquakes from the last 30 days have a magnitude greater than or equal to that of the largest earthquake in the last hour; also, we’d like to run this workflow every hour. One last requirement for our workflow is that in order to save bandwidth and time, we’d like to be able to skip downloading and processing the 30 days of earthquake data if there were no “large” earthquakes within the last hour; because “large” is subjective, we’ll just go with 3.2 for this example but we should make this easy to configure.

We don’t have the earthquake data ourselves, but luckily the US Geological Survey (USGS) maintains earthquake datasets in a few formats that are updated quite frequently. We’re interested in their “Past Hour” and “Past 30 Days” datasets, and we’ll use the CSV (comma-separated value) format because it’s simple; both datasets are available as CSV feeds on the USGS earthquake feeds page.

Going back to our requirements, we can create a simple Pig script to process the 30 days of earthquake data. But what about checking the last hour of data or downloading the data from the USGS website?  We can use the Shell action to run a shell script to check the last hour of data and the Java action to run some Java code to handle downloading the data. We’ll go into more detail on this later, but first let’s learn about some information that we’ll find useful for this workflow.

Capturing Data from an Action

We’ll need a way to get some small pieces of data (i.e. the magnitude of the largest earthquake from the past hour) from our shell script into the workflow itself. We can tell Oozie to capture any data that the shell action outputs to STDOUT so it’s available in the workflow by simply adding the <capture-output/> element. The outputted data must be in Java Properties file format and it must not exceed 2KB (by default). Then all we have to do is have our shell script echo the necessary information to STDOUT; we’ll also need to make sure that it doesn’t echo anything else.
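For example, a script that ends like this (the values are illustrative) would make two properties available to the workflow:

```shell
# Emit key=value pairs in Java Properties format on STDOUT; Oozie
# captures them when <capture-output/> is set. Values are illustrative.
largest=4.6
echo "isLarger=true"
echo "largest=$largest"
```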

While we won’t need to do this in our example, <capture-output/> can also be used to get data from the Java action. The only difference is that instead of STDOUT, we have to write the properties to a (local) file whose path must be obtained from System.getProperty("oozie.action.output.properties").

Shell Action Caveats

The Shell action has the following caveats:

  • Interactive commands are not supported.
  • MR1:
    • In an unsecure cluster, everything is run as the user who started the TaskTracker where our shell script is running (the mapred user in CDH4); in a “Kerberized” cluster, it will run as the UNIX user of whoever submitted the workflow. This is in contrast to MapReduce-based actions, which, for the purposes of interaction with Hadoop, are run as the user who submitted the workflow, although the UNIX process for the task still runs as mapred.
  • YARN/MR2:
    • The user everything is run as depends entirely on your ContainerExecutor YARN configuration (i.e. yarn.nodemanager.container-executor.class). If the DefaultContainerExecutor is being used, then everything will run as the user who started the NodeManagers (the yarn user). If the LinuxContainerExecutor is being used in an unsecure cluster, then everything will run as whoever is configured in yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user (“nobody” by default); if yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users is set to “false”, then everything will instead run as the user who submitted the job. In a Kerberized cluster with the LinuxContainerExecutor, everything will always run as the user who submitted the job. This can be a bit confusing, so I created this table:

      ContainerExecutor          Kerberized   limit-users   Shell runs as
      DefaultContainerExecutor   no           n/a           user who started the NodeManager (yarn)
      LinuxContainerExecutor     no           true          nonsecure-mode.local-user (“nobody”)
      LinuxContainerExecutor     no           false         user who submitted the workflow
      LinuxContainerExecutor     yes          n/a           user who submitted the workflow

  • The Shell action is executed on an arbitrary node in the cluster.
  • Different operating systems may have different versions of the same shell commands.

The implications of the caveat about running on an arbitrary node are very important. Oozie executes the Shell action in the same way it executes any of the other actions: as a MapReduce job. In this case, it’s a 1-mapper-0-reducer job, which is why it can be executed on any node in the cluster. This means that any command or script that we want to execute has to be available on that node; because we don’t know which node the Shell action will be executed on, the command or script has to be available on all nodes! This is fine for typical built-in shell commands like echo or grep, but can be more problematic for programs such as MATLAB, which must not only be installed but may also require a license. Instead, we’ll be putting our script in the same directory as the workflow.xml and taking advantage of the <file> tag to have Oozie copy it to the proper node for us.

Even though two operating systems, or even two different versions of the same operating system, may have the same built-in commands or programs, they may behave differently or accept different arguments. For example, we’ll be using the tail command later; some argument forms for specifying the number of lines that are accepted on Mac OS 10.7.5 won’t work properly on CentOS 6.2.

This doesn’t mean that we can’t use the tail command, though; it just means that we have to be careful to ensure that all of the machines on which our Shell action could possibly run have compatible versions of the built-in commands. For example, there are argument forms that work correctly on both Mac OS 10.7.5 and CentOS 6.2.
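One such form is tail’s -n +N, which prints from line N onward and is understood by both the GNU (CentOS) and BSD (Mac OS) implementations; a minimal sketch:

```shell
# "-n +2" prints everything from the second line onward; this form is
# portable across GNU and BSD tail, unlike some older "+2"-style forms.
printf 'header\nrow1\nrow2\n' > sample.txt
body=$(tail -n +2 sample.txt)
rm -f sample.txt
```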

That said, the script has been tested on Mac OS 10.7.5 and CentOS 6.2, but may require minor tweaking on other operating systems or versions.

As a general recommendation, when possible it’s best to use a built-in action type instead of the Shell action. For instance, if you want to run a Hive script, use the Hive action rather than a Shell action that invokes Hive.

Java Action Caveats

The Java action has the following caveats:

  • The Java action is executed on an arbitrary node in the cluster.
  • In CDH 4.x, calling System.exit(int n) will always make the Java action do an “error to” transition regardless of the exit code. In CDH 5.x, the action will do an “ok to” transition if the exit code is 0, and an “error to” transition otherwise.

The Java action is also executed on an arbitrary node in the cluster for the same reason the Shell action is; however, this is typically less problematic for the Java action because external resources would be JAR files that we’d be already including anyway.

In CDH 4.x, it is also important that our Java code not call System.exit(int n), as this will make the Java action do an “error to” transition even if the exit code was 0; instead, an “ok to” transition is indicated by gracefully finishing main, and an “error to” transition is indicated by throwing an exception. In CDH 5.x, you can throw an exception, gracefully finish main, or call System.exit(int n), and it will be handled as expected.

Our Shell Script

We’ll use a shell script to download the “Past Hour” data, get the magnitude of the largest earthquake (in the last hour), and determine whether we need to go on to process the “Past 30 Days” data. Let’s build the script step by step.

We’ll need to take in the minimum earthquake magnitude that we’re interested in as an argument (for deciding whether or not to process the “Past 30 Days” data):
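A sketch of the top of the script; the default value exists only so the fragment runs standalone (the workflow always passes the threshold explicitly):

```shell
#!/bin/bash
# The minimum magnitude comes in as the script's first argument (the
# workflow passes it via an <argument> element). The default of 3.2 is
# only here so this sketch can run without arguments.
earthquakeMin=${1:-3.2}
```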

Now we’ll need the script to download the “Past Hour” data to a file; we can use curl for this:
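A sketch, wrapped in a function so the URL can be swapped for a local file when testing; the URL shown is the current USGS “Past Hour” CSV feed and is an assumption (the original post’s URL may differ):

```shell
# Assumption: the current USGS "Past Hour" CSV feed URL.
hourUrl="https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_hour.csv"

download_csv() {
  # -o: where to write the file; -s -S: silent except for errors;
  # -f: fail on HTTP errors instead of saving the server's error page.
  curl -o "$2" -s -S -f "$1"
}

# Usage in the real script:
# download_csv "$hourUrl" hour.txt
```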

The -o argument tells curl where to download the file, the -s and -S arguments together make curl silent except for errors, and the -f argument makes curl return an error instead of outputting server error HTML pages (e.g. error 404).

The -s argument is important here because we don’t want to pollute STDOUT. The working directory, where the hour.txt file will be written, is on the TaskTracker/NodeManager host that’s running our script; it will be automatically removed once the action ends, so we don’t have to worry about cleaning up temporary files. The downloaded hour.txt is a CSV: a header row naming the columns, followed by one row per earthquake.

The next step is to determine the magnitude of the largest earthquake from the file we just downloaded, using a combination of standard Linux tools. The first line in the file is always the header information for the columns, which we don’t actually want; we can use the tail command to keep everything except for the first line.

Going by the headers we just removed, the fifth column is the magnitude. We can get just the fifth column by using the cut command.

The -f 5 argument tells cut to take the fifth column, and the -d "," argument tells it to use a comma as the column separator, leaving us with one magnitude per line.

We can now use the sort command to put the magnitudes in sorted order.

We can then use the tail command again to get just the last magnitude, which must be the largest because we just sorted them.

Finally, we’re left with just the largest magnitude from the past hour.
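Putting the extraction steps together as a sketch (the sample rows are illustrative, and sort -n is used here to guarantee a numeric sort):

```shell
# Stand-in for the downloaded feed: header plus two data rows; the
# real feed has many more columns, with magnitude fifth.
printf 'time,latitude,longitude,depth,mag\n' >  hour.txt
printf 't1,61.2,-146.4,10,1.8\n'             >> hour.txt
printf 't2,38.8,-122.8,3,4.6\n'              >> hour.txt

largest=$(tail -n +2 hour.txt |   # drop the header line
          cut -f 5 -d ","     |   # keep only the magnitude column
          sort -n             |   # numeric sort, smallest first
          tail -n 1)              # last line is the largest magnitude
rm -f hour.txt
```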

We now need to compare the largest magnitude with earthquakeMin to determine whether or not to process the “Past 30 Days” data. It’s not recommended to use bash directly for comparing floats, but we can use the bc command instead.

We store bc’s result in a variable, compare: it will be 1 if largest (the variable where we stored the largest magnitude from the “Past Hour” data) is greater than or equal to earthquakeMin, and 0 otherwise. We can then check the value of compare and echo to STDOUT accordingly.

Here we’re echoing, in Java Properties file format, whether or not largest is greater than or equal to earthquakeMin and if so, what largest is. We’ll see how these values can be picked up by the workflow later.

One last thing that we should take care of in our shell script: what happens if there were no earthquakes in the past hour? 

With no data rows, the pipeline produces nothing for bc to compare, so our script will exit with an error (which would cause Oozie to FAIL the workflow). We can prevent this from happening by adding a simple check for this edge case.
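A sketch of the check, using a header-only file to stand in for a quiet hour:

```shell
printf 'time,latitude,longitude,depth,mag\n' > hour.txt   # header only

numLines=$(cat hour.txt | wc -l)
if [ "$numLines" -lt 2 ]; then
  # Fewer than two lines means no data rows: report and stop early.
  echo "isLarger=false"
fi
rm -f hour.txt
```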

This will use the cat command to echo the contents of the hour.txt file and pass it to the wc command. The -l argument tells wc to count the number of lines. We know that we need at least two lines (the header and one line of data) to not get the error message, so we can check if numLines is less than 2. We can then echo the property from before as false, instead of continuing with the script.

Our final script (with some additional minor details) looks like this:

Note that we set -e to make sure that the script exits with an error if any of these commands had an error, instead of continuing. If the script exits with a non-zero exit code, the shell action will follow the “error to” transition instead of the “ok to” transition.

Making a Decision

Once our workflow finishes running our shell script, we need to make a decision. We can use the aptly-named Decision Control Node for this. Decision nodes are useful when we want our workflow to transition to a different node based on some criteria; it’s essentially identical to the switch statement available in many programming languages. Here’s what ours will look like:
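A sketch of the decision node; the node name and the transition targets other than “get-data” and “end” are assumptions:

```xml
<decision name="decision-check-hour">
    <switch>
        <case to="get-data">
            ${wf:actionData('shell-check-hour')['isLarger'] == "true"}
        </case>
        <default to="end"/>
    </switch>
</decision>
```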

Earlier we saw in our shell script that we outputted whether or not we wanted the workflow to process the “Past 30 Days” data by echoing either isLarger=true or isLarger=false. We can use the wf:actionData EL function to retrieve this key=value pair from our shell action anywhere in the workflow. This EL function’s argument is the name of the action (which we’ll be calling “shell-check-hour” later); it returns a map of all of the key=value pairs that Oozie captured from that action. So here we’re checking whether isLarger is true. If so, Oozie will transition the workflow to the “get-data” action; otherwise, the “default to” transition will be followed, which goes to “end” to finish the workflow, skipping the processing of the “Past 30 Days” data.

Downloading the Data to HDFS

Before we can run a Pig job on the “Past 30 Days” data, we need to get it into HDFS. We could use the shell action again with curl and the Hadoop CLI, but there are some issues with this approach, including the second caveat with the Shell action that we saw earlier about which user is executing the script. We’ll instead use the Java action, so we don’t have to worry about this issue.

The Java action is particularly powerful because it allows us to execute any Java code we want. Here’s what our Java main function looks like:
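A sketch consistent with the description below; the class name and the feed URL (the current USGS “Past 30 Days” CSV feed) are assumptions:

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DownloadMonthData {

    // Assumption: the current USGS "Past 30 Days" CSV feed URL.
    private static final String MONTH_URL =
        "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv";

    public static void main(String[] args) throws Exception {
        // Open a connection to the USGS website and get an InputStream.
        URL url = new URL(MONTH_URL);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.connect();
        InputStream in = conn.getInputStream();

        // Get the HDFS FileSystem and create month.txt in the folder
        // passed in as args[0] by the workflow.
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out = fs.create(new Path(args[0], "month.txt"));

        // Copy the HTTP response into the HDFS file.
        IOUtils.copy(in, out);

        in.close();
        out.close();
        conn.disconnect();
    }
}
```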

The first four lines open a connection to the USGS website where the dataset is located and get an InputStream. We then get the HDFS FileSystem and create an FSDataOutputStream, which we can use to write a new file, month.txt. We can then use Apache Commons IOUtils to simplify writing the input stream from the connection to the output stream writing to month.txt. And finally, we close the two streams and disconnect the connection.

The folder that we want to put month.txt in is passed in as an argument (i.e. args[0]) to our Java program; we’ll see how to do this in the workflow later. We’ll need to make sure, though, that this location is set as the input of our Pig job, which we’ll look at next.

Processing the Data

The final step is to process the data that we downloaded to HDFS in the previous section. The processing that we want to do is to filter out any earthquakes whose magnitude is less than that of the largest earthquake we found in the “Past Hour” data earlier. As before, we can get this magnitude in the workflow with this EL function call:
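Given the action name and the property our script echoes, the call looks like this:

```
${wf:actionData('shell-check-hour')['largest']}
```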

We’ll see how to pass this to the Pig script from our workflow later. For now, let’s just assume we have a variable in our Pig script named $MINMAG. We’ll also need to have variables for the input file (the “Past 30 Days” data we downloaded to HDFS) and the output directory: $INPUT and $OUTPUT respectively.

First we need to LOAD the data to form a relation:
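A sketch of the LOAD statement; the column names and count are assumptions based on the feed’s CSV layout:

```pig
A = LOAD '$INPUT' USING PigStorage(',')
    AS (a1, a2, a3, a4, a5:float, a6, a7, a8,
        a9, a10, a11, a12, a13, a14, a15);
```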

The USING PigStorage(',') clause tells it to parse the input data as comma-delimited instead of tab-delimited, which is the default. The AS (a1, …) clause tells it the schema that we want it to use. When we looked at the header from the input file, we saw that the magnitude of the earthquake was the fifth column, hence we need to make sure to let Pig know that a5 is a float; we won’t be using the rest of the columns, so we don’t need to bother specifying data types for them.

Now that we have the data loaded into Pig, we can simply use the FILTER operator:
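A sketch of the filter:

```pig
B = FILTER A BY (a5 >= $MINMAG);
```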

The expression (a5 >= $MINMAG) is telling Pig to filter out any data row whose a5 (i.e. the magnitude of the earthquake) is less than $MINMAG (i.e. the largest earthquake in the last hour). The resulting rows that pass the filter are put into relation B.

What happens to the first row of our data, the one with the header information? Its fifth column is obviously not a float.

When Pig fails to cast the header row’s “mag” string to a float, the value comes through as null (which displays as an empty field) rather than a number. Conveniently, the FILTER operator rejects null values, so the header row won’t pass the filter and won’t be a problem.

Now that we’ve filtered the data, we need to output it back to HDFS. We can do that with the STORE operator:
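A sketch of the store:

```pig
STORE B INTO '$OUTPUT' USING PigStorage(',');
```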

As before with the LOAD operator, we specify USING PigStorage(',') to keep the data in a comma-delimited format.

The end result is that the output from this Pig job has all of the data rows from the last 30 days whose magnitude is greater than or equal to that of the largest earthquake from the last hour. Here’s our final Pig script:
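Reassembled as a sketch (column names and count assumed as before):

```pig
A = LOAD '$INPUT' USING PigStorage(',')
    AS (a1, a2, a3, a4, a5:float, a6, a7, a8,
        a9, a10, a11, a12, a13, a14, a15);
B = FILTER A BY (a5 >= $MINMAG);
STORE B INTO '$OUTPUT' USING PigStorage(',');
```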

Our Workflow

We’re almost done: let’s take a quick look at the three actions in our workflow. First, we have our shell action:
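A sketch of the action; the script’s file name (check-hour.sh) and the transition targets are assumptions:

```xml
<action name="shell-check-hour">
    <shell xmlns="uri:oozie:shell-action:0.2">
        <exec>check-hour.sh</exec>
        <argument>${earthquakeMinThreshold}</argument>
        <file>check-hour.sh</file>
        <capture-output/>
    </shell>
    <ok to="decision-check-hour"/>
    <error to="fail"/>
</action>
```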

The <exec> element tells Oozie what shell command to execute; this can be a script (like ours) or an existing command such as echo or grep. We then use the <argument> element to pass the minimum earthquake magnitude that we want to consider to our script; here, we’re using ${earthquakeMinThreshold}, which we’ll define in our job.properties file. (You can have multiple <argument> elements for passing multiple arguments.) The <file> element tells Oozie to add our script to the distributed cache, making it available on whichever node the shell action gets executed on. Alternatively, we could place our script on every node in the cluster, but that is a lot more work and would require synchronizing updates to the script. We mentioned the <capture-output/> element earlier: it’s what tells Oozie to capture the shell command’s STDOUT so we can use it later in our workflow.

Our shell action transitions to the decision node, which we looked at earlier. After the decision node, we have our Java action to download the “Past 30 Days” data to HDFS:
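A sketch of the action; the class name and transition targets are assumptions:

```xml
<action name="get-data">
    <java>
        <prepare>
            <delete path="${dataInputDir}"/>
            <mkdir path="${dataInputDir}"/>
        </prepare>
        <main-class>com.example.DownloadMonthData</main-class>
        <arg>${dataInputDir}</arg>
    </java>
    <ok to="pig-process"/>
    <error to="fail"/>
</action>
```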

We use the <main-class> element to set the class name of our Java code. The JAR file containing the class will need to be made available to the action; in my previous blog post about the Oozie Sharelib and its follow-up post, we saw a few ways to include additional JARs with our workflow. For simplicity, we’ll just put the JAR file in the lib folder next to our workflow.xml so Oozie will pick it up automatically. The <arg> element is essentially the same as the <argument> element we saw in the Shell action. Here we pass ${dataInputDir}, the HDFS path where we want to store the downloaded data; it would be defined in our job.properties file. In the <prepare> section, we delete ${dataInputDir} and then recreate it to make sure that it exists (our Java code would throw an exception otherwise) and that it’s clean before we download the data to it.

Lastly, we have our Pig action to process the data:
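A sketch of the action; the script’s file name and transition targets are assumptions:

```xml
<action name="pig-process">
    <pig>
        <script>process-month.pig</script>
        <param>INPUT=${dataInputDir}</param>
        <param>OUTPUT=${outputDir}</param>
        <param>MINMAG=${wf:actionData('shell-check-hour')['largest']}</param>
    </pig>
    <ok to="end"/>
    <error to="fail"/>
</action>
```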

The <script> element tells the Pig action the name of our Pig script, which we can put next to our workflow.xml. The <param> elements can then be used to pass variables to the Pig script, such as the INPUT, OUTPUT, and MINMAG ones we used earlier. For INPUT, we’ll use ${dataInputDir}; note that this is where we told the Java action to download the data to. For OUTPUT, we’ll use ${outputDir}, which we can define in our job.properties file to be where to output the final data. We’re also setting MINMAG to the largest value from our shell script, as previously discussed.

You may be wondering where the usual <job-tracker> and <name-node> elements are in our actions above. To simplify our workflow, we can put these, and other common configuration properties such as the queue name, into the <global> section, as described in this blog post on shortening your workflow definitions.

Running the Workflow

Before creating a coordinator job to run the workflow every hour, we can first try running the workflow directly to make sure that it works. If you downloaded the full project from Cloudera’s GitHub, you should be able to follow the README.txt to build the JAR file and you’ll have your workflow folder already prepared. Once you’ve done that, you can upload the workflow folder to your home directory in HDFS:
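A sketch; the workflow folder name is an assumption, and the guard lets the fragment run on machines without the Hadoop CLI:

```shell
# Assumption: the project's workflow folder is named "earthquakes".
workflowDir="earthquakes"
if command -v hadoop >/dev/null 2>&1; then   # skip if the Hadoop CLI is absent
  hadoop fs -put "$workflowDir" "$workflowDir"
fi
```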

In HDFS, you should now have the workflow folder containing the workflow.xml, our scripts, and a lib directory holding the JAR file.

Because the workflow uses a Pig action, you’ll need to make sure the sharelib is properly installed or the workflow will fail; please see my previous blog post about the Oozie Sharelib. You can then submit the workflow to Oozie:
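A sketch; the Oozie server URL and the properties file name are assumptions, and the guard lets the fragment run on machines without the Oozie CLI:

```shell
# Assumptions: a local Oozie server on the default port, and a
# job.properties file next to where the command is run.
oozieUrl="http://localhost:11000/oozie"
if command -v oozie >/dev/null 2>&1; then   # skip if the Oozie CLI is absent
  oozie job -oozie "$oozieUrl" -config job.properties -run
fi
```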

It shouldn’t take more than a few minutes to run. If there wasn’t a “large” earthquake in the last hour, it won’t run the Java or Pig actions (and should complete very quickly). You can lower the value of earthquakeMinThreshold in job.properties to compensate, or try waiting until after a large earthquake.

Creating a coordinator job to run the workflow every hour is left as an exercise for the reader.


Conclusion

In this blog post we focused mostly on the Shell, Java, and Pig actions; we also looked at the decision node. You should now be able to write your own Shell and Java actions.

The Shell and Java actions are great for times when there isn’t an appropriate action to accomplish a task in your Oozie workflow. While there are some limitations to the Shell action, its ease of use and the fact that it requires no compiled code are quite useful; plus, you can use it to execute scripts written in other languages such as Python or Perl. The Java action is also quite easy to use and has fewer caveats than the Shell action, but requires recompiling every time you change something. Ultimately, creating your own action type is the most flexible option, as you have more control over how it interacts with the outside world; plus, it’s directly reusable by other users and can even be contributed back to the Apache Oozie project. Creating your own action type is quite a bit more work, though.


Robert Kanter is a Software Engineer at Cloudera, the Apache Oozie PMC Chair, and an Apache Hadoop committer.

