機械学習パイプラインをAirflowで自動化する方法

機械学習パイプラインをAirflowで自動化する方法 AIツール・レビュー

はじめに

機械学習プロジェクトを本番運用すると「定期的にデータ取得→前処理→学習→評価→デプロイ」を自動化したくなります。Apache Airflowはこの種のワークフロー自動化の定番ツールです。ML向けの活用方法を解説します。

Airflowの基本概念

AirflowはPythonでDAG(有向非巡回グラフ)を書いて、タスクの依存関係と実行スケジュールを定義するワークフロー管理ツールです。GUIダッシュボードでワークフローの実行状況・エラー・再実行をブラウザで管理できます。AWSのMWAA・GCPのCloud Composerとしてマネージドサービスでも使えます。

MLパイプラインのDAG実装例

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def fetch_data():
    """データ取得"""
    import pandas as pd
    df = pd.read_sql("SELECT * FROM sensor_data WHERE date = CURDATE()", conn)
    df.to_csv('/tmp/raw_data.csv', index=False)
    print(f"取得件数: {len(df)}")

def preprocess():
    """前処理"""
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    df = pd.read_csv('/tmp/raw_data.csv')
    df = df.dropna()
    scaler = StandardScaler()
    df[['temp', 'vibration']] = scaler.fit_transform(df[['temp', 'vibration']])
    df.to_csv('/tmp/processed_data.csv', index=False)

def train_model():
    """モデル学習"""
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    import pandas as pd
    df = pd.read_csv('/tmp/processed_data.csv')
    X, y = df.drop('label', axis=1), df['label']
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)
    joblib.dump(model, '/tmp/model.pkl')

def evaluate_and_deploy():
    """評価してデプロイ"""
    import joblib
    # 評価して閾値を超えたらデプロイ
    model = joblib.load('/tmp/model.pkl')
    # ... 評価ロジック ...

# DAGの定義
with DAG(
    'ml_pipeline',
    schedule_interval='0 2 * * *',  # 毎日2時に実行
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    
    t1 = PythonOperator(task_id='fetch_data', python_callable=fetch_data)
    t2 = PythonOperator(task_id='preprocess', python_callable=preprocess)
    t3 = PythonOperator(task_id='train_model', python_callable=train_model)
    t4 = PythonOperator(task_id='evaluate_deploy', python_callable=evaluate_and_deploy)
    
    t1 >> t2 >> t3 >> t4  # 依存関係の定義

エラーハンドリングとリトライ

AirflowはTask単位でのリトライが可能です。DBへの接続失敗など一時的なエラーはリトライで解決できます。

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['your@email.com']
}

タスク失敗時にSlack通知を送る設定も可能で、本番運用では必須の設定です。

AirflowとMLflowの組み合わせ

Airflowでワークフローを管理・MLflowで実験・モデルを管理するという組み合わせがMLOpsの標準的な構成です。学習タスクの中でmlflow.log_params・mlflow.log_metricsを呼ぶことで、実験結果が自動的にMLflowに記録されます。モデルの評価スコアが閾値を超えた場合のみデプロイするという自動化も実現できます。

よくある質問

「Airflowの代替ツールはありますか」という質問をよく受けます。Prefect・Dagster・Kubeflow Pipelinesがアルタナティブです。Prefectは設定がシンプルでPythonに馴染みやすいため、Airflowより学習コストが低いという評価もあります。「ローカル環境でAirflowを試すには」という質問については、docker-compose up -d で複数のAirflowコンポーネント(Scheduler・Webserver・Worker)を一括起動できます。公式のdocker-compose.yamlを使うのが最も簡単です。

まとめ

AirflowによるMLパイプラインの自動化は「定期実行・エラーハンドリング・再実行・GUIでの監視」という4点が大きな価値をもたらします。MLflowと組み合わせることで本格的なMLOpsが実現できます。まずdocker-composeでAirflowをローカル起動して、シンプルなDAGを1つ作ることが最初のステップです。

💼 ITエンジニア転職特化

自分らしく働けるエンジニア転職を目指すなら【strategy career】

年収1000万・残業月30時間以下・リモート可の求人多数

💼 無料で転職相談する →

※アフィリエイト広告を含みます

コメント

タイトルとURLをコピーしました