flink

Stream Split 예제

wefree 2021. 5. 17. 00:01

문제

숫자가 입력된 text file 을 라인 단위로 읽어 짝수, 홀수로 나누어 각각 파일로 저장한다.

 

Input(/home/windbird123/number.txt)

454
565
45654
767
889
78

 

Expected Output

  • Even(/home/windbird123/even.txt)
    454
    45654
    78​
  • Odd(/home/windbird123/odd.txt)
    565
    767
    889​

 

Code

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

object StreamSplitExample {
  case class ProductInfo(month: String, product: String, profit: Int, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val numbers = env.readTextFile("/home/windbird123/number.txt").map(_.toInt)

    val splitted: SplitStream[Int] = numbers.split(x => if (x % 2 == 0) Seq("even") else Seq("odd"))

    val even: DataStream[Int] = splitted.select("even")
    val odd: DataStream[Int]  = splitted.select("odd")

    even.writeAsText("/home/windbird123/even.txt")
    odd.writeAsText("/home/windbird123/odd.txt")

    env.execute("Stream Split")
  }
}

 

설명

https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream 를 보면 split 대신에 filter 를 사용하라는 이야기도 있다. https://wefree.tistory.com/13 에서 split 으로 stepFunction 을 구현했을 때, 잘 동작하지 않아 filter 로 구현했다.

 

참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/