From 4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 7 Dec 2011 16:51:55 +0100 Subject: Add future method to ExecutionContext trait. Log uncaught exceptions to stderr. --- src/library/scala/concurrent/ExecutionContext.scala | 16 +++++++--------- src/library/scala/concurrent/ForkJoinTaskImpl.scala | 21 ++++++++++++++++----- src/library/scala/concurrent/package.scala | 5 +++-- 3 files changed, 26 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index d54b6c370e..9606c28bab 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -12,24 +12,22 @@ trait ExecutionContext { def execute(task: Runnable): Unit - def task[T](task: () => T): Task[T] + def task[T](task: => T): Task[T] def promise[T]: Promise[T] + def future[T](body: => T): Future[T] = { + val t = task(body) + t.start() + t.future + } + /** Only callable from the tasks running on the same execution context. */ def blockingCall[T](body: Blockable[T]): T } -object ExecutionContext { - - lazy val forNonBlocking = new ForkJoinExecutionContext - - //lazy val forBlocking = new BlockingExecutionContext - -} - sealed trait CanBlock diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala index faa7ecb45a..9df4768ebb 100644 --- a/src/library/scala/concurrent/ForkJoinTaskImpl.scala +++ b/src/library/scala/concurrent/ForkJoinTaskImpl.scala @@ -13,7 +13,7 @@ import scala.annotation.tailrec * to avoid an object allocation per promise. This requires turning DefaultPromise * into a trait, i.e., removing its constructor parameters. */ -private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, val body: () => T, val timeout: Timeout) +private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, body: => T, val timeout: Timeout) extends RecursiveAction with Task[T] with Future[T] { private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[State[T]], "state") @@ -46,7 +46,7 @@ extends RecursiveAction with Task[T] with Future[T] { var cbs: List[Callback] = null try { - val res = body() + val res = body processCallbacks(trySucceedState(res), Right(res)) } catch { case t if isFutureThrowable(t) => @@ -83,7 +83,9 @@ extends RecursiveAction with Task[T] with Future[T] { if (res != null) dispatch(new Runnable { override def run() = try callback(res) - catch handledFutureException + catch handledFutureException andThen { + t => Console.err.println(t) + } }) this @@ -130,7 +132,16 @@ case class Failure[T](throwable: Throwable) extends State[T] private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext { - val pool = new ForkJoinPool + val pool = { + val p = new ForkJoinPool + p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { + def uncaughtException(t: Thread, throwable: Throwable) { + Console.err.println(throwable.getMessage) + throwable.printStackTrace(Console.err) + } + }) + p + } @inline private def executeForkJoinTask(task: RecursiveAction) { @@ -145,7 +156,7 @@ private[concurrent] final class ForkJoinExecutionContext extends ExecutionContex executeForkJoinTask(action) } - def task[T](body: () => T): Task[T] = { + def task[T](body: => T): Task[T] = { new ForkJoinTaskImpl(this, body, Timeout.never) } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index b9e39a21a1..63faeef502 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -21,6 +21,9 @@ package object concurrent { type CancellationException = java.util.concurrent.CancellationException type TimeoutException = java.util.concurrent.TimeoutException + lazy val executionContext = + new ForkJoinExecutionContext + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { override protected def initialValue = null } @@ -59,8 +62,6 @@ package object concurrent { } } - def future[T](body: =>T): Future[T] = null // TODO - val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t } -- cgit v1.2.3