ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • DStreams
    spark streaming 2022. 4. 10. 18:49

    코드

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StreamingTest {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate()
    
        // 입력 데이터 존재 여부에 관계 없이 무조건 1초마다 배치가 실행됨
        val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(1))
    
        // my_dir 에 현재 있는 파일은 무시하고, 새로 생성되는 파일만 모니터링해 처리된다.
    //    val inputStream: DStream[String] = ssc.textFileStream("my_dir")
    
        val inputStream: DStream[String] = ssc.socketTextStream("localhost", 12345)
        val wordsStream                  = inputStream.flatMap(line => line.split(" "))
    //    wordsStream.print()
    
        // 1. batch 마다 디렉토리가 생성됨
        // 2. 디렉토리내에 partition 마다 파일이 생성됨
        wordsStream.saveAsTextFiles("...")
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

     

    결과

    netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다. 

    nc -lk 12345

     

    설명

    몇 가지 유용한 함수를 기록해 보면

    val readPeople(): DStream[Person] = ???
    
    def countPeople(): DStream[Long] = readPeople().count() // the number of entries in every batch
    def countNames(): DStream[(String, Long)] = readPeople().map(_.firstName).countByValue()
    def countNamesReduce(): DStream[(String, Int)] = readPeople()
      .map(_.firstName)
      .map(name => (name, 1))
      .reduceByKey((a, b) => a + b)
      
    readPeople().foreachRDD{ rdd => ??? }
    
    for {
      line <- dataStream
      word <- line.split(" ")
    } yield word.toUpperCase()

     

    참고: https://www.udemy.com/course/spark-streaming

    댓글

Designed by Tistory.