Kafka命令行常用命令(2)

2023-05-22


说明:


在扩大kafka集群时,我们需要满足两个要求:


  1. 将指定的topic转移到集群中添加的node上。
  2. 将指定的topicpartition转移到新的node上。

1. 将topic转移到新的node上


假设现在一个kafka集群运行三个broker,broker.id依次为101,102,103,随后由于业务数据的突然增加,需要增加三个broker,broker.id依次为104,105,106.目的是使push。-token-将topic转移到新的node上。


1、migration脚本-push-token-topic.json文件的内容如下:










  1. {
  2. "topics":
  3. [
  4. {
  5. "topic":"push-token-topic"
  6. }
  7. ],
  8. "version":1
  9. }





2、执行脚本如下所示:










  1. root@localhost:$./bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183--topics-to-move-json-filemigration-push-token-topic.json--broker-list"104,105,106"





json脚本的生成分配partitions 恢复备份使用:


Current partition replica assignment


{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[8]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":3,"replicas":[5]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":9,"replicas":[5]},{"topic":"cluster-switch-topic","partition":1,"replicas":[5]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[5]},{"topic":"cluster-switch-topic","partition":2,"replicas":[4]},{"topic":"cluster-switch-topic","partition":0,"replicas":[4]},{"topic":"cluster-switch-topic","partition":6,"replicas":[4]},{"topic":"cluster-switch-topic","partition":8,"replicas":[4]}]}


重新分配parttions的json脚本如下:


migration-topic-cluster-switch-topic.json


{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[5]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":3,"replicas":[4]},{"topic":"cluster-switch-topic","partition":9,"replicas":[4]},{"topic":"cluster-switch-topic","partition":1,"replicas":[4]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[4]},{"topic":"cluster-switch-topic","partition":2,"replicas":[5]},{"topic":"cluster-switch-topic","partition":0,"replicas":[5]},{"topic":"cluster-switch-topic","partition":6,"replicas":[5]},{"topic":"cluster-switch-topic","partition":8,"replicas":[5]}]}


3、执行:










  1. root@localhost:$bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183--reassignment-json-filemigration-topic-cluster-switch-topic.json--execute





一个json格式文件expand将在执行后生成。-cluster-reassignment.json




4、查询执行状态:








  1. bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183



不需要partion就可以在正常执行后回到当前的数据迁移状态,信息状态与下面相似。










  1. Reassignmentofpartition[push-token-topic,0]completedsuccessfully//成功移动
  2. Reassignmentofpartition[push-token-topic,1]isinprogress///代表数据在移动过程中的数据。
  3. Reassignmentofpartition[push-token-topic,2]isinprogress
  4. Reassignmentofpartition[push-token-topic,1]completedsuccessfully
  5. Reassignmentofpartition[push-token-topic,2]completedsuccessfully





这样做不会影响原来集群中的topic业务。


2.修改topic(replicats-factor)副本数量


假设push是初始的-token-topic是一个副本,为了提高可用性,需要改为两个副本。


replicass脚本-update-push-token-topic.json文件的内容如下:


{


"partitions":


[


{


"topic": "log.mobile_nginx",


"partition": 0,


"replicas": [101,102,104]


},


{


"topic": "log.mobile_nginx",


"partition": 1,


"replicas": [102,103,106]


}


],


"version":1


}


2、执行:










  1. root@localhost:$./bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183



3、verify








  1. bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2181

如下:




Status of partition reassignment: Reassignment of partition [log.mobile_nginx,0] completed successfully Reassignment of partition [log.mobile_nginx,1] completed successfully




自定义分区和转移


1、The first step is to hand craft the custom reassignment plan in a json file-






> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1""partition":0,"replicas":[5,6]},{"topic":"foo2""partition":1,"replicas":[2,3]}]}




> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1""partition":0,"replicas":[1,2]}, {"topic":"foo2""partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1""partition":0,"replicas":[5,6]}, {"topic":"foo2""partition":1,"replicas":[2,3]}] }






3、The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option






bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully






4.分区扩展用法topic


a.首先扩大分区数量,脚本如下:


比如:push-token-最初的topic分区数为12个,目前已增至15个。


root@localhost:$ ./bin/kafka-topics.sh --zookeeper 192.168.2.225:2183 --alter --partitions 15 --topic push-token-topic


b.设置topic分区副本


root@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183


--reassignment-json-file partitions-extension-push-token-topic.json --execute


脚本partitions-extension-push-token-topic.json文件的内容如下:


{


"partitions":


[


{


"topic": "push-token-topic",


"partition": 12,


"replicas": [101,102]


},


{


"topic": "push-token-topic",


"partition": 13,


"replicas": [103,104]


},


{


"topic": "push-token-topic",


"partition": 14,


"replicas": [105,106]


}


],


"version":1


}


本文仅代表作者观点,版权归原创者所有,如需转载请在文中注明来源及作者名字。

免责声明:本文系转载编辑文章,仅作分享之用。如分享内容、图片侵犯到您的版权或非授权发布,请及时与我们联系进行审核处理或删除,您可以发送材料至邮箱:service@tojoy.com