ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ValueState 예제
    flink 2021. 5. 19. 21:53

    문제

    입력으로 MyInput(key, value) 가 들어올 때, 각각의 key 값으로 groupBy 해 각각의 id 에 대한 짝수번째 value 값을 출력한다.

     

    Input(in code)

    case class MyInput(key: String, value: Int)
    
    val data = env.fromElements(
          MyInput("a", 1),
          MyInput("b", 2),
          MyInput("a", 3),
          MyInput("a", 4),
          MyInput("b", 5),
          MyInput("a", 6),
          MyInput("b", 7)
        )

     

    Expected Output

    3
    5
    6

     

    Code

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    object ValueStateExample {
      case class MyInput(key: String, value: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val data = env.fromElements(
          MyInput("a", 1),
          MyInput("b", 2),
          MyInput("a", 3),
          MyInput("a", 4),
          MyInput("b", 5),
          MyInput("a", 6),
          MyInput("b", 7)
        )
    
        val nth = data
          .keyBy(_.key)
          .flatMap(new StateNthFunction(2))
    
        nth.print()
    
        env.execute("Value State Example")
      }
    
      case class MyState(value: Int)
    
      class StateNthFunction(perN: Int) extends RichFlatMapFunction[MyInput, Int] {
        private var counter: ValueState[MyState] = _
    
        override def open(parameters: Configuration): Unit = {
          val counterDescriptor = new ValueStateDescriptor("counter", classOf[MyState])
          counter = getRuntimeContext.getState[MyState](counterDescriptor)
        }
    
        override def flatMap(timeInput: MyInput, collector: Collector[Int]): Unit = {
          val currentCount = {
            val now = counter.value()
            if (now == null) MyState(1) else now
          }
    
          if (currentCount.value % perN == 0) {
            collector.collect(timeInput.value)
          }
    
          counter.update(MyState(currentCount.value + 1))
        }
      }
    }
    

     

    설명

    • 키 상태로는 ValueState[T] 이외에 ListState[T], MapState[K, V], ReducingState[T], AggregatingState[I, O] 등이 있다.
    • 키 상태는 KeyedStream 에 적용하는 함수에서만 사용할 수 있다. (DataStream.keyBy() 호출이 된 상태여야함)

    'flink' 카테고리의 다른 글

    BroadcastState 예제  (0) 2021.05.20
    CheckPoint 예제  (0) 2021.05.19
    GlobalWindows 예제  (0) 2021.05.17
    TumblingWindow 예제  (0) 2021.05.17
    Iterate 예제  (0) 2021.05.17

    댓글

Designed by Tistory.