MongoDB AtlasからBigQueryへのリアルタイムレプリケーション

はじめに

こんにちは、エンジニアインターンの八谷(やたがい)です。プレイドには2023年3月から10月の7ヶ月間インターンとして在籍し、前半はセキュリティチームにて、後半はプラットフォームチームにて開発に携わりました。

今回のブログではプラットフォームチームにて取り組んだ、異なるデータベース間のリアルタイムレプリケーションについて解説いたします。

MongoDB AtlasからBigQueryへのリアルタイムレプリケーション

背景

プレイドではアプリケーションのデータベースとしてMongoDB Atlas(以下MongoDB)、データ解析のためのデータウェアハウスとしてBigQueryを利用しています。MongoDBはドキュメント指向で高い柔軟性を持つ、アプリケーションデータの保管に適したNoSQLデータベースであり、BigQueryは大容量のデータのスケーラブルな分析を可能にするデータウェアハウスです。この二つのサービスを使い分ける中で、アプリケーションのデータをリアルタイムに解析する、つまりMongoDBからBigQueryにレプリケーションを行うニーズが必然的に生まれてきました。

今回のレプリケーション機構の実装前から、すでにプレイドではMongoDBからBigQueryへのレプリケーションが行われていました。具体的にはMongoDBからGoogle Cloud Storage (以下GCS)に一度データをコピーし、定期的にGCSからBigQueryにデータを送り込むという流れで実装されていたのですが、このレプリケーションの間隔は一日に一度であり、リアルタイム性が低いという課題が残っていました。

リアルタイム性の高いレプリケーションを行うためには、MongoDB側の変更を検知する仕組みと、BigQueryテーブルの効率の良い更新手法が提供されていなければなりません。MongoDBにはコレクションごとに変更を検知して関数を実行できるDatabase Triggerが提供されています。またBigQueryでは、API経由でテーブルをストリーミング更新する方法としてStorage Write APIが提供されています。Storage Write APIは、従来のStreaming Insert APIと比較して高速、低コストであり、At-least-onceセマンティクスなど、より最適化された書き込み方法がサポートされています。

また、レプリケーションは単にデータを追加していくだけでは実現できず、元データベース(この場合MongoDB)でのデータの削除や更新にも対応する必要があります。2022年下旬に、Storage Write APIを用いてUpsert/Deleteを可能にする「BigQuery CDC」が追加されました(2023年10月現在pre-GA機能)。このCDCでは、通常のテーブル変更の操作に_CHANGE_TYPEという仮想カラムを設定することで、レコードの更新や削除を可能にします。

これらの機能を組み合わせれば、よりリアルタイム性の高いレプリケーションを実現することができると考え、検討段階に入りました。

設計時の制約

MongoDBの要件

プレイドではマイクロサービスを採用しているため、MongoDBのアプリケーションデータも複数のデータベース、複数のコレクションに跨って保管されています。つまり、レプリケーションの際には複数のコレクションの変更を検知できるような機構を設計する必要があるのです。また、MongoDBのDatabase Triggerはコレクションごとにしか設定できず、それらの運用が破綻しないような設計を考える必要があります。

BigQuery CDCの要件

BigQuery CDCを利用するには以下の要件を満たす必要があります

  • デフォルトのストリームでStorage Write APIを使用する
  • 書き込み先のテーブルに主キーを設定する
  • 書き込み先のテーブルをクラスタ化する
  • _CHANGE_TYPEという仮想カラムを設定してUpsertもしくはDeleteを指定する

https://cloud.google.com/bigquery/docs/change-data-capture?hl=ja#prerequisites

また書き込みの際に重要な制約として

  • At-least-onceセマンティクスを使用すること
  • 書き込み先テーブルが事前に作成されていること

以上の二つが挙げられます。通常のStorage Write APIには書き込みの際にCREATE_IF_NEEDEDという、指定されたテーブルがない場合は作成するオプションを設定することができるのですが、BigQuery CDCではサポートされていません。

実装方法の選択肢と検討

Database TriggerとStorage Write APIを使ったレプリケーションの手法としては主に以下の4つの選択肢が存在し、それぞれについて検討を行いました。

1. Database Triggerの関数から直接書き込み

MongoDBのDatabase Triggerの関数から直接BigQuery CDCを利用してBigQueryテーブルに書き込むことができれば、Pub/Subなど他のコンポーネントを挟まないため最もメンテナンスコストが低くなります。しかし、現状BigQuery CDCはJavaのBigQueryクライアントライブラリのみでサポートされており、Database Triggerの関数はNode.jsのみで記述可能であったため、検討の初期段階でコンポーネントを挟まない書き込みは断念せざるを得ませんでした。

2. Googleが提供するDataflowテンプレート

Dataflowとは、Google Cloud PlatformのETLサービスです。ETLとはあるシステムから別のシステムへのデータ共有のためのツールであり、Dataflowでは例えばGCSからBigQuery、MongoDBなど外部サービスからGCS, BQなど内部サービスへのデータ共有がサポートされています。

Dataflowにはあらかじめパッケージ化されたテンプレートが提供されています。MongoDB to BigQueryに関してもバッチ処理、ストリーミング処理両方のテンプレートが提供されており、今回はそのうちストリーミング処理のCDCテンプレートの利用を検討しました。

このテンプレートでは、以下の図1に示されるように、MongoDBのコレクションからPub/Sub、そしてDataflowを挟んでBigQueryのテーブルというデータの流れでレプリケーションが実装されています。

y_1.png
(図1. CDCテンプレートのフローの図)

ユーザーはMongoDBのURI、データベースとコレクション名、書き込み先のBigQueryテーブル、使用するPub/Subのトピック名とその他オプションを指定します。MongoDBコレクション to Pub/Subの部分は自らDatabase Trigger関数を実装する必要がありますが、スキーマなどはテンプレート側がよしなに調整し、BigQueryテーブルにはStorage Write APIを用いて書き込みが行われます。

このテンプレートを使うメリットとして、レプリケーション機構の多くの部分をテンプレートが担っており、私たちユーザーのメンテナンスコストが低い点が挙げられます。書き込みにStorage Write APIのAt-least-onceセマンティクスを使うよう指定できるオプションも提供されており、BigQuery CDCが使えるのではないかと考え実際にレプリケーションを行ってみたのですが、**_CHANGE_TYPEを自ら設定することができず、Insert以外の操作が行えなかったため**このテンプレートの採用には至りませんでした。

3. BigQuery Subscription

BigQuery Subscriptionは、Pub/Subトピックのサブスクリプションの種類の一つです。Storage Write APIがサポートされており、トピックにpublishされたメッセージをDataflowなどを挟むことなく直接BigQueryテーブルに書き込むことができます。これは、今回のレプリケーションのように、ETLでデータの変換作業が必要ない場合に有用です。

選択肢2のDataflowテンプレートと同様にMongoDBコレクション to Pub/Subの部分は自ら実装する必要がありますが、Pub/Sub to BigQueryテーブルの部分はDataflowを使う必要がないため一つコンポーネントが減り、よりメンテナンスコストが下がるというメリットがありました。

y_2.png

(図2. 選択肢2と選択肢3のフローの比較の図)

しかし、BigQuery SubscriptionはAt-least-onceセマンティクスを設定するオプションが存在せず、先に述べたBigQuery CDCの要件を満たしていないため採用には至りませんでした。

4. Dataflowパイプラインの自前実装

データのフローは選択肢2のDataflowテンプレートを用いる場合と全く同じですが、テンプレートを用いずにパイプラインを自前で実装することで、選択肢2では不可能だった_CHANGE_TYPEの設定を可能にしています。そもそも自分で全て書くので_CHANGE_TYPE云々だけでなくJavaのDataflow SDKで可能なことは全てできるわけですが、他の選択肢に比べてメンテナンスコストは高くなります。他の選択肢を検討した結果、現状BigQuery CDCを使用するには自分でパイプラインを作るしかないという判断に至ったため、なるべくDataflowとそのベースになっているApache Beam以外のライブラリは使わず、運用しやすいコードを目指して自前実装をすることにしました。

以上の検討から、MongoDBではDatabase Triggerで各コレクションの変更を検知してPub/Subトピックにメッセージをpublishし、Pub/Subトピックにsubscribeし、BigQueryテーブルに書き込みを行うDataflowパイプラインを自作する方向で実装を進めていくことが決まりました。以下にコレクション単位でのレプリケーションフローを図示します。

y_3.png

(図3. 今回の機構の最終的なフロー)

Database Trigger関数の実装

MongoDBのコレクションに変更が加えられたときに発火する関数を実装していきます。「MongoDBの要件」でも述べたように、トリガー自体はコレクション一つ一つに設定していく必要がありますが、関数は同じものを再利用することができます。再利用可能な関数にするために、発火時に渡されるchangeEventオブジェクトから変更元のコレクションとオペレーションタイプ(更新or削除)を取得し、Pub/Subトピックにそれらの情報を含むメッセージをpublishします。

import { PubSub } from "@google-cloud/pubsub";

const PROJECT_ID = "gcp-project-123";
const TOPIC_ID = "pubsub-topic-id-123";

const OP_TYPE_UPSERT = ["insert", "replace", "update"];
const OP_TYPE_DELETE = ["delete"];

exports = async (changeEvent) => {
  try {
    // SA Credential is stored as an environment variable on Atlas
    const pubsubClient = new PubSub({ projectId: PROJECT_ID, credentials: context.values.get("GCP_JSON_KEY_SECRET") });
    const _id = changeEvent.documentKey._id;
    const opType = changeEvent.operationType;
    let mutation_type = null;
    // BQ Row Mutation only accepts UPSERT or DELETE as its change type
    if (OP_TYPE_UPSERT.includes(opType)) {
      mutation_type = "UPSERT";
    } else if (OP_TYPE_DELETE.includes(opType)) {
      mutation_type = "DELETE";
    } else {
      console.log(`Change Event Ignored: opType ${opType} is neither UPSERT nor DELETE event`);
      return;
    }
    // Create bq table name out of changeEvent.ns, which has db name and a collection name
    const ns = changeEvent.ns;
    const destination_table = `${ns.db}-${ns.coll}`;
    // Parse provided hybrid logical clock into UTC-formatted timestamp
    const hlc = changeEvent.clusterTime.toJSON()["$timestamp"];
    const timestamp = new Date(hlc.t * 1000).toUTCString();
    // fullDocument field is only provided when a certain data is inserted or replaced, not when one is deleted
    const source_data = mutation_type === "UPSERT" ? JSON.stringify(changeEvent.fullDocument) : "";
    const data = { _id, timestamp, source_data, mutation_type, destination_table };
    const topic = pubsubClient.topic(TOPIC_ID);
    const dataBuffer = Buffer.from(JSON.stringify(data));
    const messageId = await topic.publish(dataBuffer);
    console.log("message published: ", messageId);
    return messageId;
  } catch(err) {
    console.log("error performing event-driven process: ", err.message);
  }
};

Database Triggerには外部パッケージを登録して関数内で使用できる機能があります。今回はPub/Subのクライアントライブラリを使用するため、前もって@google-cloud/pubsubパッケージを登録しておきます。

関数内ではまずPubSubクライアントを作成し、環境変数に設定したクレデンシャルで認証を行います。その後入力のchangeEventオブジェクトから変更されたドキュメントのIDとオペレーションタイプを取得します。オペレーションタイプの中にはコレクションやデータベース自体の削除などもありますが、今回はドキュメントの更新と削除の場合のみ対応します。

再びchangeEventからコレクション名とデータベース名を取得します。上のコードの場合、BigQuery側ではdestination_tableの名称のテーブルがあらかじめ作成されている必要があります。

次に変更時のタイムスタンプを取得するのですが、MongoDBではタイムスタンプのフォーマットとしてHybrid Logical Clockが使われています。これは分散データベース向けのタイムスタンプで、論理時計と物理時計を組み合わせたフォーマットで提供されます。changeEventの中では文字列として渡されるため、これをJSONに変換して物理時計の部分を取り出し、BigQueryテーブルにもTIMESTAMP型として挿入可能な形に変換します。

最後に、ドキュメントが更新された時のみ更新後のドキュメントを取得し、それらを含むメッセージを指定したトピックにpublishします。

Database Trigger関数のデプロイ

Database Triggerはrealmプロジェクトに内包され、realm-cliを使ってローカルからデプロイすることができます。検証段階では関数を変更するたびにコマンドを叩いてデプロイしていれば良いのですが、プロダクションでの運用となるとそうは行きません。

MongoDBはRealmプロジェクトをGitHubから直接デプロイできる機能を提供しています。当初はこの機能を利用して、リポジトリの変更があるたびに自動的にデプロイされるように設定することを検討していたのですが、ここで問題が発生しました。

上の実装コードにもあるように、PubSubクライアントの認証に使うクレデンシャルは環境変数に保存しています。しかし、Realmプロジェクトをrealm-cliでpullすると、環境変数の内容も含まれてしまうのです。このままGitHubに上げてしまうとクレデンシャルが露出することになりますが、環境変数を消して上げると今度は環境変数のないプロジェクトがデプロイされてしまい、クラウド上の環境変数も消滅してしまいます。

この問題に対応するために、GitHubリポジトリ内からは環境変数を消し、CIで環境変数を追加した後MongoDBにデプロイするという手法を取りました。MongoDBが提供する自動デプロイ機能は使えませんが、これが現状最もセキュアなデプロイ方法だと考えています。

トリガーが大量発生する問題と対応

先にも述べたように、Database Triggerはコレクション一つ一つに設定する必要があります。図3のコレクション単位のレプリケーションフローは至ってシンプルに見えますが、コレクション単位ではなくプロジェクト全体で見るレプリケーションフローは以下のようになります。

Y_4.png

(図4. プロジェクト全体で見るレプリケーションフロー)

上記で説明したきたような手法を愚直に実装すれば、コレクションの数だけDatabase Trigger、Pub/Subトピック、Dataflowパイプラインが発生することになります。図4では5つほどのコレクションが図示されていますが実際には50を超えるコレクション数に対応する必要があります。これでは到底プロダクション運用には耐えられません。また、書き込みを行うにはBigQueryテーブルがあらかじめ作成されている必要があるため、新しいコレクションがMongoDB側で作らると同時に、対応するBigQueryテーブルを作成しておく必要があります。

Pub/SubトピックとDataflowパイプラインの問題については、以降の「Dataflowパイプラインの実装」で解説しますが、Dynamic Destinationという機能を利用して解決することができます。

Database Triggerが大量発生する問題については、直接的な解決策はありませんが、少なくとも関数そのものは一つに収めることができます。その関数を使うよう各コレクションにトリガーを設定するためのconfigファイルはどのみち逐一作成する必要があるのですが、そこはCIで自動化することができます。以下でその方法をざっくり説明します。

Realmプロジェクトのリポジトリに、トリガーを設定したいコレクションを羅列したテキストファイルを作成します。CIはそのファイルの更新を検知し、新しくコレクションが追加されたらそのコレクションに対してDatabase Triggerを設定するconfigファイルを追加し、デプロイします。それと同時に新しいコレクションに対応するBigQueryテーブルを作成することで、書き込み時のエラーを回避できます。

他の解決策として、定期的にMongoDBのコレクションを列挙するクローラをCloud Functionsなどで走らせておき、その都度全てのコレクションに対応するBigQueryテーブルが存在するかどうかを調べて、なければ新しく作成するという手段があります。これは手動でリストに追加する必要がないため楽ではありますが、毎度BigQueryにコレクションの数だけ問い合わせを行うのが非常に非効率であったため、CIで巻き取る手法を採用しました。

Dataflowパイプラインの実装

Pub/Subトピックからデータを受け取り、実際にBigQueryへの書き込みを行うDataflowパイプラインを実装していきます。2023年10月時点でBigQuery CDCをサポートしているのはJava SDKのみのため、Javaで実装します。

パイプラインでは、Pub/Subトピックから受け取ったメッセージのスキーマを定義するクラスと、先に述べたDynamic Destinationのクラス、そして実際にBigQueryテーブルに書き込みを行いパイプラインをデプロイする処理を記述する必要があります。

スキーマ定義クラスの記述

Pub/Subトピックから受け取ったメッセージのスキーマ定義クラスに関しては、Database Triggerの関数で作ったデータ型をBigQueryのTableRow型に変換するというのが主な役割ですが、BigQuery CDCを実現するために追加のメソッドが必要になります。これらを記述したのがMutationSchemaクラスです。

@DefaultCoder(AvroCoder.class)
  public static class MutationSchema {
    public String id;
    public String source_data;
    public String destination_table;
    public String timestamp;
    public MutationType mutation_type;

    public MutationSchema() {}

    public MutationSchema(String id, String source_data, String timestamp, String mutation_type, String destination_table) {
      this.id = id;
      this.source_data = source_data;
      this.destination_table = destination_table;
      this.timestamp = timestamp;
      if (mutation_type.equals("UPSERT")) {
        this.mutation_type = MutationType.UPSERT;
      } else if (mutation_type.equals("DELETE")) {
        this.mutation_type = MutationType.DELETE;
      } else {
        throw new IllegalArgumentException("Unexpected Mutation Type");
      }
    }
    
    public static TableRow ConvertToTableRow(MutationSchema x) {
      TableRow tRow = new TableRow()
        .set("id", x.id)
        .set("source_data", x.source_data)
        .set("timestamp", new Timestamp(System.currentTimeMillis()).toString());
      System.out.println("converted tRow following:");
      System.out.println(tRow);
      return tRow;
    }
    public static RowMutationInformation MutateRow(MutationSchema x) {
      RowMutationInformation rmi = RowMutationInformation.of(x.mutation_type, System.currentTimeMillis());
      return rmi;
    }
  }

一番下に記述したメソッドMutateRowがBigQuery CDCを実現するキモになります。先ほどから採算_CHANGE_TYPEが云々などと言ってきたのですが、実はTableRowに_CHANGE_TYPEカラムを入れることはなく、Java SDKではRowMutationInformationというクラスでBigQuery CDCを実現します。2023年10月時点でこの部分を解説したドキュメントは存在せず、JavaDocとソースコードを読んでクラスの使い方を理解する以外に方法はなさそうです。

Dynamic Destinationの記述

Dynamic Destinationとは、Pub/Subトピックから受け取ったメッセージに応じて、書き込むBigQueryテーブルを動的に変更できる機能です。これを利用することで、大量発生するはずだったPub/SubトピックとDataflowパイプラインはそれぞれ一つで済むのです。

y_5.png
(図5. Dynamic Destinationを利用した際のフロー)

public static class MyDynamicDestination extends DynamicDestinations<MutationSchema, String> {
  @Override
  public String getDestination(ValueInSingleWindow<MutationSchema> element) {
    return element.getValue().destination_table;
  }
  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(
      new TableReference().setProjectId(PROJECT_ID).setDatasetId(DATASET_ID).setTableId(destination),
      "destination table" + destination
    );
  }
  @Override
  public TableSchema getSchema(String destination) {
    TableSchema schema = generateTableSchema();
    return schema;
  }
}

public static TableSchema generateTableSchema() {
  List<TableFieldSchema> fields = new ArrayList<>();
  fields.add(new TableFieldSchema().setName("id").setType("STRING"));
  fields.add(new TableFieldSchema().setName("source_data").setType("STRING"));
  fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
  TableSchema schema = new TableSchema().setFields(fields);
  return schema;
}

入力のMutationSchemaクラスが持っているdestination_tableをそのまま宛先テーブル名として用います。Database Triggerの関数でデータベースとコレクション名をつなぎ合わせたものがdestination_tableなので、コレクションごとにBigQueryのテーブル名はユニークになります。DynamicDestinationに関してはこちらのブログが参考になります。

テーブル書き込みとパイプラインデプロイの記述

パイプラインのオプションなどは別途クラスを作る必要がありますが、基本的な処理は上のクラスを定義できれば動くはずです。

public static void main(String[] args) {
    PipelineOptionsFactory.register(MyPipelineOptions.class);
    MyPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(MyPipelineOptions.class);

      options.setProjectId(PROJECT_ID);
      options.setRunner(DataflowRunner.class);
      options.setRegion(REGION);
      options.setGcpTempLocation(GCP_TEMP_LOCATION);
      options.setStreaming(true);
      options.setJobName(JOB_NAME);
      options.setPubSubSubscription(PUBSUB_SUBSCRIPTION);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getPubSubSubscription()));
    PCollection<MutationSchema> tRow = messages.apply("TransformToMutationSchema", ParDo.of(new DoFn<PubsubMessage, MutationSchema>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        PubsubMessage document = c.element();
        String payload = new String(document.getPayload(), StandardCharsets.UTF_8);
        JSONObject jsonObject = new JSONObject(payload);
        MutationSchema row = new MutationSchema(
          jsonObject.getString("_id"),
          jsonObject.getString("source_data"),
          jsonObject.getString("timestamp"),
          jsonObject.getString("mutation_type"),
          jsonObject.getString("destination_table")
        );
        c.output(row);
      }
    }));
    tRow.apply("WriteTableRowToBQ", BigQueryIO.<MutationSchema>write()
      .to(new MyDynamicDestination())
      .withFormatFunction(MutationSchema::ConvertToTableRow)
      .withRowMutationInformationFn(MutationSchema::MutateRow)
      .withCreateDisposition(CreateDisposition.CREATE_NEVER) // CreateDisposition.CREATE_IF_NEEDEDはサポートされていない
      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
      .withExtendedErrorInfo()
      .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
    );
    pipeline.run();
  }

BigQuery CDCではCREATE_IF_NEEDEDはサポートされてないため、明示的にCREATE_NEVERを指定し、STORAGE_API_AT_LEAST_ONCEでセマンティクスまで指定します。

今後の展望

ひとまず実現したかったリアルタイムレプリケーションは実現できたのですが、Database Trigger周りなどは特に、運用面での懸念点は多く残ります。私はインターンを卒業してプレイドを離れてしまいますが、今後発展を遂げてより良いツールに成長することを願っています。

また、検討段階で採用を諦めたBQ Subscriptionですが、2023年10月16日にCDCがサポートされました。Dynamic Destinationにはまだ対応していないため今すぐに今回のツールに組み込むことはできませんが、似たようなツールを開発する方はウォッチしておく価値はありそうです。