目录

一、拉链表概念

二、拉链表对应的业务需求

三、代码实现

3.1 数据初始化:

3.2 创建ods层增量表:

3.3 创建dwd层拉链表

3.4 数据更新 ,将数据日期为2023-3-4的日期添加到拉链表中

3.4.1 先追加数据到ods层表

3.4.2更新dwd层表数据


一、拉链表概念

拉链表是一种数据模型,主要是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。拉链表可以避免按每一天存储所有记录造成的海量存储问题,同时也是处理缓慢变化数据(SCD2)的一种常见方式。

百度百科的解释:拉链表是维护历史状态,以及最新状态数据的一种表,拉链表根据拉链粒度的不同,实际上相当于快照,只不过做了优化,去除了一部分不变的记录,通过拉链表可以很方便的还原出拉链时点的客户记录。

二、拉链表对应的业务需求

  1. 数据量比较大;
  2. 表中的部分字段会被update, 如用户的地址,产品的描述信息,订单的状态等等;
  3. 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等;
  4. 变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右;
  5. 如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费。
  6. 拉链历史表,既能满足反应数据的历史状态,又可以最大程度的节省存储;

缺点:

  • 开发/使用 成本略高,大量记录频繁变更会导致存储压缩效果降低
  • 不分区的话,数据量大的时候,查询效率低
  • 如果某个日期同步的数据出现问题需要重跑数据,则需要重跑从出问题的日期到当前日期的每一天数据

三、代码实现

拉链表主要用在dwd层,用来及时记录每个事务状态的。加入ods层数据发生的新增或者更新,相应的dwd层的数据也会改变。拉链表数据生成的思路是:ods更新或者新增的数据 + union +dwd拉链表历史数据(要更改历史数据中状态发生改变的字段)。
方法有两种: 窗口函数和union all 。

3.1 数据初始化:

导入数据到一张初始表 (外部表)

-----------拉链---------------create database lalian;use lalian;drop table if exists orders;create external tableorders(orderid int,createdate string,modifiedtime string,status string)row format delimited fields terminated by '\t'location '/tmp/lalian/orders';-- [root@reagan180 ~]# vim /opt/aa.txt-- [root@reagan180 ~]# hdfs dfs -mkdir -p /tmp/lalian/orders/-- [root@reagan180 ~]# hdfs dfs -put /opt/aa.txt /tmp/lalian/orders/select * from orders;

3.2 创建ods层增量表:(按日期分区)

将初始表添加覆盖到ods层表中,数据日期为2023-3-3

create tableods_orders_inc(orderid int,createdate string,modifiedtime string,status string)partitioned by (day string)row format delimited fields terminated by '\t';insert overwrite table ods_orders_inc partition (day='2023-03-03')select orderid, createdate, modifiedtime, status from orders;

3.3 创建dwd层拉链表

将ods层数据添加覆盖到dw层,dw表增加 start_timeend_time 两列数据用来记录时间动态。

其实默认end_time 为时间极限值 ‘9999-12-31’

create table dws_orders_his(orderid int,createdate string,modifiedtime string,status string,start_time string,end_time string)row format delimited fields terminated by '\t';insert overwritetabledws_orders_hisselect orderid, createdate, modifiedtime, status, modifiedtime,'9999-12-31' from ods_orders_inc where day='2023-03-03';

3.4 数据更新 ,将数据日期为2023-3-4的日期添加到拉链表中

3.4.1 先追加数据到ods层表

1、删除 hdfs 路径的aa.txt 文件 :[root@reagan180 ~]# hdfs dfs -rm -f /tmp/lalian/orders/*

2、将更新的数据重新传入hdfs路径:

[root@reagan180 ~]# hdfs dfs -put /opt/aa.txt /tmp/lalian/orders/

where条件,不会覆盖日期2023-3-3的数据

insert overwrite table ods_orders_inc partition (day='2023-03-04')select orderid, createdate, modifiedtime, status from orders where modifiedtime='2023-3-4';

3.4.2更新dwd层表数据

可以采用union all 和窗口函数

union all :

insert overwritetabledws_orders_hisselect tb.orderid, tb.createdate, tb.modifiedtime, tb.status, tb.start_time , tb.end_timefrom((select orderid, createdate, modifiedtime, status, modifiedtime as start_time,'9999-12-31' as end_time from ods_orders_inc where day='2023-03-04')union all(select t1.orderid,t1.createdate,t1.modifiedtime,t1.status,t1.start_time,if(t2.orderid is not null and t1.end_time>'2023-03-04', t2.modifiedtime, '9999-12-31') endtimefrom dws_orders_his t1left join(select orderid from ods_orders_inc where day='2023-03-04') t2 on t1.orderid=t2.orderid))tb order by tb.orderid,tb.start_time;

窗口函数:

lead(start_time, 1) over (partition by orderid order by start_time)

补充一下每日的用户更新表该怎么获取:

  1. 我们可以监听Mysql数据的变化,比如说用Canal,最后合并每日的变化,获取到最后的一个状态。
  2. 假设我们每天都会获得一份切片数据,我们可以通过取两天切片数据的不同来作为每日更新表。
  3. 每日的变更流水表。
  4. 通过etl工具对操作型数据库按照时间字段增量抽取到ods或者数据仓库(每天抽取前一天的数据),形成每天的增量数据(实际中使用最多的情形)。