Creating Real-Time Search Dashboards using Apache Solr, Hue, Flume and Cloudera Morphlines

Late last week Cloudera published a blog post on their developer site on building a real-time log analytics dashboard using Apache Kafka, Cloudera Search and Hue. As I’d recently been playing around with Oracle Big Data Discovery with our website log data as the data source, and as we’ve also been doing the same exercise in our development labs using ElasticSearch and Kibana, I thought it’d be interesting to give it a go; partly out of curiosity about how Solr, Kafka and Hue search work and compare to Elasticsearch, but also to try and work out what extra benefit Big Data Discovery gives you above and beyond free and open-source tools.

In the example, Apache web log data is read from the Linux server via a Flume syslog source, then fed into Apache Kafka as the transport mechanism before being loaded into Solr using a data transformation framework called “morphlines”. I’ve been looking at Kafka as an alternative to Flume for ingesting data into a Hadoop system for a while, mainly because of the tireless advocacy of Cloudera’s Gwen Shapira (Oracle ACE, ex-Pythian, now at Cloudera), whom I respect immensely and who has a great background in Oracle database administration as well as Hadoop. It also potentially offers some useful benefits if used instead of, or more likely alongside, Flume: a publish-subscribe model vs. push, the ability to have multiple consumers as well as publishers, and a more robust transport mechanism that should avoid data loss when an agent node goes down. Kafka is now available as a parcel and service descriptor that you can download and then install within CDH5, and so I set up a separate VM in my Hadoop cluster as a Kafka broker and also installed Solr at the same time.

Working through the example, in the end I went with a slightly different and simplified approach that swapped the syslog Flume source for an Apache server file-tailing source, as our webserver was on a different host to the Flume agent and I’d already set this up for an earlier blog post. I also dropped the Kafka element, as it wasn’t clear to me from the Cloudera article whether it’d work in its published form or needed amending to use with Kafka (“To get data from Kafka, parse it with Morphlines, and index it into Solr, you can use an almost identical configuration”), and so I went with an architecture that looked like this:

Compared to Big Data Discovery, this approach has got some drawbacks, but some interesting benefits. From a drawback perspective, Apache Solr (or Cloudera Search as it’s called in CDH5, where Cloudera have integrated Solr with HDFS storage) needs some quite fiddly manual setup that’s definitely an IT task, rather than the point-and-click dataset setup that you get with Big Data Discovery. In terms of benefits though, apart from being free it’s potentially more scalable than Big Data Discovery as BDD has to sample the full Hadoop dataset and fit that sample (typically 1m rows, or 1-5% of the full dataset) into BDD’s Endeca Server-based DGraph engine; Solr, however, indexes the whole Hadoop dataset and can store its indexes and log files within HDFS across the cluster - potentially very interesting if it works.

Back to drawbacks though: the first complication is that Solr’s configuration settings in this Cloudera Search incarnation are stored in Apache Zookeeper, so you first have to download a template copy of the collection files (schema, index etc.) from Zookeeper using solrctl, the command-line tool for SolrCloud (Solr running on a distributed cluster, as it is with Cloudera Search):

solrctl --zk bda5node2:2181/solr instancedir --generate $HOME/accessCollection

Then - and this again is a tricky part compared to Big Data Discovery - you have to edit the schema.xml file that Solr uses to determine which fields to index, what their datatypes are and so on. The Cloudera blog post points to a Github repo with the required schema.xml file for Apache Combined Log Format input files, but I found I had to add an extra entry for the “text” field name before Solr would index properly. It’s added at the end of the field list in the excerpt here:

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
   <field name="time" type="tdate" indexed="true" stored="true" />
   <field name="record" type="text_general" indexed="true" stored="false" multiValued="true"/>
   <field name="client_ip" type="string" indexed="true" stored="true" />
   <field name="code" type="string" indexed="true" stored="true" />
   <field name="user_agent" type="string" indexed="true" stored="true" />
   <field name="protocol" type="string" indexed="true" stored="true" />   
   <field name="url" type="string" indexed="true" stored="true" />   
   <field name="request" type="string" indexed="true" stored="true" />
   <field name="referer" type="string" indexed="true" stored="true" />
   <field name="bytes" type="string" indexed="true" stored="true" />
   <field name="method" type="string" indexed="true" stored="true" />
   
   <field name="extension" type="string" indexed="true" stored="true" />   
   <field name="app" type="string" indexed="true" stored="true" />      
   <field name="subapp" type="string" indexed="true" stored="true" />
      
   <field name="device_family" type="string" indexed="true" stored="true" />
   <field name="user_agent_major" type="string" indexed="true" stored="true" />   
   <field name="user_agent_family" type="string" indexed="true" stored="true" />
   <field name="os_family" type="string" indexed="true" stored="true" />   
   <field name="os_major" type="string" indexed="true" stored="true" />
   
   <field name="region_code" type="string" indexed="true" stored="true" />
   <field name="country_code" type="string" indexed="true" stored="true" />
   <field name="city" type="string" indexed="true" stored="true" />
   <field name="latitude" type="float" indexed="true" stored="true" />
   <field name="longitude" type="float" indexed="true" stored="true" />
   <field name="country_name" type="string" indexed="true" stored="true" />
   <field name="country_code3" type="string" indexed="true" stored="true" />
 
   <field name="_version_" type="long" indexed="true" stored="true"/>
   <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/>
   <dynamicField name="ignored_*" type="ignored"/>
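Because getting schema.xml to line up with what the morphline emits is the fiddly part, a quick cross-check can help. The following is a minimal Python sketch, not part of the Cloudera example: the function names, the cut-down schema string and the list of record fields are all illustrative assumptions, and it only looks at plain field entries, ignoring dynamicField patterns.

```python
import xml.etree.ElementTree as ET

# Hypothetical, cut-down stand-in for a schema.xml; only <field> entries matter here.
SCHEMA_XML = """
<schema name="accessCollection" version="1.5">
  <fields>
    <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="time" type="tdate" indexed="true" stored="true" />
    <field name="client_ip" type="string" indexed="true" stored="true" />
    <field name="method" type="string" indexed="true" stored="true" />
    <field name="url" type="string" indexed="true" stored="true" />
    <field name="text" type="text_general" indexed="true" stored="false" multiValued="true"/>
    <dynamicField name="ignored_*" type="ignored"/>
  </fields>
</schema>
"""


def declared_fields(schema_xml):
    """Return {field name: type} for every <field> element in the schema."""
    root = ET.fromstring(schema_xml.strip())
    return {f.get("name"): f.get("type") for f in root.iter("field")}


def undeclared(record_fields, schema_xml):
    """Return the record field names that have no matching <field> entry."""
    known = declared_fields(schema_xml)
    return sorted(name for name in record_fields if name not in known)


if __name__ == "__main__":
    # Field names a record might carry just before loading (illustrative only).
    record = ["id", "time", "client_ip", "method", "url", "protocol", "dummy1"]
    print(undeclared(record, SCHEMA_XML))
```

Any name this reports is one that either needs a field entry in schema.xml or will have to be dropped before the document is sent to Solr.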

Then you have to upload the Solr configuration settings to Zookeeper, and then configure Solr to use this particular set of Zookeeper Solr settings (note the “--create” before the accessCollection collection name in the second command; this was missing from the Cloudera steps but is needed for a valid solrctl command):

solrctl --zk bda5node2:2181/solr instancedir --create accessCollection $HOME/accessCollection
solrctl --zk bda5node2:2181/solr collection --create accessCollection -s 1

At this point you should be able to go to the Solr web admin page within the CDH cluster (http://bda5node5.rittmandev.com:8983/solr/#/, in my case), and see the collection (a distributed Solr index) listed with the updated index schema.

Next, I configured the Flume source agent on the RM webserver, using this Flume conf file:

## SOURCE AGENT ##
## Local installation: /etc/flume1.5.0
## configuration file location:  /etc/flume1.5.0/conf/conf
## bin file location: /etc/flume1.5.0/conf/bin
## START Agent: bin/flume-ng agent -c conf -f conf/flume-src-agent.conf -n source_agent
 
# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = apache_server
source_agent.sources.apache_server.type = exec
source_agent.sources.apache_server.command = tail -f /etc/httpd/logs/access_log
source_agent.sources.apache_server.batchSize = 1
source_agent.sources.apache_server.channels = memoryChannel
source_agent.sources.apache_server.interceptors = itime ihost itype
 
# http://flume.apache.org/FlumeUserGuide.html#timestamp-interceptor
source_agent.sources.apache_server.interceptors.itime.type = timestamp
 
# http://flume.apache.org/FlumeUserGuide.html#host-interceptor
source_agent.sources.apache_server.interceptors.ihost.type = host
source_agent.sources.apache_server.interceptors.ihost.useIP = false
source_agent.sources.apache_server.interceptors.ihost.hostHeader = host
 
# http://flume.apache.org/FlumeUserGuide.html#static-interceptor
source_agent.sources.apache_server.interceptors.itype.type = static
source_agent.sources.apache_server.interceptors.itype.key = log_type
source_agent.sources.apache_server.interceptors.itype.value = apache_access_combined
 
# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100
 
## Send to Flume Collector on Hadoop Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = rittmandev.com
source_agent.sinks.avro_sink.port = 4545

and then I set up a Flume sink agent as part of the Flume service using Cloudera Manager, initially set as “stopped”.

The Flume configuration file for this sink agent is where the clever stuff happens.

collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = bda5node5
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2

collector.channels = mc1 mc2
collector.channels.mc1.type = memory
collector.channels.mc1.transactionCapacity = 1000
collector.channels.mc1.capacity = 100000
collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100000
collector.channels.mc2.transactionCapacity = 1000

collector.sinks = LocalOut MorphlineSolrSink

collector.sinks.LocalOut.type = file_roll
collector.sinks.LocalOut.sink.directory = /tmp/flume/website_logs
collector.sinks.LocalOut.sink.rollInterval = 0
collector.sinks.LocalOut.channel = mc1

collector.sinks.MorphlineSolrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
collector.sinks.MorphlineSolrSink.morphlineFile = /tmp/morphline.conf
collector.sinks.MorphlineSolrSink.channel = mc2

The interesting bit here is the MorphlineSolrSink Flume sink. This sink type routes Flume events to a morphline script that in turn copies the log data into the HDFS storage area used by Solr, and passes it to Solr for immediate indexing. Cloudera Morphlines is a command-based, lightweight ETL framework designed to transform streaming data from Flume, Spark and other sources and load it into HDFS, HBase or, in our case, Solr. Morphlines config files define ETL routines that call extensible Kite SDK morphline commands to perform transformations on incoming data streams, such as

  • Splitting webserver request fields into HTTP protocol, method and URL requested
  • Generating the country, city and geocode for a given IP address, in conjunction with the Maxmind GeoIP database
  • Converting dates and times in string format to a Solr-format date and timestamp

with the output then being passed to Solr in this instance, along with the UUID and other metadata Solr needs, for loading into the Solr index, or “collection” as it’s termed when it’s running across the cluster (note the full log files aren’t stored by this process into HDFS, just the Solr indexes and transaction logs). The morphlines config file I used is below, based on the one provided in the Github repo accompanying the Cloudera blog post. Note though that you need to download and set up the Maxmind GeoIP database file, and install the Python pip utility and a couple of pip packages, before this will work:

# Specify server locations in a SOLR_LOCATOR variable;
# used later in variable substitutions
# Change the zkHost to point to your own Zookeeper quorum
SOLR_LOCATOR : {
    # Name of solr collection
    collection : accessCollection
    # ZooKeeper ensemble
    zkHost : "bda5node2:2181/solr"
}
 
# Specify an array of one or more morphlines, each of which defines an ETL
# transformation chain. A morphline consists of one or more (potentially
# nested) commands. A morphline is a way to consume records (e.g. Flume events,
# HDFS files or blocks), turn them into a stream of records, and pipe the stream
# of records through a set of easily configurable transformations on it's way to
# Solr (or a MapReduceIndexerTool RecordWriter that feeds via a Reducer into Solr).
morphlines : [
{
    # Name used to identify a morphline. E.g. used if there are multiple morphlines in a
    # morphline config file
    id : morphline1
    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
    {
        ## Parse each Apache combined-format log line as a space-separated
        ## record. Quoted values (request, referer, user agent) stay intact
        ## because of quoteChar; the bracketed timestamp splits into "time"
        ## plus a throwaway "dummy1" column holding the UTC offset.
        readCSV {
            separator : " "
            columns : [client_ip,C1,C2,time,dummy1,request,code,bytes,referer,user_agent,C3]
            ignoreFirstLine : false
            quoteChar : "\""
            commentPrefix : ""
            trim : true
            charset : UTF-8
        }
    }
    {
        split {
            inputField : request
            outputFields : [method, url, protocol]
            separator : " "
            isRegex : false
            #separator : """\s*,\s*"""
            #  #isRegex : true
            addEmptyStrings : false
            trim : true
        }
    }
    {
        split {
            inputField : url
            outputFields : ["", app, subapp]
            separator : "\/"
            isRegex : false
            #separator : """\s*,\s*"""
            #  #isRegex : true
            addEmptyStrings : false
            trim : true
        }
    }
    {
        userAgent {
            inputField : user_agent
            outputFields : {
                user_agent_family : "@{ua_family}"
                user_agent_major  : "@{ua_major}"
                device_family     : "@{device_family}"
                os_family         : "@{os_family}"
                os_major          : "@{os_major}"
            }
        }
    }
    {
        # Extract GEO information
        geoIP {
            inputField : client_ip
            database : "/tmp/GeoLite2-City.mmdb"
        }
    }
    {
        # extract parts of the geolocation info from the Jackson JsonNode Java
        # object contained in the _attachment_body field and store the parts in
        # the given record output fields:
        extractJsonPaths {
            flatten : false
            paths : {
                country_code : /country/iso_code
                country_name : /country/names/en
                region_code  : /continent/code
                #"/subdivisions[]/names/en" : "/subdivisions[]/names/en"
                #"/subdivisions[]/iso_code" : "/subdivisions[]/iso_code"
                city : /city/names/en
                #/postal/code : /postal/code
                latitude : /location/latitude
                longitude : /location/longitude
                #/location/latitude_longitude : /location/latitude_longitude
                #/location/longitude_latitude : /location/longitude_latitude
            }
        }
    }
      #{logInfo { format : "BODY : {}", args : ["@{}"] } }
    # add Unique ID, in case our message_id field from above is not present
    {
        generateUUID {
            field:id
        }
    }
    # convert the timestamp field to "yyyy-MM-dd'T'HH:mm:ss.SSSZ" format
    {
        #  21/Nov/2014:22:08:27
        convertTimestamp {
            field : time
            inputFormats : ["[dd/MMM/yyyy:HH:mm:ss", "EEE, d MMM yyyy HH:mm:ss Z", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd"]
            inputTimezone : America/Los_Angeles
            outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
            outputTimezone : UTC
        }
    }
    # Consume the output record of the previous command and pipe another
    # record downstream.
    #
    # This command sanitizes record fields that are unknown to Solr schema.xml
    # by deleting them. Recall that Solr throws an exception on any attempt to
    # load a document that contains a field that isn't specified in schema.xml
    {
        sanitizeUnknownSolrFields {
            # Location from which to fetch Solr schema
            solrLocator : ${SOLR_LOCATOR}
        }
    }
    # load the record into a SolrServer or MapReduce SolrOutputFormat.
    {
        loadSolr {
            solrLocator : ${SOLR_LOCATOR}
        }
    }
    ]
}
]
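To get a feel for what the readCSV, split and convertTimestamp commands do to a single log line, here is a rough Python emulation. It is a sketch under assumptions rather than the Kite SDK's actual behaviour: the function names and the sample log line are invented for illustration, the handling of empty pieces is my reading of addEmptyStrings, and the timestamp step uses the UTC offset logged in the line (the dummy1 column), whereas the morphline above discards that column and applies a fixed inputTimezone instead.

```python
import csv
from datetime import datetime, timezone

# Column names copied from the readCSV command in the morphline above.
COLUMNS = ["client_ip", "C1", "C2", "time", "dummy1", "request",
           "code", "bytes", "referer", "user_agent", "C3"]


def parse_line(line):
    """Approximate the readCSV step and the two split steps for one log line."""
    # readCSV: space-separated values, double quotes protect embedded spaces.
    row = next(csv.reader([line], delimiter=" ", quotechar='"'))
    record = dict(zip(COLUMNS, (value.strip() for value in row)))

    # First split: request -> method, url, protocol.
    for name, value in zip(["method", "url", "protocol"],
                           record.get("request", "").split(" ")):
        if value:
            record[name] = value

    # Second split: url -> (discarded leading piece), app, subapp.
    # An empty output name means "discard this piece" in this sketch.
    for name, value in zip(["", "app", "subapp"],
                           record.get("url", "").split("/")):
        if name and value:
            record[name] = value
    return record


def to_solr_time(time_field, offset_field):
    """Turn '[21/Nov/2014:22:08:27' plus '-0800]' into a UTC Solr date string."""
    local = datetime.strptime(f"{time_field} {offset_field}",
                              "[%d/%b/%Y:%H:%M:%S %z]")
    utc = local.astimezone(timezone.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{utc.microsecond // 1000:03d}Z"


if __name__ == "__main__":
    # Invented sample line in Apache combined log format.
    sample = ('192.168.0.1 - - [21/Nov/2014:22:08:27 -0800] '
              '"GET /blog/2015/post HTTP/1.1" 200 1234 '
              '"http://example.com/" "Mozilla/5.0 (X11; Linux)"')
    rec = parse_line(sample)
    print(rec["method"], rec["url"], rec["app"], rec["subapp"])
    print(to_solr_time(rec["time"], rec["dummy1"]))
```

Running a few real lines from access_log through something like this before starting the Flume agents is one way to spot column misalignments without having to dig through the sink's logs.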

Then it’s just a case of starting the target sink agent using Cloudera Manager, and the source agent on the RM webserver using the flume-ng command-line utility, and then (hopefully) watching the web activity log entries start to arrive as documents in the Solr index/collection, which, after a bit of fiddling around and correcting typos, they did:

What’s neat here is that instead of having to use either an ETL tool such as ODI to process and parse the log entries (as I did here, in an earlier blog post series on ODI on Hadoop), or use the Hive-to-DGraph data reload feature in BDD, I’ve instead just got a Flume sink running this morphlines process and my data is added in real-time to my Solr index, and as you’ll see in a moment, a Hue Search dashboard.
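One way to confirm that documents really are arriving, before building anything in Hue, is to query the collection's standard select handler directly. The sketch below only builds the query URL; the helper name select_url is hypothetical, the host and collection name are the ones used in this post, and q, rows, wt, facet and facet.field are standard Solr query parameters.

```python
from urllib.parse import urlencode


def select_url(base_url, collection, query="*:*", rows=5, facet_fields=()):
    """Build a Solr select-handler URL, optionally faceting on some fields."""
    params = [("q", query), ("rows", rows), ("wt", "json")]
    if facet_fields:
        params.append(("facet", "true"))
        params.extend(("facet.field", field) for field in facet_fields)
    return f"{base_url.rstrip('/')}/{collection}/select?{urlencode(params)}"


if __name__ == "__main__":
    # Host and collection as used in this post; adjust for your own cluster.
    print(select_url("http://bda5node5.rittmandev.com:8983/solr",
                     "accessCollection",
                     facet_fields=["country_name", "code"]))
```

Opening the resulting URL in a browser, or fetching it with urllib.request.urlopen, should return the newest documents plus facet counts, which is roughly what the Hue dashboard widgets request behind the scenes.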

To get Hue to work with my Solr service and new index, you first have to add the Solr service URL details to the Hue configuration settings using Cloudera Manager, like this:

Then, you can select the index from the list presented by the Search application within Hue, and start creating your data discovery and faceted search dashboard.

The end result, after a few minutes of setup, looked like this for me:

So how do Solr, Hue, Flume and Morphlines compare to Oracle Big Data Discovery as a potential search-and-discovery solution on Hadoop? What’s impressive is how little work, once I’d figured it out, it took to set this up, including the real-time loading and indexing of data for the dashboard. Compared to loading HDFS and Hive using ODI, and manually refreshing the BDD DGraph data store, it’s much more lightweight and pretty elegant. But it’s clearly an IT / developer solution, and I spent a fair few late nights getting it all to work: getting the Solr schema.xml right was a tricky task, and the morphlines / Solr ingestion process was particularly hard to debug when it wasn’t working.

Oracle Big Data Discovery, by contrast, makes the data loading, transformation and enrichment process available to the business or data analyst, and provides much richer tools for cataloging and exploring the full universe of datasets on the Hadoop cluster. Morphlines compares well to the Groovy transformations provided by Big Data Discovery and Solr is extensible to add functionality such as sentiment analysis and text parsing, but again these are IT tasks and not something the average data analyst will want to do.

In summary then - Hue, Solr and the Morphlines transformation framework can be an excellent tool in the hands of IT professionals and can create surprisingly featureful and elegant solutions with just a bit of code and process configuration - but where Big Data Discovery comes into its own is putting significant parts of this capability in the hands of the business and the data analyst, and providing tools for data upload and wrangling, combining that data with other datasets, analyzing that whole dataset (or "data reservoir") and then collaborating with others around the organization.