-
Stream Reduce, Min, MinBy, Max, MaxBy 예제flink 2021. 5. 16. 22:02
문제
text file 을 라인 단위로 읽어 월별 profit 평균을 출력한다.
Input(/home/windbird123/product_info.txt)
(month, product, profit)
June,Bat,12 June,Perfume,10 July,Television,50 June,Shirt,38 June,Bat,41
Expected Output
(June,12) (June,11) (July,50) (June,20) (June,25)
Code
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object StreamReduceExample { case class ProductInfo(month: String, product: String, profit: Int, count: Int) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val data = env.readTextFile("/home/windbird123/product_info.txt") val mapped: DataStream[ProductInfo] = data.map(_.split(",")).map(x => ProductInfo(x(0), x(1), x(2).toInt, 1)) // groupBy month val reduced: DataStream[ProductInfo] = mapped .keyBy("month") .reduce((current, preResult) => ProductInfo( current.month, current.product, current.profit + preResult.profit, current.count + preResult.count ) ) val profitPerMonth = reduced.map(productInfo => (productInfo.month, productInfo.profit / productInfo.count)) profitPerMonth.print() env.execute("Average Profit Per Month") } }
설명
min 과 minBy 는 위의 Input(/home/windbird123/product_info.txt) 에 대해 각각 어떻게 처리하는지 알아보자
min 의 경우 관심있는 profit 필드만을 업데이트 하고, 관심없는 product 필드는 업데이트 하지 않는다.
모든 필드까지 업데이트 하기 위해서는 minBy 를 사용해야 한다. (max, maxBy 도 동일하게 동작한다.)
- min
val reduced: DataStream[ProductInfo] = mapped .keyBy("month") .min("profit") reduced.print() // 출력 결과 ProductInfo(June,Bat,12,1) ProductInfo(June,Bat,10,1) ProductInfo(July,Television,50,1) ProductInfo(June,Bat,10,1) ProductInfo(June,Bat,10,1)
- minBy
val reduced: DataStream[ProductInfo] = mapped .keyBy("month") .minBy("profit") reduced.print() // 출력 결과 ProductInfo(June,Bat,12,1) ProductInfo(June,Perfume,10,1) ProductInfo(July,Television,50,1) ProductInfo(June,Perfume,10,1) ProductInfo(June,Perfume,10,1)
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
Iterate 예제 (0) 2021.05.17 Stream Split 예제 (0) 2021.05.17 Stream WordCount 예제 (0) 2021.05.15 DataSet Join 예제 (0) 2021.05.15 DataSet WordCount 예제 (0) 2021.05.15 - min