Exploring Compression for Hadoop: One DBA’s Story

This guest post comes to us courtesy of Gwen Shapira (@gwenshap), a database consultant for The Pythian Group (and an Oracle ACE Director).

Most western countries use street names and numbers to navigate inside cities. But in Japan, where I live now, very few streets have them.

Sometimes solving technical problems is similar to navigating a city without many street names: Once you arrive at the desired location, the path seems obvious, but on the way there are many detours and interesting sights to be seen.

Here’s an example: during a client engagement in Tokyo, we needed to move about 10TB from Hadoop into an Oracle Exadata box as a one-time load.

The options we considered were:

  1. Use FUSE to mount HDFS on Exadata and use Oracle external tables (an Oracle Database feature through which you can access external data as if it were stored in a table) to load the files.
  2. Install Hadoop on Oracle Exadata, use HDFS to copy the files to the compute nodes, and load them from there.
  3. Write a process on the existing server that reads data from Hadoop and loads it into Oracle Exadata.
  4. Use Sqoop to insert data into Exadata directly from Hadoop.
  5. Use a pre-packed Oracle connector.

FUSE and Sqoop seemed like the least intrusive options, requiring minimal changes to either Exadata or Hadoop, and we debated the pros and cons of each. Ultimately, using FUSE and external tables, while not a common approach, seemed the best choice for our situation. Once we had the data in text files, we could load it into our tables using any method we wanted, taking advantage of all of Oracle Exadata’s features to optimize compression, parallelism, and load times.

Being a consultant, I sent the customer system administrators links to relevant Hadoop-FUSE documentation and asked them to get cranking.

The system administrator called me back to say that Hadoop is “backed up” nightly to NFS, and that it would be easier if we just used that data. Since I didn’t mind getting data that was a few hours old (I know how to close the data gap), I agreed to their plan.

I used "less" to peek at one of the files. This is what I saw:

SEQ org.apache.hadoop.io.BytesWritable

An experienced Hadoop-er would know exactly what she was looking at, but I was still in learning mode. In a discussion with the application developer, I was told that the file was compressed.
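In hindsight, those first bytes told the whole story. Here is a quick way to check for them from the shell — sample.seq is a stand-in file created on the spot so the commands can run anywhere, but the same check works on the real NFS copies:

```shell
# Every SequenceFile begins with the three magic bytes "SEQ", a version
# byte, and then the key and value class names -- exactly what "less"
# was showing. Create a stand-in file with such a header for illustration:
printf 'SEQ\006%s' 'org.apache.hadoop.io.BytesWritable' > sample.seq

# Checking the magic bytes distinguishes a SequenceFile from, say, a
# plain LZO or LZOP file:
magic=$(head -c 3 sample.seq)
if [ "$magic" = "SEQ" ]; then
    echo "looks like a SequenceFile"
else
    echo "not a SequenceFile"
fi
```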

Just for fun, I decided to write the decompression Java code myself. Read the compressed file, output text to STDOUT, use it as an external table pre-processor: How difficult could it be? I just needed some code samples showing how to use the LZO libraries, and we would be set.

Or so I thought. Almost immediately, I got as lost as a drunk tourist in Shinjuku at night. Hadoop in fact has more than one LZO library, and I was not sure whether they were mutually compatible. Files can be block compressed or row compressed, and I did not know which mine was. The libraries were used only as plug-ins to Hadoop; apparently no one used them on files outside Hadoop. There was also something called LZOP, and a lot of C libraries that I tried using but that kept insisting my file was not, in fact, LZO. At that point, I called it a day.

But before going to sleep, I took a look at Hadoop: The Definitive Guide (2nd edition), by Cloudera’s Tom White, hoping for some ideas and insights. As expected, the book proved as useful as Google Maps in Tokyo.

Two hours and three glasses of Kirin later, I discovered that:

  • The file is not just compressed – it is in fact a SequenceFile. I had heard about SequenceFiles in my Cloudera developer course, but I had not actually used one before.
  • SequenceFiles have very descriptive headers that include the key data type (BytesWritable), the value data type (Text), whether the file is compressed (yes), whether it is block compressed (yes), and the codec (LZO).
  • Tom White had thoughtfully included example code that reads SequenceFiles. The book didn’t mention compression, but from my reading I gathered that SequenceFile.Reader might be able to handle the data automatically if I included the LZO libraries in my class path.

This looked easy enough. I just needed to get Tom’s example to work with my environment (within Exadata, no Hadoop installed, to be used as input for external tables).

The following steps turned out to be the correct ones:

Step 1: Prepare the environment. For this I needed the Hadoop core package and the Hadoop-LZO package. Hadoop core (hadoop-0.20.2-cdh3u2-core.jar) was copied from the Hadoop server (also Linux 64-bit, so it was compatible); LZO was installed from hadoop-gpl-packing, an excellent little project that saved me much pain. The location of the packages was added to the CLASSPATH environment variable. You will also need commons-logging.jar somewhere in the class path, because Hadoop’s logging depends on it.

I also needed native libraries for Hadoop and LZO. I copied libhadoop.so from the same server, and the hadoop-gpl-packing package included native LZO libraries; the locations of both went into LD_LIBRARY_PATH.
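Getting all of these paths right was fiddly, so a pre-flight check is worth a few lines of shell. This is purely illustrative — the jar names and the temporary directory below are placeholders, not the paths from the engagement:

```shell
# Stand-in jars so the loop is runnable anywhere; in real use CLASSPATH
# would point at hadoop-core, the LZO jar, and commons-logging.
dir=$(mktemp -d)
touch "$dir/hadoop-core.jar" "$dir/commons-logging.jar"
CLASSPATH="$dir/hadoop-core.jar:$dir/commons-logging.jar:$dir/missing.jar"

# Walk the colon-separated CLASSPATH and report any entry that does not
# exist, before the JVM gets a chance to fail with ClassNotFoundException.
old_ifs=$IFS
IFS=:
for entry in $CLASSPATH; do
    if [ -e "$entry" ]; then
        echo "found: $(basename "$entry")"
    else
        echo "MISSING: $(basename "$entry")"
    fi
done
IFS=$old_ifs
```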

Step 2: Compile SequenceFileReadDemo (from the book) and test it on a sample file:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import com.hadoop.compression.lzo.LzoCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
        public static void main(String[] args) throws IOException {
                String uri = args[0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(uri), conf);
                Path path = new Path(uri);
                SequenceFile.Reader reader = null;
                try {
                        reader = new SequenceFile.Reader(fs, path, conf);
                        BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                        Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                        long position = reader.getPosition();
                        while (reader.next(key, value)) {
                                String syncSeen = reader.syncSeen() ? "*" : "";
                                // Print only the value -- the external table expects just the rows:
                                //System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                                System.out.printf("%s\n", value);
                                position = reader.getPosition(); // beginning of next record
                        }
                } finally {
                        IOUtils.closeStream(reader);
                }
        }
}

To my joy and disbelief, this "just worked". I had to add import org.apache.hadoop.io.BytesWritable to support our particular key, but once I set up the environment correctly, javac SequenceFileReadDemo.java ran without issues. Testing the reader on a sample file also worked, with no special problems other than those that led to the discovery of the native libraries:

export PATH=/usr/java/latest/bin:/usr/kerberos/bin:
export CLASSPATH=/home/oracle/work/shapira/DWH/hadoop-0.20.2-cdh3u2-core.jar:/home/oracle/work/shapira/DWH
export LD_LIBRARY_PATH=:

java SequenceFileReadDemo $1 2>/dev/null

Step 3: Use SequenceFileReadDemo as a pre-processor for external tables. This was probably the most difficult step (relatively speaking). Integration usually is.

First, I had to tweak the output of SequenceFileReadDemo to match the expectations of Oracle Database: don’t output row numbers, or keys, or alignment marks – just the values and nothing but the values. Also, Hadoop will try to send info messages to STDERR, which causes Oracle Database to assume something is wrong with the external table and quit. Rather than learn how to configure log4j parameters for my non-existent Hadoop installation, I just redirected STDERR to /dev/null, which, although sloppy, would do for a POC.
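The redirect trick is easy to demonstrate with a stand-in for the reader (mock_reader below is hypothetical; the INFO line mimics the kind of message Hadoop’s CodecPool prints): everything on STDERR disappears, and Oracle sees only the data rows on STDOUT:

```shell
# A stand-in for the real Java reader: one pipe-delimited data row to
# stdout, plus the kind of log chatter Hadoop writes to stderr.
mock_reader() {
    echo 'mydb|1|db|sys|http://example.com|name|kana|2012-01-01 00:00:00'
    echo 'INFO compress.CodecPool: Got brand-new decompressor' >&2
}

# With stderr discarded, only the data row survives -- which is all the
# external table's access driver should ever see.
rows=$(mock_reader 2>/dev/null)
echo "$rows"
```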

Step 4: Load the data from the external table into a real table. I used Create Table as Select for the POC, but there are many other options. (That’s one of the nicest things about external tables: the feature is so flexible that you can even read SequenceFiles!)

To test the output, I also had to set one more environment variable: export NLS_LANG=JAPANESE_JAPAN.UTF8. Otherwise, most text came out as junk:

-- external table test

--create directory nfs_dir as '/home/oracle/work/shapira/DWH';
--create directory exec_dir as '/home/oracle/work/shapira/DWH';

--grant read,write on directory nfs_dir to dw_test;
--grant execute on directory exec_dir to dw_test;

conn dw_test/dw_test
drop table ext_test;
drop table test1;
drop table test6;

CREATE TABLE ext_test (
        db_name varchar2(30),
        test_id number,
        db      varchar2(30),
        system  varchar2(30),
        url     varchar2(1024),
        test_name varchar2(1024),
        test_name_kana varchar2(1024),
        time_stamp timestamp
)
organization external (
        TYPE oracle_loader
        default directory nfs_dir
        access parameters (
                records delimited by newline characterset utf8
                preprocessor exec_dir:'test_reader.sh'
                fields terminated by "|"
                missing field values are null
                (db_name,test_id,db,system,url,test_name,test_name_kana,
                 time_stamp char(19) date_format timestamp mask "yyyy-mm-dd hh24:mi:ss")
        )
        location ('000009_0')
);
set pagesize 300 linesize 200;
select * from ext_test where rownum<=2;

set timi on
create table test1 as select * from ext_test;

--Create table test2 compress for query low as select * from ext_test;
--Create table test3 compress for query high as select * from ext_test;
--Create table test4 compress for archive low as select * from ext_test;
--Create table test5 compress for archive high as select * from ext_test;
create table test6 compress for oltp as select * from ext_test;

select segment_name,blocks,bytes/1024 from dba_segments where segment_name like 'TEST%';
select count(*) from test1;
select count(*) from test6;

Of course, this was just enough to show the reluctant developers how things are done. I think they were happy with the results.

But this was not real production code. The most glaring issue is that the error handling is dubious at best. But what worried me more was that most of the data was stored in many small files – not exactly the best way to use Hadoop. The overhead of creating a new JVM to load each file would become an issue.

I could have had SequenceFileReadDemo accept entire directories as input and process all the files in each directory at once, but directories vary widely in size and there are not enough of them. I decided that the input would have to be a range of dates or filenames within a directory, and used this to partition the loading process manually.
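The manual partitioning might look something like this sketch — the directory, the filename pattern, and the use of echo in place of the real java invocation are all stand-ins for illustration:

```shell
# Create a stand-in load directory with a few Hive-style file names.
dir=$(mktemp -d)
cd "$dir"
touch 000001_0 000002_0 000009_0

# Pick a filename range within the directory and hand the whole batch to
# one JVM (echo stands in for: java SequenceFileReadDemo file...).
# SequenceFileReadDemo would need a small change to loop over its args.
find . -maxdepth 1 -type f -name '00000[0-5]_0' | sort \
    | xargs echo java SequenceFileReadDemo
```

One JVM per batch instead of one per file keeps the startup overhead proportional to the number of partitions, not the number of files.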

Maybe I was obsessing over the details of a one-time load, but no one complained. The Japanese are famous for getting the small details right!
