こんにちは、機械学習エンジニアの河本 (@nnkkmto) です。マイクロアドのシステムにおいて、機械学習が必要な部分の研究開発及び実装を担当しています。
この記事では GCP (Google Cloud Platform)、特に GCE (Google Compute Engine) を用いた機械学習モデル学習バッチのスケジュール実行について紹介できればと思います。
はじめに
現在マイクロアドでは、以下にあるようにGCP を用いた機械学習基盤の構築を進めていて、Kubeflow を用いた実行環境へ移行予定です。
その移行前の第一段階として、GCE を用いた学習バッチの実行環境を作成したのですが、今回は GCE 上での学習バッチのスケジューリング実行について、具体的にどう行っているのかを紹介します。
前提
- Kubeflow を用いた実行環境へ移行する前の仮の実行環境となるため手っ取り早く構築できること
- 複数インスタンスで学習バッチ実行を行えるように学習バッチ間で汎用的な環境であること
の2つが今回の前提となっています。
また、各インスタンスは単一バッチの実行のみが使用用途であり常に起動させ続ける必要がないこと、同じく仮の実行環境であることからインスタンスはプリエンティブルVMを使用しています。*1
全体の流れ
全体の流れは大まかには以下のようになっています。
各学習バッチに対してそれぞれインスタンスが割り振られていて、共通のスケジューリング部分により各インスタンスの起動を制御し、インスタンスの起動をトリガーとして学習バッチが実行され完了すればインスタンスも停止します。
この記事では説明を省きますが、その後学習バッチは GCS (Google Cloud Storage) 上にあるデータソースを読み込み、同じく GCS 上に学習済みのモデルを保存します。そしてオンプレ環境にある HDFS へ転送され予測API等で読み込まれます。
また、元のデータソースはオンプレ環境 HDFS 上にあるため、アプリケーションごとに整形され GCS ヘバッチ転送されています。
各処理の詳細
以下では学習バッチのスケジューリング実行に関して具体的な実装内容について説明します。
スケジューリング
スケジューリング部分では、各インスタンスの起動を制御しています。
構成は
Scheduling compute instances with Cloud Scheduler | Cloud Scheduler Documentation | Google Cloud
を参考にしていて、流れは以下のようになっています。
- Cloud Scheduler により、指定時間に対象インスタンスを起動するようにメッセージを Cloud Pub/Sub へ送信
- Cloud Pub/Sub で Cloud Functions へメッセージ受け渡し
- Cloud Functions でそのメッセージ内容に応じて対象インスタンスを起動
Cloud Scheduler で対象インスタンスの指定と起動時間の管理を行うことで、各インスタンスの起動を一元管理できるようになっています。
Cloud Scheduler
Cloud Scheduler は GCP における crontab のようなもので、各スケジューリングを一元管理できます。
Cloud Scheduler では以下のように、ターゲットを Cloud Pub/Sub にして対象インスタンスごとにスケジュールと送信するメッセージを設定します。
Cloud Pub/Sub
Cloud Pub/Sub では「トピックを作成」から以下のようにインスタンス起動用のトピックを用意し、このトピックに Cloud Scheduler からメッセージを送信し Cloud Functions のトリガーとします。
Cloud Functions
Cloud Functions はサーバレスの実行環境で関数を登録すれば任意のトリガーによって実行されます。
今回は
- 対象インスタンスのリスト
- 起動か停止か
- 待ち時間
の3つをメッセージとして受け取り、それによって処理を行う関数を GCS 上に保存し登録しました。
関数の詳細な実装は省きますが、以下の公式ドキュメント
Scheduling compute instances with Cloud Scheduler | Cloud Scheduler Documentation | Google Cloud
もしくは Python での実装は
Cloud Scheduler + Cloud Pub/Sub + Cloud Functions でGCEのインスタンスの自動起動or停止させてみた - Qiita
が詳しいです。
インスタンス起動による学習バッチの実行
設定したスケジュールにより「インスタンス起動 → 学習バッチ実行 → インスタンス停止」の処理を行うため、インスタンス起動時に実行されるスクリプトである startup-script を各インスタンスに metadata として登録します。
スクリプトは簡単には以下のような内容になります。
# 学習バッチ実行 python script.py if [ "$?" = 0 ]; then # インスタンス停止 sudo shutdown -h now exit else # エラー時の処理 python script2.py fi
また、このスクリプトは GCS 上に保存している場合、インスタンスの編集画面内で以下のようにして登録可能です。
preempted 時のリトライ処理
前述のようにインスタンスはプリエンティブルVMを使用しているのですが、プリエンティブルVMは起動時間制限があり最大で24時間以内に停止(preempted)します。
そのため、プリエンティブルVMの起動時間制限により停止した場合はインスタンスを再起動することにより、バッチ実行をリトライする処理を入れています。
具体的には、インスタンス停止時に実行されるスクリプトである shutdown-script を各インスタンスの metadata として登録し、プリエンプトされたと判定された場合のみ「30秒以上後*2にインスタンスを起動」というメッセージを Cloud Pub/Sub の先ほど用意したトピックにパブリッシュするようにしています。
プリエンプトされたかどうかは
プリエンプティブル VM を作成して使用する | Compute Engine ドキュメント | Google Cloud
にあるように、curl コマンドで取得可能なため、スクリプトは以下のようにしています。
IS_PREEMPTED=$(curl "http://metadata.google.internal/computeMetadata/v1/instance/preempted" -H "Metadata-Flavor: Google") if [ "$IS_PREEMPTED" = "TRUE" ]; then # 起動メッセージを送信 python script.py exit fi
shutdown-script は startup-script と同様に GCS 上に保存し以下のように各インスタンスに登録しています。
また、Python での Cloud Pub/Sub へのメッセージの送信は
Google Cloud の公式レポジトリ https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/cloud-client/publisher.py
にあるように google-cloud-pubsub
モジュールを使用し以下のような関数で実行できます。
from google.cloud import pubsub_v1 def publish_messages(project_id, topic_name, msg): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) data = msg.encode('utf-8') future = publisher.publish(topic_path, data=data)
終わりに
まだ運用し始めてそこまで時間がたっておらずここに記載されていない問題が起きる可能性があることや、ベタベタに GCP のドキュメントをベースにしているため、この方法が確実だとも独自の方法だとも言えないのですが、もし参考になれば本当に嬉しいです。読んでくださりありがとうございました。