Setup
-- Staging table holding change rows for accounts: one row per (id, last_updated).
drop table if exists account_stage;
create table account_stage (
    id           int,
    name         varchar(10),
    last_updated timestamptz,
    -- Single-character change code; the sample rows below use 'U', 'D' and 'I'.
    operation    char(1),
    -- Composite key: the same id may be staged more than once, but only once per last_updated.
    constraint account_stage_pk primary key (id, last_updated)
);

-- Sample change rows. Columns are listed explicitly so the statement does not
-- depend on the table's column order.
insert into account_stage (id, name, last_updated, operation)
values
    (1, 'name1a', '2022-01-01', 'U'),
    (1, 'name1a', '2022-01-02', 'D'),
    (2, 'name2a', '2022-01-01', 'U'),
    (2, 'name2b', '2022-01-02', 'U'),
    (5, 'name5',  '2022-01-02', 'I');
-- Target table: one row per account id.
drop table if exists account;
create table account (
    id           int,
    name         varchar(10),
    last_updated timestamptz,
    constraint account_pk primary key (id)
);

-- Initial account rows. Columns are listed explicitly so the statement does not
-- depend on the table's column order.
insert into account (id, name, last_updated)
values
    (1, 'name1', '2021-12-31'),
    (2, 'name2', '2021-12-31'),
    (3, 'name3', '2021-12-31'),
    (4, 'name4', '2021-12-31');
test=# select * from account_stage;
id | name | last_updated | operation
----+--------+------------------------+-----------
1 | name1a | 2022-01-01 00:00:00+00 | U
1 | name1a | 2022-01-02 00:00:00+00 | D
2 | name2a | 2022-01-01 00:00:00+00 | U
2 | name2b | 2022-01-02 00:00:00+00 | U
5 | name5 | 2022-01-02 00:00:00+00 | I
(5 rows)
test=# select * from account;
id | name | last_updated
----+-------+------------------------
1 | name1 | 2021-12-31 00:00:00+00
2 | name2 | 2021-12-31 00:00:00+00
3 | name3 | 2021-12-31 00:00:00+00
4 | name4 | 2021-12-31 00:00:00+00
(4 rows)
Get the net change for each id from the staging table (the most recent staged row per id)
-- Net change per id: keep only the most recent staged row for each id.
-- distinct on (id) returns the first row of every id group in the sort order
-- below, i.e. the row with the greatest last_updated. The staging table's
-- primary key (id, last_updated) rules out ties within an id.
select distinct on (id)
    id,
    name,
    last_updated,
    operation
from account_stage
order by
    id,
    last_updated desc;
test=# select distinct id,
test-# first_value(name) over w as name,
test-# first_value(last_updated) over w as last_updated,
test-# first_value(operation) over w as operation
test-# from account_stage
test-# window w as (partition by id order by last_updated desc)
test-# order by id;
id | name | last_updated | operation
----+--------+------------------------+-----------
1 | name1a | 2022-01-02 00:00:00+00 | D
2 | name2b | 2022-01-02 00:00:00+00 | U
5 | name5 | 2022-01-02 00:00:00+00 | I
(3 rows)
Use the MERGE statement to apply the net changes from the staging table to the target table
-- Apply the net change per id from account_stage to account.
--
-- The source keeps only the most recent staged row for each id. Its operation
-- code then decides the action:
--   'D'        -> delete the matching account row
--   'I' / 'U'  -> upsert: update the row when it exists, insert it otherwise
-- 'I' and 'U' are handled alike on purpose. When a row is inserted and then
-- updated within the same staging batch, its net operation is 'U' although the
-- target has no such row yet; when an id is deleted and re-inserted, its net
-- operation is 'I' although the target still holds the old row. Treating both
-- codes as an upsert keeps those changes from being silently skipped.
merge into account as tgt
using (
    -- Latest staged row per id (first row of each id group in this order).
    select distinct on (id)
        id,
        name,
        last_updated,
        operation
    from account_stage
    order by
        id,
        last_updated desc
) as cdc
on tgt.id = cdc.id
when matched and cdc.operation = 'D' then
    delete
when matched and cdc.operation in ('I', 'U') then
    update set
        name = cdc.name,
        last_updated = cdc.last_updated
when not matched and cdc.operation in ('I', 'U') then
    -- Explicit column list so the insert does not depend on column order.
    insert (id, name, last_updated)
    values (cdc.id, cdc.name, cdc.last_updated)
;
test=# merge into account
test-# using
test-# (
test(# select distinct id,
test(# first_value(name) over w as name,
test(# first_value(last_updated) over w as last_updated,
test(# first_value(operation) over w as operation
test(# from account_stage
test(# window w as (partition by id order by last_updated desc)
test(# order by id
test(# ) cdc
test-# on account.id=cdc.id
test-# when not matched and cdc.operation='I' then
test-# insert values(cdc.id,cdc.name,cdc.last_updated)
test-# when matched and cdc.operation='D' then
test-# delete
test-# when matched and cdc.operation='U' then
test-# update set name=cdc.name,
test-# last_updated=cdc.last_updated
test-# ;
MERGE 3
test=# select * from account order by id;
id | name | last_updated
----+--------+------------------------
2 | name2b | 2022-01-02 00:00:00+00
3 | name3 | 2021-12-31 00:00:00+00
4 | name4 | 2021-12-31 00:00:00+00
5 | name5 | 2022-01-02 00:00:00+00
(4 rows)
Reference:
No comments:
Post a Comment