1 Overview
The Flink Operator and the Flink clusters it manages do not support Huawei Cloud OBS out of the box. To integrate with OBS, you need to place a jar in the plugin directory of these Java programs and adjust the Flink configuration.
Reference:
https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html
2 Environment Preparation
2.1 Huawei Cloud Kubernetes cluster
Prepare a Kubernetes cluster, as shown below:
2.2 Flink Operator Helm package download address
https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
2.3 cert-manager YAML file download address
https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
2.4 Prepare a Flink application example
https://github.com/apache/flink/tree/master/flink-examples
Compile the official Flink example code into jar packages and upload them to OBS, as shown below:
These jars live in Huawei Cloud OBS. The Flink Operator can fetch them over the OBS protocol and submit them to the Flink cluster, and the cluster's JobManager and TaskManager can likewise read from and write to OBS.
3 Deployment
3.1 Install cert-manager
This component is a dependency of the Flink Operator webhook, so install it first.
cd /tmp
wget https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
kubectl apply -f cert-manager.yaml
3.2 Install the helm binary
cd /tmp
wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
tar xf helm-v3.16.2-linux-amd64.tar.gz
cd linux-amd64
/bin/cp -f helm /usr/bin/
helm env
3.3 Deploy the Flink Operator
Download the Flink Operator Helm package, extract it, and deploy it into the flink namespace with helm.
cd /tmp
wget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
tar xf flink-kubernetes-operator-1.10.0-helm.tgz
Edit the flink-kubernetes-operator/values.yaml file and add the following under the defaultConfiguration.flink-conf.yaml field:
defaultConfiguration:
  flink-conf.yaml: |+
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your AK*********
    fs.obs.secret.key: *********your SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com  # OBS endpoint; set it for your region
Deploy the Kubernetes resources with:
helm upgrade --install flink-operator -n flink --create-namespace \
--set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
--set image.tag=1.10.0 \
./flink-kubernetes-operator/
I have packaged the flink-obs jars into the image swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45. It is a public image, so feel free to pull and use it.
Next, update the operator Deployment (it needs the initContainer and the obs-plugin volume mount); simply kubectl apply the following:
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    meta.helm.sh/release-name: flink-operator
    meta.helm.sh/release-namespace: flink
  labels:
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.10.0
    helm.sh/chart: flink-kubernetes-operator-1.10.0
  name: flink-kubernetes-operator
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: flink-kubernetes-operator
  strategy:
    type: Recreate
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: flink-kubernetes-operator
      labels:
        app.kubernetes.io/name: flink-kubernetes-operator
    spec:
      initContainers:
        - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
          name: sidecar
          command: ["sh"]
          args:
            - "-c"
            - "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
          volumeMounts:
            - name: obs-plugin
              mountPath: /opt/flink/plugins/obs-fs-hadoop
      containers:
        - command:
            - /docker-entrypoint.sh
            - operator
          env:
            - name: OPERATOR_NAMESPACE
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: OPERATOR_NAME
              value: flink-kubernetes-operator
            - name: FLINK_CONF_DIR
              value: /opt/flink/conf
            - name: FLINK_PLUGINS_DIR
              value: /opt/flink/plugins
            - name: LOG_CONFIG
              value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
            - name: JVM_ARGS
          image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
          imagePullPolicy: IfNotPresent
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /
              port: health-port
              scheme: HTTP
            initialDelaySeconds: 30
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 1
          name: flink-kubernetes-operator
          ports:
            - containerPort: 8085
              name: health-port
              protocol: TCP
          resources: {}
          securityContext: {}
          startupProbe:
            failureThreshold: 30
            httpGet:
              path: /
              port: health-port
              scheme: HTTP
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 1
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /opt/flink/conf
              name: flink-operator-config-volume
            - mountPath: /opt/flink/artifacts
              name: flink-artifacts-volume
            - name: obs-plugin
              mountPath: /opt/flink/plugins/obs-fs-hadoop
        - command:
            - /docker-entrypoint.sh
            - webhook
          env:
            - name: WEBHOOK_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  key: password
                  name: flink-operator-webhook-secret
            - name: WEBHOOK_KEYSTORE_FILE
              value: /certs/keystore.p12
            - name: WEBHOOK_KEYSTORE_TYPE
              value: pkcs12
            - name: WEBHOOK_SERVER_PORT
              value: "9443"
            - name: LOG_CONFIG
              value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
            - name: JVM_ARGS
            - name: FLINK_CONF_DIR
              value: /opt/flink/conf
            - name: FLINK_PLUGINS_DIR
              value: /opt/flink/plugins
            - name: OPERATOR_NAMESPACE
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
          image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
          imagePullPolicy: IfNotPresent
          name: flink-webhook
          resources: {}
          securityContext: {}
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /certs
              name: keystore
              readOnly: true
            - mountPath: /opt/flink/conf
              name: flink-operator-config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsGroup: 9999
        runAsUser: 9999
      serviceAccount: flink-operator
      serviceAccountName: flink-operator
      terminationGracePeriodSeconds: 30
      volumes:
        - configMap:
            defaultMode: 420
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-operator.properties
                path: log4j-operator.properties
              - key: log4j-console.properties
                path: log4j-console.properties
            name: flink-operator-config
          name: flink-operator-config-volume
        - emptyDir: {}
          name: flink-artifacts-volume
        - name: keystore
          secret:
            defaultMode: 420
            items:
              - key: keystore.p12
                path: keystore.p12
            secretName: webhook-server-cert
        - name: obs-plugin
          emptyDir: {}
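The initContainer above does nothing more than copy the OBS filesystem jars into Flink's plugin directory. Its copy logic can be rehearsed locally with throwaway directories standing in for the real paths (a sketch; the jar file name below is a made-up placeholder, not the real jar):

```shell
# Simulate the initContainer's copy step with temporary directories.
SRC=$(mktemp -d)    # stands in for /opt inside the flink-obs-fs-hadoop image
DST=$(mktemp -d)    # stands in for the obs-plugin emptyDir volume
touch "$SRC/flink-obs-fs-hadoop-example.jar"   # placeholder jar

# Same shape as the initContainer command:
mkdir -p "$DST/obs-fs-hadoop" && cp -f "$SRC"/*.jar "$DST/obs-fs-hadoop/"

ls "$DST/obs-fs-hadoop"   # the jar should now sit in the plugin subdirectory
```

In the real Deployment, the obs-plugin emptyDir is mounted at /opt/flink/plugins/obs-fs-hadoop in both the initContainer and the operator container, so the copied jars become visible to the operator process.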
3.4 Deploy a Flink session cluster
kubectl apply the following resource to deploy a Flink session cluster:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster
  namespace: flink
spec:
  image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your AK*********
    fs.obs.secret.key: *********your SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com  # OBS endpoint; set it for your region
  jobManager:
    resource:
      memory: "2048m"
      cpu: 2
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
  serviceAccount: flink
  podTemplate:
    spec:
      volumes:
        - name: obs-plugin
          emptyDir: {}
      containers:
        # Do not change the main container name
        - name: flink-main-container
          volumeMounts:
            - name: obs-plugin
              mountPath: /opt/flink/plugins/obs-fs-hadoop
      initContainers:
        - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
          name: sidecar
          command: ["sh"]
          args:
            - "-c"
            - "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
          volumeMounts:
            - name: obs-plugin
              mountPath: /opt/flink/plugins/obs-fs-hadoop
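With the plugin mounted, the same flinkConfiguration block can also point Flink's checkpoint and savepoint storage at OBS. The option keys below are standard Flink configuration options; the bucket name and paths are illustrative placeholders, not values from this setup:

```yaml
flinkConfiguration:
  fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
  # Illustrative placeholders -- substitute your own bucket and paths:
  state.checkpoints.dir: obs://your-bucket/flink/checkpoints
  state.savepoints.dir: obs://your-bucket/flink/savepoints
```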
4 Submit a Flink job
Simply kubectl apply the following resource:
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
  namespace: flink
spec:
  deploymentName: flink-session-cluster
  job:
    jarURI: obs://your-bucket/StateMachineExample.jar  # jar location; set it for your bucket
    parallelism: 1
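The jarURI above uses the obs:// scheme, which the OBS filesystem plugin resolves into a bucket and an object key. As a rough illustration (not the plugin's actual code), the split works like an ordinary URL parse:

```python
from urllib.parse import urlparse

def split_obs_uri(uri: str) -> tuple[str, str]:
    """Split an obs://bucket/key URI into (bucket, key) -- illustrative only."""
    parsed = urlparse(uri)
    if parsed.scheme != "obs":
        raise ValueError(f"not an OBS URI: {uri}")
    # netloc is the bucket; the path (minus its leading slash) is the object key
    return parsed.netloc, parsed.path.lstrip("/")

bucket, key = split_obs_uri("obs://my-bucket/StateMachineExample.jar")
print(bucket, key)  # my-bucket StateMachineExample.jar
```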
The Flink job is in RUNNING state, which shows that the Flink Operator pulled the jar from Huawei Cloud OBS and submitted it to the Flink cluster.
Looking further at the Flink Operator logs, OBS-related messages are visible:
5 Summary
This article showed how to connect the Flink Operator and the Flink clusters it manages to Huawei Cloud OBS. Once the integration is in place, you can store not only job jars in object storage but also Flink job state, inputs, and outputs.