跳转至

kafka 基础操作

预设环境变量

预设置环境变量,方便操作:

# 将 kafka 命令脚本路径加入到 PATH
export KAFKA_HOME=/usr/local/kafka
export PATH="$PATH:${KAFKA_HOME}/bin"

# zk 连接地址
export ZK_CONNECT="$(hostname):2181"

# kafka 连接地址
export BOOTSTRAP_SERVER="$(hostname):9092"

# 如果有 jaas 认证
export KAFKA_OPTS="-Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka_server_jaas.conf"

# 如果 broker 通过在 kafka-run-class.sh 文件内设置 JMX_PORT,则这里需要设置成不同的 port
# (一般 broker 开启 JMX_PORT 最好在 kafka-server-start.sh 文件内设置,kafka-run-class.sh 文件内的修改会影响到所有命令脚本)
# export JMX_PORT=9997

基本操作

topic

创建 topic

kafka-topics.sh --zookeeper ${ZK_CONNECT} --create --replication-factor 3 --partitions 3 --topic __test

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --create --replication-factor 3 --partitions 3 --topic __test

删除 topic

kafka-topics.sh --zookeeper ${ZK_CONNECT} --delete --topic __test

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --delete --topic __test

topic 列表

kafka-topics.sh --zookeeper ${ZK_CONNECT} --list

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --list

topic 详情

kafka-topics.sh --zookeeper ${ZK_CONNECT} --describe --topic test

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --topic __test

修改 topic 分区数

kafka-topics.sh --zookeeper ${ZK_CONNECT} --alter --topic __test --partitions 5

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --alter --topic __test --partitions 5

生产/消费

生产 消息

kafka-console-producer.sh --broker-list ${BOOTSTRAP_SERVER} --topic __test

kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic __test

消费 消息

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic __test --from-beginning

kafka-console-consumer.sh --property print.timestamp=true --property print.key=true \
    --bootstrap-server ${BOOTSTRAP_SERVER} --group test_group --topic test --from-beginning

consumer

consumer 列表

# 记录在 zookeeper 中的消费组(2.x.x 版本以上废弃)
kafka-consumer-groups.sh --zookeeper ${ZK_CONNECT} --list

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 <= 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --list --new-consumer

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 > 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --list

consumer 详情

# 记录在 zookeeper 中的消费组(2.x.x 版本以上废弃)
kafka-consumer-groups.sh --zookeeper ${ZK_CONNECT} --describe --group $group

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 <= 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER}  --new-consumer --describe --group $group

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 > 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group $group

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group my-group --members

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group my-group --members --verbose

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group my-group --state

消费者选项

  • 使用 zookeeper 保存消费组数据(2.x.x 版本以上废弃): --zookeeper localhost:2181
  • 使用 __consumer_offsets 保存消费组数据: --bootstrap-server localhost:9092
  • 指定 group 名:
    • --group group1
    • --consumer-property group.id=group1
  • 指定 topic:
    • --topic foo
    • --whitelist ".*"
  • 指定 partition:
    • --partition 0
  • 指定 offset:
    • --from-beginning
    • --offset 3418783
  • 消费多少条消息:
    • --max-messages 10

ZK Group(2.x.x 版本以上废弃)

kafka-console-consumer.sh --zookeeper ${ZK_CONNECT} --topic test --from-beginning --group group1

kafka-console-consumer.sh --zookeeper ${ZK_CONNECT} --topic test --consumer-property group.id=group1

KF Group

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic test

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --whitelist ".*"

property

kafka-console-consumer.sh --property print.timestamp=true --property print.key=true --bootstrap-server ${BOOTSTRAP_SERVER} --topic __test --from-beginning

从指定 partition, offset 开始消费

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic logs --partition 0 --offset 3418783

从指定 partition, offset 开始消费指定数量消息:

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic logs --partition 7 --offset 1340190464 --max-messages 10

broker 参数(动态)

查看参数:

kafka-configs.sh --describe --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers

设置副本同步的限流参数:

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2

删除副本同步限流参数:

kafka-configs.sh --zookeeper ${ZK_CONNECT} -entity-type brokers --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0

topic 参数

查看参数:

kafka-configs.sh --describe --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type topics

修改消息保留大小:

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __test --add-config max.message.bytes=4194304

修改消息保留时长:

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __test --add-config retention.ms=259200000

修改 __consumer_offsets 保留策略:

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --describe --entity-type topics --entity-name __consumer_offsets

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __consumer_offsets --delete-config cleanup.policy

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __consumer_offsets --add-config retention.ms=2592000000
$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __consumer_offsets --add-config cleanup.policy=delete

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __consumer_offsets --delete-config retention.ms
$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --entity-type topics --entity-name __consumer_offsets --add-config cleanup.policy=compact

设置副本同步流量限制:

kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"

删除副本同步流量限制:

kafka-configs.sh --zookeeper ${ZK_CONNECT} -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled

重新平衡 leader

触发集群内所有 topic partition 的最优 leader 选举:

$ kafka-preferred-replica-election.sh --zookeeper ${ZK_CONNECT}

触发 partitions.json 文件指定的 topic partition 的最优 leader 选举:

kafka-preferred-replica-election.sh --zookeeper ${ZK_CONNECT} --path-to-json-file partitions.json

partitions.json 文件内容如下:

{
    "partitions": [
        {
            "partition": 45,
            "topic": "a8a3cbf02a7e4aa7b1b52ab5297f9066__sku2ava_rec"
        }
    ]
}

删除消费组

KF 类型

$ kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --delete --group console-consumer-97214

ZK 类型

$ kafka-consumer-groups.sh --zookeeper ${ZK_CONNECT} --delete --group console-consumer-38645

查看 topic offset

最终的 offset

$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${BOOTSTRAP_SERVER} --time -1 --topic test

最早的 offset

$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${BOOTSTRAP_SERVER} --time -2 --topic test

设置 consumer current offset

重置 offset 到最新位置:

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER}  --reset-offsets --to-latest --group $group --topic $topic --execute

设置到指定的 offset:

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --group $group --reset-offsets --to-offset 6250 --topic $topic --execute

根据时间设置,设置到大于等于该时间的第一个 offset:

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --group $group --reset-offsets --to-datetime 2019-12-12T16:59:59.000 --topic $topic --execute

读取 __consumer_offsets

0.11.0.0 之前版本

$ kafka-console-consumer.sh --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --zookeeper ${ZK_CONNECT} --topic __consumer_offsets

0.11.0.0 之后版本(含)

$ kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server ${BOOTSTRAP_SERVER} --topic __consumer_offsets

格式:

[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

分区规则:

Math.abs(groupID.hashCode()) % numPartitions

查看日志/索引文件

查看日志文件

$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000283198.log --print-data-log

查看索引文件

$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 0000000000000045.timeindex

查看请求使用的 API Version

$ kafka-broker-api-versions.sh  --bootstrap-server ${BOOTSTRAP_SERVER}

查看副本同步 lag

kafka-replica-verification.sh --broker-list ${BOOTSTRAP_SERVER}

kafka-replica-verification.sh --broker-list ${BOOTSTRAP_SERVER} --topic-white-list .*

获取 broker topic 分区实际目录分布

kafka-log-dirs.sh --bootstrap-server localhost:9092  --describe [--broker-list "0,1,2"] [--topic-list "t1,t2"]
Back to top