Flink
Notes
- HAP's aggregation tables, data integration features, and HDP-related capabilities all depend on the Flink component; deploy the Flink service as needed to enable these features.
- The server that will host Flink must first join the Kubernetes cluster where the microservices run, as a worker node.
- This document applies a taint and a label to the Flink node, so that only the Flink service is scheduled onto the server that hosts Flink.
Load the image
This must be done on every node server in the Kubernetes cluster.

- If the server has internet access:

```shell
crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.720
```

- If the server has no internet access:

```shell
# Download link for the offline Flink image; upload the file to the deployment server after downloading
https://pdpublic.mingdao.com/private-deployment/offline/mingdaoyun-flink-linux-amd64-1.19.720.tar.gz
```

Load the offline image on the server:

```shell
gunzip -d mingdaoyun-flink-linux-amd64-1.19.720.tar.gz
ctr -n k8s.io image import mingdaoyun-flink-linux-amd64-1.19.720.tar
```
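The offline archive is a gzip-compressed tar: `gunzip -d` strips the `.gz` suffix in place, leaving the `.tar` that `ctr` imports. A minimal sketch of that unpacking step, using a stand-in archive (the file name and contents here are illustrative, not the real image):

```shell
# Stand-in for the downloaded archive (illustrative only)
rm -rf /tmp/flink-image-demo
mkdir -p /tmp/flink-image-demo
echo "demo layer" > /tmp/flink-image-demo/layer.txt
tar -czf /tmp/flink-image-demo/mingdaoyun-flink-demo.tar.gz -C /tmp/flink-image-demo layer.txt

# gunzip -d removes the .gz suffix in place, leaving a .tar of the same base name
gunzip -d /tmp/flink-image-demo/mingdaoyun-flink-demo.tar.gz
ls -l /tmp/flink-image-demo/mingdaoyun-flink-demo.tar
```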
Deploy Flink
This must be done on the first Kubernetes master server.
- Create the directory for the Flink configuration file:

```shell
mkdir /data/mingdao/script/kubernetes/flink
```

- Create the Flink configuration file.

Enter the configuration directory first:

```shell
cd /data/mingdao/script/kubernetes/flink
```

Then run:

```shell
vim flink.yaml
```

and write in the following configuration:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: default
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    state.checkpoints.num-retained: 5
    cluster.evenly-spread-out-slots: true
    env.java.opts: "-Duser.timezone=Asia/Shanghai"
    task.cancellation.timeout: 0
    kubernetes.cluster-id: md-flink
    kubernetes.namespace: default
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://mdoc/recovery
    #restart-strategy: none
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10s
    state.backend: rocksdb
    state.checkpoints.dir: s3://mdoc/checkpoints # "s3://<your-bucket>/<endpoint>"
    state.savepoints.dir: s3://mdoc/savepoints
    state.backend.incremental: true
    s3.access-key: mingdao # object storage access key
    s3.secret-key: 123456789 # object storage secret key
    s3.ssl.enabled: false # if using a cloud provider's object storage with an https endpoint, change to true
    s3.path.style.access: true # if using non-MinIO object storage, change to false
    s3.endpoint: 192.168.10.16:9011 # object storage address
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 100
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3072m # memory limit for a single jobmanager instance
    taskmanager.memory.process.size: 12288m # memory limit for a single taskmanager instance
    taskmanager.memory.jvm-overhead.fraction: 0.1
    taskmanager.memory.managed.fraction: 0.2
    taskmanager.memory.network.fraction: 0.1
    parallelism.default: 2
    metrics.job.status.enable: STATE
    metrics.reporters: kafka_reporter,kafka_reporter_running,kafka_reporter2,kafka_reporter_running2
    metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # kafka address
    metrics.reporter.kafka_reporter.chunk.size: 20000
    metrics.reporter.kafka_reporter.interval: 60s
    metrics.reporter.kafka_reporter.filter.metrics: numRecordsIn,numRecordsOut,runningTime
    metrics.reporter.kafka_reporter.topic: flink_metrics_counter
    metrics.reporter.kafka_reporter.taskNamePrefix: HAP0x5c2_
    metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # kafka address
    metrics.reporter.kafka_reporter_running.chunk.size: 20000
    metrics.reporter.kafka_reporter_running.interval: 60s
    metrics.reporter.kafka_reporter_running.filter.metrics: RUNNINGState
    metrics.reporter.kafka_reporter_running.topic: flink_metrics_gauge
    metrics.reporter.kafka_reporter_running.taskNamePrefix: HAP0x5c2_
    metrics.reporter.kafka_reporter2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # kafka address
    metrics.reporter.kafka_reporter2.chunk.size: 20000
    metrics.reporter.kafka_reporter2.interval: 60s
    metrics.reporter.kafka_reporter2.filter.metrics: numRecordsIn,numRecordsOut,runningTime
    metrics.reporter.kafka_reporter2.topic: flink_metrics_counter-hdp
    metrics.reporter.kafka_reporter2.taskNamePrefix: HDP0x5c2_
    metrics.reporter.kafka_reporter_running2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # kafka address
    metrics.reporter.kafka_reporter_running2.chunk.size: 20000
    metrics.reporter.kafka_reporter_running2.interval: 60s
    metrics.reporter.kafka_reporter_running2.filter.metrics: RUNNINGState
    metrics.reporter.kafka_reporter_running2.topic: flink_metrics_gauge-hdp
    metrics.reporter.kafka_reporter_running2.taskNamePrefix: HDP0x5c2_
  log4j-console.properties: |+
    # The following settings affect the logging behavior of both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment the following lines if you only want to change Flink's own logging behavior
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors at INFO.
    # The root logger configuration does not override this.
    # You have to change the log level here manually.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access
  namespace: default
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: default
roleRef:
  kind: Role
  name: configmap-access
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  replicas: 1 # only 1 replica is supported
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        md-update: "20210615101801239"
    spec:
      # Ensure the Pod only runs on nodes labeled 'hap: flink', even when those nodes are tainted 'NoSchedule'
      tolerations:
      - key: "hap"
        operator: "Equal"
        value: "flink"
        effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
      - name: jobmanager
        image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.720
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        readinessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 3
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 60
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: localtime
          mountPath: /etc/localtime
      securityContext:
        runAsUser: 9999 # refers to the _flink_ user in the official flink image; change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: localtime
        hostPath:
          path: /usr/share/zoneinfo/Etc/GMT-8
          type: ""
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
      annotations:
        md-update: "20210615101801240"
    spec:
      # Ensure the Pod only runs on nodes labeled 'hap: flink', even when those nodes are tainted 'NoSchedule'
      tolerations:
      - key: "hap"
        operator: "Equal"
        value: "flink"
        effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
      - name: taskmanager
        image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.720
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 60
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: localtime
          mountPath: /etc/localtime
      securityContext:
        runAsUser: 9999 # refers to the _flink_ user in the official flink image; change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: localtime
        hostPath:
          path: /usr/share/zoneinfo/Etc/GMT-8
          type: ""
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  ports:
  - name: rpc
    port: 6123
    targetPort: 6123
    nodePort: 6123
  - name: blob-server
    port: 6124
    targetPort: 6124
    nodePort: 6124
  - name: webui
    port: 8081
    targetPort: 8081
    nodePort: 28081
  sessionAffinity: ClientIP
  type: NodePort
  selector:
    app: flink
    component: jobmanager
```

- In an actual deployment, be sure to modify the environment-specific values in the ConfigMap's flink-conf.yaml.
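Before applying the manifest, it helps to re-check the settings that are specific to your environment (object storage keys and endpoint, Kafka brokers). A small pre-flight grep sketch; the sample file below only stands in for your real flink.yaml, and its values are illustrative:

```shell
# Sample fragment standing in for your real flink.yaml (illustrative values)
cat > /tmp/flink-sample.yaml << 'EOF'
    s3.access-key: mingdao
    s3.secret-key: 123456789
    s3.endpoint: 192.168.10.16:9011
    metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092
EOF

# Every line printed here must match your actual object storage and Kafka endpoints
grep -nE 's3\.(access-key|secret-key|endpoint)|bootstrap\.servers' /tmp/flink-sample.yaml
```

Run the same grep against the real flink.yaml and verify each printed line by hand.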
Configure taints and tolerations
This dedicates a server to the Flink service so that it does not share resources with the microservices.
- Taint and label the designated k8s node:

```shell
kubectl taint nodes $flink_node_name hap=flink:NoSchedule
kubectl label nodes $flink_node_name hap=flink
```

Replace `$flink_node_name` with the actual node name, which can be found with `kubectl get node`.
- Create the namespace:

```shell
kubectl create ns flink
```

- Configure the Flink service to use the flink namespace:

```shell
sed -i 's/namespace: default/namespace: flink/g' flink.yaml
```
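The `sed` rewrite above replaces every `namespace: default` occurrence in the manifest (including `kubernetes.namespace` inside flink-conf.yaml). A quick sketch of its effect on a sample fragment:

```shell
# Sample fragment (illustrative) showing what the sed command rewrites
cat > /tmp/ns-demo.yaml << 'EOF'
metadata:
  name: flink-config
  namespace: default
EOF

sed -i 's/namespace: default/namespace: flink/g' /tmp/ns-demo.yaml
grep 'namespace:' /tmp/ns-demo.yaml   # the namespace now reads flink
```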
Start the Flink service:

```shell
kubectl apply -f flink.yaml
```
Create start/stop scripts
```shell
## start_flink.sh contents
cat > /data/mingdao/script/kubernetes/flink/start_flink.sh << 'EOF'
#!/bin/bash
baseDir=$(dirname $0)
kubectl apply -f $baseDir/flink.yaml
EOF

## stop_flink.sh contents
cat > /data/mingdao/script/kubernetes/flink/stop_flink.sh << 'EOF'
#!/bin/bash
baseDir=$(dirname $0)
kubectl delete -f $baseDir/flink.yaml
EOF

## restart_flink.sh contents
cat > /data/mingdao/script/kubernetes/flink/restart_flink.sh << 'EOF'
#!/bin/bash
RESTART_DATETIME=$(date +%Y%m%d%H%M%S)
baseDir=$(dirname $0)
# Bump the md-update annotation so the next kubectl apply triggers a rolling update
sed -r -i "/md-update:/ s/([[:digit:]]+)/$RESTART_DATETIME/" $baseDir/flink.yaml
/bin/bash $baseDir/start_flink.sh
EOF
```

#### Grant execute permission

```shell
chmod +x /data/mingdao/script/kubernetes/flink/{start_flink.sh,stop_flink.sh,restart_flink.sh}
```
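The restart script works by rewriting the `md-update` annotation: because the annotation sits in the pod template, changing it makes `kubectl apply` roll the Deployments. A sketch of that sed substitution on a sample annotation (the file here is illustrative):

```shell
# Sample annotation fragment (illustrative)
cat > /tmp/md-update-demo.yaml << 'EOF'
      annotations:
        md-update: "20210615101801239"
EOF

# Replace the digit run on the md-update line with the current timestamp
RESTART_DATETIME=$(date +%Y%m%d%H%M%S)
sed -r -i "/md-update:/ s/([[:digit:]]+)/$RESTART_DATETIME/" /tmp/md-update-demo.yaml
grep 'md-update' /tmp/md-update-demo.yaml
```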
Configure the microservices to connect to Flink
Add the following variable to config.yaml:

```yaml
ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"
```

Restart the microservices for the change to take effect:

```shell
bash restart.sh
```
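Adding the variable can also be scripted. A sketch against a stand-in config.yaml (the path and surrounding keys are illustrative assumptions, not the real microservice config layout):

```shell
# Stand-in for the microservice config.yaml (illustrative structure)
cat > /tmp/config-demo.yaml << 'EOF'
env:
  ENV_SOME_EXISTING: "value"
EOF

# Append the Flink JobManager URL (service name + namespace from this document)
printf '  ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"\n' >> /tmp/config-demo.yaml
grep ENV_FLINK_URL /tmp/config-demo.yaml
```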