flink

ValueState 예제

wefree 2021. 5. 19. 21:53

문제

입력으로 MyInput(key, value) 가 들어올 때, 각각의 key 값으로 groupBy 해 각각의 id 에 대한 짝수번째 value 값을 출력한다.

 

Input(in code)

case class MyInput(key: String, value: Int)

val data = env.fromElements(
      MyInput("a", 1),
      MyInput("b", 2),
      MyInput("a", 3),
      MyInput("a", 4),
      MyInput("b", 5),
      MyInput("a", 6),
      MyInput("b", 7)
    )

 

Expected Output

3
5
6

 

Code

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ValueStateExample {
  case class MyInput(key: String, value: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val data = env.fromElements(
      MyInput("a", 1),
      MyInput("b", 2),
      MyInput("a", 3),
      MyInput("a", 4),
      MyInput("b", 5),
      MyInput("a", 6),
      MyInput("b", 7)
    )

    val nth = data
      .keyBy(_.key)
      .flatMap(new StateNthFunction(2))

    nth.print()

    env.execute("Value State Example")
  }

  case class MyState(value: Int)

  class StateNthFunction(perN: Int) extends RichFlatMapFunction[MyInput, Int] {
    private var counter: ValueState[MyState] = _

    override def open(parameters: Configuration): Unit = {
      val counterDescriptor = new ValueStateDescriptor("counter", classOf[MyState])
      counter = getRuntimeContext.getState[MyState](counterDescriptor)
    }

    override def flatMap(timeInput: MyInput, collector: Collector[Int]): Unit = {
      val currentCount = {
        val now = counter.value()
        if (now == null) MyState(1) else now
      }

      if (currentCount.value % perN == 0) {
        collector.collect(timeInput.value)
      }

      counter.update(MyState(currentCount.value + 1))
    }
  }
}

 

설명

  • 키 상태로는 ValueState[T] 이외에 ListState[T], MapState[K, V], ReducingState[T], AggregatingState[I, O] 등이 있다.
  • 키 상태는 KeyedStream 에 적용하는 함수에서만 사용할 수 있다. (DataStream.keyBy() 호출이 된 상태여야함)