spark
Spark Dataset joinWith bug?
wefree
2022. 2. 26. 00:27
문제
아래 코드의 결과는 어떻게 될까?
import org.apache.spark.sql.SparkSession
object SparkTest {
case class Person(name: String, age: Int, gender: String)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
val males = all.filter(_.gender == "M")
val females = all.filter(_.gender == "F")
val joined = males.joinWith(females, males("name") === females("name"), "inner")
joined.show(false)
spark.stop()
}
}
name 이 Kim 인 레코드만 조인되어, 1개의 레코드만 출력될 것으로 기대했지만 놀랍게도 엉망으로 나온다.
_1 | _2 |
{Kim, 10, M} | {Kim, 30, F} |
{Lee, 20, M} | {Kim, 30, F} |
해결
방법1
all 을 filter 해서 males 와 females 로 만들어 예상과 다른 결과가 나온다.
아래와 같이 수정되면 기대한 결과가 나온다.
// val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
val males = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M")).toDS()
val females = Seq(Person("Kim", 30, "F")).toDS()
val joined = males.joinWith(females, males("name") === females("name"), "inner")
_1 | _2 |
{Kim, 10, M} | {Kim, 30, F} |
방법2
joinWith 대신에 join 을 사용한다.
val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
val males = all.filter(_.gender == "M")
val females = all.filter(_.gender == "F")
val joined = males.join(females, males("name") === females("name"), "inner")
name | age | gender | name | age | gender |
Kim | 10 | M | Kim | 30 | F |
방법3
Dataset 을 고집하고 싶다면, joinWith 를 사용하기전에 각각을 alias 한다.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object SparkTest {
case class Person(name: String, age: Int, gender: String)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
val males = all.filter(_.gender == "M")
val females = all.filter(_.gender == "F")
val joined = males.alias("M").joinWith(females.alias("F"), col("M.name") === col("F.name"), "inner")
joined.show(false)
spark.stop()
}
}
_1 | _2 |
{Kim, 10, M} | {Kim, 30, F} |
joinWith 를 쓸 때는 항상 안전하게 alias 후 사용하는 것이 좋을지?