spark streaming
DStreams
wefree
2022. 4. 10. 18:49
코드
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName("StreamingTest").master("local[2]").getOrCreate()
// 입력 데이터 존재 여부에 관계 없이 무조건 1초마다 배치가 실행됨
val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(1))
// my_dir 에 현재 있는 파일은 무시하고, 새로 생성되는 파일만 모니터링해 처리된다.
// val inputStream: DStream[String] = ssc.textFileStream("my_dir")
val inputStream: DStream[String] = ssc.socketTextStream("localhost", 12345)
val wordsStream = inputStream.flatMap(line => line.split(" "))
// wordsStream.print()
// 1. batch 마다 디렉토리가 생성됨
// 2. 디렉토리내에 partition 마다 파일이 생성됨
wordsStream.saveAsTextFiles("...")
ssc.start()
ssc.awaitTermination()
}
}
결과
netcat 으로 입력데이터를 socket 으로 전달해 테스트 할 수 있다.
nc -lk 12345
설명
몇 가지 유용한 함수를 기록해 보면
val readPeople(): DStream[Person] = ???
def countPeople(): DStream[Long] = readPeople().count() // the number of entries in every batch
def countNames(): DStream[(String, Long)] = readPeople().map(_.firstName).countByValue()
def countNamesReduce(): DStream[(String, Int)] = readPeople()
.map(_.firstName)
.map(name => (name, 1))
.reduceByKey((a, b) => a + b)
readPeople().foreachRDD{ rdd => ??? }
for {
line <- dataStream
word <- line.split(" ")
} yield word.toUpperCase()