Operating Apache Kafka with Cruise Control

About Cruise Control

There are two big gaps in the Apache Kafka project when we think of operating a cluster. The first is monitoring the cluster efficiently and the second is managing failures and changes in the cluster. There are no solutions for these inside the Kafka project but there are many good 3rd party tools for both problems.

Cruise Control is one of the earliest open source tools to address the failure management problem, and more recently the monitoring problem as well. It was created by LinkedIn and is released under the BSD 2-Clause License, with contributions from many companies, including LinkedIn and Cloudera.

In this blog post I briefly explore how Cruise Control works internally to provide some context for the API, then walk you through a series of examples that can be executed one after another to demonstrate the capabilities it offers for operating a CDP Kafka cluster.

Architecture

Cruise Control is integrated with Kafka through metrics reporting. In CDP it connects to Cloudera Manager’s time series database to fetch metrics. Based on these metrics it builds an internal picture of the cluster, the so-called workload model, which is used as the input of the optimization based on parameters such as network throughput, CPU or disk usage. The results of these optimizations, called proposals, are executed upon user request or automatically, depending on how Cruise Control is configured. We’ll look at the internals briefly to understand the main building blocks, as they are important when we try to interpret the output of the API calls.

Metrics Reporting

This is a pluggable component that fetches and stores Kafka metrics. The open source version of Cruise Control stores metrics back to Kafka or Prometheus. The CDP integration currently uses the Cloudera Manager Metrics Database (SMON) as that is the single source of truth of Kafka metrics in our environment. In fact we don’t use a separate component but rather use a custom sample store implementation to fetch metrics and generate samples.

Load Monitor

This component is responsible for the creation of workload models which are used as the basis of Cruise Control. It will collect various metrics and also derive a special partition level CPU utilization which isn’t available in Kafka. Then it organizes these metrics into time based windows. A preconfigured number of windows will form the cluster model. It will then feed these metrics into the anomaly detector and the analyzer.
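
To make the windowing idea concrete, here is a minimal Python sketch of grouping metric samples into fixed-size time windows. It is not Cruise Control’s implementation: the function name aggregate_into_windows, the plain averaging and the 5-minute window size are assumptions made purely for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def aggregate_into_windows(
    samples: Iterable[Tuple[int, float]], window_ms: int
) -> Dict[int, float]:
    """Average (timestamp_ms, value) samples per window, keyed by window end time."""
    buckets = defaultdict(list)
    for timestamp_ms, value in samples:
        # A sample belongs to the window whose end is the next multiple of window_ms.
        window_end = (timestamp_ms // window_ms + 1) * window_ms
        buckets[window_end].append(value)
    return {end: sum(values) / len(values) for end, values in sorted(buckets.items())}


# Hypothetical samples: two fall into the first 5-minute window, one into the second.
samples = [(0, 1.0), (299_999, 3.0), (300_000, 5.0)]
print(aggregate_into_windows(samples, window_ms=300_000))
# {300000: 2.0, 600000: 5.0}
```

A cluster model would then be formed from a configured number of the most recent windows.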

Analyzer

This is the “brain” of Cruise Control. It uses a heuristic method to generate optimization proposals based on the goals provided by the users and the workload model emitted by the load monitor.

The goals are predefined but pluggable components that define what optimal cluster utilization looks like. For instance, a goal can say that the CPU utilization of brokers must not exceed 85%. There are a number of predefined goals, and users can implement their own too. Goals come in two types: hard goals and soft goals. During the optimization, hard goals are satisfied first and they must be satisfied to get a valid optimization proposal. If soft goals aren’t satisfied, a proposal can still be valid. An example of a soft goal is the ReplicaDistributionGoal, which specifies that the number of replicas on each broker should be around the same, within a given threshold.
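
The difference between hard and soft goals can be sketched in a few lines of Python. This is only an illustration of the rule described above, not the Analyzer’s code: the Goal class, the evaluate_proposal function, the goal names and the 10-point distribution threshold are all hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

# Hypothetical, heavily simplified cluster model: broker id -> CPU utilization (%).
ClusterModel = Dict[int, float]


@dataclass
class Goal:
    name: str
    is_hard: bool
    is_satisfied: Callable[[ClusterModel], bool]


def evaluate_proposal(goals: List[Goal], model: ClusterModel) -> dict:
    """A proposal is valid only if no hard goal is violated; soft goals may be."""
    violated_hard = [g.name for g in goals if g.is_hard and not g.is_satisfied(model)]
    violated_soft = [g.name for g in goals if not g.is_hard and not g.is_satisfied(model)]
    return {
        "valid": not violated_hard,
        "violated_hard": violated_hard,
        "violated_soft": violated_soft,
    }


goals = [
    # Hard: no broker may exceed 85% CPU (the example threshold from the text).
    Goal("cpu-capacity", True, lambda m: max(m.values()) <= 85.0),
    # Soft: brokers should be within 10 percentage points of each other (made-up value).
    Goal("cpu-distribution", False, lambda m: max(m.values()) - min(m.values()) <= 10.0),
]

print(evaluate_proposal(goals, {25: 80.0, 27: 40.0, 29: 45.0}))
# {'valid': True, 'violated_hard': [], 'violated_soft': ['cpu-distribution']}
```

Here the proposal stays valid even though the soft goal is violated; pushing any broker above 85% would make it invalid.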

Anomaly Detector

The Anomaly Detector identifies four different types of anomalies.

  • Broker failure: this is when a non-empty broker leaves the cluster unexpectedly and doesn’t come back within a defined grace period of time. When this happens and if self-healing is enabled for this kind of anomaly, then Cruise Control will attempt to fix this by moving all offline replicas to healthy brokers.
  • Disk failure: when Cruise Control is used with JBOD then a non-empty disk might die which causes partitions to go offline. If self-healing is enabled for this anomaly, then it will trigger a replica move to a healthy disk.
  • Goal violation: this happens when an optimization goal is violated. In such cases and when self-healing is enabled for this, Cruise Control will attempt to proactively fix the cluster by analyzing the workload and executing an optimization proposal.
  • Metric anomaly: when Cruise Control observes a sudden out of order value in a collected metric it can trigger self-healing if it’s enabled for this kind of anomaly. Currently there isn’t a standard for this type of anomaly since different metric inconsistencies may require different remediations, however since it’s a pluggable component, users can define their own anomaly detection and remediation rules.

Executor

This component is responsible for executing the optimization proposals generated by the previous components. It is designed in such a way that it is safely interruptible and doesn’t overwhelm the brokers. If a proposal is too large to execute at once, the Executor breaks it up into smaller chunks and executes them one after another. In practice this means that it breaks up large partition reassignments and executes them separately. It can also set throttling to further ensure safety.
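
The chunking idea can be sketched as follows. This is a deliberately naive illustration, assuming a single global batch size; the real Executor applies its own concurrency limits, and the function name batches is hypothetical.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batches(tasks: Sequence[T], max_concurrent: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most max_concurrent tasks."""
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    for start in range(0, len(tasks), max_concurrent):
        yield list(tasks[start:start + max_concurrent])


# Twelve hypothetical partition movements executed in chunks of five.
movements = [f"test-topic-{i}" for i in range(12)]
for chunk in batches(movements, max_concurrent=5):
    print(len(chunk), chunk)
```

Each chunk would be submitted only after the previous one has finished, which is also what makes the process safely interruptible between chunks.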

Use Cases

As the main attraction of this blog post, I’ll cover the most important functionalities of Cruise Control. The coverage isn’t complete: to keep the focus on functionality I won’t list every parameter, but these are well documented on GitHub. In the examples I use a CDP Private Cloud Base cluster that has 4 nodes with 3 Kafka brokers initially. The cluster isn’t secured with Kerberos and SSL, as I wanted to concentrate on Cruise Control here and leave out anything else that could complicate these examples. We will add, remove and heal brokers, so if you want to follow along I suggest you create a similar setup. In the cluster I have 295 partitions altogether. The majority of these belong to default topics, and I have a test topic called cruise-control-test-topic with 25 partitions. The replication factor is set to 3 for most of the partitions, but some default topics created by Cruise Control have a replication factor of 2.

The test topic is created the following way:

kafka-topics --bootstrap-server cruise-control-blog-1.example.com:9092 --create --topic cruise-control-test-topic --replication-factor 3 --partitions 25

After creating this topic I populate it with some data:

kafka-producer-perf-test --producer.config producer.properties --topic cruise-control-test-topic --throughput -1 --record-size 1000 --num-records 5000000

The producer properties file must be configured to access the brokers; at a minimum it has to contain the bootstrap.servers config.

Adding Brokers

In this first example we will add a broker. In the Cloudera world this consists of two steps. First we need to add a new role in Cloudera Manager and then use Cruise Control to put data on it. So following the docs I added a new Kafka Broker role instance but this broker is empty at this point. If we look at the Cruise Control cluster state then we would see the following:

[root@cruise-control-blog-1 ~]# curl -X GET http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/kafka_cluster_state

Brokers:

              BROKER           LEADER(S)            REPLICAS         OUT-OF-SYNC             OFFLINE       IS_CONTROLLER

                  25                  96                 204                   0                   0               false

                  27                  97                 203                   0                   0                true

                  29                 102                 211                   0                   0               false

                  34                   0                   0                   0                   0               false

LogDirs of brokers with replicas:

              BROKER                       ONLINE-LOGDIRS  OFFLINE-LOGDIRS

                  25              [/var/local/kafka/data]               []

                  27              [/var/local/kafka/data]               []

                  29              [/var/local/kafka/data]               []

                  34              [/var/local/kafka/data]               []




Under Replicated, Offline, and Under MinIsr Partitions:

                                                     TOPIC PARTITION    LEADER                      REPLICAS                       IN-SYNC              OUT-OF-SYNC                  OFFLINE

Offline Partitions:

Partitions with Offline Replicas:

Under Replicated Partitions:

Under MinIsr Partitions:

Now comes Cruise Control! It has a REST API endpoint called add_broker that runs a rebalance and puts replicas on the new broker so that it carries an optimal share of the load. It is important to note that this action doesn’t move any partitions among the other brokers; it only moves data between the old brokers and the newly added one. To initiate it you have to call the API like this:

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/add_broker?brokerid=34'

The output is huge, so instead of copying all of it I’ll only talk about some parts. The first lines contain the most important information: how much data is moved and which topics or brokers were excluded. Topics are usually excluded via the excluded_topics parameter of the API calls, while brokers are usually excluded because of the exclude_recently_demoted_brokers and exclude_recently_removed_brokers parameters.

Optimization has 169 inter-broker replica(9885 MB) moves, 0 intra-broker replica(0 MB) moves and 0 leadership moves with a cluster model of 3 recent windows and 100.000% of the partitions covered.

Excluded Topics: [].

Excluded Brokers For Leadership: [].

Excluded Brokers For Replica Move: [].

Counts: 4 brokers 618 replicas 29 topics.

On-demand Balancedness Score Before (71.541) After(81.121).
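
If you want to inspect a dry run from a script, the summary line is easy to pick apart. The following Python sketch parses only the first line of the plain-text output shown above; the regular expression and the parse_summary name are my own assumptions, and the text format may differ between Cruise Control versions.

```python
import re

SUMMARY_RE = re.compile(
    r"Optimization has (?P<inter_moves>\d+) inter-broker replica\((?P<inter_mb>\d+) MB\) moves, "
    r"(?P<intra_moves>\d+) intra-broker replica\((?P<intra_mb>\d+) MB\) moves and "
    r"(?P<leadership_moves>\d+) leadership moves"
)


def parse_summary(text: str) -> dict:
    """Extract move counts and data volume (MB) from an optimization summary."""
    match = SUMMARY_RE.search(text)
    if match is None:
        raise ValueError("optimization summary line not found")
    return {key: int(value) for key, value in match.groupdict().items()}


summary = parse_summary(
    "Optimization has 169 inter-broker replica(9885 MB) moves, "
    "0 intra-broker replica(0 MB) moves and 0 leadership moves "
    "with a cluster model of 3 recent windows and 100.000% of the partitions covered."
)
print(summary["inter_moves"], summary["inter_mb"])  # 169 9885
```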

Then in the next few sections it iterates through the goals and displays the CPU/network/disk stats for each, along with whether the goal was fixed, is still violated or required no action.

Stats for DiskUsageDistributionGoal(FIXED):

AVG:{cpu:       0.415 networkInbound:       0.087 networkOutbound:       0.053 disk:   10887.340 potentialNwOut:       0.140 replicas:154.5 leaderReplicas:73.75 topicReplicas:5.327586206896552}

MAX:{cpu:       1.181 networkInbound:       0.119 networkOutbound:       0.159 disk:   11604.546 potentialNwOut:       0.188 replicas:157 leaderReplicas:106 topicReplicas:46}

MIN:{cpu:       0.113 networkInbound:       0.007 networkOutbound:       0.004 disk:    9893.073 potentialNwOut:       0.009 replicas:153 leaderReplicas:62 topicReplicas:0}

STD:{cpu:       0.444 networkInbound:       0.047 networkOutbound:       0.062 disk:     520.183 potentialNwOut:       0.076 replicas:1.6583123951777 leaderReplicas:18.632968094214082 topicReplicas:2.519316559725089}

In the last section of the output we can see the optimized cluster load.

Cluster load after adding broker [34]:

                  HOST         BROKER             DISK(MB)/_(%)_         CPU(%)       LEADER_NW_IN(KB/s)     FOLLOWER_NW_IN(KB/s)        NW_OUT(KB/s)       PNW_OUT(KB/s)    LEADERS/REPLICAS

cruise-control-blog-1.example.com,            25,           11025.854/10.77,         0.118,                   0.082,                   0.023,              0.148,              0.175,            59/147

cruise-control-blog-2.example.com,            27,           11604.533/11.33,         0.094,                   0.015,                   0.091,              0.015,              0.175,            63/153

cruise-control-blog-3.example.com,            29,           11025.741/10.77,         1.017,                   0.023,                   0.082,              0.027,              0.174,            59/149

cruise-control-blog-4.example.com,            34,            9893.253/09.89,         0.430,                   0.021,                   0.013,              0.022,              0.036,           114/169

At this point it didn’t execute the request, it was just a dry run: Cruise Control will execute every command in this mode unless you specify the dryrun=false parameter:

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/add_broker?brokerid=34&dryrun=false'

There are some other useful parameters that may come in handy, like replication_throttle, which puts an upper cap on the bandwidth (bytes/second) of the reassignment. This is useful to avoid putting sudden pressure on the brokers. This parameter is usually available for all the API calls that may cause data movement. To launch this command with replication throttling you need to specify the URL like this:

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/add_broker?brokerid=34&dryrun=false&replication_throttle=1000000'

Here I specified replication_throttle to limit the bandwidth to 1000000 bytes/second (1 MB/sec). It is important to factor in the growth rate of the overall cluster here, because with too low a value the moved replicas may never catch up to their leaders, or may stay in reassignment longer than expected.
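
A quick back-of-the-envelope calculation helps when picking a throttle value. The Python sketch below estimates how long the 9885 MB from the dry run would take at 1000000 bytes/second. It assumes that 1 MB is 1000000 bytes, that all the data has to pass through the single throttled new broker, and that no new data is produced in the meantime, so treat the result as a rough lower bound. The function names are hypothetical.

```python
def estimated_transfer_seconds(
    data_mb: float, throttle_bytes_per_sec: int, bytes_per_mb: int = 1_000_000
) -> float:
    """Rough time needed to move data_mb megabytes under a replication throttle."""
    if throttle_bytes_per_sec <= 0:
        raise ValueError("throttle must be positive")
    return data_mb * bytes_per_mb / throttle_bytes_per_sec


def format_duration(seconds: float) -> str:
    """Render a number of seconds as hours, minutes and seconds."""
    total = int(round(seconds))
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours}h {minutes:02d}m {secs:02d}s"


seconds = estimated_transfer_seconds(9885, 1_000_000)
print(format_duration(seconds))  # 2h 44m 45s
```

With a 10 MB/sec throttle the same data would need roughly a tenth of that time, which is the kind of trade-off to weigh against the pressure on the brokers.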

After setting the parameters correctly and executing the command, the reassignment starts and moves some replicas from the old brokers to the new broker. Also, the brokerid parameter can be a comma separated list, so one can add a number of brokers at once.
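
When these calls are scripted, it is convenient to assemble the URL from parameters rather than by hand. Here is a small Python sketch that builds the same add_broker URLs as the curl examples above. The add_broker_url helper is hypothetical; only the endpoint and the brokerid, dryrun and replication_throttle parameters come from this post.

```python
from typing import Iterable, Optional
from urllib.parse import urlencode

BASE_URL = "http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol"


def add_broker_url(
    broker_ids: Iterable[int],
    dryrun: bool = True,
    replication_throttle: Optional[int] = None,
    base_url: str = BASE_URL,
) -> str:
    """Build an add_broker URL; dryrun defaults to True to mirror the safe default."""
    params = {
        "brokerid": ",".join(str(broker_id) for broker_id in broker_ids),
        "dryrun": str(dryrun).lower(),
    }
    if replication_throttle is not None:
        params["replication_throttle"] = str(replication_throttle)
    # Keep the comma literal so the broker list stays readable in the URL.
    return f"{base_url}/add_broker?{urlencode(params, safe=',')}"


print(add_broker_url([34], dryrun=False, replication_throttle=1_000_000))
print(add_broker_url([34, 35]))
```

The resulting string can be passed to curl -X POST, or sent from Python with urllib.request.Request(url, method="POST").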

If we query the user_tasks endpoint of Cruise Control we can check the execution:

[root@cruise-control-blog-1 ~]# curl -X GET 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/user_tasks'

USER TASK ID                          CLIENT ADDRESS        START TIME               STATUS       REQUEST URL

6116123a-9811-49bd-b5fc-5ee27a58aa83  10.65.52.176          2021-07-30_14:23:46 UTC  InExecution  POST /kafkacruisecontrol/add_broker?brokerid=34&dryrun=false

7ccb7fe0-6d57-43d9-84b3-66dcdaa16cd6  10.65.52.176          2021-07-30_14:23:15 UTC  Completed    POST /kafkacruisecontrol/add_broker?brokerid=34
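
To wait for a task from a script, the plain-text task table can be parsed row by row. The sketch below is one possible way to do it in Python, assuming the column layout shown above (task ID, client address, start time ending in UTC, status, request); the parse_user_tasks name and the regular expression are my own.

```python
import re
from typing import Dict, List

ROW_RE = re.compile(
    r"^(?P<task_id>[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})\s+"
    r"(?P<client>\S+)\s+"
    r"(?P<start>\S+ UTC)\s+"
    r"(?P<status>\S+)\s+"
    r"(?P<request>.+?)\s*$"
)


def parse_user_tasks(text: str) -> List[Dict[str, str]]:
    """Return one dict per task row; header and blank lines are skipped."""
    tasks = []
    for line in text.splitlines():
        match = ROW_RE.match(line.strip())
        if match:
            tasks.append(match.groupdict())
    return tasks


output = """\
USER TASK ID                          CLIENT ADDRESS        START TIME               STATUS       REQUEST URL
6116123a-9811-49bd-b5fc-5ee27a58aa83  10.65.52.176          2021-07-30_14:23:46 UTC  InExecution  POST /kafkacruisecontrol/add_broker?brokerid=34&dryrun=false
7ccb7fe0-6d57-43d9-84b3-66dcdaa16cd6  10.65.52.176          2021-07-30_14:23:15 UTC  Completed    POST /kafkacruisecontrol/add_broker?brokerid=34
"""
tasks = parse_user_tasks(output)
running = [task["task_id"] for task in tasks if task["status"] == "InExecution"]
print(running)  # ['6116123a-9811-49bd-b5fc-5ee27a58aa83']
```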

Also with the state endpoint we get a very accurate picture of what’s going on inside Cruise Control. Here I’ll use the verbose=true mode, which not only prints the detailed state but also prints the pending, ongoing and dead reassignments. If you don’t want such a verbose view you can either leave out this parameter or specify the substates parameter (with values analyzer, monitor, executor and anomaly_detector), which returns only the selected subcomponents.

[root@cruise-control-blog-1 ~]# curl -X GET 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/state?verbose=true'

The first section of the output is the Load Monitor. It contains information about the status of the linear regression model that is used to estimate CPU utilization (it can be enabled with the use.linear.regression.model flag but is disabled by default), and the number of valid windows and partitions. Then come the flawed partitions: partitions that may not have metrics in all the windows, so Cruise Control extrapolated some metric values for them. Finally it shows whether metric collection is running, which it isn’t in this case as an execution is in progress (hence the flawed partitions).

MonitorState: {state: PAUSED(20.000% trained), NumValidWindows: (5/5) (100.000%), NumValidPartitions: 295/295 (100.000%), flawedPartitions: 295, reasonOfPauseOrResume: Paused-By-Cruise-Control-Before-Starting-Execution (Date: 2021-07-30_14:23:46 UTC)}

Next the Executor state is displayed. It shows what is currently being executed and how far the process has progressed. In this case we can see that it has moved a bit more than a third of the data (4701 of 12065 MB) and about two thirds of the partitions (111 of 158).

ExecutorState: {state: INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, pending/in-progress/aborting/finished/total inter-broker partition movement 47/0/0/111/158, completed/total bytes(MB): 4701/12065, maximum concurrent inter-broker partition movements per-broker: 5, triggeredUserTaskId: 6116123a-9811-49bd-b5fc-5ee27a58aa83, triggeredTaskReason: No reason provided (Client: 10.65.52.176, Date: 2021-07-30_14:23:46 UTC)}
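
The progress figures can be turned into percentages with a few lines of Python. This sketch assumes the plain-text ExecutorState format shown above; the executor_progress function and its regular expression are hypothetical helpers, not part of Cruise Control.

```python
import re

PROGRESS_RE = re.compile(
    r"inter-broker partition movement (\d+)/(\d+)/(\d+)/(\d+)/(\d+), "
    r"completed/total bytes\(MB\): (\d+)/(\d+)"
)


def executor_progress(state_line: str) -> dict:
    """Compute partition and data progress from an ExecutorState line."""
    match = PROGRESS_RE.search(state_line)
    if match is None:
        raise ValueError("no inter-broker movement progress found")
    pending, in_progress, aborting, finished, total, done_mb, total_mb = map(
        int, match.groups()
    )
    return {
        "pending": pending,
        "in_progress": in_progress,
        "aborting": aborting,
        "finished": finished,
        "total": total,
        "partition_pct": 100.0 * finished / total if total else 100.0,
        "data_pct": 100.0 * done_mb / total_mb if total_mb else 100.0,
    }


# Excerpt of the ExecutorState line from the output above.
line = (
    "ExecutorState: {state: INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS, "
    "pending/in-progress/aborting/finished/total inter-broker partition movement "
    "47/0/0/111/158, completed/total bytes(MB): 4701/12065"
)
progress = executor_progress(line)
print(f"{progress['partition_pct']:.1f}% of partitions, {progress['data_pct']:.1f}% of data")
# 70.3% of partitions, 39.0% of data
```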

In the next section we can see the Analyzer’s state. It tells us that there is a proposal ready and lists the goals that form that proposal.

AnalyzerState: {isProposalReady: true, readyGoals: [NetworkInboundUsageDistributionGoal, CpuUsageDistributionGoal, PotentialNwOutGoal, LeaderReplicaDistributionGoal, NetworkInboundCapacityGoal, LeaderBytesInDistributionGoal, DiskCapacityGoal, ReplicaDistributionGoal, RackAwareGoal, TopicReplicaDistributionGoal, NetworkOutboundCapacityGoal, CpuCapacityGoal, DiskUsageDistributionGoal, NetworkOutboundUsageDistributionGoal, ReplicaCapacityGoal]}

Our next component is the Anomaly Detector. As said previously this is responsible for detecting broker or disk failures, goal violations or metric anomalies. In its status we see which of these are enabled and whether there were any incidents. Later on in the self-healing section we will see a better example.

AnomalyDetectorState: {selfHealingEnabled:[BROKER_FAILURE, DISK_FAILURE, GOAL_VIOLATION, METRIC_ANOMALY, TOPIC_ANOMALY], selfHealingDisabled:[], selfHealingEnabledRatio:{BROKER_FAILURE=1.0, DISK_FAILURE=1.0, METRIC_ANOMALY=1.0, GOAL_VIOLATION=1.0, TOPIC_ANOMALY=1.0}, recentGoalViolations:[], recentBrokerFailures:[], recentMetricAnomalies:[], recentDiskFailures:[], recentTopicAnomalies:[], metrics:{meanTimeBetweenAnomalies:{GOAL_VIOLATION:0.00 milliseconds, BROKER_FAILURE:0.00 milliseconds, METRIC_ANOMALY:0.00 milliseconds, DISK_FAILURE:0.00 milliseconds, TOPIC_ANOMALY:0.00 milliseconds}, meanTimeToStartFix:0.00 milliseconds, numSelfHealingStarted:0, numSelfHealingFailedToStart:0, ongoingAnomalyDuration=0.00 milliseconds}, ongoingSelfHealingAnomaly:None, balancednessScore:100.000}

After Cruise Control’s components’ status we can see the windows and their completeness. It can happen that there are some windows where not all required metrics could be collected. In this case completeness will be less than 100%.

Monitored Windows [Window End_Time=Data_Completeness]:

{1627654800000=100.000%, 1627654500000=100.000%, 1627654200000=100.000%, 1627653900000=100.000%, 1627653600000=100.000%}
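
The window keys are end times in epoch milliseconds, which are hard to read at a glance. A short Python sketch converts them to UTC and shows the spacing between windows; the window_end_utc helper is a hypothetical name.

```python
from datetime import datetime, timezone


def window_end_utc(epoch_ms: int) -> str:
    """Format an epoch-millisecond window end time as a UTC timestamp."""
    moment = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S UTC")


window_ends = [1627654800000, 1627654500000, 1627654200000, 1627653900000, 1627653600000]
print(window_end_utc(window_ends[0]))  # 2021-07-30 14:20:00 UTC

# Difference between neighbouring windows, in minutes.
spacing_minutes = {(a - b) // 60_000 for a, b in zip(window_ends, window_ends[1:])}
print(spacing_minutes)  # {5}
```

So in this output the windows are five minutes apart, and the newest one ended shortly before the execution started at 14:23:46 UTC.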

After the monitored windows, the next output is the goal readiness. I trimmed this to display only the rack-aware goal, as it is quite a lengthy list. It basically displays whether each goal is ready or not.

Goal Readiness:

                                     RackAwareGoal, (requiredNumWindows=1, minMonitoredPartitionPercentage=0.000, includedAllTopics=true), Ready

After the goals we can see the current execution with the specific pending, in-progress, aborting/aborted and dead reassignments. It won’t display the completed reassignments but be aware that if the rebalance affected let’s say 300 partitions then potentially it can display a lot of data.

Pending inter-broker partition movements:

{EXE_ID: 130, INTER_BROKER_REPLICA_ACTION, {cruise-control-test-topic-15, oldLeader: 27, [27, 25, 29] -> [27, 34, 29]}, PENDING}

In progress inter-broker partition movements:

Aborting inter-broker partition movements:

Aborted inter-broker partition movements:

Dead inter-broker partition movements:

These operations may take some time depending on the size of the moved data so feel free to have a cup of tea or coffee. Once it’s complete you can take a look at the kafka_cluster_state endpoint and we should see that all the brokers are utilized. Also by looking at the output of user_tasks (as we did previously) we can confirm that the execution has been completed.

Fixing Offline Replicas

This API can be thought of as a manual healing tool. It is able to repair the cluster in case of disk failures or broker failures. Repairing the cluster in this case means that it’ll reassign the replicas on the dead disk/broker to healthy ones. To run this you’ll need to call the fix_offline_replicas API.

It starts a rebalance which might involve data movement so it can be a lengthy process. It can only be used if there are offline replicas in the cluster, unlike the rebalance API which can be used any time.

To try this out, let’s kill a broker:

  1. Log into a broker host
  2. Search for processes that are run by the Kafka user to get the PID to kill: ps -aux | grep "kafka.properties" | less -S
  3. kill -9 <pid>

After killing the broker some replicas will go offline, which you can see by calling the kafka_cluster_state?verbose=true API. At this point we can use the API like this to trigger the repairing process:

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/fix_offline_replicas?dryrun=false'

Partitions that have a single replica won’t be fixed. Cruise Control just removes the replicas on the dead host, then recreates them on a new host and lets Kafka replicate the data from the leader. If there is only a single replica, it won’t be able to do anything with it.

If we bring back the host with the old partitions still on it, then although Kafka loads properly and the old data still exists, it won’t continue replicating the data, as fix_offline_replicas moved the replicas to other brokers. In practice this means that the Kafka broker will see no topics assigned to it in ZooKeeper and therefore thinks it has no replicas to replicate. The cleanest thing to do in this case is to remove all data from the broker and, once it is back online, call add_broker to repopulate it with partitions.

Self-Healing

As I showed previously, Cruise Control can recover partitions on manual request in some cases. The most frequent problems are brokers crashing and disks failing or getting corrupted. Cruise Control defends against these situations by reassigning the lost replicas to healthy brokers (the data is then replicated from the leader to the newly assigned broker). This reassignment is, however, done in alignment with the optimized workload model. The self-healing feature is similar to the previously mentioned fix_offline_replicas API, but it’s automated.

To enable this feature you must set the self.healing.enabled config to true and the anomaly.notifier.class to com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier. In Cloudera Manager 7.4.3 you can find these configurations in the Cruise Control configuration page but in earlier versions you must set the following in the “Cruise Control Server Advanced Configuration Snippet (Safety Valve) for cruisecontrol.properties”:

self.healing.enabled=true
anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier

After setting these, restart Cruise Control.

Finally to simulate self-healing I’ll kill broker 25 the following way:

  1. Log into a broker host
  2. Search for processes that are run by the Kafka user to get the PID to kill: ps -aux | grep "kafka.properties" | less -S
  3. kill -9 <pid>

At this point Cruise Control will notice that the broker has been removed from the cluster but it will wait until broker.failure.alert.threshold.ms time elapses (by default it’s 15 minutes) to mark the broker dead. This config only marks the broker dead and triggers an alert. Self-healing itself is triggered by the broker.failure.self.healing.threshold.ms config which is set to 30 minutes by default and starts from the point where the broker disappeared. During this time the operator can prevent the start of self-healing if this is a known or expected issue.
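
The two thresholds are both measured from the moment the broker disappeared, which the following Python sketch illustrates. It only does the date arithmetic with the default values mentioned above (15 and 30 minutes); the failure_timeline function is hypothetical and the failure timestamp is just an example value.

```python
from datetime import datetime, timedelta, timezone

# Defaults described in the text, in milliseconds.
ALERT_THRESHOLD_MS = 15 * 60 * 1000          # broker.failure.alert.threshold.ms
SELF_HEALING_THRESHOLD_MS = 30 * 60 * 1000   # broker.failure.self.healing.threshold.ms


def failure_timeline(
    failure_ms: int,
    alert_threshold_ms: int = ALERT_THRESHOLD_MS,
    self_healing_threshold_ms: int = SELF_HEALING_THRESHOLD_MS,
) -> dict:
    """Return when the alert fires and when self-healing may start after a failure."""
    failed_at = datetime.fromtimestamp(failure_ms // 1000, tz=timezone.utc)
    return {
        "failed_at": failed_at,
        "alert_at": failed_at + timedelta(milliseconds=alert_threshold_ms),
        "self_healing_at": failed_at + timedelta(milliseconds=self_healing_threshold_ms),
    }


# Example: a broker that disappeared at 2021-08-03 12:23:45 UTC.
timeline = failure_timeline(1627993425277)
for name, moment in timeline.items():
    print(name, moment.strftime("%H:%M:%S"))
```

With the defaults, the operator has the window between the failure and self_healing_at to intervene if the outage is expected.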

The start of the self-healing process can be seen by querying the state endpoint:

[root@cruise-control-blog-1 ~]# curl -X GET 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/state?substates=executor,anomaly_detector'

ExecutorState: {state: NO_TASK_IN_PROGRESS, recentlyRemovedBrokers: [25]}

AnomalyDetectorState: {selfHealingEnabled:[BROKER_FAILURE, DISK_FAILURE, GOAL_VIOLATION, METRIC_ANOMALY, TOPIC_ANOMALY], selfHealingDisabled:[], selfHealingEnabledRatio:{BROKER_FAILURE=1.0, DISK_FAILURE=1.0, METRIC_ANOMALY=1.0, GOAL_VIOLATION=1.0, TOPIC_ANOMALY=1.0}, recentGoalViolations:[], recentBrokerFailures:[{anomalyId=10009399-1407-4d2c-a71f-1125407f9c30, failedBrokersByTimeMs={25=1627993425277}, detectionDate=2021-08-03_12:26:45 UTC, status=FIX_STARTED, statusUpdateDate=2021-08-03_12:26:45 UTC}, {anomalyId=897a0d55-4a19-4200-a38d-37221d4709a6, failedBrokersByTimeMs={25=1627993425277}, detectionDate=2021-08-03_12:26:15 UTC, status=CHECK_WITH_DELAY, statusUpdateDate=2021-08-03_12:26:15 UTC}, {anomalyId=c37d233d-4350-48ae-9e40-6ecd86556442, failedBrokersByTimeMs={25=1627993425277}, detectionDate=2021-08-03_12:26:45 UTC, status=CHECK_WITH_DELAY, statusUpdateDate=2021-08-03_12:26:45 UTC}, {anomalyId=7d9f3dc5-cb61-41dd-8762-2514b36afb25, failedBrokersByTimeMs={25=1627993425277}, detectionDate=2021-08-03_12:23:45 UTC, status=CHECK_WITH_DELAY, statusUpdateDate=2021-08-03_12:23:45 UTC}], recentMetricAnomalies:[], recentDiskFailures:[], recentTopicAnomalies:[], metrics:{meanTimeBetweenAnomalies:{GOAL_VIOLATION:0.00 milliseconds, BROKER_FAILURE:0.25 milliseconds, METRIC_ANOMALY:0.00 milliseconds, DISK_FAILURE:0.00 milliseconds, TOPIC_ANOMALY:0.00 milliseconds}, meanTimeToStartFix:3.00 minutes, numSelfHealingStarted:1, numSelfHealingFailedToStart:0, ongoingAnomalyDuration=0.00 milliseconds}, ongoingSelfHealingAnomaly:None, balancednessScore:100.000}

Here you can see in the recentBrokerFailures section of the output that there was a broker failure anomaly that Cruise Control started to fix. Also, the Executor marks the broker as recently removed (you can see it in the ExecutorState section). The broker stays marked this way as long as Cruise Control is running, and the only way to clear the mark is to add the broker again. Other operations exclude recently removed brokers, so they won’t have any effect on it. This behavior can be changed by passing exclude_recently_removed_brokers=false when calling the API; however, it is recommended to add the broker back with the add_broker API instead, specifying the old ID.

So at this point let’s add the removed broker 25 back with the following command:

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/add_broker?brokerid=25&dryrun=false'

Load Rebalancing

Rebalancing is an on-demand way of starting a reassignment when some imbalance is observed in the cluster. It is important to note that this isn’t the equivalent of Kafka’s kafka-reassign-partitions tool. It doesn’t rebalance partitions based on user input but rather based on the workload model. Kafka’s kafka-reassign-partitions command is also much less robust, as you need to manually select the partitions to move, then edit and pass JSON files to the command. That is really error prone and inefficient.

To launch a rebalance one should use the following command:

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/rebalance?dryrun=false'

With well configured self-healing there might be no need to use this API, but it is suitable for users who prefer not to enable self-healing in order to have more control over the cluster, or when rebalances need to be triggered programmatically.

Maintenance Mode

Sometimes it is required to demote a broker. That means Cruise Control will shift all partition leadership away from the demoted broker and reorder the replica lists so that the demoted broker’s replicas are the least preferred ones. This is useful if maintenance is needed on those brokers. If the data center is under maintenance it can also give an extra layer of safety: in case of an unexpected outage there will be no unexpected leadership changes, just some under-replicated partitions.

Let’s execute the demotion command:

[root@cruise-control-blog-1 ~]# curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/demote_broker?brokerid=34&dryrun=false'

Optimization has 0 inter-broker replica(0 MB) moves, 0 intra-broker replica(0 MB) moves and 95 leadership moves with a cluster model of 5 recent windows and 100.000% of the partitions covered.

Excluded Topics: [].

Excluded Brokers For Leadership: [].

Excluded Brokers For Replica Move: [].

Counts: 4 brokers 808 replicas 29 topics.

On-demand Balancedness Score Before (0.000) After(100.000).

Stats for PreferredLeaderElectionGoal(FIXED):

AVG:{cpu:       0.222 networkInbound:       0.114 networkOutbound:       0.071 disk:    7270.377 potentialNwOut:       0.192 replicas:202.0 leaderReplicas:73.75 topicReplicas:6.9655172413793105}

MAX:{cpu:       0.739 networkInbound:       0.128 networkOutbound:       0.191 disk:    7361.541 potentialNwOut:       0.230 replicas:221 leaderReplicas:116 topicReplicas:44}

MIN:{cpu:       0.025 networkInbound:       0.083 networkOutbound:       0.000 disk:    7002.346 potentialNwOut:       0.092 replicas:190 leaderReplicas:0 topicReplicas:0}

STD:{cpu:       0.300 networkInbound:       0.018 networkOutbound:       0.074 disk:     191.631 potentialNwOut:       0.058 replicas:11.554220008291344 leaderReplicas:44.17224807500746 topicReplicas:1.754077044300701}

Cluster load after demoting broker [34]:

                  HOST         BROKER             DISK(MB)/_(%)_         CPU(%)       LEADER_NW_IN(KB/s)     FOLLOWER_NW_IN(KB/s)        NW_OUT(KB/s)       PNW_OUT(KB/s)    LEADERS/REPLICAS

cruise-control-blog-1.example.com,            25,            7002.346/06.84,         0.739,                   0.061,                   0.022,              0.069,              0.092,           116/221

cruise-control-blog-2.example.com,            27,            7356.167/07.36,         0.040,                   0.022,                   0.102,              0.022,              0.227,            96/200

cruise-control-blog-3.example.com,            29,            7361.541/07.19,         0.083,                   0.096,                   0.032,              0.191,              0.230,            83/190

cruise-control-blog-4.example.com,            34,            7361.452/07.19,         0.025,                   0.000,                   0.121,              0.000,              0.221,             0/197

You can see that after the demotion broker 34 will have 0 leaders. The command executes fairly quickly, as a leadership change doesn’t involve data movement. Once it’s finished we can look at kafka_cluster_state to confirm the success of the previous API call:

[root@cruise-control-blog-1 ~]# curl -X GET 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/kafka_cluster_state'

Brokers:

              BROKER           LEADER(S)            REPLICAS         OUT-OF-SYNC             OFFLINE       IS_CONTROLLER

                  25                 116                 221                   0                   0               false

                  27                  96                 200                   0                   0               false

                  29                  83                 190                   0                   0                true

                  34                   0                 197                   0                   0               false

LogDirs of brokers with replicas:

              BROKER                       ONLINE-LOGDIRS  OFFLINE-LOGDIRS

                  25              [/var/local/kafka/data]               []

                  27              [/var/local/kafka/data]               []

                  29              [/var/local/kafka/data]               []

                  34              [/var/local/kafka/data]               []

Under Replicated, Offline, and Under MinIsr Partitions:

                                                     TOPIC PARTITION    LEADER                      REPLICAS                       IN-SYNC              OUT-OF-SYNC                  OFFLINE

Offline Partitions:

Partitions with Offline Replicas:

Under Replicated Partitions:

Under MinIsr Partitions:
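
The same check can be automated by parsing the broker table. The Python sketch below assumes the six-column plain-text layout shown above and uses a hypothetical parse_brokers helper; it verifies that the demoted broker no longer leads any partition.

```python
from typing import Dict


def parse_brokers(text: str) -> Dict[int, dict]:
    """Parse the 'Brokers:' table of kafka_cluster_state into a dict keyed by broker id."""
    brokers = {}
    for line in text.splitlines():
        fields = line.split()
        # Data rows have five integer columns followed by true/false.
        if (
            len(fields) == 6
            and all(field.isdigit() for field in fields[:5])
            and fields[5] in ("true", "false")
        ):
            broker_id, leaders, replicas, out_of_sync, offline = map(int, fields[:5])
            brokers[broker_id] = {
                "leaders": leaders,
                "replicas": replicas,
                "out_of_sync": out_of_sync,
                "offline": offline,
                "is_controller": fields[5] == "true",
            }
    return brokers


output = """\
Brokers:
              BROKER           LEADER(S)            REPLICAS         OUT-OF-SYNC             OFFLINE       IS_CONTROLLER
                  25                 116                 221                   0                   0               false
                  27                  96                 200                   0                   0               false
                  29                  83                 190                   0                   0                true
                  34                   0                 197                   0                   0               false
"""
brokers = parse_brokers(output)
print(brokers[34]["leaders"] == 0)  # True
```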

Also by calling the state endpoint we can again confirm that broker 34 has been recently demoted:

[root@cruise-control-blog-1 ~]# curl -X GET 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/state?substates=executor'

ExecutorState: {state: NO_TASK_IN_PROGRESS, recentlyDemotedBrokers: [34]}
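
Before running further operations it can be useful to check these lists from a script. Here is a minimal Python sketch that extracts the broker IDs from an ExecutorState line; the broker_list function is a hypothetical helper and it assumes the plain-text format shown above.

```python
import re
from typing import List


def broker_list(executor_state: str, key: str) -> List[int]:
    """Return the broker ids listed under key, or an empty list if the key is absent."""
    match = re.search(re.escape(key) + r": \[([^\]]*)\]", executor_state)
    if match is None:
        return []
    return [int(item) for item in match.group(1).split(",") if item.strip()]


state = "ExecutorState: {state: NO_TASK_IN_PROGRESS, recentlyDemotedBrokers: [34]}"
print(broker_list(state, "recentlyDemotedBrokers"))  # [34]
print(broker_list(state, "recentlyRemovedBrokers"))  # []
```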

To add the broker back and remove it from the recently demoted brokers list, you should call the add_broker endpoint. Note that a simple rebalance will also work, but unfortunately it doesn’t remove the broker from the recently demoted brokers list. That is a problem because some operations might not be executed on demoted brokers (this is usually controlled by the exclude_recently_demoted_brokers parameter in the REST calls).

Removing Brokers

Our final main use case is preparing a broker for removal from the cluster. The remove_broker API doesn’t remove the broker entirely from the cluster but only makes sure that all the replicas and data are moved to other brokers and therefore it is safe to switch off the target broker. Similarly to add_broker it doesn’t move partitions between other brokers. Also partitions are moved in batches and throttling can be applied to ensure an orderly rebalance that doesn’t overwhelm the cluster.

To try out this API and give a proper ending to this blog entry, let’s remove broker 34 which was added at the beginning of our journey through Cruise Control’s API. This can be done by calling the remove_broker API similarly to the previous APIs (and we can expect similar outputs):

curl -X POST 'http://cruise-control-blog-1.example.com:8899/kafkacruisecontrol/remove_broker?brokerid=34&dryrun=false'

This API can be thought of as a stricter version of the demote_broker API. Similarly, there will be a recentlyRemovedBrokers entry in the Executor’s state. It is held in memory, so a restart of Cruise Control makes it disappear, and an add_broker call also reinstates the broker and gets rid of the entry. As with the demote_broker endpoint, if a broker is listed in recentlyRemovedBrokers then some API functions won’t be executed on it, so if you decide to use the broker again you must call add_broker before any other operation.

Summary

I think it’s safe to say that for clusters with bigger loads Cruise Control is a good tool to use with Kafka. It helps you balance the cluster load, react to failures much more efficiently, add and remove brokers, and much more. It really empowers you as an operator of a Kafka cluster, and it can also be used to automate the management of your cluster.

We have integrated Cruise Control with Cloudera Manager and with CDP 7.1, so it comes as an integral part of the platform. You’ll be able to monitor and configure your Cruise Control instance along with the rest of the features.

To learn more about CDP Private Cloud Base that I used in my demo, you can visit the product’s page here and the documentation here. To learn more about how to use Cruise Control in CDP Private Cloud, go here.

Viktor Somogyi-Vass
Staff Engineer

