- 1 Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
- 1.1 目次
- 1.2 1. Step Functions 入出力データフロー制御(概念編)
- 1.3 2. アーキテクチャ解説
- 1.4 3. AWSコンソールでのハンズオン
- 1.5 3-1. 前提条件
- 1.6 3-2. Lambda 2関数のデプロイ
- 1.7 3-3. IAMロール作成(Step Functions実行ロール)
- 1.8 3-4. ステートマシン作成(enrichment パイプライン)
- 1.9 3-5. 実行して各フィルタの効果を確認
- 1.10 3-6. よくある間違いのデバッグ体験
- 1.11 まとめ(コンソール編)
- 1.12 4. Terraformでの構築
- 1.13 4-1. 前提条件
- 1.14 4-2. ディレクトリ構成
- 1.15 4-3. Lambda ソースコード
- 1.16 4-4. statemachine/definition.json.tpl
- 1.17 4-5. variables.tf
- 1.18 4-6. main.tf(完全なコード)
- 1.19 4-7. outputs.tf
- 1.20 4-8. デプロイ手順
- 1.21 4-9. 動作確認(CLIで実行)
- 1.22 クリーンアップ(Terraform版)
- 1.23 まとめ(Terraform編)
- 1.24 5. 応用パターン集(Intrinsic Functions活用)
- 1.25 6. ハンズオン後の削除手順
- 1.26 7. まとめと次のステップ
Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
公開日: 2026-04-12
難易度: 中級
所要時間: 約90分
シリーズ: [Step Functions シリーズ 第4回]
この記事で学ぶこと
– 5つのデータフローフィルタ(InputPath/Parameters/ResultSelector/ResultPath/OutputPath)の役割と評価順序
– 注文enrichmentパイプライン(顧客情報取得→価格計算→出力整形)で全フィルタを実践
– ResultPath “$.key” でステートをまたいでデータを積み重ねるパターン
– Context Object ($$) と Intrinsic Functions でLambda不要のデータ変換
– Terraform版との完全一致ASL定義シリーズ前回記事:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド
目次
- Step Functions 入出力データフロー制御(概念編)
- アーキテクチャ解説
- AWSコンソールでのハンズオン
- Terraformでの構築
- 応用パターン集(Intrinsic Functions活用)
- ハンズオン後の削除手順
- まとめと次のステップ
1. Step Functions 入出力データフロー制御(概念編)
1-1. なぜデータフロー制御が必要か
分散ステートマシンでのデータ肥大化問題
Step Functionsのステートマシンは、前のステートの出力が次のステートの入力として引き渡される仕組みになっている。シンプルなワークフローではこれで十分だが、複数のステップが連鎖するワークフローでは問題が生じる。
例えば、注文処理ワークフローを考えてみよう。注文情報として orderId, customerId, items, shippingAddress, paymentInfo, metadata などが初期入力として渡されるとする。ReserveInventory ステートが完了すると、その結果(在庫確保レポート)が加わる。ProcessPayment が完了すると、また決済結果が追加される。ステートが進むにつれ、後続の Lambda 関数には関係のないデータまで蓄積され続ける。
これには 3 つの実害がある:
- Lambda ペイロード上限への接近: Lambda 関数の入出力には 6MB の制限がある。不要なデータを引き回すと、この上限に意図せず近づく
- ビジネスロジックの混乱: Lambda 関数の実装者が「どのフィールドを使えばいいか」を迷う。不要フィールドが混在していると、誤ったフィールドを参照するバグにつながる
- 可読性の低下: Step Functions コンソールで実行履歴を確認する際に、不要データが多いとデバッグが困難になる
Step Functions が提供する 5 つのフィルタによる解決策
Step Functions は、各ステートでデータを精密にコントロールするための 5 つのフィルタ をASL(Amazon States Language)に組み込んでいる:
| フィルタ | 役割 |
|---|---|
InputPath | ステート入力全体から一部を抜き出す |
Parameters | Task への入力を柔軟に組み立てる |
ResultSelector | Task 結果から必要部分を選択・整形する |
ResultPath | Task 結果をステート入力のどこに配置するか指定する |
OutputPath | 次のステートへ渡す最終出力を絞り込む |
これら 5 つを使いこなすことで、「Lambda には必要なデータだけを渡し、次のステートには必要な結果だけを送る」という精密なデータフロー制御が実現できる。
1-2. 評価順序の図解(最重要)
5 つのフィルタは 必ず決まった順序で 評価される。この順序を理解しないと、意図しない動作が起きても原因が追えない。
State 入力(前 State の出力 or 実行開始時の input)
↓ ① InputPath — 入力全体から一部を抜き出す(JSONPath)
↓ ② Parameters— Task への入力を自由に組み立てる(固定値 + JSONPath 参照混在可)
↓ [Task / Lambda 実行]
↓ ③ ResultSelector — Task 結果から必要部分を選択
↓ ④ ResultPath— Task 結果を元の入力のどこに配置するか
↓ ⑤ OutputPath— 最終出力の絞り込み
次の State の入力
重要な概念的区別:2 つのレイヤー
この 5 つのフィルタには 2 つの異なるレイヤー が存在する:
| レイヤー | フィルタ | 作用対象 |
|---|---|---|
| State レイヤー | InputPath, OutputPath | State の入出力(ステートマシン全体のデータフロー) |
| Task レイヤー | Parameters, ResultSelector | Task の入出力(Lambda などのリソースに渡す/受け取るデータ) |
| 結合レイヤー | ResultPath | Task 結果を State の入力にマージする(2 つのレイヤーをつなぐ) |
この区別がわかっていないと、「InputPath と Parameters のどちらを使えばいいか」「ResultSelector と OutputPath の違いは何か」という混乱に陥る。
1-3. 各フィルタの詳細解説
InputPath
役割: ステート入力全体から一部を抜き出して Task 入力の候補にする。
| 値 | 動作 |
|---|---|
"$" | 入力全体を渡す(デフォルト) |
"$.order" | order フィールドのみを抜き出す(JSONPath) |
null | 空の JSON {} を渡す(Task が入力を必要としない場合) |
構文: JSONPath 文字列($ で始まる)
"InputPath": "$.order"
使いどころ: ステート入力のうち、Task に関連するサブオブジェクト全体をまとめて渡したいとき。複雑な絞り込みや値の組み立てが必要なら Parameters を使う。
Parameters
役割: Task への入力を柔軟に組み立てる。固定値と入力からの参照を自由に組み合わせられる。最もよく使うフィルタ。
| 記法 | 動作 |
|---|---|
"key": "value" | 固定値(文字列をそのまま渡す) |
"key.$": "$.path" | JSONPath 参照(.$ サフィックスが必須) |
"key.$": "$$.Execution.Id" | Context Object 参照($$ で実行コンテキストを参照) |
"Parameters": {
"order_id.$": "$.order_id",
"customer_id.$": "$.customer_id",
"action": "lookup",
"execution_id.$": "$$.Execution.Id"
}
よくある間違い:
.$サフィックスを忘れると、"$.order_id"という文字列リテラルがそのまま Lambda に渡される。Lambda のログを見ると"order_id": "$.order_id"となっており、デバッグに気づきにくいバグになる。
InputPath との使い分け:
– InputPath: 入力の一部を そのまま Task に渡したい → シンプル
– Parameters: 複数フィールドを 選んで組み立て直したい、固定値を追加したい → 柔軟
ResultSelector
役割: Task の実行結果から必要なフィールドのみを選択して整形する。Parameters と 同じ記法(.$ サフィックス)を使う。
"ResultSelector": {
"customer_name.$": "$.name",
"tier.$": "$.subscription_tier",
"source": "dynamodb-lookup"
}
使いどころ: Lambda の実行結果には、ステータスコードやメタデータなど不要な情報が含まれることがある。ResultSelector で必要なフィールドだけを選んでから ResultPath や OutputPath に渡す。
OutputPathとの違い:ResultSelectorは Task 結果(Lambda の return 値) に作用する。OutputPathはResultPath適用後の State 全体の出力 に作用する。処理のタイミングが異なる。
ResultPath
役割: Task 結果(ResultSelector 適用後)を State 入力のどこに配置するかを指定する。元の入力と Task 結果を マージ する操作。
| 値 | 動作 |
|---|---|
"$" | Task 結果で State 入力全体を置換(デフォルト)。元の入力は失われる |
"$.result" | 元の入力を保持しつつ、$.result に Task 結果を追加(最もよく使う) |
null | Task 結果を破棄し、元の入力をそのまま次に渡す(副作用のみの Task に使用) |
元の入力: { "orderId": "ORD-001", "amount": 5000 }
Task 結果: { "customer_name": "田中太郎", "tier": "premium" }
ResultPath: "$"→ { "customer_name": "田中太郎", "tier": "premium" } ← 元入力消滅
ResultPath: "$.customer" → { "orderId": "ORD-001", "amount": 5000, "customer": { "customer_name": "田中太郎", "tier": "premium" } }
ResultPath: null → { "orderId": "ORD-001", "amount": 5000 } ← Task 結果破棄
OutputPath との混乱ポイント:
– ResultPath = Task 結果をどこに 「入れるか」(マージ操作)
– OutputPath = 最終出力から何を 「取り出すか」(フィルタ操作)
OutputPath
役割: ResultPath 適用後の State 全体のデータから、次のステートへ渡す部分を絞り込む。
| 値 | 動作 |
|---|---|
"$" | 全体を渡す(デフォルト) |
"$.customer" | customer フィールドのみを次のステートへ渡す |
null | 空の JSON {} を次のステートへ渡す |
"OutputPath": "$.customer"
使いどころ: ワークフローの途中で、特定のステート以降に不要なデータを切り捨てたいとき。InputPath がステートの 入口 でフィルタするのに対し、OutputPath は 出口 でフィルタする。
1-4. よくある混乱ポイントまとめ
| 混乱ポイント | 正しい理解 |
|---|---|
ResultPath: "$" vs ResultPath: null | "$" = Task 結果で全体を置換(元入力が消える)/ null = Task 結果を破棄(元入力を維持) |
Parameters の ".$" 忘れ | .$ なしだと JSONPath が文字列リテラルとして渡される → Lambda のログで発覚するが気づきにくい |
InputPath vs Parameters の使い分け | InputPath = 単純な切り出し(サブオブジェクト丸ごと) / Parameters = 柔軟な組み立て(複数フィールド選択・固定値追加) |
ResultSelector vs OutputPath | ResultSelector = Task 結果(Lambda の戻り値)に作用 / OutputPath = ResultPath 適用後の State 全体の出力に作用 |
ResultPath vs OutputPath | ResultPath = 結果のマージ先(「どこに入れるか」) / OutputPath = 最終出力の切り出し(「何を取り出すか」) |
| デフォルト挙動 | InputPath, OutputPath のデフォルトは "$"(全体)。ResultPath のデフォルトも "$"(全置換)。すべて省略 = データがそのまま流れると勘違いしやすいが、ResultPath: "$" は元入力を破棄する点に注意 |
1-5. Intrinsic Functions との連携
Step Functions には、Parameters や ResultSelector の中で使える Intrinsic Functions(組み込み関数) が用意されている。単純な値の抜き出し・組み立て以上の処理を Lambda を介さずに直接 ASL 内で行える。
主な Intrinsic Functions
| 関数 | 説明 | 例 |
|---|---|---|
States.Format | 文字列を動的に組み立てる | "arn:aws:sns:ap-northeast-1:{}:{}" |
States.JsonMerge | 2 つの JSON オブジェクトをマージする | 元の入力と Task 結果の浅いマージ |
States.StringToJson | 文字列を JSON に変換する | Lambda が文字列で返した JSON のパース |
States.JsonToString | JSON を文字列に変換する | JSONPath 参照値の文字列化 |
States.Array | 配列を生成する | 複数の値を配列にまとめる |
States.ArrayLength | 配列の長さを取得する | — |
States.Base64Encode / States.Base64Decode | Base64 変換 | — |
States.Hash | ハッシュ値を生成する | MD5, SHA-256 など |
States.MathAdd / States.MathRandom | 数値演算 | カウンターのインクリメントなど |
States.Format の使用例
"Parameters": {
"message.$": "States.Format('注文 {} の処理を開始します。顧客: {}', $.orderId, $.customerId)"
}
States.JsonMerge の使用例
Parameters で元の入力と Task 結果を浅くマージしたいとき(ResultPath では実現できない複雑なマージが必要な場合)に使用する。
"ResultSelector": {
"merged.$": "States.JsonMerge($$.Execution.Input, $, false)"
}
注意:
States.JsonMergeの第 3 引数はマージモードで、現時点ではfalse(浅いマージ)のみサポートされている。深いマージは未対応。
1-6. ベストプラクティス
不要なデータは早めに除去する
InputPath と OutputPath を活用して、Lambda に渡す前・次のステートに渡す前に不要データを除去する。Lambda の入出力ペイロード制限(6MB)への余裕が生まれるだけでなく、実行履歴の可読性も向上する。
ResultPath: "$.result" を常套手段として使う
元のステート入力を保持しながら Task の結果を追加するのに最適なパターン。後続のステートで元の入力(例: orderId)と Task 結果(例: customer 情報)の両方を参照できるようになる。
"ResultPath": "$.lookupResult"
Parameters の .$ サフィックスはコードレビューで必ず確認する
"key": "$.path" と "key.$": "$.path" は外見が似ているが動作が全く異なる。前者は文字列リテラル "$.path" を渡し、後者は JSONPath 参照で実際の値を渡す。CI/CD パイプラインに ASL の静的解析(例: cfn-lint、aws-sam-cli validate)を組み込む。
Context Object($$)を活用する
$$.Execution.Id(実行 ARN)や $$.Task.Token(タスクトークン)は、べき等処理やコールバックパターンで頻繁に使う。これらは Parameters の .$ サフィックスと組み合わせて参照できる。
"Parameters": {
"idempotency_key.$": "$$.Execution.Id",
"task_token.$": "$$.Task.Token"
}
データフロー全体を図示してから実装する
5 つのフィルタが組み合わさると、どの時点でどのデータが存在するかを頭の中で追うのが難しくなる。ステートごとに「入力 → InputPath → Parameters → Task → ResultSelector → ResultPath → OutputPath → 出力」の流れを図にしてから ASL を書くと、バグを未然に防げる。
概念を理解したところで、次のセクションでは実際にハンズオンで使う注文エンリッチメントパイプラインのアーキテクチャを解説する。
2. アーキテクチャ解説
2-1. ハンズオンシナリオ概要
本記事のハンズオンでは 注文エンリッチメントパイプライン を題材にします。ECサイトで注文を受け付けた際、注文データに顧客情報・価格情報を付加して最終的な処理済み注文を生成するシナリオです。
注文データ (order_id, customer_id, items)
↓ GetCustomerInfo ← 顧客情報をDynamoDB相当から取得
↓ CalculatePrice ← 顧客ティアに応じた割引価格を計算
↓ FormatOutput ← 最終出力を整形
処理済み注文 (enriched order data)
パイプラインを構成する3ステート
| ステート | 種別 | 役割 | 使用するデータフローフィルタ |
|---|---|---|---|
| GetCustomerInfo | Lambda State | customer_id でDynamoDBから顧客情報を取得し、注文データに $.customer として追加 | Parameters, ResultSelector, ResultPath |
| CalculatePrice | Lambda State | 顧客ティア(premium/standard)に応じた割引を適用して最終価格を計算し、$.pricing として追加 | Parameters, ResultSelector, ResultPath |
| FormatOutput | Pass State | 前段までのデータを最終的な出力形式に整形して返す | Parameters, OutputPath |
この3ステートのパイプラインを通じて、5つのデータフローフィルタ(InputPath / Parameters / ResultSelector / ResultPath / OutputPath)が具体的にどこで・どのように機能するかを体験的に学ぶことができます。
2-2. 全体アーキテクチャ

上図は注文エンリッチメントパイプラインの全体構成を示しています。
- 左: 実行トリガー(ユーザーからのAPI呼び出しや EventBridge によるスケジュール実行)
- 中央: Step Functions ステートマシン(
sf-order-enrichment-pipeline) - GetCustomerInfo Lambda → CalculatePrice Lambda → FormatOutput Pass State の3ステートで構成
- 各ステートの内部にどのフィルタが設定されているかを示している
- 右: 最終出力(エンリッチ済み注文データ)
- 下部: IAMロール(Lambda実行権限・DynamoDB読み取り権限・CloudWatch Logs書き込み権限)
パイプラインを流れるデータの変化(俯瞰)
パイプライン開始時の入力データはシンプルな注文情報です。
// ExecutionInput (StartExecution で渡すデータ)
{
"order_id": "ORD-001",
"customer_id": "CUST-123",
"items": [
{ "sku": "P-001", "qty": 2, "price": 5000 }
]
}
このデータが3ステートを通過することで、以下のようにエンリッチされます。
CalculatePrice通過後の中間データ(FormatOutput適用前):
{
"order_id": "ORD-001",
"customer_id": "CUST-123",
"items": [{"sku": "P-001", "qty": 2, "price": 5000}],
"customer": {
"customer_name": "田中太郎",
"customer_tier": "premium"
},
"pricing": {
"subtotal": 10000,
"discount": 2000,
"total": 8000
}
}
FormatOutput(Pass State)で整形した最終出力:
{
"order_id": "ORD-001",
"customer_name": "田中太郎",
"total_amount": 8000,
"discount_applied": 2000,
"status": "enriched"
}
💡 FormatOutputにより、不要なフィールド(customer_id / items / customer{} / pricing{})が除去され、
整形された5フィールドのシンプルなオブジェクトになります。
ポイント: Step Functions のデータフロー制御を活用することで、各Lambdaは自分の責務に必要なフィールドだけを受け取り、計算結果を元データに上書きせずに追加できます。これが「ステートフルな変換」の本質です。
2-3. データフロー制御フィルタの評価順序

Step Functions では、1つのステートが実行されるたびに最大5つのフィルタが決まった順序で適用されます。
[State入力]
→ ① InputPath : State入力から「Taskに渡す範囲」を選択
→ ② Parameters: Task入力を「必要なフィールドのみ」に組み立て
→ [Task実行]: Lambda / Activity / 組み込み関数など
→ ③ ResultSelector: Task出力から「使うフィールドのみ」を選択
→ ④ ResultPath: State入力に「Taskの結果をマージ」
→ ⑤ OutputPath: マージ後の全体から「次のStateに渡す範囲」を選択
[次のState入力]
各フィルタの作用レイヤー
| フィルタ | 作用レイヤー | 色 | 役割 |
|---|---|---|---|
| ① InputPath | State I/O 操作 | 青 | State入力の「どこを使うか」を JSONPath で選択 |
| ② Parameters | Task I/O 操作 | 緑 | Task に渡す入力を「新しいオブジェクト」として構築 |
| ③ ResultSelector | Task I/O 操作 | 緑 | Taskの生レスポンスから「使うフィールドのみ」を選択 |
| ④ ResultPath | マージ操作 | 橙 | State入力のどの位置に「Taskの結果を書き込むか」を指定 |
| ⑤ OutputPath | State I/O 操作 | 青 | マージ後のState入力から「次のStateに渡す範囲」を選択 |
「.$」サフィックスの重要性
Parameters と ResultSelector で最も重要なルールが 「.$」サフィックスです。
// ❌ リテラル値として扱われる (JSONPath参照にならない)
"Parameters": {
"customer_id": "$.customer_id"// => 文字列 "$.customer_id" がそのまま渡る
}
// ✅ JSONPath参照として評価される
"Parameters": {
"customer_id.$": "$.customer_id" // => State入力の $.customer_id の値が渡る
}
"キー名.$" という形式にすることで、右辺の文字列がJSONPathとして評価され、実行時の入力データから動的に値を取得できます。この .$ をつけ忘れると、値がそのまま文字列として渡されてしまうため、バグの温床になります。
2-4. ステートごとのデータ変換例

上図は GetCustomerInfo ステートにおけるデータ変換の全過程を示しています。1ステートの実行中に何が起きているかを追跡することで、フィルタの動作を直感的に理解できます。
Step 0 → 1: InputPath の適用
// State入力 (Step 0)
{
"order_id": "ORD-001",
"customer_id": "CUST-123",
"items": [{"sku": "P-001", "qty": 2, "price": 5000}]
}
// InputPath: "$" を設定した場合 (Step 1) → 変化なし
// "$" はJSONドキュメント全体を指すため、State入力がそのままTask処理に渡る
InputPath: "$" はデフォルト値です。特定のサブツリーだけをTaskに渡したい場合は "$.order" のように指定します。
Step 1 → 2: Parameters の適用(必要項目のみ抽出)
// Parameters 設定
{
"customer_id.$": "$.customer_id"
}
// => Task (Lambda) が受け取るイベント (Step 2)
{
"customer_id": "CUST-123"// customer_id のみ抽出して渡す
}
GetCustomerInfo Lambda には顧客IDさえあれば十分です。items や order_id などの不要なフィールドを除外することで、Lambdaの責務を明確にし、最小権限の原則に近い設計が実現できます。
Step 2 → 3: Lambda実行(生レスポンス)
// Lambda の return 値 (Step 3)
{
"name": "Tanaka Taro",
"tier": "premium",
"email": "tanaka@example.com"
}
Lambdaが返したレスポンスには email フィールドが含まれています。このフィールドは後続ステートでは不要ですが、この時点ではまだ生の状態です。
Step 3 → 4: ResultSelector の適用(必要フィールドのみ選択)
// ResultSelector 設定
{
"customer_name.$": "$.name",
"customer_tier.$": "$.tier"
}
// => ResultSelector 適用後 (Step 4)
{
"customer_name": "Tanaka Taro",
"customer_tier": "premium"
// email は除外、フィールド名もリネームされた
}
ResultSelector は2つの役割を同時に果たします。
- フィールドの選択:
emailを除外し、後続ステートに渡さない - フィールドのリネーム:
name→customer_name、tier→customer_tierにリネーム
Parameters との違いは「入力に作用するか、出力に作用するか」です。Parameters はTask入力を構築し、ResultSelector はTask出力を整形します。
Step 4 → 5: ResultPath の適用(元データにマージ)
// ResultPath 設定
"ResultPath": "$.customer"
// => ResultPath 適用後 (Step 5) — 次のStateへ渡されるデータ
{
"order_id": "ORD-001", // ← 元のState入力が保持されている
"customer_id": "CUST-123", // ← 元のState入力が保持されている
"items": [...],// ← 元のState入力が保持されている
"customer": { // ← ResultSelector の結果が追加された
"customer_name": "Tanaka Taro",
"customer_tier": "premium"
}
}
ResultPath: "$.customer" は「ResultSelector で整形した結果を、State入力の $.customer というキーに書き込む」という意味です。元のState入力(order_id, customer_id, items)はそのまま保持され、$.customer が追加される形でマージされます。
ResultPath: nullの注意点:ResultPathをnullに設定すると、Taskの結果が完全に破棄され、元のState入力がそのまま次のStateに渡ります。Task結果を使わずにフロー制御だけ行いたい場合に有用ですが、誤って設定すると計算結果が消えてしまうので注意が必要です。
次のステートの CalculatePrice は、この $.customer.customer_tier を参照して割引率を決定します。ResultPath によって前のステートが「何を計算したか」が後続ステートから参照可能な状態で引き継がれるのが、Step Functions のデータ蓄積型パイプラインのポイントです。
アーキテクチャの全体像を把握したところで、実際にAWSコンソールを使ってこのパイプラインを構築してみよう。
3. AWSコンソールでのハンズオン
このセクションでは、AWSコンソールを使って 注文 enrichment パイプライン を実際に構築します。GetCustomerInfo → CalculatePrice → FormatOutput という3ステートのワークフローを通じて、Step Functions の全5フィルタ(InputPath / Parameters / ResultSelector / ResultPath / OutputPath)がどのようにデータを変換するかを体感します。
3-1. 前提条件
- AWSアカウントおよび以下のサービスを操作できるIAM権限
- AWS Lambda(関数の作成・実行・テスト)
- AWS Step Functions(ステートマシンの作成・実行)
- AWS IAM(ロール・ポリシーの作成)
- Amazon CloudWatch Logs(ログの参照)
- Pythonランタイム (3.12) はAWSが管理するため、ローカル環境へのPythonインストールは不要
- Lambda の「インラインエディタ」機能を使用するため、コードのZIPアップロード等も不要
3-2. Lambda 2関数のデプロイ
enrichment パイプラインでは2つのLambda関数を使います。まず両関数をコンソールから作成します。
3-2-1. get-customer-info 関数の作成
関数の概要: 顧客IDを受け取り、顧客名・ティア・メールアドレス等の全情報を返します。Step Functions 側の ResultSelector で必要なフィールドだけを選択します。
作成手順:
- AWSコンソール → Lambda を開く
- 「関数の作成」をクリック
- 「一から作成」を選択し、以下を入力:
- 関数名:
get-customer-info - ランタイム:
Python 3.12 - アーキテクチャ:
x86_64(デフォルト) - 「実行ロール」は「基本的なLambdaアクセス許可で新しいロールを作成」を選択(デフォルト)
- 「関数の作成」をクリック
- コードエディタ(インラインエディタ)が開いたら、
lambda_function.pyの内容を以下のコードにすべて置き換えてください:
import json
# 顧客マスタ(ハンズオン用モックデータ)
CUSTOMERS = {
"CUST-123": {"name": "田中太郎", "tier": "premium", "email": "tanaka@example.com"},
"CUST-456": {"name": "鈴木花子", "tier": "standard", "email": "suzuki@example.com"},
"CUST-789": {"name": "佐藤次郎", "tier": "basic", "email": "sato@example.com"},
}
def lambda_handler(event, context):
customer_id = event.get("customer_id")
if not customer_id:
raise ValueError("customer_id は必須です")
customer = CUSTOMERS.get(customer_id)
if not customer:
raise ValueError(f"顧客が見つかりません: {customer_id}")
# 全フィールドを返す(ResultSelectorで必要なものだけ選択させる)
return {
"customer_id": customer_id,
"name": customer["name"],
"tier": customer["tier"],
"email": customer["email"],
"last_login": "2026-04-10T10:00:00Z",
"preferences": {"language": "ja", "currency": "JPY"}
}
- 「Deploy」ボタンをクリックしてデプロイ(「変更がデプロイされました」と表示されれば成功)
動作確認(テスト実行):
- 「テスト」タブ → 「新しいイベントを作成」
- イベント名:
test-cust123 - イベントJSONに以下を入力:
{"customer_id": "CUST-123"}
- 「テスト」をクリック
- 「実行結果: 成功」となり、以下のような出力が返ることを確認:
{
"customer_id": "CUST-123",
"name": "田中太郎",
"tier": "premium",
"email": "tanaka@example.com",
"last_login": "2026-04-10T10:00:00Z",
"preferences": {
"language": "ja",
"currency": "JPY"
}
}
💡 ポイント: この関数は
last_login・preferencesなど不要な情報も含めて返しています。Step Functions の ResultSelector を使って、後工程で必要なnameとtierだけを選択するのがこのハンズオンの肝です。

3-2-2. calculate-price 関数の作成
関数の概要: 注文IDと商品リスト、顧客ティアを受け取り、小計・割引・合計を計算して返します。
作成手順:
- Lambda コンソール → 「関数の作成」
- 以下を入力:
- 関数名:
calculate-price - ランタイム:
Python 3.12 - 実行ロール: 基本的なLambdaアクセス許可(デフォルト)
- 「関数の作成」をクリック
- インラインエディタで
lambda_function.pyを以下のコードに置き換え:
import json
# ティア別割引率
DISCOUNT_RATES = {
"premium": 0.20, # 20% off
"standard": 0.10, # 10% off
"basic": 0.00, # 割引なし
}
def lambda_handler(event, context):
order_id = event.get("order_id")
items = event.get("items", [])
customer_tier = event.get("customer_tier", "basic")
# 小計計算
subtotal = sum(item.get("price", 0) * item.get("qty", 1) for item in items)
# 割引計算
discount_rate = DISCOUNT_RATES.get(customer_tier, 0.0)
discount = round(subtotal * discount_rate)
total = subtotal - discount
# 詳細な結果を返す(ResultSelectorで必要なものだけ選択させる)
return {
"order_id": order_id,
"subtotal": subtotal,
"discount": discount,
"discount_rate": discount_rate,
"total": total,
"currency": "JPY",
"calculated_at": "2026-04-12T10:00:00Z"
}
- 「Deploy」をクリック
動作確認(テスト実行):
- 「テスト」タブ → 新しいイベントを作成
- イベントJSONに以下を入力:
{
"order_id": "ORD-001",
"items": [{"id": "A", "price": 3000, "qty": 2}],
"customer_tier": "premium"
}
- 「テスト」をクリック
- 以下のような出力を確認(subtotal: 6000、discount: 1200、total: 4800):
{
"order_id": "ORD-001",
"subtotal": 6000,
"discount": 1200,
"discount_rate": 0.2,
"total": 4800,
"currency": "JPY",
"calculated_at": "2026-04-12T10:00:00Z"
}
💡 ポイント: この関数も
discount_rate・currency・calculated_atなど、最終出力には不要な情報を返しています。ResultSelector でsubtotal・discount・totalの3フィールドだけに絞り込みます。
3-3. IAMロール作成(Step Functions実行ロール)
Step Functions が Lambda を呼び出すために専用の実行ロールを作成します。
手順:
- AWSコンソール → IAM → 「ロール」→「ロールを作成」
- 信頼されたエンティティタイプ: 「AWSのサービス」
- サービス: 「Step Functions」を選択 → 「次へ」
- 許可ポリシー: この画面では何も選択せずに「次へ」(後でインラインポリシーを追加)
- ロール名:
sf-dataflow-execution-role - 「ロールを作成」をクリック
インラインポリシーの追加:
- 作成したロール
sf-dataflow-execution-roleを開く - 「許可」タブ → 「許可を追加」→「インラインポリシーを作成」
- 「JSON」タブに切り替え、以下をすべて貼り付け:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "InvokeLambda",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": [
"arn:aws:lambda:*:*:function:get-customer-info",
"arn:aws:lambda:*:*:function:calculate-price"
]
},
{
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogDelivery",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries"
],
"Resource": "*"
},
{
"Sid": "XRay",
"Effect": "Allow",
"Action": [
"xray:PutTraceSegments",
"xray:GetSamplingRules",
"xray:GetSamplingTargets"
],
"Resource": "*"
}
]
}
- ポリシー名:
sf-dataflow-policy→ 「ポリシーの作成」
3-4. ステートマシン作成(enrichment パイプライン)
3-4-1. Lambda ARNの確認
ASLに埋め込む前に、2つのLambda関数のARNを確認します。
- Lambda コンソール →
get-customer-info関数を開く - 右上の「関数ARN」をコピー(例:
arn:aws:lambda:ap-northeast-1:123456789012:function:get-customer-info) - 同様に
calculate-price関数のARNもコピー
3-4-2. ステートマシンの作成
- AWSコンソール → Step Functions → 「ステートマシンの作成」
- 「コードでワークフローを記述」を選択
- 以下のASLを貼り付けます。
<LAMBDA_ARN_GET_CUSTOMER>と<LAMBDA_ARN_CALCULATE_PRICE>を手順3-4-1でコピーした実際のARNに置き換えてください:
{
"Comment": "注文データ enrichment パイプライン(データフロー制御総合演習)",
"StartAt": "GetCustomerInfo",
"States": {
"GetCustomerInfo": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_GET_CUSTOMER>",
"Parameters": {
"customer_id.$": "$.customer_id"
},
"ResultSelector": {
"customer_name.$": "$.name",
"customer_tier.$": "$.tier"
},
"ResultPath": "$.customer",
"Next": "CalculatePrice"
},
"CalculatePrice": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_CALCULATE_PRICE>",
"Parameters": {
"order_id.$": "$.order_id",
"items.$": "$.items",
"customer_tier.$": "$.customer.customer_tier"
},
"ResultSelector": {
"subtotal.$": "$.subtotal",
"discount.$": "$.discount",
"total.$": "$.total"
},
"ResultPath": "$.pricing",
"Next": "FormatOutput"
},
"FormatOutput": {
"Type": "Pass",
"Parameters": {
"order_id.$": "$.order_id",
"customer_name.$": "$.customer.customer_name",
"total_amount.$": "$.pricing.total",
"discount_applied.$": "$.pricing.discount",
"status": "enriched"
},
"End": true
}
}
}
- ステートマシン名:
order-enrichment-pipeline - 「実行ロールを選択」→ 「既存のロールを選択」→
sf-dataflow-execution-role - 「ログ設定」を展開:
- CloudWatch Logs: 有効化
- ログレベル:
ALL(データフロー確認のために全ログを出力) - CloudWatch Logs のロググループ: 自動作成を選択(または任意のロググループ名を入力)
💡 ログレベルを ALL にする理由:
ERRORだと正常実行時のデータを見られません。学習中はALLに設定することで、各フィルタ適用後のデータを CloudWatch Logs で詳細に確認できます。
- 「ステートマシンの作成」をクリック

ASLの各ステートで何をしているか(解説):
| ステート | フィルタ | 役割 |
|---|---|---|
| GetCustomerInfo | Parameters | customer_id だけをLambdaに渡す(".$" サフィックスで参照渡し) |
| GetCustomerInfo | ResultSelector | Lambdaの生の出力から name→customer_name、tier→customer_tier のみ抽出・リネーム |
| GetCustomerInfo | ResultPath | 抽出結果を $.customer に格納(元の order_id, items は保持) |
| CalculatePrice | Parameters | $.customer.customer_tier で前ステートが追加したデータを参照 |
| CalculatePrice | ResultSelector | 価格計算結果から subtotal・discount・total のみ抽出 |
| CalculatePrice | ResultPath | 抽出結果を $.pricing に格納 |
| FormatOutput | Parameters | Pass State で最終出力を整形(不要フィールドを除去) |
3-5. 実行して各フィルタの効果を確認
3-5-1. ステートマシンを実行
order-enrichment-pipelineのページ → 「実行の開始」- 実行名: 任意(デフォルトのUUIDでも可)
- 入力に以下のJSONを貼り付け:
{
"order_id": "ORD-001",
"customer_id": "CUST-123",
"items": [
{"id": "ITEM-A", "name": "Tシャツ", "price": 3000, "qty": 2},
{"id": "ITEM-B", "name": "パーカー", "price": 5000, "qty": 1}
]
}
- 「実行の開始」をクリック
- ビジュアルフローで3つのステートが順番に緑色になることを確認

3-5-2. GetCustomerInfo ステートのデータフロー確認
ビジュアルフローで「GetCustomerInfo」ステートをクリック → 「入出力」タブを開きます。
各フィルタの効果を確認してください:
| タブ/項目 | 表示されるデータ | 説明 |
|---|---|---|
| 入力 | {"order_id": "ORD-001", "customer_id": "CUST-123", "items": [...]} | ステートマシンへの初期入力 |
| Parameters適用後 | {"customer_id": "CUST-123"} | customer_id だけに絞り込まれ、これがLambdaに渡される |
| Lambdaの生の出力 | {"customer_id": "CUST-123", "name": "田中太郎", "tier": "premium", "email": "...", "last_login": "...", "preferences": {...}} | Lambdaが返す全データ |
| ResultSelector適用後 | {"customer_name": "田中太郎", "customer_tier": "premium"} | name→customer_name、tier→customer_tier にリネームされ、不要フィールドが除去 |
| State出力(ResultPath適用後) | 元の入力 + "customer": {"customer_name": "田中太郎", "customer_tier": "premium"} | 元データを保持しつつ $.customer に顧客情報が追加 |
💡 ポイント: ResultPath
"$.customer"により、元のorder_id/customer_id/itemsが保持されています。ResultPath"$"にするとLambdaの結果で上書きされてしまい、次ステートでorder_idを参照できなくなります。

3-5-3. CalculatePrice ステートのデータフロー確認
同様に「CalculatePrice」ステートをクリック → 「入出力」タブ:
| タブ/項目 | 表示されるデータ | 説明 |
|---|---|---|
| Parameters適用後 | {"order_id": "ORD-001", "items": [...], "customer_tier": "premium"} | 前ステートが $.customer に追加した customer_tier を参照できている |
| Lambdaの生の出力 | {"order_id": "ORD-001", "subtotal": 11000, "discount": 2200, "discount_rate": 0.2, "total": 8800, "currency": "JPY", ...} | Lambdaが返す全データ |
| ResultSelector適用後 | {"subtotal": 11000, "discount": 2200, "total": 8800} | 3フィールドだけに絞り込まれた |
| State出力(ResultPath適用後) | 元データ + "pricing": {"subtotal": 11000, "discount": 2200, "total": 8800} | $.pricing に価格情報が追加 |
💡 計算の確認: Tシャツ 3000円×2 + パーカー 5000円×1 = 11,000円(小計)。premiumティアの20%割引 = 2,200円。合計 = 8,800円。
3-5-4. FormatOutput ステート(Pass State)の確認
「FormatOutput」ステートをクリック → 「入出力」タブ:
Parameters適用後(最終出力):
{
"order_id": "ORD-001",
"customer_name": "田中太郎",
"total_amount": 8800,
"discount_applied": 2200,
"status": "enriched"
}
不要なフィールド(customer_id・items・customer.*・pricing.*)が除去された、クリーンな最終出力です。Pass State + Parameters の組み合わせで、データを加工せずに「整形された出力」を作れます。
3-6. よくある間違いのデバッグ体験
実際にミスを体験することで、各フィルタの役割をより深く理解できます。
デバッグ①: ".$" サフィックスを忘れた場合
変更箇所: ステートマシンのASLを編集 → GetCustomerInfo の Parameters を以下のように誤って変更:
"Parameters": {
"customer_id": "$.customer_id"
}
(正しくは "customer_id.$": "$.customer_id")
実行して確認:
- ステートマシンを実行(入力は同じJSONを使用)
- GetCustomerInfo ステートが失敗する
- エラー内容を確認すると、Lambdaに
"$.customer_id"という文字列リテラルが渡されていることがわかる - Lambda は
customer_idの値が"$.customer_id"という文字列になるため「顧客が見つかりません: $.customer_id」エラーを返す
学習ポイント:
– "customer_id": "$.customer_id" → "$.customer_id" という文字列が渡される
– "customer_id.$": "$.customer_id" → 入力JSONの customer_id フィールドの値("CUST-123")が渡される
– ".$" サフィックスは「JsonPathで参照する」という宣言。忘れると文字列リテラルになる
確認後、ASLを元の正しいバージョンに戻してください。

デバッグ②: ResultPath "$" で元データが消える場合
変更箇所: GetCustomerInfo の ResultPath を "$.customer" から "$" に変更:
"ResultPath": "$"
実行して確認:
- ステートマシンを実行
- GetCustomerInfo は成功するが、CalculatePrice が失敗する
- CalculatePrice の入力を確認すると、
order_id・itemsが消え、ResultSelector の出力(customer_name・customer_tier)だけになっている - CalculatePrice の Parameters で参照する
$.order_id・$.itemsが存在しないためエラー
学習ポイント:
– ResultPath "$" = Lambda の出力結果でステートの全データを上書き
– ResultPath "$.customer" = Lambda の出力結果を customer キーとして追加(元データは保持)
– 「元データを保持しながら新データを追加する」には必ずサブパス($.xxx)を指定する
確認後、ASLを元の "$.customer" に戻してください。
まとめ(コンソール編)
このハンズオンで体験した5フィルタの役割を整理します:
| フィルタ | タイミング | 役割 | 今回の使用例 |
|---|---|---|---|
| InputPath | ステート入力前 | 入力全体から必要部分を選択 | 今回は使用せず(デフォルト $) |
| Parameters | Lambda呼び出し前 | Lambda に渡すデータを整形 | GetCustomerInfo: customer_id のみ渡す |
| ResultSelector | Lambda出力直後 | Lambda の生の出力から必要フィールドを選択 | 不要フィールド(email, last_login等)を除去 |
| ResultPath | 結果マージ時 | 結果をどこに置くか指定 | $.customer, $.pricing で元データを保持 |
| OutputPath | ステート出力前 | 次ステートに渡すデータを選択 | 今回は使用せず(デフォルト $) |
覚えておきたいパターン: Lambda に渡すデータを絞るには Parameters、Lambda の結果から不要フィールドを除くには ResultSelector、元データを壊さずに結果を追加するには ResultPath にサブパス を指定する。この3つを組み合わせるだけで、複雑なデータ変換を宣言的に表現できます。
同じパイプラインを次はTerraformでコードとして定義する。コンソール版と同一のASL構造であることを確認しながら進めよう。
4. Terraformでの構築
Section 3ではAWSコンソールを使って注文 enrichment パイプラインを構築しました。このセクションでは、まったく同じStep Functionsステートマシンと2つのLambda関数をTerraformで定義し、IaCによる再現性の高いインフラ構築を体験します。
ポイント: TerraformのASL定義はSection 3(コンソール版)と完全一致しています。同じロジックをコードで表現する際の違いを比較してみてください。
4-1. 前提条件
作業開始前に以下の準備が整っていることを確認してください。
| 項目 | 確認方法 |
|---|---|
| Terraform 1.0以上 | terraform version |
| AWS CLI設定済み | aws configure list |
| 適切なIAM権限 | Lambda / Step Functions / IAM / CloudWatch Logs の操作権限 |
# バージョン確認
terraform version
# Terraform v1.x.x 以上であること
aws configure list
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, region が設定済みであること
4-2. ディレクトリ構成
以下のディレクトリ構成でプロジェクトを作成します。
sf-dataflow/
├── main.tf # メインリソース定義
├── variables.tf# 変数定義
├── outputs.tf # 出力値定義
├── lambda/
│├── get_customer_info.py# 顧客情報取得Lambda
│└── calculate_price.py # 価格計算Lambda
└── statemachine/
└── definition.json.tpl # ステートマシン定義テンプレート
# ディレクトリ作成
mkdir -p sf-dataflow/lambda sf-dataflow/statemachine
cd sf-dataflow
4-3. Lambda ソースコード
Section 3のコンソール版と同じロジック構成のコードです(Parameters / ResultSelector / ResultPath のデータフロー制御はコンソール版と完全に同じ構造)。なお、顧客データのティア名・サンプルデータはTerraform版では簡略化しています。コンソール版と合わせて使用する場合は顧客ID(CUST-123等)とティア名(premium/standard/basic)を統一してください。
lambda/get_customer_info.py
import json
# モック顧客データベース
CUSTOMERS = {
"CUST-123": {"name": "田中太郎", "tier": "gold"},
"CUST-456": {"name": "佐藤花子", "tier": "silver"},
"CUST-789": {"name": "鈴木一郎", "tier": "bronze"},
}
def lambda_handler(event, context):
customer_id = event.get("customer_id")
if customer_id not in CUSTOMERS:
raise ValueError(f"Customer not found: {customer_id}")
customer = CUSTOMERS[customer_id]
return {
"name": customer["name"],
"tier": customer["tier"]
}
lambda/calculate_price.py
import json
# ティア別割引率
DISCOUNT_RATES = {
"gold":0.20,# 20% 割引
"silver": 0.10,# 10% 割引
"bronze": 0.05,# 5% 割引
}
def lambda_handler(event, context):
items= event.get("items", [])
customer_tier = event.get("customer_tier", "bronze")
# 小計計算
subtotal = sum(item["price"] * item["qty"] for item in items)
# 割引計算
discount_rate = DISCOUNT_RATES.get(customer_tier, 0)
discount= int(subtotal * discount_rate)
total= subtotal - discount
return {
"subtotal": subtotal,
"discount": discount,
"total": total
}
4-4. statemachine/definition.json.tpl
コンソール版(Section 3)と同一のASL構造です。
Terraformでは templatefile() 関数を使って、Lambda関数ARNなどの動的な値を ${変数名} プレースホルダーで埋め込みます。main.tf 側から変数マップを渡すことで、実行時に正しいARNに置換されます。
{
"Comment": "注文データ enrichment パイプライン(データフロー制御総合演習)",
"StartAt": "GetCustomerInfo",
"States": {
"GetCustomerInfo": {
"Type": "Task",
"Resource": "${get_customer_arn}",
"Parameters": {
"customer_id.$": "$.customer_id"
},
"ResultSelector": {
"customer_name.$": "$.name",
"customer_tier.$": "$.tier"
},
"ResultPath": "$.customer",
"Next": "CalculatePrice"
},
"CalculatePrice": {
"Type": "Task",
"Resource": "${calculate_price_arn}",
"Parameters": {
"order_id.$": "$.order_id",
"items.$": "$.items",
"customer_tier.$": "$.customer.customer_tier"
},
"ResultSelector": {
"subtotal.$": "$.subtotal",
"discount.$": "$.discount",
"total.$": "$.total"
},
"ResultPath": "$.pricing",
"Next": "FormatOutput"
},
"FormatOutput": {
"Type": "Pass",
"Parameters": {
"order_id.$": "$.order_id",
"customer_name.$": "$.customer.customer_name",
"total_amount.$": "$.pricing.total",
"discount_applied.$": "$.pricing.discount",
"status": "enriched"
},
"End": true
}
}
}
templatefile() の仕組み:
${get_customer_arn}→aws_lambda_function.get_customer_info.arnに置換${calculate_price_arn}→aws_lambda_function.calculate_price.arnに置換$.で始まるJSONPath参照はStep Functionsの構文のため、Terraformの変数補間とは区別されます(${が含まれないため競合しません)
4-5. variables.tf
variable "aws_region" {
description = "デプロイ先のAWSリージョン"
default = "ap-northeast-1"
}
variable "project_name" {
description = "プロジェクト名(リソース命名に使用)"
default = "order-enrichment"
}
4-6. main.tf(完全なコード)
以下の main.tf で全リソースを定義します。terraform plan が通る完全な状態です。
# ============================================================
# Provider & Data Sources
# ============================================================
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 5.0"
}
}
}
provider "aws" {
region = var.aws_region
}
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
# ============================================================
# Lambda: ソースコードをZIPアーカイブ化
# ============================================================
data "archive_file" "get_customer_info" {
type = "zip"
source_file = "${path.module}/lambda/get_customer_info.py"
output_path = "${path.module}/.terraform/lambda_zips/get_customer_info.zip"
}
data "archive_file" "calculate_price" {
type = "zip"
source_file = "${path.module}/lambda/calculate_price.py"
output_path = "${path.module}/.terraform/lambda_zips/calculate_price.zip"
}
# ============================================================
# IAMロール: Lambda用
# ============================================================
resource "aws_iam_role" "lambda_role" {
name = "${var.project_name}-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy_attachment" "lambda_basic" {
role = aws_iam_role.lambda_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# ============================================================
# IAMロール: Step Functions用
# ============================================================
resource "aws_iam_role" "sf_execution_role" {
name = "${var.project_name}-sf-execution-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "sf_policy" {
name = "${var.project_name}-sf-invoke-lambda"
role = aws_iam_role.sf_execution_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = "lambda:InvokeFunction"
Resource = [
aws_lambda_function.get_customer_info.arn,
aws_lambda_function.calculate_price.arn,
]
}]
})
}
# ============================================================
# CloudWatch Log Groups(Lambda用)
# ============================================================
resource "aws_cloudwatch_log_group" "get_customer_info" {
name = "/aws/lambda/${var.project_name}-get-customer-info"
retention_in_days = 7
}
resource "aws_cloudwatch_log_group" "calculate_price" {
name = "/aws/lambda/${var.project_name}-calculate-price"
retention_in_days = 7
}
# ============================================================
# Lambda Functions
# ============================================================
resource "aws_lambda_function" "get_customer_info" {
function_name = "${var.project_name}-get-customer-info"
filename= data.archive_file.get_customer_info.output_path
source_code_hash = data.archive_file.get_customer_info.output_base64sha256
runtime = "python3.12"
handler = "get_customer_info.lambda_handler"
role = aws_iam_role.lambda_role.arn
depends_on = [
aws_cloudwatch_log_group.get_customer_info,
aws_iam_role_policy_attachment.lambda_basic,
]
}
resource "aws_lambda_function" "calculate_price" {
function_name = "${var.project_name}-calculate-price"
filename= data.archive_file.calculate_price.output_path
source_code_hash = data.archive_file.calculate_price.output_base64sha256
runtime = "python3.12"
handler = "calculate_price.lambda_handler"
role = aws_iam_role.lambda_role.arn
depends_on = [
aws_cloudwatch_log_group.calculate_price,
aws_iam_role_policy_attachment.lambda_basic,
]
}
# ============================================================
# Step Functions State Machine
# ============================================================
resource "aws_sfn_state_machine" "order_enrichment" {
name = "${var.project_name}-state-machine"
role_arn = aws_iam_role.sf_execution_role.arn
definition = templatefile("${path.module}/statemachine/definition.json.tpl", {
get_customer_arn = aws_lambda_function.get_customer_info.arn
calculate_price_arn = aws_lambda_function.calculate_price.arn
})
depends_on = [aws_iam_role_policy.sf_policy]
}
4-7. outputs.tf
output "state_machine_arn" {
description = "Step FunctionsステートマシンのリソースARN"
value = aws_sfn_state_machine.order_enrichment.arn
}
output "state_machine_name" {
description = "Step Functionsステートマシン名"
value = aws_sfn_state_machine.order_enrichment.name
}
output "get_customer_info_arn" {
description = "get_customer_info Lambda関数のARN"
value = aws_lambda_function.get_customer_info.arn
}
output "calculate_price_arn" {
description = "calculate_price Lambda関数のARN"
value = aws_lambda_function.calculate_price.arn
}
4-8. デプロイ手順
# 初期化(プロバイダーのダウンロード)
terraform init
# 実行計画の確認(変更内容を事前確認)
terraform plan
# リソースの作成
terraform apply
# "yes" を入力して確定
terraform plan の出力例(抜粋):
Plan: 10 to add, 0 to change, 0 to destroy.
以下のリソースが作成されます:
– aws_iam_role.lambda_role
– aws_iam_role.sf_execution_role
– aws_iam_role_policy.sf_policy
– aws_iam_role_policy_attachment.lambda_basic
– aws_cloudwatch_log_group.get_customer_info
– aws_cloudwatch_log_group.calculate_price
– aws_lambda_function.get_customer_info
– aws_lambda_function.calculate_price
– aws_sfn_state_machine.order_enrichment
– (archive_file は data source のため数に含まれない)
4-9. 動作確認(CLIで実行)
デプロイ完了後、AWS CLIでステートマシンを実行して動作を確認します。
# ステートマシンを実行
aws stepfunctions start-execution \
--state-machine-arn $(terraform output -raw state_machine_arn) \
--input '{
"order_id": "ORD-TF-001",
"customer_id": "CUST-123",
"items": [
{"id": "ITEM-A", "name": "Tシャツ", "price": 3000, "qty": 2}
]
}'
出力例(実行ARNが返る):
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:order-enrichment-state-machine:abc123",
"startDate": "2026-04-12T10:00:00.000Z"
}
# 実行結果を確認(<EXECUTION_ARN>を上記のexecutionArnに置き換え)
aws stepfunctions describe-execution \
--execution-arn <EXECUTION_ARN> \
--query 'output' --output text | python3 -m json.tool
期待する出力:
{
"order_id": "ORD-TF-001",
"customer_name": "田中太郎",
"total_amount": 4800,
"discount_applied": 1200,
"status": "enriched"
}
計算内訳:
– 小計: 3,000円 × 2個 = 6,000円
– 割引: 6,000円 × 20%(goldティア)= 1,200円
– 合計: 6,000円 − 1,200円 = 4,800円
クリーンアップ(Terraform版)
動作確認後、不要なリソースを削除します。
terraform destroy
# "yes" を入力して確定
まとめ(Terraform編)
このセクションでは、Section 3(コンソール版)と完全に同一のASL構造を持つStep Functionsステートマシンを、Terraformを使ってコードで表現しました。
| 比較項目 | コンソール版(Section 3) | Terraform版(Section 4) |
|---|---|---|
| ASL定義 | JSONをコンソールに貼り付け | templatefile() で動的に生成 |
| Lambda ARN | コンソール上で手動選択 | ${get_customer_arn} プレースホルダーで補間 |
| 再現性 | 手順書が必要 | terraform apply で即再現 |
| バージョン管理 | 困難 | Gitで管理可能 |
ASL構造の一致確認: ステート名(GetCustomerInfo / CalculatePrice / FormatOutput)、Parameters、ResultSelector、ResultPath はすべてコンソール版(Section 3)と完全一致しています。Terraformの ${...} プレースホルダーはLambda ARNのみに使用しており、Step Functionsのロジックには一切変更を加えていません。
基本的なパイプラインの構築が終わったところで、より高度なデータ変換テクニックを見ていこう。
5. 応用パターン集(Intrinsic Functions活用)
この Section では、データフロー制御でよく使われる応用パターンを ASL 例とともに解説します。
概念と実装例のみ紹介します(ハンズオン手順は省略)。
5-1. Intrinsic Functions を使ったデータ変換
Intrinsic Functions = ASL 内でデータ変換を Lambda 不要で行える組み込み関数です。Parameters / ResultSelector 内で使用でき、ステート定義の中に直接記述できます。
主要な Intrinsic Functions 一覧:
| 関数 | 用途 | 例 |
|---|---|---|
States.Format | 文字列フォーマット | "States.Format('注文 {} 処理完了', $.order_id)" |
States.StringToJson | 文字列を JSON にパース | EventBridge の detail が文字列の場合に便利 |
States.JsonToString | JSON を文字列にシリアライズ | SQS/SNS へのメッセージ本文に使用 |
States.Array | 複数値から配列を生成 | "States.Array($.a, $.b, $.c)" |
States.ArrayGetItem | 配列から要素取得 | "States.ArrayGetItem($.items, 0)" |
States.ArrayLength | 配列の長さ取得 | 件数チェックに使用 |
States.ArrayUnique | 配列の重複除去 | 重複 ID の除去など |
States.Base64Encode | Base64 エンコード | バイナリデータの文字列化 |
States.Base64Decode | Base64 デコード | エンコードされたペイロードの復元 |
States.Hash | SHA256/MD5 ハッシュ生成 | データの整合性チェック |
States.JsonMerge | JSON 同士をマージ | 複数ソースのデータ統合 |
States.MathAdd | 数値加算 | カウンターのインクリメント |
States.MathRandom | 乱数生成 | A/B テストの振り分けなど |
States.UUID | UUID v4 生成 | 実行ごとに一意の ID を付与 |
States.ArrayContains | 配列に値が含まれるか判定 | 条件分岐のフラグ生成 |
ASL 例(States.Format + States.UUID):
"GenerateOrderId": {
"Type": "Pass",
"Parameters": {
"order_id.$": "States.UUID()",
"message.$": "States.Format('注文 {} を受け付けました', $.customer_name)",
"processed_at.$": "$$.Execution.StartTime"
},
"Next": "ProcessOrder"
}
このステートは Lambda を呼び出さずに、
– States.UUID() でユニークな注文 ID を生成
– States.Format() で顧客名を埋め込んだメッセージを生成
– $$.Execution.StartTime で実行開始時刻を付与
という 3 つの変換を一度に行います。
5-2. Context Object ($$) を使った実行情報の埋め込み
Context Object = $$ で参照できる Step Functions の実行コンテキスト情報です。
ステートマシンの実行 ID や開始時刻など、AWS が自動的に付与するメタデータにアクセスできます。
主要な Context Object フィールド:
| フィールド | 内容 |
|---|---|
$$.Execution.Id | 実行 ARN(一意の識別子) |
$$.Execution.Name | 実行名 |
$$.Execution.StartTime | 実行開始時刻(ISO 8601) |
$$.Execution.Input | 実行時の入力(全体) |
$$.State.Name | 現在のステート名 |
$$.State.EnteredTime | ステート開始時刻 |
$$.Task.Token | .waitForTaskToken 用トークン |
$$.StateMachine.Id | ステートマシン ARN |
$$.StateMachine.Name | ステートマシン名 |
ASL 例(実行 ID を Lambda に渡す):
"ProcessWithContext": {
"Type": "Task",
"Resource": "<LAMBDA_ARN>",
"Parameters": {
"order_id.$": "$.order_id",
"execution_id.$": "$$.Execution.Id",
"execution_name.$": "$$.Execution.Name",
"started_at.$": "$$.Execution.StartTime"
},
"ResultPath": "$.result",
"Next": "Done"
}
重要:
$$は Context Object 参照、$は State 入力参照です。同一のParametersブロック内で混在して使用できます。
Context Object を活用することで、分散トレーシングや監査ログに実行 ID を自動付与でき、Lambda 側で ID を生成・管理する手間が省けます。
5-3. InputPath + OutputPath のみで軽量フィルタリング
Lambda を呼び出さずに Pass State で InputPath / OutputPath を組み合わせることで、ペイロードから必要な部分だけを抽出できます。
"FilterState": {
"Type": "Pass",
"InputPath": "$.order",
"Parameters": {
"essentials": {
"id.$": "$.id",
"amount.$": "$.amount"
}
},
"OutputPath": "$.essentials",
"Next": "NextState"
}
ユースケース: 大きなペイロードから必要な部分だけを次のステートに渡す。
Lambda の 6 MB ペイロード制限対策として有効で、不要なフィールドを早期に除去することで後続ステートの処理負荷も軽減できます。
5-4. ResultPath null でサイドエフェクト専用ステート
"ResultPath": null = Task の結果を破棄し、入力データをそのまま次のステートに渡すパターンです。
通知・ログ記録・監査など、元データを変更せずに副作用だけを実行したい場合に使用します。
ASL 例(監査ログ記録 + データパッシング):
"AuditLog": {
"Type": "Task",
"Resource": "<LAMBDA_ARN_AUDIT>",
"Parameters": {
"event_type": "order_processed",
"order_id.$": "$.order_id",
"execution_id.$": "$$.Execution.Id"
},
"ResultPath": null,
"Next": "NotifyCustomer"
}
AuditLog Lambda が何を返しても、State のデータは変更されません。NotifyCustomer ステートには AuditLog 実行前と同じ入力がそのまま渡されます。
データパイプラインの途中でログ記録・通知を挟む定番パターンであり、処理の主系統に副作用を追加する際の標準的な実装方法です。
6. ハンズオン後の削除手順
6-1. コスト注意事項
⚠️ 本ハンズオンで作成したリソースは無料枠内で動作しますが、長期間放置すると課金が発生します。
| リソース | 月額目安 | 備考 |
|---|---|---|
| Lambda | 無料枠 100 万リクエスト以内なら無料 | 本ハンズオン程度では課金なし |
| Step Functions | 4,000 回の状態遷移まで無料 | 本ハンズオン程度では課金なし |
| CloudWatch Logs | ~$0.50/GB(保存)+ ~$0.01/GB(取得) | ロググループを削除することで停止 |
ハンズオン完了後は、不要なリソースを削除しておくことを推奨します。
6-2. Terraform で構築した場合
以下のコマンド 1 つで、Terraform で作成したリソースをまとめて削除できます。
cd terraform/
terraform destroy
実行すると削除対象リソースの一覧が表示され、確認後に削除が実行されます。
terraform destroy で削除されないリソースへの注意:
retain_on_delete を設定した CloudWatch Logs ロググループは Terraform 管理外となるため、手動削除が必要です。
aws logs delete-log-group --log-group-name /aws/lambda/get-customer-info
aws logs delete-log-group --log-group-name /aws/lambda/calculate-price
6-3. コンソールで構築した場合の削除チェックリスト
コンソールから手動で構築した場合は、以下の順番で削除してください。
- [ ] Step Functions ステートマシン(
order-enrichment-pipeline)を削除 - マネジメントコンソール → Step Functions → ステートマシン → 選択して削除
- [ ] Lambda 関数(
get-customer-info/calculate-price)を削除 - マネジメントコンソール → Lambda → 関数 → 各関数を選択して削除
- [ ] IAM ロール(
sf-dataflow-execution-role)と付随するポリシーを削除 - マネジメントコンソール → IAM → ロール → 検索して削除
- [ ] CloudWatch Logs ロググループ(Lambda 名のもの)を削除
- マネジメントコンソール → CloudWatch → ロググループ → 各グループを選択して削除
7. まとめと次のステップ
7-1. この記事で学んだこと
本記事では、Step Functions のデータフロー制御を支える 5 つのフィルタとその周辺技術を学びました。
- 5 つのフィルタの役割と評価順序:
InputPath→Parameters→ResultSelector→ResultPath→OutputPathの順で評価され、それぞれが入出力データの異なる側面を制御する Parametersの".$"サフィックスの重要性:".$"がないと JSONPath ではなく文字列リテラルとして扱われ、意図しない動作の原因になるResultPath "$.key"でデータを保持しながらステートを積み重ねるパターン: 各ステートの結果を別キーに保存することで、全ステップの出力を最終ステートまで引き継げるResultPath nullでサイドエフェクト専用ステートを実現するパターン: 通知・監査ログなどの副作用を、メインのデータフローに影響を与えずに挿入できる- Context Object (
$$) を使った実行コンテキスト情報の活用: 実行 ID・開始時刻・ステートマシン名などを Lambda に自動付与でき、トレーサビリティが向上する - Intrinsic Functions(
States.Format/States.UUID等)で Lambda 不要のデータ変換: 文字列フォーマット・UUID 生成・配列操作などを ASL 内で完結させ、Lambda コスト・レイテンシを削減できる - enrichment パイプラインでの全フィルタ総合活用: 顧客情報取得 → 価格計算 → レポート生成という実践的なパイプラインで、5 つのフィルタの連携パターンを体験した
7-2. 次のステップ
Step Functions の世界はさらに広がります。今後のシリーズで以下のトピックを解説予定です。
- Callback パターン(
.waitForTaskToken) — 人間の承認フローや非同期ジョブ待機など、外部イベントを待機する高度なパターン - Distributed Map — 大規模な並列バッチ処理を効率的に実現するためのステートタイプ
- Express vs Standard Workflow の使い分け — コスト・実行時間・実行保証レベルの違いを理解し、用途に応じて選択する方法
- SDK Integration(Lambda 不要で直接 AWS サービスを呼び出す) — Lambda を介さずに DynamoDB・SQS・SNS などを直接操作し、シンプルかつ低コストなワークフローを構築する方法
7-3. シリーズリンク
本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第 4 回です。
シリーズ一覧:
– 第 1 回: AWS Step Functions 入門 — コンソールと Terraform で学ぶハンズオン
– 第 2 回: ECS × Step Functions 入門 — CSV バッチを Fargate タスクでジョブ化するハンズオン
– 第 3 回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶ Retry/Catch/Timeout
– 第 4 回: 本記事(Step Functions 入出力データフロー制御完全ガイド)
7-4. 参考リンク
- Amazon States Language — 入出力処理
- Step Functions Intrinsic Functions リファレンス
- Context Object リファレンス
- Step Functions クォータ(ペイロードサイズ制限)