summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2011-12-07 16:51:55 +0100
committerPhilipp Haller <hallerp@gmail.com>2011-12-07 16:51:55 +0100
commit4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6 (patch)
tree305eb48d4c7c6c672b9db6ef9f4a065d10400223
parent4b62e8059c1f0f8cb4624291b0aa64e6e460948e (diff)
downloadscala-4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6.tar.gz
scala-4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6.tar.bz2
scala-4ea25c98d377b6b0369fa20aa9d4bfd3a3223ef6.zip
Add future method to ExecutionContext trait. Log uncaught exceptions to stderr.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala16
-rw-r--r--src/library/scala/concurrent/ForkJoinTaskImpl.scala21
-rw-r--r--src/library/scala/concurrent/package.scala5
3 files changed, 26 insertions, 16 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index d54b6c370e..9606c28bab 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -12,24 +12,22 @@ trait ExecutionContext {
def execute(task: Runnable): Unit
- def task[T](task: () => T): Task[T]
+ def task[T](task: => T): Task[T]
def promise[T]: Promise[T]
+ def future[T](body: => T): Future[T] = {
+ val t = task(body)
+ t.start()
+ t.future
+ }
+
/** Only callable from the tasks running on the same execution context. */
def blockingCall[T](body: Blockable[T]): T
}
-object ExecutionContext {
-
- lazy val forNonBlocking = new ForkJoinExecutionContext
-
- //lazy val forBlocking = new BlockingExecutionContext
-
-}
-
sealed trait CanBlock
diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
index faa7ecb45a..9df4768ebb 100644
--- a/src/library/scala/concurrent/ForkJoinTaskImpl.scala
+++ b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
@@ -13,7 +13,7 @@ import scala.annotation.tailrec
* to avoid an object allocation per promise. This requires turning DefaultPromise
* into a trait, i.e., removing its constructor parameters.
*/
-private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, val body: () => T, val timeout: Timeout)
+private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, body: => T, val timeout: Timeout)
extends RecursiveAction with Task[T] with Future[T] {
private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[State[T]], "state")
@@ -46,7 +46,7 @@ extends RecursiveAction with Task[T] with Future[T] {
var cbs: List[Callback] = null
try {
- val res = body()
+ val res = body
processCallbacks(trySucceedState(res), Right(res))
} catch {
case t if isFutureThrowable(t) =>
@@ -83,7 +83,9 @@ extends RecursiveAction with Task[T] with Future[T] {
if (res != null) dispatch(new Runnable {
override def run() =
try callback(res)
- catch handledFutureException
+ catch handledFutureException andThen {
+ t => Console.err.println(t)
+ }
})
this
@@ -130,7 +132,16 @@ case class Failure[T](throwable: Throwable) extends State[T]
private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
- val pool = new ForkJoinPool
+ val pool = {
+ val p = new ForkJoinPool
+ p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
+ def uncaughtException(t: Thread, throwable: Throwable) {
+ Console.err.println(throwable.getMessage)
+ throwable.printStackTrace(Console.err)
+ }
+ })
+ p
+ }
@inline
private def executeForkJoinTask(task: RecursiveAction) {
@@ -145,7 +156,7 @@ private[concurrent] final class ForkJoinExecutionContext extends ExecutionContex
executeForkJoinTask(action)
}
- def task[T](body: () => T): Task[T] = {
+ def task[T](body: => T): Task[T] = {
new ForkJoinTaskImpl(this, body, Timeout.never)
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index b9e39a21a1..63faeef502 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -21,6 +21,9 @@ package object concurrent {
type CancellationException = java.util.concurrent.CancellationException
type TimeoutException = java.util.concurrent.TimeoutException
+ lazy val executionContext =
+ new ForkJoinExecutionContext
+
private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
override protected def initialValue = null
}
@@ -59,8 +62,6 @@ package object concurrent {
}
}
- def future[T](body: =>T): Future[T] = null // TODO
-
val handledFutureException: PartialFunction[Throwable, Throwable] = {
case t: Throwable if isFutureThrowable(t) => t
}