Thanks to Pengyu Wang, software developer at FINRA, for permission to republish this post.
Salted Apache HBase tables with pre-split regions are a proven, effective HBase solution for providing uniform workload distribution across RegionServers and preventing hot spots during bulk writes. In this design, a row key is made of a logical key with a salt prepended. One way of generating the salt is to take the hash code of the logical row key (a date, etc.) modulo n, the number of regions.
Salting Row Keys
For example, a table accepting a daily data load might use logical row keys starting with a date, and we want to pre-split this table into 1,000 regions. In this case, we expect to generate 1,000 different salts. The salt can be generated, for example, as:
```java
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey

// logicalKey = 2015-04-26|abc
// rowKey     = 893|2015-04-26|abc
```
The output of hashCode() with the modulo provides randomness for the salt value, from "000" to "999". With this key transform, the table is pre-split on the salt boundaries as it is created. This keeps row volumes uniformly distributed while loading the HFiles with MapReduce bulkload, and it guarantees that row keys with the same salt fall into the same region.
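For reference, here is a minimal sketch of how such a table might be created with its 1,000 pre-split regions. The table name, the column family cf, and the use of the classic HBaseAdmin API are assumptions for illustration, not part of the original job:

```java
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class SaltedTableCreator {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor("salted_table"); // hypothetical table name
        desc.addFamily(new HColumnDescriptor("cf"));                  // hypothetical column family
        // 999 split keys ("001" .. "999") yield 1,000 regions, one per salt value
        byte[][] splitKeys = new byte[999][];
        for (int i = 1; i <= 999; i++) {
            splitKeys[i - 1] = Bytes.toBytes(StringUtils.leftPad(Integer.toString(i), 3, "0"));
        }
        admin.createTable(desc, splitKeys);
        admin.close();
    }
}
```

Because the salt is the leading part of every row key, splitting at the bare salt values places each salt's rows in exactly one region.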
In many use cases, like data archiving, you need to scan or copy the data over a particular logical key range (a date range) using a MapReduce job. Standard table MapReduce jobs are set up by providing the Scan instance with key range attributes:
```java
Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);
scan.setStartRow(Bytes.toBytes("2015-04-26"));
scan.setStopRow(Bytes.toBytes("2015-04-27"));

/* Set up the table mapper job */
TableMapReduceUtil.initTableMapperJob(
    tablename,
    scan,
    DataScanMapper.class,
    ImmutableBytesWritable.class,
    KeyValue.class,
    job,
    true,
    TableInputFormat.class);
…
```
However, setting up such a job becomes challenging for salted, pre-split tables. The start and stop row keys are different for each region, because each has a unique salt, and we can't specify multiple ranges to one Scan instance.
To solve this problem, we need to look into how table MapReduce works. Generally, the MapReduce framework creates one map task to read and process each input split. Each split is generated by the getSplits() method of the InputFormat class.
In an HBase table MapReduce job, TableInputFormat is used as the InputFormat. Inside the implementation, the getSplits() method is overridden to retrieve the start and stop row keys from the Scan instance. As the start and stop row keys span multiple regions, the range is divided along region boundaries, and a list of TableSplit objects covering the scan key range is returned. Rather than being based on HDFS blocks, TableSplits are based on regions. By overriding the getSplits() method, we are able to control the TableSplits.
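In outline, the custom class looks like the sketch below. The class name MultiRangeTableInputFormat is the one used in the driver code at the end of this post; the body of getSplits() is developed step by step in the following sections:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class MultiRangeTableInputFormat extends TableInputFormat {

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        // Goal: return one TableSplit per region, each covering only that
        // region's salted sub-range of the logical key range.
        return super.getSplits(context); // placeholder, replaced below
    }
}
```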
Building a Custom TableInputFormat
To change the behavior of the getSplits() method, a custom class extending TableInputFormat is required. The purpose of getSplits() here is to cover the logical key range in each region, constructing each region's row key range with its unique salt. The HTable class provides the method getStartEndKeys(), which returns the start and end row keys for each region. From each start key, parse the corresponding salt for the region:
```java
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
for (int i = 0; i < keys.getFirst().length; i++) {
    // The first 3 bytes are the salt; the first region's start key is empty, so apply "000"
    if (keys.getFirst()[i].length == 0) {
        regionSalt = "000";
    } else {
        regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
    }
    …
}
```
Job Configuration Passes Logical Key Range
TableInputFormat retrieves the start and stop keys from the Scan instance. Since we cannot use a Scan in our MapReduce job, we can use the Configuration instead to pass these two variables; the logical start and stop keys alone are good enough (a variable could be a date or other business information). The getSplits() method has a JobContext argument, from which the configuration instance can be read via context.getConfiguration().
In MapReduce driver:
```java
Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");
```
In the custom TableInputFormat:
```java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    conf = context.getConfiguration();
    String scanStart = conf.get("logical.scan.start");
    String scanStop = conf.get("logical.scan.stop");
    …
}
```
Reconstruct the Salted Key Range by Region
Now that we have the salt and the logical start/stop keys for each region, we can rebuild the actual row key range. For the region whose salt is 893, for instance, this yields the range 893|2015-04-26 to 893|2015-04-27.
```java
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
```
Creating a TableSplit for Each Region
With the row key range, we can now initialize a TableSplit instance for the region:
```java
List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
    …
    byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
    byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
    InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
    splits.add(split);
}
```
One more thing to look at is data locality. The framework uses the location information in each input split to assign the map task to a local host. For our TableInputFormat, we use the method getTableRegionLocation() to retrieve the location of the region serving the row key. This location is then passed to the TableSplit constructor, which assures that the mapper processing the table split runs on the same RegionServer. One method it calls, DNS.reverseDns(), requires the address of the HBase name server; this attribute is stored in the configuration property "hbase.nameserver.address".
```java
this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null);
…

public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException {
    HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress();
    InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
    String regionLocation;
    try {
        regionLocation = reverseDNS(regionAddress);
    } catch (NamingException e) {
        regionLocation = regionServerAddress.getHostname();
    }
    return regionLocation;
}

protected String reverseDNS(InetAddress ipAddress) throws NamingException {
    String hostName = this.reverseDNSCacheMap.get(ipAddress);
    if (hostName == null) {
        hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer));
        this.reverseDNSCacheMap.put(ipAddress, hostName);
    }
    return hostName;
}
```
The complete getSplits() code will look like this:
```java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    conf = context.getConfiguration();
    table = getHTable(conf);
    if (table == null) {
        throw new IOException("No table was provided.");
    }
    // Get the name server address; the default value is null.
    this.nameServer = conf.get("hbase.nameserver.address", null);
    String scanStart = conf.get("logical.scan.start");
    String scanStop = conf.get("logical.scan.stop");
    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
        throw new RuntimeException("At least one region is expected");
    }
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
        String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]);
        String regionSalt = null;
        if (keys.getFirst()[i].length == 0) {
            regionSalt = "000";
        } else {
            regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3);
        }
        byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart);
        byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
        InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation);
        splits.add(split);
    }
    log.info("Total table splits: " + splits.size());
    return splits;
}
```
Use the Custom TableInputFormat in the MapReduce Driver
Now we need to replace the TableInputFormat class with our custom build in the table MapReduce job setup:
```java
Configuration conf = getConf();
conf = HBaseConfiguration.addHbaseResources(conf);
HTableInterface status_table = new HTable(conf, status_tablename);

conf.set("logical.scan.start", "2015-04-26");
conf.set("logical.scan.stop", "2015-04-27");

Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setBatch(1000);
scan.setMaxVersions(1);

/* Set up the table mapper job */
TableMapReduceUtil.initTableMapperJob(
    tablename,
    scan,
    DataScanMapper.class,
    ImmutableBytesWritable.class,
    KeyValue.class,
    job,
    true,
    MultiRangeTableInputFormat.class);
```
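The driver references a DataScanMapper, which is not shown in this post. For a scan-and-copy job emitting KeyValues (for example, feeding HFileOutputFormat for a bulk load elsewhere), a minimal sketch might look like the following; the simple pass-through behavior is an assumption:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class DataScanMapper extends TableMapper<ImmutableBytesWritable, KeyValue> {

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
            throws IOException, InterruptedException {
        // Emit every cell of the scanned row unchanged; the output types match
        // the ImmutableBytesWritable/KeyValue pair configured in initTableMapperJob.
        for (KeyValue kv : result.raw()) {
            context.write(rowKey, kv);
        }
    }
}
```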
The custom TableInputFormat approach provides an efficient and scalable scan capability for HBase tables that are designed to use salt for a balanced data load. Since the scan can bypass any unrelated row keys, regardless of how big the table is, the scan's complexity is limited only by the size of the target data. In most use cases, this guarantees a relatively consistent processing time as the table grows.