summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/Awaitable.scala7
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala2
-rw-r--r--src/library/scala/concurrent/Future.scala24
-rw-r--r--src/library/scala/concurrent/Promise.scala18
-rw-r--r--src/library/scala/concurrent/akka/AbstractPromise.java21
-rw-r--r--src/library/scala/concurrent/akka/Promise.scala165
-rw-r--r--src/library/scala/concurrent/akka/package.scala3
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala17
-rw-r--r--src/library/scala/concurrent/package.scala32
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala12
10 files changed, 250 insertions, 51 deletions
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
index 52fd3b9516..c38e668f30 100644
--- a/src/library/scala/concurrent/Awaitable.scala
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -11,15 +11,14 @@ package scala.concurrent
import scala.annotation.implicitNotFound
-import scala.util.Timeout
+import scala.util.Duration
trait Awaitable[+T] {
- @implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.")
- def await(timeout: Timeout)(implicit canawait: CanAwait): T
+ @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.")
+ def await(atMost: Duration)(implicit canawait: CanAwait): T
}
-
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index ebeeca995e..38a28044e1 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -23,7 +23,7 @@ trait ExecutionContext {
def future[T](body: => T): Future[T]
/** Only callable from the tasks running on the same execution context. */
- def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T
+ def blockingCall[T](body: Awaitable[T]): T
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 4002239fc4..e6edaea87a 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -29,6 +29,18 @@ import scala.collection.generic.CanBuildFrom
/** The trait that represents futures.
*
+ * Asynchronous computations that yield futures are created with the `future` call:
+ *
+ * {{{
+ * val s = "Hello"
+ * val f: Future[String] = future {
+ * s + " future!"
+ * }
+ * f onSuccess {
+ * case msg => println(msg)
+ * }
+ * }}}
+ *
* @define multipleCallbacks
* Multiple callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
@@ -37,12 +49,14 @@ import scala.collection.generic.CanBuildFrom
* The future may contain a throwable object and this means that the future failed.
* Futures obtained through combinators have the same exception as the future they were obtained from.
* The following throwable objects are not contained in the future:
- * - Error - errors are not contained within futures
- * - scala.util.control.ControlThrowable - not contained within futures
- * - InterruptedException - not contained within futures
+ * - `Error` - errors are not contained within futures
+ * - `InterruptedException` - not contained within futures
+ * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
*
* Instead, the future is completed with a ExecutionException with one of the exceptions above
* as the cause.
+ * If a future is failed with a `scala.runtime.NonLocalReturnControl`,
+ * it is completed with a value instead from that throwable instead instead.
*
* @define forComprehensionExamples
* Example:
@@ -146,10 +160,10 @@ self =>
}
this
}
- def await(timeout: Timeout)(implicit canawait: CanAwait): Throwable = {
+ def await(atMost: Duration)(implicit canawait: CanAwait): Throwable = {
var t: Throwable = null
try {
- val res = self.await(timeout)
+ val res = self.await(atMost)
t = noSuchElem(res)
} catch {
case t: Throwable => return t
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index e5557ae1c3..c3fa92053b 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -78,7 +78,7 @@ trait Promise[T] {
*
* @return If the promise has already been completed returns `false`, or `true` otherwise.
*/
- def trySuccess(value: T): Boolean
+ def trySuccess(value: T): Boolean = tryComplete(Right(value))
/** Completes the promise with an exception.
*
@@ -96,7 +96,7 @@ trait Promise[T] {
*
* @return If the promise has already been completed returns `false`, or `true` otherwise.
*/
- def tryFailure(t: Throwable): Boolean
+ def tryFailure(t: Throwable): Boolean = tryComplete(Left(t))
/** Wraps a `Throwable` in an `ExecutionException` if necessary.
*
@@ -112,15 +112,7 @@ trait Promise[T] {
object Promise {
- /*
- /**
- * Creates a non-completed, new, Promise with the supplied timeout in milliseconds
- */
- def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = DefaultPromise[A](timeout)
-
- /**
- * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
- */
- def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout)
- */
+
+
+
}
diff --git a/src/library/scala/concurrent/akka/AbstractPromise.java b/src/library/scala/concurrent/akka/AbstractPromise.java
new file mode 100644
index 0000000000..38c74edf2f
--- /dev/null
+++ b/src/library/scala/concurrent/akka/AbstractPromise.java
@@ -0,0 +1,21 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.akka;
+
+
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+
+
+abstract class AbstractPromise {
+ private volatile Object _ref = null;
+ protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+}
diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala
index a47dee48e2..d3b93b9573 100644
--- a/src/library/scala/concurrent/akka/Promise.scala
+++ b/src/library/scala/concurrent/akka/Promise.scala
@@ -10,14 +10,19 @@ package scala.concurrent.akka
-import scala.concurrent.{ExecutionContext, resolver}
+import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.concurrent.{Awaitable, ExecutionContext, resolver, blocking, CanAwait, TimeoutException}
import scala.util.continuations._
+import scala.util.Duration
+import scala.annotation.tailrec
trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
// TODO refine answer and return types here from Any to type parameters
+ // then move this up in the hierarchy
final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
cont: (Future[T] => Future[Any]) =>
@@ -61,3 +66,161 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
}
+
+object Promise {
+
+ def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
+
+ /** Represents the internal state.
+ */
+ sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
+
+ case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] {
+ def value: Option[Either[Throwable, T]] = None
+ }
+
+ case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
+ def result: T = value.get.right.get
+ }
+
+ case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
+ def exception: Throwable = value.get.left.get
+ }
+
+ private val emptyPendingValue = Pending[Nothing](Nil)
+
+ /* default promise implementation */
+ abstract class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
+ self =>
+
+ updater.set(this, Promise.EmptyPending())
+
+ protected final def tryAwait(atMost: Duration): Boolean = {
+ @tailrec
+ def awaitUnsafe(waitTimeNanos: Long): Boolean = {
+ if (value.isEmpty && waitTimeNanos > 0) {
+ val ms = NANOSECONDS.toMillis(waitTimeNanos)
+ val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec
+ val start = System.nanoTime()
+ try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
+
+ awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
+ } else
+ value.isDefined
+ }
+
+ executor.blockingCall(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))))
+ }
+
+ private def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
+ if (value.isDefined || tryAwait(atMost)) this
+ else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+
+ def await(atMost: Duration)(implicit permit: CanAwait): T =
+ ready(atMost).value.get match {
+ case Left(e) => throw e
+ case Right(r) => r
+ }
+
+ def value: Option[Either[Throwable, T]] = getState.value
+
+ @inline
+ private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
+
+ @inline
+ protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
+
+ @inline
+ protected final def getState: FState[T] = updater.get(this)
+
+ /*
+ def tryComplete(value: Either[Throwable, T]): Boolean = {
+ val callbacks: List[Either[Throwable, T] => Unit] = {
+ try {
+ @tailrec
+ def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
+ getState match {
+ case cur @ Pending(listeners) =>
+ if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
+ else tryComplete(v)
+ case _ => null
+ }
+ }
+ tryComplete(resolve(value))
+ } finally {
+ synchronized { notifyAll() } //Notify any evil blockers
+ }
+ }
+
+ callbacks match {
+ case null => false
+ case cs if cs.isEmpty => true
+ case cs => Future.dispatchTask(() => cs.foreach(f => notifyCompleted(f, value))); true
+ }
+ }
+
+ def onComplete(func: Either[Throwable, T] => Unit): this.type = {
+ @tailrec //Returns whether the future has already been completed or not
+ def tryAddCallback(): Boolean = {
+ val cur = getState
+ cur match {
+ case _: Success[_] | _: Failure[_] => true
+ case p: Pending[_] =>
+ val pt = p.asInstanceOf[Pending[T]]
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
+ }
+ }
+
+ if (tryAddCallback()) {
+ val result = value.get
+ Future.dispatchTask(() => notifyCompleted(func, result))
+ }
+
+ this
+ }
+
+ private final def notifyCompleted(func: Either[Throwable, T] => Unit, result: Either[Throwable, T]) {
+ try { func(result) } catch { case e => logError("Future onComplete-callback raised an exception", e) }
+ }
+ */
+ }
+
+ /*
+ /**
+ * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
+ * a Future-composition but you already have a value to contribute.
+ */
+ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
+ val value = Some(resolve(suppliedValue))
+
+ def tryComplete(value: Either[Throwable, T]): Boolean = false
+ def onComplete(func: Either[Throwable, T] => Unit): this.type = {
+ val completedAs = value.get
+ Future dispatchTask (() => func(completedAs))
+ this
+ }
+
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
+ case Left(e) => throw e
+ case Right(r) => r
+ }
+ }
+ */
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/akka/package.scala b/src/library/scala/concurrent/akka/package.scala
index 59eda5a3b4..8c059b8e71 100644
--- a/src/library/scala/concurrent/akka/package.scala
+++ b/src/library/scala/concurrent/akka/package.scala
@@ -11,6 +11,7 @@ package scala.concurrent
import java.{lang => jl}
+import scala.util.Duration
@@ -31,6 +32,8 @@ package object akka {
if (c.isPrimitive) toBoxed(c) else c
}
+ def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue
+
}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index 716b9c02f1..a38541df5d 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -5,7 +5,7 @@ package default
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
-import scala.util.{ Timeout, Duration }
+import scala.util.Duration
import scala.annotation.tailrec
@@ -90,7 +90,7 @@ extends Promise[T] with Future[T] with Completable[T] {
case Right(v) => trySuccess(v)
}
- def trySuccess(value: T): Boolean = {
+ override def trySuccess(value: T): Boolean = {
val cbs = tryCompleteState(Success(value))
if (cbs == null)
false
@@ -103,7 +103,7 @@ extends Promise[T] with Future[T] with Completable[T] {
}
}
- def tryFailure(t: Throwable): Boolean = {
+ override def tryFailure(t: Throwable): Boolean = {
val wrapped = wrap(t)
val cbs = tryCompleteState(Failure(wrapped))
if (cbs == null)
@@ -117,7 +117,7 @@ extends Promise[T] with Future[T] with Completable[T] {
}
}
- def await(timeout: Timeout)(implicit canawait: scala.concurrent.CanAwait): T = getState match {
+ def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match {
case Success(res) => res
case Failure(t) => throw t
case _ =>
@@ -191,7 +191,7 @@ extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
def tryCancel(): Unit =
tryUnfork()
- def await(timeout: Timeout)(implicit canawait: CanAwait): T = {
+ def await(atMost: Duration)(implicit canawait: CanAwait): T = {
join() // TODO handle timeout also
(updater.get(this): @unchecked) match {
case Success(r) => r
@@ -257,17 +257,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
def promise[T]: Promise[T] =
new PromiseImpl[T](this)
- // TODO fix the timeout
- def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match {
+ def blockingCall[T](b: Awaitable[T]): T = b match {
case fj: TaskImpl[_] if fj.executor.pool eq pool =>
- fj.await(timeout)
+ fj.await(Duration.fromNanos(0))
case _ =>
var res: T = null.asInstanceOf[T]
@volatile var blockingDone = false
// TODO add exception handling here!
val mb = new ForkJoinPool.ManagedBlocker {
def block() = {
- res = b.await(timeout)(CanAwaitEvidence)
+ res = b.await(Duration.fromNanos(0))(CanAwaitEvidence)
blockingDone = true
true
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index ce22c53c72..7552100af2 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -52,15 +52,17 @@ package object concurrent {
private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T])
+ case Left(t: scala.util.control.ControlThrowable) => Left(new ExecutionException("Boxed ControlThrowable", t))
case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t))
- case Left(e: Error) => throw e
+ case Left(e: Error) => Left(new ExecutionException("Boxed Error", e))
case _ => source
}
private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = {
case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value)
+ case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t))
case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
- case e: Error => throw e
+ case e: Error => Left(new ExecutionException("Boxed Error", e))
case t => Left(t)
}
@@ -83,9 +85,12 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] {
- def await(timeout: Timeout)(implicit cb: CanAwait) = body
- }, atMost)
+ def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+
+ /** Wraps a block of code into an awaitable object. */
+ def body2awaitable[T](body: =>T) = new Awaitable[T] {
+ def await(atMost: Duration)(implicit cb: CanAwait) = body
+ }
/** Blocks on a blockable object.
*
@@ -96,18 +101,23 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def result[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
currentExecutionContext.get match {
case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(atMost, awaitable) // inside an execution context thread
+ case x => x.blockingCall(awaitable) // inside an execution context thread
}
}
- def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = {
- result(awaitable, atMost)
- awaitable
+ object await {
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = {
+ try blocking(awaitable, atMost)
+ catch { case _ => }
+ awaitable
+ }
+
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T = blocking(awaitable, atMost)
}
-
+
}
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index abd363cedf..a951c09da2 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -10,9 +10,7 @@ import scala.concurrent.{
}
import scala.concurrent.future
import scala.concurrent.promise
-import scala.concurrent.await
-import scala.concurrent.result
-import scala.concurrent.ready
+import scala.concurrent.blocking
import scala.util.Duration
@@ -314,7 +312,7 @@ trait FutureProjections extends TestBase {
val f = future {
throw cause
}
- assert(result(f.failed, Duration(500, "ms")) == cause)
+ assert(blocking(f.failed, Duration(500, "ms")) == cause)
done()
}
@@ -322,7 +320,7 @@ trait FutureProjections extends TestBase {
done =>
val f = future { 0 }
try {
- ready(f.failed, Duration(0, "ms"))
+ blocking(f.failed, Duration(0, "ms"))
assert(false)
} catch {
case nsee: NoSuchElementException => done()
@@ -344,7 +342,7 @@ trait Blocking extends TestBase {
def testAwaitSuccess(): Unit = once {
done =>
val f = future { 0 }
- ready(f, Duration(500, "ms"))
+ blocking(f, Duration(500, "ms"))
done()
}
@@ -355,7 +353,7 @@ trait Blocking extends TestBase {
throw cause
}
try {
- ready(f, Duration(500, "ms"))
+ blocking(f, Duration(500, "ms"))
assert(false)
} catch {
case t =>