ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • BroadcastState 예제
    flink 2021. 5. 20. 01:37

    문제

    입력으로 직원 Employee(id: Int, dept: String) 목록과 퇴직자 RetireId(id: Int) 목록이 있을 때,

    직원 목록에서 퇴직자를 제외한 후, 부서(dept)별로 남게 되는 인원 정보 EmployeeLeft(dept: String, count: Int)를 출력한다. 퇴직자(RetireId) 목록은 broadcast 하여 구현한다.

     

    Input(in code)

    • 직원 Employee
      Employee(1, "dept1")
      Employee(2, "dept2")
      Employee(3, "dept3")
      Employee(4, "dept1")
      Employee(5, "dept2")
      Employee(6, "dept3")
      Employee(7, "dept4")
    • 퇴직자 RetireId
      RetireId(2)
      RetireId(6)

    Expected Output

    EmployeeLeft(dept1,1)
    Employee(2,dept2) is retired
    EmployeeLeft(dept3,1)
    EmployeeLeft(dept1,2)
    EmployeeLeft(dept2,1)
    Employee(6,dept3) is retired
    EmployeeLeft(dept4,1)

     

    Code

    import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.datastream.BroadcastStream
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    /**
     * Flink BroadcastState example.
     *
     * Employees are keyed by department, and the list of retired employee ids is broadcast to every
     * instance of the keyed operator. For each non-retired employee, the updated head count of that
     * employee's department is emitted as an [[EmployeeLeft]]. Retired employees are only logged to
     * stdout.
     */
    object BroadcastStateExample {
      case class Employee(id: Int, dept: String)
      case class RetireId(id: Int)
      case class EmployeeLeft(dept: String, count: Int)
    
      // Broadcast state descriptor. Key = retired employee id. The value is unused (always ""), so
      // the map effectively acts as a set of retired ids.
      val retireIdsDescriptor: MapStateDescriptor[Int, String] =
        new MapStateDescriptor[Int, String]("retire_ids", classOf[Int], classOf[String])
    
      /** Builds the job (employees keyed by dept, connected to the broadcast retire ids) and runs it. */
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // A plain (non-parallel) source: a parallel source would emit this fixed employee list once
        // per subtask if the parallelism were ever raised above 1.
        val employees: DataStream[Employee] = env.addSource(new SourceFunction[Employee] {
          // Written by cancel() from another thread, so it must be volatile to be seen by run().
          @volatile private var running: Boolean = true
    
          override def run(ctx: SourceFunction.SourceContext[Employee]): Unit = {
            // Give the broadcast retire-id stream a head start so it is (usually) in broadcast state
            // before employees are processed. This is a best-effort delay, NOT an ordering guarantee.
            Thread.sleep(1000L)
            val allEmployees = Seq(
              Employee(1, "dept1"),
              Employee(2, "dept2"),
              Employee(3, "dept3"),
              Employee(4, "dept1"),
              Employee(5, "dept2"),
              Employee(6, "dept3"),
              Employee(7, "dept4")
            )
            // Stop emitting as soon as the source has been cancelled.
            allEmployees.iterator.takeWhile(_ => running).foreach(e => ctx.collect(e))
          }
    
          override def cancel(): Unit = running = false
        })
    
        val retireIds: DataStream[RetireId] = env.fromElements(
          RetireId(2),
          RetireId(6)
        )
    
        val retireIdsBroadcast: BroadcastStream[RetireId] = retireIds.broadcast(retireIdsDescriptor)
    
        // Number of remaining (non-retired) employees per department.
        // keyBy with a key selector gives a KeyedStream[Employee, String], matching the String key type
        // declared by RetireEmployeeProcess (the deprecated keyBy("dept") would key by a Java Tuple).
        val employeeLeft: DataStream[EmployeeLeft] = employees
          .keyBy(_.dept)
          .connect(retireIdsBroadcast) // returns a BroadcastConnectedStream
          .process(new RetireEmployeeProcess)
    
        employeeLeft.print()
        env.execute("BroadcastState Example")
      }
    
      /**
       * Keyed by department (String). Counts employees whose id is not in the broadcast retire list
       * and emits the department's updated count for each such employee.
       */
      class RetireEmployeeProcess extends KeyedBroadcastProcessFunction[String, Employee, RetireId, EmployeeLeft] {
        // Per-department count of remaining employees. A never-written ValueState returns null, which
        // Scala unboxes to 0 for Int, so the count effectively starts at 0.
        private var counter: ValueState[Int] = _
    
        override def open(parameters: Configuration): Unit = {
          val counterDescriptor = new ValueStateDescriptor[Int]("counter", classOf[Int])
          counter = getRuntimeContext.getState[Int](counterDescriptor)
        }
    
        /**
         * Skips (and logs) employees whose id is in the broadcast retire set. Otherwise increments the
         * department's counter and emits the new count.
         */
        override def processElement(
          employee: Employee,
          ctx: KeyedBroadcastProcessFunction[String, Employee, RetireId, EmployeeLeft]#ReadOnlyContext,
          out: Collector[EmployeeLeft]
        ): Unit = {
          val isRetired = ctx.getBroadcastState(retireIdsDescriptor).contains(employee.id)
    
          if (isRetired) {
            println(s"$employee is retired")
          } else {
            val remaining = counter.value() + 1
            counter.update(remaining)
            out.collect(EmployeeLeft(employee.dept, remaining))
          }
        }
    
        /** Records each broadcast retire id in the broadcast state (value is a placeholder ""). */
        override def processBroadcastElement(
          retireId: RetireId,
          ctx: KeyedBroadcastProcessFunction[String, Employee, RetireId, EmployeeLeft]#Context,
          out: Collector[EmployeeLeft]
        ): Unit = {
          ctx.getBroadcastState(retireIdsDescriptor).put(retireId.id, "")
        }
      }
    }
    

     

    설명

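    퇴직자 정보가 broadcast 되기 전에 직원 데이터가 먼저 처리될 수도 있으므로, 직원 소스에 sleep 을 주어 직원 데이터가 나중에 유입되도록 조정했다. 단, sleep 은 순서를 보장하지 않는 임시방편이다(부하가 크면 여전히 직원 데이터가 먼저 도착할 수 있다). 실제 환경에서는 broadcast 상태가 준비되기 전에 도착한 직원 데이터를 keyed state 에 버퍼링했다가 나중에 처리하는 방식을 고려해야 한다.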

     

     

    참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/

    'flink' 카테고리의 다른 글

    CheckPoint 예제  (0) 2021.05.19
    ValueState 예제  (0) 2021.05.19
    GlobalWindows 예제  (0) 2021.05.17
    TumblingWindow 예제  (0) 2021.05.17
    Iterate 예제  (0) 2021.05.17

    댓글

Designed by Tistory.