INTRO

이번에 알아볼 내용은 Kubernetes 상에서 Kafka를 구성하게 하는 Strimzi Operator에 대해서 알아보겠습니다. 그리고 Kafka 클러스터 배포, 모니터링을 위한 Ui for Apache Kafka구성, Prometheus+Grafana로 브로커 모니터링 구성까지 해보겠습니다.

 

 

Strimzi 가 할수 있는 역할은?

 

 

Strimzi는 K8s환경에서 Kafka 운영 관리에 도움을 주는 Operator 입니다

  • 공식 link: https://strimzi.io/
  • Operator 제공 기능 : 카프카 클러스터/구성요소 배포 및 관리, 카프카 접속 설정, 카프카 업그레이드, 브로커 brokers 관리, 토픽 topic유저 user 생성 및 관리
  • Operator 로 배포 및 관리 : Kafka clusters, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, Entity Operator (User, Topic)

 

 

Cluster Operator 는 카프카/주키퍼 클러스터를 생성/배포 및 관리

 

Topic Operator 는 토픽 Topic 생성, 삭제, 변경 등 관리

 

 

Strimzi Operator 로 설치하기

# 네임스페이스 생성
kubectl create namespace kafka

# Repo 추가
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator

# 차트 설치 : 오퍼레이터 파드 설치
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka

# 배포한 리소스 확인 : Operator 디플로이먼트(파드)
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka

# 오퍼레이터가 지원하는 카프카 버전 확인
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3

# 배포한 리소스 확인 : CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io				2023-11-11T06:01:21Z
kafkaconnectors.kafka.strimzi.io			 2023-11-11T06:01:21Z
kafkaconnects.kafka.strimzi.io			   2023-11-11T06:01:20Z
kafkamirrormaker2s.kafka.strimzi.io		  2023-11-11T06:01:21Z
kafkamirrormakers.kafka.strimzi.io		   2023-11-11T06:01:21Z
kafkanodepools.kafka.strimzi.io			  2023-11-11T06:01:21Z
kafkarebalances.kafka.strimzi.io			 2023-11-11T06:01:21Z
kafkas.kafka.strimzi.io					  2023-11-11T06:01:20Z
kafkatopics.kafka.strimzi.io				 2023-11-11T06:01:21Z
kafkausers.kafka.strimzi.io				  2023-11-11T06:01:21Z
strimzipodsets.core.strimzi.io			   2023-11-11T06:01:20Z

# (참고) CRD 상세 정보 확인
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io

 

(좌측상단부터 시계방향으로)

Operator Pod가 배포된 모습, CRD가 배포된 모습, Operator Deployment가 배포된 모습, 해당스크립트 실행한 내용

 

AWS EKS → GCP GKE 용 파일 수정하기

DOIK2스터디에서는 AWS EKS를 사용하고있지만, 포스팅 환경에서는 GCP GKE를 사용하고있기때문에 적절하게 파일내용을 변경하려고합니다. 수정해야할 파일은 kafka-1.yaml 입니다.

 

kafka-1.yaml 원본

]apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
	version: 3.6.0
	replicas: 3
	listeners:
	  - name: plain
		port: 9092
		type: internal
		tls: false
	  - name: tls
		port: 9093
		type: internal
		tls: false
	  - name: external
		port: 9094
		type: nodeport
		tls: false
	readinessProbe:
	  initialDelaySeconds: 15
	  timeoutSeconds: 5
	livenessProbe:
	  initialDelaySeconds: 15
	  timeoutSeconds: 5
	config:
	  offsets.topic.replication.factor: 3
	  transaction.state.log.replication.factor: 3
	  transaction.state.log.min.isr: 2
	  default.replication.factor: 3
	  min.insync.replicas: 2
	  inter.broker.protocol.version: "3.6"
	storage:
	  type: jbod
	  volumes:
	  - id: 0
		type: persistent-claim
		size: 10Gi
		deleteClaim: true
	template: 
	  pod: 
		affinity: 
		  podAntiAffinity: 
			requiredDuringSchedulingIgnoredDuringExecution: 
			  - labelSelector: 
				  matchExpressions: 
					- key: app.kubernetes.io/name
					  operator: In
					  values: 
						- kafka
				topologyKey: "topology.ebs.csi.aws.com/zone"

  zookeeper:
	replicas: 3
	readinessProbe:
	  initialDelaySeconds: 15
	  timeoutSeconds: 5
	livenessProbe:
	  initialDelaySeconds: 15
	  timeoutSeconds: 5
	storage:
	  type: persistent-claim
	  size: 10Gi
	  deleteClaim: true
	template: 
	  pod: 
		affinity: 
		  podAntiAffinity: 
			requiredDuringSchedulingIgnoredDuringExecution: 
			  - labelSelector: 
				  matchExpressions: 
					- key: app.kubernetes.io/name
					  operator: In
					  values: 
						- zookeeper
				topologyKey: "topology.ebs.csi.aws.com/zone"

  entityOperator:
	topicOperator: {}
	userOperator: {}

 

우선, 정확히 뭘 고칠려고하는지를 알아야합니다

spec.kafka.template.pod.affinity.podaffinity.requiredDuringSchedulingIgnoredDuringExecution.topologyKey spec.zookeeper.template.pod.affinity.podaffinity.requiredDuringSchedulingIgnoredDuringExecution.topologyKey 에서 사용하고있는 EBS 관련 옵션을 제거해주어야합니다.

 

그리고, 어떻게 고칠수 있는지를 알아야합니다.

topology 의 속성값을 변경할 수 있습니다. 아래 페이지를 보면 topology.kubernetes.io/zone 설정값과 kubernetes.io/hostname 값 중 하나로 고를 수 있습니다.

 

topology.kubernetes.io/zone 설정값과 kubernetes.io/hostname 중 어떤것을 선택해야할까요?

이를 알기위해서는 ‘GKE 생성모드’에 대해서 알아야합니다.

GKE는 Autopilot 모드와 Standard모드가 존재합니다.

Autopilot모드로 설정한 경우 topology.kubernetes.io/zone 를 사용하며,

Standard모드로 설정한 경우 kubernetes.io/hostname 을 사용합니다.

 

게시글에서는 Standard모드로 설정한 노드4개짜리 클러스터를 사용하고있으므로 kubernetes.io/hostname 을 사용합니다.

 

둘의 차이점은 아래 링크에서 확인할 수 있습니다.

 

따라서, kafka-1.yaml 파일을 수정하면 아래처럼 만들수 있습니다.

kafka-1.yaml 수정본

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: nodeport
        tls: false
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: true
    template: 
      pod: 
        affinity: 
          podAntiAffinity: 
            requiredDuringSchedulingIgnoredDuringExecution: 
              - labelSelector: 
                  matchExpressions: 
                    - key: app.kubernetes.io/name
                      operator: In
                      values: 
                        - kafka
                topologyKey: "kubernetes.io/hostname"

  zookeeper:
    replicas: 3
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
    template: 
      pod: 
        affinity: 
          podAntiAffinity: 
            requiredDuringSchedulingIgnoredDuringExecution: 
              - labelSelector: 
                  matchExpressions: 
                    - key: app.kubernetes.io/name
                      operator: In
                      values: 
                        - zookeeper
                topologyKey: "kubernetes.io/hostname"

  entityOperator:
    topicOperator: {}
    userOperator: {}

 

 

카프카 클러스터 배포

실습구성도(최종)

  • 배포가 다 끝나면, 마스터노드에 배포된 파드들이 워커노드들에 배치됩니다.
  • 주키퍼/브로커 파드가 배치되는 노드의 순서와 토픽의 Leader 파티션의 배치는 다를 수 있습니다.
  • 기존 Statefulset(sts) 대신에 StrimziPodSets(sps)를 기본설정으로 진행합니다.
    • StrimziPodSets(sps)특징: sts중간 파드 바로 삭제 불가, 파드 Spec이 동일하도록 강제된다(볼륨, CPU, Mem)

 

 

 

아래 커맨드를 입력해서 배포합니다.

kubectl apply -f kafka-1.yaml -n kafka
#좌측상단: 모니터링
watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka

#우측상단: 모니터링
kubectl logs deployment/strimzi-cluster-operator -n kafka -f

#좌측하단: 오브젝트 배포: Kafka(Broker, 3개), Zookeeper(3개), EntityOperator(Deployment type)
kubectl apply -f kafka-1.yaml -n kafka

#우측하단: pod배포가 잘된 모습
  • Zookeeper 가 배포된 이후에, Kafka(Broker)가 배포된다.

 

배포내용 확인

 

배포확인0

kubectl get kafka -n kafka
kubectl get cm,secret -n kafka
kubectl get strimzipodsets -n kafka

 

배포확인1

kubectl get node --label-columns=kubernetes.io/hostname
kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster

 

 

각각의 노드별로 떠있는 Pod확인

 

맨 위에보이는 노드에는 my-cluster-entity-operator 라고하는 pod가 확인됩니다.

자세하게 확인해보면 tls-sidecar, topic-operator, user-operator가 만들어져있는것을 확인할 수 있습니다.

 

 

 

Strimzi로 배포된 Broker 의 상세정보를 configmap에서 확인할 수 있습니다.

kubectl describe cm -n kafka my-cluster-kafka-0
#아래는 메세지의 일부분. Broker Clustering 연결설정이나 로그가 저장되는 위치를 확인할 수 있다.

server.config:
----
##############################
##############################
# This file is automatically generated by the Strimzi Cluster Operator
# Any changes to this file will be ignored and overwritten!
##############################
##############################

##########
# Node / Broker ID
##########
broker.id=0
node.id=0

##########
# Kafka message logs configuration
##########
log.dirs=/var/lib/kafka/data-0/kafka-log0

##########
# Common listener configuration
##########
listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,TLS-9093:PLAINTEXT,EXTERNAL-9094:PLAINTEXT
listeners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,TLS-9093://0.0.0.0:9093,EXTERNAL-9094://0.0.0.0:9094
advertised.listeners=CONTROLPLANE-9090://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9090,REPLICATION-9091://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9091,PLAIN-9092://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092,TLS-9093://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093,EXTERNAL-9094://${STRIMZI_NODEPORT_DEFAULT_ADDRESS}:32510
inter.broker.listener.name=REPLICATION-9091
control.plane.listener.name=CONTROLPLANE-9090
sasl.enabled.mechanisms=
ssl.endpoint.identification.algorithm=HTTPS

##########
# User provided configuration
##########
default.replication.factor=3
inter.broker.protocol.version=3.6
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
log.message.format.version=3.6

 

반면 zookeeper는 각각 Configmap이 있는게 아니라, cluster에 대한 Configmap만 존재합니다.

kubectl describe cm -n kafka my-cluster-zookeeper-config
Name:		 my-cluster-zookeeper-config
Namespace:	kafka
Labels:	   app.kubernetes.io/instance=my-cluster
			  app.kubernetes.io/managed-by=strimzi-cluster-operator
			  app.kubernetes.io/name=zookeeper
			  app.kubernetes.io/part-of=strimzi-my-cluster
			  strimzi.io/cluster=my-cluster
			  strimzi.io/component-type=zookeeper
			  strimzi.io/kind=Kafka
			  strimzi.io/name=my-cluster-zookeeper
Annotations:  <none>

Data
====
log4j.properties:
----
# Do not change this generated file. Logging can be configured in the corresponding Kubernetes resource.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %p %m (%c) [%t]%n
zookeeper.root.logger=INFO
log4j.rootLogger=${zookeeper.root.logger}, CONSOLE

zookeeper.node-count:
----
3

BinaryData
====

Events:  <none>

 

Listeners 정보확인

kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq
  • bootstrapServers 주소는 브로커주소로서, 해당 브로커로 접근할 수 있다
  • 9092는 Plaintext(평문), 9093은 TLS(암호화), 9094는 external(nodePort연결)
[
  {
	"addresses": [
	  {
		"host": "my-cluster-kafka-bootstrap.kafka.svc",
		"port": 9092
	  }
	],
	"bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9092",
	"name": "plain"
  },
  {
	"addresses": [
	  {
		"host": "my-cluster-kafka-bootstrap.kafka.svc",
		"port": 9093
	  }
	],
	"bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9093",
	"name": "tls"
  },
  {
	"addresses": [
	  {
		"host": "34.64.203.144",
		"port": 32631
	  },
	  {
		"host": "34.64.160.2",
		"port": 32631
	  },
	  {
		"host": "34.64.92.44",
		"port": 32631
	  }
	],
	"bootstrapServers": "34.64.160.2:32631,34.64.203.144:32631,34.64.92.44:32631",
	"name": "external"
  }
]

 

 

 

테스트용 Pod 생성

Client 파일을 다운로드합니다.

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml

 

myclient.yaml

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: myclient
  labels:
	app: kafkaclient
spec:
  selector:
	matchLabels:
	  name: kafkaclient
  template:
	metadata:
	  labels:
		name: kafkaclient
	spec:
	  containers:
	  - name: kafkaclient
		image: bitnami/kafka:${VERSION}
		command: ["tail"]
		args: ["-f", "/dev/null"]
	  terminationGracePeriodSeconds: 0

 

아래키워드로 배포합니다.

각각의 노드에 client가 daemonset형태로 띄워진것을 확인할 수 있습니다.

VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -

 

카프카 클라이언트는 아래의 쉘 스크립트를 갖고있습니다.

#kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin
connect-distributed.sh		  kafka-consumer-perf-test.sh  kafka-metadata-shell.sh		   kafka-transactions.sh
connect-mirror-maker.sh	   kafka-delegation-tokens.sh   kafka-mirror-maker.sh		   kafka-verifiable-consumer.sh
connect-plugin-path.sh		  kafka-delete-records.sh	   kafka-producer-perf-test.sh		   kafka-verifiable-producer.sh
connect-standalone.sh		  kafka-dump-log.sh		   kafka-reassign-partitions.sh		trogdor.sh
kafka-acls.sh			  kafka-e2e-latency.sh	   kafka-replica-verification.sh	   windows
kafka-broker-api-versions.sh  kafka-features.sh		   kafka-run-class.sh			   zookeeper-security-migration.sh
kafka-cluster.sh		  kafka-get-offsets.sh	   kafka-server-start.sh		   zookeeper-server-start.sh
kafka-configs.sh		  kafka-jmx.sh		   kafka-server-stop.sh			   zookeeper-server-stop.sh
kafka-console-consumer.sh	 kafka-leader-election.sh	   kafka-storage.sh			   zookeeper-shell.sh
kafka-console-producer.sh	 kafka-log-dirs.sh		   kafka-streams-application-reset.sh
kafka-consumer-groups.sh	  kafka-metadata-quorum.sh	   kafka-topics.sh

 

SVC 도메인 이름을 변수에 지정합니다.

SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
sudo sh -c "echo export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092 >> /etc/profile"

 

브로커 정보확인하는 명령어

# 브로커 정보
kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS

# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe

 

토픽 정보확인하는 명령어

# 토픽 리스트 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list

# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR
kubectl get kafkatopics -n kafka

 

Kafka UI(UI for Apache Kafka) 배포하기

# 레포 추가하기
helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts

# 참조할 value파일 만들기 
cat <<EOF > kafkaui-values.yml
yamlApplicationConfig:
  kafka:
	clusters:
	  - name: yaml
		bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092
  auth:
	type: disabled
  management:
	health:
	  ldap:
		enabled: false
EOF

# 배포하기
helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml

# 접속 확인
kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui"
echo -e "kafka-ui Web URL = http://kafka-ui"

 

 

배포가 완료되었으며, 해당 대시보드에 접근해보자.

kafka-ui가 떠있는 EXTERNAL-IP(클러스터의 IP)로 직접 접근해서 확인할 수 있습니다.

위의 경우 EXTERNAL-IP는 34.64.75.130 이며, 사용하는 포트는 80입니다.

브라우저 주소 창에 EXTERNAL-IP:PORT번호 를 입력합니다.

 

Prometheus + Grafana 모니터링 연결하기

 

Exporter 설정된 카프카 클러스터 배포

# exporter 관련 설정 확인
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-2.yaml
cat kafka-2.yaml | yh

# exporter 설정된 카프카 클러스터 배포
kubectl apply -f kafka-2.yaml -n kafka

이때 Deployment로 kafka-exporter가 생성되고, 생성되었던 카프카클러스터 Update가 됩니다.

  • 기존 카프카 클러스터가 update되기때문에 RollingUpdate하는 과정을 진행한다(default옵션).

 

 

 

예제코드 복사하기

#
git clone https://github.com/AmarendraSingh88/kafka-on-kubernetes.git
cd kafka-on-kubernetes/kafka-demo/demo3-monitoring/

 

프로메테우스 설치하기

#monitoring namespace가 없기때문에 만들어준다.
kubectl create namespace monitoring
# 프로메테우스 설치 : --server-side 옵션 설정
kubectl apply -f prometheus-operator-deployment.yaml -n monitoring --server-side
kubectl apply -f prometheus.yaml -n monitoring
kubectl apply -f prometheus-rules.yaml -n monitoring
kubectl apply -f strimzi-pod-monitor.yaml -n monitoring

 

 

그라파나 설치하기

# 그라파나 설치
kubectl apply -f grafana/grafana.yaml -n monitoring
kubectl patch svc -n monitoring grafana -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service grafana -n monitoring "external-dns.alpha.kubernetes.io/hostname=grafana.$MyDomain"

# 접속 정보 확인
echo -e "Grafana URL = http://grafana.$MyDomain:3000"

그라파나 대시보드 접속방법

  • grafana service는 monitoring namespace내부에 배포되기때문에 직접확인하려면 kubectl get svc -n monitoring으로 확인할 수 있습니다.

kafka ui를 확인할때와 똑같이, EXTERNAL-IP:PORT 로 접근하겠습니다.

  • 위의 사진에서는 34.64.218.79:3000

 

  • 그라파나 웹 접속 : admin / admin
  • 그라파나 데이터 소스 설정 : 프로메테우스파드-0 에 헤드리스 접속 =주소 입력 후 연결 확인

prometheus-prometheus-0.prometheus-operated.monitoring.svc.cluster.local:9090

 하단에 저장버튼 누르기!

  • 그라파나 대시보드 추가:

 아래링크에서 원하는 json 파일을 다운로드 받는다.

  https://github.com/AmarendraSingh88/kafka-on-kubernetes/tree/main/kafka-demo/demo3-monitoring/grafana

 다운로드 받은 json 파일을 import 합니다.

 데이터 소스를 Prometheus 로 지정해줘야합니다.

  

 완성!

  

OUTRO

다음게시글에서는 토픽생성과 메세지 전달에 대해서 작성해보겠습니다.

 

jjongguet