Asynchronous Hotlog Distributed Change Data Capture and OWB “Paris”
A couple of days ago I looked at using the Transportable Modules feature in OWB "Paris" so that I could quickly replicate source data into a data warehouse. Transportable Modules use either transportable tablespaces, or data pump, to bulk move data from a remote source database into the warehouse's staging area. This is one way of getting data into a data warehouse, but the "holy grail" is finding some way of automatically picking up changes in the source database and trickle-feeding them into the warehouse. If you can do this, you can feed your warehouse almost in real-time, and you don't have to change your source application to provide this "changed data".
Most of you are probably aware that a feature to do this, called "Change Data Capture", has been in the Oracle database since version 9i. The first version of Change Data Capture placed a set of triggers on the tables in the source application, and these triggers propagated inserts, updates and deletes to your data warehouse's staging area. The only problem with this was the "invasive" nature of the approach - the tables in your source application had to be modified to add the triggers, and source database transactions couldn't commit until they'd propagated the changes over - hence its title of "Synchronous" Change Data Capture.
Change Data Capture was enhanced though in Oracle Database 10g in that it became "Asynchronous". Instead of using triggers to pick up and propagate changes, a process on the source database picked up the transactions from the redo log, and used the Streams mechanism in Oracle Database 10g to transmit the changes, asynchronously, to the target environment. By using the redo log and streams, the process was non-invasive and only had minimal impact on the source application.
When you get to Oracle Database 10g Release 2, there are in fact several variations on Asynchronous Change Data Capture. Asynchronous Autolog Change Data Capture uses the database's redo log transport mechanism to propagate changes to the target database, whilst Asynchronous Hotlog Change Data Capture reads from the redo log and uses Oracle Streams to move the data from one schema to another within the same database. The one that's of most interest to us though is Asynchronous Distributed Hotlog Change Data Capture (that's a mouthful), which mines the redo log, uses Oracle Streams, but propagates the data changes to a database remote from the source database. This method most resembles the real-life situation that you'd face when maintaining a data warehouse, where your source applications are running on 9iR2 or higher, are on a separate instance or indeed a separate server, and you want to automatically propagate changed data from these applications to your data warehouse.
So how do you set this up then, and more interestingly, how do you make it work with Oracle Warehouse Builder 10gR2 "Paris"?
Well, the reality is that with at least the first release of OWB10gR2, the setting up of Asynchronous Change Data Capture takes place mostly outside of the tool; what you can then do is use OWB "Paris" to pick up the changed data when it arrives at the target data warehouse, use the Change Data Capture PL/SQL API to request updated data (referred to as "extending the subscription window"), and then mark it as read after it's loaded (known as "purging the subscription window"). In that respect, the technique is just as applicable to OWB10gR1 and earlier, but I'll base the example on OWB "Paris" as that's the version I'm working with at the moment.
Before we start though, a quick primer on Asynchronous Distributed Hotlog Change Data Capture. Change Data Capture (I'll shorten it to CDC from now onwards) uses the concept of publishers, database users that publish captured changed data, and subscribers, database users that consume the changed data via subscriptions. CDC makes use of change tables and subscriber views. Subscribers to the changes get their own subscriber views against the change tables so that they look at a consistent set of data. Subscribers can extend and purge their subscription windows, and data can be purged from the change tables once no subscribers include it in their views anymore. CDC is controlled using calls to PL/SQL database packages that you set up yourself manually when you enable CDC, and then call from OWB transformations when you copy the data into your warehouse staging area. Probably the best guide to Asynchronous Change Data Capture is Mark Van de Wiel's "Asynchronous Change Data Capture Cookbook", which gives an excellent background to using the feature, including a step-by-step set of instructions for propagating changes from the SCOTT schema. So then, how do we go about using it?
In this example, I want to propagate changes from a 10gR1 database (10.1.0.2) to a 10gR2 (10.2.0.1) database, although my source database could be 9iR2 or higher. The data is in the OEDATA schema (a copy of the OE schema) and I'm looking to pick up changes to the ORDERS and ORDER_ITEMS tables, which I need to capture together as the two tables go together in an order transaction. The source database is actually the infrastructure database for Application Server 10g but for what we're doing, it could be any database.
The first thing we want to do then is set up the source and target databases to work with change data capture. The source database has to be in ARCHIVELOG mode, and both databases have to have GLOBAL_NAMES=TRUE set, and in particular the source database has to have the following parameters set:
- compatible='9.2.0.0.0' or higher
- global_names=TRUE
- job_queue_processes=2
- log_archive_dest_1='LOCATION=<full path>'
- log_archive_format='%t_%s.dbf'
- log_parallelism=1
- open_links=4
- open_links_per_instance=4
- parallel_max_servers=3
- processes=150
- undo_retention=3600
(Note that I couldn't set LOG_PARALLELISM=1 on my 10gR1 database, as this parameter only exists in 9i, so I left it as it was. I also couldn't start the database up afterwards when I set LOG_ARCHIVE_FORMAT to '%t_%s.dbf' - it complained about wanting a '%r' in there as well - so I left that one at its default value.)
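For reference, putting the source database into ARCHIVELOG mode looks something like this (a minimal sketch, run as SYSDBA; your connect strings and paths will differ):

```sql
-- Sketch: enabling ARCHIVELOG mode on the source database (run as SYSDBA)
shutdown immediate
startup mount
alter database archivelog;
alter database open;
archive log list          -- confirm "Archive Mode" is reported
```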
The destination database needs to have the following parameters set:
- compatible='10.2.0.1.0'
- global_names=TRUE
- open_links=4
- open_links_per_instance=4
- parallel_max_servers=2
- processes=150
- java_pool_size=50M
- streams_pool_size=50M
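If you want to double-check what's actually in effect on either database, a quick query against V$PARAMETER does the trick:

```sql
-- Check the CDC-related initialisation parameters currently in effect
select name, value
from   v$parameter
where  name in ('compatible', 'global_names', 'job_queue_processes',
                'parallel_max_servers', 'open_links', 'open_links_per_instance',
                'streams_pool_size', 'java_pool_size', 'undo_retention');
```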
I then logged into the source schema and did a CTAS to copy half of the ORDERS table into ORDERS_NEW and then another to copy half of ORDER_ITEMS into ORDER_ITEMS_NEW, so that I could copy them back in again to simulate data coming into the application. The final step was to then set up entries in the TNSNAMES.ORA files for the source and target schemas, to make sure that the source database could communicate with the target database, and vice versa.
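The CTAS copy-aside step, for reference, was along these lines - the ORDER_ID cut-off and the subsequent deletes are my assumption (the rows need to be out of the live tables before they can be re-inserted later without violating the primary keys):

```sql
-- Sketch: set aside roughly half the rows so they can be re-inserted later
-- (the 2400 cut-off is illustrative)
create table orders_new      as select * from orders      where order_id >= 2400;
create table order_items_new as select * from order_items where order_id >= 2400;

-- remove them from the live tables (child rows first, to respect the FK)
delete from order_items where order_id >= 2400;
delete from orders      where order_id >= 2400;
commit;
```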
[oracle@centos admin]$ cat /u01/app/oracle/product/10.2.0/db_1/network/admin/tnsnames.ora
# tnsnames.ora Network Configuration File: /u01/app/oracle/product/10.2.0/db_1/network/admin/tnsnames.ora
# Generated by Oracle configuration tools.

AS10G =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = centos)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = as10g.rittman)
)
)

ORA10G =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = centos)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = ora10g)
)
)

EXTPROC_CONNECTION_DATA =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = IPC)(KEY = EXTPROC0))
)
(CONNECT_DATA =
(SID = PLSExtProc)
(PRESENTATION = RO)
)
)

[oracle@centos admin]$ cat /u01/app/oracle/product/10.1.2.0.2/OracleAS_1/network/admin/tnsnames.ora
# tnsnames.ora Network Configuration File: /u01/app/oracle/product/10.1.2.0.2/OracleAS_1/network/admin/tnsnames.ora
# Generated by Oracle configuration tools.
ORA10G =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = centos.rittman)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = ora10g)
)
)

AS10G =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = centos.rittman)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = as10g.rittman)
)
)

EXTPROC_CONNECTION_DATA =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = centos.rittman)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = PLSExtProc)
)
)
The next step then was to create a user on the source database called CDC_SOURCE_PUB that would publish the changed data (AS10G is my source database, ORA10G is the target). This user, which has the DBA role and so should be kept secure, is then granted access to the various CDC publishing procedures.
SQL> conn sys/password@as10g as sysdba
Connected.
SQL> create user cdc_source_pub
2 identified by password
3 default tablespace users
4 temporary tablespace temp
5 quota unlimited on system
6 quota unlimited on users
7 quota unlimited on sysaux
8 /

User created.
SQL> grant create session, create table, create database link,
2 select_catalog_role, execute_catalog_role, dba to
3 cdc_source_pub
4 /

Grant succeeded.
SQL> grant execute on dbms_aqadm to cdc_source_pub;

Grant succeeded.
SQL> grant execute on dbms_capture_adm to cdc_source_pub;
Grant succeeded.
SQL> grant execute on dbms_apply_adm to cdc_source_pub;
Grant succeeded.
SQL> grant execute on dbms_propagation_adm to cdc_source_pub;
Grant succeeded.
SQL> grant execute on dbms_streams_adm to cdc_source_pub;
Grant succeeded.
SQL> begin
2 dbms_rule_adm.grant_system_privilege (
3 privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4 grantee => 'cdc_source_pub',
5 grant_option => FALSE);
6 dbms_rule_adm.grant_system_privilege (
7 privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
8 grantee => 'cdc_source_pub',
9 grant_option => FALSE);
10 END;
11 /

PL/SQL procedure successfully completed.
Once this is done, I connect to the target data warehouse and create a user called CDC_STG_PUB, the publisher on the staging side that receives this stream of changed data. Again, this user is granted the roles and privileges that allow it to use the CDC PL/SQL API.
SQL> conn sys/password@ora10g as sysdba
Connected.
SQL> create user cdc_stg_pub
2 identified by password
3 default tablespace users
4 temporary tablespace temp
5 quota unlimited on users
6 quota unlimited on system
7 quota unlimited on sysaux
8 /

User created.
SQL> grant create session, create table, create sequence,
2 select_catalog_role, execute_catalog_role, create database link, dba
3 to cdc_stg_pub
4 /

Grant succeeded.
SQL> grant execute on dbms_aqadm to cdc_stg_pub ;
Grant succeeded.
SQL> grant execute on dbms_capture_adm to cdc_stg_pub ;
Grant succeeded.
SQL> grant execute on dbms_apply_adm to cdc_stg_pub ;
Grant succeeded.
SQL> grant execute on dbms_propagation_adm to cdc_stg_pub ;
Grant succeeded.
SQL> grant execute on dbms_streams_adm to cdc_stg_pub ;
Grant succeeded.
SQL> begin
2 dbms_rule_adm.grant_system_privilege (
3 privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
4 grantee => 'cdc_stg_pub',
5 grant_option => FALSE);
6 dbms_rule_adm.grant_system_privilege (
7 privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
8 grantee => 'cdc_stg_pub',
9 grant_option => FALSE);
10 end ;
11 /

PL/SQL procedure successfully completed.
Now I connect to the source database and create a database link to connect through to the target database, then test it out, and then do the same for the target database.
SQL> conn cdc_source_pub/password@as10g
Connected.
SQL> create database link ora10g
2 connect to cdc_stg_pub
3 identified by password
4 using 'ora10g'
5 /

Database link created.
SQL> select * from dual@ora10g;

D
-
X
SQL> conn cdc_stg_pub/password@ora10g
Connected.
SQL> create database link as10g
2 connect to cdc_source_pub
3 identified by password
4 using 'as10g'
5 /

Database link created.
SQL> select * from dual@as10g;

D
-
X
Note that because you've got GLOBAL_NAMES=TRUE set, you need to make sure the name of each database link exactly matches the global name of the database it points to. In my case I had to run an ALTER DATABASE RENAME GLOBAL_NAME TO ORA10G.RITTMAN (RITTMAN being my domain), and the same for the AS10G database, before the links would work.
SQL> conn system/password@ora10g
Connected.
SQL> select * from global_name;
GLOBAL_NAME
ORA10G.RITTMAN
SQL> conn system/password@as10g
Connected.
SQL> select * from global_name;

GLOBAL_NAME
AS10G.RITTMAN
SQL>
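For completeness, the renaming itself is a one-liner on each database (a sketch; run as a suitably-privileged user on each instance):

```sql
-- Sketch: make each database's global name match the database link names
-- On the target:
alter database rename global_name to ora10g.rittman;
-- On the source:
alter database rename global_name to as10g.rittman;
```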
At this point, you've configured the two databases, set up the publishing and subscribing user accounts, and set the permissions so that they can access the CDC PL/SQL API. The next step that needs to be carried out is to set up and enable Change Data Capture.
Asynchronous Change Data Capture works through mining the redo logs on the source database, but for this to work best you have to tell the source database to perform supplemental logging on the tables that you're tracking.
SQL> alter table orders
2 add supplemental log group
3 log_group_orders(order_id)
4 always
5 /

Table altered.
SQL> alter table order_items
2 add supplemental log group
3 log_group_order_items(order_id, line_item_id)
4 always
5 /

Table altered.
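You can confirm the log groups were created with a quick look at the DBA_LOG_GROUPS dictionary view on the source database:

```sql
-- Verify the supplemental log groups on the source tables
select log_group_name, table_name, always
from   dba_log_groups
where  owner = 'OEDATA';
```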
Now, we connect to the CDC publisher account on the staging database, and use the API to create the distributed hotlog change source. This step creates an associated, still-disabled Streams capture process, capture queue and queue table on the source database.
SQL> conn cdc_stg_pub/password@ora10g
Connected.
SQL> begin
2 dbms_cdc_publish.create_hotlog_change_source(
3 change_source_name => 'ord_itm_src',
4 description => 'ORDERS and ORDER_ITEMS source',
5 source_database => 'as10g.rittman') ; -- database link name
6 end;
7 /

PL/SQL procedure successfully completed.
I then check that the capture process has been set up correctly.
SQL> col capture_name for a15
SQL> col name for a15
SQL> col queue_table for a15
SQL> col status for a15
SQL> select cap.capture_name, q.name, qt.queue_table, cap.status
2 from dba_capture@as10g.rittman cap
3 , dba_queues@as10g.rittman q
4 , dba_queue_tables@as10g.rittman qt
5 where cap.queue_owner = 'CDC_SOURCE_PUB'
6 and q.owner = 'CDC_SOURCE_PUB'
7 and qt.owner = 'CDC_SOURCE_PUB'
8 and q.name = cap.queue_name
9 and qt.queue_table = q.queue_table
10 and cap.capture_name like '%ORD_ITM_SRC%' ;

CAPTURE_NAME    NAME            QUEUE_TABLE     STATUS
--------------- --------------- --------------- ---------------
CC$ORD_ITM_SRC  CQ$ORD_ITM_SRC  CT$ORD_ITM_SRC  DISABLED
Next, I create a distributed hotlog change set on the staging database. This will create an associated, still disabled, streams apply process, an apply queue and apply queue table, as well as a Streams propagation definition from the capture stream at the source database.
SQL> begin
2 dbms_cdc_publish.create_change_set(
3 change_set_name => 'ord_items_set',
4 description => 'ORDERS and ORDERITEMS change set',
5 change_source_name => 'ord_itm_src') ;
6 end ;
7 /

PL/SQL procedure successfully completed.
I then verify that it was set up correctly.
SQL> select set_name, set_description, change_source_name
2 , apply_name, queue_name, queue_table_name
3 from change_sets
4 where publisher = 'CDC_STG_PUB'
5 and set_name = 'ORD_ITEMS_SET' ;

SET_NAME           ORD_ITEMS_SET
SET_DESCRIPTION    ORDERS and ORDERITEMS change set
CHANGE_SOURCE_NAME ORD_ITM_SRC
APPLY_NAME         CDC$A_ORD_ITEMS_SET
QUEUE_NAME         CDC$Q_ORD_ITEMS_SET
QUEUE_TABLE_NAME   CDC$T_ORD_ITEMS_SET
... and then check that the underlying Streams definition was created as part of the CDC API call.

SQL> select app.apply_name, q.name, app.status, qt.queue_table
2 from dba_apply app
3 , dba_queues q
4 , dba_queue_tables qt
5 where app.apply_user = 'CDC_STG_PUB'
6 and q.owner = 'CDC_STG_PUB'
7 and qt.owner = 'CDC_STG_PUB'
8 and q.name = app.queue_name
9 and qt.queue_table = q.queue_table
10 and app.apply_name like '%ORD_ITEMS%' ;

APPLY_NAME          NAME                STATUS   QUEUE_TABLE
------------------- ------------------- -------- -------------------
CDC$A_ORD_ITEMS_SET CDC$Q_ORD_ITEMS_SET DISABLED CDC$T_ORD_ITEMS_SET

This shows the disabled Streams apply process with its details. I then check the Streams propagation definition on the source AS10G database, by querying the source data dictionary over my database link.
SQL> select p.propagation_source_name, p.propagation_name
2 , p.staging_database, p.destination_queue, ps.change_set_name
3 from change_propagations p
4 , change_propagation_sets ps
5 where p.destination_queue_publisher = 'CDC_STG_PUB'
6 and ps.change_set_publisher = 'CDC_STG_PUB'
7 and ps.propagation_source_name = p.propagation_source_name
8 and ps.propagation_name = p.propagation_name
9 and ps.staging_database = p.staging_database
10 and p.propagation_source_name = 'ORD_ITM_SRC' ;

PROPAGATION_SOURCE_NAME ORD_ITM_SRC
PROPAGATION_NAME        CP$ORD_ITEMS_SET
STAGING_DATABASE        ORA10G.RITTMAN
DESTINATION_QUEUE       CDC$Q_ORD_ITEMS_SET
CHANGE_SET_NAME         ORD_ITEMS_SET

SQL> select propagation_name, source_queue_owner, source_queue_name
2 , destination_queue_owner, destination_queue_name
3 , destination_dblink
4 from dba_propagation@as10g.rittman
5 where destination_queue_owner = 'CDC_STG_PUB'
6 and propagation_name like '%ORD_ITEMS%' ;

PROPAGATION_NAME        CP$ORD_ITEMS_SET
SOURCE_QUEUE_OWNER      CDC_SOURCE_PUB
SOURCE_QUEUE_NAME       CQ$ORD_ITM_SRC
DESTINATION_QUEUE_OWNER CDC_STG_PUB
DESTINATION_QUEUE_NAME  CDC$Q_ORD_ITEMS_SET
DESTINATION_DBLINK      ORA10G.RITTMAN
The result shows the propagation from a database queue owned by CDC_SOURCE_PUB to another queue owned by user CDC_STG_PUB on the staging database.
Still with me? Good.
Now, still logged in as the CDC_STG_PUB user on the target database, I create the change capture tables, and grant SELECT on them to the schema that will subscribe to the changes, STAGING.
SQL> begin
2 dbms_cdc_publish.create_change_table(
3 owner => 'cdc_stg_pub',
4 change_table_name => 'orders_ct',
5 change_set_name => 'ord_items_set',
6 source_schema => 'oedata',
7 source_table => 'orders',
8 column_type_list => 'order_id number(12), order_date timestamp(6) with local time zone, order_mode varchar2(24), customer_id number(6), order_status number(2), order_total number(8,2), sales_rep_id number(6), promotion_id number(6)',
9 capture_values => 'both',
10 rs_id => 'y',
11 row_id => 'n',
12 user_id => 'n',
13 timestamp => 'y',
14 object_id => 'n',
15 source_colmap => 'n',
16 target_colmap => 'y',
17 options_string => null) ;
18 end ;
19 /

PL/SQL procedure successfully completed.
SQL> grant select on orders_ct to staging ;
Grant succeeded.
SQL>
SQL> begin
2 dbms_cdc_publish.create_change_table(
3 owner => 'cdc_stg_pub',
4 change_table_name => 'order_items_ct',
5 change_set_name => 'ord_items_set',
6 source_schema => 'oedata',
7 source_table => 'order_items',
8 column_type_list => 'order_id number(12), line_item_id number(3), product_id number(6), unit_price number(8,2), quantity number(8)',
9 capture_values => 'both',
10 rs_id => 'y',
11 row_id => 'n',
12 user_id => 'n',
13 timestamp => 'y',
14 object_id => 'n',
15 source_colmap => 'n',
16 target_colmap => 'y',
17 options_string => null) ;
18 end ;
19 /

PL/SQL procedure successfully completed.
SQL> grant select on order_items_ct to staging;
Grant succeeded.
Then, I check the change table definitions...
SQL> col change_table_name for a20
SQL> col change_set_name for a20
SQL> col source_schema_name for a20
SQL> col source_table_name for a20
SQL> set linesize 200
SQL> select change_table_name, change_set_name
2 , source_schema_name, source_table_name
3 from change_tables
4 where change_table_schema = 'CDC_STG_PUB'
5 and change_set_name = 'ORD_ITEMS_SET'
6 order by change_table_name ;

CHANGE_TABLE_NAME CHANGE_SET_NAME SOURCE_SCHEMA_NAME SOURCE_TABLE_NAME
ORDERS_CT ORD_ITEMS_SET OEDATA ORDERS
ORDER_ITEMS_CT ORD_ITEMS_SET OEDATA ORDER_ITEMS
... and then verify the apply rules definition on the target database.
SQL> col streams_name for a21
SQL> col table_owner for a20
SQL> col table_name for a20
SQL> col source_database for a20
SQL> select streams_name, streams_type, table_owner, table_name
2 , rule_type, source_database
3 from dba_streams_table_rules
4 where rule_owner = 'CDC_STG_PUB'
5 and table_owner = 'OEDATA'
6 order by table_name, rule_type, streams_type ;

STREAMS_NAME STREAMS_TYP TABLE_OWNER TABLE_NAME RULE_TY SOURCE_DATABASE
CDC$A_ORD_ITEMS_SET APPLY OEDATA ORDERS DDL AS10G.RITTMAN
CDC$A_ORD_ITEMS_SET APPLY OEDATA ORDERS DML AS10G.RITTMAN
CDC$A_ORD_ITEMS_SET APPLY OEDATA ORDER_ITEMS DDL AS10G.RITTMAN
CDC$A_ORD_ITEMS_SET APPLY OEDATA ORDER_ITEMS DML AS10G.RITTMAN
The results of this query show four apply rules: a DML rule and a DDL rule for each of the two tables.
I then verify the capture and propagation rules definitions on the source database, by running the following cross database link statement on the target database.
SQL> select streams_name, streams_type, table_owner
2 , table_name, rule_type, source_database
3 from dba_streams_table_rules@as10g.rittman
4 where rule_owner = 'CDC_SOURCE_PUB'
5 and table_owner = 'OEDATA'
6 order by table_name, rule_type, streams_type ;

STREAMS_NAME STREAMS_TYP TABLE_OWNER TABLE_NAME RULE_TY SOURCE_DATABASE
CC$ORD_ITM_SRC CAPTURE OEDATA ORDERS DDL AS10G.RITTMAN
CP$ORD_ITEMS_SET PROPAGATION OEDATA ORDERS DDL AS10G.RITTMAN
CC$ORD_ITM_SRC CAPTURE OEDATA ORDERS DML AS10G.RITTMAN
CP$ORD_ITEMS_SET PROPAGATION OEDATA ORDERS DML AS10G.RITTMAN
CC$ORD_ITM_SRC CAPTURE OEDATA ORDER_ITEMS DDL AS10G.RITTMAN
CP$ORD_ITEMS_SET PROPAGATION OEDATA ORDER_ITEMS DDL AS10G.RITTMAN
CC$ORD_ITM_SRC CAPTURE OEDATA ORDER_ITEMS DML AS10G.RITTMAN
CP$ORD_ITEMS_SET PROPAGATION OEDATA ORDER_ITEMS DML AS10G.RITTMAN

8 rows selected.
What I can see here is a CAPTURE and a PROPAGATION rule for each type of change (DML and DDL) on each table I'm tracking, a total of eight rules.
Now that the publishing and capture processes are set up, but still disabled, it's time to enable CDC. First of all, activate the change set on the target database...
SQL> begin
2 dbms_cdc_publish.alter_change_set(
3 change_set_name => 'ord_items_set',
4 enable_capture => 'Y') ;
5 end ;
6 /

PL/SQL procedure successfully completed.
SQL> select apply_name, status
2 from dba_apply
3 where apply_user = 'CDC_STG_PUB'
4 and apply_name like '%ORD_ITEMS%' ;

APPLY_NAME STATUS
CDC$A_ORD_ITEMS_SET ENABLED
...and then activate the change source on the source database.
SQL> begin
2 dbms_cdc_publish.alter_hotlog_change_source(
3 change_source_name => 'ord_itm_src',
4 enable_source => 'Y') ;
5 end;
6 /

PL/SQL procedure successfully completed.
SQL> select capture_name, status
2 from dba_capture@as10g.rittman
3 where queue_owner = 'CDC_SOURCE_PUB'
4 and capture_name like '%ORD_ITM%';

CAPTURE_NAME STATUS
CC$ORD_ITM_SRC ENABLED
Change Data Capture on the source database, for the ORDERS and ORDER_ITEMS tables in the OEDATA schema, is active, and any DML or DDL changes made to these tables will propagate across to our target database.
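From this point on, a quick health-check of the pipeline is just a couple of dictionary queries - something like this, run from the staging database:

```sql
-- Sketch: check the capture (source) and apply (staging) processes are running
select capture_name, status, captured_scn, applied_scn
from   dba_capture@as10g.rittman
where  queue_owner = 'CDC_SOURCE_PUB';

select apply_name, status
from   dba_apply
where  apply_user = 'CDC_STG_PUB';
```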
The next step is to create a subscription to these changes for our STAGING user, which is the staging area for the data warehouse...
SQL> conn staging/password@ora10g
Connected.
SQL> begin
2 dbms_cdc_subscribe.create_subscription(
3 change_set_name => 'ord_items_set',
4 description => 'ORDERS and ORDER ITEMS change subscription',
5 subscription_name => 'ord_itm_sub1');
6 end;
7 /

PL/SQL procedure successfully completed.
SQL> begin
2 dbms_cdc_subscribe.subscribe(
3 subscription_name => 'ord_itm_sub1',
4 source_schema => 'oedata',
5 source_table => 'order_items',
6 column_list => 'order_id, line_item_id, unit_price, quantity',
7 subscriber_view => 'order_items_chg_view') ;
8 end ;
9 /

PL/SQL procedure successfully completed.
SQL> begin
2 dbms_cdc_subscribe.subscribe(
3 subscription_name => 'ord_itm_sub1',
4 source_schema => 'oedata',
5 source_table => 'orders',
6 column_list => 'order_id, order_date, order_mode, customer_id, order_status, order_total, sales_rep_id, promotion_id',
7 subscriber_view => 'order_chg_view') ;
8 end ;
9 /

PL/SQL procedure successfully completed.
SQL> begin
2 dbms_cdc_subscribe.activate_subscription(
3 subscription_name => 'ord_itm_sub1') ;
4 end ;
5 /

PL/SQL procedure successfully completed.
... and then verify this subscription.
SQL> set linesize 200
SQL> col description for a30
SQL> col subscription_name for a20
SQL> col set_name for a10
SQL> set pagesize 100
SQL> select s.subscription_name, s.set_name, s.description
2 , st.source_schema_name, st.source_table_name, st.view_name
3 , sc.column_name
4 from user_subscriptions s
5 , user_subscribed_tables st
6 , user_subscribed_columns sc
7 where s.subscription_name = 'ORD_ITM_SUB1'
8 and st.handle = s.handle
9 and sc.handle = s.handle
10 and st.source_schema_name = sc.source_schema_name
11 and st.source_table_name = sc.source_table_name
12 order by st.source_schema_name, st.source_table_name
13 , sc.column_name ;

SUBSCRIPTION_NAME SET_NAME      DESCRIPTION                                SOURCE_SCHEMA_NAME SOURCE_TABLE_NAME
ORD_ITM_SUB1      ORD_ITEMS_SET ORDERS and ORDER ITEMS change subscription OEDATA             ORDERS
(... one row per subscribed column: eight rows for ORDERS, four for ORDER_ITEMS ...)
ORD_ITM_SUB1      ORD_ITEMS_SET ORDERS and ORDER ITEMS change subscription OEDATA             ORDER_ITEMS

12 rows selected.
Everything should now be set up. To test it out, I connect to the source schema and insert some rows into my tables.
SQL> conn oedata/password@as10g
Connected.
SQL> select min(order_id), max(order_id)
2 from orders_new
3 /

MIN(ORDER_ID) MAX(ORDER_ID)
------------- -------------
2400          2458
SQL> insert into orders
2 select *
3 from orders_new
4 where order_id between 2400 and 2410
5 /

11 rows created.
SQL> insert into order_items
2 select *
3 from order_items_new
4 where order_id between 2400 and 2410
5 /

29 rows created.
SQL> commit;
Commit complete.
Now, if I connect to the CDC_STG_PUB user on the target database, I can see the DML changes that have propagated across (the "I" in the "OP" column stands for "INSERT").
SQL> conn cdc_stg_pub/password@ora10g
Connected.
SQL> set linesize 300
SQL> col order_date for a31
SQL> select operation$ operation
2 , to_char(timestamp$, 'dd-mon-yyyy hh24:mi:ss')
3 , order_id
4 , order_date
5 , order_mode
6 , order_status
7 from orders_ct
8 order by timestamp$
9 /

OP TO_CHAR(TIMESTAMP$,' ORDER_ID ORDER_DATE ORDER_MODE ORDER_STATUS
I 13-apr-2006 19:28:25 2400 10-JUL-99 01.34.29.559387 AM direct 2
I 13-apr-2006 19:28:25 2401 10-JUL-99 02.22.53.554822 AM direct 3
I 13-apr-2006 19:28:25 2402 02-JUL-99 03.34.44.665170 AM direct 8
I 13-apr-2006 19:28:25 2403 01-JUL-99 04.49.13.615512 PM direct 0
I 13-apr-2006 19:28:25 2404 01-JUL-99 04.49.13.664085 PM direct 6
I 13-apr-2006 19:28:25 2410 24-MAY-00 10.19.51.985501 AM direct 6
I 13-apr-2006 19:28:25 2406 29-JUN-99 04.41.20.098765 AM direct 8
I 13-apr-2006 19:28:25 2407 29-JUN-99 07.03.21.526005 AM direct 9
I 13-apr-2006 19:28:25 2408 29-JUN-99 08.59.31.333617 AM direct 1
I 13-apr-2006 19:28:25 2409 29-JUN-99 09.53.41.984501 AM direct 2
I 13-apr-2006 19:28:25 2405 01-JUL-99 04.49.13.678123 PM direct 5

11 rows selected.
SQL> select operation$ operation
2 , to_char(timestamp$, 'dd-mon-yyyy hh24:mi:ss')
3 , order_id
4 , line_item_id
5 , product_id
6 , unit_price
7 , quantity
8 from order_items_ct
9 order by timestamp$
10 /

OP TO_CHAR(TIMESTAMP$,' ORDER_ID LINE_ITEM_ID PRODUCT_ID UNIT_PRICE QUANTITY
I 13-apr-2006 19:57:48 2400 4 2999 880 16
I 13-apr-2006 19:57:48 2410 4 2995 68 8
I 13-apr-2006 19:57:48 2407 2 2752 86 18
I 13-apr-2006 19:57:48 2406 3 2761 26 19
I 13-apr-2006 19:57:48 2407 3 2761 26 21
I 13-apr-2006 19:57:48 2410 5 3003 2866.6 15
I 13-apr-2006 19:57:48 2410 6 3051 12 21
I 13-apr-2006 19:57:48 2404 2 2808 0 37
I 13-apr-2006 19:57:48 2406 4 2782 62 31
I 13-apr-2006 19:57:48 2400 1 2976 52 4
I 13-apr-2006 19:57:48 2401 1 2492 41 4
I 13-apr-2006 19:57:48 2402 1 2536 75 8
I 13-apr-2006 19:57:48 2403 1 2522 44 5
I 13-apr-2006 19:57:48 2404 1 2721 85 6
I 13-apr-2006 19:57:48 2405 1 2638 137 9
I 13-apr-2006 19:57:48 2406 1 2721 85 5
I 13-apr-2006 19:57:48 2407 1 2721 85 5
I 13-apr-2006 19:57:48 2408 1 2751 61 3
I 13-apr-2006 19:57:48 2409 1 2810 6 8
I 13-apr-2006 19:57:48 2410 1 2976 46 10
I 13-apr-2006 19:57:48 2400 2 2982 41 1
I 13-apr-2006 19:57:48 2401 2 2496 268.4 3
I 13-apr-2006 19:57:48 2406 2 2725 3.3 4
I 13-apr-2006 19:57:48 2408 2 2761 26 1
I 13-apr-2006 19:57:48 2400 3 2986 123 4
I 13-apr-2006 19:57:48 2410 2 2982 40 5
I 13-apr-2006 19:57:48 2410 3 2986 120 6
I 13-apr-2006 19:57:48 2408 3 2783 10 10
I 13-apr-2006 19:57:48 2400 5 3003 2866.6 19

29 rows selected.
Then I connect as the STAGING user and check the ORDER_CHG_VIEW, which is empty as I haven't called the procedure to extend the subscription window. Then I extend the window, check the view again, and my changed data is in the view. Finally, I purge the window, check the view again, and this time it's empty. Jackpot.
SQL> conn staging/password@ora10g
Connected.
SQL> select * from order_chg_view;
no rows selected
SQL> begin
2 dbms_cdc_subscribe.extend_window(
3 subscription_name => 'ord_itm_sub1');
4 end;
5 /

PL/SQL procedure successfully completed.
SQL> select operation$ operation
2 , to_char(timestamp$, 'dd-mon-yyyy hh24:mi:ss')
3 , order_id
4 , order_mode
5 , order_status
6 from order_chg_view
7 order by timestamp$
8 /

OP TO_CHAR(TIMESTAMP$,' ORDER_ID ORDER_MODE ORDER_STATUS
I 13-apr-2006 19:28:25 2400 direct 2
I 13-apr-2006 19:28:25 2401 direct 3
I 13-apr-2006 19:28:25 2402 direct 8
I 13-apr-2006 19:28:25 2403 direct 0
I 13-apr-2006 19:28:25 2404 direct 6
I 13-apr-2006 19:28:25 2410 direct 6
I 13-apr-2006 19:28:25 2406 direct 8
I 13-apr-2006 19:28:25 2407 direct 9
I 13-apr-2006 19:28:25 2408 direct 1
I 13-apr-2006 19:28:25 2409 direct 2
I 13-apr-2006 19:28:25 2405 direct 5

11 rows selected.
SQL> begin
2 dbms_cdc_subscribe.purge_window(
3 subscription_name => 'ord_itm_sub1');
4 end;
5 /

PL/SQL procedure successfully completed.
SQL> select operation$ operation
2 , to_char(timestamp$, 'dd-mon-yyyy hh24:mi:ss')
3 , order_id
4 , order_mode
5 , order_status
6 from order_chg_view
7 order by timestamp$
8 /

no rows selected
SQL>
So, that's Asynchronous Hotlog Change Data Capture working with my two databases. But how do I make it work with Oracle Warehouse Builder "Paris", or indeed any version of OWB?
It's fairly straightforward actually, now that the hard work has been done outside of the tool. The way we'll integrate it with OWB is first to import the two subscriber views (ORDER_CHG_VIEW and ORDER_ITEMS_CHG_VIEW) into OWB, and then create two procedures, one to extend the subscription window and one to purge it. Then we'll create a mapping, bring the two source views into the mapping, join them and use them to load our warehouse staging area, and add pre- and post-mapping processes to the mapping to call the window extend and purge procedures.
Starting off with the views then, we import them into our STAGING module, along with the table we'll load the data into.
Now we'll define the window extend procedure...
... and the window purge procedure.
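The two procedures are just thin wrappers around the DBMS_CDC_SUBSCRIBE calls we used earlier; mine look something like this (the procedure names are my own):

```sql
-- Sketch: wrapper transformations for OWB to call as pre- and post-mapping
-- processes (procedure names are illustrative)
create or replace procedure extend_cdc_window as
begin
  dbms_cdc_subscribe.extend_window(
    subscription_name => 'ord_itm_sub1');
end extend_cdc_window;
/

create or replace procedure purge_cdc_window as
begin
  dbms_cdc_subscribe.purge_window(
    subscription_name => 'ord_itm_sub1');
end purge_cdc_window;
/
```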
Once that's done, we can then define the mapping, adding the pre- and post-mapping procedures to call our CDC transformations.
To give it some data to work with, I connect to the source schema and insert some data.
SQL> conn oedata/password@as10g
Connected.
SQL> insert into orders
2 select *
3 from orders_new
4 where order_id between 2421 and 2430
5 /

10 rows created.
SQL> insert into order_items
2 select *
3 from order_items_new
4 where order_id between 2421 and 2430
5 /

69 rows created.
SQL> commit;
Commit complete.
Then it's just a case of deploying it using the Control Center, and then running it.
Finally, to check that it's worked, we view the CUSTOMER_ORDERS table in the Data Viewer...
... and run a double-check using SQL*Plus.
SQL> conn staging/password@ora10g
Connected.
SQL> select * from customer_orders;

ORDER_ID ORDER_DATE ORDER_MO CUSTOMER_ID ORDER_STATUS ORDER_TOTAL SALES_REP_ID PROMOTION_ID
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2426 17-NOV-98 02.22.11.262552 AM direct 148 6 7200
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2426 17-NOV-98 02.22.11.262552 AM direct 148 6 7200
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2430 02-OCT-99 06.18.36.663332 AM direct 101 8 29669.9 159
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2424 21-NOV-99 12.22.33.263332 PM direct 146 4 13824 153
2425 17-NOV-98 01.34.22.162552 AM direct 147 5 1500.8 163
2426 17-NOV-98 02.22.11.262552 AM direct 148 6 7200
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2430 02-OCT-99 06.18.36.663332 AM direct 101 8 29669.9 159
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2424 21-NOV-99 12.22.33.263332 PM direct 146 4 13824 153
2425 17-NOV-98 01.34.22.162552 AM direct 147 5 1500.8 163
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2430 02-OCT-99 06.18.36.663332 AM direct 101 8 29669.9 159
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2424 21-NOV-99 12.22.33.263332 PM direct 146 4 13824 153
2425 17-NOV-98 01.34.22.162552 AM direct 147 5 1500.8 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2430 02-OCT-99 06.18.36.663332 AM direct 101 8 29669.9 159
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2426 17-NOV-98 02.22.11.262552 AM direct 148 6 7200
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2430 02-OCT-99 06.18.36.663332 AM direct 101 8 29669.9 159
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2427 10-NOV-99 03.34.22.362124 AM direct 149 7 9055 163
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2426 17-NOV-98 02.22.11.262552 AM direct 148 6 7200
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2423 21-NOV-99 12.22.33.362632 PM direct 145 3 10367.7 160
2428 10-NOV-99 04.41.34.463567 AM direct 116 8 14685.8
2429 10-NOV-99 05.49.25.526321 AM direct 117 9 50125 154
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
2422 16-DEC-99 10.19.55.462332 PM direct 144 2 11188.5 153
2421 12-MAR-99 09.53.54.562432 PM direct 109 1 72836
69 rows selected.
SQL>
So there you go. Although it seems a bit daunting at first, it's really a case of working through the instructions: set up the publisher user on the source side, and the subscriber on the target. I'd imagine a future version of OWB will handle the CDC setup through a wizard, but it worked first time for me this way (after a bit of fiddling around once I'd set GLOBAL_NAMES=TRUE), and once set up, it runs with little intervention.
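For reference, the subscriber-side setup follows a standard shape. A hedged sketch using the DBMS_CDC_SUBSCRIBE API (the change set name ORDERS_SET and the column list are illustrative placeholders; the subscription name and ORDER_CHG_VIEW are the ones used above):

```sql
-- Illustrative subscriber setup on the target database; the change set
-- name and column list are assumptions, not taken from the article.
BEGIN
  -- Create a subscription against the change set that was published
  -- on the staging side.
  dbms_cdc_subscribe.create_subscription(
    change_set_name   => 'ORDERS_SET',
    description       => 'Orders and order items changes',
    subscription_name => 'ord_itm_sub1');

  -- Subscribe to a change table, naming the subscriber view that OWB
  -- will later use as a source (one call per change table).
  dbms_cdc_subscribe.subscribe(
    subscription_name => 'ord_itm_sub1',
    source_schema     => 'OE',
    source_table      => 'ORDERS',
    column_list       => 'ORDER_ID, ORDER_MODE, ORDER_STATUS',
    subscriber_view   => 'ORDER_CHG_VIEW');

  -- Activate the subscription so changes start accumulating.
  dbms_cdc_subscribe.activate_subscription(
    subscription_name => 'ord_itm_sub1');
END;
/
```

From that point on, each run is just extend window, read the subscriber views, purge window, as the mapping above does.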
Next week, in the final installment of this series, I'll be looking at how OWB "Paris" handles slowly changing dimensions. In the meantime, here are a few other articles I've written on Paris: