MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

順序保証型分散ストリーム処理

はじめまして.マイクロアドの新卒3年目のサーバサイドエンジニアの松宮です. 主にMicroAd BLADEという広告配信プラットフォームの開発をしています. サーバサイドの開発言語はJava/Scalaが中心で,最近はScalaの利用シーンが増えてきました.

ということで本日はScalaが活躍するお話です. タイトルの通り,順序保証型分散ストリーム処理 についてお話します. 上記用語は分散ストリーム処理に順序保証型という単語をくっつけた造語で,分散ストリーム処理において時系列を保持して処理を実行する方法です. 「大量のデータを順番に処理したい!」という方がいらっしゃれば是非ともご一読頂ければ幸いです.

順序保証型分散ストリーム処理とは

そもそもストリーム処理とは,無尽蔵に生成されるデータを半永久的に処理し続けることを指し,アクセスログやセンサーから得られるログデータなどの時々刻々と生成されるデータを処理する目的で利用されます. 一方,分散処理はプロセスやサーバを分散させて処理することで大量データを効率よく捌くために利用されます. つまり分散ストリーム処理とは大量のストリームデータを効率よく処理するために利用されます.

f:id:kmatsumiya6:20180328122733p:plain
分散ストリーム処理の概念図

しかしストリーム処理と分散処理の両方においてデータの順序を保証することが難しいという欠点があります. ストリーム処理では,ストリームデータを到着順に処理するため,予め時系列順であることをストリームデータを生成する側が保証しなければなりません. また,分散処理ではストリームデータを分散して処理するため時系列を保ったまま処理することはできません.

ところで時系列順で処理したい時とはどういった場合でしょうか. ここでは「アクセスログでユーザを追跡すること」を例に話します.弊社はアドテクの会社なので,広告を闇雲に配信している訳ではなく,ユーザの行動履歴を分析した上で適切な広告を配信しています.(ターゲティング広告と呼びます)そのために,ユーザがどういう行動をしているかを追跡するためにアクセスログを利用しています. 例えば,ECサイトでの購買効果をユーザの行動履歴から分析したい場合には,トップページ,商品ページ,購入ページと進む流れを分析する事が考えられます. しかし,アクセスログの順序が入れ替わりトップページ,購入ページ,商品ページのように記録されてしまった場合,正しく分析できないのは直感的に分かるかと思います. よって,アクセスログを正しい順序で処理することが求められます.

f:id:kmatsumiya6:20180328123938p:plain
アクセスログでユーザを追跡

アクセスログは1秒あたりに数万件がやってきます.これは途方もない数です.一日あたりに換算すると数億~数十億というログ量になります. ただ蓄積するだけならば話は単純ですが,実際にはアクセスログを整形・加工し,分析できる状態に仕上げる必要があります. これら大量のアクセスログをストリーム処理するためには単一のプロセスでは処理が間に合いません.そこで分散ストリーム処理の検討が必要になってきます.さらに分析で利用するために順序を保証する処理も同時に実現する必要がありました.

このような経緯で弊社では順序保証型分散ストリーム処理を設計・導入するに至りました.

どうやって順序を保証するか?

既に話した通り,分散ストリーム処理において順序を保証するには次の2つの課題をクリアする必要があります.

  • ストリーム処理: 入力データの順序への依存
  • 分散処理: 非同期処理による整合性の喪失

これらの課題を順番に見ていきます.

ストリーム処理: 入力データの順序への依存

膨大なアクセスログは,通常1つのサーバではなく複数のサーバで収集されると思います. そのため,ストリーム処理の入力データは複数のサーバから非同期に送られてくることになります. 非同期であるため,そのままではログの時系列が揃わなくなるのは分かるかと思います. そのため,時系列順で処理するためには入力データをソートする必要があります. そこで弊社ではSpark Streamingという分散ストリーム処理基盤を採用しました.分散ストリーム処理基盤には色々種類があるのですが,中でもSpark Streamingには面白い特徴があります. それはマイクロバッチと呼ばれる処理方式でストリーム処理を実現していることです. これは短時間(数ミリ秒から数十秒)で終わるバッチ処理(マイクロバッチ)を繰り返してストリーム処理を実現する手法です.

f:id:kmatsumiya6:20180328125358p:plain
一般的な方式とマイクロバッチ方式の比較

では,マイクロバッチ方式だと何が嬉しいかというと,微小時間のストリームデータをまとめて扱えるという点があります. この性質により短い時間間隔に到達したストリームデータをまとめてコレクションとして扱い,整形加工が可能です. 例えばマイクロバッチ内でストリームデータをソートし,時系列の順序に並び替えることが可能になります. つまり,ストリーム処理における入力データが事前にソートされている必要があるという条件をストリーム処理側で制御できるようになります.

これで1つ目の課題は解決できそうです.続いて2つ目の課題を見ていきます.

分散処理: 非同期処理による整合性の喪失

分散処理なので,複数のサーバ・複数のプロセスによって 実行されます.そのため,順序は保証されません. これを解決するためには,まず我々が保証したい順序をより明確にする必要があります. アクセスログの例で話すと,我々はユーザを追跡するために順序を保証したい訳です. つまりユーザ毎に順序が保証できさえすれば,必ずしも全てのログの順序が保証されていなくても良い訳です.

弊社ではユーザ単位で処理を分散することを検討しました.異なるユーザ間の時系列は当然保証されませんが,個別のユーザにおいては 同じプロセス・同じサーバ上で処理されるため順序を保証できるようになりました. 例えば,並列度を3として,Aさん〜Gさんはプロセス1,Hさん〜Pさんはプロセス2,Qさん〜Zさんはプロセス3で処理すると決めます. Aさんのアクセスログは全てプロセス1で処理され,Zさんのアクセスログは全てプロセス3で処理されます. すると特定のユーザは特定のプロセスで処理されるため,順序が崩れることはありません.さらに各プロセスは完全に独立して処理できるようになります.

これで2つ目の課題も解決できそうです.

アーキテクチャの全体像

そこで以下のようなアーキテクチャを設計しました.

f:id:kmatsumiya6:20180328134054p:plain
順序保証型分散ストリーム処理

マイクロバッチは予め設定した微小時間間隔と並列度で一斉に起動し,それぞれのバッチには特定ユーザのログだけが渡されます.そしてメインの処理を実行する前に微小時間間隔で集まったデータをログのタイムスタンプでソートします.このままデータを整形・加工し,出力することでユーザ単位の順序は保持された状態になります.複数のプロセスが非同期に出力しますのでプロセス間では出力の順序は保証されませんが,単一のプロセスにおいては順序が保証されます.このようにして弊社では順序保証型分散ストリーム処理を実現しています.

大まかな概念の説明をした所で,具体的な実装を少し紹介したいと思います.

ソート処理

Spark StreamingではScalaのコレクションを操作するのとほぼ同じ感覚でストリーム処理を記述できます. では,実際にソート処理を書いてみましょう.ソートする際はパーティション毎にソートする必要があります. 具体的な説明は省きますが,Spark Streamingにおいてパーティションとは処理の単位となっていて,1つのパーティションが1つのプロセスと紐付きます. 下記コードを見ていきます.

inputStream.mapPartitions( part => {
  part.toArray.sortWith((x, y) => x.accessTime< y.accessTime).iterator
}, preservePartitioning = true
).print()

mapPartitions関数で各プロセス毎にArrayクラスのsortWith関数を利用して時系列順にソートします. これだけで各プロセス内ではレコードがソートされた状態になります. 尚,inputStreamはDStreamというストリームデータの受け口となるオブジェクトを示しています.

下記に実際にコンパイルして動かせるサンプルコードを載せます. *1

コード全体

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object Driver {

  def main(args: Array[String]): Unit = {
    // Sparkを使う準備
    val sparkConf = new SparkConf().setMaster("local").setAppName("test-spark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Milliseconds(10000))

    // ストリームの作成
    val stream = ssc.socketTextStream("localhost", 7777)

    // メインとなる処理
    stream.mapPartitions(part => {
      part.toArray.sortWith((x, y) => x < y).iterator
    }, preservePartitioning = true
    ).print()

    // ストリーム処理の開始
    ssc.start()
    ssc.awaitTermination()
  }
}

ncコマンドでストリームデータを生成

$ nc -lk 7777

実は完璧に順序は保証できない

順序保証型と言っておりましたが実は完璧には順序を保証できていません. なぜかというと,マイクロバッチを一定間隔で実行しているので,バッチ処理間にまたがる順序のズレを保証できないためです. 本当に順序を完璧に保証するためには一定期間分ログをすべて集約しソートする必要があります. ここはどうしてもトレードオフの関係があり,リアルタイム性と順序の整列度合いを完全に両立する方法はおそらくありません.

順序保証型分散ストリーム処理を採用する際は,リアルタイム性と順序の整列度を天秤にかけ調整する必要があります.

その他

本記事では順序保証型分散ストリーム処理についてお話しましたが,入力ソースをどうするか,外部データソースの連携をどうするかなど ハイパフォーマンスな設計にするための方法をお話できていません. この辺りは改めて別の機会に書けたらと思います.

それでは,今回は以上となります.ご清覧ありがとうございました.

*1:ただし実際に動かすにはSparkコマンドのインストールだったりjarの生成のためにbuild.sbtを準備したりと若干手間がかかりますのでご留意を・・