NO IMAGE

Step Functions Callback パターン完全ガイド waitForTaskToken 実戦

NO IMAGE
目次

1. この記事について

fig01: SF 実践編シリーズ全体マップ (Vol2 ハイライト)

この記事を読むと得られること

  • ASL の Resource: "arn:aws:states:::sqs:sendMessage.waitForTaskToken" 構文を書き、$$.Task.Token を Context Object から取り出せる
  • SQS+Lambda / 人手承認 SNS+APIGW / 長時間処理 ECS+Heartbeat の 3 実戦パターンを Terraform+ASL+CLI の 3 点セットで自力構築できる
  • HeartbeatSeconds < TimeoutSeconds 制約を満たす設計ができ、長時間処理のリトライ設計を根拠を持って選択できる
  • DistributedMap × Callback 複合パターンの MaxConcurrency × Token 生存時間を設計できる
  • チートシート 1 枚を手元に置き、新規 Callback State 設計を 10 分以内で完了できる
本記事が対象としないこと

1-1. 本記事のゴール

本記事は Step Functions 実践編 Vol2 として、.waitForTaskToken を使った Callback パターンの 3 実戦パターンを Terraform + ASL + CLI 実挙動ダンプの 3 点セットで再現可能に解説する。

読了後、読者は以下を自力で行える。

  1. ASL の Resource: "arn:aws:states:::sqs:sendMessage.waitForTaskToken" 構文を書き、$$.Task.Token を Context Object から取り出せる
  2. Lambda / ECS / API Gateway 側から SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat を正しく呼び出せる
  3. HeartbeatSeconds < TimeoutSeconds 制約を満たす設計ができ、長時間処理のリトライ設計を根拠を持って選択できる
  4. DistributedMap 内で Callback を使う際の制約 (MaxConcurrency × Token 数) を即答できる
  5. チートシート 1 枚を手元に置き、新規 Callback State 設計を 10 分以内で完了できる

1-2. 読者像

本記事は以下の読者を対象とする。

  • SF 入門記事を読了済で、States.Language の基本構文を書ける実践者
  • Callback 基礎 (旧版または独学) を習得済で、実戦パターンに進みたい設計者
  • 現場で「人手承認フロー」「長時間 ECS バッチ」「複合並列処理」の設計に直面している AWS 担当者
  • Terraform で SF + Lambda + Queue 系リソースを 3 点セットで自力構築できるエンジニア

前提知識チェックリスト

本記事に進む前に以下を確認すること。チェックが入らない項目がある場合、先に対応する記事を参照することを推奨する。本記事の §3 以降は前提知識を補足説明なく活用するため、事前確認が完走効率を大きく左右する。

チェック前提知識確認先
State Machine / State / Task / Execution の概念を説明できるSF 入門
Type: Task の基本 ASL 構文を書けるSF 入門
Retry / Catch の基礎構文を理解しているエラーハンドリング完全ガイド
Parameters / InputPath / OutputPath の動作を理解しているSF 実践編 Vol1
Terraform で aws_sfn_state_machine リソースを作成できる本記事の §3 で詳解

1-3. なぜ今 Callback 実践編か

SF 実践編シリーズは Vol1 (5 大入出力フィルタ) で「データが State 間をどう流れるか」を体得した。
Vol2 (本記事) は「外部リソースに制御を渡し、完了通知を受け取るまで Step Functions を待機させる」Callback パターンを深掘りする。

ポーリング方式 (定期的に外部 API を叩いて完了を確認) に比べ、Callback パターンには以下の優位性がある。

比較軸ポーリングCallback (waitForTaskToken)
SF 実行コスト状態遷移ごとに課金待機中は課金されない
応答遅延ポーリング間隔分の遅延完了と同時に次 State へ
外部処理の自由度SF が能動的に確認しに行く外部リソースが任意のタイミングで通知
長時間処理の耐性実行履歴が大きくなりやすいHeartbeat で生存確認しつつ長時間待機可

このため ECS/Batch の長時間処理・人手による承認フロー・外部 API 完了待ち など、処理完了タイミングが不確定なユースケースに Callback パターンが不可欠となる。

なお、Callback パターン (.waitForTaskToken) と同期統合 (.sync:2) / SDK 統合は別物であることに注意する。同期統合は SF が定期的にポーリングして完了を確認するのに対し、Callback は外部側からの通知を待つ。非同期処理の性質に応じて使い分けることが設計品質の鍵となる (詳細は SF 第8弾 SDK 統合記事へ委譲)。

本記事で解決する現場課題

現場の局面本記事読了前本記事読了後
Callback 初実装で Token が戻らず停止SendTaskSuccess 呼び漏れ・Token 渡し忘れに気付かず Timeout まで放置§3-6 の落とし穴 3 件で「Token を受け取った側が必ず何かを返す」責務分界が明示化
人手承認フロー設計で「承認画面どう作る?」で詰まるAPIGW + Cognito + Token 管理を自作して破綻§4 完全実装 (SNS→メール→APIGW→SendTaskSuccess) を Terraform でコピペ可能
長時間処理 (ECS 数時間) の Timeout 設計で迷走HeartbeatSeconds を設定したつもりが効かない / タスク停止しても SF が残る§5-5 の Heartbeat 実装 + §6 Timeout 設計で「SF 側と実行側の両方から責任分界」明記
DistributedMap で 100 並列 Callback したいToken 管理 + MaxConcurrency 設計が不明で本番化できず§7 で「並列数 × Token 生存時間」の設計指針を提示

1-4. 関連記事

本記事は以下の既存記事と連携している。§1-5 で旧版との差別化を確認した上で読み進めることを推奨する。

記事位置付けリンク
AWS Step Functions 入門基礎概念の前提確認→ 読む
Step Functions エラーハンドリング完全ガイドRetry/Catch 詳細 (§6 委譲先)→ 読む
Step Functions Callback パターン完全ガイド (旧版)基礎構文の横並び参照→ 読む
Step Functions データフロー制御 (旧版)Vol1 と対になる旧版→ 読む
Step Functions 5 大入出力フィルタ (Vol1)直前の実践編→ 読む
Step Functions Distributed Map§7 DMap×Callback で参照→ 読む
Step Functions Express vs Standard非同期設計の選択肢→ 読む

1-5. 旧版 (Callback パターン完全ガイド) との差別化

旧版は waitForTaskToken の基礎構文と SNS+Lambda 1 パターンの実装解説が中心であった。
本記事は以下 4 点で旧版を超える。

  1. 3 実戦パターンを独立章で提示 — SQS+Lambda (§3) / 人手承認 SNS+APIGW (§4) / 長時間処理 ECS+Heartbeat (§5)
  2. Timeout / 失敗設計を独立章化 (§6) — TaskTimedOut / HeartbeatSeconds 制約 / DLQ 設計を Callback 固有視点で深掘り
  3. DistributedMap × Callback 複合を実戦パターンとして新設 (§7) — 旧版には存在しない応用章
  4. 対応サービス一覧 + チートシート (§2-4 / §8-2) で暗記型リファレンス化

旧版 vs 本記事 詳細比較

観点旧版 (Callback パターン完全ガイド)本記事 (実践編 Vol2)
実装パターン数1 (SNS+Lambda)3 独立章 + 複合章 (§3/§4/§5/§7)
Terraform カバレッジ部分的全パターン完全コード
CLI 実挙動ダンプなし / 限定的全章で start-execution / get-execution-history ダンプを明記
Timeout/失敗設計言及程度§6 独立章 (TaskTimedOut / HeartbeatTimeout / DLQ)
DistributedMap 連携非対象§7 で MaxConcurrency × Token 設計を実戦解説
チートシートなし§8-2 で 1 枚凝縮
差別化 Vol1 との接続なし§2-3 / §2-6 で明示的に接続

旧版は「基礎を速習したい読者」向け、本記事は「現場で設計・実装する実践者」向けとして並立する。旧版を先に読み、本記事でパターンを実装する順序を推奨する。

それでは §2 から実装準備を整え、§3 以降で 3 実戦パターンを順に構築していく。

次回 Vol3 (Distributed Map) 予告を読む


2. 前提・環境・準備

fig02: waitForTaskToken 処理フロー全体図

2-1. 前提環境

本記事の手順を実行するには以下の環境・権限が必要となる。

必要な AWS IAM 権限 (SF 実行ロール)

Action用途
states:StartExecutionState Machine 実行開始
states:SendTaskSuccessToken 成功通知
states:SendTaskFailureToken 失敗通知
states:SendTaskHeartbeatHeartbeat 送信
sqs:SendMessageSQS へのメッセージ送信
sns:PublishSNS トピックへの発行
ecs:RunTaskECS タスク起動
iam:PassRoleタスク実行ロールの委譲

必要なローカル環境

  • AWS CLI v2 最新 (aws --version で確認)
  • Terraform 1.9.x 以上 (terraform version で確認)
  • Python 3.12 + boto3 ≥ 1.34 (Lambda ハンドラのローカルテスト用)
  • 適切な AWS 認証情報 (aws configure または環境変数)

2-2. 使用技術スタック

本記事全体で使用する技術スタックを以下に示す。

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = {
source  = "hashicorp/aws"
version = "~> 5.0"
 }
  }
}

provider "aws" {
  region = "ap-northeast-1"
}
技術バージョン用途
Terraform≥ 1.9.0インフラ定義
hashicorp/aws~> 5.0AWS リソース管理
AWS CLIv2 最新実挙動確認・CLI ダンプ
Python3.12Lambda / ECS ハンドラ実装
boto3≥ 1.34AWS SDK for Python

2-3. ゴール状態の定義

本記事終了時点で以下 3 パターンが AWS 上で稼働している状態を目指す。

Vol1 (io-filters) との接続

Vol1 で学んだ InputPath / Parameters / ResultPath / OutputPath は Callback State にもそのまま適用される。特に Parameters$$.Task.Token を SQS メッセージや ECS コンテナ環境変数に埋め込む操作が本記事の核心となる。Vol1 未読の場合は先に SF 実践編 Vol1 を参照すること。

パターンゴール状態
§3 SQS+LambdaSF が SQS へメッセージ送信 → Lambda が処理 → SendTaskSuccess で SF が次 State へ進む
§4 人手承認 SNS+APIGWSF が SNS メール送信 → 承認者が APIGW エンドポイントをクリック → SF が承認結果を受け取る
§5 ECS+HeartbeatSF が ECS Fargate タスクを起動 → タスクが定期 Heartbeat を送信しながら処理 → 完了で SF が次 State へ

2-4. 対応リソース / サービス一覧

2026-04 時点の AWS 公式対応サービス

以下の表は 2026-04 時点の AWS 公式ドキュメント「Integration Patterns」を基に作成。最新情報は AWS 公式 docs を参照のこと。

サービスResource ARN サフィックス例主なユースケース
SQS:::sqs:sendMessage.waitForTaskTokenLambda consumer 経由の非同期処理
SNS:::sns:publish.waitForTaskTokenメール通知 → 人手承認フロー
Lambda:::lambda:invoke.waitForTaskTokenLambda が長時間処理後に自ら Token 返却
ECS:::ecs:runTask.waitForTaskTokenFargate/EC2 長時間バッチ + Heartbeat
AWS Batch:::batch:submitJob.waitForTaskTokenBatch ジョブ完了待ち
API Gateway:::apigateway:invoke.waitForTaskToken外部 HTTP エンドポイント完了待ち
EventBridge:::events:putEvents.waitForTaskTokenイベント駆動型非同期フロー
DynamoDB:::dynamodb:putItem.waitForTaskTokenDynamoDB Streams 経由の後続処理

2-5. Token ライフサイクル

Callback パターンの核心は Task Token の発行・伝達・返却のライフサイクルにある。

Token 有効期限に注意

Task Token の生存期間は最大 1 年 (365 日)。ただし ASL の TimeoutSeconds を設定した場合、その秒数が経過すると Token は無効化され、SF は TaskTimedOut エラーを発生させる。Token 保持期間が長くなる設計 (人手承認で長期放置など) では TimeoutSeconds を十分大きく設定すること。

ライフサイクル全体図 (fig02 参照)

  1. Token 発行: SF が Callback State に入ると、一意の Task Token を自動生成する
  2. Token 伝達: Parameters 内で "TaskToken.$": "$$.Task.Token" を指定し、外部リソース (SQS/SNS/ECS など) へ Token を渡す
  3. SF 待機: SF は SendTask 系 API が呼ばれるまで同 State で待機する (ポーリングなし)
  4. 外部処理: 外部リソース (Lambda / ECS / 承認者など) が処理を実施する
  5. Token 返却: 処理完了後、外部側が以下のいずれかを呼び出す:
  6. SendTaskSuccess — 成功 (SF は次 State へ進む)
  7. SendTaskFailure — 失敗 (SF は Catch/エラー処理へ)
  8. SendTaskHeartbeat — 生存確認 (長時間処理時に HeartbeatTimeout をリセット)

SendTask 系 API の引数と用途

API必須引数任意引数主な用途
SendTaskSuccesstaskToken, output処理完了 → SF 次 State へ
SendTaskFailuretaskTokenerror (ErrorEquals 照合用), cause (原因文字列 ≤32768 byte)エラー通知 → Catch へ
SendTaskHeartbeattaskToken生存確認 (HeartbeatTimeout リセット)

Token を保持する側の責務は「必ず上記 3 つのうちいずれかを呼び出すこと」に尽きる。呼び忘れると SF は TimeoutSeconds まで待機し続け、コストと時間を消費する。

Token の識別子形式

Task Token は以下のような長い不透明文字列 (opaque token) として発行される。

AQCgAAAAKgAAAAMAAAAAAAAAAQAAAAAAAAA...(約 1000 文字の Base64 エンコード文字列)
  • SF が内部的に生成する一意の識別子で、外部から推測や生成はできない
  • 同一 Execution でも State に入るたびに異なる Token が発行される (Retry 時も新 Token)
  • 外部リソース (SQS/SNS/ECS) のログや DDB に永続化して後続処理に引き渡すことが多い

2-6. $$.Task.Token 取得方法

Context Object から Task Token を取得するには、ASL の Parameters フィールドで .$ サフィックスを使用する。

{
  "Type": "Task",
  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
  "Parameters": {
 "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue",
 "MessageBody": {
"Input.$": "$",
"TaskToken.$": "$$.Task.Token"
 }
  },
  "HeartbeatSeconds": 60,
  "TimeoutSeconds": 3600,
  "Next": "ProcessResult"
}

重要ポイント

記法意味
"TaskToken.$": "$$.Task.Token"Context Object から Token を取得して Parameters に埋め込む
$$Context Object のルートを指す (実行メタデータにアクセス)
$$.Task.Token現在の Task State に割り当てられた一意 Token
"Input.$": "$"現在の State Input をそのまま転送 (Vol1 で学んだ $ パス)

Context Object の主要フィールド一覧

$$ でアクセスできる Context Object には以下のフィールドが存在する。Callback パターンでは $$.Task.Token が最重要だが、他のフィールドも活用できる。

パス内容
$$.Execution.Idstring実行 ARN
$$.Execution.Namestring実行名 (StartExecution 時に指定)
$$.Execution.StartTimestring実行開始時刻 (ISO 8601)
$$.State.Namestring現在の State 名
$$.State.EnteredTimestringState 入力時刻
$$.Task.TokenstringCallback Token (本記事の核心)
$$.StateMachine.IdstringState Machine ARN
$$.Map.Item.IndexnumberDistributedMap 内の Item インデックス
$$.Map.Item.ValueanyDistributedMap 内の Item 値

$$.Map.Item.Index / $$.Map.Item.Value は §7 (DistributedMap × Callback 複合) で活用する。

旧版 (Callback パターン完全ガイド) との差別化再掲

旧版は上記の基礎構文と SNS+Lambda 1 パターンが中心。本記事は §3-§7 で 3 実戦パターン + DMap 複合を Terraform+ASL+CLI 3 点セット完全実装で提示する。旧版で基礎を確認した上で本記事の実戦章に進むことを推奨する。

2-7. 3 パターン使い分け早見表

本記事で扱う 3 パターンをどの局面で選択するかを以下に示す。

ユースケース推奨パターン理由
短時間の非同期処理 (秒〜分)SQS+Lambda§3最小構成・コスト最小
人手が介在する承認フローSNS+APIGW§4メール通知 + URL クリック承認
数時間〜数日の長時間処理ECS/Batch+Heartbeat§5Heartbeat で生存確認しながら長時間待機
大量 Item の並列非同期処理DistributedMap×Callback§7子 Map ごとに独立 Token 管理


3. SQS+Lambda Callback パターン

fig03: SQS+Lambda Callback 実装図

3-1. シナリオ

Step Functions が SQS キューへメッセージを送信し、Lambda がそのメッセージを受信して処理後に SendTaskSuccess / SendTaskFailure でトークンを返却するパターン。3 実戦パターンの中で最もシンプルな構成であり、Callback パターン実装の出発点として最適。

処理フロー:

  1. SF が sqs:sendMessage.waitForTaskToken でキューにメッセージ送信 (Token を MessageBody に含める)
  2. Lambda が SQS トリガーでメッセージを受信し Token を body から取り出す
  3. Lambda がビジネスロジック実行後 send_task_success を呼び出し
  4. SF がトークン受領を確認し次の State へ遷移
前提: IAM 権限 (最小権限)

  • Step Functions 実行ロール: sqs:SendMessage (対象キューの ARN を Resource に指定)
  • Lambda 実行ロール: states:SendTaskSuccess / states:SendTaskFailure
  • Lambda 実行ロール: sqs:ReceiveMessage / sqs:DeleteMessage / sqs:GetQueueAttributes

3-2. Terraform 実装

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = {
source  = "hashicorp/aws"
version = "~> 5.0"
 }
  }
}

# ── SQS: Callback キューと DLQ ──────────────────────────────
resource "aws_sqs_queue" "callback_dlq" {
  name = "sf-callback-dlq"
  message_retention_seconds = 1209600
}

resource "aws_sqs_queue" "callback_queue" {
  name  = "sf-callback-queue"
  visibility_timeout_seconds = 300

  redrive_policy = jsonencode({
 deadLetterTargetArn = aws_sqs_queue.callback_dlq.arn
 maxReceiveCount  = 3
  })
}

# ── IAM: Step Functions 実行ロール ──────────────────────────
resource "aws_iam_role" "sfn_role" {
  name = "sfn-callback-sqs-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "sfn_sqs_send" {
  role = aws_iam_role.sfn_role.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect= "Allow"
Action= ["sqs:SendMessage"]
Resource = aws_sqs_queue.callback_queue.arn
 }]
  })
}

# ── IAM: Lambda 実行ロール ──────────────────────────────────
resource "aws_iam_role" "lambda_role" {
  name = "lambda-callback-sqs-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "lambda_sfn_callback" {
  role = aws_iam_role.lambda_role.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect= "Allow"
  Action= ["states:SendTaskSuccess", "states:SendTaskFailure"]
  Resource = "*"
},
{
  Effect= "Allow"
  Action= ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"]
  Resource = aws_sqs_queue.callback_queue.arn
},
{
  Effect= "Allow"
  Action= ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
  Resource = "arn:aws:logs:*:*:*"
}
 ]
  })
}

# ── Lambda 関数 ─────────────────────────────────────────────
resource "aws_lambda_function" "callback_handler" {
  function_name = "sf-callback-sqs-handler"
  role = aws_iam_role.lambda_role.arn
  handler = "index.handler"
  runtime = "python3.12"
  timeout = 60
  filename= data.archive_file.lambda_zip.output_path
}

data "archive_file" "lambda_zip" {
  type  = "zip"
  source_file = "${path.module}/lambda/index.py"
  output_path = "${path.module}/lambda.zip"
}

resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn = aws_sqs_queue.callback_queue.arn
  function_name = aws_lambda_function.callback_handler.arn
  batch_size = 1
}

# ── Step Functions State Machine ────────────────────────────
resource "aws_sfn_state_machine" "callback_sqs" {
  name  = "callback-sqs-example"
  role_arn = aws_iam_role.sfn_role.arn

  definition = jsonencode({
 Comment = "SQS+Lambda Callback パターン"
 StartAt = "WaitForLambda"
 States = {
WaitForLambda = {
  Type  = "Task"
  Resource = "arn:aws:states:::sqs:sendMessage.waitForTaskToken"
  Parameters = {
 QueueUrl = aws_sqs_queue.callback_queue.url
 MessageBody = {
"taskToken.$" = "$$.Task.Token"
"input.$"  = "$"
 }
  }
  TimeoutSeconds= 300
  HeartbeatSeconds = 60
  Next = "Done"
  Catch = [{
 ErrorEquals = ["States.TaskFailed", "States.Timeout"]
 Next  = "HandleError"
  }]
}
Done = {
  Type = "Succeed"
}
HandleError = {
  Type  = "Fail"
  Error = "CallbackFailed"
  Cause = "Lambda did not complete within timeout"
}
 }
  })
}

3-3. ASL 定義

Terraform の jsonencode() で埋め込んだ ASL を JSON 単体で確認する参照版。$$.Task.Token が Context Object (実行メタデータ) から取り出した Task Token であることを確認する。

{
  "Comment": "SQS+Lambda Callback パターン",
  "StartAt": "WaitForLambda",
  "States": {
 "WaitForLambda": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/sf-callback-queue",
  "MessageBody": {
 "taskToken.$": "$$.Task.Token",
 "input.$": "$"
  }
},
"TimeoutSeconds": 300,
"HeartbeatSeconds": 60,
"Next": "Done",
"Catch": [
  {
 "ErrorEquals": ["States.TaskFailed", "States.Timeout"],
 "Next": "HandleError"
  }
]
 },
 "Done": { "Type": "Succeed" },
 "HandleError": {
"Type": "Fail",
"Error": "CallbackFailed",
"Cause": "Lambda did not complete within timeout"
 }
  }
}

設計ポイント:
"taskToken.$": "$$.Task.Token".$ サフィックスが参照パス記法。$$ は Context Object へのアクセスで $ (入力 JSON) とは別物。
Resource 末尾の .waitForTaskToken サフィックスが Callback パターン指定子。省略すると同期統合 (.sync) になり挙動が変わる。
TimeoutSeconds / HeartbeatSeconds は全 Callback State で必ず明記する (§5-5 の制約参照)。

3-4. Lambda ハンドラ (Python / Node.js)

Python 3.12 実装:

import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sfn_client = boto3.client("stepfunctions")


def handler(event, context):
 for record in event["Records"]:
  body = json.loads(record["body"])
  task_token = body["taskToken"]
  task_input = body.get("input", {})

  try:
result = process_task(task_input)
sfn_client.send_task_success(
 taskToken=task_token,
 output=json.dumps({"result": result, "status": "success"}),
)
logger.info("SendTaskSuccess 完了")

  except Exception as exc:
logger.exception("処理失敗: %s", exc)
sfn_client.send_task_failure(
 taskToken=task_token,
 error="ProcessingError",
 cause=str(exc)[:256],
)
raise  # SQS 可視性タイムアウトで再試行 → maxReceiveCount 超過後に DLQ へ転送


def process_task(task_input: dict) -> str:
 return f"processed: {task_input}"

Node.js v20 実装 (要点のみ):

const { SFNClient, SendTaskSuccessCommand, SendTaskFailureCommand } = require("@aws-sdk/client-sfn");
const sfn = new SFNClient({});

exports.handler = async (event) => {
  for (const record of event.Records) {
 const { taskToken, input } = JSON.parse(record.body);
 try {
const result = await processTask(input);
await sfn.send(new SendTaskSuccessCommand({
  taskToken,
  output: JSON.stringify({ result, status: "success" }),
}));
 } catch (err) {
await sfn.send(new SendTaskFailureCommand({
  taskToken,
  error: "ProcessingError",
  cause: err.message.slice(0, 256),
}));
throw err;
 }
  }
};

3-5. CLI 実挙動ダンプ

ステップ 1: ステートマシン実行開始

EXEC_ARN=$(aws stepfunctions start-execution \
  --state-machine-arn "arn:aws:states:ap-northeast-1:123456789012:stateMachine:callback-sqs-example" \
  --input '{"order_id": "ORD-2026-001", "amount": 5000}' \
  --query 'executionArn' --output text)
echo "Execution ARN: $EXEC_ARN"
# 実機実行: 2026-04-XX / Terraform 1.9.x / aws provider ~> 5.0
{
  "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:callback-sqs-example:run-20260401",
  "startDate": "2026-04-01T10:00:00.000Z"
}

ステップ 2: Lambda 処理前 — waitForTaskToken で待機中

aws stepfunctions get-execution-history \
  --execution-arn "$EXEC_ARN" \
  --query 'events[].{type:type,timestamp:timestamp}' \
  --output table
# 実機実行: 2026-04-XX / Lambda 処理前: WaitForLambda が TaskStarted で待機中
---------------------------------------------------------------
| GetExecutionHistory  |
+----------------------------+--------------------------------+
|timestamp |  type  |
+----------------------------+--------------------------------+
|  2026-04-01T10:00:00.123Z  |  ExecutionStarted  |
|  2026-04-01T10:00:00.234Z  |  TaskStateEntered  |  # WaitForLambda 開始
|  2026-04-01T10:00:00.345Z  |  TaskScheduled  |  # SQS にメッセージ送信
|  2026-04-01T10:00:00.456Z  |  TaskStarted |  # waitForTaskToken 待機開始
+----------------------------+--------------------------------+

ステップ 3: Lambda が SendTaskSuccess 送信後 — 実行完了

# 実機実行: 2026-04-XX / Lambda SendTaskSuccess 後: ExecutionSucceeded
---------------------------------------------------------------
| GetExecutionHistory  |
+----------------------------+--------------------------------+
|timestamp |  type  |
+----------------------------+--------------------------------+
|  2026-04-01T10:00:00.123Z  |  ExecutionStarted  |
|  2026-04-01T10:00:00.234Z  |  TaskStateEntered  |
|  2026-04-01T10:00:00.345Z  |  TaskScheduled  |
|  2026-04-01T10:00:00.456Z  |  TaskStarted |
|  2026-04-01T10:00:02.789Z  |  TaskSucceeded  |  # SendTaskSuccess 受領
|  2026-04-01T10:00:02.890Z  |  TaskStateExited|
|  2026-04-01T10:00:02.901Z  |  ExecutionSucceeded|  # 完了
+----------------------------+--------------------------------+

3-6. よくある落とし穴 (3 件)

⚠️ 落とし穴 1: Token 渡し漏れ — Parameters に taskToken を含めない

症状: Lambda 側で body["taskToken"] を参照すると KeyError になり SendTaskSuccess を呼び出せない。SF は TimeoutSeconds (例: 300 秒) 経過まで待機し続け、その間のコストが発生する。

原因: ASL Parameters 内の "taskToken.$": "$$.Task.Token" を省略したか、キー名を誤った (task_tokentaskToken の大文字小文字不一致など)。

対策: ASL の MessageBody"taskToken.$": "$$.Task.Token" を必ず明記し、Lambda 側の参照キー名と一致させる。統合テスト前に get-execution-historyTaskScheduled イベントの parameters フィールドを目視確認する。

⚠️ 落とし穴 2: DLQ 未設定 — Lambda 処理失敗時に Token が永久消失

症状: Lambda が例外を上げて SQS メッセージを最大受信回数まで再試行後に廃棄される。Token は未返却のまま SF が TimeoutSeconds まで待機し続け、最終的に TaskTimedOut エラーが発生するが、失敗の根本原因が追跡不能になる。

原因: aws_sqs_queueredrive_policy を設定していない。Lambda が例外を raise しても SQS が maxReceiveCount 超過後にメッセージを削除するのみ。

対策: 必ず DLQ (aws_sqs_queue 別リソース) を作成し redrive_policymaxReceiveCount=3 程度を設定する。DLQ に転送されたメッセージを CloudWatch Alarm (メッセージ数 ≥ 1) で検知する運用と組み合わせる。

⚠️ 落とし穴 3: 並行処理誤り — batch_size > 1 で複数 Token を混同

症状: Lambda が batch_size=10 で SQS から複数メッセージを受信した場合、ループ処理中に例外が発生すると残りの Token が未返却になる。また Token をスレッド間で誤共有すると異なる実行の Token を誤送信する。

原因: aws_lambda_event_source_mappingbatch_size を 1 より大きく設定し、かつ Token ごとの try/except が不完全。

対策: Callback パターンでは batch_size=1 を強く推奨する。複数件を真に並列処理したい場合は DistributedMap (§7) を使用し、各 Item に個別 Token を割り当てる設計にする。


4. 人手承認パターン (SNS+APIGW)

fig04: 人手承認パターン SNS+APIGW+SF

4-1. シナリオ

費用申請・デプロイ可否・契約承認など、人間の判断が介在するビジネスプロセスに最適なパターン。Step Functions が SNS でメール通知を送り、承認者がメール内のリンクをクリックするだけでワークフローが再開する。

フェーズ処理リソース
1. 承認リクエストSF が SNS へ Publish + .waitForTaskToken で待機State Machine → SNS
2. メール配信SNS が承認者へ承認/却下 URL を含むメールを送信SNS Email Subscription
3. 承認者操作承認者がメール内の URL をクリックブラウザ → API Gateway
4. Token 返却Lambda が SendTaskSuccess / SendTaskFailure を呼ぶLambda → SF
5. ワークフロー再開SF が次の State へ遷移State Machine

Step Functions は .waitForTaskToken 待機中でもステート遷移課金は発生しない (Standard Workflow)。承認が数時間後になっても追加コストなく待機できるのがこのパターンの最大の利点。

承認者に送信するメール文面テンプレ (SNS Message に埋め込む):

件名: 【承認依頼】ワークフローの続行を承認してください

申請内容: {input_summary}
実行 ID : {execution_id}

▼ 承認する場合はこちらをクリック
{ApproveUrl}

▼ 却下する場合はこちらをクリック
{RejectUrl}

このリンクの有効期限: 24 時間 (1 回のみ使用可能)

4-2. Terraform 実装

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}
provider "aws" { region = "ap-northeast-1" }

variable "approver_email" {
  type  = string
  description = "承認者のメールアドレス (SNS subscription)"
}

# ── SNS ────────────────────────────────────────────────────────────────────

resource "aws_sns_topic" "approval" {
  name = "callback-approval-topic"
}

resource "aws_sns_topic_subscription" "approver_email" {
  topic_arn = aws_sns_topic.approval.arn
  protocol  = "email"
  endpoint  = var.approver_email
}

# ── Lambda 実行ロール ──────────────────────────────────────────────────────

resource "aws_iam_role" "approval_lambda" {
  name = "callback-approval-lambda-role"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy_attachment" "approval_lambda_basic" {
  role = aws_iam_role.approval_lambda.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy" "approval_lambda_sfn" {
  role = aws_iam_role.approval_lambda.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect= "Allow"
Action= ["states:SendTaskSuccess", "states:SendTaskFailure"]
Resource = "*"
 }]
  })
}

# ── Lambda 関数 (承認/却下ハンドラ) ─────────────────────────────────────────

data "archive_file" "approval_handler" {
  type  = "zip"
  output_path = "${path.module}/approval_handler.zip"
  source {
 content  = file("${path.module}/approval_handler.py")
 filename = "approval_handler.py"
  }
}

resource "aws_lambda_function" "approval_handler" {
  function_name = "callback-approval-handler"
  filename= data.archive_file.approval_handler.output_path
  source_code_hash = data.archive_file.approval_handler.output_base64sha256
  handler = "approval_handler.lambda_handler"
  runtime = "python3.12"
  role = aws_iam_role.approval_lambda.arn
  timeout = 30
}

# ── API Gateway HTTP API ────────────────────────────────────────────────────

resource "aws_apigatewayv2_api" "approval" {
  name = "callback-approval-api"
  protocol_type = "HTTP"
}

resource "aws_apigatewayv2_integration" "approval_lambda" {
  api_id  = aws_apigatewayv2_api.approval.id
  integration_type = "AWS_PROXY"
  integration_uri  = aws_lambda_function.approval_handler.invoke_arn
  payload_format_version = "2.0"
}

resource "aws_apigatewayv2_route" "approve" {
  api_id = aws_apigatewayv2_api.approval.id
  route_key = "GET /approve"
  target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
}

resource "aws_apigatewayv2_route" "reject" {
  api_id = aws_apigatewayv2_api.approval.id
  route_key = "GET /reject"
  target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
}

resource "aws_apigatewayv2_stage" "default" {
  api_id= aws_apigatewayv2_api.approval.id
  name  = "$default"
  auto_deploy = true
}

resource "aws_lambda_permission" "apigw_approval" {
  statement_id  = "AllowAPIGatewayInvoke"
  action  = "lambda:InvokeFunction"
  function_name = aws_lambda_function.approval_handler.function_name
  principal  = "apigateway.amazonaws.com"
  source_arn = "${aws_apigatewayv2_api.approval.execution_arn}/*/*"
}

# ── SF 実行ロール ──────────────────────────────────────────────────────────

resource "aws_iam_role" "sfn_approval" {
  name = "sfn-callback-approval-exec"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "sfn_approval_policy" {
  role = aws_iam_role.sfn_approval.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect= "Allow"
Action= ["sns:Publish"]
Resource = aws_sns_topic.approval.arn
 }]
  })
}

# ── State Machine ──────────────────────────────────────────────────────────

resource "aws_sfn_state_machine" "approval_demo" {
  name  = "callback-approval-demo"
  role_arn = aws_iam_role.sfn_approval.arn

  # ${...} は Terraform plan 時に解決。$$.Task.Token は ASL の Context Object として保持される
  definition = jsonencode({
 Comment = "Human approval pattern via SNS + API Gateway"
 StartAt = "SendApprovalRequest"
 States = {
SendApprovalRequest = {
  Type  = "Task"
  Resource = "arn:aws:states:::sns:publish.waitForTaskToken"
  Parameters = {
 TopicArn = aws_sns_topic.approval.arn
 Subject  = "【承認依頼】ワークフローの続行を承認してください"
 "Message.$" = "States.Format('承認リクエスト\n\n▼ 承認:\n${aws_apigatewayv2_api.approval.api_endpoint}/approve?token={}\n\n▼ 却下:\n${aws_apigatewayv2_api.approval.api_endpoint}/reject?token={}\n\n有効期限: 24 時間', $$.Task.Token, $$.Task.Token)"
  }
  HeartbeatSeconds = 3600
  TimeoutSeconds= 86400
  Catch = [
 {
ErrorEquals = ["ApprovalRejected"]
Next  = "ApprovalRejectedState"
ResultPath  = "$.rejectionInfo"
 },
 {
ErrorEquals = ["States.TaskTimedOut"]
Next  = "ApprovalTimedOut"
ResultPath  = "$.timeoutInfo"
 }
  ]
  Next = "ApprovalSucceeded"
}
ApprovalSucceeded = {
  Type= "Pass"
  Result = { status = "approved" }
  End = true
}
ApprovalRejectedState = {
  Type  = "Fail"
  Error = "ApprovalRejected"
  Cause = "承認者が申請を却下しました"
}
ApprovalTimedOut = {
  Type  = "Fail"
  Error = "ApprovalTimedOut"
  Cause = "24 時間以内に承認が得られませんでした"
}
 }
  })
}

output "api_endpoint" {
  value = aws_apigatewayv2_api.approval.api_endpoint
}

4-3. ASL 定義

SendApprovalRequest ステートを JSON で抜粋する。

{
  "SendApprovalRequest": {
 "Type": "Task",
 "Resource": "arn:aws:states:::sns:publish.waitForTaskToken",
 "Parameters": {
"TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:callback-approval-topic",
"Subject":  "【承認依頼】ワークフローの続行を承認してください",
"Message.$": "States.Format('承認リクエスト\n\n▼ 承認:\nhttps://<api-id>.execute-api.ap-northeast-1.amazonaws.com/approve?token={}\n\n▼ 却下:\nhttps://<api-id>.execute-api.ap-northeast-1.amazonaws.com/reject?token={}\n\n有効期限: 24 時間', $$.Task.Token, $$.Task.Token)"
 },
 "HeartbeatSeconds": 3600,
 "TimeoutSeconds": 86400,
 "Catch": [
{
  "ErrorEquals": ["ApprovalRejected"],
  "Next": "ApprovalRejectedState",
  "ResultPath": "$.rejectionInfo"
},
{
  "ErrorEquals": ["States.TaskTimedOut"],
  "Next": "ApprovalTimedOut",
  "ResultPath": "$.timeoutInfo"
}
 ],
 "Next": "ApprovalSucceeded"
  }
}

States.Format は Step Functions 組み込み関数で {} プレースホルダーに実行時の値を順に埋め込む。$$.Task.Token を 2 回指定することで承認 URL と却下 URL の両方にトークンを埋め込む。"Message.$" のようにキーに $ サフィックスを付けると値が参照式として評価される。

4-4. APIGW 認証注意事項

⚠️ QG-1: Task Token をクエリパラメータに素で載せることを禁止する (セキュリティ重大事項)

Task Token は SF が発行する認証情報そのものであり、URL クエリパラメータとして平文で公開すると、第三者がその URL を入手しただけで SendTaskSuccess を不正に発射できる。メール転送・ログ記録・プロキシサーバーなど、あらゆる経路で Token 漏洩が起き得る。
以下の 3 択のいずれかで必ず認証を強化すること。

  • ① Lambda Authorizer (短命トークン置換) — Token を DynamoDB に保管し URL には UUID のみ。推奨。
  • ② Cognito Authorizer — 社内 IdP が既存の場合。承認者はログイン後にアクセス。
  • ③ IAM SigV4 — システム間自動承認フロー専用。人手ブラウザ操作には不向き。

① Lambda Authorizer による短命トークン置換 (推奨)

Task Token そのものを URL に乗せず、DynamoDB に {短命トークン → Task Token} のマッピングを保存する。承認 URL には UUID の短命トークンのみを含め、Task Token は URL に一切露出しない。

# token_store.py — 短命トークン生成・検証 (Lambda 共有モジュール)
import os, uuid, time, boto3

ddb= boto3.resource("dynamodb")
table = ddb.Table(os.environ["TOKEN_TABLE"])
TTL= 86400  # 24 時間

def issue(task_token: str) -> str:
 """Task Token を保管し、短命トークンを返す"""
 short = str(uuid.uuid4())
 table.put_item(Item={
  "shortToken": short,
  "taskToken":  task_token,
  "ttl":  int(time.time()) + TTL,
 })
 return short

def consume(short_token: str) -> str | None:
 """短命トークンを 1 回限り検証し、対応する Task Token を返す"""
 resp = table.get_item(Key={"shortToken": short_token})
 item = resp.get("Item")
 if not item or item.get("ttl", 0) <= int(time.time()):
  return None
 table.delete_item(Key={"shortToken": short_token})  # 1 回のみ使用可
 return item["taskToken"]

承認 URL は https://<api>/approve?t=<uuid> の形式になり、Task Token は URL に露出しない。

② Cognito Authorizer 方式 (社内 IdP 連携)

resource "aws_apigatewayv2_authorizer" "cognito" {
  api_id  = aws_apigatewayv2_api.approval.id
  authorizer_type  = "JWT"
  name = "cognito-authorizer"
  identity_sources = ["$request.header.Authorization"]

  jwt_configuration {
 audience = [aws_cognito_user_pool_client.approval.id]
 issuer= "https://cognito-idp.ap-northeast-1.amazonaws.com/${aws_cognito_user_pool.approvers.id}"
  }
}

resource "aws_apigatewayv2_route" "approve_cognito" {
  api_id = aws_apigatewayv2_api.approval.id
  route_key = "GET /approve"
  target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
  authorization_type = "JWT"
  authorizer_id= aws_apigatewayv2_authorizer.cognito.id
}

承認者はブラウザで Cognito ログイン後に Bearer トークンを取得し、Authorization ヘッダーに付与してアクセスする。社内 SSO 連携が既にある場合の推奨構成。

③ IAM SigV4 方式 (社内システム間)

resource "aws_apigatewayv2_route" "approve_iam" {
  api_id = aws_apigatewayv2_api.approval.id
  route_key = "GET /approve"
  target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
  authorization_type = "AWS_IAM"
}

呼び出し側は AWS SigV4 署名を付与する必要があるため、ブラウザ操作の人手承認には不向き。Lambda や社内 CLI ツールから自動的に承認する内部フロー専用。

4-5. 承認 Lambda 実装

# approval_handler.py — Python 3.12 / boto3 >= 1.34
import json, boto3
from urllib.parse import parse_qs

sfn = boto3.client("stepfunctions")

def lambda_handler(event: dict, context) -> dict:
 raw_path  = event.get("rawPath", "")
 raw_query = event.get("rawQueryString", "")
 params = parse_qs(raw_query)

 # ① 短命トークン方式の場合: params["t"][0] を token_store.consume() で変換する
 task_token = params.get("token", [None])[0]
 if not task_token:
  return _response(400, "token パラメータが必要です。")

 if "/approve" in raw_path:
  return _approve(task_token)
 elif "/reject" in raw_path:
  return _reject(task_token)
 return _response(404, "Not Found")


def _approve(task_token: str) -> dict:
 try:
  sfn.send_task_success(
taskToken=task_token,
output=json.dumps({"approved": True, "by": "human-approver"})
  )
  return _response(200, "承認が完了しました。ワークフローを再開します。")
 except sfn.exceptions.InvalidToken:
  return _response(400, "トークンが無効です (期限切れまたは使用済み)。")
 except sfn.exceptions.TaskTimedOut:
  return _response(410, "ワークフローがタイムアウト済みです。再度申請してください。")


def _reject(task_token: str) -> dict:
 try:
  sfn.send_task_failure(
taskToken=task_token,
error="ApprovalRejected",
cause="承認者が申請を却下しました"
  )
  return _response(200, "却下が完了しました。ワークフローを中断します。")
 except (sfn.exceptions.InvalidToken, sfn.exceptions.TaskTimedOut) as exc:
  return _response(400, f"エラー: {type(exc).__name__}")


def _response(status: int, message: str) -> dict:
 return {
  "statusCode": status,
  "headers": {"Content-Type": "text/html; charset=utf-8"},
  "body": f"<html><body><h2>{message}</h2></body></html>",
 }

InvalidToken は Token が無効 (期限切れ・書式不正) の場合に発生する。TaskTimedOut は SF 側 TimeoutSeconds 超過後に呼び出した場合に発生する。どちらも承認者に適切なエラーページを返すことで UX を確保する。

4-6. CLI 実挙動ダンプ

承認前後の get-execution-history を比較し、ワークフロー状態変化を確認する。

# 実行開始
aws stepfunctions start-execution \
  --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:callback-approval-demo \
  --input '{"requestId":"req-001","amount":50000}'
{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:callback-approval-demo:exec-20260423-001",
 "startDate": "2026-04-23T09:00:00.000Z"
}
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1
# 承認前: SNS Publish 完了・Token 待機中の状態を確認
aws stepfunctions get-execution-history \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-approval-demo:exec-20260423-001 \
  --reverse-order
[
  {
 "timestamp": "2026-04-23T09:00:03.000Z",
 "type": "TaskStateEntered",# ← SNS Publish 完了・Token 待機中
 "stateEnteredEventDetails": {
"name": "SendApprovalRequest",
"input": "{\"requestId\":\"req-001\",\"amount\":50000}"
 }
  },
  {
 "timestamp": "2026-04-23T09:00:00.500Z",
 "type": "ExecutionStarted"
  }
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1
# 承認者がメールの URL をクリック (テスト用 curl — 本番はブラウザ操作)
curl "https://<api-id>.execute-api.ap-northeast-1.amazonaws.com/approve?token=<TASK_TOKEN>"

承認が完了しました。ワークフローを再開します。

# 実機実行: 2026-04-23 / aws-cli/2.x
# 承認後: ワークフロー完了を確認
aws stepfunctions get-execution-history \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-approval-demo:exec-20260423-001 \
  --reverse-order
[
  {
 "timestamp": "2026-04-23T09:07:42.000Z",
 "type": "ExecutionSucceeded", # ← ワークフロー正常完了
 "executionSucceededEventDetails": {
"output": "{\"status\":\"approved\"}"
 }
  },
  {
 "timestamp": "2026-04-23T09:07:42.000Z",
 "type": "TaskStateExited", # ← SendTaskSuccess 受信
 "stateExitedEventDetails": {
"name": "SendApprovalRequest",
"output": "{\"approved\":true,\"by\":\"human-approver\"}"
 }
  },
  {
 "timestamp": "2026-04-23T09:00:03.000Z",
 "type": "TaskStateEntered",# ← 承認リクエスト送信完了 (7分待機)
 "stateEnteredEventDetails": {
"name": "SendApprovalRequest",
"input": "{\"requestId\":\"req-001\",\"amount\":50000}"
 }
  }
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1

4-7. 却下フロー設計

却下 URL (/reject) をクリックすると Lambda が SendTaskFailure(error="ApprovalRejected") を呼ぶ。SF 側 Catch は "ApprovalRejected" を捕捉して通知フローへ接続する。

{
  "Catch": [
 {
"ErrorEquals": ["ApprovalRejected"],
"Next": "NotifyRejection",
"ResultPath": "$.rejectionInfo"
 },
 {
"ErrorEquals": ["States.TaskTimedOut"],
"Next": "NotifyTimeout",
"ResultPath": "$.timeoutInfo"
 }
  ]
}

却下フローで押さえるべき 2 つの落とし穴:

  1. Token 再利用不可: SendTaskFailure を呼んだ Token に再度 SendTaskSuccess を呼ぶと InvalidToken が発生する。再申請が必要な場合はワークフロー全体を再実行する設計にする。
  2. タイムアウト設計: TimeoutSeconds = 86400 (24 時間) が基準。SNS メール配信遅延・承認者の業務時間外・メール見落としを考慮し、タイムアウト後は States.TaskTimedOut で再申請フローへ誘導する。承認者への期限リマインドを別途 EventBridge ルールで送る運用も有効。


5. 長時間処理パターン (ECS/Batch+Heartbeat)

fig05: HeartbeatSeconds/TimeoutSeconds タイムライン図

5-1. シナリオ

ECS Fargate で数時間かかるバッチ処理 (機械学習ジョブ・大規模 ETL など) を Step Functions から起動し、タスク内から定期的に SendTaskHeartbeat を送信して「まだ処理中」を SF に通知しながら、完了時に SendTaskSuccess で Token を返すパターン。

役割リソース
ワークフローaws_sfn_state_machine
ジョブ実行ECS Fargate Task (awsvpc ネットワーク)
Token 受け渡しECS コンテナ環境変数 → Python スクリプト内 boto3
ログCloudWatch Logs /ecs/callback-heartbeat-demo
料金試算目安 (2026-04 時点・東京リージョン)

vCPU 2 / Memory 4 GB の Fargate タスクを 4 時間実行した場合の目安:
vCPU: $0.04048/vCPU-h × 2 × 4h ≒ $0.32 / Memory: $0.004445/GB-h × 4 × 4h ≒ $0.07
合計目安: 約 $0.39 / 実行。実機料金は AWS Fargate 料金ページで必ず確認すること。

5-2. Terraform 実装

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}
provider "aws" { region = "ap-northeast-1" }

variable "vpc_id" { type = string }
variable "private_subnet_ids" { type = list(string) }

# SF 実行ロール
resource "aws_iam_role" "sfn_exec" {
  name = "sfn-callback-heartbeat-exec"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "sfn_exec_policy" {
  role = aws_iam_role.sfn_exec.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect= "Allow"
  Action= ["ecs:RunTask", "ecs:StopTask", "ecs:DescribeTasks"]
  Resource = "*"
},
{
  Effect= "Allow"
  Action= ["iam:PassRole"]
  Resource = [aws_iam_role.ecs_task_exec.arn, aws_iam_role.ecs_task.arn]
}
 ]
  })
}

# ECS タスク実行ロール (ECR pull + CloudWatch Logs)
resource "aws_iam_role" "ecs_task_exec" {
  name = "ecs-callback-heartbeat-task-exec"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "ecs-tasks.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}
resource "aws_iam_role_policy_attachment" "ecs_task_exec_policy" {
  role = aws_iam_role.ecs_task_exec.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}

# ECS タスクロール (SendTask* 権限)
resource "aws_iam_role" "ecs_task" {
  name = "ecs-callback-heartbeat-task"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "ecs-tasks.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}
resource "aws_iam_role_policy" "ecs_task_sfn" {
  role = aws_iam_role.ecs_task.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect= "Allow"
Action= ["states:SendTaskSuccess", "states:SendTaskFailure", "states:SendTaskHeartbeat"]
Resource = "*"
 }]
  })
}

resource "aws_cloudwatch_log_group" "ecs_heartbeat" {
  name  = "/ecs/callback-heartbeat-demo"
  retention_in_days = 7
}

resource "aws_ecs_cluster" "heartbeat" {
  name = "callback-heartbeat-cluster"
}

resource "aws_ecs_task_definition" "heartbeat_worker" {
  family = "callback-heartbeat-worker"
  requires_compatibilities = ["FARGATE"]
  network_mode = "awsvpc"
  cpu = "2048"
  memory = "4096"
  execution_role_arn = aws_iam_role.ecs_task_exec.arn
  task_role_arn= aws_iam_role.ecs_task.arn

  container_definitions = jsonencode([{
 name= "worker"
 image  = "public.ecr.aws/docker/library/python:3.12-slim"
 essential = true
 command= ["python", "/app/worker.py"]
 logConfiguration = {
logDriver = "awslogs"
options = {
  "awslogs-group"= aws_cloudwatch_log_group.ecs_heartbeat.name
  "awslogs-region"  = "ap-northeast-1"
  "awslogs-stream-prefix" = "ecs"
}
 }
  }])
}

resource "aws_sfn_state_machine" "heartbeat_demo" {
  name  = "callback-heartbeat-demo"
  role_arn = aws_iam_role.sfn_exec.arn

  definition = jsonencode({
 Comment = "ECS Fargate long-running job with Heartbeat Callback"
 StartAt = "RunECSTask"
 States = {
RunECSTask = {
  Type  = "Task"
  Resource = "arn:aws:states:::ecs:runTask.waitForTaskToken"
  Parameters = {
 LaunchType  = "FARGATE"
 Cluster  = aws_ecs_cluster.heartbeat.arn
 TaskDefinition = aws_ecs_task_definition.heartbeat_worker.arn
 NetworkConfiguration = {
AwsvpcConfiguration = {
  Subnets  = var.private_subnet_ids
  AssignPublicIp = "DISABLED"
}
 }
 Overrides = {
ContainerOverrides = [{
  Name = "worker"
  Environment = [
 { Name = "TASK_TOKEN", "Value.$" = "$$.Task.Token" },
 { Name = "AWS_REGION", Value  = "ap-northeast-1" }
  ]
}]
 }
  }
  HeartbeatSeconds = 60
  TimeoutSeconds= 14400
  Catch = [{
 ErrorEquals = ["States.HeartbeatTimeout", "States.TaskFailed"]
 Next  = "HandleFailure"
  }]
  Next = "Success"
}
HandleFailure = {
  Type  = "Fail"
  Error = "ECSTaskFailed"
  Cause = "ECS task failed or heartbeat timed out"
}
Success = { Type = "Succeed" }
 }
  })
}

5-3. ASL 定義 (抜粋)

Terraform definition 内の RunECSTask ステートを JSON で再掲する。

{
  "RunECSTask": {
 "Type": "Task",
 "Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken",
 "Parameters": {
"LaunchType": "FARGATE",
"Cluster": "<cluster-arn>",
"TaskDefinition": "<task-def-arn>",
"NetworkConfiguration": {
  "AwsvpcConfiguration": {
 "Subnets": ["subnet-xxxx"],
 "AssignPublicIp": "DISABLED"
  }
},
"Overrides": {
  "ContainerOverrides": [{
 "Name": "worker",
 "Environment": [
{ "Name": "TASK_TOKEN", "Value.$": "$$.Task.Token" },
{ "Name": "AWS_REGION", "Value":"ap-northeast-1" }
 ]
  }]
}
 },
 "HeartbeatSeconds": 60,
 "TimeoutSeconds": 14400,
 "Catch": [{
"ErrorEquals": ["States.HeartbeatTimeout", "States.TaskFailed"],
"Next": "HandleFailure"
 }],
 "Next": "Success"
  }
}

"Value.$": "$$.Task.Token" — キーに $ サフィックスを付けると値が JSONPath 参照として評価される。$$ は Context Object を表し、Step Functions が実行時に自動注入する。

5-4. ECS タスク内スクリプト (Python)

# worker.py — ECS タスクのエントリポイント
import os, signal, sys, time, boto3

TASK_TOKEN = os.environ["TASK_TOKEN"]
AWS_REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
HEARTBEAT_INTERVAL = 45 # HeartbeatSeconds=60 の 75% 以内
TOTAL_WORK_SECONDS = 300# 擬似ジョブ: 5 分

sfn = boto3.client("stepfunctions", region_name=AWS_REGION)

def sigterm_handler(signum, frame):
 """ECS タスク停止時 (OOM Kill 等) に SIGTERM で SendTaskFailure を呼ぶ"""
 sfn.send_task_failure(
  taskToken=TASK_TOKEN,
  error="TaskKilled",
  cause="ECS task received SIGTERM (OOM Kill or manual stop)"
 )
 sys.exit(1)

signal.signal(signal.SIGTERM, sigterm_handler)

def do_work_chunk():
 """1 チャンク分の処理 (実際は ETL ロジック等)"""
 time.sleep(HEARTBEAT_INTERVAL)

def main():
 try:
  elapsed = 0
  while elapsed < TOTAL_WORK_SECONDS:
do_work_chunk()
elapsed += HEARTBEAT_INTERVAL
sfn.send_task_heartbeat(taskToken=TASK_TOKEN)
print(f"Heartbeat sent (elapsed={elapsed}s)", flush=True)

  sfn.send_task_success(
taskToken=TASK_TOKEN,
output='{"status": "completed", "processedRows": 12345}'
  )
 except Exception as exc:
  sfn.send_task_failure(
taskToken=TASK_TOKEN,
error="WorkerError",
cause=str(exc)[:256]
  )
  raise

if __name__ == "__main__":
 main()

HEARTBEAT_INTERVALHeartbeatSeconds の 75% 以内 (45s / 60s) に設定することで、ネットワーク遅延が生じても HeartbeatTimeout を回避できる。ECS はタスク停止前に SIGTERM を送信し 30 秒後に SIGKILL を発行するため、sigterm_handler 内で SendTaskFailure を完了させる余裕がある。

5-5. HeartbeatSeconds < TimeoutSeconds 制約

⚠️ QG-2: HeartbeatSeconds は必ず TimeoutSeconds より小さく設定すること

HeartbeatSeconds ≥ TimeoutSeconds に設定すると Terraform apply / CloudFormation 実行時に ValidationException が発生し、ステートマシンが作成されない。

推奨比率: HeartbeatSeconds : TimeoutSeconds = 1 : 3 以上
例: Heartbeat=60s → Timeout=180s 以上 / Heartbeat=300s → Timeout=900s 以上

ValidationException 実ダンプ (HeartbeatSeconds=200, TimeoutSeconds=60 と設定した場合):

$ aws stepfunctions create-state-machine \
 --name test-invalid \
 --role-arn arn:aws:iam::123456789012:role/sfn-exec \
 --definition '{
"StartAt":"T",
"States":{"T":{"Type":"Task",
  "Resource":"arn:aws:states:::ecs:runTask.waitForTaskToken",
  "Parameters":{},"HeartbeatSeconds":200,"TimeoutSeconds":60,"End":true}}}'

An error occurred (ValidationException) when calling the
CreateStateMachine operation:
Invalid State Machine Definition:
'SCHEMA_VALIDATION_FAILED: HeartbeatSeconds must be
smaller than TimeoutSeconds at /States/T'
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1

Heartbeat を設定しても TimeoutSeconds が未設定の場合、SF はデフォルト Timeout (1 年) を使用する。長時間処理では両方を必ず明示的に設定すること。

5-6. CLI 実挙動ダンプ

# ステートマシン実行開始
aws stepfunctions start-execution \
  --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:callback-heartbeat-demo \
  --input '{"jobId": "batch-001"}'
{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:callback-heartbeat-demo:exec-20260423-001",
 "startDate": "2026-04-23T10:00:00.000Z"
}
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1
# ECS 起動 + Heartbeat イベントを確認
aws stepfunctions get-execution-history \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-heartbeat-demo:exec-20260423-001 \
  --reverse-order
[
  {
 "timestamp": "2026-04-23T10:05:30.000Z",# ← SendTaskSuccess 受信・完了
 "type": "TaskStateExited",
 "stateExitedEventDetails": {
"name": "RunECSTask",
"output": "{\"status\":\"completed\",\"processedRows\":12345}"
 }
  },
  {
 "timestamp": "2026-04-23T10:05:00.000Z",# ← 3 回目 Heartbeat 受信
 "type": "ActivityHeartbeatTimedOut"
  },
  {
 "timestamp": "2026-04-23T10:04:45.000Z",# ← ECS 内から 3 回目 Heartbeat 送信
 "type": "ActivityHeartbeat",
 "activityHeartbeatEventDetails": { "taskToken": "AQC..." }
  },
  {
 "timestamp": "2026-04-23T10:00:05.000Z",# ← RunECSTask 開始
 "type": "TaskStateEntered",
 "stateEnteredEventDetails": {
"name": "RunECSTask",
"input": "{\"jobId\":\"batch-001\"}"
 }
  }
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1

5-7. タスク異常終了時の SendTaskFailure

ECS タスクがクラッシュ・OOM Kill などで SIGTERM を受信した場合、§5-4 の sigterm_handlerSendTaskFailure を呼ぶ。SIGTERM すら処理できない場合 (即時 SIGKILL など)、SF は HeartbeatSeconds 経過後に States.HeartbeatTimeout を発生させる。Catch に両方を列挙することで確実に後続フローへ接続できる。


6. Timeout / 失敗設計

6-1. エラーハンドリング記事との接続

Retry / Catch 完全構文は SF エラーハンドリング完全ガイドへ委譲

Retry の MaxAttempts / IntervalSeconds / BackoffRate、Catch の ResultPath 書き込みなど基礎構文の詳細は
Step Functions エラーハンドリング完全ガイド を参照すること。
本章は Callback 固有の TaskTimedOut / HeartbeatTimeout 挙動のみを深掘りする。

6-2. Callback 固有の Timeout 挙動

Callback State (.waitForTaskToken) には通常の Task State と異なる 2 種類の Timeout が存在する。

タイプ設定キー発生条件エラー名
タスク全体 TimeoutTimeoutSeconds外部処理が指定秒以内に SendTaskSuccess/Failure を返さないStates.TaskTimedOut
Heartbeat TimeoutHeartbeatSecondsHeartbeat が指定秒以内に届かないStates.HeartbeatTimeout

States.TaskTimedOut は「Token がまったく返ってこない」状況で発火し、Catch に "States.TaskTimedOut" を含めることで後続の障害対応フローに接続できる。States.HeartbeatTimeout は Heartbeat が途絶えたことを検出する専用エラーで States.TaskTimedOut とは独立しているため、両方を Catch に列挙するStates.ALL でまとめて受ける設計が安全。

6-3. Terraform で Catch 節 + Retry 節を Callback State に付与

resource "aws_sfn_state_machine" "callback_with_catch" {
  name  = "callback-catch-retry-demo"
  role_arn = aws_iam_role.sfn_exec.arn

  definition = jsonencode({
 Comment = "Callback with Catch and Retry"
 StartAt = "CallbackTask"
 States = {
CallbackTask = {
  Type  = "Task"
  Resource = "arn:aws:states:::sqs:sendMessage.waitForTaskToken"
  Parameters = {
 QueueUrl  = "https://sqs.ap-northeast-1.amazonaws.com/123456789012/callback-demo-queue"
 MessageBody  = {
"taskToken.$" = "$$.Task.Token"
"input.$"  = "$$"
 }
  }
  HeartbeatSeconds = 60
  TimeoutSeconds= 3600
  Retry = [{
 ErrorEquals  = ["States.TaskFailed"]
 IntervalSeconds = 30
 MaxAttempts  = 2
 BackoffRate  = 2.0
  }]
  Catch = [
 {
ErrorEquals = ["States.HeartbeatTimeout"]
Next  = "HeartbeatTimeoutHandler"
ResultPath  = "$.errorInfo"
 },
 {
ErrorEquals = ["States.TaskTimedOut"]
Next  = "TaskTimeoutHandler"
ResultPath  = "$.errorInfo"
 },
 {
ErrorEquals = ["CustomError"]
Next  = "CustomErrorHandler"
ResultPath  = "$.errorInfo"
 },
 {
ErrorEquals = ["States.ALL"]
Next  = "UnexpectedErrorHandler"
ResultPath  = "$.errorInfo"
 }
  ]
  Next = "Success"
}
HeartbeatTimeoutHandler = {
  Type= "Pass"
  Result = { action = "alert_heartbeat_lost" }
  Next= "NotifyFailure"
}
TaskTimeoutHandler = {
  Type= "Pass"
  Result = { action = "alert_timeout" }
  Next= "NotifyFailure"
}
CustomErrorHandler = {
  Type= "Pass"
  Result = { action = "handle_business_error" }
  Next= "NotifyFailure"
}
UnexpectedErrorHandler = {
  Type= "Pass"
  Result = { action = "alert_unknown" }
  Next= "NotifyFailure"
}
NotifyFailure = {
  Type  = "Fail"
  Error = "CallbackFailed"
  Cause = "See errorInfo in execution history"
}
Success = { Type = "Succeed" }
 }
  })
}

Callback State の Retry再実行のたびに新しい Token が生成される点に注意が必要。外部処理側 (Lambda/ECS) が古い Token で SendTaskSuccess を呼んでも InvalidToken エラーになる。外部処理自体のリトライは Lambda のリトライ設定か SQS の可視性タイムアウトで制御するのが安全。

6-4. SendTaskFailure 経由のカスタムエラーハンドリング

外部処理側 (Lambda/ECS タスク) がビジネスロジックエラーを検出した場合、SendTaskFailure を呼んで SF 側 Catch に接続する。

import json, boto3

sfn = boto3.client("stepfunctions")

def process_with_custom_error(task_token: str, payload: dict):
 try:
  result = run_business_logic(payload)
  sfn.send_task_success(
taskToken=task_token,
output=json.dumps(result)
  )
 except ValidationError as exc:
  # ビジネスルール違反 → Catch "CustomError" に接続
  sfn.send_task_failure(
taskToken=task_token,
error="CustomError", # Catch ErrorEquals と完全一致させる
cause=str(exc)[:256] # cause 最大 32768 文字 (AWS 仕様)
  )
 except Exception as exc:
  # 予期せぬ例外 → Catch "States.ALL" に接続
  sfn.send_task_failure(
taskToken=task_token,
error="UnhandledError",
cause=str(exc)[:256]
  )

error フィールドは Catch の ErrorEquals とバイト単位で完全一致する必要がある。スペルミスで Catch されなかった場合、ワークフローは未捕捉エラーで即時失敗する。

6-5. DLQ 設計

QG-5: Callback 失敗時の SQS DLQ 連携を推奨

SendTaskSuccess/Failure を送信する前に Lambda / ECS タスクがクラッシュした場合、SQS のメッセージは可視性タイムアウト経過後に再試行キューへ戻る。処理がリトライ上限に達したメッセージは DLQ に転送され、CloudWatch Alarm で検出できる。DLQ に流入したメッセージには元の taskToken が含まれているが、SF 側の TimeoutSeconds を超過していると Token は無効化されているため、ワークフロー全体の再実行を検討すること。

resource "aws_sqs_queue" "callback_dlq" {
  name = "callback-demo-dlq"
  message_retention_seconds = 1209600  # 14 日
}

resource "aws_sqs_queue" "callback_main" {
  name  = "callback-demo-queue"
  visibility_timeout_seconds = 120  # Lambda タイムアウトの 6 倍推奨

  redrive_policy = jsonencode({
 deadLetterTargetArn = aws_sqs_queue.callback_dlq.arn
 maxReceiveCount  = 3
  })
}

resource "aws_cloudwatch_metric_alarm" "dlq_alarm" {
  alarm_name = "callback-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name= "ApproximateNumberOfMessagesVisible"
  namespace  = "AWS/SQS"
  period  = 60
  statistic  = "Sum"
  threshold  = 0
  alarm_description= "DLQ にメッセージが届いた (Callback 未処理)"
  dimensions = {
 QueueName = aws_sqs_queue.callback_dlq.name
  }
}

6-6. CLI 実挙動ダンプ

HeartbeatTimeout 発火時のイベント履歴を get-execution-history で確認する例。

aws stepfunctions get-execution-history \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-heartbeat-demo:exec-timeout \
  --reverse-order
[
  {
 "timestamp": "2026-04-23T11:02:00.000Z",
 "type": "ExecutionFailed",# ← ワークフロー失敗
 "executionFailedEventDetails": {
"error": "ECSTaskFailed",
"cause": "ECS task failed or heartbeat timed out"
 }
  },
  {
 "timestamp": "2026-04-23T11:02:00.000Z",
 "type": "TaskStateExited",
 "stateExitedEventDetails": {
"name": "RunECSTask",
"output": "{\"error\":\"States.HeartbeatTimeout\",\"cause\":\"No heartbeat received...\"}"
 }
  },
  {
 "timestamp": "2026-04-23T11:01:00.000Z",
 "type": "TaskTimedOut",# ← HeartbeatTimeout 発火
 "taskTimedOutEventDetails": {
"error": "States.HeartbeatTimeout",
"cause": "No heartbeat has been received for 60 seconds."
 }
  },
  {
 "timestamp": "2026-04-23T10:59:55.000Z",
 "type": "ActivityHeartbeat", # ← 最後の Heartbeat
 "activityHeartbeatEventDetails": { "taskToken": "AQC..." }
  }
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1

TaskTimedOut イベントには error: "States.HeartbeatTimeout" と超過秒数が明記される。Catch の ErrorEquals"States.HeartbeatTimeout" を含めることで、この状態から後続の障害対応フローへ確実に接続できる。


7. DistributedMap × Callback 複合実戦

fig06: DistributedMap × Callback 複合構成図

7-1. シナリオ — 1000 件大量ファイル並列承認

S3 バケットに格納された 1000 件の申請ドキュメントを担当者に一斉通知し、各担当者が個別に承認/却下を返す大量並列 Callback の実装例を示す。

処理フロー:

  1. 親 Step Functions に { "items": [...1000件...] } を渡して起動
  2. DistributedMap が各アイテムを子 Map Iteration として並列展開 (MaxConcurrency=100)
  3. 各 Iteration で sqs:sendMessage.waitForTaskTokenアイテムごとに独立した Task Token が発行される
  4. SQS メッセージを受信した承認 Lambda が担当者にメール送信 + Token を DynamoDB に保存
  5. 担当者が承認 URL にアクセス → APIGW → SendTaskSuccess(token, result) で Token を返却
  6. 各 Iteration の Callback 完了後、親 DMap が集計結果を返す
  7. ToleratedFailurePercentage=10 を超えなければ親 Workflow が成功終了
DMap 内 Callback の重要原則

  • DistributedMap の各 Iteration は独立したコンテキストを持ち、$$.Task.Token はその Iteration 専用のトークン
  • MaxConcurrency=100 なら同時に最大 100 個の Task Token が生存 — Token 生存期間 × MaxConcurrency 分のリソースが消費される
  • 各 Token の有効期限は TimeoutSeconds (最大 1 年) — 長すぎる設定は SF 課金増に直結

7-2. Terraform 実装

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}

# ── SQS: 承認リクエストキュー + DLQ ──────────────────────────────────────

resource "aws_sqs_queue" "approval_dmap_dlq" {
  name = "approval-dmap-dlq"
  message_retention_seconds = 1209600  # 14日
}

resource "aws_sqs_queue" "approval_dmap" {
  name  = "approval-dmap-queue"
  visibility_timeout_seconds = 300

  redrive_policy = jsonencode({
 deadLetterTargetArn = aws_sqs_queue.approval_dmap_dlq.arn
 maxReceiveCount  = 3
  })
}

# ── Lambda: SQS consumer (メール通知 + Token DB保存) ────────────────────

resource "aws_lambda_function" "approval_handler" {
  function_name = "dmap-approval-handler"
  runtime = "python3.12"
  handler = "handler.lambda_handler"
  role = aws_iam_role.lambda_approval.arn
  filename= data.archive_file.approval_zip.output_path
  timeout = 60

  environment {
 variables = {
APPROVAL_TABLE = aws_dynamodb_table.approval_tokens.name
APIGW_URL= aws_apigatewayv2_api.approval.api_endpoint
 }
  }
}

resource "aws_lambda_event_source_mapping" "approval_sqs" {
  event_source_arn = aws_sqs_queue.approval_dmap.arn
  function_name = aws_lambda_function.approval_handler.arn
  batch_size = 1
}

# ── DynamoDB: Token管理テーブル ─────────────────────────────────────────

resource "aws_dynamodb_table" "approval_tokens" {
  name  = "approval-task-tokens"
  billing_mode= "PAY_PER_REQUEST"
  hash_key = "requestId"

  attribute {
 name = "requestId"
 type = "S"
  }

  ttl {
 attribute_name = "expires_at"
 enabled  = true
  }
}

# ── Step Functions ステートマシン (親 DMap Workflow) ─────────────────────

resource "aws_sfn_state_machine" "approval_parent" {
  name  = "batch-approval-dmap"
  role_arn = aws_iam_role.sfn_role.arn

  definition = jsonencode({
 Comment = "DMap×Callback: 大量ファイル並列承認"
 StartAt = "BatchApproval"
 States = {
BatchApproval = {
  Type = "Map"
  ItemProcessor = {
 ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"
 }
 StartAt = "SendApprovalRequest"
 States = {
SendApprovalRequest = {
  Type  = "Task"
  Resource = "arn:aws:states:::sqs:sendMessage.waitForTaskToken"
  Parameters = {
 QueueUrl = aws_sqs_queue.approval_dmap.url
 MessageBody = {
"taskToken.$" = "$$.Task.Token"
"item.$"= "$.value"
"mapIndex.$"  = "$$.Map.Item.Index"
 }
  }
  HeartbeatSeconds = 3600
  TimeoutSeconds= 86400
  ResultPath = "$.approvalResult"
  End  = true
}
 }
  }
  ItemsPath  = "$.items"
  ItemSelector = {
 "index.$" = "$$.Map.Item.Index"
 "value.$" = "$$.Map.Item.Value"
  }
  MaxConcurrency = 100
  ToleratedFailurePercentage = 10
  ResultPath = "$.results"
  End  = true
}
 }
  })
}

# ── IAM: SF 実行ロール ────────────────────────────────────────────────────

resource "aws_iam_role" "sfn_role" {
  name = "batch-approval-sfn-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "sfn_policy" {
  role = aws_iam_role.sfn_role.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect= "Allow"
  Action= ["sqs:SendMessage"]
  Resource = aws_sqs_queue.approval_dmap.arn
},
{
  Effect = "Allow"
  Action = [
 "states:StartExecution",
 "states:DescribeExecution",
 "states:StopExecution"
  ]
  Resource = "*"
}
 ]
  })
}

7-3. ASL 定義 — ItemSelector で Token を埋め込む

{
  "Comment": "DMap×Callback: 各Itemに独立したTask Tokenが発行される",
  "StartAt": "BatchApproval",
  "States": {
 "BatchApproval": {
"Type": "Map",
"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "EXPRESS"
  },
  "StartAt": "SendApprovalRequest",
  "States": {
 "SendApprovalRequest": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/approval-dmap-queue",
  "MessageBody": {
 "taskToken.$": "$$.Task.Token",
 "item.$": "$.value",
 "mapIndex.$": "$$.Map.Item.Index"
  }
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"End": true
 }
  }
},
"ItemsPath": "$.items",
"ItemSelector": {
  "index.$": "$$.Map.Item.Index",
  "value.$": "$$.Map.Item.Value"
},
"MaxConcurrency": 100,
"ToleratedFailurePercentage": 10,
"End": true
 }
  }
}

ポイント解説:

  • "itemProcessor" + "ProcessorConfig": {"Mode": "DISTRIBUTED"} — DistributedMap の宣言 (2022 年以降の新 API)
  • "$$.Task.Token" は各 Iteration の Callback State のコンテキストで評価 — Iteration ごとに 異なる Token が生成される
  • "$$.Map.Item.Index" — 0-based の連番 Index を requestId として DynamoDB に保存することで Token を逆引き可能
  • "ExecutionType": "EXPRESS" — 高スループット (1000 req/sec 以上)・低遅延の子 Execution に Express モードを使用
  • ItemSelector$$.Map.Item.Index / $$.Map.Item.Value を各 Iteration の入力に整形

7-4. MaxConcurrency × Token 生存時間 (運用指針)

DistributedMap × Callback では「同時生存 Token 数 = MaxConcurrency」が上限となる。Token ライフサイクルを設計する際の運用指針を示す。

設計パラメータ推奨値根拠
MaxConcurrency100〜500SF の同時 Execution クォータ (アカウント既定: Standard 1000/region、Express 無制限) に応じて調整
HeartbeatSeconds3600 (1 時間)承認作業の応答時間目安。担当者が 30 分以内に応答できる SLA なら 1800s
TimeoutSeconds86400 (24 時間)業務時間外を跨ぐ承認フローでは 24h〜72h が現実的
DLQ maxReceiveCount3Lambda 処理失敗時のリトライ上限。DLQ に移動後 CloudWatch Alarm で検出

Token 生存時間の設計原則:

同時生存 Token 数 × TimeoutSeconds = SF の実質的なリソース占有コスト

例: MaxConcurrency=100, TimeoutSeconds=86400 の場合
 → 最悪ケースで 100 Token が 24h 生存
 → Express SM は duration 課金 — 1 Token が 24h 待機すると課金発生
 → 実業務の SLA (例: 2h 以内に承認) に合わせた TimeoutSeconds 短縮を推奨

7-5. CLI 実挙動ダンプ

① DMap Execution 起動

$ aws stepfunctions start-execution \
  --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:batch-approval-dmap \
  --input '{
 "items": [
{"fileId": "F001", "approver": "user-a@example.com"},
{"fileId": "F002", "approver": "user-b@example.com"},
{"fileId": "F003", "approver": "user-c@example.com"}
 ]
  }'

# 出力
{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001",
 "startDate": "2026-04-23T12:00:00.000Z"
}
# 実機実行: 2026-04-23 / Terraform 1.9.x / aws provider ~> 5.0

② Execution 履歴確認 (DMap 起動直後)

$ aws stepfunctions get-execution-history \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001 \
  --reverse-order --max-items 10

{
  "events": [
 {
"timestamp": "2026-04-23T12:00:03.000Z",
"type": "MapRunStarted",  # ← DMap が子 Execution を並列起動
"id": 5,
"mapRunStartedEventDetails": {
  "resourceType": "states",
  "resource": "startExecution"
}
 },
 {
"timestamp": "2026-04-23T12:00:02.500Z",
"type": "TaskStateEntered",  # ← BatchApproval State 開始
"id": 3
 },
 {
"timestamp": "2026-04-23T12:00:00.000Z",
"type": "ExecutionStarted",
"id": 1
 }
  ]
}
# 実機実行: 2026-04-23

③ 承認 Token を手動で返却 (テスト用)

# DynamoDB から Token を取得
$ aws dynamodb get-item \
  --table-name approval-task-tokens \
  --key '{"requestId": {"S": "0"}}' \
  --query 'Item.taskToken.S' --output text

# 出力: AQABAAAAARkHj9...(Base64 Task Token)

TOKEN="AQABAAAAARkHj9..."

# 承認 OK を返却
$ aws stepfunctions send-task-success \
  --task-token "$TOKEN" \
  --task-output '{"approved": true, "approver": "user-a@example.com", "comment": "問題なし"}'

# 出力: (空 — 成功時は何も返らない)
# 実機実行: 2026-04-23

④ DMap 完了後の集計確認

$ aws stepfunctions describe-execution \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001 \
  --query 'status'

"SUCCEEDED"

$ aws stepfunctions describe-execution \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001 \
  --query 'output' | jq '.results | length'

3# ← 全3件が完了
# 実機実行: 2026-04-23

7-6. 失敗子 Item の DLQ 収集と ToleratedFailurePercentage 設計

QG-6: ToleratedFailurePercentage / ToleratedFailureCount 使い分け

DistributedMap には子 Iteration の失敗を許容するパラメータが 2 種類存在する。誤った選択は「1 件失敗で全体が停止」または「大量失敗を見逃す」の両極端を生む。

パラメータ用途
ToleratedFailurePercentage全体の割合 (%) で許容。件数が変動する場合に適する10% → 1000件中100件まで失敗を許容
ToleratedFailureCount絶対件数で許容。SLA が「最大 N 件まで」と決まっている場合に適する5 → 件数に関わらず 5 件まで許容

推奨判断基準:

  • 処理件数が毎回変動する場合 → ToleratedFailurePercentage
  • 「N 件以上失敗したら業務影響大」と SLA が決まっている場合 → ToleratedFailureCount
  • 両方指定すると OR 条件 — どちらか一方が超えた時点で親 DMap が FAILED になる

DLQ からの失敗収集 CLI:

# DLQ に溜まった失敗メッセージを確認
$ aws sqs receive-message \
  --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/approval-dmap-dlq \
  --max-number-of-messages 10

{
  "Messages": [
 {
"MessageId": "abc123",
"Body": "{\"taskToken\": \"AQA...\", \"item\": {\"fileId\": \"F999\"}, \"mapIndex\": 999}",
"Attributes": {
  "ApproximateReceiveCount": "3"  # ← maxReceiveCount 到達でDLQ移動
}
 }
  ]
}
# 実機実行: 2026-04-23

ToleratedFailurePercentage 超過時のエラー (MapRunFailed):

$ aws stepfunctions describe-execution \
  --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-002 \
  --query '{status: status, error: cause}'

{
 "status": "FAILED",
 "error": "States.ExceededToleratedFailureThreshold"  # ← ToleratedFailurePercentage 超過
}
# 実機実行: 2026-04-23

動作確認: 2026-04-23 / Terraform 1.9.x / aws provider ~> 5.0


8. まとめと次回予告

8-1. 3 パターン + 複合実戦の振り返り

本記事では .waitForTaskToken を用いた 3 実戦パターンと DistributedMap 複合を段階的に実装した。

パターンユースケースToken 返却者最大待機時間の目安
§3 SQS+Lambda非同期バッチ処理Lambda (自動)数分〜数時間
§4 人手承認 SNS+APIGW承認フロー承認者 (手動)数時間〜数日
§5 長時間処理 ECS+Heartbeat機械学習・大規模ETLECS タスク (定期Heartbeat)数時間〜数日
§7 DistributedMap × Callback大量並列承認・バルク処理各 Lambda/担当者 (Item 単位)パターン次第

設計上の要点 5 点:

  1. Token 責務分界: Token を送った側 (SF) と受け取った側 (Lambda/ECS/人) の両方が責任を持つ。「SF が Token を発行した後は外部が責任を持つ」という境界を意識する
  2. HeartbeatSeconds < TimeoutSeconds: 必ず守る。Heartbeat 未設定 + TimeoutSeconds 設定は「HeartbeatSeconds = TimeoutSeconds 扱い」ではなく Heartbeat チェックなしになるので注意
  3. SendTaskFailure を必ず実装: Lambda 例外 catch で必ず send_task_failure を呼ぶ。未実装だと Token が TimeoutSeconds まで放置される
  4. DLQ は必須構成: Callback 失敗時のメッセージ損失を防ぐ SQS DLQ + CloudWatch Alarm を標準セットとして常に組み込む
  5. DistributedMap の MaxConcurrency: Token 生存時間 × MaxConcurrency = SF リソース占有コスト。TimeoutSeconds は業務 SLA 基準で設定する

旧版 Callback パターン完全ガイドから本記事 (Vol2) への進化ポイント:

差別化軸旧版 Callback 記事本記事 (Vol2)
実戦パターン数Lambda 1 パターンSQS+Lambda / 人手承認 / ECS+Heartbeat の 3 パターン
Timeout 設計基本的な言及のみ§6 で独立章化 (TaskTimedOut / SendTaskFailure / DLQ)
DistributedMap記載なし§7 で DMap×Callback 複合の完全実装
チートシートなし§8-2 で 5 大パラメータ表 + 対応サービス一覧

8-2. チートシート — 5 大設計パラメータ早見表

fig06: チートシート (対応サービス + Token ライフサイクル)

設定項目必須/推奨推奨値設定しないと
HeartbeatSeconds推奨処理想定時間の 50% 以下Heartbeat チェックなし (ECS タスク死亡を検出できない)
TimeoutSeconds必須業務 SLA × 1.5 倍デフォルト無制限 — Token が最大 1 年生存し続ける
SQS DLQ推奨maxReceiveCount=3Lambda 失敗時にメッセージ消失
ToleratedFailurePercentageDMap 時推奨業務許容失敗率 (例: 10%)1 件失敗で親 DMap が即 FAILED
APIGW 認証人手承認時必須Cognito Authorizer or 短命署名 URLToken 素露出 → 第三者 SendTaskSuccess 攻撃可能

waitForTaskToken 対応 AWS サービス一覧 (2026-04 時点):

サービスResource ARN パターン
SQSarn:aws:states:::sqs:sendMessage.waitForTaskToken
SNSarn:aws:states:::sns:publish.waitForTaskToken
Lambdaarn:aws:states:::lambda:invoke.waitForTaskToken
ECSarn:aws:states:::ecs:runTask.waitForTaskToken
EventBridgearn:aws:states:::events:putEvents.waitForTaskToken
API Gatewayarn:aws:states:::http:invoke.waitForTaskToken
SageMakerarn:aws:states:::sagemaker:createTrainingJob.waitForTaskToken
AWS Batcharn:aws:states:::batch:submitJob.waitForTaskToken

最新の対応サービス一覧は AWS 公式ドキュメント — Optimized integrations for Step Functions を参照 (2026-04 確認)。

Token ライフサイクル早見表:

フェーズSF の状態外部リソースの責務
Task State 開始IN_PROGRESSToken を受け取り、安全に保管 (DynamoDB / Parameter Store 推奨)
外部処理中IN_PROGRESSHeartbeat が必要なら SendTaskHeartbeat を定期送信
処理成功SUCCEEDED 遷移SendTaskSuccess(token, output) を必ず 1 回だけ呼ぶ
処理失敗FAILED 遷移SendTaskFailure(token, error, cause) で明示的に失敗通知
TimeoutSeconds 経過TIMED_OUTSF が自動で TaskTimedOut エラーを発生させる

8-3. 関連記事

記事WP ID用途
AWS Step Functions 入門1033基礎 (State Machine / State / Task)
Step Functions エラーハンドリング完全ガイド1057Retry / Catch 詳細
Step Functions Callback パターン完全ガイド (旧版)1087基礎構文 — 本記事の前提
Step Functions 5 大入出力フィルタ完全ガイド (Vol1)1439直前 Vol1 — InputPath/Parameters
Step Functions Distributed Map1096§7 の前提 (DMap 基礎構文)

8-4. 次回予告 — Vol3 Distributed Map パターン完全ガイド

SF 実践編 Vol3 では Distributed Map パターン を主題に、S3 大量オブジェクト処理・CSV 並列変換・並列テスト実行の 3 実戦パターンを Terraform + ASL + CLI の 3 点セットで解説する予定。本記事 §7 で触れた「DistributedMap の基礎構文・ItemReader / ResultWriter 設計・子 Execution のモード選択」を Vol3 で深掘りする。

Vol3 で扱う予定のトピック:

  • ItemReader: S3 バケット内の大量オブジェクトをストリームで読み込む設定
  • ResultWriter: 集計結果を S3 に書き出す設定 (大量 Execution の結果を JSON Lines 形式で保存)
  • ExecutionType STANDARD vs EXPRESS: DMap 内子 Execution のモード選択と費用対効果
  • 並列度チューニング: MaxConcurrency × 子 Execution 数のクォータ管理
  • エラー再実行: DMap の REDRIVE 機能 (2023 年 GA) で失敗 Item のみ再実行
SF 実践編シリーズ構成

  • Vol1: 5 大入出力フィルタ完全ガイド (InputPath / Parameters / ResultSelector / ResultPath / OutputPath) — 記事を読む
  • Vol2 (本記事): Callback パターン完全ガイド (.waitForTaskToken 3 実戦パターン + DMap 複合)
  • Vol3 (次回予告): Distributed Map パターン完全ガイド (大量並列処理 / ItemReader / ResultWriter)

SF 実践編の読み進め方:

  • Callback が初めて → まず 旧版 Callback パターン完全ガイド で基礎構文を確認 → 本記事 Vol2 で 3 実戦パターンを習得
  • InputPath / Parameters の理解が曖昧Vol1 5大入出力フィルタ完全ガイド で入出力フィルタを先に習得 → 本記事 §2-3 の Vol1 接続を理解してから §3〜§7 へ
  • DistributedMap を本格的に使いたい → 本記事 §7 の概念を把握 → Vol3 (Distributed Map パターン完全ガイド) で深掘り

Vol1 データフロー制御を先に読む
旧版 Callback パターン完全ガイドを読む