So What's the Real Point of ODI12c for Big Data Generating Pig and Spark Mappings?

Oracle ODI12c for Big Data came out the other week, and my colleague Jérôme Françoisse put together an introductory post on the new features shortly after, covering ODI’s new ability to generate Pig and Spark transformations as well as the traditional Hive ones. How this works is that you can now select Apache Pig, or Apache Spark (through pySpark, the Spark API through Python) as the implementation language for an ODI mapping, and ODI will generate one of those languages instead of HiveQL commands to run the mapping.

How this works is that ODI12c 12.1.3.0.1 adds a bunch of new component-style KMs to the standard 12c ones, providing filter, aggregate, file load and other features that generate pySpark and Pig code rather than the usual HiveQL statement parts. Component KMs have also been added for Hive as well, making it possible now to include non-Hive datastores in a mapping and join them all together, something it was hard to do in earlier versions of ODI12c where the Hive IKM expected to do the table data extraction as well.

But when you first look at this you may well be tempted to think “…so what?”, in that Pig compiles down to MapReduce in the end, just like Hive does, and you probably won’t get the benefits of running Spark for just a single batch mapping doing largely set-based transformations. To my mind where this new feature gets interesting is its ability to let you take existing Pig and Spark scripts, which process data in a different, dataflow-type way compared to Hive’s set-based transformations and which also potentially also use Pig and Spark-specific function libraries, and convert them to managed graphical mappings that you can orchestrate and run as part of a wider ODI integration process.

Pig, for example, has the LinkedIn-originated DataFu UDF library that makes it easy to sessionize and further transform log data, and the Piggybank community library that extends Pig’s loading and saving capabilities to additional storage formats, and provides additional basic UDFs for timestamp conversion, log parsing and so forth. We’ve used these libraries in the past to process log files from our blog’s webserver and create classification models to help predict whether a visitor will return, with the Pig script below using the DataFu and Piggybank libraries to perform these tasks easily in Pig.

register /opt/cloudera/parcels/CDH/lib/pig/datafu.jar;
register /opt/cloudera/parcels/CDH/lib/pig/piggybank.jar;

DEFINE Sessionize datafu.pig.sessions.Sessionize('60m');
DEFINE Median datafu.pig.stats.StreamingMedian();
DEFINE Quantile datafu.pig.stats.StreamingQuantile('0.9','0.95');
DEFINE VAR datafu.pig.VAR();
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix();

--------------------------------------------------------------------------------
-- Import and clean logs
raw_logs = LOAD '/user/flume/rm_logs/apache_access_combined' USING TextLoader AS (line:chararray);

-- Extract individual fields
logs_base = FOREACH raw_logs
GENERATE FLATTEN
(REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')) AS
(remoteAddr: chararray, remoteLogName: chararray, user: chararray, time: chararray, request: chararray, status: chararray, bytes_string: chararray, referrer:chararray, browser: chararray);

-- Remove Bots and convert timestamp
logs_base_nobots = FILTER logs_base BY NOT (browser matches '.*(spider|robot|bot|slurp|Bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*');

-- Remove uselesss columns and convert timestamp
clean_logs = FOREACH logs_base_nobots GENERATE CustomFormatToISO(time,'dd/MMM/yyyy:HH:mm:ss Z') as time, remoteAddr, request, status, bytes_string, referrer, browser;

--------------------------------------------------------------------------------
-- Sessionize the data

clean_logs_sessionized = FOREACH (GROUP clean_logs BY remoteAddr) {
ordered = ORDER clean_logs BY time;
GENERATE FLATTEN(Sessionize(ordered))
AS (time, remoteAddr, request, status, bytes_string, referrer, browser, sessionId);
};

-- The following steps will generate a tsv file in your home directory to download and work with in R
store clean_logs_sessionized into '/user/jmeyer/clean_logs' using PigStorage('\t','-schema');

If you know Pig (or read my previous articles on this theme), you’ll know that pig has the concept of an “alias”, a dataset you define using filters, aggregations, projections and other operations against other aliases, with a typical pig script starting with a large data extract and then progressively whittling it down to just the subset of data, and derived data, you’re interested in. When it comes to script execution, Pig only materializes these aliases when you tell it to store the results in permanent storage (file, Hive table etc) with the intermediate steps just being instructions on how to progressively arrive at the final result. Spark works in a similar way with its RDDs, transformations and operations which either create a new dataset based off of an existing one, or materialise the results in permanent storage when you run an “action”. So let’s see if ODI12c for Big Data can create a similar dataflow, based as much as possible on the script I’ve used above.

… and in-fact it can. The screenshot below shows the logical mapping to implement this same Pig dataflow, with the data coming into the mapping as a Hive table, an expression operator creating the equivalent of a Pig alias based off of a filtered, transformed version of the original source data using the Piggybank CustomFormatToISO UDF, and then runs the results of that through an ODI table function that in the background transforms the data using Pig’s GENERATE FLATTEN command and a call to the DataFu Sessionize UDF.

And this is the physical mapping to go with the logical mapping. Note that all of the Pig transformations are contained within a separate execution unit, that contains operators for the expression to transform and filter the initial dataset, and another for the table function.

The table function operator runs the input fields through an arbitrary Pig Latin script, in this case defining another alias to match the table function operator name and using the DataFu Sessionize UDF within a FOREACH to first sort, and then GENERATE FLATTEN the same columns but with a session ID for user sessions with the same IP address and within 60 seconds of each other.

If you’re interested in the detail of how this works and other usages of the new ODI12c for Big Data KMs, then come along to the masterclass I’m running with Jordan Meyer at the Brighton and Atlanta Rittman Mead BI Forums where I’ll go into the full details as part of a live end-to-end demo. Looking at the Pig Latin that comes out of it though, you can see it more or less matches the flow of the hand-written script and implements all of the key steps.

Finally, checking the output of the mapping I can see that the log entries have been sessionized and they’re ready to pass on to the next part of the classification model.

So that to my mind is where the value is in ODI generating Pig and Spark mappings. It’s not so much taking an existing Hive set-based mapping and just running it using a different language, it’s more about being able to implement graphically the sorts of data flows you can create with Pig and Spark, and being able to get access to the rich UDF and data access libraries that these two languages benefit from. As I said, come along to the masterclass Jordan and I are running, and I’ll go into much more detail and show how the mapping is set up, along with other mappings to create an end-to-end Hadoop data integration process.