분산 시스템/Kafka

[Kafka - step 7] 카프카 실습 (도커로 직접 카프카 실행해보기)

quokkalover 2022. 4. 4. 00:28

현재 카프카 정리 시리즈를 포스팅 하고 있다. 카프카 정리 시리즈에서 다루는 여러가지 주제가 궁금할 경우 본 글을 참고하길 바란다.

 

 

앞 글에서 카프카가 실제로 어떻게 활용되는지에 대해 알아보았다.

 

본 글에서는 카프카를 직접 도커를 활용하여 직접해보기 위해 예제를 아주 잘 정리해준 을 번역하고, 일부 수정한 글이다.

 

Setup - 실행환경 준비

먼저 아파치 카프카 도커 컨테이너를 실행해보자.

 

Confluent의 도커 이미지르 사용할 수도 있지만 필요 메모리가 8GB가 넘기 때문에 wurstmeister의 이미지를 fork한 내 개인 레포를 사용한다.

cd <your-directory>
git clone https://github.com/getveryrichet/kafka-docker.git
cd kafka-docker

 

카프카와 주키퍼 도커 컨테이너를 실행한다.

docker-compose up -d

docker_compose.yml에서 KAFKA_* 로 적힌 config들은 Kafka와 Zookeeper를 도커 컨테이너 밖에서 실행되는 Producer와 Consumer를 연결하는데 사용된다.

 

 

컨테이너가 정상 동작하는지 확인한다

docker-compose ps

 

 

파이썬으로 producer와 consumer를 구현할 것이기 때문에 Python이 카프카 라이브러리를 설치하자

pip install kafka-python

 

참고로 본 예제에서는 별도로 토픽을 생성하진 않을건데, 카프카에서는 브로커의 세팅에 auto.create.topics.enable를 true로 설정해주어 토픽이 생성돼있지 않아도 처음 메시지가 전송되면 기본세팅으로 토픽을 자동 생성해주도록 했다.


 

Producer & Consumer실습 코드

코드는 아래 레포를 참고하길 바란다.

https://github.com/getveryrichet/kafka-docker/tree/master/examples/simple_example

 

GitHub - getveryrichet/kafka-docker: Dockerfile for Apache Kafka

Dockerfile for Apache Kafka. Contribute to getveryrichet/kafka-docker development by creating an account on GitHub.

github.com

Producer

from datetime import datetime
import json
from kafka import KafkaProducer
import random
import time
import uuid

EVENT_TYPE_LIST = ['buy', 'sell', 'click', 'hover', 'idle_5']

producer = KafkaProducer(
   value_serializer=lambda msg: json.dumps(msg).encode('utf-8'), # we serialize our data to json for efficent transfer
   bootstrap_servers=['localhost:9092'])

TOPIC_NAME = 'events_topic'

def _produce_event():
    """
    Function to produce events
    """
    # UUID produces a universally unique identifier
    return {
        'event_id': str(uuid.uuid4()),
        'event_datetime': datetime.now().strftime('%Y-%m-%d-%H-%M-%S'),
        'event_type': random.choice(EVENT_TYPE_LIST)
    }

def send_events():
    while(True):
        data = _produce_event()
        time.sleep(3) # simulate some processing logic
        print("sending", data)
        result = producer.send(TOPIC_NAME, value=data)
        # print("result", result.__dict__)

if __name__ == '__main__':
    send_events()

위 예시는 랜덤으로 유저의 액션을 생성해서 토픽으로 전송하느 프로듀서의 예시다.

consumer

import json
from kafka import KafkaConsumer

TOPIC_NAME = 'events_topic'

consumer = KafkaConsumer(
    TOPIC_NAME,
    auto_offset_reset='earliest', # where to start reading the messages at
    group_id='event-collector-group-1', # consumer group id
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) # we deserialize our data from json
)

def consume_events():
    for m in consumer:
        # any custom logic you need
        print("received", m.value)

if __name__ == '__main__':
    consume_events()

프로듀서와 컨슈머 실행

자 이제 프로듀서와 컨슈머를 구현했으니 한번 각각 실행해보자.

cd {git repo}/examples/simple_example

python producer.py 
python consumer.py

종료

pkill -f producer.py
pkill -f consumer.py

 

kafka에서 직접 확인

앞서 실행한 프로듀서를 통해 토픽이 진짜 생성됐는지 확인해보자.

docker exec  -it $(docker ps -aqf "name=kafka-docker_kafka") bash # get inside the Kafka container
$KAFKA_HOME/bin/kafka-topics.sh --list  --bootstrap-server kafka:9092 # list all the topics in this Kafka cluster

# you will see the topics
# __consumer_offsets
# events_topic

생성됐다!!


토픽 메시지 확인

생성된 토픽에 앞서 프로듀서가 전송한 메시지들이 잘 저장됐는지 확인해보자

# view messages stored in the `events_topic` topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic events_topic --from-beginning

exit # exit the container

 


자, 이제 직접 도커를 활용해 카프카를 실행해서, 토픽을 생성하고 프로듀서와 컨슈머를 직접 연동해보았다.

 

이제 Ansible와 같은 툴을 활용해 AWS 인스턴스에 카프카 클러스터를 직접 구성해보는 등을 시도해볼 수 있겠다.

필자가 읽었던 책에서 해당 내용에 대해 자세히 다루고 있으니, 본 글이 재밌었다면 <실전 카프카 개발부터 운영까지>를 한번 읽어보기를 추천한다.

 

 

 

 

 

참고자료

https://www.startdataengineering.com/post/what-why-and-how-apache-kafka/