flink
DataSet Join 예제
wefree
2021. 5. 15. 20:34
문제
person 파일 (id, name) 과 location 파일 (id, city) 을 left outer join 해 output 파일에 저장한다.
Person 파일 (person.txt)
1,John
2,Albert
3,Lui
4,Smith
5,Robert
Location 파일 (location.txt)
1,DC
2,NY
4,LA
6,LU
7,DL
8,NH
Expected Output
1 Some(John) Some(DC)
2 Some(Albert) Some(NY)
3 Some(Lui) None
4 Some(Smith) Some(LA)
5 Some(Robert) None
Code
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
/**
 * Batch job that left-outer-joins a person file (`id,name` per line) with a
 * location file (`id,city` per line) on `id` and writes one space-separated
 * row per person to the output path.
 *
 * Required program arguments: `--person <path> --location <path> --output <path>`.
 */
object JoinExample {
  case class Person(id: Int, name: String)
  case class Location(id: Int, city: String)
  case class PersonLocation(id: Int, name: Option[String], location: Option[String])

  /**
   * Splits one `id,value` line into its integer id and string value.
   *
   * @param line raw text line read from an input file
   * @param kind record kind ("person" / "location"), used only in error messages
   * @return the parsed `(id, value)` pair
   * @throws IllegalArgumentException if the line does not have exactly two
   *                                  comma-separated fields or the id is not an integer
   */
  private def parseRecord(line: String, kind: String): (Int, String) =
    line.split(",") match {
      case Array(rawId, value) =>
        val id =
          try rawId.trim.toInt
          catch {
            case e: NumberFormatException =>
              // Re-throw with the offending record so the bad input line can be located.
              throw new IllegalArgumentException(s"Invalid $kind id in record: '$line'", e)
          }
        (id, value)
      case _ =>
        // Previously this surfaced as a bare scala.MatchError without any context.
        throw new IllegalArgumentException(
          s"Malformed $kind record (expected 'id,value'): '$line'")
    }

  /** Parses one line of the person file. */
  private def parsePerson(line: String): Person = {
    val (id, name) = parseRecord(line, "person")
    Person(id, name)
  }

  /** Parses one line of the location file. */
  private def parseLocation(line: String): Location = {
    val (id, city) = parseRecord(line, "location")
    Location(id, city)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Single parallel task so the sink writes one output file rather than a directory of parts.
    env.setParallelism(1)

    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    // getRequired fails fast with the missing key's name instead of passing null downstream.
    val personPath = params.getRequired("person")
    val locationPath = params.getRequired("location")
    val outputPath = params.getRequired("output")

    val personSet: DataSet[Person] = env
      .readTextFile(personPath)
      .filter(_.trim.nonEmpty) // tolerate blank lines in the input
      .map(line => parsePerson(line))

    val locationSet: DataSet[Location] = env
      .readTextFile(locationPath)
      .filter(_.trim.nonEmpty) // tolerate blank lines in the input
      .map(line => parseLocation(line))

    val joined: DataSet[PersonLocation] = personSet
      .leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)
      .where("id")
      .equalTo("id")
      .apply { (person, location) =>
        // In a left outer join the right side is null when no location matches;
        // Option(...) turns that null into None.
        PersonLocation(person.id, Some(person.name), Option(location).map(_.city))
      }

    // Rows are separated by "\n" and fields by a single space.
    joined.writeAsCsv(outputPath, "\n", " ")

    // Sinks are lazy; nothing runs until execute() is called.
    env.execute("Left Outer Join Example")
  }
}
설명
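// left outer join 에는 JoinHint 로 BROADCAST_HASH_SECOND, REPARTITION_HASH_SECOND, REPARTITION_SORT_MERGE 등을 지정할 수 있다.
// - BROADCAST_HASH_SECOND: 오른쪽 데이터 크기가 작을때, 오른쪽 데이터 전체를
// broadcast 해 map side join
// - REPARTITION_HASH_SECOND: 양쪽 데이터를 모두 repartition(shuffle) 한 뒤 오른쪽 데이터로 hash table 을 만들어 join.
// 오른쪽 데이터가 왼쪽보다 작지만 broadcast 하기에는 둘 다 클 때
// - REPARTITION_SORT_MERGE: 양쪽 데이터를 repartition 후 정렬해 merge join. 왼쪽 or 오른쪽 데이터중 하나(또는 둘 다)가 이미 정렬되어 있을 때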
personSet.leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)
// 한편 (inner) join 에는 상대 데이터셋의 크기를 힌트로 주는 아래와 같은 join 함수도 있으니 참고하자
personSet.joinWithHuge(...)
personSet.joinWithTiny(...)
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/