Akka Streamをいい感じに停止させる

Akka Streamを使ったサーバーを書いていて、サーバーの終了処理を行う時にstreamを停止させる必要が出てきました。どうしたら良いでしょうか。

いきなり ActorSystem#terminate

とりあえずAkka Stream自体の終了のさせ方。これは無理やり ActorSystem#terminate を呼んでも止まることは止まります。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val src: Source[Int, NotUsed] = 
  Source.fromIterator[Int](() => Iterator.continually(1))
val sink: Sink[Int, Future[Int]] =
  Sink.fold(0)(_ + _)
val result = src.runWith(sink)

Thread.sleep(1000)

system.terminate()

logger.info("result:" + Await.result(result, Duration.Inf)))

ですが、以下のようなエラーが出てしまいます。ダメそうです。

Exception in thread "main" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@783971c3] terminated abruptly, caused by for example materializer or actor system termination.

KillSwitch

というわけで適当にググるhttps://stackoverflow.com/a/38326082 というStackOverflowの回答がすぐに出てきます。どうやらKillSwitchというのを使えば良いようです。(Konrad さんの回答なので安心)ちなみにちゃんとAkkaのドキュメントにも載っています。

こんな感じ

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1))
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

val (killSwitch, result) = src
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(sink)(Keep.both)
  .run()

Thread.sleep(1000)

killSwitch.shutdown()

system.terminate()

logger.info("result: " + Await.result(result, Duration.Inf))      

KillSwitchでまずストリームを終了させ、それからActorSystemを終了させればエラーは起きません。

JVMのshutdown hookを使う

さて、サーバーの終了時にAkka Streamを停止させたいので Runtime.getRuntime.addShutdownHook を使うことにしました。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1))
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

val (killSwitch, result) = src
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(sink)(Keep.both)
  .run()

Runtime.getRuntime.addShutdownHook(new Thread(() => {
  killSwitch.shutdown()
  logger.info("result: " + Await.result(result, Duration.Inf))
  system.terminate()
}))

一見良さそうですが、これはエラーになることがあります。

Exception in thread "Thread-1" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@773caeb2] terminated abruptly, caused by for example materializer or actor system termination.

なぜ?

CoordinatedShutdown

不思議に思ってActorSystemのコードを読んでいたら、ActorSystemのインスタンスが作られる時には、JVMのshutdown hookにActorSystemの終了処理が追加されていることを知りました。これが原因でActorSystemに依存するような処理、例えばAkka StreamをJVMのshutdown hookで行おうとしても、タイミングによっては先にActorSystemが終了しうまくいかないようです。

ActorSystemのshutdown hookはCoordinatedShutdownというクラスで登録されています。そしてActorSystemに依存する処理のShutdownもCoordinatedShutdownを使うことで安全に行えます。次のコードがその例で、ActorSystemが終了する前(CoordinatedShtudown.PhaseBeforeActorSystemTerminateという値で指定している)にAkka Streamの終了処理を行っています。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1))
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

val (killSwitch, result) = src
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(sink)(Keep.both)
  .run()

CoordinatedShutdown(system).addTask(
  CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") {
    () =>
      killSwitch.shutdown()
      logger.info("result: " + Await.result(result, Duration.Inf))
      // ActorSystemはCoordinatedShutdownを使っていれば勝手に終了する
      // system.terminate()
      Future.successful(akka.Done)
}

ちなみにCoordinatedShutdownによる終了処理はJVMのshutdown hookに任せずに自分で呼ぶことも可能です。

CoordinatedShutdown(actorSystem)
  .run(CoordinatedShutdown.JvmExitReason)

CoordinatedShutdownは主にakka-clusterのための機能のようで、不要であれば使わないように設定することもできます。

https://doc.akka.io/docs/akka/2.5.11/project/migration-guide-2.4.x-2.5.x.html?language=scala#coordinated-shutdown