SQL table macros 14: parameterized views

Developers like to reuse code. In SQL, views are reusable queries with lots of advantages, but one drawback: no parameters to limit the queries to only those rows we want. Sure, we can add a WHERE clause when selecting from a view, but the view might return lots of unnecessary data before the WHERE clause filters the rows.

As Andrej Pashchenko recently wrote, SQL table macros can get the results we would want from parameterized views (if they existed), but they do not have all the advantages of real views: see https://blog.sqlora.com/en/parameterized-views-with-sql-macros-part-2/ He wants to combine the advantages of views and macros, and so do I.

(To skip the explanations and see what I came up with, go to My proposal.)

I’ll concentrate on these advantages of views:

  1. We can include them in queries anywhere a table could appear.
  2. The *_DEPENDENCIES views show the “dependency chain” between them and other objects.

Here is a super simple example:

create or replace view dept_emp as
  select deptno, dname, loc, empno, ename, job, mgr, sal, comm
  from dept
  left join emp using(deptno);
select name, type, referenced_name, referenced_type
from user_dependencies where name = 'DEPT_EMP';
NAME        TYPE    REFERENCED_NAME    REFERENCED_TYPE
DEPT_EMP    VIEW    DEPT               TABLE
DEPT_EMP    VIEW    EMP                TABLE

Now, what if we want to filter on DEPTNO? We can just copy the query from the view and add a filter:

create or replace function dept_emp_macro(p_deptno number)
return varchar2 sql_macro is
begin
  return '
select deptno, dname, loc, empno, ename, job, mgr, sal, comm
from dept
left join emp using(deptno)
WHERE DEPTNO = P_DEPTNO';
end dept_emp_macro;
/
select * from dept_emp_macro(10);
DEPTNO DNAME      LOC      EMPNO ENAME  JOB        MGR  SAL COMM
    10 ACCOUNTING NEW YORK  7839 KING   PRESIDENT      5000
    10 ACCOUNTING NEW YORK  7782 CLARK  MANAGER   7839 2450
    10 ACCOUNTING NEW YORK  7934 MILLER CLERK     7782 1300
select name, type, referenced_name, referenced_type
from user_dependencies where name = 'DEPT_EMP_MACRO';
NAME              TYPE        REFERENCED_NAME    REFERENCED_TYPE    
DEPT_EMP_MACRO    FUNCTION    STANDARD           PACKAGE  

With the macro, we have a parameterized query but we have lost the information about the dependency chain in USER_DEPENDENCIES, so the second advantage of views is lost. Not only that, but the first advantage is limited as well: currently SQL macros inside WITH clauses are not permitted, and WITH clauses inside SQL macros cannot use scalar parameters!

My proposal

  1. I write and test a complete query. A WITH clause at the beginning contains hard-coded parameters.
  2. I create a view based on that query, including the hard_coded parameters.
  3. A generic SQL macro returns the view text after stripping off the hard-coded WITH clause.
  4. I call the macro using a new WITH clause, that can include whatever values or bind variables I want.
create or replace function parameterized_view(
  p_view dbms_tf.table_t,
  P_PARMS dbms_tf.table_t
)
return clob sql_macro is
  l_view_text long;
begin
  select text into l_view_text from user_views
  where view_name = trim('"' from p_view.table_name);
  l_view_text :=
    regexp_replace(
      regexp_replace(l_view_text, 'with\s*p_parms[^)]*\)\s*', '', 1, 0, 'i'),
      '^,', 'with '
    );
  return l_view_text;
end parameterized_view;
/
create or replace view dept_emp_pv as
with P_PARMS as (select 'RESEARCH' dname, 'CLERK' job from dual)
, d as (
  select deptno, dname, loc from dept
  where dname = (select dname from P_PARMS)
)
, e as (
  select deptno, empno, ename, job, mgr, sal, comm
  from emp
  where job = (select job from P_PARMS)
)
select * from d left join e using(deptno);
with PARMS as (select 'SALES' dname, 'SALESMAN' job from dual)
select * from parameterized_view(dept_emp_pv, PARMS);
DEPTNO DNAME    LOC        EMPNO    ENAME     JOB         MGR     SAL    COMM
    30 SALES    CHICAGO    7499     ALLEN     SALESMAN   7698    1600     300
    30 SALES    CHICAGO    7521     WARD      SALESMAN   7698    1250     500
    30 SALES    CHICAGO    7654     MARTIN    SALESMAN   7698    1250    1400
    30 SALES    CHICAGO    7844     TURNER    SALESMAN   7698    1500

Note that in the view, the “parameters” clause must have the same name as the second parameter in the macro: P_PARMS. When I call the macro, I can give the clause any name I want, since I pass the name to the macro.

Remember, DEPT_EMP_PV is the identifier of a table or view, so if it is invalidated then the query will automatically be reparsed and the new text will be picked up from the data dictionary. The dependency chain will work even though the macro itself has not been invalidated.

To conclude, using one 16-line generic macro we can have almost all the advantages of views while adding support for parameters! The views do have to be written with this usage in mind, and they are meant to be used as main queries, not subqueries.

SQL table macros 13: compare with primary key

I just updated my COMPARE_ROWSETS function to handle an optional primary or unique key.

If that optional parameter is left out, the function uses the venerable GROUP BY method that handles duplicate rows.

If key columns are indicated, then the “Z#_OP” column can contain updates :

  • U: Updated data (as present in p_new_table)
  • O: Old data (as present in p_old_table)

There is no need for a “Z#_CNT” column since there can be no duplicate rows.

Here is an example of use with a key column :

drop table emp2 purge;
 
create table emp2 as select * from emp;
 
update emp2 set ename = ename||' KONG' where rownum = 1;
 
insert into emp2 
select EMPNO+1000, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO
from emp2 where rownum <= 4 and job != 'PRESIDENT';

select * from compare_rowsets(
  emp, emp2,
  p_key_cols => columns(empno),
  p_exclude_cols => columns(comm)
)
order by empno, z#_op;
Z#_OP EMPNO ENAME JOB MGR HIREDATE SAL DEPTNO
O 7839 KING PRESIDENT   1981-11-17 00:00:00 5000 10
U 7839 KING KONG PRESIDENT   1981-11-17 00:00:00 5000 10
I 8566 JONES MANAGER 7839 1981-04-02 00:00:00 2975 20
I 8698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850 30
I 8782 CLARK MANAGER 7839 1981-06-09 00:00:00 2450 10
I 8788 SCOTT ANALYST 7566 1987-04-19 00:00:00 3000 20
 

SQL table macros 12: compare tables or queries

The GROUP BY method for comparing table data just turned 18. To celebrate, let’s make it a macro.

My second ever blog post described a method, popularised by Tom Kyte, for comparing the contents of tables or queries. As part of my COMP_SYNC package, the COMPARE_SQL function returns a SELECT statement that uses that method. With a macro, instead of returning the statement I can just execute it and compare tables, views and / or named subqueries.

[UPDATE 2022-02-25 : there is now an optional parameter to indicate a primary or unique key.]

  • I call the two data sources “rowsets”: one is “old”, the other is “new”.
  • Optional excluded columns are excluded from the comparison and the output.
  • Optional key columns must identify all rows uniquely.
    • With key columns, output column “Z#_OP” indicates the type of change:
      • I: Insert into old (present in new, not old)
      • D: Delete from old (present in old, not new)
      • U: Updated data (as present in new)
      • O: Old data (as present in old)
    • Without key columns, the output contains only ‘D’ and ‘I’ rows, and column “Z#_CNT” shows the number of rows to be deleted or inserted.
  • The output contains the values used for the comparison, which may differ from the actual values of the data if the data types do not allow direct comparison (LOBs for example).

The code makes heavy use of my SQM_UTIL package to configure the SELECT template. Depending on the data type of each column, an expression is applied to allow comparisons. For example, LOB column content is replaced by a hash and user-defined types are replaced by JSON.

create or replace function compare_rowsets(
  p_old_table in dbms_tf.table_t,
  p_new_table in dbms_tf.table_t,
  p_key_cols in dbms_tf.columns_t default null,
  p_exclude_cols in dbms_tf.columns_t default null
) return clob sql_macro is
/*
Compares tables, views or named subqueries; one is "old", one is "new".
Optional excluded columns are excluded from the comparison and the output.

The output contains the values used for the comparison, which may differ
from the actual values of the data if the data types do not allow
direct comparison (LOBs for example).

Column "Z#_OP" indicates the type of change:
- I: Insert into old (present in new, not old)
- D: Delete from old (present in old, not new)
- U: Updated data (as present in new)
- O: Old data (as present in old)

If present, key column(s) must identify all rows uniquely.

Without key columns, the output contains only 'D' and 'I' rows,
and column "Z#_CNT" shows the number of rows to be deleted or inserted.
*/
  l_col_column_names_old long;
  l_col_comparables_old long;
  l_col_comparables_new long;
  l_col_keys long;
  l_sql clob;
begin
  sqm_util.col_column_names(p_old_table, l_col_column_names_old, p_exclude_cols);
  sqm_util.col_comparables(p_old_table, l_col_comparables_old, p_exclude_cols);
  sqm_util.col_comparables(p_new_table, l_col_comparables_new, p_exclude_cols);
  
  if p_key_cols is null then
  
    l_sql :=
'select /*+ qb_name(COMPARE) */
  decode(sign(sum(Z#_NEW_CNT)), 1, ''I'', ''D'') Z#_OP,
  abs(sum(Z#_NEW_CNT)) Z#_CNT,
  '|| l_col_column_names_old ||'
FROM (
  select /*+ qb_name(old) */
  '|| l_col_comparables_old ||'
    , -1 Z#_NEW_CNT
  from p_old_table O
  union all
  select /*+ qb_name(new) */
  '|| l_col_comparables_new ||'
    , 1 Z#_NEW_CNT
  from p_new_table N
)
group by
  '|| l_col_column_names_old ||'
having sum(Z#_NEW_CNT) != 0';

  else
    sqm_util.list_columns(p_key_cols, l_col_keys);
    l_sql :=

'select /*+ qb_name(COMPARE) */
  case count(*) over(partition by
    '|| l_col_keys ||'
  ) - Z#_NEW_CNT
    when 0 then ''I''
    when 1 then ''U''
    when 2 then ''D''
    when 3 then ''O''
  end Z#_OP,
  '|| l_col_column_names_old ||'
FROM (
  select
    '|| l_col_column_names_old ||',
    sum(Z#_NEW_CNT) Z#_NEW_CNT
  FROM (
    select /*+ qb_name(old) */
    '|| l_col_comparables_old ||',
    -1 Z#_NEW_CNT
    from p_old_table O
    union all
    select /*+ qb_name(new) */
    '|| l_col_comparables_new ||',
    1 Z#_NEW_CNT
    from p_new_table N
  )
  group by
    '|| l_col_column_names_old ||'
  having sum(Z#_NEW_CNT) != 0
)';

  end if;
  --dbms_output.put_line(l_sql);
  return l_sql;
end compare_rowsets;
/
drop table emp2 purge;
 
create table emp2 as select * from emp;
 
update emp2 set ename = ename||' KONG' where rownum = 1;
 
insert into emp2 
select EMPNO+1000, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO
from emp2 where rownum <= 4 and job != 'PRESIDENT';
 
select * from compare_rowsets(
  emp, emp2, 
  p_exclude_cols => columns(comm)
)
order by empno, z#_op;
Z#_OP Z#_CNT EMPNO ENAME JOB MGR HIREDATE SAL DEPTNO
D 1 7839 KING PRESIDENT   1981-11-17 00:00:00 5000 10
I 1 7839 KING KONG PRESIDENT   1981-11-17 00:00:00 5000 10
I 1 8566 JONES MANAGER 7839 1981-04-02 00:00:00 2975 20
I 1 8698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850 30
I 1 8782 CLARK MANAGER 7839 1981-06-09 00:00:00 2450 10
I 1 8788 SCOTT ANALYST 7566 1987-04-19 00:00:00 3000 20
 

Hierarchical JSON

At last, a use case where PL/SQL is faster than SQL : avoiding unnecessary ordering !

In the last few years, different solutions have been found to represent hierarchical data in JSON format :

I won’t go into the SQL solutions in detail. Basically there are three stages :

  1. Get the data in hierarchical order and return the order, the level, and the data as a JSON object.
  2. Using the LEAD() and LAG() functions, determine whether the current JSON object is a child, a parent or a sibling and apply the appropriate JSON “glue”.
  3. Aggregate the result into a CLOB, preserving the original order.

Notice that stages 2 and 3 require the data to be in the order established by stage 1. In the SQL solutions, the execution plan shows that unnecessary sorts were done, sometimes requiring the use of TEMP space : yuck !

I finally realised that a PL/SQL function could read the output from stage 1, and for each input row add the JSON “glue” and append the result to the CLOB that the function returns. No extra sorting, and it runs about 40% faster than the most optimal SQL solution. Let me explain a few details :

  • The main input is a CURSOR expression : this is not a literal, but a real live SELECT statement that can include bind variables. It returns only the level and a JSON object with the desired data, so it is quite generic. The ORDER SIBLINGS BY clause is optional.
  • The input parameter is a strongly typed REF CURSOR, which allows it to be fetched into a table of P/SQL records.
  • The input is fetched using BULK COLLECT with LIMIT, which uses little memory. Special handling is needed, because I need to know the levels of the rows immediately preceding and following the row being processed.
  • If a JSON object is a parent, I add a name / value pair where the value is an array. The name is by default ‘children’ but it is configurable by an optional second parameter.
  • Concatenating text to a CLOB is a costly operation. To speed it up, I fill up a VARCHAR2 buffer, then concatenate the buffer to the CLOB.
  • Use PLSQL_OPTIMIZE_LEVEL=3 in order to “inline” the calls to APPEND_JSO. This shaves off about .2 seconds per million rows processed.
create or replace package hier_to_clob is
  type t_hier_rec is record(
    lvl pls_integer,
    jso varchar2(4000)
  );
  type tt_hier_rec is table of t_hier_rec;
  TYPE t_hier_rc IS REF CURSOR RETURN t_hier_rec;
  function get(
    p_hier_rc in t_hier_rc,
    p_array_name varchar2 default 'children',
    p_limit integer default 500
  ) return clob;
end hier_to_clob;
/
create or replace package body hier_to_clob is
  function get(
    p_hier_rc in t_hier_rc,
    p_array_name varchar2 default 'children',
    p_limit integer default 500
  ) return clob is
    l_array_name varchar2(130) := ',"'||p_array_name||'":[';
    lt_hier_rec tt_hier_rec;
    l_hier_rec_prev t_hier_rec := t_hier_rec(0, null);
    l_lvl_prev2 pls_integer := 0;
    l_clob clob;
    l_buffer varchar2(32767) := null;
    l_buflen pls_integer := 0;
    do_prev boolean:= false;
    procedure append_jso(
      p_jso varchar2,
      p_lvl_prev pls_integer,
      p_lvl pls_integer,
      p_lvl_next pls_integer
    ) is
      l_jso varchar2(4000);
    begin
      l_jso :=
        case
          when p_lvl_prev = 0 then null
          when p_lvl - p_lvl_prev = 1 then l_array_name
          when p_lvl > 1 then ','
        end ||
        rtrim(p_jso, '}') ||
        rpad('}', (p_lvl - p_lvl_next) * 2 + 1, ']}');

      if l_buflen + lengthb(l_jso) > 32767 then
        l_clob := l_clob || l_buffer;
        l_buffer := l_jso;
        l_buflen := lengthb(l_buffer);
      else
        l_buffer := l_buffer || l_jso;
        l_buflen := l_buflen + lengthb(l_jso);
      end if;
    end append_jso;
  begin
    loop
      fetch p_hier_rc bulk collect into lt_hier_rec limit p_limit;
      if do_prev then
        append_jso(
          l_hier_rec_prev.jso,
          l_lvl_prev2,
          l_hier_rec_prev.lvl,
          case when lt_hier_rec.count > 0 then lt_hier_rec(1).lvl else 1 end
        );
        do_prev := false;
      end if;
      for i in 1..lt_hier_rec.count-1 loop
        append_jso(
          lt_hier_rec(i).jso,
          case
            when i = 1 then l_hier_rec_prev.lvl
            else lt_hier_rec(i-1).lvl
          end,
          lt_hier_rec(i).lvl,
          lt_hier_rec(i+1).lvl
        );
      end loop;
      if lt_hier_rec.count > 0 then
        l_lvl_prev2 :=
          case lt_hier_rec.count
            when 1 then l_hier_rec_prev.lvl
            else lt_hier_rec(lt_hier_rec.count-1).lvl
          end;
        l_hier_rec_prev := lt_hier_rec(lt_hier_rec.count);
        do_prev := true;
      end if;
      exit when p_hier_rc%notfound;
    end loop;
    if do_prev then
      append_jso(
        l_hier_rec_prev.jso,
        l_lvl_prev2,
        l_hier_rec_prev.lvl,
        1
      );
    end if;
    if l_buflen > 0 then
      l_clob := l_clob || l_buffer;
    end if;
    return l_clob;
  end get;
end hier_to_clob;
/
select hier_to_clob.get(
  cursor(
    select level, json_object(empno, ename)
    from emp
    start with empno = 7566
    connect by mgr = prior empno
  ),
  'grunts'
)
from dual;
/* pretty printing done outside of the function */
{
  "empno" : 7566,
  "ename" : "JONES",
  "grunts" :
  [
    {
      "empno" : 7788,
      "ename" : "SCOTT",
      "grunts" :
      [
        {
          "empno" : 7876,
          "ename" : "ADAMS"
        }
      ]
    },
    {
      "empno" : 7902,
      "ename" : "FORD",
      "grunts" :
      [
        {
          "empno" : 7369,
          "ename" : "SMITH"
        }
      ]
    }
  ]
}

SQL table macros 11: JUST_PIVOT for 21c (and 19c)

[HUGE UPDATE 2022-01-14 15:45 GMT: thanks to Iudith Mentzel, here is a much simpler and better solution than the one I originally posted today!]

[UPDATE 2022-01-15: to ensure that the INVALIDATE_OBJECT function is called only once, I made it DETERMINISTIC; I also wrapped the call in a scalar subquery, as Iudith Mentzel suggested in her comment below.]

My previous JUST_PIVOT solution for dynamic pivoting does not work in Oracle DB 21c: sorry about that!

Here’s the explanation:

  1. When we want to pivot rows to columns, the columns may change if the data changes.
  2. A SQL macro can change the columns, but only during the hard parse phase.
  3. My previous attempt forces a hard parse every time by using a “flashback query”:
    • All parsed statements cause a “cursor” to be placed in the shared pool;
    • cursors that can be “shared” are reused if the same statement is submitted again;
    • cursors from flashback queries are marked “unshareable”, which means the cursor cannot be be reused, so there is a hard parse at every execution.
  4. Starting with version 21c, flashback query cursors can be shared, which means that technique no longer works.

Is this a bug or a feature?

Why, after all these years, have flashback queries become shareable? Was this really a conscious decision by Oracle, or is it a bug that Oracle will “fix” and Make my Macro Great Again? Thanks to a tweet by Martin Berger (@martinberx) which got me to read this article by Tim Hall , I confirmed that this new behavior is intended:

select description from V$SYSTEM_FIX_CONTROL
where bugno = 10123661;

DESCRIPTION
--------------
Enable cursor sharing for AS OF queries
 

Alternative solution

This is based on a brilliant suggestion by Iudith Mentzel on Twitter (@mentzel_iudith):

  • Create a function that invalidates the SQL macro using DBMS_UTILITY.INVALIDATE;
  • Have the generated SQL statement call that function.
    • The macro will have finished, so the invalidation does not interfere with the macro execution;
    • The invalidation does not commit, so no bad side effects.
    • Also no child cursors! That was the drawback of the AS OF technique.
create or replace function invalidate_object (
  p_owner in varchar2,
  p_object_name in varchar2
) return number deterministic is
  l_object_id number;
begin
  select object_id into l_object_id
  from all_objects
  where (owner, object_name) = ((p_owner, p_object_name));
  dbms_utility.invalidate(
    p_object_id => l_object_id,
    p_option_flags => dbms_utility.INV_ERROR_ON_RESTRICTIONS
  );
  return 0;
end invalidate_object;
/

create or replace function just_pivot(
  p_INPUT_QUERY in sys.odcivarchar2list,
  p_GROUP_COLS in dbms_tf.columns_t,
  p_PIVOT_COLS in dbms_tf.columns_t,
  p_AGG_COLS in dbms_tf.columns_t,
  p_total_label in dbms_tf.columns_t default null   
) return clob sql_macro is

/* Just Pivot MACRO
This macro takes as input a query that does an explicit GROUP BY.
The macro first uses the input query to get the list of pivot columns,
then wraps the input query in a final query that does a PIVOT using MAX().

Input:
1) p_INPUT_QUERY: the input query must be a literal
   wrapped in a sys.odcivarchar2list() collection.
   This allows the content to be accessed by the macro
   (macros do not have access to the content of VARCHAR2 literals).

Every column in the input query must also appear in one of these column lists:
2) p_GROUP_COLS: columns that are GROUPed BY explicitly in the input query
   and implicitly in the PIVOT
3) p_PIVOT_COLS: column that are GROUPed BY explicitly in the input query
   and pivoted in the PIVOT
4) p_AGG_COLS: column that are aggregated explicitly in the input query
   and "re-aggregated" using MAX() in the PIVOT
5) p_total_label: label to use for pivot totals (if CUBE, ROLLUP, etc.)
   null values are assumed to be totals and will be replaced with the label
   ('Total' is the default) and will be ordered at the end

Processing:
- within the macro, execute the input query to get the list of pivot columns
- generate the final SQL statement using input query and intermediate result
- the final SQL statement will call a function to invalidate this function,
  which will invalidate the cursor and cause the query to be parsed
  at every execution.
*/

/*
- PIVOT_IN_LIST concatenates values from all p_PIVOT_COLS columns
  for example, if (DNAME,JOB) then
    ('ACCOUNTING','CLERK') as "ACCOUNTING_CLERK",
    ('ACCOUNTING','MANAGER') as "ACCOUNTING_MANAGER",
  and so on
*/
  l_pivot_in_list_sql long :=  
q'@select listagg('(' || EXPR || ') as "' || AL || '"', ',
') within group(order by #PIVOT_COLS#)
from (
  select distinct
  #EXPR# EXPR,
  #ALIAS# AL,
  #PIVOT_COLS#
  from (#INPUT_QUERY#)
)@';
  l_PIVOT_COLS varchar2(4000);
  l_EXPR varchar2(4000);
  l_ALIAS varchar2(4000);

  l_final_sql long :=
'select * from (
  select #NVL_COLS#,
  #AGG_COLS#
  from (#INPUT_QUERY#)
  where 0 = (
    select invalidate_object(
      '''||$$PLSQL_UNIT_OWNER||''','''||$$PLSQL_UNIT||'''
    ) from dual
  )
)
pivot(#AGG_MAX_COLS# for (#PIVOT_COLS#) in (
  #PIVOT_IN_LIST#
))
order by #ORDER_COLS#';
  l_NVL_COLS varchar2(4000);
  l_AGG_COLS varchar2(4000);
  l_AGG_MAX_COLS varchar2(4000);
  l_PIVOT_IN_LIST varchar2(4000);
  l_ORDER_COLS varchar2(4000);
  
  l_total_label varchar2(32) := 'Total';

begin

  -- set value of l_total_label, which is needed right away
  if p_total_label is not null then
    sqm_util.list_columns(p_total_label, l_total_label, '%s', null, true);
  end if;
  
  -- set values to be plugged into l_pivot_in_list_sql
  sqm_util.list_columns(p_PIVOT_COLS, l_PIVOT_COLS);
  sqm_util.list_columns(
    p_PIVOT_COLS, l_EXPR,
    '''''''''||nvl(%s||null,'''||l_total_label||''')||''''''''', 
    q'§||','||§',
    true
  );
  sqm_util.list_columns(p_PIVOT_COLS, l_ALIAS,
    'nvl(%s||null,'''||l_total_label||''')',
    q'§||'_'||§',
    true
  );
  
  -- plug values into l_pivot_in_list_sql
  l_pivot_in_list_sql := replace(replace(replace(replace(
    l_pivot_in_list_sql,
    '#PIVOT_COLS#', l_PIVOT_COLS),
    '#EXPR#', l_EXPR),
    '#ALIAS#', l_ALIAS),
    '#INPUT_QUERY#', p_INPUT_QUERY(1)
  );  
  -- dbms_output.put_line(l_pivot_in_list_sql);
  
  -- execute l_pivot_in_list_sql
  execute immediate l_pivot_in_list_sql into l_PIVOT_IN_LIST;
  -- dbms_output.put_line(l_PIVOT_IN_LIST);
  
  -- set values to be plugged into l_final_sql
  sqm_util.list_columns(
    p_GROUP_COLS multiset union p_PIVOT_COLS,
    l_NVL_COLS,
    'nvl(%s||null, '''||l_total_label||''') %s'
  );

  sqm_util.list_columns(p_AGG_COLS, l_AGG_MAX_COLS, 'max(%s) %s');
  sqm_util.list_columns(p_AGG_COLS, l_AGG_COLS);
  sqm_util.list_columns(p_GROUP_COLS, l_ORDER_COLS,
    'nullif(%s, '''||l_total_label||''')'
  );
  
  -- plug values into l_final_sql
  l_final_sql := replace(replace(replace(replace(replace(replace(replace(
    l_final_sql,
    '#NVL_COLS#', l_NVL_COLS),
    '#AGG_COLS#', l_AGG_COLS),
    '#INPUT_QUERY#', p_INPUT_QUERY(1)),
    '#AGG_MAX_COLS#', l_AGG_MAX_COLS),
    '#PIVOT_COLS#', l_PIVOT_COLS),
    '#ORDER_COLS#', l_ORDER_COLS),
    '#PIVOT_IN_LIST#', l_PIVOT_IN_LIST);
  
  dbms_output.put_line(l_final_sql);
  return l_final_sql;
  
end just_pivot;
/

SQL table macros 10: JUST_PIVOT examples

To show the power of the JUST_PIVOT macro, here are several examples. Please refer to:

The examples are based on a minimal dataset which is completely abstract. My apologies for not providing more pleasant data…

drop table t purge;
create table t as
with Col_A as (select 'Val_A'||level Col_A from dual connect by level <= 2),
Col_B as (select 'Val_B'||level Col_B from dual connect by level <= 2),
Col_C as (select 'Val_C'||level Col_C from dual connect by level <= 2),
Col_D as (select 'Val_D'||level Col_D from dual connect by level <= 2)
select * from Col_A,Col_B,Col_C,Col_D;

select * from t order by 1,2,3,4
COL_A COL_B COL_C COL_D
Val_A1 Val_B1 Val_C1 Val_D1
Val_A1 Val_B1 Val_C1 Val_D2
Val_A1 Val_B1 Val_C2 Val_D1
Val_A1 Val_B1 Val_C2 Val_D2
Val_A1 Val_B2 Val_C1 Val_D1
Val_A1 Val_B2 Val_C1 Val_D2
Val_A1 Val_B2 Val_C2 Val_D1
Val_A1 Val_B2 Val_C2 Val_D2
Val_A2 Val_B1 Val_C1 Val_D1
Val_A2 Val_B1 Val_C1 Val_D2
Val_A2 Val_B1 Val_C2 Val_D1
Val_A2 Val_B1 Val_C2 Val_D2
Val_A2 Val_B2 Val_C1 Val_D1
Val_A2 Val_B2 Val_C1 Val_D2
Val_A2 Val_B2 Val_C2 Val_D1
Val_A2 Val_B2 Val_C2 Val_D2
 

One group column, one pivot column, one aggregation column

This simplest example shows that generated column names are based on the value of the pivot column followed by the name of the aggregated column. The PIVOT clause does this for us.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, count(*) cnt
from t
group by Col_A, Col_B
§'
  ),
  p_group_cols => columns(Col_A),
  p_pivot_cols => columns(Col_B),
  p_agg_cols => columns(cnt)
);
Final query generated by the macro
with pivoted as (
  select * from (
    select nvl("COL_A"||null, 'Total') "COL_A",nvl("COL_B"||null, 'Total') "COL_B",
    "CNT"
    from (
select Col_A, Col_B, count(*) cnt
from t
group by Col_A, Col_B
)
  )
  pivot(max("CNT") "CNT" for ("COL_B") in (
    ('Val_B1') as "Val_B1",
('Val_B2') as "Val_B2"
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by nullif("COL_A", 'Total')

COL_A Val_B1_CNT Val_B2_CNT
Val_A1 4 4
Val_A2 4 4
 

Changing one line of code gives us totals by the group column, by the pivot column and a grand total. There is actually a fifth parameter called p_total_label that you can use to change the ‘Total’ label, but whatever the label is, the same one is used everywhere.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, count(*) cnt
from t
group by cube(Col_A, Col_B)
§'
  ),
  p_group_cols => columns(Col_A),
  p_pivot_cols => columns(Col_B),
  p_agg_cols => columns(cnt)
);
Final query generated by the macro
with pivoted as (
  select * from (
    select nvl("COL_A"||null, 'Total') "COL_A",nvl("COL_B"||null, 'Total') "COL_B",
    "CNT"
    from (
select Col_A, Col_B, count(*) cnt
from t
group by cube(Col_A, Col_B)
)
  )
  pivot(max("CNT") "CNT" for ("COL_B") in (
    ('Val_B1') as "Val_B1",
('Val_B2') as "Val_B2",
('Total') as "Total"
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by nullif("COL_A", 'Total')

COL_A Val_B1_CNT Val_B2_CNT Total_CNT
Val_A1 4 4 8
Val_A2 4 4 8
Total 8 8 16
 

This example shows how the GROUPING_ID function tells us the GROUP BY level of every “cell” in our output. I’ll use it to eliminate unwanted rows or columns.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, grouping_id(Col_A, Col_B) gid
from t
group by cube(Col_A, Col_B)
§'
  ),
  p_group_cols => columns(Col_A),
  p_pivot_cols => columns(Col_B),
  p_agg_cols => columns(gid)
);
COL_A Val_B1_GID Val_B2_GID Total_GID
Val_A1 0 0 1
Val_A2 0 0 1
Total 2 2 3
 

Let’s say I don’t want the Total_CNT column: I simply choose the GROUPING_ID values I want to keep.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, count(*) cnt
from t
group by cube(Col_A, Col_B)
having grouping_id(Col_A, Col_B) in (0,2)
§'
  ),
  p_group_cols => columns(Col_A),
  p_pivot_cols => columns(Col_B),
  p_agg_cols => columns(cnt)
);
COL_A Val_B1_CNT Val_B2_CNT
Val_A1 4 4
Val_A2 4 4
Total 8 8
 

Two aggregations

The PIVOT clause allows us to aggregate as many values as we want, of different datatypes if we want.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, count(*) cnt, 'Count='||count(*) lit
from t
group by cube(Col_A, Col_B)
§'
  ),
  p_group_cols => columns(Col_A),
  p_pivot_cols => columns(Col_B),
  p_agg_cols => columns(cnt,lit)
);
Final query generated by the macro
with pivoted as (
  select * from (
    select nvl("COL_A"||null, 'Total') "COL_A",nvl("COL_B"||null, 'Total') "COL_B",
    "CNT","LIT"
    from (
select Col_A, Col_B, count(*) cnt, 'Count='||count(*) lit
from t
group by cube(Col_A, Col_B)
)
  )
  pivot(max("CNT") "CNT",max("LIT") "LIT" for ("COL_B") in (
    ('Val_B1') as "Val_B1",
('Val_B2') as "Val_B2",
('Total') as "Total"
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by nullif("COL_A", 'Total')

COL_A Val_B1_CNT Val_B1_LIT Val_B2_CNT Val_B2_LIT Total_CNT Total_LIT
Val_A1 4 Count=4 4 Count=4 8 Count=8
Val_A2 4 Count=4 4 Count=4 8 Count=8
Total 8 Count=8 8 Count=8 16 Count=16
 

Two group-by columns

With two group-by columns, we would normally show subtotals by both columns, totals by the first column, and the grand total. To avoid getting totals by the second column, we can use the GROUPING_ID function just on those two columns.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, Col_C, count(*) cnt
from t
group by cube(Col_A, Col_B, Col_C)
having grouping_id(Col_A, Col_B) != 2
§'
  ),
  p_group_cols => columns(Col_A, Col_B),
  p_pivot_cols => columns(Col_C),
  p_agg_cols => columns(cnt)
);
Final query generated by the macro
with pivoted as (
  select * from (
    select nvl("COL_A"||null, 'Total') "COL_A",nvl("COL_B"||null, 'Total') "COL_B",nvl("COL_C"||null, 'Total') "COL_C",
    "CNT"
    from (
select Col_A, Col_B, Col_C, count(*) cnt
from t
group by cube(Col_A, Col_B, Col_C)
having grouping_id(Col_A, Col_B) != 2
)
  )
  pivot(max("CNT") "CNT" for ("COL_C") in (
    ('Val_C1') as "Val_C1",
('Val_C2') as "Val_C2",
('Total') as "Total"
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by nullif("COL_A", 'Total'),nullif("COL_B", 'Total')

COL_A COL_B Val_C1_CNT Val_C2_CNT Total_CNT
Val_A1 Val_B1 2 2 4
Val_A1 Val_B2 2 2 4
Val_A1 Total 4 4 8
Val_A2 Val_B1 2 2 4
Val_A2 Val_B2 2 2 4
Val_A2 Total 4 4 8
Total Total 8 8 16
 

Two pivot columns

Again I’ll show subtotals by both columns, totals by the first column, and the grand total. To avoid getting totals by the second column, use the GROUPING_ID function on the two pivot columns.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, Col_C, count(*) cnt
from t
group by cube(Col_A, Col_B, Col_C)
having grouping_id(Col_B, Col_C) != 2
§'
  ),
  p_group_cols => columns(Col_A),
  p_pivot_cols => columns(Col_B, Col_C),
  p_agg_cols => columns(cnt),
  p_total_label => columns("Tot")
);
Final query generated by the macro
with pivoted as (
  select * from (
    select nvl("COL_A"||null, 'Tot') "COL_A",nvl("COL_B"||null, 'Tot') "COL_B",nvl("COL_C"||null, 'Tot') "COL_C",
    "CNT"
    from (
select Col_A, Col_B, Col_C, count(*) cnt
from t
group by cube(Col_A, Col_B, Col_C)
having grouping_id(Col_B, Col_C) != 2
)
  )
  pivot(max("CNT") "CNT" for ("COL_B","COL_C") in (
    ('Val_B1','Val_C1') as "Val_B1_Val_C1",
('Val_B1','Val_C2') as "Val_B1_Val_C2",
('Val_B1','Tot') as "Val_B1_Tot",
('Val_B2','Val_C1') as "Val_B2_Val_C1",
('Val_B2','Val_C2') as "Val_B2_Val_C2",
('Val_B2','Tot') as "Val_B2_Tot",
('Tot','Tot') as "Tot_Tot"
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by nullif("COL_A", 'Tot')

COL_A Val_B1_Val_C1_CNT Val_B1_Val_C2_CNT Val_B1_Tot_CNT Val_B2_Val_C1_CNT Val_B2_Val_C2_CNT Val_B2_Tot_CNT Tot_Tot_CNT
Val_A1 2 2 4 2 2 4 8
Val_A2 2 2 4 2 2 4 8
Tot 4 4 8 4 4 8 16
 

Two group-by columns and two pivot columns

Again I’ll show subtotals, totals and the grand total. To avoid getting totals by the second column, use the GROUPING_ID function on the two group-by columns and the two pivot columns.

select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select Col_A, Col_B, Col_C, Col_D, count(*) cnt
from t
group by cube(Col_A, Col_B, Col_C, Col_D)
having 2 not in (
  grouping_id(Col_A, Col_B),
  grouping_id(Col_C, Col_D)
)
§'
  ),
  p_group_cols => columns(Col_A, Col_B),
  p_pivot_cols => columns(Col_C, Col_D),
  p_agg_cols => columns(cnt),
  p_total_label => columns("Tot")
);
Final query generated by the macro
with pivoted as (
  select * from (
    select nvl("COL_A"||null, 'Tot') "COL_A",nvl("COL_B"||null, 'Tot') "COL_B",nvl("COL_C"||null, 'Tot') "COL_C",nvl("COL_D"||null, 'Tot') "COL_D",
    "CNT"
    from (
select Col_A, Col_B, Col_C, Col_D, count(*) cnt
from t
group by cube(Col_A, Col_B, Col_C, Col_D)
having 2 not in (
  grouping_id(Col_A, Col_B),
  grouping_id(Col_C, Col_D)
)
)
  )
  pivot(max("CNT") "CNT" for ("COL_C","COL_D") in (
    ('Val_C1','Val_D1') as "Val_C1_Val_D1",
('Val_C1','Val_D2') as "Val_C1_Val_D2",
('Val_C1','Tot') as "Val_C1_Tot",
('Val_C2','Val_D1') as "Val_C2_Val_D1",
('Val_C2','Val_D2') as "Val_C2_Val_D2",
('Val_C2','Tot') as "Val_C2_Tot",
('Tot','Tot') as "Tot_Tot"
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by nullif("COL_A", 'Tot'),nullif("COL_B", 'Tot')

COL_A COL_B Val_C1_Val_D1_CNT Val_C1_Val_D2_CNT Val_C1_Tot_CNT Val_C2_Val_D1_CNT Val_C2_Val_D2_CNT Val_C2_Tot_CNT Tot_Tot_CNT
Val_A1 Val_B1 1 1 2 1 1 2 4
Val_A1 Val_B2 1 1 2 1 1 2 4
Val_A1 Tot 2 2 4 2 2 4 8
Val_A2 Val_B1 1 1 2 1 1 2 4
Val_A2 Val_B2 1 1 2 1 1 2 4
Val_A2 Tot 2 2 4 2 2 4 8
Tot Tot 4 4 8 4 4 8 16
 

This function allows any combination of one or more group-by columns, pivot columns and aggregations, limited only by screen real estate and how much horizontal scrolling you want to inflict on the users. In my next post I’ll talk about some limitations.

SQL table macros 9: just pivot!

The PIVOT clause does some aggregation, then pivots the result – but it is not dynamic.

Let’s make a deal: you do the aggregation and the JUST_PIVOT table macro will pivot the result, dynamically!

[UPDATE 2022-01-14: the solution I propose here works only in version 19c. For a more complicated solution that works in 19c and 21c, see SQL table macros 11: JUST_PIVOT for 21c (and 19c)]

Doing it the old-fashioned way

Let’s say we want the total salary for each job type in each department. This is a simple aggregation:

select dname, job, sum(sal) sal
from dept
join emp using(deptno)
group by dname, job
order by 1,2
DNAME JOB SAL
ACCOUNTING CLERK 1300
ACCOUNTING MANAGER 2450
ACCOUNTING PRESIDENT 5000
RESEARCH ANALYST 6000
RESEARCH CLERK 1900
RESEARCH MANAGER 2975
SALES CLERK 950
SALES MANAGER 2850
SALES SALESMAN 5600
 

Now that we know all possible JOB values, we can aggregate and pivot the result directly.

select * from (
  select dname, job, sal
  from dept
  join emp using(deptno)
)
pivot(sum(SAL) SAL for (JOB) in (
  ('ANALYST') as ANALYST,
  ('CLERK') as CLERK,
  ('MANAGER') as MANAGER,
  ('PRESIDENT') as PRESIDENT,
  ('SALESMAN') as SALESMAN
));
DNAME ANALYST_SAL CLERK_SAL MANAGER_SAL PRESIDENT_SAL SALESMAN_SAL
ACCOUNTING   1300 2450 5000  
RESEARCH 6000 1900 2975    
SALES   950 2850   5600
 
  • I call SAL an “aggregation column” because it is used in an aggregation function.
  • JOB is a “pivot column” because its values are used in create the new pivoted columns.
  • DNAME is a “group by column” because PIVOT groups by it implicitly.

Using the JUST_PIVOT macro

JUST_PIVOT asks you for four things:

  1. p_INPUT_QUERY: the input query must be a literal wrapped in a sys.odcivarchar2list() collection. This allows the content to be accessed by the macro; macros do not have access to the content of VARCHAR2 literals.

    Every column in the input query must also appear in one of these column lists:
  2. p_GROUP_COLS: columns that are GROUPed BY explicitly in the input query and implicitly in the PIVOT
  3. p_PIVOT_COLS: columns that are GROUPed BY explicitly in the input query and pivoted in the PIVOT
  4. p_AGG_COLS: column that are aggregated explicitly in the input query and “re-aggregated” using MAX() in the PIVOT
select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select dname, job, sum(sal) sal
from dept
join emp using(deptno)
group by dname, job
§'
  ),
  p_group_cols => columns(dname),
  p_pivot_cols => columns(job),
  p_agg_cols => columns(sal)
);
DNAME ANALYST_SAL CLERK_SAL MANAGER_SAL PRESIDENT_SAL SALESMAN_SAL
ACCOUNTING   1300 2450 5000  
RESEARCH 6000 1900 2975    
SALES   950 2850   5600
 

Advantages of the macro

  • The macro actually executes the input query a first time in order to build a complete “pivot-in-list”. We don’t have to.
  • The final query is a flashback query, so it is parsed every time and the macro generates an up-to-date final query every time.
  • If we generate total values, they are automatically put in the right places in the output!
    Here is an example, with one line of code changed :
select * from just_pivot(
  p_input_query => sys.odcivarchar2list(
q'§
select dname, job, sum(sal) sal
from dept
join emp using(deptno)
group by cube(dname, job)
§'
  ),
  p_group_cols => columns(dname),
  p_pivot_cols => columns(job),
  p_agg_cols => columns(sal)
);
DNAME ANALYST_SAL CLERK_SAL MANAGER_SAL PRESIDENT_SAL SALESMAN_SAL Total_SAL
ACCOUNTING   1300 2450 5000   8750
RESEARCH 6000 1900 2975     10875
SALES   950 2850   5600 9400
Total 6000 4150 8275 5000 5600 29025
 

My next post will show how the macro works with multiple group by, pivot and / or aggregation columns.

The code

/* Just Pivot MACRO
This macro takes as input a query that does an explicit GROUP BY.
The macro first uses the input query to get the list of pivot columns,
then wraps the input query in a final query that does a PIVOT using MAX().
The final query has a flashback clause in order to make the cursor unshareable:
  - there will be a hard parse phase at every execution,
  - the macro will execute and the list of pivot values will be up to date.

Input:
1) p_INPUT_QUERY: the input query must be a literal
   wrapped in a sys.odcivarchar2list() collection.
   This allows the content to be accessed by the macro;
   macros do not have access to the content of VARCHAR2 literals.

Every column in the input query must also appear in one of these column lists:
2) p_GROUP_COLS: columns that are GROUPed BY explicitly in the input query
   and implicitly in the PIVOT (*)
3) p_PIVOT_COLS: column that are GROUPed BY explicitly in the input query
   and pivoted in the PIVOT (*)
4) p_AGG_COLS: column that are aggregated explicitly in the input query
   and "re-aggregated" using MAX() in the PIVOT
5) p_total_label: label to use for pivot totals (if CUBE, ROLLUP, etc.)
   null values are assumed to be totals and will be replaced with some non-null label
   such as 'Total', which is the default, and will be ordered at the end

Processing:
- within the macro, execute the input query to get the list of pivot columns
- generate the final SQL statement based on the input query and the intermediate result,
  adding an AS OF clause to ensure the cursor will not be reused.
*/
create or replace function just_pivot(
  p_INPUT_QUERY in sys.odcivarchar2list,
  p_GROUP_COLS in dbms_tf.columns_t,
  p_PIVOT_COLS in dbms_tf.columns_t,
  p_AGG_COLS in dbms_tf.columns_t,
  p_total_label in dbms_tf.columns_t default null   
) return clob sql_macro is
/*
- PIVOT_IN_LIST concatenates values from all p_PIVOT_COLS columns
  for example, if (DNAME,JOB) then
    ('ACCOUNTING','CLERK') as "ACCOUNTING_CLERK",
    ('ACCOUNTING','MANAGER') as "ACCOUNTING_MANAGER",
  and so on
*/
  l_pivot_in_list_sql long :=  
q'@select listagg('(' || EXPR || ') as "' || AL || '"', ',
') within group(order by #PIVOT_COLS#)
from (
  select distinct
  #EXPR# EXPR,
  #ALIAS# AL,
  #PIVOT_COLS#
  from (#INPUT_QUERY#)
)@';
  l_PIVOT_COLS varchar2(4000);
  l_EXPR varchar2(4000);
  l_ALIAS varchar2(4000);

  l_final_sql long :=
'with pivoted as (
  select * from (
    select #NVL_COLS#,
    #AGG_COLS#
    from (#INPUT_QUERY#)
  )
  pivot(#AGG_MAX_COLS# for (#PIVOT_COLS#) in (
    #PIVOT_IN_LIST#
  ))
)
select * from pivoted
as of scn dbms_flashback.get_system_change_number
order by #ORDER_COLS#';
  l_NVL_COLS varchar2(4000);
  l_AGG_COLS varchar2(4000);
  l_AGG_MAX_COLS varchar2(4000);
  l_PIVOT_IN_LIST varchar2(4000);
  l_ORDER_COLS varchar2(4000);
  
  l_total_label varchar2(32) := 'Total';

begin
  -- set value of l_total_label, which is needed right away
  if p_total_label is not null then
    sqm_util.list_columns(p_total_label, l_total_label, '%s', null, true);
  end if;
  
  -- set values to be plugged into l_pivot_in_list_sql
  sqm_util.list_columns(p_PIVOT_COLS, l_PIVOT_COLS);
  sqm_util.list_columns(
    p_PIVOT_COLS, l_EXPR,
    '''''''''||nvl(%s||null,'''||l_total_label||''')||''''''''', 
    q'§||','||§',
    true
  );
  sqm_util.list_columns(p_PIVOT_COLS, l_ALIAS,
    'nvl(%s||null,'''||l_total_label||''')',
    q'§||'_'||§',
    true
  );
  
  -- plug values into l_pivot_in_list_sql
  l_pivot_in_list_sql := replace(replace(replace(replace(
    l_pivot_in_list_sql,
    '#PIVOT_COLS#', l_PIVOT_COLS),
    '#EXPR#', l_EXPR),
    '#ALIAS#', l_ALIAS),
    '#INPUT_QUERY#', p_INPUT_QUERY(1)
  );
  dbms_output.put_line('line 109');
  
  -- execute l_pivot_in_list_sql
  dbms_output.put_line(l_pivot_in_list_sql);
  execute immediate l_pivot_in_list_sql into l_PIVOT_IN_LIST;
  dbms_output.put_line(l_PIVOT_IN_LIST);
  
  -- set values to be plugged into l_final_sql
  sqm_util.list_columns(
    p_GROUP_COLS multiset union p_PIVOT_COLS,
    l_NVL_COLS,
    'nvl(%s||null, '''||l_total_label||''') %s'
  );

  sqm_util.list_columns(p_AGG_COLS, l_AGG_MAX_COLS, 'max(%s) %s');
  sqm_util.list_columns(p_AGG_COLS, l_AGG_COLS);
  sqm_util.list_columns(p_GROUP_COLS, l_ORDER_COLS,
    'nullif(%s, '''||l_total_label||''')'
  );
  
  -- plug values into l_final_sql
  l_final_sql := replace(replace(replace(replace(replace(replace(replace(
    l_final_sql,
    '#NVL_COLS#', l_NVL_COLS),
    '#AGG_COLS#', l_AGG_COLS),
    '#INPUT_QUERY#', p_INPUT_QUERY(1)),
    '#AGG_MAX_COLS#', l_AGG_MAX_COLS),
    '#PIVOT_COLS#', l_PIVOT_COLS),
    '#ORDER_COLS#', l_ORDER_COLS),
    '#PIVOT_IN_LIST#', l_PIVOT_IN_LIST);
  
  dbms_output.put_line(l_final_sql);
  return l_final_sql;
  
end just_pivot;
/

SQL table macros 8: Print table

I like this one!

Tom Kyte once wrote a procedure to output table contents one column at a time. For some history and a newer solution, see “Print a table with one column name + value per row”.

Using SQL macros, we can get a solution that is even better in my view. My previous solution was a SELECT statement, which I like, but as such it does not hide the code inside a function or procedure, and it is not really configurable.

To stick different column values into one column, we generally have to convert non-string values to VARCHAR2. For fun, I’ll create a little table with several datatypes:

create table datatypes (
C_NUMBER NUMBER,
C_VARCHAR2 VARCHAR2(16),
C_DATE DATE,
C_RAW RAW(16),
C_BINARY_FLOAT BINARY_FLOAT,
C_BINARY_DOUBLE BINARY_DOUBLE,
C_CLOB CLOB,
C_BFILE BFILE,
C_TIMESTAMP TIMESTAMP,
C_TIMESTAMP_WITH_TIME_ZONE TIMESTAMP WITH TIME ZONE,
C_INTERVAL_YEAR_TO_MONTH INTERVAL YEAR TO MONTH,
C_INTERVAL_DAY_TO_SECOND INTERVAL DAY TO SECOND
);

insert into datatypes 
select 3-level,'varchar2',sysdate,hextoraw('FF0102030405'),
  1.1,2.2,'clob',bfilename('DOWNLOAD_DIR', 'x.txt'),
  localtimestamp, localtimestamp,
  interval '1' year, interval '2' day
from dual
connect by level <= 2;

All these datatypes should work from version 19.6 on. If we have access to the Autonomous Cloud 19c version, or to version 21c, we can add some extra datatypes:


create or replace view v_datatypes as
select d.*,
  sys.odcinumberlist(1) c_varray,
  sys.FI_CATEGORICALS('A') c_nested_table,
  SYS.AWRRPT_TEXT_TYPE('a') c_object,
  JSON_object('sysdate':sysdate) j_object
from datatypes d;

Before showing the code of my PRINT_TABLE_MACRO, I’ll show what it does to the view V_DATATYPES. Again, if you try this on a non-cloud 19c version, use the table DATATYPES instead.

select * from print_table_macro(
  p_table => v_datatypes
);
RN COLUMN_NAME COLUMN_VALUE
1 C_NUMBER 2
1 C_VARCHAR2 varchar2
1 C_DATE 2021-12-16T13:40:08
1 C_RAW FF0102030405
1 C_BINARY_FLOAT 1,10000002
1 C_BINARY_DOUBLE 2,2000000000000002
1 C_CLOB clob
1 C_BFILE bfilename(‘DOWNLOAD_DIR’, ‘x.txt’)
1 C_TIMESTAMP 2021-12-16T14:40:8.003345
1 C_TIMESTAMP_WITH_TIME_ZONE 2021-12-16T14:40:8.003345 Europe/Paris
1 C_INTERVAL_YEAR_TO_MONTH +01-00
1 C_INTERVAL_DAY_TO_SECOND +02 00:00:00.000000
1 C_VARRAY [1]
1 C_NESTED_TABLE [“A”]
1 C_OBJECT {“OUTPUT”:”a”}
1 J_OBJECT {“sysdate”:”2021-12-16T13:40:14″}
2 C_NUMBER 1
2 C_VARCHAR2 varchar2
2 C_DATE 2021-12-16T13:40:08
2 C_RAW FF0102030405
2 C_BINARY_FLOAT 1,10000002
2 C_BINARY_DOUBLE 2,2000000000000002
2 C_CLOB clob
2 C_BFILE bfilename(‘DOWNLOAD_DIR’, ‘x.txt’)
2 C_TIMESTAMP 2021-12-16T14:40:8.003345
2 C_TIMESTAMP_WITH_TIME_ZONE 2021-12-16T14:40:8.003345 Europe/Paris
2 C_INTERVAL_YEAR_TO_MONTH +01-00
2 C_INTERVAL_DAY_TO_SECOND +02 00:00:00.000000
2 C_VARRAY [1]
2 C_NESTED_TABLE [“A”]
2 C_OBJECT {“OUTPUT”:”a”}
2 J_OBJECT {“sysdate”:”2021-12-16T13:40:14″}
 

Notice that the input rows are not sorted: I just use ROWNUM to identify each row. Also, each schema-level “User Defined Type” structure is converted to a string in JSON format. Now suppose I want to identify each row by one more columns, and sort by those columns.

select * from print_table_macro(
  p_table => v_datatypes, 
  p_key_cols => columns(c_number)
);
C_NUMBER COLUMN_NAME COLUMN_VALUE
1 C_VARCHAR2 varchar2
1 C_DATE 2021-12-16T13:40:08
1 C_RAW FF0102030405
1 C_BINARY_FLOAT 1,10000002
1 C_BINARY_DOUBLE 2,2000000000000002
1 C_CLOB clob
1 C_BFILE bfilename(‘DOWNLOAD_DIR’, ‘x.txt’)
1 C_TIMESTAMP 2021-12-16T14:40:8.003345
1 C_TIMESTAMP_WITH_TIME_ZONE 2021-12-16T14:40:8.003345 Europe/Paris
1 C_INTERVAL_YEAR_TO_MONTH +01-00
1 C_INTERVAL_DAY_TO_SECOND +02 00:00:00.000000
1 C_VARRAY [1]
1 C_NESTED_TABLE [“A”]
1 C_OBJECT {“OUTPUT”:”a”}
1 J_OBJECT {“sysdate”:”2021-12-16T13:45:50″}
2 C_VARCHAR2 varchar2
2 C_DATE 2021-12-16T13:40:08
2 C_RAW FF0102030405
2 C_BINARY_FLOAT 1,10000002
2 C_BINARY_DOUBLE 2,2000000000000002
2 C_CLOB clob
2 C_BFILE bfilename(‘DOWNLOAD_DIR’, ‘x.txt’)
2 C_TIMESTAMP 2021-12-16T14:40:8.003345
2 C_TIMESTAMP_WITH_TIME_ZONE 2021-12-16T14:40:8.003345 Europe/Paris
2 C_INTERVAL_YEAR_TO_MONTH +01-00
2 C_INTERVAL_DAY_TO_SECOND +02 00:00:00.000000
2 C_VARRAY [1]
2 C_NESTED_TABLE [“A”]
2 C_OBJECT {“OUTPUT”:”a”}
2 J_OBJECT {“sysdate”:”2021-12-16T13:45:50″}
 

Finally, I can exclude columns if I want.

select * from print_table_macro(
  p_table => v_datatypes, 
  p_key_cols => columns(c_number), 
  p_exclude_cols => columns(C_BFILE,C_TIMESTAMP,C_TIMESTAMP_WITH_TIME_ZONE)
);
C_NUMBER COLUMN_NAME COLUMN_VALUE
1 C_VARCHAR2 varchar2
1 C_DATE 2021-12-16T13:40:08
1 C_RAW FF0102030405
1 C_BINARY_FLOAT 1,10000002
1 C_BINARY_DOUBLE 2,2000000000000002
1 C_CLOB clob
1 C_INTERVAL_YEAR_TO_MONTH +01-00
1 C_INTERVAL_DAY_TO_SECOND +02 00:00:00.000000
1 C_VARRAY [1]
1 C_NESTED_TABLE [“A”]
1 C_OBJECT {“OUTPUT”:”a”}
1 J_OBJECT {“sysdate”:”2021-12-16T13:47:36″}
2 C_VARCHAR2 varchar2
2 C_DATE 2021-12-16T13:40:08
2 C_RAW FF0102030405
2 C_BINARY_FLOAT 1,10000002
2 C_BINARY_DOUBLE 2,2000000000000002
2 C_CLOB clob
2 C_INTERVAL_YEAR_TO_MONTH +01-00
2 C_INTERVAL_DAY_TO_SECOND +02 00:00:00.000000
2 C_VARRAY [1]
2 C_NESTED_TABLE [“A”]
2 C_OBJECT {“OUTPUT”:”a”}
2 J_OBJECT {“sysdate”:”2021-12-16T13:47:36″}
 

The code (depends on package SQM_UTIL)

create or replace function print_table_macro(
  p_table in dbms_tf.table_t,
  p_key_cols in dbms_tf.columns_t default null,
  p_exclude_cols in dbms_tf.columns_t default null
) return clob sql_macro is
  l_col_to_strings long;
  l_col_column_names long;
  l_sql long;
  l_key_list varchar2(4000);
  l_order_by varchar2(4000);
  l_exclude_cols dbms_tf.columns_t;
begin
  l_exclude_cols := p_key_cols;
  if l_exclude_cols is null then
    l_exclude_cols := p_exclude_cols;
  else
    if p_exclude_cols is not null then
      l_exclude_cols := l_exclude_cols multiset union p_exclude_cols;
    end if;
  end if;
  sqm_util.col_to_strings(p_table, l_col_to_strings, l_exclude_cols);
  sqm_util.col_column_names(p_table, l_col_column_names, l_exclude_cols);
  if p_key_cols is null then
    l_key_list := 'rownum rn';
    l_order_by := null;
  else
    sqm_util.list_columns(p_key_cols, l_key_list);
    l_order_by := 'order by ' || l_key_list;
  end if;
  l_sql := '
select * from (
  select ' || l_key_list || ',' || l_col_to_strings || ' from p_table
  ' || l_order_by || '
)
unpivot include nulls (column_value for column_name in (' || l_col_column_names || '))
';
  dbms_output.put_line(l_sql);
  return l_sql;
end print_table_macro;
/

SQL table macros 7: Select excluding

Sometimes it would be easier to SELECT saying what columns we don’t want, not those we do. Using my SQM_UTIL package, a very simple SQL macro will let us do that.

A common requirement

There are loads of “top-N” SQL queries that use analytic functions (also called window functions) to identify the rows we want. For example, suppose we want the highest paid employee in each department:

  • Start by ranking the employees by salary (descending), commission (descending), then maybe hiredate (descending) – since a younger employee with the same salary is better paid, in my opinion;
  • Then keep only the employees who have rank 1.
  select * from (
    select rank() over(
      partition by deptno
      order by sal desc, comm desc nulls last, hiredate desc
    ) rnk,
    emp.* from emp
  )
  where rnk = 1
RNK EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
1 7839 KING PRESIDENT   1981-11-17 00:00:00 5000   10
1 7788 SCOTT ANALYST 7566 1987-04-19 00:00:00 3000   20
1 7698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850   30
 

Generally, we don’t really want to see the RNK column in the output, so we have to list all the other columns explicitly – or we can write a SQL macro to exclude just the column we want. Using the procedure SQM_UTIL.COL_COLUMN_NAMES to do the heavy lifting, this macro is trivial:

create or replace function query_excluding(
  p_table in dbms_tf.table_t,
  p_exclude_cols in dbms_tf.columns_t
) return clob sql_macro is
  l_col_column_names varchar2(32767);
begin
  sqm_util.col_column_names(p_table, l_col_column_names, p_exclude_cols);
  return 'select ' || l_col_column_names || ' from p_table';
end query_excluding;
/
with data as (
  select * from (
    select rank() over(
      partition by deptno
      order by sal desc, comm desc nulls last, hiredate desc
    ) rnk,
    emp.* from emp
  )
  where rnk = 1
)
select * from query_excluding(
  data,
  columns(rnk)
);
EMPNO ENAME JOB MGR HIREDATE SAL COMM DEPTNO
7839 KING PRESIDENT   1981-11-17 00:00:00 5000   10
7788 SCOTT ANALYST 7566 1987-04-19 00:00:00 3000   20
7698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850   30
 

SQL table macros 6: SQM_UTIL package

My SQL Macro Utility package provides services for SQL macro development and execution. Future blog posts will include macros that take advantage of the different services. There are three types:

  1. Get the contents of a DBMS_TF.TABLE_T structure, either in JSON format or as a table.
  2. Starting from a TABLE_T structure, return lists of:
    • the column names
    • their data types
    • expressions that convert the columns to strings
    • expressions that convert the columns to something that can be compared
      (a hash for LOBs, for example)
  3. Generate configurable SQL code for all the columns in a COLUMNS_T structure.

Normally, I try to explain the code I post. This package is too complicated for that, so I’m going to concentrate on how to use it, not how I wrote it.

Get the contents of a TABLE_T structure.

As I explained in SQL table macros 1: a moving target? , the DBMS_TF.TABLE_T type is pretty complex. To help during development, I provide two ways to display the data from a TABLE_T structure.

The function GET_TABLE_T_JSON is a table macro that returns the data in JSON format: the format mimics the structure as much as possible. I add “type_label”, which interprets the “type” and “charsetform” combination as a SQL datatype.

[UPDATE 2022-01-04: on livesql.oracle.com, the package body will not compile because the DBMS_TF.TABLE_T type does not contain the fields “table_schema_name” and “table_name”. Currently livesql uses version 19.8. Everything seems to work on the 19c and 21c Cloud versions.]

  -- SQL table macro to store the TABLE_T data as JSON, then return it
  function get_table_t_json(
    p_table in dbms_tf.table_t
  ) return varchar2 sql_macro;
select * from sqm_util.get_table_t_json(dept);

TABLE_T_JSON                                                                    
-------------------------
{
  "schema_name" : null,
  "package_name" : null,
  "ptf_name" : null,
  "table_schema_name" : "\"ADMIN\"",
  "table_name" : "\"DEPT\"",
  "column" :
  [
    {
      "description" :
      {
        "type" : 2,
        "max_len" : 22,
        "name" : "\"DEPTNO\"",
        "name_len" : 8,
        "precision" : 2,
        "scale" : 0,
        "charsetid" : 0,
        "charsetform" : 0,
        "collation" : 0,
        "type_label" : "NUMBER"
      },
      "pass_through" : true,
      "for_read" : false
    },
    {
      "description" :
      {
        "type" : 1,
        "max_len" : 14,
        "name" : "\"DNAME\"",
        "name_len" : 7,
        "precision" : 0,
        "scale" : 0,
        "charsetid" : 873,
        "charsetform" : 1,
        "collation" : 16382,
        "type_label" : "VARCHAR2"
      },
      "pass_through" : true,
      "for_read" : false
    },
    {
      "description" :
      {
        "type" : 1,
        "max_len" : 13,
        "name" : "\"LOC\"",
        "name_len" : 5,
        "precision" : 0,
        "scale" : 0,
        "charsetid" : 873,
        "charsetform" : 1,
        "collation" : 16382,
        "type_label" : "VARCHAR2"
      },
      "pass_through" : true,
      "for_read" : false
    }
  ]
}

The function GET_TABLE_T_FLATTENED is a table macro that returns the most significant data from the TABLE_T structure as a table.

select * from sqm_util.get_table_t_flattened(dept);
TABLE_SCHEMA_NAME TABLE_NAME COLUMN_ID TYPE CHARSETFORM TYPE_LABEL MAX_LEN NAME NAME_LEN PRECISION SCALE CHARSETID COLLATION
“ADMIN” “DEPT” 1 2 0 NUMBER 22 “DEPTNO” 8 2 0 0 0
“ADMIN” “DEPT” 2 1 1 VARCHAR2 14 “DNAME” 7 0 0 873 16382
“ADMIN” “DEPT” 3 1 1 VARCHAR2 13 “LOC” 5 0 0 873 16382
 

Generate code for the columns in a TABLE_T structure

Within the package body, I have placed little SQL templates for every SQL data type that macros support. For example:

  • the template 'to_char(%s, ''TM'') %s' converts a number to a string so that it can be UNPIVOTed;
  • '''HASH_SH256: ''||dbms_crypto.hash(%s, 4) %s' is used for LOBs to return a result that can be used to compare values.

There are four procedures that take all the columns in the TABLE_T data (except those you choose to exclude), apply the appropriate templates, then return a comma-separated list that can be easily plugged into a SELECT statement.

  procedure col_column_names(
    p_table in dbms_tf.table_t,
    p_column_names in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  procedure col_type_labels(
    p_table in dbms_tf.table_t,
    p_type_labels in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  procedure col_to_strings(
    p_table in dbms_tf.table_t,
    p_to_strings in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  procedure col_comparables(
    p_table in dbms_tf.table_t,
    p_comparables in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

Examples (better examples will come in later blog posts):

create or replace function sqm (
  p_table in dbms_tf.table_t
) return varchar2 sql_macro is
  l_col_list varchar2(4000);
begin
  sqm_util.col_column_names(p_table, l_col_list);
  dbms_output.put_line('column names:'||l_col_list);
  sqm_util.col_type_labels(p_table, l_col_list);
  dbms_output.put_line('type labels:'||l_col_list);
  sqm_util.col_to_strings(p_table, l_col_list);
  dbms_output.put_line('to strings:'||l_col_list);
  sqm_util.col_comparables(p_table, l_col_list);
  dbms_output.put_line('comparables:'||l_col_list);
  return 'select * from dual';
end;
/
with data as (
   select sysdate dte, interval '1' day ids, to_clob('small clob') cl from dual
)
select * from sqm(data);

DUMMY   
-------
X        

column names:"DTE","IDS","CL"
type labels:EDATE,EINTERVAL_DS,CLOB
to strings:to_char("DTE", 'yyyy-mm-dd"T"hh24:mi:ss') "DTE",to_char("IDS") "IDS",to_char(substr("CL", 1, 1000)) "CL"
comparables:"DTE","IDS",'HASH_SH256: '||dbms_crypto.hash("CL", 4) "CL"

Generate code for the columns in a COLUMNS_T structure

In the TABLE_T structure, we know what the datatype of each column is, so we can apply templates intended for each type. With the COLUMNS_T structure, the intent is to pass a template and a list delimiter, in order to generate a list of expressions that only differ by the column name. Again, these lists can be plugged into SELECT statements easily. There is an option to remove the double quotes around the column names.

  procedure list_columns(
    p_columns in dbms_tf.columns_t,
    p_column_list in out nocopy varchar2,
    p_template in varchar2 default '%s',
    p_delimiter in varchar2 default ',',
    p_remove_quotes boolean default false
  );
create or replace function sqm (
  p_table in dbms_tf.table_t,
  p_columns in dbms_tf.columns_t
) return varchar2 sql_macro is
  l_col_list varchar2(4000);
  l_sql varchar2(4000);
begin
  sqm_util.list_columns(p_columns, l_col_list, 'nvl(%s||null, ''<NULL>'') %s');
  l_sql := 'select '  || l_col_list || ' from p_table';
  dbms_output.put_line(l_sql);
  return l_sql;
end;
/
with data as (
   select ename, mgr, comm
   from emp
   where (mgr is null or comm is null)
   and rownum < 5
)
select * from sqm(data, columns(ename, mgr, comm));

select nvl("ENAME"||null, '<NULL>') "ENAME",
nvl("MGR"||null, '<NULL>') "MGR",
nvl("COMM"||null, '<NULL>') "COMM" 
from p_table

ENAME   MGR      COMM     
KING     <NULL>    <NULL>    
BLAKE    7839      <NULL>    
CLARK    7839      <NULL>    
JONES    7839      <NULL> 

Again, these services will make more sense when I use them in future blog posts to do generic pivoting, unpivoting or comparing of tables.

Code (as of 2021-12-09)

create or replace type clob_varray1_t as varray(1) of clob
/
create or replace package sqm_util as
/* SQM = SQl Macro
Package providing common services for parameters of type DBMS_TF.TABLE_T and DBMS_TF.COLUMNS_T
*/

------- Getting the contents of a DBMS_TF.TABLE_T structure: better than DBMS_TF.TRACE

  -- SQL table macro to store the TABLE_T data as JSON, then return it
  function get_table_t_json(
    p_table in dbms_tf.table_t
  ) return varchar2 sql_macro;

  -- SQL table macro to store the TABLE_T data, then return it flattened
  function get_table_t_flattened(
    p_table in dbms_tf.table_t
  ) return varchar2 sql_macro;

-------- Lookup table based on (type)(charsetform) from DBMS_TF.TABLE_T.column(i).description
  type t_col_data is record(
    column_name varchar2(130),  -- from TABLE_T.column(i).description.name
    type_label varchar2(128),   -- My label for datatype associated with each type/charsetform
    to_string varchar2(256),    -- expression translating the datatype to a string (useful for UNPIVOT + comparisons)
    comparable varchar2(256)    -- expression translating the datatype to something comparable (e.g. hash for LOB)
  );
  type tt_col_data is table of t_col_data;

  -- procedure that fills a tt_col_data variable based on the input TABLE_T structure
  procedure col_data_records(
    p_table in dbms_tf.table_t,
    pt_col_data in out nocopy tt_col_data
  );

  -- procedure that fills comma-separated lists of data from a tt_col_data instance
  -- columns listed in the optional EXCLUDE parameter are omitted
  procedure col_data_strings(
    p_table in dbms_tf.table_t,
    p_column_names in out nocopy long,
    p_type_labels in out nocopy long,
    p_to_strings in out nocopy long,
    p_comparables in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  -- convenience procedures to fill any one of the above lists
  procedure col_column_names(
    p_table in dbms_tf.table_t,
    p_column_names in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  procedure col_type_labels(
    p_table in dbms_tf.table_t,
    p_type_labels in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  procedure col_to_strings(
    p_table in dbms_tf.table_t,
    p_to_strings in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  procedure col_comparables(
    p_table in dbms_tf.table_t,
    p_comparables in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  );

  -- procedure to convert a DBMS_TF.COLUMNS_T table into a string list.
  -- Each item in the list is generated from p_template with '%s' replaced by the column name
  -- Items are delimited by p_delimiter
  -- All COLUMNS_T members are double-quoted, so there is an option to remove the quotes
  procedure list_columns(
    p_columns in dbms_tf.columns_t,
    p_column_list in out nocopy varchar2,
    p_template in varchar2 default '%s',
    p_delimiter in varchar2 default ',',
    p_remove_quotes boolean default false
  );

-------- The stuff that follows is meant to be called by SQL generated from macros
  
  -- function used in to_string expression when datatype is BFILE
  function get_bfile_info(p_bfile in bfile) return varchar2;

  -- CLOB to store trace of a DBMS_TF.TABLE_T structure (in JSON pretty print format)
  table_t_clob clob;
  
  -- procedure that stores a JSON equivalent of an input TABLE_T structure
  -- I add the "type_label" from my lookup table
  procedure put_table_t_clob(
    p_table in dbms_tf.table_t
  );

  -- table function to return contents of table_t_clob, then reset
  function get_table_t_clob return clob_varray1_t;

end sqm_util;
/
create or replace package body sqm_util as

---- Types, constants, variables, functions and procedures accessible within BODY only

  -- Length of substring displayed for LOBs
  c_substr_length constant number := 1000;

  -- Constant values for stringifying datatypes (must produce %CHAR%, not N%CHAR%)
  c_self constant varchar2(128) := '%s';
  c_to_char constant varchar2(128) := 'to_char(%s) %s';
  c_num_to_string constant varchar2(128) := 'to_char(%s, ''TM'') %s';
  c_long_to_string constant varchar2(128) := '''(LONG* unsupported)'' %s';
  c_date_to_string constant varchar2(128) := 'to_char(%s, ''yyyy-mm-dd"T"hh24:mi:ss'') %s';
  c_raw_to_string constant varchar2(128) := 'rawtohex(%s) %s';
  c_xml_to_string constant varchar2(128) := '(%s).getstringval() %s';
  c_rowid_to_string constant varchar2(128) := 'rowidtochar(%s) %s';
  c_clob_to_string constant varchar2(128) := 'to_char(substr(%s, 1, '||c_substr_length||')) %s';
  c_nclob_to_string constant varchar2(128) := 'cast(to_char(substr(%s, 1, '||c_substr_length||')) as varchar2('||c_substr_length||')) %s';
  c_blob_to_string constant varchar2(128) := 'rawtohex(substrb(%s, 1, '||c_substr_length||')) %s';
  c_bfile_to_string constant varchar2(128) := 'sqm_util.get_bfile_info(%s) %s';
  c_json_to_string constant varchar2(128) := 'json_serialize(%s) %s';
  c_object_to_string constant varchar2(128) := 'json_object(%s) %s';
  c_collection_to_string constant varchar2(128) := 'json_array(%s) %s';
  c_ts_to_string constant varchar2(128) := 'to_char(%s, ''fmyyyy-mm-dd"T"hh24:mi:ss.ff'') %s';
  c_tsz_to_string constant varchar2(128) := 'to_char(%s, ''fmyyyy-mm-dd"T"hh24:mi:ss.ff tzr'') %s';
  c_tsltz_to_string constant varchar2(128) := 'to_char(%s at time zone ''UTC'', ''fmyyyy-mm-dd"T"hh24:mi:ss.ff tzr'') %s';
  
  -- Allow comparison, group by, etc. for datatypes that do not allow it natively
  c_lob_comparable constant varchar2(128) := '''HASH_SH256: ''||dbms_crypto.hash(%s, 4) %s';  
  
  -- Constant lookup table (indexed by type, charsetform) providing templates for col_data records
  -- Copy the template to the output record, plug the column name into the COLUMN_NAME field
  -- and in the other fields replace %s with the column name
  type taa_col_data is table of t_col_data index by pls_integer;
  type ttaa_col_data is table of taa_col_data index by pls_integer;
  ctaa_col_data constant ttaa_col_data := ttaa_col_data(
    1=>taa_col_data(
                    1=>t_col_data(null, 'VARCHAR2',       c_self,                 c_self),
                    2=>t_col_data(null, 'NVARCHAR2',      c_to_char,              c_self)
    ),
    2=>taa_col_data(0=>t_col_data(null, 'NUMBER',         c_num_to_string,        c_self)),
    8=>taa_col_data(0=>t_col_data(null, 'LONG',           c_long_to_string,       c_long_to_string)),
   12=>taa_col_data(0=>t_col_data(null, 'DATE',           c_date_to_string,       c_self)),
   13=>taa_col_data(0=>t_col_data(null, 'EDATE',          c_date_to_string,       c_self)),
   23=>taa_col_data(0=>t_col_data(null, 'RAW',            c_raw_to_string,        c_self)),
   24=>taa_col_data(0=>t_col_data(null, 'LONG_RAW',       c_long_to_string,       c_long_to_string)),
   58=>taa_col_data(0=>t_col_data(null, 'XMLTYPE',        c_xml_to_string,        c_xml_to_string)),
   69=>taa_col_data(0=>t_col_data(null, 'ROWID',          c_rowid_to_string,      c_self)),
      -- As of 2021-02-14 typecode 1 is returned instead of 96
   96=>taa_col_data(
                    1=>t_col_data(null, 'CHAR',           c_self,                 c_self),
                    2=>t_col_data(null, 'NCHAR',          c_to_char,              c_self)
  ),
  100=>taa_col_data(0=>t_col_data(null, 'BINARY_FLOAT',   c_num_to_string,        c_self)),
  101=>taa_col_data(0=>t_col_data(null, 'BINARY_DOUBLE',  c_num_to_string,        c_self)),
  112=>taa_col_data(
                    1=>t_col_data(null, 'CLOB',           c_clob_to_string,       c_lob_comparable),
                    2=>t_col_data(null, 'NCLOB',          c_nclob_to_string,      c_lob_comparable)
  ),
  113=>taa_col_data(0=>t_col_data(null, 'BLOB',           c_blob_to_string,       c_lob_comparable)),
  114=>taa_col_data(0=>t_col_data(null, 'BFILE',          c_bfile_to_string,      c_bfile_to_string)),
  119=>taa_col_data(0=>t_col_data(null, 'JSON',           c_json_to_string,       c_json_to_string)),
  121=>taa_col_data(0=>t_col_data(null, 'UDT_OBJECT',     c_object_to_string,     c_object_to_string)),
  122=>taa_col_data(0=>t_col_data(null, 'UDT_NESTED',     c_collection_to_string, c_collection_to_string)),
  123=>taa_col_data(0=>t_col_data(null, 'UDT_VARRAY',     c_collection_to_string, c_collection_to_string)),
  180=>taa_col_data(0=>t_col_data(null, 'TIMESTAMP',      c_ts_to_string,         c_self)),
  181=>taa_col_data(0=>t_col_data(null, 'TIMESTAMP_TZ',   c_tsz_to_string,        c_self)),
  182=>taa_col_data(0=>t_col_data(null, 'INTERVAL_YM',    c_to_char,              c_self)),
  183=>taa_col_data(0=>t_col_data(null, 'INTERVAL_DS',    c_to_char,              c_self)),
  187=>taa_col_data(0=>t_col_data(null, 'ETIMESTAMP',     c_ts_to_string,         c_self)),
  188=>taa_col_data(0=>t_col_data(null, 'ETIMESTAMP_TZ',  c_tsz_to_string,        c_self)),
  189=>taa_col_data(0=>t_col_data(null, 'EINTERVAL_YM',   c_to_char,              c_self)),
  190=>taa_col_data(0=>t_col_data(null, 'EINTERVAL_DS',   c_to_char,              c_self)),
  231=>taa_col_data(0=>t_col_data(null, 'TIMESTAMP_LTZ',  c_tsltz_to_string,      c_self)),
  232=>taa_col_data(0=>t_col_data(null, 'ETIMESTAMP_LTZ', c_tsltz_to_string,      c_self))
  );

---- Public functions / procedures: see package specification for description

  function get_bfile_info(p_bfile in bfile) return varchar2 is
    l_dir_alias varchar2(128);
    l_filename varchar2(128);
  begin
    dbms_lob.filegetname (p_bfile, l_dir_alias, l_filename); 
    return 'bfilename(''' || l_dir_alias || ''', ''' || l_filename ||''')';
  end get_bfile_info;

  procedure put_table_t_clob(
    p_table in dbms_tf.table_t
  ) is
    l_column dbms_tf.table_columns_t;
    ja_column json_array_t;
    jo_table json_object_t;
  
    procedure get_description(
      p_description in dbms_tf.column_metadata_t,
      jo_description in out nocopy json_object_t
    ) is
    begin
      jo_description := new json_object_t;
      jo_description.put('type', p_description.type);
      jo_description.put('max_len', p_description.max_len);
      jo_description.put('name', p_description.name);
      jo_description.put('name_len', p_description.name_len);
      jo_description.put('precision', p_description.precision);
      jo_description.put('scale', p_description.scale);
      jo_description.put('charsetid', p_description.charsetid);
      jo_description.put('charsetform', p_description.charsetform);
      jo_description.put('collation', p_description.collation);
      jo_description.put('type_label', ctaa_col_data(p_description.type)(p_description.charsetform).type_label);
      -- following lines commented out until Oracle supports this info
      --jo_description.put('schema_name', p_description.schema_name);
      --jo_description.put('schema_name_len', p_description.schema_name_len);
      --jo_description.put('type_name', p_description.type_name);
      --jo_description.put('type_name_len', p_description.type_name_len);
    end get_description;
    
    procedure get_column_element(
      p_column_element dbms_tf.column_t,
      jo_column_element in out nocopy json_object_t
    ) is
      l_description dbms_tf.column_metadata_t;
      jo_description json_object_t;
    begin
      jo_column_element := new json_object_t;
      l_description := p_column_element.description;
      get_description(l_description, jo_description);
      jo_column_element.put('description', jo_description);
      jo_column_element.put('pass_through', p_column_element.pass_through);
      jo_column_element.put('for_read', p_column_element.for_read);
    end get_column_element;
    
    procedure get_column(
      p_column dbms_tf.table_columns_t,
      ja_column in out nocopy json_array_t
    ) is
      l_column_element dbms_tf.column_t;
      jo_column_element json_object_t;
    begin
      ja_column := new json_array_t;
      for i in 1..p_column.count loop
        l_column_element := p_column(i);
        get_column_element(l_column_element, jo_column_element);
        ja_column.append(jo_column_element);
      end loop;
    end get_column;  
    
  begin
    jo_table := new json_object_t;
    jo_table.put('schema_name',p_table.schema_name);
    jo_table.put('package_name',p_table.package_name);
    jo_table.put('ptf_name',p_table.ptf_name);
    jo_table.put('table_schema_name',p_table.table_schema_name);
    jo_table.put('table_name',p_table.table_name);
    l_column := p_table.column;
    get_column(l_column, ja_column);
    jo_table.put('column',ja_column);
    sqm_util.table_t_clob := jo_table.to_clob;
    select json_serialize(sqm_util.table_t_clob returning clob pretty)
      into sqm_util.table_t_clob
    from dual;
  end put_table_t_clob;

  function get_table_t_clob return clob_varray1_t is
    l_clob clob := sqm_util.table_t_clob;
  begin
    sqm_util.table_t_clob := null;
    return clob_varray1_t(l_clob);
  end get_table_t_clob;

  function get_table_t_json(
    p_table in dbms_tf.table_t
  ) return varchar2 sql_macro is
    l_sql varchar2(4000) :=
      'select column_value table_t_json from sqm_util.get_table_t_clob()';
  begin
    put_table_t_clob(p_table);
    return l_sql;
  end get_table_t_json;

  function get_table_t_flattened(
    p_table in dbms_tf.table_t
  ) return varchar2 sql_macro is
    l_sql varchar2(4000) := '
select j.*
from sqm_util.get_table_t_clob(), json_table(
  column_value, ''$'' columns (
    table_schema_name, table_name,
    nested path ''$.column[*].description'' columns (
      column_id for ordinality,
      type number, charsetform number, type_label, max_len number,
      name, name_len number,
      precision number, scale number, charsetid number, collation number
    )
  )
) j';
  begin
    put_table_t_clob(p_table);
    return l_sql;
  end get_table_t_flattened;
  
  procedure col_data_records(
    p_table in dbms_tf.table_t,
    pt_col_data in out nocopy tt_col_data
  ) is
    l_table_columns_t dbms_tf.table_columns_t := p_table.column;
    l_meta dbms_tf.column_metadata_t;
    l_col_data t_col_data;
  begin
    pt_col_data := new tt_col_data();
    for i in 1..l_table_columns_t.count loop
      l_meta := l_table_columns_t(i).description;
      l_col_data := ctaa_col_data(l_meta.type)(l_meta.charsetform);
      l_col_data.column_name := l_meta.name;
      l_col_data.to_string := replace(l_col_data.to_string, '%s', l_meta.name);
      l_col_data.comparable := replace(l_col_data.comparable, '%s', l_meta.name);
      pt_col_data.extend;
      pt_col_data(i) := l_col_data;
    end loop;
  end col_data_records;

  procedure col_data_strings(
    p_table in dbms_tf.table_t,
    p_column_names in out nocopy long,
    p_type_labels in out nocopy long,
    p_to_strings in out nocopy long,
    p_comparables in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  ) is
    l_table_columns_t dbms_tf.table_columns_t := p_table.column;
    l_meta dbms_tf.column_metadata_t;
    l_col_data t_col_data;
    type t_cols_lookup is table of int index by varchar2(130);
    lt_cols_lookup t_cols_lookup;
    is_first_item boolean := true;
  begin
    if p_exclude_cols is not null then
      for i in 1..p_exclude_cols.count loop
        lt_cols_lookup(p_exclude_cols(i)) := 0;
      end loop;
    end if;
    
  for i in 1..l_table_columns_t.count loop
      l_meta := l_table_columns_t(i).description;
      if lt_cols_lookup.exists(l_meta.name) then
        continue;
      end if;
      l_col_data := ctaa_col_data(l_meta.type)(l_meta.charsetform);
      p_column_names := case when not is_first_item then p_column_names || ',' end
                        || l_meta.name;
      p_type_labels  := case when not is_first_item then p_type_labels  || ',' end
                        || l_col_data.type_label;
      p_to_strings   := case when not is_first_item then p_to_strings   || ',' end
                        || replace(l_col_data.to_string, '%s', l_meta.name);
      p_comparables  := case when not is_first_item then p_comparables  || ',' end
                        || replace(l_col_data.comparable, '%s', l_meta.name);
      is_first_item := false;
    end loop;
  end col_data_strings;

  procedure col_column_names(
    p_table in dbms_tf.table_t,
    p_column_names in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  ) is
    l_type_labels long;
    l_to_strings long;
    l_comparables long;
  begin
    col_data_strings(p_table, p_column_names,l_type_labels,l_to_strings,l_comparables,
      p_exclude_cols);
  end col_column_names;

  procedure col_type_labels(
    p_table in dbms_tf.table_t,
    p_type_labels in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  ) is
    l_column_names long;
    l_to_strings long;
    l_comparables long;
  begin
    col_data_strings(p_table, l_column_names,p_type_labels,l_to_strings,l_comparables,
      p_exclude_cols);
  end col_type_labels;

  procedure col_to_strings(
    p_table in dbms_tf.table_t,
    p_to_strings in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  ) is
    l_column_names long;
    l_type_labels long;
    l_comparables long;
  begin
    col_data_strings(p_table, l_column_names,l_type_labels,p_to_strings,l_comparables,
      p_exclude_cols);
  end col_to_strings;

  procedure col_comparables(
    p_table in dbms_tf.table_t,
    p_comparables in out nocopy long,
    p_exclude_cols in dbms_tf.columns_t default null
  ) is
    l_column_names long;
    l_type_labels long;
    l_to_strings long;
  begin
    col_data_strings(p_table, l_column_names,l_type_labels,l_to_strings,p_comparables,
      p_exclude_cols);
  end col_comparables;

  procedure list_columns(
    p_columns in dbms_tf.columns_t,
    p_column_list in out nocopy varchar2,
    p_template in varchar2 default '%s',
    p_delimiter in varchar2 default ',',
    p_remove_quotes boolean default false
  ) is
    l_column varchar2(130);
  begin
    for i in 1..p_columns.count loop
      l_column := p_columns(i);
      if p_remove_quotes then
        l_column := trim('"' from l_column);
      end if;
      p_column_list :=
        case when i > 1 then p_column_list || p_delimiter end ||
        replace(p_template, '%s', l_column);
    end loop;
  end list_columns;

end sqm_util;
/