ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka streams로 kafka 내부 데이터 처리
    Kafka 2022. 1. 13. 14:16

    Kafka streams

    Kafka StreamsStream API로 구축된 애플리케이션이고, 브로커와 별도로 구성된다.

    //Stream API : 컬렉션, 배열등의 저장 요소를 하나씩 참조하며 함수형 인터페이스(람다식)를 적용하며 반복적으로 처리할 수 있도록 해주는 기능

    kafka 내부에서 메시지 파이프라인을 구성한다고 생각할 수 있다. 따라서 브로커의 특정 토픽을 구독하여 일련의 로직을 처리한 뒤에, 다시 다른 토픽으로 publish한다. 쉽게 말하면 kafka내부에 있는 데이터에 가공처리한 후 다시 다른 토픽이나 같은 토픽으로 넣어주는 역할을 한다.

    Kafka streams는 라이브러리로 제공되기 때문에 단순히 main 함수 내에서도 구현이 가능하고, 특정한 프레임워크에 종속되지 않을 수 있다.

    Kafka streamsjavascala 언어를 지원하고 있다.

     

    Streams의 특징

    Kafka Streams application은 내부적으로 consumer API를 사용하기 때문에 여러 이점을 가진다.

    l  확장에 유연함

    l  장애 수용성을 가짐

    l  분산 형태로 구성 가능

     

    그렇기 때문에 스트림 애플리케이션도 마찬가지로 확장에 유연하고, 장애 수용성을 가지고, 분산 형태로 구성할 수 있다.

     

    Streams의 구성

    kafka streams는 스트림 처리를 하는 프로세스들이 서로 연결되어 토폴로지(topology, 네트워킹 구조내에서의 물리적인 구성)를 만들어서 처리하는 API이다.

     

     

    구성요소로는

    • Source processor
      • 최상위 프로세서, kafka 토픽들에서 데이터를 읽어 아래 프로세서에 전달
    • Stream Processor
      • 중간 프로세서, 데이터를 변환한 후, 다음 프로세서에 저장
    • Sink Processor
      • 최하위 프로세서, 상위 프로세서로부터 받은 데이터를 kafka 특정 토픽에 저장

     

    따라서 구현할 수 있는 기능들은

    • 토픽 내부 민감 데이터 마스킹
    • 다양한 source로부터 수집된 메시지 규격화
    • 일정 간격으로 토픽 내부의 특정 이벤트 감지
    • 특정 필드를 이용한 파이프라인 조인

    등이 있다. 그리고 이 기능들을 구현할 수 있도록 Streams APIfilter(), map(), groupBy() 등 다양한 처리, 집계 함수를 제공하고 있다.

     

    Kafka streams 구현방식

    두가지 구현 방식이 있다.

     

     

    Stream DSL (Domain Specific Language) Stream Processor API를 사용해서 구현되어 있다.

    이 방식은 미리 제공되는 함수들을 이용해서 토폴로지를 정의하는 방식이다. 이 방식은 스트림과 테이블에 대한 추상화(KStream, KTable, GlobalKTable)를 제공하고, 선언적인 함수형 스타일의 stateless transformantion을 제공한다(map, filter ). 뿐만 아니라 stateful transformation을 제공한다. (aggregation, join, windowing) 따라서 이 방식은 Processor API보다 비교적 추상적이고 사용하기가 쉽다.

    //stateful transformation:  Streams가 작동할때 상태값을 저장한다는 것이다. 즉 n번째 상태값이 n+1번째 결과값에 영향을 미친다는 것이다

     

     

    Processor API 은 정의된 함수를 사용하지 않고 직접 구현해야 한다. 구현하기에는 어렵지만 그만큼 정교한 로직을 짤 수 있다. Processor API를 통해서 임의의 스트림 프로세서를 정의할 수 있으며 Processor API Stateless, Stateful 오퍼레이션을 구현하기 위해 사용될 수 있다.

    (이 프로세서를 관련된 State Store와 연관시켜 Processor Topology를 구성할 수 있다.)

    //State Stores: Stateful한 Processor 혹은 Transformer를 구현하기 위해서는, 1개 이상의 이 State Store를 제공해야한다. (Stateless Processor 혹은 transformer는 State Store가 필요하지 않다) State Store는 최근에 받은 input records를 저장하기 위해서 사용되기도 하며, input records의 중복을 제거하기 위해서도 사용된다. State Store의 또 다른 특징은 외부 어플리케이션이 State Store에 쿼리할 수 있다는 것이다.

     

    Kafka streams 활용

    로컬환경에서 docker-compose로 띄워둔 kafka에 대해 테스트하였다.

    Scala 언어를 사용해서 kafka streams API를 사용하였고, Stream DSL방식으로 구현하였다.

     

    본인 github에도 정리해두었으니 참고바란다. (kubernetes에 올려 동작시키는 방법)

    https://github.com/JackCokebb/kafka-all#kafka-streams

     

    GitHub - JackCokebb/kafka-all

    Contribute to JackCokebb/kafka-all development by creating an account on GitHub.

    github.com

     

     

     

    Localhost에 있는 kafka와 연결하였고 kafka 내부 토픽 lone-p1r1-2에 있는 데이터를 가져와 address 정보를 *****로 마스킹하는 로직이다. 그 후에 새로운 토픽 lone-p1r1-output에 저장한다.

     

    마스킹 작업이 성공적으로 된 것을 확인할 수 있다.

     

     

    출처 및 참고 

    https://always-kimkim.tistory.com/entry/kafka101-streams?category=876258

    https://t3guild.com/2020/04/30/kafka-%EC%8B%A4%EC%A0%84-%EC%BD%94%EB%93%9C-with-strimzi/

    https://strimzi.io/docs/operators/latest/overview.html

    https://betterprogramming.pub/learn-stream-processing-with-kafka-streams-stateless-operations-2111080e6c53

    https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-

    https://gunju-ko.github.io/kafka/kafka-stream/2018/05/28/Stream-DSL.html

     

     

JackCokebb dev blog