Flink
info
- The server used to deploy Flink must first join, as a worker node, the Kubernetes cluster where the microservices reside.
- In this document, a taint and a label are added to the Flink node to ensure that only the Flink service is scheduled onto the deployment server for Flink.
Loading Images
The following operations must be performed on every node server in the Kubernetes cluster.

- Server with Internet access — pull the image directly:

crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.710

- Server without Internet access — download the offline Flink image file, then upload it to the deployment server:

https://pdpublic.mingdao.com/private-deployment/offline/mingdaoyun-flink-linux-amd64-1.19.710.tar.gz

Load the offline image on the server:

gunzip -d mingdaoyun-flink-linux-amd64-1.19.710.tar.gz
ctr -n k8s.io image import mingdaoyun-flink-linux-amd64-1.19.710.tar
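The offline gunzip/import steps above must be repeated on every node. The loop below is a minimal sketch of distributing and importing the archive from one machine; the hostnames in NODES are placeholder assumptions, and the scp/ssh commands are printed for review rather than executed.

```shell
# Placeholder node names - replace with your real worker hostnames (assumption).
NODES="k8s-node1 k8s-node2 k8s-node3"
ARCHIVE="mingdaoyun-flink-linux-amd64-1.19.710.tar.gz"

for node in $NODES; do
  # Printed for review; remove the leading 'echo' to actually run each step.
  echo "scp $ARCHIVE $node:/tmp/"
  echo "ssh $node 'gunzip -d /tmp/$ARCHIVE && ctr -n k8s.io image import /tmp/${ARCHIVE%.gz}'"
done
```

Note that `${ARCHIVE%.gz}` strips the `.gz` suffix, matching the filename produced by `gunzip -d`.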
Deploy Flink
Operate on the first Kubernetes master server.
- Create a directory to store Flink configuration files:

mkdir /data/mingdao/script/kubernetes/flink

- Create the Flink configuration file.

First, navigate to the directory where the Flink configuration files are stored:

cd /data/mingdao/script/kubernetes/flink

Execute vim flink.yaml and write the following configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: default
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    state.checkpoints.num-retained: 5
    cluster.evenly-spread-out-slots: true
    env.java.opts: "-Duser.timezone=Asia/Shanghai"
    task.cancellation.timeout: 0
    kubernetes.cluster-id: md-flink
    kubernetes.namespace: default
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://mdoc/recovery
    #restart-strategy: none
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10s
    state.backend: rocksdb
    state.checkpoints.dir: s3://mdoc/checkpoints # "s3://<your-bucket>/<endpoint>"
    state.savepoints.dir: s3://mdoc/savepoints
    state.backend.incremental: true
    s3.access-key: mingdao # Object storage access key
    s3.secret-key: 123456789 # Object storage secret key
    s3.ssl.enabled: false # Set to true if using cloud object storage with https
    s3.path.style.access: true # Set to false if not using MinIO object storage
    s3.endpoint: 192.168.10.16:9011 # Object storage endpoint
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 100
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3072m # Memory limit for a single JobManager instance
    taskmanager.memory.process.size: 12288m # Memory limit for a single TaskManager instance
    taskmanager.memory.jvm-overhead.fraction: 0.1
    taskmanager.memory.managed.fraction: 0.2
    taskmanager.memory.network.fraction: 0.1
    parallelism.default: 2
    metrics.job.status.enable: STATE
    metrics.reporters: kafka_reporter,kafka_reporter_running,kafka_reporter2,kafka_reporter_running2
    metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
    metrics.reporter.kafka_reporter.chunk.size: 20000
    metrics.reporter.kafka_reporter.interval: 60s
    metrics.reporter.kafka_reporter.filter.metrics: numRecordsIn,numRecordsOut,runningTime
    metrics.reporter.kafka_reporter.topic: flink_metrics_counter
    metrics.reporter.kafka_reporter.taskNamePrefix: HAP0x5c2_
    metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
    metrics.reporter.kafka_reporter_running.chunk.size: 20000
    metrics.reporter.kafka_reporter_running.interval: 60s
    metrics.reporter.kafka_reporter_running.filter.metrics: RUNNINGState
    metrics.reporter.kafka_reporter_running.topic: flink_metrics_gauge
    metrics.reporter.kafka_reporter_running.taskNamePrefix: HAP0x5c2_
    metrics.reporter.kafka_reporter2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
    metrics.reporter.kafka_reporter2.chunk.size: 20000
    metrics.reporter.kafka_reporter2.interval: 60s
    metrics.reporter.kafka_reporter2.filter.metrics: numRecordsIn,numRecordsOut,runningTime
    metrics.reporter.kafka_reporter2.topic: flink_metrics_counter-hdp
    metrics.reporter.kafka_reporter2.taskNamePrefix: HDP0x5c2_
    metrics.reporter.kafka_reporter_running2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092 # Kafka addresses
    metrics.reporter.kafka_reporter_running2.chunk.size: 20000
    metrics.reporter.kafka_reporter_running2.interval: 60s
    metrics.reporter.kafka_reporter_running2.filter.metrics: RUNNINGState
    metrics.reporter.kafka_reporter_running2.topic: flink_metrics_gauge-hdp
    metrics.reporter.kafka_reporter_running2.taskNamePrefix: HDP0x5c2_
  log4j-console.properties: |+
    # The following configuration affects logging behavior for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment the following section if you only want to change Flink's logging behavior
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log levels of common libraries and connectors at INFO.
    # Configuration in the root logger does not override this;
    # you must modify the log levels here manually.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Output all INFO-level logs to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Output all INFO-level logs to the specified rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress irrelevant (erroneous) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access-binding
  namespace: default
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: default
roleRef:
  kind: Role
  name: configmap-access
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  replicas: 1 # Only supports starting 1 replica
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        md-update: "20210615101801239"
    spec:
      # Ensure Pods only run on nodes labeled 'hap: flink', even if those nodes are marked 'NoSchedule'
      tolerations:
        - key: "hap"
          operator: "Equal"
          value: "flink"
          effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
        - name: jobmanager
          image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.710
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          # The following args overwrite the jobmanager.rpc.address value configured in the ConfigMap with POD_IP.
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          readinessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 3
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 60
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: localtime
              mountPath: /etc/localtime
          securityContext:
            runAsUser: 9999 # The default _flink_ user in the official Flink image; modify if necessary
      serviceAccountName: flink-service-account # Service account with permission to create, edit, and delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Etc/GMT-8
            type: ""
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
      annotations:
        md-update: "20210615101801240"
    spec:
      # Ensure Pods only run on nodes labeled 'hap: flink', even if those nodes are marked 'NoSchedule'
      tolerations:
        - key: "hap"
          operator: "Equal"
          value: "flink"
          effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
        - name: taskmanager
          image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.710
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 60
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: localtime
              mountPath: /etc/localtime
          securityContext:
            runAsUser: 9999 # The default _flink_ user in the official Flink image; modify if necessary
      serviceAccountName: flink-service-account # Service account with permission to create, edit, and delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Etc/GMT-8
            type: ""
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  ports:
    - name: rpc
      port: 6123
      targetPort: 6123
      nodePort: 6123
    - name: blob-server
      port: 6124
      targetPort: 6124
      nodePort: 6124
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 28081
  sessionAffinity: ClientIP
  type: NodePort
  selector:
    app: flink
    component: jobmanager

- In actual deployment, make sure to modify the information in the ConfigMap flink-conf.yaml.
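The entries most deployments must change are the object-storage and Kafka settings in flink-conf.yaml. As a quick pre-apply check, you can grep the manifest for them; the snippet below demonstrates the pattern on a throwaway excerpt (the /tmp path is only for the demo — run the same grep against your real flink.yaml).

```shell
# Demo excerpt containing the ConfigMap values that must be customized (values from this document).
cat > /tmp/flink-conf-excerpt.yaml <<'EOF'
s3.access-key: mingdao
s3.secret-key: 123456789
s3.endpoint: 192.168.10.16:9011
metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092
EOF
# List every line that still carries a placeholder value and needs review.
grep -En 's3\.(access-key|secret-key|endpoint)|bootstrap\.servers' /tmp/flink-conf-excerpt.yaml
```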
Configure Taint Tolerance
Dedicate servers to the Flink service; do not share resources with the microservices.

- Apply a taint and a label to the designated Kubernetes nodes:

kubectl taint nodes $flink_node_name hap=flink:NoSchedule
kubectl label nodes $flink_node_name hap=flink

Replace $flink_node_name with the actual node name, which can be viewed using kubectl get node.

- Create a namespace:

kubectl create ns flink

- Configure the Flink service to use the flink namespace:

sed -i 's/namespace: default/namespace: flink/g' flink.yaml
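The sed command above rewrites every `namespace: default` occurrence in flink.yaml; this is safe here because all resources in the manifest start out in the default namespace. A self-contained illustration on a throwaway file (the /tmp path exists only for the demo):

```shell
# Create a two-line excerpt, rewrite the namespace in place, and show the result.
printf 'metadata:\n  namespace: default\n' > /tmp/flink-ns-demo.yaml
sed -i 's/namespace: default/namespace: flink/g' /tmp/flink-ns-demo.yaml
cat /tmp/flink-ns-demo.yaml
```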
Start the Flink Service
kubectl apply -f flink.yaml
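After applying, it is worth confirming that the pods landed on the tainted Flink node and reached Running. These checks need a live cluster, so the sketch below only prints the commands for review (resource names assume the flink namespace created above).

```shell
# Printed for review; run each command manually against your cluster.
for cmd in \
  'kubectl get pods -n flink -l app=flink -o wide' \
  'kubectl get svc -n flink flink-jobmanager' \
  'kubectl logs -n flink deploy/flink-jobmanager --tail=50'
do
  echo "run: $cmd"
done
```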
Create Start and Stop Scripts
## Content of start_flink.sh
cat > /data/mingdao/script/kubernetes/flink/start_flink.sh << 'EOF'
#!/bin/bash
baseDir=$(dirname "$0")
kubectl apply -f "$baseDir/flink.yaml"
EOF
## Content of stop_flink.sh
cat > /data/mingdao/script/kubernetes/flink/stop_flink.sh << 'EOF'
#!/bin/bash
baseDir=$(dirname "$0")
kubectl delete -f "$baseDir/flink.yaml"
EOF
## Content of restart_flink.sh
cat > /data/mingdao/script/kubernetes/flink/restart_flink.sh << 'EOF'
#!/bin/bash
RESTART_DATETIME=$(date +%Y%m%d%H%M%S)
baseDir=$(dirname "$0")
sed -r -i "/md-update:/ s/([[:digit:]]+)/$RESTART_DATETIME/" "$baseDir/flink.yaml"
/bin/bash "$baseDir/start_flink.sh"
EOF
#### Authorization
chmod +x /data/mingdao/script/kubernetes/flink/{start_flink.sh,stop_flink.sh,restart_flink.sh}
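restart_flink.sh works by rewriting the md-update annotation timestamp in flink.yaml; since that annotation sits in the pod template, re-applying the manifest makes Kubernetes roll the Deployments. A standalone demo of that sed on a throwaway file (the /tmp path is only for the demo):

```shell
# Write a sample annotation, stamp it with the current time, and verify the change.
printf 'md-update: "20210615101801239"\n' > /tmp/md-update-demo.yaml
TS=$(date +%Y%m%d%H%M%S)
sed -r -i "/md-update:/ s/([[:digit:]]+)/$TS/" /tmp/md-update-demo.yaml
cat /tmp/md-update-demo.yaml
```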
Configure Microservices to Connect to Flink
Add a new variable to config.yaml:

ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"

Restart the microservices for the change to take effect:

bash restart.sh
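The value of ENV_FLINK_URL follows the in-cluster DNS convention <service>.<namespace>:<port>, matching the flink-jobmanager Service deployed into the flink namespace above. A trivial sketch of how the URL is composed:

```shell
# Compose the in-cluster URL from its parts (names taken from this document).
SVC=flink-jobmanager
NS=flink
PORT=8081
echo "http://$SVC.$NS:$PORT"
```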