aboutsummaryrefslogblamecommitdiff
path: root/kafka/test/consumer-group.yml
blob: 4b494eb89c38a572d9d577e955cac201d81d4828 (plain) (tree)
1
2
3
4


               
                              












































                                                                                    
                                                                         
















                                                                                                                    
                                         













                                                                                                          
                                       






                          
                              




                       
                                                      



                          
                                                        




















                                                                                                       
                                                                                         














                                                                                                       
                                                                                                                                





































                                                                                                       
                                      

                    
---
kind: ConfigMap
metadata:
  name: kafkacat-consumergroup
  namespace: test-kafka
apiVersion: v1
data:

  setup.sh: |-
    touch /tmp/testlog

    tail -f /tmp/testlog

  test.sh: |-
    exec >> /tmp/testlog
    exec 2>&1

    PC_WAIT=.2
    MAX_AGE=5

    UNIQUE="${HOSTNAME}@$(date -u -Ins)"

    echo "${UNIQUE: -41:5}:Test $UNIQUE" >> /shared/produce.tmp
    sleep $PC_WAIT
    LAST=$(tail -n 1 /shared/consumed.tmp)
    [ -z "$LAST" ] && echo "Nothing consumed" && exit 1

    LAST_TS=$(echo $LAST | awk -F';' '{print $1}')
    [ -z "$LAST_TS" ] && echo "Failed to get timestamp for message: $LAST" && exit 1
    LAST_MSG=$(echo $LAST | awk -F';' '{print $4}')
    NOW=$(date -u +%s%3N)
    DIFF_S=$((($NOW - $LAST_TS)/1000))
    DIFF_MS=$((($NOW - $LAST_TS)%1000))
    #echo "$NOW ($(date +%FT%H:%M:%S.%3N)):"
    #echo "$LAST_TS"

    if [ $DIFF_S -gt $MAX_AGE ]; then
      echo "Last message is $DIFF_S.$DIFF_MS old:"
      echo "$LAST_MSG"
      exit 10
    fi

    if [[ "$LAST_MSG" != *"$UNIQUE" ]]; then
      echo "Last message (at $LAST_TS) isn't from this test run ($UNIQUE):"
      echo "$LAST_MSG"
      exit 11
    fi

    # get info about this message
    kafkacat -Q -b $BOOTSTRAP -t test-kafkacat-consumergroup:0:$LAST_TS \
      -X socket.timeout.ms=600 -X session.timeout.ms=300 -X request.timeout.ms=50 -X metadata.request.timeout.ms=600
    [ $? -eq 0 ] || echo "At $(date +%FT%H:%M:%S.%3N) bootstrap broker(s) might be down"
    # but don't fail the test; producer and consumer should keep going if there are other brokers

    # We haven't asserted that the consumer works, so we'll just have to assume that it will exit if it fails

    exit 0

  quit-on-nonzero-exit.sh: |-
    exec >> /tmp/testlog
    exec 2>&1

    exit 0
---
apiVersion: batch/v1
kind: Job
metadata:
  name: topic-test-kafkacat-consumergroup
  namespace: test-kafka
spec:
  template:
    spec:
      containers:
      - name: topic-create
        image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d
        command:
        - ./bin/kafka-topics.sh
        - --zookeeper
        -   zookeeper.kafka.svc.cluster.local:2181
        - --create
        - --if-not-exists
        - --topic
        -   test-kafkacat-consumergroup
        - --partitions
        -   "1"
      restartPolicy: Never
---
apiVersion: apps/v1beta2
kind: ReplicaSet
metadata:
  name: kafkacat-consumergroup
  namespace: test-kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      test-target: kafka-client-kafkacat-consumergroup
      test-type: readiness
  template:
    metadata:
      labels:
        test-target: kafka-client-kafkacat-consumergroup
        test-type: readiness
        # for example:
        # readonly - can be used in production
        # isolated - read/write but in a manner that does not affect other services
        # load - unsuitable for production because it uses significant resources
        # chaos - unsuitable for production because it injects failure modes
        #test-use:
    spec:
      containers:
      - name: producer
        image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
        env:
        - name: BOOTSTRAP
          value: bootstrap.kafka:9092
        command:
        - /bin/bash
        - -cex
        - >
          echo "--- start $HOSTNAME $(date --iso-8601='ns' -u) ---" >> /shared/produce.tmp
          ;
          tail -f /shared/produce.tmp |
          kafkacat -P -b $BOOTSTRAP -t test-kafkacat-consumergroup -v -T -d broker -K ':'
          ;
        volumeMounts:
        - name: config
          mountPath: /test
        - name: shared
          mountPath: /shared
      - name: consumer
        image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
        env:
        - name: BOOTSTRAP
          value: bootstrap.kafka:9092
        command:
        - /bin/bash
        - -cex
        - >
          kafkacat -b $BOOTSTRAP -G test-kafkacat-consumergroup test-kafkacat-consumergroup -f '%T;%k:%p;%o;%s\n' -u -d broker |
          tee /shared/consumed.tmp
          ;
        volumeMounts:
        - name: config
          mountPath: /test
        - name: shared
          mountPath: /shared
      - name: testcase
        image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
        env:
        - name: BOOTSTRAP
          value: bootstrap.kafka:9092
        command:
        - /bin/bash
        - -e
        - /test/setup.sh
        readinessProbe:
          exec:
            command:
            - /bin/bash
            - -e
            - /test/test.sh
          initialDelaySeconds: 10
          periodSeconds: 10
        livenessProbe:
          exec:
            command:
            - /bin/bash
            - -e
            - /test/quit-on-nonzero-exit.sh
        volumeMounts:
        - name: config
          mountPath: /test
        - name: shared
          mountPath: /shared
      volumes:
      - name: config
        configMap:
          name: kafkacat-consumergroup
      - name: shared
        emptyDir: {}