-
DStreamsspark streaming 2022. 4. 10. 18:49
코드
import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamingTest { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate() // 입력 데이터 존재 여부에 관계 없이 무조건 1초마다 배치가 실행됨 val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(1)) // my_dir 에 현재 있는 파일은 무시하고, 새로 생성되는 파일만 모니터링해 처리된다. // val inputStream: DStream[String] = ssc.textFileStream("my_dir") val inputStream: DStream[String] = ssc.socketTextStream("localhost", 12345) val wordsStream = inputStream.flatMap(line => line.split(" ")) // wordsStream.print() // 1. batch 마다 디렉토리가 생성됨 // 2. 디렉토리내에 partition 마다 파일이 생성됨 wordsStream.saveAsTextFiles("...") ssc.start() ssc.awaitTermination() } }
결과
netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다.
nc -lk 12345
설명
몇 가지 유용한 함수를 기록해 보면
val readPeople(): DStream[Person] = ??? def countPeople(): DStream[Long] = readPeople().count() // the number of entries in every batch def countNames(): DStream[(String, Long)] = readPeople().map(_.firstName).countByValue() def countNamesReduce(): DStream[(String, Int)] = readPeople() .map(_.firstName) .map(name => (name, 1)) .reduceByKey((a, b) => a + b) readPeople().foreachRDD{ rdd => ??? } for { line <- dataStream word <- line.split(" ") } yield word.toUpperCase()
'spark streaming' 카테고리의 다른 글
Event Time Windows (0) 2022.04.23 Structured Streaming - Datasets (0) 2022.04.07 Structured Streaming - Join (0) 2022.03.30 Structured Streaming - Dataframe 으로 Aggregation 하기 (0) 2022.03.20 Structured Streaming - Dataframe 으로 streaming 처리하기 (0) 2022.03.20