-
CheckPoint 예제 flink 2021. 5. 19. 22:42
문제
Stream WordCount 예제에 CheckPoint 를 적용해 본다.
Code
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

/** Streaming count example with checkpointing and a restart strategy enabled.
  *
  * Reads text lines from a socket on `localhost:9999`, keeps only the lines
  * that start with `"N"`, and prints a running count per distinct line.
  */
object CheckPointExample {

  /** A counted record.
    *
    * @param word  the key being counted (here: a whole input line)
    * @param count the number of occurrences carried by this record
    */
  case class WordCount(word: String, count: Int)

  /** Configures checkpointing, builds the pipeline and runs the job.
    *
    * @param args command-line arguments; parsed with `ParameterTool` and
    *             registered as global job parameters
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)

    val checkpointConfig = env.getCheckpointConfig
    // minimum pause that must elapse between two checkpoints
    checkpointConfig.setMinPauseBetweenCheckpoints(500)
    // checkpoints have to complete within 10000 ms, or are discarded
    checkpointConfig.setCheckpointTimeout(10000)
    // set mode to exactly-once (this is the default); alternative: AT_LEAST_ONCE
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // allow only one checkpoint to be in progress at the same time
    checkpointConfig.setMaxConcurrentCheckpoints(1)
    // keep externalized checkpoints after job cancellation;
    // alternative: DELETE_ON_CANCELLATION
    checkpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    )

    // arguments: number of restart attempts, delay between restarts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))

    val text: DataStream[String] = env.socketTextStream("localhost", 9999)
    val filtered: DataStream[String] = text.filter(_.startsWith("N"))
    // each surviving line becomes one record; the line is not split into tokens
    val tokenized: DataStream[WordCount] = filtered.map(line => WordCount(line, 1))

    // Key with a selector function instead of the string field name "word":
    // the string-based keyBy overload is deprecated and is not checked by the
    // compiler, whereas `_.word` is.
    val counts: DataStream[WordCount] = tokenized.keyBy(_.word).sum("count")

    counts.print()

    env.execute("CheckPoint Example")
  }
}
설명
RestartStrategies 로는 다음과 같은 것들이 있다.
- RestartStrategies.fixedDelayRestart
- RestartStrategies.failureRateRestart
- RestartStrategies.noRestart
- RestartStrategies.fallBackRestart: The cluster defined restart strategy is used
참고: https://www.udemy.com/course/apache-flink-a-real-time-hands-on-course-on-flink/
'flink' 카테고리의 다른 글
BroadcastState 예제 (0) 2021.05.20 ValueState 예제 (0) 2021.05.19 GlobalWindows 예제 (0) 2021.05.17 TumblingWindow 예제 (0) 2021.05.17 Iterate 예제 (0) 2021.05.17