summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/AbstractPromise.java.disabled17
-rw-r--r--src/library/scala/concurrent/Awaitable.scala25
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala3
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala33
-rw-r--r--src/library/scala/concurrent/Future.scala346
-rw-r--r--src/library/scala/concurrent/Future.scala.disabled1051
-rw-r--r--src/library/scala/concurrent/Promise.scala78
-rw-r--r--src/library/scala/concurrent/Scheduler.scala54
-rw-r--r--src/library/scala/concurrent/Task.scala13
-rw-r--r--src/library/scala/concurrent/default/SchedulerImpl.scala44
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala285
-rw-r--r--src/library/scala/concurrent/package.scala106
-rw-r--r--src/library/scala/package.scala3
-rw-r--r--src/library/scala/util/Duration.scala485
-rw-r--r--src/library/scala/util/Timeout.scala33
-rw-r--r--test/files/jvm/concurrent-future.check16
-rw-r--r--test/files/jvm/concurrent-future.scala122
-rw-r--r--test/files/jvm/scala-concurrent-tck-akka.scala391
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala389
19 files changed, 3491 insertions, 3 deletions
diff --git a/src/library/scala/concurrent/AbstractPromise.java.disabled b/src/library/scala/concurrent/AbstractPromise.java.disabled
new file mode 100644
index 0000000000..726e6a3156
--- /dev/null
+++ b/src/library/scala/concurrent/AbstractPromise.java.disabled
@@ -0,0 +1,17 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.concurrent;
+/*
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import scala.concurrent.forkjoin.*;
+
+abstract class AbstractPromise {
+ //private volatile Object _ref = DefaultPromise.EmptyPending();
+ protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+
+ protected void compute() { }
+}
+*/
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
new file mode 100644
index 0000000000..85546718d2
--- /dev/null
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -0,0 +1,25 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import scala.annotation.implicitNotFound
+import scala.util.Timeout
+
+
+
+trait Awaitable[+T] {
+ @implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.")
+ def await(timeout: Timeout)(implicit canblock: CanBlock): T
+}
+
+
+
+
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index e308c3b5a6..391ba7e314 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -8,7 +8,6 @@
package scala.concurrent
-import ops.future
/** A `DelayedLazyVal` is a wrapper for lengthy computations which have a
* valid partially computed result.
@@ -40,7 +39,7 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- future {
+ ops.future {
body
_isDone = true
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
new file mode 100644
index 0000000000..b7b3e901e6
--- /dev/null
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -0,0 +1,33 @@
+package scala.concurrent
+
+
+
+import java.util.concurrent.{ Executors, Future => JFuture, Callable }
+import scala.util.{ Duration, Timeout }
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+
+
+
+trait ExecutionContext {
+
+ protected implicit object CanBlockEvidence extends CanBlock
+
+ def execute(runnable: Runnable): Unit
+
+ def execute[U](body: () => U): Unit
+
+ def promise[T]: Promise[T]
+
+ def future[T](body: Callable[T]): Future[T] = future(body.call())
+
+ def future[T](body: => T): Future[T]
+
+ /** Only callable from the tasks running on the same execution context. */
+ def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T
+
+}
+
+
+sealed trait CanBlock
+
+
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
new file mode 100644
index 0000000000..36126056c9
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala
@@ -0,0 +1,346 @@
+
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.concurrent
+
+//import akka.AkkaException (replaced with Exception)
+//import akka.event.Logging.Error (removed all logging)
+import scala.util.{ Timeout, Duration }
+import scala.Option
+//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods)
+
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable ⇒ JIterable }
+import java.util.{ LinkedList ⇒ JLinkedList }
+
+import scala.annotation.tailrec
+import scala.collection.mutable.Stack
+//import akka.util.Switch (commented method)
+import java.{ lang ⇒ jl }
+import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
+import scala.collection.mutable.Builder
+import scala.collection.generic.CanBuildFrom
+
+/** The trait that represents futures.
+ *
+ * @define multipleCallbacks
+ * Multiple callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ *
+ * @define caughtThrowables
+ * The future may contain a throwable object and this means that the future failed.
+ * Futures obtained through combinators have the same exception as the future they were obtained from.
+ * The following throwable objects are not contained in the future:
+ * - Error - errors are not contained within futures
+ * - scala.util.control.ControlThrowable - not contained within futures
+ * - InterruptedException - not contained within futures
+ *
+ * Instead, the future is completed with a ExecutionException with one of the exceptions above
+ * as the cause.
+ *
+ * @define forComprehensionExamples
+ * Example:
+ *
+ * {{{
+ * val f = future { 5 }
+ * val g = future { 3 }
+ * val h = for {
+ * x: Int <- f // returns Future(5)
+ * y: Int <- g // returns Future(5)
+ * } yield x + y
+ * }}}
+ *
+ * is translated to:
+ *
+ * {{{
+ * f flatMap { (x: Int) => g map { (y: Int) => x + y } }
+ * }}}
+ */
+trait Future[+T] extends Awaitable[T] {
+self =>
+
+ /* Callbacks */
+
+ /** When this future is completed successfully (i.e. with a value),
+ * apply the provided function to the value.
+ *
+ * If the future has already been completed with a value,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onSuccess[U](func: T => U): this.type = onComplete {
+ case Left(t) => // do nothing
+ case Right(v) => func(v)
+ }
+
+ /** When this future is completed with a failure (i.e. with a throwable),
+ * apply the provided callback to the throwable.
+ *
+ * $caughtThrowables
+ *
+ * If the future has already been completed with a failure,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case that the future is completed with a value.
+ *
+ * $multipleCallbacks
+ */
+ def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
+ case Left(t) if isFutureThrowable(t) => if (callback.isDefinedAt(t)) callback(t)
+ case Right(v) => // do nothing
+ }
+
+ /* To be removed
+ /** When this future times out, apply the provided function.
+ *
+ * If the future has already timed out,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onTimeout[U](callback: FutureTimeoutException => U): this.type = onComplete {
+ case Left(te: FutureTimeoutException) => callback(te)
+ case Right(v) => // do nothing
+ }
+ */
+
+ /** When this future is completed, either through an exception, a timeout, or a value,
+ * apply the provided function.
+ *
+ * If the future has already been completed,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onComplete[U](func: Either[Throwable, T] => U): this.type
+
+
+ /* Miscellaneous */
+
+ /** The execution context of the future.
+ */
+ def executionContext: ExecutionContext
+
+ /** Creates a new promise.
+ */
+ def newPromise[S]: Promise[S] = executionContext promise
+
+ /*
+ /** Tests whether this `Future`'s timeout has expired.
+ *
+ * $futureTimeout
+ */
+ def isTimedout: Boolean
+ */
+
+
+ /* Projections */
+
+ /** Returns a failed projection of this future.
+ *
+ * The failed projection is a future holding a value of type `Throwable`.
+ *
+ * It is completed with a value which is the throwable of the original future
+ * in case the original future is failed.
+ *
+ * It is failed with a `NoSuchElementException` if the original future is completed successfully.
+ *
+ * Blocking on this future returns a value if the original future is completed with an exception
+ * and throws a corresponding exception if the original future fails.
+ */
+ def failed: Future[Throwable] = new Future[Throwable] {
+ def executionContext = self.executionContext
+ def onComplete[U](func: Either[Throwable, Throwable] => U) = {
+ self.onComplete {
+ case Left(t) => func(Right(t))
+ case Right(v) => func(Left(noSuchElem(v))) // do nothing
+ }
+ this
+ }
+ def await(timeout: Timeout)(implicit canblock: CanBlock): Throwable = {
+ var t: Throwable = null
+ try {
+ val res = self.await(timeout)
+ t = noSuchElem(res)
+ } catch {
+ case t: Throwable => return t
+ }
+ throw t
+ }
+ private def noSuchElem(v: T) =
+ new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
+ }
+
+ /*
+ /** A timed out projection of this future.
+ *
+ * The timed out projection is a future holding a value of type `FutureTimeoutException`.
+ *
+ * It is completed with a value which is a `FutureTimeoutException` of the original future
+ * in case the original future is timed out.
+ *
+ * It is failed with a `NoSuchElementException` if the original future is completed successfully.
+ * It is failed with the original exception otherwise.
+ *
+ * Blocking on this future returns a value only if the original future timed out, and a
+ * corresponding exception otherwise.
+ */
+ def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] {
+ def executionContext = self.executionContext
+ def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = {
+ self.onComplete {
+ case Left(te: FutureTimeoutException) => func(Right(te))
+ case Left(t) => func(Left(noSuchElemThrowable(t)))
+ case Right(v) => func(Left(noSuchElemValue(v)))
+ }
+ this
+ }
+ def isTimedout = self.isTimedout
+ def block()(implicit canblock: CanBlock) = try {
+ val res = self.block()
+ throw noSuchElemValue(res)
+ } catch {
+ case ft: FutureTimeoutException =>
+ ft
+ case t: Throwable =>
+ throw noSuchElemThrowable(t)
+ }
+ private def noSuchElemValue(v: T) =
+ new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v)
+ private def noSuchElemThrowable(v: Throwable) =
+ new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v)
+ }
+ */
+
+ /* Monadic operations */
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain. If there is no match, or if this future contains
+ * a valid result then the new future will contain the same.
+ *
+ * Example:
+ *
+ * {{{
+ * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
+ * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
+ * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * }}}
+ */
+ def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Left(t) => if (pf isDefinedAt t) p success pf(t) else p failure t
+ case Right(v) => p success v
+ }
+
+ p.future
+ }
+
+ /** Asynchronously processes the value in the future once the value becomes available.
+ *
+ * Will not be called if the future times out or fails.
+ *
+ * This method typically registers an `onSuccess` callback.
+ */
+ def foreach[U](f: T => U): Unit = onSuccess(f)
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future. If this future is completed with an exception then the new
+ * future will also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def map[S](f: T => S): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p failure t
+ case Right(v) => p success f(v)
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future, and returns the result of the function as the new future.
+ * If this future is completed with an exception then the new future will
+ * also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def flatMap[S](f: T => Future[S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p failure t
+ case Right(v) => f(v) onComplete {
+ case Left(t) => p failure t
+ case Right(v) => p success v
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by filtering the value of the current future with a predicate.
+ *
+ * If the current future contains a value which satisfies the predicate, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { 5 }
+ * val g = g filter { _ % 2 == 1 }
+ * val h = f filter { _ % 2 == 0 }
+ * block on g // evaluates to 5
+ * block on h // throw a NoSuchElementException
+ * }}}
+ */
+ def filter(pred: T => Boolean): Future[T] = {
+ val p = newPromise[T]
+
+ onComplete {
+ case Left(t) => p failure t
+ case Right(v) => if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
+ }
+
+ p.future
+ }
+
+}
+
+
+object Future {
+
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ val builder = cbf(futures)
+ val p: Promise[Coll[T]] = executionContext.promise[Coll[T]]
+
+ if (futures.size == 1) futures.head onComplete {
+ case Left(t) => p failure t
+ case Right(v) => builder += v
+ p success builder.result
+ } else {
+ val restFutures = all(futures.tail)
+ futures.head onComplete {
+ case Left(t) => p failure t
+ case Right(v) => builder += v
+ restFutures onComplete {
+ case Left(t) => p failure t
+ case Right(vs) => for (v <- vs) builder += v
+ p success builder.result
+ }
+ }
+ }
+
+ p.future
+ }
+
+}
diff --git a/src/library/scala/concurrent/Future.scala.disabled b/src/library/scala/concurrent/Future.scala.disabled
new file mode 100644
index 0000000000..3cd9bbeb6e
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala.disabled
@@ -0,0 +1,1051 @@
+/*
+class FutureFactory(dispatcher: MessageDispatcher, timeout: Timeout) {
+
+ // TODO: remove me ASAP !!!
+ implicit val _dispatcher = dispatcher
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T]): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Timeout): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Long): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher, timeout)
+
+ /**
+ * Java API.
+ * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
+ */
+/*
+ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
+ val pred: T ⇒ Boolean = predicate.apply(_)
+ Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)), timeout)(pred).map(JOption.fromScalaOption(_))(timeout)
+ }
+
+ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = find(futures, predicate, timeout)
+*/
+ /**
+ * Java API.
+ * Returns a Future to the result of the first future in the list that is completed
+ */
+/*
+ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
+ Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
+
+ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = firstCompletedOf(futures, timeout)
+*/
+ /**
+ * Java API
+ * A non-blocking fold over the specified futures.
+ * The fold is performed on the thread where the last future is completed,
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ */
+/*
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
+ Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(zero)(fun.apply _)
+
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
+
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout, futures, fun)
+
+ /**
+ * Java API.
+ * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
+ */
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
+ Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
+
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
+
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout, fun)
+
+ /**
+ * Java API.
+ * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = {
+ implicit val t = timeout
+ scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒
+ for (r ← fr; a ← fa) yield {
+ r add a
+ r
+ })
+ }
+
+ def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, timeout)
+
+ /**
+ * Java API.
+ * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel.
+ */
+ def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = {
+ implicit val t = timeout
+ scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒
+ val fb = fn(a)
+ for (r ← fr; b ← fb) yield {
+ r add b
+ r
+ }
+ }
+ }
+
+ def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, timeout, fn)
+
+*/
+}
+*/
+
+object Future {
+ /*
+ /**
+ * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
+ * The execution is performed by the specified Dispatcher.
+ */
+ def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
+ val promise: Promise[T] = DefaultPromise[T](timeout)
+
+ dispatcher dispatchTask { () ⇒
+ promise complete {
+ try {
+ Right(body)
+ } catch {
+ case e ⇒ Left(e)
+ }
+ }
+ }
+
+ promise
+ }
+
+ def apply[T](body: ⇒ T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
+ apply(body)(dispatcher, timeout)
+
+ def apply[T](body: ⇒ T, timeout: Duration)(implicit dispatcher: MessageDispatcher): Future[T] =
+ apply(body)(dispatcher, timeout)
+
+ def apply[T](body: ⇒ T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] =
+ apply(body)(dispatcher, timeout)
+
+ import scala.collection.mutable.Builder
+ import scala.collection.generic.CanBuildFrom
+
+ /**
+ * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] =
+ in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
+
+ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
+ sequence(in)(cbf, timeout, dispatcher)
+
+ /**
+ * Returns a Future to the result of the first future in the list that is completed
+ */
+ def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
+ val futureResult = DefaultPromise[T](timeout)
+
+ val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
+ futures.foreach(_ onComplete completeFirst)
+
+ futureResult
+ }
+
+ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
+ firstCompletedOf(futures)(dispatcher, timeout)
+
+ /**
+ * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
+ */
+ def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[Option[T]] = {
+ if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
+ else {
+ val result = DefaultPromise[Option[T]](timeout)
+ val ref = new AtomicInteger(futures.size)
+ val search: Future[T] ⇒ Unit = f ⇒ try {
+ f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
+ } finally {
+ if (ref.decrementAndGet == 0)
+ result completeWithResult None
+ }
+ futures.foreach(_ onComplete search)
+
+ result
+ }
+ }
+
+ def find[T](futures: Iterable[Future[T]], timeout: Timeout)(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] =
+ find(futures)(predicate)(dispatcher, timeout)
+
+ /**
+ * A non-blocking fold over the specified futures.
+ * The fold is performed on the thread where the last future is completed,
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ * Example:
+ * <pre>
+ * val result = Futures.fold(0)(futures)(_ + _).await.result
+ * </pre>
+ */
+/*
+ def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
+ if (futures.isEmpty) {
+ new KeptPromise[R](Right(zero))
+ } else {
+ val result = DefaultPromise[R](timeout)
+ val results = new ConcurrentLinkedQueue[T]()
+ val done = new Switch(false)
+ val allDone = futures.size
+
+ val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
+ f.value.get match {
+ case Right(value) ⇒
+ val added = results add value
+ if (added && results.size == allDone) { //Only one thread can get here
+ if (done.switchOn) {
+ try {
+ val i = results.iterator
+ var currentValue = zero
+ while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
+ result completeWithResult currentValue
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage))
+ result completeWithException e
+ } finally {
+ results.clear
+ }
+ }
+ }
+ case Left(exception) ⇒
+ if (done.switchOn) {
+ result completeWithException exception
+ results.clear
+ }
+ }
+ }
+
+ futures foreach { _ onComplete aggregate }
+ result
+ }
+ }
+*/
+/*
+ def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] =
+ fold(futures)(zero)(foldFun)(dispatcher, timeout)
+*/
+ /**
+ * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
+ * Example:
+ * <pre>
+ * val result = Futures.reduce(futures)(_ + _).await.result
+ * </pre>
+ */
+/*
+ def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
+ if (futures.isEmpty)
+ new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
+ else {
+ val result = DefaultPromise[R](timeout)
+ val seedFound = new AtomicBoolean(false)
+ val seedFold: Future[T] ⇒ Unit = f ⇒ {
+ if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
+ f.value.get match {
+ case Right(value) ⇒ result.completeWith(fold(futures.filterNot(_ eq f))(value)(op))
+ case Left(exception) ⇒ result.completeWithException(exception)
+ }
+ }
+ }
+ for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
+ result
+ }
+ }
+*/
+/*
+ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] =
+ reduce(futures)(op)(dispatcher, timeout)
+*/
+ /**
+ * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel:
+ * <pre>
+ * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
+ * </pre>
+ */
+ def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[B]] =
+ in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a) ⇒
+ val fb = fn(a.asInstanceOf[A])
+ for (r ← fr; b ← fb) yield (r += b)
+ }.map(_.result)
+
+ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
+ traverse(in)(fn)(cbf, timeout, dispatcher)
+
+ /**
+ * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
+ * Continuations plugin.
+ *
+ * Within the block, the result of a Future may be accessed by calling Future.apply. At that point
+ * execution is suspended with the rest of the block being stored in a continuation until the result
+ * of the Future is available. If an Exception is thrown while processing, it will be contained
+ * within the resulting Future.
+ *
+ * This allows working with Futures in an imperative style without blocking for each result.
+ *
+ * Completing a Future using 'Promise << Future' will also suspend execution until the
+ * value of the other Future is available.
+ *
+ * The Delimited Continuations compiler plugin must be enabled in order to use this method.
+ */
+/*
+ def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
+ val future = Promise[A](timeout)
+ dispatchTask({ () ⇒
+ (reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException {
+ case e: Exception ⇒ future completeWithException e
+ }
+ }, true)
+ future
+ }
+*/
+ // TODO make variant of flow(timeout)(body) which does NOT break type inference
+
+ /**
+ * Assures that any Future tasks initiated in the current thread will be
+ * executed asynchronously, including any tasks currently queued to be
+ * executed in the current thread. This is needed if the current task may
+ * block, causing delays in executing the remaining tasks which in some
+ * cases may cause a deadlock.
+ *
+ * Note: Calling 'Future.await' will automatically trigger this method.
+ *
+ * For example, in the following block of code the call to 'latch.open'
+ * might not be executed until after the call to 'latch.await', causing
+ * a deadlock. By adding 'Future.blocking()' the call to 'latch.open'
+ * will instead be dispatched separately from the current block, allowing
+ * it to be run in parallel:
+ * <pre>
+ * val latch = new StandardLatch
+ * val future = Future() map { _ ⇒
+ * Future.blocking()
+ * val nested = Future()
+ * nested foreach (_ ⇒ latch.open)
+ * latch.await
+ * }
+ * </pre>
+ */
+ def blocking()(implicit dispatcher: MessageDispatcher): Unit =
+ _taskStack.get match {
+ case Some(taskStack) if taskStack.nonEmpty ⇒
+ val tasks = taskStack.elems
+ taskStack.clear()
+ _taskStack set None
+ dispatchTask(() ⇒ _taskStack.get.get.elems = tasks, true)
+ case Some(_) ⇒ _taskStack set None
+ case _ ⇒ // already None
+ }
+
+ private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
+ override def initialValue = None
+ }
+
+ private[concurrent] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit =
+ _taskStack.get match {
+ case Some(taskStack) if !force ⇒ taskStack push task
+ case _ ⇒
+ dispatcher dispatchTask { () ⇒
+ try {
+ val taskStack = Stack[() ⇒ Unit](task)
+ _taskStack set Some(taskStack)
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e ⇒ e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks
+ }
+ }
+ } finally { _taskStack set None }
+ }
+ }
+ */
+}
+
+//trait Future[+T] {
+ /*
+ /**
+ * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
+ *
+ * Returns the result of this Future without blocking, by suspending execution and storing it as a
+ * continuation until the result is available.
+ */
+ def apply()(implicit timeout: Timeout): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any]))
+
+ /**
+ * Blocks awaiting completion of this Future, then returns the resulting value,
+ * or throws the completed exception
+ *
+ * Scala & Java API
+ *
+ * throws FutureTimeoutException if this Future times out when waiting for completion
+ */
+ def get: T = this.await.resultOrException.get
+
+ /**
+ * Blocks the current thread until the Future has been completed or the
+ * timeout has expired. In the case of the timeout expiring a
+ * FutureTimeoutException will be thrown.
+ */
+ def await: Future[T]
+ */
+
+ /**
+ * Blocks the current thread until the Future has been completed or the
+ * timeout has expired, additionally bounding the waiting period according to
+ * the <code>atMost</code> parameter. The timeout will be the lesser value of
+ * 'atMost' and the timeout supplied at the constructuion of this Future. In
+ * the case of the timeout expiring a FutureTimeoutException will be thrown.
+ * Other callers of this method are not affected by the additional bound
+ * imposed by <code>atMost</code>.
+ */
+// def await(atMost: Duration): Future[T]
+
+/*
+ /**
+ * Await completion of this Future and return its value if it conforms to A's
+ * erased type. Will throw a ClassCastException if the value does not
+ * conform, or any exception the Future was completed with. Will return None
+ * in case of a timeout.
+ */
+ def as[A](implicit m: Manifest[A]): Option[A] = {
+ try await catch { case _: FutureTimeoutException ⇒ }
+ value match {
+ case None ⇒ None
+ case Some(Left(ex)) ⇒ throw ex
+ case Some(Right(v)) ⇒
+ try { Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) } catch {
+ case c: ClassCastException ⇒
+ if (v.asInstanceOf[AnyRef] eq null) throw new ClassCastException("null cannot be cast to " + m.erasure)
+ else throw new ClassCastException("'" + v + "' of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure)
+ }
+ }
+ }
+
+ /**
+ * Await completion of this Future and return its value if it conforms to A's
+ * erased type, None otherwise. Will throw any exception the Future was
+ * completed with. Will return None in case of a timeout.
+ */
+ def asSilently[A](implicit m: Manifest[A]): Option[A] = {
+ try await catch { case _: FutureTimeoutException ⇒ }
+ value match {
+ case None ⇒ None
+ case Some(Left(ex)) ⇒ throw ex
+ case Some(Right(v)) ⇒
+ try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
+ catch { case _: ClassCastException ⇒ None }
+ }
+ }
+
+ /**
+ * Tests whether this Future has been completed.
+ */
+ final def isCompleted: Boolean = value.isDefined
+
+ /**
+ * Tests whether this Future's timeout has expired.
+ *
+ * Note that an expired Future may still contain a value, or it may be
+ * completed with a value.
+ */
+ def isExpired: Boolean
+
+ def timeout: Timeout
+
+ /**
+ * This Future's timeout in nanoseconds.
+ */
+ def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
+
+ /**
+ * The contained value of this Future. Before this Future is completed
+ * the value will be None. After completion the value will be Some(Right(t))
+ * if it contains a valid result, or Some(Left(error)) if it contains
+ * an exception.
+ */
+ def value: Option[Either[Throwable, T]]
+
+ /**
+ * Returns the successful result of this Future if it exists.
+ */
+ final def result: Option[T] = value match {
+ case Some(Right(r)) ⇒ Some(r)
+ case _ ⇒ None
+ }
+
+ /**
+ * Returns the contained exception of this Future if it exists.
+ */
+ final def exception: Option[Throwable] = value match {
+ case Some(Left(e)) ⇒ Some(e)
+ case _ ⇒ None
+ }
+
+ /**
+ * When this Future is completed, apply the provided function to the
+ * Future. If the Future has already been completed, this will apply
+ * immediately. Will not be called in case of a timeout, which also holds if
+ * corresponding Promise is attempted to complete after expiry. Multiple
+ * callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ */
+ def onComplete(func: Future[T] ⇒ Unit): this.type
+
+ /**
+ * When the future is completed with a valid result, apply the provided
+ * PartialFunction to the result. See `onComplete` for more details.
+ * <pre>
+ * future onResult {
+ * case Foo ⇒ target ! "foo"
+ * case Bar ⇒ target ! "bar"
+ * }
+ * </pre>
+ */
+ final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete {
+ _.value match {
+ case Some(Right(r)) if pf isDefinedAt r ⇒ pf(r)
+ case _ ⇒
+ }
+ }
+
+ /**
+ * When the future is completed with an exception, apply the provided
+ * PartialFunction to the exception. See `onComplete` for more details.
+ * <pre>
+ * future onException {
+ * case NumberFormatException ⇒ target ! "wrong format"
+ * }
+ * </pre>
+ */
+ final def onException(pf: PartialFunction[Throwable, Unit]): this.type = onComplete {
+ _.value match {
+ case Some(Left(ex)) if pf isDefinedAt ex ⇒ pf(ex)
+ case _ ⇒
+ }
+ }
+
+ def onTimeout(func: Future[T] ⇒ Unit): this.type
+
+ def orElse[A >: T](fallback: ⇒ A): Future[A]
+
+ /**
+ * Creates a new Future that will handle any matching Throwable that this
+ * Future might contain. If there is no match, or if this Future contains
+ * a valid result then the new Future will contain the same.
+ * Example:
+ * <pre>
+ * Future(6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
+ * Future(6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
+ * Future(6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * </pre>
+ */
+ final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {
+ val future = DefaultPromise[A](timeout)
+ onComplete {
+ _.value.get match {
+ case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
+ case otherwise ⇒ future complete otherwise
+ }
+ }
+ future
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future. If this Future is completed with an exception then the new
+ * Future will also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor ? "Hello" // returns 5
+ * b: String <- actor ? a // returns "10"
+ * c: String <- actor ? 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = {
+ val future = DefaultPromise[A](timeout)
+ onComplete {
+ _.value.get match {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(res) ⇒
+ future complete (try {
+ Right(f(res))
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
+ Left(e)
+ })
+ }
+ }
+ future
+ }
+
+ /**
+ * Creates a new Future[A] which is completed with this Future's result if
+ * that conforms to A's erased type or a ClassCastException otherwise.
+ */
+ final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = {
+ val fa = DefaultPromise[A](timeout)
+ onComplete { ft ⇒
+ fa complete (ft.value.get match {
+ case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]]
+ case Right(t) ⇒
+ try {
+ Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
+ } catch {
+ case e: ClassCastException ⇒ Left(e)
+ }
+ })
+ }
+ fa
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future, and returns the result of the function as the new Future.
+ * If this Future is completed with an exception then the new Future will
+ * also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor ? "Hello" // returns 5
+ * b: String <- actor ? a // returns "10"
+ * c: String <- actor ? 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = {
+ val future = DefaultPromise[A](timeout)
+
+ onComplete {
+ _.value.get match {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(r) ⇒ try {
+ future.completeWith(f(r))
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
+ future complete Left(e)
+ }
+ }
+ }
+ future
+ }
+
+ final def foreach(f: T ⇒ Unit): Unit = onComplete {
+ _.value.get match {
+ case Right(r) ⇒ f(r)
+ case _ ⇒
+ }
+ }
+
+ final def withFilter(p: T ⇒ Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p)
+
+ final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit timeout: Timeout) {
+ def foreach(f: A ⇒ Unit): Unit = self filter p foreach f
+ def map[B](f: A ⇒ B): Future[B] = self filter p map f
+ def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f
+ def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
+ }
+
+ final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = {
+ val future = DefaultPromise[T](timeout)
+ onComplete {
+ _.value.get match {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]]
+ case r @ Right(res) ⇒ future complete (try {
+ if (p(res)) r else Left(new MatchError(res))
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
+ Left(e)
+ })
+ }
+ }
+ future
+ }
+
+ /**
+ * Returns the current result, throws the exception if one has been raised, else returns None
+ */
+ final def resultOrException: Option[T] = value match {
+ case Some(Left(e)) ⇒ throw e
+ case Some(Right(r)) ⇒ Some(r)
+ case _ ⇒ None
+ }
+ */
+//}
+
+/**
+ * Essentially this is the Promise (or write-side) of a Future (read-side).
+ */
+//trait Promise[T] extends Future[T] {
+
+ /*
+ def start(): Unit
+
+ /**
+ * Completes this Future with the specified result, if not already completed.
+ * @return this
+ */
+ def complete(value: Either[Throwable, T]): this.type
+
+ /**
+ * Completes this Future with the specified result, if not already completed.
+ * @return this
+ */
+ final def completeWithResult(result: T): this.type = complete(Right(result))
+
+ /**
+ * Completes this Future with the specified exception, if not already completed.
+ * @return this
+ */
+ final def completeWithException(exception: Throwable): this.type = complete(Left(exception))
+
+ /**
+ * Completes this Future with the specified other Future, when that Future is completed,
+ * unless this Future has already been completed.
+ * @return this.
+ */
+ final def completeWith(other: Future[T]): this.type = {
+ other onComplete { f ⇒ complete(f.value.get) }
+ this
+ }
+ */
+/*
+ final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) }
+
+ final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
+ val fr = DefaultPromise[Any](this.timeout)
+ this completeWith other onComplete { f ⇒
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
+ fr completeWithException e
+ }
+ }
+ fr
+ }
+
+ final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
+ val fr = DefaultPromise[Any](this.timeout)
+ stream.dequeue(this).onComplete { f ⇒
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
+ fr completeWithException e
+ }
+ }
+ fr
+ }
+*/
+//}
+
+/**
+ * Represents the internal state of the DefaultCompletableFuture
+ */
+sealed abstract class FState[+T] { def value: Option[Either[Throwable, T]] }
+
+case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] {
+ def value: Option[Either[Throwable, T]] = None
+}
+
+case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
+ def result: T = value.get.right.get
+}
+
+case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
+ def exception: Throwable = value.get.left.get
+}
+
+case object Expired extends FState[Nothing] {
+ def value: Option[Either[Throwable, Nothing]] = None
+}
+
+//Companion object to FState, just to provide a cheap, immutable default entry
+private[concurrent] object FState {
+ def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
+ private val emptyPendingValue = Pending[Nothing](Nil)
+}
+
+private[concurrent] object DefaultPromise {
+ def apply[T](within: Timeout)(implicit disp: MessageDispatcher): Promise[T] = new DefaultPromise[T] {
+ val timeout = within
+ implicit val dispatcher = disp
+ }
+
+ def apply[T]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[T] = apply(timeout)
+
+ def apply[T](timeout: Long)(implicit dispatcher: MessageDispatcher): Promise[T] = apply(Timeout(timeout))
+
+ def apply[T](timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher): Promise[T] = apply(Timeout(timeout, timeunit))
+}
+
+/**
+ * The default concrete Future implementation.
+ */
+trait DefaultPromise[T] extends /*AbstractPromise with*/ Promise[T] {
+ self ⇒
+
+// @volatile private _ref: AnyRef = DefaultPromise.EmptyPending()
+
+// protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+// AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+
+ val timeout: Timeout
+ implicit val dispatcher: MessageDispatcher
+
+ private val _startTimeInNanos = currentTimeInNanos
+
+ @tailrec
+ private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
+ if (value.isEmpty && waitTimeNanos > 0) {
+ val ms = NANOS.toMillis(waitTimeNanos)
+ val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
+ val start = currentTimeInNanos
+ try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
+
+ awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start))
+ } else {
+ value.isDefined
+ }
+ }
+
+ def await(atMost: Duration): this.type = if (value.isDefined) this else {
+ Future.blocking()
+
+ val waitNanos =
+ if (timeout.duration.isFinite && atMost.isFinite)
+ atMost.toNanos min timeLeft()
+ else if (atMost.isFinite)
+ atMost.toNanos
+ else if (timeout.duration.isFinite)
+ timeLeft()
+ else Long.MaxValue //If both are infinite, use Long.MaxValue
+
+ if (awaitUnsafe(waitNanos)) this
+ else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds")
+ }
+
+ def await = await(timeout.duration)
+
+ def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
+
+ def value: Option[Either[Throwable, T]] = getState.value
+
+ @inline
+ protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean =
+ AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
+
+ @inline
+ protected final def getState: FState[T] = {
+
+ @tailrec
+ def read(): FState[T] = {
+ val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
+ if (cur.isInstanceOf[Pending[_]] && isExpired) {
+ if (updateState(cur, Expired)) Expired else read()
+ } else cur
+ }
+
+ read()
+ }
+
+ def complete(value: Either[Throwable, T]): this.type = {
+ val callbacks = {
+ try {
+ @tailrec
+ def tryComplete: List[Future[T] ⇒ Unit] = {
+ val cur = getState
+
+ cur match {
+ case Pending(listeners) ⇒
+ if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
+ else tryComplete
+ case _ ⇒ Nil
+ }
+ }
+ tryComplete
+ } finally {
+ synchronized { notifyAll() } //Notify any evil blockers
+ }
+ }
+
+ if (callbacks.nonEmpty) Future.dispatchTask(() ⇒ callbacks foreach notifyCompleted)
+
+ this
+ }
+
+ def onComplete(func: Future[T] ⇒ Unit): this.type = {
+ @tailrec //Returns whether the future has already been completed or not
+ def tryAddCallback(): Boolean = {
+ val cur = getState
+ cur match {
+ case _: Success[_] | _: Failure[_] ⇒ true
+ case Expired ⇒ false
+ case p: Pending[_] ⇒
+ val pt = p.asInstanceOf[Pending[T]]
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false
+ else tryAddCallback()
+ }
+ }
+
+ if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
+
+ this
+ }
+
+ def onTimeout(func: Future[T] ⇒ Unit): this.type = {
+ val runNow =
+ if (!timeout.duration.isFinite) false //Not possible
+ else if (value.isEmpty) {
+ if (!isExpired) {
+ val runnable = new Runnable {
+ def run() {
+ if (!isCompleted) {
+ if (!isExpired) {
+ // TODO FIXME: add scheduler
+ //dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
+ } else func(DefaultPromise.this)
+ }
+ }
+ }
+ /*
+ TODO FIXME: add scheduler
+ val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
+ onComplete(_ ⇒ timeoutFuture.cancel())
+ */
+ false
+ } else true
+ } else false
+
+ if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func))
+
+ this
+ }
+
+ final def orElse[A >: T](fallback: ⇒ A): Future[A] =
+ if (timeout.duration.isFinite) {
+ getState match {
+ case _: Success[_] | _: Failure[_] ⇒ this
+ case Expired ⇒ Future[A](fallback, timeout)
+ case _: Pending[_] ⇒
+ val promise = DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
+ promise completeWith this
+ val runnable = new Runnable {
+ def run() {
+ if (!isCompleted) {
+ if (!isExpired) {
+ // TODO FIXME add scheduler
+ //dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
+ } else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) })
+ }
+ }
+ }
+ // TODO FIXME add
+ //dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
+ promise
+ }
+ } else this
+
+ private def notifyCompleted(func: Future[T] ⇒ Unit) {
+ try { func(this) } catch { case e ⇒ /*dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception"))*/ } //TODO catch, everything? Really?
+ }
+
+ @inline
+ private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)?
+ //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs.
+ @inline
+ private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
+
+ private def timeLeftNoinline(): Long = timeLeft()
+//}
+
+/**
+ * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
+ * a Future-composition but you already have a value to contribute.
+ */
+sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
+ val value = Some(suppliedValue)
+
+ def complete(value: Either[Throwable, T]): this.type = this
+ def onComplete(func: Future[T] ⇒ Unit): this.type = {
+ Future dispatchTask (() ⇒ func(this))
+ this
+ }
+ def await(atMost: Duration): this.type = this
+ def await: this.type = this
+ def isExpired: Boolean = true
+ def timeout: Timeout = Timeout.zero
+
+ final def onTimeout(func: Future[T] ⇒ Unit): this.type = this
+ final def orElse[A >: T](fallback: ⇒ A): Future[A] = this
+ */
+//}
+
+object BoxedType {
+
+ private val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[jl.Boolean],
+ classOf[Byte] -> classOf[jl.Byte],
+ classOf[Char] -> classOf[jl.Character],
+ classOf[Short] -> classOf[jl.Short],
+ classOf[Int] -> classOf[jl.Integer],
+ classOf[Long] -> classOf[jl.Long],
+ classOf[Float] -> classOf[jl.Float],
+ classOf[Double] -> classOf[jl.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
+
+ def apply(c: Class[_]): Class[_] = {
+ if (c.isPrimitive) toBoxed(c) else c
+ }
+
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
new file mode 100644
index 0000000000..aae0135af4
--- /dev/null
+++ b/src/library/scala/concurrent/Promise.scala
@@ -0,0 +1,78 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import scala.util.Timeout
+
+
+
+/** Promise is an object which can be completed with a value or failed
+ * with an exception.
+ *
+ * @define promiseCompletion
+ * If the promise has already been fulfilled, failed or has timed out,
+ * calling this method will throw an IllegalStateException.
+ *
+ * @define allowedThrowables
+ * If the throwable used to fail this promise is an error, a control exception
+ * or an interrupted exception, it will be wrapped as a cause within an
+ * `ExecutionException` which will fail the promise.
+ */
+trait Promise[T] {
+
+ /** Future containing the value of this promise.
+ */
+ def future: Future[T]
+
+ /** Completes the promise with a value.
+ *
+ * @param value The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def success(value: T): Unit
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $allowedThrowables
+ *
+ * $promiseCompletion
+ */
+ def failure(t: Throwable): Unit
+
+ /** Wraps a `Throwable` in an `ExecutionException` if necessary.
+ *
+ * $allowedThrowables
+ */
+ protected def wrap(t: Throwable): Throwable = t match {
+ case t: Throwable if isFutureThrowable(t) => t
+ case _ => new ExecutionException(t)
+ }
+
+}
+
+
+
+object Promise {
+ /*
+ /**
+ * Creates a non-completed, new, Promise with the supplied timeout in milliseconds
+ */
+ def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = DefaultPromise[A](timeout)
+
+ /**
+ * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
+ */
+ def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout)
+ */
+}
diff --git a/src/library/scala/concurrent/Scheduler.scala b/src/library/scala/concurrent/Scheduler.scala
new file mode 100644
index 0000000000..39d798e6b4
--- /dev/null
+++ b/src/library/scala/concurrent/Scheduler.scala
@@ -0,0 +1,54 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import scala.util.Duration
+
+/** A service for scheduling tasks and thunks for one-time, or periodic execution.
+ */
+trait Scheduler {
+
+ /** Schedules a thunk for repeated execution with an initial delay and a frequency.
+ *
+ * @param delay the initial delay after which the thunk should be executed
+ * the first time
+ * @param frequency the frequency with which the thunk should be executed,
+ * as a time period between subsequent executions
+ */
+ def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable
+
+ /** Schedules a task for execution after a given delay.
+ *
+ * @param delay the duration after which the task should be executed
+ * @param task the task that is scheduled for execution
+ * @return a `Cancellable` that may be used to cancel the execution
+ * of the task
+ */
+ def scheduleOnce(delay: Duration, task: Runnable): Cancellable
+
+ /** Schedules a thunk for execution after a given delay.
+ *
+ * @param delay the duration after which the thunk should be executed
+ * @param thunk the thunk that is scheduled for execution
+ * @return a `Cancellable` that may be used to cancel the execution
+ * of the thunk
+ */
+ def scheduleOnce(delay: Duration)(task: => Unit): Cancellable
+
+}
+
+
+
+trait Cancellable {
+
+ /** Cancels the underlying task.
+ */
+ def cancel(): Unit
+
+}
diff --git a/src/library/scala/concurrent/Task.scala b/src/library/scala/concurrent/Task.scala
new file mode 100644
index 0000000000..d6f86bac31
--- /dev/null
+++ b/src/library/scala/concurrent/Task.scala
@@ -0,0 +1,13 @@
+package scala.concurrent
+
+
+
+trait Task[+T] {
+
+ def start(): Unit
+
+ def future: Future[T]
+
+}
+
+
diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala b/src/library/scala/concurrent/default/SchedulerImpl.scala
new file mode 100644
index 0000000000..745d2d1a15
--- /dev/null
+++ b/src/library/scala/concurrent/default/SchedulerImpl.scala
@@ -0,0 +1,44 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+package default
+
+import scala.util.Duration
+
+private[concurrent] final class SchedulerImpl extends Scheduler {
+ private val timer =
+ new java.util.Timer(true) // the associated thread runs as a daemon
+
+ def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ???
+
+ def scheduleOnce(delay: Duration, task: Runnable): Cancellable = {
+ val timerTask = new java.util.TimerTask {
+ def run(): Unit =
+ task.run()
+ }
+ timer.schedule(timerTask, delay.toMillis)
+ new Cancellable {
+ def cancel(): Unit =
+ timerTask.cancel()
+ }
+ }
+
+ def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = {
+ val timerTask = new java.util.TimerTask {
+ def run(): Unit =
+ task
+ }
+ timer.schedule(timerTask, delay.toMillis)
+ new Cancellable {
+ def cancel(): Unit =
+ timerTask.cancel()
+ }
+ }
+
+}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
new file mode 100644
index 0000000000..dac6400b45
--- /dev/null
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -0,0 +1,285 @@
+package scala.concurrent
+package default
+
+
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
+import scala.util.{ Timeout, Duration }
+import scala.annotation.tailrec
+
+
+
+private[concurrent] trait Completable[T] {
+ self: Future[T] =>
+
+ val executionContext: ExecutionContextImpl
+
+ type Callback = Either[Throwable, T] => Any
+
+ def getState: State[T]
+
+ def casState(oldv: State[T], newv: State[T]): Boolean
+
+ protected def dispatch[U](r: Runnable) = executionContext execute r
+
+ protected def processCallbacks(cbs: List[Callback], r: Either[Throwable, T]) =
+ for (cb <- cbs) dispatch(new Runnable {
+ override def run() = cb(r)
+ })
+
+ def future: Future[T] = self
+
+ def onComplete[U](callback: Either[Throwable, T] => U): this.type = {
+ @tailrec def tryAddCallback(): Either[Throwable, T] = {
+ getState match {
+ case p @ Pending(lst) =>
+ val pt = p.asInstanceOf[Pending[T]]
+ if (casState(pt, Pending(callback :: pt.callbacks))) null
+ else tryAddCallback()
+ case Success(res) => Right(res)
+ case Failure(t) => Left(t)
+ }
+ }
+
+ val res = tryAddCallback()
+ if (res != null) dispatch(new Runnable {
+ override def run() =
+ try callback(res)
+ catch handledFutureException andThen {
+ t => Console.err.println(t)
+ }
+ })
+
+ this
+ }
+
+ def isTimedout: Boolean = getState match {
+ case Failure(ft: FutureTimeoutException) => true
+ case _ => false
+ }
+
+}
+
+private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
+ extends Promise[T] with Future[T] with Completable[T] {
+
+ val executionContext: scala.concurrent.default.ExecutionContextImpl = context
+
+ @volatile private var state: State[T] = _
+
+ val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state")
+
+ updater.set(this, Pending(List()))
+
+ def casState(oldv: State[T], newv: State[T]): Boolean = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ def getState: State[T] = {
+ updater.get(this)
+ }
+
+ @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
+ case _ => null
+ }
+
+ /** Completes the promise with a value.
+ *
+ * @param value The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def success(value: T): Unit = {
+ val cbs = tryCompleteState(Success(value))
+ if (cbs == null)
+ throw new IllegalStateException
+ else {
+ processCallbacks(cbs, Right(value))
+ this.synchronized {
+ this.notifyAll()
+ }
+ }
+ }
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def failure(t: Throwable): Unit = {
+ val wrapped = wrap(t)
+ val cbs = tryCompleteState(Failure(wrapped))
+ if (cbs == null)
+ throw new IllegalStateException
+ else {
+ processCallbacks(cbs, Left(wrapped))
+ this.synchronized {
+ this.notifyAll()
+ }
+ }
+ }
+
+ def await(timeout: Timeout)(implicit canblock: scala.concurrent.CanBlock): T = getState match {
+ case Success(res) => res
+ case Failure(t) => throw t
+ case _ =>
+ this.synchronized {
+ while (true)
+ getState match {
+ case Pending(_) => this.wait()
+ case Success(res) => return res
+ case Failure(t) => throw t
+ }
+ }
+ sys.error("unreachable")
+ }
+
+}
+
+private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
+ extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
+
+ val executionContext: ExecutionContextImpl = context
+
+ @volatile private var state: State[T] = _
+
+ val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state")
+
+ updater.set(this, Pending(List()))
+
+ def casState(oldv: State[T], newv: State[T]): Boolean = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ def getState: State[T] = {
+ updater.get(this)
+ }
+
+ @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
+ }
+
+ def compute(): Unit = {
+ var cbs: List[Callback] = null
+ try {
+ val res = body
+ processCallbacks(tryCompleteState(Success(res)), Right(res))
+ } catch {
+ case t if isFutureThrowable(t) =>
+ processCallbacks(tryCompleteState(Failure(t)), Left(t))
+ case t =>
+ val ee = new ExecutionException(t)
+ processCallbacks(tryCompleteState(Failure(ee)), Left(ee))
+ throw t
+ }
+ }
+
+ def start(): Unit = {
+ Thread.currentThread match {
+ case fj: ForkJoinWorkerThread if fj.getPool eq executionContext.pool => fork()
+ case _ => executionContext.pool.execute(this)
+ }
+ }
+
+ // TODO FIXME: handle timeouts
+ def await(atMost: Duration): this.type =
+ await
+
+ def await: this.type = {
+ this.join()
+ this
+ }
+
+ def tryCancel(): Unit =
+ tryUnfork()
+
+ def await(timeout: Timeout)(implicit canblock: CanBlock): T = {
+ join() // TODO handle timeout also
+ (updater.get(this): @unchecked) match {
+ case Success(r) => r
+ case Failure(t) => throw t
+ }
+ }
+
+}
+
+
+private[concurrent] sealed abstract class State[T]
+
+
+case class Pending[T](callbacks: List[Either[Throwable, T] => Any]) extends State[T]
+
+
+case class Success[T](result: T) extends State[T]
+
+
+case class Failure[T](throwable: Throwable) extends State[T]
+
+
+private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
+ val pool = {
+ val p = new ForkJoinPool
+ p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
+ def uncaughtException(t: Thread, throwable: Throwable) {
+ Console.err.println(throwable.getMessage)
+ throwable.printStackTrace(Console.err)
+ }
+ })
+ p
+ }
+
+ @inline
+ private def executeTask(task: RecursiveAction) {
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
+ task.fork()
+ else
+ pool execute task
+ }
+
+ def execute(task: Runnable) {
+ val action = new RecursiveAction { def compute() { task.run() } }
+ executeTask(action)
+ }
+
+ def execute[U](body: () => U) {
+ val action = new RecursiveAction { def compute() { body() } }
+ executeTask(action)
+ }
+
+ def task[T](body: => T): Task[T] = {
+ new TaskImpl(this, body)
+ }
+
+ def future[T](body: => T): Future[T] = {
+ val t = task(body)
+ t.start()
+ t.future
+ }
+
+ def promise[T]: Promise[T] =
+ new PromiseImpl[T](this)
+
+ // TODO fix the timeout
+ def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match {
+ case fj: TaskImpl[_] if fj.executionContext.pool eq pool =>
+ fj.await(timeout)
+ case _ =>
+ var res: T = null.asInstanceOf[T]
+ @volatile var blockingDone = false
+ // TODO add exception handling here!
+ val mb = new ForkJoinPool.ManagedBlocker {
+ def block() = {
+ res = b.await(timeout)(CanBlockEvidence)
+ blockingDone = true
+ true
+ }
+ def isReleasable = blockingDone
+ }
+ ForkJoinPool.managedBlock(mb, true)
+ res
+ }
+
+}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
new file mode 100644
index 0000000000..33e1b65993
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala
@@ -0,0 +1,106 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala
+
+
+
+import scala.util.{ Timeout, Duration }
+
+
+
+/** This package object contains primitives for concurrent and parallel programming.
+ */
+package object concurrent {
+
+ type ExecutionException = java.util.concurrent.ExecutionException
+ type CancellationException = java.util.concurrent.CancellationException
+ type TimeoutException = java.util.concurrent.TimeoutException
+
+ /** A global execution environment for executing lightweight tasks.
+ */
+ lazy val executionContext =
+ new default.ExecutionContextImpl
+
+ /** A global service for scheduling tasks for execution.
+ */
+ lazy val scheduler =
+ new default.SchedulerImpl
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
+ override protected def initialValue = null
+ }
+
+ val handledFutureException: PartialFunction[Throwable, Throwable] = {
+ case t: Throwable if isFutureThrowable(t) => t
+ }
+
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptedException => false
+ case _ => true
+ }
+
+ /* concurrency constructs */
+
+ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
+ execCtx future body
+
+ def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] =
+ execCtx promise
+
+ /** Used to block on a piece of code which potentially blocks.
+ *
+ * @param body A piece of code which contains potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] {
+ def await(timeout: Timeout)(implicit cb: CanBlock) = body
+ })
+
+ /** Blocks on a blockable object.
+ *
+ * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = {
+ currentExecutionContext.get match {
+ case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(timeout, awaitable) // inside an execution context thread
+ }
+ }
+
+}
+
+
+
+package concurrent {
+
+ /** A timeout exception.
+ *
+ * Futures are failed with a timeout exception when their timeout expires.
+ *
+ * Each timeout exception contains an origin future which originally timed out.
+ */
+ class FutureTimeoutException(origin: Future[_], message: String) extends TimeoutException(message) {
+ def this(origin: Future[_]) = this(origin, "Future timed out.")
+ }
+
+}
diff --git a/src/library/scala/package.scala b/src/library/scala/package.scala
index 0c5d10b15e..915ce6a648 100644
--- a/src/library/scala/package.scala
+++ b/src/library/scala/package.scala
@@ -27,7 +27,8 @@ package object scala {
type NoSuchElementException = java.util.NoSuchElementException
type NumberFormatException = java.lang.NumberFormatException
type AbstractMethodError = java.lang.AbstractMethodError
-
+ type InterruptedException = java.lang.InterruptedException
+
@deprecated("instead of `@serializable class C`, use `class C extends Serializable`", "2.9.0")
type serializable = annotation.serializable
diff --git a/src/library/scala/util/Duration.scala b/src/library/scala/util/Duration.scala
new file mode 100644
index 0000000000..4c118f8b3b
--- /dev/null
+++ b/src/library/scala/util/Duration.scala
@@ -0,0 +1,485 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.util
+
+import java.util.concurrent.TimeUnit
+import TimeUnit._
+import java.lang.{ Long ⇒ JLong, Double ⇒ JDouble }
+//import akka.actor.ActorSystem (commented methods)
+
+class TimerException(message: String) extends RuntimeException(message)
+
+/**
+ * Simple timer class.
+ * Usage:
+ * <pre>
+ * import akka.util.duration._
+ * import akka.util.Timer
+ *
+ * val timer = Timer(30.seconds)
+ * while (timer.isTicking) { ... }
+ * </pre>
+ */
+case class Timer(duration: Duration, throwExceptionOnTimeout: Boolean = false) {
+ val startTimeInMillis = System.currentTimeMillis
+ val timeoutInMillis = duration.toMillis
+
+ /**
+ * Returns true while the timer is ticking. After that it either throws and exception or
+ * returns false. Depending on if the 'throwExceptionOnTimeout' argument is true or false.
+ */
+ def isTicking: Boolean = {
+ if (!(timeoutInMillis > (System.currentTimeMillis - startTimeInMillis))) {
+ if (throwExceptionOnTimeout) throw new TimerException("Time out after " + duration)
+ else false
+ } else true
+ }
+}
+
+object Duration {
+ def apply(length: Long, unit: TimeUnit): Duration = new FiniteDuration(length, unit)
+ def apply(length: Double, unit: TimeUnit): Duration = fromNanos(unit.toNanos(1) * length)
+ def apply(length: Long, unit: String): Duration = new FiniteDuration(length, timeUnit(unit))
+
+ def fromNanos(nanos: Long): Duration = {
+ if (nanos % 86400000000000L == 0) {
+ Duration(nanos / 86400000000000L, DAYS)
+ } else if (nanos % 3600000000000L == 0) {
+ Duration(nanos / 3600000000000L, HOURS)
+ } else if (nanos % 60000000000L == 0) {
+ Duration(nanos / 60000000000L, MINUTES)
+ } else if (nanos % 1000000000L == 0) {
+ Duration(nanos / 1000000000L, SECONDS)
+ } else if (nanos % 1000000L == 0) {
+ Duration(nanos / 1000000L, MILLISECONDS)
+ } else if (nanos % 1000L == 0) {
+ Duration(nanos / 1000L, MICROSECONDS)
+ } else {
+ Duration(nanos, NANOSECONDS)
+ }
+ }
+
+ def fromNanos(nanos: Double): Duration = fromNanos((nanos + 0.5).asInstanceOf[Long])
+
+ /**
+ * Construct a Duration by parsing a String. In case of a format error, a
+ * RuntimeException is thrown. See `unapply(String)` for more information.
+ */
+ def apply(s: String): Duration = unapply(s) getOrElse sys.error("format error")
+
+ /**
+ * Deconstruct a Duration into length and unit if it is finite.
+ */
+ def unapply(d: Duration): Option[(Long, TimeUnit)] = {
+ if (d.finite_?) {
+ Some((d.length, d.unit))
+ } else {
+ None
+ }
+ }
+
+ private val RE = ("""^\s*(\d+(?:\.\d+)?)\s*""" + // length part
+ "(?:" + // units are distinguished in separate match groups
+ "(d|day|days)|" +
+ "(h|hour|hours)|" +
+ "(min|minute|minutes)|" +
+ "(s|sec|second|seconds)|" +
+ "(ms|milli|millis|millisecond|milliseconds)|" +
+ "(µs|micro|micros|microsecond|microseconds)|" +
+ "(ns|nano|nanos|nanosecond|nanoseconds)" +
+ """)\s*$""").r // close the non-capturing group
+ private val REinf = """^\s*Inf\s*$""".r
+ private val REminf = """^\s*(?:-\s*|Minus)Inf\s*""".r
+
+ /**
+ * Parse String, return None if no match. Format is `"<length><unit>"`, where
+ * whitespace is allowed before, between and after the parts. Infinities are
+ * designated by `"Inf"` and `"-Inf"` or `"MinusInf"`.
+ */
+ def unapply(s: String): Option[Duration] = s match {
+ case RE(length, d, h, m, s, ms, mus, ns) ⇒
+ if (d ne null) Some(Duration(JDouble.parseDouble(length), DAYS)) else if (h ne null) Some(Duration(JDouble.parseDouble(length), HOURS)) else if (m ne null) Some(Duration(JDouble.parseDouble(length), MINUTES)) else if (s ne null) Some(Duration(JDouble.parseDouble(length), SECONDS)) else if (ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if (ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else
+ sys.error("made some error in regex (should not be possible)")
+ case REinf() ⇒ Some(Inf)
+ case REminf() ⇒ Some(MinusInf)
+ case _ ⇒ None
+ }
+
+ /**
+ * Parse TimeUnit from string representation.
+ */
+ def timeUnit(unit: String) = unit.toLowerCase match {
+ case "d" | "day" | "days" ⇒ DAYS
+ case "h" | "hour" | "hours" ⇒ HOURS
+ case "min" | "minute" | "minutes" ⇒ MINUTES
+ case "s" | "sec" | "second" | "seconds" ⇒ SECONDS
+ case "ms" | "milli" | "millis" | "millisecond" | "milliseconds" ⇒ MILLISECONDS
+ case "µs" | "micro" | "micros" | "microsecond" | "microseconds" ⇒ MICROSECONDS
+ case "ns" | "nano" | "nanos" | "nanosecond" | "nanoseconds" ⇒ NANOSECONDS
+ }
+
+ val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
+ val Undefined: Duration = new Duration with Infinite {
+ override def toString = "Duration.Undefined"
+ override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this
+ override def +(other: Duration): Duration = throw new IllegalArgumentException("cannot add Undefined duration")
+ override def -(other: Duration): Duration = throw new IllegalArgumentException("cannot subtract Undefined duration")
+ override def *(factor: Double): Duration = throw new IllegalArgumentException("cannot multiply Undefined duration")
+ override def /(factor: Double): Duration = throw new IllegalArgumentException("cannot divide Undefined duration")
+ override def /(other: Duration): Double = throw new IllegalArgumentException("cannot divide Undefined duration")
+ def >(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def >=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def unary_- : Duration = throw new IllegalArgumentException("cannot negate Undefined duration")
+ }
+
+ trait Infinite {
+ this: Duration ⇒
+
+ override def equals(other: Any) = false
+
+ def +(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ this
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal addition of infinities")
+ case _ ⇒ this
+ }
+ def -(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ throw new IllegalArgumentException("illegal subtraction of infinities")
+ case _ ⇒ this
+ }
+ def *(factor: Double): Duration = this
+ def /(factor: Double): Duration = this
+ def /(other: Duration): Double =
+ other match {
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal division of infinities")
+ // maybe questionable but pragmatic: Inf / 0 => Inf
+ case x ⇒ Double.PositiveInfinity * (if ((this > Zero) ^ (other >= Zero)) -1 else 1)
+ }
+
+ def finite_? = false
+
+ def length: Long = throw new IllegalArgumentException("length not allowed on infinite Durations")
+ def unit: TimeUnit = throw new IllegalArgumentException("unit not allowed on infinite Durations")
+ def toNanos: Long = throw new IllegalArgumentException("toNanos not allowed on infinite Durations")
+ def toMicros: Long = throw new IllegalArgumentException("toMicros not allowed on infinite Durations")
+ def toMillis: Long = throw new IllegalArgumentException("toMillis not allowed on infinite Durations")
+ def toSeconds: Long = throw new IllegalArgumentException("toSeconds not allowed on infinite Durations")
+ def toMinutes: Long = throw new IllegalArgumentException("toMinutes not allowed on infinite Durations")
+ def toHours: Long = throw new IllegalArgumentException("toHours not allowed on infinite Durations")
+ def toDays: Long = throw new IllegalArgumentException("toDays not allowed on infinite Durations")
+ def toUnit(unit: TimeUnit): Double = throw new IllegalArgumentException("toUnit not allowed on infinite Durations")
+
+ def printHMS = toString
+ }
+
+ /**
+ * Infinite duration: greater than any other and not equal to any other,
+ * including itself.
+ */
+ val Inf: Duration = new Duration with Infinite {
+ override def toString = "Duration.Inf"
+ def >(other: Duration) = true
+ def >=(other: Duration) = true
+ def <(other: Duration) = false
+ def <=(other: Duration) = false
+ def unary_- : Duration = MinusInf
+ }
+
+ /**
+ * Infinite negative duration: lesser than any other and not equal to any other,
+ * including itself.
+ */
+ val MinusInf: Duration = new Duration with Infinite {
+ override def toString = "Duration.MinusInf"
+ def >(other: Duration) = false
+ def >=(other: Duration) = false
+ def <(other: Duration) = true
+ def <=(other: Duration) = true
+ def unary_- : Duration = Inf
+ }
+
+ // Java Factories
+ def create(length: Long, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Double, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Long, unit: String): Duration = apply(length, unit)
+ def parse(s: String): Duration = unapply(s).get
+}
+
+/**
+ * Utility for working with java.util.concurrent.TimeUnit durations.
+ *
+ * <p/>
+ * Examples of usage from Java:
+ * <pre>
+ * import akka.util.FiniteDuration;
+ * import java.util.concurrent.TimeUnit;
+ *
+ * Duration duration = new FiniteDuration(100, MILLISECONDS);
+ * Duration duration = new FiniteDuration(5, "seconds");
+ *
+ * duration.toNanos();
+ * </pre>
+ *
+ * <p/>
+ * Examples of usage from Scala:
+ * <pre>
+ * import akka.util.Duration
+ * import java.util.concurrent.TimeUnit
+ *
+ * val duration = Duration(100, MILLISECONDS)
+ * val duration = Duration(100, "millis")
+ *
+ * duration.toNanos
+ * duration < 1.second
+ * duration <= Duration.Inf
+ * </pre>
+ *
+ * <p/>
+ * Implicits are also provided for Int, Long and Double. Example usage:
+ * <pre>
+ * import akka.util.duration._
+ *
+ * val duration = 100 millis
+ * </pre>
+ *
+ * Extractors, parsing and arithmetic are also included:
+ * <pre>
+ * val d = Duration("1.2 µs")
+ * val Duration(length, unit) = 5 millis
+ * val d2 = d * 2.5
+ * val d3 = d2 + 1.millisecond
+ * </pre>
+ */
+abstract class Duration extends Serializable {
+ def length: Long
+ def unit: TimeUnit
+ def toNanos: Long
+ def toMicros: Long
+ def toMillis: Long
+ def toSeconds: Long
+ def toMinutes: Long
+ def toHours: Long
+ def toDays: Long
+ def toUnit(unit: TimeUnit): Double
+ def printHMS: String
+ def <(other: Duration): Boolean
+ def <=(other: Duration): Boolean
+ def >(other: Duration): Boolean
+ def >=(other: Duration): Boolean
+ def +(other: Duration): Duration
+ def -(other: Duration): Duration
+ def *(factor: Double): Duration
+ def /(factor: Double): Duration
+ def /(other: Duration): Double
+ def unary_- : Duration
+ def finite_? : Boolean
+// def dilated(implicit system: ActorSystem): Duration = this * system.settings.TestTimeFactor
+ def min(other: Duration): Duration = if (this < other) this else other
+ def max(other: Duration): Duration = if (this > other) this else other
+ def sleep(): Unit = Thread.sleep(toMillis)
+
+ // Java API
+ def lt(other: Duration) = this < other
+ def lteq(other: Duration) = this <= other
+ def gt(other: Duration) = this > other
+ def gteq(other: Duration) = this >= other
+ def plus(other: Duration) = this + other
+ def minus(other: Duration) = this - other
+ def mul(factor: Double) = this * factor
+ def div(factor: Double) = this / factor
+ def div(other: Duration) = this / other
+ def neg() = -this
+ def isFinite() = finite_?
+}
+
+class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
+ import Duration._
+
+ def this(length: Long, unit: String) = this(length, Duration.timeUnit(unit))
+
+ def toNanos = unit.toNanos(length)
+ def toMicros = unit.toMicros(length)
+ def toMillis = unit.toMillis(length)
+ def toSeconds = unit.toSeconds(length)
+ def toMinutes = unit.toMinutes(length)
+ def toHours = unit.toHours(length)
+ def toDays = unit.toDays(length)
+ def toUnit(u: TimeUnit) = long2double(toNanos) / NANOSECONDS.convert(1, u)
+
+ override def toString = this match {
+ case Duration(1, DAYS) ⇒ "1 day"
+ case Duration(x, DAYS) ⇒ x + " days"
+ case Duration(1, HOURS) ⇒ "1 hour"
+ case Duration(x, HOURS) ⇒ x + " hours"
+ case Duration(1, MINUTES) ⇒ "1 minute"
+ case Duration(x, MINUTES) ⇒ x + " minutes"
+ case Duration(1, SECONDS) ⇒ "1 second"
+ case Duration(x, SECONDS) ⇒ x + " seconds"
+ case Duration(1, MILLISECONDS) ⇒ "1 millisecond"
+ case Duration(x, MILLISECONDS) ⇒ x + " milliseconds"
+ case Duration(1, MICROSECONDS) ⇒ "1 microsecond"
+ case Duration(x, MICROSECONDS) ⇒ x + " microseconds"
+ case Duration(1, NANOSECONDS) ⇒ "1 nanosecond"
+ case Duration(x, NANOSECONDS) ⇒ x + " nanoseconds"
+ }
+
+ def printHMS = "%02d:%02d:%06.3f".format(toHours, toMinutes % 60, toMillis / 1000.0 % 60)
+
+ def <(other: Duration) = {
+ if (other.finite_?) {
+ toNanos < other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other > this
+ }
+ }
+
+ def <=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos <= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other >= this
+ }
+ }
+
+ def >(other: Duration) = {
+ if (other.finite_?) {
+ toNanos > other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other < this
+ }
+ }
+
+ def >=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos >= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other <= this
+ }
+ }
+
+ def +(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos + other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def -(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos - other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def *(factor: Double) = fromNanos(long2double(toNanos) * factor)
+
+ def /(factor: Double) = fromNanos(long2double(toNanos) / factor)
+
+ def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0
+
+ def unary_- = Duration(-length, unit)
+
+ def finite_? = true
+
+ override def equals(other: Any) =
+ other.isInstanceOf[FiniteDuration] &&
+ toNanos == other.asInstanceOf[FiniteDuration].toNanos
+
+ override def hashCode = toNanos.asInstanceOf[Int]
+}
+
+class DurationInt(n: Int) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationLong(n: Long) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationDouble(d: Double) {
+ def nanoseconds = Duration(d, NANOSECONDS)
+ def nanos = Duration(d, NANOSECONDS)
+ def nanosecond = Duration(d, NANOSECONDS)
+ def nano = Duration(d, NANOSECONDS)
+
+ def microseconds = Duration(d, MICROSECONDS)
+ def micros = Duration(d, MICROSECONDS)
+ def microsecond = Duration(d, MICROSECONDS)
+ def micro = Duration(d, MICROSECONDS)
+
+ def milliseconds = Duration(d, MILLISECONDS)
+ def millis = Duration(d, MILLISECONDS)
+ def millisecond = Duration(d, MILLISECONDS)
+ def milli = Duration(d, MILLISECONDS)
+
+ def seconds = Duration(d, SECONDS)
+ def second = Duration(d, SECONDS)
+
+ def minutes = Duration(d, MINUTES)
+ def minute = Duration(d, MINUTES)
+
+ def hours = Duration(d, HOURS)
+ def hour = Duration(d, HOURS)
+
+ def days = Duration(d, DAYS)
+ def day = Duration(d, DAYS)
+}
diff --git a/src/library/scala/util/Timeout.scala b/src/library/scala/util/Timeout.scala
new file mode 100644
index 0000000000..0190675344
--- /dev/null
+++ b/src/library/scala/util/Timeout.scala
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+package scala.util
+
+import java.util.concurrent.TimeUnit
+
+case class Timeout(duration: Duration) {
+ def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+ def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
+}
+
+object Timeout {
+ /**
+ * A timeout with zero duration, will cause most requests to always timeout.
+ */
+ val zero = new Timeout(Duration.Zero)
+
+ /**
+ * A Timeout with infinite duration. Will never timeout. Use extreme caution with this
+ * as it may cause memory leaks, blocked threads, or may not even be supported by
+ * the receiver, which would result in an exception.
+ */
+ val never = new Timeout(Duration.Inf)
+
+ def apply(timeout: Long) = new Timeout(timeout)
+ def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
+
+ implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
+ implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
+ implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
+ //implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout (have to introduce this in ActorSystem)
+}
diff --git a/test/files/jvm/concurrent-future.check b/test/files/jvm/concurrent-future.check
new file mode 100644
index 0000000000..c55e824818
--- /dev/null
+++ b/test/files/jvm/concurrent-future.check
@@ -0,0 +1,16 @@
+test1: hai world
+test1: kthxbye
+test2: hai world
+test2: awsum thx
+test2: kthxbye
+test3: hai world
+test4: hai world
+test4: kthxbye
+test5: hai world
+test5: kthxbye
+test6: hai world
+test6: kthxbye
+test7: hai world
+test7: kthxbye
+test8: hai world
+test8: im in yr loop
diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala
new file mode 100644
index 0000000000..eb3bbad591
--- /dev/null
+++ b/test/files/jvm/concurrent-future.scala
@@ -0,0 +1,122 @@
+
+
+
+import scala.concurrent._
+
+
+
+object Test extends App {
+
+ def once(body: (() => Unit) => Unit) {
+ val sv = new SyncVar[Boolean]
+ body(() => sv put true)
+ sv.take()
+ }
+
+ def output(num: Int, msg: String) {
+ println("test" + num + ": " + msg)
+ }
+
+ def testOnSuccess(): Unit = once {
+ done =>
+ val f = future {
+ output(1, "hai world")
+ }
+ f onSuccess { _ =>
+ output(1, "kthxbye")
+ done()
+ }
+ }
+
+ def testOnSuccessWhenCompleted(): Unit = once {
+ done =>
+ val f = future {
+ output(2, "hai world")
+ }
+ f onSuccess { _ =>
+ output(2, "awsum thx")
+ f onSuccess { _ =>
+ output(2, "kthxbye")
+ done()
+ }
+ }
+ }
+
+ def testOnSuccessWhenFailed(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ output(3, "hai world")
+ done()
+ throw new Exception
+ }
+ f onSuccess { _ =>
+ output(3, "onoes")
+ }
+ }
+
+ def testOnFailure(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ output(4, "hai world")
+ throw new Exception
+ }
+ f onSuccess { _ =>
+ output(4, "onoes")
+ done()
+ }
+ f onFailure { case _ =>
+ output(4, "kthxbye")
+ done()
+ }
+ }
+
+ def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once {
+ done =>
+ val f = future[Unit] {
+ output(num, "hai world")
+ throw cause
+ }
+ f onSuccess { _ =>
+ output(num, "onoes")
+ done()
+ }
+ f onFailure {
+ case e: ExecutionException if (e.getCause == cause) =>
+ output(num, "kthxbye")
+ done()
+ case _ =>
+ output(num, "onoes")
+ done()
+ }
+ }
+
+ def testOnFailureWhenFutureTimeoutException(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ output(8, "hai world")
+ throw new FutureTimeoutException(null)
+ }
+ f onSuccess { _ =>
+ output(8, "onoes")
+ done()
+ }
+ f onFailure {
+ case e: FutureTimeoutException =>
+ output(8, "im in yr loop")
+ done()
+ case other =>
+ output(8, "onoes: " + other)
+ done()
+ }
+ }
+
+ testOnSuccess()
+ testOnSuccessWhenCompleted()
+ testOnSuccessWhenFailed()
+ testOnFailure()
+ testOnFailureWhenSpecialThrowable(5, new Error)
+ testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
+ testOnFailureWhenSpecialThrowable(7, new InterruptedException)
+ testOnFailureWhenFutureTimeoutException()
+
+}
diff --git a/test/files/jvm/scala-concurrent-tck-akka.scala b/test/files/jvm/scala-concurrent-tck-akka.scala
new file mode 100644
index 0000000000..dfd906e59e
--- /dev/null
+++ b/test/files/jvm/scala-concurrent-tck-akka.scala
@@ -0,0 +1,391 @@
+
+
+import akka.dispatch.{
+ Future => future,
+ Promise => promise
+}
+import akka.dispatch.Await.{result => await}
+
+// Duration required for await
+import akka.util.Duration
+import java.util.concurrent.TimeUnit
+import TimeUnit._
+
+import scala.concurrent.{
+ TimeoutException,
+ SyncVar,
+ ExecutionException
+}
+//import scala.concurrent.future
+//import scala.concurrent.promise
+//import scala.concurrent.await
+
+
+
+trait TestBase {
+
+ implicit val disp = akka.actor.ActorSystem().dispatcher
+
+ def once(body: (() => Unit) => Unit) {
+ val sv = new SyncVar[Boolean]
+ body(() => sv put true)
+ sv.take()
+ }
+
+}
+
+
+trait FutureCallbacks extends TestBase {
+
+ def testOnSuccess(): Unit = once {
+ done =>
+ var x = 0
+ val f = future {
+ x = 1
+ }
+ f onSuccess { case any =>
+ done()
+ assert(x == 1)
+ }
+ }
+
+ def testOnSuccessWhenCompleted(): Unit = once {
+ done =>
+ var x = 0
+ val f = future {
+ x = 1
+ }
+ f onSuccess { case any =>
+ assert(x == 1)
+ x = 2
+ f onSuccess { case any =>
+ assert(x == 2)
+ done()
+ }
+ }
+ }
+
+ def testOnSuccessWhenFailed(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ done()
+ throw new Exception
+ }
+ f onSuccess { case any =>
+ assert(false)
+ }
+ }
+
+ def testOnFailure(): Unit = once {
+ done =>
+ var x = 0
+ val f = future[Unit] {
+ x = 1
+ throw new Exception
+ }
+ f onSuccess { case any =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case _ =>
+ done()
+ assert(x == 1)
+ }
+ }
+
+ def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once {
+ done =>
+ val f = future[Unit] {
+ throw cause
+ }
+ f onSuccess { case any =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case e: ExecutionException if (e.getCause == cause) =>
+ done()
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testOnFailureWhenTimeoutException(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ throw new TimeoutException()
+ }
+ f onSuccess { case any =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case e: TimeoutException =>
+ done()
+ case other =>
+ done()
+ assert(false)
+ }
+ }
+
+ testOnSuccess()
+ testOnSuccessWhenCompleted()
+ testOnSuccessWhenFailed()
+ testOnFailure()
+// testOnFailureWhenSpecialThrowable(5, new Error)
+// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
+// testOnFailureWhenSpecialThrowable(7, new InterruptedException)
+// testOnFailureWhenTimeoutException()
+
+}
+
+
+trait FutureCombinators extends TestBase {
+
+ // map: stub
+ def testMapSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testMapFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ // flatMap: stub
+ def testFlatMapSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testFlatMapFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ // filter: stub
+ def testFilterSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testFilterFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ // foreach: stub
+ def testForeachSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testForeachFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testRecoverSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recover {
+ case re: RuntimeException =>
+ "recovered"
+ } onSuccess { case x =>
+ done()
+ assert(x == "recovered")
+ } onFailure { case any =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testRecoverFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recover {
+ case te: TimeoutException => "timeout"
+ } onSuccess { case x =>
+ done()
+ assert(false)
+ } onFailure { case any =>
+ done()
+ assert(any == cause)
+ }
+ }
+
+ testMapSuccess()
+ testMapFailure()
+ testFlatMapSuccess()
+ testFlatMapFailure()
+ testFilterSuccess()
+ testFilterFailure()
+ testForeachSuccess()
+ testForeachFailure()
+ testRecoverSuccess()
+ testRecoverFailure()
+
+}
+
+/*
+trait FutureProjections extends TestBase {
+
+ def testFailedFailureOnComplete(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onComplete {
+ case Right(t) =>
+ assert(t == cause)
+ done()
+ case Left(t) =>
+ assert(false)
+ }
+ }
+
+ def testFailedFailureOnSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onSuccess {
+ t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ def testFailedSuccessOnComplete(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onComplete {
+ case Right(t) =>
+ assert(false)
+ case Left(t) =>
+ assert(t.isInstanceOf[NoSuchElementException])
+ done()
+ }
+ }
+
+ def testFailedSuccessOnFailure(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onFailure {
+ case nsee: NoSuchElementException =>
+ done()
+ }
+ }
+
+ def testFailedFailureAwait(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ assert(await(0, f.failed) == cause)
+ done()
+ }
+
+ def testFailedSuccessAwait(): Unit = once {
+ done =>
+ val f = future { 0 }
+ try {
+ println(await(0, f.failed))
+ assert(false)
+ } catch {
+ case nsee: NoSuchElementException => done()
+ }
+ }
+
+ testFailedFailureOnComplete()
+ testFailedFailureOnSuccess()
+ testFailedSuccessOnComplete()
+ testFailedSuccessOnFailure()
+ testFailedFailureAwait()
+ //testFailedSuccessAwait()
+
+}
+*/
+
+trait Blocking extends TestBase {
+
+ def testAwaitSuccess(): Unit = once {
+ done =>
+ val f = future { 0 }
+ await(f, Duration(500, "ms"))
+ done()
+ }
+
+ def testAwaitFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ try {
+ await(f, Duration(500, "ms"))
+ assert(false)
+ } catch {
+ case t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ testAwaitSuccess()
+ testAwaitFailure()
+
+}
+
+/*
+trait Promises extends TestBase {
+
+ def testSuccess(): Unit = once {
+ done =>
+ val p = promise[Int]()
+ val f = p.future
+
+ f.onSuccess { x =>
+ done()
+ assert(x == 5)
+ } onFailure { case any =>
+ done()
+ assert(false)
+ }
+
+ p.success(5)
+ }
+
+ testSuccess()
+
+}
+*/
+
+trait Exceptions extends TestBase {
+
+}
+
+
+object Test
+extends App
+with FutureCallbacks
+with FutureCombinators
+/*with FutureProjections*/
+/*with Promises*/
+with Blocking
+with Exceptions
+{
+ System.exit(0)
+}
+
+
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
new file mode 100644
index 0000000000..ccf1162e19
--- /dev/null
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -0,0 +1,389 @@
+
+
+
+import scala.concurrent.{
+ Future,
+ Promise,
+ TimeoutException,
+ SyncVar,
+ ExecutionException
+}
+import scala.concurrent.future
+import scala.concurrent.promise
+import scala.concurrent.await
+
+import scala.util.Duration
+
+
+trait TestBase {
+
+ def once(body: (() => Unit) => Unit) {
+ val sv = new SyncVar[Boolean]
+ body(() => sv put true)
+ sv.take()
+ }
+
+ def assert(cond: => Boolean) {
+ try {
+ Predef.assert(cond)
+ } catch {
+ case e => e.printStackTrace()
+ }
+ }
+
+}
+
+
+trait FutureCallbacks extends TestBase {
+
+ def testOnSuccess(): Unit = once {
+ done =>
+ var x = 0
+ val f = future {
+ x = 1
+ }
+ f onSuccess { _ =>
+ done()
+ assert(x == 1)
+ }
+ }
+
+ def testOnSuccessWhenCompleted(): Unit = once {
+ done =>
+ var x = 0
+ val f = future {
+ x = 1
+ }
+ f onSuccess { _ =>
+ assert(x == 1)
+ x = 2
+ f onSuccess { _ =>
+ assert(x == 2)
+ done()
+ }
+ }
+ }
+
+ def testOnSuccessWhenFailed(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ done()
+ throw new Exception
+ }
+ f onSuccess { _ =>
+ assert(false)
+ }
+ }
+
+ def testOnFailure(): Unit = once {
+ done =>
+ var x = 0
+ val f = future[Unit] {
+ x = 1
+ throw new Exception
+ }
+ f onSuccess { _ =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case _ =>
+ done()
+ assert(x == 1)
+ }
+ }
+
+ def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once {
+ done =>
+ val f = future[Unit] {
+ throw cause
+ }
+ f onSuccess { _ =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case e: ExecutionException if (e.getCause == cause) =>
+ done()
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testOnFailureWhenTimeoutException(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ throw new TimeoutException()
+ }
+ f onSuccess { _ =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case e: TimeoutException =>
+ done()
+ case other =>
+ done()
+ assert(false)
+ }
+ }
+
+ testOnSuccess()
+ testOnSuccessWhenCompleted()
+ testOnSuccessWhenFailed()
+ testOnFailure()
+ testOnFailureWhenSpecialThrowable(5, new Error)
+ testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
+ testOnFailureWhenSpecialThrowable(7, new InterruptedException)
+ testOnFailureWhenTimeoutException()
+
+}
+
+
+trait FutureCombinators extends TestBase {
+
+ // map: stub
+ def testMapSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testMapFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ // flatMap: stub
+ def testFlatMapSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testFlatMapFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ // filter: stub
+ def testFilterSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testFilterFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ // foreach: stub
+ def testForeachSuccess(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testForeachFailure(): Unit = once {
+ done =>
+ done()
+ }
+
+ def testRecoverSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recover {
+ case re: RuntimeException =>
+ "recovered"
+ } onSuccess { x =>
+ done()
+ assert(x == "recovered")
+ } onFailure { case any =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testRecoverFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recover {
+ case te: TimeoutException => "timeout"
+ } onSuccess { x =>
+ done()
+ assert(false)
+ } onFailure { case any =>
+ done()
+ assert(any == cause)
+ }
+ }
+
+ testMapSuccess()
+ testMapFailure()
+ testFlatMapSuccess()
+ testFlatMapFailure()
+ testFilterSuccess()
+ testFilterFailure()
+ testForeachSuccess()
+ testForeachFailure()
+ testRecoverSuccess()
+ testRecoverFailure()
+
+}
+
+
+trait FutureProjections extends TestBase {
+
+ def testFailedFailureOnComplete(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onComplete {
+ case Right(t) =>
+ assert(t == cause)
+ done()
+ case Left(t) =>
+ assert(false)
+ }
+ }
+
+ def testFailedFailureOnSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onSuccess {
+ t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ def testFailedSuccessOnComplete(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onComplete {
+ case Right(t) =>
+ assert(false)
+ case Left(t) =>
+ assert(t.isInstanceOf[NoSuchElementException])
+ done()
+ }
+ }
+
+ def testFailedSuccessOnFailure(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onFailure {
+ case nsee: NoSuchElementException =>
+ done()
+ }
+ }
+
+ def testFailedFailureAwait(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ assert(await(0, f.failed) == cause)
+ done()
+ }
+
+ def testFailedSuccessAwait(): Unit = once {
+ done =>
+ val f = future { 0 }
+ try {
+ await(0, f.failed)
+ assert(false)
+ } catch {
+ case nsee: NoSuchElementException => done()
+ }
+ }
+
+ testFailedFailureOnComplete()
+ testFailedFailureOnSuccess()
+ testFailedSuccessOnComplete()
+ testFailedSuccessOnFailure()
+ testFailedFailureAwait()
+ testFailedSuccessAwait()
+
+}
+
+
+trait Blocking extends TestBase {
+
+ def testAwaitSuccess(): Unit = once {
+ done =>
+ val f = future { 0 }
+ await(Duration(500, "ms"), f)
+ done()
+ }
+
+ def testAwaitFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ try {
+ await(Duration(500, "ms"), f)
+ assert(false)
+ } catch {
+ case t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ testAwaitSuccess()
+ testAwaitFailure()
+
+}
+
+
+trait Promises extends TestBase {
+
+ def testSuccess(): Unit = once {
+ done =>
+ val p = promise[Int]()
+ val f = p.future
+
+ f.onSuccess { x =>
+ done()
+ assert(x == 5)
+ } onFailure { case any =>
+ done()
+ assert(false)
+ }
+
+ p.success(5)
+ }
+
+ testSuccess()
+
+}
+
+
+trait Exceptions extends TestBase {
+
+}
+
+
+object Test
+extends App
+with FutureCallbacks
+with FutureCombinators
+with FutureProjections
+with Promises
+with Exceptions
+{
+ System.exit(0)
+}
+
+