End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.4 : Transforming Data using Python & Hive Streaming

This week I’m taking an in-depth look at ETL on the Oracle Big Data Appliance, using Oracle Data Integrator 12c to call the various bits of Hadoop functionality and orchestrate the whole process. So far, I’ve landed web log data into the Hadoop cluster using Flume, created a Hive table over the log data using Hive and the RegEx SerDe, then used further Hive transformations to join this data to other reference data, some of which came from an Oracle database via Sqoop. Here’s a complete listing of the posts so far, and the ones to come:

Our next transformation is a bit trickier though. The data we’re bringing in from our webserver has IP addresses recorded for each HTTP access, and what we want to do is use that IP address to identify the country that the website visitor was located in. Geocoding, as it’s called, is fairly easy to do if your data is stored in a database such as Oracle Database or you’re using a language like Python, with two steps needed to retrieve the IP address’s country:

  1. First convert the IP address into a single integer, like this:
address = '174.36.207.186'
( o1, o2, o3, o4 ) = ( int(o) for o in address.split('.') )
integer_ip =   ( 16777216 * o1 ) \
             + (    65536 * o2 ) \
             + (      256 * o3 ) \
             +              o4
  2. Then, using a freely-downloadable database from MaxMind, look to see which range of numbers your converted IP address is in:
SELECT ip_country
FROM geoip
WHERE 2921648058 BETWEEN begin_ip_num AND end_ip_num
LIMIT 1
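
Putting those two steps together outside of a database, the lookup logic looks something like the sketch below. This is just my own illustration: the ip_ranges list stands in for MaxMind’s geoip table with a single made-up sample range, and later in this post I’ll let the MaxMind Python API do the lookup for me instead:

def ip_to_int(address):
    o1, o2, o3, o4 = (int(o) for o in address.split('.'))
    return (16777216 * o1) + (65536 * o2) + (256 * o3) + o4

# sample data only: (begin_ip_num, end_ip_num, ip_country) rows like those in the geoip table
ip_ranges = [(2921594880, 2921660415, 'United States')]   # 174.36.0.0 - 174.36.255.255, illustrative

def country_for_ip(address):
    n = ip_to_int(address)
    for begin_ip_num, end_ip_num, ip_country in ip_ranges:
        if begin_ip_num <= n <= end_ip_num:
            return ip_country
    return 'Unknown'

print(country_for_ip('174.36.207.186'))   # prints 'United States' with the sample range above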

The only problem is that Hive doesn’t support anything other than equi-joins. In a previous blog post I got around this issue by using Impala rather than Hive, and in another one I used Pig, and a custom Python UDF, to do the geocoding outside of the main Pig data flow. I could use Pig and a Python UDF in this situation too, creating an ODI Procedure to call Pig from the command-line, but instead I’ll use another ODI KM, IKM Hive Transform, to call a Python script, with the whole process controlled from within ODI.

To set this geocoding process up, I first create a new ODI mapping, with my current Hive table as the source, and another Hive table, this time with an additional column for the IP address country, as the target. I then map what columns I can into the target, leaving just the country column without a mapping. Notice I’ve put the source table within an ODI12c dataset - this isn’t mandatory, it’s just something I decided to do at the time - and it doesn’t affect the rest of the process.

Now, in a similar way to how I created the Python UDF to do the country lookup for Pig, I’ll create a variation of that script that expects columns of data as input, outputs other columns of data, and does the geocoding within the script by calling the Python GeoIP API provided by MaxMind. Most importantly though, I’ll use it in conjunction with IKM Hive Transform’s ability to stream its Hive data through an arbitrary script, making use of the Hive TRANSFORM function and a feature called “hive streaming”. To keep things simple, let’s first set it up outside of the KM, see how it works, and then configure the KM to use this Python script as its transformation function.

Let’s start by looking at the source Hive table I’ll be starting with:

hive> describe access_per_post_categories;
OK
hostname           string             None                
request_date       string             None                
post_id            string             None                
title              string             None                
author             string             None                
category           string             None                
Time taken: 0.112 seconds, Fetched: 6 row(s)
hive> select * from access_per_post_categories limit 5;
OK
137.254.4.12[02/Jun/2014:21:20:41 +0000]2093Hyperion Planning : Installing the Sample Planning Application in EPM 11.1Mark RittmanHyperion
137.254.4.12[02/Jun/2014:21:20:42 +0000]2093Hyperion Planning : Installing the Sample Planning Application in EPM 11.1Mark RittmanHyperion
137.254.4.12[02/Jun/2014:21:20:43 +0000]2093Hyperion Planning : Installing the Sample Planning Application in EPM 11.1Mark RittmanHyperion
137.254.4.12[02/Jun/2014:21:20:45 +0000]2093Hyperion Planning : Installing the Sample Planning Application in EPM 11.1Mark RittmanHyperion
137.254.4.12[02/Jun/2014:21:21:23 +0000]2093Hyperion Planning : Installing the Sample Planning Application in EPM 11.1Mark RittmanHyperion
Time taken: 0.976 seconds, Fetched: 5 row(s)

The target table is exactly the same, except for the additional column for country:

hive> describe access_per_post_full;
OK
hostname           string             None                
request_date       string             None                
post_id            string             None                
title              string             None                
author             string             None                
country            string             None                
category           string             None                
Time taken: 0.098 seconds, Fetched: 7 row(s)

The way IKM Hive Transform works is by streaming each incoming row of Hive data to the script registered with it, with each row’s columns tab-separated. In turn, it expects the script to output similar rows of tab-separated values, each line terminated with a newline. My script, therefore, needs to accept an arbitrary number of incoming rows, parse each row into its constituent columns, call the MaxMind API to do the geocoding, and then output a tab-separated set of columns for each row it processes (a bit like pipelined table functions in Oracle PL/SQL).

So here’s the Python script that does this:

[root@bdanode1 tmp]# cat add_countries.py
#!/usr/bin/python
import sys
sys.path.append('/usr/lib/python2.6/site-packages/')
import pygeoip
# open the MaxMind country database stored alongside the script in /tmp
gi = pygeoip.GeoIP('/tmp/GeoIP.dat')
# Hive streams each row to stdin as one line of tab-separated columns
for line in sys.stdin:
  line = line.rstrip()
  hostname,request_date,post_id,title,author,category = line.split('\t')
  # geocode the visitor's IP address to a country name
  country = gi.country_name_by_addr(hostname)
  # write the row back out tab-separated, with country added before category
  print hostname+'\t'+request_date+'\t'+post_id+'\t'+title+'\t'+author+'\t'+country+'\t'+category

A few things to note about the script:

  1. “import sys” brings in the Python sys module, which lets me read the input data via sys.stdin.
  2. I’ve previously installed the MaxMind GeoIP API, database and libraries, with the GeoIP.dat database stored alongside the script in /tmp. “import pygeoip” brings in the Python library used to call the API.
  3. The bit that does the geocoding is “gi.country_name_by_addr”, which calls the MaxMind API. It’s this call that replaces the need for a join with a BETWEEN clause in HiveQL.
  4. To output data back to the calling Hive transformation, I just “print” it, using “\t” for the tab separator.
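
Before wiring the script into Hive, it’s worth checking it in isolation. A quick way to do that, assuming the script and GeoIP.dat are both sitting in /tmp on the node you’re testing from, is to pipe a single sample tab-separated row through it (the column values below are taken from the source table output earlier) and see what comes back:

import subprocess

# one sample row from access_per_post_categories, columns joined with tabs
sample_row = '\t'.join(['137.254.4.12', '[02/Jun/2014:21:20:41 +0000]', '2093',
                        'Hyperion Planning : Installing the Sample Planning Application in EPM 11.1',
                        'Mark Rittman', 'Hyperion']) + '\n'

# run add_countries.py the same way Hive will: rows in on stdin, rows out on stdout
p = subprocess.Popen(['python', '/tmp/add_countries.py'],
                     stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = p.communicate(sample_row)
print(out)   # same columns back, with the looked-up country inserted before category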

One thing we’ll need to do though, similar to how we had to copy the JAR file used by Hive for the RegEx SerDe to all the nodes in the cluster, is to copy the GeoIP.dat file used by the MaxMind geocoding API to the same place on all the cluster nodes. We’ll also have to set the permissions on this file so it’s readable and writeable by the Hive/YARN process on each Hadoop node:

officeimac:~ markrittman$ ssh root@bdanode2.rittmandev.com
The authenticity of host 'bdanode2.rittmandev.com (192.168.2.231)' can't be established.
RSA key fingerprint is 1b:e7:ec:01:57:0c:09:4e:6d:19:08:a4:0d:df:00:e0.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'bdanode2.rittmandev.com' (RSA) to the list of known hosts.
root@bdanode2.rittmandev.com's password: 
Last login: Fri Jun  6 16:55:44 2014 from 192.168.2.201
[root@bdanode2 ~]# ls /tmp/Geo* -l
-rwxrwxrwx 1 oracle oracle 687502 Jun  7 12:31 /tmp/GeoIP.dat

I also need to make sure the MaxMind geocoding Python API is installed on each node:

wget https://raw.github.com/pypa/pip/master/contrib/get-pip.py
python get-pip.py 
pip install pygeoip
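
With pygeoip installed and GeoIP.dat copied across, a couple of lines of Python like these (using the example IP address from earlier) are enough to confirm on each node that the library can open the database file and resolve an address:

import pygeoip

# should print a country name rather than raise an error if the install is good
gi = pygeoip.GeoIP('/tmp/GeoIP.dat')
print(gi.country_name_by_addr('174.36.207.186'))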

Now I can run a HiveQL command and test out the script. I’ll start up a Hive shell session, and the first thing I need to do is register the script with Hive, which distributes it across the cluster and adds it to the distributed cache. Then I can select against the source table, using the TRANSFORM ... USING clause:

[oracle@bdanode1 ~]$ hive
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
14/06/09 20:48:00 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
14/06/09 20:48:01 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
 
Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-0.12.0-cdh5.0.1.jar!/hive-log4j.properties
hive> add file /tmp/add_countries.py;                                
Added resource: /tmp/add_countries.py
hive> select transform (hostname,request_date,post_id,title,author,category)
    > using 'add_countries.py'                                              
    > as (hostname,request_date,post_id,title,author,category,country)      
    > from access_per_post_categories; 
...
63.73.199.69[02/Jun/2014:21:19:28 +0000] 14726 SmartView as the Replacement for BI Office with OBIEE 11.1.1.7 Mark Rittman United States Oracle EPM
77.125.81.239[02/Jun/2014:21:18:58 +0000] 14476 Upgrading OBIEE to 11.1.1.7 Robin Moffatt Israel Oracle BI Suite EE
77.125.81.239[02/Jun/2014:21:18:59 +0000] 14476 Upgrading OBIEE to 11.1.1.7 Robin Moffatt Israel Oracle BI Suite EE
Time taken: 27.645 seconds, Fetched: 176 row(s)

So it looks like it’s working. Let’s move back to ODI now and reference the script within the IKM Hive Transform KM settings. Note that I enter the name and filesystem location of the script in the KM settings, but I don’t use the TRANSFORM_SCRIPT option to key the script contents directly into the KM. That option should work (if you use it, the KM writes the script contents out to the agent’s host filesystem at the start of the process), but the KM in this ODI12c release has an issue parsing keyed-in scripts, so you’re best off leaving the setting empty and letting the KM use the file already on the filesystem.

After running the mapping, I then go over to the Operator navigator and see that it’s run successfully. Looking at the code the KM generates, I can see it referencing our script in the HiveQL statement, with the script outputting the extra country column that’s then loaded into the target table along with the other columns.

Then finally, checking the target table, I can see that each row in my final Hive table has the country name alongside each log file entry, along with the IP address and other details.

So - now we’re at the stage where we’ve finished the processing within the Hadoop cluster, and I’d like to copy the final set of data into an Oracle database, so that I can analyse it using Oracle SQL and other tools. In the final installment in this series we’ll do just that, using IKM File/Hive to Oracle (OLH/ODCH) and Oracle Loader for Hadoop, one of the Oracle Big Data Connectors.