-
Stream WordCount 예제flink 2021. 5. 15. 23:57
문제
localhost:9999 로 부터 socket 데이터를 읽어 stream word count 를 화면에 출력 한다.
Input(localhost:9999)
~$ netcat -l 9999 Noman Joyce Noman Isidore Nipun Rebekah Nipun
Expected Output
WordCount(Noman,1) WordCount(Noman,2) WordCount(Nipun,1) WordCount(Nipun,2)
Code
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._ object StreamWordCountExample { case class WordCount(word: String, count: Int) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val params = ParameterTool.fromArgs(args) env.getConfig.setGlobalJobParameters(params) val text: DataStream[String] = env.socketTextStream("localhost", 9999) val filtered = text.filter(_.startsWith("N")) val tokenized: DataStream[WordCount] = filtered.map(x => WordCount(x, 1)) val counts: DataStream[WordCount] = tokenized.keyBy("word").sum("count") counts.print() env.execute("Stream WordCount") } }
실행
- netcat -l 9999
- Code 실행 하면 아래와 같은 메시지를 터미널에서 볼 수 있다.
예) Web frontend listening at http://localhost:59364 - netcat 이 실행된 터미널에서 Input 내용 입력
- Code 실행 터미널에서 Output 내용 확인
- (또는) http://localhost:59364에 접속해 flink 관리페이지의 Task Managers > Stdout 에서 Output 확인
설명
Data Sources for Stream API
- readTextFile(path)
- readFile(fileInputFormat, path)
- readFile(fileInputFormat, path, watchType, interval, pathFilter): 디렉토리 내에 새로 추가된 데이터를 자동으로 읽기 가능
- socketTextStream
- addSource
Data Sinks for Stream API
- writeAsText / TextOutputFormat
- writeAsCsv / CsvOutputFormat
- print()
- writeUsingOutputFormat / FileOutputFormat
- writeToSocket
- addSink
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
Iterate 예제 (0) 2021.05.17 Stream Split 예제 (0) 2021.05.17 Stream Reduce, Min, MinBy, Max, MaxBy 예제 (0) 2021.05.16 DataSet Join 예제 (0) 2021.05.15 DataSet WordCount 예제 (0) 2021.05.15