over窗口聚合函数

在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。

与标准 SQL 中一致,Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,所以有时开窗聚合也叫作“OVER 聚合”(Over Aggregation)。基本语法如下:

1
2
3
4
5
6
7
SELECT
<聚合函数> OVER (
[PARTITION BY <字段 1>[, <字段 2>, ...]]
ORDER BY <时间属性字段>
<开窗范围>),
...
FROM ...

MySQL窗口函数教学,MySQL窗口函数和Hive窗口函数以及Flink窗口函数区别不大,只是语法上有些区别,例如开窗范围方面,勤奋的读者一定会看明白。

这里 OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分:
PARTITION BY(可选)

用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的;
ORDER BY

OVER 窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论那种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中,目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性。
开窗范围

对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由 BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围。目前支持的上界只能是 CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为:

1
BETWEEN ... PRECEDING AND CURRENT ROW

开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。
范围间隔

范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。
例如开窗范围选择当前行之前 1 小时的数据:

1
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW

行间隔

行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。
** 例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):**

1
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

需求:统计每个用户当前这次访问以及之前三次访问的平均时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

       //1.在创建表的DDL中直接定义时间属性
       String creatDDL = "CREATE TABLE clickTable (" +
               "user_name STRING," +
               "url STRING," +
               "ts BIGINT," +
               "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," + //事件时间 FROM_UNIXTIME() 能转换为年月日时分秒这样的格式 转换秒
               " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + //watermark 延迟一秒
               ")WITH(" +
               " 'connector' = 'filesystem'," +
               " 'path' = 'input/clicks.txt'," +
               " 'format' = 'csv'" +
               ")";

       tableEnv.executeSql(creatDDL);


       //需求:统计每个用户当前这次访问以及之前三次访问的平均时间
       Table overWindow = tableEnv.sqlQuery("select user_name," +
               " avg(ts) OVER(" +
               " PARTITION BY user_name " +
               " ORDER BY et " +
               " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
               ") AS avg_ts " +
               "FROM clickTable");


       tableEnv.toChangelogStream(overWindow).print("over window:");


       env.execute();
  }
}

本文来源:https://blog.csdn.net/weixin_45417821/article/details/124640798