Structured Streaming - Datasets
spark streaming, 2022. 4. 7. 21:24
Problem
Let's use a Dataset with Structured Streaming.
Code
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

val carsDS: Dataset[Car] = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 12345)
  .load() // DF with single string column "value"
  .select(from_json(col("value"), carsSchema).as("car")) // composite column (struct)
  .selectExpr("car.*") // DF with multiple columns
  .as[Car] // encoder can be passed implicitly with spark.implicits

// typed grouping: yields (key, count) pairs, one per distinct Origin
val carCountByOrigin = carsDS.groupByKey(car => car.Origin).count()

carCountByOrigin
  .writeStream
  .format("console")
  .outputMode("complete") // aggregations without a watermark cannot use append mode
  .start()
  .awaitTermination()
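The code above uses Car and carsSchema without defining them. A minimal sketch of what they might look like follows. Only the Origin field is confirmed by the code; Name and Horsepower are hypothetical placeholders for whatever the JSON records actually contain, and the CarSchema object name is an assumption too. Deriving the schema from the case class with Encoders.product is one way to keep the two in sync.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Hypothetical record type: only `Origin` appears in the original code.
// `Name` and `Horsepower` are illustrative assumptions about the input JSON.
case class Car(
  Name: String,
  Horsepower: Option[Long], // Option models a value that may be missing
  Origin: String
)

object CarSchema {
  // Derive the StructType from the case class so the schema passed to
  // from_json always matches the fields expected by .as[Car].
  val carsSchema: StructType = Encoders.product[Car].schema
}
```

With import CarSchema.carsSchema in scope, the streaming code can be used unchanged. To try it locally, a tool such as netcat (nc -lk 12345) can serve the socket; each line typed into it would be one JSON record, for example {"Name":"demo","Horsepower":130,"Origin":"USA"} under the assumed fields.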