Flink
info
- The HAP aggregated table, data integration, and HDP-related capabilities all depend on the Flink component. Deploy Flink if you want to enable these features.
- The server where Flink will be deployed must first join, as a worker node, the Kubernetes cluster where the microservices reside.
- In this document, a taint and a label are applied to the Flink node to ensure that only the Flink service is scheduled on the Flink deployment server.
Loading Images
Perform the following operations on every node server in the Kubernetes cluster.
- Server Supports Internet Access

crictl pull registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.720

- Server Does Not Support Internet Access
# Link for downloading the offline Flink image file, upload to the deployment server after download
https://pdpublic.mingdao.com/private-deployment/offline/mingdaoyun-flink-linux-amd64-1.19.720.tar.gz
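For example, you can download the file on a machine that has internet access and then copy it to the deployment server; the user and host below are placeholders, adjust them to your environment:

# On a machine with internet access
wget https://pdpublic.mingdao.com/private-deployment/offline/mingdaoyun-flink-linux-amd64-1.19.720.tar.gz
# Copy the file to the deployment server (replace the user and host)
scp mingdaoyun-flink-linux-amd64-1.19.720.tar.gz root@<deployment-server>:/root/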
Load the offline image on the server
gunzip -d mingdaoyun-flink-linux-amd64-1.19.720.tar.gz
ctr -n k8s.io image import mingdaoyun-flink-linux-amd64-1.19.720.tar
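To confirm the image is now present on the node (whether pulled online or imported offline), a quick check with crictl:

crictl images | grep mingdaoyun-flink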
Deploy Flink
Operate on the first Kubernetes master server.
- Create a directory to store Flink configuration files.

mkdir -p /data/mingdao/script/kubernetes/flink

- Create Flink configuration files.
First, navigate to the directory where Flink configuration files are stored:
cd /data/mingdao/script/kubernetes/flink

Execute vim flink.yaml and write the following configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: default
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    state.checkpoints.num-retained: 5
    cluster.evenly-spread-out-slots: true
    env.java.opts: "-Duser.timezone=Asia/Shanghai"
    task.cancellation.timeout: 0
    kubernetes.cluster-id: md-flink
    kubernetes.namespace: default
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://mdoc/recovery
    #restart-strategy: none
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10s
    state.backend: rocksdb
    state.checkpoints.dir: s3://mdoc/checkpoints    # "s3://<your-bucket>/<endpoint>"
    state.savepoints.dir: s3://mdoc/savepoints
    state.backend.incremental: true
    s3.access-key: mingdao    # Object storage access key
    s3.secret-key: 123456789    # Object storage secret key
    s3.ssl.enabled: false    # Set to true if using cloud object storage with https
    s3.path.style.access: true    # Set to false if not using MinIO object storage
    s3.endpoint: 192.168.10.16:9011    # Object storage endpoint
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 100
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3072m    # Memory limit for a single JobManager instance
    taskmanager.memory.process.size: 12288m    # Memory limit for a single TaskManager instance
    taskmanager.memory.jvm-overhead.fraction: 0.1
    taskmanager.memory.managed.fraction: 0.2
    taskmanager.memory.network.fraction: 0.1
    parallelism.default: 2
    metrics.job.status.enable: STATE
    metrics.reporters: kafka_reporter,kafka_reporter_running,kafka_reporter2,kafka_reporter_running2
    metrics.reporter.kafka_reporter.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092    # Kafka addresses
    metrics.reporter.kafka_reporter.chunk.size: 20000
    metrics.reporter.kafka_reporter.interval: 60s
    metrics.reporter.kafka_reporter.filter.metrics: numRecordsIn,numRecordsOut,runningTime
    metrics.reporter.kafka_reporter.topic: flink_metrics_counter
    metrics.reporter.kafka_reporter.taskNamePrefix: HAP0x5c2_
    metrics.reporter.kafka_reporter_running.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092    # Kafka addresses
    metrics.reporter.kafka_reporter_running.chunk.size: 20000
    metrics.reporter.kafka_reporter_running.interval: 60s
    metrics.reporter.kafka_reporter_running.filter.metrics: RUNNINGState
    metrics.reporter.kafka_reporter_running.topic: flink_metrics_gauge
    metrics.reporter.kafka_reporter_running.taskNamePrefix: HAP0x5c2_
    metrics.reporter.kafka_reporter2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092    # Kafka addresses
    metrics.reporter.kafka_reporter2.chunk.size: 20000
    metrics.reporter.kafka_reporter2.interval: 60s
    metrics.reporter.kafka_reporter2.filter.metrics: numRecordsIn,numRecordsOut,runningTime
    metrics.reporter.kafka_reporter2.topic: flink_metrics_counter-hdp
    metrics.reporter.kafka_reporter2.taskNamePrefix: HDP0x5c2_
    metrics.reporter.kafka_reporter_running2.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
    metrics.reporter.kafka_reporter_running2.bootstrap.servers: 192.168.10.7:9092,192.168.10.8:9092,192.168.10.9:9092    # Kafka addresses
    metrics.reporter.kafka_reporter_running2.chunk.size: 20000
    metrics.reporter.kafka_reporter_running2.interval: 60s
    metrics.reporter.kafka_reporter_running2.filter.metrics: RUNNINGState
    metrics.reporter.kafka_reporter_running2.topic: flink_metrics_gauge-hdp
    metrics.reporter.kafka_reporter_running2.taskNamePrefix: HDP0x5c2_
  log4j-console.properties: |+
    # The following configuration affects logging behavior for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment the following section if you only want to change Flink's logging behavior
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log levels of common libraries or connectors at INFO level.
    # Configuration in the root logger won't override this.
    # You must manually modify the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Output all INFO level logs to console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Output all INFO level logs to a specified rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Disable irrelevant (error) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: default
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["update", "get", "watch", "list", "create", "edit", "delete", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: configmap-access-binding
  namespace: default
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: default
roleRef:
  kind: Role
  name: configmap-access
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  replicas: 1    # Only 1 replica is supported
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
      annotations:
        md-update: "20210615101801239"
    spec:
      # Ensure Pods only run on nodes labeled 'hap: flink', even if those nodes are marked 'NoSchedule'
      tolerations:
        - key: "hap"
          operator: "Equal"
          value: "flink"
          effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
        - name: jobmanager
          image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.720
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          # The following args overwrite the value of jobmanager.rpc.address configured in the ConfigMap with POD_IP
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          readinessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 3
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 60
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: localtime
              mountPath: /etc/localtime
      securityContext:
        runAsUser: 9999    # Refers to the default _flink_ user in the official Flink image; modify if necessary
      serviceAccountName: flink-service-account    # Service account with permissions to create, edit, and delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Etc/GMT-8
            type: ""
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
      annotations:
        md-update: "20210615101801240"
    spec:
      # Ensure Pods only run on nodes labeled 'hap: flink', even if those nodes are marked 'NoSchedule'
      tolerations:
        - key: "hap"
          operator: "Equal"
          value: "flink"
          effect: "NoSchedule"
      nodeSelector:
        hap: flink
      containers:
        - name: taskmanager
          image: registry.cn-hangzhou.aliyuncs.com/mdpublic/mingdaoyun-flink:1.19.720
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 60
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: localtime
              mountPath: /etc/localtime
      securityContext:
        runAsUser: 9999    # Refers to the default _flink_ user in the official Flink image; modify if necessary
      serviceAccountName: flink-service-account    # Service account with permissions to create, edit, and delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Etc/GMT-8
            type: ""
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: default
spec:
  ports:
    - name: rpc
      port: 6123
      targetPort: 6123
      nodePort: 6123
    - name: blob-server
      port: 6124
      targetPort: 6124
      nodePort: 6124
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 28081
  sessionAffinity: ClientIP
  type: NodePort
  selector:
    app: flink
    component: jobmanager

- In actual deployment, make sure to modify the information in the ConfigMap flink-conf.yaml (the values marked with comments, such as the object storage and Kafka settings).
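- Before applying the file, it may help to double-check the environment-specific values called out in the comments above; a quick grep sketch:

grep -nE 's3\.(endpoint|access-key|secret-key|ssl|path)|bootstrap.servers' flink.yaml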
Configure Taint Tolerance
Designate dedicated servers for the Flink service so that it does not share resources with the microservices.
- Apply a taint and a label to the designated Kubernetes nodes

kubectl taint nodes $flink_node_name hap=flink:NoSchedule
kubectl label nodes $flink_node_name hap=flink

- Replace $flink_node_name with the actual node name, which can be viewed using kubectl get node
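- To confirm the taint and label took effect, you can inspect the node; for example:

kubectl describe node $flink_node_name | grep Taints
kubectl get nodes -l hap=flink

The first command should report hap=flink:NoSchedule, and the second should list the labeled Flink node(s).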
- Create a namespace

kubectl create ns flink

- Configure the Flink service to use the flink namespace

sed -i 's/namespace: default/namespace: flink/g' flink.yaml
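- To confirm the substitution, you can list the remaining namespace fields; every match should now read namespace: flink:

grep -n 'namespace:' flink.yaml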
Start the Flink Service
kubectl apply -f flink.yaml
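Once applied, you can watch the Pods come up and probe the JobManager web UI through the NodePort defined in the Service; <node-ip> below is a placeholder for any node's address:

kubectl -n flink get pods -l app=flink
curl -s http://<node-ip>:28081/overview

Expect one flink-jobmanager Pod and two flink-taskmanager Pods in Running state (use -n default instead if you skipped the namespace step above).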
Create Start and Stop Scripts
## Content of start_flink.sh
cat > /data/mingdao/script/kubernetes/flink/start_flink.sh << 'EOF'
#!/bin/bash
baseDir=$(dirname $0)
kubectl apply -f $baseDir/flink.yaml
EOF
## Content of stop_flink.sh
cat > /data/mingdao/script/kubernetes/flink/stop_flink.sh << 'EOF'
#!/bin/bash
baseDir=$(dirname $0)
kubectl delete -f $baseDir/flink.yaml
EOF
## Content of restart_flink.sh
cat > /data/mingdao/script/kubernetes/flink/restart_flink.sh << 'EOF'
#!/bin/bash
RESTART_DATETIME=$(date +%Y%m%d%H%M%S)
baseDir=$(dirname $0)
# Bump the md-update annotation so that 'kubectl apply' triggers a rolling restart
sed -r -i "/md-update:/ s/([[:digit:]]+)/$RESTART_DATETIME/" $baseDir/flink.yaml
/bin/bash $baseDir/start_flink.sh
EOF
#### Grant execute permission to the scripts
chmod +x /data/mingdao/script/kubernetes/flink/{start_flink.sh,stop_flink.sh,restart_flink.sh}
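Among these, restart_flink.sh works by bumping the md-update annotation timestamp in flink.yaml before re-applying it, which makes Kubernetes roll both Deployments. Typical usage:

bash /data/mingdao/script/kubernetes/flink/restart_flink.sh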
Configure Microservices to Connect to Flink
Add the following variable to config.yaml:
ENV_FLINK_URL: "http://flink-jobmanager.flink:8081"
Restart the microservices for the change to take effect:
bash restart.sh
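To verify the connection, you can query the Flink REST API from inside the cluster; a quick sketch, assuming curl is available in one of the microservice containers (the Pod name below is a placeholder):

kubectl exec -it <any-microservice-pod> -- curl -s http://flink-jobmanager.flink:8081/overview

A JSON response with taskmanager and slot counts indicates the microservices can reach Flink.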