scala - Why doesn't an Akka Streams application terminate normally?
I wrote a simple application using the Alpakka Cassandra library:
    package com.abhi

    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ClosedShape}
    import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
    import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
    import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object MyApp extends App {
      implicit val actorSystem = ActorSystem()
      implicit val actorMaterializer = ActorMaterializer()
      implicit val session = Cluster
        .builder
        .addContactPoints(List("localhost") :_*)
        .withPort(9042)
        .withCredentials("foo", "bar")
        .build
        .connect("foobar")

      val stmt = new SimpleStatement("select col1, col2 from foo").setFetchSize(20)
      val source = CassandraSource(stmt)
      val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.getLong(1)))
      val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))

      val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b => s =>
        import GraphDSL.Implicits._
        source.take(10) ~> toFoo ~> s
        ClosedShape
      })

      // let's run the graph
      val future = graph.run()

      import actorSystem.dispatcher
      future.onComplete { _ =>
        session.close()
        Await.result(actorSystem.terminate(), Duration.Inf)
      }
      Await.result(future, Duration.Inf)
      System.exit(0)
    }

    case class Foo(col1: Long, col2: Long)
This application runs as expected and prints 10 rows on the screen.
But after that it hangs. When the System.exit(0) call is executed, it throws an exception:
    Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
But the application still does not stop running. It just hangs.
I don't understand why the application doesn't terminate normally (in fact, it shouldn't need the System.exit(0) call at all).
The only way to exit the application is via Control-C.
This might happen because sbt runs your code in its own JVM instance, so your System.exit would exit sbt's JVM, which sbt prevents, giving the above result.
Did you try setting fork in run := true somewhere in your sbt build?
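For reference, a minimal sketch of where that setting could go, assuming a single-project build with a build.sbt at the project root. The slash syntax assumes sbt 1.1 or later; the older form is the one quoted above.

```scala
// build.sbt
// Run the application in a separate (forked) JVM, so that System.exit
// ends the application's JVM rather than being trapped by sbt.
run / fork := true

// Equivalent older syntax:
// fork in run := true
```

After changing the build, reload sbt (or restart it) before calling run again.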
I'm also not sure it is a good idea to use actorSystem.dispatcher to execute the onComplete callback (because you use it to wait for the termination of the actor system itself).
Something you could try instead:
    import actorSystem.dispatcher
    future.onComplete { _ =>
      session.close()
      actorSystem.terminate()
    }
    Await.result(actorSystem.whenTerminated, Duration.Inf)
Note that the JVM will exit without needing to call System.exit when the only threads left are daemon threads (see for example What is a daemon thread in Java?).
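To illustrate the daemon-thread point, here is a small standalone sketch using only the JDK. DaemonDemo and sleeper are hypothetical names made up for this example; they are not part of Akka or the Cassandra driver. Run it in a forked JVM or with plain scala, since a non-forked sbt run manages threads differently.

```scala
object DaemonDemo {
  // Hypothetical helper: builds a thread that sleeps for a minute, standing in
  // for a pool thread that is still alive when main returns.
  def sleeper(name: String, daemon: Boolean): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = Thread.sleep(60000L)
    }, name)
    t.setDaemon(daemon) // must be set before start()
    t
  }

  def main(args: Array[String]): Unit = {
    val t = sleeper("background-worker", daemon = true)
    t.start()
    println(s"${t.getName} isDaemon=${t.isDaemon}")
    // main returns here. The only other thread is a daemon, so the JVM exits
    // right away. With daemon = false it would stay alive until the sleep ends.
  }
}
```

If an application hangs after main finishes, a thread dump (for example with jstack) should show which non-daemon threads are still running.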