-
Structured Streaming - Dataframe 으로 streaming 처리하기spark streaming 2022. 3. 20. 20:25
문제
1. socket 에서 streaming 데이터를 읽어 처리하기
2. file 에서 streaming 데이터를 읽어 처리하기
코드
import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.{DataFrame, SparkSession} import scala.concurrent.duration.DurationInt object StreamingTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate() // socket 으로 부터 읽기 val lines: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 12345) .load() // file 에서 읽기 // val lines = spark.readStream // .format("csv") // .option("header", "false") // .option("dateFormat", "yyyy-MM-dd") // .schema(mySchema) // .load("src/main/resources/data") val shortLines: DataFrame = lines.filter(length(col("value")) <= 5) println(shortLines.isStreaming) shortLines.writeStream .format("console") .outputMode("append") .trigger( Trigger.ProcessingTime(2.seconds) // every 2 seconds run the query // Trigger.Once() // single batch, then terminate // Trigger.Continuous(2.seconds) // experimental, every 2 seconds create a batch with whatever you have ) .start() .awaitTermination() } }
결과
netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다.
nc -lk 12345
설명
Output modes
- append: only add new records
- update: modify records in place (if query has no aggregations, equivalent with append)
- complete: rewrite everything
Trigger
- Trigger.ProcessingTime(2.seconds): 처리할 신규 입력이 있을 때 2초마다 실행
- Trigger.Once()
- Trigger.Continuous(2.seconds): 신규 입력이 없더라도 무조건 2초마다 실행
파일에서 읽기
위의 예제처럼 streaming 으로 작성하면,
- 먼저 디렉토리(src/main/resources/data) 에 존재하는 파일을 읽어 처리한다.
- 디렉토리(src/main/resources/data) 에 신규 파일이 추가될 때 마다 microbatch 가 실행되어 자동으로 처리된다.
'spark streaming' 카테고리의 다른 글
Event Time Windows (0) 2022.04.23 DStreams (0) 2022.04.10 Structured Streaming - Datasets (0) 2022.04.07 Structured Streaming - Join (0) 2022.03.30 Structured Streaming - Dataframe 으로 Aggregation 하기 (0) 2022.03.20