The Small Files Problem

Small files are a big problem in Hadoop — or, at least, they are if the number of questions on the user list on this topic is anything to go by. In this post I’ll look at the problem, and examine some common solutions.

Problems with small files and HDFS

A small file is one which is significantly smaller than the HDFS block size (default 64MB). If you’re storing small files, then you probably have lots of them (otherwise you wouldn’t turn to Hadoop), and the problem is that HDFS can’t efficiently handle very large numbers of files.

Every file, directory and block in HDFS is represented as an object in the namenode’s memory, each of which occupies about 150 bytes, as a rule of thumb. So 10 million files, each using a block, would use about 3 gigabytes of memory: 10 million file objects plus 10 million block objects, at roughly 150 bytes apiece. Scaling up much beyond this level is a problem with current hardware. Certainly a billion files is not feasible.

Furthermore, HDFS is not geared up to accessing small files efficiently: it is primarily designed for streaming access to large files. Reading through many small files normally causes lots of seeks and lots of hopping from datanode to datanode to retrieve each small file, all of which adds up to an inefficient data access pattern.

Problems with small files and MapReduce

Map tasks usually process a block of input at a time (using the default FileInputFormat). If the files are very small and there are a lot of them, then each map task processes very little input, and there are many more map tasks, each of which imposes extra bookkeeping overhead. Compare a 1GB file broken into 16 64MB blocks with 10,000 or so 100KB files. The 10,000 files use one map each, and the job time can be tens or hundreds of times slower than the equivalent job with a single input file.

There are a couple of features to help alleviate the bookkeeping overhead: task JVM reuse for running multiple map tasks in one JVM, thereby avoiding some JVM startup overhead (see the mapred.job.reuse.jvm.num.tasks property), and MultiFileInputSplit, which can run more than one split per map.
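
JVM reuse, for example, is just a configuration setting. Here is a minimal sketch, a fragment of a job driver rather than a complete job (the class name is mine, not part of Hadoop):

    import org.apache.hadoop.mapred.JobConf;

    public class JvmReuseConfig {
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // -1 means "reuse the task JVM for an unlimited number of tasks from the
        // same job"; the default of 1 starts a fresh JVM for every task.
        conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
        return conf;
      }
    }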

Why are small files produced?

There are at least two cases:

  1. The files are pieces of a larger logical file. Since HDFS has only recently gained support for appends, a very common pattern for saving unbounded files (e.g. log files) has been to write them in chunks into HDFS.
  2. The files are inherently small. Imagine a large corpus of images. Each image is a distinct file, and there is no natural way to combine them into one larger file.

These two cases require different solutions. For the first case, where the file is made up of records, the problem may be avoided by calling HDFS’s sync() method (which came with appends, although see this discussion) every so often, so that you are continuously writing large files. Alternatively, it’s possible to write a program to concatenate the small files together (see Nathan Marz’s post about a tool called the Consolidator which does exactly this).
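
To illustrate the sync() approach, here is a minimal sketch, assuming a Hadoop build with appends/sync enabled; the path and the record source are made up for the example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ContinuousLogWriter {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Keep one large file open and flush it every so often, instead of
        // closing lots of small chunk files.
        FSDataOutputStream out = fs.create(new Path("/logs/events.log"));
        for (long i = 1; i <= 1000000; i++) {
          out.write(nextRecord());
          if (i % 1000 == 0) {
            out.sync();  // push what we have so far out to the datanodes
          }
        }
        out.close();
      }

      // Stand-in for a real record source.
      private static byte[] nextRecord() {
        return "one log line\n".getBytes();
      }
    }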

For the second case, some kind of container is needed to group the files in some way. Hadoop offers a few options here.

HAR files

Hadoop Archives (HAR files) were introduced to HDFS in 0.18.0 to alleviate the problem of lots of files putting pressure on the namenode’s memory. HAR files work by building a layered filesystem on top of HDFS. A HAR file is created using the hadoop archive command, which runs a MapReduce job to pack the files being archived into a small number of HDFS files. To a client using the HAR filesystem nothing has changed: all of the original files are visible and accessible (albeit using a har:// URL). However, the number of files in HDFS has been reduced.
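
To illustrate the point that nothing changes for the client, here is a minimal sketch of reading a file through the layered filesystem; the archive and file names are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class ReadFromHar {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A file inside a (hypothetical) archive, created beforehand with the
        // hadoop archive command. To the client it is just a path, albeit one
        // using the har:// scheme.
        Path p = new Path("har:///user/tom/files.har/small-files/part-00000");
        FileSystem fs = p.getFileSystem(conf);
        FSDataInputStream in = fs.open(p);
        IOUtils.copyBytes(in, System.out, conf, true);  // copy the file to stdout
      }
    }
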
HAR File Layout

Reading through files in a HAR is no more efficient than reading through files in HDFS, and in fact may be slower, since each HAR file access requires two index file reads as well as the data file read (see diagram). And although HAR files can be used as input to MapReduce, there is no special magic that allows maps to operate over all the files in the HAR that are co-resident on an HDFS block. It should be possible to build an input format that can take advantage of the improved locality of files in HARs, but it doesn’t exist yet. Note that MultiFileInputSplit, even with the improvements in HADOOP-4565 to choose files in a split that are node-local, will need a seek per small file. It would be interesting to see the performance of this compared to a SequenceFile, say. At the current time HARs are probably best used purely for archival purposes.

Sequence Files

The usual response to questions about “the small files problem” is: use a SequenceFile. The idea here is that you use the filename as the key and the file contents as the value. This works very well in practice. Going back to the 10,000 100KB files, you can write a program to put them into a single SequenceFile, and then you can process them in a streaming fashion (directly or using MapReduce) operating on the SequenceFile. There are a couple of bonuses too. SequenceFiles are splittable, so MapReduce can break them into chunks and operate on each chunk independently. They support compression as well, unlike HARs.  Block compression is the best option in most cases, since it compresses blocks of several records (rather than per record).
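
A minimal sketch of such a program follows, using the filename as the key and the raw bytes as the value; the input and output paths are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;

    public class SmallFilesToSequenceFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path inputDir = new Path("/user/tom/small-files");   // hypothetical input
        Path outputFile = new Path("/user/tom/files.seq");   // hypothetical output

        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, outputFile, Text.class, BytesWritable.class,
            CompressionType.BLOCK);
        try {
          for (FileStatus status : fs.listStatus(inputDir)) {
            byte[] contents = new byte[(int) status.getLen()];
            FSDataInputStream in = fs.open(status.getPath());
            try {
              in.readFully(0, contents);  // slurp the whole (small) file
            } finally {
              in.close();
            }
            // Key = original filename, value = raw file contents.
            writer.append(new Text(status.getPath().getName()),
                          new BytesWritable(contents));
          }
        } finally {
          writer.close();
        }
      }
    }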

It can be slow to convert existing data into SequenceFiles. However, it is perfectly possible to create a collection of SequenceFiles in parallel. (Stuart Sierra has written a very useful post about converting a tar file into a SequenceFile — tools like this are well worth having, and it would be good to see more of them.) Going forward, it’s best to design your data pipeline to write the data at source directly into a SequenceFile, if possible, rather than writing to small files as an intermediate step.
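
A sketch of the driver side of such a parallel conversion job might look like the following, assuming a mapper (not shown) that reads each small file whole and emits a (Text filename, BytesWritable contents) pair; the mapper class and paths are hypothetical:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public class SmallFilesConversionDriver {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(SmallFilesConversionDriver.class);
        // conf.setMapperClass(WholeFileMapper.class);  // hypothetical mapper that
        // reads each small file whole and emits (filename, contents)
        conf.setNumReduceTasks(0);                      // map-only conversion
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(BytesWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        FileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
        FileInputFormat.setInputPaths(conf, new Path("/user/tom/small-files"));
        FileOutputFormat.setOutputPath(conf, new Path("/user/tom/seqfiles"));
        JobClient.runJob(conf);
      }
    }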

SequenceFile and MapFile File Layouts
Unlike HAR files, there is no way to list all the keys in a SequenceFile, short of reading through the whole file. (MapFiles, which are like SequenceFiles with sorted keys, maintain only a partial index, so they can’t list all their keys either — see diagram.)

SequenceFile is rather Java-centric. TFile is designed to be cross-platform and to be a replacement for SequenceFile, but it’s not available yet.

HBase

If you are producing lots of small files, then, depending on the access pattern, a different type of storage might be more appropriate. HBase stores data in MapFiles (indexed SequenceFiles), and is a good choice if you need to do MapReduce-style streaming analyses with the occasional random lookup. If latency is an issue, then there are lots of other choices — see Richard Jones’ excellent survey of key-value stores.

32 Responses
  • Jean-Daniel Cryans / February 02, 2009 / 4:43 PM

    Nice post Tom. Just to add some info with regards to HBase, we are currently integrating TFile in HBase for 0.20 which will hopefully reduce the latency to a reasonable level (looking good). We are also in the process of adding a new cache (HBASE-80).

  • ryan / February 18, 2009 / 3:38 AM

    Hi Tom,

    We are making substantial changes to HBase for 0.20 to improve all manner of performance. Random reads, large scans, etc. are all anticipated to be faster.

    The new file format, HFile (hbase-file, get it?), is behind these changes. Check out this post for a history of how it came to be:

    http://ryantwopointoh.blogspot.com/2009/02/scalability-what-are-you-doing-about-it.html

  • Mark Schnitzius / March 30, 2009 / 5:52 PM

    Great article – we are going through many of the same issues. I’m curious to hear how others are dealing with concurrency issues, i.e. external processes adding new data while consolidation of smaller files is taking place.

  • Tongjie Chen / May 21, 2009 / 2:58 PM

    Each day we have to deal with around 20,000 gzip files; each gzip file is around 25M (not sure whether 25M falls into the *small file* category or not :-) ).

    The Hadoop tasktracker has a memory leak bug prior to 0.19 (http://issues.apache.org/jira/browse/HADOOP-4906), which basically prevents us from running a job with 20k+ mappers.

    We found that MultiFileSplit is a very good fit for this problem:

    1. Convert the whole gzip file to bytes and wrap it with a BytesWritable; then wrap multiple BytesWritables into one ArrayWritable.
    2. Write a CustomMultiFileInputFormat and CustomMultiFileRecordReader where the record key is NullWritable and the record value is ArrayWritable.
    3. In the map task, for each value (ArrayWritable), we can restore the original bytes from the BytesWritable elements of the array and process them.

    I have used this approach to process around 1.5 million gzip files (each around 25M in size) in one job using only 10,000 mappers (our cluster is 61 machines, each with 16G of memory).

    One important thing is that we have to increase the child JVM’s -Xmx to be big enough to hold the ArrayWritable.

  • Konstantin Shvachko / July 13, 2009 / 5:57 PM

    This might be also useful in the context of this discussion.
    HADOOP-1687. Name-node memory size estimates…
    Name-node object (file or block) size is less than 200 bytes. This is more conservative than 150 bytes, but more practical in my experience.
    So the rule of thumb would be: 10 million files require 4 GB of memory.

  • Eko Kurniawan Khannedy / April 28, 2010 / 4:25 AM

    Can anyone tell me why the Hadoop default block size is 64MB? Why not larger?

  • Prabhu / July 28, 2010 / 9:06 AM

    Great exposition, thanks a lot!
    I am faced with a somewhat related problem. Initially I obtained a directory containing a lot of files from the customer. Hadoop was ideal for this; I used the older API for random sampling and total order sort partitioning. Now they are feeding one small file at 6-hour intervals and want it to be processed at the same rate. Now, when I try to run my job, I am getting an ArrayIndexOutOfBoundsException while it tries to write the partition file. I suspect it is because I don’t have as many splits as I specify in new RandomSampler(0.1,10,1000). Any help resolving this would be greatly appreciated.
    Thanks very much!
    -Prabhu

  • Prabhu / July 28, 2010 / 11:48 AM

    I solved the problem by determining the block size. If it is less than 256 MB, I set the number of reduce tasks to 1. Otherwise, I use the random sampler, total order sort partitioner, etc.

  • Adolf / October 01, 2010 / 3:58 AM

    To solve the small files problem in HDFS, do you know what can be done?

  • Q Ethan / February 10, 2011 / 2:21 PM

    If SequenceFiles are your way to address the small files problem, you may be interested in forqlift:

    http://www.exmachinatech.net/go/forqlift/

    forqlift is a commandline tool that makes it easy to import/export small files to/from SequenceFiles. (It’s also free and open-source.)

  • Jeans zhong / March 08, 2011 / 10:17 AM

    Hi Tom, can you explain the following sentence in more detail, or give me some reference information?
    This sentence:
    “Reading through small files normally causes lots of seeks and lots of hopping from datanode to datanode to retrieve each small file, all of which is an inefficient data access pattern.”
    Why does it cause seeks and hopping? I think it only needs to read the block of the small file from the best datanode. Why do you say that? What’s your reason?
    Please help me.

  • TreeMan / March 19, 2012 / 4:30 AM

    Beginning to understand this, a good document!

  • Viraj Paropkari / November 05, 2012 / 10:42 AM

    Hi Tom,
    I think there is a miscalculation in the sentence.

    “So 10 million files, each using a block, would use about 3 gigabytes of memory. ”

    It should be 1 million instead of 10 million.

    Thanks.

  • Andrew / November 06, 2012 / 2:02 AM

    I have found using SequenceFile.Reader via a direct DFSClient to be very slow at around 4.5MB/s.

    Compared to using FSDataInputStream and reading blocks of bytes directly we get 76MB/s.

    Is the SequenceFile reader expected to be this slow?
