Kafka local persistent volume with Kubernetes

This article is for readers who are familiar with Kubernetes, its basic principles, and Kafka. It solves a specific data persistence problem related to running the Confluent Platform in a Kubernetes cluster. It took me several hours to study the principles and come up with this solution, which works great. Hopefully it helps other users with the same use case.

[Image: the relationship between cluster storage, persistent volume claims (PVCs), and persistent volumes (PVs). Source: https://cloud.ibm.com/docs/openshift?topic=openshift-kube_concepts&locale=en]

Solving persistence for your Kafka cluster when using Confluent Platform

What is Confluent Platform?

If you have ever wanted to set up Kafka together with ksqlDB on a Kubernetes cluster, you have probably found the neat Helm chart from Confluent that eases the deployment of the whole platform.

This is the Confluent Platform Helm chart: https://github.com/confluentinc/cp-helm-charts

By default, it starts 3 instances each of Kafka and ZooKeeper, plus a ksqlDB server, Kafka REST endpoints, Kafka Connect, and Schema Registry. You can use it right away: it's pre-configured so that all the components see each other and act as one platform.

Problem: how to set up persistence?

When you uninstall the Helm chart and the pods no longer exist, the data stored in the Kafka instances is gone. To prevent that, you should set this in values.yaml:

  persistence:
    enabled: true
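In cp-helm-charts specifically, persistence is configured per component, so the flag lives under the cp-kafka and cp-zookeeper sections. A minimal sketch, assuming the field names used by the chart at the time of writing (double-check your chart version's values.yaml, as names vary between versions):

cp-kafka:
  persistence:
    enabled: true
    size: 5Gi                      # data volume per broker
    storageClass: local-storage    # the StorageClass we create below
cp-zookeeper:
  persistence:
    enabled: true
    dataDirSize: 5Gi               # ZooKeeper snapshot directory
    dataLogDirSize: 5Gi            # ZooKeeper transaction log directory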

This creates persistent volume claims (PVCs), which look up persistent volumes (PVs) and bind to them.

But you would probably end up with something like this:

[Screenshot: several persistent volume claims (PVCs) in Pending state, unable to find a corresponding persistent volume (PV)]
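You can see the stuck claims with kubectl. Illustrative output (trimmed; your names and ages will differ):

kubectl get pvc
NAME                                    STATUS    VOLUME   CAPACITY   STORAGECLASS   AGE
datadir-0-cp-helm-charts-cp-kafka-0     Pending                                      2m
datadir-0-cp-helm-charts-cp-kafka-1     Pending                                      2m
datadir-0-cp-helm-charts-cp-kafka-2     Pending                                      2m
datadir-cp-helm-charts-cp-zookeeper-0   Pending                                      2m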

Let's create the persistent volumes

storageClassName: local-storage

What is that? For Kafka, you should not use NFS (Network File System) as the persistent volume backend. Why? Because it's far too slow for Kafka's disk-heavy workload. What's the answer then?

Local persistent volume!

First, you should create a storage class and name it e.g. local-storage. A storage class describes the "quality" of the storage, such as how fast it is. For example, you can have big but slow storage that you'd only use for backups, or a smaller but fast SSD. That's what StorageClass is about. Let's name ours simply "local-storage".

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

When creating local storage, you need to set:

provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

The no-provisioner value tells Kubernetes that there is no dynamic provisioner for this class, so the persistent volumes have to be created manually. WaitForFirstConsumer delays binding a claim until a pod that uses it is scheduled, which lets the scheduler take the volume's node affinity into account.
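Save the manifest (e.g. as local-storage-class.yaml, a name chosen here just for illustration) and apply it:

kubectl apply -f local-storage-class.yaml
kubectl get storageclass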

Then, check out these lines:

  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-0
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc

Oh yeah, Kubernetes allows you to take a directory on a node and use it as storage. Why is that so cool? Because such local storage is usually very FAST, which is ideal for Kafka and its many data operations.

pc in this case is the node where this mount directory exists.

Why is the nodeAffinity needed? Because pods can run on any node of your cluster, so we have to specify exactly which node hosts this directory.

See the official documentation on local volumes: https://kubernetes.io/docs/concepts/storage/volumes/#local
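The value to put under kubernetes.io/hostname comes from the node's labels. You can look it up like this:

kubectl get nodes --show-labels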

Great, we can create such fast local volumes and use them right away!

Not so fast. Another problem is that the Confluent Platform chart by default spawns 3 instances each of Kafka and ZooKeeper, and each instance needs to read and write data in a directory that belongs to that instance.

But isn't that the case by default?

No :) With plain PVs and PVCs, any PVC binds to any available PV.

For the first run that can be OK, but if you shut the chart down and start it again, the bindings can get mixed up, with each claim taking the first available PV. It can then happen that a Kafka broker gets a directory that belongs to ZooKeeper, or kafka-0 gets the directory that was used by kafka-2. This usually results in an error.

What's the solution to this problem?

You need to pre-bind each PVC to a specific PV.

How? By specifying a claimRef:

  claimRef:
    name: datadir-0-cp-helm-charts-cp-kafka-0
    namespace: default

E.g. this persistent volume (PV), which belongs to Kafka instance 0, will always bind to the persistent volume claim (PVC) datadir-0-cp-helm-charts-cp-kafka-0!

apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-0-cp-helm-charts-cp-kafka-0
spec:
  claimRef:
    name: datadir-0-cp-helm-charts-cp-kafka-0
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-0
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc

Then, if you apply a claimRef like this to all the other persistent volumes (PVs), each instance will always read and write from its own directory.
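Once the chart is running, you can check that every volume is bound to exactly the claim named in its claimRef. Illustrative kubectl get pv output, trimmed to the relevant columns (your names and sizes may differ):

kubectl get pv
NAME                                  STATUS   CLAIM
datadir-0-cp-helm-charts-cp-kafka-0   Bound    default/datadir-0-cp-helm-charts-cp-kafka-0
datadir-0-cp-helm-charts-cp-kafka-1   Bound    default/datadir-0-cp-helm-charts-cp-kafka-1
datadir-0-cp-helm-charts-cp-kafka-2   Bound    default/datadir-0-cp-helm-charts-cp-kafka-2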

Here is the full file that creates the persistent volumes on your cluster for use with the Confluent Platform Helm chart (you just need to modify the path and hostname values):

apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-0-cp-helm-charts-cp-kafka-0
spec:
  claimRef:
    name: datadir-0-cp-helm-charts-cp-kafka-0
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-0
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-0-cp-helm-charts-cp-kafka-1
spec:
  claimRef:
    name: datadir-0-cp-helm-charts-cp-kafka-1
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-1
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-0-cp-helm-charts-cp-kafka-2
spec:
  claimRef:
    name: datadir-0-cp-helm-charts-cp-kafka-2
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-2
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-cp-helm-charts-cp-zookeeper-0
spec:
  claimRef:
    name: datadir-cp-helm-charts-cp-zookeeper-0
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-cp-helm-charts-cp-zookeeper-0
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datalogdir-cp-helm-charts-cp-zookeeper-0
spec:
  claimRef:
    name: datalogdir-cp-helm-charts-cp-zookeeper-0
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datalogdir-cp-helm-charts-cp-zookeeper-0
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-cp-helm-charts-cp-zookeeper-1
spec:
  claimRef:
    name: datadir-cp-helm-charts-cp-zookeeper-1
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-cp-helm-charts-cp-zookeeper-1
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datalogdir-cp-helm-charts-cp-zookeeper-1
spec:
  claimRef:
    name: datalogdir-cp-helm-charts-cp-zookeeper-1
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datalogdir-cp-helm-charts-cp-zookeeper-1
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-cp-helm-charts-cp-zookeeper-2
spec:
  claimRef:
    name: datadir-cp-helm-charts-cp-zookeeper-2
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-cp-helm-charts-cp-zookeeper-2
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datalogdir-cp-helm-charts-cp-zookeeper-2
spec:
  claimRef:
    name: datalogdir-cp-helm-charts-cp-zookeeper-2
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datalogdir-cp-helm-charts-cp-zookeeper-2
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - pc

You can save this YAML to e.g. pv.yaml, modify the paths to match your filesystem, change the hostname value, and apply it:

kubectl apply -f pv.yaml

This YAML file covers 3 instances; if you need more, you must add additional persistent volumes with the corresponding claimRef values. A small generator script for the broker volumes is sketched below.
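To avoid copy-pasting the same block for every broker, the Kafka PVs can be generated with a small shell loop. A minimal sketch, assuming the naming scheme, path prefix, and node name used above:

#!/usr/bin/env bash
# generate-kafka-pvs.sh: emit one pre-bound local PV per Kafka broker
NODE=pc        # value of the node's kubernetes.io/hostname label
BROKERS=3      # number of Kafka instances in the chart
for i in $(seq 0 $((BROKERS - 1))); do
cat <<EOF
apiVersion: v1
kind: PersistentVolume
metadata:
  name: datadir-0-cp-helm-charts-cp-kafka-$i
spec:
  claimRef:
    name: datadir-0-cp-helm-charts-cp-kafka-$i
    namespace: default
  capacity:
    storage: 5Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/datadir-0-cp-helm-charts-cp-kafka-$i
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - $NODE
---
EOF
done

Run it and pipe the result straight into the cluster, or redirect it to a file first:

bash generate-kafka-pvs.sh | kubectl apply -f -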

[Screenshot: the local data directories on the node]

This is how the directory with your data looks. Each one is pre-bound to a specific pod according to the persistent volume's claimRef.
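One gotcha with local volumes: Kubernetes does not create these directories for you, so they must exist on the node before the pods start. On the node (pc here), create them to match the paths from the manifests above:

sudo mkdir -p /mnt/data/datadir-0-cp-helm-charts-cp-kafka-{0,1,2}
sudo mkdir -p /mnt/data/datadir-cp-helm-charts-cp-zookeeper-{0,1,2}
sudo mkdir -p /mnt/data/datalogdir-cp-helm-charts-cp-zookeeper-{0,1,2}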

Then, let's just start the Helm chart:

helm install cp-helm-charts cp-helm-charts

Result:

[Screenshot: all Confluent Platform pods in Running state, with persistence]

Voila! We've got our Confluent Platform up and running with local persistence! We can see 3 instances each of Kafka and ZooKeeper, plus Kafka Connect, the Kafka REST API, the ksqlDB server, and Schema Registry in the Running state! Data that you send to Kafka topics will still be there even if you shut the instances down and start them again.
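You can verify the persistence with a quick smoke test. A minimal sketch: the pod name follows from the PVC names above, but the container name (cp-kafka-broker) and the CLI flags are assumptions that depend on your chart and Kafka version, so check kubectl get pods and the broker image first:

# create a topic on broker 0
kubectl exec -it cp-helm-charts-cp-kafka-0 -c cp-kafka-broker -- \
  kafka-topics --bootstrap-server localhost:9092 --create --topic persistence-test

# reinstall the chart...
helm uninstall cp-helm-charts
helm install cp-helm-charts cp-helm-charts

# ...and the topic should still be listed
kubectl exec -it cp-helm-charts-cp-kafka-0 -c cp-kafka-broker -- \
  kafka-topics --bootstrap-server localhost:9092 --list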

Here's how ksqlDB looks:

[Screenshot: ksqlDB]

Let me know about your experiences in the comments!