
Flink

info
  • The server where flink is deployed must first join the kubernetes cluster hosting the microservices as a worker node.
  • This document adds a taint and a label for flink, so that only the flink services are scheduled onto the flink deployment server.

Load the image

Run this on every node server in the kubernetes cluster

crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
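
Optionally, confirm that the image is now present on each node before moving on:

crictl images | grep mingdaoyun-flink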

Run the following on the first kubernetes master server

  1. Create the directory for the flink configuration file

    mkdir /data/mingdao/script/kubernetes/flink
  2. Create the flink configuration file

    First, change into the flink configuration directory

    cd /data/mingdao/script/kubernetes/flink

    Run vim flink.yaml and write the following configuration into it

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      namespace: default
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        env.java.opts: "-Duser.timezone=Asia/Shanghai"
        task.cancellation.timeout: 0
        kubernetes.cluster-id: md-flink
        kubernetes.namespace: default
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: s3://mdoc/recovery
        #restart-strategy: none
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: 3
        restart-strategy.fixed-delay.delay: 10s
        state.backend: rocksdb
        state.checkpoints.dir: s3://mdoc/checkpoints # "s3://<your-bucket>/<endpoint>"
        state.savepoints.dir: s3://mdoc/savepoints
        state.backend.incremental: true
        s3.access-key: mingdao # object storage access key
        s3.secret-key: 123456789 # object storage secret key
        s3.ssl.enabled: false # change to true if you use a cloud vendor's object storage with an https endpoint
        s3.path.style.access: true # change to false if the object storage is not MinIO
        s3.endpoint: 192.168.10.16:9011 # object storage address
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 100
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 3072m # memory limit per jobmanager instance
        taskmanager.memory.process.size: 12288m # memory limit per taskmanager instance
        taskmanager.memory.jvm-overhead.fraction: 0.05
        taskmanager.memory.managed.fraction: 0.05
        taskmanager.memory.network.fraction: 0.05
        parallelism.default: 2
        metrics.job.status.enable: "STATE"
        metrics.reporters: kafka_reporter,kafka_reporter_running
        metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
        metrics.reporter.kafka_reporter.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # kafka address
        metrics.reporter.kafka_reporter.chunkSize: 20000
        metrics.reporter.kafka_reporter.interval: 60 SECONDS
        metrics.reporter.kafka_reporter.filter: numRecordsIn,numRecordsOut,uptime
        metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
        metrics.reporter.kafka_reporter_running.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # kafka address
        metrics.reporter.kafka_reporter_running.chunkSize: 20000
        metrics.reporter.kafka_reporter_running.interval: 60 SECONDS
        metrics.reporter.kafka_reporter_running.filter: runningState
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender

        # Uncomment the following lines if you want to change only Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO

        # The following lines keep the log level of common libraries/connectors at INFO.
        # The root logger configuration does not override this.
        # You have to change the log levels here manually.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO

        # Log all INFO-level messages to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

        # Log all INFO-level messages to the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size = 100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10

        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

    ---

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-service-account
      namespace: default
    ---
    kind: Role
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: configmap-access
      namespace: default
    rules:
    - apiGroups: [""]
      resources: ["configmaps"]
      verbs: ["update", "get", "watch", "list", "create", "edit", "delete"]
    ---
    kind: RoleBinding
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: configmap-access-binding
      namespace: default
    subjects:
    - kind: ServiceAccount
      name: flink-service-account
      namespace: default
    roleRef:
      kind: Role
      name: configmap-access
      apiGroup: rbac.authorization.k8s.io

    ---

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
      namespace: default
    spec:
      replicas: 1 # only one replica is supported
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
          annotations:
            md-update: "20210615101801239"
        spec:
          # Ensure the pod runs only on nodes labeled 'hap: flink', even though those nodes are tainted 'NoSchedule'
          tolerations:
          - key: "hap"
            operator: "Equal"
            value: "flink"
            effect: "NoSchedule"
          nodeSelector:
            hap: flink
          containers:
          - name: jobmanager
            image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
            env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
            args: ["jobmanager", "$(POD_IP)"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            readinessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 3
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 60
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: localtime
              mountPath: /etc/localtime
          securityContext:
            runAsUser: 9999 # refers to the _flink_ user in the official flink image; change if necessary
          serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
          - name: localtime
            hostPath:
              path: /usr/share/zoneinfo/Etc/GMT-8
              type: ""

    ---

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
      namespace: default
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
          annotations:
            md-update: "20210615101801240"
        spec:
          # Ensure the pod runs only on nodes labeled 'hap: flink', even though those nodes are tainted 'NoSchedule'
          tolerations:
          - key: "hap"
            operator: "Equal"
            value: "flink"
            effect: "NoSchedule"
          nodeSelector:
            hap: flink
          containers:
          - name: taskmanager
            image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 60
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: localtime
              mountPath: /etc/localtime
          securityContext:
            runAsUser: 9999 # refers to the _flink_ user in the official flink image; change if necessary
          serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
          - name: localtime
            hostPath:
              path: /usr/share/zoneinfo/Etc/GMT-8
              type: ""

    ---

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
      namespace: default
    spec:
      ports:
      - name: rpc
        port: 6123
        targetPort: 6123
        nodePort: 6123
      - name: blob-server
        port: 6124
        targetPort: 6124
        nodePort: 6124
      - name: webui
        port: 8081
        targetPort: 8081
        nodePort: 28081
      sessionAffinity: ClientIP
      type: NodePort
      selector:
        app: flink
        component: jobmanager
  • In an actual deployment, be sure to adjust the values in the ConfigMap's flink-conf.yaml section; a substitution sketch follows below
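
    The s3.* values and the Kafka bootstrapServers above are environment-specific examples. A minimal sketch of swapping in your own values with sed (every replacement value below is a placeholder, not a real endpoint):

    cd /data/mingdao/script/kubernetes/flink
    # placeholders below; substitute your object storage address, access/secret keys, and kafka brokers
    sed -i 's#s3.endpoint: 192.168.10.16:9011#s3.endpoint: <your-endpoint>#' flink.yaml
    sed -i 's#s3.access-key: mingdao#s3.access-key: <your-ak>#' flink.yaml
    sed -i 's#s3.secret-key: 123456789#s3.secret-key: <your-sk>#' flink.yaml
    sed -i 's#192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092#<your-kafka-brokers>#g' flink.yaml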

Configure taints and tolerations

Dedicate specific servers to the flink services so that they do not share resources with the microservices

  1. Taint and label the designated k8s node

    kubectl taint nodes $flink_node_name hap=flink:NoSchedule
    kubectl label nodes $flink_node_name hap=flink
    • Replace $flink_node_name with the actual node name, which can be found via kubectl get node; a verification sketch follows after these steps
  2. Create the namespace

    kubectl create ns flink
  3. Configure the flink services to use the flink namespace, then apply the manifest

    sed -i 's/namespace: default/namespace: flink/g' flink.yaml
    kubectl apply -f flink.yaml
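
Once applied, you can confirm the scheduling setup and watch the pods come up (standard kubectl checks; the names match the steps above):

kubectl describe node $flink_node_name | grep Taints
kubectl get nodes -l hap=flink
kubectl get pods -n flink -o wide

When the flink-jobmanager pod is Running, the web UI should also be reachable on NodePort 28081 of the flink node.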

Add the following variable to config.yaml:

ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"
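
flink-jobmanager.flink is the service's in-cluster DNS name (<service>.<namespace>). Before restarting, you can optionally check that the JobManager REST endpoint responds from inside the cluster; this sketch uses a throwaway curl pod, where the curlimages/curl image is just one convenient choice:

kubectl run flink-check --rm -it --restart=Never --image=curlimages/curl -- curl -s http://flink-jobmanager.flink:8081/overview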

Restart the microservices for the change to take effect

bash restart.sh