diff options
23 files changed, 820 insertions, 59 deletions
diff --git a/10broker-config.yml b/10broker-config.yml index 7d296bf..6ae5514 100644 --- a/10broker-config.yml +++ b/10broker-config.yml @@ -22,6 +22,16 @@ data: else sed -i "s/#init#broker.rack=#init#/broker.rack=$ZONE/" /etc/kafka/server.properties fi + + kubectl -n $POD_NAMESPACE label pod $POD_NAME kafka-broker-id=$KAFKA_BROKER_ID + + OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}') + if [ $? -ne 0 ]; then + echo "Outside (i.e. cluster-external access) host lookup command failed" + else + OUTSIDE_HOST=${OUTSIDE_HOST}:3240${KAFKA_BROKER_ID} + sed -i "s|#init#advertised.listeners=OUTSIDE://#init#|advertised.listeners=OUTSIDE://${OUTSIDE_HOST}|" /etc/kafka/server.properties + fi } server.properties: |- @@ -49,9 +59,6 @@ data: #init#broker.rack=#init# - # Switch to enable topic deletion or not, default value is false - delete.topic.enable=true - ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from @@ -61,14 +68,18 @@ data: # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 + listeners=OUTSIDE://:9094,PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 + #init#advertised.listeners=OUTSIDE://#init#,PLAINTEXT://:9092 # Maps listener names to security protocols, the default is for them to be the same. 
See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:PLAINTEXT + inter.broker.listener.name=PLAINTEXT # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 @@ -259,3 +270,22 @@ data: # Change to DEBUG to enable audit log for the authorizer log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender log4j.additivity.kafka.authorizer.logger=false + + jmx-kafka-prometheus.yml: |+ + lowercaseOutputName: true + jmxUrl: service:jmx:rmi:///jndi/rmi://127.0.0.1:5555/jmxrmi + ssl: false + whitelistObjectNames: ["kafka.server:*","java.lang:*"] + rules: + - pattern : kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=(.+)><>Value + - pattern : kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec), topic=(.+)><>OneMinuteRate + - pattern : kafka.server<type=KafkaRequestHandlerPool, name=RequestHandlerAvgIdlePercent><>OneMinuteRate + - pattern : kafka.server<type=Produce><>queue-size + - pattern : kafka.server<type=ReplicaManager, name=(PartitionCount|UnderReplicatedPartitions)><>(Value|OneMinuteRate) + - pattern : kafka.server<type=controller-channel-metrics, broker-id=(.+)><>(.*) + - pattern : kafka.server<type=socket-server-metrics, networkProcessor=(.+)><>(.*) + - pattern : kafka.server<type=Fetch><>queue-size + - pattern : kafka.server<type=SessionExpireListener, name=(.+)><>OneMinuteRate + - pattern : java.lang<type=OperatingSystem><>SystemCpuLoad + - pattern : java.lang<type=Memory><HeapMemoryUsage>used + - pattern : java.lang<type=OperatingSystem><>FreePhysicalMemorySize diff --git a/50kafka.yml b/50kafka.yml index c564ffe..db17c3d 100644 --- a/50kafka.yml +++ b/50kafka.yml @@ -1,38 +1,61 @@ -apiVersion: apps/v1beta1 
+apiVersion: apps/v1beta2 kind: StatefulSet metadata: name: kafka namespace: kafka spec: +spec: + selector: + matchLabels: + app: kafka serviceName: "broker" replicas: 3 + updateStrategy: + type: RollingUpdate template: metadata: labels: app: kafka annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "5556" spec: terminationGracePeriodSeconds: 30 initContainers: - name: init-config - image: solsson/kafka-initutils@sha256:c275d681019a0d8f01295dbd4a5bae3cfa945c8d0f7f685ae1f00f2579f08c7d + image: solsson/kafka-initutils@sha256:c98d7fb5e9365eab391a5dcd4230fc6e72caf929c60f29ff091e3b0215124713 env: - name: NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace command: ['/bin/bash', '/etc/kafka/init.sh'] volumeMounts: - name: config mountPath: /etc/kafka containers: - name: broker - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d env: - name: KAFKA_LOG4J_OPTS value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties + - name: JMX_PORT + value: "5555" ports: - - containerPort: 9092 + - name: jmx + containerPort: 5555 + - name: inside + containerPort: 9092 + - name: outside + containerPort: 9094 command: - ./bin/kafka-server-start.sh - /etc/kafka/server.properties @@ -49,16 +72,40 @@ spec: cpu: 100m memory: 512Mi readinessProbe: - exec: - command: - - /bin/sh - - -c - - 'echo "" | nc -w 1 127.0.0.1 9092' + tcpSocket: + port: 9092 + timeoutSeconds: 10 volumeMounts: - name: config mountPath: /etc/kafka - name: data mountPath: /var/lib/kafka/data + - name: metrics + image: solsson/kafka-prometheus-jmx-exporter@sha256:348b0f6510b08dff70ba468a16d25dc8def480fe79aca0e3c76f098d67b108a3 + command: + - java + - -Xmx64M + - -XX:MaxMetaspaceSize=32m + - -jar + - 
jmx_prometheus_httpserver.jar + - "5556" + - /etc/kafka/jmx-kafka-prometheus.yml + ports: + - containerPort: 5556 + livenessProbe: + httpGet: + path: /liveness + port: 5556 + periodSeconds: 60 + resources: + requests: + cpu: 0m + memory: 100Mi + limits: + memory: 150Mi + volumeMounts: + - name: config + mountPath: /etc/kafka volumes: - name: config configMap: @@ -68,6 +115,7 @@ spec: name: data spec: accessModes: [ "ReadWriteOnce" ] + storageClassName: kafka-broker resources: requests: storage: 200Gi @@ -1,17 +1,64 @@ +# [SMP](https://github.com/StreamingMicroservicesPlatform) using Kubernetes and Kafka +Streaming Platform for start-ups and small DevOps teams. +A self-hosted PaaS, if you will, for using Kafka as backend for Your Microservices. -# Kafka on Kubernetes +We do what [Confluent's quickstart](https://docs.confluent.io/current/quickstart.html) does, +but in Kubernetes as that's where services live in production. +The setup includes Kafka, Schema Registry and REST Proxy. + +## Scope + + * Starts nicely on Minikube, with resources left for your services. + * Can scale up to production workloads. + - Example: `kubectl apply -f ./scale-3/` (TODO; we haven't really solved how to make scale a simple parameter yet) + +## Decisions up front + +Before you `kubectl` anything, you need to decide on: + + * Storage classes for Kafka and Zookeeper volumes. Select one of: + - `kubectl apply -f configure-minikube/` + - `kubectl apply -f configure-gke-pd/` + * Run self-tests or not. They do generate some load, but indicate if the platform is working or not. + - To include tests, replace `apply -f` with `apply -R -f` in your `kubectl`s below. + - Anything that isn't READY in `kubectl get pods -l test-target=kafka,test-type=readiness -w --all-namespaces` is a failed test.
+ +## Create + +Prerequisites: + * Kubernetes 1.8 (the [v2.0.0](https://github.com/Yolean/kubernetes-kafka/releases/tag/v2.0.0) release supports earlier versions) + +Required: + * `kubectl apply -f ./zookeeper/` + * `kubectl apply -f ./kafka/` + +Optional: + * `kubectl apply -f ./confluent-platform/` + * `kubectl apply -f ./prometheus/` + * `kubectl apply -f ./kube-events/` + * `kubectl apply -f ./ksql/` (coming in v3.1.0) + * `kubectl apply -f ./log-processing-pipeline/` (coming in v3.2.0) + + + + + +# (previous readme) Kafka on Kubernetes Transparent Kafka setup that you can grow with. Good for both experiments and production. How to use: + * Good to know: you'll likely want to fork this repo. It prioritizes clarity over configurability, using plain manifests and .properties files; no client side logic. * Run a Kubernetes cluster, [minikube](https://github.com/kubernetes/minikube) or real. * Quickstart: use the `kubectl apply`s below. - * Kafka for real: fork and have a look at [addon](https://github.com/Yolean/kubernetes-kafka/labels/addon)s. + * Have a look at [addon](https://github.com/Yolean/kubernetes-kafka/labels/addon)s, or the official forks: + - [kubernetes-kafka-small](https://github.com/Reposoft/kubernetes-kafka-small) for single-node clusters like Minikube. + - [StreamingMicroservicesPlatform](https://github.com/StreamingMicroservicesPlatform/kubernetes-kafka) Like Confluent's [platform quickstart](https://docs.confluent.io/current/connect/quickstart.html) but for Kubernetes. * Join the discussion in issues and PRs. -No readable readme can properly introduce both [Kafka](http://kafka.apache.org/) and [Kubernets](https://kubernetes.io/), +No readable readme can properly introduce both [Kafka](http://kafka.apache.org/) and [Kubernetes](https://kubernetes.io/), but we think the combination of the two is a great backbone for microservices. Back when we read [Newman](http://samnewman.io/books/building_microservices/) we were beginners with both.
Now we've read [Kleppmann](http://dataintensive.net/), [Confluent](https://www.confluent.io/blog/) and [SRE](https://landing.google.com/sre/book.html) and enjoy this "Streaming Platform" lock-in :smile:. @@ -27,6 +74,12 @@ The goal is to provide [Bootstrap servers](http://kafka.apache.org/documentation Zookeeper at `zookeeper.kafka.svc.cluster.local:2181`. +## Prepare storage classes + +For Minikube run `kubectl create -f configure-minikube/`. + +There's a similar setup for GKE, `configure/gke-*`. You might want to tweak it before creating. + ## Start Zookeeper The [Kafka book](https://www.confluent.io/resources/kafka-definitive-guide-preview-edition/) recommends that Kafka has its own Zookeeper cluster with at least 5 instances. @@ -61,16 +114,11 @@ For clusters that enfoce [RBAC](https://kubernetes.io/docs/admin/authorization/r kubectl apply -f rbac-namespace-default/ ``` -## Caution: `Delete` Reclaim Policy is default - -In production you likely want to [manually set Reclaim Policy](https://kubernetes.io/docs/tasks/administer-cluster/change-pv-reclaim-policy/), -our your data will be gone if the generated [volume claim](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims)s are deleted. - -This can't be done [in manifests](https://github.com/Yolean/kubernetes-kafka/pull/50), -at least not [until Kubernetes 1.8](https://github.com/kubernetes/features/issues/352). - ## Tests +Tests are based on the [kube-test](https://github.com/Yolean/kube-test) concept. +Like the rest of this repo they have `kubectl` as the only local dependency. 
+ ``` kubectl apply -f test/ # Anything that isn't READY here is a failed test diff --git a/configure-minikube/storageclass-broker-minikube.yml b/configure-minikube/storageclass-broker-minikube.yml new file mode 100644 index 0000000..3cff3b2 --- /dev/null +++ b/configure-minikube/storageclass-broker-minikube.yml @@ -0,0 +1,5 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: kafka-broker +provisioner: k8s.io/minikube-hostpath diff --git a/configure-minikube/storageclass-zookeeper-minikube.yml b/configure-minikube/storageclass-zookeeper-minikube.yml new file mode 100644 index 0000000..ba89eb4 --- /dev/null +++ b/configure-minikube/storageclass-zookeeper-minikube.yml @@ -0,0 +1,5 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: kafka-zookeeper +provisioner: k8s.io/minikube-hostpath diff --git a/configure/gke-storageclass-broker-pd.yml b/configure/gke-storageclass-broker-pd.yml new file mode 100644 index 0000000..dbb7203 --- /dev/null +++ b/configure/gke-storageclass-broker-pd.yml @@ -0,0 +1,8 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: kafka-broker +provisioner: kubernetes.io/gce-pd +reclaimPolicy: Retain +parameters: + type: pd-standard diff --git a/configure/gke-storageclass-zookeeper-ssd.yml b/configure/gke-storageclass-zookeeper-ssd.yml new file mode 100644 index 0000000..5d6673a --- /dev/null +++ b/configure/gke-storageclass-zookeeper-ssd.yml @@ -0,0 +1,8 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: kafka-zookeeper +provisioner: kubernetes.io/gce-pd +reclaimPolicy: Retain +parameters: + type: pd-ssd diff --git a/events-kube/events-kube-kafka.yml b/events-kube/events-kube-kafka.yml new file mode 100644 index 0000000..086137d --- /dev/null +++ b/events-kube/events-kube-kafka.yml @@ -0,0 +1,51 @@ +apiVersion: apps/v1beta2 +kind: Deployment +metadata: + name: events-kube-kafka + namespace: kafka +spec: + replicas: 1 + strategy: + type: RollingUpdate + 
rollingUpdate: + # prefer duplicate events over missed + maxUnavailable: 0 + maxSurge: 1 + selector: + matchLabels: + app: events + from: kube + to: kafka + template: + metadata: + labels: + app: events + from: kube + to: kafka + spec: + containers: + - name: kafkacat-curl + image: solsson/kafkacat-curl@sha256:6ad61f2e6343359c3972d7a86815568c0a1d0560068134c5d702a152eb5123a0 + env: + - name: BOOTSTRAP + value: kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092 + - name: TOPIC + value: ops-kube-events-all-json-001 + command: + - /bin/bash + - -ec + - > + curl + -f + -s + --cacert /run/secrets/kubernetes.io/serviceaccount/ca.crt + --header "Authorization: Bearer $(cat /run/secrets/kubernetes.io/serviceaccount/token)" + https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT/api/v1/watch/events + | + kafkacat + -b $BOOTSTRAP + -t $TOPIC + -P + -z snappy + -v + -d broker,topic diff --git a/events-kube/rbac/cluster-events-watcher.yml b/events-kube/rbac/cluster-events-watcher.yml new file mode 100644 index 0000000..c8384b6 --- /dev/null +++ b/events-kube/rbac/cluster-events-watcher.yml @@ -0,0 +1,30 @@ +# If events-kube-kafka-* goes crashlooping you probably need this +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1beta1 +metadata: + name: events-watcher + labels: + origin: github.com_Yolean_kubernetes-kafka +rules: +- apiGroups: + - "" + resources: + - events + verbs: + - watch +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1beta1 +metadata: + name: kafka-events-watcher + labels: + origin: github.com_Yolean_kubernetes-kafka +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: events-watcher +subjects: +- kind: ServiceAccount + name: default + namespace: kafka diff --git a/events-kube/test/events-topic.yml b/events-kube/test/events-topic.yml new file mode 100644 index 0000000..29891f7 --- /dev/null +++ 
b/events-kube/test/events-topic.yml @@ -0,0 +1,89 @@ +--- +kind: ConfigMap +metadata: + name: events-topic + namespace: test-kafka +apiVersion: v1 +data: + + setup.sh: |- + touch /tmp/testlog + + tail -f /tmp/testlog + + test.sh: |- + exec >> /tmp/testlog + exec 2>&1 + + PREVIOUS=$(sha1sum /tmp/event 2>/dev/null || echo "") + kafkacat -b $BOOTSTRAP -t $TOPIC -C -o -1 -c 1 | tee /tmp/event + CURRENT=$(sha1sum /tmp/event) + [ "$PREVIOUS" == "$CURRENT" ] && echo "{\"test-result\": \"No new event in $TOPIC\"}" && exit 1 + + exit 0 + + quit-on-nonzero-exit.sh: |- + exec >> /tmp/testlog + exec 2>&1 + + exit 0 + +--- +apiVersion: apps/v1beta2 +kind: Deployment +metadata: + name: events-topic + namespace: test-kafka +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + test-target: events-topic + test-type: readiness + template: + metadata: + labels: + test-target: events-topic + test-type: readiness + # for example: + # readonly - can be used in production + # isolated - read/write but in a manner that does not affect other services + # load - unsuitable for production because it uses significant resources + # chaos - unsuitable for production because it injects failure modes + #test-use: + spec: + containers: + - name: testcase + image: solsson/kafkacat@sha256:ebebf47061300b14a4b4c2e1e4303ab29f65e4b95d34af1b14bb8f7ec6da7cef + env: + - name: BOOTSTRAP + value: kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092 + - name: TOPIC + value: ops-kube-events-all-json-001 + command: + - /bin/bash + - -e + - /test/setup.sh + readinessProbe: + exec: + command: + - /bin/bash + - -e + - /test/test.sh + initialDelaySeconds: 10 + periodSeconds: 60 + livenessProbe: + exec: + command: + - /bin/bash + - -e + - /test/quit-on-nonzero-exit.sh + volumeMounts: + - name: config + mountPath: /test + volumes: + - name: config + configMap: + name: events-topic diff --git 
a/events-kube/topic-ops-kube-events-all-json.yml b/events-kube/topic-ops-kube-events-all-json.yml new file mode 100644 index 0000000..62f2b80 --- /dev/null +++ b/events-kube/topic-ops-kube-events-all-json.yml @@ -0,0 +1,32 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: topic-ops-kube-events-all-json + namespace: kafka +spec: + template: + metadata: + labels: + app: topic-create + topic-id: ops-kube-events-all-json + topic-gen: "001" + spec: + containers: + - name: kafka + image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + command: + - ./bin/kafka-topics.sh + - --zookeeper + - zookeeper:2181 + - --create + - --if-not-exists + - --topic + - ops-kube-events-all-json-001 + - --partitions + - "1" + - --replication-factor + - "1" + - --config + # 8 days + - retention.ms=691200000 + restartPolicy: Never diff --git a/outside-services/outside-0.yml b/outside-services/outside-0.yml new file mode 100644 index 0000000..7bc12bd --- /dev/null +++ b/outside-services/outside-0.yml @@ -0,0 +1,15 @@ +kind: Service +apiVersion: v1 +metadata: + name: outside-0 + namespace: kafka +spec: + selector: + app: kafka + kafka-broker-id: "0" + ports: + - protocol: TCP + targetPort: 9094 + port: 32400 + nodePort: 32400 + type: NodePort
\ No newline at end of file diff --git a/outside-services/outside-1.yml b/outside-services/outside-1.yml new file mode 100644 index 0000000..1642ee0 --- /dev/null +++ b/outside-services/outside-1.yml @@ -0,0 +1,15 @@ +kind: Service +apiVersion: v1 +metadata: + name: outside-1 + namespace: kafka +spec: + selector: + app: kafka + kafka-broker-id: "1" + ports: + - protocol: TCP + targetPort: 9094 + port: 32401 + nodePort: 32401 + type: NodePort
\ No newline at end of file diff --git a/outside-services/outside-2.yml b/outside-services/outside-2.yml new file mode 100644 index 0000000..78c313c --- /dev/null +++ b/outside-services/outside-2.yml @@ -0,0 +1,15 @@ +kind: Service +apiVersion: v1 +metadata: + name: outside-2 + namespace: kafka +spec: + selector: + app: kafka + kafka-broker-id: "2" + ports: + - protocol: TCP + targetPort: 9094 + port: 32402 + nodePort: 32402 + type: NodePort
\ No newline at end of file diff --git a/test/basic-produce-consume.yml b/test/basic-produce-consume.yml index fdacea0..f73b1ab 100644 --- a/test/basic-produce-consume.yml +++ b/test/basic-produce-consume.yml @@ -38,13 +38,17 @@ data: exit 0 --- -apiVersion: apps/v1beta1 +apiVersion: apps/v1beta2 kind: Deployment metadata: name: basic-produce-consume namespace: test-kafka spec: replicas: 1 + selector: + matchLabels: + test-target: kafka + test-type: readiness template: metadata: labels: @@ -53,7 +57,7 @@ spec: spec: containers: - name: testcase - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d env: - name: BOOTSTRAP value: kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092 diff --git a/test/basic-with-kafkacat.yml b/test/basic-with-kafkacat.yml index a8974e8..5d626bc 100644 --- a/test/basic-with-kafkacat.yml +++ b/test/basic-with-kafkacat.yml @@ -1,41 +1,76 @@ --- kind: ConfigMap metadata: - name: basic-with-kafkacat + name: test-basic-with-kafkacat namespace: test-kafka apiVersion: v1 data: setup.sh: |- touch /tmp/testlog - tail -f /tmp/testlog - continue.sh: |- - exit 0 + tail -f /tmp/testlog - run.sh: |- + test.sh: |- exec >> /tmp/testlog exec 2>&1 - unique=$(date -Ins) + PC_WAIT=.2 + MAX_AGE=5 + + UNIQUE="${HOSTNAME}@$(date -u -Ins)" + + echo "Test $UNIQUE" >> /shared/produce.tmp + sleep $PC_WAIT + LAST=$(tail -n 1 /shared/consumed.tmp) + + LAST_TS=$(echo $LAST | awk -F';' '{print $1}') + LAST_MSG=$(echo $LAST | awk -F';' '{print $4}') + NOW=$(date -u +%s%3N) + DIFF_S=$((($NOW - $LAST_TS)/1000)) + DIFF_MS=$((($NOW - $LAST_TS)%1000)) + #echo "$NOW ($(date +%FT%H:%M:%S.%3N)):" + #echo "$LAST_TS" + + if [ $DIFF_S -gt $MAX_AGE ]; then + echo "Last message is $DIFF_S.$DIFF_MS old:" + echo "$LAST_MSG" + exit 10 + fi + + if [[ 
"$LAST_MSG" != *"$UNIQUE" ]]; then + echo "Last message (at $LAST_TS) isn't from this test run ($UNIQUE):" + echo "$LAST_MSG" + exit 11 + fi - echo "Test $unique" | kafkacat -P -b $BOOTSTRAP -t test-basic-with-kafkacat -v - kafkacat -C -b $BOOTSTRAP -t test-basic-with-kafkacat -o -1 -e | grep $unique + # get info about this message + kafkacat -Q -b $BOOTSTRAP -t test-basic-with-kafkacat:0:$LAST_TS \ + -X socket.timeout.ms=600 -X session.timeout.ms=300 -X request.timeout.ms=50 -X metadata.request.timeout.ms=600 + [ $? -eq 0 ] || echo "At $(date +%FT%H:%M:%S.%3N) bootstrap broker(s) might be down" + # but don't fail the test; producer and consumer should keep going if there are other brokers + + # We haven't asserted that the consumer works, so we'll just have to assume that it will exit if it fails exit 0 + quit-on-nonzero-exit.sh: |- + exec >> /tmp/testlog + exec 2>&1 + + exit 0 --- apiVersion: batch/v1 kind: Job metadata: - name: basic-with-kafkacat + name: test-basic-with-kafkacat namespace: test-kafka spec: template: spec: containers: - name: topic-create - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d command: - ./bin/kafka-topics.sh - --zookeeper @@ -50,54 +85,99 @@ spec: - "1" restartPolicy: Never --- -apiVersion: apps/v1beta1 +apiVersion: apps/v1beta2 kind: Deployment metadata: - name: basic-with-kafkacat + name: test-basic-with-kafkacat namespace: test-kafka spec: replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + test-target: kafka + test-type: readiness template: metadata: labels: test-target: kafka test-type: readiness + # for example: + # readonly - can be used in production + # isolated - read/write but in a manner that does not affect other services + # load - unsuitable for production because it uses significant resources + # chaos - unsuitable for production because it injects 
failure modes + #test-use: spec: containers: + - name: producer + image: solsson/kafkacat@sha256:ebebf47061300b14a4b4c2e1e4303ab29f65e4b95d34af1b14bb8f7ec6da7cef + env: + - name: BOOTSTRAP + value: kafka-0.broker.kafka.svc.cluster.local:9092 + command: + - /bin/bash + - -cex + - > + echo "--- start $HOSTNAME $(date --iso-8601='ns' -u) ---" >> /shared/produce.tmp + ; + tail -f /shared/produce.tmp | + kafkacat -P -b $BOOTSTRAP -t test-basic-with-kafkacat -v -T -d broker + ; + volumeMounts: + - name: config + mountPath: /test + - name: shared + mountPath: /shared + - name: consumer + image: solsson/kafkacat@sha256:ebebf47061300b14a4b4c2e1e4303ab29f65e4b95d34af1b14bb8f7ec6da7cef + env: + - name: BOOTSTRAP + value: kafka-0.broker.kafka.svc.cluster.local:9092 + command: + - /bin/bash + - -cex + - > + kafkacat -C -b $BOOTSTRAP -t test-basic-with-kafkacat -o -1 -f '%T;%p;%o;%s\n' -u -d broker | + tee /shared/consumed.tmp + ; + volumeMounts: + - name: config + mountPath: /test + - name: shared + mountPath: /shared - name: testcase - # common test images - #image: solsson/curl@sha256:8b0927b81d10043e70f3e05e33e36fb9b3b0cbfcbccdb9f04fd53f67a270b874 - image: solsson/kafkacat@sha256:1266d140c52cb39bf314b6f22b6d7a01c4c9084781bc779fdfade51214a713a8 - #image: solsson/kubectl-kafkacat@sha256:3715a7ede3f168f677ee6faf311ff6887aff31f660cfeecad5d87b4f18516321 + image: solsson/kafkacat@sha256:ebebf47061300b14a4b4c2e1e4303ab29f65e4b95d34af1b14bb8f7ec6da7cef env: - name: BOOTSTRAP - #value: kafka-0.broker.kafka.svc.cluster.local:9092,kafka-1.broker.kafka.svc.cluster.local:9092,kafka-2.broker.kafka.svc.cluster.local:9092 value: kafka-0.broker.kafka.svc.cluster.local:9092 - - name: ZOOKEEPER - value: zookeeper.kafka.svc.cluster.local:2181 - # Test set up command: - /bin/bash - -e - /test/setup.sh - # Test run, again and again readinessProbe: exec: command: - /bin/bash - -e - - /test/run.sh - # Test quit on nonzero exit + - /test/test.sh + initialDelaySeconds: 10 + periodSeconds: 10 
livenessProbe: exec: command: - /bin/bash - -e - - /test/continue.sh + - /test/quit-on-nonzero-exit.sh volumeMounts: - name: config mountPath: /test + - name: shared + mountPath: /shared volumes: - name: config configMap: - name: basic-with-kafkacat + name: test-basic-with-kafkacat + - name: shared + emptyDir: {} diff --git a/test/jmx-selftest.yml b/test/jmx-selftest.yml new file mode 100644 index 0000000..9e328ea --- /dev/null +++ b/test/jmx-selftest.yml @@ -0,0 +1,41 @@ +# Sets up a pod that monitors itself, to test resource usage etc. +# kubectl exec -n test-kafka jmx-selftest-... -- /bin/sh -c 'apk add --no-cache curl && curl http://localhost:5556/metrics' +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: jmx-selftest + namespace: test-kafka +spec: + replicas: 1 + template: + metadata: + labels: + test-target: jmx-exporter + test-type: readiness + # Uncomment to test with Prometheus + #annotations: + # prometheus.io/scrape: "true" + # prometheus.io/port: "5556" + spec: + containers: + - name: monitor + image: solsson/kafka-prometheus-jmx-exporter@sha256:348b0f6510b08dff70ba468a16d25dc8def480fe79aca0e3c76f098d67b108a3 + command: + - java + - -Dcom.sun.management.jmxremote.ssl=false + - -Dcom.sun.management.jmxremote.authenticate=false + - -Dcom.sun.management.jmxremote.port=5555 + - -jar + - jmx_prometheus_httpserver.jar + - "5556" + - example_configs/httpserver_sample_config.yml + ports: + - name: jmx + containerPort: 5555 + - name: slashmetrics + containerPort: 5556 + # Test run, again and again + readinessProbe: + httpGet: + path: /metrics + port: 5556 diff --git a/test/metrics.yml b/test/metrics.yml new file mode 100644 index 0000000..b28cae8 --- /dev/null +++ b/test/metrics.yml @@ -0,0 +1,107 @@ +# kubectl apply -f test/metrics.yml && kubectl scale --replicas=0 deploy/metrics && kubectl scale --replicas=1 deploy/metrics +# kubectl exec metrics-... 
-- tail -f /tmp/loglast | egrep 'time_total|^jmx_scrape_duration_seconds|^java_lang_memory_heapmemoryusage_used|^java_lang_memory_nonheapmemoryusage_used' +--- +kind: ConfigMap +metadata: + name: metrics + namespace: test-kafka +apiVersion: v1 +data: + + curl-format.txt: |- + \n + # ------ curl stats ------\n + time_namelookup %{time_namelookup}\n + time_connect %{time_connect}\n + time_appconnect %{time_appconnect}\n + time_pretransfer %{time_pretransfer}\n + time_redirect %{time_redirect}\n + time_starttransfer %{time_starttransfer}\n + \n + time_total{url="%{url_effective}"} %{time_total}\n + \n + http_code{url="%{url_effective}"} %{http_code}\n + size_download{url="%{url_effective}"} %{size_download}\n + content_type %{content_type}\n + # ----- curl complete -----\n + \n + + setup.sh: |- + touch /tmp/testlog + tail -f /tmp/testlog + + continue.sh: |- + exit 0 + + run.sh: |- + exec >> /tmp/testlog + exec 2>&1 + + date -u -Iseconds | tee /tmp/loglast + + curl -w "@/test/curl-format.txt" -s --max-time $MAX_RESPONSE_TIME \ + http://kafka-0.broker.kafka.svc.cluster.local:5556/metrics \ + | tee -a /tmp/loglast \ + | grep http_code \ + | grep 200 + + curl -w "@/test/curl-format.txt" -s --max-time $MAX_RESPONSE_TIME \ + http://zoo-0.zoo.kafka.svc.cluster.local:5556/metrics \ + | tee -a /tmp/loglast \ + | grep http_code \ + | grep 200 + + curl -w "@/test/curl-format.txt" -s --max-time $MAX_RESPONSE_TIME \ + http://pzoo-0.pzoo.kafka.svc.cluster.local:5556/metrics \ + | tee -a /tmp/loglast \ + | grep http_code \ + | grep 200 + + exit 0 + +--- +apiVersion: apps/v1beta1 +kind: Deployment +metadata: + name: metrics + namespace: test-kafka +spec: + replicas: 1 + template: + metadata: + labels: + test-target: kafka + test-type: readiness + spec: + containers: + - name: testcase + image: solsson/curl@sha256:8c0c5d669b3dd67932da934024252af59fb9d0fa0e5118b5a737b35c5e1487bf + env: + - name: MAX_RESPONSE_TIME + value: "3" + # Test set up + command: + - /bin/bash + - -e + - 
/test/setup.sh + # Test run, again and again + readinessProbe: + exec: + command: + - /bin/bash + - -e + - /test/run.sh + # Test quit on nonzero exit + livenessProbe: + exec: + command: + - /bin/bash + - -e + - /test/continue.sh + volumeMounts: + - name: config + mountPath: /test + volumes: + - name: config + configMap: + name: metrics diff --git a/yahoo-kafka-manager/kafka-manager-service.yml b/yahoo-kafka-manager/kafka-manager-service.yml new file mode 100644 index 0000000..3d26adf --- /dev/null +++ b/yahoo-kafka-manager/kafka-manager-service.yml @@ -0,0 +1,12 @@ +kind: Service +apiVersion: v1 +metadata: + name: kafka-manager + namespace: kafka +spec: + selector: + app: kafka-manager + ports: + - protocol: TCP + port: 80 + targetPort: 80 diff --git a/yahoo-kafka-manager/kafka-manager.yml b/yahoo-kafka-manager/kafka-manager.yml new file mode 100644 index 0000000..77319e4 --- /dev/null +++ b/yahoo-kafka-manager/kafka-manager.yml @@ -0,0 +1,26 @@ +apiVersion: apps/v1beta2 +kind: Deployment +metadata: + name: kafka-manager + namespace: kafka +spec: + replicas: 1 + selector: + matchLabels: + app: kafka-manager + template: + metadata: + labels: + app: kafka-manager + spec: + containers: + - name: kafka-manager + image: solsson/kafka-manager@sha256:e07b5c50b02c761b3471ebb62ede88afc0625e325fe428316e32fec7f227ff9b + ports: + - containerPort: 80 + env: + - name: ZK_HOSTS + value: zookeeper.kafka:2181 + command: + - ./bin/kafka-manager + - -Dhttp.port=80
\ No newline at end of file diff --git a/zookeeper/10zookeeper-config.yml b/zookeeper/10zookeeper-config.yml index e796b4b..c9b7a6f 100644 --- a/zookeeper/10zookeeper-config.yml +++ b/zookeeper/10zookeeper-config.yml @@ -35,3 +35,22 @@ data: # Suppress connection log messages, three lines per livenessProbe execution log4j.logger.org.apache.zookeeper.server.NIOServerCnxnFactory=WARN log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN + + jmx-zookeeper-prometheus.yaml: |+ + rules: + - pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d)><>(\\w+)" + name: "zookeeper_$2" + - pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d), name1=replica.(\\d)><>(\\w+)" + name: "zookeeper_$3" + labels: + replicaId: "$2" + - pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d), name1=replica.(\\d), name2=(\\w+)><>(\\w+)" + name: "zookeeper_$4" + labels: + replicaId: "$2" + memberType: "$3" + - pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d), name1=replica.(\\d), name2=(\\w+), name3=(\\w+)><>(\\w+)" + name: "zookeeper_$4_$5" + labels: + replicaId: "$2" + memberType: "$3" diff --git a/zookeeper/50pzoo.yml b/zookeeper/50pzoo.yml index 7fd373c..b52a82b 100644 --- a/zookeeper/50pzoo.yml +++ b/zookeeper/50pzoo.yml @@ -1,22 +1,31 @@ -apiVersion: apps/v1beta1 +apiVersion: apps/v1beta2 kind: StatefulSet metadata: name: pzoo namespace: kafka spec: +spec: + selector: + matchLabels: + app: zookeeper + storage: persistent serviceName: "pzoo" replicas: 3 + updateStrategy: + type: RollingUpdate template: metadata: labels: app: zookeeper storage: persistent annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "5556" spec: terminationGracePeriodSeconds: 10 initContainers: - name: init-config - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d command: 
['/bin/bash', '/etc/kafka/init.sh'] volumeMounts: - name: config @@ -25,10 +34,12 @@ spec: mountPath: /var/lib/zookeeper/data containers: - name: zookeeper - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d env: - name: KAFKA_LOG4J_OPTS value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties + - name: JMX_PORT + value: "5555" command: - ./bin/zookeeper-server-start.sh - /etc/kafka/zookeeper.properties @@ -48,12 +59,36 @@ spec: command: - /bin/sh - -c - - '[ "imok" = "$(echo ruok | nc -w 1 127.0.0.1 2181)" ]' + - '[ "imok" = "$(echo ruok | nc -w 1 -q 1 127.0.0.1 2181)" ]' + livenessProbe: + exec: + command: + - /bin/sh + - -c + - '[ "imok" = "$(echo ruok | nc -w 5 -q 1 127.0.0.1 2181)" ]' volumeMounts: - name: config mountPath: /etc/kafka - name: data mountPath: /var/lib/zookeeper/data + - name: metrics + image: solsson/kafka-prometheus-jmx-exporter@sha256:348b0f6510b08dff70ba468a16d25dc8def480fe79aca0e3c76f098d67b108a3 + command: + - java + - -Xmx64M + - -XX:MaxMetaspaceSize=32m + - -jar + - jmx_prometheus_httpserver.jar + - "5556" + - example_configs/zookeeper.yaml + ports: + - containerPort: 5556 + resources: + requests: + cpu: 0m + memory: 100Mi + limits: + memory: 150Mi volumes: - name: config configMap: @@ -63,6 +98,7 @@ spec: name: data spec: accessModes: [ "ReadWriteOnce" ] + storageClassName: kafka-zookeeper resources: requests: - storage: 10Gi + storage: 1Gi diff --git a/zookeeper/51zoo.yml b/zookeeper/51zoo.yml index f5d1f91..f12385d 100644 --- a/zookeeper/51zoo.yml +++ b/zookeeper/51zoo.yml @@ -1,22 +1,30 @@ -apiVersion: apps/v1beta1 +apiVersion: apps/v1beta2 kind: StatefulSet metadata: name: zoo namespace: kafka spec: + selector: + matchLabels: + app: zookeeper + storage: ephemeral serviceName: "zoo" replicas: 2 + updateStrategy: + type: RollingUpdate template: metadata: labels: app: zookeeper 
storage: ephemeral annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "5556" spec: terminationGracePeriodSeconds: 10 initContainers: - name: init-config - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d command: ['/bin/bash', '/etc/kafka/init.sh'] env: - name: ID_OFFSET @@ -28,10 +36,12 @@ spec: mountPath: /var/lib/zookeeper/data containers: - name: zookeeper - image: solsson/kafka:0.11.0.0@sha256:b27560de08d30ebf96d12e74f80afcaca503ad4ca3103e63b1fd43a2e4c976ce + image: solsson/kafka:1.0.0@sha256:17fdf1637426f45c93c65826670542e36b9f3394ede1cb61885c6a4befa8f72d env: - name: KAFKA_LOG4J_OPTS value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties + - name: JMX_PORT + value: "5555" command: - ./bin/zookeeper-server-start.sh - /etc/kafka/zookeeper.properties @@ -51,12 +61,39 @@ spec: command: - /bin/sh - -c - - '[ "imok" = "$(echo ruok | nc -w 1 127.0.0.1 2181)" ]' + - '[ "imok" = "$(echo ruok | nc -w 1 -q 1 127.0.0.1 2181)" ]' + livenessProbe: + exec: + command: + - /bin/sh + - -c + - '[ "imok" = "$(echo ruok | nc -w 5 -q 1 127.0.0.1 2181)" ]' volumeMounts: - name: config mountPath: /etc/kafka - name: data mountPath: /var/lib/zookeeper/data + - name: metrics + image: solsson/kafka-prometheus-jmx-exporter@sha256:348b0f6510b08dff70ba468a16d25dc8def480fe79aca0e3c76f098d67b108a3 + command: + - java + - -Xmx64M + - -XX:MaxMetaspaceSize=32m + - -jar + - jmx_prometheus_httpserver.jar + - "5556" + - /etc/kafka/jmx-zookeeper-prometheus.yaml + ports: + - containerPort: 5556 + resources: + requests: + cpu: 0m + memory: 100Mi + limits: + memory: 150Mi + volumeMounts: + - name: config + mountPath: /etc/kafka volumes: - name: config configMap: |