--- kind: ConfigMap metadata: name: kafkacat namespace: test-kafka apiVersion: v1 data: setup.sh: |- touch /tmp/testlog tail -f /tmp/testlog test.sh: |- exec >> /tmp/testlog exec 2>&1 PC_WAIT=.2 MAX_AGE=5 UNIQUE="${HOSTNAME}@$(date -u -Ins)" echo "${UNIQUE: -41:5}:Test $UNIQUE" >> /shared/produce.tmp sleep $PC_WAIT LAST=$(tail -n 1 /shared/consumed.tmp) [ -z "$LAST" ] && echo "Nothing consumed" && exit 1 LAST_TS=$(echo $LAST | awk -F';' '{print $1}') [ -z "$LAST_TS" ] && echo "Failed to get timestamp for message: $LAST" && exit 1 LAST_MSG=$(echo $LAST | awk -F';' '{print $4}') NOW=$(date -u +%s%3N) DIFF_S=$((($NOW - $LAST_TS)/1000)) DIFF_MS=$((($NOW - $LAST_TS)%1000)) #echo "$NOW ($(date +%FT%H:%M:%S.%3N)):" #echo "$LAST_TS" if [ $DIFF_S -gt $MAX_AGE ]; then echo "Last message is $DIFF_S.$DIFF_MS old:" echo "$LAST_MSG" exit 10 fi if [[ "$LAST_MSG" != *"$UNIQUE" ]]; then echo "Last message (at $LAST_TS) isn't from this test run ($UNIQUE):" echo "$LAST_MSG" exit 11 fi # get info about this message kafkacat -Q -b $BOOTSTRAP -t test-kafkacat:0:$LAST_TS \ -X socket.timeout.ms=600 -X session.timeout.ms=300 -X request.timeout.ms=50 -X metadata.request.timeout.ms=600 [ $? -eq 0 ] || echo "At $(date +%FT%H:%M:%S.%3N) bootstrap broker(s) might be down" # but don't fail the test; producer and consumer should keep going if there are other brokers # We haven't asserted that the consumer works, so we'll just have to assume that it will exit if it fails exit 0 quit-on-nonzero-exit.sh: |- exec >> /tmp/testlog exec 2>&1 exit 0 --- apiVersion: batch/v1 kind: Job metadata: name: topic-test-kafkacat namespace: test-kafka spec: template: spec: containers: - name: topic-create image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d command: - ./bin/kafka-topics.sh - --zookeeper - zookeeper.kafka.svc.cluster.local:2181 - --create - --if-not-exists - --topic - test-kafkacat - --partitions - "1" restartPolicy: Never --- apiVersion: apps/v1beta2 kind: ReplicaSet metadata: name: kafkacat namespace: test-kafka spec: # Note that this test sets a consumer group, but asserts assume that the tests gets its own messages replicas: 1 selector: matchLabels: test-target: kafka-client-kafkacat test-type: readiness template: metadata: labels: test-target: kafka-client-kafkacat test-type: readiness # for example: # readonly - can be used in production # isolated - read/write but in a manner that does not affect other services # load - unsuitable for production because it uses significant resources # chaos - unsuitable for production because it injects failure modes #test-use: spec: containers: - name: producer image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9 env: - name: BOOTSTRAP value: bootstrap.kafka:9092 command: - /bin/bash - -cex - > echo "--- start $HOSTNAME $(date --iso-8601='ns' -u) ---" >> /shared/produce.tmp ; tail -f /shared/produce.tmp | kafkacat -P -b $BOOTSTRAP -t test-kafkacat -v -T -d broker -K ':' ; volumeMounts: - name: config mountPath: /test - name: shared mountPath: /shared - name: consumer image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9 env: - name: BOOTSTRAP value: bootstrap.kafka:9092 - name: CONSUMER_GROUP_ID value: test-kafkacat-group command: - /bin/bash - -cex - > kafkacat -b $BOOTSTRAP -G $CONSUMER_GROUP_ID test-kafkacat -o -1 -f '%T;%k:%p;%o;%s\n' -u -d broker | tee /shared/consumed.tmp ; volumeMounts: - name: config mountPath: /test - name: shared mountPath: /shared - name: testcase image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9 env: - name: BOOTSTRAP value: bootstrap.kafka:9092 command: - /bin/bash - -e - /test/setup.sh readinessProbe: exec: command: - /bin/bash - -e - /test/test.sh initialDelaySeconds: 10 periodSeconds: 10 livenessProbe: exec: command: - /bin/bash - -e - /test/quit-on-nonzero-exit.sh volumeMounts: - name: config mountPath: /test - name: shared mountPath: /shared volumes: - name: config configMap: name: kafkacat - name: shared emptyDir: {}