ECS × Step Functions 入門 — CSVバッチをFargateタスクでジョブ化するハンズオン

目次

ECS Task × Step Functions でジョブオーケストレーション — S3 CSVバッチ処理ハンズオン

公開日: 2026-04-12
難易度: 中級
所要時間: 約90分
タグ: AWS, Step Functions, ECS, Fargate, Terraform, ハンズオン, バッチ処理

この記事で学ぶこと
– ECS FargateとStep Functionsを組み合わせたバッチジョブ制御
– S3 CSVファイルを読み込み・処理・書き戻すECSコンテナの実装
– コンソールとTerraform両方でのステートマシン構築
– Retry/Catchによる堅牢なエラーハンドリング


1. 概念編

1-1. ECS Task / Fargate とは

Amazon ECS(Elastic Container Service)は、AWS が提供するフルマネージドのコンテナオーケストレーションサービスです。Docker コンテナのデプロイ・管理・スケーリングを自動化し、開発者がアプリケーションのロジックに集中できる環境を提供します。

EC2 起動タイプ vs Fargate 起動タイプ

ECS には2種類の起動タイプがあり、用途に応じて使い分けます。

項目EC2 起動タイプFargate 起動タイプ
サーバー管理EC2 インスタンスを自分で管理不要(AWS が管理)
コストインスタンス稼働中は常時課金タスク実行時間・リソース量で課金
カスタマイズ性OS レベルのカスタマイズ可能コンテナ単位のみ
スケーリングインスタンス数を管理する必要あり自動(インフラ意識不要)
向いているケース長時間稼働・高負荷・特殊要件バッチ処理・スポット起動・シンプル構成

Fargate のメリット

  • サーバー管理不要: EC2 インスタンスのプロビジョニング・パッチ適用・スケーリングを AWS に任せられます。インフラ運用コストを大幅に削減できます。
  • タスク単位課金: 実際にコンテナが動いている時間と消費した vCPU・メモリ量のみ課金されます。バッチ処理のように「必要なときだけ起動」するユースケースでは特にコスト効率が高くなります。

ECS の基本概念

ECS を理解する上で、以下の4つの概念を押さえておきましょう。

概念説明
クラスターコンテナを実行するための論理的なグループ。Fargate の場合、実体はマネージドインフラ。
タスク定義コンテナイメージ・CPU・メモリ・環境変数などを定義したテンプレート(「設計図」)。
タスクタスク定義をもとに実際に起動されるコンテナの実行単位。本記事ではバッチ処理の1ジョブに相当。
サービス指定した数のタスクを常時維持する仕組み。Web サーバーなど常駐型アプリに使用(本記事では不使用)。

1-2. AWS Step Functions とは(おさらい)

AWS Step Functions については以下の入門記事で詳しく解説しています。本記事では要点のみ振り返ります。

AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン

Step Functions のポイント(3行まとめ):

  • ステートマシンでワークフローを JSON(ASL: Amazon States Language)として定義し、各ステップの実行順序・分岐・並列処理を宣言的に記述します。
  • Lambda・ECS・DynamoDB・S3 など 200 以上の AWS サービスと SDK 統合でき、グルーコードなしでサービス間を連携できます。
  • AWS コンソール上のビジュアルワークフローエディタでグラフを確認しながら実行状態をリアルタイムに追跡できます。

1-3. なぜ ECS × Step Functions を組み合わせるか

ECS Fargate と Step Functions を組み合わせることで、バッチ処理の弱点をカバーしつつ、オーケストレーションの強みを最大化できます。

長時間バッチ処理の管理(Lambda の 15 分制限を回避)

AWS Lambda はタイムアウト上限が 15 分です。数十 GB の CSV 変換や大規模な機械学習推論など、実行に数時間かかるバッチ処理には対応できません。ECS Fargate にはタイムアウト制限がなく、Step Functions でその実行を制御することで、長時間ジョブを安全に扱えます。

宣言的なリトライ・エラーハンドリング(Retry / Catch)

Step Functions のステート定義には RetryCatch フィールドが用意されています。「最大3回リトライ、その後失敗ステートへ遷移」といった複雑なエラーハンドリングをコードなしで JSON で宣言できます。アプリケーション側にリトライロジックを書く必要がなく、信頼性の高いバッチパイプラインを構築できます。

可視化(実行グラフでジョブ状態を一目確認)

Step Functions コンソールでは、ワークフローの各ステップがグラフ上で色分けされ、実行中・成功・失敗の状態をリアルタイムで確認できます。複数の ECS タスクが直列・並列で走るパイプラインでも、どのステップで止まったかを即座に特定でき、デバッグ効率が大幅に向上します。

並列実行(Parallel ステートで複数 ECS タスク同時起動)

Step Functions の Parallel ステートを使うと、独立した ECS タスクを同時に起動できます。例えば「地域 A・B・C のデータを同時に処理して結果を集約する」といったパターンを、コードを書かずに実現できます。

Lambda 不要の直接 SDK 統合(コスト・レイテンシ削減)

従来は「Step Functions → Lambda → ECS 起動」という間接的な構成が一般的でしたが、現在は Step Functions から ECS タスクを 直接起動できます。Lambda の実行コスト・起動レイテンシを省けるため、シンプルかつ低コストな構成になります。


1-4. Step Functions × ECS 統合パターン

Step Functions から ECS タスクを呼び出す方法は主に2パターンあります。

項目.sync パターン.waitForTaskToken パターン
リソース ARNarn:aws:states:::ecs:runTask.syncarn:aws:states:::ecs:runTask.waitForTaskToken
動作SF が ECS タスクの完了を待ち受け、終了後に次ステートへ進むECS タスクがトークンを受け取り、処理完了後に SF へコールバック
向いているケース数分〜数時間のバッチ処理(本記事のユースケース)外部システムとの非同期連携、人間によるレビュー待ちなど
タイムアウト上限Step Functions の最大実行時間(1年)に依存同上(コールバックが来るまで待機)
ECS タスク側の実装変更不要(タスクが終了すれば自動で次へ)タスク内でトークンを受け取り SendTaskSuccess/Failure API を呼ぶ実装が必要

本記事では .sync パターンを使用します。 ECS タスクが終了するまで Step Functions が待機し、終了後に次のステップへ自動的に移行するシンプルな構成です。コールバック実装が不要なため、バッチ処理の入門には最適です。


1-5. 典型ユースケース

ECS × Step Functions の組み合わせは、以下のような場面で威力を発揮します。

1. ETLパイプライン(本記事のシナリオ: S3 CSV → 変換 → S3)

S3 に置かれた CSV ファイルを ECS Fargate コンテナで読み込み、データ変換・クレンジング処理を実施して、結果を再び S3 へ書き戻すパターンです。Step Functions が処理の開始・監視・後続ステップへの連携を担い、シンプルなコンテナで複雑なパイプラインを実現します。本記事ではこのシナリオをハンズオン形式で実装します。

2. 動画・画像処理バッチ

アップロードされた動画のトランスコードやサムネイル生成など、CPU・メモリを大量消費する処理に適しています。Fargate でリソースを柔軟に割り当て、処理完了後に自動終了するため、常時起動インスタンスが不要でコストを最適化できます。

3. 機械学習推論バッチ(SageMaker との違いも言及)

学習済みモデルをコンテナにパッケージし、バッチデータへの推論を定期実行するパターンです。SageMaker Batch Transform はマネージドで便利ですが、カスタムライブラリや独自フレームワークが必要な場合は ECS Fargate の方が柔軟性が高く、コンテナイメージさえ用意すればどんな ML 環境でも動かせます。

4. レポート生成・PDF 出力

月次・週次で集計データを取得し、PDF や Excel 形式のレポートを自動生成する処理です。Step Functions でデータ取得 → 集計 → PDF 変換 → S3 保存 → メール送信という一連のフローを制御でき、スケジュール実行(EventBridge との連携)で完全自動化が可能です。

5. データウェアハウスへのロード処理

S3 や RDS から大量データを抽出し、Redshift や Snowflake などのデータウェアハウスへロードするバッチです。ECS コンテナで重い変換処理を担い、Step Functions で依存関係のある複数ステップ(抽出 → 変換 → ロード → 検証)を順序制御することで、信頼性の高い ELT パイプラインを構築できます。


概念を把握したところで、次は実際に構築する構成の全体像を確認しましょう。

2. アーキテクチャ解説

本セクションでは、ECS FargateとStep Functionsを組み合わせたCSVバッチ処理ジョブの全体像を、3つの観点から図解する。


2-1. 全体アーキテクチャ

全体アーキテクチャ

S3バケットに入力CSVがアップロードされると、Step Functionsのステートマシンが起動し、ECS Fargateタスクを呼び出す。Fargateタスクは、Amazon ECRから取得したDockerイメージを使って起動し、S3から入力データを読み込み、変換処理後に結果をS3へ書き出す。タスクの実行ログはAmazon CloudWatch Logsに自動転送され、運用監視が容易になる。処理の各ステップはStep Functionsが状態管理するため、エラー時のリトライ・フォールバックをコードなしで制御できる。


2-2. ステートマシンフロー

ステートマシンフロー

ステートマシンは「RunBatchJob」(Task状態)を起点とし、ECS Fargateのタスク完了を同期的に待機する(ecs:runTask.sync統合パターン)。タスクが正常終了すると「JobSucceeded」(Succeed状態)へ遷移し、実行完了となる。タスクが失敗した場合は「JobFailed」(Fail状態)へ遷移し、実行エラーとして記録される。また、RunBatchJob状態にはリトライ設定(MaxAttempts: 2、IntervalSeconds: 30)とキャッチ設定が組み込まれており、一時的な障害に対して自動的に再試行が行われる。


2-3. シーケンス図

シーケンス図

シーケンス図はStep Functions・ECS Fargate・S3間の時系列インタラクションを示している。①ユーザーがStep FunctionsのStartExecutionを呼び出すことで処理が始まる。②Step FunctionsがECS Fargateに対してrunTaskを発行し、タスクが起動する。③ECSタスクはS3(Input)からGetObjectでCSVデータを取得する。④取得したデータに対してコンテナ内でデータ変換処理を実行する。⑤処理済みデータをPutObjectでS3(Output)に書き込む。⑥ECSタスクから完了通知がStep Functionsに返却され、⑦ステートマシンが次のステート(JobSucceeded)へ遷移して実行完了となる。


アーキテクチャの全体像を把握できたので、いよいよAWSコンソールを使って実際に構築してみましょう。

3. AWSコンソールでのハンズオン

このセクションでは、AWSコンソールを使って実際にS3 CSVバッチ処理パイプラインを構築します。ECS Fargateでコンテナタスクを実行し、Step Functionsでその実行を制御する一連の流れを手を動かしながら学んでいきましょう。


3-0. 前提条件

作業を始める前に、以下の環境が整っていることを確認してください。

  • AWSアカウント: 有効なAWSアカウントを持っていること
  • IAM権限: ECS、Step Functions、IAM、ECR、S3、CloudWatch Logsを操作できる権限があること(管理者権限推奨)
  • Docker ローカル環境: ECRへのイメージPushに必要。docker version でDocker Desktopが動作していることを確認
  • AWS CLI: インストール済みかつ aws configure で認証情報を設定済みであること(aws sts get-caller-identity で確認可能)
  • デフォルトVPC: 本ハンズオンではデフォルトVPCを使用します。AWSコンソールの「VPC」→「お使いのVPC」でデフォルトVPCが存在することを確認してください

3-1. サンプルPythonスクリプト(ECSタスクで実行するコード)

今回構築するECSタスクが実行するPythonスクリプトを用意します。このスクリプトは、S3からCSVファイルを読み込み、amount 列の値を2倍に変換して、別のS3バケットに書き戻します。

app.py:

import os, boto3, csv, iodef main(): s3 = boto3.client('s3') input_bucket  = os.environ['INPUT_BUCKET'] input_key  = os.environ['INPUT_KEY'] output_bucket = os.environ['OUTPUT_BUCKET'] output_key = os.environ['OUTPUT_KEY'] # S3からCSV読み込み obj = s3.get_object(Bucket=input_bucket, Key=input_key) reader = csv.DictReader(io.StringIO(obj['Body'].read().decode('utf-8'))) rows = list(reader) # 変換処理(例: amount列を2倍) for row in rows:  if 'amount' in row:row['amount'] = str(float(row['amount']) * 2) # 結果をS3に書き込み output = io.StringIO() writer = csv.DictWriter(output, fieldnames=rows[0].keys()) writer.writeheader() writer.writerows(rows) s3.put_object(Bucket=output_bucket, Key=output_key,Body=output.getvalue().encode('utf-8')) print(f"処理完了: {len(rows)} 行を {output_bucket}/{output_key} に書き込み")if __name__ == '__main__': main()

処理の入出力先はすべて環境変数で受け取ります。Step Functionsの入力JSONから動的に注入されるため、コードを変更せずに様々なCSVファイルを処理できる設計になっています。

次に、このスクリプトをコンテナ化するための Dockerfile を作成します。

Dockerfile:

FROM python:3.11-slimWORKDIR /appRUN pip install boto3COPY app.py .CMD ["python", "app.py"]

app.pyDockerfile を同じディレクトリに配置してください。


3-2. ECRリポジトリ作成 → ビルド → Push

作成したDockerイメージをAWS ECR(Elastic Container Registry)にPushします。

① ECRリポジトリ作成

aws ecr create-repository \  --repository-name csv-processor \  --region ap-northeast-1

出力の repositoryUri をメモしておきます(例: 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor)。

② ECRへのDockerログイン

aws ecr get-login-password --region ap-northeast-1 \  | docker login --username AWS \ --password-stdin 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com

注意: 123456789012 はご自身のAWSアカウントIDに置き換えてください。

③ Dockerイメージをビルド・タグ付け・Push

# イメージのビルドdocker build -t csv-processor .# ECRリポジトリ用にタグ付けdocker tag csv-processor:latest \  123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor:latest# ECRへPushdocker push \  123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor:latest

PushしたイメージURIは後のタスク定義作成で使用するため、コピーして手元に控えておいてください。


3-3. IAMロール作成(3種類)

このハンズオンでは、ECSとStep Functionsが適切なAWSリソースにアクセスできるよう、3つのIAMロールを作成します。

① ECSタスク実行ロール(ecsTaskExecutionRole

ECSエージェントがECRからコンテナイメージをPullし、CloudWatch Logsにログを書き込むために必要なロールです。

信頼ポリシーecs-trust-policy.json):

{  "Version": "2012-10-17",  "Statement": [ {"Effect": "Allow","Principal": {  "Service": "ecs-tasks.amazonaws.com"},"Action": "sts:AssumeRole" }  ]}
# ロール作成aws iam create-role \  --role-name ecsTaskExecutionRole \  --assume-role-policy-document file://ecs-trust-policy.json# AWS管理ポリシーをアタッチaws iam attach-role-policy \  --role-name ecsTaskExecutionRole \  --policy-arn arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy

② ECSタスクロール(csv-processor-task-role

コンテナ内のアプリケーション(app.py)がS3を読み書きするために必要なロールです。

信頼ポリシーecsTaskExecutionRole と同じ内容を使用します。

インラインポリシーs3-access-policy.json):

{  "Version": "2012-10-17",  "Statement": [ {"Effect": "Allow","Action": [  "s3:GetObject",  "s3:PutObject"],"Resource": "arn:aws:s3:::*" }  ]}

ベストプラクティス: 本番環境では "Resource" を実際のバケットARN(例: arn:aws:s3:::my-input-bucket/*)に絞り込んでください。

# ロール作成aws iam create-role \  --role-name csv-processor-task-role \  --assume-role-policy-document file://ecs-trust-policy.json# インラインポリシーを付与aws iam put-role-policy \  --role-name csv-processor-task-role \  --policy-name S3AccessPolicy \  --policy-document file://s3-access-policy.json

③ Step Functions実行ロール(sf-ecs-execution-role

Step FunctionsがECSタスクを起動するために必要なロールです。

信頼ポリシーsf-trust-policy.json):

{  "Version": "2012-10-17",  "Statement": [ {"Effect": "Allow","Principal": {  "Service": "states.amazonaws.com"},"Action": "sts:AssumeRole" }  ]}

インラインポリシーsf-ecs-policy.json):

{  "Version": "2012-10-17",  "Statement": [ {"Effect": "Allow","Action": [  "ecs:RunTask",  "ecs:StopTask",  "ecs:DescribeTasks"],"Resource": "*" }, {"Effect": "Allow","Action": "iam:PassRole","Resource": [  "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",  "arn:aws:iam::123456789012:role/csv-processor-task-role"] }, {"Effect": "Allow","Action": [  "events:PutTargets",  "events:PutRule",  "events:DescribeRule"],"Resource": "*" }  ]}

注意: 123456789012 はご自身のAWSアカウントIDに置き換えてください。iam:PassRole はStep FunctionsがECSにロールを渡す際に必要です。

# ロール作成aws iam create-role \  --role-name sf-ecs-execution-role \  --assume-role-policy-document file://sf-trust-policy.json# インラインポリシーを付与aws iam put-role-policy \  --role-name sf-ecs-execution-role \  --policy-name ECSRunTaskPolicy \  --policy-document file://sf-ecs-policy.json

3-4. ECSクラスター作成

Fargateタスクを実行するECSクラスターを作成します。

  1. AWSコンソールの検索バーで 「ECS」 を検索し、「Elastic Container Service」を開きます
  2. 左メニューの 「クラスター」 をクリック
  3. 「クラスターの作成」 ボタンをクリック
  4. 設定を入力します:
  5. クラスター名: csv-batch-cluster
  6. インフラストラクチャ: 「AWS Fargate (サーバーレス)」にチェック
  7. 「作成」 ボタンをクリック

CLIで作成する場合:

aws ecs create-cluster \  --cluster-name csv-batch-cluster \  --capacity-providers FARGATE \  --region ap-northeast-1

クラスターのステータスが「ACTIVE」になれば作成完了です。


3-5. タスク定義作成

ECSタスクの設定(使用するコンテナイメージ、CPU/メモリ、IAMロール等)をタスク定義として登録します。

  1. ECSコンソールの左メニューで 「タスク定義」 をクリック
  2. 「新しいタスク定義の作成」「JSONを使用した新しいタスク定義の作成」 を選択
  3. 以下のJSONを貼り付けます(<YOUR_ACCOUNT_ID> をご自身のアカウントIDに変更):
{  "family": "csv-processor",  "networkMode": "awsvpc",  "requiresCompatibilities": ["FARGATE"],  "cpu": "256",  "memory": "512",  "executionRoleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/ecsTaskExecutionRole",  "taskRoleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/csv-processor-task-role",  "containerDefinitions": [ {"name": "csv-processor","image": "<YOUR_ACCOUNT_ID>.dkr.ecr.ap-northeast-1.amazonaws.com/csv-processor:latest","essential": true,"environment": [],"logConfiguration": {  "logDriver": "awslogs",  "options": { "awslogs-group": "/ecs/csv-processor", "awslogs-region": "ap-northeast-1", "awslogs-stream-prefix": "ecs"  }} }  ]}
  1. 「作成」 ボタンをクリック
設定項目説明
タスク定義名(family)csv-processorタスク定義の識別名
起動タイプFARGATEサーバーレスでコンテナを実行
タスクCPU256 (0.25 vCPU)バッチ処理に十分なCPU
メモリ512 MB (0.5 GB)CSVの読み書きには十分
タスク実行ロールecsTaskExecutionRoleECRイメージPull・ログ出力用
タスクロールcsv-processor-task-roleS3アクセス用

ポイント: 環境変数(INPUT_BUCKET等)はタスク定義には設定しません。Step Functionsの ContainerOverrides から実行時に動的に注入されます。

CloudWatch Logsのロググループを事前に作成しておきましょう:

aws logs create-log-group \  --log-group-name /ecs/csv-processor \  --region ap-northeast-1

3-6. Step Functionsステートマシン作成

いよいよStep Functionsのステートマシンを作成します。ECSタスクをオーケストレーションするASL(Amazon States Language)を定義します。

手順

  1. AWSコンソールの検索バーで 「Step Functions」 を検索して開きます
  2. 「ステートマシンの作成」 をクリック
  3. 「空白で記述」 を選択し、「次へ」 をクリック
  4. 「コード」 タブを選択し、以下のASL JSONを貼り付けます:
{  "Comment": "S3 CSVバッチ処理ジョブ — ECS Fargate × Step Functions",  "StartAt": "RunBatchJob",  "States": { "RunBatchJob": {"Type": "Task","Resource": "arn:aws:states:::ecs:runTask.sync","Parameters": {  "LaunchType": "FARGATE",  "Cluster.$": "$.cluster_arn",  "TaskDefinition.$": "$.task_definition_arn",  "NetworkConfiguration": { "AwsvpcConfiguration": {"Subnets.$": "$.subnets","SecurityGroups.$": "$.security_groups","AssignPublicIp": "ENABLED" }  },  "Overrides": { "ContainerOverrides": [{  "Name": "csv-processor",  "Environment": [ {"Name": "INPUT_BUCKET",  "Value.$": "$.input_bucket"}, {"Name": "INPUT_KEY",  "Value.$": "$.input_key"}, {"Name": "OUTPUT_BUCKET", "Value.$": "$.output_bucket"}, {"Name": "OUTPUT_KEY", "Value.$": "$.output_key"}  ]} ]  }},"Retry": [  { "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 30, "MaxAttempts": 2, "BackoffRate": 2.0  }],"Catch": [  { "ErrorEquals": ["States.ALL"], "Next": "JobFailed", "ResultPath": "$.error"  }],"Next": "JobSucceeded" }, "JobSucceeded": {"Type": "Succeed" }, "JobFailed": {"Type": "Fail","Error": "BatchJobFailed","Cause": "ECS task failed. Check CloudWatch Logs for details." }  }}
  1. 設定を入力します:
  2. ステートマシン名: csv-batch-state-machine
  3. 実行ロール: 「既存のロールを選択」→ sf-ecs-execution-role
  4. 「作成」 ボタンをクリック

ASL解説

このASLのポイントを理解しておきましょう。

ecs:runTask.sync とは?

Resource に指定した arn:aws:states:::ecs:runTask.sync は、Step Functionsが持つECSとの最適化された統合(Optimized Integration)です。末尾の .sync により、ECSタスクが完了するまでStep Functionsが自動的に待機します。タスクのポーリングはStep Functionsが内部で行うため、Lambdaなどを挟む必要がありません。

.$ サフィックスの意味

"Cluster.$": "$.cluster_arn" のように、値に .$ が付いたフィールドはステートマシンの実行時入力JSONから動的に値を取得します。$ はルートパスを示し、$.cluster_arn は入力JSONの cluster_arn フィールドを参照します。これにより、同じステートマシンを異なる環境・バケット・ファイルに対して再利用できます。

Retry/Catch の設定値

設定意味
ErrorEqualsStates.TaskFailedECSタスクが失敗した場合にリトライ
IntervalSeconds30初回リトライまでの待機時間(秒)
MaxAttempts2最大リトライ回数(初回 + 最大2回 = 計3回試行)
BackoffRate2.0リトライ間隔の指数バックオフ係数(30s → 60s)
CatchStates.ALLすべてのエラーをキャッチ → JobFailed 状態へ遷移

3-7. ステートマシン実行・結果確認

S3バケットとサンプルCSVの準備

実行前に、入力データを格納するS3バケットとサンプルCSVファイルを用意します。

# 入力・出力バケット作成(バケット名はグローバルで一意にする必要があります)aws s3 mb s3://my-csv-input-bucket-<YOUR_UNIQUE_SUFFIX> --region ap-northeast-1aws s3 mb s3://my-csv-output-bucket-<YOUR_UNIQUE_SUFFIX> --region ap-northeast-1# サンプルCSV作成 & アップロードcat << 'EOF' > sample.csvid,name,amount1,Alice,1002,Bob,2003,Charlie,150EOFaws s3 cp sample.csv s3://my-csv-input-bucket-<YOUR_UNIQUE_SUFFIX>/data/sample.csv

ステートマシンの実行

  1. Step Functionsコンソールで csv-batch-state-machine を開きます
  2. 「実行の開始」 をクリック
  3. 入力JSONに以下を貼り付けます(各値はご自身の環境に合わせて変更してください):
{  "cluster_arn": "arn:aws:ecs:ap-northeast-1:123456789012:cluster/csv-batch-cluster",  "task_definition_arn": "arn:aws:ecs:ap-northeast-1:123456789012:task-definition/csv-processor:1",  "subnets": ["subnet-0123456789abcdef0"],  "security_groups": ["sg-0123456789abcdef0"],  "input_bucket": "my-csv-input-bucket-<YOUR_UNIQUE_SUFFIX>",  "input_key": "data/sample.csv",  "output_bucket": "my-csv-output-bucket-<YOUR_UNIQUE_SUFFIX>",  "output_key": "result/processed.csv"}

ヒント: subnetssecurity_groups はデフォルトVPCのものを使用します。コンソールの「VPC」→「サブネット」でデフォルトVPCに紐づくサブネットIDを、「セキュリティグループ」でデフォルトセキュリティグループのIDを確認してください。

  1. 「実行の開始」 ボタンをクリック

実行グラフと結果の確認

実行が開始されると、グラフビューで各ステートの進捗がリアルタイムで確認できます。

  • 青色: 実行中
  • 緑色: 成功
  • 赤色: 失敗

RunBatchJobJobSucceeded の順で緑色になれば成功です。

CloudWatch Logsの確認

ECSタスクのログを確認するには:

  1. AWSコンソールで 「CloudWatch」 を開きます
  2. 左メニューの 「ロগগループ」 をクリック
  3. /ecs/csv-processor を選択します
  4. 最新のログストリームを開くと、app.py の出力が確認できます:
処理完了: 3 行を my-csv-output-bucket-xxx/result/processed.csv に書き込み

処理結果の確認

出力バケットにCSVが書き込まれているか確認します:

aws s3 cp \  s3://my-csv-output-bucket-<YOUR_UNIQUE_SUFFIX>/result/processed.csv \  - | cat

amount 列が2倍になった結果が出力されていれば成功です:

id,name,amount1,Alice,200.02,Bob,400.03,Charlie,300.0

以上でAWSコンソールを使ったECS Fargate × Step Functionsのハンズオンは完了です。次のセクションでは、同じ構成をTerraformで再現する方法を解説します。

4. Terraformでの構築

コンソールで構築したS3→ECS Fargate→S3のCSVバッチジョブを、今度はTerraformを使ってコード化してみましょう。Infrastructure as Code(IaC)で管理することで、環境の再現性が高まり、チームでの共有や変更管理が容易になります。


4-1. 前提条件

作業を始める前に、以下の環境が整っていることを確認してください。

  • Terraform: バージョン 1.0 以上がインストール済み
    bash
    terraform version
    # Terraform v1.x.x 以上であることを確認
  • AWS CLI: aws configure で認証情報が設定済み
    bash
    aws sts get-caller-identity
    # アカウントIDとIAMユーザー/ロール情報が返ってくれば設定完了
  • Docker: ECRへのコンテナイメージPushに使用します
    bash
    docker version
    # Docker Engine が動作していることを確認
  • IAM 権限: 以下の操作権限が必要です
  • AdministratorAccess または以下の個別権限
  • iam:CreateRole / iam:AttachRolePolicy / iam:PutRolePolicy
  • ecr:CreateRepository / ecs:CreateCluster / ecs:RegisterTaskDefinition
  • states:CreateStateMachine
  • logs:CreateLogGroup

4-2. ディレクトリ構成

以下のディレクトリ構成でプロジェクトを作成します。

ecs-sf-handson/├── main.tf├── variables.tf├── outputs.tf└── statemachine/ └── definition.json
mkdir -p ecs-sf-handson/statemachinecd ecs-sf-handson

4-3. statemachine/definition.json

コンソール版と同一のASL定義です。 このファイルはコンソールで作成したステートマシンと一字一句同じ定義を使用しています。

{  "Comment": "S3 CSVバッチ処理ジョブ — ECS Fargate × Step Functions",  "StartAt": "RunBatchJob",  "States": { "RunBatchJob": {"Type": "Task","Resource": "arn:aws:states:::ecs:runTask.sync","Parameters": {  "LaunchType": "FARGATE",  "Cluster.$": "$.cluster_arn",  "TaskDefinition.$": "$.task_definition_arn",  "NetworkConfiguration": { "AwsvpcConfiguration": {"Subnets.$": "$.subnets","SecurityGroups.$": "$.security_groups","AssignPublicIp": "ENABLED" }  },  "Overrides": { "ContainerOverrides": [{  "Name": "csv-processor",  "Environment": [ {"Name": "INPUT_BUCKET",  "Value.$": "$.input_bucket"}, {"Name": "INPUT_KEY",  "Value.$": "$.input_key"}, {"Name": "OUTPUT_BUCKET", "Value.$": "$.output_bucket"}, {"Name": "OUTPUT_KEY", "Value.$": "$.output_key"}  ]} ]  }},"Retry": [  { "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 30, "MaxAttempts": 2, "BackoffRate": 2.0  }],"Catch": [  { "ErrorEquals": ["States.ALL"], "Next": "JobFailed", "ResultPath": "$.error"  }],"Next": "JobSucceeded" }, "JobSucceeded": {"Type": "Succeed" }, "JobFailed": {"Type": "Fail","Error": "BatchJobFailed","Cause": "ECS task failed. Check CloudWatch Logs for details." }  }}

4-4. variables.tf

プロジェクト全体で使用する変数を定義します。vpc_idsubnet_ids はお使いの環境に合わせて設定してください。

# variables.tfvariable "aws_region" {  description = "AWSリージョン"  type  = string  default  = "ap-northeast-1"}variable "project_name" {  description = "プロジェクト名(リソース命名に使用)"  type  = string  default  = "csv-batch"}variable "vpc_id" {  description = "デフォルトVPCのID(AWSコンソール → VPC で確認)"  type  = string  # 例: "vpc-0123456789abcdef0"}variable "subnet_ids" {  description = "デフォルトVPCのサブネットID一覧(AWSコンソール → VPC → サブネット で確認)"  type  = list(string)  # 例: ["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"]}

補足: vpc_idsubnet_ids はデフォルトVPCの情報を使用します。AWSコンソールの VPC サービスから確認してください。または、後述の main.tf に含まれる data ソースで自動取得することもできます(コメントアウトされた代替実装を参照)。


4-5. main.tf

すべてのAWSリソースを定義するメインファイルです。terraform plan が通る完全な実装を記載しています。

# main.tfterraform {  required_version = ">= 1.0"  required_providers { aws = {source  = "hashicorp/aws"version = ">= 5.0" }  }}provider "aws" {  region = var.aws_region}# ============================================================# Data Sources# ============================================================# デフォルトVPC(variables.tfのvpc_idを使わず自動取得する場合はこちらを使用)data "aws_vpc" "default" {  default = true}# デフォルトVPCのサブネット一覧(自動取得する場合)data "aws_subnets" "default" {  filter { name= "vpc-id" values = [data.aws_vpc.default.id]  }}# 現在のAWSアカウントID取得data "aws_caller_identity" "current" {}# ============================================================# ECR リポジトリ# ============================================================resource "aws_ecr_repository" "csv_processor" {  name  = "${var.project_name}-csv-processor"  image_tag_mutability = "MUTABLE"  image_scanning_configuration { scan_on_push = true  }  tags = { Project= var.project_name ManagedBy = "terraform"  }}# ============================================================# IAM ロール — ECS タスク実行ロール# ============================================================resource "aws_iam_role" "ecs_task_execution_role" {  name = "${var.project_name}-ecs-task-execution-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{  Action = "sts:AssumeRole"  Effect = "Allow"  Principal = { Service = "ecs-tasks.amazonaws.com"  }} ]  })  tags = { Project= var.project_name ManagedBy = "terraform"  }}# ECS タスク実行に必要なマネージドポリシーをアタッチresource "aws_iam_role_policy_attachment" "ecs_task_execution_role_policy" {  role = aws_iam_role.ecs_task_execution_role.name  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"}# ============================================================# IAM ロール — ECS タスクロール(アプリケーション権限)# ============================================================resource "aws_iam_role" "ecs_task_role" {  name = "${var.project_name}-ecs-task-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{  Action = "sts:AssumeRole"  Effect = "Allow"  Principal = { Service = "ecs-tasks.amazonaws.com"  }} ]  })  tags = { Project= var.project_name ManagedBy = "terraform"  }}# S3 への読み書き権限(CSVファイルの入出力)resource "aws_iam_role_policy" "task_s3_policy" {  name = "${var.project_name}-task-s3-policy"  role = aws_iam_role.ecs_task_role.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Effect = "Allow"  Action = [ "s3:GetObject", "s3:PutObject"  ]  Resource = "arn:aws:s3:::*"} ]  })}# ============================================================# IAM ロール — Step Functions 実行ロール# ============================================================resource "aws_iam_role" "sf_execution_role" {  name = "${var.project_name}-sf-execution-role"  assume_role_policy = jsonencode({ Version = "2012-10-17" Statement = [{  Action = "sts:AssumeRole"  Effect = "Allow"  Principal = { Service = "states.amazonaws.com"  }} ]  })  tags = { Project= var.project_name ManagedBy = "terraform"  }}# Step Functions が ECS タスクを起動し、ログを記録する権限resource "aws_iam_role_policy" "sf_ecs_policy" {  name = "${var.project_name}-sf-ecs-policy"  role = aws_iam_role.sf_execution_role.id  policy = jsonencode({ Version = "2012-10-17" Statement = [{  Effect = "Allow"  Action = [ "ecs:RunTask", "ecs:StopTask", "ecs:DescribeTasks"  ]  Resource = "*"},{  Effect= "Allow"  Action= "iam:PassRole"  Resource = [ aws_iam_role.ecs_task_execution_role.arn, aws_iam_role.ecs_task_role.arn  ]},{  Effect = "Allow"  Action = [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups"  ]  Resource = "*"},{  Effect = "Allow"  Action = [ "events:PutTargets", "events:PutRule", "events:DescribeRule"  ]  Resource = "arn:aws:events:${var.aws_region}:${data.aws_caller_identity.current.account_id}:rule/StepFunctionsGetEventsForECSTaskRule"} ]  })}# ============================================================# セキュリティグループ — ECS タスク用# ============================================================resource "aws_security_group" "ecs_task_sg" {  name  = "${var.project_name}-ecs-task-sg"  description = "ECS タスク用セキュリティグループ(アウトバウンドのみ)"  vpc_id= var.vpc_id  # アウトバウンド: すべての通信を許可(S3アクセス等に必要)  egress { from_port= 0 to_port  = 0 protocol = "-1" cidr_blocks = ["0.0.0.0/0"]  }  tags = { Project= var.project_name ManagedBy = "terraform"  }}# ============================================================# CloudWatch Logs グループ# ============================================================resource "aws_cloudwatch_log_group" "ecs_task" {  name  = "/ecs/${var.project_name}/csv-processor"  retention_in_days = 30  tags = { Project= var.project_name ManagedBy = "terraform"  }}# ============================================================# ECS クラスター# ============================================================resource "aws_ecs_cluster" "main" {  name = "${var.project_name}-cluster"  setting { name  = "containerInsights" value = "enabled"  }  tags = { Project= var.project_name ManagedBy = "terraform"  }}# ============================================================# ECS タスク定義# ============================================================resource "aws_ecs_task_definition" "csv_processor" {  family = "csv-processor"  requires_compatibilities = ["FARGATE"]  network_mode = "awsvpc"  cpu = "256"  memory = "512"  execution_role_arn = aws_iam_role.ecs_task_execution_role.arn  task_role_arn= aws_iam_role.ecs_task_role.arn  container_definitions = jsonencode([ {name= "csv-processor"image  = "${aws_ecr_repository.csv_processor.repository_url}:latest"essential = trueenvironment = [  # 実行時に Step Functions から上書きされる(Overrides で渡す)  { name = "INPUT_BUCKET",  value = "" },  { name = "INPUT_KEY",  value = "" },  { name = "OUTPUT_BUCKET", value = "" },  { name = "OUTPUT_KEY", value = "" }]logConfiguration = {  logDriver = "awslogs"  options = { "awslogs-group"= aws_cloudwatch_log_group.ecs_task.name "awslogs-region"  = var.aws_region "awslogs-stream-prefix" = "ecs"  }} }  ])  tags = { Project= var.project_name ManagedBy = "terraform"  }}# ============================================================# Step Functions ステートマシン# ============================================================resource "aws_sfn_state_machine" "csv_batch" {  name  = "${var.project_name}-state-machine"  role_arn = aws_iam_role.sf_execution_role.arn  # statemachine/definition.json を読み込む(コンソール版と同一のASL定義)  definition = file("statemachine/definition.json")  tags = { Project= var.project_name ManagedBy = "terraform"  }}

4-6. outputs.tf

デプロイ後に参照する情報を出力します。後続のDocker Push手順やAWS CLIでの実行確認に使用します。

# outputs.tfoutput "ecr_repository_url" {  description = "ECR リポジトリの URL(docker push に使用)"  value = aws_ecr_repository.csv_processor.repository_url}output "ecs_cluster_arn" {  description = "ECS クラスターの ARN"  value = aws_ecs_cluster.main.arn}output "task_definition_arn" {  description = "ECS タスク定義の ARN"  value = aws_ecs_task_definition.csv_processor.arn}output "state_machine_arn" {  description = "Step Functions ステートマシンの ARN"  value = aws_sfn_state_machine.csv_batch.arn}output "state_machine_name" {  description = "Step Functions ステートマシン名"  value = aws_sfn_state_machine.csv_batch.name}

4-7. デプロイ手順

ファイルがすべて作成できたら、Terraformでデプロイします。

初期化

terraform init

初回実行時にAWSプロバイダーがダウンロードされます。

Initializing the backend...Initializing provider plugins...- Finding hashicorp/aws versions matching ">= 5.0.0"...- Installing hashicorp/aws v5.x.x...Terraform has been successfully initialized!

実行計画の確認

terraform plan

作成されるリソースを確認します。+ が付いているリソースが新規作成されます。Plan: 行に作成リソース数が表示されます。

Plan: 13 to add, 0 to change, 0 to destroy.Changes to Outputs:  + ecr_repository_url  = (known after apply)  + ecs_cluster_arn  = (known after apply)  + state_machine_arn= (known after apply)  + state_machine_name  = "csv-batch-state-machine"  + task_definition_arn = (known after apply)

plan 出力の読み方: + は新規作成、~ は変更、- は削除を意味します。(known after apply) は適用後でないと確定しない値(ARNなど)です。Plan: 行でリソース数を必ず確認してから apply に進みましょう。

デプロイ実行

terraform apply

確認プロンプトに yes を入力すると実際にリソースが作成されます。

Do you want to perform these actions?  Terraform will perform the following actions:  # ... (省略)  Enter a value: yesApply complete! Resources: 13 added, 0 changed, 0 destroyed.Outputs:ecr_repository_url  = "123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/csv-batch-csv-processor"ecs_cluster_arn  = "arn:aws:ecs:ap-northeast-1:123456789012:cluster/csv-batch-cluster"state_machine_arn= "arn:aws:states:ap-northeast-1:123456789012:stateMachine:csv-batch-state-machine"state_machine_name  = "csv-batch-state-machine"task_definition_arn = "arn:aws:ecs:ap-northeast-1:123456789012:task-definition/csv-processor:1"

4-8. コンテナイメージのPush(Terraform apply後)

terraform apply が完了したら、ECRリポジトリにコンテナイメージをPushします。

ECRへのログイン

# ECRリポジトリURLを変数に設定ECR_URL=$(terraform output -raw ecr_repository_url)AWS_REGION="ap-northeast-1"# ECRへDockerログインaws ecr get-login-password --region ${AWS_REGION} | \  docker login --username AWS --password-stdin ${ECR_URL}

コンテナイメージのビルドとPush

# イメージをビルド(Dockerfileがあるディレクトリで実行)docker build -t csv-processor .# ECRリポジトリ用にタグ付けdocker tag csv-processor:latest ${ECR_URL}:latest# ECRへPushdocker push ${ECR_URL}:latest

Push完了後、AWSコンソールの ECRリポジトリcsv-batch-csv-processor でイメージが登録されていることを確認してください。


4-9. ステートマシン実行確認(Terraform版)

terraform output で取得したARNを使って、AWS CLIからステートマシンを実行します。

実行コマンド

aws stepfunctions start-execution \  --state-machine-arn $(terraform output -raw state_machine_arn) \  --input '{ "cluster_arn": "<ECS_CLUSTER_ARN>", "task_definition_arn": "<TASK_DEF_ARN>", "subnets": ["<SUBNET_ID>"], "security_groups": ["<SG_ID>"], "input_bucket": "my-input-bucket", "input_key": "data/sample.csv", "output_bucket": "my-output-bucket", "output_key": "result/processed.csv"  }'

<ECS_CLUSTER_ARN> などの値は terraform output で確認できます。

# 各ARNを確認terraform output ecs_cluster_arnterraform output task_definition_arn

実行結果の確認

# 実行ARNを変数に設定EXECUTION_ARN="<start-executionで返却されたexecutionArn>"aws stepfunctions describe-execution \  --execution-arn "${EXECUTION_ARN}" \  --region ap-northeast-1

"status": "SUCCEEDED" が返れば成功です。失敗した場合は "status": "FAILED" となり、cause フィールドにエラー内容が含まれます。

AWSコンソールでの確認

  1. AWS マネジメントコンソール → Step Functions に移動
  2. ステートマシン 一覧に csv-batch-state-machine が表示されることを確認
  3. ステートマシンをクリック → 実行履歴でグラフビューを確認
  4. CloudWatch Logs/ecs/csv-batch/csv-processor でタスクのログを確認

ハンズオンが完了したら、不要なコストが発生しないようリソースを削除しましょう。

5. ハンズオン後の削除手順

5-1. 重要: コスト発生リソースへの注意

⚠️ NAT Gatewayは常時課金(約$0.045/時間 ≒ 約$33/月)です。
ハンズオン完了後は速やかに削除してください。

本ハンズオンではデフォルトVPCを使用するためNAT Gatewayは作成していませんが、独自VPCを構築した場合は追加でNAT Gateway料金が発生します。下表を参考に、作成したリソースを確認してください。

リソース月額目安備考
NAT Gateway~$33本ハンズオンでデフォルトVPCを使用する場合は不要
ECS Fargate(タスク実行中のみ)実行時間課金タスク停止で課金停止
CloudWatch Logs保存量に応じてロググループを削除で停止

5-2. Terraformで構築した場合の削除

Terraformで構築した場合は terraform destroy でほとんどのリソースを一括削除できますが、ECRのコンテナイメージはTerraform管理外のため手動削除が必要です。

# まずECRのイメージを手動削除(terraform destroyでは削除されない)aws ecr batch-delete-image \  --repository-name csv-processor \  --image-ids imageTag=latest# Terraform リソース削除terraform destroy

terraform destroy で削除されないリソースには注意が必要です。

  • ECR内のコンテナイメージ: batch-delete-image コマンドで事前に手動削除が必要
  • CloudWatch Logsのロググループ: 手動削除するか、ロググループの保持期間を設定して自動削除させる
  • S3バケット内のオブジェクト: バケット自体がTerraform管理外の場合、オブジェクト・バケットともに手動削除が必要

5-3. コンソールで構築した場合の削除チェックリスト

AWSコンソールから構築した場合は、以下の順序でリソースを削除してください。依存関係があるため、上から順に削除することを推奨します。

  • [ ] Step Functions ステートマシンを削除
  • [ ] ECSタスク定義を登録解除(削除ではなく「INACTIVE」化)
  • [ ] ECSクラスターを削除
  • [ ] ECRリポジトリを削除(内部イメージも削除される)
  • [ ] IAMロール 3種を削除(ecs_task_execution / ecs_task / sf_execution)
  • [ ] CloudWatch Logsのロググループを削除
  • [ ] S3バケット(作成した場合)を削除

💡 ヒント: 削除後は AWS Billing コンソール で請求が止まっていることを確認しましょう。予期せぬ課金が続いている場合は、リソースが残っている可能性があります。


6. まとめと次のステップ

6-1. 本記事のまとめ

本記事では、Amazon ECS FargateとAWS Step Functionsを組み合わせたサーバーレスバッチジョブ基盤の構築をハンズオン形式で学びました。以下が本記事を通じて習得したポイントです。

  • ECS FargateとStep Functionsを組み合わせたバッチジョブ構成: サーバーレスでスケーラブルなバッチ処理基盤を構築する方法を習得
  • .sync 統合パターンによるタスク完了待ち受け: arn:aws:states:::ecs:runTask.sync を使ってECSタスクの完了を確実に待機する方法を理解
  • Retry/Catchによる宣言的エラーハンドリング: ステートマシン定義内でリトライポリシーとエラーキャッチを記述し、堅牢なワークフローを実現
  • コンソール操作とTerraformの両方による構築: GUI操作でAWSリソースの構造を把握しつつ、IaCでの再現方法も習得
  • ASL定義の外部ファイル化(Terraform版): file() 関数を使いステートマシン定義を外部JSONファイルに分離管理し、保守性を向上
  • CloudWatch Logsによる実行ログ確認: ECSタスクとStep Functionsの実行履歴・ログを CloudWatch で一元確認する手順を習得
  • IAMロールの最小権限設計: タスク実行ロール・タスクロール・SF実行ロールをそれぞれ分離し、最小権限の原則を実践
  • コスト管理の基本: NAT Gatewayなど常時課金リソースの把握と、ハンズオン後の削除手順

6-2. 発展トピック(次のステップ)

本ハンズオンで基本的なECS × Step Functionsパターンを習得できました。次回以降のシリーズでは、以下の発展トピックを解説予定です。

トピック概要
Parallel ステートによる複数ECSタスクの並列実行独立した複数バッチ処理を同時実行してスループットを向上
.waitForTaskToken パターン長時間処理や人承認フローなど、外部イベント待ちのワークフローを実装
EventBridge + Step Functionsスケジュール実行やS3イベントトリガーによるイベント駆動バッチ構成
コンテナイメージの CI/CDGitHub Actions → ECR自動Pushによる継続的デリバリーパイプラインの構築
Step Functions Express Workflow高頻度・短時間の小バッチ処理に最適化されたワークフロータイプの活用

6-3. 参考リンク

本記事の内容をさらに深掘りするための公式ドキュメントです。

  • Amazon ECS ドキュメント(Fargate起動タイプ)
  • Step Functions + ECS 統合(呼び出しSDKの統合)
  • Amazon States Language 仕様
  • Terraform aws_ecs_task_definition リソース
  • Terraform aws_sfn_state_machine リソース

6-4. 記事フッター


本記事は「AWS ハンズオン TechBlog」Step Functions シリーズの第2回です。
第1回: AWS Step Functions 入門 — コンソールとTerraformで学ぶハンズオン
次回: さらなる発展トピックを近日公開予定

最新情報をチェックしよう!