-
broadcast join 과 bucket joinspark 2021. 7. 26. 00:11
broadcast join
join 할 데이터가 작은게 있을 경우 spark 에서 자동으로 broadcast join 을 적용해 준다.
broadcast join 을 적용하라고 broadcast() function 으로 Hint 를 줄 수도 있다.
import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.{Column, DataFrame, SparkSession} object MySpark extends Serializable { def main(args: Array[String]): Unit = { val spark: SparkSession = ??? val smallDf: DataFrame = ??? val hugeDf: DataFrame = ??? val joinExpr: Column = ??? // org.apache.spark.sql.functions.broadcast() 로 Hint 를 주도록 한다. val joinDf: DataFrame = hugeDf.join(broadcast(smallDf), joinExpr, "inner") joinDf.show() spark.stop() } }
bucket join
다음과 같이 2 단계로 bucket join 을 적용한다.
- join 할 테이블을 각각 bucketBy 를 적용해 저장하기, spark managed tables 참고
- bucket 이 적용된 데이트를 읽어 join 적용 (필요시 broadcast join 이 적용되지 않도록 spark.sql.autoBroadcastJoinThreshold 설정)
import org.apache.spark.sql.{Column, DataFrame, SparkSession} object MySpark extends Serializable { def main(args: Array[String]): Unit = { val spark: SparkSession = ??? val df1: DataFrame = ??? val df2: DataFrame = ??? spark.sql("CREATE DATABASE IF NOT EXISTS MyDB") df1.write.bucketBy(10, "id").saveAsTable("MyDB.my_tbl_1") df2.write.bucketBy(10, "id").saveAsTable("MyDB.my_tbl_2") val bucketDf1 = spark.read.table("MyDB.my_tbl_1") val bucketDf2 = spark.read.table("MyDB.my_tbl_2") val joinExpr: Column = ??? // spark 에서 자동으로 broadcast join 이 적용하는 것을 막을 필요가 있다면 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) val joinDf: DataFrame = bucketDf1.join(bucketDf2, joinExpr, "inner") joinDf.show() spark.stop() } }
broadcast 혹은 bucket join 이 적용되었는지 확인
https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f 처럼 spark DAG visualization 을 보고 확인한다.
bucket 수가 다른 테이블들을 join 할 경우
https://medium.com/@alpeshvirani/shuffling-and-bucketing-in-spark-3-5111ffaf17e1 를 참고하면
spark.sql.shuffle.partitions 값에 따라 결과가 달라진다.
- spark.sql.shuffle.partitions=100 일때 40 buckets, 80 buckets 테이블들 join
=> 두 테이블 모두 shuffle 발생
- spark.sql.shuffle.partitions=50 일때 40 buckets, 80 buckets 테이블들 join
=> 40 buckets 짜리 테이블만 80 개의 partition 으로 shuffle 된 후 join 됨
spark 3.1 부터는 spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true") 설정하면 80 개 짜리를 40개에 맞춰 join 하는 것도 가능?
bucket 생성시 파일 개수 줄이기
https://medium.com/@alpeshvirani/shuffling-and-bucketing-in-spark-3-5111ffaf17e1 를 참고하면
"we need to create one bucket per partition and it can be achieved by custom partitions"
df.repartition(expr("pmod(hash(col1), n)"))
'spark' 카테고리의 다른 글
Spark Broadcast 사용하기 (0) 2022.01.30 spark log level 조정하기 (0) 2022.01.30 spark managed tables - partitionBy, bucketBy, sortBy (0) 2021.07.25 spark tables - managed vs unmanaged(external) tables (0) 2021.07.25 데이터의 partition 수와 partition 별로 레코드 수 확인 방법 (0) 2021.07.18