Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン

目次

Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン

公開日: 2026-04-12
難易度: 中級
所要時間: 約90分
シリーズ: [Step Functions シリーズ 第4回]

この記事で学ぶこと
– 5つのデータフローフィルタ(InputPath/Parameters/ResultSelector/ResultPath/OutputPath)の役割と評価順序
– 注文enrichmentパイプライン(顧客情報取得→価格計算→出力整形)で全フィルタを実践
– ResultPath “$.key” でステートをまたいでデータを積み重ねるパターン
– Context Object ($$) と Intrinsic Functions でLambda不要のデータ変換
– Terraform版との完全一致ASL定義

シリーズ前回記事:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド


目次

  1. Step Functions 入出力データフロー制御(概念編)
  2. アーキテクチャ解説
  3. AWSコンソールでのハンズオン
  4. Terraformでの構築
  5. 応用パターン集(Intrinsic Functions活用)
  6. ハンズオン後の削除手順
  7. まとめと次のステップ

1. Step Functions 入出力データフロー制御(概念編)

1-1. なぜデータフロー制御が必要か

分散ステートマシンでのデータ肥大化問題

Step Functionsのステートマシンは、前のステートの出力が次のステートの入力として引き渡される仕組みになっている。シンプルなワークフローではこれで十分だが、複数のステップが連鎖するワークフローでは問題が生じる。

例えば、注文処理ワークフローを考えてみよう。注文情報として orderId, customerId, items, shippingAddress, paymentInfo, metadata などが初期入力として渡されるとする。ReserveInventory ステートが完了すると、その結果(在庫確保レポート)が加わる。ProcessPayment が完了すると、また決済結果が追加される。ステートが進むにつれ、後続の Lambda 関数には関係のないデータまで蓄積され続ける。

これには 3 つの実害がある:

  1. Lambda ペイロード上限への接近: Lambda 関数の入出力には 6MB の制限がある。不要なデータを引き回すと、この上限に意図せず近づく
  2. ビジネスロジックの混乱: Lambda 関数の実装者が「どのフィールドを使えばいいか」を迷う。不要フィールドが混在していると、誤ったフィールドを参照するバグにつながる
  3. 可読性の低下: Step Functions コンソールで実行履歴を確認する際に、不要データが多いとデバッグが困難になる

Step Functions が提供する 5 つのフィルタによる解決策

Step Functions は、各ステートでデータを精密にコントロールするための 5 つのフィルタ をASL(Amazon States Language)に組み込んでいる:

フィルタ役割
InputPathステート入力全体から一部を抜き出す
ParametersTask への入力を柔軟に組み立てる
ResultSelectorTask 結果から必要部分を選択・整形する
ResultPathTask 結果をステート入力のどこに配置するか指定する
OutputPath次のステートへ渡す最終出力を絞り込む

これら 5 つを使いこなすことで、「Lambda には必要なデータだけを渡し、次のステートには必要な結果だけを送る」という精密なデータフロー制御が実現できる。


1-2. 評価順序の図解(最重要)

5 つのフィルタは 必ず決まった順序で 評価される。この順序を理解しないと、意図しない動作が起きても原因が追えない。

State 入力(前 State の出力 or 実行開始時の input)
  ↓ ① InputPath — 入力全体から一部を抜き出す(JSONPath)
  ↓ ② Parameters— Task への入力を自由に組み立てる(固定値 + JSONPath 参照混在可)
  ↓ [Task / Lambda 実行]
  ↓ ③ ResultSelector — Task 結果から必要部分を選択
  ↓ ④ ResultPath— Task 結果を元の入力のどこに配置するか
  ↓ ⑤ OutputPath— 最終出力の絞り込み
次の State の入力


重要な概念的区別:2 つのレイヤー

この 5 つのフィルタには 2 つの異なるレイヤー が存在する:

レイヤーフィルタ作用対象
State レイヤーInputPath, OutputPathState の入出力(ステートマシン全体のデータフロー)
Task レイヤーParameters, ResultSelectorTask の入出力(Lambda などのリソースに渡す/受け取るデータ)
結合レイヤーResultPathTask 結果を State の入力にマージする(2 つのレイヤーをつなぐ)

この区別がわかっていないと、「InputPathParameters のどちらを使えばいいか」「ResultSelectorOutputPath の違いは何か」という混乱に陥る。


1-3. 各フィルタの詳細解説

InputPath

役割: ステート入力全体から一部を抜き出して Task 入力の候補にする。

動作
"$"入力全体を渡す(デフォルト
"$.order"order フィールドのみを抜き出す(JSONPath)
null空の JSON {} を渡す(Task が入力を必要としない場合)

構文: JSONPath 文字列($ で始まる)

"InputPath": "$.order"


使いどころ: ステート入力のうち、Task に関連するサブオブジェクト全体をまとめて渡したいとき。複雑な絞り込みや値の組み立てが必要なら Parameters を使う。


Parameters

役割: Task への入力を柔軟に組み立てる。固定値と入力からの参照を自由に組み合わせられる。最もよく使うフィルタ

記法動作
"key": "value"固定値(文字列をそのまま渡す)
"key.$": "$.path"JSONPath 参照(.$ サフィックスが必須
"key.$": "$$.Execution.Id"Context Object 参照($$ で実行コンテキストを参照)
"Parameters": {
  "order_id.$": "$.order_id",
  "customer_id.$": "$.customer_id",
  "action": "lookup",
  "execution_id.$": "$$.Execution.Id"
}


よくある間違い: .$ サフィックスを忘れると、"$.order_id" という文字列リテラルがそのまま Lambda に渡される。Lambda のログを見ると "order_id": "$.order_id" となっており、デバッグに気づきにくいバグになる。

InputPath との使い分け:
InputPath: 入力の一部を そのまま Task に渡したい → シンプル
Parameters: 複数フィールドを 選んで組み立て直したい、固定値を追加したい → 柔軟


ResultSelector

役割: Task の実行結果から必要なフィールドのみを選択して整形する。Parameters同じ記法.$ サフィックス)を使う。

"ResultSelector": {
  "customer_name.$": "$.name",
  "tier.$": "$.subscription_tier",
  "source": "dynamodb-lookup"
}


使いどころ: Lambda の実行結果には、ステータスコードやメタデータなど不要な情報が含まれることがある。ResultSelector で必要なフィールドだけを選んでから ResultPathOutputPath に渡す。

OutputPath との違い: ResultSelectorTask 結果(Lambda の return 値) に作用する。OutputPathResultPath 適用後の State 全体の出力 に作用する。処理のタイミングが異なる。


ResultPath

役割: Task 結果(ResultSelector 適用後)を State 入力のどこに配置するかを指定する。元の入力と Task 結果を マージ する操作。

動作
"$"Task 結果で State 入力全体を置換デフォルト)。元の入力は失われる
"$.result"元の入力を保持しつつ、$.result に Task 結果を追加(最もよく使う
nullTask 結果を破棄し、元の入力をそのまま次に渡す(副作用のみの Task に使用)
元の入力: { "orderId": "ORD-001", "amount": 5000 }
Task 結果: { "customer_name": "田中太郎", "tier": "premium" }

ResultPath: "$"→ { "customer_name": "田中太郎", "tier": "premium" }  ← 元入力消滅
ResultPath: "$.customer" → { "orderId": "ORD-001", "amount": 5000, "customer": { "customer_name": "田中太郎", "tier": "premium" } }
ResultPath: null  → { "orderId": "ORD-001", "amount": 5000 }  ← Task 結果破棄


OutputPath との混乱ポイント:
ResultPath = Task 結果をどこに 「入れるか」(マージ操作)
OutputPath = 最終出力から何を 「取り出すか」(フィルタ操作)


OutputPath

役割: ResultPath 適用後の State 全体のデータから、次のステートへ渡す部分を絞り込む。

動作
"$"全体を渡す(デフォルト
"$.customer"customer フィールドのみを次のステートへ渡す
null空の JSON {} を次のステートへ渡す
"OutputPath": "$.customer"


使いどころ: ワークフローの途中で、特定のステート以降に不要なデータを切り捨てたいとき。InputPath がステートの 入口 でフィルタするのに対し、OutputPath出口 でフィルタする。


1-4. よくある混乱ポイントまとめ

混乱ポイント正しい理解
ResultPath: "$" vs ResultPath: null"$" = Task 結果で全体を置換(元入力が消える)/ null = Task 結果を破棄(元入力を維持)
Parameters".$" 忘れ.$ なしだと JSONPath が文字列リテラルとして渡される → Lambda のログで発覚するが気づきにくい
InputPath vs Parameters の使い分けInputPath = 単純な切り出し(サブオブジェクト丸ごと) / Parameters = 柔軟な組み立て(複数フィールド選択・固定値追加)
ResultSelector vs OutputPathResultSelector = Task 結果(Lambda の戻り値)に作用 / OutputPath = ResultPath 適用後の State 全体の出力に作用
ResultPath vs OutputPathResultPath = 結果のマージ先(「どこに入れるか」) / OutputPath = 最終出力の切り出し(「何を取り出すか」)
デフォルト挙動InputPath, OutputPath のデフォルトは "$"(全体)。ResultPath のデフォルトも "$"(全置換)。すべて省略 = データがそのまま流れると勘違いしやすいが、ResultPath: "$" は元入力を破棄する点に注意

1-5. Intrinsic Functions との連携

Step Functions には、ParametersResultSelector の中で使える Intrinsic Functions(組み込み関数) が用意されている。単純な値の抜き出し・組み立て以上の処理を Lambda を介さずに直接 ASL 内で行える。

主な Intrinsic Functions

関数説明
States.Format文字列を動的に組み立てる"arn:aws:sns:ap-northeast-1:{}:{}"
States.JsonMerge2 つの JSON オブジェクトをマージする元の入力と Task 結果の浅いマージ
States.StringToJson文字列を JSON に変換するLambda が文字列で返した JSON のパース
States.JsonToStringJSON を文字列に変換するJSONPath 参照値の文字列化
States.Array配列を生成する複数の値を配列にまとめる
States.ArrayLength配列の長さを取得する
States.Base64Encode / States.Base64DecodeBase64 変換
States.Hashハッシュ値を生成するMD5, SHA-256 など
States.MathAdd / States.MathRandom数値演算カウンターのインクリメントなど

States.Format の使用例

"Parameters": {
  "message.$": "States.Format('注文 {} の処理を開始します。顧客: {}', $.orderId, $.customerId)"
}


States.JsonMerge の使用例

Parameters で元の入力と Task 結果を浅くマージしたいとき(ResultPath では実現できない複雑なマージが必要な場合)に使用する。

"ResultSelector": {
  "merged.$": "States.JsonMerge($$.Execution.Input, $, false)"
}


注意: States.JsonMerge の第 3 引数はマージモードで、現時点では false(浅いマージ)のみサポートされている。深いマージは未対応。


1-6. ベストプラクティス

不要なデータは早めに除去する

InputPathOutputPath を活用して、Lambda に渡す前・次のステートに渡す前に不要データを除去する。Lambda の入出力ペイロード制限(6MB)への余裕が生まれるだけでなく、実行履歴の可読性も向上する。

ResultPath: "$.result" を常套手段として使う

元のステート入力を保持しながら Task の結果を追加するのに最適なパターン。後続のステートで元の入力(例: orderId)と Task 結果(例: customer 情報)の両方を参照できるようになる。

"ResultPath": "$.lookupResult"


Parameters.$ サフィックスはコードレビューで必ず確認する

"key": "$.path""key.$": "$.path" は外見が似ているが動作が全く異なる。前者は文字列リテラル "$.path" を渡し、後者は JSONPath 参照で実際の値を渡す。CI/CD パイプラインに ASL の静的解析(例: cfn-lintaws-sam-cli validate)を組み込む。

Context Object($$)を活用する

$$.Execution.Id(実行 ARN)や $$.Task.Token(タスクトークン)は、べき等処理やコールバックパターンで頻繁に使う。これらは Parameters.$ サフィックスと組み合わせて参照できる。

"Parameters": {
  "idempotency_key.$": "$$.Execution.Id",
  "task_token.$": "$$.Task.Token"
}


データフロー全体を図示してから実装する

5 つのフィルタが組み合わさると、どの時点でどのデータが存在するかを頭の中で追うのが難しくなる。ステートごとに「入力 → InputPath → Parameters → Task → ResultSelector → ResultPath → OutputPath → 出力」の流れを図にしてから ASL を書くと、バグを未然に防げる。


概念を理解したところで、次のセクションでは実際にハンズオンで使う注文エンリッチメントパイプラインのアーキテクチャを解説する。


2. アーキテクチャ解説

2-1. ハンズオンシナリオ概要

本記事のハンズオンでは 注文エンリッチメントパイプライン を題材にします。ECサイトで注文を受け付けた際、注文データに顧客情報・価格情報を付加して最終的な処理済み注文を生成するシナリオです。

注文データ (order_id, customer_id, items)
 ↓ GetCustomerInfo ← 顧客情報をDynamoDB相当から取得
 ↓ CalculatePrice  ← 顧客ティアに応じた割引価格を計算
 ↓ FormatOutput ← 最終出力を整形
処理済み注文 (enriched order data)


パイプラインを構成する3ステート

ステート種別役割使用するデータフローフィルタ
GetCustomerInfoLambda Statecustomer_id でDynamoDBから顧客情報を取得し、注文データに $.customer として追加Parameters, ResultSelector, ResultPath
CalculatePriceLambda State顧客ティア(premium/standard)に応じた割引を適用して最終価格を計算し、$.pricing として追加Parameters, ResultSelector, ResultPath
FormatOutputPass State前段までのデータを最終的な出力形式に整形して返すParameters, OutputPath

この3ステートのパイプラインを通じて、5つのデータフローフィルタ(InputPath / Parameters / ResultSelector / ResultPath / OutputPath)が具体的にどこで・どのように機能するかを体験的に学ぶことができます。


2-2. 全体アーキテクチャ

全体アーキテクチャ

上図は注文エンリッチメントパイプラインの全体構成を示しています。

  • : 実行トリガー(ユーザーからのAPI呼び出しや EventBridge によるスケジュール実行)
  • 中央: Step Functions ステートマシン(sf-order-enrichment-pipeline
  • GetCustomerInfo Lambda → CalculatePrice Lambda → FormatOutput Pass State の3ステートで構成
  • 各ステートの内部にどのフィルタが設定されているかを示している
  • : 最終出力(エンリッチ済み注文データ)
  • 下部: IAMロール(Lambda実行権限・DynamoDB読み取り権限・CloudWatch Logs書き込み権限)

パイプラインを流れるデータの変化(俯瞰)

パイプライン開始時の入力データはシンプルな注文情報です。

// ExecutionInput (StartExecution で渡すデータ)
{
  "order_id": "ORD-001",
  "customer_id": "CUST-123",
  "items": [
 { "sku": "P-001", "qty": 2, "price": 5000 }
  ]
}


このデータが3ステートを通過することで、以下のようにエンリッチされます。

CalculatePrice通過後の中間データ(FormatOutput適用前):

{
  "order_id": "ORD-001",
  "customer_id": "CUST-123",
  "items": [{"sku": "P-001", "qty": 2, "price": 5000}],
  "customer": {
 "customer_name": "田中太郎",
 "customer_tier": "premium"
  },
  "pricing": {
 "subtotal": 10000,
 "discount": 2000,
 "total": 8000
  }
}


FormatOutput(Pass State)で整形した最終出力:

{
  "order_id": "ORD-001",
  "customer_name": "田中太郎",
  "total_amount": 8000,
  "discount_applied": 2000,
  "status": "enriched"
}


💡 FormatOutputにより、不要なフィールド(customer_id / items / customer{} / pricing{})が除去され、
整形された5フィールドのシンプルなオブジェクトになります。

ポイント: Step Functions のデータフロー制御を活用することで、各Lambdaは自分の責務に必要なフィールドだけを受け取り、計算結果を元データに上書きせずに追加できます。これが「ステートフルな変換」の本質です。


2-3. データフロー制御フィルタの評価順序

データフロー制御フィルタ

Step Functions では、1つのステートが実行されるたびに最大5つのフィルタが決まった順序で適用されます。

[State入力]
  → ① InputPath : State入力から「Taskに渡す範囲」を選択
  → ② Parameters: Task入力を「必要なフィールドのみ」に組み立て
  → [Task実行]: Lambda / Activity / 組み込み関数など
  → ③ ResultSelector: Task出力から「使うフィールドのみ」を選択
  → ④ ResultPath: State入力に「Taskの結果をマージ」
  → ⑤ OutputPath: マージ後の全体から「次のStateに渡す範囲」を選択
[次のState入力]


各フィルタの作用レイヤー

フィルタ作用レイヤー役割
① InputPathState I/O 操作State入力の「どこを使うか」を JSONPath で選択
② ParametersTask I/O 操作Task に渡す入力を「新しいオブジェクト」として構築
③ ResultSelectorTask I/O 操作Taskの生レスポンスから「使うフィールドのみ」を選択
④ ResultPathマージ操作State入力のどの位置に「Taskの結果を書き込むか」を指定
⑤ OutputPathState I/O 操作マージ後のState入力から「次のStateに渡す範囲」を選択

「.$」サフィックスの重要性

ParametersResultSelector で最も重要なルールが 「.$」サフィックスです。

// ❌ リテラル値として扱われる (JSONPath参照にならない)
"Parameters": {
  "customer_id": "$.customer_id"// => 文字列 "$.customer_id" がそのまま渡る
}

// ✅ JSONPath参照として評価される
"Parameters": {
  "customer_id.$": "$.customer_id"  // => State入力の $.customer_id の値が渡る
}


"キー名.$" という形式にすることで、右辺の文字列がJSONPathとして評価され、実行時の入力データから動的に値を取得できます。この .$ をつけ忘れると、値がそのまま文字列として渡されてしまうため、バグの温床になります。


2-4. ステートごとのデータ変換例

ステートデータ変換

上図は GetCustomerInfo ステートにおけるデータ変換の全過程を示しています。1ステートの実行中に何が起きているかを追跡することで、フィルタの動作を直感的に理解できます。

Step 0 → 1: InputPath の適用

// State入力 (Step 0)
{
  "order_id": "ORD-001",
  "customer_id": "CUST-123",
  "items": [{"sku": "P-001", "qty": 2, "price": 5000}]
}

// InputPath: "$" を設定した場合 (Step 1) → 変化なし
// "$" はJSONドキュメント全体を指すため、State入力がそのままTask処理に渡る


InputPath: "$" はデフォルト値です。特定のサブツリーだけをTaskに渡したい場合は "$.order" のように指定します。

Step 1 → 2: Parameters の適用(必要項目のみ抽出)

// Parameters 設定
{
  "customer_id.$": "$.customer_id"
}

// => Task (Lambda) が受け取るイベント (Step 2)
{
  "customer_id": "CUST-123"// customer_id のみ抽出して渡す
}


GetCustomerInfo Lambda には顧客IDさえあれば十分です。itemsorder_id などの不要なフィールドを除外することで、Lambdaの責務を明確にし、最小権限の原則に近い設計が実現できます。

Step 2 → 3: Lambda実行(生レスポンス)

// Lambda の return 値 (Step 3)
{
  "name": "Tanaka Taro",
  "tier": "premium",
  "email": "tanaka@example.com"
}


Lambdaが返したレスポンスには email フィールドが含まれています。このフィールドは後続ステートでは不要ですが、この時点ではまだ生の状態です。

Step 3 → 4: ResultSelector の適用(必要フィールドのみ選択)

// ResultSelector 設定
{
  "customer_name.$": "$.name",
  "customer_tier.$": "$.tier"
}

// => ResultSelector 適用後 (Step 4)
{
  "customer_name": "Tanaka Taro",
  "customer_tier": "premium"
  // email は除外、フィールド名もリネームされた
}


ResultSelector は2つの役割を同時に果たします。

  1. フィールドの選択: email を除外し、後続ステートに渡さない
  2. フィールドのリネーム: namecustomer_nametiercustomer_tier にリネーム

Parameters との違いは「入力に作用するか、出力に作用するか」です。Parameters はTask入力を構築し、ResultSelector はTask出力を整形します。

Step 4 → 5: ResultPath の適用(元データにマージ)

// ResultPath 設定
"ResultPath": "$.customer"

// => ResultPath 適用後 (Step 5) — 次のStateへ渡されるデータ
{
  "order_id": "ORD-001",  // ← 元のState入力が保持されている
  "customer_id": "CUST-123", // ← 元のState入力が保持されている
  "items": [...],// ← 元のState入力が保持されている
  "customer": {  // ← ResultSelector の結果が追加された
 "customer_name": "Tanaka Taro",
 "customer_tier": "premium"
  }
}


ResultPath: "$.customer" は「ResultSelector で整形した結果を、State入力の $.customer というキーに書き込む」という意味です。元のState入力(order_id, customer_id, items)はそのまま保持され、$.customer が追加される形でマージされます。

ResultPath: null の注意点: ResultPathnull に設定すると、Taskの結果が完全に破棄され、元のState入力がそのまま次のStateに渡ります。Task結果を使わずにフロー制御だけ行いたい場合に有用ですが、誤って設定すると計算結果が消えてしまうので注意が必要です。

次のステートの CalculatePrice は、この $.customer.customer_tier を参照して割引率を決定します。ResultPath によって前のステートが「何を計算したか」が後続ステートから参照可能な状態で引き継がれるのが、Step Functions のデータ蓄積型パイプラインのポイントです。


アーキテクチャの全体像を把握したところで、実際にAWSコンソールを使ってこのパイプラインを構築してみよう。


3. AWSコンソールでのハンズオン

このセクションでは、AWSコンソールを使って 注文 enrichment パイプライン を実際に構築します。GetCustomerInfo → CalculatePrice → FormatOutput という3ステートのワークフローを通じて、Step Functions の全5フィルタ(InputPath / Parameters / ResultSelector / ResultPath / OutputPath)がどのようにデータを変換するかを体感します。


3-1. 前提条件

  • AWSアカウントおよび以下のサービスを操作できるIAM権限
  • AWS Lambda(関数の作成・実行・テスト)
  • AWS Step Functions(ステートマシンの作成・実行)
  • AWS IAM(ロール・ポリシーの作成)
  • Amazon CloudWatch Logs(ログの参照)
  • Pythonランタイム (3.12) はAWSが管理するため、ローカル環境へのPythonインストールは不要
  • Lambda の「インラインエディタ」機能を使用するため、コードのZIPアップロード等も不要

3-2. Lambda 2関数のデプロイ

enrichment パイプラインでは2つのLambda関数を使います。まず両関数をコンソールから作成します。

3-2-1. get-customer-info 関数の作成

関数の概要: 顧客IDを受け取り、顧客名・ティア・メールアドレス等の全情報を返します。Step Functions 側の ResultSelector で必要なフィールドだけを選択します。

作成手順:

  1. AWSコンソール → Lambda を開く
  2. 関数の作成」をクリック
  3. 「一から作成」を選択し、以下を入力:
  4. 関数名: get-customer-info
  5. ランタイム: Python 3.12
  6. アーキテクチャ: x86_64(デフォルト)
  7. 「実行ロール」は「基本的なLambdaアクセス許可で新しいロールを作成」を選択(デフォルト)
  8. 関数の作成」をクリック
  9. コードエディタ(インラインエディタ)が開いたら、lambda_function.py の内容を以下のコードにすべて置き換えてください:
import json

# 顧客マスタ(ハンズオン用モックデータ)
CUSTOMERS = {
 "CUST-123": {"name": "田中太郎", "tier": "premium", "email": "tanaka@example.com"},
 "CUST-456": {"name": "鈴木花子", "tier": "standard", "email": "suzuki@example.com"},
 "CUST-789": {"name": "佐藤次郎", "tier": "basic", "email": "sato@example.com"},
}

def lambda_handler(event, context):
 customer_id = event.get("customer_id")
 if not customer_id:
  raise ValueError("customer_id は必須です")

 customer = CUSTOMERS.get(customer_id)
 if not customer:
  raise ValueError(f"顧客が見つかりません: {customer_id}")

 # 全フィールドを返す(ResultSelectorで必要なものだけ選択させる)
 return {
  "customer_id": customer_id,
  "name": customer["name"],
  "tier": customer["tier"],
  "email": customer["email"],
  "last_login": "2026-04-10T10:00:00Z",
  "preferences": {"language": "ja", "currency": "JPY"}
 }

  1. Deploy」ボタンをクリックしてデプロイ(「変更がデプロイされました」と表示されれば成功)

動作確認(テスト実行):

  1. テスト」タブ → 「新しいイベントを作成」
  2. イベント名: test-cust123
  3. イベントJSONに以下を入力:
{"customer_id": "CUST-123"}

  1. テスト」をクリック
  2. 「実行結果: 成功」となり、以下のような出力が返ることを確認:
{
  "customer_id": "CUST-123",
  "name": "田中太郎",
  "tier": "premium",
  "email": "tanaka@example.com",
  "last_login": "2026-04-10T10:00:00Z",
  "preferences": {
 "language": "ja",
 "currency": "JPY"
  }
}


💡 ポイント: この関数は emaillast_loginpreferences など不要な情報も含めて返しています。Step Functions の ResultSelector を使って、後工程で必要な nametier だけを選択するのがこのハンズオンの肝です。

図1: Lambda関数の作成


3-2-2. calculate-price 関数の作成

関数の概要: 注文IDと商品リスト、顧客ティアを受け取り、小計・割引・合計を計算して返します。

作成手順:

  1. Lambda コンソール → 「関数の作成
  2. 以下を入力:
  3. 関数名: calculate-price
  4. ランタイム: Python 3.12
  5. 実行ロール: 基本的なLambdaアクセス許可(デフォルト)
  6. 関数の作成」をクリック
  7. インラインエディタで lambda_function.py を以下のコードに置き換え:
import json

# ティア別割引率
DISCOUNT_RATES = {
 "premium":  0.20,  # 20% off
 "standard": 0.10,  # 10% off
 "basic": 0.00,  # 割引なし
}

def lambda_handler(event, context):
 order_id = event.get("order_id")
 items = event.get("items", [])
 customer_tier = event.get("customer_tier", "basic")

 # 小計計算
 subtotal = sum(item.get("price", 0) * item.get("qty", 1) for item in items)

 # 割引計算
 discount_rate = DISCOUNT_RATES.get(customer_tier, 0.0)
 discount = round(subtotal * discount_rate)
 total = subtotal - discount

 # 詳細な結果を返す(ResultSelectorで必要なものだけ選択させる)
 return {
  "order_id": order_id,
  "subtotal": subtotal,
  "discount": discount,
  "discount_rate": discount_rate,
  "total": total,
  "currency": "JPY",
  "calculated_at": "2026-04-12T10:00:00Z"
 }

  1. Deploy」をクリック

動作確認(テスト実行):

  1. 「テスト」タブ → 新しいイベントを作成
  2. イベントJSONに以下を入力:
{
  "order_id": "ORD-001",
  "items": [{"id": "A", "price": 3000, "qty": 2}],
  "customer_tier": "premium"
}

  1. テスト」をクリック
  2. 以下のような出力を確認(subtotal: 6000、discount: 1200、total: 4800):
{
  "order_id": "ORD-001",
  "subtotal": 6000,
  "discount": 1200,
  "discount_rate": 0.2,
  "total": 4800,
  "currency": "JPY",
  "calculated_at": "2026-04-12T10:00:00Z"
}


💡 ポイント: この関数も discount_ratecurrencycalculated_at など、最終出力には不要な情報を返しています。ResultSelector で subtotaldiscounttotal の3フィールドだけに絞り込みます。


3-3. IAMロール作成(Step Functions実行ロール)

Step Functions が Lambda を呼び出すために専用の実行ロールを作成します。

手順:

  1. AWSコンソール → IAM → 「ロール」→「ロールを作成
  2. 信頼されたエンティティタイプ: 「AWSのサービス
  3. サービス: 「Step Functions」を選択 → 「次へ」
  4. 許可ポリシー: この画面では何も選択せずに「次へ」(後でインラインポリシーを追加)
  5. ロール名: sf-dataflow-execution-role
  6. ロールを作成」をクリック

インラインポリシーの追加:

  1. 作成したロール sf-dataflow-execution-role を開く
  2. 許可」タブ → 「許可を追加」→「インラインポリシーを作成
  3. JSON」タブに切り替え、以下をすべて貼り付け:
{
  "Version": "2012-10-17",
  "Statement": [
 {
"Sid": "InvokeLambda",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": [
  "arn:aws:lambda:*:*:function:get-customer-info",
  "arn:aws:lambda:*:*:function:calculate-price"
]
 },
 {
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
  "logs:CreateLogGroup",
  "logs:CreateLogDelivery",
  "logs:PutLogEvents",
  "logs:DescribeLogGroups",
  "logs:DescribeLogStreams",
  "logs:GetLogDelivery",
  "logs:UpdateLogDelivery",
  "logs:DeleteLogDelivery",
  "logs:ListLogDeliveries"
],
"Resource": "*"
 },
 {
"Sid": "XRay",
"Effect": "Allow",
"Action": [
  "xray:PutTraceSegments",
  "xray:GetSamplingRules",
  "xray:GetSamplingTargets"
],
"Resource": "*"
 }
  ]
}

  1. ポリシー名: sf-dataflow-policy → 「ポリシーの作成

3-4. ステートマシン作成(enrichment パイプライン)

3-4-1. Lambda ARNの確認

ASLに埋め込む前に、2つのLambda関数のARNを確認します。

  1. Lambda コンソール → get-customer-info 関数を開く
  2. 右上の「関数ARN」をコピー(例: arn:aws:lambda:ap-northeast-1:123456789012:function:get-customer-info
  3. 同様に calculate-price 関数のARNもコピー

3-4-2. ステートマシンの作成

  1. AWSコンソール → Step Functions → 「ステートマシンの作成
  2. コードでワークフローを記述」を選択
  3. 以下のASLを貼り付けます。<LAMBDA_ARN_GET_CUSTOMER><LAMBDA_ARN_CALCULATE_PRICE> を手順3-4-1でコピーした実際のARNに置き換えてください:
{
  "Comment": "注文データ enrichment パイプライン(データフロー制御総合演習)",
  "StartAt": "GetCustomerInfo",
  "States": {
 "GetCustomerInfo": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_GET_CUSTOMER>",
"Parameters": {
  "customer_id.$": "$.customer_id"
},
"ResultSelector": {
  "customer_name.$": "$.name",
  "customer_tier.$": "$.tier"
},
"ResultPath": "$.customer",
"Next": "CalculatePrice"
 },
 "CalculatePrice": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_CALCULATE_PRICE>",
"Parameters": {
  "order_id.$": "$.order_id",
  "items.$": "$.items",
  "customer_tier.$": "$.customer.customer_tier"
},
"ResultSelector": {
  "subtotal.$": "$.subtotal",
  "discount.$": "$.discount",
  "total.$": "$.total"
},
"ResultPath": "$.pricing",
"Next": "FormatOutput"
 },
 "FormatOutput": {
"Type": "Pass",
"Parameters": {
  "order_id.$": "$.order_id",
  "customer_name.$": "$.customer.customer_name",
  "total_amount.$": "$.pricing.total",
  "discount_applied.$": "$.pricing.discount",
  "status": "enriched"
},
"End": true
 }
  }
}

  1. ステートマシン名: order-enrichment-pipeline
  2. 実行ロールを選択」→ 「既存のロールを選択」→ sf-dataflow-execution-role
  3. ログ設定」を展開:
  4. CloudWatch Logs: 有効化
  5. ログレベル: ALL(データフロー確認のために全ログを出力)
  6. CloudWatch Logs のロググループ: 自動作成を選択(または任意のロググループ名を入力)

💡 ログレベルを ALL にする理由: ERROR だと正常実行時のデータを見られません。学習中は ALL に設定することで、各フィルタ適用後のデータを CloudWatch Logs で詳細に確認できます。

  1. ステートマシンの作成」をクリック

図2: ASL入力画面


ASLの各ステートで何をしているか(解説):

ステートフィルタ役割
GetCustomerInfoParameterscustomer_id だけをLambdaに渡す(".$" サフィックスで参照渡し)
GetCustomerInfoResultSelectorLambdaの生の出力から namecustomer_nametiercustomer_tier のみ抽出・リネーム
GetCustomerInfoResultPath抽出結果を $.customer に格納(元の order_id, items は保持)
CalculatePriceParameters$.customer.customer_tier で前ステートが追加したデータを参照
CalculatePriceResultSelector価格計算結果から subtotaldiscounttotal のみ抽出
CalculatePriceResultPath抽出結果を $.pricing に格納
FormatOutputParametersPass State で最終出力を整形(不要フィールドを除去)

3-5. 実行して各フィルタの効果を確認

3-5-1. ステートマシンを実行

  1. order-enrichment-pipeline のページ → 「実行の開始
  2. 実行名: 任意(デフォルトのUUIDでも可)
  3. 入力に以下のJSONを貼り付け:
{
  "order_id": "ORD-001",
  "customer_id": "CUST-123",
  "items": [
 {"id": "ITEM-A", "name": "Tシャツ", "price": 3000, "qty": 2},
 {"id": "ITEM-B", "name": "パーカー", "price": 5000, "qty": 1}
  ]
}

  1. 実行の開始」をクリック
  2. ビジュアルフローで3つのステートが順番に緑色になることを確認

図3: 実行成功の様子(ビジュアルフロー)


3-5-2. GetCustomerInfo ステートのデータフロー確認

ビジュアルフローで「GetCustomerInfo」ステートをクリック → 「入出力」タブを開きます。

各フィルタの効果を確認してください:

タブ/項目表示されるデータ説明
入力{"order_id": "ORD-001", "customer_id": "CUST-123", "items": [...]}ステートマシンへの初期入力
Parameters適用後{"customer_id": "CUST-123"}customer_id だけに絞り込まれ、これがLambdaに渡される
Lambdaの生の出力{"customer_id": "CUST-123", "name": "田中太郎", "tier": "premium", "email": "...", "last_login": "...", "preferences": {...}}Lambdaが返す全データ
ResultSelector適用後{"customer_name": "田中太郎", "customer_tier": "premium"}namecustomer_nametiercustomer_tier にリネームされ、不要フィールドが除去
State出力(ResultPath適用後)元の入力 + "customer": {"customer_name": "田中太郎", "customer_tier": "premium"}元データを保持しつつ $.customer に顧客情報が追加

💡 ポイント: ResultPath "$.customer" により、元の order_id / customer_id / items保持されています。ResultPath "$" にするとLambdaの結果で上書きされてしまい、次ステートで order_id を参照できなくなります。

図4: GetCustomerInfoの入出力タブ


3-5-3. CalculatePrice ステートのデータフロー確認

同様に「CalculatePrice」ステートをクリック → 「入出力」タブ:

タブ/項目表示されるデータ説明
Parameters適用後{"order_id": "ORD-001", "items": [...], "customer_tier": "premium"}前ステートが $.customer に追加した customer_tier を参照できている
Lambdaの生の出力{"order_id": "ORD-001", "subtotal": 11000, "discount": 2200, "discount_rate": 0.2, "total": 8800, "currency": "JPY", ...}Lambdaが返す全データ
ResultSelector適用後{"subtotal": 11000, "discount": 2200, "total": 8800}3フィールドだけに絞り込まれた
State出力(ResultPath適用後)元データ + "pricing": {"subtotal": 11000, "discount": 2200, "total": 8800}$.pricing に価格情報が追加

💡 計算の確認: Tシャツ 3000円×2 + パーカー 5000円×1 = 11,000円(小計)。premiumティアの20%割引 = 2,200円。合計 = 8,800円。


3-5-4. FormatOutput ステート(Pass State)の確認

FormatOutput」ステートをクリック → 「入出力」タブ:

Parameters適用後(最終出力):

{
  "order_id": "ORD-001",
  "customer_name": "田中太郎",
  "total_amount": 8800,
  "discount_applied": 2200,
  "status": "enriched"
}


不要なフィールド(customer_iditemscustomer.*pricing.*)が除去された、クリーンな最終出力です。Pass State + Parameters の組み合わせで、データを加工せずに「整形された出力」を作れます。


3-6. よくある間違いのデバッグ体験

実際にミスを体験することで、各フィルタの役割をより深く理解できます。


デバッグ①: ".$" サフィックスを忘れた場合

変更箇所: ステートマシンのASLを編集 → GetCustomerInfo の Parameters を以下のように誤って変更:

"Parameters": {
  "customer_id": "$.customer_id"
}


(正しくは "customer_id.$": "$.customer_id"

実行して確認:

  1. ステートマシンを実行(入力は同じJSONを使用)
  2. GetCustomerInfo ステートが失敗する
  3. エラー内容を確認すると、Lambdaに "$.customer_id" という文字列リテラルが渡されていることがわかる
  4. Lambda は customer_id の値が "$.customer_id" という文字列になるため「顧客が見つかりません: $.customer_id」エラーを返す

学習ポイント:
"customer_id": "$.customer_id""$.customer_id" という文字列が渡される
"customer_id.$": "$.customer_id" → 入力JSONの customer_id フィールドの"CUST-123")が渡される
".$" サフィックスは「JsonPathで参照する」という宣言。忘れると文字列リテラルになる

確認後、ASLを元の正しいバージョンに戻してください。

図5: デバッグ —


デバッグ②: ResultPath "$" で元データが消える場合

変更箇所: GetCustomerInfo の ResultPath を "$.customer" から "$" に変更:

"ResultPath": "$"


実行して確認:

  1. ステートマシンを実行
  2. GetCustomerInfo は成功するが、CalculatePrice が失敗する
  3. CalculatePrice の入力を確認すると、order_iditems が消え、ResultSelector の出力(customer_namecustomer_tier)だけになっている
  4. CalculatePrice の Parameters で参照する $.order_id$.items が存在しないためエラー

学習ポイント:
– ResultPath "$" = Lambda の出力結果でステートの全データを上書き
– ResultPath "$.customer" = Lambda の出力結果を customer キーとして追加(元データは保持)
– 「元データを保持しながら新データを追加する」には必ずサブパス($.xxx)を指定する

確認後、ASLを元の "$.customer" に戻してください。


まとめ(コンソール編)

このハンズオンで体験した5フィルタの役割を整理します:

フィルタタイミング役割今回の使用例
InputPathステート入力前入力全体から必要部分を選択今回は使用せず(デフォルト $
ParametersLambda呼び出し前Lambda に渡すデータを整形GetCustomerInfo: customer_id のみ渡す
ResultSelectorLambda出力直後Lambda の生の出力から必要フィールドを選択不要フィールド(email, last_login等)を除去
ResultPath結果マージ時結果をどこに置くか指定$.customer, $.pricing で元データを保持
OutputPathステート出力前次ステートに渡すデータを選択今回は使用せず(デフォルト $

覚えておきたいパターン: Lambda に渡すデータを絞るには Parameters、Lambda の結果から不要フィールドを除くには ResultSelector、元データを壊さずに結果を追加するには ResultPath にサブパス を指定する。この3つを組み合わせるだけで、複雑なデータ変換を宣言的に表現できます。


同じパイプラインを次はTerraformでコードとして定義する。コンソール版と同一のASL構造であることを確認しながら進めよう。


4. Terraformでの構築

Section 3ではAWSコンソールを使って注文 enrichment パイプラインを構築しました。このセクションでは、まったく同じStep Functionsステートマシンと2つのLambda関数をTerraformで定義し、IaCによる再現性の高いインフラ構築を体験します。

ポイント: TerraformのASL定義はSection 3(コンソール版)と完全一致しています。同じロジックをコードで表現する際の違いを比較してみてください。


4-1. 前提条件

作業開始前に以下の準備が整っていることを確認してください。

項目確認方法
Terraform 1.0以上terraform version
AWS CLI設定済みaws configure list
適切なIAM権限Lambda / Step Functions / IAM / CloudWatch Logs の操作権限
# バージョン確認
terraform version
# Terraform v1.x.x 以上であること

aws configure list
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, region が設定済みであること


4-2. ディレクトリ構成

以下のディレクトリ構成でプロジェクトを作成します。

sf-dataflow/
├── main.tf  # メインリソース定義
├── variables.tf# 変数定義
├── outputs.tf  # 出力値定義
├── lambda/
│├── get_customer_info.py# 顧客情報取得Lambda
│└── calculate_price.py  # 価格計算Lambda
└── statemachine/
 └── definition.json.tpl # ステートマシン定義テンプレート

# ディレクトリ作成
mkdir -p sf-dataflow/lambda sf-dataflow/statemachine
cd sf-dataflow


4-3. Lambda ソースコード

Section 3のコンソール版と同じロジック構成のコードです(Parameters / ResultSelector / ResultPath のデータフロー制御はコンソール版と完全に同じ構造)。なお、顧客データのティア名・サンプルデータはTerraform版では簡略化しています。コンソール版と合わせて使用する場合は顧客ID(CUST-123等)とティア名(premium/standard/basic)を統一してください。

lambda/get_customer_info.py

import json

# モック顧客データベース
CUSTOMERS = {
 "CUST-123": {"name": "田中太郎", "tier": "gold"},
 "CUST-456": {"name": "佐藤花子", "tier": "silver"},
 "CUST-789": {"name": "鈴木一郎", "tier": "bronze"},
}

def lambda_handler(event, context):
 customer_id = event.get("customer_id")

 if customer_id not in CUSTOMERS:
  raise ValueError(f"Customer not found: {customer_id}")

 customer = CUSTOMERS[customer_id]

 return {
  "name": customer["name"],
  "tier": customer["tier"]
 }


lambda/calculate_price.py

import json

# ティア別割引率
DISCOUNT_RATES = {
 "gold":0.20,# 20% 割引
 "silver": 0.10,# 10% 割引
 "bronze": 0.05,#  5% 割引
}

def lambda_handler(event, context):
 items= event.get("items", [])
 customer_tier = event.get("customer_tier", "bronze")

 # 小計計算
 subtotal = sum(item["price"] * item["qty"] for item in items)

 # 割引計算
 discount_rate = DISCOUNT_RATES.get(customer_tier, 0)
 discount= int(subtotal * discount_rate)
 total= subtotal - discount

 return {
  "subtotal": subtotal,
  "discount": discount,
  "total": total
 }


4-4. statemachine/definition.json.tpl

コンソール版(Section 3)と同一のASL構造です。

Terraformでは templatefile() 関数を使って、Lambda関数ARNなどの動的な値を ${変数名} プレースホルダーで埋め込みます。main.tf 側から変数マップを渡すことで、実行時に正しいARNに置換されます。

{
  "Comment": "注文データ enrichment パイプライン(データフロー制御総合演習)",
  "StartAt": "GetCustomerInfo",
  "States": {
 "GetCustomerInfo": {
"Type": "Task",
"Resource": "${get_customer_arn}",
"Parameters": {
  "customer_id.$": "$.customer_id"
},
"ResultSelector": {
  "customer_name.$": "$.name",
  "customer_tier.$": "$.tier"
},
"ResultPath": "$.customer",
"Next": "CalculatePrice"
 },
 "CalculatePrice": {
"Type": "Task",
"Resource": "${calculate_price_arn}",
"Parameters": {
  "order_id.$": "$.order_id",
  "items.$": "$.items",
  "customer_tier.$": "$.customer.customer_tier"
},
"ResultSelector": {
  "subtotal.$": "$.subtotal",
  "discount.$": "$.discount",
  "total.$": "$.total"
},
"ResultPath": "$.pricing",
"Next": "FormatOutput"
 },
 "FormatOutput": {
"Type": "Pass",
"Parameters": {
  "order_id.$": "$.order_id",
  "customer_name.$": "$.customer.customer_name",
  "total_amount.$": "$.pricing.total",
  "discount_applied.$": "$.pricing.discount",
  "status": "enriched"
},
"End": true
 }
  }
}


templatefile() の仕組み:

  • ${get_customer_arn}aws_lambda_function.get_customer_info.arn に置換
  • ${calculate_price_arn}aws_lambda_function.calculate_price.arn に置換
  • $. で始まるJSONPath参照はStep Functionsの構文のため、Terraformの変数補間とは区別されます(${ が含まれないため競合しません)

4-5. variables.tf

variable "aws_region" {
  description = "デプロイ先のAWSリージョン"
  default  = "ap-northeast-1"
}

variable "project_name" {
  description = "プロジェクト名(リソース命名に使用)"
  default  = "order-enrichment"
}


4-6. main.tf(完全なコード)

以下の main.tf で全リソースを定義します。terraform plan が通る完全な状態です。

# ============================================================
# Provider & Data Sources
# ============================================================
terraform {
  required_providers {
 aws = {
source  = "hashicorp/aws"
version = ">= 5.0"
 }
  }
}

provider "aws" {
  region = var.aws_region
}

data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

# ============================================================
# Lambda: ソースコードをZIPアーカイブ化
# ============================================================
data "archive_file" "get_customer_info" {
  type  = "zip"
  source_file = "${path.module}/lambda/get_customer_info.py"
  output_path = "${path.module}/.terraform/lambda_zips/get_customer_info.zip"
}

data "archive_file" "calculate_price" {
  type  = "zip"
  source_file = "${path.module}/lambda/calculate_price.py"
  output_path = "${path.module}/.terraform/lambda_zips/calculate_price.zip"
}

# ============================================================
# IAMロール: Lambda用
# ============================================================
resource "aws_iam_role" "lambda_role" {
  name = "${var.project_name}-lambda-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role = aws_iam_role.lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# ============================================================
# IAMロール: Step Functions用
# ============================================================
resource "aws_iam_role" "sf_execution_role" {
  name = "${var.project_name}-sf-execution-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "sf_policy" {
  name = "${var.project_name}-sf-invoke-lambda"
  role = aws_iam_role.sf_execution_role.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Action = "lambda:InvokeFunction"
Resource = [
  aws_lambda_function.get_customer_info.arn,
  aws_lambda_function.calculate_price.arn,
]
 }]
  })
}

# ============================================================
# CloudWatch Log Groups(Lambda用)
# ============================================================
resource "aws_cloudwatch_log_group" "get_customer_info" {
  name  = "/aws/lambda/${var.project_name}-get-customer-info"
  retention_in_days = 7
}

resource "aws_cloudwatch_log_group" "calculate_price" {
  name  = "/aws/lambda/${var.project_name}-calculate-price"
  retention_in_days = 7
}

# ============================================================
# Lambda Functions
# ============================================================
resource "aws_lambda_function" "get_customer_info" {
  function_name = "${var.project_name}-get-customer-info"
  filename= data.archive_file.get_customer_info.output_path
  source_code_hash = data.archive_file.get_customer_info.output_base64sha256
  runtime = "python3.12"
  handler = "get_customer_info.lambda_handler"
  role = aws_iam_role.lambda_role.arn

  depends_on = [
 aws_cloudwatch_log_group.get_customer_info,
 aws_iam_role_policy_attachment.lambda_basic,
  ]
}

resource "aws_lambda_function" "calculate_price" {
  function_name = "${var.project_name}-calculate-price"
  filename= data.archive_file.calculate_price.output_path
  source_code_hash = data.archive_file.calculate_price.output_base64sha256
  runtime = "python3.12"
  handler = "calculate_price.lambda_handler"
  role = aws_iam_role.lambda_role.arn

  depends_on = [
 aws_cloudwatch_log_group.calculate_price,
 aws_iam_role_policy_attachment.lambda_basic,
  ]
}

# ============================================================
# Step Functions State Machine
# ============================================================
resource "aws_sfn_state_machine" "order_enrichment" {
  name  = "${var.project_name}-state-machine"
  role_arn = aws_iam_role.sf_execution_role.arn

  definition = templatefile("${path.module}/statemachine/definition.json.tpl", {
 get_customer_arn = aws_lambda_function.get_customer_info.arn
 calculate_price_arn = aws_lambda_function.calculate_price.arn
  })

  depends_on = [aws_iam_role_policy.sf_policy]
}


4-7. outputs.tf

output "state_machine_arn" {
  description = "Step FunctionsステートマシンのリソースARN"
  value = aws_sfn_state_machine.order_enrichment.arn
}

output "state_machine_name" {
  description = "Step Functionsステートマシン名"
  value = aws_sfn_state_machine.order_enrichment.name
}

output "get_customer_info_arn" {
  description = "get_customer_info Lambda関数のARN"
  value = aws_lambda_function.get_customer_info.arn
}

output "calculate_price_arn" {
  description = "calculate_price Lambda関数のARN"
  value = aws_lambda_function.calculate_price.arn
}


4-8. デプロイ手順

# 初期化(プロバイダーのダウンロード)
terraform init

# 実行計画の確認(変更内容を事前確認)
terraform plan

# リソースの作成
terraform apply
# "yes" を入力して確定


terraform plan の出力例(抜粋):

Plan: 10 to add, 0 to change, 0 to destroy.


以下のリソースが作成されます:
aws_iam_role.lambda_role
aws_iam_role.sf_execution_role
aws_iam_role_policy.sf_policy
aws_iam_role_policy_attachment.lambda_basic
aws_cloudwatch_log_group.get_customer_info
aws_cloudwatch_log_group.calculate_price
aws_lambda_function.get_customer_info
aws_lambda_function.calculate_price
aws_sfn_state_machine.order_enrichment
– (archive_file は data source のため数に含まれない)


4-9. 動作確認(CLIで実行)

デプロイ完了後、AWS CLIでステートマシンを実行して動作を確認します。

# ステートマシンを実行
aws stepfunctions start-execution \
  --state-machine-arn $(terraform output -raw state_machine_arn) \
  --input '{
 "order_id": "ORD-TF-001",
 "customer_id": "CUST-123",
 "items": [
{"id": "ITEM-A", "name": "Tシャツ", "price": 3000, "qty": 2}
 ]
  }'


出力例(実行ARNが返る):

{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:order-enrichment-state-machine:abc123",
 "startDate": "2026-04-12T10:00:00.000Z"
}

# 実行結果を確認(<EXECUTION_ARN>を上記のexecutionArnに置き換え)
aws stepfunctions describe-execution \
  --execution-arn <EXECUTION_ARN> \
  --query 'output' --output text | python3 -m json.tool


期待する出力:

{
 "order_id": "ORD-TF-001",
 "customer_name": "田中太郎",
 "total_amount": 4800,
 "discount_applied": 1200,
 "status": "enriched"
}


計算内訳:
– 小計: 3,000円 × 2個 = 6,000円
– 割引: 6,000円 × 20%(goldティア)= 1,200円
– 合計: 6,000円 − 1,200円 = 4,800円


クリーンアップ(Terraform版)

動作確認後、不要なリソースを削除します。

terraform destroy
# "yes" を入力して確定


まとめ(Terraform編)

このセクションでは、Section 3(コンソール版)と完全に同一のASL構造を持つStep Functionsステートマシンを、Terraformを使ってコードで表現しました。

比較項目コンソール版(Section 3)Terraform版(Section 4)
ASL定義JSONをコンソールに貼り付けtemplatefile() で動的に生成
Lambda ARNコンソール上で手動選択${get_customer_arn} プレースホルダーで補間
再現性手順書が必要terraform apply で即再現
バージョン管理困難Gitで管理可能

ASL構造の一致確認: ステート名(GetCustomerInfo / CalculatePrice / FormatOutput)、ParametersResultSelectorResultPath はすべてコンソール版(Section 3)と完全一致しています。Terraformの ${...} プレースホルダーはLambda ARNのみに使用しており、Step Functionsのロジックには一切変更を加えていません。


基本的なパイプラインの構築が終わったところで、より高度なデータ変換テクニックを見ていこう。


5. 応用パターン集(Intrinsic Functions活用)

この Section では、データフロー制御でよく使われる応用パターンを ASL 例とともに解説します。
概念と実装例のみ紹介します(ハンズオン手順は省略)。

5-1. Intrinsic Functions を使ったデータ変換

Intrinsic Functions = ASL 内でデータ変換を Lambda 不要で行える組み込み関数です。
Parameters / ResultSelector 内で使用でき、ステート定義の中に直接記述できます。

主要な Intrinsic Functions 一覧:

関数用途
States.Format文字列フォーマット"States.Format('注文 {} 処理完了', $.order_id)"
States.StringToJson文字列を JSON にパースEventBridge の detail が文字列の場合に便利
States.JsonToStringJSON を文字列にシリアライズSQS/SNS へのメッセージ本文に使用
States.Array複数値から配列を生成"States.Array($.a, $.b, $.c)"
States.ArrayGetItem配列から要素取得"States.ArrayGetItem($.items, 0)"
States.ArrayLength配列の長さ取得件数チェックに使用
States.ArrayUnique配列の重複除去重複 ID の除去など
States.Base64EncodeBase64 エンコードバイナリデータの文字列化
States.Base64DecodeBase64 デコードエンコードされたペイロードの復元
States.HashSHA256/MD5 ハッシュ生成データの整合性チェック
States.JsonMergeJSON 同士をマージ複数ソースのデータ統合
States.MathAdd数値加算カウンターのインクリメント
States.MathRandom乱数生成A/B テストの振り分けなど
States.UUIDUUID v4 生成実行ごとに一意の ID を付与
States.ArrayContains配列に値が含まれるか判定条件分岐のフラグ生成

ASL 例(States.Format + States.UUID):

"GenerateOrderId": {
  "Type": "Pass",
  "Parameters": {
 "order_id.$": "States.UUID()",
 "message.$": "States.Format('注文 {} を受け付けました', $.customer_name)",
 "processed_at.$": "$$.Execution.StartTime"
  },
  "Next": "ProcessOrder"
}


このステートは Lambda を呼び出さずに、
States.UUID() でユニークな注文 ID を生成
States.Format() で顧客名を埋め込んだメッセージを生成
$$.Execution.StartTime で実行開始時刻を付与

という 3 つの変換を一度に行います。


5-2. Context Object ($$) を使った実行情報の埋め込み

Context Object = $$ で参照できる Step Functions の実行コンテキスト情報です。
ステートマシンの実行 ID や開始時刻など、AWS が自動的に付与するメタデータにアクセスできます。

主要な Context Object フィールド:

フィールド内容
$$.Execution.Id実行 ARN(一意の識別子)
$$.Execution.Name実行名
$$.Execution.StartTime実行開始時刻(ISO 8601)
$$.Execution.Input実行時の入力(全体)
$$.State.Name現在のステート名
$$.State.EnteredTimeステート開始時刻
$$.Task.Token.waitForTaskToken 用トークン
$$.StateMachine.Idステートマシン ARN
$$.StateMachine.Nameステートマシン名

ASL 例(実行 ID を Lambda に渡す):

"ProcessWithContext": {
  "Type": "Task",
  "Resource": "<LAMBDA_ARN>",
  "Parameters": {
 "order_id.$": "$.order_id",
 "execution_id.$": "$$.Execution.Id",
 "execution_name.$": "$$.Execution.Name",
 "started_at.$": "$$.Execution.StartTime"
  },
  "ResultPath": "$.result",
  "Next": "Done"
}


重要: $$ は Context Object 参照、$ は State 入力参照です。同一の Parameters ブロック内で混在して使用できます。

Context Object を活用することで、分散トレーシングや監査ログに実行 ID を自動付与でき、Lambda 側で ID を生成・管理する手間が省けます。


5-3. InputPath + OutputPath のみで軽量フィルタリング

Lambda を呼び出さずに Pass State で InputPath / OutputPath を組み合わせることで、ペイロードから必要な部分だけを抽出できます。

"FilterState": {
  "Type": "Pass",
  "InputPath": "$.order",
  "Parameters": {
 "essentials": {
"id.$": "$.id",
"amount.$": "$.amount"
 }
  },
  "OutputPath": "$.essentials",
  "Next": "NextState"
}


ユースケース: 大きなペイロードから必要な部分だけを次のステートに渡す。
Lambda の 6 MB ペイロード制限対策として有効で、不要なフィールドを早期に除去することで後続ステートの処理負荷も軽減できます。


5-4. ResultPath null でサイドエフェクト専用ステート

"ResultPath": null = Task の結果を破棄し、入力データをそのまま次のステートに渡すパターンです。
通知・ログ記録・監査など、元データを変更せずに副作用だけを実行したい場合に使用します。

ASL 例(監査ログ記録 + データパッシング):

"AuditLog": {
  "Type": "Task",
  "Resource": "<LAMBDA_ARN_AUDIT>",
  "Parameters": {
 "event_type": "order_processed",
 "order_id.$": "$.order_id",
 "execution_id.$": "$$.Execution.Id"
  },
  "ResultPath": null,
  "Next": "NotifyCustomer"
}


AuditLog Lambda が何を返しても、State のデータは変更されません。NotifyCustomer ステートには AuditLog 実行前と同じ入力がそのまま渡されます。

データパイプラインの途中でログ記録・通知を挟む定番パターンであり、処理の主系統に副作用を追加する際の標準的な実装方法です。


6. ハンズオン後の削除手順

6-1. コスト注意事項

⚠️ 本ハンズオンで作成したリソースは無料枠内で動作しますが、長期間放置すると課金が発生します。

リソース月額目安備考
Lambda無料枠 100 万リクエスト以内なら無料本ハンズオン程度では課金なし
Step Functions4,000 回の状態遷移まで無料本ハンズオン程度では課金なし
CloudWatch Logs~$0.50/GB(保存)+ ~$0.01/GB(取得)ロググループを削除することで停止

ハンズオン完了後は、不要なリソースを削除しておくことを推奨します。


6-2. Terraform で構築した場合

以下のコマンド 1 つで、Terraform で作成したリソースをまとめて削除できます。

cd terraform/
terraform destroy


実行すると削除対象リソースの一覧が表示され、確認後に削除が実行されます。

terraform destroy で削除されないリソースへの注意:

retain_on_delete を設定した CloudWatch Logs ロググループは Terraform 管理外となるため、手動削除が必要です。

aws logs delete-log-group --log-group-name /aws/lambda/get-customer-info
aws logs delete-log-group --log-group-name /aws/lambda/calculate-price


6-3. コンソールで構築した場合の削除チェックリスト

コンソールから手動で構築した場合は、以下の順番で削除してください。

  • [ ] Step Functions ステートマシンorder-enrichment-pipeline)を削除
  • マネジメントコンソール → Step Functions → ステートマシン → 選択して削除
  • [ ] Lambda 関数get-customer-info / calculate-price)を削除
  • マネジメントコンソール → Lambda → 関数 → 各関数を選択して削除
  • [ ] IAM ロールsf-dataflow-execution-role)と付随するポリシーを削除
  • マネジメントコンソール → IAM → ロール → 検索して削除
  • [ ] CloudWatch Logs ロググループ(Lambda 名のもの)を削除
  • マネジメントコンソール → CloudWatch → ロググループ → 各グループを選択して削除

7. まとめと次のステップ

7-1. この記事で学んだこと

本記事では、Step Functions のデータフロー制御を支える 5 つのフィルタとその周辺技術を学びました。

  • 5 つのフィルタの役割と評価順序: InputPathParametersResultSelectorResultPathOutputPath の順で評価され、それぞれが入出力データの異なる側面を制御する
  • Parameters".$" サフィックスの重要性: ".$" がないと JSONPath ではなく文字列リテラルとして扱われ、意図しない動作の原因になる
  • ResultPath "$.key" でデータを保持しながらステートを積み重ねるパターン: 各ステートの結果を別キーに保存することで、全ステップの出力を最終ステートまで引き継げる
  • ResultPath null でサイドエフェクト専用ステートを実現するパターン: 通知・監査ログなどの副作用を、メインのデータフローに影響を与えずに挿入できる
  • Context Object ($$) を使った実行コンテキスト情報の活用: 実行 ID・開始時刻・ステートマシン名などを Lambda に自動付与でき、トレーサビリティが向上する
  • Intrinsic Functions(States.Format / States.UUID 等)で Lambda 不要のデータ変換: 文字列フォーマット・UUID 生成・配列操作などを ASL 内で完結させ、Lambda コスト・レイテンシを削減できる
  • enrichment パイプラインでの全フィルタ総合活用: 顧客情報取得 → 価格計算 → レポート生成という実践的なパイプラインで、5 つのフィルタの連携パターンを体験した

7-2. 次のステップ

Step Functions の世界はさらに広がります。今後のシリーズで以下のトピックを解説予定です。

  • Callback パターン(.waitForTaskToken — 人間の承認フローや非同期ジョブ待機など、外部イベントを待機する高度なパターン
  • Distributed Map — 大規模な並列バッチ処理を効率的に実現するためのステートタイプ
  • Express vs Standard Workflow の使い分け — コスト・実行時間・実行保証レベルの違いを理解し、用途に応じて選択する方法
  • SDK Integration(Lambda 不要で直接 AWS サービスを呼び出す) — Lambda を介さずに DynamoDB・SQS・SNS などを直接操作し、シンプルかつ低コストなワークフローを構築する方法

7-3. シリーズリンク

本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第 4 回です。

シリーズ一覧:
– 第 1 回: AWS Step Functions 入門 — コンソールと Terraform で学ぶハンズオン
– 第 2 回: ECS × Step Functions 入門 — CSV バッチを Fargate タスクでジョブ化するハンズオン
– 第 3 回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶ Retry/Catch/Timeout
– 第 4 回: 本記事(Step Functions 入出力データフロー制御完全ガイド)


7-4. 参考リンク

  • Amazon States Language — 入出力処理
  • Step Functions Intrinsic Functions リファレンス
  • Context Object リファレンス
  • Step Functions クォータ(ペイロードサイズ制限)
最新情報をチェックしよう!