Apache Hadoop 2.3.0 is Released (HDFS Caching FTW!)

Categories: Community Hadoop HDFS Impala

Hadoop 2.3.0 includes hundreds of new fixes and features, but none more important than HDFS caching.

The Apache Hadoop community has voted to release Hadoop 2.3.0, which includes (among many other things):

  • In-memory caching for HDFS, including centralized administration and management
  • Groundwork for future support of heterogeneous storage in HDFS
  • Simplified distribution of MapReduce binaries via the YARN Distributed Cache

You can read the release notes here. Congratulations to everyone who contributed!

As noted above, one of the major new features in Hadoop 2.3.0 is HDFS caching, which enables memory-speed reads in HDFS. This feature was developed by two engineers/Hadoop committers at Cloudera: Andrew Wang and Colin McCabe.

HDFS caching lets users explicitly cache certain files or directories in HDFS. DataNodes then cache the corresponding blocks in off-heap memory using mmap and mlock. Once blocks are cached, Hadoop applications can query the locations of the cached blocks and place their tasks for memory locality. Finally, when a task is memory-local, it can use the new zero-copy read API to read cached data with no additional overhead. Preliminary benchmarks show that optimized applications can achieve read throughput on the order of gigabytes per second.
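To make the mmap part of this concrete, here is a minimal sketch in plain Python, not HDFS code. It maps a local file read-only and reads it through a memoryview, so slicing does not copy the underlying bytes. The temporary file is a hypothetical stand-in for a block file on a DataNode's disk, and the helper name `map_read_only` is made up for illustration. Pinning the pages with mlock is not shown, since Python's standard library has no direct call for it.

```python
import mmap
import os
import tempfile

def map_read_only(path):
    """Map a whole file read-only; return the mmap object and a memoryview over it."""
    with open(path, "rb") as f:
        # Length 0 maps the entire file; the mapping stays valid after f is closed.
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    return mm, memoryview(mm)

# Hypothetical stand-in for a block file on local disk.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"HEADER" + b"x" * 4090)
    block_path = tmp.name

mm, view = map_read_only(block_path)
mapped_len = len(view)
header = view[:6]              # slicing a memoryview does not copy the bytes
header_bytes = bytes(header)   # explicit copy, only so the value can be printed
print(header_bytes, mapped_len)

# Release the views before closing the mapping, then clean up.
header.release()
view.release()
mm.close()
os.unlink(block_path)
```

In this sketch the only copy is the explicit `bytes(header)` call; everything else reads straight from the mapped pages.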

Better yet, this feature will be landing in CDH 5.0 (which is based on Hadoop 2.3.0) when it ships alongside corresponding Impala improvements that take advantage of these new APIs for improved performance. So, you can look forward to an even faster Impala in the new release!

Justin Kestelyn is Cloudera’s developer outreach director.


7 responses on “Apache Hadoop 2.3.0 is Released (HDFS Caching FTW!)”

  1. Ofir Manor

    Is there a more detailed explanation on HDFS caching behaviour and how to control it? For example:
    * How is it managed when there are multiple applications on the cluster (Impala and non-Impala) asking for caching?
    * Are there caching quotas per user or other mechanism?
    * How is it integrated with YARN? Is the HDFS cache taken into consideration by YARN (less RAM for containers)? Is it accounted as HDFS RAM or the app RAM?
    * How does it handle multiple caching requests? LRU eviction? Can it cache a partial file under memory pressure?
    etc etc
    Anyway, it looks like an interesting way to help with repeated access to common data set, in cases where standard O/S caching is not good enough (forcing cold data to linger in memory). It gives a good tradeoff between query efficiency (on specific tables) and overall system efficiency (holding to RAM forever, not usable for the rest of the Hadoop workloads).
    Personally, I think that the coming mechanism to colocate files on the same HDFS node will have a much bigger impact on Impala performance (for joins…)
    Just my two cents,

  2. Olivier Grisel

    From reading the source code, it does not seem that a collocated non-Java DFSClient could use the BlockStorageLocation metadata to find the backing local file path, so as to memory-map it directly from the block offset and access the data (in read-only mode) without going through the JVM. Is this intended (maybe for security reasons)?

    It would be very nice if YARN applications written in Python, for instance, could use the Centralized Cache Management to cache a folder or file in HDFS and pin it in memory, then use YARN host-aware process allocation to execute a Python process that can access the block replica content via direct memory mapping without going through the JVM.

  3. Ofir Manor

    Thanks a lot Andrew!
    The doc answers my first two questions with the cache pool concept. Some things are still not clear:
    – YARN – is YARN aware of the cache pool? If I add a 1TB cache pool, and limit it to 16GB per node, is the local NodeManager aware of how much free RAM is available for apps? Even though this may change dynamically as blocks are added or removed?
    – Within a single cache pool, how is it managed? If I create a 512GB cache pool and try to cache a 1TB directory, what will happen? What if the directory started as 300GB and later grew to 1TB? (this is my LRU / memory pressure question)
    – More interesting, what is available in CDH5 and specifically – Cloudera Manager for CDH 5? What are your Impala best practices regarding HDFS cache? Is there some table attribute within Impala / HCatalog to request caching or is it done directly from the hdfs cache command line? (for example, think of partitioned table, maybe with subpartitions)
    Anyway, always good to see progress :)

  4. Andrew Wang

    Hi Olivier,

    The current short-circuit read path works via file descriptor passing over a unix domain socket, not by letting the client directly open the underlying block file (the old insecure way). You can see details on the new SCR at HDFS-347.

    I’m not familiar with the Python bindings for HDFS, but pretty much all clients can achieve zero-copy reads. All the HDFS client is doing is mmaping the passed file descriptor and passing back a pointer, which is pretty simple.
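The descriptor-passing mechanism described here can be sketched with the Python standard library alone. This is an illustration of the general Unix technique (an SCM_RIGHTS ancillary message over a Unix domain socket, followed by mmap of the received descriptor), not the HDFS protocol itself. The helper names `send_fd` and `recv_fd` and the temporary "block file" are assumptions made for the example, and both ends run in one process for brevity.

```python
import array
import mmap
import os
import socket
import tempfile

def send_fd(sock, fd):
    """Send one open file descriptor over a Unix domain socket (SCM_RIGHTS)."""
    fds = array.array("i", [fd])
    # At least one byte of ordinary data must accompany the ancillary message.
    sock.sendmsg([b"F"], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])

def recv_fd(sock):
    """Receive one file descriptor sent by send_fd."""
    fds = array.array("i")
    _, ancdata, _, _ = sock.recvmsg(1, socket.CMSG_LEN(fds.itemsize))
    for level, ctype, data in ancdata:
        if level == socket.SOL_SOCKET and ctype == socket.SCM_RIGHTS:
            fds.frombytes(data[:fds.itemsize])
            return fds[0]
    raise RuntimeError("no file descriptor received")

# "Server" side: owns the block file (a temp file stands in for it here).
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"cached block data")
    block_path = tmp.name

server_sock, client_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
server_fd = os.open(block_path, os.O_RDONLY)
send_fd(server_sock, server_fd)
os.close(server_fd)  # the receiver gets its own duplicate of the descriptor

# "Client" side: it never opens the path, it only uses the received descriptor.
client_fd = recv_fd(client_sock)
mm = mmap.mmap(client_fd, 0, access=mmap.ACCESS_READ)
received = mm[:]  # copy to bytes, only to display the content
print(received)

mm.close()
os.close(client_fd)
server_sock.close()
client_sock.close()
os.unlink(block_path)
```

Because the client only ever holds a read-only descriptor, it needs no permission to open the underlying file by path, which is the security property the comment points to.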

    Hi Ofir,

    We don’t have YARN integration at this time, so right now you have to partition node memory between NMs and the DN cache. It’s definitely something I’d like to work on.

    Within a cache pool, we don’t have any really smart policies like fair sharing. It’s essentially first come, first served: we cache things until we run out of quota, and then you get alerts that we can’t cache the rest. Cache directives right now are explicit, there’s no concept of automatic eviction or LRU, but again a future work item.
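As a toy model of the first-come, first-served behavior described above (not HDFS code), the sketch below admits directives in arrival order until the pool limit would be exceeded and never evicts anything. It assumes whole-directive granularity and sizes in GB, neither of which the comment specifies; the function name `apply_directives` and the paths are hypothetical.

```python
def apply_directives(pool_limit, directives):
    """Admit (path, size) directives in arrival order, with no eviction.

    Returns (cached_paths, rejected_paths). Sizes and the limit share one unit.
    """
    used = 0
    cached, rejected = [], []
    for path, size in directives:
        if used + size <= pool_limit:
            used += size
            cached.append(path)
        else:
            # No LRU or fair sharing: a directive that does not fit is simply
            # reported as not cached, and earlier entries stay in place.
            rejected.append(path)
    return cached, rejected

# Hypothetical 512 GB pool with three directives arriving in this order.
cached, rejected = apply_directives(
    512,
    [("/warehouse/a", 300), ("/warehouse/b", 300), ("/warehouse/c", 100)],
)
print(cached, rejected)
```

In this model an earlier directive is never displaced by a later one, which is the practical consequence of having no automatic eviction.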

    As to what will be in CDH5, we’re working on a future blog post on this very topic. We’ll also cover a sample set of use cases. Finally, there will eventually be DDL-level improvements (not 100% sure when), but again, that is a roadmap item.

    As you can probably tell, there’s still a fair amount of work to be done, but we’re hoping that this initial release will let sophisticated users kick the tires and get back to us with feedback.

  5. Olivier Grisel

    Thanks very much for your explanations Andrew. I did not know that it was possible to pass file descriptors between processes using domain sockets under Linux. Very smart way to handle the security constraints.

    So if I understand correctly, if I want a Python YARN application that does non-copy access to HDFS data I would need to:

    – issue a RPC call to the Centralized Cache Manager service to pin an HDFS file in memory;

    – fetch the list of hosts that have a cached replica of the blocks for that file, either using the RPC interface to getBlockStorageLocations or libhdfs’ hdfsGetHosts (but the latter does not seem to be cache-aware and does not return information on the byte offsets for individual blocks);

    – ask YARN to schedule the execution of instances of my Python program to be collocated on those datanodes;

    – in my Python program use libhdfs JNI bindings to do an hdfsSeek to the byte offset for each block and then call the hadoopReadZero / hadoopRzBufferGet functions to get the address of the buffer pointer and wrap it as a Python object, for instance using:

    numpy.frombuffer((ctypes.c_char * block_size).from_address(block_address), dtype=numpy.uint8)

    Apparently python-hdfs does not expose hadoopReadZero but I suppose this is just a matter of wrapping it and the related helper functions.
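The last step, wrapping a raw buffer address without copying, can be tried in isolation with ctypes alone. In this sketch a ctypes string buffer stands in for the memory a zero-copy read would return; in the real flow `block_address` and `block_size` would come from the read call, so the values here are assumptions made purely for illustration.

```python
import ctypes

# Stand-in for the buffer returned by a zero-copy read (hypothetical data).
backing = ctypes.create_string_buffer(b"0123456789abcdef")
block_size = 16
block_address = ctypes.addressof(backing)

# Reinterpret the raw address as a char array; no bytes are copied here.
# The owner of the memory (here, `backing`) must stay alive while `raw` is used.
raw = (ctypes.c_char * block_size).from_address(block_address)

first = raw[:4]  # slicing copies just these four bytes out as a bytes object

# A write through the original buffer is visible through `raw`,
# which shows that both names refer to the same memory.
backing[0] = b"Z"
shared = raw[0]
print(first, shared)
```

The same caveat applies to a real zero-copy buffer: the wrapped object is only valid until the buffer is released back to the library.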

  6. Olivier Grisel

    Actually by reading the code further I now understand that using libhdfs and JNI is not even required and that the Python process can directly fetch readonly file descriptors from the data node via RPC calls as well.