
MongoDB Atlas Stream Processing を使った BigQuery へのレプリケーション
Posted on
こんにちは。プレイドでエンジニアをしている塩澤(@hshiozawa)です。
昨年末、MongoDB Atlas から BigQuery へデータをリアルタイムレプリケーションする仕組み(ブログ記事)を作りました。しかし、コスト面の課題が残っていたため、既存のバッチシステムをこの新しい仕組みへと完全に移行できませんでした。
その後、MongoDB の Atlas Stream Processing というマネージドサービスをうまく使うことでコスト面での問題を解決し、バッチシステムをすべて置き換える目処がたちました。Atlas Stream Processing は比較的新しいサービスであり、日本語の情報はほとんどありません。グローバルで見てもプロダクション環境での利用事例はまだほとんどなさそうでした。
このような背景を踏まえ、なぜ私たちが Atlas Stream Processing という新しいサービスを採用して、どのようなアーキテクチャを採用し、どうやって課題を解決したのかについて述べていきます。
おさらい
以前のブログ記事で既存のバッチシステムによる MongoDB から BigQuery へのレプリケーションの課題と、それを解消する Confluent Cloud を使ったリアルタイムレプリケーションについて記事にしました。ここでは簡単にそのシステムの概要と、解決できなかった課題についておさらいします。
システムのアーキテクチャは上のとおりです。それぞれの部分を簡単に説明すると次のとおりです。
- MongoDB Atlas の Database Cluster ごとに作成した Atlas Source Connector に Change Event を送信する
- Source Connector は Change Event を受け取って Kafka Topic に書き込む
- BigQuery Sink v2 Connector は Atlas Cluster ごとに作成した BigQuery テーブルに document 全体を書き込む
- 利用者は BigQuery Logical View を介して、スキーマ定義されたレコード(= document)を読み取る
Confluent Cloud 上にストリーミングパイプラインを構築し、Confluent の提供するマネージドな Kafka Connector を使うことで、フルマネージドなシステムを作りました。BigQuery の JSON 型のカラムに1つのドキュメントをまとめて書き込み、Collection に対応する Logical View として定義することで MongoDB のスキーマレス特性を活かした運用を実現しました。
残った課題
すでにいくつかのプロダクトではこのシステムを利用し、多くの点でメリットを感じていました。一方、前回の記事の “更なる取り組み” に書いたようにコスト面での課題が残りました。パイプラインを増やしていくと、コストがそれに比例して増えていくという非常に現実的な問題です。
このアーキテクチャではコスト構造は次のようになっています。
Total cost = Kafka cluster cost
+ Network usage cost
+ Pipeline number * Pipeline unit cost
パイプライン単位コストは次のようなものです。
Pipeline unit cost = Source connector unit cost + Sink connector unit cost
プレイドでは MongoDB に大量のデータは入れていないので、Kakfa クラスタリソース(CPU やメモリ)、ネットワーク帯域の量は大きな比重を占めていません。この中で大きな比重を占めたのは パイプライン数 * パイプライン単位コスト の部分です。特に MongoDB と繋ぐ Source Connector の単位コストが高めになっていました。
この構成ではパイプライン数はレプリケーションする Atlas Cluster が増えるに比例してコストが増えていきます。本番環境と同等の構成をステージング環境にも構築するため必要なパイプライン数は2倍になります。Managed Kakfa Connector は Connector を作成するとコストがかかるため、利用頻度が少ないステージング環境でも同等のコストがかかります。
さらにプレイドはマルチプロダクト化を進めていて、プロダクトごとに Atlas Cluster を作るケースが多いです。結果、プロダクトが増えると階段状にコストが増えていきます。結論として、このコスト構造のままではシステムをさらに展開していくのは困難でした。
MongoDB Atlas Stream Processing の採用
これらの課題を解決するため、MongoDB の提供する Atlas Stream Processing (以下、Stream Processing) を採用しました。具体的には、すでに構築したシステムの一部をこの Stream Processing で置き換えることで上で述べた課題を解決しました。
Atlas Stream Processing | MongoDB
Stream Processing は、MongoDB の Change Streams を利用したリアルタイムにデータの変更イベントを取得・処理できるストリーム処理基盤です。具体的には、MongoDB 上で発生したドキュメントの変更イベントを専用のクエリ API を利用してストリームとして取り出して、リアルタイム処理することができ、データ連携やパイプライン構築に利用できます。2025年4月時点では書き込み先として Apache Kafka クラスタと MongoDB Atlas Time Series コレクションがサポートされています。
以下では Stream Processing の特徴を紹介しながら、採用した理由について述べていきます。
Apache Kafka への書き込み
当初、Confluent Cloud 以外のマネージド Kafka サービスへの置き換えを検討していました。しかし、これまでのシステムを再構築するのは不必要だと感じていました。金銭的なコストをのぞき、Confluent Cloud で構築したフルマネージドなシステムはとてもよくできていました。実際にいくつかのプロダクトでプロダクションで運用していました。また、Kafka Connector という仕組みを使う以上、パイプライン数に比例するというコスト構造は他の Kafka サービスでも同様でした。
そこでボトルネックとなっている MongoDB の Source Kafka Connector だけを置き換えられないかを検討しました。そしてこの Stream Processing はまさにそのユースケースにとてもよく当てはまるものでした。MongoDB から変更イベントを取得して Kafka に書き込むマネージドなサービスはまさに探していたものでした。
MongoDB の公式ブログに Kafka Source Connector の代替として使うケースが紹介されていて、検討段階から比較的強い確信を持つことができました。
It’s important to note that Atlas Stream Processing was built to simplify complex, continuous processing and streaming analytics rather than as a replacement for the Kafka Connector. However, even for the more basic data movement use cases referenced above, it provides a new alternative to the Kafka Connector.
ref: Atlas Stream Processing: A Cost-Effective Way to Integrate Kafka and MongoDB
コスト構造
Stream Processing の構成とコストについて説明します。
Stream Processing では一番外側のコンポーネントとして Stream Processing Instance (SPI) (①)があります。SPI を作るときにクラウドプロバイダー、リージョン、Tier を決めて、この組み合わせで単位コストが決まります。High Tier なインスタンスほど利用できる CPU と RAM が増えますが、1時間あたりの単位コストは増えます。
実際の総コストは SPI の中で動く Worker (②)の数によって決まります。Worker の数はストリーム処理ユニットである Stream Processor (SP) の数に比例します。具体的には 1 つの Worker は 4 つの SP を動かすことができ、4SP であれば 1 Worker、10 SP であれば 3 Worker となります。
Cost = Worker number * Instance unit cost
Worker number = Ceil( SP number / 4 )
この構造はパイプライン数に比例したコスト増加に困っていた私たちにとって非常に都合がいいものでした。インスタンスの Tier を選ぶこともできるので、「ステージング環境は Low Tier インスタンス」、「本番環境はリソースの多い High Tier なインスタンス」というコスト最適化も可能です。
パイプライン数に比例する構造であることに変わりはないのですが、比例係数が 1/4 になっていることと、そもそものインスタンスの単位コストが低いことで、コストを大幅に抑えることができました。
開発者体験のよさ
Stream Processing は MongoDB Atlas で提供されていて Atlas Database と非常に親和性が高いです。
プレイドはアプリケーションデータの保存先として Atlas Database を使っています。私たちはもともとセルフホストした MongoDB を利用していましたが、MongoDB Atlas に変えることでインフラを管理する手間を大幅に削減してくれました(参考)。Stream Processing も Atlas Database と同様にサーバーレスなサービスです。インフラを管理する手間はありませんし、新たなサービスを利用せずに MongoDB Atals で完結できるのはとても好都合です。
パイプラインの定義に利用する API や Syntax も MongoDB のクエリ記法と同じものを利用するので、学習コストも低く MongoDB のドキュメントモデルをベースとした概念でパイプラインを設定できるので開発者体験もよかったです。
また、より具体的な例だと、以前は同期する Atlas Cluster ごとに Change Stream の READ 権限を持ったアカウントを Kafka Connector のために払い出していました。Kafka Source Connector と Atlas Database の認証方法も限られており運用上の手間がありました。その点、Stream Processing なら Connection 作成時に Atlas Cluster の名前を指定するだけで連携ができるので、運用の手間が大きく下がりました。
新しいアーキテクチャ
Stream Processing を使った新しいアーキテクチャについて述べます。
一番のポイントは Kafka の MongoDB Source Connector を Stream Processing で置き換えた部分(①)です。加えて、さらなる最適化のためパイプラインの数だけ立ち上げていた BigQuery Sink v2 Connector を一つに集約しました(②)。これ以外に Stream Processing ではデータをバックフィルする仕組みが用意されていなかったため独自にアプリケーションを作成しました(③)。それぞれについて詳しく見ていきます。
Source Connector の置き換え
置き換え後の内部構成は次のようになっています。
Stream Processing では、Connection という論理的なオブジェクトで外部との接続を管理します。今回は Atlas Database ごとに Connection を作成しています。接続先が Atlas Database であれば接続の設定はとても簡単で専用のアカウントを払い出したりする必要はありません。一方、Kafka Cluster との接続のために1つだけ Connection を作成しました。この Connection の設定には Confluent Cloud との接続情報を設定しています。
次に、実際のストリーム処理を行う Stream Processor を作成します。Stream Processor への入力($source
)として、Atlas Database Connection を1つ指定します。Stream Processor では、前回の記事で述べた “Kakfa メッセージの変換”、“メッセージサイズ” に対応する変換処理やサイズチェック処理を実装しています。パイプライン処理は MongoDB の記法をそのまま利用できるので Kafka Connector での設定をそのまま流用できました。
最後にメッセージを Kafka に書き込みむように設定します。出力先の Connection($emit
)には Kafka Cluster Connection を指定します。この時、書き込む先の Topic を選択できるので対応する Topic を指定します。
実際の Stream Processor のパイプライン定義を抜粋すると次のようになっています。
{
"pipeline": [
{
"$source": {
"config": {
"pipeline": [
// 各種、必要なメッセージの変換処理
// 1. BigQuery CDC 形式への変換
// 2. サイズののチェック
// ...
]
},
"connectionName": "ConnectionForClusterA"
}
},
{
"$emit": {
"connectionName": "Kafka_Connection",
"topic": "KafkaTopicForClusterA"
}
}
]
}
Sink Connector の集約
Stream Processing への置き換えに加え、パイプラインごとに作成していた BigQuery Sink v2 Connector を一つにしました。
BigQuery Sink v2 Connector には、複数の Kakfa Topic をサブスクライブし Topic ごとに書き込む先の BigQuery テーブルを指定する機能があります(topic2table.map
)。これを使って1つの Sink Connector でこれまでと同じ機能を実現できます。また、BigQuery Sink v2 Connector は1つで高いスループットを出せるという情報を知ったので、パイプラインごとに Connector を作ることはやめました。
Sink Connector の並列数(Task 数)は簡単に増やすこともできるので、スループットが落ちた場合には管理画面か管理 API から並列数を増やすことで対応します。もちろん、並列数に応じてコストが増えるので注意は必要ですが、必要な時に簡単に Task 数を増減できる柔軟性があります。
バックフィル機構
2025年4月の時点では Stream Processing には Kafka Connector にある既存のデータを同期する機能(バックフィル)がありません。レプリケーションではこの機能は必須なのでいくつかの方法を検討して、最終的に独自にバックフィル機能を作りました。もともとフルマネージドな仕組みで作ることを前提にしていたので、新たにコードを書いてアプリケーションを作ることには少し迷いがありましたが、いくつかの理由から独自で実装することに決めました。
一つ目の理由は、「遠くない未来に Stream Processing にもバックフィル機構が実装されるだろう」と予想しているためです。それまでの間、自作したアプリケーションを使えればいいと考えました。Stream Processing は Kafka との連携を前提に設計されていて、Kafka Connector の代替としてのユースケースはかなり多いと考えています。そのような背景からバックフィル機構(mode=copy exists)と同等の機能はそのうち提供されると予想しており、実現したらそちらに切り替えればよいと判断しました。
二つ目の理由は、パイプラインの外側に独立した仕組みとして実装できそうだったからです。仕組みとして、
- Stream Processor の設定情報を Mongo Atlas 管理 API から取得
- 対象 Database の Collection の全 Document をすべて取得する。
- この時に上で取得した Stream Processor のパイプラインを設定を利用する。
- Kafka Message に変換して Kafka Client を使って Topic にパブリッシュする
というかなり単純な作りとしました。また、Mongo Source Connector はソースコードが公開されているので copy_exists の実装を参考にすることができました(github)。
メトリクスの取得
Production 環境で利用するためにはシステムの監視とモニタリングは欠かせません。特にストリーム処理のように常時稼働し続けるシステムでは、「止まっていないか」「データが流れているか」「スループットは落ちていないか」といった状態を継続的に観測できる仕組みが求められます。
プレイドでは、メトリクスの収集・可視化・アラートのすべてを一貫して Datadog に統一していますが、Stream Processing は現時点(2025年4月)では Datadog とのネイティブな統合機能を提供していません。また、Stream Processing 自体も監視機能はまだ限定的で、GUI 上では起動中か停止中か、失敗しているか程度の状態しか把握できず、メトリクスの取得方法も発展途上といえます。
そこで私たちは、Stream Processing の管理用 API を定期的に呼び出し、取得したメトリクスやステータスを Datadog に転送する専用の Bot を自作しました。この Bot は Stream Processor ごとの稼働状況、メトリクスをモニタリングし、Datadog に送信しています。そして、Datadog 側で一定条件を超えた場合にはアラートを発報する設計にしています。
この部分でも今後の機能追加である程度対応されるものだと予想しています。Datadog との Integration、ログ・モニタリング機能の向上などの更なる改善を期待しています。
まとめ
Stream Processing はバックフィル機構、メトリクス取得、管理画面など、今後の改善に期待したい部分はありつつも、Production 環境で利用できる安定性・信頼性はあると感じています。Stream Processing は MongoDB が提供しているので MongoDB との親和性が高く、さらに MongoDB Atlas を使っている組織にとって非常に導入しやすいです。MongoDB Atlas を利用している組織でストリーム処理を検討している場合、Atlas Stream Processing を一度試してみる価値は大いにあると思いました。
また、Stream Processing の出力先として Kafka 以外の選択肢も増えるとより多様なユースケースに対応できると感じました。MongoDB の Head of Streaming Products の Kenny Gorman 氏へのインタビューを見ると Kafka 以外への書き込みのリリースも予定しているようです。プレイドは特に Google Cloud の PubSub の利用ケースが多いので PubSub への書き込み対応も期待したいところです。
今回の取り組みで Atlas Stream Processing を活用して従来の Kafka Connector ベースのアーキテクチャで抱えていたコスト面での課題を解消できました。特にパイプライン数の増加に比例したコスト構造を解消できた点は大きく、よりスケーラブルなストリームレプリケーション基盤になっています。すでに Stream Processing を使って 20 以上のパイプラインを安定運用し一日に300万以上の変更イベントを処理して、BigQuery にデータをレプリケーションしています。これによりデータのリアルタイム性をさらに強化し、運用効率の向上とコスト削減を実現していきます。