- 1 1. リアルタイムストリーム処理の本番課題とManaged Service for Apache Flinkの全体像
- 2 2. Apache Flink基礎 — ステートフル処理・イベント時間・ウィンドウ
- 3 3. Managed Flinkアプリ — ランタイム・並列度・チェックポイント
- 4 4. 開発手法 — Table API/SQL・DataStream API・Studio notebooks
- 5 5. ソース&シンク統合 — Kinesis・MSK・S3・OpenSearch
- 6 6. スケール・運用・コスト
- 7 7. 実戦統合パターン — エンドツーエンド設計とKDA for SQL移行
- 8 8. つまずきポイント・アンチパターン・まとめ
- 9 Flinkの本番運用に関する疑問や設計レビューが必要な場合は、各リンク先の実践ガイドも合わせて参照されたい。
1. リアルタイムストリーム処理の本番課題とManaged Service for Apache Flinkの全体像

リアルタイムでイベントを処理し、集計・結合・異常検知を低レイテンシで行うためには、ステートフルなストリーム処理基盤が不可欠です。ユーザーの行動ログ、IoTセンサーデータ、決済トランザクションを発生と同時に分析する要件は増える一方で、バッチ処理では分単位・時間単位の遅延が避けられません。
リアルタイムストリーム処理が抱える本番課題
現場でリアルタイムストリーム処理を本番稼働させると、4つの壁にぶつかります。
① レイテンシとスループットのトレードオフ
ミリ秒オーダーの応答を要求されながら、毎秒数十万件のイベントを捌かなければならないシステムでは、マイクロバッチには限界があります。ストリーム処理エンジンは入力イベントをそのまま連続して処理し、レコードが届いた瞬間に下流へ流す設計が前提となります。チェックポイント間隔・並列度・ネットワーク帯域のバランスを崩すと、バックプレッシャが連鎖してレイテンシがスパイクします。
② 状態管理の複雑さ
集計・結合・重複排除はいずれも「過去のイベントを記憶した状態」を前提とします。インメモリで状態を保持すると障害時にデータが失われ、外部DBに毎回アクセスするとレイテンシが跳ね上がります。状態をローカルに保ちながら障害から正確に復旧するには、チェックポイントによる定期的な永続化と、それを支える状態バックエンドの設計が必要です。状態が肥大化すると、チェックポイント自体が遅延してスループット全体を圧迫します。
③ スケールアウトの難しさ
入力レートが急増したとき、処理ノードを追加するだけでは足りません。パーティション(シャード/Kafkaパーティション)と処理の並列度を対応させる設計が必要で、並列度が高すぎると状態のシャッフルコストが増大します。自動スケールには監視メトリクスに基づく精緻なポリシー設計が求められます。
④ 障害回復と正確な処理保証
障害からの復旧において「少なくとも1回(at-least-once)」では重複が生じ、「正確に1回(exactly-once)」を達成するにはソース・エンジン・シンクが連携してチェックポイントの整合性を保証する必要があります。自前でFlinkクラスターを構築・運用すると、ZooKeeperやJobManagerの冗長化、状態ストレージのバックアップ、アップグレード作業が運用負担となります。
Amazon Managed Service for Apache Flinkの全体像
Amazon Managed Service for Apache Flinkは、Apache Flinkアプリケーションをサーバーレスかつフルマネージドな環境で実行するAWSサービスです。インフラの調達・設定・パッチ適用・監視をAWSが担い、開発者はFlinkアプリのコードとビジネスロジックに集中できます。
サービスが管理する主な領域は以下の通りです。
- インフラプロビジョニング: KPU(Kinesis Processing Unit)と呼ばれる処理単位を自動で割り当て・回収します。
- チェックポイント管理: 設定した間隔でFlinkの状態を自動的に永続化し、障害発生時には最新のチェックポイントから自動復旧します。
- スケールアウト/イン: CloudWatchメトリクスに基づき、KPUを自動増減するオートスケールをサポートします。
- Flink Dashboard: ジョブの実行状況・バックプレッシャ・チェックポイント統計をウェブUIで確認できます。
- スナップショット(セーブポイント): 手動で任意時点の状態を保存し、アプリのバージョン更新やロールバックに活用できます。
- VPCサポート: アプリをVPC内に配置し、Kinesis/MSK/RDSなどプライベートなリソースへのアクセスを安全に構成できます。
- ログ統合: CloudWatch Logsへのログ出力をサポートし、アプリの起動エラーや実行ログを集中管理できます。
サービスはApache Flink 1.18・1.19・1.20をサポートしており(執筆時点)、Flinkのバージョンは更新が速いため、公開時に最新の対応バージョンをAWS公式ドキュメントで確認することを推奨します。
名称変更の経緯 — Kinesis Data Analyticsからの改称
2023年8月30日、AWSはAmazon Kinesis Data Analytics(KDA)をAmazon Managed Service for Apache Flinkに改称しました。機能・APIの破壊的変更はなく、既存のKDA for Apache Flinkアプリは改称後もそのまま動作します。改称の目的は、サービスの実態(Apache Flinkの管理実行)を名称でより明確に表すことにあります。
ただし、旧名称のAmazon Kinesis Data Analytics for SQL Applicationsは廃止スケジュールが確定しています。
| 廃止マイルストーン | 日付 |
|---|---|
| 新規アプリケーション作成の停止 | 2025年10月15日 |
| 既存アプリケーションの削除 | 2026年1月27日 |
KDA for SQL(SQLベースのストリーム処理)を利用中のユーザーは、Amazon Managed Service for Apache Flink(Flink SQL/Table APIまたはStudio notebooks)への移行が必要です。AWSは本サービスをKDA for SQLの公式移行先と位置づけています。移行の具体的な手順は§7で扱います。
KinesisとManaged Flinkの役割分担
AWS上でリアルタイムデータパイプラインを構築する際、KinesisとManaged Flinkは補完的な役割を担います。混同しやすいため、両者の位置づけを整理します。
| レイヤー | サービス | 役割 |
|---|---|---|
| 取込/配信層 | Kinesis Data Streams / Amazon MSK | イベントの受信・バッファリング・配信 |
| ステートフルリアルタイム処理層 | Managed Service for Apache Flink | 集計・結合・異常検知・ウィンドウ処理 |
| 分析/可視化層 | OpenSearch / QuickSight / S3+Athena | 結果の検索・可視化・バッチ分析 |
Kinesis Data Streamsは秒間数万〜数百万件のイベントを受け付け、シャードでスケールする高スループットな取込バッファです。プロデューサーがレコードを書き込み、コンシューマーが読み取ります。それ自体は「受け取ったレコードをそのまま渡す」だけで、集計・結合・ウィンドウ処理は行いません。
Amazon MSK(Managed Streaming for Apache Kafka)はKafkaプロトコルベースの取込・配信基盤です。Kinesis同様、取込と配信に特化しており、ステートフルな処理はコンシューマー側(Flinkなど)が担います。
Managed Service for Apache FlinkはKinesis Data StreamsやAmazon MSKを入力源(ソース)として受け取り、集計・結合・ウィンドウ・CEPといったステートフルな処理を担い、結果をシンク(S3・OpenSearch・Firehose等)に書き出します。取込は行わず、変換・集計・異常検知という処理層に特化しています。
この分離により、取込バッファを変えずに処理ロジックだけを入れ替えたり、複数のFlinkジョブが同じKinesisストリームを並行してコンシュームしたりといった柔軟なアーキテクチャが実現できます。
また、Kinesis Data StreamsやMSKはFlinkが停止している間もデータを保持し続けます(リテンション期間内)。デプロイ・更新・スケール操作中にデータを失わないこの設計上の独立性が、取込層と処理層を分けるメリットの一つです。
想定読者と既存記事との棲み分け
本記事はAWSでリアルタイムストリーム処理を設計・実装・運用する方を対象としています。Apache Flink自体の入門説明は最小限にとどめ、Managed Flinkサービス固有の設定・運用・コスト設計に集中します。
以下の既存記事と合わせて読むことで、データパイプライン全体を把握できます。
- Kinesis/MSKの取込設定: 「データ分析本番運用 Vol2 Kinesis/MSK/QuickSight/EMR Serverless」を参照してください。本記事ではKinesis・MSKの基本設定は再解説しません。
- データレイク分析(S3/Athena/Glue/Redshift): 「データレイク本番運用 Vol1 Glue/Athena/Redshift」を参照してください。Flinkの出力先としてS3を使う場合の詳細はそちらで扱います。
- OpenSearchへの出力: 「OpenSearch本番運用 Vol1」を参照してください。Flinkのシンクとしての設定はそちらに詳細があります。
主要ユースケース
Managed Service for Apache Flinkが実際に活用される代表的なユースケースを示します。
リアルタイムダッシュボード
Kinesis Data Streamsにデータを集めてFlinkで集計し、OpenSearchまたはDynamoDBに書き出すことで、秒単位で更新されるリアルタイムダッシュボードを実現できます。バッチ集計では達成できないミリ秒〜秒オーダーの鮮度が求められる場面に適しています。
不正検知・異常検知
決済トランザクションや認証ログをリアルタイムにFlinkで分析し、CEP(複合イベント処理)や時間ウィンドウ内のパターンマッチングで不審な動作を検出します。バッチ処理では見逃す数秒〜数分以内のイベントパターンをリアルタイムに捕捉できます。
IoTテレメトリ集計
工場やデバイスから送られるセンサーデータをMSK/Kinesisを通じてFlinkで受け取り、タンブリングウィンドウで一定期間の平均・最大・最小を算出してS3またはOpenSearchに格納します。イベント時間ベースの集計により、ネットワーク遅延によるデータ到着のばらつきを正確に扱えます。
ストリーミングETL
ソースシステムからのデータをFlinkで変換・エンリッチ・フィルタリングし、データレイク(S3)やDWH(Redshift)に継続的に送り込むストリーミングETLパイプラインを構築できます。バッチETLのように数時間ごとに実行するのではなく、データが発生した直後に変換処理を完了させられます。
- Apache Flink基礎 — ステートフル処理/イベント時間/ウォーターマーク/ウィンドウ(§2)
- Managed Flinkアプリ — ランタイム/並列度/チェックポイント/スナップショット(§3)
- 開発手法 — Table API/SQL・DataStream API(Java/Scala/Python)・Studio notebooks(§4)
- ソース&シンク統合 — Kinesis/MSK/S3/OpenSearch/Firehose(§5)
- スケール・運用・コスト(KPU)とKinesis Data Analytics for SQL移行(§6・§7)
- Kinesis Data Streams/MSK(Kafka) — ストリームの取込/配信(既存データ分析記事)
- Athena/Glue/Redshift — バッチ/クエリ/ETL(既存データ分析・データレイク記事)
- 本記事=Apache Flinkによるステートフルなリアルタイムストリーム処理(イベント時間/ウィンドウ/結合/CEP)
- ★2023年にKinesis Data Analyticsから改称。KDA for SQL(廃止予定)の公式移行先
ストリーム取込(Kinesis/MSK)はこちら(データ分析本番運用 Vol2)
2. Apache Flink基礎 — ステートフル処理・イベント時間・ウィンドウ

バッチ処理とストリーム処理の違い
データ処理には大きく2つのパラダイムがあります。バッチ処理では蓄積されたデータをまとめて一括処理します。時間的な境界が明確で実装はシンプルですが、処理が完了するまで結果を得られないため、分析から行動までのレイテンシが分〜時間単位になります。
ストリーム処理は、イベントが発生するたびに逐次処理するパラダイムです。Apache Flinkは無限に流れ続けるデータストリームを低レイテンシで処理するために設計されたストリーム処理エンジンです。イベントをミリ秒〜秒単位で処理し、異常検知・リアルタイムダッシュボード・ストリーミングETLのようなユースケースを実現します。
Flinkは「無限データストリームに対するステートフル処理」を中核概念として設計されており、バッチ処理もストリームの有限版として統一的に扱います。これにより同じAPI・ランタイムでバッチとストリームの両方を実装できます。
ステートフル処理とステートレス処理
ステートレス処理では各イベントを独立して処理します。フィルタリングや変換といった単純な操作はステートレスで実現できますが、集計・結合・重複排除には過去イベントの情報が必要です。
ステートフル処理では、Flinkが状態(State)を保持します。状態はRocksDB(デフォルト)またはJVMヒープ上に保持され、チェックポイントによって定期的に永続化されます。状態には2つの種類があります。
キー付き状態(Keyed State): キーでパーティションされたストリームにおいて、各キーに独立した状態を持ちます。ユーザーIDをキーにした場合、ユーザーごとのセッション状態や累計値を個別に管理できます。利用できる状態型には ValueState・ListState・MapState・ReducingState・AggregatingState があります。
オペレータ状態(Operator State): オペレータ(処理ノード)単位で保持する状態です。Kafkaコンシューマのオフセット管理などに使われます。並列度を変更した場合でも状態が適切に再配置されるため、スケールアウト時の状態管理を意識せずに実装できます。
状態の永続化はチェックポイントで行われ、障害発生時は最後のチェックポイントから処理を再開します。AWS環境ではチェックポイントのストレージにAmazon S3を使うことが一般的です(§3で詳述)。
イベント時間・処理時間・取込時間
Flinkには3種類の時刻モデルがあります。どの時刻基準を使うかは、集計の正確性とレイテンシのトレードオフを左右する重要な設計判断です。
イベント時間(Event Time): イベントが実際に発生した時刻です。ログのタイムスタンプやセンサーの計測時刻など、データに埋め込まれた時刻を使います。ネットワーク遅延やシステム障害でFlinkへの到達が遅れた場合でも、イベント時間基準で正確な集計が可能です。本番システムで最も推奨される時刻モデルです。
処理時間(Processing Time): Flinkがイベントを処理した時刻(システムクロック)です。実装が最もシンプルでレイテンシも最小ですが、遅延イベントが来た場合に集計結果が変わり得るため、正確性が求められるユースケースには適しません。
取込時間(Ingestion Time): イベントがFlinkソースに取り込まれた時刻です。イベント時間と処理時間の中間的な特性を持ちますが、現在はイベント時間が主流のため、取込時間の利用シーンは限られています。
本番環境でリアルタイムダッシュボードや不正検知を実装する場合は、イベント時間を使うことを推奨します。処理時間を使うのは、プロトタイピングや遅延許容度が高いユースケースに限定するのが適切です。
ウォーターマーク — 遅延データの許容と正確性
ストリーム処理でイベント時間を使う場合、遅延イベント(Late Events)への対処が必要です。ウォーターマーク(Watermark)は「この時刻以前のイベントはすべて到着済み」という宣言で、Flinkはウォーターマークに基づいてウィンドウの締め切りタイミングを判断します。
ウォーターマークの動作: たとえば現在の最大イベント時間に対して5秒の許容遅延を設定した場合、その5秒前の時刻をウォーターマークとして発行します。ウォーターマークを超えて到着したイベントは「遅延データ」として扱われます。Flinkはウォーターマークが特定の時刻を超えた瞬間に、その時刻に対応するウィンドウを閉じて集計結果を出力します。
正確性とレイテンシのトレードオフ: ウォーターマークの遅延許容を大きく設定すると多くの遅延イベントを取り込めるため集計の正確性は上がりますが、ウィンドウの出力が遅れレイテンシも増加します。逆に遅延を小さくするとレイテンシは下がりますが、遅延イベントの欠損リスクが高まります。
固定遅延ウォーターマーク(BoundedOutOfOrderness): 最もよく使われるウォーターマーク戦略です。最大遅延量を固定値で指定します。
// 最大5秒の遅延を許容するウォーターマーク設定例
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTimestamp());
stream.assignTimestampsAndWatermarks(strategy);
遅延データの処理: ウォーターマークを超えて届いた遅延データは、デフォルトでは無視されます。allowedLatenessを設定することで、ウィンドウが閉じた後も一定期間ウィンドウを再更新できます。さらに遅れたイベントはサイドアウトプット(Side Output)に振り分け、別処理で取り込むことも可能です。遅延データの破棄数はCloudWatchのnumLateRecordsDroppedメトリクスで確認できます。
ウィンドウ処理 — タンブリング・スライディング・セッション・グローバル
ストリームのイベントを有限の集合にまとめて集計するのがウィンドウ処理です。Flinkは4種類のウィンドウタイプを提供します。
タンブリングウィンドウ(Tumbling Window): 重複のない固定サイズウィンドウです。5分間隔のタンブリングウィンドウは、0-5分・5-10分・10-15分のように重複なく区切ります。各イベントは必ず1つのウィンドウに属します。分単位の集計レポートや定期バッチ処理の置き換えに適しています。
スライディングウィンドウ(Sliding Window): スライド間隔でずれていく固定サイズウィンドウです。ウィンドウサイズ10分・スライド1分なら、0-10分・1-11分・2-12分のように毎分更新される10分集計を得られます。各イベントは複数のウィンドウに属するため、処理負荷はタンブリングより高くなります。移動平均や直近N分の集計に適しています。
セッションウィンドウ(Session Window): イベントの発生間隔(ギャップ)に基づいてウィンドウを区切ります。ギャップ30分なら、最後のイベントから30分以上間が空いた時点でセッションを閉じます。ユーザーアクティビティのセッション分析や、センサーデータの活動区間抽出に適しています。サイズが可変で事前に決められない点が特徴です。
グローバルウィンドウ(Global Window): すべてのイベントを1つのウィンドウにまとめます。カスタムトリガーを組み合わせて独自の区切り条件を実装する際に使います。件数ベースのウィンドウ(CountWindow)はグローバルウィンドウの応用です。
// タンブリングウィンドウ(5分ごとのカウント)の例
stream
.keyBy(event -> event.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("count");
// スライディングウィンドウ(10分幅・1分スライド)の例
stream
.keyBy(event -> event.getUserId())
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
.aggregate(new AverageAggregator());
Exactly-once処理保証 — チェックポイントとシンク冪等性
ストリーム処理では「メッセージを必ず1回だけ処理する」Exactly-once保証が重要です。Flinkはチェックポイントとシンクの冪等性を組み合わせてExactly-onceを実現します。
チェックポイント: Flinkは定期的にすべてのオペレータの状態を永続ストレージ(Amazon S3)にスナップショットとして保存します。障害発生時は最後のチェックポイントから処理を再開します。チェックポイントはChandy-Lamportアルゴリズムに基づき、処理を止めずに分散スナップショットを取得します。
at-least-once と Exactly-once: at-least-onceでは再処理時に同じイベントが複数回シンクに届く場合があります。Exactly-onceではシンクへの書き込みをトランザクションとして管理し、チェックポイントの完了と連動させます。ソース側(Kinesisなど)では読み取りオフセットを状態として保持し、再処理時に同じレコードから読み直します。
2フェーズコミット(2PC): FlinkのExactly-onceシンクは2フェーズコミットを使います。チェックポイント開始時点でシンクへプレコミットし、チェックポイント完了後に本コミットします。Amazon Kinesis Data StreamsとApache Kafkaシンクはこの方式に対応しています。2PCに対応していないシンクでは、Exactly-onceを完全に保証できない点に注意が必要です。
シンクの冪等性: 2PCが使えないシンク(S3など)では、冪等な書き込み(同じキーへの書き込みは上書き)でExactly-onceに近い動作を実現します。JDBCシンクはUPSERT(INSERT … ON CONFLICT DO UPDATE)による冪等書き込みに対応しています。S3シンクはパーティション単位でファイルを最終的に確定する方式を採用しています。
CEP — 複合イベント処理
CEP(Complex Event Processing)は複数のイベントのパターンを検知する技術です。FlinkのPattern APIを使うと、イベントの順序・時間制約・繰り返しを組み合わせた複雑なパターンを宣言的に記述できます。
代表的なユースケースは不正検知(短時間に複数の大額取引)、IoTアラート(センサー値が閾値を連続して超過)、セキュリティ監視(ログインエラーの連続)などです。
// 3分以内に3回以上ログイン失敗のパターン検知例
Pattern<LoginEvent, ?> pattern = Pattern.<LoginEvent>begin("failedLogin")
.where(SimpleCondition.of(e -> e.getType().equals("LOGIN_FAIL")))
.timesOrMore(3)
.within(Time.minutes(3));
PatternStream<LoginEvent> patternStream = CEP.pattern(keyedStream, pattern);
patternStream.process(new AlertProcessFunction());
パターンには begin / next(連続) / followedBy(非連続) / notNext / notFollowedBy などのシーケンス演算子があります。times / timesOrMore / optional といった量化子で繰り返しを表現し、within 条件で時間制約を設定します。複雑なパターンも宣言的に記述できるため、条件ロジックをコードへ分散させず管理できます。
状態操作のベストプラクティス
状態を扱うFlinkアプリを設計する際に知っておくべきベストプラクティスをまとめます。
状態のスコープを最小化する: 必要なフィールドのみを状態に保存します。大きなオブジェクトを丸ごと状態に入れるのではなく、後続処理に必要な情報のみを取り出して保存します。状態サイズを小さく保つことで、チェックポイントの所要時間とS3転送量を削減できます。
状態スキーマの互換性を維持する: 状態に保存するクラスのフィールドを削除・型変更すると、既存のチェックポイントからの復元に失敗します。後方互換性のある変更(フィールドの追加)か、スナップショット経由のアップグレードを計画します。
ウィンドウトリガーとlateness: Flinkのウィンドウはデフォルトのイベント時間トリガーに加え、CountTrigger(件数基準)などカスタムトリガーを設定できます。allowedLatenessとsideOutputTagを組み合わせると、遅延データを本流から分離して後処理できます。
集計関数の選択: 単純な集計(sum/count/avg)にはAggregateFunctionまたはReduceFunctionを使います。これらは状態として中間集計値のみを保持するため効率的です。一方、全入力イベントのリスト保持が必要な場合のみListStateを使います。状態サイズが増大しやすいListStateは、TTLと組み合わせて使用することを推奨します。
マネージド状態 vs Table API: DataStream APIではgetRuntimeContext().getState()で状態を明示的に宣言します。Table API/SQLではFlinkが内部的に状態を管理するため、開発者が状態を直接操作する必要はありません。複雑な状態操作が必要な場合はDataStream API、シンプルな集計・結合はTable API/SQLを選択します。
BackpressureとFlinkダッシュボード: Managed Flinkには組み込みのFlinkダッシュボードがあり、各オペレータのスループット・レイテンシ・バックプレッシャの状態を可視化できます。バックプレッシャが発生しているオペレータはダッシュボード上で赤くハイライトされます。CloudWatchメトリクス(backPressuredTimeMsPerSecond)と組み合わせて性能ボトルネックを特定します。
まとめ: Flink基礎を理解した上でManaged Flinkを使う
本節で解説したApache Flinkの基礎概念は、Managed Flinkアプリを設計・実装する際の土台となります。実装を始める前に以下の点を確認することを推奨します。
- どの時刻モデルを使うか(イベント時間 vs 処理時間)
- ウォーターマークの許容遅延をどこに設定するか
- どのウィンドウタイプが要件に合うか
- 状態はどこに何を保持するか、TTLは必要か
- Exactly-onceが必要か、シンクは冪等か
- ステートフル処理 — 過去イベントの状態を保持し集計/結合/重複排除を実現
- イベント時間+ウォーターマーク — イベント発生時刻基準の正確な集計と遅延データ処理
- ウィンドウ — タンブリング(固定)/スライディング(重複)/セッション(活動間隔)
- Exactly-once処理保証・CEP(複合イベント処理)
3. Managed Flinkアプリ — ランタイム・並列度・チェックポイント

Amazon Managed Service for Apache Flinkでアプリを本番稼働させるには、ランタイムの選定からチェックポイント・スナップショットの設計、状態バックエンドの最適化まで、複数の運用要素を体系的に組み合わせることが必要です。本節では、アプリ作成から日常的なライフサイクル管理まで、Managed Flink固有の構成要素を解説します。
3.1 アプリ作成フロー
Managed Flinkアプリは、マネジメントコンソール・AWS CLI・CloudFormation・Terraformから作成できます。アプリのコードはJARファイル(Java/Scala)・Pythonスクリプト(PyFlink)・SQLアプリのいずれかでS3バケットにアップロードし、アプリ設定でそのS3パスを参照します。
マネジメントコンソールからの作成手順
マネジメントコンソールで「Amazon Managed Service for Apache Flink」を開き、「アプリケーションを作成」を選択します。アプリ名・説明・ランタイム(Flinkバージョン)を指定した後、コードの場所としてS3バケットとオブジェクトキーを入力します。実行ロールにはKinesis・S3・MSKなど必要なサービスへのアクセス権を持つIAMロールを指定します。
AWS CLIからの作成
aws kinesisanalyticsv2 create-application \
--application-name my-flink-app \
--runtime-environment FLINK-1_20 \
--service-execution-role arn:aws:iam::123456789012:role/ManagedFlinkRole \
--application-configuration '{
"FlinkApplicationConfiguration": {
"CheckpointConfiguration": {
"ConfigurationType": "DEFAULT"
},
"ParallelismConfiguration": {
"ConfigurationType": "CUSTOM",
"Parallelism": 4,
"ParallelismPerKPU": 1,
"AutoScalingEnabled": true
}
},
"ApplicationCodeConfiguration": {
"CodeContent": {
"S3ContentLocation": {
"BucketARN": "arn:aws:s3:::my-app-bucket",
"FileKey": "flink-app-1.0.jar"
}
},
"CodeContentType": "ZIPFILE"
}
}'
Pythonアプリ(PyFlink)はZIPファイル形式でパッケージし、CodeContentTypeをZIPFILEに設定します。依存ライブラリはZIP内に同梱するか、カスタムコネクタとしてS3に配置します。SQLアプリはStudio notebooksを使ってFlink SQL形式で作成します(詳細は§4参照)。
3.2 ランタイム設定とKPUの関係
Managed Flinkが対応するFlinkバージョンは1.18・1.19・1.20です(執筆時点)。新機能やコネクタのアップデートは最新バージョンに追加されるため、新規アプリは最新安定版を選択することを推奨します。バージョンはアプリ更新時に変更可能ですが、コードの互換性確認が必要です。
KPU(Kinesis Processing Unit)の概念
KPUはManaged Flinkにおけるリソース管理と課金の基本単位です。1 KPUは1 vCPUと4 GBのメモリに相当し、秒単位で課金されます。アプリの並列度(parallelism)とKPU数は以下の関係を持ちます。
Parallelism: アプリ全体で同時実行するタスク数ParallelismPerKPU: 1 KPUあたりに割り当てるタスク数(デフォルト1)- 必要KPU数 ≒ Parallelism ÷ ParallelismPerKPU
例えば、Parallelism=4・ParallelismPerKPU=1の設定では最低4 KPUが必要です。ParallelismPerKPU=2に変更すると2 KPUで4並列を実現できますが、1 KPUあたりのメモリ4 GBを2タスクで共有するため、状態サイズが大きいアプリではOOMエラーのリスクがあります。スループット重視の本番アプリはParallelismPerKPU=1を基本とし、コスト最適化を検討する場合はメモリ使用量を測定した上で調整します。
オートスケール
AutoScalingEnabled: trueを設定するとCloudWatchメトリクスに基づいてKPUが自動的に増減します。スケールアップはCPU使用率などの負荷増加で発生し、スケールダウンは負荷の低下で発生します。スケール時にはチェックポイントが自動実行されるため、チェックポイント間隔を適切に設定することが重要です。
KinesisシャードやMSKパーティション数とparallelismが不整合な場合、一部のタスクが処理を担わないアイドル状態になります。Kinesis/MSKのシャード数に合わせてparallelismを設定することでリソースを最大限に活用できます。
3.3 チェックポイント — 自動状態永続化と障害復旧
チェックポイントはManaged Flinkが自動管理する状態の定期スナップショットです。アプリが障害などで予期せず停止した場合、最後に成功したチェックポイントから処理を自動的に再開します。
チェックポイントの仕組み
Managed Flinkはバリアベースのチェックポイント(Chandy-Lamportアルゴリズムベース)を実装しています。ストリームにバリアを挿入し、すべてのオペレーターがバリアを通過した時点の状態をS3に書き込みます。この仕組みにより、ソース(Kinesis/MSK)とシンク(S3/OpenSearch等)が冪等操作をサポートする場合に正確に1回(exactly-once)の処理保証を実現します。
チェックポイント設定パラメーター
| パラメーター | デフォルト | 説明 |
|---|---|---|
| チェックポイント間隔 | 60,000 ms | 連続するチェックポイントの開始間隔 |
| チェックポイントタイムアウト | 60,000 ms | チェックポイント完了の最大待機時間 |
| 最小一時停止間隔 | 5,000 ms | チェックポイント完了から次開始までの最小間隔 |
| 同時チェックポイント数 | 1 | 並列実行できるチェックポイントの最大数 |
チェックポイント間隔を短くしすぎると、バリア挿入とS3書込みのオーバーヘッドでスループットが低下します。間隔を長くしすぎると、障害時の再処理データ量が増加し、回復レイテンシも拡大します。状態サイズとスループット要件に応じて60秒〜5分の範囲で調整することを推奨します。
ConfigurationType
DEFAULT: AWSが推奨するデフォルト設定(間隔60秒)を使用CUSTOM: チェックポイント間隔・タイムアウト・最小一時停止間隔を個別に指定
3.4 スナップショット — 手動セーブポイントとアップグレード戦略
スナップショットはチェックポイントとは異なり、ユーザーが任意のタイミングで手動作成する状態保存です。Apache FlinkのセーブポイントIに相当し、アプリのアップグレードやロールバック時に状態を安全に引き継ぐために使用します。
スナップショットの作成
aws kinesisanalyticsv2 create-application-snapshot \
--application-name my-flink-app \
--snapshot-name before-upgrade-v2
スナップショットはアプリ更新の直前に必ず作成することを推奨します。作成には数秒〜数分を要し、実行中アプリへの影響は最小限です。
スナップショットからのアップグレード手順
- 現行バージョンのスナップショットを作成
- 新バージョンのJAR/PythonファイルをS3にアップロード
- アプリを停止(
stop-application) - アプリコード設定を新バージョンのS3パスに更新
RESTORE_FROM_CUSTOM_SNAPSHOTモードで起動
aws kinesisanalyticsv2 start-application \
--application-name my-flink-app \
--run-configuration '{
"ApplicationRestoreConfiguration": {
"ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
"SnapshotName": "before-upgrade-v2"
}
}'
この手順により、新バージョンが旧バージョンの処理中状態(集計値・オフセット等)を引き継いで処理を再開します。状態スキーマ(フィールド名・型)を変更するとスナップショットからの復元に失敗するため、互換性のないスキーマ変更を伴うアップグレードでは別途データ移行手順を検討します。
ロールバック手順
アップグレード後に問題が発生した場合、旧バージョンのJARと旧スナップショット名を指定して起動し直すことでロールバックできます。スナップショットはアプリ停止後も保持されるため、本番切替前のスナップショットを少なくとも1週間は保管します。
3.5 状態バックエンドの選定
状態バックエンドは、Flinkアプリが保持するキー付き状態(集計値・結合テーブル・重複排除セット等)の保存方式を決定します。Managed Flinkでは以下の2種類をサポートしています。
RocksDB状態バックエンド(本番推奨)
状態をRocksDB(組み込みキーバリューストア)に保存します。大規模な状態を効率的に扱い、状態がJVMメモリを超えるとディスクにスピルするため、数十GB〜数百GBの状態を持つアプリでも安定動作します。チェックポイント時はRocksDBのインクリメンタルスナップショットをS3に書き込みます。新規アプリのデフォルトはRocksDBです。
メモリ状態バックエンド(HashMapStateBackend)
状態をJVMヒープメモリ上に保持します。小規模な状態(数十MB以下)に限定する場合は低レイテンシを実現できますが、状態がメモリを超えるとOOMエラーが発生します。本番環境では基本的にRocksDBを選択し、HashMapは開発・テスト用途に限定します。
| 項目 | RocksDB | メモリ(HashMap) |
|---|---|---|
| 状態サイズ上限 | KPUのローカルディスク容量 | JVMヒープ内 |
| 読み書きレイテンシ | 中(ディスクI/O) | 低(メモリ直接) |
| チェックポイント速度 | インクリメンタルで効率的 | フル書込み |
| 推奨ケース | 大規模・本番 | 小規模・開発 |
3.6 ライフサイクル管理
起動と停止
アプリの起動はstart-applicationコマンドで行います。起動時の動作モードは以下から選択します。
SKIP_RESTORE_FROM_SNAPSHOT: 状態をリセットして最初から処理(初回起動・テスト時)RESTORE_FROM_LATEST_SNAPSHOT: 最新のチェックポイントから再開(通常の再起動)RESTORE_FROM_CUSTOM_SNAPSHOT: 指定スナップショットから再開(アップグレード時)
停止はstop-applicationで行います。Force=false(デフォルト)の場合、実行中のチェックポイントが完了してから停止するためデータ損失を防止できます。
aws kinesisanalyticsv2 stop-application \
--application-name my-flink-app \
--force false
コード・設定の更新
アプリコードを更新する場合は、新しいJARをS3にアップロードしupdate-applicationでS3パスを変更します。チェックポイント間隔・並列度・オートスケールなどの設定変更も同じコマンドで行います。更新はアプリを停止した状態で適用し、再起動時にスナップショットから状態を復元します。
Flinkバージョンのアップグレード
FlinkバージョンアップはAWSのマネジメントコンソールまたはCLIで実施します。上記スナップショット手順と組み合わせ、旧バージョンの状態を新バージョンに引き継ぎます。使用するコネクタのバージョン(flink-connector-kinesis等)もFlinkバージョンと合わせて更新し、バージョン間の互換性はAWSの公式リリースノートで確認します。
- ランタイム — Flink 1.18/1.19/1.20。並列度(parallelism)とKPU(1vCPU+4GB)の関係を設計
- チェックポイント — 実行中状態を定期永続化し障害から自動復旧
- スナップショット(セーブポイント) — 手動の状態保存→アプリ更新/ロールバック時に復元
- 状態バックエンド(RocksDB)・アプリライフサイクル(起動/更新/停止)・実行IAMロール
4. 開発手法 — Table API/SQL・DataStream API・Studio notebooks
Managed Service for Apache Flinkでアプリケーションを開発する手法は大きく3種類あります。宣言的なSQL/Table APIによる記述、低レベルな柔軟性を持つDataStream API、そしてApache Zeppelinベースのインタラクティブ開発環境であるStudio notebooksです。ユースケース・チームのスキルセット・要求される柔軟性によって最適な手法を選択することが重要です。
Table API / Flink SQL
Table APIは、ストリームをリレーショナルテーブルとして扱い、SQL文または宣言的なAPIで処理を記述する方式です。SQLに近い構文を使えるため、データエンジニアや分析担当者がすぐに習得できます。集計・結合・フィルタリングといった典型的な分析ユースケースに最も適しています。
Flink SQLの基本的なワークフローは、①ソーステーブルをDDLで定義、②シンクテーブルをDDLで定義、③INSERT INTO文でデータフローを記述、という3ステップです。
ソーステーブルの定義(Kinesis Data Streamsコネクタ)
CREATE TABLE kinesis_orders (
order_id STRING,
user_id STRING,
amountDOUBLE,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kinesis',
'stream' = 'orders-stream',
'aws.region' = 'ap-northeast-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
WATERMARK FOR order_timeの宣言により、イベント時間ベースのウィンドウ処理が可能になります。INTERVAL '5' SECONDは5秒の遅延許容量を意味し、遅延5秒以内のレコードを正確に集計対象に含めます。
シンクテーブルの定義(S3コネクタ)
CREATE TABLE s3_order_summary (
window_start TIMESTAMP(3),
window_endTIMESTAMP(3),
total_amount DOUBLE,
order_count BIGINT
) WITH (
'connector' = 'filesystem',
'path'= 's3://my-bucket/order-summary/',
'format' = 'json'
);
ウィンドウ集計クエリ
INSERT INTO s3_order_summary
SELECT
TUMBLE_START(order_time, INTERVAL '1' MINUTE) AS window_start,
TUMBLE_END(order_time, INTERVAL '1' MINUTE)AS window_end,
SUM(amount)AS total_amount,
COUNT(*)AS order_count
FROM kinesis_orders
GROUP BY TUMBLE(order_time, INTERVAL '1' MINUTE);
このクエリは1分間のタンブリングウィンドウで注文金額を集計します。TUMBLE関数はFlink SQL固有のウィンドウ関数であり、固定間隔で重複なくイベントを振り分けます。
MSK(Kafka)コネクタのDDL例
CREATE TABLE kafka_events (
event_type STRING,
payload STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic'= 'application-events',
'properties.bootstrap.servers' = 'broker1:9092,broker2:9092',
'properties.group.id' = 'flink-consumer-group',
'scan.startup.mode'= 'latest-offset',
'format' = 'json'
);
MSKコネクタはApache Kafkaコネクタと同一です。properties.*プレフィックスでKafkaクライアントプロパティをそのまま渡せます。
KDA for SQLからの移行先としてのFlink SQL
Kinesis Data Analytics for SQL Applicationsは2025年10月15日に新規作成が停止し、2026年1月27日に削除されました。既存のKDA SQLアプリケーションの公式な移行先がManaged FlinkのFlink SQLです。移行の流れは次の通りです。
- 既存のKDA SQLアプリケーションのSQL文を棚卸しします
- Studio notebookでFlink SQL構文に書き換えます
- notebookのインタラクティブ実行で動作を確認します
- 検証済みのSQLをManaged Flinkアプリケーションとしてデプロイします
KDA SQLはCREATE OR REPLACE STREAMとCREATE OR REPLACE PUMPという独自の記法を使いますが、Flink SQLは標準的なCREATE TABLE/INSERT INTO形式です。SQL文の書き換えが必要になりますが、処理ロジック自体は概ね移植できます。
DataStream API
DataStream APIは、ストリーム処理を低レベルで記述できる最も柔軟な開発インターフェースです。カスタムロジックの実装・機械学習モデルとの統合・複雑な状態管理が必要なシナリオに適しています。Java・Scala・Python(PyFlink)で記述できます。
言語選定基準
| 言語 | 推奨シナリオ | 備考 |
|---|---|---|
| Java | 高パフォーマンス・本番用途・エコシステム活用 | Flinkの主要言語。ライブラリが充実しています |
| Scala | 関数型スタイル・既存Scalaコードベースとの統合 | Flink 1.12以降はJARモジュールとして提供されます |
| Python(PyFlink) | データサイエンスチーム・ML推論パイプライン統合 | Java/Scalaより若干オーバーヘッドがあります |
一般的な本番用途ではJavaが推奨されます。PyFlinkはPython開発者が多いチームやML推論との統合に適しています。
DataStream処理の基本パターン(Java)
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// ソース: Kinesis Data Streamsから読み込み
Properties kinesisProps = new Properties();
kinesisProps.put(AWSConfigConstants.AWS_REGION, "ap-northeast-1");
kinesisProps.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
DataStream<String> rawStream = env.addSource(
new FlinkKinesisConsumer<>(
"orders-stream", new SimpleStringSchema(), kinesisProps
)
);
// トランスフォーム: JSONパース → フィルタ → keyBy → ウィンドウ集計
DataStream<OrderSummary> result = rawStream
.map(json -> parseOrder(json))
.filter(order -> order.getAmount() > 0)
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderAggregator());
// シンク: S3へ書き込み
result.sinkTo(FileSink
.<OrderSummary>forRowFormat(
new Path("s3://my-bucket/output/"),
new SimpleStringEncoder<>()
)
.build()
);
env.execute("Order Aggregation Job");
主要オペレータ
keyByはストリームをキーでパーティショニングし、同一キーのレコードを同一サブタスクに集中させます。ステートフルな集計処理では必ず使用します。
windowはタンブリング・スライディング・セッションウィンドウを定義します。イベント時間ウィンドウを使う場合は、ソースにウォーターマーク設定が必要です。
processはProcessFunctionを使う低レベルAPIで、状態管理・タイマー・ウィンドウの詳細制御が可能です。複雑なCEP(複合イベント処理)や遅延トリガーのロジックを実装する際に使います。
PyFlinkコード例
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer
env = StreamExecutionEnvironment.get_execution_environment()
kinesis_props = {
'aws.region': 'ap-northeast-1',
'flink.stream.initpos': 'LATEST'
}
ds = env.add_source(
FlinkKinesisConsumer(
'orders-stream', SimpleStringSchema(), kinesis_props
)
)
ds.map(lambda record: record.upper()).print()
env.execute('PyFlink Demo Job')
PyFlinkはPythonの簡潔な構文で記述でき、NumPy/pandas/scikit-learnといったPythonエコシステムとの統合が容易です。
Studio Notebooks
Studio notebooksはApache Zeppelinを基盤としたインタラクティブ開発環境で、SQL・Python・Scalaをリアルタイムで実行できます。主に次の用途に使います。
- 開発・デバッグ: コードを書いてすぐ実行結果を確認しながら処理ロジックを構築します
- 探索的データ分析: ストリームデータをインタラクティブに集計・可視化します
- KDA for SQL移行の試行: 既存SQLをFlink SQL構文に書き換えてnotebook上で動作確認します
- プロトタイピング: 本番デプロイ前のアイデア検証に使います
Studio notebookはManaged Flinkアプリケーションとは独立して管理されます。notebookはインタラクティブな開発ツールであり、24時間365日稼働させる本番処理には適していません。
Studioでの対話実行例(Flink SQL)
notebookのSQLパラグラフに次のように記述して実行します。
%flink.ssql(type=update)
SELECT
window_start,
window_end,
user_id,
SUM(amount) AS total
FROM TABLE(
TUMBLE(TABLE kinesis_orders, DESCRIPTOR(order_time), INTERVAL '30' SECOND)
)
GROUP BY window_start, window_end, user_id;
%flink.ssqlはZeppelinのFlinkインタープリタを指定するプレフィックスです。type=updateにより、実行結果がリアルタイムで更新表示されます。
Studioから本番アプリケーションへのデプロイ
検証済みのnotebookを本番Managed Flinkアプリケーションとして書き出す手順は次の通りです。
- Studio notebookの「Build」機能でコードをJARファイルにコンパイルします
- 出力JARをS3に配置します
- S3上のJARをコードソースとするManaged Flinkアプリケーションを作成します
- アプリケーションの並列度・KPU・チェックポイント設定を本番用に調整してから起動します
notebookからJARへの書き出しにより、インタラクティブ開発から本番デプロイへの流れをシームレスに実現できます。
開発環境とデバッグ
ローカル開発
Managed Flinkへデプロイする前に、ローカル環境でFlinkアプリケーションの動作確認を行います。FlinkのローカルモードではStreamExecutionEnvironment.getExecutionEnvironment()が自動的にローカルエグゼキューターを使用するため、特別な設定は不要です。
Dockerを使う場合は公式のFlink Dockerイメージが利用できます。
# Flink JobManagerを起動
docker run -d --name flink-jm \
-p 8081:8081 \
flink:1.19 jobmanager
# Flink TaskManagerを起動
docker run -d --name flink-tm \
--link flink-jm:jobmanager \
flink:1.19 taskmanager
Flink UIはhttp://localhost:8081でアクセスでき、ジョブの実行状況・バックプレッシャ・チェックポイント状態を確認できます。
ログ確認(CloudWatch Logs / Flink UI)
Managed Flink上で実行中のアプリケーションのログはCloudWatch Logsに出力されます。ロググループのパスは/aws/managed-flink/jobs/<アプリケーション名>形式です。
Flink UIはアプリケーションのダッシュボードページからアクセスでき、次の情報を確認できます。
- 実行中のジョブグラフ(オペレータ間のデータフローを有向グラフで表示)
- 各オペレータのレコード処理数とバックプレッシャ率
- チェックポイントの成否・所要時間・サイズ
- タスクマネージャのJVMメトリクス
よく使う管理コマンド(AWS CLI)
# アプリケーション一覧
aws kinesisanalyticsv2 list-applications \
--region ap-northeast-1
# アプリケーション詳細確認
aws kinesisanalyticsv2 describe-application \
--application-name my-flink-app \
--region ap-northeast-1
# アプリケーション起動
aws kinesisanalyticsv2 start-application \
--application-name my-flink-app \
--region ap-northeast-1
# スナップショット作成(アプリ更新前の状態保存)
aws kinesisanalyticsv2 create-application-snapshot \
--application-name my-flink-app \
--snapshot-name pre-update-v2 \
--region ap-northeast-1
API選定マトリクス
どの開発手法を選択するかは、ユースケースとチームのスキルによって決まります。次の表を参考に判断してください。
| 観点 | Table API/SQL | DataStream API | Studio notebooks |
|---|---|---|---|
| 学習コスト | 低(SQL知識があればすぐ使えます) | 高(Java/Scalaの習熟が必要です) | 低〜中 |
| 柔軟性 | 中(標準的なSQL操作の範囲内) | 高(カスタムロジックを自由に実装) | 中 |
| デバッグのしやすさ | 中 | 中〜低(コンパイル→デプロイのサイクル) | 高(即時フィードバック) |
| 本番向き | ○(ETL・集計・KDA移行) | ◎(複雑処理・高パフォーマンス) | ✗(開発専用) |
| 主なユースケース | リアルタイム集計・ストリーミングETL | ML統合・CEP・複雑なステートフル処理 | 開発・プロトタイプ・探索 |
| KDA for SQL移行 | ◎(最適な移行先) | ○(複雑なロジックが必要な場合) | ○(移行試行・検証の場) |
実際のプロジェクトでは、まずTable API/SQLで開始して対応しきれない複雑さが生じたタイミングでDataStream APIに移行するという段階的アプローチが有効です。Studio notebooksは常に開発・デバッグの補助として活用します。
- SQL/Table API — 宣言的・最も手軽(ストリームをテーブルとして扱う)
- DataStream API — 低レベル・柔軟(Java/Scala/Python)
- Studio notebooks — Zeppelinベースのインタラクティブ開発(探索→本番デプロイ)
- ★KDA for SQL(廃止)からの移行はStudio(Flink SQL)が受け皿
5. ソース&シンク統合 — Kinesis・MSK・S3・OpenSearch

Managed Service for Apache Flinkでリアルタイム処理を本番運用するには、ソース(入力)とシンク(出力)のコネクタ設定が品質とスループットを左右します。本節ではKinesis・MSK・S3・OpenSearch・Firehose・DynamoDB・JDBCの各コネクタについて、Flink固有の設定とベストプラクティスを解説します。
5-1. ソースコネクタ
Kinesis Data Streams
KinesisソースにはDataStream APIのFlinkKinesisConsumerまたはFlinkの新しいSource APIのKinesisSourceを使用します。
Properties kinesisProps = new Properties();
kinesisProps.setProperty(ConsumerConfigConstants.AWS_REGION, "ap-northeast-1");
kinesisProps.setProperty(
ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"
);
DataStream<String> kinesisStream = env.addSource(
new FlinkKinesisConsumer<>(
"my-kinesis-stream",
new SimpleStringSchema(),
kinesisProps
)
);
設定の重要ポイントは以下のとおりです。
- ShardAssigner: デフォルトは
UniformShardAssignerで各シャードをタスクに均等に割り当てます。ホットシャードが発生している場合はカスタム実装を検討してください。 - WatermarkStrategy: イベント時間処理を行う場合は、Kinesisレコードのタイムスタンプをもとにウォーターマーク戦略を設定します。
WatermarkStrategy.forBoundedOutOfOrdernessが一般的な出発点です。 - 必要なIAMアクセス許可: アプリの実行ロールに
kinesis:GetRecords・kinesis:GetShardIterator・kinesis:DescribeStream・kinesis:ListShardsが必要です。アプリ起動時にDescribeStreamでシャード数を確認するため、この権限が欠けると起動エラーになります。
Amazon MSK / Apache Kafka
Amazon MSKやセルフマネージドのApache KafkaにはFlinkのKafkaSourceコネクタを使用します。
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("b-1.my-cluster.kafka.ap-northeast-1.amazonaws.com:9092")
.setTopics("my-topic")
.setGroupId("flink-consumer-group")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaStream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"MSK Source"
);
MSK固有の注意点は以下のとおりです。
- bootstrap servers: プライベートサブネットのMSKブローカーエンドポイントを指定します。Managed FlinkとMSKが同一VPCまたはVPCピアリング経由で到達可能である必要があります。
- グループID: コンシューマーグループIDを設定することでオフセット管理が有効になります。チェックポイント失敗後の再開ではFlinkのチェックポイント内オフセットが優先されます。
- SASL/TLS設定: MSKでTLSが有効な場合は
SASL_SSLまたはSSLセキュリティプロトコルをPropertiesに追加してください。IAM認証を使用する場合はMSK IAM Authライブラリを依存関係に追加します。 - パーティション数とparallelism: Kafkaパーティション数がFlinkコンシューマタスクの並列度の上限になります。パーティション数を超えた並列度を設定しても余分なタスクは空のまま動作します。
S3(バッチ入力 / FileSource API)
S3に蓄積されたファイルをストリームとして取り込む場合はFileSource APIを使用します。バッチ処理だけでなく、新規ファイルを継続的に監視するストリーミングモードも利用できます。
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(
new TextLineInputFormat(),
new Path("s3://my-bucket/input/")
)
.monitorContinuously(Duration.ofMinutes(5))
.build();
DataStream<String> s3Stream = env.fromSource(
fileSource,
WatermarkStrategy.noWatermarks(),
"S3 File Source"
);
monitorContinuouslyで指定した間隔ごとにS3バケットをポーリングし、新規ファイルを検知します。バッチ処理の場合はmonitorContinuouslyを省略してそのままbuild()するだけで使用できます。
カスタムソース(外部API / HTTPポーリング)
REST APIや独自プロトコルのデータを取り込む場合はSourceFunctionを実装します。ポーリング間隔とFlinkのチェックポイント間隔を整合させることで処理の継続性を確保できます。
public class HttpPollingSource implements SourceFunction<String> {
private volatile boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (running) {
String data = callExternalApi();
synchronized (ctx.getCheckpointLock()) {
ctx.collect(data);
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
getCheckpointLock()を使ってデータ出力とチェックポイントを同期することで、チェックポイント中の状態一貫性が保たれます。外部APIのレート制限を超えないようThread.sleepで呼び出し間隔を制御してください。
5-2. シンクコネクタ
Amazon S3(FileSink・part file lifecycle)
S3への出力にはFileSinkを使用します。チェックポイントと連動したexactly-once書き込みが可能で、Parquet・ORC・JSONなど複数フォーマットに対応しています。
FileSink<GenericRecord> parquetSink = FileSink
.forBulkFormat(
new Path("s3://my-bucket/output/"),
ParquetAvroWriters.forGenericRecord(schema)
)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
.build();
processedStream.sinkTo(parquetSink);
OnCheckpointRollingPolicyを指定することで、チェックポイントが完了したタイミングでファイルがpending状態からcommitted状態(通常のファイル名)に遷移します。アプリが障害から再起動した場合、未コミットのpendingファイルは削除されるため重複書き込みが防止されます。DateTimeBucketAssignerでHive形式の時間パーティショニングを実現できます。
Amazon OpenSearch Service(BulkProcessor・retry・index名動的生成)
OpenSearch ServiceへはOpenSearch/Elasticsearchコネクタを使用します。内部的にBulkProcessorを使用してリクエストをバッチ化し、スループットを向上させます。
List<HttpHost> hosts = List.of(
new HttpHost("my-domain.ap-northeast-1.es.amazonaws.com", 443, "https")
);
OpensearchSink<EventRecord> openSearchSink = new OpensearchSinkBuilder<EventRecord>()
.setHosts(hosts.toArray(new HttpHost[0]))
.setEmitter((element, context, indexer) -> {
String indexName = "events-" + element.getDate();
indexer.add(
Requests.indexRequest()
.index(indexName)
.id(element.getEventId())
.source(element.toJsonMap())
);
})
.setBulkFlushMaxActions(500)
.setBulkFlushInterval(2000L)
.setFailureHandler(new RetryRejectedExecutionFailureHandler())
.build();
_idを明示的に指定することで、再処理時の重複書き込みを防ぐ冪等書き込みが実現します。indexNameの動的生成により日付ローテーションインデックスも容易に実現できます。setBulkFlushMaxActionsとsetBulkFlushIntervalでバッチサイズと最大待機時間を調整してください。
Amazon Kinesis Firehose(KinesisFirehoseSink・バッファサイズ/時間)
Kinesis FirehoseへはFlinkのKinesisFirehoseSinkコネクタを使用します。Firehoseのバッファリング設定とFlinkのバッファ設定を組み合わせてスループットを調整します。
KinesisFirehoseSink<String> firehoseSink = KinesisFirehoseSink.<String>builder()
.setFirehoseClientProperties(firehoseProps)
.setSerializationSchema(new SimpleStringSchema())
.setDeliveryStreamName("my-delivery-stream")
.setFailOnError(false)
.setMaxBatchSize(500)
.setMaxTimeInBufferMS(5000L)
.build();
processedStream.sinkTo(firehoseSink);
setMaxBatchSizeはFirehoseへのバッチリクエストの最大レコード数(上限500件)、setMaxTimeInBufferMSはバッファリング最大時間(ミリ秒)です。setFailOnError(false)とすることで個々のレコードエラーがパイプライン全体を止めないようにできます。ただし無効なレコードがFirehoseのデッドレターS3バケットに送られる点を監視してください。
Amazon DynamoDB(asyncDynamoDbSink・べき等書込・コスト注意)
DynamoDBへはFlinkのDynamoDbSinkコネクタを使用します。書き込みのべき等性を確保するために条件付きPutを活用します。
DynamoDbSink<ProcessedEvent> dynamoSink = DynamoDbSink.<ProcessedEvent>builder()
.setDynamoDbClient(DynamoDbAsyncClient.builder()
.region(Region.AP_NORTHEAST_1)
.build())
.setTableName("processed-events")
.setElementConverter(new ProcessedEventConverter())
.setMaxBatchSize(25)
.build();
DynamoDBのバッチ書き込みはBatchWriteItemの制限である25件が上限です。高スループット時のコスト最適化として、プロビジョンドモードでAuto Scalingを設定する方がオンデマンドモードより安価になるケースがあります。書き込みキャパシティユニット(WCU)の消費量を事前にシミュレーションしてください。
JDBC(RDS/Aurora・JDBC Sink・upsert vs insert)
RDSやAuroraにはFlinkのJdbcSinkを使用します。冪等性を確保するためupsertを使用します。
SinkFunction<SalesEvent> jdbcSink = JdbcSink.sink(
"INSERT INTO sales_summary (event_id, amount, updated_at) " +
"VALUES (?, ?, NOW()) " +
"ON CONFLICT (event_id) DO UPDATE " +
"SET amount = EXCLUDED.amount, updated_at = NOW()",
(statement, event) -> {
statement.setString(1, event.getEventId());
statement.setDouble(2, event.getAmount());
},
JdbcExecutionOptions.builder()
.withBatchSize(200)
.withBatchIntervalMs(2000)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://my-aurora.cluster.ap-northeast-1.rds.amazonaws.com:5432/mydb")
.withDriverName("org.postgresql.Driver")
.build()
);
ON CONFLICT DO UPDATE(PostgreSQL/Aurora PostgreSQL)で再処理時の重複を排除します。Aurora Serverless v2の場合、コールドスタート後の初回書き込みでレイテンシ増加のケースもあるためwithMaxRetriesでリトライを設定してください。
5-3. シリアライズ / デシリアライズ
コネクタで使用するシリアライズ形式の選定基準を以下にまとめます。
| フォーマット | 特徴 | 推奨用途 |
|---|---|---|
| JSON | 可読性高・スキーマレス | ログ分析・プロトタイピング・OpenSearch連携 |
| Avro | スキーマ付き・コンパクト | Schema Registryと連携するMSKパイプライン |
| Protobuf | 高速・コンパクト | マイクロサービス間の高スループット連携 |
| CSV | シンプル | 単純なストリーミングETL・S3バッチ入力 |
Schema Registry(Confluent / MSK)との統合: MSKとAWS Glue Schema Registryを組み合わせる場合はGlueSchemaRegistryAvroDeserializationSchemaを使用します。Avroスキーマの互換性チェックがSchema Registry側で自動的に行われるため、スキーマ進化を安全に管理できます。
TypeInformationの重要性: FlinkではJavaのジェネリクス型消去の影響で、型情報の明示的な指定が必要なケースもあります。
TypeInformation<SalesEvent> typeInfo = TypeInformation.of(SalesEvent.class);
SingleOutputStreamOperator<SalesEvent> result = stream.map(
event -> parser.parse(event),
typeInfo
);
型情報を省略するとFlinkは誤った型を推論し、シリアライズエラーやパフォーマンス低下を招くことがあります。独自クラスを使用する場合は必ずTypeInformationを明示してください。
5-4. 非同期I/Oエンリッチメント
リアルタイムストリームを外部データソースで補完(エンリッチ)する場合、同期I/Oではブロッキング待機によりスループットが大幅に低下します。AsyncDataStream APIを使用することでI/O待機をノンブロッキングにできます。
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
inputStream,
new AsyncEnrichFunction(dynamoDbAsyncClient),
30, TimeUnit.SECONDS,
100
);
public class AsyncEnrichFunction extends RichAsyncFunction<Event, EnrichedEvent> {
@Override
public void asyncInvoke(Event input, ResultFuture<EnrichedEvent> resultFuture) {
CompletableFuture<UserProfile> future = dynamoDbClient.getItem(input.getUserId());
future.thenAccept(profile ->
resultFuture.complete(
Collections.singleton(new EnrichedEvent(input, profile))
)
);
}
@Override
public void timeout(Event input, ResultFuture<EnrichedEvent> resultFuture) {
resultFuture.complete(
Collections.singleton(EnrichedEvent.withoutEnrichment(input))
);
}
}
設定のポイントは以下のとおりです。
- unorderedWait vs orderedWait:
unorderedWaitはイベント順序を保証しない代わりにスループットを最大化します。順序保証が必要な場合はorderedWaitを使用しますが、遅いリクエストが後続をブロックします。 - タイムアウト: 第3・4引数でタイムアウトを指定します。タイムアウト時は
timeout()メソッドが呼ばれるため、フォールバック値を返してパイプラインの停止を防いでください。 - キャパシティ: 第5引数で同時実行可能な非同期リクエスト数を制御します。外部サービスのスロットリング制限に合わせて調整してください。
5-5. 配信保証とシンク冪等性
Flinkはチェックポイントメカニズムによってat-least-once配信を保証します。exactly-onceを実現するにはシンク側の冪等性またはトランザクショナルシンクが必要です。
| シンク | 配信保証 | 冪等性の仕組み |
|---|---|---|
| S3 (FileSink) | exactly-once | チェックポイント完了時にpending→committed遷移 |
| OpenSearch | 実質exactly-once | _id指定による重複上書き |
| DynamoDB | 実質exactly-once | 条件付きPut (attribute_not_exists) |
| Kinesis Firehose | at-least-once | 重複が発生する可能性あり |
| JDBC | 実質exactly-once | upsert + PKによる重複排除 |
S3のexactly-once(pending→committed): FileSinkはチェックポイントが成功するまでファイルを.pending状態に保ちます。障害でアプリが再起動した場合、未コミットのpendingファイルは削除され、再処理によって重複書き込みを防ぎます。
DynamoDBの条件付きPut: べき等書き込みにはattribute_not_exists条件を使用します。
PutItemRequest request = PutItemRequest.builder()
.tableName("events")
.item(item)
.conditionExpression("attribute_not_exists(event_id)")
.build();
同一event_idが既に存在する場合はConditionalCheckFailedExceptionが発生して書き込みをスキップします。この例外は正常系として扱い、エラーログに出力しないよう実装してください。
5-6. parallelismとシャード数の整合
ソースコネクタの並列度はデータソースの物理的な分割数(シャード / パーティション)と整合させることが重要です。整合が取れないとスループットの天井が下がるか、空のタスクによるリソースの無駄遣いが発生します。
Kinesisシャード数 = parallelism上限
KinesisコンシューマはシャードをFlinkタスクに割り当てます。parallelismがシャード数を超えても余分なタスクには割り当てるシャードがなく、空のままKPUを消費します。シャードを分割・統合した場合はアプリのparallelismも合わせて変更してください。
Kinesisシャード数: 8シャード
推奨parallelism: 8(1タスク = 1シャード)
KPU消費目安: parallelism ÷ parallelismPerKPU(デフォルト1) = 8 KPU
MSKパーティション数とparallelism
KafkaパーティションもKafkaコンシューマの並列度の上限です。Kafkaトピックのパーティション数とFlinkのparallelismを1:1に設定するのが基本です。
MSKトピックパーティション数: 12
推奨parallelism: 12(1タスク = 1パーティション)
KPU消費目安: 12 KPU(parallelismPerKPU=1の場合)
parallelismPerKPUを2以上に設定することでKPUコストを削減できますが、1KPUあたりのメモリ(4GB)を複数タスクで共有することになります。状態(RocksDB)が大きいアプリではメモリ不足になるため、parallelismPerKPUの増加はステートレスな処理や状態が小さいアプリに限定してください。
- ソース — Kinesis Data Streams / Amazon MSK(Kafka) / Kafka互換
- シンク — S3 / OpenSearch / Firehose / DynamoDB / JDBC / カスタム
- コネクタ・スキーマ・シリアライズ(JSON/Avro)・非同期I/Oエンリッチメント
- 配信保証(exactly-once)とシンク冪等性・シャード/パーティションとparallelismの対応
出力先のOpenSearchはこちら(OpenSearch本番運用 Vol1)
6. スケール・運用・コスト
flowchart LR
CW["CloudWatch\nメトリクス監視\ncpuUtilization / backPressuredTimeMsPerSecond"]
CW -->|CPU高騰・バックプレッシャ増| SCALEUP["KPU 増加\n並列度スケールアウト\n(最大KPU上限まで)"]
CW -->|CPU低下・負荷安定| SCALEDOWN["KPU 削減\nスケールイン\n(最小KPU下限まで)"]
SCALEUP --> CKPT["チェックポイント\n自動実行\n状態をS3に保存"]
SCALEDOWN --> CKPT
CKPT --> RESUME["処理再開\n最新チェックポイントから\n自動復旧"]
RESUME -->|継続監視| CW
KPU — 課金とスケールの基本単位
Managed Service for Apache Flinkにおけるリソースの最小単位はKPU(Kinesis Processing Unit)です。1 KPUは1 vCPU + 4 GBメモリで構成され、アプリケーションストレージとして50 GBのディスクが付属します。
KPUは秒単位で課金されます(1時間未満の端数も秒単位で計算)。アプリの実行中は課金が継続するため、不要なアプリは停止または削除してコストを抑制します。
Flinkの並列度(parallelism)は「同時に動作するタスクスロットの数」を意味します。Managed Flinkでは、並列度とKPU数が密接に関係します。デフォルトではparallelism per KPUが1ですが、設定により変更できます。KPU数 × parallelism per KPU が実行可能な並列タスクスロットの上限となります。
並列度4のアプリをparallelism per KPU=1で実行すると最低4 KPUが必要です。parallelism per KPU=2に設定すると2 KPUで同じ並列度を実現できますが、各KPUのメモリ消費が増加します。ステートフルなアプリで状態が大きい場合は、KPUあたりの並列度を下げてメモリ余裕を確保します。
KPUオートスケール
Managed Flinkはアプリごとにオートスケールを有効化できます。有効にすると、CloudWatchメトリクス(主にCPU使用率)を基準として、KPUの最小数と最大数の範囲内でKPUが自動増減します。
オートスケールの主な設定項目は以下の通りです。
| 設定 | 説明 |
|---|---|
| autoscalingEnabled | trueでオートスケールを有効化 |
| parallelism (初期値) | アプリ起動時の並列度 |
| parallelismPerKPU | KPUあたりの並列タスクスロット数(デフォルト1) |
| 最小KPU | スケールインの下限 |
| 最大KPU | スケールアウトの上限 |
CPU使用率の上昇が続くとスケールアウト、低下が続くとスケールインをトリガーします。スケール操作中はFlinkジョブが一時停止し、最新のチェックポイントから自動復旧します。このため、オートスケールにはチェックポイントの正常動作が前提となります。
過剰なスケールアウトを防ぐために最大KPUを適切に設定することが重要です。上限を設定しないと入力レートの急増に追従して際限なくKPUが増加し、コストが膨らみます。
バックプレッシャが慢性的に発生する場合は、並列度の設計や状態管理の見直しが先決です。オートスケールはCPU起因のボトルネックには効果的ですが、状態肥大・シャード不足・シンクのスループット不足が原因のバックプレッシャには根本的な対処が別途必要です。
監視メトリクス
Managed FlinkはCloudWatchにメトリクスを自動でエクスポートします。本番運用で監視すべき主要メトリクスを以下に示します。
| メトリクス | 説明 | 閾値の目安 |
|---|---|---|
| numRecordsInPerSecond | 入力レコード数/秒 | 設計スループットの±30% |
| numRecordsOutPerSecond | 出力レコード数/秒 | 入力と比較して処理の詰まりを検出 |
| cpuUtilization | CPU使用率 | 80%超えでスケールアウト検討 |
| heapMemoryUtilization | ヒープメモリ使用率 | 85%超えで状態管理の見直し |
| lastCheckpointSize | 最新チェックポイントのサイズ(バイト) | 急増は状態肥大のサイン |
| lastCheckpointDuration | チェックポイント所要時間(ms) | 設定間隔の10%超えで警戒 |
| numberOfFailedCheckpoints | チェックポイント失敗数 | 0以外は即アラート |
| downtime | アプリダウン時間(ms) | 0以外は障害の証拠 |
| KPUs | 実行中KPU数 | コスト監視の基準 |
バックプレッシャの検出には、Flink Dashboard(Managed Flinkコンソールから直接アクセス可能)の「Backpressure」タブが有効です。タスクごとにバックプレッシャ率が表示され、高率のタスクがボトルネックを示します。CloudWatchアラームとFlink Dashboardを組み合わせることで、異常を素早く特定できます。
CloudWatchアラームは少なくとも以下の3点に設定することを推奨します。
numberOfFailedCheckpoints >= 1— チェックポイント失敗の即時通知downtime > 0— アプリ停止の即時通知cpuUtilization > 80%を5分以上 — スケールアウト検討トリガー
状態バックエンド(RocksDB)の管理
Managed Flinkではデフォルトの状態バックエンドとしてRocksDB(EmbeddedRocksDBStateBackend)が推奨されます。RocksDBはローカルSSD(KPU付属の50 GB)に状態を書き出すため、インメモリバックエンドより大きな状態を扱えます。
状態の肥大化を防ぐための主な対策を以下に示します。
TTL(Time-to-Live)の設定
KeyedStateにTTLを設定すると、最後にアクセスされてから指定時間後に状態が自動削除されます。セッション管理・ウィンドウ結合・重複排除で蓄積し続ける状態の上限を明示的に制御できます。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("count", Long.class);
descriptor.enableTimeToLive(ttlConfig);
RocksDB compactionの設定
RocksDBはLSMツリー構造を採用しており、定期的なcompactionでデータを整理します。デフォルト設定でも動作しますが、書き込み量が多い場合はcompactionスレッド数やレベル設定を調整することでI/Oを平滑化できます。Managed Flinkではアプリの設定プロパティ(RocksDBConfigurableOptions)を通じてRocksDBのパラメータを変更できます。
チェックポイントサイズの監視
状態が肥大すると、チェックポイントのサイズと所要時間が増大します。lastCheckpointSizeとlastCheckpointDurationが増加傾向にある場合は、TTL設定の見直し・状態設計の簡素化・並列度の増加(状態の分散)を検討します。チェックポイントが設定間隔内に完了しないと、次のチェックポイントが開始できず、状態保全性が損なわれます。
チェックポイントからの自動障害復旧
Managed Flinkはアプリが異常終了した場合、最新の成功したチェックポイントから自動的に再起動します。ユーザーが明示的にスナップショット(セーブポイント)を保存した場合は、そのスナップショットから起動することでアプリのバージョン更新やロールバックが可能です。障害復旧の精度はチェックポイントの間隔に依存するため、許容するデータ損失量(RPO)に応じた間隔設定が必要です。
コスト計算例
Managed Flinkのコストはシンプルです。主な課金項目を整理します。
| 課金項目 | 単価(東京リージョン参考値) | 説明 |
|---|---|---|
| KPU時間 | $0.11/KPU-時間 | 秒単位で課金 |
| アプリストレージ | $0.10/GB-月 | 50 GB/KPUを超えた分のみ |
| Studioノートブック追加KPU | $0.11/KPU-時間 | Studio実行時の追加分 |
※単価はリージョンや時期により変動します。最新料金はAWS公式料金ページを参照してください。
コスト計算例: 並列度4のリアルタイム集計アプリ(24時間稼働)
前提条件:
– 並列度: 4、parallelism per KPU: 1 → KPU数: 4
– 月間稼働時間: 720時間(24時間×30日)
– 状態ストレージ: 50 GB/KPU以内(標準範囲)
KPUコスト: 4 KPU × $0.11/時間 × 720時間 = $316.80/月
ストレージコスト: 標準範囲内のため追加なし($0)
合計: 約$316.80/月(約47,500円/月 ※1ドル=150円換算)
並列度を適切に下げてKPUを2に削減できれば、コストは約$158.40/月に半減します。オートスケールと組み合わせることで、ピーク時は4 KPU、閑散時は2 KPUに自動調整し、コストを最適化できます。
KDA for SQL廃止後の移行コスト試算
KDA for SQLからManaged Flinkへの移行では、コスト構造が変化します。移行の際は、既存SQLアプリのスループットと状態量を計測し、Flink SQL/Table APIで同等の処理を実現するために必要な最小並列度とKPU数を見積もります。移行直後はKPUを多めに確保し、CloudWatchメトリクスを見ながら徐々にダウンサイジングするアプローチが安全です。
コスト最適化チェックリスト
- 不要なアプリは停止(コンソールまたはAPIでStopped状態に変更)
- オートスケールを有効化し最大KPUを実績の1.5倍程度に設定
- parallelism per KPUを増やして同じ並列度を少ないKPUで実現(ただしメモリ使用量を監視)
- Studio notebooksは開発・探索時のみ起動し、本番ジョブが不要な時間帯は停止
lastCheckpointDurationが増大している場合は状態管理を見直してKPU増加を抑制
バックプレッシャへの対応
バックプレッシャはFlinkジョブの下流タスクが処理しきれず上流タスクのスループットが低下する現象です。Flink DashboardのBackpressureタブでタスクごとのバックプレッシャ率を確認し、原因を特定します。
主な原因と対処を以下に示します。
| 原因 | 対処 |
|---|---|
| シンクのスループット不足 | シンク側のスロットリング設定を緩和・並列度を増加 |
| 状態(RocksDB)肥大によるI/O遅延 | TTL設定・並列度増加で状態を分散 |
| 入力シャード数 < Flink並列度 | シャード数を増加してKinesisの読み取りスループットを向上 |
| コンピュート不足(CPU高騰) | KPUをスケールアウト・parallelismを調整 |
バックプレッシャは表面的にはレイテンシの増大として現れるため、CloudWatchのnumRecordsInPerSecondが設計値を大きく下回っている場合は、Flink Dashboardと合わせてボトルネックのタスクを特定します。
- ★KPU(1vCPU+4GB)が課金/スケール単位。秒単位課金+アプリストレージ(50GB/KPU・$0.10/GB-月)
- オートスケールはCloudWatchメトリクス基準でKPU増減。並列度との整合を設計
- 監視=numRecordsIn/Out・チェックポイント時間/サイズ・バックプレッシャ・KPU使用率
- 状態(RocksDB)肥大とバックプレッシャに注意。並列度/KPU過大はコスト増
7. 実戦統合パターン — エンドツーエンド設計とKDA for SQL移行
flowchart TD
A["KDA for SQL\n廃止スケジュール確認\n新規停止: 2025-10-15\n完全削除: 2026-01-27"]
A --> B["棚卸し\nlist-applications\nSQL定義・ソース・シンク・UDF確認"]
B --> C["Studio Notebooks で\nFlink SQL に書き換え\nCREATE TABLE + INSERT INTO 形式"]
C --> D["インタラクティブ検証\n同一入力 → 同一出力確認\nExactly-once 保証確認\nレイテンシ要件チェック"]
D -->|検証NG| C
D -->|検証OK| E["本番 Managed Flink アプリ\nデプロイ・スタンバイ状態"]
E --> F["カットオーバー\nKDA for SQL 停止\nManaged Flink 起動"]
F --> G["出力確認\nOpenSearch / S3 / DynamoDB\n正常書込・データ整合確認"]
G -->|問題あり| H["ロールバック\nKDA for SQL 再起動\n原因調査"]
G -->|問題なし| I["KDA for SQL リソース削除\n移行完了"]
Managed Flinkを本番環境で最大限に活用するには、Kinesis/MSKからS3・OpenSearch・DynamoDBまでの一貫したデータパイプライン設計と、Kinesis Data Analytics for SQL(KDA for SQL)からの計画的な移行が必要です。本節では、代表的なE2E統合パターン・ユースケース別構成・KDA for SQL移行ロードマップ・IAM/VPC連携を解説します。
7.1 E2E統合パターン
Managed Flinkを中心としたE2E統合は「取込 → リアルタイム処理 → 出力」の3層構成です。
【取込層】
Kinesis Data Streams / Amazon MSK(Kafka)
↓ (ソースコネクタ)
【処理層】
Amazon Managed Service for Apache Flink
・ステートフル集計 / ウィンドウ処理
・ストリーム結合 / エンリッチメント
・異常検知 / CEP
↓ (シンクコネクタ)
【出力層】
Amazon S3 → データレイク / Athena分析
Amazon OpenSearch→ リアルタイム検索 / 可視化
Amazon DynamoDB → 低レイテンシ参照 / API応答
Amazon Kinesis Firehose → S3 / Redshiftへの配信
取込層のKinesis Data StreamsやAmazon MSKはそれぞれ固有の設計指針を持ちます。シャード数・パーティション数・保持期間などの設定は既存のKinesis/MSK記事を参照し、本節ではFlink処理層と出力先の接続設計に集中します。
シンク別の設計ポイント
S3シンクはFlink FileSink(またはStreamingFileSink)を使用し、Parquet/ORC/JSON形式でバケットに書き込みます。イベント時間に基づくHive互換パーティション(dt=2025-10-01/hr=14)を設定することで、後段のAthenaクエリ効率が向上します。ファイルのローリング(サイズ・時間基準)を設定してファイル数を適切にコントロールします。
OpenSearchシンクはApache Flink用OpenSearchコネクタを使用します。バルク書込みでスループットを確保し、ドキュメントIDへイベントの自然キー(センサーID・トランザクションID等)を設定することで冪等性を確保します。インデックス名はイベント時間に基づくローリング形式(日付別)を採用すると古いデータの削除管理が容易です。
DynamoDBシンクはAWSのFlink用DynamoDBコネクタを使用します。PutItemを冪等操作として利用するため、DynamoDBのPKにイベントの自然キーを設定します。書込みスループット(WCU)はFlink parallelismと書込みレートに応じてオートスケールを有効化します。
7.2 ユースケース別構成
リアルタイムダッシュボード
センサーやWebアクセスのイベントをKinesis/MSKで受信し、Flinkでタンブリングウィンドウ集計(1分・5分単位)を実施します。集計結果をOpenSearchにストリーミング書込みし、OpenSearch Dashboards(旧Kibana)でリアルタイム可視化します。
Kinesis/MSK → Flink(タンブリングウィンドウ集計 1分/5分) → OpenSearch → Dashboards
設計ポイントは以下のとおりです。
- ウィンドウサイズはダッシュボードのリフレッシュレートに合わせて設定
- イベント時間基準を使用し、ウォーターマーク遅延(5〜10秒)で遅延イベントを吸収
- OpenSearchへの書込みは
BulkProcessorでバッファリングしスループットを最大化
不正検知
金融取引やeコマースの決済ストリームを対象に、CEP(複合イベント処理)で異常パターンを検出します。短時間の複数回認証失敗・閾値超過トランザクション・地理的に矛盾する連続アクセスなどを検出してDynamoDBにアラートを書き込み、Lambda/SNSで後続処理をトリガーします。
MSK → Flink(CEP・ステートフル集計) → DynamoDB → Lambda → SNS
設計ポイントは以下のとおりです。
PatternStreamでCEPパターンを定義(例: 10秒以内に3回以上認証失敗)- キー付き状態のTTL(Time-to-Live)をCEPウィンドウに合わせて設定し状態肥大を防止
- DynamoDB書込みを冪等操作にしてexactly-once的な二重検出を防止
IoTテレメトリ集計
IoTデバイスからのテレメトリをKinesisで収集し、デバイスIDをキーにしたスライディングウィンドウで移動平均・異常値フィルタを適用します。正常データはS3(データレイク)へ、異常値はOpenSearchへ振り分けます。
Kinesis → Flink(スライディングウィンドウ・フィルタ) → S3(正常) / OpenSearch(異常)
設計ポイントは以下のとおりです。
- デバイス数が多い場合はKinesisシャード数をFlink parallelismに合わせて設計
- 異常値の定義(例: 移動平均の3σ超過)はFlinkのProcessFunctionで実装
- S3はデバイスIDと日時によるパーティション分割でAthena分析を高速化
リアルタイムETL
OLTPデータベースのCDC(変更データキャプチャ)ストリームやKafkaトピックを受信し、正規化・エンリッチメント・型変換を施してS3やRedshiftに書き込みます。
MSK(CDC) → Flink(変換・結合・エンリッチメント) → S3 / Redshift
設計ポイントは以下のとおりです。
- 参照テーブルとのJOINは非同期I/O(
AsyncDataStream.unorderedWait)または定期ブロードキャストステートで実装 - スキーマ変更(カラム追加等)に備えてAvroやProtobufのスキーマレジストリ活用を検討
- Redshiftへの書込みはFilesinkでS3 Parquetに出力し、COPY コマンドで取込む構成が安定
7.3 KDA for SQL → Managed Flink 移行ロードマップ
廃止スケジュールの確認
Kinesis Data Analytics for SQL Applicationsは以下のスケジュールで廃止されます。
| マイルストーン | 日付 | 内容 |
|---|---|---|
| 新規作成停止 | 2025-10-15 | 新規SQLアプリケーションの作成不可 |
| サービス完全削除 | 2026-01-27 | 既存アプリを含めてサービス終了 |
2026-01-27以降はKDA for SQLアプリが実行不能になるため、それ以前の移行完了が必須です。2025年中に移行計画を確定し、余裕を持ったスケジュールで進めます。
ステップ1: 現行KDA for SQLアプリの棚卸し
まず、すべてのKDA for SQLアプリをリストアップします。
aws kinesisanalytics list-applications \
--output table \
--query 'ApplicationSummaries[*].[ApplicationName,ApplicationStatus]'
各アプリについて以下を確認します。
describe-applicationでSQL定義を取得し、クエリの複雑さを評価- 入力ソース(Kinesis Data Streams/Firehose)と出力シンク(Kinesis/Lambda/S3)を整理
- リファレンスデータ(S3参照テーブル)の利用有無を確認
- CloudWatchメトリクスでアクティブに使用されているかを確認(実質未使用ならアーカイブ)
ステップ2: Flink SQLへの書換
KDA for SQLとFlink SQLは構文に差異があります。主な相違点を以下に示します。
| 機能 | KDA for SQL | Flink SQL |
|---|---|---|
| ストリーム宣言 | CREATE STREAM | CREATE TABLE + 'connector'='kinesis' |
| 出力操作(ポンプ) | CREATE PUMP ... INSERT INTO | INSERT INTO |
| ウィンドウ | ROWTIME_TO_EPOCHMILLISECONDS | TUMBLE(rowtime, INTERVAL '1' MINUTE) |
| 時間属性 | ROWTIME | WATERMARK FOR ts AS ts - INTERVAL '5' SECOND |
| リファレンスデータ | CREATE REFERENCE TABLE | CREATE TABLE + 'connector'='filesystem'(または静的ブロードキャスト) |
Studio notebooksを使ってFlink SQLの動作をインタラクティブに確認しながら書き換えることを推奨します。ノートブック上で少量のサンプルデータを使って動作検証を行い、クエリが意図した結果を返すことを確認してからManaged Flinkアプリとしてデプロイします。
ステップ3: テストと検証
書き換えたFlink SQLアプリを検証環境で実行し、以下を確認します。
- 入力スキーマの一致(KDA for SQLとFlink SQLでフィールド名・型・タイムスタンプ属性が同じか)
- 集計結果の一致(同一の入力データに対して同一の出力が得られるか)
- レイテンシ要件の達成(ウィンドウサイズと遅延の確認)
- Exactly-once保証の確認(シンク側の冪等性設定)
本番移行前にKDA for SQLとManaged Flinkを一定期間並走させ、出力結果を比較検証することを推奨します。並走期間中は二重書込みが発生するため、冪等シンク(DynamoDBのPutItem・OpenSearchのupsert等)を使用し重複データの影響を排除します。
ステップ4: Managed Flinkへの本番切替
検証完了後、本番カットオーバーを行います。
- 本番Managed Flinkアプリをデプロイしスタンバイ状態にする
- カットオーバー日時を関係者に通知する
- KDA for SQLアプリを停止する
- Managed Flinkアプリを起動する
- 出力先(OpenSearch/S3/DynamoDB等)に正常にデータが書き込まれていることを確認する
- 問題がなければKDA for SQLアプリのリソース(Firehose連携等)を削除する
カットオーバー後のロールバックに備え、KDA for SQLアプリの設定(SQL定義・入出力設定)はドキュメント化して保管します。
7.4 IAM/VPC連携
最小権限IAMロール設計
Managed Flinkアプリに付与する実行IAMロールには、必要なサービスへのアクセス権限のみを付与します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListShards"
],
"Resource": "arn:aws:kinesis:ap-northeast-1:ACCOUNT_ID:stream/INPUT_STREAM"
},
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
"Resource": [
"arn:aws:s3:::my-output-bucket",
"arn:aws:s3:::my-output-bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:PutLogEvents",
"logs:CreateLogGroup",
"logs:CreateLogStream"
],
"Resource": "arn:aws:logs:ap-northeast-1:ACCOUNT_ID:*"
}
]
}
チェックポイントとスナップショットの保存先S3バケットへのRead/Write権限も実行ロールに付与します。MSKへのアクセスにはkafka:*アクションではなく、MSK IAM認証を使用してコネクタ設定でawsIamAuthEnabled=trueを指定することが推奨されます。
VPC内でのFlink実行
MSKやプライベートサブネット内のリソース(RDSへのJDBCシンク等)にアクセスする場合は、アプリにVPC設定が必要です。サブネットIDとセキュリティグループIDを指定することで、Managed FlinkワーカーがVPC内で実行されます。
- MSKへのアクセス: MSKクラスターと同一VPCのサブネットを指定し、MSKのセキュリティグループがFlink側セキュリティグループからのアクセスを許可
- Kinesis Data Streamsへのアクセス: VPCエンドポイント(Gateway/Interface)を設定することでインターネット経由を回避
- 外部APIへのアクセスが必要な場合: NATゲートウェイ経由のルートをルートテーブルに設定
VPCを設定するとアプリの起動時間がわずかに増加します(ENIのアタッチに数十秒)。開発環境ではVPCなしで動作検証し、本番環境でVPCを有効化する流れが効率的です。
既存データ系記事との連携
Managed Flinkの取込・出力先となる各サービスの詳細設定は、以下の関連記事を参照してください。取込設計(Kinesis/MSK)・出力先設計(OpenSearch/データレイク)はそれぞれ独立した設計事項として本記事では解説を割愛しています。
- E2E — Kinesis/MSK(取込)→Managed Flink(リアルタイム集計/結合/異常検知)→S3/OpenSearch/DynamoDB(出力)
- ユースケース — リアルタイムダッシュボード/不正検知/IoTテレメトリ/ストリーミングETL
- ★KDA for SQL移行(2025-10-15新規停止/2026-01-27削除) — SQLアプリ棚卸し→Flink SQL(Studio)書き換え→検証→切替
- 取込はKinesis/MSK記事、出力先はOpenSearch/データレイク記事を参照
出力先のデータレイク分析はこちら(データレイク本番運用 Vol1)
Kinesis/イベント連携はこちら(サーバーレス本番運用 Vol2)
8. つまずきポイント・アンチパターン・まとめ
つまずきポイント
Managed Service for Apache Flinkを本番運用する上で、実際によく発生するトラブルと対処法を具体的な症状・原因・解決策の形でまとめます。
1. 処理時間とイベント時間の混同
症状: ストリーム集計の結果が想定と一致しない。遅延して届いたイベントが集計から漏れる。5分前の取引が現在の集計ウィンドウに入ってしまう。
原因: デフォルトではProcessingTime(処理時間)ベースの処理が適用されます。モバイルアプリやIoTデバイスからのイベントは、ネットワーク遅延や再接続で数分〜数十分遅れて届くことがあります。処理時間ウィンドウでは遅延イベントは最新のウィンドウに属するため、過去の時間帯の集計値が欠損します。
解決策: イベントに埋め込まれたタイムスタンプを使い、EventTimeを設定します。ウォーターマークで許容遅延を設定し、遅延データを正確に扱います。「いつ処理したか」ではなく「いつ発生したか」を基準とした設計に切り替えます。ウォーターマークの遅延許容は、データの遅延分布の99パーセンタイルを基準に設定します。
2. ウォーターマーク未設定・過小設定
症状: ウィンドウ集計でイベントが大量に欠損する。ウィンドウが期待どおりに閉じない(結果が出ない)、または早期に閉じてしまう。CloudWatchのnumLateRecordsDroppedが高い値を示す。
原因: イベント時間を使っているにもかかわらずウォーターマークを設定しない、または許容遅延を小さく設定しすぎています。ウォーターマーク未設定の場合、アイドル状態のソースではウォーターマーク発行が停止し、ウィンドウの終了タイミングが不定になります。許容遅延が小さすぎると、正常なネットワーク遅延で届いたイベントがすべて「遅延データ」として破棄されます。
解決策: データの遅延分布を事前に計測し、99パーセンタイルの遅延時間をウォーターマークの許容遅延に設定します。BoundedOutOfOrdernessWatermarksを使い、実測値に基づいて調整します。遅延データの破棄数はCloudWatchのnumLateRecordsDroppedメトリクスで継続的に監視し、破棄が多い場合は許容遅延を引き上げます。アイドルソースがある場合はwithIdlenessを設定します。
3. チェックポイント間隔の過短設定
症状: スループットが想定を大幅に下回る。バックプレッシャが恒常的に発生する。CloudWatchのlastCheckpointDurationがチェックポイント間隔に近い値を示している。
原因: チェックポイント間隔を短く設定しすぎると(例: 5〜10秒ごと)、チェックポイント処理のオーバーヘッドが処理全体の大部分を占めます。特にRocksDBの状態サイズが大きい場合、S3へのスナップショット書き込みに時間がかかり、次のチェックポイント開始までに前回が完了しない「チェックポイント停止」が発生します。
解決策: 通常は60秒〜5分のチェックポイント間隔が適切です。CloudWatchのlastCheckpointDurationメトリクスでチェックポイント所要時間を監視し、間隔の最低でも2倍の余裕を持たせます。minPauseBetweenCheckpointsで連続実行を防止し、同時チェックポイント数は1に設定します。RocksDBのenableIncrementalCheckpointingを有効化して差分スナップショットにすることでチェックポイントサイズを削減できます。
4. parallelismとKinesisシャード数の不整合
症状: Kinesisを読む並列タスクの一部がアイドルになる。一部のサブタスクのスループットが他と比較して大幅に低い。MillisBehindLatestが解消されない。
原因: Amazon Kinesis Data Streamsのコンシューマでは、1つのFlink並列タスクが1つ以上のシャードを担当します。Flinkのparallelismがシャード数を超えると、超過した並列タスクはシャードが割り当てられずアイドルになります。例えばシャード4・parallelism 8の場合、4タスクがアイドルになりKPUとコストが無駄になります。
解決策: Kinesisソースのparallelismはシャード数以下に設定します。シャード数を増やした場合は対応してparallelismも増やします。スケール後はCloudWatchのMillisBehindLatestメトリクスでラグを確認します。Kinesis EFO(Enhanced Fan-Out)を使用する場合は、シャードごとの専用スループット(2 MB/s)を最大限活用するためparallelism=シャード数が最適です。
5. RocksDB状態の肥大化
症状: メモリ不足(OOM)エラー、チェックポイントの異常な遅延、compaction処理によるCPUスパイク。CloudWatchのmanagedMemoryUtilizationが高止まりする。
原因: 状態のTTL(有効期限)を設定しないと、古いキーが無限に蓄積します。ウィンドウ処理での集計キーや、セッション状態のユーザーキーが積み上がることで、RocksDBのサイズが数GB〜数十GBに膨張します。TTL未設定のまま長期運用すると、チェックポイントのS3書込みサイズが増大し所要時間も増加します。
解決策: StateTtlConfigで状態のTTLを設定します。セッション状態なら最大セッション時間+バッファ、集計状態ならウィンドウサイズの2〜3倍を設定します。インクリメンタルチェックポイント(enableIncrementalCheckpointing)を有効化して増分スナップショットにすることで、状態が大きくてもチェックポイント時間を短縮できます。managedMemoryUtilizationとrocksdbCompactionTimeを監視して状態管理の効果を継続確認します。
6. KPUスケール設定の不適切
症状: 処理遅延の恒常化とコスト爆発が同時に発生する。KPUsメトリクスが急増して予算を超過する。または、スループット要件を満たせない。
原因: KPUはManaged Flinkの課金・スケール単位(1 KPU = 1 vCPU + 4GB RAM)です。KPU上限を低く設定しすぎるとスケールアウト不能になり処理が滞ります。逆に上限なくオートスケールを設定すると、バックプレッシャの瞬間的なスパイクで大量のKPUが起動してコスト急増を招きます。スケール時にはチェックポイントが自動実行されるため、頻繁なスケールはそのぶんオーバーヘッドが増えます。
解決策: 本番想定スループットでの必要KPU数をあらかじめ見積もり、最小・最大KPU数の両方を適切に設定します。CloudWatchのKPUsメトリクスで実際の使用数を監視し、コスト上限アラートを設定します。想定外のスパイクを防ぐためAWS Budgetsで日次・月次の支出アラートを設定します。
7. 非冪等シンクによるExactly-once破綻
症状: シンク(出力先)に重複レコードが蓄積される。同一のイベントIDに対して複数のレコードが存在する。
原因: Flink側でExactly-once設定をしていても、シンクが冪等でなければ再処理時に同じレコードが重複書き込みされます。例えば、同一主キーにINSERTするだけのJDBCシンクや、S3への追記書き込みでは重複が発生します。Kafkaトランザクション対応シンクを使わずにEXACTLY_ONCEを指定した場合も同様です。
解決策: シンクが冪等な書き込みをサポートしているか確認します。JDBCの場合はINSERT … ON CONFLICT DO UPDATE(upsert)を使います。Kinesis Data Streamsシンクは2PCに対応しています。冪等書き込みが困難なシンクの場合は、後処理でSQLのROW_NUMBER()やDEDUP処理を組み合わせます。
8. Kinesis Data Analytics for SQL廃止の見落とし
症状: 2025年10月以降に新規アプリを作成しようとするとエラーになる。既存のKDA for SQLアプリが2026年1月以降に動作しなくなる。
原因: Kinesis Data Analytics for SQL Applications(旧サービス)は2025-10-15に新規作成を停止し、2026-01-27にサービスが完全削除されます。旧KDA SQLは現在のAmazon Managed Service for Apache Flink(旧KDA for Flink)とは別サービスです。開発ロードマップに旧KDA SQLを前提とした計画が残っていた場合、移行が必要です。
解決策: 既存のKDA for SQLアプリをStudio Notebooks(Flink SQL)に移行します。SQL構文の多くは互換性がありますが、カスタムUDFやコネクタ設定は書き直しが必要です。2025年中に移行を完了させるスケジュールを組み、不要なアプリは早期に廃止します(§7で移行手順を詳述)。
9. バックプレッシャの根本原因特定の遅れ
症状: エンドツーエンドのレイテンシが徐々に悪化する。特定のオペレータでキューが詰まる。backPressuredTimeMsPerSecondメトリクスが継続的に高い値を示す。
原因: バックプレッシャはシンク(下流)のボトルネックが上流に伝播する現象です。下流のオペレータの処理能力が不足すると、上流ではキュー溢れによりソースの読み取りも停止します。原因の特定を遅らせると、シャードラグが積み上がって処理は追いつかなくなります。
解決策: Flinkダッシュボードで赤くハイライトされているオペレータがボトルネックです。CloudWatchのbackPressuredTimeMsPerSecondメトリクスでオペレータ別に特定し、そのオペレータのparallelismを増やすか、処理ロジックを最適化します。シンクへの書き込みがボトルネックの場合は、バッファリングサイズを増やすか、シンクのスループット上限を確認します。
10. Studio notebooksのKPU課金見落とし
症状: Studio notebooksを使用しているだけでコストが発生し続ける。開発・検証のつもりが本番同様のコストになっている。
原因: Studio notebooksはインタラクティブなFlink SQL/Python/Scala開発環境ですが、起動中はKPUが課金されます。デフォルトでは開発用に追加のKPU(最低1 KPU)が常時稼働するため、使用していない時間帯もコストが発生します。
解決策: 開発が不要な時間帯はStudio notebooksを停止します。本番利用のManaged Flinkアプリとは分けて管理し、コスト配分タグを使って可視化します。Studio notebooksから本番アプリへのデプロイフローを確立し、ノートブックは開発・検証専用に限定します。
アンチパターンと正解
| アンチパターン | 問題 | 正解 |
|---|---|---|
ProcessingTimeベースのウィンドウ集計をそのまま本番投入 | 遅延イベントが欠損し集計精度が低下する | EventTime + ウォーターマーク設定。ウォーターマーク遅延は実測値の99パーセンタイルを採用する |
| チェックポイント間隔を10秒以下に設定 | チェックポイントのオーバーヘッドがスループットを圧迫し、バックプレッシャが常態化する | 60秒〜5分を基準に設定し、lastCheckpointDurationを監視して調整する |
| parallelismをKinesisシャード数の2倍以上に設定 | 過半数の並列タスクがシャード割り当てなしでアイドル化し、KPUだけ消費する | parallelism ≤ シャード数とし、シャード数変更時は同時にparallelismも変更する |
| 状態にTTLを設定せずに長期運用 | RocksDB状態が肥大化しOOMやチェックポイント遅延・compactionスパイクを引き起こす | 業務要件に応じたTTLをStateTtlConfigで設定し、managedMemoryUtilizationを監視する |
| 非冪等シンクでExactly-once設定を省略 | 再処理時に重複レコードがシンクに書き込まれデータ品質が低下する | シンクの冪等性を確認し、2PCまたはupsert方式のシンクを選定する。困難な場合は後段でDEDUP処理を設ける |
| KPU最大値を制限せずオートスケール | スパイク時にKPUが急増しコストが予算を超過する | 最大KPU数を設定し、CloudWatchアラートとAWS BudgetsでコストをDouble-checkする |
つまずきポイント クイックリファレンス
頻出トラブルの症状と確認すべきメトリクスをまとめます。
| 症状 | まず確認するメトリクス | 典型的な原因 |
|---|---|---|
| 集計結果がずれる・欠損する | numLateRecordsDropped | 処理時間とイベント時間の混同、ウォーターマーク過小設定 |
| スループットが低い・バックプレッシャ | backPressuredTimeMsPerSecond | チェックポイント間隔過短、シンクのスループット不足 |
| OOMエラー・チェックポイント失敗 | managedMemoryUtilization, lastCheckpointSize | RocksDB状態肥大、TTL未設定 |
| Kinesisラグが解消しない | MillisBehindLatest, parallelism値 | parallelism > シャード数の不整合 |
| コストが予算超過 | KPUs, Studio notebooks稼働状況 | KPU上限未設定、Studio放置 |
| 重複レコードがシンクに発生 | シンクのexactly-once設定確認 | 非冪等シンク + at-least-once設定 |
トラブル発生時の初期調査手順
- CloudWatchダッシュボードで
numRecordsIn/Out・KPUs・backPressuredTimeMsPerSecondを確認 - Flinkダッシュボード(マネジメントコンソール)で赤いサブタスクを特定
- CloudWatch Logsでエラーログを検索(
ERROR・WARN・Exception) lastCheckpointDurationとnumberOfFailedCheckpointsでチェックポイント健全性を確認- 状態サイズが増えていないか
lastCheckpointSizeのトレンドを確認
本番運用監視の設計
Managed Service for Apache Flinkの本番運用では、以下のCloudWatchメトリクスを体系的に監視することを推奨します。
スループット・レイテンシ系
| メトリクス | 説明 | アラート基準 |
|---|---|---|
numRecordsInPerSecond | ソースからの入力レコード数/秒 | 0に近づいたら異常 |
numRecordsOutPerSecond | シンクへの出力レコード数/秒 | 入力との乖離が大きい場合 |
currentInputWatermark | 現在のウォーターマーク(ms) | 停止したら異常 |
backPressuredTimeMsPerSecond | バックプレッシャ状態の時間/秒(ms) | 1000 ms/s以上で要調査 |
チェックポイント系
| メトリクス | 説明 | アラート基準 |
|---|---|---|
lastCheckpointDuration | 直近チェックポイントの所要時間(ms) | 設定間隔の50%超で警告 |
lastCheckpointSize | 直近チェックポイントのサイズ(bytes) | 急増したら状態肥大の兆候 |
numberOfFailedCheckpoints | チェックポイント失敗数 | 1以上で要調査 |
リソース系
| メトリクス | 説明 | アラート基準 |
|---|---|---|
KPUs | 現在のKPU使用数 | 最大KPU上限の80%超で警告 |
managedMemoryUtilization | マネージドメモリ使用率(0-1) | 0.9超で警告 |
heapMemoryUtilization | JVMヒープ使用率(0-1) | 0.8超で警告 |
CloudWatch Alarmを設定し、SNSと連携してOpsチームに通知する体制を構築します。Managed Flinkの組み込みFlinkダッシュボードはマネジメントコンソールの「Flink Dashboard」リンクから参照できます。各サブタスクのスループット・レイテンシ・バックプレッシャがリアルタイムで確認でき、トラブルシュートの第一歩として活用します。
まとめ
本記事では、Amazon Managed Service for Apache Flinkを本番運用するにあたり必要なApache Flinkの基礎概念から、アプリ設計・KDA for SQL移行・よくある落とし穴と解決策まで解説しました。
§2のポイント: ストリーム処理の核心はステートフル処理とイベント時間管理です。イベント時間+ウォーターマークで遅延データを正確に扱い、タンブリング・スライディング・セッションウィンドウでリアルタイム集計を実現します。Exactly-onceはチェックポイントと冪等シンクの組み合わせで実現し、CEPで複雑なイベントパターンも検知できます。
§3以降のポイント: Managed Flinkアプリの実行環境(KPU・parallelism・チェックポイント設定・状態バックエンド)とTable API/SQL・DataStream API・Studioの開発手法を適切に組み合わせることが本番安定運用の鍵です。
運用の優先事項: ウォーターマーク設定・チェックポイント間隔・parallelismとシャード数の整合・RocksDB状態TTL管理の4点を設計初期から組み込むことで、後からの大規模修正を避けられます。KDA for SQL移行は2026年1月の廃止期限に向けて計画的に進めることが重要です。
Flinkバージョン選定の考え方: Managed Flinkは定期的に新しいFlinkバージョンに対応します。新規アプリは最新安定バージョンを選択し、既存アプリは半年〜1年ごとのバージョンアップを計画します。最新バージョンではコネクタのAPIやデプロイ設定に変更が加わる場合もあるため、更新前にリリースノートで破壊的変更を確認します。
開発から本番への流れ: 新機能の開発はStudio notebooksでFlink SQLを試作し、正しいクエリと処理結果を確認します。性能テストはManaged Flinkアプリとして検証環境で実行し、並列度・チェックポイント・KPU設定をチューニングします。本番デプロイはスナップショットを取得した後、ブルーグリーンまたはカナリアデプロイで切り替えます。
コスト最適化の進め方: 初期構築時はKPUをやや多めに設定してスループットを優先し、安定稼働後に実際のKPU使用率を観察してリサイジングします。オフピーク時間帯のアプリ停止・再開を自動化することで、24時間稼働が不要なユースケースではコストを大幅に削減できます。チェックポイントのS3ストレージコストはライフサイクルルールで古いチェックポイントを自動削除します。
セキュリティ設計: 実行IAMロールには最小権限を付与し、KinesisやS3のARNを具体的なリソース名で指定します。MSKへのアクセスはIAM認証またはTLS+SCRAMを使用し、VPC内に閉じて構成します。CloudTrailでAPI操作ログを記録し、不正な設定変更を検知します。
本シリーズVol2への展望: Vol2では、Managed Flinkの高度なトピックとして複雑なストリーム結合パターン(Interval Join / Temporal Table Join)・状態マイグレーション戦略・大規模クラスターのパフォーマンスチューニング・カスタムコネクタ開発を扱う予定です。Vol1で解説したステートフル処理・ウォーターマーク・Exactly-onceの基礎理解がVol2の前提となります。
周辺サービスとの連携: Managed Flinkはストリーム処理のハブとして機能します。取込はKinesis Data StreamsまたはAmazon MSK(Kafka)が担い、処理後データはS3・OpenSearch・DynamoDB・Firehoseが受けます。本記事の姉妹記事(下記)でそれぞれの詳細設定を参照されたい。
障害対応プレイブック: 本番障害時の初期対応として、まずCloudWatchダッシュボードでアプリが稼働中かどうかを確認します。アプリが停止していた場合は最新チェックポイントから再起動します。処理が追いつかない場合(ラグ増加)はKPUを増やし並列度を上げます。チェックポイントが失敗している場合はRocksDB状態サイズとS3書き込みエラーを確認します。重大な問題の場合は事前のスナップショットから状態を復元し、旧バージョンにロールバックします。
アーキテクチャレビューのポイント: 本番構成を設計する際は、以下の4点を必ずレビューします。①取込レート vs parallelism vs シャード数の整合性、②チェックポイント間隔と状態サイズのトレードオフ、③シンクの冪等性とExactly-once保証の設計、④KPU最大値とコスト上限の設定。これらを設計フェーズで確認することで、本番リリース後の大規模修正を防止できます。
以上がManaged Service for Apache Flinkの本番運用で押さえるべきポイントです。基礎概念を正しく理解した上で、監視・チェックポイント・状態管理を丁寧に設計することが、安定したストリーム処理パイプラインの構築につながります。