Flink
-
BroadcastState 예제flink 2021. 5. 20. 01:37
문제 입력으로 직원 Employee(id: Int, dept: String) 목록과 퇴직자 RetireId(id: Int) 목록이 있을 때, 직원 목록에서 퇴직자 목록을 제외 후 부서별(dept)로 남게되는 인원 정보 EmployeeLeft(dept: String, count: Int) 를 출력한다. RetireId 목록을 broadcast 해 구현한다. Input(in code) 직원 Employee Employee(1, "dept1") Employee(2, "dept2") Employee(3, "dept3") Employee(4, "dept1") Employee(5, "dept2") Employee(6, "dept3") Employee(7, "dept4") 퇴직자 RetireId RetireId(2..
-
CheckPoint 예제flink 2021. 5. 19. 22:42
문제 Stream WordCount 예제에 CheckPoint 를 적용해 본다. Code import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala._ object CheckPointExample { ..
-
ValueState 예제flink 2021. 5. 19. 21:53
문제 입력으로 MyInput(key, value) 가 들어올 때, 각각의 key 값으로 groupBy 해 각각의 id 에 대한 짝수번째 value 값을 출력한다. Input(in code) case class MyInput(key: String, value: Int) val data = env.fromElements( MyInput("a", 1), MyInput("b", 2), MyInput("a", 3), MyInput("a", 4), MyInput("b", 5), MyInput("a", 6), MyInput("b", 7) ) Expected Output 3 5 6 Code import org.apache.flink.api.common.functions.RichFlatMapFunction import ..
-
GlobalWindows 예제flink 2021. 5. 17. 20:38
문제 text file 을 라인 단위로 읽어 월별 신규 레코드가 2개씩 추가될 때 마다 전체 profit sum 을 계산해 출력한다. Input(/home/windbird123/product_info.txt) (month, product, profit) June,Bat,12 June,Perfume,10 July,Television,50 June,Shirt,38 June,Bat,41 Expected Output ProductInfo(June,?,22) ProductInfo(June,?,101) Code import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnviron..
-
TumblingWindow 예제flink 2021. 5. 17. 14:28
문제 입력으로 1초마다 random TimeInput(timestamp: Long, value: Int) 이 들어올 때, 2초 단위로 나누어 value 값을 sum 해 출력한다. Input(in code) case class TimeInput(timestamp: Long, value: Int) val timeInput = TimeInput(System.currentTimeMillis(), Random.nextInt(100)) context.collect(timeInput) // 예시 (1621227515090, 96) ------------------- (1621227516114, 11) (1621227517128, 86) ------------------- (1621227518144, 54) (1621..
-
Iterate 예제flink 2021. 5. 17. 11:08
문제 입력으로 0, 1, 2 가 있을 때, 각각의 숫자를 1씩 증가시키면서 3까지 도달하도록 한다. 모든 숫자가 3에 도달하면 종료되며, 각각의 숫자에 대한 상태를 출력한다. Input(in code) /** * initial: 최초 설정 값 * current: (증가된) 현재 값 * increased: 증가된 정도 * current = initial + increased */ case class MyNumber(initial: Long, current: Long, increased: Int) env.generateSequence(0, 2).map(x => MyNumber(x, x, 0)) Expected Output current 값이 3 에 도달했을 때, MyNumber 출력 MyNumber(2,3,..
-
Stream Split 예제flink 2021. 5. 17. 00:01
문제 숫자가 입력된 text file 을 라인 단위로 읽어 짝수, 홀수로 나누어 각각 파일로 저장한다. Input(/home/windbird123/number.txt) 454 565 45654 767 889 78 Expected Output Even(/home/windbird123/even.txt) 454 45654 78 Odd(/home/windbird123/odd.txt) 565 767 889 Code import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment} object StreamSplitExample { case ..
-
Stream Reduce, Min, MinBy, Max, MaxBy 예제flink 2021. 5. 16. 22:02
문제 text file 을 라인 단위로 읽어 월별 profit 평균을 출력한다. Input(/home/windbird123/product_info.txt) (month, product, profit) June,Bat,12 June,Perfume,10 July,Television,50 June,Shirt,38 June,Bat,41 Expected Output (June,12) (June,11) (July,50) (June,20) (June,25) Code import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object StreamReduceEx..