如何重新分布kafka分区、增加分区副本数

放弃不难,但坚持很酷~

前言: 前几天,我通过 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具,完成了对 topic 分区副本数的增加。其实 kafka-reassign-partitions.sh 不仅可以实现分区副本数的增加,它还可以实现对 topic 分区的分配。 所以对于 topic 分区分配以及分区副本数的增加,本篇小文都会讲到,图文实操,讲解详细,看完别忘了点赞哦!

Kafka:2.11-1.1.0

一、准备工作

1、创建一个 8 分区、1 副本的 topic:

已知,Kafka 集群中有两个 kafka broker ,id 分别为 200、201 。

1
2
3
4
5
# 创建名为create17的topic
./bin/kafka-topics.sh --create --zookeeper cdh-worker-1:2181/kafka --replication-factor 1 --partitions 8 --topic create17

# 查看名为create17的topic详情:分区数、副本数、Isr等
./bin/kafka-topics.sh --describe --zookeeper cdh-worker-1:2181/kafka --topic create17

二、分区分配

已知,Kafka 集群中有两个 kafka broker ,id 分别为 200、201 ,现在再加一个 broker 节点,id 为 202 。Kafka 只会负载均衡新创建的 topic 分区。所以现在,我们需要将已存在的 create17 topic 的 8 个分区均匀分布在 3 个 broker 节点上,以便实现尽可能的负载均衡,提高写入和消费速度。

Kafka 不会对已存在的分区进行均衡分配,所以需要我们手动执行分区分配操作。

1、声明要分配分区的 topic 列表

Kafka 的 bin 目录中有一个 kafka-reassign-partitions.sh 脚本工具,我们可以通过它分配分区。在此之前,我们需要先按照要求定义一个文件,里面说明哪些 topic 需要分配分区。文件内容如下:

1
2
3
4
5
6
7
8
9
> cat topic-generate.json
{
"topics": [
  {
    "topic": "create17"
  }
],
"version": 1
}

2、通过 --topics-to-move-json-file 参数,生成分区分配策略 --generate

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --topics-to-move-json-file topic-generate.json --broker-list "200,201,202" --generate
  • **–broker-list:值为要分配的 kafka broker id,以逗号分隔,该参数必不可少。**脚本会根据你的 topic-generate.json 文件,获取 topic 列表,为这些 topic 生成分布在 broker list 上面的分区分配策略。

输出结果中有你当前的分区分配策略,也有 Kafka 期望的分配策略,在期望的分区分配策略里,kafka 已经尽可能的为你分配均衡。

我们先将 Current partition replica assignment 的内容备份,以便回滚到原来的分区分配状态。

然后将 Proposed partition reassignment configuration 的内容拷贝到一个新的文件中(文件名称、格式任意,但要保证内容为json格式)。

1
2
3
> cat partition-replica-reassignment.json
{"version":1,"partitions":[
{"topic":"create17","partition":2,"replicas":[200],"log_dirs":["any"]},{"topic":"create17","partition":7,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":4,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":1,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":6,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":3,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":0,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":5,"replicas":[200],"log_dirs":["any"]}]}

3、通过 --reassignment-json-file 参数,执行分区分配策略 --execute

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --execute

4、通过 --reassignment-json-file 参数,检查分区分配进度 --verify

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --verify

我们再来看一下 topic : create17 的详细信息,broker 200 上有两个分区、broker 201、202 上分别有三个分区:

三、增大分区副本数

1、增加各 partition 所属的 replicas broker id

修改 partition-replica-reassignment.json 文件,增加各 partition 所属的 replicas broker id 。

1
2
{"version":1,"partitions":[
{"topic":"create17","partition":6,"replicas":[201,200,202]},{"topic":"create17","partition":2,"replicas":[201,200,202]},{"topic":"create17","partition":4,"replicas":[201,200,202]},{"topic":"create17","partition":5,"replicas":[201,200,202]},{"topic":"create17","partition":0,"replicas":[201,200,202]},{"topic":"create17","partition":3,"replicas":[201,200,202]},{"topic":"create17","partition":1,"replicas":[201,200,202]},{"topic":"create17","partition":7,"replicas":[201,200,202]}]}

2、通过 --reassignment-json-file 参数,执行分区副本分配策略 --execute

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --execute

然后,查看一下 topic:create17 ,发现:Replicas 列表正如上述 partition-replica-reassignment.json 描述的一样。

** 在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也均分在不同的broker上。**
每个partitiion的所有replicas叫做"assigned replicas",“assigned replicas"中的第一个replicas叫"preferred replica”,一般刚创建的topic"preferred replica"是leader。leader replica负责所有的读写。
但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。

但根据上述图片示例,很明显,Replicas 列表分配不均。我们可以使用 --generate 参数再生成一份分配相对均匀的分区副本策略,这样就不用我们自己排列组合了。

最后得到的分区副本策略是这样的:

1
2
{"version":1,"partitions":[
{"topic":"create17","partition":2,"replicas":[200,202,201],"log_dirs":["any","any","any"]},{"topic":"create17","partition":7,"replicas":[202,201,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":4,"replicas":[202,200,201],"log_dirs":["any","any","any"]},{"topic":"create17","partition":1,"replicas":[202,201,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":6,"replicas":[201,200,202],"log_dirs":["any","any","any"]},{"topic":"create17","partition":3,"replicas":[201,202,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":0,"replicas":[201,200,202],"log_dirs":["any","any","any"]},{"topic":"create17","partition":5,"replicas":[200,201,202],"log_dirs":["any","any","any"]}]}

再执行一下 --execute 操作,最后的 topic:create17 的详细信息为:

这样的话,假如 broker 202 挂了的话,分区1、7 的 Leader replica 会变为 201;分区4的 Leader replica 会变为 200 ,比上面咱们手动生成的策略要好。

四、小结

1、本文介绍了使用 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具完成对 topic 的分区分配、分区副本增加操作。该脚本有三个参数:

  • –generate:配合着 --topics-to-move-json-file 可以生成分区分配策略,该参数适用于分区多的情况。
  • –execute:配合着 --reassignment-json-file 可以执行分区分配策略。
  • –verify:配合着 --reassignment-json-file 可以检查分区分配进度。

通过以上命令,是既可以分配分区,也可以增加分区副本数,非常方便。

2、也简单介绍了 kafka preferred replica ,它是 “assigned replicas” 中的第一个 replica 。当 kafka leader replica 挂掉的话, partition 会选择 “preferred replica” 做为 leader replica 。

抛一个小疑问,kafka leader replica 如果不挂掉的话,如何选择某 replica 为指定 leader 呢?我们下一篇答案揭晓。

‍‍‍‍‍‍‍‍‍‍好了,本篇 kafka 实操就到这里结束啦,感觉有帮助的朋友,希望能点个赞哦!