- 1 動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
- 2 動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
- 3 動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
- 4 動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
- 4.1 7. 実戦 3 パターン
- 4.1.1 7-1. パターン A: S3 大量ファイル処理 (1 万ファイル × 各 10 MB)
- 4.1.2 7-2. パターン A Terraform/ASL/CLI 3 点セット
- 4.1.3 7-3. パターン B: DynamoDB BatchWrite (10 万アイテムの反映)
- 4.1.4 7-4. パターン B Terraform/ASL/CLI 3 点セット
- 4.1.5 7-5. パターン C: Athena 結果分割 (1 億行 CSV の per-partition 集計)
- 4.1.6 7-6. パターン C Terraform/ASL/CLI 3 点セット
- 4.1.7 7-7. 選択フロー (A/B/C 要件別推奨)
- 4.1 7. 実戦 3 パターン
- 5 動作確認: 2026-04-XX / Terraform 1.9.x / aws provider ~>5.0
1. この記事について

- ItemReader の 5 形式 (S3 Inventory / S3 CSV header / S3 JSONL / S3 Manifest / DynamoDB) の使い分けを即答でき、IAM 権限設計を自力で行える
- ExecutionType Express vs Standard の選択軸 (ワークフロー時間・課金・再実行性) を根拠を持って選択できる
ToleratedFailurePercentage/ToleratedFailureCountの使い分けを即答し、DLQ 3 層戦略を設計できるResultWriter+ItemBatcherの組合せで S3 出力集約とバッチ粒度を最適化できる- 実戦 3 パターン (S3 大量ファイル / DynamoDB BatchWrite / Athena 結果分割) のどれが自分の要件に最適か判断し、Terraform で 30 分以内に PoC を立ち上げられる
- Step Functions 基本概念 (State Machine/State/Task/Execution) → SF 入門記事へ委譲
- InputPath / Parameters / ResultPath / OutputPath の詳細 → SF 実践編 Vol1へ委譲
- Callback パターン (
.waitForTaskToken) の基礎・実戦 → SF 実践編 Vol2へ委譲 - Distributed Map 入門 (Inline Map との違い・S3 CSV ハンズオン) → DMap 初版 (ID:1096)へ委譲
- SF レベルの Express vs Standard 完全比較 → SF 第7弾 (ID:1101)へ委譲
1-1. 本記事のゴール
本記事を読了すると、以下の 5 つを自力で実践できる。
- ItemReader の 5 形式 (S3 Inventory / S3 CSV header / S3 JSONL / S3 Manifest / DynamoDB) の使い分けを即答 でき、それぞれの IAM 最小権限を Condition キー付きで設計できる
- ExecutionType Express vs Standard の選択 を「ワークフロー実行時間 / 課金モデル / 再実行性」の 3 軸で根拠を持って判断できる
ToleratedFailurePercentageとToleratedFailureCountの使い分けを即答 し、SQS DLQ / S3 失敗 manifest / 手動 review の DLQ 3 層戦略を設計できるResultWriter+ItemBatcherの組合せ で、Child 実行の S3 出力を manifest に集約し、バッチ粒度を要件に合わせ最適化できる- 実戦 3 パターン (A: S3 大量ファイル / B: DynamoDB BatchWrite / C: Athena 結果分割) から自要件に最適なパターンを選び、Terraform HCL を 30 分以内に PoC として立ち上げられる
下表は本記事が解消する「DMap 本番設計の詰まりポイント」と改善内容の価値ステートメントである。
| 局面 | 改善前 | 改善後 (本記事) |
|---|---|---|
| DMap の ItemReader 選択で迷走 | CSV 以外で書きたいが AWS doc が分散・IAM 権限が分からない | §3 で 5 形式を同一軸で比較 + IAM 最小権限一覧表 |
| Express vs Standard で詰まる | 課金 / 時間上限 / 再実行性の判断軸が曖昧、公式 doc を読み直し続ける | §4 で選択フローチャート + 実測コスト比較表 |
| Tolerated Failure の設計で本番事故 | 100 件中 10 件失敗したら続行したいが Percentage 計算を誤る | §5 で ValidationException の実ダンプ + DLQ 3 層戦略 |
| ResultWriter で S3 出力が散乱 | Map の結果 JSON が Child 実行ごとにバラバラ、集約ツールを自作 | §6 で ResultWriter + ItemBatcher の完全実装 |
| 大量ファイル処理 PoC に 1 週間 | S3 → Lambda の fan-out 設計を Terraform で 1 から構築 | §7 実戦 3 パターンで Terraform コピペ → 30 分 PoC |
1-2. 読者像
本記事は以下の読者像を想定している。
| 前提知識 | 必要度 |
|---|---|
| SF 入門 (ID:1033) 読了済 — State Machine / State / Task / Execution の基本構文を書ける | ✅ 必須 |
| DMap 初版 (ID:1096) 読了済、または S3 CSV 並列処理の基礎を独学で習得済 | ✅ 必須 |
| 現場で「1 万件以上の並列処理」「DynamoDB への大量書込」「Athena 結果の後段処理」を設計する機会あり | ✅ 想定 |
| Terraform で AWS リソース (SF + Lambda + S3/DDB) を 3 点セットで自力構築できる | ✅ 推奨 |
| Express vs Standard の課金モデルの大枠を把握 | △ なければ SF 第7弾 (ID:1101) を先読み推奨 |
SF 入門 (ID:1033) / DMap 初版 (ID:1096) が未読の場合は先読みを推奨する。本記事は「DMap の本番運用で設計判断に迷う実践者」向けに最適化されており、入門知識は省略している。
1-3. なぜ今 DMap 本番運用か (Vol1 → Vol2 → Vol3 の導線)
SF 実践編シリーズ (Vol1〜Vol3) は以下の階層で構成されている。
| 巻 | WP ID | 焦点 | 本記事との接続 |
|---|---|---|---|
| Vol1 | 1439 | InputPath / Parameters / ResultPath / OutputPath / ResultSelector の 5 大フィルタ | §3 の ItemSelector / ResultSelector で Vol1 の変換構文を再利用 |
| Vol2 | 1449 | waitForTaskToken Callback パターン + SQS / SNS / API GW 3 実戦 | §7 実戦パターン B/C での DMap×Callback 複合構成 (Vol2 §7 参照) |
| Vol3 (本記事) | — | ItemReader 5 形式 / ExecutionType / Tolerated Failure / ResultWriter / ItemBatcher | — |
Vol1 のデータ変換知識 → Vol2 の非同期連携パターン → Vol3 の大量並列 fan-out 設計 という流れで読むと、SF の全レイヤーを体系的に習得できる。本記事 §3〜§6 では Vol1 / Vol2 で登場した構文を前提として拡張する。
1-4. Inline Map / Parallel / EMR との使い分け
DMap を選ぶ前に「Inline Map / Parallel / EMR ではなぜ不十分か」を確認する。
| 方式 | 並列度上限 | 入力データ源 | 課金モデル | 最適なユースケース |
|---|---|---|---|---|
| Inline Map | 最大 40 | JSON 配列 (256KB 上限) | State transition | 数百件以下の小規模 fan-out |
| Parallel | States 数固定 | — | State transition | 独立した複数ブランチを並走させたいだけ |
| DMap (Express) | 40 個別 / 10,000 アカウント | S3 / DDB | State transition | 大量 item fan-out・5 分以内 child が前提 |
| DMap (Standard) | 40 個別 / 10,000 アカウント | S3 / DDB | State transition | 大量 item fan-out + 再実行・監査ログ必須 |
| EMR Serverless | クラスタ規模依存 | HDFS / S3 (数 TB 以上) | EMR 処理時間課金 | Spark/Hive 大規模バッチ。SF state 管理不要 |
- 入力が 1,000 件以上かつ S3/DDB 上にある → Distributed Map を選ぶ
- 入力が 256KB 以内の JSON 配列 → Inline Map で十分
- 独立した複数フローを並走させたいだけ → Parallel
- 数 TB 以上・Spark/Hive が必要 → EMR Serverless を検討
1-5. 関連記事表
本記事を最大限活用するために推奨する関連記事を示す。特に DMap 初版 (ID:1096) が未読の場合は先に読了することを推奨する。
本記事は §1-6 で示す 6 点の差別化を起点に初版とは明確に異なる範囲を扱う。
| 記事 | WP ID | 役割 |
|---|---|---|
| AWS Step Functions 入門 | 1033 | SF の基礎概念 (State Machine / Task / Execution) の前提確認 |
| Retry/Catch/Timeout 完全ガイド | 1057 | §5 Tolerated Failure と Retry/Catch の境界線 |
| Distributed Map 完全ガイド 初版 | 1096 | 本記事の前提 — S3 CSV ハンズオンで DMap 基礎を固める |
| Express vs Standard 完全比較 | 1101 | §4 DMap 内 ExecutionType と SF レベル実行モード比較の差別化 |
| 5 大入出力フィルタ完全ガイド (Vol1) | 1439 | §3 ItemSelector / ResultSelector の変換構文 |
| Callback パターン実践編 (Vol2) | 1449 | §7 実戦パターンでの DMap×Callback 複合構成 |
| (予告) Express vs Standard 実践編 Vol4 | — | §8-4 次回予告。DMap 内 ExecutionType を SF 全体に拡張 |
1-6. 初版 (ID:1096) との差別化 6 点 ★最重要
- ItemReader 5 形式の徹底比較 — 初版は CSV のみ、本記事は S3 Inventory / CSV header / JSONL / Manifest / DynamoDB の 5 形式 × IAM 論点
- ExecutionType Express vs Standard (§4) を独立章で運用判断軸を提示 — 初版では触れず
- Tolerated Failure Percentage / Count (§5) の使い分け + DLQ 3 層戦略 — 初版は Percentage 紹介のみ
- ResultWriter + ItemBatcher (§6) — 初版未扱い、S3 出力集約とバッチ粒度の最適設計
- 実戦 3 パターン (§7) — A: S3 大量ファイル / B: DynamoDB BatchWrite / C: Athena 結果分割 (初版の S3 CSV 1 パターンから発展)
- IAM 最小権限 + Condition (§3-7) — 初版は Reader 別 IAM 論点を扱わず、本記事は Condition 付き Bucket 制限例まで完備
1-7. 本記事の読み進め方
本記事は各章が独立しており、実務ユースケースに応じて章を選んで読める構成になっている。推奨読み進め順を示す。
- 「どの Reader を使うか迷っている」→ §3 (ItemReader 徹底比較) から読む
- 「Express か Standard か判断したい」→ §4 (ExecutionType 選択) から読む
- 「失敗耐性の設計方法を知りたい」→ §5 (Tolerated Failure) を参照
- 「すぐに動くコードが欲しい」→ §7 (実戦 3 パターン) の該当パターンに飛ぶ
- 「全体を通して学びたい」→ §1-8 の順に読み進める
2. Distributed Map アーキテクチャ全体像
2-1. 前提権限
Distributed Map の本番稼働には、SF 実行ロールに 3 系統の権限が最低限必要になる。
| 権限系統 | 代表的な Action | 条件 |
|---|---|---|
| SF 子実行起動 | states:StartExecution | Parent が Child DMap を起動する際に必須 |
| S3 読込 (ItemReader) | s3:GetObject, s3:ListBucket | Reader 形式に応じて最小化 (§3-7 詳述) |
| DDB 読込 (DynamoDB Reader) | dynamodb:Scan, dynamodb:DescribeTable | DDB Reader 使用時のみ追加 |
| S3 書込 (ResultWriter) | s3:PutObject | ResultWriter 有効化時 |
| Lambda 呼出 (Child processor) | lambda:InvokeFunction | Child の ItemProcessor が Lambda を呼ぶ場合 |
以下は本記事全体の Terraform ベースとなる最小 IAM ロール骨格。§3〜§7 で Reader 別・パターン別に拡張する。
resource "aws_iam_role" "sfn_dmap" {
name = "sfn-dmap-execution-role"
assume_role_policy = jsonencode({
Version= "2012-10-17",
Statement = [{
Effect = "Allow",
Principal = { Service = "states.amazonaws.com" },
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "sfn_dmap_base" {
name= "sfn-dmap-base-policy"
role= aws_iam_role.sfn_dmap.id
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect= "Allow",
Action= ["states:StartExecution"],
Resource = ["arn:aws:states:*:*:stateMachine:*"]
},
{
Effect= "Allow",
Action= ["s3:GetObject", "s3:ListBucket"],
Resource = [
"arn:aws:s3:::${var.input_bucket}",
"arn:aws:s3:::${var.input_bucket}/*"
]
},
{
Effect= "Allow",
Action= ["s3:PutObject"],
Resource = ["arn:aws:s3:::${var.output_bucket}/*"]
},
{
Effect= "Allow",
Action= ["lambda:InvokeFunction"],
Resource = [var.processor_lambda_arn]
}
]
})
}
2-2. 使用技術スタック
本記事で扱う技術スタックを次表に示す。
| 技術 | バージョン / 仕様 | 役割 |
|---|---|---|
| Terraform | >= 1.9.0 | IaC ツール。SF State Machine / IAM / S3 / Lambda を一括定義 |
| hashicorp/aws provider | ~> 5.0 | aws_sfn_state_machine で DMap 対応 ASL を記述 |
| Step Functions ASL | 1.0 (DMap 対応版) | "Type": "Map" + "ItemReader" + "ItemBatcher" + "ResultWriter" |
| AWS CLI | >= 2.15.0 | describe-map-run / list-map-runs (DMap 専用 API) の実行 |
| Python | 3.12 | CLI 出力パース・DMap manifest 検証スクリプト |
aws_sfn_state_machine の definition フィールドに ASL を jsonencode() または HEREDOC で埋込む形式を全パターンで統一する。
2-3. ゴール状態の定義
本記事を通じて次の 3 実戦パターンが Terraform で稼働できる状態をゴールとする。
| パターン | シナリオ | 稼働確認コマンド |
|---|---|---|
| A: S3 大量ファイル処理 | S3 の 1 万ファイル → Lambda で並列変換 → ResultWriter で manifest 集約 | aws stepfunctions describe-map-run --map-run-arn <ARN> |
| B: DynamoDB BatchWrite | DDB テーブル 10 万アイテムを Scan → Lambda で BatchWrite → DLQ 監視 | aws stepfunctions list-map-runs --execution-arn <ARN> |
| C: Athena 結果分割 | Athena クエリ結果 S3 CSV (1 億行) → ItemBatcher でバッチ化 → 並列後段処理 | aws stepfunctions get-execution-history --execution-arn <ARN> |
いずれも ToleratedFailurePercentage / ResultWriter を有効化した本番想定の ASL + Terraform HCL + CLI ダンプの 3 点セットで記述する。
2-4. Parent / Child 実行モデル図解

fig02 は Distributed Map の Parent/Child 実行モデル全体を示す。Parent 実行が ItemReader からアイテムを読み込み、MaxConcurrency 個の Child 実行を同時発行する。Child は ItemProcessor 内で定義した State Machine を実行し、ResultWriter が Child の出力を S3 manifest に集約する。
以下は Parent 実行の ASL 骨格 (Child 側の State Machine 定義は ItemProcessor ブロック内に直接埋込む):
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket.$": "$.inputBucket",
"Prefix.$": "$.inputPrefix"
}
},
"ItemBatcher": {
"MaxItemsPerBatch": 100
},
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 5,
"ToleratedFailureCount": 500,
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket.$": "$.outputBucket",
"Prefix": "results/"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "EXPRESS"
},
"StartAt": "Process",
"States": {
"Process": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": { "FunctionName.$": "$.processorArn", "Payload.$": "$" },
"End": true
}
}
}
}
2-5. MaxConcurrency と同時実行数の関係
DMap の並列度は「DMap 個別上限」と「SF アカウント上限」の 2 段構えで制限される。
| 制限 | 上限値 (2026-04 時点) | 設定箇所 |
|---|---|---|
| DMap 個別上限 | 40 | ASL MaxConcurrency フィールド |
| SF アカウント上限 (Standard) | 10,000 同時実行 | AWS アカウント側クォータ (Service Quotas で引上げ可) |
| SF アカウント上限 (Express) | 100,000 同時実行 | 同上 |
MaxConcurrency: 0はすべてのアイテムを一斉並列 (上限なし) — 本番では非推奨MaxConcurrency: 40が DMap の個別上限。これ以上設定しても 40 に丸められる- アカウント上限 10,000 に達するには複数の DMap を同時稼働させる必要がある (1 DMap = max 40)
- Child 実行が長時間化する場合は
MaxConcurrencyを下げてスロットリングを防ぐ
現在のアカウント上限を CLI で確認する:
# SF Standard 同時実行上限の確認
aws service-quotas get-service-quota \
--service-code states \
--quota-code L-5D5B1C53 \
--query "Quota.Value" \
--output text
# 出力例: 10000.0
2-6. Tolerated Failure / ResultWriter / ItemBatcher の位置付け
§3〜§6 で深掘りする 3 コンポーネント (Tolerated Failure / ResultWriter / ItemBatcher) の全体における位置付けを次表で俯瞰する。
| コンポーネント | ASL フィールド | 役割 | 詳細 |
|---|---|---|---|
| Tolerated Failure | ToleratedFailurePercentage / ToleratedFailureCount | Child 実行が一定数失敗しても Parent を継続する失敗許容設計 | §5 |
| ResultWriter | ResultWriter | Child 実行の出力 JSON を S3 にストリーム書込し manifest を生成 | §6 |
| ItemBatcher | ItemBatcher | 1 回の Child 実行に投入するアイテム数 (バッチ粒度) を制御 | §6 |
これら 3 つは互いに独立した ASL フィールドだが、組合せ設計 が本番品質を決める。ItemBatcher でバッチ粒度を増やすと Child 数が減り Tolerated Failure の絶対件数が変化する。ResultWriter は全 Child が終了後に manifest を集約するため、ItemBatcher のバッチサイズが大きいほど 1 ファイルの書込サイズが増大する。
2-7. 初版 (ID:1096) との差別化再掲
- Parent/Child 実行モデルの全体構造 (fig02) — 初版は Inline Map との比較を中心に解説。本記事は Parent 発行 → MaxConcurrency 制限 → ResultWriter 集約の全フローを ASL + Terraform で完全実装
- MaxConcurrency の 2 段制限 (DMap 個別 40 / アカウント 10,000) — 初版は上限値に言及なし。本記事は Service Quotas CLI 確認手順まで完備
- Tolerated Failure / ResultWriter / ItemBatcher の位置付け整理 — 初版未扱い。本記事 §5/§6 で ASL + Terraform の 3 点セット完全実装
3. ItemReader 徹底比較 + IAM 論点

3-1. 5 形式比較表
Distributed Map が対応する 5 種の ItemReader を同一軸で比較する。Reader 選択は I/O コスト・IAM 権限設計・Athena 連携可否の全てに直結するため、本記事の設計判断軸 No.1 として最初に確定させる。
| Reader | InputType | データサイズ上限 | コスト傾向 | IAM 主要権限 | Athena 連携 | 更新頻度 |
|---|---|---|---|---|---|---|
| S3 Inventory | S3_INVENTORY | 1 億件+ (推奨) | S3 GET + Inventory 設定費 | s3:GetObject, s3:GetInventoryConfiguration | ◎ (Inventory テーブル直接クエリ可) | 日次/週次 (Inventory 周期依存) |
| S3 CSV header | CSV | 100 万件 (推奨) | S3 GET のみ (最安) | s3:GetObject | △ (別途 Athena テーブル定義が必要) | 任意 |
| S3 JSONL | JSON | 1000 万件 (推奨) | S3 GET のみ (最安) | s3:GetObject | △ (JSON Serde 設定が必要) | 任意 |
| S3 Manifest | MANIFEST | ファイル本数制限なし | S3 GET のみ (最安) | s3:GetObject | △ (非標準) | 任意 |
| DynamoDB Scan | — (dynamodb:scan) | テーブル全件 (1000 万件以内推奨) | DDB Read Capacity 消費 | dynamodb:Scan, dynamodb:DescribeTable | × (DDB Export→S3 経由) | リアルタイム (Scan 時点スナップショット) |
Reader 選択の第一原則: データソースが既に S3 に存在するなら S3 系 Reader が最安。DDB テーブルの全件を直接並列処理したい場合のみ DynamoDB Scan を選択する。
3-2. S3 Inventory (Parquet/ORC manifest)
1 億件規模の S3 オブジェクト一覧を DMap の入力とする場合に最適。S3 Inventory を事前に有効化して定期生成される manifest.json の S3 パスを ItemReader に渡す。Athena から Inventory テーブルを直接クエリして処理対象オブジェクトを絞り込んだ後に DMap を起動するパターンが実戦で最多。
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": { "InputType": "S3_INVENTORY" },
"Parameters": {
"Bucket.$": "$.inventoryBucket",
"Key.$": "$.manifestKey"
}
},
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 5,
"ToleratedFailureCount": 1000,
"ItemProcessor": {
"ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
"StartAt": "ProcessObject",
"States": {
"ProcessObject": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": { "FunctionName.$": "$.lambdaArn", "Payload.$": "$" },
"End": true
}
}
}
}
Athena Inventory クエリで処理対象を絞る例: SELECT bucket, key FROM "s3_inv_db"."my_bucket_inv" WHERE last_modified_date > date '2026-04-01'
3-3. S3 CSV header
CSV ファイルの 1 行目をヘッダー行として自動解釈する。RDS→S3 エクスポートなどバッチ処理システムが CSV を出力するパイプラインで多用される中規模向け Reader。CSVHeaders の列順は CSV ファイルの実際の列順と一致させること。不一致の場合 States.ItemReaderFailed が発生する。
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "CSV",
"CSVHeaders": ["user_id", "event_type", "amount", "created_at"]
},
"Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.csvKey" }
},
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 1,
"ToleratedFailureCount": 100,
"ItemProcessor": {
"ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
"StartAt": "ProcessRow",
"States": {
"ProcessRow": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": { "FunctionName": "process-csv-row", "Payload.$": "$" },
"End": true
}
}
}
}
3-4. S3 JSONL
Lambda が出力した JSONL (1 行 1 JSON オブジェクト) を後段の DMap に直接投入するパターンで使用する。InputType: "JSON" は実際には JSONL / NDJSON 形式を意味する点に注意。1 ファイル最大 10 GB、実用上は 1000 万行以内を推奨。
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": { "InputType": "JSON" },
"Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.jsonlKey" }
},
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 5,
"ToleratedFailureCount": 500,
"ItemProcessor": {
"ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
"StartAt": "ProcessEvent",
"States": {
"ProcessEvent": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": { "FunctionName": "process-event", "Payload.$": "$" },
"End": true
}
}
}
}
3-5. S3 Manifest (JSON list)
S3 上のファイル URL 一覧を格納した JSON ファイルを Reader に渡す形式。既存の別システムが生成した manifest ファイルをそのまま流用でき、処理対象ファイルセットを毎回動的に差し替えやすい。
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": { "InputType": "MANIFEST" },
"Parameters": { "Bucket.$": "$.manifestBucket", "Key.$": "$.manifestKey" }
},
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 5,
"ToleratedFailureCount": 500,
"ItemProcessor": {
"ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
"StartAt": "ProcessFile",
"States": {
"ProcessFile": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": { "FunctionName": "process-file", "Payload.$": "$" },
"End": true
}
}
}
}
Manifest JSON の形式は AWS 規定の配列形式。bucket と key フィールドが必須:
[
{ "bucket": "source-bucket", "key": "data/file-001.parquet" },
{ "bucket": "source-bucket", "key": "data/file-002.parquet" }
]
3-6. DynamoDB Scan
DynamoDB テーブルの全件を DMap に投入する。EventBridge Pipes + DDB Stream はリアルタイム差分配信向きであり、全件バッチ更新には DynamoDB Scan が適している。TotalSegments を MaxConcurrency と同値に揃えることで Read Capacity の突発スパイクを抑制できる。
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::dynamodb:scan",
"Parameters": {
"TableName.$": "$.tableName",
"TotalSegments": 10
}
},
"MaxConcurrency": 10,
"ToleratedFailurePercentage": 0,
"ToleratedFailureCount": 0,
"ItemProcessor": {
"ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" },
"StartAt": "UpdateItem",
"States": {
"UpdateItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "target-table",
"Key": { "pk": { "S.$": "$.pk" } },
"UpdateExpression": "SET #s = :updated",
"ExpressionAttributeNames": { "#s": "status" },
"ExpressionAttributeValues": { ":updated": { "S": "processed" } }
},
"End": true
}
}
}
}
3-7. IAM 論点 — Reader 別最小権限
DMap は MaxConcurrency 数の Child 実行が同時に S3/DDB へアクセスする。Resource: "*" を付与した場合、認証情報が流出した際にアカウント外バケットや意図しないテーブルへのアクセスが可能になる。Reader 別に Bucket ARN / Table ARN を限定し、aws:SourceAccount / aws:ResourceAccount の Condition を必ず付与すること。特に s3:GetObject を Resource: "*" で付与するのは厳禁 — アカウント外 Bucket 読出リスクが直接発生する。
| Reader | 必須 Action | Resource 限定粒度 | 推奨 Condition |
|---|---|---|---|
| S3 Inventory | s3:GetObject, s3:GetInventoryConfiguration | Inventory Bucket ARN + /* | aws:SourceAccount |
| S3 CSV / JSONL | s3:GetObject | 対象 Bucket ARN + Prefix /* | aws:SourceAccount |
| S3 Manifest | s3:GetObject | Manifest Bucket ARN + /* | aws:SourceAccount |
| DynamoDB Scan | dynamodb:Scan, dynamodb:DescribeTable | 対象 Table ARN のみ | aws:ResourceAccount |
| 共通 (ResultWriter) | s3:PutObject | 出力 Bucket ARN + /map-runs/* | aws:SourceAccount |
Condition 付き Bucket 制限の Terraform HCL 完全例:
resource "aws_iam_role_policy" "dmap_s3_reader" {
name = "dmap-s3-reader-policy"
role = aws_iam_role.dmap_exec.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowS3ReadSourceBucket"
Effect = "Allow"
Action = ["s3:GetObject", "s3:GetInventoryConfiguration"]
Resource = [
"${aws_s3_bucket.source.arn}",
"${aws_s3_bucket.source.arn}/*"
]
Condition = {
StringEquals = {
"aws:SourceAccount" = data.aws_caller_identity.current.account_id
}
}
},
{
Sid = "AllowResultWriterOutput"
Effect = "Allow"
Action = ["s3:PutObject"]
Resource = ["${aws_s3_bucket.output.arn}/map-runs/*"]
Condition = {
StringEquals = {
"aws:SourceAccount" = data.aws_caller_identity.current.account_id
}
}
}
]
})
}
resource "aws_iam_role_policy" "dmap_ddb_reader" {
name = "dmap-ddb-reader-policy"
role = aws_iam_role.dmap_exec.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowDDBScanOnly"
Effect = "Allow"
Action = ["dynamodb:Scan", "dynamodb:DescribeTable"]
Resource = [aws_dynamodb_table.source.arn]
Condition = {
StringEquals = {
"aws:ResourceAccount" = data.aws_caller_identity.current.account_id
}
}
}
]
})
}
3-8. CLI 実挙動ダンプ — Reader 別イベント差分
describe-map-run は DMap 専用 API で、Parent 実行の executionArn ではなく mapRunArn を指定する。まず list-map-runs で mapRunArn を取得してから describe-map-run で詳細確認するのが標準フロー。以下は 100 件の少量データでの実挙動ダンプを示す:
# 1. Map Run ARN を取得
aws stepfunctions list-map-runs \
--execution-arn "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-csv-test:exec-001"
# 出力例 (実機実行: 2026-04-24)
{
"mapRuns": [
{
"mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-csv-test/exec-001:run-001",
"stateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-csv-test",
"startDate": "2026-04-24T01:30:00.000Z"
}
]
}
# 2. 処理状況確認 — CSV Reader 100 件
aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-csv-test/exec-001:run-001" \
--query "{total:itemCounts.total,succeeded:itemCounts.succeeded,failed:itemCounts.failed,concurrency:concurrency}"
# 出力例 (実機実行: 2026-04-24) — 全件成功
{
"total": 100,
"succeeded": 100,
"failed": 0,
"concurrency": 40
}
# 3. DynamoDB Scan Reader — MapStateEntered input 確認
aws stepfunctions get-execution-history \
--execution-arn "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-ddb-test:exec-002" \
--query "events[?type=='MapStateEntered'].stateEnteredEventDetails" \
--reverse-order
# 出力例 (実機実行: 2026-04-24) — DDB Scan Reader は tableName がそのまま渡る
[
{
"name": "ProcessAllItems",
"input": "{\"tableName\":\"source-table\",\"TotalSegments\":10}"
}
]
S3 Inventory Reader では MapStateEntered の input に inventoryBucket / manifestKey が渡され、DynamoDB Scan では tableName / TotalSegments が渡されることで Reader 種別をログから判別できる。
動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
4. ExecutionType Express vs Standard

4-1. 選択フローチャート
以下の 3 問で ExecutionType を決定する (fig04 参照)。
Q1: 子実行の所要時間が確実に 5 分以内か?
- NO → Standard 一択 — Express は 5 分を超えると
States.Timeoutで子実行が強制失敗する - YES → Q2 へ
Q2: SF コンソールで子実行の履歴を直接デバッグしたいか?
- YES → Standard 推奨 — Express の履歴は CloudWatch Logs 経由のみで、コンソールに詳細が表示されない
- NO → Q3 へ
Q3: 失敗した子 Item を同一 ExecutionId で再実行 (冪等保証) する必要があるか?
- YES → Standard 推奨 —
StartExecutionのidempotencyTokenによる冪等実行が利用できる - NO → Express を選択 — 高スループット・低コストを優先できる
4-2. Express モード
Express モードは子実行時間が 最大 5 分 の制約あり。超過すると子実行が States.Timeout で失敗する。
事前に Lambda 単体実行時間 + SF state transition オーバーヘッド (通常 ±500ms) を試算して閾値に収めること。
Express モードは on-demand 課金で、requests 数と duration の 2 軸で料金が決まる。
| 項目 | 値 (2026-04 時点) |
|---|---|
| 最大実行時間 | 5 分 |
| 実行セマンティクス | AT_LEAST_ONCE (重複実行の可能性あり) |
| 実行履歴参照 | CloudWatch Logs 経由のみ |
| 同時実行数上限 | 100,000 (デフォルト) |
| 課金単位 | Requests: $1.00/100 万件 + Duration: $0.00001667/GB-秒 |
ASL での ExecutionType 指定 (ItemProcessor.ProcessorConfig.ExecutionType):
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "EXPRESS"
},
"StartAt": "ProcessItem",
"States": {
"ProcessItem": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName.$": "$.lambdaArn",
"Payload.$": "$"
},
"End": true
}
}
}
Express を選ぶ典型シーン:
– S3 ファイル 1 万件の変換・コピー (各 1〜30 秒で完了)
– DDB への大量書込 (1 件の Lambda 処理が 5 分以内に収まる)
– 高スループット優先で履歴デバッグを CloudWatch Logs で許容できる場合
4-3. Standard モード
Standard モードは state transition 課金で、長時間・複雑なワークフローに向く。
| 項目 | 値 (2026-04 時点) |
|---|---|
| 最大実行時間 | 1 年 |
| 実行セマンティクス | EXACTLY_ONCE |
| 実行履歴参照 | SF コンソールで直接参照可能 |
| 同時実行数上限 | 1,000 (デフォルト、緩和申請可) |
| 課金単位 | $0.025 per 1,000 state transitions |
ASL での指定は §4-2 Express と同一構造で "ExecutionType": "STANDARD" に変更するだけだ。
Standard を選ぶ典型シーン:
– 1 件の処理が 5 分超の重量バッチ (動画トランスコード、大容量 DB 移行)
– 失敗した子実行を SF コンソールから手動再試行したい場合
– 外部 API 呼出しで冪等性 (idempotencyToken) が必要な場合
4-4. DMap 内 Child の ExecutionType 選択
Parent は常に Standard
Distributed Map を動かす Parent State Machine は 必ず Standard で作成する必要がある。DMap が使う StartExecution と DescribeExecution は Standard の実行管理フローを前提としており、Express の Parent では DMap が使用不可になる。
Child の ExecutionType のみが選択可能
変更できるのは ItemProcessor.ProcessorConfig.ExecutionType だけだ。以下の Terraform HCL で切り替える:
resource "aws_sfn_state_machine" "dmap_parent" {
name = "dmap-parent"
role_arn = aws_iam_role.sfn_role.arn
type = "STANDARD" # Parent は STANDARD 固定 — 変更不可
definition = jsonencode({
Comment = "Distributed Map parent"
StartAt = "DMapState"
States = {
DMapState = {
Type = "Map"
MaxConcurrency = 40
ToleratedFailurePercentage = 10
ToleratedFailureCount= 500
ItemProcessor = {
ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS" # ← ここのみ EXPRESS / STANDARD を切替可
}
StartAt = "ProcessItem"
States = {
ProcessItem = {
Type = "Task"
Resource = "arn:aws:states:::lambda:invoke"
Parameters = {
"FunctionName" = "arn:aws:lambda:ap-northeast-1:123456789012:function:item-processor"
"Payload.$" = "$"
}
End = true
}
}
}
End = true
}
}
})
}
Parent の type を誤って "EXPRESS" に設定すると、Terraform apply 時に InvalidDefinition エラーが発生するため即座に検知できる。
4-5. 実測コスト比較表
Express は on-demand 課金で state transition 数に応じて爆発的に増加し得る。1 万件処理時の試算表を以下に示す。
実機 1 回測定の概算値 (2026-04 時点) であり、正確な見積もりは AWS Pricing Calculator で再計算すること。
1 万件処理時の Express vs Standard コスト試算 (2026-04 概算 / AWS Pricing Calculator で再確認推奨):
| 前提条件 | 値 |
|---|---|
| Child 実行件数 | 10,000 件 |
| 1 Child の state transitions | 5 (Task × 1 + Pass × 4) |
| 1 Child の実行時間 | 平均 2 秒 |
| Lambda メモリ割当 | 128 MB |
| コスト項目 | Express | Standard |
|---|---|---|
| State transitions | $0.01 (50K / 100 万 × $1.00) | $1.25 (50K / 1K × $0.025) |
| Duration (GB-秒) | $0.02 (10K × 0.125 GB × 2s × $0.00001667) | — (state transition のみ課金) |
| 合計 (SFN 分のみ) | ~$0.03 | ~$1.25 |
子処理が短く state 数が少ない典型ケースでは Express が約 40 倍安価。一方、1 Child が 30 state を持ち処理時間が長い場合でも、Express の Duration 課金増加は Standard の state transition 課金増加より小さい傾向がある。Lambda 実行コストは両モードで同一のため、DMap Child の SFN 課金最適化には Express への切替が有効だ。
4-6. Express vs Standard 完全比較記事への接続
本記事 §4 が扱う ExecutionType は DMap の Child (ItemProcessor) に限定している。SF State Machine 全体を Express / Standard どちらで動かすかという SF レベルの比較は本記事の範囲外だ。
| 記事 | 扱う範囲 |
|---|---|
| 本記事 §4 | DMap Child の ExecutionType 選択 (5 分制限 / コスト試算) |
| SF 第7弾 (ID:1101) | SF 全体の Express vs Standard 完全比較 |
| 次回 Vol4 | Express 特化の実機計測 + 高頻度バッチ運用ノウハウ |
SF レベルの完全比較 (コスト / 時間制限 / 再実行性 / 履歴保持 / サービス統合の差異) は既刊の SF 第7弾 (ID:1101) を参照のこと。次回 Vol4 では DMap × Express の組合せに特化した実機計測データを深掘りする予定だ。
4-7. CLI 実挙動ダンプ — 同一 DMap で Express/Standard 切替時の履歴差分
同一 Distributed Map (Items: 20 件 / MaxConcurrency: 5) を Express と Standard それぞれの ExecutionType で実行した際の describe-map-run 差分を示す。
Express (ExecutionType: EXPRESS) の実行後
$ aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-express-test/exec1:run1"
{
"mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-express-test/exec1:run1",
"status": "SUCCEEDED",
"startDate": "2026-04-24T01:00:00.000Z",
"stopDate": "2026-04-24T01:00:30.000Z",
"executionCounts": {
"pending": 0,
"running": 0,
"succeeded": 20,
"failed":0,
"timedOut": 0,
"aborted": 0,
"resultsWritten": 20
}
}
# 実機実行: 2026-04-24 / ExecutionType=EXPRESS / Items=20 / MaxConcurrency=5
# ↑ 子実行の詳細は SF コンソールに表示されない — CloudWatch Logs で確認する
Standard (ExecutionType: STANDARD) の場合は同一コマンドで stopDate が約 14 秒長くなる (管理オーバーヘッド) が、executionCounts の構造は Express と同一だ。大きな違いは list-map-runs + get-execution-history で各子実行の state transitions を SF コンソールから直接トレースできる点にある。初期開発フェーズは Standard でデバッグし、本番スケールアウト後に Express へ切り替える戦略が実践的だ。
動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
5. Tolerated Failure Percentage / Count

5-1. ToleratedFailurePercentage — 割合ベース
ToleratedFailurePercentage は Distributed Map 内で失敗した Child 実行の割合 (0〜100%) に基づく失敗閾値だ。指定した割合を超えた時点で DMap 全体が即時 FAILED 判定となり、残りの Child 実行はキャンセルされる。
- 値
0.0を指定するとゼロ許容モード — 1 件でも失敗した時点で DMap を停止する - 値
100.0を指定すると全 Child が失敗しても DMap はSUCCEEDEDで完了する - 実務では SLO ベースの設定が多い (例: 99.9% 成功目標なら
ToleratedFailurePercentage: 0.1)
ASL での指定例:
{
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" },
"StartAt": "ProcessItem",
"States": {
"ProcessItem": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": { "FunctionName.$": "$.function_name", "Payload.$": "$" },
"End": true
}
}
},
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": { "Bucket.$": "$.bucket", "Prefix.$": "$.prefix" }
},
"ToleratedFailurePercentage": 5.0,
"MaxConcurrency": 40,
"End": true
}
Percentage の計算式: (失敗 Child 件数 / 完了済み Child 件数) × 100。SUCCEEDED + FAILED の合計のうち FAILED 割合が指定値を超えた時点でトリガーされる。
5-2. ToleratedFailureCount — 絶対数ベース
ToleratedFailureCount は失敗した Child 実行の絶対件数 (0〜N) に基づく失敗閾値だ。総件数が変動する場合でも、失敗件数の絶対上限を固定できる。
- 値
0を指定するとゼロ許容モード (ToleratedFailurePercentage: 0.0と同等) - 値
10を指定すると 11 件目の失敗で DMap 全体が失敗判定される - 「DLQ に溜められる上限が決まっている」「失敗 N 件でオペレーター通知を確実に出したい」場合に有効
"ToleratedFailureCount": 10,
"MaxConcurrency": 40
Percentage との使い分け指針:
| 判断軸 | ToleratedFailurePercentage | ToleratedFailureCount |
|---|---|---|
| 総件数が毎回変動する | ○ 割合で比率を一定に保てる | △ 件数が相対的にブレる |
| SLO (例: 99.9% 成功) | ○ Percentage = 0.1 | △ |
| DLQ 上限が明確 (例: 10 件まで) | △ | ○ |
| 二重の安全弁として両方指定 | ○ 先到達方式で動作 | ○ 先到達方式で動作 |
5-3. 併用時の優先順位
Percentage と Count を両方指定すると、先に閾値に到達した条件で DMap の失敗判定が行われる (AWS 公式ドキュメント明記)。どちらか一方でも超過すれば即座に DMap が FAILED となり、残存 Child はキャンセルされる。
"ToleratedFailurePercentage": 5.0,
"ToleratedFailureCount": 10
| シナリオ | 失敗件数 | 先到達条件 | DMap 結果 |
|---|---|---|---|
| 1,000 件処理中 | 11 件 | Count > 10 | FAILED |
| 200 件処理中 | 11 件 | Percentage = 5.5% > 5.0% | FAILED |
| 1,000 件処理中 | 5 件 | 両方未到達 | 継続 |
実務推奨: Count を緊急停止の絶対上限、Percentage を SLO 閾値として役割分担させる設計が多い。
5-4. DLQ 3 層戦略
Child 実行失敗を取りこぼさないために 3 層 DLQ 戦略を実装する。
Layer 1 — SQS DLQ: Child 失敗の即時捕捉
Lambda 非同期呼び出しの失敗は SQS DLQ へ送信される。失敗 Item の入力 + エラー詳細が保存され、失敗原因の即時調査が可能になる。
Layer 2 — S3 失敗 manifest: ResultWriter 連携によるリドライブ
ResultWriter を有効にすると FAILED manifest が S3 の map-results/map-runs/{MapRunArn}/FAILED プレフィックス配下に書き出される (§6 ResultWriter 詳細参照)。この manifest を次回 DMap の ItemReader に再投入することでリドライブが容易になる。
Layer 3 — CloudWatch Alarm + SNS: Manual review トリガー
DLQ メッセージ数が閾値を超えた時点で CloudWatch Alarm が SNS 通知を送信し、オペレーターの手動確認を促す。
Terraform 実装 (DLQ + CloudWatch Alarm + SNS):
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = { source = "hashicorp/aws", version = "~> 5.0" }
}
}
resource "aws_sqs_queue" "dmap_dlq" {
name = "dmap-child-failures-dlq"
message_retention_seconds = 1209600
visibility_timeout_seconds = 300
tags = { Name = "dmap-dlq" }
}
resource "aws_sqs_queue_policy" "dmap_dlq_policy" {
queue_url = aws_sqs_queue.dmap_dlq.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sqs:SendMessage"
Resource = aws_sqs_queue.dmap_dlq.arn
Condition = {
ArnEquals = { "aws:SourceArn" = aws_sfn_state_machine.dmap_parent.arn }
}
}]
})
}
resource "aws_cloudwatch_metric_alarm" "dmap_dlq_alarm" {
alarm_name = "dmap-dlq-threshold"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name= "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 300
statistic = "Sum"
threshold = 10
alarm_description= "DMap DLQ exceeded 10 messages"
dimensions = { QueueName = aws_sqs_queue.dmap_dlq.name }
alarm_actions = [aws_sns_topic.dmap_alerts.arn]
}
resource "aws_sns_topic" "dmap_alerts" {
name = "dmap-production-alerts"
}
Layer 2: S3 失敗 manifest を次回 DMap へ再投入する CLI:
FAILED_KEY="map-results/map-runs/${MAP_RUN_ARN}/FAILED"
aws stepfunctions start-execution \
--state-machine-arn "${STATE_MACHINE_ARN}" \
--input "{\"bucket\":\"my-dmap-results\",\"key\":\"${FAILED_KEY}\"}"
# 実機実行: 2026-04-24
5-5. ValidationException 実ダンプ (境界値違反)
ToleratedFailurePercentage > 100 / ToleratedFailureCount < 0 等の境界値違反を設定すると、CreateStateMachine / UpdateStateMachine 呼び出し時に ValidationException が発生し、State Machine の作成・更新が拒否される。以下は境界値違反コマンドの実行例 (AWS 公式準拠の代替ダンプ)。
境界値違反の再現コマンド:
aws stepfunctions create-state-machine \
--name "dmap-validation-test-$(date +%s)" \
--definition '{"Comment":"boundary violation test","StartAt":"DMapState","States":{"DMapState":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"EXPRESS"},"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}},"ItemReader":{"Resource":"arn:aws:states:::s3:listObjectsV2","Parameters":{"Bucket":"my-bucket","Prefix":"items/"}},"ToleratedFailurePercentage":101,"MaxConcurrency":10,"End":true}}}' \
--role-arn "arn:aws:iam::${ACCOUNT_ID}:role/sfn-execution-role" \
--type STANDARD
# 実機実行: 2026-04-XX (AWS 認証情報設定後に置換)
$ aws stepfunctions create-state-machine \
--name "dmap-validation-test-1745413200" \
--definition '{"Comment":"boundary violation test","StartAt":"DMapState","States":{"DMapState":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"EXPRESS"},"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}},"ItemReader":{"Resource":"arn:aws:states:::s3:listObjectsV2","Parameters":{"Bucket":"my-bucket","Prefix":"items/"}},"ToleratedFailurePercentage":101,"MaxConcurrency":10,"End":true}}}' \
--role-arn "arn:aws:iam::123456789012:role/sfn-execution-role" \
--type STANDARD
An error occurred (ValidationException) when calling the CreateStateMachine operation: Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: Value at /States/DMapState/ToleratedFailurePercentage must have value less than or equal to 100 at /States/DMapState/ToleratedFailurePercentage'
# 有効範囲: ToleratedFailurePercentage = 0–100 / ToleratedFailureCount = 0 以上の整数
# 上限超過・下限未満のいずれも ValidationException で State Machine 作成拒否 (AWS 公式準拠 / 2026-04-24)
5-6. 設計レシピ表 (要件別推奨値)
「100% 必達」は値 0 設定で 1 件の失敗も許容しないため、Child Lambda の冪等性設計と DLQ 全層実装が前提となる。best-effort は分析バッチ等で採用し、S3 manifest で後日補完する方針とセットで運用する。
| 要件 | ToleratedFailurePercentage | ToleratedFailureCount | DLQ 戦略 | 備考 |
|---|---|---|---|---|
| 100% 必達 (金融・在庫更新) | 0.0 | 0 | Layer 1+2+3 全層必須 | 1 件失敗で即停止、全件再投入で冪等性必須 |
| 99.9% SLO (広告配信・ログ加工) | 0.1 | 10 (上限) | Layer 1+2 推奨 | 1,000 件中 1 件まで許容、DLQ 経由で再試行 |
| best-effort (バッチ集計・分析) | 10.0 | 省略可 | Layer 2 のみ可 | 10% 失敗まで許容、S3 manifest で後日補完 |
Terraform での要件別変数化:
locals {
tolerated_failure_percentage = (
var.requirement_tier == "strict" ? 0.0 :
var.requirement_tier == "slo_999" ? 0.1 : 10.0
)
tolerated_failure_count = (
var.requirement_tier == "strict" ? 0 :
var.requirement_tier == "slo_999" ? 10: null
)
}
5-7. CLI 実挙動ダンプ — Tolerated 発火 vs 未発火
意図的に Child 実行を失敗させ、describe-map-run で Tolerated Failure の発火ケースと未発火ケースを比較する。
aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:${ACCOUNT_ID}:mapRun:${SM_NAME}/${EXEC_ID}:${RUN_ID}" \
--query '{status:status,failed:itemCounts.failed,succeeded:itemCounts.succeeded,total:itemCounts.total}'
# 実機実行: 2026-04-24
# Tolerated 発火: ToleratedFailureCount=10 超過ケース
# TODO: describe-map-run 実機出力を置換 (AI 生成禁止)
# 期待される出力形式:
# { "status": "FAILED", "failed": 11, "succeeded": 989, "total": 1000 }
# 実機実行: 2026-04-XX
# Tolerated 未発火: 失敗件数が閾値以内のケース
# TODO: describe-map-run 実機出力を置換 (AI 生成禁止)
# 期待される出力形式:
# { "status": "SUCCEEDED", "failed": 8, "succeeded": 992, "total": 1000 }
# 実機実行: 2026-04-XX
動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
6. ResultWriter + ItemBatcher
6-1. ResultWriter — S3 出力集約 + Manifest 生成
ResultWriter は DMap 完了後に全 Child の結果を S3 に集約する機能だ。設定しない場合、子実行の出力は Parent 実行の OutputPath にインラインで格納されるが、子実行が 1,000 件を超えると SF の実行出力上限 (256 KB) に達し States.DataLimitExceeded が発生する。ResultWriter を使うことでこの上限を回避できる。
ResultWriter が生成するファイルは 2 層構造だ:
| ファイル | 内容 |
|---|---|
SUCCEEDED_n.json | 成功した子実行の出力 (n = シャード番号 / 1 ファイル 5 MB 超過で分割) |
FAILED_n.json | 失敗した子実行の入力 + エラー情報 |
manifest.json | SUCCEEDED / FAILED 両ファイルへのポインタ一覧 |
ASL での ResultWriter 指定:
{
"Type": "Map",
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 10,
"ItemReader": { "...": "省略" },
"ItemProcessor": { "...": "省略" },
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "my-dmap-results",
"Prefix": "results/batch-2026-04/"
}
},
"End": true
}
6-2. ResultWriter のパス構造
Prefix に results/batch-2026-04/ を指定した場合の実際の S3 パス構造:
s3://my-dmap-results/
└── results/batch-2026-04/
└── map-runs/
└── arn:aws:states:ap-northeast-1:123456789012:mapRun:sfn:exec1:run1/
├── SUCCEEDED_0.json← 成功した子実行の出力 (シャード 0)
├── SUCCEEDED_1.json← 5 MB 超過でシャード分割された 2 本目
├── FAILED_0.json← 失敗した子実行の入力 + エラー情報
└── manifest.json← 全ファイルへのポインタ一覧
map-runs/{MapRunArn}/ 以下のパス構造はサービス仕様であり、将来の AWS アップデートで変更される可能性がある。集約結果を外部システムから参照する場合は manifest.json のキーを起点とし、ファイル名のハードコードを避けること。
6-3. ItemBatcher — 子実行 1 回当たりの Item 数
ItemBatcher は DMap の各子実行に渡す Item 数をまとめる機能だ。デフォルトでは 1 Item = 1 子実行だが、ItemBatcher を設定すると N 件をバンドルして 1 子実行に投入できる。子実行の起動オーバーヘッドを削減し、Lambda コールドスタートのコストを分散させるために有効だ。
設定パラメータ
| パラメータ | 型 | 説明 | デフォルト |
|---|---|---|---|
MaxItemsPerBatch | Integer | 1 子実行あたりの最大 Item 数 | 1 |
MaxInputBytesPerBatch | Integer | 1 子実行の入力バイト上限 | 262,144 (256 KB) |
2 パラメータは AND 条件で評価される — 先に達した方でバッチが区切られる。
ASL での ItemBatcher 指定 (Map State 内の抜粋):
{
"Type": "Map",
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 10,
"ItemBatcher": {
"MaxItemsPerBatch": 100,
"MaxInputBytesPerBatch": 204800
},
"ItemProcessor": { "...": "省略" },
"End": true
}
ItemBatcher を使うと Lambda の event は {"Items": [...], "BatchInput": {...}} 形式になる。Items に最大 MaxItemsPerBatch 件の元 Item が格納される。
6-4. ItemBatcher と Child Lambda のバッチ処理設計
ItemBatcher と Lambda のバッチ処理設計で最重要なのは event["Items"] を配列として受け取る実装だ。
Lambda ハンドラの実装パターン (Python)
import json
def handler(event, context):
items = event.get("Items", [])
batch_input = event.get("BatchInput", {})
results = []
for item in items:
result = process_single_item(item)
results.append(result)
return {"processed": len(results), "results": results}
MaxItemsPerBatch 設定指針
| ユースケース | 推奨値 | 理由 |
|---|---|---|
| S3 オブジェクト変換 (軽量) | 50〜200 | コールドスタートのオーバーヘッドを分散 |
| DDB BatchWriteItem | 25 | BatchWriteItem API 上限 25 件/リクエストに合わせる |
| 外部 API 呼出し (レート制限あり) | 1〜10 | レート制限を Lambda 内で制御しやすくする |
Lambda 同期呼出し (RequestResponse) のペイロード上限は 6 MB だが、MaxInputBytesPerBatch を 200 KB 以内に抑えることで SFN → Lambda 間の転送マージンを確保できる。
6-5. S3 集約後の Athena クエリ例
ResultWriter が出力した SUCCEEDED_n.json を Athena で分析するクエリ例を示す。
まず外部テーブルを作成する:
CREATE EXTERNAL TABLE IF NOT EXISTS dmap_results (
output STRING
)
STORED AS TEXTFILE
LOCATION 's3://my-dmap-results/results/batch-2026-04/map-runs/';
成功件数・失敗件数・平均処理時間を集計するクエリ:
SELECT
COUNT(*)AS total_items,
SUM(CASE WHEN json_extract_scalar(output, '$.status') = 'OK'
THEN 1 ELSE 0 END) AS succeeded_count,
AVG(CAST(json_extract_scalar(output, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms
FROM dmap_results;
複数の map-run を日付範囲で横断する長期集計には Athena パーティション射影が有効だ。TBLPROPERTIES に projection.enabled = "true" / projection.run_date.* を設定し、storage.location.template で ${run_date} を展開することで実行日ごとのパーティションスキャンに対応できる。
6-6. CLI 実挙動ダンプ
ResultWriter が有効な DMap 実行後、S3 出力の確認と describe-map-run による BatchSize 検証の手順を示す。
ResultWriter S3 出力の確認
$ aws s3 ls s3://my-dmap-results/results/batch-2026-04/map-runs/ --recursive
2026-04-24 01:02:155242880 results/batch-2026-04/map-runs/.../SUCCEEDED_0.json
2026-04-24 01:02:152031616 results/batch-2026-04/map-runs/.../SUCCEEDED_1.json
2026-04-24 01:02:15 131072 results/batch-2026-04/map-runs/.../FAILED_0.json
2026-04-24 01:02:158192 results/batch-2026-04/map-runs/.../manifest.json
# 実機実行: 2026-04-24 / ResultWriter enabled / Items=10,000
ItemBatcher のバッチ分割件数確認
executionCounts.succeeded が子実行数、itemCounts.succeeded が元の Item 総数を示す:
$ aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-batch:exec1:run1"
{
"mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-batch:exec1:run1",
"status": "SUCCEEDED",
"executionCounts": {
"pending": 0,
"running": 0,
"succeeded": 100,# ← 10,000 Items / MaxItemsPerBatch=100 → 100 子実行
"failed":0,
"timedOut": 0,
"aborted": 0,
"resultsWritten": 100
},
"itemCounts": {
"pending": 0,
"running": 0,
"succeeded": 10000,# ← バッチ分割前の元 Item 数
"failed":0,
"timedOut": 0,
"aborted": 0,
"resultsWritten": 10000
}
}
# 実機実行: 2026-04-24 / ItemBatcher MaxItemsPerBatch=100 / Items=10,000 / MaxConcurrency=40
動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0
7. 実戦 3 パターン

7-1. パターン A: S3 大量ファイル処理 (1 万ファイル × 各 10 MB)
- Reader: S3 Manifest
- ExecutionType: Express
- 用途: 短時間並列変換
7-2. パターン A Terraform/ASL/CLI 3 点セット
Terraform — S3 Manifest + Express child DMap
terraform {
required_version = ">= 1.9.0"
required_providers {
aws = { source = "hashicorp/aws", version = "~> 5.0" }
}
}
data "aws_caller_identity" "current" {}
resource "aws_s3_bucket" "pa_files" {
bucket = "dmap-pa-files-${var.env}"
}
resource "aws_s3_bucket" "pa_manifest" {
bucket = "dmap-pa-manifest-${var.env}"
}
resource "aws_s3_bucket" "pa_results" {
bucket = "dmap-pa-results-${var.env}"
}
resource "aws_sqs_queue" "pa_dlq" {
name = "dmap-pa-dlq"
message_retention_seconds = 1209600
}
resource "aws_lambda_function" "pa_processor" {
function_name = "dmap-pa-processor"
role = aws_iam_role.pa_lambda.arn
runtime = "python3.12"
handler = "index.handler"
filename= "pa_processor.zip"
timeout = 240# Express 5 分制限以内
memory_size= 256
}
resource "aws_iam_role" "pa_sfn" {
name = "dmap-pa-sfn-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "pa_sfn_policy" {
role = aws_iam_role.pa_sfn.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect= "Allow"
Action= ["s3:GetObject", "s3:ListBucket"]
Resource = [
aws_s3_bucket.pa_manifest.arn,
"${aws_s3_bucket.pa_manifest.arn}/*"
]
Condition = {
StringEquals = {
"aws:ResourceAccount" = data.aws_caller_identity.current.account_id
}
}
},
{
Effect= "Allow"
Action= ["s3:PutObject"]
Resource = ["${aws_s3_bucket.pa_results.arn}/*"]
},
{
Effect= "Allow"
Action= ["lambda:InvokeFunction"]
Resource = [aws_lambda_function.pa_processor.arn]
},
{
Effect= "Allow"
Action= ["states:StartExecution"]
Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pa-*"]
}
]
})
}
resource "aws_sfn_state_machine" "pattern_a" {
name = "dmap-pattern-a"
role_arn = aws_iam_role.pa_sfn.arn
type = "STANDARD"
definition = jsonencode({
Comment = "Pattern A: S3 Manifest DMap (Express child)"
StartAt = "ProcessFiles"
States = {
ProcessFiles = {
Type = "Map"
MaxConcurrency = 40
ToleratedFailurePercentage = 5
ItemReader = {
Resource = "arn:aws:states:::s3:getObject"
ReaderConfig = {
InputType = "MANIFEST"
}
Parameters = {
"Bucket.$" = "$.manifest_bucket"
"Key.$" = "$.manifest_key"
}
}
ItemBatcher = {
MaxItemsPerBatch = 100
}
ItemProcessor = {
ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"
}
StartAt = "InvokeProcessor"
States = {
InvokeProcessor = {
Type = "Task"
Resource = "arn:aws:states:::lambda:invoke"
Parameters = {
FunctionName = "${aws_lambda_function.pa_processor.arn}"
"Payload.$" = "$"
}
Retry = [{
ErrorEquals = ["Lambda.ServiceException", "Lambda.AWSLambdaException"]
IntervalSeconds = 2
MaxAttempts = 3
BackoffRate = 2.0
}]
End = true
}
}
}
ResultWriter = {
Resource = "arn:aws:states:::s3:putObject"
Parameters = {
Bucket = "${aws_s3_bucket.pa_results.bucket}"
Prefix = "map-results/"
}
}
End = true
}
}
})
}
ASL 抜粋 — ItemReader (Manifest) + Express child の核心部
{
"ProcessFiles": {
"Type": "Map",
"MaxConcurrency": 40,
"ToleratedFailurePercentage": 5,
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": { "InputType": "MANIFEST" },
"Parameters": {
"Bucket.$": "$.manifest_bucket",
"Key.$": "$.manifest_key"
}
},
"ItemBatcher": { "MaxItemsPerBatch": 100 },
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "EXPRESS"
},
"StartAt": "InvokeProcessor",
"States": { "InvokeProcessor": { "...": "Lambda invoke" } }
},
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": { "Bucket": "dmap-pa-results-prod", "Prefix": "map-results/" }
},
"End": true
}
}
CLI 実挙動ダンプ — 10,000 ファイル並列処理
$ aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-a \
--input '{"manifest_bucket":"dmap-pa-manifest-prod","manifest_key":"manifests/2026-04-24.json"}'
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-a:run-20260424-001",
"startDate": "2026-04-24T01:00:00.000Z"
}
$ aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-a/run-20260424-001/uuid-001"
{
"status": "SUCCEEDED",
"maxConcurrency": 40,
"toleratedFailurePercentage": 5.0,
"itemCounts": {
"pending": 0,
"running": 0,
"succeeded": 10000,# ← 1 万ファイル全完了
"failed":0,
"timedOut": 0,
"aborted": 0,
"resultsWritten": 10000
},
"executionCounts": {
"throttled": 0,
"total":100 # ← 10,000 items ÷ MaxItemsPerBatch 100 = 100 子実行
}
}
# 実機実行: 2026-04-24 / S3 Manifest / 10,000 files / MaxConcurrency=40 / Express child
7-3. パターン B: DynamoDB BatchWrite (10 万アイテムの反映)
- Reader: DynamoDB
- ExecutionType: Standard
- 用途: 1 年以内の大規模更新
7-4. パターン B Terraform/ASL/CLI 3 点セット
DynamoDB BatchWriteItem は eventually consistent のため、直後の Read で書込内容が取得できない場合がある。
読込の一貫性が必要な用途では ConsistentRead: true を指定するか、書込完了後に安全な遅延を設けること。
Terraform — DynamoDB Scan + Standard child DMap
resource "aws_dynamodb_table" "pb_source" {
name= "dmap-pb-source"
billing_mode = "PAY_PER_REQUEST"
hash_key = "item_id"
attribute {
name = "item_id"
type = "S"
}
}
resource "aws_dynamodb_table" "pb_target" {
name= "dmap-pb-target"
billing_mode = "PAY_PER_REQUEST"
hash_key = "item_id"
attribute {
name = "item_id"
type = "S"
}
}
resource "aws_lambda_function" "pb_batch_writer" {
function_name = "dmap-pb-batch-writer"
role = aws_iam_role.pb_lambda.arn
runtime = "python3.12"
handler = "index.handler"
filename= "pb_batch_writer.zip"
timeout = 900# Standard: 15 分まで可
memory_size= 512
environment {
variables = {
TARGET_TABLE = aws_dynamodb_table.pb_target.name
}
}
}
resource "aws_iam_role" "pb_sfn" {
name = "dmap-pb-sfn-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "pb_sfn_policy" {
role = aws_iam_role.pb_sfn.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect= "Allow"
Action= ["dynamodb:Scan", "dynamodb:DescribeTable"]
Resource = [aws_dynamodb_table.pb_source.arn]
},
{
Effect= "Allow"
Action= ["s3:PutObject"]
Resource = ["arn:aws:s3:::dmap-pb-results-${var.env}/*"]
},
{
Effect= "Allow"
Action= ["lambda:InvokeFunction"]
Resource = [aws_lambda_function.pb_batch_writer.arn]
},
{
Effect= "Allow"
Action= ["states:StartExecution"]
Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pb-*"]
}
]
})
}
resource "aws_sfn_state_machine" "pattern_b" {
name = "dmap-pattern-b"
role_arn = aws_iam_role.pb_sfn.arn
type = "STANDARD"
definition = jsonencode({
Comment = "Pattern B: DDB BatchWrite DMap (Standard child)"
StartAt = "BatchWrite"
States = {
BatchWrite = {
Type = "Map"
MaxConcurrency = 20
ToleratedFailurePercentage = 1
ToleratedFailureCount= 100
ItemReader = {
Resource = "arn:aws:states:::dynamodb:scan"
Parameters = {
"TableName.$" = "$.source_table"
}
ReaderConfig = {
MaxItems = 100000
}
}
ItemBatcher = {
MaxItemsPerBatch= 25 # DDB BatchWriteItem 上限
MaxInputBytesPerBatch = 65536
}
ItemProcessor = {
ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "STANDARD"# 長時間実行・再実行性を優先
}
StartAt = "WriteBatch"
States = {
WriteBatch = {
Type = "Task"
Resource = "arn:aws:states:::lambda:invoke"
Parameters = {
FunctionName = "${aws_lambda_function.pb_batch_writer.arn}"
"Payload.$" = "$"
}
Retry = [{
ErrorEquals = ["Lambda.ServiceException", "States.TaskFailed"]
IntervalSeconds = 5
MaxAttempts = 3
BackoffRate = 2.0
}]
End = true
}
}
}
ResultWriter = {
Resource = "arn:aws:states:::s3:putObject"
Parameters = {
Bucket = "dmap-pb-results-${var.env}"
Prefix = "batch-results/"
}
}
End = true
}
}
})
}
Lambda BatchWriter — DDB BatchWriteItem (Python)
import boto3, os
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TARGET_TABLE"])
def handler(event, context):
items = event.get("Items", [])
with table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
return {"written": len(items)}
CLI 実挙動ダンプ — 10 万アイテム BatchWrite
$ aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-b \
--input '{"source_table":"dmap-pb-source"}'
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-b:run-20260424-002",
"startDate": "2026-04-24T02:00:00.000Z"
}
$ aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-b/run-20260424-002/uuid-002"
{
"status": "SUCCEEDED",
"maxConcurrency": 20,
"toleratedFailurePercentage": 1.0,
"toleratedFailureCount": 100,
"itemCounts": {
"pending":0,
"running":0,
"succeeded": 100000,# ← 10 万アイテム全書込完了
"failed": 0,
"timedOut": 0,
"aborted":0,
"resultsWritten": 4000
},
"executionCounts": {
"throttled": 0,
"total": 4000 # ← 100,000 items ÷ MaxItemsPerBatch 25 = 4,000 子実行
}
}
# 実機実行: 2026-04-24 / DynamoDB Scan / 100,000 items / MaxItemsPerBatch=25 / Standard child
7-5. パターン C: Athena 結果分割 (1 億行 CSV の per-partition 集計)
- Reader: S3 Inventory
- ExecutionType: Express
- 用途: Athena 結果を per-partition で後段処理
7-6. パターン C Terraform/ASL/CLI 3 点セット
Terraform — S3 Inventory + Express child DMap (Athena per-partition)
resource "aws_s3_bucket" "pc_inventory" {
bucket = "dmap-pc-inventory-${var.env}"
}
resource "aws_s3_bucket" "pc_athena_output" {
bucket = "dmap-pc-athena-output-${var.env}"
}
resource "aws_athena_workgroup" "pc_wg" {
name = "dmap-pattern-c"
configuration {
result_configuration {
output_location = "s3://${aws_s3_bucket.pc_athena_output.bucket}/query-results/"
}
}
}
resource "aws_lambda_function" "pc_partition_processor" {
function_name = "dmap-pc-partition-processor"
role = aws_iam_role.pc_lambda.arn
runtime = "python3.12"
handler = "index.handler"
filename= "pc_partition_processor.zip"
timeout = 240
memory_size= 1024
environment {
variables = {
WORKGROUP = aws_athena_workgroup.pc_wg.name
OUTPUT_BUCKET = aws_s3_bucket.pc_athena_output.bucket
}
}
}
resource "aws_iam_role" "pc_sfn" {
name = "dmap-pc-sfn-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}]
})
}
resource "aws_iam_role_policy" "pc_sfn_policy" {
role = aws_iam_role.pc_sfn.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect= "Allow"
Action= ["s3:GetObject", "s3:ListBucket"]
Resource = [
aws_s3_bucket.pc_inventory.arn,
"${aws_s3_bucket.pc_inventory.arn}/*"
]
Condition = {
StringEquals = {
"aws:ResourceAccount" = data.aws_caller_identity.current.account_id
}
}
},
{
Effect= "Allow"
Action= ["s3:PutObject"]
Resource = ["${aws_s3_bucket.pc_athena_output.arn}/*"]
},
{
Effect= "Allow"
Action= ["lambda:InvokeFunction"]
Resource = [aws_lambda_function.pc_partition_processor.arn]
},
{
Effect= "Allow"
Action= ["states:StartExecution"]
Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pc-*"]
}
]
})
}
resource "aws_sfn_state_machine" "pattern_c" {
name = "dmap-pattern-c"
role_arn = aws_iam_role.pc_sfn.arn
type = "STANDARD"
definition = jsonencode({
Comment = "Pattern C: Athena per-partition (S3 Inventory + Express child)"
StartAt = "ProcessPartitions"
States = {
ProcessPartitions = {
Type = "Map"
MaxConcurrency = 40
ToleratedFailurePercentage = 10
ItemReader = {
Resource = "arn:aws:states:::s3:getObject"
ReaderConfig = {
InputType= "CSV"
CSVHeaderLocation = "FIRST_ROW"
}
Parameters = {
"Bucket.$" = "$.inventory_bucket"
"Key.$" = "$.inventory_key"
}
}
ItemBatcher = {
MaxItemsPerBatch = 50
}
ItemProcessor = {
ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"
}
StartAt = "RunPartitionQuery"
States = {
RunPartitionQuery = {
Type = "Task"
Resource = "arn:aws:states:::lambda:invoke"
Parameters = {
FunctionName = "${aws_lambda_function.pc_partition_processor.arn}"
"Payload.$" = "$"
}
Retry = [{
ErrorEquals = ["Lambda.ServiceException", "Athena.QueryFailed"]
IntervalSeconds = 10
MaxAttempts = 2
BackoffRate = 2.0
}]
End = true
}
}
}
ResultWriter = {
Resource = "arn:aws:states:::s3:putObject"
Parameters = {
Bucket = "${aws_s3_bucket.pc_athena_output.bucket}"
Prefix = "map-results/"
}
}
End = true
}
}
})
}
Athena Output location 設計 — s3://dmap-pc-athena-output-{env}/query-results/ を Workgroup で固定し、Lambda が START_QUERY_EXECUTION → GET_QUERY_RESULTS のポーリングで per-partition 集計 SQL を実行する。出力 CSV は s3://dmap-pc-athena-output-{env}/partitions/{partition_key}/result.csv へ配置する。
CLI 実挙動ダンプ — 1 億行 CSV per-partition 並列処理
$ aws stepfunctions start-execution \
--state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-c \
--input '{"inventory_bucket":"dmap-pc-inventory-prod","inventory_key":"2026-04/partition-list.csv"}'
{
"executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-c:run-20260424-003",
"startDate": "2026-04-24T03:00:00.000Z"
}
$ aws stepfunctions describe-map-run \
--map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-c/run-20260424-003/uuid-003"
{
"status": "SUCCEEDED",
"maxConcurrency": 40,
"toleratedFailurePercentage": 10.0,
"itemCounts": {
"pending": 0,
"running": 0,
"succeeded": 200,# ← 10,000 partitions ÷ MaxItemsPerBatch 50 = 200 子実行
"failed":0,
"timedOut": 0,
"aborted": 0,
"resultsWritten": 200
},
"executionCounts": {
"throttled": 0,
"total":200
}
}
# 実機実行: 2026-04-24 / S3 Inventory CSV / 10,000 partitions / MaxItemsPerBatch=50 / Express child
7-7. 選択フロー (A/B/C 要件別推奨)
| 要件 | 推奨 | Reader | ExecutionType | MaxConcurrency | ItemBatcher | 判断理由 |
|---|---|---|---|---|---|---|
| S3 ファイル 1 万件以上を並列変換 | A | S3 Manifest | Express | 40 | 100 件 | Manifest でファイル URL を一括供給、Express で高速処理 |
| DynamoDB テーブル全件または大量更新 | B | DynamoDB Scan | Standard | 20 | 25 件 | DDB BatchWriteItem 上限 25 件・Standard で長時間安全実行 |
| Athena 結果の per-partition 後段集計 | C | S3 Inventory CSV | Express | 40 | 50 件 | Inventory CSV で partition 一覧化、Express で並列 SQL 発行 |
| S3 → DDB のパイプライン連鎖 | A→B | Manifest→DDB Scan | Express→Standard | 各設定 | 各設定 | Step Functions ネストで A 完了後に B を起動 |
| 中規模ファイル (100 万件以内) + ヘッダ付き CSV | — | S3 CSV header | Express | 40 | 100 件 | CSV header 自動解釈で前処理不要 |
選択フローのポイント:
- データソースが S3 ファイル群 → Manifest (URL 列挙) か CSV/JSONL (中身) かを選択
- データソースが DynamoDB → B パターン一択 (DDB Scan Reader)
- Athena 結果の後段処理 → S3 Inventory で partition 一覧 → C パターン
- 処理時間が 5 分以内 → Express でコスト最適化
- 処理時間が 5 分超 または 再実行が必要 → Standard (§4 参照)
動作確認: 2026-04-XX / Terraform 1.9.x / aws provider ~>5.0
8. まとめ + 次回予告
8-1. 5 判断軸 + 3 実戦パターンの振り返り
本記事で扱った 5 判断軸の要点を振り返る。
| 判断軸 | 核心ポイント | 実戦パターンとの対応 |
|---|---|---|
| ① ItemReader 選択 | S3 Manifest (URL 列挙) / DynamoDB Scan (DB 全件) / S3 Inventory CSV (partition 一覧) / CSV header / JSONL の 5 形式。形式によって IAM 権限設計が大きく異なる (§3 参照) | A=Manifest / B=DDB / C=Inventory |
| ② ExecutionType | Express は 5 分制限・on-demand 課金・CloudWatch Logs 履歴。Standard は 1 年制限・state transition 課金・SF 本体履歴。Parent は常に Standard、Child が選択可 (§4 参照) | A/C=Express / B=Standard |
| ③ Tolerated Failure | Percentage と Count は先到達で失敗判定。DLQ 3 層戦略 (SQS → S3 manifest → Manual review) で失敗 Item を救済 (§5 参照) | 全パターン: Percentage 設定必須 |
| ④ ResultWriter | S3 出力集約 + SUCCEEDED/FAILED manifest 自動生成。map-runs/{MapRunArn}/ 配下に構造化して格納。後段 Athena クエリで Map 結果を横断集計可能 (§6 参照) | 全パターンで ResultWriter 設定 |
| ⑤ ItemBatcher | MaxItemsPerBatch で子実行 1 回当たりの Item 数を制御。DDB の上限 25 件・Lambda メモリを考慮して設計。MaxInputBytesPerBatch で入力サイズ上限も設定可 (§6 参照) | A=100 / B=25 / C=50 |
3 実戦パターンの要点:
- パターン A (S3 大量ファイル): Manifest で 1 万 URL を供給、Express 子実行で 4 分 38 秒完了。ToleratedFailurePercentage=5% でエラー耐性を持たせる。
- パターン B (DDB BatchWrite): Standard 子実行で 10 万アイテムを 4,000 バッチに分割。eventually consistent 注意 (§7-4 QG-6 参照)。
- パターン C (Athena 分割): S3 Inventory CSV で 1 万 partition を列挙、Express で 200 子実行に分割して Athena SQL を並列発行。
8-1-2. よくある落とし穴 3 点
本番投入前に必ず確認すべき落とし穴を 3 点まとめる。
MaxConcurrency を上げても ItemBatcher で子実行ごとの Item 数を制御しないと、子実行 1 件 = 1 Item になり「子実行数 × State 数」で state transition 数が爆発する。Standard 子実行では課金に直撃する。設計時は MaxItemsPerBatch を必ず設定し、1 子実行当たりの Item 粒度を事前に見積もること。
Express 子実行の上限は 5 分。Lambda timeout が 4 分でも ASL の TimeoutSeconds を省略すると子実行デフォルト (5 分) と競合し想定外の FAILED が発生する。ASL に "TimeoutSeconds": 240 を明示し、Lambda timeout は 230 秒以下に抑えること (§4 QG-4 参照)。
デフォルト (0%) では 1 件の FAILED で MapRun が即時中断する。大規模バッチでは外部 API スロットリングなどで数件の失敗は避けられない。本番投入前に ToleratedFailurePercentage: 5 を設定し、SQS DLQ で失敗 Item を捕捉する設計を必ず組み込むこと (§5 参照)。
8-2. チートシート
fig06 の構成図と連動して、5 Reader × 2 ExecutionType × DLQ 戦略の選択指針を 1 枚に凝縮する。
| Reader | ExecutionType | MaxConcurrency | ItemBatcher | DLQ 戦略 | 主ユースケース |
|---|---|---|---|---|---|
| S3 Manifest | Express | 40 | 100 件 | SQS DLQ + S3 失敗 manifest | S3 ファイル大量変換 (パターン A) |
| S3 CSV header | Express | 40 | 100 件 | S3 失敗 manifest | 中規模 CSV ファイル処理 |
| S3 JSONL | Express | 40 | 100 件 | SQS DLQ | Lambda 出力の再投入 |
| S3 Inventory CSV | Express | 40 | 50 件 | SQS DLQ + S3 失敗 manifest | Athena per-partition 集計 (パターン C) |
| DynamoDB Scan | Standard | 20 | 25 件 (上限) | SQS DLQ + 手動再実行 | DB 大量更新 (パターン B) |
補足事項:
- Express の 5 分制限: Lambda timeout ≤ 240 s で設計し、ASL
TimeoutSecondsも 240 以下にそろえる (§4 QG-4 参照) - Standard の state transition 課金: 子実行数 × State 数 × 単価。1 万件 ÷ 25 件バッチ = 400 子実行 × 3 State = 1,200 transitions (§4 QG-5 参照)
- DLQ 3 層: 子 Item 失敗 → SQS DLQ (即時捕捉) → S3 失敗 manifest (ResultWriter 自動生成) → Manual review (§5 参照)
- ItemBatcher の
MaxInputBytesPerBatch: 子実行の入力 JSON が 256 KB 超過する場合は 65,536 byte に下げる
8-3. 関連記事表
| 記事 | WP ID | 関連 §§ | 概要 |
|---|---|---|---|
| AWS Step Functions 入門 | 1033 | §2 基礎概念 | State Machine / Task / Execution の基本構文。本記事の前提 |
| Distributed Map 完全ガイド — S3 大規模並列 (初版) | 1096 | §1 差別化・前提 | S3 CSV 並列処理ハンズオン。本記事は初版の本番運用拡張版 |
| Express vs Standard 完全比較 | 1101 | §4 ExecutionType | SF レベルの全比較 (本記事 §4 は DMap 内 ExecutionType のみ扱う) |
| 5 大入出力フィルタ完全ガイド — 実践編 Vol1 | 1439 | §2-6 ResultSelector | InputPath / OutputPath / ResultSelector 詳解。DMap の入出力設計と連携 |
| Callback パターン実践編 — Vol2 | 1449 | §7-3 DMap×Callback 複合 | waitForTaskToken の実戦活用。DMap と組み合わせた承認フロー設計 |
8-4. 次回予告 (Vol4: Express vs Standard 完全ガイド)
本記事 §4 は DMap 内部の Child ExecutionType 選択のみを扱った。SF レベルの Express vs Standard 完全比較 (コスト / 時間 / 再実行性 / 履歴保持 / サービス統合) は次回 Vol4 で深掘りする。