-
Spark Broadcast 사용하기spark 2022. 1. 30. 22:01
문제
Movie Title 정보를 가진 Map[Int, String] 이 있다.
val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")
movieTitles 를 broadcast 해 아래 DataSet[MovieScore] 에서 조회할 수 있도록 한다.
case class MovieScore(id: Int, score: Int) import spark.implicits._ val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()
최종 결과는 movieScoreDs 와 movieTitles 가 (map side) join 된 형태인 테이블이 된다.
id score title 1 9 Back To The Future 2 10 The Matrix 코드
import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.{Dataset, SparkSession} object SparkBroadcastExample { case class MovieScore(id: Int, score: Int) def main(args: Array[String]): Unit = { val spark = SparkSession.builder .appName("MovieInfo") .master("local[*]") .getOrCreate() val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix") // broadcast movieTitles val movieTitleBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(movieTitles) import spark.implicits._ val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS() // broadcast 값을 조회하기 위해서는 value method 를 호출해야 한다. val lookupTitle = (id: Int) => movieTitleBroadcast.value(id) val lookupTitleUDF = udf(lookupTitle) val movieScoreWithTitle = movieScoreDs.withColumn("title", lookupTitleUDF(col("id"))) movieScoreWithTitle.show() spark.stop() } }
설명
- 위의 코드에서 broadcast 대상 object 가 Map[Int, String] 인데, 복잡한 object 도 broadcast 가 잘 될까?
- lookupTitleUDF 를 정의해 사용했지만, movieScoreDs.map(...) 로 DataSet 형태로 처리하는 것도 가능하다.
'spark' 카테고리의 다른 글
write partitionBy (0) 2023.02.26 Spark Dataset joinWith bug? (0) 2022.02.26 spark log level 조정하기 (0) 2022.01.30 broadcast join 과 bucket join (0) 2021.07.26 spark managed tables - partitionBy, bucketBy, sortBy (0) 2021.07.25