쿼카러버의 기술 블로그

[Kafka - step 5] 컨슈머란? (Kafka Consumer) (컨슈머 그룹, 오프셋, 리밸런싱, 주요 옵션 등) 본문

분산 시스템/Kafka

[Kafka - step 5] 컨슈머란? (Kafka Consumer) (컨슈머 그룹, 오프셋, 리밸런싱, 주요 옵션 등)

quokkalover 2022. 4. 4. 00:12

현재 카프카 정리 시리즈를 포스팅 하고 있다. 카프카 정리 시리즈에서 다루는 여러가지 주제가 궁금할 경우 본 글 을 참고하길 바란다.

본 글은 컨슈머 개념을 이해하기 위해 작성했다. 컨슈머에 대한 개념과 구현예시를 중점으로 다룬다.

 

Consumer란?

토픽의 파티션에 저장되어 있는 메시지를 소비하는 역할을 하는 애플리케이션이나 서버를 컨슈머라고 부른다.

컨슈머가 단순하게 카프카로부터 메시지만 가져오는 것 같지만 내부적으로는 컨슈머 그룹, 리밸런싱 등 여러 동작을 수행한다.

컨슈머는 데이터를 요청할 때 리더 파티션을 가지고 있는 카프카 브로커와 통신한다.

Consumer 동작 방식

프로듀서가 카프카의 토픽으로 메시지를 전송하면 해당 메시지들은 브로커들의 로컬 디스크에 저장된다. 그리고 컨슈머는 토픽에 저장된 메시지를 가져온다. 컨슈머의 동작 방식에 대해 이해하기 위해서는 먼저 컨슈머 그룹이라는 개념에 대해 먼저 알아야 할 필요가 있다.

Consumer Group

컨슈머 그룹은 하나 이상의 컨슈머들이 모여있는 그룹을 의미한다. 컨슈머는 반드시 컨슈머 그룹에 속하게 된다. 그리고 이 컨슈머 그룹은 각 파티션의 리터에게 카프카 토픽에 저장된 메시지를 가져오기 위한 요청을 보낸다.

 

컨슈머가 특정 파티션의 데이터를 소비하기 위한 개념이라고 한다면, 컨슈머 그룹은 전체의 데이터 소비 관리를 위해 사용된다. 이를 요약하면 컨슈머 그룹은 토픽을 소비하고, 컨슈머는 파티션을 소비한다고 생각할 수 있다.

 

컨슈머 그룹의 목적은 크게 두 가지로 생각할 수 있다.

  • 가용성 : 특정 컨슈머가 장애가 발생하면 다른 컨슈머가 그 작업을 대체함을 통해 안정성을 확보
  • offset 관리 : 동일한 토픽을 구독하는 여러 컨슈머 그룹이 컨슘하더라도, 컨슈머 그룹별로 오프셋을 관리할 수 있도록 함

컨슈머 그룹 예제

이 때 파티션수와 컨슈머수는 일대일로 매핑되는 것이 이상적이다. 물론 파티션 수와 컨슈머 수의 비율을 반드시 일대일로 매핑해야 하는 것은 아니지만, 파티션 수보다 컨슈머 수가 많게 구현하는 것은 바람직한 구성이 아니다. 컨슈머 수가 파티션 수보다 더 많다고해서 더 빠르게 토픽의 메시지를 가져오거나 처리량이 높아지는 것이 아니라 더 많은 수의 컨슈머들이 그냥 대기 상태로만 존재하기 때문이다.

 

간혹 액티브 / 스탠바이 개념으로 추가 컨슈머가 더 있으면 좋을 것이라 생각할 수 있지만, 컨슈머 그룹 내에서 리밸런싱 동작을 통해 장애가 발생한 컨슈머의 동일한 그룹에 있는 다른 컨슈머가 그 역할을 대신 수행하기 때문에 굳이 장애 대비를 위한 추가 컨슈머 리소스를 할당하지 않아도 된다.

카프카가 이와 같은 제약을 만들게 된 이유는 파티션 내에서의 순서보장을 위해서다. 카프카는 토픽 수준에서의 순서 보장은 지원하지 않는다. 카프카 프로듀서가 첫 번째 메시지를 1번 파티션에 전송하고 두 번째 메시지를 Partiton 3에 전송했을대, Partition1과 Partition3을 소비하는 컨슈머가 같지 않을 수 있기 때문이다.

 

하지만 카프카는 같은 파티션에서의 순서는 보장한다. 즉 프로듀서가 Partition 1으로 첫 번째, 두 번째 메시지를 순차적으로 전송했다고 하면 컨슈머에서도 첫 번째, 두 번째 메시지 순으로 소비됨이 보장된다.

이러한 특성을 이용해 순서가 상관없는 메시지들은 서로 다른 파티션으로 전송하여 처리량과 고가용성을 도모하고, 순서가 중요한 메시지는 키 값과 필요하다면 파티셔너를 이용해 한 파티션으로 몰아서 전송할 수 있다.


다중 컨슈머 그룹

카프카의 큰 장점 중 하나가 다중 컨슈머 그룹을 지원한다는 것이다. 일반적인 메시지 큐는 한 컨슈머가 읽어가면 다른 컨슈머는 그 다음 메시지를 읽어간다.

다중 컨슈머 그룹

하지만 카프카는 컨슈머 그룹 단위로 다른 컨슈머 그룹과는 독립적으로 데이터를 읽어갈 수 있다. 위 그림에서 Consumer Group A와 Consumer Group B는 Topic A의 데이터를 소비할 수 있고 서로에게 간섭하지 않는다. 물론 두 컨슈머 그룹의 컨슈머 구성이 다르기 때문에 두 컨슈머 그룹의 처리량도 다를 것이다. 하지만 한 쪽이 느리게 읽어간다고 해서 다른 한 쪽에 영향을 주지는 않는다. 각 컨슈머 그룹엔 남 신경쓰지 않고 자신이 읽어야 할 메시지를 읽기 때문이다.


컨슈머 오프셋 관리

컨슈머 동작 중 가장 핵심은 오프셋 관리다. 컨슈머는 카프카에 저장된 메시지를 꺼내오는 역할을 하기 때문에, 컨슈머가 메시지를 어디까지 가져왔는지를 표시하는 것이 매우 중요하다. 예를 들어 코드 배포로 인해 컨슈머가 일시적으로 동작을 멈추고 재시작하거나, 컨슈머가 구동죽인 서버에서 문제가 발생해 새로운 컨슈머가 기존 컨슈머의 역할을 대신하는 경우에 기존 컨슈머의 마지막 메시지 위치부터 새로운 컨슈머가 메시지를 가져올 수 있어야 장애로부터 빠르게 복구될 수 있기 때문이다.

카프카에서는 메시지의 위치를 나타내는 위치를 오프셋이라고 부르는데, 이 오프셋은 숫자 형태로 나타낸다. 컨슈머 그룹은 자신의 오프셋 정보를 카프카에서 가장 안전한 저장소인 토픽에 저장한다. __consumer_offsets라는 토픽이 있는데, 이 토픽에 각 컨슈머 그룹별로 오프셋 위치정보가 기록된다. 즉 해당 토픽을 consume 하고 있으면 어떤 Consumer group id가 어느 offset 까지 처리했는지 알 수 있다


리밸런싱

컨슈머들은 하나의 컨슈머 그룹 안에 속해있고, 그룹 내의 컨슈머들은 서로의 정보를 공유한다. 컨슈머 그룹 내의 컨슈머들은 언제든지 자신이 속한 컨슈머 그룹에서 떠날 수 있고, 새로운 컨슈머가 합류할 수도 있다. 따라서 컨슈머 그룹은 이러한 변화를 인지하고 각 컨슈머들에게 작업을 균등하게 분배해야 한다.

 

컨슈머 그룹에서 각 컨슈머들에게 작업을 균등하게 분배하는 동작을 컨슈머 리밸런싱이라고 부른다. 예를 들어 컨슈머 1이 문제가 생겨 종료됐다면 컨슈머 2또는 컨슈머 3이 컨슈머 1이 하던 일을 대신해 토픽의 파티션 0을 컨슘하게 된다. 그리고 이 컨슈머 그룹 관리를 위해 별도의 코디네이터가 존재하는데 이를 카프카에서는 그룹 코디네이터라고 부른다.

그룹코디네이터는 각 컨슈머 그룹별로 존재하며, 이러한 그룹 코디네이터는 카프카 클러스터 내의 브로커 중 하나에 위치한다. 컨슈머 그룹 코디네이터는 아래 정보에 대해 변경이 발생하면 리밸런싱을 실시한다

  • 컨슈머 그룹의 멤버십 변화 : 컨슈머 그룹 내에 컨슈머가 제외되거나 추가됐을 경우
    • 컨슈머는 폴링하거나 커밋할때 Heartbeat를 그룹 코디네이터에 전달하는데, 일정 기간 동안 하트비트를 받지 못하면 작업이 불가한 것으로 판단하고 컨슈머의 파티션 소유권을 다른 컨슈머로 이관한다.
  • 새로운 파티션의 추가 혹은 변경 : 컨슈머 그룹이 구독하고 있는 토픽의 파티션이 추가 혹은 변경 된 경우
    • 파티션이 증가하거나 re-assign으로 변경이 발생하면 파티션에 대한 소유권을 재조정한다.

리밸런싱 작업이 일어나는 동안 컨슈머들은 일시 중지하므로 오버헤드가 큰 편이다. 컨슈머 그룹내의 모든 컨슈머의 읽기 작업이 중단되기 때문에 컨슈머 측의 일시적인 서비스 중단이 발생할 수 있다. 컨슈머 그룹 내에 소수의 컨슈머가 있다면 대수롭지 않게 넘어갈 수 있지만, 10개의 컨슈머가 있다면 최소 10번 이상의 리밸런싱이 발생하고 10번 이상의 컨슈머 중지가 일어난다.

 

스태틱 멤버십

대용량 메시지들을 처리하는 컨슈머 그룹이라면, 리밸런싱 동작으로 인해 원래 상태를 복구하는 데 상당한 시간이 소요될 수 있다. 원래는 매번 새로운 컨슈머로 인식해 새로운 엔티티 ID가 부여된다. 핮디만 컨슈머 설정 변경이나 소프트웨어 업데이트 등의 예측가능한 사유로 컨슈머가 재시작될 때와 같은 원인으로 인한 불필요한 리밸린싱이 발생하는 경우를 방지하기 위해 스태틱 멤버십이라는 기능이 있다. 스태틱 멤버십이란 컨슈머 그룹 내에서 컨슈머가 재시작 등으로 그룹에서 나갔다가 다시 합류하더라도 리밸런싱이 일어나지 않게 한다. 즉 컨슈머마다 인식할 수 있는 ID를 적용함으로써 다시 합류하더라도 코디네이터가 기존 구성원임을 인식할 수 있게 하는 것이다.

 

이 외에도 하트비트 전송 주기(heartbeat.interval.ms)등을 조절한다든지, 리밸런싱의 동작 방식 또한 알아두면 매우 좋은 정보이기는하나, 해당 내용은 추후 별도의 글로 다루도록 하겠다.

 


컨슈머 파티션 할당 전략

컨슈머는 자신의 토픽의 어느 파티션으로부터 레코드를 읽어올지 결정한다. 컨슈머 그룹의 리더 컨슈머가 정해진 파티션 할당 전략에 따라 각 컨슈머와 대상 토픽의 파티션을 매칭시킨다. 파티션 할당 전략은 컨슈머 옵션의 partition.assignment.strategy로 표시하고, RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor라는 총 네가지를 제공한다.

각 전략들의 요약은 아래와 같다.

  • RangeAssignor : 파티션 할당 전략의 기본값으로, 토픽별로 할당 전략을 사용함. 동일한 키를 이용하는 2개 이상의 토픽을 컨슘할 때 유용함
  • RoundRobinAssignor : 사용가능한 파티션과 컨슈머들을 라운드 로빈으로 할당함. 균등한 분배 가능
  • StickyAssignor : 컨슈머가 컨슘하고 있는 파티션을 계속 유지
  • CooperativeStickyAssignor : 스티키 방식과 유사하지만 전체 일시정지가 아닌 연속적인 재조정 방식 사용

컨슈머의 주요 옵션

  • bootstrap.servers : 브로커의 정보를 입력
  • fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 크기. 지정한 크기보다 작을 경우 요청에 응답하지 않고 데이터가 누적될때까지 기다린다
  • fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간
  • group.id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자. 동일한 그룹 내의 컨슈머 정보는 모두 공유된다.
  • heartbeat.interval.ms : 컨슈머가 active한지 확인하는 주기. session.timeout.ms와 밀접한 관계가 있으며, session.timeout.ms보다 낮은 값으로 설정해야 한다. 일반적으로 sesion.timeout.ms의 1/3으로 설정한다
  • session.timeout.ms : 컨슈머가 종료된 것인지를 판단하는 시간. 컨슈머는 주기적으로 하트비트를 보내야 하고, 이 시간 전가지 하트비트를 보내지 않았으면 해당 컨슈머는 종료된 것으로 간주
  • max.partition.fetch.bytes : 파티션당 가져올 수 있는 최대 크기
  • enable.auto.commit : 백그라운드로 주기적으로 오프셋을 커밋
  • auto.offset.reset : 카프카에 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우에 다음 옵션으로 reset한다
    • earliest : 가장 초기의 오프셋값으로 설정
    • latest : 가장 마지막의 오프셋값으로 설정
    • none : 이전 오프셋값을 찾지 못하면 에러
  • fetch.max.bytes : 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기
  • group.instance.id : 컨슈머의 고유한 식별자. 만약 설정하면 static 멤버로 간주되어, 불필요한 리밸런싱을 하지 않는다.
  • isolation.level : 트랜잭션 컨슈머에 사용되는 옵션으로 read_uncommited가 기본값이고, read_committed는 트랜잭션이 완료된 메시지만 읽는다
  • max.poll.records : 한번의 poll()요청으로 가져오는 최대 메시지 수
  • partition.assignment.strategy : 파티션 할당 전략이며 기본값은 range다
  •  

카프카는 파티션내의 메시지 순서만 보장

카프카는 각각의 파티션에 대해서만 순서를 보장한다.

1개의 파티션만 있는 경우에는 프로듀서가 보낸 순서대로 데이터를 가져올 수 있지만,

파티션이 여러개인 경우에는 프로듀서가 보낸 순서대로 메시지를 가져올 수 없다. (분산처리 / 병렬 처리 이기때문)


 

본 글에서는 카프카의 필수 구성요소인 컨슈머에 대해 다루었다. 

 

다음 글은 실제로 프로듀서-카프카-컨슈머를 어떻게 연동하고, 활용하는지와 같은 real life example을 다룰 예정이다.

 

 

 

 

 

참고자료

https://yearnlune.github.io/general/kafka/#topic

https://always-kimkim.tistory.com/entry/kafka101-consumer-rebalance

Comments