-
Kafka streams로 kafka 내부 데이터 처리Kafka 2022. 1. 13. 14:16
Kafka streams
Kafka Streams는 Stream API로 구축된 애플리케이션이고, 브로커와 별도로 구성된다.
//Stream API : 컬렉션, 배열등의 저장 요소를 하나씩 참조하며 함수형 인터페이스(람다식)를 적용하며 반복적으로 처리할 수 있도록 해주는 기능
kafka 내부에서 메시지 파이프라인을 구성한다고 생각할 수 있다. 따라서 브로커의 특정 토픽을 구독하여 일련의 로직을 처리한 뒤에, 다시 다른 토픽으로 publish한다. 쉽게 말하면 kafka내부에 있는 데이터에 가공처리한 후 다시 다른 토픽이나 같은 토픽으로 넣어주는 역할을 한다.
Kafka streams는 라이브러리로 제공되기 때문에 단순히 main 함수 내에서도 구현이 가능하고, 특정한 프레임워크에 종속되지 않을 수 있다.
Kafka streams는 java와 scala 언어를 지원하고 있다.
Streams의 특징
Kafka Streams application은 내부적으로 consumer API를 사용하기 때문에 여러 이점을 가진다.
l 확장에 유연함
l 장애 수용성을 가짐
l 분산 형태로 구성 가능
그렇기 때문에 스트림 애플리케이션도 마찬가지로 확장에 유연하고, 장애 수용성을 가지고, 분산 형태로 구성할 수 있다.
Streams의 구성
kafka streams는 스트림 처리를 하는 프로세스들이 서로 연결되어 토폴로지(topology, 네트워킹 구조내에서의 물리적인 구성)를 만들어서 처리하는 API이다.
구성요소로는
- Source processor
- 최상위 프로세서, kafka 토픽들에서 데이터를 읽어 아래 프로세서에 전달
- Stream Processor
- 중간 프로세서, 데이터를 변환한 후, 다음 프로세서에 저장
- Sink Processor
- 최하위 프로세서, 상위 프로세서로부터 받은 데이터를 kafka 특정 토픽에 저장
따라서 구현할 수 있는 기능들은
- 토픽 내부 민감 데이터 마스킹
- 다양한 source로부터 수집된 메시지 규격화
- 일정 간격으로 토픽 내부의 특정 이벤트 감지
- 특정 필드를 이용한 파이프라인 조인
등이 있다. 그리고 이 기능들을 구현할 수 있도록 Streams API는 filter(), map(), groupBy() 등 다양한 처리, 집계 함수를 제공하고 있다.
Kafka streams 구현방식
두가지 구현 방식이 있다.
Stream DSL (Domain Specific Language)는 Stream Processor API를 사용해서 구현되어 있다.
이 방식은 미리 제공되는 함수들을 이용해서 토폴로지를 정의하는 방식이다. 이 방식은 스트림과 테이블에 대한 추상화(KStream, KTable, GlobalKTable)를 제공하고, 선언적인 함수형 스타일의 stateless transformantion을 제공한다(map, filter 등). 뿐만 아니라 stateful transformation을 제공한다. (aggregation, join, windowing) 따라서 이 방식은 Processor API보다 비교적 추상적이고 사용하기가 쉽다.
//stateful transformation: Streams가 작동할때 상태값을 저장한다는 것이다. 즉 n번째 상태값이 n+1번째 결과값에 영향을 미친다는 것이다
Processor API 은 정의된 함수를 사용하지 않고 직접 구현해야 한다. 구현하기에는 어렵지만 그만큼 정교한 로직을 짤 수 있다. Processor API를 통해서 임의의 스트림 프로세서를 정의할 수 있으며 Processor API는 Stateless, Stateful 오퍼레이션을 구현하기 위해 사용될 수 있다.
(이 프로세서를 관련된 State Store와 연관시켜 Processor Topology를 구성할 수 있다.)
//State Stores: Stateful한 Processor 혹은 Transformer를 구현하기 위해서는, 1개 이상의 이 State Store를 제공해야한다. (Stateless Processor 혹은 transformer는 State Store가 필요하지 않다) State Store는 최근에 받은 input records를 저장하기 위해서 사용되기도 하며, input records의 중복을 제거하기 위해서도 사용된다. State Store의 또 다른 특징은 외부 어플리케이션이 State Store에 쿼리할 수 있다는 것이다.
Kafka streams 활용
로컬환경에서 docker-compose로 띄워둔 kafka에 대해 테스트하였다.
Scala 언어를 사용해서 kafka streams API를 사용하였고, Stream DSL방식으로 구현하였다.
본인 github에도 정리해두었으니 참고바란다. (kubernetes에 올려 동작시키는 방법)
https://github.com/JackCokebb/kafka-all#kafka-streams
Localhost에 있는 kafka와 연결하였고 kafka 내부 토픽 lone-p1r1-2에 있는 데이터를 가져와 address 정보를 *****로 마스킹하는 로직이다. 그 후에 새로운 토픽 lone-p1r1-output에 저장한다.
마스킹 작업이 성공적으로 된 것을 확인할 수 있다.
출처 및 참고
https://always-kimkim.tistory.com/entry/kafka101-streams?category=876258
https://t3guild.com/2020/04/30/kafka-%EC%8B%A4%EC%A0%84-%EC%BD%94%EB%93%9C-with-strimzi/
https://strimzi.io/docs/operators/latest/overview.html
https://gunju-ko.github.io/kafka/kafka-stream/2018/05/28/Stream-DSL.html
'Kafka' 카테고리의 다른 글
Kafka connect로 kafka와 여러 서비스 연결하기 (0) 2022.01.12 Kubernetes 위에 kafka 구축 - strimzi (0) 2022.01.11 Kafka 튜닝, 최적화 방안 (1) 2022.01.10 Kafka docker-compose로 구축 (1) 2021.09.17 Apache Kafka란 - 이론 공부 내용 (2) 2021.09.12 - Source processor