-
fibonacci 수열 구현하기akka & pekko/streams 2023. 7. 26. 18:23
akka/pekko streams 를 이용해 fibonacci 수열을 출력한다.
cyclic graph 를 구성하게 되는데, 이때 deadlock 이 발생할 수 있다. (특히 Merge 할 때)
이를 대응하기 위해, Merge 대신에 MergePreferred 를 사용했다.
import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, RunnableGraph, Sink, Source, Zip} import org.apache.pekko.stream.{ClosedShape, UniformFanInShape} import scala.concurrent.ExecutionContextExecutor object PekkoFibonacci { def main(args: Array[String]): Unit = { implicit val system: ActorSystem = ActorSystem("pekko") implicit val ec: ExecutionContextExecutor = system.dispatcher val graph = GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val zip = builder.add(Zip[BigInt, BigInt]) val mergePreferred = builder.add(MergePreferred[(BigInt, BigInt)](1)) val fiboLogic = Flow[(BigInt, BigInt)].map { pair => val last = pair._1 val previous = pair._2 Thread.sleep(500) (last + previous, last) } val broadcast = builder.add(Broadcast[(BigInt, BigInt)](2)) val extractLast = builder.add(Flow[(BigInt, BigInt)].map(_._1)) zip.out ~> mergePreferred ~> fiboLogic ~> broadcast ~> extractLast mergePreferred <~ broadcast UniformFanInShape(extractLast.out, zip.in0, zip.in1) } val fiboGraph = RunnableGraph.fromGraph( GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val zero = builder.add(Source.single[BigInt](0)) val one = builder.add(Source.single[BigInt](1)) val fibo = builder.add(graph) val sink = Sink.foreach[BigInt](println) zero ~> fibo.in(0) one ~> fibo.in(1) fibo.out ~> sink ClosedShape } ) fiboGraph.run() } }
'akka & pekko > streams' 카테고리의 다른 글
stream 처리 완료 후 ActorSystem 종료하기 (0) 2023.07.22