[Infra & Server]

[Airflow] Airflow 개념 중요한 것들만 골라서 정리

quokkalover 2025. 5. 9. 22:57

 

지난 글에서 Airflow에서 온보딩하기 위한 학습순서, 그리고 왜 그 개념들을 공부해야 하는지 간단하게 다뤄보았다.

 

아직 Airflow에 온보딩하기 전에 이 문서를 봤다면 아래 글을 먼저 한번 보고 오는 것을 강력 추천한다!! Airflow를 처음 접하게 될 경우 봐야 할 정보가 너무 많다 보니, 실질적으로 온보딩할 때 뭐에 집중해야 하는지 감을 잡는데 도움이 되기 위해 작성한 글이다.

Airflow 온보딩 가이드: 뭐부터 배워야할지 학습 순서만 다룸

 

무튼 온보딩 가이드에 이어서, 이왕 공부한거 나중에 복습할겸 글을 적어보고자 한다. 복잡해 보이는 Airflow를 시스템 구성 요소(웹 서버, 스케줄러, Executor 등)와 워크플로우 구성 요소(DAG, Operator, Task 등)로 나누어 설명하고, 각 요소들이 어떻게 상호작용하는지 실용적인 관점에서 다뤄보겠다!

 

 

 

 

 

Airflow를 완전히 이해하려면 두 가지 관점에서 봐야 한다:

1) 시스템 아키텍처를 구성하는 요소들과
2) 워크플로우를 구성하는 개념적 요소들이다.

 

시스템 아키텍처 구성 요소

Airflow 시스템은 5가지 핵심 구성 요소로 이루어진 인프라를 갖추고 있다:

  1. 웹 서버(Web Server): DAG와 태스크를 모니터링하고 관리하기 위한 사용자 인터페이스를 제공한다. 사용자는 이 인터페이스를 통해 DAG의 실행 상태를 확인하고, 실패한 태스크를 재시도하거나, 로그를 검토할 수 있다.
  2. 스케줄러(Scheduler): DAG를 주기적으로 검사하여 실행해야 할 태스크를 결정한다. 스케줄러는 의존성 관계를 분석하여 실행 가능한 태스크를 식별하고, 이를 실행 대기열에 추가한다.
  3. 메타데이터 데이터베이스(Metadata Database): DAG, 태스크 인스턴스, 변수, 연결 정보 등 Airflow의 모든 상태 정보를 저장한다. 이 데이터베이스는 스케줄러와 웹 서버가 상태를 공유하는 중앙 허브 역할을 한다.
  4. Executor(실행기): 태스크가 어떻게 그리고 어디서 실행될지 결정한다. 이는 태스크의 분배와 자원 관리를 담당하는 핵심 구성 요소다.
  5. Workers(작업자): 실제로 태스크를 실행하는 프로세스다. 분산 설정에서는 여러 worker가 서로 다른 머신에서 동시에 태스크를 처리할 수 있다.

이 다섯 가지는 Airflow가 시스템으로서 작동하는 데 필요한 인프라 구성 요소들이다. 아키텍처를 이해해야 인프라 설계 결정을 할 때(예를 들어 확장할 때, 병렬처리가 필요해졌을 때), 장애가 발생했을 때 어디를 봐야하는지 알 수 있고 문서 읽다보면 자주 등장하는 개념이기 때문에 간단하게나마 개념이해는 해두는게 좋다.

워크플로우 구성 요소 (개념적 요소)

워크플로우를 구성하는 개념적 요소들은 실제로 사용자가 데이터 파이프라인을 설계할 때 다루는 부분이다:

  1. DAG(Directed Acyclic Graph): 전체 워크플로우의 구조를 정의한다.
  2. Operator: 앞서 자세히 설명했듯이, 이는 태스크 유형을 정의하는 템플릿/클래스다. 여기에는 표준 Operator, Sensor, TaskFlow 데코레이터(@task)가 포함된다.
  3. Task: Operator의 인스턴스로, DAG 내에서 실행될 구체적인 작업이다.
  4. TaskInstance: 특정 실행 날짜에 실행되는 태스크의 구체적인 인스턴스다.
  5. Connection & Variable: 외부 시스템 연결 정보와 설정값을 저장한다.

위 내용을 잘 이해하면 문제가 생겼을 때 더 빨리 해결할 수 있다. 예를 들어 Task가 실패했을 때, 이게 DAG 구조의 문제인지, Operator 설정의 문제인지, 아니면 Connection 문제인지 빠르게 파악할 수 있다.
또 Airflow로 할 수 있는 일과 할 수 없는 일의 경계도 더 잘 알게 된다.

DAG (Directed Acyclic Graph)

DAG(Directed Acyclic Graph)는 DAG객체가 정의된 파이썬 스크립트다.

 

왜 DAG가 중요할까? Airflow에서 어떤 작업을 하려면 반드시 DAG 파일을 만들고, 그 안에 작업 순서를 정의해야 한다. 아무리 간단한 작업이라도 예외는 없다. 그래서 Airflow의 모든 작업은 DAG 파일을 쓰는 것으로 시작된다.

 

DAG 파일은 결국 Python 스크립트인데, Airflow는 이 파일에서 특별한 구조를 찾아 해석한다. 이 파일안에 정의된 DAG 객체를 찾고, 거기에 연결된 태스크들과 그들 간의 의존성을 파악한다. 그리고 그 바탕으로 실행 계획을 세운다.

 

Airflow의 가장 편리한 점은 파일 기반 자동 감지 시스템이다. Airflow 설정에 지정된 dags 디렉토리에 DAG가 코딩된 파이썬 스크립트를 넣기만 하면, Airflow가 자동으로 이를 발견하고 몇 분내에 웹 UI에 표시한다. 파일을 수정하면 변경사항도 자동으로 반영된다.

 

간단한 DAG 코드 예시:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# DAG 정의
dag = DAG('simple_example', start_date=datetime(2023, 1, 1))

# 태스크 정의
task_a = BashOperator(task_id='task_a', bash_command='echo "A 실행"', dag=dag)
task_b = BashOperator(task_id='task_b', bash_command='echo "B 실행"', dag=dag)
task_c = BashOperator(task_id='task_c', bash_command='echo "C 실행"', dag=dag)
task_d = BashOperator(task_id='task_d', bash_command='echo "D 실행"', dag=dag)

# 실행 순서 정의: A → B,C(병렬로) → D
task_a >> [task_b, task_c] >> task_d

 

Operator

Operator는 DAG안에 무엇을 수행할지를 Python 클래스로 정의된 코드다. Airflow에서 무엇을 수행하는지 정의하는 기본 작업 단위다. 각 Operator는 자신만의 전문 영역이 있어서, 필요한 작업에 맞는 Operator를 선택해 사용하면 된다.

Operator의 세 가지 주요 카테고리

Airflow에서는 세 가지 주요 유형의 Operator가 있으며, "특정 작업을 수행하는 방법을 정의한 클래스"다. 모두 내부적으로 BaseOperator 클래스를 상속한다:

 

1. 표준 Operators

사전 정의된 작업 템플릿으로, 다양한 유형의 작업을 수행할 수 있다:

  • 액션 Operator
    • BashOperator: 개발자의 로직이 담긴 Bash 명령을 실행한다
    • PythonOperator: 개발자의 로직이 담긴 Python 함수를 실행한다
    • SQLOperator 변형: SQL 쿼리를 실행한다
    • DockerOperator: Docker 컨테이너를 실행한다
    • KubernetesPodOperator: Kubernetes Pod에서 태스크를 실행한다
  • 전송 Operator
    • 시스템 간에 데이터를 이동시킨다
    • 예: S3ToRedshiftOperator, MySqlToHiveOperator

이러한 Operator들은 각각 특정 유형의 작업에 최적화되어 있어, 코드 재사용성을 높이고 개발 시간을 단축시킨다.

 

2. Sensors(센서)

Sensors는 외부 이벤트가 발생하기를 기다리는 역할을 한다:

  • 특정 조건(예: 파일 도착, API 응답, 데이터베이스 레코드 존재)을 주기적으로 확인한다
  • 조건이 충족될 때까지 대기한다
  • 흔한 예시: FileSensor, ExternalTaskSensor, HttpSensor

Sensor는 워크플로우의 다른 부분 또는 외부 시스템과의 동기화가 필요할 때 특히 유용하다.

 

3. TaskFlow 데코레이터(@task)

TaskFlow API는 Airflow 2.0에서 도입된 기능으로, Python 함수를 직접 태스크로 변환한다:

  • @task 데코레이터로 표준 Python 함수를 태스크로 패키징한다
  • 내부적으로는 PythonOperator를 사용하지만 더 간결한 문법을 제공한다
  • 태스크 간 데이터 전달이 더 직관적이다
  • 개발자가 복잡한 Operator 구성 대신 익숙한 Python 코드를 작성할 수 있다

 

공통 기반 구조와 일관성

이러한 세 가지 유형의 Operator는 모두 Airflow의 BaseOperator 클래스에서 파생되었기 때문에:

  • 동일한 기본 인프라를 공유한다
  • 같은 방식으로 스케줄링되고 실행된다
  • 동일한 재시도 메커니즘, 로깅, 모니터링 기능을 갖는다
  • 동일한 의존성 정의 방식을 사용한다

이러한 일관성은 다양한 Operator 유형을 쉽게 조합하여 복잡한 워크플로우를 구축할 수 있게 해준다.

 

Operator와 Task의 관계

이 둘의 관계를 이해하는 것은 Airflow의 기본 구조를 파악하는 데 중요하다:

  • Operator: 작업 유형을 정의하는 템플릿/클래스다
  • Task: Operator를 인스턴스화하여 만든 구체적인 작업이다

 

즉, Operator는 클래스이고, Task는 그 클래스의 인스턴스다. 이는 객체 지향 프로그래밍에서의 클래스와 인스턴스 관계와 정확히 일치한다. DAG 정의 파일에서 Operator를 사용하여 Task를 생성하면, 스케줄러가 이를 기반으로 실제 실행 단위인 Task Instance를 생성한다.

처음에 "Task와 Operator가 같은 것 아닌가?"라고 혼동했었는데, 둘은 확실히 다르다. 이 차이를 이해하는 게 중요한데, 특히 오류가 발생했을 때 "이것이 Operator 자체의 문제인가, 아니면 특정 Task의 설정 문제인가?"를 구분할 수 있어야 한다.

  • PythonOperator라는 틀에 내 함수를 넣으면 "데이터_추출"이라는 구체적인 Task가 생긴다
  • 같은 PythonOperator로 다른 함수를 넣으면 "데이터_변환"이라는 또 다른 Task가 만들어진다

Python Operator와 Task의 예시를 들어보면 아래와 같다.

# 실제 Airflow 코드 예시
from airflow.operators.python import PythonOperator

# 수행할 실제 함수들
def extract_data():
    # 데이터 추출 로직
    print("데이터를 추출합니다")
    return {"raw_data": [1, 2, 3]}

def transform_data():
    # 데이터 변환 로직
    print("데이터를 변환합니다")
    return {"transformed_data": [2, 4, 6]}

# PythonOperator라는 '틀'에 서로 다른 함수를 넣어서 각각 다른 Task를 생성
extract_task = PythonOperator(
    task_id='데이터_추출',  # 이 이름이 Task의 이름이 됨
    python_callable=extract_data,  # 실행할 함수
    dag=dag
)

transform_task = PythonOperator(
    task_id='데이터_변환',
    python_callable=transform_data,
    dag=dag
)

만약 문제가 발생했을 때 "모든 Python 함수 실행이 안 된다"면 PythonOperator 자체의 문제(버전 호환성 문제, 의존성 문제, 권한 문제, 메모리 제한 등등)고, "데이터_추출만 실패한다"면 그 특정 Task에 사용된 함수의 문제라고 구분할 수 있다.

이 개념을 제대로 이해하면 Airflow 워크플로우를 더 유연하게 설계할 수 있고, 필요에 따라 적절한 Operator를 선택하거나 커스텀 Operator를 만들어 재사용할 수 있다.

 

실행 흐름

DAG 정의 파일에서 Operator를 사용하여 Task를 생성할 때의 실행 흐름은 다음과 같다:

  1. DAG 파일에서 Operator를 인스턴스화하여 Task를 정의한다
  2. 스케줄러가 DAG를 파싱하고 실행해야 할 Task를 식별한다
  3. 실행 시점에 스케줄러는 Operator 정의를 기반으로 Task Instance를 생성한다
  4. Executor가 이 Task Instance를 실행할 Worker를 결정한다
  5. Worker가 Task Instance를 실행하고 결과를 메타데이터 DB에 기록한다

이렇게 Operator는 DAG에서 실제 작업이 수행되는 방식을 정의하는 핵심 구성 요소로, Airflow 워크플로우의 기본 빌딩 블록 역할을 한다.

 

Executor

Executor가 무엇이고 왜 알아야 하나요?

Airflow에서 Executor는 태스크가 실제로 어떻게 실행되는지를 결정하는 구성 요소다. DAG를 설계하고 작성하는 것도 중요하지만, 이 DAG들이 실제로 어떤 방식으로 처리되는지는 Executor 설정에 따라 크게 달라진다.

Executor 선택은 다음과 같은 실질적인 문제에 영향을 미친다:

  1. 처리 속도: 동일한 DAG도 Executor에 따라 몇 분에서 몇 시간까지 실행 시간 차이가 날 수 있다.
    • 직렬 vs 병렬 처리의 근본적 차이
  2. 리소스 활용: 서버 자원을 얼마나 효율적으로 사용할 수 있는지가 결정된다.
  3. 확장성: 워크로드가 증가할 때 시스템을 어떻게 확장할 수 있는지가 달라진다.
  4. 운영 복잡성: 일부 Executor는 설정과 유지보수가 쉬운 반면, 다른 것들은 추가 인프라가 필요하다.

Airflow 설정에서는 한 가지 유형의 Executor만 설정할 수 있으며, 이 결정은 전체 시스템 구조에 영향을 미치므로 프로젝트 초기에 신중하게 선택해야 한다.

 

주요 Executor 유형과 그 용도

1. SequentialExecutor

가장 기본적인 Executor로, 한 번에 하나의 태스크만 처리한다.

  • 특징: 단일 프로세스에서 태스크를 순차적으로 실행
  • 적합한 상황:
    • Airflow 학습 단계에서 개념 이해할 때
    • 간단한 로컬 테스트 환경
    • SQLite 데이터베이스 사용 시 (SQLite는 동시 쓰기를 지원하지 않음)
  • 실제 사용 시 고려사항:
    테스트나 개발 환경 외에는 거의 사용하지 않는다. 10개의 태스크가 있다면 10번째 태스크는 앞의 9개가 모두 완료될 때까지 기다려야 하므로 전체 실행 시간이 크게 늘어난다.

2. LocalExecutor

단일 서버에서 여러 태스크를 동시에 처리할 수 있는 기본적인 병렬 처리 기능을 제공한다.

  • 특징: 로컬 머신에서 여러 Python 프로세스를 생성하여 병렬 실행
  • 적합한 상황:
    • 중소 규모 워크로드 (하루 수백~수천 개 태스크)
    • 단일 서버로 운영해야 하는 제약이 있는 환경
    • 복잡한 분산 시스템 구성 없이 합리적인 성능이 필요할 때
  • 실제 사용 시 고려사항:
    8코어 서버라면 최대 8개 태스크를 동시에 처리할 수 있어 SequentialExecutor보다 훨씬 효율적이다. 문서에 의하면 많은 스타트업이나 중소 규모 데이터 팀이 이 방식으로 시작한다. 서버 메모리와 CPU 코어 수가 동시 실행 가능한 태스크 수를 제한한다.

3. CeleryExecutor

여러 서버에 워크로드를 분산하여 높은 확장성을 제공한다.

  • 특징: Celery 큐를 사용해 여러 워커 노드에 태스크 분배
  • 적합한 상황:
    • 대규모 워크로드 (하루 수만 개 이상의 태스크)
    • 수평적 확장이 필요한 성장 중인 데이터 플랫폼
    • 고가용성이 요구되는 프로덕션 환경
  • 실제 사용 시 고려사항:여러 워커 서버를 추가하는 것만으로 처리 용량을 늘릴 수 있어 대규모 환경에 적합하다. 다만 Redis나 RabbitMQ 같은 메시지 브로커가 추가로 필요하고, 분산 시스템 운영에 따른 복잡성이 증가한다. 문서에 의하면 대부분의 대기업이나 데이터 중심 조직이 이 방식을 선택한다.

4. KubernetesExecutor

쿠버네티스 환경에서 각 태스크를 독립적인 컨테이너로 실행한다.

  • 특징: 태스크별로 격리된 Kubernetes Pod 생성
  • 적합한 상황:
    • 클라우드 네이티브 환경
    • 자원 요구사항이 태스크마다 크게 다른 경우
    • 인프라 비용 최적화가 중요한 경우
  • 실제 사용 시 고려사항:
    가장 유연한 자원 할당이 가능하며, 태스크가 필요한 만큼만 자원을 사용한다. 메모리를 많이 사용하는 태스크와 CPU를 많이 사용하는 태스크가 혼합된 워크로드에 이상적이다. Kubernetes 관리 경험이 필요하지만, 회사가 이미 Kubernetes 환경을 구축했고, 클라우드 환경이라면 가장 비용 효율적인 선택일 수 있다. 제가 속한 회사에서는 Kubernetes Executor를 사용한다.

 

스케줄러와 Executor: 태스크 실행의 이면

스케줄러와 Executor의 관계를 이해하는 것은 Airflow의 아키텍처를 제대로 파악하는 데 중요하다. 이 둘은 마치 관리자와 작업 할당 시스템의 관계와 유사하다.

스케줄러

스케줄러는 단일 구현만 존재한다. 그러나 스케줄러의 작동 방식을 제어하는 중요한 설정들이 있다. 예를 들어 동시성 제어나, hearbeat 주기 등등.. 근데 아키텍처 관점에서는 특별히 어떤 유형이 있는건 아니기 때문에 이번에는 Executor와의 관계만 다룬다.

기본 작동 원리

스케줄러는 "무엇을 언제 실행할지" 결정하고, Executor는 "어떻게 실행할지"를 담당한다. 하지만 여기서 중요한 점은:

Executor는 별도의 독립 서비스가 아니라 스케줄러 내부에서 작동하는 구성 요소다.

 

모든 Executor 유형은 스케줄러 프로세스 내에서 작동하지만, 태스크 실행 방식에 따라 두 가지 패턴으로 나눌 수 있다:

 

1. 로컬 실행 패턴 (SequentialExecutor, LocalExecutor)

로컬 Executor는 스케줄러와 같은 서버에서 태스크를 직접 실행한다:

  • 작동 방식: 스케줄러가 태스크를 실행할 때가 되면, 로컬 Executor는 해당 서버에서 직접 프로세스를 생성해 태스크를 실행한다.
  • 실제 흐름:
    1. 스케줄러가 실행할 태스크를 식별
    2. 내장된 Executor에게 태스크 전달
    3. Executor가 같은 서버에서 태스크 실행
    4. 결과가 데이터베이스에 기록됨
  • 인프라 영향: 모든 부하가 단일 서버에 집중되므로, 해당 서버의 자원(CPU, 메모리)이 전체 처리 용량을 제한한다.

 

2. 원격 실행 패턴 (CeleryExecutor, KubernetesExecutor)

원격 Executor는 태스크 실행을 다른 시스템에 위임한다:

  • 작동 방식: Executor 자체는 여전히 스케줄러 내부에 있지만, 태스크 실행 자체는 외부 워커나 시스템에 전달한다.
  • 실제 흐름:
    1. 스케줄러가 실행할 태스크를 식별
    2. 내장된 Executor가 태스크 정보를 외부 시스템에 전달
      • CeleryExecutor: 메시지 큐에 태스크 정보 전송
      • KubernetesExecutor: Kubernetes API를 호출하여 Pod 생성
    3. 외부 워커/Pod가 태스크를 실행
    4. 결과가 데이터베이스에 기록됨
  • 인프라 영향: 태스크 처리 부하가 여러 워커 노드에 분산되므로, 전체 처리 용량은 워커의 수와 각 워커의 자원에 의해 결정된다.

 

Executor와 Operator의 상호작용

Executor와 Operator는 Airflow의 태스크 실행 프로세스에서 서로 다음과 같은 단계로 상호작용한다.

  1. DAG 정의 단계:
    • 개발자는 Operator 정의를 포함하는 DAG 파일을 작성한다.
    • 각 Operator 인스턴스는 DAG 내의 태스크를 나타낸다.
    • 이는 실행 시간이 아닌 코드 정의 시간에 발생한다.
  2. 스케줄링 단계:
    • 스케줄러는 실행할 준비가 된 태스크를 결정한다.
    • DAG 구조와 일정에 따라 태스크 인스턴스를 생성한다.
    • Operator: 태스크가 수행해야 할 논리를 정의한다(예: PythonOperator, BashOperator).
    • TaskInstance: 특정 시점에서 Operator의 특정 실행을 나타낸다.
  3. 실행 단계:
    • 스케줄러는 태스크 인스턴스를 Executor에 전달한다.
    • Executor는 TaskInstance의 execute() 메서드를 실행한다.
    • Executor는 TaskInstance 자체를 생성하지 않고, 그들의 execute() 메서드가 실행되는 방식(프로세스, 스레드 또는 원격)을 관리한다.
    • 결과는 메타데이터 데이터베이스에 보고된다.

중요한 점은:

  • DAG 파일은 Operator 정의를 포함한다 (개발자가 작성)
  • Executors는 Operator 코드를 실행할 프로세스/환경을 생성한다
  • Operators는 스스로 생성되지 않는다 - Executor가 생성한 환경 내에서 실행된다

 

분산 실행 모델: Airflow의 확장 방식

Airflow에서 워크로드가 증가하면 단일 서버로는 모든 태스크를 효율적으로 처리할 수 없게 된다. 이때 분산 Executor(Celery, Kubernetes)를 사용하면 여러 머신에 작업을 분산시켜 처리 용량을 확장할 수 있다.

분산 환경의 작동 방식

분산 Airflow 환경은 크게 두 부분으로 나뉜다: 중앙 제어 시스템과 실제 작업을 수행하는 워커들이다.

중앙 제어 시스템(스케줄러+Executor)은 태스크를 관리하는 두뇌 역할을 한다. 이 시스템은:

  • DAG를 분석하고 실행할 태스크를 결정한다
  • 각 태스크를 적절한 워커에게 할당한다
  • 태스크 실행 상태를 지속적으로 모니터링한다
  • 실패한 태스크를 재시도하거나 의존성 있는 다음 태스크를 시작한다

워커 노드는 실제 작업이 일어나는 곳이다. 이 워커들은:

  • 중앙 제어 시스템으로부터 실행할 태스크 정보를 받는다
  • 해당 태스크의 Operator 코드를 로드하고 실행한다
  • 예를 들어, Python 함수를 실행하거나 SQL 쿼리를 처리한다
  • 실행 결과와 상태를 중앙 메타데이터 데이터베이스에 기록한다

이 두 부분은 다음과 같이 상호작용한다:

  1. 스케줄러가 실행할 태스크를 발견하면, Executor 컴포넌트에게 이 태스크를 넘긴다
  2. Executor는 이 태스크를 워커 풀에 배정한다:
    • CeleryExecutor: 메시지 큐(RabbitMQ, Redis)에 태스크 정보를 넣는다
    • KubernetesExecutor: 태스크를 실행할 Pod를 생성한다
  3. 워커는 할당된 태스크를 가져와 실행한다
  4. 태스크가 완료되면 결과가 중앙 메타데이터 DB에 저장된다
  5. 스케줄러는 이 결과를 확인하고 다음 태스크를 준비한다

 

분산 환경일때 신경써야 할 부분들

분산 Airflow 환경을 구축할 때 가장 어려운 부분은 일관성과 동기화다. 여러 머신에서 동일한 코드가 동일하게 작동하도록 만드는 것이 핵심 과제다.

 

실무에서는 코드 배포 파이프라인이 가장 큰 골칫거리가 된다. DAG 코드를 변경할 때마다 모든 워커에 동일하게 반영되어야 하는데, 이 과정에서 지연이나 오류가 발생하면 일부 워커는 새 코드로, 일부는 예전 코드로 실행되는 혼란스러운 상황이 벌어진다.

 

환경 일관성 문제도 쉽지 않다. 특히 데이터 과학 프로젝트에서는 새로운 패키지가 자주 추가되는데, 이를 모든 워커에 동일하게 설치하고 버전을 맞추는 일이 생각보다 복잡하다. 컨테이너화가 이 문제를 많이 해결해주지만, 그만큼 인프라 복잡도가 올라가는 트레이드오프가 있다.

결국 소규모 팀에서는 단일 서버 설정의 단순함이 주는 이점이 크고, 대규모 팀에서는 분산 환경의 복잡성을 감수하더라도 확장성을 확보하는 것이 중요하다.

 

Airflow의 대규모 데이터 처리 전략

(1) 동적 태스크 매핑

동적 태스크 매핑은 Airflow 2.3에서 도입된 기능으로, 데이터 항목 목록에 대해 동일한 작업을 병렬로 수행할 수 있게 해준다. 이는 .expand() 메서드를 통해 구현되며, 런타임에 결정되는 데이터에 따라 태스크를 자동으로 생성한다.

이 기능이 등장하기 전에는 두 가지 비효율적인 방법만 있었다: 1) 모든 항목에 대해 하드코딩된 태스크를 미리 만들거나, 2) 하나의 태스크에서 모든 항목을 처리하는 방법이다. 동적 태스크 매핑은 코드 중복 없이 개별 태스크의 모니터링 장점을 제공한다.

규모별 처리 전략

데이터 처리 규모에 따라 최적의 접근 방식이 달라진다:

 

소규모 데이터셋 (수십~수백 항목):
각 항목마다 개별 태스크를 생성하는 것이 이상적이다. 개별 처리 과정을 모니터링하고 문제가 생긴 항목만 재시도할 수 있다.

# 소규모 데이터셋 처리 방식
file_list = get_files_to_process()  # 100개 정도의 파일
process_task = process_file.expand(file_path=file_list)

 

중규모 데이터셋 (수천~수만 항목):
항목들을 적절한 크기의 배치로 그룹화하고, 각 배치를 처리하는 태스크를 생성한다. 이는 태스크 수를 관리 가능한 수준으로 유지하면서도 병렬 처리를 가능하게 한다.

# 중규모 데이터셋 배치 처리 방식
all_items = get_many_items()  # 2만 개의 항목
batches = create_batches(all_items, batch_size=1000)  # 20개 배치로 그룹화
process_task = process_batch.expand(batch=batches)

시스템 부하를 고려한 배치 크기 선택이 중요하다. 실무에서는 처음에 작은 데이터셋으로 잘 작동하던 파이프라인이 데이터가 늘어나면서 문제가 생기는 경우가 많다. 스케줄러 메모리 사용량과 메타데이터 DB 부하를 모니터링하면서 적절한 배치 크기를 조정해야 한다.

 

(2) 외부 처리 시스템과의 통합

 

대규모 데이터셋 (수십만 항목 이상):
이 규모에서는 Airflow를 데이터 처리보다 오케스트레이션에 집중하고, 실제 병렬 처리는 Spark나 Dask 같은 전문 시스템에 위임하는 것이 효과적이다.

# Spark를 활용한 대규모 처리
spark_submit = SparkSubmitOperator(
    task_id='process_large_dataset',
    application='/path/to/spark_job.py',
    conf={'spark.executor.instances': '50'}
)

이런 접근 방식의 장점은 각 도구의 강점을 활용한다는 점이다. Airflow는 워크플로우 의존성 관리, 스케줄링, 재시도 로직을 담당하고, Spark는 대규모 데이터셋의 효율적인 분산 처리를 담당한다. 이는 시스템 리소스를 최적화하고 각 도구가 설계된 목적에 맞게 사용하는 방식이다.

 

실행 환경별 고려사항

 

KubernetesExecutor 환경:


KubernetesExecutor를 사용할 때는 각 태스크가 별도의 Pod로 실행된다. 많은 수의 Pod를 동시에 생성하면 Kubernetes API 서버에 과부하가 발생할 수 있으므로, 동적매핑을 수행할때는 배치 처리 전략이 더욱 중요해진다.

Kubernetes 환경에서도 태스크당 자원 할당을 최적화할 수 있다. 메모리 집약적인 작업과 CPU 집약적인 작업에 각각 다른 자원 요청을 설정하여 클러스터 활용도를 높일 수 있다.

대규모 데이터 처리의 핵심은 결국 "적절한 도구를 적절한 작업에 사용하는 것"이다. Airflow의 강점은 복잡한 워크플로우 관리에 있으며, 데이터 규모가 증가함에 따라 처리 방식을 진화시켜 이 강점을 최대한 활용해야 한다.