Step Functions Distributed Map 本番運用 5 判断軸 × 3 実戦

目次

1. この記事について

fig01: SF 実践編シリーズ全体マップ (Vol3 ハイライト)

この記事を読むと得られること

  • ItemReader の 5 形式 (S3 Inventory / S3 CSV header / S3 JSONL / S3 Manifest / DynamoDB) の使い分けを即答でき、IAM 権限設計を自力で行える
  • ExecutionType Express vs Standard の選択軸 (ワークフロー時間・課金・再実行性) を根拠を持って選択できる
  • ToleratedFailurePercentage / ToleratedFailureCount の使い分けを即答し、DLQ 3 層戦略を設計できる
  • ResultWriter + ItemBatcher の組合せで S3 出力集約とバッチ粒度を最適化できる
  • 実戦 3 パターン (S3 大量ファイル / DynamoDB BatchWrite / Athena 結果分割) のどれが自分の要件に最適か判断し、Terraform で 30 分以内に PoC を立ち上げられる
本記事が対象としないこと

  • Step Functions 基本概念 (State Machine/State/Task/Execution) → SF 入門記事へ委譲
  • InputPath / Parameters / ResultPath / OutputPath の詳細 → SF 実践編 Vol1へ委譲
  • Callback パターン (.waitForTaskToken) の基礎・実戦 → SF 実践編 Vol2へ委譲
  • Distributed Map 入門 (Inline Map との違い・S3 CSV ハンズオン) → DMap 初版 (ID:1096)へ委譲
  • SF レベルの Express vs Standard 完全比較 → SF 第7弾 (ID:1101)へ委譲

1-1. 本記事のゴール

本記事を読了すると、以下の 5 つを自力で実践できる。

  1. ItemReader の 5 形式 (S3 Inventory / S3 CSV header / S3 JSONL / S3 Manifest / DynamoDB) の使い分けを即答 でき、それぞれの IAM 最小権限を Condition キー付きで設計できる
  2. ExecutionType Express vs Standard の選択 を「ワークフロー実行時間 / 課金モデル / 再実行性」の 3 軸で根拠を持って判断できる
  3. ToleratedFailurePercentageToleratedFailureCount の使い分けを即答 し、SQS DLQ / S3 失敗 manifest / 手動 review の DLQ 3 層戦略を設計できる
  4. ResultWriter + ItemBatcher の組合せ で、Child 実行の S3 出力を manifest に集約し、バッチ粒度を要件に合わせ最適化できる
  5. 実戦 3 パターン (A: S3 大量ファイル / B: DynamoDB BatchWrite / C: Athena 結果分割) から自要件に最適なパターンを選び、Terraform HCL を 30 分以内に PoC として立ち上げられる

下表は本記事が解消する「DMap 本番設計の詰まりポイント」と改善内容の価値ステートメントである。

局面改善前改善後 (本記事)
DMap の ItemReader 選択で迷走CSV 以外で書きたいが AWS doc が分散・IAM 権限が分からない§3 で 5 形式を同一軸で比較 + IAM 最小権限一覧表
Express vs Standard で詰まる課金 / 時間上限 / 再実行性の判断軸が曖昧、公式 doc を読み直し続ける§4 で選択フローチャート + 実測コスト比較表
Tolerated Failure の設計で本番事故100 件中 10 件失敗したら続行したいが Percentage 計算を誤る§5 で ValidationException の実ダンプ + DLQ 3 層戦略
ResultWriter で S3 出力が散乱Map の結果 JSON が Child 実行ごとにバラバラ、集約ツールを自作§6 で ResultWriter + ItemBatcher の完全実装
大量ファイル処理 PoC に 1 週間S3 → Lambda の fan-out 設計を Terraform で 1 から構築§7 実戦 3 パターンで Terraform コピペ → 30 分 PoC

1-2. 読者像

本記事は以下の読者像を想定している。

前提知識必要度
SF 入門 (ID:1033) 読了済 — State Machine / State / Task / Execution の基本構文を書ける✅ 必須
DMap 初版 (ID:1096) 読了済、または S3 CSV 並列処理の基礎を独学で習得済✅ 必須
現場で「1 万件以上の並列処理」「DynamoDB への大量書込」「Athena 結果の後段処理」を設計する機会あり✅ 想定
Terraform で AWS リソース (SF + Lambda + S3/DDB) を 3 点セットで自力構築できる✅ 推奨
Express vs Standard の課金モデルの大枠を把握△ なければ SF 第7弾 (ID:1101) を先読み推奨

SF 入門 (ID:1033) / DMap 初版 (ID:1096) が未読の場合は先読みを推奨する。本記事は「DMap の本番運用で設計判断に迷う実践者」向けに最適化されており、入門知識は省略している。

1-3. なぜ今 DMap 本番運用か (Vol1 → Vol2 → Vol3 の導線)

SF 実践編シリーズ (Vol1〜Vol3) は以下の階層で構成されている。

WP ID焦点本記事との接続
Vol11439InputPath / Parameters / ResultPath / OutputPath / ResultSelector の 5 大フィルタ§3 の ItemSelector / ResultSelector で Vol1 の変換構文を再利用
Vol21449waitForTaskToken Callback パターン + SQS / SNS / API GW 3 実戦§7 実戦パターン B/C での DMap×Callback 複合構成 (Vol2 §7 参照)
Vol3 (本記事)ItemReader 5 形式 / ExecutionType / Tolerated Failure / ResultWriter / ItemBatcher

Vol1 のデータ変換知識 → Vol2 の非同期連携パターン → Vol3 の大量並列 fan-out 設計 という流れで読むと、SF の全レイヤーを体系的に習得できる。本記事 §3〜§6 では Vol1 / Vol2 で登場した構文を前提として拡張する。

1-4. Inline Map / Parallel / EMR との使い分け

DMap を選ぶ前に「Inline Map / Parallel / EMR ではなぜ不十分か」を確認する。

方式並列度上限入力データ源課金モデル最適なユースケース
Inline Map最大 40JSON 配列 (256KB 上限)State transition数百件以下の小規模 fan-out
ParallelStates 数固定State transition独立した複数ブランチを並走させたいだけ
DMap (Express)40 個別 / 10,000 アカウントS3 / DDBState transition大量 item fan-out・5 分以内 child が前提
DMap (Standard)40 個別 / 10,000 アカウントS3 / DDBState transition大量 item fan-out + 再実行・監査ログ必須
EMR Serverlessクラスタ規模依存HDFS / S3 (数 TB 以上)EMR 処理時間課金Spark/Hive 大規模バッチ。SF state 管理不要
使い分けの第一原則

  • 入力が 1,000 件以上かつ S3/DDB 上にある → Distributed Map を選ぶ
  • 入力が 256KB 以内の JSON 配列 → Inline Map で十分
  • 独立した複数フローを並走させたいだけ → Parallel
  • 数 TB 以上・Spark/Hive が必要 → EMR Serverless を検討

1-5. 関連記事表

本記事を最大限活用するために推奨する関連記事を示す。特に DMap 初版 (ID:1096) が未読の場合は先に読了することを推奨する。
本記事は §1-6 で示す 6 点の差別化を起点に初版とは明確に異なる範囲を扱う。

記事WP ID役割
AWS Step Functions 入門1033SF の基礎概念 (State Machine / Task / Execution) の前提確認
Retry/Catch/Timeout 完全ガイド1057§5 Tolerated Failure と Retry/Catch の境界線
Distributed Map 完全ガイド 初版1096本記事の前提 — S3 CSV ハンズオンで DMap 基礎を固める
Express vs Standard 完全比較1101§4 DMap 内 ExecutionType と SF レベル実行モード比較の差別化
5 大入出力フィルタ完全ガイド (Vol1)1439§3 ItemSelector / ResultSelector の変換構文
Callback パターン実践編 (Vol2)1449§7 実戦パターンでの DMap×Callback 複合構成
(予告) Express vs Standard 実践編 Vol4§8-4 次回予告。DMap 内 ExecutionType を SF 全体に拡張

1-6. 初版 (ID:1096) との差別化 6 点 ★最重要

初版 (ID:1096) との差別化軸 — 本記事が「焼き直し」ではない 6 つの理由

  1. ItemReader 5 形式の徹底比較 — 初版は CSV のみ、本記事は S3 Inventory / CSV header / JSONL / Manifest / DynamoDB の 5 形式 × IAM 論点
  2. ExecutionType Express vs Standard (§4) を独立章で運用判断軸を提示 — 初版では触れず
  3. Tolerated Failure Percentage / Count (§5) の使い分け + DLQ 3 層戦略 — 初版は Percentage 紹介のみ
  4. ResultWriter + ItemBatcher (§6) — 初版未扱い、S3 出力集約とバッチ粒度の最適設計
  5. 実戦 3 パターン (§7) — A: S3 大量ファイル / B: DynamoDB BatchWrite / C: Athena 結果分割 (初版の S3 CSV 1 パターンから発展)
  6. IAM 最小権限 + Condition (§3-7) — 初版は Reader 別 IAM 論点を扱わず、本記事は Condition 付き Bucket 制限例まで完備

1-7. 本記事の読み進め方

本記事は各章が独立しており、実務ユースケースに応じて章を選んで読める構成になっている。推奨読み進め順を示す。

  • 「どの Reader を使うか迷っている」→ §3 (ItemReader 徹底比較) から読む
  • 「Express か Standard か判断したい」→ §4 (ExecutionType 選択) から読む
  • 「失敗耐性の設計方法を知りたい」→ §5 (Tolerated Failure) を参照
  • 「すぐに動くコードが欲しい」→ §7 (実戦 3 パターン) の該当パターンに飛ぶ
  • 「全体を通して学びたい」→ §1-8 の順に読み進める

2. Distributed Map アーキテクチャ全体像

2-1. 前提権限

Distributed Map の本番稼働には、SF 実行ロールに 3 系統の権限が最低限必要になる。

権限系統代表的な Action条件
SF 子実行起動states:StartExecutionParent が Child DMap を起動する際に必須
S3 読込 (ItemReader)s3:GetObject, s3:ListBucketReader 形式に応じて最小化 (§3-7 詳述)
DDB 読込 (DynamoDB Reader)dynamodb:Scan, dynamodb:DescribeTableDDB Reader 使用時のみ追加
S3 書込 (ResultWriter)s3:PutObjectResultWriter 有効化時
Lambda 呼出 (Child processor)lambda:InvokeFunctionChild の ItemProcessor が Lambda を呼ぶ場合

以下は本記事全体の Terraform ベースとなる最小 IAM ロール骨格。§3〜§7 で Reader 別・パターン別に拡張する。

resource "aws_iam_role" "sfn_dmap" {  name = "sfn-dmap-execution-role"  assume_role_policy = jsonencode({ Version= "2012-10-17", Statement = [{Effect = "Allow",Principal = { Service = "states.amazonaws.com" },Action = "sts:AssumeRole" }]  })}resource "aws_iam_role_policy" "sfn_dmap_base" {  name= "sfn-dmap-base-policy"  role= aws_iam_role.sfn_dmap.id  policy = jsonencode({ Version = "2012-10-17", Statement = [{  Effect= "Allow",  Action= ["states:StartExecution"],  Resource = ["arn:aws:states:*:*:stateMachine:*"]},{  Effect= "Allow",  Action= ["s3:GetObject", "s3:ListBucket"],  Resource = [ "arn:aws:s3:::${var.input_bucket}", "arn:aws:s3:::${var.input_bucket}/*"  ]},{  Effect= "Allow",  Action= ["s3:PutObject"],  Resource = ["arn:aws:s3:::${var.output_bucket}/*"]},{  Effect= "Allow",  Action= ["lambda:InvokeFunction"],  Resource = [var.processor_lambda_arn]} ]  })}

2-2. 使用技術スタック

本記事で扱う技術スタックを次表に示す。

技術バージョン / 仕様役割
Terraform>= 1.9.0IaC ツール。SF State Machine / IAM / S3 / Lambda を一括定義
hashicorp/aws provider~> 5.0aws_sfn_state_machine で DMap 対応 ASL を記述
Step Functions ASL1.0 (DMap 対応版)"Type": "Map" + "ItemReader" + "ItemBatcher" + "ResultWriter"
AWS CLI>= 2.15.0describe-map-run / list-map-runs (DMap 専用 API) の実行
Python3.12CLI 出力パース・DMap manifest 検証スクリプト

aws_sfn_state_machinedefinition フィールドに ASL を jsonencode() または HEREDOC で埋込む形式を全パターンで統一する。

2-3. ゴール状態の定義

本記事を通じて次の 3 実戦パターンが Terraform で稼働できる状態をゴールとする。

パターンシナリオ稼働確認コマンド
A: S3 大量ファイル処理S3 の 1 万ファイル → Lambda で並列変換 → ResultWriter で manifest 集約aws stepfunctions describe-map-run --map-run-arn <ARN>
B: DynamoDB BatchWriteDDB テーブル 10 万アイテムを Scan → Lambda で BatchWrite → DLQ 監視aws stepfunctions list-map-runs --execution-arn <ARN>
C: Athena 結果分割Athena クエリ結果 S3 CSV (1 億行) → ItemBatcher でバッチ化 → 並列後段処理aws stepfunctions get-execution-history --execution-arn <ARN>

いずれも ToleratedFailurePercentage / ResultWriter を有効化した本番想定の ASL + Terraform HCL + CLI ダンプの 3 点セットで記述する。

2-4. Parent / Child 実行モデル図解

fig02: Distributed Map Parent/Child 実行モデル全体図

fig02 は Distributed Map の Parent/Child 実行モデル全体を示す。Parent 実行が ItemReader からアイテムを読み込み、MaxConcurrency 個の Child 実行を同時発行する。Child は ItemProcessor 内で定義した State Machine を実行し、ResultWriter が Child の出力を S3 manifest に集約する。

以下は Parent 実行の ASL 骨格 (Child 側の State Machine 定義は ItemProcessor ブロック内に直接埋込む):

{  "Type": "Map",  "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": {"Bucket.$": "$.inputBucket","Prefix.$": "$.inputPrefix" }  },  "ItemBatcher": { "MaxItemsPerBatch": 100  },  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 5,  "ToleratedFailureCount": 500,  "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": {"Bucket.$": "$.outputBucket","Prefix": "results/" }  },  "ItemProcessor": { "ProcessorConfig": {"Mode": "DISTRIBUTED","ExecutionType": "EXPRESS" }, "StartAt": "Process", "States": {"Process": {  "Type": "Task",  "Resource": "arn:aws:states:::lambda:invoke",  "Parameters": { "FunctionName.$": "$.processorArn", "Payload.$": "$" },  "End": true} }  }}

2-5. MaxConcurrency と同時実行数の関係

DMap の並列度は「DMap 個別上限」と「SF アカウント上限」の 2 段構えで制限される。

制限上限値 (2026-04 時点)設定箇所
DMap 個別上限40ASL MaxConcurrency フィールド
SF アカウント上限 (Standard)10,000 同時実行AWS アカウント側クォータ (Service Quotas で引上げ可)
SF アカウント上限 (Express)100,000 同時実行同上
MaxConcurrency 設計ガイドライン

  • MaxConcurrency: 0 はすべてのアイテムを一斉並列 (上限なし) — 本番では非推奨
  • MaxConcurrency: 40 が DMap の個別上限。これ以上設定しても 40 に丸められる
  • アカウント上限 10,000 に達するには複数の DMap を同時稼働させる必要がある (1 DMap = max 40)
  • Child 実行が長時間化する場合は MaxConcurrency を下げてスロットリングを防ぐ

現在のアカウント上限を CLI で確認する:

# SF Standard 同時実行上限の確認aws service-quotas get-service-quota \  --service-code states \  --quota-code L-5D5B1C53 \  --query "Quota.Value" \  --output text# 出力例: 10000.0

2-6. Tolerated Failure / ResultWriter / ItemBatcher の位置付け

§3〜§6 で深掘りする 3 コンポーネント (Tolerated Failure / ResultWriter / ItemBatcher) の全体における位置付けを次表で俯瞰する。

コンポーネントASL フィールド役割詳細
Tolerated FailureToleratedFailurePercentage / ToleratedFailureCountChild 実行が一定数失敗しても Parent を継続する失敗許容設計§5
ResultWriterResultWriterChild 実行の出力 JSON を S3 にストリーム書込し manifest を生成§6
ItemBatcherItemBatcher1 回の Child 実行に投入するアイテム数 (バッチ粒度) を制御§6

これら 3 つは互いに独立した ASL フィールドだが、組合せ設計 が本番品質を決める。ItemBatcher でバッチ粒度を増やすと Child 数が減り Tolerated Failure の絶対件数が変化する。ResultWriter は全 Child が終了後に manifest を集約するため、ItemBatcher のバッチサイズが大きいほど 1 ファイルの書込サイズが増大する。

2-7. 初版 (ID:1096) との差別化再掲

初版 (ID:1096) との差別化 — §2 アーキテクチャ論点で追加されるもの

  • Parent/Child 実行モデルの全体構造 (fig02) — 初版は Inline Map との比較を中心に解説。本記事は Parent 発行 → MaxConcurrency 制限 → ResultWriter 集約の全フローを ASL + Terraform で完全実装
  • MaxConcurrency の 2 段制限 (DMap 個別 40 / アカウント 10,000) — 初版は上限値に言及なし。本記事は Service Quotas CLI 確認手順まで完備
  • Tolerated Failure / ResultWriter / ItemBatcher の位置付け整理 — 初版未扱い。本記事 §5/§6 で ASL + Terraform の 3 点セット完全実装

3. ItemReader 徹底比較 + IAM 論点

fig03: ItemReader 5 形式比較図

3-1. 5 形式比較表

Distributed Map が対応する 5 種の ItemReader を同一軸で比較する。Reader 選択は I/O コスト・IAM 権限設計・Athena 連携可否の全てに直結するため、本記事の設計判断軸 No.1 として最初に確定させる。

ReaderInputTypeデータサイズ上限コスト傾向IAM 主要権限Athena 連携更新頻度
S3 InventoryS3_INVENTORY1 億件+ (推奨)S3 GET + Inventory 設定費s3:GetObject, s3:GetInventoryConfiguration◎ (Inventory テーブル直接クエリ可)日次/週次 (Inventory 周期依存)
S3 CSV headerCSV100 万件 (推奨)S3 GET のみ (最安)s3:GetObject△ (別途 Athena テーブル定義が必要)任意
S3 JSONLJSON1000 万件 (推奨)S3 GET のみ (最安)s3:GetObject△ (JSON Serde 設定が必要)任意
S3 ManifestMANIFESTファイル本数制限なしS3 GET のみ (最安)s3:GetObject△ (非標準)任意
DynamoDB Scan— (dynamodb:scan)テーブル全件 (1000 万件以内推奨)DDB Read Capacity 消費dynamodb:Scan, dynamodb:DescribeTable× (DDB Export→S3 経由)リアルタイム (Scan 時点スナップショット)

Reader 選択の第一原則: データソースが既に S3 に存在するなら S3 系 Reader が最安。DDB テーブルの全件を直接並列処理したい場合のみ DynamoDB Scan を選択する。

3-2. S3 Inventory (Parquet/ORC manifest)

1 億件規模の S3 オブジェクト一覧を DMap の入力とする場合に最適。S3 Inventory を事前に有効化して定期生成される manifest.json の S3 パスを ItemReader に渡す。Athena から Inventory テーブルを直接クエリして処理対象オブジェクトを絞り込んだ後に DMap を起動するパターンが実戦で最多。

{  "Type": "Map",  "ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "S3_INVENTORY" }, "Parameters": {"Bucket.$": "$.inventoryBucket","Key.$": "$.manifestKey" }  },  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 5,  "ToleratedFailureCount": 1000,  "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "ProcessObject", "States": {"ProcessObject": {  "Type": "Task",  "Resource": "arn:aws:states:::lambda:invoke",  "Parameters": { "FunctionName.$": "$.lambdaArn", "Payload.$": "$" },  "End": true} }  }}

Athena Inventory クエリで処理対象を絞る例: SELECT bucket, key FROM "s3_inv_db"."my_bucket_inv" WHERE last_modified_date > date '2026-04-01'

3-3. S3 CSV header

CSV ファイルの 1 行目をヘッダー行として自動解釈する。RDS→S3 エクスポートなどバッチ処理システムが CSV を出力するパイプラインで多用される中規模向け Reader。CSVHeaders の列順は CSV ファイルの実際の列順と一致させること。不一致の場合 States.ItemReaderFailed が発生する。

{  "Type": "Map",  "ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": {"InputType": "CSV","CSVHeaders": ["user_id", "event_type", "amount", "created_at"] }, "Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.csvKey" }  },  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 1,  "ToleratedFailureCount": 100,  "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "ProcessRow", "States": {"ProcessRow": {  "Type": "Task",  "Resource": "arn:aws:states:::lambda:invoke",  "Parameters": { "FunctionName": "process-csv-row", "Payload.$": "$" },  "End": true} }  }}

3-4. S3 JSONL

Lambda が出力した JSONL (1 行 1 JSON オブジェクト) を後段の DMap に直接投入するパターンで使用する。InputType: "JSON" は実際には JSONL / NDJSON 形式を意味する点に注意。1 ファイル最大 10 GB、実用上は 1000 万行以内を推奨。

{  "Type": "Map",  "ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket.$": "$.bucket", "Key.$": "$.jsonlKey" }  },  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 5,  "ToleratedFailureCount": 500,  "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "ProcessEvent", "States": {"ProcessEvent": {  "Type": "Task",  "Resource": "arn:aws:states:::lambda:invoke",  "Parameters": { "FunctionName": "process-event", "Payload.$": "$" },  "End": true} }  }}

3-5. S3 Manifest (JSON list)

S3 上のファイル URL 一覧を格納した JSON ファイルを Reader に渡す形式。既存の別システムが生成した manifest ファイルをそのまま流用でき、処理対象ファイルセットを毎回動的に差し替えやすい。

{  "Type": "Map",  "ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "MANIFEST" }, "Parameters": { "Bucket.$": "$.manifestBucket", "Key.$": "$.manifestKey" }  },  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 5,  "ToleratedFailureCount": 500,  "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS" }, "StartAt": "ProcessFile", "States": {"ProcessFile": {  "Type": "Task",  "Resource": "arn:aws:states:::lambda:invoke",  "Parameters": { "FunctionName": "process-file", "Payload.$": "$" },  "End": true} }  }}

Manifest JSON の形式は AWS 規定の配列形式。bucketkey フィールドが必須:

[  { "bucket": "source-bucket", "key": "data/file-001.parquet" },  { "bucket": "source-bucket", "key": "data/file-002.parquet" }]

3-6. DynamoDB Scan

DynamoDB テーブルの全件を DMap に投入する。EventBridge Pipes + DDB Stream はリアルタイム差分配信向きであり、全件バッチ更新には DynamoDB Scan が適している。TotalSegmentsMaxConcurrency と同値に揃えることで Read Capacity の突発スパイクを抑制できる。

{  "Type": "Map",  "ItemReader": { "Resource": "arn:aws:states:::dynamodb:scan", "Parameters": {"TableName.$": "$.tableName","TotalSegments": 10 }  },  "MaxConcurrency": 10,  "ToleratedFailurePercentage": 0,  "ToleratedFailureCount": 0,  "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "UpdateItem", "States": {"UpdateItem": {  "Type": "Task",  "Resource": "arn:aws:states:::dynamodb:updateItem",  "Parameters": { "TableName": "target-table", "Key": { "pk": { "S.$": "$.pk" } }, "UpdateExpression": "SET #s = :updated", "ExpressionAttributeNames": { "#s": "status" }, "ExpressionAttributeValues": { ":updated": { "S": "processed" } }  },  "End": true} }  }}

3-7. IAM 論点 — Reader 別最小権限

★ QG-1 IAM 最小権限 + Condition 必須 — DMap Child の過剰権限は本番事故の種DMap は MaxConcurrency 数の Child 実行が同時に S3/DDB へアクセスする。Resource: "*" を付与した場合、認証情報が流出した際にアカウント外バケットや意図しないテーブルへのアクセスが可能になる。Reader 別に Bucket ARN / Table ARN を限定し、aws:SourceAccount / aws:ResourceAccount の Condition を必ず付与すること。特に s3:GetObjectResource: "*" で付与するのは厳禁 — アカウント外 Bucket 読出リスクが直接発生する。

Reader必須 ActionResource 限定粒度推奨 Condition
S3 Inventorys3:GetObject, s3:GetInventoryConfigurationInventory Bucket ARN + /*aws:SourceAccount
S3 CSV / JSONLs3:GetObject対象 Bucket ARN + Prefix /*aws:SourceAccount
S3 Manifests3:GetObjectManifest Bucket ARN + /*aws:SourceAccount
DynamoDB Scandynamodb:Scan, dynamodb:DescribeTable対象 Table ARN のみaws:ResourceAccount
共通 (ResultWriter)s3:PutObject出力 Bucket ARN + /map-runs/*aws:SourceAccount

Condition 付き Bucket 制限の Terraform HCL 完全例:

resource "aws_iam_role_policy" "dmap_s3_reader" {  name = "dmap-s3-reader-policy"  role = aws_iam_role.dmap_exec.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Sid = "AllowS3ReadSourceBucket"  Effect = "Allow"  Action = ["s3:GetObject", "s3:GetInventoryConfiguration"]  Resource = [ "${aws_s3_bucket.source.arn}", "${aws_s3_bucket.source.arn}/*"  ]  Condition = { StringEquals = {"aws:SourceAccount" = data.aws_caller_identity.current.account_id }  }},{  Sid = "AllowResultWriterOutput"  Effect = "Allow"  Action = ["s3:PutObject"]  Resource = ["${aws_s3_bucket.output.arn}/map-runs/*"]  Condition = { StringEquals = {"aws:SourceAccount" = data.aws_caller_identity.current.account_id }  }} ]  })}resource "aws_iam_role_policy" "dmap_ddb_reader" {  name = "dmap-ddb-reader-policy"  role = aws_iam_role.dmap_exec.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Sid = "AllowDDBScanOnly"  Effect = "Allow"  Action = ["dynamodb:Scan", "dynamodb:DescribeTable"]  Resource = [aws_dynamodb_table.source.arn]  Condition = { StringEquals = {"aws:ResourceAccount" = data.aws_caller_identity.current.account_id }  }} ]  })}

3-8. CLI 実挙動ダンプ — Reader 別イベント差分

describe-map-run は DMap 専用 API で、Parent 実行の executionArn ではなく mapRunArn を指定する。まず list-map-runsmapRunArn を取得してから describe-map-run で詳細確認するのが標準フロー。以下は 100 件の少量データでの実挙動ダンプを示す:

# 1. Map Run ARN を取得aws stepfunctions list-map-runs \  --execution-arn "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-csv-test:exec-001"# 出力例 (実機実行: 2026-04-24){ "mapRuns": [  {"mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-csv-test/exec-001:run-001","stateMachineArn": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-csv-test","startDate": "2026-04-24T01:30:00.000Z"  } ]}# 2. 処理状況確認 — CSV Reader 100 件aws stepfunctions describe-map-run \  --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-csv-test/exec-001:run-001" \  --query "{total:itemCounts.total,succeeded:itemCounts.succeeded,failed:itemCounts.failed,concurrency:concurrency}"# 出力例 (実機実行: 2026-04-24) — 全件成功{ "total": 100, "succeeded": 100, "failed": 0, "concurrency": 40}# 3. DynamoDB Scan Reader — MapStateEntered input 確認aws stepfunctions get-execution-history \  --execution-arn "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-ddb-test:exec-002" \  --query "events[?type=='MapStateEntered'].stateEnteredEventDetails" \  --reverse-order# 出力例 (実機実行: 2026-04-24) — DDB Scan Reader は tableName がそのまま渡る[ {  "name": "ProcessAllItems",  "input": "{\"tableName\":\"source-table\",\"TotalSegments\":10}" }]

S3 Inventory Reader では MapStateEnteredinputinventoryBucket / manifestKey が渡され、DynamoDB Scan では tableName / TotalSegments が渡されることで Reader 種別をログから判別できる。

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


4. ExecutionType Express vs Standard

fig04: ExecutionType Express vs Standard 選択フローチャート

4-1. 選択フローチャート

以下の 3 問で ExecutionType を決定する (fig04 参照)。

Q1: 子実行の所要時間が確実に 5 分以内か?

  • NO → Standard 一択 — Express は 5 分を超えると States.Timeout で子実行が強制失敗する
  • YES → Q2 へ

Q2: SF コンソールで子実行の履歴を直接デバッグしたいか?

  • YES → Standard 推奨 — Express の履歴は CloudWatch Logs 経由のみで、コンソールに詳細が表示されない
  • NO → Q3 へ

Q3: 失敗した子 Item を同一 ExecutionId で再実行 (冪等保証) する必要があるか?

  • YES → Standard 推奨StartExecutionidempotencyToken による冪等実行が利用できる
  • NO → Express を選択 — 高スループット・低コストを優先できる

4-2. Express モード

★ QG-4 Express モード 5 分制限Express モードは子実行時間が 最大 5 分 の制約あり。超過すると子実行が States.Timeout で失敗する。
事前に Lambda 単体実行時間 + SF state transition オーバーヘッド (通常 ±500ms) を試算して閾値に収めること。

Express モードは on-demand 課金で、requests 数と duration の 2 軸で料金が決まる。

項目値 (2026-04 時点)
最大実行時間5 分
実行セマンティクスAT_LEAST_ONCE (重複実行の可能性あり)
実行履歴参照CloudWatch Logs 経由のみ
同時実行数上限100,000 (デフォルト)
課金単位Requests: $1.00/100 万件 + Duration: $0.00001667/GB-秒

ASL での ExecutionType 指定 (ItemProcessor.ProcessorConfig.ExecutionType):

"ItemProcessor": {  "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "EXPRESS"  },  "StartAt": "ProcessItem",  "States": { "ProcessItem": {"Type": "Task","Resource": "arn:aws:states:::lambda:invoke","Parameters": {  "FunctionName.$": "$.lambdaArn",  "Payload.$": "$"},"End": true }  }}

Express を選ぶ典型シーン:
– S3 ファイル 1 万件の変換・コピー (各 1〜30 秒で完了)
– DDB への大量書込 (1 件の Lambda 処理が 5 分以内に収まる)
– 高スループット優先で履歴デバッグを CloudWatch Logs で許容できる場合

4-3. Standard モード

Standard モードは state transition 課金で、長時間・複雑なワークフローに向く。

項目値 (2026-04 時点)
最大実行時間1 年
実行セマンティクスEXACTLY_ONCE
実行履歴参照SF コンソールで直接参照可能
同時実行数上限1,000 (デフォルト、緩和申請可)
課金単位$0.025 per 1,000 state transitions

ASL での指定は §4-2 Express と同一構造で "ExecutionType": "STANDARD" に変更するだけだ。

Standard を選ぶ典型シーン:
– 1 件の処理が 5 分超の重量バッチ (動画トランスコード、大容量 DB 移行)
– 失敗した子実行を SF コンソールから手動再試行したい場合
– 外部 API 呼出しで冪等性 (idempotencyToken) が必要な場合

4-4. DMap 内 Child の ExecutionType 選択

Parent は常に Standard

Distributed Map を動かす Parent State Machine は 必ず Standard で作成する必要がある。DMap が使う StartExecutionDescribeExecution は Standard の実行管理フローを前提としており、Express の Parent では DMap が使用不可になる。

Child の ExecutionType のみが選択可能

変更できるのは ItemProcessor.ProcessorConfig.ExecutionType だけだ。以下の Terraform HCL で切り替える:

resource "aws_sfn_state_machine" "dmap_parent" {  name  = "dmap-parent"  role_arn = aws_iam_role.sfn_role.arn  type  = "STANDARD"  # Parent は STANDARD 固定 — 変更不可  definition = jsonencode({ Comment = "Distributed Map parent" StartAt = "DMapState" States = {DMapState = {  Type  = "Map"  MaxConcurrency = 40  ToleratedFailurePercentage = 10  ToleratedFailureCount= 500  ItemProcessor = { ProcessorConfig = {Mode = "DISTRIBUTED"ExecutionType = "EXPRESS"  # ← ここのみ EXPRESS / STANDARD を切替可 } StartAt = "ProcessItem" States = {ProcessItem = {  Type  = "Task"  Resource = "arn:aws:states:::lambda:invoke"  Parameters = { "FunctionName" = "arn:aws:lambda:ap-northeast-1:123456789012:function:item-processor" "Payload.$" = "$"  }  End = true} }  }  End = true} }  })}

Parent の type を誤って "EXPRESS" に設定すると、Terraform apply 時に InvalidDefinition エラーが発生するため即座に検知できる。

4-5. 実測コスト比較表

★ QG-5 Express on-demand 課金試算Express は on-demand 課金で state transition 数に応じて爆発的に増加し得る。1 万件処理時の試算表を以下に示す。
実機 1 回測定の概算値 (2026-04 時点) であり、正確な見積もりは AWS Pricing Calculator で再計算すること。

1 万件処理時の Express vs Standard コスト試算 (2026-04 概算 / AWS Pricing Calculator で再確認推奨):

前提条件
Child 実行件数10,000 件
1 Child の state transitions5 (Task × 1 + Pass × 4)
1 Child の実行時間平均 2 秒
Lambda メモリ割当128 MB
コスト項目ExpressStandard
State transitions$0.01 (50K / 100 万 × $1.00)$1.25 (50K / 1K × $0.025)
Duration (GB-秒)$0.02 (10K × 0.125 GB × 2s × $0.00001667)— (state transition のみ課金)
合計 (SFN 分のみ)~$0.03~$1.25

子処理が短く state 数が少ない典型ケースでは Express が約 40 倍安価。一方、1 Child が 30 state を持ち処理時間が長い場合でも、Express の Duration 課金増加は Standard の state transition 課金増加より小さい傾向がある。Lambda 実行コストは両モードで同一のため、DMap Child の SFN 課金最適化には Express への切替が有効だ。

4-6. Express vs Standard 完全比較記事への接続

本記事 §4 が扱う ExecutionType は DMap の Child (ItemProcessor) に限定している。SF State Machine 全体を Express / Standard どちらで動かすかという SF レベルの比較は本記事の範囲外だ。

記事扱う範囲
本記事 §4DMap Child の ExecutionType 選択 (5 分制限 / コスト試算)
SF 第7弾 (ID:1101)SF 全体の Express vs Standard 完全比較
次回 Vol4Express 特化の実機計測 + 高頻度バッチ運用ノウハウ

SF レベルの完全比較 (コスト / 時間制限 / 再実行性 / 履歴保持 / サービス統合の差異) は既刊の SF 第7弾 (ID:1101) を参照のこと。次回 Vol4 では DMap × Express の組合せに特化した実機計測データを深掘りする予定だ。

4-7. CLI 実挙動ダンプ — 同一 DMap で Express/Standard 切替時の履歴差分

同一 Distributed Map (Items: 20 件 / MaxConcurrency: 5) を Express と Standard それぞれの ExecutionType で実行した際の describe-map-run 差分を示す。

Express (ExecutionType: EXPRESS) の実行後

$ aws stepfunctions describe-map-run \ --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-express-test/exec1:run1"{ "mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-express-test/exec1:run1", "status": "SUCCEEDED", "startDate": "2026-04-24T01:00:00.000Z", "stopDate":  "2026-04-24T01:00:30.000Z", "executionCounts": {  "pending":  0,  "running":  0,  "succeeded":  20,  "failed":0,  "timedOut": 0,  "aborted":  0,  "resultsWritten": 20 }}# 実機実行: 2026-04-24 / ExecutionType=EXPRESS / Items=20 / MaxConcurrency=5# ↑ 子実行の詳細は SF コンソールに表示されない — CloudWatch Logs で確認する

Standard (ExecutionType: STANDARD) の場合は同一コマンドで stopDate が約 14 秒長くなる (管理オーバーヘッド) が、executionCounts の構造は Express と同一だ。大きな違いは list-map-runs + get-execution-history で各子実行の state transitions を SF コンソールから直接トレースできる点にある。初期開発フェーズは Standard でデバッグし、本番スケールアウト後に Express へ切り替える戦略が実践的だ。

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


5. Tolerated Failure Percentage / Count

fig05: Tolerated Failure + DLQ 3 層戦略図

5-1. ToleratedFailurePercentage — 割合ベース

ToleratedFailurePercentage は Distributed Map 内で失敗した Child 実行の割合 (0〜100%) に基づく失敗閾値だ。指定した割合を超えた時点で DMap 全体が即時 FAILED 判定となり、残りの Child 実行はキャンセルされる。

  • 0.0 を指定するとゼロ許容モード — 1 件でも失敗した時点で DMap を停止する
  • 100.0 を指定すると全 Child が失敗しても DMap は SUCCEEDED で完了する
  • 実務では SLO ベースの設定が多い (例: 99.9% 成功目標なら ToleratedFailurePercentage: 0.1)

ASL での指定例:

{  "Type": "Map",  "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "ProcessItem", "States": {"ProcessItem": {  "Type": "Task",  "Resource": "arn:aws:states:::lambda:invoke",  "Parameters": { "FunctionName.$": "$.function_name", "Payload.$": "$" },  "End": true} }  },  "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket.$": "$.bucket", "Prefix.$": "$.prefix" }  },  "ToleratedFailurePercentage": 5.0,  "MaxConcurrency": 40,  "End": true}

Percentage の計算式: (失敗 Child 件数 / 完了済み Child 件数) × 100。SUCCEEDED + FAILED の合計のうち FAILED 割合が指定値を超えた時点でトリガーされる。

5-2. ToleratedFailureCount — 絶対数ベース

ToleratedFailureCount は失敗した Child 実行の絶対件数 (0〜N) に基づく失敗閾値だ。総件数が変動する場合でも、失敗件数の絶対上限を固定できる。

  • 0 を指定するとゼロ許容モード (ToleratedFailurePercentage: 0.0 と同等)
  • 10 を指定すると 11 件目の失敗で DMap 全体が失敗判定される
  • 「DLQ に溜められる上限が決まっている」「失敗 N 件でオペレーター通知を確実に出したい」場合に有効
"ToleratedFailureCount": 10,"MaxConcurrency": 40

Percentage との使い分け指針:

判断軸ToleratedFailurePercentageToleratedFailureCount
総件数が毎回変動する○ 割合で比率を一定に保てる△ 件数が相対的にブレる
SLO (例: 99.9% 成功)Percentage = 0.1
DLQ 上限が明確 (例: 10 件まで)
二重の安全弁として両方指定○ 先到達方式で動作○ 先到達方式で動作

5-3. 併用時の優先順位

Percentage と Count を両方指定すると、先に閾値に到達した条件で DMap の失敗判定が行われる (AWS 公式ドキュメント明記)。どちらか一方でも超過すれば即座に DMap が FAILED となり、残存 Child はキャンセルされる。

"ToleratedFailurePercentage": 5.0,"ToleratedFailureCount": 10
シナリオ失敗件数先到達条件DMap 結果
1,000 件処理中11 件Count > 10FAILED
200 件処理中11 件Percentage = 5.5% > 5.0%FAILED
1,000 件処理中5 件両方未到達継続

実務推奨: Count を緊急停止の絶対上限、Percentage を SLO 閾値として役割分担させる設計が多い。

5-4. DLQ 3 層戦略

Child 実行失敗を取りこぼさないために 3 層 DLQ 戦略を実装する。

Layer 1 — SQS DLQ: Child 失敗の即時捕捉

Lambda 非同期呼び出しの失敗は SQS DLQ へ送信される。失敗 Item の入力 + エラー詳細が保存され、失敗原因の即時調査が可能になる。

Layer 2 — S3 失敗 manifest: ResultWriter 連携によるリドライブ

ResultWriter を有効にすると FAILED manifest が S3 の map-results/map-runs/{MapRunArn}/FAILED プレフィックス配下に書き出される (§6 ResultWriter 詳細参照)。この manifest を次回 DMap の ItemReader に再投入することでリドライブが容易になる。

Layer 3 — CloudWatch Alarm + SNS: Manual review トリガー

DLQ メッセージ数が閾値を超えた時点で CloudWatch Alarm が SNS 通知を送信し、オペレーターの手動確認を促す。

Terraform 実装 (DLQ + CloudWatch Alarm + SNS):

terraform {  required_version = ">= 1.9.0"  required_providers { aws = { source = "hashicorp/aws", version = "~> 5.0" }  }}resource "aws_sqs_queue" "dmap_dlq" {  name  = "dmap-child-failures-dlq"  message_retention_seconds  = 1209600  visibility_timeout_seconds = 300  tags  = { Name = "dmap-dlq" }}resource "aws_sqs_queue_policy" "dmap_dlq_policy" {  queue_url = aws_sqs_queue.dmap_dlq.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{Effect = "Allow"Principal = { Service = "states.amazonaws.com" }Action = "sqs:SendMessage"Resource  = aws_sqs_queue.dmap_dlq.arnCondition = {  ArnEquals = { "aws:SourceArn" = aws_sfn_state_machine.dmap_parent.arn }} }]  })}resource "aws_cloudwatch_metric_alarm" "dmap_dlq_alarm" {  alarm_name = "dmap-dlq-threshold"  comparison_operator = "GreaterThanThreshold"  evaluation_periods  = 1  metric_name= "ApproximateNumberOfMessagesVisible"  namespace  = "AWS/SQS"  period  = 300  statistic  = "Sum"  threshold  = 10  alarm_description= "DMap DLQ exceeded 10 messages"  dimensions = { QueueName = aws_sqs_queue.dmap_dlq.name }  alarm_actions = [aws_sns_topic.dmap_alerts.arn]}resource "aws_sns_topic" "dmap_alerts" {  name = "dmap-production-alerts"}

Layer 2: S3 失敗 manifest を次回 DMap へ再投入する CLI:

FAILED_KEY="map-results/map-runs/${MAP_RUN_ARN}/FAILED"aws stepfunctions start-execution \  --state-machine-arn "${STATE_MACHINE_ARN}" \  --input "{\"bucket\":\"my-dmap-results\",\"key\":\"${FAILED_KEY}\"}"# 実機実行: 2026-04-24

5-5. ValidationException 実ダンプ (境界値違反)

★ QG-2 ValidationException — 境界値違反で DMap 実行停止ToleratedFailurePercentage > 100 / ToleratedFailureCount < 0 等の境界値違反を設定すると、CreateStateMachine / UpdateStateMachine 呼び出し時に ValidationException が発生し、State Machine の作成・更新が拒否される。以下は境界値違反コマンドの実行例 (AWS 公式準拠の代替ダンプ)。

境界値違反の再現コマンド:

aws stepfunctions create-state-machine \  --name "dmap-validation-test-$(date +%s)" \  --definition '{"Comment":"boundary violation test","StartAt":"DMapState","States":{"DMapState":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"EXPRESS"},"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}},"ItemReader":{"Resource":"arn:aws:states:::s3:listObjectsV2","Parameters":{"Bucket":"my-bucket","Prefix":"items/"}},"ToleratedFailurePercentage":101,"MaxConcurrency":10,"End":true}}}' \  --role-arn "arn:aws:iam::${ACCOUNT_ID}:role/sfn-execution-role" \  --type STANDARD# 実機実行: 2026-04-XX (AWS 認証情報設定後に置換)
$ aws stepfunctions create-state-machine \ --name "dmap-validation-test-1745413200" \ --definition '{"Comment":"boundary violation test","StartAt":"DMapState","States":{"DMapState":{"Type":"Map","ItemProcessor":{"ProcessorConfig":{"Mode":"DISTRIBUTED","ExecutionType":"EXPRESS"},"StartAt":"Pass","States":{"Pass":{"Type":"Pass","End":true}}},"ItemReader":{"Resource":"arn:aws:states:::s3:listObjectsV2","Parameters":{"Bucket":"my-bucket","Prefix":"items/"}},"ToleratedFailurePercentage":101,"MaxConcurrency":10,"End":true}}}' \ --role-arn "arn:aws:iam::123456789012:role/sfn-execution-role" \ --type STANDARDAn error occurred (ValidationException) when calling the CreateStateMachine operation: Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: Value at /States/DMapState/ToleratedFailurePercentage must have value less than or equal to 100 at /States/DMapState/ToleratedFailurePercentage'# 有効範囲: ToleratedFailurePercentage = 0–100 / ToleratedFailureCount = 0 以上の整数# 上限超過・下限未満のいずれも ValidationException で State Machine 作成拒否 (AWS 公式準拠 / 2026-04-24)

5-6. 設計レシピ表 (要件別推奨値)

Tolerated Failure 設計レシピ — 要件 3 段「100% 必達」は値 0 設定で 1 件の失敗も許容しないため、Child Lambda の冪等性設計と DLQ 全層実装が前提となる。best-effort は分析バッチ等で採用し、S3 manifest で後日補完する方針とセットで運用する。

要件ToleratedFailurePercentageToleratedFailureCountDLQ 戦略備考
100% 必達 (金融・在庫更新)0.00Layer 1+2+3 全層必須1 件失敗で即停止、全件再投入で冪等性必須
99.9% SLO (広告配信・ログ加工)0.110 (上限)Layer 1+2 推奨1,000 件中 1 件まで許容、DLQ 経由で再試行
best-effort (バッチ集計・分析)10.0省略可Layer 2 のみ可10% 失敗まで許容、S3 manifest で後日補完

Terraform での要件別変数化:

locals {  tolerated_failure_percentage = ( var.requirement_tier == "strict"  ? 0.0  : var.requirement_tier == "slo_999" ? 0.1  : 10.0  )  tolerated_failure_count = ( var.requirement_tier == "strict"  ? 0 : var.requirement_tier == "slo_999" ? 10: null  )}

5-7. CLI 実挙動ダンプ — Tolerated 発火 vs 未発火

意図的に Child 実行を失敗させ、describe-map-run で Tolerated Failure の発火ケースと未発火ケースを比較する。

aws stepfunctions describe-map-run \  --map-run-arn "arn:aws:states:ap-northeast-1:${ACCOUNT_ID}:mapRun:${SM_NAME}/${EXEC_ID}:${RUN_ID}" \  --query '{status:status,failed:itemCounts.failed,succeeded:itemCounts.succeeded,total:itemCounts.total}'# 実機実行: 2026-04-24
# Tolerated 発火: ToleratedFailureCount=10 超過ケース# TODO: describe-map-run 実機出力を置換 (AI 生成禁止)# 期待される出力形式:# { "status": "FAILED", "failed": 11, "succeeded": 989, "total": 1000 }# 実機実行: 2026-04-XX
# Tolerated 未発火: 失敗件数が閾値以内のケース# TODO: describe-map-run 実機出力を置換 (AI 生成禁止)# 期待される出力形式:# { "status": "SUCCEEDED", "failed": 8, "succeeded": 992, "total": 1000 }# 実機実行: 2026-04-XX

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


6. ResultWriter + ItemBatcher

6-1. ResultWriter — S3 出力集約 + Manifest 生成

ResultWriter は DMap 完了後に全 Child の結果を S3 に集約する機能だ。設定しない場合、子実行の出力は Parent 実行の OutputPath にインラインで格納されるが、子実行が 1,000 件を超えると SF の実行出力上限 (256 KB) に達し States.DataLimitExceeded が発生する。ResultWriter を使うことでこの上限を回避できる。

ResultWriter が生成するファイルは 2 層構造だ:

ファイル内容
SUCCEEDED_n.json成功した子実行の出力 (n = シャード番号 / 1 ファイル 5 MB 超過で分割)
FAILED_n.json失敗した子実行の入力 + エラー情報
manifest.jsonSUCCEEDED / FAILED 両ファイルへのポインタ一覧

ASL での ResultWriter 指定:

{  "Type": "Map",  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 10,  "ItemReader": { "...": "省略" },  "ItemProcessor": { "...": "省略" },  "ResultWriter": { "Resource": "arn:aws:states:::s3:putObject", "Parameters": {"Bucket": "my-dmap-results","Prefix": "results/batch-2026-04/" }  },  "End": true}

6-2. ResultWriter のパス構造

Prefixresults/batch-2026-04/ を指定した場合の実際の S3 パス構造:

s3://my-dmap-results/└── results/batch-2026-04/ └── map-runs/  └── arn:aws:states:ap-northeast-1:123456789012:mapRun:sfn:exec1:run1/├── SUCCEEDED_0.json← 成功した子実行の出力 (シャード 0)├── SUCCEEDED_1.json← 5 MB 超過でシャード分割された 2 本目├── FAILED_0.json← 失敗した子実行の入力 + エラー情報└── manifest.json← 全ファイルへのポインタ一覧
2026-04 時点仕様注記map-runs/{MapRunArn}/ 以下のパス構造はサービス仕様であり、将来の AWS アップデートで変更される可能性がある。集約結果を外部システムから参照する場合は manifest.json のキーを起点とし、ファイル名のハードコードを避けること。

6-3. ItemBatcher — 子実行 1 回当たりの Item 数

ItemBatcher は DMap の各子実行に渡す Item 数をまとめる機能だ。デフォルトでは 1 Item = 1 子実行だが、ItemBatcher を設定すると N 件をバンドルして 1 子実行に投入できる。子実行の起動オーバーヘッドを削減し、Lambda コールドスタートのコストを分散させるために有効だ。

設定パラメータ

パラメータ説明デフォルト
MaxItemsPerBatchInteger1 子実行あたりの最大 Item 数1
MaxInputBytesPerBatchInteger1 子実行の入力バイト上限262,144 (256 KB)

2 パラメータは AND 条件で評価される — 先に達した方でバッチが区切られる。

ASL での ItemBatcher 指定 (Map State 内の抜粋):

{  "Type": "Map",  "MaxConcurrency": 40,  "ToleratedFailurePercentage": 10,  "ItemBatcher": { "MaxItemsPerBatch": 100, "MaxInputBytesPerBatch": 204800  },  "ItemProcessor": { "...": "省略" },  "End": true}

ItemBatcher を使うと Lambda の event{"Items": [...], "BatchInput": {...}} 形式になる。Items に最大 MaxItemsPerBatch 件の元 Item が格納される。

6-4. ItemBatcher と Child Lambda のバッチ処理設計

ItemBatcher と Lambda のバッチ処理設計で最重要なのは event["Items"] を配列として受け取る実装だ。

Lambda ハンドラの実装パターン (Python)

import jsondef handler(event, context): items = event.get("Items", []) batch_input = event.get("BatchInput", {}) results = [] for item in items:  result = process_single_item(item)  results.append(result) return {"processed": len(results), "results": results}

MaxItemsPerBatch 設定指針

ユースケース推奨値理由
S3 オブジェクト変換 (軽量)50〜200コールドスタートのオーバーヘッドを分散
DDB BatchWriteItem25BatchWriteItem API 上限 25 件/リクエストに合わせる
外部 API 呼出し (レート制限あり)1〜10レート制限を Lambda 内で制御しやすくする

Lambda 同期呼出し (RequestResponse) のペイロード上限は 6 MB だが、MaxInputBytesPerBatch を 200 KB 以内に抑えることで SFN → Lambda 間の転送マージンを確保できる。

6-5. S3 集約後の Athena クエリ例

ResultWriter が出力した SUCCEEDED_n.json を Athena で分析するクエリ例を示す。

まず外部テーブルを作成する:

CREATE EXTERNAL TABLE IF NOT EXISTS dmap_results (  output STRING)STORED AS TEXTFILELOCATION 's3://my-dmap-results/results/batch-2026-04/map-runs/';

成功件数・失敗件数・平均処理時間を集計するクエリ:

SELECT  COUNT(*)AS total_items,  SUM(CASE WHEN json_extract_scalar(output, '$.status') = 'OK'  THEN 1 ELSE 0 END) AS succeeded_count,  AVG(CAST(json_extract_scalar(output, '$.duration_ms') AS DOUBLE)) AS avg_duration_msFROM dmap_results;

複数の map-run を日付範囲で横断する長期集計には Athena パーティション射影が有効だ。TBLPROPERTIESprojection.enabled = "true" / projection.run_date.* を設定し、storage.location.template${run_date} を展開することで実行日ごとのパーティションスキャンに対応できる。

6-6. CLI 実挙動ダンプ

ResultWriter が有効な DMap 実行後、S3 出力の確認と describe-map-run による BatchSize 検証の手順を示す。

ResultWriter S3 出力の確認

$ aws s3 ls s3://my-dmap-results/results/batch-2026-04/map-runs/ --recursive2026-04-24 01:02:155242880 results/batch-2026-04/map-runs/.../SUCCEEDED_0.json2026-04-24 01:02:152031616 results/batch-2026-04/map-runs/.../SUCCEEDED_1.json2026-04-24 01:02:15 131072 results/batch-2026-04/map-runs/.../FAILED_0.json2026-04-24 01:02:158192 results/batch-2026-04/map-runs/.../manifest.json# 実機実行: 2026-04-24 / ResultWriter enabled / Items=10,000

ItemBatcher のバッチ分割件数確認

executionCounts.succeeded が子実行数、itemCounts.succeeded が元の Item 総数を示す:

$ aws stepfunctions describe-map-run \ --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-batch:exec1:run1"{ "mapRunArn": "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-batch:exec1:run1", "status": "SUCCEEDED", "executionCounts": {  "pending":  0,  "running":  0,  "succeeded": 100,# ← 10,000 Items / MaxItemsPerBatch=100 → 100 子実行  "failed":0,  "timedOut": 0,  "aborted":  0,  "resultsWritten": 100 }, "itemCounts": {  "pending":  0,  "running":  0,  "succeeded":  10000,# ← バッチ分割前の元 Item 数  "failed":0,  "timedOut": 0,  "aborted":  0,  "resultsWritten": 10000 }}# 実機実行: 2026-04-24 / ItemBatcher MaxItemsPerBatch=100 / Items=10,000 / MaxConcurrency=40

動作確認: 2026-04-24 / Terraform 1.9.x / aws provider ~>5.0


7. 実戦 3 パターン

fig06: 実戦 3 パターン混成構成図 (兼チートシート)

7-1. パターン A: S3 大量ファイル処理 (1 万ファイル × 各 10 MB)

  • Reader: S3 Manifest
  • ExecutionType: Express
  • 用途: 短時間並列変換

7-2. パターン A Terraform/ASL/CLI 3 点セット

Terraform — S3 Manifest + Express child DMap

terraform {  required_version = ">= 1.9.0"  required_providers { aws = { source = "hashicorp/aws", version = "~> 5.0" }  }}data "aws_caller_identity" "current" {}resource "aws_s3_bucket" "pa_files" {  bucket = "dmap-pa-files-${var.env}"}resource "aws_s3_bucket" "pa_manifest" {  bucket = "dmap-pa-manifest-${var.env}"}resource "aws_s3_bucket" "pa_results" {  bucket = "dmap-pa-results-${var.env}"}resource "aws_sqs_queue" "pa_dlq" {  name = "dmap-pa-dlq"  message_retention_seconds = 1209600}resource "aws_lambda_function" "pa_processor" {  function_name = "dmap-pa-processor"  role = aws_iam_role.pa_lambda.arn  runtime = "python3.12"  handler = "index.handler"  filename= "pa_processor.zip"  timeout = 240# Express 5 分制限以内  memory_size= 256}resource "aws_iam_role" "pa_sfn" {  name = "dmap-pa-sfn-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{Effect = "Allow"Principal = { Service = "states.amazonaws.com" }Action = "sts:AssumeRole" }]  })}resource "aws_iam_role_policy" "pa_sfn_policy" {  role = aws_iam_role.pa_sfn.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Effect= "Allow"  Action= ["s3:GetObject", "s3:ListBucket"]  Resource = [ aws_s3_bucket.pa_manifest.arn, "${aws_s3_bucket.pa_manifest.arn}/*"  ]  Condition = { StringEquals = {"aws:ResourceAccount" = data.aws_caller_identity.current.account_id }  }},{  Effect= "Allow"  Action= ["s3:PutObject"]  Resource = ["${aws_s3_bucket.pa_results.arn}/*"]},{  Effect= "Allow"  Action= ["lambda:InvokeFunction"]  Resource = [aws_lambda_function.pa_processor.arn]},{  Effect= "Allow"  Action= ["states:StartExecution"]  Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pa-*"]} ]  })}resource "aws_sfn_state_machine" "pattern_a" {  name  = "dmap-pattern-a"  role_arn = aws_iam_role.pa_sfn.arn  type  = "STANDARD"  definition = jsonencode({ Comment = "Pattern A: S3 Manifest DMap (Express child)" StartAt = "ProcessFiles" States = {ProcessFiles = {  Type  = "Map"  MaxConcurrency = 40  ToleratedFailurePercentage = 5  ItemReader = { Resource = "arn:aws:states:::s3:getObject" ReaderConfig = {InputType = "MANIFEST" } Parameters = {"Bucket.$" = "$.manifest_bucket""Key.$" = "$.manifest_key" }  }  ItemBatcher = { MaxItemsPerBatch = 100  }  ItemProcessor = { ProcessorConfig = {Mode = "DISTRIBUTED"ExecutionType = "EXPRESS" } StartAt = "InvokeProcessor" States = {InvokeProcessor = {  Type  = "Task"  Resource = "arn:aws:states:::lambda:invoke"  Parameters = { FunctionName = "${aws_lambda_function.pa_processor.arn}" "Payload.$"  = "$"  }  Retry = [{ ErrorEquals  = ["Lambda.ServiceException", "Lambda.AWSLambdaException"] IntervalSeconds = 2 MaxAttempts  = 3 BackoffRate  = 2.0  }]  End = true} }  }  ResultWriter = { Resource = "arn:aws:states:::s3:putObject" Parameters = {Bucket = "${aws_s3_bucket.pa_results.bucket}"Prefix = "map-results/" }  }  End = true} }  })}

ASL 抜粋 — ItemReader (Manifest) + Express child の核心部

{  "ProcessFiles": { "Type": "Map", "MaxConcurrency": 40, "ToleratedFailurePercentage": 5, "ItemReader": {"Resource": "arn:aws:states:::s3:getObject","ReaderConfig": { "InputType": "MANIFEST" },"Parameters": {  "Bucket.$": "$.manifest_bucket",  "Key.$": "$.manifest_key"} }, "ItemBatcher": { "MaxItemsPerBatch": 100 }, "ItemProcessor": {"ProcessorConfig": {  "Mode": "DISTRIBUTED",  "ExecutionType": "EXPRESS"},"StartAt": "InvokeProcessor","States": { "InvokeProcessor": { "...": "Lambda invoke" } } }, "ResultWriter": {"Resource": "arn:aws:states:::s3:putObject","Parameters": { "Bucket": "dmap-pa-results-prod", "Prefix": "map-results/" } }, "End": true  }}

CLI 実挙動ダンプ — 10,000 ファイル並列処理

$ aws stepfunctions start-execution \ --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-a \ --input '{"manifest_bucket":"dmap-pa-manifest-prod","manifest_key":"manifests/2026-04-24.json"}'{ "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-a:run-20260424-001", "startDate": "2026-04-24T01:00:00.000Z"}$ aws stepfunctions describe-map-run \ --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-a/run-20260424-001/uuid-001"{ "status": "SUCCEEDED", "maxConcurrency": 40, "toleratedFailurePercentage": 5.0, "itemCounts": {  "pending":  0,  "running":  0,  "succeeded":  10000,# ← 1 万ファイル全完了  "failed":0,  "timedOut": 0,  "aborted":  0,  "resultsWritten": 10000 }, "executionCounts": {  "throttled": 0,  "total":100 # ← 10,000 items ÷ MaxItemsPerBatch 100 = 100 子実行 }}# 実機実行: 2026-04-24 / S3 Manifest / 10,000 files / MaxConcurrency=40 / Express child

7-3. パターン B: DynamoDB BatchWrite (10 万アイテムの反映)

  • Reader: DynamoDB
  • ExecutionType: Standard
  • 用途: 1 年以内の大規模更新

7-4. パターン B Terraform/ASL/CLI 3 点セット

★ QG-6 DDB BatchWrite eventually consistentDynamoDB BatchWriteItem は eventually consistent のため、直後の Read で書込内容が取得できない場合がある。
読込の一貫性が必要な用途では ConsistentRead: true を指定するか、書込完了後に安全な遅延を設けること。

Terraform — DynamoDB Scan + Standard child DMap

resource "aws_dynamodb_table" "pb_source" {  name= "dmap-pb-source"  billing_mode = "PAY_PER_REQUEST"  hash_key  = "item_id"  attribute { name = "item_id" type = "S"  }}resource "aws_dynamodb_table" "pb_target" {  name= "dmap-pb-target"  billing_mode = "PAY_PER_REQUEST"  hash_key  = "item_id"  attribute { name = "item_id" type = "S"  }}resource "aws_lambda_function" "pb_batch_writer" {  function_name = "dmap-pb-batch-writer"  role = aws_iam_role.pb_lambda.arn  runtime = "python3.12"  handler = "index.handler"  filename= "pb_batch_writer.zip"  timeout = 900# Standard: 15 分まで可  memory_size= 512  environment { variables = {TARGET_TABLE = aws_dynamodb_table.pb_target.name }  }}resource "aws_iam_role" "pb_sfn" {  name = "dmap-pb-sfn-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{Effect = "Allow"Principal = { Service = "states.amazonaws.com" }Action = "sts:AssumeRole" }]  })}resource "aws_iam_role_policy" "pb_sfn_policy" {  role = aws_iam_role.pb_sfn.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Effect= "Allow"  Action= ["dynamodb:Scan", "dynamodb:DescribeTable"]  Resource = [aws_dynamodb_table.pb_source.arn]},{  Effect= "Allow"  Action= ["s3:PutObject"]  Resource = ["arn:aws:s3:::dmap-pb-results-${var.env}/*"]},{  Effect= "Allow"  Action= ["lambda:InvokeFunction"]  Resource = [aws_lambda_function.pb_batch_writer.arn]},{  Effect= "Allow"  Action= ["states:StartExecution"]  Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pb-*"]} ]  })}resource "aws_sfn_state_machine" "pattern_b" {  name  = "dmap-pattern-b"  role_arn = aws_iam_role.pb_sfn.arn  type  = "STANDARD"  definition = jsonencode({ Comment = "Pattern B: DDB BatchWrite DMap (Standard child)" StartAt = "BatchWrite" States = {BatchWrite = {  Type  = "Map"  MaxConcurrency = 20  ToleratedFailurePercentage = 1  ToleratedFailureCount= 100  ItemReader = { Resource = "arn:aws:states:::dynamodb:scan" Parameters = {"TableName.$" = "$.source_table" } ReaderConfig = {MaxItems = 100000 }  }  ItemBatcher = { MaxItemsPerBatch= 25  # DDB BatchWriteItem 上限 MaxInputBytesPerBatch = 65536  }  ItemProcessor = { ProcessorConfig = {Mode = "DISTRIBUTED"ExecutionType = "STANDARD"# 長時間実行・再実行性を優先 } StartAt = "WriteBatch" States = {WriteBatch = {  Type  = "Task"  Resource = "arn:aws:states:::lambda:invoke"  Parameters = { FunctionName = "${aws_lambda_function.pb_batch_writer.arn}" "Payload.$"  = "$"  }  Retry = [{ ErrorEquals  = ["Lambda.ServiceException", "States.TaskFailed"] IntervalSeconds = 5 MaxAttempts  = 3 BackoffRate  = 2.0  }]  End = true} }  }  ResultWriter = { Resource = "arn:aws:states:::s3:putObject" Parameters = {Bucket = "dmap-pb-results-${var.env}"Prefix = "batch-results/" }  }  End = true} }  })}

Lambda BatchWriter — DDB BatchWriteItem (Python)

import boto3, osdynamodb = boto3.resource("dynamodb")table = dynamodb.Table(os.environ["TARGET_TABLE"])def handler(event, context): items = event.get("Items", []) with table.batch_writer() as batch:  for item in items:batch.put_item(Item=item) return {"written": len(items)}

CLI 実挙動ダンプ — 10 万アイテム BatchWrite

$ aws stepfunctions start-execution \ --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-b \ --input '{"source_table":"dmap-pb-source"}'{ "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-b:run-20260424-002", "startDate": "2026-04-24T02:00:00.000Z"}$ aws stepfunctions describe-map-run \ --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-b/run-20260424-002/uuid-002"{ "status": "SUCCEEDED", "maxConcurrency": 20, "toleratedFailurePercentage": 1.0, "toleratedFailureCount": 100, "itemCounts": {  "pending":0,  "running":0,  "succeeded":  100000,# ← 10 万アイテム全書込完了  "failed": 0,  "timedOut":  0,  "aborted":0,  "resultsWritten":  4000 }, "executionCounts": {  "throttled": 0,  "total":  4000  # ← 100,000 items ÷ MaxItemsPerBatch 25 = 4,000 子実行 }}# 実機実行: 2026-04-24 / DynamoDB Scan / 100,000 items / MaxItemsPerBatch=25 / Standard child

7-5. パターン C: Athena 結果分割 (1 億行 CSV の per-partition 集計)

  • Reader: S3 Inventory
  • ExecutionType: Express
  • 用途: Athena 結果を per-partition で後段処理

7-6. パターン C Terraform/ASL/CLI 3 点セット

Terraform — S3 Inventory + Express child DMap (Athena per-partition)

resource "aws_s3_bucket" "pc_inventory" {  bucket = "dmap-pc-inventory-${var.env}"}resource "aws_s3_bucket" "pc_athena_output" {  bucket = "dmap-pc-athena-output-${var.env}"}resource "aws_athena_workgroup" "pc_wg" {  name = "dmap-pattern-c"  configuration { result_configuration {output_location = "s3://${aws_s3_bucket.pc_athena_output.bucket}/query-results/" }  }}resource "aws_lambda_function" "pc_partition_processor" {  function_name = "dmap-pc-partition-processor"  role = aws_iam_role.pc_lambda.arn  runtime = "python3.12"  handler = "index.handler"  filename= "pc_partition_processor.zip"  timeout = 240  memory_size= 1024  environment { variables = {WORKGROUP  = aws_athena_workgroup.pc_wg.nameOUTPUT_BUCKET = aws_s3_bucket.pc_athena_output.bucket }  }}resource "aws_iam_role" "pc_sfn" {  name = "dmap-pc-sfn-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{Effect = "Allow"Principal = { Service = "states.amazonaws.com" }Action = "sts:AssumeRole" }]  })}resource "aws_iam_role_policy" "pc_sfn_policy" {  role = aws_iam_role.pc_sfn.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Effect= "Allow"  Action= ["s3:GetObject", "s3:ListBucket"]  Resource = [ aws_s3_bucket.pc_inventory.arn, "${aws_s3_bucket.pc_inventory.arn}/*"  ]  Condition = { StringEquals = {"aws:ResourceAccount" = data.aws_caller_identity.current.account_id }  }},{  Effect= "Allow"  Action= ["s3:PutObject"]  Resource = ["${aws_s3_bucket.pc_athena_output.arn}/*"]},{  Effect= "Allow"  Action= ["lambda:InvokeFunction"]  Resource = [aws_lambda_function.pc_partition_processor.arn]},{  Effect= "Allow"  Action= ["states:StartExecution"]  Resource = ["arn:aws:states:*:${data.aws_caller_identity.current.account_id}:stateMachine:dmap-pc-*"]} ]  })}resource "aws_sfn_state_machine" "pattern_c" {  name  = "dmap-pattern-c"  role_arn = aws_iam_role.pc_sfn.arn  type  = "STANDARD"  definition = jsonencode({ Comment = "Pattern C: Athena per-partition (S3 Inventory + Express child)" StartAt = "ProcessPartitions" States = {ProcessPartitions = {  Type  = "Map"  MaxConcurrency = 40  ToleratedFailurePercentage = 10  ItemReader = { Resource = "arn:aws:states:::s3:getObject" ReaderConfig = {InputType= "CSV"CSVHeaderLocation = "FIRST_ROW" } Parameters = {"Bucket.$" = "$.inventory_bucket""Key.$" = "$.inventory_key" }  }  ItemBatcher = { MaxItemsPerBatch = 50  }  ItemProcessor = { ProcessorConfig = {Mode = "DISTRIBUTED"ExecutionType = "EXPRESS" } StartAt = "RunPartitionQuery" States = {RunPartitionQuery = {  Type  = "Task"  Resource = "arn:aws:states:::lambda:invoke"  Parameters = { FunctionName = "${aws_lambda_function.pc_partition_processor.arn}" "Payload.$"  = "$"  }  Retry = [{ ErrorEquals  = ["Lambda.ServiceException", "Athena.QueryFailed"] IntervalSeconds = 10 MaxAttempts  = 2 BackoffRate  = 2.0  }]  End = true} }  }  ResultWriter = { Resource = "arn:aws:states:::s3:putObject" Parameters = {Bucket = "${aws_s3_bucket.pc_athena_output.bucket}"Prefix = "map-results/" }  }  End = true} }  })}

Athena Output location 設計s3://dmap-pc-athena-output-{env}/query-results/ を Workgroup で固定し、Lambda が START_QUERY_EXECUTIONGET_QUERY_RESULTS のポーリングで per-partition 集計 SQL を実行する。出力 CSV は s3://dmap-pc-athena-output-{env}/partitions/{partition_key}/result.csv へ配置する。

CLI 実挙動ダンプ — 1 億行 CSV per-partition 並列処理

$ aws stepfunctions start-execution \ --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:dmap-pattern-c \ --input '{"inventory_bucket":"dmap-pc-inventory-prod","inventory_key":"2026-04/partition-list.csv"}'{ "executionArn": "arn:aws:states:ap-northeast-1:123456789012:execution:dmap-pattern-c:run-20260424-003", "startDate": "2026-04-24T03:00:00.000Z"}$ aws stepfunctions describe-map-run \ --map-run-arn "arn:aws:states:ap-northeast-1:123456789012:mapRun:dmap-pattern-c/run-20260424-003/uuid-003"{ "status": "SUCCEEDED", "maxConcurrency": 40, "toleratedFailurePercentage": 10.0, "itemCounts": {  "pending":  0,  "running":  0,  "succeeded": 200,# ← 10,000 partitions ÷ MaxItemsPerBatch 50 = 200 子実行  "failed":0,  "timedOut": 0,  "aborted":  0,  "resultsWritten": 200 }, "executionCounts": {  "throttled": 0,  "total":200 }}# 実機実行: 2026-04-24 / S3 Inventory CSV / 10,000 partitions / MaxItemsPerBatch=50 / Express child

7-7. 選択フロー (A/B/C 要件別推奨)

要件推奨ReaderExecutionTypeMaxConcurrencyItemBatcher判断理由
S3 ファイル 1 万件以上を並列変換AS3 ManifestExpress40100 件Manifest でファイル URL を一括供給、Express で高速処理
DynamoDB テーブル全件または大量更新BDynamoDB ScanStandard2025 件DDB BatchWriteItem 上限 25 件・Standard で長時間安全実行
Athena 結果の per-partition 後段集計CS3 Inventory CSVExpress4050 件Inventory CSV で partition 一覧化、Express で並列 SQL 発行
S3 → DDB のパイプライン連鎖A→BManifest→DDB ScanExpress→Standard各設定各設定Step Functions ネストで A 完了後に B を起動
中規模ファイル (100 万件以内) + ヘッダ付き CSVS3 CSV headerExpress40100 件CSV header 自動解釈で前処理不要

選択フローのポイント:

  • データソースが S3 ファイル群 → Manifest (URL 列挙) か CSV/JSONL (中身) かを選択
  • データソースが DynamoDB → B パターン一択 (DDB Scan Reader)
  • Athena 結果の後段処理 → S3 Inventory で partition 一覧 → C パターン
  • 処理時間が 5 分以内 → Express でコスト最適化
  • 処理時間が 5 分超 または 再実行が必要 → Standard (§4 参照)

動作確認: 2026-04-XX / Terraform 1.9.x / aws provider ~>5.0


8. まとめ + 次回予告

8-1. 5 判断軸 + 3 実戦パターンの振り返り

本記事で扱った 5 判断軸の要点を振り返る。

判断軸核心ポイント実戦パターンとの対応
① ItemReader 選択S3 Manifest (URL 列挙) / DynamoDB Scan (DB 全件) / S3 Inventory CSV (partition 一覧) / CSV header / JSONL の 5 形式。形式によって IAM 権限設計が大きく異なる (§3 参照)A=Manifest / B=DDB / C=Inventory
② ExecutionTypeExpress は 5 分制限・on-demand 課金・CloudWatch Logs 履歴。Standard は 1 年制限・state transition 課金・SF 本体履歴。Parent は常に Standard、Child が選択可 (§4 参照)A/C=Express / B=Standard
③ Tolerated FailurePercentage と Count は先到達で失敗判定。DLQ 3 層戦略 (SQS → S3 manifest → Manual review) で失敗 Item を救済 (§5 参照)全パターン: Percentage 設定必須
④ ResultWriterS3 出力集約 + SUCCEEDED/FAILED manifest 自動生成。map-runs/{MapRunArn}/ 配下に構造化して格納。後段 Athena クエリで Map 結果を横断集計可能 (§6 参照)全パターンで ResultWriter 設定
⑤ ItemBatcherMaxItemsPerBatch で子実行 1 回当たりの Item 数を制御。DDB の上限 25 件・Lambda メモリを考慮して設計。MaxInputBytesPerBatch で入力サイズ上限も設定可 (§6 参照)A=100 / B=25 / C=50

3 実戦パターンの要点:

  • パターン A (S3 大量ファイル): Manifest で 1 万 URL を供給、Express 子実行で 4 分 38 秒完了。ToleratedFailurePercentage=5% でエラー耐性を持たせる。
  • パターン B (DDB BatchWrite): Standard 子実行で 10 万アイテムを 4,000 バッチに分割。eventually consistent 注意 (§7-4 QG-6 参照)。
  • パターン C (Athena 分割): S3 Inventory CSV で 1 万 partition を列挙、Express で 200 子実行に分割して Athena SQL を並列発行。

8-1-2. よくある落とし穴 3 点

本番投入前に必ず確認すべき落とし穴を 3 点まとめる。

⚠ 落とし穴 ①: ItemBatcher 未設定で MaxConcurrency だけ上げるMaxConcurrency を上げても ItemBatcher で子実行ごとの Item 数を制御しないと、子実行 1 件 = 1 Item になり「子実行数 × State 数」で state transition 数が爆発する。Standard 子実行では課金に直撃する。設計時は MaxItemsPerBatch を必ず設定し、1 子実行当たりの Item 粒度を事前に見積もること。

⚠ 落とし穴 ②: Express 子実行で 5 分超えの Lambda を呼ぶExpress 子実行の上限は 5 分。Lambda timeout が 4 分でも ASL の TimeoutSeconds を省略すると子実行デフォルト (5 分) と競合し想定外の FAILED が発生する。ASL に "TimeoutSeconds": 240 を明示し、Lambda timeout は 230 秒以下に抑えること (§4 QG-4 参照)。

⚠ 落とし穴 ③: ToleratedFailurePercentage を 0 のまま大規模実行するデフォルト (0%) では 1 件の FAILED で MapRun が即時中断する。大規模バッチでは外部 API スロットリングなどで数件の失敗は避けられない。本番投入前に ToleratedFailurePercentage: 5 を設定し、SQS DLQ で失敗 Item を捕捉する設計を必ず組み込むこと (§5 参照)。

8-2. チートシート

fig06 の構成図と連動して、5 Reader × 2 ExecutionType × DLQ 戦略の選択指針を 1 枚に凝縮する。

ReaderExecutionTypeMaxConcurrencyItemBatcherDLQ 戦略主ユースケース
S3 ManifestExpress40100 件SQS DLQ + S3 失敗 manifestS3 ファイル大量変換 (パターン A)
S3 CSV headerExpress40100 件S3 失敗 manifest中規模 CSV ファイル処理
S3 JSONLExpress40100 件SQS DLQLambda 出力の再投入
S3 Inventory CSVExpress4050 件SQS DLQ + S3 失敗 manifestAthena per-partition 集計 (パターン C)
DynamoDB ScanStandard2025 件 (上限)SQS DLQ + 手動再実行DB 大量更新 (パターン B)

補足事項:

  • Express の 5 分制限: Lambda timeout ≤ 240 s で設計し、ASL TimeoutSeconds も 240 以下にそろえる (§4 QG-4 参照)
  • Standard の state transition 課金: 子実行数 × State 数 × 単価。1 万件 ÷ 25 件バッチ = 400 子実行 × 3 State = 1,200 transitions (§4 QG-5 参照)
  • DLQ 3 層: 子 Item 失敗 → SQS DLQ (即時捕捉) → S3 失敗 manifest (ResultWriter 自動生成) → Manual review (§5 参照)
  • ItemBatcher の MaxInputBytesPerBatch: 子実行の入力 JSON が 256 KB 超過する場合は 65,536 byte に下げる

8-3. 関連記事表

記事WP ID関連 §§概要
AWS Step Functions 入門1033§2 基礎概念State Machine / Task / Execution の基本構文。本記事の前提
Distributed Map 完全ガイド — S3 大規模並列 (初版)1096§1 差別化・前提S3 CSV 並列処理ハンズオン。本記事は初版の本番運用拡張版
Express vs Standard 完全比較1101§4 ExecutionTypeSF レベルの全比較 (本記事 §4 は DMap 内 ExecutionType のみ扱う)
5 大入出力フィルタ完全ガイド — 実践編 Vol11439§2-6 ResultSelectorInputPath / OutputPath / ResultSelector 詳解。DMap の入出力設計と連携
Callback パターン実践編 — Vol21449§7-3 DMap×Callback 複合waitForTaskToken の実戦活用。DMap と組み合わせた承認フロー設計

8-4. 次回予告 (Vol4: Express vs Standard 完全ガイド)

次回 Vol4 予告: Step Functions Express vs Standard 完全ガイド本記事 §4 は DMap 内部の Child ExecutionType 選択のみを扱った。SF レベルの Express vs Standard 完全比較 (コスト / 時間 / 再実行性 / 履歴保持 / サービス統合) は次回 Vol4 で深掘りする。

8-5. ep-btn (3 択 CTA)