How-To: Schedule Recurring Hadoop Jobs with Apache Oozie

Our thanks to guest author Jon Natkins (@nattyice) of WibiData for the following post!

Today, many (if not most) companies have ETL or data enrichment jobs that are executed on a regular basis as data becomes available. In this scenario it is important to minimize the lag time between data being created and being ready for analysis.

CDH, Cloudera’s open-source distribution of Apache Hadoop and related projects, includes a framework called Apache Oozie that can be used to design complex job workflows and coordinate them to occur at regular intervals. In this how-to, you’ll review a simple Oozie coordinator job, and learn how to schedule a recurring job in Hadoop. The example involves adding new data to a Hive table every hour, using Oozie to schedule the execution of recurring Hive scripts. (For the full context of the example, see the “Analyzing Twitter Data with Apache Hadoop” series.)

Adding Data to Hive Tables

In Apache Hive, tables may be partitioned to make it easier to manage a rolling window of data, or to improve the performance of queries that filter on the partition column. (To learn more about Hive, and how partitions work, see “Querying Semi-structured Data with Apache Hive”.) For the purposes of this article, we’ll work with a table, introduced in a previous blog post, that stores tweets and is populated by a data stream that Flume ingests into HDFS:
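A minimal table definition along these lines might look like the following sketch (the column list, the partition column name datehour, and the SerDe class name are illustrative):

  -- Illustrative sketch: column list and SerDe class are assumptions
  CREATE EXTERNAL TABLE tweets (
    id BIGINT,
    created_at STRING,
    text STRING
  )
  PARTITIONED BY (datehour INT)
  ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
  LOCATION '/user/flume/tweets';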

 

The important things to note here are that the root of the table’s data directory is located at /user/flume/tweets, and that the table has a partition column, which uses an integer to represent the date and hour of the partition. We’ve chosen to mimic the Java date format ‘yyyyMMddHH’. For example, the partition containing data for 2013-01-04 09:00 UTC would be identified by the partition value of 2013010409.

For this example, we will design an Oozie coordinator job that will add a new partition to the table each hour.

Writing the Hive Queries

In order to add a partition to a Hive table, you’ll need a file containing the queries that need to be executed. In Hive, partitions are created through an ALTER TABLE statement:
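A sketch of such a script follows (the file name add-partition.q is an assumption, and the table and column names match the illustrative table definition above):

  -- add-partition.q (name assumed): register the SerDe JAR, then add the hourly partition
  ADD JAR ${JSON_SERDE};

  ALTER TABLE tweets
    ADD IF NOT EXISTS PARTITION (datehour = ${DATEHOUR})
    LOCATION '${PARTITION_PATH}';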

 

The commands are fairly straightforward. The table we created uses a custom SerDe, which Hive needs when performing the add-partition command. To ensure that Hive can access the SerDe class for the table, the script first adds the JAR file and then adds the partition. Note that several of the values in the query are variables: ${JSON_SERDE}, ${DATEHOUR}, and ${PARTITION_PATH} will be replaced by Oozie with their actual values at execution time.

Designing the Workflow

In order to run a recurring job, you’ll need an Oozie workflow to execute. Oozie workflows are written as XML files, each representing a directed acyclic graph (DAG).

Let’s look at a simple workflow example, and dive into what’s actually going on. The following file is saved as add-partition-hive-action.xml, in a directory which is referenced as workflowRoot:
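Here is a sketch of what add-partition-hive-action.xml might contain, matching the pieces discussed below (the script name, the SerDe JAR name, and the hive-site.xml name are assumptions):

  <!-- Sketch of add-partition-hive-action.xml; file names inside are assumptions -->
  <workflow-app name="add-tweet-partition" xmlns="uri:oozie:workflow:0.2">

    <start to="add-partition"/>

    <action name="add-partition">
      <hive xmlns="uri:oozie:hive-action:0.4">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <job-xml>hive-site.xml</job-xml>
        <configuration>
          <property>
            <name>oozie.hive.defaults</name>
            <value>hive-site.xml</value>
          </property>
        </configuration>
        <script>add-partition.q</script>
        <param>JSON_SERDE=${workflowRoot}/lib/hive-serdes-1.0.jar</param>
        <param>DATEHOUR=${dateHour}</param>
        <param>PARTITION_PATH=${workflowInput}</param>
      </hive>
      <ok to="end"/>
      <error to="fail"/>
    </action>

    <kill name="fail">
      <message>Hive action failed: [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end"/>

  </workflow-app>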

 

The graph is made up of a series of control-flow nodes and Oozie actions. The control-flow nodes are XML entities like <start>, <kill>, and <end>, which identify the beginning, error, and final states of the workflow. There are also several other, more complex control-flow nodes called <decision>, <fork>, and <join>; more information on those nodes can be found in the Oozie documentation. The workflow above is deliberately simple: visualized as a DAG, it is just a start node leading into a single Hive action, which transitions to the end node on success and to the kill node on error.

This workflow executes only a single action. An action in Oozie is a discrete computation or processing task. Actions can do a number of different things: execute a MapReduce job, run a Java main class, or run a Hive or Pig script, to name a few possibilities. In this workflow, we identify the action as a Hive action with this node: <hive xmlns="uri:oozie:hive-action:0.4">. As an aside, in order to use Hive actions, the Oozie Share Lib must be installed.

Looking more closely at the Hive action, we can see how its functionality is defined. The <job-tracker> and <name-node> entities dictate the servers that the Hive job will connect to for executing its script:
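In the sketch above, these are simply parameterized with variables that will be supplied at submission time:

  <job-tracker>${jobTracker}</job-tracker>
  <name-node>${nameNode}</name-node>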

 

You’ll see where the variable values ${jobTracker} and ${nameNode} come from a little later.

The <job-xml> entity is used to specify a configuration file for Hive. Since most Hive installations will need to connect to a non-default metastore database, this configuration file will likely contain database connection credentials. You may also notice that the <configuration> section of the action defines a parameter called oozie.hive.defaults, which points to the same file as <job-xml>:
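In the sketch above, both point at the same file (the hive-site.xml name is an assumption):

  <job-xml>hive-site.xml</job-xml>
  <configuration>
    <property>
      <name>oozie.hive.defaults</name>
      <value>hive-site.xml</value>
    </property>
  </configuration>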

 

This approach is necessary because in the current version of Oozie (CDH4.1.2), Oozie Hive actions require a Hive defaults file. The file can be provided through the workflow configuration or, in certain versions of Hive, by the Hive JAR itself. However, the CDH4.1.2 version of Hive no longer packages or uses that file, so you must include it in the workflow even though it has no effect on the action. (For more information, see OOZIE-1087.) In CDH4.2, it will no longer be necessary to include the oozie.hive.defaults configuration parameter.

The most interesting pieces in this action are the <script> and <param> entities:
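From the workflow sketch above (the SerDe JAR name is an assumption):

  <script>add-partition.q</script>
  <param>JSON_SERDE=${workflowRoot}/lib/hive-serdes-1.0.jar</param>
  <param>DATEHOUR=${dateHour}</param>
  <param>PARTITION_PATH=${workflowInput}</param>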

 

The <script> entity specifies the location of the Hive script in HDFS, and the <param> entities contain declarations of variables that will be passed into the Hive script. You can see here that the parameters being passed in match the names of the variables from the Hive script. Again, for reference, the JSON_SERDE parameter specifies the HDFS path to a JAR file containing the custom SerDe class.

Scheduling Recurring Workflows

Oozie has another type of job called a coordinator application. Coordinator applications allow users to schedule more complex workflows, including workflows that are scheduled regularly, or that have dependencies on the output of other workflows. For this application, which is stored on an HDFS cluster in a file named add-partition-coord-app.xml, the add-partition workflow is executed on an hourly basis:
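Here is a sketch of add-partition-coord-app.xml that is consistent with the discussion that follows (the initialDataset and tzOffset variables, the timezone values, and the overall layout are assumptions):

  <!-- Sketch of add-partition-coord-app.xml; variable names and timezones are assumptions -->
  <coordinator-app name="add-tweet-partitions" frequency="${coord:hours(1)}"
                   start="${jobStart}" end="${jobEnd}" timezone="America/Los_Angeles"
                   xmlns="uri:oozie:coordinator:0.1">

    <datasets>
      <dataset name="tweets" frequency="${coord:hours(1)}"
               initial-instance="${initialDataset}" timezone="America/Los_Angeles">
        <uri-template>hdfs://hadoop1:8020/user/flume/tweets/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
        <done-flag></done-flag>
      </dataset>
    </datasets>

    <input-events>
      <data-in name="tweetInput" dataset="tweets">
        <instance>${coord:current(coord:tzOffset() / 60)}</instance>
      </data-in>
      <data-in name="readyIndicator" dataset="tweets">
        <instance>${coord:current(1 + (coord:tzOffset() / 60))}</instance>
      </data-in>
    </input-events>

    <action>
      <workflow>
        <app-path>${workflowRoot}/add-partition-hive-action.xml</app-path>
        <configuration>
          <property>
            <name>workflowInput</name>
            <value>${coord:dataIn('tweetInput')}</value>
          </property>
          <property>
            <name>dateHour</name>
            <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), tzOffset, 'HOUR'), 'yyyyMMddHH')}</value>
          </property>
        </configuration>
      </workflow>
    </action>

  </coordinator-app>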

 

Note in the top-level entity that the app is configured to run once an hour using the coord:hours(1) method. Oozie includes a number of different methods for specifying frequency intervals, which can be found in the documentation. You also have to specify a start time and end time for the job, which are represented by the jobStart and jobEnd variables. You’ll see where those values come from a little later on.

The second major component of the coordinator app is the datasets entity. A dataset specifies the location of a set of input data. In this case, there is a dataset called tweets, which is updated every hour, as specified by its frequency. Each execution of the Hive workflow corresponds to a separate instance of the tweets dataset, starting with the initial instance specified in the dataset definition. A particular instance of a dataset is identified by its creation time in ISO-8601 format, such as 2013-01-04T09:00Z. In this case, the location of the data is hdfs://hadoop1:8020/user/flume/tweets/${YEAR}/${MONTH}/${DAY}/${HOUR}.

YEAR, MONTH, DAY, and HOUR are special variables that can be used to parameterize the URI template for the dataset. The done flag specifies a file whose presence indicates that the dataset is done being generated. The default value is _SUCCESS, which is the name of the file that a MapReduce job creates when it completes. For this application, however, data is ingested into HDFS via Flume, which does not generate a _SUCCESS file. Instead, the done flag is left empty, which instructs Oozie to consider the dataset ready as soon as its directory exists. For this application, Flume is configured to write events to a different directory each hour.

Having a completed dataset is one of the criteria for executing an instance of the Hive workflow. The other requirement is for any input events to be satisfied. Currently, Oozie is restricted to input events in the form of available datasets. This means that an input event will not be satisfied until a particular instance of a dataset exists. In the context of a more complex data pipeline, this means that a job can be configured to execute only after all of its input data has been successfully generated. For this coordinator app, the input events look like this:
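From the coordinator sketch above:

  <input-events>
    <data-in name="tweetInput" dataset="tweets">
      <instance>${coord:current(coord:tzOffset() / 60)}</instance>
    </data-in>
    <data-in name="readyIndicator" dataset="tweets">
      <instance>${coord:current(1 + (coord:tzOffset() / 60))}</instance>
    </data-in>
  </input-events>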

 

There are a number of things going on here: we have two distinct input events, and each of them is specified by an Oozie built-in function. Let’s break down the built-in function:
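Taking the tweetInput instance as the example:

  ${coord:current(coord:tzOffset() / 60)}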

 

coord:tzOffset() is a function that returns the offset, in minutes, from UTC of the timezone of the machine executing the workflow. To get the number of hours offset from UTC, we divide by 60.

coord:current(n) will return the timestamp representing the nth instance for a dataset. For example, if the time were 2013-01-17 at midnight UTC, then coord:current(0) would return 2013-01-17T00:00Z. For the tweets dataset, which has an hourly frequency, coord:current(1) would evaluate to 2013-01-17T01:00Z. However, if the tweets dataset had a frequency of coord:days(1), coord:current(1) would evaluate to 2013-01-18T00:00Z.

Putting these two pieces together, you can see that the function used to determine the tweetInput input event is going to evaluate to the current dataset instance for your machine’s local timezone. By the same logic, the readyIndicator input event refers to the dataset instance immediately after the current one, again in your local timezone.

In order to understand why the readyIndicator event exists, let’s revisit the done flag for the tweets dataset. Since Flume does not create an indicator file when it’s finished writing to a particular directory, we left the flag empty, specifying that an instance of the tweets dataset exists as soon as its directory exists. Because Flume immediately begins writing to a new directory each hour, this means that the current instance of the tweets dataset is not really ready for processing until Flume has started writing the next hour’s directory. The readyIndicator event captures exactly that condition: it is satisfied once Flume has started writing the next instance of the tweets dataset, which guarantees that the current instance is complete.

The final section of the coordinator app is the action, which contains the details of what will be executed:
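From the coordinator sketch above:

  <action>
    <workflow>
      <app-path>${workflowRoot}/add-partition-hive-action.xml</app-path>
      <configuration>
        <property>
          <name>workflowInput</name>
          <value>${coord:dataIn('tweetInput')}</value>
        </property>
        <property>
          <name>dateHour</name>
          <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), tzOffset, 'HOUR'), 'yyyyMMddHH')}</value>
        </property>
      </configuration>
    </workflow>
  </action>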

 

The action specifies a workflow that will be executed, as well as any configuration properties that the workflow will need in order to execute. For this workflow, we’ve specified workflowInput and dateHour. The function used to generate workflowInput, coord:dataIn('tweetInput'), will evaluate to the URI of the current tweetInput instance. To illustrate this, let’s consider an example:

Say the time is 2013-01-04 9:00 AM Pacific Time, which is 2013-01-04T17:00Z in UTC, so the nominal time of the current coordinator action is 2013-01-04T17:00Z. Since Pacific Time is offset by -8 hours from UTC, the current tweetInput instance resolves to the instance eight hours earlier: 2013-01-04T09:00Z. The tweetInput event is linked to the tweets dataset, so the HDFS URI returned by the coord:dataIn function is hdfs://hadoop1:8020/user/flume/tweets/2013/01/04/09.

The dateHour parameter will be used to generate the partition that we will be adding. In the Hive table, the partition is an integer representing the year, month, day, and hour in the form ‘yyyyMMddHH’. To generate the appropriate value, we use another function: coord:formatTime(coord:dateOffset(coord:nominalTime(), tzOffset, 'HOUR'), 'yyyyMMddHH').

Considering the same example as above, the coord:nominalTime() function will return the time for which the coordinator action was created. Since the coordinator app has a frequency of one hour, this time will be somewhere between 9:00 AM and 9:59 AM Pacific Time on 2013-01-04 (that is, between 2013-01-04T17:00Z and 2013-01-04T17:59Z), depending on when the coordinator app was started. The coord:dateOffset function will take that nominal time and offset it by our timezone offset of -8 hours, so we’ll end up with a timestamp of the form 2013-01-04T09:XXZ, where XX is somewhere between 00 and 59. Finally, the coord:formatTime function will translate the timestamp to the string 2013010409, which Hive will be able to convert to the integer partition value.

When all is said and done, this particular coordinator action would run these Hive commands:
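Based on the sketches above, the generated script would look roughly like this (<user> stands in for the value of user.name, and the JAR name is the one assumed earlier):

  -- Values shown are for the 2013-01-04T09:00Z instance of the example
  ADD JAR hdfs://hadoop1:8020/user/<user>/add-tweet-partitions/lib/hive-serdes-1.0.jar;

  ALTER TABLE tweets
    ADD IF NOT EXISTS PARTITION (datehour = 2013010409)
    LOCATION 'hdfs://hadoop1:8020/user/flume/tweets/2013/01/04/09';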

 

You still need one last piece to fill in the remaining variables, such as jobStart, jobEnd, and workflowRoot. That piece is the job.properties file:
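Here is a sketch of job.properties that is consistent with the variables used above (host names, ports, and dates are illustrative, and supplying tzOffset here is an assumption):

  # Illustrative values; adjust host names, ports, and dates for your cluster
  nameNode=hdfs://hadoop1:8020
  jobTracker=hadoop1:8021
  workflowRoot=${nameNode}/user/${user.name}/add-tweet-partitions
  jobStart=2013-01-04T08:00Z
  jobEnd=2013-12-31T00:00Z
  initialDataset=2013-01-04T08:00Z
  tzOffset=-8
  oozie.use.system.libpath=true
  oozie.coord.application.path=${nameNode}/user/${user.name}/add-tweet-partitions/add-partition-coord-app.xml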

 

You can see here where the remaining variables are set. One variable of note, user.name, is actually an Oozie built-in variable that evaluates to the name of the user running the coordinator app. A directory must exist in HDFS containing all the files and external JARs needed to run the application; in this app, workflowRoot points to that directory.

Running the Coordinator Application

In order to run the coordinator app, you first have to stage the necessary files in HDFS, in the locations referenced by the XML files. In this example, all the files are located in the same directory: /user/${user.name}/add-tweet-partitions. That directory also needs a lib subdirectory, which makes dependency JARs available to the workflow. For example, Hive actions usually require a JDBC driver in order to communicate with a non-Derby metastore, and for the example in this article, the lib directory also contains the custom SerDe JAR file.
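One way to stage the files, using the file names assumed in the sketches above (the JDBC driver JAR is also illustrative):

  # Relative paths resolve to /user/<current user>, matching /user/${user.name}/add-tweet-partitions
  $ hadoop fs -mkdir add-tweet-partitions
  $ hadoop fs -mkdir add-tweet-partitions/lib
  $ hadoop fs -put add-partition-coord-app.xml add-partition-hive-action.xml add-partition.q hive-site.xml add-tweet-partitions/
  $ hadoop fs -put hive-serdes-1.0.jar mysql-connector-java.jar add-tweet-partitions/lib/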

 

With the files staged in HDFS, store the job.properties file locally and run the following command:
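For example (the Oozie server URL is an assumption):

  $ oozie job -oozie http://localhost:11000/oozie -config job.properties -run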

 

That command will return the ID of the job, which can be used to get status updates, either via the web UI or the command-line tool.

Once executed, the coordinator app will create and run instances of the workflow each hour, and data will automatically become queryable through Hive as the partitions get created.

Conclusion

Oozie is a versatile system that can be used to set up and automate even the most complicated of data processing workflows. Also, check out the recently added workflow and coordinator app creation UI in Hue, which makes it much easier to design and execute complicated data flows.

This how-to is certainly not comprehensive, and there are lots of other features in Oozie that haven’t been discussed. I encourage you to go and try to build your own Oozie workflows and coordinator apps, and explore some different features.

Jon Natkins is a field engineer at WibiData. Formerly, he worked as a software engineer at Cloudera.

