flink

Stream WordCount 예제

wefree 2021. 5. 15. 23:57

문제

localhost:9999 로 부터 socket 데이터를 읽어 stream word count 를 화면에 출력 한다.

 

Input(localhost:9999)

~$ netcat -l 9999
Noman
Joyce
Noman
Isidore
Nipun
Rebekah
Nipun

 

Expected Output

WordCount(Noman,1)
WordCount(Noman,2)
WordCount(Nipun,1)
WordCount(Nipun,2)

 

Code

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCountExample {
  case class WordCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val text: DataStream[String]         = env.socketTextStream("localhost", 9999)
    val filtered                         = text.filter(_.startsWith("N"))
    val tokenized: DataStream[WordCount] = filtered.map(x => WordCount(x, 1))
    val counts: DataStream[WordCount]    = tokenized.keyBy("word").sum("count")

    counts.print()
    env.execute("Stream WordCount")
  }
}

 

실행

  1. netcat -l 9999
  2. Code 실행 하면 아래와 같은 메시지를 터미널에서 볼 수 있다.
    예) Web frontend listening at http://localhost:59364
  3. netcat 이 실행된 터미널에서 Input 내용 입력
  4. Code 실행 터미널에서 Output 내용 확인
  5. (또는)  http://localhost:59364 접속해 flink 관리페이지의 Task Managers > Stdout 에서 Output 확인

 

설명

Data Sources for Stream API

  • readTextFile(path)
  • readFile(fileInputFormat, path)
  • readFile(fileInputFormat, path, watchType, interval, pathFilter): 디렉토리 내에 새로 추가된 데이터를 자동으로 읽기 가능
  • socketTextStream
  • addSource

Data Sinks for Stream API

  • writeAsText / TextOutputFormat
  • writeAsCsv / CsvOutputFormat
  • print()
  • writeUsingOutputFormat / FileOutputFormat
  • writeToSocket
  • addSink

 

참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/