GlobalWindows Example
flink 2021. 5. 17. 20:38
Problem
Read a text file line by line and, every time two new records arrive for a given month, compute and print the total (running) profit sum for that month.
Input (/home/windbird123/product_info.txt)
(month, product, profit)
June,Bat,12
June,Perfume,10
July,Television,50
June,Shirt,38
June,Bat,41
Expected Output
ProductInfo(June,?,22)
ProductInfo(June,?,101)
Code
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object GlobalWindowExample {
  case class ProductInfo(month: String, product: String, profit: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // each line looks like "June,Bat,12"
    val data = env.readTextFile("/home/windbird123/product_info.txt")
    val mapped: DataStream[ProductInfo] =
      data.map(_.split(",")).map(x => ProductInfo(x(0), x(1), x(2).toInt))

    // group by month (key selector; keyBy("month") field expressions are deprecated in newer Flink releases),
    // put all records of a month into one global window, and fire it every 2 new records
    val reduced: DataStream[ProductInfo] = mapped
      .keyBy(_.month)
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(2))
      .reduce((x: ProductInfo, y: ProductInfo) => ProductInfo(x.month, "?", x.profit + y.profit))

    reduced.print()
    env.execute("Global Window Example")
  }
}
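The `map` step above assumes every line has exactly three comma-separated fields and a numeric profit; `x(2).toInt` throws a `NumberFormatException` on anything else (for example a header line), which fails the job. Below is a minimal sketch of a more tolerant parser, assuming malformed lines should simply be dropped. `parseLine` and `ParseSketch` are hypothetical names for illustration, not part of Flink.

```scala
import scala.util.Try

object ParseSketch {
  // Same shape as the case class used in the Flink job.
  case class ProductInfo(month: String, product: String, profit: Int)

  // Hypothetical helper: returns None instead of throwing on a malformed line.
  def parseLine(line: String): Option[ProductInfo] =
    line.split(",").map(_.trim) match {
      case Array(month, product, profit) =>
        // a non-numeric profit (e.g. a header row) becomes None
        Try(profit.toInt).toOption.map(p => ProductInfo(month, product, p))
      case _ => None // wrong number of fields, including empty lines
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("June,Bat,12", "month,product,profit", "", "July,Television,50")
    val parsed = lines.flatMap(line => parseLine(line).toList)
    parsed.foreach(println)
  }
}
```

In the job, the two `map` calls could then be replaced by something like `data.flatMap(line => parseLine(line).toList)`, so bad lines are skipped instead of crashing the stream.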
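Explanation
- With GlobalWindows, all elements that share a key go into a single global window for that key (here, one window per month), and that window never closes on its own.
- The trigger decides when the window is evaluated. The default trigger of GlobalWindows never fires, so without a trigger nothing is computed.
- CountTrigger.of(2) fires each time 2 more records arrive for a key, but it does not purge the window, so every firing emits the sum of all records seen so far for that month. That is why the second result is 101 (12 + 10 + 38 + 41) rather than 79, and why July, which has only one record, produces no output.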
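To see where 22 and 101 come from, and why July never appears, here is a plain-Scala model of one keyed GlobalWindow with a count trigger. It is not Flink code: `GlobalWindowModel`, `simulate`, and its `purge` flag are hypothetical. The model assumes, as Flink's `CountTrigger` does, that the per-key count resets after each firing while the reduced value is kept, unless the trigger also purges the window (as `PurgingTrigger.of(CountTrigger.of(2))` would).

```scala
object GlobalWindowModel {
  case class ProductInfo(month: String, product: String, profit: Int)

  // Models keyBy(month) + GlobalWindows + a count trigger with the given threshold.
  // purge = false: like CountTrigger (FIRE, reduced state is kept)
  // purge = true:  like PurgingTrigger.of(CountTrigger) (FIRE_AND_PURGE)
  def simulate(input: Seq[ProductInfo], threshold: Int, purge: Boolean): List[ProductInfo] = {
    val state = scala.collection.mutable.Map.empty[String, ProductInfo] // reduced value per key
    val count = scala.collection.mutable.Map.empty[String, Int]         // trigger count per key
    val out = scala.collection.mutable.ArrayBuffer.empty[ProductInfo]

    for (e <- input) {
      // incremental reduce, same function as in the Flink job
      val reduced = state.get(e.month) match {
        case Some(acc) => ProductInfo(acc.month, "?", acc.profit + e.profit)
        case None      => e
      }
      state(e.month) = reduced

      val c = count.getOrElse(e.month, 0) + 1
      if (c >= threshold) {
        count(e.month) = 0 // the trigger resets its count after firing
        out += reduced     // window result is emitted
        if (purge) state -= e.month
      } else count(e.month) = c
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      ProductInfo("June", "Bat", 12),
      ProductInfo("June", "Perfume", 10),
      ProductInfo("July", "Television", 50),
      ProductInfo("June", "Shirt", 38),
      ProductInfo("June", "Bat", 41))
    println(simulate(input, 2, purge = false)) // List(ProductInfo(June,?,22), ProductInfo(June,?,101))
    println(simulate(input, 2, purge = true))  // List(ProductInfo(June,?,22), ProductInfo(June,?,79))
  }
}
```

If per-pair sums are wanted instead of running totals, the trigger in the real job could be wrapped in `PurgingTrigger.of(...)`; `keyBy(...).countWindow(2)` is, as far as I know, a shorthand for that same GlobalWindows plus purging count trigger combination.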
Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/