spark
-
write partitionByspark 2023. 2. 26. 19:19
partition 부분적으로 업데이트하기 위해 partitionOverwriteMode 를 dynamic 으로 설정함 (default: STATIC) ds.write .mode(SaveMode.Overwrite) .option("partitionOverwriteMode", "dynamic") .partitionBy("queryset_id", "log_date", "time_grains") .parquet(basePath) partition 마다 생성되는 파일 개수 조정을 위해서는 아래 링크 참고 https://stackoverflow.com/questions/44808415/spark-parquet-partitioning-large-number-of-files/44810607#44810607 위의 예제에..
-
Spark Dataset joinWith bug?spark 2022. 2. 26. 00:27
문제 아래 코드의 결과는 어떻게 될까? import org.apache.spark.sql.SparkSession object SparkTest { case class Person(name: String, age: Int, gender: String) def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() import spark.implicits._ val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = all.fi..
-
Spark Broadcast 사용하기spark 2022. 1. 30. 22:01
문제 Movie Title 정보를 가진 Map[Int, String] 이 있다. val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix") movieTitles 를 broadcast 해 아래 DataSet[MovieScore] 에서 조회할 수 있도록 한다. case class MovieScore(id: Int, score: Int) import spark.implicits._ val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS() 최종 결과는 movieScoreDs 와 movieTitles 가 (map side..
-
spark log level 조정하기spark 2022. 1. 30. 20:16
문제 spark 을 local mode 로 실행할 때, 로그가 아래처럼 너무 많아 출력결과를 살펴보기가 어렵다. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 22/01/30 20:11:12 INFO SparkContext: Running Spark version 3.0.0 22/01/30 20:11:13 INFO ResourceUtils: ============================================================== 22/01/30 20:11:13 INFO ResourceUtils: Resources for spark.driver: 22/01/30 20:11:13 INFO ..
-
broadcast join 과 bucket joinspark 2021. 7. 26. 00:11
broadcast join join 할 데이터가 작은게 있을 경우 spark 에서 자동으로 broadcast join 을 적용해 준다. broadcast join 을 적용하라고 broadcast() function 으로 Hint 를 줄 수도 있다. import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.{Column, DataFrame, SparkSession} object MySpark extends Serializable { def main(args: Array[String]): Unit = { val spark: SparkSession = ??? val smallDf: DataFrame = ??? val hugeDf: ..
-
spark managed tables - partitionBy, bucketBy, sortByspark 2021. 7. 25. 19:37
문제 spark 에서 schema 를 적용해 데이터를 읽기에서 사용된 csv 형식의 데이터를 spark managed table 로 저장하기 partitionBy, bucketBy, sortBy 도 사용해 보도록 한다. Code /** * build.sbt 에 아래 의존성을 추가해 준다. * "org.apache.spark" %% "spark-hive" % sparkVersion */ import org.apache.spark.sql.{DataFrame, Encoders, SaveMode, SparkSession} import java.sql.Date final case class Person(name: String, age: Int, birthDay: Date) object MySpark extends..
-
spark tables - managed vs unmanaged(external) tablesspark 2021. 7. 25. 18:54
Managed Table table metadata / data 를 spark 에서 관리 table drop 시 metadata 와 data 모두 삭제 data 는 spark.sql.warehouse.dir 에 정의된 곳에 저장 (spark.sql.warehouse.dir 는 cluster 에서 관리됨 - user 가 지정해 사용할 수 없다) bucketing, sorting 해 테이블 생성 가능 Unmanaged Table table metadata 는 spark 에서 관리되고, data 는 user 가 지정한 특정 위치(LOCATION) 에서 관리된다. 이미 존재하는 데이터를 대상으로 테이블 생성이 가능 이미 존재하는 데이터가 아닌 새로운 데이터로 테이블을 생성하는 것도 가능 (bucketing, s..
-
데이터의 partition 수와 partition 별로 레코드 수 확인 방법spark 2021. 7. 18. 21:57
문제 DataFrame df 에 대해 다음을 확인하는 방법 몇개의 partition 으로 나뉘어져 있는지? partition-id 별로 몇개의 record 가 들어 있는지? Code val df: DataFrame = ??? val numPartitions: Int = df.rdd.getNumPartitions println(numPartitions) import org.apache.spark.sql.functions.spark_partition_id df.groupBy(spark_partition_id()).count().show()