spark streaming

Structured Streaming - Dataframe 으로 Aggregation 하기

wefree 2022. 3. 20. 23:34

문제

socket 에서 streaming 데이터를 읽어 count, groupBy 같은 aggregation 을 한다.

 

코드

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate()

    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 12345)
      .load()

    val query: DataFrame = lines.select(count("*").as("lineCount"))

//    val numbers = lines.select(col("value").cast(IntegerType).as("number"))
//    val query: DataFrame   = numbers.select(sum(col("number")).as("agg_so_far"))

//   val query: DataFrame = lines.select(col("value").as("name"))
//     .groupBy(col("name"))
//     .count()

    query.writeStream
      .format("console")
      .outputMode("complete") // append and update not supported on aggregations without watermarks
      .start()
      .awaitTermination()
  }
}

 

결과

netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다. 

nc -lk 12345

 

설명

  • aggregation 은 micro-batch level 에서 처리된다. 
  • distinct 나 sort 같은 몇몇 aggregation 은 지원되지 않는다.
  • the append output mode not supported without watermarks

 

참고: https://www.udemy.com/course/spark-streaming