-
Event Time Windowsspark streaming 2022. 4. 23. 16:43
문제
아래와 같은 데이터가 socket stream 으로 들어올 때, event time sliding window 를 이용해 전체 합을 구해보자
outputMode(complete, append, update) 에 따라 결과가 달라질 수 있는데, 여기서는 complete 로 테스트 해 본다.
{"id":"21b12ec2-fc44-4068-889b-53cde1d936de","time":"2019-03-01T01:34:53.675+09:00","item":"TV","quantity":0} {"id":"237b1f51-208c-41a8-bd05-8c356746ce91","time":"2019-03-01T01:36:45.675+09:00","item":"iPad","quantity":5} {"id":"5d0b529a-2efb-4888-aea0-c59d9587f88e","time":"2019-03-01T06:23:59.675+09:00","item":"iPhone","quantity":5} {"id":"586ab1b2-4526-4caf-9077-64f52a8d5305","time":"2019-03-01T06:26:16.675+09:00","item":"TV","quantity":3}
코드
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ object EventTimeWindows { def main(args: Array[String]): Unit = { val spark: SparkSession = ... val onlinePurchaseSchema = StructType( Array( StructField("id", StringType), StructField("time", TimestampType), StructField("item", StringType), StructField("quantity", IntegerType) ) ) val purchasesDF: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 12345) .load() .select(from_json(col("value"), onlinePurchaseSchema).as("purchase")) .selectExpr("purchase.*") val windowByDay = purchasesDF .groupBy(window(col("time"), "1 day", "1 hour").as("time")) // struct column: has fields {start, end} .agg(sum("quantity").as("totalQuantity")) .select( col("time").getField("start").as("start"), col("time").getField("end").as("end"), col("totalQuantity") ) windowByDay.writeStream .format("console") .outputMode("complete") .start() .awaitTermination() } }
결과
netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다.
nc -lk 12345
설명
window(col("time"), "1 day", "1 hour") 로 windowDuration, slideDuration 를 지정했는데, 마지막 파라미터로 startTime 도 지정할 수 있다.
'spark streaming' 카테고리의 다른 글
DStreams (0) 2022.04.10 Structured Streaming - Datasets (0) 2022.04.07 Structured Streaming - Join (0) 2022.03.30 Structured Streaming - Dataframe 으로 Aggregation 하기 (0) 2022.03.20 Structured Streaming - Dataframe 으로 streaming 처리하기 (0) 2022.03.20