データエンジニアリング入門【ETLパイプラインをPythonで作る方法】

データエンジニアリング入門【ETLパイプラインをPythonで作る方法】 AIツール・レビュー

はじめに

機械学習の実務では「データを収集して加工して保存する」ETL(Extract・Transform・Load)パイプラインを構築する作業が多くあります。データエンジニアリングのスキルはMLエンジニアの市場価値を大きく高めます。

ETLとは

Extractはデータソース(DB・API・CSV)からデータを取得すること、Transformはデータのクレンジング・変換・集計すること、Loadは変換後のデータをDBやDWHに書き込むことです。

シンプルなETLパイプラインの実装

import pandas as pd
import requests
from sqlalchemy import create_engine
import logging
from datetime import datetime

# ロギング設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)

# Extract: APIからデータを取得
def extract_from_api(url: str, date: str) -> pd.DataFrame:
    logger.info(f'APIからデータ取得中: {date}')
    response = requests.get(url, params={'date': date})
    response.raise_for_status()
    return pd.DataFrame(response.json())

# Transform: データを加工
def transform(df: pd.DataFrame) -> pd.DataFrame:
    logger.info('データ変換中...')
    # 型変換
    df['created_at'] = pd.to_datetime(df['created_at'])
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
    
    # 欠損値処理
    df = df.dropna(subset=['order_id', 'amount'])
    df['user_id'] = df['user_id'].fillna('unknown')
    
    # 集計特徴量の追加
    df['date'] = df['created_at'].dt.date
    df['hour'] = df['created_at'].dt.hour
    df['is_weekend'] = df['created_at'].dt.dayofweek.isin([5, 6]).astype(int)
    
    # 不要列の削除
    df = df.drop(columns=['raw_data', 'internal_id'], errors='ignore')
    
    logger.info(f'変換完了: {len(df)}件')
    return df

# Load: データベースに書き込む
def load_to_db(df: pd.DataFrame, table_name: str, engine):
    logger.info(f'DBへ書き込み中: {table_name}')
    df.to_sql(
        table_name, engine,
        if_exists='append',
        index=False,
        chunksize=1000  # 一度に1000件ずつ書き込む
    )
    logger.info(f'書き込み完了: {len(df)}件')

# メインのETL実行
def run_etl(date: str):
    engine = create_engine('postgresql://user:pass@localhost:5432/datawarehouse')
    
    # パイプライン実行
    raw_df = extract_from_api('https://api.example.com/orders', date)
    clean_df = transform(raw_df)
    load_to_db(clean_df, 'orders_processed', engine)
    
    logger.info(f'ETL完了: {date}')
    return len(clean_df)

# 実行
if __name__ == '__main__':
    today = datetime.now().strftime('%Y-%m-%d')
    n = run_etl(today)
    print(f'処理完了: {n}件')

Apache Airflowでスケジューリング

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def etl_task(**context):
    date = context['ds']  # 実行日を取得
    run_etl(date)

# DAG定義(毎日深夜2時に実行)
with DAG(
    'daily_etl',
    start_date=datetime(2026, 1, 1),
    schedule='0 2 * * *',
    catchup=False
) as dag:
    
    task = PythonOperator(
        task_id='run_etl',
        python_callable=etl_task
    )

まとめ

ETLパイプラインはExtract→Transform→Loadの3ステップで構成されます。エラーハンドリングとロギングを最初から入れる習慣が重要です。定期実行にはAirflow・Prefect・Cloud Schedulerなどを使いましょう。データエンジニアリングスキルはMLエンジニアとデータサイエンティスト両方の市場価値を高めます。

💼 ITエンジニア転職特化

自分らしく働けるエンジニア転職を目指すなら【strategy career】

年収1000万・残業月30時間以下・リモート可の求人多数

💼 無料で転職相談する →

※アフィリエイト広告を含みます

※本記事にはアフィリエイトリンクが含まれます。

コメント

タイトルとURLをコピーしました