  • TumblingWindow Example
    flink 2021. 5. 17. 14:28

    Problem

    The input emits one random TimeInput(timestamp: Long, value: Int) per second. Split the stream into 2-second tumbling windows and print the sum of value for each window.

     

    Input (in code)

    case class TimeInput(timestamp: Long, value: Int)
    
    val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
    context.collect(timeInput)
    
    // Example (dashed lines mark the 2-second window boundaries)
    (1621227515090, 96)
    -------------------
    (1621227516114, 11)
    (1621227517128, 86)
    -------------------
    (1621227518144, 54)
    (1621227519153, 91)
    -------------------
    (1621227520168, 9)
    (1621227521181, 78)
    -------------------
    (1621227522189, 40)
    (1621227523197, 12)
    -------------------
    (1621227524212, 96)
    (1621227525224, 18)
    -------------------
    (1621227526239, 99)
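
    The boundaries in the example follow from how a tumbling window with no offset is aligned: each window starts at a multiple of the window size counted from the epoch. Below is a minimal sketch of that arithmetic in plain Scala, without Flink. The names WindowAssignment and windowStart are made up for illustration and are not part of the Flink API.

```scala
object WindowAssignment {
  // Window length in milliseconds (2 seconds, as in the problem statement).
  val WindowSizeMs: Long = 2000L

  // Start of the tumbling window that contains `timestamp`,
  // assuming a window offset of 0 and a non-negative timestamp.
  def windowStart(timestamp: Long, size: Long = WindowSizeMs): Long =
    timestamp - (timestamp % size)

  def main(args: Array[String]): Unit = {
    // The first four timestamps from the example input.
    val timestamps = Seq(1621227515090L, 1621227516114L, 1621227517128L, 1621227518144L)
    timestamps.foreach { ts =>
      val start = windowStart(ts)
      // A window covers the half-open interval [start, start + size).
      println(s"$ts -> [$start, ${start + WindowSizeMs})")
    }
  }
}
```

    The first element falls into a window of its own because 1621227515090 lies just before the boundary at 1621227516000, while the next two elements share the window that starts there.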

     

    Expected Output

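    Because the input is random, the result differs on every run. Each result line carries the timestamp of the first element in its window and the sum of that window's values.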

    > SOURCE: (1621227515090, 96)
    > SOURCE: (1621227516114, 11)
    TimeInput(1621227515090,96)
    > SOURCE: (1621227517128, 86)
    > SOURCE: (1621227518144, 54)
    TimeInput(1621227516114,97)
    > SOURCE: (1621227519153, 91)
    > SOURCE: (1621227520168, 9)
    TimeInput(1621227518144,145)
    > SOURCE: (1621227521181, 78)
    > SOURCE: (1621227522189, 40)
    TimeInput(1621227520168,87)
    > SOURCE: (1621227523197, 12)
    > SOURCE: (1621227524212, 96)
    TimeInput(1621227522189,52)
    > SOURCE: (1621227525224, 18)
    > SOURCE: (1621227526239, 99)
    TimeInput(1621227524212,114)
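
    The order of the lines above can be traced by hand. The following is a rough simulation in plain Scala, not Flink code. It assumes that the input arrives in timestamp order, that the watermark equals the latest timestamp seen (zero out-of-orderness), and that a watermark is produced right after every element. Real Flink emits watermarks periodically, so this is a simplification. The names TumblingSumSimulation, simulate and sample are hypothetical.

```scala
import scala.collection.immutable.SortedMap

object TumblingSumSimulation {
  case class TimeInput(timestamp: Long, value: Int)

  val WindowSizeMs: Long = 2000L

  // The first six inputs from the example.
  val sample: Seq[TimeInput] = Seq(
    TimeInput(1621227515090L, 96),
    TimeInput(1621227516114L, 11),
    TimeInput(1621227517128L, 86),
    TimeInput(1621227518144L, 54),
    TimeInput(1621227519153L, 91),
    TimeInput(1621227520168L, 9)
  )

  // Start of the window containing `ts` (offset 0).
  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  // Returns the lines the job would print, in order (simplified model).
  def simulate(inputs: Seq[TimeInput]): Vector[String] = {
    var open = SortedMap.empty[Long, TimeInput] // window start -> running reduce result
    var out = Vector.empty[String]
    for (input <- inputs) {
      out :+= s"> SOURCE: (${input.timestamp}, ${input.value})"
      val start = windowStart(input.timestamp)
      // Same reduce as the job: keep the first timestamp, add the values.
      val merged = open.get(start) match {
        case Some(acc) => TimeInput(acc.timestamp, acc.value + input.value)
        case None      => input
      }
      open = open.updated(start, merged)
      // Assumption: the watermark follows the latest timestamp.
      val watermark = input.timestamp
      // A window fires once the watermark reaches its last millisecond (end - 1).
      val (fired, pending) =
        open.partition { case (s, _) => s + WindowSizeMs - 1 <= watermark }
      fired.values.foreach(result => out :+= result.toString)
      open = pending
    }
    out
  }

  def main(args: Array[String]): Unit = simulate(sample).foreach(println)
}
```

    Under these assumptions a window's result appears only after the first element of a later window has arrived, which is why every TimeInput line follows the SOURCE line that crosses the next boundary.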

     

     

    Code

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    
    import scala.util.Random
    
    object TumblingWindowExample {
      case class TimeInput(timestamp: Long, value: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // Needed before Flink 1.12; from 1.12 on, event time is the default and this call is deprecated
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        // Emit one random TimeInput per second
        val data = env.addSource(new RichParallelSourceFunction[TimeInput] {
          // @volatile: cancel() is called from a different thread than run()
          @volatile var running: Boolean = true
    
          override def run(context: SourceFunction.SourceContext[TimeInput]): Unit = while (running) {
            val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100))
            context.collect(timeInput)
            println(s"> SOURCE: (${timeInput.timestamp}, ${timeInput.value})")
            Thread.sleep(1000L)
          }
    
          override def cancel(): Unit = running = false
        })
    
        val sum =
          data
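            // Use each element's own timestamp as event time; tolerate no out-of-orderness
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TimeInput](Time.seconds(0L)) {
              override def extractTimestamp(element: TimeInput): Long = element.timestamp
            })
            // Non-keyed 2-second tumbling windows over event time
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(2L)))
            // Sum the values; the result keeps the timestamp of the window's first element
            .reduce((x, y) => TimeInput(x.timestamp, x.value + y.value))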
    
        sum.print()
    
        env.execute("TumblingWindow Example")
      }
    }
    

     

    Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
