SQL table macros 12: compare tables or queries

The GROUP BY method for comparing table data just turned 18. To celebrate, let’s make it a macro.

My second ever blog post described a method, popularised by Tom Kyte, for comparing the contents of tables or queries. As part of my COMP_SYNC package, the COMPARE_SQL function returns a SELECT statement that uses that method. With a macro, instead of returning the statement I can just execute it and compare tables, views and / or named subqueries.

[UPDATE 2022-02-25 : there is now an optional parameter to indicate a primary or unique key.]

  • I call the two data sources “rowsets”: one is “old”, the other is “new”.
  • Optional excluded columns are excluded from the comparison and the output.
  • Optional key columns must identify all rows uniquely.
    • With key columns, output column “Z#_OP” indicates the type of change:
      • I: Insert into old (present in new, not old)
      • D: Delete from old (present in old, not new)
      • U: Updated data (as present in new)
      • O: Old data (as present in old)
    • Without key columns, the output contains only ‘D’ and ‘I’ rows, and column “Z#_CNT” shows the number of rows to be deleted or inserted.
  • The output contains the values used for the comparison, which may differ from the actual values of the data if the data types do not allow direct comparison (LOBs for example).
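
To make the comparison without key columns concrete, here is a minimal Python sketch of the GROUP BY method. It is only a model of the idea, not the macro: the function name compare_rowsets_model and the tuple-based rows are assumptions for illustration. Python treats None as equal to None, which mirrors the way GROUP BY puts NULLs in the same group.

```python
from collections import Counter

def compare_rowsets_model(old_rows, new_rows):
    """Model of the GROUP BY comparison without key columns.

    Each old row counts -1 and each new row counts +1. Rows whose net
    count is zero are present equally in both rowsets and are dropped.
    Returns a list of (op, cnt, row) tuples.
    """
    net = Counter()
    for row in old_rows:
        net[row] -= 1
    for row in new_rows:
        net[row] += 1
    result = []
    for row, cnt in net.items():
        if cnt != 0:
            op = "I" if cnt > 0 else "D"   # same as decode(sign(sum(...)), 1, 'I', 'D')
            result.append((op, abs(cnt), row))
    return result

old = [(1, "A"), (2, "B"), (2, "B"), (3, "C")]
new = [(1, "A"), (2, "B"), (4, "D")]
diff = sorted(compare_rowsets_model(old, new), key=lambda r: (r[2], r[0]))
for op, cnt, row in diff:
    print(op, cnt, row)
```

The duplicate row (2, "B") appears twice in "old" and once in "new", so one copy must be deleted: the count column carries that information.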

The code makes heavy use of my SQM_UTIL package to configure the SELECT template. Depending on the data type of each column, an expression is applied to allow comparisons. For example, LOB column content is replaced by a hash and user-defined types are replaced by JSON.

create or replace function compare_rowsets(
  p_old_table in dbms_tf.table_t,
  p_new_table in dbms_tf.table_t,
  p_key_cols in dbms_tf.columns_t default null,
  p_exclude_cols in dbms_tf.columns_t default null
) return clob sql_macro is
/*
Compares tables, views or named subqueries; one is "old", one is "new".
Optional excluded columns are excluded from the comparison and the output.

The output contains the values used for the comparison, which may differ
from the actual values of the data if the data types do not allow
direct comparison (LOBs for example).

Column "Z#_OP" indicates the type of change:
- I: Insert into old (present in new, not old)
- D: Delete from old (present in old, not new)
- U: Updated data (as present in new)
- O: Old data (as present in old)

If present, key column(s) must identify all rows uniquely.

Without key columns, the output contains only 'D' and 'I' rows,
and column "Z#_CNT" shows the number of rows to be deleted or inserted.
*/
  l_col_column_names_old long;
  l_col_comparables_old long;
  l_col_comparables_new long;
  l_col_keys long;
  l_sql clob;
begin
  sqm_util.col_column_names(p_old_table, l_col_column_names_old, p_exclude_cols);
  sqm_util.col_comparables(p_old_table, l_col_comparables_old, p_exclude_cols);
  sqm_util.col_comparables(p_new_table, l_col_comparables_new, p_exclude_cols);
  
  if p_key_cols is null then
  
    l_sql :=
'select /*+ qb_name(COMPARE) */
  decode(sign(sum(Z#_NEW_CNT)), 1, ''I'', ''D'') Z#_OP,
  abs(sum(Z#_NEW_CNT)) Z#_CNT,
  '|| l_col_column_names_old ||'
FROM (
  select /*+ qb_name(old) */
  '|| l_col_comparables_old ||'
    , -1 Z#_NEW_CNT
  from p_old_table O
  union all
  select /*+ qb_name(new) */
  '|| l_col_comparables_new ||'
    , 1 Z#_NEW_CNT
  from p_new_table N
)
group by
  '|| l_col_column_names_old ||'
having sum(Z#_NEW_CNT) != 0';

  else
    sqm_util.list_columns(p_key_cols, l_col_keys);
    l_sql :=

'select /*+ qb_name(COMPARE) */
  case count(*) over(partition by
    '|| l_col_keys ||'
  ) - Z#_NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end Z#_OP,
  '|| l_col_column_names_old ||'
FROM (
  select
    '|| l_col_column_names_old ||',
    sum(Z#_NEW_CNT) Z#_NEW_CNT
  FROM (
    select /*+ qb_name(old) */
    '|| l_col_comparables_old ||',
    -1 Z#_NEW_CNT
    from p_old_table O
    union all
    select /*+ qb_name(new) */
    '|| l_col_comparables_new ||',
    1 Z#_NEW_CNT
    from p_new_table N
  )
  group by
    '|| l_col_column_names_old ||'
  having sum(Z#_NEW_CNT) != 0
)';

  end if;
  --dbms_output.put_line(l_sql);
  return l_sql;
end compare_rowsets;
/
drop table emp2 purge;
 
create table emp2 as select * from emp;
 
update emp2 set ename = ename||' KONG' where rownum = 1;
 
insert into emp2 
select EMPNO+1000, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO
from emp2 where rownum <= 4 and job != 'PRESIDENT';
 
select * from compare_rowsets(
  emp, emp2, 
  p_exclude_cols => columns(comm)
)
order by empno, z#_op;
Z#_OP Z#_CNT EMPNO ENAME     JOB       MGR  HIREDATE            SAL  DEPTNO
D     1      7839  KING      PRESIDENT      1981-11-17 00:00:00 5000 10
I     1      7839  KING KONG PRESIDENT      1981-11-17 00:00:00 5000 10
I     1      8566  JONES     MANAGER   7839 1981-04-02 00:00:00 2975 20
I     1      8698  BLAKE     MANAGER   7839 1981-05-01 00:00:00 2850 30
I     1      8782  CLARK     MANAGER   7839 1981-06-09 00:00:00 2450 10
I     1      8788  SCOTT     ANALYST   7566 1987-04-19 00:00:00 3000 20
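
When key columns are supplied, the macro derives Z#_OP from a little arithmetic: the number of changed rows per key, minus the net count of the row. The following Python sketch models that arithmetic under the same assumption the macro makes, namely that the key identifies rows uniquely in each rowset. The names classify_with_key and OP_CODES are hypothetical.

```python
from collections import Counter

# (changed rows sharing the key) - (net count of this row) -> operation
OP_CODES = {0: "I", 1: "U", 2: "D", 3: "O"}

def classify_with_key(old_rows, new_rows, key_index=0):
    """Model of the keyed comparison; rows are tuples, key is one position."""
    net = Counter()
    for row in old_rows:
        net[row] -= 1
    for row in new_rows:
        net[row] += 1
    # Keep only rows that differ: net count is -1 (old only) or +1 (new only).
    changed = [(row, cnt) for row, cnt in net.items() if cnt != 0]
    # Analogue of count(*) over(partition by key).
    rows_per_key = Counter(row[key_index] for row, _ in changed)
    return [
        (OP_CODES[rows_per_key[row[key_index]] - cnt], row)
        for row, cnt in changed
    ]

old = [(1, "A"), (2, "B"), (3, "C")]
new = [(1, "A"), (2, "X"), (4, "D")]
ops = sorted(classify_with_key(old, new), key=lambda r: (r[1][0], r[0]))
for op, row in ops:
    print(op, row)
```

A key that appears in one changed row gives 1 - 1 = 0 (insert) or 1 + 1 = 2 (delete). A key that appears in two changed rows gives 2 - 1 = 1 for the new version (update) and 2 + 1 = 3 for the old version.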
 

Actually Seeing the Differences

When comparing data between two tables, it’s one thing to query the differences and another thing to actually see them.

I have two tables EMP and EMP2. Here are the rows that are different. Quick, what columns have changed?

EMPNO TBL  ROW_CNT ENAME  JOB      MGR  SAL  COMM
7369  EMP  2       SMITH  CLERK    7902 800
7369  EMP2 2       SMITE  CLERK    7902 800
7499  EMP  2       ALLEN  SALESMAN 7698 1600 300
7499  EMP2 2       ALLEN  SALESGUY 7698 1600 300
7521  EMP  2       WARD   SALESMAN 7698 1250 500
7521  EMP2 2       WARD   SALESMAN 7788 1250 500
7654  EMP  2       MARTIN SALESMAN 7698 1250 1400
7654  EMP2 2       MARTIN SALESMAN 7698 1750 1400
7698  EMP  2       BLAKE  MANAGER  7839 2850
7698  EMP2 2       BLAKE  MANAGER  7839 2850 1000
7788  EMP2 1       SCOTT  ANALYST  7566 3000
7902  EMP  1       FORD   ANALYST  7566 3000

I thought so. Now suppose I blank out the columns that are the same in both tables?

EMPNO TBL  ROW_CNT ENAME  JOB      MGR  SAL  COMM
7369  EMP  2       SMITH
7369  EMP2 2       SMITE
7499  EMP  2              SALESMAN
7499  EMP2 2              SALESGUY
7521  EMP  2                       7698
7521  EMP2 2                       7788
7654  EMP  2                            1250
7654  EMP2 2                            1750
7698  EMP  2
7698  EMP2 2                                 1000
7788  EMP2 1       SCOTT  ANALYST  7566 3000
7902  EMP  1       FORD   ANALYST  7566 3000

That’s better. Now, how can I do that?

Comparing columns across rows

I need to compare two columns in two different rows, and I need the result of the comparison in each row. That sounds like a job for analytic functions. It would be hard to use LAG() or LEAD() because one row would need LEAD() and the other would need LAG(). I finally came up with a way to use COUNT().

For testing, I created a little table T:

TEST_ID TBL N STATUS
1       A   1 Same
1       B   1 Same
2       A   1 Different
2       B   2 Different
3       A   1 Different
3       B     Different
4       A   2 Different
4       B   1 Different
5       A   2 Same
5       B   2 Same
6       A   2 Different
6       B     Different
7       A     Different
7       B   1 Different
8       A     Different
8       B   2 Different
9       A     Same
9       B     Same

If I use COUNT(DISTINCT N) I should get either 1 (same values) or 2 (different values) and I’m done: wrong! When I count N or DISTINCT N, null values don’t count. So I thought of comparing COUNT(N) and COUNT(DISTINCT N).

select t.*,
count(distinct n) over(partition by test_id) cnt_distinct,
count(n) over(partition by test_id) cnt
from t
order by 4,5,6,1,2;
TEST_ID TBL N STATUS    CNT_DISTINCT CNT
3       A   1 Different 1            1
3       B     Different 1            1
6       A   2 Different 1            1
6       B     Different 1            1
7       A     Different 1            1
7       B   1 Different 1            1
8       A     Different 1            1
8       B   2 Different 1            1
2       A   1 Different 2            2
2       B   2 Different 2            2
4       A   2 Different 2            2
4       B   1 Different 2            2
9       A     Same      0            0
9       B     Same      0            0
1       A   1 Same      1            2
1       B   1 Same      1            2
5       A   2 Same      1            2
5       B   2 Same      1            2

It looks like the most concise test for a difference is COUNT(N) BETWEEN 1 and COUNT(DISTINCT N).

select t.*,
count(distinct n) over(partition by test_id) cnt_distinct,
count(n) over(partition by test_id) cnt,
case when count(n) over(partition by test_id)
  between 1 and count(distinct n) over(partition by test_id)
  then 'Different' else 'Same' end new_status
from t
order by 4,5,6,1,2;
TEST_ID TBL N STATUS    CNT_DISTINCT CNT NEW_STATUS
3       A   1 Different 1            1   Different
3       B     Different 1            1   Different
6       A   2 Different 1            1   Different
6       B     Different 1            1   Different
7       A     Different 1            1   Different
7       B   1 Different 1            1   Different
8       A     Different 1            1   Different
8       B   2 Different 1            1   Different
2       A   1 Different 2            2   Different
2       B   2 Different 2            2   Different
4       A   2 Different 2            2   Different
4       B   1 Different 2            2   Different
9       A     Same      0            0   Same
9       B     Same      0            0   Same
1       A   1 Same      1            2   Same
1       B   1 Same      1            2   Same
5       A   2 Same      1            2   Same
5       B   2 Same      1            2   Same
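
The same test can be checked outside the database. This is a small Python sketch, with None standing in for NULL and the hypothetical function name is_different; it models the two counts rather than calling any analytic function.

```python
def is_different(values):
    """Null-safe difference test for the values of one column in one partition.

    Models COUNT(n) BETWEEN 1 AND COUNT(DISTINCT n): NULLs (None) are not
    counted, so two NULLs give 0 and 0, which is "same".
    """
    non_null = [v for v in values if v is not None]
    cnt = len(non_null)                 # COUNT(n)
    cnt_distinct = len(set(non_null))   # COUNT(DISTINCT n)
    return 1 <= cnt <= cnt_distinct

# The nine test cases of table T: test_id -> (N in row A, N in row B)
cases = {
    1: (1, 1), 2: (1, 2), 3: (1, None),
    4: (2, 1), 5: (2, 2), 6: (2, None),
    7: (None, 1), 8: (None, 2), 9: (None, None),
}
status = {
    test_id: "Different" if is_different(pair) else "Same"
    for test_id, pair in cases.items()
}
for test_id, result in status.items():
    print(test_id, result)
```

Two equal non-null values give a count of 2 and a distinct count of 1, so the BETWEEN test fails and the pair is "Same"; every other non-empty combination passes.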

 

Finally, I apply this technique to each non-PK column in my EMP comparison and I get the desired result.

select EMPNO,
  case NEW_CNT when 1 then 'EMP2' else 'EMP' end tbl,
  ROW_CNT,
  case when count(ENAME) over(partition by EMPNO)
    between 1 and count(distinct ENAME) over(partition by EMPNO)
    then ENAME end ENAME,
  case when count(JOB) over(partition by EMPNO)
    between 1 and count(distinct JOB) over(partition by EMPNO)
    then JOB end JOB,
  case when count(MGR) over(partition by EMPNO)
    between 1 and count(distinct MGR) over(partition by EMPNO)
    then MGR end MGR,
  case when count(SAL) over(partition by EMPNO)
    between 1 and count(distinct SAL) over(partition by EMPNO)
    then SAL end SAL,
  case when count(COMM) over(partition by EMPNO)
    between 1 and count(distinct COMM) over(partition by EMPNO)
    then COMM end COMM
FROM (
  select
    EMPNO, ENAME, JOB, MGR, SAL, COMM,
    sum(NEW_CNT) NEW_CNT, count(*) over(partition by EMPNO) ROW_CNT
  FROM (
    select 
    EMPNO, ENAME, JOB, MGR, SAL, COMM,
    -1 NEW_CNT
    from EMP O
    union all
    select
    EMPNO, ENAME, JOB, MGR, SAL, COMM,
    1 NEW_CNT
    from emp2 N
  )
  group by
    EMPNO, ENAME, JOB, MGR, SAL, COMM
  having sum(NEW_CNT) != 0
)
order by 1, 2, new_cnt;
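
For readers who want to follow the blanking logic step by step, here is a rough Python model of what the query does to each non-key column. The function blank_unchanged and the dictionary rows are assumptions for illustration; only a few of the EMP rows and columns are included.

```python
from collections import defaultdict

def blank_unchanged(rows, key_col, skip_cols=()):
    """Set to None every value that is the same in all rows sharing a key,
    treating two NULLs (None) as equal."""
    by_key = defaultdict(list)
    for row in rows:
        by_key[row[key_col]].append(row)
    result = []
    for row in rows:
        peers = by_key[row[key_col]]
        shown = dict(row)
        for col in row:
            if col == key_col or col in skip_cols:
                continue
            non_null = [p[col] for p in peers if p[col] is not None]
            # Same test as COUNT(col) BETWEEN 1 AND COUNT(DISTINCT col)
            if not (1 <= len(non_null) <= len(set(non_null))):
                shown[col] = None
        result.append(shown)
    return result

diff_rows = [
    {"EMPNO": 7369, "TBL": "EMP",  "ENAME": "SMITH", "JOB": "CLERK",   "COMM": None},
    {"EMPNO": 7369, "TBL": "EMP2", "ENAME": "SMITE", "JOB": "CLERK",   "COMM": None},
    {"EMPNO": 7698, "TBL": "EMP",  "ENAME": "BLAKE", "JOB": "MANAGER", "COMM": None},
    {"EMPNO": 7698, "TBL": "EMP2", "ENAME": "BLAKE", "JOB": "MANAGER", "COMM": 1000},
    {"EMPNO": 7788, "TBL": "EMP2", "ENAME": "SCOTT", "JOB": "ANALYST", "COMM": None},
]
blanked = blank_unchanged(diff_rows, "EMPNO", skip_cols=("TBL",))
for row in blanked:
    print(row)
```

A row that exists in only one table keeps all its non-null values, because a single non-null value gives a count of 1 and a distinct count of 1.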

Conclusion

By using two COUNT() analytic functions, I can tell whether two columns in two different rows are the same or not, considering two NULLs to be “the same”. This allows me to compare rows, then to compare columns and blank out all but the true differences.

COMP_SYNC 2: exclude surrogate keys

At the recent ILOUG conference, Sabine Heimsath asked how to compare two tables where the surrogate keys do not match. Here’s how, using my revised comparison package.

Test data

drop table o purge;
create table o (
  pk number generated always as identity primary key,
  val1 number,
  val2 number
);
insert into o(val1, val2)
select level, level from dual connect by level <= 10;

drop table n purge;
create table n (
  pk number generated always as identity start with 42 primary key,
  val1 number,
  val2 number
);
insert into n(val1, val2)
select level+1, level+1 from dual connect by level <= 10;

 

Simple compare: the COMPARE_SQL function

If you exclude a column from the comparison, the SQL from this function will also exclude that column from the output. If there are duplicate rows with the same data, they are grouped together in the output, with a count of the number of rows.

select comp_sync.compare_sql('o','n',p_exclude_cols=>'pk') from dual;

select /*+ qb_name(COMPARE) */
  "VAL1", "VAL2",
  decode(sign(sum(Z##NEW_CNT)), 1, 'I', 'D') Z##OP,
  abs(sum(Z##NEW_CNT)) Z##CNT
FROM (
  select /*+ qb_name(old) */
  "VAL1", "VAL2"
    , -1 Z##NEW_CNT
  from O O
  union all
  select /*+ qb_name(new) */
  "VAL1", "VAL2"
    , 1 Z##NEW_CNT
  from n N
)
group by
  "VAL1", "VAL2"
having sum(Z##NEW_CNT) != 0
order by 1, Z##OP;
VAL1 VAL2 Z##OP Z##CNT
1 1 D 1
11 11 I 1
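
As a cross-check of this result, here is a small Python sketch that rebuilds the test data and applies the same idea: drop the excluded column, then net out the remaining values. The helper names project and simple_compare are hypothetical, and the PK values in "new" assume the identity column simply counts up from 42.

```python
from collections import Counter

def project(rows, exclude):
    """Keep only the compared columns of each row, as a hashable tuple."""
    return [tuple(v for k, v in row.items() if k not in exclude) for row in rows]

def simple_compare(old_rows, new_rows, exclude=()):
    """Net count per projected row: -1 for each old row, +1 for each new row."""
    net = Counter()
    for row in project(old_rows, exclude):
        net[row] -= 1
    for row in project(new_rows, exclude):
        net[row] += 1
    return sorted(
        (row, "I" if cnt > 0 else "D", abs(cnt))
        for row, cnt in net.items() if cnt != 0
    )

# Same shape as tables O and N: surrogate keys differ, values are shifted by one.
o = [{"PK": i, "VAL1": i, "VAL2": i} for i in range(1, 11)]
n = [{"PK": 41 + i, "VAL1": i + 1, "VAL2": i + 1} for i in range(1, 11)]

result = simple_compare(o, n, exclude=("PK",))
for row, op, cnt in result:
    print(*row, op, cnt)
```

Without the exclusion every row would differ, since no PK value is shared between the two tables.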

 

Detailed compare: the CDC_SQL function

The SQL from this function will do the comparison you want, but it will return all the involved rows and all the columns.

select comp_sync.cdc_sql('o','n',p_exclude_cols=>'pk') from dual;

select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    "PK", "VAL1", "VAL2",
    case
      when Z##NEW = 1
        and sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        )
        then 'I'
      when Z##OLD = 1
        and sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        )
        then 'D'
    end Z##OP, Z##RID
  FROM (
    select /*+ qb_name(old) */
    "PK", "VAL1", "VAL2",
    1 Z##OLD, 0 Z##NEW, rowid Z##RID
    from O O
    union all
    select /*+ qb_name(new) */
    "PK", "VAL1", "VAL2",
    0, 1, null
    from n N
  )
)
where Z##OP is not null;
PK VAL1 VAL2 Z##OP Z##RID
1 1 1 D AAAX/cAAZAAAEfGA
51 11 11 I
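
The running sums in that statement are easier to follow in procedural form. The sketch below is a Python model under stated assumptions: cdc_model is a hypothetical name, rows are dictionaries, and the position of a row within its partition plays the role of the running SUM ... ROWS UNBOUNDED PRECEDING.

```python
from collections import defaultdict

def cdc_model(old_rows, new_rows, compare_cols):
    """Flag only the surplus rows in each partition, keeping whole rows.

    Rows are partitioned by the compared columns. Within a partition, the
    i-th old row is a 'D' when i exceeds the number of new rows, and the
    i-th new row is an 'I' when i exceeds the number of old rows.
    """
    parts = defaultdict(lambda: {"old": [], "new": []})
    for row in old_rows:
        parts[tuple(row[c] for c in compare_cols)]["old"].append(row)
    for row in new_rows:
        parts[tuple(row[c] for c in compare_cols)]["new"].append(row)
    out = []
    for part in parts.values():
        n_old, n_new = len(part["old"]), len(part["new"])
        for i, row in enumerate(part["old"], start=1):
            if i > n_new:
                out.append(("D", row))
        for i, row in enumerate(part["new"], start=1):
            if i > n_old:
                out.append(("I", row))
    return out

o = [{"PK": i, "VAL1": i, "VAL2": i} for i in range(1, 11)]
n = [{"PK": 41 + i, "VAL1": i + 1, "VAL2": i + 1} for i in range(1, 11)]
changes = cdc_model(o, n, compare_cols=("VAL1", "VAL2"))
for op, row in changes:
    print(op, row)
```

Because whole rows are kept, the excluded PK column is still available in the output, which is what the synchronization step needs.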

 

SYNC_SQL: synchronizing the data

This will generate a MERGE statement that assumes you want to insert new rows into the “old” table with the same key as the “new” table. This is almost certainly not what you want, but all you have to do is adjust the INSERT part manually. In this case, the surrogate key is generated automatically so we just need to remove that column from the INSERT clause.

select comp_sync.sync_sql('o','n',p_exclude_cols=>'pk') from dual;

merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    "PK", "VAL1", "VAL2", rowid Z##RID
  from O
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    "PK", "VAL1", "VAL2",
    case
      when Z##NEW = 1
        and sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        )
        then 'I'
      when Z##OLD = 1
        and sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        )
        then 'D'
    end Z##OP, Z##RID
  FROM (
    select /*+ qb_name(old) */
    "PK", "VAL1", "VAL2",
    1 Z##OLD, 0 Z##NEW, rowid Z##RID
    from O O
    union all
    select /*+ qb_name(new) */
    "PK", "VAL1", "VAL2",
    0, 1, null
    from n N
  )
)
where Z##OP is not null
) N
on (
  O.Z##RID = n.Z##RID
)
when matched then update set
  "VAL1"=N."VAL1"
  delete where N.Z##OP = 'D'
when not matched then insert (
  --"PK", "VAL1", "VAL2"
  "VAL1", "VAL2"
) values(
  --N."PK", N."VAL1", N."VAL2"
  N."VAL1", N."VAL2"
);

2 rows merged.

COMP_SYNC 1: a new table compare/sync package

I have been meaning to update my COMPARE_SYNC package for some time. I want to change the interface and the functionality a bit, so I am leaving the existing package alone and creating a new one called COMP_SYNC.

If you have used the old package, I would greatly appreciate any feedback on the new version: functionality, performance, bugs, etc. Comment away and thanks in advance.

What COMP_SYNC does for you

The package returns CLOBs containing SQL statements for you to adjust / test / execute. It uses CDC (Change Data Capture) format, with a flag (Z##OP) on each row with ‘I’ for insert, ‘U’ for update and ‘D’ for delete.

  • COMPARE_SQL: returns SQL that compares the “new” source and the “old” target using Tom Kyte’s GROUP BY method. Excluded columns are not compared and do not appear in the output.
    • ‘D’ rows are in “old” but not in “new”.
    • ‘I’ rows are in “new” but not in “old”.
      Since there may be duplicates, Z##CNT has the number of rows involved.
  • CDC_SQL: compares an “old” table (not a view) to “new”. You can exclude columns from the comparison, but the output shows entire rows with all columns, including the ROWID of the “old” row. For every ‘U’ row there is a corresponding ‘O’ (for “old”) row with the old values.
  • SYNC_SQL: compares and syncs from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns.
  • SYNC_UPSERT_SQL: inserts and updates but no deletes. Works only when there are both key and non-key columns.
  • SYNC_CDC_SQL: directly applies changes from a CDC table such as returned by CDC_SQL.

Parameter changes

If you have already used COMPARE_SYNC, here is what changed:

  • Columns are now in comma-separated lists and not in little SYS.ODCIVARCHAR2LIST tables.
  • Table names and column names are converted to upper case unless you put them in double quotes.
  • P_EXCLUDE_COLS replaces P_ALL_COLS: if you want to exclude columns from the comparison just list them here, instead of having to list all the columns you want to include.
  • P_PREFIX replaces P_OPERATION_COL: I use a few column names in addition to the actual tables, so the prefix is now applied to all of them to avoid collisions with your names.
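
The naming rule for comma-separated lists can be sketched in a few lines of Python. This is a model of the rule as described above, with the hypothetical function name col_tokenize; it is not a translation of the package code.

```python
def col_tokenize(col_list):
    """Split a comma-separated column list and normalize each name.

    Unquoted names are upper-cased and wrapped in double quotes;
    names that already start with a double quote are kept as they are.
    """
    names = []
    for item in col_list.split(","):
        name = item.strip(" ")
        if not name.startswith('"'):
            name = '"' + name.upper() + '"'
        names.append(name)
    return names

print(col_tokenize('pk, val1,"MixedCase"'))
```

Quoting every name in the generated SQL means that a mixed-case column only needs to be quoted once, in the parameter.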

The code

[Update 2018-02-13: added source code files]

This site does not allow upload of source code, so I had to add a “.doc” suffix.

comp_sync-pks.doc : package specification, rename to comp_sync.pks

comp_sync-pkb.doc : package body, rename to comp_sync.pkb

create or replace package COMP_SYNC
authid current_user as
/*
COMP_SYNC generates SQL for comparing or synchronizing
"old" target and "new" source.
 
- "Old" can be a table or view, local or remote.
  Indicate separately the "old" owner, "old" table and "old" dblink.
  To compare two queries, create a view to use as the "old".
  To sync, "old" must be a table but I do not check that for you.
- "New" can be local, remote, table, view or a query enclosed in parentheses.
  Examples: 'SCOTT.EMP', 'T_SOURCE@DBLINK', '(select * from SCOTT.EMP@DBLINK)'
 
Note: I never check the "new" source for validity.
I only check the "old" target for validity when I look up columns from the data dictionary.
So the generated SQL is not guaranteed to run without error!
   
The generated SQL is returned as a CLOB.
 
To debug, change the value of G_DOLOG to true. See the beginning of the package body.
 
INPUT PARAMETERS:

-- Required
  
P_OLD_TABLE  : name of the target table or view. Must exist in the database.
 
P_NEW_SOURCE : source table or view - or query enclosed in parentheses.

-- Optional
 
P_OLD_OWNER  : owner of the target. Must exist in the database.
  The default is null, which assumes the current user.
 
P_EXCLUDE_COLS   : optional comma-separated list of columns to OMIT from the comparison.
  If you leave out P_EXCLUDE_COLS, every non-virtual column will be compared,
  both visible and invisible.
  If you omit a PK column, the tables are considered not to have a primary key.
 
P_KEY_COLS : optional comma-separated list of primary key columns.
  This overrides the default search for PK columns in ALL_CONS_COLUMNS.
   
P_OLD_DBLINK : dblink to the target database.
  The default is null, which means the target is in the local database.
   
P_PREFIX : prefix to the names of the columns such as the CDC flag
  ('D', 'I', 'U' or 'O' for the "old" rows being updated).
  When syncing, I delete the rows marked 'D' and ignore the rows marked 'O'.
  The default prefix is 'Z##'.
 
Pre 2018-02-01:
  See the COMPARE_SYNC package.
2018-02-01: Major overhaul
    - Parameters reordered to have most usual first
    - P_EXCLUDE_COLS (to exclude some columns) replaces P_ALL_COLS (that included columns).
    - P_OPERATION_COL is replaced by P_PREFIX that begins all column names I make up.
    - P_EXCLUDE_COLS and P_KEY_COLS are now comma-separated lists and not little tables.
    - User, table and column names are now upper cased unless within double quotes.
    - Instead of passing a huge record among internal procedures,
      I now use global variables. So sue me!
    - CDC output rows include the ROWID of the target table, which is used for efficient syncing.
*/
/*
COMPARING:
 
COMPARE_SQL returns SQL that compares new source and old target
using Tom Kyte's GROUP BY method.
Omitted columns are not compared and do not appear in the output.
'D' rows are in "old" but not in "new".
'I' rows are in "new" but not in "old".
Since there may be duplicates, Z##CNT has the number of rows involved.

Example:
  select COMP_SYNC.COMPARE_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
CDC_SQL produces CDC output: 'D', 'I', 'U' - or 'O' for the "old" rows being updated.
The output includes the ROWID of the target, except when 'I'.

Example:
  select COMP_SYNC.CDC_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
SYNCHRONIZING
 
The package can synchronize in one of three ways:
1) SYNC: Compare and sync from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns,
    but the target must be a table because I use the ROWID.
    
2) SYNC_UPSERT: sync from source to target: inserts and updates but no deletes.
    Requires a target with both primary key and non-key columns.
    It does not allow for omitting columns: the workaround is to use a view on the target.
    
3) SYNC_CDC: the source is a "Change Data Capture" table.
  It contains inserts, updates and deletes to be directly applied.
  Must contain a column ending with 'OP' containing the operation flag (I,U,D),
  and a column ending in 'RID' with the ROWID of the target row if U or D. 
*/
/*
Example:
  select COMP_SYNC.SYNC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE'
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;

/*
Example:
  select COMP_SYNC.SYNC_UPSERT_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => 'C1,C2'
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
 
/*
Example:
  select COMP_SYNC.SYNC_CDC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC',
    P_OLD_OWNER => user,
    P_KEY_COLS => 'C1,C2',
    P_PREFIX => 'OPCODE'
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
 
end COMP_SYNC;
/
create or replace package body COMP_SYNC as
 
  G_DOLOG constant BOOLEAN := false;
    C_NEWLINE constant varchar2(2) := '
';
  
  type TT_VARCHAR2 is table of VARCHAR2(255);
  
  -- set by CHECK_COMMON_INPUTS
  G_OLD_OWNER varchar2(255);
  G_OLD_TABLE varchar2(255);
  G_NEW_SOURCE varchar2(4000);
  G_OLD_DBLINK varchar2(255);
  G_OPERATION_COL varchar2(255);
  G_OLD_OWNER_TABLE varchar2(255);
  
  -- set by MAKE_REPLACEMENTS
  G_ALL_COLS TT_VARCHAR2;   -- all non-virtual columns
  G_SOME_COLS TT_VARCHAR2;  -- all non-virtual columns except those listed on P_EXCLUDE_COLS
  G_KEY_COLS TT_VARCHAR2;   -- from P_KEY_COLS, or by default the "old" primary key columns
  G_FIRST_COL TT_VARCHAR2; -- first column in G_SOME_COLS
  G_ALL_COL_CLOB clob;  
  G_SOME_COL_CLOB clob;
  G_INSERT_COL_CLOB clob;
  G_KEY_COL_CLOB clob;
  G_ON_COL_CLOB clob;
  G_SET_COL_CLOB clob;
  G_FIRST_COL_CLOB clob;
  G_DECODE_COL_CLOB clob;
 
  procedure LOGGER(P_TXT in clob, P_DOLOG in boolean default false) is
  begin
    if G_DOLOG or P_DOLOG then
      DBMS_OUTPUT.PUT_LINE('prompt > ' || P_TXT);
    end if;
  end LOGGER;
  
  /* sets all G_OLD_* parameters, G_NEW_SOURCE and G_OPERATION_COL.
     If P_OLD_OWNER is null, G_OLD_OWNER := user but G_OLD_OWNER_TABLE does not mention schema.
     OWNER, TABLE and OPERATION_COL are uppercased unless within double quotes.
     OWNER is checked for existence. OLD_TABLE is checked for existence later if necessary. */
  procedure CHECK_COMMON_INPUTS(
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_OLD_DBLINK in varchar2,
    P_NEW_SOURCE in varchar2
  ) is
    L_CNT number;
    L_SQL varchar2(255) :=
q'!select COUNT(*) from ALL_USERS#DBLINK# where USERNAME = trim('"' from '#OLD_OWNER#')!';
  begin
    LOGGER('CHECK_COMMON_INPUTS');
    
    if P_OLD_TABLE is null then 
      RAISE_APPLICATION_ERROR(
        -20001,
        'P_OLD_TABLE must not be null.'
      );
    end if;
    
    if P_OLD_DBLINK is null or SUBSTR(P_OLD_DBLINK,1,1) = '@' then
      G_OLD_DBLINK := upper(P_OLD_DBLINK);
    else
      G_OLD_DBLINK :=  '@' || upper(P_OLD_DBLINK);
    end if;
    
    if substr(P_OLD_OWNER,1,1) = '"' then
      G_OLD_OWNER := P_OLD_OWNER;
    else
      G_OLD_OWNER := upper(P_OLD_OWNER);
    end if;
    
    if substr(P_OLD_TABLE,1,1) = '"' then
      G_OLD_TABLE := P_OLD_TABLE;
    else
      G_OLD_TABLE := upper(P_OLD_TABLE);
    end if;
    
    if G_OLD_OWNER is null then
      G_OLD_OWNER_TABLE := G_OLD_TABLE || G_OLD_DBLINK;
      G_OLD_OWNER := user;
    else
      G_OLD_OWNER_TABLE := G_OLD_OWNER || '.' || G_OLD_TABLE || G_OLD_DBLINK;
    end if;
    
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    LOGGER(L_SQL);
    execute immediate L_SQL into L_CNT;
    if L_CNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20002,
        'OLD_OWNER = ' ||G_OLD_OWNER|| ': user not found in the database.'
      );
    end if;
    
    if P_NEW_SOURCE is null then
      RAISE_APPLICATION_ERROR(
        -20003,
        'P_NEW_SOURCE is null. Must be table, view or query within parentheses.'
      );
    else
      G_NEW_SOURCE := P_NEW_SOURCE;
    end if;
  
  end CHECK_COMMON_INPUTS;
  
  function COL_TOKENIZE(
    p_string in varchar2
  )
  return TT_VARCHAR2
  as
    c_delim constant varchar2(1) := ',';
    i_prev_pos pls_integer := 1;
    i_pos pls_integer;
    i_max_pos pls_integer := length(p_string) + 1;
    l_col varchar2(255);
    lt_out TT_VARCHAR2 := new TT_VARCHAR2();
    i_out pls_integer := 0;
  begin
    loop
      i_pos := instr(p_string, c_delim, i_prev_pos);
      if i_pos = 0 then
        i_pos := i_max_pos;
      end if;
      l_col := trim(substr(p_string, i_prev_pos, i_pos - i_prev_pos));
      if substr(l_col,1,1) != '"' then
        l_col := '"' || upper(l_col) || '"';
      end if;
      i_out := i_out + 1;
      lt_out.extend;
      lt_out(i_out) := l_col;
      exit when i_pos = i_max_pos;
      i_prev_pos := i_pos + 1;
    end loop;
    return lt_out;
  end COL_TOKENIZE;
 
  /*
  Format input array into CLOB with configurable maximum line length.
  Indentation is handled later using BIG_REPLACE.
  Pattern is simplified printf: each occurrence of '%s' is replaced by the array element.
  */
  function STRINGAGG(
    PT_COLS in TT_VARCHAR2,
    P_PATTERN in varchar2 default '%s',
    P_SEPARATOR in varchar2 default ',',
    P_LINEMAXLEN in number default 80
  ) return clob is
    L_CLOB clob;
    L_NEW varchar2(255);
    L_LINELEN number := 0;
  begin
    for I in 1..PT_COLS.COUNT LOOP
      L_NEW := case when I > 1 then ' ' end
        || replace(P_PATTERN, '%s', PT_COLS(I))
        || case when I < PT_COLS.COUNT then P_SEPARATOR end;
      if L_LINELEN + length(L_NEW) > P_LINEMAXLEN then
        L_CLOB := L_CLOB || C_NEWLINE;
        L_LINELEN := 0;
        L_NEW := SUBSTR(L_NEW,2);
      end if;
      L_CLOB := L_CLOB || L_NEW;
      L_LINELEN := L_LINELEN + length(L_NEW);
    end LOOP;
    return L_CLOB;
  end STRINGAGG;
  
  procedure BIG_REPLACE(
    p_clob in out nocopy clob,
    p_search in varchar2,
    p_replace in clob
  ) is
    c_replace_len constant integer := 30000;
    l_iter integer;
  begin
    if p_search is null then
      RAISE_APPLICATION_ERROR(
        -20004,
        'Internal error in BIG_REPLACE: p_search parameter is null.'
      );
    end if;
    if p_replace is null then
      logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB, true);
      logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB, true);
      logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB, true);
      logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB, true);
      logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB, true);
      logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB, true);
      logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB, true);
      logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB, true);
      RAISE_APPLICATION_ERROR(
        -20005,
        'Internal error in BIG_REPLACE: p_replace parameter is null.'
      );
    end if;
    l_iter := ceil(length(p_replace) / c_replace_len);
    --logger('length(p_replace) : '||length(p_replace));
    --logger('l_iter : '||l_iter);
    for i in 1..l_iter loop
      --logger('(i-1)*c_replace_len+1 : '||((i-1)*c_replace_len+1));
      p_clob := replace(
        p_clob, 
        p_search,
        substr(p_replace, (i-1)*c_replace_len+1, c_replace_len)
          || case when i < l_iter then p_search end
      );
    end loop;
  end BIG_REPLACE;
 
  function GET_ALL_COLS return TT_VARCHAR2 is
    l_version number;
    l_instance_sql varchar2(255) :=
q'!select to_number(regexp_substr(banner, 'Release ([^|.]+)', 1, 1, 'i', 1))
from v$version#DBLINK#
where rownum = 1!';
    L_TAB_COLS SYS.ODCIVARCHAR2LIST;
    L_ALL_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_SQL varchar2(255) :=
q'!select '"'||COLUMN_NAME||'"'
from ALL_TAB_COLS#DBLINK#
where (OWNER, TABLE_NAME, VIRTUAL_COLUMN) =
((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'NO'))
and #VERSION_DEPENDENT#
order by SEGMENT_COLUMN_ID!';
  begin
    LOGGER('GET_ALL_COLS');
    l_instance_sql := replace(l_instance_sql, '#DBLINK#', G_OLD_DBLINK);
    LOGGER(l_instance_sql);
    execute immediate l_instance_sql into l_version;
    logger('l_version = ' || l_version);
    if l_version >= 12 then
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'USER_GENERATED = ''YES''');
    else
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'HIDDEN_COLUMN = ''NO''');
    end if;
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_TAB_COLS;
    if L_TAB_COLS.COUNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20006,
        G_OLD_OWNER_TABLE || ': table not found.'
      );
    end if;
    L_ALL_COLS.extend(L_TAB_COLS.count);
    for i in 1..L_TAB_COLS.count loop
      L_ALL_COLS(i) := L_TAB_COLS(i);
    end loop;
    return L_ALL_COLS;
  end GET_ALL_COLS;
 
  function GET_KEY_COLS return TT_VARCHAR2 is
    L_KEY_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_KEY_COL_LIST SYS.ODCIVARCHAR2LIST;
    L_SQL varchar2(4000) := 
q'!select '"'||COLUMN_NAME||'"'
from ALL_CONS_COLUMNS#DBLINK#
where (OWNER, CONSTRAINT_NAME) = (
  select OWNER, CONSTRAINT_NAME from ALL_CONSTRAINTS#DBLINK#
  where (OWNER, TABLE_NAME, CONSTRAINT_TYPE) =
        ((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'P'))
)!';
  begin
    LOGGER('GET_KEY_COLS');
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_KEY_COL_LIST;
    L_KEY_COLS.extend(L_KEY_COL_LIST.count);
    for i in 1..L_KEY_COL_LIST.count loop
      L_KEY_COLS(i) := L_KEY_COL_LIST(i);
    end loop;
    return L_KEY_COLS;
  end GET_KEY_COLS;
 
  procedure MAKE_REPLACEMENTS(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2,
    P_EXCLUDE_COLS in varchar2,
    P_KEY_COLS in varchar2,
    P_OLD_DBLINK in varchar2
  ) is
    L_NON_KEY_COLS TT_VARCHAR2;
    L_EXCLUDE_COLS TT_VARCHAR2;
  begin
    LOGGER('MAKE_REPLACEMENTS');
    check_common_inputs(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_OLD_DBLINK,
      P_NEW_SOURCE
    );
    G_ALL_COLS := GET_ALL_COLS;
    if P_EXCLUDE_COLS is null then
      G_SOME_COLS := G_ALL_COLS;
    else
      L_EXCLUDE_COLS := COL_TOKENIZE(P_EXCLUDE_COLS);
      G_SOME_COLS := G_ALL_COLS multiset except L_EXCLUDE_COLS;
    end if;
    G_FIRST_COL := new TT_VARCHAR2(G_SOME_COLS(1));
    G_ALL_COL_CLOB := STRINGAGG(G_ALL_COLS);
    G_SOME_COL_CLOB := STRINGAGG(G_SOME_COLS);
    G_INSERT_COL_CLOB := STRINGAGG(G_ALL_COLS, 'N.%s');
    G_FIRST_COL_CLOB := STRINGAGG(G_FIRST_COL, '%s=N.%s');
    
    if P_KEY_COLS is null then
      G_KEY_COLS := GET_KEY_COLS;
    else
      G_KEY_COLS := COL_TOKENIZE(P_KEY_COLS);
    end if;
    
    if cardinality(G_KEY_COLS multiset intersect L_EXCLUDE_COLS) > 0 then
      G_KEY_COLS := null;
    end if;
    
    G_KEY_COL_CLOB := null;
    G_ON_COL_CLOB := null;
    G_SET_COL_CLOB := null;
    G_DECODE_COL_CLOB := null;
    if G_KEY_COLS is not null and G_KEY_COLS.COUNT > 0 then
      G_KEY_COL_CLOB := STRINGAGG(G_KEY_COLS);
      G_ON_COL_CLOB := STRINGAGG(G_KEY_COLS, 'O.%s=N.%s', ' and');
      L_NON_KEY_COLS := G_SOME_COLS multiset except G_KEY_COLS;
      if L_NON_KEY_COLS.COUNT between 1 and G_SOME_COLS.COUNT - 1 then
        G_SET_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, '%s=N.%s');
        G_DECODE_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, 'decode(O.%s,N.%s,0,1)');
      end if;
    end if;
    
    logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB);
    logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB);
    logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB);
    logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB);
    logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB);
    logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB);
    logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB);
    logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB);

  end MAKE_REPLACEMENTS;

  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'select /*+ qb_name(COMPARE) */
  #SOME_COLS#,
  decode(sign(sum(#PREFIX#NEW_CNT)), 1, ''I'', ''D'') #PREFIX#OP,
  abs(sum(#PREFIX#NEW_CNT)) #PREFIX#CNT
FROM (
  select /*+ qb_name(old) */
  #SOME_COLS#
    , -1 #PREFIX#NEW_CNT
  from #OLD# O
  union all
  select /*+ qb_name(new) */
  #SOME_COLS#
    , 1 #PREFIX#NEW_CNT
  from #NEW# N
)
group by
  #SOME_COLS#
having sum(#PREFIX#NEW_CNT) != 0
order by 1, #PREFIX#OP';
  begin
    LOGGER('COMPARE_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      null,
      P_OLD_DBLINK
    );
    L_CLOB := replace(
      C_CLOB,
      '#SOME_COLS#',
      replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end COMPARE_SQL;

  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
order by 1, #PREFIX#OP';
    C_PARTITION_CLOB constant clob :=
'select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null';
  begin
    LOGGER('CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end CDC_SQL; 
  
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'merge /*+ qb_name(SYNC_GROUP) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select * from (
select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
)
where #PREFIX#OP in(''I'',''U'',''D'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  where N.#PREFIX#OP in (''U'', ''D'')
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
    C_PARTITION_CLOB constant clob :=
'merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #FIRST_COL#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
  begin
    LOGGER('SYNC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or G_SET_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      L_CLOB := replace(L_CLOB, '#FIRST_COL#', G_FIRST_COL_CLOB);
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      L_CLOB := replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    return L_CLOB;
  end SYNC_SQL;
 
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_UPSERT) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#
  from #OLD#
) O
using (
  select /*+ qb_name(source) */
  #ALL_COLS#
  from #NEW#
) N
on (
  #ON_COLS#
)
when matched then update set
  #SET_COLS#
  where 1 in (
    #DECODE_COLS#
  )
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';

  begin
    LOGGER('SYNC_UPSERT_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_SET_COL_CLOB is null then
      RAISE_APPLICATION_ERROR(
        -20007,
        'SYNC_UPSERT_SQL requires a target with both key and non-key columns'
      );
    end if;
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#ON_COLS#',
      replace(G_ON_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#SET_COLS#',
      replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#DECODE_COLS#',
      replace(G_DECODE_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
    );
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end SYNC_UPSERT_SQL;
 
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_CDC_SQL) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
  select /*+ qb_name(source) */ #PREFIX#OP, #PREFIX#RID,
  #ALL_COLS#
  from #NEW#
  where #PREFIX#OP in(''D'', ''I'', ''U'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
 
  begin
    LOGGER('SYNC_CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    if G_SET_COL_CLOB is not null then
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
    else
      L_CLOB := replace(L_CLOB, '#SET_COLS#', G_FIRST_COL_CLOB);
    end if;
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    return L_CLOB;
  end SYNC_CDC_SQL;
   
end COMP_SYNC;
/
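
To look at a generated statement, the function can be called from an anonymous block. This is only a sketch: it assumes the package specification exposes COMPARE_SQL with the parameters shown in the body above, and that tables named T_TARGET and T_SOURCE (placeholder names) exist with the same columns.

```sql
set serveroutput on

declare
  l_sql clob;
begin
  -- T_TARGET and T_SOURCE are placeholder names for this sketch
  l_sql := comp_sync.compare_sql(
    p_old_table  => 'T_TARGET',
    p_new_source => 'T_SOURCE'
  );
  -- implicit CLOB to VARCHAR2 conversion: fine while the text stays under 32K
  dbms_output.put_line(l_sql);
end;
/
```

The same pattern should work for CDC_SQL, SYNC_SQL and the other functions, since they all return the statement as a CLOB.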

Techniques for Comparing Tables

In my “Advanced Row Pattern Matching” presentation, I demonstrate using MATCH_RECOGNIZE to compare tables. Kim Berg Hansen asked me to compare this technique with others. I did some quick tests and here are the results with some comments.

Technique            Seconds
Full join            1
Group by (HASH)      1
Group by (SORT)      1.4
Analytic function    2.5
MATCH_RECOGNIZE      3.7

 

The “Full join” technique only works when we have a primary or unique key that is shared by both tables. I prefer the GROUP BY technique popularized by Tom Kyte, even though it may be a bit slower. When testing, I noticed that the HASH GROUP BY algorithm performs better than SORT GROUP BY, as others have written.
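
For readers who have not seen the full join comparison, here is a minimal sketch. The rowsets OLD_ROWS and NEW_ROWS and their columns are made up for illustration, and ID is assumed to be a NOT NULL key shared by both. This is not the code I used for the timings above.

```sql
with old_rows(id, val) as (
  select 1, 'A' from dual union all
  select 2, 'B' from dual
)
, new_rows(id, val) as (
  select 2, 'X' from dual union all
  select 3, 'C' from dual
)
select coalesce(o.id, n.id) key_id,
  o.val old_val,
  n.val new_val,
  case
    when o.id is null then 'I'  -- only in new
    when n.id is null then 'D'  -- only in old
    else 'U'                    -- in both, values differ
  end op
from old_rows o
full join new_rows n
  on o.id = n.id
where o.id is null
   or n.id is null
   or decode(o.val, n.val, 0, 1) = 1  -- NULL-safe comparison
order by key_id;
```

With this data, key 1 comes out as 'D', key 2 as 'U' and key 3 as 'I'. Without a shared key there is nothing to put in the join condition, which is why this technique is limited.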

If either of the tables contains duplicate rows (which may happen if we don’t compare all of the columns, or if there is no primary key), then GROUP BY will output only one row for each set of duplicates. This may be a problem if we want data (such as the ROWID) that was not included in the comparison. In that case, we could use analytic functions or the MATCH_RECOGNIZE clause to compare and output all the rows and columns of interest. As you can see, the analytic function is more than twice as slow as GROUP BY, but it easily beats the MATCH_RECOGNIZE clause.
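
Here is a small sketch of the GROUP BY method when duplicates are present. The inline rowsets are invented for the example; OLD_ROWS contains the row (1, 'A') twice and NEW_ROWS contains it once.

```sql
with old_rows(id, val) as (
  select 1, 'A' from dual union all
  select 1, 'A' from dual union all
  select 2, 'B' from dual
)
, new_rows(id, val) as (
  select 1, 'A' from dual union all
  select 3, 'C' from dual
)
select id, val,
  decode(sign(sum(new_cnt)), 1, 'I', 'D') op,
  abs(sum(new_cnt)) cnt  -- how many rows to insert or delete
from (
  select id, val, -1 new_cnt from old_rows
  union all
  select id, val, 1 new_cnt from new_rows
)
group by id, val
having sum(new_cnt) != 0
order by id, op;
```

The two old copies of (1, 'A') and the one new copy collapse into a single output row with OP = 'D' and CNT = 1. We know one row must go, but not which ROWID to delete.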

I use the output from table comparisons to synchronize the tables, so capturing the ROWID is important to me even when a primary or unique key is not available. For that use case, I will prefer analytic functions from now on.

Compare and Sync without Primary Keys

I have written a lot about comparing and synchronizing tables. My examples always had both primary keys and non-key columns, so I could do updates along with inserts and deletes. What about the other tables? Here’s a technique that works for them.

(Here is a list of my posts about comparing and synchronizing tables.)

The Idea

MERGE joins together rows from the target table and the source. The ON clause contains the join conditions. When we do an UPDATE, we can only change columns that are not mentioned in the ON clause.

  • What if we don’t have any primary or unique key? We have nothing to join on.
  • What if we have a primary key but no non-key columns? We can join, but there is nothing to update.

What we can do is DELETE and INSERT. We just need to provide the source (in the USING clause) containing:

  • the rows to delete, identified by ROWID
  • and the rows to insert.

Now, suppose there are duplicate rows, say 2 rows in the source and 3 rows in the target. Should we delete 3 target rows and insert 2 source rows, or just delete 1 target row? I prefer just deleting the 1 row, without doing any extra work.

Test data

I want to create test cases with different numbers of duplicate rows in the target and source tables. Here is a table showing each test case, the number of rows in each table and the number of deletes or inserts I want to do.

COL  SOURCE_COUNT  TARGET_COUNT  INSERT_COUNT  DELETE_COUNT
1    0             1             0             1
2    0             2             0             2
3    1             0             1             0
4    1             1             0             0
5    1             2             0             1
6    2             0             2             0
7    2             1             1             0
8    2             2             0             0
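
The last two columns follow directly from the counts: insert the surplus of source rows, delete the surplus of target rows. As a sketch, this query rebuilds the table above from those two rules (the column aliases are mine):

```sql
with nums as (
  select level - 1 n from dual
  connect by level <= 3
)
select row_number() over(order by s.n, t.n) col,
  s.n source_count,
  t.n target_count,
  greatest(s.n - t.n, 0) insert_count,  -- source rows with no counterpart
  greatest(t.n - s.n, 0) delete_count   -- target rows with no counterpart
from nums s, nums t
where s.n + t.n > 0
order by col;
```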

 

As you can see, I want to wind up with “8 rows merged”: 4 inserts and 4 deletes.

My test data is simple, but the code to generate it is complicated. Please don’t get hung up on this part! Here’s the code:

create table t_target(
  col number
);

create table t_source(
  col number
);

insert first
when type = 't' then into t_target values(col)
when type = 's' then into t_source values(col)
with nums as (
  select level-1 n from dual
  connect by level <= 3
)
, test_cases as (
  select row_number() over(order by s.n, t.n) col,
  s.n s, t.n t
  from nums s, nums t
  where s.n+t.n > 0
)
select 's' type, col
from test_cases,
table(cast(multiset(
  select 0 from dual connect by level <= s) as sys.odcinumberlist
))
where s > 0
union all
select 't' type, col
from test_cases,
table(cast(multiset(
  select 0 from dual connect by level <= t) as sys.odcinumberlist
))
where t > 0;
commit;
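
To check that the generated data matches the test cases, a query along these lines counts the rows per COL in each table. It is only a convenience sketch; the SRC flag and the aliases are my own naming.

```sql
select col,
  count(case src when 's' then 1 end) source_count,
  count(case src when 't' then 1 end) target_count
from (
  select col, 's' src from t_source
  union all
  select col, 't' src from t_target
)
group by col
order by col;
```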

Step 1: get all the data and the target ROWIDs

“Old” rows are flagged with -1 and “new” rows with 1.

select col,
  -1 Z##FLAG, rowid Z##RID
from T_TARGET o
union all
select col,
  1 Z##FLAG, null
from T_SOURCE n
order by col, z##flag;
COL Z##FLAG Z##RID
1 -1 AAAX1hAAHAAAQPbA
2 -1 AAAX1hAAHAAAQPbA
2 -1 AAAX1hAAHAAAQPbA
3 1
4 -1 AAAX1hAAHAAAQPbA
4 1
5 -1 AAAX1hAAHAAAQPbA
5 -1 AAAX1hAAHAAAQPbA
5 1
6 1
6 1
7 -1 AAAX1hAAHAAAQPbA
7 1
7 1
8 -1 AAAX1hAAHAAAQPbA
8 -1 AAAX1hAAHAAAQPbA
8 1
8 1

 

Step 2: how many rows to insert or delete

Here I use analytics to compare the rows. I partition by all the columns.

  • From the previous step, Z##FLAG is 1 for new rows and -1 for old ones.
  • Z##NUM_ROWS is the sum of Z##FLAG over the entire partition, so it indicates the number of rows to insert or delete. If Z##NUM_ROWS = 0, nothing needs to be done.
  • Z##NEW is an incremental number assigned to new rows.
  • Z##OLD is an incremental number assigned to old rows.
select
sum(Z##FLAG) over(partition by col) Z##NUM_ROWS,
count(nullif(Z##FLAG,-1)) over(
  partition by col
  order by null rows unbounded preceding
) Z##NEW,
count(nullif(Z##FLAG,1)) over(
  partition by col
  order by null rows unbounded preceding
) Z##OLD,
a.* from (
  select col,
    -1 Z##FLAG, rowid Z##RID
  from T_TARGET o
  union all
  select col,
    1 Z##FLAG, null
  from T_SOURCE n
) a
order by col, z##flag;
Z##NUM_ROWS Z##NEW Z##OLD COL Z##FLAG Z##RID
-1 0 1 1 -1 AAAX1hAAHAAAQPbA
-2 0 1 2 -1 AAAX1hAAHAAAQPbA
-2 0 2 2 -1 AAAX1hAAHAAAQPbA
1 1 0 3 1
0 0 1 4 -1 AAAX1hAAHAAAQPbA
0 1 1 4 1
-1 0 1 5 -1 AAAX1hAAHAAAQPbA
-1 0 2 5 -1 AAAX1hAAHAAAQPbA
-1 1 2 5 1
2 1 0 6 1
2 2 0 6 1
1 0 1 7 -1 AAAX1hAAHAAAQPbA
1 1 1 7 1
1 2 1 7 1
0 0 1 8 -1 AAAX1hAAHAAAQPbA
0 0 2 8 -1 AAAX1hAAHAAAQPbA
0 1 2 8 1
0 2 2 8 1
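
The COUNT(NULLIF(...)) expressions may look odd at first. NULLIF turns the unwanted flag into NULL, and COUNT ignores NULLs, so each expression counts only one kind of row. Here is a stripped-down sketch without the analytic clause, using three made-up flags (one old row, two new rows):

```sql
with flags(flag) as (
  select -1 from dual union all
  select  1 from dual union all
  select  1 from dual
)
select count(nullif(flag, -1)) new_rows,  -- counts only the rows where flag = 1
  count(nullif(flag, 1)) old_rows,        -- counts only the rows where flag = -1
  sum(flag) num_rows                      -- surplus of new rows over old rows
from flags;
```

This returns NEW_ROWS = 2, OLD_ROWS = 1 and NUM_ROWS = 1. In the real query, "rows unbounded preceding" turns the two counts into running numbers within each partition.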

 

Step 3: Keep only rows of interest

For each partition:

  • I only care when “old” and “new” have different numbers of rows:
    Z##NUM_ROWS != 0
  • If Z##NUM_ROWS is positive, I want only “new” rows, and if it’s negative I want only “old” rows:
    sign(Z##NUM_ROWS) = Z##FLAG
  • I only want enough rows to make the numbers even. For example, if Z##NUM_ROWS is 1 then I want the row where Z##NEW is 1, but not the row where Z##NEW is 2:
    abs(Z##NUM_ROWS) >= case sign(Z##NUM_ROWS) when 1 then Z##NEW else Z##OLD end
select * from (
  select
  sum(Z##FLAG) over(partition by col) Z##NUM_ROWS,
  count(nullif(Z##FLAG,-1)) over(
    partition by col
    order by null rows unbounded preceding
  ) Z##NEW,
  count(nullif(Z##FLAG,1)) over(
    partition by col
    order by null rows unbounded preceding
  ) Z##OLD,
  a.* from (
    select col,
      -1 Z##FLAG, rowid Z##RID
    from T_TARGET o
    union all
    select col,
      1 Z##FLAG, null
    from T_SOURCE n
  ) a
)
where Z##NUM_ROWS != 0
and sign(Z##NUM_ROWS) = Z##FLAG
and abs(Z##NUM_ROWS) >=
  case sign(Z##NUM_ROWS) when 1 then z##new else z##old end;
Z##NUM_ROWS Z##NEW Z##OLD COL Z##FLAG Z##RID
-1 0 1 1 -1 AAAX1hAAHAAAQPbA
-2 0 1 2 -1 AAAX1hAAHAAAQPbA
-2 0 2 2 -1 AAAX1hAAHAAAQPbA
1 1 0 3 1
-1 0 1 5 -1 AAAX1hAAHAAAQPbA
2 1 0 6 1
2 2 0 6 1
1 1 0 7 1
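
To see the three conditions at work, this sketch applies them to the Step 2 rows for COL 5 and COL 7, typed in as literals. The short column names and the VERDICT column are mine, for illustration only.

```sql
with step2(col, num_rows, new_seq, old_seq, flag) as (
  select 5, -1, 0, 1, -1 from dual union all
  select 5, -1, 0, 2, -1 from dual union all
  select 5, -1, 1, 2,  1 from dual union all
  select 7,  1, 0, 1, -1 from dual union all
  select 7,  1, 1, 1,  1 from dual union all
  select 7,  1, 2, 1,  1 from dual
)
select s.*,
  case
    when num_rows != 0
     and sign(num_rows) = flag
     and abs(num_rows) >=
       case sign(num_rows) when 1 then new_seq else old_seq end
    then 'keep'
    else 'skip'
  end verdict
from step2 s
order by col, flag, old_seq, new_seq;
```

For COL 5 only the first old row is kept (one delete is enough), and for COL 7 only the first new row is kept (one insert is enough), which agrees with the output above.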

 

Step 4: Use MERGE to delete old and insert new

This time I join Z##RID to the “old” ROWID.

  • When Z##RID is null there will be no match so the row will be inserted.
  • When there is a match, I update a column, any column, because I can’t delete a row unless I have updated it first.
  • Then I delete every row I have updated.

I should explain about the hint “use_nl(o)”. This tells Oracle to use a “nested loop” when joining the rows from step 3 to the target table. If I leave out the hint, Oracle will likely do a full scan. With the hint, Oracle will access the target table “BY USER ROWID”. Use the hint only when there are few rows to change, say around 1%. If you’re not sure, it might be more prudent to remove the hint.

merge /*+ use_nl(o) */into T_TARGET o
  using (
  select * from (
    select
    sum(Z##FLAG) over(partition by col) Z##NUM_ROWS,
    count(nullif(Z##FLAG,-1)) over(
      partition by col
      order by null rows unbounded preceding
    ) Z##NEW,
    count(nullif(Z##FLAG,1)) over(
      partition by col
      order by null rows unbounded preceding
    ) Z##OLD,
    a.* from (
      select col,
        -1 Z##FLAG, rowid Z##RID
      from T_TARGET o
      union all
      select col,
        1 Z##FLAG, null
      from T_SOURCE n
    ) a
  )
  where Z##NUM_ROWS != 0
  and sign(Z##NUM_ROWS) = Z##FLAG
  and abs(Z##NUM_ROWS) >=
    case sign(Z##NUM_ROWS) when 1 then z##new else z##old end
) n
on (o.ROWID = n.Z##RID)
when matched then update set col = n.col
delete where 1=1
when not matched then insert (col)
  values(n.col);

8 rows merged. (the first time)
0 rows merged. (the second time)
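
Another way to confirm that the tables are in sync is to compare them with the GROUP BY method. This is a sketch for the one-column test tables; after the MERGE it should return no rows.

```sql
select col, sum(cnt) diff
from (
  select col, -1 cnt from t_target
  union all
  select col, 1 cnt from t_source
)
group by col
having sum(cnt) != 0;
```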

Step 5: Generate the MERGE

In the post Compare and sync tables: Generating the code, I generated the code for the GROUP BY and MERGE method. Now I’ll generate the code for this new method. Please see Generating SQL with SQL templates for an explanation of the MULTI_REPLACE package.

VARIABLE P_OLDOWNER varchar2(30)
VARIABLE P_OLDTABLE varchar2(30)
VARIABLE P_NEWSOURCE varchar2(256)
EXEC :P_OLDTABLE := 'T_TARGET';
EXEC :P_NEWSOURCE := 'T_SOURCE';

with INPUT as (
  select UPPER(NVL(:P_OLDOWNER, user)) OLD_OWNER,
  UPPER(:P_OLDTABLE) OLD_TABLE_NAME,
  :P_NEWSOURCE NEW_SOURCE,
  UPPER(NVL2(:P_OLDOWNER, :P_OLDOWNER || '.' || :P_OLDTABLE, :P_OLDTABLE)) OLD_TABLE
  from DUAL
)
, TAB_COLS as (
  select COLUMN_NAME, INTERNAL_COLUMN_ID COLUMN_ID
  from ALL_TAB_COLS, INPUT
  where (OWNER, TABLE_NAME) = ((OLD_OWNER, OLD_TABLE_NAME))
)
, COL_LIST as (
  select LISTAGG(COLUMN_NAME,',') within group(order by COLUMN_ID) ALL_COLS,
  LISTAGG('n.' || COLUMN_NAME,',') within group(order by COLUMN_ID) INSERT_COLS,
  min(COLUMN_NAME) COLUMN_NAME
  from TAB_COLS
)
select MULTI_REPLACE.TO_VARC(
'merge /*+ use_nl(o) */into #OLD_TABLE# o
  using (
  select * from (
    select 
    sum(Z##FLAG) over(partition by #ALL_COLS#) Z##NUM_ROWS,
    count(nullif(Z##FLAG,-1)) over(
      partition by #ALL_COLS#
      order by null rows unbounded preceding
    ) Z##NEW,
    count(nullif(Z##FLAG,1)) over(
      partition by #ALL_COLS#
      order by null rows unbounded preceding
    ) Z##OLD,
    a.* from (
      select #ALL_COLS#,
        -1 Z##FLAG, rowid Z##RID
      from #OLD_TABLE# o
      union all
      select #ALL_COLS#,
        1 Z##FLAG, null
      from #NEW_SOURCE# n
    ) a
  )
  where Z##NUM_ROWS != 0
  and sign(Z##NUM_ROWS) = Z##FLAG
  and abs(Z##NUM_ROWS) >=
    case sign(Z##NUM_ROWS) when 1 then Z##NEW else Z##OLD end
) n
on (o.ROWID = n.Z##RID)
when matched then update set #COLUMN_NAME# = n.#COLUMN_NAME#
delete where 1=1
when not matched then insert
  (#ALL_COLS#)
  values(#INSERT_COLS#);',
SYS.ODCIVARCHAR2LIST(
  '#OLD_TABLE#','#ALL_COLS#','#COLUMN_NAME#','#NEW_SOURCE#','#INSERT_COLS#'
),
SYS.ODCIVARCHAR2LIST(
    OLD_TABLE,    ALL_COLS,    COLUMN_NAME,    NEW_SOURCE,    INSERT_COLS
)
) SQL_TEXT
from INPUT, COL_LIST;
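
If you don't have the MULTI_REPLACE package at hand, the idea behind the template can be tried with the built-in REPLACE function. This is a stand-in for illustration, not the package's implementation, and the template is shortened to two placeholders.

```sql
select replace(
         replace(
           'select #ALL_COLS# from #OLD_TABLE#',
           '#ALL_COLS#', 'COL'
         ),
         '#OLD_TABLE#', 'T_TARGET'
       ) sql_text
from dual;
```

The result is the text "select COL from T_TARGET". Nesting one REPLACE per placeholder gets unwieldy quickly, which is a good reason to pass two lists to a helper instead.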

Conclusion

This method of synchronizing tables works with any combination of primary key and non-key fields. Most of the time, you will have tables with both primary keys and non-key fields; for those tables, the GROUP BY method is more efficient. For the others, you now have a solution so you’re all set.

Sync tables: generate MERGE using Unique constraint

In my post “Compare and sync tables: Generating the code”, I use the primary key constraint on the target table. A reader called “Bal” asked how to use a unique constraint instead.

Test data

I’m going to create a target table with a unique constraint, but without any NOT NULL constraints on the columns.

drop table t_target purge;
create table t_target as
with col1 as (
  select 1 uk1 from dual
  union all
  select null from dual
)
, col2 as (
  select 2 uk2 from dual
  union all
  select null from dual
)
select uk1, uk2, 'Old value '|| rownum val from col1, col2;
alter table t_target add constraint t_target_uk unique(uk1,uk2);

drop table t_source purge;
create table t_source as
select uk1, uk2, replace(val,'Old','New') val from t_target;
UK1   UK2   VAL
1     2     New value 1
1           New value 2
      2     New value 3
            New value 4

Changes to the SQL template

I am going to compare columns UK1 and UK2, which may contain NULL values. I like to use DECODE for that. In this case, I want to generate

on (0 = ALL(decode(o.UK1,n.UK1,0,1),decode(o.UK2,n.UK2,0,1)))

So in my revised template, that line will read

on (0 = ALL(#ON_COLS#))
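
To show why DECODE is used here, this sketch compares made-up pairs of key values both ways. DECODE treats two NULLs as equal, whereas the equality operator does not. The column names are invented for the example.

```sql
with pairs(o_uk1, o_uk2, n_uk1, n_uk2) as (
  select 1, 2, 1, 2 from dual union all
  select 1, null, 1, null from dual union all
  select null, 2, null, 2 from dual union all
  select 1, null, 1, 2 from dual
)
select p.*,
  case
    when 0 = all(decode(o_uk1, n_uk1, 0, 1), decode(o_uk2, n_uk2, 0, 1))
    then 'match' else 'no match'
  end decode_result,
  case
    when o_uk1 = n_uk1 and o_uk2 = n_uk2
    then 'match' else 'no match'
  end equals_result
from pairs p;
```

DECODE_RESULT is 'match' for the first three rows and 'no match' for the last one. EQUALS_RESULT is 'match' for the first row only, so a plain equality join would miss every key that contains a NULL.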

Changes to the code generator

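Here is the new code generator. The changes from the earlier generator are in KEY_COLS (constraint type 'U' instead of the primary key), in ON_COLS (DECODE expressions) and in the ON clause of the template.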

VARIABLE P_OLDOWNER varchar2(30)
VARIABLE P_OLDTABLE varchar2(30)
VARIABLE P_NEWSOURCE varchar2(256)
EXEC :P_OLDTABLE := 'T_TARGET';
EXEC :P_NEWSOURCE := 'T_SOURCE';

with INPUT as (
  select UPPER(NVL(:P_OLDOWNER, user)) OLD_OWNER,
  UPPER(:P_OLDTABLE) OLD_TABLE_NAME,
  :P_NEWSOURCE NEW_SOURCE,
  UPPER(NVL2(:P_OLDOWNER, :P_OLDOWNER || '.' || :P_OLDTABLE, :P_OLDTABLE)) OLD_TABLE
  from DUAL
)
, TAB_COLS as (
  select COLUMN_NAME, INTERNAL_COLUMN_ID COLUMN_ID
  from ALL_TAB_COLS, INPUT
  where (OWNER, TABLE_NAME) = ((OLD_OWNER, OLD_TABLE_NAME))
)
, KEY_COLS as (
  select COLUMN_NAME, POSITION
  from ALL_CONS_COLUMNS, INPUT
  where (OWNER, CONSTRAINT_NAME) = (
    select OWNER, CONSTRAINT_NAME from ALL_CONSTRAINTS
    where (OWNER, TABLE_NAME, CONSTRAINT_TYPE) = ((OLD_OWNER, OLD_TABLE_NAME, 'U'))
  )
)
, COL_LIST as (
  select LISTAGG(COLUMN_NAME,',') within group(order by COLUMN_ID) ALL_COLS,
  LISTAGG('n.' || COLUMN_NAME,',') within group(order by COLUMN_ID) INSERT_COLS
  from TAB_COLS
)
, PK_LIST as (
  select LISTAGG(COLUMN_NAME,',') within group(order by POSITION) PK_COLS,
  LISTAGG('decode(o.'||COLUMN_NAME||',n.'||COLUMN_NAME||',0,1)',',')
    within group(order by POSITION) ON_COLS
  from KEY_COLS
)
, SET_LIST as (
  select LISTAGG(COLUMN_NAME || '=n.'||COLUMN_NAME,',')
    within group(order by COLUMN_ID) SET_COLS
  from TAB_COLS
  where COLUMN_NAME not in (select COLUMN_NAME from KEY_COLS)
)
select MULTI_REPLACE.TO_VARC(
'merge into #OLD_TABLE# O
using (
  select * from (
    select #ALL_COLS#,
    COUNT(*) over(partition by #PK_COLS#)
      - SUM(Z##_CNT) Z##IUD_FLAG
    from (
      select #ALL_COLS#,
        -1 Z##_CNT
      from #OLD_TABLE# O
      union all
      select #ALL_COLS#,
        1 Z##_CNT
      from #NEW_SOURCE# N
    )
    group by #ALL_COLS#
    having SUM(Z##_CNT) != 0
  )
  where Z##IUD_FLAG < 3
) N
on (0 = ALL(#ON_COLS#))
when matched then update
  set #SET_COLS#
  delete where N.Z##IUD_FLAG = 2
when not matched then insert
  (#ALL_COLS#)
  values(#INSERT_COLS#)',
SYS.ODCIVARCHAR2LIST('#OLD_TABLE#','#ALL_COLS#','#PK_COLS#',
  '#NEW_SOURCE#','#ON_COLS#','#SET_COLS#','#INSERT_COLS#'),
SYS.ODCIVARCHAR2LIST(  OLD_TABLE,    ALL_COLS,    PK_COLS,
    NEW_SOURCE,    ON_COLS,    SET_COLS,    INSERT_COLS)
) SQL_TEXT
from INPUT, COL_LIST, PK_LIST, SET_LIST;

The generated code is:

merge into T_TARGET O
using (
  select * from (
    select UK1,UK2,VAL,
    COUNT(*) over(partition by UK1,UK2)
      - SUM(Z##_CNT) Z##IUD_FLAG
    from (
      select UK1,UK2,VAL,
        -1 Z##_CNT
      from T_TARGET O
      union all
      select UK1,UK2,VAL,
        1 Z##_CNT
      from T_SOURCE N
    )
    group by UK1,UK2,VAL
    having SUM(Z##_CNT) != 0
  )
  where Z##IUD_FLAG < 3
) N
on (0 = ALL(decode(o.UK1,n.UK1,0,1),decode(o.UK2,n.UK2,0,1)))
when matched then update
  set VAL=n.VAL
  delete where N.Z##IUD_FLAG = 2
when not matched then insert
  (UK1,UK2,VAL)
  values(n.UK1,n.UK2,n.VAL);

Limitations

I have not tested this with tables that have more than one unique constraint.

Be warned that a unique constraint does not prevent duplicate rows with all NULL values. If the source can have more than one row with all NULL values in UK1 and UK2, this solution is not appropriate.
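
Here is a quick way to see that behaviour for yourself. T_UK_DEMO is a throwaway table invented for this sketch.

```sql
create table t_uk_demo(uk1 number, uk2 number, val varchar2(10));
alter table t_uk_demo add constraint t_uk_demo_uk unique(uk1, uk2);

insert into t_uk_demo values(null, null, 'first');
-- accepted: rows with NULL in every key column always satisfy the constraint
insert into t_uk_demo values(null, null, 'second');

select count(*) all_null_rows
from t_uk_demo
where uk1 is null and uk2 is null;

drop table t_uk_demo purge;
```

The count is 2. With two such rows in the target, the DECODE comparison in the ON clause would match each source row to both of them.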

MERGE using Change Data Capture (CDC)

When you want to get a target table in sync with a source table, sometimes you have to compare the tables to obtain the differences. Other times the source system provides you with the differences and you just need to apply them. MERGE can help you do that.

Situation

  • Target and source have the same columns.
  • The target table is to receive all the changes from the source, including deletes.
  • Target and source have primary keys and additional columns, so updates are possible.
  • The change table contains all the columns in changed source rows, plus an OPERATION column with 3 possible values:
    • ‘I’ if the row was inserted in the source
    • ‘U’ if the row was updated in the source.
    • ‘D’ if the row was deleted in the source.

Test data

To cover every possibility, I have 9 test cases:

Operation  Target row  Data same  Action      #
I          Present     Yes        Do nothing  1
I          Present     No         Update      2
U          Present     Yes        Do nothing  3
U          Present     No         Update      4
D          Present                Delete      5
(absent)   Present                Do nothing  6
I          Absent                 Insert      7
U          Absent                 Insert      8
D          Absent                 Do nothing  9

I want my solution to be robust and to do as little work as possible. I want to be able to run the same code a second time and have nothing happen, since the target table is already synchronized. For these reasons, I treat the ‘I’ and ‘U’ operations exactly the same: the MERGE statement will do an UPSERT if the rows are different, and nothing if the rows are already the same.

Here is the test data I came up with:

create table t_target(ckey, cvalue) as
select 1, 'I, target = cdc > do nothing' from dual union all
select 2, 'I, target != cdc > update OLD' from dual union all
select 3, 'U, target = cdc > do nothing' from dual union all
select 4, 'U, target != cdc > update OLD' from dual union all
select 5, 'D > delete' from dual union all
select 6, 'target not in cdc > do nothing' from dual;
CKEY CVALUE
1 I, target = cdc > do nothing
2 I, target != cdc > update OLD
3 U, target = cdc > do nothing
4 U, target != cdc > update OLD
5 D > delete
6 target not in cdc > do nothing
drop table cdc purge;
create table cdc(operation, ckey, cvalue) as
select 'I', 1, 'I, target = cdc > do nothing' from dual union all
select 'I', 2, 'I, target != cdc > update NEW' from dual union all
select 'U', 3, 'U, target = cdc > do nothing' from dual union all
select 'U', 4, 'U, target != cdc > update NEW' from dual union all
select 'D', 5, 'D > delete' from dual union all
select 'I', 7, 'I not in target > Insert' from dual union all
select 'U', 8, 'U not in target > Insert' from dual union all
select 'D', 9, 'D not in target > do nothing' from dual;
OPERATION CKEY CVALUE
I 1 I, target = cdc > do nothing
I 2 I, target != cdc > update NEW
U 3 U, target = cdc > do nothing
U 4 U, target != cdc > update NEW
D 5 D > delete
I 7 I not in target > Insert
U 8 U not in target > Insert
D 9 D not in target > do nothing

First try: DELETE with WHERE

Remember, I will always run each MERGE statement twice to make sure no unnecessary work is done.

merge into t_target o
using cdc n
on (o.ckey = n.ckey)
when matched then update
  set cvalue = n.cvalue
delete where n.operation = 'D'
when not matched then insert
  (ckey, cvalue)
  values(n.ckey, n.cvalue);
CKEY CVALUE
1 I, target = cdc > do nothing
2 I, target != cdc > update NEW
3 U, target = cdc > do nothing
4 U, target != cdc > update NEW
6 target not in cdc > do nothing
7 I not in target > Insert
8 U not in target > Insert
9 D not in target > do nothing

Well, that didn’t work out too well: row 9 was supposed to be deleted, but it was already gone from the target table so I actually inserted it!

Second try: WHERE with DELETE and with INSERT

Let’s tell the MERGE not to insert rows with ‘D’ flags:

merge into t_target o
using cdc n
on (o.ckey = n.ckey)
when matched then update
  set cvalue = n.cvalue
delete where n.operation = 'D'
when not matched then insert
  (ckey, cvalue)
  values(n.ckey, n.cvalue)
  where n.operation != 'D';

7 rows merged.
CKEY CVALUE
1 I, target = cdc > do nothing
2 I, target != cdc > update NEW
3 U, target = cdc > do nothing
4 U, target != cdc > update NEW
6 target not in cdc > do nothing
7 I not in target > Insert
8 U not in target > Insert

Well, that’s better. I merged 7 rows the first time and the target table looks good. Unfortunately, when I run the statement again I get “6 rows merged” for no good reason.

Third try: WHERE with UPDATE, DELETE and INSERT

merge into t_target o
using cdc n
on (o.ckey = n.ckey)
when matched then update
  set cvalue = n.cvalue
  where n.operation = 'D'
  or decode(o.cvalue,n.cvalue,0,1) = 1
delete where n.operation = 'D'
when not matched then insert
  (ckey, cvalue)
  values(n.ckey, n.cvalue)
  where n.operation != 'D';

5 rows merged. (first time)
0 rows merged. (second time)

The trick is to update only the rows that have changes or that carry the ‘D’ flag: the DELETE clause of a MERGE only removes rows that the UPDATE clause has just updated, so you can’t delete a row if it hasn’t been updated first. Please note the DECODE trick, which compares the columns correctly even if one or both values are NULL.
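To preview which test case each CDC row falls into, the same conditions can be applied in a plain query. This is only a sketch against the T_TARGET and CDC test tables above, meant to be run before the MERGE; the ACTION column and its labels are illustrative names, not part of the original code.

```sql
-- Sketch: classify each CDC row the way the third MERGE treats it.
-- Assumes the T_TARGET and CDC test tables above, before the MERGE is run.
select n.operation, n.ckey,
  case
    when o.rowid is null and n.operation = 'D' then 'Do nothing' -- case 9
    when o.rowid is null then 'Insert'                            -- cases 7, 8
    when n.operation = 'D' then 'Delete'                          -- case 5
    when decode(o.cvalue, n.cvalue, 0, 1) = 1 then 'Update'       -- cases 2, 4
    else 'Do nothing'                                             -- cases 1, 3
  end action
from cdc n
left join t_target o on o.ckey = n.ckey
order by n.ckey;
```

Case 6 never shows up here because the query, like the MERGE, is driven by the rows in CDC.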

MERGE magic and madness

Using the MERGE statement, you can insert into, delete from and update the same table all at once: that is the magic. If you don’t pay attention, you can also make the database do a lot of unnecessary work: that is the madness!

I’ve blogged a lot about comparing tables, then using MERGE to synchronize them. Let’s see what MERGE can do alone, without a prior comparison step.

(Here is a list of my posts about comparing and synchronizing tables.)

UPSERT

When Oracle introduced the MERGE statement, it could only do inserts and updates, not deletes. This “upsert” is a common need in transactions: you want a row of data to go into a table whether or not a row with the same key already exists. Here is an example using my typical “compare and sync” test data:

select * from t_target where key_num = 1;
no rows selected
merge into T_TARGET o
using (
  select 1 KEY_NUM,
  trunc(sysdate) KEY_DATE,
  trunc(sysdate) VAL_TS
  from DUAL
) n
on (o.KEY_NUM=n.KEY_NUM and o.KEY_DATE=n.KEY_DATE)
when matched then update
  set VAL_TS=n.VAL_TS,VAL_STRING='UPDATE'
when not matched then insert
  (KEY_NUM,KEY_DATE,VAL_TS,VAL_STRING)
  values(n.KEY_NUM,n.KEY_DATE,n.VAL_TS,'INSERT');
1 rows merged.
select * from t_target where key_num = 1;
KEY_NUM KEY_DATE VAL_TS VAL_STRING
1 2015-01-04 00:00:00 04-JAN-15 12.00.00.000000 AM INSERT

The row was inserted. Now run the same MERGE statement again and select the same row:

KEY_NUM KEY_DATE VAL_TS VAL_STRING
1 2015-01-04 00:00:00 04-JAN-15 12.00.00.000000 AM UPDATE

Now the row has been updated.

Synchronize tables without DELETE

There may be times when you want to apply new or changed data to your target table, without removing any historical data. In this case, there is no need for any comparison code before the MERGE; the MERGE statement will do a RIGHT JOIN from the target to the source, so every source row is kept, and will either UPDATE or INSERT based on whether the target row was found or not.

As a reminder, my test tables T_TARGET and T_SOURCE have 300 rows each.

  • T_TARGET has 10 rows not in T_SOURCE
  • T_SOURCE has 10 rows not in T_TARGET
  • There are 10 rows in both tables, but with different non-key values.

Since we are not doing deletes, there should be 10 updates and 10 inserts and that’s it.

Here is my first try:

merge into T_TARGET o
using T_SOURCE n
on (o.KEY_NUM=n.KEY_NUM and o.KEY_DATE=n.KEY_DATE)
when matched then update
  set VAL_TS=n.VAL_TS,VAL_STRING=n.VAL_STRING
when not matched then insert
  (KEY_NUM,KEY_DATE,VAL_TS,VAL_STRING)
  values(n.KEY_NUM,n.KEY_DATE,n.VAL_TS,n.VAL_STRING);
300 rows merged.

That’s funny, there should be only 20 rows merged! What happens if I run the statement again?

300 rows merged.

Madness!

I’ll bet you have figured out what’s wrong: I am updating rows in T_TARGET that are identical to T_SOURCE. In other words, I am making Oracle do lots of work for nothing.

Fortunately, there is a way to filter out identical rows – with a WHERE clause.

merge into T_TARGET o
using T_SOURCE n
on (o.KEY_NUM=n.KEY_NUM and o.KEY_DATE=n.KEY_DATE)
when matched then update
  set VAL_TS=n.VAL_TS,VAL_STRING=n.VAL_STRING
  where 1 in (
    decode(o.VAL_TS,n.VAL_TS,0,1),
    decode(o.VAL_STRING,n.VAL_STRING,0,1)
  )
when not matched then insert
  (KEY_NUM,KEY_DATE,VAL_TS,VAL_STRING)
  values(n.KEY_NUM,n.KEY_DATE,n.VAL_TS,n.VAL_STRING);
20 rows merged.
(and the second time...)
0 rows merged.
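To check those counts independently of the MERGE, the same join and DECODE conditions can be used in a read-only query. This is a sketch that assumes T_SOURCE and T_TARGET are as described above; EXPECTED_INSERTS and EXPECTED_UPDATES are illustrative column names.

```sql
-- Sketch: count the work a filtered MERGE should do, without changing data.
-- Assumes T_SOURCE and T_TARGET as described above.
select
  -- source rows with no matching target row would be inserted
  count(case when o.rowid is null then 1 end) expected_inserts,
  -- matched rows count only if at least one non-key value differs
  count(case when o.rowid is not null
             and 1 in (
               decode(o.VAL_TS, n.VAL_TS, 0, 1),
               decode(o.VAL_STRING, n.VAL_STRING, 0, 1)
             )
        then 1 end) expected_updates
from T_SOURCE n
left join T_TARGET o
  on o.KEY_NUM = n.KEY_NUM and o.KEY_DATE = n.KEY_DATE;
```

If the test data is as described, this should report 10 and 10 before the filtered MERGE runs, and 0 and 0 afterwards.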

Why DECODE?

When I compare the non-key values, I don’t know if they are NULL or not. One way to compare potentially NULL values is with DECODE: if both values are NULL then DECODE will return 0, and if only one is NULL then DECODE will return 1.
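A small query against DUAL shows the difference between DECODE and an ordinary comparison. This is a sketch only: the PAIRS subquery and the DECODE_DIFF and NAIVE_DIFF column names are made up for illustration.

```sql
-- Sketch: compare DECODE with a plain "!=" test on values that may be NULL.
with pairs(old_val, new_val) as (
  select 'A', 'A'   from dual union all
  select 'A', 'B'   from dual union all
  select 'A', null  from dual union all
  select null, 'A'  from dual union all
  select null, null from dual
)
select old_val, new_val,
  -- DECODE treats two NULLs as equal, and a NULL as different from a value
  decode(old_val, new_val, 0, 1) decode_diff,
  -- "!=" evaluates to unknown whenever either side is NULL,
  -- so the CASE falls through to 0
  case when old_val != new_val then 1 else 0 end naive_diff
from pairs;
```

DECODE_DIFF is 1 for the three pairs that really differ, including the two where only one side is NULL. NAIVE_DIFF is 1 only for 'A' versus 'B', so an update filter based on "!=" would miss changes to or from NULL.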

Date Ranges in Data Warehouses using Oracle 12c

When you load data with an “effective date” into a Data Warehouse, what happens when the new data is the same as yesterday’s? Do you add a new record with the new “effective date” or do you update a date range in the existing record?

At Open World last month, I got both answers: Tim Gorman presented an efficient method for loading “effective dates” and Dr. Holger Friedrich presented a way to maintain date ranges.

I mentioned to Dr. Friedrich that the 12c MATCH_RECOGNIZE clause could maintain those date ranges more efficiently. By the time I actually wrote the SQL, Dr. Friedrich had written practically the same thing. Here is my variant, inspired by Dr. Friedrich’s test case.

The Test Data

I have three tables:

  • ALL_DATA is the Data Warehouse. Each row is valid starting from DATE_FROM (inclusive) to DATE_TO (exclusive). The “current” rows have a DATE_TO of 9999-12-31.
  • CURRENT_DATA contains all the rows in ALL_DATA that have DATE_TO = 9999-12-31.
  • STAGING_DATA contains the data to be loaded. DATE_EFFECTIVE is the “effective date”.

To help me see what happens, I increment a sequence every time I load data to ALL_DATA. Updated rows are assigned the sequence number + 1/10, and new rows are assigned the sequence number + 2/10.

drop sequence load_seq;
create sequence load_seq;

drop table all_data purge;
create table all_data as
select 1 pk_id,
date '2014-11-13' date_from,
date '9999-12-31' date_to,
'A' rec_value,
load_seq.nextval+.2 load_id
from dual;

drop table current_data purge;
create table current_data as
select * from all_data;

drop table staging_data purge;
create table staging_data(pk_id, date_effective, rec_value)
as select
1, date '2014-12-01', 'A' from dual union all select
1, date '2014-12-02', 'B' from dual union all select
1, date '2014-12-03', 'B' from dual union all select
1, date '2014-12-04', 'C' from dual union all select
1, date '2014-12-05', 'A' from dual union all select
1, date '2014-12-06', 'A' from dual union all select
1, date '2014-12-07', 'D' from dual;
  • The first staging row (2014-12-01, 'A') should be combined with the existing row in ALL_DATA.
  • The rows for 2014-12-02 and 2014-12-03 ('B') should be combined into one row.
  • The rows for 2014-12-05 and 2014-12-06 ('A') should be combined into one row.

Finding the Data to Change

Dr. Friedrich used the method I call “Start of Group” to find out what data to change. MATCH_RECOGNIZE replaces this method more efficiently and with less code.

select * from (
  select pk_id, date_effective, rec_value
  from staging_data
  union all
  select pk_id, date_from, rec_value
  from all_data
  where date_to >= (select min(date_effective) from staging_data)
) match_recognize (
  partition by pk_id order by date_effective
  measures first(date_effective) date_from,
    nvl(next(date_effective), date '9999-12-31') date_to,
    rec_value rec_value
  pattern(a b*)
  define b as rec_value = prev(rec_value)
);
  • The second branch of the UNION ALL: I make sure to include all the rows in ALL_DATA that could be impacted by the new data.
  • PATTERN and DEFINE: In this simple example, REC_VALUE is the only data that can change. I group together all the consecutive rows that have the same value.
PK_ID DATE_FROM DATE_TO REC_VALUE
1 2014-11-13 2014-12-02 A
1 2014-12-02 2014-12-04 B
1 2014-12-04 2014-12-05 C
1 2014-12-05 2014-12-07 A
1 2014-12-07 9999-12-31 D
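
For comparison, here is a rough sketch of what a “Start of Group” version of the same query might look like with analytic functions. It is not Dr. Friedrich’s actual code; START_FLAG and GRP are illustrative names. A flag marks each row whose value differs from the previous row, and a running sum of the flags numbers the groups.

```sql
-- Sketch of the "Start of Group" alternative, for comparison only.
select pk_id,
  min(date_effective) date_from,
  -- the next group's start date closes the current range
  lead(min(date_effective), 1, date '9999-12-31')
    over(partition by pk_id order by min(date_effective)) date_to,
  min(rec_value) rec_value  -- every row in a group has the same value
from (
  select pk_id, date_effective, rec_value,
    -- running total of the flags numbers the groups
    sum(start_flag) over(partition by pk_id order by date_effective) grp
  from (
    select pk_id, date_effective, rec_value,
      -- flag = 1 when the value differs from the previous row's value
      case when rec_value = lag(rec_value)
             over(partition by pk_id order by date_effective)
           then 0 else 1 end start_flag
    from (
      select pk_id, date_effective, rec_value from staging_data
      union all
      select pk_id, date_from, rec_value from all_data
      where date_to >= (select min(date_effective) from staging_data)
    )
  )
)
group by pk_id, grp
order by pk_id, date_from;
```

On the test data this should return the same five rows as the MATCH_RECOGNIZE query, at the cost of three levels of subquery and several analytic passes.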

 

Merging the Changes

var load_id number;
exec :load_id := load_seq.nextval;

merge into all_data o
using (
  select * from (
    select pk_id, date_effective, rec_value
    from staging_data
    union all
    select pk_id, date_from, rec_value
    from all_data
    where date_to >= (select min(date_effective) from staging_data)
  ) match_recognize (
    partition by pk_id order by date_effective
    measures first(date_effective) date_from,
      nvl(next(date_effective), date '9999-12-31') date_to,
      rec_value rec_value
    pattern(a b*)
    define b as rec_value = prev(rec_value)
  )
) n
on ( (o.pk_id, o.date_from) = ((n.pk_id, n.date_from)) )
when matched then update set
  load_id = to_number(:load_id)+.1, date_to = n.date_to
  where o.date_to != n.date_to
when not matched then insert values(
  n.pk_id, n.date_from, n.date_to, n.rec_value, :load_id+.2
);

5 rows merged.
  • The WHERE clause on the UPDATE: in my USING clause, I may get rows from ALL_DATA that don’t need to be changed, so I check DATE_TO to make sure I don’t do an update for nothing.
  • “5 rows merged”: as you can see from the output below, I updated the existing row and inserted 4 new rows.
select * from all_data order by 3;
PK_ID DATE_FROM DATE_TO REC_VALUE LOAD_ID
1 2014-11-13 2014-12-02 A 2.1
1 2014-12-02 2014-12-04 B 2.2
1 2014-12-04 2014-12-05 C 2.2
1 2014-12-05 2014-12-07 A 2.2
1 2014-12-07 9999-12-31 D 2.2

Refreshing CURRENT_DATA

merge into current_data o
using (
  select * from all_data
  where date_to = date '9999-12-31'
) n
on (o.pk_id = n.pk_id)
when matched then update set
  load_id = n.load_id, date_from = n.date_from,
  rec_value = n.rec_value
  where o.load_id != n.load_id
when not matched then insert values(
  n.pk_id, n.date_from, n.date_to, n.rec_value, n.load_id
);

1 rows merged.

Avoiding Unnecessary Updates

If you run each MERGE statement a second time, you will see the lovely message “0 rows merged.” I always do this test. If you are changing data the second time, either there is a bug or you are updating a row to be the same as it was before, which is a lot of work for no benefit.
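
The second-run test can also be automated. The sketch below wraps the CURRENT_DATA refresh in an anonymous PL/SQL block that runs it twice and raises an error if the second run touches any rows. The error number and message are illustrative choices, and the block leaves the commit to the caller.

```sql
-- Sketch: run the CURRENT_DATA refresh twice and fail if the second run
-- changes anything. The error number and message are illustrative.
begin
  for i in 1..2 loop
    merge into current_data o
    using (
      select * from all_data
      where date_to = date '9999-12-31'
    ) n
    on (o.pk_id = n.pk_id)
    when matched then update set
      load_id = n.load_id, date_from = n.date_from,
      rec_value = n.rec_value
      where o.load_id != n.load_id
    when not matched then insert values(
      n.pk_id, n.date_from, n.date_to, n.rec_value, n.load_id
    );
    -- SQL%ROWCOUNT holds the number of rows merged by the last statement
    if i = 2 and sql%rowcount > 0 then
      raise_application_error(-20001,
        'Second run merged ' || sql%rowcount || ' rows');
    end if;
  end loop;
end;
/
```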