PLAID Engineer Blog

PLAID Engineer Blog


PLAID Engineer Blog

大規模データを解析するストリーミングアルゴリズムをサクッと見てみる

Yuki MakinoYuki Makino

こんにちは、プレイドの @makinoy といいます。

私たちはKARTEというサービスを提供していますが、解析色が非常に強いサービスなので社内では解析周りのアルゴリズムの利用検討をよく行っています。
その流れで、最近ストリーミングアルゴリズムについて検討したので、代表的なものをちょっとここで書いてみることにします。

この紹介では、”大規模データ解析のためのストリーミングアルゴリズムをサクっと見てみる”をテーマにしようと思います。

ストリーミングアルゴリズムとは、一連のデータの流れであるストリームを処理するアルゴリズムです。
データは次々にやってくるので、基本的にひとつのデータ単位(イベント)に対して1回だけ処理をするのが特徴です。大量なデータを処理するために、ひとつのイベントに対して、短い計算時間で限られたメモリを使うところも特徴のひとつです。

そのアルゴリズムの代表的なものを、適当にピックアップして紹介しようと思います。
精度や理論的保証の話が肝なのですが、サクッと見てみるという目的のため今回は省きます。また実際に使う場合は理論的保証があるとはいえ、実用上値が真の値と大きくぶれてしまうこともあるので、目的に合わせて実験してみるのも大事です。が、それも今回は省きます。

数を数えるストリーミングアルゴリズム

最近ではあまり遭遇しそうにないですが、例えば非常に多くのデータストリームを扱っていて、そのイベント数を数えたいとします。そういう時に使えるメモリが非常に限られているとした場合に使える、割りと古いアルゴリズムを紹介します。

Morris近似カウンティングアルゴリズム (Morris approximate counting algorithm)(http://algo.inria.fr/flajolet/Publications/Flajolet85c.pdf) です。
このアルゴリズムは、データストリームのイベントの数を数えます。 疑似コードはこんな感じです。

counter <- 0  
for event in stream  
  r = random number from 0 to 1
  if r < p
    then c <- c + 1

確率的に(モンテカルロ法的に)カウンタを増やしていくことで大まかなイベントの数を数えていきます。わかりやすくp=1/2とすると、さっくり行ってしまえば1/2のメモリ使用量でカウントができ、p=2^-cとすると、2の乗数の桁数を見積もることができます。
この場合、メモリ使用量は可算の最大の数をMとするとlog2 M (10進数をbitで保存) から log2 log2 M (10進数の桁+1をbitで保存) に節約できることになります。

頻出するイベントを計算するストリーミングアルゴリズム

次は、ストリームの中で頻出するイベントを計算するアルゴリズムをいくつか紹介します。

完全に正しく頻出するイベントのリストを得るには、基本的には出現するイベントのパターンとその出現数をすべて記録していく必要があり、メモリ使用量が非常に大きくなってしまいます。また、最終的に頻度順にソートする必要があるため、その計算コストもかなり大きくなります(比較ソートであればO(n log n))。
それに対してこれから説明するアルゴリズムは、ある程度の誤差を許容することによって、保持する必要のあるメモリ量を大胆に減らします。

まずは、Majorityアルゴリズム ( http://www.cs.utexas.edu/~moore/best-ideas/mjrty/ )です。 疑似コードはこんな感じです。

count ← 0  
e <- null  
for event in the stream  
    if count == 0
        e <- event
    if event equals to e
        count <- count + 1
    else
        count <- count - 1

最頻出のイベント用にカウンタとイベントを記憶しておくレジスタを1個ずつ用意して、そのイベントが出現したらカウンタを1増やし、そうでなければ1減らすという非常にシンプルでわかりやすいアルゴリズムです。半数以上の同じイベントがストリームを占める場合、この方法で最頻出イベントがわかります。

次はFrequentアルゴリズム (http://erikdemaine.org/papers/NetworkStats_TR2002/paper.pdf )です。 疑似コードはこんな感じです。

k: Top k

eventToCount <- new Map  
for event in the stream  
  if  not eventToCount.contains(event)
    if eventToCount.size < k
      eventToCount.put(event, 1) 
    else if an event z whose count is zero exists
      eventToCount.remove(z)
      eventToCount.put(event, 1)
    else
      // すべてのカウントを1ずつ減らす
      for (event, count) in eventToCount
        eventToCount(count, count - 1)  
  else
      eventToCount.put(event, eventToCount.get(event) + 1)

Majorityアルゴリズムを最頻出だけのeventだけでなくk件の複数のイベントを保持するよう拡張したものです。

LossyCountアルゴリズム (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.19.8594&rep=rep1&type=pdf ) です。
疑似コードはこんな感じです。

k: Top k

n <- 0  
eventToCount <- new Map  
for event in the stream  
  n <- n + 1
  if  not eventToCount.contains(event)
     eventToCount.put(event, 1 + ∆)
  else
     eventToCount.put(event,  eventToCount.get(event) + 1)
  if  |_ n/k _| != ∆
     ∆ = |_ n/k _|
    for (event, count) in eventToCount
      if count <  ∆ 
         eventToCount.remove(event)

Frequentアルゴリズムを発展させたものです。もし出現回数がマップに含まれていない場合は、Δはその時点での頻度誤差であり、Δ+1でスタートし、Δが更新されたタイミングで、Δより小さいカウンタをすべて削除します。

SpaceSavingアルゴリズム (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.114.9563&rep=rep1&type=pdf) です。
疑似コードはこんな感じです。

k: Top k

eventToCount <- new Map  
for event in the stream  
  if  not eventToCount.contains(event)
     if eventToCount.size < k
        eventToCount.put(event, 1)
     else
        minEvent <- null
        minCount <- +∞
        for (event, count) in eventToCount
            if count < minCount
                minCount = count
                minEvent = event
        // 保持している最小カウントのイベントと置き換え
        eventToCount.remove(minEvent)  
        eventToCount.put(event, minCount + 1)
  else 
    eventToCount.put(event,  eventToCount.get(event) + 1)

こちらもFrequentアルゴリズムを発展させたものです。あふれた場合に、その時点での最小のものとスワップし、その値+1からスタートします。保持するeventとカウントのペアがk個で済むのが特徴です。

ざっくり言ってしまうとLossyCountingアルゴリズム、SpaceSavingアルゴリズムは、Frequentアルゴリズムと比較すると、それまでの出現回数を考慮するので、誤差を抑えることができます。

Count-Min Sketchアルゴリズム (http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf) です。
疑似コードはこんな感じです。

// 幅w 深さd の二次元配列を用意 w, d は最大誤差と最小確率によって設定。詳しくは論文を。
a <- a[w][d]  
h1(x), h2(x), ... hd(x) : xはイベント ハッシュ関数をd個用意

for event in stream  
  for i in 0..d
    index <- hi(event)
    a[index][i] <- a[index][i] + 1 

function count(event) {  
  event: input event
  min <- +∞
  for i in 0..d
    index <- hi(event)
    count = a[index][i]
    if count < min
      min = count
  return min
}

このアルゴリズムは、いままでのアルゴリズムとは違い、頻出イベントを記録するのではなく、イベントの出現回数を見積もります。
複数のハッシュ関数を使いeventを元にそれぞれindexを計算し、そのindexに対応する場所にカウンタを持たせ数を数えていきます。出現回数を取り出すときは、対応する複数のカウントから最小の値を取り出します。

最頻出のイベントをリストすることはできませんが、すべてのイベントに対して見積もり数を計算することができ、さらに最小値がHashの衝突で大きくなってしまうことがあっても小さく見積もってしまうことはありません。

異なり数を数えるストリーミングアルゴリズム

次は、eventのユニークな数(異なり数)を数えるアルゴリズムであるHyperLogLog (http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) を紹介します。

データの集合の出現する異なり数を数えるのは、簡単そうに思えて実は結構難しい問題です。なぜ難しいかといえば、大きく二つ難しさがあって、圧縮するとはいえ全てのパターンを記憶する必要があること、全体でのユニークネスを取るために同期が必要なことです。

例えば、その問題を解決し動的なテキスト辞書を作成するために、簡潔データ構造を使って、こういうテクニックが使われていたりします。(http://www-01.ibm.com/common/ssi/cgi-bin/ssialias?subtype=ST&infotype=SA&htmlfid=CO112689JPJA&attachment=CO112689JPJA.PDF) 個人的な話ですが、その昔その実装の並列性を上げる作業などを少しやったりしました。

前置きが長くなってしまいました。疑似コードはこんな感じです。

ρ(x): xはビット列  一番左にある1bitの位置を返す 
M : [b] b個のレジスタを用意。  
m : 2^b  
h(x) : xはイベント イベントからhash値を計算する関数。精度によってbの個数、hash空間のサイズを決める。  
am : mに依存する定数。下を参照。

for i in [0..M]  
  M[i] <- -∞
for event in stream  
  x = h(event) 
    // 最初のb bitをレジスタを選ぶのに使う
  j = 1 + <x1, x2, ... xb>2
  // 残りのbit列をρにかける
  w = xb+1 xb+2 ...
  M[j] = max(M[j], ρ(w))

function unique_count() {  
  Z =  Σ 2^-M[j]
  return am * m^2 / Z
}

$$ \alpha_m = m\left(\int_0^\infty\left(\log_2\left( \frac{2+u}{1+u}\right)\right)^{m}du\right)^{-1} $$

肝は、Hash値上で0が続く確率が小さいことを利用して、それまで出てきた最大の0が続く数pによって2^pで見積もるところです。複数のレジスタを用意して平均を計算することによって乱雑さによる誤差を抑えます。

計算結果をρによってHash値をならんだ0の数だけ Log Log X オーダーで 保存しているのでメモリ使用量が非常に小さく抑えられます。

その値の最大値を計算する時に工夫した調和平均を使用し、誤差を抑えているところがHyperたる所以で、シンプルに最大値を幾何平均するLogLogアルゴリズムが先にありました。ちなみにSuperLogLogというアルゴリズムもあるようです。面白いですね。

似たような手法にMinHash ( https://en.wikipedia.org/wiki/MinHash )と呼ばれるHash値の最小から類似性を計算する手法があります。これらで使われているようなHash等で計算した集約された値のみ保存する方法はScketchingと呼ばれています。

Synopsis (Summary) 構造

代表的なストリーミングアルゴリズムを紹介しました。これらのストリーミングアルゴリズムに共通するのは、計算処理結果を集約して小さいメモリで保存し、それを次の計算に利用するというところです。そういうデータ構造をSynopsis (Summary) 構造と呼びます。

サンプリングした値、今回紹介した中にもあったHashを使った値を保存するSketch、他にも今回紹介しなかった、ヒストグラム、wavletと呼ばれるものもSynopsis構造に含まれます。また、広義にはSumやMaxなどの記述統計量も含まれると思います。今回紹介できなかった構造に関しては、また改めて紹介できればと思います。

データ処理エンジンに支えられたKARTE

プレイドでは、接客するお客様のこれまでの行動情報のデータストリームを処理し、瞬時に最新の集約した値で更新する独自の解析エンジンを日々開発しています。そのエンジン計算結果を利用し、お客様の最新の状態に合わせてリアルタイムに接客するためのサービスKARTEを提供しています。

最後に

ウェブ接客プラットフォーム「KARTE」を運営するプレイドでは、 KARTEを支える技術に興味を持つエンジニア(インターンも!)を募集しています。

詳しくはこちら(Wantedly)をご覧ください。 もしくはこちらのボタンよりお気軽に「話を聞きに行きたい」と押してください!

Yuki Makino
Author

Yuki Makino

Comments