Kafka 集群
服务器IP | 主机角色 |
---|---|
192.168.10.7 | Kafka Node01 |
192.168.10.8 | Kafka Node02 |
192.168.10.9 | Kafka Node03 |
Kafka Node01
-
下载 jdk 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
# jdk 安装包文件下载链接,下载完成后上传到部署服务器
http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz -
解压 jdk 到安装目录
tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
mv jdk8u292-b10/ /usr/local/openjdk-8 -
配置 java 软链接
ln -s /usr/local/openjdk-8/bin/java /bin/java
-
下载 kafka 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz
# jdk 安装包文件下载链接,下载完成后上传到部署服务器
http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz -
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.6.2.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka/ -
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
-
添加 zookeeper myid
echo 1 > /data/kafka/zookeeper/myid
- 注意,各节点的 myid 不同
-
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOF
admin.enableServer=false
dataDir=/data/kafka/zookeeper/
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=192.168.10.7:2888:3888
server.2=192.168.10.8:2888:3888
server.3=192.168.10.9:2888:3888
EOF- server.1 对应 myid 文件内容为1的服务器
- server.2 对应 myid 文件内容为2的服务器
- server.3 对应 myid 文件内容为3的服务器
-
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh -
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.10.7:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs/
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
default.replication.factor=3
acks=all
min.insync.replicas=2
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
EOF- 注意,broker.id 各节点值不同
- listeners 值在部署时修改为实际的本机IP
- zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
-
创建 kafka 用户
useradd -M -s /sbin/nologin kafka
-
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka
-
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Zookeeper
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF -
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
After=zookeeper.service
Requires=zookeeper.service
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF -
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka
Kafka Node02
-
下载 jdk 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
# jdk 安装包文件下载链接,下载完成后上传到部署服务器
http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz -
解压 jdk 到安装目录
tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
mv jdk8u292-b10/ /usr/local/openjdk-8 -
配置 java 软链接
ln -s /usr/local/openjdk-8/bin/java /bin/java
-
下载 kafka 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz
# jdk 安装包文件下载链接,下载完成后上传到部署服务器
http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz -
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.6.2.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka/ -
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
-
添加 zookeeper myid
echo 2 > /data/kafka/zookeeper/myid
- 注意,各节点的 myid 不同
-
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOF
admin.enableServer=false
dataDir=/data/kafka/zookeeper/
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=192.168.10.7:2888:3888
server.2=192.168.10.8:2888:3888
server.3=192.168.10.9:2888:3888
EOF- server.1 对应 myid 文件内容为1的服务器
- server.2 对应 myid 文件内容为2的服务器
- server.3 对应 myid 文件内容为3的服务器
-
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh -
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.10.8:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs/
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
default.replication.factor=3
acks=all
min.insync.replicas=2
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
EOF- 注意,broker.id 各节点值不同
- listeners 值在部署时修改为实际的本机IP
- zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
-
创建 kafka 用户
useradd -M -s /sbin/nologin kafka
-
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka
-
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Zookeeper
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF -
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
After=zookeeper.service
Requires=zookeeper.service
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF -
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka
Kafka Node03
-
下载 jdk 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
# jdk 安装包文件下载链接,下载完成后上传到部署服务器
http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz -
解压 jdk 到安装目录
tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
mv jdk8u292-b10/ /usr/local/openjdk-8 -
配置 java 软链接
ln -s /usr/local/openjdk-8/bin/java /bin/java
-
下载 kafka 安装包
- 服务器支持访问互联网
- 服务器不支持访问互联网
wget http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz
# jdk 安装包文件下载链接,下载完成后上传到部署服务器
http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz -
解压 kafka 到安装目录
tar -zxvf kafka_2.13-3.6.2.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka/ -
创建数据目录
mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
-
添加 zookeeper myid
echo 3 > /data/kafka/zookeeper/myid
- 注意,各节点的 myid 不同
-
修改 zookeeper 配置文件
cat > /usr/local/kafka/config/zookeeper.properties <<EOF
admin.enableServer=false
dataDir=/data/kafka/zookeeper/
clientPort=2181
maxClientCnxns=0
initLimit=10
syncLimit=5
server.1=192.168.10.7:2888:3888
server.2=192.168.10.8:2888:3888
server.3=192.168.10.9:2888:3888
EOF- server.1 对应 myid 文件内容为1的服务器
- server.2 对应 myid 文件内容为2的服务器
- server.3 对应 myid 文件内容为3的服务器
-
修改 kafka 内存限制为4g
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh -
修改 kafka 配置文件
cat > /usr/local/kafka/config/server.properties <<EOF
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.10.9:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs/
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
default.replication.factor=3
acks=all
min.insync.replicas=2
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
EOF- 注意,broker.id 各节点值不同
- listeners 值在部署时修改为实际的本机IP
- zookeeper.connect 值部署时修改为实际的 zookeeper 三台节点IP
-
创建 kafka 用户
useradd -M -s /sbin/nologin kafka
-
授权 kafka 相关目录权限
chown -R kafka:kafka /usr/local/kafka /data/kafka
-
配置 systemd 管理 zookeeper
cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Zookeeper
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF -
配置 systemd 管理 kafka
cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Kafka
After=zookeeper.service
Requires=zookeeper.service
[Service]
User=kafka
Group=kafka
LimitNOFILE=102400
LimitNPROC=102400
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/bin/kill \$MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF -
启动 zookeeper 与 kafka 并加入开机自启动
systemctl start zookeeper
systemctl enable zookeeper
systemctl start kafka
systemctl enable kafka
Kafka 集群验证
-
Kafka Node01 启动生产端
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test01
- 启动后,在终端随意输入消息内容
-
Kafka Node02/3 节点上查看消息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01
- 执行后如果输出了在生产端输入的消息内容则正常