-
Kafka docker-compose로 구축Kafka 2021. 9. 17. 16:28
단일 kafka, docker-compose로 구축
Kafka를 로컬 환경에서 테스트하기 위해 docker-compose위에 올릴 수 있다.
docker-compose: yaml파일을 통해 여러 container를 하나로 묶어서 한번에 docker환경에 올리는 방식이다. 서로 관련이 있는 service를 한번에 올릴때 편리하다.
docker-compose를 위한 yml file을 작성해보자.
version: '2.1' services: #생성할 서비스 목록 zookeeper: hostname: lonezk image: zookeeper:3.4.9 #image file, 없으면 hub에서 자동으로 가져옴. ports: - "2184:2184" environment: #환경 변수 설정 ZOO_MY_ID: 4 ZOO_PORT: 2184 ZOO_SERVER: server.4=lonezk:2888:3888 volumes: #디렉토리 마운트file - ./lonekafka/lonezk/data:/data - ./lonekafka/lonezk/datalog:/datalog kafka: hostname: lonekafka image: confluentinc/cp-kafka:5.5.1 ports: - "9095:9095" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://lonekafka:19095,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9095 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL #KAFKA_ADVERTISED_HOST_NAME: host.docker.internal KAFKA_ZOOKEEPER_CONNECT: "lonezk:2184" KAFKA_BROKER_ID: 4 volumes: - ./lonekafka/data:/var/lib/kafka/data depends_on: #lonezk 가 생성된 후에 lonekafka를 설치한다. - zookeeper kafdrop: #kafka broker를 모니터링하기 위한 UI image: obsidiandynamics/kafdrop restart: "no" ports: - "9001:9001" environment: KAFKA_BROKERCONNECT: "lonekafka:19095" JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify" SERVER_PORT: 9001 MANAGEMENT_SERVER_PORT: 9001 depends_on: - kafka
조금 자세히 살펴보면,
- hostname : container에 이름을 지정해주는 것이라고 생각할 수 있다.
- image : docker image를 뜻하는데 local에 이 이미지가 없을 경우 dockerhub에서 자동으로 끌어와 사용한다.
- ports : port 지정, kafka 경우, 주로 9092를 많이 사용하는데, 본인은 9092~9094까지 다른 곳에 할당되어 있어, 9095를 사용하였다.
- environment : 환경변수를 설정한다
- depends_on : 어떤 container를 먼저 올릴지 결정한다. 예를들어 depends_on : -kafka 라고 되어있으면, kafka 먼저 올리고 다음에 해당 컨테이너를 올리겠다는 뜻이다.
- volume : 마운트할 볼륨을 지정한다. 로컬에 있는 directory를 container 안에 마운트 한다고 생각하면 된다.
docker-compose를 올릴때는
#shell,bash docker-compose -f <yaml 파일이름> up -d
-f : 파일명 지정, 파일명 지정없이 쓰고 싶으면 yaml파일 이름을 docker-compose.yml로 지정하면 된다.
-d : 백그라운드에서 실행
실행시 다음과 같이 올라간 것을 확인할 수 있다.
kafka client test - producer
kafka consumer producer를 생성하여 kafka가 정상 작동하는지 확인해보자.
kafka에서 제공하는 console client를 사용해도 되지만 python library 중 kafka-python을 사용하였다.
(kafka-python : https://github.com/dpkp/kafka-python)
kafka-python library는 pip을 통해 설치하였다.
from kafka import KafkaProducer from json import dumps from time import sleep import sys def on_send_success(record_metadata): print("topic recorded:",record_metadata.topic) print("partition recorded:",record_metadata.partition) print("offset recorded:",record_metadata.offset) def on_send_error(excp): print(excp) # 카프카 서버 bootstrap_servers = ["localhost:9095"] # 카프카 producer 생성 producer = KafkaProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, acks=1, #linger_ms=500, value_serializer=lambda x: dumps(x).encode('utf-8') ) # 카프카 토픽 str_topic_name = 'lone-p1r1' #produce for i in range(50): response = producer.send(str_topic_name, "test" ).add_callback(on_send_success).add_errback(on_send_error) time.sleep(0.5)
이전 포스트에서 설명했다시피 bootstrap_server를 지정해줌으로써 broker와 연결한다.
producer를 생성할때는 여러 설정 값들을 줄 수 있다. (설정 값 docs : https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html)
kafka topic을 지정할 때는 원하는 토픽의 이름을 적어주면 되고, 만약 없는 토픽이라면 자동적으로 그 이름의 토픽이 생성된다.
단, default 토픽 설정은 replication factor : 1, partitton: 1 로 설정된다.
이 코드에서는 test라는 메시지를 50회 전송하였다. 물론 key값과 value값을 명시해서 produce할 수 있다.
kafka broker monitoring UI : Kafdrop
사실 docker-compose를 올릴때 kafdrop이라는 컨테이너도 함께 올린 것을 확인할 수 있다.
kafdrop은 kafka broker를 모니터링하는 웹 ui이다.
따라서 localhost:9001로 접속하면 broker 정보를 확인할 수 있다. (설정해준 포트번호에 따라 주소가 바뀔수도 있다. 본인은 9001 포트를 할당했다.)
이 처럼 kafka의 상태, topic등 다양한 정보를 확인할 수 있다.
방금 우리가 메시지를 보낸 lone-p1r1의 메시지도 볼 수 있다.
'view message' 를 누르면 메시지 내용도 확인할 수 있다.
consumer.py
이제 kafka 'lone-p1r1'에 있는 메시지를 consume 해보자
from kafka import KafkaConsumer from json import loads from time import sleep import time # 카프카 서버 bootstrap_servers = ["localhost:9095"] # 카프카 토픽 str_topic_name = 'lone-p1r1' # 카프카 consumer group 생성 str_group_name = 'g1' #-------------comsumption data----------- consumer = KafkaConsumer(str_topic_name, #kafka topic name bootstrap_servers= ["localhost:9095"],#kafka server auto_offset_reset='earliest', #가장 처음 offset부터 enable_auto_commit=True, #마지막으로 읽은 offset 위치 commit auto_commit_interval_ms=500, #offset commit 주기, default : 5000 #group_id=str_group_name, #이 consumer가 생성될 consumer group value_deserializer=lambda x: loads(x.decode('utf-8')) #serialize된 메시지를 deserialize ) for event in consumer: event_data = event.value print(event_data) sleep(1)
consumer group을 지정해줄수 있다.
consumer도 여러 설정값이 있다. (설정값 docs : https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html)
'Kafka' 카테고리의 다른 글
Kafka streams로 kafka 내부 데이터 처리 (1) 2022.01.13 Kafka connect로 kafka와 여러 서비스 연결하기 (0) 2022.01.12 Kubernetes 위에 kafka 구축 - strimzi (0) 2022.01.11 Kafka 튜닝, 최적화 방안 (1) 2022.01.10 Apache Kafka란 - 이론 공부 내용 (2) 2021.09.12