kafka的认证是通过java认证技术JAAS实现。
http://kafka.apache.org/documentation/#security_sasl_plain
服务端
开启用户认证配置
添加超级用户
- 创建超级用户
# 创建超级用户
bin/kafka-configs.sh --zookeeper data01.zyh.cool:2181 --alter --add-config 'SCRAM-SHA-256=[password=broker]' --entity-type users --entity-name broker
✨通过SCRAM
将用户信息存放在zookeeper
中
- 超级用户客户端配置
✨超级用户用于brokers之间互联和管理用户
vim config/my.tools.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="broker" \
password="broker";
追加认证配置
https://kafka.apache.org/documentation/#security_sasl
config/server.properties
✨认证地址(认证+明文):SASL_PLAINTEXT
✨非认证地址(明文):PLAINTEXT
# 监听地址的通信协议和认证算法
listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
advertised.listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
## 开启访问broker的认证机制sasl,以及配置所用的机制算法
sasl.enabled.mechanisms=SCRAM-SHA-256
# brokers之间的通信协议和认证算法
## 配置broker之间的网络通信协议,这里采用认证+明文。
security.inter.broker.protocol=SASL_PLAINTEXT
## 开启broker之间认证通信机制sasl,以及配置所用的机制算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
## brokers 之间的认证方式:sasl.jaas.config
### sasl.jaas.config的写法: listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="broker" \
password="broker";
✨SASL/SCRAM算法将用户数据存放在zk,因此SASL/SCRAM算法可以动态的增删用户。
重启服务
./kafka stop
./kafka start
创建普通用户
创建一个用户名elk
,密码123456
bin/kafka-configs.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name elk
列出用户信息
bin/kafka-configs.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --describe --entity-type users --entity-name elk
删除用户
bin/kafka-configs.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name elk
资源授权规则
💥默认情况下,任何用户访问任何资源都不需要授权。因此,需要针对资源添加授权限制。
http://kafka.apache.org/documentation/#security_authz
-
ACL规则是针对资源
topic
或者消费者组
添加规则 -
规则由
principal
+host
+operation
+permissionType
组成-
principal
就是User或者Group ✨ 这个属性决定了是否需要使用认证端口8123 -
host
是允许的主机 -
operation
是允许的操作 -
permissionType
允许或拒绝
-
-
💥一个请求,必须满足
topic
或者消费者组
两类下的列出规则。-
对于生产者,需要匹配
topic
下的规则。 -
对于消费者,需要同时满足
topic
和消费者组
规则。
-
追加授权配置
# ACL默认规则
# 设置授权方式,这里是ACL授权
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:broker
# true 的意思是:如果某个topic没有ACL规则,则所有用户都可以通过非认证端口9092访问这个topic。
allow.everyone.if.no.acl.found=true
✨上述配置默认情况下仅允许broker
用户是超级用户可以无视ACL规则。
✨其它用户发起的请求,不管是访问认证端口8123还是非认证端口9092,均没有权限。
重启服务
./kafka stop
./kafka start
授权命令
http://kafka.apache.org/documentation/#security_authz_cli
bin/kafka-acls.sh
用来授权的工具
常用的参数:
–bootstrap-server 指定broker列表
–add 添加
–remove 删除
–producer 指定生产者规则,在没有operation参数时,默认就是只写
–consumer 指定消费者规则,在没有operation参数时,默认就是只读
–operation 授权列表(READ、WRITE、DELETE、CREATE、ALTER、DESCRIBE、ALL)
–allow-principal User:[用户名] 允许指定用户
–deny-prinicipal User:[用户名] 拒绝指定用户
–allow-host [Host] 允许的主机网络
–deny-host [Host] 拒绝的主机网络
–group [group_name] 此规则仅能用于此消费者组
–resource-pattern-type 匹配类型,常用的有前缀匹配 prefixed
–topic 以字符串匹配的主题
–command-config 指定管理员配置文件
绑定写入权限
bin/kafka-acls.sh \
--bootstrap-server data01.zyh.cool:8123 \
--command-config config/my.tools.properties \
--topic zz.it.elk. \
--resource-pattern-type prefixed \
--add \
--allow-principal User:elk \
--producer
针对访问资源是zz.it.elk.
开头的 topic, 添加用户elk
只写权限。
绑定只读权限
bin/kafka-acls.sh \
--bootstrap-server data01.zyh.cool:8123 \
--command-config config/my.tools.properties \
--group logstash \
--topic zz.it.elk. \
--resource-pattern-type prefixed \
--add \
--allow-principal User:* \
--consumer
针对访问资源是zz.it.elk.
开头的 topic,且访问的消费者组是logstash
,添加所有用户的只读权限。
这条规则因允许所有用户,所以客户端可以使用非认证端口9092,但消费者组必须是
logstash
。
✨–consumer参数必须同时使用–group和–topic
列出ACL规则
列出所有授权规则
可以通过追加 --principal User:elk 列出某个认证用户使用的规则.
bin/kafka-acls.sh \
--bootstrap-server data01.zyh.cool:8123 \
--command-config config/my.tools.properties \
--list
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=zz.it.elk., patternType=PREFIXED)`:
(principal=User:elk, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:elk, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:*, host=*, operation=READ, permissionType=ALLOW)
(principal=User:elk, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:*, host=*, operation=DESCRIBE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=logstash, patternType=PREFIXED)`:
(principal=User:*, host=*, operation=READ, permissionType=ALLOW)
💥resource 就是 ACL 绑定的对象。ACL是针对 resource 的,不是针对用户的。
删除授权
针对 resource 删除 ACL 规则。
- 删除
Current ACLs for resource ResourcePattern(resourceType=TOPIC, name=zz.it., patternType=PREFIXED)
下的ACL规则
bin/kafka-acls.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --topic zz.it.elk. --resource-pattern-type prefixed --remove
- 删除
Current ACLs for resource ResourcePattern(resourceType=GROUP, name=logstash, patternType=PREFIXED)
下的ACL规则
bin/kafka-acls.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --group logstash --resource-pattern-type prefixed --remove
客户端
kafka自带的客户端工具
生产客户端配置config/my.producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="elk" \
password="123456";
消费客户端配置config/my.consumer.properties
group.id=logstash
#security.protocol=SASL_PLAINTEXT
#sasl.mechanism=SCRAM-SHA-256
#sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
# username="elk" \
# password="123456";
测试
# 创建主题
bin/kafka-topics.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --create --replication-factor 3 --partitions 8 --topic zz.it.elk.syslog
# 生产连接broker
bin/kafka-console-producer.sh --bootstrap-server data01.zyh.cool:8123 --producer.config config/my.producer.properties --topic zz.it.elk.syslog
# 消费者连接broker
bin/kafka-console-consumer.sh --bootstrap-server data01.zyh.cool:9092 --consumer.config config/my.consumer.properties --topic zz.it.elk.syslog