[MLOps] 소규모 및 중규모 데이터 처리
Pandas는 소규모 데이터 변환에 최적화된 라이브러리입니다. 결측치 처리, 중복 제거, 집계 등 기능으로 지저분한 데이터를 정제하고 분석 효율을 높여줍니다.
개요
소규모 및 중규모 데이터 세트의 데이터 변환을 정리하였습니다.
소규모·중규모 데이터 변환: Pandas를 활용한 효율적인 접근
대용량 데이터 처리와 달리, 소규모 및 중규모 데이터셋(일반적으로 수 기가바이트 미만)을 다룰 때는 처리 속도보다는 분석가의 편의성, 효율성, 그리고 코드의 단순성이 더 중요합니다.
이 영역에서 파이썬의 Pandas 라이브러리는 데이터 조작과 분석을 위한 강력하고 직관적인 도구를 제공하며, 사실상의 표준으로 자리 잡고 있습니다.
- Pandas: 데이터프레임(DataFrame)이라는 2차원 테이블 형태의 자료구조를 기반으로, 지저분한 데이터를 정제하고, 원하는 형태로 가공하며, 기초적인 분석을 수행하는 데 최적화되어 있습니다.
- Polars: Pandas와 유사하지만, Rust 언어를 기반으로 하여 멀티코어 활용과 성능 최적화에 초점을 맞춘 라이브러리입니다. 더 빠른 계산 속도가 필요할 때 고려해볼 수 있는 훌륭한 대안입니다.
Pandas를 이용한 핵심 데이터 정제(Data Cleaning) 기술
ETL(추출, 변환, 적재) 파이프라인에서 '변환(Transform)'의 첫 단계는 데이터를 깨끗하게 만드는 것입니다.
1. 데이터 정리와 결측치 처리
분석의 정확도를 떨어뜨리는 가장 큰 원인 중 하나는 누락된 데이터(결측치, NaN
)입니다.
dropna()
: 결측치가 포함된 행(row)이나 열(column)을 제거합니다. 데이터 손실을 감수할 수 있거나, 결측치가 매우 적을 때 사용합니다.
# 'age' 열에 결측치가 있는 모든 행을 제거
cleaned_df = df.dropna(subset=['age'])
fillna()
: 결측치를 특정 값으로 채웁니다. 데이터의 특성에 따라 0, 평균, 중앙값, 최빈값 등으로 대체하여 데이터 손실을 최소화할 수 있습니다.
# 'score' 열의 결측치를 0으로 대체
df['score'].fillna(0, inplace=True)
# 'age' 열의 결측치를 전체 나이의 평균으로 대체
mean_age = df['age'].mean()
df['age'].fillna(mean_age, inplace=True)
2. 중복 데이터 제거
완전히 동일한 데이터가 반복해서 나타나는 경우, 분석 결과를 왜곡할 수 있습니다. drop_duplicates()
함수로 간단하게 제거할 수 있습니다.
# 모든 열의 값이 완전히 동일한 중복 행을 제거
unique_df = df.drop_duplicates()
# 'user_id' 열을 기준으로 중복을 판단하고, 첫 번째 값만 남김
unique_users_df = df.drop_duplicates(subset=['user_id'], keep='first')
3. 데이터 타입 변환
데이터가 의도와 다른 타입으로 저장된 경우(예: 숫자가 문자열로 저장), 계산이나 분석이 불가능합니다. astype()
을 사용해 데이터 타입을 표준화하고 변환합니다.
# 'price' 열이 문자열('object') 타입일 경우, 정수('int') 타입으로 변환
df['price'] = df['price'].astype(int)
# 'date' 열이 문자열일 경우, 날짜/시간('datetime') 타입으로 변환
df['date'] = pd.to_datetime(df['date'])
분석을 위한 데이터 가공 및 변환 (Data Manipulation)
데이터가 깨끗해졌다면, 이제 분석 목적에 맞게 데이터를 자르고, 붙이고, 요약하는 단계입니다.
1. 필터링, 정렬 (Filtering, Sorting)
- 필터링: 특정 조건을 만족하는 데이터만 추출합니다.
# 30세 이상인 사용자만 필터링
adults_df = df[df['age'] >= 30]
# 서울에 거주하는 30세 이상 사용자 필터링
target_df = df[(df['age'] >= 30) & (df['city'] == 'Seoul')]
- 정렬: 특정 열을 기준으로 데이터를 오름차순 또는 내림차순으로 정렬합니다.
# 'join_date'를 기준으로 최신순(내림차순)으로 정렬
sorted_df = df.sort_values(by='join_date', ascending=False)
2. 집계 (Aggregation)
데이터를 특정 그룹으로 묶어 통계량을 계산하는 핵심적인 기능입니다. groupby()
와 집계 함수(sum
, mean
, count
, agg
등)를 함께 사용합니다.
# 각 도시(city)별 평균 나이 계산
city_avg_age = df.groupby('city')['age'].mean()
# 각 도시별 사용자 수와 평균 구매 금액 계산
city_stats = df.groupby('city').agg(
user_count=('user_id', 'count'),
avg_purchase=('purchase_amount', 'mean')
)
3. 병합 및 연결 (Merging & Joining)
여러 개의 데이터프레임(테이블)을 특정 기준(Key)에 따라 하나로 합칩니다. SQL의 JOIN과 유사한 기능입니다.
merge()
: 공통된 열이나 인덱스를 기준으로 두 데이터프레임을 통합합니다.
# 'user_id'를 기준으로 user_df와 purchase_df를 병합
merged_df = pd.merge(user_df, purchase_df, on='user_id', how='left')
Pandas는 이처럼 데이터 정제부터 변환, 집계, 병합에 이르는 포괄적인 도구를 제공하여, 작거나 중간 크기의 데이터셋을 다루는 분석가에게 필수적인 라이브러리입니다.
Pandas in Action # Demo
실제 데모를 통해 정리하였습니다. Jupyter Notebook 환경을 사용했지만 실제로는 오케스트레이션 도구를 통해 코드를 분리하고 관리하여야 합니다.
데모 자료 입니다
import pandas as pd
# Step 1: Load data
print("Loading data...")
df = pd.read_csv("mock_data.csv")
print(df.head())
print(df.info())
print(df.isnull().sum())
print(df.describe())
print(df['department'].unique())
import pandas as pd
import json
df = pd.read_csv("./mock_data.csv")
print(df.head())
age_median = df["age"].median()
salary_median = df["salary"].median()
df['age'] = df['age'].fillna(age_median)
df['salary'] = df['salary'].fillna(salary_median)
df["department"] = df["department"].fillna("Unknown")
print(df.isnull().sum())
df['profile'] = df['profile'].apply(lambda x: json.loads(x) if pd.notnull(x) else {})
# Extract 'address', 'phone', and 'email' from 'profile' column
df['address'] = df['profile'].apply(lambda x: x.get('address', None))
df['phone'] = df['profile'].apply(lambda x: x.get('phone', None))
df['email'] = df['profile'].apply(lambda x: x.get('email', None))
df = df.drop(columns=['profile'])
df.to_csv("./cleaned_data.csv")
print(df.head())
import pandas as pd
df = pd.read_csv("cleaned_data.csv")
df['address_length'] = df['address'].apply(lambda x: len(str(x)))
df['salary_category'] = df["salary"]
# 1. 구간의 경계값 정의
# - 0 ~ 50000
# - 50001 ~ 70000
# - 70001 ~ 100000
# 이렇게 3개의 구간을 만들기 위해 4개의 경계값을 설정합니다.
bins = [0, 50000, 70000, 100000]
# 2. 각 구간에 붙일 이름(레이블) 정의
# 위에서 만든 3개의 구간에 순서대로 'low', 'medium', 'high'라는 이름을 붙입니다.
labels = ['low', 'medium', 'high']
# 3. pd.cut() 함수를 사용하여 'salary_category'라는 새 열 생성
df['salary_category'] = pd.cut(
df['salary'], # 구간을 나눌 원본 데이터 (salary 열)
bins=bins, # 위에서 정의한 경계값 리스트를 사용
labels=labels, # 각 구간에 붙일 이름표 리스트를 사용
include_lowest=True # 첫 구간(0~50000)에 최소 경계값인 0을 포함시킴
)
# 1. 'department' 열을 기준으로 데이터를 그룹으로 묶습니다.
summary_report = df.groupby("department").agg({
# .agg() 메소드는 각 그룹에 적용할 계산 규칙을 딕셔너리 형태로 전달받습니다.
# key는 열 이름, value는 적용할 함수(문자열)입니다.
"salary": "mean", # 'salary' 열에는 'mean'(평균) 함수를 적용합니다.
"age": "mean" # 'age' 열에도 'mean'(평균) 함수를 적용합니다.
}).reset_index()
# .reset_index()는 groupby 과정에서 인덱스로 설정된 'department'를
# 다시 일반적인 열(column)으로 변환해주는 역할을 합니다.
df.to_csv("transformed_data.csv", index=False)
print("Transformed data saved to 'transformed_data.csv'")