Loading, Updating and Deleting From HBase Tables using HiveQL and Python
Earlier in the week I blogged about a customer looking to offload part of their data warehouse platform to Hadoop, extracting data from a source system and then incrementally loading it into HBase and Hive before analysing it using OBIEE11g. One of the potential complications for this project was that the fact and dimension tables weren't append-only; Hive and HDFS are generally considered write-once, read-many systems where data can be inserted or appended into a file or table, but can't then be updated or overwritten without deleting the whole file and writing it again with the updated dataset.
To get around this problem we loaded our incoming data into HBase tables; HBase is a NoSQL key/value-store database that also runs on Hadoop and HDFS but permits update and delete operations on rows as well as selects and inserts. Later on we took the main fact table stored in Hive-on-HBase and copied its contents into Impala to considerably improve the response time of queries against this table and the still-Hive-on-HBase dimension tables. But going back to the insert, update and delete operations on the HBase tables: how exactly does this work, and what's the most efficient way to do it?
Taking a step back for a moment, HBase is a NoSQL, key/value-type database where each row has a key (for example, "SFO" for San Francisco airport) and then a number of columns, grouped into column families. In the Flight Delays dataset that we used in the previous blog post, an HBase table of origin airports might have a few thousand entries, with each entry, or row, keyed on a particular airport code like this:
(Note that at the start, these key values won’t be there - they’re more for illustrative purposes)
At the time of HBase table definition, you specify one or more "column families". These are group headers for the columns you might add later, and in the case of my origin airport table I might just use the column family name "origin", so that the HBase table DDL looks like this:
create 'geog_origin','origin'
and the conceptual view of the table would look like this:
Now what's neat about NoSQL-style databases like this (and Endeca Server is the same) is that you can define individual columns just by using them. For example, I could create columns for the airport name, airport city, airport state and airport code just by using their names in a data load, prefixing those column names with the name of a previously-defined column family. Using the HBase Shell, for example, I could issue the following PUT commands to insert the first row of data into this HBase table, like this:
put 'geog_origin','SFO','origin:airport_name','San Francisco, CA: San Francisco'
put 'geog_origin','SFO','origin:city','San Francisco, CA'
put 'geog_origin','SFO','origin:state','California'
put 'geog_origin','SFO','origin:id','14771'
Now my HBase table conceptually looks like this:
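Conceptually, the 'SFO' row now holds a set of cells under the "origin" column family - roughly the following, if pictured as a Python-style dictionary of 'family:qualifier' keys to values (an illustration only, not actual output from the database):

# Illustration only: the 'SFO' row represented as 'family:qualifier' -> value cells
{'origin:airport_name': 'San Francisco, CA: San Francisco',
 'origin:city': 'San Francisco, CA',
 'origin:state': 'California',
 'origin:id': '14771'}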
If I then want to use another column under the “origin” column family for LAX, I can just do so by using it in the next set of PUT commands, like this:
put 'geog_origin','LAX','origin:airport_name','Los Angeles, CA: Los Angeles'
put 'geog_origin','LAX','origin:city','Los Angeles, CA'
put 'geog_origin','LAX','origin:state','California'
put 'geog_origin','LAX','origin:region','West Coast'
put 'geog_origin','LAX','origin:id','12892'
Each column within a column family has its values individually set, retrieved and deleted using PUT, GET and DELETE commands, and as long as you prefix the column name with one of the previously-defined column-family names and provide the key value for the row you're interested in, HBase tables are very flexible. They were designed for simple product catalog-type applications running on hundreds of sharded server nodes, for companies the likes of Amazon, Google and Facebook (see the HBase "Powered By" page for more examples of organizations using HBase).
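To give a feel for those per-cell operations from code rather than the HBase shell, here's a minimal sketch using the happybase Python library that I'll come back to properly later in this post; the connection details are an assumption based on the Big Data Lite VM I use further down, and the table is the geog_origin one created above:

import happybase

# Sketch only: talk to HBase via its Thrift gateway (assumed to be running on 'bigdatalite')
connection = happybase.Connection('bigdatalite')
origin = connection.table('geog_origin')

# GET: return all cells for the 'LAX' row as a dict of 'family:qualifier' -> value
print(origin.row('LAX'))

# PUT: set (or create) a single cell under the 'origin' column family
origin.put('LAX', {'origin:region': 'West Coast'})

# DELETE: remove just that one cell (as an example), leaving the rest of the row in place
origin.delete('LAX', columns=['origin:region'])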
But what HBase very much isn't is a relational database like Oracle, Microsoft SQL Server or even Apache Hive, the databases we're much more likely to store data warehouse-type data in. In the previous post I showed how Hive table structures can in fact be put over HBase tables, mapping HBase columns to Hive columns, and then HiveQL INSERT INTO TABLE … SELECT commands can be used to bulk-load these HBase tables with initial sets of data. So back to the original question: what's the best way to then incrementally load and refresh these HBase tables, and can I still use HiveQL for this?
In my original post, I defined Hive tables over my HBase ones using the Hive-on-HBase (yum install hive-hbase) package and its associated Hive storage handler; for example, the Hive table that provided SQL access over the flight_delays HBase table was defined like this:
ADD JAR /usr/lib/hive/lib/zookeeper.jar;
ADD JAR /usr/lib/hive/lib/hive-hbase-handler.jar;
ADD JAR /usr/lib/hive/lib/guava-11.0.2.jar;
ADD JAR /usr/lib/hive/lib/hbase-client.jar;
ADD JAR /usr/lib/hive/lib/hbase-common.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-hadoop2-compat.jar;
ADD JAR /usr/lib/hive/lib/hbase-protocol.jar;
ADD JAR /usr/lib/hive/lib/hbase-server.jar;
ADD JAR /usr/lib/hive/lib/htrace-core.jar;

CREATE EXTERNAL TABLE hbase_flight_delays
 (key string,
  year string,
  carrier string,
  orig string,
  dest string,
  flights string,
  late string,
  cancelled string,
  distance string
 )
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES
("hbase.columns.mapping" = ":key,dims:year,dims:carrier,dims:orig,dims:dest,measures:flights,measures:late,measures:cancelled,measures:distance")
TBLPROPERTIES ("hbase.table.name" = "test1_flight_delays");
The underlying HBase table is defined with a key and two column families, one for dimension columns and one for fact (measure) ones; the key is a sequence number that I added to the source dataset to give each row a unique identifier.
create 'test1_flight_delays','dims','measures'
To initially populate the table, I've created another Hive table with the initial set of source data in it, and I just insert its values into the Hive-on-HBase table, like this:
insert into table hbase_flight_delays select * from flight_delays_initial_load;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 11 seconds 870 msec
OK
Time taken: 40.301 seconds
This initial load of 200,000 rows took around 40 seconds; not bad, and certainly acceptable for this particular project. Imagine now that for every day after this we typically added another 500 or so flight records; in regular Hive this would be straightforward and we'd use the LOAD DATA or INSERT INTO TABLE … SELECT commands to add the new file data to the Hive table's underlying HDFS directories. We can do this with the Hive-on-HBase table too, with the INSERT INTO TABLE command adding the incoming data as new rows/cells in the HBase table. First, let's check the row count and min/max key values in the Hive-on-HBase table before we start, like this:
select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 14 seconds 660 msec
OK
200000   1   200000
Time taken: 53.076 seconds, Fetched: 1 row(s)
I can see that there are 200,000 rows in the HBase table, starting at key value 1 and ending at key value 200,000. The table containing the new data has key values running from 200,001 to 200,500, so let's insert that new data into the Hive-on-HBase table:
insert into table hbase_flight_delays select * from flight_delays_daily_update_500_rows;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 3 seconds 870 msec
OK
Time taken: 26.368 seconds
Not bad - 26 seconds for the 500 rows, not quite as fast as the initial load but acceptable. Let’s also check that the data went in OK:
select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 13 seconds 580 msec
OK
200500   1   200500
Time taken: 44.552 seconds, Fetched: 1 row(s)
As I’d hoped, the number of rows has increased by 500 and the maximum key value is now 200,500. But how do we apply updates to the data in the table? I’ve got another source table that this time contains 1,000 randomly-selected rows from the initial data load dataset, where I’ve set the LATE column value to ‘999’:
hive> select * from flight_delays_daily_changes_1000_rows
    > limit 5;
OK
21307    2008   WN   BDL   BWI   1   999   1   283
136461   2008   OO   ORD   TYS   0   999   1   475
107768   2008   WN   BWI   ORF   0   999   1   159
102393   2008   OO   SLC   ACV   0   999   1   635
110639   2008   WN   BOI   OAK   0   999   1   511
Time taken: 0.139 seconds, Fetched: 5 row(s)
In fact, the way you apply these updates is just to INSERT INTO TABLE … SELECT again, and the incoming values create new versions of existing rows/cells where needed. Some versions of HBase automatically keep a number of versions of each cell value (typically three); however, the version of HBase that comes with CDH5.2 and higher only keeps one version by default (you can increase this number per table, or system-wide, using the steps in the CDH5.2 release notes). Let's try this out now, first using the HBase shell to see the values and timestamps currently held for one particular key value that I know should be updated by the next dataset:
hbase(main):029:0> get 'test1_flight_delays', '102393'
COLUMN                 CELL
 dims:carrier          timestamp=1432236609421, value=OO
 dims:dest             timestamp=1432236609421, value=ACV
 dims:orig             timestamp=1432236609421, value=SLC
 dims:year             timestamp=1432236609421, value=2008
 measures:cancelled    timestamp=1432236609421, value=1
 measures:distance     timestamp=1432236609421, value=635
 measures:flights      timestamp=1432236609421, value=0
 measures:late         timestamp=1432236609421, value=0
8 row(s) in 0.0330 seconds
I’ll now use Hive to apply the updates, like this:
insert into table hbase_flight_delays select * from flight_delays_daily_changes_1000_rows;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 4 seconds 340 msec
OK
Time taken: 24.805 seconds

select count(*), min(cast(key as bigint)) as min_key, max(cast(key as bigint)) as max_key
from hbase_flight_delays;

Total jobs = 1
...
Total MapReduce CPU Time Spent: 13 seconds 430 msec
OK
200500   1   200500
Time taken: 47.379 seconds, Fetched: 1 row(s)
Notice how this third INSERT didn't create any new rows; the max key value in the follow-up query hasn't increased since the previous insert of new data. Querying one of the rows that I know was changed by this new table of data updates, I can see that the LATE column value has been changed:
select * from hbase_flight_delays where key = '102393';

Total jobs = 1
...
Total MapReduce CPU Time Spent: 3 seconds 600 msec
OK
102393   2008   OO   SLC   ACV   0   999   1   635
Let's go into the HBase shell now and take a look at the cells for that same key value:
hbase(main):030:0> get 'test1_flight_delays', '102393'
COLUMN                 CELL
 dims:carrier          timestamp=1432236723680, value=OO
 dims:dest             timestamp=1432236723680, value=ACV
 dims:orig             timestamp=1432236723680, value=SLC
 dims:year             timestamp=1432236723680, value=2008
 measures:cancelled    timestamp=1432236723680, value=1
 measures:distance     timestamp=1432236723680, value=635
 measures:flights      timestamp=1432236723680, value=0
 measures:late         timestamp=1432236723680, value=999
8 row(s) in 0.0800 seconds
Notice how the timestamp for each of the cells has now updated? If I had more than the default one version of each cell enabled, I could query the previous versions to see the old values and timestamps. So this works pretty well, and all I need to do is use HiveQL and INSERT INTO TABLE … SELECT to initially populate, append to and even update values in the table. But what if I want to update HBase more "programmatically", maybe as part of a process that reads directly from a source application (for example, Salesforce or a web service) and then writes directly into HBase without the intermediate step of landing the incoming data in a file? For this we can use the HBase Client API, for which there are libraries in many languages, the most popular being the Java API. If Java is too much, though, and you'd rather interact with HBase using a language such as Python, then as this Cloudera blog post explains you can use either a REST API interface to HBase or one based on the Thrift interface.
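On the cell-versions point above, a client going in through the Thrift interface could also read back those older cell values and timestamps, provided the table had been created to keep more than one version per cell. Here's a minimal, hypothetical sketch using the happybase library I'll introduce in a moment; the three-version 'test2_flight_delays' table is an assumption for illustration, not a table from this project:

import happybase

connection = happybase.Connection('bigdatalite')

# Hypothetical: create a table whose column families each retain up to three cell versions
connection.create_table('test2_flight_delays',
                        {'dims': dict(max_versions=3),
                         'measures': dict(max_versions=3)})

# After a couple of loads/updates, ask for up to three versions of one cell,
# returned newest-first along with their timestamps
t = connection.table('test2_flight_delays')
print(t.cells('102393', 'measures:late', versions=3, include_timestamp=True))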
In my case, my preferred way of working programmatically with HBase is to use Python and a developer library called Happybase, where I can also bring in other libraries such as ones to work with Hive, and even ones to work with OBIEE and Fusion Middleware, and do my work at a much higher level of abstraction. To show how this might work, I'm going to use Python, the HBase Client API and Happybase to programmatically read from my update Hive tables (in real life I'd probably connect directly to a web service if going down this more complicated route) and write a routine that reads rows from the Hive table and loads them into HBase.
Again I'm using the Oracle Big Data Lite 4.1 VM, which has Python 2.7.6 already installed, and to get ready to install the Happybase library I first need to install pip, the "preferred installer program" for Python. As per the pip installation instructions, first download the get-pip.py script and then install pip from the command-line:
sudo python get-pip.py
Then use pip to install Happybase:
sudo pip install happybase
Whilst you're there, you might as well install "pyhs2", another Python package that in this case lets us easily connect to Hive tables via the HiveServer2 interface found on CDH5+ and the Big Data Lite 4.1 VM:
sudo pip install pyhs2
Now I can put together a Python program such as the one below, which in this case creates a connection to a Hive table, selects all rows from it into a cursor and then PUTs these rows into the HBase table, using a batch process that sends data to HBase over the Thrift interface every 10,000 rows:
import pyhs2
import happybase

# Connect to HBase via Thrift and set up a batch that flushes every 10,000 puts
connection = happybase.Connection('bigdatalite')
flight_delays_hbase_table = connection.table('test1_flight_delays')
b = flight_delays_hbase_table.batch(batch_size=10000)

# Connect to Hive via HiveServer2 and read the source rows
with pyhs2.connect(host='bigdatalite',
                   port=10000,
                   authMechanism="PLAIN",
                   user='oracle',
                   password='welcome1',
                   database='default') as conn:
    with conn.cursor() as cur:
        # Execute query
        cur.execute("select * from flight_delays_initial_load")

        # Fetch table results and PUT each row into HBase, keyed on the first column
        for i in cur.fetch():
            b.put(str(i[0]), {'dims:year': i[1],
                              'dims:carrier': i[2],
                              'dims:orig': i[3],
                              'dims:dest': i[4],
                              'measures:flights': i[5],
                              'measures:late': i[6],
                              'measures:cancelled': i[7],
                              'measures:distance': i[8]})

# Flush any rows still queued in the batch
b.send()
which I can then run from the command-line like this:
[oracle@bigdatalite ~]$ python ./load_update_flight_delays.py
As I said, using this approach I could just as easily connect to a web service or read in data via Flume or Kafka, and I can delete rows as well as insert or update them, and add any other logic. From my testing it's not all that much faster than going via HiveQL and INSERT INTO TABLE … SELECT scripts (most probably because I'm still going into HBase indirectly, via the Thrift interface), but it does offer the possibility of direct inserts into HBase (and therefore Hive) from the source application, without the intermediate step of writing files to disk.
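For completeness, deletes work the same way through the Happybase batch interface; here's a minimal sketch along the lines of the loader above, assuming a hypothetical Hive table called flight_delays_rows_to_delete that holds the keys of the rows to remove:

import pyhs2
import happybase

connection = happybase.Connection('bigdatalite')
flight_delays_hbase_table = connection.table('test1_flight_delays')
b = flight_delays_hbase_table.batch(batch_size=10000)

# Read the keys to remove from a (hypothetical) Hive table via HiveServer2
with pyhs2.connect(host='bigdatalite',
                   port=10000,
                   authMechanism="PLAIN",
                   user='oracle',
                   password='welcome1',
                   database='default') as conn:
    with conn.cursor() as cur:
        cur.execute("select key from flight_delays_rows_to_delete")
        for i in cur.fetch():
            # Queue a delete of the whole HBase row; pass columns=[...] to drop individual cells instead
            b.delete(str(i[0]))

# Flush any deletes still queued in the batch
b.send()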
So to finish this short series, tomorrow I’ll look at how well these Hive-on-HBase tables, and the Impala table I created in the previous example, work when queried from OBIEE11g. Back tomorrow.