MicroAd Developers Blog

マイクロアドのエンジニアブログです。インフラ、開発、分析について発信していきます。

データ基盤移行計画とPySpark(Spark Connect)の検証

マイクロアドでサーバサイドエンジニアをしているタカギです。

今回はデータ基盤移行とPySparkについての話になります。

目次

データ基盤移行の概要

諸々の事情1により、データ基盤をHadoopから移行することになりました。

現在のデータ基盤でのETL/ELT処理はHadoopエコシステム(Hive、HDFSなど)を中心に構成されています。

※Hadoopについてはこちらの記事が参考になります。

これらをKubernetes、PySpark、S3互換ストレージ(詳細未確定)を組み合わせたデータ基盤へ移行する計画です。

すぐにすべてを移行するのは難しく、完全移行までは新旧並行稼働がそれなりの期間続く予定です。

今回の記事では、PySparkを使用したバッチ処理の部分に少しだけ触れていきます。

※その他にもさまざまなコンポーネントで新たなツールやアーキテクチャを導入していく予定ですが、そちらは割愛させていただきます。

データ基盤移行後のバッチ処理

データ基盤移行後はKubernetes上でPySparkを実行することで集計処理を行う予定です。

PySparkを実行するにあたって、Spark3.4で導入されたSpark Connect2を導入する方針で検証を進めています。

Spark Connectを使用することで、Spark3.3までとは異なる形でPySparkを実行できるようになりました。

Spark Connectを導入する

当初(Spark3.3を使用する前提での計画中)は、Kubernetes上でSpark用のカスタムリソース、カスタムオペレーターを使用してPySparkを実行する想定でした。3

事前準備として、PySpark用スクリプトを特定の場所に配置するか、Kubernetes上で起動するDockerイメージの中に含めるなどする必要があります。

マイクロアドではワークフローエンジンとしてDigdagを採用しており、DigdagでPySparkを使用したバッチ処理は以下のような流れとなります。

この方法だと、以下のような、(クリティカルではないとはいえ)若干悩ましい問題がありました。

  • バッチごとにPySpark用スクリプトのパスを記載したKubernetesマニフェストファイルを用意する必要がある
  • Pythonコードが大元の処理(処理a)とPySpark用スクリプト(処理b)に分割される(分割したくないケースもある)
  • PySpark用スクリプトにパラメータが必要な場合、マニフェストファイル経由で渡す必要がある
    • 集計処理は時間単位で行うことが多く、上記の例では dt(日付), hour(時間) をパラメータとして渡しています

一方で、Spark Connectは、SparkをAPIサーバのように扱うことができる仕組みとなっており、大元のPythonコードから直接PySparkを実行できます。

from datetime import datetime
from pyspark.sql import SparkSession


def main() -> None:
    """
    大元の処理
    """
    # 処理対象日時
    dt = "20230410"
    hour = "12"
    execute_pyspark(dt, hour)


def execute_pyspark(dt: str, hour: str) -> None:
    """
    pysparkの処理
    """
    spark_connect_url = "sc://test-k8s:31502"
    input_path = f"hdfs://test-cdh:8020/test_path/in_log/dt={dt}/hour={hour}"
    output_path = f"hdfs://test-cdh:8020/test_path/out_log/dt={dt}/hour={hour}"

    # SparkSession
    spark = SparkSession.builder.remote(spark_connect_url).getOrCreate()

    # ストレージからの読み込み
    df = spark.read.parquet(input_path)

    # 集計処理
    # df = df.xxx(xxx)

    # ストレージへの書き込み
    df.write.mode('overwrite').parquet(output_path)
    spark.stop()

Spark Connectを導入することで、悩ましい問題をいくつか解消できそうです。

一方で、良いことづくめではなく、問題点もありそうでした。

Spark Connectの問題点

複数のバッチ処理からのリクエストが同時に発生した場合、Spark Connectサーバの負荷が上昇し、すべてのバッチ処理が影響を受けます。

※リクエストするたびにspark-submitされるわけではなく、常時稼動しているSpark ConnectサーバのDriverを使いまわしてExecutorが生成されるようなイメージです。

Driverが落ちてしまった場合も、すべてのバッチ処理が影響を受けることになります。

この問題を避けるため、バッチ処理のたびにSpark Connectサーバを作成/削除する方針にしました。

  • 一連のバッチ処理の流れ
    • Spark Connectサーバ作成
    • PySpark実行
    • Spark Connectサーバ削除

Spark Connectサーバを作成/削除するステップが毎回発生することになるわけですが、以下のような制約がありそうです。

  • アプリケーション名をユニークにする必要がある
  • NodePortが重複しないように制御する必要がある(Spark ConnectはNodePortサービス経由でアクセスする前提)

しかしながら、それでもメリットのほうが大きいと判断し、Spark Connectを導入する方向で進めることにしました。

まとめ

データ基盤移行の概要と、PySparkの検証について書きました。

Spark Connectは非常に便利な機能だと感じています。この記事が参考になれば幸いです。

また、PySparkに限らずですが、現在もデータ基盤移行の検証は進んでいます。

PySpark検証の記事第2弾もご期待ください。

補足

Spark ConnectではすべてのAPIがサポートされるわけではなく、利用可能なAPIは限られているようです。

すでにSparkを利用している状況でSpark Connectを導入する場合は注意が必要ですね。

Apache Spark 3.4でサポートされるAPI PySpark: Spark 3.4では、Spark ConnectはDataFrame, Functions, Columnを含むほとんどのPySpark APIをサポートしています。 サポートされているPySpark APIは、API referenceドキュメントで「Supports Spark Connect」と表示されるので、既存のコードをSpark Connectに移行する前に、使用しているAPIが利用可能かどうかを確認できます。 https://www.databricks.com/jp/blog/2023/04/18/spark-connect-available-apache-spark.html