flink

DataSet Join 예제

wefree 2021. 5. 15. 20:34

문제

person 파일 (id, name) 과 location 파일 (id, city) 을 id 를 키로 left outer join 하고, 그 결과를 output 파일에 저장한다.

 

Person 파일 (person.txt)

1,John
2,Albert
3,Lui
4,Smith
5,Robert

 

Location 파일 (location.txt)

1,DC
2,NY
4,LA
6,LU
7,DL
8,NH

 

Expected Output

1 Some(John) Some(DC)
2 Some(Albert) Some(NY)
3 Some(Lui) None
4 Some(Smith) Some(LA)
5 Some(Robert) None

 

Code

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

/**
 * Left-outer-joins a person file (`id,name` lines) with a location file (`id,city` lines)
 * on `id`, and writes one space-separated line per person: `id Some(name) Some(city)` or
 * `id Some(name) None` when the person has no location.
 *
 * Required program arguments (read with `ParameterTool`):
 *   - `--person`   path of the person text file
 *   - `--location` path of the location text file
 *   - `--output`   path the joined result is written to
 */
object JoinExample {
  case class Person(id: Int, name: String)
  case class Location(id: Int, city: String)
  case class PersonLocation(id: Int, name: Option[String], location: Option[String])

  /** Parses an `id,name` line; everything after the first comma is the name. */
  private def parsePerson(line: String): Person =
    line.split(",", 2).map(_.trim) match {
      case Array(id, name) => Person(id.toInt, name)
      case _               => throw new IllegalArgumentException(s"Malformed person line: '$line'")
    }

  /** Parses an `id,city` line; everything after the first comma is the city. */
  private def parseLocation(line: String): Location =
    line.split(",", 2).map(_.trim) match {
      case Array(id, city) => Location(id.toInt, city)
      case _               => throw new IllegalArgumentException(s"Malformed location line: '$line'")
    }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // One parallel instance, so the result is written as a single output file.
    env.setParallelism(1)
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    // getRequired fails fast with a clear message instead of passing null to readTextFile.
    val personPath   = params.getRequired("person")
    val locationPath = params.getRequired("location")
    val outputPath   = params.getRequired("output")

    // Blank lines (e.g. a trailing empty line) are skipped rather than failing the job.
    val personSet: DataSet[Person] = env
      .readTextFile(personPath)
      .filter(_.trim.nonEmpty)
      .map(line => parsePerson(line))

    val locationSet: DataSet[Location] = env
      .readTextFile(locationPath)
      .filter(_.trim.nonEmpty)
      .map(line => parseLocation(line))

    // Location is assumed small, so it is broadcast and hashed (map-side join).
    val joined: DataSet[PersonLocation] = personSet
      .leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)
      .where(_.id)
      .equalTo(_.id)
      .apply { (person, location) =>
        // In a left outer join the right side is null when no location matches the person.
        PersonLocation(person.id, Some(person.name), Option(location).map(_.city))
      }

    joined.writeAsCsv(outputPath, rowDelimiter = "\n", fieldDelimiter = " ")
    env.execute("Left Outer Join Example")
  }
}

 

설명

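// JoinHint 로 BROADCAST_HASH_SECOND, REPARTITION_HASH_SECOND, REPARTITION_SORT_MERGE 등의 join 전략을 지정할 수 있다.
//   - BROADCAST_HASH_SECOND: 오른쪽(두 번째) 데이터가 작을 때 쓴다. 오른쪽 데이터 전체를
//     모든 병렬 인스턴스에 broadcast 하고, 그것으로 hash table 을 만들어 map side join 한다.
//   - REPARTITION_HASH_SECOND: 양쪽 데이터를 join key 로 hash repartition(shuffle) 한 뒤,
//     오른쪽(두 번째) 데이터로 hash table 을 만든다. 오른쪽이 왼쪽보다 작지만 broadcast 하기엔 클 때 적합하다.
//   - REPARTITION_SORT_MERGE: 양쪽 데이터를 join key 로 repartition 하고 정렬한 뒤 merge 한다.
//     입력이 이미 key 로 partition 되어 있거나 정렬되어 있으면 그 작업을 재사용할 수 있어 유리하다.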
personSet.leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)

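// 한편 JoinHint 를 직접 주는 대신 아래와 같은 (inner) join 함수도 있으니 참고하자
//   - joinWithTiny: 두 번째 입력이 아주 작을 때 사용한다 (두 번째 입력을 broadcast, BROADCAST_HASH_SECOND 와 같음)
//   - joinWithHuge: 두 번째 입력이 아주 클 때 사용한다 (첫 번째 입력을 broadcast, BROADCAST_HASH_FIRST 와 같음)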
personSet.joinWithHuge(...)
personSet.joinWithTiny(...)

 

참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/