-
Iterate 예제flink 2021. 5. 17. 11:08
문제
입력으로 0, 1, 2 가 있을 때, 각각의 숫자를 1씩 증가시키면서 3까지 도달하도록 한다.
모든 숫자가 3에 도달하면 종료되며, 각각의 숫자에 대한 상태를 출력한다.
Input(in code)
/** * initial: 최초 설정 값 * current: (증가된) 현재 값 * increased: 증가된 정도 * current = initial + increased */ case class MyNumber(initial: Long, current: Long, increased: Int) env.generateSequence(0, 2).map(x => MyNumber(x, x, 0))
Expected Output
current 값이 3 에 도달했을 때, MyNumber 출력
MyNumber(2,3,1) MyNumber(1,3,2) MyNumber(0,3,3)
Code
import org.apache.flink.streaming.api.scala._ object IterateExample { case class MyNumber(initial: Long, current: Long, increased: Int) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val data: DataStream[MyNumber] = env.generateSequence(0, 2).map(x => MyNumber(x, x, 0)) // 1 이 증가된 notYet 이 feedback input 으로 orgStream 에 재입력 된다. // current 값이 3 일 경우 최종 output 으로 분류되어 전달된다. def stepFunction(orgStream: DataStream[MyNumber]): (DataStream[MyNumber], DataStream[MyNumber]) = { val arrived = orgStream.filter(_.current == 3) val notYet = orgStream.filter(_.current <= 3).map(x => MyNumber(x.initial, x.current + 1, x.increased + 1)) (notYet, arrived) } // 3초 동안 feedback input 이 없으면 stream 을 종료한다. val iteration: DataStream[MyNumber] = data.iterate(stepFunction _, 3000L) iteration.print() env.execute("Iterate Example") } }
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
GlobalWindows 예제 (0) 2021.05.17 TumblingWindow 예제 (0) 2021.05.17 Stream Split 예제 (0) 2021.05.17 Stream Reduce, Min, MinBy, Max, MaxBy 예제 (0) 2021.05.16 Stream WordCount 예제 (0) 2021.05.15