Storing Flink savepoints and artifacts in AWS S3

We can run Flink jobs in either batch mode or streaming mode. In streaming mode, we can save the state of the job at a specified interval, which allows the job to resume from a given state in case of failure. These saved states are known as savepoints.

A thorough discussion of the subject is beyond the scope of this post, so I refer the interested reader to the Flink guide on savepoints.

An important distinction is that savepoints differ from checkpoints: savepoints are triggered by the user, while checkpoints are created automatically by the Flink cluster, for example for recovery in HA (High Availability) mode.

Note that savepoints can only be triggered for streaming jobs; they won't work for batch jobs.

To test this functionality with the Flink Operator, we use the Kafka test guide example, which provides a streaming example built on Kafka pipelines.

The rest of this post assumes you have a running kind cluster with the flink operator installed. If not, please refer to the previous posts on the subject.

The following steps detail how to build a custom image and use it in a streaming job running locally in a kubernetes cluster via the flink operator:

1. Build the kafka example

We need to clone the Flink playground example and make the following changes to the Dockerfile:

###############################################################################
# Build Operations Playground Image
###############################################################################

FROM apache/flink:1.15.2-scala_2.12-java8

WORKDIR /opt/flink/bin

# Copy s3 plugins
RUN cd ../ && \
    mkdir -p plugins/s3-fs-hadoop && \
    cp opt/flink-s3-fs-hadoop-1.15.2.jar plugins/s3-fs-hadoop

# Copy Click Count Job
COPY --from=builder /opt/flink-playground-clickcountjob/target/flink-playground-clickcountjob-*.jar /opt/ClickCountJob.jar

To save savepoints to remote cloud storage such as S3, we need to enable the filesystem plugins within the apache/flink image. This is done by copying /opt/flink/opt/flink-s3-fs-hadoop-1.15.2.jar into its own plugin directory, /opt/flink/plugins/s3-fs-hadoop/.

Once the custom image is built, we can load it into the kind cluster.
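
A minimal sketch of the build-and-load step, assuming the Dockerfile above sits in the cloned playground directory and we tag the image to match the manifests used later (pass --name to kind load if your cluster is not named kind):

# Build the custom playground image with the S3 plugin baked in
docker build -t m1l0/flink-ops-playground:1.15.2-scala_2.12 .

# Load the image into the local kind cluster so the pods can use it
kind load docker-image m1l0/flink-ops-playground:1.15.2-scala_2.12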

2. Install kafka

Next we need to add the Helm repo and install the Kafka chart. The following script is adapted from the Kafka test guide:

kubectl create ns kafka

helm repo add incubator https://charts.helm.sh/incubator

helm install my-kafka incubator/kafka --namespace kafka

helm status my-kafka -n kafka

Make the script executable and run it, then check that the pods in the kafka namespace are running.
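
For example, assuming the script above is saved as install-kafka.sh:

chmod +x install-kafka.sh
./install-kafka.sh

# The kafka and zookeeper pods should eventually reach the Running state
kubectl get pods -n kafka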

3. Create the deployment

We need to create a generator deployment that writes data to the kafka cluster. This is adapted from the Kafka test guide:

# Example from the flink playground
# Deployment that writes data to kafka cluster
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-click-generator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-click-generator
  template:
    metadata:
      labels:
        app: kafka-click-generator
    spec:
      containers:
        - name: kafka-click-generator
          image: m1l0/flink-ops-playground:1.15.2-scala_2.12
          command: ["java"]
          args:
            - "-classpath"
            - "/opt/ClickCountJob.jar:/opt/flink/lib/*"
            - "org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator"
            - "--bootstrap.servers"
            - "my-kafka.kafka.svc.cluster.local:9092"
            - "--topic"
            - "input"

Note that we are using our own custom image built in step 1.

Create the deployment and check that the pods are running.
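
A minimal sketch, assuming the deployment manifest above is saved as kafka-click-generator.yaml:

kubectl apply -f kafka-click-generator.yaml

# The generator pod should be Running and producing click events
kubectl get pods -l app=kafka-click-generator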

4. Create the consumer

The consumer of the data stream is a flink operator job that runs the ClickCountJob.jar application, which reads the click events from the kafka topic.

The job is deployed as a standalone application cluster via the flink operator.

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: clickcount
spec:
  flinkVersion: "1.15.2"
  image:
    name: m1l0/flink-ops-playground:1.15.2-scala_2.12
    pullPolicy: IfNotPresent
  jobManager:
    ports:
      ui: 8081
    resources:
      limits:
        memory: "2Gi"
        cpu: "200m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2Gi"
        cpu: "200m"
  envVars:
  - name: ENABLE_BUILT_IN_PLUGINS
    value: flink-s3-fs-hadoop-1.15.2.jar;
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: flink-aws-secret
        key: access_key_id
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: flink-aws-secret
        key: secret_access_key
  job:
    jarFile: /opt/ClickCountJob.jar
    className: org.apache.flink.playgrounds.ops.clickcount.ClickEventCount
    args:
      [
        "--bootstrap.servers",
        "my-kafka.kafka.svc.cluster.local:9092",
        "--checkpointing",
        "--event-time",
      ]
    parallelism: 1
    savepointsDir: "s3a://flinkexps/savepoints"
    autoSavepointSeconds: 10
  flinkProperties:
    taskmanager.numberOfTaskSlots: "1"

There are several important things to note here.

The ENABLE_BUILT_IN_PLUGINS environment variable is required so that the built-in S3 plugin is enabled in the containers at startup; otherwise the job will fail with a plugin-not-found error.

The AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are fetched from a kubernetes secret and exposed as env vars within the job container.
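
The secret itself can be created up front; a minimal sketch, assuming the credentials are already available in your shell (the secret name and key names match the manifest above):

# Create the secret referenced by the FlinkCluster manifest
kubectl create secret generic flink-aws-secret \
  --from-literal=access_key_id="$AWS_ACCESS_KEY_ID" \
  --from-literal=secret_access_key="$AWS_SECRET_ACCESS_KEY"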

Within the job spec we need to declare the following two properties to enable automatic savepoint creation via the flink operator.

The savepointsDir property is the target location of our savepoints. Note that the filesystem prefix is s3a, as we are using the hadoop filesystem integration and it only works with that prefix.

We also need to declare autoSavepointSeconds, a non-negative integer that specifies how often (in seconds) to create a savepoint. In this example we set it to a short interval of 10 seconds to verify that it works.

Apply the above configuration and check the status of the job submitted via:

kubectl describe flinkcluster clickcount
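
The apply step itself is a plain kubectl apply; a minimal sketch, assuming the FlinkCluster manifest above is saved as clickcount.yaml:

kubectl apply -f clickcount.yaml

# The jobmanager and taskmanager pods should come up shortly
kubectl get pods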

If all goes well, after 10 seconds you should see a stream of savepoints being created.

Screenshots: the Flink application CLI and the AWS S3 console showing the created savepoints.
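
You can also confirm this from the command line with the AWS CLI; a quick check against the savepointsDir bucket configured above:

# List the savepoint objects written by the job
aws s3 ls s3://flinkexps/savepoints/ --recursive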

5. Triggering savepoints

You can also trigger the creation of savepoints manually as highlighted in the Flink operator savepoints guide. Of all the approaches listed, the easiest one I found was to annotate the cluster manually.

For example, given the flinkcluster resource above, I can run:

kubectl annotate flinkclusters clickcount flinkclusters.flinkoperator.k8s.io/user-control=savepoint

You should see a User control savepoint triggered event and the savepoint shown under the job spec.
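
One way to verify this is to inspect the cluster resource again; a quick sketch (the exact field names in the output depend on the operator version):

# Look for the savepoint entries in the events and status
kubectl describe flinkcluster clickcount | grep -i -A 3 savepoint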

Saving artifacts

To save artifacts to S3 from batch jobs, we need to do the same as above:

  • Create a custom image where we copy over the hadoop s3 plugins
  • Create and mount the env vars
  • Specify the target s3 bucket with s3a prefix
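
For the first item, a minimal sketch of building and loading a custom PyFlink image into kind; the image tag matches the manifest below, while the Dockerfile name is an assumption:

# Build a PyFlink image that also bundles the flink-s3-fs-hadoop plugin
docker build -t m1l0/pyflink:1.15.2-scala_2.12 -f Dockerfile.pyflink .

# Make the image available to the kind cluster
kind load docker-image m1l0/pyflink:1.15.2-scala_2.12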

Below is an example of a batch job submitted to the flink operator:

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: pyflink-wordcount
  namespace: default
spec:
  flinkVersion: "1.15.2"
  image:
    name: m1l0/pyflink:1.15.2-scala_2.12
    pullPolicy: IfNotPresent
  taskManager:
    replicas: 1
    # Below is needed to access attached volumes as flink user
    securityContext:
      runAsUser: 9999
      runAsGroup: 9999
      fsGroup: 9999
  envVars:
  - name: ENABLE_BUILT_IN_PLUGINS
    value: flink-s3-fs-hadoop-1.15.2.jar;
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: flink-aws-secret
        key: access_key_id
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: flink-aws-secret
        key: secret_access_key
  job:
    pyFile: "examples/python/datastream/word_count.py"
    args: ["--output", "s3a://flinkexps/artifacts/pyflink/"]
    restartPolicy: Never
  flinkProperties:
    s3.path.style.access: "true"

The above runs an example pyflink job provided in the Flink docker image, which we customized by copying over the hadoop s3 plugin. Note that the same env vars as specified for the streaming job must be present for it to work in batch mode.
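
A minimal sketch of submitting the batch job and checking its output, assuming the manifest above is saved as pyflink-wordcount.yaml:

kubectl apply -f pyflink-wordcount.yaml

# Wait for the job pod to complete, then list the written artifacts
kubectl get pods | grep pyflink-wordcount
aws s3 ls s3://flinkexps/artifacts/pyflink/ --recursive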

Summary

This post explains how to store savepoints from a streaming job, as well as the output artifacts of a batch job, in remote cloud storage such as S3.

In the next posts, I will attempt to cover checkpoints and the high availability mode in a flink cluster.

H4ppy H4ck1n6 !!!