summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-07 16:55:37 +0100
committerAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-07 16:55:37 +0100
commitc9d10fc4f3d50892890b96db2fbb790457d15f43 (patch)
treeab06a5519ea51d301a2ce9306269afa6bf517cb6
parentcbad136be350087ebb29b9a36c1da893bbb18ec9 (diff)
parent5ccc928f7ec2e79c793ee1b31ca8e91321688749 (diff)
downloadscala-c9d10fc4f3d50892890b96db2fbb790457d15f43.tar.gz
scala-c9d10fc4f3d50892890b96db2fbb790457d15f43.tar.bz2
scala-c9d10fc4f3d50892890b96db2fbb790457d15f43.zip
Merge branch 'execution-context' of https://github.com/phaller/scala into execution-context
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala16
-rw-r--r--src/library/scala/concurrent/ForkJoinTaskImpl.scala21
-rw-r--r--src/library/scala/concurrent/package.scala5
-rw-r--r--test/files/jvm/concurrent-future.check16
-rw-r--r--test/files/jvm/concurrent-future.scala118
5 files changed, 160 insertions, 16 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index d54b6c370e..9606c28bab 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -12,24 +12,22 @@ trait ExecutionContext {
def execute(task: Runnable): Unit
- def task[T](task: () => T): Task[T]
+ def task[T](task: => T): Task[T]
def promise[T]: Promise[T]
+ def future[T](body: => T): Future[T] = {
+ val t = task(body)
+ t.start()
+ t.future
+ }
+
/** Only callable from the tasks running on the same execution context. */
def blockingCall[T](body: Blockable[T]): T
}
-object ExecutionContext {
-
- lazy val forNonBlocking = new ForkJoinExecutionContext
-
- //lazy val forBlocking = new BlockingExecutionContext
-
-}
-
sealed trait CanBlock
diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
index faa7ecb45a..9df4768ebb 100644
--- a/src/library/scala/concurrent/ForkJoinTaskImpl.scala
+++ b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
@@ -13,7 +13,7 @@ import scala.annotation.tailrec
* to avoid an object allocation per promise. This requires turning DefaultPromise
* into a trait, i.e., removing its constructor parameters.
*/
-private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, val body: () => T, val timeout: Timeout)
+private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, body: => T, val timeout: Timeout)
extends RecursiveAction with Task[T] with Future[T] {
private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[State[T]], "state")
@@ -46,7 +46,7 @@ extends RecursiveAction with Task[T] with Future[T] {
var cbs: List[Callback] = null
try {
- val res = body()
+ val res = body
processCallbacks(trySucceedState(res), Right(res))
} catch {
case t if isFutureThrowable(t) =>
@@ -83,7 +83,9 @@ extends RecursiveAction with Task[T] with Future[T] {
if (res != null) dispatch(new Runnable {
override def run() =
try callback(res)
- catch handledFutureException
+ catch handledFutureException andThen {
+ t => Console.err.println(t)
+ }
})
this
@@ -130,7 +132,16 @@ case class Failure[T](throwable: Throwable) extends State[T]
private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
- val pool = new ForkJoinPool
+ val pool = {
+ val p = new ForkJoinPool
+ p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
+ def uncaughtException(t: Thread, throwable: Throwable) {
+ Console.err.println(throwable.getMessage)
+ throwable.printStackTrace(Console.err)
+ }
+ })
+ p
+ }
@inline
private def executeForkJoinTask(task: RecursiveAction) {
@@ -145,7 +156,7 @@ private[concurrent] final class ForkJoinExecutionContext extends ExecutionContex
executeForkJoinTask(action)
}
- def task[T](body: () => T): Task[T] = {
+ def task[T](body: => T): Task[T] = {
new ForkJoinTaskImpl(this, body, Timeout.never)
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index b9e39a21a1..63faeef502 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -21,6 +21,9 @@ package object concurrent {
type CancellationException = java.util.concurrent.CancellationException
type TimeoutException = java.util.concurrent.TimeoutException
+ lazy val executionContext =
+ new ForkJoinExecutionContext
+
private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
override protected def initialValue = null
}
@@ -59,8 +62,6 @@ package object concurrent {
}
}
- def future[T](body: =>T): Future[T] = null // TODO
-
val handledFutureException: PartialFunction[Throwable, Throwable] = {
case t: Throwable if isFutureThrowable(t) => t
}
diff --git a/test/files/jvm/concurrent-future.check b/test/files/jvm/concurrent-future.check
new file mode 100644
index 0000000000..c55e824818
--- /dev/null
+++ b/test/files/jvm/concurrent-future.check
@@ -0,0 +1,16 @@
+test1: hai world
+test1: kthxbye
+test2: hai world
+test2: awsum thx
+test2: kthxbye
+test3: hai world
+test4: hai world
+test4: kthxbye
+test5: hai world
+test5: kthxbye
+test6: hai world
+test6: kthxbye
+test7: hai world
+test7: kthxbye
+test8: hai world
+test8: im in yr loop
diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala
new file mode 100644
index 0000000000..9c2f04fb07
--- /dev/null
+++ b/test/files/jvm/concurrent-future.scala
@@ -0,0 +1,118 @@
+import scala.concurrent.{ executionContext, FutureTimeoutException, ExecutionException, SyncVar }
+import executionContext._
+
+object Test extends App {
+
+ def once(body: (() => Unit) => Unit) {
+ val sv = new SyncVar[Boolean]
+ body(() => sv put true)
+ sv.take()
+ }
+
+ def output(num: Int, msg: String) {
+ println("test" + num + ": " + msg)
+ }
+
+ def testOnSuccess(): Unit = once {
+ done =>
+ val f = future {
+ output(1, "hai world")
+ }
+ f onSuccess { _ =>
+ output(1, "kthxbye")
+ done()
+ }
+ }
+
+ def testOnSuccessWhenCompleted(): Unit = once {
+ done =>
+ val f = future {
+ output(2, "hai world")
+ }
+ f onSuccess { _ =>
+ output(2, "awsum thx")
+ f onSuccess { _ =>
+ output(2, "kthxbye")
+ done()
+ }
+ }
+ }
+
+ def testOnSuccessWhenFailed(): Unit = once {
+ done =>
+ val f = future {
+ output(3, "hai world")
+ done()
+ throw new Exception
+ }
+ f onSuccess { _ =>
+ output(3, "onoes")
+ }
+ }
+
+ def testOnFailure(): Unit = once {
+ done =>
+ val f = future {
+ output(4, "hai world")
+ throw new Exception
+ }
+ f onSuccess { _ =>
+ output(4, "onoes")
+ done()
+ }
+ f onFailure { _ =>
+ output(4, "kthxbye")
+ done()
+ }
+ }
+
+ def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once {
+ done =>
+ val f = future {
+ output(num, "hai world")
+ throw cause
+ }
+ f onSuccess { _ =>
+ output(num, "onoes")
+ done()
+ }
+ f onFailure {
+ case e: ExecutionException if (e.getCause == cause) =>
+ output(num, "kthxbye")
+ done()
+ case _ =>
+ output(num, "onoes")
+ done()
+ }
+ }
+
+ def testOnFailureWhenFutureTimeoutException(): Unit = once {
+ done =>
+ val f = future {
+ output(8, "hai world")
+ throw new FutureTimeoutException(null)
+ }
+ f onSuccess { _ =>
+ output(8, "onoes")
+ done()
+ }
+ f onFailure {
+ case e: FutureTimeoutException =>
+ output(8, "im in yr loop")
+ done()
+ case other =>
+ output(8, "onoes: " + other)
+ done()
+ }
+ }
+
+ testOnSuccess()
+ testOnSuccessWhenCompleted()
+ testOnSuccessWhenFailed()
+ testOnFailure()
+ testOnFailureWhenSpecialThrowable(5, new Error)
+ testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
+ testOnFailureWhenSpecialThrowable(7, new InterruptedException)
+ testOnFailureWhenFutureTimeoutException()
+
+}