End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.1 : Flume to Initial Hive Table
A few months ago I posted an article on the ODI12c examples in the new Oracle Big Data Appliance, and over the past few weeks I’ve been writing about the various components within the Cloudera CDH Hadoop stack, including Hive, Pig, Spark and Flume. Since then I’ve built out a virtualized example of an Oracle Big Data Appliance using the Oracle Big Data 3.0 software set, and I thought it’d be interesting to create an example ETL flow through the system showing how ODI could be used to initiate and control the process. As with any situation where you actually build a demo to do something specific, as opposed to just playing around with the technology, you end up uncovering a few quirks and techniques you wouldn’t otherwise have been aware of, so I’ll spend this week going through the various steps and calling out things others in the same situation might find useful. The steps I’ll go through are below, and I’ll add the links as the articles get published during the week:
- End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.1 : Flume to Initial Hive Table
- End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.2 : Hive Table Joins, Aggregation and Loading
- End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.3 : Enhance with Oracle Reference Data via Sqoop, and CKMs
- End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.4 : Transforming Hive Data using Python & Streaming
- End-to-End ODI12c ETL on Oracle Big Data Appliance Pt.5 : Bulk Unload to Oracle
As an overview, what I’ll be working with is a six-node Hadoop cluster running Cloudera CDH5, Oracle Big Data Connectors 3.0 and Oracle Data Integrator 12c, as you’d get with the current version of Oracle Big Data Appliance. Obviously BDA goes up to eighteen nodes as a maximum, and under the covers there’s lots more memory and much better networking than I was able to set up on VMware ESXi, but what I’ve got is enough to prove the process. The diagram below shows the six nodes, and where the software is installed.
I took a couple of short-cuts with the setup; obviously each node has a lot less RAM than BDA’s 64GB per node, but the main node in the cluster (bdanode1), running the HDFS NameNode and all the ODI software, got bumped up to 28GB, with the next two allocated 12GB and the others 8GB - enough to work through a demo at least. I also ran ODI Studio on bdanode1 itself, rather than setting it up on a separate client VM, mainly to avoid having to set up all the Hadoop and Hive libraries on another machine. Other than that though, it’s the same CDH distribution you get on BDA, the same version of ODI and the Big Data Connectors, and so on, with the following software versions and downloads being used:
- Oracle Enterprise Linux 6.4 64-bit configured to use Oracle Public Yum and the 11gR2 Pre-Install RPM (to set the nodes up for Oracle software such as ODI, Oracle DB client etc)
- Cloudera Manager and CDH5, via the Linux Cloudera Manager command-line installer (cloudera-manager-installer.bin)
- Oracle Data Integrator 12c for Linux x64
- Oracle Big Data Connectors 3.0
Setup of the Hadoop cluster is really out of scope for these articles, except to say that with CDH5 I find it easier to select the (non-default, deprecated) Packages install type rather than the new Parcels type, as the Parcels method installs all of the Hadoop software on each node in a new place - /opt/cloudera - rather than the usual /usr/lib/hadoop, /usr/lib/hive and so forth, meaning that most configuration examples you’ll read point to the wrong place for your install. Parcels are Cloudera’s way forward for delivering software components (there are advantages in terms of patching across the cluster), but if you’re getting started with the Hadoop platform, installing in the old locations usually makes things easier to follow. Other than that, there are two bits of configuration you need to do. The first is to tell ODI Studio where to find the various Hadoop libraries and configuration files; because I’ve installed Studio directly on the main Hadoop node, I could just add that node’s file locations to the oracle user’s $HOME/.odi/oracledi/userlib/additional_path.txt file.
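The exact entries depend on where your CDH install has put the Hadoop and Hive client JARs and configuration directories, so treat this as a sketch rather than a definitive list; with the packages-style install described above, the file would contain something along these lines (adjust the paths for your own layout):

[oracle@bdanode1 userlib]$ cat $HOME/.odi/oracledi/userlib/additional_path.txt
/usr/lib/hadoop/*.jar
/usr/lib/hadoop/lib/*.jar
/usr/lib/hadoop/client/*.jar
/usr/lib/hadoop-mapreduce/*.jar
/usr/lib/hive/lib/*.jar
/etc/hadoop/conf
/etc/hive/conf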
The second thing to do, so that I can make use of Oracle Loader for Hadoop - one of the Oracle Big Data Connectors, and something we’ll use at the end to bulk-unload data from Hadoop to an Oracle database - is to set a couple of environment variables in the “oracle” user’s .bashrc profile file, pointing to where OLH is installed and where the Hadoop and Hive libraries and configuration files are:
[oracle@bdanode1 userlib]$ cat $HOME/.bashrc
# .bashrc

# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi

# User specific aliases and functions
export HADOOP_CLASSPATH=/home/oracle/oracle/product/oraloader-3.0.0-h2/jlib/*:/etc/hive/conf
export OLH_HOME=/home/oracle/oracle/product/oraloader-3.0.0-h2
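Once those are in place, re-source the profile (or log out and back in) so that new sessions pick the variables up, and check they resolve as expected:

[oracle@bdanode1 userlib]$ source $HOME/.bashrc
[oracle@bdanode1 userlib]$ echo $OLH_HOME
/home/oracle/oracle/product/oraloader-3.0.0-h2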
The scenario I’ll be working with is similar to the ones I covered on the blog recently, where I landed data from the Rittman Mead blog webserver into HDFS files using Flume, and then processed the files using Hive, Pig, Spark and so on. In this example though, I’ll use ODI to do the data manipulation where possible, but still use Hive and so forth under the covers to do the work. The diagram below shows the data flow that I’ll be looking to set up in the example:
So in this scenario, incoming data is being landed in the Hadoop cluster by Flume, using the process outlined in this previous blog post. Flume is just a transport mechanism; it doesn’t in itself have any processing ability (making it analogous to GoldenGate), and all it does is transport log file entries from one place to another, via Flume agents on either end. All we as developers need to be aware of is (a) where the log files will be landed, and (b) that Flume will keep continuously writing to these files until the source log file gets rotated - therefore I can’t assume a log file is completely written to when I read from it.
What this means in practice is that if I want to do incremental loads, I need to consider the fact that a file I’m reading from might have more data in it later on. There are various solutions to this - principally having Flume write to HBase rather than raw HDFS files, and then reading from the HBase database, noting the last extraction point each time - but to keep things simple I’ll just do a full load each time the ETL run takes place, meaning that I don’t need to think about incremental loads throughout the system.
So the first thing I need to do is have ODI take the incoming files and load them into a Hive table. To do this I set up Topology entries for the incoming HDFS files, and here’s the first “gotcha” - to create a connection to HDFS files, you use the regular File technology, but you leave the JDBC driver type blank and put the address of the HDFS NameNode into the JDBC URL. This is of course technically invalid, and won’t allow you to either test the connection or reverse-engineer the file names in, but it’s a “hack” used by the IKM File to Hive KM to get the address of your HDFS NameNode (if you choose to source the incoming file from HDFS rather than the local filesystem).
Then, when you come to register the physical schemas to go with the new File technology data server, you’ll need to be aware that ODI just appends the final file name to the directory name when retrieving the file data - so if you want the connection to point to a directory, rather than just a file, you’ll need to set up the physical schema to be the directory “above” the one you’re interested in, and then set the file name later on to be that directory. In this example, I want the final file reference to point to hdfs://bdanode1.rittmandev.com/user/oracle/weblog_incoming_files/apache_access_combined, a whole directory (HDFS aggregates all the files in a directory if you reference just the directory in an operation) rather than just a single log file. You can see the directory and the log files it contains in the Hue screenshot below.
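If you’d rather check from the command line than through Hue, a quick listing against that path shows the Flume-written files the datastore will pick up (the file names themselves are just whatever your Flume HDFS sink has generated):

[oracle@bdanode1 ~]$ hadoop fs -ls /user/oracle/weblog_incoming_files/apache_access_combined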
I therefore set the physical schema to be hdfs://bdanode1.rittmandev.com/user/oracle/weblog_incoming_files, and the file reference in the datastore model is set to the final directory name, like this:
If it seems a bit confusing, it’ll become clearer in a moment.
Now I need to go back to the Topology navigator and create a connection through to the Hive server on the Big Data Appliance VMs - in fact, recent versions of CDH (CDH4, CDH5) swap out the old Hive Server for HiveServer2, so you’ll need to use the correct JDBC drivers (as supplied with CDH4/5) to connect to it, and also create the JDBC URL in the format jdbc:hive2://[machine name:port], as shown in the screenshot below.
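Port 10000 is the HiveServer2 default in CDH4/5, so the URL will typically look like jdbc:hive2://bdanode1.rittmandev.com:10000/default. A quick way to prove the connection details outside of ODI is to try them with beeline, the HiveServer2 command-line client - the username and password below are just placeholders, for the reasons covered in the security note that follows:

[oracle@bdanode1 ~]$ beeline -u jdbc:hive2://bdanode1.rittmandev.com:10000/default -n oracle -p welcome1 -e "show tables;"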
A quick note about security at this point: by default, most CDH4/5 clusters are set up as what’s called “unsecured”, which means that although Hive and Impala request user credentials when you connect, they don’t actually check the password against anything such as your LDAP server (unlike Cloudera Manager, say, where your username and password really are validated). You can connect these tools to LDAP or Active Directory, and typically you’d combine this with Kerberos authentication between the various components and for your Hive and Impala connections, and you’d also use Apache Sentry to provide role-based access control to the data within individual HDFS files. By default though, Hive will accept more or less anything as a password when you connect, and you may then hit issues later on when your HDFS files are owned by a different user to the one you connect as.
Where this manifests itself is when a Hive table has underlying HDFS files owned by, say, the “admin” user in CDH5 (because that’s the user you connected to Hue as to upload them), but you then connect as the “oracle” user through Hive to manipulate the Hive table contents. In practice, what I try to do is create any Hive tables (using Hue) as the same user I’m then going to connect to Hive as, which means you’ll most probably need to go into Hue and create a new user called “oracle” (if that’s what you’re running ODI as, and connecting through Hive as) before creating the Hive tables you’ll then import into the ODI topology.
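If you do end up with a table whose underlying files are owned by the wrong user, one way to fix it after the fact is to chown the table’s directory as the HDFS superuser - the warehouse path below is just an example, so substitute your own table’s location:

[oracle@bdanode1 ~]$ sudo -u hdfs hadoop fs -chown -R oracle:oracle /user/hive/warehouse/apache_access_log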
So once you’ve done all that, you can go into the Designer navigator and reverse-engineer the definition of your Hive tables into datastore models. In my case, I’ve got a bunch of tables that I’ll be using throughout the whole ETL process.
Now it’s time for the first ODI mapping, to take the raw log file directory and load it into a Hive table. As it stands though, these raw log files are rows of just a single “column” of data - the log file entry in Apache CombinedLogFormat. To make them useful to the rest of the ETL process I’ll need to parse them into the individual log file elements somehow, so I create a target Hive table that contains a column for the raw log entry, and then columns for the various log file elements:
The way I parse the log file is to use a feature within the IKM File to Hive (LOAD DATA) KM that allows me to specify a regular expression SerDe (Serializer-Deserializer) to parse each log file entry into its individual columns, like this (note that you’ll need to ensure the hive-contrib-* JAR file is available to all of your Hadoop nodes before using this SerDe).
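The KM option takes the same SerDe settings you’d use if you defined the table by hand in Hive. As a rough hand-written equivalent - the table and column names are illustrative, the JAR path assumes the packages-style install, and the regex is the standard Apache combined-log pattern from the Hive examples rather than necessarily what the KM generates - the DDL would look something like this:

[oracle@bdanode1 ~]$ hive <<'EOF'
-- make the contrib SerDe available (adjust the JAR path/version for your install)
ADD JAR /usr/lib/hive/lib/hive-contrib.jar;
-- external table over the Flume-landed directory, one STRING column per CombinedLogFormat field
CREATE EXTERNAL TABLE apache_access_raw (
  host STRING, identity STRING, username STRING, request_time STRING,
  request STRING, status STRING, size STRING, referer STRING, agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?"
)
STORED AS TEXTFILE
LOCATION '/user/oracle/weblog_incoming_files/apache_access_combined';
EOF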
In this instance, I want the KM to leave the source files in place when doing the data load (by default, Hive moves incoming source files into the /user/hive/warehouse directory area), as these files most probably haven’t finished being written to by Flume yet, so I leave the EXTERNAL_TABLE option set to true (Hive external table, not Oracle external table) and make sure FILE_IS_LOCAL is set to FALSE, so that the KM knows to use the HDFS file location hack I set up in the topology earlier. Then I just run the mapping and check that it’s worked OK.
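Beyond the green ticks in the Operator navigator, a simple sanity-check is to count the rows that have arrived in the target Hive table - the table name here is just whatever you called your target datastore:

[oracle@bdanode1 ~]$ hive -e 'SELECT COUNT(*) FROM apache_access_log;'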
and I can check from the Model pane in the Designer navigator that I’ve now got a Hive table of individually split-up log entry columns to work with, for the rest of the downstream ETL process:
So that’s the first stage done - next, I’ll be combining this Hive table with data from another one, using the IKM Hive Control Append KM.