  • GlobalWindows Example
    flink 2021. 5. 17. 20:38

    Problem

    Read a text file line by line and, each time two new records have arrived for a given month, compute and print the total profit sum for that month.

     

    Input(/home/windbird123/product_info.txt)

    (month, product, profit)

    June,Bat,12
    June,Perfume,10
    July,Television,50
    June,Shirt,38
    June,Bat,41

     

    Expected Output

    ProductInfo(June,?,22)
    ProductInfo(June,?,101)

     

    Code

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
    
    object GlobalWindowExample {
      case class ProductInfo(month: String, product: String, profit: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val data = env.readTextFile("/home/windbird123/product_info.txt")
        val mapped: DataStream[ProductInfo] =
          data.map(_.split(",")).map(x => ProductInfo(x(0), x(1), x(2).toInt))
    
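        // Group by month. A key selector function is used here because
        // string field expressions such as keyBy("month") are deprecated in newer Flink releases.
        val reduced: DataStream[ProductInfo] = mapped
          .keyBy(_.month)
          // One global (never-ending) window per key
          .window(GlobalWindows.create())
          // Evaluate the window each time two more elements arrive for the key
          .trigger(CountTrigger.of(2))
          // Running sum of profit; the product name is not meaningful after merging
          .reduce((x: ProductInfo, y: ProductInfo) => ProductInfo(x.month, "?", x.profit + y.profit))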
    
        reduced.print()
        env.execute("Global Window Example")
      }
    }
    

     

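    Explanation

    • With GlobalWindows, every element with the same key is assigned to a single global window for that key, and the window never closes on its own.
    • The trigger decides when the window is evaluated. The default trigger of GlobalWindows never fires, so without a custom trigger nothing is computed.
    • CountTrigger.of(2) fires on every second element for a key but does not purge the window, so the reduced value keeps accumulating. That is why the second June output is 12 + 10 + 38 + 41 = 101, and why July, with only one record, produces no output.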
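
To make the trigger behaviour concrete, here is a minimal sketch in plain Scala with no Flink dependency. It only imitates the semantics described above and is not how Flink is implemented. The names `CountTriggerSketch` and `simulate` are hypothetical, chosen for illustration. The `purge` flag is an assumption added to contrast two behaviours: firing without clearing the window (what `CountTrigger` does by itself) and firing then clearing it (what wrapping the trigger in Flink's `PurgingTrigger` would do).

```scala
object CountTriggerSketch {
  final case class ProductInfo(month: String, product: String, profit: Int)

  // Imitates a keyed global window with a count trigger and a reduce function.
  // purge = false: the reduced value is kept after each firing.
  // purge = true:  the reduced value is dropped after each firing.
  def simulate(records: Seq[ProductInfo], maxCount: Int, purge: Boolean): Vector[ProductInfo] = {
    // State per key: (elements seen since the last firing, reduced value so far)
    val init = (Map.empty[String, (Int, Option[ProductInfo])], Vector.empty[ProductInfo])
    val (_, fired) = records.foldLeft(init) { case ((state, out), rec) =>
      val (count, acc) = state.getOrElse(rec.month, (0, None))
      // Same reduce logic as the Flink job: sum the profit, blank out the product
      val merged = acc.fold(rec)(a => ProductInfo(a.month, "?", a.profit + rec.profit))
      if (count + 1 == maxCount) {
        // Trigger fires: emit the reduced value and reset the element counter
        val kept = if (purge) None else Some(merged)
        (state.updated(rec.month, (0, kept)), out :+ merged)
      } else {
        (state.updated(rec.month, (count + 1, Some(merged))), out)
      }
    }
    fired
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      ProductInfo("June", "Bat", 12),
      ProductInfo("June", "Perfume", 10),
      ProductInfo("July", "Television", 50),
      ProductInfo("June", "Shirt", 38),
      ProductInfo("June", "Bat", 41)
    )
    // Fire without purging: ProductInfo(June,?,22), ProductInfo(June,?,101)
    simulate(input, maxCount = 2, purge = false).foreach(println)
    // Fire and purge: ProductInfo(June,?,22), ProductInfo(June,?,79)
    simulate(input, maxCount = 2, purge = true).foreach(println)
  }
}
```

With `purge = false` the sketch reproduces the expected output of this post. With `purge = true` each firing only covers the two most recent records for the key, so the second June result becomes 38 + 41 = 79.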

     

    Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
