diff options
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 34 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 2 | ||||
-rw-r--r-- | test/files/jvm/future-spec/FutureTests.scala | 14 |
3 files changed, 40 insertions, 10 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 215f90b17e..77625e381c 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -25,11 +25,15 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: case some => some } + private val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler { + def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause) + } + // Implement BlockContext on FJP threads class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { def wire[T <: Thread](thread: T): T = { thread.setDaemon(daemonic) - //Potentially set things like uncaught exception handler, name etc + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler) thread } @@ -73,7 +77,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: new ForkJoinPool( desiredParallelism, threadFactory, - null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does + uncaughtExceptionHandler, true) // Async all the way baby } catch { case NonFatal(t) => @@ -94,13 +98,13 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: def execute(runnable: Runnable): Unit = executor match { case fj: ForkJoinPool => + val fjt = runnable match { + case t: ForkJoinTask[_] => t + case r => new ExecutionContextImpl.AdaptedForkJoinTask(r) + } Thread.currentThread match { - case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => - (runnable match { - case fjt: ForkJoinTask[_] => fjt - case _ => ForkJoinTask.adapt(runnable) - }).fork - case _ => fj.execute(runnable) + case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork() + case _ => fj execute fjt } case generic => generic execute runnable } @@ -111,6 +115,20 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: private[concurrent] object ExecutionContextImpl { + final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] { + final override def setRawResult(u: Unit): Unit = () + final override def getRawResult(): Unit = () + final override def exec(): Boolean = try { runnable.run(); true } catch { + case anything: Throwable ⇒ + val t = Thread.currentThread + t.getUncaughtExceptionHandler match { + case null ⇒ + case some ⇒ some.uncaughtException(t, anything) + } + throw anything + } + } + def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService = new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService { diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index e9da45a079..52f1075137 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -34,7 +34,7 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete value = v // Note that we cannot prepare the ExecutionContext at this point, since we might // already be running on a different thread! - executor.execute(this) + try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t } } } diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index 8674be168c..0efa83fbd9 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -70,7 +70,19 @@ object FutureTests extends MinimalScalaTest { //FIXME should check } } - + + "The default ExecutionContext" should { + "report uncaught exceptions" in { + val p = Promise[Throwable]() + val logThrowable: Throwable => Unit = p.trySuccess(_) + val ec: ExecutionContext = ExecutionContext.fromExecutor(null, logThrowable) + + val t = new NotImplementedError("foo") + val f = Future(throw t)(ec) + Await.result(p.future, 2.seconds) mustBe t + } + } + "A future with global ExecutionContext" should { import ExecutionContext.Implicits._ |