AWS App Integration Vol2|Step Functions×AppFlow×MQ×MWAA

目次

なぜ Application Integration Vol2 か — 4層アーキテクチャ概観

Application Integration本番運用シリーズ ナビゲーション

Vol1(統合プラットフォーム基盤) — つなぐ・流す・制御する
Vol2(ワークフロー・データ統合・エンタープライズMQ・本記事) — オーケストレートする・統合する・橋渡しする・スケジュールする

| 章 | テーマ | 主な施策 |
|—-|——–|———|
| §2 | AWS Step Functions Workflows 本番運用 | Standard/Express・エラーハンドリング・分散マップ・コールバック |
| §3 | Amazon AppFlow 本番運用 | Salesforce/SAP/Slack統合・スケジュール/イベントトリガ |
| §4 | Amazon MQ 本番運用 | ActiveMQ/RabbitMQ・Network of Brokers・JMS/STOMP/MQTT |
| §5 | Amazon MWAA 本番運用 | Airflow DAG・Worker/Scheduler・実行ロール |
| §6 | 詰まりポイント7選 + 演習 | 頻出パターンの解決策 + アンチパターン変換5問 |
| §7 | Vol1×Vol2統合アーキテクチャ | 選定フローチャート + 完全体フロー |

Application Integration Vol2 ワークフロー・データ統合4層アーキテクチャ
fig01: Application Integration Vol2 ワークフロー・データ統合4層アーキテクチャ

AWS Application Integration は「つなぐ・流す・制御する・オーケストレートする」の4層で構成される。Vol1(SQS / SNS / EventBridge / API Gateway / AppSync)でメッセージング統合プラットフォームの基盤を確立した。Vol2(本記事)はその上位層として、複雑な処理フローの調整・SaaSデータ統合・エンタープライズMQブローカー管理・バッチワークフロースケジューリングという4つの高度化レイヤーを担う。

Vol1 + Vol2 = Application Integration 完全体

Vol1が解決した課題(基盤層):

  • SQS でサービス間の非同期デカップリングを実現
  • SNS でファンアウト配信・複数サブスクライバーへの並列通知を確立
  • EventBridge でイベント駆動アーキテクチャのルーティング基盤を構築
  • API Gateway で外部向けエンドポイントとリクエスト制御を整備
  • AppSync でリアルタイムGraphQL APIとサブスクリプション基盤を提供

Vol2が解決する課題(オーケストレーション層):

Vol1基盤の上で「ステート管理・条件分岐・エラー回復が必要な複雑処理フロー」が発生したとき、Step Functions が調整役として機能する。SalesforceやSAP等のSaaSとAWS間のデータ同期にはAppFlowが最適解になる。既存のActiveMQ/RabbitMQをクラウド移行する際はAmazon MQで互換性を維持できる。そしてAirflowベースのバッチDAGスケジューリングにはMWAAが対応する。

レイヤーサービス担う役割Vol1との関係
ワークフローStep Functions複雑なステートマシン・エラーハンドリングEventBridge → Step Functions 実行トリガー
データ統合Amazon AppFlowSaaS↔AWS双方向データ同期S3/Redshiftへの格納はSNS通知連携可
エンタープライズMQAmazon MQJMS/STOMP/MQTT互換ブローカーSQSへの移行困難なレガシーシステム対応
バッチスケジューリングAmazon MWAAAirflow DAGによる複雑依存スケジュールEventBridgeと組み合わせたイベントトリガーも可

4層アーキテクチャで達成できる本番運用状態

本記事(Vol2)を読み終えた時点で以下の設計・実装能力が身につく。

  1. Step Functions: Standard vs Express の選択判断・Retry/Catch による耐障害設計・Distributed Map での10,000並行処理・コールバックを用いた人手承認フロー・SDK直接統合による270+サービスへのノーコード接続
  2. Amazon AppFlow: Salesforce/SAP/Slack等SaaSとのスケジュール/イベントトリガー同期・フィールドマッピング・データ変換・VPC経由プライベート接続
  3. Amazon MQ: ActiveMQ vs RabbitMQの選択・Active/Standby冗長構成・Network of Brokers 広域分散・JMS/STOMP/MQTTプロトコル別ユースケース
  4. Amazon MWAA: Airflow DAG設計・Worker/Schedulerのスケール設定・実行ロール権限・プラグイン管理・DAGデプロイパイプライン

対象読者とスキル前提

本記事はVol1既読(またはSQS/SNS/EventBridgeの基本理解がある)Cloud Architect / Backend Engineerを対象とする。AWSマネジメントコンソールの基本操作・IAMロールの概念・PythonまたはNode.jsの基礎があれば、コードサンプルをそのまま本番環境へ適用できる。

Vol1との連携ポイント

Vol1で構築したEventBridge → Step Functions の実行トリガー、SQS DLQ → Step Functions コールバックパターン、SNS → AppFlow 通知連携は本記事の各§で具体的なコード付きで解説する。Vol1未読の場合はApplication Integration本番運用Vol1を先に参照することを推奨する。


AWS Step Functions Workflows 本番運用 — Standard vs Express / エラーハンドリング / 分散マップ

Step Functions Standard vs Express ワークフロー比較 + エラーハンドリングフロー
fig02: Step Functions Standard vs Express ワークフロー比較 + エラーハンドリングフロー
sequenceDiagram
 participant Trigger as トリガ (API/EventBridge)
 participant SFN as Step Functions
 participant Lambda as Lambda タスク
 participant DDB as DynamoDB
 participant DLQ as DLQ
 Trigger->>SFN: 実行開始 (StartExecution)
 SFN->>Lambda: タスク1実行
 Lambda-->>SFN: 結果
 SFN->>DDB: タスク2 (SDK直接統合)
 DDB-->>SFN: 結果
 SFN->>SFN: Catch/Retry エラーハンドリング
 alt 失敗時
  SFN->>DLQ: 失敗イベント送信
 end
 Note over SFN: Standard: 長期(最大1年)<br/>Express: 高頻度(最大5分)

AWS Step Functions は AWS が提供するフルマネージドのステートマシンサービスである。Amazon States Language (ASL) と呼ばれるJSON/YAML形式の定義ファイルで処理フロー(ステートマシン)を記述し、各ステートの遷移条件・エラー処理・並行実行をコード量を最小化して実装できる。

Standard Workflow vs Express Workflow — 5軸比較

Step Functions は用途によって使い分ける2種類のワークフロータイプを提供する。

比較軸Standard WorkflowExpress Workflow
最大実行時間最大1年最大5分
課金モデルステート遷移回数 × 単価実行数 + 実行時間(GB秒)
スループット2,000 実行/秒100,000 実行/秒以上
履歴保存90日間 (Step Functions コンソール)CloudWatch Logs へ転送
主なユースケース長期ETL・人手承認フロー・注文処理高頻度API・IoTデータ処理・ストリーム変換
実行の一意性重複実行なし (Exactly-once 保証)At-least-once (冪等設計が必須)

Standard Workflow の選択基準:

  • 処理時間が5分を超える可能性がある場合(バッチETL、レポート生成等)
  • 人手承認フロー(コールバック Wait for Task Token)を含む場合
  • 実行履歴をコンソールで可視化してデバッグしたい場合
  • 注文処理・決済フローなど正確に1回の実行保証が必要な場合

Express Workflow の選択基準:

  • 1リクエスト5分以内に完結する高頻度処理(API Gateway → Step Functions)
  • IoTデバイスからの大量センサーデータ変換・ルーティング
  • 1秒間に数千〜数万件のリアルタイム処理が必要な場合
Standard/Express の課金構造を理解して設計する

Standard: 月 4,000 ステート遷移まで無料。超過分は 1,000 遷移あたり $0.025。ステートマシンの各ステート移動(Task/Choice/Wait/Parallel/Map 等)が1遷移としてカウントされる。ループや分岐が多いステートマシンは遷移数が膨らむため、不要なパスバイステートを削減するリファクタリングが効く。

Express: 月 1,000,000 実行まで無料。超過分は 100 万実行あたり $1.00 + 持続時間 (GB秒) $0.00001。実行時間が短く超高頻度の処理に適している。

費用試算例: 1日 100 万件の IoT センサー処理(1件あたり平均 2 秒、64MB使用)
→ 実行費用: $1.00/日 + 持続時間費用: 1,000,000 × 2秒 × 64/1024GB × $0.00001 ≒ $1.25/日 = 約$2.25/日

エラーハンドリング — Retry / Catch / DLQ

Step Functions の耐障害設計の核心は RetryCatch の組み合わせである。

Retry 設定:

{
  "Type": "Task",
  "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessOrder",
  "Retry": [
 {
"ErrorEquals": ["Lambda.ServiceException", "Lambda.AWSLambdaException"],
"IntervalSeconds": 2,
"BackoffRate": 2.0,
"MaxAttempts": 3,
"JitterStrategy": "FULL"
 },
 {
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 5,
"BackoffRate": 1.5,
"MaxAttempts": 2
 }
  ],
  "Catch": [
 {
"ErrorEquals": ["States.ALL"],
"Next": "HandleError",
"ResultPath": "$.error"
 }
  ]
}
Retry パラメーター説明推奨値
IntervalSeconds初回リトライまでの待機秒数1〜5秒
BackoffRateリトライ間隔の乗数 (指数バックオフ)2.0
MaxAttempts最大リトライ回数3 (べき等操作)
JitterStrategyランダムジッター (FULL/NONE)FULL (Thundering Herd 防止)

Catch でのフォールバック遷移:

{
  "Catch": [
 {
"ErrorEquals": ["CustomError.PaymentDeclined"],
"Next": "NotifyPaymentFailed",
"ResultPath": "$.error"
 },
 {
"ErrorEquals": ["States.Timeout"],
"Next": "TimeoutHandler",
"ResultPath": "$.timeoutError"
 },
 {
"ErrorEquals": ["States.ALL"],
"Next": "SendToDLQ",
"ResultPath": "$.unhandledError"
 }
  ]
}

DLQ 送信ステート(SQS 直接統合):

{
  "SendToDLQ": {
 "Type": "Task",
 "Resource": "arn:aws:states:::sqs:sendMessage",
 "Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/OrderProcessingDLQ",
"MessageBody.$": "States.JsonToString($)",
"MessageAttributes": {
  "ErrorType": {
 "DataType": "String",
 "StringValue.$": "$.unhandledError.Error"
  }
}
 },
 "End": true
  }
}

ResultPath: "$.error" を指定することで、エラー情報を元の入力に追記しながら次のステートへ渡すことができる。ResultPath: null にすると元の入力をそのまま次ステートへ渡す。

分散マップ (Distributed Map) — 大規模並列処理

Distributed Map は 10,000 並行子ワークフローを実行できる Step Functions の大規模並列処理機能である。S3 内のオブジェクト一覧を入力として各行/各ファイルを並列処理するユースケースに最適。

Distributed Map ASL 定義:

{
  "ProcessS3Records": {
 "Type": "Map",
 "ItemProcessor": {
"ProcessorConfig": {
  "Mode": "DISTRIBUTED",
  "ExecutionType": "STANDARD"
},
"StartAt": "ProcessRecord",
"States": {
  "ProcessRecord": {
 "Type": "Task",
 "Resource": "arn:aws:states:::lambda:invoke",
 "Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessRecord",
"Payload.$": "$"
 },
 "End": true
  }
}
 },
 "ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
  "InputType": "CSV",
  "CSVHeaderLocation": "FIRST_ROW"
},
"Parameters": {
  "Bucket.$": "$.bucket",
  "Key.$": "$.key"
}
 },
 "MaxConcurrency": 1000,
 "ToleratedFailureCount": 10,
 "ToleratedFailurePercentage": 1,
 "ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
  "Bucket.$": "$.outputBucket",
  "Prefix": "results/"
}
 },
 "Next": "AggregateResults"
  }
}
Distributed Map パラメーター説明
Mode: DISTRIBUTED子ステートマシンとして独立実行
MaxConcurrency同時実行上限 (最大 10,000)
ToleratedFailureCount許容する絶対失敗数
ToleratedFailurePercentage許容する失敗率 (%)
ItemReaderS3 CSV/JSON/JSONL をソースとして読み込む
ResultWriter各子実行の結果を S3 に集約出力

ToleratedFailureCountToleratedFailurePercentage を設定することで、全体の一部が失敗してもステートマシン全体が中断されるのを防ぎ、バッチ処理の部分完了を許容できる。

コールバック (Wait for Task Token) — 人手承認フロー

Wait for Task Token は外部システムや人手の応答を待機する非同期パターンである。SQS / SNS / EventBridge / API Gateway 等を通じて外部システムに Task Token を渡し、承認後に SendTaskSuccess / SendTaskFailure を呼び出してステートマシンを再開する。

人手承認フローの ASL:

{
  "WaitForApproval": {
 "Type": "Task",
 "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
 "Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/ApprovalQueue",
"MessageBody": {
  "TaskToken.$": "$$.Task.Token",
  "OrderId.$": "$.orderId",
  "Amount.$": "$.amount",
  "ApprovalUrl.$": "States.Format('https://approval.example.com/approve?token={}', $$.Task.Token)"
}
 },
 "HeartbeatSeconds": 86400,
 "TimeoutSeconds": 172800,
 "Next": "ProcessApprovedOrder",
 "Catch": [
{
  "ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
  "Next": "HandleApprovalTimeout"
}
 ]
  }
}

承認者がコールバックを送信する Lambda:

import boto3

sfn_client = boto3.client('stepfunctions')

def lambda_handler(event, context):
 task_token = event['taskToken']
 approved = event.get('approved', False)

 if approved:
  sfn_client.send_task_success(
taskToken=task_token,
output='{"approvalStatus": "APPROVED", "approvedBy": "' + event['approver'] + '"}'
  )
 else:
  sfn_client.send_task_failure(
taskToken=task_token,
error='ApprovalRejected',
cause='Approver rejected the request'
  )

$$.Task.Token は ASL の組み込み変数で現在のタスクトークンを参照する。HeartbeatSeconds を設定すると、指定秒数内にコールバックがない場合にタイムアウトエラーが発生する。

SDK 直接統合 (270+ AWSサービス)

Step Functions の SDK 統合により、Lambda を介さずに DynamoDB / EventBridge / S3 / SQS / SNS など270以上のAWSサービスを直接呼び出せる。Lambda 関数の作成・デプロイ・管理コストを削減し、ステートマシンの処理速度も向上する。

DynamoDB 直接書き込み:

{
  "SaveToDatabase": {
 "Type": "Task",
 "Resource": "arn:aws:states:::dynamodb:putItem",
 "Parameters": {
"TableName": "Orders",
"Item": {
  "orderId": {"S.$": "$.orderId"},
  "status": {"S": "PROCESSING"},
  "updatedAt": {"S.$": "$$.Execution.StartTime"},
  "amount": {"N.$": "States.Format('{}', $.amount)"}
}
 },
 "ResultPath": null,
 "Next": "PublishEvent"
  }
}

EventBridge へのイベント発行:

{
  "PublishEvent": {
 "Type": "Task",
 "Resource": "arn:aws:states:::events:putEvents",
 "Parameters": {
"Entries": [
  {
 "EventBusName": "order-events",
 "Source": "order.processing",
 "DetailType": "OrderStatusChanged",
 "Detail": {
"orderId.$": "$.orderId",
"status": "PROCESSING",
"timestamp.$": "$$.Execution.StartTime"
 }
  }
]
 },
 "End": true
  }
}

JSONata 式によるペイロード変換:

Step Functions の ASL v2 では States.Format / States.StringToJson / States.JsonToString に加え、JSONata 式によるインライン変換が利用できる。

{
  "TransformPayload": {
 "Type": "Pass",
 "Parameters": {
"orderSummary.$": "States.Format('Order #{} - {} JPY ({})', $.orderId, $.amount, $.status)",
"itemCount.$": "States.ArrayLength($.items)",
"firstItem.$": "States.ArrayGetItem($.items, 0)",
"totalWithTax.$": "States.MathAdd($.amount, States.MathMultiply($.amount, 0.1))"
 },
 "Next": "SendNotification"
  }
}
Step Functions 本番設計チェックリスト

1. ワークフロータイプの選択: 5分超・人手承認・正確に1回の実行保証が必要 → Standard。高頻度・5分以内・冪等 → Express。
2. Retry にジッターを設定: JitterStrategy: FULL で複数ステートマシンの同時リトライによる集中アクセスを分散させる。
3. ResultPath の設計: エラーをキャッチした後のステートで元データを保持したい場合は ResultPath: "$.error" を指定する。ResultPath: null にすると元の入力がそのまま渡される。
4. Distributed Map の失敗許容率: バッチ処理の性質に応じて ToleratedFailurePercentage を設定する。0 に設定すると1件の失敗で全体が停止するため、大規模バッチでは 1〜5% を目安にする。
5. Task Token の有効期限管理: コールバックパターンでは HeartbeatSeconds を必ず設定する。設定なしでは承認者が応答しない場合にステートマシンが永久に停止状態となる。
6. SDK統合 vs Lambda: 単純なAWSサービス呼び出しにはSDK統合を使う。ビジネスロジック・データ変換・複数API呼び出しの組み合わせにはLambdaを使う。


Amazon AppFlow 本番運用 — Salesforce/SAP/Slack統合 / スケジュールトリガ

Amazon AppFlow Salesforce統合フロー + スケジュールトリガ構成
fig03: Amazon AppFlow Salesforce統合フロー + スケジュールトリガ構成

対応コネクタ一覧

AppFlow は以下のカテゴリに分類されたコネクタをネイティブサポートしている。

カテゴリ代表コネクタ
CRM/営業Salesforce / HubSpot / Zendesk
ERP/基幹SAP OData / SAP S/4HANA Cloud
コラボレーションSlack / ServiceNow / Jira Cloud
マーケティングMarketo / Google Analytics / Amplitude
送信先 (Destination)Amazon S3 / Redshift / Salesforce / EventBridge / Snowflake

Salesforce コネクタは商談 (Opportunity)・リード (Lead)・ケース (Case)・カスタムオブジェクトを対象にでき、SAP OData はエンティティセット単位でクエリが可能。Slack コネクタはチャンネルメッセージの抽出やユーザーディレクトリ同期に使われる。

フロー設定

AppFlow のフロー実行は 3 種類のトリガを選択できる。

トリガ種別説明代表ユースケース
スケジュールcron / レート式で定期実行日次 Salesforce 商談データ取得
イベントソースの変更通知で即時実行Salesforce Platform Events / S3 イベント
オンデマンドコンソール / API から手動実行初回フル同期・デバッグ

データ取得モード

  • フル転送 (Full transfer): 毎回ソース全件を取得。初回同期・小規模データセットに適す。
  • 増分転送 (Incremental transfer): 最終実行以降に変更されたレコードのみ取得。LastModifiedDate / Sequence 番号ベース。ソース側が増分クエリをサポートしている必要がある。

データ変換パイプライン

# AppFlow フロー定義例 (AWS CloudFormation)
FlowTransformationExample:
  Type: AWS::AppFlow::Flow
  Properties:
 FlowName: salesforce-to-s3-daily
 TriggerConfig:
TriggerType: Scheduled
TriggerProperties:
  ScheduleExpression: "rate(1 days)"
  DataPullMode: Incremental
 SourceFlowConfig:
ConnectorType: Salesforce
SourceConnectorProperties:
  Salesforce:
 Object: Opportunity
 EnableDynamicFieldUpdate: true
 DestinationFlowConfigList:
- ConnectorType: S3
  DestinationConnectorProperties:
 S3:
BucketName: !Ref DataLakeBucket
S3OutputFormatConfig:
  FileType: PARQUET
  PrefixConfig:
 PrefixType: PATH
 PrefixFormat: YEAR_MONTH_DAY
 Tasks:
# フィールドマッピング: Salesforce 名 → S3 列名
- TaskType: Map
  SourceFields: [Id, Name, Amount, CloseDate, StageName]
  ConnectorOperator:
 Salesforce: NO_OP
  DestinationField: opportunity_id
# フィルタ: 成立済み案件のみ通過
- TaskType: Filter
  SourceFields: [StageName]
  ConnectorOperator:
 Salesforce: EQUAL_TO
  TaskProperties:
 - Key: VALUE
Value: "Closed Won"
# マスキング: 金額フィールドを伏字
- TaskType: Mask
  SourceFields: [Amount]
  ConnectorOperator:
 Salesforce: MASK_ALL

変換パイプライン要素

変換タイプ説明
Mapソースフィールド名を宛先スキーマへ対応付け
Filter条件に合うレコードのみ転送
Mask機密フィールドを **** で置換
Validateフィールド値の型・長さを検証。不正データは別の S3 パスに隔離
Concatenate複数フィールドを結合して 1 フィールドに出力
Arithmetic数値演算 (加算・乗算等)
AppFlow 増分転送の前提条件

増分転送を利用するにはソースコネクタが変更検知カラムをサポートしている必要がある。Salesforce の場合は LastModifiedDate、SAP OData では ModifiedAt フィールドを AppFlow が内部参照する。フロー設定で DataPullMode: Incremental を指定しても、ソースオブジェクトが対応していない場合は自動的にフルスキャンにフォールバックするため、初回実行後のメタデータ (Last export time) で動作を確認すること。

大規模初回同期 (100 万件超) では日付範囲別に複数フローを並列定義し、Salesforce API クォータ (24 時間ウィンドウ / 15 分ウィンドウ) を分散させる設計が有効。

プライベート接続 (PrivateLink 統合)

VPC 内のデータソースや社内オンプレミスシステムと AppFlow を接続する場合、PrivateLink 経由のプライベート接続を使用するとインターネットを経由しない安全なデータ転送が実現できる。

# プライベート接続プロファイル作成 (AWS CLI)
aws appflow create-connector-profile \
  --connector-profile-name salesforce-prod \
  --connector-type Salesforce \
  --connection-mode Private \
  --connector-profile-config '{
 "connectorProfileProperties": {
"Salesforce": {
  "instanceUrl": "https://myorg.my.salesforce.com",
  "isSandboxEnvironment": false
}
 }
  }'
接続モード通信経路代表ユースケース
Publicインターネット経由Salesforce / Slack など SaaS
PrivateAWS PrivateLink / VPCオンプレ / VPC 内カスタムコネクタ

カスタムコネクタを VPC 内に構築する場合は、AppFlow が生成する VPC エンドポイントに対してセキュリティグループとルートテーブルを適切に設定する必要がある。

エラーハンドリング (失敗データの S3 保存)

AppFlow フローで宛先への書き込みに失敗したレコードは S3 の専用バケットに自動保存できる。

{
  "errorHandlingConfig": {
 "failOnFirstDestinationError": false,
 "bucketName": "appflow-error-records",
 "bucketPrefix": "errors/salesforce-to-s3/"
  }
}
  • failOnFirstDestinationError: false — 1 件失敗してもフロー全体を継続実行
  • 失敗レコードは s3://appflow-error-records/errors/salesforce-to-s3/YYYY/MM/DD/ に CSV 形式で保存
  • CloudWatch メトリクス RecordsProcessed / RecordsFailed / FlowExecutionsStarted でフロー健全性を監視

アラーム設定例

# 失敗レコード数アラーム作成
aws cloudwatch put-metric-alarm \
  --alarm-name appflow-record-failed \
  --metric-name RecordsFailed \
  --namespace AWS/AppFlow \
  --dimensions Name=FlowName,Value=salesforce-to-s3-daily \
  --statistic Sum \
  --period 3600 \
  --threshold 100 \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:ap-northeast-1:123456789012:appflow-alert
AppFlow パフォーマンス設計のポイント

1. Salesforce API クォータ管理: AppFlow は Salesforce Bulk API 2.0 を使用するため、クォータ消費はバルクジョブ単位で計算される。1 フロー実行あたり最大 50,000 件のバッチが複数発行されるため、並列フロー数に注意する。
2. Parquet 形式の活用: S3 宛先では PARQUET 形式を指定すると Athena / Redshift Spectrum でのクエリコストを大幅削減できる。CSV との比較で圧縮率 3〜10 倍が期待できる。
3. EventBridge 宛先: 増分データをリアルタイムに後続サービスへ連携する場合は S3 の代わりに EventBridge を宛先にして Lambda / Step Functions をトリガする構成が低レイテンシ。
4. フロー実行ログ: CloudWatch Logs の /aws/appflow/{FlowName} にデバッグ情報が出力される。障害調査時は errorMessage フィールドを最初に確認する。


Amazon MQ 本番運用 — ActiveMQ vs RabbitMQ / Network of Brokers / JMS/STOMP/MQTT

Amazon MQ ActiveMQ Active/Standby + Network of Brokers構成
fig04: Amazon MQ ActiveMQ Active/Standby + Network of Brokers構成

ActiveMQ vs RabbitMQ 選定

Amazon MQ は Apache ActiveMQRabbitMQ の 2 種類のブローカーエンジンを提供する。

比較軸ActiveMQRabbitMQ
プロトコルJMS / OpenWire / STOMP / AMQP 1.0 / MQTT / WebSocketAMQP 0-9-1 / MQTT / STOMP
メッセージモデルキュー (P2P) + トピック (Pub/Sub)Exchange → Routing Key → Queue
主なユースケースオンプレ ActiveMQ 移行・JMS 既存アプリ高スループット・柔軟ルーティング
最大インスタンスmq.m5.4xlargemq.m5.4xlarge
デフォルトポート61616 (OpenWire) / 8162 (Web Console)5671 (AMQPS) / 15671 (Management)

選定指針

  • オンプレミスの ActiveMQ / IBM MQ を AWS へ移行する場合 → ActiveMQ (JMS 互換性が高い)
  • 新規構築でメッセージルーティングの柔軟性を重視する場合 → RabbitMQ (Exchange/Binding モデル)
  • MQTT デバイス統合とキュー処理を同一ブローカーで行う場合 → ActiveMQ (MQTT + JMS 同時対応)

Active/Standby 高可用性構成

Amazon MQ は Single-instanceActive/Standby の 2 構成を提供する。本番環境では Active/Standby を必ず使用する。

[Active Broker] [Standby Broker]
 ││
 ├── EFS 共有ストレージ ──┤  (ActiveMQ の場合)
 ││
 └── 自動フェイルオーバー ┘
(約 60〜120 秒)
項目内容
同期方式Amazon EFS (ActiveMQ) / EBS レプリカ (RabbitMQ)
フェイルオーバー時間約 60〜120 秒
接続 URLfailover:(ssl://b-xxx-1.mq.region.amazonaws.com:61617,ssl://b-xxx-2.mq.region.amazonaws.com:61617)
SLA99.9% (Active/Standby 構成時)

Failover リスナー実装例 (Java)

// ActiveMQ Java クライアント - Failover URI で自動再接続
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
 "failover:(ssl://b-xxxxxxxx-1.mq.ap-northeast-1.amazonaws.com:61617," +
 "ssl://b-xxxxxxxx-2.mq.ap-northeast-1.amazonaws.com:61617)" +
 "?initialReconnectDelay=1000&maxReconnectDelay=30000"
);
factory.setUserName("admin");
factory.setPassword(System.getenv("MQ_PASSWORD"));

Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination queue = session.createQueue("orders");
MessageConsumer consumer = session.createConsumer(queue);

consumer.setMessageListener(message -> {
 try {
  System.out.println("受信: " + ((TextMessage) message).getText());
  message.acknowledge(); // CLIENT_ACKNOWLEDGE: 処理完了後に明示 ACK
 } catch (JMSException e) {
  // ACK しないと再配送される
 }
});
フェイルオーバー時の接続断対策

Active/Standby 構成でフェイルオーバーが発生すると、アクティブブローカーへの接続が強制切断される。failover:// プロトコルで接続 URL を複数指定することで自動再接続できるが、再接続が完了するまでの約 60〜120 秒間は新規メッセージの送受信が停止する。

この間のデータ損失を防ぐには:

プロデューサー側: JMSException 発生時に SQS などのフォールバックキューへ送信するリトライロジックを実装する
コンシューマー側: CLIENT_ACKNOWLEDGE モードを使用し、メッセージ処理完了後に message.acknowledge() を明示的に呼び出す。未 ACK メッセージはフェイルオーバー後に再配送される

Network of Brokers

大規模トラフィックや複数リージョン展開では Network of Brokers (NoB) でブローカーをメッシュ接続する。

メッシュトポロジー — 全ブローカー間が双方向接続

Region ARegion B
┌──────────────────┐┌──────────────────┐
│  Broker-1 (A/S)  │◄───────►│  Broker-3 (A/S)  │
│  Broker-2 (A/S)  │◄───────►│  Broker-4 (A/S)  │
└──────────────────┘└──────────────────┘

ハブ&スポーク構成 — 集中ルーティング型。スポークはハブとのみ接続

 Hub Broker (集中)
/\
Spoke-1 Spoke-2
(東京 AZ-a)  (東京 AZ-c)
<!-- activemq.xml - ネットワークコネクタ定義 -->
<networkConnectors>
  <networkConnector
 name="hub-connector"
 uri="static:(ssl://hub-broker.mq.ap-northeast-1.amazonaws.com:61617)"
 duplex="true"
 networkTTL="2"
 messageTTL="-1"
 dynamicOnly="true">
 <excludedDestinations>
<queue physicalName="internal.>"/>
 </excludedDestinations>
  </networkConnector>
</networkConnectors>
NoB パラメータ説明
duplex="true"双方向転送 (false = 片方向のみ)
networkTTLメッセージが跨げる最大ホップ数 (デフォルト 1)
messageTTLTTL 変更 (-1 = 元の TTL を維持)
dynamicOnlyコンシューマーが存在するキューのみ転送 (帯域節約)

跨リージョン構成では VPC ピアリングまたは Transit Gateway でブローカー間の TCP 経路を確保し、SSL (ssl://) を使用した暗号化接続が必須となる。

JMS / STOMP / MQTT プロトコル対応

Python — STOMP クライアント

import stomp

class OrderListener(stomp.ConnectionListener):
 def on_message(self, frame):
  print(f"受信: {frame.body}")
  self.conn.ack(frame.headers['message-id'], frame.headers['subscription'])

conn = stomp.Connection12(
 [('b-xxxxxxxx.mq.ap-northeast-1.amazonaws.com', 61614)],
 use_ssl=True
)
conn.set_listener('', OrderListener())
conn.connect('admin', 'password', wait=True)
conn.subscribe('/queue/orders', id=1, ack='client')

# メッセージ送信
conn.send('/queue/orders', '{"orderId": "12345"}', content_type='application/json')

Node.js — MQTT クライアント

const mqtt = require('mqtt');

const client = mqtt.connect(
  'mqtts://b-xxxxxxxx.mq.ap-northeast-1.amazonaws.com:8883',
  {
 username: 'admin',
 password: process.env.MQ_PASSWORD,
 clientId: `node-client-${Date.now()}`,
 rejectUnauthorized: true
  }
);

client.on('connect', () => {
  client.subscribe('orders/#', { qos: 1 });
});

client.on('message', (topic, message) => {
  console.log(`topic: ${topic}, payload: ${message.toString()}`);
  // QoS 1: PUBACK を自動送信するため明示 ACK 不要
});

ActiveMQ プロトコル対応ポート一覧

プロトコル標準ポートTLS ポート
OpenWire (JMS)6161661617
STOMP6161361614
AMQP 1.056725671
MQTT18838883
WebSocket6161461619

メンテナンスウィンドウ / エンドポイント分離

メンテナンスウィンドウ設定

Amazon MQ は週次メンテナンスウィンドウ内でマイナーバージョンの自動アップグレードを実施する。本番環境ではトラフィックが最低の時間帯に設定する。

# メンテナンスウィンドウを日曜 04:00 JST に設定
aws mq update-broker \
  --broker-id b-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx \
  --maintenance-window-start-time \
 DayOfWeek=SUNDAY,TimeOfDay=04:00,TimeZone=Asia/Tokyo \
  --auto-minor-version-upgrade true

エンドポイント分離 (VPC 内 vs パブリック)

配置オプションアクセス経路代表ユースケース
VPC 内 (推奨)ENI 直接 / VPC ピアリング本番システム・機密データ転送
パブリックインターネット (TLS 必須)開発・検証・外部パートナー連携

VPC 内配置を選択した場合、ブローカーのエンドポイント URL は b-xxxxxxxx.mq.region.amazonaws.com ではなく、VPC 内の ENI IP として解決される。セキュリティグループはブローカーのポート (61617 等) に対して必要なソース IP / セキュリティグループのみを許可する。

Amazon MQ 本番設計チェックリスト

1. Active/Standby 必須: Single-instance はメンテナンス時にダウンタイムが発生する。本番は必ず Active/Standby を選択する。
2. ブローカーサイジング: mq.m5.large を基準に、メッセージサイズ × スループットで上位サイズを検討する。ActiveMQ は Heap 使用率 (CloudWatch: HeapUsage) が 70% を超えると処理が詰まり始める。
3. 接続数上限: ActiveMQ はブローカーサイズ別に最大接続数の制限がある (mq.m5.large = 1,000 接続)。マイクロサービスが多数接続する場合はコネクションプールを必ず使用する。
4. ストレージ肥大化対策: 消費されないメッセージが蓄積すると EFS ストレージが増大する。デッドレターキューの定期クリーニングとメッセージ TTL の設定を組み合わせて管理する。
5. CloudWatch アラーム優先設定: TotalConsumerCount=0 (コンシューマー消失) と StorePercentUsage > 80 (ストレージ逼迫) を最優先でアラーム設定する。


Amazon MWAA 本番運用 — Airflow DAG / Worker/Scheduler / 実行ロール

Airflow DAG設計パターン

Amazon MWAA (Managed Workflows for Apache Airflow) は、Apache Airflowをフルマネージドで提供します。DAGファイルはS3のdags/フォルダに配置し、MWAAが自動的に同期します。

DAGファイル配置とS3連携

# s3://your-bucket/dags/sample_etl_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
 'owner': 'data-team',
 'retries': 2,
 'retry_delay': timedelta(minutes=5),
}

with DAG(
 dag_id='sample_etl_pipeline',
 default_args=default_args,
 schedule_interval='0 2 * * *',
 start_date=days_ago(1),
 catchup=False,
 tags=['etl', 'production'],
) as dag:
 pass

TaskGroupによるDAG構造化

SubDAGはAirflow 2.x で非推奨のためTaskGroupを使用します。

from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator

with DAG(dag_id='structured_etl', schedule_interval='@daily',
start_date=days_ago(1), catchup=False) as dag:

 with TaskGroup('extract') as extract_group:
  extract_s3  = PythonOperator(task_id='extract_s3',  python_callable=extract_from_s3)
  extract_rds = PythonOperator(task_id='extract_rds', python_callable=extract_from_rds)

 with TaskGroup('transform') as transform_group:
  transform = PythonOperator(task_id='transform', python_callable=transform_data)

 with TaskGroup('load') as load_group:
  load_s3 = PythonOperator(task_id='load_s3', python_callable=load_to_s3)

 extract_group >> transform_group >> load_group

動的DAG生成

# dags/dynamic_pipelines.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

DAG_CONFIGS = [
 {'dag_id': 'pipeline_salesforce',  'schedule': '0 1 * * *', 'source': 'salesforce'},
 {'dag_id': 'pipeline_s3_hourly','schedule': '0 * * * *', 'source': 's3'},
 {'dag_id': 'pipeline_rds_nightly', 'schedule': '0 3 * * *', 'source': 'rds'},
]

def create_dag(config: dict) -> DAG:
 with DAG(
  dag_id=config['dag_id'],
  schedule_interval=config['schedule'],
  start_date=days_ago(1),
  catchup=False,
 ) as dag:
  PythonOperator(
task_id='process',
python_callable=lambda source=config['source']: process(source),
  )
 return dag

for cfg in DAG_CONFIGS:
 globals()[cfg['dag_id']] = create_dag(cfg)

Worker / Scheduler / Webserver 構成

環境クラスvCPUメモリ推奨用途
mw1.small24 GB開発・PoC
mw1.medium48 GBステージング
mw1.large816 GB本番・大規模処理
# Terraform — MWAA環境定義
resource "aws_mwaa_environment" "prod" {
  name= "prod-mwaa"
  airflow_version = "2.8.1"
  environment_class  = "mw1.large"
  min_workers  = 2
  max_workers  = 10
  schedulers= 2
  source_bucket_arn  = aws_s3_bucket.mwaa.arn
  dag_s3_path  = "dags/"
  plugins_s3_path = "plugins/plugins.zip"
  execution_role_arn = aws_iam_role.mwaa_execution.arn

  airflow_configuration_options = {
 "core.parallelism" = "32"
 "core.dag_concurrency"= "16"
 "core.max_active_runs_per_dag" = "4"
  }

  network_configuration {
 security_group_ids = [aws_security_group.mwaa.id]
 subnet_ids= aws_subnet.private[*].id
  }

  logging_configuration {
 dag_processing_logs { enabled = true; log_level = "INFO" }
 scheduler_logs{ enabled = true; log_level = "WARNING" }
 worker_logs{ enabled = true; log_level = "INFO" }
 task_logs  { enabled = true; log_level = "INFO" }
  }
}
Worker自動スケーリング設計指針

min_workers ≥ 2: 0設定はコールドスタートで遅延が発生するため本番非推奨
max_workers: DAGの最大並列タスク数を想定して設定。過剰設定はコスト増大
schedulers ≥ 2: スケジューラ障害の単一障害点を排除。本番では必ず2以上
parallelism: 全Worker合計の同時タスク実行上限
dag_concurrency: 同一DAGの同時タスク実行上限


実行ロール設計 — aws_default接続 / Operator別IAMポリシー

{
  "Version": "2012-10-17",
  "Statement": [
 {
"Sid": "S3DataAccess",
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
"Resource": [
  "arn:aws:s3:::your-data-bucket",
  "arn:aws:s3:::your-data-bucket/*"
]
 },
 {
"Sid": "GlueJobOperator",
"Effect": "Allow",
"Action": ["glue:StartJobRun", "glue:GetJobRun"],
"Resource": "arn:aws:glue:ap-northeast-1:123456789012:job/*"
 },
 {
"Sid": "StepFunctionsOperator",
"Effect": "Allow",
"Action": ["states:StartExecution", "states:DescribeExecution"],
"Resource": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:*"
 },
 {
"Sid": "SecretsAccess",
"Effect": "Allow",
"Action": ["secretsmanager:GetSecretValue"],
"Resource": "arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:airflow/*"
 }
  ]
}

DAG管理 — Git連携 / CI/CD / 動的パラメータ

GitHub ActionsによるDAG自動デプロイ

# .github/workflows/deploy_dags.yaml
name: Deploy DAGs to MWAA
on:
  push:
 branches: [main]
 paths: ['dags/**']
jobs:
  validate:
 runs-on: ubuntu-latest
 steps:
- uses: actions/checkout@v4
- run: pip install apache-airflow==2.8.1
- name: DAG構文チェック
  run: |
 python -c "
 import glob, importlib.util, sys
 errors = []
 for f in glob.glob('dags/**/*.py', recursive=True):
  try:
spec = importlib.util.spec_from_file_location('dag', f)
mod  = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
  except Exception as e:
errors.append(f'{f}: {e}')
 if errors:
  print('ERRORS:', errors); sys.exit(1)
 "
  deploy:
 needs: validate
 runs-on: ubuntu-latest
 steps:
- uses: actions/checkout@v4
- uses: aws-actions/configure-aws-credentials@v4
  with:
 role-to-assume: arn:aws:iam::123456789012:role/github-actions-mwaa
 aws-region: ap-northeast-1
- run: |
 aws s3 sync dags/ s3://your-mwaa-bucket/dags/ \
--delete --exclude "*.pyc" --exclude "__pycache__/*"

Variables と Connections — Secrets Manager統合

from airflow.models import Variable
from airflow.hooks.base import BaseHook

env  = Variable.get('environment',default_var='dev')
s3_bucket  = Variable.get('data_bucket',default_var='dev-data-bucket')
batch_size = int(Variable.get('batch_size', default_var='100'))

conn= BaseHook.get_connection('redshift_prod')
host= conn.host
schema = conn.schema
# Secrets Managerバックエンド設定
secrets.backend: "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
secrets.backend_kwargs: '{"connections_prefix":"airflow/connections","variables_prefix":"airflow/variables"}'

プラグイン管理 — カスタムOperator / Hook

# plugins/operators/data_quality_operator.py
from airflow.models import BaseOperator

class DataQualityOperator(BaseOperator):
 def __init__(self, table: str, threshold: float = 0.95, *args, **kwargs):
  super().__init__(*args, **kwargs)
  self.table  = table
  self.threshold = threshold

 def execute(self, context):
  score = self._check_quality(self.table)
  if score < self.threshold:
raise ValueError(f'品質スコア不足: {score:.2f} < {self.threshold}')
  return score

 def _check_quality(self, table: str) -> float:
  return 0.98
# scripts/deploy_plugins.py
import boto3, subprocess, os

def deploy_plugins(bucket: str, prefix: str = 'plugins/plugins.zip'):
 subprocess.run(
  ['zip', '-r', '/tmp/plugins.zip', '.', '-x', '*.pyc', '-x', '__pycache__/*'],
  cwd='plugins/', check=True
 )
 boto3.client('s3').upload_file('/tmp/plugins.zip', bucket, prefix)

if __name__ == '__main__':
 deploy_plugins(os.environ['MWAA_BUCKET'])

CloudWatchメトリクス監視

メトリクス説明推奨しきい値
DAGFileProcessingTotalParseTimeDAGファイル解析時間≤30秒
RunningTasks実行中タスク数max_workers×0.9超でアラート
QueuedTasksキュー待機タスク数≥10件で警告
SchedulerHeartbeatスケジューラ死活0回/分でクリティカル
# CloudWatchアラーム — スケジューラ死活監視 (Terraform)
resource "aws_cloudwatch_metric_alarm" "scheduler_heartbeat" {
  alarm_name = "mwaa-scheduler-heartbeat"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = 3
  metric_name= "SchedulerHeartbeat"
  namespace  = "AmazonMWAA"
  period  = 60
  statistic  = "Sum"
  threshold  = 1
  treat_missing_data  = "breaching"
  dimensions = { Environment = "prod-mwaa", Function = "Scheduler" }
  alarm_actions = [aws_sns_topic.critical_alerts.arn]
}
MWAA本番運用チェックリスト

デプロイ前確認:
– DAG構文チェックをCI/CDで自動化(構文エラーは全Worker停止の原因)
catchup=False 設定(意図しない過去分一括実行防止)
max_active_runs 設定(同一DAGの同時実行数制御)
– 実行ロールに最小権限ポリシーのみ付与

本番稼働中確認:
– SchedulerHeartbeatアラームで停止を即時検知
– QueuedTasksが継続増加する場合はmax_workers引き上げを検討
– S3 dags/バケットのバージョニング有効化(DAGロールバック対応)
– Secrets Managerシークレットのローテーション設定


詰まりポイント7選 + アンチパターン→正解パターン変換

詰まり①: Step Functions Standard料金爆発

詰まり①: Standard Workflowで高頻度処理 → 月次コストが50倍に

症状: 高頻度APIコール処理をStandard Workflowで実装したところ、月次コストが想定の50倍に膨張。

原因: Standard Workflowはステート遷移回数課金。毎秒1,000回×10ステート = 月3,000万回遷移。

解決策: 処理時間5分以内・高頻度・冪等設計可能 → Express Workflowに変更。

{
  "Comment": "Express Workflow — 高頻度APIコール向け",
  "StartAt": "CallAPI",
  "States": {
 "CallAPI": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:api-caller",
"End": true
 }
  }
}

判断基準: 処理時間 > 5分 または Exactly-once必須 → Standard。それ以外 → Express。


詰まり②: AppFlow タイムゾーン誤設定

詰まり②: AppFlow スケジュールがUTC基準で意図しない時刻に実行

症状: 毎日JST09:00に同期するよう設定したが、実際はJST00:00(UTC15:00)に実行。

原因: AppFlowのスケジュール設定は常にUTC基準。JST09:00 = UTC00:00を設定する必要がある。

import boto3
from datetime import datetime, timezone

client = boto3.client('appflow', region_name='ap-northeast-1')

# JST 09:00 = UTC 00:00 で設定
response = client.create_flow(
 flowName='salesforce-to-s3-daily',
 triggerConfig={
  'triggerType': 'Scheduled',
  'triggerProperties': {
'Scheduled': {
 'scheduleExpression': 'rate(1days)',
 'dataPullMode': 'Incremental',
 'scheduleStartTime': datetime(2026, 5, 28, 0, 0, 0, tzinfo=timezone.utc),
}
  }
 },
)

予防策: AppFlow設定ドキュメントに「スケジュール時刻はUTC基準」と明記。


詰まり③: Amazon MQ Network of Brokersでメッシュループ

詰まり③: 双方向コネクタ設定でメッセージが無限ループ

症状: 東京↔大阪ブローカーを双方向コネクタで接続したところ、転送済みメッセージが再転送されキューが急増。

原因: Broker A→BとB→Aの双方向コネクタを設定した際にメッシュループが発生。

解決策: duplex=false を明示し、一方向(ハブ→スポーク)で設定。

<!-- activemq.xml — ネットワークコネクタ正設定 -->
<networkConnectors>
  <networkConnector
 name="tokyo-to-osaka"
 uri="static:(ssl://osaka-broker:61617)"
 duplex="false"
 networkTTL="2"
 dynamicOnly="true">
 <excludedDestinations>
<queue physicalName="response.*" />
 </excludedDestinations>
  </networkConnector>
</networkConnectors>

詰まり④: MWAA DAG構文エラーで全Worker停止

詰まり④: 構文エラーのDAGファイルがS3にデプロイされて全DAGが停止

症状: DAGファイルを更新してS3にアップロードしたところ、全DAGのスケジューリングが停止。

原因: 構文エラーのあるDAGファイルがDAGプロセッサに読み込まれ、正常なDAGも巻き込んで停止。

解決策:
1. 構文エラーのDAGファイルをS3から即座に削除
2. DAGFileProcessingTotalParseTime メトリクスで異常検知(通常1-5秒が30秒超)
3. CI/CDパイプラインでDAG構文チェックを必須化


詰まり⑤: Step Functions 分散マップで子ステート爆発

詰まり⑤: Distributed Mapで大量アイテム処理 → APIレート制限エラー

症状: S3の100万件オブジェクトに分散マップを実行したところ TooManyRequests エラーで停止。

原因: MaxConcurrency 未設定で全アイテムの子実行を同時開始しようとしAPIレート制限に抵触。

{
  "Type": "Map",
  "ItemReader": {
 "Resource": "arn:aws:states:::s3:listObjectsV2",
 "Parameters": { "Bucket": "your-bucket", "Prefix": "data/" }
  },
  "MaxConcurrency": 100,
  "ToleratedFailurePercentage": 10,
  "ItemBatcher": {
 "MaxItemsPerBatch": 100
  },
  "Iterator": {
 "StartAt": "ProcessBatch",
 "States": {
"ProcessBatch": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:batch-processor",
  "End": true
}
 }
  }
}

詰まり⑥: AppFlow Salesforce認証期限切れ

詰まり⑥: Salesforce OAuthトークン有効期限切れでフロー失敗

症状: 毎日正常だったSalesforce→S3フローが突然失敗。ConnectorAuthenticationException: OAuth token expired

原因: Salesforce管理者がOAuth接続アプリのリフレッシュトークン有効期限を90日→15日に変更。

解決策:
1. AppFlowコンソール → 接続 → Salesforce接続を再認証
2. Salesforce OAuth接続アプリのリフレッシュトークン有効期限を確認・調整
3. CloudWatch EventsでAppFlowフロー失敗を検知するアラートを設定

# Lambda — AppFlowフロー失敗通知
import boto3

def handler(event, context):
 detail = event.get('detail', {})
 if detail.get('status') == 'Execution Failed':
  flow_name = detail.get('flowName', 'unknown')
  error  = detail.get('errorInfo', {}).get('executionMessage', '')
  boto3.client('sns').publish(
TopicArn='arn:aws:sns:ap-northeast-1:123456789012:alerts',
Subject=f'AppFlowフロー失敗: {flow_name}',
Message=f'フロー: {flow_name}\nエラー: {error}',
  )

詰まり⑦: MWAA タイムスタンプ不整合でDAG重複実行

詰まり⑦: execution_date の不整合でDAGが重複実行・データ二重書き込み

症状: 夏時間切り替え直後、ETL DAGが1日分を2回実行しデータウェアハウスにレコードが重複。

原因: schedule_interval にJSTを想定したcron式を設定していたが、AirflowはUTC基準で動作。夏時間の切り替わりで同一 execution_date の重複実行が発生。

解決策: スケジュールはUTC基準で設計し、pendulum でタイムゾーンを明示する。

import pendulum
from airflow import DAG

local_tz = pendulum.timezone('Asia/Tokyo')

with DAG(
 dag_id='timezone_safe_etl',
 schedule_interval='0 0 * * *',  # UTC 00:00 = JST 09:00
 start_date=pendulum.datetime(2026, 1, 1, tz=local_tz),
 catchup=False,
) as dag:
 pass

冪等設計: execution_date をパーティションキーにしてUPSERTし、重複実行を無害化する。


アンチパターン→正解パターン変換 5問

Q1: Step Functions — Retry/Catch未設定

// アンチパターン: Lambda呼び出しにRetry/Catchなし
{ "Type": "Task", "Resource": "arn:aws:lambda:...:function:processor", "End": true }
// 正解パターン: Retry + Catch + DLQ送信
{
  "Type": "Task",
  "Resource": "arn:aws:lambda:...:function:processor",
  "Retry": [
 {
"ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
"IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2
 }
  ],
  "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "SendToDLQ" }],
  "Next": "Success"
}

Q2: AppFlow — スケジュール過剰実行

# アンチパターン: 毎分スケジュール実行(不必要なAPI呼び出し)
trigger_config = {'triggerType': 'Scheduled', 'scheduleExpression': 'rate(1minutes)'}

# 正解パターン: イベントトリガ(データ更新時のみ実行)
trigger_config = {'triggerType': 'Event'}

Q3: Amazon MQ — 接続数上限未設定

<!-- アンチパターン: maxConnections未設定でブローカー過負荷 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>

<!-- 正解パターン: 接続数上限とHeartbeat設定 -->
<transportConnector
  name="openwire"
  uri="tcp://0.0.0.0:61616?maximumConnections=500&amp;wireFormat.maxFrameSize=104857600"
  updateClusterClients="true"/>

Q4: MWAA — catchup=Trueによる過去分一括実行

# アンチパターン: catchup=True(デフォルト)で720回のDAGRunが即座に開始
with DAG(dag_id='etl', schedule_interval='@hourly', start_date=days_ago(30)) as dag:
 pass

# 正解パターン: catchup=False で最新分のみ実行
with DAG(dag_id='etl', schedule_interval='@hourly',
start_date=days_ago(30), catchup=False) as dag:
 pass

Q5: Step Functions — 分散マップのエラー許容未設定

// アンチパターン: ToleratedFailurePercentage未設定 → 1件失敗で全体停止
{ "Type": "Map", "MaxConcurrency": 100, "Iterator": {} }

// 正解パターン: 10%失敗まで許容し部分成功を継続
{
  "Type": "Map",
  "MaxConcurrency": 100,
  "ToleratedFailurePercentage": 10,
  "ToleratedFailureCount": 1000,
  "Iterator": {}
}

Vol1×Vol2統合アーキテクチャ + 選定フローチャート

sequenceDiagram
 participant Vol1 as Vol1 メッセージング基盤
 participant Vol2 as Vol2 ワークフロー・統合
 participant Prod as 本番App Integration環境
 Vol1->>Prod: SQS/SNS/EventBridge/API GW/AppSync基盤
 Vol2->>Prod: Step Functions/AppFlow/MQ/MWAA高度化
 Prod-->>Vol1: 単純メッセージング → Vol2でワークフロー化
 Prod-->>Vol2: 複雑統合 → Vol1基盤を活用
 Note over Vol1,Vol2: App Integration完全体<br/>Foundation → Orchestration

Vol1(SQS/SNS/EventBridge/API Gateway/AppSync)で確立したメッセージング基盤に、Vol2のオーケストレーション層を統合すると、シンプルなメッセージ転送から複雑なマルチサービス連携まで対応できるApplication Integration完全体が完成する。

本章では4つの代表的な統合パターンを具体的なAWS設定コードと共に解説し、「どのサービスを選ぶか」の判断基準を体系化したフローチャートを提示する。Cloud Architectは4パターンを使い分けることで、要件ごとに最適なサービス組み合わせを即座に選定できるようになる。

パターン1 — API GW → Step Functions Express → SQS (Vol1) → Lambda

同期APIリクエストをワークフロー化し、Vol1のSQSキュー経由でLambdaに渡すパターン。API Gatewayがフロントエンドとして機能し、Step Functions Expressがバリデーション・ルーティング・エラーハンドリングを担当する。Expressは最大5分・高頻度実行に特化しており、APIレスポンスを待たずに処理をキューイングできる。

パターン1 — 適用シーンと特徴
  • Eコマース注文処理: API GW受信 → SFN Express(在庫確認・バリデーション)→ SQS → 決済Lambda
  • フォーム送信パイプライン: API GW → SFN Express(バリデーション・フィルタ)→ SQS(Vol1)→ 通知Lambda
  • リアルタイム通知連携: API GW同期リクエスト → SFN Express(マルチステップ処理)→ SNS(Vol1)→ 各チャネル配信
  • メリット: API GWのリクエスト/レスポンスとSQSの非同期処理を完全分離。Step Functionsで可視化・リトライを一元管理できる。
{
  "Comment": "API GW → Step Functions Express → SQS → Lambda パターン",
  "StartAt": "ValidateRequest",
  "States": {
 "ValidateRequest": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
  "FunctionName": "validate-request",
  "Payload.$": "$"
},
"Next": "RouteToQueue",
"Retry": [
  {
 "ErrorEquals": ["Lambda.ServiceException"],
 "IntervalSeconds": 2,
 "MaxAttempts": 3,
 "BackoffRate": 2
  }
],
"Catch": [
  {
 "ErrorEquals": ["ValidationError"],
 "Next": "ReturnValidationError"
  }
]
 },
 "RouteToQueue": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
  "QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/process-queue.fifo",
  "MessageBody.$": "States.JsonToString($.payload)",
  "MessageGroupId.$": "$.customerId",
  "MessageDeduplicationId.$": "$.requestId"
},
"End": true
 },
 "ReturnValidationError": {
"Type": "Fail",
"Error": "ValidationFailed",
"Cause": "Input validation failed"
 }
  }
}

API Gateway → Step Functions Express の接続はHTTP API + EventBridge Pipe、または API Gateway → Lambda(起動処理)→ SFN Express の2段階で構成できる。Express WorkflowはCloudWatch Logsへのリアルタイム実行ログ出力に対応しており、リクエストごとのトレースが容易になる。

パターン2 — EventBridge → AppFlow (Vol2) → S3 → Step Functions

EventBridgeのスケジュールイベントでAppFlowをトリガし、Salesforce/SAP等のSaaSからS3へのデータ同期完了後にStep Functionsでデータ変換パイプラインを起動するパターン。AppFlowの実行完了イベントはEventBridgeに自動送信されるため、ポーリング不要でS3到着後の処理を即座に開始できる。

{
  "source": ["aws.appflow"],
  "detail-type": ["AppFlow End Flow Run Report"],
  "detail": {
 "status": ["Execution Successful"],
 "flow-name": ["salesforce-to-s3-daily"]
  }
}

上記EventBridgeルールにStep FunctionsをターゲットとしてIAMロール付きで設定することで、AppFlow完了 → Step Functions自動起動が実現する。AppFlowのデータ変換機能(マスク・連結・算術演算)でSalesforceカスタムフィールドをParquet形式に変換し、Step Functions Standardでデータ品質チェック・DynamoDB書き込み・SNS通知を一連のワークフローとして管理できる。

import boto3
import json

def start_appflow_and_setup_monitor(flow_name: str, sm_arn: str) -> dict:
 """AppFlowフロー手動起動 — 完了後はEventBridge経由でStep Functionsが自動起動"""
 appflow = boto3.client('appflow', region_name='ap-northeast-1')

 response = appflow.start_flow(flowName=flow_name)

 return {
  "flowExecutionId": response['executionId'],
  "flowStatus": response['flowStatus'],
  "nextAction": "EventBridge自動トリガ待機中",
  "targetStateMachine": sm_arn,
  "note": "AppFlow完了イベントがEventBridgeに自動送信 → Step Functionsが起動"
 }

AppFlow → S3 → Step Functions パターンはSalesforce Opportunity → S3 CSV → ETL処理という典型的なCRM連携シナリオに最適。EventBridge連携により完全サーバーレスのイベントドリブン統合パイプラインが完成する。

パターン3 — SNS Fan-out (Vol1) → Amazon MQ (Vol2) → JMS Subscribers

Vol1のSNS Fan-outパターンにAmazon MQを組み合わせ、JMS対応のエンタープライズシステムをSubscriberとして統合するパターン。SNS Topicへの単一Publishで、AWSネイティブ(SQS + Lambda)とレガシーMQシステム(JMS/STOMP)の両方に同時配信できる。

import boto3
import json
import stomp

def configure_sns_mq_fanout(sns_topic_arn: str, mq_endpoint: str, bridge_queue_url: str) -> None:
 """SNS Fan-out: SQS(Lambdaへ) + Amazon MQ(JMS Subscriberへ) の二股配信設定"""
 sns = boto3.client('sns', region_name='ap-northeast-1')
 sqs = boto3.client('sqs', region_name='ap-northeast-1')

 queue_attrs = sqs.get_queue_attributes(
  QueueUrl=bridge_queue_url,
  AttributeNames=['QueueArn']
 )
 bridge_queue_arn = queue_attrs['Attributes']['QueueArn']

 sns.subscribe(
  TopicArn=sns_topic_arn,
  Protocol='sqs',
  Endpoint=bridge_queue_arn
 )

 conn = stomp.Connection(host_and_ports=[(mq_endpoint, 61613)])
 conn.connect(login='mquser', passcode='mqpassword', wait=True)
 print(f"Connected to Amazon MQ at {mq_endpoint}")
 return conn

エンタープライズシステム統合における代表的な組み合わせは以下の通りである:

レガシーシステムVol1サービスVol2サービス接続プロトコル
IBM MQSNS Fan-outAmazon MQ(ActiveMQ)JMS/OpenWire
RabbitMQ on-premSQSAmazon MQ(RabbitMQ)AMQP 0.9.1
TIBCO EMSSNSAmazon MQ(ActiveMQ)STOMP
ActiveMQ on-premSQS + LambdaAmazon MQ(ActiveMQ)OpenWire直接移行

このパターンはオンプレミスMQシステムからAWSへのマイグレーション期間中の並行稼働に特に有効。既存JMSクライアントを無修正のまま接続でき、ActiveMQ Active/Standby構成と組み合わせることでエンタープライズレベルの高可用性を実現できる。

パターン4 — MWAA → Step Functions Standard → 複合データパイプライン

MWAA(Managed Apache Airflow)でDAGとして定義したスケジュールバッチから、Step Functions Standardのステートマシンを起動し複合データパイプラインを実行するパターン。AirflowのDAG依存関係管理とStep Functionsのマイクロサービスオーケストレーションを組み合わせることで、エンタープライズ規模のデータパイプラインを構築できる。

from airflow import DAG
from airflow.providers.amazon.aws.operators.step_function import StepFunctionStartExecutionOperator
from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor
from datetime import datetime, timedelta
import json

default_args = {
 "owner": "data-team",
 "retries": 2,
 "retry_delay": timedelta(minutes=10),
 "email_on_failure": True,
}

with DAG(
 dag_id="mwaa_sfn_composite_pipeline",
 default_args=default_args,
 schedule_interval="0 2 * * *",
 start_date=datetime(2026, 1, 1),
 catchup=False,
 tags=["production", "data-pipeline"],
) as dag:

 start_etl = StepFunctionStartExecutionOperator(
  task_id="start_step_functions_etl",
  state_machine_arn="arn:aws:states:ap-northeast-1:123456789012:stateMachine:composite-etl-pipeline",
  name="{{ ds_nodash }}-etl-run",
  input=json.dumps({
"date": "{{ ds }}",
"sourceBucket": "s3://data-lake/raw/{{ ds }}/",
"targetBucket": "s3://data-lake/processed/{{ ds }}/"
  }),
  aws_conn_id="aws_default",
 )

 wait_for_etl = StepFunctionExecutionSensor(
  task_id="wait_for_etl_completion",
  execution_arn="{{ task_instance.xcom_pull(task_ids='start_step_functions_etl') }}",
  aws_conn_id="aws_default",
 )

 start_etl >> wait_for_etl

MWAAのStepFunctionExecutionSensorを使用することで、Airflow DAGがStep Functionsの完了を待機してから後続タスクを実行できる。長期実行ETL処理をStep Functions Standardでオーケストレートし、全体スケジューリングはMWAAが担う役割分担が推奨構成となる。

選定フローチャート — 同期 vs 非同期 → 短期 vs 長期 → ワークフロー vs データ統合 vs MQ

4パターンの経験をベースに、初見の要件に対してどのサービスを選ぶかを体系的に判断するフローチャートを以下に示す。「同期 vs 非同期 → 短期 vs 長期 → ワークフロー vs データ統合 vs MQ vs DAGスケジューリング」の順で評価することで、最適なサービスを即座に特定できる。

[スタート: 要件の特定]

Q1: 同期(即時応答が必須)?
YES → API Gateway + Lambda / Step Functions Express
  │
  ├─ 複数ステップの処理が必要 → Step Functions Express
  └─ 単一Lambda処理で十分 → API GW + Lambda直接統合

NO  → 非同期処理 → Q2へ

Q2: 実行時間・頻度は?
5分以内 かつ 高頻度(秒〜分単位トリガ) → Step Functions Express
長時間(分〜時間・日単位)または状態保存が必要 → Step Functions Standard
どちらか確定したら → Q3へ

Q3: 主目的は?
SaaS(Salesforce/SAP/Slack等)との双方向データ統合
 → Amazon AppFlow(14種以上の公式コネクタを活用)

JMS/STOMP/AMQPプロトコルが必須(既存MQシステムとの接続)
 → Amazon MQ(ActiveMQ or RabbitMQ)

複雑なDAG依存関係付きバッチスケジューリング(ML/ETL)
 → Amazon MWAA(Managed Airflow)

AWSネイティブメッセージングで完結
 → SQS/SNS/EventBridge(Vol1)

Q4: Vol1資産との連携は?
SQS/SNSをサブシステムとして活用 → Vol2サービス + Vol1サービス組み合わせ
EventBridgeトリガが起点 → EventBridge → Vol2サービスのChain
AppSync(GraphQL)がフロントエンド → AppSync → Step Functionsのリアルタイム統合
Vol1×Vol2 統合設計の鉄則 — サービス選定早見表

| 要件 | 推奨サービス | シリーズ |
|——|————-|———|
| 同期APIリクエスト受信 | API Gateway + Lambda | Vol1 |
| 複数ステップの同期ワークフロー | API GW + Step Functions Express | Vol1+2 |
| 高頻度・短時間の非同期処理 | SQS + Lambda / SFN Express | Vol1+2 |
| 長期実行・状態管理が必要 | Step Functions Standard | Vol2 |
| SaaS → AWS データ同期 | Amazon AppFlow | Vol2 |
| JMS/STOMP/MQTT対応 | Amazon MQ(ActiveMQ or RabbitMQ) | Vol2 |
| Airflow DAG + 複雑依存関係 | Amazon MWAA | Vol2 |
| イベントドリブン通知配信 | EventBridge + SNS | Vol1 |
| シンプルスケジュールcron | EventBridge Scheduler | Vol1 |
| GraphQL + リアルタイム更新 | AppSync + DynamoDB Streams | Vol1 |

Vol1×Vol2を組み合わせる際の鉄則は「Vol1で流し、Vol2でオーケストレートする」である。SQS/SNS/EventBridgeでメッセージを確実に転送・分散し、Step Functions/AppFlow/MQ/MWAAで高度化・可視化することで、スケーラビリティとメンテナビリティを両立できる。

4パターンは固定ではなく、要件進化とともに組み合わせを変えることができる。例えばEventBridge(Vol1)のスケジュールイベントにAppFlow(Vol2)を繋ぎ、完了後にStep Functions Standard(Vol2)で処理し、結果をSNS(Vol1)で通知するフルスタック統合パターンも構成可能だ。選定フローチャートと4パターンを組み合わせることで、どのような要件が来ても「Vol1の何を使い、Vol2の何で高度化するか」を迷わず決定できる設計力が身につく。


まとめ — Application Integration 二部作完成 + 全軸クロスリンク

§8-1 — Vol2 4サービス要点まとめ

Vol2で学んだ4サービスの要点を整理する。Vol1(SQS/SNS/EventBridge/API GW/AppSync)と組み合わせることでApplication Integration完全体が完成し、あらゆる統合要件に対応できるアーキテクチャ基盤が整う。

サービス主目的代表ユースケース実行時間上限料金単位
Step Functions Standard長期ワークフロー・状態管理ETL/承認フロー/分散バッチ最大1年状態遷移数
Step Functions Express高頻度・高スループットAPI処理/イベント変換/ストリーム処理最大5分実行時間×リクエスト数
Amazon AppFlowSaaS-AWS双方向データ統合Salesforce/SAP/Slack→S3/Redshift1回/フローフロー実行×処理データ量
Amazon MQ (ActiveMQ)JMSエンタープライズメッセージングレガシー移行/JMSアプリ接続常時稼働ブローカー時間×ストレージ
Amazon MQ (RabbitMQ)AMQPメッセージングマイクロサービス間非同期通信常時稼働ブローカー時間×ストレージ
Amazon MWAAManaged Airflow DAGスケジューリング複雑依存バッチ/MLパイプラインDAG単位環境時間+ワーカー数

設計判断の重要な境界線:

判断基準選択肢A選択肢B判断ポイント
Standard vs ExpressStandardExpress5分超または状態保存 → Standard / 高頻度・短時間 → Express
AppFlow vs LambdaAppFlowカスタムLambdaSaaS公式コネクタが存在 → AppFlow / 独自API → Lambda
MQ vs SQSAmazon MQSQSJMS/STOMP/AMQP必須 → MQ / AWSネイティブ → SQS
MWAA vs EventBridgeMWAAEventBridge Scheduler複雑DAG依存 → MWAA / シンプルcron → EventBridge

エラーハンドリング戦略の比較:

サービスリトライ機構失敗時の通知可視化
Step Functions StandardASL Retry/Catch定義EventBridge → SNSコンソールで実行ツリー確認可能
Step Functions ExpressASL Retry/Catch定義CloudWatch Logs高頻度のため集計メトリクスで監視
Amazon AppFlowフロー設定でリトライ回数指定SNS通知設定Flow run履歴画面
Amazon MQDeadLetterQueue設定CloudWatch AlarmsActiveMQ管理コンソール
Amazon MWAADAG retry / task retry設定メールアラート / SNSAirflow Web UI

§8-2 — 全軸クロスリンク

Application Integration本番運用シリーズはVol1+Vol2で完結するが、実際の本番環境ではデータベース・ストレージ・コンテナ・サーバーレスなど複数の軸と組み合わせて使用する。以下のクロスリンクで関連軸の記事を参照し、統合的なAWSアーキテクチャ設計力を身につけよう。

Application Integrationシリーズ — 二部作全体ナビゲーション

Vol1+Vol2 = Application Integration完全体 — 基盤メッセージング(Vol1)× オーケストレーション(Vol2)で全要件をカバー

関連軸クロスリンク — Application Integrationと連携する主要サービス群

Application Integrationの真価はこれらの関連軸と組み合わせることで発揮される。AppFlow → S3 → Athena(Analytics)のパイプラインや、Step Functions → DynamoDB(Database)の直接統合など、軸をまたいだアーキテクチャ設計が本番環境の標準パターンとなる。

さらに深掘りできる関連軸 — コンテナ・ネットワーク・移行

§8-3 — Application Integration二部作完成宣言

Vol1(基盤)+ Vol2(ワークフロー/統合)= App Integration完全体 — 第25軸二部作完成

Vol1で5つのメッセージングサービス(SQS/SNS/EventBridge/API Gateway/AppSync)を網羅し、Vol2で4つのオーケストレーションサービス(Step Functions/AppFlow/Amazon MQ/MWAA)を完全解説した。2本の記事を通じて、AWSのApplication Integration領域全体をカバーする体系的な知識を提供できた。

Vol1で習得したこと:

  • SQS FIFO/標準キュー — 順序保証・冪等性・デッドレターキューの設計
  • SNS Fan-out — トピック設計・フィルタポリシー・クロスアカウント配信
  • EventBridge — ルール設計・スケジューラ・カスタムバス・パイプ
  • API Gateway — REST/HTTP API・オーソライザ・スロットリング・VPC統合
  • AppSync — GraphQL・リゾルバ・リアルタイムサブスクリプション・マッピングテンプレート

Vol2で習得したこと:

  • Step Functions Standard — 長期実行・エラーハンドリング・分散マップ・コールバック
  • Step Functions Express — 高頻度実行・SDK直接統合・同期実行
  • Amazon AppFlow — SaaS統合・スケジュールトリガ・データ変換・PrivateLink
  • Amazon MQ — ActiveMQ/RabbitMQ・Active/Standby・Network of Brokers・JMS/STOMP/MQTT
  • Amazon MWAA — DAG設計・Worker/Scheduler・実行ロール・プラグイン管理

Vol1+Vol2の統合で実現したこと:

  • API GW → SFN Express → SQS → Lambda の同期+非同期橋渡しパターン
  • EventBridge → AppFlow → S3 → Step Functions のSaaS統合パイプライン
  • SNS Fan-out → Amazon MQ → JMS のエンタープライズブリッジパターン
  • MWAA → Step Functions Standard → 複合データパイプライン のハイブリッドオーケストレーション

本二部作を完読したCloud Architect / Backend Engineerは、AWSのApplication Integration領域における設計判断を自信を持って行えるようになる。「どのサービスを使うか」の迷いがなくなり、要件定義から本番運用まで一貫した設計方針を持てる状態が達成できる。

64記事化達成 — 第25軸二部作完成:

本記事(Vol2)の公開により、AWSハンズオンシリーズは第25軸の二部作を完成させ、累計64記事化を達成した。コンピューティング・ネットワーク・セキュリティ・データベース・ストレージ・コンテナ・サーバーレス・機械学習・アナリティクス・Application Integration など25の軸にわたる包括的なAWS本番運用ガイドが完成した。

設計力向上のための3つの視点:

Vol1+Vol2を学んだ後、さらに設計力を高めるには以下の3つの視点が有効である。

  1. レイヤー思考: メッセージング層(Vol1)とオーケストレーション層(Vol2)を明確に分離した設計を意識する。各層の責務を明確にすることで、将来の変更に強いアーキテクチャが構築できる。

  2. コスト最適化: Step Functions ExpressとStandard、Amazon MQのブローカーサイズ、MWAAの環境クラスは要件に応じて適切に選択する。設計段階でAWS Pricing Calculatorを用いたコスト見積もりを行うことが推奨される。

  3. 可観測性: 各サービスのCloudWatch連携(Step Functions実行ログ、AppFlow実行レポート、MQブローカーメトリクス、MWAA DAGログ)を事前に設定し、本番環境での障害対応を迅速化する。

§8-4 — 読者アクションリスト

本記事の内容を自分のプロジェクトに活かすために、以下のアクションリストを実践しよう。

Step Functions習得アクション:

  • [ ] Step Functions Standard/Expressのコスト計算ツールで自プロジェクトの料金を試算する
  • [ ] AWSコンソールでサンプルASLステートマシンを作成し、実行履歴の可視化を体験する
  • [ ] 既存Lambda Chainをステートマシンに書き換え、エラーハンドリングの改善効果を確認する
  • [ ] 分散マップ(Distributed Map)で大規模並列処理のパフォーマンス検証を実施する

Amazon AppFlow習得アクション:

  • [ ] Salesforce Developer Edition(無料)でAppFlowの接続フローをトライアル実行する
  • [ ] S3出力先のParquetフォーマット変換を設定し、Athenaでのクエリを試す
  • [ ] EventBridge連携を設定し、AppFlow完了トリガーでLambdaが自動起動することを確認する
  • [ ] プライベート接続設定を検討し、社内データソースとの安全な統合を計画する

Amazon MQ習得アクション:

  • [ ] Amazon MQ for ActiveMQをSingle-instance Brokerで起動し、JMSクライアントで接続テストする
  • [ ] Active/Standby構成でフェイルオーバーをシミュレートし、切り替え時間を計測する
  • [ ] 既存のオンプレミスActiveMQとAmazon MQのNetwork of Brokersを構成し、メッセージ転送を確認する
  • [ ] RabbitMQ版でAMQP接続をテストし、既存RabbitMQアプリの移行可能性を評価する

Amazon MWAA習得アクション:

  • [ ] ローカルのApache Airflowで動作確認済みのDAGをMWAAに移行し、依存関係の差異を確認する
  • [ ] MWAAとS3の連携でDAGファイルの自動同期を設定し、GitOpsパターンを構築する
  • [ ] StepFunctionStartExecutionOperatorを使用してStep FunctionsをMWAAから起動するDAGを作成する
  • [ ] MWAA Webサーバーへのアクセス制御(VPC Private/Public)を要件に合わせて設定する

Vol1×Vol2統合 実践アクション:

  • [ ] §7で解説した4パターンのうち、自プロジェクトに最も近いパターンを1つ選んでプロトタイプを作成する
  • [ ] 選定フローチャートを使って現在の設計を評価し、より適切なサービス組み合わせがないか検討する
  • [ ] Vol1のEventBridgeルールとVol2のAppFlow/Step Functionsを繋ぐイベントドリブンパイプラインを設計書に落とす
  • [ ] AWS Well-Architected ToolでApplication Integrationの設計をレビューし、改善ポイントを把握する

§8-5 — 公式ドキュメント & クロスリンク

本記事で解説した4サービスの公式ドキュメントと関連記事へのリンクをまとめる。実装時の詳細仕様や最新情報は常に公式ドキュメントを参照することを推奨する。

各サービスの公式ドキュメントには本記事で触れられなかった高度な機能(Step FunctionsのVersioning・Alias管理、AppFlowのカスタムコネクタ開発、Amazon MQのネットワーク暗号化設定、MWAAのカスタムプラグイン開発)が詳細に解説されている。本番環境で問題が発生した際は、公式ドキュメントのトラブルシューティングセクションとAWS re:Postコミュニティを優先的に参照すること。

AWS Step Functions 公式ドキュメント — ASL / Standard/Express / エラーハンドリング

Amazon MQ 公式ドキュメント — ActiveMQ / RabbitMQ / Network of Brokers

Application Integration本番運用 Vol1 — SQS × SNS × EventBridge × API GW × AppSync

Amazon MWAA 公式ドキュメント — Airflow DAG / Worker / Scheduler / 実行ロール


Application Integration Vol1+Vol2 二部作完成 — AWSのメッセージング・ワークフロー・データ統合・エンタープライズMQ・スケジュールバッチの全領域をカバーする包括的なガイドが完成した。本二部作で身につけた設計力を活かして、エンタープライズ規模のAWS統合アーキテクチャを自信を持って設計・実装しよう。

次のステップ — 学習ロードマップ:

Application Integration Vol1+Vol2の学習完了後は、以下のロードマップで知識を体系化することを推奨する。

  1. 即時実践: 本記事のコードサンプルをAWSコンソールまたはCLIで試す(まずStep Functionsサンプルステートマシンから)
  2. 実プロジェクト適用: 既存のLambda Chain/直接API呼び出しをStep Functionsに段階的に移行する
  3. 隣接軸の習得: Analytics(Glue×Athena)を学び、AppFlowとの統合パイプラインを構築する
  4. アーキテクチャレビュー: AWS Well-Architected Frameworkのサーバーレスレンズを参照し、設計を評価する
  5. コスト最適化: 本番稼働後3ヶ月で利用パターンを分析し、Standard/Expressの使い分けを最適化する

Application IntegrationはAWSアーキテクチャの中核となる領域であり、全サービスの設計品質に直結する。本二部作を基盤として、AWS Certified Solutions Architect – Professional(SAP-C02)レベルのアーキテクチャ設計力を継続的に磨いていこう。

最後に — Application Integration設計力の本質:

本番運用で真に価値が出るのは、個別サービスの知識よりも「どのサービスをどの組み合わせで使うか」という判断力だ。本二部作で体系化した設計パターン・選定フローチャート・統合アーキテクチャを手元に置き、新しい要件が来るたびに参照することで、設計力は確実に蓄積されていく。

SQSがあるからStep Functionsは不要ではなく、Step FunctionsがあるからSQSは不要でもない。Vol1とVol2は競合ではなく補完関係にある。この相補的な設計思想こそが、Application Integration完全体の核心である。