KARTEにおけるKubeflow Pipelineの活用

こんにちは、エンジニアの @kargo @nichimu です。KARTEのDatahubというプロダクトに携わっており、KARTEのデータ及びお客様のデータを活用したプロダクトの開発を行っています。
今回は、KARTEにおいて活用しているコンテナネイティブのワークフロー基盤であるKubeflow Pipelineの組み込みについての概要をご紹介したいと思います。

Kubeflow とは

ここ数年でMLOpsという言葉が浸透するようになり、機械学習の実運用についての具体例がたくさん見受けられるようになりました。
Kubeflow はそのような機械学習の実運用をスムーズに行うためのオープンソースのプラットフォームです。Kubernetes上で動作することを前提とされているので、GKEやEKS、オンプレミスのKubernetes基盤にデプロイすることができます。直近では最新のv1.0が公開されているようにかなりのスピードで発展してきており、コミュニティも盛んです。
Kubeflowは様々な機能がありますが、今回はその中でも Kubeflow Pipeline というコンテナのワークフロー基盤を活用した事例についてご紹介していきます。
Kubeflow Pipelineの基本的な使い方についてはそれだけで1つのブログが書けるボリュームですので、以下の内容では使い方については触れません。それを前提とした上で、活用方法について述べていきます。基本的な使い方はドキュメントがわかりやすいので是非ともご参考にして頂ければと思います。

Components について

弊社では、KARTEに蓄積されたデータを用いて、クラスタリング・需要予測・レコメンデーション・購買予測・不正検知など様々な取り組みを行っています。
Kubeflow Pipelineで動かすPipelineは大きく分けて

  • 学習用パイプライン
  • 推論用パイプライン

の2つがあります。これらを各タスクごとに管理しています。

各タスクごとにコンポーネントはなるべく共通化したく、TFX の概念を大いに参考にさせて頂きました。TFXは機械学習タスクに必要なコンポーネントが丁寧に抽象化されており、今回は以下の図のようなコンポーネントを参考にしました。

実際に学習用パイプラインで構築したコンポーネントは

  • ExampleGen
  • SchemaGen
  • Transform
  • Trainer
  • Evaluator
  • Pusher

の6つです。

ExampleGenでは、BQにある膨大なデータからクエリによりデータを抽出
SchemaGenでは、抽出したデータのスキーマを自動で判定
Transformでは、LabelEncorder等により前処理を実行
Trainerでは、GridSearch等を用いて最適なモデルの学習
Evaluatorでは、学習済モデルの評価を行い、モデルの妥当性を検証
Pusherでは、評価されたモデルをデプロイする
といった実装になっています。
これらをPipelineとして単純に直列に以下の図のように繋いでいます。
各コンポーネントでのデータの受け渡しはGCSを用いており、データのPATHを受け渡すようにしています。

推論パイプラインでは、
ExampleGen, SchemaGen, Transformerまでは同様で、これにデプロイしたモデルを用いたPredictorというコンポーネントを直列に繋いで推論を行っています。

また、直列で繋いでいる理由として大量のリソースを消費するものに関しては、各コンポーネント内で外部にジョブを投げているからです。

例えば、ExampleGenにおいてはApache Beam SDKを用いて実装しているので、大規模分散処理に関してはDafaflow上で実行することが可能です。さらに、Trainerにおいても、学習ジョブはAI Platform Trainingで実行しているため、こちらも大規模並列学習が可能になっています。
つまり、Kubeflow Pipelineをデプロイしているクラスタのリソースは最小で、必要に応じて外部のマネージメントサービスを活用することで、簡単なタスクから複雑なタスクまで幅広く対応できるようになっています。

Pipelines SDK の活用

Kubeflow Pipeline には Pipeline SDK と呼ばれるものも用意されており、外部からでも簡単にパイプラインの実行や管理ができるようになっています。これを活用しフロントエンドと連携をしています。
例えば、

client = kfp.Client(
    host="YOUR_KUBEFLOW_HOST", 
    client_id="OAUTH_CLIENT_ID"
)
res = client.run_pipeline(
    experiment_id="YOUR_EXPERIMENT_ID",
    job_name="JOB_NAME",
    pipeline_id="PIPELINE_ID",
    params="ARGUMENTS",
)

といったように指定したパイプラインの実行ができます。
paramsに任意の引数を渡すことで柔軟にパイプラインの実行が可能です。

wait_for_run_completionやget_runメソッドも用意されているので、パイプラインの実行状況を確認することも容易です。

補足ですが、私たちの環境ではCloudIAP認証を用いているため、client_idが必要です。また、実行するサービスアカウントにIAP で保護されたウェブアプリ ユーザーという権限を付与する必要がある点にも注意が必要です。

フロント画面との連携

KARTEの管理画面で使用している技術を簡単に述べると、フロント: Vue.js、バックエンド: Node.js、DB: MongoDBで構成されています。
管理画面から任意のパラメータを入力し、Kubeflow Pipelineのパイプライン処理を実行をさせた上で、モデルの結果はいつでも管理画面から取得できる状態にしたいというのが今回の要件です。

現状、Pipeline SDKにはpythonのものしかなく、バックエンドがNode.jsであるKARTEで使おうと思うと、Node.jsにあるPython-shellを使用するかFlask等を使ってRESTの口をつくるかの2通りありました。
その中で、我々は、新たにPythonベースのコンテナをGKEに立てて、Node.jsベースのコンテナとkubeflow clusterの橋渡しとなるようなinternalなAPIとして用いる方針をとりました。
PythonベースのコンテナでPipeline SDKを使えるようにすることで、KARTE管理画面からKubeflow Pipelineを実行できるようにしました。

また、PipelineのジョブパラメータはMongoDBに保存しておくことで、いつでもKARTE管理画面からジョブの実行結果を確認できるようにしました。

このようにKARTEの管理画面からも簡単に任意のパラメーターでパイプラインを実行し、結果を取得することが可能です。

こういったSDKもきちんと用意されており、連携も簡単にできる点も素晴らしいですね。

今後の課題

サービスアカウント問題

現状のKubeflow Pipelineの仕組みではパイプラインをKubeflowにデプロイする段階でSecretをコードで指定する仕組みになっています

dsl.ContainerOp(
    name="NAME",
    image="IMAGE",
    command=["COMMAND"],
    arguments=["ARGUMENTS"],
)
.apply(gcp.use_gcp_secret("user-gcp-sa"))

そのため、実行するパイプラインごとに引数等でサービスアカウントを指定することができません。こちらも柔軟に変更できた方が権限管理は非常に便利になるのですが、現状は未対応なので場合によっては裏側で色々と権限管理を頑張る必要性が出てきます。

Kubeflow? TFX? AIPlatform?

少し余談にはなりますが、このあたりきちんと使い分けられている方は少ないのではないでしょうか。
Kubeflowはドキュメントにしたがってデプロイするとやや大きめなMachineTypeでNodePoolが作成され、GPU Node Poolも作成されます。つまり、このクラスタ内のリソースを使うのだろうと思っていたのですが、exampleを見ると Data Proc や AI Platform にジョブを投げているものもあります。さらに、TFXではTensoflow Transformerというものがあり、これはDataFlowにジョブを投げるものとなっています。こうなってくるとかなりややこしく、結局どういう構成がベストプラクティスなのかが見えづらい状況です。
あくまで私たちの仕組みでは、Kubeflow Pipeline自体のリソースは最小限に、コンポーネントの各ジョブは外部リソースを使ってスケールさせるという選択肢をとったに過ぎないので、この内容がみなさまの実際のケースに沿って最適な選択の一助となれば幸いです。