MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Digdagを使ったジョブ管理

はじめまして。アプリケーションエンジニアの中野です。
以前、MicroAdのデータ基盤の記事で紹介されていましたが、マイクロアドではデータ基盤刷新のタイミングでワークフロー管理ツールのDigdagを採用しました。
今回の記事では、Digdag採用の経緯やワークフローを作成する際に注意した点を紹介します。

Digdag採用の経緯

マイクロアドのDSP*1であるBLADEではBidRequestやImpression*2、Click、Conversion*3、その他BLADEから出力される様々なログやマイクロアドの他のプロダクトのログ、他社から提供されるデータなど、様々なデータを広告配信最適化の分析に活かしています。
これらのログを分析するバッチ処理は各々のジョブが複雑な依存関係を持っています。
これまではcronやJenkinsを用いてこれらの処理を行っていましたが

  • コード管理が出来ていない。
  • コード管理出来ていても依存が複雑になるとコードも複雑になり見通しが悪い。保守性が下がる。
  • 障害発生時のリカバリ手順が複雑で対応可能な人が限られていた。

といった問題がありました。これらを解決するため、Digdagを採用しました。

Digdagではワークフローの記述はYAMLに似たDSLでdigという拡張子のファイルに記述するため、コードでの管理はもちろん可能です。依存が複雑になったとしても見通しの良いコードも書きやすいです。
またsession_timeというジョブの実行予定時間を保持することが出来る概念が存在するためリカバリが容易な冪等な処理*4が行いやすいです。
リカバリ自体もUIのリトライボタンを押すだけで実行可能なので手順などを気にする必要がなくなりました。

リトライ画面
f:id:tosametal:20180906111025p:plain

他にも

  • 定期実行、実行順序の制御、実行結果の保持といったワークフロー管理ツールに最低限求める機能を備えている
  • 学習コストの低さやDigdagサーバのスケールアウトの容易さ
  • ジョブのDockerコンテナでの実行の容易さ

など様々なメリットを感じたことから採用に至りました。

現在主にHadoopでのETL処理の制御にDigdagを使用しています。
DigdagサーバでHadoopクライアントとなるDockerコンテナを立ち上げ、beelineでクエリを実行します。

ワークフローを作成するうえで注意した点

では、Digdagを用いて複雑な処理をワークフローで管理するために気をつけた点、工夫した点などを紹介します。 以下のような点に注意しました。

  • Projectは小さく意味のある単位で分割
  • Projectを跨いだ依存はs3_waitで解決
  • 見通しの良いワークフローを作る

ではそれぞれ説明していきます。

Projectは小さく意味のある単位で分割

Digdagではワークフローを記述したファイルとその中で使用される設定ファイルやスクリプトなどをまとめたものをProjectと呼びます。
ProjectはDigdagサーバでバージョン管理される単位であり、また、UIでもProjectごとにワークフローの確認を行うことが出来ます。

UIの例 f:id:tosametal:20180904175612p:plain

Projectは出来るだけ小さく意味のある単位(DBごとなど)で構成しています。

f:id:tosametal:20180904174156p:plain

上記の図はBLADEに関連するDBの一部です。
log_db, business_db, bid_price_dbという3つのDBがありますが、それぞれに対応したProjectを作成し、各DBで行うべき操作のみをそのProject内では記述します。
各Projectを出来るだけ小さな単位で構成することで各Projectの責務が明確になり、またデプロイ時の影響も小さな範囲に抑えることが出来ます。
上記の図の例では

  • log_dbはKafkaから取り出した生のログを後続処理で使用しやすいフォーマットへの変換と重複排除
  • business_dbはlog_dbのデータを元にビジネスサイドが参照するデータを作成
  • bid_price_dbはRTB時の入札金額を算出

などのように責務が分かれています。
モジュール設計と同じで、Projectも疎結合で凝集度を高めることを目標に設計しています。

Projectを跨いだ依存はs3_waitで解決

それぞれのProjectを小さく作ると、Project間での連携が必要になる場面が多くなると思います。

Digdagでは同一Project内での依存関係はワークフローの記述で制御できます。
以下の例ではtaskAの完了後にtaskBとtaskCが並行に処理され、BとCがどちらも完了するとtaskDが実行されます。

+taskA:
  echo>: hello taskA
+taskBC:
  _parallel: true
  +taskB:
    echo>: hello taskB
  +taskC:
    echo>: hello taskC
 
+taskD:
  echo>: hello taskD

しかし他Projectとの依存関係は上記のようにワークフローの記述だけでは実現できません。
そこで標準機能として備わっているs3_waitと、独自で作成したs3_touchというコマンドを使用してs3のフラグファイルの有無によってProjectを跨いだ連携を行っています。

s3_touch.dig

+s3_touch:
  _export:
    docker:
      image: microad_docker_registry/s3_touch:latest
  _env:
    AWS_ACCESS_KEY_ID: ${secret:aws.s3.access_key_id}
    AWS_SECRET_ACCESS_KEY: ${secret:aws.s3.secret_access_key}
    AWS_DEFAULT_REGION: ${secret:aws.s3.region}
  sh>: s3_touch ${flag_file}

※ s3_touchというdocker imageではawscliを用いてaws s3 cp flag_file を行うs3_touchというコマンドを作成しています

log_dbでの処理が完了した場合には、それをあらわすフラグファイルをs3にs3_touchコマンドを使用して配置します。
使用する側のbusiness_dbとbid_price_dbでは該当のログが正常に生成されているかどうかの確認はs3にフラグファイルが存在するかどうかで判定し、存在する場合はlog_dbを参照して処理を開始します。

log_dbの処理

+convert_log:
  echo>: convert log and remove duplication
+touch:
  call>: s3_touch.dig
  flag_file: aws_bucket/flag_file

後続で使用されるログを生成するDBではs3_touchでフラグファイルを作成

business_dbの処理

+wait:
  s3_wait>: aws_bucket/flag_file
+task:
  echo>: finish task

log_dbで生成されたログを使用する場合はs3_waitでログの存在確認

s3_waitオペレータを使用すると、指定したファイルが出現するまで待ち続け、後続の処理には進みません。
log_dbの処理が失敗するとs3にフラグファイルは作成されず、後続処理のbusiness_dbとbid_price_dbではフラグファイルの存在の確認が出来ないため処理が進みません。
log_dbでリカバリを行い、フラグファイルが作成されると、business_dbとbid_price_db側で特にリカバリの操作などはしなくても、フラグファイルの存在を確認し勝手に処理が進みます。
s3_waitを使うとリカバリ時の対応も楽になります。

見通しの良いワークフローを作る

ワークフロー自体に分岐条件やループ、ネストなどがてんこもりになっていたら可読性が下がり、いったい何の処理を行っているのか解読に苦労してしまいます。
ワークフローの見通しを良くするために、ワークフローファイルには処理の詳細は書かず処理の流れだけを記述しています。
そのために工夫した点をいくつか紹介します。

1. 設定はまとめてinclude

例えば時間などは以下のようにtime.digというファイルにまとめています。
使用する側のワークフローでは毎度moment(session_time).format("YYYYMMDD")のような煩わしい記述をしなくてもよくなり、time.todayのように直感的に日付を取得することが出来ます。

time.dig

time:
  today: ${moment(session_time).format("YYYYMMDD")}
  yesterday: ${moment(session_time).add(-1, 'days').format("YYYYMMDD")}

time_user.dig

_export:
  !include : 'time.dig'

+task:
  echo>: ${time.today}

また全てのワークフローで使用する設定などは以下のようにまとめ、そのファイルをincludeします。

!include : 'env.dig'
!include : 'time.dig'
2. Python APIを利用

ワークフローでif>:オペレータやfor_each>:などを多用するとネストが深くなり処理の流れが読みづらくなってしまいます。
Python APIのdigdag.env.add_subtaskを利用し条件分岐などはPythonで処理することでワークフローの見通しを良くすることが出来ます。

例えば前述したs3_touchは以下のように書けます。

import digdag

class S3:
    def touch(self, **kwargs):
        is_production = kwargs['is_production']
        flag_file = kwargs['flag_file']
        if is_production:
            digdag.env.add_subtask({
                '_type': 'call',
                '_command': 's3_touch.dig',
                '_export': {'flag_file' : flag_file}
            })
        else:
            print(f"wait flag_file: {flag_file}")

ワークフローで渡したis_production変数の値に応じて標準出力するかs3_waitを実行するか判断

最後に

設計的な話ばかりで具体的なコードはほとんど載せられませんでしたが雰囲気は伝わったでしょうか。
まだDigdagを導入してそれほど長くないため、まだまだこれから気づく事がたくさん出てきそうです。
その際にはまたこの場で共有出来ればとおもいます。

*1:広告配信プラットフォーム。Demand Side Platformの略。

*2:広告表示のこと。

*3:広告表示によって獲得できる成果のこと。

*4:同じ処理を何度実行しても同じ結果になるような処理のこと。