はじめに
機械学習の実務では「データを収集して加工して保存する」ETL(Extract・Transform・Load)パイプラインを構築する作業が多くあります。データエンジニアリングのスキルはMLエンジニアの市場価値を大きく高めます。
ETLとは
Extractはデータソース(DB・API・CSV)からデータを取得すること、Transformはデータのクレンジング・変換・集計すること、Loadは変換後のデータをDBやDWHに書き込むことです。
シンプルなETLパイプラインの実装
import pandas as pd
import requests
from sqlalchemy import create_engine
import logging
from datetime import datetime
# ロギング設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
# Extract: APIからデータを取得
def extract_from_api(url: str, date: str) -> pd.DataFrame:
logger.info(f'APIからデータ取得中: {date}')
response = requests.get(url, params={'date': date})
response.raise_for_status()
return pd.DataFrame(response.json())
# Transform: データを加工
def transform(df: pd.DataFrame) -> pd.DataFrame:
logger.info('データ変換中...')
# 型変換
df['created_at'] = pd.to_datetime(df['created_at'])
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# 欠損値処理
df = df.dropna(subset=['order_id', 'amount'])
df['user_id'] = df['user_id'].fillna('unknown')
# 集計特徴量の追加
df['date'] = df['created_at'].dt.date
df['hour'] = df['created_at'].dt.hour
df['is_weekend'] = df['created_at'].dt.dayofweek.isin([5, 6]).astype(int)
# 不要列の削除
df = df.drop(columns=['raw_data', 'internal_id'], errors='ignore')
logger.info(f'変換完了: {len(df)}件')
return df
# Load: データベースに書き込む
def load_to_db(df: pd.DataFrame, table_name: str, engine):
logger.info(f'DBへ書き込み中: {table_name}')
df.to_sql(
table_name, engine,
if_exists='append',
index=False,
chunksize=1000 # 一度に1000件ずつ書き込む
)
logger.info(f'書き込み完了: {len(df)}件')
# メインのETL実行
def run_etl(date: str):
engine = create_engine('postgresql://user:pass@localhost:5432/datawarehouse')
# パイプライン実行
raw_df = extract_from_api('https://api.example.com/orders', date)
clean_df = transform(raw_df)
load_to_db(clean_df, 'orders_processed', engine)
logger.info(f'ETL完了: {date}')
return len(clean_df)
# 実行
if __name__ == '__main__':
today = datetime.now().strftime('%Y-%m-%d')
n = run_etl(today)
print(f'処理完了: {n}件')
Apache Airflowでスケジューリング
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def etl_task(**context):
date = context['ds'] # 実行日を取得
run_etl(date)
# DAG定義(毎日深夜2時に実行)
with DAG(
'daily_etl',
start_date=datetime(2026, 1, 1),
schedule='0 2 * * *',
catchup=False
) as dag:
task = PythonOperator(
task_id='run_etl',
python_callable=etl_task
)
まとめ
ETLパイプラインはExtract→Transform→Loadの3ステップで構成されます。エラーハンドリングとロギングを最初から入れる習慣が重要です。定期実行にはAirflow・Prefect・Cloud Schedulerなどを使いましょう。データエンジニアリングスキルはMLエンジニアとデータサイエンティスト両方の市場価値を高めます。
💼 ITエンジニア転職特化
自分らしく働けるエンジニア転職を目指すなら【strategy career】
年収1000万・残業月30時間以下・リモート可の求人多数
※アフィリエイト広告を含みます
![]()
※本記事にはアフィリエイトリンクが含まれます。


コメント