-
BroadcastState 예제 flink 2021. 5. 20. 01:37
문제
입력으로 직원 Employee(id: Int, dept: String) 목록과 퇴직자 RetireId(id: Int) 목록이 있을 때,
직원 목록에서 퇴직자 목록을 제외한 후 부서별(dept)로 남게 되는 인원 정보 EmployeeLeft(dept: String, count: Int) 를 출력한다. RetireId 목록을 broadcast 해 구현한다.
Input(in code)
- 직원 Employee
Employee(1, "dept1") Employee(2, "dept2") Employee(3, "dept3") Employee(4, "dept1") Employee(5, "dept2") Employee(6, "dept3") Employee(7, "dept4")
- 퇴직자 RetireId
RetireId(2) RetireId(6)
Expected Output
EmployeeLeft(dept1,1)
Employee(2,dept2) is retired
EmployeeLeft(dept3,1)
EmployeeLeft(dept1,2)
EmployeeLeft(dept2,1)
Employee(6,dept3) is retired
EmployeeLeft(dept4,1)
Code
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/** Broadcast-state example: counts, per department, the employees that remain
  * after filtering out the ids received on a broadcast stream of retirees.
  *
  * Employees are keyed by `dept`; the retiree ids are broadcast to the keyed
  * operator, which keeps a per-department counter in keyed state.
  */
object BroadcastStateExample {

  /** An employee with its id and the department it belongs to. */
  case class Employee(id: Int, dept: String)

  /** Id of an employee that has retired. */
  case class RetireId(id: Int)

  /** Number of non-retired employees seen so far for a department. */
  case class EmployeeLeft(dept: String, count: Int)

  // Descriptor of the broadcast state. RetireEmployeeProcess.processBroadcastElement
  // stores the retiree id as the map key; the value is a meaningless empty string,
  // so the map is effectively used as a set of ids.
  val retireIdsDescriptor: MapStateDescriptor[Int, String] =
    new MapStateDescriptor[Int, String]("retire_ids", classOf[Int], classOf[String])

  /** Builds and runs the job: employees keyed by department, connected with the
    * broadcast retiree ids, and the resulting per-department counts printed.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val employees: DataStream[Employee] = env.addSource(new RichParallelSourceFunction[Employee] {
      // Set to false by cancel(); run() emits a fixed list and returns without polling it.
      var running: Boolean = true

      override def run(ctx: SourceFunction.SourceContext[Employee]): Unit = {
        // Sleep so that the retireIds are broadcast before the employees are processed.
        // This is timing-based coordination for the demo, not an ordering guarantee.
        Thread.sleep(1000L)
        ctx.collect(Employee(1, "dept1"))
        ctx.collect(Employee(2, "dept2"))
        ctx.collect(Employee(3, "dept3"))
        ctx.collect(Employee(4, "dept1"))
        ctx.collect(Employee(5, "dept2"))
        ctx.collect(Employee(6, "dept3"))
        ctx.collect(Employee(7, "dept4"))
      }

      override def cancel(): Unit = running = false
    })

    val retireIds: DataStream[RetireId] = env.fromElements(
      RetireId(2),
      RetireId(6)
    )

    val retireIdsBroadcast: BroadcastStream[RetireId] = retireIds.broadcast(retireIdsDescriptor)

    // Number of employees remaining per department (dept).
    // Key with a selector function rather than the field name "dept": the
    // field-name overload is deprecated and yields a Tuple key, whereas
    // RetireEmployeeProcess declares its key type as String.
    val employeeLeft: DataStream[EmployeeLeft] = employees
      .keyBy(_.dept)
      .connect(retireIdsBroadcast) // will return a BroadcastConnectedStream
      .process(new RetireEmployeeProcess)

    employeeLeft.print()

    env.execute("BroadcastState Example")
  }

  /** Counts non-retired employees per department.
    *
    * The stream is keyed by `dept`, hence the first type parameter of
    * KeyedBroadcastProcessFunction is String.
    */
  class RetireEmployeeProcess
      extends KeyedBroadcastProcessFunction[String, Employee, RetireId, EmployeeLeft] {

    // Per-department count of remaining employees, kept as keyed state
    // (reads as 0 before the first update).
    private var counter: ValueState[Int] = _

    /** Registers the keyed "counter" state when the operator is opened. */
    override def open(parameters: Configuration): Unit = {
      val counterDescriptor = new ValueStateDescriptor("counter", classOf[Int])
      counter = getRuntimeContext.getState[Int](counterDescriptor)
    }

    /** Handles one employee: emits the updated department count unless the
      * employee's id is present in the broadcast state of retirees.
      *
      * @param employee the incoming employee record
      * @param ctx      read-only context giving access to the broadcast state
      * @param out      collector for the updated EmployeeLeft records
      */
    override def processElement(
        employee: Employee,
        ctx: KeyedBroadcastProcessFunction[String, Employee, RetireId, EmployeeLeft]#ReadOnlyContext,
        out: Collector[EmployeeLeft]
    ): Unit = {
      // Debug aid: println(ctx.getBroadcastState(retireIdsDescriptor).immutableEntries())

      // Check whether the current employee's id is in the retiree set.
      val isRetireEmployee = ctx.getBroadcastState(retireIdsDescriptor).contains(employee.id)

      if (!isRetireEmployee) {
        // Not retired: increment the remaining-employee count and emit the new
        // value directly instead of reading the state back.
        val updatedCount = counter.value() + 1
        counter.update(updatedCount)
        out.collect(EmployeeLeft(employee.dept, updatedCount))
      } else {
        println(s"$employee is retired")
      }
    }

    /** Handles one broadcast retiree id by adding it to the broadcast state.
      *
      * @param retireId the broadcast retiree id
      * @param ctx      context giving write access to the broadcast state
      * @param out      collector (unused; broadcast elements produce no output)
      */
    override def processBroadcastElement(
        retireId: RetireId,
        ctx: KeyedBroadcastProcessFunction[String, Employee, RetireId, EmployeeLeft]#Context,
        out: Collector[EmployeeLeft]
    ): Unit = {
      val id: Int = retireId.id
      ctx.getBroadcastState(retireIdsDescriptor).put(id, "")
    }
  }
}
설명
퇴직자 정보가 broadcast 되기 전에 처리할 직원 데이터가 먼저 들어올 수도 있으므로, sleep 을 주어 직원 데이터가 나중에 유입되도록 조정했다. 다만 sleep 은 타이밍에 의존하는 방법일 뿐 두 스트림 간의 처리 순서를 보장하지는 않으므로, 예제 용도로만 사용한다.
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
CheckPoint 예제 (0) 2021.05.19 ValueState 예제 (0) 2021.05.19 GlobalWindows 예제 (0) 2021.05.17 TumblingWindow 예제 (0) 2021.05.17 Iterate 예제 (0) 2021.05.17 - 직원 Employee