New in CDH 5.1: HDFS Read Caching

Categories: CDH, Hadoop, HDFS, Impala, Performance

Applications using HDFS, such as Impala, will be able to read data up to 59x faster thanks to this new feature.

Server memory capacity and bandwidth have increased dramatically over the last few years. Beefier servers make in-memory computation quite attractive, since a lot of interesting data sets can fit into cluster memory, and memory is orders of magnitude faster than disk.

For the recent release of CDH 5.1, Cloudera contributed a read caching feature to HDFS (HDFS-4949) that lets applications in the Apache Hadoop ecosystem take full advantage of the potential of in-memory computation. Using caching, we've seen speedups of up to 59x compared to reading from disk, and up to 3x compared to reading from the page cache.

We’ll cover performance evaluation in more detail in a future blog post. Here, we’ll focus on the motivation and design of HDFS caching.


Motivation

A form of memory caching is already present on each HDFS DataNode: the operating system page cache. The page cache automatically caches recently accessed data on the local filesystem. Because of the page cache, reading the same file more than once will often result in a dramatic speedup. However, the OS page cache falls short when considered in the setting of a distributed system.

One issue is the lack of global information about the in-memory state of each node. Given the choice of multiple HDFS replicas from which to read some data, an application is unable to schedule its tasks for cache-locality. Since the application is forced to schedule its tasks blindly, performance suffers.

When a data analyst runs a query, the application scheduler chooses one of the three block replica locations and runs its task there, which pulls the replica into the page cache (A). However, if the analyst runs the same query again, the scheduler has no way of knowing which replica is in the page cache, and thus no way to place its task for cache locality (B).

Another issue is the page cache’s replacement algorithm, which is a modified version of “least-recently used” eviction. LRU-like algorithms are susceptible to large scans that wipe out the existing contents of the cache. This happens quite commonly on shared Hadoop clusters.
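The scan-pollution problem is easy to demonstrate with a toy LRU cache in Python (the block names and cache size here are invented; the kernel's actual replacement policy is more sophisticated than plain LRU):

```python
from collections import OrderedDict

class LRUCache:
    """Toy LRU cache: accessing a key moves it to the most-recent end."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def access(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)          # refresh recency
        else:
            if len(self.entries) >= self.capacity:
                self.entries.popitem(last=False)   # evict least-recently used
            self.entries[key] = True

cache = LRUCache(capacity=4)

# The analyst's hot working set fits entirely in the cache.
for block in ["hot1", "hot2", "hot3", "hot4"]:
    cache.access(block)
print("hot1" in cache.entries)  # True: working set is resident

# A large sequential scan touches many cold blocks once each...
for i in range(100):
    cache.access(f"cold{i}")

# ...and has evicted the entire interactive working set.
print("hot1" in cache.entries)  # False
```

Each cold block is touched exactly once, yet under LRU it still displaces the hot blocks the analyst will need again.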

Consider a data analyst running interactive queries on a memory-sized working set: If a large I/O-heavy MapReduce job runs at the same time, it will evict the data analyst’s working set from the page cache, leading to poor interactive performance. Without application-level knowledge of which dataset to keep in memory, the page cache can do no better for mixed workloads. Finally, although reading data from the page cache is faster than disk, it is still inefficient compared to reading directly from memory (so-called zero-copy reads).

Another source of inefficiency is checksum verification. These checksums are intended to catch disk and network errors, and can theoretically be skipped if the client is reading from local in-memory data that has already been checksummed. However, skipping redundant checksumming safely is impossible with the page cache since there’s no way to guarantee that a read is coming from memory. By fixing these two issues, we were able to improve read performance by up to 3x compared to reading from page cache.


Architecture

The above issues resulted in the following three design requirements:

  1. Global knowledge of cluster cache state, so tasks can be scheduled for cache locality
  2. Global control over cluster cache state, for predictable performance for mixed workloads
  3. Pinning of data in local caches, to enable zero-copy reads and skipping checksums

Based on these requirements, we decided to add centralized cache management to the NameNode.

Example of an HDFS client caching a file: first, it sends a cache directive asking the NameNode to cache the file. The NameNode chooses some DataNodes to cache the requested file, with cache commands piggybacked on the DataNode heartbeat. The DataNodes respond with a cache report when the data is successfully cached.

Caching is explicit and user-driven. When a user wants something cached, they express their intent by creating a cache directive on the NameNode. A cache directive specifies the desired path to cache (meaning a file or directory in HDFS), a desired cache replication factor (up to the file’s replication factor), and the cache pool for the directive (used to enforce quotas on memory use). The system does not automatically manage cache directives, so it’s up to users to manage their outstanding cache directives based on their usage patterns.

Assuming that this cache directive is valid, the NameNode will attempt to cache said data. It will select cache locations from the set of DataNodes with the data on disk, and ask them to cache the data by piggy-backing a cache command on the DataNode heartbeat reply. This is the same way block replication and invalidation commands are sent.

When a DataNode receives a cache command, it pulls the desired data into its local cache using the mmap() and mlock() system calls, and then verifies its checksums. This series of operations guarantees that the data will remain resident in memory, and that it is safe to read without further checksum verification. Using mmap() and mlock() also has the advantage of storing the data off-heap, so large amounts of data can be cached without affecting garbage collection.

Because mlock() operates on pages that are already in the OS page cache, if the block is already held there we don't need to copy it. The disadvantage of mlock() is that the block must already exist in the filesystem before it can be locked in memory, so we cannot cache replicas on nodes that don't already have the replica on disk.
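The pinning step can be sketched with Python's mmap module plus mlock() via ctypes. This is a toy stand-in for the DataNode's native code: the "block" is just a small temp file, and in restricted environments mlock() may fail due to RLIMIT_MEMLOCK, so the sketch degrades gracefully.

```python
import ctypes, ctypes.util, mmap, os, tempfile

# Write a small "block file" to disk, standing in for an HDFS block replica.
data = b"block-contents-" * 256  # 3840 bytes, well under typical mlock limits
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(data)
    path = f.name

libc = ctypes.CDLL(ctypes.util.find_library("c") or None, use_errno=True)

with open(path, "r+b") as f:
    mm = mmap.mmap(f.fileno(), 0)        # map the block into our address space
    c = ctypes.c_char.from_buffer(mm)
    addr = ctypes.addressof(c)
    # mlock() pins the mapped pages so they cannot be paged out.
    if libc.mlock(ctypes.c_void_p(addr), ctypes.c_size_t(len(mm))) != 0:
        print("mlock failed (RLIMIT_MEMLOCK?); pages mapped but not pinned")
    # Verify the data once up front; later reads could then skip checksums.
    assert mm[:15] == b"block-contents-"
    libc.munlock(ctypes.c_void_p(addr), ctypes.c_size_t(len(mm)))
    del c                                # release the exported buffer
    mm.close()
os.unlink(path)
```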

DataNodes periodically send cache reports to the NameNode, which contain the state of their local cache. As soon as the NameNode knows that a block has been successfully cached on a DataNode, application schedulers can query the NameNode for this information and use it to schedule tasks for cache-locality.
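A scheduler that consumes this information can simply prefer replica locations that already hold a cached copy. A minimal sketch (the host names and the report layout are invented for illustration; real schedulers weigh many more factors):

```python
def pick_task_location(block_replicas, cached_locations):
    """Prefer a node whose cache already holds the block; else any replica."""
    for host in block_replicas:
        if host in cached_locations:
            return host
    return block_replicas[0]  # no cached copy anywhere; read from disk

# Replica and cache placement as the NameNode might report them.
replicas = {"blk_001": ["node1", "node4", "node7"]}
cached = {"blk_001": {"node4"}}

print(pick_task_location(replicas["blk_001"], cached["blk_001"]))  # node4
```

Without the cache report, the scheduler would have to pick among node1, node4, and node7 blindly; with it, the task lands where the block is pinned in memory.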

Zero-copy Reads

Zero-copy read (ZCR) is the final step in efforts to improve the efficiency of the HDFS read path. Copies are one of the most obvious sources of inefficiency; the more time spent copying data, the fewer CPU cycles are left for useful work. ZCR is theoretically optimal in this regard, hence the name “zero-copy.”

The standard HDFS remote read path copies data from the kernel into the DataNode prior to sending it on to the DFSClient via a TCP socket. Short-circuit local reads eliminate this copy by “short-circuiting” the trip through the DataNode. Instead, the client simply reads the block file directly from the local filesystem.

However, even when using short-circuit reads, the DFSClient still needs to copy the data from the kernel page cache into its own address space. ZCR, implemented in HDFS-4953, allows us to avoid that copy: instead, we use the mmap() system call to map the block from the page cache directly into the client's address space. ZCR also avoids the context-switch overhead of repeated invocations of the read() system call, which can be significant.
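The difference between the two client read paths can be sketched with Python's mmap module (a toy illustration of the idea, not the actual HDFS-5191 API):

```python
import mmap, os, tempfile

# A local file stands in for a short-circuit-readable block replica.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"x" * 65536)
    path = f.name

# read(): the kernel copies from the page cache into this fresh buffer.
with open(path, "rb") as f:
    copied = f.read()  # one extra copy per read

# mmap + memoryview: the page-cache pages are mapped straight into our
# address space; slicing a memoryview inspects them without copying.
with open(path, "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    view = memoryview(mm)
    assert view[:4] == b"xxxx"  # no intermediate buffer was filled
    view.release()
    mm.close()

assert copied == b"x" * 65536
os.unlink(path)
```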

However, mmap() has some disadvantages. One difficulty is handling I/O errors. If a read() system call encounters an I/O error, it simply returns an error code. Accessing a memory-mapped segment can’t return an error, so any error results in a SIGBUS signal instead. Unless a signal handler has been installed, the calling process is terminated.

Fortunately, if a client is reading data that is cached by HDFS, it will never hit an I/O error (and thus never get a SIGBUS), because the data is pinned in memory with mlock(). This approach lets us safely do ZCR without worrying about unexpected program termination. The client can also skip checksum verification when reading cached data, as the data was already checksummed by the DataNode when it was cached.

The ZCR API is described in HDFS-5191. In addition to a Java API, there is also a C API that allows applications such as Impala to take full advantage of zero-copy reads.

Example CLI usage

Here’s a simple example of creating a new cache pool and adding a cache directive for a file. This example assumes you’ve already configured your cluster correctly according to the official documentation.
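Something like the following, using the hdfs cacheadmin subcommands (the pool name and file path here are placeholders):

```shell
# Create a cache pool to hold our directives.
hdfs cacheadmin -addPool testPool

# Ask the NameNode to cache one file, with a cache replication factor of 1.
hdfs cacheadmin -addDirective -path /user/foo/data.txt -pool testPool -replication 1

# Inspect outstanding directives and per-pool usage.
hdfs cacheadmin -listDirectives
hdfs cacheadmin -listPools -stats
```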


Future Work

There are a number of further improvements we’d like to explore. For example, a current limitation of the system is that users need to manually specify what files and directories should be cached. Instead, HDFS could automatically manage what is cached based on workload patterns or hints.

Another potential improvement would be to extend HDFS caching to output files as well as input files. One potential use case for this so-called write-caching is for intermediate stages of a multi-job pipeline. Write-caching could avoid writing to disk at all, if durability is not required. This avenue of development is being pursued in HDFS-5851.


Conclusion

Due to increasing memory capacity, many interesting working sets can fit in aggregate cluster memory. By using HDFS centralized cache management, applications can take advantage of the performance benefits of in-memory computation. Cluster cache state is aggregated and controlled by the NameNode, allowing application schedulers to place their tasks for cache locality. Explicit pinning of datasets allows users to isolate their working sets from other users on shared clusters. Finally, the new zero-copy read API offers substantially improved I/O performance by allowing clients to safely skip checksum verification and the overhead of the read() syscall.

In a follow-up post, we’ll analyze the performance of HDFS caching using a number of micro and macro benchmarks. Stay tuned!

Colin McCabe and Andrew Wang are both Software Engineers at Cloudera, and Hadoop committers/PMC members.


10 responses on “New in CDH 5.1: HDFS Read Caching”

  1. Ofir Manor

    excellent post! Looks like a clean design.
    I think I’ve asked in the past about ecosystem integration….
    Specifically, if I use the version of Impala that comes with CDH 5.1, and I set up a cache pool and add a directive to cache a certain file, will it be picked up automatically by Impala’s HDFS client by default?
    If so, that is awesome… Worth an explicit mention :)
    If not, what needs to be set to make it work for Impala? And for Java HDFS clients like Pig?

  2. Jim Bates

    So… is this really telling me that for Impala to do solid real-time applications on Hadoop, I need to do some innovative workarounds to remove HDFS from the picture? Is HDFS just too slow for real time? I’m never going to have enough memory to not have to worry about a file system. Also, what were the numbers for the file reads to get things into memory?

  3. Andrew Wang

    Hi Ofir,

    With an older version of Impala, you’ll still see some benefit, but not as much. There were two main improvements within Impala to take advantage of caching; first, scheduling query fragments for cache-locality, and second, using the new ZCR API for faster reads of cached data. Without these two things, you’ll still see some speedup if you happen to get placed local with a cached copy, or if you choose to cache all the replicas of a file, but you’ll still be reading via SCR rather than ZCR. Note this is still a few GB/s though, so not bad :)

    So far, I believe input formats like Parquet and ORC have been improved to use the ZCR API, but the scheduler improvements haven’t happened yet in Spark or MR.

    If you’re using Impala from 5.1 onwards, you shouldn’t need to do anything beyond setting up cache directives. This can be done either manually, or via the Impala DDL:

  4. Ofir Manor

    Thanks a lot Andrew!
    So, it is good news for Impala users migrating to CDH 5.1 (if they have the right admin…)
    The Impala documentation is very good and the API (ALTER TABLE etc) is very natural and clean, well done :)
    Just curious – why is calling ZCR not transparent to the app? Is it because Impala is non-Java?

  5. Srini


    You mention write caching as future work (HDFS-5851).

    One question: if we have write caching, can map tasks still be placed with memory locality? That is, if I do not use disk at all, do I still specify HDFS folders as my input folders, with Hadoop figuring out where the block is already cached and placing the map task on that machine?


  6. Maninder

    Hi, I have a couple of questions:
    1. If the underlying data changes, is there a way for the cache to check occasionally whether the cached data is still the latest?

    2. What happens when a pool becomes full? Does it implement an LRU policy?

    3. Is there a time based pool eviction policy?

    4. Are pool metrics available, such as hit ratio, % full etc?

  7. Colin McCabe

    Hi Maninder,

    If the underlying block is modified, the cached replica will be invalidated. By using the short-circuit shared memory stuff introduced in HDFS-5182, the client will become aware that it should not continue to use the file descriptor.

    Pools do not currently implement an LRU policy. It would be a nice improvement, but for now, the administrator must ensure that they don’t cache too much data.

    There is a time-based pool eviction policy in the form of cache TTL.

    We don’t currently have hit ratio metrics.

    More information is available at


  8. Andrew Wang

    Hi Maninder,

    1. The NN periodically rescans its directives, so it will cache data if it changes, e.g. when a file is appended to and closed, or when a new file is added to a directory.

    2. There are quotas on a per-pool basis, so when a pool is full it will not cache any more, and it can also reject submission of new directives. Among submitted directives, there is no priority order for what gets cached, and no LRU either.

    3. There is a time-based expiry policy, see time-to-live and maxTtl in the documentation:

    4. There are currently no metrics like this, since short-circuit reads mean that most reads happen directly in the client rather than going through the DataNode. Impala does have client-side metrics, though, that include the number of cached bytes read.
