COMP_SYNC 2: exclude surrogate keys

At the recent ILOUG conference, Sabine Heimsath asked how to compare two tables where the surrogate keys do not match. Here’s how, using my revised comparison package.

Test data

drop table o purge;
create table o (
  pk number generated always as identity primary key,
  val1 number,
  val2 number
);
insert into o(val1, val2)
select level, level from dual connect by level <= 10;

drop table n purge;
create table n (
  pk number generated always as identity start with 42 primary key,
  val1 number,
  val2 number
);
insert into n(val1, val2)
select level+1, level+1 from dual connect by level <= 10;

 

Simple compare: the COMPARE_SQL function

If you exclude a column from the comparison, the SQL from this function will also exclude that column from the output. If there are duplicate rows with the same data, they are grouped together in the output, with a count of the number of rows.

select comp_sync.compare_sql('o','n',p_exclude_cols=>'pk') from dual;

select /*+ qb_name(COMPARE) */
  "VAL1", "VAL2",
  decode(sign(sum(Z##NEW_CNT)), 1, 'I', 'D') Z##OP,
  abs(sum(Z##NEW_CNT)) Z##CNT
FROM (
  select /*+ qb_name(old) */
  "VAL1", "VAL2"
    , -1 Z##NEW_CNT
  from O O
  union all
  select /*+ qb_name(new) */
  "VAL1", "VAL2"
    , 1 Z##NEW_CNT
  from n N
)
group by
  "VAL1", "VAL2"
having sum(Z##NEW_CNT) != 0
order by 1, Z##OP;

VAL1 VAL2 Z##OP Z##CNT
---- ---- ----- ------
   1    1 D          1
  11   11 I          1
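
To see how duplicates are counted, here is a minimal sketch of the same GROUP BY technique with inline rows standing in for the two tables. The aliases NEW_CNT, OP and CNT are shortened for illustration; they are not what the package generates.

```sql
-- Illustration only: inline rows stand in for the "old" and "new" tables.
-- "old" has the row (1,1) three times, "new" has it once.
with o as (
  select 1 val1, 1 val2 from dual connect by level <= 3
), n as (
  select 1 val1, 1 val2 from dual
)
select val1, val2,
  decode(sign(sum(new_cnt)), 1, 'I', 'D') op,  -- positive sum: more rows in "new"
  abs(sum(new_cnt)) cnt                        -- how many rows differ
from (
  select val1, val2, -1 new_cnt from o
  union all
  select val1, val2, 1 new_cnt from n
)
group by val1, val2
having sum(new_cnt) != 0;
```

The sum for this group is -3 + 1 = -2, so I would expect a single 'D' line with a count of 2.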

 

Detailed compare: the CDC_SQL function

The SQL from this function does the same comparison, ignoring the excluded column, but it returns every row involved with all of its columns, including the excluded PK and the ROWID of each “old” row.

select comp_sync.cdc_sql('o','n',p_exclude_cols=>'pk') from dual;

select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    "PK", "VAL1", "VAL2",
    case
      when Z##NEW = 1
        and sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        )
        then 'I'
      when Z##OLD = 1
        and sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        )
        then 'D'
    end Z##OP, Z##RID
  FROM (
    select /*+ qb_name(old) */
    "PK", "VAL1", "VAL2",
    1 Z##OLD, 0 Z##NEW, rowid Z##RID
    from O O
    union all
    select /*+ qb_name(new) */
    "PK", "VAL1", "VAL2",
    0, 1, null
    from n N
  )
)
where Z##OP is not null;

PK VAL1 VAL2 Z##OP Z##RID
-- ---- ---- ----- ----------------
 1    1    1 D     AAAX/cAAZAAAEfGA
51   11   11 I
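
The running sums are what let this version flag individual rows when there are duplicates. Here is a rough sketch with inline data. IS_OLD, IS_NEW and OP are illustrative names, and there is no ROWID because the rows come from DUAL.

```sql
-- Illustration only: three identical "old" rows, one matching "new" row.
with o as (
  select level pk, 1 val1, 1 val2 from dual connect by level <= 3
), n as (
  select 42 pk, 1 val1, 1 val2 from dual
)
select * from (
  select pk, val1, val2,
    case
      -- a "new" row is an insert once the running count of "new" rows
      -- exceeds the total number of "old" rows with the same values
      when is_new = 1
        and sum(is_new) over(partition by val1, val2
          order by null rows unbounded preceding)
          > sum(is_old) over(partition by val1, val2)
        then 'I'
      -- and the reverse for deletes
      when is_old = 1
        and sum(is_old) over(partition by val1, val2
          order by null rows unbounded preceding)
          > sum(is_new) over(partition by val1, val2)
        then 'D'
    end op
  from (
    select pk, val1, val2, 1 is_old, 0 is_new from o
    union all
    select pk, val1, val2, 0, 1 from n
  )
)
where op is not null;
```

Two of the three “old” rows should come back flagged 'D'. Which two is arbitrary because of the ORDER BY NULL.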

 

SYNC_SQL: synchronizing the data

This will generate a MERGE statement that assumes you want to insert new rows into the “old” table with the same key as in the “new” table. This is almost certainly not what you want, but all you have to do is adjust the INSERT part manually. In this case, the surrogate key is generated automatically, so we just need to remove that column from the INSERT clause. In the listing below, I commented out the generated lines and put the adjusted lines right after them.

select comp_sync.sync_sql('o','n',p_exclude_cols=>'pk') from dual;

merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    "PK", "VAL1", "VAL2", rowid Z##RID
  from O
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    "PK", "VAL1", "VAL2",
    case
      when Z##NEW = 1
        and sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        )
        then 'I'
      when Z##OLD = 1
        and sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        )
        then 'D'
    end Z##OP, Z##RID
  FROM (
    select /*+ qb_name(old) */
    "PK", "VAL1", "VAL2",
    1 Z##OLD, 0 Z##NEW, rowid Z##RID
    from O O
    union all
    select /*+ qb_name(new) */
    "PK", "VAL1", "VAL2",
    0, 1, null
    from n N
  )
)
where Z##OP is not null
) N
on (
  O.Z##RID = n.Z##RID
)
when matched then update set
  "VAL1"=N."VAL1"
  delete where N.Z##OP = 'D'
when not matched then insert (
  --"PK", "VAL1", "VAL2"
  "VAL1", "VAL2"
) values(
  --N."PK", N."VAL1", N."VAL2"
  N."VAL1", N."VAL2"
);

2 rows merged.
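
Before committing, it is worth checking the result. One way, sketched here on the assumption that COMP_SYNC is installed in the current schema and that your client shows DBMS_OUTPUT, is to regenerate the compare SQL and count the rows it returns.

```sql
set serveroutput on

declare
  l_sql clob;
  l_cnt number;
begin
  -- same call as in the simple compare above
  l_sql := comp_sync.compare_sql('o', 'n', p_exclude_cols => 'pk');
  -- wrap the generated query in an inline view and count the differences
  execute immediate 'select count(*) from (' || l_sql || ')' into l_cnt;
  dbms_output.put_line('Remaining differences: ' || l_cnt);
end;
/
```

If the two tables are in sync, apart from the excluded column, the count should be 0.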

COMP_SYNC 1: a new table compare/sync package

I have been meaning to update my COMPARE_SYNC package for some time. I want to change the interface and the functionality a bit, so I am leaving the existing package alone and creating a new one called COMP_SYNC.

If you have used the old package, I would greatly appreciate any feedback on the new version: functionality, performance, bugs, etc. Comment away and thanks in advance.

What COMP_SYNC does for you

The package returns CLOBs containing SQL statements for you to adjust / test / execute. It uses CDC (Change Data Capture) format, with a flag (Z##OP) on each row with ‘I’ for insert, ‘U’ for update and ‘D’ for delete.

  • COMPARE_SQL: returns SQL that compares the new source and the old target using Tom Kyte’s GROUP BY method. Omitted columns are not compared and do not appear in the output.
    • ‘D’ rows are in “old” but not in “new”.
    • ‘I’ rows are in “new” but not in “old”.
    Since there may be duplicates, Z##CNT has the number of rows involved.
  • CDC_SQL: compares an “old” table (not a view) to “new”. You can exclude columns from the comparison, but the output shows entire rows with all columns, including the ROWID of the “old” row. For every ‘U’ row there is a corresponding ‘O’ (for “old”) row with the old values.
  • SYNC_SQL: compares and syncs from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns.
  • SYNC_UPSERT_SQL: inserts and updates but no deletes. Works only when there are both key and non-key columns.
  • SYNC_CDC_SQL: directly applies changes from a CDC table such as returned by CDC_SQL.
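
Since the functions return CLOBs, client settings matter when you display the generated SQL. In SQL*Plus, for example, the default LONG setting cuts CLOB output off after 80 characters, so something like the following helps. The table names are the placeholders used in the package comments and the sizes are arbitrary.

```sql
-- SQL*Plus settings so the whole generated statement is displayed
set long 100000
set longchunksize 100000
set pagesize 0

select comp_sync.cdc_sql('T_TARGET', 'T_SOURCE') from dual;
```

Other clients have their own way of showing CLOB values, so adapt as needed.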

Parameter changes

If you have already used COMPARE_SYNC, here is what changed:

  • Columns are now in comma-separated lists and not in little SYS.ODCIVARCHAR2LIST tables.
  • Table names and column names are converted to upper case unless you put them in double quotes.
  • P_EXCLUDE_COLS replaces P_ALL_COLS: if you want to exclude columns from the comparison just list them here, instead of having to list all the columns you want to include.
  • P_PREFIX replaces P_OPERATION_COL: I use a few column names in addition to the actual tables, so the prefix is now applied to all of them to avoid collisions with your names.

The code

[Update 2018-02-13: added source code files]

This site does not allow upload of source code, so I had to add a “.doc” suffix.

comp_sync-pks.doc : package specification, rename to comp_sync.pks

comp_sync-pkb.doc : package body, rename to comp_sync.pkb

create or replace package COMP_SYNC
authid current_user as
/*
COMP_SYNC generates SQL for comparing or synchronizing
"old" target and "new" source.
 
- "Old" can be a table or view, local or remote.
  Indicate separately the "old" owner, "old" table and "old" dblink.
  To compare two queries, create a view to use as the "old".
  To sync, "old" must be a table but I do not check that for you.
- "New" can be local, remote, table, view or a query enclosed in parentheses.
  Examples: 'SCOTT.EMP', 'T_SOURCE@DBLINK', '(select * from SCOTT.EMP@DBLINK)'
 
Note: I never check the "new" source for validity.
I only check the "old" target for validity when I look up columns from the data dictionary.
So the generated SQL is not guaranteed to run without error!
   
The generated SQL is returned as a CLOB.
 
To debug, change the value of G_DOLOG to true. See the beginning of the package body.
 
INPUT PARAMETERS:

-- Required
  
P_OLD_TABLE  : name of the target table or view. Must exist in the database.
 
P_NEW_SOURCE : source table or view - or query enclosed in parentheses.

-- Optional
 
P_OLD_OWNER  : owner of the target. Must exist in the database.
  The default is null, which assumes the current user.
 
P_EXCLUDE_COLS   : optional comma-separated list of columns to OMIT from the comparison.
  If you leave out P_EXCLUDE_COLS, every non-virtual column will be compared,
  both visible and invisible.
  If you omit a PK column, the tables are considered not to have a primary key.
 
P_KEY_COLS : optional comma-separated list of primary key columns.
  This overrides the default search for PK columns in ALL_CONS_COLUMNS.
   
P_OLD_DBLINK : dblink to the target database.
  The default is null, which means the target is in the local database.
   
P_PREFIX : prefix to the names of the columns such as the CDC flag
  ('D', 'I', 'U' or 'O' for the "old" rows being updated).
  When syncing, I delete the rows marked 'D' and ignore the rows marked 'O'.
  The default prefix is 'Z##'.
 
Pre 2018-02-01:
  See the COMPARE_SYNC package.
2018-02-01: Major overhaul
    - Parameters reordered to have most usual first
    - P_EXCLUDE_COLS (to exclude some columns) replaces P_ALL_COLS (that included columns).
    - P_OPERATION_COL is replaced by P_PREFIX that begins all column names I make up.
    - P_EXCLUDE_COLS and P_KEY_COLS are now comma-separated lists and not little tables.
    - User, table and column names are now upper cased unless within double quotes.
    - Instead of passing a huge record among internal procedures,
      I now use global variables. So sue me!
    - CDC output rows include the ROWID of the target table, which is used for efficient syncing.
*/
/*
COMPARING:
 
COMPARE_SQL returns SQL that compares new source and old target
using Tom Kyte's GROUP BY method.
Omitted columns are not compared and do not appear in the output.
'D' rows are in "old" but not in "new".
'I' rows are in "new" but not in "old".
Since there may be duplicates, Z##CNT has the number of rows involved.

Example:
  select COMP_SYNC.COMPARE_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
CDC_SQL produces CDC output: 'D', 'I', 'U' - or 'O' for the "old" rows being updated.
The output includes the ROWID of the target, except when 'I'.

Example:
  select COMP_SYNC.CDC_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
SYNCHRONIZING
 
The package can synchronize in one of three ways:
1) SYNC: Compare and sync from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns,
    but the target must be a table because I use the ROWID.
    
2) SYNC_UPSERT: sync from source to target: inserts and updates but no deletes.
    Requires a target with both primary key and non-key columns.
    It does not allow for omitting columns: the workaround is to use a view on the target.
    
3) SYNC_CDC: the source is a "Change Data Capture" table.
  It contains inserts, updates and deletes to be directly applied.
  Must contain a column ending with 'OP' containing the operation flag (I,U,D),
  and a column ending in 'RID' with the ROWID of the target row if U or D. 
*/
/*
Example:
  select COMP_SYNC.SYNC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE'
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;

/*
Example:
  select COMP_SYNC.SYNC_UPSERT_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => 'C1,C2'
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
 
/*
Example:
  select COMP_SYNC.SYNC_CDC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC',
    P_OLD_OWNER => user,
    P_KEY_COLS => 'C1,C2',
    P_PREFIX => 'OPCODE'
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
 
end COMP_SYNC;
/
create or replace package body COMP_SYNC as
 
  G_DOLOG constant BOOLEAN := false;
  C_NEWLINE constant varchar2(2) := '
';
  
  type TT_VARCHAR2 is table of VARCHAR2(255);
  
  -- set by CHECK_COMMON_INPUTS
  G_OLD_OWNER varchar2(255);
  G_OLD_TABLE varchar2(255);
  G_NEW_SOURCE varchar2(4000);
  G_OLD_DBLINK varchar2(255);
  G_OPERATION_COL varchar2(255);
  G_OLD_OWNER_TABLE varchar2(255);
  
  -- set by MAKE_REPLACEMENTS
  G_ALL_COLS TT_VARCHAR2;   -- all non-virtual columns
  G_SOME_COLS TT_VARCHAR2;  -- all non-virtual columns except those listed on P_EXCLUDE_COLS
  G_KEY_COLS TT_VARCHAR2;   -- from P_KEY_COLS, or by default the "old" primary key columns
  G_FIRST_COL TT_VARCHAR2; -- first column in G_SOME_COLS
  G_ALL_COL_CLOB clob;  
  G_SOME_COL_CLOB clob;
  G_INSERT_COL_CLOB clob;
  G_KEY_COL_CLOB clob;
  G_ON_COL_CLOB clob;
  G_SET_COL_CLOB clob;
  G_FIRST_COL_CLOB clob;
  G_DECODE_COL_CLOB clob;
 
  procedure LOGGER(P_TXT in clob, P_DOLOG in boolean default false) is
  begin
    if G_DOLOG or P_DOLOG then
      DBMS_OUTPUT.PUT_LINE('prompt > ' || P_TXT);
    end if;
  end LOGGER;
  
  /* sets all G_OLD_* parameters, G_NEW_SOURCE and G_OPERATION_COL.
     If P_OLD_OWNER is null, G_OLD_OWNER := user but G_OLD_OWNER_TABLE does not mention schema.
     OWNER, TABLE and OPERATION_COL are uppercased unless within double quotes.
     OWNER is checked for existence. OLD_TABLE is checked for existence later if necessary. */
  procedure CHECK_COMMON_INPUTS(
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_OLD_DBLINK in varchar2,
    P_NEW_SOURCE in varchar2
  ) is
    L_CNT number;
    L_SQL varchar2(255) :=
q'!select COUNT(*) from ALL_USERS#DBLINK# where USERNAME = trim('"' from '#OLD_OWNER#')!';
  begin
    LOGGER('CHECK_COMMON_INPUTS');
    
    if P_OLD_TABLE is null then 
      RAISE_APPLICATION_ERROR(
        -20001,
        'P_OLD_TABLE must not be null.'
      );
    end if;
    
    if P_OLD_DBLINK is null or SUBSTR(P_OLD_DBLINK,1,1) = '@' then
      G_OLD_DBLINK := upper(P_OLD_DBLINK);
    else
      G_OLD_DBLINK :=  '@' || upper(P_OLD_DBLINK);
    end if;
    
    if substr(P_OLD_OWNER,1,1) = '"' then
      G_OLD_OWNER := P_OLD_OWNER;
    else
      G_OLD_OWNER := upper(P_OLD_OWNER);
    end if;
    
    if substr(P_OLD_TABLE,1,1) = '"' then
      G_OLD_TABLE := P_OLD_TABLE;
    else
      G_OLD_TABLE := upper(P_OLD_TABLE);
    end if;
    
    if G_OLD_OWNER is null then
      G_OLD_OWNER_TABLE := G_OLD_TABLE || G_OLD_DBLINK;
      G_OLD_OWNER := user;
    else
      G_OLD_OWNER_TABLE := G_OLD_OWNER || '.' || G_OLD_TABLE || G_OLD_DBLINK;
    end if;
    
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    LOGGER(L_SQL);
    execute immediate L_SQL into L_CNT;
    if L_CNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20002,
        'OLD_OWNER = ' ||G_OLD_OWNER|| ': user not found in the database.'
      );
    end if;
    
    if P_NEW_SOURCE is null then
      RAISE_APPLICATION_ERROR(
        -20003,
        'P_NEW_SOURCE is null. Must be table, view or query within parentheses.'
      );
    else
      G_NEW_SOURCE := P_NEW_SOURCE;
    end if;
  
  end CHECK_COMMON_INPUTS;
  
  function COL_TOKENIZE(
    p_string in varchar2
  )
  return TT_VARCHAR2
  as
    c_delim constant varchar2(1) := ',';
    i_prev_pos pls_integer := 1;
    i_pos pls_integer;
    i_max_pos pls_integer := length(p_string) + 1;
    l_col varchar2(255);
    lt_out TT_VARCHAR2 := new TT_VARCHAR2();
    i_out pls_integer := 0;
  begin
    loop
      i_pos := instr(p_string, c_delim, i_prev_pos);
      if i_pos = 0 then
        i_pos := i_max_pos;
      end if;
      l_col := trim(substr(p_string, i_prev_pos, i_pos - i_prev_pos));
      if substr(l_col,1,1) != '"' then
        l_col := '"' || upper(l_col) || '"';
      end if;
      i_out := i_out + 1;
      lt_out.extend;
      lt_out(i_out) := l_col;
      exit when i_pos = i_max_pos;
      i_prev_pos := i_pos + 1;
    end loop;
    return lt_out;
  end COL_TOKENIZE;
 
  /*
  Format input array into CLOB with configurable maximum line length.
  Indentation is handled later using BIG_REPLACE.
  Pattern is simplified printf: each occurrence of '%s' is replaced by the array element.
  */
  function STRINGAGG(
    PT_COLS in TT_VARCHAR2,
    P_PATTERN in varchar2 default '%s',
    P_SEPARATOR in varchar2 default ',',
    P_LINEMAXLEN in number default 80
  ) return clob is
    L_CLOB clob;
    L_NEW varchar2(255);
    L_LINELEN number := 0;
  begin
    for I in 1..PT_COLS.COUNT LOOP
      L_NEW := case when I > 1 then ' ' end
        || replace(P_PATTERN, '%s', PT_COLS(I))
        || case when I < PT_COLS.COUNT then P_SEPARATOR end;
      if L_LINELEN + length(L_NEW) > P_LINEMAXLEN then
        L_CLOB := L_CLOB || C_NEWLINE;
        L_LINELEN := 0;
        L_NEW := SUBSTR(L_NEW,2);
      end if;
      L_CLOB := L_CLOB || L_NEW;
      L_LINELEN := L_LINELEN + length(L_NEW);
    end LOOP;
    return L_CLOB;
  end STRINGAGG;
  
  procedure BIG_REPLACE(
    p_clob in out nocopy clob,
    p_search in varchar2,
    p_replace in clob
  ) is
    c_replace_len constant integer := 30000;
    l_iter integer;
  begin
    if p_search is null then
      RAISE_APPLICATION_ERROR(
        -20004,
        'Internal error in BIG_REPLACE: p_search parameter is null.'
      );
    end if;
    if p_replace is null then
      logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB, true);
      logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB, true);
      logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB, true);
      logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB, true);
      logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB, true);
      logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB, true);
      logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB, true);
      logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB, true);
      RAISE_APPLICATION_ERROR(
        -20005,
        'Internal error in BIG_REPLACE: p_replace parameter is null.'
      );
    end if;
    l_iter := ceil(length(p_replace) / c_replace_len);
    --logger('length(p_replace) : '||length(p_replace));
    --logger('l_iter : '||l_iter);
    for i in 1..l_iter loop
      --logger('(i-1)*c_replace_len+1 : '||((i-1)*c_replace_len+1));
      p_clob := replace(
        p_clob, 
        p_search,
        substr(p_replace, (i-1)*c_replace_len+1, c_replace_len)
          || case when i < l_iter then p_search end
      );
    end loop;
  end BIG_REPLACE;

  function GET_ALL_COLS return TT_VARCHAR2 is
    l_version number;
    l_instance_sql varchar2(255) :=
q'!select to_number(regexp_substr(banner, 'Release ([^|.]+)', 1, 1, 'i', 1)) from v$version#DBLINK# where rownum = 1!';
    L_TAB_COLS SYS.ODCIVARCHAR2LIST;
    L_ALL_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_SQL varchar2(255) :=
q'!select '"'||COLUMN_NAME||'"' from ALL_TAB_COLS#DBLINK# where (OWNER, TABLE_NAME, VIRTUAL_COLUMN) = ((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'NO')) and #VERSION_DEPENDENT# order by SEGMENT_COLUMN_ID!';
  begin
    LOGGER('GET_ALL_COLS');
    l_instance_sql := replace(l_instance_sql, '#DBLINK#', G_OLD_DBLINK);
    LOGGER(l_instance_sql);
    execute immediate l_instance_sql into l_version;
    logger('l_version = ' || l_version);
    if l_version >= 12 then
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'USER_GENERATED = ''YES''');
    else
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'HIDDEN_COLUMN = ''NO''');
    end if;
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_TAB_COLS;
    if L_TAB_COLS.COUNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20006,
        G_OLD_OWNER_TABLE || ': table not found.'
      );
    end if;
    L_ALL_COLS.extend(L_TAB_COLS.count);
    for i in 1..L_TAB_COLS.count loop
      L_ALL_COLS(i) := L_TAB_COLS(i);
    end loop;
    return L_ALL_COLS;
  end GET_ALL_COLS;
 
  function GET_KEY_COLS return TT_VARCHAR2 is
    L_KEY_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_KEY_COL_LIST SYS.ODCIVARCHAR2LIST;
    L_SQL varchar2(4000) := 
q'!select '"'||COLUMN_NAME||'"'
from ALL_CONS_COLUMNS#DBLINK#
where (OWNER, CONSTRAINT_NAME) = (
  select OWNER, CONSTRAINT_NAME from ALL_CONSTRAINTS#DBLINK#
  where (OWNER, TABLE_NAME, CONSTRAINT_TYPE) =
        ((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'P'))
)!';
  begin
    LOGGER('GET_KEY_COLS');
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_KEY_COL_LIST;
    L_KEY_COLS.extend(L_KEY_COL_LIST.count);
    for i in 1..L_KEY_COL_LIST.count loop
      L_KEY_COLS(i) := L_KEY_COL_LIST(i);
    end loop;
    return L_KEY_COLS;
  end GET_KEY_COLS;
 
  procedure MAKE_REPLACEMENTS(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2,
    P_EXCLUDE_COLS in varchar2,
    P_KEY_COLS in varchar2,
    P_OLD_DBLINK in varchar2
  ) is
    L_NON_KEY_COLS TT_VARCHAR2;
    L_EXCLUDE_COLS TT_VARCHAR2;
  begin
    LOGGER('MAKE_REPLACEMENTS');
    check_common_inputs(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_OLD_DBLINK,
      P_NEW_SOURCE
    );
    G_ALL_COLS := GET_ALL_COLS;
    if P_EXCLUDE_COLS is null then
      G_SOME_COLS := G_ALL_COLS;
    else
      L_EXCLUDE_COLS := COL_TOKENIZE(P_EXCLUDE_COLS);
      G_SOME_COLS := G_ALL_COLS multiset except L_EXCLUDE_COLS;
    end if;
    G_FIRST_COL := new TT_VARCHAR2(G_SOME_COLS(1));
    G_ALL_COL_CLOB := STRINGAGG(G_ALL_COLS);
    G_SOME_COL_CLOB := STRINGAGG(G_SOME_COLS);
    G_INSERT_COL_CLOB := STRINGAGG(G_ALL_COLS, 'N.%s');
    G_FIRST_COL_CLOB := STRINGAGG(G_FIRST_COL, '%s=N.%s');
    
    if P_KEY_COLS is null then
      G_KEY_COLS := GET_KEY_COLS;
    else
      G_KEY_COLS := COL_TOKENIZE(P_KEY_COLS);
    end if;
    
    if cardinality(G_KEY_COLS multiset intersect L_EXCLUDE_COLS) > 0 then
      G_KEY_COLS := null;
    end if;
    
    G_KEY_COL_CLOB := null;
    G_ON_COL_CLOB := null;
    G_SET_COL_CLOB := null;
    G_DECODE_COL_CLOB := null;
    if G_KEY_COLS is not null and G_KEY_COLS.COUNT > 0 then
      G_KEY_COL_CLOB := STRINGAGG(G_KEY_COLS);
      G_ON_COL_CLOB := STRINGAGG(G_KEY_COLS, 'O.%s=N.%s', ' and');
      L_NON_KEY_COLS := G_SOME_COLS multiset except G_KEY_COLS;
      if L_NON_KEY_COLS.COUNT between 1 and G_SOME_COLS.COUNT - 1 then
        G_SET_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, '%s=N.%s');
        G_DECODE_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, 'decode(O.%s,N.%s,0,1)');
      end if;
    end if;
    
    logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB);
    logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB);
    logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB);
    logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB);
    logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB);
    logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB);
    logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB);
    logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB);

  end MAKE_REPLACEMENTS;

  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'select /*+ qb_name(COMPARE) */
  #SOME_COLS#,
  decode(sign(sum(#PREFIX#NEW_CNT)), 1, ''I'', ''D'') #PREFIX#OP,
  abs(sum(#PREFIX#NEW_CNT)) #PREFIX#CNT
FROM (
  select /*+ qb_name(old) */
  #SOME_COLS#
    , -1 #PREFIX#NEW_CNT
  from #OLD# O
  union all
  select /*+ qb_name(new) */
  #SOME_COLS#
    , 1 #PREFIX#NEW_CNT
  from #NEW# N
)
group by
  #SOME_COLS#
having sum(#PREFIX#NEW_CNT) != 0
order by 1, #PREFIX#OP';
  begin
    LOGGER('COMPARE_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      null,
      P_OLD_DBLINK
    );
    L_CLOB := replace(
      C_CLOB,
      '#SOME_COLS#',
      replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end COMPARE_SQL;

  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
order by 1, #PREFIX#OP';
    C_PARTITION_CLOB constant clob :=
'select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null';
  begin
    LOGGER('CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end CDC_SQL; 
  
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'merge /*+ qb_name(SYNC_GROUP) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select * from (
select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
)
where #PREFIX#OP in(''I'',''U'',''D'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  where N.#PREFIX#OP in (''U'', ''D'')
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
    C_PARTITION_CLOB constant clob :=
'merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #FIRST_COL#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
  begin
    LOGGER('SYNC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or G_SET_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      L_CLOB := replace(L_CLOB, '#FIRST_COL#', G_FIRST_COL_CLOB);
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      L_CLOB := replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    return L_CLOB;
  end SYNC_SQL;
 
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_UPSERT) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#
  from #OLD#
) O
using (
  select /*+ qb_name(source) */
  #ALL_COLS#
  from #NEW#
) N
on (
  #ON_COLS#
)
when matched then update set
  #SET_COLS#
  where 1 in (
    #DECODE_COLS#
  )
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';

  begin
    LOGGER('SYNC_UPSERT_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_SET_COL_CLOB is null then
      RAISE_APPLICATION_ERROR(
        -20007,
        'SYNC_UPSERT_SQL requires a target with both primary key and non-key columns'
      );
    end if;
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#ON_COLS#',
      replace(G_ON_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#SET_COLS#',
      replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#DECODE_COLS#',
      replace(G_DECODE_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
    );
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end SYNC_UPSERT_SQL;
 
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_CDC_SQL) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
  select /*+ qb_name(source) */ #PREFIX#OP, #PREFIX#RID,
  #ALL_COLS#
  from #NEW#
  where #PREFIX#OP in(''D'', ''I'', ''U'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
 
  begin
    LOGGER('SYNC_CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    if G_SET_COL_CLOB is not null then
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
    else
      L_CLOB := replace(L_CLOB, '#SET_COLS#', G_FIRST_COL_CLOB);
    end if;
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    return L_CLOB;
  end SYNC_CDC_SQL;
   
end COMP_SYNC;
/

10 Cool things about the COMPARE_SYNC package

@thatjeffsmith recently recommended an article about making your blog more popular. The article said “lists of 10 things” were great ways to get more readers. Hey, if that’s all it takes…

COMPARE_SYNC is a package that generates SQL to compare data or synchronize tables. Here are 10 good reasons to use it.

1. Efficiently compare tables, views or query results

The COMPARE SQL uses Tom Kyte’s GROUP BY method. When comparing two tables, there is only one full scan of each table. The “new” source can be a table, view or query in parentheses. The “old” source must be a table or a view.
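
As a minimal sketch of the GROUP BY method (not the exact SQL the package generates), the query below tags each row with an "old" or "new" counter, groups on all the compared columns, and keeps only the groups whose counts differ. OLD_DATA, NEW_DATA and the columns C1 and C2 are made-up names used only for illustration.

```sql
-- OLD_DATA and NEW_DATA are hypothetical stand-ins for the "old" and "new" sources.
with OLD_DATA as (
  select 1 C1, 'A' C2 from DUAL union all
  select 2, 'B' from DUAL
),
NEW_DATA as (
  select 2 C1, 'B' C2 from DUAL union all
  select 3, 'C' from DUAL
)
select C1, C2,
  sum(OLD_CNT) OLD_CNT,
  sum(NEW_CNT) NEW_CNT
from (
  -- each source is read once and tagged with its own counter
  select C1, C2, 1 OLD_CNT, 0 NEW_CNT from OLD_DATA
  union all
  select C1, C2, 0 OLD_CNT, 1 NEW_CNT from NEW_DATA
)
group by C1, C2
-- rows present equally often on both sides cancel out
having sum(OLD_CNT) != sum(NEW_CNT)
order by C1, C2;
```

Row (2, 'B') exists on both sides, so it drops out. Row (1, 'A') comes back with OLD_CNT = 1 and row (3, 'C') with NEW_CNT = 1.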

2. Fully synchronize an “old” table with a “new” source

The SYNC SQL compares “old” and “new”, then applies the differences to the “old” table using MERGE. The end result is that the “old” table is identical to the “new” source.

3. UPSERT synchronize the “old” table from a “new” source

If you want to apply changes to the “old” table, but without doing any DELETEs, then MERGE can compare and change with only one full scan of each table!

4. Apply Change Data Capture (CDC) input to the “old” table

All you need is a column in the input that contains ‘D’ when the row is to be deleted. One MERGE statement will do the rest.
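
To make the idea concrete, here is a small sketch of a MERGE that applies CDC input. It is an illustration under assumptions, not the statement the package generates: the tables CDC_TARGET and CDC_INPUT, their columns, and the flag column OPERATION are hypothetical.

```sql
-- Hypothetical tables, for illustration only.
create table CDC_TARGET (
  PK number primary key,
  VAL number
);
create table CDC_INPUT (
  PK number primary key,
  VAL number,
  OPERATION varchar2(1) not null  -- 'D' means the row is to be deleted
);

insert into CDC_TARGET (PK, VAL) values (1, 10);
insert into CDC_TARGET (PK, VAL) values (2, 20);

insert into CDC_INPUT (PK, VAL, OPERATION) values (1, 11, 'U');   -- change an existing row
insert into CDC_INPUT (PK, VAL, OPERATION) values (2, null, 'D'); -- delete an existing row
insert into CDC_INPUT (PK, VAL, OPERATION) values (3, 30, 'I');   -- add a new row

merge into CDC_TARGET O
using CDC_INPUT N
on (O.PK = N.PK)
when matched then update set
  VAL = N.VAL
  -- matched rows flagged 'D' are removed right after the update
  delete where N.OPERATION = 'D'
when not matched then insert (PK, VAL)
  values (N.PK, N.VAL)
  -- never insert a row that is flagged for deletion
  where N.OPERATION != 'D';
```

After the MERGE, CDC_TARGET should contain PK 1 with VAL 11 and PK 3 with VAL 30. Only the 'D' flag is actually tested: the join on the key already tells MERGE whether a row is an insert or an update.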

5. Synchronize data with or without primary keys

With primary keys, the MERGE statement combines updates, deletes and inserts to do the job. Without primary keys, I generate a MERGE statement that uses only deletes and inserts. (The UPSERT and CDC methods require primary keys.)

6. Make no unnecessary changes

All of the above methods change rows only when they need to be changed! If the data is already in sync, you will see the message “0 rows merged”.
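
The package source builds its "has this row changed?" test from decode(O.col, N.col, 0, 1) expressions wrapped in "where 1 in (...)". A plausible reason for using DECODE instead of != is that DECODE treats two NULLs as equal, whereas a != comparison is never true when either side is NULL. The query below is a small sketch of that behavior; the column aliases are made up for illustration.

```sql
select
  decode(cast(null as number), cast(null as number), 0, 1) BOTH_NULL,  -- 0: NULLs count as equal
  decode(1, cast(null as number), 0, 1) ONE_NULL,                      -- 1: a difference is detected
  decode(1, 1, 0, 1) SAME_VALUE,                                       -- 0: no change needed
  decode(1, 2, 0, 1) DIFFERENT_VALUE                                   -- 1: a difference is detected
from DUAL;
```

With one such expression per non-key column, "where 1 in (...)" is true as soon as any single column differs, so unchanged rows are not updated.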

7. Correctly handle remote data, virtual columns and invisible columns

Yes, the “old” table can be remote: you can “push” the changes to the remote database. The generated SQL automatically excludes virtual columns and includes invisible columns, so all real data is synchronized.

8. Customize the SQL to be generated

  • You can explicitly list the columns to be compared or synchronized.
  • You can explicitly list the “primary key” columns, if the “old” target does not have a primary key but there is a unique, non-null column or column list you can use.

9. Avoid SQL injection

The package runs with the privileges of the current user, so there is no risk of “privilege escalation”. Besides, the package doesn’t do anything to your data! It just generates SQL for you to review and execute if you choose.

10. Tested over 27000 times!

I generated code to call the package with every possible combination of input parameters, then I called the package, and then I ran the SQL that was generated. In case you’re wondering, yes I did find (and correct) some bugs.

I hope this package proves useful to some of you out there.

Way Too Invisible Columns

Oracle Database 12c introduced “invisible columns”: they are only visible when you name them explicitly in the SELECT list. Unfortunately, they seem to be even more invisible when you access them through a database link! Here are some surprising results from SELECT and MERGE statements.

Test data

I made this setup as concise as possible, so it is not realistic.

  • I will select from table T, and I will merge into T using a source table S.
  • ORCL@LOOPBACK is a database link to the same database.
  • V_LOCAL is a view on T. It explicitly names the invisible column I1, so I1 should be “visible” when accessing the view.
  • V_REMOTE is a view on T, but through the database link.

create table T(
  K1 number primary key,
  I1 number INVISIBLE not null
);

insert into T (K1,I1)
select 1, 1 from DUAL;

create table S(
  K1 number primary key,
  I1 number INVISIBLE not null
);

insert into S (K1,I1)
select 1, 1 from DUAL;

create database link ORCL@LOOPBACK
connect to STEW identified by STEW using
  '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=
127.0.0.1)(PORT=1521)))(CONNECT_DATA=(service_name=orcl.localdomain)(SERVER=DEDICATED)))';

create or replace view V_LOCAL as
select K1, I1 from T;

create or replace view V_REMOTE as
select K1, I1 from T@ORCL@LOOPBACK;

Testing SELECT

> select * from T
/
        K1
----------
         1 

> select K1, I1 from T
/
        K1         I1
---------- ----------
         1          1

This just shows how the invisible column works: you don’t see it when you say SELECT *.
Now let’s try the views. Since I named the column in each view, I assume it will be visible at all times.

> select * from V_LOCAL
/
        K1         I1
---------- ----------
         1          1 

> select * from V_REMOTE
/
        K1         I1
---------- ----------
         1          1

OK, that worked as expected. Now let’s try to access the “local” view through the database link.

> select * from V_LOCAL@ORCL@LOOPBACK
/
        K1
----------
         1

Oops! The very same SELECT on the very same view gives different results when accessed remotely. The invisible column has gone into hiding again.
Note that this is on version 12.1.0.2.
[Update 2018-02-11: I get the same result in version 12.2.0.1]

Testing MERGE with a WHERE clause

To be concise, I am using MERGE to update the I1 column only. Since I don’t want to do any unnecessary work, I make sure the I1 column is different before doing the update. That is why I add the clause where O.I1 != N.I1.

That WHERE clause is the problem. Oracle refuses to “see” the O.I1 column: it keeps saying it is an “invalid identifier”.

Local table: exception

> merge into T O
using (select K1, I1 from S) N
on (O.K1 = N.K1)
when matched then update set I1=N.I1
  where O.I1 != N.I1
/
SQL Error: ORA-00904: "O"."I1": invalid identifier

Local table with inline view: OK

I already figured out how to work around this problem: use an “inline view” in the INTO clause.

> merge into ( select K1, I1 from T ) O
using (select K1, I1 from S) N
on (O.K1 = N.K1)
when matched then update set I1=N.I1
  where O.I1 != N.I1
/
0 rows merged.

Remote table with inline view: exception

I’m going to use the same “inline view” technique here, but with a remote table.

> merge into (
  select K1, I1 from T@ORCL@LOOPBACK
) O
using (select K1, I1 from S) N
on (O.K1 = N.K1)
when matched then update set I1=N.I1
  where O.I1 != N.I1
/
SQL Error: ORA-00904: "A1"."I1": invalid identifier
ORA-02063: preceding line from ORCL@LOOPBACK

Look closely at the error message: the “O” alias has disappeared, to be replaced by “A1”. Apparently, the remote database is using an “A1” alias internally, but it does not realize that the column should be visible.

I tried several different ways to work around this problem. To be brief, I’ll just show the combination that finally worked.

Inline view of remote access to local view: OK

Believe it or not, the only combination that worked was to create a view on the table, then access that view while also explicitly naming the columns.

> merge into (
  select K1, I1 from V_LOCAL@ORCL@LOOPBACK
) O
using (select K1, I1 from S) N
on (O.K1 = N.K1)
when matched then update set I1=N.I1
  where O.I1 != N.I1
/
0 rows merged.

Conclusion

Watch out for invisible columns on remote tables! They are even more invisible remotely than they are locally. Views that are accessed remotely don’t act the same as when they are accessed locally.

Using MERGE on remote invisible columns is especially challenging.

All these tests use the same Oracle version on the local and remote databases. Who knows what will happen when a pre-12c database tries to access an invisible column in a remote 12c database?
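
Before querying a table through a database link, it can help to know which of its columns are invisible. The query below is a sketch that relies on the 12c data dictionary, where an invisible column is reported as hidden but user-generated; run it in the schema that owns the tables.

```sql
-- Lists invisible columns in the current schema (Oracle 12c and later).
-- System-generated hidden columns have USER_GENERATED = 'NO' and are filtered out.
select TABLE_NAME, COLUMN_NAME
from USER_TAB_COLS
where HIDDEN_COLUMN = 'YES'
  and USER_GENERATED = 'YES'
order by TABLE_NAME, INTERNAL_COLUMN_ID;
```

With the test data above, T and S should each be listed with the column I1.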

MERGE and invisible columns = invisible documentation?

Oracle 12c introduced “invisible columns” to help us add columns to tables without breaking existing applications. The documentation explains how they work with SELECT and INSERT, but not MERGE. Here’s what happened when I tried MERGE.

Before: visible columns, existing SQL

Here is some simple test data with two tables. The source table will be used in the MERGE statement.

create table T_TARGET as
select 1 PK, 0 VALUE_VISIBLE from DUAL;

create table T_SOURCE as
select 1 PK, 1 VALUE_VISIBLE from DUAL
union all
select 2 PK, 2 VALUE_VISIBLE from DUAL;

Now some simple SQL just to show how things work without invisible columns. These are examples of how not to code:

  • select * is a bad practice: we should list explicitly the columns we want back.
  • insert into should also have an explicit list of columns within parentheses, just before the values() clause.
  • the insert part of the MERGE statement should also have an explicit list of columns.

If we all followed the good practice of listing columns explicitly, there would be no need for invisible columns!

> select * from T_TARGET

        PK VALUE_VISIBLE
---------- -------------
         1             0 

> insert into T_TARGET values(2,0)
1 rows inserted.

> select * from T_TARGET

        PK VALUE_VISIBLE
---------- -------------
         1             0
         2             0 

> rollback
rollback complete.

> merge into T_TARGET O
using T_SOURCE N
on (O.PK = N.PK)
when matched then update
  set VALUE_VISIBLE = N.VALUE_VISIBLE
when not matched then insert values(N.PK, N.VALUE_VISIBLE)

2 rows merged.

> select * from T_TARGET

        PK VALUE_VISIBLE
---------- -------------
         1             1
         2             2 

> rollback
rollback complete.

After: invisible column, existing SQL

When I add an invisible column, then run the same three statements, I get exactly the same results, even for the MERGE.

> alter table T_TARGET add VALUE_INVISIBLE number INVISIBLE
table T_TARGET altered.

> alter table T_SOURCE add VALUE_INVISIBLE number INVISIBLE
table T_SOURCE altered.

> update T_SOURCE set VALUE_INVISIBLE = VALUE_VISIBLE
2 rows updated.

> commit
committed.

> select * from T_TARGET

        PK VALUE_VISIBLE
---------- -------------
         1             0 

> insert into T_TARGET values(2,0)
1 rows inserted.

> select * from T_TARGET

        PK VALUE_VISIBLE
---------- -------------
         1             0
         2             0 

> rollback
rollback complete.

> merge into T_TARGET O
using T_SOURCE N
on (O.PK = N.PK)
when matched then update
  set VALUE_VISIBLE = N.VALUE_VISIBLE
when not matched then insert values(N.PK, N.VALUE_VISIBLE)

2 rows merged.

> select * from T_TARGET

        PK VALUE_VISIBLE
---------- -------------
         1             1
         2             2 

> rollback
rollback complete.

After: accessing the invisible column

The “old” SQL statements worked exactly as before. The only thing to watch out for is that the new column will be null in any newly inserted rows.

Now let’s change the SQL to work with the new invisible column. As far as SELECT and INSERT are concerned, the documentation says to just list all the columns.

> select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_TARGET

        PK VALUE_VISIBLE VALUE_INVISIBLE
---------- ------------- ---------------
         1             0                 

> insert into T_TARGET (PK, VALUE_VISIBLE, VALUE_INVISIBLE)
  values(2,0,0)
1 rows inserted.

> select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_TARGET
        PK VALUE_VISIBLE VALUE_INVISIBLE
---------- ------------- ---------------
         1             0
         2             0               0 

> rollback
rollback complete.

All right, now how do I make the MERGE work? I suppose I just have to list the columns in the insert part:

> merge into T_TARGET O
using T_SOURCE N
on (O.PK = N.PK)
when matched then update set
  VALUE_VISIBLE = N.VALUE_VISIBLE,
  VALUE_INVISIBLE = N.VALUE_INVISIBLE
when not matched then insert
  (PK, VALUE_VISIBLE, VALUE_INVISIBLE)
  values(N.PK, N.VALUE_VISIBLE, N.VALUE_INVISIBLE)

...
Error at Command Line : 9 Column : 33
Error report -
SQL Error: ORA-00904: "N"."VALUE_INVISIBLE": invalid identifier

Oops! It took me a while to realize that “using T_SOURCE” was the same as “using (select * from T_SOURCE)”! I can’t just refer to the table anymore: I have to name the columns in a subquery.

> merge into T_TARGET O
using (
  select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_SOURCE
) N
on (O.PK = N.PK)
when matched then update set
  VALUE_VISIBLE = N.VALUE_VISIBLE,
  VALUE_INVISIBLE = N.VALUE_INVISIBLE
when not matched then insert
  (PK, VALUE_VISIBLE, VALUE_INVISIBLE)
  values(N.PK, N.VALUE_VISIBLE, N.VALUE_INVISIBLE)

2 rows merged.

> rollback
rollback complete.

Victory! But my work is not done: I don’t like this MERGE statement because I may update rows that don’t need updating. To make sure this doesn’t happen, I’m going to add a where clause.

> merge into T_TARGET O
using (
  select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_SOURCE
) N
on (O.PK = N.PK)
when matched then update set
  VALUE_VISIBLE = N.VALUE_VISIBLE,
  VALUE_INVISIBLE = N.VALUE_INVISIBLE
  where 1 in (
    DECODE(O.VALUE_VISIBLE,  N.VALUE_VISIBLE,  0,1),
    DECODE(O.VALUE_INVISIBLE,N.VALUE_INVISIBLE,0,1)
  )
when not matched then insert
  (PK, VALUE_VISIBLE, VALUE_INVISIBLE)
  values(N.PK, N.VALUE_VISIBLE, N.VALUE_INVISIBLE)
...
Error at Command Line : 11 Column : 12
Error report -
SQL Error: ORA-00904: "O"."VALUE_INVISIBLE": invalid identifier

This is one of the stranger results I have seen from a SQL statement. I was able to update the VALUE_INVISIBLE column before, but I can’t refer to it here? I finally tried the same trick as for the using clause:

> merge into (
  select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_TARGET
) O
using (
  select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_SOURCE
) N
on (O.PK = N.PK)
when matched then update set
  VALUE_VISIBLE = N.VALUE_VISIBLE,
  VALUE_INVISIBLE = N.VALUE_INVISIBLE
  where 1 in (
    DECODE(O.VALUE_VISIBLE,  N.VALUE_VISIBLE,  0,1),
    DECODE(O.VALUE_INVISIBLE,N.VALUE_INVISIBLE,0,1)
  )
when not matched then insert
  (PK, VALUE_VISIBLE, VALUE_INVISIBLE)
  values(N.PK, N.VALUE_VISIBLE, N.VALUE_INVISIBLE)

2 rows merged.

Conclusion

The MERGE starts with the equivalent of a SELECT...FROM <target> RIGHT OUTER JOIN <source>. Any columns we read later on must be included in that SELECT. We read the source columns all over the place, and we may also read the target columns in WHERE clauses. If we need to read invisible columns, they must be explicitly named by using a view or a subquery.
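
To picture that starting point, here is the join written out as an ordinary query against the test tables from this post. It is a conceptual illustration only, not what Oracle executes internally, and the O_ and N_ column aliases are made up for readability.

```sql
-- Conceptual equivalent of the first step of the MERGE.
-- Both subqueries name the invisible column explicitly, so it can be read later.
select
  O.PK O_PK, O.VALUE_VISIBLE O_VALUE_VISIBLE, O.VALUE_INVISIBLE O_VALUE_INVISIBLE,
  N.PK N_PK, N.VALUE_VISIBLE N_VALUE_VISIBLE, N.VALUE_INVISIBLE N_VALUE_INVISIBLE
from (
  select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_TARGET
) O
right outer join (
  select PK, VALUE_VISIBLE, VALUE_INVISIBLE from T_SOURCE
) N
on (O.PK = N.PK);
```

Rows where O_PK is null correspond to the WHEN NOT MATCHED branch; all other rows go to WHEN MATCHED, where the O_ columns are available to the WHERE clause.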

[Update 2015-01-26: After a discussion on OTN, the use of a subquery may be OK.]

Notice I use a subquery in the INTO clause. The documentation on MERGE talks specifically about tables and views, but an “inline view” should be OK. I have opened an SR with Oracle Support to make sure. See my question on OTN for details.

COMPARE_SYNC: Introducing the package

After many blog posts about comparing and synchronizing tables, I have united all the techniques I presented in one place. The COMPARE_SYNC package generates SQL for

  • Comparing tables, views and queries, both local and remote.
  • Synchronizing, or applying changes to target tables from either source tables or “Change Data Capture” input.

This is a “helper” tool for developers. It does not change the data, it just returns a SQL statement that you can analyze, test and deploy as you wish.
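
One possible workflow is sketched below: generate the statement, print it for review, and run it only once you are satisfied. The call has the same shape as the SYNC_UPSERT_SQL example in the package specification; the table names T_TARGET and T_SOURCE and the key column PK are assumptions, so substitute your own.

```sql
-- In SQL*Plus or SQL Developer, run "set serveroutput on" first to see the output.
declare
  L_SQL clob;
begin
  -- T_TARGET, T_SOURCE and the key column PK are assumed to exist.
  L_SQL := COMPARE_SYNC.SYNC_UPSERT_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('PK')
  );
  -- Review the statement first (PUT_LINE accepts at most 32767 bytes per line).
  DBMS_OUTPUT.PUT_LINE(L_SQL);
  -- Uncomment only after reviewing and testing the generated SQL:
  -- execute immediate L_SQL;
  -- commit;
end;
/
```

Keeping the EXECUTE IMMEDIATE commented out preserves the intent of the package: it hands you SQL, and you decide whether and when to run it.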

For “help”, look at the comments in the package specification.

I’ll be blogging about some use cases soon. In the meantime, check it out…and please give me feedback in the comments.

Alas, WordPress won’t let me upload .sql files, so I’m afraid you’ll have to do some copying and pasting: sorry.

Hope this helps…

[Update 2015-01-25: the extra “--'” at the end of “default 'OPERATION'” is just a workaround for the SQL syntax highlighter.]

[Update 2015-01-30: P_OLD_OWNER now has a default value of null, which means assume the target belongs to the current user but don’t put the owner in the generated SQL. Added the P_OLD_DBLINK parameter. Bug fixes.]

[Update 2015-03-03: Changed name to COMPARE_SYNC. Column lists are CLOBs and are formatted in lines of 80 characters max. Bug fix to allow querying ALL_TAB_COLS in versions 10 and 11.]

[Update 2015-03-06: To get DB version, using V$VERSION (accessible to PUBLIC) instead of V$INSTANCE. Now accessing ALL_CONSTRAINTS from remote database when appropriate.]

create or replace package COMPARE_SYNC
authid current_user as
/*
COMPARE_SYNC generates SQL for comparing or synchronizing
"old" target and "new" source.

- "Old" can be a table or view, local or remote.
  Indicate separately the "old" owner, "old" table and "old" dblink.
  To compare two queries, create a view to use as the "old".
  To sync, "old" is usually a table but I do not check that for you.
- "New" can be local, remote, table, view or a query enclosed in parentheses.
  Examples: 'SCOTT.EMP', 'T_SOURCE@DBLINK', '(select * from SCOTT.EMP@DBLINK)'

Note: I never check the "new" source for validity.
I only check the "old" target for validity
when I look up columns from the data dictionary.
So the generated SQL is not guaranteed to run without error!
  
The generated SQL is returned as a CLOB.

To debug, change the value of G_DOLOG to true. See the G_DOLOG constant near the top of the package body.

COMMON INPUT PARAMETERS:

P_OLD_OWNER  : owner of the target. Must exist in the database.
  The default is null, which assumes the current user.
  
P_OLD_TABLE  : name of the target table or view. Must exist in the database.

P_NEW_SOURCE : source table or view - or query enclosed in parentheses.

P_TAB_COLS   : optional sys.odcivarchar2list() array of columns to compare/sync.
  If you leave out P_TAB_COLS, every non-virtual column will be compared/synced,
  both visible and invisible.
  
P_OLD_DBLINK : dblink to the target database.
  The default is null, which means the target is in the local database.

2015-01-30:
  bug fixes. Added P_OLD_DBLINK parameter. P_OLD_OWNER now has default value.
2015-02-28:
  Changed name of package to COMPARE_SYNC
  Column lists are now reformatted so line length is 80 maximum.
  Column lists are now CLOB instead of VARCHAR2, so no limits on number of columns.
  Fixed bug accessing ALL_TAB_COLS.USER_GENERATED, which was only added in 12.1.
    I now use different code for previous versions.
2015-03-06:
  To get DB version, using V$VERSION (accessible to PUBLIC) instead of V$INSTANCE.
  Now accessing ALL_CONSTRAINTS from remote database when appropriate.
*/
/*
COMPARING

COMPARE_SQL returns SQL that compares new source and old target
using Tom Kyte's GROUP BY method.
*/
/*
Example:
  select COMPARE_SYNC.COMPARE_SQL(user, 'T_TARGET', 'T_SOURCE') from DUAL;
*/
  function COMPARE_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

/*
Example:
  select COMPARE_SYNC.COMPARE_SQL(
    null, 
    'T_TARGET', 
    'T_SOURCE',
    SYS.ODCIVARCHAR2LIST('C1','C2','C3', '"c4"')
  ) from DUAL;
*/
  function COMPARE_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
  
/*
SYNCHRONIZING

The package can synchronize in one of three ways:
1) SYNC: Compare and sync from source to target: inserts, updates and deletes.
2) SYNC_UPSERT: sync from source to target: inserts and updates but no deletes.
3) SYNC_CDC: the source is a "Change Data Capture" table.
  It contains inserts, updates and deletes to be directly applied.

SYNC_UPSERT and SYNC_CDC require a target
  with both primary key and non-key columns.
SYNC works with any combination of key and non-key columns,
but the target must be a table when I use the ROWID.

Additional input parameters are:

P_KEY_COLS : optional array of primary key columns as sys.odcivarchar2list().
  This overrides the default search for PK columns in ALL_CONS_COLUMNS.
  You can specify P_KEY_COLS without specifying P_TAB_COLS,
  but not the reverse.
  
P_OPERATION_COL : name of the column containing the CDC flag ('D', 'I', 'U').
  The default is 'OPERATION'.
  I delete the rows where the value is 'D'. I ignore any other value
  because I can tell whether to insert or update without it.
*/
/*
Example:
  select COMPARE_SYNC.SYNC_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE'
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
  
/*
Example:
  select COMPARE_SYNC.SYNC_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('C1','C2')
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
  
/*
Example:
  select COMPARE_SYNC.SYNC_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_TAB_COLS => SYS.ODCIVARCHAR2LIST('C1','C2','C3', '"c4"'),
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('C1','C2')
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
  
/*
Example:
  select COMPARE_SYNC.SYNC_UPSERT_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE'
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

/*
Example:
  select COMPARE_SYNC.SYNC_UPSERT_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('C1','C2')
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

/*
Example:
  select COMPARE_SYNC.SYNC_UPSERT_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_TAB_COLS => SYS.ODCIVARCHAR2LIST('C1','C2','C3', '"c4"'),
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('C1','C2')
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

/*
Example:
  select COMPARE_SYNC.SYNC_CDC_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC'
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OPERATION_COL in varchar2 default 'OPERATION',  --'
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

/*
Example:
  select COMPARE_SYNC.SYNC_CDC_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC',
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('C1','C2')
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OPERATION_COL in varchar2 default 'OPERATION',  --'
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

/*
Example:
  select COMPARE_SYNC.SYNC_CDC_SQL(
    P_OLD_OWNER => user,
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC',
    P_TAB_COLS => SYS.ODCIVARCHAR2LIST('C1','C2','C3', '"c4"'),
    P_KEY_COLS => SYS.ODCIVARCHAR2LIST('C1','C2'),
    P_OPERATION_COL => 'OPCODE'
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OPERATION_COL in varchar2 default 'OPERATION',  --'
    P_OLD_DBLINK in varchar2 default null
  ) return clob;

end COMPARE_SYNC;
/

 

create or replace package body COMPARE_SYNC as

  G_DOLOG CONSTANT BOOLEAN := false;

  type T_REPL is RECORD(
    OLD_OWNER_TABLE varchar2(255),
    NEW_SOURCE varchar2(4000),
    FIRST_COL USER_TABLES.TABLE_NAME%type,
    ALL_COLS2 clob,
    ALL_COLS4 clob,
    ALL_COLS6 clob,
    INSERT_COLS2 clob,
    PK_COLS6 clob,
    ON_COLS2 clob,
    SET_COLS2 clob,
    DECODE_COLS2 clob,
    OPERATION_COL USER_TABLES.TABLE_NAME%type
  );

  procedure LOGGER(P_TXT in clob) is
  begin
    if G_DOLOG then
      DBMS_OUTPUT.PUT_LINE('prompt > ' || P_TXT);
    end if;
  end LOGGER;

  /*
  Format input array into CLOB with configurable maximum line length
  and configurable indentation. Indent all lines including the first.
  Start the result on a new line in the first column.
  Pattern is simplified printf: each occurrence of '%s' is replaced by the array element.
  */
  function STRINGAGG(
    PT_COLS in SYS.ODCIVARCHAR2LIST,
    P_INDENTLEN in integer default 4,
    P_PATTERN in varchar2 default '%s',
    P_SEPARATOR in varchar2 default ',',
    P_LINEMAXLEN in number default 80
  ) return clob is
    C_NEWLINE varchar2(2) := '
';
    L_CLOB clob := RPAD(' ', P_INDENTLEN, ' ');
    L_NEW varchar2(128);
    L_LINELEN number := P_INDENTLEN;
  begin
    for I in 1..PT_COLS.COUNT LOOP
      L_NEW := case when I > 1 then ' ' end
        || replace(P_PATTERN, '%s', PT_COLS(I))
        || case when I < PT_COLS.COUNT then P_SEPARATOR end;
      if L_LINELEN + length(L_NEW) > P_LINEMAXLEN then
        L_CLOB := L_CLOB || C_NEWLINE || RPAD(' ', P_INDENTLEN, ' ');
        L_LINELEN := P_INDENTLEN;
        L_NEW := SUBSTR(L_NEW,2);
      end if;
      L_CLOB := L_CLOB || L_NEW;
      L_LINELEN := L_LINELEN + length(L_NEW);
    end LOOP;
    return L_CLOB;
  end STRINGAGG;

  procedure MAKE_REPLACEMENTS(
    P_REPL in OUT NOCOPY T_REPL,
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2,
    P_OPERATION_COL in varchar2 default null
  ) is
    L_NON_KEY_COLS SYS.ODCIVARCHAR2LIST;
  begin
    LOGGER('MAKE_REPLACEMENTS');
    P_REPL := null;
    if P_OLD_OWNER is null then
      P_REPL.OLD_OWNER_TABLE := P_OLD_TABLE || P_OLD_DBLINK;
    else
      P_REPL.OLD_OWNER_TABLE := P_OLD_OWNER || '.' || P_OLD_TABLE || P_OLD_DBLINK;
    end if;
    if P_NEW_SOURCE is null then
      RAISE_APPLICATION_ERROR(
        -20001,
        'P_NEW_SOURCE is null. Must be table, view or query within parentheses.'
      );
    else
      P_REPL.NEW_SOURCE := P_NEW_SOURCE;
    end if;
    if P_TAB_COLS is null or P_TAB_COLS.COUNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20002,
        'P_TAB_COLS is null or is an empty collection.'
      );
    else
      P_REPL.FIRST_COL := P_TAB_COLS(1);
      P_REPL.ALL_COLS2 := STRINGAGG(P_TAB_COLS,2);
      P_REPL.ALL_COLS4 := STRINGAGG(P_TAB_COLS,4);
      P_REPL.ALL_COLS6 := STRINGAGG(P_TAB_COLS,6);
      P_REPL.INSERT_COLS2 := STRINGAGG(P_TAB_COLS, 2, 'N.%s');
    end if;
    if P_KEY_COLS is not null and P_KEY_COLS.COUNT > 0 then
      P_REPL.PK_COLS6 := STRINGAGG(P_KEY_COLS, 6);
      P_REPL.ON_COLS2 := STRINGAGG(P_KEY_COLS, 2, 'O.%s=N.%s', ' and ');
      select column_value bulk collect into L_NON_KEY_COLS
      from table(P_TAB_COLS)
      where replace(column_value,'"','') not in (
        select replace(column_value,'"','') from table(P_KEY_COLS)
      );
      if L_NON_KEY_COLS.COUNT between 1 and P_TAB_COLS.COUNT - 1 then
        P_REPL.SET_COLS2 := STRINGAGG(L_NON_KEY_COLS, 2, '%s=N.%s');
        P_REPL.DECODE_COLS2 := STRINGAGG(L_NON_KEY_COLS, 2, 'decode(O.%s,N.%s,0,1)');
      end if;
    end if;
    P_REPL.OPERATION_COL := P_OPERATION_COL;
  end MAKE_REPLACEMENTS;

  procedure OLD_OWNER_CHECK(
    P_OLD_OWNER in varchar2,
    P_OLD_DBLINK in varchar2
  ) is
    L_CNT number;
    -- Sized generously: long owner and database link names could overflow 255.
    L_SQL varchar2(4000) :=
q'!select COUNT(*) from ALL_USERS#DBLINK# where USERNAME = '#OLD_OWNER#'!';
  begin
    LOGGER('old_owner_check');
    if P_OLD_OWNER is not null then
      L_SQL := replace(L_SQL, '#DBLINK#', P_OLD_DBLINK);
      L_SQL := replace(L_SQL, '#OLD_OWNER#', P_OLD_OWNER);
      LOGGER(L_SQL);
      execute immediate L_SQL into L_CNT;
      if L_CNT = 0 then
        RAISE_APPLICATION_ERROR(
          -20003,
          'P_OLD_OWNER = ' ||P_OLD_OWNER|| ': user not found in the database.'
        );
      end if;
    end if;
  end OLD_OWNER_CHECK;

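  -- Return the double-quoted column names of the old table in segment order,
  -- excluding virtual and system-generated (or, before version 12, hidden)
  -- columns. Raises ORA-20004 if no columns are found.
  function GET_TAB_COLS(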
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_OLD_DBLINK in varchar2
  ) return SYS.ODCIVARCHAR2LIST is
    l_version number;
    l_instance_sql varchar2(255) := 
q'!select to_number(regexp_substr(banner, 'Release ([^|.]+)', 1, 1, 'i', 1))
from v$version#DBLINK#
where rownum = 1!';
    L_TAB_COLS SYS.ODCIVARCHAR2LIST;
    -- Sized generously: long owner, table and link names could overflow 255.
    L_SQL varchar2(4000) :=
q'!select '"'||COLUMN_NAME||'"'
from ALL_TAB_COLS#DBLINK#
where (OWNER, TABLE_NAME, VIRTUAL_COLUMN) =
(('#OLD_OWNER#', '#OLD_TABLE#', 'NO'))
and #VERSION_DEPENDENT#
order by SEGMENT_COLUMN_ID!';
  begin
    LOGGER('get_tab_cols');
    OLD_OWNER_CHECK(P_OLD_OWNER, P_OLD_DBLINK);
    l_instance_sql := replace(l_instance_sql, '#DBLINK#', P_OLD_DBLINK);
    LOGGER(l_instance_sql);
    execute immediate l_instance_sql into l_version;
    LOGGER('l_version = ' || l_version);
    if l_version >= 12 then
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'USER_GENERATED = ''YES''');
    else
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'HIDDEN_COLUMN = ''NO''');
    end if;
    L_SQL := replace(L_SQL, '#DBLINK#', P_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', NVL(P_OLD_OWNER, user));
    L_SQL := replace(L_SQL, '#OLD_TABLE#', P_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_TAB_COLS;
    if L_TAB_COLS.COUNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20004,
        NVL(P_OLD_OWNER, user) || '.' ||P_OLD_TABLE || ': table not found.'
      );
    end if;
    return L_TAB_COLS;
  end GET_TAB_COLS;
  
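  -- Return the database link name with a leading '@', adding it if missing.
  -- A null link stays null.
  function PREFIX_DBLINK(P_OLD_DBLINK in varchar2) return varchar2 is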
  begin
    if P_OLD_DBLINK is null or SUBSTR(P_OLD_DBLINK,1,1) = '@' then
      return P_OLD_DBLINK;
    else
      return '@' || P_OLD_DBLINK;
    end if;
  end PREFIX_DBLINK;

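  -- Return the double-quoted primary key column names of the old table.
  -- The collection is empty if the table has no primary key.
  function GET_KEY_COLS(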
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_OLD_DBLINK in varchar2
  ) return SYS.ODCIVARCHAR2LIST is
    L_KEY_COLS SYS.ODCIVARCHAR2LIST;
    L_SQL varchar2(4000) := 
q'!select '"'||COLUMN_NAME||'"'
from ALL_CONS_COLUMNS#DBLINK#
where (OWNER, CONSTRAINT_NAME) = (
  select OWNER, CONSTRAINT_NAME from ALL_CONSTRAINTS#DBLINK#
  where (OWNER, TABLE_NAME, CONSTRAINT_TYPE) =
        (('#OLD_OWNER#', '#OLD_TABLE#', 'P'))
)
order by POSITION!';
  begin
    LOGGER('get_key_cols');
    OLD_OWNER_CHECK(P_OLD_OWNER, P_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#DBLINK#', P_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', NVL(P_OLD_OWNER, user));
    L_SQL := replace(L_SQL, '#OLD_TABLE#', P_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_KEY_COLS;
    return L_KEY_COLS;
  end GET_KEY_COLS;

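  -- Generate a query that returns the rows occurring a different number of
  -- times in the old table and the new source, with the count on each side
  -- (OLD_CNT, NEW_CNT).
  function COMPARE_SQL(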
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_REPL T_REPL;
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('compare_sql with tab_cols');
    MAKE_REPLACEMENTS(
      L_REPL,
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_TAB_COLS,
      SYS.ODCIVARCHAR2LIST(),
      L_OLD_DBLINK
    );
    return to_clob('select
')||L_REPL.ALL_COLS2||',
sum(OLD_CNT) OLD_CNT, sum(NEW_CNT) NEW_CNT
FROM (
  select
'||L_REPL.ALL_COLS2||',
  1 OLD_CNT, 0 NEW_CNT
  from '||L_REPL.OLD_OWNER_TABLE||' O
  union all
  select
'||L_REPL.ALL_COLS2||',
  0 OLD_CNT, 1 NEW_CNT
  from '||L_REPL.NEW_SOURCE||' N
)
group by
'||L_REPL.ALL_COLS2||'
having sum(OLD_CNT) != sum(NEW_CNT)
order by 1, NEW_CNT';
  end COMPARE_SQL;

  -- Overload: the column list is read from the data dictionary.
  function COMPARE_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('compare_sql without tab_cols');
    return COMPARE_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      L_OLD_DBLINK
    );
  end COMPARE_SQL;

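  -- Generate a MERGE that makes the old table identical to the new source:
  -- it inserts, updates and deletes as needed.
  function SYNC_SQL(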
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
  
    L_REPL T_REPL;
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);

  begin
    LOGGER('sync_sql with tab_cols');
    MAKE_REPLACEMENTS(
      L_REPL,
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_TAB_COLS,
      P_KEY_COLS,
      L_OLD_DBLINK
    );
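    -- No usable split into key and non-key columns: compare whole rows and
    -- match old rows by ROWID, so that surplus old rows are deleted and
    -- missing rows are inserted.
    if L_REPL.SET_COLS2 is null then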
      LOGGER('without set_cols');
      return to_clob('merge /*+ use_nl(O) */ into ')||L_REPL.OLD_OWNER_TABLE||' O
using (
  select * from (
    select
    SUM(Z##FLAG) over(partition by
'||L_REPL.ALL_COLS6||'
    ) Z##NUM_ROWS,
    COUNT(NULLIF(Z##FLAG,-1)) over(partition by
'||L_REPL.ALL_COLS6||'
      order by null rows unbounded preceding
    ) Z##NEW,
    COUNT(NULLIF(Z##FLAG,1)) over(partition by
'||L_REPL.ALL_COLS6||'
      order by null rows unbounded preceding
    ) Z##OLD,
    a.* from (
      select
'||L_REPL.ALL_COLS6||',
      -1 Z##FLAG, rowid Z##RID
      from '||L_REPL.OLD_OWNER_TABLE||' O
      union all
      select
'||L_REPL.ALL_COLS6||',
      1 Z##FLAG, null
      from '||L_REPL.NEW_SOURCE||' N
    ) a
  )
  where Z##NUM_ROWS != 0
  and SIGN(Z##NUM_ROWS) = Z##FLAG
  and ABS(Z##NUM_ROWS) >=
    case SIGN(Z##NUM_ROWS) when 1 then Z##NEW else Z##OLD end
) N
on (O.rowid = N.Z##RID)
when matched then update set '||L_REPL.FIRST_COL||' = N.'||L_REPL.FIRST_COL||'
delete where 1=1
when not matched then insert (
'||L_REPL.ALL_COLS2||'
) values(
'||L_REPL.INSERT_COLS2||'
)';
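    else
      -- Key and non-key columns both exist. When the key is unique on both
      -- sides, Z##IUD_FLAG is 0 for rows to insert, 1 for the new version of
      -- an updated row and 2 for rows to delete; the old version of an
      -- updated row gets 3 and is filtered out.
      LOGGER('with set_cols');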
      return to_clob('merge into ')||L_REPL.OLD_OWNER_TABLE||' O
using (
  select * from (
    select
'||L_REPL.ALL_COLS4||',
    COUNT(*) over(partition by
'||L_REPL.PK_COLS6||'
    )
    - SUM(Z##_CNT) Z##IUD_FLAG
    from (
      select
'||L_REPL.ALL_COLS6||',
      -1 Z##_CNT
      from '||L_REPL.OLD_OWNER_TABLE||' O
      union all
      select
'||L_REPL.ALL_COLS6||',
      1 Z##_CNT
      from '||L_REPL.NEW_SOURCE||' N
    )
    group by
'||L_REPL.ALL_COLS4||'
    having SUM(Z##_CNT) != 0
  )
  where Z##IUD_FLAG < 3
) N
on (
'||L_REPL.ON_COLS2||'
)
when matched then update set
'||L_REPL.SET_COLS2||'
  delete where N.Z##IUD_FLAG = 2
when not matched then insert (
'||L_REPL.ALL_COLS2||'
) values(
'||L_REPL.INSERT_COLS2||'
)';
    end if;
  end SYNC_SQL;

  -- Overload: columns and primary key are both read from the data dictionary.
  function SYNC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('sync_sql without key_cols');
    return SYNC_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      GET_KEY_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      L_OLD_DBLINK
    );
  end SYNC_SQL;

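  -- Overload: the caller supplies the key columns; the column list is read
  -- from the data dictionary.
  function SYNC_SQL(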
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('sync_sql with key_cols');
    return SYNC_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      P_KEY_COLS,
      L_OLD_DBLINK
    );
  end SYNC_SQL;

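  -- Generate a MERGE that inserts new rows and updates rows whose non-key
  -- columns differ, matching on the key columns. It never deletes.
  function SYNC_UPSERT_SQL(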
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is

    L_REPL T_REPL;
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);

  begin
    LOGGER('sync_upsert_sql with tab_cols');
    MAKE_REPLACEMENTS(
      L_REPL,
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_TAB_COLS,
      P_KEY_COLS,
      L_OLD_DBLINK
    );
    if L_REPL.SET_COLS2 is null then
      RAISE_APPLICATION_ERROR(
        -20005,
        'SYNC_UPSERT_SQL requires a target with both primary key and non-key columns'
      );
    end if;
    return to_clob('merge into (
  select
')||L_REPL.ALL_COLS2||'
  from '||L_REPL.OLD_OWNER_TABLE||'
) O
using (
  select
'||L_REPL.ALL_COLS2||'
  from '||L_REPL.NEW_SOURCE||'
) N
on (
'||L_REPL.ON_COLS2||'
)
when matched then update set
'||L_REPL.SET_COLS2||'
where 1 in (
'||L_REPL.DECODE_COLS2||'
)
when not matched then insert (
'||L_REPL.ALL_COLS2||'
) values(
'||L_REPL.INSERT_COLS2||'
)';
  end SYNC_UPSERT_SQL;

  function SYNC_UPSERT_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('sync_upsert_sql without key_cols');
    return SYNC_UPSERT_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      GET_KEY_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      L_OLD_DBLINK
    );
  end SYNC_UPSERT_SQL;

  function SYNC_UPSERT_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('sync_upsert_sql with key_cols');
    return SYNC_UPSERT_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      P_KEY_COLS,
      L_OLD_DBLINK
    );
  end SYNC_UPSERT_SQL;

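  -- Generate a MERGE that applies change data from the new source: rows
  -- flagged 'D' in P_OPERATION_COL are deleted from the target; other rows
  -- are inserted, or updated if any non-key column differs.
  function SYNC_CDC_SQL(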
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_TAB_COLS in SYS.ODCIVARCHAR2LIST,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OPERATION_COL in varchar2 default 'OPERATION',
    P_OLD_DBLINK in varchar2 default null
  ) return clob is

    L_REPL T_REPL;
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);

  begin
    LOGGER('sync_cdc_sql with tab_cols');
    LOGGER('P_OPERATION_COL = ' || P_OPERATION_COL);
    if P_OPERATION_COL is null then
      RAISE_APPLICATION_ERROR(
        -20006,
        'P_OPERATION_COL is null. Must be valid column in source data.'
      );
    end if;
    MAKE_REPLACEMENTS(
      L_REPL,
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_TAB_COLS,
      P_KEY_COLS,
      L_OLD_DBLINK,
      P_OPERATION_COL
    );
    if L_REPL.SET_COLS2 is null then
      RAISE_APPLICATION_ERROR(
        -20007,
        'SYNC_CDC_SQL requires a target with both primary key and non-key columns'
      );
    end if;
    return to_clob('merge into (
  select
')||L_REPL.ALL_COLS2||'
  from '||L_REPL.OLD_OWNER_TABLE||'
) O
using (
  select '||L_REPL.OPERATION_COL||',
'||L_REPL.ALL_COLS2||'
  from '||L_REPL.NEW_SOURCE||'
) N
on (
'||L_REPL.ON_COLS2||'
)
when matched then update set
'||L_REPL.SET_COLS2||'
where N.'||L_REPL.OPERATION_COL||' = ''D'' or 1 in (
'||L_REPL.DECODE_COLS2||'
)
delete where N.'||L_REPL.OPERATION_COL||' = ''D''
when not matched then insert (
'||L_REPL.ALL_COLS2||'
) values(
'||L_REPL.INSERT_COLS2||'
) where N.'||L_REPL.OPERATION_COL||' != ''D''';
  end SYNC_CDC_SQL;

  function SYNC_CDC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_KEY_COLS in SYS.ODCIVARCHAR2LIST,
    P_OPERATION_COL in varchar2 default 'OPERATION',
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('sync_cdc_sql with key_cols');
    return SYNC_CDC_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      P_KEY_COLS,
      P_OPERATION_COL,
      L_OLD_DBLINK
    );
  end SYNC_CDC_SQL;

  function SYNC_CDC_SQL(
    P_OLD_OWNER in varchar2 default null,
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OPERATION_COL in varchar2 default 'OPERATION',
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_OLD_DBLINK varchar2(255) := PREFIX_DBLINK(P_OLD_DBLINK);
  begin
    LOGGER('sync_cdc_sql without key_cols');
    return SYNC_CDC_SQL(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_NEW_SOURCE,
      GET_TAB_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      GET_KEY_COLS(P_OLD_OWNER, P_OLD_TABLE, L_OLD_DBLINK),
      P_OPERATION_COL,
      L_OLD_DBLINK
    );
  end SYNC_CDC_SQL;

end COMPARE_SYNC;
/
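
As a quick illustration of how the generator functions above might be called, here is a minimal sketch. It assumes the package specification exposes these overloads publicly. The table names T_OLD and T_NEW and the database link REMOTE_DB are hypothetical stand-ins for your own objects. Names are passed in upper case because the dictionary lookups above compare them as given. Each call only returns the generated SQL text as a CLOB; nothing changes until you run that text yourself.

```sql
-- Hypothetical objects: T_OLD, T_NEW and the REMOTE_DB link are examples only.
-- Named notation selects the overloads that read columns (and the primary key)
-- from the data dictionary.

-- Query showing the rows that differ between the two tables.
select COMPARE_SYNC.COMPARE_SQL(
  P_OLD_TABLE  => 'T_OLD',
  P_NEW_SOURCE => 'T_NEW'
) from dual;

-- MERGE that makes T_OLD identical to T_NEW, matching on the primary key of
-- T_OLD (or on whole rows if T_OLD has no primary key).
select COMPARE_SYNC.SYNC_SQL(
  P_OLD_TABLE  => 'T_OLD',
  P_NEW_SOURCE => 'T_NEW'
) from dual;

-- Same, with the target behind a database link; the '@' is added if missing.
select COMPARE_SYNC.SYNC_SQL(
  P_OLD_TABLE  => 'T_OLD',
  P_NEW_SOURCE => 'T_NEW',
  P_OLD_DBLINK => 'REMOTE_DB'
) from dual;
```

Since P_NEW_SOURCE may also be a query within parentheses, the same calls can compare against a filtered or joined source rather than a plain table.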