
Bigqueryの内部処理について徹底解剖してみた
Posted on
プレイドの @nashibao です。
弊社では結構BigQuery
を使い倒させていただいていて、社内向けのバッチ解析やChartIO
/ModeAnalytics
等を介した社外向けのレポーティングとしての利用だけでなく、ABテストなどの集計系では(Query数のコントロールのためにキャッシュは介していますが)ほぼ直接アプリケーションのバックエンドとして利用しています。
またStreaming Insertが思ったより安定しているので、ニアリアルタイムにイベントを反映することができ、適当なQueryを投げ込んでPBの集計を気軽にやって、数百万使って唖然としてとりあえず寝て忘れる、みたいなことをしています。
先日も"GCP NEXT World Tour in Tokyo"において、弊社 @makinoy がジョブズばりのプレゼンをさせてもらいました。
Google Cloud Platform 国内最大のイベント「GCP NEXT World Tour in Tokyo」にプレイドが登壇しました
さて、これだけBigQueryを使い込んでいる弊社ですが、仕組みについてはあまり外に出てないため、ほとんど知りません。
DremelのPaper(2010/VLDB) が6年前に出ていますが、その内容としては非常に単純なQuery Processingのケースのみを扱っていて、シンプルな分割統治型のツリー処理
と、独自の省スペースかつ効率的なNested Dataのカラムナ表現
の二点を中心に議論されています。論文の最後でも次のように締められています。
In the future, we plan to cover in more depth such areas as
formal algebraic specification, joins, extensibility mechanisms, etc.
後半に乗っているDremel
と既存のGoogle MapReduce/Sawzall
(Google 社内のMapReduce実装)とのパフォーマンス比較、キレイに線形にスケールする実験結果は一見の価値があります。ただ、カラムナについて独自のエンコーディング/FBMを使ったアセンブリ手法は、これがポッと作れてしまうGoogleはさすがだなぁ、と思う反面、カラムナにしたらそりゃパフォーマンスは上がるだろうという感じだし、割となんでもござれのHDDベースのMapReduceと比較して分割統治に特化した構造がパフォーマンスが出るのも当然だろ、という気がします。
それよりもむしろ、テラ/ペタサイズのJOINやGroup Byが入り組んだクエリが数秒~数十秒で返ってくる
という理解不能な事象について深掘りして欲しい気がします。つまりちょうどDremel
のペーパーの範囲外ですw
Amazon.co.jp: Google BigQuery Analytics 電子書籍: Jordan Tigani …
そしてようやく本題ですが、JOINを含めたQuery Processingについては2014年に出ているこちらの本の方が詳しいので、少し紹介したいと思います。
ちなみにタイトルは(かなり)盛り気味です。
先に感想
いくつかポイントがあると思いますが、僕が個人的に思うBigQueryのキモは次の3点です。(ちょっと想像で物を言っているので間違ってたら、、すみませんw)
- まずまずDisk IOの並列化(並みの並列化じゃない)
- スピード重視でメモリとネットワークを酷使する設計(並みの酷使っぷりじゃない)
- Slowなところを避ける代わりに出来なきゃ死ぬ設計。やり直せばいいという割り切り(その点逆にMapReduceはすごい)
Disk IOを並列化して、読込をボトルネックから解放するのはこれ系では前提感あります。それに加えてネットワークを酷使したり、Hash PartitionによるShufflingなどで、メモリオーバーでabortするリスクをとるポリシーあたりがHadoopを含めたMap Reduce界隈と違うのかな、と感じます。
基本構造とベーシックなクエリ処理
基本的な構造としては次のようになっているようです。
クエリはRoot/Mixer/Slot
の順で伝播します。その時Slotのみで解決できないQuery要素に関しては排除されて伝播されます。Slot
は実際にStorageレイヤと通信し最初の処理をするLeaf Server
のスレッドのことのようです。そして、処理結果がSlot/Mixer/Rootの順で伝播し、それぞれ統治処理が行われます。
例えばJOINやGroup Byが絡まず、単純な分割統治処理のようなケースの場合
SELECT corpus, word, word_count FROM tbl
WHERE LENGTH(word) > 4
ORDER BY word_count
DESC LIMIT 5
(word_countが最初からaggregationされているという現実にはなさそうなケースですが)単純に同じQueryがSlotに伝播され、それぞれのSlotが返す結果に対してMixerが同じQueryをかければ終了です。
Small Group By
次のようにGroup Byがある場合も分割統治型で処理されます。
SELECT corpus, word, SUM(word_count) AS total FROM tbl
WHERE ...
Group By corpus
ORDER BY total
DESC LIMIT 5
まず Order
句とLIMIT
句を排除してSlotに渡しクエリし、その結果をMixerに渡し、Mixerでは元のクエリをかけて終了です。ただしDistinctなKeyが多い場合メモリに乗り切らずabortします。
BroadCast Join
さてJoinですが、単純な場合としてJoinする片方がSlotにバラまけれるサイズであれば分割統治型の繰り返しで対応できます。(BigQueryでは 8mb
が閾値のようです)。こちらは
SELECT hoge FROM a
JOIN
(SELECT fuga FROM b) AS c
ON a.hoge = c.fuga
- サブクエリ
SELECT fugue FROM b
のみをSlotに配布 - Mixerに結果が返される
- 返された結果をインライン展開して、全体のクエリを配布
- Mixerに結果が集められ、Aggregationして終了
一つ面白いのは、例えば5000 Leafあれば 5000 x 8MB = 40GBのネットワークコストをかけている、というところです。
Shuffled Query(Big Join/Big Group By)
最後に複雑なケースとして、大規模なJOINと大規模なGroup Byのケースです。MapReduceと同じようにShuffleフェーズを組み合わせて処理をしますが、Merge Sortではなく、単純なHash Partitioningで分散します。つまり100台のLeafで処理していれば、keyに対して100でmodを取り対応する各Leafに配布します。
(元々は JOIN EACH
やGROUP EACH BY
といった特殊な句が用意されていましたが、自動で判断するようになったようなので、省きます。( Query Reference ))
SELECT hoge FROM a
JOIN
(SELECT fuga FROM b GROUP BY fuga) AS c
ON a.hoge = c.fuga
b
に関するサブクエリをLeafに配布する。(例えば100Leaf)fuga
を100個にHash Partitioning(shuffle
)し、Leaf間でデータを交換する- Leafでサブクエリを実行しMixerに終了したかどうかだけを返す(データは返さない)
- 次に
a
に関するクエリをLeafに配布する(例えば500Leaf) hoge
を500個にPartitionしLeaf間でデータを交換する- さらに3のデータも500個にPartitionし、Leaf間でデータを交換する(この処理がどこに挟まるかちょっと書いてなくて想像で書いてます)
- Shard上でJOINのクエリを実行し、Mixerに結果を返す
- Mixerで統合し終了
大きな注意としては、どでかいデータでHashが偏ると乗り切らずAbortする、という点です。
最後に
先に書いた感想と同じですが、割と古典的というか泥臭いやり方でShuffleを実装したりしているところ、ネットワーク/メモリを酷使し、合わないものはAbortするところなど、失敗を許容する代わりにパフォーマンスを出す設計が重要なように思います。
もう少しMapReduce界隈との設計思想の違いについても想いを馳せてみたかったんですが、図を作っていたら力尽きた感じがあるので、議論したい方は是非ともPlaidの社内勉強会等に来てください!
ウェブ接客プラットフォーム「KARTE」を運営するプレイドでは、 KARTEを支える技術に興味を持つエンジニア(インターンも!)を募集しています。
詳しくはプレイドの採用ページか、Wantedlyをご覧ください。