こんにちは、Tech Labの中野(@tosametal)です。
本記事ではAkka(Scala)を利用して開発したクローラの紹介を行います。
ストリームデータに対してクロールをする上で工夫した、ドメインごとのクロール頻度制御などについて紹介します。
Akkaはメッセージ指向のアクタシステムがベースとなっており、スケーラブルで弾力性のあるシステムを構築することが出来ます。
Akkaの詳細な説明は本記事では省略するため、興味がある方は 公式ドキュメント を読んで頂ければと思います。
概要
マイクロアドで独自に定義した、あらゆる事柄を目的を軸に階層構造で分類したものをトピックカテゴリと呼びます。
今回紹介するのはサイトのページのトピックカテゴリを判定するための前処理として動くクローラです。
以下はページのトピックカテゴリ判定を行うシステムの一部で、クローラは赤枠で囲った部分です。
kafkaから判定対象のURLを取得し、そのURLに対してクロールを行い、結果(htmlやクロール出来なかった理由など)をkafkaに書き込みます。
要件
このクローラで重要なのが以下3点です。
(1)スケーラビリティ
クロール対象となるURLはDSP1/SSP2のRTB3時に取得できるURLまたは広告主サイトに設置したタグ発火時に取得できるURLなどです。
広告枠が増えたり接続SSPが増えるとクロール対象URLは増加し、システムの負荷も上がることが想像できます。
(2)高速な処理
出来るだけはやくクロールしてトピックカテゴリ分析を行い、広告配信への活用を行いたいです。
(3)ドメインごとのクロール頻度制御
kafkaから取得したデータに対して制御を行わずにクロールしてしまうと、同じドメイン(サイト)に対して高頻度にクロールを行い、負荷をかけてしまう恐れがあります。
これらを踏まえ、以下の図のような仕組みを考えました。
kafkaから読み込んだデータを分配する役割のノードと、実際にクロールを行うノードを分けています。
分配ノードではドメインを元にkafkaから読み込んだURLをクロール用ノードに振り分けます。この時、同じドメインは必ず同じクロールノードに振り分けられるようにします。
クロール用ノードでは同じドメインのURLは同じノードで処理されるようにすることで、外部のストレージに依存せずにクロール頻度制御のための状態管理を行うことが出来ます。
また、クロールノードを増やすことで負荷分散が可能になります。
容易にスケール可能であることや、出来るだけ高速に処理したいという点を考慮し、このクローラの実装ではAkkaを採用することにしました。
Akkaによる実装
ではAkkaを用いた実装を紹介していきます。
クローラの全体構成としては、executor、distributor、crawlerの3種類のroleを持つクラスタをAkka Clusterで構築しました。
楕円はアクタを表しています。
先ほどの分配ノードがdistributorノード、クロールノードがcrawlerノードに該当します。
executorは直接クロールの処理に絡むわけではありませんが、クラスタ全体を管理するうえで必要になりました。
ではそれぞれのroleについて詳細に説明していきます。
role1: executor
Akka Clusterのseedノードにもなります。
Receptionist
を利用してクラスタ内のアクタの参照を取得し、全ノードが参加したことを検知するとクラスタ全体の処理を開始する役割を持ちます。
クラスタ起動時に行われるアクタ間のメッセージのイメージ(CrawlerMainの子アクタは一部省略)
DistributorMainアクタやCrawlerMainアクタはsetup時にReceptionistに登録を行い、 NodeMonitorアクタは定期的にReceptionistに問い合わせることでクラスタに参加しているアクタの参照を取得します。
// DistributorMainアクタがReceptionistにRegisterメッセージを送る部分一部抜粋 object DistributorMain { lazy val key: ServiceKey[Command] = ServiceKey[Command]("distributor") def apply: Behavior[Command] = Behaviors.setup { context => context.system.receptionist ! Receptionist.Register(key, context.self) } }
ExecutorはNodeMonitorからのメッセージでアクタの参照を受け取り、全ノードが参加したことを確認したうえでDistributorのStreamを開始します。
role2: distributor
kafkaから取得したURLのドメインから、分配するcrawlerノードを決定するなどの役割を持ちます。
書き込み部分もそうですが、kafkaからの読み込みはAlpakka(Akka Streams)のKafka APIを利用しています。
以下はDistributorアクタ内で生成されるAkka Streamsのグラフです。
consumerでkafkaからデータの読み出しを行い、flowでrobots.txtの取得などを行います。
partitionはドメインとクロールノードの数から、どのクロールノードに分配するかを決定し、sinkは対応するcrawlerのアクタにメッセージを送信します。
GraphDSLを用いて上記のグラフを作成する実装のイメージは以下のようになります。
※ mergeとflowの間のKillSwitchは図では省略しています
RunnableGraph.fromGraph( GraphDSL.create(KillSwitches.single[ConsumerRecord[String, Option[PageTopicAnalysisRequest]]]) { implicit b => sw => import akka.stream.scaladsl.GraphDSL.Implicit val partition = b.add(Partition[CrawlRequest](crawlers.size, Partitioner.apply(crawlers.size)) val merge = b.add(Merge[ConsumerRecord[String, Option[PageTopicAnalysisRequest]]](kafkaConsumerNum) Seq.fill(kafkaConsumerNum)(consumer).zipWithIndex.foreach { case (cons, index) => cons ~> merge.in(index) } merge ~> sw ~> flow ~> partition sinks.foreach { sink => partition.map { case allowed: CrawlRequest.Allowed => RequestCrawl(allowed) case forbidden: CrawlRequest.Forbidden => SkipCrawl(forbidden) case unknown: CrawlRequest.Drop => DropRequest(unknown) } ~> sink } ClosedShape } )
また、クローラ全体でbackpressureを効かせることが出来ないため、consumerでthrottleを使ってスループットの調整を行なっています。
role3: crawler
クロール処理を行います。アクタで実装しています。
以下はアクタ間のメッセージのイメージです。矢印の向きにメッセージが送られます。
distributorからのメッセージをsourceアクタで受け取ります。
partitionアクタで、現在時刻、最終クロール時刻、ドメインごとクロール頻度制御QPS、現在クロール処理中か否か、という情報を用いてクロール可能かどうかの判定を行います。
クロール可能な場合はcrawlアクタに送信し、そこでクロールを行い結果をkafkaSinkに送ります。
クロール不可能でリトライも不可能な場合はgiveupアクタに送信し、クロールは行わずにクロールが出来なかったという結果をkafkaSinkに送ります。
(リトライ不可能となるのは、リトライ上限回数を超えたリクエストや最大保持期限を超えたリクエストなど)
クロール不可能だがリトライ可能な場合はretryアクタに送信します。retryアクタでは以下のようなデータ構造でリトライ対象のリクエストを保持し、定期的にpartitionアクタに送信します。
kafkaSinkアクタではAlpakkaのKafka APIを用いて結果をkafkaに書き込みを行います。
最後に
紹介してきたクラスタ全体図をまとめると以下のようになります。
簡単にでしたがAkkaを用いて実装したクローラの紹介でした。
ドキュメントを読めば実装はそこまで苦労することはなかったため、図を中心とした仕組みや構成の解説としました。
以上、最後まで読んで頂きありがとうございました。