-
ValueState 예제flink 2021. 5. 19. 21:53
문제
입력으로 MyInput(key, value) 가 들어올 때, 각각의 key 값으로 groupBy 해 각각의 id 에 대한 짝수번째 value 값을 출력한다.
Input(in code)
case class MyInput(key: String, value: Int) val data = env.fromElements( MyInput("a", 1), MyInput("b", 2), MyInput("a", 3), MyInput("a", 4), MyInput("b", 5), MyInput("a", 6), MyInput("b", 7) )
Expected Output
3 5 6
Code
import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object ValueStateExample { case class MyInput(key: String, value: Int) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val data = env.fromElements( MyInput("a", 1), MyInput("b", 2), MyInput("a", 3), MyInput("a", 4), MyInput("b", 5), MyInput("a", 6), MyInput("b", 7) ) val nth = data .keyBy(_.key) .flatMap(new StateNthFunction(2)) nth.print() env.execute("Value State Example") } case class MyState(value: Int) class StateNthFunction(perN: Int) extends RichFlatMapFunction[MyInput, Int] { private var counter: ValueState[MyState] = _ override def open(parameters: Configuration): Unit = { val counterDescriptor = new ValueStateDescriptor("counter", classOf[MyState]) counter = getRuntimeContext.getState[MyState](counterDescriptor) } override def flatMap(timeInput: MyInput, collector: Collector[Int]): Unit = { val currentCount = { val now = counter.value() if (now == null) MyState(1) else now } if (currentCount.value % perN == 0) { collector.collect(timeInput.value) } counter.update(MyState(currentCount.value + 1)) } } }
설명
- 키 상태로는 ValueState[T] 이외에 ListState[T], MapState[K, V], ReducingState[T], AggregatingState[I, O] 등이 있다.
- 키 상태는 KeyedStream 에 적용하는 함수에서만 사용할 수 있다. (DataStream.keyBy() 호출이 된 상태여야함)
'flink' 카테고리의 다른 글
BroadcastState 예제 (0) 2021.05.20 CheckPoint 예제 (0) 2021.05.19 GlobalWindows 예제 (0) 2021.05.17 TumblingWindow 예제 (0) 2021.05.17 Iterate 예제 (0) 2021.05.17