Skip to content

Kafka部署

kafka需要放行9092端口,开发环境可以关闭防火墙,生产不允许关闭时,可以放行端口

bash
$ firewall-cmd --zone=public --add-port=9092/tcp --permanent        #开启 9092 端口
$ firewall-cmd --reload   #重启防火墙
$ firewall-cmd --zone=public --list-ports  #查看对外成功开放的端口

单节点 3.7.0

配置JAVA环境变量

bash
$ tar xf jdk-8u261-linux-x64.tar.gz
$ mv jdk1.8.0_261 /data/jdk1.8

$ vi /etc/profile
export JAVA_HOME=/data/jdk1.8
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$JAVA_HOME/bin:$PATH

$ source /etc/profile

解压kafka安装包

bash
# 离线下载在上传也是可以的
$ wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
$ tar xf kafka_2.13-3.7.0.tgz
$ mv kafka_2.13-3.7.0 /data/kafka

修改zk配置

bash
$ vi /data/kafka/config/zookeeper.properties
dataDir=/data/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

修改kafka配置

bash
$ vi /data/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/data/kafka/data/logs
zookeeper.connect=127.0.0.1:2181

zk托管到systemd

bash
$ vi /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper service
Requires=network.target
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/data/jdk1.8
ExecStart=/data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties
ExecStop=/data/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

kafka托管到systemd

bash
$ vi /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=forking
Environment=JAVA_HOME=/data/jdk1.8
ExecStart=/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
ExecStop=/data/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target

重载systemd配置并启动服务

bash
$ systemctl daemon-reload
$ systemctl start zookeeper
$ systemctl start kafka

集群部署 3.7.0

解压

bash
$ tar xf kafka_2.13-3.7.0.tgz
$ mv kafka_2.13-3.7.0 /data/kafka

修改server.properties

如果zk信息配置的是主机名需要自己做hosts配置

bash
$ cd /data/kafka/config/
$ vi server.properties

# broker的全局唯一编号,不能重复,只能是数字。
# ===== 注意修改 =====
broker.id=0
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
# ===== 注意修改 =====
log.dirs=/data/kafka/data/logs
# topic在当前broker上的分区个数
num.partitions=1
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
# 配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
# ===== 注意修改 =====
zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181/kafka

分发安装包

bash
$ scp -r kafka xxx.xxx.xxx:/data/kafka

调整其他节点配置信息

bash
$ vi /data/kafka/config/server.properties
# 节点二
broker.id=1
# 节点三
broker.id=2

配置环境变量

记得分发到其他节点

bash
$ vi /etc/profile.d/kafka-env.sh

#KAFKA_HOME
export KAFKA_HOME=/data/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新环境变量

每个节点配置后都需要刷新,或者退出登录再进去

bash
$ . /etc/profile

启动集群

启动kafka前需要先启动zookeeper,这里假定已经启动

bash
$ cd /data/kafka
$ bin/kafka-server-start.sh -daemon config/server.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
# 注意:配置文件的路径要能够到server.properties。

关闭集群

bash
$ bin/kafka-server-stop.sh 
$ bin/kafka-server-stop.sh 
$ bin/kafka-server-stop.sh

简单启停脚本

bash
$ vi kfk.sh
#! /bin/bash
case $1 in
"start"){
    for i in kafka-node1 kafka-node2 kafka-node3
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
    done
};;
"stop"){
    for i in kafka-node1 kafka-node2 kafka-node3
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/data/kafka/bin/kafka-server-stop.sh "
    done
};;
esac


# 添加执行权限
$ chmod +x kfk.sh

# 启停集群命令
$ kfk.sh start
$ kfk.sh stop

注意:

停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

Kafka命令行操作

主题命令行操作

查看操作主题命令参数

bash
$ bin/kafka-topics.sh
参数描述
–bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号
–topic <String: topic>操作的topic名称
–create创建主题
–delete删除主题
–alter修改主题
–list查看所有主题
–describe查看主题详细描述
–partitions <Integer: # of partitions>设置分区数
–replication-factor<Integer: replication factor>设置分区副本
–config <String: name=value>更新系统默认的配置

查看当前服务器中的所有topic

bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --list

创建first topic

bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

查看first主题的详情

bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --describe --topic first

修改分区数(注意:分区数只能增加,不能减少)

bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --alter --topic first --partitions 3

删除topic

bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --delete --topic first

生产者命令行操作

查看操作生产者命令参数

bash
$ bin/kafka-console-producer.sh
参数描述
–bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。
–topic <String: topic>操作的topic名称。

发送消息

bash
$ bin/kafka-console-producer.sh --bootstrap-server kafka-node1:9092 --topic first
>hello world
>ceshi cs

消费者命令行操作

查看操作消费者命令参数

bash
$ bin/kafka-console-consumer.sh
参数描述
–bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号
–topic <String: topic>操作的topic名称
–from-beginning从头开始消费
–group <String: consumer group id>指定消费者组名称

消费消息

  • 消费first主题中的数据
bash
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092 --topic first
  • 把主题中所有的数据都读取出来(包括历史数据)
bash
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092 --from-beginning --topic first

错误集合

UseG1GC问题

bash
# 启动 kafka ,然后发现没起来
$ systemctl start kafka
# 查看日志 kafkaServer.out,出现以下内容
$ cat kafkaServer.out 
Error: VM option 'UseG1GC' is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

解决方案:

编辑bin/kafka-run-class.sh约286行,删除-XX:+UseG1GC,然后重新启动即可

bash
# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  # 修改前
  # KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
  # 修改后
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi

集群内部访问kafka变成主机名

如下提示,找不到主机:k8s-master-1.cluster.local

bash
22:10:33.315 [dbListener-0-C-1] WARN  o.a.k.c.NetworkClient - [initiateConnect,969] - [Consumer clientId=consumer-db-group-2, groupId=db-group] Error connecting to node k8s-master-1.cluster.local:9092 (id: 1 rack: null)
java.net.UnknownHostException: k8s-master-1.cluster.local
        at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
        at java.net.InetAddress.getAllByName(InetAddress.java:1193)
        at java.net.InetAddress.getAllByName(InetAddress.java:1127)
        at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
        at ....

解决办法:

编辑bin/server.properties,取消对于注释,修改成IP,也就是xxx.xxx.xxx.xxx表示的是当前kafka节点主机IP

bash
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092