[MLOps] Apache Airflow 기능 및 용어 정리
Apache Airflow의 핵심 구성 요소인 DAG, Operator 등을 활용하여 복잡한 데이터 파이프라인을 자동화하는 방법을 소개합니다. 파이썬 코드로 워크플로우를 손쉽게 관리할 수 있습니다.
개요
24일 일요일 운좋게 Airflow 원데이 클래스를 확인하여 수강했습니다.
배웠던 이론들과 실습 코드들을 필요한만큼 정리하였습니다.
이번 주 8월 24일(일) 진행되는 Airflow 무료 특강에 무려 110분이 신청해 주셨습니다. 🙌 링크드인 포스트를 보고 신청해 주신 분들도 많을 텐데, 진심으로 감사드립니다. 🙏 사전 질문도 20개 이상 들어와 오늘부터 차근차근 답변을 달며 준비하고 있습니다. 코드잇 정규 강의에서는 2~3일에 걸쳐 강의하던 내용을 4시간에 압축해서… | Hyunsoo (Ryan) Lee | 10 comments
이번 주 8월 24일(일) 진행되는 Airflow 무료 특강에 무려 110분이 신청해 주셨습니다. 🙌 링크드인 포스트를 보고 신청해 주신 분들도 많을 텐데, 진심으로 감사드립니다. 🙏 사전 질문도 20개 이상 들어와 오늘부터 차근차근 답변을 달며 준비하고 있습니다. 코드잇 정규 강의에서는 2~3일에 걸쳐 강의하던 내용을 4시간에 압축해서 전달하려고 하다 보니 어떻게 하면 효율적인 강의가 가능할까 하는 고민도 생기고, 더 많은 내용들을 알려드리고 싶은 욕심도 생기네요.🤔(PPT 슬라이드가 75장.....!🤯) 남은 기간 동안 꼼꼼히 준비해서 여러분의 주말 4시간이 충분히 보람찬 시간이 될 수 있도록 하겠습니다. 💡 질문에 대한 답변은 잘 정리해서 강의 이후 링크드인에도 공유할 수 있도록 할게요! | 10 comments on LinkedIn
Apache Airflow 용어 정리
DAG(Directed Acyclie Graph)
- 자신으로 되돌아오는 경로가 없는 그래프
 - 작업이 다른 작업에 의존하거나 특정 순서로 실행 → 방향성
 - 무한 루프 방지를 위해 강제
 
Airflow에서 하나의 온전한 파이프라인 스크립트를 DAG라고 칭한다. → 순환되는 구조가 없어야한다
Operator
- 작업 파이프라인 안에서 하나의 요소(Task)를 정의하고 실행하는 단위
 - 1,000개가 넘는 오퍼레이터를 조합하여 작업을 실행
 - 목적에 맞게 오퍼레이터를 만들 수 있음
 
XCom
- 작업간 데이터나 리턴 값을 이후 작업에 전달하기 위한 메커니즘
 - Airflow에는 문법에 맞게 코드를 작성해야함
 - 동일한 DAG 내에서만 데이터 공유가 가능
 - 소규모 데이터를 공유하도록 권장
 - Python Operator의 반환 값은 자동으로 XCom에 저장
 
Sensor
- 특정 조건이 충족될 때 까지 기다리는 오퍼레이터
 - 조건이 충족되면 다음 작업을 이어서 진행하며 사용자 정의에 따른 대기 상태를 유지
 - 다양한 센서 메소드 뿐만 아니라 사용자 정의 센서 개발 가능
 
Connection
- Airflow가 외부 서비스와 통신하기 위한 URL, 사용자 인증 정보 등을 저장하는 설정
 - Hook에서 Connection 정보를 사용하여 외부 시스템과 연결을 수행
 - 다양한 도메인과 연결에 필요한 정보 저장
 
Jinja Template
- 작업시 동적으로 값을 치환할 수 있게 해주는 템플릿 기능
 - 컨텍스트 예시
- ds → 실행 날짜(2025-08-17)
{{ ds }}
 
 - ds → 실행 날짜(2025-08-17)
 
Hook
- 오퍼레이터 외부 시스템과 Connection 객체를 생성할 수 있도록 지원하는 인터페이스
 - Operator에서 사용하는 것보다 서비스들에 대한 자유도를 높히는 방안
- 커스텀 오퍼레이트 방안
 
 
Cron 표현식
- 작업 스케줄러
 
Variable
- 전역 변수 저장소
 
Directory 구조
- dags: 파이썬으로 개발한 DAG(Directed Acyclic Graph) 스크립트 파일들이 저장되는 디렉토리입니다. Airflow 스케줄러는 이 디렉토리를 주기적으로 스캔하여 DAG를 인식하고 실행합니다.
 - logs: DAG 파싱 및 파이프라인 작업 실행과 관련된 모든 로그가 저장되는 디렉토리입니다. 작업 실패 시 원인 분석을 위해 주로 사용됩니다.
 - config: Airflow의 핵심 설정 파일(
airflow.cfg)과 사용자 정의 설정 파일(예: 데이터베이스 연결 정보, Access Key 등)이 위치하는 디렉토리입니다. - plugins: 공통으로 사용되는 함수, 클래스, 후크(Hook), 오퍼레이터(Operator) 등 모듈화된 파이프라인 코드를 저장하는 디렉토리입니다. 이곳에 저장된 코드는 여러 DAG에서 재사용할 수 있습니다.
 - .env: Airflow 인프라 실행에 필요한 환경 변수들을 
KEY=VALUE형태로 정의하는 설정 파일입니다. 주로 데이터베이스 비밀번호나 API 키와 같은 민감한 정보를 관리하는 데 사용됩니다. 
Console

Airflow Architecture

- 이미지 대체
 
DAG 스크립트 구조
- DAG 객체 및 공통 정보 정의
- default_args
- 모든 Task들이 공통적으로 가지는 파라미터
- 재시도, 알림 등..
 
 
 - 모든 Task들이 공통적으로 가지는 파라미터
 - DAG()
- DAG 자체의 특성과 동작을 정의하는 파라미터
- 언제 어떻게 실행되는지 같은 워크플로우 자체의 설정값
 
 
 - DAG 자체의 특성과 동작을 정의하는 파라미터
 
 - default_args
 - 구체적인 Task 정의
 
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
default_args = dict(
    owner = 'bda', # 개별 DAG 관리자
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )
with DAG(
    dag_id="01_tutorial_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *", # cron 표현식
    tags = ['20250824', 'BASIC'],
    default_args = default_args,
    catchup=False
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4")
    task5 = EmptyOperator(task_id="task5")
# task1 >> task2 >> task3 >> task4 >> task5
task1 >> [task2, task3] >> task4 >> task5

- Task간 실행 순서 설정
@task데코레이터를 통해 훨씬 더 직관적인 방식으로 Task를 구성할 수 있음.
 
import pendulum, random
from airflow.sdk import DAG, task
"""
🔸 TaskFlow API
    - 파이썬 함수를 데코레이터로 감싸서 태스크로 변환하는 방식
    - 함수의 반환값이 다음 태스크 함수의 인자로 자동 전달되어 데이터 흐름을 간단히 표현 가능
    - AIRFLOW의 의존성 설정(task1 >> task2) 방식 대신, 함수 호출만으로 태스크 간 종속성 정의 가능
    - Pythonic한 방식으로 코드 작성이 가능하기 때문에 PythonOperator 대신 사용하는 것을 권장!
"""
default_args = dict(
    owner = 'bda',
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )
with DAG(
    dag_id="03_python_taskflow_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *",
    tags = ['20250824','BASIC'],
    default_args = default_args,
    catchup=False
):
    @task(task_id='select_lang')
    def random_language():
        lang_list = ["PYTHON", 'JAVA', 'RUST']
        lang = random.sample(lang_list, 1)[0]
        print("SELECTED LANGUAGE : ", lang)
        return lang
    # @task(task_id='one')
    # def list_one():
    #     return [1,2,3]
    # @task(task_id='two')
    # def list_two(lst):
    #     return lst + [4,5,6]
    # @task(task_id='three')
    # def list_three(lst):
    #     return lst + [7,8,9]
    select_lang = random_language()
    # one = list_one()
    # two = list_two(one)
    # three = list_three(two)
    select_lang

Return이 없다면 결과적으론 XCom에서 표시되지 않음.
@task(task_id='get_brewery_api')
def get_brewery_api():
    URL = 'https://fakerapi.it/api/v2/users'
    response = requests.get(URL)
    res = response.json()['data']
    return res
@task(task_id='api_to_dataframe')
def api_to_dataframe(api_data):
    df = pd.json_normalize(api_data)
    for row in df.head().to_dict(orient='records'):
        pprint(row)
    print("df.shape : ", df.shape)
Jinja Template을 통해 활용 가능 - get_current_context()
ti를 통해 XCom 값을 가져올 수 있습니다.
import pendulum
from pprint import pprint
from airflow.sdk import DAG, task, get_current_context
"""
🔸 Airflow의 컨텍스트(Context)
    - 태스크가 실행될 때 DAG, Task, 실행 날짜, 매크로, 파라미터 등 실행 환경 관련 메타데이터를 담고 있는 딕셔너리
    - 시간 관련 데이터도 포함하고 있기 때문에 실행별로 값이 달라짐
    - 'ti' 객체를 활용하여 XCOM 저장소 데이터를 저장/조회 가능
"""
default_args = dict(
    owner = 'bda',
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )
with DAG(
    dag_id="05_context_check_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *",
    tags = ['20250824','BASIC'],
    default_args = default_args,
    catchup=False
):
    @task(task_id='context_check')
    def context_check():
        ctx = get_current_context()
        pprint(ctx)
    context_check()

- Airflow Connection
 

Airflow 개발 관련 Tip
진행하고자하는 Operator 기능을 찾고 거기에 맞는 코드를 작성하면 Python에 있는 모든 작업을 자동화할 수 있습니다.