spark

Spark Dataset joinWith bug?

wefree 2022. 2. 26. 00:27

문제

아래 코드의 결과는 어떻게 될까?

import org.apache.spark.sql.SparkSession

object SparkTest {
  case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
    val males   = all.filter(_.gender == "M")
    val females = all.filter(_.gender == "F")

    val joined = males.joinWith(females, males("name") === females("name"), "inner")
    joined.show(false)

    spark.stop()
  }
}

 

name 이 Kim 인 레코드만 조인되어, 1개의 레코드만 출력될 것으로 기대했지만 놀랍게도 엉망으로 나온다.

_1 _2
{Kim, 10, M} {Kim, 30, F}
{Lee, 20, M} {Kim, 30, F}

 

해결

방법1

all 을 filter 해서 males 와 females 로 만들어 예상과 다른 결과가 나온다.

아래와 같이 수정되면 기대한 결과가 나온다.

//    val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
    val males   = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M")).toDS()
    val females = Seq(Person("Kim", 30, "F")).toDS()

    val joined = males.joinWith(females, males("name") === females("name"), "inner")
_1 _2
{Kim, 10, M} {Kim, 30, F}

 

방법2

joinWith 대신에 join 을 사용한다.

    val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
    val males   = all.filter(_.gender == "M")
    val females = all.filter(_.gender == "F")
    
    val joined  = males.join(females, males("name") === females("name"), "inner")
name age gender name age gender
Kim 10 M Kim 30 F

 

방법3

Dataset 을 고집하고 싶다면, joinWith 를 사용하기전에 각각을 alias 한다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SparkTest {
  case class Person(name: String, age: Int, gender: String)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
    val males   = all.filter(_.gender == "M")
    val females = all.filter(_.gender == "F")

    val joined = males.alias("M").joinWith(females.alias("F"), col("M.name") === col("F.name"), "inner")
    joined.show(false)

    spark.stop()
  }
}
_1 _2
{Kim, 10, M} {Kim, 30, F}

joinWith 를 쓸 때는 항상 안전하게 alias 후 사용하는 것이 좋을지?