summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-05 16:43:22 +0100
committerAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-05 16:43:22 +0100
commitf01ab43357b2b565d94ec09bdef82f2437da0db9 (patch)
treea42da3e91eeb384cc5e8da454a852422866c9d66
parentd38ca89f8d50c0d6eafac623f0edc165c01f8bd4 (diff)
downloadscala-f01ab43357b2b565d94ec09bdef82f2437da0db9.tar.gz
scala-f01ab43357b2b565d94ec09bdef82f2437da0db9.tar.bz2
scala-f01ab43357b2b565d94ec09bdef82f2437da0db9.zip
The default implementations for some of the future methods.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala14
-rw-r--r--src/library/scala/concurrent/Future.scala47
-rw-r--r--src/library/scala/concurrent/Promise.scala30
-rw-r--r--src/library/scala/concurrent/package.scala9
4 files changed, 85 insertions, 15 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 972a76a95a..5ab712b89a 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -1,30 +1,34 @@
package scala.concurrent
+
import java.util.concurrent.{ Executors, Future => JFuture }
import scala.util.{ Duration, Timeout }
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
-trait ExecutionContext {
+trait ExecutionContext {
+
protected implicit object CanBlockEvidence extends CanBlock
def execute(task: Runnable): Unit
def makeTask[T](task: () => T)(implicit timeout: Timeout): Task[T]
- def makePromise[T](timeout: Timeout): Promise[T]
+ def makePromise[T](implicit timeout: Timeout): Promise[T]
def blockingCall[T](body: Blockable[T]): T
-
+
}
-trait Task[T] {
+trait Task[T] {
+
def start(): Unit
def future: Future[T]
-
+
}
+
/* DONE: The challenge is to make ForkJoinPromise inherit from RecursiveAction
* to avoid an object allocation per promise. This requires turning DefaultPromise
* into a trait, i.e., removing its constructor parameters.
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 222c5d18b6..16741ad50c 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger,
* Futures obtained through combinators have the same exception as the future they were obtained from.
* The following throwable objects are treated differently:
* - Error - errors are not contained within futures
- * - NonLocalControlException - not contained within futures
+ * - scala.util.control.ControlException - not contained within futures
* - InterruptedException - not contained within futures
*
* @define forComprehensionExamples
@@ -64,6 +64,7 @@ import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger,
* }}}
*/
trait Future[+T] extends Blockable[T] {
+self =>
/* Callbacks */
@@ -79,7 +80,9 @@ trait Future[+T] extends Blockable[T] {
*
* $multipleCallbacks
*/
- def onSuccess[U](func: T => U): this.type
+ def onSuccess[U](func: T => U): this.type = onComplete {
+ case Right(v) => func(v)
+ }
/** When this future is completed with a failure (i.e. with a throwable),
* apply the provided function to the throwable.
@@ -95,7 +98,9 @@ trait Future[+T] extends Blockable[T] {
*
* $multipleCallbacks
*/
- def onFailure[U](func: Throwable => U): this.type
+ def onFailure[U](func: Throwable => U): this.type = onComplete {
+ case Left(t) if isFutureThrowable(t) => func(t)
+ }
/** When this future times out, apply the provided function.
*
@@ -104,7 +109,9 @@ trait Future[+T] extends Blockable[T] {
*
* $multipleCallbacks
*/
- def onTimeout[U](func: => U): this.type
+ def onTimeout[U](body: =>U): this.type = onComplete {
+ case Left(te: TimeoutException) => body
+ }
/** When this future is completed, either through an exception, a timeout, or a value,
* apply the provided function.
@@ -143,9 +150,23 @@ trait Future[+T] extends Blockable[T] {
/* Projections */
- def failed: Future[Exception]
-
- def timedout: Future[Timeout]
+ def failed: Future[Throwable] = new Future[Throwable] {
+ def onComplete[U](func: Either[Throwable, Throwable] => U) = self.onComplete {
+ case Left(t) => func(Right(t))
+ case Right(v) => // do nothing
+ }
+ def isTimedout = self.isTimedout
+ def timeout = self.timeout
+ }
+
+ def timedout: Future[TimeoutException] = new Future[TimeoutException] {
+ def onComplete[U](func: Either[Throwable, TimeoutException] => U) = self.onComplete {
+ case Left(te: TimeoutException) => func(Right(te))
+ case _ => // do nothing
+ }
+ def isTimedout = self.isTimedout
+ def timeout = self.timeout
+ }
/* Monadic operations */
@@ -209,6 +230,14 @@ trait Future[+T] extends Blockable[T] {
}
-class FutureTimeoutException(message: String, cause: Throwable = null) extends Exception(message, cause) {
- def this(message: String) = this(message, null)
+
+/** A timeout exception.
+ *
+ * Futures are failed with a timeout exception when their timeout expires.
+ *
+ * Each timeout exception contains an origin future which originally timed out.
+ */
+class TimeoutException(origin: Future[T], message: String, cause: Throwable = null) extends Exception(message, cause) {
+ def this(origin: Future[T], message: String) = this(origin, message, null)
+ def this(origin: Future[T]) = this(origin, "Future timed out.")
}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 109f6b4eba..4c6b11dfd6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -8,13 +8,43 @@
package scala.concurrent
+
+
+/** Promise is an object which can be completed with a value or failed
+ * with an exception.
+ *
+ * A promise is assigned a timeout when created. After the timeout expires,
+ * the promise will be failed with a TimeoutException.
+ *
+ * @promiseCompletion
+ * If the promise has already been fulfilled, failed or has timed out,
+ * calling this method will throw an IllegalStateException.
+ */
trait Promise[T] {
def future: Future[T]
+ /** Completes the promise with a value.
+ *
+ * @param value The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
def fulfill(value: T): Unit
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $promiseCompletion
+ */
def fail(t: Throwable): Unit
+
+ /** The timeout for this promise.
+ */
+ def timeout: Timeout
}
+
object Promise {
/*
/**
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 2a1ff9b799..64ccdeb0ab 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -36,9 +36,16 @@ package object concurrent {
}
}
-
def future[T](body: =>T): Future[T] = null // TODO
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptException => false
+ case _ => true
+ }
+
}