MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Akkaを利用したクローラ開発

こんにちは、Tech Labの中野(@tosametal)です。 本記事ではAkka(Scala)を利用して開発したクローラの紹介を行います。
ストリームデータに対してクロールをする上で工夫した、ドメインごとのクロール頻度制御などについて紹介します。
Akkaはメッセージ指向のアクタシステムがベースとなっており、スケーラブルで弾力性のあるシステムを構築することが出来ます。
Akkaの詳細な説明は本記事では省略するため、興味がある方は 公式ドキュメント を読んで頂ければと思います。

概要

マイクロアドで独自に定義した、あらゆる事柄を目的を軸に階層構造で分類したものをトピックカテゴリと呼びます。
今回紹介するのはサイトのページのトピックカテゴリを判定するための前処理として動くクローラです。
以下はページのトピックカテゴリ判定を行うシステムの一部で、クローラは赤枠で囲った部分です。

f:id:tosametal:20201001192308p:plain

kafkaから判定対象のURLを取得し、そのURLに対してクロールを行い、結果(htmlやクロール出来なかった理由など)をkafkaに書き込みます。

要件

このクローラで重要なのが以下3点です。

(1)スケーラビリティ
クロール対象となるURLはDSP1/SSP2のRTB3時に取得できるURLまたは広告主サイトに設置したタグ発火時に取得できるURLなどです。 広告枠が増えたり接続SSPが増えるとクロール対象URLは増加し、システムの負荷も上がることが想像できます。

(2)高速な処理
出来るだけはやくクロールしてトピックカテゴリ分析を行い、広告配信への活用を行いたいです。

(3)ドメインごとのクロール頻度制御
kafkaから取得したデータに対して制御を行わずにクロールしてしまうと、同じドメイン(サイト)に対して高頻度にクロールを行い、負荷をかけてしまう恐れがあります。

これらを踏まえ、以下の図のような仕組みを考えました。

f:id:tosametal:20201001192302p:plain

kafkaから読み込んだデータを分配する役割のノードと、実際にクロールを行うノードを分けています。
分配ノードではドメインを元にkafkaから読み込んだURLをクロール用ノードに振り分けます。この時、同じドメインは必ず同じクロールノードに振り分けられるようにします。
クロール用ノードでは同じドメインのURLは同じノードで処理されるようにすることで、外部のストレージに依存せずにクロール頻度制御のための状態管理を行うことが出来ます。 また、クロールノードを増やすことで負荷分散が可能になります。

容易にスケール可能であることや、出来るだけ高速に処理したいという点を考慮し、このクローラの実装ではAkkaを採用することにしました。

Akkaによる実装

ではAkkaを用いた実装を紹介していきます。
クローラの全体構成としては、executor、distributor、crawlerの3種類のroleを持つクラスタをAkka Clusterで構築しました。

f:id:tosametal:20201001192211p:plain

楕円はアクタを表しています。
先ほどの分配ノードがdistributorノード、クロールノードがcrawlerノードに該当します。 executorは直接クロールの処理に絡むわけではありませんが、クラスタ全体を管理するうえで必要になりました。
ではそれぞれのroleについて詳細に説明していきます。

role1: executor

Akka Clusterのseedノードにもなります。
Receptionist を利用してクラスタ内のアクタの参照を取得し、全ノードが参加したことを検知するとクラスタ全体の処理を開始する役割を持ちます。

クラスタ起動時に行われるアクタ間のメッセージのイメージ(CrawlerMainの子アクタは一部省略)

f:id:tosametal:20201001192201p:plain

DistributorMainアクタやCrawlerMainアクタはsetup時にReceptionistに登録を行い、 NodeMonitorアクタは定期的にReceptionistに問い合わせることでクラスタに参加しているアクタの参照を取得します。

// DistributorMainアクタがReceptionistにRegisterメッセージを送る部分一部抜粋
object DistributorMain {
 lazy val key: ServiceKey[Command] = ServiceKey[Command]("distributor")

 def apply: Behavior[Command] = Behaviors.setup { context =>
   context.system.receptionist ! Receptionist.Register(key, context.self)
 }
}

ExecutorはNodeMonitorからのメッセージでアクタの参照を受け取り、全ノードが参加したことを確認したうえでDistributorのStreamを開始します。

role2: distributor

kafkaから取得したURLのドメインから、分配するcrawlerノードを決定するなどの役割を持ちます。
書き込み部分もそうですが、kafkaからの読み込みはAlpakka(Akka Streams)のKafka APIを利用しています。
以下はDistributorアクタ内で生成されるAkka Streamsのグラフです。

f:id:tosametal:20201001192257p:plain

consumerでkafkaからデータの読み出しを行い、flowでrobots.txtの取得などを行います。 partitionはドメインとクロールノードの数から、どのクロールノードに分配するかを決定し、sinkは対応するcrawlerのアクタにメッセージを送信します。 GraphDSLを用いて上記のグラフを作成する実装のイメージは以下のようになります。
※ mergeとflowの間のKillSwitchは図では省略しています

RunnableGraph.fromGraph(
 GraphDSL.create(KillSwitches.single[ConsumerRecord[String, Option[PageTopicAnalysisRequest]]]) {
   implicit b => sw =>
     import akka.stream.scaladsl.GraphDSL.Implicit
     val partition = b.add(Partition[CrawlRequest](crawlers.size, Partitioner.apply(crawlers.size))
     val merge = b.add(Merge[ConsumerRecord[String, Option[PageTopicAnalysisRequest]]](kafkaConsumerNum)
     Seq.fill(kafkaConsumerNum)(consumer).zipWithIndex.foreach {
       case (cons, index) =>
         cons ~> merge.in(index)
     }
  
     merge ~> sw ~> flow ~> partition

     sinks.foreach { sink =>
       partition.map {
         case allowed: CrawlRequest.Allowed => RequestCrawl(allowed)
         case forbidden: CrawlRequest.Forbidden => SkipCrawl(forbidden)
         case unknown: CrawlRequest.Drop => DropRequest(unknown)
       } ~> sink
     }
  
     ClosedShape
 }
)

また、クローラ全体でbackpressureを効かせることが出来ないため、consumerでthrottleを使ってスループットの調整を行なっています。

role3: crawler

クロール処理を行います。アクタで実装しています。
以下はアクタ間のメッセージのイメージです。矢印の向きにメッセージが送られます。

f:id:tosametal:20201001192246p:plain

distributorからのメッセージをsourceアクタで受け取ります。
partitionアクタで、現在時刻、最終クロール時刻、ドメインごとクロール頻度制御QPS、現在クロール処理中か否か、という情報を用いてクロール可能かどうかの判定を行います。
クロール可能な場合はcrawlアクタに送信し、そこでクロールを行い結果をkafkaSinkに送ります。
クロール不可能でリトライも不可能な場合はgiveupアクタに送信し、クロールは行わずにクロールが出来なかったという結果をkafkaSinkに送ります。
(リトライ不可能となるのは、リトライ上限回数を超えたリクエストや最大保持期限を超えたリクエストなど)
クロール不可能だがリトライ可能な場合はretryアクタに送信します。retryアクタでは以下のようなデータ構造でリトライ対象のリクエストを保持し、定期的にpartitionアクタに送信します。

f:id:tosametal:20201001192312p:plain

kafkaSinkアクタではAlpakkaのKafka APIを用いて結果をkafkaに書き込みを行います。

最後に

紹介してきたクラスタ全体図をまとめると以下のようになります。

f:id:tosametal:20201001192221p:plain

簡単にでしたがAkkaを用いて実装したクローラの紹介でした。
ドキュメントを読めば実装はそこまで苦労することはなかったため、図を中心とした仕組みや構成の解説としました。
以上、最後まで読んで頂きありがとうございました。


  1. Demand Side Platformの略。広告主側が使用するプラットフォーム。

  2. Supply Side Platformの略。媒体側が使用するプラットフォーム。

  3. Real Time Biddingの略。リアルタイムで入札を行う仕組み。