spark streaming
-
Event Time Windowsspark streaming 2022. 4. 23. 16:43
문제 아래와 같은 데이터가 socket stream 으로 들어올 때, event time sliding window 를 이용해 전체 합을 구해보자 outputMode(complete, append, update) 에 따라 결과가 달라질 수 있는데, 여기서는 complete 로 테스트 해 본다. {"id":"21b12ec2-fc44-4068-889b-53cde1d936de","time":"2019-03-01T01:34:53.675+09:00","item":"TV","quantity":0} {"id":"237b1f51-208c-41a8-bd05-8c356746ce91","time":"2019-03-01T01:36:45.675+09:00","item":"iPad","quantity":5} {"id":"5d0b..
-
DStreamsspark streaming 2022. 4. 10. 18:49
코드 import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamingTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate() // 입력 데이터 존재 여부에 관계 없이 무조건 1초마다 배치가 실행됨 val ssc: StreamingContext = new ..
-
Structured Streaming - Datasetsspark streaming 2022. 4. 7. 21:24
문제 Dataset 을 Streaming 에 활용해 보자 코드 import spark.implicits._ val carsDS: Dataset[Car] = spark.readStream .format("socket") .option("host", "localhost") .option("port", 12345) .load() // DF with single string column "value" .select(from_json(col("value"), carsSchema).as("car")) // composite column (struct) .selectExpr("car.*") // DF with multiple columns .as[Car] // encoder can be passed implicitl..
-
Structured Streaming - Joinspark streaming 2022. 3. 30. 19:56
설명 Streaming Join 에는 다음과 같은 제약이 있다. stream joining with static RIGHT outer join/full outer join/right_semi NOT permitted join stream with stream left/right outer joins ARE supported, but MUST have watermarks full outer joins are NOT supported 참고: https://www.udemy.com/course/spark-streaming
-
Structured Streaming - Dataframe 으로 Aggregation 하기spark streaming 2022. 3. 20. 23:34
문제 socket 에서 streaming 데이터를 읽어 count, groupBy 같은 aggregation 을 한다. 코드 import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SparkSession} object StreamingTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate() val lines: DataFrame = spark.readStream .format("socket") .optio..
-
Structured Streaming - Dataframe 으로 streaming 처리하기spark streaming 2022. 3. 20. 20:25
문제 1. socket 에서 streaming 데이터를 읽어 처리하기 2. file 에서 streaming 데이터를 읽어 처리하기 코드 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.{DataFrame, SparkSession} import scala.concurrent.duration.DurationInt object StreamingTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("StreamingTest")...