MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Pythonのdigdagモジュールを利用してDigdagバッチの処理継続可能なエラーをハンドリングする

はじめに

サーバサイドエンジニアの前西です。主にETL処理1のバッチ開発を行っています。

マイクロアドでは、データ処理バッチを主にDigdagとPythonで開発2しています。

今回は、マイクロアドで開発しているDigdagバッチでのエラー処理について、簡単に紹介したいと思います。

処理中断すべきエラーと処理継続可能なエラー

エラーには下記の2種類があります。

  1. 処理中断すべきエラー
  2. 処理継続可能なエラー

特に何もエラーハンドリングをしない場合、エラー発生時には処理が中断されます。今回の記事では2の処理継続可能なエラーのハンドリングについて考えます。

処理継続可能なエラーが発生する場面

例えば、バッチ処理において処理の対象が複数あり、それらを一つずつ処理するようなロジックを書くことがしばしばあります。具体例として、下記のhoge.pyworkflow.digで構成されるジョブについて考えてみます。

### hoge.py
def sample() -> None:
    ads: List[Ad] = fetch_target_ads()  # 処理対象を取得
    
    sample_impl(ads)


def sample_impl(ads) -> None:
    # 対象を一つずつ処理
    for ad in ads:
        do_something(ad)
### workflow.dig
+hoge:
  py>: hoge.sample

このような処理において、forループ中、とあるaddo_somethingの処理中にエラーが発生した場合、その時点で処理が終了してしまいます。すなわち、それより後に処理されるはずだったadは処理されることなく終了します。

コネクションエラーなどの場合は再実行すれば成功する可能性がありますが、例えば対象のadの内容に不整合があってエラーが発生している場合、不整合を解消しない限り、何度リトライしても成功せず、同じ状況の繰り返しに陥ります。一つ不整合データがあるだけで、他の正常なデータも処理できなくなってしまうのは好ましくないです。

処理継続するエラー処理の実現

上記の状況を解消するために、ループ中にエラーハンドリングを追加してみます。

### hoge.py
def sample() -> None:
    ads: List[Ad] = fetch_target_ads()  # 処理対象を取得
    
    sample_impl(ads)


def sample_impl(ads) -> None:
    # 対象を一つずつ処理
    for ad in ads:
        try:
            do_something(ad)
        except SomeException:
            # 例外を捕捉し、処理を継続する
            LOGGER.error('error!')

途中でエラーが発生した場合にも処理を継続できるようになりました。しかし、except句を利用して例外発生時にも処理を継続させるため、Digdagのジョブの実行結果としては成功となってしまいます。これではエラーが発生したことに気付くのが困難です。

この問題を解決するためには、処理を継続しつつ、処理終了後にDigdagのジョブの実行結果を失敗にする必要があります。これを実現するためには、PythonからDigdagに処理に失敗したことを伝えるパラメータを渡すことができれば良さそうです。値の受け渡しにはdigdagモジュールのdigdag.env.storeが利用できます。3

### hoge.py
def sample() -> None:
    ads: List[Ad] = fetch_target_ads()  # 処理対象を取得

    has_error: bool = sample_impl(ads)

    # digdagの環境変数に値をセットし、Digdag側にバッチをfailさせたいことを伝える
    import digdag
    digdag.env.store({'has_error': has_error})


def sample_impl(ads) -> bool:
    """
    処理途中でエラーが発生したか否かのbool値を返却する
    """
    has_error = False
    # 対象を一つずつ処理
    for ad in ads:
        try:
            do_something(ad)
        except SomeException:
            # 例外を捕捉し、処理を継続する
            LOGGER.error('error!')
            has_error = True
    return has_error
### workflow.dig
_export:
  has_error: false

+hoge:
  py>: hoge.sample

+detect_error:
  # Pythonでhas_errorにtrueがセットされた場合、バッチをfailさせる
  if>: ${has_error}
  _do:
    fail>: This job has errors! Check error log.

_error:
  # エラーメールを送信
  mail>: body.txt
  subject: 'This workflow failed.'
  to: [me@example.com]

Python側でセットしたhas_errorパラメータの値をDigdagで読み、ifオペレータで分岐させた後failオペレータでバッチを失敗させています。これにより、処理途中でエラーが発生した場合にも継続して他の処理対象を処理しつつ、Digdag側にエラー処理を任せることができるようになりました。

このようにしておけば、Digdag側でエラーメールを送信することもできるので、容易に検知が可能です。

開発段階でエラーが発生することが想定されていなかったとしても、このことを常に考慮して開発しておくことで、予期せぬ事態が発生した場合にも被害を最小限に抑えることができます。

ここまでの流れを図にまとめると以下のようになります。

before
f:id:taka_maenishi:20220217200548p:plain
after
f:id:taka_maenishi:20220217200555p:plain

おわりに

Digdagバッチの処理継続可能なエラーのハンドリングについて、簡単にではありますが紹介しました。参考になれば幸いです。


  1. Extract, Transform, Loadの頭文字を並べたもので、データを利用しやすい形へと変換する一連の処理のこと。

  2. DigdagとPythonを用いたバッチ開発については、過去のブログ記事もあわせてご覧ください。

  3. https://docs.digdag.io/workflow_definition.html#using-api