MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Digdag REST APIによるバッチの並列実行数の制御

はじめに

マイクロアドでサーバサイドエンジニアをしている高橋です。
今までの技術ブログでも記事になっていますが、マイクロアドでは主にDigdagとPythonを用いてデータ処理バッチを開発しています。

developers.microad.co.jp
developers.microad.co.jp
developers.microad.co.jp

今回はバッチの並列実行数を制御する必要が生じた際のお話をします。

バッチの並列実行数を制御する必要があるとき

定期実行されているバッチは、基本的に次の実行までに処理が完了するように実装されています。
毎時実行される場合は1時間以内に、毎日実行される場合は1日以内に処理が完了します1

しかしながら、設計によってはある1つのバッチが並列実行されることもあります。例えば以下のような場合です。

  • 不定期に実行され、1回1回時間をかけて処理するバッチ
  • 他の画面やバッチからkickされて処理が開始するバッチ

そのほか障害が発生してバッチがストップした場合にも、障害復旧後に同様の現象が起こりえます。
並列実行されても問題ないものがほとんどですが、設計によっては他のバッチやデータソースに悪影響を及ぼすこともあります2
これらの問題を未然に防ぐため1つのバッチの並列実行数を制御する必要があります。

Digdag REST APIによる制御

DigdagにはREST API3が存在します。
適切なID、パラメータと共にリクエストを送信するとバッチを実行したり、バッチのステータスを取得したりできます。
このREST APIを用いて実行中のバッチのステータスを取得し、並列実行数を制御する方針で進めました。

並列数を制御するために /api/projects/{id}/sessions というエンドポイントを使用します。
パラメータとして対象バッチのワークフロー名と実行中のsession_id4を組み合わせます。

REST APIから返却されるレスポンスの例(公式ドキュメントより抜粋)

{
  "sessions": [
    {
      ~~ 略 ~~
      "lastAttempt": {
        "id": {},
        "success": false,
        "done": false,
        "cancelRequested": false,
        "retryAttemptName": "string",
        "params": {
          "empty": false,
          "factory": {},
          "keys": [
            "string"
          ]
        },
        "finishedAt": 0,
        "createdAt": 0
      },
      ~~ 略 ~~
    }
  ]
}

返却値のキーのうち、 donecancelRequested というキーをチェックする必要があります。
done = false (終わっていない) かつ cancelRequested = false (中止リクエストが飛んでいない) だった場合を実行中と判断します。
以下、具体的にDigdag+Pythonで並列実行数を制御するためのサンプルコードを記載します。
シンプルに過去のセッションが完了するまでwhile文で待ち続ける実装にしています。

wait.dig

timezone: 'Asia/Tokyo'

_export:
   docker:
      image: "${バッチを実行するDockerイメージ}"

+wait_older_session:
   py>: wait.wait_older_session

~~ 以後、実行したいtaskを記載 ~~

wait.py

import json
import time
import urllib.parse
import urllib.request

WAIT_SEC = 60  # 適当な値
PRALLEL_LIMIT = 3 # 適当な値


def wait_older_session(project_id: int, session_id: int) -> None:
    """
    この処理より古いセッションが${PARALLEL_LIMIT}個未満になるまでwaitし続ける
    :params project_id: プロジェクトID
    :params session_id: セッションID
    """
    api_endpoint = urllib.parse.urljoin(
        ${digdagのendpoint},
        f'api/projects/{project_id}/sessions/?workflow={${バッチのワークフロー名}}&last_id={session_id}'
    )

    # DigdagのAPIを使って実行中のワークフローのセッション一覧を取得し、
    # この処理のsession_idより古いsession_idかつ実行中のセッションが${PARALLEL_LIMIT}個未満になるまでwaitし続ける
    while True:
        r = urllib.request.urlopen(api_endpoint)
        r_json = json.loads(r.read())
        sessions = r_json['sessions']

        # doneもcancelRequestedもFalseならば実行中であると判定
        running_sessions = list(
            filter(
                lambda session: (session['lastAttempt']['done'] is False and session['lastAttempt']['cancelRequested'] is False),
                sessions))
        # 念のためコードでも古いsession_idに絞り込むロジックを実装
        older_ids = [
            int(s['id']) for s in running_sessions if int(s['id']) < session_id
        ]
        older_ids_count = len(older_ids)

        # 実行中のセッションが${PARALLEL_LIMIT}個未満になったらループを抜ける
        if older_ids_count < PRALLEL_LIMIT:
            break

        iter_num += 1
        if iter_num % 5 == 0:
            # 5ループごとにprint出力
            print('古いセッションが動き続けています。 session_id: %s', older_ids)
        # ${PARALLEL_LIMIT}個以上実行されている場合はsleep
        time.sleep(WAIT_SEC)

制御を取り入れた結果

今回の制御を取り入れたことにより、他のバッチやデータソースに悪影響を及ぼすケースが減りました。
また、障害発生時にも1つのバッチの並列実行数が多くなりすぎないようにリトライ実行のタイミングを調整する必要がなくなりました。
よって今回の改善はバッチの運用負荷軽減に大きく貢献しました!

課題

今回の制御を取り入れたことで、waitを除いたバッチのメインロジックがどれぐらいの実行時間なのか一目で見えにくくなってしまいました。
バッチの実行時間はシステムの異常を検知する大事な指標なので、その部分も考慮できればより完璧でした。

まとめ

Digdag REST APIを活用すると他にもできることはたくさんありますが、今回は同一バッチの並列実行数の制御に絞ってご紹介いたしました。
ニッチな話ですが、少しでも参考になれば幸いです。


  1. 実際にはシステムの可用性の観点からかなり時間的な余裕をもって実行完了するように実装されています。

  2. クエリを実行するHadoopへの負荷が上がる、RDBのデッドロックが発生する、などが悪影響の具体例として挙げられます。

  3. Digdag REST API公式ドキュメント

  4. last_idというパラメータに設定すると、REST APIから返却されるセッションを対象のsession_idより古いsession_idを持つものだけに絞り込むことが可能です。セッションの概念に関する詳細はこの記事をご参照ください。 Treasure Workflowのsession timeとスケジューリングについて - Treasure Data - Support Engineering Team blog