-
DataSet Join 예제flink 2021. 5. 15. 20:34
문제
person 파일 (id, name) 과 location 파일 (id, city) 을 left outer join 해 output 파일에 저장한다.
Person 파일 (person.txt)
1,John
2,Albert
3,Lui
4,Smith
5,Robert
Location 파일 (location.txt)
1,DC
2,NY
4,LA
6,LU
7,DL
8,NH
Expected Output
1 Some(John) Some(DC)
2 Some(Albert) Some(NY)
3 Some(Lui) None
4 Some(Smith) Some(LA)
5 Some(Robert) None
Code
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

/** Batch job that left-outer-joins a person file with a location file on `id`.
  *
  * Required command-line arguments (parsed by `ParameterTool`):
  *   - `--person`   path of a text file with one `id,name` record per line
  *   - `--location` path of a text file with one `id,city` record per line
  *   - `--output`   path the joined result is written to
  *
  * Every person is emitted exactly once per matching location; a person
  * without a matching location is emitted with `None` as its location.
  */
object JoinExample {

  case class Person(id: Int, name: String)
  case class Location(id: Int, city: String)
  case class PersonLocation(id: Int, name: Option[String], location: Option[String])

  /** Splits one `id,value` input line into its numeric id and its value.
    *
    * @param line   raw line read from the input file
    * @param source short label of the input ("person" / "location"), used only in error messages
    * @return the parsed `(id, value)` pair
    * @throws IllegalArgumentException if the line does not have exactly two
    *                                  comma-separated fields or the id is not an integer
    */
  private def parseRecord(line: String, source: String): (Int, String) =
    line.split(",") match {
      case Array(rawId, value) =>
        val id =
          try rawId.toInt
          catch {
            case e: NumberFormatException =>
              throw new IllegalArgumentException(
                s"Non-numeric id in $source record: '$line'", e)
          }
        (id, value)
      case _ =>
        throw new IllegalArgumentException(
          s"Malformed $source record (expected 'id,value'): '$line'")
    }

  /** Builds and runs the join job.
    *
    * @param args command-line arguments; must contain `--person`, `--location` and `--output`
    */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    // getRequired fails fast with a message naming the missing key, instead of
    // handing a null path to the source/sink.
    val personPath = params.getRequired("person")
    val locationPath = params.getRequired("location")
    val outputPath = params.getRequired("output")

    val personSet: DataSet[Person] = env
      .readTextFile(personPath)
      .filter(_.trim.nonEmpty) // tolerate blank lines in the input file
      .map { line =>
        val (id, name) = parseRecord(line, "person")
        Person(id, name)
      }

    val locationSet: DataSet[Location] = env
      .readTextFile(locationPath)
      .filter(_.trim.nonEmpty) // tolerate blank lines in the input file
      .map { line =>
        val (id, city) = parseRecord(line, "location")
        Location(id, city)
      }

    val joined: DataSet[PersonLocation] = personSet
      .leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)
      .where("id")
      .equalTo("id")
      .apply { (person, location) =>
        // In a left outer join the right side is null when there is no match;
        // Option(...) turns that null into None.
        PersonLocation(person.id, Some(person.name), Option(location).map(_.city))
      }

    // One record per line, fields separated by a single space.
    joined.writeAsCsv(outputPath, "\n", " ")

    env.execute("Left Outer Join Example")
  }
}
설명
// JoinHint 로 BROADCAST_HASH_SECOND, REPARTITION_HASH_SECOND, REPARTITION_SORT_MERGE 등을 지정할 수 있다.
// - BROADCAST_HASH_SECOND: 오른쪽 데이터 크기가 작을 때, 오른쪽 데이터 전체를
//   broadcast 해 map side join 을 수행한다.
// - REPARTITION_HASH_SECOND: 양쪽 데이터를 모두 repartition(shuffle) 한 뒤, 오른쪽 데이터로
//   hash table 을 만들어 join 한다. 오른쪽 데이터가 왼쪽보다 작지만 양쪽 모두 클 때 적합하다.
// - REPARTITION_SORT_MERGE: 양쪽 데이터를 repartition 하고 정렬한 뒤 merge 한다.
//   왼쪽 or 오른쪽 데이터 중 하나 이상이 이미 정렬되어 있을 때 적합하다.
personSet.leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)

// 한편 아래와 같은 join 함수도 있으니 참고하자
personSet.joinWithHuge(...)
personSet.joinWithTiny(...)
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
Iterate 예제 (0) 2021.05.17 Stream Split 예제 (0) 2021.05.17 Stream Reduce, Min, MinBy, Max, MaxBy 예제 (0) 2021.05.16 Stream WordCount 예제 (0) 2021.05.15 DataSet WordCount 예제 (0) 2021.05.15