ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Stream WordCount 예제
    flink 2021. 5. 15. 23:57

    문제

    localhost:9999 로 부터 socket 데이터를 읽어 stream word count 를 화면에 출력 한다.

     

    Input(localhost:9999)

    ~$ netcat -l 9999
    Noman
    Joyce
    Noman
    Isidore
    Nipun
    Rebekah
    Nipun

     

    Expected Output

    WordCount(Noman,1)
    WordCount(Noman,2)
    WordCount(Nipun,1)
    WordCount(Nipun,2)

     

    Code

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._
    
    object StreamWordCountExample {
      case class WordCount(word: String, count: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val params = ParameterTool.fromArgs(args)
        env.getConfig.setGlobalJobParameters(params)
    
        val text: DataStream[String]         = env.socketTextStream("localhost", 9999)
        val filtered                         = text.filter(_.startsWith("N"))
        val tokenized: DataStream[WordCount] = filtered.map(x => WordCount(x, 1))
        val counts: DataStream[WordCount]    = tokenized.keyBy("word").sum("count")
    
        counts.print()
        env.execute("Stream WordCount")
      }
    }
    

     

    실행

    1. netcat -l 9999
    2. Code 실행 하면 아래와 같은 메시지를 터미널에서 볼 수 있다.
      예) Web frontend listening at http://localhost:59364
    3. netcat 이 실행된 터미널에서 Input 내용 입력
    4. Code 실행 터미널에서 Output 내용 확인
    5. (또는)  http://localhost:59364 접속해 flink 관리페이지의 Task Managers > Stdout 에서 Output 확인

     

    설명

    Data Sources for Stream API

    • readTextFile(path)
    • readFile(fileInputFormat, path)
    • readFile(fileInputFormat, path, watchType, interval, pathFilter): 디렉토리 내에 새로 추가된 데이터를 자동으로 읽기 가능
    • socketTextStream
    • addSource

    Data Sinks for Stream API

    • writeAsText / TextOutputFormat
    • writeAsCsv / CsvOutputFormat
    • print()
    • writeUsingOutputFormat / FileOutputFormat
    • writeToSocket
    • addSink

     

    참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/

    'flink' 카테고리의 다른 글

    Iterate 예제  (0) 2021.05.17
    Stream Split 예제  (0) 2021.05.17
    Stream Reduce, Min, MinBy, Max, MaxBy 예제  (0) 2021.05.16
    DataSet Join 예제  (0) 2021.05.15
    DataSet WordCount 예제  (0) 2021.05.15

    댓글

Designed by Tistory.