ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark Broadcast 사용하기
    spark 2022. 1. 30. 22:01

    문제

    Movie Title 정보를 가진 Map[Int, String] 이 있다.

    val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")

    movieTitles 를 broadcast 해 아래 DataSet[MovieScore] 에서 조회할 수 있도록 한다.

    case class MovieScore(id: Int, score: Int)
    
    import spark.implicits._
    val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()

    최종 결과는 movieScoreDs 와 movieTitles 가 (map side) join 된 형태인 테이블이 된다.

    id score title
    1 9 Back To The Future
    2 10 The Matrix

    코드

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.sql.functions.{col, udf}
    import org.apache.spark.sql.{Dataset, SparkSession}
    
    object SparkBroadcastExample {
      case class MovieScore(id: Int, score: Int)
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("MovieInfo")
          .master("local[*]")
          .getOrCreate()
    
        val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")
    
        // broadcast movieTitles
        val movieTitleBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(movieTitles)
    
        import spark.implicits._
        val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()
    
        // broadcast 값을 조회하기 위해서는 value method 를 호출해야 한다.
        val lookupTitle    = (id: Int) => movieTitleBroadcast.value(id)
        val lookupTitleUDF = udf(lookupTitle)
    
        val movieScoreWithTitle = movieScoreDs.withColumn("title", lookupTitleUDF(col("id")))
        movieScoreWithTitle.show()
    
        spark.stop()
      }
    }

    설명

    • 위의 코드에서 broadcast 대상 object 가 Map[Int, String] 인데, 복잡한 object 도 broadcast 가 잘 될까?
    • lookupTitleUDF 를 정의해 사용했지만, movieScoreDs.map(...) 로 DataSet 형태로 처리하는 것도 가능하다.

    'spark' 카테고리의 다른 글

    write partitionBy  (0) 2023.02.26
    Spark Dataset joinWith bug?  (0) 2022.02.26
    spark log level 조정하기  (0) 2022.01.30
    broadcast join 과 bucket join  (0) 2021.07.26
    spark managed tables - partitionBy, bucketBy, sortBy  (0) 2021.07.25

    댓글

Designed by Tistory.