summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdriaan Moors <adriaan.moors@typesafe.com>2013-02-02 12:21:44 -0800
committerAdriaan Moors <adriaan.moors@typesafe.com>2013-02-02 12:21:44 -0800
commit0bb743170377ba6e110990d6d0fff764ce1fd153 (patch)
tree67ff511e6facf249c2448ee78e90275d0af06085
parentad2a69e313eb1ad25d44d42cbb34b1be086c3358 (diff)
parent5275baee6c563418f53abd2486764be08916c8c5 (diff)
downloadscala-0bb743170377ba6e110990d6d0fff764ce1fd153.tar.gz
scala-0bb743170377ba6e110990d6d0fff764ce1fd153.tar.bz2
scala-0bb743170377ba6e110990d6d0fff764ce1fd153.zip
Merge pull request #2044 from phaller/issue/7029
SI-7029 - Makes sure that uncaught exceptions are propagated to the UEH ...
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala34
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala2
-rw-r--r--test/files/jvm/future-spec/FutureTests.scala14
3 files changed, 40 insertions, 10 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 215f90b17e..77625e381c 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -25,11 +25,15 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
case some => some
}
+ private val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
+ def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause)
+ }
+
// Implement BlockContext on FJP threads
class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
def wire[T <: Thread](thread: T): T = {
thread.setDaemon(daemonic)
- //Potentially set things like uncaught exception handler, name etc
+ thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
thread
}
@@ -73,7 +77,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
new ForkJoinPool(
desiredParallelism,
threadFactory,
- null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does
+ uncaughtExceptionHandler,
true) // Async all the way baby
} catch {
case NonFatal(t) =>
@@ -94,13 +98,13 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
def execute(runnable: Runnable): Unit = executor match {
case fj: ForkJoinPool =>
+ val fjt = runnable match {
+ case t: ForkJoinTask[_] => t
+ case r => new ExecutionContextImpl.AdaptedForkJoinTask(r)
+ }
Thread.currentThread match {
- case fjw: ForkJoinWorkerThread if fjw.getPool eq fj =>
- (runnable match {
- case fjt: ForkJoinTask[_] => fjt
- case _ => ForkJoinTask.adapt(runnable)
- }).fork
- case _ => fj.execute(runnable)
+ case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork()
+ case _ => fj execute fjt
}
case generic => generic execute runnable
}
@@ -111,6 +115,20 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
private[concurrent] object ExecutionContextImpl {
+ final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
+ final override def setRawResult(u: Unit): Unit = ()
+ final override def getRawResult(): Unit = ()
+ final override def exec(): Boolean = try { runnable.run(); true } catch {
+ case anything: Throwable ⇒
+ val t = Thread.currentThread
+ t.getUncaughtExceptionHandler match {
+ case null ⇒
+ case some ⇒ some.uncaughtException(t, anything)
+ }
+ throw anything
+ }
+ }
+
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index e9da45a079..52f1075137 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -34,7 +34,7 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete
value = v
// Note that we cannot prepare the ExecutionContext at this point, since we might
// already be running on a different thread!
- executor.execute(this)
+ try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t }
}
}
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala
index 8674be168c..0efa83fbd9 100644
--- a/test/files/jvm/future-spec/FutureTests.scala
+++ b/test/files/jvm/future-spec/FutureTests.scala
@@ -70,7 +70,19 @@ object FutureTests extends MinimalScalaTest {
//FIXME should check
}
}
-
+
+ "The default ExecutionContext" should {
+ "report uncaught exceptions" in {
+ val p = Promise[Throwable]()
+ val logThrowable: Throwable => Unit = p.trySuccess(_)
+ val ec: ExecutionContext = ExecutionContext.fromExecutor(null, logThrowable)
+
+ val t = new NotImplementedError("foo")
+ val f = Future(throw t)(ec)
+ Await.result(p.future, 2.seconds) mustBe t
+ }
+ }
+
"A future with global ExecutionContext" should {
import ExecutionContext.Implicits._