生产者参数配置
# 内存缓冲大小,单位 byte
buffer.memory=33554432
在 Producer 端用来存放尚未发送出去的 Message 的缓冲区大小,默认 32MB。
内存缓冲区内的消息以 batch 的形式组织,每个 batch 内包含单个分区的多条消息(一个 RecordBatch 对应一个 topic partition)。
如果多个 batch 同属于一个 broker,Producer 发送时会尽量把多个 batch 打包成一个 request 发送到 kafka broker 上。
# 内存缓冲区满后是否阻塞,`0.11.0.0` 版本之后废弃
block.on.buffer.full=false
内存缓冲区满了之后可以选择阻塞发送或抛出异常,由 block.on.buffer.full 的配置来决定(0.11.0.0 版本之后废弃)。
max.block.ms=60000
设置 Producer 的 send(), partitionsFor() 等方法的最多阻塞时长。
batch.size=16384
Producer 会把同一个 topic partition 的多个消息合并为一个 RecordBatch,batch.size 指明了合并后 batch 大小的上限。
如果这个值设置的太小,可能会导致所有的消息都不进行合并。
增加 batch.size 会增大吞吐量,但是同时也会增加延迟。
linger.ms=0
producer 合并的消息的大小未达到 batch.size,但如果存在时间达到 linger.ms,也会进行发送。
增加 linger.ms 可能会增加吞吐量,但同时也会增加延迟。
# 最大请求大小
max.request.size
Producer 发送生产请求的最大大小,同时也限制了单条消息的最大大小
# 压缩类型
compression.type=none
默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
acks=1
这个配置可以设定发送消息后是否需要 Broker 端返回确认,可以理解为需要确认的副本数:
- 0:表示 producer 请求发出后立即返回,不需要等待 leader 的任何确认
- 1:表示 producer 发出请求后,leader 需要将 producer 请求消息写入后向 producer 返回成功响应,之后 producer 请求才确认成功并返回
- -1/all:表示 producer 发出请求后,leader 需要将 producer 请求消息写入并等待所有 ISR 副本同步后,向 producer 返回成功响应,之后 producer 请求才确认成功并返回。当 ISR 副本少于
min.insync.replicas设置的值时,producer 在此情况会报NotEnoughReplicas异常。
从上到下的设置,可靠性依次增强,吞吐量依次降低,延迟依次增加。
# 发送失败重试次数
retries=2147483647
# 每次重试间隔时间
retries.backoff.ms=100
# 请求-响应超时时间,应该大于服务端的 replica.lag.time.max.ms
request.timeout.ms=30000
不丢失数据保证
block.on.buffer.full = true
生产者消息在实际发送之前是保留在 buffer 中,buffer 满之后生产等待,而不是抛出异常(0.11.0.0 之后被移除)
# default 60000
max.block.ms=Long.MAX_VALUE
持续阻塞(包含 buffer 满阻塞)
acks=all
所有 follower 都响应后才认为消息提交成功(需要注意 broker 的 min.insync.replicas 参数)
retries=Integer.MAX_VALUE
发送失败后持续重试(单独设置这个可能会造成消息重复发送)
max.in.flight.requests.per.connection=1
单个线程在单个连接上能够发送的未响应请求个数,这个参数设置为 1 可以避免消息乱序,同时可以保证在 retry 是不会重复发送消息,但是会降低 producer io 线程的吞吐量
unclean.leader.election.enable=false
关闭 unclean leader 选举,即不允许非 ISR 中的副本被选举为 leader
幂等性
开启幂等写配置:
enable.idempotence
示例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true"); // 开启幂等写
props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 all
KafkaProducer producer = new KafkaProducer(props);
kafka 可以通过设置 ack, retries 等参数保证消息不丢失,但是无法消息不重复(即 at least once)。
幂等性解决消息重复的问题,即多次发送同一条消息到 server 端,server 只会记录一次,之后重复发送的消息会被丢弃。
为了判断消息是否重复,Kafka 使用 producer_id + sequence_number 标记每条消息,由 topic partiton 所在 leader 进行判断并去重。
每个 producer 在初始化时,会向 server 端申请一个唯一的 producer_id。之后发送的每条消息,都会关联一个从 0 开始递增的 sequence_number,每个 topic partition 都会维护一个单独的 sequence_number。
因为每次 producer 初始化都会申请新的 producer_id,且 sequence_number 是分区维度的,所以只能保证单个会话,单个 partition 的幂等性,重复发送数据时 exactly once
事务性
生产者事务 id
transactional.id
设置事务 id 后,enable.idempotence 默认开启
kafka 对日志文件格式进行扩展,日志中分为 普通消息 和 控制消息(control batch),控制消息用来标记事务被成功提交还是被终止。
kafka 事务特性主要用于 2 种场景:
- 将多条消息的发送动作封装在一个事务中,形成原子操作,多条消息要么都发送成功,要么都发送失败。
- consume-transform-produce loop,将 消费消息-处理消息-发送消息 封装在一个事务中,形成原子操作。常见于流式处理应用,从一个上游接收消息,经过处理后发送给下游。
kafka 事务的实现原理是把全部消息都追加到分区日志中,并将未完成事务的消息标记为未提交。一旦事务提交,这些标记就会被改为已提交。
相关配置
# 事务超时时间,默认 1 分钟
# transaction coordinator 在主动终止正在进行的事务前,等待来自生产者的事务状态更新的最长时间
# 不能大于服务端的 transaction.max.timeout.ms 设置
transaction.timeout.ms=60000
场景 1
示例代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "ProducerTranscationnalExample");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
// 获取 PID
// 增加 PID 的 epoch
// 回滚这个 transactionId 之前的实例留下的未完成的事务
producer.initTransactions();
try {
// 开始一个事务
// 只会记录在 producer 本地状态,transaction coordinator 不会感知这个操作
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", "msg test"));
producer.send(new ProducerRecord(topic, "1", "msg test"));
producer.send(new ProducerRecord(topic, "2", "msg test"));
producer.commitTransaction();
} catch (ProducerFencedException e1) {
// 已经有另一个活跃的 producer 在使用相同的 transactionId 了
e1.printStackTrace();
} catch (KafkaException e2) {
e2.printStackTrace();
producer.abortTransaction();
} finally {
producer.close();
}
- 原子性:事务保证多个写操作要么全部成功,要么全部失败
- 僵死进程:开启事务的 producer 会向 Transaction 申请一个 producer id,transaction id 与 producer id 一一对应,每个 producer id 对应一个 epoch,当新的 producer 使用相同的 transaction id 开启事务后,会获得相同的 producer id 和更高的 epoch,此时服务端可以根据 epoch 区分新旧 producer,旧的 producer 将不能写入消息。
场景 2
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("transactional.id", "test-transactional");
producerProps.put("acks", "all");
KafkaProducer producer = new KafkaProducer(producerProps);
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
consumerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
consumerProps.put("group.id", groupId);
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer consumer = new KafkaConsumer(consumerProps);
consumer.subscribe(Collections.singleton("source_topic"));
producer.initTransactions();
try {
ConsumerRecords records = consumer.poll();
if (!records.isEmpty()) {
producer.beginTransaction();
// 处理消息
List<ProducerRecord> outputRecords = processRecords(records)
for (ProducerRecord ouputRecord : outputRecords) {
producer.send(outputRecord)
}
// 由 producer 提交 offset
producer.sendOffsetsToTransaction();
// 完成一个批次的 消费-处理-生产,提交事务
producer.commitTransaction();
}
} catch (ProducerFencedException e1) {
e1.printStackTrace();
producer.close();
} catch (KafkaException e2) {
e2.printStackTrace();
producer.abortTransaction();
}
producer.close();
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
参考
- kafka 生产请求协议:https://kafka.apache.org/protocol.html#The_Messages_Produce
- kafka 消息格式:https://kafka.apache.org/documentation/#messageformat