-
Structured Streaming - Dataframe 으로 Aggregation 하기spark streaming 2022. 3. 20. 23:34
문제
socket 에서 streaming 데이터를 읽어 count, groupBy 같은 aggregation 을 한다.
코드
import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SparkSession} object StreamingTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate() val lines: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 12345) .load() val query: DataFrame = lines.select(count("*").as("lineCount")) // val numbers = lines.select(col("value").cast(IntegerType).as("number")) // val query: DataFrame = numbers.select(sum(col("number")).as("agg_so_far")) // val query: DataFrame = lines.select(col("value").as("name")) // .groupBy(col("name")) // .count() query.writeStream .format("console") .outputMode("complete") // append and update not supported on aggregations without watermarks .start() .awaitTermination() } }
결과
netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다.
nc -lk 12345
설명
- aggregation 은 micro-batch level 에서 처리된다.
- distinct 나 sort 같은 몇몇 aggregation 은 지원되지 않는다.
- the append output mode not supported without watermarks
'spark streaming' 카테고리의 다른 글
Event Time Windows (0) 2022.04.23 DStreams (0) 2022.04.10 Structured Streaming - Datasets (0) 2022.04.07 Structured Streaming - Join (0) 2022.03.30 Structured Streaming - Dataframe 으로 streaming 처리하기 (0) 2022.03.20