Akka Streamをいい感じに停止させる
Akka Streamを使ったサーバーを書いていて、サーバーの終了処理を行う時にstreamを停止させる必要が出てきました。どうしたら良いでしょうか。
いきなり ActorSystem#terminate
とりあえずAkka Stream自体の終了のさせ方。これは無理やり ActorSystem#terminate
を呼んでも止まることは止まります。
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val result = src.runWith(sink) Thread.sleep(1000) system.terminate() logger.info("result:" + Await.result(result, Duration.Inf)))
ですが、以下のようなエラーが出てしまいます。ダメそうです。
Exception in thread "main" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@783971c3] terminated abruptly, caused by for example materializer or actor system termination.
KillSwitch
というわけで適当にググると https://stackoverflow.com/a/38326082 というStackOverflowの回答がすぐに出てきます。どうやらKillSwitchというのを使えば良いようです。(Konrad さんの回答なので安心)ちなみにちゃんとAkkaのドキュメントにも載っています。
こんな感じ
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() Thread.sleep(1000) killSwitch.shutdown() system.terminate() logger.info("result: " + Await.result(result, Duration.Inf))
KillSwitchでまずストリームを終了させ、それからActorSystemを終了させればエラーは起きません。
JVMのshutdown hookを使う
さて、サーバーの終了時にAkka Streamを停止させたいので Runtime.getRuntime.addShutdownHook
を使うことにしました。
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() Runtime.getRuntime.addShutdownHook(new Thread(() => { killSwitch.shutdown() logger.info("result: " + Await.result(result, Duration.Inf)) system.terminate() }))
一見良さそうですが、これはエラーになることがあります。
Exception in thread "Thread-1" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@773caeb2] terminated abruptly, caused by for example materializer or actor system termination.
なぜ?
CoordinatedShutdown
不思議に思ってActorSystemのコードを読んでいたら、ActorSystemのインスタンスが作られる時には、JVMのshutdown hookにActorSystemの終了処理が追加されていることを知りました。これが原因でActorSystemに依存するような処理、例えばAkka StreamをJVMのshutdown hookで行おうとしても、タイミングによっては先にActorSystemが終了しうまくいかないようです。
ActorSystemのshutdown hookはCoordinatedShutdownというクラスで登録されています。そしてActorSystemに依存する処理のShutdownもCoordinatedShutdownを使うことで安全に行えます。次のコードがその例で、ActorSystemが終了する前(CoordinatedShtudown.PhaseBeforeActorSystemTerminateという値で指定している)にAkka Streamの終了処理を行っています。
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() CoordinatedShutdown(system).addTask( CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") { () => killSwitch.shutdown() logger.info("result: " + Await.result(result, Duration.Inf)) // ActorSystemはCoordinatedShutdownを使っていれば勝手に終了する // system.terminate() Future.successful(akka.Done) }
ちなみにCoordinatedShutdownによる終了処理はJVMのshutdown hookに任せずに自分で呼ぶことも可能です。
CoordinatedShutdown(actorSystem) .run(CoordinatedShutdown.JvmExitReason)
CoordinatedShutdownは主にakka-clusterのための機能のようで、不要であれば使わないように設定することもできます。