spark streaming

DStreams

wefree 2022. 4. 10. 18:49

코드

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate()

    // 입력 데이터 존재 여부에 관계 없이 무조건 1초마다 배치가 실행됨
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(1))

    // my_dir 에 현재 있는 파일은 무시하고, 새로 생성되는 파일만 모니터링해 처리된다.
//    val inputStream: DStream[String] = ssc.textFileStream("my_dir")

    val inputStream: DStream[String] = ssc.socketTextStream("localhost", 12345)
    val wordsStream                  = inputStream.flatMap(line => line.split(" "))
//    wordsStream.print()

    // 1. batch 마다 디렉토리가 생성됨
    // 2. 디렉토리내에 partition 마다 파일이 생성됨
    wordsStream.saveAsTextFiles("...")

    ssc.start()
    ssc.awaitTermination()
  }
}

 

결과

netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다. 

nc -lk 12345

 

설명

몇 가지 유용한 함수를 기록해 보면

val readPeople(): DStream[Person] = ???

def countPeople(): DStream[Long] = readPeople().count() // the number of entries in every batch
def countNames(): DStream[(String, Long)] = readPeople().map(_.firstName).countByValue()
def countNamesReduce(): DStream[(String, Int)] = readPeople()
  .map(_.firstName)
  .map(name => (name, 1))
  .reduceByKey((a, b) => a + b)
  
readPeople().foreachRDD{ rdd => ??? }

for {
  line <- dataStream
  word <- line.split(" ")
} yield word.toUpperCase()

 

참고: https://www.udemy.com/course/spark-streaming