flink

TumblingWindow 예제

wefree 2021. 5. 17. 14:28

문제

입력으로 1초마다 random TimeInput(timestamp: Long, value: Int)이 들어올 때, 이를 timestamp(event time) 기준 2초 단위 tumbling window로 나누어 각 window의 value 합계(sum)를 출력한다.

 

Input(in code)

// Record emitted by the source: `timestamp` is taken from System.currentTimeMillis()
// (epoch milliseconds) and `value` is a random integer.
case class TimeInput(timestamp: Long, value: Int)

// Excerpt from the source's run(): build one record stamped with the current wall-clock
// time and a random value in [0, 100), then emit it downstream.
// `context` is the SourceFunction.SourceContext passed to run() (not shown in this excerpt).
val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
context.collect(timeInput)

// 예시
(1621227515090, 96)
-------------------
(1621227516114, 11)
(1621227517128, 86)
-------------------
(1621227518144, 54)
(1621227519153, 91)
-------------------
(1621227520168, 9)
(1621227521181, 78)
-------------------
(1621227522189, 40)
(1621227523197, 12)
-------------------
(1621227524212, 96)
(1621227525224, 18)
-------------------
(1621227526239, 99)

 

Expected Output

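input이 random하게 생성되므로 실행할 때마다 결과가 달라질 수 있다. 각 window의 결과는 다음 window에 속하는 input이 들어와 watermark가 window의 끝을 넘어선 뒤에야 출력되며, 출력되는 timestamp는 해당 window의 첫 번째 input의 timestamp이다.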

> SOURCE: (1621227515090, 96)
> SOURCE: (1621227516114, 11)
TimeInput(1621227515090,96)
> SOURCE: (1621227517128, 86)
> SOURCE: (1621227518144, 54)
TimeInput(1621227516114,97)
> SOURCE: (1621227519153, 91)
> SOURCE: (1621227520168, 9)
TimeInput(1621227518144,145)
> SOURCE: (1621227521181, 78)
> SOURCE: (1621227522189, 40)
TimeInput(1621227520168,87)
> SOURCE: (1621227523197, 12)
> SOURCE: (1621227524212, 96)
TimeInput(1621227522189,52)
> SOURCE: (1621227525224, 18)
> SOURCE: (1621227526239, 99)
TimeInput(1621227524212,114)

 

 

Code

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Random

/** Sums `value` over 2-second tumbling event-time windows.
  *
  * A source emits one [[TumblingWindowExample.TimeInput]] per second, stamped with the
  * current wall-clock time. Each window's records are reduced into a single record that
  * keeps the first record's timestamp and carries the sum of all values in the window.
  */
object TumblingWindowExample {

  /** One input record.
    *
    * @param timestamp event time in epoch milliseconds (taken from System.currentTimeMillis())
    * @param value     value summed within a window
    */
  final case class TimeInput(timestamp: Long, value: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallel instance: the source is a RichParallelSourceFunction, so a higher
    // parallelism would start one independent generator per instance.
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Emit one random TimeInput per second until the job is cancelled.
    val data = env.addSource(new RichParallelSourceFunction[TimeInput] {
      // cancel() is called from a different thread than run(); @volatile guarantees that
      // run() observes the write and leaves its loop (a plain var may never be re-read).
      @volatile private var running: Boolean = true

      override def run(context: SourceFunction.SourceContext[TimeInput]): Unit =
        while (running) {
          val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
          context.collect(timeInput)
          println(s"> SOURCE: (${timeInput.timestamp}, ${timeInput.value})")
          Thread.sleep(1000L)
        }

      override def cancel(): Unit = running = false
    })

    val sum =
      data
        // Event time = record timestamp; zero allowed out-of-orderness, so the watermark
        // trails the largest timestamp seen. A window therefore fires only after a record
        // from a later window has pushed the watermark past the window's end.
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TimeInput](Time.seconds(0L)) {
          override def extractTimestamp(element: TimeInput): Long = element.timestamp
        })
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(2L)))
        // Keep the accumulator's (first record's) timestamp and add up the values.
        .reduce((acc, next) => acc.copy(value = acc.value + next.value))

    sum.print()

    env.execute("TumblingWindow Example")
  }
}

 

참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/