Blitz(前編): 自由度と即時更新性を担保したAggregation

Introduction

PLAIDでは9年ほど前から、大規模ストリーミングデータから高速に、Aggregation/Filteringをするモジュールを育ててきて、KARTEというプロダクトの基盤としてきています。Personalizeのようなタスクをツールにガシガシ組み込むために必要なモジュールで、かなりニッチな情報ということもあって外部にあまり発信する機会がありませんでした。(決して面倒だったからではありません。決して。。)

ただ、少し前にそのエンジンの刷新プロジェクトが終わったところで、そのプロジェクトの発信と併せて、初期コンセプト上の面白み(コード上は2度3度と刷新されていますが、初期コンセプトの面白い部分は踏襲されつつ新しいものが加わっています)もそれなりにあるし、発信してみようと思って書き始めました。

要求されること

現在は コードネーム “Blitz” と呼ばれる分析モジュールは、Personalizationに特化したもので、スキーマが固定されないJson形式での次のようなイベントストリーム

{
  $meta: {
    name: "nashibao",
    isMember: 1
  },
  $buy: {
    items: [{
			sku: "xxx",
			price: 1000,
    }]
  }
}

[図1: イベントデータ]

がuserIdに紐づいて飛んできた時に、次のようなフィルタリングを高速に処理することができるものです。

match("userId-xxx",
  DAY.Current('$meta.isMember', 'last') = 1,
  ALL.Current('$buy.items.price', 'avg') >= 10000,
  WEEK.Previous('$buy.items.price', 'sum') <= 100,
  WEEK.Previous('$session', 'count') > 10,
  ...
)

[図2: ルール定義の疑似コード]

これは “userId-xxx” が “会員” であり、1年間では購買単価が1万円以上で、先週のセッション数が10以上に関わらずほとんど買い物に繋がらなかったユーザー なのかどうかを検証するルールを示した疑似コードです。

この時に、厳しい性能要件をクリアするように作られています。

種類 説明
スケーラビリティ - High Trafficなユーザー数やイベント数に対してスケールする
- 時間経過と共に性能が劣化しない
- 時間経過と共にストレージコストが爆発しない
低レイテンシ - 数百ms以内にレスポンスする
新鮮性 - 計算にラグが発生しない = 数秒前のイベントも考慮される
(初期ver: 結果整合性 → 現在: 強整合性)
スキーマレス - 送信されるイベントは、スキーマが固定されない
- 多様なデータ型(Nest / Array含む)が送信される
即時更新性 - 検証ルールの変更は、数分でデプロイされる
- 過去データを遡って考慮する
自由度 - 多種多様なタイムウィンドウ、演算子に対応している

特にルールやイベントの自由度を高く維持する割に、過去データの再集計などでラグが発生せずにルールの変更を反映し、数百ms以内にレスポンスを返せる状態に持っていくことが非常に めんどくさい 難しい要件になります。

既存の基盤やフレームワークをカスタムして作る方針で初期は検討しましたが、なかなか要件に見合わなかったため、ストレージレイヤーより上部に関しては主に自社で構築しています。

Name スケーラビリティ 低レイテンシ 新鮮性 スキーマレス 即時更新性 自由度
Embed (単純に事前に処理を組み込んだもの) x x
Batch Processing (ex: MapReduce) x x -
Stream Processing (ex: Beam) - (過去データは捨てられる) x
Lambda Architecture ▲ (x 1sec未満) x
Blitz

SQLほどの自由度は持たせない代わりに、他の要件を満たすように作られています。考え方としては、Aggregation処理を事前処理と事後処理の2つの処理に分解しているという意味で、Lambda Architectureや一部のOLAPに非常に近く、それらをStream処理向けに実装したものと理解するのが近いかもしれません。

ストレージ&計算コスト

Space ComplexityTime Complexity = O(T)

通常、ルールを変更した上で過去データも含めて計算し直すためには、時系列データをスキャンする必要があるため、必要なストレージは経過時間(T)に比例して増えていきます。また処理によっては分散も可能ですが、計算コストもそのストレージサイズに比例して増えるので、結果的に時間に比例して増えていきます。

これに対してBlitzでは

Space ComplexityTime Complexity = O(D)

時系列でのデータを、後続処理での自由度を減らさないように送信イベントのスキーマのフィールド数(D)ごとに最小単位の統計値の集合に圧縮し(Pre-Aggregation処理)(この最小単位の統計値の計算の種類は実は大きいですが、固定値)、これらを検証ルールへの適合時にもう一度組み合わせる (Post-Aggregation処理) ことで、ストレージコスト、計算コスト共に、時間(T)ではなくイベントスキーマのフィールド数(D)に比例するように変更しています。

image.png

様々なタイムウィンドウと演算子を念頭に置き、多様な統計量を用意するために、事前にある程度大きな空間をAggregate軸(この場合ユーザー)ごとに用意して置く必要があります。そのため経過時間に対して、実は初期は過大に空間を食いますが、経過時間と共に処理後の方が空間・計算コスト共に効率的になっていく想定です。

image(1).png

Pre-Aggregation処理

処理のフローやアーキテクチャの考え方には面白いところがたくさんありますが、ここではストレージサイズに話を絞ることにし、Pre-Aggregation処理によるデータ変換をどのように行なっているかを簡単に紹介します。

統計的な圧縮処理(Pre-Aggregation)のデータフローの簡易的な図は以下です。BlitzではMapReduceのReduce処理と同じように、Tree構造で分解可能な処理のみを許しています。そうすることで処理の分散性を保つと共に、下に示すような統計値同士のRollupも可能にするため、さらにストレージサイズを下げることができます(たとえば、日のデータを7つ持っていれば週のデータを作れます)。

image(2).png

Aggregationの自由度や新鮮性を達成するために、Reduce処理の実態は非常に複雑で、CAS操作のようにEventそのものと様々なTimewindowの統計値を読み出し結合した上で、上書き(Swap)する操作を含んでいます。

どのようにRollupを計画するのか、競合をどのように排除するかも非常に面白い部分ですが、ここでは紹介しません。

このようにすることでRead時に消費したEventや古くなったSlotはどんどん破棄していくことができます。

image(3).png

最後に

さて前編はここまでにします。紹介した通り、この初期コンセプトの考え方は至ってシンプルなものです。しかし、机上ではシンプルですが、実際の実装上においては、Hight Trafficを裁くための技術やHot Spotを避けるための技術、強整合を保つための技術などなど、様々な面白さが詰まっています。

後編ではBlitzの開発メンバーがその実装に入り込んだ面白さを紹介します。