TumblingWindow Example
wefree
2021. 5. 17. 14:28
Problem
A random TimeInput(timestamp: Long, value: Int) arrives once per second. Split the stream into 2-second event-time windows and print the sum of value for each window.
Input(in code)
case class TimeInput(timestamp: Long, value: Int)
val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
context.collect(timeInput)
// Example (dashed lines mark the 2-second window boundaries)
(1621227515090, 96)
-------------------
(1621227516114, 11)
(1621227517128, 86)
-------------------
(1621227518144, 54)
(1621227519153, 91)
-------------------
(1621227520168, 9)
(1621227521181, 78)
-------------------
(1621227522189, 40)
(1621227523197, 12)
-------------------
(1621227524212, 96)
(1621227525224, 18)
-------------------
(1621227526239, 99)
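
The window boundaries in the example input follow from simple modular arithmetic. Below is a minimal sketch, assuming epoch-aligned windows with no offset, which is how `TumblingEventTimeWindows.of(Time.seconds(2L))` behaves when no offset is given. `WindowAssignSketch` and `windowStart` are hypothetical names used only for illustration and are not part of the Flink API.

```scala
object WindowAssignSketch {
  // Start of the tumbling window that contains `timestamp`.
  // Hypothetical helper: assumes epoch-aligned windows (offset 0)
  // and non-negative timestamps.
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - (timestamp % sizeMs)

  def main(args: Array[String]): Unit = {
    val sizeMs = 2000L
    // First four timestamps from the example input
    val timestamps = Seq(1621227515090L, 1621227516114L, 1621227517128L, 1621227518144L)
    timestamps.foreach { ts =>
      val start = windowStart(ts, sizeMs)
      println(s"$ts -> [$start, ${start + sizeMs})")
    }
  }
}
```

The first element lands alone in [1621227514000, 1621227516000), because the source happened to start in the second half of that window. This is why the first group in the example holds a single record.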
Expected Output
Because the input is random, the output differs on every run.
> SOURCE: (1621227515090, 96)
> SOURCE: (1621227516114, 11)
TimeInput(1621227515090,96)
> SOURCE: (1621227517128, 86)
> SOURCE: (1621227518144, 54)
TimeInput(1621227516114,97)
> SOURCE: (1621227519153, 91)
> SOURCE: (1621227520168, 9)
TimeInput(1621227518144,145)
> SOURCE: (1621227521181, 78)
> SOURCE: (1621227522189, 40)
TimeInput(1621227520168,87)
> SOURCE: (1621227523197, 12)
> SOURCE: (1621227524212, 96)
TimeInput(1621227522189,52)
> SOURCE: (1621227525224, 18)
> SOURCE: (1621227526239, 99)
TimeInput(1621227524212,114)
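
The interleaving above (a window result printed only after the first element of the next window arrives) can be reproduced without Flink. This is a rough sketch under a simplified model: the watermark is assumed to equal the largest timestamp seen so far, and a window [start, start + size) fires once the watermark reaches start + size - 1. In the real job watermarks are emitted periodically, so exact timing can differ. `TumblingSumSketch` and `simulate` are hypothetical names for illustration.

```scala
import scala.collection.mutable

object TumblingSumSketch {
  case class TimeInput(timestamp: Long, value: Int)

  // Simplified, Flink-free model of a tumbling event-time window with a reduce.
  // Assumption: watermark == max timestamp seen; late elements are not handled.
  def simulate(inputs: Seq[TimeInput], sizeMs: Long): Seq[TimeInput] = {
    val open = mutable.LinkedHashMap.empty[Long, TimeInput] // window start -> running result
    val fired = mutable.ArrayBuffer.empty[TimeInput]
    var watermark = Long.MinValue
    for (in <- inputs) {
      val start = in.timestamp - (in.timestamp % sizeMs)
      // Same reduce as the job: keep the first timestamp, add the values.
      open(start) = open.get(start) match {
        case Some(acc) => TimeInput(acc.timestamp, acc.value + in.value)
        case None      => in
      }
      watermark = math.max(watermark, in.timestamp)
      // Fire every open window whose last millisecond the watermark has reached.
      val ready = open.keys.filter(s => s + sizeMs - 1 <= watermark).toList.sorted
      ready.foreach(s => fired += open.remove(s).get)
    }
    fired.toSeq
  }

  def main(args: Array[String]): Unit = {
    // First six records from the expected output
    val inputs = Seq(
      TimeInput(1621227515090L, 96), TimeInput(1621227516114L, 11),
      TimeInput(1621227517128L, 86), TimeInput(1621227518144L, 54),
      TimeInput(1621227519153L, 91), TimeInput(1621227520168L, 9)
    )
    simulate(inputs, 2000L).foreach(println)
  }
}
```

With these six records the sketch emits the same first three results as the expected output: sums of 96, 97, and 145. The window that starts at 1621227520000 stays open, since no later element has advanced the watermark past its end.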
Code
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random
object TumblingWindowExample {

  case class TimeInput(timestamp: Long, value: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Generate a random TimeInput every second
    val data = env.addSource(new RichParallelSourceFunction[TimeInput] {
      // cancel() may be called from a different thread than run()
      @volatile var running: Boolean = true

      override def run(context: SourceFunction.SourceContext[TimeInput]): Unit = while (running) {
        val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
        context.collect(timeInput)
        println(s"> SOURCE: (${timeInput.timestamp}, ${timeInput.value})")
        Thread.sleep(1000L)
      }

      override def cancel(): Unit = running = false
    })

    val sum =
      data
        // Use TimeInput.timestamp as the event time, with no allowed out-of-orderness
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TimeInput](Time.seconds(0L)) {
          override def extractTimestamp(element: TimeInput): Long = element.timestamp
        })
        // Non-keyed 2-second tumbling windows
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(2L)))
        // Keep the first timestamp in the window and add up the values
        .reduce((x, y) => TimeInput(x.timestamp, x.value + y.value))

    sum.print()
    env.execute("TumblingWindow Example")
  }
}
Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/