Cloudera Flow Management Continuous Delivery Architecture

The first article, ‘Cloudera Flow Management Continuous Delivery while Minimizing Downtime’, introduced the flow delivery challenges and their corresponding resolutions. This article combines those solutions into an example continuous delivery architecture for flow management.

 

DataFlow Continuous Delivery Architecture

 

The whole process consists of the following steps.

1. Development

Developers design the DataFlow and test it in a shared multi-tenant DEV cluster. Each team can work only in its own process groups, which is enforced by Apache Ranger NiFi policies. If a team needs to modify or add a root controller service to reference, the administrator makes the change and copies it into the dedicated root_controller_services process group.

 

2. Version Control

Developers manually add or update the tested flows in a DEV NiFi Registry bucket for version control. With Apache Ranger NiFi Registry policies, each team can see only the bucket assigned to it. The administrator updates the root_controller_services process group version in the Registry admin bucket.

 

3. Source Change Request

To avoid promoting flows that are still under testing to a higher environment, developers are required to create a change request file for peer review once the flows are tested. This file resides in the source change git project. Developers create a new branch of this git project, then update the source_change_request.json file so that it lists the changed flows and their versions. If any root controller services are updated, the administrator updates the root_controller_services process group version in the source_change_request.json file.

Once everything is ready, the change owner raises a git pull request to trigger the peer review.

{
    "env": "dev",
    "nifi_api_url": "https://nifi-dev.example.cloudera.com:9091/nifi-api",
    "reg_api_url": "https://registry-dev.example.cloudera.com:61443/nifi-registry-api",
    "reg_client_name": "registry-dev.example.cloudera.com",
    "buckets": [
        {
            "name": "teama_bucket",
            "flows": [
                {"name": "flowa1_feeding", "version": "2"},
                {"name": "flowa1_digestion", "version": "2"}
            ]
        },
        {
            "name": "admin_bucket",
            "flows": [
                {"name": "root_controller_services", "version": "1"}
            ]
        }
    ]
}

source_change_request.json example
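Because the change request file is edited by hand, a small check before raising the pull request can catch malformed files early. The following is only a minimal sketch and not part of the original scripts: the function name validate_change_request and the specific rules it applies are assumptions made for illustration, based on the keys shown in the example above.

```python
import json

# Top-level keys taken from the source_change_request.json example.
REQUIRED_TOP_LEVEL = ("env", "nifi_api_url", "reg_api_url", "reg_client_name", "buckets")


def validate_change_request(text):
    """Parse a source change request and return a list of problems (empty if none)."""
    try:
        request = json.loads(text)
    except json.JSONDecodeError as err:
        # Strict JSON rejects things like a trailing comma after the last list item.
        return ["invalid JSON: %s" % err]
    if not isinstance(request, dict):
        return ["top level must be a JSON object"]

    problems = ["missing key: %s" % key for key in REQUIRED_TOP_LEVEL if key not in request]
    for bucket in request.get("buckets", []):
        if not bucket.get("flows"):
            problems.append("bucket %s lists no flows" % bucket.get("name"))
        for flow in bucket.get("flows", []):
            # Versions are stored as strings in the file, so check that they are numeric.
            if not str(flow.get("version", "")).isdigit():
                problems.append("flow %s has a non-numeric version" % flow.get("name"))
    return problems
```

A check like this could run as a pre-merge step, failing the pull request whenever the returned list is not empty.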

 

4. Peer Review

Team members review the change by checking the flow status and testing the flows in DEV NiFi.

 

5. Approve Source Change

If the team confirms the change is ready to be promoted, they can merge this pull request into the master branch of the source change project.

 

6. Promote to Higher Environment

Cloudera suggests creating a Jenkins job that monitors the master branch of the source change project and triggers the promote_source_changes.py Python script. This script uses the NiFi and NiFi Registry APIs to export the flow version from the DEV NiFi Registry and then import it into the UAT NiFi Registry. Example code:

import nipyapi

# Excerpt: source_env_conf, dest_env_conf, login_env() and log are defined elsewhere in the script.
for bucket in source_env_conf['buckets']:
  login_env(source_env_conf)
  source_bucket = nipyapi.versioning.get_registry_bucket(bucket['name'])
  promoted_count = 0
  # Iterate over a copy because promoted flows are removed from the list below
  for flow in bucket['flows'][:]:
    # Check out a flow from Source NiFi Registry
    login_env(source_env_conf)
    source_versioned_flow = nipyapi.versioning.get_flow_in_bucket(source_bucket.identifier, identifier=flow['name'])
    exported_flow = nipyapi.versioning.export_flow_version(bucket_id=source_bucket.identifier, flow_id=source_versioned_flow.identifier, version=flow['version'], mode='yaml')
    # Version control this flow in Dest NiFi Registry
    login_env(dest_env_conf)
    dest_bucket = nipyapi.versioning.get_registry_bucket(dest_env_conf['buckets'][0])
    dest_flow = nipyapi.versioning.get_flow_in_bucket(bucket_id=dest_bucket.identifier, identifier=flow['name'])
    if dest_flow is None:
      # First promotion: create the flow in the destination bucket
      imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_name=flow['name'])
    else:
      # The flow already exists: add a new version to it
      imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_id=dest_flow.identifier)
    log.info("Flow %s Version %s is imported from ENV %s into ENV %s.", flow['name'], flow['version'], source_env_conf['env'], dest_env_conf['env'])
    # Remove promoted flow from the json
    if flow in bucket['flows']:
      bucket['flows'].remove(flow)
    promoted_count += 1
  # Log the number promoted; len(bucket['flows']) would be 0 here after the removals
  log.info("All %d flows in %s bucket %s are imported into %s bucket %s.", promoted_count, source_env_conf['env'], bucket['name'], dest_env_conf['env'], dest_env_conf['buckets'][0])

promote_source_changes.py Example

 

7. Destination Change Request

In the UAT NiFi Registry, a generate_dest_change_request.sh script is registered as a registry event hook.

<eventHookProvider>
  <class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
  <property name="Script Path">/var/cloudera/flow_cd/generate_dest_change_request.sh</property>
  <property name="Working Directory">/var/cloudera/flow_cd/</property>
  <property name="Whitelisted Event Type 1">CREATE_FLOW_VERSION</property>
</eventHookProvider>

NiFi Registry Event Hook Configuration

 

CREATE_FLOW_VERSION feeb0fbe-5d7e-4363-b58b-142fa80775e1 1a0b614c-3d0f-471a-b6b1-645e6091596d 4 flow_cd Update-Attributes

CREATE_FLOW_VERSION Event Example

 

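The fields are positional and map to the following names (the IDs in this second sample differ from those in the event above):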

CREATE_FLOW_VERSION [BUCKET_ID=5d81dc5e-79e1-4387-8022-79e505f5e3a0, FLOW_ID=a89bf6b7-41f9-4a96-86d4-0aeb3c3c25be, VERSION=4, USER=flow_cd, COMMENT=Update-Attributes]
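The positional layout shown above can be mapped onto named fields before any further processing. The original hook is a shell script, so the Python below is only an illustrative sketch; the names FlowVersionEvent and parse_event are hypothetical, and the handling of comments that contain spaces is an assumption.

```python
from collections import namedtuple

# Field order follows the CREATE_FLOW_VERSION example: bucket, flow, version, user, comment.
FlowVersionEvent = namedtuple(
    "FlowVersionEvent", "event_type bucket_id flow_id version user comment"
)


def parse_event(argv):
    """Map the hook's positional arguments onto named fields."""
    if len(argv) < 5 or argv[0] != "CREATE_FLOW_VERSION":
        raise ValueError("unexpected event: %r" % (argv,))
    event_type, bucket_id, flow_id, version, user = argv[:5]
    # Assumption: a comment containing spaces may arrive as several arguments, so rejoin them.
    comment = " ".join(argv[5:])
    return FlowVersionEvent(event_type, bucket_id, flow_id, int(version), user, comment)
```

With the event example above split on whitespace, parse_event yields version 4, user flow_cd and comment Update-Attributes.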

Any individual flow version update triggers the script to create a new branch of the destination change request git project, and to generate a dest_change_request.json file by comparing the new version and the deployed version. 

{
   "env":"uat",
   "nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api",
   "reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api",
   "reg_client_name":"registry-uat.example.cloudera.com",
   "bucket":{
      "name":"uat_bucket",
      "flows":[
         {
            "name":"flowb1_digestion",
            "version":"4",
            "sensitive_parameters":[],
            "comment":"Update Flow Attributes",
            "deployed_version":"2",
            "deployed_comment":"Update Logic"
         }
      ]
   }
}

dest_change_request.json example
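The comparison between the new version and the deployed version can be pictured as building one flow entry per event. This is a minimal sketch under stated assumptions, not the original script: build_flow_entry and its parameters are hypothetical names, and the lookup of the deployed version is left to the caller.

```python
def build_flow_entry(name, new_version, new_comment, sensitive_parameters, deployed=None):
    """Build one entry of the "flows" list in dest_change_request.json.

    deployed is None for a flow that has never been deployed; otherwise it is a
    dict with the "version" and "comment" of the version currently running.
    """
    entry = {
        "name": name,
        "version": str(new_version),
        "sensitive_parameters": [{"name": parameter} for parameter in sensitive_parameters],
        "comment": new_comment,
    }
    if deployed is not None:
        # Only flows that already run in the destination carry the deployed_* keys.
        entry["deployed_version"] = str(deployed["version"])
        entry["deployed_comment"] = deployed["comment"]
    return entry
```

Leaving out the deployed_* keys for a first deployment keeps the entry for a brand new flow visibly different from an upgrade, which helps the reviewer.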

A git pull request is raised for the release manager to review.

 

8. Approve Destination Deployment

Since each process group version change triggers one NiFi Registry event, the release manager will receive multiple destination change requests for one source change request if it contains multiple process groups and a few root controller services.  

The release manager needs to combine the related change requests and review the changes. If any sensitive parameters are listed, the release manager should confirm whether new sensitive parameter values are required in the destination NiFi. After these sensitive parameter changes have been applied manually, the pull request can be approved and merged into the git master branch.

{
   "env":"uat",
   "nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api",
   "reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api",
   "reg_client_name":"registry-uat.example.cloudera.com",
   "bucket":{
      "name":"uat_bucket",
      "flows":[
         {
            "name":"flowa1_digestion",
            "version":"1",
            "sensitive_parameters":[
               {
                  "name":"access_key_id"
               },
               {
                  "name":"secret_access_key"
               }
            ],
            "comment":"Initial Version"
         },
         {
            "name":"root_controller_services",
            "version":"3",
            "sensitive_parameters":[
               {
                  "name":"oracle_password"
               }
            ],
            "comment":"root_DBCPConnectionPool",
            "deployed_version":"2",
            "deployed_comment":"root_PropertiesFileLookupService"
         },
         {
            "name":"flowb1_digestion",
            "version":"4",
            "sensitive_parameters":[],
            "comment":"Update Flow Attributes",
            "deployed_version":"2",
            "deployed_comment":"Update Logic"
         }
      ]
   }
}

Combined dest_change_request.json example
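Combining the per-flow requests is described above as a manual task for the release manager, but the merge itself is mechanical and could be scripted. The sketch below is an assumption about how that might look; combine_change_requests is a hypothetical helper, and keeping only the highest version per flow is a design choice made here, not something the original process specifies.

```python
def combine_change_requests(requests):
    """Merge several dest_change_request dicts for one environment into one."""
    if not requests:
        raise ValueError("nothing to combine")
    first = requests[0]
    combined = {
        key: first[key]
        for key in ("env", "nifi_api_url", "reg_api_url", "reg_client_name")
    }
    latest = {}
    for request in requests:
        same_target = (
            request["env"] == first["env"]
            and request["bucket"]["name"] == first["bucket"]["name"]
        )
        if not same_target:
            raise ValueError("requests target different environments or buckets")
        for flow in request["bucket"]["flows"]:
            current = latest.get(flow["name"])
            # Keep only the highest requested version of each flow.
            if current is None or int(flow["version"]) > int(current["version"]):
                latest[flow["name"]] = flow
    combined["bucket"] = {"name": first["bucket"]["name"], "flows": list(latest.values())}
    return combined
```

The release manager would still review the combined file and apply any sensitive parameter values by hand before merging.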

 

9. Deploy Destination

Finally, the merged master branch triggers the Python script deploy_dest_changes.py, which automatically deploys the new flow version in the UAT environment with no downtime or with minimal downtime.

    # Connect flowx_feeding with flowx_digestion
    if feeding_pg is not None and digestion_pg is not None:
      digestion_inputport = nipyapi.canvas.list_all_input_ports(pg_id=digestion_pg.id)
      feeding_outputport = nipyapi.canvas.list_all_output_ports(pg_id=feeding_pg.id)
      # Check for empty lists first; indexing [0] on an empty list would raise IndexError
      if not digestion_inputport or not feeding_outputport:
        raise SystemExit('Error: the flowx_feeding pg must have an output port, and the flowx_digestion pg must have an input port!')
      nipyapi.canvas.create_connection(feeding_outputport[0], digestion_inputport[0])
      nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True)
      nipyapi.canvas.schedule_process_group(feeding_pg.id, scheduled=True)

Connect the feeding and digestion sub-flows of a brand new flow 

      # Stop the input port so that no new events enter the sub-flow
      nipyapi.canvas.schedule_components(pg_id=digestion_pg.id, scheduled=False, components=input_port)
      # NiFi reports the queued count as a formatted string, so parse it with locale.atoi
      all_connections = nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id)
      queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections)
      # Wait until all queues are empty
      while queued_count > 0:
        log.info("There are still %d queued events, waiting until all are processed.", queued_count)
        time.sleep(10)
        all_connections = nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id)
        queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections)
      log.info("Process Group %s has no queued events, start updating to the new version now!", flow['name'])
      nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=False)
      nipyapi.versioning.update_flow_ver(process_group=digestion_pg, target_version=flow['version'])
      nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True)

Deploy a new version of a live digestion sub-flow
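The wait loop above polls until every queue is empty, which could block indefinitely if a downstream system is unavailable. One possible refinement, shown here only as a sketch, is to bound the wait. The helper wait_until_drained and its parameters are hypothetical; get_queued_count stands for any callable that returns the current total, such as the sum over connections computed in the excerpt above.

```python
import time


def wait_until_drained(get_queued_count, timeout_seconds=600, poll_seconds=10,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll until the queued count reaches zero or the timeout expires.

    Returns True if the queues drained, False if the timeout was reached.
    sleep and clock are injectable so the helper can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        if get_queued_count() == 0:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)
```

On a False result, a deployment script could restart the input port and abort the upgrade rather than leave the sub-flow stopped.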

 

Thanks to the excellent NiPyApi (https://github.com/Chaffelson/nipyapi) created by Daniel Chaffelson, the Cloudera Flow Management delivery Python scripts are much simpler than expected.

Also, thank you to Pierre Villard, Timothy Spann, Daniel Chaffelson, David Thompson, and Laura Chu for the review and guidance!
