Kafka 集群
| 服务器IP | 主机角色 |
|---|---|
| 192.168.10.7 | Kafka Node01 |
| 192.168.10.8 | Kafka Node02 |
| 192.168.10.9 | Kafka Node03 |
Kafka Node01
-
下载 jdk 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz# jdk 安装包文件下载链接,下载完成后上传到部署服务器https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz -
解压 jdk 到安装目录
tar -zxvf OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gzmv jdk-21.0.8+9 /usr/local/openjdk-21 -
配置 java 软链接
ln -s /usr/local/openjdk-21/bin/java /bin/java -
下载 kafka 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz# jdk 安装包文件下载链接,下载完成后上传到部署服务器https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz -
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/localmv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/ -
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/ -
添加 zookeeper myid
echo 1 > /data/kafka/zookeeper/myid- 注意,各节点的 myid 不同
-
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOFadmin.enableServer=falsedataDir=/data/kafka/zookeeper/clientPort=2181maxClientCnxns=0initLimit=10syncLimit=5server.1=192.168.10.7:2888:3888server.2=192.168.10.8:2888:3888server.3=192.168.10.9:2888:3888EOF- server.1 对应 myid 文件内容为1的服务器
- server.2 对应 myid 文件内容为2的服务器
- server.3 对应 myid 文件内容为3的服务器
-
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.shsed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh -
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<'EOF'broker.id=0listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://192.168.10.7:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/data/kafka/kafka-logs/num.partitions=10num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=3transaction.state.log.replication.factor=3transaction.state.log.min.isr=2log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0default.replication.factor=3acks=allmin.insync.replicas=2message.max.bytes=10485760replica.fetch.max.bytes=10485760EOF- 注意,broker.id 各节点值不同
- listeners 值在部署时修改为实际的本机IP
- zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
-
创建 kafka 用户
useradd -M -s /sbin/nologin kafka -
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka -
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<'EOF'[Unit]Description=Zookeeper[Service]User=kafkaGroup=kafkaLimitNOFILE=102400LimitNPROC=102400ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.propertiesExecStop=/usr/bin/kill $MAINPIDRestart=on-failure[Install]WantedBy=multi-user.targetEOF -
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<'EOF'[Unit]Description=KafkaAfter=zookeeper.serviceRequires=zookeeper.service[Service]User=kafkaGroup=kafkaLimitNOFILE=102400LimitNPROC=102400ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiesExecStop=/usr/bin/kill $MAINPIDRestart=on-failure[Install]WantedBy=multi-user.targetEOF -
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeepersystemctl enable zookeepersystemctl start kafkasystemctl enable kafka -
服务状态检查
systemctl status zookeepersystemctl status kafka
Kafka Node02
-
下载 jdk 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz# jdk 安装包文件下载链接,下载完成后上传到部署服务器https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz -
解压 jdk 到安装目录
tar -zxvf OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gzmv jdk-21.0.8+9 /usr/local/openjdk-21 -
配置 java 软链接
ln -s /usr/local/openjdk-21/bin/java /bin/java -
下载 kafka 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz# jdk 安装包文件下载链接,下载完成后上传到部署服务器https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz -
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/localmv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/ -
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/ -
添加 zookeeper myid
echo 2 > /data/kafka/zookeeper/myid- 注意,各节点的 myid 不同
-
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOFadmin.enableServer=falsedataDir=/data/kafka/zookeeper/clientPort=2181maxClientCnxns=0initLimit=10syncLimit=5server.1=192.168.10.7:2888:3888server.2=192.168.10.8:2888:3888server.3=192.168.10.9:2888:3888EOF- server.1 对应 myid 文件内容为1的服务器
- server.2 对应 myid 文件内容为2的服务器
- server.3 对应 myid 文件内容为3的服务器
-
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.shsed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh -
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<'EOF'broker.id=1listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://192.168.10.8:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/data/kafka/kafka-logs/num.partitions=10num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=3transaction.state.log.replication.factor=3transaction.state.log.min.isr=2log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0default.replication.factor=3acks=allmin.insync.replicas=2message.max.bytes=10485760replica.fetch.max.bytes=10485760EOF- 注意,broker.id 各节点值不同
- listeners 值在部署时修改为实际的本机IP
- zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
-
创建 kafka 用户
useradd -M -s /sbin/nologin kafka -
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka -
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<'EOF'[Unit]Description=Zookeeper[Service]User=kafkaGroup=kafkaLimitNOFILE=102400LimitNPROC=102400ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.propertiesExecStop=/usr/bin/kill $MAINPIDRestart=on-failure[Install]WantedBy=multi-user.targetEOF -
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<'EOF'[Unit]Description=KafkaAfter=zookeeper.serviceRequires=zookeeper.service[Service]User=kafkaGroup=kafkaLimitNOFILE=102400LimitNPROC=102400ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiesExecStop=/usr/bin/kill $MAINPIDRestart=on-failure[Install]WantedBy=multi-user.targetEOF -
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeepersystemctl enable zookeepersystemctl start kafkasystemctl enable kafka -
服务状态检查
systemctl status zookeepersystemctl status kafka
Kafka Node03
-
下载 jdk 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz# jdk 安装包文件下载链接,下载完成后上传到部署服务器https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gz -
解压 jdk 到安装目录
tar -zxvf OpenJDK21U-jdk_x64_linux_hotspot_21.0.8_9.tar.gzmv jdk-21.0.8+9 /usr/local/openjdk-21 -
配置 java 软链接
ln -s /usr/local/openjdk-21/bin/java /bin/java -
下载 kafka 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz# jdk 安装包文件下载链接,下载完成后上传到部署服务器https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz -
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/localmv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/ -
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/ -
添加 zookeeper myid
echo 3 > /data/kafka/zookeeper/myid- 注意,各节点的 myid 不同
-
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOFadmin.enableServer=falsedataDir=/data/kafka/zookeeper/clientPort=2181maxClientCnxns=0initLimit=10syncLimit=5server.1=192.168.10.7:2888:3888server.2=192.168.10.8:2888:3888server.3=192.168.10.9:2888:3888EOF- server.1 对应 myid 文件内容为1的服务器
- server.2 对应 myid 文件内容为2的服务器
- server.3 对应 myid 文件内容为3的服务器
-
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.shsed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh -
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<'EOF'broker.id=2listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://192.168.10.9:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/data/kafka/kafka-logs/num.partitions=10num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=3transaction.state.log.replication.factor=3transaction.state.log.min.isr=2log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0default.replication.factor=3acks=allmin.insync.replicas=2message.max.bytes=10485760replica.fetch.max.bytes=10485760EOF- 注意,broker.id 各节点值不同
- listeners 值在部署时修改为实际的本机IP
- zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
-
创建 kafka 用户
useradd -M -s /sbin/nologin kafka -
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka -
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<'EOF'[Unit]Description=Zookeeper[Service]User=kafkaGroup=kafkaLimitNOFILE=102400LimitNPROC=102400ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.propertiesExecStop=/usr/bin/kill $MAINPIDRestart=on-failure[Install]WantedBy=multi-user.targetEOF -
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<'EOF'[Unit]Description=KafkaAfter=zookeeper.serviceRequires=zookeeper.service[Service]User=kafkaGroup=kafkaLimitNOFILE=102400LimitNPROC=102400ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertiesExecStop=/usr/bin/kill $MAINPIDRestart=on-failure[Install]WantedBy=multi-user.targetEOF -
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeepersystemctl enable zookeepersystemctl start kafkasystemctl enable kafka -
服务状态检查
systemctl status zookeepersystemctl status kafka
Kafka 集群验证
-
Kafka Node01 启动生产端
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test01- 启动后,在终端随意输入消息内容
-
Kafka Node02/3 节点上查看消息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01- 执行后如果输出了在生产端输入的消息内容则正常