spark
-
각 파티션에서 TaskContext를 사용해 현재 partitionId가져오기spark 2025. 7. 11. 23:00
import org.apache.spark.sql.SparkSessionobject PartitionIdTest { def main(args: Array[String]): Unit = { // TaskId는 Spark에서 작업을 실행할 때 각 작업에 대한 고유한 식별자를 제공합니다. // 이 식별자는 Spark의 내부 로깅 및 모니터링에 사용됩니다. // SparkContext의 getTaskId() 메서드를 사용하여 현재 작업의 TaskId를 가져올 수 있습니다. implicit val spark: SparkSession = SparkSession.builder().appName("TaskIdTest").master("local[*]").getOrCreate() import spark.impl..
-
Dataset 으로 정의된 case class 의 fields 가져오기spark 2025. 6. 10. 21:06
case class PwlLog( log_type: String, device_type: String, event_type: String)val fields: Array[String] = classOf[PwlLog].getDeclaredFields.map(_.getName)spark.read .... .withColumn("log_type", ...) .select(fields.map(col): _*) .as[PwlLog]
-
groupBy 후 group 당 N 개의 record 만 남기기spark 2025. 1. 6. 21:19
groupBy 후 group 당 최대 N=300 개만 남긴다고 할 때 방법1groupByKey + mapGroups 사용val urlLineDs: Dataset[(String, Seq[String])] = spark.read.text(path) .toDF("url", "item") .groupByKey(row => row.getAs[String]("url")) .mapGroups { case (url, rowIter: Iterator[Row]) => val itemList: Seq[String] = rowIter.take(300).map(_.getAs[String]("item")).toList (url, itemList) } 방법2window function 사용val window = Windo..
-
groupBy 후 collect_list 에 여러개의 column 값을 담기spark 2024. 11. 19. 20:46
collect_list(struct(???)) 를 사용한다.import spark.implicits._case class BaseMetric( id: String, log_type: String, device_type: String, clk_count: Long, imp_count: Long)case class BaseMetricGroup( id: String, metrics: Seq[BaseMetric])val metricDs: Dataset[BaseMetric] = ???metricDs .groupBy("id") .agg( collect_list( struct(BaseMetric.fields.map(col): _*) ).as("metrics") ) .as[BaseMetricGroup]
-
DataFrame 에 신규 칼럼 추가하기spark 2024. 7. 19. 19:19
문제보통 withColumn() 으로 쉽게 신규 칼럼을 추가할 수 있다. 하지만 mapPartitions() 등을 사용할 때는 쉽지가 않다. 코드https://stackoverflow.com/questions/33876155/how-to-add-columns-into-org-apache-spark-sql-row-inside-of-mappartitions import org.apache.spark.rdd.RDDimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Row, SparkSession}val spark: SparkSession = ...val df: D..
-
write partitionByspark 2023. 2. 26. 19:19
partition 부분적으로 업데이트하기 위해 partitionOverwriteMode 를 dynamic 으로 설정함 (default: STATIC) ds.write .mode(SaveMode.Overwrite) .option("partitionOverwriteMode", "dynamic") .partitionBy("queryset_id", "log_date", "time_grains") .parquet(basePath) partition 마다 생성되는 파일 개수 조정을 위해서는 아래 링크 참고 https://stackoverflow.com/questions/44808415/spark-parquet-partitioning-large-number-of-files/44810607#44810607 위의 예제에..
-
Spark Dataset joinWith bug?spark 2022. 2. 26. 00:27
문제 아래 코드의 결과는 어떻게 될까? import org.apache.spark.sql.SparkSession object SparkTest { case class Person(name: String, age: Int, gender: String) def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate() import spark.implicits._ val all = Seq(Person("Kim", 10, "M"), Person("Lee", 20, "M"), Person("Kim", 30, "F")).toDS() val males = all.fi..