Kafka命令行常用命令(2)
说明:
在扩大kafka集群时,我们需要满足两个要求:
- 将指定的topic转移到集群中添加的node上。
- 将指定的topicpartition转移到新的node上。
1. 将topic转移到新的node上
假设现在一个kafka集群运行三个broker,broker.id依次为101,102,103,随后由于业务数据的突然增加,需要增加三个broker,broker.id依次为104,105,106.目的是使push。-token-将topic转移到新的node上。
1、migration脚本-push-token-topic.json文件的内容如下:
- {
- "topics":
- [
- {
- "topic":"push-token-topic"
- }
- ],
- "version":1
- }
2、执行脚本如下所示:
- root@localhost:$./bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183--topics-to-move-json-filemigration-push-token-topic.json--broker-list"104,105,106"
json脚本的生成分配partitions 恢复备份使用:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[8]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":3,"replicas":[5]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":9,"replicas":[5]},{"topic":"cluster-switch-topic","partition":1,"replicas":[5]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[5]},{"topic":"cluster-switch-topic","partition":2,"replicas":[4]},{"topic":"cluster-switch-topic","partition":0,"replicas":[4]},{"topic":"cluster-switch-topic","partition":6,"replicas":[4]},{"topic":"cluster-switch-topic","partition":8,"replicas":[4]}]}
重新分配parttions的json脚本如下:
migration-topic-cluster-switch-topic.json
{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[5]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":3,"replicas":[4]},{"topic":"cluster-switch-topic","partition":9,"replicas":[4]},{"topic":"cluster-switch-topic","partition":1,"replicas":[4]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[4]},{"topic":"cluster-switch-topic","partition":2,"replicas":[5]},{"topic":"cluster-switch-topic","partition":0,"replicas":[5]},{"topic":"cluster-switch-topic","partition":6,"replicas":[5]},{"topic":"cluster-switch-topic","partition":8,"replicas":[5]}]}
3、执行:
- root@localhost:$bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183--reassignment-json-filemigration-topic-cluster-switch-topic.json--execute
一个json格式文件expand将在执行后生成。-cluster-reassignment.json
4、查询执行状态:
- bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183
不需要partion就可以在正常执行后回到当前的数据迁移状态,信息状态与下面相似。
- Reassignmentofpartition[push-token-topic,0]completedsuccessfully//成功移动
- Reassignmentofpartition[push-token-topic,1]isinprogress///代表数据在移动过程中的数据。
- Reassignmentofpartition[push-token-topic,2]isinprogress
- Reassignmentofpartition[push-token-topic,1]completedsuccessfully
- Reassignmentofpartition[push-token-topic,2]completedsuccessfully
这样做不会影响原来集群中的topic业务。
2.修改topic(replicats-factor)副本数量
假设push是初始的-token-topic是一个副本,为了提高可用性,需要改为两个副本。
replicass脚本-update-push-token-topic.json文件的内容如下:
{
"partitions":
[
{
"topic": "log.mobile_nginx",
"partition": 0,
"replicas": [101,102,104]
},
{
"topic": "log.mobile_nginx",
"partition": 1,
"replicas": [102,103,106]
}
],
"version":1
}
2、执行:
- root@localhost:$./bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2183
3、verify
- bin/kafka-reassign-partitions.sh--192.168.zookeper.225:2181
如下:
Status of partition reassignment: Reassignment of partition [log.mobile_nginx,0] completed successfully Reassignment of partition [log.mobile_nginx,1] completed successfully
自定义分区和转移
1、The first step is to hand craft the custom reassignment plan in a json file-
> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1""partition":0,"replicas":[5,6]},{"topic":"foo2""partition":1,"replicas":[2,3]}]}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1""partition":0,"replicas":[1,2]}, {"topic":"foo2""partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1""partition":0,"replicas":[5,6]}, {"topic":"foo2""partition":1,"replicas":[2,3]}] }
3、The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully
4.分区扩展用法topic
a.首先扩大分区数量,脚本如下:
比如:push-token-最初的topic分区数为12个,目前已增至15个。
root@localhost:$ ./bin/kafka-topics.sh --zookeeper 192.168.2.225:2183 --alter --partitions 15 --topic push-token-topic
b.设置topic分区副本
root@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183
--reassignment-json-file partitions-extension-push-token-topic.json --execute
脚本partitions-extension-push-token-topic.json文件的内容如下:
{
"partitions":
[
{
"topic": "push-token-topic",
"partition": 12,
"replicas": [101,102]
},
{
"topic": "push-token-topic",
"partition": 13,
"replicas": [103,104]
},
{
"topic": "push-token-topic",
"partition": 14,
"replicas": [105,106]
}
],
"version":1
}
本文仅代表作者观点,版权归原创者所有,如需转载请在文中注明来源及作者名字。
免责声明:本文系转载编辑文章,仅作分享之用。如分享内容、图片侵犯到您的版权或非授权发布,请及时与我们联系进行审核处理或删除,您可以发送材料至邮箱:service@tojoy.com