flink
Stream WordCount 예제
wefree
2021. 5. 15. 23:57
문제
localhost:9999 로 부터 socket 데이터를 읽어 stream word count 를 화면에 출력 한다.
Input(localhost:9999)
~$ netcat -l 9999
Noman
Joyce
Noman
Isidore
Nipun
Rebekah
Nipun
Expected Output
WordCount(Noman,1)
WordCount(Noman,2)
WordCount(Nipun,1)
WordCount(Nipun,2)
Code
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCountExample {
case class WordCount(word: String, count: Int)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
val text: DataStream[String] = env.socketTextStream("localhost", 9999)
val filtered = text.filter(_.startsWith("N"))
val tokenized: DataStream[WordCount] = filtered.map(x => WordCount(x, 1))
val counts: DataStream[WordCount] = tokenized.keyBy("word").sum("count")
counts.print()
env.execute("Stream WordCount")
}
}
실행
- netcat -l 9999
- Code 실행 하면 아래와 같은 메시지를 터미널에서 볼 수 있다.
예) Web frontend listening at http://localhost:59364 - netcat 이 실행된 터미널에서 Input 내용 입력
- Code 실행 터미널에서 Output 내용 확인
- (또는) http://localhost:59364에 접속해 flink 관리페이지의 Task Managers > Stdout 에서 Output 확인
설명
Data Sources for Stream API
- readTextFile(path)
- readFile(fileInputFormat, path)
- readFile(fileInputFormat, path, watchType, interval, pathFilter): 디렉토리 내에 새로 추가된 데이터를 자동으로 읽기 가능
- socketTextStream
- addSource
Data Sinks for Stream API
- writeAsText / TextOutputFormat
- writeAsCsv / CsvOutputFormat
- print()
- writeUsingOutputFormat / FileOutputFormat
- writeToSocket
- addSink
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/