
機械学習 x PySparkでアルゴリズム開発をサクサクやる検証
Posted on
こんにちは、プレイドの @nashibao です。
ちょっと前になりますが、PySparkの勉強会向けにPySparkの機械学習ライブラリであるmllib
を弄ってみたりして面白かったので共有しようと思います。
コンテンツ
- 実験の動機
- PySparkの紹介
MLLib
のアルゴリズムの拡張- 感想
この検証はPyData Tokyo #3の発表にかこつけて行われたので、その時の発表資料も貼っておきます。
PyData Tokyo Meetup #3での発表資料
実験の動機
分散環境での機械学習は実装が面倒なので、どこで実験するかは結構重要かな、と思います。その中でPythonが研究者周りでよく使われてる & ライブラリが豊富 & Spark扱いやすそう、というのもあってPySparkは人気のある候補な気がします。
ちなみにプレイドでは購買系やその他ディープなデータを数PBレベルでいじくりまわして楽しむことができる環境がありますので、興味がある方は是非ともこちら(Wantedly)の方をポチッとしてください!
PySparkの紹介
PySparkそのものの紹介はいろんなところであると思いますが(slideshare.net/iktakahiro/pyspark1など)、コードを見るのが一番理解が早いと思うので、このエントリでは後半の流れと簡単さからPageRank
を例に追いかけてみようと思います。
PageRankは、Googleが利用している最もよく知られたリンク解析手法のひとつです。リンク解析としては、他にもHITSやTrustRankなどが知られています。PageRank
はその中でも比較的単純な部類に入る手法です。
PageRank
は主成分分析の一つとして捉えることが出来ますが、リンク上を遷移する集団の定常状態を計算したもの
として直感的に捉えることができます。
rank(u)
: ページuにいる確率cnt(u)
: ページuの総リンク数α
: どれかひとつのリンクをクリックする確率
とすると、1つのiterationは次のように書けます。
これはPySparkを使って下記のようにMapReduce側の実装をほぼ意識せずにゴリゴリ書けます。
from operator import add
# links
# URL [neighbor URL]
# URL [neighbor URL]
# ...
# ranks
# URL 1.0
# URL 1.0
# ...
def computeContribs(urls, rank):
cnt_urls = len(urls)
for url in urls:
yield (url, rank / cnt_urls)
for iteration in range(int(sys.argv[2])):
contribs = links.join(ranks).flatMap(
lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
ranks = contribs.reduceByKey(add).mapValues(lambda rank: (1 - alpha) + alpha * rank)
参考:
https://github.com/smutneja03/PageRank_Pyspark/blob/master/pagerank.py
https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py
よく見てみるとflatMap
のところでcontrib
を計算し、reduceByKey
のところでsum
を計算しています。比較的直感的に実装に落とし込めていることがわかる気がしますが、これを通常の分散環境での実装に落とそうとすると(たとえばMapReduceなど)、ここまで見通しよく書くのは至難の技だと思います。
反対に実装が隠蔽されているため、パフォーマンスチューニングなどは難しくなるかもしれません。上の例で言えば、mapValues
、join
, flatMap
が一つ目のMap
処理になり、reduceByKey
がReduce
処理になるんだろうなとは思いますが実際の挙動はコード上からはわからないです。
ちなみに前段で書いた通り固有値分解としてPageRank
を捉えることもでき、固有値分解をMapReduce
で計算する方法もしられている(C Chu: 2007)ので、そちらを使ったPySparkの実装などもできそうです。ここでは触れないですが、興味がある人は実装してみてください。
試しに上記のコードをwikipedia_jpにかけてみると次のように、一般的な用語ばっかりでてきておもしろくない感じになりますw
(u'日本', 41.11417911980084)
(u'アメリカ合衆国', 25.727987714812645)
(u'2004年', 22.378072677346033)
(u'2006年', 21.613018044381537)
(u'鉄道駅', 21.523966541611397)
(u'2005年', 21.498401773500778)
(u'4日', 20.89361325094468)
(u'昭和', 20.877112907965273)
(u'2007年', 20.54134162971899)
(u'2003年', 19.59822590116101)
(u'2008年', 18.547461486087336)
(u'日本の鉄道駅一覧', 17.476217631862756)
(u'1987年', 17.310971901161757)
(u'東京都', 17.133036902763976)
(u'2002年', 17.100045312106694)
(u'2000年', 16.59660273572512)
(u'2001年', 16.424965975651848)
(u'2009年', 16.354841071773176)
(u'プラットホーム', 16.022050486743105)
(u'イギリス', 15.522258306083586)
...
アルゴリズムの拡張
さて、本題のpysparkでアルゴリズム開発やってみよう的な話ですが、mllib
の中のレコメンデーションのアルゴリズムの一部を変更してみようと思います。
mllib
のレコメンデーションで使われているMatrix FactorizationはNLPなどでよく使われる教師なし学習の一種です。行列演算の延長で実装できることが多いため、比較的パフォーマンスを確保しやすいのが特徴です。目的関数も学習手法も様々な手法が提案されていますが、mllib
ではALS(Alternating Least Square) が使われています。ALSのUpdateの実装は次のように書けます。
# ALSのupdate則
def update_als(x, W, H, V):
w = W[x, :]
v = V[x, :]
m = H.shape[0]
k = H.shape[1]
HtH = H.T * H
HtVt = H.T * v.T
for i in range(k):
HtH[i, i] += LAMBDA * m
return np.linalg.solve(HtH, HtVt)
全体コードはここにあげておきます。注意点としては、上記のコードではパラメータがオンメモリで処理できることを期待している点です。MapReduceを利用したMFでは通常はパラメータ行列も分散したまま解けるようになっていることが期待されます。参考1 参考2 本来はその分散の仕方が面白いところだとは思いますが、割愛します。
さて、本題であるアルゴリズムのアップデートをやってみようと思います。ALSは比較的収束は速いが収束保証がなく、収束値の近くで遅くなりがちな性質があります。そこで収束保証がされているが、あまり収束が速くはないGMU(Gaussian Multiplicative Update)というものに切り替えることで収束を改善してみます。GMUのアップデートの式は次のようになります。
ParseError: KaTeX parse error: Expected '}', got '−' at position 37: …{WHH^{T} + 10^{−̲9}}
実装は次のように書けます。
# GMUのupdate則
def update_gaussian_mu(x, W, H, V):
w = W[x, :]
v = V[x, :]
return multiply(w, (v * H) / (w * (H.T * H) + 10**-9)).T
結果は上記のスライド
に載せてあります。
駆け足ですが、PySparkを使うと見通しよくアルゴリズムを付け替えできることがわかるかと思います。
参考:
-
Distributed Nonnegative Matrix Factorization for
Web-Scale Dyadic Data Analysis on MapReduce -
ALS Algorithms for the Nonnegative Matrix Factorization in Text Mining
-
Music Recommendations at Scale with Spark - Christopher Johnson (Spotify)
感想
SparkはS3をそのままいじれるので、S3に入っているサービスのログとかに単純なアルゴリズムを試してみるにはかなり良さげです。割と直感的にいじれるのでゴリゴリ改良できるかもしれません。
MLlibのような機械学習ライブラリはなかなか流行ってこなかったりして、流行らないとupdateも遅かったりするので、直にPySpark等を使っていじくりまわすのはありかな、と個人的には思います。
ちなみにプレイドでは古いコールドデータの多くはBigQueryに置いてるので、BigqueryのSpark Connectorあたりを使うのも模索中です。購買系のディープなデータをゴリゴリいじくり回してみたいエンジニア(or 研究者 or インターン)大大募集中です!
最後に
ウェブ接客プラットフォーム「KARTE」を運営するプレイドでは、
KARTEを支える技術に興味を持つエンジニア(インターンも!)を募集しています。
詳しくはこちら(Wantedly)をご覧ください。
もしくはこちらのボタンよりお気軽に「話を聞きに行きたい」と押してください!