spark

Spark Broadcast 사용하기

wefree 2022. 1. 30. 22:01

문제

Movie Title 정보를 가진 Map[Int, String] 이 있다.

val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")

movieTitles 를 broadcast 해 아래 DataSet[MovieScore] 에서 조회할 수 있도록 한다.

case class MovieScore(id: Int, score: Int)

import spark.implicits._
val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()

최종 결과는 movieScoreDs 와 movieTitles 가 (map side) join 된 형태인 테이블이 된다.

id score title
1 9 Back To The Future
2 10 The Matrix

코드

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{Dataset, SparkSession}

object SparkBroadcastExample {
  case class MovieScore(id: Int, score: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("MovieInfo")
      .master("local[*]")
      .getOrCreate()

    val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")

    // broadcast movieTitles
    val movieTitleBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(movieTitles)

    import spark.implicits._
    val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()

    // broadcast 값을 조회하기 위해서는 value method 를 호출해야 한다.
    val lookupTitle    = (id: Int) => movieTitleBroadcast.value(id)
    val lookupTitleUDF = udf(lookupTitle)

    val movieScoreWithTitle = movieScoreDs.withColumn("title", lookupTitleUDF(col("id")))
    movieScoreWithTitle.show()

    spark.stop()
  }
}

설명

  • 위의 코드에서 broadcast 대상 object 가 Map[Int, String] 인데, 복잡한 object 도 broadcast 가 잘 될까?
  • lookupTitleUDF 를 정의해 사용했지만, movieScoreDs.map(...) 로 DataSet 형태로 처리하는 것도 가능하다.