Flink
info
- The server that will run Flink must first join the Kubernetes cluster hosting the microservices as a worker node.
- This document adds a taint and a label for Flink so that only the Flink services are scheduled onto the dedicated Flink server.
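If the cluster was set up with kubeadm (an assumption; use the equivalent for your own cluster tooling), the join command for the new worker node can be generated on an existing master:

# Run on an existing master; prints the 'kubeadm join ...' command to execute on the Flink server
kubeadm token create --print-join-command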
Load the image
Required on every node server in the Kubernetes cluster.
- Server with Internet access

crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530

- Server without Internet access

# Download link for the offline Flink image; upload the file to the deployment server after downloading
https://pdpublic.mingdao.com/private-deployment/offline/mingdaoyun-flink-linux-amd64-1.17.1.530.tar.gz

Load the offline image on the server:

gunzip -d mingdaoyun-flink-linux-amd64-1.17.1.530.tar.gz
ctr -n k8s.io image import mingdaoyun-flink-linux-amd64-1.17.1.530.tar
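Either way, confirm the image is now present on the node (the grep pattern is simply the image name used above):

ctr -n k8s.io images ls | grep mingdaoyun-flink
# or via the CRI tooling:
crictl images | grep mingdaoyun-flink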
Deploy Flink
Perform the following on the first Kubernetes master server.
- Create the directory for the Flink configuration file

mkdir /data/mingdao/script/kubernetes/flink
- Create the Flink configuration file

First, change into the configuration file directory:

cd /data/mingdao/script/kubernetes/flink

Then run:

vim flink.yaml
Write the following configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: default
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    env.java.opts: "-Duser.timezone=Asia/Shanghai"
    task.cancellation.timeout: 0
    kubernetes.cluster-id: md-flink
    kubernetes.namespace: default
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://mdoc/recovery
    #restart-strategy: none
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10s
    state.backend: rocksdb
    state.checkpoints.dir: s3://mdoc/checkpoints # "s3://<your-bucket>/<path>"
    state.savepoints.dir: s3://mdoc/savepoints
    state.backend.incremental: true
    s3.access-key: mingdao # object storage access key
    s3.secret-key: 123456789 # object storage secret key
    s3.ssl.enabled: false # set to true if using a cloud provider's object storage with an https endpoint
    s3.path.style.access: true # set to false if using object storage other than MinIO
    s3.endpoint: 192.168.10.16:9011 # object storage endpoint
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 100
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3072m # memory limit for a single jobmanager instance
    taskmanager.memory.process.size: 12288m # memory limit for a single taskmanager instance
    taskmanager.memory.jvm-overhead.fraction: 0.05
    taskmanager.memory.managed.fraction: 0.05
    taskmanager.memory.network.fraction: 0.05
    parallelism.default: 2
    metrics.job.status.enable: "STATE"
    metrics.reporters: kafka_reporter,kafka_reporter_running
    metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
    metrics.reporter.kafka_reporter.chunkSize: 20000
    metrics.reporter.kafka_reporter.interval: 60 SECONDS
    metrics.reporter.kafka_reporter.filter: numRecordsIn,numRecordsOut,uptime
    metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
    metrics.reporter.kafka_reporter_running.chunkSize: 20000
    metrics.reporter.kafka_reporter_running.interval: 60 SECONDS
    metrics.reporter.kafka_reporter_running.filter: runningState
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to only change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["update", "get", "watch", "list", "create", "patch", "delete"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access-binding
  namespace: default
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: default
roleRef:
  kind: Role
  name: configmap-access
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  replicas: 1 # only one replica is supported
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        md-update: "20210615101801239"
    spec:
      # Ensure the pod runs only on nodes labeled 'hap: flink', even when those nodes are tainted 'NoSchedule'
      tolerations:
        - key: "hap"
          operator: "Equal"
          value: "flink"
          effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
        - name: jobmanager
          image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          readinessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 3
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 60
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: localtime
              mountPath: /etc/localtime
      securityContext:
        runAsUser: 9999 # the _flink_ user from the official Flink image; change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Etc/GMT-8
            type: ""
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
      annotations:
        md-update: "20210615101801240"
    spec:
      # Ensure the pod runs only on nodes labeled 'hap: flink', even when those nodes are tainted 'NoSchedule'
      tolerations:
        - key: "hap"
          operator: "Equal"
          value: "flink"
          effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
        - name: taskmanager
          image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 60
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: localtime
              mountPath: /etc/localtime
      securityContext:
        runAsUser: 9999 # the _flink_ user from the official Flink image; change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Etc/GMT-8
            type: ""
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  ports:
    - name: rpc
      port: 6123
      targetPort: 6123
      nodePort: 6123
    - name: blob-server
      port: 6124
      targetPort: 6124
      nodePort: 6124
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 28081
  sessionAffinity: ClientIP
  type: NodePort
  selector:
    app: flink
    component: jobmanager

- In an actual deployment, remember to update the values in the flink-conf.yaml section of the ConfigMap for your environment.
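For example, a sketch of swapping in the environment-specific values with sed (all replacement values below are placeholders, not real endpoints):

# Placeholders only -- substitute your actual object storage and Kafka settings
sed -i 's#s3.endpoint: 192.168.10.16:9011#s3.endpoint: 192.168.20.5:9000#' flink.yaml
sed -i 's#s3.access-key: mingdao#s3.access-key: myAccessKey#' flink.yaml
sed -i 's#s3.secret-key: 123456789#s3.secret-key: mySecretKey#' flink.yaml
# The Kafka bootstrap list appears on two reporter lines, so edit it globally
sed -i 's#192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092#192.168.20.7:9092,192.168.20.8:9092#g' flink.yaml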
Configure taints and tolerations
Designate dedicated servers for the Flink services so their resources are not shared with the microservices.
- Taint and label the designated Kubernetes node

kubectl taint nodes $flink_node_name hap=flink:NoSchedule
kubectl label nodes $flink_node_name hap=flink

Replace $flink_node_name with the actual node name, which you can look up with kubectl get node.
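Verify that the taint and label took effect:

kubectl describe node $flink_node_name | grep Taints
kubectl get nodes -l hap=flink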
- Create the namespace

kubectl create ns flink
- Switch the Flink services to the flink namespace

sed -i 's/namespace: default/namespace: flink/g' flink.yaml
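Optionally validate the edited manifest before applying it; --dry-run=client renders the objects locally without creating them:

kubectl apply -f flink.yaml --dry-run=client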
Start the Flink services
kubectl apply -f flink.yaml
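Watch the pods start; once both deployments are Running, the Flink web UI is reachable at the webui NodePort defined above (assuming port 28081 is open on the node):

kubectl get pods -n flink -w
# Once the pods are Running, query the Flink REST API (replace <node-ip> with an actual node IP)
curl http://<node-ip>:28081/overview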
Configure the microservices to connect to Flink
Add a new variable in config.yaml:
ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"
Restart the microservices for the change to take effect:
bash restart.sh
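To verify that the address configured above is reachable from inside the cluster, a quick check (a sketch; assumes the curlimages/curl image can be pulled):

# Launches a one-off pod, curls the jobmanager Service in the flink namespace, then cleans up
kubectl run flink-curl --rm -it --restart=Never --image=curlimages/curl -- \
  curl -s http://flink-jobmanager.flink:8081/overview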