Appearance
Kafka部署
kafka需要放行9092端口,开发环境可以关闭防火墙,生产不允许关闭时,可以放行端口
bash
$ firewall-cmd --zone=public --add-port=9092/tcp --permanent #开启 9092 端口
$ firewall-cmd --reload #重启防火墙
$ firewall-cmd --zone=public --list-ports #查看对外成功开放的端口单节点 3.7.0
配置JAVA环境变量
bash
$ tar xf jdk-8u261-linux-x64.tar.gz
$ mv jdk1.8.0_261 /data/jdk1.8
$ vi /etc/profile
export JAVA_HOME=/data/jdk1.8
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$JAVA_HOME/bin:$PATH
$ source /etc/profile解压kafka安装包
bash
# 离线下载在上传也是可以的
$ wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
$ tar xf kafka_2.13-3.7.0.tgz
$ mv kafka_2.13-3.7.0 /data/kafka修改zk配置
bash
$ vi /data/kafka/config/zookeeper.properties
dataDir=/data/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false修改kafka配置
bash
$ vi /data/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/data/kafka/data/logs
zookeeper.connect=127.0.0.1:2181zk托管到systemd
bash
$ vi /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper service
Requires=network.target
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/data/jdk1.8
ExecStart=/data/kafka/bin/zookeeper-server-start.sh -daemon /data/kafka/config/zookeeper.properties
ExecStop=/data/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.targetkafka托管到systemd
bash
$ vi /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service
[Service]
Type=forking
Environment=JAVA_HOME=/data/jdk1.8
ExecStart=/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
ExecStop=/data/kafka/bin/kafka-server-stop.sh
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target重载systemd配置并启动服务
bash
$ systemctl daemon-reload
$ systemctl start zookeeper
$ systemctl start kafka集群部署 3.7.0
解压
bash
$ tar xf kafka_2.13-3.7.0.tgz
$ mv kafka_2.13-3.7.0 /data/kafka修改server.properties
如果zk信息配置的是主机名需要自己做hosts配置
bash
$ cd /data/kafka/config/
$ vi server.properties
# broker的全局唯一编号,不能重复,只能是数字。
# ===== 注意修改 =====
broker.id=0
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
# ===== 注意修改 =====
log.dirs=/data/kafka/data/logs
# topic在当前broker上的分区个数
num.partitions=1
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
# 配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
# ===== 注意修改 =====
zookeeper.connect=kafka-node1:2181,kafka-node2:2181,kafka-node3:2181/kafka分发安装包
bash
$ scp -r kafka xxx.xxx.xxx:/data/kafka调整其他节点配置信息
bash
$ vi /data/kafka/config/server.properties
# 节点二
broker.id=1
# 节点三
broker.id=2配置环境变量
记得分发到其他节点
bash
$ vi /etc/profile.d/kafka-env.sh
#KAFKA_HOME
export KAFKA_HOME=/data/kafka
export PATH=$PATH:$KAFKA_HOME/bin刷新环境变量
每个节点配置后都需要刷新,或者退出登录再进去
bash
$ . /etc/profile启动集群
启动kafka前需要先启动zookeeper,这里假定已经启动
bash
$ cd /data/kafka
$ bin/kafka-server-start.sh -daemon config/server.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
# 注意:配置文件的路径要能够到server.properties。关闭集群
bash
$ bin/kafka-server-stop.sh
$ bin/kafka-server-stop.sh
$ bin/kafka-server-stop.sh简单启停脚本
bash
$ vi kfk.sh
#! /bin/bash
case $1 in
"start"){
for i in kafka-node1 kafka-node2 kafka-node3
do
echo " --------启动 $i Kafka-------"
ssh $i "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
done
};;
"stop"){
for i in kafka-node1 kafka-node2 kafka-node3
do
echo " --------停止 $i Kafka-------"
ssh $i "/data/kafka/bin/kafka-server-stop.sh "
done
};;
esac
# 添加执行权限
$ chmod +x kfk.sh
# 启停集群命令
$ kfk.sh start
$ kfk.sh stop注意:
停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
Kafka命令行操作
主题命令行操作
查看操作主题命令参数
bash
$ bin/kafka-topics.sh| 参数 | 描述 |
|---|---|
| –bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号 |
| –topic <String: topic> | 操作的topic名称 |
| –create | 创建主题 |
| –delete | 删除主题 |
| –alter | 修改主题 |
| –list | 查看所有主题 |
| –describe | 查看主题详细描述 |
| –partitions <Integer: # of partitions> | 设置分区数 |
| –replication-factor<Integer: replication factor> | 设置分区副本 |
| –config <String: name=value> | 更新系统默认的配置 |
查看当前服务器中的所有topic
bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --list创建first topic
bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --create --partitions 1 --replication-factor 3 --topic first选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
查看first主题的详情
bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --describe --topic first修改分区数(注意:分区数只能增加,不能减少)
bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --alter --topic first --partitions 3删除topic
bash
$ bin/kafka-topics.sh --bootstrap-server kafka-node1:9092 --delete --topic first生产者命令行操作
查看操作生产者命令参数
bash
$ bin/kafka-console-producer.sh| 参数 | 描述 |
|---|---|
| –bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
| –topic <String: topic> | 操作的topic名称。 |
发送消息
bash
$ bin/kafka-console-producer.sh --bootstrap-server kafka-node1:9092 --topic first
>hello world
>ceshi cs消费者命令行操作
查看操作消费者命令参数
bash
$ bin/kafka-console-consumer.sh| 参数 | 描述 |
|---|---|
| –bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号 |
| –topic <String: topic> | 操作的topic名称 |
| –from-beginning | 从头开始消费 |
| –group <String: consumer group id> | 指定消费者组名称 |
消费消息
- 消费first主题中的数据
bash
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092 --topic first- 把主题中所有的数据都读取出来(包括历史数据)
bash
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092 --from-beginning --topic first错误集合
UseG1GC问题
bash
# 启动 kafka ,然后发现没起来
$ systemctl start kafka
# 查看日志 kafkaServer.out,出现以下内容
$ cat kafkaServer.out
Error: VM option 'UseG1GC' is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.解决方案:
编辑bin/kafka-run-class.sh约286行,删除-XX:+UseG1GC,然后重新启动即可
bash
# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
# 修改前
# KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
# 修改后
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fi集群内部访问kafka变成主机名
如下提示,找不到主机:k8s-master-1.cluster.local
bash
22:10:33.315 [dbListener-0-C-1] WARN o.a.k.c.NetworkClient - [initiateConnect,969] - [Consumer clientId=consumer-db-group-2, groupId=db-group] Error connecting to node k8s-master-1.cluster.local:9092 (id: 1 rack: null)
java.net.UnknownHostException: k8s-master-1.cluster.local
at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
at ....解决办法:
编辑bin/server.properties,取消对于注释,修改成IP,也就是xxx.xxx.xxx.xxx表示的是当前kafka节点主机IP
bash
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092