# Distributed Flume Setup With an S3 Sink

This is a guest repost contributed by Eric Lubow, CTO at SimpleReach.  It originally appeared here.

I have recently spent a few days getting up to speed with FlumeCloudera‘s distributed log offering. If you haven’t seen this and deal with lots of logs, you are definitely missing out on a fantastic project. I’m not going to spend time talking about it because you can read more about it in the user’s guide or in the Quora Flume Topic in ways that are better than I can describe it. But I will tell you about is my experience setting up Flume in a distributed environment to sync logs to an Amazon S3 sink.

As CTO of SimpleReach, a company that does most of it’s work in the cloud, I’m constantly strategizing on how we can take advantage of the cloud for auto-scaling. Depending on the time of day or how much content distribution we are dealing with, we will spawn new instances to accommodate the load. We will still need the logs from those machines for later analysis (batch jobs like making use of Elastic Map Reduce).

I am going to attempt to do this as step-by-step as possible but much of the terminology I use is described in the user’s guide and there is an expectation that you have at least skimmed it prior to starting this HOWTO. I am using EMR (Elastic Map Reduce) on EC2 and not the provided Hadoop by Cloudera. Additionally, the Cloudera version of Flume that I am working with is CDH3b3 (v0.9.1+29).

## Context

I have 3 kinds of servers all running CentOS in the Amazon cloud:

1. a1: This is the agent which is producing all the logs

2. c1: This is the collector which is aggregating all the logs (from a1, a2, a3, etc)

3. u1: This is the flume master node which is sending out all the commands

There are actually n agents, but for this example, we’ll keep it simple. Also, for a complete copy of the config files, please check out the full gist available here.

## Initial Setup

On both a1 and c1, you’ll have to install flume-node (flume-node contains the files necessary to run the agent or the collector).

# curl http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo > /etc/yum.repos.d/cloudera-cdh3.repo
# yum update yum
# yum install flume flume-node


On u1, you’ll need to install the flume-master RPM:

# curl http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo > /etc/yum.repos.d/cloudera-cdh3.repo
# yum update yum
# yum install flume flume-master


On each host, you need to copy the conf template file to the site specific config file. That is to say:

cp flume-site.xml.template flume-site.xml


First let’s jump onto the agent and set that up. Tune the $master_IP and$collector_IP variables appropriately, but change your /etc/flume/conf/flume-site.xml to look like:


flume.master.servers
$master_IP This is the address for the config servers status server (http) flume.collector.event.host$collector_IP
This is the host name of the default "remote" collector.

flume.collector.port
35853
This default tcp port that the collector listens to in order to receive events it is collecting.

flume.agent.logdir
/mnt/flume-${user.name}/agent This is the directory that write-ahead logging data or disk-failover data is collected from applications gets written to. The agent watches this directory.  Now on to the collector. Same file, different config. Replace all the variables with you$master IP address (you should be using Amazon’s internal IPs otherwise you will be paying the regional charge). The $account and$secret variables are both your Amazon EC2/S3 account key and secret Access key respectively. The $bucket is the S3 bucket that will contain the log files. Also worthy of pointing out is theflume.collector.roll.millis and flume.collector.dfs.compress.gzip. The millis is how frequently the log file gets truncated and the next file begins to be written to. It would be nice if this could be done by file size and not only by time, but it works for now. The other config option is flume.collector.dfs.compress.gzip. This ensures that the logfiles are compressed prior to being dumped onto S3 (saves LOTS of space).  flume.master.servers$master
This is the address for the config servers status server (http)

flume.collector.event.host
localhost
This is the host name of the default "remote" collector.

flume.collector.port
35853
This default tcp port that the collector listens to in order to receive events it is collecting.

fs.default.name
s3n://$account:$secret@$bucket fs.s3n.impl org.apache.hadoop.fs.s3native.NativeS3FileSystem fs.s3.awsAccessKeyId$account

fs.s3.awsSecretAccessKey
$secret fs.s3n.awsAccessKeyId$account

fs.s3n.awsSecretAccessKey
$secret flume.agent.logdir /mnt/flume-${user.name}/agent
This is the directory that write-ahead logging data
or disk-failover data is collected from applications gets
written to. The agent watches this directory.

flume.collector.dfs.dir
file:///mnt/flume-${user.name}/collected This is a dfs directory that is the the final resting place for logs to be stored in. This defaults to a local dir in /tmp but can be hadoop URI path that such as hdfs://namenode/path/ flume.collector.dfs.compress.gzip true Writes compressed output in gzip format to dfs. value is boolean type, i.e. true/false flume.collector.roll.millis 60000 The time (in milliseconds) between when hdfs files are closed and a new file is opened (rolled).  While we are still on the collector, in order to properly write to S3, you’ll need to make 4 file adjustments and all of them will go into the /usr/lib/flume/lib/ directory. 1. commons-codec-1.4.jar 2. jets3t-0.6.1.jar 3. commons-httpclient-3.0.1.jar 4. emr-hadoop-core-0.20.jar The one thing that should be noted here is that the emr-hadoop-core-0.20.jar file replaces the hadoop-core.jar symlink. The emr-hadoop-core-0.20.jar file is the hadoop-core.jar file from an EC2 Hadoop cluster instance. Note: This will break the ability to seamlessly upgrade via the RPM (which is how you installed it if you’ve been following my HOWTO). Keep these files around just in case. I have added a tarball of the files here, but they are all still available with a quick Google search. And now on to the master. There was actually no configuration that I did on the master file system to get things up and running. But if flume is writing to a /tmp directory on an ephemeral file system, then it should be fixed. ## Web Based Setup I chose to do the individual machine setup via the master web interface. You can get to this pointing your web browser at http://u1:35871/ (replace u1 with public DNS IP of your flume master). Ensure that the port is accessible from the outside through your security settings. At this point, it was easiest for me to ensure all hosts running flume could talk to all ports on all other hosts running flume. You can certainly lock this down to the individual ports for security once everything is up and running. At this point, you should go to a1 and c1 run /etc/init.d/flume-node start. If everything goes well, then the master (whose IP is specified in their configs) should be notified of their existence. Now you can configure them from the web. Click on the config link and then fill in the text lines as follows (use what is in bold): • Agent Node:$agent_ec2_internal_ip
• Source: tailDir(“/mnt/logs/”,”.*.log”)
• Sink: agentBESink(“$collector_ec2_internal_ip”,35853) Note: I chose to use tailDir since I will control rotating the logs on my own. I am also using agentBESink because I am ok with losing log lines if the case arises. Now click Submit Query and go back to the config page to setup the collector: • Agent Node:$collector_ec2_internal_ip
• Source: collectorSource(35853)
• Sink:collectorSink(“s3n://$account:$secret@$bucket/logs/%Y/%m/%d/%H00?,”server”) This is going to tell the collector that we are sinking to s3native with the$account key and the $secret key into the$bucket with an initial folder of ‘logs’. It will then log to sub-folders with YYYY/MM/DD/HH00 (or 2011/02/03/1300/server-.log). There will be 60 gziped files in each folder since the timing is setup to be 1 file per minute. Now clickSubmit Query and go to the ‘master’ page and you should see 2 commands listed as “SUCCEEDED” in the command history. If they have not succeeded, ensure a few things have been done (there are probably more, but this is a handy start:

1. Always use double quotes (“) since single quotes (‘) aren’t interpreted correctly. UPDATE: Single quotes are interpreted correctly, they are just not accepted intentionally (Thanks jmhsieh)

2. In your regex, use something like “.*\\.log” since the ‘.’ is part of the regex.

3. In your regex, ensure that your blackslashes are properly escaped: “foo\\bar” is the correct version of trying to match “foo\bar”.

4. Ensure any ‘/’ are inserted as ‘%2F’ in the Amazon account and secret codes.

Additionally, there are also tables of Node Status and Node Configuration. These should match up with what you think you configured.

At this point everything should work. Admittedly I had a lot of trouble getting to this point. But with the help of the Cloudera folks and the users on irc.freenode.net in #flume, I was able to get things going. The logs sadly aren’t too helpful here in most cases (but look anyway cause they might provide you with more info than they provided for me). If I missed anything in this post or there is something else I am unaware of, then let me know.

## References

#### Filed under:

5 Responses
• Aaron / February 08, 2011 / 7:57 AM

Great post which helped me through a number of issues.

One comment – there is no need to download the commons-codec, commons-httpclient and jets3t jar files (if you installed from packages) – they are part of the hadoop-0.20 package, which is required by flume and installed automatically. Just create the following file and restart your flume node.

/usr/lib/flume/bin/flume-env.sh

• Julian / February 16, 2012 / 3:48 PM

Thanks for the post! I have followed your instructions to a “t” and am having problems with the commands through the web interface. I am getting the error “Attempted to write an invalid sink/source: Lexer error at char ‘\u201C’ at line 1 char 12.” when running config [flume-agent, tailDir(?/mylog/?,?.*.log?), agentBESink(?flume-collector?,35853). Any ideas?

• julian / February 16, 2012 / 7:36 PM

2012-02-16 18:31:57,547 WARN org.jets3t.service.impl.rest.httpclient.RestS3Service: Response ‘/server20120216-183127448-0800.9116837958553.00000020.tmp’ – Unexpected response code 403, expected 200

I am now getting this error message. Any ideas?

• Julian / February 17, 2012 / 1:11 PM

I got everything working! Turned out to be that my amazon credentials had slashes wich was causing the char error above. Additionally, I was using an IAM user that should have write and read access, but it turns out that jets3 performs more operations than just read and write. This was causing the access denied errors.
All future comments will be posted to:
cdh-user@cloudera.org as this is the appropriate place for support discussions.

Thanks again for the doc!