Flink SQL upsert to a MySQL sink: updates by primary key stop working and MySQL reports a Duplicate Key error

Points to note

Many articles online test with only a single field, which is why primary key (uid) NOT ENFORCED appears to work when Flink SQL writes to the MySQL sink. The premise is that there is only one field, so Flink SQL never has to care about whether the data is duplicated.
Once the data source has multiple fields and you need upsert-into behavior, primary key (uid) NOT ENFORCED alone no longer takes effect.

Flink SQL does not support an insert overwrite statement; you would have to register a sink that implements the OverwritableTableSink interface, which makes the code considerably more complex.
Flink SQL does not recognize MySQL's insert into ... values ... on duplicate key update syntax (see the sketch below).
Flink SQL has no MySQL dialect that can be registered; only the Hive dialect is available.
So insert overwrite table select * from temp_table is not an option; instead, use a group by clause to upsert the real-time result set.
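
For contrast, the statement below is what you would normally run directly against MySQL; Flink SQL's parser rejects this syntax, so the upsert has to be expressed on the Flink side instead. Table and column names here are hypothetical.

-- Valid when executed directly in MySQL, but not parseable by Flink SQL:
INSERT INTO author_total_data (uid, follower_count)
VALUES ('uid_001', 100)
ON DUPLICATE KEY UPDATE follower_count = VALUES(follower_count);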

Solution

When you need insert-or-update behavior for an entire MySQL table, add every field of the select list to the group by clause. Flink SQL then accepts the query as an upsert and updates the MySQL rows by primary key.
For a real-time stream, two records whose fields are all identical mean the aggregated result has not changed at all, so keeping only one of them is enough; nothing is wrongly deduplicated or lost.
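
A minimal sketch of the working pattern, reduced to a hypothetical two-column table (the full Scala job follows below): declare the primary key as NOT ENFORCED on the sink table and list every selected column in the GROUP BY, so the query becomes an updating query that the JDBC sink writes as upserts.

CREATE TABLE author_total_sink (
  uid             STRING,
  follower_count  BIGINT,
  PRIMARY KEY (uid) NOT ENFORCED  -- MySQL's primary key on uid resolves the duplicates
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://ip:3306/db',
  'connector.table' = 'author_total_data',
  'connector.username' = 'root',
  'connector.password' = 'xxx'
);

INSERT INTO author_total_sink
SELECT uid, follower_count
FROM author_total_view
GROUP BY uid, follower_count;  -- every selected column appears in GROUP BY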

import bean.{AuthorDetail, AuthorTotalData}
import com.alibaba.fastjson.JSON
import com.dhgate.myyshop.util.{FlinkEnv, RedisUtil}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory
import redis.clients.jedis.Jedis

import java.util.Properties

object Mds_Tik_Author_Detail_bak {

 private val log = LoggerFactory.getLogger(this.getClass)

 def main(args: Array[String]): Unit = {

val TASK_NAME: String = this.getClass().getSimpleName().filter(!_.equals('$'))
System.setProperty("HADOOP_USER_NAME", "hadoop")
val in = Thread.currentThread.getContextClassLoader.getResourceAsStream("myyshop.properties")
val prop = new Properties
try {
 prop.load(in)
} catch {
 case e: Exception => {
   e.printStackTrace()
}
}

var streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
//streamEnv = FlinkEnv.setStreaminigEnv(streamEnv, prop.getProperty("FLINK_AUTHOR_DETAIL_CHECKPOINT"))

val properties = new Properties()
properties.setProperty("bootstrap.servers", prop.getProperty("KAFKA_BEIJING"))
properties.setProperty("group.id", prop.getProperty("FLINK_TOPIC_AUTHOR_DETAIL_GROUP_ID_2"))

val author_detail_stream: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer[String](
 prop.getProperty("FLINK_AUTHOR_DETAIL"),
 new SimpleStringSchema(),
 properties
).setStartFromEarliest()

import org.apache.flink.streaming.api.scala._

val rawData = streamEnv.addSource(author_detail_stream).setParallelism(1)
val authorDetailDS: DataStream[AuthorDetail] = rawData
.filter(jsonStr => {
   val jsonObj = JSON.parseObject(jsonStr)
   val operationType = jsonObj.getString("operation_type")
   "INSERT".equalsIgnoreCase(operationType)
})
.map(jsonStr => {
   val jsonObj = JSON.parseObject(jsonStr)
   val uid = jsonObj.getString("uid")
   // following count
   var followingCount = jsonObj.getLong("following_count")
   // follower count
   var followerCount = jsonObj.getLong("follower_count")
   // total likes received (total_favorited)
   var totalFavorited = jsonObj.getLong("total_favorited")
   // number of works posted (aweme_count)
   var awemeCount = jsonObj.getLong("aweme_count")
   if (followingCount == null) {
     followingCount = 0L
  }
   if (followerCount == null) {
     followerCount = 0L
  }
   if (totalFavorited == null) {
     totalFavorited = 0L
  }
   if (awemeCount == null) {
     awemeCount = 0L
  }

   new AuthorDetail(uid, followingCount, followerCount, totalFavorited, awemeCount)
   // author_detail(uid, followingCount, followerCount, totalFavorited, awemeCount)
})

val prefix = "author_detail"

// For each incoming record, look up the uid in Redis: if absent, store it in Redis and MySQL with all increments set to 0; if present, update MySQL and compute the increments against the first record kept in Redis
val value: DataStream[AuthorTotalData] = authorDetailDS.map(new RichMapFunction[AuthorDetail, AuthorTotalData] {
 var jedis: Jedis = null

 override def open(parameters: Configuration): Unit = {
   jedis = RedisUtil.getJedis()
   super.open(parameters)
}

 override def map(authorDetail: AuthorDetail): AuthorTotalData = {
   val key = prefix + authorDetail.getUid
   val redisValue = jedis.hget(key, "followingCount")
   if (redisValue == null) {
     jedis.hset(key, "followingCount", authorDetail.getFollowingCount.toString)
     jedis.hset(key, "followerCount", authorDetail.getFollowerCount.toString)
     jedis.hset(key, "totalFavorited", authorDetail.getTotalFavorited.toString)
     jedis.hset(key, "awemeCount", authorDetail.getAwemeCount.toString)
     // row to write to MySQL: first sighting of this uid, so all increments are 0
     new AuthorTotalData(authorDetail.getUid, authorDetail.getFollowingCount, authorDetail.getFollowerCount, authorDetail.getTotalFavorited, authorDetail.getAwemeCount, 0, 0, 0, 0)
     //author_detail_sink(authorDetail.uid, authorDetail.following_count, authorDetail.follower_count, authorDetail.total_favorited, authorDetail.aweme_count, 0L, 0L, 0L, 0L)
  } else {
     val oldFollowingCount = redisValue.toLong
     val oldFollowerCount = jedis.hget(key, "followerCount").toLong
     val oldTotalFavorited = jedis.hget(key, "totalFavorited").toLong
     val oldAwemeCount = jedis.hget(key, "awemeCount").toLong

     val followingCountIncrease = authorDetail.getFollowingCount - oldFollowingCount
     val followerCountIncrease = authorDetail.getFollowerCount - oldFollowerCount
     val totalFavoritedIncrease = authorDetail.getTotalFavorited - oldTotalFavorited
     val awemeCountIncrease = authorDetail.getAwemeCount - oldAwemeCount

     // row to write to MySQL: existing uid, increments computed against the Redis snapshot
     new AuthorTotalData(authorDetail.getUid, oldFollowingCount, oldFollowerCount, oldTotalFavorited, oldAwemeCount, followingCountIncrease, followerCountIncrease, totalFavoritedIncrease, awemeCountIncrease)
     //author_detail_sink(authorDetail.uid, oldFollowingCount, oldFollowerCount, oldTotalFavorited, oldAwemeCount, followingCountIncrease, followerCountIncrease, totalFavoritedIncrease, awemeCountIncrease)
  }
}


 override def close(): Unit = {
   jedis.close()
   super.close()
}
})
//value.addSink(new Myyshop_Dwd_Sink(prop))
//   println(value)
import org.apache.flink.table.api.SqlDialect


val blinkStreamSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tableEnv = StreamTableEnvironment.create(streamEnv, blinkStreamSettings)

//Register the resulting stream as a temporary view AuthorTotalData, which serves as the source table
tableEnv.createTemporaryView("AuthorTotalData", value)


val MYSQL_TABLE_SINK_DDL = "" +
 "CREATE TABLE `flilnk_author_total_data` (\n" +
 "   uid                      STRING,\n" +
 "   following_count          bigint,\n" +
 "   follower_count           bigint,\n" +
 "   total_favorited          bigint,\n" +
 "   aweme_count              bigint,\n" +
 "   following_count_increase bigint,\n" +
 "   follower_count_increase  bigint,\n" +
 "   total_favorited_increase bigint,\n" +
 "   aweme_count_increase     bigint, \n" +
 "   primary key (uid) NOT ENFORCED \n" +
 ") WITH (\n" +
 " 'connector.type' = 'jdbc', -- connector type\n" +
 " 'connector.url' = 'jdbc:mysql://ip:3306/nacos?useUnicode=true&characterEncoding=utf-8&useSSL=false', -- JDBC URL\n" +
 " 'connector.table' = 'author_total_data', -- table name\n" +
 " 'connector.driver' = 'com.mysql.jdbc.Driver', -- driver class; optional, derived automatically from the JDBC URL above \n" +
 " 'connector.username' = 'root', -- username\n" +
 " 'connector.password' = 'xxx', -- password\n" +
 //" 'connector.write.unique-key' = 'uid', -- key to upsert on\n" +
 " 'connector.write.max-retries' = '3', -- retry up to 3 times on write failure\n" +
 " 'connector.write.flush.max-rows' = '100', -- flush once this many rows are buffered \n" +
 " 'connector.write.flush.interval' = '1s' -- flush after this interval; whichever of the two thresholds is reached first triggers the write\n" +
 ")"
//Execute the Flink-to-MySQL DDL, mapping to the database table and using the built-in Flink SQL JDBC sink
tableEnv.executeSql(MYSQL_TABLE_SINK_DDL)

tableEnv.executeSql("insert into flilnk_author_total_data   " +
 " select " +
 "uid, " +
 "followingCount as following_count , " +
 "followerCount as follower_count," +
 "totalFavorited as total_favorited," +
 "awemeCount as aweme_count," +
 "followingCountIncrease as following_count_increase , " +
 "followerCountIncrease as follower_count_increase , " +
 "totalFavoritedIncrease as total_favorited_increase," +
 "awemeCountIncrease as aweme_count_increase " +
 " from " +
 " AuthorTotalData " +
 "group by uid,followingCount,followerCount,totalFavorited,awemeCount,followingCountIncrease,followerCountIncrease,totalFavoritedIncrease,awemeCountIncrease "
)
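// Note: every column in the SELECT list also appears in GROUP BY, so the query produces an
// updating (upsert) result and the MySQL primary key on uid resolves duplicate rows.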


streamEnv.execute(TASK_NAME)
}
}
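
For reference, on Flink 1.11+ the JDBC connector uses new option keys; the same sink DDL would look roughly like the sketch below (values are the same placeholders as in the job above, and with the new connector a declared primary key switches the sink into upsert mode):

CREATE TABLE flilnk_author_total_data (
  uid                      STRING,
  following_count          BIGINT,
  follower_count           BIGINT,
  total_favorited          BIGINT,
  aweme_count              BIGINT,
  following_count_increase BIGINT,
  follower_count_increase  BIGINT,
  total_favorited_increase BIGINT,
  aweme_count_increase     BIGINT,
  PRIMARY KEY (uid) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://ip:3306/nacos?useUnicode=true&characterEncoding=utf-8&useSSL=false',
  'table-name' = 'author_total_data',
  'username' = 'root',
  'password' = 'xxx',
  'sink.buffer-flush.max-rows' = '100',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3'
);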