COMP_SYNC 1: a new table compare/sync package

I have been meaning to update my COMPARE_SYNC package for some time. I want to change the interface and the functionality a bit, so I am leaving the existing package alone and creating a new one called COMP_SYNC.

If you have used the old package, I would greatly appreciate any feedback on the new version: functionality, performance, bugs, etc. Comment away and thanks in advance.

What COMP_SYNC does for you

The package returns CLOBs containing SQL statements for you to adjust / test / execute. It uses CDC (Change Data Capture) format, with a flag (Z##OP) on each row with ‘I’ for insert, ‘U’ for update and ‘D’ for delete.

  • COMPARE_SQL: returns SQL that compares new source and old target using Tom Kyte’s GROUP BY method. Omitted columns are not compared and do not appear in the output.
    • ‘D’ rows are in “old” but not in “new”.
    • ‘I’ rows are in “new” but not in “old”.
      Since there may be duplicates, Z##CNT has the number of rows involved.
  • CDC_SQL: compares an “old” table (not a view) to “new”. You can exclude columns from the comparison, but the output shows entire rows with all columns, including the ROWID of the “old” row. For every ‘U’ row there is a corresponding ‘O’ (for “old”) row with the old values.
  • SYNC_SQL: compares and syncs from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns.
  • SYNC_UPSERT_SQL: inserts and updates but no deletes. Works only when there are both key and non-key columns.
  • SYNC_CDC_SQL: directly applies changes from a CDC table such as returned by CDC_SQL.
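As a rough sketch of the workflow described above, the statements below generate SQL and then run it. T_TARGET and T_SOURCE are the placeholder table names used in the package's own examples; the SET LONG value and the anonymous block are my illustration, not part of the package.

```sql
-- SQL*Plus truncates CLOB output by default, so raise the limit first.
set long 100000

-- 1) What differs? Returns a SELECT that lists 'I' and 'D' rows with a count.
select COMP_SYNC.COMPARE_SQL('T_TARGET', 'T_SOURCE') from DUAL;

-- 2) Row-level changes in CDC format: 'I', 'U', 'D', plus 'O' for old values.
select COMP_SYNC.CDC_SQL('T_TARGET', 'T_SOURCE') from DUAL;

-- 3) Generate the MERGE and run it directly.
--    Assumes you have already reviewed and tested the generated statement.
declare
  l_sql clob;
begin
  l_sql := COMP_SYNC.SYNC_SQL('T_TARGET', 'T_SOURCE');
  execute immediate l_sql;  -- native dynamic SQL accepts a CLOB from 11g onward
  DBMS_OUTPUT.PUT_LINE(sql%rowcount || ' rows merged');
  -- Deliberately no COMMIT: check the result, then commit or roll back yourself.
end;
/
```

Since the package never validates the "new" source, it is probably safer to run the generated SELECT from step 1 or 2 before executing any MERGE.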

Parameter changes

If you have already used COMPARE_SYNC, here is what changed:

  • Columns are now in comma-separated lists and not in little SYS.ODCIVARCHAR2LIST tables.
  • Table names and column names are converted to upper case unless you put them in double quotes.
  • P_EXCLUDE_COLS replaces P_ALL_COLS: if you want to exclude columns from the comparison just list them here, instead of having to list all the columns you want to include.
  • P_PREFIX replaces P_OPERATION_COL: the generated SQL adds a few columns of my own to those of the actual tables, so the prefix is now applied to all of them to avoid collisions with your names.
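To make the new parameter style concrete, here is a sketch of a call that uses all of the changed parameters. The column names LAST_UPDATED, "MixedCase", C1 and C2 and the prefix CS_ are made up for illustration; only the parameter names come from the package specification.

```sql
select COMP_SYNC.CDC_SQL(
  P_OLD_TABLE    => 'T_TARGET',
  P_NEW_SOURCE   => 'T_SOURCE',
  -- comma-separated list; unquoted names are upper-cased,
  -- names in double quotes are kept exactly as written
  P_EXCLUDE_COLS => 'last_updated, "MixedCase"',
  -- overrides the primary key lookup in the data dictionary
  P_KEY_COLS     => 'C1,C2',
  -- applied to every column name the package makes up
  P_PREFIX       => 'CS_'
) from DUAL;
```

With this prefix, the generated SQL should name its operation flag CS_OP and its ROWID column CS_RID instead of Z##OP and Z##RID.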

The code

[Update 2018-02-13: added source code files]

This site does not allow upload of source code, so I had to add a “.doc” suffix.

comp_sync-pks.doc : package specification, rename to comp_sync.pks

comp_sync-pkb.doc : package body, rename to comp_sync.pkb

create or replace package COMP_SYNC
authid current_user as
/*
COMP_SYNC generates SQL for comparing or synchronizing
"old" target and "new" source.
 
- "Old" can be a table or view, local or remote.
  Indicate separately the "old" owner, "old" table and "old" dblink.
  To compare two queries, create a view to use as the "old".
  To sync, "old" must be a table but I do not check that for you.
- "New" can be local, remote, table, view or a query enclosed in parentheses.
  Examples: 'SCOTT.EMP', 'T_SOURCE@DBLINK', '(select * from SCOTT.EMP@DBLINK)'
 
Note: I never check the "new" source for validity.
I only check the "old" target for validity when I look up columns from the data dictionary.
So the generated SQL is not guaranteed to run without error!
   
The generated SQL is returned as a CLOB.
 
To debug, change the value of G_DOLOG to true. See the beginning of the package body.
 
INPUT PARAMETERS:

-- Required
  
P_OLD_TABLE  : name of the target table or view. Must exist in the database.
 
P_NEW_SOURCE : source table or view - or query enclosed in parentheses.

-- Optional
 
P_OLD_OWNER  : owner of the target. Must exist in the database.
  The default is null, which assumes the current user.
 
P_EXCLUDE_COLS   : optional comma-separated list of columns to OMIT from the comparison.
  If you leave out P_EXCLUDE_COLS, every non-virtual column will be compared,
  both visible and invisible.
  If you omit a PK column, the tables are considered not to have a primary key.
 
P_KEY_COLS : optional comma-separated list of primary key columns.
  This overrides the default search for PK columns in ALL_CONS_COLUMNS.
   
P_OLD_DBLINK : dblink to the target database.
  The default is null, which means the target is in the local database.
   
P_PREFIX : prefix to the names of the columns such as the CDC flag
  ('D', 'I', 'U' or 'O' for the "old" rows being updated).
  When syncing, I delete the rows marked 'D' and ignore the rows marked 'O'.
  The default prefix is 'Z##'.
 
Pre 2018-02-01:
  See the COMPARE_SYNC package.
2018-02-01: Major overhaul
    - Parameters reordered to have most usual first
    - P_EXCLUDE_COLS (to exclude some columns) replaces P_ALL_COLS (that included columns).
    - P_OPERATION_COL is replaced by P_PREFIX that begins all column names I make up.
    - P_EXCLUDE_COLS and P_KEY_COLS are now comma-separated lists and not little tables.
    - User, table and column names are now upper cased unless within double quotes.
    - Instead of passing a huge record among internal procedures,
      I now use global variables. So sue me!
    - CDC output rows include the ROWID of the target table, which is used for efficient syncing.
*/
/*
COMPARING:
 
COMPARE_SQL returns SQL that compares new source and old target
using Tom Kyte's GROUP BY method.
Omitted columns are not compared and do not appear in the output.
'D' rows are in "old" but not in "new".
'I' rows are in "new" but not in "old".
Since there may be duplicates, Z##CNT has the number of rows involved.

Example:
  select COMP_SYNC.COMPARE_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
CDC_SQL produces CDC output: 'D', 'I', 'U' - or 'O' for the "old" rows being updated.
The output includes the ROWID of the target, except when 'I'.

Example:
  select COMP_SYNC.CDC_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
SYNCHRONIZING
 
The package can synchronize in one of three ways:
1) SYNC: Compare and sync from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns,
    but the target must be a table because I use the ROWID.
    
2) SYNC_UPSERT: sync from source to target: inserts and updates but no deletes.
    Requires a target with both primary key and non-key columns.
    It does not allow for omitting columns: the workaround is to use a view on the target.
    
3) SYNC_CDC: the source is a "Change Data Capture" table.
  It contains inserts, updates and deletes to be directly applied.
  Must contain a column ending with 'OP' containing the operation flag (I,U,D),
  and a column ending in 'RID' with the ROWID of the target row if U or D. 
*/
/*
Example:
  select COMP_SYNC.SYNC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE'
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;

/*
Example:
  select COMP_SYNC.SYNC_UPSERT_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => 'C1,C2'
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
 
/*
Example:
  select COMP_SYNC.SYNC_CDC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC',
    P_OLD_OWNER => user,
    P_KEY_COLS => 'C1,C2',
    P_PREFIX => 'OPCODE'
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
 
end COMP_SYNC;
/
create or replace package body COMP_SYNC as
 
  G_DOLOG constant BOOLEAN := false;
  C_NEWLINE constant varchar2(2) := '
';
  
  type TT_VARCHAR2 is table of VARCHAR2(255);
  
  -- set by CHECK_COMMON_INPUTS
  G_OLD_OWNER varchar2(255);
  G_OLD_TABLE varchar2(255);
  G_NEW_SOURCE varchar2(4000);
  G_OLD_DBLINK varchar2(255);
  G_OPERATION_COL varchar2(255);
  G_OLD_OWNER_TABLE varchar2(255);
  
  -- set by MAKE_REPLACEMENTS
  G_ALL_COLS TT_VARCHAR2;   -- all non-virtual columns
  G_SOME_COLS TT_VARCHAR2;  -- all non-virtual columns except those listed on P_EXCLUDE_COLS
  G_KEY_COLS TT_VARCHAR2;   -- from P_KEY_COLS, or by default the "old" primary key columns
  G_FIRST_COL TT_VARCHAR2; -- first column in G_SOME_COLS
  G_ALL_COL_CLOB clob;  
  G_SOME_COL_CLOB clob;
  G_INSERT_COL_CLOB clob;
  G_KEY_COL_CLOB clob;
  G_ON_COL_CLOB clob;
  G_SET_COL_CLOB clob;
  G_FIRST_COL_CLOB clob;
  G_DECODE_COL_CLOB clob;
 
  procedure LOGGER(P_TXT in clob, P_DOLOG in boolean default false) is
  begin
    if G_DOLOG or P_DOLOG then
      DBMS_OUTPUT.PUT_LINE('prompt > ' || P_TXT);
    end if;
  end LOGGER;
  
  /* sets all G_OLD_* variables and G_NEW_SOURCE.
     If P_OLD_OWNER is null, G_OLD_OWNER := user but G_OLD_OWNER_TABLE does not mention schema.
     OWNER and TABLE are uppercased unless within double quotes.
     OWNER is checked for existence. OLD_TABLE is checked for existence later if necessary. */
  procedure CHECK_COMMON_INPUTS(
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_OLD_DBLINK in varchar2,
    P_NEW_SOURCE in varchar2
  ) is
    L_CNT number;
    L_SQL varchar2(255) :=
q'!select COUNT(*) from ALL_USERS#DBLINK# where USERNAME = trim('"' from '#OLD_OWNER#')!';
  begin
    LOGGER('CHECK_COMMON_INPUTS');
    
    if P_OLD_TABLE is null then 
      RAISE_APPLICATION_ERROR(
        -20001,
        'P_OLD_TABLE must not be null.'
      );
    end if;
    
    if P_OLD_DBLINK is null or SUBSTR(P_OLD_DBLINK,1,1) = '@' then
      G_OLD_DBLINK := upper(P_OLD_DBLINK);
    else
      G_OLD_DBLINK :=  '@' || upper(P_OLD_DBLINK);
    end if;
    
    if substr(P_OLD_OWNER,1,1) = '"' then
      G_OLD_OWNER := P_OLD_OWNER;
    else
      G_OLD_OWNER := upper(P_OLD_OWNER);
    end if;
    
    if substr(P_OLD_TABLE,1,1) = '"' then
      G_OLD_TABLE := P_OLD_TABLE;
    else
      G_OLD_TABLE := upper(P_OLD_TABLE);
    end if;
    
    if G_OLD_OWNER is null then
      G_OLD_OWNER_TABLE := G_OLD_TABLE || G_OLD_DBLINK;
      G_OLD_OWNER := user;
    else
      G_OLD_OWNER_TABLE := G_OLD_OWNER || '.' || G_OLD_TABLE || G_OLD_DBLINK;
    end if;
    
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    LOGGER(L_SQL);
    execute immediate L_SQL into L_CNT;
    if L_CNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20002,
        'OLD_OWNER = ' ||G_OLD_OWNER|| ': user not found in the database.'
      );
    end if;
    
    if P_NEW_SOURCE is null then
      RAISE_APPLICATION_ERROR(
        -20003,
        'P_NEW_SOURCE is null. Must be table, view or query within parentheses.'
      );
    else
      G_NEW_SOURCE := P_NEW_SOURCE;
    end if;
  
  end CHECK_COMMON_INPUTS;
  
  function COL_TOKENIZE(
    p_string in varchar2
  )
  return TT_VARCHAR2
  as
    c_delim constant varchar2(1) := ',';
    i_prev_pos pls_integer := 1;
    i_pos pls_integer;
    i_max_pos pls_integer := length(p_string) + 1;
    l_col varchar2(255);
    lt_out TT_VARCHAR2 := new TT_VARCHAR2();
    i_out pls_integer := 0;
  begin
    loop
      i_pos := instr(p_string, c_delim, i_prev_pos);
      if i_pos = 0 then
        i_pos := i_max_pos;
      end if;
      l_col := trim(substr(p_string, i_prev_pos, i_pos - i_prev_pos));
      if substr(l_col,1,1) != '"' then
        l_col := '"' || upper(l_col) || '"';
      end if;
      i_out := i_out + 1;
      lt_out.extend;
      lt_out(i_out) := l_col;
      exit when i_pos = i_max_pos;
      i_prev_pos := i_pos + 1;
    end loop;
    return lt_out;
  end COL_TOKENIZE;
 
  /*
  Format input array into CLOB with configurable maximum line length.
  Indentation is handled later using BIG_REPLACE.
  Pattern is simplified printf: each occurrence of '%s' is replaced by the array element.
  */
  function STRINGAGG(
    PT_COLS in TT_VARCHAR2,
    P_PATTERN in varchar2 default '%s',
    P_SEPARATOR in varchar2 default ',',
    P_LINEMAXLEN in number default 80
  ) return clob is
    L_CLOB clob;
    L_NEW varchar2(255);
    L_LINELEN number := 0;
  begin
    for I in 1..PT_COLS.COUNT LOOP
      L_NEW := case when I > 1 then ' ' end
        || replace(P_PATTERN, '%s', PT_COLS(I))
        || case when I < PT_COLS.COUNT then P_SEPARATOR end;
      if L_LINELEN + length(L_NEW) > P_LINEMAXLEN then
        L_CLOB := L_CLOB || C_NEWLINE;
        L_LINELEN := 0;
        L_NEW := SUBSTR(L_NEW,2);
      end if;
      L_CLOB := L_CLOB || L_NEW;
      L_LINELEN := L_LINELEN + length(L_NEW);
    end LOOP;
    return L_CLOB;
  end STRINGAGG;
  
  procedure BIG_REPLACE(
    p_clob in out nocopy clob,
    p_search in varchar2,
    p_replace in clob
  ) is
    c_replace_len constant integer := 30000;
    l_iter integer;
  begin
    if p_search is null then
      RAISE_APPLICATION_ERROR(
        -20004,
        'Internal error in BIG_REPLACE: p_search parameter is null.'
      );
    end if;
    if p_replace is null then
      logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB, true);
      logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB, true);
      logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB, true);
      logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB, true);
      logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB, true);
      logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB, true);
      logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB, true);
      logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB, true);
      RAISE_APPLICATION_ERROR(
        -20005,
        'Internal error in BIG_REPLACE: p_replace parameter is null.'
      );
    end if;
    l_iter := ceil(length(p_replace) / c_replace_len);
    --logger('length(p_replace) : '||length(p_replace));
    --logger('l_iter : '||l_iter);
    for i in 1..l_iter loop
      --logger('(i-1)*c_replace_len+1 : '||((i-1)*c_replace_len+1));
      p_clob := replace(
        p_clob, 
        p_search,
        substr(p_replace, (i-1)*c_replace_len+1, c_replace_len)
          || case when i < l_iter then p_search end
      );
    end loop;
  end BIG_REPLACE;
 
  function GET_ALL_COLS return TT_VARCHAR2 is
    l_version number;
    l_instance_sql varchar2(255) :=
q'!select to_number(regexp_substr(banner, 'Release ([^|.]+)', 1, 1, 'i', 1))
from v$version#DBLINK#
where rownum = 1!';
    L_TAB_COLS SYS.ODCIVARCHAR2LIST;
    L_ALL_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_SQL varchar2(255) :=
q'!select '"'||COLUMN_NAME||'"'
from ALL_TAB_COLS#DBLINK#
where (OWNER, TABLE_NAME, VIRTUAL_COLUMN) =
((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'NO'))
and #VERSION_DEPENDENT#
order by SEGMENT_COLUMN_ID!';
  begin
    LOGGER('GET_ALL_COLS');
    l_instance_sql := replace(l_instance_sql, '#DBLINK#', G_OLD_DBLINK);
    LOGGER(l_instance_sql);
    execute immediate l_instance_sql into l_version;
    logger('l_version = ' || l_version);
    if l_version >= 12 then
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'USER_GENERATED = ''YES''');
    else
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'HIDDEN_COLUMN = ''NO''');
    end if;
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_TAB_COLS;
    if L_TAB_COLS.COUNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20006,
        G_OLD_OWNER_TABLE || ': table not found.'
      );
    end if;
    L_ALL_COLS.extend(L_TAB_COLS.count);
    for i in 1..L_TAB_COLS.count loop
      L_ALL_COLS(i) := L_TAB_COLS(i);
    end loop;
    return L_ALL_COLS;
  end GET_ALL_COLS;
 
  function GET_KEY_COLS return TT_VARCHAR2 is
    L_KEY_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_KEY_COL_LIST SYS.ODCIVARCHAR2LIST;
    L_SQL varchar2(4000) := 
q'!select '"'||COLUMN_NAME||'"'
from ALL_CONS_COLUMNS#DBLINK#
where (OWNER, CONSTRAINT_NAME) = (
  select OWNER, CONSTRAINT_NAME from ALL_CONSTRAINTS#DBLINK#
  where (OWNER, TABLE_NAME, CONSTRAINT_TYPE) =
        ((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'P'))
)!';
  begin
    LOGGER('GET_KEY_COLS');
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_KEY_COL_LIST;
    L_KEY_COLS.extend(L_KEY_COL_LIST.count);
    for i in 1..L_KEY_COL_LIST.count loop
      L_KEY_COLS(i) := L_KEY_COL_LIST(i);
    end loop;
    return L_KEY_COLS;
  end GET_KEY_COLS;
 
  procedure MAKE_REPLACEMENTS(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2,
    P_EXCLUDE_COLS in varchar2,
    P_KEY_COLS in varchar2,
    P_OLD_DBLINK in varchar2
  ) is
    L_NON_KEY_COLS TT_VARCHAR2;
    L_EXCLUDE_COLS TT_VARCHAR2;
  begin
    LOGGER('MAKE_REPLACEMENTS');
    check_common_inputs(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_OLD_DBLINK,
      P_NEW_SOURCE
    );
    G_ALL_COLS := GET_ALL_COLS;
    if P_EXCLUDE_COLS is null then
      G_SOME_COLS := G_ALL_COLS;
    else
      L_EXCLUDE_COLS := COL_TOKENIZE(P_EXCLUDE_COLS);
      G_SOME_COLS := G_ALL_COLS multiset except L_EXCLUDE_COLS;
    end if;
    G_FIRST_COL := new TT_VARCHAR2(G_SOME_COLS(1));
    G_ALL_COL_CLOB := STRINGAGG(G_ALL_COLS);
    G_SOME_COL_CLOB := STRINGAGG(G_SOME_COLS);
    G_INSERT_COL_CLOB := STRINGAGG(G_ALL_COLS, 'N.%s');
    G_FIRST_COL_CLOB := STRINGAGG(G_FIRST_COL, '%s=N.%s');
    
    if P_KEY_COLS is null then
      G_KEY_COLS := GET_KEY_COLS;
    else
      G_KEY_COLS := COL_TOKENIZE(P_KEY_COLS);
    end if;
    
    if cardinality(G_KEY_COLS multiset intersect L_EXCLUDE_COLS) > 0 then
      G_KEY_COLS := null;
    end if;
    
    G_KEY_COL_CLOB := null;
    G_ON_COL_CLOB := null;
    G_SET_COL_CLOB := null;
    G_DECODE_COL_CLOB := null;
    if G_KEY_COLS is not null and G_KEY_COLS.COUNT > 0 then
      G_KEY_COL_CLOB := STRINGAGG(G_KEY_COLS);
      G_ON_COL_CLOB := STRINGAGG(G_KEY_COLS, 'O.%s=N.%s', ' and');
      L_NON_KEY_COLS := G_SOME_COLS multiset except G_KEY_COLS;
      if L_NON_KEY_COLS.COUNT between 1 and G_SOME_COLS.COUNT - 1 then
        G_SET_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, '%s=N.%s');
        G_DECODE_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, 'decode(O.%s,N.%s,0,1)');
      end if;
    end if;
    
    logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB);
    logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB);
    logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB);
    logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB);
    logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB);
    logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB);
    logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB);
    logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB);

  end MAKE_REPLACEMENTS;

  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'select /*+ qb_name(COMPARE) */
  #SOME_COLS#,
  decode(sign(sum(#PREFIX#NEW_CNT)), 1, ''I'', ''D'') #PREFIX#OP,
  abs(sum(#PREFIX#NEW_CNT)) #PREFIX#CNT
FROM (
  select /*+ qb_name(old) */
  #SOME_COLS#
    , -1 #PREFIX#NEW_CNT
  from #OLD# O
  union all
  select /*+ qb_name(new) */
  #SOME_COLS#
    , 1 #PREFIX#NEW_CNT
  from #NEW# N
)
group by
  #SOME_COLS#
having sum(#PREFIX#NEW_CNT) != 0
order by 1, #PREFIX#OP';
  begin
    LOGGER('COMPARE_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      null,
      P_OLD_DBLINK
    );
    L_CLOB := replace(
      C_CLOB,
      '#SOME_COLS#',
      replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end COMPARE_SQL;

  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
order by 1, #PREFIX#OP';
    C_PARTITION_CLOB constant clob :=
'select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null';
  begin
    LOGGER('CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end CDC_SQL; 
  
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'merge /*+ qb_name(SYNC_GROUP) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select * from (
select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
)
where #PREFIX#OP in(''I'',''U'',''D'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  where N.#PREFIX#OP in (''U'', ''D'')
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
    C_PARTITION_CLOB constant clob :=
'merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #FIRST_COL#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
  begin
    LOGGER('SYNC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or G_SET_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      L_CLOB := replace(L_CLOB, '#FIRST_COL#', G_FIRST_COL_CLOB);
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      L_CLOB := replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    return L_CLOB;
  end SYNC_SQL;
 
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_UPSERT) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#
  from #OLD#
) O
using (
  select /*+ qb_name(source) */
  #ALL_COLS#
  from #NEW#
) N
on (
  #ON_COLS#
)
when matched then update set
  #SET_COLS#
  where 1 in (
    #DECODE_COLS#
  )
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';

  begin
    LOGGER('SYNC_UPSERT_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_SET_COL_CLOB is null then
      RAISE_APPLICATION_ERROR(
        -20007,
        'SYNC_UPSERT_SQL requires a target with both primary and non-key columns'
      );
    end if;
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#ON_COLS#',
      replace(G_ON_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#SET_COLS#',
      replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#DECODE_COLS#',
      replace(G_DECODE_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
    );
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end SYNC_UPSERT_SQL;
 
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_CDC_SQL) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
  select /*+ qb_name(source) */ #PREFIX#OP, #PREFIX#RID,
  #ALL_COLS#
  from #NEW#
  where #PREFIX#OP in(''D'', ''I'', ''U'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
 
  begin
    LOGGER('SYNC_CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    if G_SET_COL_CLOB is not null then
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
    else
      L_CLOB := replace(L_CLOB, '#SET_COLS#', G_FIRST_COL_CLOB);
    end if;
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    return L_CLOB;
  end SYNC_CDC_SQL;
   
end COMP_SYNC;
/

6 thoughts on “COMP_SYNC 1: a new table compare/sync package”

  1. Thanks Stew :)

    I like the fact that you chose to differentiate the old package name from this new one, so that we do not accidentally overwrite our existing automation code.

    I’m an avid user of your old package, and one of the things I had to “tweak” in your MERGE SQL code generation section was to manually add performance-friendly hints (like PARALLEL) and DIRECT-PATH load hints, in order to bypass the creation of excessive undo and redo logs. This becomes crucial when comparing very large “source” and “target” data sets (hundreds of millions of records), where performance is critical.

    If you could add those hints to your generated SELECT and MERGE code, I would no longer need to maintain my “tweaked version” and could simply use your new versions “as-is”. Otherwise, I will have to keep re-adding the tweaks every time I upgrade to a new version of your package.

    Many thanks, and you have no idea how much I appreciate your awesome work on this… I have hundreds of custom ETL routines that depend on your package :)

    • Hi Sam,

      Thanks for your kind words. I am very happy to learn that my package is helpful.

      I wouldn’t mind adding performance hints, but I don’t have the environment to properly test. Could you run my package against some tiny test tables and add the hints in the right places in the result?

      Are your tables in the same database? If not, I suspect PARALLEL doesn’t help much since you are stuck with one connection between the databases.

      Also, can’t parallel processing be managed without hints, using ALTER SESSION and / or ALTER TABLE? What about Degree Of Parallelism? Do you want the SELECT part of the MERGE in parallel or also the DML?

      I’m not sure there is a “one size fits all” solution I can put in my package for parallelism, but I could add an option to put APPEND hints in the right place.

      Best regards, Stew

      • Yes, most of my tables are in the same database. However, in cases where I hit other databases via a DBLINK, the PARALLEL hint is ignored.

        The parallel degree set with ALTER TABLE does not do much. I use ALTER SESSION in my custom procedure, which then calls your package, retrieves the SELECT and/or MERGE code as a CLOB and executes it, all within my procedure. I had to go into your package and add the PARALLEL_INDEX (to read indexes in parallel) and PARALLEL (to read blocks in parallel) hints wherever appropriate in the SELECT statements for faster reads, and I also added the APPEND hint to the MERGE code.

        As for the numeric value of the PARALLEL degree, it would be nice if your package had an input parameter for it, so that a proper hint string can be built and added to your SELECT, like:

        SELECT /*+ PARALLEL_INDEX(n) PARALLEL(n) */ ……

        The same would apply by adding an APPEND hint to the MERGE code.

      • Hi Stew,

        I have a few more suggestions :)

        1. Names like “old” and “new” leave some room for doubt for a new user just starting to understand your package. Given that we always have a “source” and a “target” and we always need to find differences and eventually, sync between them, it does not matter whether the records in either of them were “old” or “new”.

        2. Make the entire package code available as a downloadable “txt” file so that we do not make any mistakes while copy-pasting it from the web page on here to the editor and removing the line numbers, etc.

        Thanks!

      • Sam,

        I am leaving “old” and “new”, which have been used in comparison programs in the past. “Source” and “target” make sense when syncing, less so when comparing.

        WordPress doesn’t allow me to upload .sql, .pks or .pkb files, which is why I have not been uploading files. It seems all I have to do is add a .doc suffix and all is good, so that is what I have done. Hope this helps…

  2. Awesome, thanks Stew :)

    Please don’t forget to rename your “doc” files with some kind of a revision/version number so that as you refine the code and release different versions of the package, we can clearly distinguish between them :)

    Many thanks!
