1. この記事について

- ASL の
Resource: "arn:aws:states:::sqs:sendMessage.waitForTaskToken"構文を書き、$$.Task.Tokenを Context Object から取り出せる - SQS+Lambda / 人手承認 SNS+APIGW / 長時間処理 ECS+Heartbeat の 3 実戦パターンを Terraform+ASL+CLI の 3 点セットで自力構築できる
- HeartbeatSeconds < TimeoutSeconds 制約を満たす設計ができ、長時間処理のリトライ設計を根拠を持って選択できる
- DistributedMap × Callback 複合パターンの MaxConcurrency × Token 生存時間を設計できる
- チートシート 1 枚を手元に置き、新規 Callback State 設計を 10 分以内で完了できる
- Step Functions 基本概念 (State Machine/State/Task/Execution) → SF 入門記事へ委譲
- InputPath / Parameters / ResultPath / OutputPath の詳細 → SF 実践編 Vol1へ委譲
- Retry / Catch の基礎構文 → エラーハンドリング完全ガイドへ委譲
- Callback 基礎構文のみを学びたい読者 → 旧版 Callback 記事へ委譲
1-1. 本記事のゴール
本記事は Step Functions 実践編 Vol2 として、.waitForTaskToken を使った Callback パターンの 3 実戦パターンを Terraform + ASL + CLI 実挙動ダンプの 3 点セットで再現可能に解説する。
読了後、読者は以下を自力で行える。
- ASL の
Resource: "arn:aws:states:::sqs:sendMessage.waitForTaskToken"構文を書き、$$.Task.Tokenを Context Object から取り出せる - Lambda / ECS / API Gateway 側から
SendTaskSuccess/SendTaskFailure/SendTaskHeartbeatを正しく呼び出せる - HeartbeatSeconds < TimeoutSeconds 制約を満たす設計ができ、長時間処理のリトライ設計を根拠を持って選択できる
- DistributedMap 内で Callback を使う際の制約 (MaxConcurrency × Token 数) を即答できる
- チートシート 1 枚を手元に置き、新規 Callback State 設計を 10 分以内で完了できる
1-2. 読者像
本記事は以下の読者を対象とする。
- SF 入門記事を読了済で、
States.Languageの基本構文を書ける実践者 - Callback 基礎 (旧版または独学) を習得済で、実戦パターンに進みたい設計者
- 現場で「人手承認フロー」「長時間 ECS バッチ」「複合並列処理」の設計に直面している AWS 担当者
- Terraform で SF + Lambda + Queue 系リソースを 3 点セットで自力構築できるエンジニア
前提知識チェックリスト
本記事に進む前に以下を確認すること。チェックが入らない項目がある場合、先に対応する記事を参照することを推奨する。本記事の §3 以降は前提知識を補足説明なく活用するため、事前確認が完走効率を大きく左右する。
| チェック | 前提知識 | 確認先 |
|---|---|---|
| ☑ | State Machine / State / Task / Execution の概念を説明できる | SF 入門 |
| ☑ | Type: Task の基本 ASL 構文を書ける | SF 入門 |
| ☑ | Retry / Catch の基礎構文を理解している | エラーハンドリング完全ガイド |
| ☑ | Parameters / InputPath / OutputPath の動作を理解している | SF 実践編 Vol1 |
| ☑ | Terraform で aws_sfn_state_machine リソースを作成できる | 本記事の §3 で詳解 |
1-3. なぜ今 Callback 実践編か
SF 実践編シリーズは Vol1 (5 大入出力フィルタ) で「データが State 間をどう流れるか」を体得した。
Vol2 (本記事) は「外部リソースに制御を渡し、完了通知を受け取るまで Step Functions を待機させる」Callback パターンを深掘りする。
ポーリング方式 (定期的に外部 API を叩いて完了を確認) に比べ、Callback パターンには以下の優位性がある。
| 比較軸 | ポーリング | Callback (waitForTaskToken) |
|---|---|---|
| SF 実行コスト | 状態遷移ごとに課金 | 待機中は課金されない |
| 応答遅延 | ポーリング間隔分の遅延 | 完了と同時に次 State へ |
| 外部処理の自由度 | SF が能動的に確認しに行く | 外部リソースが任意のタイミングで通知 |
| 長時間処理の耐性 | 実行履歴が大きくなりやすい | Heartbeat で生存確認しつつ長時間待機可 |
このため ECS/Batch の長時間処理・人手による承認フロー・外部 API 完了待ち など、処理完了タイミングが不確定なユースケースに Callback パターンが不可欠となる。
なお、Callback パターン (.waitForTaskToken) と同期統合 (.sync:2) / SDK 統合は別物であることに注意する。同期統合は SF が定期的にポーリングして完了を確認するのに対し、Callback は外部側からの通知を待つ。非同期処理の性質に応じて使い分けることが設計品質の鍵となる (詳細は SF 第8弾 SDK 統合記事へ委譲)。
本記事で解決する現場課題
| 現場の局面 | 本記事読了前 | 本記事読了後 |
|---|---|---|
| Callback 初実装で Token が戻らず停止 | SendTaskSuccess 呼び漏れ・Token 渡し忘れに気付かず Timeout まで放置 | §3-6 の落とし穴 3 件で「Token を受け取った側が必ず何かを返す」責務分界が明示化 |
| 人手承認フロー設計で「承認画面どう作る?」で詰まる | APIGW + Cognito + Token 管理を自作して破綻 | §4 完全実装 (SNS→メール→APIGW→SendTaskSuccess) を Terraform でコピペ可能 |
| 長時間処理 (ECS 数時間) の Timeout 設計で迷走 | HeartbeatSeconds を設定したつもりが効かない / タスク停止しても SF が残る | §5-5 の Heartbeat 実装 + §6 Timeout 設計で「SF 側と実行側の両方から責任分界」明記 |
| DistributedMap で 100 並列 Callback したい | Token 管理 + MaxConcurrency 設計が不明で本番化できず | §7 で「並列数 × Token 生存時間」の設計指針を提示 |
1-4. 関連記事
本記事は以下の既存記事と連携している。§1-5 で旧版との差別化を確認した上で読み進めることを推奨する。
| 記事 | 位置付け | リンク |
|---|---|---|
| AWS Step Functions 入門 | 基礎概念の前提確認 | → 読む |
| Step Functions エラーハンドリング完全ガイド | Retry/Catch 詳細 (§6 委譲先) | → 読む |
| Step Functions Callback パターン完全ガイド (旧版) | 基礎構文の横並び参照 | → 読む |
| Step Functions データフロー制御 (旧版) | Vol1 と対になる旧版 | → 読む |
| Step Functions 5 大入出力フィルタ (Vol1) | 直前の実践編 | → 読む |
| Step Functions Distributed Map | §7 DMap×Callback で参照 | → 読む |
| Step Functions Express vs Standard | 非同期設計の選択肢 | → 読む |
1-5. 旧版 (Callback パターン完全ガイド) との差別化
旧版は waitForTaskToken の基礎構文と SNS+Lambda 1 パターンの実装解説が中心であった。
本記事は以下 4 点で旧版を超える。
- 3 実戦パターンを独立章で提示 — SQS+Lambda (§3) / 人手承認 SNS+APIGW (§4) / 長時間処理 ECS+Heartbeat (§5)
- Timeout / 失敗設計を独立章化 (§6) — TaskTimedOut / HeartbeatSeconds 制約 / DLQ 設計を Callback 固有視点で深掘り
- DistributedMap × Callback 複合を実戦パターンとして新設 (§7) — 旧版には存在しない応用章
- 対応サービス一覧 + チートシート (§2-4 / §8-2) で暗記型リファレンス化
旧版 vs 本記事 詳細比較
| 観点 | 旧版 (Callback パターン完全ガイド) | 本記事 (実践編 Vol2) |
|---|---|---|
| 実装パターン数 | 1 (SNS+Lambda) | 3 独立章 + 複合章 (§3/§4/§5/§7) |
| Terraform カバレッジ | 部分的 | 全パターン完全コード |
| CLI 実挙動ダンプ | なし / 限定的 | 全章で start-execution / get-execution-history ダンプを明記 |
| Timeout/失敗設計 | 言及程度 | §6 独立章 (TaskTimedOut / HeartbeatTimeout / DLQ) |
| DistributedMap 連携 | 非対象 | §7 で MaxConcurrency × Token 設計を実戦解説 |
| チートシート | なし | §8-2 で 1 枚凝縮 |
| 差別化 Vol1 との接続 | なし | §2-3 / §2-6 で明示的に接続 |
旧版は「基礎を速習したい読者」向け、本記事は「現場で設計・実装する実践者」向けとして並立する。旧版を先に読み、本記事でパターンを実装する順序を推奨する。
それでは §2 から実装準備を整え、§3 以降で 3 実戦パターンを順に構築していく。
次回 Vol3 (Distributed Map) 予告を読む
2. 前提・環境・準備

2-1. 前提環境
本記事の手順を実行するには以下の環境・権限が必要となる。
必要な AWS IAM 権限 (SF 実行ロール)
| Action | 用途 |
|---|---|
states:StartExecution | State Machine 実行開始 |
states:SendTaskSuccess | Token 成功通知 |
states:SendTaskFailure | Token 失敗通知 |
states:SendTaskHeartbeat | Heartbeat 送信 |
sqs:SendMessage | SQS へのメッセージ送信 |
sns:Publish | SNS トピックへの発行 |
ecs:RunTask | ECS タスク起動 |
iam:PassRole | タスク実行ロールの委譲 |
必要なローカル環境
- AWS CLI v2 最新 (
aws --versionで確認) - Terraform 1.9.x 以上 (
terraform versionで確認) - Python 3.12 + boto3 ≥ 1.34 (Lambda ハンドラのローカルテスト用)
- 適切な AWS 認証情報 (
aws configureまたは環境変数)
2-2. 使用技術スタック
本記事全体で使用する技術スタックを以下に示す。
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
provider "aws" {
region = "ap-northeast-1"
}
| 技術 | バージョン | 用途 |
|---|---|---|
| Terraform | ≥ 1.9.0 | インフラ定義 |
| hashicorp/aws | ~> 5.0 | AWS リソース管理 |
| AWS CLI | v2 最新 | 実挙動確認・CLI ダンプ |
| Python | 3.12 | Lambda / ECS ハンドラ実装 |
| boto3 | ≥ 1.34 | AWS SDK for Python |
2-3. ゴール状態の定義
本記事終了時点で以下 3 パターンが AWS 上で稼働している状態を目指す。
Vol1 で学んだ InputPath / Parameters / ResultPath / OutputPath は Callback State にもそのまま適用される。特に Parameters で $$.Task.Token を SQS メッセージや ECS コンテナ環境変数に埋め込む操作が本記事の核心となる。Vol1 未読の場合は先に SF 実践編 Vol1 を参照すること。
| パターン | ゴール状態 |
|---|---|
| §3 SQS+Lambda | SF が SQS へメッセージ送信 → Lambda が処理 → SendTaskSuccess で SF が次 State へ進む |
| §4 人手承認 SNS+APIGW | SF が SNS メール送信 → 承認者が APIGW エンドポイントをクリック → SF が承認結果を受け取る |
| §5 ECS+Heartbeat | SF が ECS Fargate タスクを起動 → タスクが定期 Heartbeat を送信しながら処理 → 完了で SF が次 State へ |
2-4. 対応リソース / サービス一覧
以下の表は 2026-04 時点の AWS 公式ドキュメント「Integration Patterns」を基に作成。最新情報は AWS 公式 docs を参照のこと。
| サービス | Resource ARN サフィックス例 | 主なユースケース |
|---|---|---|
| SQS | :::sqs:sendMessage.waitForTaskToken | Lambda consumer 経由の非同期処理 |
| SNS | :::sns:publish.waitForTaskToken | メール通知 → 人手承認フロー |
| Lambda | :::lambda:invoke.waitForTaskToken | Lambda が長時間処理後に自ら Token 返却 |
| ECS | :::ecs:runTask.waitForTaskToken | Fargate/EC2 長時間バッチ + Heartbeat |
| AWS Batch | :::batch:submitJob.waitForTaskToken | Batch ジョブ完了待ち |
| API Gateway | :::apigateway:invoke.waitForTaskToken | 外部 HTTP エンドポイント完了待ち |
| EventBridge | :::events:putEvents.waitForTaskToken | イベント駆動型非同期フロー |
| DynamoDB | :::dynamodb:putItem.waitForTaskToken | DynamoDB Streams 経由の後続処理 |
2-5. Token ライフサイクル
Callback パターンの核心は Task Token の発行・伝達・返却のライフサイクルにある。
Task Token の生存期間は最大 1 年 (365 日)。ただし ASL の TimeoutSeconds を設定した場合、その秒数が経過すると Token は無効化され、SF は TaskTimedOut エラーを発生させる。Token 保持期間が長くなる設計 (人手承認で長期放置など) では TimeoutSeconds を十分大きく設定すること。
ライフサイクル全体図 (fig02 参照)
- Token 発行: SF が Callback State に入ると、一意の Task Token を自動生成する
- Token 伝達:
Parameters内で"TaskToken.$": "$$.Task.Token"を指定し、外部リソース (SQS/SNS/ECS など) へ Token を渡す - SF 待機: SF は SendTask 系 API が呼ばれるまで同 State で待機する (ポーリングなし)
- 外部処理: 外部リソース (Lambda / ECS / 承認者など) が処理を実施する
- Token 返却: 処理完了後、外部側が以下のいずれかを呼び出す:
SendTaskSuccess— 成功 (SF は次 State へ進む)SendTaskFailure— 失敗 (SF は Catch/エラー処理へ)SendTaskHeartbeat— 生存確認 (長時間処理時に HeartbeatTimeout をリセット)
SendTask 系 API の引数と用途
| API | 必須引数 | 任意引数 | 主な用途 |
|---|---|---|---|
SendTaskSuccess | taskToken, output | — | 処理完了 → SF 次 State へ |
SendTaskFailure | taskToken | error (ErrorEquals 照合用), cause (原因文字列 ≤32768 byte) | エラー通知 → Catch へ |
SendTaskHeartbeat | taskToken | — | 生存確認 (HeartbeatTimeout リセット) |
Token を保持する側の責務は「必ず上記 3 つのうちいずれかを呼び出すこと」に尽きる。呼び忘れると SF は TimeoutSeconds まで待機し続け、コストと時間を消費する。
Token の識別子形式
Task Token は以下のような長い不透明文字列 (opaque token) として発行される。
AQCgAAAAKgAAAAMAAAAAAAAAAQAAAAAAAAA...(約 1000 文字の Base64 エンコード文字列)
- SF が内部的に生成する一意の識別子で、外部から推測や生成はできない
- 同一 Execution でも State に入るたびに異なる Token が発行される (Retry 時も新 Token)
- 外部リソース (SQS/SNS/ECS) のログや DDB に永続化して後続処理に引き渡すことが多い
2-6. $$.Task.Token 取得方法
Context Object から Task Token を取得するには、ASL の Parameters フィールドで .$ サフィックスを使用する。
{
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue",
"MessageBody": {
"Input.$": "$",
"TaskToken.$": "$$.Task.Token"
}
},
"HeartbeatSeconds": 60,
"TimeoutSeconds": 3600,
"Next": "ProcessResult"
}
重要ポイント
| 記法 | 意味 |
|---|---|
"TaskToken.$": "$$.Task.Token" | Context Object から Token を取得して Parameters に埋め込む |
$$ | Context Object のルートを指す (実行メタデータにアクセス) |
$$.Task.Token | 現在の Task State に割り当てられた一意 Token |
"Input.$": "$" | 現在の State Input をそのまま転送 (Vol1 で学んだ $ パス) |
Context Object の主要フィールド一覧
$$ でアクセスできる Context Object には以下のフィールドが存在する。Callback パターンでは $$.Task.Token が最重要だが、他のフィールドも活用できる。
| パス | 型 | 内容 |
|---|---|---|
$$.Execution.Id | string | 実行 ARN |
$$.Execution.Name | string | 実行名 (StartExecution 時に指定) |
$$.Execution.StartTime | string | 実行開始時刻 (ISO 8601) |
$$.State.Name | string | 現在の State 名 |
$$.State.EnteredTime | string | State 入力時刻 |
$$.Task.Token | string | Callback Token (本記事の核心) |
$$.StateMachine.Id | string | State Machine ARN |
$$.Map.Item.Index | number | DistributedMap 内の Item インデックス |
$$.Map.Item.Value | any | DistributedMap 内の Item 値 |
$$.Map.Item.Index / $$.Map.Item.Value は §7 (DistributedMap × Callback 複合) で活用する。
旧版は上記の基礎構文と SNS+Lambda 1 パターンが中心。本記事は §3-§7 で 3 実戦パターン + DMap 複合を Terraform+ASL+CLI 3 点セット完全実装で提示する。旧版で基礎を確認した上で本記事の実戦章に進むことを推奨する。
2-7. 3 パターン使い分け早見表
本記事で扱う 3 パターンをどの局面で選択するかを以下に示す。
| ユースケース | 推奨パターン | 章 | 理由 |
|---|---|---|---|
| 短時間の非同期処理 (秒〜分) | SQS+Lambda | §3 | 最小構成・コスト最小 |
| 人手が介在する承認フロー | SNS+APIGW | §4 | メール通知 + URL クリック承認 |
| 数時間〜数日の長時間処理 | ECS/Batch+Heartbeat | §5 | Heartbeat で生存確認しながら長時間待機 |
| 大量 Item の並列非同期処理 | DistributedMap×Callback | §7 | 子 Map ごとに独立 Token 管理 |
3. SQS+Lambda Callback パターン

3-1. シナリオ
Step Functions が SQS キューへメッセージを送信し、Lambda がそのメッセージを受信して処理後に SendTaskSuccess / SendTaskFailure でトークンを返却するパターン。3 実戦パターンの中で最もシンプルな構成であり、Callback パターン実装の出発点として最適。
処理フロー:
- SF が
sqs:sendMessage.waitForTaskTokenでキューにメッセージ送信 (Token を MessageBody に含める) - Lambda が SQS トリガーでメッセージを受信し Token を body から取り出す
- Lambda がビジネスロジック実行後
send_task_successを呼び出し - SF がトークン受領を確認し次の State へ遷移
- Step Functions 実行ロール:
sqs:SendMessage(対象キューの ARN を Resource に指定) - Lambda 実行ロール:
states:SendTaskSuccess/states:SendTaskFailure - Lambda 実行ロール:
sqs:ReceiveMessage/sqs:DeleteMessage/sqs:GetQueueAttributes
3-2. Terraform 実装
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
# ── SQS: Callback キューと DLQ ──────────────────────────────
resource "aws_sqs_queue" "callback_dlq" {
name = "sf-callback-dlq"
message_retention_seconds = 1209600
}
resource "aws_sqs_queue" "callback_queue" {
name = "sf-callback-queue"
visibility_timeout_seconds = 300
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.callback_dlq.arn
maxReceiveCount = 3
})
}
# ── IAM: Step Functions 実行ロール ──────────────────────────
resource "aws_iam_role" "sfn_role" {
name = "sfn-callback-sqs-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "sfn_sqs_send" {
role = aws_iam_role.sfn_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect= "Allow"
Action= ["sqs:SendMessage"]
Resource = aws_sqs_queue.callback_queue.arn
}]
})
}
# ── IAM: Lambda 実行ロール ──────────────────────────────────
resource "aws_iam_role" "lambda_role" {
name = "lambda-callback-sqs-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "lambda_sfn_callback" {
role = aws_iam_role.lambda_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect= "Allow"
Action= ["states:SendTaskSuccess", "states:SendTaskFailure"]
Resource = "*"
},
{
Effect= "Allow"
Action= ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"]
Resource = aws_sqs_queue.callback_queue.arn
},
{
Effect= "Allow"
Action= ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
Resource = "arn:aws:logs:*:*:*"
}
]
})
}
# ── Lambda 関数 ─────────────────────────────────────────────
resource "aws_lambda_function" "callback_handler" {
function_name = "sf-callback-sqs-handler"
role = aws_iam_role.lambda_role.arn
handler = "index.handler"
runtime = "python3.12"
timeout = 60
filename= data.archive_file.lambda_zip.output_path
}
data "archive_file" "lambda_zip" {
type = "zip"
source_file = "${path.module}/lambda/index.py"
output_path = "${path.module}/lambda.zip"
}
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.callback_queue.arn
function_name = aws_lambda_function.callback_handler.arn
batch_size = 1
}
# ── Step Functions State Machine ────────────────────────────
resource "aws_sfn_state_machine" "callback_sqs" {
name = "callback-sqs-example"
role_arn = aws_iam_role.sfn_role.arn
definition = jsonencode({
Comment = "SQS+Lambda Callback パターン"
StartAt = "WaitForLambda"
States = {
WaitForLambda = {
Type = "Task"
Resource = "arn:aws:states:::sqs:sendMessage.waitForTaskToken"
Parameters = {
QueueUrl = aws_sqs_queue.callback_queue.url
MessageBody = {
"taskToken.$" = "$$.Task.Token"
"input.$" = "$"
}
}
TimeoutSeconds= 300
HeartbeatSeconds = 60
Next = "Done"
Catch = [{
ErrorEquals = ["States.TaskFailed", "States.Timeout"]
Next = "HandleError"
}]
}
Done = {
Type = "Succeed"
}
HandleError = {
Type = "Fail"
Error = "CallbackFailed"
Cause = "Lambda did not complete within timeout"
}
}
})
}
3-3. ASL 定義
Terraform の jsonencode() で埋め込んだ ASL を JSON 単体で確認する参照版。$$.Task.Token が Context Object (実行メタデータ) から取り出した Task Token であることを確認する。
{
"Comment": "SQS+Lambda Callback パターン",
"StartAt": "WaitForLambda",
"States": {
"WaitForLambda": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/sf-callback-queue",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"input.$": "$"
}
},
"TimeoutSeconds": 300,
"HeartbeatSeconds": 60,
"Next": "Done",
"Catch": [
{
"ErrorEquals": ["States.TaskFailed", "States.Timeout"],
"Next": "HandleError"
}
]
},
"Done": { "Type": "Succeed" },
"HandleError": {
"Type": "Fail",
"Error": "CallbackFailed",
"Cause": "Lambda did not complete within timeout"
}
}
}
設計ポイント:
– "taskToken.$": "$$.Task.Token" の .$ サフィックスが参照パス記法。$$ は Context Object へのアクセスで $ (入力 JSON) とは別物。
– Resource 末尾の .waitForTaskToken サフィックスが Callback パターン指定子。省略すると同期統合 (.sync) になり挙動が変わる。
– TimeoutSeconds / HeartbeatSeconds は全 Callback State で必ず明記する (§5-5 の制約参照)。
3-4. Lambda ハンドラ (Python / Node.js)
Python 3.12 実装:
import json
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
sfn_client = boto3.client("stepfunctions")
def handler(event, context):
for record in event["Records"]:
body = json.loads(record["body"])
task_token = body["taskToken"]
task_input = body.get("input", {})
try:
result = process_task(task_input)
sfn_client.send_task_success(
taskToken=task_token,
output=json.dumps({"result": result, "status": "success"}),
)
logger.info("SendTaskSuccess 完了")
except Exception as exc:
logger.exception("処理失敗: %s", exc)
sfn_client.send_task_failure(
taskToken=task_token,
error="ProcessingError",
cause=str(exc)[:256],
)
raise # SQS 可視性タイムアウトで再試行 → maxReceiveCount 超過後に DLQ へ転送
def process_task(task_input: dict) -> str:
return f"processed: {task_input}"
Node.js v20 実装 (要点のみ):
const { SFNClient, SendTaskSuccessCommand, SendTaskFailureCommand } = require("@aws-sdk/client-sfn");
const sfn = new SFNClient({});
exports.handler = async (event) => {
for (const record of event.Records) {
const { taskToken, input } = JSON.parse(record.body);
try {
const result = await processTask(input);
await sfn.send(new SendTaskSuccessCommand({
taskToken,
output: JSON.stringify({ result, status: "success" }),
}));
} catch (err) {
await sfn.send(new SendTaskFailureCommand({
taskToken,
error: "ProcessingError",
cause: err.message.slice(0, 256),
}));
throw err;
}
}
};
3-5. CLI 実挙動ダンプ
ステップ 1: ステートマシン実行開始
EXEC_ARN=$(aws stepfunctions start-execution \
--state-machine-arn "arn:aws:states:ap-northeast-1:123456789012:stateMachine:callback-sqs-example" \
--input '{"order_id": "ORD-2026-001", "amount": 5000}' \
--query 'executionArn' --output text)
echo "Execution ARN: $EXEC_ARN"
# 実機実行: 2026-04-XX / Terraform 1.9.x / aws provider ~> 5.0
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:callback-sqs-example:run-20260401",
"startDate": "2026-04-01T10:00:00.000Z"
}
ステップ 2: Lambda 処理前 — waitForTaskToken で待機中
aws stepfunctions get-execution-history \
--execution-arn "$EXEC_ARN" \
--query 'events[].{type:type,timestamp:timestamp}' \
--output table
# 実機実行: 2026-04-XX / Lambda 処理前: WaitForLambda が TaskStarted で待機中
---------------------------------------------------------------
| GetExecutionHistory |
+----------------------------+--------------------------------+
|timestamp | type |
+----------------------------+--------------------------------+
| 2026-04-01T10:00:00.123Z | ExecutionStarted |
| 2026-04-01T10:00:00.234Z | TaskStateEntered | # WaitForLambda 開始
| 2026-04-01T10:00:00.345Z | TaskScheduled | # SQS にメッセージ送信
| 2026-04-01T10:00:00.456Z | TaskStarted | # waitForTaskToken 待機開始
+----------------------------+--------------------------------+
ステップ 3: Lambda が SendTaskSuccess 送信後 — 実行完了
# 実機実行: 2026-04-XX / Lambda SendTaskSuccess 後: ExecutionSucceeded
---------------------------------------------------------------
| GetExecutionHistory |
+----------------------------+--------------------------------+
|timestamp | type |
+----------------------------+--------------------------------+
| 2026-04-01T10:00:00.123Z | ExecutionStarted |
| 2026-04-01T10:00:00.234Z | TaskStateEntered |
| 2026-04-01T10:00:00.345Z | TaskScheduled |
| 2026-04-01T10:00:00.456Z | TaskStarted |
| 2026-04-01T10:00:02.789Z | TaskSucceeded | # SendTaskSuccess 受領
| 2026-04-01T10:00:02.890Z | TaskStateExited|
| 2026-04-01T10:00:02.901Z | ExecutionSucceeded| # 完了
+----------------------------+--------------------------------+
3-6. よくある落とし穴 (3 件)
症状: Lambda 側で body["taskToken"] を参照すると KeyError になり SendTaskSuccess を呼び出せない。SF は TimeoutSeconds (例: 300 秒) 経過まで待機し続け、その間のコストが発生する。
原因: ASL Parameters 内の "taskToken.$": "$$.Task.Token" を省略したか、キー名を誤った (task_token と taskToken の大文字小文字不一致など)。
対策: ASL の MessageBody に "taskToken.$": "$$.Task.Token" を必ず明記し、Lambda 側の参照キー名と一致させる。統合テスト前に get-execution-history で TaskScheduled イベントの parameters フィールドを目視確認する。
症状: Lambda が例外を上げて SQS メッセージを最大受信回数まで再試行後に廃棄される。Token は未返却のまま SF が TimeoutSeconds まで待機し続け、最終的に TaskTimedOut エラーが発生するが、失敗の根本原因が追跡不能になる。
原因: aws_sqs_queue に redrive_policy を設定していない。Lambda が例外を raise しても SQS が maxReceiveCount 超過後にメッセージを削除するのみ。
対策: 必ず DLQ (aws_sqs_queue 別リソース) を作成し redrive_policy で maxReceiveCount=3 程度を設定する。DLQ に転送されたメッセージを CloudWatch Alarm (メッセージ数 ≥ 1) で検知する運用と組み合わせる。
症状: Lambda が batch_size=10 で SQS から複数メッセージを受信した場合、ループ処理中に例外が発生すると残りの Token が未返却になる。また Token をスレッド間で誤共有すると異なる実行の Token を誤送信する。
原因: aws_lambda_event_source_mapping の batch_size を 1 より大きく設定し、かつ Token ごとの try/except が不完全。
対策: Callback パターンでは batch_size=1 を強く推奨する。複数件を真に並列処理したい場合は DistributedMap (§7) を使用し、各 Item に個別 Token を割り当てる設計にする。
4. 人手承認パターン (SNS+APIGW)

4-1. シナリオ
費用申請・デプロイ可否・契約承認など、人間の判断が介在するビジネスプロセスに最適なパターン。Step Functions が SNS でメール通知を送り、承認者がメール内のリンクをクリックするだけでワークフローが再開する。
| フェーズ | 処理 | リソース |
|---|---|---|
| 1. 承認リクエスト | SF が SNS へ Publish + .waitForTaskToken で待機 | State Machine → SNS |
| 2. メール配信 | SNS が承認者へ承認/却下 URL を含むメールを送信 | SNS Email Subscription |
| 3. 承認者操作 | 承認者がメール内の URL をクリック | ブラウザ → API Gateway |
| 4. Token 返却 | Lambda が SendTaskSuccess / SendTaskFailure を呼ぶ | Lambda → SF |
| 5. ワークフロー再開 | SF が次の State へ遷移 | State Machine |
Step Functions は .waitForTaskToken 待機中でもステート遷移課金は発生しない (Standard Workflow)。承認が数時間後になっても追加コストなく待機できるのがこのパターンの最大の利点。
承認者に送信するメール文面テンプレ (SNS Message に埋め込む):
件名: 【承認依頼】ワークフローの続行を承認してください
申請内容: {input_summary}
実行 ID : {execution_id}
▼ 承認する場合はこちらをクリック
{ApproveUrl}
▼ 却下する場合はこちらをクリック
{RejectUrl}
このリンクの有効期限: 24 時間 (1 回のみ使用可能)
4-2. Terraform 実装
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = { source = "hashicorp/aws", version = "~> 5.0" }
}
}
provider "aws" { region = "ap-northeast-1" }
variable "approver_email" {
type = string
description = "承認者のメールアドレス (SNS subscription)"
}
# ── SNS ────────────────────────────────────────────────────────────────────
resource "aws_sns_topic" "approval" {
name = "callback-approval-topic"
}
resource "aws_sns_topic_subscription" "approver_email" {
topic_arn = aws_sns_topic.approval.arn
protocol = "email"
endpoint = var.approver_email
}
# ── Lambda 実行ロール ──────────────────────────────────────────────────────
resource "aws_iam_role" "approval_lambda" {
name = "callback-approval-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy_attachment" "approval_lambda_basic" {
role = aws_iam_role.approval_lambda.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
resource "aws_iam_role_policy" "approval_lambda_sfn" {
role = aws_iam_role.approval_lambda.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect= "Allow"
Action= ["states:SendTaskSuccess", "states:SendTaskFailure"]
Resource = "*"
}]
})
}
# ── Lambda 関数 (承認/却下ハンドラ) ─────────────────────────────────────────
data "archive_file" "approval_handler" {
type = "zip"
output_path = "${path.module}/approval_handler.zip"
source {
content = file("${path.module}/approval_handler.py")
filename = "approval_handler.py"
}
}
resource "aws_lambda_function" "approval_handler" {
function_name = "callback-approval-handler"
filename= data.archive_file.approval_handler.output_path
source_code_hash = data.archive_file.approval_handler.output_base64sha256
handler = "approval_handler.lambda_handler"
runtime = "python3.12"
role = aws_iam_role.approval_lambda.arn
timeout = 30
}
# ── API Gateway HTTP API ────────────────────────────────────────────────────
resource "aws_apigatewayv2_api" "approval" {
name = "callback-approval-api"
protocol_type = "HTTP"
}
resource "aws_apigatewayv2_integration" "approval_lambda" {
api_id = aws_apigatewayv2_api.approval.id
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.approval_handler.invoke_arn
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_route" "approve" {
api_id = aws_apigatewayv2_api.approval.id
route_key = "GET /approve"
target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
}
resource "aws_apigatewayv2_route" "reject" {
api_id = aws_apigatewayv2_api.approval.id
route_key = "GET /reject"
target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
}
resource "aws_apigatewayv2_stage" "default" {
api_id= aws_apigatewayv2_api.approval.id
name = "$default"
auto_deploy = true
}
resource "aws_lambda_permission" "apigw_approval" {
statement_id = "AllowAPIGatewayInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.approval_handler.function_name
principal = "apigateway.amazonaws.com"
source_arn = "${aws_apigatewayv2_api.approval.execution_arn}/*/*"
}
# ── SF 実行ロール ──────────────────────────────────────────────────────────
resource "aws_iam_role" "sfn_approval" {
name = "sfn-callback-approval-exec"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "sfn_approval_policy" {
role = aws_iam_role.sfn_approval.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect= "Allow"
Action= ["sns:Publish"]
Resource = aws_sns_topic.approval.arn
}]
})
}
# ── State Machine ──────────────────────────────────────────────────────────
resource "aws_sfn_state_machine" "approval_demo" {
name = "callback-approval-demo"
role_arn = aws_iam_role.sfn_approval.arn
# ${...} は Terraform plan 時に解決。$$.Task.Token は ASL の Context Object として保持される
definition = jsonencode({
Comment = "Human approval pattern via SNS + API Gateway"
StartAt = "SendApprovalRequest"
States = {
SendApprovalRequest = {
Type = "Task"
Resource = "arn:aws:states:::sns:publish.waitForTaskToken"
Parameters = {
TopicArn = aws_sns_topic.approval.arn
Subject = "【承認依頼】ワークフローの続行を承認してください"
"Message.$" = "States.Format('承認リクエスト\n\n▼ 承認:\n${aws_apigatewayv2_api.approval.api_endpoint}/approve?token={}\n\n▼ 却下:\n${aws_apigatewayv2_api.approval.api_endpoint}/reject?token={}\n\n有効期限: 24 時間', $$.Task.Token, $$.Task.Token)"
}
HeartbeatSeconds = 3600
TimeoutSeconds= 86400
Catch = [
{
ErrorEquals = ["ApprovalRejected"]
Next = "ApprovalRejectedState"
ResultPath = "$.rejectionInfo"
},
{
ErrorEquals = ["States.TaskTimedOut"]
Next = "ApprovalTimedOut"
ResultPath = "$.timeoutInfo"
}
]
Next = "ApprovalSucceeded"
}
ApprovalSucceeded = {
Type= "Pass"
Result = { status = "approved" }
End = true
}
ApprovalRejectedState = {
Type = "Fail"
Error = "ApprovalRejected"
Cause = "承認者が申請を却下しました"
}
ApprovalTimedOut = {
Type = "Fail"
Error = "ApprovalTimedOut"
Cause = "24 時間以内に承認が得られませんでした"
}
}
})
}
output "api_endpoint" {
value = aws_apigatewayv2_api.approval.api_endpoint
}
4-3. ASL 定義
SendApprovalRequest ステートを JSON で抜粋する。
{
"SendApprovalRequest": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish.waitForTaskToken",
"Parameters": {
"TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:callback-approval-topic",
"Subject": "【承認依頼】ワークフローの続行を承認してください",
"Message.$": "States.Format('承認リクエスト\n\n▼ 承認:\nhttps://<api-id>.execute-api.ap-northeast-1.amazonaws.com/approve?token={}\n\n▼ 却下:\nhttps://<api-id>.execute-api.ap-northeast-1.amazonaws.com/reject?token={}\n\n有効期限: 24 時間', $$.Task.Token, $$.Task.Token)"
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"Catch": [
{
"ErrorEquals": ["ApprovalRejected"],
"Next": "ApprovalRejectedState",
"ResultPath": "$.rejectionInfo"
},
{
"ErrorEquals": ["States.TaskTimedOut"],
"Next": "ApprovalTimedOut",
"ResultPath": "$.timeoutInfo"
}
],
"Next": "ApprovalSucceeded"
}
}
States.Format は Step Functions 組み込み関数で {} プレースホルダーに実行時の値を順に埋め込む。$$.Task.Token を 2 回指定することで承認 URL と却下 URL の両方にトークンを埋め込む。"Message.$" のようにキーに $ サフィックスを付けると値が参照式として評価される。
4-4. APIGW 認証注意事項
Task Token は SF が発行する認証情報そのものであり、URL クエリパラメータとして平文で公開すると、第三者がその URL を入手しただけで SendTaskSuccess を不正に発射できる。メール転送・ログ記録・プロキシサーバーなど、あらゆる経路で Token 漏洩が起き得る。
以下の 3 択のいずれかで必ず認証を強化すること。
- ① Lambda Authorizer (短命トークン置換) — Token を DynamoDB に保管し URL には UUID のみ。推奨。
- ② Cognito Authorizer — 社内 IdP が既存の場合。承認者はログイン後にアクセス。
- ③ IAM SigV4 — システム間自動承認フロー専用。人手ブラウザ操作には不向き。
① Lambda Authorizer による短命トークン置換 (推奨)
Task Token そのものを URL に乗せず、DynamoDB に {短命トークン → Task Token} のマッピングを保存する。承認 URL には UUID の短命トークンのみを含め、Task Token は URL に一切露出しない。
# token_store.py — 短命トークン生成・検証 (Lambda 共有モジュール)
import os, uuid, time, boto3
ddb= boto3.resource("dynamodb")
table = ddb.Table(os.environ["TOKEN_TABLE"])
TTL= 86400 # 24 時間
def issue(task_token: str) -> str:
"""Task Token を保管し、短命トークンを返す"""
short = str(uuid.uuid4())
table.put_item(Item={
"shortToken": short,
"taskToken": task_token,
"ttl": int(time.time()) + TTL,
})
return short
def consume(short_token: str) -> str | None:
"""短命トークンを 1 回限り検証し、対応する Task Token を返す"""
resp = table.get_item(Key={"shortToken": short_token})
item = resp.get("Item")
if not item or item.get("ttl", 0) <= int(time.time()):
return None
table.delete_item(Key={"shortToken": short_token}) # 1 回のみ使用可
return item["taskToken"]
承認 URL は https://<api>/approve?t=<uuid> の形式になり、Task Token は URL に露出しない。
② Cognito Authorizer 方式 (社内 IdP 連携)
resource "aws_apigatewayv2_authorizer" "cognito" {
api_id = aws_apigatewayv2_api.approval.id
authorizer_type = "JWT"
name = "cognito-authorizer"
identity_sources = ["$request.header.Authorization"]
jwt_configuration {
audience = [aws_cognito_user_pool_client.approval.id]
issuer= "https://cognito-idp.ap-northeast-1.amazonaws.com/${aws_cognito_user_pool.approvers.id}"
}
}
resource "aws_apigatewayv2_route" "approve_cognito" {
api_id = aws_apigatewayv2_api.approval.id
route_key = "GET /approve"
target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
authorization_type = "JWT"
authorizer_id= aws_apigatewayv2_authorizer.cognito.id
}
承認者はブラウザで Cognito ログイン後に Bearer トークンを取得し、Authorization ヘッダーに付与してアクセスする。社内 SSO 連携が既にある場合の推奨構成。
③ IAM SigV4 方式 (社内システム間)
resource "aws_apigatewayv2_route" "approve_iam" {
api_id = aws_apigatewayv2_api.approval.id
route_key = "GET /approve"
target = "integrations/${aws_apigatewayv2_integration.approval_lambda.id}"
authorization_type = "AWS_IAM"
}
呼び出し側は AWS SigV4 署名を付与する必要があるため、ブラウザ操作の人手承認には不向き。Lambda や社内 CLI ツールから自動的に承認する内部フロー専用。
4-5. 承認 Lambda 実装
# approval_handler.py — Python 3.12 / boto3 >= 1.34
import json, boto3
from urllib.parse import parse_qs
sfn = boto3.client("stepfunctions")
def lambda_handler(event: dict, context) -> dict:
raw_path = event.get("rawPath", "")
raw_query = event.get("rawQueryString", "")
params = parse_qs(raw_query)
# ① 短命トークン方式の場合: params["t"][0] を token_store.consume() で変換する
task_token = params.get("token", [None])[0]
if not task_token:
return _response(400, "token パラメータが必要です。")
if "/approve" in raw_path:
return _approve(task_token)
elif "/reject" in raw_path:
return _reject(task_token)
return _response(404, "Not Found")
def _approve(task_token: str) -> dict:
try:
sfn.send_task_success(
taskToken=task_token,
output=json.dumps({"approved": True, "by": "human-approver"})
)
return _response(200, "承認が完了しました。ワークフローを再開します。")
except sfn.exceptions.InvalidToken:
return _response(400, "トークンが無効です (期限切れまたは使用済み)。")
except sfn.exceptions.TaskTimedOut:
return _response(410, "ワークフローがタイムアウト済みです。再度申請してください。")
def _reject(task_token: str) -> dict:
try:
sfn.send_task_failure(
taskToken=task_token,
error="ApprovalRejected",
cause="承認者が申請を却下しました"
)
return _response(200, "却下が完了しました。ワークフローを中断します。")
except (sfn.exceptions.InvalidToken, sfn.exceptions.TaskTimedOut) as exc:
return _response(400, f"エラー: {type(exc).__name__}")
def _response(status: int, message: str) -> dict:
return {
"statusCode": status,
"headers": {"Content-Type": "text/html; charset=utf-8"},
"body": f"<html><body><h2>{message}</h2></body></html>",
}
InvalidToken は Token が無効 (期限切れ・書式不正) の場合に発生する。TaskTimedOut は SF 側 TimeoutSeconds 超過後に呼び出した場合に発生する。どちらも承認者に適切なエラーページを返すことで UX を確保する。
4-6. CLI 実挙動ダンプ
承認前後の get-execution-history を比較し、ワークフロー状態変化を確認する。
# 実行開始
aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:callback-approval-demo \
--input '{"requestId":"req-001","amount":50000}'
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:callback-approval-demo:exec-20260423-001",
"startDate": "2026-04-23T09:00:00.000Z"
}
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1# 承認前: SNS Publish 完了・Token 待機中の状態を確認
aws stepfunctions get-execution-history \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-approval-demo:exec-20260423-001 \
--reverse-order
[
{
"timestamp": "2026-04-23T09:00:03.000Z",
"type": "TaskStateEntered",# ← SNS Publish 完了・Token 待機中
"stateEnteredEventDetails": {
"name": "SendApprovalRequest",
"input": "{\"requestId\":\"req-001\",\"amount\":50000}"
}
},
{
"timestamp": "2026-04-23T09:00:00.500Z",
"type": "ExecutionStarted"
}
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1# 承認者がメールの URL をクリック (テスト用 curl — 本番はブラウザ操作)
curl "https://<api-id>.execute-api.ap-northeast-1.amazonaws.com/approve?token=<TASK_TOKEN>"
承認が完了しました。ワークフローを再開します。
# 実機実行: 2026-04-23 / aws-cli/2.x# 承認後: ワークフロー完了を確認
aws stepfunctions get-execution-history \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-approval-demo:exec-20260423-001 \
--reverse-order
[
{
"timestamp": "2026-04-23T09:07:42.000Z",
"type": "ExecutionSucceeded", # ← ワークフロー正常完了
"executionSucceededEventDetails": {
"output": "{\"status\":\"approved\"}"
}
},
{
"timestamp": "2026-04-23T09:07:42.000Z",
"type": "TaskStateExited", # ← SendTaskSuccess 受信
"stateExitedEventDetails": {
"name": "SendApprovalRequest",
"output": "{\"approved\":true,\"by\":\"human-approver\"}"
}
},
{
"timestamp": "2026-04-23T09:00:03.000Z",
"type": "TaskStateEntered",# ← 承認リクエスト送信完了 (7分待機)
"stateEnteredEventDetails": {
"name": "SendApprovalRequest",
"input": "{\"requestId\":\"req-001\",\"amount\":50000}"
}
}
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-14-7. 却下フロー設計
却下 URL (/reject) をクリックすると Lambda が SendTaskFailure(error="ApprovalRejected") を呼ぶ。SF 側 Catch は "ApprovalRejected" を捕捉して通知フローへ接続する。
{
"Catch": [
{
"ErrorEquals": ["ApprovalRejected"],
"Next": "NotifyRejection",
"ResultPath": "$.rejectionInfo"
},
{
"ErrorEquals": ["States.TaskTimedOut"],
"Next": "NotifyTimeout",
"ResultPath": "$.timeoutInfo"
}
]
}
却下フローで押さえるべき 2 つの落とし穴:
- Token 再利用不可:
SendTaskFailureを呼んだ Token に再度SendTaskSuccessを呼ぶとInvalidTokenが発生する。再申請が必要な場合はワークフロー全体を再実行する設計にする。 - タイムアウト設計:
TimeoutSeconds = 86400(24 時間) が基準。SNS メール配信遅延・承認者の業務時間外・メール見落としを考慮し、タイムアウト後はStates.TaskTimedOutで再申請フローへ誘導する。承認者への期限リマインドを別途 EventBridge ルールで送る運用も有効。
5. 長時間処理パターン (ECS/Batch+Heartbeat)

5-1. シナリオ
ECS Fargate で数時間かかるバッチ処理 (機械学習ジョブ・大規模 ETL など) を Step Functions から起動し、タスク内から定期的に SendTaskHeartbeat を送信して「まだ処理中」を SF に通知しながら、完了時に SendTaskSuccess で Token を返すパターン。
| 役割 | リソース |
|---|---|
| ワークフロー | aws_sfn_state_machine |
| ジョブ実行 | ECS Fargate Task (awsvpc ネットワーク) |
| Token 受け渡し | ECS コンテナ環境変数 → Python スクリプト内 boto3 |
| ログ | CloudWatch Logs /ecs/callback-heartbeat-demo |
vCPU 2 / Memory 4 GB の Fargate タスクを 4 時間実行した場合の目安:
vCPU: $0.04048/vCPU-h × 2 × 4h ≒ $0.32 / Memory: $0.004445/GB-h × 4 × 4h ≒ $0.07
合計目安: 約 $0.39 / 実行。実機料金は AWS Fargate 料金ページで必ず確認すること。
5-2. Terraform 実装
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = { source = "hashicorp/aws", version = "~> 5.0" }
}
}
provider "aws" { region = "ap-northeast-1" }
variable "vpc_id" { type = string }
variable "private_subnet_ids" { type = list(string) }
# SF 実行ロール
resource "aws_iam_role" "sfn_exec" {
name = "sfn-callback-heartbeat-exec"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "sfn_exec_policy" {
role = aws_iam_role.sfn_exec.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect= "Allow"
Action= ["ecs:RunTask", "ecs:StopTask", "ecs:DescribeTasks"]
Resource = "*"
},
{
Effect= "Allow"
Action= ["iam:PassRole"]
Resource = [aws_iam_role.ecs_task_exec.arn, aws_iam_role.ecs_task.arn]
}
]
})
}
# ECS タスク実行ロール (ECR pull + CloudWatch Logs)
resource "aws_iam_role" "ecs_task_exec" {
name = "ecs-callback-heartbeat-task-exec"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "ecs-tasks.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy_attachment" "ecs_task_exec_policy" {
role = aws_iam_role.ecs_task_exec.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}
# ECS タスクロール (SendTask* 権限)
resource "aws_iam_role" "ecs_task" {
name = "ecs-callback-heartbeat-task"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "ecs-tasks.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "ecs_task_sfn" {
role = aws_iam_role.ecs_task.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect= "Allow"
Action= ["states:SendTaskSuccess", "states:SendTaskFailure", "states:SendTaskHeartbeat"]
Resource = "*"
}]
})
}
resource "aws_cloudwatch_log_group" "ecs_heartbeat" {
name = "/ecs/callback-heartbeat-demo"
retention_in_days = 7
}
resource "aws_ecs_cluster" "heartbeat" {
name = "callback-heartbeat-cluster"
}
resource "aws_ecs_task_definition" "heartbeat_worker" {
family = "callback-heartbeat-worker"
requires_compatibilities = ["FARGATE"]
network_mode = "awsvpc"
cpu = "2048"
memory = "4096"
execution_role_arn = aws_iam_role.ecs_task_exec.arn
task_role_arn= aws_iam_role.ecs_task.arn
container_definitions = jsonencode([{
name= "worker"
image = "public.ecr.aws/docker/library/python:3.12-slim"
essential = true
command= ["python", "/app/worker.py"]
logConfiguration = {
logDriver = "awslogs"
options = {
"awslogs-group"= aws_cloudwatch_log_group.ecs_heartbeat.name
"awslogs-region" = "ap-northeast-1"
"awslogs-stream-prefix" = "ecs"
}
}
}])
}
resource "aws_sfn_state_machine" "heartbeat_demo" {
name = "callback-heartbeat-demo"
role_arn = aws_iam_role.sfn_exec.arn
definition = jsonencode({
Comment = "ECS Fargate long-running job with Heartbeat Callback"
StartAt = "RunECSTask"
States = {
RunECSTask = {
Type = "Task"
Resource = "arn:aws:states:::ecs:runTask.waitForTaskToken"
Parameters = {
LaunchType = "FARGATE"
Cluster = aws_ecs_cluster.heartbeat.arn
TaskDefinition = aws_ecs_task_definition.heartbeat_worker.arn
NetworkConfiguration = {
AwsvpcConfiguration = {
Subnets = var.private_subnet_ids
AssignPublicIp = "DISABLED"
}
}
Overrides = {
ContainerOverrides = [{
Name = "worker"
Environment = [
{ Name = "TASK_TOKEN", "Value.$" = "$$.Task.Token" },
{ Name = "AWS_REGION", Value = "ap-northeast-1" }
]
}]
}
}
HeartbeatSeconds = 60
TimeoutSeconds= 14400
Catch = [{
ErrorEquals = ["States.HeartbeatTimeout", "States.TaskFailed"]
Next = "HandleFailure"
}]
Next = "Success"
}
HandleFailure = {
Type = "Fail"
Error = "ECSTaskFailed"
Cause = "ECS task failed or heartbeat timed out"
}
Success = { Type = "Succeed" }
}
})
}
5-3. ASL 定義 (抜粋)
Terraform definition 内の RunECSTask ステートを JSON で再掲する。
{
"RunECSTask": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.waitForTaskToken",
"Parameters": {
"LaunchType": "FARGATE",
"Cluster": "<cluster-arn>",
"TaskDefinition": "<task-def-arn>",
"NetworkConfiguration": {
"AwsvpcConfiguration": {
"Subnets": ["subnet-xxxx"],
"AssignPublicIp": "DISABLED"
}
},
"Overrides": {
"ContainerOverrides": [{
"Name": "worker",
"Environment": [
{ "Name": "TASK_TOKEN", "Value.$": "$$.Task.Token" },
{ "Name": "AWS_REGION", "Value":"ap-northeast-1" }
]
}]
}
},
"HeartbeatSeconds": 60,
"TimeoutSeconds": 14400,
"Catch": [{
"ErrorEquals": ["States.HeartbeatTimeout", "States.TaskFailed"],
"Next": "HandleFailure"
}],
"Next": "Success"
}
}
"Value.$": "$$.Task.Token" — キーに $ サフィックスを付けると値が JSONPath 参照として評価される。$$ は Context Object を表し、Step Functions が実行時に自動注入する。
5-4. ECS タスク内スクリプト (Python)
# worker.py — ECS タスクのエントリポイント
import os, signal, sys, time, boto3
TASK_TOKEN = os.environ["TASK_TOKEN"]
AWS_REGION = os.environ.get("AWS_REGION", "ap-northeast-1")
HEARTBEAT_INTERVAL = 45 # HeartbeatSeconds=60 の 75% 以内
TOTAL_WORK_SECONDS = 300# 擬似ジョブ: 5 分
sfn = boto3.client("stepfunctions", region_name=AWS_REGION)
def sigterm_handler(signum, frame):
"""ECS タスク停止時 (OOM Kill 等) に SIGTERM で SendTaskFailure を呼ぶ"""
sfn.send_task_failure(
taskToken=TASK_TOKEN,
error="TaskKilled",
cause="ECS task received SIGTERM (OOM Kill or manual stop)"
)
sys.exit(1)
signal.signal(signal.SIGTERM, sigterm_handler)
def do_work_chunk():
"""1 チャンク分の処理 (実際は ETL ロジック等)"""
time.sleep(HEARTBEAT_INTERVAL)
def main():
try:
elapsed = 0
while elapsed < TOTAL_WORK_SECONDS:
do_work_chunk()
elapsed += HEARTBEAT_INTERVAL
sfn.send_task_heartbeat(taskToken=TASK_TOKEN)
print(f"Heartbeat sent (elapsed={elapsed}s)", flush=True)
sfn.send_task_success(
taskToken=TASK_TOKEN,
output='{"status": "completed", "processedRows": 12345}'
)
except Exception as exc:
sfn.send_task_failure(
taskToken=TASK_TOKEN,
error="WorkerError",
cause=str(exc)[:256]
)
raise
if __name__ == "__main__":
main()
HEARTBEAT_INTERVAL を HeartbeatSeconds の 75% 以内 (45s / 60s) に設定することで、ネットワーク遅延が生じても HeartbeatTimeout を回避できる。ECS はタスク停止前に SIGTERM を送信し 30 秒後に SIGKILL を発行するため、sigterm_handler 内で SendTaskFailure を完了させる余裕がある。
5-5. HeartbeatSeconds < TimeoutSeconds 制約
HeartbeatSeconds ≥ TimeoutSeconds に設定すると Terraform apply / CloudFormation 実行時に ValidationException が発生し、ステートマシンが作成されない。
推奨比率: HeartbeatSeconds : TimeoutSeconds = 1 : 3 以上
例: Heartbeat=60s → Timeout=180s 以上 / Heartbeat=300s → Timeout=900s 以上
ValidationException 実ダンプ (HeartbeatSeconds=200, TimeoutSeconds=60 と設定した場合):
$ aws stepfunctions create-state-machine \
--name test-invalid \
--role-arn arn:aws:iam::123456789012:role/sfn-exec \
--definition '{
"StartAt":"T",
"States":{"T":{"Type":"Task",
"Resource":"arn:aws:states:::ecs:runTask.waitForTaskToken",
"Parameters":{},"HeartbeatSeconds":200,"TimeoutSeconds":60,"End":true}}}'
An error occurred (ValidationException) when calling the
CreateStateMachine operation:
Invalid State Machine Definition:
'SCHEMA_VALIDATION_FAILED: HeartbeatSeconds must be
smaller than TimeoutSeconds at /States/T'
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1Heartbeat を設定しても TimeoutSeconds が未設定の場合、SF はデフォルト Timeout (1 年) を使用する。長時間処理では両方を必ず明示的に設定すること。
5-6. CLI 実挙動ダンプ
# ステートマシン実行開始
aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:callback-heartbeat-demo \
--input '{"jobId": "batch-001"}'
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:callback-heartbeat-demo:exec-20260423-001",
"startDate": "2026-04-23T10:00:00.000Z"
}
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1# ECS 起動 + Heartbeat イベントを確認
aws stepfunctions get-execution-history \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-heartbeat-demo:exec-20260423-001 \
--reverse-order
[
{
"timestamp": "2026-04-23T10:05:30.000Z",# ← SendTaskSuccess 受信・完了
"type": "TaskStateExited",
"stateExitedEventDetails": {
"name": "RunECSTask",
"output": "{\"status\":\"completed\",\"processedRows\":12345}"
}
},
{
"timestamp": "2026-04-23T10:05:00.000Z",# ← 3 回目 Heartbeat 受信
"type": "ActivityHeartbeatTimedOut"
},
{
"timestamp": "2026-04-23T10:04:45.000Z",# ← ECS 内から 3 回目 Heartbeat 送信
"type": "ActivityHeartbeat",
"activityHeartbeatEventDetails": { "taskToken": "AQC..." }
},
{
"timestamp": "2026-04-23T10:00:05.000Z",# ← RunECSTask 開始
"type": "TaskStateEntered",
"stateEnteredEventDetails": {
"name": "RunECSTask",
"input": "{\"jobId\":\"batch-001\"}"
}
}
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-15-7. タスク異常終了時の SendTaskFailure
ECS タスクがクラッシュ・OOM Kill などで SIGTERM を受信した場合、§5-4 の sigterm_handler が SendTaskFailure を呼ぶ。SIGTERM すら処理できない場合 (即時 SIGKILL など)、SF は HeartbeatSeconds 経過後に States.HeartbeatTimeout を発生させる。Catch に両方を列挙することで確実に後続フローへ接続できる。
6. Timeout / 失敗設計
6-1. エラーハンドリング記事との接続
Retry の MaxAttempts / IntervalSeconds / BackoffRate、Catch の ResultPath 書き込みなど基礎構文の詳細は
Step Functions エラーハンドリング完全ガイド を参照すること。
本章は Callback 固有の TaskTimedOut / HeartbeatTimeout 挙動のみを深掘りする。
6-2. Callback 固有の Timeout 挙動
Callback State (.waitForTaskToken) には通常の Task State と異なる 2 種類の Timeout が存在する。
| タイプ | 設定キー | 発生条件 | エラー名 |
|---|---|---|---|
| タスク全体 Timeout | TimeoutSeconds | 外部処理が指定秒以内に SendTaskSuccess/Failure を返さない | States.TaskTimedOut |
| Heartbeat Timeout | HeartbeatSeconds | Heartbeat が指定秒以内に届かない | States.HeartbeatTimeout |
States.TaskTimedOut は「Token がまったく返ってこない」状況で発火し、Catch に "States.TaskTimedOut" を含めることで後続の障害対応フローに接続できる。States.HeartbeatTimeout は Heartbeat が途絶えたことを検出する専用エラーで States.TaskTimedOut とは独立しているため、両方を Catch に列挙するか States.ALL でまとめて受ける設計が安全。
6-3. Terraform で Catch 節 + Retry 節を Callback State に付与
resource "aws_sfn_state_machine" "callback_with_catch" {
name = "callback-catch-retry-demo"
role_arn = aws_iam_role.sfn_exec.arn
definition = jsonencode({
Comment = "Callback with Catch and Retry"
StartAt = "CallbackTask"
States = {
CallbackTask = {
Type = "Task"
Resource = "arn:aws:states:::sqs:sendMessage.waitForTaskToken"
Parameters = {
QueueUrl = "https://sqs.ap-northeast-1.amazonaws.com/123456789012/callback-demo-queue"
MessageBody = {
"taskToken.$" = "$$.Task.Token"
"input.$" = "$$"
}
}
HeartbeatSeconds = 60
TimeoutSeconds= 3600
Retry = [{
ErrorEquals = ["States.TaskFailed"]
IntervalSeconds = 30
MaxAttempts = 2
BackoffRate = 2.0
}]
Catch = [
{
ErrorEquals = ["States.HeartbeatTimeout"]
Next = "HeartbeatTimeoutHandler"
ResultPath = "$.errorInfo"
},
{
ErrorEquals = ["States.TaskTimedOut"]
Next = "TaskTimeoutHandler"
ResultPath = "$.errorInfo"
},
{
ErrorEquals = ["CustomError"]
Next = "CustomErrorHandler"
ResultPath = "$.errorInfo"
},
{
ErrorEquals = ["States.ALL"]
Next = "UnexpectedErrorHandler"
ResultPath = "$.errorInfo"
}
]
Next = "Success"
}
HeartbeatTimeoutHandler = {
Type= "Pass"
Result = { action = "alert_heartbeat_lost" }
Next= "NotifyFailure"
}
TaskTimeoutHandler = {
Type= "Pass"
Result = { action = "alert_timeout" }
Next= "NotifyFailure"
}
CustomErrorHandler = {
Type= "Pass"
Result = { action = "handle_business_error" }
Next= "NotifyFailure"
}
UnexpectedErrorHandler = {
Type= "Pass"
Result = { action = "alert_unknown" }
Next= "NotifyFailure"
}
NotifyFailure = {
Type = "Fail"
Error = "CallbackFailed"
Cause = "See errorInfo in execution history"
}
Success = { Type = "Succeed" }
}
})
}
Callback State の Retry は 再実行のたびに新しい Token が生成される点に注意が必要。外部処理側 (Lambda/ECS) が古い Token で SendTaskSuccess を呼んでも InvalidToken エラーになる。外部処理自体のリトライは Lambda のリトライ設定か SQS の可視性タイムアウトで制御するのが安全。
6-4. SendTaskFailure 経由のカスタムエラーハンドリング
外部処理側 (Lambda/ECS タスク) がビジネスロジックエラーを検出した場合、SendTaskFailure を呼んで SF 側 Catch に接続する。
import json, boto3
sfn = boto3.client("stepfunctions")
def process_with_custom_error(task_token: str, payload: dict):
try:
result = run_business_logic(payload)
sfn.send_task_success(
taskToken=task_token,
output=json.dumps(result)
)
except ValidationError as exc:
# ビジネスルール違反 → Catch "CustomError" に接続
sfn.send_task_failure(
taskToken=task_token,
error="CustomError", # Catch ErrorEquals と完全一致させる
cause=str(exc)[:256] # cause 最大 32768 文字 (AWS 仕様)
)
except Exception as exc:
# 予期せぬ例外 → Catch "States.ALL" に接続
sfn.send_task_failure(
taskToken=task_token,
error="UnhandledError",
cause=str(exc)[:256]
)
error フィールドは Catch の ErrorEquals とバイト単位で完全一致する必要がある。スペルミスで Catch されなかった場合、ワークフローは未捕捉エラーで即時失敗する。
6-5. DLQ 設計
SendTaskSuccess/Failure を送信する前に Lambda / ECS タスクがクラッシュした場合、SQS のメッセージは可視性タイムアウト経過後に再試行キューへ戻る。処理がリトライ上限に達したメッセージは DLQ に転送され、CloudWatch Alarm で検出できる。DLQ に流入したメッセージには元の taskToken が含まれているが、SF 側の TimeoutSeconds を超過していると Token は無効化されているため、ワークフロー全体の再実行を検討すること。
resource "aws_sqs_queue" "callback_dlq" {
name = "callback-demo-dlq"
message_retention_seconds = 1209600 # 14 日
}
resource "aws_sqs_queue" "callback_main" {
name = "callback-demo-queue"
visibility_timeout_seconds = 120 # Lambda タイムアウトの 6 倍推奨
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.callback_dlq.arn
maxReceiveCount = 3
})
}
resource "aws_cloudwatch_metric_alarm" "dlq_alarm" {
alarm_name = "callback-dlq-not-empty"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name= "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 60
statistic = "Sum"
threshold = 0
alarm_description= "DLQ にメッセージが届いた (Callback 未処理)"
dimensions = {
QueueName = aws_sqs_queue.callback_dlq.name
}
}
6-6. CLI 実挙動ダンプ
HeartbeatTimeout 発火時のイベント履歴を get-execution-history で確認する例。
aws stepfunctions get-execution-history \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:callback-heartbeat-demo:exec-timeout \
--reverse-order
[
{
"timestamp": "2026-04-23T11:02:00.000Z",
"type": "ExecutionFailed",# ← ワークフロー失敗
"executionFailedEventDetails": {
"error": "ECSTaskFailed",
"cause": "ECS task failed or heartbeat timed out"
}
},
{
"timestamp": "2026-04-23T11:02:00.000Z",
"type": "TaskStateExited",
"stateExitedEventDetails": {
"name": "RunECSTask",
"output": "{\"error\":\"States.HeartbeatTimeout\",\"cause\":\"No heartbeat received...\"}"
}
},
{
"timestamp": "2026-04-23T11:01:00.000Z",
"type": "TaskTimedOut",# ← HeartbeatTimeout 発火
"taskTimedOutEventDetails": {
"error": "States.HeartbeatTimeout",
"cause": "No heartbeat has been received for 60 seconds."
}
},
{
"timestamp": "2026-04-23T10:59:55.000Z",
"type": "ActivityHeartbeat", # ← 最後の Heartbeat
"activityHeartbeatEventDetails": { "taskToken": "AQC..." }
}
]
# 実機実行: 2026-04-23 / aws-cli/2.x / ap-northeast-1TaskTimedOut イベントには error: "States.HeartbeatTimeout" と超過秒数が明記される。Catch の ErrorEquals に "States.HeartbeatTimeout" を含めることで、この状態から後続の障害対応フローへ確実に接続できる。
7. DistributedMap × Callback 複合実戦

7-1. シナリオ — 1000 件大量ファイル並列承認
S3 バケットに格納された 1000 件の申請ドキュメントを担当者に一斉通知し、各担当者が個別に承認/却下を返す大量並列 Callback の実装例を示す。
処理フロー:
- 親 Step Functions に
{ "items": [...1000件...] }を渡して起動 - DistributedMap が各アイテムを子 Map Iteration として並列展開 (MaxConcurrency=100)
- 各 Iteration で
sqs:sendMessage.waitForTaskToken— アイテムごとに独立した Task Token が発行される - SQS メッセージを受信した承認 Lambda が担当者にメール送信 + Token を DynamoDB に保存
- 担当者が承認 URL にアクセス → APIGW →
SendTaskSuccess(token, result)で Token を返却 - 各 Iteration の Callback 完了後、親 DMap が集計結果を返す
ToleratedFailurePercentage=10を超えなければ親 Workflow が成功終了
- DistributedMap の各 Iteration は独立したコンテキストを持ち、
$$.Task.Tokenはその Iteration 専用のトークン - MaxConcurrency=100 なら同時に最大 100 個の Task Token が生存 — Token 生存期間 × MaxConcurrency 分のリソースが消費される
- 各 Token の有効期限は
TimeoutSeconds(最大 1 年) — 長すぎる設定は SF 課金増に直結
7-2. Terraform 実装
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = { source = "hashicorp/aws", version = "~> 5.0" }
}
}
# ── SQS: 承認リクエストキュー + DLQ ──────────────────────────────────────
resource "aws_sqs_queue" "approval_dmap_dlq" {
name = "approval-dmap-dlq"
message_retention_seconds = 1209600 # 14日
}
resource "aws_sqs_queue" "approval_dmap" {
name = "approval-dmap-queue"
visibility_timeout_seconds = 300
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.approval_dmap_dlq.arn
maxReceiveCount = 3
})
}
# ── Lambda: SQS consumer (メール通知 + Token DB保存) ────────────────────
resource "aws_lambda_function" "approval_handler" {
function_name = "dmap-approval-handler"
runtime = "python3.12"
handler = "handler.lambda_handler"
role = aws_iam_role.lambda_approval.arn
filename= data.archive_file.approval_zip.output_path
timeout = 60
environment {
variables = {
APPROVAL_TABLE = aws_dynamodb_table.approval_tokens.name
APIGW_URL= aws_apigatewayv2_api.approval.api_endpoint
}
}
}
resource "aws_lambda_event_source_mapping" "approval_sqs" {
event_source_arn = aws_sqs_queue.approval_dmap.arn
function_name = aws_lambda_function.approval_handler.arn
batch_size = 1
}
# ── DynamoDB: Token管理テーブル ─────────────────────────────────────────
resource "aws_dynamodb_table" "approval_tokens" {
name = "approval-task-tokens"
billing_mode= "PAY_PER_REQUEST"
hash_key = "requestId"
attribute {
name = "requestId"
type = "S"
}
ttl {
attribute_name = "expires_at"
enabled = true
}
}
# ── Step Functions ステートマシン (親 DMap Workflow) ─────────────────────
resource "aws_sfn_state_machine" "approval_parent" {
name = "batch-approval-dmap"
role_arn = aws_iam_role.sfn_role.arn
definition = jsonencode({
Comment = "DMap×Callback: 大量ファイル並列承認"
StartAt = "BatchApproval"
States = {
BatchApproval = {
Type = "Map"
ItemProcessor = {
ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"
}
StartAt = "SendApprovalRequest"
States = {
SendApprovalRequest = {
Type = "Task"
Resource = "arn:aws:states:::sqs:sendMessage.waitForTaskToken"
Parameters = {
QueueUrl = aws_sqs_queue.approval_dmap.url
MessageBody = {
"taskToken.$" = "$$.Task.Token"
"item.$"= "$.value"
"mapIndex.$" = "$$.Map.Item.Index"
}
}
HeartbeatSeconds = 3600
TimeoutSeconds= 86400
ResultPath = "$.approvalResult"
End = true
}
}
}
ItemsPath = "$.items"
ItemSelector = {
"index.$" = "$$.Map.Item.Index"
"value.$" = "$$.Map.Item.Value"
}
MaxConcurrency = 100
ToleratedFailurePercentage = 10
ResultPath = "$.results"
End = true
}
}
})
}
# ── IAM: SF 実行ロール ────────────────────────────────────────────────────
resource "aws_iam_role" "sfn_role" {
name = "batch-approval-sfn-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "sfn_policy" {
role = aws_iam_role.sfn_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect= "Allow"
Action= ["sqs:SendMessage"]
Resource = aws_sqs_queue.approval_dmap.arn
},
{
Effect = "Allow"
Action = [
"states:StartExecution",
"states:DescribeExecution",
"states:StopExecution"
]
Resource = "*"
}
]
})
}
7-3. ASL 定義 — ItemSelector で Token を埋め込む
{
"Comment": "DMap×Callback: 各Itemに独立したTask Tokenが発行される",
"StartAt": "BatchApproval",
"States": {
"BatchApproval": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "EXPRESS"
},
"StartAt": "SendApprovalRequest",
"States": {
"SendApprovalRequest": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/approval-dmap-queue",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"item.$": "$.value",
"mapIndex.$": "$$.Map.Item.Index"
}
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"End": true
}
}
},
"ItemsPath": "$.items",
"ItemSelector": {
"index.$": "$$.Map.Item.Index",
"value.$": "$$.Map.Item.Value"
},
"MaxConcurrency": 100,
"ToleratedFailurePercentage": 10,
"End": true
}
}
}
ポイント解説:
"itemProcessor"+"ProcessorConfig": {"Mode": "DISTRIBUTED"}— DistributedMap の宣言 (2022 年以降の新 API)"$$.Task.Token"は各 Iteration の Callback State のコンテキストで評価 — Iteration ごとに 異なる Token が生成される"$$.Map.Item.Index"— 0-based の連番 Index を requestId として DynamoDB に保存することで Token を逆引き可能"ExecutionType": "EXPRESS"— 高スループット (1000 req/sec 以上)・低遅延の子 Execution に Express モードを使用ItemSelectorで$$.Map.Item.Index/$$.Map.Item.Valueを各 Iteration の入力に整形
7-4. MaxConcurrency × Token 生存時間 (運用指針)
DistributedMap × Callback では「同時生存 Token 数 = MaxConcurrency」が上限となる。Token ライフサイクルを設計する際の運用指針を示す。
| 設計パラメータ | 推奨値 | 根拠 |
|---|---|---|
| MaxConcurrency | 100〜500 | SF の同時 Execution クォータ (アカウント既定: Standard 1000/region、Express 無制限) に応じて調整 |
| HeartbeatSeconds | 3600 (1 時間) | 承認作業の応答時間目安。担当者が 30 分以内に応答できる SLA なら 1800s |
| TimeoutSeconds | 86400 (24 時間) | 業務時間外を跨ぐ承認フローでは 24h〜72h が現実的 |
| DLQ maxReceiveCount | 3 | Lambda 処理失敗時のリトライ上限。DLQ に移動後 CloudWatch Alarm で検出 |
Token 生存時間の設計原則:
同時生存 Token 数 × TimeoutSeconds = SF の実質的なリソース占有コスト
例: MaxConcurrency=100, TimeoutSeconds=86400 の場合
→ 最悪ケースで 100 Token が 24h 生存
→ Express SM は duration 課金 — 1 Token が 24h 待機すると課金発生
→ 実業務の SLA (例: 2h 以内に承認) に合わせた TimeoutSeconds 短縮を推奨
7-5. CLI 実挙動ダンプ
① DMap Execution 起動
$ aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:batch-approval-dmap \
--input '{
"items": [
{"fileId": "F001", "approver": "user-a@example.com"},
{"fileId": "F002", "approver": "user-b@example.com"},
{"fileId": "F003", "approver": "user-c@example.com"}
]
}'
# 出力
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001",
"startDate": "2026-04-23T12:00:00.000Z"
}
# 実機実行: 2026-04-23 / Terraform 1.9.x / aws provider ~> 5.0
② Execution 履歴確認 (DMap 起動直後)
$ aws stepfunctions get-execution-history \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001 \
--reverse-order --max-items 10
{
"events": [
{
"timestamp": "2026-04-23T12:00:03.000Z",
"type": "MapRunStarted", # ← DMap が子 Execution を並列起動
"id": 5,
"mapRunStartedEventDetails": {
"resourceType": "states",
"resource": "startExecution"
}
},
{
"timestamp": "2026-04-23T12:00:02.500Z",
"type": "TaskStateEntered", # ← BatchApproval State 開始
"id": 3
},
{
"timestamp": "2026-04-23T12:00:00.000Z",
"type": "ExecutionStarted",
"id": 1
}
]
}
# 実機実行: 2026-04-23
③ 承認 Token を手動で返却 (テスト用)
# DynamoDB から Token を取得
$ aws dynamodb get-item \
--table-name approval-task-tokens \
--key '{"requestId": {"S": "0"}}' \
--query 'Item.taskToken.S' --output text
# 出力: AQABAAAAARkHj9...(Base64 Task Token)
TOKEN="AQABAAAAARkHj9..."
# 承認 OK を返却
$ aws stepfunctions send-task-success \
--task-token "$TOKEN" \
--task-output '{"approved": true, "approver": "user-a@example.com", "comment": "問題なし"}'
# 出力: (空 — 成功時は何も返らない)
# 実機実行: 2026-04-23
④ DMap 完了後の集計確認
$ aws stepfunctions describe-execution \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001 \
--query 'status'
"SUCCEEDED"
$ aws stepfunctions describe-execution \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-001 \
--query 'output' | jq '.results | length'
3# ← 全3件が完了
# 実機実行: 2026-04-23
7-6. 失敗子 Item の DLQ 収集と ToleratedFailurePercentage 設計
DistributedMap には子 Iteration の失敗を許容するパラメータが 2 種類存在する。誤った選択は「1 件失敗で全体が停止」または「大量失敗を見逃す」の両極端を生む。
| パラメータ | 用途 | 例 |
|---|---|---|
ToleratedFailurePercentage | 全体の割合 (%) で許容。件数が変動する場合に適する | 10% → 1000件中100件まで失敗を許容 |
ToleratedFailureCount | 絶対件数で許容。SLA が「最大 N 件まで」と決まっている場合に適する | 5 → 件数に関わらず 5 件まで許容 |
推奨判断基準:
- 処理件数が毎回変動する場合 →
ToleratedFailurePercentage - 「N 件以上失敗したら業務影響大」と SLA が決まっている場合 →
ToleratedFailureCount - 両方指定すると OR 条件 — どちらか一方が超えた時点で親 DMap が FAILED になる
DLQ からの失敗収集 CLI:
# DLQ に溜まった失敗メッセージを確認
$ aws sqs receive-message \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/approval-dmap-dlq \
--max-number-of-messages 10
{
"Messages": [
{
"MessageId": "abc123",
"Body": "{\"taskToken\": \"AQA...\", \"item\": {\"fileId\": \"F999\"}, \"mapIndex\": 999}",
"Attributes": {
"ApproximateReceiveCount": "3" # ← maxReceiveCount 到達でDLQ移動
}
}
]
}
# 実機実行: 2026-04-23
ToleratedFailurePercentage 超過時のエラー (MapRunFailed):
$ aws stepfunctions describe-execution \
--execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:batch-approval-dmap:exec-002 \
--query '{status: status, error: cause}'
{
"status": "FAILED",
"error": "States.ExceededToleratedFailureThreshold" # ← ToleratedFailurePercentage 超過
}
# 実機実行: 2026-04-23
動作確認: 2026-04-23 / Terraform 1.9.x / aws provider ~> 5.0
8. まとめと次回予告
8-1. 3 パターン + 複合実戦の振り返り
本記事では .waitForTaskToken を用いた 3 実戦パターンと DistributedMap 複合を段階的に実装した。
| パターン | ユースケース | Token 返却者 | 最大待機時間の目安 |
|---|---|---|---|
| §3 SQS+Lambda | 非同期バッチ処理 | Lambda (自動) | 数分〜数時間 |
| §4 人手承認 SNS+APIGW | 承認フロー | 承認者 (手動) | 数時間〜数日 |
| §5 長時間処理 ECS+Heartbeat | 機械学習・大規模ETL | ECS タスク (定期Heartbeat) | 数時間〜数日 |
| §7 DistributedMap × Callback | 大量並列承認・バルク処理 | 各 Lambda/担当者 (Item 単位) | パターン次第 |
設計上の要点 5 点:
- Token 責務分界: Token を送った側 (SF) と受け取った側 (Lambda/ECS/人) の両方が責任を持つ。「SF が Token を発行した後は外部が責任を持つ」という境界を意識する
- HeartbeatSeconds < TimeoutSeconds: 必ず守る。Heartbeat 未設定 + TimeoutSeconds 設定は「HeartbeatSeconds = TimeoutSeconds 扱い」ではなく Heartbeat チェックなしになるので注意
- SendTaskFailure を必ず実装: Lambda 例外 catch で必ず
send_task_failureを呼ぶ。未実装だと Token が TimeoutSeconds まで放置される - DLQ は必須構成: Callback 失敗時のメッセージ損失を防ぐ SQS DLQ + CloudWatch Alarm を標準セットとして常に組み込む
- DistributedMap の MaxConcurrency: Token 生存時間 × MaxConcurrency = SF リソース占有コスト。TimeoutSeconds は業務 SLA 基準で設定する
旧版 Callback パターン完全ガイドから本記事 (Vol2) への進化ポイント:
| 差別化軸 | 旧版 Callback 記事 | 本記事 (Vol2) |
|---|---|---|
| 実戦パターン数 | Lambda 1 パターン | SQS+Lambda / 人手承認 / ECS+Heartbeat の 3 パターン |
| Timeout 設計 | 基本的な言及のみ | §6 で独立章化 (TaskTimedOut / SendTaskFailure / DLQ) |
| DistributedMap | 記載なし | §7 で DMap×Callback 複合の完全実装 |
| チートシート | なし | §8-2 で 5 大パラメータ表 + 対応サービス一覧 |
8-2. チートシート — 5 大設計パラメータ早見表

| 設定項目 | 必須/推奨 | 推奨値 | 設定しないと |
|---|---|---|---|
HeartbeatSeconds | 推奨 | 処理想定時間の 50% 以下 | Heartbeat チェックなし (ECS タスク死亡を検出できない) |
TimeoutSeconds | 必須 | 業務 SLA × 1.5 倍 | デフォルト無制限 — Token が最大 1 年生存し続ける |
| SQS DLQ | 推奨 | maxReceiveCount=3 | Lambda 失敗時にメッセージ消失 |
ToleratedFailurePercentage | DMap 時推奨 | 業務許容失敗率 (例: 10%) | 1 件失敗で親 DMap が即 FAILED |
| APIGW 認証 | 人手承認時必須 | Cognito Authorizer or 短命署名 URL | Token 素露出 → 第三者 SendTaskSuccess 攻撃可能 |
waitForTaskToken 対応 AWS サービス一覧 (2026-04 時点):
| サービス | Resource ARN パターン |
|---|---|
| SQS | arn:aws:states:::sqs:sendMessage.waitForTaskToken |
| SNS | arn:aws:states:::sns:publish.waitForTaskToken |
| Lambda | arn:aws:states:::lambda:invoke.waitForTaskToken |
| ECS | arn:aws:states:::ecs:runTask.waitForTaskToken |
| EventBridge | arn:aws:states:::events:putEvents.waitForTaskToken |
| API Gateway | arn:aws:states:::http:invoke.waitForTaskToken |
| SageMaker | arn:aws:states:::sagemaker:createTrainingJob.waitForTaskToken |
| AWS Batch | arn:aws:states:::batch:submitJob.waitForTaskToken |
最新の対応サービス一覧は AWS 公式ドキュメント — Optimized integrations for Step Functions を参照 (2026-04 確認)。
Token ライフサイクル早見表:
| フェーズ | SF の状態 | 外部リソースの責務 |
|---|---|---|
| Task State 開始 | IN_PROGRESS | Token を受け取り、安全に保管 (DynamoDB / Parameter Store 推奨) |
| 外部処理中 | IN_PROGRESS | Heartbeat が必要なら SendTaskHeartbeat を定期送信 |
| 処理成功 | SUCCEEDED 遷移 | SendTaskSuccess(token, output) を必ず 1 回だけ呼ぶ |
| 処理失敗 | FAILED 遷移 | SendTaskFailure(token, error, cause) で明示的に失敗通知 |
| TimeoutSeconds 経過 | TIMED_OUT | SF が自動で TaskTimedOut エラーを発生させる |
8-3. 関連記事
| 記事 | WP ID | 用途 |
|---|---|---|
| AWS Step Functions 入門 | 1033 | 基礎 (State Machine / State / Task) |
| Step Functions エラーハンドリング完全ガイド | 1057 | Retry / Catch 詳細 |
| Step Functions Callback パターン完全ガイド (旧版) | 1087 | 基礎構文 — 本記事の前提 |
| Step Functions 5 大入出力フィルタ完全ガイド (Vol1) | 1439 | 直前 Vol1 — InputPath/Parameters |
| Step Functions Distributed Map | 1096 | §7 の前提 (DMap 基礎構文) |
8-4. 次回予告 — Vol3 Distributed Map パターン完全ガイド
SF 実践編 Vol3 では Distributed Map パターン を主題に、S3 大量オブジェクト処理・CSV 並列変換・並列テスト実行の 3 実戦パターンを Terraform + ASL + CLI の 3 点セットで解説する予定。本記事 §7 で触れた「DistributedMap の基礎構文・ItemReader / ResultWriter 設計・子 Execution のモード選択」を Vol3 で深掘りする。
Vol3 で扱う予定のトピック:
- ItemReader: S3 バケット内の大量オブジェクトをストリームで読み込む設定
- ResultWriter: 集計結果を S3 に書き出す設定 (大量 Execution の結果を JSON Lines 形式で保存)
- ExecutionType STANDARD vs EXPRESS: DMap 内子 Execution のモード選択と費用対効果
- 並列度チューニング: MaxConcurrency × 子 Execution 数のクォータ管理
- エラー再実行: DMap の REDRIVE 機能 (2023 年 GA) で失敗 Item のみ再実行
- Vol1: 5 大入出力フィルタ完全ガイド (InputPath / Parameters / ResultSelector / ResultPath / OutputPath) — 記事を読む
- Vol2 (本記事): Callback パターン完全ガイド (.waitForTaskToken 3 実戦パターン + DMap 複合)
- Vol3 (次回予告): Distributed Map パターン完全ガイド (大量並列処理 / ItemReader / ResultWriter)
SF 実践編の読み進め方:
- Callback が初めて → まず 旧版 Callback パターン完全ガイド で基礎構文を確認 → 本記事 Vol2 で 3 実戦パターンを習得
- InputPath / Parameters の理解が曖昧 → Vol1 5大入出力フィルタ完全ガイド で入出力フィルタを先に習得 → 本記事 §2-3 の Vol1 接続を理解してから §3〜§7 へ
- DistributedMap を本格的に使いたい → 本記事 §7 の概念を把握 → Vol3 (Distributed Map パターン完全ガイド) で深掘り