维表和流表

维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,补充事实表的信息。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常为kafka的实时流数据,维表通常存储在外部设备中(比如MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。
本文主要介绍:

1
2
3
4
1、流表和维表的区别
2、流表和维表join的数据流转解析
3、双流join的数据流转解析
4、代码示例和场景

1、流表和维表的区别:

流表:实时流数据映射成的表,在join查询中,每来一条数据都会主动去维表中查询是否有匹配的数据

维表:维度信息表,一般在外部存储(Redis,Mysql中)维表是被动查询的,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联
2、流表和维表join:

在这里插入图片描述
3、流表和流表关联(双流jion):

在这里插入图片描述
4、代码示例(FlinkSQL)

流表和维表join
场景:
流表是用户的行为数据,记录用户学习内容,学习时长等信息
维表是学生信息数据(姓名,手机号)
如果cpt表有一条实时流数据A同学的学习记录过来,此时student表还没有这个学生的信息,获取到的姓名和手机号就是null。当student表更新有A的信息,后续的实时流数据能关联获取到A的姓名和手机号,但是student表更新之前的实时流数据依旧是获取不到的(这点需要注意)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
创建kafka数据源,流表
table_env.execute_sql(
     """CREATE TABLE cpt(
         content_id string,
         student_uid string,
         course_id string,
         content_type int,
         study_time bigint,
         chapter_id string,
         section_id string,
         progress int,
         updated_at string,
         created_at string,
         playback_time int,
         ptime AS PROCTIME()
       ) WITH (
         'connector' = 'kafka',
         'topic' = 'stream_content_study_progress',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 't1',
         'scan.startup.mode'='earliest-offset',
         'format' = 'json'
       )
       """)

创建Redis维表
table_env.execute_sql(
   """create table student(
      uid string,
      mobile string,
      name string,
      PRIMARY KEY (uid) NOT ENFORCED
     )with(
     'connector'='redis',
     'ip'='localhost',
     'password'='',
     'port'='6379',
     'table'='user',
     'database'='1'
     )

     """)
创建sink表
table_env.execute_sql("""create table res(
 uid string,
 mobile string,
 courseId string,
 viewingTime bigint,
 updateTime bigint
 )with(
'connector'='kafka',
'topic'='course_viewing_v1',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
)""")

流表和维表join
table_env.execute_sql(
   """
   insert into res
   select
       t.student_uid as uid,
       student.mobile as mobile,
       t.course_id as courseId,
       t.study_time as viewingTime,
       unix_timestamp(t.updated_at)*1000 as updateTime,
   from cpt as t
   left join student for system_time as of t.ptime on t.student_uid=student.uid
   """
)

流表和流表join
** 场景:上面描述的场景,通过双流join能很好的解决。两个都是流表,两个表当中的任何一个表来数据时,都会去另一个表中查询是否有匹配的数据(包含历史)**

创建流表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
table_env.execute_sql(
     """CREATE TABLE cpt(
         content_id string,
         student_uid string,
         course_id string,
         content_type int,
         study_time bigint,
         chapter_id string,
         section_id string,
         progress int,
         updated_at string,
         created_at string,
         playback_time int,
         ptime AS PROCTIME()
       ) WITH (
         'connector' = 'kafka',
         'topic' = 'stream_content_study_progress',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 't1',
         'scan.startup.mode'='earliest-offset',
         'format' = 'json'
       )
       """)

创建流表,其实是维度表
table_env.execute_sql(
   """create table student(
      uid string,
      mobile string,
      name string,
      PRIMARY KEY (uid) NOT ENFORCED
     )with(
    'connector' = 'kafka',
         'topic' = 'stream_student',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 't1',
         'scan.startup.mode'='earliest-offset',
         'format' = 'json'
     )

双流join
table_env.execute_sql(
   """
   insert into res
   select
      t.student_uid as uid,
      student.mobile as mobile,
      t.course_id as courseId,
      t.study_time as viewingTime,
      unix_timestamp(t.updated_at)*1000 as updateTime,
   from cpt as t
  left join student t.student_uid=student.uid
   """
)