MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

Spark Connectの検証における知見と課題

マイクロアドでサーバーサイドエンジニアをしている高橋です。

PySpark x Spark Connectの検証によって知見が溜まってきたので共有します。

検証に至った経緯は別の記事で紹介しています。

developers.microad.co.jp

Spark Connect関連の情報は検索してもまだ少なく、試行錯誤しながら進めています。

この記事が少しでも参考になれば幸いです。

構成

今回想定する構成は概ね別記事で紹介されているものになります。

データ基盤移行計画とPySpark(Spark Connect)の検証 - MicroAd Developers Blog

Kubernetes上でSpark Connect serverを動かし、PySparkから集計処理を流します。

Spark Connect serverの起動はPythonのkubernetesライブラリを使用して行いますが、kubectlコマンドで行うのと内容的には同じです。

Sparkのバージョンは3.5.0です。

知見

Spark Connect server起動時にオプションを全て渡さなければならない

検証を開始した時期の最大のつまづきポイントでした。

SparkのオプションはSpark Connect clientとなるPySparkアプリケーションには設定しません。

必要な設定はSpark Connect server起動時の際、Spark Connect server側に設定する必要があります。

Spark Connect server起動時に引数で設定したり、Kubernetesの機能のConfigMapで設定したりできます。

設定するオプションの例として以下のようなものが挙げられます。

  • カタログの設定
  • ログ関連の設定
  • S3のシークレット
  • jarファイルのパス
    • UDF
    • mysql-connector
    • その他DB接続関連

例えばカタログの設定関連でHiveサポートを有効化するenableHiveSupportメソッドがあります。

PySparkでセッション作成時にこのメソッドを呼び出してもHiveが使用できるようにはなりません。

Spark Connect server起動時にspark.sql.catalogImplementationの項目にhiveを設定することで有効化できます。

Decimal型の列同士の四則演算で精度が変わる

これについてはSpark Connect特有のものではなくSparkそのものの動作になります。

ここでは加算を例として取り上げます。

Decimal(18,8)型の列ABがあったとします。

A + Bの結果の型はDecimal(19,8)となります。

> df.withColumn("A + B", col("A") + col("B")).printSchema()
root
 |-- A: decimal(18,8) (nullable = true)
 |-- B: decimal(18,8) (nullable = true)
 |-- A + B: decimal(19,8) (nullable = true)

precisionが1増えて19になっています。 整数部分が10 → 11桁になり小数部分は8桁のままなので精度が落ちてはおらず、この場合は問題なさそうです。

一方で、precisionが最大の38だった場合はどうでしょうか。

> df.withColumn("A + B", col("A") + col("B")).printSchema()
root
 |-- A: decimal(38,8) (nullable = true)
 |-- B: decimal(38,8) (nullable = true)
 |-- A + B: decimal(38,7) (nullable = true)

このとき、precisionは元々最大だったので増えていません。 整数部分は30 → 31桁になり小数部分は8 → 7桁と減少しているので精度が落ちてしまいます。

これは減算でも同じ挙動が確認されます。

Sparkでは精度が同じDecimal同士の加減算においては整数部分の桁数を1増やそうとするようです。

precisionの上限の38に達している場合、小数部分の精度維持よりも整数部分の桁数を増やす動作が優先され、結果として精度が低下します。

Hiveのクエリ・UDFが使える

こちらもSpark Connect特有のものではなくSparkそのものの動作になります。

クエリエンジンとしてのHiveが存在していない場合でも、SparkでHiveのクエリを実行できます。

UDFについてもHive用のUDFをそのまま動かすことができます(このときHive用UDFのjarのパスは前述の通り事前に渡しておく必要があります)。

パフォーマンス等の検証は必要になりますが、Hiveから移行する場面ではSpark用に書き直さずこれらが使えるのは大きなメリットです。

課題点

検証を進める中で判明した課題点としてSpark Connect serverから実行するSparkアプリケーションをクラスタ内で分散して実行するには工夫が必要という点が挙げられます。

Spark Connect serverの起動方法として公式で用意されているstart-connect-server.shで起動するとStandalone Modeになってしまいます。

Cluster ManagerとしてKubernetes APIを指定出来ないので、Executor PodがKubernetesクラスタのノード間で分散して実行ができません。

これでは大規模なデータ処理に耐えられないことが想定されるため、Hadoopエコシステムの代替となるデータ処理基盤として運用していく上で大きな問題となります。

この部分については解決の目処は立っていますが、記事執筆時点でも調査・検証を進めています。