003-kafka基本认证

阅读量: zyh 2019-03-03 15:01:58
Categories: > Tags:

kafka的认证是通过java认证技术JAAS实现。

http://kafka.apache.org/documentation/#security_sasl_plain

服务端

开启用户认证配置

添加超级用户

  1. 创建超级用户
# 创建超级用户
bin/kafka-configs.sh --zookeeper data01.zyh.cool:2181 --alter --add-config 'SCRAM-SHA-256=[password=broker]' --entity-type users --entity-name broker

✨通过SCRAM将用户信息存放在zookeeper

  1. 超级用户客户端配置

✨超级用户用于brokers之间互联和管理用户

vim config/my.tools.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="broker" \
  password="broker";

追加认证配置

https://kafka.apache.org/documentation/#security_sasl

config/server.properties

✨认证地址(认证+明文):SASL_PLAINTEXT

✨非认证地址(明文):PLAINTEXT

# 监听地址的通信协议和认证算法
listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
advertised.listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
## 开启访问broker的认证机制sasl,以及配置所用的机制算法
sasl.enabled.mechanisms=SCRAM-SHA-256 

# brokers之间的通信协议和认证算法
## 配置broker之间的网络通信协议,这里采用认证+明文。
security.inter.broker.protocol=SASL_PLAINTEXT
## 开启broker之间认证通信机制sasl,以及配置所用的机制算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
## brokers 之间的认证方式:sasl.jaas.config
### sasl.jaas.config的写法: listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="broker" \
    password="broker";

SASL/SCRAM算法将用户数据存放在zk,因此SASL/SCRAM算法可以动态的增删用户。

重启服务

./kafka stop
./kafka start

创建普通用户

创建一个用户名elk,密码123456

bin/kafka-configs.sh --bootstrap-server data01.zyh.cool:8123  --command-config config/my.tools.properties --alter --add-config 'SCRAM-SHA-256=[password=123456]' --entity-type users --entity-name elk

列出用户信息

bin/kafka-configs.sh --bootstrap-server data01.zyh.cool:8123  --command-config config/my.tools.properties --describe --entity-type users --entity-name elk

删除用户

bin/kafka-configs.sh --bootstrap-server data01.zyh.cool:8123  --command-config config/my.tools.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name elk

资源授权规则

💥默认情况下,任何用户访问任何资源都不需要授权。因此,需要针对资源添加授权限制。

http://kafka.apache.org/documentation/#security_authz

  1. ACL规则是针对资源topic或者消费者组添加规则

  2. 规则由principal+host+operation+permissionType组成

    • principal就是User或者Group ✨ 这个属性决定了是否需要使用认证端口8123

    • host是允许的主机

    • operation是允许的操作

    • permissionType允许或拒绝

  3. 💥一个请求,必须满足topic或者消费者组两类下的列出规则。

    • 对于生产者,需要匹配topic下的规则。

    • 对于消费者,需要同时满足topic消费者组规则。

追加授权配置

# ACL默认规则
# 设置授权方式,这里是ACL授权
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:broker
# true 的意思是:如果某个topic没有ACL规则,则所有用户都可以通过非认证端口9092访问这个topic。
allow.everyone.if.no.acl.found=true

✨上述配置默认情况下仅允许broker用户是超级用户可以无视ACL规则。

✨其它用户发起的请求,不管是访问认证端口8123还是非认证端口9092,均没有权限。

重启服务

./kafka stop
./kafka start

授权命令

http://kafka.apache.org/documentation/#security_authz_cli

bin/kafka-acls.sh用来授权的工具

常用的参数:

–bootstrap-server 指定broker列表

–add 添加

–remove 删除

–producer 指定生产者规则,在没有operation参数时,默认就是只写

–consumer 指定消费者规则,在没有operation参数时,默认就是只读

–operation 授权列表(READ、WRITE、DELETE、CREATE、ALTER、DESCRIBE、ALL)

–allow-principal User:[用户名] 允许指定用户

–deny-prinicipal User:[用户名] 拒绝指定用户

–allow-host [Host] 允许的主机网络

–deny-host [Host] 拒绝的主机网络

–group [group_name] 此规则仅能用于此消费者组

–resource-pattern-type 匹配类型,常用的有前缀匹配 prefixed

–topic 以字符串匹配的主题

–command-config 指定管理员配置文件

绑定写入权限

bin/kafka-acls.sh \
--bootstrap-server data01.zyh.cool:8123 \
--command-config config/my.tools.properties \
--topic zz.it.elk. \
--resource-pattern-type prefixed \
--add \
--allow-principal User:elk \
--producer

针对访问资源是zz.it.elk.开头的 topic, 添加用户elk只写权限。

绑定只读权限

bin/kafka-acls.sh \
--bootstrap-server data01.zyh.cool:8123 \
--command-config config/my.tools.properties \
--group logstash \
--topic zz.it.elk. \
--resource-pattern-type prefixed \
--add \
--allow-principal User:* \
--consumer

针对访问资源是zz.it.elk.开头的 topic,且访问的消费者组是logstash,添加所有用户的只读权限。

这条规则因允许所有用户,所以客户端可以使用非认证端口9092,但消费者组必须是logstash

✨–consumer参数必须同时使用–group和–topic

列出ACL规则

列出所有授权规则

可以通过追加 --principal User:elk 列出某个认证用户使用的规则.

bin/kafka-acls.sh \
--bootstrap-server data01.zyh.cool:8123 \
--command-config config/my.tools.properties \
--list 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=zz.it.elk., patternType=PREFIXED)`:
        (principal=User:elk, host=*, operation=CREATE, permissionType=ALLOW)
        (principal=User:elk, host=*, operation=WRITE, permissionType=ALLOW)
        (principal=User:*, host=*, operation=READ, permissionType=ALLOW)
        (principal=User:elk, host=*, operation=DESCRIBE, permissionType=ALLOW)
        (principal=User:*, host=*, operation=DESCRIBE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=logstash, patternType=PREFIXED)`:
        (principal=User:*, host=*, operation=READ, permissionType=ALLOW)

💥resource 就是 ACL 绑定的对象。ACL是针对 resource 的,不是针对用户的。

删除授权

针对 resource 删除 ACL 规则。

bin/kafka-acls.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --topic zz.it.elk. --resource-pattern-type prefixed --remove
bin/kafka-acls.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --group logstash --resource-pattern-type prefixed --remove

客户端

kafka自带的客户端工具

生产客户端配置config/my.producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="elk" \
    password="123456";

消费客户端配置config/my.consumer.properties

group.id=logstash
#security.protocol=SASL_PLAINTEXT
#sasl.mechanism=SCRAM-SHA-256
#sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
#    username="elk" \
#    password="123456";

测试

# 创建主题
bin/kafka-topics.sh --bootstrap-server data01.zyh.cool:8123 --command-config config/my.tools.properties --create --replication-factor 3 --partitions 8 --topic zz.it.elk.syslog
# 生产连接broker
bin/kafka-console-producer.sh --bootstrap-server data01.zyh.cool:8123 --producer.config config/my.producer.properties  --topic zz.it.elk.syslog
# 消费者连接broker
bin/kafka-console-consumer.sh --bootstrap-server data01.zyh.cool:9092 --consumer.config config/my.consumer.properties  --topic zz.it.elk.syslog