HBase Clusters Data Synchronization with HashTable/SyncTable tool

Replication (covered in this previous blog article) has been released for a while and is among the most used features of Apache HBase. Having clusters replicating data with different peers is a very common deployment, whether as a DR strategy or simply as a seamless way of replicating data between production/staging/development environments. Although it is an efficient way of keeping different HBase databases in sync within a sub-second latency, replication only operates over data ingested after the feature has been enabled. That means any pre-existing data on all clusters involved in the replication deployment will still need to get copied between the peers in some other way. There are quite a few tools that can be used to synchronize pre-existing data on different peer clusters. Snapshots, BulkLoad, CopyTable are well-known examples of such tools covered in previous Cloudera blog posts. In this article we are going to cover HashTable/SyncTable, detailing some of its internal implementation logic, the pros and cons of using it, and how it compares to some of the other data copy techniques mentioned above.

HashTable/SyncTable in a nutshell

HashTable/SyncTable is a tool implemented as two map-reduce jobs that are executed as individual steps. It looks similar to the CopyTable tool, which can perform both partial or entire table data copies. Unlike CopyTable it only copies diverging data between target clusters, saving both network and computing resources during the copy procedure.

The first step to be executed in the process is the HashTable map-reduce job. This should be run on the cluster whose data should be copied to the remote peer, normally the source cluster. A quick example of how to run it is shown below, detailed explanation of each of the required parameters is given later in this article: 

hbase org.apache.hadoop.hbase.mapreduce.HashTable --families=cf my-table /hashes/test-tbl

20/04/28 05:05:48 INFO mapreduce.Job:  map 100% reduce 100%
20/04/28 05:05:49 INFO mapreduce.Job: Job job_1587986840019_0001 completed successfully
20/04/28 05:05:49 INFO mapreduce.Job: Counters: 68

File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=6811788

Once the HashTable job execution with the above command is completed, some output files have been generated in the source hdfs /hashes/my-table directory:

hdfs dfs -ls -R /hashes/test-tbl
drwxr-xr-x   - root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes
-rw-r--r--   2 root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes/_SUCCESS
drwxr-xr-x   - root supergroup          0 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000
-rw-r--r--   2 root supergroup    6790909 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/data
-rw-r--r--   2 root supergroup      20879 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/index
-rw-r--r--   2 root supergroup         99 2020-04-28 05:04 /hashes/test-tbl/manifest
-rw-r--r--   2 root supergroup        153 2020-04-28 05:04 /hashes/test-tbl/partitions

These are needed as an input for the SyncTable run. SyncTable must be launched at the target peer. Below command runs SyncTable for the output of HashTable from the previous example. It uses the dryrun option explained later in this article:

hbase org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://source-cluster-active-nn/hashes/test-tbl test-tbl test-tbl

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97146
HASHES_NOT_MATCHED=2
MATCHINGCELLS=17
MATCHINGROWS=2
RANGESNOTMATCHED=2
ROWSWITHDIFFS=2
SOURCEMISSINGCELLS=1
TARGETMISSINGCELLS=1

As a quick reference, you may just replace the given parameters on both examples by your actual environment values. The remainder of this article will cover the implementation details in more depth.

Why two different steps?

The main goal of this tool is to identify and copy only data that is missing between the two clusters. HashTable works as a sharding/indexing job, analysing batches of the table data and generating hash indexes for each of these batches. These are the output written in the files under the /hashes/my-table hdfs directory passed as one of the job parameters. As mentioned before, this output is required by the SyncTable job. SyncTable scans the target table locally in the same batch sizes as used by HashTable, and also computes hash values for these batches using the same function used by HashTable. It then compares the local batch hash value with the one from the HashTable output. If the hash values are equal, it means the whole batch is identical in the two clusters, and nothing needs to be copied on that segment. Otherwise, it opens a scan for the batch in the source cluster, checking if each of the cells already exists in the target cluster, copying only those that diverge. On sparse, slightly different data sets, this would result in much less data being copied between the two clusters. It would also require only a small number of cells to be scanned in the source to check for mismatches.

Required Parameters

HashTable requires only two parameters: the table name and output path where related hashes and other meta info files will be written. SyncTable uses HashTable output dir as input, together with the table names in the source and in the target cluster, respectively. Since we are using HashTable/SyncTable to move data between remote clusters, sourcezkcluster option must be defined for SyncTable. This should be the zookeeper quorum address of the source cluster. In this article example, we had also referenced the source cluster active namenode address directly, so that SyncTable will read the hash output file directly from the source cluster. Alternatively, HashTable output can be manually copied from source cluster to remote cluster (with distcp, for instance).

NOTE: Working with remote clusters under different kerberos realm is only supported from CDH 6.2.1 onwards.

Advanced options

Both HashTable and SyncTable offer extra optional options that can be tuned for optimal results. 

HashTable allows for filtering the data by both row key and modification time, with startrow/starttime, stoprow/stoptime properties, respectively. The dataset scope can also be limited by versions and families properties. The batchsize property defines the size of each portion that will be hashed. This impacts directly on the sync performance. In cases of very few mismatches, setting larger batch size values can lead to better performance as greater portions of the data set would be ignored without needing to be scanned by SyncTable.

SyncTable provides a dryrun option which allows for a preview of the changes to be applied in the target. 

SyncTable default behaviour is to mirror the source data at the target side, so any additional cell present in target but absent in source ends up getting deleted at target side. That can be undesirable when syncing clusters under an Active-Active replication setup and for such cases, doDeletes options can be turned to false, skipping replication of deletions on the target. There is also a similar doPuts flag for cases where additional cells should not be inserted into the target cluster.

Analysing the outputs

HashTable outputs a few files with meta information for SyncTable, those, however, are not human readable. It does not perform any changes on existing data, so related info is of little interest for a user context. 

SyncTable is the step that really applies the modifications on the target and it might be important to review its summary before actually altering the target cluster data (see dryrun option mentioned above). It publishes some relevant counters at the end of the map reduce execution. Looking at the values from above example, we can see there were 97148 partitions hashed (reported by BATCHES counter), which SyncTable detected divergences in only two of them (according to the HASHES_MATCHED and HASHES_NOT_MACTHED counters). Additionally, within the two partitions having different hashes,  17 cells over 2 rows were matching (as reported by MATCHING_CELLS and MATCHING_ROWS, respectively), but there were also 2 rows diverging, on these two partitions (according to RANGESNOTMATCHED and ROWSWITHDIFFS). Finally, SOURCEMISSINGCELLS and TARGETMISSINGCELLS tell us in detail if cells were present on source or target cluster only. In this example, source cluster had one cell that was not on the target, but target also had a cell that was not on source. Since SyncTable was run without specifying dryrun option and setting doDeletes option to false, the job has deleted the extra cell in the target cluster and had added the extra cell found in source to the target cluster. Assuming no writes happen on either clusters, a subsequent run of the very same SyncTable command in the target cluster would show no differences:

hbase org.apache.hadoop.hbase.mapreduce.SyncTable --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://nn:9000/hashes/test-tbl test-tbl test-tbl

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97148

Applicable Scenarios

Data Sync

At first glance, HashTable/SyncTable may seem to overlap with the CopyTable tool, but there are still specific scenarios where either tools would be more suitable. As a first comparison example, using HashTable/SyncTable for an initial load of a table containing 100,004 rows and a total data size of 5.17GB required a few minutes just for SyncTable to complete:

...
20/04/29 03:48:00 INFO mapreduce.Job: Running job: job_1587985272792_0011
20/04/29 03:48:09 INFO mapreduce.Job: Job job_1587985272792_0011 running in uber mode : false
20/04/29 03:48:09 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 03:54:08 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 03:54:09 INFO mapreduce.Job: Job job_1587985272792_0011 completed successfully

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
EMPTY_BATCHES=97148
HASHES_NOT_MATCHED=97148
RANGESNOTMATCHED=97148
ROWSWITHDIFFS=100004
TARGETMISSINGCELLS=749589
TARGETMISSINGROWS=100004

Even on such small dataset, CopyTable executed quicker (roughly 3 minutes, while SyncTable took 6 minutes to copy the whole data set):

...
20/04/29 05:12:07 INFO mapreduce.Job: Running job: job_1587986840019_0005
20/04/29 05:12:24 INFO mapreduce.Job: Job job_1587986840019_0005 running in uber mode : false
20/04/29 05:12:24 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 05:13:16 INFO mapreduce.Job:  map 25% reduce 0%
20/04/29 05:13:49 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 05:14:37 INFO mapreduce.Job:  map 75% reduce 0%
20/04/29 05:15:14 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 05:15:14 INFO mapreduce.Job: Job job_1587986840019_0005 completed successfully

HBase Counters
BYTES_IN_REMOTE_RESULTS=2787236791
BYTES_IN_RESULTS=5549784428
MILLIS_BETWEEN_NEXTS=130808
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=4
REMOTE_RPC_CALLS=1334
REMOTE_RPC_RETRIES=0
ROWS_FILTERED=0
ROWS_SCANNED=100004
RPC_CALLS=2657
RPC_RETRIES=0

Now let’s use both tools again to deal with sparse differences over the dataset. The test-tbl table used on all these examples has four regions in the source cluster. After all the original data set had been copied to target cluster in the former example, we added four additional rows at source side only, one for each of the existing regions, then run HashTable/SyncTable again to sync both clusters:

20/04/29 05:29:23 INFO mapreduce.Job: Running job: job_1587985272792_0013
20/04/29 05:29:39 INFO mapreduce.Job: Job job_1587985272792_0013 running in uber mode : false
20/04/29 05:29:39 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 05:29:53 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 05:30:42 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 05:30:42 INFO mapreduce.Job: Job job_1587985272792_0013 completed successfully

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97144
HASHES_NOT_MATCHED=4
MATCHINGCELLS=42
MATCHINGROWS=5
RANGESNOTMATCHED=4
ROWSWITHDIFFS=4
TARGETMISSINGCELLS=4
TARGETMISSINGROWS=4

We can see that with just four partitions mismatching, SyncTable was considerably quicker (roughly a minute to complete). Using CopyTable to perform this same sync showed the following results:

20/04/29 08:32:38 INFO mapreduce.Job: Running job: job_1587986840019_0008
20/04/29 08:32:52 INFO mapreduce.Job: Job job_1587986840019_0008 running in uber mode : false
20/04/29 08:32:52 INFO mapreduce.Job:  map 0% reduce 0%
20/04/29 08:33:38 INFO mapreduce.Job:  map 25% reduce 0%
20/04/29 08:34:15 INFO mapreduce.Job:  map 50% reduce 0%
20/04/29 08:34:48 INFO mapreduce.Job:  map 75% reduce 0%
20/04/29 08:35:31 INFO mapreduce.Job:  map 100% reduce 0%
20/04/29 08:35:32 INFO mapreduce.Job: Job job_1587986840019_0008 completed successfully

HBase Counters
BYTES_IN_REMOTE_RESULTS=2762547723
BYTES_IN_RESULTS=5549784600
MILLIS_BETWEEN_NEXTS=340672
NOT_SERVING_REGION_EXCEPTION=0
NUM_SCANNER_RESTARTS=0
NUM_SCAN_RESULTS_STALE=0
REGIONS_SCANNED=4
REMOTE_RPC_CALLS=1323
REMOTE_RPC_RETRIES=0
ROWS_FILTERED=0
ROWS_SCANNED=100008
RPC_CALLS=2657
RPC_RETRIES=0

CopyTable took the same amount of time to sync the tables as when copying the whole dataset, even though there were only four cells to copy. This was still ok for this very small dataset, and with an idle cluster, but under production use cases with larger data sets and where target cluster may also be in use by many client applications writing data on it, CopyTable performance degradation in comparison to SyncTable would be even higher.

It’s worth mentioning that there are also additional tools/features that could be used in combination for an initial load of a target cluster (the target has no data at all), such as snapshots export, bulk load, or even a direct copy of original table dirs from source cluster. For initial loads with large sums of data to copy, taking a table snapshot and then using  the ExportSnapshot tool will outperform online copy tools such as SyncTable or CopyTable.

Checking Replication Integrity

Another common use of HashTable/SyncTable is for monitoring replication state between clusters, when troubleshooting possible replication issues. In this scenario it functions as an alternative to the VerifyReplication tool. Typically when checking the state between two clusters there are either no mismatches at all or a temporary opartionally issue has caused a small portion of the larger dataset to fall out of sync. In the test environment we have been using for our previous example there should be 100,008 rows with matching values on both clusters. Running SyncTable on the destination cluster with the dryrun option will let us identify any differences:

20/05/04 10:47:25 INFO mapreduce.Job: Running job: job_1588611199158_0004

20/05/04 10:48:48 INFO mapreduce.Job:  map 100% reduce 0%
20/05/04 10:48:48 INFO mapreduce.Job: Job job_1588611199158_0004 completed successfully

HBase Counters
BYTES_IN_REMOTE_RESULTS=3753476784
BYTES_IN_RESULTS=5549784600
ROWS_SCANNED=100008

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_MATCHED=97148
...

Unlike SyncTable we must run the VerifyReplication tool on the source cluster. We pass the peer id as one of its parameters so that it can find the remote cluster to scan for comparison:
20/05/04 11:01:58 INFO mapreduce.Job: Running job: job_1588611196128_0001

20/05/04 11:04:39 INFO mapreduce.Job:  map 100% reduce 0%
20/05/04 11:04:39 INFO mapreduce.Job: Job job_1588611196128_0001 completed successfully

HBase Counters
BYTES_IN_REMOTE_RESULTS=2761955495
BYTES_IN_RESULTS=5549784600


org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
GOODROWS=100008
...

With no differences, SyncTable finds all hashes matching between source and target partitions and thus, avoids the need to scan the remote source cluster again. VerifyReplication performs a one by one comparison for each cell in both clusters which might already carry a high network cost even when dealing with such small datasets.

Adding one extra row in the source cluster and performing the checks again. With VerifyReplication:

20/05/05 11:14:05 INFO mapreduce.Job: Running job: job_1588611196128_0004

20/05/05 11:16:32 INFO mapreduce.Job:  map 100% reduce 0%
20/05/05 11:16:32 INFO mapreduce.Job: Job job_1588611196128_0004 completed successfully

org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
BADROWS=1
GOODROWS=100008
ONLY_IN_SOURCE_TABLE_ROWS=1

Before we can use SyncTable, we have to regenerate hashes on source with HashTable again, as there’s a new cell now:

20/05/04 11:31:48 INFO mapreduce.Job: Running job: job_1588611196128_0003

20/05/04 11:33:15 INFO mapreduce.Job: Job job_1588611196128_0003 completed successfully
...

Now SyncTable:

20/05/07 05:47:51 INFO mapreduce.Job: Running job: job_1588611199158_0014


20/05/07 05:49:20 INFO mapreduce.Job: Job job_1588611199158_0014 completed successfully

org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$Counter
BATCHES=97148
HASHES_NOT_MATCHED=97148
MATCHINGCELLS=749593
MATCHINGROWS=100008
RANGESMATCHED=97147
RANGESNOTMATCHED=1
ROWSWITHDIFFS=1
TARGETMISSINGCELLS=1
TARGETMISSINGROWS=1

We can see the increase in execution time due to additional scan and cell comparison between the two remote clusters. Meanwhile, VerifyReplication execution time showed little variation.

Conclusion

HashTable/SyncTable is a valuable tool for moving data around, when dealing with sparse mismatches between two clusters datasets. It makes usage of data partitioning and hashing to efficiently detect differences on ranges from the two data sets, reducing the number of cells to be scanned while comparing data from the two clusters, whereas also avoiding unnecessary puts of already existing values in the target cluster. It’s not a silver bullet, though, as demonstrated on some of the examples above, whilst it may seem to overlap in function with the CopyTable tool, the latter can offer better performance when handling larger, sequential range of mismatching cells. In that sense, HashTable/SyncTable would be most applicable in cases where both clusters already have some data residing, or in cases where an existing replication setup has been disrupted by temporary unavailability of one of the peers.

Related articles:

https://blog.cloudera.com/apache-hbase-replication-overview/

https://blog.cloudera.com/approaches-to-backup-and-disaster-recovery-in-hbase/

https://blog.cloudera.com/online-apache-hbase-backups-with-copytable/

https://blog.cloudera.com/introduction-to-apache-hbase-snapshots/

Wellington Chevreuil
Wellington Chevreuil

Leave a comment

Your email address will not be published. Links are not permitted in comments.