OLAP-DBでのreal-time ingestionをカラムナに最適化する技術

こんにちは、プレイドでエンジニアをしている oga です。

この記事は社内勉強会の内容をブログ化したものになっていて、
OLAPデータベースにおける高速化の技術 - PLAID engineer blog
を踏まえ、今回は書き込み処理のテクニックにフォーカスした内容になっています。(是非上記のエントリもご覧ください)

この記事では、カラムナフォーマットの利点を最大限活かすために書き込み処理、特にリアルタイムの書き込み処理における技術的な課題とOSS DBの具体的なアプローチを紹介したいと思います。

Ingestionとは?

昨今のアプリケーション開発では、いわゆるOLTPに最適な各種RDBをはじめ、高速なR/Wを実現するKVS、大量データを集約するDWH、OLAPに特化したデータベースなど要件に応じてさまざまなデータベースを組み合わせて扱うケースが多いと思います。

Ingestionとは、あるデータソースからその目的に応じて最適なDBへデータを投入させる意味にはなりますが、データパイプラインの文脈ではOLTPが得意なDBや大規模なオブジェクトストレージまたはストリームデータソースから、大量データの加工や読み取りに適したDBにデータを投入することを指すことが多いです。

また、そのIngestionのパターンには、

  • 発生するデータを、リアルタイムに取り込む Streaming ingestion
  • 一定の周期(経過時間や蓄積されたデータ量など)で、まとめてデータを取り込む Batch ingestion

があります。

本来はデータソースと書き込み先DBの間のETLパイプラインを含めた意味合いがありますが、今回は特に書き込み先のOLAPデータベースの内部について深掘りしていきます。

Streaming型とBatch型

  • Streaming型
    • アーキテクチャパターンはいくつかあるが、MQ(Kafkaなど)にデータを滞留させそのブローカーが逐次DBへの書き込みを行う形が多い
    • リアルタイム性を持つことができる
    • 高いスループットを出す為に分散処理が必要になるが、高い書き込み頻度と並列性はカラムナフォーマットと相性が悪い
  • Batch型
    • 十分に大きなレコード数・サイズでデータをまとめて書き込む方法
    • カラムナフォーマットとは相性が良く、高速かつ効率的にデータを投入できる
    • データの鮮度が大きく落ちる
    • 複雑なデータフローではデータ量が急激に変動でクライアントサイドでデータを集約することが困難になる(スケールさせづらい)

Streaming型では特に主要なユースケースとしてImmutableデータを扱う例が一般的で(ログデータなどの時系列データ)、Immutableデータをリアルタイムに書き込むためにDB側にさまざまな工夫がなされています。

mutableデータを扱う場合はさらに考慮すべき点が増え、技術的なアプローチが変わってきます。

ImmutableとMutable

  • Immutable(不変) データ
    • いわゆるログ
      • クリックなどの行動ログ
      • 実行された売買取引のログ
      • システムのログ
      • etc…
    • 特性
      • データ量が非常に膨大
      • 時系列での集計処理が行われやすい
  • Mutable(可変) データ
    • いわゆるトランザクションデータ
      • 会員情報(名前やデモグラ、統計値など)
      • 売買取引のステータス
      • etc…
    • 特性
      • データ量は利用者数などと比例するが大抵は限定的

カラムナフォーマットの向き不向き

Streaming Batch
Immutable
本来不向きだが、最もニーズがある部分で各DBでさまざまな工夫が取られている。
メインのお題
△ ~ ◎
先にあげた問題がクリアできる前提だが、最もシンプル。
ログでは無いデータの生成頻度が低いデータはBatchで行う、などハイブリッドで構築するケースも多く、個人的にはある程度データ鮮度に制約を乗せておけば大抵はBatchのみの構成がベストのことが多いと感じる。
Mutable × ~ △
不向き。このようなデータフローの構築の必要性も大抵は低い。
ただ、それを可能にし得る方法も登場してきている。
サブのお題

内容次第(データ量や頻度)で運用可能。
Bulk updateの方法はDBによって異なるが、差分更新する形で会員情報テーブルを周期取り込みするケースなど。

Streaming型のIngestionとカラムナフォーマットの相性の悪さ

カラムナフォーマットは大まかに言えば、データを列毎にソートし圧縮することで高い圧縮効率が得られます。
そのように作られたデータファイルがあるとき、データを追加する際にどの位置にデータを挿入すべきか特定することは困難です。(つまりカラムナフォーマットのデータファイルは原則Immutable)

Batch型であれば大量のデータを一度に扱うため、一度の処理で扱うデータをまとめて同じデータファイルに書き込むだけで簡単です。

Streaming型のように次々と新たなデータの書き込み処理が発生する場合に、そのたびにデータファイルを生成するという形になると何が問題か、どのように解決していくかを具体を踏まえて見ていきます。

各DBがどのようなテクニックでリアルタイムに大量のデータ書き込みを実現しているのか

https://clickhouse.com/

ClickHouseはオープンソースの列指向型DBで、高速な大量データの集計処理に特化しています。

今回はClickHouseのアーキテクチャを解説する形で進めていきます。

Buffer table

書き込みリクエストをノードのインメモリ上に保持し、一定の周期でストレージレイヤーにflushするBuffer tableと呼ばれるものが存在します。
Untitled(9).png
https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse より抜粋

① クライアントからのinsertリクエストを受信
② データがインメモリ上のBuffer tableに一時的に格納される
③ Buffer tableがトリガー条件(Bufferのサイズや任意の時間)でPartファイルにflushされる
④ ③のflushが正常完了するとinsertにACKが返却される(逆に失敗したらエラーと認識できる)

(Partとはパーティションキー毎に分割されるデータのストレージレイヤーの最小単位。データやIndex、metadataが含まれる)

これによって先述の「Streaming型のように次々と新たなデータの書き込み処理が発生する場合に、そのたびにデータファイルを生成する」という問題を解消することができます。

ただし、クライアントの多重度やデータ流入のリズム、パーティション設計などの要因と合わせ基本的にデータ量に比例してPartファイル数も増え、read時のファイル読み込み数が多くなりパフォーマンスが出ないケースも多くなります。(まとめて圧縮することで圧縮率が大きく高まるというカラムナフォーマットの利点を活かせず、また大量のファイルから読み込み結合する必要がありクエリ時のパフォーマンスが劣化してしまう)

その為、このBuffer tableに加えMerge treeという機構が重要な役割を持ちます。

より詳細な解説は画像引用元の以下ブログもご参照ください。

https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse

ちなみにClickHouseでは書き込みの動作を同期と非同期を選択することが可能で、バッチ型の処理がクライアントサイドで十分可能であればシンプルな構造で使用することもできます。

同期書き込みモードの場合このBuffer tableの考え方はなくinsertの度にPartが生成される形になります。
Untitled(10).png
https://clickhouse.com/blog/asynchronous-data-inserts-in-clickhouse より抜粋

MergeTree

Buffer tableで登場した書き込みの最小単位であるPartファイルは高速な書き込みを最小限のリソースで行えるものの、
immutableであるためパーティション設計やバッファサイズ、クライアント側の分散度合いによってもファイルが分散しすぎてしまうことが多いです。

この問題を解消し、カラムナフォーマットの利点を最大限活かすためにあるのがMergeTreeという仕組みです。

Untitled(11).png
https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part1 より抜粋 図1

Untitled(12).png
https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part1 より抜粋 図2

Untitled(13).png
https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part1 より抜粋 図3

図1
Beffer tableのflushは実際はPartの前にBlockという状態が存在する。
Beffer tableの内容がソートされてBlockに保持され、それが対になるPartに圧縮されて書き込まれる。

図2
書き込み処理とは分離されたバックグラウンド処理でPartをマージしていく。
= 2つ以上のPartを、解凍(Block化)→ソートし結合(Merged block化)→圧縮(Merged part化) する。

図3
Merged partの生成は閾値(150GB)に達するまで繰り返されていく。
MergeされたPartはマーキングされ読み取られることが無くなり、一定周期で消えていく。

このように、細分化されたPartファイルを順次結合し直していくことでread時のパフォーマンスを向上させることができます。

こちらもより詳細な解説は画像引用元の以下ブログもご参照ください。

https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part1

ちなみに、Merge treeの処理過程で重複排除を行ったり集約処理をかけることも可能です。

Untitled(14).png
https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part1 より抜粋

データ一貫性

上記のMerge treeで指定するキー・パーティション内における重複排除を自動で行うことができる。その為、Buffer tableの部分で紹介した一連のやり取りの中で、クライアントサイドのクラッシュ等によりflushはできているがACKを認識できず再送できなかった場合でも、再度クエリするだけで回復できるものになっている。(結果整合的なのは、主要な他DBでも同じ)

他の代表的なDBでは?

など他の代表的なDBでもこのあたりの考え方は共通する部分も多く差分という形で捉えるとより理解がしやすくなります。

Partは「Segment」と呼ばれるものだったり、MergeTreeは「Auto compaction」や「Rollup」がそれにあたります。

それぞれこのレイヤーよりも読み込みまで含めたトータルで特徴が見えてくるものが多い。

(今回はそこまで書き出すと長くなるので、ベースの考え方に絞ってここまでとします)

Mutableデータは扱えないのか

mutateが困難な理由はimmutableデータのstreaming ingestionにおける工夫が必要な点と同じですが、updateやdeleteを行うことが不可能な訳ではなく大抵のDBで実行することが可能です。

実際に行う場合は、対象となるレコードのデータが含まれるデータファイルを読み込み必要な変更を行なった後、その結果を全く新しいデータファイルとして保存する流れになります。

Untitled(15).png

Clickhouseの例 https://clickhouse.com/blog/common-getting-started-issues-with-clickhouse#3-mutation-pain より抜粋

高頻度、あるいは超長期間に渡るデータ更新などで大量にこのような処理が(大量かつ巨大なデータファイルの読み書き)発生すると大量のリソースが必要になってしまいます。

mutableデータは特性上データ量はImmutableと比較すると限定的なものの、大量なデータになり得るケースも無くは無いです。

大量処理が必要なケースを考えた時にどのような解決方法があるのかも紹介します。

単純に大量リソースの消費を受け入れる

頻度とコストの試算の上で受け入れて運用していくという形はあり得ると思います。

特にアドホックな処理の場合は特にその開発リソースを他にかけるという意思決定の方が価値がある場合が多いです。

OLAPデータベースの進化

  • Clickhouse
    • Lightweight UPDATE/DELETE
      • 通常は変更後Partファイルの再生成まで結果に変更が反映されないのを即時反映にする機能
      • 内部的にはどうやらmutateをマーキングするレイヤーがあり、SELECT時にそのレイヤーの情報を被せているイメージのよう(つまり本質的な負荷は変わらない)
  • Pinot
    • Stream Ingestion with Upsert
      • Pinotはラムダアーキテクチャ的な構造に当初からなっている
      • リアルタイムレイヤーにあたるRealtime serverではインメモリで「Mutable segment」という形でmutateが高速に可能なレイアウトでカラムナを保持しているらしい
      • Uberの事例は非常にわかりやすい。(オーダーデータは大量でリアルタイムな集計のインパクトが大きい。そして一定時間でImmutableになる)
      • 基本的にはある程度の周期でImmutable segmentにするべきらしいので一時的にMutableなデータを特に想定されている気がする

HTAPデータベースの利用

Sparkを組み合わせてHTAPとして構築するケースも多かったのが、最近は純粋に1DBにHTAP機能が内包されたデータベースが進化してきています。

基本は、従来の行指向ベースのデータベースとして振る舞い、内部でカラムナフォーマットが別途生成される流れで、クエリを実行する際オプティマイザによりカラムナでの処理が最適と判断され処理されます。

いずれも行指向がベースのためmutableデータが前提となっており、ETLパイプラインの構築もいらずシンプルな構成でOLTP/OLAPが実現できるのが大きな利点です。

ただし、データやワークロードの特性によっては特化したOLAPデータベースやDWHには敵わない部分も多いので現時点でHTAPで全てが解決するというレベルでは無いです。

ex)

  • 大規模データのELTはDWHが適していることが多い(一度行指向を挟む必要がある為。DWHからWriteBackするパターンはある)
  • 時系列データならRollupで最適化ができるOLAPデータベースがパフォーマンスが良い場合がある
  • mutateを前提としているためカラムナストレージ側でも書き込みにより最適化されている為、書き込み性能は高いが読み込み性能の分が悪い(こともあるが、データ性質次第ではあり問題にならないと言えるケースもある)
  • OLAPにユースケースが偏っている、などのケースではランニングコストは専用DBを組み合わせて使用した際より高くつく
データベース 提供会社 特徴 カラムナストレージ
AlloyDB Google Cloud Google Cloudのマネージドサービスであり、PostgreSQL互換、高性能かつ拡張性に優れたデータベースソリューション。マルチクラウドやオンプレミス環境でのデータモビリティを提供。 インメモリ型
Heatwave Oracle MySQLデータベースサービスのためのインメモリクエリアクセラレータ。トランザクション処理とリアルタイム分析を並行して行う能力を提供し、大規模な並列処理が可能。 インメモリ型
TiDB PingCAP 分散SQLデータベースであり、MySQLとの高い互換性を持つ。リアルタイム分析とトランザクション処理を1つのデータベースで処理可能。水平スケーリングや自己回復機能を持つ。 ストレージ型
Unistore Snowflake Snowflakeとシームレスに繋がるHTAPデータベース。まだ情報が少ない。 ストレージ型

ちなみに、プレイドでもAlloyDB、TiDBは試験導入と評価を行っておりました。本格導入まではまだ至っていませんが、複雑なデータパイプラインや多種DB運用から解放されたシンプルなアーキテクチャを実現する方法として大いに期待しています。

参考

まとめると

  • IngestionはStreaming型とBatch型があり、Batch型は構成がシンプルでカラムナフォーマットとの相性が良いがデータ鮮度を犠牲にしたり規模によってはスケーラビリティに問題が生じる
  • Streaming型はカラムナフォーマットへの最適化が必要になる
    • カラムナのデータファイルへのランダムアクセスは困難なため原則Immutable
    • 高頻度で書き込み処理を行いその度にデータファイルを作っているとカラムナフォーマットの利点である圧縮効率の良さを活かせない
    • データファイルの数が膨大になれば、クエリのパフォーマンスが悪くなる
    • Storage I/Oが非常に多くなりボトルネックが生じる
  • OLAPデータベースはその最適化を内包している(まるで内部に小さなBatch型ETLがある)
    • 基本はBufferingのレイヤーがいてデータファイルを可能な限りまとまったデータで作る
    • それでも多くなるため、バックグラウンドで理想的なサイズに結合して再圧縮を繰り返している
  • これらは特にImmutableデータを想定している
    • mutableデータの場合は効率が悪いためさらに別のアプローチが必要
    • データベース選択自体の幅を広げる可能性も最近増えてきている