MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

HTTP cookieを使って年齢と性別を推定する

背景

表題をみた皆様はこう思われたに違いない「何を今更そんな話、、、、」。

実はこれ、

developers.microad.co.jp

でサラリと触れられている”SPSS"に関係があるんです。

SPSSという壁

マイクロアドではSPSS Modeler(以下SPSS)を使って年齢と性別を推定していました。いま流行の機械学習でね。

皆さんのSPSSについての印象は大まかに次の3パターンに分かれるでしょう。

「Ⅰ. SPSSってなんぞや?(((゚д゚; )」
「Ⅱ. (∩゚д゚)Rの有償版?」
「Ⅲ. あぁ、察した、、、(そそくさと退散)彡(* ̄з ̄)」

「( ・ิω・ิ)SPSSのスペシャリストです。」 なんて人はみないですよね。 マイクロアドでは具体的に次の課題を抱えていました。  

* 使用方法を知っている人材が市場に少ないので教育から始めざるを得ない
* GUIで簡単に操れるためきれいな設計が難しく、保守も難しくなる
* SPSSはライセンス料が高価

など(勿論、他にも色々)....

SPSSは使わない

おっと、自己紹介が遅れました。
私(id:purucat)は、”大脳洋公開日記”やTJOじさん等を横目に見ながら、学生時代までは脳科学の研究者を目指していた系のソフトウェアエンジニアです*1

SPSSは、先に上げたように課題も多く、 また、次ような条件が揃っていたため、アイデアとコードをスクラッチから実装してみました。
Python3でscikit-learnを使いました。

  • マイクロアドは新しいシステムの導入に寛容である *2
  • 私が大学院で機械学習にも関係する研究室にいたこともあり、記憶をほじくり返せば対応できなくはなさそうであった。
  • scikit-learn等のライブラリの充実 *3

本題 ~ HTTP cookieを使って年齢と性別を推定する

広告を配信する際に閲覧者の年齢や性別が分かると何が嬉しいのでしょうか?

閲覧者の情報を持っていない場合、広告は闇雲に配信されてしまいます。このような状況では、男性に対して女性向けのバッグなどの広告が配信される事もありえます。
一般的には、女性向け商品に男性は余り興味を持たないと思われますので、広告を出す側も、閲覧する側も良い思いはしないでしょう。

一方、閲覧者の年齢や性別を利用して配信することができれば、広告を出す側はターゲットとする層に配信できる為、広告の費用対効果が改善されます。 又、閲覧者にとっても自分の興味に近い広告が出るようになります。(Win-Winの関係?)

ただ、年齢や性別の情報が紐付けられた閲覧者というのは、ごく一部です。 そのため、情報が存在しない閲覧者については、年齢や性別を推定する必要があります。

問題の設定

では、年齢や性別を推定する際に使用可能なデータは何でしょうか?

一例とはなりますが、 ホームページにアカウントを登録し、且つ、登録情報の広告利用への許諾をしていただいた閲覧者からは、 下記のような、データを提供していただいております。

使用可能なデータ 説明
広告閲覧者が訪問したURL https://www.microad.co.jp/about/?search=this-is&query=example マイクロアドのcookie を持っているBrowserに紐づいている 
年齢や性別の情報 年齢:68歳、性別:男 業務提携先から頂戴した

図で表すと下記の図のようになります。

f:id:purucat:20180427172437p:plain

以上のようなデータを「学習データ」として、ホームページ訪問履歴を持つ閲覧者の年齢と性別を推定するというのが問題設定となります。
下記が推定対象の具体例になります。

推定対象 推定対象の広告閲覧者の行動履歴の例
年齢 `競馬/top.htm`を1回、`OX新聞/page1.htm`を1回、`OX新聞/yyy/xxx.htm`を1回、`www.microad.co.jp/`を1回 訪問した
性別 `競馬/top.htm`を1回、`OX新聞/page1.htm`を1回、`OX新聞/yyy/xxx.htm`を1回、`www.microad.co.jp/`を1回 訪問した

f:id:purucat:20180427172520p:plain

使用可能なデータを使って「学習」をし、推定対象の広告閲覧者の行動履歴を使用して推定対象の値を決めるという問題です。

以降、主に性別推定についてお話しますが、実際には同じ枠組みで年齢推定もやっていました。

データを可視化してアプローチ方法を考える

何はともあれ、データの特性を把握しないと、どのような手法でアプローチすればよいかわかりません。 まずは定石の可視化をしてみました。

閲覧者が訪れたサイトがどういう性質のものかを分析するためにはHTMLのmeta tag (descriptionやkeywards)を 見るのが直感的に良さげと思われます。

ただし、使用可能な学習データの分布が不明であるなどの理由から、まずはFully Qualified Domain Name (FQDN)毎の被訪問数*4と、そのユニークな被訪問数を男女比率(男性が青。女性が赤)で表示するように、プロットしてみました。

f:id:purucat:20180427174005p:plain

いい感じに男女比率がFQDN毎にバラけているのがわかります。 また、サイト毎に1ユーザの訪問頻度が大きく異なることもわかりました。 取り敢えずは、FQDNレベルの粒度で「特徴量」を作成して良さそうです。 *5

ついでに、閲覧者毎の一日あたりのFQDN訪問頻度もみてみましょう。

visit to different FQDNs per unique audience id

横軸の"Unique audience id"は閲覧者毎に訪問頻度が多い順にソートしたものです。 閲覧者毎にサイトを訪問する頻度が違うという事実を踏まえて、機械学習の方法を考えるのが良さそうです。

モデリング

FQDN毎に閲覧者の性別が異なる傾向にあるのがわかりました。 又、訪問数も閲覧者毎に異なることがわかりました。

f:id:purucat:20180427180448p:plain

scikit-learnで機械学習をするためには、学習に利用する「特徴量」を数値化する必要があります。

今回、上図の女性と男性を次のような計算式を利用して、次のような数値ベクトルの特徴量で表現しました。

女性: ([0.9, 0.1] + [0.5, 0.5]) / 2 = [0.7, 0.3]
男性: ([0.2, 0.8]*2+[0.5, 0.5])/3= [0.3, 0.7]

特徴量のスケールを揃える観点*6と数値計算の収束という観点からL1ノルムを使いました。

学習モデルは多項ロジスティック回帰を使用しました。

結果

性別

性別の推定は正解率(accuracy)で67~75%ほどの精度でした。

年齢

「10歳から21歳までの年齢」である人に対して広告を出したい、、、という条件で広告を出すのは一般的であると思われますが、 年齢の推定を「点推定」、つまりピンポイントで推定してみるということもやってみました。

その結果、ちょっと興味深い結果が出たのでチラ見せしちゃいます。 あくまでも一例ですが、0歳から99歳の100分類問題として学習と推定をすると下図の様になりました。 f:id:purucat:20180427182846p:plain 横軸が正解で、縦軸が推定です。 対角線に分布しているデータは推定が正しかったことを表します。

ただFQDNだけを特徴量とするのは荒すぎのようで、閲覧者毎の年齢由来の特徴を完全には表現出来ていないようです。 今後はサイトのmetaタグから、特徴量を作り、実験してみるのも良さそうですね。

結果の総評

以上のような精度になった理由として、家族で共用のブラウザを利用しただけで、ある意味正解がない状態ができてしまうという事などが考えられます。 そこら辺を考えるとまずまずの精度ではないかとも思えてきます。

プロダクション環境に組み込む

プロダクション環境に組み込む前に10-分割交差検証などを実施しました。

結果、SPSSを使う時以上の推定の安定性や、計算速度の性能向上が確認できたため、SPSS版の推定処理を置き換える形で、プロダクション環境に組み込みこむ事が出来ました。

データ収集と主な加工はHiveで分散処理させています。 推定部分は諸事情により、Pythonをインストールした一台のサーバーで実行しているため分散処理はできてはおりません。 ただ、現在のデータサイズでは一台でも処理上の問題はありませんでした。

終わりに

ご拝読ありがとうございました。

マイクロアドでの仕事のイメージが少しは皆様に伝えられたならば光栄です。-=ニ_(´・∀・`)_/ニ=-イェーイ♪

*1:脳科学系に携わっていた方々は、アドテクがブームであった頃はよく広告系に流れていたものです。

*2:IT業界には変化を嫌うおじさんとおばさんの溜まり場があったりするけれども。

*3:始めるまでは、アルゴリズムも自分で実装する気持ちでいたのだけど。

*4:複数回訪問されている場合もカウント。

*5:あまり細かい粒度でみてしまうと、「過学習」してしまいますしね。ここには記載しませんが、その他考慮したことは色々あります。

*6:訪問頻度は何らかの特徴にはなりえそうですが各閲覧者は「閲覧者」という観点では平等ですので、訪問頻度が多い閲覧者ほど相対的に「強い」特徴であるべきではありません。

広告配信プログラム PerlからScalaへの軌跡

MicroAdの京都研究所でソフトウェアエンジニアとして勤務している池田です。主に広告配信やReal Time Bidding(以下RTB)のプログラムの開発に携わってきました。 本日はアドネットワークと呼ばれていた時代から現在主流となったRTBに至るまで、MicroAdの広告配信プログラムで採用したプログラミング言語の経緯についてご紹介します。

MicroAd 配信プログラムと採用言語の変遷

サービスの開発年次と採用言語

サービス 概要 開発言語
2004 BlogClick ブログに特化したコンテンツマッチ型広告配信 Perl
2006 MicroAd リターゲティング/行動ターゲティング広告配信 Perl
2009 MicroAd配信改修 アーキテクチャ変更 Perl
2009 MicroAd VASCO 媒体向け無償アドサーバ PHP *1
2011 BLADE Demand-Side Platform (DSP) Java
2011 AdFunnel Supply Side Platform (SSP) Perl → Java
2014 COMPASS Supply SIde Platform (SSP) Scala
2015〜  BLADE Scala化 開発言語 変更 Java → Scala *2

f:id:mmotoi0:20180423175632p:plain
RTBの概念図

2004 - BlogClick

2004年にサイバーエージェントの1事業としてBlogClickというブログに特化したコンテンツマッチ型の広告配信サービスがリリースされました。開発言語はPerlでした。当時、私はまだ入社していなかったので採用に至った正確な理由はわかりませんが、初期開発を担当していた会社が得意な言語だったからだと思います。

2006 - MicroAd

BlogClickはその後サービス名をMicroAdに変更し、ブログ特化ではなく幅広いメディアへの配信を行うようになりました。2007年にはサイバーエージェントの1事業から分社化しました。 配信プログラムは当初のPerlのプログラムが改修されていき、ブログ以外のメディアへの配信や行動ターゲティング/リターゲティングといった広告配信にも対応していきました。当然プログラムはどんどん複雑になっていきました。機能追加は既存コードのコピーと必要最低限の変更のみで適切なリファクタリングが実施されておらず、重複するコードや不適切なクラス/関数で溢れかえりました。テストコードもなくバージョン管理システムは一部SVNが使われていましたが末尾に'_yyyymmdd'の付いたファイルが大量に出来上がっていました。 お世辞にもいいコードではありませんでした。

2009 - MicroAd 配信改修

2008年に私がMicroAdに入社し、2009年に配信プログラムを1から書き直すことになりました。 理由としてはデータ分析チームの要望に従って配信ロジックを変更する必要があった事とアーキテクチャの見直しを行って処理速度を上げたかったためです。 ただ言語はPerlのままでした。

2011 - AdFunnelとBLADEの開発

この頃にRTB配信に注力することになりAdFunnel(SSP)とBLADE(DSP)を開発することになりました。 AdFunnelは既存配信サービスであるMicroAdを改修することで対応する予定だったのですが、このタイミングで開発言語をPerlではなくJavaにすることなりました。同じくBLADEの開発もJavaで行われることになりました。

Javaにした1番の理由は並列処理のサポートが手厚いことです。SSPでは並列で複数のDSPにHTTPリクエストを送り処理する必要があります。 当時の主流の言語の中でJavaに匹敵する並列処理機能を持ちつつ高パフォーマンスな言語はなかったと思います。もし現在だったらGo言語を検討したかもしれません。 DSPは基本的に並列処理の必要はありませんが、SSPと並行して開発することやパフォーマンスも良好なこともあってBLADEのRTBもJavaにしました。

私自身Javaの基本的な文法はひと通り理解していましたが、ある程度規模のあるシステムをJavaで開発した経験はありませんでした。 そのためJava開発のプラクティスやRTBで必要となる並列処理について学ぶべく、オブジェクト指向やデザインパターン, 並列処理について勉強しながらの開発になりました。

Javaのメリット

AdFunnelやBLADEの開発や運用をJavaで進めていくうちに、並列処理やパフォーマンス以外のJavaの利点を感じるようになりました。 それはリファクタリングのしやすさです。EclipseやIntelliJといったIDEのサポートがある事が大きな理由ですが、それらの機能も言語が静的型付きだからであるため実現出来ている事を理解しました。 私はもともとRubyが好きで、Javaは好きな言語ではありませんでしたがリファクタリングのしやすさ、型付きであることで好きな言語になりました嫌いじゃなくなりました。 ただし、なんでもJavaでの開発が良いというわけではなく、あくまでもリファクタリングが多く必要になるような大規模なアプリケーションの開発に限った場合です。 今でもリファクタリングが必要のない使い捨てや改修が発生しない小規模なアプリケーションならRuby等のほうが良いと思っています。

2014 - COMPASS配信でのScalaの採用

当初わかってなかったJavaの利点を認識し始めた一方で、LL系言語と比べてコードが書きづらい、長くなるといったデメリットは開発当初からずっと感じていました。 もっとLL系のように簡潔にコードを書きたいとずっと考えていた結果、2014年に始まったCOMPASS(SSP)の配信システムを開発するタイミングでScalaを採用することにしました。 2010 - 2011年ごろからWebや書籍等でScalaの情報が増え始め、個人で勉強をしていました。 が、難しくて何度か挫折しました。 ただ、Javaでの開発を2,3年するうちに上述のJavaのデメリットを強く感じるようになり、ScalaならJavaのメリット(並列処理/パフォーマンス/リファクタリングのしやすさ/静的型付き)を引き継ぎつつデメリットの多くが解消されると実感しました。

型推論による型の省略、ケースクラスによる代数的データ型の定義、高階関数/匿名関数による関数の汎用化、immutableなデータ構造と参照透明なメソッドを推奨することにより堅牢かつシンプルにコードが書けることはScalaでの開発当初から特に気に入っていた点です。もちろんtraitによるmixin合成, 合成を容易にするfor-comprehensionなどのScalaの機能もメンテナンスしやすいコードを書くために役立っています。

COMPASSの広告配信プログラムをScalaで実装することに不安はありました。 今では同業のアドテク系を含む多くの企業でScalaは採用されていますが当時はあまりなかったと思います。 自分自身が何度か挫折していることもあって学習コストが高いことも認識していました。 それでも、個人的にJavaのデメリットを解消してScalaのメリットを取り入れたかったこと、いっしょに働く京都研究所のメンバーならメリットを理解した上でScalaを使えるようになってもらえると思ったので採用しました。 Scalaの質問はある程度私がサポートできること、Scala独特な機能を使わずにJavaとほぼ同じようにも書けること(Better Javaとして使う)、Javaのクラスと協調して動作するので最悪Javaでコードがかけるのも採用に踏み切った理由です。

2015 - BLADE RTBのScala化

COMPASSの配信でScalaを使うようになってBLADEのRTBもScalaで書きたいと思うようになり、部分的にJavaからScalaへの移行をはじめました。 現時点で全コードの3割程度がScalaになっています。またCOMPASSの配信で実装した一部の機能(DBアクセス等)は独立したScalaのライブラリとして切り出し、BLADEのRTBと共有しています。 COMPASSの配信についてはほぼ100%Scalaで実装されていますが、BLADEのRTBは3割にとどまっており、Scalaのメリットを受ける一方でScalaとJavaが混在することによる書きづらさも存在します。

まとめ

配信/RTB特有の機能/非機能要件のため当初のPerlからJavaに変更し、そしてJavaのデメリットを解消するためScalaの採用に至りました。

しばらくは広告配信/RTBではScalaの採用を続けると思いますが、各プログラミング言語とも日々進化していますし新しい言語も登場しています。 今後もシステムの要件や言語の特徴、開発メンバー(新規採用も含む)も踏まえ、採用するプログラミング言語を決定していくことになると思います。

今回はMicroAdでの配信/RTBで採用してきたプログラミング言語の経緯についてのご紹介でしたが、今後は配信/RTBプログラムやプログラミング言語のTips的なことも書いていきたいと思いますのでよろしくお願いします。

*1:ossのopenxをベースに改修したのでここでは触れません

*2:コード全体でのScala移行進行度は3割程度

順序保証型分散ストリーム処理

はじめまして.マイクロアドの新卒3年目のサーバサイドエンジニアの松宮です. 主にMicroAd BLADEという広告配信プラットフォームの開発をしています. サーバサイドの開発言語はJava/Scalaが中心で,最近はScalaの利用シーンが増えてきました.

ということで本日はScalaが活躍するお話です. タイトルの通り,順序保証型分散ストリーム処理 についてお話します. 上記用語は分散ストリーム処理に順序保証型という単語をくっつけた造語で,分散ストリーム処理において時系列を保持して処理を実行する方法です. 「大量のデータを順番に処理したい!」という方がいらっしゃれば是非ともご一読頂ければ幸いです.

順序保証型分散ストリーム処理とは

そもそもストリーム処理とは,無尽蔵に生成されるデータを半永久的に処理し続けることを指し,アクセスログやセンサーから得られるログデータなどの時々刻々と生成されるデータを処理する目的で利用されます. 一方,分散処理はプロセスやサーバを分散させて処理することで大量データを効率よく捌くために利用されます. つまり分散ストリーム処理とは大量のストリームデータを効率よく処理するために利用されます.

f:id:kmatsumiya6:20180328122733p:plain
分散ストリーム処理の概念図

しかしストリーム処理と分散処理の両方においてデータの順序を保証することが難しいという欠点があります. ストリーム処理では,ストリームデータを到着順に処理するため,予め時系列順であることをストリームデータを生成する側が保証しなければなりません. また,分散処理ではストリームデータを分散して処理するため時系列を保ったまま処理することはできません.

ところで時系列順で処理したい時とはどういった場合でしょうか. ここでは「アクセスログでユーザを追跡すること」を例に話します.弊社はアドテクの会社なので,広告を闇雲に配信している訳ではなく,ユーザの行動履歴を分析した上で適切な広告を配信しています.(ターゲティング広告と呼びます)そのために,ユーザがどういう行動をしているかを追跡するためにアクセスログを利用しています. 例えば,ECサイトでの購買効果をユーザの行動履歴から分析したい場合には,トップページ,商品ページ,購入ページと進む流れを分析する事が考えられます. しかし,アクセスログの順序が入れ替わりトップページ,購入ページ,商品ページのように記録されてしまった場合,正しく分析できないのは直感的に分かるかと思います. よって,アクセスログを正しい順序で処理することが求められます.

f:id:kmatsumiya6:20180328123938p:plain
アクセスログでユーザを追跡

アクセスログは1秒あたりに数万件がやってきます.これは途方もない数です.一日あたりに換算すると数億~数十億というログ量になります. ただ蓄積するだけならば話は単純ですが,実際にはアクセスログを整形・加工し,分析できる状態に仕上げる必要があります. これら大量のアクセスログをストリーム処理するためには単一のプロセスでは処理が間に合いません.そこで分散ストリーム処理の検討が必要になってきます.さらに分析で利用するために順序を保証する処理も同時に実現する必要がありました.

このような経緯で弊社では順序保証型分散ストリーム処理を設計・導入するに至りました.

どうやって順序を保証するか?

既に話した通り,分散ストリーム処理において順序を保証するには次の2つの課題をクリアする必要があります.

  • ストリーム処理: 入力データの順序への依存
  • 分散処理: 非同期処理による整合性の喪失

これらの課題を順番に見ていきます.

ストリーム処理: 入力データの順序への依存

膨大なアクセスログは,通常1つのサーバではなく複数のサーバで収集されると思います. そのため,ストリーム処理の入力データは複数のサーバから非同期に送られてくることになります. 非同期であるため,そのままではログの時系列が揃わなくなるのは分かるかと思います. そのため,時系列順で処理するためには入力データをソートする必要があります. そこで弊社ではSpark Streamingという分散ストリーム処理基盤を採用しました.分散ストリーム処理基盤には色々種類があるのですが,中でもSpark Streamingには面白い特徴があります. それはマイクロバッチと呼ばれる処理方式でストリーム処理を実現していることです. これは短時間(数ミリ秒から数十秒)で終わるバッチ処理(マイクロバッチ)を繰り返してストリーム処理を実現する手法です.

f:id:kmatsumiya6:20180328125358p:plain
一般的な方式とマイクロバッチ方式の比較

では,マイクロバッチ方式だと何が嬉しいかというと,微小時間のストリームデータをまとめて扱えるという点があります. この性質により短い時間間隔に到達したストリームデータをまとめてコレクションとして扱い,整形加工が可能です. 例えばマイクロバッチ内でストリームデータをソートし,時系列の順序に並び替えることが可能になります. つまり,ストリーム処理における入力データが事前にソートされている必要があるという条件をストリーム処理側で制御できるようになります.

これで1つ目の課題は解決できそうです.続いて2つ目の課題を見ていきます.

分散処理: 非同期処理による整合性の喪失

分散処理なので,複数のサーバ・複数のプロセスによって 実行されます.そのため,順序は保証されません. これを解決するためには,まず我々が保証したい順序をより明確にする必要があります. アクセスログの例で話すと,我々はユーザを追跡するために順序を保証したい訳です. つまりユーザ毎に順序が保証できさえすれば,必ずしも全てのログの順序が保証されていなくても良い訳です.

弊社ではユーザ単位で処理を分散することを検討しました.異なるユーザ間の時系列は当然保証されませんが,個別のユーザにおいては 同じプロセス・同じサーバ上で処理されるため順序を保証できるようになりました. 例えば,並列度を3として,Aさん〜Gさんはプロセス1,Hさん〜Pさんはプロセス2,Qさん〜Zさんはプロセス3で処理すると決めます. Aさんのアクセスログは全てプロセス1で処理され,Zさんのアクセスログは全てプロセス3で処理されます. すると特定のユーザは特定のプロセスで処理されるため,順序が崩れることはありません.さらに各プロセスは完全に独立して処理できるようになります.

これで2つ目の課題も解決できそうです.

アーキテクチャの全体像

そこで以下のようなアーキテクチャを設計しました.

f:id:kmatsumiya6:20180328134054p:plain
順序保証型分散ストリーム処理

マイクロバッチは予め設定した微小時間間隔と並列度で一斉に起動し,それぞれのバッチには特定ユーザのログだけが渡されます.そしてメインの処理を実行する前に微小時間間隔で集まったデータをログのタイムスタンプでソートします.このままデータを整形・加工し,出力することでユーザ単位の順序は保持された状態になります.複数のプロセスが非同期に出力しますのでプロセス間では出力の順序は保証されませんが,単一のプロセスにおいては順序が保証されます.このようにして弊社では順序保証型分散ストリーム処理を実現しています.

大まかな概念の説明をした所で,具体的な実装を少し紹介したいと思います.

ソート処理

Spark StreamingではScalaのコレクションを操作するのとほぼ同じ感覚でストリーム処理を記述できます. では,実際にソート処理を書いてみましょう.ソートする際はパーティション毎にソートする必要があります. 具体的な説明は省きますが,Spark Streamingにおいてパーティションとは処理の単位となっていて,1つのパーティションが1つのプロセスと紐付きます. 下記コードを見ていきます.

inputStream.mapPartitions( part => {
  part.toArray.sortWith((x, y) => x.accessTime< y.accessTime).iterator
}, preservePartitioning = true
).print()

mapPartitions関数で各プロセス毎にArrayクラスのsortWith関数を利用して時系列順にソートします. これだけで各プロセス内ではレコードがソートされた状態になります. 尚,inputStreamはDStreamというストリームデータの受け口となるオブジェクトを示しています.

下記に実際にコンパイルして動かせるサンプルコードを載せます. *1

コード全体

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object Driver {

  def main(args: Array[String]): Unit = {
    // Sparkを使う準備
    val sparkConf = new SparkConf().setMaster("local").setAppName("test-spark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Milliseconds(10000))

    // ストリームの作成
    val stream = ssc.socketTextStream("localhost", 7777)

    // メインとなる処理
    stream.mapPartitions(part => {
      part.toArray.sortWith((x, y) => x < y).iterator
    }, preservePartitioning = true
    ).print()

    // ストリーム処理の開始
    ssc.start()
    ssc.awaitTermination()
  }
}

ncコマンドでストリームデータを生成

$ nc -lk 7777

実は完璧に順序は保証できない

順序保証型と言っておりましたが実は完璧には順序を保証できていません. なぜかというと,マイクロバッチを一定間隔で実行しているので,バッチ処理間にまたがる順序のズレを保証できないためです. 本当に順序を完璧に保証するためには一定期間分ログをすべて集約しソートする必要があります. ここはどうしてもトレードオフの関係があり,リアルタイム性と順序の整列度合いを完全に両立する方法はおそらくありません.

順序保証型分散ストリーム処理を採用する際は,リアルタイム性と順序の整列度を天秤にかけ調整する必要があります.

その他

本記事では順序保証型分散ストリーム処理についてお話しましたが,入力ソースをどうするか,外部データソースの連携をどうするかなど ハイパフォーマンスな設計にするための方法をお話できていません. この辺りは改めて別の機会に書けたらと思います.

それでは,今回は以上となります.ご清覧ありがとうございました.

*1:ただし実際に動かすにはSparkコマンドのインストールだったりjarの生成のためにbuild.sbtを準備したりと若干手間がかかりますのでご留意を・・

さようならNetezza

 はじめまして、マイクロアドでインフラを統括している元井です。会社がサイバーエージェントから分社化するちょっと前から所属しているのでもう11年、ここで言うのもあれですが正直こんなに長居するとは思っていませんでした。いろんなことがありましたので、ここで小出ししていく予定です。

さて、マイクロアドでは分析者向けデータウェアハウス(以下DWH)としてNetezza(今だとPureDataという製品名ですが導入当時からの親しみを込めてNetezzaと呼びます)を利用していましたが、とうとうこの3月で停止する運びとなりましてシステム開発部全員が一丸となって対応にあたっています。クラウド・オンプレにおいて分散基盤が流行りの昨今、ラックモデルという拡張が容易ではないのもあってこの運びとなりました(とは言え2回の拡張をしました)。

思い起こせばマイクロアドがNetezzaを導入したのは2010年。TwinFin3というモデルでした。2010年当時、広告接触ユーザ数が5000万人くらいで、当然それなりのユーザアクセスログも蓄積されていました。どのくらいアクセスがあったのか記憶がさだかではないですが確か日に3億程度だったかと(今思えばそれでも相当大きなリクエスト数でした)。

この頃から会社としてデータの重要性を認識し、分析者向けの環境として各マスターDB、集計DBからデータを集約したMySQLサーバを運用していました。広告効果の分析や、次期商品(ちなみにこの時の次期商品というのがReal Time Biddingと呼ばれる広告表示の際にオークションが行われるという仕組みの入札機能です)の開発のためにもっと大量のデータを扱いたい!という分析者の要望があがり、テラバイト規模のデータを高速に取り扱えるものとして幾つかのDWHを検証しました。下記の選定基準を軸にサンプルデータを用いて比較しました。

  • データ容量
  • データロードの速さ
  • クエリ性能
  • 運用の容易さ

そして検証を重ねた結果、当時使っていたMySQLサーバの100倍(サーバスペックが低すぎたわけではなかったはず)程度のクエリ性能が出たNetezzaの導入を決定したわけです。f:id:mmotoi0:20180315121726p:plain

とは言え、なんでそんなに速いのかNetezzaよ。内部的に分散処理を行えるように設計されているのと、FPGAを搭載していたからです。例えばWHERE句を使ったクエリであれば、HDDから読み出されたデータは通常メモリ上に展開された後にCPU処理されます。ですがNetezzaではHDDから読み出す際、先にFPGAでフィルタ処理をして条件にマッチしたデータのみをメモリに書き込みます。ディスクから読み出すと同時にフィルタ処理行われているため高速なわけです。他にもマニアックな技術がいろいろあったはずですがうろ覚えなものでここでは割愛します。

Netezzaを導入したタイミングで分析者用のSPSS Clementineのサーバ版であるModeler serverを使うようになって分析処理性能が一気に加速しました。そして、高速になった結果としてデータ処理もデータ容量も増えまくりました。しかし、人間の欲というものは恐ろしく、速いことを当然と捉えるようになると人間をダメにするのにも役立ちました(速いスピードで適当なクエリも処理できてしまうのでクソクエリも増加したとかしないとか)。

そんなこともあってか、あれほど速かったTwinFin3はいつの間にか遅くなり2011年にはTwinFin6を導入しました。名前からわかるとおりTwinFin3の倍のスペックでした。

この頃からReal Time Biddingが主流になったお陰で入札額計算や行動履歴から何に関心を持っているかの自動算出など様々な分析処理を行うようになりました。そのため速さの恩恵をあずかったのはわずかな期間でした。
一部の処理はHadoopに逃がすといった対応をしてなんとか使い続けていたのですが、どうしようもなくなってしまい、2013年に新バージョンであるPureDataを導入しました。2ラックモデルで重さは2トン。いちおうアプライアンスということでラックに機器が搭載された状態での輸送でした。データセンターへの設置の際はラックを立たせた状態のまま移動させなくてはいけなく、このような納品は滅多にないので大人たちがはしゃいでたのを覚えています。
流石に容量、パフォーマンスと申し分なく、良き分析基盤として稼働しておりました(がしかし人間の(ry)。

ちなみに頑なにNetezzaを使っていたのは高速処理の機能で分析者が使っているSPSSとの接続性の良さが大きかったからです。SPSS+NetezzaだとSQLプッシュバックと呼ばれるSQL最適化を効かせる事でパフォーマンスを向上させる素敵な機能があります。通常の分析ツールはDWHのデータを分析環境にダウンロードして処理するので、複雑な処理のパフォーマンスを向上させるためには別途中間テーブルを作るなどのチューニングが必要になります。しかし、SPSS+NetezzaでSQL最適化を効かせるとNetezza上で処理を完結するようになり容易にパフォーマンスが向上させることが可能でした。

そんなNetezzaもスケールアウトが難しいという問題を抱えています。そのためビジネスの要求に合わせて拡大することが難しくなり、冒頭ありましたように3月で利用終了です。

近ごろは大量データを取り扱う基盤は常に変化しており、オンプレでもクラウドでもスケールアウトが容易なプロダクトが増えてきています。ビジネスの要求に合わせて様々な変化に対応出来るインフラにして行きたいですね。

f:id:mmotoi0:20180315123244p:plain

それでは今後ともよろしくお願いします。