ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka docker-compose로 구축
    Kafka 2021. 9. 17. 16:28

    단일 kafka, docker-compose로 구축

    Kafka를 로컬 환경에서 테스트하기 위해 docker-compose위에 올릴 수 있다.

     

    docker-compose: yaml파일을 통해 여러 container를 하나로 묶어서 한번에 docker환경에 올리는 방식이다. 서로 관련이 있는 service를 한번에 올릴때 편리하다. 

     

    docker-compose를 위한  yml file을 작성해보자.

    version: '2.1'
    services:                    #생성할 서비스 목록
      zookeeper:
        hostname: lonezk
        image: zookeeper:3.4.9            #image file, 없으면 hub에서 자동으로 가져옴.
        ports:
          - "2184:2184"
        environment:                      #환경 변수 설정
            ZOO_MY_ID: 4
            ZOO_PORT: 2184
            ZOO_SERVER: server.4=lonezk:2888:3888
        volumes:                          #디렉토리 마운트file
          - ./lonekafka/lonezk/data:/data
          - ./lonekafka/lonezk/datalog:/datalog
    
    
      kafka:
        hostname: lonekafka
        image: confluentinc/cp-kafka:5.5.1
        ports:
          - "9095:9095"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://lonekafka:19095,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9095
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          #KAFKA_ADVERTISED_HOST_NAME: host.docker.internal
          KAFKA_ZOOKEEPER_CONNECT: "lonezk:2184"
          KAFKA_BROKER_ID: 4
    
        volumes:
          - ./lonekafka/data:/var/lib/kafka/data
        depends_on:               #lonezk 가 생성된 후에 lonekafka를 설치한다.
          - zookeeper
    
      kafdrop:         #kafka broker를 모니터링하기 위한 UI
        image: obsidiandynamics/kafdrop
        restart: "no"
        ports:
          - "9001:9001"
        environment:
          KAFKA_BROKERCONNECT: "lonekafka:19095"
          JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
          SERVER_PORT: 9001
          MANAGEMENT_SERVER_PORT: 9001
        depends_on:
          - kafka

    조금 자세히 살펴보면,

     

    • hostname : container에 이름을 지정해주는 것이라고 생각할 수 있다.
    • image : docker image를 뜻하는데 local에 이 이미지가 없을 경우 dockerhub에서 자동으로 끌어와 사용한다.
    • ports : port 지정, kafka 경우, 주로 9092를 많이 사용하는데, 본인은 9092~9094까지 다른 곳에 할당되어 있어,  9095를 사용하였다.
    • environment :  환경변수를 설정한다
    • depends_on : 어떤 container를 먼저 올릴지 결정한다. 예를들어 depends_on : -kafka 라고 되어있으면, kafka 먼저 올리고 다음에 해당 컨테이너를 올리겠다는 뜻이다. 
    • volume : 마운트할 볼륨을 지정한다. 로컬에 있는 directory를 container 안에 마운트 한다고 생각하면 된다.

    docker-compose를 올릴때는 

    #shell,bash
    docker-compose -f <yaml 파일이름> up -d

    -f : 파일명 지정, 파일명 지정없이 쓰고 싶으면 yaml파일 이름을 docker-compose.yml로 지정하면 된다.

    -d : 백그라운드에서 실행

     

    실행시 다음과 같이 올라간 것을 확인할 수 있다.

    docker로 확인해본 모습

     

    kafka client test - producer

    kafka consumer producer를 생성하여 kafka가 정상 작동하는지 확인해보자.

    kafka에서 제공하는 console client를 사용해도 되지만 python library 중 kafka-python을 사용하였다.

    (kafka-python : https://github.com/dpkp/kafka-python)

     

    kafka-python library는 pip을 통해 설치하였다. 

    from kafka import KafkaProducer
    from json import dumps
    from time import sleep
    import sys
    
    def on_send_success(record_metadata):
        print("topic recorded:",record_metadata.topic)
        print("partition recorded:",record_metadata.partition)
        print("offset recorded:",record_metadata.offset)
    
    def on_send_error(excp):
            print(excp)
    
    # 카프카 서버
    bootstrap_servers = ["localhost:9095"]
    
    # 카프카 producer 생성
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             key_serializer=None,
                             acks=1,
                             #linger_ms=500,
                             value_serializer=lambda x: dumps(x).encode('utf-8')
                            )
    
    # 카프카 토픽
    str_topic_name = 'lone-p1r1'
    
    #produce
    for i in range(50):
        response = producer.send(str_topic_name,
                                 "test"
                                ).add_callback(on_send_success).add_errback(on_send_error)
        time.sleep(0.5)

    이전 포스트에서 설명했다시피 bootstrap_server를 지정해줌으로써  broker와 연결한다. 

     

    producer를 생성할때는 여러 설정 값들을 줄 수 있다. (설정 값 docs : https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)

     

    kafka topic을 지정할 때는 원하는 토픽의 이름을 적어주면 되고, 만약 없는 토픽이라면 자동적으로 그 이름의 토픽이 생성된다.

    단, default 토픽 설정은 replication factor : 1, partitton: 1 로 설정된다.

     

    이 코드에서는 test라는 메시지를 50회 전송하였다. 물론 key값과 value값을 명시해서 produce할 수 있다.

     

    produce되는 모습

    kafka broker monitoring UI : Kafdrop

    사실 docker-compose를 올릴때 kafdrop이라는 컨테이너도 함께 올린 것을 확인할 수 있다.

    kafdrop은 kafka broker를 모니터링하는 웹 ui이다.

     

    따라서 localhost:9001로 접속하면 broker 정보를 확인할 수 있다. (설정해준 포트번호에 따라 주소가 바뀔수도 있다. 본인은  9001 포트를 할당했다.)

     

    이 처럼  kafka의 상태, topic등 다양한 정보를 확인할 수 있다.

    방금 우리가 메시지를 보낸 lone-p1r1의 메시지도 볼 수 있다.

    'view message' 를 누르면 메시지 내용도 확인할 수 있다.

     

    consumer.py

    이제 kafka 'lone-p1r1'에 있는 메시지를 consume 해보자

    from kafka import KafkaConsumer
    from json import loads
    from time import sleep
    import time
    
    
    # 카프카 서버
    bootstrap_servers = ["localhost:9095"]
    
    # 카프카 토픽
    str_topic_name    = 'lone-p1r1'
    
    # 카프카 consumer group 생성
    str_group_name = 'g1'
    
    #-------------comsumption data-----------
    consumer = KafkaConsumer(str_topic_name,         #kafka topic name
                             bootstrap_servers= ["localhost:9095"],#kafka server
                             auto_offset_reset='earliest',            #가장 처음 offset부터
                             enable_auto_commit=True,      #마지막으로 읽은 offset 위치 commit
                             auto_commit_interval_ms=500,  #offset commit 주기, default : 5000
                             #group_id=str_group_name,     #이 consumer가 생성될 consumer group
                             value_deserializer=lambda x: loads(x.decode('utf-8')) #serialize된 메시지를 deserialize
                            )
    for event in consumer:
         event_data = event.value
         print(event_data)
         sleep(1)

    consumer group을 지정해줄수 있다. 

    consumer도 여러 설정값이 있다. (설정값 docs : https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html)

     

    메시지가 consume됨을 확인할 수 있다.

     

JackCokebb dev blog