ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Structured Streaming - Dataframe 으로 streaming 처리하기
    spark streaming 2022. 3. 20. 20:25

    문제

    1. socket 에서 streaming 데이터를 읽어 처리하기

    2. file 에서 streaming 데이터를 읽어 처리하기

     

    코드

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    import scala.concurrent.duration.DurationInt
    
    object StreamingTest {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate()
    
        // socket 으로 부터 읽기
        val lines: DataFrame = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 12345)
          .load()
    
        // file 에서 읽기
    //    val lines = spark.readStream
    //      .format("csv")
    //      .option("header", "false")
    //      .option("dateFormat", "yyyy-MM-dd")
    //      .schema(mySchema)
    //      .load("src/main/resources/data")
    
        val shortLines: DataFrame = lines.filter(length(col("value")) <= 5)
        println(shortLines.isStreaming)
    
        shortLines.writeStream
          .format("console")
          .outputMode("append")
          .trigger(
            Trigger.ProcessingTime(2.seconds) // every 2 seconds run the query
            // Trigger.Once() // single batch, then terminate
            // Trigger.Continuous(2.seconds) // experimental, every 2 seconds create a batch with whatever you have
          )
          .start()
          .awaitTermination()
      }
    }

     

    결과

    netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다. 

    nc -lk 12345

     

    설명

    Output modes

    • append: only add new records
    • update: modify records in place (if query has no aggregations, equivalent with append)
    • complete: rewrite everything

    Trigger

    • Trigger.ProcessingTime(2.seconds): 처리할 신규 입력이 있을 때 2초마다 실행
    • Trigger.Once()
    • Trigger.Continuous(2.seconds): 신규 입력이 없더라도 무조건 2초마다 실행

    파일에서 읽기

    위의 예제처럼 streaming 으로 작성하면,

    • 먼저 디렉토리(src/main/resources/data) 에 존재하는 파일을 읽어 처리한다.
    • 디렉토리(src/main/resources/data) 에 신규 파일이 추가될 때 마다 microbatch 가 실행되어 자동으로 처리된다.

     

    참고: https://www.udemy.com/course/spark-streaming

    'spark streaming' 카테고리의 다른 글

    Event Time Windows  (0) 2022.04.23
    DStreams  (0) 2022.04.10
    Structured Streaming - Datasets  (0) 2022.04.07
    Structured Streaming - Join  (0) 2022.03.30
    Structured Streaming - Dataframe 으로 Aggregation 하기  (0) 2022.03.20

    댓글

Designed by Tistory.