Apache HBase Replication: Operational Overview

This is the second blogpost about Apache HBase replication. The previous blogpost, HBase Replication Overview, discussed use cases, architecture and different modes supported in HBase replication. This blogpost is from an operational perspective and will touch upon HBase replication configuration, and key concepts for using it — such as bootstrapping, schema change, and fault tolerance.


As mentioned in HBase Replication Overview, the master cluster ships WALEdits to one or more slave clusters. This section describes the steps needed to configure replication in master-slave mode.

  1. All tables/column families that are to be replicated must exist on both the clusters.
  2. Add the following property in $HBASE_HOME/conf/hbase-site.xml on all nodes on both clusters; set it to true.
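The property name is missing from the step above. As a minimal sketch, assuming the property meant is `hbase.replication` (an assumption, since the text does not name it), the following Python helper checks whether a given hbase-site.xml sets it to true. The function name and the sample file content are hypothetical and only serve the illustration.

```python
import xml.etree.ElementTree as ET

# ASSUMPTION: "hbase.replication" is the property the step refers to;
# the original text omits the name.
REPLICATION_PROPERTY = "hbase.replication"


def replication_enabled(hbase_site_xml, name=REPLICATION_PROPERTY):
    """Return True if the hbase-site.xml text sets `name` to true."""
    root = ET.fromstring(hbase_site_xml)
    for prop in root.iter("property"):
        if (prop.findtext("name") or "").strip() == name:
            return (prop.findtext("value") or "").strip().lower() == "true"
    return False  # property absent


# Hypothetical file content, for illustration only.
SAMPLE = """<configuration>
  <property>
    <name>hbase.replication</name>
    <value>true</value>
  </property>
</configuration>"""

print(replication_enabled(SAMPLE))
```

Running a check like this against the file on every node of both clusters is one way to confirm the setting before moving on.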


On the master cluster, make the following additional changes:

  1. Set replication scope (REPLICATION_SCOPE attribute) on the table/column family which is to be replicated:

    hbase shell> disable 'table'
    hbase shell> alter 'table', {NAME => 'column-family', REPLICATION_SCOPE => 1}
    hbase shell> enable 'table'

REPLICATION_SCOPE is a column-family level attribute and its value can be either 0 or 1. A value of 0 means replication is disabled, and 1 means replication is enabled. A user has to run the alter command shown above once for each column family they want to replicate.

If a user wants to enable replication while creating a table, they should use the following command:

    hbase shell> create 'table', {NAME => 'column-family1', REPLICATION_SCOPE => 1}, 'column-family2'

The above command will enable replication on ‘column-family1’ of the above table.

  2. In the hbase shell, add the slave peer. A user is supposed to provide the slave cluster’s zookeeper quorum, its client port, and the root hbase znode, separated by colons, along with a peerId:

    hbase shell> add_peer 'peerId', "zookeeper_quorum:client_port:root_znode"

The peerId is a string one or two characters long, and a corresponding znode is created under the peers znode, as explained in the previous blog. Once a user runs the add_peer command, the replication code instantiates a ReplicationSource object for that peer, and all the master cluster regionservers try to connect to the slave cluster’s regionservers. The replication code also fetches the slave cluster’s ClusterId (a UUID registered on the slave cluster’s zookeeper quorum).

The master cluster regionserver lists the available slave regionservers by reading the “/hbase/rs” znode and its children on the slave cluster’s zookeeper quorum, and connects to them. Each master cluster regionserver chooses a subset of the slave regionservers according to the ratio specified by “replication.source.ratio”, which defaults to 0.1. This means that each master cluster regionserver will try to connect to 10% of the slave cluster regionservers. When sending a transaction batch, the master cluster regionserver picks a random regionserver from this connected subset. (Note: Replication is not done for the catalog tables, .META. and -ROOT-.)
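The sink selection described above can be sketched in a few lines of Python. This is an illustration of the idea, not the HBase implementation: the function names and host names are hypothetical, and rounding the subset size up (so that at least one slave regionserver is always chosen) is an assumption.

```python
import math
import random


def choose_sinks(slave_regionservers, ratio=0.1, rng=random):
    """Pick the subset of slave regionservers one master regionserver uses.

    ASSUMPTION: the subset size is rounded up, so a small slave cluster
    still yields at least one sink.
    """
    servers = list(slave_regionservers)
    count = math.ceil(len(servers) * ratio)
    return rng.sample(servers, count)


def pick_sink_for_batch(sinks, rng=random):
    """Pick a random sink from the connected subset for one batch."""
    return rng.choice(sinks)


# Hypothetical slave cluster with 20 regionservers.
slaves = [f"slave-rs{i}.example.com" for i in range(20)]
sinks = choose_sinks(slaves)          # 10% of 20 servers
target = pick_sink_for_batch(sinks)   # chosen again for every batch
print(len(sinks), target in sinks)
```

Because every master regionserver draws its own random subset, the shipping load tends to spread across the slave cluster rather than concentrating on a few servers.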

To set up a master-master mode, the above steps should be repeated on both clusters.

Schema Change

As mentioned in the previous section, replicated table and column family must exist in both clusters. This section discusses various possible scenarios as to what happens during a schema change when the replication is still in progress:

a) Delete the column family in master: Deleting a column family will not affect the replication of any pending mutations for that family. This is because the replication code reads the WAL and checks the replication scope of the column families for each WALEdit. Each WALEdit has a map of the replication-enabled column families, and the column family of every constituent keyvalue is checked against it to see whether it is scoped. If the family is present in the map, the keyvalue is added to the shipment. Since the WALEdit object was created before the column family was deleted, its replication will not be affected.
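A simplified Python model of this scope check may help. The `KeyValue` and `WALEdit` classes here are hypothetical stand-ins for the HBase classes of the same name and carry only the fields needed for the illustration.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class KeyValue:
    row: str
    family: str
    qualifier: str
    value: str


@dataclass
class WALEdit:
    keyvalues: list
    # Families that were replication-scoped when the edit was written.
    # The map travels with the edit, so a later schema change on the
    # master does not alter it.
    scopes: dict = field(default_factory=dict)


def keyvalues_to_ship(edit):
    """Keep only the keyvalues whose family is scoped in this edit's map."""
    return [kv for kv in edit.keyvalues if edit.scopes.get(kv.family, 0) == 1]


edit = WALEdit(
    keyvalues=[
        KeyValue("row1", "cf1", "a", "x"),  # cf1 was scoped at write time
        KeyValue("row1", "cf2", "b", "y"),  # cf2 was never scoped
    ],
    scopes={"cf1": 1},
)
print([kv.family for kv in keyvalues_to_ship(edit)])
```

The decision depends only on the map stored in the edit, which is why deleting the family on the master afterwards leaves pending shipments unchanged.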

b) Delete the column family in slave: The WALEdits are shipped from the master cluster to a particular slave regionserver, which processes them like a normal HBase client (using an HTablePool object). Since the column family has been deleted, the put operation will fail and the exception is thrown back to the master cluster regionserver.

Start/Stop Replication

The start and stop commands work as a kill switch. When the stop_replication command is run at the HBase shell, it changes the value of /hbase/replication/state to false. This stops all the replication source objects from reading the logs, but entries that have already been read will still be shipped. While replication is stopped, newly rolled logs are not enqueued for replication. Similarly, issuing a start_replication command starts replication from the current WAL (which may contain some earlier transactions), and not from the time when the command was issued.


Figure 1 explains the start-stop switch behavior, where the sequence of events flows in the direction of arrows.

Version Compatibility

Master cluster regionservers connect to slave cluster regionservers as normal HBase clients. The same compatibility rule therefore applies as for whether an HBase client on version xxx is supported on an HBase server on version yyy.

On another point, since replication is still evolving (more and more functionality is continually being added to it), a user should be aware of what the version in use supports. For example, in CDH4, which is based on the HBase 0.92 branch, there is support for master-master and cyclic replication. Enabling/disabling a replication source at the peer level was added in the HBase 0.94 branch.


Replication works by reading the WALs of the master cluster regionservers. If a user wants to replicate old data, they can run a copyTable command (defining start and end timestamp), while enabling the replication. The copyTable command will copy the data scoped by the start/end timestamps, and replication will take care of current data. The overall approach can be summarized as:

  1. Start the replication (note the timestamp).
  2. Run the copyTable command with an end timestamp equal to the above timestamp.
  3. Since replication starts from the current WAL, there may be some keyvalues which are copied to the slave by both the replication and copyTable jobs. This is still okay, as applying the same keyvalue twice is an idempotent operation.
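The idempotence claim in the last step can be illustrated with a toy model. In this sketch (not HBase code; the function name and sample cells are hypothetical) a table is a dictionary keyed by row, column, and timestamp, so writing the same cell twice leaves the table unchanged.

```python
def apply_cells(table, cells):
    """Write cells into a table keyed by (row, column, timestamp)."""
    for row, column, timestamp, value in cells:
        table[(row, column, timestamp)] = value
    return table


# Cells delivered by the copyTable job and by replication; row2 overlaps.
copied = [("row1", "cf:a", 100, "x"), ("row2", "cf:a", 105, "y")]
replicated = [("row2", "cf:a", 105, "y"), ("row3", "cf:a", 110, "z")]

slave = apply_cells({}, copied)
apply_cells(slave, replicated)
snapshot = dict(slave)

# Delivering the replicated batch a second time changes nothing.
apply_cells(slave, replicated)
print(len(slave), slave == snapshot)
```

The overlap costs some extra transfer, but the slave ends up with the same cells either way.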

In the case of master-master replication, one should run the copyTable job before starting the replication. This is because if a user starts a copyTable job after enabling replication, the second master will resend the data to the first master, as copyTable does not edit the clusterId in the mutation objects. The overall approach can be summarized as:

  1. Run the copyTable job (note the start timestamp of the job).
  2. Start the replication.
  3. Run the copyTable again with starttime equal to the starttime noted in step 1.

This also entails some data being pushed back and forth between the two clusters, but it keeps the amount to a minimum.

Fault Tolerance

Master Cluster Region Server Failover
All the regionservers in the master cluster create a znode under “/hbase/replication/rs”, as mentioned in the Architecture section. A regionserver adds a child znode for each WAL, along with a byte offset, under its znode in the replication hierarchy, as shown in Figure 1. If a regionserver fails, other regionservers need to take care of the dead regionserver’s logs, which are listed under that regionserver’s znode. All regionservers keep a watch on the other regionservers’ znodes (under “/hbase/rs”), so when a regionserver fails, the others get the event as the master marks this regionserver as dead. In this case, all other regionservers race to transfer the WALs from the dead regionserver’s znode to their own znode, and prefix them with the slave id and the dead regionserver’s name in order to distinguish them from other normal logs. A separate replication source (NodeFailoverWorker instance) is instantiated for the transferred logs, which dies after processing these transferred logs.
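The queue transfer can be sketched with a plain dictionary standing in for the znode hierarchy under /hbase/replication/rs. This is an illustrative model only: the function name is hypothetical, and the ZooKeeper race that decides which regionserver wins is left out.

```python
def claim_queues(queues, dead_server, claiming_server):
    """Move a dead regionserver's replication queues to another server.

    `queues` maps server name -> queue id -> {WAL name: byte offset}.
    Each transferred queue is renamed "<peerId>-<dead server name>" so it
    can be told apart from the claimer's own queues.
    """
    dead_queues = queues.pop(dead_server, {})
    own_queues = queues.setdefault(claiming_server, {})
    for peer_id, wals in dead_queues.items():
        own_queues[f"{peer_id}-{dead_server}"] = dict(wals)
    return queues


dead = "foo1.bar.com,40020,1339435481973"
claimer = "foo2.bar.com,40020,1339435481742"
queues = {
    dead: {"1": {"foo1.bar.com.1339435485769": 1243232}},
    claimer: {"1": {"foo2.bar.com.1339435485769": 1909033}},
}

claim_queues(queues, dead, claimer)
print(sorted(queues[claimer]))
```

After the transfer the claimer owns two queues: its own queue for peer 1 and the recovered one, whose name records both the peer and the server it came from. The byte offsets are carried over, so shipping resumes where the dead server stopped.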

If one considers Figure 1 of the HBase Replication Overview as the base figure of the replication znodes hierarchy, Figure 2 shows the new replication znodes hierarchy in case server foo1.bar.com dies and foo2.bar.com takes over its queue. Note the new znode “1-foo1.bar.com,40020,1339435481973”, which is created under the foo2.bar.com znode.

    /hbase/hbaseid: b53f7ec6-ed8a-4227-b088-fd6552bd6a68
    ….
    /hbase/rs/foo2.bar.com,40020,1339435481973:
    /hbase/rs/foo3.bar.com,40020,1339435486713:
    /hbase/replication:
    /hbase/replication/state: true
    /hbase/replication/peers:
    /hbase/replication/peers/1: zk.quorum.slave:281:/hbase
    /hbase/replication/rs:
    /hbase/replication/rs/foo1.bar.com.com,40020,1339435084846:
    /hbase/replication/rs/foo1.bar.com,40020,1339435481973/1:
    /hbase/replication/rs/foo1.bar.com,40020,1339435481973/1/foo1.bar.com.1339435485769: 1243232
    /hbase/replication/rs/foo3.bar.com,40020,1339435481742:
    /hbase/replication/rs/foo3.bar.com,40020,1339435481742/1:
    /hbase/replication/rs/foo3.bar.com,40020,1339435481742/1/foo3.bar..com.1339435485769: 1243232
    /hbase/replication/rs/foo2.bar.com,40020,1339435089550:
    /hbase/replication/rs/foo2.bar.com,40020,1339435481742/1:
    /hbase/replication/rs/foo2.bar.com,40020,1339435481742/1/foo2.bar..com.13394354343443: 1909033
    /hbase/replication/rs/foo2.bar.com,40020,1339435481742/1-foo1.bar.com,40020,1339435481973/foo1.bar.com.1339435485769: 1243232

Figure 2. Regionserver failover znodes hierarchy

Meanwhile, log splitting may kick in and archive the dead regionserver’s logs. The replication source therefore looks for the logs in both the regular and the archived directories.

Slow/unresponsive slave cluster (or regionservers)
When a slave cluster is down, or if there is a temporary network partition, logs which are not yet replicated to the slave will not be deleted by the HBase log cleaner.

Log cleaning is handled by the LogCleaner class, which runs periodically at a configured interval. The replication code adds a ReplicationLogCleaner plugin to the LogCleaner class. When the latter tries to delete a specific log, ReplicationLogCleaner checks whether that log exists in the replication znode hierarchy (under the /hbase/replication/rs/ znode). If the log is found, it is yet to be replicated, and its deletion is skipped. Once the log is replicated, its znode is deleted from the replication hierarchy, and the next LogCleaner run deletes the log file.
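The cleaner's decision reduces to a set membership test, sketched here in Python. The dictionary layout and function name are hypothetical stand-ins for the znode hierarchy and the plugin; this is not the ReplicationLogCleaner code itself.

```python
def deletable_logs(old_logs, replication_queues):
    """Return the logs that no replication queue still references.

    `replication_queues` maps server name -> queue id -> {WAL name: offset},
    mirroring the hierarchy under /hbase/replication/rs.
    """
    pending = {
        wal
        for server_queues in replication_queues.values()
        for queue in server_queues.values()
        for wal in queue
    }
    return [log for log in old_logs if log not in pending]


queues = {
    "foo2.bar.com,40020,1339435481742": {
        "1": {"foo2.bar.com.1339435485769": 1909033},
    },
}
old_logs = ["foo2.bar.com.1339435400000", "foo2.bar.com.1339435485769"]
print(deletable_logs(old_logs, queues))
```

Only the first log is eligible here. The second is still listed in a queue, so it survives until its znode is removed after replication.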


For small amounts of data, one can simply look up the table rows using the hbase shell at the slave cluster to verify whether they have been replicated. The standard way to verify is to run the verifyrep mapreduce job that comes with HBase. It should be run at the master cluster and requires the slave’s peerId and the target table name. One can also provide additional arguments such as start/stop timestamps and column families. It prints out two counters, GOODROWS and BADROWS, signifying the number of replicated and unreplicated rows, respectively.
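The meaning of the two counters can be shown with a small in-memory comparison. This sketch only mimics the row-by-row check on dictionaries; it is not the mapreduce job, and the function name and sample rows are hypothetical.

```python
from collections import Counter


def verify_rows(master_rows, slave_rows):
    """Compare each master row with the slave's copy and count the outcome."""
    counters = Counter(GOODROWS=0, BADROWS=0)
    for row_key, cells in master_rows.items():
        if slave_rows.get(row_key) == cells:
            counters["GOODROWS"] += 1
        else:
            counters["BADROWS"] += 1  # missing or different on the slave
    return counters


master = {
    "row1": {"cf:a": "x"},
    "row2": {"cf:a": "y"},
    "row3": {"cf:a": "z"},
}
slave = {
    "row1": {"cf:a": "x"},
    "row2": {"cf:a": "stale"},  # differs from the master
}
result = verify_rows(master, slave)
print(result["GOODROWS"], result["BADROWS"])
```

A non-zero BADROWS count right after heavy writes may simply reflect replication lag, so restricting the check to an older time range gives a more meaningful result.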

Replication Metrics

Replication framework exposes some useful metrics which can be used to check the replication progress. Some of the important ones are:

  1. sizeOfLogQueue: number of HLogs to process (excludes the one which is being processed) at the Replication source
  2. shippedOpsRate: rate of mutations shipped
  3. logEditsReadRate: rate of mutations read from HLogs at the replication source
  4. ageOfLastShippedOp: age of last batch that was shipped by the replication source

Future Work

Even with all the features present in current HBase replication, there is still scope for further improvement. This ranges from performance, such as decreasing the replication time-lag between master and slave, to more robust handling of region server failure (HBASE-2611). Further areas of improvement include enabling peer-level table replication and proper handling of IncrementColumnValue (HBASE-2804).

This post discussed HBase replication from an operator’s point of view, including configuration (of various modes), bootstrapping an existing cluster, effects of schema changes, and fault tolerance.

Himanshu Vashishtha