Apache HBase Log Splitting

In the recent blog post about the Apache HBase Write Path, we talked about the write-ahead-log (WAL), which plays an important role in preventing data loss should a HBase region server failure occur.  This blog post describes how HBase prevents data loss after a region server crashes, using an especially critical process for recovering lost updates called log splitting.

Log splitting

As we mentioned in the write path blog post, HBase data updates are stored in a place in memory called memstore for fast write. In the event of a region server failure, the contents of the memstore are lost because they have not been saved to disk yet. To prevent data loss in such a scenario, the updates are persisted in a WAL file before they are stored in the memstore. In the event of a region server failure, the lost contents in the memstore can be regenerated by replaying the updates (also called edits) from the WAL file.

A region server serves many regions.  All of the regions in a region server share the same active WAL file. Each edit in the WAL file has information about which region it belongs to.  When a region is opened, we need to replay those edits in the WAL file that belong to that region.  Therefore, edits in the WAL file must be grouped by region so that particular sets can be replayed to regenerate the data in a particular region. The process of grouping the WAL edits by region is called log splitting. It is a critical process for recovering data if a region server fails.

Log splitting is done by HMaster as the cluster starts or by ServerShutdownHandler as a region server shuts down. Since we need to guarantee consistency, affected regions are unavailable until data is restored. So we need to recover and replay all WAL edits before letting those regions become available again. As a result, regions affected by log splitting are unavailable until the process completes and any required edits are applied.

When log splitting starts, the log directory is renamed as follows:

/hbase/.logs/<host>,
<port>,<startcode>-splitting

For example:

/hbase/.logs/srv.example.com,60020,1254173957298-splitting

It is important that HBase renames the folder. A region server may still be up when the master thinks it is down. The region server may not respond immediately and consequently doesn’t heartbeat its ZooKeeper session. HMaster may interpret this as an indication that the region server has failed. If the folder is renamed, any existing, valid WAL files still being used by an active but busy region server are not accidentally written to.

Each log file is split one at a time. The log splitter reads the log file one edit entry at a time and puts each edit entry into the buffer corresponding to the edit’s region. At the same time, the splitter starts several writer threads. Writer threads pick up a corresponding buffer and write the edit entries in the buffer to a temporary recovered edit file.

The file location and name is of the following form:

/hbase/
<table_name>/<region_id>/recovered.edits/.temp

The <sequenceid> shown above is the sequence id of the first log entry written to the file. The temporary recovered edit file is used for all the edits in the WAL file for this region. Once log splitting is completed, the temporary file is renamed to:

/hbase/
<table_name>/<region_id>/recovered.edits/<sequenceid>

In the preceding example, the is the highest (most recent) edit sequence id of the entries in the recovered edit file. As a result, when replaying the recovered edits, it is possible to determine if all edits have been written. If the last edit that was written to the HFile is greater than or equal to the edit sequence id included in the file name, it is clear that all writes from the edit file have been completed.

When the log splitting is completed, each affected region is assigned to a region server. When the region is opened, the recovered.edits folder is checked for recovered edits files. If any such files are present, they are replayed by reading the edits and saving them to the memstore. After all edit files are replayed, the contents of the memstore are written to disk (HFile) and the edit files are deleted.

Times to complete single threaded log splitting vary, but the process may take several hours if multiple region servers have crashed. Distributed log splitting was added in HBase version 0.92 (HBASE-1364) by Prakash Khemani from Facebook.  It reduces the time to complete the process dramatically, and hence improves the availability of regions and tables. For example, we knew a cluster crashed. With single threaded log splitting, it took around 9 hours to recover.  With distributed log splitting, it just took around 6 minutes.

Distributed log splitting

HBase 0.90 log splitting is all done by the HMaster. For one log splitting invocation, all the log files are processed sequentially.  After a cluster restarts from crash, unfortunately, all region servers are idle and waiting for the master to finish the log splitting.  Instead of having all the region servers remain idle, why not make them useful and help in the log splitting process? This is the insight behind distributed log splitting

With distributed log splitting, the master is the boss.  It has a split log manager to manage all log files which should be scanned and split. Split log manager puts all the files under the splitlog ZooKeeper node (/hbase/splitlog) as tasks. For example, while in zkcli, “ls /hbase/splitlog” returns:

[hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost8.sample.com%2C57020%2C1340474893275-splitting%2Fhost8.sample.com%253A57020.1340474893900, hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost3.sample.com%2C57020%2C1340474893299-splitting%2Fhost3.sample.com%253A57020.1340474893931, hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost4.sample.com%2C57020%2C1340474893287-splitting%2Fhost4.sample.com%253A57020.1340474893946]

After some characters are converted into plain ASCII, it is:

[hdfs://host2.sample.com:56020/hbase/.logs/host8.sample.com,57020,1340474893275-splitting/host8.sample.com%3A57020.1340474893900, hdfs://host2.sample.com:56020/hbase/.logs/host3.sample.com,57020,1340474893299-splitting/host3.sample.com%3A57020.1340474893931, hdfs://host2.sample.com:56020/hbase/.logs/host4.sample.com,57020,1340474893287-splitting/host4.sample.com%3A57020.1340474893946]

It is a list of WAL file names to be scanned and split, which is a list of log splitting tasks.

Once split log manager publishes all the tasks to the splitlog znode, it monitors these task nodes and waits for them to be processed.

Split Log Manager

In each region server, there is a daemon thread called split log worker. Split log worker does the actual work to split the logs. The worker watches the splitlog znode all the time.  If there are new tasks, split log worker retrieves the task paths, and then loops through them all to grab any one which is not claimed by other worker yet.  After it grabs one, it tries to claim the ownership of the task, to work on the task if successfully owned, and to update the task’s state properly based on the splitting outcome. After the split worker completes the current task, it tries to grab another task to work on if any remains.

This feature is controlled by the configuration hbase.master.distributed.log.splitting property. By default, it is enabled. (Note that distributed log splitting is backported to CDH3u3 which is based on 0.90. However, it is disabled by default in CDH3u3. To enable it, you need to set configuration parameter hbase.master.distributed.log.splitting to true). When HMaster starts up, a split log manager instance is created if this parameter is not explicitly set to false. The split log manager creates a monitor thread. The monitor thread periodically does the following:

  1. Checks if there are any dead split log workers queued up. If so, it will resubmit those tasks owned by the dead workers. If the resubmit fails due to some ZooKeeper exception, the dead worker is queued up again for retry.
  2. Checks if there are any unassigned tasks. If so, create an ephemeral rescan node so that each split log worker is notified to re-scan unassigned tasks via the nodeChildrenChanged ZooKeeper event.
  3. Checks those assigned tasks if they are expired. If so, move the task to TASK_UNASSIGNED state again so that they can be retried. These tasks could be assigned to some slow workers, or could be already finished. It is fine since the split can be retried due to the idempotency of the log splitting task; that is, the same log splitting task can be processed many times without causing any problem.

Split log manager watches the HBase split log znodes all the time. If any split log task node data is changed, it retrieves the node data. The node data has the current state of the task. For example, while in zkcli, “get /hbase/splitlog/hdfs%3A%2F%2Fhost2.sample.com%3A56020%2Fhbase%2F.logs%2Fhost6.sample.com%2C57020%2C1340474893287-splitting%2Fhost6.sample.com%253A57020.1340474893945” returns:

unassigned host2.sample.com:57000
cZxid = 0×7115
ctime = Sat Jun 23 11:13:40 PDT 2012
mZxid = 0×7115
mtime = Sat Jun 23 11:13:40 PDT 2012
pZxid = 0×7115
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0×0
dataLength = 33
numChildren = 0

It shows this task is still unassigned.

Based on the state of the task whose data is changed, the split log manager does one of the following:

  1. Resubmit the task if it is unassigned
  2. Heart beat the task if it is assigned
  3. Resubmit or fail* the task if it is resigned
  4. Resubmit or fail* the task if it is completed with errors
  5. Resubmit or fail* the task if it could not complete due to errors
  6. Delete the task if it is successfully completed or failed

Note: fail a task if:

  1. The task is deleted
  2. The node doesn’t exist anymore
  3. Fails to move the state of the task to TASK_UNASSIGNED
  4. The number of resubmits is over the resubmit threshold

The split log worker is created and started by the region server. So there is a split log worker in each region server. When the split log worker starts, it registers itself to watch HBase znodes.

If any splitlog znode children change, it notifies the worker thread to wake up to grab more tasks if it is sleeping. If current task’s node data is changed, it checks if the task is taken by another worker. If so, interrupt the worker thread and stop the current task.

The split log worker thread keeps checking the task nodes under splitlog znode if any node children change.

For each task, it does the following:

  1. Get the task state and doesn’t do anything if it is not in TASK_UNASSIGNED state.
  2. If it is in TASK_UNASSIGNED state, try to set the state to TASK_OWNED by the worker. If it fails to set the state, it is ok, another worker will try to grab it. Split log manager will also try to ask all workers to rescan later if it remains unassigned.
  3. If the worker gets this task, it tries to get the task state again to make sure it really gets it asynchronously. In the meantime, it starts a split task executor to do the actual work:
  1. Get the HBase root folder, create a temp folder under the root, and split the log file to the temp folder.
  2. If everything is ok, the task executor sets the task to state TASK_DONE.
  3. If catches an unexpected IOException, the task is set to state TASK_ERR.
  4. If the working is shutting down, set the the task to state TASK_RESIGNED.
  5. If the task is taken by another worker, it’s ok, just log it.

Split Log Worker

Split log manager returns when all tasks are completed successfully. If all tasks are completed with some failure, it throws an exception so that the log splitting can be retried. Due to an asynchronous implementation, in very rare cases, split log manager loses track of some completed tasks. So it periodically checks if there is any remaining uncompleted task in its task map or ZooKeeper.  If none, it throws an exception so that the log splitting can be retried right away instead of hanging there waiting for something that won’t happen.

Conclusion

In this blog post, we have presented a critical process, log splitting, to recover lost updates from region server failures. Log splitting used to be done by the HMaster sequentially. In 0.92, an improvement called distributed log splitting was introduced, and the actual work is done by region servers in parallel. Since there are many region servers in the cluster, distributed log splitting dramatically reduces the log splitting time, and improves regions’ availability.

Filed under:

5 Responses
  • Mrc / September 12, 2012 / 3:23 AM

    If throws an Exception will it retry forever?
    Maybe is better to change it ex. 3-10 times, and abort then.

    • Jimmy Xiang / September 12, 2012 / 8:37 AM

      Yes, currently it retries forever. I agree it is better to abort after a certain retries.

  • qinghuang.liang / January 23, 2013 / 3:21 AM

    excellent,it makes me understand what’s the log spilting.

Leave a comment


6 + = eleven