跳转至

kafka 基础操作

预设环境变量

预设置环境变量,方便操作:

# 将 kafka 命令脚本路径加入到 PATH
export KAFKA_HOME=/usr/local/kafka
export PATH="$PATH:${KAFKA_HOME}/bin"

# zk 连接地址
export ZK_CONNECT="$(hostname):2181"

# kafka 连接地址
export BOOTSTRAP_SERVER="$(hostname):9092"

# 如果有 jaas 认证
export KAFKA_OPTS="-Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka_server_jaas.conf"

# 如果 broker 通过在 kafka-run-class.sh 文件内设置 JMX_PORT,则这里需要设置成不同的 port
# (一般 broker 开启 JMX_PORT 最好在 kafka-server-start.sh 文件内设置,kafka-run-class.sh 文件内的修改会影响到所有命令脚本)
# export JMX_PORT=9997

基本操作

topic

创建 topic

kafka-topics.sh --zookeeper ${ZK_CONNECT} --create --replication-factor 3 --partitions 3 --topic __test

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --create --replication-factor 3 --partitions 3 --topic __test

删除 topic

kafka-topics.sh --zookeeper ${ZK_CONNECT} --delete --topic __test

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --delete --topic __test

topic 列表

kafka-topics.sh --zookeeper ${ZK_CONNECT} --list

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --list

topic 详情

kafka-topics.sh --zookeeper ${ZK_CONNECT} --describe --topic test

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --topic __test

修改 topic 分区数

kafka-topics.sh --zookeeper ${ZK_CONNECT} --alter --topic __test --partitions 5

kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --alter --topic __test --partitions 5

生产/消费

生产 消息

kafka-console-producer.sh --broker-list ${BOOTSTRAP_SERVER} --topic __test

kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic __test

消费 消息

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic __test --from-beginning

kafka-console-consumer.sh --property print.timestamp=true --property print.key=true \
    --bootstrap-server ${BOOTSTRAP_SERVER} --group test_group --topic test --from-beginning

consumer

consumer 列表

# 记录在 zookeeper 中的消费组(2.x.x 版本以上废弃)
kafka-consumer-groups.sh --zookeeper ${ZK_CONNECT} --list

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 <= 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --list --new-consumer

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 > 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --list

consumer 详情

# 记录在 zookeeper 中的消费组(2.x.x 版本以上废弃)
kafka-consumer-groups.sh --zookeeper ${ZK_CONNECT} --describe --group $group

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 <= 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER}  --new-consumer --describe --group $group

# 记录在 __consumer_offsets 中的消费组,Kafka 版本 > 0.9.x.x
kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group $group

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group my-group --members

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group my-group --members --verbose

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --group my-group --state

消费者选项

  • 使用 zookeeper 保存消费组数据(2.x.x 版本以上废弃): --zookeeper localhost:2181
  • 使用 __consumer_offsets 保存消费组数据: --bootstrap-server localhost:9092
  • 指定 group 名:
    • --group group1
    • --consumer-property group.id=group1
  • 指定 topic:
    • --topic foo
    • --whitelist ".*"
  • 指定 partition:
    • --partition 0
  • 指定 offset:
    • --from-beginning
    • --offset 3418783
  • 消费多少条消息:
    • --max-messages 10

ZK Group(2.x.x 版本以上废弃)

kafka-console-consumer.sh --zookeeper ${ZK_CONNECT} --topic test --from-beginning --group group1

kafka-console-consumer.sh --zookeeper ${ZK_CONNECT} --topic test --consumer-property group.id=group1

KF Group

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic test

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --whitelist ".*"

property

kafka-console-consumer.sh --property print.timestamp=true --property print.key=true --bootstrap-server ${BOOTSTRAP_SERVER} --topic __test --from-beginning

从指定 partition, offset 开始消费

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic logs --partition 0 --offset 3418783

从指定 partition, offset 开始消费指定数量消息:

kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic logs --partition 7 --offset 1340190464 --max-messages 10

broker 参数(动态)

查看参数:

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --describe

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --entity-name 0 --describe

设置副本同步的限流参数:

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --entity-name 0 \
    --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024"

删除副本同步限流参数:

kafka-configs.sh --zookeeper ${ZK_CONNECT} -entity-type brokers  --entity-name 0 \
    --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate'

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} -entity-type brokers --entity-name 0 \
    --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate'

设置集群级别的参数

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --entity-default \
    --alter --add-config 'log.cleaner.threads=2'

查看集群级别的参数

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type brokers --entity-default --describe

topic 参数

查看参数:

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type topics --describe

kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --entity-type topics --entity-name __test --describe

修改消息大小限制:

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __test \
    --alter --add-config max.message.bytes=4194304

修改消息保留时长:

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __test \
    --alter --add-config retention.ms=259200000

修改 __consumer_offsets 保留策略:

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __consumer_offsets --describe

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __consumer_offsets \
    --alter --delete-config cleanup.policy

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __consumer_offsets \
    --alter --add-config retention.ms=2592000000
$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __consumer_offsets \
    --alter --add-config cleanup.policy=delete

$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __consumer_offsets \
    --alter --delete-config retention.ms
$ kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name __consumer_offsets \
    --alter --add-config cleanup.policy=compact

设置副本同步流量限制:

kafka-configs.sh --zookeeper ${ZK_CONNECT} --entity-type topics --entity-name test-throttled \
    --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"

删除副本同步流量限制:

kafka-configs.sh --zookeeper ${ZK_CONNECT} -entity-type topics --entity-name test-throttled \
    --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas'

选举 leader

触发集群内所有 topic partition 的最优 leader 选举:

$ kafka-preferred-replica-election.sh --zookeeper ${ZK_CONNECT}

触发 partitions.json 文件指定的 topic partition 的最优 leader 选举:

kafka-preferred-replica-election.sh --zookeeper ${ZK_CONNECT} --path-to-json-file partitions.json

partitions.json 文件内容如下:

{
    "partitions": [
        {
            "partition": 1,
            "topic": "__test"
        }
    ]
}

从 2.4.0 版本开始,推荐使用 kafka-leader-election.sh 触发选举

可以通过 --topic, --partition 参数指定 topic, partition,通过 --election-type 指定选举类型为 preferred/unclean

$ kafka-leader-election.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic <topic> --partition <partition> --election-type preferred

也可以通过 --path-to-json-file 指定文件包含的 topic partition 的最优 leader 选举

$ kafka-leader-election.sh --bootstrap-server ${BOOTSTRAP_SERVER} --path-to-json-file partitions.json --election-type preferred

删除消费组

ZK 类型

$ kafka-consumer-groups.sh --zookeeper ${ZK_CONNECT} --delete --group console-consumer-38645

KF 类型

$ kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --delete --group console-consumer-97214

查看 topic offset

最终的 offset

$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${BOOTSTRAP_SERVER} --time -1 --topic test

最早的 offset

$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ${BOOTSTRAP_SERVER} --time -2 --topic test

设置 consumer current offset

重置 offset 到最新位置:

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER}  --reset-offsets --to-latest --group $group --topic $topic --execute

设置到指定的 offset:

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --group $group --reset-offsets --to-offset 6250 --topic $topic --execute

根据时间设置,设置到大于等于该时间的第一个 offset:

kafka-consumer-groups.sh --bootstrap-server ${BOOTSTRAP_SERVER} --group $group --reset-offsets --to-datetime 2019-12-12T16:59:59.000 --topic $topic --execute

读取 __consumer_offsets

0.11.0.0 之前版本

$ kafka-console-consumer.sh --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --zookeeper ${ZK_CONNECT} --topic __consumer_offsets

0.11.0.0 之后版本(含)

$ kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server ${BOOTSTRAP_SERVER} --topic __consumer_offsets

格式:

[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

分区规则:

Math.abs(groupID.hashCode()) % numPartitions

查看日志/索引文件

查看日志文件

$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000283198.log --print-data-log

查看索引文件

$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 0000000000000045.timeindex

查看请求使用的 API Version

$ kafka-broker-api-versions.sh  --bootstrap-server ${BOOTSTRAP_SERVER}

查看副本同步 lag

kafka-replica-verification.sh --broker-list ${BOOTSTRAP_SERVER}

kafka-replica-verification.sh --broker-list ${BOOTSTRAP_SERVER} --topic-white-list .*

获取 broker topic 分区实际目录分布

kafka-log-dirs.sh --bootstrap-server localhost:9092  --describe [--broker-list "0,1,2"] [--topic-list "t1,t2"]