-
Spark Dataset joinWith bug?spark 2022. 2. 26. 00:27
문제
아래 코드의 결과는 어떻게 될까?
import org.apache.spark.sql.SparkSession object SparkTest { case class Person(name: String, age: Int, gender: String) def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() import spark.implicits._ val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = all.filter(_.gender == "M") val females = all.filter(_.gender == "F") val joined = males.joinWith(females, males("name") === females("name"), "inner") joined.show(false) spark.stop() } }
name 이 Kim 인 레코드만 조인되어, 1개의 레코드만 출력될 것으로 기대했지만 놀랍게도 엉망으로 나온다.
_1 _2 {Kim, 10, M} {Kim, 30, F} {Lee, 20, M} {Kim, 30, F} 해결
방법1
all 을 filter 해서 males 와 females 로 만들어 예상과 다른 결과가 나온다.
아래와 같이 수정되면 기대한 결과가 나온다.
// val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M")).toDS() val females = Seq(Person("Kim", 30, "F")).toDS() val joined = males.joinWith(females, males("name") === females("name"), "inner")
_1 _2 {Kim, 10, M} {Kim, 30, F} 방법2
joinWith 대신에 join 을 사용한다.
val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = all.filter(_.gender == "M") val females = all.filter(_.gender == "F") val joined = males.join(females, males("name") === females("name"), "inner")
name age gender name age gender Kim 10 M Kim 30 F 방법3
Dataset 을 고집하고 싶다면, joinWith 를 사용하기전에 각각을 alias 한다.
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.col object SparkTest { case class Person(name: String, age: Int, gender: String) def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() import spark.implicits._ val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = all.filter(_.gender == "M") val females = all.filter(_.gender == "F") val joined = males.alias("M").joinWith(females.alias("F"), col("M.name") === col("F.name"), "inner") joined.show(false) spark.stop() } }
_1 _2 {Kim, 10, M} {Kim, 30, F} joinWith 를 쓸 때는 항상 안전하게 alias 후 사용하는 것이 좋을지?
'spark' 카테고리의 다른 글
DataFrame 에 신규 칼럼 추가하기 (0) 2024.07.19 write partitionBy (0) 2023.02.26 Spark Broadcast 사용하기 (0) 2022.01.30 spark log level 조정하기 (0) 2022.01.30 broadcast join 과 bucket join (0) 2021.07.26