3.2 SCRAM mechanisms
Salted Challenge Response Authentication Mechanism
与 PLAIN 机制类似,但在 zookeeper 中存储用户信息(默认在 /config/users
节点下),因此能够动态添加用户,对密码使用 sha 进行加密
broker 配置
创建 SCRAM 证书(设置用户名、密码)
export ZK_CONNECT="$(hostname):2181"
增加用户:
kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --add-config 'SCRAM-SHA-256=[password=*****],SCRAM-SHA-512=[password=*****]' --entity-type users --entity-name alice
kafka-configs.sh --bootstrap-server ${BOOTSTRAP_SERVER} --alter --add-config 'SCRAM-SHA-256=[password=*****],SCRAM-SHA-512=[password=*****]' --entity-type users --entity-name alice
描述用户:
kafka-configs.sh --zookeeper ${ZK_CONNECT} --describe --entity-type users --entity-name alice
删除用户:
kafka-configs.sh --zookeeper ${ZK_CONNECT} --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
kafka_server_jaas.conf
如果 broker 之间通过 SASL 通信,必须设置 broker 与其他 broker 进行连接的用户名和密码
增加 kafka_server_jaas.conf
文件:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin_pass";
};
运行 kafka 服务时,需要将 JAAS 配置文件位置作为 JVM 参数:
-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf
可以修改 bin/kafka-server-start.sh
,增加以下内容:
if [[ -f /usr/local/kafka/config/kafka_server_jaas.conf ]]; then
export KAFKA_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'
fi
server.properties
监听地址,这里同时使用 SASL_PLAINTEXT
与 SASL_SSL
,要注意使用 SASL_SSL
需要配置 SSL,与之前一致,这里不做说明
listeners=SASL_PLAINTEXT://:9094,SASL_SSL://:9095
advertised.listeners=SASL_PLAINTEXT://192.168.16.5:9094,SASL_SSL://180.76.140.179:9095
SASL 使用的机制:
sasl.enabled.mechanisms=SCRAM-SHA-256
# 这里也可以配置多个
# sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
broker 之间如果要使用 SASL 进行通信,需要设置:
# SASL_PLAINTEXT 或者 SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
# SCRAM-SHA-256 或者 SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
客户端配置
客户端连接时需要配置 kafka_client_jaas.conf
文件
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice_pass";
};
运行客户端时 kafka_client_jaas.conf
文件位置需要 作为 JVM 参数:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
可以执行以下命令设置环境变量 KAFKA_OPTS
:
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'
这个环境变量会被 kafka-run-class.sh
脚本读取并在运行 Java 时当作 JVM 参数传入
增加 client.properties
文件:
security.protocol=SASL_PLAINTEXT
## 如果使用 SASL_SSL,需要同时配置 SSL
# security.protocol=SASL_SSL
#
# ssl.truststore.location=/etc/kafka/client.truststore.jks
# ssl.truststore.password=kafka1234
# ssl.endpoint.identification.algorithm=
# ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
# ssl.protocol = TLSv1.2
sasl.mechanism=SCRAM-SHA-512
运行 kafka-console-consumer.sh
kafka-console-consumer.sh --bootstrap-server 192.168.16.5:9094 --topic __test --from-beginning --consumer.config client.properties
生产者同理:
kafka-console-producer.sh --bootstrap-server 192.168.16.5:9094 --topic __test --producer.config client.properties