AWS Step Functions Callbackパターン完全ガイド:.waitForTaskToken非同期承認ハンズオン

目次

Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー

公開日: 2026-04-12
難易度: 中級〜上級
所要時間: 約120分
シリーズ: [Step Functions シリーズ 第5回]

📋 この記事で学ぶこと

  • .waitForTaskToken によるCallback パターンの仕組み(.sync との違い)
  • タスクトークンのライフサイクル(発行→SQS送信→停止→返送→再開)
  • AWS CLIで手動SendTaskSuccessを叩く「承認ボタン」体験
  • 段階的構築(Step A→B→C→D): 基本Callback→タイムアウト→承認/却下分岐→SNS通知
  • Terraform での完全構築(コンソール版とASL完全一致)

目次

  1. Callbackパターン(.waitForTaskToken)概念編
  2. アーキテクチャ解説
  3. AWSコンソールでのハンズオン
  4. Terraformでの構築
  5. 実践Tips(Callbackパターン設計ガイド)
  6. ハンズオン後の削除手順
  7. まとめと次のステップ

Section 1: Callbackパターン(.waitForTaskToken)概念編

1-1. Callbackパターンとは

AWS Step Functions には、外部リソースとの連携方法として .sync(同期).waitForTaskToken(Callback) の2つの統合パターンがある。

通常の .sync パターンでは、Step Functions が Lambda や ECS などを呼び出し、処理完了まで自動的に待機する。これは「Step Functions 側が完了を検知する」モデルだ。

一方、.waitForTaskToken(Callbackパターン) は「外部システムが完了を通知する」モデルである。Step Functions はタスクトークンと呼ばれる一意の識別子を発行し、それを外部システムに渡したうえで実行を 停止(pause) する。外部システムは処理完了後に SendTaskSuccess または SendTaskFailure API を呼び出すことで、Step Functions の実行が再開される。

主な用途:
– 人間の承認が必要なフロー(上長承認、コンプライアンスチェックなど)
– 外部 API の非同期応答待ち(決済処理、審査システムなど)
– SQS / API Gateway 経由のマイクロサービス間非同期連携
– Step Functions のタイムアウト内では完了しない長時間処理


1-2. .sync vs .waitForTaskToken 比較表

特性.sync.waitForTaskToken
実行モード同期(完了まで自動待機)非同期(トークン返送まで停止)
最大待機時間1年(Standard Workflow)1年(TimeoutSeconds 設定推奨)
ユースケースLambda / ECS / Glue 等の完了待ち人間承認・外部 API・SQS / API GW 経由の非同期処理
タスクトークン不要必須($$.Task.Token
完了通知SF が自動検知外部システムが SendTaskSuccess / SendTaskFailure を呼ぶ
HeartbeatSeconds一部リソースのみ対応利用可能(長時間処理では推奨)

1-3. タスクトークンのライフサイクル(最重要)

Callback パターンの核心は タスクトークン にある。以下がその一連の流れだ。

① SF: タスクトークン発行($$.Task.Token)
 ↓
② SF: トークンを SQS / API GW 経由で外部システムに送信 → 実行を停止
 ↓(外部システムが処理中 — 数秒〜数日)
③ 外部: SendTaskSuccess(token, output) または SendTaskFailure(token, error) を呼ぶ
 ↓
④ SF: 停止から再開 → 次のステートへ進む

タスクトークンの重要な特性:

特性内容
形式推測不可能な不透明文字列(内部識別子)
最大長2,048 文字
有効期間最大 1 年(ワークフロー実行期間と同じ)
使用回数一度限り(SendTaskSuccess / SendTaskFailure のどちらか一方のみ有効)
スコープ同一 AWS アカウント内のみ(クロスアカウント不可)
保管責任外部システム側が安全に保管する責任を持つ
ポイント: トークンは $$.Task.Token で Parameters に埋め込む。第4弾(データフロー制御)で学んだ Context Object($$)の知識がここで直接活きる。

1-4. SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat API

SendTaskSuccess — 処理成功を通知

aws stepfunctions send-task-success \
  --task-token "TOKEN" \
  --task-output '{"approved": true, "approver": "manager"}'
パラメータ必須制限
task-token1〜2,048 文字
task-output最大 262,144 バイト(256 KB)の JSON 文字列

task-output の内容は ResultPath に格納される。

SendTaskFailure — 処理失敗を通知

aws stepfunctions send-task-failure \
  --task-token "TOKEN" \
  --error "ExpenseRejected" \
  --cause "却下理由: 金額が承認限度額を超過"
パラメータ必須制限
task-token1〜2,048 文字
error任意最大 256 文字(Catch の ErrorEquals で捕捉可能)
cause任意最大 32,768 文字

error フィールドは Catch ブロックで ErrorEquals: ["ExpenseRejected"] のように捕捉できる。

SendTaskHeartbeat — 生存確認を送信

aws stepfunctions send-task-heartbeat \
  --task-token "TOKEN"

HeartbeatSeconds のタイマーをリセットする「生存確認」用 API。長時間処理中に定期的に呼び出すことで States.HeartbeatTimeout を防ぐ。成功時はレスポンスボディなしの HTTP 200 が返る(実行履歴イベントは生成されない)。

共通エラーコード:

エラー意味
InvalidTokenトークンの形式が不正
TaskDoesNotExistトークンが有効なタスクに対応しない
TaskTimedOutタスクがタイムアウト済みまたは完了済み

1-5. HeartbeatSeconds と TimeoutSeconds の Callback 固有挙動

パラメータ意味タイムアウト時のエラー
TimeoutSecondsタスク全体のタイムアウト(SendTaskSuccess / Failure の最終期限)States.Timeout
HeartbeatSeconds連続する Heartbeat 間隔の最大値States.HeartbeatTimeout

設定例:

{
  "Type": "Task",
  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
  "HeartbeatSeconds": 3600,
  "TimeoutSeconds": 86400,
  "Parameters": {
 "QueueUrl": "<SQS_QUEUE_URL>",
 "MessageBody": {
"task_token.$": "$$.Task.Token"
 }
  }
}

重要な設計指針:

  1. HeartbeatSeconds < TimeoutSeconds を必ず守る(例: Heartbeat=3600、Timeout=86400)
  2. HeartbeatSeconds は外部システムが定期的に生存確認を送れる場合のみ設定する
  3. TimeoutSeconds は SQS メッセージの可視性タイムアウトとは独立した別の概念
  4. TimeoutSeconds のデフォルト値は 99,999,999 秒(約 3.17 年)だが、明示的に設定することを推奨
  5. SendTaskHeartbeatHeartbeatSeconds タイマーをリセットするが、TimeoutSeconds は延長しない

1-6. ユースケース一覧

ユースケース説明Callback を使う理由
人間の承認フロー上長承認、リリース承認、コンプライアンスチェック承認まで数時間〜数日かかる可能性
外部 API 非同期応答外部決済処理、外部審査システムの応答待ち外部 API が非同期モデルを採用
サービス間オーケストレーションマイクロサービス間の非同期連携(SQS / EventBridge 経由)受信側サービスが完了タイミングを制御
手動データ検証オペレーターが目視確認後に処理を再開するフロー人間の判断が必要
支払い処理待ちユーザーが支払いフォームに入力するまでの待機ユーザー操作完了まで時間不定

1-7. Context Object でのトークン取得方法

$$.Task.TokenContext Object$$)経由で取得する。第4弾(データフロー制御)で学んだ Context Object の知識がここで直接活きる。

{
  "WaitForApproval": {
 "Type": "Task",
 "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
 "Parameters": {
"QueueUrl": "<SQS_QUEUE_URL>",
"MessageBody": {
  "task_token.$": "$$.Task.Token",
  "expense_id.$": "$.expense_id"
}
 }
  }
}

構文のポイント:

記法意味
"task_token.$".$ サフィックス = 値を動的参照(パス/Context Object)
"$$.Task.Token"$$ = Context Object、.Task.Token = 現在のタスクのトークン
"expense_id.$": "$.expense_id"$ = 入力 JSON から値を参照
⚠️ 注意: $$.Task.TokenParameters および ResultSelector の中でのみ参照可能。InputPathOutputPath では使用できない。

Section 1 まとめ:
.waitForTaskToken は「外部システムが完了を通知する」非同期パターン
– タスクトークンは一度限り有効、最大 2,048 文字、有効期間最大 1 年
HeartbeatSecondsStates.HeartbeatTimeout)と TimeoutSecondsStates.Timeout)は独立したタイムアウト機構
$$.Task.TokenParameters / ResultSelector 内でのみ参照可能


Section 2: アーキテクチャ解説

2-1. ハンズオンシナリオ概要

本ハンズオンでは 経費承認ワークフロー をシナリオとして使用します。社員が経費を申請し、上長が承認または却下し、結果をメールで通知するという実務でよく見られる非同期承認フローを AWS Step Functions の Callbackパターン(.waitForTaskToken)で実装します。

ワークフローの概要

フェーズ処理担当コンポーネント
申請受付経費情報をDynamoDBに保存し、承認依頼をSQSへ送信SubmitExpense Lambda
承認待機タスクトークンを発行し実行を停止WaitForApproval(Callback)
上長確認SQSからメッセージを取得し、CLIで承認/却下を返送承認者(SQS + SF API)
分岐処理承認/却下/タイムアウトで後続ステートへ分岐RouteApproval(Choice)
完了通知申請者へSNS経由でメール通知NotifyApproved/Rejected/Timeout

各コンポーネントの役割

  • Step Functions State Machine — ワークフロー全体のオーケストレーター。.waitForTaskToken を使い、承認者の返答まで実行を停止する
  • SQS キュー — タスクトークンを埋め込んだメッセージを承認者へ配送するメッセージバッファ
  • DynamoDB — 経費申請データ(申請者、金額、申請日時)を永続化
  • SNS — 承認/却下/タイムアウト通知を申請者へメール配信
  • Lambda — 経費受付処理と経理システムへの反映処理を担当

2-2. 全体アーキテクチャ

全体アーキテクチャ

上図は経費承認ワークフロー全体のアーキテクチャを示しています。

ワークフローは Lambda または EventBridge Scheduler によってトリガーされ、Step Functions State Machine が処理を開始します。State Machine 内部は縦に配置された5つの主要ステートで構成されます。

Callbackの核心: WaitForApproval ステートは ★停止・待機ポイント です。このステートに到達すると Step Functions はタスクトークンを生成し、SQSへメッセージを送信した後、外部から SendTaskSuccess または SendTaskFailure が呼ばれるまで実行を停止します。この待機中にも Step Functions の状態は保持され、課金は発生しません(Standard Workflow の場合)。

承認者はSQSキューからメッセージを取得し、タスクトークンを使って aws stepfunctions send-task-success を実行することでワークフローを再開させます。


2-3. Callbackフロー詳解

Callbackフロー

Callbackパターンのデータフローを上段(発行・送信・停止)と下段(承認・返送・再開)に分けて示しています。

タスクトークンの発行

State Definition で .waitForTaskToken を指定したリソースが呼ばれると、Step Functions は自動的にタスクトークン(約1KBの不透明な文字列)を生成します。このトークンは $$.Task.Token という組み込み変数で参照できます。

"WaitForApproval": {
  "Type": "Task",
  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
  "Parameters": {
 "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789/approval-queue",
 "MessageBody": {
"TaskToken.$": "$$.Task.Token",
"ExpenseId.$": "$.expenseId",
"Amount.$": "$.amount"
 }
  },
  "HeartbeatSeconds": 86400,
  "Next": "RouteApproval"
}

停止と待機

SQSへのメッセージ送信が完了した時点で、ステートの実行は停止します。State Machine は RUNNING 状態を維持したまま、タスクトークンが返送されるのを待ちます。Standard Workflow では最大1年間待機でき、HeartbeatSeconds でハートビートタイムアウトを設定できます。

トークンの返送と再開

承認者はSQSメッセージからタスクトークンを取得し、以下のいずれかを実行します:

コマンド意味後続動作
send-task-success承認(結果JSONを渡す)RouteApprovalステートへ進む
send-task-failureシステムエラーエラーハンドラへ
# 承認例
aws stepfunctions send-task-success \
  --task-token "AQC..." \
  --task-output '{"approved": true, "approver": "manager-a", "comment": "承認します"}'

# 却下例
aws stepfunctions send-task-success \
  --task-token "AQC..." \
  --task-output '{"approved": false, "approver": "manager-a", "comment": "金額が上限超過"}'

2-4. 承認/却下シーケンス

承認シーケンス

承認フロー(通常系)

  1. SF → SubmitExpense Lambda: 経費申請を受け付け、DynamoDBに保存
  2. Lambda → SF: 申請ID・金額などを返却
  3. SF → SQS: タスクトークンを埋め込んだメッセージを送信
  4. SF: ⏸ 実行停止(待機開始)— WaitForApproval ステートで停止
  5. 承認者 → SQS: SQSキューからメッセージを受信(ポーリング or Lambda トリガー)
  6. 承認者 → SF: SendTaskSuccess {"approved": true, "approver": "manager"} を送信
  7. SF: ▶ 実行再開(待機終了)— RouteApproval ステートへ進む
  8. SF → ProcessExpense Lambda: 経理システムへ反映
  9. SF → SNS: 承認完了通知を送信 → 申請者へメール
  10. SF: SUCCEEDED終了

却下フロー

ステップ6で {"approved": false} が返送されると、RouteApproval(Choiceステート)の条件分岐により却下パスに進み、NotifyRejected(SNS)を経てFAILED終了します。

"RouteApproval": {
  "Type": "Choice",
  "Choices": [
 {
"Variable": "$.approved",
"BooleanEquals": true,
"Next": "ProcessExpense"
 },
 {
"Variable": "$.approved",
"BooleanEquals": false,
"Next": "NotifyRejected"
 }
  ]
}

タイムアウトフロー

WaitForApproval ステートに HeartbeatSeconds: 86400(24時間)を設定することで、承認が一定時間行われない場合に States.HeartbeatTimeout エラーが発生します。Catch ブロックで補足し NotifyTimeout(SNS)へ分岐させることで、申請者・申請者管理者へタイムアウト通知を送ります。

"WaitForApproval": {
  ...,
  "HeartbeatSeconds": 86400,
  "Catch": [
 {
"ErrorEquals": ["States.HeartbeatTimeout"],
"Next": "NotifyTimeout"
 }
  ]
}
ポイント: HeartbeatSeconds はトークン返送のタイムアウトを制御します。ワークフロー全体の最大実行時間は TimeoutSeconds で別途制御できます。Standard Workflow のデフォルト最大実行時間は1年です。

Section 3: AWSコンソールでのハンズオン

このセクションでは、AWSコンソールを使って経費承認ワークフローを実際に構築します。
「承認ボタンを押す」体験をAWS CLIで行うことが、このハンズオンの核心です。

ステートマシンはStep A→B→C→Dと段階的に機能を追加しながら構築します。最終形(Step D)では、SQS Callback + 承認/却下分岐 + SNS通知が全て揃った完成形を体験できます。


3-1. 前提条件

必要なAWS権限

以下のサービスを操作できるIAM権限が必要です。

サービス必要な権限
AWS Lambda関数の作成・編集・実行
AWS Step Functionsステートマシンの作成・実行
Amazon SQSキューの作成・メッセージ受信
Amazon SNSトピックの作成・サブスクリプション管理
AWS IAMロール・ポリシーの作成
Amazon CloudWatch Logsログの閲覧

AWS CLI の準備

このハンズオンでは、SendTaskSuccess/SendTaskFailure を AWS CLI で実行します。
コンソールからは送信できないため、CLIのインストールと設定が必須です。

# バージョン確認
aws --version
# aws-cli/2.x.x Python/3.x.x ...

# 認証確認
aws sts get-caller-identity

認証情報が正しく設定されていれば、アカウントIDとARNが表示されます。


3-2. Lambda 2関数のデプロイ

経費承認ワークフローでは2つのLambda関数を使用します。

関数名役割
submit_expense経費申請を受け付け、ステータスをpendingに設定
process_expense承認後の経理処理(DynamoDB書き込みなど)を実行

submit_expense 関数の作成

コンソール操作手順:

  1. AWSコンソールで Lambda を開く
  2. 「関数の作成」をクリック
  3. 設定値:
  4. 作成方法: 一から作成
  5. 関数名: submit_expense
  6. ランタイム: Python 3.12
  7. アーキテクチャ: x86_64
  8. 「関数の作成」をクリック
  9. コードエディタに以下を貼り付け:
import json
import os
import boto3
from datetime import datetime

def lambda_handler(event, context):
 expense_id = event.get("expense_id")
 amount = event.get("amount", 0)
 description = event.get("description", "")
 applicant = event.get("applicant", "unknown")

 if not expense_id:
  raise ValueError("expense_id は必須です")
 if amount <= 0:
  raise ValueError("amountは正の値が必要です")

 # DynamoDB への保存(オプション — Step Dで有効化)
 # dynamodb = boto3.resource("dynamodb")
 # table = dynamodb.Table(os.environ.get("EXPENSE_TABLE", "expense-requests"))
 # table.put_item(Item={
 #  "expense_id": expense_id,
 #  "amount": amount,
 #  "description": description,
 #  "applicant": applicant,
 #  "status": "pending",
 #  "submitted_at": datetime.utcnow().isoformat()
 # })

 print(f"経費申請受付: expense_id={expense_id}, amount={amount}, applicant={applicant}")

 return {
  "expense_id": expense_id,
  "status": "pending",
  "submitted_at": datetime.utcnow().isoformat()
 }
  1. 「Deploy」をクリックして保存
  2. 関数ARNをメモ(例: arn:aws:lambda:ap-northeast-1:123456789012:function:submit_expense
    <LAMBDA_ARN_SUBMIT_EXPENSE> プレースホルダーに使用

process_expense 関数の作成

同様の手順で2つ目の関数を作成します。

  1. Lambda コンソール → 「関数の作成」
  2. 設定値:
  3. 関数名: process_expense
  4. ランタイム: Python 3.12
  5. コードエディタに以下を貼り付け:
import json
import os
from datetime import datetime

def lambda_handler(event, context):
 expense_id = event.get("expense_id")
 amount = event.get("amount", 0)
 approver = event.get("approver", "unknown")

 print(f"経理処理実行: expense_id={expense_id}, amount={amount}, approver={approver}")

 # 経理システムへの連携処理(ハンズオンではモック)
 return {
  "expense_id": expense_id,
  "amount": amount,
  "approver": approver,
  "processed_at": datetime.utcnow().isoformat(),
  "accounting_ref": f"ACC-{expense_id}-001"
 }
  1. 「Deploy」をクリックして保存
  2. 関数ARNをメモ
    <LAMBDA_ARN_PROCESS_EXPENSE> プレースホルダーに使用

3-3. SQSキュー作成

Callbackパターンの中心となるSQSキューを作成します。

コンソール操作手順:

  1. AWSコンソールで Amazon SQS を開く
  2. 「キューの作成」をクリック
  3. 設定値:
  4. タイプ: スタンダード
  5. 名前: expense-approval-queue
  6. 可視性タイムアウト: 300秒(デフォルト30秒から変更推奨)
    > 可視性タイムアウトが短いと、承認者がメッセージを処理中に他のコンシューマーが再取得してしまいます。
  7. その他はデフォルトのまま「キューの作成」をクリック
  8. 作成後、キュー詳細画面から キューURL をメモ
  9. 例: https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-queue
  10. <SQS_QUEUE_URL> プレースホルダーに使用

3-4. SNSトピック作成とメールサブスクリプション

承認/却下/タイムアウトの通知を送るSNSトピックを作成します(Step Dで使用)。

コンソール操作手順:

  1. AWSコンソールで Amazon SNS を開く
  2. 「トピックの作成」をクリック
  3. 設定値:
  4. タイプ: スタンダード
  5. 名前: expense-notification
  6. 「トピックの作成」をクリック
  7. トピックARNをメモ
  8. 例: arn:aws:sns:ap-northeast-1:123456789012:expense-notification
  9. <SNS_TOPIC_ARN> プレースホルダーに使用

メールサブスクリプションの設定:

  1. 作成したトピックの詳細画面 → 「サブスクリプションの作成」
  2. 設定値:
  3. プロトコル: Eメール
  4. エンドポイント: 通知を受け取るメールアドレスを入力
  5. 「サブスクリプションの作成」をクリック
  6. 入力したメールアドレスに確認メールが届く
  7. メール内の 「Confirm subscription」リンクをクリック
⚠️ 注意: このステップを忘れると、SNS通知がメールに届きません。

3-5. IAMロール作成

Step FunctionsがLambda・SQS・SNSを操作するためのIAMロールを作成します。

コンソール操作手順:

  1. AWSコンソールで IAM を開く → 「ロール」→「ロールを作成」
  2. 信頼されたエンティティの選択:
  3. タイプ: AWSのサービス
  4. ユースケース: 「Step Functions」を選択(リストにない場合は「カスタム信頼ポリシー」を使用)
  5. 信頼ポリシー(カスタム選択時):
    json
    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Principal": {
    "Service": "states.amazonaws.com"
    },
    "Action": "sts:AssumeRole"
    }
    ]}
  6. ロール名: sf-expense-execution-role
  7. ロール作成後、「インラインポリシーを追加」から以下を設定:
{
  "Version": "2012-10-17",
  "Statement": [
 {
"Sid": "InvokeLambda",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": [
  "<LAMBDA_ARN_SUBMIT_EXPENSE>",
  "<LAMBDA_ARN_PROCESS_EXPENSE>"
]
 },
 {
"Sid": "SendSQSMessage",
"Effect": "Allow",
"Action": "sqs:SendMessage",
"Resource": "<SQS_QUEUE_ARN>"
 },
 {
"Sid": "PublishSNS",
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "<SNS_TOPIC_ARN>"
 },
 {
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
  "logs:CreateLogGroup",
  "logs:CreateLogDelivery",
  "logs:PutLogEvents",
  "logs:DescribeLogGroups",
  "logs:DescribeLogStreams",
  "logs:GetLogDelivery",
  "logs:ListLogDeliveries",
  "logs:UpdateLogDelivery",
  "logs:DeleteLogDelivery",
  "logs:PutResourcePolicy",
  "logs:DescribeResourcePolicies"
],
"Resource": "*"
 }
  ]
}
💡 SQS_QUEUE_ARN について: キューのARNはキューURL とは異なります。SQSコンソールでキューの詳細を開くと「ARN」フィールドで確認できます(例: arn:aws:sqs:ap-northeast-1:123456789012:expense-approval-queue)。
  1. ポリシー名: sf-expense-policy として保存

3-6. ステートマシンを段階的に構築(記事の核心)

ここからがこのハンズオンの核心です。Step A→D と段階的にステートマシンを構築し、Callbackパターンの仕組みを体験的に理解します。


Step A: 基本Callback — 承認体験を初めて体感する

まず最もシンプルな形でCallbackパターンを体験します。

ステートマシン作成手順:

  1. AWSコンソールで Step Functions を開く
  2. 「ステートマシンの作成」をクリック
  3. 「コードでワークフローを記述」を選択
  4. 以下のASLを貼り付け(<プレースホルダー> を実際の値に置換):
{
  "Comment": "経費承認ワークフロー — Step A(基本Callback)",
  "StartAt": "SubmitExpense",
  "States": {
 "SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"ResultPath": "$.submission",
"Next": "WaitForApproval"
 },
 "WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "<SQS_QUEUE_URL>",
  "MessageBody": {
 "task_token.$": "$$.Task.Token",
 "expense_id.$": "$.expense_id",
 "amount.$": "$.amount"
  }
},
"ResultPath": "$.approval",
"Next": "ProcessExpense"
 },
 "ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"ResultPath": "$.processing",
"Next": "ExpenseApproved"
 },
 "ExpenseApproved": {
"Type": "Succeed"
 }
  }
}
$$.Task.Token がCallbackの核心: $$ はStep Functionsのコンテキストオブジェクトへのアクセスです。実行時に一意のトークンが生成され、SQSメッセージに埋め込まれます。このトークンを後でSendTaskSuccessに渡すことで、ワークフローが再開します。
  1. 設定値:
  2. ステートマシン名: expense-approval-step-a
  3. 実行ロール: sf-expense-execution-role
  4. ログ設定: ALL レベル を推奨(待機中の挙動が CloudWatch で確認できます)
  5. 「ステートマシンの作成」をクリック

実行テスト:

  1. 「実行の開始」をクリック
  2. 入力JSONに以下を貼り付け:
{
  "expense_id": "EXP-001",
  "amount": 50000,
  "description": "出張交通費",
  "applicant": "yamada.taro"
}
  1. 「実行の開始」をクリック

WaitForApproval で止まることを確認:

実行グラフで WaitForApproval ステートが青色(RUNNING)のまま止まっているはずです。
これがCallbackパターンの動作です — Step Functionsは外部からの応答を待って一時停止しています。

SQSからtask_tokenを取得:

  1. SQSコンソール → expense-approval-queue を開く
  2. 「メッセージを送受信」→「メッセージをポーリング」をクリック
  3. 受信したメッセージの本文を確認:
    json
    {
    "task_token": "AQCAAAAKAAAAAAAAADg...(長い文字列)",
    "expense_id": "EXP-001",
    "amount": 50000
    }
  4. task_token の値をコピー(非常に長い文字列です)

★ 承認体験(記事の核心)

AWS CLIで以下を実行します。これが「承認ボタンを押す」体験です:

aws stepfunctions send-task-success \
  --task-token "SQSメッセージから取得したtask_token" \
  --task-output '{"approved": true, "approver": "manager.suzuki"}'

コマンド実行後、Step Functions コンソールに戻ると:
WaitForApproval → SUCCEEDED
ProcessExpense → 実行中
ExpenseApproved → SUCCEEDED

実行全体が SUCCEEDED になることを確認してください。

これがCallbackパターンの基本動作です: Step Functionsはtask_tokenを発行してSQSにメッセージを送り、外部(今回はCLI)からSendTaskSuccessが届くまで待機します。実際のシステムでは「CLIコマンド」の部分が「承認者がWebアプリで承認ボタンを押す」操作に相当します。

Step B: タイムアウト付き — HeartbeatSeconds / TimeoutSeconds を体験

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け:
{
  "Comment": "経費承認ワークフロー — Step B(タイムアウト付き)",
  "StartAt": "SubmitExpense",
  "States": {
 "SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"ResultPath": "$.submission",
"Next": "WaitForApproval"
 },
 "WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "<SQS_QUEUE_URL>",
  "MessageBody": {
 "task_token.$": "$$.Task.Token",
 "expense_id.$": "$.expense_id",
 "amount.$": "$.amount"
  }
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
  {
 "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
 "ResultPath": "$.error",
 "Next": "ExpenseTimeout"
  }
],
"Next": "ProcessExpense"
 },
 "ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"ResultPath": "$.processing",
"Next": "ExpenseApproved"
 },
 "ExpenseApproved": {
"Type": "Succeed"
 },
 "ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
 }
  }
}
  1. ステートマシン名: expense-approval-step-b
  2. 「ステートマシンの作成」をクリック

タイムアウト値の意味:

パラメータ意味
HeartbeatSeconds36001時間以内にハートビートがなければタイムアウト
TimeoutSeconds8640024時間以内にSendTaskSuccess/Failureがなければタイムアウト

補足テスト — タイムアウト体験:

本番用ASLのTimeoutSeconds=86400(24時間)は変更せず、別途タイムアウトを体験するための短縮版を使います。新しいステートマシンで TimeoutSeconds30 に変更してテストしてください:

"WaitForApproval": {
  "Type": "Task",
  "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
  "Parameters": { ... },
  "HeartbeatSeconds": 30,
  "TimeoutSeconds": 30,
  "Catch": [
 {
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"ResultPath": "$.error",
"Next": "ExpenseTimeout"
 }
  ],
  "Next": "ProcessExpense"
}

SendTaskSuccessを送らずに30秒待つと、States.Timeout エラーが発生し、Catch により ExpenseTimeout(Fail)ステートに遷移します。


Step C: 承認/却下分岐 — SendTaskFailure体験

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け:
{
  "Comment": "経費承認ワークフロー — Step C(承認/却下分岐)",
  "StartAt": "SubmitExpense",
  "States": {
 "SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"ResultPath": "$.submission",
"Next": "WaitForApproval"
 },
 "WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "<SQS_QUEUE_URL>",
  "MessageBody": {
 "task_token.$": "$$.Task.Token",
 "expense_id.$": "$.expense_id",
 "amount.$": "$.amount"
  }
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
  {
 "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
 "ResultPath": "$.error",
 "Next": "ExpenseTimeout"
  }
],
"Next": "RouteApproval"
 },
 "RouteApproval": {
"Type": "Choice",
"Choices": [
  {
 "Variable": "$.approval.approved",
 "BooleanEquals": true,
 "Next": "ProcessExpense"
  }
],
"Default": "ExpenseRejected"
 },
 "ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"ResultPath": "$.processing",
"Next": "ExpenseApproved"
 },
 "ExpenseApproved": {
"Type": "Succeed"
 },
 "ExpenseRejected": {
"Type": "Fail",
"Error": "ExpenseRejected",
"Cause": "経費申請が却下されました"
 },
 "ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
 }
  }
}
  1. ステートマシン名: expense-approval-step-c

承認テスト(approved: true):

aws stepfunctions send-task-success \
  --task-token "TOKEN" \
  --task-output '{"approved": true, "approver": "manager.suzuki"}'

RouteApprovalapproved == trueProcessExpenseExpenseApproved (SUCCEEDED)

却下テスト(approved: false):

aws stepfunctions send-task-success \
  --task-token "TOKEN" \
  --task-output '{"approved": false, "reason": "金額が承認限度額を超過"}'

RouteApprovalDefaultExpenseRejected (FAILED) に遷移します。

SendTaskSuccess で却下を表現する: approved: false を含む正常な応答を返すことで、Choice ステートでルーティングします。これは「処理は正常に完了した(タスクトークンは有効に解決した)が、ビジネスロジック的に却下された」という設計です。

SendTaskFailure 体験:

SendTaskFailure は「外部処理自体が失敗した」場合に使います:

aws stepfunctions send-task-failure \
  --task-token "TOKEN" \
  --error "ExpenseRejected" \
  --cause "却下理由: 経費規程違反"

WaitForApprovalCatchErrorEquals: ["ExpenseRejected"] を追加していれば捕捉できます。
このパターンは「承認システム側でエラーが発生した」ケースに有効です。


Step D: 最終形 — SNS通知付き完成形

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け(<SNS_TOPIC_ARN> を実際のARNに置換):
{
  "Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)",
  "StartAt": "SubmitExpense",
  "States": {
 "SubmitExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_SUBMIT_EXPENSE>",
"Parameters": {
  "expense_id.$": "$.expense_id",
  "amount.$": "$.amount",
  "description.$": "$.description",
  "applicant.$": "$.applicant"
},
"ResultPath": "$.submission",
"Next": "WaitForApproval"
 },
 "WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "<SQS_QUEUE_URL>",
  "MessageBody": {
 "task_token.$": "$$.Task.Token",
 "expense_id.$": "$.expense_id",
 "amount.$": "$.amount",
 "applicant.$": "$.applicant",
 "message": "経費申請の承認をお願いします"
  }
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
  {
 "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
 "ResultPath": "$.error",
 "Next": "NotifyTimeout"
  }
],
"Next": "RouteApproval"
 },
 "RouteApproval": {
"Type": "Choice",
"Choices": [
  {
 "Variable": "$.approval.approved",
 "BooleanEquals": true,
 "Next": "ProcessExpense"
  }
],
"Default": "NotifyRejected"
 },
 "ProcessExpense": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_PROCESS_EXPENSE>",
"Parameters": {
  "expense_id.$": "$.expense_id",
  "amount.$": "$.amount",
  "approver.$": "$.approval.approver"
},
"ResultPath": "$.processing",
"Next": "NotifyApproved"
 },
 "NotifyApproved": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
  "TopicArn": "<SNS_TOPIC_ARN>",
  "Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)",
  "Subject": "経費申請 承認完了通知"
},
"ResultPath": null,
"Next": "ExpenseApproved"
 },
 "NotifyRejected": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
  "TopicArn": "<SNS_TOPIC_ARN>",
  "Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)",
  "Subject": "経費申請 却下通知"
},
"ResultPath": null,
"Next": "ExpenseRejected"
 },
 "NotifyTimeout": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
  "TopicArn": "<SNS_TOPIC_ARN>",
  "Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)",
  "Subject": "経費申請 承認タイムアウト"
},
"ResultPath": null,
"Next": "ExpenseTimeout"
 },
 "ExpenseApproved": {
"Type": "Succeed"
 },
 "ExpenseRejected": {
"Type": "Fail",
"Error": "ExpenseRejected",
"Cause": "経費申請が却下されました"
 },
 "ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
 }
  }
}
  1. ステートマシン名: expense-approval-final

3つのシナリオをテスト:

① 承認フロー:

aws stepfunctions send-task-success \
  --task-token "TOKEN" \
  --task-output '{"approved": true, "approver": "manager.suzuki"}'

NotifyApproved → SNS メール受信(件名: 「経費申請 承認完了通知」)→ ExpenseApproved (SUCCEEDED)

② 却下フロー:

aws stepfunctions send-task-success \
  --task-token "TOKEN" \
  --task-output '{"approved": false, "reason": "金額超過"}'

NotifyRejected → SNS メール受信(件名: 「経費申請 却下通知」)→ ExpenseRejected (FAILED)

③ タイムアウトフロー(補足テスト):

本番ASLのTimeoutSeconds=86400は変更せず、TimeoutSeconds: 30 の短縮版で確認します:

SendTaskSuccessを送らずに30秒待つ
→ States.Timeout 発生
→ NotifyTimeout → SNS メール受信(件名: 「経費申請 承認タイムアウト」)→ ExpenseTimeout (FAILED)
💡 States.Format について: Step Functionsの組み込み関数で、動的な文字列を生成します。'経費申請 {} が承認されました', $.expense_id のように、{} が変数の値に置換されます。SNS通知のメッセージを動的に生成するのに便利です。

3-7. CloudWatch Logsで待機→再開の挙動を確認

Step Functionsの実行ログを確認することで、Callbackパターンの内部動作を詳しく把握できます。

ログ確認手順

  1. Step Functionsコンソール → 対象の実行を開く
  2. 「実行の詳細」タブ → 「ログ」セクションを確認
  3. CloudWatch Logsのリンクをクリックして詳細ログを表示

確認すべきイベントの流れ

ログで以下のシーケンスを確認してください:

WaitForApprovalStateEntered ← WaitForApproval ステート開始
TaskSubmitted← SQSにメッセージ送信 + Task Token発行
(...長い待機時間...)
TaskSucceeded← SendTaskSuccess を受信
WaitForApprovalStateExited  ← WaitForApproval ステート終了
ProcessExpenseStateEntered  ← 次のステートへ遷移

ExecutionHistory APIで待機時間を計測

AWS CLIを使ってプログラマティックに実行履歴を取得し、待機時間を計算できます:

# 実行ARNを確認
aws stepfunctions list-executions \
  --state-machine-arn "<YOUR_STATE_MACHINE_ARN>" \
  --query 'executions[0].executionArn' \
  --output text

# 実行履歴を取得
aws stepfunctions get-execution-history \
  --execution-arn "<EXECUTION_ARN>" \
  --query 'events[?type==`TaskSubmitted` || type==`TaskSucceeded`].[timestamp,type]' \
  --output table

出力例:

-----------------------------------------
| timestamp  | type |
-----------------------------------------
| 2026-04-12T10:00:00Z| TaskSubmitted |
| 2026-04-12T10:05:23Z| TaskSucceeded |
-----------------------------------------

TaskSubmittedTaskSucceeded のタイムスタンプ差が、実際の承認待機時間です。
この例では 5分23秒 承認待ちだったことがわかります。

CloudWatch Metrics でモニタリング

本番環境では以下のメトリクスを監視することを推奨します:

メトリクス説明アラート推奨値
ExecutionsFailed失敗した実行数1以上でアラート
ExecutionsTimedOutタイムアウトした実行数1以上でアラート
ExecutionThrottledスロットリングされた実行数急増時にアラート

まとめ(Section 3)

このセクションでは、経費承認ワークフローを段階的に構築しました。

Step追加した機能学んだこと
A基本Callback$$.Task.Token の仕組み、SendTaskSuccessによる再開
BタイムアウトHeartbeatSeconds/TimeoutSeconds、States.Timeout の Catch
C承認/却下分岐Choice ステートでの approved フラグによるルーティング
DSNS通知States.Format を使った動的メッセージ、3シナリオの完全テスト

Callbackパターンの本質: Step Functionsはtask_tokenを外部に渡し、そのトークンが返ってくるまで待ちます。「承認者がボタンを押す」「外部システムが処理を完了する」「人間が確認する」など、任意の外部イベントでワークフローを再開できるのがCallbackパターンの強みです。

次のSection 4では、このワークフロー全体をTerraformで自動構築します。


Section 4: Terraformでの構築

Section 3のコンソール操作で構築した経費承認ワークフローを、今度はTerraformで再現します。インフラをコードで管理することで、環境の再現性が高まり、チーム開発での一貫した運用が可能になります。


4-1. 前提条件

  • Terraform 1.0以上インストール済み
  • AWS CLI設定済み(aws configure 実行済み)
  • 以下のIAM権限が付与されていること:
  • Lambda の作成・実行
  • IAM ロール・ポリシーの作成
  • SQS キューの作成・操作
  • SNS トピックの作成・サブスクライブ
  • Step Functions ステートマシンの作成・実行
  • CloudWatch Logs の作成

4-2. ディレクトリ構成

sf-callback/
├── main.tf
├── variables.tf
├── outputs.tf
├── lambda/
│├── submit_expense.py
│└── process_expense.py
└── statemachine/
 └── definition.json.tpl

作業ディレクトリを作成します:

mkdir -p sf-callback/lambda sf-callback/statemachine
cd sf-callback

4-3. Lambda ソースコード

Section 3(コンソール版)と同じロジック構成の Lambda 関数です。

lambda/submit_expense.py

経費申請を受け付け、申請情報をそのまま返す関数です。

import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
 """
 経費申請受付 Lambda
 受け取った申請情報を検証し、タイムスタンプを付与して返す
 """
 logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}")

 expense_id = event.get("expense_id")
 amount = event.get("amount")
 description = event.get("description", "")
 applicant = event.get("applicant")

 # 基本バリデーション
 if not expense_id or amount is None or not applicant:
  raise ValueError("expense_id, amount, applicant は必須パラメータです")

 if amount <= 0:
  raise ValueError("amount は正の数値である必要があります")

 submitted_at = datetime.now(timezone.utc).isoformat()

 result = {
  "expense_id": expense_id,
  "amount": amount,
  "description": description,
  "applicant": applicant,
  "submitted_at": submitted_at,
  "status": "SUBMITTED",
 }

 logger.info(f"申請受付完了: {json.dumps(result, ensure_ascii=False)}")
 return result

lambda/process_expense.py

承認済みの経費を処理する関数です。

import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
 """
 経費処理 Lambda
 承認済みの経費申請を処理し、処理結果を返す
 """
 logger.info(f"受信イベント: {json.dumps(event, ensure_ascii=False)}")

 expense_id = event.get("expense_id")
 amount = event.get("amount")
 approver = event.get("approver")

 if not expense_id or amount is None or not approver:
  raise ValueError("expense_id, amount, approver は必須パラメータです")

 processed_at = datetime.now(timezone.utc).isoformat()

 result = {
  "expense_id": expense_id,
  "amount": amount,
  "approver": approver,
  "processed_at": processed_at,
  "status": "PROCESSED",
  "payment_scheduled": True,
 }

 logger.info(f"経費処理完了: {json.dumps(result, ensure_ascii=False)}")
 return result

4-4. statemachine/definition.json.tpl

コンソール版(Section 3)の Step D と同じ ASL 構造です。

Terraform の templatefile() 関数を使用し、${...} 形式のプレースホルダーを実際の ARN・URL に置換します。

templatefile() のしくみ

# main.tf での使用例
definition = templatefile("statemachine/definition.json.tpl", {
  submit_expense_arn  = aws_lambda_function.submit_expense.arn,
  process_expense_arn = aws_lambda_function.process_expense.arn,
  sqs_queue_url = aws_sqs_queue.expense_approval.url,
  sns_topic_arn = aws_sns_topic.expense_notification.arn
})
プレースホルダー置換される値
${submit_expense_arn}submit_expense Lambda の ARN
${process_expense_arn}process_expense Lambda の ARN
${sqs_queue_url}SQS キューの URL
${sns_topic_arn}SNS トピックの ARN

statemachine/definition.json.tpl の内容

{
  "Comment": "経費承認ワークフロー — Step D(最終形: SQS Callback+承認・却下分岐+SNS通知)",
  "StartAt": "SubmitExpense",
  "States": {
 "SubmitExpense": {
"Type": "Task",
"Resource": "${submit_expense_arn}",
"Parameters": {
  "expense_id.$": "$.expense_id",
  "amount.$": "$.amount",
  "description.$": "$.description",
  "applicant.$": "$.applicant"
},
"ResultPath": "$.submission",
"Next": "WaitForApproval"
 },
 "WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
  "QueueUrl": "${sqs_queue_url}",
  "MessageBody": {
 "task_token.$": "$$.Task.Token",
 "expense_id.$": "$.expense_id",
 "amount.$": "$.amount",
 "applicant.$": "$.applicant",
 "message": "経費申請の承認をお願いします"
  }
},
"HeartbeatSeconds": 3600,
"TimeoutSeconds": 86400,
"ResultPath": "$.approval",
"Catch": [
  {
 "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
 "ResultPath": "$.error",
 "Next": "NotifyTimeout"
  }
],
"Next": "RouteApproval"
 },
 "RouteApproval": {
"Type": "Choice",
"Choices": [
  {
 "Variable": "$.approval.approved",
 "BooleanEquals": true,
 "Next": "ProcessExpense"
  }
],
"Default": "NotifyRejected"
 },
 "ProcessExpense": {
"Type": "Task",
"Resource": "${process_expense_arn}",
"Parameters": {
  "expense_id.$": "$.expense_id",
  "amount.$": "$.amount",
  "approver.$": "$.approval.approver"
},
"ResultPath": "$.processing",
"Next": "NotifyApproved"
 },
 "NotifyApproved": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
  "TopicArn": "${sns_topic_arn}",
  "Message.$": "States.Format('経費申請 {} が承認されました(承認者: {})', $.expense_id, $.approval.approver)",
  "Subject": "経費申請 承認完了通知"
},
"ResultPath": null,
"Next": "ExpenseApproved"
 },
 "NotifyRejected": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
  "TopicArn": "${sns_topic_arn}",
  "Message.$": "States.Format('経費申請 {} が却下されました', $.expense_id)",
  "Subject": "経費申請 却下通知"
},
"ResultPath": null,
"Next": "ExpenseRejected"
 },
 "NotifyTimeout": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
  "TopicArn": "${sns_topic_arn}",
  "Message.$": "States.Format('経費申請 {} の承認がタイムアウトしました', $.expense_id)",
  "Subject": "経費申請 承認タイムアウト"
},
"ResultPath": null,
"Next": "ExpenseTimeout"
 },
 "ExpenseApproved": {
"Type": "Succeed"
 },
 "ExpenseRejected": {
"Type": "Fail",
"Error": "ExpenseRejected",
"Cause": "経費申請が却下されました"
 },
 "ExpenseTimeout": {
"Type": "Fail",
"Error": "ApprovalTimeout",
"Cause": "承認がタイムアウトしました"
 }
  }
}
ポイント: コンソール版 Step D と ASL 構造が完全一致しています。ステート名・遷移・Parameters・HeartbeatSeconds・TimeoutSeconds・Catch のすべてが同一です。Terraform では ${...} 部分のみ動的に補間されます。

4-5. variables.tf

variable "aws_region" {
  description = "AWSリージョン"
  default  = "ap-northeast-1"
}

variable "project_name" {
  description = "プロジェクト名(リソース命名プレフィックスに使用)"
  default  = "expense-approval"
}

variable "notification_email" {
  description = "承認/却下/タイムアウト通知先メールアドレス"
  type  = string
}

4-6. main.tf(完全なコード)

terraform {
  required_version = ">= 1.0"
  required_providers {
 aws = {
source  = "hashicorp/aws"
version = "~> 5.0"
 }
 archive = {
source  = "hashicorp/archive"
version = "~> 2.0"
 }
  }
}

provider "aws" {
  region = var.aws_region
}

data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

# ===================================================
# Lambda ZIPパッケージ生成
# ===================================================

data "archive_file" "submit_expense" {
  type  = "zip"
  source_file = "${path.module}/lambda/submit_expense.py"
  output_path = "${path.module}/.terraform/tmp/submit_expense.zip"
}

data "archive_file" "process_expense" {
  type  = "zip"
  source_file = "${path.module}/lambda/process_expense.py"
  output_path = "${path.module}/.terraform/tmp/process_expense.zip"
}

# ===================================================
# IAM ロール: Lambda 実行ロール
# ===================================================

resource "aws_iam_role" "lambda_role" {
  name = "${var.project_name}-lambda-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Action = "sts:AssumeRole"
  Effect = "Allow"
  Principal = {
 Service = "lambda.amazonaws.com"
  }
}
 ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role = aws_iam_role.lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# ===================================================
# IAM ロール: Step Functions 実行ロール
# ===================================================

resource "aws_iam_role" "sf_execution_role" {
  name = "${var.project_name}-sf-execution-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Action = "sts:AssumeRole"
  Effect = "Allow"
  Principal = {
 Service = "states.amazonaws.com"
  }
}
 ]
  })
}

resource "aws_iam_role_policy" "sf_policy" {
  name = "${var.project_name}-sf-policy"
  role = aws_iam_role.sf_execution_role.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Sid = "InvokeLambda"
  Effect = "Allow"
  Action = ["lambda:InvokeFunction"]
  Resource = [
 aws_lambda_function.submit_expense.arn,
 aws_lambda_function.process_expense.arn
  ]
},
{
  Sid = "SendSQSMessage"
  Effect = "Allow"
  Action = ["sqs:SendMessage"]
  Resource = [aws_sqs_queue.expense_approval.arn]
},
{
  Sid = "PublishSNS"
  Effect = "Allow"
  Action = ["sns:Publish"]
  Resource = [aws_sns_topic.expense_notification.arn]
},
{
  Sid = "CloudWatchLogs"
  Effect = "Allow"
  Action = [
 "logs:CreateLogDelivery",
 "logs:GetLogDelivery",
 "logs:UpdateLogDelivery",
 "logs:DeleteLogDelivery",
 "logs:ListLogDeliveries",
 "logs:PutResourcePolicy",
 "logs:DescribeResourcePolicies",
 "logs:DescribeLogGroups"
  ]
  Resource = ["*"]
}
 ]
  })
}

# ===================================================
# CloudWatch Logs グループ
# ===================================================

resource "aws_cloudwatch_log_group" "submit_expense" {
  name  = "/aws/lambda/${var.project_name}-submit-expense"
  retention_in_days = 7
}

resource "aws_cloudwatch_log_group" "process_expense" {
  name  = "/aws/lambda/${var.project_name}-process-expense"
  retention_in_days = 7
}

# ===================================================
# Lambda 関数
# ===================================================

resource "aws_lambda_function" "submit_expense" {
  filename= data.archive_file.submit_expense.output_path
  function_name = "${var.project_name}-submit-expense"
  role = aws_iam_role.lambda_role.arn
  handler = "submit_expense.lambda_handler"
  runtime = "python3.12"
  source_code_hash = data.archive_file.submit_expense.output_base64sha256

  depends_on = [
 aws_iam_role_policy_attachment.lambda_basic,
 aws_cloudwatch_log_group.submit_expense
  ]
}

resource "aws_lambda_function" "process_expense" {
  filename= data.archive_file.process_expense.output_path
  function_name = "${var.project_name}-process-expense"
  role = aws_iam_role.lambda_role.arn
  handler = "process_expense.lambda_handler"
  runtime = "python3.12"
  source_code_hash = data.archive_file.process_expense.output_base64sha256

  depends_on = [
 aws_iam_role_policy_attachment.lambda_basic,
 aws_cloudwatch_log_group.process_expense
  ]
}

# ===================================================
# SQS キュー
# ===================================================

resource "aws_sqs_queue" "expense_approval" {
  name  = "${var.project_name}-approval"
  # VisibilityTimeout は HeartbeatSeconds(3600)より短くしないこと
  # ここでは 300 秒(5分)に設定(承認者がメッセージを処理する想定時間)
  visibility_timeout_seconds = 300
  message_retention_seconds  = 86400  # 1日
}

# ===================================================
# SNS トピック&メールサブスクリプション
# ===================================================

resource "aws_sns_topic" "expense_notification" {
  name = "${var.project_name}-notification"
}

resource "aws_sns_topic_subscription" "email" {
  topic_arn = aws_sns_topic.expense_notification.arn
  protocol  = "email"
  endpoint  = var.notification_email
}

# ===================================================
# Step Functions ステートマシン
# ===================================================

resource "aws_sfn_state_machine" "expense_approval" {
  name  = "${var.project_name}-workflow"
  role_arn = aws_iam_role.sf_execution_role.arn

  definition = templatefile("statemachine/definition.json.tpl", {
 submit_expense_arn  = aws_lambda_function.submit_expense.arn,
 process_expense_arn = aws_lambda_function.process_expense.arn,
 sqs_queue_url = aws_sqs_queue.expense_approval.url,
 sns_topic_arn = aws_sns_topic.expense_notification.arn
  })

  depends_on = [
 aws_iam_role_policy.sf_policy
  ]
}

4-7. outputs.tf

output "state_machine_arn" {
  description = "Step Functions ステートマシンの ARN"
  value = aws_sfn_state_machine.expense_approval.arn
}

output "sqs_queue_url" {
  description = "承認待ち SQS キューの URL"
  value = aws_sqs_queue.expense_approval.url
}

output "sns_topic_arn" {
  description = "通知用 SNS トピックの ARN"
  value = aws_sns_topic.expense_notification.arn
}

output "submit_expense_lambda_arn" {
  description = "submit_expense Lambda の ARN"
  value = aws_lambda_function.submit_expense.arn
}

output "process_expense_lambda_arn" {
  description = "process_expense Lambda の ARN"
  value = aws_lambda_function.process_expense.arn
}

4-8. デプロイ手順

ファイルの準備ができたら、以下の順番でデプロイします。

# 1. 初期化(プロバイダーのダウンロード)
terraform init

# 2. 実行計画を確認
terraform plan -var="notification_email=your@email.com"

# 3. リソースを作成
terraform apply -var="notification_email=your@email.com"

terraform apply 実行後、SNS サブスクリプションの確認メールが届きます。メール内の「Confirm subscription」リンクをクリックしてください。これをしないと通知メールが届きません。

terraform apply が成功すると、各リソースの ARN・URL が出力されます:

Outputs:

state_machine_arn  = "arn:aws:states:ap-northeast-1:123456789012:stateMachine:expense-approval-workflow"
sqs_queue_url= "https://sqs.ap-northeast-1.amazonaws.com/123456789012/expense-approval-approval"
sns_topic_arn= "arn:aws:sns:ap-northeast-1:123456789012:expense-approval-notification"
submit_expense_lambda_arn= "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-submit-expense"
process_expense_lambda_arn  = "arn:aws:lambda:ap-northeast-1:123456789012:function:expense-approval-process-expense"

4-9. 動作確認(CLI)

Step 1: 経費申請を実行する

# ステートマシン ARN を取得
STATE_MACHINE_ARN=$(terraform output -raw state_machine_arn)
SQS_QUEUE_URL=$(terraform output -raw sqs_queue_url)

# 経費申請を開始
EXECUTION=$(aws stepfunctions start-execution \
  --state-machine-arn "$STATE_MACHINE_ARN" \
  --input '{
 "expense_id": "EXP-TF-001",
 "amount": 50000,
 "description": "出張交通費",
 "applicant": "yamada.taro"
  }')

EXECUTION_ARN=$(echo "$EXECUTION" | jq -r '.executionArn')
echo "実行ARN: $EXECUTION_ARN"

Step 2: SQS からタスクトークンを取得する

ワークフローが WaitForApproval ステートに到達すると、SQS にメッセージが届きます。

# SQS メッセージを受信(タスクトークンが含まれる)
MESSAGE=$(aws sqs receive-message \
  --queue-url "$SQS_QUEUE_URL" \
  --max-number-of-messages 1)

echo "$MESSAGE" | jq '.'

# タスクトークンを取り出す
TASK_TOKEN=$(echo "$MESSAGE" | jq -r '.Messages[0].Body | fromjson | .task_token')
echo "タスクトークン: $TASK_TOKEN"

# SQS からメッセージを削除(受信後は必ず削除する)
RECEIPT_HANDLE=$(echo "$MESSAGE" | jq -r '.Messages[0].ReceiptHandle')
aws sqs delete-message \
  --queue-url "$SQS_QUEUE_URL" \
  --receipt-handle "$RECEIPT_HANDLE"

Step 3-A: 承認する

aws stepfunctions send-task-success \
  --task-token "$TASK_TOKEN" \
  --task-output '{"approved": true, "approver": "manager.suzuki"}'

承認後の流れ:
1. RouteApprovalProcessExpenseNotifyApproved
2. SNS 経由でメール通知が届く
3. ExpenseApproved(Succeed)で正常終了

Step 3-B: 却下する

aws stepfunctions send-task-success \
  --task-token "$TASK_TOKEN" \
  --task-output '{"approved": false, "approver": "manager.suzuki"}'

却下後の流れ:
1. RouteApprovalNotifyRejected
2. SNS 経由でメール通知が届く
3. ExpenseRejected(Fail)で終了

実行状態の確認

# 実行状態を確認
aws stepfunctions describe-execution \
  --execution-arn "$EXECUTION_ARN" \
  | jq '{status: .status, startDate: .startDate, stopDate: .stopDate}'

後片付け

確認が終わったらリソースを削除します:

terraform destroy -var="notification_email=your@email.com"
⚠️ 注意: terraform destroy は作成したすべての AWS リソースを削除します。実行前に内容を確認してください。

まとめ: コンソール版との対応関係

コンソール版(Section 3)Terraform(Section 4)
Step D の ASL を手動入力definition.json.tpl として管理
Lambda を手動作成aws_lambda_function リソース
SQS を手動作成aws_sqs_queue リソース
SNS を手動作成aws_sns_topic リソース
IAM を手動設定aws_iam_role / aws_iam_role_policy

コンソール版 Step D と ASL 構造が完全一致しています。 ステート名・遷移・Parameters・HeartbeatSeconds(3600)・TimeoutSeconds(86400)・Catch の構成はすべて同一であり、${...} のプレースホルダー部分のみが Terraform の templatefile() によって実際の ARN・URL に置換されます。


Section 5: 実践Tips(Callbackパターン設計ガイド)

📋 この Section では、Callbackパターンを実務で使う際のTipsと注意点を解説します。

5-1. タスクトークンの寿命と安全な保管

寿命: タスクトークンは最大1年間有効だが、TimeoutSeconds で短縮することを強く推奨。

安全な保管場所(重要度順):

保管場所用途注意点
DynamoDB標準的な保管場所(expense_idをキーに保存)TTLを設定して自動削除推奨
SSM Parameter Store短期保管・設定値と一緒に管理SecureStringで暗号化
SQSメッセージ本文配信と同時に受け取るシンプルな方法メッセージ可視性タイムアウトに注意

DynamoDBでのトークン保管例(Lambda内):

import boto3
import os

def save_task_token(expense_id: str, task_token: str) -> None:
 dynamodb = boto3.resource("dynamodb")
 table = dynamodb.Table(os.environ["EXPENSE_TABLE"])
 table.update_item(
  Key={"expense_id": expense_id},
  UpdateExpression="SET task_token = :token, #st = :status",
  ExpressionAttributeNames={"#st": "status"},
  ExpressionAttributeValues={":token": task_token, ":status": "pending_approval"}
 )

5-2. 再送・リトライ時のべき等性設計

問題: SQSのメッセージは少なくとも1回配信(at-least-once delivery)。
同じタスクトークンで SendTaskSuccess が複数回呼ばれた場合、2回目以降はエラーになる。

べき等性の実装パターン:

import boto3
import json

def handle_approval(expense_id: str, approved: bool, approver: str) -> dict:
 dynamodb = boto3.resource("dynamodb")
 table = dynamodb.Table("expense-requests")

 # べき等性チェック: 既に処理済みか確認
 item = table.get_item(Key={"expense_id": expense_id})["Item"]
 if item.get("status") not in ("pending_approval",):
  print(f"既に処理済み: expense_id={expense_id}, status={item['status']}")
  return {"already_processed": True}

 task_token = item["task_token"]

 # ステータスを更新してから SendTaskSuccess(二重送信防止)
 table.update_item(
  Key={"expense_id": expense_id},
  UpdateExpression="SET #st = :status",
  ConditionExpression="attribute_exists(task_token)",
  ExpressionAttributeNames={"#st": "status"},
  ExpressionAttributeValues={":status": "processing"}
 )

 sf_client = boto3.client("stepfunctions")
 sf_client.send_task_success(
  taskToken=task_token,
  output=json.dumps({"approved": approved, "approver": approver})
 )
 return {"sent": True}

ポイント: ステータスを processing に更新してから SendTaskSuccess を呼ぶことで、並列実行やリトライによる二重送信を防止する。

5-3. API Gateway + Lambda で Webベース承認UIを作るパターン(概念のみ)

CLIではなくWebブラウザから承認するパターン:

① SF: タスクトークンをDynamoDBに保存 + メール/Slack通知(承認URLを含む)
② 承認者: ブラウザでURLにアクセス → 承認/却下ボタンをクリック
③ API GW + Lambda: expense_idからDynamoDBでトークンを取得 → SendTaskSuccess/Failure
④ SF: 再開

承認URL例: https://api.example.com/approve?expense_id=EXP-001&action=approve

このパターンにより、エンジニア以外の承認者(マネージャーや経理担当者)もブラウザから承認作業を行えるようになる。

5-4. HeartbeatSeconds の活用パターン

長時間処理中の「生存確認」用途として HeartbeatSeconds を活用する:

import boto3
import time

def long_running_process_with_heartbeat(task_token: str) -> None:
 sf_client = boto3.client("stepfunctions")

 for i in range(10):  # 長時間処理のシミュレーション
  # 処理ステップ(例: 5分かかる処理)
  time.sleep(300)

  # Heartbeat送信(HeartbeatSecondsのタイマーをリセット)
  sf_client.send_task_heartbeat(taskToken=task_token)
  print(f"Heartbeat送信 ({i+1}/10)")

 # 処理完了
 sf_client.send_task_success(
  taskToken=task_token,
  output='{"completed": true}'
 )

使い分けの目安:

設定役割推奨値
TimeoutSecondsステップ全体のタイムアウト処理の最大所要時間 × 1.5
HeartbeatSeconds生存確認の間隔Heartbeat送信間隔 × 2

HeartbeatSeconds を設定しておくと、処理が途中でフリーズした場合に States.HeartbeatTimeout エラーとして検出でき、Catch/Retry で適切に対処できる。


Section 6: ハンズオン後の削除手順

6-1. コスト注意事項

⚠️ 放置するとSQSメッセージ、DynamoDBストレージなどで課金が発生する可能性があります。
リソース月額目安備考
Lambda無料枠内(本ハンズオン程度)100万リクエスト/月まで無料
Step Functions無料枠内(本ハンズオン程度)4,000回の状態遷移まで無料
SQS無料枠内(本ハンズオン程度)100万リクエスト/月まで無料
SNS無料枠内(本ハンズオン程度)100万件まで無料
DynamoDB~$0.25/GB/月ストレージ課金に注意
CloudWatch Logs~$0.50/GBロググループを削除で停止

6-2. Terraformで構築した場合

以下のコマンド1つで全リソースを削除できる:

terraform destroy -var="notification_email=your@email.com"
💡 terraform destroy を実行すると、terraform apply で作成した全リソースが削除される。削除前に確認プロンプトが表示されるので、yes と入力する。

Terraformで管理されない CloudWatch Logs は手動で削除が必要:

aws logs delete-log-group --log-group-name /aws/lambda/submit-expense
aws logs delete-log-group --log-group-name /aws/lambda/process-expense

6-3. コンソールで構築した場合の削除チェックリスト

以下の順番で削除する(依存関係に注意):

  • [ ] Step Functions ステートマシンを削除(コンソール: Step Functions → ステートマシン)
  • expense-callback-step-aexpense-callback-step-bexpense-callback-step-cexpense-callback-final
  • [ ] Lambda 関数を削除(コンソール: Lambda → 関数)
  • submit-expenseprocess-expense
  • [ ] SQS キューを削除(コンソール: SQS → キュー)
  • expense-approval-queue
  • [ ] SNS トピックを削除(コンソール: SNS → トピック)
  • expense-notification(サブスクリプションも自動削除)
  • [ ] DynamoDB テーブルを削除(コンソール: DynamoDB → テーブル)
  • expense-requests
  • [ ] IAM ロールと付随するポリシーを削除(コンソール: IAM → ロール)
  • sf-expense-execution-role
  • [ ] CloudWatch Logs ロググループを削除(コンソール: CloudWatch → ロールグループ)
  • /aws/lambda/submit-expense/aws/lambda/process-expense

Section 7: まとめと次のステップ

7-1. この記事で学んだこと

このハンズオンを通じて、以下の内容を実践的に習得した:

  • .waitForTaskToken によるCallbackパターンの仕組み — トークン発行→停止→返送→再開のライフサイクル
  • .sync vs .waitForTaskToken の使い分け — 同期的なECS/Lambdaタスク実行と非同期の人間承認フローの違い
  • SendTaskSuccess / SendTaskFailure / SendTaskHeartbeat API の使い方 — AWS CLI と Python SDKの両方で操作
  • HeartbeatSeconds / TimeoutSeconds のCallback固有の設定 — 生存確認と全体タイムアウトの使い分け
  • AWS CLIを使った手動承認体験 — 「承認ボタン」の代わりにCLIコマンドで承認する感覚をつかむ
  • 段階的構築(Step A→B→C→D)でCallback機能を段階的に拡張 — 基本構造から始めてDynamoDB統合・SQS連携・人間承認まで積み上げ
  • タスクトークンの安全な保管(DynamoDB)とべき等性設計 — 実務で使える堅牢な実装パターン

7-2. 次のステップ

Callbackパターンをマスターした後は、以下のテーマに進むとStep Functionsの理解がさらに深まる:

  • Distributed Map(大規模並列バッチ処理) — S3上の大量ファイルを並列処理するパターン — 第6弾予告
  • Express vs Standard Workflow の使い分け — 高頻度処理とコスト最適化のトレードオフ — 第7弾予告
  • EventBridge + Step Functions の連携 — イベント駆動アーキテクチャでStep Functionsを自動起動するパターン

7-3. シリーズリンク


本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第5回です。

シリーズ一覧:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶRetry/Catch/Timeout
– 第4回: Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
– 第5回: 本記事(Callbackパターン完全ガイド)


次の記事を読む: 第6回 Step Functions Distributed Map ▶

7-4. 参考リンク

  • Amazon States Language — .waitForTaskToken
  • SendTaskSuccess API リファレンス
  • Step Functions Callback パターンの例
  • SQS 可視性タイムアウト