spark
spark 에서 groupByKey 로 특정 column 기준 unique row 만 남기기
wefree
2021. 7. 16. 21:45
문제
아래와 같이 Person 데이터가 있을 때
name | age | location |
a | 1 | A |
b | 2 | B |
a | 3 | C |
name 기준으로 unique 한 row 만 남긴다. name 이 a 인 레코드가 2개 있는데, 중복이 제거되어 둘 중 임의의 하나만 남으면 된다. 단순히 groupBy(name) 을 사용하면 age, location 에 대해 aggregation 을 해야되서 복잡해 진다.
Code
import org.apache.spark.sql.{Dataset, SparkSession}
final case class Person(name: String, age: Int, location: String)
object MySpark extends Serializable {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.getOrCreate()
import spark.implicits._
val ds: Dataset[Person] = Seq(Person("a", 1, "A"), Person("b", 2, "B"), Person("a", 3, "C")).toDS()
val groupByName = ds.groupByKey(_.name).mapGroups {
case (_, iter) => iter.next()
}
groupByName.show(false)
spark.stop
}
}
결과
name | age | location |
a | 1 | A |
b | 2 | B |
혹은
name | age | location |
b | 2 | B |
a | 3 | C |
참고
// ds 데이터를 partitionBy 기준으로 grouping 했을 때 중복이 있을 경우, orderBy 기준으로 정렬후 첫번째 레코드만 남긴다.
def filterFirst(df: DataFrame, partitionBy: Seq[Column], orderBy: Seq[Column])(implicit spark: SparkSession): DataFrame = {
val windowRowNumber = Window.partitionBy(partitionBy: _*).orderBy(orderBy: _*)
df.withColumn("_row_number", row_number().over(windowRowNumber))
.where(col("_row_number") === 1)
.drop("_row_number")
}