整体架构
本文是 「直播实时数据建设」系列的第三篇,主要介绍「生产侧指标的建设」,比如当前正在直播直播间数,或者主播数等。在介绍生产侧指标的建设过程之前,先回顾下上一节的 「架构」 图。

架构
而本篇要介绍的 「生产侧指标」 的数据链路主要对应以下几个模块。
- 数据源:读取直播生产,比如开播,关播等 kafka 数据源日志;
- 数据处理:使用生产侧数据源 + 实时画像维表 + flink 建设生产侧实时指标;
- 数据汇:将处理完成的指标数据写入到 kafka 中。
图中 「标红」 模块为生产侧指标的数据链路涉及到的模块。用另一张图进行了标注。

生产侧架构
其中直播间实时画像维表的介绍已经在上节进行了介绍,感兴趣的话可以点击以下链接,跳转到上节进行阅读~

生产实践 | Flink + 直播(二)| 如何建设实时公共画像维表?
本小节就不针对 「生产侧指标的建设」中所有涉及指标的建设过程进行详细介绍了,主要以「当前分钟正在开播直播间数」 作为 「生产侧指标建设」 的一个代表性案例,介绍这个指标的整个建设过程。来为大家还原生产侧指标的业务过程以及技术方案。
Question
仍然从几个问题入手,介绍 「当前分钟正在开播直播间数」 的建设过程。
- 「当前分钟正在开播直播间数」 的定义什么?业务过程是怎么样的?举例?
- 怎样去建设这个指标?整体的指标计算流程?
1.聊聊定义?
当前分钟正在开播直播间数,其定义就是整个平台中,当前分钟正在开播的直播间数 + 单层维度下钻的当前分钟正在开播的直播间数。
举例:
现在的时间点是 2020-11-11 12:42,当前分钟直播的直播间数为 3000 个(其中根据平台维度下钻得到:IOS 平台为 1500,安卓平台为 1500)
到了 12:43 时,有 200 个直播间进行了关播(其中 100 个为 IOS,100 个为安卓),有 100 个直播间开播(全部为 IOS),则当前分钟正在直播的直播间数为 2900(根据平台维度下钻得到:IOS 平台为 1500,安卓平台为 1400)。
其中 2020-11-11 12:42 的 3000 以及 2020-11-11 12:43 的 2900 以及按照平台下钻的数值就为当前时间正在开播的直播间数,也就是最终产出的结果。
根据上述定义和分析,可以明确整个过程中涉及到的数据源和数据汇的 schema 信息。
数据源 schema
数据源 schema 如下。
| 字段 | 备注 |
|---|---|
| live_stream_id | 直播间 id |
| author_id | 主播 id |
| start_or_end | 开播还是关播 |
| timestamp | 时间戳 |
| … | … |
数据汇 schema
根据整体处理过程以及最终想要获取的结果,将数据汇 schema 信息确定如下。
| 字段 | 备注 |
|---|---|
| timestamp | 时间戳,汇总到分钟粒度 |
| metric_name | 指标名,举例:开播直播间数 |
| metric_value | 指标值,举例:3000(开播直播间数) |
| dim_name | 维度名,举例:平台,版本 |
| dim_value | 维度值,举例:IOS,8.1 |
| … | … |
❝Notes: 「metric_name 和 metric_value」: 这两个字段是为了之后进行指标扩充时进行的设计。比如后续如果需要加入开播主播数,开播时长等指标,不用修改数据汇 schema,只需要加一种 metric_name,就可以使用原有 schema 进行数据产出。 「dim_name 和 dim_value」: 目前建设的指标只提供了进行单维度下钻的能力,所以设计了 dim_name 和 dim_value 两个字段,可满足用户查看平台为 IOS 的当前开播直播间数或者使用开播软件版本为 8.1 的当前开播直播间数。如果后续业务场景需要多维下钻能力,可以在字段上面进行扩充。或者也可以提供明细数据在 OLAP 中进行多维下钻。 ❞
2.怎样建设?
对于当前分钟正在开播直播间数来说,其计算方式很简单,就是下面这个数学公式:
「当前分钟正在开播直播间数」 = 「上一分钟正在开播直播间数」 + 「当前分钟开播直播间数」 - 「当前分钟关播直播间数」
目前我们已经有了计算的公式,那么就可以详细分析下指标的计算处理逻辑。并且还可以获取到另一个信息,对于当前分钟正在开播直播间数的计算来说,是依赖上下文信息的,即 「上一分钟正在开播直播间数」 ,这也就是 「状态」 。
首先介绍指标处理逻辑。
指标处理逻辑
从获取到数据源,到产出指标的整体处理逻辑如下图所示。这里就不使用文字进行赘述了。

数据流转
其中标为 「粉色」 的模块为任务中的 「状态」 ,即任务中一直存储的当前分钟正在开播直播间数。
状态
既然涉及到了状态,那么这里就展开介绍一下我对 「状态」 的理解。如有错误,请在文末讨论中进行指出,我会和大家讨论。
状态其实就是一个记录上下文信息的东西,如果当前的计算过程依赖到上次计算的结果,那么上次计算的结果就是状态。举几个?;
- 「流处理」:如本节介绍的 「当前分钟正在开播直播间数」的计算,就是依赖上一分钟的正在开播直播间数(「状态」)进行的计算。可能有小伙伴会说,能不能不依赖上一分钟,从头开始计算可以不?答案是可以的,但是从头开始计算,也需要将所有历史数据进行存储,这些历史数据其实也就是状态,只不过我们将其优化为了上一分钟开播直播间数。
- 「批处理」:今天的全量表 = 昨天全量表(「状态」) + 今天的增量表。
- 「数据库存储」:最常见的 mysql 主键自增,unique key 等。为什么新插入一条数据主键会自增?因为 mysql 存储了主键的上一个值(「状态」)。为什么插入相同数据时,由于 unique key 会导致报错,就是因为 mysql 存储了所有 unique key 的字段的数据(「状态」)。
- 「生活」:当前的手机电量 = 上一分钟的手机电量(「状态」) + (充电/用电量)。为什么你越来越喜欢你的另一半?因为你对她的感觉 = 前一秒你对她的感觉(「状态」)+ 当前这一秒她亲了你一下。

生活中随处可见 「状态」 ,即使不是程序员,我相信也都可以理解 「状态」 的概念。
指标计算代码示例
实现方式举例如下。
1 | public class LiveStreamRealtimeMetricProdProcessorJob { |
复制
总结
本文衔接上文,主要介绍直播间 「生产侧指标的建设」 ,以 「当前分钟正在开播直播间数」 为代表举例。提出定义以及建设过程相关的问题,以这两个个问题出发,引出了以下两小节。
第一节简单介绍了当前分钟正在开播直播间数的定义。
第二节主要介绍了当前分钟正在开播直播间数的建设逻辑以及过程,并对 「状态」 这个概念进行了一个拓展介绍。
最后一节对本文进行了总结。
如果你也有相同的指标建设需求,或者存在一些指标建设过程中的问题,欢迎关注博主公众号,或者添加博主微信,互相交流~