scala - Why don't Akka Streams application terminate normally? -


i wrote simple application using alpakka cassandra library

package com.abhi  import akka.actor.actorsystem import akka.stream.{actormaterializer, closedshape} import akka.stream.alpakka.cassandra.scaladsl.cassandrasource import akka.stream.scaladsl.{flow, graphdsl, runnablegraph, sink} import com.datastax.driver.core.{cluster, row, simplestatement} import scala.concurrent.await import scala.concurrent.duration._  object myapp extends app {    implicit val actorsystem = actorsystem()    implicit val actormaterializer = actormaterializer()    implicit val session = cluster       .builder       .addcontactpoints(list("localhost") :_*)       .withport(9042)       .withcredentials("foo", "bar")       .build       .connect("foobar")    val stmt = new simplestatement("select col1, col2 foo").setfetchsize(20)    val source = cassandrasource(stmt)    val tofoo = flow[row].map(row => foo(row.getlong(0), row.long(1)))    val sink = sink.foreach[foo](foo => println(foo.col1, foo.col2))    val graph = runnablegraph.fromgraph(graphdsl.create(sink){ implicit b =>       s =>       import graphdsl.implicits._       source.take(10) ~> tofoo ~> s       closedshape    })    // let run graph    val future = graph.run()    import actorsystem.dispatcher    future.oncomplete{_ =>       session.close()       await.result(actorsystem.terminate(), duration.inf)    }    await.result(future, duration.inf)    system.exit(0) }  case class foo(col1: long, col2: long) 

this application runs expected prints 10 rows on screen.

but post hangs. when system.exit(0) call executed throws exception

exception: sbt.trapexitsecurityexception thrown uncaughtexceptionhandler in thread "run-main-0" 

but still application not stop running. hangs.

i don't understand why doesn't application terminate (in fact shouldn't need system.exit(0) call.

the way exit application via control c.

this might happen because sbt runs code in own jvm instance, system.exit exit sbt's jvm giving above result.

did try setting: fork in run := true somewhere in sbt build?

i'm not sure idea use actorsystem.dispatcher execute oncomplete callback (because use wait termination of actor system itself).

something try instead:

import actorsystem.dispatcher future.oncomplete{ _ =>   session.close()   actorsystem.terminate() } await.result(actorsystem.whenterminated, duration.inf) 

note jvm exit without needing call system.exit when threads left daemon threads (see example what daemon thread in java?).


Comments

Popular posts from this blog

angular - Ionic slides - dynamically add slides before and after -

minify - Minimizing css files -

Add a dynamic header in angular 2 http provider -