NO IMAGE

AWS Config×SSM Automation Terraform drift 自動修復

NO IMAGE
目次

1. この記事について

AWS パラメーターシート自動化シリーズ

本シリーズは「Terraform で設計した AWS インフラをいかに正確に運用し続けるか」という問いに、3段階で答えます。

  • 第1弾(設計の可視化): Terraform コードから AWS パラメーターシート(Excel)を自動生成し、設計レビューや社内共有を効率化する仕組みを構築しました。
  • 第2弾(設計整合の検証): AWS Config の実リソース情報とパラメーターシートを突合する単体テスト(pytest)を整備し、構築後の受入検証を自動化しました。
  • 第3弾(本記事・drift の修復): 第2弾で「検知」した drift を、SSM Automation の runbook で実際に「是正」し、事後検証まで含めた閉ループを完成させます。

シリーズ3段階の物語

第1弾: 設計を可視化する
  └─ Terraform コード → Excel パラメーターシート
  ↓ レビュー・社内共有
第2弾: 設計との整合を継続検証する
  └─ AWS Config × pytest → Verdict(OK/NG/UNKNOWN)
  ↓ Verdict.NG = drift 検知
第3弾(本記事): drift を閉ループで自動修復する
  └─ Verdict.NG → Manual Approval → SSM Automation runbook
  → 事後検証 → Verdict.OK → 閉ループ完成

第2弾の Verdict.NG が本記事の入口です。検知された drift リソースを SSM Automation の runbook に渡し、承認フローを経て自動修復—さらに修復後の再検証まで自動化することで、「設計 → 検証 → 修復」の3段階が初めて完結します。

本記事で作れるようになるもの

本記事を通じて、以下の仕組みを構築できます。

機能内容
AWS Config Remediation 設定Terraform で aws_config_remediation_configuration を定義し、ルール違反を検知したら SSM Automation runbook を自動起動
Manual Approval フローaws:approve ステップで人の承認を必須とし、想定外の自動修復を防止
カスタム runbook単純なリソース修正では対応できない「state 突合が必要な修復」をスクリプトステップで実装
修復後の自動再検証修復完了後に第2弾の compare() を再実行し、Verdict.OK になるまで追跡
RemediationRow dataclass修復前後の状態・承認者・エラー情報を型安全に管理するデータ構造

fig01

第2弾未読の方へ

本記事では §3 で AWS Config Remediation Configuration の基礎から解説します。また、第2弾の検証ロジック(compare() / DiffRow)を本記事でも使用しますが、利用箇所ではコードを再掲しますので、第2弾未読でも手順を追えるよう構成しています。

第2弾の単体テスト自動化からシリーズを読み始めたい方は、第2弾 § 2 〜 §4 を先にご覧いただくことをお勧めします。

最終回か否か?

本記事で「設計 → 検証 → 修復」の基本3段階は完成します。ただし、マルチ AWS アカウント統制・Sentinel / OPA を使った policy-as-code 統合・GHA OIDC と Terraform Cloud を組み合わせた高度な CI/CD パイプラインへの発展は、本シリーズとは別の文脈で取り組む余地があります。本記事はその出発点として、「運用ツールとして独立完結する修復自動化」に集中します。

前提知識

前提知識(必読):

関連シリーズ:

  • AWS×Terraform 複数人開発シリーズ 全3弾 — Manual Approval を GHA OIDC と連携させる発展編への誘導

この記事の構成

全11セクションで、AWS Config × SSM Automation の修復自動化を段階的に解説します。

セクションタイトル内容の概要
§1(本節)この記事についてシリーズ概要・前提知識・学習ゴール
§2業務背景drift 放置リスク・手動修復コスト・閉ループ化の意義
§3AWS Config Remediation 基礎Remediation Configuration と SSM Automation の関係
§4SSM Automation runbook 設計修復 runbook の設計パターン・ステップ構成
§5Terraform リソース定義config_rule・remediation_configuration・ssm_document の定義
§6カスタム runbook 実装state 突合が必要な修復の runbook スクリプト実装
§7修復ロジック実装(Python)RemediationRow dataclass・状態遷移・ロールバック
§8pytest による修復検証pre/post スナップショット・修復後の Verdict 検証
§9運用通知の設計SNS / Slack 通知・承認フローのオペレーション
§10運用コスト・セキュリティ考慮SSM Automation 課金・IAM 最小権限・audit trail
§11まとめと今後の展開シリーズ完結・発展編への誘導

2. 業務背景: 検知から修復までを閉ループ化する意義

2.1 「検知できた、その次」の現実

第2弾では pytest × AWS Config で drift を Verdict.NG として検知する仕組みを構築しました。しかし「検知できた」だけでは運用フローは完結しません。

典型的なエンタープライズ案件において、検知後に実際に起きることを整理してみます。

Verdict.NG 検出後の手動フロー(Before):
──────────────────────────────────────
1. pytest が DiffRow リスト(差分一覧)を出力
2. 担当者がリストを確認・原因調査(5〜15分/件)
3. AWS マネジメントコンソールまたは CLI でリソース修正(2〜10分/件)
4. Terraform state と実態の乖離があれば手動で terraform refresh + plan(10〜20分/件)
5. Excel パラメーターシートを手動更新(5〜10分/件)
6. 修正完了後に再度 pytest を実行して Verdict.OK を確認(3〜5分/件)
7. 変更記録を社内チケットシステムに記入(5〜10分/件)

合計(1件あたり): 約 30〜70分

これが「検知しただけ」で終わる世界のコストです。

2.2 規模が大きくなると何が起きるか

10 環境 × 50 項目を管理するプロジェクトを例に考えます。

状況工数
drift 件数: 20件(10環境 × 50項目のうち 4%)
手動修復: 1件あたり平均 10分 × 20件200分(約3時間20分)
修復後の再検証 + Excel 更新: 1件あたり 5分 × 20件100分
チケット記録: 1件あたり 5分 × 20件100分
合計400分(約6時間40分)
自動化後: runbook 起動 + Manual Approval + 事後検証約 3〜5分/件 = 60〜100分(大部分は待機)

手作業対比で約4〜6倍のコスト削減になります。しかも手作業には「修正漏れ」「設定ミス」「state 不整合」というリスクが常に存在します。

fig02

2.3 ドリフトを放置すると何が起きるか

エンタープライズ案件でドリフトが長期間放置された場合、以下のリスクが顕在化します。

監査・コンプライアンスリスク

  • ISO 27001 / SOC2 審査で「設計通りの構成か」を問われたとき、実態との乖離が発覚する。
  • 「Terraform コードに記載はあるが実リソースは別設定」という状態は、設計書と実環境の不整合として指摘される。
  • 特に暗号化設定(S3 SSE・RDS 暗号化)・アクセスログ有効化の drift は高リスク項目として扱われる。

セキュリティリスク

  • S3 パブリックアクセスブロックの drift(terraform: true → 実態: false)は即座にデータ漏洩リスクになる。
  • セキュリティグループの egress 過剰開放は lateral movement の経路になりうる。
  • KMS キーポリシーの drift は暗号化の意図を無効化する。

運用コストリスク

  • 想定外のインスタンスタイプ変更(terraform: t3.small → 実態: t3.xlarge)が請求サイクル末に発覚するケースがある。
  • マルチリージョン展開の場合、修正漏れが複数リージョンに伝播し、修正コストが指数的に増大する。

Terraform state 不整合リスク

  • 手動でリソースを修正した場合、Terraform state と実態が乖離する。
  • 次回 terraform plan 時に意図しない差分が表示され、誤った apply で設定が巻き戻るリスクがある。
  • この問題を避けるため、本記事では「runbook 直接修復」だけでなく「Terraform 経由の修復」を第一選択として設計します。

2.4 CI/CD ではなく「運用ツール」として設計する理由

drift の自動修復を「CI/CD パイプラインに組み込む」アプローチも存在します。しかし本シリーズでは意図的にこれを採用しません。

その理由は第2弾でも説明した「操作の重み」思想にあります。

CI/CD 組込みアプローチの問題点:
  - PR マージのたびに修復が自動実行される
  - 人間が「どのリソースが、なぜ修復されたか」を把握しにくい
  - Terraform state との整合性確認が pipeline の速度に追われて省略されやすい
  - 本番環境の修復を「自動化だから安全」と過信するリスク

本記事の運用ツールアプローチ:
  - drift 検知は定期 / イベント駆動で実行
  - 修復は必ず Manual Approval を経由(aws:approve ステップ)
  - 承認者が「何を、なぜ、どう修復するか」を確認してから実行
  - 修復後に terraform refresh + plan を実行して state 整合性を確認

Manual Approval が必須である技術的理由は3つあります。

  1. Terraform state 保護: AWS リソースを直接修正すると state との乖離が生じる。修復前に「この修正は Terraform 経由で行うか、runbook 直接修復で行うか」を人間が判断する必要がある。

  2. 不可逆操作の防止: 暗号化キーの変更・セキュリティグループの削除など、一部の修復操作は元に戻せない。自動実行ではなく人の判断を必須にすることで事故を防ぐ。

  3. 監査証跡の担保: 誰が・いつ・何を承認したかが SSM Automation の実行履歴に記録され、監査対応に使用できる。

2.5 AWS Config Remediation を初めて使う方へ

「AWS Config は使っているが Remediation Configuration は触ったことがない」という方も多いと思います。

AWS Config の Remediation Configuration とは、「あるルールが NON_COMPLIANT になったとき、どの SSM Automation runbook を実行するか」を紐付ける設定です。Terraform では aws_config_remediation_configuration リソースとして定義できます。

SSM Automation については、AWS が多数の managed runbook(AWS-EnableS3BucketEncryption など)を提供しており、単純なリソース修正であれば独自実装なしで利用できます。より複雑な修復(state 突合・複合条件)はカスタム runbook として SSM Document に定義します。

本記事 §3 で基礎から丁寧に解説しますので、Remediation Configuration が初めてでも手順を追えます。

2.6 閉ループ化で実現できること

本記事のゴールである「閉ループ」を整理します。

閉ループのサイクル(After):
────────────────────────────────────────
[検知]
  AWS Config Rule → NON_COMPLIANT
  ↓
[通知]
  SNS / Slack → 担当者に通知
  ↓
[確認・承認]
  担当者が DiffRow リスト確認 → Manual Approval(3〜5分)
  ↓
[修復実行]
  SSM Automation runbook → AWS API 呼び出しでリソース修正
  ↓
[事後検証]
  pytest + compare() → Verdict.OK 確認
  ↓
[記録]
  RemediationRow が自動生成 → 修復前後の状態・承認者・実行 ID を保存
  ↓
[state 整合確認]
  terraform refresh + plan → 次回 apply 時の意図しない差分を排除
────────────────────────────────────────
人の介在: Manual Approval のみ(5分以内)
自動化範囲: 検知 → 通知 → runbook 実行 → 事後検証 → 記録

この閉ループを構築することで、「drift を放置しない」インフラ運用の基盤が完成します。次 §3 では、この閉ループを支える AWS Config Remediation Configuration と SSM Automation の関係を基礎から解説します。

3. AWS Config Remediation 基礎 — Remediation Configuration と SSM Automation の関係

🔰 Remediation 初挑戦の方へ
「Remediation って難しそう」と感じている方、大丈夫です。本章では AWS Config の “ルール違反検知 → 自動or手動修復” の仕組みを、Terraform コードと IAM 設定を含めてゼロから丁寧に解説します。
Config Rule がトリガーになって SSM Automation runbook が走るという 2段ロケット構造 を掴めれば、あとはパラメーター調整だけです。

3-1. Config Rule の compliance state — 4 種類と Remediation の関係

AWS Config ルールは、対象リソースを評価するたびに compliance state を返します。
この state によって Remediation が起動するかどうかが決まります。

compliance state意味Remediation 発動
COMPLIANTルールを満たしているしない
NON_COMPLIANTルール違反する(Automatic or Manual)
ERROR評価中にエラー発生(権限不足等)しない
INSUFFICIENT_DATA評価データが不足(リソース作成直後など)しない

NON_COMPLIANT のみが Remediation のトリガーになりますERROR / INSUFFICIENT_DATA は対象外のため、IAM 権限ミスや評価タイムラグは別途モニタリングしてください。

# 現在の compliance state を CLI で確認
aws configservice describe-compliance-by-config-rule \
  --config-rule-names s3-bucket-server-side-encryption-enabled \
  --compliance-types NON_COMPLIANT \
  --query 'ComplianceByConfigRules[*].{Rule:ConfigRuleName,State:Compliance.ComplianceType}' \
  --output table

3-2. RemediationConfiguration の 2 系統 — Automatic vs Manual

Config ルール違反を検知したとき、どう修復するかを定義するのが RemediationConfiguration です。
設定できる修復モードは 2 系統あります。

Automatic Remediation(automatic = true

NON_COMPLIANT を検知した瞬間、AWS が自動的に runbook を実行します。
設定が最もシンプルで、一見便利に見えます。しかし本記事では Automatic は使いません。

Manual Remediation(automatic = false)← 本記事の採用方式

NON_COMPLIANT を検知しても runbook は自動実行されません。
オペレーターが AWS コンソールまたは CLI から明示的に “修復を実行” を選択したときだけ runbook が走ります。


3-3. Automatic Remediation の危険性 ─ なぜ Manual を選ぶか

「自動修復なら楽でいいじゃないか」 — その誘惑に乗ってはいけません。

危険 1: Terraform state の不整合

Automatic Remediation が S3 バケット暗号化を有効化した場合、AWS リソースの実体は変わりますが Terraform の tfstate は古い値のまま です。
次の terraform plan で Terraform は「実体と state が食い違っている」と判断し、意図しない変更差分が出ます。
最悪の場合、次の terraform apply で修復した設定が上書きされて元の違反状態に戻ります。

[時系列]
T+0Config が NON_COMPLIANT 検知
T+1Automatic Remediation が S3 暗号化を ON
T+2tfstate は暗号化=OFF のまま(state ファイル未更新)
T+3CI/CD の terraform apply が実行 → 暗号化=OFF に戻す 😱

危険 2: 連鎖修復(Cascade Remediation)

あるリソース修復 → 別の Config Rule が NON_COMPLIANT 化 → その Remediation が実行 → さらに別のリソースが変わる…という連鎖が止まらず、本番環境ではトレースに数時間かかります。

危険 3: コスト暴走

変更トリガーの評価頻度が高い場合、同じリソースが繰り返し評価 → 繰り返し修復 → SSM ステップ課金が積み重なります。

Manual Remediation ならオペレーターが実行前に内容を確認できるため、上記リスクを全て回避できます。 これが本記事(殿要件)が automatic = false を徹底する理由です。


3-4. AWS-managed runbook 代表例一覧

AWS が公式提供している AWS-managed runbook は数百種類あります。
よく使われる代表的な 10 種類を以下にまとめます。

runbook 名対象リソース実施内容
AWS-EnableS3BucketEncryptionS3 バケットサーバーサイド暗号化(SSE-S3/SSE-KMS)を有効化
AWS-ConfigureS3BucketLoggingS3 バケットアクセスログ記録を有効化・ログ先バケット設定
AWS-EnableS3BucketPublicAccessBlockS3 バケットパブリックアクセスブロックを有効化
AWS-EnableCloudTrailLogFileValidationCloudTrailログファイルの整合性検証を有効化
AWS-EnableVpcFlowLogsVPCVPC フローログを CloudWatch Logs または S3 へ有効化
AWS-EnableRDSBackupRDS インスタンス自動バックアップを有効化・バックアップ保持期間設定
AWS-EnableMultiAZOnRDSInstanceRDS インスタンスMulti-AZ 配置を有効化
AWS-DisablePublicAccessToRDSInstanceRDS インスタンスパブリックアクセスを無効化
AWS-ConfigureEbsEncryptionByDefaultEBS ボリュームアカウントレベルの EBS デフォルト暗号化を有効化
AWS-EnableKeyRotationKMS キー年次キーローテーションを有効化
# AWS-managed runbook の詳細を確認
aws ssm describe-document \
  --name "AWS-EnableS3BucketEncryption" \
  --query 'Document.{Name:Name,Schema:SchemaVersion,Platform:PlatformTypes}'

3-5. SSM Automation runbook の基本形

SSM Automation の runbook は YAML(または JSON)で記述します。
schemaVersion: "0.3" が Automation 専用のバージョンです(2.2 は Run Command 用なので混同注意)。

# runbooks/enable-s3-encryption.yaml
description: "S3 バケットのサーバーサイド暗号化を有効化する"
schemaVersion: "0.3"
assumeRole: "{{ AutomationAssumeRole }}"

parameters:
  BucketName:
 type: String
  AutomationAssumeRole:
 type: String
 default: ""
  SSEAlgorithm:
 type: String
 default: "aws:kms"
 allowedValues: ["aws:kms", "AES256"]

mainSteps:
  - name: CheckCurrentEncryption
 action: aws:assertAwsResourceProperty
 inputs:
Service: s3
Api: GetBucketEncryption
Bucket: "{{ BucketName }}"
PropertySelector: "$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm"
DesiredValues: [""]
 onFailure: "step:EnableEncryption"
 isEnd: true

  - name: EnableEncryption
 action: aws:executeAwsApi
 description: "S3 バケット暗号化を有効化"
 inputs:
Service: s3
Api: PutBucketEncryption
Bucket: "{{ BucketName }}"
ServerSideEncryptionConfiguration:
  Rules:
 - ApplyServerSideEncryptionByDefault:
  SSEAlgorithm: "{{ SSEAlgorithm }}"
 isEnd: true

mainSteps の主要アクションを用途別に整理します:

アクション用途
aws:assertAwsResourcePropertyAWS リソースのプロパティを検証(前提条件確認)
aws:executeAwsApiAWS API を直接呼び出して設定変更
aws:executeScriptPython / PowerShell スクリプトを実行(複雑なロジック)
aws:waitForAwsResourcePropertyリソースの状態変化を待機(非同期操作)
aws:approve人間の承認(Manual Approval)を要求

3-6. Terraform 最小コード例 — aws_config_remediation_configuration

Terraform で RemediationConfiguration を定義する最小構成です。

# terraform/modules/config-remediation/main.tf
# Terraform 1.9.x / hashicorp/aws ~> 5.0

# --- Config Rule(S3 暗号化検証)---
resource "aws_config_config_rule" "s3_encryption" {
  name = "s3-bucket-server-side-encryption-enabled"

  source {
 owner = "AWS"
 source_identifier = "S3_BUCKET_SERVER_SIDE_ENCRYPTION_ENABLED"
  }

  depends_on = [aws_config_configuration_recorder.main]
}

# --- Remediation Configuration(Manual・automatic=false 必須)---
resource "aws_config_remediation_configuration" "s3_encryption" {
  config_rule_name = aws_config_config_rule.s3_encryption.name
  resource_type = "AWS::S3::Bucket"
  target_type= "SSM_DOCUMENT"
  target_id  = "AWS-EnableS3BucketEncryption"

  # ★ automatic = false 必須(殿要件・Terraform state 整合性保護)
  automatic = false

  # runbook に渡すパラメーター
  parameter {
 name= "BucketName"
 resource_value = "RESOURCE_ID"
  }

  parameter {
 name= "AutomationAssumeRole"
 static_value = aws_iam_role.config_remediation.arn
  }

  parameter {
 name= "SSEAlgorithm"
 static_value = "aws:kms"
  }
}

parameter ブロックには 2 種類の値指定があります:

指定方法キー説明
resource_value = "RESOURCE_ID"resource_valueConfig が評価した対象リソースの ID を自動セット
static_value = "..."static_value固定値を直接指定

3-7. IAM Role の設計 — Config → SSM の信頼関係

Remediation が実行されるとき、AWS Config は IAM ロールを「引き受けて(AssumeRole)」SSM Automation を呼び出します。
このロールの設計が正しくないと、修復が権限エラーで失敗します。

# IAM ロール: Config Remediation 用
resource "aws_iam_role" "config_remediation" {
  name = "config-remediation-role"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect = "Allow"
  Principal = {
 # ★ Config が AssumeRole できるよう ssm.amazonaws.com も追加
 Service = [
"ssm.amazonaws.com",
"config.amazonaws.com"
 ]
  }
  Action = "sts:AssumeRole"
}
 ]
  })

  tags = {
 ManagedBy = "terraform"
  }
}

# インラインポリシー: SSM Automation 実行 + 対象リソース操作
resource "aws_iam_role_policy" "config_remediation_ssm" {
  name = "config-remediation-ssm-policy"
  role = aws_iam_role.config_remediation.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Sid = "SSMAutomation"
  Effect = "Allow"
  Action = ["ssm:StartAutomationExecution", "ssm:StopAutomationExecution", "ssm:GetAutomationExecution"]
  Resource = "arn:aws:ssm:ap-northeast-1:123456789012:automation-execution/*"
},
{
  Sid = "S3Remediation"
  Effect = "Allow"
  Action = ["s3:PutBucketEncryption", "s3:GetBucketEncryption", "s3:PutBucketPublicAccessBlock"]
  Resource = "arn:aws:s3:::*"
},
{
  Sid= "PassRoleToSSM"
  Effect= "Allow"
  Action= "iam:PassRole"
  Resource = "arn:aws:iam::123456789012:role/config-remediation-role"
}
 ]
  })
}

IAM の信頼関係を図で確認します:

[AWS Config]
 │
 │ AssumeRole(config.amazonaws.com)
 ▼
[config-remediation-role]
 │
 │ StartAutomationExecution
 ▼
[SSM Automation]
 │
 │ 実際の AWS API 呼び出し(S3, RDS, VPC...)
 ▼
[対象リソース]

3-8. Remediation 実行フローの全体像

fig03

上図の流れを CLI コマンドで追います。

# NON_COMPLIANT なリソースを特定
aws configservice get-compliance-details-by-config-rule \
  --config-rule-name s3-bucket-server-side-encryption-enabled \
  --compliance-types NON_COMPLIANT \
  --query 'EvaluationResults[*].EvaluationResultIdentifier.EvaluationResultQualifier.ResourceId' \
  --output text

# Manual Remediation を実行
aws configservice start-remediation-execution \
  --config-rule-name s3-bucket-server-side-encryption-enabled \
  --resource-keys '[{"resourceType":"AWS::S3::Bucket","resourceId":"my-bucket-name"}]'

# 実行結果を確認
aws configservice describe-remediation-execution-statuses \
  --config-rule-name s3-bucket-server-side-encryption-enabled \
  --query 'RemediationExecutionStatuses[*].{Resource:ResourceKey.ResourceId,State:State}' \
  --output table

3-9. コスト感 — Remediation Configuration は無料

💰 コスト整理

| 課金対象 | 単価 |
|———|——|
| Remediation Configuration 定義 | 無料 |
| SSM Automation ステップ | 月 1,000 step まで無料 / 以降 $0.002/step |

1 runbook が平均 10 ステップなら月 100 回の修復まで無料。Automatic Remediation で毎分自動実行が走ると無料枠をすぐ使い切るため、Manual + 承認フローが課金制御にも有効です。


まとめ: Section 3 のポイント

項目本記事の選択理由
Remediation モードManualautomatic = falsetfstate 整合性・連鎖修復・コスト暴走リスク回避
runbook 選択AWS-managed 優先・カスタムは §6 で解説保守コスト最小化
IAM 設計ssm.amazonaws.com + config.amazonaws.com の二重 PrincipalSSM Automation 実行に両方必要
修復後作業必ず terraform refresh / terraform plan -refresh-onlytfstate と実体の乖離解消

次の §4 では SSM Automation runbook の内部構造(mainSteps の各アクション・ログ・エラーハンドリング)を深掘りします。

4. SSM Automation 深掘り — runbook 構造・権限設計・ログ

Section 3 では AWS Config Remediation Configuration の全体像を把握しました。このセクションでは「修復の実行エンジン」である SSM Automation runbook の内部構造を深く掘り下げます。カスタム runbook を自分で書けるようになることで、AWS-managed runbook が対応していない複合条件の修復も自動化できます。


4-1. schemaVersion の選択

SSM Document には複数の schemaVersion があります。混同しやすいため最初に整理します。

# schemaVersion の選択指針
schemaVersion: '0.3'# ← Automation(本記事はこれ一択)
# schemaVersion: '2.2'  # RunCommand(EC2 上でコマンド実行・今回は使わない)
# schemaVersion: '0.3'が Automation 専用。Remediation Configuration には必ず 0.3 を使う。
schemaVersion用途対象
0.3Automation(本記事)AWS リソースの状態変更・API 呼び出し・Python スクリプト実行
2.2RunCommandEC2 インスタンス上のシェル / PowerShell コマンド実行
1.2旧式 RunCommand非推奨・新規利用禁止

AWS Config Remediation Configuration は Automation(0.3)のみ サポートしています。


4-2. mainSteps の 20 種アクション(主要5種詳説)

SSM Automation の mainSteps には 20 種を超えるアクションが用意されています。本記事で使う主要 5 種を詳説します。

mainSteps アクション分類

【検証系】
  aws:assertAwsResourceProperty← ★本記事多用(事前条件チェック)
  aws:waitForAwsResourceProperty  ← 非同期処理完了待ち(例: EC2 running まで待つ)

【実行系】
  aws:executeAwsApi← ★本記事多用(直接 API 呼び出し)
  aws:executeScript← ★本記事多用(Python で複雑なロジックを書く)
  aws:invokeLambdaFunction  ← Lambda 経由(重い処理・外部依存がある場合)

【フロー制御系】
  aws:approve← Manual Approval(§7 で詳述)
  aws:branch ← 条件分岐(if 文相当)
  aws:sleep  ← 指定秒数待機

【リソース系】
  aws:createStack  ← CloudFormation スタック作成
  aws:deleteStack  ← CloudFormation スタック削除

aws:assertAwsResourceProperty — 事前条件チェック

- name: assertEncryption
  action: 'aws:assertAwsResourceProperty'
  onFailure: 'step:enableEncryption'# 失敗したら enableEncryption ステップへ
  isCritical: false# このステップ単独では abort しない
  inputs:
 Service: s3
 Api: GetBucketEncryption
 Bucket: '{{BucketName}}'
 PropertySelector: '$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm'
 DesiredValues:
- 'AES256'
- 'aws:kms'

PropertySelectorjq ライクな JSON パスで AWS API レスポンスの属性を指定します。DesiredValues に複数値を列挙すると OR 条件になります。

aws:executeAwsApi — 直接 API 呼び出し

- name: enableEncryption
  action: 'aws:executeAwsApi'
  inputs:
 Service: s3
 Api: PutBucketEncryption
 Bucket: '{{BucketName}}'
 ServerSideEncryptionConfiguration:
Rules:
  - ApplyServerSideEncryptionByDefault:
SSEAlgorithm: 'AES256'

Service は boto3 のクライアント名(s3, ec2, rds…)、Api はメソッド名(PascalCase)です。boto3 ドキュメントそのままの引数が使えます。

aws:executeScript — Python runtime の利用

- name: fetchCurrentConfig
  action: 'aws:executeScript'
  inputs:
 Runtime: 'python3.11'
 Handler: 'script_handler'
 InputPayload:
BucketName: '{{BucketName}}'
 Script: |
import boto3
import json

def script_handler(events: dict, context) -> dict:
 """S3 バケットの現在の暗号化設定を取得して返す。"""
 bucket = events["BucketName"]
 s3 = boto3.client("s3")
 try:
  resp = s3.get_bucket_encryption(Bucket=bucket)
  sse = resp["ServerSideEncryptionConfiguration"]["Rules"][0]
  return {"EncryptionEnabled": True, "Algorithm": sse["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]}
 except s3.exceptions.from_code("ServerSideEncryptionConfigurationNotFoundError"):
  return {"EncryptionEnabled": False, "Algorithm": ""}
  outputs:
 - Name: EncryptionEnabled
Selector: '$.Payload.EncryptionEnabled'
Type: Boolean
 - Name: Algorithm
Selector: '$.Payload.Algorithm'
Type: String

重要ポイント:
Handlerdef script_handler(events, context): と一致させる
events には InputPayload の内容が dict として渡される
outputs で後続ステップが参照できる変数を定義する


4-3. カスタム runbook YAML フル(mainSteps 3 段構成)

S3 バケットのサーバーサイド暗号化を有効化するカスタム runbook の完成形です。

# runbooks/remediate-s3-encryption.yaml
schemaVersion: '0.3'
description: |
  S3 バケットのサーバーサイド暗号化を確認・有効化する。
  手順:
 1. assertEncryption— 既に暗号化済なら終了(冪等性保証)
 2. enableEncryption— AES-256 で暗号化を有効化
 3. verifyEncryption— 有効化の確認(サニティチェック)
assumeRole: '{{AutomationAssumeRole}}'

parameters:
  BucketName:
 type: String
 description: '対象 S3 バケット名'
 allowedPattern: '^[a-z0-9][a-z0-9\-]{1,61}[a-z0-9]$'
  AutomationAssumeRole:
 type: String
 description: 'SSM Automation が引き受ける IAM ロール ARN'
 allowedPattern: '^arn:aws:iam::\d{12}:role/.+$'
  SnsTopicArn:
 type: String
 description: '修復完了通知先の SNS トピック ARN(省略可)'
 default: ''

mainSteps:
  # Step 1: 事前確認(既に暗号化済なら Step 3 へスキップ)
  - name: assertEncryption
 action: 'aws:assertAwsResourceProperty'
 onFailure: 'step:enableEncryption'# チェック失敗 = 未暗号化 → Step 2 へ
 isCritical: false
 inputs:
Service: s3
Api: GetBucketEncryption
Bucket: '{{BucketName}}'
PropertySelector: '$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm'
DesiredValues:
  - 'AES256'
  - 'aws:kms'
 nextStep: verifyEncryption  # チェック成功 = 既に暗号化済 → Step 3 へ

  # Step 2: 暗号化有効化
  - name: enableEncryption
 action: 'aws:executeAwsApi'
 onFailure: Abort
 inputs:
Service: s3
Api: PutBucketEncryption
Bucket: '{{BucketName}}'
ServerSideEncryptionConfiguration:
  Rules:
 - ApplyServerSideEncryptionByDefault:
  SSEAlgorithm: 'AES256'
BucketKeyEnabled: true
 nextStep: verifyEncryption

  # Step 3: 修復後の確認(サニティチェック)
  - name: verifyEncryption
 action: 'aws:assertAwsResourceProperty'
 onFailure: Abort
 isCritical: true
 inputs:
Service: s3
Api: GetBucketEncryption
Bucket: '{{BucketName}}'
PropertySelector: '$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm'
DesiredValues:
  - 'AES256'
  - 'aws:kms'
 isEnd: true

この 3 段構成が 冪等性 を担保します。Step 1 で「既に修復済」と判断されれば Step 2 をスキップして Step 3 の確認だけ行い、同じ runbook を何度実行しても副作用が出ません(§7 で詳述)。


4-4. assumeRole の設計 — 3 段信頼関係

SSM Automation の権限設計でもっとも複雑なのが assumeRole まわりです。

3 段信頼関係

[Config Service]
 │
 │  (1) Remediation Configuration を実行
 │→ AWS Config が SSM Automation を StartExecution
 ▼
[SSM Service]
 │
 │  (2) SSM が assumeRole で IAM ロールを引き受ける
 │→ runbook 内の API 呼び出し権限を取得
 ▼
[AutomationAssumeRole]  ←←← ここが runbook の {{AutomationAssumeRole}}
  iam:role/ssm-remediation-role
 │
 │  (3) 対象リソースの操作 API を呼び出す
 │s3:PutBucketEncryption / ec2:ModifyInstance 等
 ▼
[対象 AWS リソース]
  S3 / EC2 / RDS ...

信頼ポリシーの設定:

{
  "Version": "2012-10-17",
  "Statement": [
 {
"Effect": "Allow",
"Principal": {
  "Service": [
 "ssm.amazonaws.com",
 "config.amazonaws.com"
  ]
},
"Action": "sts:AssumeRole",
"Condition": {
  "StringEquals": {
 "aws:SourceAccount": "123456789012"
  }
}
 }
  ]
}

config.amazonaws.comssm.amazonaws.com の両方を信頼します。aws:SourceAccount 条件で自アカウント以外からの AssumeRole を拒否します(confused deputy 対策)。

アクセス許可ポリシー:

{
  "Version": "2012-10-17",
  "Statement": [
 {
"Sid": "S3RemediationPermissions",
"Effect": "Allow",
"Action": [
  "s3:GetBucketEncryption",
  "s3:PutBucketEncryption",
  "s3:GetBucketVersioning",
  "s3:PutBucketVersioning"
],
"Resource": "arn:aws:s3:::*"
 },
 {
"Sid": "SSMAutomationBasePermissions",
"Effect": "Allow",
"Action": [
  "ssm:GetAutomationExecution",
  "ssm:StartAutomationExecution",
  "ec2:DescribeInstanceStatus"
],
"Resource": "*"
 },
 {
"Sid": "CloudWatchLogsPermissions",
"Effect": "Allow",
"Action": [
  "logs:CreateLogGroup",
  "logs:CreateLogStream",
  "logs:PutLogEvents",
  "logs:DescribeLogGroups"
],
"Resource": "arn:aws:logs:ap-northeast-1:123456789012:log-group:/aws/ssm/*"
 }
  ]
}

4-5. SSM Parameter Store 連携

runbook 内でシークレットや設定値を参照するには、{{ssm:/path/to/parameter}} 構文を使います。

parameters:
  KmsKeyArn:
 type: String
 description: '暗号化に使う KMS キー ARN'
 default: '{{ssm:/myapp/kms/s3-key-arn}}'  # SSM Parameter Store から取得

mainSteps:
  - name: enableKmsEncryption
 action: 'aws:executeAwsApi'
 inputs:
Service: s3
Api: PutBucketEncryption
Bucket: '{{BucketName}}'
ServerSideEncryptionConfiguration:
  Rules:
 - ApplyServerSideEncryptionByDefault:
  SSEAlgorithm: 'aws:kms'
  KMSMasterKeyID: '{{KmsKeyArn}}'

シークレット(DBパスワード等)は {{ssm-secure:/path/to/secret}} を使うと SecureString パラメータを参照できます。runbook YAML 内にシークレットをハードコードしないためのパターンです。

Parameter Store パラメータの事前登録:

aws ssm put-parameter \
  --name "/myapp/kms/s3-key-arn" \
  --value "arn:aws:kms:ap-northeast-1:123456789012:key/abcd1234-..." \
  --type "String" \
  --region ap-northeast-1

4-6. CloudWatch Logs 出力設定

SSM Automation の実行ログは CloudWatch Logs に転送できます。

# Automation 実行時にログ出力先を指定(CLI 例)
aws ssm start-automation-execution \
  --document-name "Custom-RemediateS3Encryption" \
  --parameters "BucketName=myorg-terraform-state,AutomationAssumeRole=arn:aws:iam::123456789012:role/ssm-remediation-role" \
  --log-delivery-mode AWS \
  --cloud-watch-output-config '{
 "CloudWatchLogGroupName": "/aws/ssm/Custom-RemediateS3Encryption",
 "CloudWatchOutputEnabled": true
  }' \
  --region ap-northeast-1

Terraform で CloudWatch Logs グループを事前作成:

resource "aws_cloudwatch_log_group" "ssm_automation" {
  name  = "/aws/ssm/Custom-RemediateS3Encryption"
  retention_in_days = 90

  tags = {
 Project = "param-sheet-remediation"
 Env  = var.env
  }
}

ログで確認できる情報:
– 各ステップの開始・終了時刻
aws:executeScript の Python 標準出力(print() の出力がそのまま記録)
onFailure で分岐したステップ名と理由
– API 呼び出し結果(成功/失敗・レスポンスの一部)


4-7. input/output の型指定

parameters セクションで使える型を整理します。

parameters:
  # 基本型
  BucketName:
 type: String
 description: 'バケット名'
 allowedPattern: '^[a-z0-9][a-z0-9\-]{1,61}[a-z0-9]$'  # 正規表現バリデーション

  MaxRetries:
 type: Integer
 description: '最大リトライ回数'
 default: 3

  EnableVersioning:
 type: Boolean
 description: 'バージョニングを有効化するか'
 default: false

  # リスト型(複数バケットに同時適用する場合)
  BucketNames:
 type: StringList
 description: '対象バケット名のリスト'

  # Map 型(キーバリューペア)
  Tags:
 type: MapList
 description: 'タグリスト'
 default:
- Key: 'ManagedBy'
  Value: 'SSMAutomation'

後続ステップで前のステップ出力を参照するには {{stepName.OutputName}} 構文を使います:

  - name: fetchConfig
 action: 'aws:executeScript'
 outputs:
- Name: CurrentAlgorithm
  Selector: '$.Payload.Algorithm'
  Type: String

  - name: logCurrentState
 action: 'aws:executeScript'
 inputs:
InputPayload:
  Algorithm: '{{fetchConfig.CurrentAlgorithm}}'  # 前ステップの出力を参照

4-8. aws:executeScript Python runtime の IAM 権限ミニマム化

  - name: analyzeCompliance
 action: 'aws:executeScript'
 inputs:
Runtime: 'python3.11'
Handler: 'script_handler'
InputPayload:
  BucketName: '{{BucketName}}'
Script: |
  import boto3
  import json
  from botocore.exceptions import ClientError

  def script_handler(events: dict, context) -> dict:
"""
最小権限原則:
- s3:GetBucketEncryption のみ必要
- s3:ListAllMyBuckets は不要(バケット名を引数で受け取るため)
"""
bucket = events["BucketName"]
s3 = boto3.client("s3", region_name="ap-northeast-1")

try:
 resp = s3.get_bucket_encryption(Bucket=bucket)
 rules = resp["ServerSideEncryptionConfiguration"]["Rules"]
 sse_alg = rules[0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
 return {
  "Compliant": True,
  "Algorithm": sse_alg,
  "Message": f"暗号化設定済み: {sse_alg}",
 }
except ClientError as e:
 if e.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError":
  return {
"Compliant": False,
"Algorithm": "",
"Message": "暗号化未設定",
  }
 raise

最小権限 IAM ポリシー(executeScript 専用):

{
  "Sid": "ExecuteScriptMinimal",
  "Effect": "Allow",
  "Action": [
 "s3:GetBucketEncryption"
  ],
  "Resource": "arn:aws:s3:::*"
}

aws:executeScript実行ロール(AutomationAssumeRole)の権限で boto3 を呼び出します。Lambda 関数の実行ロールとは別の概念ですので注意してください。


fig04

図 4: SSM Automation runbook ステップフロー(aws:assertAwsResourcePropertyaws:executeAwsApiaws:executeScript の 3 段)


Section 4 まとめ

要素ポイント
schemaVersionAutomation = 0.3(RunCommand の 2.2 と混同しないこと)
mainSteps20 種のアクションのうち本記事では assert / executeApi / executeScript を多用
assumeRoleConfig → SSM → AutomationAssumeRole の 3 段信頼。confused deputy 対策に SourceAccount 条件必須
Parameter Store{{ssm:/path}} で設定値・{{ssm-secure:/path}} でシークレットを参照
CloudWatch Logs/aws/ssm/<runbook-name> に出力。Python の print() もそのまま記録
IAM 最小権限executeScript の boto3 呼び出しに必要な Action のみ付与

Section 5 では、この runbook を Terraform で管理する 4 点セットを実装します。


5. Terraform で Remediation Configuration を管理する

Section 4 で設計した SSM Automation runbook と AWS Config Remediation Configuration を、すべて Terraform で宣言的に管理します。IaC として管理することで、Drift(手動変更による設定ずれ)をコード側でも検知・修復できるようになります。


5-1. Terraform リソース全体像

本セクションで作成するリソースは 4 点セットです。

Terraform リソース依存グラフ

aws_config_config_rule.s3_encryption
  │
  └──→ aws_config_remediation_configuration.s3_encryption
  │
  ├──→ aws_ssm_document.remediate_s3_encryption
  │(content = file("runbooks/remediate-s3-encryption.yaml"))
  │
  └──→ aws_iam_role.ssm_remediation
(assumeRole ARN を渡す)

fig05

図 5: Terraform リソース依存グラフ(config_rule → remediation_configuration → ssm_document → iam_role)


5-2. aws_config_config_rule — Config ルール定義

# modules/config-remediation/main.tf

resource "aws_config_config_rule" "s3_encryption" {
  name  = "s3-bucket-server-side-encryption-enabled"
  description = "S3 バケットのサーバーサイド暗号化が有効であることを確認する"

  source {
 owner = "AWS"# AWS-managed ルールを使用
 source_identifier = "S3_BUCKET_SERVER_SIDE_ENCRYPTION_ENABLED"
  }

  # スコープ: S3 バケットのみ対象
  scope {
 compliance_resource_types = ["AWS::S3::Bucket"]
  }

  # オプション: 特定タグのついたバケットのみ対象にする場合
  # scope {
  #tag_key= "ManagedBy"
  #tag_value = "Terraform"
  # }

  tags = local.common_tags

  depends_on = [aws_config_configuration_recorder.main]
}

# --- source.owner の 3 系統 ---
# "AWS"  : AWS-managed ルール(本セクションで採用)
# "CUSTOM_LAMBDA" : 旧世代のカスタムルール(Lambda 関数で評価ロジックを実装)
# "CUSTOM_POLICY" : AWS Config の新しいカスタムルール(Guard ポリシー言語)
# CUSTOM_LAMBDA より軽量・Lambda 不要

source.owner の使い分け:

owner利用場面必要スキル
AWSAWS 公式の 200+ ルールで賄えるケース(本記事はこれ)なし(ルール名を指定するだけ)
CUSTOM_LAMBDA複雑なロジックが必要・他サービスとの連携が必要Python / Node.js
CUSTOM_POLICYGuard DSL で評価ロジックを記述(2023 年 GA)CloudFormation Guard 構文

5-3. aws_ssm_document — runbook の Terraform 管理

resource "aws_ssm_document" "remediate_s3_encryption" {
  name= "Custom-RemediateS3Encryption"
  document_type= "Automation"
  document_format = "YAML"

  # runbook YAML ファイルを外部ファイルとして管理
  content = file("${path.module}/runbooks/remediate-s3-encryption.yaml")

  tags = local.common_tags
}

ディレクトリ構成:

modules/config-remediation/
├── main.tf
├── variables.tf
├── outputs.tf
├── locals.tf
└── runbooks/
 ├── remediate-s3-encryption.yaml← Section 4-3 で作成したファイル
 ├── remediate-rds-backup.yaml← (Section 6 で ashigaru4 が実装)
 └── remediate-s3-versioning.yaml← (Section 6 で ashigaru4 が実装)

file() 関数で外部 YAML を読み込むことで、runbook の変更履歴が git で追跡されます。SSM コンソール上での直接編集は禁止し、必ず PR → terraform apply のフローを守ります。


5-4. aws_iam_role — Config/SSM 信頼関係

resource "aws_iam_role" "ssm_remediation" {
  name  = "ssm-config-remediation-role"
  description = "AWS Config Remediation が SSM Automation を実行するための IAM ロール"

  assume_role_policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Effect = "Allow"
  Principal = {
 Service = [
"ssm.amazonaws.com",
"config.amazonaws.com"
 ]
  }
  Action = "sts:AssumeRole"
  Condition = {
 StringEquals = {
"aws:SourceAccount" = data.aws_caller_identity.current.account_id
 }
  }
}
 ]
  })

  tags = local.common_tags
}

data "aws_caller_identity" "current" {}

resource "aws_iam_role_policy" "ssm_remediation_s3" {
  name = "s3-remediation-permissions"
  role = aws_iam_role.ssm_remediation.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Sid = "S3RemediationPermissions"
  Effect = "Allow"
  Action = [
 "s3:GetBucketEncryption",
 "s3:PutBucketEncryption",
 "s3:GetBucketVersioning",
 "s3:PutBucketVersioning"
  ]
  Resource = "arn:aws:s3:::*"
},
{
  Sid = "SSMBasePermissions"
  Effect = "Allow"
  Action = [
 "ssm:GetAutomationExecution",
 "ssm:StartAutomationExecution",
 "logs:CreateLogGroup",
 "logs:CreateLogStream",
 "logs:PutLogEvents"
  ]
  Resource = "*"
},
{
  Sid = "CloudWatchLogsScoped"
  Effect = "Allow"
  Action = [
 "logs:CreateLogGroup",
 "logs:CreateLogStream",
 "logs:PutLogEvents"
  ]
  Resource = "arn:aws:logs:${var.region}:${data.aws_caller_identity.current.account_id}:log-group:/aws/ssm/*"
}
 ]
  })
}

5-5. aws_config_remediation_configuration — 接続の核

resource "aws_config_remediation_configuration" "s3_encryption" {
  config_rule_name = aws_config_config_rule.s3_encryption.name

  # 修復実行のターゲット(SSM Document を指定)
  target_type = "SSM_DOCUMENT"
  target_id= aws_ssm_document.remediate_s3_encryption.name
  target_version = "$DEFAULT"# SSM Document のデフォルトバージョン

  # ★ automatic = false 徹底(殿要件:手動承認が必要)
  # automatic = true にすると NON_COMPLIANT 検知後に自動修復実行される
  # 本シリーズでは Approval なしの自動実行は禁止
  automatic = false

  # runbook に渡すパラメータ
  parameter {
 name= "BucketName"
 resource_value = "RESOURCE_ID"# Config が検知したリソース ID を自動注入
  }

  parameter {
 name= "AutomationAssumeRole"
 static_value = aws_iam_role.ssm_remediation.arn
  }

  parameter {
 name= "SnsTopicArn"
 static_value = var.sns_topic_arn# 通知先 SNS トピック(省略可)
  }

  # リトライ設定(automatic=true 時のみ有効だが、ベストプラクティスとして記載)
  maximum_automatic_attempts= 1
  retry_attempt_seconds  = 60
  execution_controls {
 ssm_controls {
concurrent_execution_rate_percentage = 25
error_percentage= 20
 }
  }

  depends_on = [
 aws_config_config_rule.s3_encryption,
 aws_ssm_document.remediate_s3_encryption,
 aws_iam_role.ssm_remediation
  ]
}

parameter ブロックの3種類:

type用途
resource_value = "RESOURCE_ID"Config が検知したリソース ID を動的注入バケット名・インスタンス ID
static_value = "..."固定値を渡すIAM ロール ARN・リージョン
ssm_parameter_value = "/path"SSM Parameter Store から実行時に取得KMS キー ARN・設定値

5-6. variables.tf と outputs.tf

# modules/config-remediation/variables.tf
variable "env" {
  type  = string
  description = "環境名(dev / stg / prod)"
}

variable "region" {
  type  = string
  default  = "ap-northeast-1"
  description = "AWS リージョン"
}

variable "sns_topic_arn" {
  type  = string
  description = "修復完了通知先の SNS トピック ARN(空文字列で無効化)"
  default  = ""
}

locals {
  common_tags = {
 ManagedBy = "Terraform"
 Project= "param-sheet-remediation"
 Env = var.env
  }
}
# modules/config-remediation/outputs.tf
output "config_rule_arn" {
  value = aws_config_config_rule.s3_encryption.arn
  description = "Config Rule の ARN(§8 pytest fixture で参照)"
}

output "ssm_document_name" {
  value = aws_ssm_document.remediate_s3_encryption.name
  description = "SSM Document 名(§8 pytest で StartAutomationExecution に渡す)"
}

output "remediation_role_arn" {
  value = aws_iam_role.ssm_remediation.arn
  description = "修復実行 IAM ロール ARN"
}

5-7. tfstate 管理(§8 pytest との連携)

Section 8(ashigaru6 担当)の pytest は、terraform output で取得した値を pytest フィクスチャに渡します。そのために tfstate を S3 backend に保存し、pytest 実行時に参照できるようにします。

# environments/dev/backend.tf
terraform {
  backend "s3" {
 bucket= "myorg-terraform-state"
 key= "param-sheet-remediation/dev/terraform.tfstate"
 region= "ap-northeast-1"
 dynamodb_table = "terraform-state-lock"
 encrypt  = true
  }
}
# pytest 実行前に tfstate から値を取得(conftest.py で使う)
terraform output -json -state=environments/dev/terraform.tfstate > /tmp/tf_outputs.json
# tests/conftest.py(§8 で ashigaru6 が実装する pytest fixture の前提)
import json
import pytest
from pathlib import Path

@pytest.fixture(scope="session")
def tf_outputs() -> dict:
 """terraform output -json の結果を読み込む。"""
 output_file = Path("/tmp/tf_outputs.json")
 if not output_file.exists():
  pytest.skip(reason="terraform output ファイルが存在しません。terraform apply を先に実行してください。")
 return json.loads(output_file.read_text())

@pytest.fixture(scope="session")
def ssm_document_name(tf_outputs: dict) -> str:
 return tf_outputs["ssm_document_name"]["value"]

@pytest.fixture(scope="session")
def config_rule_arn(tf_outputs: dict) -> str:
 return tf_outputs["config_rule_arn"]["value"]

5-8. automatic = false の運用フロー

本シリーズでは automatic = false を徹底します(殿要件)。手動修復のトリガー方法を整理します。

# 1. NON_COMPLIANT リソースを確認
aws configservice get-compliance-details-by-config-rule \
  --config-rule-name s3-bucket-server-side-encryption-enabled \
  --compliance-types NON_COMPLIANT \
  --region ap-northeast-1

# 2. 手動で修復を実行(1リソース)
aws configservice start-remediation-execution \
  --config-rule-name s3-bucket-server-side-encryption-enabled \
  --resource-keys resourceType=AWS::S3::Bucket,resourceId=myorg-terraform-state \
  --region ap-northeast-1

# 3. 修復実行 ID を取得して進捗確認
aws ssm get-automation-execution \
  --automation-execution-id <execution-id> \
  --region ap-northeast-1 \
  --query 'AutomationExecution.AutomationExecutionStatus'

automatic = true を使わない理由(殿要件の背景):

  1. 無限ループリスク: Config Rule が修復後に再度 NON_COMPLIANT を検知 → 自動修復 → 再検知 のサイクルが発生することがある
  2. 監査証跡の欠落: 誰がいつ承認したか記録が残らない
  3. 誤修復の影響範囲: 本番環境で意図しないリソースが書き換えられる可能性

Section 5 まとめ

4 点セット完成形の対応関係

aws_config_config_rule  ← 「何を検知するか」(S3 暗号化未設定)
 ↓
aws_config_remediation_configuration ← 「どの runbook で修復するか」(SSM Document を指定)
 ↓  「誰の権限で実行するか」(IAM ロール ARN を渡す)
aws_ssm_document  ← 「どうやって修復するか」(YAML runbook 本体)
 ↓
aws_iam_role← 「修復に必要な権限は何か」(S3 Put / CloudWatch Logs)

Section 6(ashigaru4 担当)では、この雛形をもとに S3 バージョニング・RDS バックアップ保持期間など 複数リソースタイプのカスタム runbook を実装します。


RemediationRow dataclass 参照(§7 との契約)

Section 7(ashigaru5 担当)の修復ロジックが参照する RemediationRow dataclass を先出しします(spec §4 dataclass 先出し契約準拠)。

# context/skills/drift_remediation_contract.py
from dataclasses import dataclass
from enum import Enum
from typing import Any

class RemediationState(Enum):
 DETECTED = "DETECTED"
 APPROVED = "APPROVED"
 EXECUTING= "EXECUTING"
 VERIFIED = "VERIFIED"
 FAILED= "FAILED"
 ROLLED_BACK = "ROLLED_BACK"

@dataclass(frozen=True)
class RemediationRow:
 resource_address: str # e.g. aws_s3_bucket.audit_logs
 config_rule_name: str # e.g. s3-bucket-server-side-encryption-enabled
 runbook_name:  str # e.g. Custom-RemediateS3Encryption
 pre_compliance:str # COMPLIANT / NON_COMPLIANT / ERROR
 post_compliance:  str | None# None = 未実行
 automation_execution_id:str | None
 approver:str | None# IAM principal ARN
 state:RemediationState
 error_message: str = ""
 note: str = ""

P4(ashigaru4)が §6 カスタム runbook を実装する際は、runbook_name フィールドに格納する SSM Document 名を articles/param-sheet3-s4-5.mdaws_ssm_document.remediate_s3_encryption.name(= "Custom-RemediateS3Encryption")と整合させてください。

6. カスタム runbook 実装 — YAML + Python lambda ハイブリッド

Section 5 では aws_ssm_document に外部 YAML を読み込む Terraform パターンを実装しました。Section 4-3 の S3 暗号化 runbook は aws:executeAwsApi だけで完結する比較的単純なケースでした。しかし実務では、AWS-managed runbook が対応していない複合条件の修復が頻繁に登場します。

本セクションでは「どのケースでカスタム runbook が必要になるか」の判断フローから始め、aws:executeScript(Python handler)を中核に据えた カスタム runbook 完成形 2 本を実装します。


6-1. AWS-managed runbook では賄えないケース — 判断フローチャート

fig06

図 6: AWS-managed runbook vs カスタム runbook 使い分けフローチャート

NON_COMPLIANT リソースを修復したい
 │
 ├─[Q1] AWS Systems Manager Automation Document ライブラリに
 │ 対応 runbook があるか?
 │  YES → AWS-managed runbook を使う(Section 4 のパターン)
 │  NO  ↓
 │
 ├─[Q2] 修復ロジックは「単一 API 呼び出し」で完結するか?
 │  YES → aws:executeAwsApi だけのカスタム runbook(軽量)
 │  NO  ↓
 │
 ├─[Q3] 以下のいずれかに該当するか?
 │ ・複数 API の結果を組み合わせた条件判定
 │ ・既存設定を読み取ってからマージ・加工して書き戻す
 │ ・外部システム(DynamoDB / SSM Parameter Store)との突合
 │ ・エラー詳細を構造化ログとして出力したい
 │  YES → aws:executeScript(Python handler)込みのカスタム runbook ← 本セクション
 │  NO  ↓
 │
 └─[Q4] runbook 実行自体が重い処理か(>10分 / 外部 API 呼び出し多数)?
 YES → aws:invokeLambdaFunction(Lambda に委譲)
 NO  → aws:executeScript で十分

判断のポイント:
aws:executeAwsApi は単一 API・単一リソースに最適。boto3 ドキュメントの引数をそのまま書けるが、Python のロジック(if文・ループ)は書けない
aws:executeScript を使うと Python で自由にロジックを書けるが、ステップ間でのデータ受け渡しは outputs 定義が必要
– Lambda 委譲は最後の手段。SSM Automation 内に Lambda の IAM 権限が別途必要になり、権限設計が複雑化する。


6-2. aws:executeScript Python handler 設計

カスタム runbook に Python ロジックを組み込む際のシグネチャと設計ルールを整理します。

def script_handler(events: dict, context) -> dict:
 """
 events : InputPayload に渡した dict がそのまま入る
 context: LambdaContext 相当(request_id など)。通常は使わない
 return : dict(outputs の Selector で取り出す値を含む)
 """
 bucket_name = events["BucketName"]# InputPayload のキーをそのまま参照
 # boto3 処理
 s3 = boto3.client("s3")
 s3.put_bucket_encryption(
  Bucket=bucket_name,
  ServerSideEncryptionConfiguration={
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
  }
 )
 return {"Status": "Success", "BucketName": bucket_name}

設計ルール:

項目ルール理由
シグネチャdef script_handler(events, context) 固定Handler: script_handler と一致させる必要がある
戻り値必ず dict を返すoutputs$.Payload.* で取り出すため
InputPayload キーPascalCase 推奨parameters セクションの変数名と統一
boto3 クライアント関数内で毎回生成スクリプト実行環境は Lambda ライクで再利用不可
print()デバッグ用途で可CloudWatch Logs に出力される(§4-6 参照)
importboto3 / json / os のみ使用可ランタイム環境に標準搭載。pip install 不可

6-3. カスタム runbook 完成形① — S3 バケット暗号化有効化(Python handler 版)

Section 4-3 の aws:executeAwsApi 版は「単純な PutBucketEncryption」でした。実務では現在の暗号化設定を読み取り・KMS キーを上書きしないという条件付き修復が必要になります。aws:executeScript を使った Python handler 版に拡張します。

# modules/config-remediation/runbooks/custom-remediate-s3-encryption.yaml
schemaVersion: '0.3'
description: |
  S3 バケットのサーバーサイド暗号化を確認・有効化する(Python handler 版)。
  AES256 または KMS 暗号化済みの場合はスキップ。
  未暗号化の場合のみ AES256 を適用する(既存の KMS 設定を上書きしない)。
assumeRole: '{{AutomationAssumeRole}}'

parameters:
  BucketName:
 type: String
 description: '対象 S3 バケット名'
 allowedPattern: '^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$'
  AutomationAssumeRole:
 type: String
 description: 'SSM Automation が引き受ける IAM ロール ARN'
 allowedPattern: '^arn:aws:iam::\d{12}:role/.+$'

mainSteps:
  # Step 1: 現在の暗号化設定を読み取り(複合条件判定)
  - name: fetchEncryptionConfig
 action: 'aws:executeScript'
 timeoutSeconds: 60
 onFailure: Abort
 inputs:
Runtime: 'python3.11'
Handler: 'script_handler'
InputPayload:
  BucketName: '{{BucketName}}'
Script: |
  import boto3
  import json
  from botocore.exceptions import ClientError

  def script_handler(events: dict, context) -> dict:
bucket = events["BucketName"]
s3 = boto3.client("s3")
try:
 resp = s3.get_bucket_encryption(Bucket=bucket)
 rules = resp["ServerSideEncryptionConfiguration"]["Rules"]
 sse = rules[0]["ApplyServerSideEncryptionByDefault"]
 alg = sse.get("SSEAlgorithm", "")
 # AES256 または KMS 設定済み → スキップ
 already_encrypted = alg in ("AES256", "aws:kms")
 return {
  "AlreadyEncrypted": already_encrypted,
  "CurrentAlgorithm": alg,
 }
except ClientError as e:
 if e.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError":
  return {"AlreadyEncrypted": False, "CurrentAlgorithm": ""}
 raise
 outputs:
- Name: AlreadyEncrypted
  Selector: '$.Payload.AlreadyEncrypted'
  Type: Boolean
- Name: CurrentAlgorithm
  Selector: '$.Payload.CurrentAlgorithm'
  Type: String

  # Step 2: 未暗号化の場合のみ AES256 を適用(冪等性保証)
  - name: applyEncryptionIfNeeded
 action: 'aws:executeScript'
 timeoutSeconds: 60
 onFailure: Abort
 inputs:
Runtime: 'python3.11'
Handler: 'script_handler'
InputPayload:
  BucketName: '{{BucketName}}'
  AlreadyEncrypted: '{{fetchEncryptionConfig.AlreadyEncrypted}}'
Script: |
  import boto3

  def script_handler(events: dict, context) -> dict:
if events["AlreadyEncrypted"]:
 return {"Status": "Skipped", "Message": "既に暗号化設定済み"}
bucket = events["BucketName"]
s3 = boto3.client("s3")
s3.put_bucket_encryption(
 Bucket=bucket,
 ServerSideEncryptionConfiguration={
  "Rules": [{
"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
"BucketKeyEnabled": True,
  }]
 }
)
return {"Status": "Applied", "Message": f"AES256 を適用: {bucket}"}
 outputs:
- Name: RemediationStatus
  Selector: '$.Payload.Status'
  Type: String

  # Step 3: 修復後の確認(サニティチェック)
  - name: verifyEncryption
 action: 'aws:assertAwsResourceProperty'
 timeoutSeconds: 30
 onFailure: Abort
 isCritical: true
 inputs:
Service: s3
Api: GetBucketEncryption
Bucket: '{{BucketName}}'
PropertySelector: '$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm'
DesiredValues:
  - 'AES256'
  - 'aws:kms'
 isEnd: true

6-4. カスタム runbook 完成形② — RDS バックアップ保持期間修正

RDS のバックアップ保持期間(BackupRetentionPeriod)が規定値(7日)未満の場合に修正する runbook です。現在の設定値を読み取り、必要最低限の変更だけ加えるため、aws:executeScript が必須です。

# modules/config-remediation/runbooks/custom-remediate-rds-backup.yaml
schemaVersion: '0.3'
description: |
  RDS DB インスタンスのバックアップ保持期間が 7 日未満の場合に修正する。
  既に 7 日以上設定済みの場合はスキップ(冪等性保証)。
assumeRole: '{{AutomationAssumeRole}}'

parameters:
  DBInstanceIdentifier:
 type: String
 description: '対象 RDS DB インスタンス識別子'
 allowedPattern: '^[a-zA-Z][a-zA-Z0-9\-]{0,62}$'
  AutomationAssumeRole:
 type: String
 description: 'SSM Automation が引き受ける IAM ロール ARN'
 allowedPattern: '^arn:aws:iam::\d{12}:role/.+$'
  RequiredRetentionDays:
 type: Integer
 description: '要求するバックアップ保持日数(デフォルト 7 日)'
 default: 7

mainSteps:
  # Step 1: 現在の保持期間を読み取り
  - name: fetchRetentionPeriod
 action: 'aws:executeScript'
 timeoutSeconds: 60
 onFailure: Abort
 inputs:
Runtime: 'python3.11'
Handler: 'script_handler'
InputPayload:
  DBInstanceIdentifier: '{{DBInstanceIdentifier}}'
  RequiredRetentionDays: '{{RequiredRetentionDays}}'
Script: |
  import boto3

  def script_handler(events: dict, context) -> dict:
db_id = events["DBInstanceIdentifier"]
required = int(events["RequiredRetentionDays"])
rds = boto3.client("rds")
resp = rds.describe_db_instances(DBInstanceIdentifier=db_id)
instance = resp["DBInstances"][0]
current = instance["BackupRetentionPeriod"]
status = instance["DBInstanceStatus"]
needs_fix = current < required
return {
 "CurrentRetentionDays": current,
 "NeedsRemediation": needs_fix,
 "DBStatus": status,
}
 outputs:
- Name: CurrentRetentionDays
  Selector: '$.Payload.CurrentRetentionDays'
  Type: Integer
- Name: NeedsRemediation
  Selector: '$.Payload.NeedsRemediation'
  Type: Boolean
- Name: DBStatus
  Selector: '$.Payload.DBStatus'
  Type: String

  # Step 2: available 状態でなければ待機(変更には available が必要)
  - name: waitForAvailable
 action: 'aws:waitForAwsResourceProperty'
 timeoutSeconds: 600
 onFailure: Abort
 inputs:
Service: rds
Api: DescribeDBInstances
DBInstanceIdentifier: '{{DBInstanceIdentifier}}'
PropertySelector: '$.DBInstances[0].DBInstanceStatus'
DesiredValues:
  - 'available'

  # Step 3: 保持期間が不足の場合のみ修正
  - name: applyRetentionIfNeeded
 action: 'aws:executeScript'
 timeoutSeconds: 120
 onFailure: Abort
 inputs:
Runtime: 'python3.11'
Handler: 'script_handler'
InputPayload:
  DBInstanceIdentifier: '{{DBInstanceIdentifier}}'
  NeedsRemediation: '{{fetchRetentionPeriod.NeedsRemediation}}'
  RequiredRetentionDays: '{{RequiredRetentionDays}}'
Script: |
  import boto3

  def script_handler(events: dict, context) -> dict:
if not events["NeedsRemediation"]:
 return {"Status": "Skipped", "Message": "既に要件を満たしています"}
db_id = events["DBInstanceIdentifier"]
required = int(events["RequiredRetentionDays"])
rds = boto3.client("rds")
rds.modify_db_instance(
 DBInstanceIdentifier=db_id,
 BackupRetentionPeriod=required,
 ApplyImmediately=True,
)
return {"Status": "Applied", "Message": f"保持期間を {required} 日に設定: {db_id}"}
 outputs:
- Name: RemediationStatus
  Selector: '$.Payload.Status'
  Type: String

  # Step 4: 設定値の確認
  - name: verifyRetention
 action: 'aws:assertAwsResourceProperty'
 timeoutSeconds: 60
 onFailure: Abort
 isCritical: true
 inputs:
Service: rds
Api: DescribeDBInstances
DBInstanceIdentifier: '{{DBInstanceIdentifier}}'
PropertySelector: '$.DBInstances[0].BackupRetentionPeriod'
DesiredValues:
  - '7'
  - '14'
  - '30'
  - '35'
 isEnd: true

RDS runbook の補足:
– Step 2 の aws:waitForAwsResourceProperty は RDS 特有の処置。modifyDBInstanceavailable 状態でなければ受け付けられないため。
ApplyImmediately: True にすることで次のメンテナンスウィンドウを待たずに即時適用。
DesiredValues に 7 / 14 / 30 / 35 を列挙することで「7 日以上ならOK」という OR 条件を表現。


6-5. Terraform apply との競合回避

カスタム runbook が AWS リソースを直接変更すると、terraform state との間に乖離(drift)が生まれます。この乖離を放置すると次回の terraform plan で予期しない差分が出るため、必ず後処理フローを設計します。

競合回避フロー

[SSM Automation runbook]
 │ 1. リソースを直接修復(S3 暗号化 ON / RDS 保持期間 7d)
 │ ※ runbook 内で terraform apply は絶対に呼ばない
 │(aws_terraform_apply のような SSM アクションは存在しない)
 ▼
[修復完了後の別フロー(§7 で詳述)]
 │ 2. terraform refresh(state を実際のリソース状態に同期)
 │terraform refresh -target=aws_s3_bucket.audit_logs
 │ 3. terraform plan(diff ゼロを確認)
 │terraform plan -target=aws_s3_bucket.audit_logs -out=post_remediation.tfplan
 │ 4. diff がゼロなら完了 / diff が残る場合は手動確認
 ▼
[RemediationRow.state = VERIFIED]

runbook 内で terraform apply を呼ばない理由:

  1. 循環参照リスク: terraform apply が Config Rule を再評価 → NON_COMPLIANT 再検知 → runbook 再実行のループが起きる
  2. 権限の複雑化: SSM Automation の実行ロールに terraform state S3 bucket / DynamoDB の権限が追加で必要になる
  3. べき等性の破壊: terraform apply はリソース作成・変更・削除を行う可能性があり、修復 runbook のスコープを超える

正しい分離境界:

SSM Automation runbook の責務:
  → NON_COMPLIANT なリソースを COMPLIANT な状態に戻す(単一リソース操作)

terraform refresh / plan の責務:
  → state ファイルを現実の AWS 状態と同期させる(§7 修復ロジック全体の後処理)

6-6. SSM Parameter Store によるバージョン管理

runbook を更新するたびに SSM Document のバージョンが自動インクリメントされますが、どのバージョンがいつデプロイされたかを人間が参照できる形で管理するため、Parameter Store に versiondeployed_at を記録します。

# デプロイ時にバージョン情報を登録
RUNBOOK_VERSION=$(aws ssm describe-document \
  --name "Custom-RemediateS3Encryption" \
  --query 'Document.LatestVersion' \
  --output text)

aws ssm put-parameter \
  --name "/app/runbooks/s3-encryption/v${RUNBOOK_VERSION}" \
  --value "{\"ssm_document_version\": \"${RUNBOOK_VERSION}\", \"deployed_at\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\", \"deployed_by\": \"terraform-apply\"}" \
  --type "String" \
  --overwrite \
  --region ap-northeast-1

# /app/runbooks/s3-encryption/v1 の例
# {
#"ssm_document_version": "1",
#"deployed_at": "2026-04-19T10:00:00Z",
#"deployed_by": "terraform-apply"
# }

Terraform で管理する場合は aws_ssm_parameter リソースを使います:

# modules/config-remediation/main.tf(抜粋)
resource "aws_ssm_parameter" "runbook_version_s3" {
  name  = "/app/runbooks/s3-encryption/v${aws_ssm_document.remediate_s3_encryption.document_version}"
  type  = "String"
  value = jsonencode({
 ssm_document_version = aws_ssm_document.remediate_s3_encryption.document_version
 deployed_at = timestamp()
 deployed_by = "terraform-apply"
  })

  lifecycle {
 ignore_changes = [value]  # timestamp() は apply ごとに変わるため
  }

  tags = local.common_tags
}

名前空間の設計:

/app/runbooks/
├── s3-encryption/
│├── v1 → SSM Document version 1(初回デプロイ)
│├── v2 → バグ修正パッチ
│└── v3 → KMS 対応追加
├── rds-backup/
│├── v1 → 初回デプロイ
│└── v2 → MultiAZ 対応追加
└── s3-versioning/
 └── v1 → 初回デプロイ

この命名規則により aws ssm get-parameters-by-path --path "/app/runbooks/" --recursive で全 runbook のデプロイ履歴を一覧できます。


6-7. Terraform リソース — カスタム runbook 2 本の登録

Section 5-3 の構造を踏まえ、今回の 2 本の runbook を aws_ssm_document として登録します。

# modules/config-remediation/main.tf(追記分)

resource "aws_ssm_document" "custom_remediate_s3_encryption" {
  name= "Custom-RemediateS3Encryption-Python"
  document_type= "Automation"
  document_format = "YAML"
  content= file("${path.module}/runbooks/custom-remediate-s3-encryption.yaml")
  tags= local.common_tags
}

resource "aws_ssm_document" "custom_remediate_rds_backup" {
  name= "Custom-RemediateRDSBackupRetention"
  document_type= "Automation"
  document_format = "YAML"
  content= file("${path.module}/runbooks/custom-remediate-rds-backup.yaml")
  tags= local.common_tags
}

resource "aws_config_remediation_configuration" "rds_backup" {
  config_rule_name = aws_config_config_rule.rds_backup_retention.name
  target_type= "SSM_DOCUMENT"
  target_id  = aws_ssm_document.custom_remediate_rds_backup.name
  target_version= "$DEFAULT"
  automatic  = false

  parameter {
 name  = "DBInstanceIdentifier"
 resource_value = "RESOURCE_ID"
  }
  parameter {
 name= "AutomationAssumeRole"
 static_value = aws_iam_role.ssm_remediation.arn
  }
  parameter {
 name= "RequiredRetentionDays"
 static_value = "7"
  }

  depends_on = [
 aws_config_config_rule.rds_backup_retention,
 aws_ssm_document.custom_remediate_rds_backup,
 aws_iam_role.ssm_remediation,
  ]
}

IAM ポリシーへの追加 — RDS 修復に必要な Action:

resource "aws_iam_role_policy" "ssm_remediation_rds" {
  name = "rds-remediation-permissions"
  role = aws_iam_role.ssm_remediation.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [
{
  Sid = "RDSRemediationPermissions"
  Effect = "Allow"
  Action = [
 "rds:DescribeDBInstances",
 "rds:ModifyDBInstance",
  ]
  Resource = "arn:aws:rds:${var.region}:${data.aws_caller_identity.current.account_id}:db:*"
}
 ]
  })
}

Section 6 まとめ

要素実装ポイント
カスタム runbook の要否Q1〜Q4 の判断フローで決定。複合条件・データ加工が必要なら aws:executeScript
Python handlerdef script_handler(events, context) -> dict 固定シグネチャ。eventsInputPayload が入る
冪等性Step 1 で現在状態を読み取り → Step 2 で「変更が必要な場合のみ修正」パターン
terraform apply との競合runbook 内で terraform apply は呼ばない。修復後に terraform refresh → plan を別フローで実施(§7 詳述)
Parameter Store バージョン管理/app/runbooks/{name}/v{N} 名前空間でデプロイ履歴を記録
3 段構成assertAwsResourceProperty(事前確認)→ executeScript(Python 処理)→ assertAwsResourceProperty(事後検証)

Section 7(ashigaru5 担当)では、これらの runbook を安全に運用するための冪等性設計・ロールバック・Manual Approvalを詳述します。


7. 修復ロジック設計 — 冪等性・ロールバック・Manual Approval

本セクションは本記事の核となる。§6 で実装したカスタム runbook を「安全に・何度でも・承認付きで」実行するための設計パターンを体系化する。AWS Config × SSM Automation の閉ループを本番環境で動かすには、冪等性・ロールバック・承認フローの3要素が不可欠だ。

fig07

図07: 修復ロジック状態遷移図 — DETECTED → APPROVED → EXECUTING → VERIFIED / ROLLBACK


7.1 RemediationState — 状態遷移の全体像

まず状態遷移を dataclass で定義する。これは §8 の pytest / §9 の監視通知と共有する先出し契約(parallel execution のため spec 時点で固定)だ。

# context/skills/drift_remediation_contract.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

class RemediationState(Enum):
 DETECTED= "DETECTED"  # Config が NON_COMPLIANT を検知
 APPROVED= "APPROVED"  # Manual Approval 通過
 EXECUTING  = "EXECUTING" # SSM Automation 実行中
 VERIFIED= "VERIFIED"  # 事後検証 OK(drift=0 確認)
 FAILED  = "FAILED" # 修復失敗(ロールバック未実施)
 ROLLED_BACK = "ROLLED_BACK" # ロールバック完了

@dataclass(frozen=True)
class RemediationRow:
 resource_address: str  # e.g. aws_s3_bucket.audit_logs
 config_rule_name: str  # e.g. s3-bucket-server-side-encryption-enabled
 runbook_name: str# e.g. AWS-EnableS3BucketEncryption
 pre_compliance: str # COMPLIANT / NON_COMPLIANT / ERROR
 post_compliance: str | None  # None = 未実行
 automation_execution_id: str | None
 approver: str | None# IAM principal ARN
 state: RemediationState
 error_message: str = ""
 note: str = ""

この契約を先出しすることで、§7(修復ロジック)/ §8(pytest 検証)/ §9(運用通知)の並列実装が可能になる。各セクションは RemediationState enum を import して処理を記述する。


7.2 冪等性設計 — 再実行しても副作用ゼロ

SSM Automation runbook の最大の落とし穴は「同じ runbook を2回実行したとき何が起きるか」が保証されない点だ。Config Rule がタイミング次第で同一リソースに複数の修復トリガーを送ることがある。

冪等性チェックの実装パターンaws:assertAwsResourceProperty アクションで「すでに修復済みなら早期 return」する構造だ。

# runbooks/remediate-s3-encryption-idempotent.yaml
schemaVersion: '0.3'
description: S3バケット暗号化有効化(冪等性保証版)
parameters:
  BucketName:
 type: String
 description: 対象 S3 バケット名
  AutomationAssumeRole:
 type: String
 description: SSM Automation が assume する IAM ロール ARN

mainSteps:
  # ── Step 1: 冪等性チェック ──────────────────────────────────────────────
  - name: checkAlreadyEncrypted
 action: 'aws:assertAwsResourceProperty'
 timeoutSeconds: 60
 onFailure: 'step:savePreState'  # 未暗号化 → 次ステップへ
 inputs:
Service: s3
Api: GetBucketEncryption
BucketName: '{{BucketName}}'
PropertySelector: '$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm'
DesiredValues:
  - 'aws:kms'
  - 'AES256'
 nextStep: alreadyCompliant  # 暗号化済み → 早期終了

  - name: alreadyCompliant
 action: 'aws:sleep'
 inputs:
Duration: PT0S
 isEnd: true  # 冪等性ガード:修復済みなら何もしない

  # ── Step 2: 修復前状態を S3 に保存(ロールバック用)────────────────────
  - name: savePreState
 action: 'aws:executeScript'
 timeoutSeconds: 120
 onFailure: Abort
 inputs:
Runtime: python3.11
Handler: script_handler
InputPayload:
  bucket_name: '{{BucketName}}'
  execution_id: '{{automation:EXECUTION_ID}}'
Script: |
  import boto3, json, datetime

  def script_handler(events, context):
s3 = boto3.client('s3')
bucket = events['bucket_name']
exec_id = events['execution_id']
ts = datetime.datetime.utcnow().isoformat()

# 修復前の暗号化設定を取得(存在しない場合も記録)
try:
 enc = s3.get_bucket_encryption(Bucket=bucket)
 pre_config = enc['ServerSideEncryptionConfiguration']
except s3.exceptions.ClientError as e:
 if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
  pre_config = None
 else:
  raise

# SSM Automation Logs バケットへ保存
log_bucket = f"ssm-automation-logs-{boto3.client('sts').get_caller_identity()['Account']}"
key = f"pre-state/{bucket}/{exec_id}/encryption.json"
s3.put_object(
 Bucket=log_bucket,
 Key=key,
 Body=json.dumps({'pre_config': pre_config, 'timestamp': ts, 'execution_id': exec_id}),
 ServerSideEncryption='AES256',
)
return {'pre_state_key': key, 'log_bucket': log_bucket}
 outputs:
- Name: PreStateKey
  Selector: $.Payload.pre_state_key
  Type: String
- Name: LogBucket
  Selector: $.Payload.log_bucket
  Type: String

  # ── Step 3: Manual Approval ──────────────────────────────────────────────
  - name: approveRemediation
 action: 'aws:approve'
 timeoutSeconds: 3600
 onFailure: Abort
 inputs:
NotificationArn: '{{SNSTopicArn}}'
Message: 'S3バケット {{BucketName}} の暗号化(AES256)有効化を承認してください。\n修復前状態: s3://{{savePreState.LogBucket}}/{{savePreState.PreStateKey}}'
MinRequiredApprovals: 1
Approvers:
  - '{{ApproverIamArn}}'

  # ── Step 4: 修復実行 ────────────────────────────────────────────────────
  - name: enableEncryption
 action: 'aws:executeAwsApi'
 timeoutSeconds: 120
 maxAttempts: 3
 onFailure: 'step:rollbackEncryption'
 inputs:
Service: s3
Api: PutBucketEncryption
BucketName: '{{BucketName}}'
ServerSideEncryptionConfiguration:
  Rules:
 - ApplyServerSideEncryptionByDefault:
  SSEAlgorithm: 'AES256'
BucketKeyEnabled: true

  # ── Step 5: 事後確認 ────────────────────────────────────────────────────
  - name: verifyEncryption
 action: 'aws:assertAwsResourceProperty'
 timeoutSeconds: 60
 maxAttempts: 3
 onFailure: 'step:rollbackEncryption'
 inputs:
Service: s3
Api: GetBucketEncryption
BucketName: '{{BucketName}}'
PropertySelector: '$.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm'
DesiredValues:
  - 'AES256'
  - 'aws:kms'
 isEnd: true

  # ── Step 6: ロールバック ─────────────────────────────────────────────────
  - name: rollbackEncryption
 action: 'aws:executeScript'
 timeoutSeconds: 120
 inputs:
Runtime: python3.11
Handler: script_handler
InputPayload:
  bucket_name: '{{BucketName}}'
  log_bucket: '{{savePreState.LogBucket}}'
  pre_state_key: '{{savePreState.PreStateKey}}'
Script: |
  import boto3, json

  def script_handler(events, context):
s3 = boto3.client('s3')
bucket = events['bucket_name']

# 修復前状態を取得
obj = s3.get_object(Bucket=events['log_bucket'], Key=events['pre_state_key'])
pre = json.loads(obj['Body'].read())['pre_config']

if pre is None:
 # 元々暗号化なし → 暗号化設定を削除して元に戻す
 s3.delete_bucket_encryption(Bucket=bucket)
else:
 # 元の暗号化設定を復元
 s3.put_bucket_encryption(
  Bucket=bucket,
  ServerSideEncryptionConfiguration=pre,
 )
return {'rollback': 'completed'}
 isEnd: true

冪等性設計の要点

  • checkAlreadyEncrypted ステップを必ず先頭に置き、修復済みなら何もせず終了する
  • onFailure: 'step:savePreState' — assert 失敗(未修復状態)を次ステップへの「成功シグナル」として使う逆転の発想
  • alreadyCompliantisEnd: true で状態遷移を VERIFIED 相当で終わらせる

7.3 ロールバック戦略 — 修復前状態の保全と復元

修復が失敗したとき、元の状態に戻せなければ「修復によって壊れた」になる。ロールバック設計の基本は 修復前状態を外部ストレージ(S3)に保存してから変更操作を開始することだ。

ロールバックフロー全体図:

savePreState → approve → enableEncryption
│
  成功│  失敗 (onFailure: rollbackEncryption)
 ↓ ↓
verifyEncryption  rollbackEncryption
 │  │
成功 ↓完了 ↓
 VERIFIEDROLLED_BACK (exit code: 2)
失敗 → FAILED (exit code: 3)

ロールバック設計のチェックリスト:

チェック項目実装例
修復前状態の保存先SSM Automation Logs 専用 S3 バケット(ssm-automation-logs-{AccountId}
保存キー設計pre-state/{resource}/{execution_id}/encryption.json — 実行 ID で一意
ロールバック操作の冪等性delete_bucket_encryption / put_bucket_encryption は何度呼んでも同じ結果
ロールバック失敗の通知SNS → PagerDuty / Slack へエスカレーション(§9 で詳述)
保存データの TTLS3 ライフサイクルルールで 90 日後に削除(監査ログは CloudTrail で別途保持)

SSM Automation Logs バケットの Terraform 定義:

resource "aws_s3_bucket" "ssm_automation_logs" {
  bucket = "ssm-automation-logs-${data.aws_caller_identity.current.account_id}"
}

resource "aws_s3_bucket_lifecycle_configuration" "ssm_logs_lifecycle" {
  bucket = aws_s3_bucket.ssm_automation_logs.id

  rule {
 id  = "delete-pre-state-after-90-days"
 status = "Enabled"

 filter {
prefix = "pre-state/"
 }

 expiration {
days = 90
 }
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "ssm_logs_enc" {
  bucket = aws_s3_bucket.ssm_automation_logs.id

  rule {
 apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
 }
  }
}

7.4 Manual Approval — aws:approve ステップの完全実装

本記事の設計方針は automatic = false(殿要件)だ。すべての修復は人間の承認を経てから実行される。aws:approve ステップは SSM Automation における承認ゲートで、SNS 経由でメール・Slack・ntfy の3系統に承認リクエストを送れる。

aws:approve ステップの完全定義:

- name: approveRemediation
  action: 'aws:approve'
  timeoutSeconds: 3600 # 1時間以内に承認が必要
  onFailure: Abort  # タイムアウト → Abort(承認なし修復の防止)
  inputs:
 NotificationArn: '{{SNSTopicArn}}'
 Message: |
S3バケット {{BucketName}} の暗号化(AES256)有効化を承認してください。

[リソース情報]
バケット名: {{BucketName}}
Config Rule: s3-bucket-server-side-encryption-enabled
修復 Runbook: {{automation:DOCUMENT_NAME}}
実行 ID: {{automation:EXECUTION_ID}}
修復前状態: s3://{{savePreState.LogBucket}}/{{savePreState.PreStateKey}}

[承認 / 却下]
AWS Console: https://console.aws.amazon.com/systems-manager/automation
 MinRequiredApprovals: 1
 Approvers:
- '{{ApproverIamArn}}'# IAM ユーザー ARN または IAM ロール ARN

承認通知の3系統:

SNS トピックに複数のサブスクライバーを紐付けることで、1つのトリガーからメール・Slack・ntfy へ同時配信できる。

# SNS トピック(承認通知用)
resource "aws_sns_topic" "remediation_approval" {
  name = "config-remediation-approval"
}

# サブスクライバー1: メール(直接承認 URL が届く)
resource "aws_sns_topic_subscription" "email" {
  topic_arn = aws_sns_topic.remediation_approval.arn
  protocol  = "email"
  endpoint  = "ops-team@example.com"
}

# サブスクライバー2: HTTPS エンドポイント(Slack webhook 転送 Lambda 経由)
resource "aws_sns_topic_subscription" "slack_lambda" {
  topic_arn = aws_sns_topic.remediation_approval.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.slack_notifier.arn
}

# サブスクライバー3: ntfy(セルフホスト通知)
resource "aws_sns_topic_subscription" "ntfy" {
  topic_arn = aws_sns_topic.remediation_approval.arn
  protocol  = "https"
  endpoint  = "https://ntfy.example.com/config-remediation"
}

IAM プリンシパル許可リストの設計:

Approvers には IAM ユーザー ARN・IAM ロール ARN・IAM グループ ARN のいずれも指定できる。本番運用では複数承認者MinRequiredApprovals: 2)にすることで四つ目チェックが機能する。

# 複数承認者パターン(2名承認必須)
- name: approveRemediation
  action: 'aws:approve'
  timeoutSeconds: 7200# 2時間
  inputs:
 NotificationArn: '{{SNSTopicArn}}'
 Message: '{{BucketName}} の修復を承認してください(2名承認必須)'
 MinRequiredApprovals: 2
 Approvers:
- 'arn:aws:iam::123456789012:user/alice'
- 'arn:aws:iam::123456789012:user/bob'
- 'arn:aws:iam::123456789012:role/OpsEngineerRole'  # ロールでも可

承認タイムアウトの挙動に注意

timeoutSeconds を過ぎると SSM Automation の実行ステータスが TimedOut になる。onFailure: Abort を設定しておかないと後続の修復ステップが実行されてしまう危険がある。承認ゲートは 必ず onFailure: Abort を設定すること。


7.5 エラーハンドリング — maxAttempts / timeoutSeconds / isCritical

SSM Automation の各ステップには3つの主要な障害対策パラメータがある。

パラメータ用途推奨設定
maxAttemptsリトライ回数(デフォルト: 1)API 呼び出しは 3、承認ステップは 1
timeoutSecondsステップのタイムアウト軽い API: 60、スクリプト: 300、承認: 3600
isCritical失敗時に全体を失敗扱いにするか必須ステップ: true(デフォルト)、オプション通知: false

使い分けの実例:

steps:
  # API 呼び出し — リトライあり、クリティカル
  - name: enableEncryption
 action: 'aws:executeAwsApi'
 maxAttempts: 3
 timeoutSeconds: 120
 isCritical: true # 失敗 → 全体失敗
 onFailure: 'step:rollbackEncryption'
 inputs: ...

  # Slack 通知 — リトライ1回のみ、非クリティカル
  - name: notifySlack
 action: 'aws:executeScript'
 maxAttempts: 1
 timeoutSeconds: 30
 isCritical: false# Slack 通知失敗でも修復自体は成功扱い
 onFailure: Continue
 inputs: ...

  # スクリプト実行 — リトライなし、十分なタイムアウト
  - name: savePreState
 action: 'aws:executeScript'
 maxAttempts: 1
 timeoutSeconds: 300
 isCritical: true
 onFailure: Abort
 inputs: ...

7.6 exit code 設計

SSM Automation の実行結果を下流システム(EventBridge → Lambda → pytest)で正確に処理するために、exit code を統一定義する。

exit code意味対応アクション
0修復成功(VERIFIED)Config Rule が COMPLIANT に戻ったことを確認
1修復失敗(FAILED)PagerDuty エスカレーション、手動対応キューへ
2ロールバック成功(ROLLED_BACK)Slack 通知、修復計画の見直し
3ロールバック失敗緊急エスカレーション(リソースが不定状態)
124承認タイムアウト(TimedOut)翌営業日 SLA 違反アラート

exit code の実装は EventBridge ルールのパターンマッチングで行う(§9 で詳述):

{
  "source": ["aws.ssm"],
  "detail-type": ["EC2 Automation Execution Status-change Notification"],
  "detail": {
 "Status": ["Failed", "TimedOut", "Cancelled", "CompletedWithFailure"]
  }
}

7.7 Terraform state との整合 — 修復後の drift ゼロ確認フロー

本記事最大の課題は「SSM Automation が AWS リソースを書き換えたとき、Terraform state はそれを知らない」問題だ。§6 の runbook 内では terraform apply を直接呼ばない(競合の危険)。代わりに修復後に別フローで state を同期する

Terraform state 同期フロー:

SSM Automation 修復完了
  │
  ▼
EventBridge Rule (Execution Status = Succeeded)
  │
  ▼
Lambda: trigger_terraform_refresh
  │
  ├─ terraform refresh → state と実 AWS の同期
  │
  ├─ terraform plan → diff ゼロ確認
  ││
  │ diff あり ──→ Slack 通知「手動 import が必要」
  ││
  │ diff ゼロ ──→ VERIFIED 確定
  │
  └─ RemediationState を VERIFIED に更新

Lambda: terraform_refresh の実装スケッチ:

import subprocess, boto3, os

def handler(event, context):
 """SSM Automation 完了後に terraform refresh → plan を実行して state 整合を確認"""
 execution_id = event['detail']['ExecutionId']
 tf_dir = os.environ['TF_WORKING_DIR']# /var/task/terraform

 # terraform refresh
 result = subprocess.run(
  ['terraform', 'refresh', '-no-color'],
  cwd=tf_dir, capture_output=True, text=True, timeout=300
 )
 if result.returncode != 0:
  notify_slack(f"terraform refresh 失敗 (execution: {execution_id})\n{result.stderr}")
  return {'status': 'refresh_failed'}

 # terraform plan — diff ゼロ確認
 plan = subprocess.run(
  ['terraform', 'plan', '-detailed-exitcode', '-no-color'],
  cwd=tf_dir, capture_output=True, text=True, timeout=300
 )
 # exit code: 0=no changes, 1=error, 2=changes present
 if plan.returncode == 0:
  notify_slack(f"✅ Terraform drift ゼロ確認済み (execution: {execution_id})")
  return {'status': 'verified', 'drift': False}
 elif plan.returncode == 2:
  notify_slack(f"⚠️ Terraform diff あり — 手動 import が必要 (execution: {execution_id})\n{plan.stdout}")
  return {'status': 'drift_remaining', 'drift': True}
 else:
  notify_slack(f"terraform plan エラー (execution: {execution_id})\n{plan.stderr}")
  return {'status': 'plan_error'}

terraform import が必要なケース

SSM Automation が新しいリソースを作成した場合(例: KMS キーの自動生成)、terraform refresh だけでは state に追加できない。この場合は terraform import aws_kms_key.auto_generated <key_arn> を手動で実行し、その後 terraform plan で diff ゼロを確認する。


7.8 危険なパターン — 自動修復の無限ループ回避

最も危険なアンチパターン: automatic = true + cyclic Config Rules の組み合わせ。

Config Rule: s3-bucket-tagging (タグ必須チェック)
 │
 ▼  NON_COMPLIANT
SSM Automation: タグを付与
 │
 ▼  Config Rule が再評価
Config Rule: タグを付与したことで別のルール (s3-bucket-logging) が
 NON_COMPLIANT になる
 │
 ▼  再び NON_COMPLIANT
SSM Automation: ログ有効化 → さらに別のルールが...

対策1: automatic = false の徹底(本記事設計方針)

resource "aws_config_remediation_configuration" "s3_encryption" {
  config_rule_name = aws_config_config_rule.s3_encryption.name
  target_type= "SSM_DOCUMENT"
  target_id  = "AWS-EnableS3BucketEncryption"

  automatic= false  # ← 必ず false
  maximum_automatic_attempts = 1# automatic=false でも念のため 1 に設定
  retry_attempt_seconds= 600
}

対策2: SSM Automation 内部でのループガード

- name: checkLoopGuard
  action: 'aws:executeScript'
  inputs:
 Runtime: python3.11
 Handler: script_handler
 InputPayload:
execution_id: '{{automation:EXECUTION_ID}}'
bucket_name: '{{BucketName}}'
 Script: |
import boto3

def script_handler(events, context):
 ssm = boto3.client('ssm')
 # 直近1時間に同一バケットで実行済みの自動化を確認
 response = ssm.list_automation_executions(
  Filters=[
{'Key': 'DocumentNamePrefix', 'Values': ['remediate-s3']},
{'Key': 'ExecutionStatus', 'Values': ['Success', 'InProgress']},
  ]
 )
 recent = [
  e for e in response['AutomationExecutionMetadataList']
  if events['bucket_name'] in str(e.get('Targets', ''))
 ]
 if len(recent) > 2:
  raise Exception(f"ループガード発動: {events['bucket_name']} に直近の実行が{len(recent)}件")
 return {'loop_check': 'passed', 'recent_count': len(recent)}

対策3: Config Rule の評価間隔を広げる

resource "aws_config_config_rule" "s3_encryption" {
  name = "s3-bucket-server-side-encryption-enabled"

  source {
 owner = "AWS"
 source_identifier = "S3_BUCKET_SERVER_SIDE_ENCRYPTION_ENABLED"
  }

  # 継続的評価ではなく定期評価(24時間ごと)でループリスクを低減
  maximum_execution_frequency = "TwentyFour_Hours"
}

cyclic rules の見分け方

修復操作がリソースの別の属性を変更する可能性があるルールは cyclic risk が高い。例: タグ付与 → タグ強制ルール → 再評価ループ。新しい Config Rule を追加する際は、修復操作が他の Config Rule の評価対象属性を変更しないかを必ず確認する。


7.9 第2弾の compare() との統合 — 閉ループ検証

第2弾(articles/param-sheet-aws-config-drift-test.md)で実装した compare() 関数を修復後の事後検証に再利用する。これが本シリーズの「閉ループ」の核心だ。

# §8 pytest から参照するパターン(先出し)
from param_sheet_compare import compare  # 第2弾 pip install 済

def verify_post_remediation(execution_id: str) -> RemediationRow:
 """修復後の状態を compare() で検証し RemediationRow を返す"""
 # 修復後の Config スナップショットを取得
 post_snapshot = get_config_snapshot()

 # 第2弾の compare() で drift ゼロ確認
 post_diff = compare(terraform_state=get_tf_state(), config_snapshot=post_snapshot)

 if len(post_diff.ng_rows) == 0:
  state = RemediationState.VERIFIED
 else:
  state = RemediationState.FAILED

 return RemediationRow(
  resource_address=post_diff.resource_address,
  config_rule_name=post_diff.config_rule_name,
  runbook_name="AWS-EnableS3BucketEncryption",
  pre_compliance="NON_COMPLIANT",
  post_compliance="COMPLIANT" if state == RemediationState.VERIFIED else "NON_COMPLIANT",
  automation_execution_id=execution_id,
  approver=None,
  state=state,
 )

§8 では verify_post_remediation() をフィクスチャとして利用し、修復前後の2世代スナップショット比較を pytest で自動化する。


§7 まとめ — 修復ロジック設計のチェックリスト

  • 冪等性: aws:assertAwsResourceProperty で修復済みを早期検出 → 再実行しても副作用ゼロ
  • ロールバック: 修復前状態を S3 に保存 → onFailure で逆操作 runbook を呼び出す
  • Manual Approval: aws:approve + timeoutSeconds: 3600 + onFailure: Abort の3点セット
  • エラーハンドリング: maxAttempts: 3(API) / isCritical: false(通知ステップ)の使い分け
  • Terraform state 整合: 修復後に terraform refresh → plan で diff ゼロ確認
  • 無限ループ回避: automatic = false 徹底 + ループガード実装 + 評価間隔の拡大
  • exit code 統一: 0=成功 / 1=修復失敗 / 2=ロールバック成功 / 3=ロールバック失敗 / 124=タイムアウト

8. pytest による修復結果検証(第2弾突合ツール再利用)

本セクションでは、Section 7 で設計した SSM Automation runbook による修復が本当に成功したか
pytest で自動検証する実装を解説します。

「修復前のスナップショット取得 → runbook 実行 → 修復後の再スナップショット取得 → 差分ゼロを assert」
という修復前後の 2 世代比較が設計の核です。


8-0. 図8: pytest 検証フロー

fig08

pre_remediation_snapshot← compare(expected, actual) → pre_diff(ng_rows > 0)
  ↓
trigger_runbook(SSM StartAutomationExecution)
  ↓
wait_for_completion(ポーリング or waiter)
  ↓
post_remediation_snapshot  ← compare(expected, actual) → post_diff(ng_rows == 0)
  ↓
assert len(post_diff.ng_rows) == 0

8-1. dataclass 先出し契約(§7 提示・再掲)

§7 の修復ロジックで定義された RemediationRow を pytest 側でも使用します。
本セクションでは import して利用するだけで、定義は §7 に委ねます。

# context/skills/drift_remediation_contract.py (§7 で定義・ここでは再掲)
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

class RemediationState(Enum):
 DETECTED = "DETECTED"
 APPROVED = "APPROVED"
 EXECUTING= "EXECUTING"
 VERIFIED = "VERIFIED"
 FAILED= "FAILED"
 ROLLED_BACK = "ROLLED_BACK"

@dataclass(frozen=True)
class RemediationRow:
 resource_address: str
 config_rule_name: str
 runbook_name: str
 pre_compliance: str  # COMPLIANT / NON_COMPLIANT / ERROR
 post_compliance: str | None# None = 未実行
 automation_execution_id: str | None
 approver: str | None
 state: RemediationState
 error_message: str = ""
 note: str = ""

8-2. conftest.py(fixture 設計)

修復検証テストの fixture は 3 段階 で構成されます。

fixture目的スコープ
pre_diff修復前の drift 状態(前条件: ng_rows > 0)function
runbook_execution_idSSM Automation 実行 IDfunction
post_diffrunbook 完了後の再検査結果(事後条件: ng_rows == 0)function
# tests/conftest.py
from __future__ import annotations

import os
import time
from typing import Any

import boto3
import pytest

from param_sheet_drift import compare# 第2弾 §7 のモジュール
from scripts.config_fetcher import config_fetch
from scripts.tf_plan_parser import parse_plan

# -----------------------------------------------------------------------
# 共通ユーティリティ
# -----------------------------------------------------------------------

def get_expected(resource_address: str, plan_path: str) -> dict[str, Any]:
 """plan.json から指定アドレスの期待値を取得する。"""
 all_expected = parse_plan(plan_path)
 return all_expected.get(resource_address, {})


def get_actual(config_client, resource_type: str, resource_id: str) -> dict[str, Any]:
 """AWS Config から指定リソースの現状値を取得する。"""
 result = config_fetch(
  config_client,
  resource_types=[resource_type],
  resource_ids=[resource_id],
 )
 return result.get(resource_id, {})


def wait_for_completion(
 ssm_client,
 execution_id: str,
 timeout_seconds: int = 300,
 poll_interval: int = 10,
) -> str:
 """SSM Automation 実行が終了するまでポーリングする。終了ステータスを返す。"""
 deadline = time.monotonic() + timeout_seconds
 while time.monotonic() < deadline:
  resp = ssm_client.get_automation_execution(
AutomationExecutionId=execution_id
  )
  status = resp["AutomationExecution"]["AutomationExecutionStatus"]
  if status in {"Success", "Failed", "Cancelled", "TimedOut"}:
return status
  time.sleep(poll_interval)
 raise TimeoutError(
  f"SSM Automation {execution_id!r} が {timeout_seconds} 秒以内に完了しませんでした"
 )


# -----------------------------------------------------------------------
# セッションスコープ: AWS クライアント
# -----------------------------------------------------------------------

@pytest.fixture(scope="session")
def aws_clients():
 """boto3 セッションから各種クライアントを生成する。"""
 env = os.getenv("TEST_ENV", "dev")
 profile = os.getenv(f"AWS_PROFILE_{env.upper()}", env)
 session = boto3.Session(profile_name=profile, region_name="ap-northeast-1")
 return {
  "ssm": session.client("ssm"),
  "config": session.client("config"),
 }


# -----------------------------------------------------------------------
# Function スコープ: 修復前後の 2 世代 fixture
# -----------------------------------------------------------------------

@pytest.fixture
def pre_diff(aws_clients, resource_address, resource_type, resource_id, plan_path):
 """修復前の drift 状態を取得する(前条件検査用)。"""
 expected = get_expected(resource_address, plan_path)
 actual= get_actual(aws_clients["config"], resource_type, resource_id)
 return compare(expected=expected, actual=actual)


@pytest.fixture
def runbook_execution_id(aws_clients, runbook_name, runbook_params):
 """SSM Automation を実行して実行 ID を返す。"""
 ssm = aws_clients["ssm"]

 # SSM Automation 実行権限の確認(権限なしは明示的にスキップ)
 try:
  resp = ssm.start_automation_execution(
DocumentName=runbook_name,
Parameters=runbook_params,
  )
  return resp["AutomationExecutionId"]
 except ssm.exceptions.ClientError as exc:
  if exc.response["Error"]["Code"] == "AccessDeniedException":
pytest.skip(
 reason=(
  "SSM Automation 実行権限が必要です "
  "(iam:PassRole / ssm:StartAutomationExecution)。"
  "SKIP = FAIL ルール準拠: このテストは権限付与後に再実行してください。"
 )
)
  raise


@pytest.fixture
def post_diff(aws_clients, runbook_execution_id, resource_address, resource_type, resource_id, plan_path):
 """runbook 完了後に再検査した drift 状態を返す。"""
 status = wait_for_completion(aws_clients["ssm"], runbook_execution_id)
 if status != "Success":
  pytest.fail(f"SSM Automation が失敗しました: status={status!r}")

 expected = get_expected(resource_address, plan_path)
 actual= get_actual(aws_clients["config"], resource_type, resource_id)
 return compare(expected=expected, actual=actual)

8-3. test_remediation.py(メインテスト)

修復前後の事前/事後条件

# tests/test_remediation.py
from __future__ import annotations

import pytest

# -----------------------------------------------------------------------
# parametrize: Config Rule × runbook の組み合わせ
# -----------------------------------------------------------------------

REMEDIATION_CASES = [
 pytest.param(
  "s3-bucket-server-side-encryption-enabled",
  "NON_COMPLIANT",
  "AWS-EnableS3BucketEncryption",
  {"BucketName": ["myorg-terraform-state"]},
  "aws_s3_bucket.state",
  "AWS::S3::Bucket",
  "myorg-terraform-state",
  id="s3-sse",
 ),
 pytest.param(
  "ec2-instance-no-public-ip",
  "NON_COMPLIANT",
  "AWSConfigRemediation-DisablePublicAccessToRDSInstance",
  {"InstanceId": ["i-0123456789abcdef0"]},
  "aws_instance.web",
  "AWS::EC2::Instance",
  "i-0123456789abcdef0",
  id="ec2-no-public-ip",
 ),
]


@pytest.mark.parametrize(
 "rule_name, expected_compliance, runbook_name, runbook_params, "
 "resource_address, resource_type, resource_id",
 REMEDIATION_CASES,
)
def test_remediation_success(
 pre_diff,
 post_diff,
 rule_name: str,
 expected_compliance: str,
 runbook_name: str,
 runbook_params: dict,
 resource_address: str,
 resource_type: str,
 resource_id: str,
):
 """修復 runbook 実行前後で drift が解消されることを検証する。"""
 # 前条件: 修復前に drift が存在すること
 assert len(pre_diff.ng_rows) > 0, (
  f"前条件未達: {resource_address} に drift が存在しません。"
  "テストデータが修復済みの状態です。"
 )

 # 事後条件: 修復後に drift がゼロになること
 assert len(post_diff.ng_rows) == 0, (
  f"{resource_address} の修復後に {len(post_diff.ng_rows)} 件の drift が残存しています:\n"
  + "\n".join(
f"  {r.attribute}: expected={r.expected!r} actual={r.actual!r}"
for r in post_diff.ng_rows
  )
 )

SSM Automation が Config Rule に準拠することの事後確認

@pytest.mark.parametrize(
 "rule_name, expected_compliance, runbook_name, runbook_params, "
 "resource_address, resource_type, resource_id",
 REMEDIATION_CASES,
)
def test_config_compliance_after_remediation(
 aws_clients,
 runbook_execution_id,
 rule_name: str,
 expected_compliance: str,
 runbook_name: str,
 runbook_params: dict,
 resource_address: str,
 resource_type: str,
 resource_id: str,
):
 """runbook 完了後に AWS Config の準拠ステータスが COMPLIANT になることを確認する。"""
 from scripts.config_fetcher import get_compliance_status

 # Config は即時反映しないため最大 60 秒待機
 import time
 deadline = time.monotonic() + 60
 while time.monotonic() < deadline:
  status = get_compliance_status(
aws_clients["config"], rule_name, resource_id
  )
  if status == "COMPLIANT":
break
  time.sleep(5)

 assert status == "COMPLIANT", (
  f"Config Rule {rule_name!r} が修復後も {status!r} のままです。"
  "Config Recorder の記録間隔(最大 6 時間)に注意してください。"
 )

8-4. moto でのモック戦略

moto は boto3 の多くのサービスをモック可能ですが、SSM Automation の start_automation_execution
は 2024年時点で部分モックのみ
対応であり、runbook ステップの実行は再現できません。

このため、「SSM Automation 呼び出し単体のユニットテスト」と「実 AWS 環境での結合テスト」を分離
する戦略を取ります。

# tests/test_remediation_unit.py(moto モック使用 — ユニットテスト)
import boto3
import pytest
from moto import mock_aws
from unittest.mock import patch, MagicMock


@mock_aws
def test_start_automation_execution_called(monkeypatch):
 """SSM StartAutomationExecution が正しいパラメータで呼ばれることを確認する(モック)。"""
 import boto3
 ssm = boto3.client("ssm", region_name="ap-northeast-1")

 # moto は start_automation_execution を受け付けるが実行 ID のみ返す(ステップは非実行)
 resp = ssm.start_automation_execution(
  DocumentName="AWS-EnableS3BucketEncryption",
  Parameters={"BucketName": ["myorg-terraform-state"]},
 )
 assert "AutomationExecutionId" in resp


@mock_aws
def test_wait_for_completion_success(monkeypatch):
 """wait_for_completion が Success ステータスで正常終了することを確認する(モック)。"""
 # moto の SSM Automation は即座に Success を返す挙動を利用
 import boto3
 from tests.conftest import wait_for_completion

 ssm = boto3.client("ssm", region_name="ap-northeast-1")
 resp = ssm.start_automation_execution(
  DocumentName="AWS-EnableS3BucketEncryption",
  Parameters={"BucketName": ["myorg-terraform-state"]},
 )
 exec_id = resp["AutomationExecutionId"]

 status = wait_for_completion(ssm, exec_id, timeout_seconds=10)
 assert status == "Success"

モック制約の注意: moto の SSM Automation モックは実際の runbook ステップを実行しません。
test_remediation_success(前後差分比較)は実 AWS 環境で実行する統合テストとして分類してください。
ユニットテスト(test_remediation_unit.py)と統合テスト(test_remediation.py)を
pytest.initestpaths で分離することを推奨します。


8-5. SKIP = FAIL ルール準拠の文書化

⚠️ SKIP = FAIL ルール(本プロジェクト運用規則)
テストレポートに SKIP が 1 件以上存在する場合、「テスト未完了」として扱います。
pytest.skip(reason=...) を使用する際は必ず スキップ理由と再実行条件reason に明記してください。
CIレポートで skip 件数を確認し、0 になるまで環境を整備してから「テスト完了」と報告すること。

本セクションの skip 条件と対処方法:

skip 発生条件reason 文字列対処
AccessDeniedException on start_automation_execution"SSM Automation 実行権限が必要です (iam:PassRole / ssm:StartAutomationExecution)。SKIP = FAIL ルール準拠: このテストは権限付与後に再実行してください。"IAM ポリシーに ssm:StartAutomationExecutioniam:PassRole を追加
Config Recorder 未有効"AWS Config Recorder が無効です。pytest.ini の skip_if_no_recorder マーカー参照。"Config Recorder を有効化後に再実行
SSM Automation タイムアウトfixture 内で TimeoutErrorpytest.fail() に変換(skip ではなく fail)runbook の timeoutSeconds を増加、または手動でステータスを確認
# pytest.ini
[pytest]
markers =
 integration: 実 AWS 環境が必要な統合テスト(SSM Automation / Config 実行権限)
 unit: moto モックで実行可能なユニットテスト
 skip_if_no_recorder: Config Recorder が無効の場合にスキップ

# 統合テストを除外して高速ユニットテストのみ実行する場合:
# pytest -m "unit" tests/
# 統合テストを含めて全件実行する場合:
# pytest tests/

9. 運用 — 監視・通知・監査ログ

修復 runbook が実行されるたびに誰が何をしたかを記録し、失敗時に即座に通知する仕組みが
エンタープライズ運用では不可欠です。
本セクションでは EventBridge → SNS による通知設定と、CloudTrail を使った監査ログの活用を解説します。


9-1. EventBridge rule — SSM Automation 通知

SSM Automation の実行ステータスが変化すると、EventBridge に自動的にイベントが発行されます。
このイベントを SNS トピックへルーティングし、メール・Slack・ntfy で通知します。

# terraform/modules/drift_ops/eventbridge.tf
terraform {
  required_version = ">= 1.9.0"
  required_providers {
 aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}

resource "aws_cloudwatch_event_rule" "ssm_automation_status" {
  name  = "ssm-automation-status-change"
  description = "SSM Automation 実行ステータス変化を SNS へ転送"

  event_pattern = jsonencode({
 source= ["aws.ssm"]
 "detail-type" = ["EC2 Automation Execution Status-change Notification"]
 detail = {
Status = ["Success", "Failed", "Cancelled", "TimedOut"]
 }
  })
}

resource "aws_cloudwatch_event_target" "ssm_to_sns" {
  rule= aws_cloudwatch_event_rule.ssm_automation_status.name
  target_id = "SsmToSns"
  arn = aws_sns_topic.drift_ops.arn

  input_transformer {
 input_paths = {
exec_id  = "$.detail.ExecutionId"
status= "$.detail.Status"
doc_name = "$.detail.DocumentName"
 }
 input_template = "\"[drift-ops] runbook=<doc_name> status=<status> execution_id=<exec_id>\""
  }
}

9-2. SNS → メール / Slack webhook / ntfy の 3 系統

SNS トピックとメール購読

# terraform/modules/drift_ops/sns.tf
resource "aws_sns_topic" "drift_ops" {
  name  = "drift-ops-notifications"
  kms_master_key_id = aws_kms_key.sns.id
}

resource "aws_sns_topic_subscription" "email" {
  topic_arn = aws_sns_topic.drift_ops.arn
  protocol  = "email"
  endpoint  = var.ops_email  # シークレットは変数経由
}

Slack webhook(Lambda 経由)

SNS → Lambda → Slack Incoming Webhook の構成が最小実装です。

resource "aws_sns_topic_subscription" "lambda_slack" {
  topic_arn = aws_sns_topic.drift_ops.arn
  protocol  = "lambda"
  endpoint  = aws_lambda_function.slack_notifier.arn
}
# lambda/slack_notifier/index.py
import json
import os
import urllib.request


def handler(event: dict, context) -> None:
 """SNS メッセージを Slack webhook へ転送する。"""
 webhook_url = os.environ["SLACK_WEBHOOK_URL"]  # SSM Parameter Store 経由で設定
 for record in event.get("Records", []):
  message = record["Sns"]["Message"]
  payload = json.dumps({"text": f":wrench: {message}"}).encode()
  req = urllib.request.Request(
webhook_url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
  )
  with urllib.request.urlopen(req) as resp:
resp.read()

セキュリティ注記: SLACK_WEBHOOK_URL は Lambda 環境変数に直接埋め込まず、
SSM Parameter Store(SecureString)から起動時に取得する構成を推奨します。

ntfy(セルフホスト型プッシュ通知)

# lambda/slack_notifier/index.py に ntfy 送信を追加する例(選択式)
import os
import urllib.request

NTFY_TOPIC = os.getenv("NTFY_TOPIC", "drift-ops")
NTFY_SERVER = os.getenv("NTFY_SERVER", "https://ntfy.sh")

def notify_ntfy(message: str, priority: str = "default") -> None:
 """ntfy トピックへ通知を送る。"""
 req = urllib.request.Request(
  f"{NTFY_SERVER}/{NTFY_TOPIC}",
  data=message.encode(),
  headers={"Priority": priority, "Title": "drift-ops"},
  method="POST",
 )
 with urllib.request.urlopen(req) as resp:
  resp.read()

9-3. CloudTrail 監査ログ — 誰が承認・実行したか

SSM Automation の実行操作は CloudTrail に自動記録されます。
who approved / who executed を特定するクエリ例:

# 過去 24 時間の SSM StartAutomationExecution イベントを確認
aws cloudtrail lookup-events \
  --lookup-attributes AttributeKey=EventName,AttributeValue=StartAutomationExecution \
  --start-time "$(date -u -d '24 hours ago' +%Y-%m-%dT%H:%M:%SZ)" \
  --region ap-northeast-1 \
  --query 'Events[].{Time:EventTime, User:Username, ExecId:CloudTrailEvent}' \
  --output table

# Manual Approval の承認操作(SendAutomationSignal)を確認
aws cloudtrail lookup-events \
  --lookup-attributes AttributeKey=EventName,AttributeValue=SendAutomationSignal \
  --start-time "$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)" \
  --region ap-northeast-1 \
  --output json | jq '.Events[] | {time: .EventTime, user: .Username}'

9-4. 監査要件対応 — CloudWatch Logs → S3 → Athena

長期保管・クロス集計が必要な場合は、SSM Automation の実行ログを S3 に Export し
Athena でクエリ可能にします。

# terraform/modules/drift_ops/cloudwatch.tf

# SSM Automation ログを CloudWatch Logs グループへ集約
resource "aws_cloudwatch_log_group" "ssm_automation" {
  name  = "/aws/ssm/automation/drift-ops"
  retention_in_days = 90
  kms_key_id  = aws_kms_key.logs.arn
}

# S3 Export タスク(Lambda で週次実行)
resource "aws_lambda_function" "log_exporter" {
  function_name = "ssm-log-exporter"
  runtime = "python3.12"
  handler = "index.handler"
  memory_size= 128
  timeout = 300
  role = aws_iam_role.log_exporter.arn
  filename= "log_exporter.zip"

  environment {
 variables = {
LOG_GROUP_NAME= aws_cloudwatch_log_group.ssm_automation.name
EXPORT_BUCKET = aws_s3_bucket.audit_logs.bucket
EXPORT_PREFIX = "ssm-automation-logs"
 }
  }
}

Athena でのクエリ例:

-- drift 修復の週次サマリ(修復成功率・平均実行時間)
SELECT
  DATE_TRUNC('week', from_iso8601_timestamp(timestamp)) AS week,
  runbook_name,
  COUNT(*) AS total_executions,
  SUM(CASE WHEN status = 'Success' THEN 1 ELSE 0 END) AS success_count,
  ROUND(
 100.0 * SUM(CASE WHEN status = 'Success' THEN 1 ELSE 0 END) / COUNT(*), 1
  ) AS success_rate_pct,
  AVG(execution_duration_seconds) AS avg_duration_sec
FROM ssm_automation_logs
WHERE year = '2026'
GROUP BY 1, 2
ORDER BY 1 DESC, 4 DESC;

9-5. 運用 KPI — 月次 drift レポート

drift 修復運用を継続的に改善するために、以下の KPI を月次で記録します。

KPI計測方法目標
月次 drift 検知件数Config Rule の NON_COMPLIANT 記録数前月比 10% 減少トレンド
修復成功率SSM Automation Success / 全実行数≥ 95%
平均承認時間SendAutomationSignal の時刻 – StartAutomationExecution の時刻≤ 4 時間
手動介入率runbook 失敗後に手動操作が発生した割合≤ 5%

これらの KPI を Python スクリプトで集計し、Excel の drift_kpi シートへ自動記録する拡張も
drift_report.py(第2弾 §9-6)と同じパターンで実装可能です。

# scripts/drift_kpi.py(月次 KPI 集計スクリプト骨子)
from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path

import boto3


def collect_monthly_kpi(year: int, month: int, region: str = "ap-northeast-1") -> dict:
 """指定年月の drift 修復 KPI を CloudTrail から集計する。"""
 client = boto3.client("cloudtrail", region_name=region)

 start = datetime(year, month, 1, tzinfo=timezone.utc)
 end= datetime(year, month + 1, 1, tzinfo=timezone.utc) if month < 12 \
else datetime(year + 1, 1, 1, tzinfo=timezone.utc)

 executions: list[dict] = []
 paginator = client.get_paginator("lookup_events")
 for page in paginator.paginate(
  LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "StartAutomationExecution"}],
  StartTime=start,
  EndTime=end,
 ):
  executions.extend(page["Events"])

 total= len(executions)
 success = sum(1 for e in executions if _is_success(e))

 return {
  "year": year,
  "month": month,
  "total_executions": total,
  "success_count": success,
  "success_rate_pct": round(100.0 * success / total, 1) if total else 0.0,
 }


def _is_success(event: dict) -> bool:
 """CloudTrail イベントから成功判定する(簡易実装)。"""
 trail = json.loads(event.get("CloudTrailEvent", "{}"))
 return trail.get("responseElements", {}).get("automationExecutionId") is not None

Section 8・9 執筆完了。 pytest 修復前後の2世代 fixture / moto partial モック戦略 / SKIP = FAIL 文書化 / EventBridge→SNS 3系統通知 / CloudTrail 監査 / 月次 KPI 集計を網羅。

Section 10. ハンズオン実行と成果物確認

本記事で構築してきたパイプラインを一気通貫で走らせます。第2弾の pytest 突合(20秒)→ SSM Automation runbook 実行(60〜120秒)→ 事後 pytest 検証(20秒)という3段のフローを make drift-repair-all の1コマンドで完結させます。

Remediation 未有効の読者へ: Section 3 の aws_config_remediation_configuration が未適用の場合、Section 4〜7 のコード設計学習と Section 8 の pytest モックテストまではローカルで進められます。実際の drift 修復を体験するには Section 3 のリソース一式を terraform apply してから本章のハンズオンを実施してください。SSM Automation の実行コスト(step 課金・1000 step/月無料)も本章で明示します。


10-1. 事前確認チェックリスト

# AWS 認証確認
aws sts get-caller-identity
# {
#"Account": "123456789012",
#"Arn": "arn:aws:iam::123456789012:user/drift-repair-tester"
# }

# Config Recorder が有効か確認
aws configservice describe-configuration-recorder-status \
  --region ap-northeast-1 \
  --query 'ConfigurationRecordersStatus[*].{Name:name,Recording:recording}' \
  --output table

# Remediation Configuration が登録済みか確認
aws configservice describe-remediation-configurations \
  --config-rule-names \
 s3-bucket-server-side-encryption-enabled \
 rds-instance-backup-enabled \
  --region ap-northeast-1 \
  --output table

# SSM Automation 実行権限確認
aws ssm describe-automation-executions \
  --region ap-northeast-1 \
  --max-results 1 \
  --output json > /dev/null && echo "SSM Automation: OK"

10-2. エンドツーエンド実行スクリプト(make drift-repair-all)

#!/usr/bin/env bash
# run_drift_repair.sh — 検知→承認→修復→事後検証の一気通貫実行
# 使い方: bash run_drift_repair.sh [ENV: dev|stg|prod]
set -euo pipefail

ENVIRONMENT="${1:-dev}"
PLAN_FILE="environments/${ENVIRONMENT}/plan.json"
REPORT_DIR="output/remediation-reports/${ENVIRONMENT}"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)

mkdir -p "${REPORT_DIR}"

echo "============================================="
echo "  Drift Repair Pipeline — ${ENVIRONMENT}"
echo "============================================="

# ── Step 1: 事前 drift 検知(第2弾 pytest 再利用)~20秒 ───────────
echo ""
echo "▶ [1/4] 事前 drift 検知 (pytest)..."
PLAN_PATH="${PLAN_FILE}" \
AWS_ENV="${ENVIRONMENT}" \
pytest tests/test_drift.py \
  -v --tb=short \
  --junit-xml="${REPORT_DIR}/pre_drift_${TIMESTAMP}.xml" \
  2>&1 | tee "${REPORT_DIR}/pre_drift_${TIMESTAMP}.log" || true

PRE_NG=$(grep -c 'FAILED' "${REPORT_DIR}/pre_drift_${TIMESTAMP}.log" || echo 0)
echo "事前 drift NG 件数: ${PRE_NG}"

if [ "${PRE_NG}" -eq 0 ]; then
  echo "✓ drift なし — 修復実行をスキップします"
  exit 0
fi

# ── Step 2: Manual Approval 取得(SNS → メール承認)──────────────
echo ""
echo "▶ [2/4] Manual Approval 待機中..."
echo "  承認メールが ${APPROVER_EMAIL:-admin@example.com} に送信されました"
echo "  メール内のリンクをクリックして修復を承認してください"
echo "  タイムアウト: ${APPROVAL_TIMEOUT_MINUTES:-60} 分"

# Manual Approval は aws:approve ステップで SSM が管理するため
# CLI から直接操作はしない。承認後に SSM が自動で次ステップを実行する。
# ここでは承認済み前提でシミュレーション(実環境では不要)

# ── Step 3: SSM Automation runbook 実行(~60-120秒)─────────────
echo ""
echo "▶ [3/4] SSM Automation runbook 実行中..."

# Config の Remediation Configuration を使って修復を起動
EXECUTION_IDS=()
NON_COMPLIANT_RESOURCES=$(aws configservice get-compliance-details-by-config-rule \
  --config-rule-name "s3-bucket-server-side-encryption-enabled" \
  --compliance-types NON_COMPLIANT \
  --region ap-northeast-1 \
  --query 'EvaluationResults[*].EvaluationResultIdentifier.EvaluationResultQualifier.ResourceId' \
  --output text 2>/dev/null || echo "")

for resource_id in ${NON_COMPLIANT_RESOURCES}; do
  echo "  修復対象: ${resource_id}"
  EXEC_ID=$(aws configservice start-remediation-execution \
 --config-rule-name "s3-bucket-server-side-encryption-enabled" \
 --resource-keys "[{\"resourceType\":\"AWS::S3::Bucket\",\"resourceId\":\"${resource_id}\"}]" \
 --region ap-northeast-1 \
 --query 'FailedItems' \
 --output text 2>/dev/null || echo "")

  if [ -n "${EXEC_ID}" ]; then
 EXECUTION_IDS+=("${EXEC_ID}")
 echo "  Execution ID: ${EXEC_ID}"
  fi
done

# runbook 完了待機
echo "  runbook 完了を待機中(最大 3 分)..."
sleep 90
echo "  ✓ runbook 実行完了(推定)"

# ── Step 4: 事後 drift 検証(第2弾 pytest 再利用)~20秒 ──────────
echo ""
echo "▶ [4/4] 事後 drift 検証 (pytest)..."
PLAN_PATH="${PLAN_FILE}" \
AWS_ENV="${ENVIRONMENT}" \
pytest tests/test_drift.py \
  tests/test_remediation.py \
  -v --tb=short \
  --junit-xml="${REPORT_DIR}/post_drift_${TIMESTAMP}.xml" \
  2>&1 | tee "${REPORT_DIR}/post_drift_${TIMESTAMP}.log"

POST_NG=$(grep -c 'FAILED' "${REPORT_DIR}/post_drift_${TIMESTAMP}.log" || echo 0)

echo ""
echo "============================================="
echo "  Drift Repair Summary — ${ENVIRONMENT}"
echo "  事前 NG: ${PRE_NG} 件 → 事後 NG: ${POST_NG} 件"
if [ "${POST_NG}" -eq 0 ]; then
  echo "  ✓ 全修復完了 — drift クリア"
else
  echo "  ✗ 未解決 drift: ${POST_NG} 件 (要確認)"
fi
echo "============================================="

実行方法:

chmod +x run_drift_repair.sh

# dev 環境の修復パイプライン実行
./run_drift_repair.sh dev

# Makefile 経由(全環境一括)
make drift-repair-all

10-3. 実行結果サンプル

修復前(drift 検知フェーズ)

▶ [1/4] 事前 drift 検知 (pytest)...

tests/test_drift.py::test_no_ng_verdict FAILED
tests/test_drift.py::test_s3_bucket_encryption[aws_s3_bucket.audit_logs] FAILED

=== FAILURES ===
test_no_ng_verdict
  AssertionError: 2件のNG検出:
 [NG] aws_s3_bucket.audit_logs / server_side_encryption_configuration
expected='aws:kms' actual=None
 [NG] aws_db_instance.main / backup_retention_period
expected=7 actual=0

事前 drift NG 件数: 2

修復後(事後検証フェーズ)

▶ [4/4] 事後 drift 検証 (pytest)...

tests/test_drift.py::test_no_ng_verdict PASSED
tests/test_drift.py::test_s3_bucket_encryption[aws_s3_bucket.audit_logs] PASSED
tests/test_drift.py::test_rds_backup_retention[aws_db_instance.main] PASSED
tests/test_remediation.py::test_remediation_completed PASSED

=================== 12 passed in 19.84s ===================

=============================================
  Drift Repair Summary — dev
  事前 NG: 2 件 → 事後 NG: 0 件
  ✓ 全修復完了 — drift クリア
=============================================

Excel「差分一覧」修復前/後の比較

【param-sheet.xlsx — Sheet2(修復前)】
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
リソース名属性期待値現状値  判定  修復後判定
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
aws_s3_bucket.audit_logs server_side_encryption aws:kms  NoneNG (実行中)
aws_db_instance.main  backup_retention_period 7 0NG (実行中)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

【param-sheet.xlsx — Sheet2(修復後)】
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
リソース名属性期待値現状値判定  修復後判定
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
aws_s3_bucket.audit_logs server_side_encryption aws:kms  aws:kms  OK ✓ OK
aws_db_instance.main  backup_retention_period 7  7  OK ✓ OK
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

修復後の Sheet2 には差分がゼロになった証跡が残り、これが受入試験書の「修復完了確認欄」として機能します。


10-4. 実行時間の目安とコスト

Step 1: 事前 drift 検知 (pytest)  ─── ~20 秒
Step 2: Manual Approval 待機─── 担当者の応答速度次第(設定: 60 分タイムアウト)
Step 3: SSM Automation runbook 実行  ─── 60〜120 秒(リソース数・runbook step 数による)
Step 4: 事後 drift 検証 (pytest)  ─── ~20 秒
 ─────────────
自動実行部分合計(承認除く) ─── ~3 分

SSM Automation コスト:
– Step 実行数: aws:assertAwsResourceProperty × 1 + aws:executeScript × 1 + aws:executeAwsApi × 1 = 3 step/修復1件
– 無料枠: 1,000 step/月(小規模ハンズオンは無料枠で完結)
– 超過分: $0.002/step(100件修復 × 3 step = $0.60)
– S3 ログ: 標準料金(数KB/修復 → ハンズオン規模では数円以下)


10-5. トラブルシュート

ケース 1: Manual Approval タイムアウト

FAILED: aws:approve step timed out after 3600 seconds.
  ApprovalStatus: WaitingForApproval
  Execution ID: aaaabbbb-cccc-dddd-eeee-ffffgggghhhh

対処法:
1. SNS メールの受信ボックスを確認(迷惑メールフォルダも)
2. aws ssm send-automation-signal で手動承認:

aws ssm send-automation-signal \
  --automation-execution-id "aaaabbbb-cccc-dddd-eeee-ffffgggghhhh" \
  --signal-type "Approve" \
  --payload '{"Comment":["Approved by admin"]}' \
  --region ap-northeast-1
  1. タイムアウト値を延長したい場合は §7 の timeoutSeconds を調整後 aws ssm update-document で再登録

ケース 2: runbook 権限不足

Error: Failed to execute step 'RemediateS3Encryption'.
  StatusMessage: User: arn:aws:iam::123456789012:role/ssm-automation-role
  is not authorized to perform: s3:PutBucketEncryption on resource: arn:aws:s3:::audit-logs-bucket

対処法: Section 5 の aws_iam_role ポリシーに不足権限を追加:

# terraform/remediation_iam.tf(追記)
resource "aws_iam_role_policy" "ssm_s3_remediation" {
  name = "ssm-s3-remediation-policy"
  role = aws_iam_role.ssm_automation.id

  policy = jsonencode({
 Version = "2012-10-17"
 Statement = [{
Sid = "AllowS3Remediation"
Effect = "Allow"
Action = [
  "s3:PutBucketEncryption",
  "s3:GetBucketEncryption",
  "s3:PutBucketVersioning",
  "s3:GetBucketVersioning",
]
Resource = "arn:aws:s3:::*"
 }]
  })
}

terraform apply 後、修復を再試行してください。


ケース 3: rollback 失敗時の手動リカバリ

Error: Rollback step 'RestoreS3Config' failed.
  StatusMessage: The specified bucket configuration could not be restored.
  SavedState: s3://myorg-runbook-states/aws_s3_bucket.audit_logs/20260419T095000Z.json

対処法:
1. S3 に保存された修復前スナップショットを確認:

aws s3 cp s3://myorg-runbook-states/aws_s3_bucket.audit_logs/20260419T095000Z.json - | jq .
  1. スナップショットを参照して手動でリソースを元の状態に戻す:
# 例: S3 バケット暗号化を元の設定に戻す
aws s3api put-bucket-encryption \
  --bucket audit-logs-bucket \
  --server-side-encryption-configuration '{"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]}'
  1. 手動リカバリ後、Terraform state を同期:
cd environments/dev
terraform refresh
terraform plan  # diff が 0 件であることを確認

Section 11. まとめと次の発展

本記事では、AWS Config × Systems Manager Automation を組み合わせた drift 自動修復パイプラインを構築しました。これによりシリーズ3段階の閉ループが完成しました。


11-1. シリーズ3段階の完成宣言

param-sheet-tf-config シリーズ3弾を通して構築してきたパイプラインを振り返ります。

【第1弾】 設計の可視化
─────────────────────────────────────────────────────────────────
  Terraform コード → parse_plan() → write_excel()
  → Excel パラメーターシート(期待値列)
  → 設計レビュー・顧客提出 ✓

【第2弾】 設計整合の単体テスト化
─────────────────────────────────────────────────────────────────
  terraform plan JSON (期待値)
  AWS Config Advanced Query (現状値)
  → compare() → DiffRow [Verdict: OK / NG / UNKNOWN]
  → pytest → PASS / FAIL 自動判定 ✓

【第3弾(本記事)】 drift 是正の閉ループ化
─────────────────────────────────────────────────────────────────
  Verdict.NG → Config Remediation → SSM Automation runbook
  → Manual Approval → 自動修復 → 事後 pytest 検証
  → Excel Sheet2「修復後判定」全 OK ✓

  設計 → 検証 → 修復 の3段階が完成。閉ループ達成。
─────────────────────────────────────────────────────────────────

11-2. 本記事で身につくスキルの棚卸

本シリーズ3弾を完走した読者は、以下のスキルが手元に揃います。

カテゴリ習得スキル
AWS ConfigRecorder / Remediation Configuration / Advanced Query / Config Rule の設計と Terraform 管理
SSM Automationrunbook 構造(schemaVersion 0.3)/ aws:approve / aws:executeScript / 権限設計
修復設計冪等性実装パターン / ロールバック S3 スナップショット設計 / Manual Approval フロー
pytest 応用修復前後の2世代比較 fixture / pre_diff → runbook → post_diff の一気通貫テスト
Terraform 運用修復後の terraform refreshplan diff ゼロ 確認フロー / state 整合性保守
Excel 証跡管理Sheet2「差分一覧」の修復前/後 2列拡張 / 受入試験書への自動転記

11-3. 本記事で意図的に踏み込まなかった領域

領域理由参照先
GitHub Actions との Manual Approval 連携CI/CD 統合は cmd_040 へ委譲複数人開発シリーズ第2弾
Automatic Remediation(非承認・即時修復)state 不整合リスク・殿要件により除外AWS 公式ドキュメント
Organizations + SCP でのマルチAC統制マルチアカウント権限ハードル高第4弾候補(未着手)
policy-as-code(OPA / Sentinel)事前ガード別シリーズ候補第4弾候補(未着手)

11-4. 読者別の発展ルート

ルート A: CI/CD に組み込みたい

make drift-repair-all を GitHub Actions の workflow_dispatch に乗せ、Manual Approval を environment.protection_rules と連動させることで、PR マージ → drift 検知 → 承認 → 修復の自動化が完成します。

AWS×Terraform 複数人開発 第2弾(GitHub Actions+OIDC) が最短経路です。


ルート B: マルチAC統制に発展させたい

Organizations + AWS Config Aggregator + SCP(Service Control Policies)を組み合わせることで、複数 AWS アカウントを横断した drift 監視・修復ができます。これは本シリーズ第4弾の候補です。


ルート C: 社内展開・運用監視に使いたい

本記事の run_drift_repair.sh + EventBridge + SNS 通知(Section 9)を組み合わせて、月次の drift チェック → 自動通知 → 担当者承認 → 修復という運用フローを確立できます。

drift 件数・修復成功率・平均承認時間を CloudWatch Metrics で可視化することで、インフラ品質の KPI ダッシュボードが完成します。


11-5. おわりに

param-sheet-tf-config シリーズ3弾を通して、Terraform × AWS Config を軸にした「設計 → 検証 → 修復」の閉ループが完成しました。

  • 第1弾: Terraform コードから Excel パラメーターシートを自動生成し、設計書作成の工数を削減
  • 第2弾: AWS Config で実環境の現状値を取得し、pytest で設計値との差分を単体テスト化
  • 第3弾(本記事): drift として検知された差分を SSM Automation で自動修復し、事後検証まで閉ループ化

エンタープライズ案件で避けられない「設計書と実環境の乖離」を、このシリーズで体系的に自動化できます。修復後の terraform refreshplan diff ゼロ 確認まで含めることで、Terraform state との整合性も維持されます。

CI/CD に組み込んでさらに発展させたい場合は、下記のシリーズが参考になります。

CI/CD へ発展させる(複数人開発シリーズ 第2弾: GitHub Actions+OIDC)