INTRO
이번에 알아볼 내용은 Kubernetes 상에서 Kafka를 구성하게 하는 Strimzi Operator에 대해서 알아보겠습니다. 그리고 Kafka 클러스터 배포, 모니터링을 위한 Ui for Apache Kafka구성, Prometheus+Grafana로 브로커 모니터링 구성까지 해보겠습니다.
Strimzi 가 할수 있는 역할은?
Strimzi는 K8s환경에서 Kafka 운영 관리에 도움을 주는 Operator 입니다
- 공식 link: https://strimzi.io/
- Operator 제공 기능 : 카프카 클러스터/구성요소 배포 및 관리, 카프카 접속 설정, 카프카 업그레이드, 브로커 brokers 관리, 토픽 topic 과 유저 user 생성 및 관리
- Operator 로 배포 및 관리 : Kafka clusters, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, Entity Operator (User, Topic)
Cluster Operator 는 카프카/주키퍼 클러스터를 생성/배포 및 관리
Topic Operator 는 토픽 Topic 생성, 삭제, 변경 등 관리
Strimzi Operator 로 설치하기
- link: https://artifacthub.io/packages/helm/strimzi/strimzi-kafka-operator/0.38.0 (2023.11.16기준 latest)
# 네임스페이스 생성
kubectl create namespace kafka
# Repo 추가
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator
# 차트 설치 : 오퍼레이터 파드 설치
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka
# 배포한 리소스 확인 : Operator 디플로이먼트(파드)
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka
# 오퍼레이터가 지원하는 카프카 버전 확인
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
# 배포한 리소스 확인 : CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkaconnectors.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkaconnects.kafka.strimzi.io 2023-11-11T06:01:20Z
kafkamirrormaker2s.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkamirrormakers.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkanodepools.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkarebalances.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkas.kafka.strimzi.io 2023-11-11T06:01:20Z
kafkatopics.kafka.strimzi.io 2023-11-11T06:01:21Z
kafkausers.kafka.strimzi.io 2023-11-11T06:01:21Z
strimzipodsets.core.strimzi.io 2023-11-11T06:01:20Z
# (참고) CRD 상세 정보 확인
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io
(좌측상단부터 시계방향으로)
Operator Pod가 배포된 모습, CRD가 배포된 모습, Operator Deployment가 배포된 모습, 해당스크립트 실행한 내용
AWS EKS → GCP GKE 용 파일 수정하기
DOIK2스터디에서는 AWS EKS를 사용하고있지만, 포스팅 환경에서는 GCP GKE를 사용하고있기때문에 적절하게 파일내용을 변경하려고합니다. 수정해야할 파일은 kafka-1.yaml 입니다.
kafka-1.yaml 원본
]apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: false
- name: external
port: 9094
type: nodeport
tls: false
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.6"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "topology.ebs.csi.aws.com/zone"
zookeeper:
replicas: 3
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
storage:
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- zookeeper
topologyKey: "topology.ebs.csi.aws.com/zone"
entityOperator:
topicOperator: {}
userOperator: {}
우선, 정확히 뭘 고칠려고하는지를 알아야합니다
spec.kafka.template.pod.affinity.podaffinity.requiredDuringSchedulingIgnoredDuringExecution.topologyKey
와 spec.zookeeper.template.pod.affinity.podaffinity.requiredDuringSchedulingIgnoredDuringExecution.topologyKey
에서 사용하고있는 EBS 관련 옵션을 제거해주어야합니다.
그리고, 어떻게 고칠수 있는지를 알아야합니다.
topology 의 속성값을 변경할 수 있습니다. 아래 페이지를 보면 topology.kubernetes.io/zone
설정값과 kubernetes.io/hostname
값 중 하나로 고를 수 있습니다.
- GKE topology link: https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview?hl=ko#pod_affinity_and_anti-affinity
- Kubernetes topology link: https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/
topology.kubernetes.io/zone
설정값과 kubernetes.io/hostname
중 어떤것을 선택해야할까요?
이를 알기위해서는 ‘GKE 생성모드’에 대해서 알아야합니다.
GKE는 Autopilot 모드와 Standard모드가 존재합니다.
Autopilot모드로 설정한 경우 topology.kubernetes.io/zone
를 사용하며,
Standard모드로 설정한 경우 kubernetes.io/hostname
을 사용합니다.
게시글에서는 Standard모드로 설정한 노드4개짜리 클러스터를 사용하고있으므로 kubernetes.io/hostname
을 사용합니다.
둘의 차이점은 아래 링크에서 확인할 수 있습니다.
- https://cloud.google.com/kubernetes-engine/docs/concepts/autopilot-overview?hl=ko
- https://cloud.google.com/kubernetes-engine/docs/how-to/creating-a-zonal-cluster?hl=ko
따라서, kafka-1.yaml 파일을 수정하면 아래처럼 만들수 있습니다.
kafka-1.yaml 수정본
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: false
- name: external
port: 9094
type: nodeport
tls: false
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.6"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
zookeeper:
replicas: 3
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
storage:
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- zookeeper
topologyKey: "kubernetes.io/hostname"
entityOperator:
topicOperator: {}
userOperator: {}
카프카 클러스터 배포
실습구성도(최종)
- 배포가 다 끝나면, 마스터노드에 배포된 파드들이 워커노드들에 배치됩니다.
- 주키퍼/브로커 파드가 배치되는 노드의 순서와 토픽의 Leader 파티션의 배치는 다를 수 있습니다.
- 기존 Statefulset(sts) 대신에 StrimziPodSets(sps)를 기본설정으로 진행합니다.
- StrimziPodSets(sps)특징: sts중간 파드 바로 삭제 불가, 파드 Spec이 동일하도록 강제된다(볼륨, CPU, Mem)
아래 커맨드를 입력해서 배포합니다.
kubectl apply -f kafka-1.yaml -n kafka
#좌측상단: 모니터링
watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka
#우측상단: 모니터링
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
#좌측하단: 오브젝트 배포: Kafka(Broker, 3개), Zookeeper(3개), EntityOperator(Deployment type)
kubectl apply -f kafka-1.yaml -n kafka
#우측하단: pod배포가 잘된 모습
- Zookeeper 가 배포된 이후에, Kafka(Broker)가 배포된다.
배포내용 확인
배포확인0
kubectl get kafka -n kafka
kubectl get cm,secret -n kafka
kubectl get strimzipodsets -n kafka
배포확인1
kubectl get node --label-columns=kubernetes.io/hostname
kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster
각각의 노드별로 떠있는 Pod확인
맨 위에보이는 노드에는 my-cluster-entity-operator 라고하는 pod가 확인됩니다.
자세하게 확인해보면 tls-sidecar, topic-operator, user-operator가 만들어져있는것을 확인할 수 있습니다.
Strimzi로 배포된 Broker 의 상세정보를 configmap에서 확인할 수 있습니다.
kubectl describe cm -n kafka my-cluster-kafka-0
#아래는 메세지의 일부분. Broker Clustering 연결설정이나 로그가 저장되는 위치를 확인할 수 있다.
server.config:
----
##############################
##############################
# This file is automatically generated by the Strimzi Cluster Operator
# Any changes to this file will be ignored and overwritten!
##############################
##############################
##########
# Node / Broker ID
##########
broker.id=0
node.id=0
##########
# Kafka message logs configuration
##########
log.dirs=/var/lib/kafka/data-0/kafka-log0
##########
# Common listener configuration
##########
listener.security.protocol.map=CONTROLPLANE-9090:SSL,REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,TLS-9093:PLAINTEXT,EXTERNAL-9094:PLAINTEXT
listeners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,TLS-9093://0.0.0.0:9093,EXTERNAL-9094://0.0.0.0:9094
advertised.listeners=CONTROLPLANE-9090://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9090,REPLICATION-9091://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9091,PLAIN-9092://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092,TLS-9093://my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093,EXTERNAL-9094://${STRIMZI_NODEPORT_DEFAULT_ADDRESS}:32510
inter.broker.listener.name=REPLICATION-9091
control.plane.listener.name=CONTROLPLANE-9090
sasl.enabled.mechanisms=
ssl.endpoint.identification.algorithm=HTTPS
##########
# User provided configuration
##########
default.replication.factor=3
inter.broker.protocol.version=3.6
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
log.message.format.version=3.6
반면 zookeeper는 각각 Configmap이 있는게 아니라, cluster에 대한 Configmap만 존재합니다.
kubectl describe cm -n kafka my-cluster-zookeeper-config
Name: my-cluster-zookeeper-config
Namespace: kafka
Labels: app.kubernetes.io/instance=my-cluster
app.kubernetes.io/managed-by=strimzi-cluster-operator
app.kubernetes.io/name=zookeeper
app.kubernetes.io/part-of=strimzi-my-cluster
strimzi.io/cluster=my-cluster
strimzi.io/component-type=zookeeper
strimzi.io/kind=Kafka
strimzi.io/name=my-cluster-zookeeper
Annotations: <none>
Data
====
log4j.properties:
----
# Do not change this generated file. Logging can be configured in the corresponding Kubernetes resource.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %p %m (%c) [%t]%n
zookeeper.root.logger=INFO
log4j.rootLogger=${zookeeper.root.logger}, CONSOLE
zookeeper.node-count:
----
3
BinaryData
====
Events: <none>
Listeners 정보확인
kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq
- bootstrapServers 주소는 브로커주소로서, 해당 브로커로 접근할 수 있다
- 9092는 Plaintext(평문), 9093은 TLS(암호화), 9094는 external(nodePort연결)
[
{
"addresses": [
{
"host": "my-cluster-kafka-bootstrap.kafka.svc",
"port": 9092
}
],
"bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9092",
"name": "plain"
},
{
"addresses": [
{
"host": "my-cluster-kafka-bootstrap.kafka.svc",
"port": 9093
}
],
"bootstrapServers": "my-cluster-kafka-bootstrap.kafka.svc:9093",
"name": "tls"
},
{
"addresses": [
{
"host": "34.64.203.144",
"port": 32631
},
{
"host": "34.64.160.2",
"port": 32631
},
{
"host": "34.64.92.44",
"port": 32631
}
],
"bootstrapServers": "34.64.160.2:32631,34.64.203.144:32631,34.64.92.44:32631",
"name": "external"
}
]
테스트용 Pod 생성
Client 파일을 다운로드합니다.
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml
myclient.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: myclient
labels:
app: kafkaclient
spec:
selector:
matchLabels:
name: kafkaclient
template:
metadata:
labels:
name: kafkaclient
spec:
containers:
- name: kafkaclient
image: bitnami/kafka:${VERSION}
command: ["tail"]
args: ["-f", "/dev/null"]
terminationGracePeriodSeconds: 0
아래키워드로 배포합니다.
각각의 노드에 client가 daemonset형태로 띄워진것을 확인할 수 있습니다.
VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -
카프카 클라이언트는 아래의 쉘 스크립트를 갖고있습니다.
#kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin
connect-distributed.sh kafka-consumer-perf-test.sh kafka-metadata-shell.sh kafka-transactions.sh
connect-mirror-maker.sh kafka-delegation-tokens.sh kafka-mirror-maker.sh kafka-verifiable-consumer.sh
connect-plugin-path.sh kafka-delete-records.sh kafka-producer-perf-test.sh kafka-verifiable-producer.sh
connect-standalone.sh kafka-dump-log.sh kafka-reassign-partitions.sh trogdor.sh
kafka-acls.sh kafka-e2e-latency.sh kafka-replica-verification.sh windows
kafka-broker-api-versions.sh kafka-features.sh kafka-run-class.sh zookeeper-security-migration.sh
kafka-cluster.sh kafka-get-offsets.sh kafka-server-start.sh zookeeper-server-start.sh
kafka-configs.sh kafka-jmx.sh kafka-server-stop.sh zookeeper-server-stop.sh
kafka-console-consumer.sh kafka-leader-election.sh kafka-storage.sh zookeeper-shell.sh
kafka-console-producer.sh kafka-log-dirs.sh kafka-streams-application-reset.sh
kafka-consumer-groups.sh kafka-metadata-quorum.sh kafka-topics.sh
SVC 도메인 이름을 변수에 지정합니다.
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
sudo sh -c "echo export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092 >> /etc/profile"
브로커 정보확인하는 명령어
# 브로커 정보
kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe
토픽 정보확인하는 명령어
# 토픽 리스트 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list
# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR
kubectl get kafkatopics -n kafka
Kafka UI(UI for Apache Kafka) 배포하기
# 레포 추가하기
helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts
# 참조할 value파일 만들기
cat <<EOF > kafkaui-values.yml
yamlApplicationConfig:
kafka:
clusters:
- name: yaml
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092
auth:
type: disabled
management:
health:
ldap:
enabled: false
EOF
# 배포하기
helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml
# 접속 확인
kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui"
echo -e "kafka-ui Web URL = http://kafka-ui"
배포가 완료되었으며, 해당 대시보드에 접근해보자.
kafka-ui가 떠있는 EXTERNAL-IP(클러스터의 IP)로 직접 접근해서 확인할 수 있습니다.
위의 경우 EXTERNAL-IP는 34.64.75.130 이며, 사용하는 포트는 80입니다.
브라우저 주소 창에 EXTERNAL-IP:PORT번호 를 입력합니다.
Prometheus + Grafana 모니터링 연결하기
Exporter 설정된 카프카 클러스터 배포
# exporter 관련 설정 확인
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-2.yaml
cat kafka-2.yaml | yh
# exporter 설정된 카프카 클러스터 배포
kubectl apply -f kafka-2.yaml -n kafka
이때 Deployment로 kafka-exporter가 생성되고, 생성되었던 카프카클러스터 Update가 됩니다.
- 기존 카프카 클러스터가 update되기때문에 RollingUpdate하는 과정을 진행한다(default옵션).
예제코드 복사하기
#
git clone https://github.com/AmarendraSingh88/kafka-on-kubernetes.git
cd kafka-on-kubernetes/kafka-demo/demo3-monitoring/
프로메테우스 설치하기
#monitoring namespace가 없기때문에 만들어준다.
kubectl create namespace monitoring
# 프로메테우스 설치 : --server-side 옵션 설정
kubectl apply -f prometheus-operator-deployment.yaml -n monitoring --server-side
kubectl apply -f prometheus.yaml -n monitoring
kubectl apply -f prometheus-rules.yaml -n monitoring
kubectl apply -f strimzi-pod-monitor.yaml -n monitoring
그라파나 설치하기
# 그라파나 설치
kubectl apply -f grafana/grafana.yaml -n monitoring
kubectl patch svc -n monitoring grafana -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service grafana -n monitoring "external-dns.alpha.kubernetes.io/hostname=grafana.$MyDomain"
# 접속 정보 확인
echo -e "Grafana URL = http://grafana.$MyDomain:3000"
그라파나 대시보드 접속방법
- grafana service는 monitoring namespace내부에 배포되기때문에 직접확인하려면
kubectl get svc -n monitoring
으로 확인할 수 있습니다.
kafka ui를 확인할때와 똑같이, EXTERNAL-IP:PORT 로 접근하겠습니다.
- 위의 사진에서는 34.64.218.79:3000
- 그라파나 웹 접속 : admin / admin
- 그라파나 데이터 소스 설정 : 프로메테우스파드-0 에 헤드리스 접속 =주소 입력 후 연결 확인
prometheus-prometheus-0.prometheus-operated.monitoring.svc.cluster.local:
9090
하단에 저장버튼 누르기!
- 그라파나 대시보드 추가:
아래링크에서 원하는 json 파일을 다운로드 받는다.
다운로드 받은 json 파일을 import 합니다.
데이터 소스를 Prometheus 로 지정해줘야합니다.
완성!
OUTRO
다음게시글에서는 토픽생성과 메세지 전달에 대해서 작성해보겠습니다.
'외부활동' 카테고리의 다른 글
코딩테스트 준비 플랫폼 추천: 코드트리 (0) | 2024.03.03 |
---|---|
[DOIK2] 스터디: Stackable 로 Airflow 배포하기 + 스터디후기 (3) | 2023.11.26 |
[DOIK2] 스터디: 조금 자세하게 설명한 Kafka (0) | 2023.11.15 |
구글 클라우드 스터디잼: GenAI 수료후기 (1) | 2023.11.11 |
[DOIK2] 스터디: Percona Operator for mongoDB (0) | 2023.11.11 |