Generic Pivot Function

Many folks ask how to do a “dynamic pivot”. It’s often best to use a preliminary query to generate the PIVOT clause. Here’s a generic function to do just that.

[UPDATE 2018-07-31: for advanced use cases, I wrote a more powerful function called ADVANCED_PIVOT, which I explain here: Improved PIVOT Function]

Anatomy of PIVOT

PIVOT clause from Oracle documentation

Let’s take a simple example, getting the average salary by department and by job:

select dname, job, avg(sal) avg_sal
from DEPT join EMP using(DEPTNO)
group by dname, job
order by 1,2;
DNAME          JOB          AVG_SAL
-------------- --------- ----------
ACCOUNTING     CLERK           1300
ACCOUNTING     MANAGER         2450
ACCOUNTING     PRESIDENT       5000
RESEARCH       ANALYST         3000
RESEARCH       CLERK            950
RESEARCH       MANAGER         2975
SALES          CLERK            950
SALES          MANAGER         2850
SALES          SALESMAN        1400

 

We want three rows, one per department, and five “pivot columns”, one for each of the five distinct jobs. The JOB column data become pivot column names and the SAL column data become pivot column data. The desired result is:

DNAME             ANALYST      CLERK    MANAGER  PRESIDENT   SALESMAN
-------------- ---------- ---------- ---------- ---------- ----------
ACCOUNTING                      1300       2450       5000
RESEARCH             3000        950       2975
SALES                            950       2850                  1400

 

A PIVOT clause needs:

  1. One row source for input: a table, view or query in parentheses. Each column in the input must have a unique name. The row source does not need to do a GROUP BY: the PIVOT clause will do the aggregation.
  2. One or more aggregate functions: they produce the data for the pivot columns.
  3. A PIVOT_FOR clause with one or more columns whose values will correspond to new column names.
  4. A PIVOT_IN clause that lists the pivot columns with their source values.
select * from (
  select dname, job, sal from DEPT join EMP using(DEPTNO)
)
pivot(
  AVG(SAL)
  for JOB
  in (
    'ANALYST' as "ANALYST",
    'CLERK' as "CLERK",
    'MANAGER' as "MANAGER",
    'PRESIDENT' as "PRESIDENT",
    'SALESMAN' as "SALESMAN"
  )
) order by 1
  • The subquery in parentheses is the input.
  • AVG(SAL) is the aggregate function.
    There will be an implicit GROUP BY of DNAME (because it is not mentioned in the PIVOT clause) and JOB (because it is mentioned in the PIVOT_FOR clause).
  • FOR JOB is the PIVOT_FOR clause.
  • The IN list is the PIVOT_IN clause.
    'ANALYST' as "ANALYST" defines a pivot column called ANALYST for when JOB = 'ANALYST'. The result of the AVG(SAL) function for DNAME = 'RESEARCH' and JOB = 'ANALYST' will go into the RESEARCH row and the ANALYST column.
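To see what the PIVOT clause does under the covers, here is the same query written by hand with an explicit GROUP BY and one conditional aggregate per pivot column (a sketch for illustration, not something the function below generates):

```sql
-- Manual equivalent of the PIVOT query:
-- explicit GROUP BY, one conditional aggregate per pivot column
select dname,
  avg(case when job = 'ANALYST'   then sal end) analyst,
  avg(case when job = 'CLERK'     then sal end) clerk,
  avg(case when job = 'MANAGER'   then sal end) manager,
  avg(case when job = 'PRESIDENT' then sal end) president,
  avg(case when job = 'SALESMAN'  then sal end) salesman
from DEPT join EMP using(DEPTNO)
group by dname
order by 1;
```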

A Generic PIVOT Template

I like to use what I call “SQL templates”. They are a way of describing a pattern: you distinguish what varies each time from what stays the same, and you put placeholders where the variable stuff should go. For PIVOT, the template has four placeholders.

select * from #SOURCE#
pivot(#AGGFUNCS# for #PIVOTFOR# in (
#PIVOTINLIST#
)) order by 1,2
  1. #SOURCE# is the row source: table, view, query in parentheses.
  2. #AGGFUNCS# is one or more aggregate functions separated by commas, with or without aliases.
  3. #PIVOTFOR# must contain exactly one column name from the source. This is a limitation of my function.
  4. #PIVOTINLIST# lists the pivot columns.

My function gets the first three placeholders from the input and generates the fourth one.

Generating the value for #PIVOTINLIST#

The problem with PIVOT is that the list of pivot columns depends on the data, so we need to know the data before querying it! In our example, we list five column names for the five JOBs in the EMP table, but how do we know what those jobs are?

My function uses a preliminary query to get that list. It has its own template using two of the same placeholders:

q'£select listagg('''' || #PIVOTFOR# || ''' as "' || #PIVOTFOR# || '"', ',')
within group(order by #PIVOTFOR#)
from (select distinct #PIVOTFOR# from #SOURCE#)£'
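With #PIVOTFOR# = JOB and #SOURCE# = our DEPT/EMP join, the preliminary query becomes the following; with the standard SCOTT data it returns exactly the PIVOT_IN list we wrote by hand above:

```sql
select listagg('''' || JOB || ''' as "' || JOB || '"', ',')
within group(order by JOB)
from (select distinct JOB
      from (select dname, job, sal from DEPT join EMP using(DEPTNO)));

-- 'ANALYST' as "ANALYST",'CLERK' as "CLERK",'MANAGER' as "MANAGER",
-- 'PRESIDENT' as "PRESIDENT",'SALESMAN' as "SALESMAN"
```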

The Code

create or replace function generic_pivot(
  p_source in varchar2,   -- table, view or query in parentheses
  p_pivotfor in varchar2, -- one column from the input
  p_aggfuncs in varchar2  -- one or more aggregation functions
) return sys_refcursor is

-- Calculates pivot_in_list using SQL 1, updates SQL 2 text and opens ref cursor.
-- The PIVOT_FOR clause can contain only one column.

  l_sql sys.odcivarchar2list := sys.odcivarchar2list(

q'£select listagg('''' || #PIVOTFOR# || ''' as "' || #PIVOTFOR# || '"', ',')
within group(order by #PIVOTFOR#)
from (select distinct #PIVOTFOR# from #SOURCE#)£',

'select * from #SOURCE#
pivot(#AGGFUNCS# for #PIVOTFOR# in (
#PIVOTINLIST#
)) order by 1,2'
  );
  l_refcur sys_refcursor;
  l_pivotinlist varchar2(32767);
begin
  for i in 1..l_sql.count loop
    l_sql(i) := replace(l_sql(i), '#SOURCE#', p_source);
    l_sql(i) := replace(l_sql(i), '#PIVOTFOR#', p_pivotfor);
  end loop;
  dbms_output.put_line(l_sql(1));
  dbms_output.put_line('/');
  open l_refcur for l_sql(1);
  fetch l_refcur into l_pivotinlist;
  close l_refcur;
  l_sql(2) := replace(l_sql(2), '#AGGFUNCS#', p_aggfuncs);
  l_sql(2) := replace(l_sql(2), '#PIVOTINLIST#', l_pivotinlist);
  dbms_output.put_line(l_sql(2));
  dbms_output.put_line('/');
  open l_refcur for l_sql(2);
  return l_refcur;
end generic_pivot;
/

Test results

SQL> set serveroutput off
SQL> var rc refcursor
SQL> begin
  2    :rc := generic_pivot(
  3      '(select dname, job, sal from DEPT join EMP using(DEPTNO))', 
  4      'JOB', 
  5      'AVG(SAL)'
  6    );
  7  end;
  8  /

PL/SQL procedure successfully completed.

SQL> print :rc

DNAME             ANALYST      CLERK    MANAGER  PRESIDENT   SALESMAN
-------------- ---------- ---------- ---------- ---------- ----------
ACCOUNTING                      1300       2450       5000           
RESEARCH             3000        950       2975                      
SALES                            950       2850                  1400


SQL> begin
  2    :rc := generic_pivot(
  3      '(select dname, job, sal from DEPT join EMP using(DEPTNO))', 
  4      'DNAME', 
  5      'AVG(SAL)'
  6    );
  7  end;
  8  /

PL/SQL procedure successfully completed.

SQL> print :rc

JOB       ACCOUNTING   RESEARCH      SALES
--------- ---------- ---------- ----------
ANALYST                    3000           
CLERK           1300        950        950
MANAGER         2450       2975       2850
PRESIDENT       5000                      
SALESMAN                              1400


SQL> begin
  2    :rc := generic_pivot(
  3      '(select dname, job, sal from DEPT join EMP using(DEPTNO))', 
  4      'JOB', 
  5      'MIN(SAL) min, MAX(SAL) max'
  6    );
  7  end;
  8  /

PL/SQL procedure successfully completed.

SQL> print :rc

DNAME          ANALYST_MIN ANALYST_MAX  CLERK_MIN  CLERK_MAX MANAGER_MIN MANAGER_MAX PRESIDENT_MIN PRESIDENT_MAX SALESMAN_MIN SALESMAN_MAX
-------------- ----------- ----------- ---------- ---------- ----------- ----------- ------------- ------------- ------------ ------------
ACCOUNTING                                   1300       1300        2450        2450          5000          5000                          
RESEARCH              3000        3000        800       1100        2975        2975                                                      
SALES                                         950        950        2850        2850                                     1250         1600

Hope this helps!


More on Comparing Columns across Rows

My previous post got several replies about other ways to compare columns across rows. Here’s my take on all of them.

Test data (again)

create table t as
with data as (
  select level n from dual connect by level <= 3
)
, tests as (
  select row_number() over (order by a.n, b.n) test_id,
  nullif(a.n, 3) a, 
  nullif(b.n, 3) b,
  case a.n when b.n then 'Same' else 'Different' end status
  from data a, data b
)
select test_id, tbl, n, status
from tests
unpivot include nulls (n for tbl in(A,B));

select * from t order by 1,2;
TEST_ID  TBL  N  STATUS
      1  A    1  Same
      1  B    1  Same
      2  A    1  Different
      2  B    2  Different
      3  A    1  Different
      3  B       Different
      4  A    2  Different
      4  B    1  Different
      5  A    2  Same
      5  B    2  Same
      6  A    2  Different
      6  B       Different
      7  A       Different
      7  B    1  Different
      8  A       Different
      8  B    2  Different
      9  A       Same
      9  B       Same

 

My solution (again)

It looks like the most concise test for a difference is COUNT(N) BETWEEN 1 and COUNT(DISTINCT N).

select t.*,
count(distinct n) over(partition by test_id) cnt_distinct,
count(n) over(partition by test_id) cnt,
case when count(n) over(partition by test_id)
  between 1 and count(distinct n) over(partition by test_id)
  then 'Different' else 'Same' end new_status
from t
order by 4,5,6,1,2;
TEST_ID  TBL  N  STATUS     CNT_DISTINCT  CNT  NEW_STATUS
      3  A    1  Different             1    1  Different
      3  B       Different             1    1  Different
      6  A    2  Different             1    1  Different
      6  B       Different             1    1  Different
      7  A       Different             1    1  Different
      7  B    1  Different             1    1  Different
      8  A       Different             1    1  Different
      8  B    2  Different             1    1  Different
      2  A    1  Different             2    2  Different
      2  B    2  Different             2    2  Different
      4  A    2  Different             2    2  Different
      4  B    1  Different             2    2  Different
      9  A       Same                  0    0  Same
      9  B       Same                  0    0  Same
      1  A    1  Same                  1    2  Same
      1  B    1  Same                  1    2  Same
      5  A    2  Same                  1    2  Same
      5  B    2  Same                  1    2  Same

 

Using the DUMP() function (Andrej Pashchenko)

Since the DUMP() function returns a non-null result in every case, we can simply test

COUNT( DISTINCT DUMP(n) ) OVER (PARTITION BY TEST_ID)

If the result is 1, N is the same, otherwise it is different.

This solution is more concise, but in my tests it ran 10 times slower. The DUMP() function chews up some CPU.
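For reference, here is a sketch of the full DUMP()-based query (my formulation of Andrej's idea): since DUMP(null) returns the non-null string 'NULL', a single distinct count distinguishes “same” from “different”:

```sql
select t.*,
  case when count(distinct dump(n)) over(partition by test_id) = 1
       then 'Same' else 'Different'
  end new_status
from t
order by 1, 2;
```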

Using SYS_OP_MAP_NONNULL() (Tony Hasler)

As with DUMP(), we can test

COUNT( DISTINCT SYS_OP_MAP_NONNULL(n) ) OVER (PARTITION BY TEST_ID)

Again a concise solution. Surprisingly, in my tests it ran twice as slowly as my solution, and despite one mention in the documentation I doubt that this function is supported.
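The corresponding sketch with SYS_OP_MAP_NONNULL(), which maps NULL to a special value that cannot collide with real data:

```sql
select t.*,
  case when count(distinct sys_op_map_nonnull(n)) over(partition by test_id) = 1
       then 'Same' else 'Different'
  end new_status
from t
order by 1, 2;
```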

Simplifying the comparison

Zahar Hilkevich suggested simply comparing the two counts:

select t.*,
count(distinct n) over(partition by test_id) cnt_distinct,
count(n) over(partition by test_id) cnt,
case when count(n)          over(partition by test_id)
        = count(distinct n) over(partition by test_id)
  then 'Different' else 'Same' end new_status
from t
order by 4,5,6,1,2;
TEST_ID  TBL  N  STATUS     CNT_DISTINCT  CNT  NEW_STATUS
      3  A    1  Different             1    1  Different
      3  B       Different             1    1  Different
      6  A    2  Different             1    1  Different
      6  B       Different             1    1  Different
      7  A       Different             1    1  Different
      7  B    1  Different             1    1  Different
      8  A       Different             1    1  Different
      8  B    2  Different             1    1  Different
      2  A    1  Different             2    2  Different
      2  B    2  Different             2    2  Different
      4  A    2  Different             2    2  Different
      4  B    1  Different             2    2  Different
      9  A       Same                  0    0  Different
      9  B       Same                  0    0  Different
      1  A    1  Same                  1    2  Same
      1  B    1  Same                  1    2  Same
      5  A    2  Same                  1    2  Same
      5  B    2  Same                  1    2  Same

 

Notice that in test case 9, when both values are null, Zahar’s test says they are “different”. This is not a problem in the context of displaying different values, since the output values will be null no matter what.

Conclusion

Thanks to those who mentioned other more concise solutions. Despite my taste for concision, I think the solution I proposed is correct and efficient. The choice is yours!

Actually Seeing the Differences

When comparing data between two tables, it’s one thing to query the differences and another thing to actually see them.

I have two tables EMP and EMP2. Here are the rows that are different. Quick, what columns have changed?

EMPNO  TBL   ROW_CNT  ENAME   JOB       MGR   SAL   COMM
 7369  EMP         2  SMITH   CLERK     7902   800
 7369  EMP2        2  SMITE   CLERK     7902   800
 7499  EMP         2  ALLEN   SALESMAN  7698  1600   300
 7499  EMP2        2  ALLEN   SALESGUY  7698  1600   300
 7521  EMP         2  WARD    SALESMAN  7698  1250   500
 7521  EMP2        2  WARD    SALESMAN  7788  1250   500
 7654  EMP         2  MARTIN  SALESMAN  7698  1250  1400
 7654  EMP2        2  MARTIN  SALESMAN  7698  1750  1400
 7698  EMP         2  BLAKE   MANAGER   7839  2850
 7698  EMP2        2  BLAKE   MANAGER   7839  2850  1000
 7788  EMP2        1  SCOTT   ANALYST   7566  3000
 7902  EMP         1  FORD    ANALYST   7566  3000

 

I thought so. Now suppose I blank out the columns that are the same in both tables?

EMPNO  TBL   ROW_CNT  ENAME   JOB       MGR   SAL   COMM
 7369  EMP         2  SMITH
 7369  EMP2        2  SMITE
 7499  EMP         2          SALESMAN
 7499  EMP2        2          SALESGUY
 7521  EMP         2                    7698
 7521  EMP2        2                    7788
 7654  EMP         2                          1250
 7654  EMP2        2                          1750
 7698  EMP         2
 7698  EMP2        2                                1000
 7788  EMP2        1  SCOTT   ANALYST   7566  3000
 7902  EMP         1  FORD    ANALYST   7566  3000

 

That’s better. Now, how can I do that?

Comparing columns across rows

I need to compare two columns in two different rows, and I need the result of the comparison in each row. That sounds like a job for analytic functions. It would be hard to use LAG() or LEAD() because one row would need LEAD() and the other would need LAG(). I finally came up with a way to use COUNT().

For testing, I created a little table T:

TEST_ID  TBL  N  STATUS
      1  A    1  Same
      1  B    1  Same
      2  A    1  Different
      2  B    2  Different
      3  A    1  Different
      3  B       Different
      4  A    2  Different
      4  B    1  Different
      5  A    2  Same
      5  B    2  Same
      6  A    2  Different
      6  B       Different
      7  A       Different
      7  B    1  Different
      8  A       Different
      8  B    2  Different
      9  A       Same
      9  B       Same

 

If I use COUNT(DISTINCT N) I should get either 1 (same values) or 2 (different values) and I’m done: wrong! When I count N or DISTINCT N, null values don’t count. So I thought of comparing COUNT(N) and COUNT(DISTINCT N).

select t.*,
count(distinct n) over(partition by test_id) cnt_distinct,
count(n) over(partition by test_id) cnt
from t
order by 4,5,6,1,2;
TEST_ID  TBL  N  STATUS     CNT_DISTINCT  CNT
      3  A    1  Different             1    1
      3  B       Different             1    1
      6  A    2  Different             1    1
      6  B       Different             1    1
      7  A       Different             1    1
      7  B    1  Different             1    1
      8  A       Different             1    1
      8  B    2  Different             1    1
      2  A    1  Different             2    2
      2  B    2  Different             2    2
      4  A    2  Different             2    2
      4  B    1  Different             2    2
      9  A       Same                  0    0
      9  B       Same                  0    0
      1  A    1  Same                  1    2
      1  B    1  Same                  1    2
      5  A    2  Same                  1    2
      5  B    2  Same                  1    2

 

It looks like the most concise test for a difference is COUNT(N) BETWEEN 1 and COUNT(DISTINCT N).

select t.*,
count(distinct n) over(partition by test_id) cnt_distinct,
count(n) over(partition by test_id) cnt,
case when count(n) over(partition by test_id)
  between 1 and count(distinct n) over(partition by test_id)
  then 'Different' else 'Same' end new_status
from t
order by 4,5,6,1,2;
TEST_ID  TBL  N  STATUS     CNT_DISTINCT  CNT  NEW_STATUS
      3  A    1  Different             1    1  Different
      3  B       Different             1    1  Different
      6  A    2  Different             1    1  Different
      6  B       Different             1    1  Different
      7  A       Different             1    1  Different
      7  B    1  Different             1    1  Different
      8  A       Different             1    1  Different
      8  B    2  Different             1    1  Different
      2  A    1  Different             2    2  Different
      2  B    2  Different             2    2  Different
      4  A    2  Different             2    2  Different
      4  B    1  Different             2    2  Different
      9  A       Same                  0    0  Same
      9  B       Same                  0    0  Same
      1  A    1  Same                  1    2  Same
      1  B    1  Same                  1    2  Same
      5  A    2  Same                  1    2  Same
      5  B    2  Same                  1    2  Same

 

Finally, I apply this technique to each non-PK column in my EMP comparison and I get the desired result.

select EMPNO,
  case NEW_CNT when 1 then 'EMP2' else 'EMP' end tbl,
  ROW_CNT,
  case when count(ENAME) over(partition by EMPNO)
    between 1 and count(distinct ENAME) over(partition by EMPNO)
    then ENAME end ENAME,
  case when count(JOB) over(partition by EMPNO)
    between 1 and count(distinct JOB) over(partition by EMPNO)
    then JOB end JOB,
  case when count(MGR) over(partition by EMPNO)
    between 1 and count(distinct MGR) over(partition by EMPNO)
    then MGR end MGR,
  case when count(SAL) over(partition by EMPNO)
    between 1 and count(distinct SAL) over(partition by EMPNO)
    then SAL end SAL,
  case when count(COMM) over(partition by EMPNO)
    between 1 and count(distinct COMM) over(partition by EMPNO)
    then COMM end COMM
FROM (
  select
    EMPNO, ENAME, JOB, MGR, SAL, COMM,
    sum(NEW_CNT) NEW_CNT, count(*) over(partition by EMPNO) ROW_CNT
  FROM (
    select 
    EMPNO, ENAME, JOB, MGR, SAL, COMM,
    -1 NEW_CNT
    from EMP O
    union all
    select
    EMPNO, ENAME, JOB, MGR, SAL, COMM,
    1 NEW_CNT
    from emp2 N
  )
  group by
    EMPNO, ENAME, JOB, MGR, SAL, COMM
  having sum(NEW_CNT) != 0
)
order by 1, 2, new_cnt;

Conclusion

By using two COUNT() analytic functions, I can tell whether two columns in two different rows are the same or not, considering two NULLs to be “the same”. This allows me to compare rows, then to compare columns and blank out all but the true differences.

Print a table with one column name + value per row

There are some utilities out there to print tables with one column per row. Why not use a single SQL statement?

Asktom solution

See Dynamic query to print out any table for a 40-line anonymous PL/SQL block that prints out column name and value pairs using DBMS_OUTPUT. Output from the EMP table looks like this:

EMPNO                         : 7369
ENAME                         : SMITH
JOB                           : CLERK
MGR                           : 7902
HIREDATE                      : 17-dec-1980 00:00:00
SAL                           : 800
COMM                          :
DEPTNO                        : 20
-----------------
EMPNO                         : 7499
...

Tanel Poder

Tanel has a similar script here: https://blog.tanelpoder.com/files/scripts/printtab2.sql

SQL only: test data

To make things interesting, here is a table with various number and date-related columns:

drop table t purge;
create table t(
  pk number primary key,
  var varchar2(10),
  rw raw(10),
  num number,
  dte date,
  ts timestamp,
  tsz timestamp with time zone,
  tsl timestamp with local time zone,
  ids interval day to second,
  iym interval year to month
);

insert into t select level,
  '2', hextoraw('FF'),4.5, sysdate,
  systimestamp, systimestamp, systimestamp, 
  to_dsinterval('P1DT2H3M4S'), to_yminterval('P1Y1M')
from dual
connect by level <= 2;
commit;

select * from t;
PK VAR RW NUM DTE TS TSZ TSL IDS IYM
1 2 FF 4,5 2018-05-18 12:21:33 2018-05-18 12:21:33 2018-05-18 12:21:33 +02:00 2018-05-18 12:21:33 EUROPE/PARIS +01 02:03:04. +00001-01
2 2 FF 4,5 2018-05-18 12:21:33 2018-05-18 12:21:33 2018-05-18 12:21:33 +02:00 2018-05-18 12:21:33 EUROPE/PARIS +01 02:03:04. +00001-01

 

Using DBMS_XMLGEN

DBMS_XMLGEN.GETXMLTYPE generates an XML document with the content of whatever query you pass it:

select dbms_xmlgen.getxmltype('select * from t where pk = 1') from dual;

<ROWSET>
  <ROW>
    <PK>1</PK>
    <VAR>2</VAR>
    <RW>FF</RW>
    <NUM>4,5</NUM>
    <DTE>2018-05-18 12:21:33</DTE>
    <TS>2018-05-18 12:21:33</TS>
    <TSZ>2018-05-18 12:21:33 +02:00</TSZ>
    <TSL>2018-05-18 12:21:33</TSL>
    <IDS>+01 02:03:04</IDS>
    <IYM>+01-01</IYM>
  </ROW>
</ROWSET>

As you can see, the entire content is wrapped in a ROWSET tag and each row is wrapped in a ROW tag. Within each row, a tag with the column name encloses the column value.

Query with one row

We can generate SQL data from our XML using a generic XMLTABLE clause:

select * from xmltable(
  'ROWSET/ROW/*'
  passing dbms_xmlgen.getxmltype('select * from t where pk = 1')
  columns
    col_name varchar2(30) path 'name()',
    col_val varchar2(100) path 'text()' -- after study, more correct than '.'
);
COL_NAME COL_VAL
PK 1
VAR 2
RW FF
NUM 4,5
DTE 2018-05-18 12:21:33
TS 2018-05-18 12:21:33,523620
TSZ 2018-05-18 12:21:33,523620 +02:00
TSL 2018-05-18 12:21:33,523620
IDS +01 02:03:04
IYM +01-01

 

Watch out! DBMS_XMLGEN used my session’s NLS parameters to convert numbers and datetime data to strings, including the comma as a decimal marker because I am in France. Note that I changed my timestamp display parameters to show fractional seconds: yes, with a comma as the decimal “point”…
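If you want predictable strings regardless of where the code runs, one option is to set the session NLS parameters explicitly before calling DBMS_XMLGEN (a sketch; choose the formats you actually want):

```sql
alter session set nls_numeric_characters = '.,';
alter session set nls_date_format = 'YYYY-MM-DD HH24:MI:SS';
alter session set nls_timestamp_format = 'YYYY-MM-DD HH24:MI:SSXFF';
alter session set nls_timestamp_tz_format = 'YYYY-MM-DD HH24:MI:SSXFF TZR';
```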

Query with multiple rows

If there are multiple rows, we can mark each row with an increasing counter – and while we’re at it, we’ll do the same for each column.

select rn, cn, col_name, col_val from xmltable(
  'ROWSET/ROW' passing dbms_xmlgen.getxmltype('select * from t')
  columns
    rn for ordinality,
    xmldata xmltype path '*'
) a
, xmltable('*' passing a.xmldata
  columns
    cn for ordinality,
    col_name varchar2(30) path 'name()',
    col_val varchar2(100) path 'text()' -- after study, more correct than '.'
);
RN CN COL_NAME COL_VAL
1 1 PK 1
1 2 VAR 2
1 3 RW FF
1 4 NUM 4,5
1 5 DTE 2018-05-18 12:21:33
1 6 TS 2018-05-18 12:21:33,523620
1 7 TSZ 2018-05-18 12:21:33,523620 +02:00
1 8 TSL 2018-05-18 12:21:33,523620
1 9 IDS +01 02:03:04
1 10 IYM +01-01
2 1 PK 2
2 2 VAR 2
2 3 RW FF
2 4 NUM 4,5
2 5 DTE 2018-05-18 12:21:33
2 6 TS 2018-05-18 12:21:33,523620
2 7 TSZ 2018-05-18 12:21:33,523620 +02:00
2 8 TSL 2018-05-18 12:21:33,523620
2 9 IDS +01 02:03:04
2 10 IYM +01-01

 

Update: XMLTYPE(CURSOR(…))

Thanks to Iudith Mentzel, I (re)learned that XMLTYPE can take a REF CURSOR as input. This is great news, because with a REF CURSOR the input query can contain bind variables. With version 12.2.0.1 there is a bug when mixing XMLTYPE(CURSOR(…)) with XMLTABLE if the path contains ‘.’; there is no problem on livesql with version 18c.

By using ‘text()’, which finally seems to be the correct path to use anyway, I get the desired results:

select rn, cn, col_name, col_val from xmltable(
  'ROWSET/ROW' passing xmltype(cursor(select * from t))
  columns
    rn for ordinality,
    xmldata xmltype path '*'
) a
, xmltable('*' passing a.xmldata
  columns
    cn for ordinality,
    col_name varchar2(30) path 'name()',
    col_val varchar2(100) path 'text()'
);

Summary

Using DBMS_XMLGEN (or better, XMLTYPE(CURSOR(…))) and XMLTABLE, we can “print” a table with one column name and value per row, plus a row counter, a column counter and the column value as a string. Only one SQL statement is required; it produces a result set that, compared to DBMS_OUTPUT, is much easier to massage further or to display in an APEX application.

Hope this helps!

COMP_SYNC 2: exclude surrogate keys

At the recent ILOUG conference, Sabine Heimsath asked how to compare two tables where the surrogate keys do not match. Here’s how, using my revised comparison package.

Test data

drop table o purge;
create table o (
  pk number generated always as identity primary key,
  val1 number,
  val2 number
);
insert into o(val1, val2)
select level, level from dual connect by level <= 10;

drop table n purge;
create table n (
  pk number generated always as identity start with 42 primary key,
  val1 number,
  val2 number
);
insert into n(val1, val2)
select level+1, level+1 from dual connect by level <= 10;

 

Simple compare: the COMPARE_SQL function

If you exclude a column from the comparison, the SQL from this function will also exclude that column from the output. If there are duplicate rows with the same data, they are grouped together in the output, with a count of the number of rows.

select comp_sync.compare_sql('o','n',p_exclude_cols=>'pk') from dual;

select /*+ qb_name(COMPARE) */
  "VAL1", "VAL2",
  decode(sign(sum(Z##NEW_CNT)), 1, 'I', 'D') Z##OP,
  abs(sum(Z##NEW_CNT)) Z##CNT
FROM (
  select /*+ qb_name(old) */
  "VAL1", "VAL2"
    , -1 Z##NEW_CNT
  from O O
  union all
  select /*+ qb_name(new) */
  "VAL1", "VAL2"
    , 1 Z##NEW_CNT
  from n N
)
group by
  "VAL1", "VAL2"
having sum(Z##NEW_CNT) != 0
order by 1, Z##OP;
VAL1 VAL2 Z##OP Z##CNT
1 1 D 1
11 11 I 1

 

Detailed compare: the CDC_SQL function

The SQL from this function will do the comparison you want, but it will return all the involved rows and all the columns.

select comp_sync.cdc_sql('o','n',p_exclude_cols=>'pk') from dual;

select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    "PK", "VAL1", "VAL2",
    case
      when Z##NEW = 1
        and sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        )
        then 'I'
      when Z##OLD = 1
        and sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        )
        then 'D'
    end Z##OP, Z##RID
  FROM (
    select /*+ qb_name(old) */
    "PK", "VAL1", "VAL2",
    1 Z##OLD, 0 Z##NEW, rowid Z##RID
    from O O
    union all
    select /*+ qb_name(new) */
    "PK", "VAL1", "VAL2",
    0, 1, null
    from n N
  )
)
where Z##OP is not null;
PK VAL1 VAL2 Z##OP Z##RID
1 1 1 D AAAX/cAAZAAAEfGA
51 11 11 I

 

SYNC_SQL: synchronizing the data

This will generate a MERGE statement that assumes you want to insert new rows into the “old” table with the same key as the “new” table. This is almost certainly not what you want, but all you have to do is adjust the INSERT part manually. In this case, the surrogate key is generated automatically so we just need to remove that column from the INSERT clause.

select comp_sync.sync_sql('o','n',p_exclude_cols=>'pk') from dual;

merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    "PK", "VAL1", "VAL2", rowid Z##RID
  from O
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    "PK", "VAL1", "VAL2",
    case
      when Z##NEW = 1
        and sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        )
        then 'I'
      when Z##OLD = 1
        and sum(Z##OLD) over(partition by
          "VAL1", "VAL2"
        order by null rows unbounded preceding) > sum(Z##NEW) over(partition by
          "VAL1", "VAL2"
        )
        then 'D'
    end Z##OP, Z##RID
  FROM (
    select /*+ qb_name(old) */
    "PK", "VAL1", "VAL2",
    1 Z##OLD, 0 Z##NEW, rowid Z##RID
    from O O
    union all
    select /*+ qb_name(new) */
    "PK", "VAL1", "VAL2",
    0, 1, null
    from n N
  )
)
where Z##OP is not null
) N
on (
  O.Z##RID = n.Z##RID
)
when matched then update set
  "VAL1"=N."VAL1"
  delete where N.Z##OP = 'D'
when not matched then insert (
  --"PK", "VAL1", "VAL2"
  "VAL1", "VAL2"
) values(
  --N."PK", N."VAL1", N."VAL2"
  N."VAL1", N."VAL2"
);

2 rows merged.

COMP_SYNC 1: a new table compare/sync package

I have been meaning to update my COMPARE_SYNC package for some time. I want to change the interface and the functionality a bit, so I am leaving the existing package alone and creating a new one called COMP_SYNC.

If you have used the old package, I would greatly appreciate any feedback on the new version: functionality, performance, bugs, etc. Comment away and thanks in advance.

What COMP_SYNC does for you

The package returns CLOBs containing SQL statements for you to adjust / test / execute. It uses CDC (Change Data Capture) format, with a flag column (Z##OP) on each row: ‘I’ for insert, ‘U’ for update and ‘D’ for delete.

  • COMPARE_SQL: COMPARE_SQL returns SQL that compares new source and old target using Tom Kyte’s GROUP BY method. Omitted columns are not compared and do not appear in the output.
    • ‘D’ rows are in “old” but not in “new”.
    • ‘I’ rows are in “new” but not in “old”.
      Since there may be duplicates, Z##CNT has the number of rows involved.
  • CDC_SQL: compares an “old” table (not a view) to “new”. You can exclude columns from the comparison, but the output shows entire rows with all columns, including the ROWID of the “old” row. For every ‘U’ row there is a corresponding ‘O’ (for “old”) row with the old values.
  • SYNC_SQL: compares and syncs from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns.
  • SYNC_UPSERT_SQL: inserts and updates but no deletes. Works only when there are both key and non-key columns.
  • SYNC_CDC_SQL: directly applies changes from a CDC table such as returned by CDC_SQL.

Parameter changes

If you have already used COMPARE_SYNC, here is what changed:

  • Columns are now in comma-separated lists and not in little SYS.ODCIVARCHAR2LIST tables.
  • Table names and column names are converted to upper case unless you put them in double quotes.
  • P_EXCLUDE_COLS replaces P_ALL_COLS: if you want to exclude columns from the comparison just list them here, instead of having to list all the columns you want to include.
  • P_PREFIX replaces P_OPERATION_COL: I use a few column names in addition to the actual tables, so the prefix is now applied to all of them to avoid collisions with your names.

The code

[Update 2018-02-13: added source code files]

This site does not allow upload of source code, so I had to add a “.doc” suffix.

comp_sync-pks.doc : package specification, rename to comp_sync.pks

comp_sync-pkb.doc : package body, rename to comp_sync.pkb

create or replace package COMP_SYNC
authid current_user as
/*
COMP_SYNC generates SQL for comparing or synchronizing
"old" target and "new" source.
 
- "Old" can be a table or view, local or remote.
  Indicate separately the "old" owner, "old" table and "old" dblink.
  To compare two queries, create a view to use as the "old".
  To sync, "old" must be a table but I do not check that for you.
- "New" can be local, remote, table, view or a query enclosed in parentheses.
  Examples: 'SCOTT.EMP', 'T_SOURCE@DBLINK', '(select * from SCOTT.EMP@DBLINK)'
 
Note: I never check the "new" source for validity.
I only check the "old" target for validity when I look up columns from the data dictionary.
So the generated SQL is not guaranteed to run without error!
   
The generated SQL is returned as a CLOB.
 
To debug, change the value of G_DOLOG to true. See the beginning of the package body.
 
INPUT PARAMETERS:

-- Required
  
P_OLD_TABLE  : name of the target table or view. Must exist in the database.
 
P_NEW_SOURCE : source table or view - or query enclosed in parentheses.

-- Optional
 
P_OLD_OWNER  : owner of the target. Must exist in the database.
  The default is null, which assumes the current user.
 
P_EXCLUDE_COLS   : optional comma-separated list of columns to OMIT from the comparison.
  If you leave out P_EXCLUDE_COLS, every non-virtual column will be compared,
  both visible and invisible.
  If you omit a PK column, the tables are considered not to have a primary key.
 
P_KEY_COLS : optional comma-separated list of primary key columns.
  This overrides the default search for PK columns in ALL_CONS_COLUMNS.
   
P_OLD_DBLINK : dblink to the target database.
  The default is null, which means the target is in the local database.
   
P_PREFIX : prefix to the names of the columns such as the CDC flag
  ('D', 'I', 'U' or 'O' for the "old" rows being updated).
  When syncing, I delete the rows marked 'D' and ignore the rows marked 'O'.
  The default prefix is 'Z##'.
 
Pre 2018-02-01:
  See the COMPARE_SYNC package.
2018-02-01: Major overhaul
    - Parameters reordered to have most usual first
    - P_EXCLUDE_COLS (to exclude some columns) replaces P_ALL_COLS (that included columns).
    - P_OPERATION_COL is replaced by P_PREFIX that begins all column names I make up.
    - P_EXCLUDE_COLS and P_KEY_COLS are now comma-separated lists and not little tables.
    - User, table and column names are now upper cased unless within double quotes.
    - Instead of passing a huge record among internal procedures,
      I now use global variables. So sue me!
    - CDC output rows include the ROWID of the target table, which is used for efficient syncing.
*/
/*
COMPARING:
 
COMPARE_SQL returns SQL that compares new source and old target
using Tom Kyte's GROUP BY method.
Omitted columns are not compared and do not appear in the output.
'D' rows are in "old" but not in "new".
'I' rows are in "new" but not in "old".
Since there may be duplicates, Z##CNT has the number of rows involved.

Example:
  select COMP_SYNC.COMPARE_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
CDC_SQL produces CDC output: 'D', 'I', 'U' - or 'O' for the "old" rows being updated.
The output includes the ROWID of the target, except when 'I'.

Example:
  select COMP_SYNC.CDC_SQL('T_TARGET', 'T_SOURCE') from DUAL;
*/
  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
/*
SYNCHRONIZING
 
The package can synchronize in one of three ways:
1) SYNC: Compare and sync from source to target: inserts, updates and deletes.
    Works with any combination of key and non-key columns,
    but the target must be a table because I use the ROWID.
    
2) SYNC_UPSERT: sync from source to target: inserts and updates but no deletes.
    Requires a target with both primary key and non-key columns.
    It does not allow for omitting columns: the workaround is to use a view on the target.
    
3) SYNC_CDC: the source is a "Change Data Capture" table.
  It contains inserts, updates and deletes to be directly applied.
  It must contain a column ending with 'OP' containing the operation flag (I,U,D),
  and a column ending in 'RID' with the ROWID of the target row if U or D.
*/
/*
Example:
  select COMP_SYNC.SYNC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE'
  ) from DUAL;
*/
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;

/*
Example:
  select COMP_SYNC.SYNC_UPSERT_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_SOURCE',
    P_KEY_COLS => 'C1,C2'
  ) from DUAL;
*/
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob;
 
/*
Example:
  select COMP_SYNC.SYNC_CDC_SQL(
    P_OLD_TABLE => 'T_TARGET',
    P_NEW_SOURCE => 'T_CDC',
    P_OLD_OWNER => user,
    P_KEY_COLS => 'C1,C2',
    P_PREFIX => 'OPCODE'
  ) from DUAL;
*/
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob;
 
end COMP_SYNC;
/
create or replace package body COMP_SYNC as
 
  G_DOLOG constant BOOLEAN := false;
    C_NEWLINE constant varchar2(2) := '
';
  
  type TT_VARCHAR2 is table of VARCHAR2(255);
  
  -- set by CHECK_COMMON_INPUTS
  G_OLD_OWNER varchar2(255);
  G_OLD_TABLE varchar2(255);
  G_NEW_SOURCE varchar2(4000);
  G_OLD_DBLINK varchar2(255);
  G_OPERATION_COL varchar2(255);
  G_OLD_OWNER_TABLE varchar2(255);
  
  -- set by MAKE_REPLACEMENTS
  G_ALL_COLS TT_VARCHAR2;   -- all non-virtual columns
  G_SOME_COLS TT_VARCHAR2;  -- all non-virtual columns except those listed on P_EXCLUDE_COLS
  G_KEY_COLS TT_VARCHAR2;   -- from P_KEY_COLS, or by default the "old" primary key columns
  G_FIRST_COL TT_VARCHAR2; -- first column in G_SOME_COLS
  G_ALL_COL_CLOB clob;  
  G_SOME_COL_CLOB clob;
  G_INSERT_COL_CLOB clob;
  G_KEY_COL_CLOB clob;
  G_ON_COL_CLOB clob;
  G_SET_COL_CLOB clob;
  G_FIRST_COL_CLOB clob;
  G_DECODE_COL_CLOB clob;
 
  procedure LOGGER(P_TXT in clob, P_DOLOG in boolean default false) is
  begin
    if G_DOLOG or P_DOLOG then
      DBMS_OUTPUT.PUT_LINE('prompt > ' || P_TXT);
    end if;
  end LOGGER;
  
  /* sets all G_OLD_* parameters, G_NEW_SOURCE and G_OPERATION_COL.
     If P_OLD_OWNER is null, G_OLD_OWNER := user but G_OLD_OWNER_TABLE does not mention schema.
     OWNER, TABLE and OPERATION_COL are uppercased unless within double quotes.
     OWNER is checked for existence. OLD_TABLE is checked for existence later if necessary. */
  procedure CHECK_COMMON_INPUTS(
    P_OLD_OWNER in varchar2,
    P_OLD_TABLE in varchar2,
    P_OLD_DBLINK in varchar2,
    P_NEW_SOURCE in varchar2
  ) is
    L_CNT number;
    L_SQL varchar2(255) :=
q'!select COUNT(*) from ALL_USERS#DBLINK# where USERNAME = trim('"' from '#OLD_OWNER#')!';
  begin
    LOGGER('CHECK_COMMON_INPUTS');
    
    if P_OLD_TABLE is null then 
      RAISE_APPLICATION_ERROR(
        -20001,
        'P_OLD_TABLE must not be null.'
      );
    end if;
    
    if P_OLD_DBLINK is null or SUBSTR(P_OLD_DBLINK,1,1) = '@' then
      G_OLD_DBLINK := upper(P_OLD_DBLINK);
    else
      G_OLD_DBLINK :=  '@' || upper(P_OLD_DBLINK);
    end if;
    
    if substr(P_OLD_OWNER,1,1) = '"' then
      G_OLD_OWNER := P_OLD_OWNER;
    else
      G_OLD_OWNER := upper(P_OLD_OWNER);
    end if;
    
    if substr(P_OLD_TABLE,1,1) = '"' then
      G_OLD_TABLE := P_OLD_TABLE;
    else
      G_OLD_TABLE := upper(P_OLD_TABLE);
    end if;
    
    if G_OLD_OWNER is null then
      G_OLD_OWNER_TABLE := G_OLD_TABLE || G_OLD_DBLINK;
      G_OLD_OWNER := user;
    else
      G_OLD_OWNER_TABLE := G_OLD_OWNER || '.' || G_OLD_TABLE || G_OLD_DBLINK;
    end if;
    
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    LOGGER(L_SQL);
    execute immediate L_SQL into L_CNT;
    if L_CNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20002,
        'OLD_OWNER = ' ||G_OLD_OWNER|| ': user not found in the database.'
      );
    end if;
    
    if P_NEW_SOURCE is null then
      RAISE_APPLICATION_ERROR(
        -20003,
        'P_NEW_SOURCE is null. Must be table, view or query within parentheses.'
      );
    else
      G_NEW_SOURCE := P_NEW_SOURCE;
    end if;
  
  end CHECK_COMMON_INPUTS;
  
  function COL_TOKENIZE(
    p_string in varchar2
  )
  return TT_VARCHAR2
  as
    c_delim constant varchar2(1) := ',';
    i_prev_pos pls_integer := 1;
    i_pos pls_integer;
    i_max_pos pls_integer := length(p_string) + 1;
    l_col varchar2(255);
    lt_out TT_VARCHAR2 := new TT_VARCHAR2();
    i_out pls_integer := 0;
  begin
    loop
      i_pos := instr(p_string, c_delim, i_prev_pos);
      if i_pos = 0 then
        i_pos := i_max_pos;
      end if;
      l_col := trim(substr(p_string, i_prev_pos, i_pos - i_prev_pos));
      if substr(l_col,1,1) != '"' then
        l_col := '"' || upper(l_col) || '"';
      end if;
      i_out := i_out + 1;
      lt_out.extend;
      lt_out(i_out) := l_col;
      exit when i_pos = i_max_pos;
      i_prev_pos := i_pos + 1;
    end loop;
    return lt_out;
  end COL_TOKENIZE;
 
  /*
  Format input array into CLOB with configurable maximum line length.
  Indentation is handled later using BIG_REPLACE.
  Pattern is simplified printf: each occurrence of '%s' is replaced by the array element.
  */
  function STRINGAGG(
    PT_COLS in TT_VARCHAR2,
    P_PATTERN in varchar2 default '%s',
    P_SEPARATOR in varchar2 default ',',
    P_LINEMAXLEN in number default 80
  ) return clob is
    L_CLOB clob;
    L_NEW varchar2(255);
    L_LINELEN number := 0;
  begin
    for I in 1..PT_COLS.COUNT LOOP
      L_NEW := case when I > 1 then ' ' end
        || replace(P_PATTERN, '%s', PT_COLS(I))
        || case when I < PT_COLS.COUNT then P_SEPARATOR end;
      if L_LINELEN + length(L_NEW) > P_LINEMAXLEN then
        L_CLOB := L_CLOB || C_NEWLINE;
        L_LINELEN := 0;
        L_NEW := SUBSTR(L_NEW,2);
      end if;
      L_CLOB := L_CLOB || L_NEW;
      L_LINELEN := L_LINELEN + length(L_NEW);
    end LOOP;
    return L_CLOB;
  end STRINGAGG;
  
  procedure BIG_REPLACE(
    p_clob in out nocopy clob,
    p_search in varchar2,
    p_replace in clob
  ) is
    c_replace_len constant integer := 30000;
    l_iter integer;
  begin
    if p_search is null then
      RAISE_APPLICATION_ERROR(
        -20004,
        'Internal error in BIG_REPLACE: p_search parameter is null.'
      );
    end if;
    if p_replace is null then
      logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB, true);
      logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB, true);
      logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB, true);
      logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB, true);
      logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB, true);
      logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB, true);
      logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB, true);
      logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB, true);
      RAISE_APPLICATION_ERROR(
        -20005,
        'Internal error in BIG_REPLACE: p_replace parameter is null.'
      );
    end if;
    l_iter := ceil(length(p_replace) / c_replace_len);
    --logger('length(p_replace) : '||length(p_replace));
    --logger('l_iter : '||l_iter);
    for i in 1..l_iter loop
      --logger('(i-1)*c_replace_len+1 : '||((i-1)*c_replace_len+1));
      p_clob := replace(
        p_clob, 
        p_search,
        substr(p_replace, (i-1)*c_replace_len+1, c_replace_len)
          || case when i < l_iter then p_search end
      );
    end loop;
  end BIG_REPLACE;
 
  function GET_ALL_COLS return TT_VARCHAR2 is
    l_version number;
    l_instance_sql varchar2(255) :=
q'!select to_number(regexp_substr(banner, 'Release ([^|.]+)', 1, 1, 'i', 1)) from v$version#DBLINK# where rownum = 1!';
    L_TAB_COLS SYS.ODCIVARCHAR2LIST;
    L_ALL_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_SQL varchar2(255) :=
q'!select '"'||COLUMN_NAME||'"' from ALL_TAB_COLS#DBLINK# where (OWNER, TABLE_NAME, VIRTUAL_COLUMN) = ((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'NO')) and #VERSION_DEPENDENT# order by SEGMENT_COLUMN_ID!';
  begin
    LOGGER('GET_ALL_COLS');
    l_instance_sql := replace(l_instance_sql, '#DBLINK#', G_OLD_DBLINK);
    LOGGER(l_instance_sql);
    execute immediate l_instance_sql into l_version;
    logger('l_version = ' || l_version);
    if l_version >= 12 then
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'USER_GENERATED = ''YES''');
    else
      L_SQL := replace(L_SQL, '#VERSION_DEPENDENT#', 'HIDDEN_COLUMN = ''NO''');
    end if;
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_TAB_COLS;
    if L_TAB_COLS.COUNT = 0 then
      RAISE_APPLICATION_ERROR(
        -20006,
        G_OLD_OWNER_TABLE || ': table not found.'
      );
    end if;
    L_ALL_COLS.extend(L_TAB_COLS.count);
    for i in 1..L_TAB_COLS.count loop
      L_ALL_COLS(i) := L_TAB_COLS(i);
    end loop;
    return L_ALL_COLS;
  end GET_ALL_COLS;
 
  function GET_KEY_COLS return TT_VARCHAR2 is
    L_KEY_COLS TT_VARCHAR2 := new TT_VARCHAR2();
    L_KEY_COL_LIST SYS.ODCIVARCHAR2LIST;
    L_SQL varchar2(4000) := 
q'!select '"'||COLUMN_NAME||'"'
from ALL_CONS_COLUMNS#DBLINK#
where (OWNER, CONSTRAINT_NAME) = (
  select OWNER, CONSTRAINT_NAME from ALL_CONSTRAINTS#DBLINK#
  where (OWNER, TABLE_NAME, CONSTRAINT_TYPE) =
        ((trim('"' from '#OLD_OWNER#'), trim('"' from '#OLD_TABLE#'), 'P'))
)!';
  begin
    LOGGER('GET_KEY_COLS');
    L_SQL := replace(L_SQL, '#DBLINK#', G_OLD_DBLINK);
    L_SQL := replace(L_SQL, '#OLD_OWNER#', G_OLD_OWNER);
    L_SQL := replace(L_SQL, '#OLD_TABLE#', G_OLD_TABLE);
    LOGGER(L_SQL);
    execute immediate L_SQL bulk collect into L_KEY_COL_LIST;
    L_KEY_COLS.extend(L_KEY_COL_LIST.count);
    for i in 1..L_KEY_COL_LIST.count loop
      L_KEY_COLS(i) := L_KEY_COL_LIST(i);
    end loop;
    return L_KEY_COLS;
  end GET_KEY_COLS;
 
  procedure MAKE_REPLACEMENTS(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2,
    P_EXCLUDE_COLS in varchar2,
    P_KEY_COLS in varchar2,
    P_OLD_DBLINK in varchar2
  ) is
    L_NON_KEY_COLS TT_VARCHAR2;
    L_EXCLUDE_COLS TT_VARCHAR2;
  begin
    LOGGER('MAKE_REPLACEMENTS');
    check_common_inputs(
      P_OLD_OWNER,
      P_OLD_TABLE,
      P_OLD_DBLINK,
      P_NEW_SOURCE
    );
    G_ALL_COLS := GET_ALL_COLS;
    if P_EXCLUDE_COLS is null then
      G_SOME_COLS := G_ALL_COLS;
    else
      L_EXCLUDE_COLS := COL_TOKENIZE(P_EXCLUDE_COLS);
      G_SOME_COLS := G_ALL_COLS multiset except L_EXCLUDE_COLS;
    end if;
    G_FIRST_COL := new TT_VARCHAR2(G_SOME_COLS(1));
    G_ALL_COL_CLOB := STRINGAGG(G_ALL_COLS);
    G_SOME_COL_CLOB := STRINGAGG(G_SOME_COLS);
    G_INSERT_COL_CLOB := STRINGAGG(G_ALL_COLS, 'N.%s');
    G_FIRST_COL_CLOB := STRINGAGG(G_FIRST_COL, '%s=N.%s');
    
    if P_KEY_COLS is null then
      G_KEY_COLS := GET_KEY_COLS;
    else
      G_KEY_COLS := COL_TOKENIZE(P_KEY_COLS);
    end if;
    
    if cardinality(G_KEY_COLS multiset intersect L_EXCLUDE_COLS) > 0 then
      G_KEY_COLS := null;
    end if;
    
    G_KEY_COL_CLOB := null;
    G_ON_COL_CLOB := null;
    G_SET_COL_CLOB := null;
    G_DECODE_COL_CLOB := null;
    if G_KEY_COLS is not null and G_KEY_COLS.COUNT > 0 then
      G_KEY_COL_CLOB := STRINGAGG(G_KEY_COLS);
      G_ON_COL_CLOB := STRINGAGG(G_KEY_COLS, 'O.%s=N.%s', ' and');
      L_NON_KEY_COLS := G_SOME_COLS multiset except G_KEY_COLS;
      if L_NON_KEY_COLS.COUNT between 1 and G_SOME_COLS.COUNT - 1 then
        G_SET_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, '%s=N.%s');
        G_DECODE_COL_CLOB := STRINGAGG(L_NON_KEY_COLS, 'decode(O.%s,N.%s,0,1)');
      end if;
    end if;
    
    logger('G_ALL_COL_CLOB : '||G_ALL_COL_CLOB);
    logger('G_SOME_COL_CLOB : '||G_SOME_COL_CLOB);
    logger('G_INSERT_COL_CLOB : '||G_INSERT_COL_CLOB);
    logger('G_KEY_COL_CLOB : '||G_KEY_COL_CLOB);
    logger('G_ON_COL_CLOB : '||G_ON_COL_CLOB);
    logger('G_SET_COL_CLOB : '||G_SET_COL_CLOB);
    logger('G_FIRST_COL_CLOB : '||G_FIRST_COL_CLOB);
    logger('G_DECODE_COL_CLOB : '||G_DECODE_COL_CLOB);

  end MAKE_REPLACEMENTS;

  function COMPARE_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'select /*+ qb_name(COMPARE) */
  #SOME_COLS#,
  decode(sign(sum(#PREFIX#NEW_CNT)), 1, ''I'', ''D'') #PREFIX#OP,
  abs(sum(#PREFIX#NEW_CNT)) #PREFIX#CNT
FROM (
  select /*+ qb_name(old) */
  #SOME_COLS#
    , -1 #PREFIX#NEW_CNT
  from #OLD# O
  union all
  select /*+ qb_name(new) */
  #SOME_COLS#
    , 1 #PREFIX#NEW_CNT
  from #NEW# N
)
group by
  #SOME_COLS#
having sum(#PREFIX#NEW_CNT) != 0
order by 1, #PREFIX#OP';
  begin
    LOGGER('COMPARE_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      null,
      P_OLD_DBLINK
    );
    L_CLOB := replace(
      C_CLOB,
      '#SOME_COLS#',
      replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end COMPARE_SQL;

  function CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
order by 1, #PREFIX#OP';
    C_PARTITION_CLOB constant clob :=
'select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null';
  begin
    LOGGER('CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end CDC_SQL; 
  
  function SYNC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_EXCLUDE_COLS in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_GROUP_CLOB constant clob :=
'merge /*+ qb_name(SYNC_GROUP) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select * from (
select /*+ qb_name(CDC_GROUP) */
    #SOME_COLS#,
  case count(*) over(partition by #KEY_COLS#) - #PREFIX#NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end #PREFIX#OP,
  max(#PREFIX#RID) over(partition by #KEY_COLS#) #PREFIX#RID
FROM (
  select /*+ qb_name(COMPARE) NO_MERGE */
    #SOME_COLS#,
    sum(#PREFIX#NEW_CNT) #PREFIX#NEW_CNT,
    max(#PREFIX#RID) #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #SOME_COLS#,
    -1 #PREFIX#NEW_CNT, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #SOME_COLS#,
    1 #PREFIX#NEW_CNT, null
    from #NEW# N
  )
  group by
    #SOME_COLS#
  having sum(#PREFIX#NEW_CNT) != 0
)
)
where #PREFIX#OP in(''I'',''U'',''D'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  where N.#PREFIX#OP in (''U'', ''D'')
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
    C_PARTITION_CLOB constant clob :=
'merge /*+ qb_name(SYNC_PARTITION) USE_NL(O) */ into (
  select /*+ qb_name(target) */
    #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
select /*+ qb_name(CDC_PARTITION) */ * from (
  select /*+ qb_name(before_filter) */
    #ALL_COLS#,
    case
      when #PREFIX#NEW = 1
        and sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        )
        then ''I''
      when #PREFIX#OLD = 1
        and sum(#PREFIX#OLD) over(partition by
          #SOME_COLS#
        order by null rows unbounded preceding) > sum(#PREFIX#NEW) over(partition by
          #SOME_COLS#
        )
        then ''D''
    end #PREFIX#OP, #PREFIX#RID
  FROM (
    select /*+ qb_name(old) */
    #ALL_COLS#,
    1 #PREFIX#OLD, 0 #PREFIX#NEW, rowid #PREFIX#RID
    from #OLD# O
    union all
    select /*+ qb_name(new) */
    #ALL_COLS#,
    0, 1, null
    from #NEW# N
  )
)
where #PREFIX#OP is not null
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #FIRST_COL#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
  begin
    LOGGER('SYNC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      P_EXCLUDE_COLS,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_KEY_COL_CLOB is null or G_SET_COL_CLOB is null or P_EXCLUDE_COLS is not null then
      L_CLOB := C_PARTITION_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '          ')
      );
      L_CLOB := replace(L_CLOB, '#FIRST_COL#', G_FIRST_COL_CLOB);
    else
      L_CLOB := C_GROUP_CLOB;
      big_replace(
        L_CLOB,
        '#SOME_COLS#',
        replace(G_SOME_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
      );
      big_replace(
        L_CLOB,
        '#ALL_COLS#',
        replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
      L_CLOB := replace(L_CLOB, '#KEY_COLS#', G_KEY_COL_CLOB);
    end if;
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    return L_CLOB;
  end SYNC_SQL;
 
  function SYNC_UPSERT_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_UPSERT) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#
  from #OLD#
) O
using (
  select /*+ qb_name(source) */
  #ALL_COLS#
  from #NEW#
) N
on (
  #ON_COLS#
)
when matched then update set
  #SET_COLS#
  where 1 in (
    #DECODE_COLS#
  )
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';

  begin
    LOGGER('SYNC_UPSERT_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    if G_SET_COL_CLOB is null then
      RAISE_APPLICATION_ERROR(
        -20007,
        'SYNC_UPSERT_SQL requires a target with both primary key and non-key columns.'
      );
    end if;
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#ON_COLS#',
      replace(G_ON_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#SET_COLS#',
      replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    big_replace(
      L_CLOB,
      '#DECODE_COLS#',
      replace(G_DECODE_COL_CLOB, C_NEWLINE, C_NEWLINE || '    ')
    );
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    return L_CLOB;
  end SYNC_UPSERT_SQL;
 
  function SYNC_CDC_SQL(
    P_OLD_TABLE in varchar2,
    P_NEW_SOURCE in varchar2,
    P_OLD_OWNER in varchar2 default null,
    P_KEY_COLS in varchar2 default null,
    P_OLD_DBLINK in varchar2 default null,
    P_PREFIX in varchar2 default 'Z##'  --'
  ) return clob is
    L_CLOB clob;
    C_CLOB constant clob :=
'merge /*+ qb_name(SYNC_CDC_SQL) USE_NL(O) */ into (
  select /*+ qb_name(target) */
  #ALL_COLS#, rowid #PREFIX#RID
  from #OLD#
) O
using (
  select /*+ qb_name(source) */ #PREFIX#OP, #PREFIX#RID,
  #ALL_COLS#
  from #NEW#
  where #PREFIX#OP in(''D'', ''I'', ''U'')
) N
on (
  O.#PREFIX#RID = n.#PREFIX#RID
)
when matched then update set
  #SET_COLS#
  delete where N.#PREFIX#OP = ''D''
when not matched then insert (
  #ALL_COLS#
) values(
  #INSERT_COLS#
)';
 
  begin
    LOGGER('SYNC_CDC_SQL');
    MAKE_REPLACEMENTS(
      P_OLD_TABLE,
      P_NEW_SOURCE,
      P_OLD_OWNER,
      null,
      P_KEY_COLS,
      P_OLD_DBLINK
    );
    L_CLOB := C_CLOB;
    big_replace(
      L_CLOB,
      '#ALL_COLS#',
      replace(G_ALL_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    if G_SET_COL_CLOB is not null then
      big_replace(
        L_CLOB,
        '#SET_COLS#',
        replace(G_SET_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
      );
    else
      L_CLOB := replace(L_CLOB, '#SET_COLS#', G_FIRST_COL_CLOB);
    end if;
    big_replace(
      L_CLOB,
      '#INSERT_COLS#',
      replace(G_INSERT_COL_CLOB, C_NEWLINE, C_NEWLINE || '  ')
    );
    L_CLOB := replace(L_CLOB, '#OLD#', G_OLD_OWNER_TABLE);
    L_CLOB := replace(L_CLOB, '#NEW#', G_NEW_SOURCE);
    L_CLOB := replace(L_CLOB, '#PREFIX#', P_PREFIX);
    return L_CLOB;
  end SYNC_CDC_SQL;
   
end COMP_SYNC;
/

Techniques for Comparing Tables

In my “Advanced Row Pattern Matching” presentation, I demonstrate using MATCH_RECOGNIZE to compare tables. Kim Berg Hansen asked me to compare this technique with others. I did some quick tests and here are the results with some comments.

Technique          Seconds
Full join          1
Group by (HASH)    1
Group by (SORT)    1.4
Analytic function  2.5
MATCH_RECOGNIZE    3.7

The “Full join” technique only works when we have a primary or unique key that is shared by both tables. I prefer the GROUP BY technique popularized by Tom Kyte, even though it may be a bit slower. When testing, I noticed that the HASH GROUP BY algorithm performs better than SORT GROUP BY, as others have written.
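To see the counting logic behind this GROUP BY method outside the database, here is a minimal Python sketch (the helper name compare_group_by is made up for illustration, it is not part of the package): each row counts -1 when it comes from the old table and +1 when it comes from the new one; after grouping, any non-zero net count is a difference, the sign giving the 'I' or 'D' flag and the absolute value the Z##CNT.

```python
from collections import Counter

def compare_group_by(old_rows, new_rows):
    # Count each row -1 from "old" and +1 from "new", like the
    # UNION ALL ... GROUP BY ... HAVING sum(...) != 0 in COMPARE_SQL.
    counts = Counter()
    for row in old_rows:
        counts[row] -= 1
    for row in new_rows:
        counts[row] += 1
    # Non-zero net counts are the differences:
    # positive means 'I' (only in new), negative 'D' (only in old).
    return {row: ('I' if n > 0 else 'D', abs(n))
            for row, n in counts.items() if n != 0}

diff = compare_group_by(
    [('ACCOUNTING', 'CLERK'), ('SALES', 'CLERK')],
    [('ACCOUNTING', 'CLERK'), ('SALES', 'MANAGER'), ('SALES', 'MANAGER')]
)
# ('SALES','CLERK') is only in "old" -> 'D';
# ('SALES','MANAGER') occurs twice in "new" -> ('I', 2), like Z##CNT.
```

Identical rows cancel out, which is precisely why this method cannot say which physical copy of a duplicated row is involved.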

If either of the tables contains duplicate rows (which may happen if we don’t compare all of the columns, or if there is no primary key), then GROUP BY will output only one row for the whole group of duplicates. This may be a problem if we want data (such as the ROWID) that was not included in the comparison. In that case, we could use analytic functions or the MATCH_RECOGNIZE clause to compare and output all the rows and columns of interest. As you can see, the analytic function is more than twice as slow as GROUP BY, but it easily beats the MATCH_RECOGNIZE clause.
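The idea behind the row-level (analytic) approach can be sketched in Python as well; compare_rows and the sample ROWID strings below are made up for illustration. Every physical row is kept, the old table’s ROWID rides along, and only the surplus copies of a duplicated row are flagged 'D' or 'I' (which particular copy gets flagged is arbitrary, as it is in SQL).

```python
from collections import Counter

def compare_rows(old_rows, new_rows):
    # old_rows are (rowid, row) pairs; new_rows are plain rows.
    net = Counter(new_rows)                    # +1 per "new" row
    net.subtract(row for _, row in old_rows)   # -1 per "old" row
    flagged = []
    for rid, row in old_rows:
        if net[row] < 0:                       # surplus copy in "old" -> delete
            flagged.append(('D', rid, row))
            net[row] += 1
    for row in new_rows:
        if net[row] > 0:                       # surplus copy in "new" -> insert
            flagged.append(('I', None, row))
            net[row] -= 1
    return flagged

# Two identical rows in "old", one in "new": only one copy is flagged
# for deletion, and we know exactly which ROWID to delete.
result = compare_rows(
    [('AAAX01', ('SALES', 'CLERK')), ('AAAX02', ('SALES', 'CLERK'))],
    [('SALES', 'CLERK')]
)
```

This is what makes row-level comparison worth its extra cost when syncing: the ROWID of each surplus row survives the compare.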

I use the output from table comparisons to synchronize the tables, so capturing the ROWID is important to me even when a primary or unique key is not available. For that use case, I will prefer analytic functions from now on.