Kafka Cluster

Server IP       Host Role
192.168.10.7    Kafka Node01
192.168.10.8    Kafka Node02
192.168.10.9    Kafka Node03

Kafka Node01

  1. Download the JDK package

    wget http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
  2. Extract the JDK to the installation directory

    tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
    mv jdk8u292-b10/ /usr/local/openjdk-8
  3. Create a java symlink

    ln -s /usr/local/openjdk-8/bin/java /bin/java
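    • Optional check (not part of the original steps): the symlinked binary should report an OpenJDK 1.8 build (1.8.0_292 for this package)

      java -version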
  4. Download the Kafka package

    wget http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz
  5. Extract Kafka to the installation directory

    tar -zxvf kafka_2.13-3.6.2.tgz -C /usr/local
    mv /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka/
  6. Create the data directories

    mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
  7. Write the ZooKeeper myid file

    echo 1 > /data/kafka/zookeeper/myid
    • Note: the myid value is different on each node
  8. Edit the ZooKeeper configuration file

    cat > /usr/local/kafka/config/zookeeper.properties <<EOF
    admin.enableServer=false
    dataDir=/data/kafka/zookeeper/
    clientPort=2181
    maxClientCnxns=0
    initLimit=10
    syncLimit=5
    server.1=192.168.10.7:2888:3888
    server.2=192.168.10.8:2888:3888
    server.3=192.168.10.9:2888:3888
    EOF
    • server.1 corresponds to the server whose myid file contains 1
    • server.2 corresponds to the server whose myid file contains 2
    • server.3 corresponds to the server whose myid file contains 3
  9. Set the Kafka heap memory limit to 4G

    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh
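    • The two sed commands rewrite the default export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" line in kafka-server-start.sh. A quick check (a sketch, not part of the original steps) that the result now contains -Xmx4G -Xms4G:

      grep KAFKA_HEAP_OPTS /usr/local/kafka/bin/kafka-server-start.sh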
  10. Edit the Kafka configuration file

    cat > /usr/local/kafka/config/server.properties <<EOF
    broker.id=0
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.10.7:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs/
    num.partitions=10
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    default.replication.factor=3
    acks=all
    min.insync.replicas=2
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760
    EOF
    • Note: the broker.id value is different on each node
    • At deployment time, set advertised.listeners to this node's actual IP (listeners can stay bound to 0.0.0.0)
    • At deployment time, set zookeeper.connect to the actual IPs of the three ZooKeeper nodes
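    • Optional check before starting the broker (a sketch, not part of the original steps): confirm the per-node values were written as intended

      grep -E '^(broker\.id|advertised\.listeners|zookeeper\.connect)=' /usr/local/kafka/config/server.properties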
  11. Create the kafka user

    useradd -M -s /sbin/nologin kafka
  12. Grant the kafka user ownership of the related directories

    chown -R kafka:kafka /usr/local/kafka /data/kafka
  13. Configure systemd to manage ZooKeeper

    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Zookeeper
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  14. Configure systemd to manage Kafka

    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    After=zookeeper.service
    Requires=zookeeper.service
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
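    • After creating the two unit files, reload systemd so it picks them up before the services are started in the next step:

      systemctl daemon-reload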
  15. Start ZooKeeper and Kafka and enable them at boot

    systemctl start zookeeper
    systemctl enable zookeeper
    systemctl start kafka
    systemctl enable kafka
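    • Optional verification sketch (assumes nc is installed; the srvr four-letter command is whitelisted by default in the bundled ZooKeeper): check that both services are active and, once all three nodes are up, that the ensemble has elected a leader

      systemctl status zookeeper kafka --no-pager
      echo srvr | nc 127.0.0.1 2181 | grep Mode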

Kafka Node02

  1. Download the JDK package

    wget http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
  2. Extract the JDK to the installation directory

    tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
    mv jdk8u292-b10/ /usr/local/openjdk-8
  3. Create a java symlink

    ln -s /usr/local/openjdk-8/bin/java /bin/java
  4. Download the Kafka package

    wget http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz
  5. Extract Kafka to the installation directory

    tar -zxvf kafka_2.13-3.6.2.tgz -C /usr/local
    mv /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka/
  6. Create the data directories

    mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
  7. Write the ZooKeeper myid file

    echo 2 > /data/kafka/zookeeper/myid
    • Note: the myid value is different on each node
  8. Edit the ZooKeeper configuration file

    cat > /usr/local/kafka/config/zookeeper.properties <<EOF
    admin.enableServer=false
    dataDir=/data/kafka/zookeeper/
    clientPort=2181
    maxClientCnxns=0
    initLimit=10
    syncLimit=5
    server.1=192.168.10.7:2888:3888
    server.2=192.168.10.8:2888:3888
    server.3=192.168.10.9:2888:3888
    EOF
    • server.1 corresponds to the server whose myid file contains 1
    • server.2 corresponds to the server whose myid file contains 2
    • server.3 corresponds to the server whose myid file contains 3
  9. Set the Kafka heap memory limit to 4G

    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh
  10. Edit the Kafka configuration file

    cat > /usr/local/kafka/config/server.properties <<EOF
    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.10.8:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs/
    num.partitions=10
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    default.replication.factor=3
    acks=all
    min.insync.replicas=2
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760
    EOF
    • Note: the broker.id value is different on each node
    • At deployment time, set advertised.listeners to this node's actual IP (listeners can stay bound to 0.0.0.0)
    • At deployment time, set zookeeper.connect to the actual IPs of the three ZooKeeper nodes
  11. Create the kafka user

    useradd -M -s /sbin/nologin kafka
  12. Grant the kafka user ownership of the related directories

    chown -R kafka:kafka /usr/local/kafka /data/kafka
  13. Configure systemd to manage ZooKeeper

    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Zookeeper
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  14. Configure systemd to manage Kafka

    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    After=zookeeper.service
    Requires=zookeeper.service
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  15. Start ZooKeeper and Kafka and enable them at boot

    systemctl start zookeeper
    systemctl enable zookeeper
    systemctl start kafka
    systemctl enable kafka

Kafka Node03

  1. Download the JDK package

    wget http://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
  2. Extract the JDK to the installation directory

    tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
    mv jdk8u292-b10/ /usr/local/openjdk-8
  3. Create a java symlink

    ln -s /usr/local/openjdk-8/bin/java /bin/java
  4. Download the Kafka package

    wget http://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.6.2.tgz
  5. Extract Kafka to the installation directory

    tar -zxvf kafka_2.13-3.6.2.tgz -C /usr/local
    mv /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka/
  6. Create the data directories

    mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
  7. Write the ZooKeeper myid file

    echo 3 > /data/kafka/zookeeper/myid
    • Note: the myid value is different on each node
  8. Edit the ZooKeeper configuration file

    cat > /usr/local/kafka/config/zookeeper.properties <<EOF
    admin.enableServer=false
    dataDir=/data/kafka/zookeeper/
    clientPort=2181
    maxClientCnxns=0
    initLimit=10
    syncLimit=5
    server.1=192.168.10.7:2888:3888
    server.2=192.168.10.8:2888:3888
    server.3=192.168.10.9:2888:3888
    EOF
    • server.1 corresponds to the server whose myid file contains 1
    • server.2 corresponds to the server whose myid file contains 2
    • server.3 corresponds to the server whose myid file contains 3
  9. Set the Kafka heap memory limit to 4G

    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh
  10. Edit the Kafka configuration file

    cat > /usr/local/kafka/config/server.properties <<EOF
    broker.id=2
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.10.9:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs/
    num.partitions=10
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    default.replication.factor=3
    acks=all
    min.insync.replicas=2
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760
    EOF
    • Note: the broker.id value is different on each node
    • At deployment time, set advertised.listeners to this node's actual IP (listeners can stay bound to 0.0.0.0)
    • At deployment time, set zookeeper.connect to the actual IPs of the three ZooKeeper nodes
  11. Create the kafka user

    useradd -M -s /sbin/nologin kafka
  12. Grant the kafka user ownership of the related directories

    chown -R kafka:kafka /usr/local/kafka /data/kafka
  13. Configure systemd to manage ZooKeeper

    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Zookeeper
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  14. Configure systemd to manage Kafka

    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    After=zookeeper.service
    Requires=zookeeper.service
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  15. Start ZooKeeper and Kafka and enable them at boot

    systemctl start zookeeper
    systemctl enable zookeeper
    systemctl start kafka
    systemctl enable kafka

Kafka Cluster Verification

  1. Start a console producer on Kafka Node01

    /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test01
    • After it starts, type a few test messages in the terminal
  2. Consume the messages on Kafka Node02 and Node03 (run the command on each node)

    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01
    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01
    • If the messages typed on the producer side are printed, the cluster is working correctly
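    • Optional follow-up (a sketch using the bundled kafka-topics.sh tool): describing the test topic should show 10 partitions, each replicated across the 3 brokers, given the configuration above

      /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test01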