spark
-
groupBy 후 group 당 N 개의 record 만 남기기spark 2025. 1. 6. 21:19
groupBy 후 group 당 최대 N=300 개만 남긴다고 할 때 방법1groupByKey + mapGroups 사용val urlLineDs: Dataset[(String, Seq[String])] = spark.read.text(path) .toDF("url", "item") .groupByKey(row => row.getAs[String]("url")) .mapGroups { case (url, rowIter: Iterator[Row]) => val itemList: Seq[String] = rowIter.take(300).map(_.getAs[String]("item")).toList (url, itemList) } 방법2window function 사용val window = Windo..
-
groupBy 후 collect_list 에 여러개의 column 값을 담기spark 2024. 11. 19. 20:46
collect_list(struct(???)) 를 사용한다.import spark.implicits._case class BaseMetric( id: String, log_type: String, device_type: String, clk_count: Long, imp_count: Long)case class BaseMetricGroup( id: String, metrics: Seq[BaseMetric])val metricDs: Dataset[BaseMetric] = ???metricDs .groupBy("id") .agg( collect_list( struct(BaseMetric.fields.map(col): _*) ).as("metrics") ) .as[BaseMetricGroup]
-
DataFrame 에 신규 칼럼 추가하기spark 2024. 7. 19. 19:19
문제보통 withColumn() 으로 쉽게 신규 칼럼을 추가할 수 있다. 하지만 mapPartitions() 등을 사용할 때는 쉽지가 않다. 코드https://stackoverflow.com/questions/33876155/how-to-add-columns-into-org-apache-spark-sql-row-inside-of-mappartitions import org.apache.spark.rdd.RDDimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Row, SparkSession}val spark: SparkSession = ...val df: D..
-
write partitionByspark 2023. 2. 26. 19:19
partition 부분적으로 업데이트하기 위해 partitionOverwriteMode 를 dynamic 으로 설정함 (default: STATIC) ds.write .mode(SaveMode.Overwrite) .option("partitionOverwriteMode", "dynamic") .partitionBy("queryset_id", "log_date", "time_grains") .parquet(basePath) partition 마다 생성되는 파일 개수 조정을 위해서는 아래 링크 참고 https://stackoverflow.com/questions/44808415/spark-parquet-partitioning-large-number-of-files/44810607#44810607 위의 예제에..
-
Spark Dataset joinWith bug?spark 2022. 2. 26. 00:27
문제 아래 코드의 결과는 어떻게 될까? import org.apache.spark.sql.SparkSession object SparkTest { case class Person(name: String, age: Int, gender: String) def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() import spark.implicits._ val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = all.fi..
-
Spark Broadcast 사용하기spark 2022. 1. 30. 22:01
문제 Movie Title 정보를 가진 Map[Int, String] 이 있다. val movieTitles: Map[Int, String] = Map(1 -> "Back To The Future", 2 -> "The Matrix") movieTitles 를 broadcast 해 아래 DataSet[MovieScore] 에서 조회할 수 있도록 한다. case class MovieScore(id: Int, score: Int) import spark.implicits._ val movieScoreDs: Dataset[MovieScore] = Seq(MovieScore(1, 9), MovieScore(2, 10)).toDS() 최종 결과는 movieScoreDs 와 movieTitles 가 (map side..
-
spark log level 조정하기spark 2022. 1. 30. 20:16
문제 spark 을 local mode 로 실행할 때, 로그가 아래처럼 너무 많아 출력결과를 살펴보기가 어렵다. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 22/01/30 20:11:12 INFO SparkContext: Running Spark version 3.0.0 22/01/30 20:11:13 INFO ResourceUtils: ============================================================== 22/01/30 20:11:13 INFO ResourceUtils: Resources for spark.driver: 22/01/30 20:11:13 INFO ..