MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

より使いやすいデータ分析基盤にするために

京都研究所・TechLabの田中です。 今回は、データ分析基盤をより使いやすくするために実施した取り組みをご紹介します。
この記事では、結論だけでなくそこに至るまでの過程も一緒に紹介しているので、実務の雰囲気の一端も感じ取っていただければ幸いです。

1.ことの始まり

マイクロアドでは、すべてのデータを1つのHadoopクラスタに集約してシステム共通のデータ基盤1として利用しています。さらに、その共通データ基盤とは別にデータ分析基盤を用意しています。
このデータ分析基盤は、稼働中のシステムに影響させることなく自由に分析で使える環境として利用でき、現在では主にビジネス側のアナリストと開発側の機械学習エンジニアが利用しています。

ある日、ビジネス側のアナリストから次のような問い合わせが来ました。

以前利用していたGreenplumでは5分で終わっていたクエリが、今のデータ分析基盤では30分かかるようになった。さすがに業務に支障が出ているからなんとかしてほしい。

ちなみにそのクエリはこのような非常にシンプルなものでした。

SELECT col_1.xxx_id, count(1)  
FROM db_name.table_name
WHERE dt = '20191212'
GROUP BY col_1.xxx_id

まずは、テーブルのサイズやwhereで指定しているカラムでパーティションが効いているのか確認する必要があります。 しかし、以前利用していたDWHであるGreenplumと比べて応答速度が大幅に遅くなっているという点を踏まえると、何かしらの調査・対応は必要そうです。
というわけで、クエリの応答時間を改善するための対策を考えることになりました。

2.状況の整理

なぜクエリの応答がこんなに遅いのか。問い合わせ後の調査も踏まえた上で、考えられる原因は以下のようなものでした。

2-1 Hive on MR

データ分析基盤としては、Hadoopディストリビューションの1つあるCDH(v5.14.0)を採用しています。 利用者がデータへアクセスする際、クエリエンジンにはHive、分散データ処理にはMapReduceという、いわゆる Hive on MR の構成で使ってもらう環境になっていました。

MapReduceでは、データ処理のステージが変わるたびにディスクへの書き出しと読み取り(=ディスクI/O)が発生し、その分時間がかかります。
分散データ処理にTezやSparkを利用する構成であれば、Hive on MRよりは高速だったでしょう。
Hive on MR は、大量のデータを時間をかけてバッチ処理するのに適した仕組みであり、短時間で何度もサイクルを回すようなアドホックな分析に向きません。

2-2 Complex型のカラムに対する pushdown が機能しない

データ基盤のテーブルは、列指向フォーマットの1つであるORC形式で作成されており、さらに、ネストされたComplex型(struct型やarray型)のカラムが多く含まれていました。

Hiveクエリ内のselectやwhere句では、ネストしたカラムの一部の要素だけを指定しても、 pushdown が効きません。 つまり、Complex型カラム内の一部の要素だけを指定しても、処理する際には他の指定してない要素もすべてが走査する対象となってしまいます。

こうして、利用しない余計なデータまでもが上記のMapReduceで発生するI/Oに巻きこまれるので、なおのこと応答時間がかかるわけです。

3.対策を考える

この点を踏まえた上で、対策を考えます。

3-1 Hive on MR 以外の選択肢

上述したとおり、Hive on MR は大規模データを対象としたバッチ処理と相性がよく、アドホックな分析向きではありません。
そこで、よりアドホックな分析に向いている構成に切り替えることで、クエリの応答速度改善が見込めそうです。

まず、 分散データ処理をMapReduceからTezやSparkなどに変更することを検討します。MapReduce以外でCDHがサポートしているのはSparkです。

ただ、Hive on Sparkの構成にしても、結局Hiveを使う以上はORCのComplex型カラムに含まれる1つの要素に対するpushdownができません2。 対象としているComplex型カラム(冒頭で紹介したクエリのcol_1)には、利用する要素(xxx_id)の他に非常に多くの要素が含まれていました。そのため、このカラムを走査してパースする処理に時間がかかります。実際にクエリの応答速度への改善効果が薄かったことも踏まえて、Sparkも選択肢からは外しました。

次に、 クエリエンジンをHiveからImpala/Prestoなど分析向けのものに変更することを検討します。分析向きのクエリエンジンの中で、CDHがサポートしているのはImpalaでした。

Impalaには、Complex型カラムを含むテーブルを参照するクエリでは多少のオーバヘッドが生じるといった制約があります3。しかし、その点を考慮してもインメモリ処理による高速化の恩恵のほうがはるかに大きいと考えられました。

そこで、Hive on MR 以外の選択肢としてImpalaを採用することにしました。

3-2 ネストしたカラムへのpushdown

HiveやImpalaは、ORCやParquetのフォーマットでネストしたComplex型カラムへのpushdownには対応していません。そのため、クエリエンジン側でpushdownに対応するのは難しそうです。
一方で、テーブルのカラム構成を変更するという手もあります。ただ、そのカラム構成はシステム上の利便性などを考慮した上で設計されたものである上に、そのカラムのネストをほどいてフラットに変更するのは、規模が大きすぎて現実的ではありませんでした。

3-3 必要なアクションは?

というわけで、クエリエンジンとしてImpalaを利用できる状態を作り上げ、分析者にImpalaを利用してもらう方針を採用しました。
そのために必要なアクションを考えます。

この問題に取り組んでいた当時、多くのテーブルはデータフォーマットがORCでした。 しかし、利用しているCDH のImpala (2.11.0-cdh5.14.4 RELEASE)はORCフォーマットに対応していないため、Impalaからデータを読み取ることができません。

このバージョンのImpalaが対応している列指向フォーマットはParquetです。 したがって、データフォーマットをORCからParquetに変換してやればよい、ということになります。 ParquetであればHiveからも読み取ることができるため、Hiveを使うバッチ処理もImpalaを使うアドホック分析も、それぞれクエリエンジンの特徴を生かした仕事が可能になります。

これでゴールが定まりました。完了条件と、満たすべき要件は以下の通りとなります。


完了条件

  • クエリの応答速度がアドホック分析に耐えるレベルまで改善すること。

要件

  • 分析で利用するテーブルがすべてParquet形式として用意されている
  • ParquetフォーマットのテーブルをImpalaで扱える環境が用意されている
  • 恒常的にORCフォーマットのテーブルをParquetに変換する処理が動いている

本来であれば、ベンチマークとなるクエリとその応答速度を設定することで、「アドホック分析に耐えるレベルのクエリの応答速度」を具体的に決めておくべきだったかもしれません。
しかし、今回はそこまで厳密には完了条件を決めず、「体感として十分速くなればよかろう」といった見通しでひとまずスタートしました。

4.つまづきポイント

上記の要件を実現するために、このような構成をとることにしました。

Parquet変換のための構成図
開発を進める際に直面した問題を1つご紹介します。

4-1 HDFSファイルが圧縮できていない

ORCフォーマットのHDFSファイルをParquetに変換する方法ですが、 HiveクエリでORCテーブルからselectして、Parquetテーブルにinsert overwriteする形で実施しました。

しかし、いざ実行してみると同じでテーブルのデータであるにも関わらず、ParquetのファイルサイズがORCの2.5〜5倍程度にまで増加してしまいました。 ORCとParquetでフォーマットが違うとはいえ、サイズが5倍も異なるとは考えづらく、何かありそうです。

調査した結果、原因はParquetフォーマットだけHDFSファイルが圧縮できていないためでした。何が原因だったのでしょう?

4-2 そのDDLは無力

ORCテーブルとParquetテーブルのDDLはそれぞれ以下のようなものでした。

ORCテーブルのDDL

CREATE EXTERNAL TABLE db_name.table_orc (
  `col_1` string, 
  `col_2` bigint
PARTITIONED BY ( 
  `dt` string)
STORED AS parquet
LOCATION '/path/to/db_name/table_orc/'
TBLPROPERTIES ("orc.compress"="ZLIB")
;

ParquetテーブルのDDL

CREATE EXTERNAL TABLE db_name.table_parquet (
  `col_1` string, 
  `col_2` bigint
PARTITIONED BY ( 
  `dt` string)
STORED AS parquet
LOCATION '/path/to/db_name/table_parquet/'
TBLPROPERTIES ("parquet.compress"="SNAPPY")
;

これらのDDLでは、TBLPROPERTIES ("orc.compress"="ZLIB")TBLPROPERTIES ("parquet.compress"="SNAPPY") の部分で圧縮形式を指定しています。 指定しているつもりでした。

実際には、Parquetテーブルの場合はTBLPROPERTIESでSNAPPYといった圧縮形式を指定しても、その形式の圧縮ファイルは作成されません

4-3 SETで指定するパラメータ

当初は以下のようなinsert文を実行していましたが、このクエリでParquetテーブルに生成されるHDFSファイルは非圧縮になります。
ORC→Parquetのselect-insert文(圧縮ファイルにならない)

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE db_name.table_parquet PARTITION (dt)
SELECT *
FROM db_name.table_orc
WHERE dt=20191201
;

Parquetテーブルへのinsert文を実行するときに、このような設定が必要でした。
圧縮に必要だったパラメータ

SET parquet.compression=SNAPPY;
SET mapred.output.compress=true;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

したがって、目的に適したクエリは次のようになります。
ORC→Parquetのselect-insert文(圧縮ファイルになる)

SET parquet.compression=SNAPPY;
SET mapred.output.compress=true;
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE db_name.table_parquet PARTITION (dt)
SELECT *
FROM db_name.table_orc
WHERE dt=20191201
;

上記では圧縮形式がSNAPPYの場合に必要な設定を示しています。別の圧縮形式を指定する場合は parquet.compression mapred.output.compression.codecでそれぞれ適切な値を指定します。 例えば、GZIPの場合は次のようになります。

GZIP圧縮する場合の設定値

SET parquet.compression=GZIP;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;

Parquetがサポートしている圧縮形式は、UNCOMPRESSED GZIP SNAPPY のいずれかです 4。そして実際にファイルが圧縮される形式は、insert文を実行するときに設定する上記のプロパティによって決まります。

たとえテーブル作成時にDDLで TBLPROPERTIES (“parquet.compress”=“gzip”) としていても、 insert文の実行時にSETで指定するパラメータをSNAPPY用の値にした場合は、SNAPPY圧縮のParquetファイルが出来上がります。
また、圧縮関連のプロパティを指定していなければ、DDLのTBLPROPERTIESの内容に関わらず非圧縮のParquetファイルが出来上がります。

4-4 HDFSファイルの圧縮形式を確認

では最後に、本当にHDFSファイルが圧縮できているのか確認しましょう。使うコマンドは parquet-tools です5

ファイルが圧縮されているか確認する

$ parquet-tools meta <path_to_hdfs_file>

<...中略...>
----------------------------------------
col_1:  INT64 SNAPPY ...<以下略>
col_2:  INT64 SNAPPY ...<以下略>

insert文実行時にSETで指定した通り、SNAPPY圧縮になっていることがわかります。
なお、ファイルが圧縮されていない場合はこのような出力が表示されます。

ファイルが圧縮されているか確認する

$ parquet-tools meta <path_to_hdfs_file>

<...中略...>
----------------------------------------
col_1:  INT64 UNCOMPRESSED ...<以下略>
col_2:  INT64 UNCOMPRESSED ...<以下略>

そんなこんなでParquetファイルの圧縮対応をした後、改めてファイルサイズを比較したところ、ParquetがORCの約1.5〜2倍に収まっていることが確認できました。

5.対策の効果

3-3 で確認したとおり、今回の施策の完了条件は「クエリの応答速度がアドホック分析に耐えるレベルまで改善すること」でした。 冒頭で紹介したクエリを実行したところ、Hiveで30分かかっていたクエリが、Impalaでは50s弱で結果が返ってくることが確認できました。
応答速度が30倍以上速くなり、アドホック分析に耐えるレベルまで改善されたとして、本件の対応は終了となりました。

現在では、分析者用CDHのバージョンアップ(5.14.4→6.3.2)を進めています。
CDHv6.3.2のImpala(v3.2.0-cdh6.3.2)では、experimental feature ではありますが ORCでも読み込むことができるようです6

参考リンク


  1. データ基盤構築の詳細はこちらの記事をご覧ください。
  2. このあたりはクエリをEXPLAINして実行計画をとることで確認できます
  3. Complex Types (Impala 2.3 or higher only)
  4. テーブル情報を取得するクエリである DESCRIBE [FORMATTED] <table_name>; を実行すると、実行結果の中にCompressedという項目が見つかります。しかし、このCompressedの値によってHDFSファイルが圧縮されているかどうかを判定することはできません。 なぜなら、出力内の STORAGE INFORMATION > Compressed の値はファイルが圧縮されているかどうかに関わらず、基本的にNoと表示されるからです。DESCRIBE Statement | 5.4.x | Cloudera Documentation
  5. Using Apache Parquet Data Files with CDH | 5.14.x | Cloudera Documentation
  6. Using the ORC File Format with Impala Tables | 6.3.x | Cloudera Documentation