- 1 Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー
- 1.1 目次
- 1.2 Section 1: Callbackパターン(.waitForTaskToken)概念編
- 1.3 Section 2: アーキテクチャ解説
- 1.4 Section 3: AWSコンソールでのハンズオン
- 1.5 3-1. 前提条件
- 1.6 3-2. Lambda 2関数のデプロイ
- 1.7 3-3. SQSキュー作成
- 1.8 3-4. SNSトピック作成とメールサブスクリプション
- 1.9 3-5. IAMロール作成
- 1.10 3-6. ステートマシンを段階的に構築(記事の核心)
- 1.11 3-7. CloudWatch Logsで待機→再開の挙動を確認
- 1.12 まとめ(Section 3)
- 1.13 Section 4: Terraformでの構築
- 1.14 4-1. 前提条件
- 1.15 4-2. ディレクトリ構成
- 1.16 4-3. Lambda ソースコード
- 1.17 4-4. statemachine/definition.json.tpl
- 1.18 4-5. variables.tf
- 1.19 4-6. main.tf(完全なコード)
- 1.20 4-7. outputs.tf
- 1.21 4-8. デプロイ手順
- 1.22 4-9. 動作確認(CLI)
- 1.23 後片付け
- 1.24 まとめ: コンソール版との対応関係
- 1.25 Section 5: 実践Tips(Callbackパターン設計ガイド)
- 1.26 Section 6: ハンズオン後の削除手順
- 1.27 Section 7: まとめと次のステップ
Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー
公開日: 2026-04-12
難易度: 中級〜上級
所要時間: 約120分
シリーズ: [Step Functions シリーズ 第5回]
この記事で学ぶこと
– .waitForTaskToken によるCallback パターンの仕組み(.sync との違い)
– タスクトークンのライフサイクル(発行→SQS送信→停止→返送→再開)
– AWS CLIで手動SendTaskSuccessを叩く「承認ボタン」体験
– 段階的構築(Step A→B→C→D): 基本Callback→タイムアウト→承認/却下分岐→SNS通知
– Terraform での完全構築(コンソール版とASL完全一致)シリーズ前回記事:
– 第1回: AWS Step Functions 入門
– 第2回: ECS × Step Functions 入門
– 第3回: Step Functions エラーハンドリング完全ガイド
– 第4回: Step Functions 入出力データフロー制御完全ガイド
目次
- Callbackパターン(.waitForTaskToken)概念編
- アーキテクチャ解説
- AWSコンソールでのハンズオン
- Terraformでの構築
- 実践Tips(Callbackパターン設計ガイド)
- ハンズオン後の削除手順
- まとめと次のステップ
Section 1: Callbackパターン(.waitForTaskToken)概念編
1-1. Callbackパターンとは
AWS Step Functions には、外部リソースとの連携方法として .sync(同期) と .waitForTaskToken(Callback) の2つの統合パターンがある。
通常の .sync パターンでは、Step Functions が Lambda や ECS などを呼び出し、処理完了まで自動的に待機する。これは「Step Functions 側が完了を検知する」モデルだ。
一方、.waitForTaskToken(Callbackパターン) は「外部システムが完了を通知する」モデルである。Step Functions はタスクトークンと呼ばれる一意の識別子を発行し、それを外部システムに渡したうえで実行を 停止(pause) する。外部システムは処理完了後に SendTaskSuccess または SendTaskFailure API を呼び出すことで、Step Functions の実行が再開される。
主な用途:
– 人間の承認が必要なフロー(上長承認、コンプライアンスチェックなど)
– 外部 API の非同期応答待ち(決済処理、審査システムなど)
– SQS / API Gateway 経由のマイクロサービス間非同期連携
– Step Functions のタイムアウト内では完了しない長時間処理
1-2. .sync vs .waitForTaskToken 比較表
| 特性 | .sync | .waitForTaskToken |
|---|---|---|
| 実行モード | 同期(完了まで自動待機) | 非同期(トークン返送まで停止) |
| 最大待機時間 | 1年(Standard Workflow) | 1年(TimeoutSeconds 設定推奨) |
| ユースケース | Lambda / ECS / Glue 等の完了待ち | 人間承認・外部 API・SQS / API GW 経由の非同期処理 |
| タスクトークン | 不要 | 必須($$.Task.Token) |
| 完了通知 | SF が自動検知 | 外部システムが SendTaskSuccess / SendTaskFailure を呼ぶ |
| HeartbeatSeconds | 一部リソースのみ対応 | 利用可能(長時間処理では推奨) |
1-3. タスクトークンのライフサイクル(最重要)
Callback パターンの核心は タスクトークン にある。以下がその一連の流れだ。
① SF: タスクトークン発行($$.Task.Token) ↓② SF: トークンを SQS / API GW 経由で外部システムに送信 → 実行を停止 ↓(外部システムが処理中 — 数秒〜数日)③ 外部: SendTaskSuccess(token, output) または SendTaskFailure(token, error) を呼ぶ ↓④ SF: 停止から再開 → 次のステートへ進むタスクトークンの重要な特性:
| 特性 | 内容 |
|---|---|
| 形式 | 推測不可能な不透明文字列(内部識別子) |
| 最大長 | 2,048 文字 |
| 有効期間 | 最大 1 年(ワークフロー実行期間と同じ) |
| 使用回数 | 一度限り(SendTaskSuccess / SendTaskFailure のどちらか一方のみ有効) |
| スコープ | 同一 AWS アカウント内のみ(クロスアカウント不可) |
| 保管責任 | 外部システム側が安全に保管する責任を持つ |
ポイント: トークンは
$$.Task.Tokenで Parameters に埋め込む。第4弾(データフロー制御)で学んだ Context Object($$)の知識がここで直接活きる。
1-4. SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat API
SendTaskSuccess — 処理成功を通知
aws stepfunctions send-task-success \ --task-token "TOKEN" \ --task-output '{"approved": true, "approver": "manager"}'| パラメータ | 必須 | 制限 |
|---|---|---|
task-token | ○ | 1〜2,048 文字 |
task-output | ○ | 最大 262,144 バイト(256 KB)の JSON 文字列 |
task-output の内容は ResultPath に格納される。
SendTaskFailure — 処理失敗を通知
aws stepfunctions send-task-failure \ --task-token "TOKEN" \ --error "ExpenseRejected" \ --cause "却下理由: 金額が承認限度額を超過"| パラメータ | 必須 | 制限 |
|---|---|---|
task-token | ○ | 1〜2,048 文字 |
error | 任意 | 最大 256 文字(Catch の ErrorEquals で捕捉可能) |
cause | 任意 | 最大 32,768 文字 |
error フィールドは Catch ブロックで ErrorEquals: ["ExpenseRejected"] のように捕捉できる。
SendTaskHeartbeat — 生存確認を送信
aws stepfunctions send-task-heartbeat \ --task-token "TOKEN"HeartbeatSeconds のタイマーをリセットする「生存確認」用 API。長時間処理中に定期的に呼び出すことで States.HeartbeatTimeout を防ぐ。成功時はレスポンスボディなしの HTTP 200 が返る(実行履歴イベントは生成されない)。
共通エラーコード:
| エラー | 意味 |
|---|---|
InvalidToken | トークンの形式が不正 |
TaskDoesNotExist | トークンが有効なタスクに対応しない |
TaskTimedOut | タスクがタイムアウト済みまたは完了済み |
1-5. HeartbeatSeconds と TimeoutSeconds の Callback 固有挙動
| パラメータ | 意味 | タイムアウト時のエラー |
|---|---|---|
TimeoutSeconds | タスク全体のタイムアウト(SendTaskSuccess / Failure の最終期限) | States.Timeout |
HeartbeatSeconds | 連続する Heartbeat 間隔の最大値 | States.HeartbeatTimeout |
設定例:
{ "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", "HeartbeatSeconds": 3600, "TimeoutSeconds": 86400, "Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": {"task_token.$": "$$.Task.Token" } }}重要な設計指針:
HeartbeatSeconds<TimeoutSecondsを必ず守る(例: Heartbeat=3600、Timeout=86400)HeartbeatSecondsは外部システムが定期的に生存確認を送れる場合のみ設定するTimeoutSecondsは SQS メッセージの可視性タイムアウトとは独立した別の概念TimeoutSecondsのデフォルト値は 99,999,999 秒(約 3.17 年)だが、明示的に設定することを推奨SendTaskHeartbeatはHeartbeatSecondsタイマーをリセットするが、TimeoutSecondsは延長しない
1-6. ユースケース一覧
| ユースケース | 説明 | Callback を使う理由 |
|---|---|---|
| 人間の承認フロー | 上長承認、リリース承認、コンプライアンスチェック | 承認まで数時間〜数日かかる可能性 |
| 外部 API 非同期応答 | 外部決済処理、外部審査システムの応答待ち | 外部 API が非同期モデルを採用 |
| サービス間オーケストレーション | マイクロサービス間の非同期連携(SQS / EventBridge 経由) | 受信側サービスが完了タイミングを制御 |
| 手動データ検証 | オペレーターが目視確認後に処理を再開するフロー | 人間の判断が必要 |
| 支払い処理待ち | ユーザーが支払いフォームに入力するまでの待機 | ユーザー操作完了まで時間不定 |
1-7. Context Object でのトークン取得方法
$$.Task.Token は Context Object($$)経由で取得する。第4弾(データフロー制御)で学んだ Context Object の知識がここで直接活きる。
{ "WaitForApproval": { "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", "Parameters": {"QueueUrl": "<SQS_QUEUE_URL>","MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id"} } }}構文のポイント:
| 記法 | 意味 |
|---|---|
"task_token.$" | .$ サフィックス = 値を動的参照(パス/Context Object) |
"$$.Task.Token" | $$ = Context Object、.Task.Token = 現在のタスクのトークン |
"expense_id.$": "$.expense_id" | $ = 入力 JSON から値を参照 |
注意:
$$.Task.TokenはParametersおよびResultSelectorの中でのみ参照可能。InputPathやOutputPathでは使用できない。
Section 1 まとめ:
– .waitForTaskToken は「外部システムが完了を通知する」非同期パターン
– タスクトークンは一度限り有効、最大 2,048 文字、有効期間最大 1 年
– HeartbeatSeconds(States.HeartbeatTimeout)と TimeoutSeconds(States.Timeout)は独立したタイムアウト機構
– $$.Task.Token は Parameters / ResultSelector 内でのみ参照可能
Section 2: アーキテクチャ解説
2-1. ハンズオンシナリオ概要
本ハンズオンでは 経費承認ワークフロー をシナリオとして使用します。社員が経費を申請し、上長が承認または却下し、結果をメールで通知するという実務でよく見られる非同期承認フローを AWS Step Functions の Callbackパターン(.waitForTaskToken)で実装します。
ワークフローの概要
| フェーズ | 処理 | 担当コンポーネント |
|---|---|---|
| 申請受付 | 経費情報をDynamoDBに保存し、承認依頼をSQSへ送信 | SubmitExpense Lambda |
| 承認待機 | タスクトークンを発行し実行を停止 | WaitForApproval(Callback) |
| 上長確認 | SQSからメッセージを取得し、CLIで承認/却下を返送 | 承認者(SQS + SF API) |
| 分岐処理 | 承認/却下/タイムアウトで後続ステートへ分岐 | RouteApproval(Choice) |
| 完了通知 | 申請者へSNS経由でメール通知 | NotifyApproved/Rejected/Timeout |
各コンポーネントの役割
- Step Functions State Machine — ワークフロー全体のオーケストレーター。
.waitForTaskTokenを使い、承認者の返答まで実行を停止する - SQS キュー — タスクトークンを埋め込んだメッセージを承認者へ配送するメッセージバッファ
- DynamoDB — 経費申請データ(申請者、金額、申請日時)を永続化
- SNS — 承認/却下/タイムアウト通知を申請者へメール配信
- Lambda — 経費受付処理と経理システムへの反映処理を担当
2-2. 全体アーキテクチャ

上図は経費承認ワークフロー全体のアーキテクチャを示しています。
ワークフローは Lambda または EventBridge Scheduler によってトリガーされ、Step Functions State Machine が処理を開始します。State Machine 内部は縦に配置された5つの主要ステートで構成されます。
Callbackの核心: WaitForApproval ステートは ★停止・待機ポイント です。このステートに到達すると Step Functions はタスクトークンを生成し、SQSへメッセージを送信した後、外部から SendTaskSuccess または SendTaskFailure が呼ばれるまで実行を停止します。この待機中にも Step Functions の状態は保持され、課金は発生しません(Standard Workflow の場合)。
承認者はSQSキューからメッセージを取得し、タスクトークンを使って aws stepfunctions send-task-success を実行することでワークフローを再開させます。
2-3. Callbackフロー詳解

Callbackパターンのデータフローを上段(発行・送信・停止)と下段(承認・返送・再開)に分けて示しています。
タスクトークンの発行
State Definition で .waitForTaskToken を指定したリソースが呼ばれると、Step Functions は自動的にタスクトークン(約1KBの不透明な文字列)を生成します。このトークンは $$.Task.Token という組み込み変数で参照できます。
"WaitForApproval": { "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789/approval-queue", "MessageBody": {"TaskToken.$": "$$.Task.Token","ExpenseId.$": "$.expenseId","Amount.$": "$.amount" } }, "HeartbeatSeconds": 86400, "Next": "RouteApproval"}停止と待機
SQSへのメッセージ送信が完了した時点で、ステートの実行は停止します。State Machine は RUNNING 状態を維持したまま、タスクトークンが返送されるのを待ちます。Standard Workflow では最大1年間待機でき、HeartbeatSeconds でハートビートタイムアウトを設定できます。
トークンの返送と再開
承認者はSQSメッセージからタスクトークンを取得し、以下のいずれかを実行します:
| コマンド | 意味 | 後続動作 |
|---|---|---|
send-task-success | 承認(結果JSONを渡す) | RouteApprovalステートへ進む |
send-task-failure | システムエラー | エラーハンドラへ |
# 承認例aws stepfunctions send-task-success \ --task-token "AQC..." \ --task-output '{"approved": true, "approver": "manager-a", "comment": "承認します"}'# 却下例aws stepfunctions send-task-success \ --task-token "AQC..." \ --task-output '{"approved": false, "approver": "manager-a", "comment": "金額が上限超過"}'2-4. 承認/却下シーケンス

承認フロー(通常系)
- SF → SubmitExpense Lambda: 経費申請を受け付け、DynamoDBに保存
- Lambda → SF: 申請ID・金額などを返却
- SF → SQS: タスクトークンを埋め込んだメッセージを送信
- SF: ⏸ 実行停止(待機開始)—
WaitForApprovalステートで停止 - 承認者 → SQS: SQSキューからメッセージを受信(ポーリング or Lambda トリガー)
- 承認者 → SF:
SendTaskSuccess {"approved": true, "approver": "manager"}を送信 - SF: ▶ 実行再開(待機終了)—
RouteApprovalステートへ進む - SF → ProcessExpense Lambda: 経理システムへ反映
- SF → SNS: 承認完了通知を送信 → 申請者へメール
- SF: SUCCEEDED終了
却下フロー
ステップ6で {"approved": false} が返送されると、RouteApproval(Choiceステート)の条件分岐により却下パスに進み、NotifyRejected(SNS)を経てFAILED終了します。
"RouteApproval": { "Type": "Choice", "Choices": [ {"Variable": "$.approved","BooleanEquals": true,"Next": "ProcessExpense" }, {"Variable": "$.approved","BooleanEquals": false,"Next": "NotifyRejected" } ]}タイムアウトフロー
WaitForApproval ステートに HeartbeatSeconds: 86400(24時間)を設定することで、承認が一定時間行われない場合に States.HeartbeatTimeout エラーが発生します。Catch ブロックで補足し NotifyTimeout(SNS)へ分岐させることで、申請者・申請者管理者へタイムアウト通知を送ります。
"WaitForApproval": { ..., "HeartbeatSeconds": 86400, "Catch": [ {"ErrorEquals": ["States.HeartbeatTimeout"],"Next": "NotifyTimeout" } ]}ポイント:
HeartbeatSecondsはトークン返送のタイムアウトを制御します。ワークフロー全体の最大実行時間はTimeoutSecondsで別途制御できます。Standard Workflow のデフォルト最大実行時間は1年です。
Section 3: AWSコンソールでのハンズオン
このセクションでは、AWSコンソールを使って経費承認ワークフローを実際に構築します。
「承認ボタンを押す」体験をAWS CLIで行うことが、このハンズオンの核心です。
ステートマシンはStep A→B→C→Dと段階的に機能を追加しながら構築します。最終形(Step D)では、SQS Callback + 承認/却下分岐 + SNS通知が全て揃った完成形を体験できます。
3-1. 前提条件
必要なAWS権限
以下のサービスを操作できるIAM権限が必要です。
| サービス | 必要な権限 |
|---|---|
| AWS Lambda | 関数の作成・編集・実行 |
| AWS Step Functions | ステートマシンの作成・実行 |
| Amazon SQS | キューの作成・メッセージ受信 |
| Amazon SNS | トピックの作成・サブスクリプション管理 |
| AWS IAM | ロール・ポリシーの作成 |
| Amazon CloudWatch Logs | ログの閲覧 |
AWS CLI の準備
このハンズオンでは、SendTaskSuccess/SendTaskFailure を AWS CLI で実行します。
コンソールからは送信できないため、CLIのインストールと設定が必須です。
# バージョン確認aws --version# aws-cli/2.x.x Python/3.x.x ...# 認証確認aws sts get-caller-identity認証情報が正しく設定されていれば、アカウントIDとARNが表示されます。
3-2. Lambda 2関数のデプロイ
経費承認ワークフローでは2つのLambda関数を使用します。
| 関数名 | 役割 |
|---|---|
submit_expense | 経費申請を受け付け、ステータスをpendingに設定 |
process_expense | 承認後の経理処理(DynamoDB書き込みなど)を実行 |
submit_expense 関数の作成
コンソール操作手順:
- AWSコンソールで Lambda を開く
- 「関数の作成」をクリック
- 設定値:
- 作成方法: 一から作成
- 関数名:
submit_expense - ランタイム: Python 3.12
- アーキテクチャ: x86_64
- 「関数の作成」をクリック
- コードエディタに以下を貼り付け:
import jsonimport osimport boto3from datetime import datetimedef lambda_handler(event, context): expense_id = event.get("expense_id") amount = event.get("amount", 0) description = event.get("description", "") applicant = event.get("applicant", "unknown") if not expense_id: raise ValueError("expense_id は必須です") if amount <= 0: raise ValueError("amountは正の値が必要です") # DynamoDB への保存(オプション — Step Dで有効化) # dynamodb = boto3.resource("dynamodb") # table = dynamodb.Table(os.environ.get("EXPENSE_TABLE", "expense-requests")) # table.put_item(Item={ # "expense_id": expense_id, # "amount": amount, # "description": description, # "applicant": applicant, # "status": "pending", # "submitted_at": datetime.utcnow().isoformat() # }) print(f"経費申請受付: expense_id={expense_id}, amount={amount}, applicant={applicant}") return { "expense_id": expense_id, "status": "pending", "submitted_at": datetime.utcnow().isoformat() }- 「Deploy」をクリックして保存
- 関数ARNをメモ(例:
arn:aws:lambda:ap-northeast-1:123456789012:function:submit_expense)
→<LAMBDA_ARN_SUBMIT_EXPENSE>プレースホルダーに使用
process_expense 関数の作成
同様の手順で2つ目の関数を作成します。
- Lambda コンソール → 「関数の作成」
- 設定値:
- 関数名:
process_expense - ランタイム: Python 3.12
- コードエディタに以下を貼り付け:
import jsonimport osfrom datetime import datetimedef lambda_handler(event, context): expense_id = event.get("expense_id") amount = event.get("amount", 0) approver = event.get("approver", "unknown") print(f"経理処理実行: expense_id={expense_id}, amount={amount}, approver={approver}") # 経理システムへの連携処理(ハンズオンではモック) return { "expense_id": expense_id, "amount": amount, "approver": approver, "processed_at": datetime.utcnow().isoformat(), "accounting_ref": f"ACC-{expense_id}-001" }- 「Deploy」をクリックして保存
- 関数ARNをメモ
→<LAMBDA_ARN_PROCESS_EXPENSE>プレースホルダーに使用
3-3. SQSキュー作成
Callbackパターンの中心となるSQSキューを作成します。
コンソール操作手順:
- AWSコンソールで Amazon SQS を開く
- 「キューの作成」をクリック
- 設定値:
- タイプ: スタンダード
- 名前:
expense-approval-queue - 可視性タイムアウト: 300秒(デフォルト30秒から変更推奨)
> 可視性タイムアウトが短いと、承認者がメッセージを処理中に他のコンシューマーが再取得してしまいます。 - その他はデフォルトのまま「キューの作成」をクリック
- 作成後、キュー詳細画面から キューURL をメモ
- 例:
https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-queue - →
<SQS_QUEUE_URL>プレースホルダーに使用
3-4. SNSトピック作成とメールサブスクリプション
承認/却下/タイムアウトの通知を送るSNSトピックを作成します(Step Dで使用)。
コンソール操作手順:
- AWSコンソールで Amazon SNS を開く
- 「トピックの作成」をクリック
- 設定値:
- タイプ: スタンダード
- 名前:
expense-notification - 「トピックの作成」をクリック
- トピックARNをメモ
- 例:
arn:aws:sns:ap-northeast-1:123456789012:expense-notification - →
<SNS_TOPIC_ARN>プレースホルダーに使用
メールサブスクリプションの設定:
- 作成したトピックの詳細画面 → 「サブスクリプションの作成」
- 設定値:
- プロトコル: Eメール
- エンドポイント: 通知を受け取るメールアドレスを入力
- 「サブスクリプションの作成」をクリック
- 入力したメールアドレスに確認メールが届く
- メール内の 「Confirm subscription」リンクをクリック
このステップを忘れると、SNS通知がメールに届きません。
3-5. IAMロール作成
Step FunctionsがLambda・SQS・SNSを操作するためのIAMロールを作成します。
コンソール操作手順:
- AWSコンソールで IAM を開く → 「ロール」→「ロールを作成」
- 信頼されたエンティティの選択:
- タイプ: AWSのサービス
- ユースケース: 「Step Functions」を選択(リストにない場合は「カスタム信頼ポリシー」を使用)
- 信頼ポリシー(カスタム選択時):
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]} - ロール名:
sf-expense-execution-role - ロール作成後、「インラインポリシーを追加」から以下を設定:
{ "Version": "2012-10-17", "Statement": [ {"Sid": "InvokeLambda","Effect": "Allow","Action": "lambda:InvokeFunction","Resource": [ "<LAMBDA_ARN_SUBMIT_EXPENSE>", "<LAMBDA_ARN_PROCESS_EXPENSE>"] }, {"Sid": "SendSQSMessage","Effect": "Allow","Action": "sqs:SendMessage","Resource": "<SQS_QUEUE_ARN>" }, {"Sid": "PublishSNS","Effect": "Allow","Action": "sns:Publish","Resource": "<SNS_TOPIC_ARN>" }, {"Sid": "CloudWatchLogs","Effect": "Allow","Action": [ "logs:CreateLogGroup", "logs:CreateLogDelivery", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams", "logs:GetLogDelivery", "logs:ListLogDeliveries", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies"],"Resource": "*" } ]}SQS_QUEUE_ARN について: キューのARNはキューURL とは異なります。SQSコンソールでキューの詳細を開くと「ARN」フィールドで確認できます(例:
arn:aws:sqs:ap-northeast-1:123456789012:expense-approval-queue)。
- ポリシー名:
sf-expense-policyとして保存
3-6. ステートマシンを段階的に構築(記事の核心)
ここからがこのハンズオンの核心です。Step A→D と段階的にステートマシンを構築し、Callbackパターンの仕組みを体験的に理解します。
Step A: 基本Callback — 承認体験を初めて体感する
まず最もシンプルな形でCallbackパターンを体験します。
ステートマシン作成手順:
- AWSコンソールで Step Functions を開く
- 「ステートマシンの作成」をクリック
- 「コードでワークフローを記述」を選択
- 以下のASLを貼り付け(
<プレースホルダー>を実際の値に置換):
{ "Comment": "経費承認ワークフロー — Step A(基本Callback)", "StartAt": "SubmitExpense", "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount" }},"ResultPath": "$.approval","Next": "ProcessExpense" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","ResultPath": "$.processing","Next": "ExpenseApproved" }, "ExpenseApproved": {"Type": "Succeed" } }}
$$.Task.TokenがCallbackの核心:$$はStep Functionsのコンテキストオブジェクトへのアクセスです。実行時に一意のトークンが生成され、SQSメッセージに埋め込まれます。このトークンを後でSendTaskSuccessに渡すことで、ワークフローが再開します。
- 設定値:
- ステートマシン名:
expense-approval-step-a - 実行ロール:
sf-expense-execution-role - ログ設定: ALL レベル を推奨(待機中の挙動が CloudWatch で確認できます)
- 「ステートマシンの作成」をクリック
実行テスト:
- 「実行の開始」をクリック
- 入力JSONに以下を貼り付け:
{ "expense_id": "EXP-001", "amount": 50000, "description": "出張交通費", "applicant": "yamada.taro"}- 「実行の開始」をクリック
WaitForApproval で止まることを確認:
実行グラフで WaitForApproval ステートが青色(RUNNING)のまま止まっているはずです。
これがCallbackパターンの動作です — Step Functionsは外部からの応答を待って一時停止しています。
SQSからtask_tokenを取得:
- SQSコンソール →
expense-approval-queueを開く - 「メッセージを送受信」→「メッセージをポーリング」をクリック
- 受信したメッセージの本文を確認:
json
{
"task_token": "AQCAAAAKAAAAAAAAADg...(長い文字列)",
"expense_id": "EXP-001",
"amount": 50000
} task_tokenの値をコピー(非常に長い文字列です)
★ 承認体験(記事の核心)
AWS CLIで以下を実行します。これが「承認ボタンを押す」体験です:
aws stepfunctions send-task-success \ --task-token "SQSメッセージから取得したtask_token" \ --task-output '{"approved": true, "approver": "manager.suzuki"}'コマンド実行後、Step Functions コンソールに戻ると:
– WaitForApproval → SUCCEEDED
– ProcessExpense → 実行中
– ExpenseApproved → SUCCEEDED
実行全体が SUCCEEDED になることを確認してください。
これがCallbackパターンの基本動作です: Step Functionsはtask_tokenを発行してSQSにメッセージを送り、外部(今回はCLI)からSendTaskSuccessが届くまで待機します。実際のシステムでは「CLIコマンド」の部分が「承認者がWebアプリで承認ボタンを押す」操作に相当します。
Step B: タイムアウト付き — HeartbeatSeconds / TimeoutSeconds を体験
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け:
{ "Comment": "経費承認ワークフロー — Step B(タイムアウト付き)", "StartAt": "SubmitExpense", "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount" }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [ { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "ExpenseTimeout" }],"Next": "ProcessExpense" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","ResultPath": "$.processing","Next": "ExpenseApproved" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" } }}- ステートマシン名:
expense-approval-step-b - 「ステートマシンの作成」をクリック
タイムアウト値の意味:
| パラメータ | 値 | 意味 |
|---|---|---|
HeartbeatSeconds | 3600 | 1時間以内にハートビートがなければタイムアウト |
TimeoutSeconds | 86400 | 24時間以内にSendTaskSuccess/Failureがなければタイムアウト |
補足テスト — タイムアウト体験:
本番用ASLのTimeoutSeconds=86400(24時間)は変更せず、別途タイムアウトを体験するための短縮版を使います。新しいステートマシンで TimeoutSeconds を 30 に変更してテストしてください:
"WaitForApproval": { "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", "Parameters": { ... }, "HeartbeatSeconds": 30, "TimeoutSeconds": 30, "Catch": [ {"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],"ResultPath": "$.error","Next": "ExpenseTimeout" } ], "Next": "ProcessExpense"}SendTaskSuccessを送らずに30秒待つと、States.Timeout エラーが発生し、Catch により ExpenseTimeout(Fail)ステートに遷移します。
Step C: 承認/却下分岐 — SendTaskFailure体験
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け:
{ "Comment": "経費承認ワークフロー — Step C(承認/却下分岐)", "StartAt": "SubmitExpense", "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount" }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [ { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "ExpenseTimeout" }],"Next": "RouteApproval" }, "RouteApproval": {"Type": "Choice","Choices": [ { "Variable": "$.approval.approved", "BooleanEquals": true, "Next": "ProcessExpense" }],"Default": "ExpenseRejected" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","ResultPath": "$.processing","Next": "ExpenseApproved" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseRejected": {"Type": "Fail","Error": "ExpenseRejected","Cause": "経費申請が却下されました" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" } }}- ステートマシン名:
expense-approval-step-c
承認テスト(approved: true):
aws stepfunctions send-task-success \ --task-token "TOKEN" \ --task-output '{"approved": true, "approver": "manager.suzuki"}'RouteApproval で approved == true → ProcessExpense → ExpenseApproved (SUCCEEDED)
却下テスト(approved: false):
aws stepfunctions send-task-success \ --task-token "TOKEN" \ --task-output '{"approved": false, "reason": "金額が承認限度額を超過"}'RouteApproval の Default → ExpenseRejected (FAILED) に遷移します。
SendTaskSuccess で却下を表現する:
approved: falseを含む正常な応答を返すことで、Choice ステートでルーティングします。これは「処理は正常に完了した(タスクトークンは有効に解決した)が、ビジネスロジック的に却下された」という設計です。
SendTaskFailure 体験:
SendTaskFailure は「外部処理自体が失敗した」場合に使います:
aws stepfunctions send-task-failure \ --task-token "TOKEN" \ --error "ExpenseRejected" \ --cause "却下理由: 経費規程違反"WaitForApproval の Catch に ErrorEquals: ["ExpenseRejected"] を追加していれば捕捉できます。
このパターンは「承認システム側でエラーが発生した」ケースに有効です。
Step D: 最終形 — SNS通知付き完成形
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け(
<SNS_TOPIC_ARN>を実際のARNに置換):
{ "Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)", "StartAt": "SubmitExpense", "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","Parameters": { "expense_id.$": "$.expense_id", "amount.$": "$.amount", "description.$": "$.description", "applicant.$": "$.applicant"},"ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount", "applicant.$": "$.applicant", "message": "経費申請の承認をお願いします" }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [ { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "NotifyTimeout" }],"Next": "RouteApproval" }, "RouteApproval": {"Type": "Choice","Choices": [ { "Variable": "$.approval.approved", "BooleanEquals": true, "Next": "ProcessExpense" }],"Default": "NotifyRejected" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","Parameters": { "expense_id.$": "$.expense_id", "amount.$": "$.amount", "approver.$": "$.approval.approver"},"ResultPath": "$.processing","Next": "NotifyApproved" }, "NotifyApproved": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": { "TopicArn": "<SNS_TOPIC_ARN>", "Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)", "Subject": "経費申請 承認完了通知"},"ResultPath": null,"Next": "ExpenseApproved" }, "NotifyRejected": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": { "TopicArn": "<SNS_TOPIC_ARN>", "Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)", "Subject": "経費申請 却下通知"},"ResultPath": null,"Next": "ExpenseRejected" }, "NotifyTimeout": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": { "TopicArn": "<SNS_TOPIC_ARN>", "Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)", "Subject": "経費申請 承認タイムアウト"},"ResultPath": null,"Next": "ExpenseTimeout" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseRejected": {"Type": "Fail","Error": "ExpenseRejected","Cause": "経費申請が却下されました" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" } }}- ステートマシン名:
expense-approval-final
3つのシナリオをテスト:
① 承認フロー:
aws stepfunctions send-task-success \ --task-token "TOKEN" \ --task-output '{"approved": true, "approver": "manager.suzuki"}'→ NotifyApproved → SNS メール受信(件名: 「経費申請 承認完了通知」)→ ExpenseApproved (SUCCEEDED)
② 却下フロー:
aws stepfunctions send-task-success \ --task-token "TOKEN" \ --task-output '{"approved": false, "reason": "金額超過"}'→ NotifyRejected → SNS メール受信(件名: 「経費申請 却下通知」)→ ExpenseRejected (FAILED)
③ タイムアウトフロー(補足テスト):
本番ASLのTimeoutSeconds=86400は変更せず、TimeoutSeconds: 30 の短縮版で確認します:
SendTaskSuccessを送らずに30秒待つ→ States.Timeout 発生→ NotifyTimeout → SNS メール受信(件名: 「経費申請 承認タイムアウト」)→ ExpenseTimeout (FAILED)
States.Formatについて: Step Functionsの組み込み関数で、動的な文字列を生成します。'経費申請 {} が承認されました', $.expense_idのように、{}が変数の値に置換されます。SNS通知のメッセージを動的に生成するのに便利です。
3-7. CloudWatch Logsで待機→再開の挙動を確認
Step Functionsの実行ログを確認することで、Callbackパターンの内部動作を詳しく把握できます。
ログ確認手順
- Step Functionsコンソール → 対象の実行を開く
- 「実行の詳細」タブ → 「ログ」セクションを確認
- CloudWatch Logsのリンクをクリックして詳細ログを表示
確認すべきイベントの流れ
ログで以下のシーケンスを確認してください:
WaitForApprovalStateEntered ← WaitForApproval ステート開始TaskSubmitted← SQSにメッセージ送信 + Task Token発行(...長い待機時間...)TaskSucceeded← SendTaskSuccess を受信WaitForApprovalStateExited ← WaitForApproval ステート終了ProcessExpenseStateEntered ← 次のステートへ遷移ExecutionHistory APIで待機時間を計測
AWS CLIを使ってプログラマティックに実行履歴を取得し、待機時間を計算できます:
# 実行ARNを確認aws stepfunctions list-executions \ --state-machine-arn "<YOUR_STATE_MACHINE_ARN>" \ --query 'executions[0].executionArn' \ --output text# 実行履歴を取得aws stepfunctions get-execution-history \ --execution-arn "<EXECUTION_ARN>" \ --query 'events[?type==`TaskSubmitted` || type==`TaskSucceeded`].[timestamp,type]' \ --output table出力例:
-----------------------------------------| timestamp | type |-----------------------------------------| 2026-04-12T10:00:00Z| TaskSubmitted || 2026-04-12T10:05:23Z| TaskSucceeded |-----------------------------------------TaskSubmitted と TaskSucceeded のタイムスタンプ差が、実際の承認待機時間です。
この例では 5分23秒 承認待ちだったことがわかります。
CloudWatch Metrics でモニタリング
本番環境では以下のメトリクスを監視することを推奨します:
| メトリクス | 説明 | アラート推奨値 |
|---|---|---|
ExecutionsFailed | 失敗した実行数 | 1以上でアラート |
ExecutionsTimedOut | タイムアウトした実行数 | 1以上でアラート |
ExecutionThrottled | スロットリングされた実行数 | 急増時にアラート |
まとめ(Section 3)
このセクションでは、経費承認ワークフローを段階的に構築しました。
| Step | 追加した機能 | 学んだこと |
|---|---|---|
| A | 基本Callback | $$.Task.Token の仕組み、SendTaskSuccessによる再開 |
| B | タイムアウト | HeartbeatSeconds/TimeoutSeconds、States.Timeout の Catch |
| C | 承認/却下分岐 | Choice ステートでの approved フラグによるルーティング |
| D | SNS通知 | States.Format を使った動的メッセージ、3シナリオの完全テスト |
Callbackパターンの本質: Step Functionsはtask_tokenを外部に渡し、そのトークンが返ってくるまで待ちます。「承認者がボタンを押す」「外部システムが処理を完了する」「人間が確認する」など、任意の外部イベントでワークフローを再開できるのがCallbackパターンの強みです。
次のSection 4では、このワークフロー全体をTerraformで自動構築します。
Section 4: Terraformでの構築
Section 3のコンソール操作で構築した経費承認ワークフローを、今度はTerraformで再現します。インフラをコードで管理することで、環境の再現性が高まり、チーム開発での一貫した運用が可能になります。
4-1. 前提条件
- Terraform 1.0以上インストール済み
- AWS CLI設定済み(
aws configure実行済み) - 以下のIAM権限が付与されていること:
- Lambda の作成・実行
- IAM ロール・ポリシーの作成
- SQS キューの作成・操作
- SNS トピックの作成・サブスクライブ
- Step Functions ステートマシンの作成・実行
- CloudWatch Logs の作成
4-2. ディレクトリ構成
sf-callback/├── main.tf├── variables.tf├── outputs.tf├── lambda/│├── submit_expense.py│└── process_expense.py└── statemachine/ └── definition.json.tpl作業ディレクトリを作成します:
mkdir -p sf-callback/lambda sf-callback/statemachinecd sf-callback4-3. Lambda ソースコード
Section 3(コンソール版)と同じロジック構成の Lambda 関数です。
lambda/submit_expense.py
経費申請を受け付け、申請情報をそのまま返す関数です。
import jsonimport loggingfrom datetime import datetime, timezonelogger = logging.getLogger()logger.setLevel(logging.INFO)def lambda_handler(event, context): """ 経費申請受付 Lambda 受け取った申請情報を検証し、タイムスタンプを付与して返す """ logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}") expense_id = event.get("expense_id") amount = event.get("amount") description = event.get("description", "") applicant = event.get("applicant") # 基本バリデーション if not expense_id or amount is None or not applicant: raise ValueError("expense_id, amount, applicant は必須パラメータです") if amount <= 0: raise ValueError("amount は正の数値である必要があります") submitted_at = datetime.now(timezone.utc).isoformat() result = { "expense_id": expense_id, "amount": amount, "description": description, "applicant": applicant, "submitted_at": submitted_at, "status": "SUBMITTED", } logger.info(f"申請受付完了: {json.dumps(result, ensure_ascii=False)}") return resultlambda/process_expense.py
承認済みの経費を処理する関数です。
import jsonimport loggingfrom datetime import datetime, timezonelogger = logging.getLogger()logger.setLevel(logging.INFO)def lambda_handler(event, context): """ 経費処理 Lambda 承認済みの経費申請を処理し、処理結果を返す """ logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}") expense_id = event.get("expense_id") amount = event.get("amount") approver = event.get("approver") if not expense_id or amount is None or not approver: raise ValueError("expense_id, amount, approver は必須パラメータです") processed_at = datetime.now(timezone.utc).isoformat() result = { "expense_id": expense_id, "amount": amount, "approver": approver, "processed_at": processed_at, "status": "PROCESSED", "payment_scheduled": True, } logger.info(f"経費処理完了: {json.dumps(result, ensure_ascii=False)}") return result4-4. statemachine/definition.json.tpl
コンソール版(Section 3)の Step D と同じ ASL 構造です。
Terraform の templatefile() 関数を使用し、${...} 形式のプレースホルダーを実際の ARN・URL に置換します。
templatefile() のしくみ
# main.tf での使用例definition = templatefile("statemachine/definition.json.tpl", { submit_expense_arn = aws_lambda_function.submit_expense.arn, process_expense_arn = aws_lambda_function.process_expense.arn, sqs_queue_url = aws_sqs_queue.expense_approval.url, sns_topic_arn = aws_sns_topic.expense_notification.arn})| プレースホルダー | 置換される値 |
|---|---|
${submit_expense_arn} | submit_expense Lambda の ARN |
${process_expense_arn} | process_expense Lambda の ARN |
${sqs_queue_url} | SQS キューの URL |
${sns_topic_arn} | SNS トピックの ARN |
statemachine/definition.json.tpl の内容
{ "Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)", "StartAt": "SubmitExpense", "States": { "SubmitExpense": {"Type": "Task","Resource": "${submit_expense_arn}","Parameters": { "expense_id.$": "$.expense_id", "amount.$": "$.amount", "description.$": "$.description", "applicant.$": "$.applicant"},"ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": { "QueueUrl": "${sqs_queue_url}", "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount", "applicant.$": "$.applicant", "message": "経費申請の承認をお願いします" }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [ { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "NotifyTimeout" }],"Next": "RouteApproval" }, "RouteApproval": {"Type": "Choice","Choices": [ { "Variable": "$.approval.approved", "BooleanEquals": true, "Next": "ProcessExpense" }],"Default": "NotifyRejected" }, "ProcessExpense": {"Type": "Task","Resource": "${process_expense_arn}","Parameters": { "expense_id.$": "$.expense_id", "amount.$": "$.amount", "approver.$": "$.approval.approver"},"ResultPath": "$.processing","Next": "NotifyApproved" }, "NotifyApproved": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": { "TopicArn": "${sns_topic_arn}", "Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)", "Subject": "経費申請 承認完了通知"},"ResultPath": null,"Next": "ExpenseApproved" }, "NotifyRejected": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": { "TopicArn": "${sns_topic_arn}", "Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)", "Subject": "経費申請 却下通知"},"ResultPath": null,"Next": "ExpenseRejected" }, "NotifyTimeout": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": { "TopicArn": "${sns_topic_arn}", "Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)", "Subject": "経費申請 承認タイムアウト"},"ResultPath": null,"Next": "ExpenseTimeout" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseRejected": {"Type": "Fail","Error": "ExpenseRejected","Cause": "経費申請が却下されました" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" } }}ポイント: コンソール版 Step D と ASL 構造が完全一致しています。ステート名・遷移・Parameters・HeartbeatSeconds・TimeoutSeconds・Catch のすべてが同一です。Terraform では
${...}部分のみ動的に補間されます。
4-5. variables.tf
variable "aws_region" { description = "AWSリージョン" default = "ap-northeast-1"}variable "project_name" { description = "プロジェクト名(リソース命名プレフィックスに使用)" default = "expense-approval"}variable "notification_email" { description = "承認/却下/タイムアウト通知先メールアドレス" type = string}4-6. main.tf(完全なコード)
terraform { required_version = ">= 1.0" required_providers { aws = {source = "hashicorp/aws"version = "~> 5.0" } archive = {source = "hashicorp/archive"version = "~> 2.0" } }}provider "aws" { region = var.aws_region}data "aws_caller_identity" "current" {}data "aws_region" "current" {}# ===================================================# Lambda ZIPパッケージ生成# ===================================================data "archive_file" "submit_expense" { type = "zip" source_file = "${path.module}/lambda/submit_expense.py" output_path = "${path.module}/.terraform/tmp/submit_expense.zip"}data "archive_file" "process_expense" { type = "zip" source_file = "${path.module}/lambda/process_expense.py" output_path = "${path.module}/.terraform/tmp/process_expense.zip"}# ===================================================# IAM ロール: Lambda 実行ロール# ===================================================resource "aws_iam_role" "lambda_role" { name = "${var.project_name}-lambda-role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "lambda.amazonaws.com" }} ] })}resource "aws_iam_role_policy_attachment" "lambda_basic" { role = aws_iam_role.lambda_role.name policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"}# ===================================================# IAM ロール: Step Functions 実行ロール# ===================================================resource "aws_iam_role" "sf_execution_role" { name = "${var.project_name}-sf-execution-role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "states.amazonaws.com" }} ] })}resource "aws_iam_role_policy" "sf_policy" { name = "${var.project_name}-sf-policy" role = aws_iam_role.sf_execution_role.id policy = jsonencode({ Version = "2012-10-17" Statement = [{ Sid = "InvokeLambda" Effect = "Allow" Action = ["lambda:InvokeFunction"] Resource = [ aws_lambda_function.submit_expense.arn, aws_lambda_function.process_expense.arn ]},{ Sid = "SendSQSMessage" Effect = "Allow" Action = ["sqs:SendMessage"] Resource = [aws_sqs_queue.expense_approval.arn]},{ Sid = "PublishSNS" Effect = "Allow" Action = ["sns:Publish"] Resource = [aws_sns_topic.expense_notification.arn]},{ Sid = "CloudWatchLogs" Effect = "Allow" Action = [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ] Resource = ["*"]} ] })}# ===================================================# CloudWatch Logs グループ# ===================================================resource "aws_cloudwatch_log_group" "submit_expense" { name = "/aws/lambda/${var.project_name}-submit-expense" retention_in_days = 7}resource "aws_cloudwatch_log_group" "process_expense" { name = "/aws/lambda/${var.project_name}-process-expense" retention_in_days = 7}# ===================================================# Lambda 関数# ===================================================resource "aws_lambda_function" "submit_expense" { filename= data.archive_file.submit_expense.output_path function_name = "${var.project_name}-submit-expense" role = aws_iam_role.lambda_role.arn handler = "submit_expense.lambda_handler" runtime = "python3.12" source_code_hash = data.archive_file.submit_expense.output_base64sha256 depends_on = [ aws_iam_role_policy_attachment.lambda_basic, aws_cloudwatch_log_group.submit_expense ]}resource "aws_lambda_function" "process_expense" { filename= data.archive_file.process_expense.output_path function_name = "${var.project_name}-process-expense" role = aws_iam_role.lambda_role.arn handler = "process_expense.lambda_handler" runtime = "python3.12" source_code_hash = data.archive_file.process_expense.output_base64sha256 depends_on = [ aws_iam_role_policy_attachment.lambda_basic, aws_cloudwatch_log_group.process_expense ]}# ===================================================# SQS キュー# ===================================================resource "aws_sqs_queue" "expense_approval" { name = "${var.project_name}-approval" # VisibilityTimeout は HeartbeatSeconds(3600)より短くしないこと # ここでは 300 秒(5分)に設定(承認者がメッセージを処理する想定時間) visibility_timeout_seconds = 300 message_retention_seconds = 86400 # 1日}# ===================================================# SNS トピック&メールサブスクリプション# ===================================================resource "aws_sns_topic" "expense_notification" { name = "${var.project_name}-notification"}resource "aws_sns_topic_subscription" "email" { topic_arn = aws_sns_topic.expense_notification.arn protocol = "email" endpoint = var.notification_email}# ===================================================# Step Functions ステートマシン# ===================================================resource "aws_sfn_state_machine" "expense_approval" { name = "${var.project_name}-workflow" role_arn = aws_iam_role.sf_execution_role.arn definition = templatefile("statemachine/definition.json.tpl", { submit_expense_arn = aws_lambda_function.submit_expense.arn, process_expense_arn = aws_lambda_function.process_expense.arn, sqs_queue_url = aws_sqs_queue.expense_approval.url, sns_topic_arn = aws_sns_topic.expense_notification.arn }) depends_on = [ aws_iam_role_policy.sf_policy ]}4-7. outputs.tf
output "state_machine_arn" { description = "Step Functions ステートマシンの ARN" value = aws_sfn_state_machine.expense_approval.arn}output "sqs_queue_url" { description = "承認待ち SQS キューの URL" value = aws_sqs_queue.expense_approval.url}output "sns_topic_arn" { description = "通知用 SNS トピックの ARN" value = aws_sns_topic.expense_notification.arn}output "submit_expense_lambda_arn" { description = "submit_expense Lambda の ARN" value = aws_lambda_function.submit_expense.arn}output "process_expense_lambda_arn" { description = "process_expense Lambda の ARN" value = aws_lambda_function.process_expense.arn}4-8. デプロイ手順
ファイルの準備ができたら、以下の順番でデプロイします。
# 1. 初期化(プロバイダーのダウンロード)terraform init# 2. 実行計画を確認terraform plan -var="notification_email=your@email.com"# 3. リソースを作成terraform apply -var="notification_email=your@email.com"terraform apply 実行後、SNS サブスクリプションの確認メールが届きます。メール内の「Confirm subscription」リンクをクリックしてください。これをしないと通知メールが届きません。
terraform apply が成功すると、各リソースの ARN・URL が出力されます:
Outputs:state_machine_arn = "arn:aws:states:ap-northeast-1:123456789012:stateMachine:expense-approval-workflow"sqs_queue_url= "https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-approval"sns_topic_arn= "arn:aws:sns:ap-northeast-1:123456789012:expense-approval-notification"submit_expense_lambda_arn= "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-submit-expense"process_expense_lambda_arn = "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-process-expense"4-9. 動作確認(CLI)
Step 1: 経費申請を実行する
# ステートマシン ARN を取得STATE_MACHINE_ARN=$(terraform output -raw state_machine_arn)SQS_QUEUE_URL=$(terraform output -raw sqs_queue_url)# 経費申請を開始EXECUTION=$(aws stepfunctions start-execution \ --state-machine-arn "$STATE_MACHINE_ARN" \ --input '{ "expense_id": "EXP-TF-001", "amount": 50000, "description": "出張交通費", "applicant": "yamada.taro" }')EXECUTION_ARN=$(echo "$EXECUTION" | jq -r '.executionArn')echo "実行ARN: $EXECUTION_ARN"Step 2: SQS からタスクトークンを取得する
ワークフローが WaitForApproval ステートに到達すると、SQS にメッセージが届きます。
# SQS メッセージを受信(タスクトークンが含まれる)MESSAGE=$(aws sqs receive-message \ --queue-url "$SQS_QUEUE_URL" \ --max-number-of-messages 1)echo "$MESSAGE" | jq '.'# タスクトークンを取り出すTASK_TOKEN=$(echo "$MESSAGE" | jq -r '.Messages[0].Body | fromjson | .task_token')echo "タスクトークン: $TASK_TOKEN"# SQS からメッセージを削除(受信後は必ず削除する)RECEIPT_HANDLE=$(echo "$MESSAGE" | jq -r '.Messages[0].ReceiptHandle')aws sqs delete-message \ --queue-url "$SQS_QUEUE_URL" \ --receipt-handle "$RECEIPT_HANDLE"Step 3-A: 承認する
aws stepfunctions send-task-success \ --task-token "$TASK_TOKEN" \ --task-output '{"approved": true, "approver": "manager.suzuki"}'承認後の流れ:
1. RouteApproval → ProcessExpense → NotifyApproved
2. SNS 経由でメール通知が届く
3. ExpenseApproved(Succeed)で正常終了
Step 3-B: 却下する
aws stepfunctions send-task-success \ --task-token "$TASK_TOKEN" \ --task-output '{"approved": false, "approver": "manager.suzuki"}'却下後の流れ:
1. RouteApproval → NotifyRejected
2. SNS 経由でメール通知が届く
3. ExpenseRejected(Fail)で終了
実行状態の確認
# 実行状態を確認aws stepfunctions describe-execution \ --execution-arn "$EXECUTION_ARN" \ | jq '{status: .status, startDate: .startDate, stopDate: .stopDate}'後片付け
確認が終わったらリソースを削除します:
terraform destroy -var="notification_email=your@email.com"注意:
terraform destroyは作成したすべての AWS リソースを削除します。実行前に内容を確認してください。
まとめ: コンソール版との対応関係
| コンソール版(Section 3) | Terraform(Section 4) |
|---|---|
| Step D の ASL を手動入力 | definition.json.tpl として管理 |
| Lambda を手動作成 | aws_lambda_function リソース |
| SQS を手動作成 | aws_sqs_queue リソース |
| SNS を手動作成 | aws_sns_topic リソース |
| IAM を手動設定 | aws_iam_role / aws_iam_role_policy |
コンソール版 Step D と ASL 構造が完全一致しています。 ステート名・遷移・Parameters・HeartbeatSeconds(3600)・TimeoutSeconds(86400)・Catch の構成はすべて同一であり、${...} のプレースホルダー部分のみが Terraform の templatefile() によって実際の ARN・URL に置換されます。
Section 5: 実践Tips(Callbackパターン設計ガイド)
この Section では、Callbackパターンを実務で使う際のTipsと注意点を解説します。
5-1. タスクトークンの寿命と安全な保管
寿命: タスクトークンは最大1年間有効だが、TimeoutSeconds で短縮することを強く推奨。
安全な保管場所(重要度順):
| 保管場所 | 用途 | 注意点 |
|---|---|---|
| DynamoDB | 標準的な保管場所(expense_idをキーに保存) | TTLを設定して自動削除推奨 |
| SSM Parameter Store | 短期保管・設定値と一緒に管理 | SecureStringで暗号化 |
| SQSメッセージ本文 | 配信と同時に受け取るシンプルな方法 | メッセージ可視性タイムアウトに注意 |
DynamoDBでのトークン保管例(Lambda内):
import boto3import osdef save_task_token(expense_id: str, task_token: str) -> None: dynamodb = boto3.resource("dynamodb") table = dynamodb.Table(os.environ["EXPENSE_TABLE"]) table.update_item( Key={"expense_id": expense_id}, UpdateExpression="SET task_token = :token, #st = :status", ExpressionAttributeNames={"#st": "status"}, ExpressionAttributeValues={":token": task_token, ":status": "pending_approval"} )5-2. 再送・リトライ時のべき等性設計
問題: SQSのメッセージは少なくとも1回配信(at-least-once delivery)。
同じタスクトークンで SendTaskSuccess が複数回呼ばれた場合、2回目以降はエラーになる。
べき等性の実装パターン:
import boto3import jsondef handle_approval(expense_id: str, approved: bool, approver: str) -> dict: dynamodb = boto3.resource("dynamodb") table = dynamodb.Table("expense-requests") # べき等性チェック: 既に処理済みか確認 item = table.get_item(Key={"expense_id": expense_id})["Item"] if item.get("status") not in ("pending_approval",): print(f"既に処理済み: expense_id={expense_id}, status={item['status']}") return {"already_processed": True} task_token = item["task_token"] # ステータスを更新してから SendTaskSuccess(二重送信防止) table.update_item( Key={"expense_id": expense_id}, UpdateExpression="SET #st = :status", ConditionExpression="attribute_exists(task_token)", ExpressionAttributeNames={"#st": "status"}, ExpressionAttributeValues={":status": "processing"} ) sf_client = boto3.client("stepfunctions") sf_client.send_task_success( taskToken=task_token, output=json.dumps({"approved": approved, "approver": approver}) ) return {"sent": True}ポイント: ステータスを processing に更新してから SendTaskSuccess を呼ぶことで、並列実行やリトライによる二重送信を防止する。
5-3. API Gateway + Lambda で Webベース承認UIを作るパターン(概念のみ)
CLIではなくWebブラウザから承認するパターン:
① SF: タスクトークンをDynamoDBに保存 + メール/Slack通知(承認URLを含む)② 承認者: ブラウザでURLにアクセス → 承認/却下ボタンをクリック③ API GW + Lambda: expense_idからDynamoDBでトークンを取得 → SendTaskSuccess/Failure④ SF: 再開承認URL例: https://api.example.com/approve?expense_id=EXP-001&action=approve
このパターンにより、エンジニア以外の承認者(マネージャーや経理担当者)もブラウザから承認作業を行えるようになる。
5-4. HeartbeatSeconds の活用パターン
長時間処理中の「生存確認」用途として HeartbeatSeconds を活用する:
import boto3import timedef long_running_process_with_heartbeat(task_token: str) -> None: sf_client = boto3.client("stepfunctions") for i in range(10): # 長時間処理のシミュレーション # 処理ステップ(例: 5分かかる処理) time.sleep(300) # Heartbeat送信(HeartbeatSecondsのタイマーをリセット) sf_client.send_task_heartbeat(taskToken=task_token) print(f"Heartbeat送信 ({i+1}/10)") # 処理完了 sf_client.send_task_success( taskToken=task_token, output='{"completed": true}' )使い分けの目安:
| 設定 | 役割 | 推奨値 |
|---|---|---|
TimeoutSeconds | ステップ全体のタイムアウト | 処理の最大所要時間 × 1.5 |
HeartbeatSeconds | 生存確認の間隔 | Heartbeat送信間隔 × 2 |
HeartbeatSeconds を設定しておくと、処理が途中でフリーズした場合に States.HeartbeatTimeout エラーとして検出でき、Catch/Retry で適切に対処できる。
Section 6: ハンズオン後の削除手順
6-1. コスト注意事項
⚠️ 放置するとSQSメッセージ、DynamoDBストレージなどで課金が発生する可能性があります。
| リソース | 月額目安 | 備考 |
|---|---|---|
| Lambda | 無料枠内(本ハンズオン程度) | 100万リクエスト/月まで無料 |
| Step Functions | 無料枠内(本ハンズオン程度) | 4,000回の状態遷移まで無料 |
| SQS | 無料枠内(本ハンズオン程度) | 100万リクエスト/月まで無料 |
| SNS | 無料枠内(本ハンズオン程度) | 100万件まで無料 |
| DynamoDB | ~$0.25/GB/月 | ストレージ課金に注意 |
| CloudWatch Logs | ~$0.50/GB | ロググループを削除で停止 |
6-2. Terraformで構築した場合
以下のコマンド1つで全リソースを削除できる:
terraform destroy -var="notification_email=your@email.com"
terraform destroyを実行すると、terraform applyで作成した全リソースが削除される。削除前に確認プロンプトが表示されるので、yesと入力する。
Terraformで管理されない CloudWatch Logs は手動で削除が必要:
aws logs delete-log-group --log-group-name /aws/lambda/submit-expenseaws logs delete-log-group --log-group-name /aws/lambda/process-expense6-3. コンソールで構築した場合の削除チェックリスト
以下の順番で削除する(依存関係に注意):
- [ ] Step Functions ステートマシンを削除(コンソール: Step Functions → ステートマシン)
expense-callback-step-a、expense-callback-step-b、expense-callback-step-c、expense-callback-final- [ ] Lambda 関数を削除(コンソール: Lambda → 関数)
submit-expense、process-expense- [ ] SQS キューを削除(コンソール: SQS → キュー)
expense-approval-queue- [ ] SNS トピックを削除(コンソール: SNS → トピック)
expense-notification(サブスクリプションも自動削除)- [ ] DynamoDB テーブルを削除(コンソール: DynamoDB → テーブル)
expense-requests- [ ] IAM ロールと付随するポリシーを削除(コンソール: IAM → ロール)
sf-expense-execution-role- [ ] CloudWatch Logs ロググループを削除(コンソール: CloudWatch → ロールグループ)
/aws/lambda/submit-expense、/aws/lambda/process-expense
Section 7: まとめと次のステップ
7-1. この記事で学んだこと
このハンズオンを通じて、以下の内容を実践的に習得した:
.waitForTaskTokenによるCallbackパターンの仕組み — トークン発行→停止→返送→再開のライフサイクル.syncvs.waitForTaskTokenの使い分け — 同期的なECS/Lambdaタスク実行と非同期の人間承認フローの違いSendTaskSuccess/SendTaskFailure/SendTaskHeartbeatAPI の使い方 — AWS CLI と Python SDKの両方で操作HeartbeatSeconds/TimeoutSecondsのCallback固有の設定 — 生存確認と全体タイムアウトの使い分け- AWS CLIを使った手動承認体験 — 「承認ボタン」の代わりにCLIコマンドで承認する感覚をつかむ
- 段階的構築(Step A→B→C→D)でCallback機能を段階的に拡張 — 基本構造から始めてDynamoDB統合・SQS連携・人間承認まで積み上げ
- タスクトークンの安全な保管(DynamoDB)とべき等性設計 — 実務で使える堅牢な実装パターン
7-2. 次のステップ
Callbackパターンをマスターした後は、以下のテーマに進むとStep Functionsの理解がさらに深まる:
- Distributed Map(大規模並列バッチ処理) — S3上の大量ファイルを並列処理するパターン — 第6弾予告
- Express vs Standard Workflow の使い分け — 高頻度処理とコスト最適化のトレードオフ — 第7弾予告
- EventBridge + Step Functions の連携 — イベント駆動アーキテクチャでStep Functionsを自動起動するパターン
7-3. シリーズリンク
本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第5回です。
シリーズ一覧:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶRetry/Catch/Timeout
– 第4回: Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
– 第5回: 本記事(Callbackパターン完全ガイド)
7-4. 参考リンク
- Amazon States Language —
.waitForTaskToken SendTaskSuccessAPI リファレンス- Step Functions Callback パターンの例
- SQS 可視性タイムアウト