Having a good grasp of HDFS recovery processes is important when running or moving toward production-ready Apache Hadoop. In the conclusion to this two-part post, pipeline recovery is explained.
An important design requirement of HDFS is to ensure continuous and correct operations that support production deployments. For that reason, it’s important for operators to understand how HDFS recovery processes work. In Part 1 of this post, we looked at lease recovery and block recovery. Now, in Part 2, we explore pipeline recovery.
All three recovery processes are essential for HDFS fault tolerance. Together, they help to ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.
Recap
In HDFS, files are divided into blocks, and file access follows multi-reader, single-writer semantics. To meet the fault-tolerance requirement, multiple replicas of a block are stored on different DataNodes. The number of replicas is called the replication factor. When a new file block is created, or an existing file is opened for append, the HDFS write operation creates a pipeline of DataNodes to receive and store the replicas (the replication factor generally determines the number of DataNodes in the pipeline). Subsequent writes to that block go through the pipeline (Figure 1).
Figure 1. HDFS Write Pipeline
For read operations, the client chooses one of the DataNodes holding copies of the block and requests a data transfer from it.
For a deeper dive into this background information, read Part 1 of this post.
Pipeline Recovery
The Write Pipeline
When an HDFS client writes to a file, the data is written as sequential blocks. To write or construct a block, HDFS breaks the block into packets (these are messages rather than network packets; the term refers to the class that embodies these messages) and propagates them to the DataNodes in the write pipeline, as shown in Figure 2.
Figure 2. HDFS Write Pipeline Stages
There are three stages of a write pipeline:
- Pipeline setup. The client sends a Write_Block request along the pipeline and the last DataNode sends an acknowledgement back. Once the client receives the acknowledgement, the pipeline is ready for writing.
- Data streaming. The data is sent through the pipeline in packets. The client buffers the data until a packet is full and then sends the packet to the pipeline. If the client calls hflush(), the current packet is sent to the pipeline even if it is not full, and the next packet is not sent until the client receives the acknowledgement for the hflush’ed packet.
- Close (finalize the replica and shut down the pipeline). The client waits until all packets have been acknowledged and then sends a close request. All DataNodes in the pipeline change the corresponding replica to the FINALIZED state and report back to the NameNode. The NameNode then changes the block’s state to COMPLETE if at least the configured minimum number of DataNodes have reported their replicas as FINALIZED.
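The buffering behavior in the data streaming stage can be pictured with a small toy model. This is only a sketch under assumptions: the `PacketBuffer` class, its `packet_size` parameter, and the `sent` list are hypothetical names made up for illustration, not part of the HDFS client, and acknowledgements are not modeled at all.

```python
from dataclasses import dataclass, field


@dataclass
class PacketBuffer:
    """Toy model of client-side buffering (hypothetical, not the HDFS API)."""
    packet_size: int                                   # assumed capacity in bytes
    pending: bytearray = field(default_factory=bytearray)
    sent: list = field(default_factory=list)           # packets handed to the pipeline

    def write(self, data: bytes) -> None:
        # Buffer the data and send a packet each time one fills up.
        self.pending.extend(data)
        while len(self.pending) >= self.packet_size:
            self._send(self.packet_size)

    def hflush(self) -> None:
        # Send whatever is buffered, even if the packet is not full.
        if self.pending:
            self._send(len(self.pending))

    def _send(self, n: int) -> None:
        self.sent.append(bytes(self.pending[:n]))
        del self.pending[:n]


buf = PacketBuffer(packet_size=4)
buf.write(b"abcdef")   # one full packet goes out, two bytes stay buffered
buf.hflush()           # the partial packet is sent as well
packet_lengths = [len(p) for p in buf.sent]
```

Here the first packet is sent because it filled up, while the second is sent only because of the explicit flush.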
Pipeline Recovery
Pipeline recovery is initiated when one or more DataNodes in the pipeline encounter an error in any of the three stages while a block is being written.
Recovery from Pipeline Setup Failure
- If the pipeline was created for a new block, the client abandons the block and asks the NameNode for a new block and a new list of DataNodes. The pipeline is reinitialized for the new block.
- If the pipeline was created to append to a block, the client rebuilds the pipeline with the remaining DataNodes and increments the block’s generation stamp (GS).
Recovery from Data Streaming Failure
- When a DataNode in the pipeline detects an error (for example, a checksum error or a failure to write to disk), that DataNode takes itself out of the pipeline by closing up all TCP/IP connections. If the data is deemed not corrupted, it also writes buffered data to the relevant block and checksum (METADATA) files.
- When the client detects the failure, it stops sending data to the pipeline, and reconstructs a new pipeline using the remaining good DataNodes. As a result, all replicas of the block are bumped up to a new GS.
- The client resumes sending data packets with this new GS. If the data sent has already been received by some of the DataNodes, they just ignore the packet and pass it downstream in the pipeline.
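As a rough illustration of these steps, the client-side bookkeeping might look like the sketch below. The function name, the DataNode labels, and the packet sequence numbers are all hypothetical. Incrementing the GS by one is a placeholder for however the new generation stamp is actually obtained, and resending only unacknowledged packets is an assumption about what "resumes sending" means.

```python
def recover_streaming_failure(pipeline, failed, generation_stamp, unacked_packets):
    """Rebuild the pipeline without the failed DataNodes and bump the GS.

    Hypothetical helper for illustration only; not an HDFS API.
    """
    remaining = [dn for dn in pipeline if dn not in failed]
    if not remaining:
        raise IOError("no good DataNodes left in the pipeline")
    # Placeholder: stands in for obtaining a new generation stamp.
    new_gs = generation_stamp + 1
    # Assumption: packets without an acknowledgement are resent under the new GS.
    resend = [(new_gs, seqno) for seqno in unacked_packets]
    return remaining, new_gs, resend


new_pipeline, new_gs, resend = recover_streaming_failure(
    pipeline=["dn1", "dn2", "dn3"],
    failed={"dn2"},
    generation_stamp=1001,
    unacked_packets=[7, 8],
)
```

A DataNode that already holds one of the resent packets would ignore it and forward it downstream, as described above.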
Recovery from Close Failure
- When the client detects a failure in the close stage, it rebuilds the pipeline with the remaining DataNodes. Each DataNode bumps up the block’s GS and finalizes the replica if it’s not finalized yet.
When one DataNode is bad, it removes itself from the pipeline. During the pipeline recovery process, the client may need to rebuild a new pipeline with the remaining DataNodes. (It may or may not replace bad DataNodes with new DataNodes, depending on the DataNode replacement policy described in the next section.) The replication monitor will take care of replicating the block to satisfy the configured replication factor.
DataNode Replacement Policy upon Failure
There are four configurable policies regarding whether to add additional DataNodes to replace the bad ones when setting up a pipeline for recovery with the remaining DataNodes:
- DISABLE: Disables DataNode replacement and throws an error (at the server); this acts like NEVER at the client.
- NEVER: Never replace a DataNode when a pipeline fails (generally not a desirable action).
- DEFAULT: Replace based on the following conditions:
  - Let r be the configured replication number.
  - Let n be the number of existing replica DataNodes.
  - Add a new DataNode only if r >= 3 and EITHER
    - floor(r/2) >= n; OR
    - r > n and the block is hflushed/appended.
- ALWAYS: Always add a new DataNode when an existing DataNode fails. This fails if a DataNode can’t be replaced.
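The four policies above can be written as a single decision function. This is a minimal sketch of the rules as listed, not the client's actual code; the function name and its parameters are made up for illustration.

```python
def should_add_datanode(policy, r, n, hflushed_or_appended):
    """Return True if a replacement DataNode should be added.

    policy: one of "DISABLE", "NEVER", "DEFAULT", "ALWAYS"
    r: configured replication number
    n: number of existing replica DataNodes
    hflushed_or_appended: whether the block is hflushed/appended
    """
    if policy in ("DISABLE", "NEVER"):
        return False
    if policy == "ALWAYS":
        return True
    if policy == "DEFAULT":
        if r < 3:
            return False
        # floor(r/2) >= n, OR r > n and the block is hflushed/appended
        return (r // 2 >= n) or (r > n and hflushed_or_appended)
    raise ValueError("unknown policy: " + policy)


# With r = 3: losing one DataNode (n = 2) triggers a replacement only for
# hflushed/appended blocks, while losing two (n = 1) always does.
one_lost_plain = should_add_datanode("DEFAULT", r=3, n=2, hflushed_or_appended=False)
one_lost_hflushed = should_add_datanode("DEFAULT", r=3, n=2, hflushed_or_appended=True)
two_lost_plain = should_add_datanode("DEFAULT", r=3, n=1, hflushed_or_appended=False)
```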
To disable using any of these policies, you can set the following configuration property to false (the default is true):
dfs.client.block.write.replace-datanode-on-failure.enable
When enabled, the default policy is DEFAULT. The following config property changes the policy:
dfs.client.block.write.replace-datanode-on-failure.policy
When using DEFAULT or ALWAYS, if only one DataNode succeeds in the pipeline, the recovery will never succeed and the client will not be able to perform the write. This problem is addressed with this configuration property:
dfs.client.block.write.replace-datanode-on-failure.best-effort
which defaults to false. With the default setting, the client will keep trying until the specified policy is satisfied. When this property is set to true, even if the specified policy can’t be satisfied (for example, there is only one DataNode that succeeds in the pipeline, which is less than the policy requirement), the client will still be allowed to continue to write.
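To see how the three properties interact, here is a small model of the behavior described above. The property names and default values are the ones given in this post; everything else (the helper functions, representing the configuration as a dict of strings) is a hypothetical simplification, not how the client actually reads its configuration.

```python
PREFIX = "dfs.client.block.write.replace-datanode-on-failure."

# Defaults as stated in the text above.
DEFAULTS = {
    PREFIX + "enable": "true",
    PREFIX + "policy": "DEFAULT",
    PREFIX + "best-effort": "false",
}


def effective_settings(overrides=None):
    """Merge overrides onto the defaults and return (policy, best_effort)."""
    conf = {**DEFAULTS, **(overrides or {})}
    if conf[PREFIX + "enable"] != "true":
        # Assumption: with replacement disabled, the other two are not consulted.
        return "DISABLE", False
    return conf[PREFIX + "policy"], conf[PREFIX + "best-effort"] == "true"


def write_may_continue(replacement_required, replacement_found, best_effort):
    """Decide whether the client can keep writing after pipeline recovery."""
    if not replacement_required or replacement_found:
        return True
    # The policy could not be satisfied: only best-effort lets the write go on.
    return best_effort


policy, best_effort = effective_settings({PREFIX + "best-effort": "true"})
still_writing = write_may_continue(
    replacement_required=True, replacement_found=False, best_effort=best_effort
)
```

With best-effort left at its default of false, the same call returns False, which corresponds to the stuck-write situation described above.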
Some Solved Issues
- HDFS-5016 details a deadlock scenario in pipeline recovery that causes a DataNode to be marked dead (duplicates HDFS-3655 “Datanode recoverRbw could hang sometime” and HDFS-4851 “Deadlock in pipeline recovery”). Here’s what happens: while recovery is ongoing, some of the relevant threads wait for each other and deadlock. Because the FSDataset lock is held in this deadlock, the heartbeat thread and data transceiver threads are blocked waiting on the FSDataset lock. The solution is to introduce a timeout mechanism to break the deadlock.
- HDFS-4882 reports a case in which the NameNode’s LeaseManager loops forever in checkLeases. When the hard limit expires, LeaseManager tries to recover the lease. If the second-to-last block is COMMITTED and the last block is COMPLETE, internalReleaseLease() returns without releasing the lease, and LeaseManager keeps trying to release the same lease, resulting in an infinite loop. Because FSNamesystem.writeLock is held in the loop, the NameNode becomes essentially unresponsive. The fix is to try releasing a lease only periodically rather than continuously.
- HDFS-5557 details a case in which write pipeline recovery for the last packet in the block may cause rejection of valid replicas because of incorrect GS recording when handling a block report. In the worst case, all good replicas are rejected and a bad one is accepted. The corresponding block then gets completed, but the data cannot be read until the next full block report containing one of the valid replicas is received. The solution is to fix the GS recording.
- HDFS-5558 reports a case in which the LeaseManager monitor thread can crash if the last block is complete but the second-to-last block is not. If neither the last nor the second-to-last block of a file is in COMPLETE state and an attempt is made to close the file, the last block may change to COMPLETE while the second-to-last one does not. If this condition persists and the file is abandoned, LeaseManager will try to recover the lease and do block recovery on the block. But internalReleaseLease() will fail with an invalid cast exception for this kind of file. The solution is to ensure the second-to-last block is in COMPLETE state before closing the file.
Known Open Issues
- With the introduction of dfs.client.block.write.replace-datanode-on-failure.best-effort, a client will be able to continue to write even if there is only one DataNode. When this happens, a block may have only one replica, and if anything happens to this single copy before it is replicated, data loss will occur. To alleviate the problem, HDFS-6867 proposes a background thread to do the pipeline recovery while the client is writing to the single replica.
- HDFS-4504 details the case where DFSOutputStream#close doesn’t always release resources (such as leases). In some cases, DFSOutputStream#close can throw an IOException. One example is when there is a pipeline error and pipeline recovery then fails. Unfortunately, in this case, some of the resources used by the DFSOutputStream are leaked. One particularly important resource is file leases. So, it’s possible for a long-lived HDFS client, such as Apache Flume, to write many blocks to a file but then fail to close it. However, the LeaseRenewer thread inside the client will continue to renew the lease for the “undead” file. Future attempts to close the file will just re-throw the previous exception, and no progress can be made by the client.
- HDFS-6937 details a pipeline recovery issue caused by a checksum error. The data on the middle DataNode (assuming a replication factor of 3) is somehow corrupted, but the corruption is not detected. The last DataNode detects the checksum error and takes itself out of the pipeline. The recovery process keeps trying to replace the last DataNode in the pipeline with a new one and to replicate the data from the middle DataNode to the new one. Each time, the replication fails with a checksum error (because of the corrupted replica on the middle DataNode), and the new DataNode is marked as bad and thrown away, even though it’s not really bad. Eventually the recovery fails after exhausting all the DataNodes.
- HDFS-7342 reports a case in which lease recovery cannot succeed when the second-to-last block is COMMITTED and the last block is COMPLETE. One suggested solution is to force the lease to be recovered, similar to how the case where the last block is COMMITTED is handled. HDFS-7342, HDFS-4882, and HDFS-5558 are related in that the second-to-last block is in COMMITTED state. The subtlety of the issue is still under investigation.
Conclusion
Lease recovery, block recovery, and pipeline recovery are all essential for HDFS fault tolerance. Together, they ensure that writes are durable and consistent in HDFS, even in the presence of network and node failures.
Hopefully, after reading these posts, you have a better understanding of when and why these processes are invoked, and what they do. If you are interested in learning more, you can read through some of the links including the design specification, JIRAs referenced here, or the relevant code.
Yongjun Zhang is a Software Engineer at Cloudera, and a Hadoop committer.