This post records only the commands that were actually run while installing a Kafka cluster, following the Kafka on Kubernetes post; see that link for the detailed explanations.
The installation was done with Helm, so Helm must already be installed.
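If you want to confirm that Helm is ready before proceeding, the standard version check can be used:
helm version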
1. Installation
1. Add the helm repository
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
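After adding the repository, it is usually worth refreshing the local chart index before installing:
helm repo update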
2. Install the kafka cluster with helm
helm install --name air-kafka --namespace message-broker incubator/kafka -f values.yaml
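To confirm that the release was created and the broker/zookeeper pods are starting, a plain kubectl check works (the namespace matches the one used above):
kubectl get pods -n message-broker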
If values.yaml is not passed as an option, the chart is installed with its default settings; with the defaults there is no node-to-container port mapping, so the cluster cannot be reached from outside.
The values.yaml used is shown below; {MasterNodeIP} must be replaced with the appropriate value.
# ------------------------------------------------------------------------------
# Kafka:
# ------------------------------------------------------------------------------
## The StatefulSet installs 3 pods by default
replicas: 3
## The kafka image repository
image: "confluentinc/cp-kafka"
## The kafka image tag
imageTag: "5.0.1" # Confluent image for Kafka 2.0.0
## Specify a imagePullPolicy
## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images
imagePullPolicy: "IfNotPresent"
## Configure resource requests and limits
## ref: http://kubernetes.io/docs/user-guide/compute-resources/
resources: {}
# limits:
# cpu: 200m
# memory: 1536Mi
# requests:
# cpu: 100m
# memory: 1024Mi
kafkaHeapOptions: "-Xmx1G -Xms1G"
## Optional Container Security context
securityContext: {}
## The StatefulSet Update Strategy which Kafka will use when changes are applied.
## ref: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#update-strategies
updateStrategy:
type: "OnDelete"
## Start and stop pods in Parallel or OrderedReady (one-by-one.) Note - Can not change after first release.
## ref: https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#pod-management-policy
podManagementPolicy: OrderedReady
## Useful if using any custom authorizer
## Pass in some secrets to use (if required)
# secrets:
# - name: myKafkaSecret
# keys:
# - username
# - password
# # mountPath: /opt/kafka/secret
# - name: myZkSecret
# keys:
# - user
# - pass
# mountPath: /opt/zookeeper/secret
## The subpath within the Kafka container's PV where logs will be stored.
## This is combined with `persistence.mountPath`, to create, by default: /opt/kafka/data/logs
logSubPath: "logs"
## Use an alternate scheduler, e.g. "stork".
## ref: https://kubernetes.io/docs/tasks/administer-cluster/configure-multiple-schedulers/
##
# schedulerName:
## Use an alternate serviceAccount
## Useful when using images in custom repositories
# serviceAccountName:
## Set a pod priorityClassName
# priorityClassName: high-priority
## Pod scheduling preferences (by default keep pods within a release on separate nodes).
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
## By default we don't set affinity
affinity: {}
## Alternatively, this typical example defines:
## antiAffinity (to keep Kafka pods on separate nodes)
## and affinity (to encourage Kafka pods to be collocated with Zookeeper pods)
# affinity:
# podAntiAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# - labelSelector:
# matchExpressions:
# - key: app
# operator: In
# values:
# - kafka
# topologyKey: "kubernetes.io/hostname"
# podAffinity:
# preferredDuringSchedulingIgnoredDuringExecution:
# - weight: 50
# podAffinityTerm:
# labelSelector:
# matchExpressions:
# - key: app
# operator: In
# values:
# - zookeeper
# topologyKey: "kubernetes.io/hostname"
## Node labels for pod assignment
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector
nodeSelector: {}
## Readiness probe config.
## ref: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
##
readinessProbe:
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
## Period to wait for broker graceful shutdown (sigterm) before pod is killed (sigkill)
## ref: https://kafka.apache.org/10/documentation.html#brokerconfigs controlled.shutdown.*
terminationGracePeriodSeconds: 60
# Tolerations for nodes that have taints on them.
# Useful if you want to dedicate nodes to just run kafka
# https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
tolerations: []
# tolerations:
# - key: "key"
# operator: "Equal"
# value: "value"
# effect: "NoSchedule"
## Headless service.
##
headless:
# annotations:
# targetPort:
port: 9092
## External access.
##
external:
enabled: true
# type can be either NodePort or LoadBalancer
type: NodePort
# annotations:
# service.beta.kubernetes.io/openstack-internal-load-balancer: "true"
dns:
useInternal: false
useExternal: true
# If using external service type LoadBalancer and external dns, set distinct to true below.
# This creates an A record for each statefulset pod/broker. You should then map the
# A record of the broker to the EXTERNAL IP given by the LoadBalancer in your DNS server.
distinct: false
servicePort: 19092
firstListenerPort: 31090
domain: cluster.local
loadBalancerIP: []
init:
image: "lwolf/kubectl_deployer"
imageTag: "0.4"
imagePullPolicy: "IfNotPresent"
# Annotation to be added to Kafka pods
podAnnotations: {}
# Labels to be added to Kafka pods
podLabels: {}
# service: broker
# team: developers
podDisruptionBudget: {}
# maxUnavailable: 1 # Limits how many Kafka pods may be unavailable due to voluntary disruptions.
## Configuration Overrides. Specify any Kafka settings you would like set on the StatefulSet
## here in map format, as defined in the official docs.
## ref: https://kafka.apache.org/documentation/#brokerconfigs
##
configurationOverrides:
"confluent.support.metrics.enable": false # Disables confluent metric submission
# "auto.leader.rebalance.enable": true
# "auto.create.topics.enable": true
# "controlled.shutdown.enable": true
# "controlled.shutdown.max.retries": 100
## Options required for external access via NodePort
## ref:
## - http://kafka.apache.org/documentation/#security_configbroker
## - https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic
##
## Setting "advertised.listeners" here appends to "PLAINTEXT://${POD_IP}:9092,", ensure you update the domain
## If external service type is Nodeport:
"advertised.listeners": |-
EXTERNAL://{MasterNodeIP}:$((31090 + ${KAFKA_BROKER_ID}))
## If external service type is LoadBalancer and distinct is true:
# "advertised.listeners": |-
# EXTERNAL://kafka-$((${KAFKA_BROKER_ID})).cluster.local:19092
## If external service type is LoadBalancer and distinct is false:
# "advertised.listeners": |-
# EXTERNAL://${LOAD_BALANCER_IP}:31090
## Uncomment to define the EXTERNAL Listener protocol
"listener.security.protocol.map": |-
PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
## set extra ENVs
# key: "value"
envOverrides: {}
## A collection of additional ports to expose on brokers (formatted as normal containerPort yaml)
# Useful when the image exposes metrics (like prometheus, etc.) through a javaagent instead of a sidecar
additionalPorts: {}
## Persistence configuration. Specify if and how to persist data to a persistent volume.
##
persistence:
enabled: true
## The size of the PersistentVolume to allocate to each Kafka Pod in the StatefulSet. For
## production servers this number should likely be much larger.
##
size: "1Gi"
## The location within the Kafka container where the PV will mount its storage and Kafka will
## store its logs.
##
mountPath: "/opt/kafka/data"
## Kafka data Persistent Volume Storage Class
## If defined, storageClassName: <storageClass>
## If set to "-", storageClassName: "", which disables dynamic provisioning
## If undefined (the default) or set to null, no storageClassName spec is
## set, choosing the default provisioner. (gp2 on AWS, standard on
## GKE, AWS & OpenStack)
##
# storageClass:
jmx:
## Rules to apply to the Prometheus JMX Exporter. Note while lots of stats have been cleaned and exposed,
## there are still more stats to clean up and expose, others will never get exposed. They keep lots of duplicates
## that can be derived easily. The configMap in this chart cleans up the metrics it exposes to be in a Prometheus
## format, eg topic, broker are labels and not part of metric name. Improvements are gladly accepted and encouraged.
configMap:
## Allows disabling the default configmap, note a configMap is needed
enabled: true
## Allows setting values used to generate the configmap
## To allow all metrics through (warning: this is extremely verbose), comment out `overrideConfig` below and set
## `whitelistObjectNames: []`
overrideConfig: {}
# jmxUrl: service:jmx:rmi:///jndi/rmi://127.0.0.1:5555/jmxrmi
# lowercaseOutputName: true
# lowercaseOutputLabelNames: true
# ssl: false
# rules:
# - pattern: ".*"
## If you would like to supply your own ConfigMap for JMX metrics, supply the name of that
## ConfigMap as an `overrideName` here.
overrideName: ""
## Port the jmx metrics are exposed in native jmx format, not in Prometheus format
port: 5555
## JMX Whitelist Objects can be set to control which JMX metrics are exposed. Only whitelisted
## values will be exposed via the JMX Exporter, and they must also be exposed via Rules. To expose all metrics
## (warning: this is extremely verbose and they aren't formatted in a Prometheus style), (1) set `whitelistObjectNames: []`
## and (2) comment out `overrideConfig` above.
whitelistObjectNames: # []
- kafka.controller:*
- kafka.server:*
- java.lang:*
- kafka.network:*
- kafka.log:*
## Prometheus Exporters / Metrics
##
prometheus:
## Prometheus JMX Exporter: exposes the majority of Kafka's metrics
jmx:
enabled: false
## The image to use for the metrics collector
image: solsson/kafka-prometheus-jmx-exporter@sha256
## The image tag to use for the metrics collector
imageTag: a23062396cd5af1acdf76512632c20ea6be76885dfc20cd9ff40fb23846557e8
## Interval at which Prometheus scrapes metrics, note: only used by Prometheus Operator
interval: 10s
## Timeout at which Prometheus timeouts scrape run, note: only used by Prometheus Operator
scrapeTimeout: 10s
## Port jmx-exporter exposes Prometheus format metrics to scrape
port: 5556
resources: {}
# limits:
# cpu: 200m
# memory: 1Gi
# requests:
# cpu: 100m
# memory: 100Mi
## Prometheus Kafka Exporter: exposes metrics complementary to the JMX Exporter
kafka:
enabled: false
## The image to use for the metrics collector
image: danielqsj/kafka-exporter
## The image tag to use for the metrics collector
imageTag: v1.2.0
## Interval at which Prometheus scrapes metrics, note: only used by Prometheus Operator
interval: 10s
## Timeout at which Prometheus timeouts scrape run, note: only used by Prometheus Operator
scrapeTimeout: 10s
## Port kafka-exporter exposes for Prometheus to scrape metrics
port: 9308
## Resource limits
resources: {}
# limits:
# cpu: 200m
# memory: 1Gi
# requests:
# cpu: 100m
# memory: 100Mi
# Tolerations for nodes that have taints on them.
# Useful if you want to dedicate nodes to just run kafka-exporter
# https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
tolerations: []
# tolerations:
# - key: "key"
# operator: "Equal"
# value: "value"
# effect: "NoSchedule"
## Pod scheduling preferences (by default keep pods within a release on separate nodes).
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
## By default we don't set affinity
affinity: {}
## Alternatively, this typical example defines:
## affinity (to encourage Kafka Exporter pods to be collocated with Kafka pods)
# affinity:
# podAffinity:
# preferredDuringSchedulingIgnoredDuringExecution:
# - weight: 50
# podAffinityTerm:
# labelSelector:
# matchExpressions:
# - key: app
# operator: In
# values:
# - kafka
# topologyKey: "kubernetes.io/hostname"
## Node labels for pod assignment
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector
nodeSelector: {}
operator:
## Are you using Prometheus Operator?
enabled: false
serviceMonitor:
# Namespace Prometheus is installed in
namespace: monitoring
## Defaults to what's used if you follow the CoreOS [Prometheus Install Instructions](https://github.com/coreos/prometheus-operator/tree/master/helm#tldr)
## [Prometheus Selector Label](https://github.com/coreos/prometheus-operator/blob/master/helm/prometheus/templates/prometheus.yaml#L65)
## [Kube Prometheus Selector Label](https://github.com/coreos/prometheus-operator/blob/master/helm/kube-prometheus/values.yaml#L298)
selector:
prometheus: kube-prometheus
## Kafka Config job configuration
##
configJob:
## Specify the number of retries before considering kafka-config job as failed.
backoffLimit: 6
## Topic creation and configuration.
## The job will be run on a deployment only when the config has been changed.
## - If 'partitions' and 'replicationFactor' are specified we create the topic (with --if-not-exists.)
## - If 'partitions', 'replicationFactor' and 'reassignPartitions' are specified we reassign the partitions to
## increase the replication factor of an existing topic.
## - If 'partitions' is specified we 'alter' the number of partitions. This will
## silently and safely fail if the new setting isn’t strictly larger than the old (i.e. a NOOP.) Do be aware of the
## implications for keyed topics (ref: https://docs.confluent.io/current/kafka/post-deployment.html#admin-operations)
## - If 'defaultConfig' is specified it's deleted from the topic configuration. If it isn't present,
## it will silently and safely fail.
## - If 'config' is specified it's added to the topic configuration.
##
## Note: To increase the 'replicationFactor' of a topic, 'reassignPartitions' must be set to true (see above).
##
topics: []
# - name: myExistingTopicConfig
# config: "cleanup.policy=compact,delete.retention.ms=604800000"
# - name: myExistingTopicReassignPartitions
# partitions: 8
# replicationFactor: 5
# reassignPartitions: true
# - name: myExistingTopicPartitions
# partitions: 8
# - name: myNewTopicWithConfig
# partitions: 8
# replicationFactor: 3
# defaultConfig: "segment.bytes,segment.ms"
# config: "cleanup.policy=compact,delete.retention.ms=604800000"
# - name: myAclTopicPartitions
# partitions: 8
# acls:
# - user: read
# operations: [ Read ]
# - user: read_and_write
# operations:
# - Read
# - Write
# - user: all
# operations: [ All ]
# ------------------------------------------------------------------------------
# Zookeeper:
# ------------------------------------------------------------------------------
zookeeper:
## If true, install the Zookeeper chart alongside Kafka
## ref: https://github.com/kubernetes/charts/tree/master/incubator/zookeeper
enabled: true
## Configure Zookeeper resource requests and limits
## ref: http://kubernetes.io/docs/user-guide/compute-resources/
resources: ~
## Environmental variables to set in Zookeeper
env:
## The JVM heap size to allocate to Zookeeper
ZK_HEAP_SIZE: "1G"
persistence:
enabled: false
## The amount of PV storage allocated to each Zookeeper pod in the statefulset
# size: "2Gi"
## Specify a Zookeeper imagePullPolicy
## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images
image:
PullPolicy: "IfNotPresent"
## If the Zookeeper Chart is disabled a URL and port are required to connect
url: ""
port: 2181
## Pod scheduling preferences (by default keep pods within a release on separate nodes).
## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
## By default we don't set affinity:
affinity: {} # Criteria by which pod label-values influence scheduling for zookeeper pods.
# podAntiAffinity:
# requiredDuringSchedulingIgnoredDuringExecution:
# - topologyKey: "kubernetes.io/hostname"
# labelSelector:
# matchLabels:
# release: zookeeper
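If you edit values.yaml after the initial install, the release can be updated in place. This is a standard helm upgrade (Helm 2 syntax, matching the --name style install above); note that with updateStrategy set to OnDelete the broker pods are only recreated when you delete them manually:
helm upgrade air-kafka incubator/kafka -f values.yaml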
2. Basic commands inside the container
These are the basic usage commands shown when running helm install.
1. List topics
kubectl -n message-broker exec air-kafka-0 -- kafka-topics --zookeeper air-kafka-zookeeper:2181 --list
2. Create a topic
kubectl -n message-broker exec air-kafka-0 -- kafka-topics --zookeeper air-kafka-zookeeper:2181 --topic {topicName} --create --partitions 1 --replication-factor 1
3. Consume from a topic
kubectl -n message-broker exec -ti air-kafka-0 -- kafka-console-consumer --bootstrap-server air-kafka:9092 --topic {topicName} --from-beginning
4. Produce to a topic
kubectl -n message-broker exec -ti air-kafka-0 -- kafka-console-producer --broker-list air-kafka-headless:9092 --topic {topicName}
With the commands above you can verify that Kafka was installed correctly in the container; a combined smoke test is sketched below.
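For a quick end-to-end smoke test, the commands above can be chained; this is only a sketch using a hypothetical topic named test-topic:
kubectl -n message-broker exec air-kafka-0 -- kafka-topics --zookeeper air-kafka-zookeeper:2181 --topic test-topic --create --partitions 1 --replication-factor 1
kubectl -n message-broker exec -i air-kafka-0 -- bash -c "echo hello | kafka-console-producer --broker-list air-kafka-headless:9092 --topic test-topic"
kubectl -n message-broker exec air-kafka-0 -- kafka-console-consumer --bootstrap-server air-kafka:9092 --topic test-topic --from-beginning --max-messages 1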
3. Check external access
1. First, check which port Kafka is exposed on.
kubectl get svc -n message-broker
In the output, the ports of air-kafka-0-external read 19092:31090 (31090 may vary by environment); 31090 is the port reachable from outside. A jsonpath shortcut for reading it is shown below.
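The NodePort can also be read directly with jsonpath instead of scanning the table (the service name follows the pattern above):
kubectl -n message-broker get svc air-kafka-0-external -o jsonpath='{.spec.ports[0].nodePort}'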
2. Open this port in your security group so it can be reached from outside.
3. From an external server that has Kafka installed, test the cluster by creating and consuming topics, as in the next section.
4. Remote access
These commands are run from the external machine with the Kafka CLI scripts; replace 13.209.8.232:31090 with your {MasterNodeIP}:NodePort.
1. Create a topic
bin/kafka-topics.sh --bootstrap-server 13.209.8.232:31090 --create --topic test1 --partitions 3 --replication-factor 3
2. List topics
bin/kafka-topics.sh --bootstrap-server 13.209.8.232:31090 --list
3. Consumer
bin/kafka-console-consumer.sh --bootstrap-server 13.209.8.232:31090 --topic {topicName} --from-beginning
4. Producer
bin/kafka-console-producer.sh --broker-list 13.209.8.232:31090 --topic {topicName}
5. Delete a topic
bin/kafka-topics.sh --delete --bootstrap-server 13.209.8.232:31090 --topic {topicName}
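Applications outside the cluster can use the same NodePort address. Below is a minimal sketch using the kafka-python package (an assumption of this example; install it with pip install kafka-python) that produces one message to the test1 topic created above and reads it back:
from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "13.209.8.232:31090"  # {MasterNodeIP}:NodePort from the steps above

# send one message to the externally reachable broker
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
producer.send("test1", b"hello from outside the cluster")
producer.flush()

# read it back from the beginning of the topic, stopping after 5s of inactivity
consumer = KafkaConsumer(
    "test1",
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for message in consumer:
    print(message.value)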