ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark Dataset joinWith bug?
    spark 2022. 2. 26. 00:27

    문제

    아래 코드의 결과는 어떻게 될까?

    import org.apache.spark.sql.SparkSession
    
    object SparkTest {
      case class Person(name: String, age: Int, gender: String)
    
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
    
        val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
        val males   = all.filter(_.gender == "M")
        val females = all.filter(_.gender == "F")
    
        val joined = males.joinWith(females, males("name") === females("name"), "inner")
        joined.show(false)
    
        spark.stop()
      }
    }

     

    name 이 Kim 인 레코드만 조인되어, 1개의 레코드만 출력될 것으로 기대했지만 놀랍게도 엉망으로 나온다.

    _1 _2
    {Kim, 10, M} {Kim, 30, F}
    {Lee, 20, M} {Kim, 30, F}

     

    해결

    방법1

    all 을 filter 해서 males 와 females 로 만들어 예상과 다른 결과가 나온다.

    아래와 같이 수정되면 기대한 결과가 나온다.

    //    val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
        val males   = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M")).toDS()
        val females = Seq(Person("Kim", 30, "F")).toDS()
    
        val joined = males.joinWith(females, males("name") === females("name"), "inner")
    _1 _2
    {Kim, 10, M} {Kim, 30, F}

     

    방법2

    joinWith 대신에 join 을 사용한다.

        val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
        val males   = all.filter(_.gender == "M")
        val females = all.filter(_.gender == "F")
        
        val joined  = males.join(females, males("name") === females("name"), "inner")
    name age gender name age gender
    Kim 10 M Kim 30 F

     

    방법3

    Dataset 을 고집하고 싶다면, joinWith 를 사용하기전에 각각을 alias 한다.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    
    object SparkTest {
      case class Person(name: String, age: Int, gender: String)
    
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
    
        val all     = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS()
        val males   = all.filter(_.gender == "M")
        val females = all.filter(_.gender == "F")
    
        val joined = males.alias("M").joinWith(females.alias("F"), col("M.name") === col("F.name"), "inner")
        joined.show(false)
    
        spark.stop()
      }
    }
    _1 _2
    {Kim, 10, M} {Kim, 30, F}

    joinWith 를 쓸 때는 항상 안전하게 alias 후 사용하는 것이 좋을지?

    'spark' 카테고리의 다른 글

    write partitionBy  (0) 2023.02.26
    Spark Broadcast 사용하기  (0) 2022.01.30
    spark log level 조정하기  (0) 2022.01.30
    broadcast join 과 bucket join  (0) 2021.07.26
    spark managed tables - partitionBy, bucketBy, sortBy  (0) 2021.07.25

    댓글

Designed by Tistory.