
Kafka cluster installation on Kubernetes

by kellis 2020. 10. 13.

This Kafka cluster was installed by following Kafka on kubernetes; only the commands that were actually run are listed in this post. Refer to that link for detailed explanations.

 

The installation is done with Helm, so Helm must already be installed.
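A quick way to confirm the prerequisite; a minimal check, assuming the Helm 2 CLI (the --name flag used below only exists in Helm 2):

helm version   # on Helm 2 this prints both the client version and the Tiller (server) version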

 

1. Installation

1. Add the Helm repository

helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
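After adding the repository, refreshing the index and searching for the chart confirms it is visible; a small sketch, again assuming Helm 2 syntax:

helm repo update
helm search incubator/kafka   # the incubator/kafka chart and its latest version should be listed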

2. Install the Kafka cluster with Helm

helm install --name air-kafka --namespace message-broker incubator/kafka -f values.yaml
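Whether the release was actually created can be checked with Helm itself; a hedged sketch, assuming Helm 2:

helm ls --namespace message-broker   # the air-kafka release should appear as DEPLOYED
helm status air-kafka                # shows the StatefulSets and Services created by the chart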

If values.yaml is not passed as an option, the chart is installed with its default settings, and with the defaults there is no node-to-container port mapping, so the brokers cannot be reached from outside the cluster.

 

The values.yaml used is shown below; {MasterNodeIP} must be replaced with the address of your node.
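If you are unsure which address to use for {MasterNodeIP}, the node addresses can be listed with kubectl; a minimal example (which column to use depends on your cluster and network setup):

kubectl get nodes -o wide   # pick the node IP (internal or external) that your clients can reach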


# ------------------------------------------------------------------------------

# Kafka:

# ------------------------------------------------------------------------------

 

## The StatefulSet installs 3 pods by default

replicas: 3

 

## The kafka image repository

image: "confluentinc/cp-kafka"

 

## The kafka image tag

imageTag: "5.0.1"  # Confluent image for Kafka 2.0.0

 

## Specify a imagePullPolicy

## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images

imagePullPolicy: "IfNotPresent"

 

## Configure resource requests and limits

## ref: http://kubernetes.io/docs/user-guide/compute-resources/

resources: {}

  # limits:

  #   cpu: 200m

  #   memory: 1536Mi

  # requests:

  #   cpu: 100m

  #   memory: 1024Mi

kafkaHeapOptions: "-Xmx1G -Xms1G"

 

## Optional Container Security context

securityContext: {}

 

## The StatefulSet Update Strategy which Kafka will use when changes are applied.

## ref: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#update-strategies

updateStrategy:

  type: "OnDelete"

 

## Start and stop pods in Parallel or OrderedReady (one-by-one.)  Note - Can not change after first release.

## ref: https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#pod-management-policy

podManagementPolicy: OrderedReady

 

## Useful if using any custom authorizer

## Pass in some secrets to use (if required)

# secrets:

# - name: myKafkaSecret

#   keys:

#     - username

#     - password

#   # mountPath: /opt/kafka/secret

# - name: myZkSecret

#   keys:

#     - user

#     - pass

#   mountPath: /opt/zookeeper/secret

 

 

## The subpath within the Kafka container's PV where logs will be stored.

## This is combined with `persistence.mountPath`, to create, by default: /opt/kafka/data/logs

logSubPath: "logs"

 

## Use an alternate scheduler, e.g. "stork".

## ref: https://kubernetes.io/docs/tasks/administer-cluster/configure-multiple-schedulers/

##

# schedulerName:

 

## Use an alternate serviceAccount

## Useful when using images in custom repositories

# serviceAccountName:

 

## Set a pod priorityClassName

# priorityClassName: high-priority

 

## Pod scheduling preferences (by default keep pods within a release on separate nodes).

## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity

## By default we don't set affinity

affinity: {}

## Alternatively, this typical example defines:

## antiAffinity (to keep Kafka pods on separate pods)

## and affinity (to encourage Kafka pods to be collocated with Zookeeper pods)

# affinity:

#   podAntiAffinity:

#     requiredDuringSchedulingIgnoredDuringExecution:

#     - labelSelector:

#         matchExpressions:

#         - key: app

#           operator: In

#           values:

#           - kafka

#       topologyKey: "kubernetes.io/hostname"

#   podAffinity:

#     preferredDuringSchedulingIgnoredDuringExecution:

#      - weight: 50

#        podAffinityTerm:

#          labelSelector:

#            matchExpressions:

#            - key: app

#              operator: In

#              values:

#                - zookeeper

#          topologyKey: "kubernetes.io/hostname"

 

## Node labels for pod assignment

## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector

nodeSelector: {}

 

## Readiness probe config.

## ref: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/

##

readinessProbe:

  initialDelaySeconds: 30

  periodSeconds: 10

  timeoutSeconds: 5

  successThreshold: 1

  failureThreshold: 3

 

## Period to wait for broker graceful shutdown (sigterm) before pod is killed (sigkill)

## ref: https://kubernetes-v1-4.github.io/docs/user-guide/production-pods/#lifecycle-hooks-and-termination-notice

## ref: https://kafka.apache.org/10/documentation.html#brokerconfigs controlled.shutdown.*

terminationGracePeriodSeconds: 60

 

# Tolerations for nodes that have taints on them.

# Useful if you want to dedicate nodes to just run kafka

# https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/

tolerations: []

# tolerations:

# - key: "key"

#   operator: "Equal"

#   value: "value"

#   effect: "NoSchedule"

 

## Headless service.

##

headless:

  # annotations:

  # targetPort:

  port: 9092

 

## External access.

##

external:

  enabled: true

  # type can be either NodePort or LoadBalancer

  type: NodePort

  # annotations:

  #  service.beta.kubernetes.io/openstack-internal-load-balancer: "true"

  dns:

    useInternal: false

    useExternal: true

  # If using external service type LoadBalancer and external dns, set distinct to true below.

  # This creates an A record for each statefulset pod/broker. You should then map the

  # A record of the broker to the EXTERNAL IP given by the LoadBalancer in your DNS server.

  distinct: false

  servicePort: 19092

  firstListenerPort: 31090

  domain: cluster.local

  loadBalancerIP: []

  init:

    image: "lwolf/kubectl_deployer"

    imageTag: "0.4"

    imagePullPolicy: "IfNotPresent"

 

# Annotation to be added to Kafka pods

podAnnotations: {}

 

# Labels to be added to Kafka pods

podLabels: {}

  # service: broker

  # team: developers

 

podDisruptionBudget: {}

  # maxUnavailable: 1  # Limits how many Kafka pods may be unavailable due to voluntary disruptions.

 

## Configuration Overrides. Specify any Kafka settings you would like set on the StatefulSet

## here in map format, as defined in the official docs.

## ref: https://kafka.apache.org/documentation/#brokerconfigs

##

configurationOverrides:

  "confluent.support.metrics.enable": false  # Disables confluent metric submission

  # "auto.leader.rebalance.enable": true

  # "auto.create.topics.enable": true

  # "controlled.shutdown.enable": true

  # "controlled.shutdown.max.retries": 100

 

  ## Options required for external access via NodePort

  ## ref:

  ## - http://kafka.apache.org/documentation/#security_configbroker

  ## - https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic

  ##

  ## Setting "advertised.listeners" here appends to "PLAINTEXT://${POD_IP}:9092,", ensure you update the domain

  ## If external service type is Nodeport:

  "advertised.listeners": |-

     EXTERNAL://{MasterNodeIP}:$((31090 + ${KAFKA_BROKER_ID}))
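  ## With "replicas: 3" and "firstListenerPort: 31090" above, this advertises ports
  ## 31090, 31091 and 31092 on {MasterNodeIP} (31090 + KAFKA_BROKER_ID, one NodePort per broker).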

  ## If external service type is LoadBalancer and distinct is true:

  # "advertised.listeners": |-

  #   EXTERNAL://kafka-$((${KAFKA_BROKER_ID})).cluster.local:19092

  ## If external service type is LoadBalancer and distinct is false:

  # "advertised.listeners": |-

  #   EXTERNAL://${LOAD_BALANCER_IP}:31090

  ## Uncomment to define the EXTERNAL Listener protocol

  "listener.security.protocol.map": |-

     PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT

 

## set extra ENVs

#   key: "value"

envOverrides: {}

 

 

## A collection of additional ports to expose on brokers (formatted as normal containerPort yaml)

# Useful when the image exposes metrics (like prometheus, etc.) through a javaagent instead of a sidecar

additionalPorts: {}

 

## Persistence configuration. Specify if and how to persist data to a persistent volume.

##

persistence:

  enabled: true

 

  ## The size of the PersistentVolume to allocate to each Kafka Pod in the StatefulSet. For

  ## production servers this number should likely be much larger.

  ##

  size: "1Gi"

 

  ## The location within the Kafka container where the PV will mount its storage and Kafka will

  ## store its logs.

  ##

  mountPath: "/opt/kafka/data"

 

  ## Kafka data Persistent Volume Storage Class

  ## If defined, storageClassName: <storageClass>

  ## If set to "-", storageClassName: "", which disables dynamic provisioning

  ## If undefined (the default) or set to null, no storageClassName spec is

  ##   set, choosing the default provisioner.  (gp2 on AWS, standard on

  ##   GKE, AWS & OpenStack)

  ##

  # storageClass:

 

jmx:

  ## Rules to apply to the Prometheus JMX Exporter.  Note while lots of stats have been cleaned and exposed,

  ## there are still more stats to clean up and expose, others will never get exposed.  They keep lots of duplicates

  ## that can be derived easily.  The configMap in this chart cleans up the metrics it exposes to be in a Prometheus

  ## format, eg topic, broker are labels and not part of metric name. Improvements are gladly accepted and encouraged.

  configMap:

 

    ## Allows disabling the default configmap, note a configMap is needed

    enabled: true

 

    ## Allows setting values to generate confimap

    ## To allow all metrics through (warning its crazy excessive) comment out below `overrideConfig` and set

    ## `whitelistObjectNames: []`

    overrideConfig: {}

      # jmxUrl: service:jmx:rmi:///jndi/rmi://127.0.0.1:5555/jmxrmi

      # lowercaseOutputName: true

      # lowercaseOutputLabelNames: true

      # ssl: false

      # rules:

      # - pattern: ".*"

 

    ## If you would like to supply your own ConfigMap for JMX metrics, supply the name of that

    ## ConfigMap as an `overrideName` here.

    overrideName: ""

 

  ## Port the jmx metrics are exposed in native jmx format, not in Prometheus format

  port: 5555

 

  ## JMX Whitelist Objects, can be set to control which JMX metrics are exposed.  Only whitelisted

  ## values will be exposed via JMX Exporter.  They must also be exposed via Rules.  To expose all metrics

  ## (warning its crazy excessive and they aren't formatted in a prometheus style) (1) `whitelistObjectNames: []`

  ## (2) commented out above `overrideConfig`.

  whitelistObjectNames:  # []

  - kafka.controller:*

  - kafka.server:*

  - java.lang:*

  - kafka.network:*

  - kafka.log:*

 

## Prometheus Exporters / Metrics

##

prometheus:

  ## Prometheus JMX Exporter: exposes the majority of Kafkas metrics

  jmx:

    enabled: false

 

    ## The image to use for the metrics collector

    image: solsson/kafka-prometheus-jmx-exporter@sha256

 

    ## The image tag to use for the metrics collector

    imageTag: a23062396cd5af1acdf76512632c20ea6be76885dfc20cd9ff40fb23846557e8

 

    ## Interval at which Prometheus scrapes metrics, note: only used by Prometheus Operator

    interval: 10s

 

    ## Timeout at which Prometheus timeouts scrape run, note: only used by Prometheus Operator

    scrapeTimeout: 10s

 

    ## Port jmx-exporter exposes Prometheus format metrics to scrape

    port: 5556

 

    resources: {}

      # limits:

      #   cpu: 200m

      #   memory: 1Gi

      # requests:

      #   cpu: 100m

      #   memory: 100Mi

 

  ## Prometheus Kafka Exporter: exposes complimentary metrics to JMX Exporter

  kafka:

    enabled: false

 

    ## The image to use for the metrics collector

    image: danielqsj/kafka-exporter

 

    ## The image tag to use for the metrics collector

    imageTag: v1.2.0

 

    ## Interval at which Prometheus scrapes metrics, note: only used by Prometheus Operator

    interval: 10s

 

    ## Timeout at which Prometheus timeouts scrape run, note: only used by Prometheus Operator

    scrapeTimeout: 10s

 

    ## Port kafka-exporter exposes for Prometheus to scrape metrics

    port: 9308

 

    ## Resource limits

    resources: {}

#      limits:

#        cpu: 200m

#        memory: 1Gi

#      requests:

#        cpu: 100m

#        memory: 100Mi

 

    # Tolerations for nodes that have taints on them.

    # Useful if you want to dedicate nodes to just run kafka-exporter

    # https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/

    tolerations: []

    # tolerations:

    # - key: "key"

    #   operator: "Equal"

    #   value: "value"

    #   effect: "NoSchedule"

 

    ## Pod scheduling preferences (by default keep pods within a release on separate nodes).

    ## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity

    ## By default we don't set affinity

    affinity: {}

    ## Alternatively, this typical example defines:

    ## affinity (to encourage Kafka Exporter pods to be collocated with Kafka pods)

    # affinity:

    #   podAffinity:

    #     preferredDuringSchedulingIgnoredDuringExecution:

    #      - weight: 50

    #        podAffinityTerm:

    #          labelSelector:

    #            matchExpressions:

    #            - key: app

    #              operator: In

    #              values:

    #                - kafka

    #          topologyKey: "kubernetes.io/hostname"

 

    ## Node labels for pod assignment

    ## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector

    nodeSelector: {}

 

  operator:

    ## Are you using Prometheus Operator?

    enabled: false

 

    serviceMonitor:

      # Namespace Prometheus is installed in

      namespace: monitoring

 

      ## Defaults to whats used if you follow CoreOS [Prometheus Install Instructions](https://github.com/coreos/prometheus-operator/tree/master/helm#tldr)

      ## [Prometheus Selector Label](https://github.com/coreos/prometheus-operator/blob/master/helm/prometheus/templates/prometheus.yaml#L65)

      ## [Kube Prometheus Selector Label](https://github.com/coreos/prometheus-operator/blob/master/helm/kube-prometheus/values.yaml#L298)

      selector:

        prometheus: kube-prometheus

 

## Kafka Config job configuration

##

configJob:

  ## Specify the number of retries before considering kafka-config job as failed.

  ## https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#pod-backoff-failure-policy

  backoffLimit: 6

 

## Topic creation and configuration.

## The job will be run on a deployment only when the config has been changed.

## - If 'partitions' and 'replicationFactor' are specified we create the topic (with --if-not-exists.)

## - If 'partitions', 'replicationFactor' and 'reassignPartitions' are specified we reassign the partitions to

## increase the replication factor of an existing topic.

## - If 'partitions' is specified we 'alter' the number of partitions. This will

## silently and safely fail if the new setting isn’t strictly larger than the old (i.e. a NOOP.) Do be aware of the

## implications for keyed topics (ref: https://docs.confluent.io/current/kafka/post-deployment.html#admin-operations)

## - If 'defaultConfig' is specified it's deleted from the topic configuration. If it isn't present,

## it will silently and safely fail.

## - If 'config' is specified it's added to the topic configuration.

##

## Note: To increase the 'replicationFactor' of a topic, 'reassignPartitions' must be set to true (see above).

##

topics: []

  # - name: myExistingTopicConfig

  #   config: "cleanup.policy=compact,delete.retention.ms=604800000"

  # - name: myExistingTopicReassignPartitions

  #   partitions: 8

  #   replicationFactor: 5

  #   reassignPartitions: true

  # - name: myExistingTopicPartitions

  #   partitions: 8

  # - name: myNewTopicWithConfig

  #   partitions: 8

  #   replicationFactor: 3

  #   defaultConfig: "segment.bytes,segment.ms"

  #   config: "cleanup.policy=compact,delete.retention.ms=604800000"

  # - name: myAclTopicPartitions

  #   partitions: 8

  #   acls:

  #     - user: read

  #       operations: [ Read ]

  #     - user: read_and_write

  #       operations:

  #         - Read

  #         - Write

  #     - user: all

  #       operations: [ All ]

 

# ------------------------------------------------------------------------------

# Zookeeper:

# ------------------------------------------------------------------------------

 

zookeeper:

  ## If true, install the Zookeeper chart alongside Kafka

  ## ref: https://github.com/kubernetes/charts/tree/master/incubator/zookeeper

  enabled: true

 

  ## Configure Zookeeper resource requests and limits

  ## ref: http://kubernetes.io/docs/user-guide/compute-resources/

  resources: ~

 

  ## Environmental variables to set in Zookeeper

  env:

    ## The JVM heap size to allocate to Zookeeper

    ZK_HEAP_SIZE: "1G"

 

  persistence:

    enabled: false

    ## The amount of PV storage allocated to each Zookeeper pod in the statefulset

    # size: "2Gi"

 

  ## Specify a Zookeeper imagePullPolicy

  ## ref: http://kubernetes.io/docs/user-guide/images/#pre-pulling-images

  image:

    PullPolicy: "IfNotPresent"

 

  ## If the Zookeeper Chart is disabled a URL and port are required to connect

  url: ""

  port: 2181

 

  ## Pod scheduling preferences (by default keep pods within a release on separate nodes).

  ## ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity

  ## By default we don't set affinity:

  affinity: {}  # Criteria by which pod label-values influence scheduling for zookeeper pods.

  # podAntiAffinity:

  #   requiredDuringSchedulingIgnoredDuringExecution:

  #     - topologyKey: "kubernetes.io/hostname"

  #       labelSelector:

  #         matchLabels:

  #           release: zookeeper


2. Basic commands inside the container

These are the basic usage commands shown when the chart is installed with helm install.

 

1. List topics

kubectl -n message-broker exec air-kafka-0 -- kafka-topics --zookeeper air-kafka-zookeeper:2181 --list

2. Create a topic

kubectl -n message-broker exec air-kafka-0 -- kafka-topics --zookeeper air-kafka-zookeeper:2181 --topic {topicName} --create --partitions 1 --replication-factor 1

3. Consume from a topic

kubectl -n message-broker exec -ti air-kafka-0 -- kafka-console-consumer --bootstrap-server air-kafka:9092 --topic {topicName} --from-beginning

4. Produce to a topic

kubectl -n message-broker exec -ti air-kafka-0 -- kafka-console-producer --broker-list air-kafka-headless:9092 --topic {topicName}

The commands above let you confirm that Kafka is working correctly inside the container.
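The producer and consumer above can also be combined into a quick round-trip check; a rough sketch, assuming a topic named test1 (a hypothetical name) has already been created with the create command above:

kubectl -n message-broker exec air-kafka-0 -- bash -c 'echo hello | kafka-console-producer --broker-list air-kafka-headless:9092 --topic test1'
kubectl -n message-broker exec -ti air-kafka-0 -- kafka-console-consumer --bootstrap-server air-kafka:9092 --topic test1 --from-beginning --max-messages 1   # should print "hello" and exit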


3. Verify external access

1. First, check which port Kafka has been exposed on.

kubectl get svc -n message-broker

In the output, the ports of air-kafka-0-external appear as 19092:31090 (31090 may differ depending on your environment); the port reachable from outside is 31090.
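The NodePort can also be read directly from the service instead of scanning the table; a hedged example (the service name follows the air-kafka-0-external pattern mentioned above and may differ for your release):

kubectl get svc -n message-broker air-kafka-0-external -o jsonpath='{.spec.ports[0].nodePort}'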

2. Configure the security group so that this port can be reached.

3. From an external server with Kafka installed, test by creating and consuming topics, as in the next section.
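Before running Kafka clients, a plain TCP check from the external server shows whether the security-group rule works; a minimal sketch (replace {MasterNodeIP} and 31090 with your values):

nc -vz {MasterNodeIP} 31090   # "succeeded" or "open" means the NodePort is reachable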


4. Remote access

 

1. Create a topic

bin/kafka-topics.sh --bootstrap-server 13.209.8.232:31090 --create --topic test1 --partitions 3 --replication-factor 3

2. List topics

bin/kafka-topics.sh --bootstrap-server 13.209.8.232:31090 --list

3. consumer

bin/kafka-console-consumer.sh --bootstrap-server 13.209.8.232:31090 --topic {topicName} --from-beginning

4. producer

bin/kafka-console-producer.sh --broker-list 13.209.8.232:31090 --topic {topicName}

5. Delete a topic

bin/kafka-topics.sh --delete --bootstrap-server 13.209.8.232:31090 --topic {topicName}
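To confirm that a topic's partitions and replicas were really spread across the three brokers, --describe can be run against the same bootstrap address; a small sketch using the test1 topic created in step 1:

bin/kafka-topics.sh --bootstrap-server 13.209.8.232:31090 --describe --topic test1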
