diff options
author | Philipp Haller <hallerp@gmail.com> | 2011-12-07 16:51:55 +0100 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2011-12-07 16:51:55 +0100 |
commit | 4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6 (patch) | |
tree | 305eb48d4c7c6c672b9db6ef9f4a065d10400223 | |
parent | 4b62e8059c1f0f8cb4624291b0aa64e6e460948e (diff) | |
download | scala-4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6.tar.gz scala-4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6.tar.bz2 scala-4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6.zip |
Add future method to ExecutionContext trait. Log uncaught exceptions to stderr.
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 16 | ||||
-rw-r--r-- | src/library/scala/concurrent/ForkJoinTaskImpl.scala | 21 | ||||
-rw-r--r-- | src/library/scala/concurrent/package.scala | 5 |
3 files changed, 26 insertions, 16 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index d54b6c370e..9606c28bab 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -12,24 +12,22 @@ trait ExecutionContext { def execute(task: Runnable): Unit - def task[T](task: () => T): Task[T] + def task[T](task: => T): Task[T] def promise[T]: Promise[T] + def future[T](body: => T): Future[T] = { + val t = task(body) + t.start() + t.future + } + /** Only callable from the tasks running on the same execution context. */ def blockingCall[T](body: Blockable[T]): T } -object ExecutionContext { - - lazy val forNonBlocking = new ForkJoinExecutionContext - - //lazy val forBlocking = new BlockingExecutionContext - -} - sealed trait CanBlock diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala index faa7ecb45a..9df4768ebb 100644 --- a/src/library/scala/concurrent/ForkJoinTaskImpl.scala +++ b/src/library/scala/concurrent/ForkJoinTaskImpl.scala @@ -13,7 +13,7 @@ import scala.annotation.tailrec * to avoid an object allocation per promise. This requires turning DefaultPromise * into a trait, i.e., removing its constructor parameters. */ -private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, val body: () => T, val timeout: Timeout) +private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, body: => T, val timeout: Timeout) extends RecursiveAction with Task[T] with Future[T] { private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[State[T]], "state") @@ -46,7 +46,7 @@ extends RecursiveAction with Task[T] with Future[T] { var cbs: List[Callback] = null try { - val res = body() + val res = body processCallbacks(trySucceedState(res), Right(res)) } catch { case t if isFutureThrowable(t) => @@ -83,7 +83,9 @@ extends RecursiveAction with Task[T] with Future[T] { if (res != null) dispatch(new Runnable { override def run() = try callback(res) - catch handledFutureException + catch handledFutureException andThen { + t => Console.err.println(t) + } }) this @@ -130,7 +132,16 @@ case class Failure[T](throwable: Throwable) extends State[T] private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext { - val pool = new ForkJoinPool + val pool = { + val p = new ForkJoinPool + p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { + def uncaughtException(t: Thread, throwable: Throwable) { + Console.err.println(throwable.getMessage) + throwable.printStackTrace(Console.err) + } + }) + p + } @inline private def executeForkJoinTask(task: RecursiveAction) { @@ -145,7 +156,7 @@ private[concurrent] final class ForkJoinExecutionContext extends ExecutionContex executeForkJoinTask(action) } - def task[T](body: () => T): Task[T] = { + def task[T](body: => T): Task[T] = { new ForkJoinTaskImpl(this, body, Timeout.never) } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index b9e39a21a1..63faeef502 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -21,6 +21,9 @@ package object concurrent { type CancellationException = java.util.concurrent.CancellationException type TimeoutException = java.util.concurrent.TimeoutException + lazy val executionContext = + new ForkJoinExecutionContext + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { override protected def initialValue = null } @@ -59,8 +62,6 @@ package object concurrent { } } - def future[T](body: =>T): Future[T] = null // TODO - val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t } |