パーサー拡張・分散ジョブ制御・クエリ高速化 ── 内製OLAP「mila」の全レイヤーに挑んだ4ヶ月
Posted on
はじめに
こんにちは。
2026年の2月から5月まで、プレイドのCore Platformチームでエンジニアインターンとしてお世話になりました、高田晴成(たかた はるなり)と申します。慶應義塾大学環境情報学部を4月に卒業し、今年の9月からカナダの大学院で修士課程を始める予定です。
研究分野は耐障害性のある分散システムとデータベースで、プレイドが内製する分散OLAP(オンライン分析処理)データベースmilaに強く惹かれ、インターンに参加しました。
OLAPデータベースを内製する企業でのインターンは初めてで、プレイドの解析基盤として様々な仕組みを備えたmilaの開発に携わり、多くのことを学びました。この記事では、インターンでの取り組みと学びを振り返ります。
内製OLAPデータベースmilaとは
はじめに、私がインターンで開発に携わった内製OLAPデータベースの「mila」について説明します。
弊社ではこれまでに顧客データを中心とした様々な分析サービスを提供しており、蓄積されたデータの総量は8PBを超えています。エージェントを使った分析を含め、多種多様な顧客分析のニーズに対応してきましたが、その過程で、汎用的な分析ニーズを主眼に置いた既存の分析用DBでは解決が難しい課題があることも判明しました。
そこで、顧客分析に特化した内製OLAPデータベースの開発に着手していました。
顧客分析に特化した特徴の例として、ユーザーID統合、ユーザIDをキーにしたデータ配置、ファネル・リテンション分析などがあります。
milaの詳細に関する記事については、今後掲載予定です。
インターンでの取り組み
新規の独自拡張DDLの追加
2つのテーブル名をAtomicにスワップする以下のコマンドを実装しました。
ALTER TABLE <table 1> SWAP WITH <table 2>milaではSQLパーサーとしてApache Calciteを使用しており.ftl (FreeMarker Template Language)で記述することでカスタムのSQL構文ルールを追加することができます。
今回は、以下のルールを追加することでDDL文の拡張をしました。
SqlAlterTable SqlAlterTableSwap(Span s) :
{
boolean ifExists;
SqlIdentifier t1, t2;
}
{
ifExists = IfExistsOpt()
t1 = CompoundIdentifier() <SWAP><WITH> t2 = CompoundIdentifier()
{ return new SqlAlterTableSwapWith(s.end(this), ifExists, t1, t2); }
}次に、クエリを実際に実行する部分です。テーブル名などのメタデータはSpannerで管理しており、以下の3ステップをSpannerのトランザクション内で実行することで、Atomicなテーブル名の入れ替えを実現しました。
<tmp> <- <table1>
<table1> <- <table2>
<table2> <- <tmp>Jobのキャンセル機構の構築
milaにおいて、データベースのメンテナンスやデータの同期は、クエリ処理とは別にmila内のjob基盤で行われています。
具体的には、以下のようなjobの処理があります。
- データ取り込み - データソースのBigQueryからデータを同期
- ストレージ最適化 - 小さいファイルをマージ、不要ファイルを削除
- ガベージコレクション - 不要メタデータの削除
Jobの中には、処理に長時間かかるものもあり、その間に「やっぱり止めたい」と気づいても、途中で止める手段がありませんでした。
Jobのキャンセルができないと、間違ったジョブが完了するまでリソースを消費し続けてしまいます。特に、複数のコンポーネントから構成され、大量なデータを処理するmilaにおいてはこれは顕著な問題です。
そこで、Jobのキャンセル機構を実装しました。
当ブログでは、データの同期Jobを例に説明します。
同期Jobにおいては、主に以下の三つのコンポーネントが全体の処理を担います。
| コンポーネント | 役割 |
|---|---|
| Broker | 外部APIリクエストを受け付け、JobをRedisのキューに登録 |
| Controller | RedisからJobをpollし、定義されたステップを順番に実行して、状態をRedisに保存するバックグラウンドループ。 |
| Job-Worker | Redisからシャードタスクをpollし、BQ Storage Read APIでデータを取得して、milaフォーマット(body file / dict file)に変換しGCSに書き込むワーカー。 |
全体の流れは以下の通りです:
- クライアントがBrokerに対してJob開始のAPIを呼び出す
- BrokerがRedisにJobを登録
- ControllerがRedisをpoll、Jobを取得
- ControllerがシャードタスクをRedisのキューに登録
- Job-Worker
├─ Redisからタスクをpoll
├─ タスクを処理
└─ Redisに完了報告 - Controllerが全シャード完了を確認、Redisに完了フラグをset
基本方針
Redisに登録されたJobにキャンセルフラグを立て、各コンポーネントがpolling時にチェックするというシンプルな設計を採用しました。
- クライアントが /cancelJob API を呼び出す
- Broker が Redis キャンセルフラグをセット
- Controller /Job-worker が次のpolling時にフラグを検知
- 処理を中断し、リソースをクリーンアップ
キャンセルの検知はpolling形式で設計しました。ControllerもJob-workerも、もともとRedisを定期的にpollingする設計になっています。キャンセル通知のために新たなPush機構を入れるより、既存のpollingループにチェックを追加する方がシンプルで確実でした。
BigQueryジョブのキャンセル
ジョブによってはBigQueryなどの外部リソースへのクエリ発行を行うものもあります。
このようなケースでも、Controllerがキャンセルを検知したタイミングで、BigQuery側の処理に対して連動してキャンセルを実行します。
例えば、BigQueryへのクエリ発行後、その完了を待機している最中にキャンセル要求が発生した場合を考えます。このとき、Controllerは対象のBigQueryジョブIDを保持しているため、キャンセル検知と同時にBigQueryのAPIを呼び出して該当ジョブを即座に停止させることができます。この確実なキャンセル機構により、不要なクエリ実行に伴う課金を最小限に抑えることが可能です。
クエリの高速化
milaチームでは、内部的に他のDB(Clickhouse, Apache Druid, StarRocks)と性能比較を行なっており、特定のクエリパターンでは、パフォーマンスに改善の余地があることがわかっていました。
データソースをREES46として、ECサイトの商品アクセスランキングTOP100を取得するクエリを高速化するタスクを担いました。
SELECT
product_id,
COUNT() as access_count,
COUNT() FILTER(WHERE event_type = 'view') as view_count,
COUNT() FILTER(WHERE event_type = 'cart') as cart_count,
COUNT() FILTER(WHERE event_type = 'purchase') as purchase_count
FROM rees46_events
WHERE TIME_IN_INTERVAL(__time, '2019-10-01T00:00:00Z/2020-01-01T00:00:00Z')
AND product_id IS NOT NULL
GROUP BY product_id
ORDER BY access_count DESC, product_id
LIMIT 100高速化に取り組むにあたり、まずボトルネックを特定する必要がありました。そこでasync-Profilerを使用しました
async-profilerは低いオーバーヘッドで正確なデータを取得するために開発されたサンプリングプロファイラです。サンプリングは以下の手順で行われます。
- OSシグナルの送信:
Linuxの perf_events を使用し、対象のJavaスレッドに対して一定周期(例:10msごと)で SIGPROF(タイマーシグナル)を送信します。
- 割り込み処理の実行:
シグナルを受信したスレッドは、実行中の処理を一時中断し、割り込み処理プログラム(シグナルハンドラ)を実行します。
FlameGraph(フレームグラフ)は、収集したスタックトレースの統計データを視覚的に階層化して表示するグラフです。async-profilerで取得したプロファイリングの結果はFlameGraphに起こすことが可能です。
FlameGraphの見方は以下の通りです。
- 横軸(幅):全体のサンプリング数に占める、そのメソッドの実行時間の割合を示します。左から右への方向は時系列ではなく、アルファベット順などでソートされています。
- 縦軸(高さ):コールスタックの深さを示します。下から上に向かって、呼び出し元のメソッドから呼び出し先のメソッドへと積み上がります。
グラフ上で横幅が広く表示されているバーは、実行時間の占有率が高い処理、すなわちシステムのボトルネックの可能性が高い箇所です。
- 最上部に位置する広いバー:そのメソッド自体の処理(自メソッド内の計算など)に時間がかかっています。
- 下部〜中堅に位置する広いバー:そのメソッドから呼び出されている複数の子メソッドの実行時間の合計が長いことを示します。
この横幅の広いバー(占有率の高い処理)を特定し、該当コードのロジックや呼び出し頻度を見直すことで、効率的なパフォーマンスチューニングが可能になります。
async-profilerのプロファイリングを基に実施したパフォーマンス改善事例をいくつか紹介します。
ハッシュテーブルの高速化
1. メモリアクセス回数の削減による高速化
背景
Javaの標準的なHashMapはヒープ上にオブジェクトを生成するため、大量のグループキーを扱う場合にGC負荷が問題になります。milaでDirectBufferを使用してオフヒープメモリを直接操作することで、GCの負荷を下げ、安定した集計処理を実現しています。
// apache.datasketchesのWritableMemoryImplインスタンス
WritableMemory memory = WritableMemory.writableWrap(
directBuffer, // DirectByteBuffer(ネイティブメモリ)
ByteOrder.nativeOrder(),
memoryRequestServer
);対象の処理は単体では極めて短時間ですが、プロファイリングの結果、呼び出しの回数が非常に高いことが判明しました。したがって、1回あたりの処理時間をわずかでも削減することが、クエリ全体の高速化に大きく貢献すると判断しました。
変更内容
8バイトキーのハッシュ値計算において、memory.getInt() を2回呼び出していた処理を、memory.getLong() の1回呼び出しへと統合。取得した8バイトのデータから、CPUのビットシフト演算を用いて必要な値を抽出する方式に変更しました。
// 変更前:
// 1回目:メモリから低位の4バイト(int)を取ってくる
int lower_data = memory.getInt(position);
// 2回目:メモリから高位の4バイト(int)を取ってくる
int upper_data = memory.getInt(position + 4);
// ハッシュ値を計算して返す
return 31 * (31 + lower_data) + upper_data;// 変更後:
// 1回だけ:メモリから8バイト(long)を丸ごと一括で取ってくる
long combined_data = memory.getLong(position);
// 【CPU内部で処理】
// キャスト(型変換)するだけで、下位32ビットが取り出せる
int lower_data = (int) combined_data;
// 右に32ビット論理シフトするだけで、上位32ビットが取り出せる
int upper_data = (int) (combined_data >>> 32);
// ハッシュ値を計算して返す(計算式はまったく同じ)
return 31 * (31 + lower_data) + upper_data; 効果
コストの高いメモリアクセス命令の回数を半減させ、当処理を行う関数の実行時間が41.4%削減されました。
2. 8バイトアライメント調整によるハッシュテーブルアクセス速度低下の防止
背景
GROUP BYクエリでは、グループキー(例:user_id)ごとに集計値(例:COUNT(*)やSUM(amount))を保持する必要があります。このとき、数百万〜数千万のグループキーを効率的に管理するため、ハッシュテーブルを使用しています。
ハッシュ値計算とは別に、ハッシュテーブル本体へのキー・値の読み書き(getLong/putLong)もプロファイリングでホットスポットとして現れていました。コードを確認したところ、データが8バイト境界に揃っていないケースがあり、メモリにアラインされていない状態でアクセスを行っていることが判明しました。
変更内容
getLong/putLong を行うデータ構造(MemoryOpenHashTable)において、各オフセットとバケットサイズを8バイト境界に整列(アライメント)させる変更を行いました。
効果
メモリ使用量は増加するものの、境界またぎによるCPUの余分な読み込み・結合処理を排除し、アクセス速度の低下を防止しました。
3. ヒープの再構築のオーバーヘッドの除去
背景
「アクセス数の多い商品TOP 100」のようなクエリでは、数百万件のデータから上位100件だけを取得します。このとき、全データをメモリに保持してからソートすると、メモリを大量に消費してしまいます。この問題を解決するため、ヒープ構造(優先度付きキュー)が採用されていました。ヒープには常に最小値(または最大値)が先頭にあるという性質があります。
データ追加時の動作:
- 新しいデータが来たら、ヒープに追加
- ヒープのサイズが100件を超えたら、最小値(=100位以下)を削除
- 結果として、常に上位100件だけがメモリに保持される
この方式により、何百万件のデータが流れてきても、メモリには常に100件分しか保持しません。データ追加時のヒープ操作は高速(O(log N))なので、この段階では効率的です。
しかし、問題は最終結果を取り出す段階(drain処理)にありました。ヒープから要素を1件取り出すたびに、残りの要素を並べ替える「再構築」が発生します。100件取り出す場合、計100回の再構築オーバーヘッドが生じていました。FlameGraphにも大きな山として表示されており、ボトルネックであることは明らかでした。
変更内容
データの追加時はヒープを使い、取り出し時は別の方式を使うよう変更しました:
- 追加時: 従来通りヒープを使用(上位100件だけをメモリに保持)
- 取り出し時: 配列にコピー → 1回だけソート → 順次出力
効果
ヒープの少ないメモリで上位N件を追跡できるというメリットは維持しつつ、取り出し時の再構築オーバーヘッドを排除しました。結果、クエリの全体処理時間が0.7倍ほどに短縮されました。
milaの開発を通じて得た学び
4ヶ月間、milaの開発にどっぷり浸かり、多くのことを学びました。
ここでは、特に印象深かった3つの学びを紹介します。
1. データベース開発のイロハ
milaは、データフォーマットからクエリエンジンに至るまで、ほとんどのコンポーネントを内製開発している特徴的なデータ基盤です。
プレイドが抱える膨大なトラフィックを捌くためのチューニングや、OLAPならではの大規模データを高速に処理するための工夫が、システムの随所に施されています。
普段データベースを利用したり、概念を勉強しているだけではわからない、実装上の緻密な最適化やデータ構造・アルゴリズムがパフォーマンスに与える直接的な影響を、身をもって体感することができました。
クエリの高速化を例に挙げると、以前は「GroupByは遅い」程度の理解しかありませんでした。実際にGroupByエンジンのコードを読み、なぜボトルネックになりやすいのかがコードレベルでわかり、その上でハッシュテーブルの構造やメモリ配置がどうパフォーマンスに効くのかを目の当たりにしました。
既存のデータベースを利用するだけでは見えにくい内部の仕組みを、内製開発の視点から体験できたことは大きな財産になりました。
2. 実践的な分散システム設計
milaは、Broker, Computing Node, Job-Worker, Redisなど、役割の異なる複数のコンポーネントから構成される複雑な分散システムです。
大学で分散システムの研究をしており、理論的な難しさは理解しているつもりでした。しかし、本番環境で大量のリクエストを捌く生きたシステムの開発を経験し、その難しさを改めて痛感しました。
特に印象深かったのは、クエリのキャンセル機構の開発です。
分散システムでは、ネットワークの遅延やメッセージが届くタイミングによって、システム全体の挙動がパターンのように変化します。
- どのタイミングでキャンセルリクエストが届くか
- その時、各コンポーネントはどの処理フェーズにいるのか
これらによって、キャンセルに参加すべきコンポーネントや必要なクリーンアップ処理が動的に変わるため、あらゆるエッジケースを想定した設計が求められました。机上の研究だけでは味わえない、リアルな分散システム開発の醍醐味と複雑さを肌で感じることができました。
3. パフォーマンス改善の手法
milaのクエリ高速化というミッションを通じて、システムソフトウェアにおける再現性のあるパフォーマンス改善サイクルを体得しました。
具体的には、勘に頼るのではなく、以下のようなエンジニアリングアプローチを徹底しました。
- プロファイリング(計測): async-profilerや、FlameGraph 等でシステムの性能やリソース消費を記録、可視化する。
- ボトルネックの特定(観察): グラフやデータを観察し、どこで処理が滞っているのか、真の原因(ボトルネック)を突き止める。
- コードリーディングと対策(改善): 該当箇所のコードを深く読み込み、アルゴリズムの最適化や無駄な処理の削減を施す。
この「計測 → 観察 → コード読解 → 改善」のサイクルは、データベース開発に限らず、あらゆるシステムソフトウェアのパフォーマンス改善に活きる汎用的なスキルだと実感しています。
プレイドで働いて感じたこと
プレイドの皆様、そしてmilaチームの皆様、4ヶ月間大変お世話になりました。
振り返ると、刺激が多く、快適に働ける素晴らしい職場環境でした。特に魅力的だと感じた点を紹介します。
風通しの良さ
milaチームはメンバー間のコミュニケーションが活発で、課題に対して綿密に議論しながら進めていく姿勢がとても印象的でした。
フラットに意見を交わし、最適解を導き出すカルチャーは大きな学びになりました。
高い技術力と技術への熱量
milaチームはメンバーの皆様がそれぞれ高い技術力を持っており、問題に直面しても必ず相談できる方がいて心強いです。
日頃からmilaの開発に役立ちそうな記事や論文を積極的に共有し、どう活かせるかを議論する姿、DEIMなどの学会で先端研究を調査する姿には、エンジニアとして多くの刺激を受けました。
ウェルカムランチ制度と他職種との交流
入社直後にウェルカムランチ制度を通じて、チーム内外の方とフランクに話す機会を頂けました。
エンジニア以外の職種の方からも直接業務の話を伺え、それぞれの専門分野で熱量を持って仕事に取り組む姿が伝わってきて、刺激になりました。
最強のコーヒーメーカーがある
エンジニアとして働く上で重要なのが集中力です。
集中力を高める上で重要なのがカフェインです。
また、美味しくカフェインを摂取することでリラックスにもつながります。
プレイドのオフィスには優れたコーヒーメーカーがあり、私は出社をしたら必ず利用してコーヒーを飲んでいました。また、メンターの方と作業を分担してカフェラテを作ったのもいい思い出です。
終わりに
4ヶ月間のインターンを通じて、OLAPデータベースの内部実装から分散システムの設計、パフォーマンスチューニングまで、幅広い経験を積むことができました。
特に、実際のプロダクション環境で8PBものデータを扱うシステムの開発に携われたことは、大学での研究だけでは得られない貴重な経験でした。理論と実践のギャップを埋め、エンジニアとして一段成長できたと感じています。
9月からはカナダの大学院で分散システムとデータベースの研究を続けます。milaでの経験は、研究の方向性を考える上でも大きな糧になると確信しています。
最後に、日々の業務で丁寧に指導してくださったmilaチームの皆様、インターンの機会をくださったプレイドの皆様に心より感謝申し上げます。
ありがとうございました。