Flink supports high-availability (HA) mode for both standalone and native Kubernetes deployments via the Flink operator. This article explains the purpose of HA and how to configure and run it in a local kind cluster.
A Flink cluster has only one active JobManager at any given point in time, which is a single point of failure: if the jobmanager fails, the jobs currently running in the cluster also fail and have to be restarted from scratch.
Enabling HA allows the cluster to recover from such failures and ensures that streaming jobs in particular can resume from their last known state via checkpoints.
In a previous blog post, I mentioned savepoints and how we can resume a job from one. Checkpoints are a mechanism provided via the HA service: when HA is enabled, Flink makes regular backups of every streaming job's state via checkpoints, which allows the job to be resumed in the event of a cluster failure.
The main difference between savepoints and checkpoints is that the former are triggered by the user while the latter are managed entirely by Flink. The point of having two complementary mechanisms is that checkpoints provide fast, recoverable state in the event of cluster failures such as jobmanager or taskmanager pods being killed, whereas savepoints are more portable and are intended for long-term uses such as Flink version upgrades and changes to job properties.
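To make the distinction concrete: a savepoint is triggered explicitly with the Flink CLI, while checkpoints happen automatically once a checkpointing interval is configured. A small sketch, where the job id shown is hypothetical:
# savepoints are triggered manually by the user (job id is hypothetical)
flink savepoint a1b2c3d4e5f6 file:///flink-shared/savepoints

# checkpoints require no user action; Flink takes them automatically at the
# interval set via execution.checkpointing.interval (see the config later in this post)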
In HA mode, more than one jobmanager pod can run concurrently, but only one of them is selected as the leader via the leader election service.
In this post, we are using the flink-operator to set up our session cluster. The default setting creates a StatefulSet for the jobmanager, and the number of replicas is not configurable at this point. However, running the jobmanager under a StatefulSet means there will always be at least one jobmanager pod running, so it serves this use case.
To enable HA, the following conditions must be met:
- Local storage for the high-availability checkpoints. In the kind cluster config, we can mount an additional local volume and reference it in a persistent volume. The mounted directory must also be owned by 9999:9999, the uid:gid of the flink user inside the container (see the snippet after this list).
- A service account with permission to edit configmaps. HA stores information about the cluster state, such as the current jobmanager leader, in these configmaps.
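To satisfy the ownership requirement, the host directory backing the volume can be created and chowned up front, before the kind cluster is created. A quick sketch, assuming the /tmp/flink-k8s-example host path used in the kind config below:
# create the host directory that will back the HA volume
mkdir -p /tmp/flink-k8s-example
# 9999:9999 is the uid:gid of the flink user in the apache/flink image
sudo chown -R 9999:9999 /tmp/flink-k8s-example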
To create the HA volume, I mounted the local directory /tmp/flink-k8s-example on localhost to /flink-k8s-example on the node in the kind cluster config:
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80
    protocol: TCP
  - containerPort: 443
    hostPort: 443
    protocol: TCP
  extraMounts:
  - hostPath: /tmp/artifacts
    containerPath: /artifacts
  - hostPath: /tmp/flink-k8s-example
    containerPath: /flink-k8s-example
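The kind cluster can then be created (or recreated) from this config. A usage sketch, assuming the config above is saved as kind-config.yaml (hypothetical filename):
kind create cluster --config kind-config.yaml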
Next, we create the PV and PVC for the mounted volume:
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-pv
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 1Gi
  accessModes:
  - ReadWriteOnce
  hostPath:
    path: /flink-k8s-example/
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-shared-pvc
spec:
  storageClassName: manual
  accessModes:
  - ReadWriteOnce
  volumeName: flink-pv
  resources:
    requests:
      storage: 1Gi
The PV exposes the /flink-k8s-example path on the node as a volume, and the flink-shared-pvc claim binds to it so it can be mounted into the Flink pods.
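Applying the manifest and checking that the claim binds is a useful sanity check; a sketch, assuming the manifest above is saved as flink-pv.yaml (hypothetical filename):
kubectl apply -f flink-pv.yaml
# the claim should report a STATUS of Bound
kubectl get pvc flink-shared-pvc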
The service account can be created and bound to the built-in edit cluster role:
kubectl create serviceaccount flink-service-account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
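Before wiring the service account into the cluster, we can check that the binding actually grants configmap access using kubectl's built-in authorization check:
kubectl auth can-i update configmaps \
  --as=system:serviceaccount:default:flink-service-account
# expected output: yes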
We reference the service account in the flink cluster config:
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: beam-flink-cluster
  namespace: default
spec:
  flinkVersion: "1.15.2"
  image:
    name: apache/flink:1.15.2
  serviceAccountName: "flink-service-account"
  ...
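The flink config below references file:///flink-shared/..., so the flink-shared-pvc claim also has to be mounted at /flink-shared in both the jobmanager and taskmanager pods. The following is only a sketch of what that part of the spec could look like; the exact field names (volumes and volumeMounts under jobManager and taskManager) should be checked against the FlinkCluster CRD reference of the operator version you are running:
spec:
  jobManager:
    volumeMounts:
    - name: flink-shared
      mountPath: /flink-shared
    volumes:
    - name: flink-shared
      persistentVolumeClaim:
        claimName: flink-shared-pvc
  taskManager:
    volumeMounts:
    - name: flink-shared
      mountPath: /flink-shared
    volumes:
    - name: flink-shared
      persistentVolumeClaim:
        claimName: flink-shared-pvc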
The flink config needs to be updated to include the HA config:
taskmanager.memory.process.size: "2g"
taskmanager.data.port: "6121"
taskmanager.numberOfTaskSlots: "3"
parallelism.default: "1"
state.backend: filesystem
state.backend.incremental: "true"
state.checkpoints.dir: file:///flink-shared/checkpoints
state.savepoints.dir: file:///flink-shared/savepoints
classloader.resolve-order: parent-first
execution.checkpointing.interval: "60"
# Kubernetes config
kubernetes.cluster-id: "beam-flink-cluster"
kubernetes.taskmanager.service-account: "flink-service-account"
# Below for HA config
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.jobmanager.port: "50010"
high-availability.storageDir: file:///flink-shared/ha
high-availability.cluster-id: "beam-flink-cluster"
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: "10"
We are using the built-in Kubernetes HA services factory class for high-availability. We use the volume created previously as the HA storage directory, and we set high-availability.cluster-id to the cluster name, which is required.
Next, we define the restart strategy and the number of restart attempts. As a precaution, I also pinned the jobmanager port to 50010, since the configuration docs state that this port can otherwise be assigned a random value when a new jobmanager is created.
We set the checkpointing interval to 60; this value must be greater than 0 for checkpointing to work. In the Kubernetes section, kubernetes.cluster-id is set to the name of the flink cluster, and the custom service account is assigned to the taskmanager.
The state.backend is set to filesystem. We also enable incremental checkpoints via state.backend.incremental, which stores only the diffs between checkpoints rather than full checkpoints. Note that state.checkpoints.dir and state.savepoints.dir can also point to remote storage locations such as S3, but in this local setup the main high-availability.storageDir points at the mounted volume.
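For illustration, switching the checkpoint and savepoint directories to an object store only changes the URIs; the bucket name below is hypothetical and the matching filesystem plugin (e.g. flink-s3-fs-hadoop or flink-s3-fs-presto) must be available in the image:
state.checkpoints.dir: s3://my-flink-bucket/checkpoints
state.savepoints.dir: s3://my-flink-bucket/savepoints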
Assuming the setup is right, when we start the flink session cluster, we should see the following in the jobmanager logs:
2022-12-11 15:29:32,308 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector beam-flink-cluster-cluster-config-map with lock identity 3e81a3e8-b718-4c9e-96ad-cd8f0eacd48e.
2022-12-11 15:29:32,309 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/beam-flink-cluster-cluster-config-map, watching id:db45acf1-52b3-4239-91a6-7232c1c56bba
...
2022-12-11 15:29:32,378 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 3e81a3e8-b718-4c9e-96ad-cd8f0eacd48e for beam-flink-cluster-cluster-config-map.
...
2022-12-11 15:29:32,541 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@8b670c0.
2022-12-11 15:29:32,541 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Web frontend listening at http://beam-flink-cluster-jobmanager:8081.
2022-12-11 15:29:32,541 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - http://beam-flink-cluster-jobmanager:8081 was granted leadership with leaderSessionID=92a23e47-73ea-4564-890f-0cb39937a15a
...
2022-12-11 15:29:32,556 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@3513d214.
2022-12-11 15:29:32,556 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Starting resource manager service.
2022-12-11 15:29:32,556 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@7534785a.
2022-12-11 15:29:32,557 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='beam-flink-cluster-cluster-config-map'}.
2022-12-11 15:29:32,557 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='beam-flink-cluster-cluster-config-map'}.
2022-12-11 15:29:32,572 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/beam-flink-cluster-cluster-config-map, watching id:c389c018-0bda-437a-aea3-656aa64dc47f
2022-12-11 15:29:32,572 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/beam-flink-cluster-cluster-config-map, watching id:d25f089c-67eb-4211-8ca2-245e55409c37
2022-12-11 15:29:32,574 INFO org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunner [] - DefaultDispatcherRunner was granted leadership with leader id 92a23e47-73ea-4564-890f-0cb39937a15a. Creating new DispatcherLeaderProcess.
2022-12-11 15:29:32,581 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Start SessionDispatcherLeaderProcess.
2022-12-11 15:29:32,584 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Resource manager service is granted leadership with session id 92a23e47-73ea-4564-890f-0cb39937a15a.
2022-12-11 15:29:32,585 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Recover all persisted job graphs that are not finished, yet.
2022-12-11 15:29:32,607 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/rpc/resourcemanager_0 .
2022-12-11 15:29:32,619 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [] from KubernetesStateHandleStore{configMapName='beam-flink-cluster-cluster-config-map'}
2022-12-11 15:29:32,619 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 0 persisted job graphs.
...
The logs show that the leader election service is started and the current jobmanager at http://beam-flink-cluster-jobmanager:8081 is elected as the leader. Note that this service is created automatically by the flink-operator in this case. Flink then starts the dispatcher and resource manager services, grants them leadership, and records this in the configmaps.
HA automatically tracks the current leader via configmaps. Two configmaps are created: <flink cluster name>-cluster-config-map and <flink cluster name>-configmap. The first contains the leader election details, while the second contains a copy of the flink and log4j configs used in the initial cluster setup.
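We can inspect these configmaps directly; a quick sketch (output trimmed):
kubectl get configmaps | grep beam-flink-cluster
# beam-flink-cluster-cluster-config-map   ...
# beam-flink-cluster-configmap            ...

# the leader election details live on the first configmap
kubectl describe configmap beam-flink-cluster-cluster-config-map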
We can use the following failure scenarios to test if HA is actually working:
Kill the current jobmanager pod process
This terminates the jobmanager process; the pod status and logs should then show it being restarted:
kubectl exec -it {jobmanager_pod_name} -- /bin/sh -c "kill 1"
Output of kubectl get pods:
beam-flink-cluster-jobmanager-0 1/1 Running 1 (55s ago) 19m
beam-flink-cluster-taskmanager-0 2/2 Running 0 19m
beam-flink-cluster-taskmanager-1 2/2 Running 0 19m
The jobmanager logs show the same startup information as when the cluster was first created, indicating that the jobmanager container was restarted with a fresh process (note the restart count of 1 above).
Kill a taskmanager pod process
kubectl exec -it {taskmanager_pod_name} -- /bin/sh -c "kill 1"
The jobmanager logs show that a new taskmanager process is started and registered:
2022-12-11 15:51:47,574 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Closing TaskExecutor connection 10.244.0.18:6122-3821cd because: The TaskExecutor is shutting down.
2022-12-11 15:52:01,144 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID 10.244.0.18:6122-fe0c19 (akka.tcp://flink@10.244.0.18:6122/user/rpc/taskmanager_0) at ResourceManager
...
Delete the jobmanager pod
Delete the main jobmanager pod using:
kubectl delete pod {jobmanager_pod_name}
Output of jobmanager logs:
2022-12-11 15:55:28,061 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2022-12-11 15:55:28,064 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-12-11 15:55:28,064 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2022-12-11 15:55:28,068 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-12-11 15:55:28,078 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-0b1e5be0-95c2-4716-9688-44ef1748278b/flink-web-ui
2022-12-11 15:55:28,080 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2022-12-11 15:55:28,090 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Shut down complete.
2022-12-11 15:55:28,091 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2022-12-11 15:55:28,091 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2022-12-11 15:55:28,091 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='beam-flink-cluster-cluster-config-map'}.
2022-12-11 15:55:28,091 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2022-12-11 15:55:28,091 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='beam-flink-cluster-cluster-config-map'}.
2022-12-11 15:55:28,091 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2022-12-11 15:55:28,091 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/beam-flink-cluster-cluster-config-map, watching id:dbf98f2b-6765-45be-bf62-afc1f15d84f6
2022-12-11 15:55:28,091 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/beam-flink-cluster-cluster-config-map, watching id:a2cc3271-7ef2-47ee-9342-dfcbc83bbc4a
2022-12-11 15:55:28,110 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2022-12-11 15:55:28,110 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Stopping resource manager service.
2022-12-11 15:55:28,110 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping dispatcher akka.tcp://flink@beam-flink-cluster-jobmanager:6123/user/rpc/dispatcher_1.
2022-12-11 15:55:28,111 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2022-12-11 15:55:28,111 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping all currently running jobs of dispatcher akka.tcp://flink@beam-flink-cluster-jobmanager:6123/user/rpc/dispatcher_1.
2022-12-11 15:55:28,112 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopped dispatcher akka.tcp://flink@beam-flink-cluster-jobmanager:6123/user/rpc/dispatcher_1.
2022-12-11 15:55:28,114 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2022-12-11 15:55:28,117 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2022-12-11 15:55:28,117 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
2022-12-11 15:55:28,119 INFO org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService [] - Closing DefaultMultipleComponentLeaderElectionService.
2022-12-11 15:55:28,120 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionDriver [] - Closing org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionDriver@5bb0c5c0.
The logs indicate that the leader election service was shut down cleanly when the jobmanager pod was deleted. The logs of the replacement jobmanager pod show that it is elected as the new leader:
2022-12-11 15:55:36,536 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector beam-flink-cluster-cluster-config-map with lock identity 05df88b5-bbd2-400f-b1e8-9d4d386b2a43.
2022-12-11 15:55:36,537 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/beam-flink-cluster-cluster-config-map, watching id:fa91664e-7df9-4937-802b-20e3cba09bc9
...
2022-12-11 15:55:46,149 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - http://beam-flink-cluster-jobmanager:8081 was granted leadership with leaderSessionID=28855791-dee8-4a20-9cd1-b1d5b91be838
2022-12-11 15:55:46,149 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Resource manager service is granted leadership with session id 28855791-dee8-4a20-9cd1-b1d5b91be838.
...
Testing HA with checkpoints
We can use the built-in StateMachine example to simulate a long-running streaming job, and then monitor the checkpoints directory to make sure checkpoints are created.
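One way to submit it is to run the Flink CLI from inside the jobmanager pod, since the example jar ships with the Flink distribution. A sketch, using the jobmanager pod name from earlier and the host path backing the shared volume:
kubectl exec -it beam-flink-cluster-jobmanager-0 -- \
  bin/flink run -d examples/streaming/StateMachineExample.jar

# on the host, checkpoints should start appearing on the mounted volume
ls /tmp/flink-k8s-example/checkpoints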
The jobmanager logs should show the job running and checkpoints being saved:
2022-12-11 16:02:21,632 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Flat Map -> Sink: Print to Std. Out (1/1) (e4adc6c6d1d4b2b9e9318cfd95e65e35) switched from INITIALIZING to RUNNING.
2022-12-11 16:02:23,370 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670774543316 for job c506b6c290cc2d96a0e3f0eea10395c4.
2022-12-11 16:02:23,451 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job c506b6c290cc2d96a0e3f0eea10395c4 (7735 bytes, checkpointDuration=123 ms, finalizationTime=11 ms).
2022-12-11 16:02:25,326 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670774545316 for job c506b6c290cc2d96a0e3f0eea10395c4.
2022-12-11 16:02:25,364 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 2 for job c506b6c290cc2d96a0e3f0eea10395c4 (8320 bytes, checkpointDuration=23 ms, finalizationTime=25 ms).
...
We can kill the jobmanager process again, and it should resume the job from the last completed checkpoint, which was checkpoint 106 in this example:
2022-12-11 16:06:08,109 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Recover all persisted job graphs that are not finished, yet.
2022-12-11 16:06:08,187 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [c506b6c290cc2d96a0e3f0eea10395c4] from KubernetesStateHandleStore{configMapName='beam-flink-cluster-cluster-config-map'}
2022-12-11 16:06:08,188 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Trying to recover job with job id c506b6c290cc2d96a0e3f0eea10395c4.
2022-12-11 16:06:08,406 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Recovered JobGraph(jobId: c506b6c290cc2d96a0e3f0eea10395c4).
2022-12-11 16:06:08,407 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs.
2022-12-11 16:06:09,128 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'State machine job' (c506b6c290cc2d96a0e3f0eea10395c4).
2022-12-11 16:06:09,209 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=1000) for State machine job (c506b6c290cc2d96a0e3f0eea10395c4).
2022-12-11 16:06:09,230 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='beam-flink-cluster-c506b6c290cc2d96a0e3f0eea10395c4-config-map'}.
2022-12-11 16:06:09,276 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='beam-flink-cluster-c506b6c290cc2d96a0e3f0eea10395c4-config-map'}.
2022-12-11 16:06:09,277 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 1 checkpoints from storage.
2022-12-11 16:06:09,277 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to retrieve checkpoint 106.
...
2022-12-11 16:06:09,504 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 'State machine job' (c506b6c290cc2d96a0e3f0eea10395c4) under job master id 8c15f93c28f74b20cc1b30ccb4da4cc3.
2022-12-11 16:06:09,506 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-12-11 16:06:09,507 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job State machine job (c506b6c290cc2d96a0e3f0eea10395c4) switched from state CREATED to RUNNING.
...
2022-12-11 16:06:18,799 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 107 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670774778787 for job c506b6c290cc2d96a0e3f0eea10395c4.
2022-12-11 16:06:18,884 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 107 for job c506b6c290cc2d96a0e3f0eea10395c4 (8338 bytes, checkpointDuration=72 ms, finalizationTime=25 ms).
2022-12-11 16:06:20,803 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 108 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670774780787 for job c506b6c290cc2d96a0e3f0eea10395c4.
2022-12-11 16:06:20,843 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 108 for job c506b6c290cc2d96a0e3f0eea10395c4 (15169 bytes, checkpointDuration=32 ms, finalizationTime=24 ms).
...
As can be seen above, the job was restored from checkpoint 106 and continued creating new checkpoints from there.
When the job is stopped or cancelled manually, the HA data, including the checkpoints, is automatically deleted:
...
2022-12-11 16:16:34,782 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Clean up the high availability data for job 034b3fe2d4f673aed68b38e787f8edf0.
2022-12-11 16:16:34,788 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Finished cleaning up the high availability data for job 034b3fe2d4f673aed68b38e787f8edf0.
2022-12-11 16:16:34,797 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 034b3fe2d4f673aed68b38e787f8edf0 from KubernetesStateHandleStore{configMapName='beam-flink-cluster-cluster-config-map'}.
This is because checkpoints, by default, are only meant to recover a job from failures. However, you can configure the job to retain its checkpoints when it is cancelled.
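A minimal sketch of that setting for Flink 1.15, added to the same flink config as above (by default externalized checkpoints are deleted on cancellation):
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION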
Further information can be found in the documentation on Flink HA. Details about Flink configuration are on the Flink 1.15.2 configuration page, and more background is available in the docs on Flink Checkpoints and Checkpoints vs savepoints.
Summary
This post explains how to set up high availability for a Flink cluster running locally in a kind cluster, but the same approach can be applied to an actual deployed Flink cluster. In that case, the state backend should be changed to rocksdb, along with other configuration tweaks, which is a topic for another article.
H4ppy H4ck1n6 !!!