  • Stream Reduce, Min, MinBy, Max, MaxBy Example
    flink 2021. 5. 16. 22:02

    Problem

    Read a text file line by line and print the running average profit per month (one updated average for each incoming record).

     

    Input(/home/windbird123/product_info.txt)

    (month, product, profit)

    June,Bat,12
    June,Perfume,10
    July,Television,50
    June,Shirt,38
    June,Bat,41
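
    Each input line has the form month,product,profit. As a minimal sketch of how one such line could be parsed defensively, the plain-Scala snippet below uses a hypothetical parseLine helper (not part of the original post) and a ProductInfo case class with the same fields as the one in the Code section. It assumes that malformed lines should be skipped rather than fail the job.

```scala
import scala.util.Try

object ParseLineSketch {
  // Same fields as the ProductInfo case class in the Code section.
  final case class ProductInfo(month: String, product: String, profit: Int, count: Int)

  // Hypothetical helper: returns None when the line does not have exactly
  // three comma-separated fields or when profit is not an integer.
  def parseLine(line: String): Option[ProductInfo] =
    line.split(",").map(_.trim) match {
      case Array(month, product, profit) =>
        // count starts at 1 because each line represents one record
        Try(profit.toInt).toOption.map(p => ProductInfo(month, product, p, 1))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parseLine("June,Bat,12"))
    println(parseLine("not a valid line"))
  }
}
```

    In a Flink job, a helper like this could be applied with flatMap so that only successfully parsed records continue downstream.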

     

    Expected Output

    (June,12)
    (June,11)
    (July,50)
    (June,20)
    (June,25)

     

    Code

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    
    object StreamReduceExample {
      // count starts at 1 per record so the reduce can carry the number of records seen so far
      case class ProductInfo(month: String, product: String, profit: Int, count: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val data = env.readTextFile("/home/windbird123/product_info.txt")
        // parse each "month,product,profit" line into a ProductInfo
        val mapped: DataStream[ProductInfo] =
          data.map(_.split(",")).map(x => ProductInfo(x(0), x(1), x(2).toInt, 1))
    
        // key by month and keep a running sum of profit and count per key;
        // the first argument is the value reduced so far, the second is the newly arrived record
        val reduced: DataStream[ProductInfo] = mapped
          .keyBy("month")
          .reduce((acc, next) =>
            ProductInfo(
              acc.month,
              acc.product,
              acc.profit + next.profit,
              acc.count + next.count
            )
          )
    
        // integer division, so the average is truncated (e.g. 101 / 4 = 25)
        val profitPerMonth = reduced.map(productInfo => (productInfo.month, productInfo.profit / productInfo.count))
        profitPerMonth.print()
        env.execute("Average Profit Per Month")
      }
    }
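
    To make the numbers in Expected Output easier to follow, here is a minimal plain-Scala sketch of what the keyed reduce does. It is a simulation and does not use Flink: rollingReduce is a hypothetical helper that keeps one reduced value per month in a Map and emits the updated value after every record. The input rows are inlined instead of being read from product_info.txt.

```scala
object RollingAverageSketch {
  final case class ProductInfo(month: String, product: String, profit: Int, count: Int)

  // Same rows as product_info.txt, inlined so the sketch is self-contained.
  val input: List[ProductInfo] = List(
    ProductInfo("June", "Bat", 12, 1),
    ProductInfo("June", "Perfume", 10, 1),
    ProductInfo("July", "Television", 50, 1),
    ProductInfo("June", "Shirt", 38, 1),
    ProductInfo("June", "Bat", 41, 1)
  )

  // Hypothetical helper: for each record, combine it with the value stored
  // for its month (if any), store the result, and emit it.
  def rollingReduce(records: List[ProductInfo])(
      f: (ProductInfo, ProductInfo) => ProductInfo): List[ProductInfo] =
    records
      .foldLeft((Map.empty[String, ProductInfo], Vector.empty[ProductInfo])) {
        case ((state, out), next) =>
          val updated = state.get(next.month).map(acc => f(acc, next)).getOrElse(next)
          (state.updated(next.month, updated), out :+ updated)
      }
      ._2
      .toList

  // Running sum of profit and count per month, then the truncated average.
  val runningAverages: List[(String, Int)] =
    rollingReduce(input)((acc, next) =>
      acc.copy(profit = acc.profit + next.profit, count = acc.count + next.count)
    ).map(p => (p.month, p.profit / p.count))

  def main(args: Array[String]): Unit =
    runningAverages.foreach(println)
}
```

    The last June value is 25 rather than 25.25 because profit and count are both Int, so the division truncates.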
    

     

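    Explanation

    Let's look at how min and minBy each process the Input (/home/windbird123/product_info.txt) above.

    min updates only the field of interest, profit: the emitted record carries the running minimum profit, while the other fields such as product keep the values of the first record seen for that key.

    To get the whole record that holds the minimum, with every field updated, use minBy. (max and maxBy behave the same way.)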

    • min
      val reduced: DataStream[ProductInfo] = mapped
        .keyBy("month")
        .min("profit")
      reduced.print()
      
      // Output
      ProductInfo(June,Bat,12,1)
      ProductInfo(June,Bat,10,1)
      ProductInfo(July,Television,50,1)
      ProductInfo(June,Bat,10,1)
      ProductInfo(June,Bat,10,1)
    • minBy
      val reduced: DataStream[ProductInfo] = mapped
        .keyBy("month")
        .minBy("profit")
      reduced.print()
      
      // Output
      ProductInfo(June,Bat,12,1)
      ProductInfo(June,Perfume,10,1)
      ProductInfo(July,Television,50,1)
      ProductInfo(June,Perfume,10,1)
      ProductInfo(June,Perfume,10,1)
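
    The difference between the two outputs above can be reproduced with a small plain-Scala simulation. This is a sketch of the semantics as described in this post, not Flink's implementation: rolling is a hypothetical helper that keeps one record per month and emits the updated record after every input, and ties are assumed to keep the record already stored. The max variants are included only to illustrate the "behave the same way" remark.

```scala
object MinVsMinBySketch {
  final case class ProductInfo(month: String, product: String, profit: Int, count: Int)

  // Same rows as product_info.txt, inlined so the sketch is self-contained.
  val input: List[ProductInfo] = List(
    ProductInfo("June", "Bat", 12, 1),
    ProductInfo("June", "Perfume", 10, 1),
    ProductInfo("July", "Television", 50, 1),
    ProductInfo("June", "Shirt", 38, 1),
    ProductInfo("June", "Bat", 41, 1)
  )

  // Hypothetical helper: per-month rolling aggregation that emits after every record.
  def rolling(records: List[ProductInfo])(
      f: (ProductInfo, ProductInfo) => ProductInfo): List[ProductInfo] =
    records
      .foldLeft((Map.empty[String, ProductInfo], Vector.empty[ProductInfo])) {
        case ((state, out), next) =>
          val updated = state.get(next.month).map(acc => f(acc, next)).getOrElse(next)
          (state.updated(next.month, updated), out :+ updated)
      }
      ._2
      .toList

  // min-like: keep the stored record and overwrite only its profit field.
  val minLike: List[ProductInfo] =
    rolling(input)((acc, next) => acc.copy(profit = math.min(acc.profit, next.profit)))

  // minBy-like: keep the whole record with the smaller profit (stored record wins ties).
  val minByLike: List[ProductInfo] =
    rolling(input)((acc, next) => if (next.profit < acc.profit) next else acc)

  // max-like and maxBy-like: the same two strategies with the comparison flipped.
  val maxLike: List[ProductInfo] =
    rolling(input)((acc, next) => acc.copy(profit = math.max(acc.profit, next.profit)))

  val maxByLike: List[ProductInfo] =
    rolling(input)((acc, next) => if (next.profit > acc.profit) next else acc)

  def main(args: Array[String]): Unit = {
    println("min-like:");   minLike.foreach(println)
    println("minBy-like:"); minByLike.foreach(println)
    println("max-like:");   maxLike.foreach(println)
    println("maxBy-like:"); maxByLike.foreach(println)
  }
}
```

    In the min-like result the product stays Bat even after the minimum comes from the Perfume record, which is the behavior the min output above shows.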

     

    Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
