
Flink

info
  • The server used to deploy Flink must first join the Kubernetes cluster where the microservices reside as a worker node.
  • In this document, a taint and a label are added to the Flink node to ensure that only the Flink service is scheduled onto the dedicated Flink deployment server.

Loading Images

The following operation needs to be performed on each node server in the Kubernetes cluster

crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
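
To confirm that the image was pulled successfully on a node, you can list the locally cached images (an optional check, not part of the original procedure):

crictl images | grep mingdaoyun-flink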

Operations to be performed on the first Kubernetes master server

  1. Create a directory for Flink configuration files

    mkdir /data/mingdao/script/kubernetes/flink
  2. Create the Flink configuration file

    First, navigate to the directory for storing Flink configuration files

    cd /data/mingdao/script/kubernetes/flink

    Execute vim flink.yaml and enter the following configuration

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      namespace: default
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        env.java.opts: "-Duser.timezone=Asia/Shanghai"
        task.cancellation.timeout: 0
        kubernetes.cluster-id: md-flink
        kubernetes.namespace: default
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: s3://mdoc/recovery
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: 3
        restart-strategy.fixed-delay.delay: 10s
        state.backend: rocksdb
        state.checkpoints.dir: s3://mdoc/checkpoints # "s3://<your-bucket>/<endpoint>"
        state.savepoints.dir: s3://mdoc/savepoints
        state.backend.incremental: true
        s3.access-key: mingdao # Object storage access key
        s3.secret-key: 123456789 # Object storage secret key
        s3.ssl.enabled: false # Change to true if using cloud object storage with an HTTPS address
        s3.path.style.access: true # Change to false if not using MinIO object storage
        s3.endpoint: 192.168.10.16:9011 # Object storage address
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 100
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 3072m # Memory limit for a single jobmanager instance
        taskmanager.memory.process.size: 12288m # Memory limit for a single taskmanager instance
        taskmanager.memory.jvm-overhead.fraction: 0.1
        taskmanager.memory.managed.fraction: 0.2
        taskmanager.memory.network.fraction: 0.1
        parallelism.default: 2
        metrics.job.status.enable: "STATE"
        metrics.reporters: kafka_reporter,kafka_reporter_running
        metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
        metrics.reporter.kafka_reporter.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
        metrics.reporter.kafka_reporter.chunkSize: 20000
        metrics.reporter.kafka_reporter.interval: 60 SECONDS
        metrics.reporter.kafka_reporter.filter: numRecordsIn,numRecordsOut,uptime
        metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
        metrics.reporter.kafka_reporter_running.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
        metrics.reporter.kafka_reporter_running.chunkSize: 20000
        metrics.reporter.kafka_reporter_running.interval: 60 SECONDS
        metrics.reporter.kafka_reporter_running.filter: runningState
      log4j-console.properties: |+
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender

        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO

        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size = 100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10

        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

    ---

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-service-account
      namespace: default
    ---
    kind: Role
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: configmap-access
      namespace: default
    rules:
      - apiGroups: [""]
        resources: ["configmaps"]
        verbs: ["update", "get", "watch", "list", "create", "edit", "delete"]
    ---
    kind: RoleBinding
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: configmap-access-binding
      namespace: default
    subjects:
      - kind: ServiceAccount
        name: flink-service-account
        namespace: default
    roleRef:
      kind: Role
      name: configmap-access
      apiGroup: rbac.authorization.k8s.io

    ---

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
      namespace: default
    spec:
      replicas: 1 # Only 1 replica is supported
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
          annotations:
            md-update: "20210615101801239"
        spec:
          tolerations:
            - key: "hap"
              operator: "Equal"
              value: "flink"
              effect: "NoSchedule"
          nodeSelector:
            hap: flink
          containers:
            - name: jobmanager
              image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
              env:
                - name: POD_IP
                  valueFrom:
                    fieldRef:
                      apiVersion: v1
                      fieldPath: status.podIP
              args: ["jobmanager", "$(POD_IP)"]
              ports:
                - containerPort: 6123
                  name: rpc
                - containerPort: 6124
                  name: blob-server
                - containerPort: 8081
                  name: webui
              readinessProbe:
                tcpSocket:
                  port: 6123
                initialDelaySeconds: 30
                periodSeconds: 3
              livenessProbe:
                tcpSocket:
                  port: 6123
                initialDelaySeconds: 60
                periodSeconds: 60
              volumeMounts:
                - name: flink-config-volume
                  mountPath: /opt/flink/conf
                - name: localtime
                  mountPath: /etc/localtime
              securityContext:
                runAsUser: 9999 # Refers to the _flink_ user in the official Flink image; can be modified if necessary
          serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, and delete ConfigMaps
          volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                  - key: flink-conf.yaml
                    path: flink-conf.yaml
                  - key: log4j-console.properties
                    path: log4j-console.properties
            - name: localtime
              hostPath:
                path: /usr/share/zoneinfo/Etc/GMT-8
                type: ""

    ---

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
      namespace: default
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
          annotations:
            md-update: "20210615101801240"
        spec:
          tolerations:
            - key: "hap"
              operator: "Equal"
              value: "flink"
              effect: "NoSchedule"
          nodeSelector:
            hap: flink
          containers:
            - name: taskmanager
              image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
              args: ["taskmanager"]
              ports:
                - containerPort: 6122
                  name: rpc
                - containerPort: 6125
                  name: query-state
              livenessProbe:
                tcpSocket:
                  port: 6122
                initialDelaySeconds: 60
                periodSeconds: 60
              volumeMounts:
                - name: flink-config-volume
                  mountPath: /opt/flink/conf/
                - name: localtime
                  mountPath: /etc/localtime
              securityContext:
                runAsUser: 9999 # Refers to the _flink_ user in the official Flink image; can be modified if necessary
          serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, and delete ConfigMaps
          volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                  - key: flink-conf.yaml
                    path: flink-conf.yaml
                  - key: log4j-console.properties
                    path: log4j-console.properties
            - name: localtime
              hostPath:
                path: /usr/share/zoneinfo/Etc/GMT-8
                type: ""

    ---

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
      namespace: default
    spec:
      ports:
        - name: rpc
          port: 6123
          targetPort: 6123
          nodePort: 6123
        - name: blob-server
          port: 6124
          targetPort: 6124
          nodePort: 6124
        - name: webui
          port: 8081
          targetPort: 8081
          nodePort: 28081
      sessionAffinity: ClientIP
      type: NodePort
      selector:
        app: flink
        component: jobmanager
    • During actual deployment, be sure to modify the values in the flink-conf.yaml section of the ConfigMap (object storage, Kafka addresses, memory sizes, and so on) to match your environment
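
    Optionally, you can sanity-check the manifest before deploying with a client-side dry run; this only validates the manifest structure, not the values you filled in:

    kubectl apply --dry-run=client -f flink.yaml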

Configure Taint Tolerance

Dedicate specific servers to the Flink service so that it does not share resources with the microservices

  1. Apply a taint and a label to the designated Kubernetes node(s)

    kubectl taint nodes $flink_node_name hap=flink:NoSchedule
    kubectl label nodes $flink_node_name hap=flink
    • Replace $flink_node_name with the actual node name, which can be viewed using kubectl get node
  2. Create a namespace

    kubectl create ns flink
  3. Configure the Flink service to use the flink namespace, then deploy it (verification commands follow this list)

    sed -i 's/namespace: default/namespace: flink/g' flink.yaml
    kubectl apply -f flink.yaml
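
To verify the scheduling setup, you can confirm that the taint and label are present on the node and that the Flink pods start in the flink namespace on the dedicated server (an optional check; replace $flink_node_name as above):

kubectl describe node $flink_node_name | grep Taints
kubectl get node $flink_node_name --show-labels
kubectl -n flink get pods -o wide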

Add the following variable to config.yaml:

ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"
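
To check that this address is reachable from inside the cluster, you can query the Flink JobManager REST API from a temporary pod; this is an optional check and assumes the cluster can pull the public curlimages/curl image:

kubectl run flink-curl --rm -it --restart=Never --image=curlimages/curl -- curl -s http://flink-jobmanager.flink:8081/overview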

Restart the microservices for the change to take effect

bash restart.sh
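
After the restart, the Flink web UI should also be reachable through the NodePort defined in the Service above; <k8s-node-ip> is a placeholder for the IP of any Kubernetes node:

curl http://<k8s-node-ip>:28081/overview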