Akka Streamをいい感じに停止させる
Akka Streamを使ったサーバーを書いていて、サーバーの終了処理を行う時にstreamを停止させる必要が出てきました。どうしたら良いでしょうか。
いきなり ActorSystem#terminate
とりあえずAkka Stream自体の終了のさせ方。これは無理やり ActorSystem#terminate
を呼んでも止まることは止まります。
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val result = src.runWith(sink) Thread.sleep(1000) system.terminate() logger.info("result:" + Await.result(result, Duration.Inf)))
ですが、以下のようなエラーが出てしまいます。ダメそうです。
Exception in thread "main" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@783971c3] terminated abruptly, caused by for example materializer or actor system termination.
KillSwitch
というわけで適当にググると https://stackoverflow.com/a/38326082 というStackOverflowの回答がすぐに出てきます。どうやらKillSwitchというのを使えば良いようです。(Konrad さんの回答なので安心)ちなみにちゃんとAkkaのドキュメントにも載っています。
こんな感じ
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() Thread.sleep(1000) killSwitch.shutdown() system.terminate() logger.info("result: " + Await.result(result, Duration.Inf))
KillSwitchでまずストリームを終了させ、それからActorSystemを終了させればエラーは起きません。
JVMのshutdown hookを使う
さて、サーバーの終了時にAkka Streamを停止させたいので Runtime.getRuntime.addShutdownHook
を使うことにしました。
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() Runtime.getRuntime.addShutdownHook(new Thread(() => { killSwitch.shutdown() logger.info("result: " + Await.result(result, Duration.Inf)) system.terminate() }))
一見良さそうですが、これはエラーになることがあります。
Exception in thread "Thread-1" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@773caeb2] terminated abruptly, caused by for example materializer or actor system termination.
なぜ?
CoordinatedShutdown
不思議に思ってActorSystemのコードを読んでいたら、ActorSystemのインスタンスが作られる時には、JVMのshutdown hookにActorSystemの終了処理が追加されていることを知りました。これが原因でActorSystemに依存するような処理、例えばAkka StreamをJVMのshutdown hookで行おうとしても、タイミングによっては先にActorSystemが終了しうまくいかないようです。
ActorSystemのshutdown hookはCoordinatedShutdownというクラスで登録されています。そしてActorSystemに依存する処理のShutdownもCoordinatedShutdownを使うことで安全に行えます。次のコードがその例で、ActorSystemが終了する前(CoordinatedShtudown.PhaseBeforeActorSystemTerminateという値で指定している)にAkka Streamの終了処理を行っています。
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() CoordinatedShutdown(system).addTask( CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") { () => killSwitch.shutdown() logger.info("result: " + Await.result(result, Duration.Inf)) // ActorSystemはCoordinatedShutdownを使っていれば勝手に終了する // system.terminate() Future.successful(akka.Done) }
ちなみにCoordinatedShutdownによる終了処理はJVMのshutdown hookに任せずに自分で呼ぶことも可能です。
CoordinatedShutdown(actorSystem) .run(CoordinatedShutdown.JvmExitReason)
CoordinatedShutdownは主にakka-clusterのための機能のようで、不要であれば使わないように設定することもできます。
slick2とslick3を同居させる
slick2からslick3への移行はAPIが根本的に変わったためにめちゃくちゃ大変です。blocking-slick を使った方法なども知られていますが、ちょっと使って良いものかは悩みます。
blocking-slickを使っても使わなくてもslick2からslick3への書き換えを一度にやるのは大きなプロジェクトだとかなりのパワーが必要です。slick2とslick3を1つのプロジェクト内で同居させ、徐々にslick3に移行させていくことはできないでしょうか。
通常、同一ライブラリの2つのバージョンがプロジェクト内に同居するということはありえません。ところがslick2とslick3の場合、package名が
- slick2: scala.slick.*
- slick3: slick.*
と変わっているのでslick2とslick3の両方を依存関係に加えることができればできるはずです。
slick2とslick3の両方を依存関係に加える
単純に考えると次のようなことがしたいのですが、これはできません。
libraryDependencies ++= Seq( "com.typesafe.slick" %% "slick" % "2.1.0", "com.typesafe.slick" %% "slick" % "3.2.1" )
同一ライブラリの別バージョンがあった場合、sbtはバージョンが新しいものを優先して依存関係に追加します。そのためslick2は読み込んでくれません。sbtのバージョンによっては次のようなメッセージが出ているでしょう。
[warn] There may be incompatibilities among your library dependencies. [warn] Here are some of the libraries that were evicted: [warn] * com.typesafe.slick:slick_2.11:2.1.0 -> 3.2.1
sbtに依存関係を管理させるとslick2とslick3を同居させることはできなそうです。幸い、sbtはlibというディレクトリにjarを追加することでそのjarを読み込んでくれますから、slick3のjarを自分で用意してlibに入れます。
バイナリ互換性について
slick2はtypesafe/configの1.2に依存しているけどslick3はtypesafe/configの1.3に依存している。でもtypesafe/configの1.2と1.3はバイナリ互換なようでセーフ。
コード例
というわけで次のコードはslick2とslick3を同居させた例です。
package com.example import scala.concurrent.ExecutionContext.Implicits.global object Slick3 { import _root_.slick.jdbc.H2Profile.api._ val db = Database.forConfig("h2mem1") class Suppliers(tag: Tag) extends Table[(Int, String, String, String, String, String)](tag, "SUPPLIERS") { def id = column[Int]("SUP_ID", O.PrimaryKey) // This is the primary key column def name = column[String]("SUP_NAME") def street = column[String]("STREET") def city = column[String]("CITY") def state = column[String]("STATE") def zip = column[String]("ZIP") // Every table needs a * projection with the same type as the table's type parameter def * = (id, name, street, city, state, zip) } val suppliers = TableQuery[Suppliers] // Definition of the COFFEES table class Coffees(tag: Tag) extends Table[(String, Int, Double, Int, Int)](tag, "COFFEES") { def name = column[String]("COF_NAME", O.PrimaryKey) def supID = column[Int]("SUP_ID") def price = column[Double]("PRICE") def sales = column[Int]("SALES") def total = column[Int]("TOTAL") def * = (name, supID, price, sales, total) // A reified foreign key relation that can be navigated to create a join def supplier = foreignKey("SUP_FK", supID, suppliers)(_.id) } val coffees = TableQuery[Coffees] def setup() = { val action = DBIO.seq( // Create the tables, including primary and foreign keys (suppliers.schema ++ coffees.schema).create, // Insert some suppliers suppliers += (101, "Acme, Inc.", "99 Market Street", "Groundsville", "CA", "95199"), suppliers += ( 49, "Superior Coffee", "1 Party Place", "Mendocino", "CA", "95460"), suppliers += (150, "The High Ground", "100 Coffee Lane", "Meadows", "CA", "93966"), // Equivalent SQL code: // insert into SUPPLIERS(SUP_ID, SUP_NAME, STREET, CITY, STATE, ZIP) values (?,?,?,?,?,?) // Insert some coffees (using JDBC's batch insert feature, if supported by the DB) coffees ++= Seq( ("Colombian", 101, 7.99, 0, 0), ("French_Roast", 49, 8.99, 0, 0), ("Espresso", 150, 9.99, 0, 0), ("Colombian_Decaf", 101, 8.99, 0, 0), ("French_Roast_Decaf", 49, 9.99, 0, 0) ) // Equivalent SQL code: // insert into COFFEES(COF_NAME, SUP_ID, PRICE, SALES, TOTAL) values (?,?,?,?,?) ) db.run(action) } def run(): Unit = { println("Coffees:") for { _ <- setup() result <- db.run(coffees.result) } { result.foreach { case (name, supID, price, sales, total) => println(" " + name + "\t" + supID + "\t" + price + "\t" + sales + "\t" + total) } } } } object Slick2 { import scala.slick.driver.H2Driver.simple._ val db = Database.forConfig("h2mem2") class Suppliers(tag: Tag) extends Table[(Int, String, String, String, String, String)](tag, "SUPPLIERS") { def id = column[Int]("SUP_ID", O.PrimaryKey) // This is the primary key column def name = column[String]("SUP_NAME") def street = column[String]("STREET") def city = column[String]("CITY") def state = column[String]("STATE") def zip = column[String]("ZIP") // Every table needs a * projection with the same type as the table's type parameter def * = (id, name, street, city, state, zip) } val suppliers = TableQuery[Suppliers] // Definition of the COFFEES table class Coffees(tag: Tag) extends Table[(String, Int, Double, Int, Int)](tag, "COFFEES") { def name = column[String]("COF_NAME", O.PrimaryKey) def supID = column[Int]("SUP_ID") def price = column[Double]("PRICE") def sales = column[Int]("SALES") def total = column[Int]("TOTAL") def * = (name, supID, price, sales, total) // A reified foreign key relation that can be navigated to create a join def supplier = foreignKey("SUP_FK", supID, suppliers)(_.id) } val coffees = TableQuery[Coffees] def setup() = { db.withSession { implicit session => // Create the tables, including primary and foreign keys suppliers.ddl.create coffees.ddl.create // Insert some suppliers suppliers += (101, "Acme, Inc.", "99 Market Street", "Groundsville", "CA", "95199") suppliers += ( 49, "Superior Coffee", "1 Party Place", "Mendocino", "CA", "95460") suppliers += (150, "The High Ground", "100 Coffee Lane", "Meadows", "CA", "93966") // Equivalent SQL code: // insert into SUPPLIERS(SUP_ID, SUP_NAME, STREET, CITY, STATE, ZIP) values (?,?,?,?,?,?) // Insert some coffees (using JDBC's batch insert feature, if supported by the DB) coffees ++= Seq( ("Colombian", 101, 7.99, 0, 0), ("French_Roast", 49, 8.99, 0, 0), ("Espresso", 150, 9.99, 0, 0), ("Colombian_Decaf", 101, 8.99, 0, 0), ("French_Roast_Decaf", 49, 9.99, 0, 0) ) // Equivalent SQL code: // insert into COFFEES(COF_NAME, SUP_ID, PRICE, SALES, TOTAL) values (?,?,?,?,?) } } def run(): Unit = { println("Coffees:") db.withSession { implicit session => coffees.list.foreach { case (name, supID, price, sales, total) => println(" " + name + "\t" + supID + "\t" + price + "\t" + sales + "\t" + total) } } } } object Main { def main(args: Array[String]): Unit = { println("------------- Slick3 --------------") Slick3.setup() Slick3.run() println("------------- Slick2 --------------") Slick2.setup() Slick2.run() } }
slick3のクラスをimportする時は _root_
を頭につけた方が良いと思います。scalaは
import scala.io.Source
が
import io.Source
と書けるように、scalaパッケージがデフォルトでimportされているので、_root_
をつけないとslick3を読み込みたいのかslick2を読み込みたいのか不明瞭になってしまうためです。
というわけでslick2とslick3を同居させることができたんですが、これはこれで新たな地獄を生み出してしまったのかもしれません。
circeについて
混迷極まるScalaのJsonライブラリ事情ですが、最近はcirceというライブラリもメジャーになってきました。ただ個人的にはあまり仕事では採用する気にはならないライブラリです。
catsに依存している
circeの売りとして、cats以外のライブラリに依存していないというものがあるんですが、catsに依存してるという時点ですでに判断が別れるところだと思います。私はscalazやcatsは仕事ではほぼ使ったことがないので依存ライブラリがcatsに依存しているのは結構嫌です。またscalaz派から見ても同じようなライブラリが加わるのは嫌でしょう。
shapelessに依存している
なぜcirceの人気が出てきたかといえば、おそらくgenericの存在ではないでしょうか。
play-jsonでは毎回 implicit val hogeFormat = Json.Format[Hoge]
を書かされるのが辛いですが、circeのgenericを使えばその煩わしさから解放されます。
ところがcirceのgenericってつまりshapelessのgenericなんですよね。従ってcirceのgenericモジュールを使うとshapelessがくっついてきます。shapelessはscalazやcats以上に難易度が高いライブラリだと思います。少なくとも implicit val hogeFormat = Json.Format[Hoge]
と書きたくないばかりにshapelessを使う、というのはやり過ぎではないかと思います。
scalazに依存している
circeにはopticsというモジュールがあります。Jsonの構造をグリグリといじるのに便利なモジュールなんですが、これはMonocleというライブラリを使っています。Monocleはshapelessに加えてscalazを使っています。なんか強い奴らばっかり出てきますね。
まあopticsの機能はよくあるようなアプリケーションでは使わないでしょう。 jsonの構造を変更するとしてもそれは一旦jsonをドメインオブジェクトに変更したのちに、そのドメインオブジェクトを変更し、それをjsonに戻すという手順を踏むでしょう。jsonライブラリ自信がjsonの構造をいじるというのはjson自体が重要な関心事でない限りはあまり必要にならなそうです。
以上難しいライブラリ使いたくないマンのぼやきでした。
Scalaのcase classに副作用のある振る舞いを持たせる時のパターン
不変オブジェクトを使ったプログラムを書く時に便利なcase classですが、case classのメリットを生かしつつ副作用を扱うためにはどのような書き方をするのが良いでしょうか。具体的には外部APIやデータベースへのアクセスをcase classを使ってどう実装するのが良いか考えます。
例として次のような簡単なUserクラスを用意します。
case class User(id: Int, name: String)
Userの名前を変更するメソッドを追加してみます。これはcopyメソッドを使えば簡単です。ここまではサンプルコードでよく目にするような例です。
case class User(id: Int, name: String) { def rename(name: String): User = copy(name = name) }
さて、このユーザー情報をデータベースに保存することにしましょう。保存処理自体はUserRepositoryというクラスを利用します。次の例はあまり良くない例です。
case class User(id: Int, name: String) { def rename(name: String): User = copy(name = name) def save(): Unit = UserRepository.save(this) } object UserRepository { def save(user: User): Unit = ... }
UserRepositoryはシングルトンとして実装しました。簡単ですがUser#saveは副作用を伴うメソッドで、テストをするのにデータベースの管理までが必要になって面倒です。
こういった副作用を分離するために関数型っぽいアプローチを取ることもできますが、ここではJavaっぽくインタフェースで副作用のある処理との結合を切り離し、副作用を制御できるようにします。
trait UserRepository { def save(user: User): Unit } class UserRepositoryImpl extends UserRepository { def save(user: User): Unit = ... } class UserRepositoryMock extends UserRepository { var users: Seq[User] = Seq.empty def save(user: User): Unit = users :+ user }
UserRepositoryをインタフェースとして実装し、Userクラスにはこのインタフェースに対しての実装を書くことにしました。これでテストの時にMockを使えばDBの管理まで考える必要がなくなります。
さて、このUserRepositoryをcase classに組み込むとどうなるでしょうか。
case class User( id: Int, name: String, userRepository: UserRepository) { def rename(name: String): User = copy(name = name) def save(): Unit = userRepository.save(this) }
コンストラクタでUserRepositoryを渡してみました。しかしこれはNGです。case classには自動でequalsメソッドが付いてきますが、そのメソッドは比較対象である2つのcase classのフィールドが同じかを見ています。ということはUserRepositoryが何を持って同値とするかを決めておかないとまずいのです。つまりUserRepositoryのequalsを実装する、という作業が必要ですが正直面倒。そもそもUserRepositoryに同値性を持たせることは必要だろうか?
コンストラクタでは無くメソッドの引数で渡すというのはどうでしょうか。
case class User( id: Int, name: String) { def rename(name: String): User = copy(name = name) def save(userRepository: UserRepository): Unit = userRepository.save(this) }
これはcase classの使い方として間違ってはいないですが、どうせそのUserインスタンスではいつも同じUserRepositoryを使うはずなのに、メソッド引数で毎回指定するというのは正直面倒です。そこでこの引数をimplicitにして引数を省略…とかやると破滅するのでやめましょう。
で、私の結論ですが、case classに副作用を混ぜようとすることを諦めて、補助的なクラスに切り出せば良いと思います。Userの操作なのでUserOpsとかでいいかな。
case class User( id: Int, name: String) { def rename(name: String): User = copy(name = name) } class UserOps(userRepository: userRepository) { def save(user: User): Unit = userRepository.save(user) }
これならcase classのequalsを壊すこともないし、テストもしやすい。主観だけどコンパニオンオブジェクトみたいなものと捉えれば違和感もあまりない。
ついでに、DIコンテナの恩恵を受けるのが簡単というメリットもあります。
case class User( id: Int, name: String) { def rename(name: String): User = copy(name = name) } class UserOps @Inject() (userRepository: userRepository) { def save(user: User): Unit = userRepository.save(user) }
ドメイン駆動設計ではよくデータと振る舞いが一緒にあるべきと言われますが、1つのクラスに押し込める必要はないかと思います。この場合はUser+UserOpsで1つのUserモデルと考えることになります。汎用的で使いやすいパターンではないかと思います。
というわけでタイトルは嘘で「Scalaのcase classに副作用のある振る舞いを持たせないパターン」でした。
typesafe configの設定パスをscalaのコードで表現する
ScalaMatsuriの感想ブログです
ScalaMatsuriでscala.metaの話を2つ聞いて面白そうと思ったので私もやってみました。 typesafe configをscala.metaとscalameta/paradiseのmacro annotationで設定パスを文字列ではなくscalaのコードで表現できるようにしたやつです。
こんな感じで使います。
// src/main/resources/application.conf akka { actor { serializers { akka-containers = "akka.remote.serialization.MessageContainerSerializer" } } }
import com.typesafe.config.ConfigFactory import com.github.tototoshi.configpath.compile @compile("src/main/resources/application.conf") object path object Example { def main(args: Array[String]): Unit = { val config = ConfigFactory.load() val serializer1 = config.getString( path.akka.actor.serializers.`akka-containers`.full) val serializer2 = config.getString( "akka.actor.serializers.akka-containers") assert(serializer1 == serializer2) } }
objectかclassにcompileアノテーションをつけて、typesafe configの設定ファイルパスを渡すとmacroでコードを生成します。 上記のような設定ファイルがあった時、次のコードは
@compile("src/main/resources/application.conf") object path
次のように展開されています。
object path { abstract class ConfigTree(val name: String, val full: String) object `akka` extends ConfigTree("akka", "akka") { object `actor` extends ConfigTree("actor", "akka.actor") { object `serializers` extends ConfigTree("serializers", "akka.actor.serializers") { object `akka-containers` extends ConfigTree("akka-containers", "akka.actor.serializers.akka-containers") } } } }
これで設定の名前を間違えることがなくなりました。すごい。IntelliJがマクロに弱くて赤くなるけどね。IntelliJが追いついて補完が効くようになれば意外と便利かもしれない。
Scalaで#map系メソッドで副作用を起こすとバグるやつ
遅延評価と副作用は相性悪いよねって話です。
Scalaでforeachではなくmapの中で副作用を起こすとたまに評価タイミングによるわかりづらいバグに遭遇することがあります。
次のコードはMap#mapValuesの中でscalikejdbcで書き込みを行おうとするコードです。
一見うまくいくようで、エラーになります。手元では java.sql.SQLException: Connection is null.
というエラーが発生しています。
import scalikejdbc._ val contents = DB.localTx { implicit session => data.mapValues { s => val text = s * 2 sql"insert into test_table(text) values ($text)".update().apply() text } } contents.foreach(println)
mapValuesに渡している関数の処理はcontentsが評価されるまで行われません。つまり contents.foreach(println)
の処理が行われるタイミングでDBへの書き込みを実行するのでその時にはトランザクションがcommitされてしまっているのです。
ちなみにこれはあくまでmapValuesの実装がそうなっているからで、mapでは起きませんでした。そんな実装依存でいいのかって話ですが、そんなこと言うと副作用はダメだ、参照透過なら何も問題ないだろ!と怒られます。怖いですね。
これは .view.force
とやってその場で評価させるとうまくいっちゃいます。
import scalikejdbc._ val contents = DB.localTx { implicit session => data.mapValues { s => val text = s * 2 sql"insert into test_table(text) values ($text)".update().apply() text }.view.force } contents.foreach(println)
いや、なんかひどいですねこれ。.view.force
?意味ないじゃん、と思って消すとバグります。
やっぱり普通に副作用は分けて、mapじゃなくてforeachにしましょう。
val texts = data.mapValues { s => s * 2 } DB.localTx { implicit session => texts.foreach { case (k, v) => sql"insert into test_table(text) values ($v)".update().apply() } }
for式の中のEitherでifを使いたい
scala 2.12ではJava8対応の変更が多く、派手な変更は多くはありませんでした。そんな中でEitherがright-biasedになったのは割とキャッチーなのではないでしょうか。Eitherには今までflatMapなどのメソッドがなかったので、for式の中で使うときなど、いったんRightProjectionに変換する必要があったのですが、それが必要なくなりました。
for { x <- Right(3).right } yield x
これが
for { x <- Right(3) } yield x
こう書けるようになりました。
これは便利ですね。まあrightが取れるだけなんですが、私もよく .right
つけろよとコンパイラに怒られていたので嬉しいです。
さて、これでscalaのEitherが実用的になったという声も聞こえるのですが、実際 .right
が不要になったから実用的かというと疑問で、どちらかというとscalaのEitherで困るのはfor式の中でifが使えないことだと思います。私はwithFilter問題と勝手に呼んでいてRightProjectionにwithFilterがないせいなんですが、この問題は2.12.xでもまだ健在です。
scala> for { | x <- Right(3) | if x % 2 == 1 | y = x + 1 | } yield y <console>:13: error: value withFilter is not a member of scala.util.Right[Nothing,Int] x <- Right(3) ^
ほら、if使えないでしょ。これは困りますね。そこでEitherに限った方法ではないんですが、ちょっとヘルパーを定義します。
def when[E](p: Boolean)(e: => E): Either[E, Unit] = if (p) Right(()) else Left(e)
このwhenはpの条件を満たせばRightを返すのでそれ以下のfor式も実行されます。満たさなければLeftを返してエラーにします。
scala> val result = for { | x <- Right(3) | _ <- when(x % 2 == 1)("error") | y = x + 1 | } yield y result: scala.util.Either[String,Int] = Right(4) scala> val result = for { | x <- Right(3) | _ <- when(x % 2 == 0)("error") | y = x + 1 | } yield y result: scala.util.Either[String,Int] = Left(error)
これでEitherでもifっぽいことをできるようになり、めでたしです。
追記
@gakuzzzzさんにEither.condの存在を教えられた。
def when[E](p: Boolean)(e: => E): Either[E, Unit] =
Either.cond(p, (), e)