- 1 なぜ Application Integration Vol2 か — 4層アーキテクチャ概観
- 2 AWS Step Functions Workflows 本番運用 — Standard vs Express / エラーハンドリング / 分散マップ
- 3 Amazon AppFlow 本番運用 — Salesforce/SAP/Slack統合 / スケジュールトリガ
- 4 Amazon MQ 本番運用 — ActiveMQ vs RabbitMQ / Network of Brokers / JMS/STOMP/MQTT
- 5 Amazon MWAA 本番運用 — Airflow DAG / Worker/Scheduler / 実行ロール
- 6 詰まりポイント7選 + アンチパターン→正解パターン変換
- 7 Vol1×Vol2統合アーキテクチャ + 選定フローチャート
- 7.1 パターン1 — API GW → Step Functions Express → SQS (Vol1) → Lambda
- 7.2 パターン2 — EventBridge → AppFlow (Vol2) → S3 → Step Functions
- 7.3 パターン3 — SNS Fan-out (Vol1) → Amazon MQ (Vol2) → JMS Subscribers
- 7.4 パターン4 — MWAA → Step Functions Standard → 複合データパイプライン
- 7.5 選定フローチャート — 同期 vs 非同期 → 短期 vs 長期 → ワークフロー vs データ統合 vs MQ
- 8 まとめ — Application Integration 二部作完成 + 全軸クロスリンク
なぜ Application Integration Vol2 か — 4層アーキテクチャ概観
- Application Integration本番運用Vol1 (SQS / SNS / EventBridge / API Gateway / AppSync)
- Application Integration本番運用Vol2 (Step Functions / AppFlow / Amazon MQ / MWAA) 本記事
Vol1(統合プラットフォーム基盤) — つなぐ・流す・制御する
Vol2(ワークフロー・データ統合・エンタープライズMQ・本記事) — オーケストレートする・統合する・橋渡しする・スケジュールする
| 章 | テーマ | 主な施策 |
|—-|——–|———|
| §2 | AWS Step Functions Workflows 本番運用 | Standard/Express・エラーハンドリング・分散マップ・コールバック |
| §3 | Amazon AppFlow 本番運用 | Salesforce/SAP/Slack統合・スケジュール/イベントトリガ |
| §4 | Amazon MQ 本番運用 | ActiveMQ/RabbitMQ・Network of Brokers・JMS/STOMP/MQTT |
| §5 | Amazon MWAA 本番運用 | Airflow DAG・Worker/Scheduler・実行ロール |
| §6 | 詰まりポイント7選 + 演習 | 頻出パターンの解決策 + アンチパターン変換5問 |
| §7 | Vol1×Vol2統合アーキテクチャ | 選定フローチャート + 完全体フロー |

AWS Application Integration は「つなぐ・流す・制御する・オーケストレートする」の4層で構成される。Vol1(SQS / SNS / EventBridge / API Gateway / AppSync)でメッセージング統合プラットフォームの基盤を確立した。Vol2(本記事)はその上位層として、複雑な処理フローの調整・SaaSデータ統合・エンタープライズMQブローカー管理・バッチワークフロースケジューリングという4つの高度化レイヤーを担う。
Vol1 + Vol2 = Application Integration 完全体
Vol1が解決した課題(基盤層):
- SQS でサービス間の非同期デカップリングを実現
- SNS でファンアウト配信・複数サブスクライバーへの並列通知を確立
- EventBridge でイベント駆動アーキテクチャのルーティング基盤を構築
- API Gateway で外部向けエンドポイントとリクエスト制御を整備
- AppSync でリアルタイムGraphQL APIとサブスクリプション基盤を提供
Vol2が解決する課題(オーケストレーション層):
Vol1基盤の上で「ステート管理・条件分岐・エラー回復が必要な複雑処理フロー」が発生したとき、Step Functions が調整役として機能する。SalesforceやSAP等のSaaSとAWS間のデータ同期にはAppFlowが最適解になる。既存のActiveMQ/RabbitMQをクラウド移行する際はAmazon MQで互換性を維持できる。そしてAirflowベースのバッチDAGスケジューリングにはMWAAが対応する。
| レイヤー | サービス | 担う役割 | Vol1との関係 |
|---|---|---|---|
| ワークフロー | Step Functions | 複雑なステートマシン・エラーハンドリング | EventBridge → Step Functions 実行トリガー |
| データ統合 | Amazon AppFlow | SaaS↔AWS双方向データ同期 | S3/Redshiftへの格納はSNS通知連携可 |
| エンタープライズMQ | Amazon MQ | JMS/STOMP/MQTT互換ブローカー | SQSへの移行困難なレガシーシステム対応 |
| バッチスケジューリング | Amazon MWAA | Airflow DAGによる複雑依存スケジュール | EventBridgeと組み合わせたイベントトリガーも可 |
4層アーキテクチャで達成できる本番運用状態
本記事(Vol2)を読み終えた時点で以下の設計・実装能力が身につく。
- Step Functions: Standard vs Express の選択判断・Retry/Catch による耐障害設計・Distributed Map での10,000並行処理・コールバックを用いた人手承認フロー・SDK直接統合による270+サービスへのノーコード接続
- Amazon AppFlow: Salesforce/SAP/Slack等SaaSとのスケジュール/イベントトリガー同期・フィールドマッピング・データ変換・VPC経由プライベート接続
- Amazon MQ: ActiveMQ vs RabbitMQの選択・Active/Standby冗長構成・Network of Brokers 広域分散・JMS/STOMP/MQTTプロトコル別ユースケース
- Amazon MWAA: Airflow DAG設計・Worker/Schedulerのスケール設定・実行ロール権限・プラグイン管理・DAGデプロイパイプライン
対象読者とスキル前提
本記事はVol1既読(またはSQS/SNS/EventBridgeの基本理解がある)Cloud Architect / Backend Engineerを対象とする。AWSマネジメントコンソールの基本操作・IAMロールの概念・PythonまたはNode.jsの基礎があれば、コードサンプルをそのまま本番環境へ適用できる。
Vol1で構築したEventBridge → Step Functions の実行トリガー、SQS DLQ → Step Functions コールバックパターン、SNS → AppFlow 通知連携は本記事の各§で具体的なコード付きで解説する。Vol1未読の場合はApplication Integration本番運用Vol1を先に参照することを推奨する。
AWS Step Functions Workflows 本番運用 — Standard vs Express / エラーハンドリング / 分散マップ

sequenceDiagram
participant Trigger as トリガ (API/EventBridge)
participant SFN as Step Functions
participant Lambda as Lambda タスク
participant DDB as DynamoDB
participant DLQ as DLQ
Trigger->>SFN: 実行開始 (StartExecution)
SFN->>Lambda: タスク1実行
Lambda-->>SFN: 結果
SFN->>DDB: タスク2 (SDK直接統合)
DDB-->>SFN: 結果
SFN->>SFN: Catch/Retry エラーハンドリング
alt 失敗時
SFN->>DLQ: 失敗イベント送信
end
Note over SFN: Standard: 長期(最大1年)<br/>Express: 高頻度(最大5分)
AWS Step Functions は AWS が提供するフルマネージドのステートマシンサービスである。Amazon States Language (ASL) と呼ばれるJSON/YAML形式の定義ファイルで処理フロー(ステートマシン)を記述し、各ステートの遷移条件・エラー処理・並行実行をコード量を最小化して実装できる。
Standard Workflow vs Express Workflow — 5軸比較
Step Functions は用途によって使い分ける2種類のワークフロータイプを提供する。
| 比較軸 | Standard Workflow | Express Workflow |
|---|---|---|
| 最大実行時間 | 最大1年 | 最大5分 |
| 課金モデル | ステート遷移回数 × 単価 | 実行数 + 実行時間(GB秒) |
| スループット | 2,000 実行/秒 | 100,000 実行/秒以上 |
| 履歴保存 | 90日間 (Step Functions コンソール) | CloudWatch Logs へ転送 |
| 主なユースケース | 長期ETL・人手承認フロー・注文処理 | 高頻度API・IoTデータ処理・ストリーム変換 |
| 実行の一意性 | 重複実行なし (Exactly-once 保証) | At-least-once (冪等設計が必須) |
Standard Workflow の選択基準:
- 処理時間が5分を超える可能性がある場合(バッチETL、レポート生成等)
- 人手承認フロー(コールバック Wait for Task Token)を含む場合
- 実行履歴をコンソールで可視化してデバッグしたい場合
- 注文処理・決済フローなど正確に1回の実行保証が必要な場合
Express Workflow の選択基準:
- 1リクエスト5分以内に完結する高頻度処理(API Gateway → Step Functions)
- IoTデバイスからの大量センサーデータ変換・ルーティング
- 1秒間に数千〜数万件のリアルタイム処理が必要な場合
Standard: 月 4,000 ステート遷移まで無料。超過分は 1,000 遷移あたり $0.025。ステートマシンの各ステート移動(Task/Choice/Wait/Parallel/Map 等)が1遷移としてカウントされる。ループや分岐が多いステートマシンは遷移数が膨らむため、不要なパスバイステートを削減するリファクタリングが効く。
Express: 月 1,000,000 実行まで無料。超過分は 100 万実行あたり $1.00 + 持続時間 (GB秒) $0.00001。実行時間が短く超高頻度の処理に適している。
費用試算例: 1日 100 万件の IoT センサー処理(1件あたり平均 2 秒、64MB使用)
→ 実行費用: $1.00/日 + 持続時間費用: 1,000,000 × 2秒 × 64/1024GB × $0.00001 ≒ $1.25/日 = 約$2.25/日
エラーハンドリング — Retry / Catch / DLQ
Step Functions の耐障害設計の核心は Retry と Catch の組み合わせである。
Retry 設定:
{
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessOrder",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException", "Lambda.AWSLambdaException"],
"IntervalSeconds": 2,
"BackoffRate": 2.0,
"MaxAttempts": 3,
"JitterStrategy": "FULL"
},
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 5,
"BackoffRate": 1.5,
"MaxAttempts": 2
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleError",
"ResultPath": "$.error"
}
]
}
| Retry パラメーター | 説明 | 推奨値 |
|---|---|---|
IntervalSeconds | 初回リトライまでの待機秒数 | 1〜5秒 |
BackoffRate | リトライ間隔の乗数 (指数バックオフ) | 2.0 |
MaxAttempts | 最大リトライ回数 | 3 (べき等操作) |
JitterStrategy | ランダムジッター (FULL/NONE) | FULL (Thundering Herd 防止) |
Catch でのフォールバック遷移:
{
"Catch": [
{
"ErrorEquals": ["CustomError.PaymentDeclined"],
"Next": "NotifyPaymentFailed",
"ResultPath": "$.error"
},
{
"ErrorEquals": ["States.Timeout"],
"Next": "TimeoutHandler",
"ResultPath": "$.timeoutError"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "SendToDLQ",
"ResultPath": "$.unhandledError"
}
]
}
DLQ 送信ステート(SQS 直接統合):
{
"SendToDLQ": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/OrderProcessingDLQ",
"MessageBody.$": "States.JsonToString($)",
"MessageAttributes": {
"ErrorType": {
"DataType": "String",
"StringValue.$": "$.unhandledError.Error"
}
}
},
"End": true
}
}
ResultPath: "$.error" を指定することで、エラー情報を元の入力に追記しながら次のステートへ渡すことができる。ResultPath: null にすると元の入力をそのまま次ステートへ渡す。
分散マップ (Distributed Map) — 大規模並列処理
Distributed Map は 10,000 並行子ワークフローを実行できる Step Functions の大規模並列処理機能である。S3 内のオブジェクト一覧を入力として各行/各ファイルを並列処理するユースケースに最適。
Distributed Map ASL 定義:
{
"ProcessS3Records": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "DISTRIBUTED",
"ExecutionType": "STANDARD"
},
"StartAt": "ProcessRecord",
"States": {
"ProcessRecord": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessRecord",
"Payload.$": "$"
},
"End": true
}
}
},
"ItemReader": {
"Resource": "arn:aws:states:::s3:getObject",
"ReaderConfig": {
"InputType": "CSV",
"CSVHeaderLocation": "FIRST_ROW"
},
"Parameters": {
"Bucket.$": "$.bucket",
"Key.$": "$.key"
}
},
"MaxConcurrency": 1000,
"ToleratedFailureCount": 10,
"ToleratedFailurePercentage": 1,
"ResultWriter": {
"Resource": "arn:aws:states:::s3:putObject",
"Parameters": {
"Bucket.$": "$.outputBucket",
"Prefix": "results/"
}
},
"Next": "AggregateResults"
}
}
| Distributed Map パラメーター | 説明 |
|---|---|
Mode: DISTRIBUTED | 子ステートマシンとして独立実行 |
MaxConcurrency | 同時実行上限 (最大 10,000) |
ToleratedFailureCount | 許容する絶対失敗数 |
ToleratedFailurePercentage | 許容する失敗率 (%) |
ItemReader | S3 CSV/JSON/JSONL をソースとして読み込む |
ResultWriter | 各子実行の結果を S3 に集約出力 |
ToleratedFailureCount と ToleratedFailurePercentage を設定することで、全体の一部が失敗してもステートマシン全体が中断されるのを防ぎ、バッチ処理の部分完了を許容できる。
コールバック (Wait for Task Token) — 人手承認フロー
Wait for Task Token は外部システムや人手の応答を待機する非同期パターンである。SQS / SNS / EventBridge / API Gateway 等を通じて外部システムに Task Token を渡し、承認後に SendTaskSuccess / SendTaskFailure を呼び出してステートマシンを再開する。
人手承認フローの ASL:
{
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/ApprovalQueue",
"MessageBody": {
"TaskToken.$": "$$.Task.Token",
"OrderId.$": "$.orderId",
"Amount.$": "$.amount",
"ApprovalUrl.$": "States.Format('https://approval.example.com/approve?token={}', $$.Task.Token)"
}
},
"HeartbeatSeconds": 86400,
"TimeoutSeconds": 172800,
"Next": "ProcessApprovedOrder",
"Catch": [
{
"ErrorEquals": ["States.HeartbeatTimeout", "States.Timeout"],
"Next": "HandleApprovalTimeout"
}
]
}
}
承認者がコールバックを送信する Lambda:
import boto3
sfn_client = boto3.client('stepfunctions')
def lambda_handler(event, context):
task_token = event['taskToken']
approved = event.get('approved', False)
if approved:
sfn_client.send_task_success(
taskToken=task_token,
output='{"approvalStatus": "APPROVED", "approvedBy": "' + event['approver'] + '"}'
)
else:
sfn_client.send_task_failure(
taskToken=task_token,
error='ApprovalRejected',
cause='Approver rejected the request'
)
$$.Task.Token は ASL の組み込み変数で現在のタスクトークンを参照する。HeartbeatSeconds を設定すると、指定秒数内にコールバックがない場合にタイムアウトエラーが発生する。
SDK 直接統合 (270+ AWSサービス)
Step Functions の SDK 統合により、Lambda を介さずに DynamoDB / EventBridge / S3 / SQS / SNS など270以上のAWSサービスを直接呼び出せる。Lambda 関数の作成・デプロイ・管理コストを削減し、ステートマシンの処理速度も向上する。
DynamoDB 直接書き込み:
{
"SaveToDatabase": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "Orders",
"Item": {
"orderId": {"S.$": "$.orderId"},
"status": {"S": "PROCESSING"},
"updatedAt": {"S.$": "$$.Execution.StartTime"},
"amount": {"N.$": "States.Format('{}', $.amount)"}
}
},
"ResultPath": null,
"Next": "PublishEvent"
}
}
EventBridge へのイベント発行:
{
"PublishEvent": {
"Type": "Task",
"Resource": "arn:aws:states:::events:putEvents",
"Parameters": {
"Entries": [
{
"EventBusName": "order-events",
"Source": "order.processing",
"DetailType": "OrderStatusChanged",
"Detail": {
"orderId.$": "$.orderId",
"status": "PROCESSING",
"timestamp.$": "$$.Execution.StartTime"
}
}
]
},
"End": true
}
}
JSONata 式によるペイロード変換:
Step Functions の ASL v2 では States.Format / States.StringToJson / States.JsonToString に加え、JSONata 式によるインライン変換が利用できる。
{
"TransformPayload": {
"Type": "Pass",
"Parameters": {
"orderSummary.$": "States.Format('Order #{} - {} JPY ({})', $.orderId, $.amount, $.status)",
"itemCount.$": "States.ArrayLength($.items)",
"firstItem.$": "States.ArrayGetItem($.items, 0)",
"totalWithTax.$": "States.MathAdd($.amount, States.MathMultiply($.amount, 0.1))"
},
"Next": "SendNotification"
}
}
1. ワークフロータイプの選択: 5分超・人手承認・正確に1回の実行保証が必要 → Standard。高頻度・5分以内・冪等 → Express。
2. Retry にジッターを設定: JitterStrategy: FULL で複数ステートマシンの同時リトライによる集中アクセスを分散させる。
3. ResultPath の設計: エラーをキャッチした後のステートで元データを保持したい場合は ResultPath: "$.error" を指定する。ResultPath: null にすると元の入力がそのまま渡される。
4. Distributed Map の失敗許容率: バッチ処理の性質に応じて ToleratedFailurePercentage を設定する。0 に設定すると1件の失敗で全体が停止するため、大規模バッチでは 1〜5% を目安にする。
5. Task Token の有効期限管理: コールバックパターンでは HeartbeatSeconds を必ず設定する。設定なしでは承認者が応答しない場合にステートマシンが永久に停止状態となる。
6. SDK統合 vs Lambda: 単純なAWSサービス呼び出しにはSDK統合を使う。ビジネスロジック・データ変換・複数API呼び出しの組み合わせにはLambdaを使う。
Amazon AppFlow 本番運用 — Salesforce/SAP/Slack統合 / スケジュールトリガ

対応コネクタ一覧
AppFlow は以下のカテゴリに分類されたコネクタをネイティブサポートしている。
| カテゴリ | 代表コネクタ |
|---|---|
| CRM/営業 | Salesforce / HubSpot / Zendesk |
| ERP/基幹 | SAP OData / SAP S/4HANA Cloud |
| コラボレーション | Slack / ServiceNow / Jira Cloud |
| マーケティング | Marketo / Google Analytics / Amplitude |
| 送信先 (Destination) | Amazon S3 / Redshift / Salesforce / EventBridge / Snowflake |
Salesforce コネクタは商談 (Opportunity)・リード (Lead)・ケース (Case)・カスタムオブジェクトを対象にでき、SAP OData はエンティティセット単位でクエリが可能。Slack コネクタはチャンネルメッセージの抽出やユーザーディレクトリ同期に使われる。
フロー設定
AppFlow のフロー実行は 3 種類のトリガを選択できる。
| トリガ種別 | 説明 | 代表ユースケース |
|---|---|---|
| スケジュール | cron / レート式で定期実行 | 日次 Salesforce 商談データ取得 |
| イベント | ソースの変更通知で即時実行 | Salesforce Platform Events / S3 イベント |
| オンデマンド | コンソール / API から手動実行 | 初回フル同期・デバッグ |
データ取得モード
- フル転送 (Full transfer): 毎回ソース全件を取得。初回同期・小規模データセットに適す。
- 増分転送 (Incremental transfer): 最終実行以降に変更されたレコードのみ取得。
LastModifiedDate/ Sequence 番号ベース。ソース側が増分クエリをサポートしている必要がある。
データ変換パイプライン
# AppFlow フロー定義例 (AWS CloudFormation)
FlowTransformationExample:
Type: AWS::AppFlow::Flow
Properties:
FlowName: salesforce-to-s3-daily
TriggerConfig:
TriggerType: Scheduled
TriggerProperties:
ScheduleExpression: "rate(1 days)"
DataPullMode: Incremental
SourceFlowConfig:
ConnectorType: Salesforce
SourceConnectorProperties:
Salesforce:
Object: Opportunity
EnableDynamicFieldUpdate: true
DestinationFlowConfigList:
- ConnectorType: S3
DestinationConnectorProperties:
S3:
BucketName: !Ref DataLakeBucket
S3OutputFormatConfig:
FileType: PARQUET
PrefixConfig:
PrefixType: PATH
PrefixFormat: YEAR_MONTH_DAY
Tasks:
# フィールドマッピング: Salesforce 名 → S3 列名
- TaskType: Map
SourceFields: [Id, Name, Amount, CloseDate, StageName]
ConnectorOperator:
Salesforce: NO_OP
DestinationField: opportunity_id
# フィルタ: 成立済み案件のみ通過
- TaskType: Filter
SourceFields: [StageName]
ConnectorOperator:
Salesforce: EQUAL_TO
TaskProperties:
- Key: VALUE
Value: "Closed Won"
# マスキング: 金額フィールドを伏字
- TaskType: Mask
SourceFields: [Amount]
ConnectorOperator:
Salesforce: MASK_ALL
変換パイプライン要素
| 変換タイプ | 説明 |
|---|---|
| Map | ソースフィールド名を宛先スキーマへ対応付け |
| Filter | 条件に合うレコードのみ転送 |
| Mask | 機密フィールドを **** で置換 |
| Validate | フィールド値の型・長さを検証。不正データは別の S3 パスに隔離 |
| Concatenate | 複数フィールドを結合して 1 フィールドに出力 |
| Arithmetic | 数値演算 (加算・乗算等) |
増分転送を利用するにはソースコネクタが変更検知カラムをサポートしている必要がある。Salesforce の場合は LastModifiedDate、SAP OData では ModifiedAt フィールドを AppFlow が内部参照する。フロー設定で DataPullMode: Incremental を指定しても、ソースオブジェクトが対応していない場合は自動的にフルスキャンにフォールバックするため、初回実行後のメタデータ (Last export time) で動作を確認すること。
大規模初回同期 (100 万件超) では日付範囲別に複数フローを並列定義し、Salesforce API クォータ (24 時間ウィンドウ / 15 分ウィンドウ) を分散させる設計が有効。
プライベート接続 (PrivateLink 統合)
VPC 内のデータソースや社内オンプレミスシステムと AppFlow を接続する場合、PrivateLink 経由のプライベート接続を使用するとインターネットを経由しない安全なデータ転送が実現できる。
# プライベート接続プロファイル作成 (AWS CLI)
aws appflow create-connector-profile \
--connector-profile-name salesforce-prod \
--connector-type Salesforce \
--connection-mode Private \
--connector-profile-config '{
"connectorProfileProperties": {
"Salesforce": {
"instanceUrl": "https://myorg.my.salesforce.com",
"isSandboxEnvironment": false
}
}
}'
| 接続モード | 通信経路 | 代表ユースケース |
|---|---|---|
| Public | インターネット経由 | Salesforce / Slack など SaaS |
| Private | AWS PrivateLink / VPC | オンプレ / VPC 内カスタムコネクタ |
カスタムコネクタを VPC 内に構築する場合は、AppFlow が生成する VPC エンドポイントに対してセキュリティグループとルートテーブルを適切に設定する必要がある。
エラーハンドリング (失敗データの S3 保存)
AppFlow フローで宛先への書き込みに失敗したレコードは S3 の専用バケットに自動保存できる。
{
"errorHandlingConfig": {
"failOnFirstDestinationError": false,
"bucketName": "appflow-error-records",
"bucketPrefix": "errors/salesforce-to-s3/"
}
}
failOnFirstDestinationError: false— 1 件失敗してもフロー全体を継続実行- 失敗レコードは
s3://appflow-error-records/errors/salesforce-to-s3/YYYY/MM/DD/に CSV 形式で保存 - CloudWatch メトリクス
RecordsProcessed/RecordsFailed/FlowExecutionsStartedでフロー健全性を監視
アラーム設定例
# 失敗レコード数アラーム作成
aws cloudwatch put-metric-alarm \
--alarm-name appflow-record-failed \
--metric-name RecordsFailed \
--namespace AWS/AppFlow \
--dimensions Name=FlowName,Value=salesforce-to-s3-daily \
--statistic Sum \
--period 3600 \
--threshold 100 \
--comparison-operator GreaterThanOrEqualToThreshold \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:ap-northeast-1:123456789012:appflow-alert
1. Salesforce API クォータ管理: AppFlow は Salesforce Bulk API 2.0 を使用するため、クォータ消費はバルクジョブ単位で計算される。1 フロー実行あたり最大 50,000 件のバッチが複数発行されるため、並列フロー数に注意する。
2. Parquet 形式の活用: S3 宛先では PARQUET 形式を指定すると Athena / Redshift Spectrum でのクエリコストを大幅削減できる。CSV との比較で圧縮率 3〜10 倍が期待できる。
3. EventBridge 宛先: 増分データをリアルタイムに後続サービスへ連携する場合は S3 の代わりに EventBridge を宛先にして Lambda / Step Functions をトリガする構成が低レイテンシ。
4. フロー実行ログ: CloudWatch Logs の /aws/appflow/{FlowName} にデバッグ情報が出力される。障害調査時は errorMessage フィールドを最初に確認する。
Amazon MQ 本番運用 — ActiveMQ vs RabbitMQ / Network of Brokers / JMS/STOMP/MQTT

ActiveMQ vs RabbitMQ 選定
Amazon MQ は Apache ActiveMQ と RabbitMQ の 2 種類のブローカーエンジンを提供する。
| 比較軸 | ActiveMQ | RabbitMQ |
|---|---|---|
| プロトコル | JMS / OpenWire / STOMP / AMQP 1.0 / MQTT / WebSocket | AMQP 0-9-1 / MQTT / STOMP |
| メッセージモデル | キュー (P2P) + トピック (Pub/Sub) | Exchange → Routing Key → Queue |
| 主なユースケース | オンプレ ActiveMQ 移行・JMS 既存アプリ | 高スループット・柔軟ルーティング |
| 最大インスタンス | mq.m5.4xlarge | mq.m5.4xlarge |
| デフォルトポート | 61616 (OpenWire) / 8162 (Web Console) | 5671 (AMQPS) / 15671 (Management) |
選定指針
- オンプレミスの ActiveMQ / IBM MQ を AWS へ移行する場合 → ActiveMQ (JMS 互換性が高い)
- 新規構築でメッセージルーティングの柔軟性を重視する場合 → RabbitMQ (Exchange/Binding モデル)
- MQTT デバイス統合とキュー処理を同一ブローカーで行う場合 → ActiveMQ (MQTT + JMS 同時対応)
Active/Standby 高可用性構成
Amazon MQ は Single-instance と Active/Standby の 2 構成を提供する。本番環境では Active/Standby を必ず使用する。
[Active Broker] [Standby Broker]
││
├── EFS 共有ストレージ ──┤ (ActiveMQ の場合)
││
└── 自動フェイルオーバー ┘
(約 60〜120 秒)
| 項目 | 内容 |
|---|---|
| 同期方式 | Amazon EFS (ActiveMQ) / EBS レプリカ (RabbitMQ) |
| フェイルオーバー時間 | 約 60〜120 秒 |
| 接続 URL | failover:(ssl://b-xxx-1.mq.region.amazonaws.com:61617,ssl://b-xxx-2.mq.region.amazonaws.com:61617) |
| SLA | 99.9% (Active/Standby 構成時) |
Failover リスナー実装例 (Java)
// ActiveMQ Java クライアント - Failover URI で自動再接続
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover:(ssl://b-xxxxxxxx-1.mq.ap-northeast-1.amazonaws.com:61617," +
"ssl://b-xxxxxxxx-2.mq.ap-northeast-1.amazonaws.com:61617)" +
"?initialReconnectDelay=1000&maxReconnectDelay=30000"
);
factory.setUserName("admin");
factory.setPassword(System.getenv("MQ_PASSWORD"));
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination queue = session.createQueue("orders");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
try {
System.out.println("受信: " + ((TextMessage) message).getText());
message.acknowledge(); // CLIENT_ACKNOWLEDGE: 処理完了後に明示 ACK
} catch (JMSException e) {
// ACK しないと再配送される
}
});
Active/Standby 構成でフェイルオーバーが発生すると、アクティブブローカーへの接続が強制切断される。failover:// プロトコルで接続 URL を複数指定することで自動再接続できるが、再接続が完了するまでの約 60〜120 秒間は新規メッセージの送受信が停止する。
この間のデータ損失を防ぐには:
– プロデューサー側: JMSException 発生時に SQS などのフォールバックキューへ送信するリトライロジックを実装する
– コンシューマー側: CLIENT_ACKNOWLEDGE モードを使用し、メッセージ処理完了後に message.acknowledge() を明示的に呼び出す。未 ACK メッセージはフェイルオーバー後に再配送される
Network of Brokers
大規模トラフィックや複数リージョン展開では Network of Brokers (NoB) でブローカーをメッシュ接続する。
メッシュトポロジー — 全ブローカー間が双方向接続
Region ARegion B
┌──────────────────┐┌──────────────────┐
│ Broker-1 (A/S) │◄───────►│ Broker-3 (A/S) │
│ Broker-2 (A/S) │◄───────►│ Broker-4 (A/S) │
└──────────────────┘└──────────────────┘
ハブ&スポーク構成 — 集中ルーティング型。スポークはハブとのみ接続
Hub Broker (集中)
/\
Spoke-1 Spoke-2
(東京 AZ-a) (東京 AZ-c)
<!-- activemq.xml - ネットワークコネクタ定義 -->
<networkConnectors>
<networkConnector
name="hub-connector"
uri="static:(ssl://hub-broker.mq.ap-northeast-1.amazonaws.com:61617)"
duplex="true"
networkTTL="2"
messageTTL="-1"
dynamicOnly="true">
<excludedDestinations>
<queue physicalName="internal.>"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
| NoB パラメータ | 説明 |
|---|---|
duplex="true" | 双方向転送 (false = 片方向のみ) |
networkTTL | メッセージが跨げる最大ホップ数 (デフォルト 1) |
messageTTL | TTL 変更 (-1 = 元の TTL を維持) |
dynamicOnly | コンシューマーが存在するキューのみ転送 (帯域節約) |
跨リージョン構成では VPC ピアリングまたは Transit Gateway でブローカー間の TCP 経路を確保し、SSL (ssl://) を使用した暗号化接続が必須となる。
JMS / STOMP / MQTT プロトコル対応
Python — STOMP クライアント
import stomp
class OrderListener(stomp.ConnectionListener):
def on_message(self, frame):
print(f"受信: {frame.body}")
self.conn.ack(frame.headers['message-id'], frame.headers['subscription'])
conn = stomp.Connection12(
[('b-xxxxxxxx.mq.ap-northeast-1.amazonaws.com', 61614)],
use_ssl=True
)
conn.set_listener('', OrderListener())
conn.connect('admin', 'password', wait=True)
conn.subscribe('/queue/orders', id=1, ack='client')
# メッセージ送信
conn.send('/queue/orders', '{"orderId": "12345"}', content_type='application/json')
Node.js — MQTT クライアント
const mqtt = require('mqtt');
const client = mqtt.connect(
'mqtts://b-xxxxxxxx.mq.ap-northeast-1.amazonaws.com:8883',
{
username: 'admin',
password: process.env.MQ_PASSWORD,
clientId: `node-client-${Date.now()}`,
rejectUnauthorized: true
}
);
client.on('connect', () => {
client.subscribe('orders/#', { qos: 1 });
});
client.on('message', (topic, message) => {
console.log(`topic: ${topic}, payload: ${message.toString()}`);
// QoS 1: PUBACK を自動送信するため明示 ACK 不要
});
ActiveMQ プロトコル対応ポート一覧
| プロトコル | 標準ポート | TLS ポート |
|---|---|---|
| OpenWire (JMS) | 61616 | 61617 |
| STOMP | 61613 | 61614 |
| AMQP 1.0 | 5672 | 5671 |
| MQTT | 1883 | 8883 |
| WebSocket | 61614 | 61619 |
メンテナンスウィンドウ / エンドポイント分離
メンテナンスウィンドウ設定
Amazon MQ は週次メンテナンスウィンドウ内でマイナーバージョンの自動アップグレードを実施する。本番環境ではトラフィックが最低の時間帯に設定する。
# メンテナンスウィンドウを日曜 04:00 JST に設定
aws mq update-broker \
--broker-id b-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx \
--maintenance-window-start-time \
DayOfWeek=SUNDAY,TimeOfDay=04:00,TimeZone=Asia/Tokyo \
--auto-minor-version-upgrade true
エンドポイント分離 (VPC 内 vs パブリック)
| 配置オプション | アクセス経路 | 代表ユースケース |
|---|---|---|
| VPC 内 (推奨) | ENI 直接 / VPC ピアリング | 本番システム・機密データ転送 |
| パブリック | インターネット (TLS 必須) | 開発・検証・外部パートナー連携 |
VPC 内配置を選択した場合、ブローカーのエンドポイント URL は b-xxxxxxxx.mq.region.amazonaws.com ではなく、VPC 内の ENI IP として解決される。セキュリティグループはブローカーのポート (61617 等) に対して必要なソース IP / セキュリティグループのみを許可する。
1. Active/Standby 必須: Single-instance はメンテナンス時にダウンタイムが発生する。本番は必ず Active/Standby を選択する。
2. ブローカーサイジング: mq.m5.large を基準に、メッセージサイズ × スループットで上位サイズを検討する。ActiveMQ は Heap 使用率 (CloudWatch: HeapUsage) が 70% を超えると処理が詰まり始める。
3. 接続数上限: ActiveMQ はブローカーサイズ別に最大接続数の制限がある (mq.m5.large = 1,000 接続)。マイクロサービスが多数接続する場合はコネクションプールを必ず使用する。
4. ストレージ肥大化対策: 消費されないメッセージが蓄積すると EFS ストレージが増大する。デッドレターキューの定期クリーニングとメッセージ TTL の設定を組み合わせて管理する。
5. CloudWatch アラーム優先設定: TotalConsumerCount=0 (コンシューマー消失) と StorePercentUsage > 80 (ストレージ逼迫) を最優先でアラーム設定する。
Amazon MWAA 本番運用 — Airflow DAG / Worker/Scheduler / 実行ロール
Airflow DAG設計パターン
Amazon MWAA (Managed Workflows for Apache Airflow) は、Apache Airflowをフルマネージドで提供します。DAGファイルはS3のdags/フォルダに配置し、MWAAが自動的に同期します。
DAGファイル配置とS3連携
# s3://your-bucket/dags/sample_etl_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='sample_etl_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *',
start_date=days_ago(1),
catchup=False,
tags=['etl', 'production'],
) as dag:
pass
TaskGroupによるDAG構造化
SubDAGはAirflow 2.x で非推奨のためTaskGroupを使用します。
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
with DAG(dag_id='structured_etl', schedule_interval='@daily',
start_date=days_ago(1), catchup=False) as dag:
with TaskGroup('extract') as extract_group:
extract_s3 = PythonOperator(task_id='extract_s3', python_callable=extract_from_s3)
extract_rds = PythonOperator(task_id='extract_rds', python_callable=extract_from_rds)
with TaskGroup('transform') as transform_group:
transform = PythonOperator(task_id='transform', python_callable=transform_data)
with TaskGroup('load') as load_group:
load_s3 = PythonOperator(task_id='load_s3', python_callable=load_to_s3)
extract_group >> transform_group >> load_group
動的DAG生成
# dags/dynamic_pipelines.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
DAG_CONFIGS = [
{'dag_id': 'pipeline_salesforce', 'schedule': '0 1 * * *', 'source': 'salesforce'},
{'dag_id': 'pipeline_s3_hourly','schedule': '0 * * * *', 'source': 's3'},
{'dag_id': 'pipeline_rds_nightly', 'schedule': '0 3 * * *', 'source': 'rds'},
]
def create_dag(config: dict) -> DAG:
with DAG(
dag_id=config['dag_id'],
schedule_interval=config['schedule'],
start_date=days_ago(1),
catchup=False,
) as dag:
PythonOperator(
task_id='process',
python_callable=lambda source=config['source']: process(source),
)
return dag
for cfg in DAG_CONFIGS:
globals()[cfg['dag_id']] = create_dag(cfg)
Worker / Scheduler / Webserver 構成
| 環境クラス | vCPU | メモリ | 推奨用途 |
|---|---|---|---|
| mw1.small | 2 | 4 GB | 開発・PoC |
| mw1.medium | 4 | 8 GB | ステージング |
| mw1.large | 8 | 16 GB | 本番・大規模処理 |
# Terraform — MWAA環境定義
resource "aws_mwaa_environment" "prod" {
name= "prod-mwaa"
airflow_version = "2.8.1"
environment_class = "mw1.large"
min_workers = 2
max_workers = 10
schedulers= 2
source_bucket_arn = aws_s3_bucket.mwaa.arn
dag_s3_path = "dags/"
plugins_s3_path = "plugins/plugins.zip"
execution_role_arn = aws_iam_role.mwaa_execution.arn
airflow_configuration_options = {
"core.parallelism" = "32"
"core.dag_concurrency"= "16"
"core.max_active_runs_per_dag" = "4"
}
network_configuration {
security_group_ids = [aws_security_group.mwaa.id]
subnet_ids= aws_subnet.private[*].id
}
logging_configuration {
dag_processing_logs { enabled = true; log_level = "INFO" }
scheduler_logs{ enabled = true; log_level = "WARNING" }
worker_logs{ enabled = true; log_level = "INFO" }
task_logs { enabled = true; log_level = "INFO" }
}
}
– min_workers ≥ 2: 0設定はコールドスタートで遅延が発生するため本番非推奨
– max_workers: DAGの最大並列タスク数を想定して設定。過剰設定はコスト増大
– schedulers ≥ 2: スケジューラ障害の単一障害点を排除。本番では必ず2以上
– parallelism: 全Worker合計の同時タスク実行上限
– dag_concurrency: 同一DAGの同時タスク実行上限
実行ロール設計 — aws_default接続 / Operator別IAMポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3DataAccess",
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
"Resource": [
"arn:aws:s3:::your-data-bucket",
"arn:aws:s3:::your-data-bucket/*"
]
},
{
"Sid": "GlueJobOperator",
"Effect": "Allow",
"Action": ["glue:StartJobRun", "glue:GetJobRun"],
"Resource": "arn:aws:glue:ap-northeast-1:123456789012:job/*"
},
{
"Sid": "StepFunctionsOperator",
"Effect": "Allow",
"Action": ["states:StartExecution", "states:DescribeExecution"],
"Resource": "arn:aws:states:ap-northeast-1:123456789012:stateMachine:*"
},
{
"Sid": "SecretsAccess",
"Effect": "Allow",
"Action": ["secretsmanager:GetSecretValue"],
"Resource": "arn:aws:secretsmanager:ap-northeast-1:123456789012:secret:airflow/*"
}
]
}
DAG管理 — Git連携 / CI/CD / 動的パラメータ
GitHub ActionsによるDAG自動デプロイ
# .github/workflows/deploy_dags.yaml
name: Deploy DAGs to MWAA
on:
push:
branches: [main]
paths: ['dags/**']
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install apache-airflow==2.8.1
- name: DAG構文チェック
run: |
python -c "
import glob, importlib.util, sys
errors = []
for f in glob.glob('dags/**/*.py', recursive=True):
try:
spec = importlib.util.spec_from_file_location('dag', f)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
except Exception as e:
errors.append(f'{f}: {e}')
if errors:
print('ERRORS:', errors); sys.exit(1)
"
deploy:
needs: validate
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: arn:aws:iam::123456789012:role/github-actions-mwaa
aws-region: ap-northeast-1
- run: |
aws s3 sync dags/ s3://your-mwaa-bucket/dags/ \
--delete --exclude "*.pyc" --exclude "__pycache__/*"
Variables と Connections — Secrets Manager統合
from airflow.models import Variable
from airflow.hooks.base import BaseHook
env = Variable.get('environment',default_var='dev')
s3_bucket = Variable.get('data_bucket',default_var='dev-data-bucket')
batch_size = int(Variable.get('batch_size', default_var='100'))
conn= BaseHook.get_connection('redshift_prod')
host= conn.host
schema = conn.schema
# Secrets Managerバックエンド設定
secrets.backend: "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
secrets.backend_kwargs: '{"connections_prefix":"airflow/connections","variables_prefix":"airflow/variables"}'
プラグイン管理 — カスタムOperator / Hook
# plugins/operators/data_quality_operator.py
from airflow.models import BaseOperator
class DataQualityOperator(BaseOperator):
def __init__(self, table: str, threshold: float = 0.95, *args, **kwargs):
super().__init__(*args, **kwargs)
self.table = table
self.threshold = threshold
def execute(self, context):
score = self._check_quality(self.table)
if score < self.threshold:
raise ValueError(f'品質スコア不足: {score:.2f} < {self.threshold}')
return score
def _check_quality(self, table: str) -> float:
return 0.98
# scripts/deploy_plugins.py
import boto3, subprocess, os
def deploy_plugins(bucket: str, prefix: str = 'plugins/plugins.zip'):
subprocess.run(
['zip', '-r', '/tmp/plugins.zip', '.', '-x', '*.pyc', '-x', '__pycache__/*'],
cwd='plugins/', check=True
)
boto3.client('s3').upload_file('/tmp/plugins.zip', bucket, prefix)
if __name__ == '__main__':
deploy_plugins(os.environ['MWAA_BUCKET'])
CloudWatchメトリクス監視
| メトリクス | 説明 | 推奨しきい値 |
|---|---|---|
| DAGFileProcessingTotalParseTime | DAGファイル解析時間 | ≤30秒 |
| RunningTasks | 実行中タスク数 | max_workers×0.9超でアラート |
| QueuedTasks | キュー待機タスク数 | ≥10件で警告 |
| SchedulerHeartbeat | スケジューラ死活 | 0回/分でクリティカル |
# CloudWatchアラーム — スケジューラ死活監視 (Terraform)
resource "aws_cloudwatch_metric_alarm" "scheduler_heartbeat" {
alarm_name = "mwaa-scheduler-heartbeat"
comparison_operator = "LessThanThreshold"
evaluation_periods = 3
metric_name= "SchedulerHeartbeat"
namespace = "AmazonMWAA"
period = 60
statistic = "Sum"
threshold = 1
treat_missing_data = "breaching"
dimensions = { Environment = "prod-mwaa", Function = "Scheduler" }
alarm_actions = [aws_sns_topic.critical_alerts.arn]
}
デプロイ前確認:
– DAG構文チェックをCI/CDで自動化(構文エラーは全Worker停止の原因)
– catchup=False 設定(意図しない過去分一括実行防止)
– max_active_runs 設定(同一DAGの同時実行数制御)
– 実行ロールに最小権限ポリシーのみ付与
本番稼働中確認:
– SchedulerHeartbeatアラームで停止を即時検知
– QueuedTasksが継続増加する場合はmax_workers引き上げを検討
– S3 dags/バケットのバージョニング有効化(DAGロールバック対応)
– Secrets Managerシークレットのローテーション設定
詰まりポイント7選 + アンチパターン→正解パターン変換
詰まり①: Step Functions Standard料金爆発
症状: 高頻度APIコール処理をStandard Workflowで実装したところ、月次コストが想定の50倍に膨張。
原因: Standard Workflowはステート遷移回数課金。毎秒1,000回×10ステート = 月3,000万回遷移。
解決策: 処理時間5分以内・高頻度・冪等設計可能 → Express Workflowに変更。
{
"Comment": "Express Workflow — 高頻度APIコール向け",
"StartAt": "CallAPI",
"States": {
"CallAPI": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:api-caller",
"End": true
}
}
}
判断基準: 処理時間 > 5分 または Exactly-once必須 → Standard。それ以外 → Express。
詰まり②: AppFlow タイムゾーン誤設定
症状: 毎日JST09:00に同期するよう設定したが、実際はJST00:00(UTC15:00)に実行。
原因: AppFlowのスケジュール設定は常にUTC基準。JST09:00 = UTC00:00を設定する必要がある。
import boto3
from datetime import datetime, timezone
client = boto3.client('appflow', region_name='ap-northeast-1')
# JST 09:00 = UTC 00:00 で設定
response = client.create_flow(
flowName='salesforce-to-s3-daily',
triggerConfig={
'triggerType': 'Scheduled',
'triggerProperties': {
'Scheduled': {
'scheduleExpression': 'rate(1days)',
'dataPullMode': 'Incremental',
'scheduleStartTime': datetime(2026, 5, 28, 0, 0, 0, tzinfo=timezone.utc),
}
}
},
)
予防策: AppFlow設定ドキュメントに「スケジュール時刻はUTC基準」と明記。
詰まり③: Amazon MQ Network of Brokersでメッシュループ
症状: 東京↔大阪ブローカーを双方向コネクタで接続したところ、転送済みメッセージが再転送されキューが急増。
原因: Broker A→BとB→Aの双方向コネクタを設定した際にメッシュループが発生。
解決策: duplex=false を明示し、一方向(ハブ→スポーク)で設定。
<!-- activemq.xml — ネットワークコネクタ正設定 -->
<networkConnectors>
<networkConnector
name="tokyo-to-osaka"
uri="static:(ssl://osaka-broker:61617)"
duplex="false"
networkTTL="2"
dynamicOnly="true">
<excludedDestinations>
<queue physicalName="response.*" />
</excludedDestinations>
</networkConnector>
</networkConnectors>
詰まり④: MWAA DAG構文エラーで全Worker停止
症状: DAGファイルを更新してS3にアップロードしたところ、全DAGのスケジューリングが停止。
原因: 構文エラーのあるDAGファイルがDAGプロセッサに読み込まれ、正常なDAGも巻き込んで停止。
解決策:
1. 構文エラーのDAGファイルをS3から即座に削除
2. DAGFileProcessingTotalParseTime メトリクスで異常検知(通常1-5秒が30秒超)
3. CI/CDパイプラインでDAG構文チェックを必須化
詰まり⑤: Step Functions 分散マップで子ステート爆発
症状: S3の100万件オブジェクトに分散マップを実行したところ TooManyRequests エラーで停止。
原因: MaxConcurrency 未設定で全アイテムの子実行を同時開始しようとしAPIレート制限に抵触。
{
"Type": "Map",
"ItemReader": {
"Resource": "arn:aws:states:::s3:listObjectsV2",
"Parameters": { "Bucket": "your-bucket", "Prefix": "data/" }
},
"MaxConcurrency": 100,
"ToleratedFailurePercentage": 10,
"ItemBatcher": {
"MaxItemsPerBatch": 100
},
"Iterator": {
"StartAt": "ProcessBatch",
"States": {
"ProcessBatch": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:batch-processor",
"End": true
}
}
}
}
詰まり⑥: AppFlow Salesforce認証期限切れ
症状: 毎日正常だったSalesforce→S3フローが突然失敗。ConnectorAuthenticationException: OAuth token expired。
原因: Salesforce管理者がOAuth接続アプリのリフレッシュトークン有効期限を90日→15日に変更。
解決策:
1. AppFlowコンソール → 接続 → Salesforce接続を再認証
2. Salesforce OAuth接続アプリのリフレッシュトークン有効期限を確認・調整
3. CloudWatch EventsでAppFlowフロー失敗を検知するアラートを設定
# Lambda — AppFlowフロー失敗通知
import boto3
def handler(event, context):
detail = event.get('detail', {})
if detail.get('status') == 'Execution Failed':
flow_name = detail.get('flowName', 'unknown')
error = detail.get('errorInfo', {}).get('executionMessage', '')
boto3.client('sns').publish(
TopicArn='arn:aws:sns:ap-northeast-1:123456789012:alerts',
Subject=f'AppFlowフロー失敗: {flow_name}',
Message=f'フロー: {flow_name}\nエラー: {error}',
)
詰まり⑦: MWAA タイムスタンプ不整合でDAG重複実行
症状: 夏時間切り替え直後、ETL DAGが1日分を2回実行しデータウェアハウスにレコードが重複。
原因: schedule_interval にJSTを想定したcron式を設定していたが、AirflowはUTC基準で動作。夏時間の切り替わりで同一 execution_date の重複実行が発生。
解決策: スケジュールはUTC基準で設計し、pendulum でタイムゾーンを明示する。
import pendulum
from airflow import DAG
local_tz = pendulum.timezone('Asia/Tokyo')
with DAG(
dag_id='timezone_safe_etl',
schedule_interval='0 0 * * *', # UTC 00:00 = JST 09:00
start_date=pendulum.datetime(2026, 1, 1, tz=local_tz),
catchup=False,
) as dag:
pass
冪等設計: execution_date をパーティションキーにしてUPSERTし、重複実行を無害化する。
アンチパターン→正解パターン変換 5問
Q1: Step Functions — Retry/Catch未設定
// アンチパターン: Lambda呼び出しにRetry/Catchなし
{ "Type": "Task", "Resource": "arn:aws:lambda:...:function:processor", "End": true }
// 正解パターン: Retry + Catch + DLQ送信
{
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:processor",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
"IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2
}
],
"Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "SendToDLQ" }],
"Next": "Success"
}
Q2: AppFlow — スケジュール過剰実行
# アンチパターン: 毎分スケジュール実行(不必要なAPI呼び出し)
trigger_config = {'triggerType': 'Scheduled', 'scheduleExpression': 'rate(1minutes)'}
# 正解パターン: イベントトリガ(データ更新時のみ実行)
trigger_config = {'triggerType': 'Event'}
Q3: Amazon MQ — 接続数上限未設定
<!-- アンチパターン: maxConnections未設定でブローカー過負荷 -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
<!-- 正解パターン: 接続数上限とHeartbeat設定 -->
<transportConnector
name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=500&wireFormat.maxFrameSize=104857600"
updateClusterClients="true"/>
Q4: MWAA — catchup=Trueによる過去分一括実行
# アンチパターン: catchup=True(デフォルト)で720回のDAGRunが即座に開始
with DAG(dag_id='etl', schedule_interval='@hourly', start_date=days_ago(30)) as dag:
pass
# 正解パターン: catchup=False で最新分のみ実行
with DAG(dag_id='etl', schedule_interval='@hourly',
start_date=days_ago(30), catchup=False) as dag:
pass
Q5: Step Functions — 分散マップのエラー許容未設定
// アンチパターン: ToleratedFailurePercentage未設定 → 1件失敗で全体停止
{ "Type": "Map", "MaxConcurrency": 100, "Iterator": {} }
// 正解パターン: 10%失敗まで許容し部分成功を継続
{
"Type": "Map",
"MaxConcurrency": 100,
"ToleratedFailurePercentage": 10,
"ToleratedFailureCount": 1000,
"Iterator": {}
}
Vol1×Vol2統合アーキテクチャ + 選定フローチャート
sequenceDiagram
participant Vol1 as Vol1 メッセージング基盤
participant Vol2 as Vol2 ワークフロー・統合
participant Prod as 本番App Integration環境
Vol1->>Prod: SQS/SNS/EventBridge/API GW/AppSync基盤
Vol2->>Prod: Step Functions/AppFlow/MQ/MWAA高度化
Prod-->>Vol1: 単純メッセージング → Vol2でワークフロー化
Prod-->>Vol2: 複雑統合 → Vol1基盤を活用
Note over Vol1,Vol2: App Integration完全体<br/>Foundation → Orchestration
Vol1(SQS/SNS/EventBridge/API Gateway/AppSync)で確立したメッセージング基盤に、Vol2のオーケストレーション層を統合すると、シンプルなメッセージ転送から複雑なマルチサービス連携まで対応できるApplication Integration完全体が完成する。
本章では4つの代表的な統合パターンを具体的なAWS設定コードと共に解説し、「どのサービスを選ぶか」の判断基準を体系化したフローチャートを提示する。Cloud Architectは4パターンを使い分けることで、要件ごとに最適なサービス組み合わせを即座に選定できるようになる。
パターン1 — API GW → Step Functions Express → SQS (Vol1) → Lambda
同期APIリクエストをワークフロー化し、Vol1のSQSキュー経由でLambdaに渡すパターン。API Gatewayがフロントエンドとして機能し、Step Functions Expressがバリデーション・ルーティング・エラーハンドリングを担当する。Expressは最大5分・高頻度実行に特化しており、APIレスポンスを待たずに処理をキューイングできる。
- Eコマース注文処理: API GW受信 → SFN Express(在庫確認・バリデーション)→ SQS → 決済Lambda
- フォーム送信パイプライン: API GW → SFN Express(バリデーション・フィルタ)→ SQS(Vol1)→ 通知Lambda
- リアルタイム通知連携: API GW同期リクエスト → SFN Express(マルチステップ処理)→ SNS(Vol1)→ 各チャネル配信
- メリット: API GWのリクエスト/レスポンスとSQSの非同期処理を完全分離。Step Functionsで可視化・リトライを一元管理できる。
{
"Comment": "API GW → Step Functions Express → SQS → Lambda パターン",
"StartAt": "ValidateRequest",
"States": {
"ValidateRequest": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "validate-request",
"Payload.$": "$"
},
"Next": "RouteToQueue",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": ["ValidationError"],
"Next": "ReturnValidationError"
}
]
},
"RouteToQueue": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage",
"Parameters": {
"QueueUrl": "https://sqs.ap-northeast-1.amazonaws.com/123456789012/process-queue.fifo",
"MessageBody.$": "States.JsonToString($.payload)",
"MessageGroupId.$": "$.customerId",
"MessageDeduplicationId.$": "$.requestId"
},
"End": true
},
"ReturnValidationError": {
"Type": "Fail",
"Error": "ValidationFailed",
"Cause": "Input validation failed"
}
}
}
API Gateway → Step Functions Express の接続はHTTP API + EventBridge Pipe、または API Gateway → Lambda(起動処理)→ SFN Express の2段階で構成できる。Express WorkflowはCloudWatch Logsへのリアルタイム実行ログ出力に対応しており、リクエストごとのトレースが容易になる。
パターン2 — EventBridge → AppFlow (Vol2) → S3 → Step Functions
EventBridgeのスケジュールイベントでAppFlowをトリガし、Salesforce/SAP等のSaaSからS3へのデータ同期完了後にStep Functionsでデータ変換パイプラインを起動するパターン。AppFlowの実行完了イベントはEventBridgeに自動送信されるため、ポーリング不要でS3到着後の処理を即座に開始できる。
{
"source": ["aws.appflow"],
"detail-type": ["AppFlow End Flow Run Report"],
"detail": {
"status": ["Execution Successful"],
"flow-name": ["salesforce-to-s3-daily"]
}
}
上記EventBridgeルールにStep FunctionsをターゲットとしてIAMロール付きで設定することで、AppFlow完了 → Step Functions自動起動が実現する。AppFlowのデータ変換機能(マスク・連結・算術演算)でSalesforceカスタムフィールドをParquet形式に変換し、Step Functions Standardでデータ品質チェック・DynamoDB書き込み・SNS通知を一連のワークフローとして管理できる。
import boto3
import json
def start_appflow_and_setup_monitor(flow_name: str, sm_arn: str) -> dict:
"""AppFlowフロー手動起動 — 完了後はEventBridge経由でStep Functionsが自動起動"""
appflow = boto3.client('appflow', region_name='ap-northeast-1')
response = appflow.start_flow(flowName=flow_name)
return {
"flowExecutionId": response['executionId'],
"flowStatus": response['flowStatus'],
"nextAction": "EventBridge自動トリガ待機中",
"targetStateMachine": sm_arn,
"note": "AppFlow完了イベントがEventBridgeに自動送信 → Step Functionsが起動"
}
AppFlow → S3 → Step Functions パターンはSalesforce Opportunity → S3 CSV → ETL処理という典型的なCRM連携シナリオに最適。EventBridge連携により完全サーバーレスのイベントドリブン統合パイプラインが完成する。
パターン3 — SNS Fan-out (Vol1) → Amazon MQ (Vol2) → JMS Subscribers
Vol1のSNS Fan-outパターンにAmazon MQを組み合わせ、JMS対応のエンタープライズシステムをSubscriberとして統合するパターン。SNS Topicへの単一Publishで、AWSネイティブ(SQS + Lambda)とレガシーMQシステム(JMS/STOMP)の両方に同時配信できる。
import boto3
import json
import stomp
def configure_sns_mq_fanout(sns_topic_arn: str, mq_endpoint: str, bridge_queue_url: str) -> None:
"""SNS Fan-out: SQS(Lambdaへ) + Amazon MQ(JMS Subscriberへ) の二股配信設定"""
sns = boto3.client('sns', region_name='ap-northeast-1')
sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_attrs = sqs.get_queue_attributes(
QueueUrl=bridge_queue_url,
AttributeNames=['QueueArn']
)
bridge_queue_arn = queue_attrs['Attributes']['QueueArn']
sns.subscribe(
TopicArn=sns_topic_arn,
Protocol='sqs',
Endpoint=bridge_queue_arn
)
conn = stomp.Connection(host_and_ports=[(mq_endpoint, 61613)])
conn.connect(login='mquser', passcode='mqpassword', wait=True)
print(f"Connected to Amazon MQ at {mq_endpoint}")
return conn
エンタープライズシステム統合における代表的な組み合わせは以下の通りである:
| レガシーシステム | Vol1サービス | Vol2サービス | 接続プロトコル |
|---|---|---|---|
| IBM MQ | SNS Fan-out | Amazon MQ(ActiveMQ) | JMS/OpenWire |
| RabbitMQ on-prem | SQS | Amazon MQ(RabbitMQ) | AMQP 0.9.1 |
| TIBCO EMS | SNS | Amazon MQ(ActiveMQ) | STOMP |
| ActiveMQ on-prem | SQS + Lambda | Amazon MQ(ActiveMQ) | OpenWire直接移行 |
このパターンはオンプレミスMQシステムからAWSへのマイグレーション期間中の並行稼働に特に有効。既存JMSクライアントを無修正のまま接続でき、ActiveMQ Active/Standby構成と組み合わせることでエンタープライズレベルの高可用性を実現できる。
パターン4 — MWAA → Step Functions Standard → 複合データパイプライン
MWAA(Managed Apache Airflow)でDAGとして定義したスケジュールバッチから、Step Functions Standardのステートマシンを起動し複合データパイプラインを実行するパターン。AirflowのDAG依存関係管理とStep Functionsのマイクロサービスオーケストレーションを組み合わせることで、エンタープライズ規模のデータパイプラインを構築できる。
from airflow import DAG
from airflow.providers.amazon.aws.operators.step_function import StepFunctionStartExecutionOperator
from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor
from datetime import datetime, timedelta
import json
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=10),
"email_on_failure": True,
}
with DAG(
dag_id="mwaa_sfn_composite_pipeline",
default_args=default_args,
schedule_interval="0 2 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["production", "data-pipeline"],
) as dag:
start_etl = StepFunctionStartExecutionOperator(
task_id="start_step_functions_etl",
state_machine_arn="arn:aws:states:ap-northeast-1:123456789012:stateMachine:composite-etl-pipeline",
name="{{ ds_nodash }}-etl-run",
input=json.dumps({
"date": "{{ ds }}",
"sourceBucket": "s3://data-lake/raw/{{ ds }}/",
"targetBucket": "s3://data-lake/processed/{{ ds }}/"
}),
aws_conn_id="aws_default",
)
wait_for_etl = StepFunctionExecutionSensor(
task_id="wait_for_etl_completion",
execution_arn="{{ task_instance.xcom_pull(task_ids='start_step_functions_etl') }}",
aws_conn_id="aws_default",
)
start_etl >> wait_for_etl
MWAAのStepFunctionExecutionSensorを使用することで、Airflow DAGがStep Functionsの完了を待機してから後続タスクを実行できる。長期実行ETL処理をStep Functions Standardでオーケストレートし、全体スケジューリングはMWAAが担う役割分担が推奨構成となる。
選定フローチャート — 同期 vs 非同期 → 短期 vs 長期 → ワークフロー vs データ統合 vs MQ
4パターンの経験をベースに、初見の要件に対してどのサービスを選ぶかを体系的に判断するフローチャートを以下に示す。「同期 vs 非同期 → 短期 vs 長期 → ワークフロー vs データ統合 vs MQ vs DAGスケジューリング」の順で評価することで、最適なサービスを即座に特定できる。
[スタート: 要件の特定]
Q1: 同期(即時応答が必須)?
YES → API Gateway + Lambda / Step Functions Express
│
├─ 複数ステップの処理が必要 → Step Functions Express
└─ 単一Lambda処理で十分 → API GW + Lambda直接統合
NO → 非同期処理 → Q2へ
Q2: 実行時間・頻度は?
5分以内 かつ 高頻度(秒〜分単位トリガ) → Step Functions Express
長時間(分〜時間・日単位)または状態保存が必要 → Step Functions Standard
どちらか確定したら → Q3へ
Q3: 主目的は?
SaaS(Salesforce/SAP/Slack等)との双方向データ統合
→ Amazon AppFlow(14種以上の公式コネクタを活用)
JMS/STOMP/AMQPプロトコルが必須(既存MQシステムとの接続)
→ Amazon MQ(ActiveMQ or RabbitMQ)
複雑なDAG依存関係付きバッチスケジューリング(ML/ETL)
→ Amazon MWAA(Managed Airflow)
AWSネイティブメッセージングで完結
→ SQS/SNS/EventBridge(Vol1)
Q4: Vol1資産との連携は?
SQS/SNSをサブシステムとして活用 → Vol2サービス + Vol1サービス組み合わせ
EventBridgeトリガが起点 → EventBridge → Vol2サービスのChain
AppSync(GraphQL)がフロントエンド → AppSync → Step Functionsのリアルタイム統合
| 要件 | 推奨サービス | シリーズ |
|——|————-|———|
| 同期APIリクエスト受信 | API Gateway + Lambda | Vol1 |
| 複数ステップの同期ワークフロー | API GW + Step Functions Express | Vol1+2 |
| 高頻度・短時間の非同期処理 | SQS + Lambda / SFN Express | Vol1+2 |
| 長期実行・状態管理が必要 | Step Functions Standard | Vol2 |
| SaaS → AWS データ同期 | Amazon AppFlow | Vol2 |
| JMS/STOMP/MQTT対応 | Amazon MQ(ActiveMQ or RabbitMQ) | Vol2 |
| Airflow DAG + 複雑依存関係 | Amazon MWAA | Vol2 |
| イベントドリブン通知配信 | EventBridge + SNS | Vol1 |
| シンプルスケジュールcron | EventBridge Scheduler | Vol1 |
| GraphQL + リアルタイム更新 | AppSync + DynamoDB Streams | Vol1 |
Vol1×Vol2を組み合わせる際の鉄則は「Vol1で流し、Vol2でオーケストレートする」である。SQS/SNS/EventBridgeでメッセージを確実に転送・分散し、Step Functions/AppFlow/MQ/MWAAで高度化・可視化することで、スケーラビリティとメンテナビリティを両立できる。
4パターンは固定ではなく、要件進化とともに組み合わせを変えることができる。例えばEventBridge(Vol1)のスケジュールイベントにAppFlow(Vol2)を繋ぎ、完了後にStep Functions Standard(Vol2)で処理し、結果をSNS(Vol1)で通知するフルスタック統合パターンも構成可能だ。選定フローチャートと4パターンを組み合わせることで、どのような要件が来ても「Vol1の何を使い、Vol2の何で高度化するか」を迷わず決定できる設計力が身につく。
まとめ — Application Integration 二部作完成 + 全軸クロスリンク
§8-1 — Vol2 4サービス要点まとめ
Vol2で学んだ4サービスの要点を整理する。Vol1(SQS/SNS/EventBridge/API GW/AppSync)と組み合わせることでApplication Integration完全体が完成し、あらゆる統合要件に対応できるアーキテクチャ基盤が整う。
| サービス | 主目的 | 代表ユースケース | 実行時間上限 | 料金単位 |
|---|---|---|---|---|
| Step Functions Standard | 長期ワークフロー・状態管理 | ETL/承認フロー/分散バッチ | 最大1年 | 状態遷移数 |
| Step Functions Express | 高頻度・高スループット | API処理/イベント変換/ストリーム処理 | 最大5分 | 実行時間×リクエスト数 |
| Amazon AppFlow | SaaS-AWS双方向データ統合 | Salesforce/SAP/Slack→S3/Redshift | 1回/フロー | フロー実行×処理データ量 |
| Amazon MQ (ActiveMQ) | JMSエンタープライズメッセージング | レガシー移行/JMSアプリ接続 | 常時稼働 | ブローカー時間×ストレージ |
| Amazon MQ (RabbitMQ) | AMQPメッセージング | マイクロサービス間非同期通信 | 常時稼働 | ブローカー時間×ストレージ |
| Amazon MWAA | Managed Airflow DAGスケジューリング | 複雑依存バッチ/MLパイプライン | DAG単位 | 環境時間+ワーカー数 |
設計判断の重要な境界線:
| 判断基準 | 選択肢A | 選択肢B | 判断ポイント |
|---|---|---|---|
| Standard vs Express | Standard | Express | 5分超または状態保存 → Standard / 高頻度・短時間 → Express |
| AppFlow vs Lambda | AppFlow | カスタムLambda | SaaS公式コネクタが存在 → AppFlow / 独自API → Lambda |
| MQ vs SQS | Amazon MQ | SQS | JMS/STOMP/AMQP必須 → MQ / AWSネイティブ → SQS |
| MWAA vs EventBridge | MWAA | EventBridge Scheduler | 複雑DAG依存 → MWAA / シンプルcron → EventBridge |
エラーハンドリング戦略の比較:
| サービス | リトライ機構 | 失敗時の通知 | 可視化 |
|---|---|---|---|
| Step Functions Standard | ASL Retry/Catch定義 | EventBridge → SNS | コンソールで実行ツリー確認可能 |
| Step Functions Express | ASL Retry/Catch定義 | CloudWatch Logs | 高頻度のため集計メトリクスで監視 |
| Amazon AppFlow | フロー設定でリトライ回数指定 | SNS通知設定 | Flow run履歴画面 |
| Amazon MQ | DeadLetterQueue設定 | CloudWatch Alarms | ActiveMQ管理コンソール |
| Amazon MWAA | DAG retry / task retry設定 | メールアラート / SNS | Airflow Web UI |
§8-2 — 全軸クロスリンク
Application Integration本番運用シリーズはVol1+Vol2で完結するが、実際の本番環境ではデータベース・ストレージ・コンテナ・サーバーレスなど複数の軸と組み合わせて使用する。以下のクロスリンクで関連軸の記事を参照し、統合的なAWSアーキテクチャ設計力を身につけよう。
- Application Integration本番運用 Vol1 (SQS / SNS / EventBridge / API GW / AppSync)
メッセージング統合プラットフォーム基盤 — SQS・SNS・EventBridge・API Gateway・AppSyncの5サービス完全ガイド - Application Integration本番運用 Vol2 (本記事)
Step Functions・AppFlow・Amazon MQ・MWAA — ワークフロー・データ統合・エンタープライズMQ・Airflow層
Vol1+Vol2 = Application Integration完全体 — 基盤メッセージング(Vol1)× オーケストレーション(Vol2)で全要件をカバー
- Serverless Vol1 (Lambda × API GW × Step Functions): Lambda×API GW×Step Functions入門 — サーバーレス本番運用の基礎
- Serverless Vol2 (EventBridge × SQS × SNS × Kinesis): EventBridge×SQS×SNS×Kinesis — イベントドリブンアーキテクチャ完全体
- Analytics Vol1 (Glue × Athena × Redshift): Glue×Athena×Redshift — AppFlowのS3出力先をAthenaで分析するパターンで特に参照
- Database Vol1 (RDS × Aurora × DynamoDB): RDS×Aurora×DynamoDB — Step FunctionsからDynamoDB SDK直接統合するパターンで参照
- Storage Vol1 (S3 × EFS × FSx): S3×EFS×FSx — AppFlowのデータ格納先・Step FunctionsのS3処理で参照
Application Integrationの真価はこれらの関連軸と組み合わせることで発揮される。AppFlow → S3 → Athena(Analytics)のパイプラインや、Step Functions → DynamoDB(Database)の直接統合など、軸をまたいだアーキテクチャ設計が本番環境の標準パターンとなる。
- Container Vol1 (ECS × Fargate × ECR): ECS×Fargate×ECR — Step FunctionsでECSタスクをオーケストレートするパターンで参照
- Container Vol2 (EKS × ArgoCD): EKS×ArgoCD — Kubernetes上のMWAA DAGデプロイで参照
- Migration Vol1 (DMS × MGN × Snow Family): DMS×MGN×Snow Family — オンプレMQからAmazon MQへの移行計画で参照
§8-3 — Application Integration二部作完成宣言
Vol1(基盤)+ Vol2(ワークフロー/統合)= App Integration完全体 — 第25軸二部作完成
Vol1で5つのメッセージングサービス(SQS/SNS/EventBridge/API Gateway/AppSync)を網羅し、Vol2で4つのオーケストレーションサービス(Step Functions/AppFlow/Amazon MQ/MWAA)を完全解説した。2本の記事を通じて、AWSのApplication Integration領域全体をカバーする体系的な知識を提供できた。
Vol1で習得したこと:
- SQS FIFO/標準キュー — 順序保証・冪等性・デッドレターキューの設計
- SNS Fan-out — トピック設計・フィルタポリシー・クロスアカウント配信
- EventBridge — ルール設計・スケジューラ・カスタムバス・パイプ
- API Gateway — REST/HTTP API・オーソライザ・スロットリング・VPC統合
- AppSync — GraphQL・リゾルバ・リアルタイムサブスクリプション・マッピングテンプレート
Vol2で習得したこと:
- Step Functions Standard — 長期実行・エラーハンドリング・分散マップ・コールバック
- Step Functions Express — 高頻度実行・SDK直接統合・同期実行
- Amazon AppFlow — SaaS統合・スケジュールトリガ・データ変換・PrivateLink
- Amazon MQ — ActiveMQ/RabbitMQ・Active/Standby・Network of Brokers・JMS/STOMP/MQTT
- Amazon MWAA — DAG設計・Worker/Scheduler・実行ロール・プラグイン管理
Vol1+Vol2の統合で実現したこと:
- API GW → SFN Express → SQS → Lambda の同期+非同期橋渡しパターン
- EventBridge → AppFlow → S3 → Step Functions のSaaS統合パイプライン
- SNS Fan-out → Amazon MQ → JMS のエンタープライズブリッジパターン
- MWAA → Step Functions Standard → 複合データパイプライン のハイブリッドオーケストレーション
本二部作を完読したCloud Architect / Backend Engineerは、AWSのApplication Integration領域における設計判断を自信を持って行えるようになる。「どのサービスを使うか」の迷いがなくなり、要件定義から本番運用まで一貫した設計方針を持てる状態が達成できる。
64記事化達成 — 第25軸二部作完成:
本記事(Vol2)の公開により、AWSハンズオンシリーズは第25軸の二部作を完成させ、累計64記事化を達成した。コンピューティング・ネットワーク・セキュリティ・データベース・ストレージ・コンテナ・サーバーレス・機械学習・アナリティクス・Application Integration など25の軸にわたる包括的なAWS本番運用ガイドが完成した。
設計力向上のための3つの視点:
Vol1+Vol2を学んだ後、さらに設計力を高めるには以下の3つの視点が有効である。
レイヤー思考: メッセージング層(Vol1)とオーケストレーション層(Vol2)を明確に分離した設計を意識する。各層の責務を明確にすることで、将来の変更に強いアーキテクチャが構築できる。
コスト最適化: Step Functions ExpressとStandard、Amazon MQのブローカーサイズ、MWAAの環境クラスは要件に応じて適切に選択する。設計段階でAWS Pricing Calculatorを用いたコスト見積もりを行うことが推奨される。
可観測性: 各サービスのCloudWatch連携(Step Functions実行ログ、AppFlow実行レポート、MQブローカーメトリクス、MWAA DAGログ)を事前に設定し、本番環境での障害対応を迅速化する。
§8-4 — 読者アクションリスト
本記事の内容を自分のプロジェクトに活かすために、以下のアクションリストを実践しよう。
Step Functions習得アクション:
- [ ] Step Functions Standard/Expressのコスト計算ツールで自プロジェクトの料金を試算する
- [ ] AWSコンソールでサンプルASLステートマシンを作成し、実行履歴の可視化を体験する
- [ ] 既存Lambda Chainをステートマシンに書き換え、エラーハンドリングの改善効果を確認する
- [ ] 分散マップ(Distributed Map)で大規模並列処理のパフォーマンス検証を実施する
Amazon AppFlow習得アクション:
- [ ] Salesforce Developer Edition(無料)でAppFlowの接続フローをトライアル実行する
- [ ] S3出力先のParquetフォーマット変換を設定し、Athenaでのクエリを試す
- [ ] EventBridge連携を設定し、AppFlow完了トリガーでLambdaが自動起動することを確認する
- [ ] プライベート接続設定を検討し、社内データソースとの安全な統合を計画する
Amazon MQ習得アクション:
- [ ] Amazon MQ for ActiveMQをSingle-instance Brokerで起動し、JMSクライアントで接続テストする
- [ ] Active/Standby構成でフェイルオーバーをシミュレートし、切り替え時間を計測する
- [ ] 既存のオンプレミスActiveMQとAmazon MQのNetwork of Brokersを構成し、メッセージ転送を確認する
- [ ] RabbitMQ版でAMQP接続をテストし、既存RabbitMQアプリの移行可能性を評価する
Amazon MWAA習得アクション:
- [ ] ローカルのApache Airflowで動作確認済みのDAGをMWAAに移行し、依存関係の差異を確認する
- [ ] MWAAとS3の連携でDAGファイルの自動同期を設定し、GitOpsパターンを構築する
- [ ] StepFunctionStartExecutionOperatorを使用してStep FunctionsをMWAAから起動するDAGを作成する
- [ ] MWAA Webサーバーへのアクセス制御(VPC Private/Public)を要件に合わせて設定する
Vol1×Vol2統合 実践アクション:
- [ ] §7で解説した4パターンのうち、自プロジェクトに最も近いパターンを1つ選んでプロトタイプを作成する
- [ ] 選定フローチャートを使って現在の設計を評価し、より適切なサービス組み合わせがないか検討する
- [ ] Vol1のEventBridgeルールとVol2のAppFlow/Step Functionsを繋ぐイベントドリブンパイプラインを設計書に落とす
- [ ] AWS Well-Architected ToolでApplication Integrationの設計をレビューし、改善ポイントを把握する
§8-5 — 公式ドキュメント & クロスリンク
本記事で解説した4サービスの公式ドキュメントと関連記事へのリンクをまとめる。実装時の詳細仕様や最新情報は常に公式ドキュメントを参照することを推奨する。
各サービスの公式ドキュメントには本記事で触れられなかった高度な機能(Step FunctionsのVersioning・Alias管理、AppFlowのカスタムコネクタ開発、Amazon MQのネットワーク暗号化設定、MWAAのカスタムプラグイン開発)が詳細に解説されている。本番環境で問題が発生した際は、公式ドキュメントのトラブルシューティングセクションとAWS re:Postコミュニティを優先的に参照すること。
AWS Step Functions 公式ドキュメント — ASL / Standard/Express / エラーハンドリング
Amazon MQ 公式ドキュメント — ActiveMQ / RabbitMQ / Network of Brokers
Application Integration本番運用 Vol1 — SQS × SNS × EventBridge × API GW × AppSync
Amazon MWAA 公式ドキュメント — Airflow DAG / Worker / Scheduler / 実行ロール
Application Integration Vol1+Vol2 二部作完成 — AWSのメッセージング・ワークフロー・データ統合・エンタープライズMQ・スケジュールバッチの全領域をカバーする包括的なガイドが完成した。本二部作で身につけた設計力を活かして、エンタープライズ規模のAWS統合アーキテクチャを自信を持って設計・実装しよう。
次のステップ — 学習ロードマップ:
Application Integration Vol1+Vol2の学習完了後は、以下のロードマップで知識を体系化することを推奨する。
- 即時実践: 本記事のコードサンプルをAWSコンソールまたはCLIで試す(まずStep Functionsサンプルステートマシンから)
- 実プロジェクト適用: 既存のLambda Chain/直接API呼び出しをStep Functionsに段階的に移行する
- 隣接軸の習得: Analytics(Glue×Athena)を学び、AppFlowとの統合パイプラインを構築する
- アーキテクチャレビュー: AWS Well-Architected Frameworkのサーバーレスレンズを参照し、設計を評価する
- コスト最適化: 本番稼働後3ヶ月で利用パターンを分析し、Standard/Expressの使い分けを最適化する
Application IntegrationはAWSアーキテクチャの中核となる領域であり、全サービスの設計品質に直結する。本二部作を基盤として、AWS Certified Solutions Architect – Professional(SAP-C02)レベルのアーキテクチャ設計力を継続的に磨いていこう。
最後に — Application Integration設計力の本質:
本番運用で真に価値が出るのは、個別サービスの知識よりも「どのサービスをどの組み合わせで使うか」という判断力だ。本二部作で体系化した設計パターン・選定フローチャート・統合アーキテクチャを手元に置き、新しい要件が来るたびに参照することで、設計力は確実に蓄積されていく。
SQSがあるからStep Functionsは不要ではなく、Step FunctionsがあるからSQSは不要でもない。Vol1とVol2は競合ではなく補完関係にある。この相補的な設計思想こそが、Application Integration完全体の核心である。