バックグラウンドジョブの冪等性、後付けすると地獄になる理由

  • #バックグラウンドジョブ
  • #冪等性
  • #データベース設計

バックグラウンドジョブの冪等性設計、後付けすると本当につらい

以前、決済系のバッチ処理で「同じ請求が二重に走った」インシデントを経験した。原因はシンプルで、ジョブキューの at-least-once 配信保証と、ジョブ実装側が冪等でなかったことの組み合わせだった。リトライは正常系として設計されていたのに、冪等性は「まあ滅多に起きないから」で後回しにされていた。当然ながら、後からそれを直すのは地獄だった。

バックグラウンドジョブのリトライは例外ではなく通常運転だ。ネットワーク瞬断、デプロイのタイミング、ワーカープロセスの強制終了、どれでも発生する。冪等性は最初から設計に組み込むべきで、後付けでできるものではない。

冪等キーの設計

冪等性を担保する最もシンプルな手段は「冪等キー」だ。「この処理はこの ID で一度だけ実行する」という契約をシステムに持たせる。

冪等キーの設計で重要なのは、キーの粒度をジョブの意図と一致させることだ。たとえば「ユーザー A の 2024 年 6 月分の請求を実行する」というジョブなら、冪等キーは billing:user_id=123:month=2024-06 のような形になる。job_id だけを使うのは危険で、ジョブがリエンキューされた場合に別の job_id が発行されてしまい、同じ処理が二度走る。

def process_monthly_billing(user_id: int, billing_month: str):
    idempotency_key = f"billing:{user_id}:{billing_month}"

    with db.transaction():
        existing = db.query(
            "SELECT id, status FROM idempotency_keys WHERE key = %s FOR UPDATE",
            [idempotency_key]
        )
        if existing and existing["status"] == "completed":
            return  # 既に完了済み、スキップ

        if not existing:
            db.execute(
                "INSERT INTO idempotency_keys (key, status) VALUES (%s, 'processing')",
                [idempotency_key]
            )

        # 実際の請求処理
        _charge_user(user_id, billing_month)

        db.execute(
            "UPDATE idempotency_keys SET status = 'completed' WHERE key = %s",
            [idempotency_key]
        )

FOR UPDATE でロックを取っているのは、複数ワーカーが同時に同じキーを処理しようとした場合の競合対策だ。これがないと、どちらも existing = None を読んで両方が処理を開始する。

DBレベルの重複排除

冪等キーテーブルに UNIQUE 制約を張って、楽観的に INSERT して重複を検知するパターンも有効だ。特に処理の開始前に確実にロックしたいケースで使いやすい。

CREATE TABLE idempotency_keys (
    key         VARCHAR(255) PRIMARY KEY,
    status      VARCHAR(20) NOT NULL DEFAULT 'processing',
    created_at  TIMESTAMP NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMP
);
def process_with_upsert(idempotency_key: str, handler):
    try:
        db.execute(
            "INSERT INTO idempotency_keys (key) VALUES (%s)",
            [idempotency_key]
        )
    except UniqueViolationError:
        # 既に誰かが処理中か完了済み
        row = db.query("SELECT status FROM idempotency_keys WHERE key = %s", [idempotency_key])
        if row["status"] == "completed":
            return
        # processing のままなら前の処理が途中で死んでいる可能性がある
        # ここで冪等キーの有効期限やタイムアウトを見て再処理を許可するか判断する
        raise RetryableError("Job may be in progress, will retry")

    handler()

    db.execute(
        "UPDATE idempotency_keys SET status = 'completed', completed_at = NOW() WHERE key = %s",
        [idempotency_key]
    )

processing のまま残ったレコードの扱いは設計の悩みどころで、一定時間後に再処理を許可するか、アラートを出して手動確認にするかはシステムの性質による。決済なら後者を選ぶことが多い。

メール送信の場合

決済と違ってメール送信は「二重送信してしまった」ときのダメージが小さいが、ゼロではない。パスワードリセットメールが 5 通届いたらユーザー体験は最悪だ。

メール送信の冪等キーは email_type:user_id:trigger_event_id のような形で管理するのが実務的だ。trigger_event_id に相当するものがなければ、日付やリクエスト ID を使う。

def send_password_reset_email(user_id: int, reset_token_id: str):
    idempotency_key = f"email:password_reset:{user_id}:{reset_token_id}"

    if idempotency_store.exists(idempotency_key):
        logger.info("Email already sent, skipping", key=idempotency_key)
        return

    email_service.send(
        to=get_user_email(user_id),
        template="password_reset",
        token=get_reset_token(reset_token_id)
    )

    # メール送信は外部サービスの呼び出しなのでここでも失敗しうる
    # 送信成功後に記録する。送信前に記録すると「記録は残ったが送信されていない」が起きる
    idempotency_store.set(idempotency_key, ttl=86400)

この順序の話は重要で、「先に記録してから処理する」か「処理してから記録する」かで、障害時に発生しうる問題の種類が変わる。決済のように二重実行が致命的なケースは先に記録(先にロック)、メールのように未送信のほうが困るケースは処理後に記録、という判断軸が一つある。

後付けするとどうつらいのか

冒頭の経験をもう少し具体的に言うと、問題は「すでに動いているジョブのロジックに手を入れる」こと自体がリスクになる点だ。特に決済処理は、ステート遷移が複雑に絡んでいると「この冪等キーのスコープをどこにするか」の判断が非常に難しくなる。ジョブが複数のフェーズを持つ場合、フェーズ単位で冪等キーを管理しないといけないが、それを後から切り出すのは外科手術に近い。

最初から設計に組み込んでおけば、ジョブの構造がシンプルなうちに「どのスコープで一意性を保証するか」を決められる。それだけで後の修正コストが大きく変わる。