How To: Use Oozie Shell and Java Actions

Apache Oozie, the workflow coordinator for Apache Hadoop, has actions for running MapReduce, Apache Hive, Apache Pig, Apache Sqoop, and DistCp jobs; it also has a Shell action and a Java action. These last two actions allow us to execute an arbitrary shell command or arbitrary Java code, respectively.

In this blog post, we’ll look at an example use case and see how to use both the Shell and Java actions in more detail. Please follow along below; you can get a copy of the full project at Cloudera’s GitHub as well. This how-to assumes some basic familiarity with Oozie.

Example Use Case

Suppose we’d like to design a workflow that determines which earthquakes from the last 30 days have a magnitude greater than or equal to that of the largest earthquake in the last hour, and we’d like to run this workflow every hour. One last requirement: to save bandwidth and time, we’d like to be able to skip downloading and processing the 30 days of earthquake data if there were no “large” earthquakes within the last hour. Because “large” is subjective, we’ll just go with 3.2 for this example, but we should make this easy to configure.

We don’t have the earthquake data ourselves, but luckily the US Geological Survey (USGS) maintains earthquake datasets in a few formats that are updated quite frequently. We’re interested in their “Past Hour” and “Past 30 Days” datasets, and we’ll use the CSV (comma-separated value) format because it’s simple. The URL for the “Past Hour” dataset is http://earthquake.usgs.gov/earthquakes/feed/csv/all/hour and the URL for the “Past 30 Days” dataset is http://earthquake.usgs.gov/earthquakes/feed/csv/2.5/month. Unfortunately, the USGS doesn’t make all earthquake data available for 30 days, only earthquakes with magnitudes greater than 2.5.

Going back to our requirements, we can create a simple Pig script to process the 30 days of earthquake data. But what about checking the last hour of data or downloading the data from the USGS website? We can use the Shell action to run a shell script to check the last hour of data and the Java action to run some Java code that handles downloading the data. We’ll go into more detail on this later, but first let’s cover a few pieces of background that we’ll find useful for this workflow.

Capturing Data from an Action

We’ll need a way to get some small pieces of data (in our case, the magnitude of the largest earthquake from the past hour) from our shell script into the workflow itself. We can tell Oozie to capture any data that the Shell action writes to STDOUT, so that it’s available in the workflow, simply by adding the <capture-output/> element. The captured data must be in Java Properties file format and must not exceed 2KB (by default). Then all we have to do is have our shell script echo the necessary information to STDOUT; we’ll also need to make sure that it doesn’t echo anything else.

While we won’t need to do this in our example, <capture-output/> can also be used to get data from the Java action. The only difference is that instead of writing to STDOUT, we have to write to a (local) file whose path is obtained from System.getProperty("oozie.action.output.properties").
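
To make that concrete, here’s a minimal sketch (not the code used in this example’s workflow; the class name and property values are just illustrative) of a Java action writing its capture-output data:

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;

public class CaptureOutputExample {
    public static void main(String[] args) throws Exception {
        // Oozie tells the Java action where to write its output properties
        File outputFile = new File(System.getProperty("oozie.action.output.properties"));
        Properties props = new Properties();
        props.setProperty("largest", "1.8");  // illustrative key=value pair
        OutputStream os = new FileOutputStream(outputFile);
        props.store(os, "");  // stored in Java Properties format, as Oozie expects
        os.close();
    }
}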

Shell Action Caveats

The Shell action has the following caveats:

  • Interactive commands are not supported.
  • In a non-secure cluster, everything is run as the user who started the TaskTracker where our shell script is running (the mapred user in CDH4); in a “Kerberized” cluster, it will run as the UNIX user of whoever submitted the workflow. This is in contrast to MapReduce-based actions, which, for the purposes of interacting with Hadoop, run as the user who submitted the workflow, although the UNIX process for the task still runs as mapred.
  • The Shell action is executed on an arbitrary node in the cluster.
  • Different operating systems may have different versions of the same shell commands.

The implications of that third caveat are very important. Oozie executes the shell action in the same way it executes any of the other actions: as a MapReduce job. In this case, it’s a 1-mapper-0-reducer job, which is why it can be executed on any node in the cluster. This means that any command or script that we want to execute has to be available on that node; because we don’t know which node the shell action will be executed on, the command or script has to be available on all nodes! This is fine for typical built-in shell commands like echo or grep, but can be more problematic for programs such as matlab, which must not only be installed but may also require a license. Instead, we’ll be putting our script in the same directory as the workflow.xml and taking advantage of the <file> tag to have Oozie copy it to the proper node for us.

Even though two operating systems, or even two different versions of the same operating system, may have the same built-in commands or programs, those commands may behave differently or accept different arguments. For example, we’ll be using the tail command later; on Mac OS 10.7.5 we can tell it to start output at the second line with the following arguments, but this won’t work properly on CentOS 6.2:

tail +2 hour.txt

 

This doesn’t mean that we can’t use the tail command though; it just means that we have to be careful to ensure that all of the machines on which our Shell action could possibly run have compatible versions of the built-in commands. For example, the following arguments work correctly on both Mac OS 10.7.5 and CentOS 6.2:

tail -n +2 hour.txt

 

That said, the script has been tested on Mac OS 10.7.5 and CentOS 6.2, but may require minor tweaking on other operating systems or versions.

Java Action Caveats

The Java action has the following caveats:

  • The Java action is executed on an arbitrary node in the cluster.
  • Calling System.exit(int n) will always make the Java action do an “error to” transition.

The Java action is also executed on an arbitrary node in the cluster, for the same reason the Shell action is; however, this is typically less problematic for the Java action because its external dependencies are JAR files that we’d already be including anyway.

It is also important that our Java code not call System.exit(int n) as this will make the Java action do an “error to” transition, even if the exit code was 0; instead, an “ok to” transition is indicated by gracefully finishing main and an “error to” transition is indicated by throwing an exception.
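
To make the two transitions concrete, here’s a tiny sketch (the class name and check are just illustrative):

public class TransitionExample {
    public static void main(String[] args) {
        if (args.length < 1) {
            // Throwing an exception triggers the "error to" transition
            throw new IllegalArgumentException("expected an output directory argument");
        }
        // ... do the real work here ...
        // Returning normally from main (without System.exit) triggers the "ok to" transition
    }
}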

Our Shell Script

We’ll use a shell script to download the “Past Hour” data, get the magnitude of the largest earthquake (in the last hour), and determine if we need to go on to process the “Past 30 Days” data. Let’s call the script check-hour.sh.

We’ll need to take in the minimum earthquake magnitude that we’re interested in as an argument (for deciding whether or not to process the “Past 30 Days” data):

earthquakeMin="$1"

 

Now we’ll need the script to download the “Past Hour” data to a file; we can use curl for this:

curl http://earthquake.usgs.gov/earthquakes/feed/csv/all/hour -o "hour.txt" -s -S -f

 

The -o argument tells curl where to download the file, the -s and -S arguments together make curl silent except for errors, and the -f argument makes curl return an error instead of outputting server error HTML pages (e.g. error 404).

The -s argument is important here because we don’t want to pollute STDOUT. The working directory, where the hour.txt file will be written to, is on the TaskTracker that’s running our script; it will be automatically removed once the action ends, so we don’t have to worry about temporary files or anything like that. Here’s what the downloaded file looks like:

DateTime,Latitude,Longitude,Depth,Magnitude,MagType,NbStations,Gap,Distance,RMS,Source,EventID,Version
2013-02-19T23:32:25.000+00:00,33.472,-116.881,15.6,1.1,Ml,,79,0.1,0.23,ci,ci11246978,1361316922097
2013-02-19T23:22:34.400+00:00,33.576,-117.598,7.7,1.8,Ml,,130,0.4,0.51,ci,ci11246970,1361316305852

 

The next step is to determine the magnitude of the largest earthquake from the file we just downloaded. We’ll use a combination of standard Linux tools for this. The first line in the file is always the header information for the columns, which we don’t actually want; we can use the tail command to keep everything except for the first line:

tail -n +2 hour.txt

 

which looks like this:

2013-02-19T23:32:25.000+00:00,33.472,-116.881,15.6,1.1,Ml,,79,0.1,0.23,ci,ci11246978,1361316922097
2013-02-19T23:22:34.400+00:00,33.576,-117.598,7.7,1.8,Ml,,130,0.4,0.51,ci,ci11246970,1361316305852

 

Going by the headers we just removed, the fifth column is the magnitude. We can get just the fifth column by using the cut command:

tail -n +2 hour.txt | cut -f 5 -d ","

 

The -f 5 argument tells cut to take the fifth column, and the -d "," argument tells it to use a comma as the column separator. This gives us:

1.1
1.8

 

We can now use the sort command to put the magnitudes in sorted order. In this case, they are already sorted, but that’s not always guaranteed, especially if there happen to be more than two earthquakes in the past hour.

tail -n +2 hour.txt | cut -f 5 -d "," | sort -n

 

We can then use the tail command again to get just the last magnitude, which must be the largest because we just sorted them:

tail -n +2 hour.txt | cut -f 5 -d "," | sort -n | tail -n -1

 

Finally, we get just this:

1.8

 

We now need to compare the largest magnitude with earthquakeMin to determine whether or not to process the “Past 30 Days” data. Bash can’t compare floating-point numbers on its own, so we’ll use the bc command instead:

compare=`echo $largest '>=' $earthquakeMin | bc -l`

 

where largest is the variable where we stored the largest magnitude from the “Past Hour” data. We can then check the value of compare and echo to STDOUT accordingly:

  if [ "$compare" == "1" ]
  then
      echo "largest=$largest"
      echo "isLarger=true"
  else
      echo "isLarger=false"
  fi

 

Here we’re echoing, in Java Properties file format, whether or not largest is greater than or equal to earthquakeMin and if so, what largest is. We’ll see how these values can be picked up by the workflow later.

One last thing that we should take care of in our shell script: what happens if there were no earthquakes in the past hour? 

(standard_in) 1: parse error

 

Because largest ends up empty, bc has nothing valid to parse and our script will give the above error message (which would cause Oozie to FAIL the workflow). We can prevent this from happening by adding a simple check for this edge case:

  numLines=`cat hour.txt | wc -l`
  if [ $numLines -lt 2 ]
  then
      echo "isLarger=false"
  else
  ...

 

This uses the cat command to send the contents of hour.txt to the wc command; the -l argument tells wc to count the number of lines. We know that we need at least two lines (the header plus one line of data) to avoid the error message, so we check whether numLines is less than 2 and, if so, echo the property from before as false instead of continuing with the script.

Our final script (with some additional minor details) looks like this:

#!/bin/bash -e

earthquakeMin="$1"

# Download the "Past Hour" data; -s keeps STDOUT clean for <capture-output/>
curl http://earthquake.usgs.gov/earthquakes/feed/csv/all/hour -o "hour.txt" -s -S -f

# We need at least the header line plus one data line to have any earthquakes to check
numLines=`cat hour.txt | wc -l`
if [ $numLines -lt 2 ]
then
    echo "isLarger=false"
else
    # Largest magnitude: drop the header, take column 5, sort numerically, keep the last value
    largest=`tail -n +2 hour.txt | cut -f 5 -d "," | sort -n | tail -n -1`
    compare=`echo $largest '>=' $earthquakeMin | bc -l`
    if [ "$compare" == "1" ]
    then
        echo "largest=$largest"
        echo "isLarger=true"
    else
        echo "isLarger=false"
    fi
fi

 

Note that we set -e in the shebang line to make sure that the script exits with an error if any of its commands fails, instead of continuing. If the script exits with a non-zero exit code, the Shell action will follow the “error to” transition instead of the “ok to” transition.

Making a Decision

Once our workflow finishes running our shell script, we need to make a decision. We can use the aptly named Decision Control Node for this. Decision nodes are useful when we want our workflow to transition to a different node based on some criteria; they’re essentially the same as the switch statement found in many programming languages. Here’s what ours will look like:

<decision name="decide">
    <switch>
        <case to="get-data">
            ${wf:actionData('shell-check-hour')['isLarger']}
        </case>
        <default to="end"/>
    </switch>
</decision>

 

Earlier, our shell script echoed either isLarger=true or isLarger=false to indicate whether or not we wanted the workflow to process the “Past 30 Days” data. We can use the wf:actionData EL function to retrieve this key=value pair from our Shell action anywhere in the workflow. This EL function’s argument is the name of the action (which we’ll be calling “shell-check-hour” later); it gives us a map of all of the key=value pairs that Oozie captured from that action. So here we’re checking whether isLarger is true. If so, Oozie will transition the workflow to the “get-data” action; otherwise, the “default to” transition will be followed, which goes to “end” to finish the workflow, skipping the processing of the “Past 30 Days” data.

Downloading the Data to HDFS

Before we can run a Pig job on the “Past 30 Days” data, we need to get it into HDFS. We could use the shell action again with curl and the Hadoop CLI, but there are some issues with this approach, including the second caveat with the Shell action that we saw earlier about which user is executing the script. We’ll instead use the Java action, which, as we mentioned earlier, doesn’t have this limitation.

The Java action is particularly powerful because it allows us to execute any Java code we want. Here’s what our Java main function looks like:

public static void main(String[] args) throws IOException {
      URL url = new URL("http://earthquake.usgs.gov/earthquakes/feed/csv/2.5/month");
      HttpURLConnection conn = (HttpURLConnection)url.openConnection();
      conn.connect();
      InputStream connStream = conn.getInputStream();

      FileSystem hdfs = FileSystem.get(new Configuration());
      FSDataOutputStream outStream = hdfs.create(new Path(args[0], "month.txt"));
      IOUtils.copy(connStream, outStream);

      outStream.close();
      connStream.close();
      conn.disconnect();
}

 

The first four lines open a connection to the USGS website where the dataset is located and get an InputStream. We then get the HDFS FileSystem and create an FSDataOutputStream, which we can use to write a new file, month.txt. We can then use Apache Commons IOUtils to simplify writing the input stream from the connection to the output stream writing to month.txt. And finally, we close the two streams and disconnect the connection.

The folder that we want to put month.txt in is passed in as an argument (i.e. args[0]) to our Java program; we’ll see how to do this in the workflow later. Note that we’ll need to make sure this location is also set as the input of our Pig job, which we’ll look at next.

Processing the Data

The final step is to process the data that we downloaded to HDFS in the previous section. The processing that we want to do is to filter out any earthquakes whose magnitude is less than that of the largest earthquake we found in the “Past Hour” data earlier. As before, we can get this magnitude in the workflow with this EL function call:

${wf:actionData('shell-check-hour')['largest']}

 

We’ll see how to pass this to the Pig script from our workflow later. For now, let’s just assume we have a variable in our Pig script named $MINMAG. We’ll also need to have variables for the input file (the “Past 30 Days” data we downloaded to HDFS) and the output directory: $INPUT and $OUTPUT respectively.

First we need to LOAD the data to form a relation:

A = LOAD '$INPUT' USING PigStorage(',') AS (a1,a2,a3,a4,a5:float,a6,a7,a8,a9,a10,a11,a12,a13);

 

The USING PigStorage(',') clause tells Pig to parse the input data as comma-delimited instead of the default tab-delimited. The AS (a1, …) clause tells it the schema that we want to use. When we looked at the header from the input file, we saw that the magnitude is the fifth column of a data row, which is why we declare a5 as a float; we won’t be using the rest of the columns, so we don’t need to specify data types for them.

Now that we have the data loaded into Pig, we can simply use the FILTER operator:

B = FILTER A BY (a5 >= $MINMAG);

 

The expression (a5 >= $MINMAG) is telling Pig to filter out any data row whose a5 (i.e. the magnitude of the earthquake) is less than $MINMAG (i.e. the largest earthquake in the last hour). The resulting rows that pass the filter are put into relation B.

What happens to the first row of our data, the one with the header information? Its fifth column is obviously not a float.

(DateTime,Latitude,Longitude,Depth,,MagType,NbStations,Gap,Distance,RMS,Source,EventID,Version)

 

Note that the fifth column is null (displayed as empty) instead of “Magnitude” because Pig couldn’t cast that string to a float. Conveniently, a comparison against null is never true, so the header row won’t pass the filter and won’t be a problem.

Now that we’ve filtered the data, we need to output it back to HDFS. We can do that with the STORE operator:

STORE B INTO '$OUTPUT' USING PigStorage(',');

 

As before with the LOAD operator, we specify USING PigStorage(',') to keep the data in a comma-delimited format.

The end result is that the output from this Pig job has all of the data rows from the last 30 days whose magnitude is greater than or equal to that of the largest earthquake from the last hour. Here’s our final Pig script:

A = LOAD '$INPUT' USING PigStorage(',') AS (a1,a2,a3,a4,a5:float,a6,a7,a8,a9,a10,a11,a12,a13);
B = FILTER A BY (a5 >= $MINMAG);
STORE B INTO '$OUTPUT' USING PigStorage(',');

 

Our Workflow

We’re almost done: let’s take a quick look at the 3 actions in our workflow. First, we have our shell action:

<action name="shell-check-hour">
    <shell xmlns="uri:oozie:shell-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <exec>check-hour.sh</exec>
        <argument>${earthquakeMinThreshold}</argument>
        <file>check-hour.sh</file>
        <capture-output/>
    </shell>
    <ok to="decide"/>
    <error to="fail"/>
</action>

 

The <exec> element tells Oozie what shell command to execute; this can be a script (like our check-hour.sh script) or an existing command such as echo or grep. We then use the <argument> element to pass the minimum earthquake magnitude that we want to consider to our script; here, we’re using ${earthquakeMinThreshold}, which we’ll define in our job.properties file. (You can have multiple <argument> elements for passing multiple arguments.) The <file> element tells Oozie to add our script to the distributed cache, making it available on whichever node the Shell action gets executed on. Alternatively, we could place our script on every node in the cluster, but that is a lot more work and would require synchronizing updates to the script. We mentioned the <capture-output/> element earlier: it’s what tells Oozie to capture the shell command’s STDOUT so we can use it later in our workflow.

Our shell action transitions to the decision node, which we looked at earlier. After the decision node, we have our Java action to download the “Past 30 Days” data to HDFS:

<action name="get-data">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${dataInputDir}"/>
            <mkdir path="${dataInputDir}"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>com.cloudera.earthquake.GetData</main-class>
        <arg>${dataInputDir}</arg>
    </java>
    <ok to="filter-data"/>
    <error to="fail"/>
</action>

 

We use the <main-class> element to set the class name of our Java code. The JAR file containing the class will need to be made available to the action; in my previous blog post about the Oozie Sharelib, we saw a few ways to include additional JARs with our workflow. For simplicity, we’ll just put the JAR file in the lib folder next to our workflow.xml so Oozie will pick it up automatically. The <arg> element is essentially the same as the <argument> element we saw in the Shell action. Here we pass ${dataInputDir} as the HDFS path where we want to store the downloaded data; it would be defined in our job.properties. In the <prepare> section, we delete ${dataInputDir} and then recreate it to make sure that it exists (our Java code would throw an exception otherwise) and that it's clean before we download the data to it.

Lastly, we have our Pig action to process the data:

<action name="filter-data">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${outputDir}"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>filter-data.pig</script>
        <param>INPUT=${dataInputDir}</param>
        <param>OUTPUT=${outputDir}</param>
        <param>MINMAG=${wf:actionData('shell-check-hour')['largest']}</param>
    </pig>
    <ok to="end"/>
    <error to="fail"/>
</action>

 

The <script> element tells the Pig action the name of our Pig script, which we can put next to our workflow.xml. The <param> elements are then used to pass values for the variables in the Pig script, such as the INPUT, OUTPUT, and MINMAG ones we used earlier. For INPUT, we use ${dataInputDir}; note that this is where we told the Java action to download the data to. For OUTPUT, we use ${outputDir}, which we can define in our job.properties as the place to output the final data. We’re also setting MINMAG to the largest value captured from our shell script, as previously discussed.
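
For reference, a minimal job.properties for this workflow might look something like the following sketch; the host names, ports, and paths are placeholders, and the actual file in the GitHub project may differ:

nameNode=hdfs://namenode.example.com:8020
jobTracker=jobtracker.example.com:8021
queueName=default
earthquakeMinThreshold=3.2
dataInputDir=${nameNode}/user/${user.name}/earthquake-input
outputDir=${nameNode}/user/${user.name}/earthquake-output
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/Earthquake-WF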

Running the Workflow

Before creating a coordinator job to run the workflow every hour, we can first try running the workflow directly to make sure that it works. If you downloaded the full project from Cloudera’s GitHub, you should be able to follow the README.txt to build the JAR file and you’ll have your workflow folder already prepared. Once you’ve done that, you can upload the workflow folder to your home directory in HDFS:

hadoop fs -put Earthquake-WF Earthquake-WF

 

In HDFS, you should now have something like this:

$ hadoop fs -ls -R Earthquake-WF
-rw-r--r--   1 rkanter supergroup       1278 2013-02-25 15:54 Earthquake-WF/check-hour.sh
-rw-r--r--   1 rkanter supergroup        987 2013-02-25 15:54 Earthquake-WF/filter-data.pig
-rw-r--r--   1 rkanter supergroup       1155 2013-02-25 15:54 Earthquake-WF/job.properties
drwxr-xr-x   - rkanter supergroup          0 2013-02-25 15:54 Earthquake-WF/lib
-rw-r--r--   1 rkanter supergroup       3433 2013-02-25 15:54 Earthquake-WF/lib/Earthquake-J-1.0-SNAPSHOT.jar
-rw-r--r--   1 rkanter supergroup       3406 2013-02-25 15:54 Earthquake-WF/workflow.xml

 

Because the workflow uses a Pig action, you’ll need to make sure the sharelib is properly installed or the workflow will fail; please see my previous blog post about the Oozie Sharelib. You can then submit the workflow to Oozie:

oozie job -config path/to/job.properties -run -oozie http://some.hostname:11000/oozie

 

It shouldn’t take more than a few minutes to run. If there wasn’t a “large” earthquake in the last hour, it won’t run the Java or Pig actions (and should complete very quickly). You can lower the value of earthquakeMinThreshold in job.properties to compensate or try waiting until after a large earthquake.
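
While it runs, you can check on the workflow and each of its actions with the Oozie CLI’s -info command, using the job ID returned when you submitted it:

oozie job -info <job-id> -oozie http://some.hostname:11000/oozie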

Creating a coordinator job to run the workflow every hour is left as an exercise for the reader.
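
If you’d like a starting point, a minimal hourly coordinator might look something like this sketch (the name, start/end times, and application path are placeholders):

<coordinator-app name="earthquake-coord" frequency="${coord:hours(1)}"
                 start="2013-03-01T00:00Z" end="2014-03-01T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${nameNode}/user/${user.name}/Earthquake-WF</app-path>
        </workflow>
    </action>
</coordinator-app>

You would submit it with a job.properties that sets oozie.coord.application.path (instead of oozie.wf.application.path) to the directory containing the coordinator.xml.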

Conclusion

In this blog post we focused mostly on the Shell, Java, and Pig actions; we also looked at the decision node. You should now be able to write your own Shell and Java actions.

The Shell and Java actions are great for times when there isn’t an appropriate action to accomplish a task in your Oozie workflow. While the Shell action has some limitations, its ease of use and the fact that it requires no code to be compiled make it quite handy; plus, you can use it to execute scripts in other languages such as Python or Perl. The Java action is also quite easy to use and has fewer caveats than the Shell action, but it requires recompiling every time you change something. Ultimately, creating your own action type is the most flexible option because you have more control over how it interacts with the outside world; plus, it’s directly reusable by other users and can even be contributed back to the Apache Oozie project. Creating your own action type is quite a bit more work, though.

Robert Kanter is a Software Engineer on the Platform team and an Apache Oozie Committer.
