spark
Spark Broadcast 사용하기
wefree
2022. 1. 30. 22:01
문제
Movie Title 정보를 가진 Map[Int, String] 이 있다.
val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")
movieTitles 를 broadcast 해 아래 DataSet[MovieScore] 에서 조회할 수 있도록 한다.
case class MovieScore(id: Int, score: Int)
import spark.implicits._
val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()
최종 결과는 movieScoreDs 와 movieTitles 가 (map side) join 된 형태인 테이블이 된다.
id | score | title |
1 | 9 | Back To The Future |
2 | 10 | The Matrix |
코드
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.{Dataset, SparkSession}
object SparkBroadcastExample {
case class MovieScore(id: Int, score: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("MovieInfo")
.master("local[*]")
.getOrCreate()
val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix")
// broadcast movieTitles
val movieTitleBroadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(movieTitles)
import spark.implicits._
val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS()
// broadcast 값을 조회하기 위해서는 value method 를 호출해야 한다.
val lookupTitle = (id: Int) => movieTitleBroadcast.value(id)
val lookupTitleUDF = udf(lookupTitle)
val movieScoreWithTitle = movieScoreDs.withColumn("title", lookupTitleUDF(col("id")))
movieScoreWithTitle.show()
spark.stop()
}
}
설명
- 위의 코드에서 broadcast 대상 object 가 Map[Int, String] 인데, 복잡한 object 도 broadcast 가 잘 될까?
- lookupTitleUDF 를 정의해 사용했지만, movieScoreDs.map(...) 로 DataSet 형태로 처리하는 것도 가능하다.