  • CheckPoint Example
    flink 2021. 5. 19. 22:42

    Problem

    Apply checkpointing to the streaming WordCount example.

     

    Code

    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala._
    
    object CheckPointExample {
      case class WordCount(word: String, count: Int)
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val params = ParameterTool.fromArgs(args)
        env.getConfig.setGlobalJobParameters(params)
    
        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000)
    
        // leave at least 500 ms between the end of one checkpoint and the start of the next
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    
        // a checkpoint must complete within 10000 ms, otherwise it is discarded
        env.getCheckpointConfig.setCheckpointTimeout(10000)
    
        // exactly-once is the default mode; CheckpointingMode.AT_LEAST_ONCE is the alternative
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    
        // allow only one checkpoint to be in progress at a time
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    
        // keep externalized checkpoints after the job is cancelled
        // (ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION deletes them instead)
        env.getCheckpointConfig.enableExternalizedCheckpoints(
          ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        )
    
        // on failure, restart up to 3 times, waiting 100 ms before each attempt
        env.setRestartStrategy(
          RestartStrategies.fixedDelayRestart(3, 100)
        )
    
        // read text lines from a socket on localhost:9999
        val text: DataStream[String] = env.socketTextStream("localhost", 9999)
        // keep only lines that start with "N"
        val filtered = text.filter(_.startsWith("N"))
        // treat each whole line as one word with an initial count of 1
        val tokenized: DataStream[WordCount] = filtered.map(x => WordCount(x, 1))
        // running count per word, kept in keyed state that the checkpoints snapshot
        val counts: DataStream[WordCount] = tokenized.keyBy(_.word).sum("count")
    
        counts.print()
        env.execute("CheckPoint Example")
      }
    }
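To see how the four checkpoint settings above interact, here is a simplified plain-Scala model (no Flink dependency) of when checkpoints start. It is a sketch written for this post, not Flink's actual scheduler: it assumes at most one checkpoint in flight, measures time from the first checkpoint, uses hypothetical checkpoint durations, and treats a checkpoint that exceeds the timeout as freeing its slot at the timeout (an assumption).

```scala
// Simplified model of checkpoint start times (an assumption, not Flink's scheduler),
// using the example's settings: interval 1000 ms, min pause 500 ms,
// timeout 10000 ms, at most one checkpoint in progress.
object CheckpointScheduleModel {
  val intervalMs = 1000L
  val minPauseMs = 500L
  val timeoutMs = 10000L

  // durationsMs: hypothetical time each checkpoint needs to finish.
  // Returns (startMs, completedWithinTimeout) per checkpoint; the first starts at 0.
  def schedule(durationsMs: Seq[Long]): Seq[(Long, Boolean)] = {
    val (_, result) = durationsMs.foldLeft((0L, Vector.empty[(Long, Boolean)])) {
      case ((start, acc), duration) =>
        val completed = duration <= timeoutMs
        // Assumption: a checkpoint exceeding the timeout is discarded at the timeout.
        val end = start + math.min(duration, timeoutMs)
        // Next start: no earlier than the regular interval and no earlier than end + min pause.
        val nextStart = math.max(start + intervalMs, end + minPauseMs)
        (nextStart, acc :+ ((start, completed)))
    }
    result
  }

  def main(args: Array[String]): Unit =
    schedule(Seq(200L, 800L, 300L, 12000L, 100L)).foreach(println)
}
```

With durations of 200, 800 and 300 ms, the model starts checkpoints at 0, 1000 and 2300 ms: the 800 ms checkpoint ends at 1800 ms, so the 500 ms minimum pause pushes the next one past the regular 2000 ms mark.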
    

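The job's running counts live in keyed state, and that state is what each checkpoint snapshots and what is restored after a restart. As an illustration only, here is a plain-Scala model (no Flink) of what the filter, map, keyBy and sum steps compute; the mutable map standing in for keyed state is an assumption of this sketch. Note that the example maps each whole line, not each word, to WordCount(line, 1).

```scala
// Plain-Scala model (no Flink) of the example's dataflow:
// filter -> map to WordCount(line, 1) -> keyBy(word) -> running sum of count.
object RunningCountModel {
  case class WordCount(word: String, count: Int)

  def runningCounts(lines: Seq[String]): Seq[WordCount] = {
    // Stand-in for Flink's keyed state: current count per key.
    val state = scala.collection.mutable.Map.empty[String, Int]
    lines
      .filter(_.startsWith("N"))       // keep lines starting with "N"
      .map(line => WordCount(line, 1)) // the whole line is the key
      .map { wc =>
        // sum("count") emits the updated running total for the key on every record
        val total = state.getOrElse(wc.word, 0) + wc.count
        state(wc.word) = total
        WordCount(wc.word, total)
      }
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("Nice", "hello", "Nice", "Night")).foreach(println)
}
```

Because socketTextStream cannot replay data, a restart restores the counts from the last completed checkpoint, but lines received after that checkpoint are not read again.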
     

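    Explanation

    Flink provides the following RestartStrategies:

    • RestartStrategies.fixedDelayRestart: restarts the job up to a fixed number of times, waiting a fixed delay before each attempt, then fails it
    • RestartStrategies.failureRateRestart: restarts the job after a delay, but fails it once too many failures occur within a given time interval
    • RestartStrategies.noRestart: fails the job immediately, without restarting
    • RestartStrategies.fallBackRestart: uses the restart strategy defined in the cluster configuration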
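As a rough illustration of when each strategy gives up, here is a plain-Scala model written for this post, not Flink's implementation. Given the times (ms) at which failures happen, it returns whether each failure would be followed by a restart. The delay arguments are ignored, and the failure-rate rule (give up once `rate` failures, counting the current one, fall within `intervalMs`) is a simplification.

```scala
// Simplified model (not Flink's code) of the restart-or-fail decision per failure.
// Input: failure timestamps in ms, ascending. Output: true = restart, false = job fails.
// In a real job, decisions after the first `false` never happen.
object RestartStrategyModel {
  // fixedDelayRestart(attempts, delay): the first `attempts` failures are restarted.
  def fixedDelay(attempts: Int)(failureTimesMs: Seq[Long]): Seq[Boolean] =
    failureTimesMs.indices.map(i => i < attempts)

  // failureRateRestart(rate, interval, delay): fail once `rate` failures,
  // counting the current one, fall within the last `intervalMs`.
  def failureRate(rate: Int, intervalMs: Long)(failureTimesMs: Seq[Long]): Seq[Boolean] =
    failureTimesMs.indices.map { i =>
      val now = failureTimesMs(i)
      val recent = failureTimesMs.take(i + 1).count(t => now - t <= intervalMs)
      recent < rate
    }

  // noRestart: every failure is fatal.
  def noRestart(failureTimesMs: Seq[Long]): Seq[Boolean] =
    failureTimesMs.map(_ => false)

  def main(args: Array[String]): Unit = {
    val failures = Seq(0L, 2000L, 4000L, 6000L)
    println(fixedDelay(3)(failures))
    println(failureRate(3, 1000L)(failures))
  }
}
```

With fixedDelayRestart(3, 100) as in the example, the fourth failure ends the job. Under the failure-rate model, failures spread 2 seconds apart keep being restarted, while three failures within one second stop the job.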

     

    Reference: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
