ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Event Time Windows
    spark streaming 2022. 4. 23. 16:43

    문제

    아래와 같은 데이터가 socket stream 으로 들어올 때, event time sliding window 를 이용해 전체 합을 구해보자

    outputMode(complete, append, update) 에 따라 결과가 달라질 수 있는데, 여기서는 complete 로 테스트 해 본다.

    {"id":"21b12ec2-fc44-4068-889b-53cde1d936de","time":"2019-03-01T01:34:53.675+09:00","item":"TV","quantity":0}
    {"id":"237b1f51-208c-41a8-bd05-8c356746ce91","time":"2019-03-01T01:36:45.675+09:00","item":"iPad","quantity":5}
    {"id":"5d0b529a-2efb-4888-aea0-c59d9587f88e","time":"2019-03-01T06:23:59.675+09:00","item":"iPhone","quantity":5}
    {"id":"586ab1b2-4526-4caf-9077-64f52a8d5305","time":"2019-03-01T06:26:16.675+09:00","item":"TV","quantity":3}

     

    코드

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    object EventTimeWindows {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = ...
        
        val onlinePurchaseSchema = StructType(
          Array(
            StructField("id", StringType),
            StructField("time", TimestampType),
            StructField("item", StringType),
            StructField("quantity", IntegerType)
          )
        )
        
        val purchasesDF: DataFrame = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 12345)
          .load()
          .select(from_json(col("value"), onlinePurchaseSchema).as("purchase"))
          .selectExpr("purchase.*")
          
        val windowByDay = purchasesDF
          .groupBy(window(col("time"), "1 day", "1 hour").as("time")) // struct column: has fields {start, end}
          .agg(sum("quantity").as("totalQuantity"))
          .select(
            col("time").getField("start").as("start"),
            col("time").getField("end").as("end"),
            col("totalQuantity")
          )
    
        windowByDay.writeStream
          .format("console")
          .outputMode("complete")
          .start()
          .awaitTermination()
      }
    }

     

    결과

    netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다. 

    nc -lk 12345

     

    설명

    window(col("time"), "1 day", "1 hour") 로 windowDuration, slideDuration 를 지정했는데, 마지막 파라미터로 startTime 도 지정할 수 있다.

     

    참고: https://www.udemy.com/course/spark-streaming

    댓글

Designed by Tistory.