Control mechanisms

  • Channel encryption: Encryption (SSL)
  • Authentication (SSL or SASL): controls the connection between client and broker
    • SSL: clients authenticate to Kafka using SSL certificates
    • SASL (SASL_PLAINTEXT or SASL_SSL): Kafka supports several SASL mechanisms:
      • GSSAPI (Kerberos), since 0.9.0.0
      • PLAIN, since 0.10.0.0: clients authenticate using username/password
      • SCRAM-SHA-256 and SCRAM-SHA-512, since 0.10.2.0
      • OAUTHBEARER, since 2.0
  • Authorisation (ACL): controls read/write access of hosts/producers/consumers to topics

Kafka provides the following 4 security.protocol options:

  • PLAINTEXT
  • SSL
  • SASL_PLAINTEXT
  • SASL_SSL
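
A broker can expose more than one of these protocols at once, on different ports. A hypothetical sketch (hostnames and ports are placeholders):

```
listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://broker1.internal:9092,SASL_SSL://broker1.example.com:9093
```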

PLAINTEXT

SSL

Set up a per-node certificate truststore/keystore for brokers & clients.

Broker-side                              Client-side
listeners=SSL://127.0.0.1:6667
security.inter.broker.protocol=SSL       security.protocol=SSL

SASL_PLAINTEXT

Broker-side                                                              Client-side
listeners=SASL_PLAINTEXT://127.0.0.1:6667
security.inter.broker.protocol=SASL_PLAINTEXT                            security.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN | GSSAPI | SCRAM-SHA-256 | SCRAM-SHA-512   sasl.mechanism=PLAIN | GSSAPI | SCRAM-SHA-256 | SCRAM-SHA-512

SASL_SSL

Broker-side                                                              Client-side
listeners=SASL_SSL://127.0.0.1:6667
security.inter.broker.protocol=SASL_SSL                                  security.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN | GSSAPI | SCRAM-SHA-256 | SCRAM-SHA-512   sasl.mechanism=PLAIN | GSSAPI | SCRAM-SHA-256 | SCRAM-SHA-512
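
With SASL_SSL the client combines its SASL credentials with a truststore holding the CA that signed the broker certificate. A hypothetical client properties sketch (paths and passwords are placeholders):

```
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/etc/kafka/conf/kafka.client.truststore.jks
ssl.truststore.password=changeit
```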

Problems solved

  • Currently, any client can access your Kafka cluster (authentication)
  • Clients can publish / consume any topic's data (authorisation)
  • All the data being sent is fully visible on the network (encryption)

Authentication

SASL/PLAIN Authentication

1. Server side

Edit config/server.properties:

# Authentication settings
## Multiple listeners with different protocols/ports can be configured
listeners=SASL_PLAINTEXT://ip:port
#listeners=PLAINTEXT://ip:port,SASL_PLAINTEXT://ip:port
## Protocol used between brokers; defaults to PLAINTEXT, valid values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# ACL settings
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
## By default, a resource with no ACLs configured can only be accessed by super users; setting this to true lets everyone access such resources
allow.everyone.if.no.acl.found=true
## Multiple entries are allowed, separated by `;`
super.users=User:admin;User:alice

Create a config/kafka_server_jaas.conf file:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin_pass"
    user_admin="admin_pass"
    user_alice="alice_pass";
};
  • username and password are the credentials this broker uses when connecting to other brokers
  • Entries such as user_admin="admin_pass" follow the pattern user_<username>="<password>". Here two users are defined, admin and alice, with passwords admin_pass and alice_pass; the broker uses this list to verify client connections, including connections from other brokers

When starting the Kafka service, pass the JAAS config file location as a JVM parameter:

-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf

You can edit bin/kafka-server-start.sh and add the following:

if [[ -f /usr/local/kafka/config/kafka_server_jaas.conf ]]; then
  export KAFKA_OPTS='-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf'
fi

2. Client side

Clients need a kafka_client_jaas.conf file to connect:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice_pass";
};

Then add the configuration in your code:

// Set the system property pointing at the JAAS config file,
// or pass the JVM flag `-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf`
System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_client_jaas.conf");

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

For the command-line tools, add the java.security.auth.login.config JVM parameter, e.g. by editing bin/kafka-console-consumer.sh:

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"

Add a consumer.properties config file:

group.id=__test
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
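
Since Kafka 0.10.2, the client-side JAAS section can also be embedded inline via the sasl.jaas.config property, which avoids the separate JAAS file and the java.security.auth.login.config flag. A sketch using the same hypothetical alice credentials:

```
group.id=__test
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice_pass";
```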

Run kafka-console-consumer.sh:

$ kafka-console-consumer.sh --bootstrap-server $(hostname):9092 --topic foo --from-beginning --consumer.config /etc/kafka/conf/consumer.properties

SASL/SCRAM Authentication
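
This section is a stub. As a sketch, SCRAM differs from PLAIN mainly in that credentials are stored salted and hashed in ZooKeeper rather than in plaintext in the broker JAAS file, and are managed with kafka-configs.sh (user name and password here are placeholders):

```
# Create (or update) SCRAM credentials for user alice in ZooKeeper;
# the broker looks them up at authentication time
kafka-configs.sh --zookeeper localhost:2181 --alter \
    --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice_pass],SCRAM-SHA-512=[password=alice_pass]' \
    --entity-type users --entity-name alice
```

On the broker, sasl.enabled.mechanisms=SCRAM-SHA-256 (and/or SCRAM-SHA-512) replaces PLAIN, and both server and client JAAS sections use org.apache.kafka.common.security.scram.ScramLoginModule instead of PlainLoginModule.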

Authorisation (ACL)

Kafka ACLs can be added with the kafka-acls.sh script; the rules are stored in ZooKeeper under the /kafka-acl path. Each rule has the form:
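
To inspect the stored rules directly, you can browse that path in ZooKeeper (a sketch; assumes a local ZooKeeper and the topic names used below):

```
zookeeper-shell.sh localhost:2181 ls /kafka-acl
zookeeper-shell.sh localhost:2181 get /kafka-acl/Topic/foo
```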

Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP

After the authentication setup above, you still need to grant topic-level permissions:

$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
                --authorizer-properties \
                zookeeper.connect=localhost:2181 \
                --add --allow-principal User:alice --operation Write --topic foo

$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
                --authorizer-properties \
                zookeeper.connect=localhost:2181 \
                --add --allow-principal User:godman --operation Read --topic __ucloud_test --group __ucloud_test

$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
                --authorizer-properties \
                zookeeper.connect=localhost:2181 \
                --add --allow-principal User:root --producer --topic __ucloud_test

List ACLs:

$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
                --authorizer-properties zookeeper.connect=localhost:2181 --list

Remove ACLs:

$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer  \
                --authorizer-properties zookeeper.connect=localhost:2181 --remove --topic foo