How-to: Integrate Cloudera Director with a Data Pipeline in the Cloud

Learn how to use Cloudera Director to automate cluster operations (and more) in the cloud.

Cloudera Director was designed from the beginning to be primarily an API that can integrate with your existing data pipelines and workflows to handle tasks like creating, terminating, and resizing the Apache Hadoop (CDH) clusters used to run your data processing jobs or SQL queries.

Among many other new features, Cloudera Director 2.0 includes a new API endpoint that you can use to import cluster-definition files (called client config files in Director parlance). This capability is interesting because it lets you automate the task of creating and terminating clusters with all sorts of topologies, including as part of job-submission workflows, and with minimal dependencies. You can automate the entire process using shell scripts with common tools like wget and jq. You can even use cron or Jenkins for scheduling.

In this post, you’ll learn how to use Cloudera Director to construct a data pipeline that manages the cluster lifecycle.

Data Pipelines in the Cloud

A data pipeline is a multi-step ingest-and-transform process. On cloud infrastructure, a key component of a data pipeline is an object store: Data originating from your web tier or various other application servers gets uploaded to an object store, and later on, downstream orchestration systems schedule processing jobs that will transform it based on your business needs. The end results are either derived datasets, reports for end-users, or both.

For efficiency reasons, data pipelines in the cloud should provision storage and compute resources only as needed to satisfy immediate demands (that is, in an elastic manner). Such resources include the clusters used by your workflow orchestration system to run the jobs or queries that will transform your raw data. By making cloud elasticity work for you, you can significantly increase the efficiency of your infrastructure from a cost perspective.

Next, let’s move on to the details.

Building a Simple Pipeline

Let’s start simple: The task at hand is to write a shell script that can be executed on a daily basis to provision a cluster, run a MapReduce job, and then terminate the cluster.

Before we get into the fine details, you will need to get Cloudera Director up and running. The fastest way to do that is by using AWS Quick Start (click on “Cloudera Enterprise Data Hub”). The CloudFormation template from AWS Quick Start can create a brand new VPC, install Cloudera Director, and handle all the initial configuration. All you have to do is SSH into the cluster launcher and configure a tunnel for port 7189. (See the AWS Quick Start documentation.) If you have a customized AWS setup, or if you are using Google Cloud Platform, check out the installation instructions (for AWS and GCP respectively).

With Cloudera Director up and running, the next step is to create a valid client configuration file. AWS Quick Start does that for you via the CloudFormation template (see ~/cloudera/setup-default/aws.simple.conf). If you are starting from scratch, check out our sample config files at director-scripts/configs and edit them as needed to match your environment.

Now that you have a valid client configuration file, you can use it to create a cluster for your job. In the next few paragraphs, I will go through this example script and explain the relevant sections.

Creating a Cluster

Step 1 is to send the config file to Cloudera Director.

${WGET} --header="Content-Type: text/plain" --post-file "${CONFIG_FILE}" "${DIRECTOR_SERVER}/api/v4/import?clusterName=${CLUSTER_NAME}&deploymentName=${DEPLOYMENT_NAME}&environmentName=${ENVIRONMENT_NAME}"

That will immediately trigger the cluster creation process and override the names used for the cluster, deployment, and environment. If any entity in this hierarchy already exists, the import process will skip it.
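The snippets in this section rely on a handful of variables that the script has to define before the first call. A minimal sketch of that setup might look like the following. Every default value, the cluster_url helper, and the resource path layout under /api/v4/environments are assumptions for illustration and not quotes from the example script, and authentication options are left out entirely.

```shell
# Hypothetical defaults; override them from the environment as needed.
DIRECTOR_SERVER="${DIRECTOR_SERVER:-http://localhost:7189}"   # the tunneled port
ENVIRONMENT_NAME="${ENVIRONMENT_NAME:-pipeline-env}"
DEPLOYMENT_NAME="${DEPLOYMENT_NAME:-pipeline-cm}"
CLUSTER_NAME="${CLUSTER_NAME:-daily-$(date +%Y%m%d)}"
CONFIG_FILE="${CONFIG_FILE:-${HOME}/cloudera/setup-default/aws.simple.conf}"

# Write response bodies to stdout so they can be piped into jq.
# Login or cookie handling is omitted here and is an exercise for your setup.
WGET="wget -O -"

# Assumed URL layout: environment -> deployment -> cluster.
cluster_url() {
    local server="$1" env="$2" deployment="$3" cluster="$4"
    echo "${server}/api/v4/environments/${env}/deployments/${deployment}/clusters/${cluster}"
}

CLUSTER_URL="$(cluster_url "${DIRECTOR_SERVER}" "${ENVIRONMENT_NAME}" \
    "${DEPLOYMENT_NAME}" "${CLUSTER_NAME}")"
```

Putting the date in the default cluster name is one way to keep daily runs from colliding; any naming scheme that produces a unique name per run would do.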

Step 2 is to wait for the cluster to be ready. The status of the cluster setup process can be retrieved via the API like this:

# Poll the cluster status every 30 seconds until it reaches the READY stage.
while
    STAGE=$(${WGET} -q "${CLUSTER_URL}/status" | jq -r ".stage")
    [ "${STAGE}" != "READY" ]
do
    if [ "${STAGE}" == "BOOTSTRAP_FAILED" ]; then
        echo "Cluster ${CLUSTER_NAME} failed to bootstrap. Please check server logs."
        exit 1
    fi
    # An empty stage means the status request itself returned nothing.
    if [[ -z "${STAGE}" ]]; then
        echo "Unable to retrieve status for cluster ${CLUSTER_NAME}. Please check server logs."
        exit 1
    fi
    echo "Stage is ${STAGE}. Waiting for 30 seconds ..."
    sleep 30
done

This process usually takes 20-25 minutes the first time around (depending on the complexity of your topology). If you keep Cloudera Manager running, the second setup will be faster.
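Because a scheduled job should not wait forever, it can be worth putting an upper bound on the polling. The following is a sketch of one way to do that, not part of the example script: wait_for_stage and fetch_stage are hypothetical helper names, and the attempt budget is something you would tune to your topology.

```shell
# Poll until COMMAND prints the wanted stage, a failure shows up, or the
# attempt budget runs out.
# Usage: wait_for_stage WANTED MAX_ATTEMPTS SLEEP_SECONDS COMMAND [ARGS...]
# Returns 0 on success, 1 on bootstrap failure or empty status, 2 on timeout.
wait_for_stage() {
    local wanted="$1" max_attempts="$2" delay="$3"
    shift 3
    local attempt=1 stage
    while [ "${attempt}" -le "${max_attempts}" ]; do
        stage="$("$@")" || stage=""
        case "${stage}" in
            "${wanted}") return 0 ;;
            BOOTSTRAP_FAILED) echo "Bootstrap failed." >&2; return 1 ;;
            "") echo "Unable to retrieve status." >&2; return 1 ;;
        esac
        echo "Stage is ${stage} (attempt ${attempt} of ${max_attempts})." >&2
        sleep "${delay}"
        attempt=$((attempt + 1))
    done
    echo "Timed out waiting for stage ${wanted}." >&2
    return 2
}

# Hypothetical wrapper around the status call; expects WGET and CLUSTER_URL
# to be set by the surrounding script.
fetch_stage() {
    ${WGET} -q "${CLUSTER_URL}/status" | jq -r ".stage"
}

# Example: 60 attempts, 30 seconds apart, is roughly a 30 minute budget.
# wait_for_stage READY 60 30 fetch_stage || exit 1
```

Passing the status command as an argument keeps the loop independent of wget and jq, which also makes it easy to exercise with a stub.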

Step 3 is to discover a gateway and run the job. If your cluster doesn’t have a gateway, you could use the master instance group as one.

GATEWAY_IP=$(${WGET} -q "${CLUSTER_URL}" | jq -r --arg group "${GATEWAY_GROUP_NAME}" '.instances | map(select(.virtualInstance.template.name == $group))[0] | .ipAddress')
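The lookup can also be wrapped in a small function that passes the group name to jq as a variable and checks the result before it is handed to SSH. This is a sketch under assumptions: find_gateway_ip and require_ip are hypothetical names, while the JSON field names are the ones used in the command above.

```shell
# Print the IP address of the first instance in the given instance group.
# Reads the cluster JSON document on stdin; prints "null" when nothing matches.
find_gateway_ip() {
    local group="$1"
    # --arg hands the group name to jq as $group, which avoids nesting
    # double quotes inside the filter.
    jq -r --arg group "${group}" \
        '.instances | map(select(.virtualInstance.template.name == $group))[0] | .ipAddress'
}

# Fail early when the lookup came back empty or "null".
require_ip() {
    case "$1" in
        ""|null)
            echo "No instance found in the gateway group." >&2
            return 1
            ;;
    esac
}

# Example (expects WGET, CLUSTER_URL, and GATEWAY_GROUP_NAME to be set):
# GATEWAY_IP="$(${WGET} -q "${CLUSTER_URL}" | find_gateway_ip "${GATEWAY_GROUP_NAME}")"
# require_ip "${GATEWAY_IP}" || exit 1
```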

To run a job you will need a script that will SSH into the gateway, start the job, and block while it is running. The example dispatch script does that for you, so you only need to provide a script with the actual job invocation (see job-example-script.sh).
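For readers who want to see the general shape of such a dispatch step, here is a minimal sketch. It is not the example dispatch script: the function name, its arguments, and the SSH_BIN override are hypothetical, and disabling strict host key checking is a convenience for freshly created hosts that you may not want in a hardened setup.

```shell
# Stream a local job script to the gateway over SSH and wait for it to finish.
# Usage: run_job_on_gateway USER KEY_FILE HOST JOB_SCRIPT
# SSH_BIN is a hypothetical override hook, handy for dry runs.
run_job_on_gateway() {
    local user="$1" key_file="$2" host="$3" job_script="$4"
    # "bash -s" makes the remote shell read the script from stdin, so nothing
    # has to be copied to the gateway first. ssh blocks until the remote
    # command exits and then returns its exit status.
    "${SSH_BIN:-ssh}" -i "${key_file}" \
        -o BatchMode=yes \
        -o StrictHostKeyChecking=no \
        "${user}@${host}" "bash -s" < "${job_script}"
}

# Example (user name and key path are placeholders):
# run_job_on_gateway ec2-user ~/.ssh/pipeline.pem "${GATEWAY_IP}" job-example-script.sh
```

Because the function returns the remote exit status, the calling script can decide whether a failed job should still be followed by cluster termination.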

Step 4 is to terminate the cluster to release the resources you no longer need. This is as simple as the initial creation:

${WGET} --method DELETE "${CLUSTER_URL}"

STAGE=$(${WGET} -q "${CLUSTER_URL}/status" | jq -r ".stage");
if [ "${STAGE}" == "TERMINATING" ]; then
    echo "Cluster ${CLUSTER_NAME} is terminating"
else
    echo "Unable to terminate cluster. Stage is ${STAGE}"
fi

That will keep Cloudera Manager around for the next run. If you wish to terminate it, a similar delete request can be sent to a different URL.
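Since the point of terminating is to stop paying for idle resources, it helps to make sure the termination request is sent even when the job itself fails. One possible way to structure that, sketched here with hypothetical function names, is to run the job and the cleanup through a small wrapper that preserves the job's exit status.

```shell
# Run a job command, always run the cleanup command afterwards, and return
# the job's exit status.
# Usage: run_then_cleanup JOB_COMMAND CLEANUP_COMMAND
run_then_cleanup() {
    local job="$1" cleanup="$2" status=0
    "${job}" || status=$?
    "${cleanup}" || echo "Cleanup failed; check for instances left running." >&2
    return "${status}"
}

# Hypothetical wrapper around the DELETE request shown above; expects WGET
# and CLUSTER_URL to be set by the surrounding script.
terminate_cluster() {
    ${WGET} --method DELETE "${CLUSTER_URL}"
}

# Example, where run_daily_job is whatever function dispatches your job:
# run_then_cleanup run_daily_job terminate_cluster
```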

That’s all. Easy, right?

More Complex Integrations

If you are using other, more generic workflow management or orchestration systems like Activiti or Airbnb Airflow, you can use our Java and Python SDKs to integrate with Cloudera Director. The sky’s the limit!

This is how you import a client configuration file in Java (script):

ApiClient client = newAuthenticatedApiClient(...);

ImportClientConfigApi api = new ImportClientConfigApi(client);
ImportResult result = api.importClientConfig(config, clusterName, deploymentName, environmentName);

This is how you import a client configuration file in Python (script):

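# Read the client configuration file and close the handle right away.
with open(args.config_file, 'r') as config_file:
    config = config_file.read()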
client = get_authenticated_client(args)

api = ImportClientConfigApi(client)
result = api.importClientConfig(config)

Need Help Along the Way?

Don’t hesitate to reach out on our community forum. Thanks!

Andrei Savu is a Software Engineer at Cloudera.
