Trickle-Feeding Log Files to HDFS using Apache Flume

In some previous articles on the blog I’ve analysed Apache webserver log files sitting on a Hadoop cluster using Hive, Pig and, most recently, Apache Spark. In all cases the log files were already sitting on the Hadoop cluster, having been SFTP’d to my local workstation and then uploaded to HDFS, the Hadoop distributed filesystem, using Hue; the only way to add to them is to repeat the process and manually copy them across from our webserver. But what if I want these log files to be copied across automatically, in a kind of “trickle-feed” process similar to how Oracle GoldenGate trickle-feeds database transactions to a data warehouse? Enter Apache Flume, a component of the Hadoop ecosystem that ships with the Cloudera CDH4/5 distribution of Hadoop, which does exactly this.

Flume is an Apache project within the overall Hadoop ecosystem that provides a reliable, distributed mechanism for collecting, aggregating and moving large amounts of log data. Similar to GoldenGate, it has transaction collectors, mechanisms to reliably transmit data from source to target, and mechanisms to write those log events to a centralised data store, for example HDFS. It’s free and comes with Cloudera CDH and, coupled with something at the target end to process and work with the incoming log entries, is a pretty powerful and flexible way to transmit log-type entries from (potentially) multiple source providers to a central Hadoop cluster.

To take our example, we’ve got a webserver that’s generating Apache CombinedLogFormat log entries as users generate activity on the website. We then set up two Flume agents: one on the source webserver, and one on the Hadoop client node that receives the log entries and writes them to HDFS just like any other file activity. The Flume agent on the webserver “tail”s the Apache access log file, copying across entries as they’re made (more or less), so that the HDFS copies are kept up to date with individual log entries rather than only with whole log files as they’re closed off. The diagram below shows the overall schematic:

Down at the component level, Flume consists of agents, Java processes that sit on the source, target and any intermediate servers; sources, which take in events (here, lines from the tailed log file, or events sent on by another agent); channels, intermediate staging points that can persist log entries to disk, database or memory; and sinks, processes that take log transactions out of a channel and write them on to their destination, such as HDFS, a local file or the next agent in the chain. Flume is designed to be distributed and resilient, and won’t take the source down if the target Hadoop environment isn’t available; if this happens, transactions slowly fill up the channel used by the source agent until it runs out of space, and further log transactions are then lost until the target comes back up and the source agent’s channel regains some spare space. The diagram below, from the Cloudera blog about the latest generation of Flume (Flume NG, for “Next Generation”), shows the Flume product topology:

whilst the next diagram shows how Flume can collect and aggregate log entries from multiple servers, and then combine them into one log stream sent to a single target.

In our example, that’s all there is to it; in more complex examples, perhaps where the source is sending XML log entries, you’d need a downstream processor on the target platform to decode, deserialise or parse the incoming log entries - Flume is just a transport mechanism and doesn’t do any transformation itself. You can also choose how the log entries are held by each of the agents’ channels; in the example we’re going to use, channel data is just held in memory, which is fast to run and set up, but you’d lose any events still sitting in the channel if the agent or server went down. Other, more production-level setups would persist the channel entries to file using the file channel, or to a database using the JDBC channel.
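
As a rough illustration of the file-backed option, a minimal sketch of a file channel for the source agent might look like the fragment below. It reuses the agent, source and sink names from the source agent configuration in this post; the channel name “fileChannel” and both directory paths are assumptions made up for the example, and it isn’t the configuration used here.

```properties
# Sketch only: a durable file channel in place of the in-memory one
# (channel name and directory paths are hypothetical)
source_agent.channels = fileChannel
source_agent.channels.fileChannel.type = file
# Where the channel keeps its checkpoint and its event data on local disk
source_agent.channels.fileChannel.checkpointDir = /home/ec2-user/flume/checkpoint
source_agent.channels.fileChannel.dataDirs = /home/ec2-user/flume/data

# The source and sink then point at the file channel instead of the memory channel
source_agent.sources.apache_server.channels = fileChannel
source_agent.sinks.avro_sink.channel = fileChannel
```

Events already committed to a file channel should survive an agent restart, at the cost of some disk I/O. Entries that the “tail” command emits while the agent itself is down would still be missed, though.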

For our setup we need two agents, one on the source and one on the target server, each of which has its own configuration file. The source agent configuration file looks like this, with key entries called out underneath it:

## SOURCE AGENT ##
## Local installation: /home/ec2-user/apache-flume
## configuration file location:  /home/ec2-user/apache-flume/conf
## bin file location: /home/ec2-user/apache-flume/bin
## START Agent: bin/flume-ng agent -c conf -f conf/flume-src-agent.conf -n source_agent

# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = apache_server
source_agent.sources.apache_server.type = exec
source_agent.sources.apache_server.command = tail -f /etc/httpd/logs/access_log
source_agent.sources.apache_server.batchSize = 1
source_agent.sources.apache_server.channels = memoryChannel
source_agent.sources.apache_server.interceptors = itime ihost itype

# http://flume.apache.org/FlumeUserGuide.html#timestamp-interceptor
source_agent.sources.apache_server.interceptors.itime.type = timestamp

# http://flume.apache.org/FlumeUserGuide.html#host-interceptor
source_agent.sources.apache_server.interceptors.ihost.type = host
source_agent.sources.apache_server.interceptors.ihost.useIP = false
source_agent.sources.apache_server.interceptors.ihost.hostHeader = host

# http://flume.apache.org/FlumeUserGuide.html#static-interceptor
source_agent.sources.apache_server.interceptors.itype.type = static
source_agent.sources.apache_server.interceptors.itype.key = log_type
source_agent.sources.apache_server.interceptors.itype.value = apache_access_combined

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on Hadoop Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 81.155.163.172
source_agent.sinks.avro_sink.port = 4545
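
  • The source is named “apache_server” and is of type “exec”, meaning it runs a command and turns each line of that command’s output into a Flume event
  • The capture mechanism is the Linux “tail” command, following the Apache access log
  • Log entries are held by the channel mechanism in-memory, rather than to file or database
  • The timestamp interceptor stamps each event with the time it was processed, which the HDFS sink on the target agent later uses to resolve the date part of its output path; the host and static interceptors similarly add the originating hostname and a “log_type” header to each event
  • The agent then sends the log entries to a corresponding Flume agent on the Hadoop cluster, in this case an IP address that corresponds to my network’s external IP address, with Flume network traffic then NATted by my router to cdh4-node1.rittmandev.com, the client node in my CDH4.6 Hadoop cluster running on VMware.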
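
One variation that may be worth considering for the exec source is sketched below. The assumption is that the webserver rotates its access log, which this post doesn’t confirm. The property names come from the Flume User Guide’s exec source section; the values are illustrative only, not the settings used in this example.

```properties
# Sketch only: a variation on the exec source above, not the configuration used in this post
# tail -F (capital F) re-opens the file by name, so it keeps following after log rotation
source_agent.sources.apache_server.command = tail -F /etc/httpd/logs/access_log
# Re-run the command if it exits; restartThrottle is the wait in milliseconds before doing so
source_agent.sources.apache_server.restart = true
source_agent.sources.apache_server.restartThrottle = 10000
# Write the command's stderr to the agent log to help with troubleshooting
source_agent.sources.apache_server.logStdErr = true
```

Even with these settings the exec source gives no delivery guarantee: if the channel is full or the agent is stopped, the lines that “tail” emits in the meantime are not replayed.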

The target server in my Hadoop cluster then has a corresponding configuration file set up, looking like this:

## TARGET AGENT ##
## configuration file location:  /etc/flume-ng/conf
## START Agent: flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector

#http://flume.apache.org/FlumeUserGuide.html#avro-source
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2

## Channels ##
## Source writes to 2 channels, one for each sink
collector.channels = mc1 mc2

#http://flume.apache.org/FlumeUserGuide.html#memory-channel

collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100

collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100

## Sinks ##
collector.sinks = LocalOut HadoopOut

## Write copy to Local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /var/log/flume-ng
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

## Write to HDFS
#http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = /user/root/flume-channel/%{log_type}/%y%m%d
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600

Key entries in this configuration file are:

  • Apache Avro is the serialisation and RPC format used to transmit events between the two agents, with the Avro source on the target listening on port 4545
  • There are two channels defined, one for each sink - “mc1” feeds the sink that writes log entries to the local server filesystem, and “mc2” feeds the sink that writes to HDFS
  • The maximum number of events (log entries) Flume will store in each of the channels (log entry persistence stores) is 100, meaning that if the target platform goes down and more than 100 log transactions back up, then further ones will get lost until we can clear the channel down. Of course this limit can be increased, assuming there’s memory or disk spare.
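
As a sketch of what raising that limit might look like for the channel feeding the HDFS sink, the fragment below uses illustrative values chosen for the example rather than tuned recommendations. “capacity” and “transactionCapacity” are the memory channel properties documented in the Flume User Guide.

```properties
# Sketch only: a larger buffer for the channel feeding the HDFS sink (values are illustrative)
# Maximum number of events held in the channel at any one time
collector.channels.mc2.capacity = 10000
# Maximum number of events per put/take transaction; must not exceed capacity
collector.channels.mc2.transactionCapacity = 1000
```

A bigger memory channel only buys more time during an outage: everything in it is still lost if the agent process stops, and the agent’s Java heap has to be large enough to hold the extra events.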

I then SSH into the target Hadoop node and start the Flume agent, like this:

[root@cdh4-node1 ~]# flume-ng agent -c conf -f /etc/flume-ng/conf/flume-trg-agent.conf -n collector
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.6.1.jar from class path
...
14/05/18 18:15:29 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
14/05/18 18:15:29 INFO hdfs.BucketWriter: Creating /user/root/flume-channel/apache_access_combined/18052014/FlumeData.1400433329254.tmp

and then repeat the step for the source webserver, like this:

[ec2-user@ip-10-35-143-131 apache-flume]$ sudo bin/flume-ng agent -c conf -f conf/flume-src-agent.conf -n source_agent
Warning: JAVA_HOME is not set!
+ exec /usr/bin/java -Xmx20m -cp '/home/ec2-user/apache-flume/conf:/home/ec2-user/apache-flume/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume-src-agent.conf -n source_agent

Finally, moving across to Hue I can see new log entries being written to the HDFS file system:

So there you go - simple transport of webserver log entries from a remote server to my Hadoop cluster, via Apache Flume - thanks again to Nelio Guimaraes from the RM team for setting the example up.