002-kafka基本安装

阅读量: zyh 2019-03-03 14:01:58
Categories: > Tags:

基本

3节点为例

/etc/hosts添加

10.200.16.51 data01.zyh.cool data01
10.200.16.52 data02.zyh.cool data02
10.200.16.53 data03.zyh.cool data03

每个节点执行

hostnamectl set-hostname <hostName>

安装

http://kafka.apache.org/downloads

下载二进制包

cd /export && mkdir src && cd src
curl https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz -o kafka.tgz
tar xf kafka.tgz -C ../ && cd .. && mv kafka_* kafka && cd kafka

配置和启动

zk

一份集群(3节点)模式下的zk配置

dataDir=/export/kafka/zk_data
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

# 心跳
## ms单位,2秒一次检测
tickTime=2000
## 主从间初始化时候允许心跳检测失败的次数
initLimit=10
## 主从间同步时候允许心跳检测失败的次数
syncLimit=5

# 节点列表
server.1=data01:2888:3888
server.2=data02:2888:3888
server.3=data03:2888:3888

分别在三个节点上,创建zk的myid文件

# node01
mkdir -p /export/kafka/zk_data
echo '1' > /export/kafka/zk_data/myid
# node02
mkdir -p /export/kafka/zk_data
echo '2' > /export/kafka/zk_data/myid
# node03
mkdir -p /export/kafka/zk_data
echo '3' > /export/kafka/zk_data/myid

启动脚本

vim zookeeper
#!/bin/bash
#判断用户是否传参
if [ $# -ne 1 ];then
    echo "无效参数,用法为: $0  {start|stop|restart|status}"
    exit
fi
#获取用户输入的命令
cmd=$1

#定义函数功能
function Manger(){
    case $cmd in
    start)
        echo "启动服务"        
        remoteExec $cmd
        ;;
    stop)
        echo "停止服务"
        remoteExec $cmd
        ;;
    *)
        echo "无效参数,用法为: $0  {start|stop}"
        ;;
    esac
}

#定义执行的命令
function remoteExec(){
    for i in `echo {01..03}`;do
            tput setab 0; tput setaf 2; tput bold
            echo ========== zookeeper: data${i} $cmd ================
            tput sgr0
            if [[ $cmd == "start" ]];then
                ssh data${i}  "source /etc/profile ; cd /export/kafka; ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties"
            elif [[ $cmd == "stop" ]];then
                ssh data${i}  "source /etc/profile ; cd /export/kafka; ./bin/zookeeper-server-stop.sh"
            fi
    done
}



#调用函数
Manger
chmod u+x zookeeper
./zookeeper start

✨日志位于 logs 目录

kafka

一份集群(3节点)模式下的kafka配置

# broker id
## 不同节点设置不同的值,例如三个节点,分别是1/2/3
broker.id=1

# kafka 参数
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 数据目录。存储主题分区日志,主题数据以文件夹方式程序,文件夹名称:<主题名>-<分区ID>
log.dirs=/export/kafka/data
# 多少vcpu,就设置多少
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
group.initial.rebalance.delay.ms=0

# kafka 链接 zk 的参数
zookeeper.connect=data01.zyh.cool:2181,data02.zyh.cool:2181,data03.zyh.cool:2181
zookeeper.connection.timeout.ms=18000

# 监听地址
## 不同节点设置不同的值
listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092
advertised.listeners=SASL_PLAINTEXT://data01.zyh.cool:8123,PLAINTEXT://data01.zyh.cool:9092

启动脚本

一键启动脚本(需提前ssh互信)

vi kafka
#!/bin/bash
#判断用户是否传参
if [ $# -ne 1 ];then
    echo "无效参数,用法为: $0  {start|stop|restart|status}"
    exit
fi
#获取用户输入的命令
cmd=$1

#定义函数功能
function Manger(){
    case $cmd in
    start)
        echo "启动服务"        
        remoteExec $cmd
        ;;
    stop)
        echo "停止服务"
        remoteExec $cmd
        ;;
    *)
        echo "无效参数,用法为: $0  {start|stop}"
        ;;
    esac
}

#定义执行的命令
function remoteExec(){
    for i in `echo {01..03}`;do
            tput setab 0; tput setaf 2; tput bold
            echo ========== kafka: data${i} $cmd ================
            tput sgr0
            if [[ $cmd == "start" ]];then
                ssh data${i}  "source /etc/profile ; cd /export/kafka; ./bin/kafka-server-start.sh -daemon config/server.properties"
            elif [[ $cmd == "stop" ]];then
                ssh data${i}  "source /etc/profile ; cd /export/kafka; ./bin/kafka-server-stop.sh"
            fi
    done
}



#调用函数
Manger
chmod u+x kafka
./kafka start

创建主题

bin/kafka-topics.sh --bootstrap-server data01.zyh.cool:9092 --create --replication-factor 3 --partitions 8 --topic zz.it.monit

  1. 主题一经创建,则不可重命名,只能通过新建主题并迁移数据。
  2. 主题名,应遵循一定的层次结构,以便于通过前缀来进行权控。例如:
    • <organization>.<team>.<dataset>.<event-name> 按照组织结构命名
    • <project>.<product>.<event-name> 按照项目结构命名
  3. 当复制因子数<broker的时候,一个主题的副本分区将【无序(分区ID)】【尽量平均】的写入所有的broker中。
  4. 当复制因子数=broker的时候,一个主题的副本分区将【有序(分区ID)】【完全一致】的写入所有的broker中。

事件读写

生产者写入两条事件,事件将被永久存储

bin/kafka-console-producer.sh --bootstrap-server data01.zyh.cool:9092 --topic zz.it.monit
>This is my first event
This is my second event

消费者读取两条事件,因事件被永久存储,故而可以多次消费

bin/kafka-console-consumer.sh --bootstrap-server data01.zyh.cool:9092 --topic zz.it.monit --from-beginning
This is my first event
This is my second event

ℹ️生产者的数据可以立即在消费者端读取显示.

第三方客户端

https://github.com/edenhill/kcat

kcat是一个更加强大的客户端,包含了消费和生产,支持按照时间戳来消费,方便重新消费因某些原因导致丢失的数据。

ℹ️以前它叫kafkacat🙄

查看topic列表元信息

docker run -it --rm --network=host edenhill/kcat:1.7.0 \
-b data01.zyh.cool:8123 \
-X security.protocol="SASL_PLAINTEXT" \
-X sasl.mechanism=SCRAM-SHA-256 \
-X sasl.username="elk" \
-X sasl.password="123456" \
-L

获取某个时间范围内的事件

docker run -it --rm --network=host edenhill/kcat:1.7.0 \
-b data01.zyh.cool:8123 \
-X security.protocol="SASL_PLAINTEXT" \
-X sasl.mechanism=SCRAM-SHA-256 \
-X sasl.username="elk" \
-X sasl.password="123456" \
-G logstash \
-t zz.it.elk.syslog.secure \
-C \
-o s@`date -d "20211129 16:57" +%s`000 \
-o e@`date -d "20211129 18:00" +%s`000
-G 消费组
-o 指定偏移
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))