- 1 Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー
- 1.1 目次
- 1.2 Section 1: Callbackパターン(.waitForTaskToken)概念編
- 1.3 Section 2: アーキテクチャ解説
- 1.4 Section 3: AWSコンソールでのハンズオン
- 1.5 3-1. 前提条件
- 1.6 3-2. Lambda 2関数のデプロイ
- 1.7 3-3. SQSキュー作成
- 1.8 3-4. SNSトピック作成とメールサブスクリプション
- 1.9 3-5. IAMロール作成
- 1.10 3-6. ステートマシンを段階的に構築(記事の核心)
- 1.11 3-7. CloudWatch Logsで待機→再開の挙動を確認
- 1.12 まとめ(Section 3)
- 1.13 Section 4: Terraformでの構築
- 1.14 4-1. 前提条件
- 1.15 4-2. ディレクトリ構成
- 1.16 4-3. Lambda ソースコード
- 1.17 4-4. statemachine/definition.json.tpl
- 1.18 4-5. variables.tf
- 1.19 4-6. main.tf(完全なコード)
- 1.20 4-7. outputs.tf
- 1.21 4-8. デプロイ手順
- 1.22 4-9. 動作確認(CLI)
- 1.23 後片付け
- 1.24 まとめ: コンソール版との対応関係
- 1.25 Section 5: 実践Tips(Callbackパターン設計ガイド)
- 1.26 Section 6: ハンズオン後の削除手順
- 1.27 Section 7: まとめと次のステップ
Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー
公開日: 2026-04-12
難易度: 中級〜上級
所要時間: 約120分
シリーズ: [Step Functions シリーズ 第5回]
- .waitForTaskToken によるCallback パターンの仕組み(.sync との違い)
- タスクトークンのライフサイクル(発行→SQS送信→停止→返送→再開)
- AWS CLIで手動SendTaskSuccessを叩く「承認ボタン」体験
- 段階的構築(Step A→B→C→D): 基本Callback→タイムアウト→承認/却下分岐→SNS通知
- Terraform での完全構築(コンソール版とASL完全一致)
目次
- Callbackパターン(.waitForTaskToken)概念編
- アーキテクチャ解説
- AWSコンソールでのハンズオン
- Terraformでの構築
- 実践Tips(Callbackパターン設計ガイド)
- ハンズオン後の削除手順
- まとめと次のステップ
Section 1: Callbackパターン(.waitForTaskToken)概念編
1-1. Callbackパターンとは
AWS Step Functions には、外部リソースとの連携方法として .sync(同期) と .waitForTaskToken(Callback) の2つの統合パターンがある。
通常の .sync パターンでは、Step Functions が Lambda や ECS などを呼び出し、処理完了まで自動的に待機する。これは「Step Functions 側が完了を検知する」モデルだ。
一方、.waitForTaskToken(Callbackパターン) は「外部システムが完了を通知する」モデルである。Step Functions はタスクトークンと呼ばれる一意の識別子を発行し、それを外部システムに渡したうえで実行を 停止(pause) する。外部システムは処理完了後に SendTaskSuccess または SendTaskFailure API を呼び出すことで、Step Functions の実行が再開される。
主な用途:
– 人間の承認が必要なフロー(上長承認、コンプライアンスチェックなど)
– 外部 API の非同期応答待ち(決済処理、審査システムなど)
– SQS / API Gateway 経由のマイクロサービス間非同期連携
– Step Functions のタイムアウト内では完了しない長時間処理
1-2. .sync vs .waitForTaskToken 比較表
| 特性 | .sync | .waitForTaskToken |
|---|---|---|
| 実行モード | 同期(完了まで自動待機) | 非同期(トークン返送まで停止) |
| 最大待機時間 | 1年(Standard Workflow) | 1年(TimeoutSeconds 設定推奨) |
| ユースケース | Lambda / ECS / Glue 等の完了待ち | 人間承認・外部 API・SQS / API GW 経由の非同期処理 |
| タスクトークン | 不要 | 必須($$.Task.Token) |
| 完了通知 | SF が自動検知 | 外部システムが SendTaskSuccess / SendTaskFailure を呼ぶ |
| HeartbeatSeconds | 一部リソースのみ対応 | 利用可能(長時間処理では推奨) |
1-3. タスクトークンのライフサイクル(最重要)
Callback パターンの核心は タスクトークン にある。以下がその一連の流れだ。
① SF: タスクトークン発行($$.Task.Token)
↓
② SF: トークンを SQS / API GW 経由で外部システムに送信 → 実行を停止
↓(外部システムが処理中 — 数秒〜数日)
③ 外部: SendTaskSuccess(token, output) または SendTaskFailure(token, error) を呼ぶ
↓
④ SF: 停止から再開 → 次のステートへ進む
タスクトークンの重要な特性:
| 特性 | 内容 |
|---|---|
| 形式 | 推測不可能な不透明文字列(内部識別子) |
| 最大長 | 2,048 文字 |
| 有効期間 | 最大 1 年(ワークフロー実行期間と同じ) |
| 使用回数 | 一度限り(SendTaskSuccess / SendTaskFailure のどちらか一方のみ有効) |
| スコープ | 同一 AWS アカウント内のみ(クロスアカウント不可) |
| 保管責任 | 外部システム側が安全に保管する責任を持つ |
$$.Task.Token で Parameters に埋め込む。第4弾(データフロー制御)で学んだ Context Object($$)の知識がここで直接活きる。1-4. SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat API
SendTaskSuccess — 処理成功を通知
aws stepfunctions send-task-success \
--task-token "TOKEN" \
--task-output '{"approved": true, "approver": "manager"}'
| パラメータ | 必須 | 制限 |
|---|---|---|
task-token | ○ | 1〜2,048 文字 |
task-output | ○ | 最大 262,144 バイト(256 KB)の JSON 文字列 |
task-output の内容は ResultPath に格納される。
SendTaskFailure — 処理失敗を通知
aws stepfunctions send-task-failure \
--task-token "TOKEN" \
--error "ExpenseRejected" \
--cause "却下理由: 金額が承認限度額を超過"
| パラメータ | 必須 | 制限 |
|---|---|---|
task-token | ○ | 1〜2,048 文字 |
error | 任意 | 最大 256 文字(Catch の ErrorEquals で捕捉可能) |
cause | 任意 | 最大 32,768 文字 |
error フィールドは Catch ブロックで ErrorEquals: ["ExpenseRejected"] のように捕捉できる。
SendTaskHeartbeat — 生存確認を送信
aws stepfunctions send-task-heartbeat \
--task-token "TOKEN"
HeartbeatSeconds のタイマーをリセットする「生存確認」用 API。長時間処理中に定期的に呼び出すことで States.HeartbeatTimeout を防ぐ。成功時はレスポンスボディなしの HTTP 200 が返る(実行履歴イベントは生成されない)。
共通エラーコード:
| エラー | 意味 |
|---|---|
InvalidToken | トークンの形式が不正 |
TaskDoesNotExist | トークンが有効なタスクに対応しない |
TaskTimedOut | タスクがタイムアウト済みまたは完了済み |
1-5. HeartbeatSeconds と TimeoutSeconds の Callback 固有挙動
| パラメータ | 意味 | タイムアウト時のエラー |
|---|---|---|
TimeoutSeconds | タスク全体のタイムアウト(SendTaskSuccess / Failure の最終期限) | States.Timeout |
HeartbeatSeconds | 連続する Heartbeat 間隔の最大値 | States.HeartbeatTimeout |
設定例:
{
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
"task_token.$": "$$.Task.Token"
}
}
}
重要な設計指針:
HeartbeatSeconds<TimeoutSecondsを必ず守る(例: Heartbeat=3600、Timeout=86400)HeartbeatSecondsは外部システムが定期的に生存確認を送れる場合のみ設定するTimeoutSecondsは SQS メッセージの可視性タイムアウトとは独立した別の概念TimeoutSecondsのデフォルト値は 99,999,999 秒(約 3.17 年)だが、明示的に設定することを推奨SendTaskHeartbeatはHeartbeatSecondsタイマーをリセットするが、TimeoutSecondsは延長しない
1-6. ユースケース一覧
| ユースケース | 説明 | Callback を使う理由 |
|---|---|---|
| 人間の承認フロー | 上長承認、リリース承認、コンプライアンスチェック | 承認まで数時間〜数日かかる可能性 |
| 外部 API 非同期応答 | 外部決済処理、外部審査システムの応答待ち | 外部 API が非同期モデルを採用 |
| サービス間オーケストレーション | マイクロサービス間の非同期連携(SQS / EventBridge 経由) | 受信側サービスが完了タイミングを制御 |
| 手動データ検証 | オペレーターが目視確認後に処理を再開するフロー | 人間の判断が必要 |
| 支払い処理待ち | ユーザーが支払いフォームに入力するまでの待機 | ユーザー操作完了まで時間不定 |
1-7. Context Object でのトークン取得方法
$$.Task.Token は Context Object($$)経由で取得する。第4弾(データフロー制御)で学んだ Context Object の知識がここで直接活きる。
{
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
"task_token.$": "$$.Task.Token",
"expense_id.$": "$.expense_id"
}
}
}
}
構文のポイント:
| 記法 | 意味 |
|---|---|
"task_token.$" | .$ サフィックス = 値を動的参照(パス/Context Object) |
"$$.Task.Token" | $$ = Context Object、.Task.Token = 現在のタスクのトークン |
"expense_id.$": "$.expense_id" | $ = 入力 JSON から値を参照 |
$$.Task.Token は Parameters および ResultSelector の中でのみ参照可能。InputPath や OutputPath では使用できない。Section 1 まとめ:
– .waitForTaskToken は「外部システムが完了を通知する」非同期パターン
– タスクトークンは一度限り有効、最大 2,048 文字、有効期間最大 1 年
– HeartbeatSeconds(States.HeartbeatTimeout)と TimeoutSeconds(States.Timeout)は独立したタイムアウト機構
– $$.Task.Token は Parameters / ResultSelector 内でのみ参照可能
Section 2: アーキテクチャ解説
2-1. ハンズオンシナリオ概要
本ハンズオンでは 経費承認ワークフロー をシナリオとして使用します。社員が経費を申請し、上長が承認または却下し、結果をメールで通知するという実務でよく見られる非同期承認フローを AWS Step Functions の Callbackパターン(.waitForTaskToken)で実装します。
ワークフローの概要
| フェーズ | 処理 | 担当コンポーネント |
|---|---|---|
| 申請受付 | 経費情報をDynamoDBに保存し、承認依頼をSQSへ送信 | SubmitExpense Lambda |
| 承認待機 | タスクトークンを発行し実行を停止 | WaitForApproval(Callback) |
| 上長確認 | SQSからメッセージを取得し、CLIで承認/却下を返送 | 承認者(SQS + SF API) |
| 分岐処理 | 承認/却下/タイムアウトで後続ステートへ分岐 | RouteApproval(Choice) |
| 完了通知 | 申請者へSNS経由でメール通知 | NotifyApproved/Rejected/Timeout |
各コンポーネントの役割
- Step Functions State Machine — ワークフロー全体のオーケストレーター。
.waitForTaskTokenを使い、承認者の返答まで実行を停止する - SQS キュー — タスクトークンを埋め込んだメッセージを承認者へ配送するメッセージバッファ
- DynamoDB — 経費申請データ(申請者、金額、申請日時)を永続化
- SNS — 承認/却下/タイムアウト通知を申請者へメール配信
- Lambda — 経費受付処理と経理システムへの反映処理を担当
2-2. 全体アーキテクチャ

上図は経費承認ワークフロー全体のアーキテクチャを示しています。
ワークフローは Lambda または EventBridge Scheduler によってトリガーされ、Step Functions State Machine が処理を開始します。State Machine 内部は縦に配置された5つの主要ステートで構成されます。
Callbackの核心: WaitForApproval ステートは ★停止・待機ポイント です。このステートに到達すると Step Functions はタスクトークンを生成し、SQSへメッセージを送信した後、外部から SendTaskSuccess または SendTaskFailure が呼ばれるまで実行を停止します。この待機中にも Step Functions の状態は保持され、課金は発生しません(Standard Workflow の場合)。
承認者はSQSキューからメッセージを取得し、タスクトークンを使って aws stepfunctions send-task-success を実行することでワークフローを再開させます。
2-3. Callbackフロー詳解

Callbackパターンのデータフローを上段(発行・送信・停止)と下段(承認・返送・再開)に分けて示しています。
タスクトークンの発行
State Definition で .waitForTaskToken を指定したリソースが呼ばれると、Step Functions は自動的にタスクトークン(約1KBの不透明な文字列)を生成します。このトークンは $$.Task.Token という組み込み変数で参照できます。
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789/approval-queue",
"MessageBody": {
"TaskToken.$": "$$.Task.Token",
"ExpenseId.$": "$.expenseId",
"Amount.$": "$.amount"
}
},
"HeartbeatSeconds": 86400,
"Next": "RouteApproval"
}
停止と待機
SQSへのメッセージ送信が完了した時点で、ステートの実行は停止します。State Machine は RUNNING 状態を維持したまま、タスクトークンが返送されるのを待ちます。Standard Workflow では最大1年間待機でき、HeartbeatSeconds でハートビートタイムアウトを設定できます。
トークンの返送と再開
承認者はSQSメッセージからタスクトークンを取得し、以下のいずれかを実行します:
| コマンド | 意味 | 後続動作 |
|---|---|---|
send-task-success | 承認(結果JSONを渡す) | RouteApprovalステートへ進む |
send-task-failure | システムエラー | エラーハンドラへ |
# 承認例
aws stepfunctions send-task-success \
--task-token "AQC..." \
--task-output '{"approved": true, "approver": "manager-a", "comment": "承認します"}'
# 却下例
aws stepfunctions send-task-success \
--task-token "AQC..." \
--task-output '{"approved": false, "approver": "manager-a", "comment": "金額が上限超過"}'
2-4. 承認/却下シーケンス

承認フロー(通常系)
- SF → SubmitExpense Lambda: 経費申請を受け付け、DynamoDBに保存
- Lambda → SF: 申請ID・金額などを返却
- SF → SQS: タスクトークンを埋め込んだメッセージを送信
- SF: ⏸ 実行停止(待機開始)—
WaitForApprovalステートで停止 - 承認者 → SQS: SQSキューからメッセージを受信(ポーリング or Lambda トリガー)
- 承認者 → SF:
SendTaskSuccess {"approved": true, "approver": "manager"}を送信 - SF: ▶ 実行再開(待機終了)—
RouteApprovalステートへ進む - SF → ProcessExpense Lambda: 経理システムへ反映
- SF → SNS: 承認完了通知を送信 → 申請者へメール
- SF: SUCCEEDED終了
却下フロー
ステップ6で {"approved": false} が返送されると、RouteApproval(Choiceステート)の条件分岐により却下パスに進み、NotifyRejected(SNS)を経てFAILED終了します。
"RouteApproval": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.approved",
"BooleanEquals": true,
"Next": "ProcessExpense"
},
{
"Variable": "$.approved",
"BooleanEquals": false,
"Next": "NotifyRejected"
}
]
}
タイムアウトフロー
WaitForApproval ステートに HeartbeatSeconds: 86400(24時間)を設定することで、承認が一定時間行われない場合に States.HeartbeatTimeout エラーが発生します。Catch ブロックで補足し NotifyTimeout(SNS)へ分岐させることで、申請者・申請者管理者へタイムアウト通知を送ります。
"WaitForApproval": {
...,
"HeartbeatSeconds": 86400,
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout"],
"Next": "NotifyTimeout"
}
]
}
HeartbeatSeconds はトークン返送のタイムアウトを制御します。ワークフロー全体の最大実行時間は TimeoutSeconds で別途制御できます。Standard Workflow のデフォルト最大実行時間は1年です。Section 3: AWSコンソールでのハンズオン
このセクションでは、AWSコンソールを使って経費承認ワークフローを実際に構築します。
「承認ボタンを押す」体験をAWS CLIで行うことが、このハンズオンの核心です。
ステートマシンはStep A→B→C→Dと段階的に機能を追加しながら構築します。最終形(Step D)では、SQS Callback + 承認/却下分岐 + SNS通知が全て揃った完成形を体験できます。
3-1. 前提条件
必要なAWS権限
以下のサービスを操作できるIAM権限が必要です。
| サービス | 必要な権限 |
|---|---|
| AWS Lambda | 関数の作成・編集・実行 |
| AWS Step Functions | ステートマシンの作成・実行 |
| Amazon SQS | キューの作成・メッセージ受信 |
| Amazon SNS | トピックの作成・サブスクリプション管理 |
| AWS IAM | ロール・ポリシーの作成 |
| Amazon CloudWatch Logs | ログの閲覧 |
AWS CLI の準備
このハンズオンでは、SendTaskSuccess/SendTaskFailure を AWS CLI で実行します。
コンソールからは送信できないため、CLIのインストールと設定が必須です。
# バージョン確認
aws --version
# aws-cli/2.x.x Python/3.x.x ...
# 認証確認
aws sts get-caller-identity
認証情報が正しく設定されていれば、アカウントIDとARNが表示されます。
3-2. Lambda 2関数のデプロイ
経費承認ワークフローでは2つのLambda関数を使用します。
| 関数名 | 役割 |
|---|---|
submit_expense | 経費申請を受け付け、ステータスをpendingに設定 |
process_expense | 承認後の経理処理(DynamoDB書き込みなど)を実行 |
submit_expense 関数の作成
コンソール操作手順:
- AWSコンソールで Lambda を開く
- 「関数の作成」をクリック
- 設定値:
- 作成方法: 一から作成
- 関数名:
submit_expense - ランタイム: Python 3.12
- アーキテクチャ: x86_64
- 「関数の作成」をクリック
- コードエディタに以下を貼り付け:
import json
import os
import boto3
from datetime import datetime
def lambda_handler(event, context):
expense_id = event.get("expense_id")
amount = event.get("amount", 0)
description = event.get("description", "")
applicant = event.get("applicant", "unknown")
if not expense_id:
raise ValueError("expense_id は必須です")
if amount <= 0:
raise ValueError("amountは正の値が必要です")
# DynamoDB への保存(オプション — Step Dで有効化)
# dynamodb = boto3.resource("dynamodb")
# table = dynamodb.Table(os.environ.get("EXPENSE_TABLE", "expense-requests"))
# table.put_item(Item={
# "expense_id": expense_id,
# "amount": amount,
# "description": description,
# "applicant": applicant,
# "status": "pending",
# "submitted_at": datetime.utcnow().isoformat()
# })
print(f"経費申請受付: expense_id={expense_id}, amount={amount}, applicant={applicant}")
return {
"expense_id": expense_id,
"status": "pending",
"submitted_at": datetime.utcnow().isoformat()
}
- 「Deploy」をクリックして保存
- 関数ARNをメモ(例:
arn:aws:lambda:ap-northeast-1:123456789012:function:submit_expense)
→<LAMBDA_ARN_SUBMIT_EXPENSE>プレースホルダーに使用
process_expense 関数の作成
同様の手順で2つ目の関数を作成します。
- Lambda コンソール → 「関数の作成」
- 設定値:
- 関数名:
process_expense - ランタイム: Python 3.12
- コードエディタに以下を貼り付け:
import json
import os
from datetime import datetime
def lambda_handler(event, context):
expense_id = event.get("expense_id")
amount = event.get("amount", 0)
approver = event.get("approver", "unknown")
print(f"経理処理実行: expense_id={expense_id}, amount={amount}, approver={approver}")
# 経理システムへの連携処理(ハンズオンではモック)
return {
"expense_id": expense_id,
"amount": amount,
"approver": approver,
"processed_at": datetime.utcnow().isoformat(),
"accounting_ref": f"ACC-{expense_id}-001"
}
- 「Deploy」をクリックして保存
- 関数ARNをメモ
→<LAMBDA_ARN_PROCESS_EXPENSE>プレースホルダーに使用
3-3. SQSキュー作成
Callbackパターンの中心となるSQSキューを作成します。
コンソール操作手順:
- AWSコンソールで Amazon SQS を開く
- 「キューの作成」をクリック
- 設定値:
- タイプ: スタンダード
- 名前:
expense-approval-queue - 可視性タイムアウト: 300秒(デフォルト30秒から変更推奨)
> 可視性タイムアウトが短いと、承認者がメッセージを処理中に他のコンシューマーが再取得してしまいます。 - その他はデフォルトのまま「キューの作成」をクリック
- 作成後、キュー詳細画面から キューURL をメモ
- 例:
https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-queue - →
<SQS_QUEUE_URL>プレースホルダーに使用
3-4. SNSトピック作成とメールサブスクリプション
承認/却下/タイムアウトの通知を送るSNSトピックを作成します(Step Dで使用)。
コンソール操作手順:
- AWSコンソールで Amazon SNS を開く
- 「トピックの作成」をクリック
- 設定値:
- タイプ: スタンダード
- 名前:
expense-notification - 「トピックの作成」をクリック
- トピックARNをメモ
- 例:
arn:aws:sns:ap-northeast-1:123456789012:expense-notification - →
<SNS_TOPIC_ARN>プレースホルダーに使用
メールサブスクリプションの設定:
- 作成したトピックの詳細画面 → 「サブスクリプションの作成」
- 設定値:
- プロトコル: Eメール
- エンドポイント: 通知を受け取るメールアドレスを入力
- 「サブスクリプションの作成」をクリック
- 入力したメールアドレスに確認メールが届く
- メール内の 「Confirm subscription」リンクをクリック
3-5. IAMロール作成
Step FunctionsがLambda・SQS・SNSを操作するためのIAMロールを作成します。
コンソール操作手順:
- AWSコンソールで IAM を開く → 「ロール」→「ロールを作成」
- 信頼されたエンティティの選択:
- タイプ: AWSのサービス
- ユースケース: 「Step Functions」を選択(リストにない場合は「カスタム信頼ポリシー」を使用)
- 信頼ポリシー(カスタム選択時):
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]} - ロール名:
sf-expense-execution-role - ロール作成後、「インラインポリシーを追加」から以下を設定:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "InvokeLambda",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": [
"<LAMBDA_ARN_SUBMIT_EXPENSE>",
"<LAMBDA_ARN_PROCESS_EXPENSE>"
]
},
{
"Sid": "SendSQSMessage",
"Effect": "Allow",
"Action": "sqs:SendMessage",
"Resource": "<SQS_QUEUE_ARN>"
},
{
"Sid": "PublishSNS",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "<SNS_TOPIC_ARN>"
},
{
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogDelivery",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:GetLogDelivery",
"logs:ListLogDeliveries",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies"
],
"Resource": "*"
}
]
}
arn:aws:sqs:ap-northeast-1:123456789012:expense-approval-queue)。- ポリシー名:
sf-expense-policyとして保存
3-6. ステートマシンを段階的に構築(記事の核心)
ここからがこのハンズオンの核心です。Step A→D と段階的にステートマシンを構築し、Callbackパターンの仕組みを体験的に理解します。
Step A: 基本Callback — 承認体験を初めて体感する
まず最もシンプルな形でCallbackパターンを体験します。
ステートマシン作成手順:
- AWSコンソールで Step Functions を開く
- 「ステートマシンの作成」をクリック
- 「コードでワークフローを記述」を選択
- 以下のASLを貼り付け(
<プレースホルダー>を実際の値に置換):
{
"Comment": "経費承認ワークフロー — Step A(基本Callback)",
"StartAt": "SubmitExpense",
"States": {
"SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"ResultPath": "$.submission",
"Next": "WaitForApproval"
},
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
"task_token.$": "$$.Task.Token",
"expense_id.$": "$.expense_id",
"amount.$": "$.amount"
}
},
"ResultPath": "$.approval",
"Next": "ProcessExpense"
},
"ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"ResultPath": "$.processing",
"Next": "ExpenseApproved"
},
"ExpenseApproved": {
"Type": "Succeed"
}
}
}
$$.Task.Token がCallbackの核心: $$ はStep Functionsのコンテキストオブジェクトへのアクセスです。実行時に一意のトークンが生成され、SQSメッセージに埋め込まれます。このトークンを後でSendTaskSuccessに渡すことで、ワークフローが再開します。- 設定値:
- ステートマシン名:
expense-approval-step-a - 実行ロール:
sf-expense-execution-role - ログ設定: ALL レベル を推奨(待機中の挙動が CloudWatch で確認できます)
- 「ステートマシンの作成」をクリック
実行テスト:
- 「実行の開始」をクリック
- 入力JSONに以下を貼り付け:
{
"expense_id": "EXP-001",
"amount": 50000,
"description": "出張交通費",
"applicant": "yamada.taro"
}
- 「実行の開始」をクリック
WaitForApproval で止まることを確認:
実行グラフで WaitForApproval ステートが青色(RUNNING)のまま止まっているはずです。
これがCallbackパターンの動作です — Step Functionsは外部からの応答を待って一時停止しています。
SQSからtask_tokenを取得:
- SQSコンソール →
expense-approval-queueを開く - 「メッセージを送受信」→「メッセージをポーリング」をクリック
- 受信したメッセージの本文を確認:
json
{
"task_token": "AQCAAAAKAAAAAAAAADg...(長い文字列)",
"expense_id": "EXP-001",
"amount": 50000
} task_tokenの値をコピー(非常に長い文字列です)
★ 承認体験(記事の核心)
AWS CLIで以下を実行します。これが「承認ボタンを押す」体験です:
aws stepfunctions send-task-success \
--task-token "SQSメッセージから取得したtask_token" \
--task-output '{"approved": true, "approver": "manager.suzuki"}'
コマンド実行後、Step Functions コンソールに戻ると:
– WaitForApproval → SUCCEEDED
– ProcessExpense → 実行中
– ExpenseApproved → SUCCEEDED
実行全体が SUCCEEDED になることを確認してください。
Step B: タイムアウト付き — HeartbeatSeconds / TimeoutSeconds を体験
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け:
{
"Comment": "経費承認ワークフロー — Step B(タイムアウト付き)",
"StartAt": "SubmitExpense",
"States": {
"SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"ResultPath": "$.submission",
"Next": "WaitForApproval"
},
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
"task_token.$": "$$.Task.Token",
"expense_id.$": "$.expense_id",
"amount.$": "$.amount"
}
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"ResultPath": "$.error",
"Next": "ExpenseTimeout"
}
],
"Next": "ProcessExpense"
},
"ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"ResultPath": "$.processing",
"Next": "ExpenseApproved"
},
"ExpenseApproved": {
"Type": "Succeed"
},
"ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
}
}
}
- ステートマシン名:
expense-approval-step-b - 「ステートマシンの作成」をクリック
タイムアウト値の意味:
| パラメータ | 値 | 意味 |
|---|---|---|
HeartbeatSeconds | 3600 | 1時間以内にハートビートがなければタイムアウト |
TimeoutSeconds | 86400 | 24時間以内にSendTaskSuccess/Failureがなければタイムアウト |
補足テスト — タイムアウト体験:
本番用ASLのTimeoutSeconds=86400(24時間)は変更せず、別途タイムアウトを体験するための短縮版を使います。新しいステートマシンで TimeoutSeconds を 30 に変更してテストしてください:
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": { ... },
"HeartbeatSeconds": 30,
"TimeoutSeconds": 30,
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"ResultPath": "$.error",
"Next": "ExpenseTimeout"
}
],
"Next": "ProcessExpense"
}
SendTaskSuccessを送らずに30秒待つと、States.Timeout エラーが発生し、Catch により ExpenseTimeout(Fail)ステートに遷移します。
Step C: 承認/却下分岐 — SendTaskFailure体験
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け:
{
"Comment": "経費承認ワークフロー — Step C(承認/却下分岐)",
"StartAt": "SubmitExpense",
"States": {
"SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"ResultPath": "$.submission",
"Next": "WaitForApproval"
},
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
"task_token.$": "$$.Task.Token",
"expense_id.$": "$.expense_id",
"amount.$": "$.amount"
}
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"ResultPath": "$.error",
"Next": "ExpenseTimeout"
}
],
"Next": "RouteApproval"
},
"RouteApproval": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.approval.approved",
"BooleanEquals": true,
"Next": "ProcessExpense"
}
],
"Default": "ExpenseRejected"
},
"ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"ResultPath": "$.processing",
"Next": "ExpenseApproved"
},
"ExpenseApproved": {
"Type": "Succeed"
},
"ExpenseRejected": {
"Type": "Fail",
"Error": "ExpenseRejected",
"Cause": "経費申請が却下されました"
},
"ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
}
}
}
- ステートマシン名:
expense-approval-step-c
承認テスト(approved: true):
aws stepfunctions send-task-success \
--task-token "TOKEN" \
--task-output '{"approved": true, "approver": "manager.suzuki"}'
RouteApproval で approved == true → ProcessExpense → ExpenseApproved (SUCCEEDED)
却下テスト(approved: false):
aws stepfunctions send-task-success \
--task-token "TOKEN" \
--task-output '{"approved": false, "reason": "金額が承認限度額を超過"}'
RouteApproval の Default → ExpenseRejected (FAILED) に遷移します。
approved: false を含む正常な応答を返すことで、Choice ステートでルーティングします。これは「処理は正常に完了した(タスクトークンは有効に解決した)が、ビジネスロジック的に却下された」という設計です。SendTaskFailure 体験:
SendTaskFailure は「外部処理自体が失敗した」場合に使います:
aws stepfunctions send-task-failure \
--task-token "TOKEN" \
--error "ExpenseRejected" \
--cause "却下理由: 経費規程違反"
WaitForApproval の Catch に ErrorEquals: ["ExpenseRejected"] を追加していれば捕捉できます。
このパターンは「承認システム側でエラーが発生した」ケースに有効です。
Step D: 最終形 — SNS通知付き完成形
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け(
<SNS_TOPIC_ARN>を実際のARNに置換):
{
"Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)",
"StartAt": "SubmitExpense",
"States": {
"SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"Parameters": {
"expense_id.$": "$.expense_id",
"amount.$": "$.amount",
"description.$": "$.description",
"applicant.$": "$.applicant"
},
"ResultPath": "$.submission",
"Next": "WaitForApproval"
},
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
"task_token.$": "$$.Task.Token",
"expense_id.$": "$.expense_id",
"amount.$": "$.amount",
"applicant.$": "$.applicant",
"message": "経費申請の承認をお願いします"
}
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"ResultPath": "$.error",
"Next": "NotifyTimeout"
}
],
"Next": "RouteApproval"
},
"RouteApproval": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.approval.approved",
"BooleanEquals": true,
"Next": "ProcessExpense"
}
],
"Default": "NotifyRejected"
},
"ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"Parameters": {
"expense_id.$": "$.expense_id",
"amount.$": "$.amount",
"approver.$": "$.approval.approver"
},
"ResultPath": "$.processing",
"Next": "NotifyApproved"
},
"NotifyApproved": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "<SNS_TOPIC_ARN>",
"Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)",
"Subject": "経費申請 承認完了通知"
},
"ResultPath": null,
"Next": "ExpenseApproved"
},
"NotifyRejected": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "<SNS_TOPIC_ARN>",
"Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)",
"Subject": "経費申請 却下通知"
},
"ResultPath": null,
"Next": "ExpenseRejected"
},
"NotifyTimeout": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "<SNS_TOPIC_ARN>",
"Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)",
"Subject": "経費申請 承認タイムアウト"
},
"ResultPath": null,
"Next": "ExpenseTimeout"
},
"ExpenseApproved": {
"Type": "Succeed"
},
"ExpenseRejected": {
"Type": "Fail",
"Error": "ExpenseRejected",
"Cause": "経費申請が却下されました"
},
"ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
}
}
}
- ステートマシン名:
expense-approval-final
3つのシナリオをテスト:
① 承認フロー:
aws stepfunctions send-task-success \
--task-token "TOKEN" \
--task-output '{"approved": true, "approver": "manager.suzuki"}'
→ NotifyApproved → SNS メール受信(件名: 「経費申請 承認完了通知」)→ ExpenseApproved (SUCCEEDED)
② 却下フロー:
aws stepfunctions send-task-success \
--task-token "TOKEN" \
--task-output '{"approved": false, "reason": "金額超過"}'
→ NotifyRejected → SNS メール受信(件名: 「経費申請 却下通知」)→ ExpenseRejected (FAILED)
③ タイムアウトフロー(補足テスト):
本番ASLのTimeoutSeconds=86400は変更せず、TimeoutSeconds: 30 の短縮版で確認します:
SendTaskSuccessを送らずに30秒待つ
→ States.Timeout 発生
→ NotifyTimeout → SNS メール受信(件名: 「経費申請 承認タイムアウト」)→ ExpenseTimeout (FAILED)
States.Format について: Step Functionsの組み込み関数で、動的な文字列を生成します。'経費申請 {} が承認されました', $.expense_id のように、{} が変数の値に置換されます。SNS通知のメッセージを動的に生成するのに便利です。3-7. CloudWatch Logsで待機→再開の挙動を確認
Step Functionsの実行ログを確認することで、Callbackパターンの内部動作を詳しく把握できます。
ログ確認手順
- Step Functionsコンソール → 対象の実行を開く
- 「実行の詳細」タブ → 「ログ」セクションを確認
- CloudWatch Logsのリンクをクリックして詳細ログを表示
確認すべきイベントの流れ
ログで以下のシーケンスを確認してください:
WaitForApprovalStateEntered ← WaitForApproval ステート開始
TaskSubmitted← SQSにメッセージ送信 + Task Token発行
(...長い待機時間...)
TaskSucceeded← SendTaskSuccess を受信
WaitForApprovalStateExited ← WaitForApproval ステート終了
ProcessExpenseStateEntered ← 次のステートへ遷移
ExecutionHistory APIで待機時間を計測
AWS CLIを使ってプログラマティックに実行履歴を取得し、待機時間を計算できます:
# 実行ARNを確認
aws stepfunctions list-executions \
--state-machine-arn "<YOUR_STATE_MACHINE_ARN>" \
--query 'executions[0].executionArn' \
--output text
# 実行履歴を取得
aws stepfunctions get-execution-history \
--execution-arn "<EXECUTION_ARN>" \
--query 'events[?type==`TaskSubmitted` || type==`TaskSucceeded`].[timestamp,type]' \
--output table
出力例:
-----------------------------------------
| timestamp | type |
-----------------------------------------
| 2026-04-12T10:00:00Z| TaskSubmitted |
| 2026-04-12T10:05:23Z| TaskSucceeded |
-----------------------------------------
TaskSubmitted と TaskSucceeded のタイムスタンプ差が、実際の承認待機時間です。
この例では 5分23秒 承認待ちだったことがわかります。
CloudWatch Metrics でモニタリング
本番環境では以下のメトリクスを監視することを推奨します:
| メトリクス | 説明 | アラート推奨値 |
|---|---|---|
ExecutionsFailed | 失敗した実行数 | 1以上でアラート |
ExecutionsTimedOut | タイムアウトした実行数 | 1以上でアラート |
ExecutionThrottled | スロットリングされた実行数 | 急増時にアラート |
まとめ(Section 3)
このセクションでは、経費承認ワークフローを段階的に構築しました。
| Step | 追加した機能 | 学んだこと |
|---|---|---|
| A | 基本Callback | $$.Task.Token の仕組み、SendTaskSuccessによる再開 |
| B | タイムアウト | HeartbeatSeconds/TimeoutSeconds、States.Timeout の Catch |
| C | 承認/却下分岐 | Choice ステートでの approved フラグによるルーティング |
| D | SNS通知 | States.Format を使った動的メッセージ、3シナリオの完全テスト |
Callbackパターンの本質: Step Functionsはtask_tokenを外部に渡し、そのトークンが返ってくるまで待ちます。「承認者がボタンを押す」「外部システムが処理を完了する」「人間が確認する」など、任意の外部イベントでワークフローを再開できるのがCallbackパターンの強みです。
次のSection 4では、このワークフロー全体をTerraformで自動構築します。
Section 4: Terraformでの構築
Section 3のコンソール操作で構築した経費承認ワークフローを、今度はTerraformで再現します。インフラをコードで管理することで、環境の再現性が高まり、チーム開発での一貫した運用が可能になります。
4-1. 前提条件
- Terraform 1.0以上インストール済み
- AWS CLI設定済み(
aws configure実行済み) - 以下のIAM権限が付与されていること:
- Lambda の作成・実行
- IAM ロール・ポリシーの作成
- SQS キューの作成・操作
- SNS トピックの作成・サブスクライブ
- Step Functions ステートマシンの作成・実行
- CloudWatch Logs の作成
4-2. ディレクトリ構成
sf-callback/
├── main.tf
├── variables.tf
├── outputs.tf
├── lambda/
│├── submit_expense.py
│└── process_expense.py
└── statemachine/
└── definition.json.tpl
作業ディレクトリを作成します:
mkdir -p sf-callback/lambda sf-callback/statemachine
cd sf-callback
4-3. Lambda ソースコード
Section 3(コンソール版)と同じロジック構成の Lambda 関数です。
lambda/submit_expense.py
経費申請を受け付け、申請情報をそのまま返す関数です。
import json
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
経費申請受付 Lambda
受け取った申請情報を検証し、タイムスタンプを付与して返す
"""
logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}")
expense_id = event.get("expense_id")
amount = event.get("amount")
description = event.get("description", "")
applicant = event.get("applicant")
# 基本バリデーション
if not expense_id or amount is None or not applicant:
raise ValueError("expense_id, amount, applicant は必須パラメータです")
if amount <= 0:
raise ValueError("amount は正の数値である必要があります")
submitted_at = datetime.now(timezone.utc).isoformat()
result = {
"expense_id": expense_id,
"amount": amount,
"description": description,
"applicant": applicant,
"submitted_at": submitted_at,
"status": "SUBMITTED",
}
logger.info(f"申請受付完了: {json.dumps(result, ensure_ascii=False)}")
return result
lambda/process_expense.py
承認済みの経費を処理する関数です。
import json
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
経費処理 Lambda
承認済みの経費申請を処理し、処理結果を返す
"""
logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}")
expense_id = event.get("expense_id")
amount = event.get("amount")
approver = event.get("approver")
if not expense_id or amount is None or not approver:
raise ValueError("expense_id, amount, approver は必須パラメータです")
processed_at = datetime.now(timezone.utc).isoformat()
result = {
"expense_id": expense_id,
"amount": amount,
"approver": approver,
"processed_at": processed_at,
"status": "PROCESSED",
"payment_scheduled": True,
}
logger.info(f"経費処理完了: {json.dumps(result, ensure_ascii=False)}")
return result
4-4. statemachine/definition.json.tpl
コンソール版(Section 3)の Step D と同じ ASL 構造です。
Terraform の templatefile() 関数を使用し、${...} 形式のプレースホルダーを実際の ARN・URL に置換します。
templatefile() のしくみ
# main.tf での使用例
definition = templatefile("statemachine/definition.json.tpl", {
submit_expense_arn = aws_lambda_function.submit_expense.arn,
process_expense_arn = aws_lambda_function.process_expense.arn,
sqs_queue_url = aws_sqs_queue.expense_approval.url,
sns_topic_arn = aws_sns_topic.expense_notification.arn
})
| プレースホルダー | 置換される値 |
|---|---|
${submit_expense_arn} | submit_expense Lambda の ARN |
${process_expense_arn} | process_expense Lambda の ARN |
${sqs_queue_url} | SQS キューの URL |
${sns_topic_arn} | SNS トピックの ARN |
statemachine/definition.json.tpl の内容
{
"Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)",
"StartAt": "SubmitExpense",
"States": {
"SubmitExpense": {
"Type": "Task",
"Resource": "${submit_expense_arn}",
"Parameters": {
"expense_id.$": "$.expense_id",
"amount.$": "$.amount",
"description.$": "$.description",
"applicant.$": "$.applicant"
},
"ResultPath": "$.submission",
"Next": "WaitForApproval"
},
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${sqs_queue_url}",
"MessageBody": {
"task_token.$": "$$.Task.Token",
"expense_id.$": "$.expense_id",
"amount.$": "$.amount",
"applicant.$": "$.applicant",
"message": "経費申請の承認をお願いします"
}
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"ResultPath": "$.error",
"Next": "NotifyTimeout"
}
],
"Next": "RouteApproval"
},
"RouteApproval": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.approval.approved",
"BooleanEquals": true,
"Next": "ProcessExpense"
}
],
"Default": "NotifyRejected"
},
"ProcessExpense": {
"Type": "Task",
"Resource": "${process_expense_arn}",
"Parameters": {
"expense_id.$": "$.expense_id",
"amount.$": "$.amount",
"approver.$": "$.approval.approver"
},
"ResultPath": "$.processing",
"Next": "NotifyApproved"
},
"NotifyApproved": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${sns_topic_arn}",
"Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)",
"Subject": "経費申請 承認完了通知"
},
"ResultPath": null,
"Next": "ExpenseApproved"
},
"NotifyRejected": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${sns_topic_arn}",
"Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)",
"Subject": "経費申請 却下通知"
},
"ResultPath": null,
"Next": "ExpenseRejected"
},
"NotifyTimeout": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${sns_topic_arn}",
"Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)",
"Subject": "経費申請 承認タイムアウト"
},
"ResultPath": null,
"Next": "ExpenseTimeout"
},
"ExpenseApproved": {
"Type": "Succeed"
},
"ExpenseRejected": {
"Type": "Fail",
"Error": "ExpenseRejected",
"Cause": "経費申請が却下されました"
},
"ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
}
}
}
${...} 部分のみ動的に補間されます。4-5. variables.tf
variable "aws_region" {
description = "AWSリージョン"
default = "ap-northeast-1"
}
variable "project_name" {
description = "プロジェクト名(リソース命名プレフィックスに使用)"
default = "expense-approval"
}
variable "notification_email" {
description = "承認/却下/タイムアウト通知先メールアドレス"
type = string
}
4-6. main.tf(完全なコード)
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
archive = {
source = "hashicorp/archive"
version = "~> 2.0"
}
}
}
provider "aws" {
region = var.aws_region
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
# ===================================================
# Lambda ZIPパッケージ生成
# ===================================================
data "archive_file" "submit_expense" {
type = "zip"
source_file = "${path.module}/lambda/submit_expense.py"
output_path = "${path.module}/.terraform/tmp/submit_expense.zip"
}
data "archive_file" "process_expense" {
type = "zip"
source_file = "${path.module}/lambda/process_expense.py"
output_path = "${path.module}/.terraform/tmp/process_expense.zip"
}
# ===================================================
# IAM ロール: Lambda 実行ロール
# ===================================================
resource "aws_iam_role" "lambda_role" {
name = "${var.project_name}-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_basic" {
role = aws_iam_role.lambda_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# ===================================================
# IAM ロール: Step Functions 実行ロール
# ===================================================
resource "aws_iam_role" "sf_execution_role" {
name = "${var.project_name}-sf-execution-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "states.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy" "sf_policy" {
name = "${var.project_name}-sf-policy"
role = aws_iam_role.sf_execution_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "InvokeLambda"
Effect = "Allow"
Action = ["lambda:InvokeFunction"]
Resource = [
aws_lambda_function.submit_expense.arn,
aws_lambda_function.process_expense.arn
]
},
{
Sid = "SendSQSMessage"
Effect = "Allow"
Action = ["sqs:SendMessage"]
Resource = [aws_sqs_queue.expense_approval.arn]
},
{
Sid = "PublishSNS"
Effect = "Allow"
Action = ["sns:Publish"]
Resource = [aws_sns_topic.expense_notification.arn]
},
{
Sid = "CloudWatchLogs"
Effect = "Allow"
Action = [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
]
Resource = ["*"]
}
]
})
}
# ===================================================
# CloudWatch Logs グループ
# ===================================================
resource "aws_cloudwatch_log_group" "submit_expense" {
name = "/aws/lambda/${var.project_name}-submit-expense"
retention_in_days = 7
}
resource "aws_cloudwatch_log_group" "process_expense" {
name = "/aws/lambda/${var.project_name}-process-expense"
retention_in_days = 7
}
# ===================================================
# Lambda 関数
# ===================================================
resource "aws_lambda_function" "submit_expense" {
filename= data.archive_file.submit_expense.output_path
function_name = "${var.project_name}-submit-expense"
role = aws_iam_role.lambda_role.arn
handler = "submit_expense.lambda_handler"
runtime = "python3.12"
source_code_hash = data.archive_file.submit_expense.output_base64sha256
depends_on = [
aws_iam_role_policy_attachment.lambda_basic,
aws_cloudwatch_log_group.submit_expense
]
}
resource "aws_lambda_function" "process_expense" {
filename= data.archive_file.process_expense.output_path
function_name = "${var.project_name}-process-expense"
role = aws_iam_role.lambda_role.arn
handler = "process_expense.lambda_handler"
runtime = "python3.12"
source_code_hash = data.archive_file.process_expense.output_base64sha256
depends_on = [
aws_iam_role_policy_attachment.lambda_basic,
aws_cloudwatch_log_group.process_expense
]
}
# ===================================================
# SQS キュー
# ===================================================
resource "aws_sqs_queue" "expense_approval" {
name = "${var.project_name}-approval"
# VisibilityTimeout は HeartbeatSeconds(3600)より短くしないこと
# ここでは 300 秒(5分)に設定(承認者がメッセージを処理する想定時間)
visibility_timeout_seconds = 300
message_retention_seconds = 86400 # 1日
}
# ===================================================
# SNS トピック&メールサブスクリプション
# ===================================================
resource "aws_sns_topic" "expense_notification" {
name = "${var.project_name}-notification"
}
resource "aws_sns_topic_subscription" "email" {
topic_arn = aws_sns_topic.expense_notification.arn
protocol = "email"
endpoint = var.notification_email
}
# ===================================================
# Step Functions ステートマシン
# ===================================================
resource "aws_sfn_state_machine" "expense_approval" {
name = "${var.project_name}-workflow"
role_arn = aws_iam_role.sf_execution_role.arn
definition = templatefile("statemachine/definition.json.tpl", {
submit_expense_arn = aws_lambda_function.submit_expense.arn,
process_expense_arn = aws_lambda_function.process_expense.arn,
sqs_queue_url = aws_sqs_queue.expense_approval.url,
sns_topic_arn = aws_sns_topic.expense_notification.arn
})
depends_on = [
aws_iam_role_policy.sf_policy
]
}
4-7. outputs.tf
output "state_machine_arn" {
description = "Step Functions ステートマシンの ARN"
value = aws_sfn_state_machine.expense_approval.arn
}
output "sqs_queue_url" {
description = "承認待ち SQS キューの URL"
value = aws_sqs_queue.expense_approval.url
}
output "sns_topic_arn" {
description = "通知用 SNS トピックの ARN"
value = aws_sns_topic.expense_notification.arn
}
output "submit_expense_lambda_arn" {
description = "submit_expense Lambda の ARN"
value = aws_lambda_function.submit_expense.arn
}
output "process_expense_lambda_arn" {
description = "process_expense Lambda の ARN"
value = aws_lambda_function.process_expense.arn
}
4-8. デプロイ手順
ファイルの準備ができたら、以下の順番でデプロイします。
# 1. 初期化(プロバイダーのダウンロード)
terraform init
# 2. 実行計画を確認
terraform plan -var="notification_email=your@email.com"
# 3. リソースを作成
terraform apply -var="notification_email=your@email.com"
terraform apply 実行後、SNS サブスクリプションの確認メールが届きます。メール内の「Confirm subscription」リンクをクリックしてください。これをしないと通知メールが届きません。
terraform apply が成功すると、各リソースの ARN・URL が出力されます:
Outputs:
state_machine_arn = "arn:aws:states:ap-northeast-1:123456789012:stateMachine:expense-approval-workflow"
sqs_queue_url= "https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-approval"
sns_topic_arn= "arn:aws:sns:ap-northeast-1:123456789012:expense-approval-notification"
submit_expense_lambda_arn= "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-submit-expense"
process_expense_lambda_arn = "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-process-expense"
4-9. 動作確認(CLI)
Step 1: 経費申請を実行する
# ステートマシン ARN を取得
STATE_MACHINE_ARN=$(terraform output -raw state_machine_arn)
SQS_QUEUE_URL=$(terraform output -raw sqs_queue_url)
# 経費申請を開始
EXECUTION=$(aws stepfunctions start-execution \
--state-machine-arn "$STATE_MACHINE_ARN" \
--input '{
"expense_id": "EXP-TF-001",
"amount": 50000,
"description": "出張交通費",
"applicant": "yamada.taro"
}')
EXECUTION_ARN=$(echo "$EXECUTION" | jq -r '.executionArn')
echo "実行ARN: $EXECUTION_ARN"
Step 2: SQS からタスクトークンを取得する
ワークフローが WaitForApproval ステートに到達すると、SQS にメッセージが届きます。
# SQS メッセージを受信(タスクトークンが含まれる)
MESSAGE=$(aws sqs receive-message \
--queue-url "$SQS_QUEUE_URL" \
--max-number-of-messages 1)
echo "$MESSAGE" | jq '.'
# タスクトークンを取り出す
TASK_TOKEN=$(echo "$MESSAGE" | jq -r '.Messages[0].Body | fromjson | .task_token')
echo "タスクトークン: $TASK_TOKEN"
# SQS からメッセージを削除(受信後は必ず削除する)
RECEIPT_HANDLE=$(echo "$MESSAGE" | jq -r '.Messages[0].ReceiptHandle')
aws sqs delete-message \
--queue-url "$SQS_QUEUE_URL" \
--receipt-handle "$RECEIPT_HANDLE"
Step 3-A: 承認する
aws stepfunctions send-task-success \
--task-token "$TASK_TOKEN" \
--task-output '{"approved": true, "approver": "manager.suzuki"}'
承認後の流れ:
1. RouteApproval → ProcessExpense → NotifyApproved
2. SNS 経由でメール通知が届く
3. ExpenseApproved(Succeed)で正常終了
Step 3-B: 却下する
aws stepfunctions send-task-success \
--task-token "$TASK_TOKEN" \
--task-output '{"approved": false, "approver": "manager.suzuki"}'
却下後の流れ:
1. RouteApproval → NotifyRejected
2. SNS 経由でメール通知が届く
3. ExpenseRejected(Fail)で終了
実行状態の確認
# 実行状態を確認
aws stepfunctions describe-execution \
--execution-arn "$EXECUTION_ARN" \
| jq '{status: .status, startDate: .startDate, stopDate: .stopDate}'
後片付け
確認が終わったらリソースを削除します:
terraform destroy -var="notification_email=your@email.com"
terraform destroy は作成したすべての AWS リソースを削除します。実行前に内容を確認してください。まとめ: コンソール版との対応関係
| コンソール版(Section 3) | Terraform(Section 4) |
|---|---|
| Step D の ASL を手動入力 | definition.json.tpl として管理 |
| Lambda を手動作成 | aws_lambda_function リソース |
| SQS を手動作成 | aws_sqs_queue リソース |
| SNS を手動作成 | aws_sns_topic リソース |
| IAM を手動設定 | aws_iam_role / aws_iam_role_policy |
コンソール版 Step D と ASL 構造が完全一致しています。 ステート名・遷移・Parameters・HeartbeatSeconds(3600)・TimeoutSeconds(86400)・Catch の構成はすべて同一であり、${...} のプレースホルダー部分のみが Terraform の templatefile() によって実際の ARN・URL に置換されます。
Section 5: 実践Tips(Callbackパターン設計ガイド)
5-1. タスクトークンの寿命と安全な保管
寿命: タスクトークンは最大1年間有効だが、TimeoutSeconds で短縮することを強く推奨。
安全な保管場所(重要度順):
| 保管場所 | 用途 | 注意点 |
|---|---|---|
| DynamoDB | 標準的な保管場所(expense_idをキーに保存) | TTLを設定して自動削除推奨 |
| SSM Parameter Store | 短期保管・設定値と一緒に管理 | SecureStringで暗号化 |
| SQSメッセージ本文 | 配信と同時に受け取るシンプルな方法 | メッセージ可視性タイムアウトに注意 |
DynamoDBでのトークン保管例(Lambda内):
import boto3
import os
def save_task_token(expense_id: str, task_token: str) -> None:
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["EXPENSE_TABLE"])
table.update_item(
Key={"expense_id": expense_id},
UpdateExpression="SET task_token = :token, #st = :status",
ExpressionAttributeNames={"#st": "status"},
ExpressionAttributeValues={":token": task_token, ":status": "pending_approval"}
)
5-2. 再送・リトライ時のべき等性設計
問題: SQSのメッセージは少なくとも1回配信(at-least-once delivery)。
同じタスクトークンで SendTaskSuccess が複数回呼ばれた場合、2回目以降はエラーになる。
べき等性の実装パターン:
import boto3
import json
def handle_approval(expense_id: str, approved: bool, approver: str) -> dict:
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("expense-requests")
# べき等性チェック: 既に処理済みか確認
item = table.get_item(Key={"expense_id": expense_id})["Item"]
if item.get("status") not in ("pending_approval",):
print(f"既に処理済み: expense_id={expense_id}, status={item['status']}")
return {"already_processed": True}
task_token = item["task_token"]
# ステータスを更新してから SendTaskSuccess(二重送信防止)
table.update_item(
Key={"expense_id": expense_id},
UpdateExpression="SET #st = :status",
ConditionExpression="attribute_exists(task_token)",
ExpressionAttributeNames={"#st": "status"},
ExpressionAttributeValues={":status": "processing"}
)
sf_client = boto3.client("stepfunctions")
sf_client.send_task_success(
taskToken=task_token,
output=json.dumps({"approved": approved, "approver": approver})
)
return {"sent": True}
ポイント: ステータスを processing に更新してから SendTaskSuccess を呼ぶことで、並列実行やリトライによる二重送信を防止する。
5-3. API Gateway + Lambda で Webベース承認UIを作るパターン(概念のみ)
CLIではなくWebブラウザから承認するパターン:
① SF: タスクトークンをDynamoDBに保存 + メール/Slack通知(承認URLを含む)
② 承認者: ブラウザでURLにアクセス → 承認/却下ボタンをクリック
③ API GW + Lambda: expense_idからDynamoDBでトークンを取得 → SendTaskSuccess/Failure
④ SF: 再開
承認URL例: https://api.example.com/approve?expense_id=EXP-001&action=approve
このパターンにより、エンジニア以外の承認者(マネージャーや経理担当者)もブラウザから承認作業を行えるようになる。
5-4. HeartbeatSeconds の活用パターン
長時間処理中の「生存確認」用途として HeartbeatSeconds を活用する:
import boto3
import time
def long_running_process_with_heartbeat(task_token: str) -> None:
sf_client = boto3.client("stepfunctions")
for i in range(10): # 長時間処理のシミュレーション
# 処理ステップ(例: 5分かかる処理)
time.sleep(300)
# Heartbeat送信(HeartbeatSecondsのタイマーをリセット)
sf_client.send_task_heartbeat(taskToken=task_token)
print(f"Heartbeat送信 ({i+1}/10)")
# 処理完了
sf_client.send_task_success(
taskToken=task_token,
output='{"completed": true}'
)
使い分けの目安:
| 設定 | 役割 | 推奨値 |
|---|---|---|
TimeoutSeconds | ステップ全体のタイムアウト | 処理の最大所要時間 × 1.5 |
HeartbeatSeconds | 生存確認の間隔 | Heartbeat送信間隔 × 2 |
HeartbeatSeconds を設定しておくと、処理が途中でフリーズした場合に States.HeartbeatTimeout エラーとして検出でき、Catch/Retry で適切に対処できる。
Section 6: ハンズオン後の削除手順
6-1. コスト注意事項
| リソース | 月額目安 | 備考 |
|---|---|---|
| Lambda | 無料枠内(本ハンズオン程度) | 100万リクエスト/月まで無料 |
| Step Functions | 無料枠内(本ハンズオン程度) | 4,000回の状態遷移まで無料 |
| SQS | 無料枠内(本ハンズオン程度) | 100万リクエスト/月まで無料 |
| SNS | 無料枠内(本ハンズオン程度) | 100万件まで無料 |
| DynamoDB | ~$0.25/GB/月 | ストレージ課金に注意 |
| CloudWatch Logs | ~$0.50/GB | ロググループを削除で停止 |
6-2. Terraformで構築した場合
以下のコマンド1つで全リソースを削除できる:
terraform destroy -var="notification_email=your@email.com"
terraform destroy を実行すると、terraform apply で作成した全リソースが削除される。削除前に確認プロンプトが表示されるので、yes と入力する。Terraformで管理されない CloudWatch Logs は手動で削除が必要:
aws logs delete-log-group --log-group-name /aws/lambda/submit-expense
aws logs delete-log-group --log-group-name /aws/lambda/process-expense
6-3. コンソールで構築した場合の削除チェックリスト
以下の順番で削除する(依存関係に注意):
- [ ] Step Functions ステートマシンを削除(コンソール: Step Functions → ステートマシン)
expense-callback-step-a、expense-callback-step-b、expense-callback-step-c、expense-callback-final- [ ] Lambda 関数を削除(コンソール: Lambda → 関数)
submit-expense、process-expense- [ ] SQS キューを削除(コンソール: SQS → キュー)
expense-approval-queue- [ ] SNS トピックを削除(コンソール: SNS → トピック)
expense-notification(サブスクリプションも自動削除)- [ ] DynamoDB テーブルを削除(コンソール: DynamoDB → テーブル)
expense-requests- [ ] IAM ロールと付随するポリシーを削除(コンソール: IAM → ロール)
sf-expense-execution-role- [ ] CloudWatch Logs ロググループを削除(コンソール: CloudWatch → ロールグループ)
/aws/lambda/submit-expense、/aws/lambda/process-expense
Section 7: まとめと次のステップ
7-1. この記事で学んだこと
このハンズオンを通じて、以下の内容を実践的に習得した:
.waitForTaskTokenによるCallbackパターンの仕組み — トークン発行→停止→返送→再開のライフサイクル.syncvs.waitForTaskTokenの使い分け — 同期的なECS/Lambdaタスク実行と非同期の人間承認フローの違いSendTaskSuccess/SendTaskFailure/SendTaskHeartbeatAPI の使い方 — AWS CLI と Python SDKの両方で操作HeartbeatSeconds/TimeoutSecondsのCallback固有の設定 — 生存確認と全体タイムアウトの使い分け- AWS CLIを使った手動承認体験 — 「承認ボタン」の代わりにCLIコマンドで承認する感覚をつかむ
- 段階的構築(Step A→B→C→D)でCallback機能を段階的に拡張 — 基本構造から始めてDynamoDB統合・SQS連携・人間承認まで積み上げ
- タスクトークンの安全な保管(DynamoDB)とべき等性設計 — 実務で使える堅牢な実装パターン
7-2. 次のステップ
Callbackパターンをマスターした後は、以下のテーマに進むとStep Functionsの理解がさらに深まる:
- Distributed Map(大規模並列バッチ処理) — S3上の大量ファイルを並列処理するパターン — 第6弾予告
- Express vs Standard Workflow の使い分け — 高頻度処理とコスト最適化のトレードオフ — 第7弾予告
- EventBridge + Step Functions の連携 — イベント駆動アーキテクチャでStep Functionsを自動起動するパターン
7-3. シリーズリンク
本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第5回です。
シリーズ一覧:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶRetry/Catch/Timeout
– 第4回: Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
– 第5回: 本記事(Callbackパターン完全ガイド)
次の記事を読む: 第6回 Step Functions Distributed Map ▶
7-4. 参考リンク
- Amazon States Language —
.waitForTaskToken SendTaskSuccessAPI リファレンス- Step Functions Callback パターンの例
- SQS 可視性タイムアウト