バックグラウンドジョブの冪等性、後付けすると地獄になる理由
バックグラウンドジョブの冪等性設計、後付けすると本当につらい
以前、決済系のバッチ処理で「同じ請求が二重に走った」インシデントを経験した。原因はシンプルで、ジョブキューの at-least-once 配信保証と、ジョブ実装側が冪等でなかったことの組み合わせだった。リトライは正常系として設計されていたのに、冪等性は「まあ滅多に起きないから」で後回しにされていた。当然ながら、後からそれを直すのは地獄だった。
バックグラウンドジョブのリトライは例外ではなく通常運転だ。ネットワーク瞬断、デプロイのタイミング、ワーカープロセスの強制終了、どれでも発生する。冪等性は最初から設計に組み込むべきで、後付けでできるものではない。
冪等キーの設計
冪等性を担保する最もシンプルな手段は「冪等キー」だ。「この処理はこの ID で一度だけ実行する」という契約をシステムに持たせる。
冪等キーの設計で重要なのは、キーの粒度をジョブの意図と一致させることだ。たとえば「ユーザー A の 2024 年 6 月分の請求を実行する」というジョブなら、冪等キーは billing:user_id=123:month=2024-06 のような形になる。job_id だけを使うのは危険で、ジョブがリエンキューされた場合に別の job_id が発行されてしまい、同じ処理が二度走る。
def process_monthly_billing(user_id: int, billing_month: str):
idempotency_key = f"billing:{user_id}:{billing_month}"
with db.transaction():
existing = db.query(
"SELECT id, status FROM idempotency_keys WHERE key = %s FOR UPDATE",
[idempotency_key]
)
if existing and existing["status"] == "completed":
return # 既に完了済み、スキップ
if not existing:
db.execute(
"INSERT INTO idempotency_keys (key, status) VALUES (%s, 'processing')",
[idempotency_key]
)
# 実際の請求処理
_charge_user(user_id, billing_month)
db.execute(
"UPDATE idempotency_keys SET status = 'completed' WHERE key = %s",
[idempotency_key]
)
FOR UPDATE でロックを取っているのは、複数ワーカーが同時に同じキーを処理しようとした場合の競合対策だ。これがないと、どちらも existing = None を読んで両方が処理を開始する。
DBレベルの重複排除
冪等キーテーブルに UNIQUE 制約を張って、楽観的に INSERT して重複を検知するパターンも有効だ。特に処理の開始前に確実にロックしたいケースで使いやすい。
CREATE TABLE idempotency_keys (
key VARCHAR(255) PRIMARY KEY,
status VARCHAR(20) NOT NULL DEFAULT 'processing',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP
);
def process_with_upsert(idempotency_key: str, handler):
try:
db.execute(
"INSERT INTO idempotency_keys (key) VALUES (%s)",
[idempotency_key]
)
except UniqueViolationError:
# 既に誰かが処理中か完了済み
row = db.query("SELECT status FROM idempotency_keys WHERE key = %s", [idempotency_key])
if row["status"] == "completed":
return
# processing のままなら前の処理が途中で死んでいる可能性がある
# ここで冪等キーの有効期限やタイムアウトを見て再処理を許可するか判断する
raise RetryableError("Job may be in progress, will retry")
handler()
db.execute(
"UPDATE idempotency_keys SET status = 'completed', completed_at = NOW() WHERE key = %s",
[idempotency_key]
)
processing のまま残ったレコードの扱いは設計の悩みどころで、一定時間後に再処理を許可するか、アラートを出して手動確認にするかはシステムの性質による。決済なら後者を選ぶことが多い。
メール送信の場合
決済と違ってメール送信は「二重送信してしまった」ときのダメージが小さいが、ゼロではない。パスワードリセットメールが 5 通届いたらユーザー体験は最悪だ。
メール送信の冪等キーは email_type:user_id:trigger_event_id のような形で管理するのが実務的だ。trigger_event_id に相当するものがなければ、日付やリクエスト ID を使う。
def send_password_reset_email(user_id: int, reset_token_id: str):
idempotency_key = f"email:password_reset:{user_id}:{reset_token_id}"
if idempotency_store.exists(idempotency_key):
logger.info("Email already sent, skipping", key=idempotency_key)
return
email_service.send(
to=get_user_email(user_id),
template="password_reset",
token=get_reset_token(reset_token_id)
)
# メール送信は外部サービスの呼び出しなのでここでも失敗しうる
# 送信成功後に記録する。送信前に記録すると「記録は残ったが送信されていない」が起きる
idempotency_store.set(idempotency_key, ttl=86400)
この順序の話は重要で、「先に記録してから処理する」か「処理してから記録する」かで、障害時に発生しうる問題の種類が変わる。決済のように二重実行が致命的なケースは先に記録(先にロック)、メールのように未送信のほうが困るケースは処理後に記録、という判断軸が一つある。
後付けするとどうつらいのか
冒頭の経験をもう少し具体的に言うと、問題は「すでに動いているジョブのロジックに手を入れる」こと自体がリスクになる点だ。特に決済処理は、ステート遷移が複雑に絡んでいると「この冪等キーのスコープをどこにするか」の判断が非常に難しくなる。ジョブが複数のフェーズを持つ場合、フェーズ単位で冪等キーを管理しないといけないが、それを後から切り出すのは外科手術に近い。
最初から設計に組み込んでおけば、ジョブの構造がシンプルなうちに「どのスコープで一意性を保証するか」を決められる。それだけで後の修正コストが大きく変わる。