TumblingWindow Example
flink, 2021. 5. 17. 14:28
Problem
A random TimeInput(timestamp: Long, value: Int) arrives as input every second. Split the stream into 2-second windows and print the sum of value for each window.
Input (in code)
case class TimeInput(timestamp: Long, value: Int)

val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
context.collect(timeInput)

// Example
(1621227515090, 96)
-------------------
(1621227516114, 11)
(1621227517128, 86)
-------------------
(1621227518144, 54)
(1621227519153, 91)
-------------------
(1621227520168, 9)
(1621227521181, 78)
-------------------
(1621227522189, 40)
(1621227523197, 12)
-------------------
(1621227524212, 96)
(1621227525224, 18)
-------------------
(1621227526239, 99)
Expected Output
Because the input is random, the result can differ on every run. The output below corresponds to the example input above.
> SOURCE: (1621227515090, 96)
> SOURCE: (1621227516114, 11)
TimeInput(1621227515090,96)
> SOURCE: (1621227517128, 86)
> SOURCE: (1621227518144, 54)
TimeInput(1621227516114,97)
> SOURCE: (1621227519153, 91)
> SOURCE: (1621227520168, 9)
TimeInput(1621227518144,145)
> SOURCE: (1621227521181, 78)
> SOURCE: (1621227522189, 40)
TimeInput(1621227520168,87)
> SOURCE: (1621227523197, 12)
> SOURCE: (1621227524212, 96)
TimeInput(1621227522189,52)
> SOURCE: (1621227525224, 18)
> SOURCE: (1621227526239, 99)
TimeInput(1621227524212,114)
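To see why each result line appears only after the first element of the next window, here is a minimal plain-Scala sketch that needs no Flink dependency. It is a simplified model, not Flink's implementation. It assumes that elements arrive in timestamp order, that windows are aligned to multiples of the window size with no offset, and that a window is emitted as soon as an element belonging to a later window arrives. The names TumblingWindowSketch, windowStart, simulate and sample are made up for this illustration.

```scala
object TumblingWindowSketch {
  case class TimeInput(timestamp: Long, value: Int)

  val windowSizeMs: Long = 2000L

  // Start of the tumbling window that contains `ts`
  // (assumes no offset and non-negative timestamps).
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Simplified model: inputs are in timestamp order, and the open window is
  // emitted when an element of a later window arrives. The last window stays
  // open and is not emitted.
  def simulate(inputs: Seq[TimeInput]): Vector[TimeInput] = {
    var open: Option[(Long, TimeInput)] = None // (window start, running sum)
    val fired = Vector.newBuilder[TimeInput]
    for (in <- inputs) {
      val start = windowStart(in.timestamp)
      open match {
        case Some((s, acc)) if s == start =>
          // Same window: add the value, keep the first element's timestamp
          open = Some((s, TimeInput(acc.timestamp, acc.value + in.value)))
        case Some((_, acc)) =>
          // A later window has started: emit the previous one
          fired += acc
          open = Some((start, in))
        case None =>
          open = Some((start, in))
      }
    }
    fired.result()
  }

  // The example input from this post
  val sample: Seq[TimeInput] = Seq(
    TimeInput(1621227515090L, 96),
    TimeInput(1621227516114L, 11), TimeInput(1621227517128L, 86),
    TimeInput(1621227518144L, 54), TimeInput(1621227519153L, 91),
    TimeInput(1621227520168L, 9), TimeInput(1621227521181L, 78),
    TimeInput(1621227522189L, 40), TimeInput(1621227523197L, 12),
    TimeInput(1621227524212L, 96), TimeInput(1621227525224L, 18),
    TimeInput(1621227526239L, 99)
  )

  def main(args: Array[String]): Unit =
    simulate(sample).foreach(println)
}
```

With the example input, the sketch emits the same six sums as the expected output (96, 97, 145, 87, 52, 114). The final element (1621227526239, 99) stays in an open window, which is why no result for it appears in the log.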
Code
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Random

object TumblingWindowExample {

  case class TimeInput(timestamp: Long, value: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Generate a random TimeInput every second
    val data = env.addSource(new RichParallelSourceFunction[TimeInput] {
      // @volatile because cancel() is called from a different thread than run()
      @volatile var running: Boolean = true

      override def run(context: SourceFunction.SourceContext[TimeInput]): Unit =
        while (running) {
          val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
          context.collect(timeInput)
          println(s"> SOURCE: (${timeInput.timestamp}, ${timeInput.value})")
          Thread.sleep(1000L)
        }

      override def cancel(): Unit = running = false
    })

    val sum = data
      // Use the element's own timestamp as event time, with no allowed out-of-orderness
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TimeInput](Time.seconds(0L)) {
        override def extractTimestamp(element: TimeInput): Long = element.timestamp
      })
      // 2-second tumbling windows over the whole (non-keyed) stream
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(2L)))
      // Sum the values and keep the timestamp of the first element in the window
      .reduce((x, y) => TimeInput(x.timestamp, x.value + y.value))

    sum.print()
    env.execute("TumblingWindow Example")
  }
}
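As a side note, BoundedOutOfOrdernessTimestampExtractor has been deprecated since Flink 1.11 in favor of WatermarkStrategy, and setStreamTimeCharacteristic has been deprecated since Flink 1.12, where event time is the default. The following is a minimal sketch of the same timestamp assignment using the newer API. It assumes Flink 1.11 or later on the classpath. The object name WatermarkStrategySketch is hypothetical, and this is one possible replacement, not the code used for this post.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

object WatermarkStrategySketch {
  case class TimeInput(timestamp: Long, value: Int)

  // Zero allowed out-of-orderness, event time taken from the element itself
  val strategy: WatermarkStrategy[TimeInput] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[TimeInput](Duration.ZERO)
      .withTimestampAssigner(new SerializableTimestampAssigner[TimeInput] {
        override def extractTimestamp(element: TimeInput, recordTimestamp: Long): Long =
          element.timestamp
      })

  // Assumed wiring in a job like the one above (not run here):
  //   data.assignTimestampsAndWatermarks(strategy)
  //     .windowAll(TumblingEventTimeWindows.of(Time.seconds(2L)))
  //     .reduce((x, y) => TimeInput(x.timestamp, x.value + y.value))
}
```

The rest of the pipeline (windowAll, reduce, print) would stay the same under this assumption.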
Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/