Flink
info
- The server used to deploy Flink must first join the Kubernetes cluster where the microservices reside as a worker node.
- In this document, a taint and a label are applied to the Flink node to ensure that only the Flink service is scheduled onto the Flink deployment server.
Loading Images
Operations need to be performed on each node server in the Kubernetes cluster.

- Server with Internet access

  ```shell
  crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
  ```

- Server without Internet access

  ```shell
  # Link for downloading the offline Flink image file; upload it to the deployment server after downloading
  https://pdpublic.mingdao.com/private-deployment/offline/mingdaoyun-flink-linux-amd64-1.17.1.530.tar.gz
  ```

  Load the offline image on the server:

  ```shell
  gunzip -d mingdaoyun-flink-linux-amd64-1.17.1.530.tar.gz
  ctr -n k8s.io image import mingdaoyun-flink-linux-amd64-1.17.1.530.tar
  ```
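If you loaded the image offline, you can confirm the import succeeded before moving on (a quick check using the same `ctr` tooling as above; the grep pattern is simply the image name):

```shell
# List images in containerd's k8s.io namespace and filter for the Flink image
ctr -n k8s.io images ls | grep mingdaoyun-flink
```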
Deploy Flink
Operations to be performed on the first Kubernetes master server.

- Create a directory for the Flink configuration files

  ```shell
  mkdir /data/mingdao/script/kubernetes/flink
  ```

- Create the Flink configuration file

  First, navigate to the directory for storing the Flink configuration files:

  ```shell
  cd /data/mingdao/script/kubernetes/flink
  ```

  Execute `vim flink.yaml` and enter the following configuration:

  ```yaml
  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-config
    namespace: default
    labels:
      app: flink
  data:
    flink-conf.yaml: |+
      env.java.opts: "-Duser.timezone=Asia/Shanghai"
      task.cancellation.timeout: 0
      kubernetes.cluster-id: md-flink
      kubernetes.namespace: default
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: s3://mdoc/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 3
      restart-strategy.fixed-delay.delay: 10s
      state.backend: rocksdb
      state.checkpoints.dir: s3://mdoc/checkpoints # "s3://<your-bucket>/<endpoint>"
      state.savepoints.dir: s3://mdoc/savepoints
      state.backend.incremental: true
      s3.access-key: mingdao # Object storage access key
      s3.secret-key: 123456789 # Object storage secret key
      s3.ssl.enabled: false # Set to true if using cloud object storage with an HTTPS address
      s3.path.style.access: true # Set to false if not using MinIO object storage
      s3.endpoint: 192.168.10.16:9011 # Object storage address
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 100
      blob.server.port: 6124
      jobmanager.rpc.port: 6123
      taskmanager.rpc.port: 6122
      queryable-state.proxy.ports: 6125
      jobmanager.memory.process.size: 3072m # Memory limit for a single jobmanager instance
      taskmanager.memory.process.size: 12288m # Memory limit for a single taskmanager instance
      taskmanager.memory.jvm-overhead.fraction: 0.1
      taskmanager.memory.managed.fraction: 0.2
      taskmanager.memory.network.fraction: 0.1
      parallelism.default: 2
      metrics.job.status.enable: "STATE"
      metrics.reporters: kafka_reporter,kafka_reporter_running
      metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
      metrics.reporter.kafka_reporter.chunkSize: 20000
      metrics.reporter.kafka_reporter.interval: 60 SECONDS
      metrics.reporter.kafka_reporter.filter: numRecordsIn,numRecordsOut,uptime
      metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
      metrics.reporter.kafka_reporter_running.bootstrapServers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka address
      metrics.reporter.kafka_reporter_running.chunkSize: 20000
      metrics.reporter.kafka_reporter_running.interval: 60 SECONDS
      metrics.reporter.kafka_reporter_running.filter: runningState
    log4j-console.properties: |+
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender
      logger.akka.name = akka
      logger.akka.level = INFO
      logger.kafka.name = org.apache.kafka
      logger.kafka.level = INFO
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10
      logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF
  ---
  apiVersion: v1
  kind: ServiceAccount
  metadata:
    name: flink-service-account
    namespace: default
  ---
  kind: Role
  apiVersion: rbac.authorization.k8s.io/v1
  metadata:
    name: configmap-access
    namespace: default
  rules:
    - apiGroups: [""]
      resources: ["configmaps"]
      verbs: ["update", "get", "watch", "list", "create", "edit", "delete"]
  ---
  kind: RoleBinding
  apiVersion: rbac.authorization.k8s.io/v1
  metadata:
    name: configmap-access-binding
    namespace: default
  subjects:
    - kind: ServiceAccount
      name: flink-service-account
      namespace: default
  roleRef:
    kind: Role
    name: configmap-access
    apiGroup: rbac.authorization.k8s.io
  ---
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-jobmanager
    namespace: default
  spec:
    replicas: 1 # Only 1 replica is supported
    selector:
      matchLabels:
        app: flink
        component: jobmanager
    template:
      metadata:
        labels:
          app: flink
          component: jobmanager
        annotations:
          md-update: "20210615101801239"
      spec:
        tolerations:
          - key: "hap"
            operator: "Equal"
            value: "flink"
            effect: "NoSchedule"
        nodeSelector:
          hap: flink
        containers:
          - name: jobmanager
            image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
            env:
              - name: POD_IP
                valueFrom:
                  fieldRef:
                    apiVersion: v1
                    fieldPath: status.podIP
            args: ["jobmanager", "$(POD_IP)"]
            ports:
              - containerPort: 6123
                name: rpc
              - containerPort: 6124
                name: blob-server
              - containerPort: 8081
                name: webui
            readinessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 3
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 60
              periodSeconds: 60
            volumeMounts:
              - name: flink-config-volume
                mountPath: /opt/flink/conf
              - name: localtime
                mountPath: /etc/localtime
        securityContext:
          runAsUser: 9999 # Refers to the _flink_ user in the official Flink image; can be modified if necessary
        serviceAccountName: flink-service-account # Service account with permissions to create, edit, and delete ConfigMaps
        volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
                - key: flink-conf.yaml
                  path: flink-conf.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
          - name: localtime
            hostPath:
              path: /usr/share/zoneinfo/Etc/GMT-8
              type: ""
  ---
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
    namespace: default
  spec:
    replicas: 2
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
        annotations:
          md-update: "20210615101801240"
      spec:
        tolerations:
          - key: "hap"
            operator: "Equal"
            value: "flink"
            effect: "NoSchedule"
        nodeSelector:
          hap: flink
        containers:
          - name: taskmanager
            image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.17.1.530
            args: ["taskmanager"]
            ports:
              - containerPort: 6122
                name: rpc
              - containerPort: 6125
                name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 60
              periodSeconds: 60
            volumeMounts:
              - name: flink-config-volume
                mountPath: /opt/flink/conf/
              - name: localtime
                mountPath: /etc/localtime
        securityContext:
          runAsUser: 9999 # Refers to the _flink_ user in the official Flink image; can be modified if necessary
        serviceAccountName: flink-service-account # Service account with permissions to create, edit, and delete ConfigMaps
        volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
                - key: flink-conf.yaml
                  path: flink-conf.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
          - name: localtime
            hostPath:
              path: /usr/share/zoneinfo/Etc/GMT-8
              type: ""
  ---
  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager
    namespace: default
  spec:
    ports:
      - name: rpc
        port: 6123
        targetPort: 6123
        nodePort: 6123
      - name: blob-server
        port: 6124
        targetPort: 6124
        nodePort: 6124
      - name: webui
        port: 8081
        targetPort: 8081
        nodePort: 28081
    sessionAffinity: ClientIP
    type: NodePort
    selector:
      app: flink
      component: jobmanager
  ```

- During actual deployment, be sure to modify the values in the ConfigMap's `flink-conf.yaml` (object storage address and credentials, Kafka address, etc.) to match your environment.
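Before applying the manifest, a client-side dry run can catch YAML indentation or schema mistakes early (a sketch; assumes `kubectl` is available and `flink.yaml` is in the current directory):

```shell
# Validate the manifest locally without sending it to the cluster
kubectl apply --dry-run=client -f flink.yaml
```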
Configure Taint Tolerance
Dedicate specific servers to the Flink service so that it does not share resources with the microservices.

- Apply a taint and a label to the designated Kubernetes node

  ```shell
  kubectl taint nodes $flink_node_name hap=flink:NoSchedule
  kubectl label nodes $flink_node_name hap=flink
  ```

  - Replace `$flink_node_name` with the actual node name, which can be viewed with `kubectl get node`

- Create a namespace

  ```shell
  kubectl create ns flink
  ```

- Configure the Flink service to use the `flink` namespace

  ```shell
  sed -i 's/namespace: default/namespace: flink/g' flink.yaml
  ```
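The `sed` command above rewrites every `namespace: default` occurrence in place. If you want to see the effect first, a throwaway file demonstrates the substitution (illustrative content only, not your real `flink.yaml`):

```shell
# Create a minimal sample, apply the same substitution, and inspect the result
printf 'metadata:\n  name: flink-config\n  namespace: default\n' > /tmp/ns-demo.yaml
sed -i 's/namespace: default/namespace: flink/g' /tmp/ns-demo.yaml
grep 'namespace' /tmp/ns-demo.yaml   # now shows: namespace: flink
```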
Start the Flink Service
```shell
kubectl apply -f flink.yaml
```
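After applying, you can watch the pods come up (a hedged sketch; the label and deployment name follow the manifest created earlier, deployed into the `flink` namespace):

```shell
# Wait for the JobManager rollout to finish, then list all Flink pods
kubectl -n flink rollout status deployment/flink-jobmanager
kubectl -n flink get pods -l app=flink
```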
Configure Microservices to Connect to Flink
Add the following variable to `config.yaml`:

```yaml
ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"
```

Restart the microservices for the change to take effect:

```shell
bash restart.sh
```
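To verify connectivity, you can query the JobManager's REST API at the same address configured in `ENV_FLINK_URL` (a sketch; `/overview` is a standard Flink REST endpoint, and this in-cluster service address only resolves from inside the cluster — from outside, use a node IP with the NodePort `28081` from the Service above):

```shell
# From a pod inside the cluster: expect a JSON summary of the Flink cluster
curl -s http://flink-jobmanager.flink:8081/overview
```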