Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフローのハンズオン

目次

Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー

公開日: 2026-04-12
難易度: 中級〜上級
所要時間: 約120分
シリーズ: [Step Functions シリーズ 第5回]

この記事で学ぶこと
– .waitForTaskToken によるCallback パターンの仕組み(.sync との違い)
– タスクトークンのライフサイクル(発行→SQS送信→停止→返送→再開)
– AWS CLIで手動SendTaskSuccessを叩く「承認ボタン」体験
– 段階的構築(Step A→B→C→D): 基本Callback→タイムアウト→承認/却下分岐→SNS通知
– Terraform での完全構築(コンソール版とASL完全一致)

シリーズ前回記事:
– 第1回: AWS Step Functions 入門
– 第2回: ECS × Step Functions 入門
– 第3回: Step Functions エラーハンドリング完全ガイド
– 第4回: Step Functions 入出力データフロー制御完全ガイド


目次

  1. Callbackパターン(.waitForTaskToken)概念編
  2. アーキテクチャ解説
  3. AWSコンソールでのハンズオン
  4. Terraformでの構築
  5. 実践Tips(Callbackパターン設計ガイド)
  6. ハンズオン後の削除手順
  7. まとめと次のステップ

Section 1: Callbackパターン(.waitForTaskToken)概念編

1-1. Callbackパターンとは

AWS Step Functions には、外部リソースとの連携方法として .sync(同期).waitForTaskToken(Callback) の2つの統合パターンがある。

通常の .sync パターンでは、Step Functions が Lambda や ECS などを呼び出し、処理完了まで自動的に待機する。これは「Step Functions 側が完了を検知する」モデルだ。

一方、.waitForTaskToken(Callbackパターン) は「外部システムが完了を通知する」モデルである。Step Functions はタスクトークンと呼ばれる一意の識別子を発行し、それを外部システムに渡したうえで実行を 停止(pause) する。外部システムは処理完了後に SendTaskSuccess または SendTaskFailure API を呼び出すことで、Step Functions の実行が再開される。

主な用途:
– 人間の承認が必要なフロー(上長承認、コンプライアンスチェックなど)
– 外部 API の非同期応答待ち(決済処理、審査システムなど)
– SQS / API Gateway 経由のマイクロサービス間非同期連携
– Step Functions のタイムアウト内では完了しない長時間処理


1-2. .sync vs .waitForTaskToken 比較表

特性.sync.waitForTaskToken
実行モード同期(完了まで自動待機)非同期(トークン返送まで停止)
最大待機時間1年(Standard Workflow)1年(TimeoutSeconds 設定推奨)
ユースケースLambda / ECS / Glue 等の完了待ち人間承認・外部 API・SQS / API GW 経由の非同期処理
タスクトークン不要必須($$.Task.Token
完了通知SF が自動検知外部システムが SendTaskSuccess / SendTaskFailure を呼ぶ
HeartbeatSeconds一部リソースのみ対応利用可能(長時間処理では推奨)

1-3. タスクトークンのライフサイクル(最重要)

Callback パターンの核心は タスクトークン にある。以下がその一連の流れだ。

① SF: タスクトークン発行($$.Task.Token) ↓② SF: トークンを SQS / API GW 経由で外部システムに送信 → 実行を停止 ↓(外部システムが処理中 — 数秒〜数日)③ 外部: SendTaskSuccess(token, output) または SendTaskFailure(token, error) を呼ぶ ↓④ SF: 停止から再開 → 次のステートへ進む

タスクトークンの重要な特性:

特性内容
形式推測不可能な不透明文字列(内部識別子)
最大長2,048 文字
有効期間最大 1 年(ワークフロー実行期間と同じ)
使用回数一度限り(SendTaskSuccess / SendTaskFailure のどちらか一方のみ有効)
スコープ同一 AWS アカウント内のみ(クロスアカウント不可)
保管責任外部システム側が安全に保管する責任を持つ

ポイント: トークンは $$.Task.Token で Parameters に埋め込む。第4弾(データフロー制御)で学んだ Context Object($$)の知識がここで直接活きる。


1-4. SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat API

SendTaskSuccess — 処理成功を通知

aws stepfunctions send-task-success \  --task-token "TOKEN" \  --task-output '{"approved": true, "approver": "manager"}'
パラメータ必須制限
task-token1〜2,048 文字
task-output最大 262,144 バイト(256 KB)の JSON 文字列

task-output の内容は ResultPath に格納される。

SendTaskFailure — 処理失敗を通知

aws stepfunctions send-task-failure \  --task-token "TOKEN" \  --error "ExpenseRejected" \  --cause "却下理由: 金額が承認限度額を超過"
パラメータ必須制限
task-token1〜2,048 文字
error任意最大 256 文字(Catch の ErrorEquals で捕捉可能)
cause任意最大 32,768 文字

error フィールドは Catch ブロックで ErrorEquals: ["ExpenseRejected"] のように捕捉できる。

SendTaskHeartbeat — 生存確認を送信

aws stepfunctions send-task-heartbeat \  --task-token "TOKEN"

HeartbeatSeconds のタイマーをリセットする「生存確認」用 API。長時間処理中に定期的に呼び出すことで States.HeartbeatTimeout を防ぐ。成功時はレスポンスボディなしの HTTP 200 が返る(実行履歴イベントは生成されない)。

共通エラーコード:

エラー意味
InvalidTokenトークンの形式が不正
TaskDoesNotExistトークンが有効なタスクに対応しない
TaskTimedOutタスクがタイムアウト済みまたは完了済み

1-5. HeartbeatSeconds と TimeoutSeconds の Callback 固有挙動

パラメータ意味タイムアウト時のエラー
TimeoutSecondsタスク全体のタイムアウト(SendTaskSuccess / Failure の最終期限)States.Timeout
HeartbeatSeconds連続する Heartbeat 間隔の最大値States.HeartbeatTimeout

設定例:

{  "Type": "Task",  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",  "HeartbeatSeconds": 3600,  "TimeoutSeconds": 86400,  "Parameters": { "QueueUrl": "<SQS_QUEUE_URL>", "MessageBody": {"task_token.$": "$$.Task.Token" }  }}

重要な設計指針:

  1. HeartbeatSeconds < TimeoutSeconds を必ず守る(例: Heartbeat=3600、Timeout=86400)
  2. HeartbeatSeconds は外部システムが定期的に生存確認を送れる場合のみ設定する
  3. TimeoutSeconds は SQS メッセージの可視性タイムアウトとは独立した別の概念
  4. TimeoutSeconds のデフォルト値は 99,999,999 秒(約 3.17 年)だが、明示的に設定することを推奨
  5. SendTaskHeartbeatHeartbeatSeconds タイマーをリセットするが、TimeoutSeconds は延長しない

1-6. ユースケース一覧

ユースケース説明Callback を使う理由
人間の承認フロー上長承認、リリース承認、コンプライアンスチェック承認まで数時間〜数日かかる可能性
外部 API 非同期応答外部決済処理、外部審査システムの応答待ち外部 API が非同期モデルを採用
サービス間オーケストレーションマイクロサービス間の非同期連携(SQS / EventBridge 経由)受信側サービスが完了タイミングを制御
手動データ検証オペレーターが目視確認後に処理を再開するフロー人間の判断が必要
支払い処理待ちユーザーが支払いフォームに入力するまでの待機ユーザー操作完了まで時間不定

1-7. Context Object でのトークン取得方法

$$.Task.TokenContext Object$$)経由で取得する。第4弾(データフロー制御)で学んだ Context Object の知識がここで直接活きる。

{  "WaitForApproval": { "Type": "Task", "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken", "Parameters": {"QueueUrl": "<SQS_QUEUE_URL>","MessageBody": {  "task_token.$": "$$.Task.Token",  "expense_id.$": "$.expense_id"} }  }}

構文のポイント:

記法意味
"task_token.$".$ サフィックス = 値を動的参照(パス/Context Object)
"$$.Task.Token"$$ = Context Object、.Task.Token = 現在のタスクのトークン
"expense_id.$": "$.expense_id"$ = 入力 JSON から値を参照

注意: $$.Task.TokenParameters および ResultSelector の中でのみ参照可能。InputPathOutputPath では使用できない。


Section 1 まとめ:
.waitForTaskToken は「外部システムが完了を通知する」非同期パターン
– タスクトークンは一度限り有効、最大 2,048 文字、有効期間最大 1 年
HeartbeatSecondsStates.HeartbeatTimeout)と TimeoutSecondsStates.Timeout)は独立したタイムアウト機構
$$.Task.TokenParameters / ResultSelector 内でのみ参照可能


Section 2: アーキテクチャ解説

2-1. ハンズオンシナリオ概要

本ハンズオンでは 経費承認ワークフロー をシナリオとして使用します。社員が経費を申請し、上長が承認または却下し、結果をメールで通知するという実務でよく見られる非同期承認フローを AWS Step Functions の Callbackパターン(.waitForTaskToken)で実装します。

ワークフローの概要

フェーズ処理担当コンポーネント
申請受付経費情報をDynamoDBに保存し、承認依頼をSQSへ送信SubmitExpense Lambda
承認待機タスクトークンを発行し実行を停止WaitForApproval(Callback)
上長確認SQSからメッセージを取得し、CLIで承認/却下を返送承認者(SQS + SF API)
分岐処理承認/却下/タイムアウトで後続ステートへ分岐RouteApproval(Choice)
完了通知申請者へSNS経由でメール通知NotifyApproved/Rejected/Timeout

各コンポーネントの役割

  • Step Functions State Machine — ワークフロー全体のオーケストレーター。.waitForTaskToken を使い、承認者の返答まで実行を停止する
  • SQS キュー — タスクトークンを埋め込んだメッセージを承認者へ配送するメッセージバッファ
  • DynamoDB — 経費申請データ(申請者、金額、申請日時)を永続化
  • SNS — 承認/却下/タイムアウト通知を申請者へメール配信
  • Lambda — 経費受付処理と経理システムへの反映処理を担当

2-2. 全体アーキテクチャ

全体アーキテクチャ

上図は経費承認ワークフロー全体のアーキテクチャを示しています。

ワークフローは Lambda または EventBridge Scheduler によってトリガーされ、Step Functions State Machine が処理を開始します。State Machine 内部は縦に配置された5つの主要ステートで構成されます。

Callbackの核心: WaitForApproval ステートは ★停止・待機ポイント です。このステートに到達すると Step Functions はタスクトークンを生成し、SQSへメッセージを送信した後、外部から SendTaskSuccess または SendTaskFailure が呼ばれるまで実行を停止します。この待機中にも Step Functions の状態は保持され、課金は発生しません(Standard Workflow の場合)。

承認者はSQSキューからメッセージを取得し、タスクトークンを使って aws stepfunctions send-task-success を実行することでワークフローを再開させます。


2-3. Callbackフロー詳解

Callbackフロー

Callbackパターンのデータフローを上段(発行・送信・停止)と下段(承認・返送・再開)に分けて示しています。

タスクトークンの発行

State Definition で .waitForTaskToken を指定したリソースが呼ばれると、Step Functions は自動的にタスクトークン(約1KBの不透明な文字列)を生成します。このトークンは $$.Task.Token という組み込み変数で参照できます。

"WaitForApproval": {  "Type": "Task",  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",  "Parameters": { "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789/approval-queue", "MessageBody": {"TaskToken.$": "$$.Task.Token","ExpenseId.$": "$.expenseId","Amount.$": "$.amount" }  },  "HeartbeatSeconds": 86400,  "Next": "RouteApproval"}

停止と待機

SQSへのメッセージ送信が完了した時点で、ステートの実行は停止します。State Machine は RUNNING 状態を維持したまま、タスクトークンが返送されるのを待ちます。Standard Workflow では最大1年間待機でき、HeartbeatSeconds でハートビートタイムアウトを設定できます。

トークンの返送と再開

承認者はSQSメッセージからタスクトークンを取得し、以下のいずれかを実行します:

コマンド意味後続動作
send-task-success承認(結果JSONを渡す)RouteApprovalステートへ進む
send-task-failureシステムエラーエラーハンドラへ
# 承認例aws stepfunctions send-task-success \  --task-token "AQC..." \  --task-output '{"approved": true, "approver": "manager-a", "comment": "承認します"}'# 却下例aws stepfunctions send-task-success \  --task-token "AQC..." \  --task-output '{"approved": false, "approver": "manager-a", "comment": "金額が上限超過"}'

2-4. 承認/却下シーケンス

承認シーケンス

承認フロー(通常系)

  1. SF → SubmitExpense Lambda: 経費申請を受け付け、DynamoDBに保存
  2. Lambda → SF: 申請ID・金額などを返却
  3. SF → SQS: タスクトークンを埋め込んだメッセージを送信
  4. SF: ⏸ 実行停止(待機開始)— WaitForApproval ステートで停止
  5. 承認者 → SQS: SQSキューからメッセージを受信(ポーリング or Lambda トリガー)
  6. 承認者 → SF: SendTaskSuccess {"approved": true, "approver": "manager"} を送信
  7. SF: ▶ 実行再開(待機終了)— RouteApproval ステートへ進む
  8. SF → ProcessExpense Lambda: 経理システムへ反映
  9. SF → SNS: 承認完了通知を送信 → 申請者へメール
  10. SF: SUCCEEDED終了

却下フロー

ステップ6で {"approved": false} が返送されると、RouteApproval(Choiceステート)の条件分岐により却下パスに進み、NotifyRejected(SNS)を経てFAILED終了します。

"RouteApproval": {  "Type": "Choice",  "Choices": [ {"Variable": "$.approved","BooleanEquals": true,"Next": "ProcessExpense" }, {"Variable": "$.approved","BooleanEquals": false,"Next": "NotifyRejected" }  ]}

タイムアウトフロー

WaitForApproval ステートに HeartbeatSeconds: 86400(24時間)を設定することで、承認が一定時間行われない場合に States.HeartbeatTimeout エラーが発生します。Catch ブロックで補足し NotifyTimeout(SNS)へ分岐させることで、申請者・申請者管理者へタイムアウト通知を送ります。

"WaitForApproval": {  ...,  "HeartbeatSeconds": 86400,  "Catch": [ {"ErrorEquals": ["States.HeartbeatTimeout"],"Next": "NotifyTimeout" }  ]}

ポイント: HeartbeatSeconds はトークン返送のタイムアウトを制御します。ワークフロー全体の最大実行時間は TimeoutSeconds で別途制御できます。Standard Workflow のデフォルト最大実行時間は1年です。


Section 3: AWSコンソールでのハンズオン

このセクションでは、AWSコンソールを使って経費承認ワークフローを実際に構築します。
「承認ボタンを押す」体験をAWS CLIで行うことが、このハンズオンの核心です。

ステートマシンはStep A→B→C→Dと段階的に機能を追加しながら構築します。最終形(Step D)では、SQS Callback + 承認/却下分岐 + SNS通知が全て揃った完成形を体験できます。


3-1. 前提条件

必要なAWS権限

以下のサービスを操作できるIAM権限が必要です。

サービス必要な権限
AWS Lambda関数の作成・編集・実行
AWS Step Functionsステートマシンの作成・実行
Amazon SQSキューの作成・メッセージ受信
Amazon SNSトピックの作成・サブスクリプション管理
AWS IAMロール・ポリシーの作成
Amazon CloudWatch Logsログの閲覧

AWS CLI の準備

このハンズオンでは、SendTaskSuccess/SendTaskFailure を AWS CLI で実行します。
コンソールからは送信できないため、CLIのインストールと設定が必須です。

# バージョン確認aws --version# aws-cli/2.x.x Python/3.x.x ...# 認証確認aws sts get-caller-identity

認証情報が正しく設定されていれば、アカウントIDとARNが表示されます。


3-2. Lambda 2関数のデプロイ

経費承認ワークフローでは2つのLambda関数を使用します。

関数名役割
submit_expense経費申請を受け付け、ステータスをpendingに設定
process_expense承認後の経理処理(DynamoDB書き込みなど)を実行

submit_expense 関数の作成

コンソール操作手順:

  1. AWSコンソールで Lambda を開く
  2. 「関数の作成」をクリック
  3. 設定値:
  4. 作成方法: 一から作成
  5. 関数名: submit_expense
  6. ランタイム: Python 3.12
  7. アーキテクチャ: x86_64
  8. 「関数の作成」をクリック
  9. コードエディタに以下を貼り付け:
import jsonimport osimport boto3from datetime import datetimedef lambda_handler(event, context): expense_id = event.get("expense_id") amount = event.get("amount", 0) description = event.get("description", "") applicant = event.get("applicant", "unknown") if not expense_id:  raise ValueError("expense_id は必須です") if amount <= 0:  raise ValueError("amountは正の値が必要です") # DynamoDB への保存(オプション — Step Dで有効化) # dynamodb = boto3.resource("dynamodb") # table = dynamodb.Table(os.environ.get("EXPENSE_TABLE", "expense-requests")) # table.put_item(Item={ #  "expense_id": expense_id, #  "amount": amount, #  "description": description, #  "applicant": applicant, #  "status": "pending", #  "submitted_at": datetime.utcnow().isoformat() # }) print(f"経費申請受付: expense_id={expense_id}, amount={amount}, applicant={applicant}") return {  "expense_id": expense_id,  "status": "pending",  "submitted_at": datetime.utcnow().isoformat() }
  1. 「Deploy」をクリックして保存
  2. 関数ARNをメモ(例: arn:aws:lambda:ap-northeast-1:123456789012:function:submit_expense
    <LAMBDA_ARN_SUBMIT_EXPENSE> プレースホルダーに使用

process_expense 関数の作成

同様の手順で2つ目の関数を作成します。

  1. Lambda コンソール → 「関数の作成」
  2. 設定値:
  3. 関数名: process_expense
  4. ランタイム: Python 3.12
  5. コードエディタに以下を貼り付け:
import jsonimport osfrom datetime import datetimedef lambda_handler(event, context): expense_id = event.get("expense_id") amount = event.get("amount", 0) approver = event.get("approver", "unknown") print(f"経理処理実行: expense_id={expense_id}, amount={amount}, approver={approver}") # 経理システムへの連携処理(ハンズオンではモック) return {  "expense_id": expense_id,  "amount": amount,  "approver": approver,  "processed_at": datetime.utcnow().isoformat(),  "accounting_ref": f"ACC-{expense_id}-001" }
  1. 「Deploy」をクリックして保存
  2. 関数ARNをメモ
    <LAMBDA_ARN_PROCESS_EXPENSE> プレースホルダーに使用

3-3. SQSキュー作成

Callbackパターンの中心となるSQSキューを作成します。

コンソール操作手順:

  1. AWSコンソールで Amazon SQS を開く
  2. 「キューの作成」をクリック
  3. 設定値:
  4. タイプ: スタンダード
  5. 名前: expense-approval-queue
  6. 可視性タイムアウト: 300秒(デフォルト30秒から変更推奨)
    > 可視性タイムアウトが短いと、承認者がメッセージを処理中に他のコンシューマーが再取得してしまいます。
  7. その他はデフォルトのまま「キューの作成」をクリック
  8. 作成後、キュー詳細画面から キューURL をメモ
  9. 例: https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-queue
  10. <SQS_QUEUE_URL> プレースホルダーに使用

3-4. SNSトピック作成とメールサブスクリプション

承認/却下/タイムアウトの通知を送るSNSトピックを作成します(Step Dで使用)。

コンソール操作手順:

  1. AWSコンソールで Amazon SNS を開く
  2. 「トピックの作成」をクリック
  3. 設定値:
  4. タイプ: スタンダード
  5. 名前: expense-notification
  6. 「トピックの作成」をクリック
  7. トピックARNをメモ
  8. 例: arn:aws:sns:ap-northeast-1:123456789012:expense-notification
  9. <SNS_TOPIC_ARN> プレースホルダーに使用

メールサブスクリプションの設定:

  1. 作成したトピックの詳細画面 → 「サブスクリプションの作成」
  2. 設定値:
  3. プロトコル: Eメール
  4. エンドポイント: 通知を受け取るメールアドレスを入力
  5. 「サブスクリプションの作成」をクリック
  6. 入力したメールアドレスに確認メールが届く
  7. メール内の 「Confirm subscription」リンクをクリック

    このステップを忘れると、SNS通知がメールに届きません。


3-5. IAMロール作成

Step FunctionsがLambda・SQS・SNSを操作するためのIAMロールを作成します。

コンソール操作手順:

  1. AWSコンソールで IAM を開く → 「ロール」→「ロールを作成」
  2. 信頼されたエンティティの選択:
  3. タイプ: AWSのサービス
  4. ユースケース: 「Step Functions」を選択(リストにない場合は「カスタム信頼ポリシー」を使用)
  5. 信頼ポリシー(カスタム選択時):
    json
    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Principal": {
    "Service": "states.amazonaws.com"
    },
    "Action": "sts:AssumeRole"
    }
    ]}
  6. ロール名: sf-expense-execution-role
  7. ロール作成後、「インラインポリシーを追加」から以下を設定:
{  "Version": "2012-10-17",  "Statement": [ {"Sid": "InvokeLambda","Effect": "Allow","Action": "lambda:InvokeFunction","Resource": [  "<LAMBDA_ARN_SUBMIT_EXPENSE>",  "<LAMBDA_ARN_PROCESS_EXPENSE>"] }, {"Sid": "SendSQSMessage","Effect": "Allow","Action": "sqs:SendMessage","Resource": "<SQS_QUEUE_ARN>" }, {"Sid": "PublishSNS","Effect": "Allow","Action": "sns:Publish","Resource": "<SNS_TOPIC_ARN>" }, {"Sid": "CloudWatchLogs","Effect": "Allow","Action": [  "logs:CreateLogGroup",  "logs:CreateLogDelivery",  "logs:PutLogEvents",  "logs:DescribeLogGroups",  "logs:DescribeLogStreams",  "logs:GetLogDelivery",  "logs:ListLogDeliveries",  "logs:UpdateLogDelivery",  "logs:DeleteLogDelivery",  "logs:PutResourcePolicy",  "logs:DescribeResourcePolicies"],"Resource": "*" }  ]}

SQS_QUEUE_ARN について: キューのARNはキューURL とは異なります。SQSコンソールでキューの詳細を開くと「ARN」フィールドで確認できます(例: arn:aws:sqs:ap-northeast-1:123456789012:expense-approval-queue)。

  1. ポリシー名: sf-expense-policy として保存

3-6. ステートマシンを段階的に構築(記事の核心)

ここからがこのハンズオンの核心です。Step A→D と段階的にステートマシンを構築し、Callbackパターンの仕組みを体験的に理解します。


Step A: 基本Callback — 承認体験を初めて体感する

まず最もシンプルな形でCallbackパターンを体験します。

ステートマシン作成手順:

  1. AWSコンソールで Step Functions を開く
  2. 「ステートマシンの作成」をクリック
  3. 「コードでワークフローを記述」を選択
  4. 以下のASLを貼り付け(<プレースホルダー> を実際の値に置換):
{  "Comment": "経費承認ワークフロー — Step A(基本Callback)",  "StartAt": "SubmitExpense",  "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": {  "QueueUrl": "<SQS_QUEUE_URL>",  "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount"  }},"ResultPath": "$.approval","Next": "ProcessExpense" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","ResultPath": "$.processing","Next": "ExpenseApproved" }, "ExpenseApproved": {"Type": "Succeed" }  }}

$$.Task.Token がCallbackの核心: $$ はStep Functionsのコンテキストオブジェクトへのアクセスです。実行時に一意のトークンが生成され、SQSメッセージに埋め込まれます。このトークンを後でSendTaskSuccessに渡すことで、ワークフローが再開します。

  1. 設定値:
  2. ステートマシン名: expense-approval-step-a
  3. 実行ロール: sf-expense-execution-role
  4. ログ設定: ALL レベル を推奨(待機中の挙動が CloudWatch で確認できます)
  5. 「ステートマシンの作成」をクリック

実行テスト:

  1. 「実行の開始」をクリック
  2. 入力JSONに以下を貼り付け:
{  "expense_id": "EXP-001",  "amount": 50000,  "description": "出張交通費",  "applicant": "yamada.taro"}
  1. 「実行の開始」をクリック

WaitForApproval で止まることを確認:

実行グラフで WaitForApproval ステートが青色(RUNNING)のまま止まっているはずです。
これがCallbackパターンの動作です — Step Functionsは外部からの応答を待って一時停止しています。

SQSからtask_tokenを取得:

  1. SQSコンソール → expense-approval-queue を開く
  2. 「メッセージを送受信」→「メッセージをポーリング」をクリック
  3. 受信したメッセージの本文を確認:
    json
    {
    "task_token": "AQCAAAAKAAAAAAAAADg...(長い文字列)",
    "expense_id": "EXP-001",
    "amount": 50000
    }
  4. task_token の値をコピー(非常に長い文字列です)

★ 承認体験(記事の核心)

AWS CLIで以下を実行します。これが「承認ボタンを押す」体験です:

aws stepfunctions send-task-success \  --task-token "SQSメッセージから取得したtask_token" \  --task-output '{"approved": true, "approver": "manager.suzuki"}'

コマンド実行後、Step Functions コンソールに戻ると:
WaitForApproval → SUCCEEDED
ProcessExpense → 実行中
ExpenseApproved → SUCCEEDED

実行全体が SUCCEEDED になることを確認してください。

これがCallbackパターンの基本動作です: Step Functionsはtask_tokenを発行してSQSにメッセージを送り、外部(今回はCLI)からSendTaskSuccessが届くまで待機します。実際のシステムでは「CLIコマンド」の部分が「承認者がWebアプリで承認ボタンを押す」操作に相当します。


Step B: タイムアウト付き — HeartbeatSeconds / TimeoutSeconds を体験

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け:
{  "Comment": "経費承認ワークフロー — Step B(タイムアウト付き)",  "StartAt": "SubmitExpense",  "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": {  "QueueUrl": "<SQS_QUEUE_URL>",  "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount"  }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [  { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "ExpenseTimeout"  }],"Next": "ProcessExpense" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","ResultPath": "$.processing","Next": "ExpenseApproved" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" }  }}
  1. ステートマシン名: expense-approval-step-b
  2. 「ステートマシンの作成」をクリック

タイムアウト値の意味:

パラメータ意味
HeartbeatSeconds36001時間以内にハートビートがなければタイムアウト
TimeoutSeconds8640024時間以内にSendTaskSuccess/Failureがなければタイムアウト

補足テスト — タイムアウト体験:

本番用ASLのTimeoutSeconds=86400(24時間)は変更せず、別途タイムアウトを体験するための短縮版を使います。新しいステートマシンで TimeoutSeconds30 に変更してテストしてください:

"WaitForApproval": {  "Type": "Task",  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",  "Parameters": { ... },  "HeartbeatSeconds": 30,  "TimeoutSeconds": 30,  "Catch": [ {"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],"ResultPath": "$.error","Next": "ExpenseTimeout" }  ],  "Next": "ProcessExpense"}

SendTaskSuccessを送らずに30秒待つと、States.Timeout エラーが発生し、Catch により ExpenseTimeout(Fail)ステートに遷移します。


Step C: 承認/却下分岐 — SendTaskFailure体験

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け:
{  "Comment": "経費承認ワークフロー — Step C(承認/却下分岐)",  "StartAt": "SubmitExpense",  "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": {  "QueueUrl": "<SQS_QUEUE_URL>",  "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount"  }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [  { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "ExpenseTimeout"  }],"Next": "RouteApproval" }, "RouteApproval": {"Type": "Choice","Choices": [  { "Variable": "$.approval.approved", "BooleanEquals": true, "Next": "ProcessExpense"  }],"Default": "ExpenseRejected" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","ResultPath": "$.processing","Next": "ExpenseApproved" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseRejected": {"Type": "Fail","Error": "ExpenseRejected","Cause": "経費申請が却下されました" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" }  }}
  1. ステートマシン名: expense-approval-step-c

承認テスト(approved: true):

aws stepfunctions send-task-success \  --task-token "TOKEN" \  --task-output '{"approved": true, "approver": "manager.suzuki"}'

RouteApprovalapproved == trueProcessExpenseExpenseApproved (SUCCEEDED)

却下テスト(approved: false):

aws stepfunctions send-task-success \  --task-token "TOKEN" \  --task-output '{"approved": false, "reason": "金額が承認限度額を超過"}'

RouteApprovalDefaultExpenseRejected (FAILED) に遷移します。

SendTaskSuccess で却下を表現する: approved: false を含む正常な応答を返すことで、Choice ステートでルーティングします。これは「処理は正常に完了した(タスクトークンは有効に解決した)が、ビジネスロジック的に却下された」という設計です。

SendTaskFailure 体験:

SendTaskFailure は「外部処理自体が失敗した」場合に使います:

aws stepfunctions send-task-failure \  --task-token "TOKEN" \  --error "ExpenseRejected" \  --cause "却下理由: 経費規程違反"

WaitForApprovalCatchErrorEquals: ["ExpenseRejected"] を追加していれば捕捉できます。
このパターンは「承認システム側でエラーが発生した」ケースに有効です。


Step D: 最終形 — SNS通知付き完成形

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け(<SNS_TOPIC_ARN> を実際のARNに置換):
{  "Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)",  "StartAt": "SubmitExpense",  "States": { "SubmitExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>","Parameters": {  "expense_id.$": "$.expense_id",  "amount.$": "$.amount",  "description.$": "$.description",  "applicant.$": "$.applicant"},"ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": {  "QueueUrl": "<SQS_QUEUE_URL>",  "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount", "applicant.$": "$.applicant", "message": "経費申請の承認をお願いします"  }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [  { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "NotifyTimeout"  }],"Next": "RouteApproval" }, "RouteApproval": {"Type": "Choice","Choices": [  { "Variable": "$.approval.approved", "BooleanEquals": true, "Next": "ProcessExpense"  }],"Default": "NotifyRejected" }, "ProcessExpense": {"Type": "Task","Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>","Parameters": {  "expense_id.$": "$.expense_id",  "amount.$": "$.amount",  "approver.$": "$.approval.approver"},"ResultPath": "$.processing","Next": "NotifyApproved" }, "NotifyApproved": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": {  "TopicArn": "<SNS_TOPIC_ARN>",  "Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)",  "Subject": "経費申請 承認完了通知"},"ResultPath": null,"Next": "ExpenseApproved" }, "NotifyRejected": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": {  "TopicArn": "<SNS_TOPIC_ARN>",  "Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)",  "Subject": "経費申請 却下通知"},"ResultPath": null,"Next": "ExpenseRejected" }, "NotifyTimeout": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": {  "TopicArn": "<SNS_TOPIC_ARN>",  "Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)",  "Subject": "経費申請 承認タイムアウト"},"ResultPath": null,"Next": "ExpenseTimeout" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseRejected": {"Type": "Fail","Error": "ExpenseRejected","Cause": "経費申請が却下されました" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" }  }}
  1. ステートマシン名: expense-approval-final

3つのシナリオをテスト:

① 承認フロー:

aws stepfunctions send-task-success \  --task-token "TOKEN" \  --task-output '{"approved": true, "approver": "manager.suzuki"}'

NotifyApproved → SNS メール受信(件名: 「経費申請 承認完了通知」)→ ExpenseApproved (SUCCEEDED)

② 却下フロー:

aws stepfunctions send-task-success \  --task-token "TOKEN" \  --task-output '{"approved": false, "reason": "金額超過"}'

NotifyRejected → SNS メール受信(件名: 「経費申請 却下通知」)→ ExpenseRejected (FAILED)

③ タイムアウトフロー(補足テスト):

本番ASLのTimeoutSeconds=86400は変更せず、TimeoutSeconds: 30 の短縮版で確認します:

SendTaskSuccessを送らずに30秒待つ→ States.Timeout 発生→ NotifyTimeout → SNS メール受信(件名: 「経費申請 承認タイムアウト」)→ ExpenseTimeout (FAILED)

States.Format について: Step Functionsの組み込み関数で、動的な文字列を生成します。'経費申請 {} が承認されました', $.expense_id のように、{} が変数の値に置換されます。SNS通知のメッセージを動的に生成するのに便利です。


3-7. CloudWatch Logsで待機→再開の挙動を確認

Step Functionsの実行ログを確認することで、Callbackパターンの内部動作を詳しく把握できます。

ログ確認手順

  1. Step Functionsコンソール → 対象の実行を開く
  2. 「実行の詳細」タブ → 「ログ」セクションを確認
  3. CloudWatch Logsのリンクをクリックして詳細ログを表示

確認すべきイベントの流れ

ログで以下のシーケンスを確認してください:

WaitForApprovalStateEntered ← WaitForApproval ステート開始TaskSubmitted← SQSにメッセージ送信 + Task Token発行(...長い待機時間...)TaskSucceeded← SendTaskSuccess を受信WaitForApprovalStateExited  ← WaitForApproval ステート終了ProcessExpenseStateEntered  ← 次のステートへ遷移

ExecutionHistory APIで待機時間を計測

AWS CLIを使ってプログラマティックに実行履歴を取得し、待機時間を計算できます:

# 実行ARNを確認aws stepfunctions list-executions \  --state-machine-arn "<YOUR_STATE_MACHINE_ARN>" \  --query 'executions[0].executionArn' \  --output text# 実行履歴を取得aws stepfunctions get-execution-history \  --execution-arn "<EXECUTION_ARN>" \  --query 'events[?type==`TaskSubmitted` || type==`TaskSucceeded`].[timestamp,type]' \  --output table

出力例:

-----------------------------------------| timestamp  | type |-----------------------------------------| 2026-04-12T10:00:00Z| TaskSubmitted || 2026-04-12T10:05:23Z| TaskSucceeded |-----------------------------------------

TaskSubmittedTaskSucceeded のタイムスタンプ差が、実際の承認待機時間です。
この例では 5分23秒 承認待ちだったことがわかります。

CloudWatch Metrics でモニタリング

本番環境では以下のメトリクスを監視することを推奨します:

メトリクス説明アラート推奨値
ExecutionsFailed失敗した実行数1以上でアラート
ExecutionsTimedOutタイムアウトした実行数1以上でアラート
ExecutionThrottledスロットリングされた実行数急増時にアラート

まとめ(Section 3)

このセクションでは、経費承認ワークフローを段階的に構築しました。

Step追加した機能学んだこと
A基本Callback$$.Task.Token の仕組み、SendTaskSuccessによる再開
BタイムアウトHeartbeatSeconds/TimeoutSeconds、States.Timeout の Catch
C承認/却下分岐Choice ステートでの approved フラグによるルーティング
DSNS通知States.Format を使った動的メッセージ、3シナリオの完全テスト

Callbackパターンの本質: Step Functionsはtask_tokenを外部に渡し、そのトークンが返ってくるまで待ちます。「承認者がボタンを押す」「外部システムが処理を完了する」「人間が確認する」など、任意の外部イベントでワークフローを再開できるのがCallbackパターンの強みです。

次のSection 4では、このワークフロー全体をTerraformで自動構築します。


Section 4: Terraformでの構築

Section 3のコンソール操作で構築した経費承認ワークフローを、今度はTerraformで再現します。インフラをコードで管理することで、環境の再現性が高まり、チーム開発での一貫した運用が可能になります。


4-1. 前提条件

  • Terraform 1.0以上インストール済み
  • AWS CLI設定済み(aws configure 実行済み)
  • 以下のIAM権限が付与されていること:
  • Lambda の作成・実行
  • IAM ロール・ポリシーの作成
  • SQS キューの作成・操作
  • SNS トピックの作成・サブスクライブ
  • Step Functions ステートマシンの作成・実行
  • CloudWatch Logs の作成

4-2. ディレクトリ構成

sf-callback/├── main.tf├── variables.tf├── outputs.tf├── lambda/│├── submit_expense.py│└── process_expense.py└── statemachine/ └── definition.json.tpl

作業ディレクトリを作成します:

mkdir -p sf-callback/lambda sf-callback/statemachinecd sf-callback

4-3. Lambda ソースコード

Section 3(コンソール版)と同じロジック構成の Lambda 関数です。

lambda/submit_expense.py

経費申請を受け付け、申請情報をそのまま返す関数です。

import jsonimport loggingfrom datetime import datetime, timezonelogger = logging.getLogger()logger.setLevel(logging.INFO)def lambda_handler(event, context): """ 経費申請受付 Lambda 受け取った申請情報を検証し、タイムスタンプを付与して返す """ logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}") expense_id = event.get("expense_id") amount = event.get("amount") description = event.get("description", "") applicant = event.get("applicant") # 基本バリデーション if not expense_id or amount is None or not applicant:  raise ValueError("expense_id, amount, applicant は必須パラメータです") if amount <= 0:  raise ValueError("amount は正の数値である必要があります") submitted_at = datetime.now(timezone.utc).isoformat() result = {  "expense_id": expense_id,  "amount": amount,  "description": description,  "applicant": applicant,  "submitted_at": submitted_at,  "status": "SUBMITTED", } logger.info(f"申請受付完了: {json.dumps(result, ensure_ascii=False)}") return result

lambda/process_expense.py

承認済みの経費を処理する関数です。

import jsonimport loggingfrom datetime import datetime, timezonelogger = logging.getLogger()logger.setLevel(logging.INFO)def lambda_handler(event, context): """ 経費処理 Lambda 承認済みの経費申請を処理し、処理結果を返す """ logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}") expense_id = event.get("expense_id") amount = event.get("amount") approver = event.get("approver") if not expense_id or amount is None or not approver:  raise ValueError("expense_id, amount, approver は必須パラメータです") processed_at = datetime.now(timezone.utc).isoformat() result = {  "expense_id": expense_id,  "amount": amount,  "approver": approver,  "processed_at": processed_at,  "status": "PROCESSED",  "payment_scheduled": True, } logger.info(f"経費処理完了: {json.dumps(result, ensure_ascii=False)}") return result

4-4. statemachine/definition.json.tpl

コンソール版(Section 3)の Step D と同じ ASL 構造です。

Terraform の templatefile() 関数を使用し、${...} 形式のプレースホルダーを実際の ARN・URL に置換します。

templatefile() のしくみ

# main.tf での使用例definition = templatefile("statemachine/definition.json.tpl", {  submit_expense_arn  = aws_lambda_function.submit_expense.arn,  process_expense_arn = aws_lambda_function.process_expense.arn,  sqs_queue_url = aws_sqs_queue.expense_approval.url,  sns_topic_arn = aws_sns_topic.expense_notification.arn})
プレースホルダー置換される値
${submit_expense_arn}submit_expense Lambda の ARN
${process_expense_arn}process_expense Lambda の ARN
${sqs_queue_url}SQS キューの URL
${sns_topic_arn}SNS トピックの ARN

statemachine/definition.json.tpl の内容

{  "Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)",  "StartAt": "SubmitExpense",  "States": { "SubmitExpense": {"Type": "Task","Resource": "${submit_expense_arn}","Parameters": {  "expense_id.$": "$.expense_id",  "amount.$": "$.amount",  "description.$": "$.description",  "applicant.$": "$.applicant"},"ResultPath": "$.submission","Next": "WaitForApproval" }, "WaitForApproval": {"Type": "Task","Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken","Parameters": {  "QueueUrl": "${sqs_queue_url}",  "MessageBody": { "task_token.$": "$$.Task.Token", "expense_id.$": "$.expense_id", "amount.$": "$.amount", "applicant.$": "$.applicant", "message": "経費申請の承認をお願いします"  }},"HeartbeatSeconds": 3600,"TimeoutSeconds": 86400,"ResultPath": "$.approval","Catch": [  { "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"], "ResultPath": "$.error", "Next": "NotifyTimeout"  }],"Next": "RouteApproval" }, "RouteApproval": {"Type": "Choice","Choices": [  { "Variable": "$.approval.approved", "BooleanEquals": true, "Next": "ProcessExpense"  }],"Default": "NotifyRejected" }, "ProcessExpense": {"Type": "Task","Resource": "${process_expense_arn}","Parameters": {  "expense_id.$": "$.expense_id",  "amount.$": "$.amount",  "approver.$": "$.approval.approver"},"ResultPath": "$.processing","Next": "NotifyApproved" }, "NotifyApproved": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": {  "TopicArn": "${sns_topic_arn}",  "Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)",  "Subject": "経費申請 承認完了通知"},"ResultPath": null,"Next": "ExpenseApproved" }, "NotifyRejected": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": {  "TopicArn": "${sns_topic_arn}",  "Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)",  "Subject": "経費申請 却下通知"},"ResultPath": null,"Next": "ExpenseRejected" }, "NotifyTimeout": {"Type": "Task","Resource": "arn:aws:states:::sns:publish","Parameters": {  "TopicArn": "${sns_topic_arn}",  "Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)",  "Subject": "経費申請 承認タイムアウト"},"ResultPath": null,"Next": "ExpenseTimeout" }, "ExpenseApproved": {"Type": "Succeed" }, "ExpenseRejected": {"Type": "Fail","Error": "ExpenseRejected","Cause": "経費申請が却下されました" }, "ExpenseTimeout": {"Type": "Fail","Error": "ApprovalTimeout","Cause": "承認がタイムアウトしました" }  }}

ポイント: コンソール版 Step D と ASL 構造が完全一致しています。ステート名・遷移・Parameters・HeartbeatSeconds・TimeoutSeconds・Catch のすべてが同一です。Terraform では ${...} 部分のみ動的に補間されます。


4-5. variables.tf

variable "aws_region" {  description = "AWSリージョン"  default  = "ap-northeast-1"}variable "project_name" {  description = "プロジェクト名(リソース命名プレフィックスに使用)"  default  = "expense-approval"}variable "notification_email" {  description = "承認/却下/タイムアウト通知先メールアドレス"  type  = string}

4-6. main.tf(完全なコード)

terraform {  required_version = ">= 1.0"  required_providers { aws = {source  = "hashicorp/aws"version = "~> 5.0" } archive = {source  = "hashicorp/archive"version = "~> 2.0" }  }}provider "aws" {  region = var.aws_region}data "aws_caller_identity" "current" {}data "aws_region" "current" {}# ===================================================# Lambda ZIPパッケージ生成# ===================================================data "archive_file" "submit_expense" {  type  = "zip"  source_file = "${path.module}/lambda/submit_expense.py"  output_path = "${path.module}/.terraform/tmp/submit_expense.zip"}data "archive_file" "process_expense" {  type  = "zip"  source_file = "${path.module}/lambda/process_expense.py"  output_path = "${path.module}/.terraform/tmp/process_expense.zip"}# ===================================================# IAM ロール: Lambda 実行ロール# ===================================================resource "aws_iam_role" "lambda_role" {  name = "${var.project_name}-lambda-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{  Action = "sts:AssumeRole"  Effect = "Allow"  Principal = { Service = "lambda.amazonaws.com"  }} ]  })}resource "aws_iam_role_policy_attachment" "lambda_basic" {  role = aws_iam_role.lambda_role.name  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"}# ===================================================# IAM ロール: Step Functions 実行ロール# ===================================================resource "aws_iam_role" "sf_execution_role" {  name = "${var.project_name}-sf-execution-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{  Action = "sts:AssumeRole"  Effect = "Allow"  Principal = { Service = "states.amazonaws.com"  }} ]  })}resource "aws_iam_role_policy" "sf_policy" {  name = "${var.project_name}-sf-policy"  role = aws_iam_role.sf_execution_role.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Sid = "InvokeLambda"  Effect = "Allow"  Action = ["lambda:InvokeFunction"]  Resource = [ aws_lambda_function.submit_expense.arn, aws_lambda_function.process_expense.arn  ]},{  Sid = "SendSQSMessage"  Effect = "Allow"  Action = ["sqs:SendMessage"]  Resource = [aws_sqs_queue.expense_approval.arn]},{  Sid = "PublishSNS"  Effect = "Allow"  Action = ["sns:Publish"]  Resource = [aws_sns_topic.expense_notification.arn]},{  Sid = "CloudWatchLogs"  Effect = "Allow"  Action = [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups"  ]  Resource = ["*"]} ]  })}# ===================================================# CloudWatch Logs グループ# ===================================================resource "aws_cloudwatch_log_group" "submit_expense" {  name  = "/aws/lambda/${var.project_name}-submit-expense"  retention_in_days = 7}resource "aws_cloudwatch_log_group" "process_expense" {  name  = "/aws/lambda/${var.project_name}-process-expense"  retention_in_days = 7}# ===================================================# Lambda 関数# ===================================================resource "aws_lambda_function" "submit_expense" {  filename= data.archive_file.submit_expense.output_path  function_name = "${var.project_name}-submit-expense"  role = aws_iam_role.lambda_role.arn  handler = "submit_expense.lambda_handler"  runtime = "python3.12"  source_code_hash = data.archive_file.submit_expense.output_base64sha256  depends_on = [ aws_iam_role_policy_attachment.lambda_basic, aws_cloudwatch_log_group.submit_expense  ]}resource "aws_lambda_function" "process_expense" {  filename= data.archive_file.process_expense.output_path  function_name = "${var.project_name}-process-expense"  role = aws_iam_role.lambda_role.arn  handler = "process_expense.lambda_handler"  runtime = "python3.12"  source_code_hash = data.archive_file.process_expense.output_base64sha256  depends_on = [ aws_iam_role_policy_attachment.lambda_basic, aws_cloudwatch_log_group.process_expense  ]}# ===================================================# SQS キュー# ===================================================resource "aws_sqs_queue" "expense_approval" {  name  = "${var.project_name}-approval"  # VisibilityTimeout は HeartbeatSeconds(3600)より短くしないこと  # ここでは 300 秒(5分)に設定(承認者がメッセージを処理する想定時間)  visibility_timeout_seconds = 300  message_retention_seconds  = 86400  # 1日}# ===================================================# SNS トピック&メールサブスクリプション# ===================================================resource "aws_sns_topic" "expense_notification" {  name = "${var.project_name}-notification"}resource "aws_sns_topic_subscription" "email" {  topic_arn = aws_sns_topic.expense_notification.arn  protocol  = "email"  endpoint  = var.notification_email}# ===================================================# Step Functions ステートマシン# ===================================================resource "aws_sfn_state_machine" "expense_approval" {  name  = "${var.project_name}-workflow"  role_arn = aws_iam_role.sf_execution_role.arn  definition = templatefile("statemachine/definition.json.tpl", { submit_expense_arn  = aws_lambda_function.submit_expense.arn, process_expense_arn = aws_lambda_function.process_expense.arn, sqs_queue_url = aws_sqs_queue.expense_approval.url, sns_topic_arn = aws_sns_topic.expense_notification.arn  })  depends_on = [ aws_iam_role_policy.sf_policy  ]}

4-7. outputs.tf

output "state_machine_arn" {  description = "Step Functions ステートマシンの ARN"  value = aws_sfn_state_machine.expense_approval.arn}output "sqs_queue_url" {  description = "承認待ち SQS キューの URL"  value = aws_sqs_queue.expense_approval.url}output "sns_topic_arn" {  description = "通知用 SNS トピックの ARN"  value = aws_sns_topic.expense_notification.arn}output "submit_expense_lambda_arn" {  description = "submit_expense Lambda の ARN"  value = aws_lambda_function.submit_expense.arn}output "process_expense_lambda_arn" {  description = "process_expense Lambda の ARN"  value = aws_lambda_function.process_expense.arn}

4-8. デプロイ手順

ファイルの準備ができたら、以下の順番でデプロイします。

# 1. 初期化(プロバイダーのダウンロード)terraform init# 2. 実行計画を確認terraform plan -var="notification_email=your@email.com"# 3. リソースを作成terraform apply -var="notification_email=your@email.com"

terraform apply 実行後、SNS サブスクリプションの確認メールが届きます。メール内の「Confirm subscription」リンクをクリックしてください。これをしないと通知メールが届きません。

terraform apply が成功すると、各リソースの ARN・URL が出力されます:

Outputs:state_machine_arn  = "arn:aws:states:ap-northeast-1:123456789012:stateMachine:expense-approval-workflow"sqs_queue_url= "https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-approval"sns_topic_arn= "arn:aws:sns:ap-northeast-1:123456789012:expense-approval-notification"submit_expense_lambda_arn= "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-submit-expense"process_expense_lambda_arn  = "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-process-expense"

4-9. 動作確認(CLI)

Step 1: 経費申請を実行する

# ステートマシン ARN を取得STATE_MACHINE_ARN=$(terraform output -raw state_machine_arn)SQS_QUEUE_URL=$(terraform output -raw sqs_queue_url)# 経費申請を開始EXECUTION=$(aws stepfunctions start-execution \  --state-machine-arn "$STATE_MACHINE_ARN" \  --input '{ "expense_id": "EXP-TF-001", "amount": 50000, "description": "出張交通費", "applicant": "yamada.taro"  }')EXECUTION_ARN=$(echo "$EXECUTION" | jq -r '.executionArn')echo "実行ARN: $EXECUTION_ARN"

Step 2: SQS からタスクトークンを取得する

ワークフローが WaitForApproval ステートに到達すると、SQS にメッセージが届きます。

# SQS メッセージを受信(タスクトークンが含まれる)MESSAGE=$(aws sqs receive-message \  --queue-url "$SQS_QUEUE_URL" \  --max-number-of-messages 1)echo "$MESSAGE" | jq '.'# タスクトークンを取り出すTASK_TOKEN=$(echo "$MESSAGE" | jq -r '.Messages[0].Body | fromjson | .task_token')echo "タスクトークン: $TASK_TOKEN"# SQS からメッセージを削除(受信後は必ず削除する)RECEIPT_HANDLE=$(echo "$MESSAGE" | jq -r '.Messages[0].ReceiptHandle')aws sqs delete-message \  --queue-url "$SQS_QUEUE_URL" \  --receipt-handle "$RECEIPT_HANDLE"

Step 3-A: 承認する

aws stepfunctions send-task-success \  --task-token "$TASK_TOKEN" \  --task-output '{"approved": true, "approver": "manager.suzuki"}'

承認後の流れ:
1. RouteApprovalProcessExpenseNotifyApproved
2. SNS 経由でメール通知が届く
3. ExpenseApproved(Succeed)で正常終了

Step 3-B: 却下する

aws stepfunctions send-task-success \  --task-token "$TASK_TOKEN" \  --task-output '{"approved": false, "approver": "manager.suzuki"}'

却下後の流れ:
1. RouteApprovalNotifyRejected
2. SNS 経由でメール通知が届く
3. ExpenseRejected(Fail)で終了

実行状態の確認

# 実行状態を確認aws stepfunctions describe-execution \  --execution-arn "$EXECUTION_ARN" \  | jq '{status: .status, startDate: .startDate, stopDate: .stopDate}'

後片付け

確認が終わったらリソースを削除します:

terraform destroy -var="notification_email=your@email.com"

注意: terraform destroy は作成したすべての AWS リソースを削除します。実行前に内容を確認してください。


まとめ: コンソール版との対応関係

コンソール版(Section 3)Terraform(Section 4)
Step D の ASL を手動入力definition.json.tpl として管理
Lambda を手動作成aws_lambda_function リソース
SQS を手動作成aws_sqs_queue リソース
SNS を手動作成aws_sns_topic リソース
IAM を手動設定aws_iam_role / aws_iam_role_policy

コンソール版 Step D と ASL 構造が完全一致しています。 ステート名・遷移・Parameters・HeartbeatSeconds(3600)・TimeoutSeconds(86400)・Catch の構成はすべて同一であり、${...} のプレースホルダー部分のみが Terraform の templatefile() によって実際の ARN・URL に置換されます。


Section 5: 実践Tips(Callbackパターン設計ガイド)

この Section では、Callbackパターンを実務で使う際のTipsと注意点を解説します。

5-1. タスクトークンの寿命と安全な保管

寿命: タスクトークンは最大1年間有効だが、TimeoutSeconds で短縮することを強く推奨。

安全な保管場所(重要度順):

保管場所用途注意点
DynamoDB標準的な保管場所(expense_idをキーに保存)TTLを設定して自動削除推奨
SSM Parameter Store短期保管・設定値と一緒に管理SecureStringで暗号化
SQSメッセージ本文配信と同時に受け取るシンプルな方法メッセージ可視性タイムアウトに注意

DynamoDBでのトークン保管例(Lambda内):

import boto3import osdef save_task_token(expense_id: str, task_token: str) -> None: dynamodb = boto3.resource("dynamodb") table = dynamodb.Table(os.environ["EXPENSE_TABLE"]) table.update_item(  Key={"expense_id": expense_id},  UpdateExpression="SET task_token = :token, #st = :status",  ExpressionAttributeNames={"#st": "status"},  ExpressionAttributeValues={":token": task_token, ":status": "pending_approval"} )

5-2. 再送・リトライ時のべき等性設計

問題: SQSのメッセージは少なくとも1回配信(at-least-once delivery)。
同じタスクトークンで SendTaskSuccess が複数回呼ばれた場合、2回目以降はエラーになる。

べき等性の実装パターン:

import boto3import jsondef handle_approval(expense_id: str, approved: bool, approver: str) -> dict: dynamodb = boto3.resource("dynamodb") table = dynamodb.Table("expense-requests") # べき等性チェック: 既に処理済みか確認 item = table.get_item(Key={"expense_id": expense_id})["Item"] if item.get("status") not in ("pending_approval",):  print(f"既に処理済み: expense_id={expense_id}, status={item['status']}")  return {"already_processed": True} task_token = item["task_token"] # ステータスを更新してから SendTaskSuccess(二重送信防止) table.update_item(  Key={"expense_id": expense_id},  UpdateExpression="SET #st = :status",  ConditionExpression="attribute_exists(task_token)",  ExpressionAttributeNames={"#st": "status"},  ExpressionAttributeValues={":status": "processing"} ) sf_client = boto3.client("stepfunctions") sf_client.send_task_success(  taskToken=task_token,  output=json.dumps({"approved": approved, "approver": approver}) ) return {"sent": True}

ポイント: ステータスを processing に更新してから SendTaskSuccess を呼ぶことで、並列実行やリトライによる二重送信を防止する。

5-3. API Gateway + Lambda で Webベース承認UIを作るパターン(概念のみ)

CLIではなくWebブラウザから承認するパターン:

① SF: タスクトークンをDynamoDBに保存 + メール/Slack通知(承認URLを含む)② 承認者: ブラウザでURLにアクセス → 承認/却下ボタンをクリック③ API GW + Lambda: expense_idからDynamoDBでトークンを取得 → SendTaskSuccess/Failure④ SF: 再開

承認URL例: https://api.example.com/approve?expense_id=EXP-001&action=approve

このパターンにより、エンジニア以外の承認者(マネージャーや経理担当者)もブラウザから承認作業を行えるようになる。

5-4. HeartbeatSeconds の活用パターン

長時間処理中の「生存確認」用途として HeartbeatSeconds を活用する:

import boto3import timedef long_running_process_with_heartbeat(task_token: str) -> None: sf_client = boto3.client("stepfunctions") for i in range(10):  # 長時間処理のシミュレーション  # 処理ステップ(例: 5分かかる処理)  time.sleep(300)  # Heartbeat送信(HeartbeatSecondsのタイマーをリセット)  sf_client.send_task_heartbeat(taskToken=task_token)  print(f"Heartbeat送信 ({i+1}/10)") # 処理完了 sf_client.send_task_success(  taskToken=task_token,  output='{"completed": true}' )

使い分けの目安:

設定役割推奨値
TimeoutSecondsステップ全体のタイムアウト処理の最大所要時間 × 1.5
HeartbeatSeconds生存確認の間隔Heartbeat送信間隔 × 2

HeartbeatSeconds を設定しておくと、処理が途中でフリーズした場合に States.HeartbeatTimeout エラーとして検出でき、Catch/Retry で適切に対処できる。


Section 6: ハンズオン後の削除手順

6-1. コスト注意事項

⚠️ 放置するとSQSメッセージ、DynamoDBストレージなどで課金が発生する可能性があります。

リソース月額目安備考
Lambda無料枠内(本ハンズオン程度)100万リクエスト/月まで無料
Step Functions無料枠内(本ハンズオン程度)4,000回の状態遷移まで無料
SQS無料枠内(本ハンズオン程度)100万リクエスト/月まで無料
SNS無料枠内(本ハンズオン程度)100万件まで無料
DynamoDB~$0.25/GB/月ストレージ課金に注意
CloudWatch Logs~$0.50/GBロググループを削除で停止

6-2. Terraformで構築した場合

以下のコマンド1つで全リソースを削除できる:

terraform destroy -var="notification_email=your@email.com"

terraform destroy を実行すると、terraform apply で作成した全リソースが削除される。削除前に確認プロンプトが表示されるので、yes と入力する。

Terraformで管理されない CloudWatch Logs は手動で削除が必要:

aws logs delete-log-group --log-group-name /aws/lambda/submit-expenseaws logs delete-log-group --log-group-name /aws/lambda/process-expense

6-3. コンソールで構築した場合の削除チェックリスト

以下の順番で削除する(依存関係に注意):

  • [ ] Step Functions ステートマシンを削除(コンソール: Step Functions → ステートマシン)
  • expense-callback-step-aexpense-callback-step-bexpense-callback-step-cexpense-callback-final
  • [ ] Lambda 関数を削除(コンソール: Lambda → 関数)
  • submit-expenseprocess-expense
  • [ ] SQS キューを削除(コンソール: SQS → キュー)
  • expense-approval-queue
  • [ ] SNS トピックを削除(コンソール: SNS → トピック)
  • expense-notification(サブスクリプションも自動削除)
  • [ ] DynamoDB テーブルを削除(コンソール: DynamoDB → テーブル)
  • expense-requests
  • [ ] IAM ロールと付随するポリシーを削除(コンソール: IAM → ロール)
  • sf-expense-execution-role
  • [ ] CloudWatch Logs ロググループを削除(コンソール: CloudWatch → ロールグループ)
  • /aws/lambda/submit-expense/aws/lambda/process-expense

Section 7: まとめと次のステップ

7-1. この記事で学んだこと

このハンズオンを通じて、以下の内容を実践的に習得した:

  • .waitForTaskToken によるCallbackパターンの仕組み — トークン発行→停止→返送→再開のライフサイクル
  • .sync vs .waitForTaskToken の使い分け — 同期的なECS/Lambdaタスク実行と非同期の人間承認フローの違い
  • SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat API の使い方 — AWS CLI と Python SDKの両方で操作
  • HeartbeatSeconds / TimeoutSeconds のCallback固有の設定 — 生存確認と全体タイムアウトの使い分け
  • AWS CLIを使った手動承認体験 — 「承認ボタン」の代わりにCLIコマンドで承認する感覚をつかむ
  • 段階的構築(Step A→B→C→D)でCallback機能を段階的に拡張 — 基本構造から始めてDynamoDB統合・SQS連携・人間承認まで積み上げ
  • タスクトークンの安全な保管(DynamoDB)とべき等性設計 — 実務で使える堅牢な実装パターン

7-2. 次のステップ

Callbackパターンをマスターした後は、以下のテーマに進むとStep Functionsの理解がさらに深まる:

  • Distributed Map(大規模並列バッチ処理) — S3上の大量ファイルを並列処理するパターン — 第6弾予告
  • Express vs Standard Workflow の使い分け — 高頻度処理とコスト最適化のトレードオフ — 第7弾予告
  • EventBridge + Step Functions の連携 — イベント駆動アーキテクチャでStep Functionsを自動起動するパターン

7-3. シリーズリンク


本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第5回です。

シリーズ一覧:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶRetry/Catch/Timeout
– 第4回: Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
– 第5回: 本記事(Callbackパターン完全ガイド)


7-4. 参考リンク

  • Amazon States Language — .waitForTaskToken
  • SendTaskSuccess API リファレンス
  • Step Functions Callback パターンの例
  • SQS 可視性タイムアウト
最新情報をチェックしよう!