From f58ade23be8354aab223da5ca1e7162b6b53749b Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 19 Jan 2012 20:33:01 +0100 Subject: Add NonDeterministic evidence needed to call nondeterministic methods. --- .../scala/concurrent/ExecutionContext.scala | 20 ++++++++-- src/library/scala/concurrent/Future.scala | 43 ++++++++++++++++++---- src/library/scala/concurrent/Promise.scala | 30 +++++++++++---- src/library/scala/concurrent/akka/Promise.scala | 6 +-- .../scala/concurrent/default/TaskImpl.scala | 7 ++-- src/library/scala/concurrent/package.scala | 39 ++++---------------- 6 files changed, 91 insertions(+), 54 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 0657121de2..260d4cb54d 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -16,6 +16,7 @@ import scala.util.Duration import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } import scala.collection.generic.CanBuildFrom import collection._ +import annotation.implicitNotFound @@ -41,10 +42,21 @@ trait ExecutionContext { private implicit val executionContext = this + def keptPromise[T](result: T): Promise[T] = { + val p = promise[T] + p success result + } + + def brokenPromise[T](t: Throwable): Promise[T] = { + val p = promise[T] + p failure t + } + /** TODO some docs * */ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { + import nondeterministic._ val buffer = new mutable.ArrayBuffer[T] val counter = new AtomicInteger(1) // how else could we do this? val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature @@ -78,7 +90,8 @@ trait ExecutionContext { /** TODO some docs * */ - def any[T](futures: Traversable[Future[T]]): Future[T] = { + @implicitNotFound(msg = "Calling this method yields non-deterministic programs.") + def any[T](futures: Traversable[Future[T]])(implicit nondet: NonDeterministic): Future[T] = { val p = promise[T] val completeFirst: Either[Throwable, T] => Unit = elem => p tryComplete elem @@ -90,8 +103,9 @@ trait ExecutionContext { /** TODO some docs * */ - def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { - if (futures.isEmpty) Promise.successful[Option[T]](None).future + @implicitNotFound(msg = "Calling this method yields non-deterministic programs.") + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit nondet: NonDeterministic): Future[Option[T]] = { + if (futures.isEmpty) Promise.kept[Option[T]](None).future else { val result = promise[Option[T]] val count = new AtomicInteger(futures.size) diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 92e50b1f89..4f89aa483d 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -10,20 +10,21 @@ package scala.concurrent -import scala.util.{ Timeout, Duration } -import scala.Option - import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } import java.lang.{ Iterable => JIterable } import java.util.{ LinkedList => JLinkedList } +import java.{ lang => jl } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } + +import scala.util.{ Timeout, Duration } +import scala.Option import scala.annotation.tailrec import scala.collection.mutable.Stack -import java.{ lang => jl } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom +import scala.annotation.implicitNotFound @@ -357,6 +358,34 @@ self => p.future } + /** Creates a new future which holds the result of either this future or `that` future, depending on + * which future was completed first. + * + * $nonDeterministic + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f either g + * await(0) h // evaluates to either 5 or throws a runtime exception + * }}} + */ + @implicitNotFound(msg = "Calling this method yields non-deterministic programs.") + def either[U >: T](that: Future[U])(implicit nondet: NonDeterministic): Future[U] = { + val p = self.newPromise[U] + + val completePromise: PartialFunction[Either[Throwable, U], _] = { + case Left(t) => p tryFailure t + case Right(v) => p trySuccess v + } + + self onComplete completePromise + that onComplete completePromise + + p.future + } + } @@ -377,9 +406,9 @@ object Future { // move this to future companion object @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) - def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures) + def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext, nondet: NonDeterministic): Future[T] = ec.any(futures) - def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate) + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext, nondet: NonDeterministic): Future[Option[T]] = ec.find(futures)(predicate) } diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index e9b83cba73..6aa04eff9f 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -10,6 +10,7 @@ package scala.concurrent +import scala.annotation.implicitNotFound @@ -30,6 +31,8 @@ package scala.concurrent */ trait Promise[T] { + import nondeterministic._ + /** Future containing the value of this promise. */ def future: Future[T] @@ -48,7 +51,8 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def tryComplete(result: Either[Throwable, T]): Boolean + @implicitNotFound(msg = "Calling this method yields non-deterministic programs.") + def tryComplete(result: Either[Throwable, T])(implicit nondet: NonDeterministic): Boolean /** Completes this promise with the specified future, once that future is completed. * @@ -75,7 +79,8 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def trySuccess(value: T): Boolean = tryComplete(Right(value)) + @implicitNotFound(msg = "Calling this method yields non-deterministic programs.") + def trySuccess(value: T)(implicit nondet: NonDeterministic): Boolean = tryComplete(Right(value))(nonDeterministicEvidence) /** Completes the promise with an exception. * @@ -93,7 +98,8 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def tryFailure(t: Throwable): Boolean = tryComplete(Left(t)) + @implicitNotFound(msg = "Calling this method yields non-deterministic programs.") + def tryFailure(t: Throwable)(implicit nondet: NonDeterministic): Boolean = tryComplete(Left(t))(nonDeterministicEvidence) /** Wraps a `Throwable` in an `ExecutionException` if necessary. * @@ -112,9 +118,19 @@ trait Promise[T] { object Promise { - def successful[T](result: T): Promise[T] = { - val p = promise[T]() - p.success(result) - } + def kept[T](result: T)(implicit execctx: ExecutionContext): Promise[T] = + execctx keptPromise result + + def broken[T](t: Throwable)(implicit execctx: ExecutionContext): Promise[T] = + execctx brokenPromise t } + + + + + + + + + diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index d688d3d850..e36d237e82 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -16,7 +16,7 @@ import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blockin //import scala.util.continuations._ import scala.util.Duration import scala.annotation.tailrec - +import scala.concurrent.NonDeterministic trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { @@ -142,7 +142,7 @@ object Promise { @inline protected final def getState: FState[T] = updater.get(this) - def tryComplete(value: Either[Throwable, T]): Boolean = { + def tryComplete(value: Either[Throwable, T])(implicit nd: NonDeterministic): Boolean = { val callbacks: List[Either[Throwable, T] => Any] = { try { @tailrec @@ -210,7 +210,7 @@ object Promise { final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { val value = Some(resolve(suppliedValue)) - def tryComplete(value: Either[Throwable, T]): Boolean = false + def tryComplete(value: Either[Throwable, T])(implicit nondet: NonDeterministic): Boolean = false def onComplete[U](func: Either[Throwable, T] => U): this.type = { val completedAs = value.get diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index 59037cc48b..771cf02ec1 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } import scala.util.Duration import scala.annotation.tailrec +import scala.concurrent.NonDeterministic @@ -85,12 +86,12 @@ extends Promise[T] with Future[T] with Completable[T] { case _ => null } - def tryComplete(r: Either[Throwable, T]) = r match { + def tryComplete(r: Either[Throwable, T])(implicit nd: NonDeterministic) = r match { case Left(t) => tryFailure(t) case Right(v) => trySuccess(v) } - override def trySuccess(value: T): Boolean = { + override def trySuccess(value: T)(implicit nd: NonDeterministic): Boolean = { val cbs = tryCompleteState(Success(value)) if (cbs == null) false @@ -103,7 +104,7 @@ extends Promise[T] with Future[T] with Completable[T] { } } - override def tryFailure(t: Throwable): Boolean = { + override def tryFailure(t: Throwable)(implicit nd: NonDeterministic): Boolean = { val wrapped = wrap(t) val cbs = tryCompleteState(Failure(wrapped)) if (cbs == null) diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 73da8469e6..3e60ffe8de 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -120,7 +120,7 @@ package object concurrent { */ object nondeterministic { - implicit def future2nondeterministic[T](f: Future[T]) = new NondeterministicFuture[T](f) + implicit val nonDeterministicEvidence = new NonDeterministic {} } @@ -140,36 +140,13 @@ package concurrent { def this(origin: Future[_]) = this(origin, "Future timed out.") } - private[concurrent] class NondeterministicFuture[+T](self: Future[T]) { - - /** Creates a new future which holds the result of either this future or `that` future, depending on - * which future was completed first. - * - * $nonDeterministic - * - * Example: - * {{{ - * val f = future { sys.error("failed") } - * val g = future { 5 } - * val h = f either g - * await(0) h // evaluates to either 5 or throws a runtime exception - * }}} - */ - def either[U >: T](that: Future[U]): Future[U] = { - val p = self.newPromise[U] - - val completePromise: PartialFunction[Either[Throwable, U], _] = { - case Left(t) => p tryFailure t - case Right(v) => p trySuccess v - } - - self onComplete completePromise - that onComplete completePromise - - p.future - } - - } + /** Evidence that the program can be nondeterministic. + * + * Programs in which such an evidence is available in scope + * can contain calls to methods which yield nondeterministic + * programs. + */ + sealed trait NonDeterministic } -- cgit v1.2.3