- 1 ECS Task × Step Functions でジョブオーケストレーション — S3 CSVバッチ処理ハンズオン
- 1.1 1. 概念編
- 1.2 2. アーキテクチャ解説
- 1.3 3. AWSコンソールでのハンズオン
- 1.4 4. Terraformでの構築
- 1.5 5. ハンズオン後の削除手順
- 1.6 6. まとめと次のステップ
ECS Task × Step Functions でジョブオーケストレーション — S3 CSVバッチ処理ハンズオン
公開日: 2026-04-12
難易度: 中級
所要時間: 約90分
タグ: AWS, Step Functions, ECS, Fargate, Terraform, ハンズオン, バッチ処理
この記事で学ぶこと
– ECS FargateとStep Functionsを組み合わせたバッチジョブ制御
– S3 CSVファイルを読み込み・処理・書き戻すECSコンテナの実装
– コンソールとTerraform両方でのステートマシン構築
– Retry/Catchによる堅牢なエラーハンドリング
1. 概念編
1-1. ECS Task / Fargate とは
Amazon ECS(Elastic Container Service)は、AWS が提供するフルマネージドのコンテナオーケストレーションサービスです。Docker コンテナのデプロイ・管理・スケーリングを自動化し、開発者がアプリケーションのロジックに集中できる環境を提供します。
EC2 起動タイプ vs Fargate 起動タイプ
ECS には2種類の起動タイプがあり、用途に応じて使い分けます。
| 項目 | EC2 起動タイプ | Fargate 起動タイプ |
|---|---|---|
| サーバー管理 | EC2 インスタンスを自分で管理 | 不要(AWS が管理) |
| コスト | インスタンス稼働中は常時課金 | タスク実行時間・リソース量で課金 |
| カスタマイズ性 | OS レベルのカスタマイズ可能 | コンテナ単位のみ |
| スケーリング | インスタンス数を管理する必要あり | 自動(インフラ意識不要) |
| 向いているケース | 長時間稼働・高負荷・特殊要件 | バッチ処理・スポット起動・シンプル構成 |
Fargate のメリット
- サーバー管理不要: EC2 インスタンスのプロビジョニング・パッチ適用・スケーリングを AWS に任せられます。インフラ運用コストを大幅に削減できます。
- タスク単位課金: 実際にコンテナが動いている時間と消費した vCPU・メモリ量のみ課金されます。バッチ処理のように「必要なときだけ起動」するユースケースでは特にコスト効率が高くなります。
ECS の基本概念
ECS を理解する上で、以下の4つの概念を押さえておきましょう。
| 概念 | 説明 |
|---|---|
| クラスター | コンテナを実行するための論理的なグループ。Fargate の場合、実体はマネージドインフラ。 |
| タスク定義 | コンテナイメージ・CPU・メモリ・環境変数などを定義したテンプレート(「設計図」)。 |
| タスク | タスク定義をもとに実際に起動されるコンテナの実行単位。本記事ではバッチ処理の1ジョブに相当。 |
| サービス | 指定した数のタスクを常時維持する仕組み。Web サーバーなど常駐型アプリに使用(本記事では不使用)。 |
1-2. AWS Step Functions とは(おさらい)
AWS Step Functions については以下の入門記事で詳しく解説しています。本記事では要点のみ振り返ります。
AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
Step Functions のポイント(3行まとめ):
- ステートマシンでワークフローを JSON(ASL: Amazon States Language)として定義し、各ステップの実行順序・分岐・並列処理を宣言的に記述します。
- Lambda・ECS・DynamoDB・S3 など 200 以上の AWS サービスと SDK 統合でき、グルーコードなしでサービス間を連携できます。
- AWS コンソール上のビジュアルワークフローエディタでグラフを確認しながら実行状態をリアルタイムに追跡できます。
1-3. なぜ ECS × Step Functions を組み合わせるか
ECS Fargate と Step Functions を組み合わせることで、バッチ処理の弱点をカバーしつつ、オーケストレーションの強みを最大化できます。
長時間バッチ処理の管理(Lambda の 15 分制限を回避)
AWS Lambda はタイムアウト上限が 15 分です。数十 GB の CSV 変換や大規模な機械学習推論など、実行に数時間かかるバッチ処理には対応できません。ECS Fargate にはタイムアウト制限がなく、Step Functions でその実行を制御することで、長時間ジョブを安全に扱えます。
宣言的なリトライ・エラーハンドリング(Retry / Catch)
Step Functions のステート定義には Retry と Catch フィールドが用意されています。「最大3回リトライ、その後失敗ステートへ遷移」といった複雑なエラーハンドリングをコードなしで JSON で宣言できます。アプリケーション側にリトライロジックを書く必要がなく、信頼性の高いバッチパイプラインを構築できます。
可視化(実行グラフでジョブ状態を一目確認)
Step Functions コンソールでは、ワークフローの各ステップがグラフ上で色分けされ、実行中・成功・失敗の状態をリアルタイムで確認できます。複数の ECS タスクが直列・並列で走るパイプラインでも、どのステップで止まったかを即座に特定でき、デバッグ効率が大幅に向上します。
並列実行(Parallel ステートで複数 ECS タスク同時起動)
Step Functions の Parallel ステートを使うと、独立した ECS タスクを同時に起動できます。例えば「地域 A・B・C のデータを同時に処理して結果を集約する」といったパターンを、コードを書かずに実現できます。
Lambda 不要の直接 SDK 統合(コスト・レイテンシ削減)
従来は「Step Functions → Lambda → ECS 起動」という間接的な構成が一般的でしたが、現在は Step Functions から ECS タスクを 直接起動できます。Lambda の実行コスト・起動レイテンシを省けるため、シンプルかつ低コストな構成になります。
1-4. Step Functions × ECS 統合パターン
Step Functions から ECS タスクを呼び出す方法は主に2パターンあります。
| 項目 | .sync パターン | .waitForTaskToken パターン |
|---|---|---|
| リソース ARN | arn:aws:states:::ecs:runTask.sync | arn:aws:states:::ecs:runTask.waitForTaskToken |
| 動作 | SF が ECS タスクの完了を待ち受け、終了後に次ステートへ進む | ECS タスクがトークンを受け取り、処理完了後に SF へコールバック |
| 向いているケース | 数分〜数時間のバッチ処理(本記事のユースケース) | 外部システムとの非同期連携、人間によるレビュー待ちなど |
| タイムアウト上限 | Step Functions の最大実行時間(1年)に依存 | 同上(コールバックが来るまで待機) |
| ECS タスク側の実装 | 変更不要(タスクが終了すれば自動で次へ) | タスク内でトークンを受け取り SendTaskSuccess/Failure API を呼ぶ実装が必要 |
本記事では .sync パターンを使用します。 ECS タスクが終了するまで Step Functions が待機し、終了後に次のステップへ自動的に移行するシンプルな構成です。コールバック実装が不要なため、バッチ処理の入門には最適です。
1-5. 典型ユースケース
ECS × Step Functions の組み合わせは、以下のような場面で威力を発揮します。
1. ETLパイプライン(本記事のシナリオ: S3 CSV → 変換 → S3)
S3 に置かれた CSV ファイルを ECS Fargate コンテナで読み込み、データ変換・クレンジング処理を実施して、結果を再び S3 へ書き戻すパターンです。Step Functions が処理の開始・監視・後続ステップへの連携を担い、シンプルなコンテナで複雑なパイプラインを実現します。本記事ではこのシナリオをハンズオン形式で実装します。
2. 動画・画像処理バッチ
アップロードされた動画のトランスコードやサムネイル生成など、CPU・メモリを大量消費する処理に適しています。Fargate でリソースを柔軟に割り当て、処理完了後に自動終了するため、常時起動インスタンスが不要でコストを最適化できます。
3. 機械学習推論バッチ(SageMaker との違いも言及)
学習済みモデルをコンテナにパッケージし、バッチデータへの推論を定期実行するパターンです。SageMaker Batch Transform はマネージドで便利ですが、カスタムライブラリや独自フレームワークが必要な場合は ECS Fargate の方が柔軟性が高く、コンテナイメージさえ用意すればどんな ML 環境でも動かせます。
4. レポート生成・PDF 出力
月次・週次で集計データを取得し、PDF や Excel 形式のレポートを自動生成する処理です。Step Functions でデータ取得 → 集計 → PDF 変換 → S3 保存 → メール送信という一連のフローを制御でき、スケジュール実行(EventBridge との連携)で完全自動化が可能です。
5. データウェアハウスへのロード処理
S3 や RDS から大量データを抽出し、Redshift や Snowflake などのデータウェアハウスへロードするバッチです。ECS コンテナで重い変換処理を担い、Step Functions で依存関係のある複数ステップ(抽出 → 変換 → ロード → 検証)を順序制御することで、信頼性の高い ELT パイプラインを構築できます。
概念を把握したところで、次は実際に構築する構成の全体像を確認しましょう。
2. アーキテクチャ解説
本セクションでは、ECS FargateとStep Functionsを組み合わせたCSVバッチ処理ジョブの全体像を、3つの観点から図解する。
2-1. 全体アーキテクチャ

S3バケットに入力CSVがアップロードされると、Step Functionsのステートマシンが起動し、ECS Fargateタスクを呼び出す。Fargateタスクは、Amazon ECRから取得したDockerイメージを使って起動し、S3から入力データを読み込み、変換処理後に結果をS3へ書き出す。タスクの実行ログはAmazon CloudWatch Logsに自動転送され、運用監視が容易になる。処理の各ステップはStep Functionsが状態管理するため、エラー時のリトライ・フォールバックをコードなしで制御できる。
2-2. ステートマシンフロー

ステートマシンは「RunBatchJob」(Task状態)を起点とし、ECS Fargateのタスク完了を同期的に待機する(ecs:runTask.sync統合パターン)。タスクが正常終了すると「JobSucceeded」(Succeed状態)へ遷移し、実行完了となる。タスクが失敗した場合は「JobFailed」(Fail状態)へ遷移し、実行エラーとして記録される。また、RunBatchJob状態にはリトライ設定(MaxAttempts: 2、IntervalSeconds: 30)とキャッチ設定が組み込まれており、一時的な障害に対して自動的に再試行が行われる。
2-3. シーケンス図

シーケンス図はStep Functions・ECS Fargate・S3間の時系列インタラクションを示している。①ユーザーがStep FunctionsのStartExecutionを呼び出すことで処理が始まる。②Step FunctionsがECS Fargateに対してrunTaskを発行し、タスクが起動する。③ECSタスクはS3(Input)からGetObjectでCSVデータを取得する。④取得したデータに対してコンテナ内でデータ変換処理を実行する。⑤処理済みデータをPutObjectでS3(Output)に書き込む。⑥ECSタスクから完了通知がStep Functionsに返却され、⑦ステートマシンが次のステート(JobSucceeded)へ遷移して実行完了となる。
アーキテクチャの全体像を把握できたので、いよいよAWSコンソールを使って実際に構築してみましょう。
3. AWSコンソールでのハンズオン
このセクションでは、AWSコンソールを使って実際にS3 CSVバッチ処理パイプラインを構築します。ECS Fargateでコンテナタスクを実行し、Step Functionsでその実行を制御する一連の流れを手を動かしながら学んでいきましょう。
3-0. 前提条件
作業を始める前に、以下の環境が整っていることを確認してください。
- AWSアカウント: 有効なAWSアカウントを持っていること
- IAM権限: ECS、Step Functions、IAM、ECR、S3、CloudWatch Logsを操作できる権限があること(管理者権限推奨)
- Docker ローカル環境: ECRへのイメージPushに必要。
docker versionでDocker Desktopが動作していることを確認 - AWS CLI: インストール済みかつ
aws configureで認証情報を設定済みであること(aws sts get-caller-identityで確認可能) - デフォルトVPC: 本ハンズオンではデフォルトVPCを使用します。AWSコンソールの「VPC」→「お使いのVPC」でデフォルトVPCが存在することを確認してください
3-1. サンプルPythonスクリプト(ECSタスクで実行するコード)
今回構築するECSタスクが実行するPythonスクリプトを用意します。このスクリプトは、S3からCSVファイルを読み込み、amount 列の値を2倍に変換して、別のS3バケットに書き戻します。
app.py:
import os, boto3, csv, iodef main(): s3 = boto3.client('s3') input_bucket = os.environ['INPUT_BUCKET'] input_key = os.environ['INPUT_KEY'] output_bucket = os.environ['OUTPUT_BUCKET'] output_key = os.environ['OUTPUT_KEY'] # S3からCSV読み込み obj = s3.get_object(Bucket=input_bucket, Key=input_key) reader = csv.DictReader(io.StringIO(obj['Body'].read().decode('utf-8'))) rows = list(reader) # 変換処理(例: amount列を2倍) for row in rows: if 'amount' in row:row['amount'] = str(float(row['amount']) * 2) # 結果をS3に書き込み output = io.StringIO() writer = csv.DictWriter(output, fieldnames=rows[0].keys()) writer.writeheader() writer.writerows(rows) s3.put_object(Bucket=output_bucket, Key=output_key,Body=output.getvalue().encode('utf-8')) print(f"処理完了: {len(rows)} 行を {output_bucket}/{output_key} に書き込み")if __name__ == '__main__': main()処理の入出力先はすべて環境変数で受け取ります。Step Functionsの入力JSONから動的に注入されるため、コードを変更せずに様々なCSVファイルを処理できる設計になっています。
次に、このスクリプトをコンテナ化するための Dockerfile を作成します。
Dockerfile:
FROM python:3.11-slimWORKDIR /appRUN pip install boto3COPY app.py .CMD ["python", "app.py"]app.py と Dockerfile を同じディレクトリに配置してください。
3-2. ECRリポジトリ作成 → ビルド → Push
作成したDockerイメージをAWS ECR(Elastic Container Registry)にPushします。
① ECRリポジトリ作成
aws ecr create-repository \ --repository-name csv-processor \ --region ap-northeast-1出力の repositoryUri をメモしておきます(例: 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor)。
② ECRへのDockerログイン
aws ecr get-login-password --region ap-northeast-1 \ | docker login --username AWS \ --password-stdin 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com注意:
123456789012はご自身のAWSアカウントIDに置き換えてください。
③ Dockerイメージをビルド・タグ付け・Push
# イメージのビルドdocker build -t csv-processor .# ECRリポジトリ用にタグ付けdocker tag csv-processor:latest \ 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor:latest# ECRへPushdocker push \ 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor:latestPushしたイメージURIは後のタスク定義作成で使用するため、コピーして手元に控えておいてください。
3-3. IAMロール作成(3種類)
このハンズオンでは、ECSとStep Functionsが適切なAWSリソースにアクセスできるよう、3つのIAMロールを作成します。
① ECSタスク実行ロール(ecsTaskExecutionRole)
ECSエージェントがECRからコンテナイメージをPullし、CloudWatch Logsにログを書き込むために必要なロールです。
信頼ポリシー(ecs-trust-policy.json):
{ "Version": "2012-10-17", "Statement": [ {"Effect": "Allow","Principal": { "Service": "ecs-tasks.amazonaws.com"},"Action": "sts:AssumeRole" } ]}# ロール作成aws iam create-role \ --role-name ecsTaskExecutionRole \ --assume-role-policy-document file://ecs-trust-policy.json# AWS管理ポリシーをアタッチaws iam attach-role-policy \ --role-name ecsTaskExecutionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy② ECSタスクロール(csv-processor-task-role)
コンテナ内のアプリケーション(app.py)がS3を読み書きするために必要なロールです。
信頼ポリシーは ecsTaskExecutionRole と同じ内容を使用します。
インラインポリシー(s3-access-policy.json):
{ "Version": "2012-10-17", "Statement": [ {"Effect": "Allow","Action": [ "s3:GetObject", "s3:PutObject"],"Resource": "arn:aws:s3:::*" } ]}ベストプラクティス: 本番環境では
"Resource"を実際のバケットARN(例:arn:aws:s3:::my-input-bucket/*)に絞り込んでください。
# ロール作成aws iam create-role \ --role-name csv-processor-task-role \ --assume-role-policy-document file://ecs-trust-policy.json# インラインポリシーを付与aws iam put-role-policy \ --role-name csv-processor-task-role \ --policy-name S3AccessPolicy \ --policy-document file://s3-access-policy.json③ Step Functions実行ロール(sf-ecs-execution-role)
Step FunctionsがECSタスクを起動するために必要なロールです。
信頼ポリシー(sf-trust-policy.json):
{ "Version": "2012-10-17", "Statement": [ {"Effect": "Allow","Principal": { "Service": "states.amazonaws.com"},"Action": "sts:AssumeRole" } ]}インラインポリシー(sf-ecs-policy.json):
{ "Version": "2012-10-17", "Statement": [ {"Effect": "Allow","Action": [ "ecs:RunTask", "ecs:StopTask", "ecs:DescribeTasks"],"Resource": "*" }, {"Effect": "Allow","Action": "iam:PassRole","Resource": [ "arn:aws:iam::123456789012:role/ecsTaskExecutionRole", "arn:aws:iam::123456789012:role/csv-processor-task-role"] }, {"Effect": "Allow","Action": [ "events:PutTargets", "events:PutRule", "events:DescribeRule"],"Resource": "*" } ]}注意:
123456789012はご自身のAWSアカウントIDに置き換えてください。iam:PassRoleはStep FunctionsがECSにロールを渡す際に必要です。
# ロール作成aws iam create-role \ --role-name sf-ecs-execution-role \ --assume-role-policy-document file://sf-trust-policy.json# インラインポリシーを付与aws iam put-role-policy \ --role-name sf-ecs-execution-role \ --policy-name ECSRunTaskPolicy \ --policy-document file://sf-ecs-policy.json3-4. ECSクラスター作成
Fargateタスクを実行するECSクラスターを作成します。
- AWSコンソールの検索バーで 「ECS」 を検索し、「Elastic Container Service」を開きます
- 左メニューの 「クラスター」 をクリック
- 「クラスターの作成」 ボタンをクリック
- 設定を入力します:
- クラスター名:
csv-batch-cluster - インフラストラクチャ: 「AWS Fargate (サーバーレス)」にチェック
- 「作成」 ボタンをクリック
CLIで作成する場合:
aws ecs create-cluster \ --cluster-name csv-batch-cluster \ --capacity-providers FARGATE \ --region ap-northeast-1クラスターのステータスが「ACTIVE」になれば作成完了です。
3-5. タスク定義作成
ECSタスクの設定(使用するコンテナイメージ、CPU/メモリ、IAMロール等)をタスク定義として登録します。
- ECSコンソールの左メニューで 「タスク定義」 をクリック
- 「新しいタスク定義の作成」 → 「JSONを使用した新しいタスク定義の作成」 を選択
- 以下のJSONを貼り付けます(
<YOUR_ACCOUNT_ID>をご自身のアカウントIDに変更):
{ "family": "csv-processor", "networkMode": "awsvpc", "requiresCompatibilities": ["FARGATE"], "cpu": "256", "memory": "512", "executionRoleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/ecsTaskExecutionRole", "taskRoleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/csv-processor-task-role", "containerDefinitions": [ {"name": "csv-processor","image": "<YOUR_ACCOUNT_ID>.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor:latest","essential": true,"environment": [],"logConfiguration": { "logDriver": "awslogs", "options": { "awslogs-group": "/ecs/csv-processor", "awslogs-region": "ap-northeast-1", "awslogs-stream-prefix": "ecs" }} } ]}- 「作成」 ボタンをクリック
| 設定項目 | 値 | 説明 |
|---|---|---|
| タスク定義名(family) | csv-processor | タスク定義の識別名 |
| 起動タイプ | FARGATE | サーバーレスでコンテナを実行 |
| タスクCPU | 256 (0.25 vCPU) | バッチ処理に十分なCPU |
| メモリ | 512 MB (0.5 GB) | CSVの読み書きには十分 |
| タスク実行ロール | ecsTaskExecutionRole | ECRイメージPull・ログ出力用 |
| タスクロール | csv-processor-task-role | S3アクセス用 |
ポイント: 環境変数(INPUT_BUCKET等)はタスク定義には設定しません。Step Functionsの
ContainerOverridesから実行時に動的に注入されます。
CloudWatch Logsのロググループを事前に作成しておきましょう:
aws logs create-log-group \ --log-group-name /ecs/csv-processor \ --region ap-northeast-13-6. Step Functionsステートマシン作成
いよいよStep Functionsのステートマシンを作成します。ECSタスクをオーケストレーションするASL(Amazon States Language)を定義します。
手順
- AWSコンソールの検索バーで 「Step Functions」 を検索して開きます
- 「ステートマシンの作成」 をクリック
- 「空白で記述」 を選択し、「次へ」 をクリック
- 「コード」 タブを選択し、以下のASL JSONを貼り付けます:
{ "Comment": "S3 CSVバッチ処理ジョブ — ECS Fargate × Step Functions", "StartAt": "RunBatchJob", "States": { "RunBatchJob": {"Type": "Task","Resource": "arn:aws:states:::ecs:runTask.sync","Parameters": { "LaunchType": "FARGATE", "Cluster.$": "$.cluster_arn", "TaskDefinition.$": "$.task_definition_arn", "NetworkConfiguration": { "AwsvpcConfiguration": {"Subnets.$": "$.subnets","SecurityGroups.$": "$.security_groups","AssignPublicIp": "ENABLED" } }, "Overrides": { "ContainerOverrides": [{ "Name": "csv-processor", "Environment": [ {"Name": "INPUT_BUCKET", "Value.$": "$.input_bucket"}, {"Name": "INPUT_KEY", "Value.$": "$.input_key"}, {"Name": "OUTPUT_BUCKET", "Value.$": "$.output_bucket"}, {"Name": "OUTPUT_KEY", "Value.$": "$.output_key"} ]} ] }},"Retry": [ { "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 30, "MaxAttempts": 2, "BackoffRate": 2.0 }],"Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "JobFailed", "ResultPath": "$.error" }],"Next": "JobSucceeded" }, "JobSucceeded": {"Type": "Succeed" }, "JobFailed": {"Type": "Fail","Error": "BatchJobFailed","Cause": "ECS task failed. Check CloudWatch Logs for details." } }}- 設定を入力します:
- ステートマシン名:
csv-batch-state-machine - 実行ロール: 「既存のロールを選択」→
sf-ecs-execution-role - 「作成」 ボタンをクリック
ASL解説
このASLのポイントを理解しておきましょう。
ecs:runTask.sync とは?
Resource に指定した arn:aws:states:::ecs:runTask.sync は、Step Functionsが持つECSとの最適化された統合(Optimized Integration)です。末尾の .sync により、ECSタスクが完了するまでStep Functionsが自動的に待機します。タスクのポーリングはStep Functionsが内部で行うため、Lambdaなどを挟む必要がありません。
.$ サフィックスの意味
"Cluster.$": "$.cluster_arn" のように、値に .$ が付いたフィールドはステートマシンの実行時入力JSONから動的に値を取得します。$ はルートパスを示し、$.cluster_arn は入力JSONの cluster_arn フィールドを参照します。これにより、同じステートマシンを異なる環境・バケット・ファイルに対して再利用できます。
Retry/Catch の設定値
| 設定 | 値 | 意味 |
|---|---|---|
ErrorEquals | States.TaskFailed | ECSタスクが失敗した場合にリトライ |
IntervalSeconds | 30 | 初回リトライまでの待機時間(秒) |
MaxAttempts | 2 | 最大リトライ回数(初回 + 最大2回 = 計3回試行) |
BackoffRate | 2.0 | リトライ間隔の指数バックオフ係数(30s → 60s) |
Catch | States.ALL | すべてのエラーをキャッチ → JobFailed 状態へ遷移 |
3-7. ステートマシン実行・結果確認
S3バケットとサンプルCSVの準備
実行前に、入力データを格納するS3バケットとサンプルCSVファイルを用意します。
# 入力・出力バケット作成(バケット名はグローバルで一意にする必要があります)aws s3 mb s3://my-csv-input-bucket-<YOUR_UNIQUE_SUFFIX> --region ap-northeast-1aws s3 mb s3://my-csv-output-bucket-<YOUR_UNIQUE_SUFFIX> --region ap-northeast-1# サンプルCSV作成 & アップロードcat << 'EOF' > sample.csvid,name,amount1,Alice,1002,Bob,2003,Charlie,150EOFaws s3 cp sample.csv s3://my-csv-input-bucket-<YOUR_UNIQUE_SUFFIX>/data/sample.csvステートマシンの実行
- Step Functionsコンソールで
csv-batch-state-machineを開きます - 「実行の開始」 をクリック
- 入力JSONに以下を貼り付けます(各値はご自身の環境に合わせて変更してください):
{ "cluster_arn": "arn:aws:ecs:ap-northeast-1:123456789012:cluster/csv-batch-cluster", "task_definition_arn": "arn:aws:ecs:ap-northeast-1:123456789012:task-definition/csv-processor:1", "subnets": ["subnet-0123456789abcdef0"], "security_groups": ["sg-0123456789abcdef0"], "input_bucket": "my-csv-input-bucket-<YOUR_UNIQUE_SUFFIX>", "input_key": "data/sample.csv", "output_bucket": "my-csv-output-bucket-<YOUR_UNIQUE_SUFFIX>", "output_key": "result/processed.csv"}ヒント:
subnetsとsecurity_groupsはデフォルトVPCのものを使用します。コンソールの「VPC」→「サブネット」でデフォルトVPCに紐づくサブネットIDを、「セキュリティグループ」でデフォルトセキュリティグループのIDを確認してください。
- 「実行の開始」 ボタンをクリック
実行グラフと結果の確認
実行が開始されると、グラフビューで各ステートの進捗がリアルタイムで確認できます。
- 青色: 実行中
- 緑色: 成功
- 赤色: 失敗
RunBatchJob → JobSucceeded の順で緑色になれば成功です。
CloudWatch Logsの確認
ECSタスクのログを確認するには:
- AWSコンソールで 「CloudWatch」 を開きます
- 左メニューの 「ロগগループ」 をクリック
/ecs/csv-processorを選択します- 最新のログストリームを開くと、
app.pyの出力が確認できます:
処理完了: 3 行を my-csv-output-bucket-xxx/result/processed.csv に書き込み処理結果の確認
出力バケットにCSVが書き込まれているか確認します:
aws s3 cp \ s3://my-csv-output-bucket-<YOUR_UNIQUE_SUFFIX>/result/processed.csv \ - | catamount 列が2倍になった結果が出力されていれば成功です:
id,name,amount1,Alice,200.02,Bob,400.03,Charlie,300.0以上でAWSコンソールを使ったECS Fargate × Step Functionsのハンズオンは完了です。次のセクションでは、同じ構成をTerraformで再現する方法を解説します。
4. Terraformでの構築
コンソールで構築したS3→ECS Fargate→S3のCSVバッチジョブを、今度はTerraformを使ってコード化してみましょう。Infrastructure as Code(IaC)で管理することで、環境の再現性が高まり、チームでの共有や変更管理が容易になります。
4-1. 前提条件
作業を始める前に、以下の環境が整っていることを確認してください。
- Terraform: バージョン 1.0 以上がインストール済み
bash
terraform version
# Terraform v1.x.x 以上であることを確認 - AWS CLI:
aws configureで認証情報が設定済みbash
aws sts get-caller-identity
# アカウントIDとIAMユーザー/ロール情報が返ってくれば設定完了 - Docker: ECRへのコンテナイメージPushに使用します
bash
docker version
# Docker Engine が動作していることを確認 - IAM 権限: 以下の操作権限が必要です
AdministratorAccessまたは以下の個別権限iam:CreateRole/iam:AttachRolePolicy/iam:PutRolePolicyecr:CreateRepository/ecs:CreateCluster/ecs:RegisterTaskDefinitionstates:CreateStateMachinelogs:CreateLogGroup
4-2. ディレクトリ構成
以下のディレクトリ構成でプロジェクトを作成します。
ecs-sf-handson/├── main.tf├── variables.tf├── outputs.tf└── statemachine/ └── definition.jsonmkdir -p ecs-sf-handson/statemachinecd ecs-sf-handson4-3. statemachine/definition.json
コンソール版と同一のASL定義です。 このファイルはコンソールで作成したステートマシンと一字一句同じ定義を使用しています。
{ "Comment": "S3 CSVバッチ処理ジョブ — ECS Fargate × Step Functions", "StartAt": "RunBatchJob", "States": { "RunBatchJob": {"Type": "Task","Resource": "arn:aws:states:::ecs:runTask.sync","Parameters": { "LaunchType": "FARGATE", "Cluster.$": "$.cluster_arn", "TaskDefinition.$": "$.task_definition_arn", "NetworkConfiguration": { "AwsvpcConfiguration": {"Subnets.$": "$.subnets","SecurityGroups.$": "$.security_groups","AssignPublicIp": "ENABLED" } }, "Overrides": { "ContainerOverrides": [{ "Name": "csv-processor", "Environment": [ {"Name": "INPUT_BUCKET", "Value.$": "$.input_bucket"}, {"Name": "INPUT_KEY", "Value.$": "$.input_key"}, {"Name": "OUTPUT_BUCKET", "Value.$": "$.output_bucket"}, {"Name": "OUTPUT_KEY", "Value.$": "$.output_key"} ]} ] }},"Retry": [ { "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 30, "MaxAttempts": 2, "BackoffRate": 2.0 }],"Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "JobFailed", "ResultPath": "$.error" }],"Next": "JobSucceeded" }, "JobSucceeded": {"Type": "Succeed" }, "JobFailed": {"Type": "Fail","Error": "BatchJobFailed","Cause": "ECS task failed. Check CloudWatch Logs for details." } }}4-4. variables.tf
プロジェクト全体で使用する変数を定義します。vpc_id と subnet_ids はお使いの環境に合わせて設定してください。
# variables.tfvariable "aws_region" { description = "AWSリージョン" type = string default = "ap-northeast-1"}variable "project_name" { description = "プロジェクト名(リソース命名に使用)" type = string default = "csv-batch"}variable "vpc_id" { description = "デフォルトVPCのID(AWSコンソール → VPC で確認)" type = string # 例: "vpc-0123456789abcdef0"}variable "subnet_ids" { description = "デフォルトVPCのサブネットID一覧(AWSコンソール → VPC → サブネット で確認)" type = list(string) # 例: ["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"]}補足:
vpc_idとsubnet_idsはデフォルトVPCの情報を使用します。AWSコンソールの VPC サービスから確認してください。または、後述のmain.tfに含まれるdataソースで自動取得することもできます(コメントアウトされた代替実装を参照)。
4-5. main.tf
すべてのAWSリソースを定義するメインファイルです。terraform plan が通る完全な実装を記載しています。
# main.tfterraform { required_version = ">= 1.0" required_providers { aws = {source = "hashicorp/aws"version = ">= 5.0" } }}provider "aws" { region = var.aws_region}# ============================================================# Data Sources# ============================================================# デフォルトVPC(variables.tfのvpc_idを使わず自動取得する場合はこちらを使用)data "aws_vpc" "default" { default = true}# デフォルトVPCのサブネット一覧(自動取得する場合)data "aws_subnets" "default" { filter { name= "vpc-id" values = [data.aws_vpc.default.id] }}# 現在のAWSアカウントID取得data "aws_caller_identity" "current" {}# ============================================================# ECR リポジトリ# ============================================================resource "aws_ecr_repository" "csv_processor" { name = "${var.project_name}-csv-processor" image_tag_mutability = "MUTABLE" image_scanning_configuration { scan_on_push = true } tags = { Project= var.project_name ManagedBy = "terraform" }}# ============================================================# IAM ロール — ECS タスク実行ロール# ============================================================resource "aws_iam_role" "ecs_task_execution_role" { name = "${var.project_name}-ecs-task-execution-role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "ecs-tasks.amazonaws.com" }} ] }) tags = { Project= var.project_name ManagedBy = "terraform" }}# ECS タスク実行に必要なマネージドポリシーをアタッチresource "aws_iam_role_policy_attachment" "ecs_task_execution_role_policy" { role = aws_iam_role.ecs_task_execution_role.name policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"}# ============================================================# IAM ロール — ECS タスクロール(アプリケーション権限)# ============================================================resource "aws_iam_role" "ecs_task_role" { name = "${var.project_name}-ecs-task-role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "ecs-tasks.amazonaws.com" }} ] }) tags = { Project= var.project_name ManagedBy = "terraform" }}# S3 への読み書き権限(CSVファイルの入出力)resource "aws_iam_role_policy" "task_s3_policy" { name = "${var.project_name}-task-s3-policy" role = aws_iam_role.ecs_task_role.id policy = jsonencode({ Version = "2012-10-17" Statement = [{ Effect = "Allow" Action = [ "s3:GetObject", "s3:PutObject" ] Resource = "arn:aws:s3:::*"} ] })}# ============================================================# IAM ロール — Step Functions 実行ロール# ============================================================resource "aws_iam_role" "sf_execution_role" { name = "${var.project_name}-sf-execution-role" assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{ Action = "sts:AssumeRole" Effect = "Allow" Principal = { Service = "states.amazonaws.com" }} ] }) tags = { Project= var.project_name ManagedBy = "terraform" }}# Step Functions が ECS タスクを起動し、ログを記録する権限resource "aws_iam_role_policy" "sf_ecs_policy" { name = "${var.project_name}-sf-ecs-policy" role = aws_iam_role.sf_execution_role.id policy = jsonencode({ Version = "2012-10-17" Statement = [{ Effect = "Allow" Action = [ "ecs:RunTask", "ecs:StopTask", "ecs:DescribeTasks" ] Resource = "*"},{ Effect= "Allow" Action= "iam:PassRole" Resource = [ aws_iam_role.ecs_task_execution_role.arn, aws_iam_role.ecs_task_role.arn ]},{ Effect = "Allow" Action = [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ] Resource = "*"},{ Effect = "Allow" Action = [ "events:PutTargets", "events:PutRule", "events:DescribeRule" ] Resource = "arn:aws:events:${var.aws_region}:${data.aws_caller_identity.current.account_id}:rule/StepFunctionsGetEventsForECSTaskRule"} ] })}# ============================================================# セキュリティグループ — ECS タスク用# ============================================================resource "aws_security_group" "ecs_task_sg" { name = "${var.project_name}-ecs-task-sg" description = "ECS タスク用セキュリティグループ(アウトバウンドのみ)" vpc_id= var.vpc_id # アウトバウンド: すべての通信を許可(S3アクセス等に必要) egress { from_port= 0 to_port = 0 protocol = "-1" cidr_blocks = ["0.0.0.0/0"] } tags = { Project= var.project_name ManagedBy = "terraform" }}# ============================================================# CloudWatch Logs グループ# ============================================================resource "aws_cloudwatch_log_group" "ecs_task" { name = "/ecs/${var.project_name}/csv-processor" retention_in_days = 30 tags = { Project= var.project_name ManagedBy = "terraform" }}# ============================================================# ECS クラスター# ============================================================resource "aws_ecs_cluster" "main" { name = "${var.project_name}-cluster" setting { name = "containerInsights" value = "enabled" } tags = { Project= var.project_name ManagedBy = "terraform" }}# ============================================================# ECS タスク定義# ============================================================resource "aws_ecs_task_definition" "csv_processor" { family = "csv-processor" requires_compatibilities = ["FARGATE"] network_mode = "awsvpc" cpu = "256" memory = "512" execution_role_arn = aws_iam_role.ecs_task_execution_role.arn task_role_arn= aws_iam_role.ecs_task_role.arn container_definitions = jsonencode([ {name= "csv-processor"image = "${aws_ecr_repository.csv_processor.repository_url}:latest"essential = trueenvironment = [ # 実行時に Step Functions から上書きされる(Overrides で渡す) { name = "INPUT_BUCKET", value = "" }, { name = "INPUT_KEY", value = "" }, { name = "OUTPUT_BUCKET", value = "" }, { name = "OUTPUT_KEY", value = "" }]logConfiguration = { logDriver = "awslogs" options = { "awslogs-group"= aws_cloudwatch_log_group.ecs_task.name "awslogs-region" = var.aws_region "awslogs-stream-prefix" = "ecs" }} } ]) tags = { Project= var.project_name ManagedBy = "terraform" }}# ============================================================# Step Functions ステートマシン# ============================================================resource "aws_sfn_state_machine" "csv_batch" { name = "${var.project_name}-state-machine" role_arn = aws_iam_role.sf_execution_role.arn # statemachine/definition.json を読み込む(コンソール版と同一のASL定義) definition = file("statemachine/definition.json") tags = { Project= var.project_name ManagedBy = "terraform" }}4-6. outputs.tf
デプロイ後に参照する情報を出力します。後続のDocker Push手順やAWS CLIでの実行確認に使用します。
# outputs.tfoutput "ecr_repository_url" { description = "ECR リポジトリの URL(docker push に使用)" value = aws_ecr_repository.csv_processor.repository_url}output "ecs_cluster_arn" { description = "ECS クラスターの ARN" value = aws_ecs_cluster.main.arn}output "task_definition_arn" { description = "ECS タスク定義の ARN" value = aws_ecs_task_definition.csv_processor.arn}output "state_machine_arn" { description = "Step Functions ステートマシンの ARN" value = aws_sfn_state_machine.csv_batch.arn}output "state_machine_name" { description = "Step Functions ステートマシン名" value = aws_sfn_state_machine.csv_batch.name}4-7. デプロイ手順
ファイルがすべて作成できたら、Terraformでデプロイします。
初期化
terraform init初回実行時にAWSプロバイダーがダウンロードされます。
Initializing the backend...Initializing provider plugins...- Finding hashicorp/aws versions matching ">= 5.0.0"...- Installing hashicorp/aws v5.x.x...Terraform has been successfully initialized!実行計画の確認
terraform plan作成されるリソースを確認します。+ が付いているリソースが新規作成されます。Plan: 行に作成リソース数が表示されます。
Plan: 13 to add, 0 to change, 0 to destroy.Changes to Outputs: + ecr_repository_url = (known after apply) + ecs_cluster_arn = (known after apply) + state_machine_arn= (known after apply) + state_machine_name = "csv-batch-state-machine" + task_definition_arn = (known after apply)plan 出力の読み方:
+は新規作成、~は変更、-は削除を意味します。(known after apply)は適用後でないと確定しない値(ARNなど)です。Plan:行でリソース数を必ず確認してからapplyに進みましょう。
デプロイ実行
terraform apply確認プロンプトに yes を入力すると実際にリソースが作成されます。
Do you want to perform these actions? Terraform will perform the following actions: # ... (省略) Enter a value: yesApply complete! Resources: 13 added, 0 changed, 0 destroyed.Outputs:ecr_repository_url = "123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-batch-csv-processor"ecs_cluster_arn = "arn:aws:ecs:ap-northeast-1:123456789012:cluster/csv-batch-cluster"state_machine_arn= "arn:aws:states:ap-northeast-1:123456789012:stateMachine:csv-batch-state-machine"state_machine_name = "csv-batch-state-machine"task_definition_arn = "arn:aws:ecs:ap-northeast-1:123456789012:task-definition/csv-processor:1"4-8. コンテナイメージのPush(Terraform apply後)
terraform apply が完了したら、ECRリポジトリにコンテナイメージをPushします。
ECRへのログイン
# ECRリポジトリURLを変数に設定ECR_URL=$(terraform output -raw ecr_repository_url)AWS_REGION="ap-northeast-1"# ECRへDockerログインaws ecr get-login-password --region ${AWS_REGION} | \ docker login --username AWS --password-stdin ${ECR_URL}コンテナイメージのビルドとPush
# イメージをビルド(Dockerfileがあるディレクトリで実行)docker build -t csv-processor .# ECRリポジトリ用にタグ付けdocker tag csv-processor:latest ${ECR_URL}:latest# ECRへPushdocker push ${ECR_URL}:latestPush完了後、AWSコンソールの ECR → リポジトリ → csv-batch-csv-processor でイメージが登録されていることを確認してください。
4-9. ステートマシン実行確認(Terraform版)
terraform output で取得したARNを使って、AWS CLIからステートマシンを実行します。
実行コマンド
aws stepfunctions start-execution \ --state-machine-arn $(terraform output -raw state_machine_arn) \ --input '{ "cluster_arn": "<ECS_CLUSTER_ARN>", "task_definition_arn": "<TASK_DEF_ARN>", "subnets": ["<SUBNET_ID>"], "security_groups": ["<SG_ID>"], "input_bucket": "my-input-bucket", "input_key": "data/sample.csv", "output_bucket": "my-output-bucket", "output_key": "result/processed.csv" }'<ECS_CLUSTER_ARN> などの値は terraform output で確認できます。
# 各ARNを確認terraform output ecs_cluster_arnterraform output task_definition_arn実行結果の確認
# 実行ARNを変数に設定EXECUTION_ARN="<start-executionで返却されたexecutionArn>"aws stepfunctions describe-execution \ --execution-arn "${EXECUTION_ARN}" \ --region ap-northeast-1"status": "SUCCEEDED" が返れば成功です。失敗した場合は "status": "FAILED" となり、cause フィールドにエラー内容が含まれます。
AWSコンソールでの確認
- AWS マネジメントコンソール → Step Functions に移動
- ステートマシン 一覧に
csv-batch-state-machineが表示されることを確認 - ステートマシンをクリック → 実行履歴でグラフビューを確認
- CloudWatch Logs →
/ecs/csv-batch/csv-processorでタスクのログを確認
ハンズオンが完了したら、不要なコストが発生しないようリソースを削除しましょう。
5. ハンズオン後の削除手順
5-1. 重要: コスト発生リソースへの注意
⚠️ NAT Gatewayは常時課金(約$0.045/時間 ≒ 約$33/月)です。
ハンズオン完了後は速やかに削除してください。
本ハンズオンではデフォルトVPCを使用するためNAT Gatewayは作成していませんが、独自VPCを構築した場合は追加でNAT Gateway料金が発生します。下表を参考に、作成したリソースを確認してください。
| リソース | 月額目安 | 備考 |
|---|---|---|
| NAT Gateway | ~$33 | 本ハンズオンでデフォルトVPCを使用する場合は不要 |
| ECS Fargate(タスク実行中のみ) | 実行時間課金 | タスク停止で課金停止 |
| CloudWatch Logs | 保存量に応じて | ロググループを削除で停止 |
5-2. Terraformで構築した場合の削除
Terraformで構築した場合は terraform destroy でほとんどのリソースを一括削除できますが、ECRのコンテナイメージはTerraform管理外のため手動削除が必要です。
# まずECRのイメージを手動削除(terraform destroyでは削除されない)aws ecr batch-delete-image \ --repository-name csv-processor \ --image-ids imageTag=latest# Terraform リソース削除terraform destroyterraform destroy で削除されないリソースには注意が必要です。
- ECR内のコンテナイメージ:
batch-delete-imageコマンドで事前に手動削除が必要 - CloudWatch Logsのロググループ: 手動削除するか、ロググループの保持期間を設定して自動削除させる
- S3バケット内のオブジェクト: バケット自体がTerraform管理外の場合、オブジェクト・バケットともに手動削除が必要
5-3. コンソールで構築した場合の削除チェックリスト
AWSコンソールから構築した場合は、以下の順序でリソースを削除してください。依存関係があるため、上から順に削除することを推奨します。
- [ ] Step Functions ステートマシンを削除
- [ ] ECSタスク定義を登録解除(削除ではなく「INACTIVE」化)
- [ ] ECSクラスターを削除
- [ ] ECRリポジトリを削除(内部イメージも削除される)
- [ ] IAMロール 3種を削除(ecs_task_execution / ecs_task / sf_execution)
- [ ] CloudWatch Logsのロググループを削除
- [ ] S3バケット(作成した場合)を削除
💡 ヒント: 削除後は AWS Billing コンソール で請求が止まっていることを確認しましょう。予期せぬ課金が続いている場合は、リソースが残っている可能性があります。
6. まとめと次のステップ
6-1. 本記事のまとめ
本記事では、Amazon ECS FargateとAWS Step Functionsを組み合わせたサーバーレスバッチジョブ基盤の構築をハンズオン形式で学びました。以下が本記事を通じて習得したポイントです。
- ECS FargateとStep Functionsを組み合わせたバッチジョブ構成: サーバーレスでスケーラブルなバッチ処理基盤を構築する方法を習得
.sync統合パターンによるタスク完了待ち受け:arn:aws:states:::ecs:runTask.syncを使ってECSタスクの完了を確実に待機する方法を理解- Retry/Catchによる宣言的エラーハンドリング: ステートマシン定義内でリトライポリシーとエラーキャッチを記述し、堅牢なワークフローを実現
- コンソール操作とTerraformの両方による構築: GUI操作でAWSリソースの構造を把握しつつ、IaCでの再現方法も習得
- ASL定義の外部ファイル化(Terraform版):
file()関数を使いステートマシン定義を外部JSONファイルに分離管理し、保守性を向上 - CloudWatch Logsによる実行ログ確認: ECSタスクとStep Functionsの実行履歴・ログを CloudWatch で一元確認する手順を習得
- IAMロールの最小権限設計: タスク実行ロール・タスクロール・SF実行ロールをそれぞれ分離し、最小権限の原則を実践
- コスト管理の基本: NAT Gatewayなど常時課金リソースの把握と、ハンズオン後の削除手順
6-2. 発展トピック(次のステップ)
本ハンズオンで基本的なECS × Step Functionsパターンを習得できました。次回以降のシリーズでは、以下の発展トピックを解説予定です。
| トピック | 概要 |
|---|---|
| Parallel ステートによる複数ECSタスクの並列実行 | 独立した複数バッチ処理を同時実行してスループットを向上 |
.waitForTaskToken パターン | 長時間処理や人承認フローなど、外部イベント待ちのワークフローを実装 |
| EventBridge + Step Functions | スケジュール実行やS3イベントトリガーによるイベント駆動バッチ構成 |
| コンテナイメージの CI/CD | GitHub Actions → ECR自動Pushによる継続的デリバリーパイプラインの構築 |
| Step Functions Express Workflow | 高頻度・短時間の小バッチ処理に最適化されたワークフロータイプの活用 |
6-3. 参考リンク
本記事の内容をさらに深掘りするための公式ドキュメントです。
- Amazon ECS ドキュメント(Fargate起動タイプ)
- Step Functions + ECS 統合(呼び出しSDKの統合)
- Amazon States Language 仕様
- Terraform aws_ecs_task_definition リソース
- Terraform aws_sfn_state_machine リソース
6-4. 記事フッター
本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第2回です。
第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
次回: さらなる発展トピックを近日公開予定