[MLOps] 대용량 데이터셋을 다루는 여러가지 도구(Spark, Kafka, Flink)

Apache Spark, Kafka, Flink를 이용해 대규모 데이터와 실시간 스트림을 빠르고 안정적으로 처리하여 비즈니스 가치를 만드는 핵심 기술을 알아봅니다.

[MLOps] 대용량 데이터셋을 다루는 여러가지 도구(Spark, Kafka, Flink)
Photo by Mitchell Luo / Unsplash

개요

소규모, 중규모의 데이터를 넘어 기가, 페타바이트 단위의 큰 데이터셋을 다루는 방법을 정리하였습니다.

Large Datasets - Apache Spark

실제 사용 사례에서는 수천개의 파일과 대용량 크기의 데이터를 다루는 일이 빈번합니다.

1. 대규모 데이터셋과 분산 처리

  • 배경: 단일 서버로 처리하기 힘든 수천 개의 파일, 페타바이트(PB)급의 대용량 데이터를 다루기 위해 등장.
  • 핵심 원리: 분산 컴퓨팅을 활용해 거대한 데이터셋을 여러 컴퓨터(노드)로 구성된 클러스터에 나눔.
  • 효과: 각 노드에서 데이터를 병렬로 동시에 처리하여 전체 작업 시간을 획기적으로 단축.
  • 주요 사용 사례: 웹사이트 클릭스트림 데이터 분석, IoT 장치 센서 데이터 실시간 처리 등.

2. RDD (Resilient Distributed Datasets)

  • 장애 복구 (Fault Tolerance): 특정 노드에 장애가 발생해도 데이터 유실 없이 작업을 지속할 수 있음.
    • 작동 원리: 데이터 자체를 복제하는 대신, 데이터가 생성된 과정, Lineage를 기록함.
    • 복구 과정: 노드 다운 시, 기록된 Lineage을 바탕으로 원본 데이터로부터 필요한 데이터 조각(파티션)을 다시 계산하여 완벽하게 복구.

3. 인메모리(In-memory) 컴퓨팅과 속도

  • 핵심: 데이터 처리 시 발생하는 중간 결과 값을 디스크가 아닌 메모리에 저장.
  • Hadoop MapReduce와의 비교:
    • 하둡: 각 작업 단계마다 중간 결과를 디스크(HDFS)에 쓰고 다시 읽어와 디스크 I/O 병목 발생.
    • 스파크: 중간 결과를 메모리에 상주시킨 채로 다음 연산을 수행하여 디스크 I/O를 최소화하고, 이를 통해 하둡 대비 수십~수백 배 빠른 성능을 제공.

4. 실시간 처리와 머신러닝(ML) 통합

  • 실시간 데이터 소스: Apache Kafka와 같은 메시징 시스템을 통해 IoT 센서 데이터, 로그, 금융 거래 데이터 등을 지속적으로 수신.
  • ML 모델 활용:
    • 실시간으로 가공/집계된 데이터를 피처(Feature)로 변환.
      • DataLake와의 가장 차별화된 부분 → DataLake는 실시간 처리(X)
    • 미리 학습된 ML 모델에 이 피처를 즉시 공급하여 예측 및 분석 수행.

5. 통합 API와 유연성 💻

  • 다양한 언어 지원: Scala, Python (PySpark), Java, R 등 여러 프로그래밍 언어 지원
  • 생태계: Spark SQL(데이터 프레임), MLlib(머신러닝), GraphX(그래프 처리) 등 다양한 라이브러리

Streaming Datasets - Kafka & Apache Flink

실시간 스트리밍 데이터를 다루는 도구를 알아봅니다.

  • Real time Data Analysis

Kafka에서 Apache Flink를 통해 데이터를 실시간으로 집계

집계된 데이터는 데이터베이스 내 다른 데이터간 집계 가능

Apache kafka

실시간 데이터 스트림을 관리하는 도구로 사용하며, 실시간 데이터 스트림을 통해 낮은 지연 시간 처리와 높은 처리량을 제공합니다.

따라서 데이터 일관성과 속도가 중요한 애플리케이션에 많이 사용되고 있습니다.

  • 처리량과 데이터 스트리밍
    • 데이터를 토픽(Topic) 단위로 관리하며, 각 토픽을 여러 개의 파티션(Partition)으로 나누어 저장하고 처리합니다.
      • 대규모 데이터 스트림을 병렬로 처리하여 최소한의 지연 시간을 유지할 수 있습니다.
  • 확장성
    • Kafka 클러스터는 여러 브로커(Broker) 서버들로 구성됩니다.
      • 데이터 처리량이 증가하면 클러스터에 브로커 서버를 추가하는 수평적 확장을 통해 손쉽게 성능을 높일 수 있습니다.
        • 브로커(노드)를 추가하여 파티션의 리더를 나누고 소비자 그룹을 확장함
  • 결함 내성 (Fault Tolerance)
    • 각 파티션은 여러 브로커에 복제(Replication)되어 내구성을 보장합니다. 하나의 원본(리더)과 다수의 복제본(팔로워)으로 구성되며, 리더가 있는 브로커에 장애가 발생하더라도 다른 브로커의 팔로워가 즉시 리더로 승격되어 데이터 유실 없이 서비스를 지속할 수 있습니다.
  • 분산 아키텍처
    • 토픽의 파티션들이 클러스터를 구성하는 여러 브로커에 분산되어 저장 및 관리됩니다. 이를 통한 높은 내구성과 분산 처리를 수행합니다.
  • 실시간 데이터 수집과 메시징
    • 대규모 실시간 데이터 스트림을 생산자(Producer)로부터 안정적으로 수신하여 소비자(Consumer)에게 전달하는 분산 메시징 시스템의 역할을 수행합니다.

Kafka가 실시간으로 수집한다면, 이를 처리할 강력한 도구가 필요합니다.

  • 역할: 실시간 데이터 처리 및 분석 엔진
    • Kafka와 같은 메시징 시스템이 실시간으로 데이터를 수집하고 전달하면, Flink는 그 데이터를 받아 즉시 처리하고 분석하는 강력한 스트리밍 처리 프레임워크입니다.
  • 처리 방식: 스트리밍, 이벤트 기반, 배치
    • 스트리밍 처리: 데이터가 발생하는 즉시 개별 이벤트 단위로 처리하며, 밀리초(ms) 수준의 매우 낮은 지연 시간을 목표로 합니다.
    • 배치 처리: 스트리밍 엔진을 기반으로 유한한 데이터셋(배치)도 효율적으로 처리할 수 있도록 지원합니다.
  • 핵심 기능 1: 상태 기반(Stateful) 스트리밍 처리
    • 단순히 데이터를 통과시키는 것을 넘어, 계산의 중간 결괏값이나 누적된 정보를 자체적으로 '상태(State)'로 저장하고 관리할 수 있습니다.
      • 상태를 통해 시간 경과에 따른 집계(예: 사용자별 최근 1시간 접속 횟수)나 복잡한 패턴 감지 등 고차원적인 분석이 가능합니다.
        • 지연데이터, 게으른 평가를 수행합니다.
  • 핵심 기능 2: 정확성 보증 (Exactly-Once)
    • 네트워크나 서버 장애가 발생하더라도 데이터가 중복 처리되거나 유실되지 않고, '정확히 딱 한 번'만 처리되는 것을 보장합니다.
      • 분산 스냅샷 기술을 통해 정확히 한 번 처리를 보장합니다.
        • 체크포인트 베리어
  • 주요 사용 사례 및 통합
    • 복잡한 실시간 센서 데이터 분석, 사기 탐지 시스템, 실시간 추천 엔진 등에 사용됩니다.
    • Kafka, Hadoop HDFS, 다양한 데이터베이스 등 여러 빅데이터 도구들과 원활하게 통합되어 강력한 데이터 파이프라인을 구축할 수 있습니다.

Apache Kafka Demo

기본적인 설정을 구현해보고 Kafka가 메시지 버스로 어떻게 작동하는지 확인합니다.

  1. 환경 설정을 진행합니다.
sudo apt update
sudo apt install -y python3-pip python3-venv

python3 -m venv kafka_venv
source kafka_venv/bin/activate
  1. Kafka 실행
version: '3'
services:
  # Zookeeper service
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.6
    container_name: admin-zookeeper-1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  # Kafka service
  kafka:
    image: confluentinc/cp-kafka:7.6.6
    container_name: admin-kafka-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092" # Kafka port exposed to the host port
    environment:
      KAFKA_BROKER_ID: 1
      # Zookeeper connection zookeeper service name and port
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Kafka advertised listeners
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  1. Topic 생성하기
docker exec admin-kafka-1 kafka-topics \
  --create \
  --topic sample-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
  1. 생산자 샘플 코드
"""
This script is a Python producer that sends messages to a Kafka topic
"""
#!/usr/bin/env python3

from kafka import KafkaProducer
import json
import time
from datetime import datetime

def create_producer():
    """
    Create and return a Kafka producer with JSON serialization.

    Returns:
        KafkaProducer: A configured Kafka producer that serializes messages to JSON.
    """
    return KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )

def generate_message():
    """
    Generate a sample message with a timestamp and a random value.

    Returns:
        dict: A dictionary containing a timestamp and a random value.
    """
    return {
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'value': round(time.time() % 100, 2)
    }

def main():
    """
    Main function to create a Kafka producer and continuously send messages.

    Sends messages to a Kafka topic every second until interrupted.
    Handles keyboard interrupt to gracefully close the producer.
    """
    producer = create_producer()
    topic_name = 'sample-topic'

    try:
        while True:
            # Generate and send a message to the Kafka topic
            message = generate_message()
            producer.send(topic_name, value=message)
            print(f"Produced message: {message}")
            time.sleep(1)

    except KeyboardInterrupt:
        print("Stopping producer...")
        producer.close()

if __name__ == "__main__":
    main()
Notion Image