MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Dockerコンテナを使ったHiveクエリの自動テスト

こんにちは!マイクロアドでサーバーサイドエンジニアとしてバッチ開発を担当している根本 (id:realyutanemoto)です。

マイクロアドではHadoop分散ファイルシステム(以下、HDFSとする)にビッグデータを蓄積し、その加工処理(ETL処理)を行うバッチを作っています。 今回はそのテストの一部でHive1クエリの自動テストを実装した方法を紹介します。

バッチ処理のテスト

Hiveを使ったバッチ処理の例

広告の配信システムでは膨大な量のログが蓄積されます。

広告枠に表示する広告はRTB (Real-Time Bidding) と呼ばれるオークションを行って決定されるのですが、マイクロアドで開発しているシステムではそのオークションの結果が出るたびにログが出力され、蓄積されます。そのほかにも広告が表示されたりクリックされたりするたびにログが出力され、さらにそれらを集計して分析や広告主への請求といった用途のために加工する際にもその都度ログが蓄積されます。

マイクロアドではそういった広告の配信ログをはじめとしたビッグデータをHDFS上に蓄積して運用しています。HDFSに蓄積されたデータをフォーマット変換・加工・転送・保守といった形で操作するバッチシステムを構築しており、HDFS上のデータを操作するのにHiveというSQLクエリエンジンを使っています。

マイクロアドのバッチ処理の役割

処理内容の例として、例えば下記のように「2つのテーブルをjoinして別テーブルに挿入」というような処理は多くのバッチ処理で実装されているものです。

入力: ログAテーブル

time id common_id price
2023-05-18 12:01:00 1 1 100
2023-05-18 12:02:00 2 2 200
2023-05-18 12:03:00 3 3 300
... ... ... ...

入力: ログBテーブル

time id common_id
2023-05-18 12:50:00 101 1
2023-05-18 13:10:00 121 9
2023-05-18 19:40:00 140 3
... ... ...

出力: ログCテーブル (ログAとログBを結合)

common_id a_id a_time b_id b_time price
1 1 2023-05-18 12:01:00 101 2023-05-18 12:50:00 100
3 3 2023-05-18 12:03:00 140 2023-05-18 19:40:00 300
... ... ... ... ...

主なバッチ処理の役割は、Dockerコンテナ上に動いているHDFS上でログAとログBをjoinする下記のようなHiveクエリを実行することです。

INSERT OVERWRITE TABLE ログC
SELECT
  a.common_id,
  a.id AS a_id,
  a.time AS a_time,
  b.id AS b_id,
  b.time AS b_time,
  a.price AS price
FROM
  ログA AS a
  INNER JOIN ログB AS b ON a.common_id = b.common_id

なぜ自動テストしたいのか

マイクロアドでは広告配信システムを自社で開発・運用しています。そのためシステムの品質を自社で担保する必要があり、実装に変更が加えられるたびに開発チームがテストを実施して要件を満たすことの確認を徹底しています。

自社のプロダクトが大きくなっていくことは誇らしい一方で、スケールしていくプロダクトの品質担保にかかるコストは増大していきます。

これまでは

  1. HDFSにテストデータを挿入
  2. バッチを実行
  3. 出力結果を取得して比較

といった一連の流れを手動で実行しており、処理実装に変更を加える際にはその手動実行の結果が主な品質担保の材料となっていました。

しかし

  • 運用するバッチの数が増加(現在は300個以上のワークフローが稼働)
  • 手動テストの場合テストケースやデータの再利用性が低い

といった背景もあり、手動テストだけではメンテナンスコストと品質担保の両面で限界があります。

※自動ユニットテストは実装していましたが、担保できる内容はクエリ以外のロジックやクエリ自体が想定と一致することの確認に限られます。

これまで手動テストで実施していたような内容の一部を自動テストにより実装することでテストの再利用性が向上し、バッチ処理に改修を加えるコストを大きく削減できます。

上記のような背景から、運用コストの低減とソフトウェア品質の向上を図るために、現在マイクロアドでは手動テストに対する自動テストの比率を上げていこうという方針で開発を進めています。今回紹介しているHiveのテストもその一環で新たに実装されたものです。

Hiveを使ったバッチ処理での自動テスト

冒頭で挙げた例のように、マイクロアドのバッチ処理では、Hadoopクラスタ上で動作するHiveと呼ばれる分散型SQLクエリエンジンでクエリを実行する処理が多く作られています。

そうしたバッチの挙動をテストするため、Hiveを使ったバッチ処理で想定通りにデータの操作が行えているかを確認するのが今回自動化するテストの役割です。

自動テスト作成時の観点

冒頭の、テーブルをJOINするバッチの例を思い出してください。 ログAとログBをJOINしてログCに挿入するのがバッチの処理内容でした。

そのような場合にテストを実装するときの観点には例えば以下のようなものがあります。

  • 境界値(それぞれの入力値の型で許容される最大・最小値などを入力データとしたときに想定通りの出力がされるか)
  • 異常値
  • 特殊な文字(改行・タブ・NULL・空文字・エスケープなどの扱い)
  • JOIN時のレコードの紐づき関係(ログAとログBが1:1、1:N、N:1、N:Nで紐づくパターン)
  • 冪等性(何度実行しても同じ結果となっている性質)が担保されているか

こうしたテスト観点の中には自動テストによる確認が難しいもの(例えば冪等性についてはUIからバッチを実行するプロセスまで含めて確認できるのが望ましい)もあります。

一方で境界値や異常値などテストデータの作り方だけで確認ができるものについては、クエリの実行によるデータの状態のみを確認すれば十分です。

バッチ処理について自動化するテストの範囲では、上記のようなクエリ実行前後の比較のみで確認できる項目を検証の対象とします。

自動テストですること

上記で例に挙げた観点のうち、境界値を検証するテストを考えてみましょう。冒頭の例で考えると、入力には

  • time
  • id
  • common_id
  • price

カラムを持ったログAと

  • time
  • id
  • common_id

を持ったログBが存在しました。境界値を考えるにあたっては、以下のような入力テストデータを作成して実行し、その出力を見ます(timeはTimestamp型、idはINT型を想定)。

検証カラム 条件
time 最大値 2038-01-19 03:14:07
time 最小値 1970-01-01 00:00:00
id 最大値 2,147,483,647
id 0 0
id 最小値 -2,147,483,648
... ... ...

これらの境界値を含むログAとログBを入力としてバッチを実行した結果、上記で定義したテストデータの値を保持したログCが出力されたことを確認できれば、境界値のテストは問題ないと言えます。

またこのテスト実行の際には任意の環境で任意の入出力を行えるよう、Dockerコンテナ上に立ち上げたHDFSへ接続してテストを実行します。

自動テストの実装

自動テストの実行の流れをまとめると、以下のようなプロセスが必要になります。

  1. Hadoopを動作させるDockerコンテナを立ち上げ
  2. Hiveテーブルの初期化
  3. テーブルに入力テストデータのレコードを挿入
  4. バッチ処理の実行
  5. バッチ実行による出力データの検証
  6. (別の組み合わせのテストデータで実行する場合は)2-5を繰り返す
  7. Dockerコンテナを終了

Dockerコンテナの立ち上げ

ポイントとなるのはDockerコンテナの立ち上げとそこへの接続です。

マイクロアドのデータ基盤ではCloudera社のCDH (Cloudera’s Distribution of Apache Hadoop) でHadoopクラスタを構築しています。そこで自動テストで必要になる最小のコンポーネント(HDFS、YARN、Hive)を単一のコンテナでサービスを起動し、ローカル環境でもテスト出来るテスト用のDockerコンテナを用意して利用しています。

CDHについては以下の記事でも紹介していますので、詳細は下記をご覧ください。 developers.microad.co.jp

このためテスト時にはローカル環境でそのコンテナイメージを利用してDockerコンテナを立ち上げて利用します。

テストデータの操作

テストデータの初期化以降では、主にimpyla2というライブラリを用いてクエリを実行してテストデータを操作します。

from impala.dbapi import connect
conn = connect(...)
cursor = conn.cursor()
cursor.execute("INSERT OVERWRITE TABLE ログA SELECT ...")

上記のライブラリを用いるとクエリを用いてそのままテストデータの操作ができるため、テストデータの準備をする際には手動テストのときに準備していたINSERTクエリを再利用しながら実装できます。

バッチ処理の出力比較についても、テスト結果となるデータを取得するSELECTクエリを準備して上記の形で実行し、結果を一般的なユニットテストフレームワークのassertのように比較する実装となります。

実装のまとめ

上記を踏まえて、自動テストコードの最低限の実装をまとめると以下のようになります。

import unittest
from typing import List

class TestHiveSample(unittest.TestCase):
    def test_hive_sample_1(self) -> None:
        # impalaを使ったHive接続
        from impala.dbapi import connect
        conn = connect(設定値を渡す)
        cursor = conn.cursor()

        # テストデータの挿入
        cursor.execute("INSERT OVERWRITE TABLE ログA SELECT ...")

        # バッチ処理の実行
        # 上記で挿入したテストデータを元に別のテーブルに出力する処理(冒頭の例のバッチ処理のイメージ)
        テスト対象のバッチ処理()

        # 出力テストデータの取得(テスト対象のバッチ処理で出力されたデータをSELECT)
        cursor.execute("SELECT * FROM ログC WHERE ...")
        actual_result: List[tuple] = cursor.fetchall()

        # Hive接続をclose
        cursor.close()

        # 結果比較(検証)
        expected_result = [
            (1, 1, '2023-05-18 12:01:00', 101, '2023-05-18 12:50:00', 100),
            (3, 3, '2023-05-18 12:03:00', 140, '2023-05-18 19:40:00', 300)
        ]
        self.assertEqual(expected_result, actual_result)

Dockerコンテナ上でHiveサービスを起動し、その上で上記テストを実行するというのが現在運用しているHive自動テストです。ユニットテストで確認する範囲に、Dockerコンテナで稼働しているHDFSへ実際に接続してクエリ実行するまでの処理を加えるようなイメージです。

他にもHDFS上のファイルを初期化する部分や結果を見やすくするログ出力、少し工夫してJSONを元にしてテストデータを挿入する処理などがありましたが、上記のテストコードでは省略しています。別の記事が1本かけるくらいのボリュームになってしまうため、より詳細な実装については機会があれば別記事で紹介したいです。

おわりに

今回の記事では、マイクロアドのHiveを扱ったバッチ処理の概要とその自動テストの内容を紹介しました。

テクニカルな知識については多く触れることができませんでしたが、今後機会があればより実装寄りの話にフォーカスした内容も別の記事で挙げられたらいいなと思っています。

今回共有した自動テストの方法が少しでも役に立ったと思ってもらえるような内容になっていれば嬉しいです。