spark streaming
Structured Streaming - Dataframe 으로 Aggregation 하기
wefree
2022. 3. 20. 23:34
문제
socket 에서 streaming 데이터를 읽어 count, groupBy 같은 aggregation 을 한다.
코드
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
object StreamingTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate()
val lines: DataFrame = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 12345)
.load()
val query: DataFrame = lines.select(count("*").as("lineCount"))
// val numbers = lines.select(col("value").cast(IntegerType).as("number"))
// val query: DataFrame = numbers.select(sum(col("number")).as("agg_so_far"))
// val query: DataFrame = lines.select(col("value").as("name"))
// .groupBy(col("name"))
// .count()
query.writeStream
.format("console")
.outputMode("complete") // append and update not supported on aggregations without watermarks
.start()
.awaitTermination()
}
}
결과
netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다.
nc -lk 12345
설명
- aggregation 은 micro-batch level 에서 처리된다.
- distinct 나 sort 같은 몇몇 aggregation 은 지원되지 않는다.
- the append output mode not supported without watermarks