- 1 Step Functions Distributed Map完全ガイド — S3大規模並列処理をハンズオンで習得
- 1.1 Section 1: Distributed Map概念編
- 1.2 1-1. Distributed Mapとは
- 1.3 1-2. Inline Map vs Distributed Map 比較表
- 1.4 1-3. アーキテクチャ: 親実行 → 子実行
- 1.5 1-4. 主要フィールド詳細解説
- 1.6 1-5. ユースケース一覧
- 1.7 1-6. 第4弾(データフロー制御)との関連
- 1.8 1-7. コスト構造
- 1.9 Section 2: アーキテクチャ解説
- 1.10 Section 3: AWSコンソールでのハンズオン
- 1.11 3-1. 前提条件
- 1.12 3-2. 準備: S3バケット作成
- 1.13 3-3. Lambda関数のデプロイ(csv-converter)
- 1.14 3-4. IAMロール作成
- 1.15 3-5. ステートマシンを段階的に構築(記事の核心)
- 1.16 3-6. 実行結果の確認
- 1.17 3-7. CloudWatch Logsで並列処理の様子を確認
- 1.18 まとめ(コンソール編)
- 1.19 Section 4: Terraformでの構築
- 1.20 4-1. 前提条件
- 1.21 4-2. ディレクトリ構成
- 1.22 4-3. Lambda ソースコード
- 1.23 4-4. statemachine/definition.json.tpl
- 1.24 4-5. variables.tf
- 1.25 4-6. main.tf(完全なコード)
- 1.26 4-7. outputs.tf
- 1.27 4-8. デプロイ手順
- 1.28 4-9. テスト用CSVアップロード
- 1.29 4-10. 動作確認
- 1.30 まとめ(Terraform編)
- 1.31 Section 5: 実践Tips(Distributed Map設計ガイド)
- 1.32 Section 6: ハンズオン後の削除手順
- 1.33 Section 7: まとめと次のステップ
Step Functions Distributed Map完全ガイド — S3大規模並列処理をハンズオンで習得
公開日: 2026-04-13
難易度: 中級〜上級
所要時間: 約120分
シリーズ: [Step Functions シリーズ 第6回]
この記事で学ぶこと
– Distributed Map と Inline Map の違い(アイテム数上限・子実行・ItemReader/ResultWriter)
– ItemReader(S3 listObjectsV2)を使ったS3オブジェクト大規模一括処理
– MaxConcurrency / ToleratedFailurePercentage / ResultWriter の設定と挙動
– 段階的構築(Step A→B→C→D): 基本→並列度制御→エラー許容→ResultWriter完成形
– Terraform での完全構築(コンソール版とASL完全一致)シリーズ前回記事:
– 第1回: AWS Step Functions 入門
– 第2回: ECS × Step Functions 入門
– 第3回: Step Functions エラーハンドリング完全ガイド
– 第4回: Step Functions 入出力データフロー制御完全ガイド
– 第5回: Step Functions Callbackパターン完全ガイド
Section 1: Distributed Map概念編
1-1. Distributed Mapとは
AWS Step FunctionsのMap Stateには、2つの動作モードが存在します。
- Inline Map(
Mode: INLINE): 同一ワークフロー実行内で並列処理を行うデフォルトモード。同時実行できるイテレーション数の上限は 40件。 - Distributed Map(
Mode: DISTRIBUTED): 各アイテムに対して子ワークフロー(Standard/Express実行)を個別に起動して並列処理するモード。アイテム数は事実上無制限(最大1億件)、同時実行上限は最大10,000件。
Distributed Mapが必要な状況
| シナリオ | 理由 |
|---|---|
| 40アイテムを超える大規模データ処理 | Inline Mapの40件上限を超えるため |
| S3上のファイル群を直接参照したい場合 | ItemReaderでS3オブジェクト一覧を取得できるのはDistributed Mapのみ |
| 子実行を独立して管理・監視したい場合 | 各子実行が独立した実行履歴を持ち、個別追跡が可能 |
1-2. Inline Map vs Distributed Map 比較表
| 特性 | Inline Map | Distributed Map |
|---|---|---|
| Mode設定 | INLINE(デフォルト) | DISTRIBUTED |
| アイテム上限 | 40件 | 事実上無制限(最大1億件) |
| 同時実行上限 | 40件 | 最大10,000件(MaxConcurrency=0で上限まで) |
| 実行方式 | 同一実行内での並列 | 子ワークフロー(Standard/Express)起動 |
| ItemReader | 不可(配列のみ) | 可(S3/JSON配列/CSV/JSONL/Parquet) |
| ResultWriter | 不可 | 可(S3出力・JSONL形式) |
| 課金 | 親実行の状態遷移数のみ | 親 + 子実行の状態遷移数 |
| 実行履歴 | 親実行に含まれる | 子実行ごとに独立 |
| ユースケース | 小規模並列処理(〜40件) | 大規模データ処理 / S3直接参照 |
1-3. アーキテクチャ: 親実行 → 子実行
Distributed Mapでは、ItemReaderでデータソースを読み込み、各アイテムに対して子ワークフローを起動します。全子実行の結果はResultWriterでS3に集約されます。
┌─────────────────────────────────────────┐
│ 親ワークフロー実行│
│ ProcessCSVFiles (Distributed Map State) │
│ │ │
│ ItemReader: S3 listObjectsV2│
│ [file1.csv, file2.csv, ..., fileN.csv] │
│ │ │
│ ┌────┴────────────────────────┐│
│ │ 子実行1 子実行2 子実行N ││
│ │ file1 file2 fileN││
│ │ LambdaLambda Lambda ││
│ └────┬────────────────────────┘│
│ │ │
│ ResultWriter: S3 output/results/ │
└─────────────────────────────────────────┘
- 子実行はStandard ExecutionまたはExpress Executionを選択可能
- MaxConcurrencyで同時実行数を制御(
0=上限(10,000)まで一斉起動)
1-4. 主要フィールド詳細解説
ItemReader
データソースを指定するフィールド。Distributed Mapでのみ使用可能。
| Resourceタイプ | 説明 |
|---|---|
arn:aws:states:::s3:listObjectsV2 | S3バケットのオブジェクト一覧を取得 |
arn:aws:states:::s3:getObject(JSON/JSONL) | S3オブジェクトの中身(JSON配列またはJSONL形式) |
arn:aws:states:::s3:getObject(CSV) | S3 CSVファイルの行ごとに処理 |
ItemsPath(JSON配列) | 入力JSONの配列フィールドを直接参照 |
S3 listObjectsV2の例(プレフィックス配下のオブジェクト一覧):
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "<INPUT_BUCKET_NAME>",
"Prefix": "input/"
}
}
→ 各子実行のInputは以下のような形式になります:
{ "Key": "input/file1.csv", "Size": 1024, "ETag": "abc123...", "LastModified": "..." }
ItemProcessor
子実行内で動かすワークフロー定義とモード設定を記述します。
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ConvertCSV",
"States": { ... }
}
ExecutionType の選択基準:
| 項目 | STANDARD | EXPRESS |
|---|---|---|
| 最大実行時間 | 1年 | 5分 |
| 実行モデル | Exactly-once | At-least-once |
| 実行履歴 | CloudWatch + Step Functions コンソール | CloudWatch Logs のみ |
| コスト | 状態遷移数課金(低単価) | 実行回数 + 実行時間課金 |
| 向いているケース | 長時間処理・監査要件あり | 短時間高スループット処理 |
MaxConcurrency
同時に実行する子ワークフロー数の上限を指定します。
"MaxConcurrency": 5
| 値 | 動作 |
|---|---|
0 | 上限なし(最大10,000件まで一斉起動) |
1〜10000 | 指定した数を超えないよう並列実行を制御 |
ポイント: Lambdaのアカウントデフォルト同時実行数(1,000)との兼ね合いで設定。大量アイテムを処理する場合は
MaxConcurrencyを下げてスロットリングを回避。
ToleratedFailurePercentage / ToleratedFailureCount
全体を失敗とみなす前に許容できる失敗率/件数を設定します。どちらか一方、または両方を指定可能(両方指定時はいずれかを超えた時点で失敗)。
"ToleratedFailurePercentage": 20
| フィールド | 説明 | 例 |
|---|---|---|
ToleratedFailurePercentage | 失敗を許容する割合(%) | 20 → 全体の20%以下なら全体SUCCEEDED |
ToleratedFailureCount | 失敗を許容する絶対件数 | 5 → 5件以下の失敗なら全体SUCCEEDED |
ResultWriter
全子実行の結果をS3にJSONL形式で集約出力します。
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "<OUTPUT_BUCKET_NAME>",
"Prefix": "results/"
}
}
- 出力形式はJSONL(1行1アイテムのJSON)
- ResultWriterを省略した場合、全子実行の結果が親実行の出力に含まれる → 大規模データには非推奨(256KBの出力上限に引っかかりやすい)
BatchInput
全子実行に共通の追加入力を渡すフィールド。
"BatchInput": {
"output_bucket": "<OUTPUT_BUCKET_NAME>"
}
- ItemReaderで取得したアイテム(各ファイルのメタデータ等)と
BatchInputがマージされて子実行に渡される - 各子実行に共通の設定値(バケット名、処理パラメータ等)を渡したい場合に有用
1-5. ユースケース一覧
| ユースケース | 説明 | 典型アイテム数 |
|---|---|---|
| S3大量オブジェクト処理 | ログファイル/CSVの一括変換・集計 | 数百〜数百万件 |
| CSVデータ一括変換 | ETL前処理、データクレンジング | 数千〜数十万行 |
| 画像リサイズ/サムネイル生成 | S3内の画像ファイルを並列変換 | 数百〜数万枚 |
| ETLパイプライン | データ変換・ロード処理 | 数万〜数百万件 |
| ログ解析 | アクセスログの集計・異常検知 | 数百〜数千ファイル |
1-6. 第4弾(データフロー制御)との関連
第4弾「データフロー制御」で学んだ知識は、Distributed MapのItemProcessor内でもそのまま活用できます。
$(ドル記号): 各子実行への入力(ItemReaderが渡すアイテム)を参照$$(ダブルドル): Context Objectを参照$$Map.Item.Index: 現在処理中のアイテムのインデックス(0始まり)$$Map.Item.Value: 現在処理中のアイテムの値- 子実行内でも
InputPath/Parameters/ResultSelector/ResultPathが同様に機能
1-7. コスト構造
Distributed Mapはアイテムごとに子実行を起動するため、課金が親+子の二層になります。
コスト試算例:
アイテム数: 1,000件
子実行: STANDARD(各3ステート)
親実行: 数ステート(Map State含む)
子実行: 1,000実行 × 3ステート = 3,000状態遷移
合計: 親の遷移数 + 3,000遷移分の課金
| ExecutionType | 課金モデル | 短時間大量処理への向き |
|---|---|---|
| STANDARD | 状態遷移数課金($0.025 / 1,000遷移) | 遷移数が多いほど割高 |
| EXPRESS | 実行回数 + 実行時間(GB秒) | 短時間処理なら低コスト |
コスト最適化のポイント:
– 短時間処理(5分以内)にはExecutionType: EXPRESSが有利
– MaxConcurrencyを低く設定するとスループットは下がるが、AWS API制限(Lambda同時実行数等)に余裕ができる
– 大量の小ファイルはS3バッチ操作との比較も検討する
Section 2: アーキテクチャ解説
2-1. ハンズオンシナリオ概要
本ハンズオンでは、S3バケットに格納された大量のCSVファイルを Step Functions の Distributed Map を使って並列処理し、変換結果をS3出力バケットに集約するシナリオを実装します。
全体像: S3入力バケット(input/ プレフィックス配下)に複数のCSVファイルを配置し、Step Functionsのステートマシンを起動すると、Distributed Map StateがファイルリストをItemReaderで読み取り、Lambda関数(csv-converter)を複数の子実行として並列に呼び出します。各Lambda関数はCSVファイルを変換してS3出力バケットに書き込み、最終的にResultWriterがすべての実行結果をJSONLファイルとして集約します。

各コンポーネントの役割を以下の表に整理します。
| コンポーネント | 役割 |
|---|---|
| Step Functions(Distributed Map) | 親ワークフロー、ItemReader/子実行管理/ResultWriter制御 |
| Lambda(csv-converter) | 各CSVファイルの変換処理(子実行内で呼び出し) |
| S3入力バケット | 処理対象CSVファイルの保存場所(input/ プレフィックス) |
| S3出力バケット | 変換結果ファイルとResultWriterの集約JSONLの保存場所(results/ プレフィックス) |
| IAMロール | SFがS3/Lambda/CloudWatch Logsにアクセスするための権限 |
| CloudWatch Logs | 子実行のログ記録(デバッグ・監視) |
2-2. Distributed Map 実行フロー(段階的構築の概要)
Distributed Map は「親実行」が全体を制御し、各アイテムに対して「子実行」を起動する2層構造を持ちます。

実行の流れは以下の4ステップです。
- 親実行が開始 → ステートマシンが起動
- ItemReader がアイテムリストを取得 → S3 の
listObjectsV2API でファイル一覧を取得し、各ファイルを1件のアイテムとして扱う - 子実行を並列起動 → MaxConcurrencyの制限内で、各アイテムに対してSTANDARD実行として子実行を起動。Lambda(csv-converter)を呼び出す
- ResultWriter が結果を集約 → 全子実行の完了後、実行結果をJSONLファイルとしてS3に書き込み、親実行がSUCCEEDEDになる
本ハンズオンはStep A→Dの4段階で段階的に機能を追加していきます。
| ステップ | 追加機能 | 目的 |
|---|---|---|
| Step A | 基本Distributed Map(ItemReader + Lambda) | 並列処理の基礎を体験 |
| Step B | MaxConcurrency=5 | Lambda同時実行数の制御を習得 |
| Step C | ToleratedFailurePercentage=20 | 一部失敗を許容して大規模処理を安定化 |
| Step D | ResultWriter + Retry/Catch完成形 | 結果集約とエラーハンドリングを完成 |
この段階的な構築により、各機能が「何を解決するのか」を実感しながら学習できます。
2-3. エラー許容・MaxConcurrency の挙動
Distributed Map の2つの重要な制御パラメータを解説します。

MaxConcurrency(同時実行数制御)
MaxConcurrency は Distributed Map が同時に起動できる子実行の最大数を制御します。
MaxConcurrency=5に設定した場合、常に最大5件の子実行が同時実行中の状態になります- 1件の子実行が完了すると、即座にキューから次のアイテムが取り出され、空いたスロットに投入されます
MaxConcurrency=0に設定すると制限なし(全アイテムを同時起動)ですが、Lambdaの同時実行数制限に注意が必要です
実務上の推奨: Lambda のアカウントレベルの同時実行数制限(デフォルト1,000)と、下流サービス(DynamoDB、RDSなど)のスループット上限を考慮して設定します。
ToleratedFailurePercentage(失敗許容率)
大規模な並列処理では、一部のアイテムが失敗することは避けられません。ToleratedFailurePercentage はこの「部分的な失敗」を許容するための設定です。
- 全アイテムの処理完了後、失敗件数の割合を算出
- 失敗率 ≤ 設定値 → 親実行は
SUCCEEDED(失敗した子実行の情報はResultWriterの出力に記録される) - 失敗率 > 設定値 →
States.ExceededToleratedFailureThresholdエラーで親実行がFAILED
例:
– 総件数100件、失敗18件 → 失敗率18% ≤ 20% → SUCCEEDED
– 総件数100件、失敗25件 → 失敗率25% > 20% → FAILED
実務上の推奨: バッチ処理のビジネス要件に合わせて設定します。「99%以上成功すればよい」ような用途では ToleratedFailurePercentage=1、「一定数の失敗は許容できる」場合は10〜20%程度が目安です。
Note:
ToleratedFailureCountで絶対数を指定することも可能です。どちらか片方、または両方を設定できます(両方設定した場合はいずれかが超過した時点でFAILED)。
Section 3: AWSコンソールでのハンズオン
このセクションでは、AWSコンソールを使ってS3上のCSVファイルをDistributed Mapで並列処理するワークフローを実際に構築します。
「数十ファイルを同時並列処理する」体験が、このハンズオンの核心です。
ステートマシンはStep A→B→C→Dと段階的に機能を追加しながら構築します。最終形(Step D)では、MaxConcurrency制御 + 部分失敗許容 + ResultWriterによる結果集約 + Retry/Catchが全て揃った完成形を体験できます。
3-1. 前提条件
必要なAWS権限
以下のサービスを操作できるIAM権限が必要です。
| サービス | 必要な権限 |
|---|---|
| Amazon S3 | バケットの作成・オブジェクトのアップロード・ダウンロード |
| AWS Lambda | 関数の作成・編集・実行 |
| AWS Step Functions | ステートマシンの作成・実行 |
| AWS IAM | ロール・ポリシーの作成 |
| Amazon CloudWatch Logs | ログの閲覧 |
AWS CLIの準備
このハンズオンでは、CSVファイルのアップロードや結果確認にAWS CLIを使用します。
# バージョン確認
aws --version
# aws-cli/2.x.x Python/3.x.x ...
# 認証確認
aws sts get-caller-identity
認証情報が正しく設定されていれば、アカウントIDとARNが表示されます。
テスト用CSVファイル
このハンズオンでは10〜20個のCSVファイルを使用します。内容は任意です(顧客データCSVを想定)。
3-2. 準備: S3バケット作成
Distributed Mapで使用する入力バケットと出力バケットを作成します。
入力バケットの作成
コンソール操作手順:
- AWSコンソールで Amazon S3 を開く
- 「バケットを作成」をクリック
- 設定値:
- バケット名:
dm-input-{ランダム文字列}(例:dm-input-abc123)
> バケット名はグローバルで一意である必要があります。末尾にランダムな英数字を追加してください。 - AWSリージョン: アジアパシフィック(東京)ap-northeast-1
- その他: デフォルトのまま(パブリックアクセスはすべてブロック)
- 「バケットを作成」をクリック
- バケット名をメモ →
<INPUT_BUCKET_NAME>プレースホルダーに使用
出力バケットの作成
同様の手順で出力バケットを作成します。
- 「バケットを作成」をクリック
- バケット名:
dm-output-{ランダム文字列}(例:dm-output-abc123) - リージョン: ap-northeast-1(東京)
- 「バケットを作成」をクリック
- バケット名をメモ →
<OUTPUT_BUCKET_NAME>プレースホルダーに使用
テスト用CSVファイルのアップロード
入力バケットに input/ プレフィックスでCSVファイルをアップロードします。
# テスト用CSVファイルを10個生成してアップロード
for i in $(seq 1 10); do
echo "id,name,amount
$i,customer$i,$((i * 1000))" > /tmp/file${i}.csv
aws s3 cp /tmp/file${i}.csv s3://dm-input-{ランダム文字列}/input/file${i}.csv
done
# アップロード確認
aws s3 ls s3://dm-input-{ランダム文字列}/input/
10個のCSVファイルが input/ プレフィックス下に表示されることを確認してください。
3-3. Lambda関数のデプロイ(csv-converter)
Distributed Mapの各子実行が呼び出すLambda関数を作成します。
コンソール操作手順:
- AWSコンソールで Lambda を開く
- 「関数の作成」をクリック
- 設定値:
- 作成方法: 一から作成
- 関数名:
csv-converter - ランタイム: Python 3.12
- アーキテクチャ: x86_64
- 「関数の作成」をクリック
- コードエディタに以下を貼り付け:
import json
import os
import boto3
import csv
import io
from datetime import datetime
s3 = boto3.client("s3")
def lambda_handler(event, context):
# InputはS3オブジェクトメタデータ: {"Key": "input/fileN.csv", "Size": ..., "ETag": ...}
key = event.get("Key")
if not key:
raise ValueError("Key is required in event")
# 入力バケット名は環境変数から取得
input_bucket = os.environ.get("INPUT_BUCKET")
output_bucket = os.environ.get("OUTPUT_BUCKET")
if not input_bucket or not output_bucket:
raise ValueError("INPUT_BUCKET and OUTPUT_BUCKET environment variables are required")
# S3からCSVを読み込み
response = s3.get_object(Bucket=input_bucket, Key=key)
csv_content = response["Body"].read().decode("utf-8")
# CSV処理(例: 各行にタイムスタンプを追加)
rows = list(csv.DictReader(io.StringIO(csv_content)))
processed_rows = []
for row in rows:
row["processed_at"] = datetime.utcnow().isoformat()
row["status"] = "converted"
processed_rows.append(row)
# 処理結果をS3に出力
filename = key.split("/")[-1] # input/file1.csv → file1.csv
output_key = f"output/{filename}"
if processed_rows:
fieldnames = processed_rows[0].keys()
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(processed_rows)
s3.put_object(
Bucket=output_bucket,
Key=output_key,
Body=output.getvalue().encode("utf-8"),
ContentType="text/csv"
)
print(f"Converted: {key} → {output_key} ({len(processed_rows)} rows)")
return {
"input_key": key,
"output_key": output_key,
"rows_processed": len(processed_rows)
}
- 「Deploy」をクリックして保存
- 関数ARNをメモ (例:
arn:aws:lambda:ap-northeast-1:123456789012:function:csv-converter)
→<LAMBDA_FUNCTION_ARN>プレースホルダーに使用
環境変数の設定
- Lambda関数の画面 → 「設定」タブ → 「環境変数」→「編集」をクリック
- 以下の環境変数を追加:
| キー | 値 |
|---|---|
INPUT_BUCKET | dm-input-{ランダム文字列} |
OUTPUT_BUCKET | dm-output-{ランダム文字列} |
- 「保存」をクリック
なぜ環境変数を使うか: バケット名をコードにハードコードすると、環境ごとの変更が困難になります。環境変数を使うことでコードを変更せずに異なるバケットへ切り替えられます。
3-4. IAMロール作成
Step FunctionsがS3・Lambda・CloudWatch Logsを操作するためのIAMロールを作成します。
コンソール操作手順:
- AWSコンソールで IAM を開く → 「ロール」→「ロールを作成」
- 信頼されたエンティティの選択:
- タイプ: AWSのサービス
- ユースケース: 「Step Functions」を選択(リストにない場合は「カスタム信頼ポリシー」を使用)
- 信頼ポリシー(カスタム選択時):
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]} - ロール名:
sf-distmap-execution-role - ロール作成後、「インラインポリシーを追加」から以下を設定:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "InvokeLambda",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "<LAMBDA_FUNCTION_ARN>"
},
{
"Sid": "S3InputRead",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::dm-input-{ランダム文字列}",
"arn:aws:s3:::dm-input-{ランダム文字列}/*"
]
},
{
"Sid": "S3OutputWrite",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::dm-output-{ランダム文字列}/*"
},
{
"Sid": "StepFunctionsStartExecution",
"Effect": "Allow",
"Action": "states:StartExecution",
"Resource": "*"
},
{
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogDelivery",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:GetLogDelivery",
"logs:ListLogDeliveries",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies"
],
"Resource": "*"
}
]
}
- ポリシー名:
sf-distmap-policyとして保存
states:StartExecutionがDistributed Map必須の理由: Distributed Mapは各アイテムを独立した子実行(子ワークフロー)として起動します。この子実行の起動にはStep Functions自身のstates:StartExecution権限が必要です。Inline Mapでは不要ですが、Distributed Mapでは必ず付与してください。
3-5. ステートマシンを段階的に構築(記事の核心)
ここからがこのハンズオンの核心です。Step A→Dと段階的にステートマシンを構築し、Distributed Mapの仕組みを体験的に理解します。
Step A: 基本Distributed Map — 並列処理を初体験
まず最もシンプルな形でDistributed Mapを体験します。
ステートマシン作成手順:
- AWSコンソールで Step Functions を開く
- 「ステートマシンの作成」をクリック
- 「コードでワークフローを記述」を選択
- 以下のASLを貼り付け(
<INPUT_BUCKET_NAME>と<LAMBDA_FUNCTION_ARN>を実際の値に置換):
{
"Comment": "S3 CSV大規模並列処理 — Step A(基本Distributed Map)",
"StartAt": "ProcessCSVFiles",
"States": {
"ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "<INPUT_BUCKET_NAME>",
"Prefix": "input/"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ConvertCSV",
"States": {
"ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "<LAMBDA_FUNCTION_ARN>",
"Payload.$": "$"
},
"ResultSelector": {
"result.$": "$.Payload"
},
"End": true
}
}
},
"End": true
}
}
}
ItemReaderがDistributed Mapの核心:arn:aws:states:::s3:listObjectsV2を使うことで、S3バケット内のオブジェクト一覧を自動的に取得し、各オブジェクトを1つのアイテムとして処理します。Lambda側ではevent["Key"]でオブジェクトキーを受け取れます。
- 設定値:
- ステートマシン名:
csv-distmap-step-a - 実行ロール:
sf-distmap-execution-role - ログ設定: ALLレベル を推奨(子実行の状況確認に必須)
- 「ステートマシンの作成」をクリック
実行テスト:
- 「実行の開始」をクリック
- 入力JSONに以下を貼り付け(ItemReaderでS3バケット名を直接指定しているため、入力は空で可):
{}
- 「実行の開始」をクリック
実行確認ポイント:
- 「マップ実行」タブを開くと、10個の子実行が並列に起動されることを確認
- 各子実行をクリックして
ConvertCSVステートの成功(SUCCEEDED)を確認 - S3出力バケットの
output/プレフィックスに変換済みCSVが出力されることを確認:bash
aws s3 ls s3://dm-output-{ランダム文字列}/output/
全体の実行が SUCCEEDED になれば、Step Aは完了です。
Step B: MaxConcurrency=5 — 並列度を制御する
無制限並列処理に上限を設け、Lambda同時実行数を制御します。
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け(プレースホルダーを置換):
{
"Comment": "S3 CSV大規模並列処理 — Step B(MaxConcurrency=5)",
"StartAt": "ProcessCSVFiles",
"States": {
"ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "<INPUT_BUCKET_NAME>",
"Prefix": "input/"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ConvertCSV",
"States": {
"ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "<LAMBDA_FUNCTION_ARN>",
"Payload.$": "$"
},
"ResultSelector": {
"result.$": "$.Payload"
},
"End": true
}
}
},
"MaxConcurrency": 5,
"End": true
}
}
}
- ステートマシン名:
csv-distmap-step-b - 実行ロール:
sf-distmap-execution-role - 「ステートマシンの作成」をクリック
MaxConcurrencyの効果:
| 設定 | 挙動 |
|---|---|
MaxConcurrency: 0(デフォルト) | 全アイテムを同時に並列処理(無制限) |
MaxConcurrency: 5 | 最大5子実行が同時稼働。完了次第キューから次を起動 |
確認方法:
10ファイルを処理すると、最初に5件が同時に起動し、完了次第残りが起動します。「マップ実行」タブで同時実行数が5件を超えないことを観察してください。
CloudWatch Logsで同時実行の様子を確認:
aws logs filter-log-events \
--log-group-name "/aws/states/csv-distmap-step-b" \
--start-time $(date -d "1 hour ago" +%s000) \
--filter-pattern "ConvertCSV"
なぜMaxConcurrencyが重要か: Lambda同時実行数のソフトリミット(デフォルト1,000)や、接続先DB・APIのスロットリング制限を超えないようにするために、MaxConcurrencyで並列度を制御します。1,000万件処理でも安全に実行できます。
Step C: ToleratedFailurePercentage=20 — 一部失敗を許容する
Distributed Mapでは一部のアイテムが失敗しても、閾値以内なら全体をSUCCEEDEDにできます。
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け(プレースホルダーを置換):
{
"Comment": "S3 CSV大規模並列処理 — Step C(ToleratedFailurePercentage=20)",
"StartAt": "ProcessCSVFiles",
"States": {
"ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "<INPUT_BUCKET_NAME>",
"Prefix": "input/"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ConvertCSV",
"States": {
"ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "<LAMBDA_FUNCTION_ARN>",
"Payload.$": "$"
},
"ResultSelector": {
"result.$": "$.Payload"
},
"End": true
}
}
},
"MaxConcurrency": 5,
"ToleratedFailurePercentage": 20,
"End": true
}
}
}
- ステートマシン名:
csv-distmap-step-c - 「ステートマシンの作成」をクリック
失敗体験ハンズオン:
ToleratedFailurePercentage=20 の動作を体験するために、Lambdaを一時的にエラー状態にします。
- Lambda コンソール →
csv-converterを開く - 「設定」→「環境変数」→「編集」
INPUT_BUCKETの値を 存在しないバケット名 に変更(例:dm-input-doesnotexist)- 「保存」をクリック
上記の状態でStep Cのステートマシンを実行すると:
– 全10件がLambdaエラー(S3 NoSuchBucket)になります
確認後は、INPUT_BUCKET を正しいバケット名に戻してください。
確認ポイント:
| 状況 | 結果 |
|---|---|
| 10件中2件以下が失敗(20%以内) | 全体は SUCCEEDED(失敗した子実行はFAILED個別表示) |
| 10件中3件以上が失敗(20%超) | 全体が FAILED(States.ExceedToleratedFailureThreshold) |
ToleratedFailurePercentagevsToleratedFailureCount: パーセント指定の他に件数指定も可能です(ToleratedFailureCount: 2)。データ件数が変動する場合はパーセント指定が柔軟です。
Step D: ResultWriter + Retry/Catch 完成形
全機能を統合した完成形です。ResultWriterで結果をS3に集約し、Retry/Catchで個別失敗に対処します。
ステートマシン作成手順:
- 「ステートマシンの作成」→「コードでワークフローを記述」
- 以下のASLを貼り付け(
<INPUT_BUCKET_NAME>・<LAMBDA_FUNCTION_ARN>・<OUTPUT_BUCKET_NAME>を実際の値に置換):
{
"Comment": "S3 CSV大規模並列処理 — Step D(ResultWriter+Retry/Catch完成形)",
"StartAt": "ProcessCSVFiles",
"States": {
"ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "<INPUT_BUCKET_NAME>",
"Prefix": "input/"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ConvertCSV",
"States": {
"ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "<LAMBDA_FUNCTION_ARN>",
"Payload.$": "$"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleConversionError",
"ResultPath": "$.error"
}
],
"ResultSelector": {
"status": "SUCCESS",
"output_key.$": "$.Payload.output_key"
},
"ResultPath": "$.conversion_result",
"Next": "ConversionSuccess"
},
"ConversionSuccess": {
"Type": "Pass",
"Parameters": {
"status": "SUCCESS",
"input_key.$": "$.Key",
"output_key.$": "$.conversion_result.output_key"
},
"End": true
},
"HandleConversionError": {
"Type": "Pass",
"Parameters": {
"status": "FAILED",
"input_key.$": "$.Key",
"error.$": "$.error.Cause"
},
"End": true
}
}
},
"MaxConcurrency": 5,
"ToleratedFailurePercentage": 20,
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "<OUTPUT_BUCKET_NAME>",
"Prefix": "results/"
}
},
"ResultPath": "$.map_result",
"End": true
}
}
}
- ステートマシン名:
csv-distmap-final - 実行ロール:
sf-distmap-execution-role - 「ステートマシンの作成」をクリック
Step Dで追加された機能の説明:
| 機能 | 設定 | 効果 |
|---|---|---|
| Retry | IntervalSeconds: 2, MaxAttempts: 3, BackoffRate: 2 | Lambda一時的失敗時に最大3回リトライ(2秒→4秒→8秒の指数バックオフ) |
| Catch | States.ALL → HandleConversionError | 3回リトライしても失敗 → {"status": "FAILED"} を記録して継続 |
| ResultWriter | s3:putObject → results/ | 全子実行の結果をMANIFEST.jsonとしてS3に集約 |
| ResultPath | $.map_result | Mapステートの出力をResultWriterの参照先に保存 |
実行テスト:
入力JSONは空で実行可能です:
{}
実行後の確認:
# ResultWriterが出力したMANIFEST.jsonを確認
aws s3 ls s3://dm-output-{ランダム文字列}/results/
aws s3 cp s3://dm-output-{ランダム文字列}/results/MANIFEST.json - | python3 -m json.tool
# 変換済みCSVを確認
aws s3 ls s3://dm-output-{ランダム文字列}/output/
aws s3 cp s3://dm-output-{ランダム文字列}/output/file1.csv -
MANIFEST.jsonには各子実行の結果ファイルへの参照リストが含まれます。
ResultWriterの仕組み: 全子実行が完了すると、Step FunctionsがMANIFEST.jsonを自動生成してS3に保存します。MANIFEST.jsonには各子実行の出力ファイルへのS3パスが記録されており、後続処理でこのファイルを読むことで全結果をまとめて参照できます。
3-6. 実行結果の確認
S3出力の確認
# ResultWriterの出力(MANIFEST.json)を確認
aws s3 ls s3://dm-output-{ランダム文字列}/results/
aws s3 cp s3://dm-output-{ランダム文字列}/results/MANIFEST.json - | python3 -m json.tool
# 変換済みCSVファイルの一覧確認
aws s3 ls s3://dm-output-{ランダム文字列}/output/
# 個別ファイルの内容確認
aws s3 cp s3://dm-output-{ランダム文字列}/output/file1.csv -
変換済みCSVには、元のフィールドに加えて processed_at と status フィールドが追加されていることを確認してください。
マップ実行タブでの確認
Step Functionsコンソールの実行詳細画面で「マップ実行」タブを開くと:
- 各子実行のステータス(SUCCEEDED/FAILED)が一覧表示されます
- 各子実行をクリックすると、その子実行のグラフが表示されます(Distributed Mapの固有機能)
- Step Dでは、失敗した子実行でも
HandleConversionErrorを通じて{"status": "FAILED"}として記録されます
3-7. CloudWatch Logsで並列処理の様子を確認
ログ確認手順
- Step Functionsコンソール → 対象の実行を開く
- 「実行の詳細」タブ → 「ログ」セクションを確認
- CloudWatch Logsのリンクをクリックして詳細ログを表示
Distributed Map固有の特性: 独立したログストリーム
Distributed Mapでは各子実行が独立したログストリームを持ちます。これはInline Mapとの重要な違いです。
# ログストリームの一覧を確認(各子実行が独立したストリームを持つことを確認)
aws logs describe-log-streams \
--log-group-name "/aws/states/csv-distmap-final" \
--order-by LastEventTime \
--descending \
--max-items 15 \
--query 'logStreams[*].logStreamName'
各子実行(子ワークフロー)が独立したログストリームで記録されているため、特定ファイルの処理ログを個別に追跡できます。
並列処理のシーケンスを確認
ログで以下のシーケンスを確認してください(MaxConcurrency=5の場合):
MapStateEntered ← Distributed Mapステート開始
MapRunStarted← マップ実行開始
ChildWorkflowExecutionStarted×5 ← 最初の5子実行が同時に起動
ChildWorkflowExecutionSucceeded ← 子実行1が完了
ChildWorkflowExecutionStarted ← 6番目の子実行が起動(スロット開放)
...
MapRunSucceeded ← 全子実行完了
MapStateExited ← Distributed Mapステート終了
CloudWatch Metricsでモニタリング
本番環境では以下のメトリクスを監視することを推奨します:
| メトリクス | 説明 | アラート推奨値 |
|---|---|---|
ExecutionsFailed | 失敗した実行数 | 1以上でアラート |
ExecutionsTimedOut | タイムアウトした実行数 | 1以上でアラート |
ExecutionThrottled | スロットリングされた実行数 | 急増時にアラート |
MapRunsFailed | 失敗したマップ実行数 | 1以上でアラート |
まとめ(コンソール編)
このセクションでは、S3×2バケット + Lambda + Distributed Mapを段階的に構築しました。
| Step | 追加した機能 | 学んだこと |
|---|---|---|
| A | 基本Distributed Map | ItemReaderでS3一覧取得、$.Key で個別ファイル処理 |
| B | MaxConcurrency=5 | 並列度の制御、Lambda同時実行数の管理 |
| C | ToleratedFailurePercentage=20 | 部分失敗の許容、States.ExceedToleratedFailureThreshold |
| D | ResultWriter + Retry/Catch | 結果の自動集約、指数バックオフリトライ、個別エラーの記録 |
Distributed Mapの本質: Step Functionsが自動的にS3オブジェクト一覧を取得し、各ファイルを独立した子実行として並列処理します。「10件でも1,000万件でも同じASLで処理できる」のがDistributed Mapの最大の強みです。
次のSection 4では、このワークフロー全体をTerraformで自動構築します。
Section 4: Terraformでの構築
本セクションでは、コンソール版(Section 3)で構築したDistributed MapステートマシンとASL構造が完全一致するTerraform構成を解説します。S3入力バケット内のCSVファイル群をDistributed Mapで並列処理し、S3出力バケットに集約するシナリオを実装します。
4-1. 前提条件
- Terraform 1.0以上インストール済み
- AWS CLI設定済み(
aws configureで認証情報設定) - 適切なIAM権限(S3 / Lambda / Step Functions / IAM リソースの作成・管理権限)
4-2. ディレクトリ構成
sf-distmap/
├── main.tf
├── variables.tf
├── outputs.tf
├── lambda/
│└── csv_converter.py
└── statemachine/
└── definition.json.tpl
4-3. Lambda ソースコード
lambda/csv_converter.py — コンソール版(Section 3)と同じロジック構成のコードです。
import json
import os
import boto3
import csv
import io
from datetime import datetime
s3 = boto3.client("s3")
def lambda_handler(event, context):
key = event.get("Key")
if not key:
raise ValueError("Key is required in event")
input_bucket = os.environ.get("INPUT_BUCKET")
output_bucket = os.environ.get("OUTPUT_BUCKET")
if not input_bucket or not output_bucket:
raise ValueError("INPUT_BUCKET and OUTPUT_BUCKET environment variables are required")
response = s3.get_object(Bucket=input_bucket, Key=key)
csv_content = response["Body"].read().decode("utf-8")
rows = list(csv.DictReader(io.StringIO(csv_content)))
processed_rows = []
for row in rows:
row["processed_at"] = datetime.utcnow().isoformat()
row["status"] = "converted"
processed_rows.append(row)
filename = key.split("/")[-1]
output_key = f"output/{filename}"
if processed_rows:
fieldnames = processed_rows[0].keys()
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(processed_rows)
s3.put_object(
Bucket=output_bucket,
Key=output_key,
Body=output.getvalue().encode("utf-8"),
ContentType="text/csv"
)
return {
"input_key": key,
"output_key": output_key,
"rows_processed": len(processed_rows)
}
4-4. statemachine/definition.json.tpl
コンソール版(Section 3)のStep D と同じASL構造です。Terraformでは templatefile() 関数を使ってバケット名とLambda ARNを動的に補間します。${...} 形式の変数はTerraform実行時に実際の値に置き換わります。
${input_bucket_name}→ 入力S3バケット名(ItemReader用)${output_bucket_name}→ 出力S3バケット名(ResultWriter用)${lambda_function_arn}→ Lambda関数のARN
{
"Comment": "S3 CSV大規模並列処理 — Step D(ResultWriter+Retry/Catch完成形)",
"StartAt": "ProcessCSVFiles",
"States": {
"ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": {
"Bucket": "${input_bucket_name}",
"Prefix": "input/"
}
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ConvertCSV",
"States": {
"ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${lambda_function_arn}",
"Payload.$": "$"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleConversionError",
"ResultPath": "$.error"
}
],
"ResultSelector": {
"status": "SUCCESS",
"output_key.$": "$.Payload.output_key"
},
"ResultPath": "$.conversion_result",
"Next": "ConversionSuccess"
},
"ConversionSuccess": {
"Type": "Pass",
"Parameters": {
"status": "SUCCESS",
"input_key.$": "$.Key",
"output_key.$": "$.conversion_result.output_key"
},
"End": true
},
"HandleConversionError": {
"Type": "Pass",
"Parameters": {
"status": "FAILED",
"input_key.$": "$.Key",
"error.$": "$.error.Cause"
},
"End": true
}
}
},
"MaxConcurrency": 5,
"ToleratedFailurePercentage": 20,
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "${output_bucket_name}",
"Prefix": "results/"
}
},
"ResultPath": "$.map_result",
"End": true
}
}
}
4-5. variables.tf
variable "aws_region" {
default = "ap-northeast-1"
}
variable "project_name" {
default = "csv-distmap"
}
variable "random_suffix" {
description = "バケット名の一意性を確保するためのサフィックス(例: abc123)"
type = string
}
4-6. main.tf(完全なコード)
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
archive = {
source = "hashicorp/archive"
version = "~> 2.0"
}
}
}
provider "aws" {
region = var.aws_region
}
# ─────────────────────────────────────────────
# S3バケット(入力・出力)
# ─────────────────────────────────────────────
resource "aws_s3_bucket" "input" {
bucket = "dm-input-${var.random_suffix}"
}
resource "aws_s3_bucket_public_access_block" "input" {
bucket = aws_s3_bucket.input.id
block_public_acls = true
block_public_policy = true
ignore_public_acls= true
restrict_public_buckets = true
}
resource "aws_s3_bucket" "output" {
bucket = "dm-output-${var.random_suffix}"
}
resource "aws_s3_bucket_public_access_block" "output" {
bucket = aws_s3_bucket.output.id
block_public_acls = true
block_public_policy = true
ignore_public_acls= true
restrict_public_buckets = true
}
# ─────────────────────────────────────────────
# Lambda関数
# ─────────────────────────────────────────────
data "archive_file" "lambda_zip" {
type = "zip"
source_file = "${path.module}/lambda/csv_converter.py"
output_path = "${path.module}/lambda.zip"
}
resource "aws_iam_role" "lambda_role" {
name = "${var.project_name}-lambda-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
Action = "sts:AssumeRole"
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_basic" {
role = aws_iam_role.lambda_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
resource "aws_iam_role_policy" "lambda_s3_policy" {
name = "${var.project_name}-lambda-s3-policy"
role = aws_iam_role.lambda_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:GetObject"]
Resource = "${aws_s3_bucket.input.arn}/*"
},
{
Effect = "Allow"
Action = ["s3:PutObject"]
Resource = "${aws_s3_bucket.output.arn}/*"
}
]
})
}
resource "aws_cloudwatch_log_group" "lambda_logs" {
name = "/aws/lambda/${var.project_name}-csv-converter"
retention_in_days = 14
}
resource "aws_lambda_function" "csv_converter" {
filename= data.archive_file.lambda_zip.output_path
function_name = "${var.project_name}-csv-converter"
role = aws_iam_role.lambda_role.arn
handler = "csv_converter.lambda_handler"
runtime = "python3.12"
source_code_hash = data.archive_file.lambda_zip.output_base64sha256
timeout = 60
environment {
variables = {
INPUT_BUCKET = aws_s3_bucket.input.bucket
OUTPUT_BUCKET = aws_s3_bucket.output.bucket
}
}
depends_on = [aws_cloudwatch_log_group.lambda_logs]
}
# ─────────────────────────────────────────────
# Step Functions(Distributed Map)
# ─────────────────────────────────────────────
resource "aws_iam_role" "sf_execution_role" {
name = "${var.project_name}-sf-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = { Service = "states.amazonaws.com" }
Action = "sts:AssumeRole"
}
]
})
}
resource "aws_iam_role_policy" "sf_policy" {
name = "${var.project_name}-sf-policy"
role = aws_iam_role.sf_execution_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
# Lambda呼び出し
Effect= "Allow"
Action= ["lambda:InvokeFunction"]
Resource = aws_lambda_function.csv_converter.arn
},
{
# 入力バケット(ItemReader: listObjectsV2に必須)
Effect = "Allow"
Action = ["s3:GetObject", "s3:ListBucket"]
Resource = [
aws_s3_bucket.input.arn,
"${aws_s3_bucket.input.arn}/*"
]
},
{
# 出力バケット(ResultWriter用)
Effect = "Allow"
Action = ["s3:GetObject", "s3:PutObject"]
Resource = [
aws_s3_bucket.output.arn,
"${aws_s3_bucket.output.arn}/*"
]
},
{
# Distributed Map子実行起動(必須)
Effect= "Allow"
Action= ["states:StartExecution"]
Resource = "*"
},
{
# CloudWatch Logsへの書き込み
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogDelivery",
"logs:PutLogEvents",
"logs:DescribeLogGroups",
"logs:DescribeResourcePolicies"
]
Resource = "*"
}
]
})
}
resource "aws_sfn_state_machine" "csv_distmap" {
name = "${var.project_name}-state-machine"
role_arn = aws_iam_role.sf_execution_role.arn
definition = templatefile("${path.module}/statemachine/definition.json.tpl", {
input_bucket_name= aws_s3_bucket.input.bucket
output_bucket_name = aws_s3_bucket.output.bucket
lambda_function_arn = aws_lambda_function.csv_converter.arn
})
logging_configuration {
level= "ALL"
include_execution_data = true
log_destination = "${aws_cloudwatch_log_group.sf_logs.arn}:*"
}
depends_on = [aws_iam_role_policy.sf_policy]
}
resource "aws_cloudwatch_log_group" "sf_logs" {
name = "/aws/states/${var.project_name}-state-machine"
retention_in_days = 14
}
4-7. outputs.tf
output "state_machine_arn" {
description = "ステートマシンのARN"
value = aws_sfn_state_machine.csv_distmap.arn
}
output "input_bucket_name" {
description = "入力S3バケット名"
value = aws_s3_bucket.input.bucket
}
output "output_bucket_name" {
description = "出力S3バケット名"
value = aws_s3_bucket.output.bucket
}
output "lambda_function_arn" {
description = "Lambda関数のARN"
value = aws_lambda_function.csv_converter.arn
}
4-8. デプロイ手順
terraform init
terraform plan -var="random_suffix=abc123"
terraform apply -var="random_suffix=abc123"
random_suffix はS3バケット名のグローバル一意性を確保するために使います。abc123 の部分は任意の文字列(英小文字・数字)に変更してください。
4-9. テスト用CSVアップロード
デプロイ完了後、入力バケットにテスト用CSVファイルをアップロードします。
# テストCSVファイルを入力バケットにアップロード
for i in $(seq 1 10); do
echo -e "id,name,amount\n$i,customer$i,$((i * 1000))" > /tmp/file${i}.csv
aws s3 cp /tmp/file${i}.csv \
s3://$(terraform output -raw input_bucket_name)/input/file${i}.csv
done
4-10. 動作確認
# ステートマシン実行
aws stepfunctions start-execution \
--state-machine-arn $(terraform output -raw state_machine_arn) \
--input '{}'
# 実行結果確認
aws s3 ls s3://$(terraform output -raw output_bucket_name)/results/
aws s3 cp s3://$(terraform output -raw output_bucket_name)/results/MANIFEST.json - | python3 -m json.tool
実行後、results/ プレフィックス配下に MANIFEST.json が生成されます。このファイルにはResultWriterによる集約結果(各アイテムのステータス)が記録されています。
まとめ(Terraform編)
| ポイント | 内容 |
|---|---|
| ASL定義 | コンソール版Step DのASLをそのまま definition.json.tpl として使用 |
| テンプレート補間 | templatefile() で3変数(入力バケット・出力バケット・Lambda ARN)を注入 |
| IAM権限 | states:StartExecution(子実行起動)と s3:ListBucket(ItemReader)が必須 |
| 並列度制御 | MaxConcurrency: 5・ToleratedFailurePercentage: 20 でコンソール版と同一設定 |
| エラー耐性 | Retry(最大3回・指数バックオフ)+ Catch(全エラーをハンドリング)済み |
Section 5: 実践Tips(Distributed Map設計ガイド)
このSectionでは、Distributed Mapを実務で使う際のTipsと注意点を解説します。
5-1. コスト計算: 子実行数 × 状態遷移数
課金の仕組み:
– 親実行: 状態遷移数 × 単価($0.000025/遷移)
– 子実行(Standard): 各実行の状態遷移数 × 単価(別途カウント)
📌 無料枠: 毎月4,000回の状態遷移まで無料。本ハンズオン程度なら無料枠に収まります。
コスト見積もり例(Step D: 3ステート/子実行, 1,000ファイル, Standard実行):
| 項目 | 計算 | 料金目安 |
|---|---|---|
| 親実行の状態遷移 | 1実行 × 1遷移 | ¥0(無料枠内) |
| 子実行数 | 1,000実行 | 1,000実行 |
| 子実行あたりの状態遷移 | ConvertCSV + ConversionSuccess = 2遷移 | 2遷移/実行 |
| 合計状態遷移数 | 1,000 × 2 = 2,000遷移 | 無料枠4,000遷移以内 → $0 |
| Lambda実行コスト | 1,000回 × 平均1秒 × 128MB | ~$0.002 |
Express実行との比較:
| 項目 | Standard | Express |
|---|---|---|
| 最大実行時間 | 1年 | 5分 |
| 価格モデル | 状態遷移数課金($0.000025/遷移) | 実行数($1/100万) + 継続時間課金 |
| 高スループット向き | × | ✓(最大100,000遷移/秒) |
| 実行履歴参照 | ✓(90日保存) | △(CloudWatch Logsのみ) |
| 選択基準 | 長時間処理・デバッグ優先 | 大量・短時間・高スループット |
5-2. MaxConcurrency のチューニング
Lambda同時実行数制限との関係:
– Lambda デフォルト同時実行数: 1,000(リージョン全体のソフトリミット)
– 他のLambda関数と共有している場合、MaxConcurrencyを低く設定してリソースを確保
– Distributed Mapの最大並列数は 10,000 まで設定可能(Step Functions サービスクォータ)
推奨設定パターン:
| シナリオ | MaxConcurrency推奨値 | 理由 |
|---|---|---|
| 開発・テスト | 5〜10 | デバッグしやすい |
| 本番(Lambda共有環境) | 50〜100 | 他関数へのリソース確保 |
| 本番(専用Lambda) | 500〜1,000 | Lambdaの同時実行上限近くまで活用 |
| 外部API呼び出しあり | API のレート制限に合わせる | スロットリング回避 |
0設定(無制限)の注意点:
"Map": {
"Type": "Map",
"MaxConcurrency": 0, // 0 = 無制限(非推奨)
...
}
- 数千アイテムを一斉起動するとLambdaスロットリングが多発
ToleratedFailurePercentageと組み合わせない場合、全体FAILEDリスクあり- 推奨: 常にMaxConcurrencyを明示的に設定する
5-3. 大規模実行時の注意(10万+アイテム)
制限値(2026年時点):
| 制限 | 値 | 備考 |
|---|---|---|
| ItemReader(S3 listObjectsV2) | 最大1億オブジェクト | 自動ページネーション |
| 同時実行子ワークフロー数(最大) | 10,000 | Step Functions クォータ |
| 子実行の最大実行時間(Standard) | 1年 | 大規模バッチも問題なし |
大規模処理での注意点:
- S3 listObjectsV2のページネーション: 自動的に処理される(Step Functions側で管理)
- 実行時間の試算: Standard実行は子実行1年以内。100万ファイルを1分/ファイルで処理する場合:
- MaxConcurrency=1,000 → 実行時間 = 100万/1,000 × 60秒 = 60,000秒 ≈ 17時間(問題なし)
- CloudWatch Logs のコスト: 子実行が多いとログ量が膨大になる。
本番環境ではERRORレベルのみ記録することを検討:hcl
# Terraform例
logging_configuration {
level = "ERROR"
include_execution_data = false
log_destination = "${aws_cloudwatch_log_group.sf.arn}:*"
} - ResultWriter の活用: 大規模処理では結果を親実行の出力に含めると256KB制限を超える。
必ずResultWriterでS3に集約すること:json
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket": "<OUTPUT_BUCKET>",
"Prefix": "results/"
}
}
5-4. CSVヘッダー行の扱い(ItemReader CSV設定)
S3のCSVファイルを行ごとに子実行で処理する場合(listObjectsV2ではなくCSV形式のItemReader):
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "CSV",
"CSVHeaderLocation": "FIRST_ROW"
},
"Parameters": {
"Bucket": "<BUCKET_NAME>",
"Key": "<CSV_KEY>"
}
}
CSVHeaderLocationの設定値:
| 値 | 動作 |
|---|---|
FIRST_ROW | 1行目をヘッダーとして扱い、各行はオブジェクトに変換 |
GIVEN | ヘッダー名を CSVHeaders フィールドで指定 |
- ヘッダー行は自動スキップされるため、手動でのフィルタリングは不要
- 各行のデータはヘッダー名をキーとするJSONオブジェクトとして子実行に渡される
5-5. Inline Map vs Distributed Map の選択基準
アイテム数が40以下?
YES → Inline Map(コスト低・設定シンプル)
NO ↓
データソースがS3オブジェクトリスト/CSVファイル?
YES → Distributed Map(ItemReader: S3利用)
NO ↓
子実行を独立して監視・再実行したい?
YES → Distributed Map
NO ↓
処理時間が5分以内かつ高スループットが必要?
YES → Distributed Map(ExecutionType: EXPRESS)
NO → Distributed Map(ExecutionType: STANDARD)
💡 Tips: 「とりあえずDistributed Map」は避けること。40アイテム以下ならInline Mapの方がシンプルでコストも低い。
Section 6: ハンズオン後の削除手順
⚠️ 放置するとS3ストレージ料金、CloudWatch Logs料金が発生します。ハンズオン終了後は忘れずに削除してください。
6-1. コスト注意事項
| リソース | 月額目安 | 備考 |
|---|---|---|
| Lambda | 無料枠内(本ハンズオン程度) | 100万リクエスト/月まで無料 |
| Step Functions | 無料枠内(本ハンズオン程度) | 4,000回の状態遷移まで無料 |
| S3 | ~$0.025/GB/月 | テストCSVなら数円以下 |
| CloudWatch Logs | ~$0.76/GB | ALLレベルだと子実行分も発生 |
6-2. Terraformで構築した場合
# S3バケット内のオブジェクトを先に削除(バケット削除の前提)
aws s3 rm s3://$(terraform output -raw input_bucket_name) --recursive
aws s3 rm s3://$(terraform output -raw output_bucket_name) --recursive
# リソース一括削除
terraform destroy -var="random_suffix=abc123"
手動削除が必要なもの(Terraformが管理しないリソース):
aws logs delete-log-group \
--log-group-name /aws/lambda/csv-distmap-csv-converter
6-3. コンソールで構築した場合の削除チェックリスト
- [ ] Step Functions ステートマシン(4つ: step-a/b/c/final)を削除
- [ ] Lambda 関数(csv-converter)を削除
- [ ] S3バケット(
dm-input-XXXXX): オブジェクト全削除 → バケット削除 - [ ] S3バケット(
dm-output-XXXXX): オブジェクト全削除 → バケット削除 - [ ] IAMロール(
sf-distmap-execution-role)と付随するポリシーを削除 - [ ] CloudWatch Logs ロググループを削除
S3バケットの完全削除(CLIで一括):
# バージョン管理が無効の場合
aws s3 rb s3://dm-input-{ランダム文字列} --force
aws s3 rb s3://dm-output-{ランダム文字列} --force
💡 バージョン管理が有効な場合は、先にすべてのバージョンを削除してから
rbを実行する必要があります。
Section 7: まとめと次のステップ
7-1. この記事で学んだこと
本記事では、AWS Step Functions の Distributed Map を使った大規模並列処理を、実際のCSV変換パイプラインで体験しました。
| テーマ | 学んだこと |
|---|---|
| Distributed Map の基本 | Inline Mapとの違い(アイテム数上限・子実行・ItemReader/ResultWriter) |
| ItemReader | S3 listObjectsV2を使ったS3オブジェクト大規模一括処理 |
| 並列度制御 | MaxConcurrencyによるLambda同時実行数との兼ね合い |
| 耐障害性 | ToleratedFailurePercentageによる一部失敗の許容 |
| 結果集約 | ResultWriterによる処理結果のS3集約(MANIFEST.json) |
| エラーハンドリング | Retry/CatchによるLambda一時的失敗への対応 |
| コスト最適化 | Standard vs Expressの選択基準と料金計算 |
7-2. 次のステップ
本シリーズの続きとして、以下のテーマを予定しています:
- 第7弾予告: Express vs Standard Workflow の詳細比較 — ユースケース別の最適解を解説
- 第8弾予告: SDK Direct Integration(Lambda-less)— Lambdaを介さずAWSサービスを直接呼び出す手法
- 応用編: Distributed Map × DynamoDB(大量アイテムの読み書き)
7-3. シリーズリンク
本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第6回です。
シリーズ一覧:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶRetry/Catch/Timeout
– 第4回: Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
– 第5回: Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー
– 第6回: 本記事(Distributed Map完全ガイド)
7-4. 参考リンク
- Amazon States Language — Distributed Map State
- Step Functions Distributed Map の処理モード
- ItemReader の設定オプション
- ResultWriter の出力形式(MANIFEST.json)