基本
3节点为例
/etc/hosts
添加
10.200.16.51 data01.zyh.cool data01
10.200.16.52 data02.zyh.cool data02
10.200.16.53 data03.zyh.cool data03
每个节点执行
hostnamectl set-hostname <hostName>
安装
http://kafka.apache.org/downloads
下载二进制包
cd /export && mkdir src && cd src
curl https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz -o kafka.tgz
tar xf kafka.tgz -C ../ && cd .. && mv kafka_* kafka && cd kafka
配置和启动
zk
一份集群(3节点)模式下的zk配置
dataDir=/export/kafka/zk_data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
# 心跳
## ms单位,2秒一次检测
tickTime=2000
## 主从间初始化时候允许心跳检测失败的次数
initLimit=10
## 主从间同步时候允许心跳检测失败的次数
syncLimit=5
# 节点列表
server.1=data01:2888:3888
server.2=data02:2888:3888
server.3=data03:2888:3888
分别在三个节点上,创建zk的myid文件
# node01
mkdir -p /export/kafka/zk_data
echo '1' > /export/kafka/zk_data/myid
# node02
mkdir -p /export/kafka/zk_data
echo '2' > /export/kafka/zk_data/myid
# node03
mkdir -p /export/kafka/zk_data
echo '3' > /export/kafka/zk_data/myid
启动脚本
vim zookeeper
#!/bin/bash
#判断用户是否传参
if [ $# -ne 1 ];then
echo "无效参数,用法为: $0 {start|stop|restart|status}"
exit
fi
#获取用户输入的命令
cmd=$1
#定义函数功能
function Manger(){
case $cmd in
start)
echo "启动服务"
remoteExec $cmd
;;
stop)
echo "停止服务"
remoteExec $cmd
;;
*)
echo "无效参数,用法为: $0 {start|stop}"
;;
esac
}
#定义执行的命令
function remoteExec(){
for i in `echo {01..03}`;do
tput setab 0; tput setaf 2; tput bold
echo ========== zookeeper: data${i} $cmd ================
tput sgr0
if [[ $cmd == "start" ]];then
ssh data${i} "source /etc/profile ; cd /export/kafka; ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties"
elif [[ $cmd == "stop" ]];then
ssh data${i} "source /etc/profile ; cd /export/kafka; ./bin/zookeeper-server-stop.sh"
fi
done
}
#调用函数
Manger
chmod u+x zookeeper
./zookeeper start
✨日志位于 logs 目录
kafka
一份集群(3节点)模式下的kafka配置
# broker id
## 不同节点设置不同的值,例如三个节点,分别是1/2/3
broker.id=1
# kafka 参数
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 数据目录。存储主题分区日志,主题数据以文件夹方式程序,文件夹名称:<主题名>-<分区ID>
log.dirs=/export/kafka/data
# 多少vcpu,就设置多少
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=0
# kafka 链接 zk 的参数
zookeeper.connect=data01.zyh.cool:2181,data02.zyh.cool:2181,data03.zyh.cool:2181
zookeeper.connection.timeout.ms=18000
# 监听地址
## 不同节点设置不同的值
listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
advertised.listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
启动脚本
一键启动脚本(需提前ssh互信)
vi kafka
#!/bin/bash
#判断用户是否传参
if [ $# -ne 1 ];then
echo "无效参数,用法为: $0 {start|stop|restart|status}"
exit
fi
#获取用户输入的命令
cmd=$1
#定义函数功能
function Manger(){
case $cmd in
start)
echo "启动服务"
remoteExec $cmd
;;
stop)
echo "停止服务"
remoteExec $cmd
;;
*)
echo "无效参数,用法为: $0 {start|stop}"
;;
esac
}
#定义执行的命令
function remoteExec(){
for i in `echo {01..03}`;do
tput setab 0; tput setaf 2; tput bold
echo ========== kafka: data${i} $cmd ================
tput sgr0
if [[ $cmd == "start" ]];then
ssh data${i} "source /etc/profile ; cd /export/kafka; ./bin/kafka-server-start.sh -daemon config/server.properties"
elif [[ $cmd == "stop" ]];then
ssh data${i} "source /etc/profile ; cd /export/kafka; ./bin/kafka-server-stop.sh"
fi
done
}
#调用函数
Manger
chmod u+x kafka
./kafka start
创建主题
bin/kafka-topics.sh --bootstrap-server data01.zyh.cool:9092 --create --replication-factor 3 --partitions 8 --topic zz.it.monit
- –bootstrap-server 你可以写集群里的任意节点.
- –replication-factor 复制因子数<=broker数量.
- –partitions 主题分区数,决定了主题下的日志分成多少份.
✨
- 主题一经创建,则不可重命名,只能通过新建主题并迁移数据。
- 主题名,应遵循一定的层次结构,以便于通过前缀来进行权控。例如:
<organization>.<team>.<dataset>.<event-name>
按照组织结构命名<project>.<product>.<event-name>
按照项目结构命名
- 当复制因子数<broker的时候,一个主题的副本分区将【无序(分区ID)】【尽量平均】的写入所有的broker中。
- 当复制因子数=broker的时候,一个主题的副本分区将【有序(分区ID)】【完全一致】的写入所有的broker中。
事件读写
生产者写入两条事件,事件将被永久存储
bin/kafka-console-producer.sh --bootstrap-server data01.zyh.cool:9092 --topic zz.it.monit
>This is my first event
This is my second event
消费者读取两条事件,因事件被永久存储,故而可以多次消费
bin/kafka-console-consumer.sh --bootstrap-server data01.zyh.cool:9092 --topic zz.it.monit --from-beginning
This is my first event
This is my second event
ℹ️生产者的数据可以立即在消费者端读取显示.
第三方客户端
https://github.com/edenhill/kcat
kcat
是一个更加强大的客户端,包含了消费和生产,支持按照时间戳来消费,方便重新消费因某些原因导致丢失的数据。
ℹ️以前它叫kafkacat
🙄
查看topic列表元信息
docker run -it --rm --network=host edenhill/kcat:1.7.0 \
-b data01.zyh.cool:8123 \
-X security.protocol="SASL_PLAINTEXT" \
-X sasl.mechanism=SCRAM-SHA-256 \
-X sasl.username="elk" \
-X sasl.password="123456" \
-L
获取某个时间范围内的事件
docker run -it --rm --network=host edenhill/kcat:1.7.0 \
-b data01.zyh.cool:8123 \
-X security.protocol="SASL_PLAINTEXT" \
-X sasl.mechanism=SCRAM-SHA-256 \
-X sasl.username="elk" \
-X sasl.password="123456" \
-G logstash \
-t zz.it.elk.syslog.secure \
-C \
-o s@`date -d "20211129 16:57" +%s`000 \
-o e@`date -d "20211129 18:00" +%s`000
-G 消费组
-o 指定偏移
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))