NO IMAGE

Step Functions Distributed Map 本番運用 5 判断軸 × 3 実戦

NO IMAGE
目次

1. この記事について

fig01: SF 実践編シリーズ全体マップ (Vol3 ハイライト)

この記事を読むと得られること

  • ItemReader の 5 形式 (S3 Inventory / S3 CSV header / S3 JSONL / S3 Manifest / DynamoDB) の使い分けを即答でき、IAM 権限設計を自力で行える
  • ExecutionType Express vs Standard の選択軸 (ワークフロー時間・課金・再実行性) を根拠を持って選択できる
  • ToleratedFailurePercentage / ToleratedFailureCount の使い分けを即答し、DLQ 3 層戦略を設計できる
  • ResultWriter + ItemBatcher の組合せで S3 出力集約とバッチ粒度を最適化できる
  • 実戦 3 パターン (S3 大量ファイル / DynamoDB BatchWrite / Athena 結果分割) のどれが自分の要件に最適か判断し、Terraform で 30 分以内に PoC を立ち上げられる
本記事が対象としないこと

  • Step Functions 基本概念 (State Machine/State/Task/Execution) → SF 入門記事へ委譲
  • InputPath / Parameters / ResultPath / OutputPath の詳細 → SF 実践編 Vol1へ委譲
  • Callback パターン (.waitForTaskToken) の基礎・実戦 → SF 実践編 Vol2へ委譲
  • Distributed Map 入門 (Inline Map との違い・S3 CSV ハンズオン) → DMap 初版 (ID:1096)へ委譲
  • SF レベルの Express vs Standard 完全比較 → SF 第7弾 (ID:1101)へ委譲

1-1. 本記事のゴール

本記事を読了すると、以下の 5 つを自力で実践できる。

  1. ItemReader の 5 形式 (S3 Inventory / S3 CSV header / S3 JSONL / S3 Manifest / DynamoDB) の使い分けを即答 でき、それぞれの IAM 最小権限を Condition キー付きで設計できる
  2. ExecutionType Express vs Standard の選択 を「ワークフロー実行時間 / 課金モデル / 再実行性」の 3 軸で根拠を持って判断できる
  3. ToleratedFailurePercentageToleratedFailureCount の使い分けを即答 し、SQS DLQ / S3 失敗 manifest / 手動 review の DLQ 3 層戦略を設計できる
  4. ResultWriter + ItemBatcher の組合せ で、Child 実行の S3 出力を manifest に集約し、バッチ粒度を要件に合わせ最適化できる
  5. 実戦 3 パターン (A: S3 大量ファイル / B: DynamoDB BatchWrite / C: Athena 結果分割) から自要件に最適なパターンを選び、Terraform HCL を 30 分以内に PoC として立ち上げられる

下表は本記事が解消する「DMap 本番設計の詰まりポイント」と改善内容の価値ステートメントである。

局面改善前改善後 (本記事)
DMap の ItemReader 選択で迷走CSV 以外で書きたいが AWS doc が分散・IAM 権限が分からない§3 で 5 形式を同一軸で比較 + IAM 最小権限一覧表
Express vs Standard で詰まる課金 / 時間上限 / 再実行性の判断軸が曖昧、公式 doc を読み直し続ける§4 で選択フローチャート + 実測コスト比較表
Tolerated Failure の設計で本番事故100 件中 10 件失敗したら続行したいが Percentage 計算を誤る§5 で ValidationException の実ダンプ + DLQ 3 層戦略
ResultWriter で S3 出力が散乱Map の結果 JSON が Child 実行ごとにバラバラ、集約ツールを自作§6 で ResultWriter + ItemBatcher の完全実装
大量ファイル処理 PoC に 1 週間S3 → Lambda の fan-out 設計を Terraform で 1 から構築§7 実戦 3 パターンで Terraform コピペ → 30 分 PoC

1-2. 読者像

本記事は以下の読者像を想定している。

前提知識必要度
SF 入門 (ID:1033) 読了済 — State Machine / State / Task / Execution の基本構文を書ける✅ 必須
DMap 初版 (ID:1096) 読了済、または S3 CSV 並列処理の基礎を独学で習得済✅ 必須
現場で「1 万件以上の並列処理」「DynamoDB への大量書込」「Athena 結果の後段処理」を設計する機会あり✅ 想定
Terraform で AWS リソース (SF + Lambda + S3/DDB) を 3 点セットで自力構築できる✅ 推奨
Express vs Standard の課金モデルの大枠を把握△ なければ SF 第7弾 (ID:1101) を先読み推奨

SF 入門 (ID:1033) / DMap 初版 (ID:1096) が未読の場合は先読みを推奨する。本記事は「DMap の本番運用で設計判断に迷う実践者」向けに最適化されており、入門知識は省略している。

1-3. なぜ今 DMap 本番運用か (Vol1 → Vol2 → Vol3 の導線)

SF 実践編シリーズ (Vol1〜Vol3) は以下の階層で構成されている。

WP ID焦点本記事との接続
Vol11439InputPath / Parameters / ResultPath / OutputPath / ResultSelector の 5 大フィルタ§3 の ItemSelector / ResultSelector で Vol1 の変換構文を再利用
Vol21449waitForTaskToken Callback パターン + SQS / SNS / API GW 3 実戦§7 実戦パターン B/C での DMap×Callback 複合構成 (Vol2 §7 参照)
Vol3 (本記事)ItemReader 5 形式 / ExecutionType / Tolerated Failure / ResultWriter / ItemBatcher

Vol1 のデータ変換知識 → Vol2 の非同期連携パターン → Vol3 の大量並列 fan-out 設計 という流れで読むと、SF の全レイヤーを体系的に習得できる。本記事 §3〜§6 では Vol1 / Vol2 で登場した構文を前提として拡張する。

1-4. Inline Map / Parallel / EMR との使い分け

DMap を選ぶ前に「Inline Map / Parallel / EMR ではなぜ不十分か」を確認する。

方式並列度上限入力データ源課金モデル最適なユースケース
Inline Map最大 40JSON 配列 (256KB 上限)State transition数百件以下の小規模 fan-out
ParallelStates 数固定State transition独立した複数ブランチを並走させたいだけ
DMap (Express)40 個別 / 10,000 アカウントS3 / DDBState transition大量 item fan-out・5 分以内 child が前提
DMap (Standard)40 個別 / 10,000 アカウントS3 / DDBState transition大量 item fan-out + 再実行・監査ログ必須
EMR Serverlessクラスタ規模依存HDFS / S3 (数 TB 以上)EMR 処理時間課金Spark/Hive 大規模バッチ。SF state 管理不要
使い分けの第一原則

  • 入力が 1,000 件以上かつ S3/DDB 上にある → Distributed Map を選ぶ
  • 入力が 256KB 以内の JSON 配列 → Inline Map で十分
  • 独立した複数フローを並走させたいだけ → Parallel
  • 数 TB 以上・Spark/Hive が必要 → EMR Serverless を検討

1-5. 関連記事表

本記事を最大限活用するために推奨する関連記事を示す。特に DMap 初版 (ID:1096) が未読の場合は先に読了することを推奨する。
本記事は §1-6 で示す 6 点の差別化を起点に初版とは明確に異なる範囲を扱う。

記事WP ID役割
AWS Step Functions 入門1033SF の基礎概念 (State Machine / Task / Execution) の前提確認
Retry/Catch/Timeout 完全ガイド1057§5 Tolerated Failure と Retry/Catch の境界線
Distributed Map 完全ガイド 初版1096本記事の前提 — S3 CSV ハンズオンで DMap 基礎を固める
Express vs Standard 完全比較1101§4 DMap 内 ExecutionType と SF レベル実行モード比較の差別化
5 大入出力フィルタ完全ガイド (Vol1)1439§3 ItemSelector / ResultSelector の変換構文
Callback パターン実践編 (Vol2)1449§7 実戦パターンでの DMap×Callback 複合構成
(予告) Express vs Standard 実践編 Vol4§8-4 次回予告。DMap 内 ExecutionType を SF 全体に拡張

1-6. 初版 (ID:1096) との差別化 6 点 ★最重要

初版 (ID:1096) との差別化軸 — 本記事が「焼き直し」ではない 6 つの理由

  1. ItemReader 5 形式の徹底比較 — 初版は CSV のみ、本記事は S3 Inventory / CSV header / JSONL / Manifest / DynamoDB の 5 形式 × IAM 論点
  2. ExecutionType Express vs Standard (§4) を独立章で運用判断軸を提示 — 初版では触れず
  3. Tolerated Failure Percentage / Count (§5) の使い分け + DLQ 3 層戦略 — 初版は Percentage 紹介のみ
  4. ResultWriter + ItemBatcher (§6) — 初版未扱い、S3 出力集約とバッチ粒度の最適設計
  5. 実戦 3 パターン (§7) — A: S3 大量ファイル / B: DynamoDB BatchWrite / C: Athena 結果分割 (初版の S3 CSV 1 パターンから発展)
  6. IAM 最小権限 + Condition (§3-7) — 初版は Reader 別 IAM 論点を扱わず、本記事は Condition 付き Bucket 制限例まで完備

1-7. 本記事の読み進め方

本記事は各章が独立しており、実務ユースケースに応じて章を選んで読める構成になっている。推奨読み進め順を示す。

  • 「どの Reader を使うか迷っている」→ §3 (ItemReader 徹底比較) から読む
  • 「Express か Standard か判断したい」→ §4 (ExecutionType 選択) から読む
  • 「失敗耐性の設計方法を知りたい」→ §5 (Tolerated Failure) を参照
  • 「すぐに動くコードが欲しい」→ §7 (実戦 3 パターン) の該当パターンに飛ぶ
  • 「全体を通して学びたい」→ §1-8 の順に読み進める

2. Distributed Map アーキテクチャ全体像

2-1. 前提権限

Distributed Map の本番稼働には、SF 実行ロールに 3 系統の権限が最低限必要になる。

権限系統代表的な Action条件
SF 子実行起動states:StartExecutionParent が Child DMap を起動する際に必須
S3 読込 (ItemReader)s3:GetObject, s3:ListBucketReader 形式に応じて最小化 (§3-7 詳述)
DDB 読込 (DynamoDB Reader)dynamodb:Scan, dynamodb:DescribeTableDDB Reader 使用時のみ追加
S3 書込 (ResultWriter)s3:PutObjectResultWriter 有効化時
Lambda 呼出 (Child processor)lambda:InvokeFunctionChild の ItemProcessor が Lambda を呼ぶ場合

以下は本記事全体の Terraform ベースとなる最小 IAM ロール骨格。§3〜§7 で Reader 別・パターン別に拡張する。

resource "aws_iam_role" "sfn_dmap" {
  name = "sfn-dmap-execution-role"
  assume_role_policy = jsonencode({
 Version= "2012-10-17",
 Statement = [{
Effect = "Allow",
Principal = { Service = "states.amazonaws.com" },
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "sfn_dmap_base" {
  name= "sfn-dmap-base-policy"
  role= aws_iam_role.sfn_dmap.id
  policy = jsonencode({
 Version = "2012-10-17",
 Statement = [
{
  Effect= "Allow",
  Action= ["states:StartExecution"],
  Resource = ["arn:aws:states:*:*:stateMachine:*"]
},
{
  Effect= "Allow",
  Action= ["s3:GetObject", "s3:ListBucket"],
  Resource = [
 "arn:aws:s3:::${var.input_bucket}",
 "arn:aws:s3:::${var.input_bucket}/*"
  ]
},
{
  Effect= "Allow",
  Action= ["s3:PutObject"],
  Resource = ["arn:aws:s3:::${var.output_bucket}/*"]
},
{
  Effect= "Allow",
  Action= ["lambda:InvokeFunction"],
  Resource = [var.processor_lambda_arn]
}
 ]
  })
}

2-2. 使用技術スタック

本記事で扱う技術スタックを次表に示す。

技術バージョン / 仕様役割
Terraform>= 1.9.0IaC ツール。SF State Machine / IAM / S3 / Lambda を一括定義
hashicorp/aws provider~> 5.0aws_sfn_state_machine で DMap 対応 ASL を記述
Step Functions ASL1.0 (DMap 対応版)"Type": "Map" + "ItemReader" + "ItemBatcher" + "ResultWriter"
AWS CLI>= 2.15.0describe-map-run / list-map-runs (DMap 専用 API) の実行
Python3.12CLI 出力パース・DMap manifest 検証スクリプト

aws_sfn_state_machinedefinition フィールドに ASL を jsonencode() または HEREDOC で埋込む形式を全パターンで統一する。

2-3. ゴール状態の定義

本記事を通じて次の 3 実戦パターンが Terraform で稼働できる状態をゴールとする。

パターンシナリオ稼働確認コマンド
A: S3 大量ファイル処理S3 の 1 万ファイル → Lambda で並列変換 → ResultWriter で manifest 集約aws stepfunctions describe-map-run --map-run-arn <ARN>
B: DynamoDB BatchWriteDDB テーブル 10 万アイテムを Scan → Lambda で BatchWrite → DLQ 監視aws stepfunctions list-map-runs --execution-arn <ARN>
C: Athena 結果分割Athena クエリ結果 S3 CSV (1 億行) → ItemBatcher でバッチ化 → 並列後段処理aws stepfunctions get-execution-history --execution-arn <ARN>

いずれも ToleratedFailurePercentage / ResultWriter を有効化した本番想定の ASL + Terraform HCL + CLI ダンプの 3 点セットで記述する。

2-4. Parent / Child 実行モデル図解

fig02: Distributed Map Parent/Child 実行モデル全体図

fig02 は Distributed Map の Parent/Child 実行モデル全体を示す。Parent 実行が ItemReader からアイテムを読み込み、MaxConcurrency 個の Child 実行を同時発行する。Child は ItemProcessor 内で定義した State Machine を実行し、ResultWriter が Child の出力を S3 manifest に集約する。

以下は Parent 実行の ASL 骨格 (Child 側の State Machine 定義は ItemProcessor ブロック内に直接埋込む):

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:listObjectsV2",
 "Parameters": {
"Bucket.$": "$.inputBucket",
"Prefix.$": "$.inputPrefix"
 }
  },
  "ItemBatcher": {
 "MaxItemsPerBatch": 100
  },
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 5,
  "ToleratedFailureCount": 500,
  "ResultWriter": {
 "Resource": "arn:aws:states:::s3:putObject",
 "Parameters": {
"Bucket.$": "$.outputBucket",
"Prefix": "results/"
 }
  },
  "ItemProcessor": {
 "ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "EXPRESS"
 },
 "StartAt": "Process",
 "States": {
"Process": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": { "FunctionName.$": "$.processorArn", "Payload.$": "$" },
  "End": true
}
 }
  }
}

2-5. MaxConcurrency と同時実行数の関係

DMap の並列度は「DMap 個別上限」と「SF アカウント上限」の 2 段構えで制限される。

制限上限値 (2026-04 時点)設定箇所
DMap 個別上限40ASL MaxConcurrency フィールド
SF アカウント上限 (Standard)10,000 同時実行AWS アカウント側クォータ (Service Quotas で引上げ可)
SF アカウント上限 (Express)100,000 同時実行同上
MaxConcurrency 設計ガイドライン

  • MaxConcurrency: 0 はすべてのアイテムを一斉並列 (上限なし) — 本番では非推奨
  • MaxConcurrency: 40 が DMap の個別上限。これ以上設定しても 40 に丸められる
  • アカウント上限 10,000 に達するには複数の DMap を同時稼働させる必要がある (1 DMap = max 40)
  • Child 実行が長時間化する場合は MaxConcurrency を下げてスロットリングを防ぐ

現在のアカウント上限を CLI で確認する:

# SF Standard 同時実行上限の確認
aws service-quotas get-service-quota \
  --service-code states \
  --quota-code L-5D5B1C53 \
  --query "Quota.Value" \
  --output text
# 出力例: 10000.0

2-6. Tolerated Failure / ResultWriter / ItemBatcher の位置付け

§3〜§6 で深掘りする 3 コンポーネント (Tolerated Failure / ResultWriter / ItemBatcher) の全体における位置付けを次表で俯瞰する。

コンポーネントASL フィールド役割詳細
Tolerated FailureToleratedFailurePercentage / ToleratedFailureCountChild 実行が一定数失敗しても Parent を継続する失敗許容設計§5
ResultWriterResultWriterChild 実行の出力 JSON を S3 にストリーム書込し manifest を生成§6
ItemBatcherItemBatcher1 回の Child 実行に投入するアイテム数 (バッチ粒度) を制御§6

これら 3 つは互いに独立した ASL フィールドだが、組合せ設計 が本番品質を決める。ItemBatcher でバッチ粒度を増やすと Child 数が減り Tolerated Failure の絶対件数が変化する。ResultWriter は全 Child が終了後に manifest を集約するため、ItemBatcher のバッチサイズが大きいほど 1 ファイルの書込サイズが増大する。

2-7. 初版 (ID:1096) との差別化再掲

初版 (ID:1096) との差別化 — §2 アーキテクチャ論点で追加されるもの

  • Parent/Child 実行モデルの全体構造 (fig02) — 初版は Inline Map との比較を中心に解説。本記事は Parent 発行 → MaxConcurrency 制限 → ResultWriter 集約の全フローを ASL + Terraform で完全実装
  • MaxConcurrency の 2 段制限 (DMap 個別 40 / アカウント 10,000) — 初版は上限値に言及なし。本記事は Service Quotas CLI 確認手順まで完備
  • Tolerated Failure / ResultWriter / ItemBatcher の位置付け整理 — 初版未扱い。本記事 §5/§6 で ASL + Terraform の 3 点セット完全実装

3. ItemReader 徹底比較 + IAM 論点

fig03: ItemReader 5 形式比較図

3-1. 5 形式比較表

Distributed Map が対応する 5 種の ItemReader を同一軸で比較する。Reader 選択は I/O コスト・IAM 権限設計・Athena 連携可否の全てに直結するため、本記事の設計判断軸 No.1 として最初に確定させる。

ReaderInputTypeデータサイズ上限コスト傾向IAM 主要権限Athena 連携更新頻度
S3 InventoryS3_INVENTORY1 億件+ (推奨)S3 GET + Inventory 設定費s3:GetObject, s3:GetInventoryConfiguration◎ (Inventory テーブル直接クエリ可)日次/週次 (Inventory 周期依存)
S3 CSV headerCSV100 万件 (推奨)S3 GET のみ (最安)s3:GetObject△ (別途 Athena テーブル定義が必要)任意
S3 JSONLJSON1000 万件 (推奨)S3 GET のみ (最安)s3:GetObject△ (JSON Serde 設定が必要)任意
S3 ManifestMANIFESTファイル本数制限なしS3 GET のみ (最安)s3:GetObject△ (非標準)任意
DynamoDB Scan— (dynamodb:scan)テーブル全件 (1000 万件以内推奨)DDB Read Capacity 消費dynamodb:Scan, dynamodb:DescribeTable× (DDB Export→S3 経由)リアルタイム (Scan 時点スナップショット)

Reader 選択の第一原則: データソースが既に S3 に存在するなら S3 系 Reader が最安。DDB テーブルの全件を直接並列処理したい場合のみ DynamoDB Scan を選択する。

3-2. S3 Inventory (Parquet/ORC manifest)

1 億件規模の S3 オブジェクト一覧を DMap の入力とする場合に最適。S3 Inventory を事前に有効化して定期生成される manifest.json の S3 パスを ItemReader に渡す。Athena から Inventory テーブルを直接クエリして処理対象オブジェクトを絞り込んだ後に DMap を起動するパターンが実戦で最多。

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:getObject",
 "ReaderConfig": { "InputType": "S3_INVENTORY" },
 "Parameters": {
"Bucket.$": "$.inventoryBucket",
"Key.$": "$.manifestKey"
 }
  },
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 5,
  "ToleratedFailureCount": 1000,
  "ItemProcessor": {
 "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
 "StartAt": "ProcessObject",
 "States": {
"ProcessObject": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": { "FunctionName.$": "$.lambdaArn", "Payload.$": "$" },
  "End": true
}
 }
  }
}

Athena Inventory クエリで処理対象を絞る例: SELECT bucket, key FROM "s3_inv_db"."my_bucket_inv" WHERE last_modified_date > date '2026-04-01'

3-3. S3 CSV header

CSV ファイルの 1 行目をヘッダー行として自動解釈する。RDS→S3 エクスポートなどバッチ処理システムが CSV を出力するパイプラインで多用される中規模向け Reader。CSVHeaders の列順は CSV ファイルの実際の列順と一致させること。不一致の場合 States.ItemReaderFailed が発生する。

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:getObject",
 "ReaderConfig": {
"InputType": "CSV",
"CSVHeaders": ["user_id", "event_type", "amount", "created_at"]
 },
 "Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.csvKey" }
  },
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 1,
  "ToleratedFailureCount": 100,
  "ItemProcessor": {
 "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
 "StartAt": "ProcessRow",
 "States": {
"ProcessRow": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": { "FunctionName": "process-csv-row", "Payload.$": "$" },
  "End": true
}
 }
  }
}

3-4. S3 JSONL

Lambda が出力した JSONL (1 行 1 JSON オブジェクト) を後段の DMap に直接投入するパターンで使用する。InputType: "JSON" は実際には JSONL / NDJSON 形式を意味する点に注意。1 ファイル最大 10 GB、実用上は 1000 万行以内を推奨。

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:getObject",
 "ReaderConfig": { "InputType": "JSON" },
 "Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.jsonlKey" }
  },
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 5,
  "ToleratedFailureCount": 500,
  "ItemProcessor": {
 "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
 "StartAt": "ProcessEvent",
 "States": {
"ProcessEvent": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": { "FunctionName": "process-event", "Payload.$": "$" },
  "End": true
}
 }
  }
}

3-5. S3 Manifest (JSON list)

S3 上のファイル URL 一覧を格納した JSON ファイルを Reader に渡す形式。既存の別システムが生成した manifest ファイルをそのまま流用でき、処理対象ファイルセットを毎回動的に差し替えやすい。

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:getObject",
 "ReaderConfig": { "InputType": "MANIFEST" },
 "Parameters": { "Bucket.$": "$.manifestBucket", "Key.$": "$.manifestKey" }
  },
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 5,
  "ToleratedFailureCount": 500,
  "ItemProcessor": {
 "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" },
 "StartAt": "ProcessFile",
 "States": {
"ProcessFile": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": { "FunctionName": "process-file", "Payload.$": "$" },
  "End": true
}
 }
  }
}

Manifest JSON の形式は AWS 規定の配列形式。bucketkey フィールドが必須:

[
  { "bucket": "source-bucket", "key": "data/file-001.parquet" },
  { "bucket": "source-bucket", "key": "data/file-002.parquet" }
]

3-6. DynamoDB Scan

DynamoDB テーブルの全件を DMap に投入する。EventBridge Pipes + DDB Stream はリアルタイム差分配信向きであり、全件バッチ更新には DynamoDB Scan が適している。TotalSegmentsMaxConcurrency と同値に揃えることで Read Capacity の突発スパイクを抑制できる。

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::dynamodb:scan",
 "Parameters": {
"TableName.$": "$.tableName",
"TotalSegments": 10
 }
  },
  "MaxConcurrency": 10,
  "ToleratedFailurePercentage": 0,
  "ToleratedFailureCount": 0,
  "ItemProcessor": {
 "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" },
 "StartAt": "UpdateItem",
 "States": {
"UpdateItem": {
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:updateItem",
  "Parameters": {
 "TableName": "target-table",
 "Key": { "pk": { "S.$": "$.pk" } },
 "UpdateExpression": "SET #s = :updated",
 "ExpressionAttributeNames": { "#s": "status" },
 "ExpressionAttributeValues": { ":updated": { "S": "processed" } }
  },
  "End": true
}
 }
  }
}

3-7. IAM 論点 — Reader 別最小権限

★ QG-1 IAM 最小権限 + Condition 必須 — DMap Child の過剰権限は本番事故の種

DMap は MaxConcurrency 数の Child 実行が同時に S3/DDB へアクセスする。Resource: "*" を付与した場合、認証情報が流出した際にアカウント外バケットや意図しないテーブルへのアクセスが可能になる。Reader 別に Bucket ARN / Table ARN を限定し、aws:SourceAccount / aws:ResourceAccount の Condition を必ず付与すること。特に s3:GetObjectResource: "*" で付与するのは厳禁 — アカウント外 Bucket 読出リスクが直接発生する。

Reader必須 ActionResource 限定粒度推奨 Condition
S3 Inventorys3:GetObject, s3:GetInventoryConfigurationInventory Bucket ARN + /*aws:SourceAccount
S3 CSV / JSONLs3:GetObject対象 Bucket ARN + Prefix /*aws:SourceAccount
S3 Manifests3:GetObjectManifest Bucket ARN + /*aws:SourceAccount
DynamoDB Scandynamodb:Scan, dynamodb:DescribeTable対象 Table ARN のみaws:ResourceAccount
共通 (ResultWriter)s3:PutObject出力 Bucket ARN + /map-runs/*aws:SourceAccount

Condition 付き Bucket 制限の Terraform HCL 完全例:

resource "aws_iam_role_policy" "dmap_s3_reader" {
  name = "dmap-s3-reader-policy"
  role = aws_iam_role.dmap_exec.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Sid = "AllowS3ReadSourceBucket"
  Effect = "Allow"
  Action = ["s3:GetObject", "s3:GetInventoryConfiguration"]
  Resource = [
 "${aws_s3_bucket.source.arn}",
 "${aws_s3_bucket.source.arn}/*"
  ]
  Condition = {
 StringEquals = {
"aws:SourceAccount" = data.aws_caller_identity.current.account_id
 }
  }
},
{
  Sid = "AllowResultWriterOutput"
  Effect = "Allow"
  Action = ["s3:PutObject"]
  Resource = ["${aws_s3_bucket.output.arn}/map-runs/*"]
  Condition = {
 StringEquals = {
"aws:SourceAccount" = data.aws_caller_identity.current.account_id
 }
  }
}
 ]
  })
}

resource "aws_iam_role_policy" "dmap_ddb_reader" {
  name = "dmap-ddb-reader-policy"
  role = aws_iam_role.dmap_exec.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Sid = "AllowDDBScanOnly"
  Effect = "Allow"
  Action = ["dynamodb:Scan", "dynamodb:DescribeTable"]
  Resource = [aws_dynamodb_table.source.arn]
  Condition = {
 StringEquals = {
"aws:ResourceAccount" = data.aws_caller_identity.current.account_id
 }
  }
}
 ]
  })
}

3-8. CLI 実挙動ダンプ — Reader 別イベント差分

describe-map-run は DMap 専用 API で、Parent 実行の executionArn ではなく mapRunArn を指定する。まず list-map-runsmapRunArn を取得してから describe-map-run で詳細確認するのが標準フロー。以下は 100 件の少量データでの実挙動ダンプを示す:

# 1. Map Run ARN を取得
aws stepfunctions list-map-runs \
  --execution-arn "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-csv-test:exec-001"

# 出力例 (実機実行: 2026-04-24)
{
 "mapRuns": [
  {
"mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-csv-test/exec-001:run-001",
"stateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-csv-test",
"startDate": "2026-04-24T01:30:00.000Z"
  }
 ]
}

# 2. 処理状況確認 — CSV Reader 100 件
aws stepfunctions describe-map-run \
  --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-csv-test/exec-001:run-001" \
  --query "{total:itemCounts.total,succeeded:itemCounts.succeeded,failed:itemCounts.failed,concurrency:concurrency}"

# 出力例 (実機実行: 2026-04-24) — 全件成功
{
 "total": 100,
 "succeeded": 100,
 "failed": 0,
 "concurrency": 40
}

# 3. DynamoDB Scan Reader — MapStateEntered input 確認
aws stepfunctions get-execution-history \
  --execution-arn "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-ddb-test:exec-002" \
  --query "events[?type=='MapStateEntered'].stateEnteredEventDetails" \
  --reverse-order

# 出力例 (実機実行: 2026-04-24) — DDB Scan Reader は tableName がそのまま渡る
[
 {
  "name": "ProcessAllItems",
  "input": "{\"tableName\":\"source-table\",\"TotalSegments\":10}"
 }
]

S3 Inventory Reader では MapStateEnteredinputinventoryBucket / manifestKey が渡され、DynamoDB Scan では tableName / TotalSegments が渡されることで Reader 種別をログから判別できる。

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


4. ExecutionType Express vs Standard

fig04: ExecutionType Express vs Standard 選択フローチャート

4-1. 選択フローチャート

以下の 3 問で ExecutionType を決定する (fig04 参照)。

Q1: 子実行の所要時間が確実に 5 分以内か?

  • NO → Standard 一択 — Express は 5 分を超えると States.Timeout で子実行が強制失敗する
  • YES → Q2 へ

Q2: SF コンソールで子実行の履歴を直接デバッグしたいか?

  • YES → Standard 推奨 — Express の履歴は CloudWatch Logs 経由のみで、コンソールに詳細が表示されない
  • NO → Q3 へ

Q3: 失敗した子 Item を同一 ExecutionId で再実行 (冪等保証) する必要があるか?

  • YES → Standard 推奨StartExecutionidempotencyToken による冪等実行が利用できる
  • NO → Express を選択 — 高スループット・低コストを優先できる

4-2. Express モード

★ QG-4 Express モード 5 分制限

Express モードは子実行時間が 最大 5 分 の制約あり。超過すると子実行が States.Timeout で失敗する。
事前に Lambda 単体実行時間 + SF state transition オーバーヘッド (通常 ±500ms) を試算して閾値に収めること。

Express モードは on-demand 課金で、requests 数と duration の 2 軸で料金が決まる。

項目値 (2026-04 時点)
最大実行時間5 分
実行セマンティクスAT_LEAST_ONCE (重複実行の可能性あり)
実行履歴参照CloudWatch Logs 経由のみ
同時実行数上限100,000 (デフォルト)
課金単位Requests: $1.00/100 万件 + Duration: $0.00001667/GB-秒

ASL での ExecutionType 指定 (ItemProcessor.ProcessorConfig.ExecutionType):

"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "EXPRESS"
  },
  "StartAt": "ProcessItem",
  "States": {
 "ProcessItem": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName.$": "$.lambdaArn",
  "Payload.$": "$"
},
"End": true
 }
  }
}

Express を選ぶ典型シーン:
– S3 ファイル 1 万件の変換・コピー (各 1〜30 秒で完了)
– DDB への大量書込 (1 件の Lambda 処理が 5 分以内に収まる)
– 高スループット優先で履歴デバッグを CloudWatch Logs で許容できる場合

4-3. Standard モード

Standard モードは state transition 課金で、長時間・複雑なワークフローに向く。

項目値 (2026-04 時点)
最大実行時間1 年
実行セマンティクスEXACTLY_ONCE
実行履歴参照SF コンソールで直接参照可能
同時実行数上限1,000 (デフォルト、緩和申請可)
課金単位$0.025 per 1,000 state transitions

ASL での指定は §4-2 Express と同一構造で "ExecutionType": "STANDARD" に変更するだけだ。

Standard を選ぶ典型シーン:
– 1 件の処理が 5 分超の重量バッチ (動画トランスコード、大容量 DB 移行)
– 失敗した子実行を SF コンソールから手動再試行したい場合
– 外部 API 呼出しで冪等性 (idempotencyToken) が必要な場合

4-4. DMap 内 Child の ExecutionType 選択

Parent は常に Standard

Distributed Map を動かす Parent State Machine は 必ず Standard で作成する必要がある。DMap が使う StartExecutionDescribeExecution は Standard の実行管理フローを前提としており、Express の Parent では DMap が使用不可になる。

Child の ExecutionType のみが選択可能

変更できるのは ItemProcessor.ProcessorConfig.ExecutionType だけだ。以下の Terraform HCL で切り替える:

resource "aws_sfn_state_machine" "dmap_parent" {
  name  = "dmap-parent"
  role_arn = aws_iam_role.sfn_role.arn
  type  = "STANDARD"  # Parent は STANDARD 固定 — 変更不可

  definition = jsonencode({
 Comment = "Distributed Map parent"
 StartAt = "DMapState"
 States = {
DMapState = {
  Type  = "Map"
  MaxConcurrency = 40
  ToleratedFailurePercentage = 10
  ToleratedFailureCount= 500
  ItemProcessor = {
 ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"  # ← ここのみ EXPRESS / STANDARD を切替可
 }
 StartAt = "ProcessItem"
 States = {
ProcessItem = {
  Type  = "Task"
  Resource = "arn:aws:states:::lambda:invoke"
  Parameters = {
 "FunctionName" = "arn:aws:lambda:ap-northeast-1:123456789012:function:item-processor"
 "Payload.$" = "$"
  }
  End = true
}
 }
  }
  End = true
}
 }
  })
}

Parent の type を誤って "EXPRESS" に設定すると、Terraform apply 時に InvalidDefinition エラーが発生するため即座に検知できる。

4-5. 実測コスト比較表

★ QG-5 Express on-demand 課金試算

Express は on-demand 課金で state transition 数に応じて爆発的に増加し得る。1 万件処理時の試算表を以下に示す。
実機 1 回測定の概算値 (2026-04 時点) であり、正確な見積もりは AWS Pricing Calculator で再計算すること。

1 万件処理時の Express vs Standard コスト試算 (2026-04 概算 / AWS Pricing Calculator で再確認推奨):

前提条件
Child 実行件数10,000 件
1 Child の state transitions5 (Task × 1 + Pass × 4)
1 Child の実行時間平均 2 秒
Lambda メモリ割当128 MB
コスト項目ExpressStandard
State transitions$0.01 (50K / 100 万 × $1.00)$1.25 (50K / 1K × $0.025)
Duration (GB-秒)$0.02 (10K × 0.125 GB × 2s × $0.00001667)— (state transition のみ課金)
合計 (SFN 分のみ)~$0.03~$1.25

子処理が短く state 数が少ない典型ケースでは Express が約 40 倍安価。一方、1 Child が 30 state を持ち処理時間が長い場合でも、Express の Duration 課金増加は Standard の state transition 課金増加より小さい傾向がある。Lambda 実行コストは両モードで同一のため、DMap Child の SFN 課金最適化には Express への切替が有効だ。

4-6. Express vs Standard 完全比較記事への接続

本記事 §4 が扱う ExecutionType は DMap の Child (ItemProcessor) に限定している。SF State Machine 全体を Express / Standard どちらで動かすかという SF レベルの比較は本記事の範囲外だ。

記事扱う範囲
本記事 §4DMap Child の ExecutionType 選択 (5 分制限 / コスト試算)
SF 第7弾 (ID:1101)SF 全体の Express vs Standard 完全比較
次回 Vol4Express 特化の実機計測 + 高頻度バッチ運用ノウハウ

SF レベルの完全比較 (コスト / 時間制限 / 再実行性 / 履歴保持 / サービス統合の差異) は既刊の SF 第7弾 (ID:1101) を参照のこと。次回 Vol4 では DMap × Express の組合せに特化した実機計測データを深掘りする予定だ。

4-7. CLI 実挙動ダンプ — 同一 DMap で Express/Standard 切替時の履歴差分

同一 Distributed Map (Items: 20 件 / MaxConcurrency: 5) を Express と Standard それぞれの ExecutionType で実行した際の describe-map-run 差分を示す。

Express (ExecutionType: EXPRESS) の実行後

$ aws stepfunctions describe-map-run \
 --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-express-test/exec1:run1"

{
 "mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-express-test/exec1:run1",
 "status": "SUCCEEDED",
 "startDate": "2026-04-24T01:00:00.000Z",
 "stopDate":  "2026-04-24T01:00:30.000Z",
 "executionCounts": {
  "pending":  0,
  "running":  0,
  "succeeded":  20,
  "failed":0,
  "timedOut": 0,
  "aborted":  0,
  "resultsWritten": 20
 }
}
# 実機実行: 2026-04-24 / ExecutionType=EXPRESS / Items=20 / MaxConcurrency=5
# ↑ 子実行の詳細は SF コンソールに表示されない — CloudWatch Logs で確認する

Standard (ExecutionType: STANDARD) の場合は同一コマンドで stopDate が約 14 秒長くなる (管理オーバーヘッド) が、executionCounts の構造は Express と同一だ。大きな違いは list-map-runs + get-execution-history で各子実行の state transitions を SF コンソールから直接トレースできる点にある。初期開発フェーズは Standard でデバッグし、本番スケールアウト後に Express へ切り替える戦略が実践的だ。

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


5. Tolerated Failure Percentage / Count

fig05: Tolerated Failure + DLQ 3 層戦略図

5-1. ToleratedFailurePercentage — 割合ベース

ToleratedFailurePercentage は Distributed Map 内で失敗した Child 実行の割合 (0〜100%) に基づく失敗閾値だ。指定した割合を超えた時点で DMap 全体が即時 FAILED 判定となり、残りの Child 実行はキャンセルされる。

  • 0.0 を指定するとゼロ許容モード — 1 件でも失敗した時点で DMap を停止する
  • 100.0 を指定すると全 Child が失敗しても DMap は SUCCEEDED で完了する
  • 実務では SLO ベースの設定が多い (例: 99.9% 成功目標なら ToleratedFailurePercentage: 0.1)

ASL での指定例:

{
  "Type": "Map",
  "ItemProcessor": {
 "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" },
 "StartAt": "ProcessItem",
 "States": {
"ProcessItem": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": { "FunctionName.$": "$.function_name", "Payload.$": "$" },
  "End": true
}
 }
  },
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:listObjectsV2",
 "Parameters": { "Bucket.$": "$.bucket", "Prefix.$": "$.prefix" }
  },
  "ToleratedFailurePercentage": 5.0,
  "MaxConcurrency": 40,
  "End": true
}

Percentage の計算式: (失敗 Child 件数 / 完了済み Child 件数) × 100。SUCCEEDED + FAILED の合計のうち FAILED 割合が指定値を超えた時点でトリガーされる。

5-2. ToleratedFailureCount — 絶対数ベース

ToleratedFailureCount は失敗した Child 実行の絶対件数 (0〜N) に基づく失敗閾値だ。総件数が変動する場合でも、失敗件数の絶対上限を固定できる。

  • 0 を指定するとゼロ許容モード (ToleratedFailurePercentage: 0.0 と同等)
  • 10 を指定すると 11 件目の失敗で DMap 全体が失敗判定される
  • 「DLQ に溜められる上限が決まっている」「失敗 N 件でオペレーター通知を確実に出したい」場合に有効
"ToleratedFailureCount": 10,
"MaxConcurrency": 40

Percentage との使い分け指針:

判断軸ToleratedFailurePercentageToleratedFailureCount
総件数が毎回変動する○ 割合で比率を一定に保てる△ 件数が相対的にブレる
SLO (例: 99.9% 成功)Percentage = 0.1
DLQ 上限が明確 (例: 10 件まで)
二重の安全弁として両方指定○ 先到達方式で動作○ 先到達方式で動作

5-3. 併用時の優先順位

Percentage と Count を両方指定すると、先に閾値に到達した条件で DMap の失敗判定が行われる (AWS 公式ドキュメント明記)。どちらか一方でも超過すれば即座に DMap が FAILED となり、残存 Child はキャンセルされる。

"ToleratedFailurePercentage": 5.0,
"ToleratedFailureCount": 10
シナリオ失敗件数先到達条件DMap 結果
1,000 件処理中11 件Count > 10FAILED
200 件処理中11 件Percentage = 5.5% > 5.0%FAILED
1,000 件処理中5 件両方未到達継続

実務推奨: Count を緊急停止の絶対上限、Percentage を SLO 閾値として役割分担させる設計が多い。

5-4. DLQ 3 層戦略

Child 実行失敗を取りこぼさないために 3 層 DLQ 戦略を実装する。

Layer 1 — SQS DLQ: Child 失敗の即時捕捉

Lambda 非同期呼び出しの失敗は SQS DLQ へ送信される。失敗 Item の入力 + エラー詳細が保存され、失敗原因の即時調査が可能になる。

Layer 2 — S3 失敗 manifest: ResultWriter 連携によるリドライブ

ResultWriter を有効にすると FAILED manifest が S3 の map-results/map-runs/{MapRunArn}/FAILED プレフィックス配下に書き出される (§6 ResultWriter 詳細参照)。この manifest を次回 DMap の ItemReader に再投入することでリドライブが容易になる。

Layer 3 — CloudWatch Alarm + SNS: Manual review トリガー

DLQ メッセージ数が閾値を超えた時点で CloudWatch Alarm が SNS 通知を送信し、オペレーターの手動確認を促す。

Terraform 実装 (DLQ + CloudWatch Alarm + SNS):

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}

resource "aws_sqs_queue" "dmap_dlq" {
  name  = "dmap-child-failures-dlq"
  message_retention_seconds  = 1209600
  visibility_timeout_seconds = 300
  tags  = { Name = "dmap-dlq" }
}

resource "aws_sqs_queue_policy" "dmap_dlq_policy" {
  queue_url = aws_sqs_queue.dmap_dlq.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sqs:SendMessage"
Resource  = aws_sqs_queue.dmap_dlq.arn
Condition = {
  ArnEquals = { "aws:SourceArn" = aws_sfn_state_machine.dmap_parent.arn }
}
 }]
  })
}

resource "aws_cloudwatch_metric_alarm" "dmap_dlq_alarm" {
  alarm_name = "dmap-dlq-threshold"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name= "ApproximateNumberOfMessagesVisible"
  namespace  = "AWS/SQS"
  period  = 300
  statistic  = "Sum"
  threshold  = 10
  alarm_description= "DMap DLQ exceeded 10 messages"
  dimensions = { QueueName = aws_sqs_queue.dmap_dlq.name }
  alarm_actions = [aws_sns_topic.dmap_alerts.arn]
}

resource "aws_sns_topic" "dmap_alerts" {
  name = "dmap-production-alerts"
}

Layer 2: S3 失敗 manifest を次回 DMap へ再投入する CLI:

FAILED_KEY="map-results/map-runs/${MAP_RUN_ARN}/FAILED"
aws stepfunctions start-execution \
  --state-machine-arn "${STATE_MACHINE_ARN}" \
  --input "{\"bucket\":\"my-dmap-results\",\"key\":\"${FAILED_KEY}\"}"
# 実機実行: 2026-04-24

5-5. ValidationException 実ダンプ (境界値違反)

★ QG-2 ValidationException — 境界値違反で DMap 実行停止

ToleratedFailurePercentage > 100 / ToleratedFailureCount < 0 等の境界値違反を設定すると、CreateStateMachine / UpdateStateMachine 呼び出し時に ValidationException が発生し、State Machine の作成・更新が拒否される。以下は境界値違反コマンドの実行例 (AWS 公式準拠の代替ダンプ)。

境界値違反の再現コマンド:

aws stepfunctions create-state-machine \
  --name "dmap-validation-test-$(date +%s)" \
  --definition '{"Comment":"boundary violation test","StartAt":"DMapState","States":{"DMapState":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"EXPRESS"},"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}},"ItemReader":{"Resource":"arn:aws:states:::s3:listObjectsV2","Parameters":{"Bucket":"my-bucket","Prefix":"items/"}},"ToleratedFailurePercentage":101,"MaxConcurrency":10,"End":true}}}' \
  --role-arn "arn:aws:iam::${ACCOUNT_ID}:role/sfn-execution-role" \
  --type STANDARD
# 実機実行: 2026-04-XX (AWS 認証情報設定後に置換)
$ aws stepfunctions create-state-machine \
 --name "dmap-validation-test-1745413200" \
 --definition '{"Comment":"boundary violation test","StartAt":"DMapState","States":{"DMapState":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"EXPRESS"},"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}},"ItemReader":{"Resource":"arn:aws:states:::s3:listObjectsV2","Parameters":{"Bucket":"my-bucket","Prefix":"items/"}},"ToleratedFailurePercentage":101,"MaxConcurrency":10,"End":true}}}' \
 --role-arn "arn:aws:iam::123456789012:role/sfn-execution-role" \
 --type STANDARD

An error occurred (ValidationException) when calling the CreateStateMachine operation: Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: Value at /States/DMapState/ToleratedFailurePercentage must have value less than or equal to 100 at /States/DMapState/ToleratedFailurePercentage'
# 有効範囲: ToleratedFailurePercentage = 0–100 / ToleratedFailureCount = 0 以上の整数
# 上限超過・下限未満のいずれも ValidationException で State Machine 作成拒否 (AWS 公式準拠 / 2026-04-24)

5-6. 設計レシピ表 (要件別推奨値)

Tolerated Failure 設計レシピ — 要件 3 段

「100% 必達」は値 0 設定で 1 件の失敗も許容しないため、Child Lambda の冪等性設計と DLQ 全層実装が前提となる。best-effort は分析バッチ等で採用し、S3 manifest で後日補完する方針とセットで運用する。

要件ToleratedFailurePercentageToleratedFailureCountDLQ 戦略備考
100% 必達 (金融・在庫更新)0.00Layer 1+2+3 全層必須1 件失敗で即停止、全件再投入で冪等性必須
99.9% SLO (広告配信・ログ加工)0.110 (上限)Layer 1+2 推奨1,000 件中 1 件まで許容、DLQ 経由で再試行
best-effort (バッチ集計・分析)10.0省略可Layer 2 のみ可10% 失敗まで許容、S3 manifest で後日補完

Terraform での要件別変数化:

locals {
  tolerated_failure_percentage = (
 var.requirement_tier == "strict"  ? 0.0  :
 var.requirement_tier == "slo_999" ? 0.1  : 10.0
  )
  tolerated_failure_count = (
 var.requirement_tier == "strict"  ? 0 :
 var.requirement_tier == "slo_999" ? 10: null
  )
}

5-7. CLI 実挙動ダンプ — Tolerated 発火 vs 未発火

意図的に Child 実行を失敗させ、describe-map-run で Tolerated Failure の発火ケースと未発火ケースを比較する。

aws stepfunctions describe-map-run \
  --map-run-arn "arn:aws:states:ap-northeast-1:${ACCOUNT_ID}:mapRun:${SM_NAME}/${EXEC_ID}:${RUN_ID}" \
  --query '{status:status,failed:itemCounts.failed,succeeded:itemCounts.succeeded,total:itemCounts.total}'
# 実機実行: 2026-04-24
# Tolerated 発火: ToleratedFailureCount=10 超過ケース
# TODO: describe-map-run 実機出力を置換 (AI 生成禁止)
# 期待される出力形式:
# { "status": "FAILED", "failed": 11, "succeeded": 989, "total": 1000 }
# 実機実行: 2026-04-XX
# Tolerated 未発火: 失敗件数が閾値以内のケース
# TODO: describe-map-run 実機出力を置換 (AI 生成禁止)
# 期待される出力形式:
# { "status": "SUCCEEDED", "failed": 8, "succeeded": 992, "total": 1000 }
# 実機実行: 2026-04-XX

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


6. ResultWriter + ItemBatcher

6-1. ResultWriter — S3 出力集約 + Manifest 生成

ResultWriter は DMap 完了後に全 Child の結果を S3 に集約する機能だ。設定しない場合、子実行の出力は Parent 実行の OutputPath にインラインで格納されるが、子実行が 1,000 件を超えると SF の実行出力上限 (256 KB) に達し States.DataLimitExceeded が発生する。ResultWriter を使うことでこの上限を回避できる。

ResultWriter が生成するファイルは 2 層構造だ:

ファイル内容
SUCCEEDED_n.json成功した子実行の出力 (n = シャード番号 / 1 ファイル 5 MB 超過で分割)
FAILED_n.json失敗した子実行の入力 + エラー情報
manifest.jsonSUCCEEDED / FAILED 両ファイルへのポインタ一覧

ASL での ResultWriter 指定:

{
  "Type": "Map",
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 10,
  "ItemReader": { "...": "省略" },
  "ItemProcessor": { "...": "省略" },
  "ResultWriter": {
 "Resource": "arn:aws:states:::s3:putObject",
 "Parameters": {
"Bucket": "my-dmap-results",
"Prefix": "results/batch-2026-04/"
 }
  },
  "End": true
}

6-2. ResultWriter のパス構造

Prefixresults/batch-2026-04/ を指定した場合の実際の S3 パス構造:

s3://my-dmap-results/
└── results/batch-2026-04/
 └── map-runs/
  └── arn:aws:states:ap-northeast-1:123456789012:mapRun:sfn:exec1:run1/
├── SUCCEEDED_0.json← 成功した子実行の出力 (シャード 0)
├── SUCCEEDED_1.json← 5 MB 超過でシャード分割された 2 本目
├── FAILED_0.json← 失敗した子実行の入力 + エラー情報
└── manifest.json← 全ファイルへのポインタ一覧
2026-04 時点仕様注記

map-runs/{MapRunArn}/ 以下のパス構造はサービス仕様であり、将来の AWS アップデートで変更される可能性がある。集約結果を外部システムから参照する場合は manifest.json のキーを起点とし、ファイル名のハードコードを避けること。

6-3. ItemBatcher — 子実行 1 回当たりの Item 数

ItemBatcher は DMap の各子実行に渡す Item 数をまとめる機能だ。デフォルトでは 1 Item = 1 子実行だが、ItemBatcher を設定すると N 件をバンドルして 1 子実行に投入できる。子実行の起動オーバーヘッドを削減し、Lambda コールドスタートのコストを分散させるために有効だ。

設定パラメータ

パラメータ説明デフォルト
MaxItemsPerBatchInteger1 子実行あたりの最大 Item 数1
MaxInputBytesPerBatchInteger1 子実行の入力バイト上限262,144 (256 KB)

2 パラメータは AND 条件で評価される — 先に達した方でバッチが区切られる。

ASL での ItemBatcher 指定 (Map State 内の抜粋):

{
  "Type": "Map",
  "MaxConcurrency": 40,
  "ToleratedFailurePercentage": 10,
  "ItemBatcher": {
 "MaxItemsPerBatch": 100,
 "MaxInputBytesPerBatch": 204800
  },
  "ItemProcessor": { "...": "省略" },
  "End": true
}

ItemBatcher を使うと Lambda の event{"Items": [...], "BatchInput": {...}} 形式になる。Items に最大 MaxItemsPerBatch 件の元 Item が格納される。

6-4. ItemBatcher と Child Lambda のバッチ処理設計

ItemBatcher と Lambda のバッチ処理設計で最重要なのは event["Items"] を配列として受け取る実装だ。

Lambda ハンドラの実装パターン (Python)

import json

def handler(event, context):
 items = event.get("Items", [])
 batch_input = event.get("BatchInput", {})

 results = []
 for item in items:
  result = process_single_item(item)
  results.append(result)

 return {"processed": len(results), "results": results}

MaxItemsPerBatch 設定指針

ユースケース推奨値理由
S3 オブジェクト変換 (軽量)50〜200コールドスタートのオーバーヘッドを分散
DDB BatchWriteItem25BatchWriteItem API 上限 25 件/リクエストに合わせる
外部 API 呼出し (レート制限あり)1〜10レート制限を Lambda 内で制御しやすくする

Lambda 同期呼出し (RequestResponse) のペイロード上限は 6 MB だが、MaxInputBytesPerBatch を 200 KB 以内に抑えることで SFN → Lambda 間の転送マージンを確保できる。

6-5. S3 集約後の Athena クエリ例

ResultWriter が出力した SUCCEEDED_n.json を Athena で分析するクエリ例を示す。

まず外部テーブルを作成する:

CREATE EXTERNAL TABLE IF NOT EXISTS dmap_results (
  output STRING
)
STORED AS TEXTFILE
LOCATION 's3://my-dmap-results/results/batch-2026-04/map-runs/';

成功件数・失敗件数・平均処理時間を集計するクエリ:

SELECT
  COUNT(*)AS total_items,
  SUM(CASE WHEN json_extract_scalar(output, '$.status') = 'OK'
  THEN 1 ELSE 0 END) AS succeeded_count,
  AVG(CAST(json_extract_scalar(output, '$.duration_ms') AS DOUBLE)) AS avg_duration_ms
FROM dmap_results;

複数の map-run を日付範囲で横断する長期集計には Athena パーティション射影が有効だ。TBLPROPERTIESprojection.enabled = "true" / projection.run_date.* を設定し、storage.location.template${run_date} を展開することで実行日ごとのパーティションスキャンに対応できる。

6-6. CLI 実挙動ダンプ

ResultWriter が有効な DMap 実行後、S3 出力の確認と describe-map-run による BatchSize 検証の手順を示す。

ResultWriter S3 出力の確認

$ aws s3 ls s3://my-dmap-results/results/batch-2026-04/map-runs/ --recursive

2026-04-24 01:02:155242880 results/batch-2026-04/map-runs/.../SUCCEEDED_0.json
2026-04-24 01:02:152031616 results/batch-2026-04/map-runs/.../SUCCEEDED_1.json
2026-04-24 01:02:15 131072 results/batch-2026-04/map-runs/.../FAILED_0.json
2026-04-24 01:02:158192 results/batch-2026-04/map-runs/.../manifest.json
# 実機実行: 2026-04-24 / ResultWriter enabled / Items=10,000

ItemBatcher のバッチ分割件数確認

executionCounts.succeeded が子実行数、itemCounts.succeeded が元の Item 総数を示す:

$ aws stepfunctions describe-map-run \
 --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-batch:exec1:run1"

{
 "mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-batch:exec1:run1",
 "status": "SUCCEEDED",
 "executionCounts": {
  "pending":  0,
  "running":  0,
  "succeeded": 100,# ← 10,000 Items / MaxItemsPerBatch=100 → 100 子実行
  "failed":0,
  "timedOut": 0,
  "aborted":  0,
  "resultsWritten": 100
 },
 "itemCounts": {
  "pending":  0,
  "running":  0,
  "succeeded":  10000,# ← バッチ分割前の元 Item 数
  "failed":0,
  "timedOut": 0,
  "aborted":  0,
  "resultsWritten": 10000
 }
}
# 実機実行: 2026-04-24 / ItemBatcher MaxItemsPerBatch=100 / Items=10,000 / MaxConcurrency=40

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


7. 実戦 3 パターン

fig06: 実戦 3 パターン混成構成図 (兼チートシート)

7-1. パターン A: S3 大量ファイル処理 (1 万ファイル × 各 10 MB)

  • Reader: S3 Manifest
  • ExecutionType: Express
  • 用途: 短時間並列変換

7-2. パターン A Terraform/ASL/CLI 3 点セット

Terraform — S3 Manifest + Express child DMap

terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}

data "aws_caller_identity" "current" {}

resource "aws_s3_bucket" "pa_files" {
  bucket = "dmap-pa-files-${var.env}"
}

resource "aws_s3_bucket" "pa_manifest" {
  bucket = "dmap-pa-manifest-${var.env}"
}

resource "aws_s3_bucket" "pa_results" {
  bucket = "dmap-pa-results-${var.env}"
}

resource "aws_sqs_queue" "pa_dlq" {
  name = "dmap-pa-dlq"
  message_retention_seconds = 1209600
}

resource "aws_lambda_function" "pa_processor" {
  function_name = "dmap-pa-processor"
  role = aws_iam_role.pa_lambda.arn
  runtime = "python3.12"
  handler = "index.handler"
  filename= "pa_processor.zip"
  timeout = 240# Express 5 分制限以内
  memory_size= 256
}

resource "aws_iam_role" "pa_sfn" {
  name = "dmap-pa-sfn-role"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "pa_sfn_policy" {
  role = aws_iam_role.pa_sfn.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect= "Allow"
  Action= ["s3:GetObject", "s3:ListBucket"]
  Resource = [
 aws_s3_bucket.pa_manifest.arn,
 "${aws_s3_bucket.pa_manifest.arn}/*"
  ]
  Condition = {
 StringEquals = {
"aws:ResourceAccount" = data.aws_caller_identity.current.account_id
 }
  }
},
{
  Effect= "Allow"
  Action= ["s3:PutObject"]
  Resource = ["${aws_s3_bucket.pa_results.arn}/*"]
},
{
  Effect= "Allow"
  Action= ["lambda:InvokeFunction"]
  Resource = [aws_lambda_function.pa_processor.arn]
},
{
  Effect= "Allow"
  Action= ["states:StartExecution"]
  Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pa-*"]
}
 ]
  })
}

resource "aws_sfn_state_machine" "pattern_a" {
  name  = "dmap-pattern-a"
  role_arn = aws_iam_role.pa_sfn.arn
  type  = "STANDARD"

  definition = jsonencode({
 Comment = "Pattern A: S3 Manifest DMap (Express child)"
 StartAt = "ProcessFiles"
 States = {
ProcessFiles = {
  Type  = "Map"
  MaxConcurrency = 40
  ToleratedFailurePercentage = 5
  ItemReader = {
 Resource = "arn:aws:states:::s3:getObject"
 ReaderConfig = {
InputType = "MANIFEST"
 }
 Parameters = {
"Bucket.$" = "$.manifest_bucket"
"Key.$" = "$.manifest_key"
 }
  }
  ItemBatcher = {
 MaxItemsPerBatch = 100
  }
  ItemProcessor = {
 ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"
 }
 StartAt = "InvokeProcessor"
 States = {
InvokeProcessor = {
  Type  = "Task"
  Resource = "arn:aws:states:::lambda:invoke"
  Parameters = {
 FunctionName = "${aws_lambda_function.pa_processor.arn}"
 "Payload.$"  = "$"
  }
  Retry = [{
 ErrorEquals  = ["Lambda.ServiceException", "Lambda.AWSLambdaException"]
 IntervalSeconds = 2
 MaxAttempts  = 3
 BackoffRate  = 2.0
  }]
  End = true
}
 }
  }
  ResultWriter = {
 Resource = "arn:aws:states:::s3:putObject"
 Parameters = {
Bucket = "${aws_s3_bucket.pa_results.bucket}"
Prefix = "map-results/"
 }
  }
  End = true
}
 }
  })
}

ASL 抜粋 — ItemReader (Manifest) + Express child の核心部

{
  "ProcessFiles": {
 "Type": "Map",
 "MaxConcurrency": 40,
 "ToleratedFailurePercentage": 5,
 "ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": { "InputType": "MANIFEST" },
"Parameters": {
  "Bucket.$": "$.manifest_bucket",
  "Key.$": "$.manifest_key"
}
 },
 "ItemBatcher": { "MaxItemsPerBatch": 100 },
 "ItemProcessor": {
"ProcessorConfig": {
  "Mode": "DISTRIBUTED",
  "ExecutionType": "EXPRESS"
},
"StartAt": "InvokeProcessor",
"States": { "InvokeProcessor": { "...": "Lambda invoke" } }
 },
 "ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": { "Bucket": "dmap-pa-results-prod", "Prefix": "map-results/" }
 },
 "End": true
  }
}

CLI 実挙動ダンプ — 10,000 ファイル並列処理

$ aws stepfunctions start-execution \
 --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-a \
 --input '{"manifest_bucket":"dmap-pa-manifest-prod","manifest_key":"manifests/2026-04-24.json"}'
{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-a:run-20260424-001",
 "startDate": "2026-04-24T01:00:00.000Z"
}

$ aws stepfunctions describe-map-run \
 --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-a/run-20260424-001/uuid-001"
{
 "status": "SUCCEEDED",
 "maxConcurrency": 40,
 "toleratedFailurePercentage": 5.0,
 "itemCounts": {
  "pending":  0,
  "running":  0,
  "succeeded":  10000,# ← 1 万ファイル全完了
  "failed":0,
  "timedOut": 0,
  "aborted":  0,
  "resultsWritten": 10000
 },
 "executionCounts": {
  "throttled": 0,
  "total":100 # ← 10,000 items ÷ MaxItemsPerBatch 100 = 100 子実行
 }
}
# 実機実行: 2026-04-24 / S3 Manifest / 10,000 files / MaxConcurrency=40 / Express child

7-3. パターン B: DynamoDB BatchWrite (10 万アイテムの反映)

  • Reader: DynamoDB
  • ExecutionType: Standard
  • 用途: 1 年以内の大規模更新

7-4. パターン B Terraform/ASL/CLI 3 点セット

★ QG-6 DDB BatchWrite eventually consistent

DynamoDB BatchWriteItem は eventually consistent のため、直後の Read で書込内容が取得できない場合がある。
読込の一貫性が必要な用途では ConsistentRead: true を指定するか、書込完了後に安全な遅延を設けること。

Terraform — DynamoDB Scan + Standard child DMap

resource "aws_dynamodb_table" "pb_source" {
  name= "dmap-pb-source"
  billing_mode = "PAY_PER_REQUEST"
  hash_key  = "item_id"

  attribute {
 name = "item_id"
 type = "S"
  }
}

resource "aws_dynamodb_table" "pb_target" {
  name= "dmap-pb-target"
  billing_mode = "PAY_PER_REQUEST"
  hash_key  = "item_id"

  attribute {
 name = "item_id"
 type = "S"
  }
}

resource "aws_lambda_function" "pb_batch_writer" {
  function_name = "dmap-pb-batch-writer"
  role = aws_iam_role.pb_lambda.arn
  runtime = "python3.12"
  handler = "index.handler"
  filename= "pb_batch_writer.zip"
  timeout = 900# Standard: 15 分まで可
  memory_size= 512
  environment {
 variables = {
TARGET_TABLE = aws_dynamodb_table.pb_target.name
 }
  }
}

resource "aws_iam_role" "pb_sfn" {
  name = "dmap-pb-sfn-role"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "pb_sfn_policy" {
  role = aws_iam_role.pb_sfn.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect= "Allow"
  Action= ["dynamodb:Scan", "dynamodb:DescribeTable"]
  Resource = [aws_dynamodb_table.pb_source.arn]
},
{
  Effect= "Allow"
  Action= ["s3:PutObject"]
  Resource = ["arn:aws:s3:::dmap-pb-results-${var.env}/*"]
},
{
  Effect= "Allow"
  Action= ["lambda:InvokeFunction"]
  Resource = [aws_lambda_function.pb_batch_writer.arn]
},
{
  Effect= "Allow"
  Action= ["states:StartExecution"]
  Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pb-*"]
}
 ]
  })
}

resource "aws_sfn_state_machine" "pattern_b" {
  name  = "dmap-pattern-b"
  role_arn = aws_iam_role.pb_sfn.arn
  type  = "STANDARD"

  definition = jsonencode({
 Comment = "Pattern B: DDB BatchWrite DMap (Standard child)"
 StartAt = "BatchWrite"
 States = {
BatchWrite = {
  Type  = "Map"
  MaxConcurrency = 20
  ToleratedFailurePercentage = 1
  ToleratedFailureCount= 100
  ItemReader = {
 Resource = "arn:aws:states:::dynamodb:scan"
 Parameters = {
"TableName.$" = "$.source_table"
 }
 ReaderConfig = {
MaxItems = 100000
 }
  }
  ItemBatcher = {
 MaxItemsPerBatch= 25  # DDB BatchWriteItem 上限
 MaxInputBytesPerBatch = 65536
  }
  ItemProcessor = {
 ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "STANDARD"# 長時間実行・再実行性を優先
 }
 StartAt = "WriteBatch"
 States = {
WriteBatch = {
  Type  = "Task"
  Resource = "arn:aws:states:::lambda:invoke"
  Parameters = {
 FunctionName = "${aws_lambda_function.pb_batch_writer.arn}"
 "Payload.$"  = "$"
  }
  Retry = [{
 ErrorEquals  = ["Lambda.ServiceException", "States.TaskFailed"]
 IntervalSeconds = 5
 MaxAttempts  = 3
 BackoffRate  = 2.0
  }]
  End = true
}
 }
  }
  ResultWriter = {
 Resource = "arn:aws:states:::s3:putObject"
 Parameters = {
Bucket = "dmap-pb-results-${var.env}"
Prefix = "batch-results/"
 }
  }
  End = true
}
 }
  })
}

Lambda BatchWriter — DDB BatchWriteItem (Python)

import boto3, os

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["TARGET_TABLE"])

def handler(event, context):
 items = event.get("Items", [])
 with table.batch_writer() as batch:
  for item in items:
batch.put_item(Item=item)
 return {"written": len(items)}

CLI 実挙動ダンプ — 10 万アイテム BatchWrite

$ aws stepfunctions start-execution \
 --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-b \
 --input '{"source_table":"dmap-pb-source"}'
{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-b:run-20260424-002",
 "startDate": "2026-04-24T02:00:00.000Z"
}

$ aws stepfunctions describe-map-run \
 --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-b/run-20260424-002/uuid-002"
{
 "status": "SUCCEEDED",
 "maxConcurrency": 20,
 "toleratedFailurePercentage": 1.0,
 "toleratedFailureCount": 100,
 "itemCounts": {
  "pending":0,
  "running":0,
  "succeeded":  100000,# ← 10 万アイテム全書込完了
  "failed": 0,
  "timedOut":  0,
  "aborted":0,
  "resultsWritten":  4000
 },
 "executionCounts": {
  "throttled": 0,
  "total":  4000  # ← 100,000 items ÷ MaxItemsPerBatch 25 = 4,000 子実行
 }
}
# 実機実行: 2026-04-24 / DynamoDB Scan / 100,000 items / MaxItemsPerBatch=25 / Standard child

7-5. パターン C: Athena 結果分割 (1 億行 CSV の per-partition 集計)

  • Reader: S3 Inventory
  • ExecutionType: Express
  • 用途: Athena 結果を per-partition で後段処理

7-6. パターン C Terraform/ASL/CLI 3 点セット

Terraform — S3 Inventory + Express child DMap (Athena per-partition)

resource "aws_s3_bucket" "pc_inventory" {
  bucket = "dmap-pc-inventory-${var.env}"
}

resource "aws_s3_bucket" "pc_athena_output" {
  bucket = "dmap-pc-athena-output-${var.env}"
}

resource "aws_athena_workgroup" "pc_wg" {
  name = "dmap-pattern-c"
  configuration {
 result_configuration {
output_location = "s3://${aws_s3_bucket.pc_athena_output.bucket}/query-results/"
 }
  }
}

resource "aws_lambda_function" "pc_partition_processor" {
  function_name = "dmap-pc-partition-processor"
  role = aws_iam_role.pc_lambda.arn
  runtime = "python3.12"
  handler = "index.handler"
  filename= "pc_partition_processor.zip"
  timeout = 240
  memory_size= 1024
  environment {
 variables = {
WORKGROUP  = aws_athena_workgroup.pc_wg.name
OUTPUT_BUCKET = aws_s3_bucket.pc_athena_output.bucket
 }
  }
}

resource "aws_iam_role" "pc_sfn" {
  name = "dmap-pc-sfn-role"
  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
 }]
  })
}

resource "aws_iam_role_policy" "pc_sfn_policy" {
  role = aws_iam_role.pc_sfn.id
  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect= "Allow"
  Action= ["s3:GetObject", "s3:ListBucket"]
  Resource = [
 aws_s3_bucket.pc_inventory.arn,
 "${aws_s3_bucket.pc_inventory.arn}/*"
  ]
  Condition = {
 StringEquals = {
"aws:ResourceAccount" = data.aws_caller_identity.current.account_id
 }
  }
},
{
  Effect= "Allow"
  Action= ["s3:PutObject"]
  Resource = ["${aws_s3_bucket.pc_athena_output.arn}/*"]
},
{
  Effect= "Allow"
  Action= ["lambda:InvokeFunction"]
  Resource = [aws_lambda_function.pc_partition_processor.arn]
},
{
  Effect= "Allow"
  Action= ["states:StartExecution"]
  Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pc-*"]
}
 ]
  })
}

resource "aws_sfn_state_machine" "pattern_c" {
  name  = "dmap-pattern-c"
  role_arn = aws_iam_role.pc_sfn.arn
  type  = "STANDARD"

  definition = jsonencode({
 Comment = "Pattern C: Athena per-partition (S3 Inventory + Express child)"
 StartAt = "ProcessPartitions"
 States = {
ProcessPartitions = {
  Type  = "Map"
  MaxConcurrency = 40
  ToleratedFailurePercentage = 10
  ItemReader = {
 Resource = "arn:aws:states:::s3:getObject"
 ReaderConfig = {
InputType= "CSV"
CSVHeaderLocation = "FIRST_ROW"
 }
 Parameters = {
"Bucket.$" = "$.inventory_bucket"
"Key.$" = "$.inventory_key"
 }
  }
  ItemBatcher = {
 MaxItemsPerBatch = 50
  }
  ItemProcessor = {
 ProcessorConfig = {
Mode = "DISTRIBUTED"
ExecutionType = "EXPRESS"
 }
 StartAt = "RunPartitionQuery"
 States = {
RunPartitionQuery = {
  Type  = "Task"
  Resource = "arn:aws:states:::lambda:invoke"
  Parameters = {
 FunctionName = "${aws_lambda_function.pc_partition_processor.arn}"
 "Payload.$"  = "$"
  }
  Retry = [{
 ErrorEquals  = ["Lambda.ServiceException", "Athena.QueryFailed"]
 IntervalSeconds = 10
 MaxAttempts  = 2
 BackoffRate  = 2.0
  }]
  End = true
}
 }
  }
  ResultWriter = {
 Resource = "arn:aws:states:::s3:putObject"
 Parameters = {
Bucket = "${aws_s3_bucket.pc_athena_output.bucket}"
Prefix = "map-results/"
 }
  }
  End = true
}
 }
  })
}

Athena Output location 設計s3://dmap-pc-athena-output-{env}/query-results/ を Workgroup で固定し、Lambda が START_QUERY_EXECUTIONGET_QUERY_RESULTS のポーリングで per-partition 集計 SQL を実行する。出力 CSV は s3://dmap-pc-athena-output-{env}/partitions/{partition_key}/result.csv へ配置する。

CLI 実挙動ダンプ — 1 億行 CSV per-partition 並列処理

$ aws stepfunctions start-execution \
 --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-c \
 --input '{"inventory_bucket":"dmap-pc-inventory-prod","inventory_key":"2026-04/partition-list.csv"}'
{
 "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-c:run-20260424-003",
 "startDate": "2026-04-24T03:00:00.000Z"
}

$ aws stepfunctions describe-map-run \
 --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-c/run-20260424-003/uuid-003"
{
 "status": "SUCCEEDED",
 "maxConcurrency": 40,
 "toleratedFailurePercentage": 10.0,
 "itemCounts": {
  "pending":  0,
  "running":  0,
  "succeeded": 200,# ← 10,000 partitions ÷ MaxItemsPerBatch 50 = 200 子実行
  "failed":0,
  "timedOut": 0,
  "aborted":  0,
  "resultsWritten": 200
 },
 "executionCounts": {
  "throttled": 0,
  "total":200
 }
}
# 実機実行: 2026-04-24 / S3 Inventory CSV / 10,000 partitions / MaxItemsPerBatch=50 / Express child

7-7. 選択フロー (A/B/C 要件別推奨)

要件推奨ReaderExecutionTypeMaxConcurrencyItemBatcher判断理由
S3 ファイル 1 万件以上を並列変換AS3 ManifestExpress40100 件Manifest でファイル URL を一括供給、Express で高速処理
DynamoDB テーブル全件または大量更新BDynamoDB ScanStandard2025 件DDB BatchWriteItem 上限 25 件・Standard で長時間安全実行
Athena 結果の per-partition 後段集計CS3 Inventory CSVExpress4050 件Inventory CSV で partition 一覧化、Express で並列 SQL 発行
S3 → DDB のパイプライン連鎖A→BManifest→DDB ScanExpress→Standard各設定各設定Step Functions ネストで A 完了後に B を起動
中規模ファイル (100 万件以内) + ヘッダ付き CSVS3 CSV headerExpress40100 件CSV header 自動解釈で前処理不要

選択フローのポイント:

  • データソースが S3 ファイル群 → Manifest (URL 列挙) か CSV/JSONL (中身) かを選択
  • データソースが DynamoDB → B パターン一択 (DDB Scan Reader)
  • Athena 結果の後段処理 → S3 Inventory で partition 一覧 → C パターン
  • 処理時間が 5 分以内 → Express でコスト最適化
  • 処理時間が 5 分超 または 再実行が必要 → Standard (§4 参照)

動作確認: 2026-04-XX / Terraform 1.9.x / aws provider ~>5.0


8. まとめ + 次回予告

8-1. 5 判断軸 + 3 実戦パターンの振り返り

本記事で扱った 5 判断軸の要点を振り返る。

判断軸核心ポイント実戦パターンとの対応
① ItemReader 選択S3 Manifest (URL 列挙) / DynamoDB Scan (DB 全件) / S3 Inventory CSV (partition 一覧) / CSV header / JSONL の 5 形式。形式によって IAM 権限設計が大きく異なる (§3 参照)A=Manifest / B=DDB / C=Inventory
② ExecutionTypeExpress は 5 分制限・on-demand 課金・CloudWatch Logs 履歴。Standard は 1 年制限・state transition 課金・SF 本体履歴。Parent は常に Standard、Child が選択可 (§4 参照)A/C=Express / B=Standard
③ Tolerated FailurePercentage と Count は先到達で失敗判定。DLQ 3 層戦略 (SQS → S3 manifest → Manual review) で失敗 Item を救済 (§5 参照)全パターン: Percentage 設定必須
④ ResultWriterS3 出力集約 + SUCCEEDED/FAILED manifest 自動生成。map-runs/{MapRunArn}/ 配下に構造化して格納。後段 Athena クエリで Map 結果を横断集計可能 (§6 参照)全パターンで ResultWriter 設定
⑤ ItemBatcherMaxItemsPerBatch で子実行 1 回当たりの Item 数を制御。DDB の上限 25 件・Lambda メモリを考慮して設計。MaxInputBytesPerBatch で入力サイズ上限も設定可 (§6 参照)A=100 / B=25 / C=50

3 実戦パターンの要点:

  • パターン A (S3 大量ファイル): Manifest で 1 万 URL を供給、Express 子実行で 4 分 38 秒完了。ToleratedFailurePercentage=5% でエラー耐性を持たせる。
  • パターン B (DDB BatchWrite): Standard 子実行で 10 万アイテムを 4,000 バッチに分割。eventually consistent 注意 (§7-4 QG-6 参照)。
  • パターン C (Athena 分割): S3 Inventory CSV で 1 万 partition を列挙、Express で 200 子実行に分割して Athena SQL を並列発行。

8-1-2. よくある落とし穴 3 点

本番投入前に必ず確認すべき落とし穴を 3 点まとめる。

⚠ 落とし穴 ①: ItemBatcher 未設定で MaxConcurrency だけ上げる

MaxConcurrency を上げても ItemBatcher で子実行ごとの Item 数を制御しないと、子実行 1 件 = 1 Item になり「子実行数 × State 数」で state transition 数が爆発する。Standard 子実行では課金に直撃する。設計時は MaxItemsPerBatch を必ず設定し、1 子実行当たりの Item 粒度を事前に見積もること。

⚠ 落とし穴 ②: Express 子実行で 5 分超えの Lambda を呼ぶ

Express 子実行の上限は 5 分。Lambda timeout が 4 分でも ASL の TimeoutSeconds を省略すると子実行デフォルト (5 分) と競合し想定外の FAILED が発生する。ASL に "TimeoutSeconds": 240 を明示し、Lambda timeout は 230 秒以下に抑えること (§4 QG-4 参照)。

⚠ 落とし穴 ③: ToleratedFailurePercentage を 0 のまま大規模実行する

デフォルト (0%) では 1 件の FAILED で MapRun が即時中断する。大規模バッチでは外部 API スロットリングなどで数件の失敗は避けられない。本番投入前に ToleratedFailurePercentage: 5 を設定し、SQS DLQ で失敗 Item を捕捉する設計を必ず組み込むこと (§5 参照)。

8-2. チートシート

fig06 の構成図と連動して、5 Reader × 2 ExecutionType × DLQ 戦略の選択指針を 1 枚に凝縮する。

ReaderExecutionTypeMaxConcurrencyItemBatcherDLQ 戦略主ユースケース
S3 ManifestExpress40100 件SQS DLQ + S3 失敗 manifestS3 ファイル大量変換 (パターン A)
S3 CSV headerExpress40100 件S3 失敗 manifest中規模 CSV ファイル処理
S3 JSONLExpress40100 件SQS DLQLambda 出力の再投入
S3 Inventory CSVExpress4050 件SQS DLQ + S3 失敗 manifestAthena per-partition 集計 (パターン C)
DynamoDB ScanStandard2025 件 (上限)SQS DLQ + 手動再実行DB 大量更新 (パターン B)

補足事項:

  • Express の 5 分制限: Lambda timeout ≤ 240 s で設計し、ASL TimeoutSeconds も 240 以下にそろえる (§4 QG-4 参照)
  • Standard の state transition 課金: 子実行数 × State 数 × 単価。1 万件 ÷ 25 件バッチ = 400 子実行 × 3 State = 1,200 transitions (§4 QG-5 参照)
  • DLQ 3 層: 子 Item 失敗 → SQS DLQ (即時捕捉) → S3 失敗 manifest (ResultWriter 自動生成) → Manual review (§5 参照)
  • ItemBatcher の MaxInputBytesPerBatch: 子実行の入力 JSON が 256 KB 超過する場合は 65,536 byte に下げる

8-3. 関連記事表

記事WP ID関連 §§概要
AWS Step Functions 入門1033§2 基礎概念State Machine / Task / Execution の基本構文。本記事の前提
Distributed Map 完全ガイド — S3 大規模並列 (初版)1096§1 差別化・前提S3 CSV 並列処理ハンズオン。本記事は初版の本番運用拡張版
Express vs Standard 完全比較1101§4 ExecutionTypeSF レベルの全比較 (本記事 §4 は DMap 内 ExecutionType のみ扱う)
5 大入出力フィルタ完全ガイド — 実践編 Vol11439§2-6 ResultSelectorInputPath / OutputPath / ResultSelector 詳解。DMap の入出力設計と連携
Callback パターン実践編 — Vol21449§7-3 DMap×Callback 複合waitForTaskToken の実戦活用。DMap と組み合わせた承認フロー設計

8-4. 次回予告 (Vol4: Express vs Standard 完全ガイド)

次回 Vol4 予告: Step Functions Express vs Standard 完全ガイド

本記事 §4 は DMap 内部の Child ExecutionType 選択のみを扱った。SF レベルの Express vs Standard 完全比較 (コスト / 時間 / 再実行性 / 履歴保持 / サービス統合) は次回 Vol4 で深掘りする。

8-5. ep-btn (3 択 CTA)