ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • broadcast join 과 bucket join
    spark 2021. 7. 26. 00:11

    broadcast join

    join 할 데이터가 작은게 있을 경우 spark 에서 자동으로 broadcast join 을 적용해 준다.

    broadcast join 을 적용하라고 broadcast() function 으로 Hint 를 줄 수도 있다.

    import org.apache.spark.sql.functions.broadcast
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    object MySpark extends Serializable {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = ???
    
        val smallDf: DataFrame = ???
        val hugeDf: DataFrame  = ???
        val joinExpr: Column   = ???
    
        // org.apache.spark.sql.functions.broadcast() 로 Hint 를 주도록 한다.
        val joinDf: DataFrame = hugeDf.join(broadcast(smallDf), joinExpr, "inner")
        joinDf.show()
    
        spark.stop()
      }
    }

     

    bucket join

    다음과 같이 2 단계로 bucket join 을 적용한다.

    1. join 할 테이블을 각각 bucketBy 를 적용해 저장하기, spark managed tables 참고
    2. bucket 이 적용된 데이트를 읽어 join 적용 (필요시 broadcast join 이 적용되지 않도록 spark.sql.autoBroadcastJoinThreshold 설정)
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    object MySpark extends Serializable {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = ???
    
        val df1: DataFrame = ???
        val df2: DataFrame = ???
    
        spark.sql("CREATE DATABASE IF NOT EXISTS MyDB")
        df1.write.bucketBy(10, "id").saveAsTable("MyDB.my_tbl_1")
        df2.write.bucketBy(10, "id").saveAsTable("MyDB.my_tbl_2")
    
        val bucketDf1 = spark.read.table("MyDB.my_tbl_1")
        val bucketDf2 = spark.read.table("MyDB.my_tbl_2")
    
        val joinExpr: Column = ???
    
        // spark 에서 자동으로 broadcast join 이 적용하는 것을 막을 필요가 있다면
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
        val joinDf: DataFrame = bucketDf1.join(bucketDf2, joinExpr, "inner")
        joinDf.show()
    
        spark.stop()
      }
    }

     

    broadcast 혹은 bucket join 이 적용되었는지 확인

    https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f 처럼 spark DAG visualization 을 보고 확인한다.

     

    bucket 수가 다른 테이블들을 join 할 경우

    https://medium.com/@alpeshvirani/shuffling-and-bucketing-in-spark-3-5111ffaf17e1 를 참고하면

     spark.sql.shuffle.partitions 값에 따라 결과가 달라진다.

    • spark.sql.shuffle.partitions=100 일때 40 buckets, 80 buckets 테이블들 join

          => 두 테이블 모두 shuffle 발생

     

    • spark.sql.shuffle.partitions=50 일때 40 buckets, 80 buckets 테이블들 join

          => 40 buckets 짜리 테이블만 80 개의 partition 으로 shuffle 된 후 join 됨

          spark 3.1 부터는 spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true") 설정하면 80 개 짜리를 40개에 맞춰 join 하는 것도 가능?

     

    bucket 생성시 파일 개수 줄이기

    https://medium.com/@alpeshvirani/shuffling-and-bucketing-in-spark-3-5111ffaf17e1 를 참고하면

    "we need to create one bucket per partition and it can be achieved by custom partitions"

    df.repartition(expr("pmod(hash(col1), n)"))

    댓글

Designed by Tistory.