-
DataFrame 에 신규 칼럼 추가하기spark 2024. 7. 19. 19:19
문제
보통 withColumn() 으로 쉽게 신규 칼럼을 추가할 수 있다. 하지만 mapPartitions() 등을 사용할 때는 쉽지가 않다.
코드
import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} val spark: SparkSession = ... val df: DataFrame = ... val rows: RDD[Row] = df.rdd.mapPartitions { partition => val connection: MyConnecton = new MyConnection(...) partition.map { row => val id: String = row.getAs[String]("id") val name: String = connection.lookup(id) // step1) 신규 데이터 추가 Row.fromSeq(row.toSeq :+ name) } } // step2) 신규 스키마 정의 val newSchema = StructType( df.schema.fields ++ Array(StructField("name", StringType, nullable = true)) ) // step3) RDD[Row] 를 DataFrame 으로 만들기 spark.createDataFrame(rows, newSchema)
'spark' 카테고리의 다른 글
Dataset.groupByKey + reduceGroups (0) 2025.01.06 groupBy 후 collect_list 에 여러개의 column 값을 담기 (0) 2024.11.19 write partitionBy (0) 2023.02.26 Spark Dataset joinWith bug? (0) 2022.02.26 Spark Broadcast 사용하기 (0) 2022.01.30