분산 시스템/Kafka

[Kafka - step 4] 프로듀서란? (Kafka Producer) (파티셔너, 컴프레션, 배치전송, 그리고 설정 방법)

quokkalover 2022. 4. 4. 00:04

현재 카프카 정리 시리즈를 포스팅 하고 있다. 카프카 정리 시리즈에서 다루는 여러가지 주제가 궁금할 경우 본 글 을 참고하길 바란다.

 

본 글은 프로듀서의 개념을 이해하기 위해 작성했다. 프로듀서에 대한 개념과 기본적인 동작방식그리고 간단한 파이썬 프로듀서 구현체 대해 다룬다.

 

 

 

Producer란?

프로듀서는 메시지를 생산해서 카프카의 토픽으로 메시지를 보내는 역할을 담당한다.

프로듀서는 데이터를 전송할 때 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신한다.

 


Producer 동작방식

먼저 프로듀서의 전체 흐름을 아래 그림과 함께 설명하겠다.

 

Producer Record

카프카로 전송하기 위한 실제 데이터며, 레코드는 토픽, 파티션, 키, 밸류로 구성된다.

 

프로듀서가 카프카로 레코드를 전송할 때 카프카의 특정 토픽으로 메시지를 전송한다. 따라서 레코드에서 토픽과 밸류는 필숫값이고, 특정 파티션을 지정하기 위한 레코드의 파티션과 특정 파티션에 레코드를 정렬하기 위한 레코드의 키는 필숫값이 아닌 선택사항이다.


Send() & Serializer

 

Send()메소드를 통해 메시지가 직렬화되는데, 이 때 지정된 설정대로 producer record는 바이트 뭉치 형태로 변환된다.


Partitioner

파티셔너는 프로듀서 어플리케이션 내에서 생성된 메시지를 카프카에 보낼 때 이 메시지가 어떤 토픽의 어떤 파티션에 전달될지를 정하는 역할을 담당한다.

producer record의 선택사항인 파티션에 이미 지정돼 있다면 파티셔너는 아무 동작도 하지 않고 지정된 파티션으로 레코드를 전달한다.

 

프로듀서는 레코드들을 파티션별로 잠시 모아두는 배치 방식으로 데이터를 전송한다.

파티션을 지정하지 않은 경우에는 키를 가지고 파티션을 선택해 레코드를 전달하는데, 프로듀서 API를 사용하는 경우 크게 아래 두 가지 방식의 파티셔너가 제공된다.

  • Round Robing Partitioner
    • 메시지가 발생하는대로 파티션을 순회하면서 전송한다. 브로커와의 잦은 통신으로 인한 단점으로 인해 2.5부터는 배치 전송방식을 사용하는 Uniform Sticky Partitioner가 기본 파티셔너다.
  • Unifrom Sticky Partitioner
    • Round Robin의 브로커와의 잦은 통신을 해결하기 위해 데이터가 배치로 모두 묶일 때 까지 기다린 뒤 묵여진 데이터 덩어리는 모두 동일한 파티션에 전송함으로써 높은 처리량과 낮은 리소스 사용률을 가진다.

Compression

메시지 압축이 설정되었다면 설정된 포맷에 맞추어 메시지를 압축한다. 압축된 메시지는 브로커로 빠르게 메시지를 전달할 수 있을 뿐 아니라 브로커 내부에서 빠른 복제가 가능하도록 한다. 아래 테이블 비교표를 보고 알맞는 압축 포맷을 선택할 수 있다.


배치 전송

파티셔닝과 압축을 마친 후에 프로듀서는 메시지를 TCP프로토콜을 통해 브로커 리더 파티션으로 전송한다. 하지만 앞서 말했듯 카프카는 네트워크 왕복 오버헤드를 줄이기 위해 지정된 만큼 메시지를 저장했다가 한번에 브로커로 배치 전송한다. 이 과정은 프로듀서 내부의 Record Accumulator가 담당하여 처리한다. RA는 각 토픽 파티션에 대응하는 배치 큐를 구성하고 메시지들은 레코드 배치 형태로 묶여 큐에 저장한다.


Sender Thread

각 배치큐에 저장된 레코드 배치들은 때가 되면 각각 브로커에 전달된다. 이 과정은 Sender가 처리한다. Sender는 스레드 형태로 구성되며, 관리자가 설정한 특정 조건에 만족한 레코드 배치를 브로커로 전송한다. 이 때 Sender 쓰레드는 네트워크 비용을 줄이기 위해 piggyback(뒤에 업다)방식으로 조건을 만족하지 않은 다른 레코드 배치를 조건을 만족한 것과 함께 브로커로 전송한다. 아래 그림에서 토픽 A의 파티션2가 함께 전송되는게 그 예다.

브로커에 네트워크 전송 요청을 보낸 Sender는 설정 값에 따라 브로커의 응답을 기다리거나 혹은 기다리지 않는다. 만약 기다리지 않는 설정인 경우, 메시지의 전송에 대한 과정이 끝난다. 하지만 응답을 기다리는 경우에는 메시지 전송 성공 여부를 응답으로 받는다. 이 때, 브로커에서 메시지 전송이 실패한 경우에는 설정 값에 따라 재시도한다. 재시도 횟수를 초과한 경우에는 예외를 뱉어낸다. 반대로 성공한 경우에는 메시지가 저장된 정보를 반환한다.

메타데이터는 메시지가 저장된 토픽, 파티션, 오프셋, 타임스탬프 정보를 가지고 있다.


프로듀서의 주요 설정

카프카를 이용해 메시지를 전송하고자 한다면 프로듀서의 옵션을 잘 파악해야 한다.

  • 메시지를 손실없이 보내고 싶은 경우
  • 메시지를 빠르게 보내고 싶은 경우

등 다양한 요구사항들이 있기 때문에, 이러한 니즈를 잘 파악하고 프로듀서를 다뤄야 좀 더 효율적으로 사용할 수 있다.

  • bootstrap.servers : 카프카 클러스터와 연결 시 사용되는 설정. 브로커의 호스트와 포트 정보
  • compression.type : 압축 타입에 관한 설정. none, gzip, snappy, lz5, zstd등 원하는 타입 설정 가능
  • buffer.memory : Record Accumulator에서 카프카로 데이터를 보내기 위해 잠시 대기할 수 있는 메모리의 총 양(전체 메모리 바이트)
  • batch.size : 레코드 배치의 크기에 관한 설정(bytes). 레코드 배치가 해당 설정 값만큼 쌓이면 메시지를 전송. 적절한 배치 크기 설정은 성능에 도움을 준다.
  • linger.ms : 레코드 배치의 최대 전송 대기 시간 설정(ms). batch.size에 만족하지 않더라도 레코드 배치는 이 설정에 따라 전송.
  • acks : 프로듀서가 전송 후 브로커의 응답을 기다리는 설정. '0' 일 경우 응답을 기다리지 않고(일부 메시지 손실 가능성), '1' 일 경우 리더 파티션의 응답만 기다린다. (대부분 1로 설정함)'all' 혹은 '-1' 일 경우 팔로워가 메시지를 받았는지 여부를 확인. (메시지 손실 가능성 없음)
  • retires : 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수
  • max.in.flight.requests.per.connection : 하나의 커넥션에서 프로듀서가 ACK없이 전송할 수 있는 요청 수. 메시지의 순서가 중요하다면 1로 설정할 것을 권장하지만, 성능은 떨어질 수 있다.

Producer 설정시 유의해야 하는 값

카프카 프로듀서 설정 값은 요구되는 메시지 전송 환경에 따라 적합하지 않을 수 있다. 안정성 보장, Ordered Delivery, Data Durability 등과 같은 요구사항에 따라 적절한 값을 설정하는게 필요하다.

 

따라서 처리량과 지연율, 지연율과 전송 안정성 사이에서의 트레이드 오프를 고려하는게 좋다. 이것 뿐 아니라 다른 조합도 있기는 하지만 간략하게 소개한다.

  • 처리량과 지연율 trade off :
    • batch.size, linger.ms : 설정 값이 높을수록 처리량은 증가하지만, 그만큼 저장되었다가 전송되므로 지연율이 높아진다.
    • 예:
    • linger.ms=500 batch.size=16384 buffer.memory=33554432
  • 안정성과 지연율 :
    • acks 설정이 0 -> 1 -> all 로 설정될수록 메시지 전송 안정성은 증가하지만, 그만큼 응답을 대기해야 하므로 지연율이 높아진다.
    • 안정성을 보장하기 위해 아래와 같은 설정을 해볼 수 있겠다.
    • enable.idempotence=true max.in.flight.requests.per.connection=5 acks=all retries=2147483647 transactional.id=_UNIQUE-ID_ transaction.timeout.ms=900000

 

Producer Python 구현 예시

from kafka import KafkaProducer 
from json import dumps 
import time 

producer = KafkaProducer(acks=0, compression_type='gzip', bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8')) 

start = time.time() 
for i in range(10000): 
    data = {'str' : 'richet'+str(i)+'message'} 
    producer.send('test', value=data)  # 'test' = topic
    producer.flush() 
    print("elapsed :", time.time() - start)

 


본 글은 카프카의 프로듀서가 무엇인지에 대해 간단하게 알아보았다.

 

이제 프로듀서를 알아보았으니 컨슈머에 대해 알아볼 차례다.  다음 글은 예상했겠지만 카프카의 컨슈머에 대해 알아본다.

컨슈머가 무엇인지에 대해 알아본 뒤에는 실제로 프로듀서-카프카-컨슈머를 어떻게 연동하고, 활용하는지와 같은 real life example을 다룰 예정이다.

 

 

 

참고자료

https://needjarvis.tistory.com/607

https://always-kimkim.tistory.com/entry/kafka101-producer

https://needjarvis.tistory.com/607

https://ichi.pro/ko/apache-kafka-silijeu-1bu-apache-kafka-sogae-10688303144962