# Source path: root/kafka/test/kafkacat.yml
# Blob: fe1c39656ea2cbb5cecd839f5c834ff5aaa774a2


               
                
                       


              

                      
 
                        
 
             

                        
 
              



                                        
                                                               

                                          
                                                       
 
                                                  
                                                                                    
                                                   
                         









                                                  
 






                                                                           
                                                           

                                                                                                                    
                                                                                                 

                                                                                                             
 
          





                             



                    
                           
                       




                          
                                                                                                          




                                                  
                         
                 
                         

                      

                          
                        
                
         
                
                       
     
                                                                                                      
             

                
                                        
                          


             
                                          
                            





                                                                                   

                 
                      
                                                                                                       

                         
                                     



                   

                                                                                          
                                       
                                                                           
           





                            
                                                                                                       

                         
                                     

                                    



                   
                                                                                                               

                                  




                            
                      
                                                                                                       

                         
                                     
                
                   
            
                        



                       
                


                                 



                       
                
                                           


                          

                            


                    
                        

                    
---
# Scripts for the kafkacat produce/consume test. The ReplicaSet in this file
# mounts this ConfigMap at /test and runs the scripts with `/bin/bash -e`.
kind: ConfigMap
metadata:
  name: kafkacat
  namespace: test-kafka
apiVersion: v1
data:

  # Main process of the testcase container: creates the log file that the
  # probe scripts append to, then streams it to the container's stdout.
  setup.sh: |-
    touch /tmp/testlog

    tail -f /tmp/testlog

  # Readiness probe. Appends one uniquely tagged line to /shared/produce.tmp
  # and checks that the last line of /shared/consumed.tmp is that message.
  # Exit codes: 1 = nothing consumed or no timestamp, 10 = last message too
  # old, 11 = last message is not from this run, 0 = ok.
  test.sh: |-
    exec >> /tmp/testlog
    exec 2>&1

    # Seconds to wait between producing and reading back the consumed file
    PC_WAIT=.2
    # Maximum accepted age of the last consumed message, in whole seconds
    MAX_AGE=5

    UNIQUE="${HOSTNAME}@$(date -u -Ins)"

    # Line format is "<key>:Test <unique>". The key is the 5 characters that
    # start 41 characters from the end of $UNIQUE (presumably the tail of the
    # hostname; this depends on the width of the `date -Ins` output).
    echo "${UNIQUE: -41:5}:Test $UNIQUE" >> /shared/produce.tmp
    sleep $PC_WAIT
    LAST=$(tail -n 1 /shared/consumed.tmp)
    [ -z "$LAST" ] && echo "Nothing consumed" && exit 1

    # Consumed lines are ';'-separated: field 1 is the timestamp, field 4 the payload
    LAST_TS=$(echo "$LAST" | awk -F';' '{print $1}')
    [ -z "$LAST_TS" ] && echo "Failed to get timestamp for message: $LAST" && exit 1
    LAST_MSG=$(echo "$LAST" | awk -F';' '{print $4}')
    NOW=$(date -u +%s%3N)
    DIFF_S=$((($NOW - $LAST_TS)/1000))
    DIFF_MS=$((($NOW - $LAST_TS)%1000))
    #echo "$NOW ($(date +%FT%H:%M:%S.%3N)):"
    #echo "$LAST_TS"

    if [ "$DIFF_S" -gt "$MAX_AGE" ]; then
      # Zero-pad the millisecond remainder so that e.g. 5007 ms prints as 5.007, not 5.7
      printf 'Last message is %d.%03d old:\n' "$DIFF_S" "$DIFF_MS"
      echo "$LAST_MSG"
      exit 10
    fi

    if [[ "$LAST_MSG" != *"$UNIQUE" ]]; then
      echo "Last message (at $LAST_TS) isn't from this test run ($UNIQUE):"
      echo "$LAST_MSG"
      exit 11
    fi

    # get info about this message
    # The `|| echo` must be attached to the kafkacat command itself: this script
    # runs under `bash -e`, so a separate `[ $? -eq 0 ]` line would never be
    # reached after a failure and the probe would fail.
    kafkacat -Q -b "$BOOTSTRAP" -t "test-kafkacat:0:$LAST_TS" \
      -X socket.timeout.ms=600 -X session.timeout.ms=300 -X request.timeout.ms=50 -X metadata.request.timeout.ms=600 \
      || echo "At $(date +%FT%H:%M:%S.%3N) bootstrap broker(s) might be down"
    # but don't fail the test; producer and consumer should keep going if there are other brokers

    # We haven't asserted that the consumer works, so we'll just have to assume that it will exit if it fails

    exit 0

  # Liveness probe. Currently only redirects its output to the test log and
  # exits 0, so it never reports the container as unhealthy.
  quit-on-nonzero-exit.sh: |-
    exec >> /tmp/testlog
    exec 2>&1

    exit 0
---
# One-shot Job that creates the `test-kafkacat` topic used by the kafkacat
# test pod. `--if-not-exists` makes re-running the Job harmless once the
# topic exists.
apiVersion: batch/v1
kind: Job
metadata:
  name: topic-test-kafkacat
  namespace: test-kafka
spec:
  template:
    spec:
      containers:
      - name: topic-create
        image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d
        command:
        - ./bin/kafka-topics.sh
        - --zookeeper
        -   zookeeper.kafka.svc.cluster.local:2181
        - --create
        - --if-not-exists
        - --topic
        -   test-kafkacat
        # A single partition: test.sh queries partition 0 only
        - --partitions
        -   "1"
        # kafka-topics.sh in Kafka 1.0 rejects --create without an explicit
        # replication factor. "1" works on any cluster size.
        # NOTE(review): raise this if the test should survive the loss of a
        # broker - confirm against the number of brokers in the cluster.
        - --replication-factor
        -   "1"
      restartPolicy: Never
---
# Test pod with three containers that share an emptyDir at /shared:
#   producer - pipes lines appended to /shared/produce.tmp into kafkacat -P
#   consumer - writes consumed messages to /shared/consumed.tmp
#   testcase - readiness probe produces a line and checks it was consumed
# apps/v1 replaces apps/v1beta2, which is no longer served since Kubernetes 1.16.
apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: kafkacat
  namespace: test-kafka
spec:
  # Note that this test sets a consumer group, but the asserts assume that the test gets its own messages
  replicas: 1
  selector:
    matchLabels:
      test-target: kafka-client-kafkacat
      test-type: readiness
  template:
    metadata:
      labels:
        test-target: kafka-client-kafkacat
        test-type: readiness
        # for example:
        # readonly - can be used in production
        # isolated - read/write but in a manner that does not affect other services
        # load - unsuitable for production because it uses significant resources
        # chaos - unsuitable for production because it injects failure modes
        #test-use:
    spec:
      containers:
      # Writes a start marker to the produce file, then follows the file and
      # sends every line to the topic, using ':' as the key delimiter.
      - name: producer
        image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
        env:
        - name: BOOTSTRAP
          value: bootstrap.kafka:9092
        command:
        - /bin/bash
        - -cex
        - >
          echo "--- start $HOSTNAME $(date --iso-8601='ns' -u) ---" >> /shared/produce.tmp
          ;
          tail -f /shared/produce.tmp |
          kafkacat -P -b $BOOTSTRAP -t test-kafkacat -v -T -d broker -K ':'
          ;
        volumeMounts:
        - name: config
          mountPath: /test
        - name: shared
          mountPath: /shared
      # Consumes the topic as a member of $CONSUMER_GROUP_ID and writes each
      # message as "timestamp;key:partition;offset;payload" to the consumed
      # file, which test.sh parses with ';' as the field separator.
      - name: consumer
        image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
        env:
        - name: BOOTSTRAP
          value: bootstrap.kafka:9092
        - name: CONSUMER_GROUP_ID
          value: test-kafkacat-group
        command:
        - /bin/bash
        - -cex
        - >
          kafkacat -b $BOOTSTRAP -G $CONSUMER_GROUP_ID test-kafkacat -o -1 -f '%T;%k:%p;%o;%s\n' -u -d broker |
          tee /shared/consumed.tmp
          ;
        volumeMounts:
        - name: config
          mountPath: /test
        - name: shared
          mountPath: /shared
      # Runs setup.sh as its main process; the probes below run the other
      # scripts from the mounted ConfigMap.
      - name: testcase
        image: solsson/kafkacat@sha256:b32eedf936f3cde44cd164ddc77dfcf7565a8af4e357ff6de1abe4389ca530c9
        env:
        - name: BOOTSTRAP
          value: bootstrap.kafka:9092
        command:
        - /bin/bash
        - -e
        - /test/setup.sh
        # The pod is Ready only while test.sh exits 0.
        # NOTE(review): no timeoutSeconds is set, so the default probe timeout
        # applies; test.sh sleeps and then queries a broker - confirm it fits.
        readinessProbe:
          exec:
            command:
            - /bin/bash
            - -e
            - /test/test.sh
          initialDelaySeconds: 10
          periodSeconds: 10
        livenessProbe:
          exec:
            command:
            - /bin/bash
            - -e
            - /test/quit-on-nonzero-exit.sh
        volumeMounts:
        - name: config
          mountPath: /test
        - name: shared
          mountPath: /shared
      volumes:
      # Test scripts from the kafkacat ConfigMap
      - name: config
        configMap:
          name: kafkacat
      # Scratch space for the produce/consume files; lives as long as the pod
      - name: shared
        emptyDir: {}