aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStaffan Olsson <staffan@repos.se>2018-02-01 20:25:57 +0100
committerStaffan Olsson <staffan@repos.se>2018-02-01 20:25:57 +0100
commit7c1bc7ac341b98866187bd7b59a82f1c0140c661 (patch)
treeb2780b033011e26bf4899a4a8c94c6151a0a6bc8
parenta95da8726b11592219cf507b20ed67ad40a8f06a (diff)
downloadkubernetes-kafka-7c1bc7ac341b98866187bd7b59a82f1c0140c661.tar.gz
kubernetes-kafka-7c1bc7ac341b98866187bd7b59a82f1c0140c661.tar.bz2
kubernetes-kafka-7c1bc7ac341b98866187bd7b59a82f1c0140c661.zip
A copy of the kafkacat based test
-rw-r--r--kafka/test/consumer-group.yml181
1 files changed, 181 insertions, 0 deletions
diff --git a/kafka/test/consumer-group.yml b/kafka/test/consumer-group.yml
new file mode 100644
index 0000000..98410bc
--- /dev/null
+++ b/kafka/test/consumer-group.yml
@@ -0,0 +1,181 @@
+---
+kind: ConfigMap
+metadata:
+ name: kafkacat
+ namespace: test-kafka
+apiVersion: v1
+data:
+
+ setup.sh: |-
+ touch /tmp/testlog
+
+ tail -f /tmp/testlog
+
+ test.sh: |-
+ exec >> /tmp/testlog
+ exec 2>&1
+
+ PC_WAIT=.2
+ MAX_AGE=5
+
+ UNIQUE="${HOSTNAME}@$(date -u -Ins)"
+
+ echo "${UNIQUE: -41:5}:Test $UNIQUE" >> /shared/produce.tmp
+ sleep $PC_WAIT
+ LAST=$(tail -n 1 /shared/consumed.tmp)
+ [ -z "$LAST" ] && echo "Nothing consumed" && exit 1
+
+ LAST_TS=$(echo $LAST | awk -F';' '{print $1}')
+ [ -z "$LAST_TS" ] && echo "Failed to get timestamp for message: $LAST" && exit 1
+ LAST_MSG=$(echo $LAST | awk -F';' '{print $4}')
+ NOW=$(date -u +%s%3N)
+ DIFF_S=$((($NOW - $LAST_TS)/1000))
+ DIFF_MS=$((($NOW - $LAST_TS)%1000))
+ #echo "$NOW ($(date +%FT%H:%M:%S.%3N)):"
+ #echo "$LAST_TS"
+
+ if [ $DIFF_S -gt $MAX_AGE ]; then
+ echo "Last message is $DIFF_S.$DIFF_MS old:"
+ echo "$LAST_MSG"
+ exit 10
+ fi
+
+ if [[ "$LAST_MSG" != *"$UNIQUE" ]]; then
+ echo "Last message (at $LAST_TS) isn't from this test run ($UNIQUE):"
+ echo "$LAST_MSG"
+ exit 11
+ fi
+
+ # get info about this message
+ kafkacat -Q -b $BOOTSTRAP -t test-kafkacat:0:$LAST_TS \
+ -X socket.timeout.ms=600 -X session.timeout.ms=300 -X request.timeout.ms=50 -X metadata.request.timeout.ms=600
+ [ $? -eq 0 ] || echo "At $(date +%FT%H:%M:%S.%3N) bootstrap broker(s) might be down"
+ # but don't fail the test; producer and consumer should keep going if there are other brokers
+
+ # We haven't asserted that the consumer works, so we'll just have to assume that it will exit if it fails
+
+ exit 0
+
+ quit-on-nonzero-exit.sh: |-
+ exec >> /tmp/testlog
+ exec 2>&1
+
+ exit 0
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: topic-test-kafkacat
+ namespace: test-kafka
+spec:
+ template:
+ spec:
+ containers:
+ - name: topic-create
+ image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d
+ command:
+ - ./bin/kafka-topics.sh
+ - --zookeeper
+ - zookeeper.kafka.svc.cluster.local:2181
+ - --create
+ - --if-not-exists
+ - --topic
+ - test-kafkacat
+ - --partitions
+ - "1"
+ restartPolicy: Never
+---
+apiVersion: apps/v1beta2
+kind: ReplicaSet
+metadata:
+ name: kafkacat
+ namespace: test-kafka
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ test-target: kafka-client-kafkacat
+ test-type: readiness
+ template:
+ metadata:
+ labels:
+ test-target: kafka-client-kafkacat
+ test-type: readiness
+ # for example:
+ # readonly - can be used in production
+ # isolated - read/write but in a manner that does not affect other services
+ # load - unsuitable for production because it uses significant resources
+ # chaos - unsuitable for production because it injects failure modes
+ #test-use:
+ spec:
+ containers:
+ - name: producer
+ image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
+ env:
+ - name: BOOTSTRAP
+ value: bootstrap.kafka:9092
+ command:
+ - /bin/bash
+ - -cex
+ - >
+ echo "--- start $HOSTNAME $(date --iso-8601='ns' -u) ---" >> /shared/produce.tmp
+ ;
+ tail -f /shared/produce.tmp |
+ kafkacat -P -b $BOOTSTRAP -t test-kafkacat -v -T -d broker -K ':'
+ ;
+ volumeMounts:
+ - name: config
+ mountPath: /test
+ - name: shared
+ mountPath: /shared
+ - name: consumer
+ image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
+ env:
+ - name: BOOTSTRAP
+ value: bootstrap.kafka:9092
+ command:
+ - /bin/bash
+ - -cex
+ - >
+ kafkacat -C -b $BOOTSTRAP -t test-kafkacat -o -1 -f '%T;%k:%p;%o;%s\n' -u -d broker |
+ tee /shared/consumed.tmp
+ ;
+ volumeMounts:
+ - name: config
+ mountPath: /test
+ - name: shared
+ mountPath: /shared
+ - name: testcase
+ image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
+ env:
+ - name: BOOTSTRAP
+ value: bootstrap.kafka:9092
+ command:
+ - /bin/bash
+ - -e
+ - /test/setup.sh
+ readinessProbe:
+ exec:
+ command:
+ - /bin/bash
+ - -e
+ - /test/test.sh
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ livenessProbe:
+ exec:
+ command:
+ - /bin/bash
+ - -e
+ - /test/quit-on-nonzero-exit.sh
+ volumeMounts:
+ - name: config
+ mountPath: /test
+ - name: shared
+ mountPath: /shared
+ volumes:
+ - name: config
+ configMap:
+ name: kafkacat
+ - name: shared
+ emptyDir: {}