マイクロアドのサーバサイドエンジニアの松宮です。本日は技術Tips記事ということで、Akka Streamsで例外を扱う時の注意点についてのお話です。
Akka Streamsはフローを分岐した複雑なグラフを作る事が簡単に出来ます。また、例外についても捕捉する手段はいくつか用意されており、エラーにも柔軟な対応が出来ます。しかし、非線形なグラフで例外を扱う時は注意が必要です。
線形なグラフにおける例外処理
まず、シンプルな一直線のストリーム処理から見ていきます。下記のコードは1~10の値を順番にprintするというストリーム処理です。
implicit val system = ActorSystem() implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() val stream = Source(1 to 10) .via(Flow[Int].map(a => if (a == 5) throw new Exception("5 is invalid") else a)) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) .runForeach(println) stream.onComplete { _ => println("Done!") system.terminate() }
ポイントは5の入力を受けた時だけ例外を発生させるという点です。 Akka Streamsではストリーム処理中に例外が発生すると「スーパービジョン戦略(Supervision Strategies)」に従って例外がハンドリングされるようになっています。 スーパービジョン戦略は下記の3パターンがあり、本稿ではResumeの戦略に着目して話を進めたいと思います。
- Stop: ストリームを停止
- Resume: 例外が発生した要素を除外して処理を続行
- Restart : ストリームを再起動して、処理を続行
掲載したストリーム処理を実行すると「5」の要素を処理する際に例外が発生するため、スーパービジョン戦略に従って「5」は捨てられ処理は続行します。その実行結果は以下のようになります。
- 実行結果
1 2 3 4 6 7 8 9 10 Done!
非線形なグラフで生じる問題
では、次に非線形なグラフでも同様に見て行きます。
implicit val system = ActorSystem() implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() val stream = Source(1 to 10) .via(graph) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) .runForeach(println) // 異常なフロー. 5が来ると例外を発生させるがTryで包み込み例外を伝えない private def dangerFlow: Flow[Int, String, NotUsed] = { Flow[Int].map(a => if (a == 5) throw new Exception("5 is invalid") else a.toString) } private def safeFlow: Flow[Int, String, NotUsed] = Flow[Int].map(s => "hello" + s) def graph = Flow.fromGraph(GraphDSL.create() { implicit b => import akka.stream.scaladsl.GraphDSL.Implicits._ val bcast = b.add(Broadcast[Int](2)) val zip = b.add(Zip[String, String]) bcast.out(0) ~> dangerFlow ~> zip.in0 bcast.out(1) ~> safeFlow ~> zip.in1 FlowShape(bcast.in, zip.out) }) stream.onComplete { _ => println("Done!") system.terminate() }
上記は1~10の要素をsafeFlowとdangerFlowに分岐させ、それぞれのフローで処理した後、Zipで1つの値にまとめるという非線形なストリーム処理です。 dangerFlowは例外を投げてしまいますが、先程と同じようにスーパビジョン戦略は「Resume」にしているので問題なく処理されると想定できます。では実行してみましょう。
- 実行結果
(1,hello1) (2,hello2) (3,hello3) (4,hello4)
なんと4の要素を処理して以降、ストリーム処理が詰まってしまいました。
後続のフローは例外を知らない
「5」だけスキップして処理を続行して欲しい所ですが、そう出来ない理由は後続のフローは例外の発生を感知できないためです。
dangerFlowが例外を投げスーパービジョン戦略によって処理を続行しようとしますが、Zip
はdangerFlowで例外が発生した事を知らないため、dangerFlowからの入力を永遠に待ち続けてしまいます。その結果、Broadcast
も全てのポートが利用可能にならないと新しい要素をemitできないためストリームが詰まってしまいます。(タイムアウトで制御する機能もありますが。タイムアウトすると処理の続行はできない)
解決方法
そこで、非線形なグラフで例外を扱うには、Try
等でラップし、例外を伝播させないようにします。
割と元も子もない解決策ですが、最もシンプルな解決策かと思います。
詳しくはAkka Streams pitfalls to avoid — part 2に掲載されています。
val stream = Source(1 to 10) .via(graph) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) .runForeach(println) // 異常なフロー. 5が来ると例外を発生させるがTryで包み込み例外を伝えない private def dangerFlow: Flow[Int, Try[String], NotUsed] = { Flow[Int].map(a => if (a == 5) Failure(new Exception("5 is invalid")) else Try(a.toString)) } private def safeFlow: Flow[Int, String, NotUsed] = Flow[Int].map(s => "hello" + s) def graph = Flow.fromGraph(GraphDSL.create() { implicit b => import akka.stream.scaladsl.GraphDSL.Implicits._ val bcast = b.add(Broadcast[Int](2)) val zip = b.add(Zip[Try[String], String]) bcast.out(0) ~> dangerFlow ~> zip.in0 bcast.out(1) ~> safeFlow ~> zip.in1 FlowShape(bcast.in, zip.out) }) stream.onComplete { _ => println("Done!") system.terminate() }
dangerFlowの例外をTry
でラップしたことで、例外が発生しても後続のフローへ何かしらの値が出力されるのでZip
が永遠に待つ事はなくなりました。
- 実行結果
(Success(1),hello1) (Success(2),hello2) (Success(3),hello3) (Success(4),hello4) (Failure(java.lang.Exception: 5 is invalid),hello5) (Success(6),hello6) (Success(7),hello7) (Success(8),hello8) (Success(9),hello9) (Success(10),hello10) Done!
めでたし!めでたし!
おわりに
Akka Streamsを使うことで、複雑なストリーム処理が格段に書きやすくなるメリットがある一方、まだまだ文献も少なく思わぬところで詰まったりするので今回の記事を通して誰かが救われれば幸いです。