diff options
author | Paul Phillips <paulp@improving.org> | 2012-02-17 12:46:02 -0800 |
---|---|---|
committer | Paul Phillips <paulp@improving.org> | 2012-02-17 12:46:02 -0800 |
commit | a77d90b893593a2e2675110bde16e393cc2d0329 (patch) | |
tree | 4e681560bfcf3eb611e9e328de1e8e0b9374a6ef /src/library/scala/concurrent | |
parent | 35b81d14778d2c6e8392ae51c53652f48b52b488 (diff) | |
parent | ab84c8d9a97b41728e77f7808eda2748d052ca06 (diff) | |
download | scala-a77d90b893593a2e2675110bde16e393cc2d0329.tar.gz scala-a77d90b893593a2e2675110bde16e393cc2d0329.tar.bz2 scala-a77d90b893593a2e2675110bde16e393cc2d0329.zip |
Merge remote-tracking branch 'phaller/execution-context' into feb17-alex
Diffstat (limited to 'src/library/scala/concurrent')
24 files changed, 1898 insertions, 115 deletions
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala new file mode 100644 index 0000000000..c38e668f30 --- /dev/null +++ b/src/library/scala/concurrent/Awaitable.scala @@ -0,0 +1,24 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import scala.annotation.implicitNotFound +import scala.util.Duration + + + +trait Awaitable[+T] { + @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.") + def await(atMost: Duration)(implicit canawait: CanAwait): T +} + + + diff --git a/src/library/scala/concurrent/Channel.scala b/src/library/scala/concurrent/Channel.scala index 43d684641e..e79f76430f 100644 --- a/src/library/scala/concurrent/Channel.scala +++ b/src/library/scala/concurrent/Channel.scala @@ -23,7 +23,7 @@ class Channel[A] { private var written = new LinkedList[A] // FIFO buffer, realized through private var lastWritten = written // aliasing of a linked list private var nreaders = 0 - + /** * @param x ... */ @@ -33,7 +33,7 @@ class Channel[A] { lastWritten = lastWritten.next if (nreaders > 0) notify() } - + def read: A = synchronized { while (written.next == null) { try { @@ -46,4 +46,5 @@ class Channel[A] { written = written.next x } + } diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala index e308c3b5a6..0b7f54a27a 100644 --- a/src/library/scala/concurrent/DelayedLazyVal.scala +++ b/src/library/scala/concurrent/DelayedLazyVal.scala @@ -8,7 +8,6 @@ package scala.concurrent -import ops.future /** A `DelayedLazyVal` is a wrapper for lengthy computations which have a * valid partially computed result. @@ -27,21 +26,23 @@ import ops.future class DelayedLazyVal[T](f: () => T, body: => Unit) { @volatile private[this] var _isDone = false private[this] lazy val complete = f() - + /** Whether the computation is complete. * * @return true if the computation is complete. */ def isDone = _isDone - + /** The current result of f(), or the final result if complete. * * @return the current value */ def apply(): T = if (isDone) complete else f() - - future { + + // TODO replace with scala.concurrent.future { ... } + ops.future { body _isDone = true } + } diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala new file mode 100644 index 0000000000..99cd264ac5 --- /dev/null +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -0,0 +1,132 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import java.util.concurrent.atomic.{ AtomicInteger } +import java.util.concurrent.{ Executors, Future => JFuture, Callable } +import scala.util.Duration +import scala.util.{ Try, Success, Failure } +import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } +import scala.collection.generic.CanBuildFrom +import collection._ + + + +trait ExecutionContext { + + protected implicit object CanAwaitEvidence extends CanAwait + + def execute(runnable: Runnable): Unit + + def execute[U](body: () => U): Unit + + def promise[T]: Promise[T] + + def future[T](body: Callable[T]): Future[T] = future(body.call()) + + def future[T](body: => T): Future[T] + + def blocking[T](atMost: Duration)(body: =>T): T + + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T + + def reportFailure(t: Throwable): Unit + + /* implementations follow */ + + private implicit val executionContext = this + + def keptPromise[T](result: T): Promise[T] = { + val p = promise[T] + p success result + } + + def brokenPromise[T](t: Throwable): Promise[T] = { + val p = promise[T] + p failure t + } + + /** TODO some docs + * + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { + import nondeterministic._ + val buffer = new mutable.ArrayBuffer[T] + val counter = new AtomicInteger(1) // how else could we do this? + val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature + var idx = 0 + + def tryFinish() = if (counter.decrementAndGet() == 0) { + val builder = cbf(futures) + builder ++= buffer + p success builder.result + } + + for (f <- futures) { + val currentIndex = idx + buffer += null.asInstanceOf[T] + counter.incrementAndGet() + f onComplete { + case Failure(t) => + p tryFailure t + case Success(v) => + buffer(currentIndex) = v + tryFinish() + } + idx += 1 + } + + tryFinish() + + p.future + } + + /** TODO some docs + * + */ + def any[T](futures: Traversable[Future[T]]): Future[T] = { + val p = promise[T] + val completeFirst: Try[T] => Unit = elem => p tryComplete elem + + futures foreach (_ onComplete completeFirst) + + p.future + } + + /** TODO some docs + * + */ + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = { + if (futures.isEmpty) Promise.kept[Option[T]](None).future + else { + val result = promise[Option[T]] + val count = new AtomicInteger(futures.size) + val search: Try[T] => Unit = { + v => v match { + case Success(r) => if (predicate(r)) result trySuccess Some(r) + case _ => + } + if (count.decrementAndGet() == 0) result trySuccess None + } + + futures.foreach(_ onComplete search) + + result.future + } + } + +} + + +sealed trait CanAwait + + + diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala new file mode 100644 index 0000000000..73f76bbbfb --- /dev/null +++ b/src/library/scala/concurrent/Future.scala @@ -0,0 +1,492 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } +import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } +import java.lang.{ Iterable => JIterable } +import java.util.{ LinkedList => JLinkedList } +import java.{ lang => jl } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } + +import scala.util.{ Timeout, Duration, Try, Success, Failure } +import scala.Option + +import scala.annotation.tailrec +import scala.collection.mutable.Stack +import scala.collection.mutable.Builder +import scala.collection.generic.CanBuildFrom + + + +/** The trait that represents futures. + * + * Asynchronous computations that yield futures are created with the `future` call: + * + * {{{ + * val s = "Hello" + * val f: Future[String] = future { + * s + " future!" + * } + * f onSuccess { + * case msg => println(msg) + * } + * }}} + * + * @author Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang + * + * @define multipleCallbacks + * Multiple callbacks may be registered; there is no guarantee that they will be + * executed in a particular order. + * + * @define caughtThrowables + * The future may contain a throwable object and this means that the future failed. + * Futures obtained through combinators have the same exception as the future they were obtained from. + * The following throwable objects are not contained in the future: + * - `Error` - errors are not contained within futures + * - `InterruptedException` - not contained within futures + * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures + * + * Instead, the future is completed with a ExecutionException with one of the exceptions above + * as the cause. + * If a future is failed with a `scala.runtime.NonLocalReturnControl`, + * it is completed with a value instead from that throwable instead instead. + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + * + * @define forComprehensionExamples + * Example: + * + * {{{ + * val f = future { 5 } + * val g = future { 3 } + * val h = for { + * x: Int <- f // returns Future(5) + * y: Int <- g // returns Future(5) + * } yield x + y + * }}} + * + * is translated to: + * + * {{{ + * f flatMap { (x: Int) => g map { (y: Int) => x + y } } + * }}} + */ +trait Future[+T] extends Awaitable[T] { +self => + + /* Callbacks */ + + /** When this future is completed successfully (i.e. with a value), + * apply the provided partial function to the value if the partial function + * is defined at that value. + * + * If the future has already been completed with a value, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + */ + def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete { + case Failure(t) => // do nothing + case Success(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ } + } + + /** When this future is completed with a failure (i.e. with a throwable), + * apply the provided callback to the throwable. + * + * $caughtThrowables + * + * If the future has already been completed with a failure, + * this will either be applied immediately or be scheduled asynchronously. + * + * Will not be called in case that the future is completed with a value. + * + * $multipleCallbacks + */ + def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete { + case Failure(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ } + case Success(v) => // do nothing + } + + /** When this future is completed, either through an exception, a timeout, or a value, + * apply the provided function. + * + * If the future has already been completed, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + */ + def onComplete[U](func: Try[T] => U): this.type + + + /* Miscellaneous */ + + /** Creates a new promise. + */ + def newPromise[S]: Promise[S] + + + /* Projections */ + + /** Returns a failed projection of this future. + * + * The failed projection is a future holding a value of type `Throwable`. + * + * It is completed with a value which is the throwable of the original future + * in case the original future is failed. + * + * It is failed with a `NoSuchElementException` if the original future is completed successfully. + * + * Blocking on this future returns a value if the original future is completed with an exception + * and throws a corresponding exception if the original future fails. + */ + def failed: Future[Throwable] = { + def noSuchElem(v: T) = + new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v) + + val p = newPromise[Throwable] + + onComplete { + case Failure(t) => p success t + case Success(v) => p failure noSuchElem(v) + } + + p.future + } + + + /* Monadic operations */ + + /** Asynchronously processes the value in the future once the value becomes available. + * + * Will not be called if the future fails. + */ + def foreach[U](f: T => U): Unit = onComplete { + case Success(r) => f(r) + case Failure(_) => // do nothing + } + + /** Creates a new future by applying a function to the successful result of + * this future. If this future is completed with an exception then the new + * future will also contain this exception. + * + * $forComprehensionExample + */ + def map[S](f: T => S): Future[S] = { + val p = newPromise[S] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try p success f(v) + catch { + case t => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future by applying a function to the successful result of + * this future, and returns the result of the function as the new future. + * If this future is completed with an exception then the new future will + * also contain this exception. + * + * $forComprehensionExample + */ + def flatMap[S](f: T => Future[S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try { + f(v) onComplete { + case Failure(t) => p failure t + case Success(v) => p success v + } + } catch { + case t: Throwable => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future by filtering the value of the current future with a predicate. + * + * If the current future contains a value which satisfies the predicate, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails or times out, the resulting future also fails or times out, respectively. + * + * Example: + * {{{ + * val f = future { 5 } + * val g = f filter { _ % 2 == 1 } + * val h = f filter { _ % 2 == 0 } + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException + * }}} + */ + def filter(pred: T => Boolean): Future[T] = { + val p = newPromise[T] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try { + if (pred(v)) p success v + else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) + } catch { + case t: Throwable => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. + * + * If the current future contains a value for which the partial function is defined, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails or times out, the resulting future also fails or times out, respectively. + * + * Example: + * {{{ + * val f = future { -5 } + * val g = f collect { + * case x if x < 0 => -x + * } + * val h = f collect { + * case x if x > 0 => x * 2 + * } + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException + * }}} + */ + def collect[S](pf: PartialFunction[T, S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case Failure(t) => p failure t + case Success(v) => + try { + if (pf.isDefinedAt(v)) p success pf(v) + else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + } catch { + case t: Throwable => p complete resolver(t) + } + } + + p.future + } + + /** Creates a new future that will handle any matching throwable that this + * future might contain. If there is no match, or if this future contains + * a valid result then the new future will contain the same. + * + * Example: + * + * {{{ + * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0 + * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception + * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 + * }}} + */ + def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Failure(t) if pf isDefinedAt t => + try { p success pf(t) } + catch { case t: Throwable => p complete resolver(t) } + case otherwise => p complete otherwise + } + + p.future + } + + /** Creates a new future that will handle any matching throwable that this + * future might contain by assigning it a value of another future. + * + * If there is no match, or if this future contains + * a valid result then the new future will contain the same result. + * + * Example: + * + * {{{ + * val f = future { Int.MaxValue } + * future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue + * }}} + */ + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Failure(t) if pf isDefinedAt t => + try { + p completeWith pf(t) + } catch { + case t: Throwable => p complete resolver(t) + } + case otherwise => p complete otherwise + } + + p.future + } + + /** Zips the values of `this` and `that` future, and creates + * a new future holding the tuple of their results. + * + * If `this` future fails, the resulting future is failed + * with the throwable stored in `this`. + * Otherwise, if `that` future fails, the resulting future is failed + * with the throwable stored in `that`. + */ + def zip[U](that: Future[U]): Future[(T, U)] = { + val p = newPromise[(T, U)] + + this onComplete { + case Failure(t) => p failure t + case Success(r) => that onSuccess { + case r2 => p success ((r, r2)) + } + } + + that onFailure { + case f => p failure f + } + + p.future + } + + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, + * the result of the `that` future if `that` is completed successfully. + * If both futures are failed, the resulting future holds the throwable object of the first future. + * + * Using this method will not cause concurrent programs to become nondeterministic. + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f orElse g + * await(0) h // evaluates to 5 + * }}} + */ + def fallbackTo[U >: T](that: Future[U]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Failure(t) => that onComplete { + case Failure(_) => p failure t + case Success(v) => p success v + } + case Success(v) => p success v + } + + p.future + } + + /** Applies the side-effecting function to the result of this future, and returns + * a new future with the result of this future. + * + * This method allows one to enforce that the callbacks are executed in a + * specified order. + * + * Note that if one of the chained `andThen` callbacks throws + * an exception, that exception is not propagated to the subsequent `andThen` + * callbacks. Instead, the subsequent `andThen` callbacks are given the original + * value of this future. + * + * The following example prints out `5`: + * + * {{{ + * val f = future { 5 } + * f andThen { + * case r => sys.error("runtime exception") + * } andThen { + * case Failure(t) => println(t) + * case Success(v) => println(v) + * } + * }}} + */ + def andThen[U](pf: PartialFunction[Try[T], U]): Future[T] = { + val p = newPromise[T] + + onComplete { + case r => + try if (pf isDefinedAt r) pf(r) + finally p complete r + } + + p.future + } + + /** Creates a new future which holds the result of either this future or `that` future, depending on + * which future was completed first. + * + * $nonDeterministic + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f either g + * await(0) h // evaluates to either 5 or throws a runtime exception + * }}} + */ + def either[U >: T](that: Future[U]): Future[U] = { + val p = self.newPromise[U] + + val completePromise: PartialFunction[Try[U], _] = { + case Failure(t) => p tryFailure t + case Success(v) => p trySuccess v + } + + self onComplete completePromise + that onComplete completePromise + + p.future + } + +} + + + +/** TODO some docs + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + */ +object Future { + + // TODO make more modular by encoding all other helper methods within the execution context + /** TODO some docs + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] = + ec.all[T, Coll](futures) + + // move this to future companion object + @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) + + def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures) + + def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate) + +} + + + + diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala index c5fcde2d19..75e6299ad9 100644 --- a/src/library/scala/concurrent/FutureTaskRunner.scala +++ b/src/library/scala/concurrent/FutureTaskRunner.scala @@ -13,6 +13,7 @@ package scala.concurrent * * @author Philipp Haller */ +@deprecated("Use `ExecutionContext`s instead.", "2.10.0") trait FutureTaskRunner extends TaskRunner { /** The type of the futures that the underlying task runner supports. diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala index db3c490882..bac9d4f558 100644 --- a/src/library/scala/concurrent/JavaConversions.scala +++ b/src/library/scala/concurrent/JavaConversions.scala @@ -17,6 +17,7 @@ import java.util.concurrent.{ExecutorService, Executor} */ object JavaConversions { + @deprecated("Use `asExecutionContext` instead.", "2.10.0") implicit def asTaskRunner(exec: ExecutorService): FutureTaskRunner = new ThreadPoolRunner { override protected def executor = @@ -26,6 +27,7 @@ object JavaConversions { exec.shutdown() } + @deprecated("Use `asExecutionContext` instead.", "2.10.0") implicit def asTaskRunner(exec: Executor): TaskRunner = new TaskRunner { type Task[T] = Runnable @@ -46,4 +48,9 @@ object JavaConversions { // do nothing } } + + implicit def asExecutionContext(exec: ExecutorService): ExecutionContext = null // TODO + + implicit def asExecutionContext(exec: Executor): ExecutionContext = null // TODO + } diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala index 9c6f4d51d6..0b6d82e76f 100644 --- a/src/library/scala/concurrent/ManagedBlocker.scala +++ b/src/library/scala/concurrent/ManagedBlocker.scala @@ -12,6 +12,7 @@ package scala.concurrent * * @author Philipp Haller */ +@deprecated("Not used.", "2.10.0") trait ManagedBlocker { /** diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala new file mode 100644 index 0000000000..f26deb77ab --- /dev/null +++ b/src/library/scala/concurrent/Promise.scala @@ -0,0 +1,132 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import scala.util.{ Try, Success, Failure } + + + + +/** Promise is an object which can be completed with a value or failed + * with an exception. + * + * @define promiseCompletion + * If the promise has already been fulfilled, failed or has timed out, + * calling this method will throw an IllegalStateException. + * + * @define allowedThrowables + * If the throwable used to fail this promise is an error, a control exception + * or an interrupted exception, it will be wrapped as a cause within an + * `ExecutionException` which will fail the promise. + * + * @define nonDeterministic + * Note: Using this method may result in non-deterministic concurrent programs. + */ +trait Promise[T] { + + import nondeterministic._ + + /** Future containing the value of this promise. + */ + def future: Future[T] + + /** Completes the promise with either an exception or a value. + * + * @param result Either the value or the exception to complete the promise with. + * + * $promiseCompletion + */ + def complete(result:Try[T]): this.type = if (tryComplete(result)) this else throwCompleted + + /** Tries to complete the promise with either a value or the exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryComplete(result: Try[T]): Boolean + + /** Completes this promise with the specified future, once that future is completed. + * + * @return This promise + */ + final def completeWith(other: Future[T]): this.type = { + other onComplete { + this complete _ + } + this + } + + /** Completes the promise with a value. + * + * @param value The value to complete the promise with. + * + * $promiseCompletion + */ + def success(v: T): this.type = if (trySuccess(v)) this else throwCompleted + + /** Tries to complete the promise with a value. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def trySuccess(value: T): Boolean = tryComplete(Success(value)) + + /** Completes the promise with an exception. + * + * @param t The throwable to complete the promise with. + * + * $allowedThrowables + * + * $promiseCompletion + */ + def failure(t: Throwable): this.type = if (tryFailure(t)) this else throwCompleted + + /** Tries to complete the promise with an exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryFailure(t: Throwable): Boolean = tryComplete(Failure(t)) + + /** Wraps a `Throwable` in an `ExecutionException` if necessary. TODO replace with `resolver` from scala.concurrent + * + * $allowedThrowables + */ + protected def wrap(t: Throwable): Throwable = t match { + case t: Throwable if isFutureThrowable(t) => t + case _ => new ExecutionException(t) + } + + private def throwCompleted = throw new IllegalStateException("Promise already completed.") + +} + + + +object Promise { + + def kept[T](result: T)(implicit execctx: ExecutionContext): Promise[T] = + execctx keptPromise result + + def broken[T](t: Throwable)(implicit execctx: ExecutionContext): Promise[T] = + execctx brokenPromise t + +} + + + + + + + + + diff --git a/src/library/scala/concurrent/Scheduler.scala b/src/library/scala/concurrent/Scheduler.scala new file mode 100644 index 0000000000..39d798e6b4 --- /dev/null +++ b/src/library/scala/concurrent/Scheduler.scala @@ -0,0 +1,54 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import scala.util.Duration + +/** A service for scheduling tasks and thunks for one-time, or periodic execution. + */ +trait Scheduler { + + /** Schedules a thunk for repeated execution with an initial delay and a frequency. + * + * @param delay the initial delay after which the thunk should be executed + * the first time + * @param frequency the frequency with which the thunk should be executed, + * as a time period between subsequent executions + */ + def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable + + /** Schedules a task for execution after a given delay. + * + * @param delay the duration after which the task should be executed + * @param task the task that is scheduled for execution + * @return a `Cancellable` that may be used to cancel the execution + * of the task + */ + def scheduleOnce(delay: Duration, task: Runnable): Cancellable + + /** Schedules a thunk for execution after a given delay. + * + * @param delay the duration after which the thunk should be executed + * @param thunk the thunk that is scheduled for execution + * @return a `Cancellable` that may be used to cancel the execution + * of the thunk + */ + def scheduleOnce(delay: Duration)(task: => Unit): Cancellable + +} + + + +trait Cancellable { + + /** Cancels the underlying task. + */ + def cancel(): Unit + +} diff --git a/src/library/scala/concurrent/Task.scala b/src/library/scala/concurrent/Task.scala new file mode 100644 index 0000000000..d6f86bac31 --- /dev/null +++ b/src/library/scala/concurrent/Task.scala @@ -0,0 +1,13 @@ +package scala.concurrent + + + +trait Task[+T] { + + def start(): Unit + + def future: Future[T] + +} + + diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala index 64e62adfd3..500d79e07f 100644 --- a/src/library/scala/concurrent/TaskRunner.scala +++ b/src/library/scala/concurrent/TaskRunner.scala @@ -12,6 +12,7 @@ package scala.concurrent * * @author Philipp Haller */ +@deprecated("Use `ExecutionContext`s instead.", "2.10.0") trait TaskRunner { type Task[T] diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala index 588073dc5e..7994255b25 100644 --- a/src/library/scala/concurrent/TaskRunners.scala +++ b/src/library/scala/concurrent/TaskRunners.scala @@ -14,6 +14,7 @@ import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit} * * @author Philipp Haller */ +@deprecated("Use `ExecutionContext`s instead.", "2.10.0") object TaskRunners { implicit val threadRunner: FutureTaskRunner = diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala index 27d8f2cc32..a3e0253634 100644 --- a/src/library/scala/concurrent/ThreadPoolRunner.scala +++ b/src/library/scala/concurrent/ThreadPoolRunner.scala @@ -15,6 +15,7 @@ import java.util.concurrent.{ExecutorService, Callable, TimeUnit} * * @author Philipp Haller */ +@deprecated("Use `ExecutionContext`s instead.", "2.10.0") trait ThreadPoolRunner extends FutureTaskRunner { type Task[T] = Callable[T] with Runnable diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled new file mode 100644 index 0000000000..745d2d1a15 --- /dev/null +++ b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled @@ -0,0 +1,44 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +package default + +import scala.util.Duration + +private[concurrent] final class SchedulerImpl extends Scheduler { + private val timer = + new java.util.Timer(true) // the associated thread runs as a daemon + + def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ??? + + def scheduleOnce(delay: Duration, task: Runnable): Cancellable = { + val timerTask = new java.util.TimerTask { + def run(): Unit = + task.run() + } + timer.schedule(timerTask, delay.toMillis) + new Cancellable { + def cancel(): Unit = + timerTask.cancel() + } + } + + def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = { + val timerTask = new java.util.TimerTask { + def run(): Unit = + task + } + timer.schedule(timerTask, delay.toMillis) + new Cancellable { + def cancel(): Unit = + timerTask.cancel() + } + } + +} diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled new file mode 100644 index 0000000000..94e54cb372 --- /dev/null +++ b/src/library/scala/concurrent/default/TaskImpl.scala.disabled @@ -0,0 +1,313 @@ +package scala.concurrent +package default + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } +import scala.util.Try +import scala.util +import scala.util.Duration +import scala.annotation.tailrec + + + +private[concurrent] trait Completable[T] { +self: Future[T] => + + val executor: ExecutionContextImpl + + def newPromise[S]: Promise[S] = executor promise + + type Callback = Try[T] => Any + + def getState: State[T] + + def casState(oldv: State[T], newv: State[T]): Boolean + + protected def dispatch[U](r: Runnable) = executionContext execute r + + protected def processCallbacks(cbs: List[Callback], r: Try[T]) = + for (cb <- cbs) dispatch(new Runnable { + override def run() = cb(r) + }) + + def future: Future[T] = self + + def onComplete[U](callback: Try[T] => U): this.type = { + @tailrec def tryAddCallback(): Try[T] = { + getState match { + case p @ Pending(lst) => + val pt = p.asInstanceOf[Pending[T]] + if (casState(pt, Pending(callback :: pt.callbacks))) null + else tryAddCallback() + case Success(res) => util.Success(res) + case Failure(t) => util.Failure(t) + } + } + + val res = tryAddCallback() + if (res != null) dispatch(new Runnable { + override def run() = + try callback(res) + catch handledFutureException andThen { + t => Console.err.println(t) + } + }) + + this + } + + def isTimedout: Boolean = getState match { + case Failure(ft: FutureTimeoutException) => true + case _ => false + } + +} + +private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) +extends Promise[T] with Future[T] with Completable[T] { + + val executor: scala.concurrent.default.ExecutionContextImpl = context + + @volatile private var state: State[T] = _ + + val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state") + + updater.set(this, Pending(List())) + + def casState(oldv: State[T], newv: State[T]): Boolean = { + updater.compareAndSet(this, oldv, newv) + } + + def getState: State[T] = { + updater.get(this) + } + + @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { + case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs + case _ => null + } + + def tryComplete(r: Try[T]) = r match { + case util.Failure(t) => tryFailure(t) + case util.Success(v) => trySuccess(v) + } + + override def trySuccess(value: T): Boolean = { + val cbs = tryCompleteState(Success(value)) + if (cbs == null) + false + else { + processCallbacks(cbs, util.Success(value)) + this.synchronized { + this.notifyAll() + } + true + } + } + + override def tryFailure(t: Throwable): Boolean = { + val wrapped = wrap(t) + val cbs = tryCompleteState(Failure(wrapped)) + if (cbs == null) + false + else { + processCallbacks(cbs, util.Failure(wrapped)) + this.synchronized { + this.notifyAll() + } + true + } + } + + def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match { + case Success(res) => res + case Failure(t) => throw t + case _ => + this.synchronized { + while (true) + getState match { + case Pending(_) => this.wait() + case Success(res) => return res + case Failure(t) => throw t + } + } + sys.error("unreachable") + } + +} + +private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) +extends RecursiveAction with Task[T] with Future[T] with Completable[T] { + + val executor: ExecutionContextImpl = context + + @volatile private var state: State[T] = _ + + val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state") + + updater.set(this, Pending(List())) + + def casState(oldv: State[T], newv: State[T]): Boolean = { + updater.compareAndSet(this, oldv, newv) + } + + def getState: State[T] = { + updater.get(this) + } + + @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { + case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs + } + + def compute(): Unit = { + var cbs: List[Callback] = null + try { + val res = body + processCallbacks(tryCompleteState(Success(res)), util.Success(res)) + } catch { + case t if isFutureThrowable(t) => + processCallbacks(tryCompleteState(Failure(t)), util.Failure(t)) + case t => + val ee = new ExecutionException(t) + processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee)) + throw t + } + } + + def start(): Unit = { + Thread.currentThread match { + case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork() + case _ => executor.pool.execute(this) + } + } + + // TODO FIXME: handle timeouts + def await(atMost: Duration): this.type = + await + + def await: this.type = { + this.join() + this + } + + def tryCancel(): Unit = + tryUnfork() + + def await(atMost: Duration)(implicit canawait: CanAwait): T = { + join() // TODO handle timeout also + (updater.get(this): @unchecked) match { + case Success(r) => r + case Failure(t) => throw t + } + } + +} + + +private[concurrent] sealed abstract class State[T] + + +case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T] + + +case class Success[T](result: T) extends State[T] + + +case class Failure[T](throwable: Throwable) extends State[T] + + +private[concurrent] final class ExecutionContextImpl extends ExecutionContext { + import ExecutionContextImpl._ + + val pool = { + val p = new ForkJoinPool + p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { + def uncaughtException(t: Thread, throwable: Throwable) { + Console.err.println(throwable.getMessage) + throwable.printStackTrace(Console.err) + } + }) + p + } + + @inline + private def executeTask(task: RecursiveAction) { + if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) + task.fork() + else + pool execute task + } + + def execute(task: Runnable) { + val action = new RecursiveAction { def compute() { task.run() } } + executeTask(action) + } + + def execute[U](body: () => U) { + val action = new RecursiveAction { def compute() { body() } } + executeTask(action) + } + + def task[T](body: => T): Task[T] = { + new TaskImpl(this, body) + } + + def future[T](body: => T): Future[T] = { + val t = task(body) + t.start() + t.future + } + + def promise[T]: Promise[T] = + new PromiseImpl[T](this) + + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { + currentExecutionContext.get match { + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor + case x => x.blocking(awaitable, atMost) + } + } + + private def blockingCall[T](b: Awaitable[T]): T = b match { + case fj: TaskImpl[_] if fj.executor.pool eq pool => + fj.await(Duration.fromNanos(0)) + case _ => + var res: T = null.asInstanceOf[T] + @volatile var blockingDone = false + // TODO add exception handling here! + val mb = new ForkJoinPool.ManagedBlocker { + def block() = { + res = b.await(Duration.fromNanos(0))(CanAwaitEvidence) + blockingDone = true + true + } + def isReleasable = blockingDone + } + ForkJoinPool.managedBlock(mb, true) + res + } + + def reportFailure(t: Throwable): Unit = {} + +} + + +object ExecutionContextImpl { + + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { + override protected def initialValue = null + } + +} + + + + + + + diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java new file mode 100644 index 0000000000..5280d67854 --- /dev/null +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -0,0 +1,21 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl; + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class AbstractPromise { + private volatile Object _ref = null; + protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater = + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala new file mode 100644 index 0000000000..af0eb66292 --- /dev/null +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -0,0 +1,134 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import java.util.concurrent.{Callable, ExecutorService} +import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} +import scala.util.{ Duration, Try, Success, Failure } +import scala.collection.mutable.Stack + + + +class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { + import ExecutionContextImpl._ + + def execute(runnable: Runnable): Unit = executorService match { + // case fj: ForkJoinPool => + // TODO fork if more applicable + // executorService execute runnable + case _ => + executorService execute runnable + } + + def execute[U](body: () => U): Unit = execute(new Runnable { + def run() = body() + }) + + def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this) + + def future[T](body: =>T): Future[T] = { + val p = promise[T] + + dispatchFuture { + () => + p complete { + try { + Success(body) + } catch { + case e => resolver(e) + } + } + } + + p.future + } + + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { + currentExecutionContext.get match { + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(awaitable) // inside an execution context thread + } + } + + def reportFailure(t: Throwable) = t match { + case e: Error => throw e // rethrow serious errors + case t => t.printStackTrace() + } + + /** Only callable from the tasks running on the same execution context. */ + private def blockingCall[T](body: Awaitable[T]): T = { + releaseStack() + + // TODO see what to do with timeout + body.await(Duration.fromNanos(0))(CanAwaitEvidence) + } + + // an optimization for batching futures + // TODO we should replace this with a public queue, + // so that it can be stolen from + // OR: a push to the local task queue should be so cheap that this is + // not even needed, but stealing is still possible + private val _taskStack = new ThreadLocal[Stack[() => Unit]]() + + private def releaseStack(): Unit = + _taskStack.get match { + case stack if (stack ne null) && stack.nonEmpty => + val tasks = stack.elems + stack.clear() + _taskStack.remove() + dispatchFuture(() => _taskStack.get.elems = tasks, true) + case null => + // do nothing - there is no local batching stack anymore + case _ => + _taskStack.remove() + } + + private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit = + _taskStack.get match { + case stack if (stack ne null) && !force => stack push task + case _ => this.execute( + new Runnable { + def run() { + try { + val taskStack = Stack[() => Unit](task) + _taskStack set taskStack + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e => + // TODO catching all and continue isn't good for OOME + reportFailure(e) + } + } + } finally { + _taskStack.remove() + } + } + } + ) + } + +} + + +object ExecutionContextImpl { + + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContextImpl] = new ThreadLocal[ExecutionContextImpl] { + override protected def initialValue = null + } + +} + + diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala new file mode 100644 index 0000000000..3664241ec0 --- /dev/null +++ b/src/library/scala/concurrent/impl/Future.scala @@ -0,0 +1,77 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import scala.concurrent.{Awaitable, ExecutionContext} +import scala.util.{ Try, Success, Failure } +//import scala.util.continuations._ + + + +trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { + + implicit def executor: ExecutionContextImpl + + /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. + */ + //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) + + /** Tests whether this Future has been completed. + */ + final def isCompleted: Boolean = value.isDefined + + /** The contained value of this Future. Before this Future is completed + * the value will be None. After completion the value will be Some(Right(t)) + * if it contains a valid result, or Some(Left(error)) if it contains + * an exception. + */ + def value: Option[Try[T]] + + def onComplete[U](func: Try[T] => U): this.type + + /** Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. + */ + final def mapTo[T](implicit m: Manifest[T]) = { + val p = executor.promise[T] + + onComplete { + case f @ Failure(t) => p complete f.asInstanceOf[Try[T]] + case Success(v) => + p complete (try { + Success(boxedType(m.erasure).cast(v).asInstanceOf[T]) + } catch { + case e: ClassCastException ⇒ Failure(e) + }) + } + + p.future + } + + /** Used by for-comprehensions. + */ + final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) + + final class FutureWithFilter[+A](self: Future[A], p: A => Boolean) { + def foreach(f: A => Unit): Unit = self filter p foreach f + def map[B](f: A => B) = self filter p map f + def flatMap[B](f: A => Future[B]) = self filter p flatMap f + def withFilter(q: A => Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) + } + +} + + + + diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala new file mode 100644 index 0000000000..3f9970b178 --- /dev/null +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -0,0 +1,251 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException} +//import scala.util.continuations._ +import scala.util.Duration +import scala.util.Try +import scala.util +import scala.annotation.tailrec +//import scala.concurrent.NonDeterministic + + + +trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { + + def future = this + + def newPromise[S]: Promise[S] = executor promise + + // TODO refine answer and return types here from Any to type parameters + // then move this up in the hierarchy + /* + final def <<(value: T): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] => Future[Any]) => + cont(complete(Right(value))) + } + + final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] => Future[Any]) => + val p = executor.promise[Any] + val thisPromise = this + + thisPromise completeWith other + thisPromise onComplete { v => + try { + p completeWith cont(thisPromise) + } catch { + case e => p complete resolver(e) + } + } + + p.future + } + */ + // TODO finish this once we introduce something like dataflow streams + + /* + final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => + val fr = executor.promise[Any] + val f = stream.dequeue(this) + f.onComplete { _ => + try { + fr completeWith cont(f) + } catch { + case e => + fr failure e + } + } + fr + } + */ + +} + + +object Promise { + + def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] + + /** Represents the internal state. + */ + sealed trait FState[+T] { def value: Option[Try[T]] } + + case class Pending[T](listeners: List[Try[T] => Any] = Nil) extends FState[T] { + def value: Option[Try[T]] = None + } + + case class Success[T](value: Option[util.Success[T]] = None) extends FState[T] { + def result: T = value.get.get + } + + case class Failure[T](value: Option[util.Failure[T]] = None) extends FState[T] { + def exception: Throwable = value.get.exception + } + + private val emptyPendingValue = Pending[Nothing](Nil) + + /** Default promise implementation. + */ + class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] { + self => + + updater.set(this, Promise.EmptyPending()) + + protected final def tryAwait(atMost: Duration): Boolean = { + @tailrec + def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOSECONDS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec + val start = System.nanoTime() + try { + synchronized { + while (value.isEmpty) wait(ms, ns) + } + } catch { + case e: InterruptedException => + } + + awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) + } else + value.isDefined + } + + executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0)) + } + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + if (value.isDefined || tryAwait(atMost)) this + else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + + def await(atMost: Duration)(implicit permit: CanAwait): T = + ready(atMost).value.get match { + case util.Failure(e) => throw e + case util.Success(r) => r + } + + def value: Option[Try[T]] = getState.value + + @inline + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + + @inline + protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + + @inline + protected final def getState: FState[T] = updater.get(this) + + def tryComplete(value: Try[T]): Boolean = { + val callbacks: List[Try[T] => Any] = { + try { + @tailrec + def tryComplete(v: Try[T]): List[Try[T] => Any] = { + getState match { + case cur @ Pending(listeners) => + if (updateState(cur, if (v.isFailure) Failure(Some(v.asInstanceOf[util.Failure[T]])) else Success(Some(v.asInstanceOf[util.Success[T]])))) listeners + else tryComplete(v) + case _ => null + } + } + tryComplete(resolve(value)) + } finally { + synchronized { notifyAll() } // notify any blockers from `tryAwait` + } + } + + callbacks match { + case null => false + case cs if cs.isEmpty => true + case cs => + executor dispatchFuture { + () => cs.foreach(f => notifyCompleted(f, value)) + } + true + } + } + + def onComplete[U](func: Try[T] => U): this.type = { + @tailrec // Returns whether the future has already been completed or not + def tryAddCallback(): Boolean = { + val cur = getState + cur match { + case _: Success[_] | _: Failure[_] => true + case p: Pending[_] => + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + } + } + + if (tryAddCallback()) { + val result = value.get + executor dispatchFuture { + () => notifyCompleted(func, result) + } + } + + this + } + + private final def notifyCompleted(func: Try[T] => Any, result: Try[T]) { + try { + func(result) + } catch { + case e => executor.reportFailure(e) + } + } + } + + /** An already completed Future is given its result at creation. + * + * Useful in Future-composition when a value to contribute is already available. + */ + final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { + val value = Some(resolve(suppliedValue)) + + def tryComplete(value: Try[T]): Boolean = false + + def onComplete[U](func: Try[T] => U): this.type = { + val completedAs = value.get + executor dispatchFuture { + () => func(completedAs) + } + this + } + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + + def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + case util.Failure(e) => throw e + case util.Success(r) => r + } + } + +} + + + + + + + + + + + + + + + + diff --git a/src/library/scala/concurrent/impl/package.scala b/src/library/scala/concurrent/impl/package.scala new file mode 100644 index 0000000000..72add73167 --- /dev/null +++ b/src/library/scala/concurrent/impl/package.scala @@ -0,0 +1,39 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import java.{lang => jl} +import scala.util.Duration + + + +package object impl { + + private val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) + + def boxedType(c: Class[_]): Class[_] = { + if (c.isPrimitive) toBoxed(c) else c + } + + def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue + +} + + diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala index 92220a8313..2cea29aefe 100644 --- a/src/library/scala/concurrent/ops.scala +++ b/src/library/scala/concurrent/ops.scala @@ -15,6 +15,7 @@ import scala.util.control.Exception.allCatch * * @author Martin Odersky, Stepan Koltsov, Philipp Haller */ +@deprecated("Use `future` instead.", "2.10.0") object ops { val defaultRunner: FutureTaskRunner = TaskRunners.threadRunner diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala new file mode 100644 index 0000000000..0725332c5e --- /dev/null +++ b/src/library/scala/concurrent/package.scala @@ -0,0 +1,150 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala + + + +import scala.util.{ Duration, Try, Success, Failure } + + + +/** This package object contains primitives for concurrent and parallel programming. + */ +package object concurrent { + + type ExecutionException = java.util.concurrent.ExecutionException + type CancellationException = java.util.concurrent.CancellationException + type TimeoutException = java.util.concurrent.TimeoutException + + /** A global execution environment for executing lightweight tasks. + */ + lazy val executionContext = + new impl.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool()) + + /** A global service for scheduling tasks for execution. + */ + // lazy val scheduler = + // new default.SchedulerImpl + + val handledFutureException: PartialFunction[Throwable, Throwable] = { + case t: Throwable if isFutureThrowable(t) => t + } + + // TODO rename appropriately and make public + private[concurrent] def isFutureThrowable(t: Throwable) = t match { + case e: Error => false + case t: scala.util.control.ControlThrowable => false + case i: InterruptedException => false + case _ => true + } + + private[concurrent] def resolve[T](source: Try[T]): Try[T] = source match { + case Failure(t: scala.runtime.NonLocalReturnControl[_]) => Success(t.value.asInstanceOf[T]) + case Failure(t: scala.util.control.ControlThrowable) => Failure(new ExecutionException("Boxed ControlThrowable", t)) + case Failure(t: InterruptedException) => Failure(new ExecutionException("Boxed InterruptedException", t)) + case Failure(e: Error) => Failure(new ExecutionException("Boxed Error", e)) + case _ => source + } + + // TODO, docs, return type + private val resolverFunction: PartialFunction[Throwable, Try[_]] = { + case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value) + case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t)) + case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t)) + case e: Error => Failure(new ExecutionException("Boxed Error", e)) + case t => Failure(t) + } + + private[concurrent] def resolver[T] = resolverFunction.asInstanceOf[PartialFunction[Throwable, Try[T]]] + + /* concurrency constructs */ + + def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = + execCtx future body + + def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] = + execCtx promise + + /** Wraps a block of code into an awaitable object. */ + def body2awaitable[T](body: =>T) = new Awaitable[T] { + def await(atMost: Duration)(implicit cb: CanAwait) = body + } + + /** Used to block on a piece of code which potentially blocks. + * + * @param body A piece of code which contains potentially blocking or long running calls. + * + * Calling this method may throw the following exceptions: + * - CancellationException - if the computation was cancelled + * - InterruptedException - in the case that a wait within the blockable object was interrupted + * - TimeoutException - in the case that the blockable object timed out + */ + def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T = + executionContext.blocking(atMost)(body) + + /** Blocks on an awaitable object. + * + * @param awaitable An object with a `block` method which runs potentially blocking or long running calls. + * + * Calling this method may throw the following exceptions: + * - CancellationException - if the computation was cancelled + * - InterruptedException - in the case that a wait within the blockable object was interrupted + * - TimeoutException - in the case that the blockable object timed out + */ + def blocking[T](awaitable: Awaitable[T], atMost: Duration)(implicit execCtx: ExecutionContext = executionContext): T = + executionContext.blocking(awaitable, atMost) + + object await { + def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = { + try blocking(awaitable, atMost) + catch { case _ => } + awaitable + } + + def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = { + blocking(awaitable, atMost) + } + } + + /** Importing this object allows using some concurrency primitives + * on futures and promises that can yield nondeterministic programs. + * + * While program determinism is broken when using these primitives, + * some programs cannot be written without them (e.g. multiple client threads + * cannot send requests to a server thread through regular promises and futures). + */ + object nondeterministic { + } + + @inline implicit final def int2durationops(x: Int) = new DurationOps(x) + +} + + + +package concurrent { + + /** A timeout exception. + * + * Futures are failed with a timeout exception when their timeout expires. + * + * Each timeout exception contains an origin future which originally timed out. + */ + class FutureTimeoutException(origin: Future[_], message: String) extends TimeoutException(message) { + def this(origin: Future[_]) = this(origin, "Future timed out.") + } + + final class DurationOps private[concurrent] (x: Int) { + // TODO ADD OTHERS + def ns = util.Duration.fromNanos(0) + } + +} + + diff --git a/src/library/scala/concurrent/package.scala.disabled b/src/library/scala/concurrent/package.scala.disabled deleted file mode 100644 index 42b4bf954c..0000000000 --- a/src/library/scala/concurrent/package.scala.disabled +++ /dev/null @@ -1,108 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - - -package scala - - - - -/** This package object contains primitives for parallel programming. - */ -package object concurrent { - - /** Performs a call which can potentially block execution. - * - * Example: - * {{{ - * val lock = new ReentrantLock - * - * // ... do something ... - * - * blocking { - * if (!lock.hasLock) lock.lock() - * } - * }}} - * - * '''Note:''' calling methods that wait arbitrary amounts of time - * (e.g. for I/O operations or locks) may severely decrease performance - * or even result in deadlocks. This does not include waiting for - * results of futures. - * - * @tparam T the result type of the blocking operation - * @param body the blocking operation - * @param runner the runner used for parallel computations - * @return the result of the potentially blocking operation - */ - def blocking[T](body: =>T)(implicit runner: TaskRunner): T = { - null.asInstanceOf[T] - } - - /** Invokes a computation asynchronously. Does not wait for the computation - * to finish. - * - * @tparam U the result type of the operation - * @param p the computation to be invoked asynchronously - * @param runner the runner used for parallel computations - */ - def spawn[U](p: =>U)(implicit runner: TaskRunner): Unit = { - } - - /** Starts 2 parallel computations and returns once they are completed. - * - * $invokingPar - * - * @tparam T1 the type of the result of 1st the parallel computation - * @tparam T2 the type of the result of 2nd the parallel computation - * @param b1 the 1st computation to be invoked in parallel - * @param b2 the 2nd computation to be invoked in parallel - * @param runner the runner used for parallel computations - * @return a tuple of results corresponding to parallel computations - */ - def par[T1, T2](b1: =>T1)(b2: =>T2)(implicit runner: TaskRunner): (T1, T2) = { - null - } - - /** Starts 3 parallel computations and returns once they are completed. - * - * $invokingPar - * - * @tparam T1 the type of the result of 1st the parallel computation - * @tparam T2 the type of the result of 2nd the parallel computation - * @tparam T3 the type of the result of 3rd the parallel computation - * @param b1 the 1st computation to be invoked in parallel - * @param b2 the 2nd computation to be invoked in parallel - * @param b3 the 3rd computation to be invoked in parallel - * @param runner the runner used for parallel computations - * @return a tuple of results corresponding to parallel computations - */ - def par[T1, T2, T3](b1: =>T1)(b2: =>T2)(b3: =>T3)(implicit runner: TaskRunner): (T1, T2, T3) = { - null - } - - /** Starts 4 parallel computations and returns once they are completed. - * - * $invokingPar - * - * @tparam T1 the type of the result of 1st the parallel computation - * @tparam T2 the type of the result of 2nd the parallel computation - * @tparam T3 the type of the result of 3rd the parallel computation - * @tparam T4 the type of the result of 4th the parallel computation - * @param b1 the 1st computation to be invoked in parallel - * @param b2 the 2nd computation to be invoked in parallel - * @param b3 the 3rd computation to be invoked in parallel - * @param b4 the 4th computation to be invoked in parallel - * @param runner the runner used for parallel computations - * @return a tuple of results corresponding to parallel computations - */ - def par[T1, T2, T3, T4](b1: =>T1)(b2: =>T2)(b3: =>T3)(b4: =>T4)(implicit runner: TaskRunner): (T1, T2, T3, T4) = { - null - } - -} |