[Kafka - step 7] 카프카 실습 (도커로 직접 카프카 실행해보기)
현재 카프카 정리 시리즈를 포스팅 하고 있다. 카프카 정리 시리즈에서 다루는 여러가지 주제가 궁금할 경우 본 글을 참고하길 바란다.
앞 글에서 카프카가 실제로 어떻게 활용되는지에 대해 알아보았다.
본 글에서는 카프카를 직접 도커를 활용하여 직접해보기 위해 예제를 아주 잘 정리해준 글을 번역하고, 일부 수정한 글이다.
Setup - 실행환경 준비
먼저 아파치 카프카 도커 컨테이너를 실행해보자.
Confluent의 도커 이미지르 사용할 수도 있지만 필요 메모리가 8GB가 넘기 때문에 wurstmeister의 이미지를 fork한 내 개인 레포를 사용한다.
cd <your-directory>
git clone https://github.com/getveryrichet/kafka-docker.git
cd kafka-docker
카프카와 주키퍼 도커 컨테이너를 실행한다.
docker-compose up -d
docker_compose.yml에서 KAFKA_*
로 적힌 config들은 Kafka와 Zookeeper를 도커 컨테이너 밖에서 실행되는 Producer와 Consumer를 연결하는데 사용된다.
컨테이너가 정상 동작하는지 확인한다
docker-compose ps
파이썬으로 producer와 consumer를 구현할 것이기 때문에 Python이 카프카 라이브러리를 설치하자
pip install kafka-python
참고로 본 예제에서는 별도로 토픽을 생성하진 않을건데, 카프카에서는 브로커의 세팅에 auto.create.topics.enable
를 true로 설정해주어 토픽이 생성돼있지 않아도 처음 메시지가 전송되면 기본세팅으로 토픽을 자동 생성해주도록 했다.
Producer & Consumer실습 코드
코드는 아래 레포를 참고하길 바란다.
https://github.com/getveryrichet/kafka-docker/tree/master/examples/simple_example
Producer
from datetime import datetime
import json
from kafka import KafkaProducer
import random
import time
import uuid
EVENT_TYPE_LIST = ['buy', 'sell', 'click', 'hover', 'idle_5']
producer = KafkaProducer(
value_serializer=lambda msg: json.dumps(msg).encode('utf-8'), # we serialize our data to json for efficent transfer
bootstrap_servers=['localhost:9092'])
TOPIC_NAME = 'events_topic'
def _produce_event():
"""
Function to produce events
"""
# UUID produces a universally unique identifier
return {
'event_id': str(uuid.uuid4()),
'event_datetime': datetime.now().strftime('%Y-%m-%d-%H-%M-%S'),
'event_type': random.choice(EVENT_TYPE_LIST)
}
def send_events():
while(True):
data = _produce_event()
time.sleep(3) # simulate some processing logic
print("sending", data)
result = producer.send(TOPIC_NAME, value=data)
# print("result", result.__dict__)
if __name__ == '__main__':
send_events()
위 예시는 랜덤으로 유저의 액션을 생성해서 토픽으로 전송하느 프로듀서의 예시다.
consumer
import json
from kafka import KafkaConsumer
TOPIC_NAME = 'events_topic'
consumer = KafkaConsumer(
TOPIC_NAME,
auto_offset_reset='earliest', # where to start reading the messages at
group_id='event-collector-group-1', # consumer group id
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')) # we deserialize our data from json
)
def consume_events():
for m in consumer:
# any custom logic you need
print("received", m.value)
if __name__ == '__main__':
consume_events()
프로듀서와 컨슈머 실행
자 이제 프로듀서와 컨슈머를 구현했으니 한번 각각 실행해보자.
cd {git repo}/examples/simple_example
python producer.py
python consumer.py
종료
pkill -f producer.py
pkill -f consumer.py
kafka에서 직접 확인
앞서 실행한 프로듀서를 통해 토픽이 진짜 생성됐는지 확인해보자.
docker exec -it $(docker ps -aqf "name=kafka-docker_kafka") bash # get inside the Kafka container
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 # list all the topics in this Kafka cluster
# you will see the topics
# __consumer_offsets
# events_topic
생성됐다!!
토픽 메시지 확인
생성된 토픽에 앞서 프로듀서가 전송한 메시지들이 잘 저장됐는지 확인해보자
# view messages stored in the `events_topic` topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic events_topic --from-beginning
exit # exit the container
자, 이제 직접 도커를 활용해 카프카를 실행해서, 토픽을 생성하고 프로듀서와 컨슈머를 직접 연동해보았다.
이제 Ansible와 같은 툴을 활용해 AWS 인스턴스에 카프카 클러스터를 직접 구성해보는 등을 시도해볼 수 있겠다.
필자가 읽었던 책에서 해당 내용에 대해 자세히 다루고 있으니, 본 글이 재밌었다면 <실전 카프카 개발부터 운영까지>를 한번 읽어보기를 추천한다.
참고자료
https://www.startdataengineering.com/post/what-why-and-how-apache-kafka/