ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka connect로 kafka와 여러 서비스 연결하기
    Kafka 2022. 1. 12. 12:30

    Kafka connect???

    kafka connect는 kafka와 다른 데이터 시스템 사이에 데이터를 스트리밍해주는 오픈소스 툴이다. kafka connect는 쉽고 간단하게 메시지 파이프를 구성하도록 도와준다. 데이터베이스 전체를 읽을 수 있고, metrics 정보도 모을 수 있다.

     

     

    kafka connect 구성

    • connector
      • connect 내부의 실제 메시지 파이프라인 
      • connect에 connector를 구성할 땐 connect에 connector 관련 설정을 전달하면 됨
      • 구성된 connector는 주기적으로 메시지를 확인하고,
      • 새로운 메시지가 있으면 파이프라인을 통해 흘려 보낸다.
    • Source connector
      • 외부 애플리케이션에서 kafka로 데이터를 가져오는 역할
      • 외부 애플리케이션 -> connect -> kafka
    • sink connector 
      • kafka에서 외부 애플리케이션으로 데이터를 보내는 역할
      • kafka -> connect -> 외부 애플리케이션

    Connect에는 단일 모드와 분산 모드가 있는데, 단일 모드는 파일형태로, 분산 모드는 REST API로도 전달할 수 있다.

    Connect는 일종의 템플릿이 구현되어 있고, 이 템플릿과 설정 값을 기준으로 인스턴스를 생성하는 방식이다.

    connect에서는 이 템플릿을 plugin이라고 정의한다.

     

    connector 인스턴스를 생성하면, connect 워커 프로세스 내부에 테스크들이 생성되고, 파이프라인이 구동된다.

    테스크 내부에는 외부 시스템간의 메시지포맷을 변환하는 컨버터가 구성되고, 필요시 transform을 통해 간단한 처리를 실시한다.

    따라서 Connect는 이미 구현된 plugin, 환경에 맞게 설정 값만 넣어주면 쉽고 간편하게 파이프라인을 구성할 수 있다.

    복잡한 로직이 없는 파이프라인 구성에 큰 강점을 가진다고 할 수 있다.

    여러 데이터 시스템에 대해서 이미 만들어진 plugin 들이 많기 때문에, 쉽게 파이프라인을 구성할 수 있게 되었다.

     

    Kafka connect - mongoDB와 연결

    본인 github에도 코드와 함께 자세하게 정리해두었다.

    https://github.com/JackCokebb/kafka-all#kafka-connect

     

    GitHub - JackCokebb/kafka-all

    Contribute to JackCokebb/kafka-all development by creating an account on GitHub.

    github.com

     

     

    kafka connect가 사용할 image 생성 - Dockerfile 생성

    mongoDB connector plugin을 포함하고 있는 connect cluster를 생성해보자.

    Dockerfile은 kafka connect가 사용할 image파일 생성 용도이다. 

     

    connect cluster를 kubernetes에 배치할때 사용하는 image에, 각 서비스에서 제공하는 connector plugin들을 포함해야한다. plugin 파일은 서비스 홈페이지에서 받을 수 있다. //예시: mongoDB connector plugin

     

    Install the MongoDB Kafka Connector — MongoDB Kafka Connector

    Docs Home → MongoDB Kafka ConnectorLearn how to install the MongoDB Kafka Connector. The MongoDB Kafka Connector is available for Confluent Platform and Apache Kafka deployments. To see installation instructions for your deployment type, navigate to one

    docs.mongodb.com

     

    strimzi kafka를 image베이스로 하고 plugin이 들어있는 directory를 마운트해준다.

    Dockerfile을 작성할때, plugin이 담겨있는 directory를 COPY하는 부분에서 주의할 점은 image build할때 생성되는, 즉 plugin 파일들을 복사해서 넣어줄 plugin directory에 바로 plugin JAR 파일들을 넣으면 안되고, 서비스마다 directory를 만들어서 한번에 넣어줘야 전체를 읽어들인다.

     

    <MongoDB connector plugin 구성>

     

    이후 Docker image를 빌드하고 개인 레포지토리(docker repository - https://hub.docker.com/)에 push 하였다.

     

    kafka connect resource 생성

    kafka connect 라는 커스텀 리소스를 생성하고 kubectl apply 해주면 kubernetes에 올려가있는 cluster operator가 kafka connect cluster를 생성해준다. 

    빨간 박스의 image를 보면 개인 레포지토리에 push한 image를 사용했음을 알 수 있다.

     

    Lens로 모니터링 - kafka connect cluster가 생성된것을 확인

    kafka connector resource 생성

    kafka connect cluster가 생성되었으면 connector를 생성해줘야 한다. 어떤 connector를 생성하기 위해 connector resource를 정의하면 connect cluster에서 connector instance가 생기는 것처럼 이해할 수 있다.

     

     

    strimzi.io/cluster에는 이전에 생성해준 cluster의 이름을 넣어주고 

    connection.uri는 mongoDB의 uri를 입력해준다. 가린 부분은 mongoDB에서 사용하는 user와 password를 <user:password>형식으로 입력했다. (ex. mongodb://user123:password333@mongo-cluster-rs0~~~~)

    database에는 가져오고자하는 데이터의 db 이름을 적어준다. jee라고 적게되면 jee라는 db에 있는 데이터들을 가져오겠다는 뜻이다. copy.existing을 true로 하면 db에 이미 저장된 데이터부터 가져오고 false면 연결 이후 생긴 데이터들만 가져온다.

     

    작성한 커스텀 리소스를 kubectl apply 해주면 connector instance가 생기고 작성한 내용을 바탕으로 작업을 실행한다.

    위에 작성한대로 jee라는 데이터베이스에서 data를 가져와 새로운 topic안에 저장한다.

    mongodb-source-connector.yaml 파일에서 지정해준DB의 collection 에서 data를 가져와 kafka의 새로운 topic에 저장된다. 저장되는 topic의 이름은 default로는 <DBname>.<collectionName>으로 저장된다.

    따라서 jee라는 db안에 patients라는 collection이 있어 jee.patients라는 toipic이 생성된다.

     

    kafka console consumer로 jee.patient를 consume해본 모습

    이 토픽을 consume해보면 데이터가 잘 저장되어 있음을 알 수 있다.

     

     

    참고 및 출처

    https://always-kimkim.tistory.com/entry/kafka101-connect

    https://docs.mongodb.com/kafka-connector/current/kafka-source/

    https://t3guild.com/2020/04/30/kafka-%EC%8B%A4%EC%A0%84-%EC%BD%94%EB%93%9C-with-strimzi/

    https://strimzi.io/docs/operators/latest/overview.html

    'Kafka' 카테고리의 다른 글

    Kafka streams로 kafka 내부 데이터 처리  (1) 2022.01.13
    Kubernetes 위에 kafka 구축 - strimzi  (0) 2022.01.11
    Kafka 튜닝, 최적화 방안  (1) 2022.01.10
    Kafka docker-compose로 구축  (1) 2021.09.17
    Apache Kafka란 - 이론 공부 내용  (2) 2021.09.12
JackCokebb dev blog