Exploring Apache Spark on the New BigDataLite 3.0 VM

The latest version of Oracle’s BigDataLite VirtualBox VM went up on OTN last week, and amongst other things it includes the latest CDH5.0 version of Cloudera Distribution including Hadoop, as featured on Oracle Big Data Appliance. This new version comes with an update to MapReduce, moving it to MapReduce 2.0 (with 1.0 still there for backwards-compatibility) and with YARN as the replacement for the Hadoop JobTracker. If you’re developing on CDH or the BigDataLite VM you shouldn’t notice any differences with the move to YARN, but it’s a more forward-looking, modular way of tracking and allocating resources on these types of compute clusters that also opens them up to processing models other than MapReduce.

The other new Hadoop feature that you’ll notice with BigDataLite VM and CDH5 is an updated version of Hue, the web-based development environment you use for creating Hive queries, uploading files and so on. As the version of Hue shipped is now Hue 3.5, there’s proper support for quoted CSV files in the Hue / Hive uploader (hooray) along with support for stripping the first (title) line, and an updated Pig editor that prompts you for command syntax (like the Hortonworks Pig editor).

This new version of BigDataLite also seems to have had Cloudera Manager removed (or at least, it’s not available as usual at http://bigdatalite:7180), with instead a utility provided on the desktop that allows you to stop and start the various VM services, including the Oracle Database and Oracle NoSQL database that also come with the VM. Strictly speaking it’s actually easier to use than Cloudera Manager, but it’s a shame it’s gone as there are lots of monitoring and configuration tools in the product that I’ve found useful in the past.

CDH5 also comes with Apache Spark, a cluster processing framework that’s being positioned as the long-term replacement for MapReduce. Spark is technically a programming model that allows developers to create scripts, or programs, that bring together operators such as filters, aggregators, joiners and group-bys using languages such as Scala and Python, but crucially this can all happen in-memory - making Spark potentially much faster than MapReduce for both batch and ad-hoc analysis.

This article on the Cloudera blog goes into more detail on what Apache Spark is and how it improves over MapReduce, and this article takes a look at the Spark architecture and how its approach avoids the multi-stage execution model that MapReduce uses (something you’ll see if you ever do a join in Pig or Hive). But what does some basic Spark code look like, using the default Scala language most people associate Spark with? Let’s take a look at some sample code, using the same Rittman Mead Webserver log files I used in the previous Pig and Hive/Impala articles.

You can start up Spark in interactive mode, like you do with Pig and Grunt, by opening a Terminal session and typing in “spark-shell”:

[oracle@bigdatalite ~]$ spark-shell
14/05/12 20:56:50 INFO HttpServer: Starting HTTP Server
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
14/05/12 20:56:56 INFO Slf4jLogger: Slf4jLogger started
14/05/12 20:56:56 INFO Remoting: Starting remoting
14/05/12 20:56:56 INFO Remoting: Remoting started; 
...

14/05/12 20:56:57 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140512205657-0002
14/05/12 20:56:57 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala>

Spark has the concept of RDDs (“Resilient Distributed Datasets”), which can be thought of as similar to relations in Pig and tables in Hive, but which crucially can be cached in RAM for improved performance when you need to access their dataset repeatedly. Like Pig, Spark features “lazy execution”, only processing the various Spark commands when it actually needs to (for example, when outputting the results of a data-flow), so let’s run two more commands to load in one of the log files on HDFS, and then count the log file lines within it.

scala> val logfile = sc.textFile("logs/access_log")
14/05/12 21:18:59 INFO MemoryStore: ensureFreeSpace(77353) called with curMem=234759, maxMem=309225062
14/05/12 21:18:59 INFO MemoryStore: Block broadcast_2 stored as values to memory (estimated size 75.5 KB, free 294.6 MB)
logfile: org.apache.spark.rdd.RDD[String] = MappedRDD[31] at textFile at <console>:15

scala> logfile.count()
14/05/12 21:19:06 INFO FileInputFormat: Total input paths to process : 1
14/05/12 21:19:06 INFO SparkContext: Starting job: count at <console>:1
...
14/05/12 21:19:06 INFO SparkContext: Job finished: count at <console>:18, took 0.192536694 s
res7: Long = 154563
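
To see the lazy execution for yourself, one small experiment is to point textFile at a file that isn't there. This is only a sketch: it assumes it is pasted into the same spark-shell session (so that sc is available), and logs/no_such_file is a made-up path chosen because it should not exist. Defining the RDD and a filter on it should go through without complaint, with the error only surfacing once an action such as count() forces Spark to read the data:

```scala
import scala.util.Try

// Hypothetical path, picked because it should NOT exist on HDFS
val missing = sc.textFile("logs/no_such_file")

// filter is a transformation: it only describes the work, nothing is read yet
val missing404 = missing.filter(line => line.contains("404"))

// count() is an action, so this is the first point at which Spark touches the path
val attempt = Try(missing404.count())
println("count succeeded? " + attempt.isSuccess)
```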

So the file contains 154563 records. Running the logfile.count() command again may bring back the count more quickly, but note that Spark only keeps an RDD in memory if you ask it to; we can request this explicitly in these commands by using the “.cache” method:

scala> val logfile = sc.textFile("logs/access_log").cache
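
It's worth knowing that .cache is itself lazy: it only marks the RDD to be kept in memory, and the data is actually stored as a side effect of the first action that computes it. A minimal sketch, again assuming the shell's sc, and using three made-up log lines with sc.parallelize so that it doesn't depend on the files on HDFS:

```scala
// Made-up sample lines standing in for the real access log
val sample = sc.parallelize(Seq(
  """1.2.3.4 - - [12/May/2014:20:56:50 +0000] "GET /biapps11g/ HTTP/1.1" 200 404""",
  """1.2.3.5 - - [12/May/2014:20:56:51 +0000] "GET /missing.jpg HTTP/1.1" 404 512""",
  """1.2.3.6 - - [12/May/2014:20:56:52 +0000] "GET /index.html HTTP/1.1" 200 1024"""
))

val cached = sample.cache()  // marks the RDD for caching; nothing is stored yet
val first  = cached.count()  // first action computes the RDD and populates the cache
val second = cached.count()  // later actions can read the cached partitions instead
println("first: " + first + ", second: " + second)
```

With only three lines the timing difference won't be visible, but the same pattern applies to the full log file.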

So let’s try some filtering, retrieving just those log entries where the user is requesting our BI Apps 11g homepage (“/biapps11g/”):

scala> val biapps11g = logfile.filter(line => line.contains("/biapps11g/"))
biapps11g: org.apache.spark.rdd.RDD[String] = FilteredRDD[34] at filter at <console>:17
scala> biapps11g.count()
...
14/05/12 21:28:28 INFO SparkContext: Job finished: count at <console>:20, took 0.387960876 s
res9: Long = 403 

Or I can create a dataset containing just those records that contain the string “404”, as a rough first pass at finding requests that returned a 404 error code:

scala> val errors = logfile.filter(_.contains("404"))
errors: org.apache.spark.rdd.RDD[String] = FilteredRDD[36] at filter at <console>:17
scala> errors.count()
...
res11: Long = 1577 
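
Bear in mind that a substring test like this matches “404” anywhere in the line, not only in the status field, so it can over-count. The plain-Scala sketch below illustrates the difference; the three log lines and the regular expression are made up for illustration, and the pattern only handles this simplified layout rather than the full combined log format:

```scala
// Made-up lines: only the second one actually has a 404 status
val lines = Seq(
  """1.2.3.4 - - [12/May/2014:20:56:50 +0000] "GET /biapps11g/ HTTP/1.1" 200 404""",
  """1.2.3.5 - - [12/May/2014:20:56:51 +0000] "GET /missing.jpg HTTP/1.1" 404 512""",
  """1.2.3.6 - - [12/May/2014:20:56:52 +0000] "GET /page-404-help/ HTTP/1.1" 200 1024"""
)

// Simplified pattern: closing quote of the request, then status code, then byte count
val StatusPattern = "\" (\\d{3}) (\\d+|-)".r

// Pull out the status code field, if the line has the expected shape
def statusOf(line: String): Option[String] =
  StatusPattern.findFirstMatchIn(line).map(_.group(1))

val bySubstring = lines.count(_.contains("404"))
val byStatus    = lines.count(line => statusOf(line) == Some("404"))
println("substring matches: " + bySubstring + ", status-field matches: " + byStatus)
```

Here the first line matches on its byte count and the third on its URL, which is why parsing the line into fields gives a more trustworthy answer.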

Spark, using Scala as the language, has routines for filtering, joining, splitting and otherwise transforming data, but something that’s quite common in Spark is to create Java JAR files, typically from compiled Scala code, to encapsulate certain common data transformations, such as this Apache CombinedFormat Log File parser available on Github from @alvinalexander, author of the Scala Cookbook. Once you’ve compiled this into a JAR file and added it to your SPARK_CLASSPATH (see his blog post for full details; the Spark examples below are taken from it), you can start to work with the individual log file elements, like we did in the Hive and Pig examples where we parsed the log file using Regexes.

scala> import com.alvinalexander.accesslogparser._
import com.alvinalexander.accesslogparser._

scala> val p = new AccessLogParser
p: com.alvinalexander.accesslogparser.AccessLogParser = com.alvinalexander.accesslogparser.AccessLogParser@6d32bc14

Then I can access the HTTP Status Code using its own property, like this:

def getStatusCode(line: Option[AccessLogRecord]) = {
  line match {
    case Some(l) => l.httpStatusCode
    case None => "0"
  }
}

logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404").count 
...
res12: Long = 1233 
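
The pattern match in getStatusCode is the long-hand form of a common Option idiom. As a sketch, using a cut-down stand-in for the parser's record class (LogRecord here is hypothetical and has only two fields, whereas the real AccessLogRecord has more), the same default-to-"0" logic can be written with map and getOrElse:

```scala
// Hypothetical stand-in for the parser's AccessLogRecord, reduced to two fields
case class LogRecord(request: String, httpStatusCode: String)

// Equivalent to matching on Some/None and returning "0" for unparseable lines
def statusCodeOrZero(rec: Option[LogRecord]): String =
  rec.map(_.httpStatusCode).getOrElse("0")

val parsed: Option[LogRecord]   = Some(LogRecord("GET /missing.jpg HTTP/1.1", "404"))
val unparsed: Option[LogRecord] = None

println(statusCodeOrZero(parsed))
println(statusCodeOrZero(unparsed))
```

Which form to use is largely a matter of taste; the explicit match is arguably easier to read if you're new to Scala.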

Then we can use a similar method to retrieve all of the “request” entries in the log file where the user got a 404 error, starting off by defining two methods that will help with the request parsing - note the use of the :paste command, which allows you to block-paste a set of commands into the Spark shell:

scala> :paste
// Entering paste mode (ctrl-D to finish)
def getRequest(rawAccessLogString: String): Option[String] = {
  val accessLogRecordOption = p.parseRecord(rawAccessLogString)
  accessLogRecordOption match {
    case Some(rec) => Some(rec.request)
    case None => None
  }
}
def extractUriFromRequest(requestField: String) = requestField.split(" ")(1)
// Exiting paste mode, now interpreting.

getRequest: (rawAccessLogString: String)Option[String]
extractUriFromRequest: (requestField: String)String
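
One caveat with extractUriFromRequest: split(" ")(1) throws an ArrayIndexOutOfBoundsException if a request field has no second token, which can happen with malformed requests. A more defensive variant (a sketch only; safeUri is a hypothetical name and not part of the parser library) returns an Option instead:

```scala
// Returns the URI (the second space-separated token) if the request field has one
def safeUri(requestField: String): Option[String] =
  requestField.split(" ") match {
    case Array(_, uri, _*) => Some(uri)
    case _                 => None
  }

println(safeUri("GET /biapps11g/ HTTP/1.1"))
println(safeUri("-"))
```

In a Spark pipeline this could be used as .map(safeUri(_)) followed by a .collect { case Some(uri) => uri } step to drop the requests that had no URI.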

Now we can run the code to output the URIs that have been generating 404 errors:

scala> :paste
// Entering paste mode (ctrl-D to finish)
logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404").map(getRequest(_)).count
val recs = logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404").map(getRequest(_))
val distinctRecs = logfile.filter(line => getStatusCode(p.parseRecord(line)) == "404")
                      .map(getRequest(_))
                      .collect { case Some(requestField) => requestField }
                      .map(extractUriFromRequest(_))
                      .distinct
distinctRecs.count
distinctRecs.foreach(println)
...
/wp2/wp-content/uploads/2009/11/fin5.jpg/
wp2/wp-content/uploads/2009/08/qv10.jpg/
wp2/wp-content/uploads/2010/02/image32.png/2013/08/inside-my-home-office-development-lab-vmware-os-x-server/
wp-content/themes/optimize/thumb.php 
...

So Spark is commonly considered the successor to MapReduce, and you can start playing around with it on the new BigDataLite VM. Unless you’re a Java (or Scala, or Python) programmer, Spark isn’t quite as easy as Pig or Hive to get into, but the potential benefits over MapReduce are impressive and it’d be worth taking a look. Hopefully we’ll have more on Spark on the blog over the next few months, as we get to grips with it properly.