マルチプロダクトにおける全操作を網羅する監査ログ収集基盤:設計思想と実装 - 抽出編

はじめに


こんにちは。プレイドのKARTE Datahubチームでエンジニアをしている安海です。

この記事は「マルチプロダクトにおける全操作を網羅する監査ログ収集基盤:設計思想と実装 - 収集編」の続編です。
前回は監査ログの収集基盤について解説しましたが、今回は収集した監査ログをどのように抽出し、ユーザーに提供しているのか、そのアーキテクチャと技術的な工夫について詳しく解説します。
監査ログの抽出基盤を設計・運用する際に直面した課題と、それに対する解決策を共有することで、同様の課題に取り組むエンジニアの一助となれば幸いです。

抽出基盤のアーキテクチャ


監査ログの抽出基盤は、「他プロジェクトのデータが混在しない安全性」と「開発者の実装のしやすさ」を両立することを目指して設計されています。具体的には、以下のようなアーキテクチャを採用しています。

auditlog_table_function_architecture.png

なぜテーブル関数を採用したのか

まず、監査ログの抽出基盤でなぜ BigQueryのテーブル関数 を採用したのかについて説明したいと思います。テーブル関数は、SQLクエリをカプセル化し、関数として再利用できるBigQueryの機能です。通常のテーブルと同様にクエリ内で参照でき、さらに引数を受け取れる点が大きな特徴です。ビューと異なり、実行時にパラメータを渡せるため、同一のロジックを様々な条件で使い回すことができます。

例えば、以下のようにシンプルなテーブル関数を定義できます。

CREATE OR REPLACE TABLE FUNCTION project_dataset.get_project_logs(project_id STRING, start_date DATE, end_date DATE)
AS (
  SELECT *
  FROM `logs.audit_logs`
  WHERE project_id = project_id
    AND DATE(timestamp) BETWEEN start_date AND end_date
);

このテーブル関数は、クエリから次のように参照できます。

SELECT *
FROM project_dataset.get_project_logs('project-123', DATE('2023-01-01'), DATE('2023-01-31'))
WHERE severity = 'ERROR'
ORDER BY timestamp DESC
LIMIT 100;

テーブル関数を採用した主な理由は、クエリ実行時にパラメータを受け取れる柔軟性にあります。特に監査ログでは、ユーザーが任意の期間のログを抽出できることが重要な要件でした。テーブル関数を使用することで、日付範囲などのパラメータを実行時に指定でき、ユーザーは必要なデータだけを効率的に取得できます。

さらに、生ログは日時でパーティショニングされているため、日付範囲を指定することでパーティションプルーニングが働きます。この仕組みにより、不要なデータスキャンを最小限に抑え、クエリのパフォーマンスを向上させることが可能です。

また、SQLのロジックをテーブル関数内にカプセル化できるため、クエリの再利用性が高まることもメリットです。これにより、複雑なロジックを一箇所に集約し、メンテナンス性を向上させることができます。

テーブル関数の役割と設計

抽出基盤の中核となるのは3種類のテーブル関数です。これらの役割と設計思想について解説します。

1. プロジェクトログ抽出テーブル関数

この基盤層のテーブル関数は、引数で指定されたプロジェクトIDに対応するログだけを抽出する役割を担っています。他プロジェクトのログが露出するリスクを最小化するため、この関数は変更頻度が低くなるような設計としています。

単一の責務に特化することで、SQLの複雑性を抑制し、堅牢性を高めています。この層がセキュリティの要となるため、変更する際にはGithubで指定された CODE_OWNER のコードレビュープロセスを経る必要があります。

2. プロダクト別変換テーブル関数

プレイドでは複数のプロダクトを展開しており、各プロダクトでクライアント向け監査ログへの変換方法が異なります。そこで、プロダクトごとに専用の変換テーブル関数を用意し、それぞれのデータ構造に合わせた変換ロジックを定義できるようにしています。

実際のデータ変換の例はこのような感じです。例えば、以下のような生ログのJSON構造があるとします:

{
  "jsonPayload": {
    "user": {
      "id": "user_12345",
      "name": "山田太郎",
      "email": "yamada@example.com"
    },
    "http_request_meta": {
      "ip": "192.168.1.1",
      "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)..."
    },
    "resource": {
      "type": "dataset",
      "id": "dataset_789",
      "action_type": "update"
    },
    "detail": [
      {
        "detailType": "diff",
        "new": {"dataset_name": "分析用データセット2023"},
        "old": {"dataset_name": "分析用データセット"}
      },
      {
        "detailType": "fields",
        "value": {"created_at": "2023-05-01T09:00:00Z"}
      }
    ],
    "timestamp": "2023-05-01T10:15:30Z"
  }
}

このような生ログから、ユーザーに見せる監査ログとして以下のような形式に変換したいとします:

user_id ip_address action resource_type resource_id timestamp before_edit after_edit
user_12345 192.168.1.1 update dataset dataset_789 2023-05-01T10:15:30Z { "dataset_name": "分析用データセット" } { "dataset_name": "分析用データセット2023" }

このような変換を行うために、各プロダクトチームはどのフィールドをどのように抽出・変換するかをマッピング定義として記述します。これにより、生ログの複雑なネスト構造から必要な情報だけを取り出し、ユーザーにとって理解しやすい形式に整形できます。

この層では宣言的なアプローチを採用しており、各プロダクトチームは変換マッピングを定義するだけで、自動的にSQL文(テーブル関数)が生成される仕組みを整えています。これにより、SQLを意識しなくても監査ログの変換ロジックを実装できます。

マッピングの仕様:

  • 基本構造
    • map 内の key は整形後の監査ログのカラム名を指定します
    • columnName には生ログのデータパスを指定します
    • type にはデータ型(STRING, JSON など)を指定します
  • detail フィールドの処理
    • detail フィールドには比較的自由な形式のデータが格納されるため、特殊なマッピングを指定できます:
      • detailType - 処理対象のデータタイプ(diff/fields/error など)
      • detailFieldsKey - オブジェクト内の抽出したいキー名

実際に detail に格納されるデータの例
※ 同一 detailType のオブジェクトが複数入らないようにしています

[
  {detailType: "diff", new: {dataset_name: "hoge"} old: { dataset_name: "fuga" }},
  {detailType: "fields", value: { hoge: 1, fuga: "piyo" }},
  {detailType: "error", message: "this is error" }
]

以下は変換マッピングの例です。このマッピングでは、ユーザーID、IPアドレス、編集前の値などを抽出する方法を定義しています。

また、より柔軟なデータ抽出が必要な場合には、customセクションを利用してカスタムのSELECT文を記述できます。これにより、既存のパターンでは対応できない特殊な要件にも対応可能です。例えば、特定の条件に基づいてデータを加工したり、集約した結果を取得することができます。

{
  map: {
    user_id: { columnName: 'user.id', type: 'STRING' },
    ip_address: { columnName: 'http_request_meta.ip', type: 'STRING' },
    before_edit: {
      columnName: detail,
      type: 'JSON',
      detailType: 'diff',
      detailFieldsKey: 'old',
    },
  },
  custom: {
    selectQueries: [
      `TO_JSON(ANY_VALUE(IF(STRING(detail.detailType) = 'fields' AND CAST(jsonPayload.resource.action_type AS STRING) = 'create', detail.value, NULL))) AS created_value`,
    ],
  },
  version: '1'
};

このマッピング定義から、以下のようなSQLが自動生成されます。

SELECT
  CAST(jsonPayload.user.id AS STRING) AS user_id,
  CAST(jsonPayload.http_request_meta.ip AS STRING) AS ip_address,
  TO_JSON(ANY_VALUE(IF(detail.detailType = 'diff', detail.old, NULL))) AS before_edit,
  TO_JSON(ANY_VALUE(IF(STRING(detail.detailType) = 'fields' AND CAST(jsonPayload.resource.action_type AS STRING) = 'create', detail.value, NULL))) AS created_value
FROM
  sampleTableFunction(api_key, start_date, end_date)
  ...

3. プロジェクト別テーブル関数

最終的なデータ提供層として、プロジェクトごとにデータセットを分け、その中にプロダクトごとのテーブル関数を配置しています。この設計には二つの利点があります。

  1. 将来的にBigQuery sharing経由などでデータセットごとユーザーに提供する可能性への対応
  2. マッピング変更時の影響範囲の局所化

このテーブル関数は基本的に中間層(プロダクト別変換テーブル関数)を参照するだけの構造になっています。そのため、中間層の変換ロジックが変更された場合でも、すべてのプロジェクトデータセット内のテーブル関数を更新する必要がなく、運用負荷を大幅に削減できます。

テーブル関数の自動生成と管理

続いて、テーブル関数の作成・更新プロセスについて説明していきます。構成としては、以下のようになっており、基本的に自動化されています。

auditlog_table_function_automation.png

プロジェクトログ抽出テーブル関数の管理

基盤層のテーブル関数は、リポジトリ内のコード変更をトリガーにGitHub Actionsを通じて更新されます。(構成図の左側参照) この重要なコンポーネントはCODE_OWNERで厳格に管理されており、特定のレビュアーの承認がなければ変更できない仕組みになっています。

プロダクト別テーブル関数の自動生成

各プロダクトチームは、マッピング定義を更新するだけで、GitHub Actions経由でSQLが自動生成され、対応するテーブル関数が更新されます。(構成図の左側参照)

プロジェクト別データセット・テーブル関数の自動生成

新しいプロジェクトのデータが生ログに追加された場合、自動的に対応するデータセットとテーブル関数が作成される仕組みを実装しています。(構成図の右側参照) 具体的には、Cloud Schedulerで定期的にCloud Run関数を起動し、直近の生ログをスキャンして新規プロジェクトを検出します。

検出された新しいプロジェクトに対しては、データセットの作成とテーブル関数の生成が自動的に行われます。データセット・テーブル関数が作成済みのプロジェクトに関する情報は、BigQueryの簡易的なテーブルを使い以下のように管理しています。

project_id product version created_at
xxx talk 0 2025-04-30 02:50:19.420000 UTC
xxx datahub 0 2025-04-30 02:50:19.420000 UTC
xxx datahub 1 2025-04-30 02:51:19.420000 UTC

保守運用の工夫


監査ログ基盤の運用には、データの安全性確保と長期的な保守性の両立が求められます。ここでは、その課題に対応するための技術的な工夫を紹介します。

効率的なデータダウンロード機構

監査ログは大量のデータを扱うため、効率的なダウンロード機構が必要です。そこで、BigQueryのストリーミング機能を活用し、データを10MBごとに圧縮してクライアントに送信する仕組みを実装しています。

また、ユーザビリティ向上のための工夫として、生ログに含まれるprojectIdなどの単なる識別子を、ストリーミング処理中にアプリケーションのデータベースから取得したプロジェクト情報と結合し、 株式会社プレイド検証環境 のようなユーザーが理解しやすい表記に変換しています。

この仕組みは現状は十分に対応できていますが、予期せぬ大量データなどによりエラーが発生した場合は、Datadogにエラーを通知するようにしているため、迅速に対応できるようになっています。

多層的なセーフティネット

他プロジェクトのデータが混入するリスクを最小化するため、複数のセーフティネットを設けています。

まず、テーブル関数のセキュリティ監視として、Cloud Scheduler と Cloud Run 関数を組み合わせ、定期的にテーブル関数にアクセスして返されるデータを検証しています。他プロジェクトのデータが検出された場合は、即時 Datadog のアラートが発報される仕組みです。

さらに、ユーザーがデータをダウンロードするストーリミング処理にも、BigQueryで二重チェックを行い、他プロジェクトのログが含まれていないことを確認しています。万一の混入が検出された場合は、ダウンロード処理を中止する安全機構を導入しています。

継続的なサービス監視

監査ログは機能的な性質上、月に一回ダウンロードされるなど、日常的に頻繁に使用される機能ではありません。そのため、問題が長期間気づかれず放置されてしまうリスクがあります。この課題に対応するため、Datadog Synthetics (外形監視ツール)を活用して、画面から定期的にダウンロードを行い実際にファイルがダウンロードできるかのチェックしています。

これにより、実際のユーザーが機能を使用する前に問題を発見し、迅速に対応することができる体制を整えています。

テーブル関数のバージョン管理

監査ログのカラム構成や形式は、プロダクトの進化に合わせて変更される可能性があります。変更があった場合でも、既存のシステムやユーザーの利用を妨げないために、テーブル関数にはバージョン管理機能を導入しています。

マッピング定義にversionパラメータを指定することで、新しいバージョンのテーブル関数を追加しつつ、既存のバージョンも並行して維持できます。これにより、ユーザーは自身のニーズに合わせたバージョンを選択できるようになり、互換性を保ちながら監査ログを変更させることができるようになっています。

まとめ


監査ログの抽出基盤は、セキュリティとユーザビリティのバランスを取りながら、開発者が容易に実装・拡張できる設計を目指しています。多層構造のテーブル関数、自動生成の仕組み、複数のセーフティネットにより、安全かつ柔軟な監査ログ提供を実現しています。

マルチプロダクト環境では、新しいプロダクトの追加や既存プロダクトの変更が頻繁に発生しますが、この基盤により、各プロダクトチームは複雑なセキュリティ実装をほとんど意識せずに、安全に監査ログを拡張できます。プロダクト間の監査ログの一貫性を保ちながらも、それぞれのプロダクト特有の情報を柔軟に取り込める設計が、マルチプロダクト戦略を支える重要な要素となっています。

今後も新たな要件やプロダクトの追加に対応しながら、監査ログ基盤の進化を続けていく予定です。また、さらなる活用方法として、機密情報を適切に除去した上で社内の調査・分析用途に監査ログを活用することも検討しています。例えば、長期間使用されていないアカウントの洗い出しやプロダクト使用状況分析などを通じて、セキュリティの強化やサービス改善に繋げていくことが可能です。このように監査ログは単なる記録としてだけでなく、データドリブンな意思決定を支える貴重な情報源となる可能性を秘めています。