flink
Stream Split 예제
wefree
2021. 5. 17. 00:01
문제
숫자가 입력된 text file 을 라인 단위로 읽어 짝수, 홀수로 나누어 각각 파일로 저장한다.
Input(/home/windbird123/number.txt)
454
565
45654
767
889
78
Expected Output
- Even(/home/windbird123/even.txt)
454 45654 78
- Odd(/home/windbird123/odd.txt)
565 767 889
Code
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}
object StreamSplitExample {
case class ProductInfo(month: String, product: String, profit: Int, count: Int)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val numbers = env.readTextFile("/home/windbird123/number.txt").map(_.toInt)
val splitted: SplitStream[Int] = numbers.split(x => if (x % 2 == 0) Seq("even") else Seq("odd"))
val even: DataStream[Int] = splitted.select("even")
val odd: DataStream[Int] = splitted.select("odd")
even.writeAsText("/home/windbird123/even.txt")
odd.writeAsText("/home/windbird123/odd.txt")
env.execute("Stream Split")
}
}
설명
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream 를 보면 split 대신에 filter 를 사용하라는 이야기도 있다. https://wefree.tistory.com/13 에서 split 으로 stepFunction 을 구현했을 때, 잘 동작하지 않아 filter 로 구현했다.
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/