注意的点
网上有很多文章只写了一个字段做测试,所以Flink sql 在写mysql sink的时候 primary key (uid) NOT ENFORCED 是生效的。 但是前提是只有一个字段,flinksql检测数据的时候就不需要关心其是否重复。
而当我们的数据源有多个字段时,这时候再需要upsert into时 primary key (uid) NOT ENFORCED 就不会生效了。
1 | flinksql 不支持 insert overwrite 的语句, 需要注册 实现OverwritableTableSink接口。代码就又复杂了。 |
解决方法
当需要对一整张mysql表进行 insert or update 时,把所有select 语句中的字段 都加入到 group by 语句中 。 这样flinksql识别语法,并根据主键更新mysql表的记录。**
对于实时流来说,如果两条记录全部字段一模一样,本质上代表统计的结果集没有任何变动。 故只需要保留一条即可。 不存在数据被去重掉、丢失的问题。
1 | import bean.{AuthorDetail, AuthorTotalData} |