MongoDB to BigQuery: フルマネージドなリアルタイムレプリケーションの実現

こんにちわ。プレイドでエンジニアをしている塩澤(@hshiozawa)です。

今回は、MongoDB から BigQuery へのリアルタイムレプリケーションシステムを開発したので紹介します。以前にも MongoDB から BigQuery へのリアルタイムレプリケーションについて取り組みましたが Production 環境で運用するには課題がある状態でした。その後、全体の設計を見直し、フルマネージドなサービスのみを使って Production Ready なシステムができたので紹介します。

背景

MongoDB

プレイドでは様々なプロダクトを開発していて、状況に応じて様々なデータベースを活用しています。その中でも顧客の設定情報など、アプリケーション用の汎用データには MongoDB をよく利用しています。プレイドでは多くのアプリケーションが Node.js で書かれていて、それと親和性が高いのもよく利用する理由の一つです。

もともとセルフホストした MongoDB を使っていましたが、現在は DBaaS の MongoDB Atlas を利用してフルマネージドな MongoDB を運用しています。

株式会社プレイド、MongoDB Atlas と Google Cloud で、消費者行動のインサイトを強化

MongoDB の運用については過去にもいくつかの記事を書いているので参考にしてください。

BigQuery

MongoDB と同じように多くのプロダクトで使っているのが BigQuery です。ほぼすべてのプロダクトで BigQuery を利用しているといっても過言ではありません。イベントデータの保存から解析にいたるまで様々なユースケースで利用しています。プレイドでは初期から BigQuery を利用していて、過去にはサンフランシスコで行われた Google Cloud Next で CTO 牧野が BigQuery を使ったサービスの構築例について登壇しています。

他にもプレイドには BigQuery の豊富な知見があり、MongoDB 同様にさまざまな記事も公開しているのでぜひ参考にしてください。

バッチによるレプリケーション

このような理由からプレイドでは BigQuery を使って MongoDB のデータを参照したいケースが多くあります。そのような理由から当初、MongoDB のデータを BigQuery にレプリケーションする仕組みを実装しました。バッチ処理によって MongoDB のデータを BigQuery にエクスポートするというものです。

batch.png

具体的には MongoDB 上のコレクションを対象にしてドキュメントを抽出・フィルタするアプリケーションがあり、 GKE の CronJob で定期的に実行します。ドキュメントは Line Delimited JSON(ldj)形式に変換してファイルとして Google Cloud Storage にアップロードします。BigQuery の外部テーブル機能を使って ldj ファイルを参照してクエリできる、という仕組みです。

一部では外部テーブルでパフォーマンス要件が満たせない場合もあります。その場合は、抽出・フィルタしたドキュメントを BigQuery のネイティブテーブルにロードしてから利用しています。

課題

既存のバッチによるレプリケーションは構成がシンプルであり、リトライやトラブル時の復帰が容易であるというメリットがありました。このような運用のシンプルさが大きなメリットです。ジョブが特定の時間に正常終了していることを確認すれば、データの品質の検証も簡単です。しかし、プレイドが提供するプロダクトの数が増え、レプリケーションするデータ量が増加し、データの利用用途も多岐にわたるようになりました。さらにマルチリージョンでのレプリケーションも必要になったころから、バッチによるレプリケーションに課題が顕在化しました。

リアルタイム性

バッチによるレプリケーションの最大の欠点はリアルタイム性の欠如です。もともと BigQuery にレプリケーションしたデータは日々の解析処理に利用されていました。その場合は解析ジョブの実行時までのデータが反映されていれば十分でした。しかし、次第に利用状況の確認、トラブル時の調査に利用されるようになりました。加えて、顧客が設定データを BigQuery から参照できる機能も提供され、解析処理がより高度になることで鮮度の高いデータがほしいという声が増えてきました。

レプリケーションコストの増加

バッチによるレプリケーションでは変更のないデータも常に転送する必要があります。そのため、データが増えるにしたがってレプリケーションにかかる時間的コスト、転送コストの増大が見られるようになりました。また、一部のユースケースではレプリケーション量が増えたことでアプリケーションの Out of Memory エラーなどが発生し対応に追われるなどして、メンテナンスのためのコストも増えていました。

スキーマ管理

MongoDB は基本的にスキーマレスなデータベースです。一方、BigQuery はスキーマが固定されたデータベースです。このギャップを埋めるために旧来の方式ではバッチ処理時にスキーマを決めていました。最初に BigQuery 側のスキーマを決めておき、それに合わせてレプリケーションロジックを実装していく方式です。

そのため、スキーマを事前に定義しておく必要があり、MongoDB 側で新たなコレクションやフィールドを追加した際の手間が大きく開発者体験が損なわれていました。必要な Mongo のデータベース・コレクションをコード上で追加、レプリケーションジョブをデプロイしなおしてから再実行する必要がありました。

ストリーミングによるレプリケーション

これらの問題を解消するためストリーミングによるレプリケーションを再設計しました。ポイントは次のとおりです。

Confluent Cloud Kakfa を利用

ストリーミングプラットフォームとして Confluent Cloud の Kafka を利用しました。Confluent Cloud はフルマネージドな Kafka を提供しているサービスです。Kafka は 2010 年頃から開発され、今では多種多様な業界でストリーミングプラットフォームとして利用されています。

作りたいのは MongoDB の Change Stream Event を使って変更を検知し、BigQuery に書き込むという比較的シンプルなストリームパイプラインです。しかし、Change Stream Event の位置記憶(watermark の保持)、スケール時の並行処理の考慮、エラー発生時のリトライ、パイプライン起動時のフル同期(バックフィル)、などの実運用に耐えるものを作るにはこれらの点を考える必要があります。Kafka を利用することでこれらのかなりの部分をカバーできることを期待して採用しました。

プレイドはこれまで Production 環境で Kafka を利用したことがなくセルフホストで Kafka クラスタを運用するのは現実的ではないと考え、フルマネージドの Kafka が利用できるサービスを探しました。Google Cloud や AWS も Managed な Kafka を提供していますが、一番運用が簡単そうな Confluent を選択しました。また、マネージドな Kafka Connect が利用できる点も Confluent を選んだ理由です。

パイプラインの構成

pipeline.png

パイプライン構成は上記のようになっています。

MongoDB と BigQuery のそれぞれに接続するために Confluent の提供する Fully Managed Connector を利用しました。

MongoDB Atlas Source Connector (以下、Mongo Connector) には過去のデータのバックフィル機能が備わっています。設定で startup.mode=copy_existing を指定するとすでに存在するドキュメントすべてを Kafka Topic に送信してから、Change Stream を使ったストリーミングが始まります。MongoDB を BigQuery にストリーミングする手段はいくつかあったのですが、バックフィルを他のコンポーネントを使わずにオプション一つで実現できるのはとても有用です。

Google BigQuery Sink V2 Connector for Confluent Cloud (以下、BQ Connector) は Storage Write API をつかった CDC による書き込みが実現できるのがよい点です。以前までの BigQuery は UPSERT を苦手とする印象がありましたが、これを利用することでドキュメントの削除や更新をダイレクトに反映することができます。また、Storage Write を利用しているので書き込みのパフォーマンスもかなりよいです。

MongoDB や BigQuery の Connector は他にもいくつか種類があります。MongoDB が提供するオープンソースのものDebizeum ベースのものが存在していて、これらを Custom Connector として利用できます。ただ、Fully Managed Connector は terraform や管理画面で設定を作るだけで利用できます。加えて、Confluent によってテストされておりトラブル時のサポートも受けられる点は大きな利点です。

BigQuery でのスキーマレス

スキーマ管理の問題を解決するためにBigQuery のネイティブ JSON 型を利用し、ドキュメント全体をすべて一つのカラムに保存することにしました。

schemaless.png

MongoDB Atlas の一つのクラスタに対して一つの BigQuery テーブルが対応します。Atlas クラスタ内のドキュメントはすべて、一つの BigQuery テーブルの fullDocument という JSON 型のカラムに保存するようにしています。

こうすることで同期時点でドキュメントのスキーマを考える必要がなくなりました。常にすべての MongoDB のドキュメントが BigQuery に同期されるようになります。新しく追加されたフィールドは、その段階ですぐに BigQuery のテーブルの fullDocument に入ります。

ただ、毎回 fullDocument の JSON にクエリを書くのは大変です。そこで下記のようにコレクションごとの View を作って扱いやすいかたちにしています。View をつくることでスキーマありの BigQuery のようなシンプルなクエリを書くことができます。また、新しいフィールドが必要になれば View を更新するだけで済みます。参照先のテーブルにはすべてデータがあるからです。

# Create View
CREATE VIEW View1 AS
SELECT
  fullDocument._id as id
  fullDocument.field1 as field1
  ...
FROM TableA
WHERE database = 'DatabaseX' AND collection = 'Collection1'

# Simple query for view
SELECT * FROM View1 WHERE field1 = 'value1'

もちろん、この方式にはいくつかデメリットがあります。まず、データの更新時には fullDocument カラムを更新するかたちになります。そのため、ドキュメントをフィールドごとに管理した場合よりも、更新時のコストは大きくなるでしょう。また、クエリ時にはネイティブとはいえ JSON をクエリするのでスロットをより多く利用していると思います。(両者とも実測はしていませんが)。

ただ、これらはマテリアライズドビューなどの方法を用いることで緩和できます。今回のユースケースを考えた場合には前者のメリットが後者のデメリットを大きく上回っていると判断しました。

実装のポイント

実際にシステムを構築するときに解決したいくつかのポイントについて書いていきます。

Kafka メッセージの変換

最終的に BigQuery に書き込むためにはパイプラインでのデータの変換が必要です。具体的には MongoDB の Change Stream Event を BQ Connector の期待する形式に変換する必要がありました。

Kafka Connect には Single Message Transforms (SMTs) という Connector 内でデータを変換する仕組みがあります。その名のとおり Single Message に対しての変換です。メッセージを集約するなどの変換はできませんが、メッセージ単位であればキーやバリューの加工ができるようになっています。ただし、Confluent の Fully Managed Connector は Apache Kafka の SMT と比較して制限があるため注意が必要です。

BQ Connector を使ってネイティブ JSON 型のカラムに書き込むためには JSON を stringify する必要がありました。この機能は直接的に SMTs には備わっていません。より自由度の高い加工ができる ksqlDB を導入すると JSON を stiringify することができますが、追加のコストがかかります。

今回は ksqlDB は使わずに Mongo Connector のスキーマ設定と SMTs でできるいくつかの変換を組み合わせることで BQ Connector の期待する形式に変換しました。具体的には output.schema.infer.value=falseとすると fullDocument の JSON が stringify された状態を保つことができます。これを利用して ksqlDB なしでのパイプラインを実現しています。

メッセージサイズ

検証のため Production 環境の MongoDB クラスタのレプリケーションを開始したところ、ごく一部のドキュメントサイズが大きすぎて Mongo Connector が停止してしまう問題が発生しました。MongoDB のドキュメントサイズの上限は BSON で 16MB ですが、Confluent の Kafka クラスタの扱えるメッセージサイズの上限は通常 8MiB です(Dedicated クラスタ除く)。

この問題には Mongo Connector に設定できる Change Stream の pipeline にドキュメントサイズをチェックする機構を導入して解決しました。大きすぎるドキュメントは中身を空にして同期する方式にし、BigQuery テーブルのステータスカラムを ERROR と記録しました。

"pipeline": [
  {
    "$addFields": {
      // ドキュメントサイズが大きすぎると Confluent Kafka でエラーになる。
      // Confluent Kafka でこの種のエラーハンドリングが難しいので
      // MongoDB から取得する段階でフィルタする
      "fullDocument": {
        "$cond": {
          "if": { "$gt" : [{ "$bsonSize" : "$fullDocument" }, 6291456] },
          "then": { "_id" : "$documentKey._id" }
          "else": "$fullDocument"
        }
      }
    }
  }   
]

当初、この問題に対して Kafka のメッセージ圧縮機能や Kafka Connector の Error Handling の仕組みを利用することを検討しました。しかし、通常のクラスタでは圧縮機能が自由に利用できないこと、メッセージサイズの上限エラーは Connector Error Handling が利用できないことが分かり、pipeline でフィルタする方法にしました。

Dedicated クラスタを利用するとメッセージサイズの上限を 16MiB にすることができます。ただ、ごくごく一部のドキュメントのために Dedicated クラスタのコストを払うのは合理的ではないと判断しました。

BigQuery change data capture (CDC) での更新

BigQuery Sink v2 Sink Connector を利用すると BigQuery change data capture (CDC) を利用でき、レコードの UPSERT や DELETE が実現できます。ただし、BigQuery CDC を実現するためには以下の要件を実装する必要があります。

  1. 書き込みに Storage Write API を使用する
  2. テーブルに主キーを決める
  3. 書き込み時に疑似列を設定する
    1. _CHANGE_TYPE (必須)
    2. _CHANGE_SEQUENCE_NUMBER (オプション)

BQ Connector を使って ingestion.mode=UPSERT_DELETEと設定すると 1 については実現できるので 2 と 3 について実装する必要があります。

2 の主キーについては database, collection, documentKeyの組み合わせを主キーとしました。当初、documentKey のみを主キーとしましたが、いくつかのコレクションで documentKey が重複することが分かったためです。MongoDB の仕様上、コレクション内で _id は一意ですが、コレクションをまたぐと一意でないケースがあります。今回はドキュメントをアプリケーションでコピーした場合に同一の _id となることがありました。

疑似列については BQ Connector 側で SMTs を使い _CHANGE_TYPE_CHANGE_SEQUENCE_NUMBER に値をいれるようにしています。_CNAHGE_TYPE は Change Stream の operationType に応じて UPSERTDELETED をいれます。_CHANGE_SEQUENCE_NUMBER は Change Stream Event の clusterTime規定された仕様にあわせて変換しています。こうすることでレコードの更新の順序が入れかわることを防ぎます。

BigQuery CDC テーブルでの block pruning

当初、database, collection でクラスタリングを設定していました。これは collection ごとにビューを作ったときに block pruning を有効にして実行を効率化することを目指していました。しかし、実際に検証するとクラスタリングを設定しても block pruning は有効になっていないことが分かりました。Google Cloud のドキュメント上では下記のように記載されていて block pruning が有効になるものと考えていました。

All three types of BigQuery CDC jobs take advantage of BigQuery clustering,

この件を Google Cloud のサポートに問い合わせたところ、この文は block pruning ではなく他の最適化のことを指していることが分かりました。結果としてクラスタリングを設定するだけでは block pruning は有効にならないことが分かりました。

block pruning を有効にするためには max_staleness を設定する必要があります。CDC テーブルはマスターテーブルと差分テーブルに分かれていて、クエリ時に最新の結果を得るために自動でこの二つのテーブルのマージクエリが実行されます。max_staleness を有効にすると、ある程度の古い結果が返ることを許容することになりますが、マージクエリを避けマスターテーブルにだけクエリすることができます。

実際に max_staleness を設定すると block pruning が効くようになりました。ただ、この値は最小は1分から設定できますが、検証したかぎりは10分以上に設定しないとマージクエリが走ってしまって block pruning が効かないことが分かりました。この点については Google Cloud 側にフィードバックしていて、今後マージクエリに対してもクラスタリングによる block pruning が有効になることを期待しています。

データの再同期

運用上重要なのは問題があった場合のデータの再同期です。何かしらの不整合・エラーが起きた場合にクラスタのデータを再同期したいケースが想定されます。再同期するための方法はいくつかあり、一番確実なのはパイプラインをすべて削除してから作りなおし再同期する方法です。

ただ、よりシンプルな方法として MongoDB Connector が保持している offset を削除する方法が使えることが分かりました。offset を削除すると Connector の状態をリセットして、データを再同期することができます。Kafka クラスタの REST API を利用できるので、下記のリクエストを発行すると再同期することができます。

POST /connect/v1/environments/{env_id}/clusters/{cluster_id}/connectors/{connector_name}/offsets/request
{
  "type": "DELETE"
}

パイプラインを削除せずに再同期ができるので BigQuery テーブルを削除せずにすみます。再同期中であってもデータが何も参照できない状態を防ぐことができます。ただし、この再同期は結果整合です。再同期がすべて終わるまでは再同期の後に発生したドキュメントの変更が反映されてない点は注意が必要です。実際には Github Actions からこのリクエストを実行できるようにしているので、何か問題があったときにはすぐに再同期が可能です。

更なる取り組み

ここまで書いたように積極的にマネージドなサービスを用いることで MongoDB から BigQuery へのリアルタイムレプリケーションを実現することができました。しかし、マネージドサービスを利用したことでコストの面では改善点があります。

このアーキテクチャでは同期する MongoDB Atlas クラスタが増えると、増えたパイプラインの数に比例してコストが増加します。Confluent においては Kafka のクラスタの利用料に加えて、Active な Kafka Connector ごとに利用料がかかるため、現状のコスト構造は

  • cluster_price + pipeline_num * pipeline_unit_cost

となっています。

プレイドはプロダクトごとに MongoDB Atlas クラスタを用意しています。現時点でもそれなりにプロダクトがありますが、今後さらに Atlas クラスタが増えていくと、コストの増加が問題になることが予想されます。

この点は Confluent のコスト体系の改善に期待しつつ、アーキテクチャレベルの改善にも取り組んでいます。もちろん、単純にパイプラインをまとめることでコストは抑えることができますが、管理・運用面、分かりやすさでのデメリットが大きくなります。そこで、論理的に独立したパイプライン構造を維持しつつ、コスト構造はパイプラインに比例しないシステムの構築に取り組んでいます。

まとめ

Confluent Cloud の Kafka を使って production ready な MongoDB から BigQuery へのリアルタイムレプリケーションを実現しました。ストリーミングアーキテクチャを採用することでリアルタイム性を確保し、データ量増加によるコスト増大やレプリケーション対象の追加に伴う手間を軽減しました。実際に10分ほどで新しい MongoDB クラスタの同期を開始できるようになっています。また、BigQuery のネイティブ JSON 型を活用することでスキーマ管理の課題を解消し、MongoDB のスキーマレス特性を活かしたシンプルな運用を実現できました。

実装ではパイプライン構築、エラーハンドリング、BigQuery CDC の利用、パフォーマンスの課題に直面しましたがうまく解決することができました。その中で Confluent や Kafka のノウハウ・経験を得られたこと、BigQuery CDC に対する深い理解ができたことも大きな成果です。

このシステムはすでにいくつかのプロダクトの Production 環境で利用し、安定して動作しています。今後、既存のバッチ処理によるレプリケーションを完全にこのリアルタイムレプリケーションに置き換える予定です。これによりデータのリアルタイム性をさらに強化し、運用効率の向上とコスト削減を実現していきます。