spark

spark 에서 groupByKey 로 특정 column 기준 unique row 만 남기기

wefree 2021. 7. 16. 21:45

문제

아래와 같이 Person 데이터가 있을 때

name age location
a 1 A
b 2 B
a 3 C

 

name 기준으로 unique 한 row 만 남긴다. name 이 a 인 레코드가 2개 있는데, 중복이 제거되어 둘 중 임의의 하나만 남으면 된다. 단순히 groupBy(name) 을 사용하면 age, location 에 대해 aggregation 을 해야되서 복잡해 진다.

 

Code

import org.apache.spark.sql.{Dataset, SparkSession}

final case class Person(name: String, age: Int, location: String)

object MySpark extends Serializable {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val ds: Dataset[Person] = Seq(Person("a", 1, "A"), Person("b", 2, "B"), Person("a", 3, "C")).toDS()

    val groupByName = ds.groupByKey(_.name).mapGroups {
      case (_, iter) => iter.next()
    }

    groupByName.show(false)
    spark.stop
  }
}

 

결과

name age location
a 1 A
b 2 B

혹은

name age location
b 2 B
a 3 C

 

 

참고

	// ds 데이터를 partitionBy 기준으로 grouping 했을 때 중복이 있을 경우, orderBy 기준으로 정렬후 첫번째 레코드만 남긴다.
	def filterFirst(df: DataFrame, partitionBy: Seq[Column], orderBy: Seq[Column])(implicit spark: SparkSession): DataFrame = {
		val windowRowNumber = Window.partitionBy(partitionBy: _*).orderBy(orderBy: _*)

		df.withColumn("_row_number", row_number().over(windowRowNumber))
			.where(col("_row_number") === 1)
			.drop("_row_number")
	}