ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Iterate 예제
    flink 2021. 5. 17. 11:08

    문제

    입력으로 0, 1, 2 가 있을 때, 각각의 숫자를 1씩 증가시키면서 3까지 도달하도록 한다.

    모든 숫자가 3에 도달하면 종료되며, 각각의 숫자에 대한 상태를 출력한다.

     

    Input(in code)

    /**
     * initial: 최초 설정 값
     * current: (증가된) 현재 값
     * increased: 증가된 정도
     * current = initial + increased
     */
    case class MyNumber(initial: Long, current: Long, increased: Int)
    
    env.generateSequence(0, 2).map(x => MyNumber(x, x, 0))

     

    Expected Output

    current 값이 3 에 도달했을 때, MyNumber 출력

    MyNumber(2,3,1)
    MyNumber(1,3,2)
    MyNumber(0,3,3)

     

    Code

    import org.apache.flink.streaming.api.scala._
    
    object IterateExample {
      case class MyNumber(initial: Long, current: Long, increased: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val data: DataStream[MyNumber] = env.generateSequence(0, 2).map(x => MyNumber(x, x, 0))
    
        // 1 이 증가된 notYet 이 feedback input 으로 orgStream 에 재입력 된다.
        // current 값이 3 일 경우 최종 output 으로 분류되어 전달된다.
        def stepFunction(orgStream: DataStream[MyNumber]): (DataStream[MyNumber], DataStream[MyNumber]) = {
          val arrived = orgStream.filter(_.current == 3)
          val notYet  = orgStream.filter(_.current <= 3).map(x => MyNumber(x.initial, x.current + 1, x.increased + 1))
          (notYet, arrived)
        }
    
        // 3초 동안 feedback input 이 없으면 stream 을 종료한다.
        val iteration: DataStream[MyNumber] = data.iterate(stepFunction _, 3000L)
    
        iteration.print()
        env.execute("Iterate Example")
      }
    }

     

    참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/

    'flink' 카테고리의 다른 글

    GlobalWindows 예제  (0) 2021.05.17
    TumblingWindow 예제  (0) 2021.05.17
    Stream Split 예제  (0) 2021.05.17
    Stream Reduce, Min, MinBy, Max, MaxBy 예제  (0) 2021.05.16
    Stream WordCount 예제  (0) 2021.05.15

    댓글

Designed by Tistory.