
GlobalWindows Example

wefree 2021. 5. 17. 20:38

Problem

Read a text file line by line. Every time two new records arrive for a given month, compute and print the total profit accumulated so far for that month.

 

Input(/home/windbird123/product_info.txt)

(month, product, profit)

June,Bat,12
June,Perfume,10
July,Television,50
June,Shirt,38
June,Bat,41

 

Expected Output

ProductInfo(June,?,22)
ProductInfo(June,?,101)
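
To see where 22 and 101 come from, the sketch below replays the input in plain Scala, without Flink. It assumes the semantics the job relies on: one running sum and one counter per month, with the sum emitted on every second record for that month and never cleared. The `simulate` helper is hypothetical and only models that behaviour; it is not part of the Flink API.

```scala
object CountTriggerSimulation {
  case class ProductInfo(month: String, product: String, profit: Int)

  // Parse one "month,product,profit" line.
  def parse(line: String): ProductInfo = {
    val Array(month, product, profit) = line.split(",").map(_.trim)
    ProductInfo(month, product, profit.toInt)
  }

  // Hypothetical helper: replays the records in order, keeps one running sum
  // and one counter per month, and emits the running sum on every n-th record
  // of a month. Simplification: the emitted product is always "?".
  def simulate(records: Seq[ProductInfo], n: Int): Seq[ProductInfo] = {
    val sums   = scala.collection.mutable.Map.empty[String, Int]
    val counts = scala.collection.mutable.Map.empty[String, Int]
    val out    = scala.collection.mutable.ArrayBuffer.empty[ProductInfo]
    for (r <- records) {
      sums(r.month) = sums.getOrElse(r.month, 0) + r.profit
      counts(r.month) = counts.getOrElse(r.month, 0) + 1
      if (counts(r.month) == n) {
        out += ProductInfo(r.month, "?", sums(r.month))
        counts(r.month) = 0 // the counter resets, the running sum is kept
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "June,Bat,12",
      "June,Perfume,10",
      "July,Television,50",
      "June,Shirt,38",
      "June,Bat,41"
    )
    simulate(input.map(parse), 2).foreach(println)
  }
}
```

June fires after its 2nd record (12 + 10 = 22) and again after its 4th (22 + 38 + 41 = 101). July only ever sees one record, so it never reaches the count of 2 and produces no output.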

 

Code

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object GlobalWindowExample {
  case class ProductInfo(month: String, product: String, profit: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
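    // Read the input file line by line
    val data = env.readTextFile("/home/windbird123/product_info.txt")
    // Parse each "month,product,profit" line into a ProductInfo
    val mapped: DataStream[ProductInfo] =
      data.map(_.split(",")).map(x => ProductInfo(x(0), x(1), x(2).trim.toInt))

    // Key the stream by month (a key selector instead of the deprecated string field name)
    val reduced: DataStream[ProductInfo] = mapped
      .keyBy(_.month)
      // One global window per key; it never closes on its own
      .window(GlobalWindows.create())
      // Evaluate the window after every 2 elements for a key
      .trigger(CountTrigger.of(2))
      .reduce((x: ProductInfo, y: ProductInfo) => ProductInfo(x.month, "?", x.profit + y.profit))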

    reduced.print()
    env.execute("Global Window Example")
  }
}

 

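Explanation

  • With GlobalWindows, every element that shares a key goes into one never-ending window, so there is exactly one window per key.
  • The trigger decides when the window is evaluated. The default trigger of GlobalWindows is NeverTrigger, so nothing is ever computed unless you set a trigger yourself.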
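
CountTrigger only fires the window; it does not clear it, so the reduced value keeps growing across firings. If you want the sum of each pair instead of a running total, one option is to wrap the trigger in PurgingTrigger, which turns every fire into a fire-and-purge. The sketch below is a variant of the job above under a few assumptions of mine: the records are inlined with `fromElements` instead of read from the file, and the object name `PurgingGlobalWindowExample` and the helper `sumProfit` are made up for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object PurgingGlobalWindowExample {
  case class ProductInfo(month: String, product: String, profit: Int)

  // Same reduce logic as the main example, pulled out into a named function
  def sumProfit(x: ProductInfo, y: ProductInfo): ProductInfo =
    ProductInfo(x.month, "?", x.profit + y.profit)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumption: inline records stand in for product_info.txt
    val mapped: DataStream[ProductInfo] = env.fromElements(
      ProductInfo("June", "Bat", 12),
      ProductInfo("June", "Perfume", 10),
      ProductInfo("July", "Television", 50),
      ProductInfo("June", "Shirt", 38),
      ProductInfo("June", "Bat", 41)
    )

    mapped
      .keyBy(_.month)
      .window(GlobalWindows.create())
      // Fire after every 2 elements per key, then clear the window state
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))
      .reduce((x: ProductInfo, y: ProductInfo) => sumProfit(x, y))
      .print()

    env.execute("Purging Global Window Example")
  }
}
```

With the state purged after each firing, the second June result should be 79 (38 + 41) rather than 101. Which variant you want depends on whether the output is meant to be a running total or a per-batch total.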

 

Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/