-
Stream Split 예제flink 2021. 5. 17. 00:01
문제
숫자가 입력된 text file 을 라인 단위로 읽어 짝수, 홀수로 나누어 각각 파일로 저장한다.
Input(/home/windbird123/number.txt)
454 565 45654 767 889 78
Expected Output
- Even(/home/windbird123/even.txt)
454 45654 78
- Odd(/home/windbird123/odd.txt)
565 767 889
Code
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment} object StreamSplitExample { case class ProductInfo(month: String, product: String, profit: Int, count: Int) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val numbers = env.readTextFile("/home/windbird123/number.txt").map(_.toInt) val splitted: SplitStream[Int] = numbers.split(x => if (x % 2 == 0) Seq("even") else Seq("odd")) val even: DataStream[Int] = splitted.select("even") val odd: DataStream[Int] = splitted.select("odd") even.writeAsText("/home/windbird123/even.txt") odd.writeAsText("/home/windbird123/odd.txt") env.execute("Stream Split") } }
설명
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream 를 보면 split 대신에 filter 를 사용하라는 이야기도 있다. https://wefree.tistory.com/13 에서 split 으로 stepFunction 을 구현했을 때, 잘 동작하지 않아 filter 로 구현했다.
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
TumblingWindow 예제 (0) 2021.05.17 Iterate 예제 (0) 2021.05.17 Stream Reduce, Min, MinBy, Max, MaxBy 예제 (0) 2021.05.16 Stream WordCount 예제 (0) 2021.05.15 DataSet Join 예제 (0) 2021.05.15 - Even(/home/windbird123/even.txt)