  • Implementing the Fibonacci sequence
    akka & pekko/streams 2023. 7. 26. 18:23

    This post prints the Fibonacci sequence using akka/pekko streams.

    Doing this requires building a cyclic graph, and a cycle can deadlock (especially at the point where it is merged back in with Merge).

    https://pekko.apache.org/docs/pekko/current//stream/stream-graphs.html#graph-cycles-liveness-and-deadlocks

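    To deal with this, MergePreferred is used instead of Merge. MergePreferred consumes from its preferred input whenever an element is available there, so feeding the cycle back through that port keeps the looped elements flowing.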

     

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, RunnableGraph, Sink, Source, Zip}
    import org.apache.pekko.stream.{ClosedShape, UniformFanInShape}
    
    import scala.concurrent.ExecutionContextExecutor
    
    object PekkoFibonacci {
      def main(args: Array[String]): Unit = {
        implicit val system: ActorSystem = ActorSystem("pekko")
        implicit val ec: ExecutionContextExecutor = system.dispatcher
    
        val graph = GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._
    
          val zip = builder.add(Zip[BigInt, BigInt])
          val mergePreferred = builder.add(MergePreferred[(BigInt, BigInt)](1))
    
          val fiboLogic = Flow[(BigInt, BigInt)].map { pair =>
            val last = pair._1
            val previous = pair._2
    
            // Slow the output down so it is readable. Note that Thread.sleep blocks the stream's thread.
            Thread.sleep(500)
            (last + previous, last)
          }
    
          val broadcast = builder.add(Broadcast[(BigInt, BigInt)](2))
          val extractLast = builder.add(Flow[(BigInt, BigInt)].map(_._1))
    
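          // The seed pair from zip enters on the regular port; the loop feeds back on the preferred port.
          zip.out ~> mergePreferred ~> fiboLogic ~> broadcast ~> extractLast
          mergePreferred.preferred <~ broadcast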
    
          UniformFanInShape(extractLast.out, zip.in0, zip.in1)
        }
    
        val fiboGraph = RunnableGraph.fromGraph(
          GraphDSL.create() { implicit builder =>
            import GraphDSL.Implicits._
    
            val zero = builder.add(Source.single[BigInt](0))
            val one = builder.add(Source.single[BigInt](1))
            val fibo = builder.add(graph)
    
            val sink = Sink.foreach[BigInt](println)
    
            zero ~> fibo.in(0)
            one ~> fibo.in(1)
            fibo.out ~> sink
    
            ClosedShape
          }
        )
    
        fiboGraph.run()
      }
    }
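
    To check what the graph should print, the pair update inside fiboLogic can be run on its own without any streams. The following is only a minimal plain-Scala sketch: FiboRecurrenceSketch, step, and firstValues are hypothetical names introduced here for illustration and are not part of the code above. It assumes the cycle is seeded with (0, 1), as the zero and one sources do.

```scala
// Plain-Scala sketch (no Pekko): the same pair update that fiboLogic applies,
// iterated lazily so the first few outputs can be inspected.
object FiboRecurrenceSketch {
  // Same step as fiboLogic: (last, previous) => (last + previous, last)
  def step(pair: (BigInt, BigInt)): (BigInt, BigInt) = {
    val (last, previous) = pair
    (last + previous, last)
  }

  // The zip stage seeds the cycle with (0, 1). The sink only sees values
  // produced after a step, so the seed itself is dropped here.
  def firstValues(n: Int): List[BigInt] =
    Iterator.iterate((BigInt(0), BigInt(1)))(step).drop(1).map(_._1).take(n).toList

  def main(args: Array[String]): Unit =
    println(firstValues(8).mkString(", ")) // prints "1, 1, 2, 3, 5, 8, 13, 21"
}
```

    Because the seed pair passes through fiboLogic before reaching the sink, the printed sequence starts at 1, 1, 2, ... rather than at 0.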
