生产实践 | Flink + 直播(三)| 如何建设当前正在直播 xx 数

整体架构

本文是 「直播实时数据建设」系列的第三篇,主要介绍「生产侧指标的建设」,比如当前正在直播直播间数,或者主播数等。在介绍生产侧指标的建设过程之前,先回顾下上一节的 「架构」 图。

img

架构

而本篇要介绍的 「生产侧指标」 的数据链路主要对应以下几个模块。

  • 数据源:读取直播生产,比如开播,关播等 kafka 数据源日志;
  • 数据处理:使用生产侧数据源 + 实时画像维表 + flink 建设生产侧实时指标;
  • 数据汇:将处理完成的指标数据写入到 kafka 中。

图中 「标红」 模块为生产侧指标的数据链路涉及到的模块。用另一张图进行了标注。

img

生产侧架构

其中直播间实时画像维表的介绍已经在上节进行了介绍,感兴趣的话可以点击以下链接,跳转到上节进行阅读~

img

生产实践 | Flink + 直播(二)| 如何建设实时公共画像维表?

本小节就不针对 「生产侧指标的建设」中所有涉及指标的建设过程进行详细介绍了,主要以「当前分钟正在开播直播间数」 作为 「生产侧指标建设」 的一个代表性案例,介绍这个指标的整个建设过程。来为大家还原生产侧指标的业务过程以及技术方案。

Question

仍然从几个问题入手,介绍 「当前分钟正在开播直播间数」 的建设过程。

  • 「当前分钟正在开播直播间数」 的定义什么?业务过程是怎么样的?举例?
  • 怎样去建设这个指标?整体的指标计算流程?

1.聊聊定义?

当前分钟正在开播直播间数,其定义就是整个平台中,当前分钟正在开播的直播间数 + 单层维度下钻的当前分钟正在开播的直播间数。

举例:

现在的时间点是 2020-11-11 12:42,当前分钟直播的直播间数为 3000 个(其中根据平台维度下钻得到:IOS 平台为 1500,安卓平台为 1500)

到了 12:43 时,有 200 个直播间进行了关播(其中 100 个为 IOS,100 个为安卓),有 100 个直播间开播(全部为 IOS),则当前分钟正在直播的直播间数为 2900(根据平台维度下钻得到:IOS 平台为 1500,安卓平台为 1400)。

其中 2020-11-11 12:42 的 3000 以及 2020-11-11 12:43 的 2900 以及按照平台下钻的数值就为当前时间正在开播的直播间数,也就是最终产出的结果。

根据上述定义和分析,可以明确整个过程中涉及到的数据源和数据汇的 schema 信息。

数据源 schema

数据源 schema 如下。

字段备注
live_stream_id直播间 id
author_id主播 id
start_or_end开播还是关播
timestamp时间戳

数据汇 schema

根据整体处理过程以及最终想要获取的结果,将数据汇 schema 信息确定如下。

字段备注
timestamp时间戳,汇总到分钟粒度
metric_name指标名,举例:开播直播间数
metric_value指标值,举例:3000(开播直播间数)
dim_name维度名,举例:平台,版本
dim_value维度值,举例:IOS,8.1

❝Notes: 「metric_name 和 metric_value」: 这两个字段是为了之后进行指标扩充时进行的设计。比如后续如果需要加入开播主播数,开播时长等指标,不用修改数据汇 schema,只需要加一种 metric_name,就可以使用原有 schema 进行数据产出。 「dim_name 和 dim_value」: 目前建设的指标只提供了进行单维度下钻的能力,所以设计了 dim_name 和 dim_value 两个字段,可满足用户查看平台为 IOS 的当前开播直播间数或者使用开播软件版本为 8.1 的当前开播直播间数。如果后续业务场景需要多维下钻能力,可以在字段上面进行扩充。或者也可以提供明细数据在 OLAP 中进行多维下钻。 ❞

2.怎样建设?

对于当前分钟正在开播直播间数来说,其计算方式很简单,就是下面这个数学公式:

「当前分钟正在开播直播间数」 = 「上一分钟正在开播直播间数」 + 「当前分钟开播直播间数」 - 「当前分钟关播直播间数」

目前我们已经有了计算的公式,那么就可以详细分析下指标的计算处理逻辑。并且还可以获取到另一个信息,对于当前分钟正在开播直播间数的计算来说,是依赖上下文信息的,即 「上一分钟正在开播直播间数」 ,这也就是 「状态」

首先介绍指标处理逻辑。

指标处理逻辑

从获取到数据源,到产出指标的整体处理逻辑如下图所示。这里就不使用文字进行赘述了。

img

数据流转

其中标为 「粉色」 的模块为任务中的 「状态」 ,即任务中一直存储的当前分钟正在开播直播间数。

状态

既然涉及到了状态,那么这里就展开介绍一下我对 「状态」 的理解。如有错误,请在文末讨论中进行指出,我会和大家讨论。

状态其实就是一个记录上下文信息的东西,如果当前的计算过程依赖到上次计算的结果,那么上次计算的结果就是状态。举几个?;

  • 「流处理」:如本节介绍的 「当前分钟正在开播直播间数」的计算,就是依赖上一分钟的正在开播直播间数(「状态」)进行的计算。可能有小伙伴会说,能不能不依赖上一分钟,从头开始计算可以不?答案是可以的,但是从头开始计算,也需要将所有历史数据进行存储,这些历史数据其实也就是状态,只不过我们将其优化为了上一分钟开播直播间数。
  • 「批处理」:今天的全量表 = 昨天全量表(「状态」) + 今天的增量表。
  • 数据库存储」:最常见的 mysql 主键自增,unique key 等。为什么新插入一条数据主键会自增?因为 mysql 存储了主键的上一个值(「状态」)。为什么插入相同数据时,由于 unique key 会导致报错,就是因为 mysql 存储了所有 unique key 的字段的数据(「状态」)。
  • 「生活」:当前的手机电量 = 上一分钟的手机电量(「状态」) + (充电/用电量)。为什么你越来越喜欢你的另一半?因为你对她的感觉 = 前一秒你对她的感觉(「状态」)+ 当前这一秒她亲了你一下。

img

生活中随处可见 「状态」 ,即使不是程序员,我相信也都可以理解 「状态」 的概念。

指标计算代码示例

实现方式举例如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class LiveStreamRealtimeMetricProdProcessorJob {

   public static void main(String[] args) throws Exception {
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       DataStream<SourceModel> source = SourceFactory.getSourceDataStream(...);

       DataStream<SinkModel> result = source
              .keyBy(new KeySelector<SourceModel, Long>() {
                   @Override
                   public Long getKey(SourceModel commonModel) throws Exception {
                       return commonModel.getLiveStreamId() % 1000;
                  }
              })
              .timeWindow(Time.seconds(60))
              .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {

                   private ValueState<Long> playingLiveStreamNumberValueState;

                   @Override
                   public void open(Configuration parameters) throws Exception {
                       super.open(parameters);
                       this.playingLiveStreamNumberValueState = getRuntimeContext().getState(...);
                  }

                   @Override
                   public void process(Long bucket, Context context, Iterable<SourceModel> iterable,
                           Collector<SinkModel> collector) throws Exception {
                       Long playingLiveStreamNumber = this.playingLiveStreamNumberValueState.value();

                       if (null == playingLiveStreamNumber) {
                           playingLiveStreamNumber = 0L;
                      }

                       List<SourceModel> sourceModels = (List<SourceModel>) iterable;

                       for (SourceModel sourceModel : sourceModels) {
                           if (BizType.I == sourceModel.getBizType()) {
                               playingLiveStreamNumber++;
                          } else {
                               playingLiveStreamNumber--;
                          }
                      }

                       this.playingLiveStreamNumberValueState.update(playingLiveStreamNumber);

                       collector.collect(
                               SinkModel.builder().build()
                      );
                  }
              });

       SinkFactory.setSinkDataStream(...);

       env.execute();
  }

   @Data
   @Builder
   static class SourceModel {
       // 直播间id
       private Long liveStreamId;
       // 开播时间,关播时间
       private Long time;
       // 主播id
       private Long authorId;
       // binlog 时间戳
       private long binlogTimestamp;
       // 开播,关播
       private BizType bizType;
  }

   enum BizType {
       I, // 开播
       D, // 关播
      ;
  }

   @Data
   @Builder
   static class SinkModel {
       // 时间戳,汇总到分钟粒度
       private Long timestamp;
       // 指标名
       private String metricName;
       // 指标值
       private double metricValue;
       // 维度名
       private String dimName;
       // 维度值
       private String dimValue;
  }
}

复制

总结

本文衔接上文,主要介绍直播间 「生产侧指标的建设」 ,以 「当前分钟正在开播直播间数」 为代表举例。提出定义以及建设过程相关的问题,以这两个个问题出发,引出了以下两小节。

第一节简单介绍了当前分钟正在开播直播间数的定义。

第二节主要介绍了当前分钟正在开播直播间数的建设逻辑以及过程,并对 「状态」 这个概念进行了一个拓展介绍。

最后一节对本文进行了总结。

如果你也有相同的指标建设需求,或者存在一些指标建设过程中的问题,欢迎关注博主公众号,或者添加博主微信,互相交流~