Kafka 集群
| 服务器IP | 主机角色 | 
|---|---|
| 192.168.10.7 | Kafka Node01 | 
| 192.168.10.8 | Kafka Node02 | 
| 192.168.10.9 | Kafka Node03 | 
Kafka Node01
- 
下载 jdk 安装包
- 服务器支持访问互联网
 - 服务器不支持访问互联网
 
wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz# jdk 安装包文件下载链接,下载完成后上传到部署服务器
https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz - 
解压 jdk 到安装目录
tar -zxvf OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz
mv jdk-21.0.8+9 /usr/local/openjdk-21 - 
配置 java 软链接
ln -s /usr/local/openjdk-21/bin/java /bin/java - 
下载 kafka 安装包
- 服务器支持访问互联网
 - 服务器不支持访问互联网
 
wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz# jdk 安装包文件下载链接,下载完成后上传到部署服务器
https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz - 
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/ - 
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/ - 
添加 zookeeper myid
echo 1 > /data/kafka/zookeeper/myid- 注意,各节点的 myid 不同
 
 - 
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOF
admin.enableServer=false
dataDir=/data/kafka/zookeeper/
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=192.168.10.7:2888:3888
server.2=192.168.10.8:2888:3888
server.3=192.168.10.9:2888:3888
EOF- server.1 对应 myid 文件内容为1的服务器
 - server.2 对应 myid 文件内容为2的服 务器
 - server.3 对应 myid 文件内容为3的服务器
 
 - 
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh - 
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.10.7:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs/
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
default.replication.factor=3
acks=all
min.insync.replicas=2
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
EOF- 注意,broker.id 各节点值不同
 - listeners 值在部署时修改为实际的本机IP
 - zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
 
 - 
创建 kafka 用户
useradd -M -s /sbin/nologin kafka - 
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka - 
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Zookeeper
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF - 
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
After=zookeeper.service
Requires=zookeeper.service
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF - 
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka 
Kafka Node02
- 
下载 jdk 安装包
- 服务器支持访问互联网
 - 服务器不支持访问互联网
 
wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz# jdk 安装包文件下载链接,下载完成后上传到部署服务器
https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz - 
解压 jdk 到安装目录
tar -zxvf OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz
mv jdk-21.0.8+9 /usr/local/openjdk-21 - 
配置 java 软链接
ln -s /usr/local/openjdk-21/bin/java /bin/java - 
下载 kafka 安装包
- 服务器支持访问互联网
 - 服务器不支持访问互联网
 
wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz# jdk 安装包文件下载链接,下载完成后上传到部署服务器
https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz - 
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/ - 
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/ - 
添加 zookeeper myid
echo 2 > /data/kafka/zookeeper/myid- 注意,各节点的 myid 不同
 
 - 
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOF
admin.enableServer=false
dataDir=/data/kafka/zookeeper/
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=192.168.10.7:2888:3888
server.2=192.168.10.8:2888:3888
server.3=192.168.10.9:2888:3888
EOF- server.1 对应 myid 文件内容为1的服务器
 - server.2 对应 myid 文件内容为2的服务器
 - server.3 对应 myid 文件内容为3的服务器
 
 - 
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh - 
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.10.8:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs/
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
default.replication.factor=3
acks=all
min.insync.replicas=2
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
EOF- 注意,broker.id 各节点值不同
 - listeners 值在部署时修改为实际的本机IP
 - zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
 
 - 
创建 kafka 用户
useradd -M -s /sbin/nologin kafka - 
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka - 
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Zookeeper
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF - 
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
After=zookeeper.service
Requires=zookeeper.service
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF - 
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka 
Kafka Node03
- 
下载 jdk 安装包
- 服务器支持访问互联网
 - 服务器不支持访问互联网
 
wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz# jdk 安装包文件下载链接,下载完成后上传到部署服务器
https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz - 
解压 jdk 到安装目录
tar -zxvf OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz
mv jdk-21.0.8+9 /usr/local/openjdk-21 - 
配置 java 软链接
ln -s /usr/local/openjdk-21/bin/java /bin/java - 
下载 kafka 安装包
- 服务器支持访问互联网
 - 服务器不支持访问互联网
 
wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz# jdk 安装包文件下载链接,下载完成后上传到部署服务器
https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz - 
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/ - 
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/ - 
添加 zookeeper myid
echo 3 > /data/kafka/zookeeper/myid- 注意,各节点的 myid 不同
 
 - 
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOF
admin.enableServer=false
dataDir=/data/kafka/zookeeper/
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=192.168.10.7:2888:3888
server.2=192.168.10.8:2888:3888
server.3=192.168.10.9:2888:3888
EOF- server.1 对应 myid 文件内容为1的服务器
 - server.2 对应 myid 文件内容为2的服务器
 - server.3 对应 myid 文件内容为3的服务器
 
 - 
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh - 
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.10.9:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs/
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
default.replication.factor=3
acks=all
min.insync.replicas=2
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
EOF- 注意,broker.id 各节点值不同
 - listeners 值在部署时修改为实际的本机IP
 - zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
 
 - 
创建 kafka 用户
useradd -M -s /sbin/nologin kafka - 
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka - 
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Zookeeper
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF - 
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
After=zookeeper.service
Requires=zookeeper.service
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF - 
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka 
Kafka 集群验证
- 
Kafka Node01 启动生产端
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test01- 启动后,在终端随意输入消息内容
 
 - 
Kafka Node02/3 节点上查看消息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01- 执行后如果输出了在生产端输入的消息内容则正常