DataSet Join Example
    flink 2021. 5. 15. 20:34

    Problem

    Left outer join the person file (id, name) with the location file (id, city) on id, and save the result to an output file.

     

    Person file (person.txt)

    1,John
    2,Albert
    3,Lui
    4,Smith
    5,Robert

     

    Location file (location.txt)

    1,DC
    2,NY
    4,LA
    6,LU
    7,DL
    8,NH

     

    Expected Output

    1 Some(John) Some(DC)
    2 Some(Albert) Some(NY)
    3 Some(Lui) None
    4 Some(Smith) Some(LA)
    5 Some(Robert) None
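To make the left outer join semantics concrete, here is a minimal plain-Scala sketch (no Flink involved) that runs a naive nested-loop left outer join over the sample rows above. Every person is kept; persons 3 and 5 have no location and get None, while locations 6, 7 and 8 have no matching person and are dropped. The object name LeftOuterJoinSemantics is hypothetical and only used for illustration.

```scala
// Plain-Scala sketch of left outer join semantics on the sample rows (no Flink).
// LeftOuterJoinSemantics is a hypothetical name used only for this illustration.
object LeftOuterJoinSemantics {
  case class Person(id: Int, name: String)
  case class Location(id: Int, city: String)
  case class PersonLocation(id: Int, name: Option[String], location: Option[String])

  val persons = List(Person(1, "John"), Person(2, "Albert"), Person(3, "Lui"), Person(4, "Smith"), Person(5, "Robert"))
  val locations = List(Location(1, "DC"), Location(2, "NY"), Location(4, "LA"), Location(6, "LU"), Location(7, "DL"), Location(8, "NH"))

  // Naive nested-loop left outer join: every person is kept; a person with no
  // matching location gets None. Locations without a matching person are dropped.
  def leftOuterJoin(ps: List[Person], ls: List[Location]): List[PersonLocation] =
    ps.flatMap { p =>
      ls.filter(_.id == p.id) match {
        case Nil     => List(PersonLocation(p.id, Some(p.name), None))
        case matches => matches.map(l => PersonLocation(p.id, Some(p.name), Some(l.city)))
      }
    }

  def main(args: Array[String]): Unit = {
    // Prints one line per person, e.g. "1 Some(John) Some(DC)" and "3 Some(Lui) None"
    leftOuterJoin(persons, locations).foreach(r => println(s"${r.id} ${r.name} ${r.location}"))
  }
}
```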

     

    Code

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.api.scala._
    
    object JoinExample {
      case class Person(id: Int, name: String)
      case class Location(id: Int, city: String)
      case class PersonLocation(id: Int, name: Option[String], location: Option[String])
    
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val params = ParameterTool.fromArgs(args)
        env.getConfig.setGlobalJobParameters(params)
    
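        // Parse each "id,name" line of person.txt into a Person
        val personSet: DataSet[Person] = env
          .readTextFile(params.get("person"))
          .map(_.split(",") match { case Array(id, name) => Person(id.toInt, name) })
    
        // Parse each "id,city" line of location.txt into a Location
        val locationSet: DataSet[Location] = env
          .readTextFile(params.get("location"))
          .map(_.split(",") match { case Array(id, city) => Location(id.toInt, city) })
    
        // Left outer join on id. locationSet is the small side, so broadcast it (hash join on the second input).
        // For a person with no matching location, Flink passes `location` as null, hence Option(location).
        val joined: DataSet[PersonLocation] = personSet
          .leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)
          .where("id")
          .equalTo("id")
          .apply((person, location) => PersonLocation(person.id, Some(person.name), Option(location).map(_.city)))
    
        // One row per line, fields separated by a space; Option fields are written via toString (Some(...) / None)
        joined.writeAsCsv(params.get("output"), "\n", " ")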
        env.execute("Left Outer Join Example")
      }
    }
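The key detail in the join function is that, in a Flink left outer join, the right-hand argument is null when a person has no matching location. The following plain-Scala sketch pulls that lambda out into a function and feeds it one matched and one unmatched pair. The names JoinFunctionSketch, joinFn and render are hypothetical, and render only approximates what writeAsCsv(path, "\n", " ") writes for one row (each field's toString, joined by a space).

```scala
// Hypothetical names; the body of joinFn is the same as the lambda passed to apply(...) above.
object JoinFunctionSketch {
  case class Person(id: Int, name: String)
  case class Location(id: Int, city: String)
  case class PersonLocation(id: Int, name: Option[String], location: Option[String])

  // In a Flink left outer join, `location` is null when the person has no match.
  // Option(null) is None, so the unmatched case never dereferences null.
  def joinFn(person: Person, location: Location): PersonLocation =
    PersonLocation(person.id, Some(person.name), Option(location).map(_.city))

  // Approximation of one writeAsCsv row with a " " field delimiter:
  // every field rendered with toString and joined by a space.
  def render(row: PersonLocation): String = row.productIterator.mkString(" ")

  def main(args: Array[String]): Unit = {
    println(render(joinFn(Person(1, "John"), Location(1, "DC")))) // 1 Some(John) Some(DC)
    println(render(joinFn(Person(3, "Lui"), null)))              // 3 Some(Lui) None
  }
}
```

Using Option(location) rather than Some(location) matters here: Some(null) would produce Some(null) and the .map(_.city) call would then throw a NullPointerException.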

     

    Explanation

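    // JoinHint options include BROADCAST_HASH_SECOND, REPARTITION_HASH_SECOND, REPARTITION_SORT_MERGE, etc.
    //   - BROADCAST_HASH_SECOND: use when the right (second) input is small; the whole right input is
    //     broadcast to every task and joined map-side against a hash table built from it
    //   - REPARTITION_HASH_SECOND: both inputs are repartitioned by key and a hash table is built from the
    //     right input; use when the right input is smaller than the left but both are still large
    //   - REPARTITION_SORT_MERGE: both inputs are repartitioned and sorted by key (skipping either step if an
    //     input already is), then merged; a good choice when one or both inputs are already sorted
    personSet.leftOuterJoin(locationSet, JoinHint.BROADCAST_HASH_SECOND)
    
    // Note that the following join methods also exist. Both are inner joins with a size hint:
    //   joinWithHuge: the second input is huge (the first input is broadcast)
    //   joinWithTiny: the second input is tiny (the second input is broadcast)
    personSet.joinWithHuge(...)
    personSet.joinWithTiny(...)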
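The hints above name two different join algorithms. The following plain-Scala sketch shows the idea behind each on in-memory collections: a hash join that builds its table from the second input (the idea behind the *_HASH_SECOND hints) and a sort-merge join (the idea behind REPARTITION_SORT_MERGE), both with left outer semantics. This is not Flink's internal implementation, which also handles partitioning across tasks; the object and method names are hypothetical.

```scala
// Plain-Scala illustration of two join strategies; hypothetical names, not Flink internals.
object JoinStrategiesSketch {
  case class Person(id: Int, name: String)
  case class Location(id: Int, city: String)
  type Row = (Int, String, Option[String])

  val persons = Seq(Person(1, "John"), Person(2, "Albert"), Person(3, "Lui"), Person(4, "Smith"), Person(5, "Robert"))
  val locations = Seq(Location(1, "DC"), Location(2, "NY"), Location(4, "LA"), Location(6, "LU"), Location(7, "DL"), Location(8, "NH"))

  // Hash join built on the second input: build a hash table from the (smaller)
  // right side, then stream the left side and probe the table for each row.
  def hashLeftOuterJoin(left: Seq[Person], right: Seq[Location]): Seq[Row] = {
    val table: Map[Int, Seq[Location]] = right.groupBy(_.id)
    left.flatMap { p =>
      table.get(p.id) match {
        case Some(ls) => ls.map(l => (p.id, p.name, Option(l.city)))
        case None     => Seq((p.id, p.name, Option.empty[String]))
      }
    }
  }

  // Sort-merge join: sort both inputs by key, then advance two cursors in one pass.
  def sortMergeLeftOuterJoin(left: Seq[Person], right: Seq[Location]): Seq[Row] = {
    val l = left.sortBy(_.id)
    val r = right.sortBy(_.id).toIndexedSeq
    val out = scala.collection.mutable.ArrayBuffer.empty[Row]
    var j = 0
    for (p <- l) {
      // Skip right rows whose key is smaller than the current left key.
      while (j < r.length && r(j).id < p.id) j += 1
      // Emit every right row with an equal key; j is not moved past them,
      // so a later left row with the same key matches them too.
      var k = j
      while (k < r.length && r(k).id == p.id) {
        out += ((p.id, p.name, Option(r(k).city)))
        k += 1
      }
      // No equal key on the right: keep the left row with None (left outer semantics).
      if (k == j) out += ((p.id, p.name, Option.empty[String]))
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val byHash = hashLeftOuterJoin(persons, locations)
    val bySortMerge = sortMergeLeftOuterJoin(persons, locations)
    println(byHash == bySortMerge) // true here, because persons are already ordered by id
    byHash.foreach(println)        // (1,John,Some(DC)) ... (5,Robert,None)
  }
}
```

The trade-off the hints encode is visible here: the hash join needs the whole second input in memory as a table, while the sort-merge join needs both inputs sorted but can then join them in a single streaming pass.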

     

    Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
