[MLOps] Apache Airflow 기능 및 용어 정리

Apache Airflow의 핵심 구성 요소인 DAG, Operator 등을 활용하여 복잡한 데이터 파이프라인을 자동화하는 방법을 소개합니다. 파이썬 코드로 워크플로우를 손쉽게 관리할 수 있습니다.

[MLOps] Apache Airflow 기능 및 용어 정리
Photo by Andrey Matveev / Unsplash

개요

24일 일요일 운좋게 Airflow 원데이 클래스를 확인하여 수강했습니다.

배웠던 이론들과 실습 코드들을 필요한만큼 정리하였습니다.

이번 주 8월 24일(일) 진행되는 Airflow 무료 특강에 무려 110분이 신청해 주셨습니다. 🙌 링크드인 포스트를 보고 신청해 주신 분들도 많을 텐데, 진심으로 감사드립니다. 🙏 사전 질문도 20개 이상 들어와 오늘부터 차근차근 답변을 달며 준비하고 있습니다. 코드잇 정규 강의에서는 2~3일에 걸쳐 강의하던 내용을 4시간에 압축해서… | Hyunsoo (Ryan) Lee | 10 comments
이번 주 8월 24일(일) 진행되는 Airflow 무료 특강에 무려 110분이 신청해 주셨습니다. 🙌 링크드인 포스트를 보고 신청해 주신 분들도 많을 텐데, 진심으로 감사드립니다. 🙏 사전 질문도 20개 이상 들어와 오늘부터 차근차근 답변을 달며 준비하고 있습니다. 코드잇 정규 강의에서는 2~3일에 걸쳐 강의하던 내용을 4시간에 압축해서 전달하려고 하다 보니 어떻게 하면 효율적인 강의가 가능할까 하는 고민도 생기고, 더 많은 내용들을 알려드리고 싶은 욕심도 생기네요.🤔(PPT 슬라이드가 75장.....!🤯) 남은 기간 동안 꼼꼼히 준비해서 여러분의 주말 4시간이 충분히 보람찬 시간이 될 수 있도록 하겠습니다. 💡 질문에 대한 답변은 잘 정리해서 강의 이후 링크드인에도 공유할 수 있도록 할게요! | 10 comments on LinkedIn

Apache Airflow 용어 정리

DAG(Directed Acyclie Graph)

  • 자신으로 되돌아오는 경로가 없는 그래프
  • 작업이 다른 작업에 의존하거나 특정 순서로 실행 → 방향성
  • 무한 루프 방지를 위해 강제

Airflow에서 하나의 온전한 파이프라인 스크립트를 DAG라고 칭한다. → 순환되는 구조가 없어야한다

Operator

  • 작업 파이프라인 안에서 하나의 요소(Task)를 정의하고 실행하는 단위
  • 1,000개가 넘는 오퍼레이터를 조합하여 작업을 실행
  • 목적에 맞게 오퍼레이터를 만들 수 있음

XCom

  • 작업간 데이터나 리턴 값을 이후 작업에 전달하기 위한 메커니즘
  • Airflow에는 문법에 맞게 코드를 작성해야함
  • 동일한 DAG 내에서만 데이터 공유가 가능
  • 소규모 데이터를 공유하도록 권장
  • Python Operator의 반환 값은 자동으로 XCom에 저장

Sensor

  • 특정 조건이 충족될 때 까지 기다리는 오퍼레이터
  • 조건이 충족되면 다음 작업을 이어서 진행하며 사용자 정의에 따른 대기 상태를 유지
  • 다양한 센서 메소드 뿐만 아니라 사용자 정의 센서 개발 가능

Connection

  • Airflow가 외부 서비스와 통신하기 위한 URL, 사용자 인증 정보 등을 저장하는 설정
  • Hook에서 Connection 정보를 사용하여 외부 시스템과 연결을 수행
  • 다양한 도메인과 연결에 필요한 정보 저장

Jinja Template

  • 작업시 동적으로 값을 치환할 수 있게 해주는 템플릿 기능
  • 컨텍스트 예시
    • ds → 실행 날짜(2025-08-17)
      • {{ ds }}

Hook

  • 오퍼레이터 외부 시스템과 Connection 객체를 생성할 수 있도록 지원하는 인터페이스
  • Operator에서 사용하는 것보다 서비스들에 대한 자유도를 높히는 방안
    • 커스텀 오퍼레이트 방안

Cron 표현식

  • 작업 스케줄러

Variable

  • 전역 변수 저장소

Directory 구조

  • dags: 파이썬으로 개발한 DAG(Directed Acyclic Graph) 스크립트 파일들이 저장되는 디렉토리입니다. Airflow 스케줄러는 이 디렉토리를 주기적으로 스캔하여 DAG를 인식하고 실행합니다.
  • logs: DAG 파싱 및 파이프라인 작업 실행과 관련된 모든 로그가 저장되는 디렉토리입니다. 작업 실패 시 원인 분석을 위해 주로 사용됩니다.
  • config: Airflow의 핵심 설정 파일(airflow.cfg)과 사용자 정의 설정 파일(예: 데이터베이스 연결 정보, Access Key 등)이 위치하는 디렉토리입니다.
  • plugins: 공통으로 사용되는 함수, 클래스, 후크(Hook), 오퍼레이터(Operator) 등 모듈화된 파이프라인 코드를 저장하는 디렉토리입니다. 이곳에 저장된 코드는 여러 DAG에서 재사용할 수 있습니다.
  • .env: Airflow 인프라 실행에 필요한 환경 변수들을 KEY=VALUE 형태로 정의하는 설정 파일입니다. 주로 데이터베이스 비밀번호나 API 키와 같은 민감한 정보를 관리하는 데 사용됩니다.

Console

Notion Image

Airflow Architecture

Notion Image
  • 이미지 대체

DAG 스크립트 구조

  • DAG 객체 및 공통 정보 정의
    • default_args
      • 모든 Task들이 공통적으로 가지는 파라미터
        • 재시도, 알림 등..
    • DAG()
      • DAG 자체의 특성과 동작을 정의하는 파라미터
        • 언제 어떻게 실행되는지 같은 워크플로우 자체의 설정값
  • 구체적인 Task 정의
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

default_args = dict(
    owner = 'bda', # 개별 DAG 관리자
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )

with DAG(
    dag_id="01_tutorial_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *", # cron 표현식
    tags = ['20250824', 'BASIC'],
    default_args = default_args,
    catchup=False
):

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4")
    task5 = EmptyOperator(task_id="task5")

# task1 >> task2 >> task3 >> task4 >> task5
task1 >> [task2, task3] >> task4 >> task5
Notion Image
  • Task간 실행 순서 설정
    • @task 데코레이터를 통해 훨씬 더 직관적인 방식으로 Task를 구성할 수 있음.
import pendulum, random
from airflow.sdk import DAG, task
"""
🔸 TaskFlow API
    - 파이썬 함수를 데코레이터로 감싸서 태스크로 변환하는 방식
    - 함수의 반환값이 다음 태스크 함수의 인자로 자동 전달되어 데이터 흐름을 간단히 표현 가능
    - AIRFLOW의 의존성 설정(task1 >> task2) 방식 대신, 함수 호출만으로 태스크 간 종속성 정의 가능
    - Pythonic한 방식으로 코드 작성이 가능하기 때문에 PythonOperator 대신 사용하는 것을 권장!
"""

default_args = dict(
    owner = 'bda',
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )

with DAG(
    dag_id="03_python_taskflow_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *",
    tags = ['20250824','BASIC'],
    default_args = default_args,
    catchup=False
):
    @task(task_id='select_lang')
    def random_language():
        lang_list = ["PYTHON", 'JAVA', 'RUST']
        lang = random.sample(lang_list, 1)[0]
        print("SELECTED LANGUAGE : ", lang)
        return lang

    # @task(task_id='one')
    # def list_one():
    #     return [1,2,3]

    # @task(task_id='two')
    # def list_two(lst):
    #     return lst + [4,5,6]

    # @task(task_id='three')
    # def list_three(lst):
    #     return lst + [7,8,9]

    select_lang = random_language()
    # one = list_one()
    # two = list_two(one)
    # three = list_three(two)

    select_lang
Notion Image

Return이 없다면 결과적으론 XCom에서 표시되지 않음.

@task(task_id='get_brewery_api')
def get_brewery_api():
    URL = 'https://fakerapi.it/api/v2/users'
    response = requests.get(URL)

    res = response.json()['data']

    return res

@task(task_id='api_to_dataframe')
def api_to_dataframe(api_data):
    df = pd.json_normalize(api_data)

    for row in df.head().to_dict(orient='records'):
        pprint(row)
    print("df.shape : ", df.shape)

Jinja Template을 통해 활용 가능 - get_current_context()

  • ti 를 통해 XCom 값을 가져올 수 있습니다.
import pendulum
from pprint import pprint
from airflow.sdk import DAG, task, get_current_context

"""
🔸 Airflow의 컨텍스트(Context)
    - 태스크가 실행될 때 DAG, Task, 실행 날짜, 매크로, 파라미터 등 실행 환경 관련 메타데이터를 담고 있는 딕셔너리
    - 시간 관련 데이터도 포함하고 있기 때문에 실행별로 값이 달라짐
    - 'ti' 객체를 활용하여 XCOM 저장소 데이터를 저장/조회 가능
"""

default_args = dict(
    owner = 'bda',
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )

with DAG(
    dag_id="05_context_check_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *",
    tags = ['20250824','BASIC'],
    default_args = default_args,
    catchup=False
):

    @task(task_id='context_check')
    def context_check():
        ctx = get_current_context()
        pprint(ctx)

    context_check()
Notion Image
  • Airflow Connection
Notion Image

Airflow 개발 관련 Tip

진행하고자하는 Operator 기능을 찾고 거기에 맞는 코드를 작성하면 Python에 있는 모든 작업을 자동화할 수 있습니다.