flink
ValueState 예제
wefree
2021. 5. 19. 21:53
문제
입력으로 MyInput(key, value) 가 들어올 때, 각각의 key 값으로 groupBy 해 각각의 id 에 대한 짝수번째 value 값을 출력한다.
Input(in code)
case class MyInput(key: String, value: Int)
val data = env.fromElements(
MyInput("a", 1),
MyInput("b", 2),
MyInput("a", 3),
MyInput("a", 4),
MyInput("b", 5),
MyInput("a", 6),
MyInput("b", 7)
)
Expected Output
3
5
6
Code
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object ValueStateExample {
case class MyInput(key: String, value: Int)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val data = env.fromElements(
MyInput("a", 1),
MyInput("b", 2),
MyInput("a", 3),
MyInput("a", 4),
MyInput("b", 5),
MyInput("a", 6),
MyInput("b", 7)
)
val nth = data
.keyBy(_.key)
.flatMap(new StateNthFunction(2))
nth.print()
env.execute("Value State Example")
}
case class MyState(value: Int)
class StateNthFunction(perN: Int) extends RichFlatMapFunction[MyInput, Int] {
private var counter: ValueState[MyState] = _
override def open(parameters: Configuration): Unit = {
val counterDescriptor = new ValueStateDescriptor("counter", classOf[MyState])
counter = getRuntimeContext.getState[MyState](counterDescriptor)
}
override def flatMap(timeInput: MyInput, collector: Collector[Int]): Unit = {
val currentCount = {
val now = counter.value()
if (now == null) MyState(1) else now
}
if (currentCount.value % perN == 0) {
collector.collect(timeInput.value)
}
counter.update(MyState(currentCount.value + 1))
}
}
}
설명
- 키 상태로는 ValueState[T] 이외에 ListState[T], MapState[K, V], ReducingState[T], AggregatingState[I, O] 등이 있다.
- 키 상태는 KeyedStream 에 적용하는 함수에서만 사용할 수 있다. (DataStream.keyBy() 호출이 된 상태여야함)