
Kafka Cluster

Server IP       Host Role
192.168.10.7    Kafka Node01
192.168.10.8    Kafka Node02
192.168.10.9    Kafka Node03

Kafka Node01

  1. Download the jdk installation package

    wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
  2. Extract jdk to the installation directory

    tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
    mv jdk8u292-b10/ /usr/local/openjdk-8
  3. Configure java symlink

    ln -s /usr/local/openjdk-8/bin/java /bin/java
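    • Optionally, confirm the symlink resolves to the new JDK (the command below should report an OpenJDK 1.8.0_292 build)

    java -version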
  4. Download the kafka installation package

    wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz
  5. Extract kafka to the installation directory

    tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/local
    mv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/
  6. Create data directory

    mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
  7. Add zookeeper myid

    echo 1 > /data/kafka/zookeeper/myid
    • Note: the myid value differs for each node (1 on Node01, 2 on Node02, 3 on Node03)
  8. Modify zookeeper configuration file

    cat > /usr/local/kafka/config/zookeeper.properties <<EOF
    admin.enableServer=false
    dataDir=/data/kafka/zookeeper/
    clientPort=2181
    maxClientCnxns=0
    initLimit=10
    syncLimit=5
    server.1=192.168.10.7:2888:3888
    server.2=192.168.10.8:2888:3888
    server.3=192.168.10.9:2888:3888
    EOF
    • server.1 corresponds to the server with myid content 1
    • server.2 corresponds to the server with myid content 2
    • server.3 corresponds to the server with myid content 3
  9. Set the Kafka heap size to 4 GB

    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh
  10. Modify kafka configuration file

    cat > /usr/local/kafka/config/server.properties <<EOF
    broker.id=0
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.10.7:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs/
    num.partitions=10
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    default.replication.factor=3
    acks=all
    min.insync.replicas=2
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760
    EOF
    • Note: broker.id differs for each node (0 on Node01, 1 on Node02, 2 on Node03)
    • During deployment, set advertised.listeners to this node's actual IP; listeners is bound to 0.0.0.0 and does not need to change
    • During deployment, set zookeeper.connect to the actual IPs of the three zookeeper nodes
  11. Create kafka user

    useradd -M -s /sbin/nologin kafka
  12. Grant permissions to kafka-related directories

    chown -R kafka:kafka /usr/local/kafka /data/kafka
  13. Configure systemd to manage zookeeper

    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Zookeeper
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  14. Configure systemd to manage kafka

    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    After=zookeeper.service
    Requires=zookeeper.service
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  15. Start zookeeper and kafka and enable them to start on boot

    systemctl start zookeeper
    systemctl enable zookeeper
    systemctl start kafka
    systemctl enable kafka
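    • Optionally, verify that both services are active. ZooKeeper needs a majority of the ensemble (2 of 3 nodes) to form a quorum, so connection errors in its log are expected until Node02 and Node03 are also started

    systemctl status zookeeper kafka --no-pager
    journalctl -u zookeeper -n 20 --no-pager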

Kafka Node02

  1. Download the jdk installation package

    wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
  2. Extract jdk to the installation directory

    tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
    mv jdk8u292-b10/ /usr/local/openjdk-8
  3. Configure java symlink

    ln -s /usr/local/openjdk-8/bin/java /bin/java
  4. Download the kafka installation package

    wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz
  5. Extract kafka to the installation directory

    tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/local
    mv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/
  6. Create data directory

    mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
  7. Add zookeeper myid

    echo 2 > /data/kafka/zookeeper/myid
    • Note: the myid value differs for each node (1 on Node01, 2 on Node02, 3 on Node03)
  8. Modify zookeeper configuration file

    cat > /usr/local/kafka/config/zookeeper.properties <<EOF
    admin.enableServer=false
    dataDir=/data/kafka/zookeeper/
    clientPort=2181
    maxClientCnxns=0
    initLimit=10
    syncLimit=5
    server.1=192.168.10.7:2888:3888
    server.2=192.168.10.8:2888:3888
    server.3=192.168.10.9:2888:3888
    EOF
    • server.1 corresponds to the server with myid content 1
    • server.2 corresponds to the server with myid content 2
    • server.3 corresponds to the server with myid content 3
  9. Set the Kafka heap size to 4 GB

    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh
  10. Modify kafka configuration file

    cat > /usr/local/kafka/config/server.properties <<EOF
    broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.10.8:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs/
    num.partitions=10
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    default.replication.factor=3
    acks=all
    min.insync.replicas=2
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760
    EOF
    • Note: broker.id differs for each node (0 on Node01, 1 on Node02, 2 on Node03)
    • During deployment, set advertised.listeners to this node's actual IP; listeners is bound to 0.0.0.0 and does not need to change
    • During deployment, set zookeeper.connect to the actual IPs of the three zookeeper nodes
  11. Create kafka user

    useradd -M -s /sbin/nologin kafka
  12. Grant permissions to kafka-related directories

    chown -R kafka:kafka /usr/local/kafka /data/kafka
  13. Configure systemd to manage zookeeper

    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Zookeeper
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  14. Configure systemd to manage kafka

    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    After=zookeeper.service
    Requires=zookeeper.service
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  15. Start zookeeper and kafka and enable them to start on boot

    systemctl start zookeeper
    systemctl enable zookeeper
    systemctl start kafka
    systemctl enable kafka
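    • Once Node01 and Node02 are both running, the ZooKeeper ensemble has a majority and should begin serving requests. Assuming nc is installed (srvr is in ZooKeeper's default four-letter-word whitelist), each node should report Mode: leader or Mode: follower

    echo srvr | nc 127.0.0.1 2181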

Kafka Node03

  1. Download the jdk installation package

    wget https://pdpublic.mingdao.com/private-deployment/offline/common/OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
  2. Extract jdk to the installation directory

    tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u292b10.tar.gz
    mv jdk8u292-b10/ /usr/local/openjdk-8
  3. Configure java symlink

    ln -s /usr/local/openjdk-8/bin/java /bin/java
  4. Download the kafka installation package

    wget https://pdpublic.mingdao.com/private-deployment/offline/common/kafka_2.13-3.9.1.tgz
  5. Extract kafka to the installation directory

    tar -zxvf kafka_2.13-3.9.1.tgz -C /usr/local
    mv /usr/local/kafka_2.13-3.9.1/ /usr/local/kafka/
  6. Create data directory

    mkdir -p /data/kafka/zookeeper/ /data/kafka/kafka-logs/
  7. Add zookeeper myid

    echo 3 > /data/kafka/zookeeper/myid
    • Note: the myid value differs for each node (1 on Node01, 2 on Node02, 3 on Node03)
  8. Modify zookeeper configuration file

    cat > /usr/local/kafka/config/zookeeper.properties <<EOF
    admin.enableServer=false
    dataDir=/data/kafka/zookeeper/
    clientPort=2181
    maxClientCnxns=0
    initLimit=10
    syncLimit=5
    server.1=192.168.10.7:2888:3888
    server.2=192.168.10.8:2888:3888
    server.3=192.168.10.9:2888:3888
    EOF
    • server.1 corresponds to the server with myid content 1
    • server.2 corresponds to the server with myid content 2
    • server.3 corresponds to the server with myid content 3
  9. Set the Kafka heap size to 4 GB

    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xmx4G/1' /usr/local/kafka/bin/kafka-server-start.sh
    sed -i ':a;N;$!ba;s/Xm[xs]1G/Xms4G/1' /usr/local/kafka/bin/kafka-server-start.sh
  10. Modify kafka configuration file

    cat > /usr/local/kafka/config/server.properties <<EOF
    broker.id=2
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://192.168.10.9:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/kafka-logs/
    num.partitions=10
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.10.7:2181,192.168.10.8:2181,192.168.10.9:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    default.replication.factor=3
    acks=all
    min.insync.replicas=2
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760
    EOF
    • Note: broker.id differs for each node (0 on Node01, 1 on Node02, 2 on Node03)
    • During deployment, set advertised.listeners to this node's actual IP; listeners is bound to 0.0.0.0 and does not need to change
    • During deployment, set zookeeper.connect to the actual IPs of the three zookeeper nodes
  11. Create kafka user

    useradd -M -s /sbin/nologin kafka
  12. Grant permissions to kafka-related directories

    chown -R kafka:kafka /usr/local/kafka /data/kafka
  13. Configure systemd to manage zookeeper

    cat > /etc/systemd/system/zookeeper.service <<EOF
    [Unit]
    Description=Zookeeper
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  14. Configure systemd to manage kafka

    cat > /etc/systemd/system/kafka.service <<EOF
    [Unit]
    Description=Kafka
    After=zookeeper.service
    Requires=zookeeper.service
    [Service]
    User=kafka
    Group=kafka
    LimitNOFILE=102400
    LimitNPROC=102400
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/bin/kill \$MAINPID
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    EOF
  15. Start zookeeper and kafka and enable them to start on boot

    systemctl start zookeeper
    systemctl enable zookeeper
    systemctl start kafka
    systemctl enable kafka
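    • Once all three nodes are started, you can optionally confirm that every broker has registered in ZooKeeper; the command below should list the broker ids [0, 1, 2]

    /usr/local/kafka/bin/zookeeper-shell.sh 127.0.0.1:2181 ls /brokers/ids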

Kafka Cluster Verification

  1. Start a console producer on Kafka Node01

    /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test01
    • After it starts, type any message in the terminal; each line is sent as a message to the test01 topic
  2. Consume the messages on Kafka Node02 and Node03

    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test01
    • Run the same command on both Node02 and Node03
    • If the messages entered on the producer side appear in the consumer output, the cluster is working correctly
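  3. (Optional) Describe the test topic to confirm replication

    /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test01
    • The output should show ReplicationFactor: 3 with all three broker ids listed in Replicas and Isr for each partition, confirming data is replicated across the cluster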