Step Functions Distributed Map完全ガイド — S3大規模並列処理をハンズオンで習得

目次

Step Functions Distributed Map完全ガイド — S3大規模並列処理をハンズオンで習得

公開日: 2026-04-13
難易度: 中級〜上級
所要時間: 約120分
シリーズ: [Step Functions シリーズ 第6回]

この記事で学ぶこと
– Distributed Map と Inline Map の違い(アイテム数上限・子実行・ItemReader/ResultWriter)
– ItemReader(S3 listObjectsV2)を使ったS3オブジェクト大規模一括処理
– MaxConcurrency / ToleratedFailurePercentage / ResultWriter の設定と挙動
– 段階的構築(Step A→B→C→D): 基本→並列度制御→エラー許容→ResultWriter完成形
– Terraform での完全構築(コンソール版とASL完全一致)

シリーズ前回記事:
– 第1回: AWS Step Functions 入門
– 第2回: ECS × Step Functions 入門
– 第3回: Step Functions エラーハンドリング完全ガイド
– 第4回: Step Functions 入出力データフロー制御完全ガイド
– 第5回: Step Functions Callbackパターン完全ガイド


Section 1: Distributed Map概念編

1-1. Distributed Mapとは

AWS Step FunctionsのMap Stateには、2つの動作モードが存在します。

  • Inline MapMode: INLINE): 同一ワークフロー実行内で並列処理を行うデフォルトモード。同時実行できるイテレーション数の上限は 40件
  • Distributed MapMode: DISTRIBUTED): 各アイテムに対して子ワークフロー(Standard/Express実行)を個別に起動して並列処理するモード。アイテム数は事実上無制限(最大1億件)、同時実行上限は最大10,000件。

Distributed Mapが必要な状況

シナリオ理由
40アイテムを超える大規模データ処理Inline Mapの40件上限を超えるため
S3上のファイル群を直接参照したい場合ItemReaderでS3オブジェクト一覧を取得できるのはDistributed Mapのみ
子実行を独立して管理・監視したい場合各子実行が独立した実行履歴を持ち、個別追跡が可能

1-2. Inline Map vs Distributed Map 比較表

特性Inline MapDistributed Map
Mode設定INLINE(デフォルト)DISTRIBUTED
アイテム上限40件事実上無制限(最大1億件)
同時実行上限40件最大10,000件(MaxConcurrency=0で上限まで)
実行方式同一実行内での並列子ワークフロー(Standard/Express)起動
ItemReader不可(配列のみ)可(S3/JSON配列/CSV/JSONL/Parquet)
ResultWriter不可可(S3出力・JSONL形式)
課金親実行の状態遷移数のみ親 + 子実行の状態遷移数
実行履歴親実行に含まれる子実行ごとに独立
ユースケース小規模並列処理(〜40件)大規模データ処理 / S3直接参照

1-3. アーキテクチャ: 親実行 → 子実行

Distributed Mapでは、ItemReaderでデータソースを読み込み、各アイテムに対して子ワークフローを起動します。全子実行の結果はResultWriterでS3に集約されます。

┌─────────────────────────────────────────┐
│ 親ワークフロー実行│
│  ProcessCSVFiles (Distributed Map State) │
│ │ │
│  ItemReader: S3 listObjectsV2│
│  [file1.csv, file2.csv, ..., fileN.csv]  │
│ │ │
│  ┌────┴────────────────────────┐│
│  │  子実行1  子実行2  子実行N  ││
│  │  file1 file2 fileN││
│  │  LambdaLambda  Lambda  ││
│  └────┬────────────────────────┘│
│ │ │
│  ResultWriter: S3 output/results/  │
└─────────────────────────────────────────┘

  • 子実行はStandard ExecutionまたはExpress Executionを選択可能
  • MaxConcurrencyで同時実行数を制御(0=上限(10,000)まで一斉起動)

1-4. 主要フィールド詳細解説

ItemReader

データソースを指定するフィールド。Distributed Mapでのみ使用可能。

Resourceタイプ説明
arn:aws:states:::s3:listObjectsV2S3バケットのオブジェクト一覧を取得
arn:aws:states:::s3:getObject(JSON/JSONL)S3オブジェクトの中身(JSON配列またはJSONL形式)
arn:aws:states:::s3:getObject(CSV)S3 CSVファイルの行ごとに処理
ItemsPath(JSON配列)入力JSONの配列フィールドを直接参照

S3 listObjectsV2の例(プレフィックス配下のオブジェクト一覧):

"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
 "Bucket": "<INPUT_BUCKET_NAME>",
 "Prefix": "input/"
  }
}


→ 各子実行のInputは以下のような形式になります:

{ "Key": "input/file1.csv", "Size": 1024, "ETag": "abc123...", "LastModified": "..." }


ItemProcessor

子実行内で動かすワークフロー定義とモード設定を記述します。

"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "STANDARD"
  },
  "StartAt": "ConvertCSV",
  "States": { ... }
}


ExecutionType の選択基準:

項目STANDARDEXPRESS
最大実行時間1年5分
実行モデルExactly-onceAt-least-once
実行履歴CloudWatch + Step Functions コンソールCloudWatch Logs のみ
コスト状態遷移数課金(低単価)実行回数 + 実行時間課金
向いているケース長時間処理・監査要件あり短時間高スループット処理

MaxConcurrency

同時に実行する子ワークフロー数の上限を指定します。

"MaxConcurrency": 5

動作
0上限なし(最大10,000件まで一斉起動)
1〜10000指定した数を超えないよう並列実行を制御

ポイント: Lambdaのアカウントデフォルト同時実行数(1,000)との兼ね合いで設定。大量アイテムを処理する場合は MaxConcurrency を下げてスロットリングを回避。


ToleratedFailurePercentage / ToleratedFailureCount

全体を失敗とみなす前に許容できる失敗率/件数を設定します。どちらか一方、または両方を指定可能(両方指定時はいずれかを超えた時点で失敗)。

"ToleratedFailurePercentage": 20

フィールド説明
ToleratedFailurePercentage失敗を許容する割合(%)20 → 全体の20%以下なら全体SUCCEEDED
ToleratedFailureCount失敗を許容する絶対件数5 → 5件以下の失敗なら全体SUCCEEDED

ResultWriter

全子実行の結果をS3にJSONL形式で集約出力します。

"ResultWriter": {
  "Resource": "arn:aws:states:::s3:putObject",
  "Parameters": {
 "Bucket": "<OUTPUT_BUCKET_NAME>",
 "Prefix": "results/"
  }
}

  • 出力形式はJSONL(1行1アイテムのJSON)
  • ResultWriterを省略した場合、全子実行の結果が親実行の出力に含まれる → 大規模データには非推奨(256KBの出力上限に引っかかりやすい)

BatchInput

全子実行に共通の追加入力を渡すフィールド。

"BatchInput": {
  "output_bucket": "<OUTPUT_BUCKET_NAME>"
}

  • ItemReaderで取得したアイテム(各ファイルのメタデータ等)とBatchInputがマージされて子実行に渡される
  • 各子実行に共通の設定値(バケット名、処理パラメータ等)を渡したい場合に有用

1-5. ユースケース一覧

ユースケース説明典型アイテム数
S3大量オブジェクト処理ログファイル/CSVの一括変換・集計数百〜数百万件
CSVデータ一括変換ETL前処理、データクレンジング数千〜数十万行
画像リサイズ/サムネイル生成S3内の画像ファイルを並列変換数百〜数万枚
ETLパイプラインデータ変換・ロード処理数万〜数百万件
ログ解析アクセスログの集計・異常検知数百〜数千ファイル

1-6. 第4弾(データフロー制御)との関連

第4弾「データフロー制御」で学んだ知識は、Distributed MapのItemProcessor内でもそのまま活用できます。

  • $(ドル記号): 各子実行への入力(ItemReaderが渡すアイテム)を参照
  • $$(ダブルドル): Context Objectを参照
  • $$Map.Item.Index: 現在処理中のアイテムのインデックス(0始まり)
  • $$Map.Item.Value: 現在処理中のアイテムの値
  • 子実行内でも InputPath / Parameters / ResultSelector / ResultPath が同様に機能

1-7. コスト構造

Distributed Mapはアイテムごとに子実行を起動するため、課金が親+子の二層になります。

コスト試算例:
  アイテム数: 1,000件
  子実行: STANDARD(各3ステート)

  親実行: 数ステート(Map State含む)
  子実行: 1,000実行 × 3ステート = 3,000状態遷移
  合計: 親の遷移数 + 3,000遷移分の課金

ExecutionType課金モデル短時間大量処理への向き
STANDARD状態遷移数課金($0.025 / 1,000遷移)遷移数が多いほど割高
EXPRESS実行回数 + 実行時間(GB秒)短時間処理なら低コスト

コスト最適化のポイント:
– 短時間処理(5分以内)にはExecutionType: EXPRESSが有利
MaxConcurrencyを低く設定するとスループットは下がるが、AWS API制限(Lambda同時実行数等)に余裕ができる
– 大量の小ファイルはS3バッチ操作との比較も検討する


Section 2: アーキテクチャ解説

2-1. ハンズオンシナリオ概要

本ハンズオンでは、S3バケットに格納された大量のCSVファイルを Step Functions の Distributed Map を使って並列処理し、変換結果をS3出力バケットに集約するシナリオを実装します。

全体像: S3入力バケット(input/ プレフィックス配下)に複数のCSVファイルを配置し、Step Functionsのステートマシンを起動すると、Distributed Map StateがファイルリストをItemReaderで読み取り、Lambda関数(csv-converter)を複数の子実行として並列に呼び出します。各Lambda関数はCSVファイルを変換してS3出力バケットに書き込み、最終的にResultWriterがすべての実行結果をJSONLファイルとして集約します。

アーキテクチャ全体構成

各コンポーネントの役割を以下の表に整理します。

コンポーネント役割
Step Functions(Distributed Map)親ワークフロー、ItemReader/子実行管理/ResultWriter制御
Lambda(csv-converter)各CSVファイルの変換処理(子実行内で呼び出し)
S3入力バケット処理対象CSVファイルの保存場所(input/ プレフィックス)
S3出力バケット変換結果ファイルとResultWriterの集約JSONLの保存場所(results/ プレフィックス)
IAMロールSFがS3/Lambda/CloudWatch Logsにアクセスするための権限
CloudWatch Logs子実行のログ記録(デバッグ・監視)

2-2. Distributed Map 実行フロー(段階的構築の概要)

Distributed Map は「親実行」が全体を制御し、各アイテムに対して「子実行」を起動する2層構造を持ちます。

Distributed Map実行フロー

実行の流れは以下の4ステップです。

  1. 親実行が開始 → ステートマシンが起動
  2. ItemReader がアイテムリストを取得 → S3 の listObjectsV2 API でファイル一覧を取得し、各ファイルを1件のアイテムとして扱う
  3. 子実行を並列起動 → MaxConcurrencyの制限内で、各アイテムに対してSTANDARD実行として子実行を起動。Lambda(csv-converter)を呼び出す
  4. ResultWriter が結果を集約 → 全子実行の完了後、実行結果をJSONLファイルとしてS3に書き込み、親実行がSUCCEEDEDになる

本ハンズオンはStep A→Dの4段階で段階的に機能を追加していきます。

ステップ追加機能目的
Step A基本Distributed Map(ItemReader + Lambda)並列処理の基礎を体験
Step BMaxConcurrency=5Lambda同時実行数の制御を習得
Step CToleratedFailurePercentage=20一部失敗を許容して大規模処理を安定化
Step DResultWriter + Retry/Catch完成形結果集約とエラーハンドリングを完成

この段階的な構築により、各機能が「何を解決するのか」を実感しながら学習できます。


2-3. エラー許容・MaxConcurrency の挙動

Distributed Map の2つの重要な制御パラメータを解説します。

エラー許容・MaxConcurrency挙動

MaxConcurrency(同時実行数制御)

MaxConcurrency は Distributed Map が同時に起動できる子実行の最大数を制御します。

  • MaxConcurrency=5 に設定した場合、常に最大5件の子実行が同時実行中の状態になります
  • 1件の子実行が完了すると、即座にキューから次のアイテムが取り出され、空いたスロットに投入されます
  • MaxConcurrency=0 に設定すると制限なし(全アイテムを同時起動)ですが、Lambdaの同時実行数制限に注意が必要です

実務上の推奨: Lambda のアカウントレベルの同時実行数制限(デフォルト1,000)と、下流サービス(DynamoDB、RDSなど)のスループット上限を考慮して設定します。

ToleratedFailurePercentage(失敗許容率)

大規模な並列処理では、一部のアイテムが失敗することは避けられません。ToleratedFailurePercentage はこの「部分的な失敗」を許容するための設定です。

  • 全アイテムの処理完了後、失敗件数の割合を算出
  • 失敗率 ≤ 設定値 → 親実行は SUCCEEDED(失敗した子実行の情報はResultWriterの出力に記録される)
  • 失敗率 > 設定値States.ExceededToleratedFailureThreshold エラーで親実行が FAILED

:
– 総件数100件、失敗18件 → 失敗率18% ≤ 20% → SUCCEEDED
– 総件数100件、失敗25件 → 失敗率25% > 20% → FAILED

実務上の推奨: バッチ処理のビジネス要件に合わせて設定します。「99%以上成功すればよい」ような用途では ToleratedFailurePercentage=1、「一定数の失敗は許容できる」場合は10〜20%程度が目安です。

Note: ToleratedFailureCount で絶対数を指定することも可能です。どちらか片方、または両方を設定できます(両方設定した場合はいずれかが超過した時点でFAILED)。


Section 3: AWSコンソールでのハンズオン

このセクションでは、AWSコンソールを使ってS3上のCSVファイルをDistributed Mapで並列処理するワークフローを実際に構築します。
「数十ファイルを同時並列処理する」体験が、このハンズオンの核心です。

ステートマシンはStep A→B→C→Dと段階的に機能を追加しながら構築します。最終形(Step D)では、MaxConcurrency制御 + 部分失敗許容 + ResultWriterによる結果集約 + Retry/Catchが全て揃った完成形を体験できます。


3-1. 前提条件

必要なAWS権限

以下のサービスを操作できるIAM権限が必要です。

サービス必要な権限
Amazon S3バケットの作成・オブジェクトのアップロード・ダウンロード
AWS Lambda関数の作成・編集・実行
AWS Step Functionsステートマシンの作成・実行
AWS IAMロール・ポリシーの作成
Amazon CloudWatch Logsログの閲覧

AWS CLIの準備

このハンズオンでは、CSVファイルのアップロードや結果確認にAWS CLIを使用します。

# バージョン確認
aws --version
# aws-cli/2.x.x Python/3.x.x ...

# 認証確認
aws sts get-caller-identity


認証情報が正しく設定されていれば、アカウントIDとARNが表示されます。

テスト用CSVファイル

このハンズオンでは10〜20個のCSVファイルを使用します。内容は任意です(顧客データCSVを想定)。


3-2. 準備: S3バケット作成

Distributed Mapで使用する入力バケットと出力バケットを作成します。

入力バケットの作成

コンソール操作手順:

  1. AWSコンソールで Amazon S3 を開く
  2. 「バケットを作成」をクリック
  3. 設定値:
  4. バケット名: dm-input-{ランダム文字列} (例: dm-input-abc123
    > バケット名はグローバルで一意である必要があります。末尾にランダムな英数字を追加してください。
  5. AWSリージョン: アジアパシフィック(東京)ap-northeast-1
  6. その他: デフォルトのまま(パブリックアクセスはすべてブロック)
  7. 「バケットを作成」をクリック
  8. バケット名をメモ<INPUT_BUCKET_NAME> プレースホルダーに使用

出力バケットの作成

同様の手順で出力バケットを作成します。

  1. 「バケットを作成」をクリック
  2. バケット名: dm-output-{ランダム文字列} (例: dm-output-abc123
  3. リージョン: ap-northeast-1(東京)
  4. 「バケットを作成」をクリック
  5. バケット名をメモ<OUTPUT_BUCKET_NAME> プレースホルダーに使用

テスト用CSVファイルのアップロード

入力バケットに input/ プレフィックスでCSVファイルをアップロードします。

# テスト用CSVファイルを10個生成してアップロード
for i in $(seq 1 10); do
  echo "id,name,amount
$i,customer$i,$((i * 1000))" > /tmp/file${i}.csv
  aws s3 cp /tmp/file${i}.csv s3://dm-input-{ランダム文字列}/input/file${i}.csv
done

# アップロード確認
aws s3 ls s3://dm-input-{ランダム文字列}/input/


10個のCSVファイルが input/ プレフィックス下に表示されることを確認してください。


3-3. Lambda関数のデプロイ(csv-converter)

Distributed Mapの各子実行が呼び出すLambda関数を作成します。

コンソール操作手順:

  1. AWSコンソールで Lambda を開く
  2. 「関数の作成」をクリック
  3. 設定値:
  4. 作成方法: 一から作成
  5. 関数名: csv-converter
  6. ランタイム: Python 3.12
  7. アーキテクチャ: x86_64
  8. 「関数の作成」をクリック
  9. コードエディタに以下を貼り付け:
import json
import os
import boto3
import csv
import io
from datetime import datetime

s3 = boto3.client("s3")

def lambda_handler(event, context):
 # InputはS3オブジェクトメタデータ: {"Key": "input/fileN.csv", "Size": ..., "ETag": ...}
 key = event.get("Key")
 if not key:
  raise ValueError("Key is required in event")

 # 入力バケット名は環境変数から取得
 input_bucket = os.environ.get("INPUT_BUCKET")
 output_bucket = os.environ.get("OUTPUT_BUCKET")

 if not input_bucket or not output_bucket:
  raise ValueError("INPUT_BUCKET and OUTPUT_BUCKET environment variables are required")

 # S3からCSVを読み込み
 response = s3.get_object(Bucket=input_bucket, Key=key)
 csv_content = response["Body"].read().decode("utf-8")

 # CSV処理(例: 各行にタイムスタンプを追加)
 rows = list(csv.DictReader(io.StringIO(csv_content)))
 processed_rows = []
 for row in rows:
  row["processed_at"] = datetime.utcnow().isoformat()
  row["status"] = "converted"
  processed_rows.append(row)

 # 処理結果をS3に出力
 filename = key.split("/")[-1]  # input/file1.csv → file1.csv
 output_key = f"output/{filename}"

 if processed_rows:
  fieldnames = processed_rows[0].keys()
  output = io.StringIO()
  writer = csv.DictWriter(output, fieldnames=fieldnames)
  writer.writeheader()
  writer.writerows(processed_rows)
  s3.put_object(
Bucket=output_bucket,
Key=output_key,
Body=output.getvalue().encode("utf-8"),
ContentType="text/csv"
  )

 print(f"Converted: {key} → {output_key} ({len(processed_rows)} rows)")

 return {
  "input_key": key,
  "output_key": output_key,
  "rows_processed": len(processed_rows)
 }

  1. 「Deploy」をクリックして保存
  2. 関数ARNをメモ (例: arn:aws:lambda:ap-northeast-1:123456789012:function:csv-converter
    <LAMBDA_FUNCTION_ARN> プレースホルダーに使用

環境変数の設定

  1. Lambda関数の画面 → 「設定」タブ → 「環境変数」→「編集」をクリック
  2. 以下の環境変数を追加:
キー
INPUT_BUCKETdm-input-{ランダム文字列}
OUTPUT_BUCKETdm-output-{ランダム文字列}
  1. 「保存」をクリック

なぜ環境変数を使うか: バケット名をコードにハードコードすると、環境ごとの変更が困難になります。環境変数を使うことでコードを変更せずに異なるバケットへ切り替えられます。


3-4. IAMロール作成

Step FunctionsがS3・Lambda・CloudWatch Logsを操作するためのIAMロールを作成します。

コンソール操作手順:

  1. AWSコンソールで IAM を開く → 「ロール」→「ロールを作成」
  2. 信頼されたエンティティの選択:
  3. タイプ: AWSのサービス
  4. ユースケース: 「Step Functions」を選択(リストにない場合は「カスタム信頼ポリシー」を使用)
  5. 信頼ポリシー(カスタム選択時):
    json
    {
    "Version": "2012-10-17",
    "Statement": [
    {
    "Effect": "Allow",
    "Principal": {
    "Service": "states.amazonaws.com"
    },
    "Action": "sts:AssumeRole"
    }
    ]}
  6. ロール名: sf-distmap-execution-role
  7. ロール作成後、「インラインポリシーを追加」から以下を設定:
{
  "Version": "2012-10-17",
  "Statement": [
 {
"Sid": "InvokeLambda",
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "<LAMBDA_FUNCTION_ARN>"
 },
 {
"Sid": "S3InputRead",
"Effect": "Allow",
"Action": [
  "s3:GetObject",
  "s3:ListBucket"
],
"Resource": [
  "arn:aws:s3:::dm-input-{ランダム文字列}",
  "arn:aws:s3:::dm-input-{ランダム文字列}/*"
]
 },
 {
"Sid": "S3OutputWrite",
"Effect": "Allow",
"Action": [
  "s3:GetObject",
  "s3:PutObject"
],
"Resource": "arn:aws:s3:::dm-output-{ランダム文字列}/*"
 },
 {
"Sid": "StepFunctionsStartExecution",
"Effect": "Allow",
"Action": "states:StartExecution",
"Resource": "*"
 },
 {
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": [
  "logs:CreateLogGroup",
  "logs:CreateLogDelivery",
  "logs:PutLogEvents",
  "logs:DescribeLogGroups",
  "logs:DescribeLogStreams",
  "logs:GetLogDelivery",
  "logs:ListLogDeliveries",
  "logs:UpdateLogDelivery",
  "logs:DeleteLogDelivery",
  "logs:PutResourcePolicy",
  "logs:DescribeResourcePolicies"
],
"Resource": "*"
 }
  ]
}

  1. ポリシー名: sf-distmap-policy として保存

states:StartExecution がDistributed Map必須の理由: Distributed Mapは各アイテムを独立した子実行(子ワークフロー)として起動します。この子実行の起動にはStep Functions自身の states:StartExecution 権限が必要です。Inline Mapでは不要ですが、Distributed Mapでは必ず付与してください。


3-5. ステートマシンを段階的に構築(記事の核心)

ここからがこのハンズオンの核心です。Step A→Dと段階的にステートマシンを構築し、Distributed Mapの仕組みを体験的に理解します。


Step A: 基本Distributed Map — 並列処理を初体験

まず最もシンプルな形でDistributed Mapを体験します。

ステートマシン作成手順:

  1. AWSコンソールで Step Functions を開く
  2. 「ステートマシンの作成」をクリック
  3. 「コードでワークフローを記述」を選択
  4. 以下のASLを貼り付け(<INPUT_BUCKET_NAME><LAMBDA_FUNCTION_ARN> を実際の値に置換):
{
  "Comment": "S3 CSV大規模並列処理 — Step A(基本Distributed Map)",
  "StartAt": "ProcessCSVFiles",
  "States": {
 "ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
 "Bucket": "<INPUT_BUCKET_NAME>",
 "Prefix": "input/"
  }
},
"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "STANDARD"
  },
  "StartAt": "ConvertCSV",
  "States": {
 "ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName": "<LAMBDA_FUNCTION_ARN>",
  "Payload.$": "$"
},
"ResultSelector": {
  "result.$": "$.Payload"
},
"End": true
 }
  }
},
"End": true
 }
  }
}


ItemReader がDistributed Mapの核心: arn:aws:states:::s3:listObjectsV2 を使うことで、S3バケット内のオブジェクト一覧を自動的に取得し、各オブジェクトを1つのアイテムとして処理します。Lambda側では event["Key"] でオブジェクトキーを受け取れます。

  1. 設定値:
  2. ステートマシン名: csv-distmap-step-a
  3. 実行ロール: sf-distmap-execution-role
  4. ログ設定: ALLレベル を推奨(子実行の状況確認に必須)
  5. 「ステートマシンの作成」をクリック

実行テスト:

  1. 「実行の開始」をクリック
  2. 入力JSONに以下を貼り付け(ItemReaderでS3バケット名を直接指定しているため、入力は空で可):
{}

  1. 「実行の開始」をクリック

実行確認ポイント:

  • 「マップ実行」タブを開くと、10個の子実行が並列に起動されることを確認
  • 各子実行をクリックして ConvertCSV ステートの成功(SUCCEEDED)を確認
  • S3出力バケットの output/ プレフィックスに変換済みCSVが出力されることを確認:
    bash
    aws s3 ls s3://dm-output-{ランダム文字列}/output/

全体の実行が SUCCEEDED になれば、Step Aは完了です。


Step B: MaxConcurrency=5 — 並列度を制御する

無制限並列処理に上限を設け、Lambda同時実行数を制御します。

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け(プレースホルダーを置換):
{
  "Comment": "S3 CSV大規模並列処理 — Step B(MaxConcurrency=5)",
  "StartAt": "ProcessCSVFiles",
  "States": {
 "ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
 "Bucket": "<INPUT_BUCKET_NAME>",
 "Prefix": "input/"
  }
},
"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "STANDARD"
  },
  "StartAt": "ConvertCSV",
  "States": {
 "ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName": "<LAMBDA_FUNCTION_ARN>",
  "Payload.$": "$"
},
"ResultSelector": {
  "result.$": "$.Payload"
},
"End": true
 }
  }
},
"MaxConcurrency": 5,
"End": true
 }
  }
}

  1. ステートマシン名: csv-distmap-step-b
  2. 実行ロール: sf-distmap-execution-role
  3. 「ステートマシンの作成」をクリック

MaxConcurrencyの効果:

設定挙動
MaxConcurrency: 0(デフォルト)全アイテムを同時に並列処理(無制限)
MaxConcurrency: 5最大5子実行が同時稼働。完了次第キューから次を起動

確認方法:

10ファイルを処理すると、最初に5件が同時に起動し、完了次第残りが起動します。「マップ実行」タブで同時実行数が5件を超えないことを観察してください。

CloudWatch Logsで同時実行の様子を確認:

aws logs filter-log-events \
  --log-group-name "/aws/states/csv-distmap-step-b" \
  --start-time $(date -d "1 hour ago" +%s000) \
  --filter-pattern "ConvertCSV"


なぜMaxConcurrencyが重要か: Lambda同時実行数のソフトリミット(デフォルト1,000)や、接続先DB・APIのスロットリング制限を超えないようにするために、MaxConcurrencyで並列度を制御します。1,000万件処理でも安全に実行できます。


Step C: ToleratedFailurePercentage=20 — 一部失敗を許容する

Distributed Mapでは一部のアイテムが失敗しても、閾値以内なら全体をSUCCEEDEDにできます。

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け(プレースホルダーを置換):
{
  "Comment": "S3 CSV大規模並列処理 — Step C(ToleratedFailurePercentage=20)",
  "StartAt": "ProcessCSVFiles",
  "States": {
 "ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
 "Bucket": "<INPUT_BUCKET_NAME>",
 "Prefix": "input/"
  }
},
"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "STANDARD"
  },
  "StartAt": "ConvertCSV",
  "States": {
 "ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName": "<LAMBDA_FUNCTION_ARN>",
  "Payload.$": "$"
},
"ResultSelector": {
  "result.$": "$.Payload"
},
"End": true
 }
  }
},
"MaxConcurrency": 5,
"ToleratedFailurePercentage": 20,
"End": true
 }
  }
}

  1. ステートマシン名: csv-distmap-step-c
  2. 「ステートマシンの作成」をクリック

失敗体験ハンズオン:

ToleratedFailurePercentage=20 の動作を体験するために、Lambdaを一時的にエラー状態にします。

  1. Lambda コンソール → csv-converter を開く
  2. 「設定」→「環境変数」→「編集」
  3. INPUT_BUCKET の値を 存在しないバケット名 に変更(例: dm-input-doesnotexist
  4. 「保存」をクリック

上記の状態でStep Cのステートマシンを実行すると:
– 全10件がLambdaエラー(S3 NoSuchBucket)になります

確認後は、INPUT_BUCKET を正しいバケット名に戻してください。

確認ポイント:

状況結果
10件中2件以下が失敗(20%以内)全体は SUCCEEDED(失敗した子実行はFAILED個別表示)
10件中3件以上が失敗(20%超)全体が FAILEDStates.ExceedToleratedFailureThreshold

ToleratedFailurePercentage vs ToleratedFailureCount: パーセント指定の他に件数指定も可能です(ToleratedFailureCount: 2)。データ件数が変動する場合はパーセント指定が柔軟です。


Step D: ResultWriter + Retry/Catch 完成形

全機能を統合した完成形です。ResultWriterで結果をS3に集約し、Retry/Catchで個別失敗に対処します。

ステートマシン作成手順:

  1. 「ステートマシンの作成」→「コードでワークフローを記述」
  2. 以下のASLを貼り付け(<INPUT_BUCKET_NAME><LAMBDA_FUNCTION_ARN><OUTPUT_BUCKET_NAME> を実際の値に置換):
{
  "Comment": "S3 CSV大規模並列処理 — Step D(ResultWriter+Retry/Catch完成形)",
  "StartAt": "ProcessCSVFiles",
  "States": {
 "ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
 "Bucket": "<INPUT_BUCKET_NAME>",
 "Prefix": "input/"
  }
},
"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "STANDARD"
  },
  "StartAt": "ConvertCSV",
  "States": {
 "ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName": "<LAMBDA_FUNCTION_ARN>",
  "Payload.$": "$"
},
"Retry": [
  {
 "ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
 ],
 "IntervalSeconds": 2,
 "MaxAttempts": 3,
 "BackoffRate": 2
  }
],
"Catch": [
  {
 "ErrorEquals": ["States.ALL"],
 "Next": "HandleConversionError",
 "ResultPath": "$.error"
  }
],
"ResultSelector": {
  "status": "SUCCESS",
  "output_key.$": "$.Payload.output_key"
},
"ResultPath": "$.conversion_result",
"Next": "ConversionSuccess"
 },
 "ConversionSuccess": {
"Type": "Pass",
"Parameters": {
  "status": "SUCCESS",
  "input_key.$": "$.Key",
  "output_key.$": "$.conversion_result.output_key"
},
"End": true
 },
 "HandleConversionError": {
"Type": "Pass",
"Parameters": {
  "status": "FAILED",
  "input_key.$": "$.Key",
  "error.$": "$.error.Cause"
},
"End": true
 }
  }
},
"MaxConcurrency": 5,
"ToleratedFailurePercentage": 20,
"ResultWriter": {
  "Resource": "arn:aws:states:::s3:putObject",
  "Parameters": {
 "Bucket": "<OUTPUT_BUCKET_NAME>",
 "Prefix": "results/"
  }
},
"ResultPath": "$.map_result",
"End": true
 }
  }
}

  1. ステートマシン名: csv-distmap-final
  2. 実行ロール: sf-distmap-execution-role
  3. 「ステートマシンの作成」をクリック

Step Dで追加された機能の説明:

機能設定効果
RetryIntervalSeconds: 2, MaxAttempts: 3, BackoffRate: 2Lambda一時的失敗時に最大3回リトライ(2秒→4秒→8秒の指数バックオフ)
CatchStates.ALL → HandleConversionError3回リトライしても失敗 → {"status": "FAILED"} を記録して継続
ResultWriters3:putObject → results/全子実行の結果をMANIFEST.jsonとしてS3に集約
ResultPath$.map_resultMapステートの出力をResultWriterの参照先に保存

実行テスト:

入力JSONは空で実行可能です:

{}


実行後の確認:

# ResultWriterが出力したMANIFEST.jsonを確認
aws s3 ls s3://dm-output-{ランダム文字列}/results/
aws s3 cp s3://dm-output-{ランダム文字列}/results/MANIFEST.json - | python3 -m json.tool

# 変換済みCSVを確認
aws s3 ls s3://dm-output-{ランダム文字列}/output/
aws s3 cp s3://dm-output-{ランダム文字列}/output/file1.csv -


MANIFEST.jsonには各子実行の結果ファイルへの参照リストが含まれます。

ResultWriterの仕組み: 全子実行が完了すると、Step FunctionsがMANIFEST.jsonを自動生成してS3に保存します。MANIFEST.jsonには各子実行の出力ファイルへのS3パスが記録されており、後続処理でこのファイルを読むことで全結果をまとめて参照できます。


3-6. 実行結果の確認

S3出力の確認

# ResultWriterの出力(MANIFEST.json)を確認
aws s3 ls s3://dm-output-{ランダム文字列}/results/
aws s3 cp s3://dm-output-{ランダム文字列}/results/MANIFEST.json - | python3 -m json.tool

# 変換済みCSVファイルの一覧確認
aws s3 ls s3://dm-output-{ランダム文字列}/output/

# 個別ファイルの内容確認
aws s3 cp s3://dm-output-{ランダム文字列}/output/file1.csv -


変換済みCSVには、元のフィールドに加えて processed_atstatus フィールドが追加されていることを確認してください。

マップ実行タブでの確認

Step Functionsコンソールの実行詳細画面で「マップ実行」タブを開くと:

  • 各子実行のステータス(SUCCEEDED/FAILED)が一覧表示されます
  • 各子実行をクリックすると、その子実行のグラフが表示されます(Distributed Mapの固有機能)
  • Step Dでは、失敗した子実行でも HandleConversionError を通じて {"status": "FAILED"} として記録されます

3-7. CloudWatch Logsで並列処理の様子を確認

ログ確認手順

  1. Step Functionsコンソール → 対象の実行を開く
  2. 「実行の詳細」タブ → 「ログ」セクションを確認
  3. CloudWatch Logsのリンクをクリックして詳細ログを表示

Distributed Map固有の特性: 独立したログストリーム

Distributed Mapでは各子実行が独立したログストリームを持ちます。これはInline Mapとの重要な違いです。

# ログストリームの一覧を確認(各子実行が独立したストリームを持つことを確認)
aws logs describe-log-streams \
  --log-group-name "/aws/states/csv-distmap-final" \
  --order-by LastEventTime \
  --descending \
  --max-items 15 \
  --query 'logStreams[*].logStreamName'


各子実行(子ワークフロー)が独立したログストリームで記録されているため、特定ファイルの処理ログを個別に追跡できます。

並列処理のシーケンスを確認

ログで以下のシーケンスを確認してください(MaxConcurrency=5の場合):

MapStateEntered ← Distributed Mapステート開始
MapRunStarted← マップ実行開始
ChildWorkflowExecutionStarted×5  ← 最初の5子実行が同時に起動
ChildWorkflowExecutionSucceeded  ← 子実行1が完了
ChildWorkflowExecutionStarted ← 6番目の子実行が起動(スロット開放)
...
MapRunSucceeded ← 全子実行完了
MapStateExited  ← Distributed Mapステート終了


CloudWatch Metricsでモニタリング

本番環境では以下のメトリクスを監視することを推奨します:

メトリクス説明アラート推奨値
ExecutionsFailed失敗した実行数1以上でアラート
ExecutionsTimedOutタイムアウトした実行数1以上でアラート
ExecutionThrottledスロットリングされた実行数急増時にアラート
MapRunsFailed失敗したマップ実行数1以上でアラート

まとめ(コンソール編)

このセクションでは、S3×2バケット + Lambda + Distributed Mapを段階的に構築しました。

Step追加した機能学んだこと
A基本Distributed MapItemReaderでS3一覧取得、$.Key で個別ファイル処理
BMaxConcurrency=5並列度の制御、Lambda同時実行数の管理
CToleratedFailurePercentage=20部分失敗の許容、States.ExceedToleratedFailureThreshold
DResultWriter + Retry/Catch結果の自動集約、指数バックオフリトライ、個別エラーの記録

Distributed Mapの本質: Step Functionsが自動的にS3オブジェクト一覧を取得し、各ファイルを独立した子実行として並列処理します。「10件でも1,000万件でも同じASLで処理できる」のがDistributed Mapの最大の強みです。

次のSection 4では、このワークフロー全体をTerraformで自動構築します。


Section 4: Terraformでの構築

本セクションでは、コンソール版(Section 3)で構築したDistributed MapステートマシンとASL構造が完全一致するTerraform構成を解説します。S3入力バケット内のCSVファイル群をDistributed Mapで並列処理し、S3出力バケットに集約するシナリオを実装します。


4-1. 前提条件

  • Terraform 1.0以上インストール済み
  • AWS CLI設定済み(aws configure で認証情報設定)
  • 適切なIAM権限(S3 / Lambda / Step Functions / IAM リソースの作成・管理権限)

4-2. ディレクトリ構成

sf-distmap/
├── main.tf
├── variables.tf
├── outputs.tf
├── lambda/
│└── csv_converter.py
└── statemachine/
 └── definition.json.tpl


4-3. Lambda ソースコード

lambda/csv_converter.py — コンソール版(Section 3)と同じロジック構成のコードです。

import json
import os
import boto3
import csv
import io
from datetime import datetime

s3 = boto3.client("s3")

def lambda_handler(event, context):
 key = event.get("Key")
 if not key:
  raise ValueError("Key is required in event")

 input_bucket = os.environ.get("INPUT_BUCKET")
 output_bucket = os.environ.get("OUTPUT_BUCKET")

 if not input_bucket or not output_bucket:
  raise ValueError("INPUT_BUCKET and OUTPUT_BUCKET environment variables are required")

 response = s3.get_object(Bucket=input_bucket, Key=key)
 csv_content = response["Body"].read().decode("utf-8")

 rows = list(csv.DictReader(io.StringIO(csv_content)))
 processed_rows = []
 for row in rows:
  row["processed_at"] = datetime.utcnow().isoformat()
  row["status"] = "converted"
  processed_rows.append(row)

 filename = key.split("/")[-1]
 output_key = f"output/{filename}"

 if processed_rows:
  fieldnames = processed_rows[0].keys()
  output = io.StringIO()
  writer = csv.DictWriter(output, fieldnames=fieldnames)
  writer.writeheader()
  writer.writerows(processed_rows)
  s3.put_object(
Bucket=output_bucket,
Key=output_key,
Body=output.getvalue().encode("utf-8"),
ContentType="text/csv"
  )

 return {
  "input_key": key,
  "output_key": output_key,
  "rows_processed": len(processed_rows)
 }


4-4. statemachine/definition.json.tpl

コンソール版(Section 3)のStep D と同じASL構造です。Terraformでは templatefile() 関数を使ってバケット名とLambda ARNを動的に補間します。${...} 形式の変数はTerraform実行時に実際の値に置き換わります。

  • ${input_bucket_name} → 入力S3バケット名(ItemReader用)
  • ${output_bucket_name} → 出力S3バケット名(ResultWriter用)
  • ${lambda_function_arn} → Lambda関数のARN
{
  "Comment": "S3 CSV大規模並列処理 — Step D(ResultWriter+Retry/Catch完成形)",
  "StartAt": "ProcessCSVFiles",
  "States": {
 "ProcessCSVFiles": {
"Type": "Map",
"ItemReader": {
  "Resource": "arn:aws:states:::s3:listObjectsV2",
  "Parameters": {
 "Bucket": "${input_bucket_name}",
 "Prefix": "input/"
  }
},
"ItemProcessor": {
  "ProcessorConfig": {
 "Mode": "DISTRIBUTED",
 "ExecutionType": "STANDARD"
  },
  "StartAt": "ConvertCSV",
  "States": {
 "ConvertCSV": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName": "${lambda_function_arn}",
  "Payload.$": "$"
},
"Retry": [
  {
 "ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
 ],
 "IntervalSeconds": 2,
 "MaxAttempts": 3,
 "BackoffRate": 2
  }
],
"Catch": [
  {
 "ErrorEquals": ["States.ALL"],
 "Next": "HandleConversionError",
 "ResultPath": "$.error"
  }
],
"ResultSelector": {
  "status": "SUCCESS",
  "output_key.$": "$.Payload.output_key"
},
"ResultPath": "$.conversion_result",
"Next": "ConversionSuccess"
 },
 "ConversionSuccess": {
"Type": "Pass",
"Parameters": {
  "status": "SUCCESS",
  "input_key.$": "$.Key",
  "output_key.$": "$.conversion_result.output_key"
},
"End": true
 },
 "HandleConversionError": {
"Type": "Pass",
"Parameters": {
  "status": "FAILED",
  "input_key.$": "$.Key",
  "error.$": "$.error.Cause"
},
"End": true
 }
  }
},
"MaxConcurrency": 5,
"ToleratedFailurePercentage": 20,
"ResultWriter": {
  "Resource": "arn:aws:states:::s3:putObject",
  "Parameters": {
 "Bucket": "${output_bucket_name}",
 "Prefix": "results/"
  }
},
"ResultPath": "$.map_result",
"End": true
 }
  }
}


4-5. variables.tf

variable "aws_region" {
  default = "ap-northeast-1"
}

variable "project_name" {
  default = "csv-distmap"
}

variable "random_suffix" {
  description = "バケット名の一意性を確保するためのサフィックス(例: abc123)"
  type  = string
}


4-6. main.tf(完全なコード)

terraform {
  required_version = ">= 1.0"
  required_providers {
 aws = {
source  = "hashicorp/aws"
version = "~> 5.0"
 }
 archive = {
source  = "hashicorp/archive"
version = "~> 2.0"
 }
  }
}

provider "aws" {
  region = var.aws_region
}

# ─────────────────────────────────────────────
# S3バケット(入力・出力)
# ─────────────────────────────────────────────

resource "aws_s3_bucket" "input" {
  bucket = "dm-input-${var.random_suffix}"
}

resource "aws_s3_bucket_public_access_block" "input" {
  bucket = aws_s3_bucket.input.id

  block_public_acls = true
  block_public_policy  = true
  ignore_public_acls= true
  restrict_public_buckets = true
}

resource "aws_s3_bucket" "output" {
  bucket = "dm-output-${var.random_suffix}"
}

resource "aws_s3_bucket_public_access_block" "output" {
  bucket = aws_s3_bucket.output.id

  block_public_acls = true
  block_public_policy  = true
  ignore_public_acls= true
  restrict_public_buckets = true
}

# ─────────────────────────────────────────────
# Lambda関数
# ─────────────────────────────────────────────

data "archive_file" "lambda_zip" {
  type  = "zip"
  source_file = "${path.module}/lambda/csv_converter.py"
  output_path = "${path.module}/lambda.zip"
}

resource "aws_iam_role" "lambda_role" {
  name = "${var.project_name}-lambda-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect = "Allow"
  Principal = { Service = "lambda.amazonaws.com" }
  Action = "sts:AssumeRole"
}
 ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_basic" {
  role = aws_iam_role.lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

resource "aws_iam_role_policy" "lambda_s3_policy" {
  name = "${var.project_name}-lambda-s3-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect = "Allow"
  Action = ["s3:GetObject"]
  Resource = "${aws_s3_bucket.input.arn}/*"
},
{
  Effect = "Allow"
  Action = ["s3:PutObject"]
  Resource = "${aws_s3_bucket.output.arn}/*"
}
 ]
  })
}

resource "aws_cloudwatch_log_group" "lambda_logs" {
  name  = "/aws/lambda/${var.project_name}-csv-converter"
  retention_in_days = 14
}

resource "aws_lambda_function" "csv_converter" {
  filename= data.archive_file.lambda_zip.output_path
  function_name = "${var.project_name}-csv-converter"
  role = aws_iam_role.lambda_role.arn
  handler = "csv_converter.lambda_handler"
  runtime = "python3.12"
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  timeout = 60

  environment {
 variables = {
INPUT_BUCKET  = aws_s3_bucket.input.bucket
OUTPUT_BUCKET = aws_s3_bucket.output.bucket
 }
  }

  depends_on = [aws_cloudwatch_log_group.lambda_logs]
}

# ─────────────────────────────────────────────
# Step Functions(Distributed Map)
# ─────────────────────────────────────────────

resource "aws_iam_role" "sf_execution_role" {
  name = "${var.project_name}-sf-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect = "Allow"
  Principal = { Service = "states.amazonaws.com" }
  Action = "sts:AssumeRole"
}
 ]
  })
}

resource "aws_iam_role_policy" "sf_policy" {
  name = "${var.project_name}-sf-policy"
  role = aws_iam_role.sf_execution_role.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  # Lambda呼び出し
  Effect= "Allow"
  Action= ["lambda:InvokeFunction"]
  Resource = aws_lambda_function.csv_converter.arn
},
{
  # 入力バケット(ItemReader: listObjectsV2に必須)
  Effect = "Allow"
  Action = ["s3:GetObject", "s3:ListBucket"]
  Resource = [
 aws_s3_bucket.input.arn,
 "${aws_s3_bucket.input.arn}/*"
  ]
},
{
  # 出力バケット(ResultWriter用)
  Effect = "Allow"
  Action = ["s3:GetObject", "s3:PutObject"]
  Resource = [
 aws_s3_bucket.output.arn,
 "${aws_s3_bucket.output.arn}/*"
  ]
},
{
  # Distributed Map子実行起動(必須)
  Effect= "Allow"
  Action= ["states:StartExecution"]
  Resource = "*"
},
{
  # CloudWatch Logsへの書き込み
  Effect = "Allow"
  Action = [
 "logs:CreateLogGroup",
 "logs:CreateLogDelivery",
 "logs:PutLogEvents",
 "logs:DescribeLogGroups",
 "logs:DescribeResourcePolicies"
  ]
  Resource = "*"
}
 ]
  })
}

resource "aws_sfn_state_machine" "csv_distmap" {
  name  = "${var.project_name}-state-machine"
  role_arn = aws_iam_role.sf_execution_role.arn

  definition = templatefile("${path.module}/statemachine/definition.json.tpl", {
 input_bucket_name= aws_s3_bucket.input.bucket
 output_bucket_name  = aws_s3_bucket.output.bucket
 lambda_function_arn = aws_lambda_function.csv_converter.arn
  })

  logging_configuration {
 level= "ALL"
 include_execution_data = true
 log_destination  = "${aws_cloudwatch_log_group.sf_logs.arn}:*"
  }

  depends_on = [aws_iam_role_policy.sf_policy]
}

resource "aws_cloudwatch_log_group" "sf_logs" {
  name  = "/aws/states/${var.project_name}-state-machine"
  retention_in_days = 14
}


4-7. outputs.tf

output "state_machine_arn" {
  description = "ステートマシンのARN"
  value = aws_sfn_state_machine.csv_distmap.arn
}

output "input_bucket_name" {
  description = "入力S3バケット名"
  value = aws_s3_bucket.input.bucket
}

output "output_bucket_name" {
  description = "出力S3バケット名"
  value = aws_s3_bucket.output.bucket
}

output "lambda_function_arn" {
  description = "Lambda関数のARN"
  value = aws_lambda_function.csv_converter.arn
}


4-8. デプロイ手順

terraform init
terraform plan -var="random_suffix=abc123"
terraform apply -var="random_suffix=abc123"


random_suffix はS3バケット名のグローバル一意性を確保するために使います。abc123 の部分は任意の文字列(英小文字・数字)に変更してください。


4-9. テスト用CSVアップロード

デプロイ完了後、入力バケットにテスト用CSVファイルをアップロードします。

# テストCSVファイルを入力バケットにアップロード
for i in $(seq 1 10); do
  echo -e "id,name,amount\n$i,customer$i,$((i * 1000))" > /tmp/file${i}.csv
  aws s3 cp /tmp/file${i}.csv \
 s3://$(terraform output -raw input_bucket_name)/input/file${i}.csv
done


4-10. 動作確認

# ステートマシン実行
aws stepfunctions start-execution \
  --state-machine-arn $(terraform output -raw state_machine_arn) \
  --input '{}'

# 実行結果確認
aws s3 ls s3://$(terraform output -raw output_bucket_name)/results/
aws s3 cp s3://$(terraform output -raw output_bucket_name)/results/MANIFEST.json - | python3 -m json.tool


実行後、results/ プレフィックス配下に MANIFEST.json が生成されます。このファイルにはResultWriterによる集約結果(各アイテムのステータス)が記録されています。


まとめ(Terraform編)

ポイント内容
ASL定義コンソール版Step DのASLをそのまま definition.json.tpl として使用
テンプレート補間templatefile() で3変数(入力バケット・出力バケット・Lambda ARN)を注入
IAM権限states:StartExecution(子実行起動)と s3:ListBucket(ItemReader)が必須
並列度制御MaxConcurrency: 5ToleratedFailurePercentage: 20 でコンソール版と同一設定
エラー耐性Retry(最大3回・指数バックオフ)+ Catch(全エラーをハンドリング)済み

Section 5: 実践Tips(Distributed Map設計ガイド)

このSectionでは、Distributed Mapを実務で使う際のTipsと注意点を解説します。

5-1. コスト計算: 子実行数 × 状態遷移数

課金の仕組み:
– 親実行: 状態遷移数 × 単価($0.000025/遷移)
– 子実行(Standard): 各実行の状態遷移数 × 単価(別途カウント)

📌 無料枠: 毎月4,000回の状態遷移まで無料。本ハンズオン程度なら無料枠に収まります。

コスト見積もり例(Step D: 3ステート/子実行, 1,000ファイル, Standard実行):

項目計算料金目安
親実行の状態遷移1実行 × 1遷移¥0(無料枠内)
子実行数1,000実行1,000実行
子実行あたりの状態遷移ConvertCSV + ConversionSuccess = 2遷移2遷移/実行
合計状態遷移数1,000 × 2 = 2,000遷移無料枠4,000遷移以内 → $0
Lambda実行コスト1,000回 × 平均1秒 × 128MB~$0.002

Express実行との比較:

項目StandardExpress
最大実行時間1年5分
価格モデル状態遷移数課金($0.000025/遷移)実行数($1/100万) + 継続時間課金
高スループット向き×✓(最大100,000遷移/秒)
実行履歴参照✓(90日保存)△(CloudWatch Logsのみ)
選択基準長時間処理・デバッグ優先大量・短時間・高スループット

5-2. MaxConcurrency のチューニング

Lambda同時実行数制限との関係:
– Lambda デフォルト同時実行数: 1,000(リージョン全体のソフトリミット)
– 他のLambda関数と共有している場合、MaxConcurrencyを低く設定してリソースを確保
– Distributed Mapの最大並列数は 10,000 まで設定可能(Step Functions サービスクォータ)

推奨設定パターン:

シナリオMaxConcurrency推奨値理由
開発・テスト5〜10デバッグしやすい
本番(Lambda共有環境)50〜100他関数へのリソース確保
本番(専用Lambda)500〜1,000Lambdaの同時実行上限近くまで活用
外部API呼び出しありAPI のレート制限に合わせるスロットリング回避

0設定(無制限)の注意点:

"Map": {
  "Type": "Map",
  "MaxConcurrency": 0,  // 0 = 無制限(非推奨)
  ...
}

  • 数千アイテムを一斉起動するとLambdaスロットリングが多発
  • ToleratedFailurePercentage と組み合わせない場合、全体FAILEDリスクあり
  • 推奨: 常にMaxConcurrencyを明示的に設定する

5-3. 大規模実行時の注意(10万+アイテム)

制限値(2026年時点):

制限備考
ItemReader(S3 listObjectsV2)最大1億オブジェクト自動ページネーション
同時実行子ワークフロー数(最大)10,000Step Functions クォータ
子実行の最大実行時間(Standard)1年大規模バッチも問題なし

大規模処理での注意点:

  1. S3 listObjectsV2のページネーション: 自動的に処理される(Step Functions側で管理)
  2. 実行時間の試算: Standard実行は子実行1年以内。100万ファイルを1分/ファイルで処理する場合:
  3. MaxConcurrency=1,000 → 実行時間 = 100万/1,000 × 60秒 = 60,000秒 ≈ 17時間(問題なし)
  4. CloudWatch Logs のコスト: 子実行が多いとログ量が膨大になる。
    本番環境では ERROR レベルのみ記録することを検討:
    hcl
    # Terraform例
    logging_configuration {
    level = "ERROR"
    include_execution_data = false
    log_destination = "${aws_cloudwatch_log_group.sf.arn}:*"
    }
  5. ResultWriter の活用: 大規模処理では結果を親実行の出力に含めると256KB制限を超える。
    必ずResultWriterでS3に集約すること:
    json
    "ResultWriter": {
    "Resource": "arn:aws:states:::s3:putObject",
    "Parameters": {
    "Bucket": "<OUTPUT_BUCKET>",
    "Prefix": "results/"
    }
    }

5-4. CSVヘッダー行の扱い(ItemReader CSV設定)

S3のCSVファイルを行ごとに子実行で処理する場合(listObjectsV2ではなくCSV形式のItemReader):

"ItemReader": {
  "Resource": "arn:aws:states:::s3:getObject",
  "ReaderConfig": {
 "InputType": "CSV",
 "CSVHeaderLocation": "FIRST_ROW"
  },
  "Parameters": {
 "Bucket": "<BUCKET_NAME>",
 "Key": "<CSV_KEY>"
  }
}


CSVHeaderLocationの設定値:

動作
FIRST_ROW1行目をヘッダーとして扱い、各行はオブジェクトに変換
GIVENヘッダー名を CSVHeaders フィールドで指定
  • ヘッダー行は自動スキップされるため、手動でのフィルタリングは不要
  • 各行のデータはヘッダー名をキーとするJSONオブジェクトとして子実行に渡される

5-5. Inline Map vs Distributed Map の選択基準

アイテム数が40以下?
YES → Inline Map(コスト低・設定シンプル)
NO  ↓
データソースがS3オブジェクトリスト/CSVファイル?
YES → Distributed Map(ItemReader: S3利用)
NO  ↓
子実行を独立して監視・再実行したい?
YES → Distributed Map
NO  ↓
処理時間が5分以内かつ高スループットが必要?
YES → Distributed Map(ExecutionType: EXPRESS)
NO  → Distributed Map(ExecutionType: STANDARD)


💡 Tips: 「とりあえずDistributed Map」は避けること。40アイテム以下ならInline Mapの方がシンプルでコストも低い。


Section 6: ハンズオン後の削除手順

⚠️ 放置するとS3ストレージ料金、CloudWatch Logs料金が発生します。ハンズオン終了後は忘れずに削除してください。

6-1. コスト注意事項

リソース月額目安備考
Lambda無料枠内(本ハンズオン程度)100万リクエスト/月まで無料
Step Functions無料枠内(本ハンズオン程度)4,000回の状態遷移まで無料
S3~$0.025/GB/月テストCSVなら数円以下
CloudWatch Logs~$0.76/GBALLレベルだと子実行分も発生

6-2. Terraformで構築した場合

# S3バケット内のオブジェクトを先に削除(バケット削除の前提)
aws s3 rm s3://$(terraform output -raw input_bucket_name) --recursive
aws s3 rm s3://$(terraform output -raw output_bucket_name) --recursive

# リソース一括削除
terraform destroy -var="random_suffix=abc123"


手動削除が必要なもの(Terraformが管理しないリソース):

aws logs delete-log-group \
  --log-group-name /aws/lambda/csv-distmap-csv-converter


6-3. コンソールで構築した場合の削除チェックリスト

  • [ ] Step Functions ステートマシン(4つ: step-a/b/c/final)を削除
  • [ ] Lambda 関数(csv-converter)を削除
  • [ ] S3バケット(dm-input-XXXXX): オブジェクト全削除 → バケット削除
  • [ ] S3バケット(dm-output-XXXXX): オブジェクト全削除 → バケット削除
  • [ ] IAMロール(sf-distmap-execution-role)と付随するポリシーを削除
  • [ ] CloudWatch Logs ロググループを削除

S3バケットの完全削除(CLIで一括):

# バージョン管理が無効の場合
aws s3 rb s3://dm-input-{ランダム文字列} --force
aws s3 rb s3://dm-output-{ランダム文字列} --force


💡 バージョン管理が有効な場合は、先にすべてのバージョンを削除してから rb を実行する必要があります。


Section 7: まとめと次のステップ

7-1. この記事で学んだこと

本記事では、AWS Step Functions の Distributed Map を使った大規模並列処理を、実際のCSV変換パイプラインで体験しました。

テーマ学んだこと
Distributed Map の基本Inline Mapとの違い(アイテム数上限・子実行・ItemReader/ResultWriter)
ItemReaderS3 listObjectsV2を使ったS3オブジェクト大規模一括処理
並列度制御MaxConcurrencyによるLambda同時実行数との兼ね合い
耐障害性ToleratedFailurePercentageによる一部失敗の許容
結果集約ResultWriterによる処理結果のS3集約(MANIFEST.json)
エラーハンドリングRetry/CatchによるLambda一時的失敗への対応
コスト最適化Standard vs Expressの選択基準と料金計算

7-2. 次のステップ

本シリーズの続きとして、以下のテーマを予定しています:

  • 第7弾予告: Express vs Standard Workflow の詳細比較 — ユースケース別の最適解を解説
  • 第8弾予告: SDK Direct Integration(Lambda-less)— Lambdaを介さずAWSサービスを直接呼び出す手法
  • 応用編: Distributed Map × DynamoDB(大量アイテムの読み書き)

7-3. シリーズリンク


本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第6回です。

シリーズ一覧:
– 第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
– 第2回: ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン
– 第3回: Step Functions エラーハンドリング完全ガイド — 注文処理パイプラインで学ぶRetry/Catch/Timeout
– 第4回: Step Functions 入出力データフロー制御完全ガイド — 5つのフィルタでペイロードを最適化するハンズオン
– 第5回: Step Functions Callbackパターン完全ガイド — .waitForTaskTokenで実現する経費承認ワークフロー
– 第6回: 本記事(Distributed Map完全ガイド)


7-4. 参考リンク

  • Amazon States Language — Distributed Map State
  • Step Functions Distributed Map の処理モード
  • ItemReader の設定オプション
  • ResultWriter の出力形式(MANIFEST.json)
最新情報をチェックしよう!