- 1 なぜ Data Analytics Vol2 か — リアルタイム分析アーキテクチャ概観
- 2 Kinesis Data Streams 本番運用
- 3 Amazon MSK 本番運用
- 4 Amazon QuickSight 本番運用
- 5 EMR Serverless 本番運用
- 6 詰まりポイント 7選 図解
- 6.1 詰まり① Kinesis Shard数過不足 — WriteProvisionedThroughputExceeded / Hot Shard発生
- 6.2 詰まり② KCL チェックポイント遅延 — DynamoDB Throttle / リース競合 / IteratorAge増加
- 6.3 詰まり③ MSK パーティション再割当暴発 — Consumer Group Rebalance Storm
- 6.4 詰まり④ QuickSight SPICE容量枯渇 — リフレッシュ失敗 / データソース肥大化
- 6.5 詰まり⑤ EMR Serverless コールドスタート — Pre-initialized未設定によるSLA違反
- 6.6 詰まり⑥ Kinesis→Firehose Transform Lambdaタイムアウト — 6MB制限 / バッファリング設定ミス
- 6.7 詰まり⑦ MSK Connect Connector障害 — DLQ未設定 / Offset管理ミス / Plugin互換性
- 7 アンチパターン → 正解パターン変換演習
- 8 まとめ — Data Analytics 二部作完成 × 55記事化達成
なぜ Data Analytics Vol2 か — リアルタイム分析アーキテクチャ概観
AWS本番運用シリーズ Data Analytics Vol2 — リアルタイム分析統合ガイド
本記事はData Analytics Vol1(バッチ分析基盤: Athena×Glue×Lake Formation×Redshift Serverless)に続く Vol2 です。リアルタイムデータ分析基盤を 4サービス で解説します。
| レイヤー | サービス | 本記事での主題 |
|—|—|—|
| ストリーミング取込 | Kinesis Data Streams | Shard設計 / Enhanced Fan-Out / On-Demand / KCL 2.x |
| メッセージング | Amazon MSK | Serverless / Connect / Topic設計 / Tiered Storage / Schema Registry |
| 可視化・BI | Amazon QuickSight | SPICE最適化 / Embedding / RLS / Q (NLQ) |
| 分散処理 | EMR Serverless | Application設計 / Spark Job / Auto-scaling / Cost最適化 |
Vol1 と Vol2 — バッチ × リアルタイムの二部作構造
AWS データ分析基盤は大きく「バッチ分析」と「リアルタイム分析」の2レイヤーで構成されます。Vol1 では Athena × Glue × Lake Formation × Redshift Serverless を用いたバッチ分析基盤(データの蓄積・整理・分析)を解説しました。本 Vol2 では、そのバッチ基盤と密接に連携しながらリアルタイムデータを流入・処理・可視化する 4サービスを取り上げます。
| 観点 | Vol1(バッチ分析基盤) | Vol2(リアルタイム分析基盤) |
|---|---|---|
| 処理タイミング | 定期バッチ(時間〜日次) | ミリ秒〜秒単位のストリーミング |
| 主なユースケース | データカタログ整備・DWH クエリ最適化 | リアルタイムダッシュボード・ストリーム処理 |
| データ蓄積先 | S3 Data Lake(長期保管・コスト優先) | Kinesis → S3/Redshift へ継続流入 |
| 分析スタイル | アドホック クエリ・定型レポート | ライブモニタリング・イベント駆動処理 |
| 代表サービス | Athena / Glue / Lake Formation / Redshift Serverless | Kinesis / MSK / QuickSight / EMR Serverless |
二部作の連携フロー: Kinesis Firehose が S3 Data Lake へデータを継続流入 → Glue ETL で整形 → Athena / Redshift で分析 → QuickSight で可視化。2記事を組み合わせると「データの流入から可視化まで」のフルサイクルが完成します。
リアルタイム分析4レイヤー詳細
本記事が扱う4サービスは、リアルタイム分析基盤の異なる役割を担います。各レイヤーの主なユースケースと本番設計上の要点を概観します。
レイヤー1: ストリーミング取込 — Kinesis Data Streams(§2)
Amazon Kinesis Data Streams(KDS)は、毎秒数万〜数百万件のレコードをリアルタイムで取り込む AWS マネージドストリーミングサービスです。処理単位「Shard」でスループットを制御し、On-Demand モードでは最大 200 MB/s まで自動スケーリングします。Enhanced Fan-Out(専用スループット Consumer 毎 2 MB/s)を使うと、複数のダウンストリームシステムが同一ストリームを独立して高速消費できます。
主なユースケース:
– クリックストリーム・アクセスログのリアルタイム収集
– IoT デバイスデータ(センサー・GPS・温度計)の大量取込
– EC サイトの購買行動ログ収集と異常検知パイプライン
本番設計の要点:
– Hot Shard を避けるための Partition Key 設計(カーディナリティ確保)
– On-Demand vs Provisioned の選定基準(ピーク予測可否・コスト計算)
– Firehose 連携による S3 / Redshift への自動配信
レイヤー2: メッセージング — Amazon MSK(§3)
Amazon MSK(Managed Streaming for Apache Kafka)はフルマネージド Kafka サービスです。MSK Serverless ではブローカー・ストレージ・ZooKeeper の管理が完全不要で、IAM 認証によるゼロトラスト接続が可能です。MSK Connect(マネージド Kafka Connect)でコネクタの自動スケーリングと監視を一元管理できます。
主なユースケース:
– マイクロサービス間のイベントバス(疎結合アーキテクチャ)
– Change Data Capture(CDC)パイプライン(RDS → S3 / Redshift)
– 大規模ログ集約とリアルタイム解析
本番設計の要点:
– MSK Serverless vs Provisioned の使い分け(スループット規模・IAM 認証要件)
– Topic 設計(パーティション数計算・Compaction・Retention 戦略)
– Tiered Storage による長期 Retention とコスト最適化
レイヤー3: 可視化・BI — Amazon QuickSight(§4)
Amazon QuickSight は SPICE(Super-fast, Parallel, In-memory Calculation Engine)で数百万行を高速可視化するマネージド BI サービスです。RLS(Row-Level Security)でユーザー単位のデータアクセス制御が可能で、Embedding API で自社アプリへのダッシュボード統合ができます。Q(NLQ)機能により「先月の売上上位 10 商品は?」といった自然言語クエリにも対応します。
主なユースケース:
– 経営ダッシュボード(KPI・売上・在庫のリアルタイム監視)
– SaaS アプリへの Embedding(マルチテナント BI)
– ノンエンジニア向けセルフサービス分析(Q NLQ 活用)
本番設計の要点:
– SPICE キャパシティ管理と増分リフレッシュによる更新コスト削減
– RLS ルールによるテナント分離(マルチテナント SaaS)
– Embedding セッション管理とカスタムドメイン設定
レイヤー4: 分散処理 — EMR Serverless(§5)
Amazon EMR Serverless は Apache Spark / Hive をサーバーレスで実行するマネージド分散処理サービスです。ジョブ実行時のみ Executor が起動し、完了後は自動停止します。Pre-initialized Capacity を設定するとコールドスタートを回避でき、SLA 要件の厳しいバッチ処理にも対応します。
主なユースケース:
– 大規模機械学習の特徴量エンジニアリング(数テラバイト規模)
– Kinesis / MSK から受け取ったストリームデータの集計・変換
– 定期バッチ ETL(EC2 管理不要・コスト最適化)
本番設計の要点:
– Pre-initialized Capacity でのコールドスタート回避(SLA 担保)
– Spark Job のメモリ・Executor 設定チューニング
– vCPU 時間ベースのコスト管理と Spot 利用
対象読者と前提知識
対象読者
本記事は以下のエンジニアを主な読者として想定しています。
- Streaming Engineer: Kinesis / MSK でリアルタイムパイプラインを構築・運用する。KCL 2.x や MSK Connect の本番設計に課題を感じている
- Data Platform Engineer: ストリーミング取込から分析基盤連携までの統合アーキテクチャを設計する。EMR Serverless の Auto-scaling 設計に悩んでいる
- BI Engineer: QuickSight でセルフサービス BI を全社展開する。SPICE チューニングや Embedding の本番設計を担当している
前提知識
| 分野 | 想定知識レベル |
|---|---|
| AWS 基礎 | IAM / VPC / S3 の基本操作・概念を理解している |
| S3 / Athena 基本 | Data Lake の概念と Athena によるクエリ経験がある(Vol1 参照推奨) |
| Kafka 基礎概念 | Producer / Consumer / Topic / Partition の概念を理解している |
| Python 基礎 | SDK コード例を読み解き、写経・改変できる |
Vol1 未読の方は先にバッチ分析基盤(Athena × Glue × Lake Formation × Redshift Serverless)を理解しておくことを推奨します。Firehose → S3 連携や QuickSight データソース設定の文脈が把握しやすくなります。
本記事の読み方ガイド
本記事は各サービスの「本番運用ノウハウ」に特化して構成されています。入門チュートリアルは公式 Workshop / Getting Started を参照のうえ、本記事では本番環境で必要な設計判断・チューニング・トラブルシュートに絞って解説します。
| セクション | 内容 | 推奨読者 |
|---|---|---|
| §2 Kinesis | Shard 設計・On-Demand・Enhanced Fan-Out・KCL 2.x・Firehose 連携 | Streaming Engineer |
| §3 MSK | Serverless・Connect・Topic 設計・Tiered Storage・Schema Registry | Streaming / Platform Engineer |
| §4 QuickSight | SPICE 最適化・Embedding・RLS・Dashboard 共有・Q NLQ | BI Engineer |
| §5 EMR Serverless | Application 設計・Spark Job・Auto-scaling・S3 連携 | Data Platform Engineer |
| §6 詰まり 7 選 | 本番でハマりやすい 7 パターン図解 | 全員 |
| §7 演習 5 問 | アンチパターン→正解パターン変換 | 全員 |
| §8 まとめ | 二部作完成・全軸クロスリンク | 全員 |
読み方の推奨: 担当の §2〜§5 から読み始め、§6(詰まり 7 選)→§7(演習)の順に進めると本番設計力を段階的に積み上げられます。担当外のセクションも §6 の図解と §7 の演習は全員必読です。
Kinesis Data Streams 本番運用

Kinesis Data Streams の本番運用はShard 設計から始まります。スループット計算を誤るとスロットリングが頻発し、ストリーミング基盤全体の遅延・欠損を引き起こします。
§2-1. Shard 設計 — キャパシティ計算・Resharding・Hot Shard 対策
Shard のキャパシティ基礎
1 Shard あたりのキャパシティは以下のとおりです。
| 方向 | 上限 | 詳細 |
|---|---|---|
| 書込(Ingest) | 1 MB/s | 最大 1,000 records/s |
| 読取(GetRecords) | 2 MB/s | 最大 5 TPS / Consumer |
| Enhanced Fan-Out 読取 | 2 MB/s / Consumer | 各 Consumer が独立確保 |
| データ保持期間 | デフォルト 24h | 最大 365 日(延長設定可) |
必要 Shard 数の算出式:
必要 Shard 数(書込) = ceil( 最大書込スループット [MB/s] / 1 MB/s )
必要 Shard 数(読取) = ceil( 最大読取スループット [MB/s] / 2 MB/s )
適用 Shard 数 = max(書込計算, 読取計算) × 安全係数 1.5
例: 書込ピーク 8 MB/s・読取ピーク 10 MB/s の場合 → max(8, 5) × 1.5 = 12 Shard
# ストリーム作成(Provisioned モード: 12 Shard)
aws kinesis create-stream \
--stream-name prod-clickstream \
--shard-count 12 \
--region ap-northeast-1
# ストリーム状態・Shard 数確認
aws kinesis describe-stream-summary \
--stream-name prod-clickstream \
--query 'StreamDescriptionSummary.[StreamStatus,OpenShardCount,RetentionPeriodHours]'
Resharding — スケールアウト・スケールイン
Shard 数の変更は Split(分割)と Merge(統合)で行います。
| 操作 | コマンド | 用途 |
|---|---|---|
| Split | split-shard | スループット増加時(1 Shard → 2 Shard) |
| Merge | merge-shards | スループット低下時(隣接 2 Shard → 1 Shard) |
# Shard 分割(スケールアウト / Hot Shard 対策)
aws kinesis split-shard \
--stream-name prod-clickstream \
--shard-to-split shardId-000000000003 \
--new-starting-hash-key 170141183460469231731687303715884105728
# Shard 統合(スケールイン / コスト最適化)
aws kinesis merge-shards \
--stream-name prod-clickstream \
--shard-to-merge shardId-000000000006 \
--adjacent-shard-to-merge shardId-000000000007
Shard 変更は 1 日 10 回まで(スケールダウン制限)。連続変更には 30 秒のインターバルが必要です。
Hot Shard 対策 — Partition Key 設計
同一 Partition Key のレコードは同一 Shard に割り当てられます。Key のカーディナリティが低いと特定 Shard に負荷が集中し WriteProvisionedThroughputExceeded が頻発します。
import boto3, uuid, json
kinesis = boto3.client('kinesis', region_name='ap-northeast-1')
# NG: 固定 Key → 全レコードが同一 Shard に集中
kinesis.put_record(
StreamName='prod-clickstream',
Data=json.dumps(event).encode('utf-8'),
PartitionKey='fixed-key'
)
# OK: ユーザー ID を Key(同一ユーザーの順序保証 + 分散)
kinesis.put_record(
StreamName='prod-clickstream',
Data=json.dumps(event).encode('utf-8'),
PartitionKey=event['user_id']
)
# OK: UUID(順序不要・最大分散)
kinesis.put_record(
StreamName='prod-clickstream',
Data=json.dumps(event).encode('utf-8'),
PartitionKey=str(uuid.uuid4())
)
%% Mermaid01: Kinesis リアルタイム取込パイプライン
graph LR
Producer[Data Producer] --> KDS[Kinesis Data Streams]
KDS --> EFO[Enhanced Fan-Out Consumer]
KDS --> KCL[KCL 2.x Consumer]
KDS --> Firehose[Kinesis Firehose]
Firehose --> S3[S3 Data Lake]
Firehose --> RS[Redshift Serverless]
EFO --> Lambda[Lambda Processor]
KCL --> App[Custom Application]
Kinesis On-Demand vs Provisioned — 本番選定の決め手
On-Demandモードは最大200MB/s書込・400MB/s読取まで自動スケーリングし、Shard管理が不要。トラフィックが予測困難なワークロードや急激なスパイクがあるユースケースに最適。一方、安定スループットが見込める場合はProvisionedモードの方がコスト効率が高い。本番では「ピーク時の1.5倍」を基準にモード選定する。
§2-2. On-Demand モード — 自動スケーリングとコスト計算
On-Demand モードでは Shard の手動管理が不要になりますが、料金体系が Provisioned と異なります。
| 項目 | On-Demand | Provisioned |
|---|---|---|
| 最大書込スループット | 200 MB/s(ストリーム全体) | Shard 数 × 1 MB/s |
| 最大読取スループット | 400 MB/s(ストリーム全体) | Shard 数 × 2 MB/s |
| Shard 管理 | 不要(自動スケーリング) | 手動(Split / Merge) |
| 課金方式 | 処理データ量ベース | Shard 時間ベース(未使用でも課金) |
| 最適ユースケース | 予測困難・スパイクあり | 安定した高スループット |
コスト比較(東京リージョン目安): 書込 10 GB/日・読取 20 GB/日のケース:
– On-Demand: 書込・読取 $0.014/GB × 30 GB/日 × 30 日 ≒ 月額 $12〜
– Provisioned 5 Shard: $0.021/Shard 時間 × 5 × 720h = 月額 $75.6
スループットが安定して予測できる本番環境では Provisioned の方がコスト効率が高い。月間トラフィックの変動が大きい場合(ピーク/オフピーク比 5 倍超)は On-Demand が有利です。
# On-Demand モードでストリーム作成
aws kinesis create-stream \
--stream-name prod-events-ondemand \
--stream-mode-details StreamMode=ON_DEMAND \
--region ap-northeast-1
# 既存 Provisioned ストリームを On-Demand に切り替え
aws kinesis update-stream-mode \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/prod-events \
--stream-mode-details StreamMode=ON_DEMAND
§2-3. Enhanced Fan-Out — 専用スループットによる並列消費
Enhanced Fan-Out の仕組みと比較
標準の GetRecords API では、1 Shard あたり 2 MB/s の読取スループットを全 Consumer が共有します。Consumer が増えると 1 台あたりの帯域が減少しレイテンシが増大します。Enhanced Fan-Out は各 Consumer が専用の 2 MB/sを確保するため、Consumer 数が増えてもスループットが劣化しません。
| 比較項目 | 標準 Consumer | Enhanced Fan-Out |
|---|---|---|
| 読取スループット | 2 MB/s / Shard(全 Consumer 共有) | 2 MB/s / Shard / Consumer(専用) |
| 配信方式 | ポーリング(GetRecords API) | HTTP/2 Push(SubscribeToShard API) |
| レイテンシ | 200ms〜 | 70ms 以下(p99) |
| 登録上限 | 制限なし | 20 Consumer / ストリーム |
| 追加料金 | なし | $0.015/Consumer 時間 + $0.013/GB |
| 最適ユースケース | 低スループット・コスト優先 | 複数 Consumer・低レイテンシ要件 |
Enhanced Fan-Out の設定
# Consumer 登録(Enhanced Fan-Out)
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/prod-clickstream \
--consumer-name analytics-consumer
# 登録 Consumer 一覧と状態確認
aws kinesis list-stream-consumers \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/prod-clickstream \
--query 'Consumers[*].[ConsumerName,ConsumerStatus,ConsumerARN]' \
--output table
Consumer は ACTIVE ステータスになるまで数秒かかります(CREATING 中はサブスクリプション不可)。登録上限 20 を超えるユースケースでは、Fan-Out Consumer の集約レイヤーを挟む設計を検討します。
§2-4. KCL 2.x — リース管理・チェックポイント・Graceful Shutdown
KCL 2.x のアーキテクチャ
Kinesis Client Library(KCL)2.x は、複数の Worker(ECS タスク / EC2 インスタンス)間で Shard を自動分散処理するライブラリです。DynamoDB テーブルをリース管理ストアとして使用し、各 Worker が担当 Shard のチェックポイントを記録します。
| コンポーネント | 役割 |
|---|---|
| DynamoDB リーステーブル | Shard ↔ Worker のマッピング・チェックポイント保存 |
| LeaseCoordinator | リース取得・更新・返却の調停 |
| ShardSyncer | Shard 分割/統合時の新 Shard 自動追跡 |
| RecordProcessor | ビジネスロジック実装(ユーザー定義) |
KCL 2.x Consumer の実装(Python)
import json
from amazon_kinesis_client.lib.processor import RecordProcessorBase
class ClickStreamProcessor(RecordProcessorBase):
def initialize(self, initialization_input):
self.shard_id = initialization_input.shard_id
self.checkpoint_error_count = 0
def process_records(self, process_records_input):
records = process_records_input.records
for record in records:
payload = json.loads(record.binary_data)
self._handle_event(payload)
# 100 レコードごとにチェックポイント(DynamoDB 書込)
if records:
try:
process_records_input.checkpointer.checkpoint()
self.checkpoint_error_count = 0
except Exception:
self.checkpoint_error_count += 1
if self.checkpoint_error_count > 5:
raise
def _handle_event(self, payload):
pass # ビジネスロジック(DynamoDB 書込・S3 バッファリング等)
def lease_lost(self, lease_lost_input):
# 別 Worker にリースを移譲された — チェックポイント書込不可
pass
def shard_ended(self, shard_ended_input):
# Shard 終了(Split/Merge 後)→ 最終チェックポイント書込
shard_ended_input.checkpointer.checkpoint()
def shutdown_requested(self, shutdown_requested_input):
# Graceful Shutdown(ECS タスク停止・オートスケーリング縮小時)
shutdown_requested_input.checkpointer.checkpoint()
DynamoDB スロットリング対策
チェックポイント頻度が高すぎると DynamoDB 書込スループットを圧迫します。高スループット環境では KCL リーステーブルを On-Demand モードにすることでスロットリングを防止できます。
# KCL リーステーブルを On-Demand に変更(スロットリング防止)
aws dynamodb update-table \
--table-name prod-clickstream \
--billing-mode PAY_PER_REQUEST
§2-5. Firehose 連携 — S3/Redshift/OpenSearch への自動配信
配信先と基本設定
Amazon Data Firehose(旧 Kinesis Firehose)は KDS のデータをバッファリングしながら各配信先へ自動配信します。
| 配信先 | バッファ設定 | 主なユースケース |
|---|---|---|
| S3 | サイズ: 1–128 MB / 時間: 60–900 秒 | Data Lake 蓄積・長期保管 |
| Redshift | COPY コマンド経由(S3 中間ステージ必須) | DWH へのロード |
| OpenSearch Service | ドキュメント単位インデックス | ログ検索・可視化 |
| HTTP Endpoint | カスタム配信先(Splunk 等) | 外部 SIEM / 監視ツール |
Dynamic Partitioning — S3 パスの自動分割
Dynamic Partitioning を使うと、レコード内フィールドを S3 プレフィックスに自動展開できます。Athena のパーティションプルーニングに直結するため、クエリコスト削減に重要です。
{
"DeliveryStreamName": "prod-clickstream-firehose",
"ExtendedS3DestinationConfiguration": {
"BucketARN": "arn:aws:s3:::prod-datalake-raw",
"Prefix": "clickstream/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/",
"ErrorOutputPrefix": "clickstream-errors/!{firehose:error-output-type}/",
"DynamicPartitioningConfiguration": {
"Enabled": true
},
"ProcessingConfiguration": {
"Enabled": true,
"Processors": [
{
"Type": "MetadataExtraction",
"Parameters": [
{
"ParameterName": "MetadataExtractionQuery",
"ParameterValue": "{year:.event_date[0:4],month:.event_date[5:7],day:.event_date[8:10]}"
},
{
"ParameterName": "JsonParsingEngine",
"ParameterValue": "JQ-1.6"
}
]
}
]
},
"BufferingHints": {
"SizeInMBs": 128,
"IntervalInSeconds": 300
},
"CompressionFormat": "PARQUET"
}
}
Transform Lambda — 変換とサイズ制限
Firehose は各レコードを Lambda で変換できます。Lambda のペイロード上限(6 MB)に注意が必要で、変換後レコードサイズがこの上限を超えると配信失敗します。バッファリング設定(サイズ・時間)はデータ鮮度とコストのトレードオフです。リアルタイム性が重要なケースでは短く(60 秒)、コスト優先では大きく(128 MB / 900 秒)設定します。
Amazon MSK 本番運用

MSK Serverless — Kafka運用負荷ゼロの選択肢
MSK Serverlessはブローカー・ストレージ・ZooKeeperの管理が完全不要。IAM認証のみ対応(SASL/SCRAM不可)、パーティション数に応じた自動スケーリング、秒単位課金。開発・テスト環境や中規模ストリーミングに最適。ただし大規模環境(100パーティション超/高スループット)ではProvisioned MSKのコスト効率が勝る場合がある。
MSK Serverless アーキテクチャと制約事項
MSK Serverlessは2023年に一般提供(GA)となったフルマネージドKafkaサービスです。ブローカーインスタンス数・EBSストレージ・ZooKeeperの管理が完全不要で、Apache Kafkaと100%互換のAPIを提供します。
MSK Serverlessの主要仕様:
| 項目 | 仕様 |
|---|---|
| Kafkaバージョン | 2.8.2.tiered以降 |
| 最大書込スループット | 200 MB/s (クラスタ全体) |
| 最大読取スループット | 400 MB/s (クラスタ全体) |
| 最大パーティション数 | 200 (デフォルト) / 上限緩和申請で拡張可 |
| 認証方式 | IAM必須 (SASL/SCRAMは非対応) |
| 暗号化 | TLS必須 (PLAINTEXT通信不可) |
| Retention上限 | 7日 (Tiered Storage有効化で無期限化可能) |
| AZ構成 | 2または3 AZ |
課金モデル (us-east-1参考):
– クラスタ稼働時間: $0.00556/時間
– 書込ユニット: $0.10/GB
– 読取ユニット: $0.05/GB
– ストレージ: $0.10/GB-月 (ローカルTier)
MSK Serverless vs Provisioned 選定マトリクス:
| 選定基準 | MSK Serverless推奨 | MSK Provisioned推奨 |
|---|---|---|
| スループット特性 | 変動大・スパイク対応 | 安定した高スループット |
| 認証方式 | IAMのみで充足 | SASL/SCRAMが必要 |
| 管理コスト | 最小化したい | インフラ設定を細かく制御したい |
| レイテンシ要件 | p99 < 500ms許容 | 低レイテンシ厳格要件あり |
| コスト特性 | 低〜中スループット帯 | 高スループット安定稼働 |
MSK Serverlessクラスタ作成例:
# MSK Serverless クラスタ作成
aws kafka create-cluster-v2 \
--cluster-name "production-msk-serverless" \
--serverless '{
"vpcConfigs": [
{
"subnetIds": ["subnet-xxxxxx", "subnet-yyyyyy", "subnet-zzzzzz"],
"securityGroupIds": ["sg-msk-brokers"]
}
],
"clientAuthentication": {
"sasl": {
"iam": {"enabled": true}
}
}
}'
MSK ServerlessはIAM認証が必須のため、Kafkaクライアントにaws-msk-iam-sasl-signerライブラリを追加します:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
class MSKTokenProvider:
def __init__(self, region):
self.region = region
def token(self):
token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(self.region)
return token, expiry_ms
producer = KafkaProducer(
bootstrap_servers='boot-xxxxx.kafka.us-east-1.amazonaws.com:9098',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=MSKTokenProvider(region='us-east-1'),
value_serializer=lambda v: v.encode('utf-8'),
compression_type='lz4',
acks='all',
retries=3
)
producer.send('orders-events', b'{"order_id":"ORD-001","amount":15000}')
producer.flush()
MSK Connect — マネージドKafka Connectフレームワーク
MSK ConnectはAWS管理のKafka Connectサービスです。Source/Sink Connectorをサーバーレス環境で実行でき、スケーリング・モニタリング・パッチ適用が完全自動化されます。EC2上でKafka Connectを自前運用する場合に比べ、インフラ管理工数を大幅削減できます。
MSK Connectの主要コンポーネント:
| コンポーネント | 説明 | 設定ポイント |
|---|---|---|
| Custom Plugin | ConnectorのJAR/ZIPファイル | S3にアップロード後ARN参照 |
| Connector | Plugin + 設定の実行単位 | Worker数・MCU数・設定プロパティ |
| Worker設定 | JVM・変換クラス・コンバータ | key/value.converter指定必須 |
| Auto-scaling | Min/Max MCU数による自動調整 | CPU使用率ベースのスケールポリシー |
Source Connector — Debezium MySQLによるCDC設定:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "production-rds.xxxxx.us-east-1.rds.amazonaws.com",
"database.port": "3306",
"database.user": "${secretsmanager:prod/rds:username}",
"database.password": "${secretsmanager:prod/rds:password}",
"database.server.name": "production-db",
"database.include.list": "orders,customers,inventory",
"database.history.kafka.bootstrap.servers": "${bootstrapServers}",
"database.history.kafka.topic": "dbhistory.production-db",
"include.schema.changes": "true",
"snapshot.mode": "initial",
"key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"key.converter.region": "us-east-1",
"key.converter.schemaAutoRegistrationEnabled": "true",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.region": "us-east-1",
"value.converter.schemaAutoRegistrationEnabled": "true"
}
Sink Connector — S3 Sink (Parquet + 時系列パーティション):
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "production-db.orders,production-db.customers",
"s3.region": "us-east-1",
"s3.bucket.name": "production-data-lake",
"s3.part.size": "5242880",
"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"rotate.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_time",
"errors.deadletterqueue.topic.name": "dlq-s3-sink",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.tolerance": "all"
}
MSK ConnectのAuto-scaling設定:
aws kafkaconnect create-connector \
--connector-name "s3-sink-orders-production" \
--capacity '{
"autoScaling": {
"maxWorkerCount": 10,
"mcuCount": 1,
"minWorkerCount": 2,
"scaleInPolicy": {"cpuUtilizationPercentage": 20},
"scaleOutPolicy": {"cpuUtilizationPercentage": 75}
}
}' \
--connector-configuration file://s3-sink-config.json \
--kafka-cluster '{
"apacheKafkaCluster": {
"bootstrapServers": "boot-xxxxx.kafka.us-east-1.amazonaws.com:9092",
"vpc": {
"securityGroups": ["sg-connector"],
"subnets": ["subnet-xxxxx", "subnet-yyyyy"]
}
}
}' \
--kafka-cluster-client-authentication '{"authenticationType":"IAM"}' \
--kafka-cluster-encryption-in-transit '{"encryptionType":"TLS"}' \
--kafka-connect-version "2.7.1" \
--plugins '[{"customPlugin":{"customPluginArn":"arn:aws:kafkaconnect:us-east-1:123456789:custom-plugin/s3-sink/xxxxx","revision":1}}]' \
--service-execution-role-arn "arn:aws:iam::123456789:role/MSKConnectRole"
Dead Letter Queueは必ず設定してください。未設定の場合、変換失敗したメッセージがConnectorを停止させ、データパイプライン全体が停止します。
Topic設計 — パーティション計算とRetention戦略
Kafkaスループット設計の基本はパーティション数計算です。適切なパーティション数の算定式:
パーティション数 = max(
ceil(目標書込スループット [MB/s] / パーティション書込上限 [MB/s]),
ceil(目標読取スループット [MB/s] / パーティション読取上限 [MB/s])
)
パーティション書込上限: ~30 MB/s (lz4圧縮時、ブローカー性能依存)
パーティション読取上限: ~30 MB/s (Consumer 1スレッドあたり)
実例計算 (Eコマース注文ストリーム):
目標書込: 300 MB/s ÷ 30 MB/s = 10パーティション
目標読取: 600 MB/s (Consumer 2グループ並列) ÷ 30 MB/s = 20パーティション
→ max(10, 20) = 20パーティション
安全マージン1.5倍 → 30パーティション採用
本番Topicパラメータ設定:
# 本番 Topic 作成
aws kafka create-topic \
--cluster-arn "arn:aws:kafka:us-east-1:123456789:cluster/production/xxxxx" \
--create-topic-input '{
"Name": "orders-events",
"NumPartitions": 30,
"ReplicationFactor": 3,
"ConfigEntries": [
{"Name": "retention.ms", "Value": "604800000"},
{"Name": "retention.bytes", "Value": "107374182400"},
{"Name": "min.insync.replicas", "Value": "2"},
{"Name": "compression.type", "Value": "lz4"},
{"Name": "max.message.bytes", "Value": "1048576"},
{"Name": "message.timestamp.type", "Value": "LogAppendTime"},
{"Name": "cleanup.policy", "Value": "delete"}
]
}'
min.insync.replicas=2は本番必須設定です。レプリカ1台が落ちた状態でもAck=all(acks=-1)のProducerが書き込み可能になります。
Compaction Topic (Changelog / State Store用):
# Key-Value最新状態のみ保持するCompacted Topic
# cleanup.policy=compact,delete: 圧縮+Retention期間超過データ削除の組合せ
aws kafka create-topic \
--cluster-arn "arn:aws:kafka:us-east-1:123456789:cluster/production/xxxxx" \
--create-topic-input '{
"Name": "customer-profiles-state",
"NumPartitions": 20,
"ReplicationFactor": 3,
"ConfigEntries": [
{"Name": "cleanup.policy", "Value": "compact"},
{"Name": "min.cleanable.dirty.ratio", "Value": "0.1"},
{"Name": "segment.ms", "Value": "86400000"},
{"Name": "delete.retention.ms", "Value": "86400000"},
{"Name": "max.compaction.lag.ms", "Value": "3600000"}
]
}'
Key設計のベストプラクティス:
– Null Keyを避ける — 全パーティションにラウンドロビン配送され順序保証が失われる
– 高カーディナリティなKey(UUIDなど)はHash分散で均等パーティショニングを実現
– Consumer Groupとパーティション数は1:1以上の比率を設計段階で確認すること
– パーティション数は後から増やせるが、減らすと既存Consumer Groupのオフセットが無効化される
Tiered Storage — コスト最適化とRetention無期限化
MSK Tiered Storageは2024年GAとなった機能で、Kafkaデータを透過的にS3へ階層化します。EBSストレージをホットTierとして使用し、設定した保持期間を超えたデータをコールドTier(S3)へ自動移行します。
Tiered Storageのデータ配置:
| Tier | 保存場所 | 読取レイテンシ | ストレージコスト |
|---|---|---|---|
| ホットTier | ブローカーEBS | < 10ms | ~$0.10/GB-月 |
| コールドTier | Amazon S3 | 50〜200ms | ~$0.023/GB-月 |
Tiered Storage有効化 (Provisioned MSK、クラスタ作成時のみ指定可):
aws kafka create-cluster-v2 \
--cluster-name "tiered-storage-production" \
--provisioned '{
"brokerNodeGroupInfo": {
"instanceType": "kafka.m5.4xlarge",
"clientSubnets": ["subnet-xxxxx", "subnet-yyyyy", "subnet-zzzzz"],
"storageInfo": {
"ebsStorageInfo": {"volumeSize": 500}
}
},
"numberOfBrokerNodes": 3,
"kafkaVersion": "3.6.0",
"storageMode": "TIERED"
}'
Topic単位でTiered Storage動作を設定します:
# remote.storage.enable=true : このTopicをTiered Storage対象に設定
# local.retention.ms : ローカルEBS保持期間 (残りはS3へ移行)
# local.retention.bytes=-1 : バイトサイズ制限なし
# retention.ms=-1 : S3への無期限Retention
aws kafka update-topic-configuration \
--cluster-arn "arn:aws:kafka:us-east-1:123456789:cluster/tiered-storage-production/xxxxx" \
--create-topic-input '{
"Name": "clickstream-events",
"ConfigEntries": [
{"Name": "remote.storage.enable", "Value": "true"},
{"Name": "local.retention.ms", "Value": "172800000"},
{"Name": "local.retention.bytes", "Value": "-1"},
{"Name": "retention.ms", "Value": "-1"}
]
}'
ConsumerはBrokerへ通常のFetchリクエストを送るだけで、ブローカーがローカルEBSとS3の両方から透過的にデータを取得します。古いOffsetへのシークも同じKafka APIで動作します。
コスト試算 (1TB/日書込・30日Retention):
– 従来EBSのみ: 30TB × $0.10 = $3,000/月
– Tiered Storage (ローカル2日 + S3 28日): 2TB × $0.10 + 28TB × $0.023 = $844/月
– 削減率: 約72%
Schema Registry — Glue Schema Registryによる型安全なメッセージング
Glue Schema Registry — データ契約による破壊的変更の自動防止
Glue Schema RegistryはAWS管理のスキーマ管理サービスです。ProducerとConsumer間の「データ契約」を中央管理し、スキーマ互換性ルールで破壊的変更を自動検出します。Avro / Protobuf / JSON Schemaをサポートし、KafkaペイロードにスキーマバージョンIDを埋め込むことでConsumer側のデシリアライズを保証します。
| 互換性ルール | 意味 | デプロイ順序 |
|————|——|————|
| BACKWARD | 新スキーマで旧データを読取可能 | Consumer先にデプロイ |
| FORWARD | 旧スキーマで新データを読取可能 | Producer先にデプロイ |
| FULL | 双方向互換 | 順序問わず安全 |
| NONE | チェックなし | 開発・テスト環境専用 |
本番環境ではBACKWARD互換性 + Consumer先行デプロイが標準パターンです。スキーマはローカルキャッシュされるため、Glue API呼び出しはスキーマ変更時のみ発生します。
Avroスキーマ定義 (後方互換フィールド追加の例):
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example.events",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "JPY"},
{"name": "event_time", "type": "long", "logicalType": "timestamp-millis"},
{"name": "status", "type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
}},
{"name": "shipping_address", "type": ["null", "string"], "default": null}
]
}
shipping_addressフィールドをdefault: null付きのunion型で追加することで、旧スキーマで書かれたメッセージも新スキーマConsumerで読み取れます(BACKWARD互換)。
Glue Schema RegistryとKafkaの統合 (Python):
import boto3
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.serde import GlueSchemaRegistrySerializer, GlueSchemaRegistryDeserializer
from kafka import KafkaProducer, KafkaConsumer
glue_client = boto3.client('glue', region_name='us-east-1')
schema_registry_client = SchemaRegistryClient(
glue_client,
registry_name='production-registry'
)
# Serializer設定 (Producer側)
serializer = GlueSchemaRegistrySerializer(
client=schema_registry_client,
schema_definition=open('order_event.avsc').read(),
data_format='AVRO',
schema_name='OrderEvent',
compatibility='BACKWARD'
)
producer = KafkaProducer(
bootstrap_servers='boot-xxxxx.kafka.us-east-1.amazonaws.com:9098',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=MSKTokenProvider(region='us-east-1'),
value_serializer=serializer.serialize
)
# Deserializer設定 (Consumer側)
deserializer = GlueSchemaRegistryDeserializer(client=schema_registry_client)
consumer = KafkaConsumer(
'orders-events',
bootstrap_servers='boot-xxxxx.kafka.us-east-1.amazonaws.com:9098',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=MSKTokenProvider(region='us-east-1'),
value_deserializer=lambda v: deserializer.deserialize(v, 'orders-events'),
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id='orders-processor-v1'
)
for message in consumer:
event = message.value
print(f"order_id: {event['order_id']}, status: {event['status']}")
consumer.commit()
Glue Schema Registryはスキーマバージョン情報をKafkaメッセージの先頭8バイトに埋め込みます。ConsumerはヘッダーからスキーマバージョンIDを抽出してGlueからスキーマを取得し、自動デシリアライズします。BACKWARD互換性ルール + Consumer先行デプロイを組み合わせることで、ダウンタイムなしのスキーマ進化を実現できます。
Amazon QuickSight 本番運用

QuickSight SPICE — 本番BIの心臓部
SPICE(Super-fast, Parallel, In-memory Calculation Engine)はQuickSightの高速インメモリエンジン。SPICEキャパシティの管理(リージョン単位購入)、増分リフレッシュによる更新コスト削減、Direct Queryとの使い分けが本番BI安定運用の鍵。大規模ダッシュボード(100万行超)ではSPICEの圧縮・列指向ストレージが圧倒的なレスポンス優位を示す。
SPICE最適化 — キャパシティ管理と増分リフレッシュ
SPICE(Super-fast, Parallel, In-memory Calculation Engine)はQuickSightの列指向インメモリエンジンです。データをSPICEにインポートすることで、数億行規模のデータセットでも秒単位のレスポンスを実現します。
SPICEキャパシティ管理:
# SPICEキャパシティ確認 (リージョン単位)
aws quicksight describe-account-settings \
--aws-account-id 123456789012
# SPICEキャパシティ購入 (1GBあたり $0.25/月)
aws quicksight purchase-spice-capacity \
--aws-account-id 123456789012 \
--spice-capacity-to-purchase 100
# データセットごとのSPICE使用量確認
aws quicksight describe-data-set \
--aws-account-id 123456789012 \
--data-set-id "orders-dataset-prod" \
--query 'DataSet.ConsumedSpiceCapacityInBytes'
SPICEキャパシティはリージョン単位で購入します。初期10GB/ユーザーが無料割り当てされますが、本番BI環境では追加購入が必要です。
増分リフレッシュの設定 (Full refresh vs Incremental):
| 方式 | 対象データ | SPICE消費 | 所要時間 | 適用条件 |
|---|---|---|---|---|
| Full refresh | 全データ再取込み | 現在の2倍 (一時) | 長い | 制限なし |
| Incremental | 差分のみ更新 | 差分分のみ | 短い | タイムスタンプ列が必要 |
# データセット作成 (増分リフレッシュ対応)
aws quicksight create-data-set \
--aws-account-id 123456789012 \
--data-set-id "orders-incremental" \
--name "注文データ (増分リフレッシュ)" \
--import-mode SPICE \
--physical-table-map file://table-map.json \
--logical-table-map file://logical-table.json
# 増分リフレッシュスケジュール設定
aws quicksight put-data-set-refresh-properties \
--aws-account-id 123456789012 \
--data-set-id "orders-incremental" \
--data-set-refresh-properties '{
"RefreshConfiguration": {
"IncrementalRefresh": {
"LookbackWindow": {
"ColumnName": "order_updated_at",
"Size": 1,
"SizeUnit": "DAY"
}
}
}
}'
# リフレッシュスケジュール作成 (毎時実行)
aws quicksight create-refresh-schedule \
--aws-account-id 123456789012 \
--data-set-id "orders-incremental" \
--schedule '{
"ScheduleId": "hourly-incremental",
"ScheduleFrequency": {
"Interval": "HOURLY",
"RefreshOnDay": {"DayOfWeek": "MONDAY"}
},
"StartAfterDateTime": "2024-01-01T00:00:00Z",
"RefreshType": "INCREMENTAL_REFRESH"
}'
Direct Query vs SPICE 使い分け:
| 要件 | 推奨方式 |
|---|---|
| リアルタイム性が必要 (< 1分遅延) | Direct Query |
| 高速レスポンス必須 (< 1秒) | SPICE |
| データ量が1億行超 | SPICE |
| データ更新頻度が高い (< 5分) | Direct Query |
| コスト最適化重視 | SPICE (クエリ課金回避) |
データセット設計のベストプラクティス:
– 不要な列を事前に除外してSPICE容量を節約 (非表示列は削除推奨)
– JOIN処理はデータセット作成時に完結させる (ダッシュボード時点ではJOIN不可)
– カーディナリティの高い列(UUID等)はDimensionではなくMeasureに設定してインデックス効率を上げる
– 数値フォーマット・タイムゾーンはデータセット作成時に正規化する
Embedding — アプリケーション組込BI
QuickSight EmbeddingはQuickSightのダッシュボード・Visual・Q検索バーをiframeで外部アプリに組み込む機能です。2022年からEmbedding v2 APIが提供され、Anonymous Embeddingが一般提供されました。
EmbeddingタイプとAPIの対応:
| Embeddingタイプ | API | 認証 | 主なユースケース |
|---|---|---|---|
| Anonymous Embedding | GenerateEmbedUrlForAnonymousUser | 不要 | パブリックダッシュボード |
| Registered Embedding | GenerateEmbedUrlForRegisteredUser | QuickSightアカウント | 社内ポータル |
| Console Embedding | GenerateEmbedUrlForRegisteredUser (console) | QuickSightアカウント | 管理者用フルUI |
Anonymous Embedding 実装例:
import boto3
import json
def generate_anonymous_embed_url(
aws_account_id: str,
dashboard_id: str,
allowed_domains: list[str]
) -> str:
client = boto3.client('quicksight', region_name='us-east-1')
response = client.generate_embed_url_for_anonymous_user(
AwsAccountId=aws_account_id,
SessionLifetimeInMinutes=480,
Namespace='default',
AuthorizedResourceArns=[
f'arn:aws:quicksight:us-east-1:{aws_account_id}:dashboard/{dashboard_id}'
],
ExperienceConfiguration={
'Dashboard': {
'InitialDashboardId': dashboard_id
}
},
AllowedDomains=allowed_domains,
SessionTags=[
{'Key': 'region', 'Value': 'JP'},
{'Key': 'tenant_id', 'Value': 'customer-001'}
]
)
return response['EmbedUrl']
embed_url = generate_anonymous_embed_url(
aws_account_id='123456789012',
dashboard_id='dashboard-prod-orders',
allowed_domains=['https://app.example.com']
)
Registered Embedding 実装例 (IAMロール経由):
def generate_registered_embed_url(
aws_account_id: str,
dashboard_id: str,
user_arn: str
) -> str:
client = boto3.client('quicksight', region_name='us-east-1')
response = client.generate_embed_url_for_registered_user(
AwsAccountId=aws_account_id,
SessionLifetimeInMinutes=600,
UserArn=user_arn,
ExperienceConfiguration={
'Dashboard': {
'InitialDashboardId': dashboard_id,
'FeatureConfigurations': {
'StatePersistence': {'Enabled': True},
'SharedView': {'Enabled': True}
}
}
},
AllowedDomains=['https://internal.example.com']
)
return response['EmbedUrl']
QuickSight Embedding SDK (JavaScript) によるiframe統合:
npm install amazon-quicksight-embedding-sdk
import { createEmbeddingContext } from 'amazon-quicksight-embedding-sdk';
async function embedDashboard(containerDiv, embedUrl) {
const embeddingContext = await createEmbeddingContext();
const embeddedDashboard = await embeddingContext.embedDashboard({
url: embedUrl,
container: containerDiv,
scrolling: 'no',
height: '700px',
width: '100%',
onChange: (changeEvent) => {
if (changeEvent.eventName === 'FRAME_LOADED') {
console.log('Dashboard loaded');
}
}
});
// パラメータ動的変更 (フィルター連携)
embeddedDashboard.setParameters([
{Name: 'StartDate', Values: ['2024-01-01']},
{Name: 'EndDate', Values: ['2024-12-31']},
{Name: 'Region', Values: ['JP']}
]);
}
セッション管理上の注意点: EmbedUrlのTTLは15分〜10時間(SessionLifetimeInMinutes)で設定します。URLが期限切れになると403エラーになるため、フロントエンドでURLの有効期限を管理し、期限前にバックエンドAPIで再発行する実装が必要です。
RLS — Row-Level Security によるデータアクセス制御
RLS(Row-Level Security)はデータセットレベルで行フィルタを適用し、ユーザーやグループが参照できるデータを制限する機能です。SPICE・Direct Queryどちらにも適用されます。
RLSの設定方式:
| 方式 | フィルタ列 | 適用粒度 | 管理方法 |
|---|---|---|---|
| ユーザーベース | UserName (QuickSightユーザー名) | ユーザー単位 | CSVまたはデータセット |
| グループベース | GroupName (QuickSightグループ名) | グループ単位 | CSVまたはデータセット |
| Tag-based RLS | IAMタグ / Session Tags | 動的 (実行時解決) | IAMポリシー |
RLSルールデータセット (CSV形式):
UserName,region,department
alice@example.com,JP,sales
bob@example.com,US,engineering
charlie@example.com,, ← 空値 = 全データアクセス可
RLS設定API (Python boto3):
import boto3
client = boto3.client('quicksight', region_name='us-east-1')
# RLS用データセット作成 (ルール定義)
client.create_data_set(
AwsAccountId='123456789012',
DataSetId='orders-rls-rules',
Name='注文データRLSルール',
ImportMode='SPICE',
PhysicalTableMap={
'rls-table': {
'S3Source': {
'DataSourceArn': 'arn:aws:quicksight:us-east-1:123456789012:datasource/s3-rls',
'InputColumns': [
{'Name': 'UserName', 'Type': 'STRING'},
{'Name': 'region', 'Type': 'STRING'},
{'Name': 'department', 'Type': 'STRING'}
],
'UploadSettings': {
'Format': 'CSV',
'ContainsHeader': True
}
}
}
}
)
# メインデータセットにRLS適用
client.update_data_set(
AwsAccountId='123456789012',
DataSetId='orders-dataset-prod',
Name='注文データ (RLS適用)',
ImportMode='SPICE',
PhysicalTableMap={},
LogicalTableMap={},
RowLevelPermissionDataSet={
'Namespace': 'default',
'Arn': 'arn:aws:quicksight:us-east-1:123456789012:dataset/orders-rls-rules',
'PermissionPolicy': 'GRANT_ACCESS',
'Status': 'ENABLED'
}
)
Tag-based RLS (Anonymous Embedding + 動的フィルタ):
# Anonymous Embeddingでセッションタグを渡すことでRLSを動的適用
client.generate_embed_url_for_anonymous_user(
AwsAccountId='123456789012',
SessionLifetimeInMinutes=480,
Namespace='default',
AuthorizedResourceArns=[
'arn:aws:quicksight:us-east-1:123456789012:dashboard/orders-dashboard'
],
ExperienceConfiguration={
'Dashboard': {'InitialDashboardId': 'orders-dashboard'}
},
AllowedDomains=['https://app.example.com'],
SessionTags=[
{'Key': 'tenant_id', 'Value': 'customer-001'},
{'Key': 'region', 'Value': 'JP'}
]
)
RLSルールデータセット側にtenant_id・region列を定義し、Tag-based RLSを有効化することで、SessionTagsの値でフィルタリングが動的に行われます。マルチテナントSaaSでの外部向けBI埋め込みに特に有効です。
Dashboard共有 — フォルダ管理と名前空間
フォルダ管理:
QuickSightのフォルダはダッシュボード・データセット・分析を整理し、アクセス権限を一括管理するための仕組みです。
# フォルダ作成
aws quicksight create-folder \
--aws-account-id 123456789012 \
--folder-id "sales-team-folder" \
--name "営業チーム共有" \
--folder-type SHARED
# フォルダへの権限付与 (グループ単位)
aws quicksight update-folder-permissions \
--aws-account-id 123456789012 \
--folder-id "sales-team-folder" \
--grant-permissions '[
{
"Principal": "arn:aws:quicksight:us-east-1:123456789012:group/default/sales-managers",
"Actions": ["quicksight:DescribeFolder", "quicksight:ListFolderMembers", "quicksight:CreateFolderMembership"]
}
]'
# ダッシュボードをフォルダに追加
aws quicksight create-folder-membership \
--aws-account-id 123456789012 \
--folder-id "sales-team-folder" \
--member-id "orders-dashboard" \
--member-type DASHBOARD
名前空間 (マルチテナント構成):
QuickSight名前空間はマルチテナント環境でテナントごとにユーザー・グループ・ダッシュボードを完全分離する仕組みです。
# 名前空間作成 (テナントごとに独立したQuickSight空間)
aws quicksight create-namespace \
--aws-account-id 123456789012 \
--namespace "tenant-customer001" \
--identity-store QUICKSIGHT
# 名前空間内にユーザー登録
aws quicksight register-user \
--aws-account-id 123456789012 \
--namespace "tenant-customer001" \
--email "admin@customer001.com" \
--identity-type IAM \
--iam-arn "arn:aws:iam::123456789012:role/QuickSightTenantRole" \
--user-role READER \
--session-name "customer001-admin"
Email配信スケジュール (ダッシュボードスナップショット):
# ダッシュボードPDFスナップショットをメール配信
aws quicksight create-ingestion \
--aws-account-id 123456789012 \
--data-set-id "orders-dataset-prod" \
--ingestion-id "manual-refresh-$(date +%Y%m%d%H%M)"
# スナップショットジョブ開始 (PDF形式)
aws quicksight start-dashboard-snapshot-job \
--aws-account-id 123456789012 \
--dashboard-id "orders-dashboard" \
--snapshot-job-id "weekly-report-$(date +%Y%m%d)" \
--user-configuration '{
"AnonymousUsers": [{
"RowLevelPermissionTagConfiguration": {
"TagRules": [{"TagKey": "region", "ColumnName": "region"}]
},
"AuthorizedTargetsByService": [{
"Service": "QUICKSIGHT",
"AuthorizedTargets": ["arn:aws:quicksight:us-east-1:123456789012:dashboard/orders-dashboard"]
}]
}]
}' \
--snapshot-configuration '{
"FileGroups": [{
"Files": [{"SheetSelections": [{"SheetId": "sales-overview", "SelectionScope": "ALL_VISUALS"}], "FormatType": "PDF"}]
}],
"DestinationConfiguration": {
"S3Destinations": [{"BucketConfiguration": {"BucketName": "reports-bucket", "BucketPrefix": "weekly/"}}]
}
}'
QuickSight Q — 自然言語クエリとGenerative BI
QuickSight Q + Generative BI — 非エンジニアが直接データを問い合わせる世界
QuickSight Q(NLQ: Natural Language Queries)は自然言語でデータを問い合わせる機能です。2024年にはClaude powered Generative BIが追加され、「売上が最も高い地域は?」と入力するだけでビジュアルが自動生成されます。BIエンジニアが全ダッシュボードを事前定義する必要がなくなり、ビジネスユーザーの自律的なデータ探索が可能になります。
| 機能 | 概要 | 設定ポイント |
|——|——|————|
| Topic定義 | データセット + フィールドメタデータの束 | Q機能の起点 |
| Friendly name | フィールドの自然言語エイリアス | 精度向上の要 |
| Custom Answer | 特定質問への固定回答定義 | よくある質問の最適化 |
| Generative BI | AIによるビジュアル自動生成 | 2024 GA |
| Executive Summary | AI生成のダッシュボードサマリ | 経営層向けレポート |
Topic定義とフィールドメタデータ設定:
# Q用Topic作成
aws quicksight create-topic \
--aws-account-id 123456789012 \
--topic-id "sales-analysis-topic" \
--topic '{
"Name": "売上分析トピック",
"Description": "注文・売上データの自然言語クエリ用トピック",
"DataSets": [
{
"DatasetArn": "arn:aws:quicksight:us-east-1:123456789012:dataset/orders-dataset-prod",
"DatasetName": "注文データ",
"DatasetDescription": "全注文・売上・顧客データ",
"Filters": [],
"Columns": [
{
"ColumnName": "order_amount",
"ColumnFriendlyName": "売上金額",
"ColumnDescription": "注文1件あたりの売上金額 (円)",
"ColumnSynonyms": ["売上", "売り上げ", "金額", "revenue"],
"ColumnDataRole": "MEASURE",
"Aggregation": "SUM",
"DefaultFormatting": {
"DisplayFormat": "CURRENCY",
"DisplayFormatOptions": {
"UseBlankCellFormat": false,
"Prefix": "¥",
"DecimalSeparator": "DOT",
"ThousandsSeparator": {"Visibility": "VISIBLE"}
}
}
},
{
"ColumnName": "order_date",
"ColumnFriendlyName": "注文日",
"ColumnDescription": "注文が行われた日付",
"ColumnSynonyms": ["日付", "受注日", "購入日"],
"ColumnDataRole": "DIMENSION",
"TimeGranularity": "DAY"
},
{
"ColumnName": "region",
"ColumnFriendlyName": "地域",
"ColumnDescription": "顧客の地域 (JP/US/EU)",
"ColumnSynonyms": ["エリア", "地方", "リージョン"],
"ColumnDataRole": "DIMENSION"
}
]
}
]
}'
Custom Answer (よくある質問への最適化):
client = boto3.client('quicksight', region_name='us-east-1')
# Custom Answerでよく使うクエリを定義
client.create_topic_reviewed_answer(
AwsAccountId='123456789012',
TopicId='sales-analysis-topic',
Answers=[
{
'AnswerId': 'monthly-sales-by-region',
'Question': '地域別の月次売上を見たい',
'DatasetArn': 'arn:aws:quicksight:us-east-1:123456789012:dataset/orders-dataset-prod',
'PrimaryVisual': {
'VisualId': 'bar-chart-region-monthly',
'Role': 'PRIMARY',
'Ir': {
'Metrics': [{'Operand': 'order_amount', 'Function': 'SUM'}],
'GroupBys': [{'FieldName': 'region'}, {'FieldName': 'order_date', 'TimeGranularity': 'MONTH'}]
}
}
}
]
)
Q精度向上のベストプラクティス:
– フィールドのFriendly nameを日本語で設定する (英語カラム名では誤認識が多発)
– シノニムを3〜5個設定する (「売上」「売り上げ」「revenue」「金額」など)
– 計算フィールドをデータセット側で事前定義し、Topicに含める
– カーディナリティが1万超の列(商品IDなど)はDimensionではなくMeasureに設定してNLQ性能を保つ
– Custom Answerで「経営陣がよく聞く質問」トップ20を事前定義しておく
Generative BI(2024 GA)はTopicに紐付けて使用します。ユーザーが自然言語で入力すると、Claudeがデータ構造を解析してビジュアルを自動選択・生成します。Executive Summary機能はダッシュボードのデータを自動解析し、「今月の売上は先月比+12%、主な要因は東日本エリアの新規顧客増加」のような自然言語サマリを生成します。
EMR Serverless 本番運用

%% Mermaid02: Data Analytics Vol1↔Vol2 相関マップ (§8まとめ用)
graph TB
subgraph Vol1["Vol1: バッチ分析基盤"]
Athena[Athena クエリ層]
Glue[Glue ETL層]
LF[Lake Formation ガバナンス]
RS[Redshift Serverless DWH]
end
subgraph Vol2["Vol2: リアルタイム分析基盤"]
KDS[Kinesis ストリーミング]
MSK[MSK メッセージング]
QS[QuickSight 可視化]
EMR[EMR Serverless 分散処理]
end
S3[Amazon S3 Data Lake]
KDS -->|Firehose| S3
MSK -->|Connect| S3
S3 --> Glue
S3 --> Athena
S3 --> RS
S3 --> EMR
EMR --> S3
RS --> QS
Athena --> QS
LF -->|権限制御| Athena
LF -->|権限制御| Glue
LF -->|権限制御| RS
EMR Serverlessは2022年にGAし、サーバーレスアーキテクチャでApache Spark・Hive・Prestoジョブを実行できるサービスです。クラスター管理不要・従量課金・自動スケーリングの3特性により、散発的なバッチ処理からリアルタイム集計まで幅広いユースケースをカバーします。2024-2025年にかけてemr-7.xシリーズとGraviton3インスタンスへの対応が拡充され、従来比最大20%のパフォーマンス向上と15%のコスト削減が実現されています。
Application 設計
リリースバージョン選定
EMR Serverlessのリリースラベルは emr-6.x と emr-7.x の2系統があります。2024年以降の新規プロジェクトは emr-7.x を推奨します。
| リリース | Spark | Hive | Presto/Trino | 主な特徴 |
|---|---|---|---|---|
| emr-6.15.0 | 3.4.1 | 3.1.3 | Presto 0.282 | 安定版・実績多数 |
| emr-7.0.0 | 3.5.0 | 3.1.3 | Trino 435 | Java 17対応・OSS最新版 |
| emr-7.1.0 | 3.5.1 | 3.3.1 | Trino 442 | Graviton3対応・推奨最新 |
本番導入時は以下の基準でバージョンを選定します。既存コードベースがSpark 3.3以前に依存する場合はemr-6.15で安定運用し、新規プロジェクトまたはパフォーマンス重視ではemr-7.1.0を選択します。
aws emr-serverless create-application \
--name "prod-spark-app" \
--type SPARK \
--release-label "emr-7.1.0" \
--initial-capacity '{
"DRIVER": {
"workerCount": 2,
"workerConfiguration": {
"cpu": "4vCPU",
"memory": "16GB",
"disk": "200GB"
}
},
"EXECUTOR": {
"workerCount": 10,
"workerConfiguration": {
"cpu": "4vCPU",
"memory": "16GB",
"disk": "200GB"
}
}
}' \
--maximum-capacity '{"cpu": "400vCPU", "memory": "3000GB", "disk": "20000GB"}' \
--network-configuration '{
"subnetIds": ["subnet-aabbccdd", "subnet-eeff0011"],
"securityGroupIds": ["sg-12345678"]
}' \
--auto-stop-config '{"enabled": true, "idleTimeoutMinutes": 15}'
Pre-initialized Capacity
EMR Serverless Pre-initialized Capacity — コールドスタート回避の鍵
EMR Serverlessはジョブ投入時にWorkerを起動するため、初回実行に1-3分のコールドスタートが発生する。Pre-initialized capacityを設定すると、指定したDriver/Executor数を事前起動状態で待機させ、ジョブ即時開始が可能になる。低レイテンシ要件のバッチジョブやスケジュール実行の定時性担保に不可欠。ただしアイドル時間も課金対象のため、実行頻度とのバランスで設定する。
設定指針:
– 毎時スケジュールジョブ: Driver×2 / Executor×10を事前起動 → 起動待ち解消
– 5分以上間隔が空くユースケース: Pre-initialized不要(コスト > メリット)
– idleTimeoutMinutes: 15との組み合わせで、ジョブ完了後15分でWorker自動停止
Pre-initialized capacityはApplication作成後でも update-application コマンドで変更できます。本番では initialCapacity を段階的に調整し、CloudWatch Metrics の RunningWorkerCount を監視して適正値を導出します。
Network 設定
EMR ServerlessをVPC内に配置することで、S3 VPCエンドポイント経由のプライベート通信とGlue Data Catalogへのアクセスが安定します。
{
"networkConfiguration": {
"subnetIds": [
"subnet-private-1a",
"subnet-private-1c"
],
"securityGroupIds": ["sg-emr-serverless-workers"]
}
}
セキュリティグループの設計ポイント:
– アウトバウンド: S3エンドポイント(443)・Glue API(443)・CloudWatch Logs(443)を許可
– インバウンド: EMR Serverless内部通信のみ(同一SG間通信を許可)
– S3 Gateway VPCエンドポイントを事前作成しておくことでデータ転送コストを削減
IAMロール設計
Job実行ロールには最小権限の原則に基づき、必要なS3バケット・Glue Catalog・CloudWatch Logsへのアクセスのみを付与します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3DataAccess",
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
"Resource": [
"arn:aws:s3:::my-datalake-bucket",
"arn:aws:s3:::my-datalake-bucket/*"
]
},
{
"Sid": "GlueCatalogAccess",
"Effect": "Allow",
"Action": [
"glue:GetDatabase", "glue:GetTable", "glue:GetPartitions",
"glue:CreateTable", "glue:UpdateTable", "glue:BatchCreatePartition"
],
"Resource": ["arn:aws:glue:ap-northeast-1:123456789012:catalog",
"arn:aws:glue:ap-northeast-1:123456789012:database/analytics",
"arn:aws:glue:ap-northeast-1:123456789012:table/analytics/*"]
},
{
"Sid": "CloudWatchLogs",
"Effect": "Allow",
"Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": "arn:aws:logs:ap-northeast-1:123456789012:log-group:/aws/emr-serverless/*"
}
]
}
Spark Job 本番設計
Spark Submit パラメータ
Spark JobのパフォーマンスはDriver/Executorのメモリ・コア設定に大きく依存します。本番では実データ量に基づき段階的にチューニングします。
aws emr-serverless start-job-run \
--application-id "00fm8cutsk7pru09" \
--execution-role-arn "arn:aws:iam::123456789012:role/EMRServerlessJobRole" \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://my-scripts/jobs/data_aggregation.py",
"entryPointArguments": [
"--input-path", "s3://my-datalake/raw/events/",
"--output-path", "s3://my-datalake/aggregated/daily/",
"--date", "2025-01-15"
],
"sparkSubmitParameters": "--conf spark.driver.cores=4 --conf spark.driver.memory=14g --conf spark.executor.cores=4 --conf spark.executor.memory=14g --conf spark.executor.instances=20 --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=5 --conf spark.dynamicAllocation.maxExecutors=50"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"cloudWatchLoggingConfiguration": {
"enabled": true,
"logGroupName": "/aws/emr-serverless/jobs",
"logStreamNamePrefix": "data-aggregation"
},
"s3MonitoringConfiguration": {
"logUri": "s3://my-logs-bucket/emr-serverless/"
}
}
}'
Executor メモリの設定指針:
| データ量 | Driver memory | Executor memory | Executor数 |
|---|---|---|---|
| ~10GB | 4GB | 8GB | 5-10 |
| 10-100GB | 8GB | 16GB | 10-30 |
| 100GB-1TB | 14GB | 28GB | 20-50 |
| 1TB超 | 14GB | 28GB | 動的割当(最大100) |
依存ライブラリ管理
EMR Serverlessでの依存ライブラリ配布には3つの方法があります。用途に応じて使い分けます。
# 方法1: Maven座標指定(小規模ライブラリ)
--conf spark.jars.packages=org.apache.spark:spark-avro_2.12:3.5.1,io.delta:delta-spark_2.12:3.2.0
# 方法2: S3アップロード済みWHLファイル
--conf spark.submit.pyFiles=s3://my-scripts/libs/custom_utils-1.2.0-py3-none-any.whl
# 方法3: カスタムDockerイメージ(大規模依存ライブラリ環境)
# custom-image-uri: 123456789012.dkr.ecr.ap-northeast-1.amazonaws.com/emr-custom:emr-7.1.0-spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
spark = SparkSession.builder \
.appName("DataAggregationJob") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
schema = StructType([
StructField("event_id", StringType(), False),
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("timestamp", TimestampType(), True),
StructField("value", LongType(), True)
])
df = spark.read \
.schema(schema) \
.parquet("s3://my-datalake/raw/events/dt=2025-01-15/")
result = df.groupBy("event_type", F.date_trunc("hour", "timestamp").alias("hour")) \
.agg(
F.count("event_id").alias("event_count"),
F.sum("value").alias("total_value"),
F.approx_count_distinct("user_id").alias("unique_users")
)
result.write \
.mode("overwrite") \
.partitionBy("event_type") \
.parquet("s3://my-datalake/aggregated/daily/dt=2025-01-15/")
spark.stop()
Job監視
CloudWatch MetricsとSpark UIの2ルートでジョブを監視します。
# Job Run状態確認
aws emr-serverless get-job-run \
--application-id "00fm8cutsk7pru09" \
--job-run-id "00fmdlcik3gtu20t"
# 全Job Run一覧(直近24時間)
aws emr-serverless list-job-runs \
--application-id "00fm8cutsk7pru09" \
--states RUNNING PENDING FAILED \
--created-at-after "$(date -u -d '24 hours ago' +%Y-%m-%dT%H:%M:%SZ 2>/dev/null || date -u -v-24H +%Y-%m-%dT%H:%M:%SZ)"
Spark UI はジョブ実行中に EMR コンソールから直接アクセス可能です。Stage/Task レベルのボトルネック分析、Shuffle Read/Write の異常値検出、GC オーバーヘッドの確認に活用します。
Auto-scaling 設定
Min/Max Worker 設定
maximumCapacity でApplication全体のリソース上限を設定し、initialCapacity で事前起動数を制御します。
{
"maximumCapacity": {
"cpu": "400vCPU",
"memory": "3000GB",
"disk": "20000GB"
}
}
vCPU/Memory上限の設計指針:
– 上限は「最大並列ジョブ数 × 最大Executor数 × Executor仕様」で算出
– 想定外の暴走ジョブによる過課金を防ぐため、上限は推定最大値の120%程度に設定
– AWSアカウントのService Quotaと照合(デフォルトはリージョンあたり400vCPU)
スケーリング動作と Job 完了時自動停止
EMR Serverlessは需要ベースで自動スケーリングします。ジョブ投入時にPending Executorが発生すると新しいWorkerが自動起動し、アイドル状態が続くと autoStopConfig に従い自動縮退します。
{
"autoStopConfig": {
"enabled": true,
"idleTimeoutMinutes": 15
}
}
idleTimeoutMinutes: 全ジョブ完了後、Applicationが自動停止するまでの待機時間- Pre-initialized capacityを設定している場合、アイドルWorkerは
idleTimeoutMinutes後に解放 enabled: falseにすると手動停止が必要になるため、本番ではtrueを推奨
Cost 最適化
vCPU 時間・Memory 時間課金モデル
EMR Serverlessの課金はWorkerが稼働した時間に対して発生します。ジョブ実行中だけでなくPre-initialized capacityのアイドル時間も課金対象です。
計算式: コスト = vCPU使用時間(h) × $0.052 + GBメモリ使用時間(h) × $0.0057
| Workerタイプ | vCPU | Memory | 1時間コスト | 8時間コスト |
|---|---|---|---|---|
| 小(Driver) | 2 | 4GB | $0.127 | $1.018 |
| 標準(Executor) | 4 | 16GB | $0.299 | $2.390 |
| 大(重量Executor) | 8 | 32GB | $0.598 | $4.781 |
| Pre-init 10台(標準) | 40 | 160GB | $2.992 | $23.936 |
Pre-initialized vs On-demand 比較
| 観点 | Pre-initialized | On-demand |
|---|---|---|
| 起動レイテンシ | 数秒(即時) | 1-3分 |
| アイドル課金 | あり | なし |
| 適用ユースケース | 毎時バッチ・定時SLA要件 | 不定期実行・開発テスト |
| コスト最適化方法 | 必要最小台数に絞る | デフォルトで最適 |
| idleTimeout設定 | 15-30分が目安 | 5-10分で十分 |
Cost Explorer 連携とタグベースコスト配分
EMR Serverlessジョブへのタグ付けにより、プロジェクト・チーム・環境単位のコスト可視化が可能です。
aws emr-serverless tag-resource \
--resource-arn "arn:aws:emr-serverless:ap-northeast-1:123456789012:/applications/00fm8cutsk7pru09" \
--tags '{"Project": "analytics-platform", "Team": "data-engineering", "Environment": "production", "CostCenter": "DE-001"}'
Cost Explorerのタグフィルタと組み合わせることで、Application単位・ジョブ種別単位の月次コストレポートを自動生成できます。
S3 連携設計
Input/Output パス設計
S3上のデータ配置はパーティション構造を事前設計することで、Sparkのパーティションプルーニング効率が大幅に向上します。
from pyspark.sql import SparkSession
import sys
spark = SparkSession.builder.appName("S3PartitionedIO").getOrCreate()
input_path = "s3://my-datalake/raw/events/year=2025/month=01/day=15/"
df = spark.read.parquet(input_path)
df_processed = df.filter(df.event_type == "purchase") \
.withColumn("revenue_usd", df.value / 100.0)
df_processed.write \
.mode("overwrite") \
.partitionBy("event_type", "year", "month") \
.parquet("s3://my-datalake/aggregated/events/")
Shuffle Storage 選定
Shuffleデータの保存先はS3 ShuffleとEBS Shuffleの2択です。
| 観点 | S3 Shuffle | EBS Shuffle |
|---|---|---|
| 耐障害性 | 高(S3の11 nines) | 低(Worker障害でデータ消失) |
| コスト | S3ストレージ料金が発生 | EBS料金(起動中のみ) |
| レイテンシ | やや高い | 低い |
| 推奨ユースケース | 大規模Shuffle(100GB超) | 低レイテンシ・小〜中規模Shuffle |
{
"sparkSubmitParameters": "--conf spark.emr-serverless.storage.shuffle.type=s3 --conf spark.emr-serverless.storage.shuffle.s3.bucket=my-shuffle-bucket --conf spark.emr-serverless.storage.shuffle.s3.prefix=emr-shuffle/"
}
Glue Data Catalog 統合と Hive Metastore
EMR ServerlessはGlue Data Catalogをデフォルトのメタストアとして利用します。追加設定なしでGlueで管理するデータベース・テーブルをSparkから参照可能です。
spark = SparkSession.builder \
.appName("GlueCatalogIntegration") \
.config("hive.metastore.client.factory.class",
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.enableHiveSupport() \
.getOrCreate()
-- Glue Catalog上のテーブルをSpark SQLで操作
USE analytics;
CREATE TABLE IF NOT EXISTS daily_aggregation (
event_type STRING,
hour TIMESTAMP,
event_count BIGINT,
total_value BIGINT,
unique_users BIGINT
)
USING PARQUET
PARTITIONED BY (dt STRING)
LOCATION 's3://my-datalake/aggregated/daily/';
MSCK REPAIR TABLE daily_aggregation;
SELECT
event_type,
DATE_TRUNC('day', hour) AS day,
SUM(event_count) AS total_events,
SUM(unique_users) AS daily_unique_users
FROM daily_aggregation
WHERE dt = '2025-01-15'
GROUP BY event_type, DATE_TRUNC('day', hour)
ORDER BY total_events DESC;
Lake Formation Fine-grained Access Control
EMR Serverless と Lake Formation を連携させることで、テーブル・カラム・行レベルの細粒度アクセス制御が実現します。2024年のアップデートにより、EMR ServerlessジョブからLake Formation管理テーブルへの直接アクセスが安定化されました。
{
"configurationOverrides": {
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.hadoop.fs.s3.impl": "com.amazon.ws.emr.hadoop.fs.EmrFileSystem",
"spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.glue_catalog.warehouse": "s3://my-datalake/iceberg/",
"spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
}
}
]
}
}
Lake Formation設定の要点:
– Job実行ロールをLake FormationのData Lake管理者または適切なデータアクセス権限で登録
– DESCRIBE権限: テーブルスキーマ読取に必要
– SELECT権限: データ読取(カラムレベルで絞り込み可能)
– Row Filter: 特定ユーザー/ロールに表示する行を動的フィルタリング
詰まりポイント 7選 図解
リアルタイムデータ分析基盤で本番エンジニアが実際に詰まるポイントを7パターン体系化しました。エラー症状・根本原因・即効対処法をep-boxで解説します。
| # | サービス | 詰まりポイント | 主要エラー/症状 |
|---|---|---|---|
| ① | Kinesis Data Streams | Shard数過不足 | WriteProvisionedThroughputExceeded / Hot Shard |
| ② | KCL 2.x | チェックポイント遅延 | DynamoDB Throttle / IteratorAge増加 |
| ③ | Amazon MSK | パーティション再割当暴発 | Consumer Group Rebalance Storm |
| ④ | QuickSight | SPICE容量枯渇 | リフレッシュ失敗 / 表示データ陳腐化 |
| ⑤ | EMR Serverless | コールドスタート | Pre-initialized未設定によるSLA違反 |
| ⑥ | Kinesis→Firehose | Transform Lambdaタイムアウト | 6MB制限 / バッファリング設定ミス |
| ⑦ | MSK Connect | Connector障害 | DLQ未設定 / Poison Pill / Offset管理ミス |
詰まり① Kinesis Shard数過不足 — WriteProvisionedThroughputExceeded / Hot Shard発生
症状: WriteProvisionedThroughputExceeded エラーが多発し、CloudWatch WriteProvisionedThroughputExceeded.Records が急増する。特定Shardのみ負荷集中(Hot Shard)が発生し、Consumer側のIteratorAgeが異常拡大する。
原因分析:
Provisionedモードでは1 Shardあたり書込1MB/s・1,000 records/s、読取2MB/s・5 transactions/sの上限がある。以下の3パターンで頻発する。
| パターン | 原因 | 症状 |
|—|—|—|
| Shard数不足 | プロデューサー書込レート > Shard合計容量 | WriteProvisionedThroughputExceeded |
| Hot Shard | Partition Keyの偏り(ユーザーIDやIPアドレスに集中) | 特定Shardのみ負荷集中 |
| On-Demand移行忘れ | トラフィック急増時にProvisioned維持 | スパイク時のエラー率急増 |
対処法:
– Shard数計算式: 必要Shard数 = max(ceil(最大書込MB/s / 1), ceil(最大records/s / 1000))
– Hot Shard対策: Partition KeyにUUID/乱数の分散成分を追加(例: user_id + "-" + str(random.randint(0, 9)))
– On-Demand移行: トラフィック予測困難またはスパイクがある場合は即座にOn-Demand切替
# Shard数確認
aws kinesis describe-stream-summary \
--stream-name my-stream \
--query 'StreamDescriptionSummary.OpenShardCount'
# On-Demand モードへ切替(Provisionedから即時変更可能)
aws kinesis update-stream-mode \
--stream-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/my-stream \
--stream-mode-details StreamMode=ON_DEMAND
CloudWatchアラート設定推奨メトリクス:
– WriteProvisionedThroughputExceeded.Records — 閾値: 100件/分で警告
– GetRecords.IteratorAgeMilliseconds — 閾値: 60,000ms(1分)で警告
– PutRecord.Success — 成功率が99%を下回った場合に通知
詰まり② KCL チェックポイント遅延 — DynamoDB Throttle / リース競合 / IteratorAge増加
症状: KCL Consumerの IteratorAge が増加し続け、消費が追いつかない。DynamoDBの ProvisionedThroughputExceededException が多発し、チェックポイント処理が断続的に失敗する。複数Worker間でリース競合が発生し、同一Shardを取り合う状態になる。
原因分析:
| 原因 | 詳細 | 対処 |
|—|—|—|
| DynamoDB RCU/WCU不足 | KCL管理テーブル(リース/チェックポイント)のスループット超過 | PAY_PER_REQUESTに変更 |
| リース競合 | Worker数 > Shard数で複数WorkerがShard奪い合い | Worker数をShard数以下に抑制 |
| チェックポイント間隔が長い | 障害時に大量レコードの再処理が必要になる | 間隔を短縮(30〜60秒推奨) |
| Graceful shutdown未実装 | 終了前にチェックポイントが保存されず重複処理 | shutdown()でfinal checkpoint必須 |
対処法:
# KCL 2.x チェックポイント設定例
import time
import json
class MyRecordProcessor:
def __init__(self):
self.CHECKPOINT_FREQ_SECONDS = 60
self.last_checkpoint_time = time.time()
def initialize(self, initialize_input):
self.shard_id = initialize_input.shard_id
def process_records(self, process_records_input):
for record in process_records_input.records:
data = json.loads(record.data)
self._process(data)
if time.time() - self.last_checkpoint_time > self.CHECKPOINT_FREQ_SECONDS:
try:
process_records_input.checkpointer.checkpoint()
self.last_checkpoint_time = time.time()
except CheckpointError:
# Throttlingは一時的 — 次サイクルでリトライ
pass
def shutdown(self, shutdown_input):
# Graceful shutdown: 必ずfinal checkpointを実行
if shutdown_input.reason == ShutdownReason.TERMINATE:
shutdown_input.checkpointer.checkpoint()
DynamoDB設定: KCL管理テーブルのキャパシティは PAY_PER_REQUEST を推奨。Worker数は常にShard数以下に維持すること。
詰まり③ MSK パーティション再割当暴発 — Consumer Group Rebalance Storm
症状: Kafkaコンシューマーが Rebalancing... ログを大量出力し、処理が断続的に停止する。session.timeout.ms 超過が頻発してグループから追い出される。スケールアウト時にRebalanceが収束せず、処理停止が長時間続く。
根本原因: Consumer内の処理が重くなり max.poll.interval.ms を超過するか、GCポーズなどで session.timeout.ms 内のHeartbeatが失敗し、Rebalanceが連鎖的に発生する。
| 設定パラメータ | デフォルト値 | 推奨値 | 理由 |
|—|—|—|—|
| session.timeout.ms | 45,000ms | 60,000ms | 重い処理中のタイムアウト防止 |
| max.poll.interval.ms | 300,000ms | 600,000ms | バッチ処理への余裕確保 |
| heartbeat.interval.ms | 3,000ms | 3,000ms | session_timeout の1/20以下に維持 |
対処法:
# Kafka Consumer — Rebalance Storm対策設定
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=[
'b-1.my-cluster.kafka.ap-northeast-1.amazonaws.com:9092'
],
group_id='my-consumer-group',
session_timeout_ms=60000,
max_poll_interval_ms=600000,
heartbeat_interval_ms=3000,
# Static Membership: 再起動時もパーティション維持(Rebalance抑制)
group_instance_id='consumer-instance-1',
# Sticky Assignor: Rebalance時のパーティション移動を最小化
partition_assignment_strategy=[
'kafka.coordinator.assignors.StickyAssignor'
],
)
Static Membership (group.instance.id) を設定すると、Consumer再起動時にRebalanceが発生しなくなる。コンテナ再起動が頻繁な環境では特に有効。
詰まり④ QuickSight SPICE容量枯渇 — リフレッシュ失敗 / データソース肥大化
症状: データセットのリフレッシュが SPICE capacity limit exceeded エラーで失敗し、ダッシュボードのデータが古いまま更新されない。月末に突然リフレッシュが全て失敗し始める。スケジュール実行が連続失敗してもアラートに気づかない。
原因分析:
| 原因 | 詳細 |
|—|—|
| SPICEキャパシティ上限到達 | リージョン単位のSPICE総容量が枯渇 |
| 全件リフレッシュの非効率 | 毎回全レコードを再取込し容量・時間を無駄消費 |
| データセット設計の問題 | 不要列・JOIN結果の膨張・重複データで容量過多 |
| 容量アラート未設定 | 枯渇直前まで気づかない |
対処法:
# SPICE残容量確認
aws quicksight describe-account-settings \
--aws-account-id 123456789012 \
--query 'AccountSettings'
# データセットリフレッシュ状態確認(最新10件)
aws quicksight list-ingestions \
--aws-account-id 123456789012 \
--data-set-id xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx \
--query 'Ingestions[:10].{Status:IngestionStatus,Size:IngestionSizeInBytes,Error:ErrorInfo}'
SPICEキャパシティ管理のベストプラクティス:
1. CloudWatch SPICECapacityUsed で70%アラート・90%クリティカルアラートを設定
2. タイムスタンプ列があるデータセットは全件リフレッシュから増分リフレッシュに切替
3. データソース側でSELECT列を必要最小限に絞り、集計はソース側で実施
4. 大規模データセットは Direct Query vs SPICE のコストトレードオフを再評価
詰まり⑤ EMR Serverless コールドスタート — Pre-initialized未設定によるSLA違反
症状: ジョブを投入してから処理が実際に始まるまで1〜3分間の待機が発生する。定時スケジュールジョブの開始遅延でSLAを継続的に違反する。緊急のデータ再処理ジョブが遅延してビジネスインパクトが生じる。
原因: EMR ServerlessはリクエストベースでWorkerコンテナを起動するアーキテクチャのため、Pre-initialized capacity を設定しない場合、ジョブ受付からWorker起動完了まで初期化時間が発生する。
| Pre-initialized設定 | ジョブ開始速度 | コスト特性 |
|—|—|—|
| 設定あり | 数秒以内(即時起動) | アイドル時も最低限課金 |
| 設定なし | 1〜3分(コンテナ起動待ち) | ジョブ実行時間のみ課金 |
対処法:
# Pre-initialized Capacity設定(定時バッチ・低レイテンシ要件に)
aws emr-serverless update-application \
--application-id ap-01234567890abcdef \
--initial-capacity '{
"DRIVER": {
"workerCount": 1,
"workerConfiguration": {
"cpu": "2vCPU",
"memory": "4GB"
}
},
"EXECUTOR": {
"workerCount": 10,
"workerConfiguration": {
"cpu": "2vCPU",
"memory": "8GB"
}
}
}'
使い分けの指針: 1時間に複数回実行するジョブ・定時SLAがあるジョブ・緊急再処理用途はPre-initialized設定で安定運用。深夜1回のみ実行する日次バッチはOn-demand起動でコスト最小化。
詰まり⑥ Kinesis→Firehose Transform Lambdaタイムアウト — 6MB制限 / バッファリング設定ミス
症状: Kinesis Firehoseの DataTransformation Lambdaが頻繁にタイムアウトし、S3への配信遅延や欠損が発生する。CloudWatch DataTransformation.Duration.p99 が300秒に張り付く。ProcessingFailedRecords カウントが増加し続け、原因レコードが特定できない。
原因分析:
| 制約/設定 | 値 | 影響 |
|—|—|—|
| Lambda バッチサイズ上限 | 6MB | 設定によっては頻繁にタイムアウト |
| Lambda タイムアウト上限 | 300秒 | Firehoseは最大3分Lambda実行を待機 |
| BufferSizeInMBs設定ミス | 大きすぎる値 | 蓄積レコードが6MB制限に頻繁に到達 |
| DLQ未設定 | — | 変換失敗レコードが追跡不能 |
対処法:
{
"ProcessingConfiguration": {
"Enabled": true,
"Processors": [{
"Type": "Lambda",
"Parameters": [
{
"ParameterName": "LambdaArn",
"ParameterValue": "arn:aws:lambda:ap-northeast-1:123456789012:function:transform"
},
{"ParameterName": "BufferSizeInMBs", "ParameterValue": "3"},
{"ParameterName": "BufferIntervalInSeconds", "ParameterValue": "60"},
{"ParameterName": "NumberOfRetries", "ParameterValue": "3"}
]
}]
},
"S3BackupMode": "FailedDataOnly",
"S3BackupConfiguration": {
"BucketARN": "arn:aws:s3:::my-firehose-error-backup"
}
}
重要ポイント: BufferSizeInMBs を3MB以下に設定し、6MB制限に余裕を持たせる。S3BackupMode: FailedDataOnly で変換失敗レコードをS3へバックアップし、Poison Pillレコードを特定・除去できる体制を整える。
詰まり⑦ MSK Connect Connector障害 — DLQ未設定 / Offset管理ミス / Plugin互換性
症状: MSK ConnectのConnectorが突然停止し再起動を繰り返す(Poison Pill問題)。変換エラーの詳細が追跡できない。Connectorを再起動しても同じレコードで止まり続ける。ログに Plugin class not found や ClassNotFoundException が出力される。
原因分析:
| 問題パターン | 原因 | 影響 |
|—|—|—|
| DLQ未設定 | 変換失敗レコードがエラーログにのみ残る | Poison Pill: 同一レコードで無限停止 |
| Offset管理ミス | 手動リセット手順誤り(Connector停止前にリセット) | 重複処理またはデータ欠損 |
| Plugin互換性 | ConnectorバージョンとKafka APIバージョン不一致 | ClassNotFoundException / 起動失敗 |
| タスク数過多 | MSK Serverlessのパーティション数超過 | タスクがSHUFFLED状態で起動不能 |
対処法:
# DLQ設定付きConnector作成
aws kafkaconnect create-connector \
--connector-name my-debezium-connector \
--kafka-cluster '{
"apacheKafkaCluster": {
"bootstrapServers": "b-1.my-msk.kafka.ap-northeast-1.amazonaws.com:9092",
"vpc": {
"securityGroups": ["sg-0123456789abcdef0"],
"subnets": ["subnet-0123456789abcdef0"]
}
}
}' \
--connector-configuration '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"errors.deadletterqueue.topic.name": "my-connector-dlq",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"tasks.max": "4"
}'
Connector障害時の復旧手順:
1. Connectorを停止(PAUSED 状態に変更)してからOffsetリセット
2. DLQのメッセージを調査して失敗原因を特定
3. Plugin互換性確認(MSKバージョン × Connector Pluginバージョンマトリクス参照)
4. 設定修正後、Connectorを再作成またはUpdate APIで設定更新
アンチパターン → 正解パターン変換演習
実際の現場でよくある設計ミスを5問取り上げます。アンチパターン(NG)のコードと正解パターン(OK)のコードを対比し、移行ポイントを解説します。
演習① SQS+Lambda個別処理 → Kinesis Enhanced Fan-Out + KCL 2.x
アンチパターン (NG): 高スループットイベントをSQS→Lambda個別処理で実装
# NG: SQS + Lambda個別処理 (高スループット時の問題)
# - メッセージ順序保証なし
# - 最大10件/バッチ(スループット制限あり)
# - Partition Key分散の仕組みがない
def lambda_handler(event, context):
for record in event['Records']:
# 1件ずつ処理 → 高コスト・高遅延
process_event(record['body'])
正解パターン (OK): Kinesis Enhanced Fan-Out + KCL 2.x
# OK: Kinesis Enhanced Fan-Out + KCL 2.x
# - Enhanced Fan-Out: 各Consumer専用2MB/sスループット(HTTP/2 push型)
# - KCL 2.x: 自動シャード管理・チェックポイント・リース管理
# - 順序保証(シャード単位)+ 並列Consumer構成
import json
import time
class StreamingConsumer:
def __init__(self):
self.CHECKPOINT_FREQ_SECONDS = 60
self.last_checkpoint_time = time.time()
def initialize(self, initialize_input):
self.shard_id = initialize_input.shard_id
def process_records(self, process_records_input):
records = process_records_input.records
# バッチ処理: 複数レコードを一度に処理
events = [json.loads(r.data) for r in records]
batch_process_events(events)
if time.time() - self.last_checkpoint_time > self.CHECKPOINT_FREQ_SECONDS:
process_records_input.checkpointer.checkpoint(
records[-1].sequence_number
)
self.last_checkpoint_time = time.time()
選定基準: 1秒あたり1,000件超のイベント処理、順序保証が必要、複数Consumer(Fan-Out)が必要な場合はKinesis一択。SQSは順序不要・低スループット・ポーリング型に向く。
演習② EC2上Kafka自前運用 → MSK Serverless + MSK Connect
アンチパターン (NG): EC2でApache Kafkaを自前構築・運用
# NG: EC2上でKafka自前運用(3台クラスター例)
# 問題: ZooKeeper管理・ブローカーパッチ適用・ディスク管理・ネットワーク設定が全て手動
# セキュリティパッチ未適用リスク / スケールアウト作業が複雑
# Kafkaブローカー起動(手動管理)
ssh kafka-broker-1
/opt/kafka/bin/kafka-server-start.sh \
/opt/kafka/config/server.properties \
--override broker.id=1 \
--override listeners=PLAINTEXT://kafka-broker-1:9092 \
--override log.dirs=/data/kafka-logs
正解パターン (OK): MSK Serverless + MSK Connect
# OK: MSK Serverless クラスター作成(管理不要)
aws kafka create-cluster-v2 \
--cluster-name my-msk-serverless \
--serverless '{
"vpcConfigs": [{
"subnetIds": ["subnet-aaa", "subnet-bbb"],
"securityGroupIds": ["sg-msk-brokers"]
}],
"clientAuthentication": {
"sasl": {"iam": {"enabled": true}}
}
}'
# MSK Connect でS3 Sink Connectorをマネージド実行
aws kafkaconnect create-connector \
--connector-name s3-sink-connector \
--connector-configuration '{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.bucket.name": "my-data-lake",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"tasks.max": "4",
"errors.deadletterqueue.topic.name": "s3-sink-dlq",
"errors.tolerance": "all"
}'
移行効果: ブローカー管理・ZooKeeper・パッチ適用・ディスク拡張が全てAWS管理になり、Kafkaインフラ運用工数を大幅削減できる。
演習③ 手動BI運用 → QuickSight SPICE + Embedding + RLS
アンチパターン (NG): オープンソースBIツール(Redash/Metabase)の自前EC2運用
# NG: MetabaseをEC2で自前運用
# 問題: アップデート管理・バックアップ・スケーリング・権限管理が全て手動
# セキュリティパッチ未適用リスク / 可用性SLA担保困難
docker run -d \
-p 3000:3000 \
--name metabase \
-v metabase-data:/metabase-data \
metabase/metabase:latest
正解パターン (OK): QuickSight SPICE + Embedding + RLS
{
"DataSetId": "sales-dataset",
"ImportMode": "SPICE",
"RowLevelPermissionDataSet": {
"Arn": "arn:aws:quicksight:ap-northeast-1:123456789012:dataset/rls-rules",
"PermissionPolicy": "GRANT_ACCESS",
"FormatVersion": "VERSION_2"
}
}
# QuickSight Embedding: アプリ内にダッシュボードを埋め込み
import boto3
qs = boto3.client('quicksight', region_name='ap-northeast-1')
# Registered User Embedding URL取得
response = qs.generate_embed_url_for_registered_user(
AwsAccountId='123456789012',
SessionLifetimeInMinutes=60,
UserArn='arn:aws:quicksight:ap-northeast-1:123456789012:user/default/analyst1',
ExperienceConfiguration={
'Dashboard': {'InitialDashboardId': 'my-dashboard-id'}
}
)
embed_url = response['EmbedUrl']
RLS設定のポイント: 初期設計時にRLS(Row-Level Security)を組み込む。後付けは既存ダッシュボード全てへの影響テストが必要になり工数が増大する。
演習④ EMR on EC2固定クラスタ → EMR Serverless + Auto-scaling
アンチパターン (NG): EMR on EC2の固定クラスターを24時間稼働
# NG: EMR on EC2固定クラスター(24時間課金・手動管理)
# 問題: ジョブがない時間帯も大型EC2インスタンスが課金継続
# クラスターバージョン管理・パッチ適用・ノード追加が手動
aws emr create-cluster \
--name "fixed-always-on-cluster" \
--instance-groups '[
{"InstanceGroupType": "MASTER", "InstanceType": "m5.2xlarge", "InstanceCount": 1},
{"InstanceGroupType": "CORE", "InstanceType": "m5.4xlarge", "InstanceCount": 10}
]' \
--release-label emr-6.15.0 \
--no-auto-terminate
正解パターン (OK): EMR Serverless + Auto-scaling
# OK: EMR Serverless Application作成
aws emr-serverless create-application \
--name my-spark-app \
--type SPARK \
--release-label emr-7.1.0 \
--auto-start-configuration '{"enabled": true}' \
--auto-stop-configuration '{"enabled": true, "idleTimeoutMinutes": 15}'
# Spark Job投入(使った時間のみ課金)
aws emr-serverless start-job-run \
--application-id ap-01234567890abcdef \
--execution-role-arn arn:aws:iam::123456789012:role/EMRServerlessRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://my-bucket/scripts/etl_job.py",
"sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.sql.shuffle.partitions=200"
}
}'
コスト削減効果: 固定クラスター(m5.4xlarge×10台、24時間)vs EMR Serverless(実行時2時間/日)で最大90%のコスト削減を実現するケースがある。
演習⑤ Kinesis→S3バッチ変換 → Firehose Dynamic Partitioning + Parquet直接出力
アンチパターン (NG): Kinesis→S3に生JSON保存→Lambda/Glueで定期バッチ変換
# NG: 生JSONをS3に保存してバッチ変換
# 問題: Athenaクエリのフルスキャン発生 / Parquet変換ジョブの別途管理
# 遅延発生・変換ジョブ障害時のデータ欠損リスク
aws firehose create-delivery-stream \
--delivery-stream-name json-raw-stream \
--extended-s3-destination-configuration '{
"BucketARN": "arn:aws:s3:::my-raw-json-bucket",
"Prefix": "raw/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/"
}'
# → 別途Glueジョブで毎時Parquet変換が必要(管理コスト高)
正解パターン (OK): Firehose Dynamic Partitioning + Parquet直接出力
{
"DeliveryStreamName": "parquet-partitioned-stream",
"ExtendedS3DestinationConfiguration": {
"BucketARN": "arn:aws:s3:::my-data-lake",
"Prefix": "events/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/",
"ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
"DynamicPartitioningConfiguration": {
"Enabled": true,
"RetryOptions": {"DurationInSeconds": 300}
},
"ProcessingConfiguration": {
"Enabled": true,
"Processors": [{
"Type": "MetadataExtraction",
"Parameters": [
{
"ParameterName": "MetadataExtractionQuery",
"ParameterValue": "{year: .timestamp[0:4], month: .timestamp[5:7]}"
},
{"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"}
]
}]
},
"DataFormatConversionConfiguration": {
"Enabled": true,
"OutputFormatConfiguration": {
"Serializer": {"ParquetSerDe": {"Compression": "SNAPPY"}}
},
"SchemaConfiguration": {
"RoleARN": "arn:aws:iam::123456789012:role/FirehoseGlueRole",
"DatabaseName": "my_db",
"TableName": "events"
}
}
}
}
効果: Glue Data CatalogのスキーマをFirehoseが自動参照してParquet変換。Dynamic Partitioningでイベント内の属性値(年月等)をS3パス構造に直接反映。Athenaクエリのパーティションプルーニングが有効になり、スキャンコストを大幅削減。別途Glueバッチジョブ管理も不要になる。
まとめ — Data Analytics 二部作完成 × 55記事化達成
本記事の総括と習得ポイント
本記事ではKinesis Data Streams・Amazon MSK・Amazon QuickSight・EMR Serverlessという4つのAWSリアルタイム分析サービスの本番運用ノウハウを体系的に解説しました。各サービスで習得した最重要ポイントをまとめます。
Kinesis Data Streams — Shard設計とOn-Demand移行がコアスキル
Kinesis本番運用の最重要事項は「適切なShard数設計」と「On-Demand/Provisioned選定」です。WriteProvisionedThroughputExceeded エラーは、Shard数不足またはHot Shard(Partition Keyの偏り)が原因です。トラフィックの予測が困難なワークロードはOn-Demand、安定スループットが見込める場合はProvisionedを選定します。Enhanced Fan-OutとKCL 2.xを組み合わせることで、大量Consumer構成でも専用スループット(2MB/s/Consumer)を確保し、IteratorAge増加を防止します。
Amazon MSK — Serverless選定とRebalance防止がコアスキル
MSK本番運用の鍵は「MSK Serverless vs Provisioned MSKの選定判断」と「Consumer Group Rebalance Storm防止」です。MSK ServerlessはIAM認証のみ対応のため、既存SASL/SCRAM環境では注意が必要です。Rebalance Stormは session.timeout.ms・max.poll.interval.ms・Static Membershipの3点セットで防止します。MSK ConnectのDLQ設定は本番必須であり、Poison Pill問題を防ぐ唯一の確実な手段です。
Amazon QuickSight — SPICE容量管理とEmbedding設計がコアスキル
QuickSight本番運用で最も重要なのは「SPICEキャパシティ管理」です。SPICEはリージョン単位の固定容量であり、監視アラートなしに運用すると突然のリフレッシュ失敗に気づかないまま古いデータが表示され続けます。増分リフレッシュ・適切なデータセット設計・70/90%アラートの3点が安定運用の基本です。RLS設定は後付けが困難なため、初期設計時に組み込む必要があります。
EMR Serverless — Pre-initialized CapacityとSpark設計がコアスキル
EMR Serverlessで最重要なのは「コールドスタート制御」です。Pre-initialized Capacityは定時SLAがあるジョブで必須の設定です。コスト最適化の観点では、実行頻度・実行時間・アイドル課金の3要素を計算してPre-initializedの有無を決定します。大規模Sparkジョブでは spark.sql.shuffle.partitions をExecutor数×2〜4倍に設定し、Shuffle過多による性能劣化を防止します。
Vol1×Vol2統合視点 — バッチとリアルタイムの役割分担
Data Analytics二部作を通じて、AWSデータ分析基盤の全体像が完成しました。Vol1(バッチ分析)はデータの「蓄積・整理・分析」を担い、Vol2(リアルタイム分析)はデータの「流入・処理・可視化」を担います。S3データレイクを中心にVol1とVol2が連携し、Kinesis/MSKからS3へ取り込まれたリアルタイムデータをGlue/Athena/Redshiftで分析する統合パイプラインが、現代のAWSデータ基盤の標準的な構成です。実際のシステム設計では、両者を組み合わせた「Lambda Architecture」または「Kappa Architecture」の選択が重要な設計判断となります。
本番運用エンジニアが最初に直面する3つの壁
Shard/パーティション設計の壁: Kinesis・MSKともに、パーティション数の設計ミスはスループット不足または過剰コストに直結します。計算式に基づいた設計と、安全マージン(1.5倍)の確保が本番投入前の最重要チェックポイントです。On-DemandモードはShard管理不要ですが、安定スループット環境ではProvisionedよりコストが高くなる場合があります。
観測可能性(Observability)の壁: リアルタイムデータ基盤は障害が静かに起きます。IteratorAge増加・SPICEリフレッシュ失敗・EMR Serverlessジョブの遅延は、適切なCloudWatchアラートがなければ気づくのが遅れます。本番投入と同時にアラートを設定し、ダッシュボードを構築することが安定運用の前提条件です。
コスト管理の壁: Kinesis On-Demandの高水位課金・SPICE容量の予期しない超過・EMR ServerlessのPre-initialized課金・MSK Serverlessの書込/読取ユニット課金は、設計時に見えにくいコストです。Cost Explorerでサービス別タグを使ったコスト可視化を初日から設定し、月次のコスト異常アラートを必ず設定してください。
シリーズを通じたAWSデータエンジニアへの道
Vol1とVol2を通して学んだ8サービス(Athena・Glue・Lake Formation・Redshift Serverless・Kinesis・MSK・QuickSight・EMR Serverless)は、現代のデータエンジニアリングで最も需要の高い技術スタックです。AWSのデータ分析基盤設計・構築・運用の全工程をカバーするスキルセットを習得したことで、リアルタイムデータパイプラインのアーキテクチャ設計から本番運用まで一貫して対応できるエンジニアになれます。
AWS本番運用シリーズ Data Analytics 二部作完成
Vol1(バッチ分析基盤)とVol2(リアルタイム分析基盤)の二部作により、Data Analytics軸が完全に完成しました。AWSデータ分析に必要な6レイヤーを網羅しています。
Vol1 × Vol2 対比表 — バッチ vs リアルタイム
| 観点 | Vol1: バッチ分析基盤 | Vol2: リアルタイム分析基盤 |
|—|—|—|
| 主役サービス | Athena / Glue / Lake Formation / Redshift Serverless | Kinesis / MSK / QuickSight / EMR Serverless |
| データ特性 | 蓄積済みデータ / 静的データセット | 流入中データ / 継続的ストリーム |
| 処理タイミング | スケジュール実行(日次/時次) | リアルタイム / ニアリアルタイム |
| 主なユースケース | 日次集計 / DWH / データカタログ管理 | イベント処理 / リアルタイムBI / 異常検知 |
| コスト最適化 | クエリスキャン量削減(Parquet/パーティション) | Shard/パーティション適正化 / SPICE容量管理 |
| 権限管理 | Lake Formationによる統一ガバナンス | IAM + RLS(QuickSight)+ MSK ACL |
| 障害対応 | クエリ再実行 / Glueジョブ再起動 | Rebalance復旧 / Checkpoint再生 / DLQ確認 |
二部作で網羅したAWS Data Analyticsサービス一覧:
| カテゴリ | サービス | Vol |
|—|—|—|
| クエリ/分析 | Amazon Athena | Vol1 |
| ETL/データ統合 | AWS Glue(ETL / Data Catalog / Crawler) | Vol1 |
| データガバナンス | AWS Lake Formation | Vol1 |
| データウェアハウス | Amazon Redshift Serverless | Vol1 |
| ストリーミング取込 | Amazon Kinesis Data Streams + Firehose | Vol2 |
| メッセージング | Amazon MSK(Serverless + Connect + Schema Registry) | Vol2 |
| BI / 可視化 | Amazon QuickSight(SPICE + Embedding + RLS) | Vol2 |
| 分散処理 | Amazon EMR Serverless(Spark) | Vol2 |
Vol1 + Vol2 = Data Analytics軸完結。AWSデータ分析基盤の全レイヤーをカバー。
二部作統合設計のアーキテクチャパターン:
| アーキテクチャ | 構成 | 適用ユースケース |
|—|—|—|
| Lambda Architecture | バッチ層(Vol1)+スピード層(Vol2)+サービング層(QS) | 精度とリアルタイム性の両立 |
| Kappa Architecture | Vol2のみ(ストリームで全処理) | シンプルなリアルタイム処理 |
| Modern Data Stack | S3中心にVol1+Vol2を疎結合連携 | スケーラブルな分析基盤 |
どのアーキテクチャを選択する場合も、S3データレイクを中心とした「単一の真実の源(Single Source of Truth)」の確立が最重要設計原則です。Vol1のLake Formationによるガバナンスと、Vol2のKinesis/MSKによるリアルタイム取込を組み合わせることで、高品質でアクセス制御された統合データ基盤が実現します。
二部作を通じて身に付く設計スキル:
– データパイプライン全体設計: ソース → 取込 → 変換 → 蓄積 → 分析 → 可視化の全工程を設計できる
– コスト最適化設計: Shard/パーティション・SPICE・Pre-initialized・Retained Storageの適正サイジング
– 可用性・耐障害性設計: DLQ・チェックポイント・Multi-AZ・Auto-scaling の組合せ設計
– 権限管理設計: Lake Formation Fine-grained Access + QuickSight RLS の統合ガバナンス
– 監視・アラート設計: CloudWatchメトリクス・アラーム・ダッシュボードの体系的設計
55記事化達成 — AWS本番運用シリーズ全22軸完結
本記事の公開でAWS本番運用シリーズは55記事に到達しました。Data Analytics二部作の完成をもって全22軸の縦深化を達成しました。
全22軸 — AWS本番運用シリーズ完全ラインナップ:
| # | 軸タイトル | 巻数 | 完結状況 |
|—|—|—|—|
| 1 | EC2 / Auto Scaling 本番運用 | 複数巻 | 完結 |
| 2 | VPC / Networking 本番運用 | 複数巻 | 完結 |
| 3 | IAM / Security 本番運用 | 複数巻 | 完結 |
| 4 | RDS / Aurora 本番運用 | 複数巻 | 完結 |
| 5 | S3 基盤 本番運用 | 複数巻 | 完結 |
| 6 | Storage(EBS/EFS/FSx)本番運用 | Vol1-3 | 完結 |
| 7 | Lambda / Serverless 本番運用 | 複数巻 | 完結 |
| 8 | ECS / Fargate 本番運用 | 複数巻 | 完結 |
| 9 | CloudWatch / Observability 本番運用 | 複数巻 | 完結 |
| 10 | CloudFormation / IaC 本番運用 | 複数巻 | 完結 |
| 11 | DynamoDB 本番運用 | 複数巻 | 完結 |
| 12 | ElastiCache 本番運用 | 複数巻 | 完結 |
| 13 | API Gateway 本番運用 | 複数巻 | 完結 |
| 14 | Step Functions 本番運用 | 複数巻 | 完結 |
| 15 | EventBridge 本番運用 | 複数巻 | 完結 |
| 16 | SNS / SQS 本番運用 | 複数巻 | 完結 |
| 17 | Route 53 / DNS 本番運用 | 複数巻 | 完結 |
| 18 | CloudFront / CDN 本番運用 | 複数巻 | 完結 |
| 19 | EKS / Kubernetes 本番運用 | 複数巻 | 完結 |
| 20 | Network(Transit GW/Direct Connect/VPN)本番運用 | Vol1-3 | 完結 |
| 21 | Backup / DR 本番運用 | 複数巻 | 完結 |
| 22 | Data Analytics 本番運用 | Vol1 + Vol2 | ✓ COMPLETE |
55記事化達成により、AWSクラウドエンジニアが本番運用で直面するあらゆる領域を網羅するシリーズが完成しました。各軸の記事は個別に参照可能ですが、縦深化された複数巻を通して読むことで、入門から本番運用レベルまでの体系的なスキルアップが可能です。
55記事の規模感と学習時間:
| 項目 | 規模 |
|—|—|
| 総記事数 | 55記事 |
| 総軸数 | 22軸 |
| 各軸の平均巻数 | 2.5巻 |
| 1記事あたりの推定読了時間 | 20〜40分 |
| シリーズ全体の推定学習時間 | 22〜37時間 |
本シリーズは「知っている」から「本番で使える」への橋渡しを目的として設計されています。理論的な解説にとどまらず、実際の設定値・CLIコマンド・コードサンプル・詰まりポイントの解決策を体系化しています。AWS認定試験(SAP・DOP・DAS等)の実践的な補強教材としても活用できます。
Data Analytics Vol1 × Vol2 統合アーキテクチャ
graph TB
subgraph Vol1["Vol1: バッチ分析基盤 — 蓄積・整理・分析"]
Athena["Athena\nクエリ層"]
Glue["Glue\nETL + Data Catalog"]
LF["Lake Formation\nガバナンス・権限"]
RS["Redshift Serverless\nDWH"]
end
subgraph Vol2["Vol2: リアルタイム分析基盤 — 流入・処理・可視化"]
KDS["Kinesis Data Streams\nストリーミング取込"]
MSK["Amazon MSK\nメッセージング基盤"]
QS["QuickSight\nSPICE + Embedding + BI"]
EMR["EMR Serverless\nSpark分散処理"]
end
S3["Amazon S3\nData Lake\n統合ストレージ"]
KDS -->|"Firehose\nParquet変換"| S3
MSK -->|"MSK Connect\nS3 Sink"| S3
S3 --> Glue
Glue -->|"ETL変換"| S3
S3 --> Athena
S3 --> RS
S3 --> EMR
EMR -->|"処理結果"| S3
RS --> QS
Athena --> QS
LF -->|"Fine-grained\n権限制御"| Athena
LF -->|"Fine-grained\n権限制御"| Glue
LF -->|"Fine-grained\n権限制御"| RS
S3データレイクを中心に、Vol2(リアルタイム)から流入したデータをVol1(バッチ)で整理・分析し、QuickSightで可視化するフルサイクルアーキテクチャです。Lake FormationがVol1全体の権限管理を統括し、MSK ConnectとFirehoseがリアルタイムデータをS3に取り込む接続点となります。
次のステップ — 関連記事で学びをさらに深める
Data Analytics二部作を完了した皆さんへ、次のステップとして読むべき関連記事を案内します。
Vol1のバッチ分析基盤(Athena×Glue×Lake Formation×Redshift Serverless)をまだお読みでない方は、Vol2と合わせてAWSデータ分析の全体像を把握することを強くお勧めします。Vol1ではデータの蓄積・整理・ガバナンスの観点を、Vol2ではリアルタイム処理・可視化の観点を学べます。2記事を通読することで、S3中心のモダンデータアーキテクチャの設計力が身に付きます。
Step Functionsを活用したデータパイプラインのオーケストレーション、EventBridgeによるイベント駆動アーキテクチャ、SNS/SQSとKinesisの使い分けなど、本シリーズの他の軸と組み合わせることで、より実践的なデータ基盤設計の知識が得られます。
AWS本番運用シリーズの全22軸・55記事は、カテゴリページから一覧できます。自分のスキルギャップに合わせて読む順番を選んでください。各記事は独立して参照可能ですが、縦深化された複数巻を連続して読むことで、体系的なAWSエンジニアリング力の向上が期待できます。S3データレイクを軸としたデータ分析基盤の構築は、現代のクラウドエンジニアに必須のスキルセットです。ぜひVol1とVol2を組み合わせて、本番環境に耐えるリアルタイムデータ分析基盤を設計・構築してください。
Data Analytics本番運用を始める読者へのチェックリスト:
本記事の内容を実際のプロジェクトに適用する際の確認事項を整理しました。
- [ ] Kinesis Shard数を計算式で算出し、On-Demand/Provisioned選定を文書化した
- [ ] MSK Serverless vs Provisioned の選定判断をレイテンシ・スループット・コストで比較した
- [ ] KCL Consumer のDynamoDB管理テーブルを
PAY_PER_REQUESTに設定した - [ ] MSK Consumer の
session.timeout.ms・max.poll.interval.msを処理時間に合わせて設定した - [ ] MSK Connect の全ConnectorにDLQ(Dead Letter Queue)を設定した
- [ ] QuickSight SPICE容量の70%・90%アラートをCloudWatchに設定した
- [ ] EMR Serverless の Pre-initialized Capacity をSLA要件に基づいて設定した
- [ ] Firehose DataTransformation Lambdaの
BufferSizeInMBsを3MB以下に設定した - [ ] Kinesis/MSK/QuickSight/EMR Serverless の CloudWatchダッシュボードを作成した
- [ ] コスト可視化のためのリソースタグ(Project/Team/Environment)を全サービスに設定した
Vol1: バッチ分析基盤 (Athena×Glue×LF×Redshift)
関連: Cost Optimization本番運用 Vol1 | Cost Explorer×Budgets×Savings Plans×Compute Optimizer