Using HBase and Impala to Add Update and Delete Capability to Hive DW Tables, and Improve Query Response Times

One of our customers is looking to offload part of their data warehouse platform to Hadoop, extracting data out of a source system and loading it into Apache Hive tables for subsequent querying using OBIEE11g. One of the challenges that the project faces though is how to handle updates to dimensions (and in their case, fact table records) when HDFS and Hive are typically append-only filesystems; ideally writes to fact tables should only require INSERTs and filesystem appends but in this case they wanted to use an accumulating fact snapshot table, whilst the dimension tables all used SCD1-type attributes that had their values overwritten when updates to those values came through from the source system.

The obvious answer then was to use Apache HBase as part of the design, a NoSQL database that sits over HDFS but allows updates and deletes to individual rows of data rather than restricting you just to append/inserts. I covered HBase briefly on the blog a few months ago when we used it to store webserver log entries brought into Hadoop via Flume, but in this case it makes an ideal landing point for data coming into our Hadoop system as we can maintain a current-state record of the data brought into the source system updating and overwriting values if we need to. What was also interesting to me though was how well we could integrate this HBase data into our mainly SQL-style data processing; how much Java I’d have to use to work with HBase, and whether we could get OBIEE to connect to the HBase tables and query them directly (with a reasonable response time). In particular, could we use the Hive-on-HBase feature to create Hive tables over the HBase ones, and then query those efficiently using OBIEE, so that the data flow looked like this?

To test this idea out, I took the Flight Delays dataset from the OBIEE11g SampleApp & Exalytics demo data [PDF] and created four HBase tables to hold the data from them, using the BigDataLite 4.1 VM and the HBase Shell. This dataset has four tables:

  • FLIGHT_DELAYS - around 220m US flight records listing the origin airport, destination airport, carrier, year and a bunch of metrics (flights, late minutes, distance etc)
  • GEOG_ORIGIN - a list of all the airports in the US along with their city, state, name and so on
  • GEOG_DEST - a copy of the GEOG_ORIGIN table, used for filtering and aggregating on both origin and destination 
  • CARRIERS - a list of all the airlines associated with flights in the FLIGHT_DELAYS table

HBase is a NoSQL, key/value-store database where individual rows have a key, and then one or more column families made up of one or more columns. When you define a HBase table you only define the column families, and the data load itself creates the columns within them in a similar way to how the Endeca Server holds “jagged” data - individual rows might have different columns to each other and like MongoDB you can define a new column just by loading it into the database.

Using the HBase Shell CLI on the BigDataLite VM I therefore create the HBase tables using just these high-level column family definitions, with the individual columns within the column families to be defined later when I load data into them.

hbase shell
 
create 'carriers','details'
create 'geog_origin','origin'
create 'geog_dest','dest'
create 'flight_delays','dims','measures'

To get data into HBase tables there’s a variety of methods you can use. Most probably for the full project we’ll write a Java application that uses the HBase client to read, write, update and delete rows that are read in from the source application (see this previous blog post for an example where we use Flume as the source), or to set up some example data we can use the HBase Shell and enter the HBase row/cell values directly, like this for the geog_dest table:

put 'geog_dest','LAX','dest:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_dest','LAX','dest:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_dest','LAX','dest:city','Los Angeles, CA'
put 'geog_dest','LAX','dest:state','California'
put 'geog_dest','LAX','dest:id','12892'

and you can then use the “scan” command from the HBase shell to see those values stored in HBase’s key/value store, keyed on LAX as the key.

hbase(main):015:0> scan 'geog_dest'
ROW                                    COLUMN+CELL                                                                                                     
 LAX                                   column=dest:airport_name, timestamp=1432067861347, value=Los Angeles, CA: Los Angeles                           
 LAX                                   column=dest:city, timestamp=1432067861375, value=Los Angeles, CA                                                
 LAX                                   column=dest:id, timestamp=1432067862018, value=12892                                                            
 LAX                                   column=dest:state, timestamp=1432067861404, value=California                                                    
1 row(s) in 0.0240 seconds

For testing purposes though we need a large volume of rows and entering them all in by-hand isn’t practical, so this is where we start to use the Hive integration that now comes with HBase. For the BigDataLite 4.1 VM all you need to do to get this working is install the hive-hbase package using yum (after first installing the Cloudera CDH5 repo into /etc/yum.repos.d), load the relevant JAR files when starting your Hive shell session, and then create a Hive table over the HBase table mapping Hive columns to the relevant HBase ones, like this:

hive
 
ADD JAR /usr/lib/hive/lib/zookeeper.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler.jar;
ADD JAR /usr/lib/hive/lib/guava-11.0.2.jar;
ADD JAR /usr/lib/hive/lib/hbase-client.jar;
ADD JAR /usr/lib/hive/lib/hbase-common.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop2-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-protocol.jar;
ADD JAR /usr/lib/hive/lib/hbase-server.jar;
ADD JAR /usr/lib/hive/lib/htrace-core.jar;
 
CREATE EXTERNAL TABLE hbase_carriers
 (key string,
  carrier_desc string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,details:carrier_desc")
TBLPROPERTIES ("hbase.table.name" = "carriers");
 
CREATE EXTERNAL TABLE hbase_geog_origin
 (key string,
  origin_airport_name string,
  origin_city string,
  origin_state string,
  origin_id string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,origin:airport_name,origin:city,origin:state,origin:id")
TBLPROPERTIES ("hbase.table.name" = "geog_origin");
 
CREATE EXTERNAL TABLE hbase_geog_dest
 (key string,
  dest_airport_name string,
  dest_city string,
  dest_state string,
  dest_id string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,dest:airport_name,dest:city,dest:state,dest:id")
TBLPROPERTIES ("hbase.table.name" = "geog_dest");
 
CREATE EXTERNAL TABLE hbase_flight_delays
 (key string,
  year string,
  carrier string,
  orig string,
  dest string,
  flights tinyint,
  late   tinyint,
  cancelled bigint,
  distance smallint
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES 
("hbase.columns.mapping" = ":key,dims:year,dims:carrier,dims:orig,dims:dest,measures:flights,measures:late,measures:cancelled,measures:distance")
TBLPROPERTIES ("hbase.table.name" = "flight_delays");

Bulk loading data into these Hive-on-HBase tables is then just a matter of loading the source data into a regular Hive table, and then running INSERT INTO TABLE … SELECT commands to copy the regular Hive rows into the HBase tables via their Hive metadata overlays:

insert into table hbase_carriers                           
select carrier, carrier_desc from carriers;
 
insert into table hbase_geog_origin
select * from geog_origin;
 
insert into table hbase_geog_dest
select * from geog_dest;
 
insert into table hbase_flight_delays
select row_number() over (), * from flight_delays;

Note that I had to create a synthetic sequence number key for the fact table, as the source data for that table doesn’t have a unique key for each row - something fairly common for data warehouse fact table datasets. In fact storing fact table data into a HBase table is not a very good idea for a number of reasons that we’ll see in a moment, and bear-in-mind that HBase is designed for sparse datasets and low-latency inserts and row retrievals so don’t read too much into this approach yet.

So going back to the original reason for using HBase to store these tables, updating rows within them is pretty straightforward. Taking the geog_origin HBase table at the start, if we get the row for SFO at the start using a Hive query over the HBase table, it looks like this:

hive> select * from hbase_geog_origin where key = 'SFO'; 
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
SFO   San Francisco, CA: San Francisco   San Francisco, CA   California   14771
Time taken: 29.126 seconds, Fetched: 1 row(s)

To update that row and others, I can load a new data file into the Hive table using HiveQL’s LOAD DATA command, or INSERT INTO TABLE … SELECT from another Hive table containing the updates, like this:

insert into table hbase_geog_origin    
select * from origin_updates;

To check that the value has in-fact updated I can either run the same SELECT query against the Hive table over the HBase one, or drop into the HBase shell and check it there:

hbase(main):001:0> get 'geog_origin','SFO'
COLUMN                                 CELL                                                                                                           
 origin:airport_name                   timestamp=1432050681685, value=San Francisco, CA: San Francisco International                                  
 origin:city                           timestamp=1432050681685, value=San Francisco, CA                                                               
 origin:id                             timestamp=1432050681685, value=14771                                                                           
 origin:state                          timestamp=1432050681685, value=California                                                                      
4 row(s) in 0.2740 seconds

In this case the update file/Hive table changed the SFO airport name from "San Francisco” to “San Francisco International”. I can change it back again using the HBase Shell like this, if I want:

put 'geog_origin','SFO','origin:airport_name','San Francisco, CA: San Francisco'

and then checking it again using the HBase Shell’s GET command on that key value shows it’s back to the old value - HBase actually stores X number of versions of each cell with a timestamp for each version, but by default it shows you the current one:

hbase(main):003:0> get 'geog_origin','SFO'
COLUMN                                 CELL                                                                                                           
 origin:airport_name                   timestamp=1432064747843, value=San Francisco, CA: San Francisco                                                
 origin:city                           timestamp=1432050681685, value=San Francisco, CA                                                               
 origin:id                             timestamp=1432050681685, value=14771                                                                           
 origin:state                          timestamp=1432050681685, value=California                                                                      
4 row(s) in 0.0130 seconds

So, so far so good. We’ve got a way of storing data in Hive-type tables on Hadoop and a way of updating and amending records within them by using HBase as the underlying storage, but what are these tables like to query? Hive-on-HBase tables with just a handful of HBase rows return data almost immediately, for example when I create a copy of the geog_dest HBase table and put just a single row entry into it, then query it using a Hive table over it:

hive> select * from hbase_geog_dest2;
OK
LAXLos Angeles, CA: Los AngelesLos Angeles, CACalifornia12892
Time taken: 0.257 seconds, Fetched: 1 row(s)

Hive in this case even with a single row would normally take 30 seconds or more to return just that row; but when we move up to larger datasets such as the flight delays fact table itself, running a simple row count on the Hive table and then comparing that to the same query running against the Hive-on-HBase version shows a significant time-penalty for the HBase version:

hive> select sum(cast(flights as bigint)) as flight_count from flight_delays;
Total jobs = 1
Launching Job 1 out of 1
...
Total MapReduce CPU Time Spent: 7 seconds 670 msec
OK
29483653
Time taken: 37.327 seconds, Fetched: 1 row(s)

compared to the Hive-on-HBase version of the fact table:

hive> select sum(cast(flights as bigint)) as flight_count from hbase_flight_delays;
Total jobs = 1
Launching Job 1 out of 1
...
Total MapReduce CPU Time Spent: 1 minutes 19 seconds 240 msec
OK
21473738
Time taken: 99.154 seconds, Fetched: 1 row(s)

And that’s to be expected; as I said earlier, HBase is aimed at low-latency single-row operations rather than full table scan, aggregation-type queries, so it’s not unexpected that HBase performs badly here, but the response time is even worse if I try and join the HBase-stored Hive fact table to one or more of the dimension tables also stored in HBase.

In our particular customer example though these HBase tables were only going to be loaded once-a-day, so what if we copy the current version of each HBase table row into a snapshot Hive table stored in regular HDFS storage, so that our data loading process looks like this:

and then OBIEE queries the snapshot of the Hive-on-HBase table joined to the dimension table still stored in HBase, so that the query side looks like this:

Let’s try it out by taking the original Hive table I used earlier on to load the hbase_flight_delays table. and join that to one of the Hive-on-HBase dimension tables; I’ll start first by creating a baseline response time by joining that source Hive fact table to the source Hive dimension table (also used earlier to load the corresponding Hive-on-HBase table):

select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from flight_delays f 
join geog_origin o on f.orig = o.origin                                                             
and o.origin_state = 'California'                                                                       
group by o.origin_airport_name; 
...
OK
17638Arcata/Eureka, CA: Arcata
9146Bakersfield, CA: Meadows Field
125433Burbank, CA: Bob Hope
...
1653Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field
Time taken: 43.896 seconds, Fetched: 27 row(s)

So that’s just under 44 seconds to do the query entirely using regular Hive tables. So what if I swap-out the regular Hive dimension table for the Hive-on-HBase version, how does that affect the response time?

hive> select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from flight_delays f       
    > join hbase_geog_origin o on f.orig = o.key                                                        
    > and o.origin_state = 'California'                                                                 
    > group by o.origin_airport_name;
...
OK
17638Arcata/Eureka, CA: Arcata
9146Bakersfield, CA: Meadows Field
125433Burbank, CA: Bob Hope
...
1653Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field
Time taken: 51.757 seconds, Fetched: 27 row(s)

That’s interesting - even though we used the (updatable) Hive-on-HBase dimension table in the query, the response time only went up a few seconds to 51, compared to the 44 when we used just regular Hive tables. Taking it one step further though, what if we used Cloudera Impala as our query engine and copied the Hive-on-HBase fact table into a Parquet-stored Impala table, so that our inward data flow looked like this:

By using the Impala MPP engine - running on Hadoop but directly reading the underlying data files, rather than going through MapReduce as Hive does - and in-addition storing its data in column-store query-orientated Parquet storage, we can take advantage of OBIEE 11.1.1.9’s new support for Impala and potentially bring the query response time even further. Let’s go into the Impala Shell on the BigDataLite 4.1 VM, update Impala’s view of the Hive Metastore table data dictionary, and then create the corresponding Impala snapshot fact table using a CREATE TABLE … AS SELECT Impala SQL command:

[oracle@bigdatalite ~]$ impala-shell
 
[bigdatalite.localdomain:21000] > invalidate metadata;
 
[bigdatalite.localdomain:21000] > create table impala_flight_delays
                                > stored as parquet
                                > as select * from hbase_flight_delays;

Now let’s use the Impala Shell to join the Impala version of the flight delays table with data stored in Parquet files, to the Hive-on-HBase dimension table created earlier within our Hive environment:

[bigdatalite.localdomain:21000] > select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
                                > join hbase_geog_origin o on f.orig = o.key
                                > and o.origin_state = 'California'  
                                > group by o.origin_airport_name;
Query: select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
join hbase_geog_origin o on f.orig = o.key
and o.origin_state = 'California'
group by o.origin_airport_name
+--------------+------------------------------------------------------------------+
| flight_count | origin_airport_name                                              |
+--------------+------------------------------------------------------------------+
| 31907        | Fresno, CA: Fresno Yosemite International                        |
| 125433       | Burbank, CA: Bob Hope                                            |
...
| 1653         | Santa Maria, CA: Santa Maria Public/Capt. G. Allan Hancock Field |
+--------------+------------------------------------------------------------------+
Fetched 27 row(s) in 2.16s

Blimey - 2.16 seconds, compared to the best time of 44 seconds we go earlier when we just used regular Hive tables, let alone join to the dimension table stored in HBase. Let’s crank-it-up a bit and join another dimension table in, filtering on both origin and destination values:

[bigdatalite.localdomain:21000] > select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
                                > join hbase_geog_origin o on f.orig = o.key
                                > join hbase_geog_dest d on f.dest = d.key
                                > and o.origin_state = 'California'  
                                > and d.dest_state = 'New York'
                                > group by o.origin_airport_name;
Query: select sum(cast(f.flights as bigint)) as flight_count, o.origin_airport_name from impala_flight_delays f
join hbase_geog_origin o on f.orig = o.key
join hbase_geog_dest d on f.dest = d.key
and o.origin_state = 'California'
and d.dest_state = 'New York'
group by o.origin_airport_name
+--------------+-------------------------------------------------------+
| flight_count | origin_airport_name                                   |
+--------------+-------------------------------------------------------+
| 947          | Sacramento, CA: Sacramento International              |
| 3880         | San Diego, CA: San Diego International                |
| 4030         | Burbank, CA: Bob Hope                                 |
| 41909        | San Francisco, CA: San Francisco International        |
| 3489         | Oakland, CA: Metropolitan Oakland International       |
| 937          | San Jose, CA: Norman Y. Mineta San Jose International |
| 41407        | Los Angeles, CA: Los Angeles International            |
| 794          | Ontario, CA: Ontario International                    |
| 4176         | Long Beach, CA: Long Beach Airport                    |
+--------------+-------------------------------------------------------+
Fetched 9 row(s) in 1.48s

Even faster. So that’s what we’ll be going with as our initial approach for the data loading and querying; load data into HBase tables as planned at the start, taking advantage of HBase’s CRUD capabilities but bulk-loading and initially reading the data using Hive tables over the HBase ones; but then, before we make the data available for querying by OBIEE, we copy the current state of the HBase fact table into a Parquet-stored Impala table, using Impala’s ability to work with Hive tables and metadata and create joins across both Impala and Hive tables, even when one of the Hive tables uses HBase as its underlying storage.