summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/AbstractPromise.java.disabled17
-rw-r--r--src/library/scala/concurrent/Blockable.scala23
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala30
-rw-r--r--src/library/scala/concurrent/ForkJoinTaskImpl.scala133
-rw-r--r--src/library/scala/concurrent/Future.scala279
-rw-r--r--src/library/scala/concurrent/Future.scala.disabled1051
-rw-r--r--src/library/scala/concurrent/Promise.scala63
-rw-r--r--src/library/scala/concurrent/Task.scala13
-rw-r--r--src/library/scala/concurrent/package.scala93
-rw-r--r--src/library/scala/package.scala3
-rw-r--r--src/library/scala/util/Duration.scala485
-rw-r--r--src/library/scala/util/Timeout.scala33
12 files changed, 2222 insertions, 1 deletions
diff --git a/src/library/scala/concurrent/AbstractPromise.java.disabled b/src/library/scala/concurrent/AbstractPromise.java.disabled
new file mode 100644
index 0000000000..726e6a3156
--- /dev/null
+++ b/src/library/scala/concurrent/AbstractPromise.java.disabled
@@ -0,0 +1,17 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.concurrent;
+/*
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import scala.concurrent.forkjoin.*;
+
+abstract class AbstractPromise {
+ //private volatile Object _ref = DefaultPromise.EmptyPending();
+ protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+
+ protected void compute() { }
+}
+*/
diff --git a/src/library/scala/concurrent/Blockable.scala b/src/library/scala/concurrent/Blockable.scala
new file mode 100644
index 0000000000..1ad02c7469
--- /dev/null
+++ b/src/library/scala/concurrent/Blockable.scala
@@ -0,0 +1,23 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import scala.annotation.implicitNotFound
+
+
+
+trait Blockable[+T] {
+ @implicitNotFound(msg = "Blocking must be done by calling `block on b`, where `b` is the Blockable object.")
+ def block()(implicit canblock: CanBlock): T
+}
+
+
+
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
new file mode 100644
index 0000000000..34c14147f5
--- /dev/null
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -0,0 +1,30 @@
+package scala.concurrent
+
+
+import java.util.concurrent.{ Executors, Future => JFuture }
+import scala.util.{ Duration, Timeout }
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+
+
+trait ExecutionContext {
+
+ protected implicit object CanBlockEvidence extends CanBlock
+
+ def execute(task: Runnable): Unit
+
+ def task[T](task: () => T)(implicit timeout: Timeout): Task[T]
+
+ def promise[T](implicit timeout: Timeout): Promise[T]
+
+ def blockingCall[T](body: Blockable[T]): T
+
+}
+
+
+object ExecutionContext {
+
+ lazy val forNonBlocking = new ForkJoinExecutionContext
+
+ //lazy val forBlocking = new BlockingExecutionContext
+
+}
diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
new file mode 100644
index 0000000000..6a33ca162a
--- /dev/null
+++ b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
@@ -0,0 +1,133 @@
+package scala.concurrent
+
+
+
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.annotation.tailrec
+
+
+
+/* DONE: The challenge is to make ForkJoinPromise inherit from RecursiveAction
+ * to avoid an object allocation per promise. This requires turning DefaultPromise
+ * into a trait, i.e., removing its constructor parameters.
+ */
+private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, val body: () => T, val timeout: Timeout)
+extends FJTask[T] with Task[T] with Future[T] {
+
+ private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[FJState[T]], "state")
+ @volatile private var state: State[T] = _
+
+ updater.set(this, Pending(List()))
+
+ private def casState(oldv: State[T], newv: State[T]) = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ @tailrec private def trySucceedState(res: T): Unit = updater.get(this) match {
+ case p @ Pending(cbs) => if (!casState(p, Success(res))) trySucceedState(res)
+ case _ => // return
+ }
+
+ @tailrec private def tryFailState(t: Throwable): Unit = updater.get(this) match {
+ case p @ Pending(cbs) => if (!casState(p, Failure(t))) tryFailState(t)
+ case _ => // return
+ }
+
+ // body of RecursiveTask
+ def compute(): T = {
+ try {
+ val res = body()
+ trySucceedState(res)
+ } catch handledFutureException andThen {
+ t => tryFailState(t)
+ } finally tryFailState(new ExecutionException)
+ }
+
+ def start(): Unit = {
+ Thread.currentThread match {
+ case fj: ForkJoinWorkerThread if fj.pool eq executionContext.pool => fork()
+ case _ => executionContext.pool.execute(this)
+ }
+ }
+
+ def future: Future[T] = this
+
+ def onComplete[U](callback: Either[Throwable, T] => U): this.type = {
+ @tailrec def tryAddCallback(): Either[Throwable, T] = {
+ updater.get(this) match {
+ case p @ Pending(lst) =>
+ val pt = p.asInstanceOf[Pending[T]]
+ if (casState(pt, Pending(callback :: pt.lst))) null
+ else tryAddCallback()
+ case Success(res) => Right(res)
+ case Failure(t) => Left(t)
+ }
+ }
+
+ val res = tryAddCallback()
+ if (res != null) dispatchTask new Runnable {
+ override def run() =
+ try callback(res)
+ catch handledFutureException
+ }
+ }
+
+ private def dispatchTask[U](r: Runnable) = executionContext execute r
+
+ def isTimedout: Boolean = false // TODO
+
+ // TODO FIXME: handle timeouts
+ def await(atMost: Duration): this.type =
+ await
+
+ def await: this.type = {
+ this.join()
+ this
+ }
+
+ def tryCancel(): Unit =
+ tryUnfork()
+
+}
+
+
+private[concurrent] sealed abstract class FJState[T]
+
+
+case class Pending[T](callbacks: List[Either[Throwable, T] => Any]) extends FJState[T]
+
+
+case class Success[T](result: T) extends FJState[T]
+
+
+case class Failure[T](throwable: Throwable) extends FJState[T]
+
+
+private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
+ val pool = new ForkJoinPool
+
+ @inline
+ private def executeForkJoinTask(task: RecursiveAction) {
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
+ task.fork()
+ else
+ pool execute task
+ }
+
+ def execute(task: Runnable) {
+ val action = new RecursiveAction { def compute() { task.run() } }
+ executeForkJoinTask(action)
+ }
+
+ def makeTask[T](body: () => T)(implicit timeout: Timeout): Task[T] = {
+ new ForkJoinTaskImpl(this, body, timeout)
+ }
+
+ def makePromise[T](timeout: Timeout): Promise[T] =
+ null
+
+ def blockingCall[T](body: Blockable[T]): T =
+ body.block()(CanBlockEvidence)
+
+}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
new file mode 100644
index 0000000000..b65d777d67
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala
@@ -0,0 +1,279 @@
+
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.concurrent
+
+//import akka.AkkaException (replaced with Exception)
+//import akka.event.Logging.Error (removed all logging)
+import scala.util.{ Timeout, Duration }
+import scala.Option
+//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods)
+
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable ⇒ JIterable }
+import java.util.{ LinkedList ⇒ JLinkedList }
+
+import scala.annotation.tailrec
+import scala.collection.mutable.Stack
+//import akka.util.Switch (commented method)
+import java.{ lang ⇒ jl }
+import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
+
+
+
+/** The trait that represents futures.
+ *
+ * @define futureTimeout
+ * The timeout of the future is:
+ * - if this future was obtained from a task (i.e. by calling `task.future`), the timeout associated with that task
+ * - if this future was obtained from a promise (i.e. by calling `promise.future`), the timeout associated with that promise
+ *
+ * @define multipleCallbacks
+ * Multiple callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ *
+ * @define caughtThrowables
+ * The future may contain a throwable object and this means that the future failed.
+ * Futures obtained through combinators have the same exception as the future they were obtained from.
+ * The following throwable objects are treated differently:
+ * - Error - errors are not contained within futures
+ * - scala.util.control.ControlException - not contained within futures
+ * - InterruptedException - not contained within futures
+ *
+ * @define forComprehensionExamples
+ * Example:
+ *
+ * {{{
+ * val f = future { 5 }
+ * val g = future { 3 }
+ * val h = for {
+ * x: Int <- f // returns Future(5)
+ * y: Int <- g // returns Future(5)
+ * } yield x + y
+ * }}}
+ *
+ * is translated to:
+ *
+ * {{{
+ * f flatMap { (x: Int) => g map { (y: Int) => x + y } }
+ * }}}
+ */
+trait Future[+T] extends Blockable[T] {
+self =>
+
+ /* Callbacks */
+
+ /** When this future is completed successfully (i.e. with a value),
+ * apply the provided function to the value.
+ *
+ * If the future has already been completed with a value,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case of a timeout.
+ *
+ * Will not be called in case of an exception.
+ *
+ * $multipleCallbacks
+ */
+ def onSuccess[U](func: T => U): this.type = onComplete {
+ case Right(v) => func(v)
+ }
+
+ /** When this future is completed with a failure (i.e. with a throwable),
+ * apply the provided function to the throwable.
+ *
+ * $caughtThrowables
+ *
+ * If the future has already been completed with a failure,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case of a timeout.
+ *
+ * Will not be called in case of an exception.
+ *
+ * $multipleCallbacks
+ */
+ def onFailure[U](func: Throwable => U): this.type = onComplete {
+ case Left(t) if isFutureThrowable(t) => func(t)
+ }
+
+ /** When this future times out, apply the provided function.
+ *
+ * If the future has already timed out,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onTimeout[U](body: =>U): this.type = onComplete {
+ case Left(te: FutureTimeoutException) => body
+ }
+
+ /** When this future is completed, either through an exception, a timeout, or a value,
+ * apply the provided function.
+ *
+ * If the future has already been completed,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onComplete[U](func: Either[Throwable, T] => U): this.type
+
+
+ /* Miscellaneous */
+
+ /** The execution context of the future.
+ */
+ def executionContext: ExecutionContext
+
+ /** Creates a new promise.
+ */
+ def newPromise[S]: Promise[S] = executionContext promise
+
+ /** Tests whether this Future's timeout has expired.
+ *
+ * $futureTimeout
+ *
+ * Note that an expired Future may still contain a value, or it may be
+ * completed with a value.
+ */
+ def isTimedout: Boolean
+
+ /** This future's timeout.
+ *
+ * $futureTimeout
+ */
+ def timeout: Timeout
+
+ /** This future's timeout in nanoseconds.
+ *
+ * $futureTimeout
+ */
+ def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
+
+
+ /* Projections */
+
+ def failed: Future[Throwable] = new Future[Throwable] {
+ def newPromise[S] = self.newPromise[S]
+ def onComplete[U](func: Either[Throwable, Throwable] => U) = self.onComplete {
+ case Left(t) => func(Right(t))
+ case Right(v) => // do nothing
+ }
+ def isTimedout = self.isTimedout
+ def timeout = self.timeout
+ }
+
+ def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] {
+ def newPromise[S] = self.newPromise[S]
+ def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = self.onComplete {
+ case Left(te: FutureTimeoutException) => func(Right(te))
+ case _ => // do nothing
+ }
+ def isTimedout = self.isTimedout
+ def timeout = self.timeout
+ }
+
+
+ /* Monadic operations */
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain. If there is no match, or if this future contains
+ * a valid result then the new future will contain the same.
+ *
+ * Example:
+ *
+ * {{{
+ * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
+ * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
+ * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * }}}
+ */
+ def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Left(t) => if (pf isDefinedAt t) p fulfill pf(t) else p fail t
+ case Right(v) => p fulfill v
+ }
+
+ p.future
+ }
+
+ /** Asynchronously processes the value in the future once the value becomes available.
+ *
+ * Will not be called if the future times out or fails.
+ *
+ * This method typically registers an `onResult` callback.
+ */
+ def foreach[U](f: T => U): Unit = onResult f
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future. If this future is completed with an exception then the new
+ * future will also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def map[S](f: T => S): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p fail t
+ case Right(v) => p fulfill f(v)
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future, and returns the result of the function as the new future.
+ * If this future is completed with an exception then the new future will
+ * also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def flatMap[S](f: T => Future[S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Left(t) => p fail t
+ case Right(f) => f onComplete {
+ p fulfill _
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by filtering the value of the current future with a predicate.
+ *
+ * If the current future contains a value which satisfies the predicate, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { 5 }
+ * val g = g filter { _ % 2 == 1 }
+ * val h = f filter { _ % 2 == 0 }
+ * block on g // evaluates to 5
+ * block on h // throw a NoSuchElementException
+ * }}}
+ */
+ def filter(p: T => Boolean): Future[T] = {
+ val p = newPromise[T]
+
+ onComplete {
+ case Left(t) => p fail t
+ case Right(v) => if (p(v)) p fulfill v else p fail new NoSuchElementException
+ }
+
+ p.future
+ }
+
+}
+
+
diff --git a/src/library/scala/concurrent/Future.scala.disabled b/src/library/scala/concurrent/Future.scala.disabled
new file mode 100644
index 0000000000..3cd9bbeb6e
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala.disabled
@@ -0,0 +1,1051 @@
+/*
+class FutureFactory(dispatcher: MessageDispatcher, timeout: Timeout) {
+
+ // TODO: remove me ASAP !!!
+ implicit val _dispatcher = dispatcher
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T]): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Timeout): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Long): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher, timeout)
+
+ /**
+ * Java API.
+ * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
+ */
+/*
+ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
+ val pred: T ⇒ Boolean = predicate.apply(_)
+ Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)), timeout)(pred).map(JOption.fromScalaOption(_))(timeout)
+ }
+
+ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = find(futures, predicate, timeout)
+*/
+ /**
+ * Java API.
+ * Returns a Future to the result of the first future in the list that is completed
+ */
+/*
+ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
+ Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
+
+ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = firstCompletedOf(futures, timeout)
+*/
+ /**
+ * Java API
+ * A non-blocking fold over the specified futures.
+ * The fold is performed on the thread where the last future is completed,
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ */
+/*
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
+ Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(zero)(fun.apply _)
+
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
+
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout, futures, fun)
+
+ /**
+ * Java API.
+ * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
+ */
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
+ Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
+
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
+
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout, fun)
+
+ /**
+ * Java API.
+ * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = {
+ implicit val t = timeout
+ scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒
+ for (r ← fr; a ← fa) yield {
+ r add a
+ r
+ })
+ }
+
+ def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, timeout)
+
+ /**
+ * Java API.
+ * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel.
+ */
+ def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = {
+ implicit val t = timeout
+ scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒
+ val fb = fn(a)
+ for (r ← fr; b ← fb) yield {
+ r add b
+ r
+ }
+ }
+ }
+
+ def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, timeout, fn)
+
+*/
+}
+*/
+
+object Future {
+ /*
+ /**
+ * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
+ * The execution is performed by the specified Dispatcher.
+ */
+ def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
+ val promise: Promise[T] = DefaultPromise[T](timeout)
+
+ dispatcher dispatchTask { () ⇒
+ promise complete {
+ try {
+ Right(body)
+ } catch {
+ case e ⇒ Left(e)
+ }
+ }
+ }
+
+ promise
+ }
+
+ def apply[T](body: ⇒ T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
+ apply(body)(dispatcher, timeout)
+
+ def apply[T](body: ⇒ T, timeout: Duration)(implicit dispatcher: MessageDispatcher): Future[T] =
+ apply(body)(dispatcher, timeout)
+
+ def apply[T](body: ⇒ T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] =
+ apply(body)(dispatcher, timeout)
+
+ import scala.collection.mutable.Builder
+ import scala.collection.generic.CanBuildFrom
+
+ /**
+ * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] =
+ in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
+
+ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
+ sequence(in)(cbf, timeout, dispatcher)
+
+ /**
+ * Returns a Future to the result of the first future in the list that is completed
+ */
+ def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
+ val futureResult = DefaultPromise[T](timeout)
+
+ val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _)
+ futures.foreach(_ onComplete completeFirst)
+
+ futureResult
+ }
+
+ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
+ firstCompletedOf(futures)(dispatcher, timeout)
+
+ /**
+ * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
+ */
+ def find[T](futures: Iterable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[Option[T]] = {
+ if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
+ else {
+ val result = DefaultPromise[Option[T]](timeout)
+ val ref = new AtomicInteger(futures.size)
+ val search: Future[T] ⇒ Unit = f ⇒ try {
+ f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r))
+ } finally {
+ if (ref.decrementAndGet == 0)
+ result completeWithResult None
+ }
+ futures.foreach(_ onComplete search)
+
+ result
+ }
+ }
+
+ def find[T](futures: Iterable[Future[T]], timeout: Timeout)(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] =
+ find(futures)(predicate)(dispatcher, timeout)
+
+ /**
+ * A non-blocking fold over the specified futures.
+ * The fold is performed on the thread where the last future is completed,
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ * Example:
+ * <pre>
+ * val result = Futures.fold(0)(futures)(_ + _).await.result
+ * </pre>
+ */
+/*
+ def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
+ if (futures.isEmpty) {
+ new KeptPromise[R](Right(zero))
+ } else {
+ val result = DefaultPromise[R](timeout)
+ val results = new ConcurrentLinkedQueue[T]()
+ val done = new Switch(false)
+ val allDone = futures.size
+
+ val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
+ f.value.get match {
+ case Right(value) ⇒
+ val added = results add value
+ if (added && results.size == allDone) { //Only one thread can get here
+ if (done.switchOn) {
+ try {
+ val i = results.iterator
+ var currentValue = zero
+ while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
+ result completeWithResult currentValue
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage))
+ result completeWithException e
+ } finally {
+ results.clear
+ }
+ }
+ }
+ case Left(exception) ⇒
+ if (done.switchOn) {
+ result completeWithException exception
+ results.clear
+ }
+ }
+ }
+
+ futures foreach { _ onComplete aggregate }
+ result
+ }
+ }
+*/
+/*
+ def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] =
+ fold(futures)(zero)(foldFun)(dispatcher, timeout)
+*/
+ /**
+ * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
+ * Example:
+ * <pre>
+ * val result = Futures.reduce(futures)(_ + _).await.result
+ * </pre>
+ */
+/*
+ def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
+ if (futures.isEmpty)
+ new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
+ else {
+ val result = DefaultPromise[R](timeout)
+ val seedFound = new AtomicBoolean(false)
+ val seedFold: Future[T] ⇒ Unit = f ⇒ {
+ if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
+ f.value.get match {
+ case Right(value) ⇒ result.completeWith(fold(futures.filterNot(_ eq f))(value)(op))
+ case Left(exception) ⇒ result.completeWithException(exception)
+ }
+ }
+ }
+ for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
+ result
+ }
+ }
+*/
+/*
+ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] =
+ reduce(futures)(op)(dispatcher, timeout)
+*/
+ /**
+ * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel:
+ * <pre>
+ * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
+ * </pre>
+ */
+ def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[B]] =
+ in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a) ⇒
+ val fb = fn(a.asInstanceOf[A])
+ for (r ← fr; b ← fb) yield (r += b)
+ }.map(_.result)
+
+ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
+ traverse(in)(fn)(cbf, timeout, dispatcher)
+
+ /**
+ * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
+ * Continuations plugin.
+ *
+ * Within the block, the result of a Future may be accessed by calling Future.apply. At that point
+ * execution is suspended with the rest of the block being stored in a continuation until the result
+ * of the Future is available. If an Exception is thrown while processing, it will be contained
+ * within the resulting Future.
+ *
+ * This allows working with Futures in an imperative style without blocking for each result.
+ *
+ * Completing a Future using 'Promise << Future' will also suspend execution until the
+ * value of the other Future is available.
+ *
+ * The Delimited Continuations compiler plugin must be enabled in order to use this method.
+ */
+/*
+ def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
+ val future = Promise[A](timeout)
+ dispatchTask({ () ⇒
+ (reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException {
+ case e: Exception ⇒ future completeWithException e
+ }
+ }, true)
+ future
+ }
+*/
+ // TODO make variant of flow(timeout)(body) which does NOT break type inference
+
+ /**
+ * Assures that any Future tasks initiated in the current thread will be
+ * executed asynchronously, including any tasks currently queued to be
+ * executed in the current thread. This is needed if the current task may
+ * block, causing delays in executing the remaining tasks which in some
+ * cases may cause a deadlock.
+ *
+ * Note: Calling 'Future.await' will automatically trigger this method.
+ *
+ * For example, in the following block of code the call to 'latch.open'
+ * might not be executed until after the call to 'latch.await', causing
+ * a deadlock. By adding 'Future.blocking()' the call to 'latch.open'
+ * will instead be dispatched separately from the current block, allowing
+ * it to be run in parallel:
+ * <pre>
+ * val latch = new StandardLatch
+ * val future = Future() map { _ ⇒
+ * Future.blocking()
+ * val nested = Future()
+ * nested foreach (_ ⇒ latch.open)
+ * latch.await
+ * }
+ * </pre>
+ */
+ def blocking()(implicit dispatcher: MessageDispatcher): Unit =
+ _taskStack.get match {
+ case Some(taskStack) if taskStack.nonEmpty ⇒
+ val tasks = taskStack.elems
+ taskStack.clear()
+ _taskStack set None
+ dispatchTask(() ⇒ _taskStack.get.get.elems = tasks, true)
+ case Some(_) ⇒ _taskStack set None
+ case _ ⇒ // already None
+ }
+
+ private val _taskStack = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() {
+ override def initialValue = None
+ }
+
+ private[concurrent] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit =
+ _taskStack.get match {
+ case Some(taskStack) if !force ⇒ taskStack push task
+ case _ ⇒
+ dispatcher dispatchTask { () ⇒
+ try {
+ val taskStack = Stack[() ⇒ Unit](task)
+ _taskStack set Some(taskStack)
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e ⇒ e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks
+ }
+ }
+ } finally { _taskStack set None }
+ }
+ }
+ */
+}
+
+//trait Future[+T] {
+ /*
+ /**
+ * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
+ *
+ * Returns the result of this Future without blocking, by suspending execution and storing it as a
+ * continuation until the result is available.
+ */
+ def apply()(implicit timeout: Timeout): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any]))
+
+ /**
+ * Blocks awaiting completion of this Future, then returns the resulting value,
+ * or throws the completed exception
+ *
+ * Scala & Java API
+ *
+ * throws FutureTimeoutException if this Future times out when waiting for completion
+ */
+ def get: T = this.await.resultOrException.get
+
+ /**
+ * Blocks the current thread until the Future has been completed or the
+ * timeout has expired. In the case of the timeout expiring a
+ * FutureTimeoutException will be thrown.
+ */
+ def await: Future[T]
+ */
+
+ /**
+ * Blocks the current thread until the Future has been completed or the
+ * timeout has expired, additionally bounding the waiting period according to
+ * the <code>atMost</code> parameter. The timeout will be the lesser value of
+ * 'atMost' and the timeout supplied at the constructuion of this Future. In
+ * the case of the timeout expiring a FutureTimeoutException will be thrown.
+ * Other callers of this method are not affected by the additional bound
+ * imposed by <code>atMost</code>.
+ */
+// def await(atMost: Duration): Future[T]
+
+/*
+ /**
+ * Await completion of this Future and return its value if it conforms to A's
+ * erased type. Will throw a ClassCastException if the value does not
+ * conform, or any exception the Future was completed with. Will return None
+ * in case of a timeout.
+ */
+ def as[A](implicit m: Manifest[A]): Option[A] = {
+ try await catch { case _: FutureTimeoutException ⇒ }
+ value match {
+ case None ⇒ None
+ case Some(Left(ex)) ⇒ throw ex
+ case Some(Right(v)) ⇒
+ try { Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) } catch {
+ case c: ClassCastException ⇒
+ if (v.asInstanceOf[AnyRef] eq null) throw new ClassCastException("null cannot be cast to " + m.erasure)
+ else throw new ClassCastException("'" + v + "' of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure)
+ }
+ }
+ }
+
+ /**
+ * Await completion of this Future and return its value if it conforms to A's
+ * erased type, None otherwise. Will throw any exception the Future was
+ * completed with. Will return None in case of a timeout.
+ */
+ def asSilently[A](implicit m: Manifest[A]): Option[A] = {
+ try await catch { case _: FutureTimeoutException ⇒ }
+ value match {
+ case None ⇒ None
+ case Some(Left(ex)) ⇒ throw ex
+ case Some(Right(v)) ⇒
+ try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
+ catch { case _: ClassCastException ⇒ None }
+ }
+ }
+
+ /**
+ * Tests whether this Future has been completed.
+ */
+ final def isCompleted: Boolean = value.isDefined
+
+ /**
+ * Tests whether this Future's timeout has expired.
+ *
+ * Note that an expired Future may still contain a value, or it may be
+ * completed with a value.
+ */
+ def isExpired: Boolean
+
+ def timeout: Timeout
+
+ /**
+ * This Future's timeout in nanoseconds.
+ */
+ def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
+
+ /**
+ * The contained value of this Future. Before this Future is completed
+ * the value will be None. After completion the value will be Some(Right(t))
+ * if it contains a valid result, or Some(Left(error)) if it contains
+ * an exception.
+ */
+ def value: Option[Either[Throwable, T]]
+
+ /**
+ * Returns the successful result of this Future if it exists.
+ */
+ final def result: Option[T] = value match {
+ case Some(Right(r)) ⇒ Some(r)
+ case _ ⇒ None
+ }
+
+ /**
+ * Returns the contained exception of this Future if it exists.
+ */
+ final def exception: Option[Throwable] = value match {
+ case Some(Left(e)) ⇒ Some(e)
+ case _ ⇒ None
+ }
+
+ /**
+ * When this Future is completed, apply the provided function to the
+ * Future. If the Future has already been completed, this will apply
+ * immediately. Will not be called in case of a timeout, which also holds if
+ * corresponding Promise is attempted to complete after expiry. Multiple
+ * callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ */
+ def onComplete(func: Future[T] ⇒ Unit): this.type
+
+ /**
+ * When the future is completed with a valid result, apply the provided
+ * PartialFunction to the result. See `onComplete` for more details.
+ * <pre>
+ * future onResult {
+ * case Foo ⇒ target ! "foo"
+ * case Bar ⇒ target ! "bar"
+ * }
+ * </pre>
+ */
+ final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete {
+ _.value match {
+ case Some(Right(r)) if pf isDefinedAt r ⇒ pf(r)
+ case _ ⇒
+ }
+ }
+
+ /**
+ * When the future is completed with an exception, apply the provided
+ * PartialFunction to the exception. See `onComplete` for more details.
+ * <pre>
+ * future onException {
+ * case NumberFormatException ⇒ target ! "wrong format"
+ * }
+ * </pre>
+ */
+ final def onException(pf: PartialFunction[Throwable, Unit]): this.type = onComplete {
+ _.value match {
+ case Some(Left(ex)) if pf isDefinedAt ex ⇒ pf(ex)
+ case _ ⇒
+ }
+ }
+
+ def onTimeout(func: Future[T] ⇒ Unit): this.type
+
+ def orElse[A >: T](fallback: ⇒ A): Future[A]
+
+ /**
+ * Creates a new Future that will handle any matching Throwable that this
+ * Future might contain. If there is no match, or if this Future contains
+ * a valid result then the new Future will contain the same.
+ * Example:
+ * <pre>
+ * Future(6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
+ * Future(6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
+ * Future(6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * </pre>
+ */
+ final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {
+ val future = DefaultPromise[A](timeout)
+ onComplete {
+ _.value.get match {
+ case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) })
+ case otherwise ⇒ future complete otherwise
+ }
+ }
+ future
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future. If this Future is completed with an exception then the new
+ * Future will also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor ? "Hello" // returns 5
+ * b: String <- actor ? a // returns "10"
+ * c: String <- actor ? 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = {
+ val future = DefaultPromise[A](timeout)
+ onComplete {
+ _.value.get match {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(res) ⇒
+ future complete (try {
+ Right(f(res))
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
+ Left(e)
+ })
+ }
+ }
+ future
+ }
+
+ /**
+ * Creates a new Future[A] which is completed with this Future's result if
+ * that conforms to A's erased type or a ClassCastException otherwise.
+ */
+ final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = {
+ val fa = DefaultPromise[A](timeout)
+ onComplete { ft ⇒
+ fa complete (ft.value.get match {
+ case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]]
+ case Right(t) ⇒
+ try {
+ Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
+ } catch {
+ case e: ClassCastException ⇒ Left(e)
+ }
+ })
+ }
+ fa
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future, and returns the result of the function as the new Future.
+ * If this Future is completed with an exception then the new Future will
+ * also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor ? "Hello" // returns 5
+ * b: String <- actor ? a // returns "10"
+ * c: String <- actor ? 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = {
+ val future = DefaultPromise[A](timeout)
+
+ onComplete {
+ _.value.get match {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]]
+ case Right(r) ⇒ try {
+ future.completeWith(f(r))
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
+ future complete Left(e)
+ }
+ }
+ }
+ future
+ }
+
+ final def foreach(f: T ⇒ Unit): Unit = onComplete {
+ _.value.get match {
+ case Right(r) ⇒ f(r)
+ case _ ⇒
+ }
+ }
+
+ final def withFilter(p: T ⇒ Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p)
+
+ final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit timeout: Timeout) {
+ def foreach(f: A ⇒ Unit): Unit = self filter p foreach f
+ def map[B](f: A ⇒ B): Future[B] = self filter p map f
+ def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f
+ def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
+ }
+
+ final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = {
+ val future = DefaultPromise[T](timeout)
+ onComplete {
+ _.value.get match {
+ case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]]
+ case r @ Right(res) ⇒ future complete (try {
+ if (p(res)) r else Left(new MatchError(res))
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
+ Left(e)
+ })
+ }
+ }
+ future
+ }
+
+ /**
+ * Returns the current result, throws the exception if one has been raised, else returns None
+ */
+ final def resultOrException: Option[T] = value match {
+ case Some(Left(e)) ⇒ throw e
+ case Some(Right(r)) ⇒ Some(r)
+ case _ ⇒ None
+ }
+ */
+//}
+
+/**
+ * Essentially this is the Promise (or write-side) of a Future (read-side).
+ */
+//trait Promise[T] extends Future[T] {
+
+ /*
+ def start(): Unit
+
+ /**
+ * Completes this Future with the specified result, if not already completed.
+ * @return this
+ */
+ def complete(value: Either[Throwable, T]): this.type
+
+ /**
+ * Completes this Future with the specified result, if not already completed.
+ * @return this
+ */
+ final def completeWithResult(result: T): this.type = complete(Right(result))
+
+ /**
+ * Completes this Future with the specified exception, if not already completed.
+ * @return this
+ */
+ final def completeWithException(exception: Throwable): this.type = complete(Left(exception))
+
+ /**
+ * Completes this Future with the specified other Future, when that Future is completed,
+ * unless this Future has already been completed.
+ * @return this.
+ */
+ final def completeWith(other: Future[T]): this.type = {
+ other onComplete { f ⇒ complete(f.value.get) }
+ this
+ }
+ */
+/*
+ final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) }
+
+ final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
+ val fr = DefaultPromise[Any](this.timeout)
+ this completeWith other onComplete { f ⇒
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
+ fr completeWithException e
+ }
+ }
+ fr
+ }
+
+ final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒
+ val fr = DefaultPromise[Any](this.timeout)
+ stream.dequeue(this).onComplete { f ⇒
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e: Exception ⇒
+ //dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
+ fr completeWithException e
+ }
+ }
+ fr
+ }
+*/
+//}
+
+/**
+ * Represents the internal state of the DefaultCompletableFuture
+ */
+sealed abstract class FState[+T] { def value: Option[Either[Throwable, T]] }
+
+case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] {
+ def value: Option[Either[Throwable, T]] = None
+}
+
+case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
+ def result: T = value.get.right.get
+}
+
+case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
+ def exception: Throwable = value.get.left.get
+}
+
+case object Expired extends FState[Nothing] {
+ def value: Option[Either[Throwable, Nothing]] = None
+}
+
+//Companion object to FState, just to provide a cheap, immutable default entry
+private[concurrent] object FState {
+ def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
+ private val emptyPendingValue = Pending[Nothing](Nil)
+}
+
+private[concurrent] object DefaultPromise {
+ def apply[T](within: Timeout)(implicit disp: MessageDispatcher): Promise[T] = new DefaultPromise[T] {
+ val timeout = within
+ implicit val dispatcher = disp
+ }
+
+ def apply[T]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[T] = apply(timeout)
+
+ def apply[T](timeout: Long)(implicit dispatcher: MessageDispatcher): Promise[T] = apply(Timeout(timeout))
+
+ def apply[T](timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher): Promise[T] = apply(Timeout(timeout, timeunit))
+}
+
+/**
+ * The default concrete Future implementation.
+ */
+trait DefaultPromise[T] extends /*AbstractPromise with*/ Promise[T] {
+ self ⇒
+
+// @volatile private _ref: AnyRef = DefaultPromise.EmptyPending()
+
+// protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+// AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+
+ val timeout: Timeout
+ implicit val dispatcher: MessageDispatcher
+
+ private val _startTimeInNanos = currentTimeInNanos
+
+ @tailrec
+ private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
+ if (value.isEmpty && waitTimeNanos > 0) {
+ val ms = NANOS.toMillis(waitTimeNanos)
+ val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
+ val start = currentTimeInNanos
+ try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
+
+ awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start))
+ } else {
+ value.isDefined
+ }
+ }
+
+ def await(atMost: Duration): this.type = if (value.isDefined) this else {
+ Future.blocking()
+
+ val waitNanos =
+ if (timeout.duration.isFinite && atMost.isFinite)
+ atMost.toNanos min timeLeft()
+ else if (atMost.isFinite)
+ atMost.toNanos
+ else if (timeout.duration.isFinite)
+ timeLeft()
+ else Long.MaxValue //If both are infinite, use Long.MaxValue
+
+ if (awaitUnsafe(waitNanos)) this
+ else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds")
+ }
+
+ def await = await(timeout.duration)
+
+ def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
+
+ def value: Option[Either[Throwable, T]] = getState.value
+
+ @inline
+ protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean =
+ AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
+
+ @inline
+ protected final def getState: FState[T] = {
+
+ @tailrec
+ def read(): FState[T] = {
+ val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
+ if (cur.isInstanceOf[Pending[_]] && isExpired) {
+ if (updateState(cur, Expired)) Expired else read()
+ } else cur
+ }
+
+ read()
+ }
+
+ def complete(value: Either[Throwable, T]): this.type = {
+ val callbacks = {
+ try {
+ @tailrec
+ def tryComplete: List[Future[T] ⇒ Unit] = {
+ val cur = getState
+
+ cur match {
+ case Pending(listeners) ⇒
+ if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
+ else tryComplete
+ case _ ⇒ Nil
+ }
+ }
+ tryComplete
+ } finally {
+ synchronized { notifyAll() } //Notify any evil blockers
+ }
+ }
+
+ if (callbacks.nonEmpty) Future.dispatchTask(() ⇒ callbacks foreach notifyCompleted)
+
+ this
+ }
+
+ def onComplete(func: Future[T] ⇒ Unit): this.type = {
+ @tailrec //Returns whether the future has already been completed or not
+ def tryAddCallback(): Boolean = {
+ val cur = getState
+ cur match {
+ case _: Success[_] | _: Failure[_] ⇒ true
+ case Expired ⇒ false
+ case p: Pending[_] ⇒
+ val pt = p.asInstanceOf[Pending[T]]
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false
+ else tryAddCallback()
+ }
+ }
+
+ if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
+
+ this
+ }
+
+ def onTimeout(func: Future[T] ⇒ Unit): this.type = {
+ val runNow =
+ if (!timeout.duration.isFinite) false //Not possible
+ else if (value.isEmpty) {
+ if (!isExpired) {
+ val runnable = new Runnable {
+ def run() {
+ if (!isCompleted) {
+ if (!isExpired) {
+ // TODO FIXME: add scheduler
+ //dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
+ } else func(DefaultPromise.this)
+ }
+ }
+ }
+ /*
+ TODO FIXME: add scheduler
+ val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
+ onComplete(_ ⇒ timeoutFuture.cancel())
+ */
+ false
+ } else true
+ } else false
+
+ if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func))
+
+ this
+ }
+
+ final def orElse[A >: T](fallback: ⇒ A): Future[A] =
+ if (timeout.duration.isFinite) {
+ getState match {
+ case _: Success[_] | _: Failure[_] ⇒ this
+ case Expired ⇒ Future[A](fallback, timeout)
+ case _: Pending[_] ⇒
+ val promise = DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
+ promise completeWith this
+ val runnable = new Runnable {
+ def run() {
+ if (!isCompleted) {
+ if (!isExpired) {
+ // TODO FIXME add scheduler
+ //dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
+ } else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) })
+ }
+ }
+ }
+ // TODO FIXME add
+ //dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
+ promise
+ }
+ } else this
+
+ private def notifyCompleted(func: Future[T] ⇒ Unit) {
+ try { func(this) } catch { case e ⇒ /*dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception"))*/ } //TODO catch, everything? Really?
+ }
+
+ @inline
+ private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)?
+ //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs.
+ @inline
+ private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
+
+ private def timeLeftNoinline(): Long = timeLeft()
+//}
+
+/**
+ * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
+ * a Future-composition but you already have a value to contribute.
+ */
+sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
+ val value = Some(suppliedValue)
+
+ def complete(value: Either[Throwable, T]): this.type = this
+ def onComplete(func: Future[T] ⇒ Unit): this.type = {
+ Future dispatchTask (() ⇒ func(this))
+ this
+ }
+ def await(atMost: Duration): this.type = this
+ def await: this.type = this
+ def isExpired: Boolean = true
+ def timeout: Timeout = Timeout.zero
+
+ final def onTimeout(func: Future[T] ⇒ Unit): this.type = this
+ final def orElse[A >: T](fallback: ⇒ A): Future[A] = this
+ */
+//}
+
+object BoxedType {
+
+ private val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[jl.Boolean],
+ classOf[Byte] -> classOf[jl.Byte],
+ classOf[Char] -> classOf[jl.Character],
+ classOf[Short] -> classOf[jl.Short],
+ classOf[Int] -> classOf[jl.Integer],
+ classOf[Long] -> classOf[jl.Long],
+ classOf[Float] -> classOf[jl.Float],
+ classOf[Double] -> classOf[jl.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
+
+ def apply(c: Class[_]): Class[_] = {
+ if (c.isPrimitive) toBoxed(c) else c
+ }
+
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
new file mode 100644
index 0000000000..c5336ab00f
--- /dev/null
+++ b/src/library/scala/concurrent/Promise.scala
@@ -0,0 +1,63 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+/** Promise is an object which can be completed with a value or failed
+ * with an exception.
+ *
+ * A promise is assigned a timeout when created. After the timeout expires,
+ * the promise will be failed with a TimeoutException.
+ *
+ * @define promiseCompletion
+ * If the promise has already been fulfilled, failed or has timed out,
+ * calling this method will throw an IllegalStateException.
+ */
+trait Promise[T] {
+
+ /** Future containing the value of this promise.
+ */
+ def future: Future[T]
+
+ /** Completes the promise with a value.
+ *
+ * @param value The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def fulfill(value: T): Unit
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def fail(t: Throwable): Unit
+
+ /** The timeout for this promise.
+ */
+ def timeout: Timeout
+}
+
+
+object Promise {
+ /*
+ /**
+ * Creates a non-completed, new, Promise with the supplied timeout in milliseconds
+ */
+ def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = DefaultPromise[A](timeout)
+
+ /**
+ * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
+ */
+ def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout)
+ */
+}
diff --git a/src/library/scala/concurrent/Task.scala b/src/library/scala/concurrent/Task.scala
new file mode 100644
index 0000000000..98c7da77d2
--- /dev/null
+++ b/src/library/scala/concurrent/Task.scala
@@ -0,0 +1,13 @@
+package scala.concurrent
+
+
+
+trait Task[T] {
+
+ def start(): Unit
+
+ def future: Future[T]
+
+}
+
+
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
new file mode 100644
index 0000000000..51bb1ac3e0
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala
@@ -0,0 +1,93 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala
+
+
+
+
+/** This package object contains primitives for parallel programming.
+ */
+package object concurrent {
+
+ type ExecutionException = java.util.concurrent.ExecutionException
+ type CancellationException = java.util.concurrent.CancellationException
+ type TimeoutException = java.util.concurrent.TimeoutException
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
+ override protected def initialValue = null
+ }
+
+ /** The keyword used to block on a piece of code which potentially blocks.
+ *
+ * @define mayThrow
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ object block {
+
+ /** Blocks on a piece of code.
+ *
+ * @param body A piece of code which contains potentially blocking or long running calls.
+ *
+ * $mayThrow
+ */
+ def on[T](body: =>T): T = on(new Blockable[T] {
+ def block()(implicit cb: CanBlock) = body
+ })
+
+ /** Blocks on a blockable object.
+ *
+ * @param blockable An object with a `block` method which runs potentially blocking or long running calls.
+ *
+ * $mayThrow
+ */
+ def on[T](blockable: Blockable[T]): T = {
+ currentExecutionContext.get match {
+ case null => blockable.block()(null) // outside
+ case x => x.blockingCall(blockable) // inside an execution context thread
+ }
+ }
+ }
+
+ def future[T](body: =>T): Future[T] = null // TODO
+
+ val handledFutureException: PartialFunction[Throwable, Throwable] = {
+ case t: Throwable if isFutureThrowable => t
+ }
+
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptException => false
+ case _ => true
+ }
+
+}
+
+
+package concurrent {
+
+ private[concurrent] trait CanBlock
+
+ /** A timeout exception.
+ *
+ * Futures are failed with a timeout exception when their timeout expires.
+ *
+ * Each timeout exception contains an origin future which originally timed out.
+ */
+ class FutureTimeoutException(origin: Future[T], message: String) extends TimeoutException(message) {
+ def this(origin: Future[T]) = this(origin, "Future timed out.")
+ }
+
+}
diff --git a/src/library/scala/package.scala b/src/library/scala/package.scala
index 0c5d10b15e..915ce6a648 100644
--- a/src/library/scala/package.scala
+++ b/src/library/scala/package.scala
@@ -27,7 +27,8 @@ package object scala {
type NoSuchElementException = java.util.NoSuchElementException
type NumberFormatException = java.lang.NumberFormatException
type AbstractMethodError = java.lang.AbstractMethodError
-
+ type InterruptedException = java.lang.InterruptedException
+
@deprecated("instead of `@serializable class C`, use `class C extends Serializable`", "2.9.0")
type serializable = annotation.serializable
diff --git a/src/library/scala/util/Duration.scala b/src/library/scala/util/Duration.scala
new file mode 100644
index 0000000000..4c118f8b3b
--- /dev/null
+++ b/src/library/scala/util/Duration.scala
@@ -0,0 +1,485 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.util
+
+import java.util.concurrent.TimeUnit
+import TimeUnit._
+import java.lang.{ Long ⇒ JLong, Double ⇒ JDouble }
+//import akka.actor.ActorSystem (commented methods)
+
+class TimerException(message: String) extends RuntimeException(message)
+
+/**
+ * Simple timer class.
+ * Usage:
+ * <pre>
+ * import akka.util.duration._
+ * import akka.util.Timer
+ *
+ * val timer = Timer(30.seconds)
+ * while (timer.isTicking) { ... }
+ * </pre>
+ */
+case class Timer(duration: Duration, throwExceptionOnTimeout: Boolean = false) {
+ val startTimeInMillis = System.currentTimeMillis
+ val timeoutInMillis = duration.toMillis
+
+ /**
+ * Returns true while the timer is ticking. After that it either throws and exception or
+ * returns false. Depending on if the 'throwExceptionOnTimeout' argument is true or false.
+ */
+ def isTicking: Boolean = {
+ if (!(timeoutInMillis > (System.currentTimeMillis - startTimeInMillis))) {
+ if (throwExceptionOnTimeout) throw new TimerException("Time out after " + duration)
+ else false
+ } else true
+ }
+}
+
+object Duration {
+ def apply(length: Long, unit: TimeUnit): Duration = new FiniteDuration(length, unit)
+ def apply(length: Double, unit: TimeUnit): Duration = fromNanos(unit.toNanos(1) * length)
+ def apply(length: Long, unit: String): Duration = new FiniteDuration(length, timeUnit(unit))
+
+ def fromNanos(nanos: Long): Duration = {
+ if (nanos % 86400000000000L == 0) {
+ Duration(nanos / 86400000000000L, DAYS)
+ } else if (nanos % 3600000000000L == 0) {
+ Duration(nanos / 3600000000000L, HOURS)
+ } else if (nanos % 60000000000L == 0) {
+ Duration(nanos / 60000000000L, MINUTES)
+ } else if (nanos % 1000000000L == 0) {
+ Duration(nanos / 1000000000L, SECONDS)
+ } else if (nanos % 1000000L == 0) {
+ Duration(nanos / 1000000L, MILLISECONDS)
+ } else if (nanos % 1000L == 0) {
+ Duration(nanos / 1000L, MICROSECONDS)
+ } else {
+ Duration(nanos, NANOSECONDS)
+ }
+ }
+
+ def fromNanos(nanos: Double): Duration = fromNanos((nanos + 0.5).asInstanceOf[Long])
+
+ /**
+ * Construct a Duration by parsing a String. In case of a format error, a
+ * RuntimeException is thrown. See `unapply(String)` for more information.
+ */
+ def apply(s: String): Duration = unapply(s) getOrElse sys.error("format error")
+
+ /**
+ * Deconstruct a Duration into length and unit if it is finite.
+ */
+ def unapply(d: Duration): Option[(Long, TimeUnit)] = {
+ if (d.finite_?) {
+ Some((d.length, d.unit))
+ } else {
+ None
+ }
+ }
+
+ private val RE = ("""^\s*(\d+(?:\.\d+)?)\s*""" + // length part
+ "(?:" + // units are distinguished in separate match groups
+ "(d|day|days)|" +
+ "(h|hour|hours)|" +
+ "(min|minute|minutes)|" +
+ "(s|sec|second|seconds)|" +
+ "(ms|milli|millis|millisecond|milliseconds)|" +
+ "(µs|micro|micros|microsecond|microseconds)|" +
+ "(ns|nano|nanos|nanosecond|nanoseconds)" +
+ """)\s*$""").r // close the non-capturing group
+ private val REinf = """^\s*Inf\s*$""".r
+ private val REminf = """^\s*(?:-\s*|Minus)Inf\s*""".r
+
+ /**
+ * Parse String, return None if no match. Format is `"<length><unit>"`, where
+ * whitespace is allowed before, between and after the parts. Infinities are
+ * designated by `"Inf"` and `"-Inf"` or `"MinusInf"`.
+ */
+ def unapply(s: String): Option[Duration] = s match {
+ case RE(length, d, h, m, s, ms, mus, ns) ⇒
+ if (d ne null) Some(Duration(JDouble.parseDouble(length), DAYS)) else if (h ne null) Some(Duration(JDouble.parseDouble(length), HOURS)) else if (m ne null) Some(Duration(JDouble.parseDouble(length), MINUTES)) else if (s ne null) Some(Duration(JDouble.parseDouble(length), SECONDS)) else if (ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if (ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else
+ sys.error("made some error in regex (should not be possible)")
+ case REinf() ⇒ Some(Inf)
+ case REminf() ⇒ Some(MinusInf)
+ case _ ⇒ None
+ }
+
+ /**
+ * Parse TimeUnit from string representation.
+ */
+ def timeUnit(unit: String) = unit.toLowerCase match {
+ case "d" | "day" | "days" ⇒ DAYS
+ case "h" | "hour" | "hours" ⇒ HOURS
+ case "min" | "minute" | "minutes" ⇒ MINUTES
+ case "s" | "sec" | "second" | "seconds" ⇒ SECONDS
+ case "ms" | "milli" | "millis" | "millisecond" | "milliseconds" ⇒ MILLISECONDS
+ case "µs" | "micro" | "micros" | "microsecond" | "microseconds" ⇒ MICROSECONDS
+ case "ns" | "nano" | "nanos" | "nanosecond" | "nanoseconds" ⇒ NANOSECONDS
+ }
+
+ val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
+ val Undefined: Duration = new Duration with Infinite {
+ override def toString = "Duration.Undefined"
+ override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this
+ override def +(other: Duration): Duration = throw new IllegalArgumentException("cannot add Undefined duration")
+ override def -(other: Duration): Duration = throw new IllegalArgumentException("cannot subtract Undefined duration")
+ override def *(factor: Double): Duration = throw new IllegalArgumentException("cannot multiply Undefined duration")
+ override def /(factor: Double): Duration = throw new IllegalArgumentException("cannot divide Undefined duration")
+ override def /(other: Duration): Double = throw new IllegalArgumentException("cannot divide Undefined duration")
+ def >(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def >=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def unary_- : Duration = throw new IllegalArgumentException("cannot negate Undefined duration")
+ }
+
+ trait Infinite {
+ this: Duration ⇒
+
+ override def equals(other: Any) = false
+
+ def +(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ this
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal addition of infinities")
+ case _ ⇒ this
+ }
+ def -(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ throw new IllegalArgumentException("illegal subtraction of infinities")
+ case _ ⇒ this
+ }
+ def *(factor: Double): Duration = this
+ def /(factor: Double): Duration = this
+ def /(other: Duration): Double =
+ other match {
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal division of infinities")
+ // maybe questionable but pragmatic: Inf / 0 => Inf
+ case x ⇒ Double.PositiveInfinity * (if ((this > Zero) ^ (other >= Zero)) -1 else 1)
+ }
+
+ def finite_? = false
+
+ def length: Long = throw new IllegalArgumentException("length not allowed on infinite Durations")
+ def unit: TimeUnit = throw new IllegalArgumentException("unit not allowed on infinite Durations")
+ def toNanos: Long = throw new IllegalArgumentException("toNanos not allowed on infinite Durations")
+ def toMicros: Long = throw new IllegalArgumentException("toMicros not allowed on infinite Durations")
+ def toMillis: Long = throw new IllegalArgumentException("toMillis not allowed on infinite Durations")
+ def toSeconds: Long = throw new IllegalArgumentException("toSeconds not allowed on infinite Durations")
+ def toMinutes: Long = throw new IllegalArgumentException("toMinutes not allowed on infinite Durations")
+ def toHours: Long = throw new IllegalArgumentException("toHours not allowed on infinite Durations")
+ def toDays: Long = throw new IllegalArgumentException("toDays not allowed on infinite Durations")
+ def toUnit(unit: TimeUnit): Double = throw new IllegalArgumentException("toUnit not allowed on infinite Durations")
+
+ def printHMS = toString
+ }
+
+ /**
+ * Infinite duration: greater than any other and not equal to any other,
+ * including itself.
+ */
+ val Inf: Duration = new Duration with Infinite {
+ override def toString = "Duration.Inf"
+ def >(other: Duration) = true
+ def >=(other: Duration) = true
+ def <(other: Duration) = false
+ def <=(other: Duration) = false
+ def unary_- : Duration = MinusInf
+ }
+
+ /**
+ * Infinite negative duration: lesser than any other and not equal to any other,
+ * including itself.
+ */
+ val MinusInf: Duration = new Duration with Infinite {
+ override def toString = "Duration.MinusInf"
+ def >(other: Duration) = false
+ def >=(other: Duration) = false
+ def <(other: Duration) = true
+ def <=(other: Duration) = true
+ def unary_- : Duration = Inf
+ }
+
+ // Java Factories
+ def create(length: Long, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Double, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Long, unit: String): Duration = apply(length, unit)
+ def parse(s: String): Duration = unapply(s).get
+}
+
+/**
+ * Utility for working with java.util.concurrent.TimeUnit durations.
+ *
+ * <p/>
+ * Examples of usage from Java:
+ * <pre>
+ * import akka.util.FiniteDuration;
+ * import java.util.concurrent.TimeUnit;
+ *
+ * Duration duration = new FiniteDuration(100, MILLISECONDS);
+ * Duration duration = new FiniteDuration(5, "seconds");
+ *
+ * duration.toNanos();
+ * </pre>
+ *
+ * <p/>
+ * Examples of usage from Scala:
+ * <pre>
+ * import akka.util.Duration
+ * import java.util.concurrent.TimeUnit
+ *
+ * val duration = Duration(100, MILLISECONDS)
+ * val duration = Duration(100, "millis")
+ *
+ * duration.toNanos
+ * duration < 1.second
+ * duration <= Duration.Inf
+ * </pre>
+ *
+ * <p/>
+ * Implicits are also provided for Int, Long and Double. Example usage:
+ * <pre>
+ * import akka.util.duration._
+ *
+ * val duration = 100 millis
+ * </pre>
+ *
+ * Extractors, parsing and arithmetic are also included:
+ * <pre>
+ * val d = Duration("1.2 µs")
+ * val Duration(length, unit) = 5 millis
+ * val d2 = d * 2.5
+ * val d3 = d2 + 1.millisecond
+ * </pre>
+ */
+abstract class Duration extends Serializable {
+ def length: Long
+ def unit: TimeUnit
+ def toNanos: Long
+ def toMicros: Long
+ def toMillis: Long
+ def toSeconds: Long
+ def toMinutes: Long
+ def toHours: Long
+ def toDays: Long
+ def toUnit(unit: TimeUnit): Double
+ def printHMS: String
+ def <(other: Duration): Boolean
+ def <=(other: Duration): Boolean
+ def >(other: Duration): Boolean
+ def >=(other: Duration): Boolean
+ def +(other: Duration): Duration
+ def -(other: Duration): Duration
+ def *(factor: Double): Duration
+ def /(factor: Double): Duration
+ def /(other: Duration): Double
+ def unary_- : Duration
+ def finite_? : Boolean
+// def dilated(implicit system: ActorSystem): Duration = this * system.settings.TestTimeFactor
+ def min(other: Duration): Duration = if (this < other) this else other
+ def max(other: Duration): Duration = if (this > other) this else other
+ def sleep(): Unit = Thread.sleep(toMillis)
+
+ // Java API
+ def lt(other: Duration) = this < other
+ def lteq(other: Duration) = this <= other
+ def gt(other: Duration) = this > other
+ def gteq(other: Duration) = this >= other
+ def plus(other: Duration) = this + other
+ def minus(other: Duration) = this - other
+ def mul(factor: Double) = this * factor
+ def div(factor: Double) = this / factor
+ def div(other: Duration) = this / other
+ def neg() = -this
+ def isFinite() = finite_?
+}
+
+class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
+ import Duration._
+
+ def this(length: Long, unit: String) = this(length, Duration.timeUnit(unit))
+
+ def toNanos = unit.toNanos(length)
+ def toMicros = unit.toMicros(length)
+ def toMillis = unit.toMillis(length)
+ def toSeconds = unit.toSeconds(length)
+ def toMinutes = unit.toMinutes(length)
+ def toHours = unit.toHours(length)
+ def toDays = unit.toDays(length)
+ def toUnit(u: TimeUnit) = long2double(toNanos) / NANOSECONDS.convert(1, u)
+
+ override def toString = this match {
+ case Duration(1, DAYS) ⇒ "1 day"
+ case Duration(x, DAYS) ⇒ x + " days"
+ case Duration(1, HOURS) ⇒ "1 hour"
+ case Duration(x, HOURS) ⇒ x + " hours"
+ case Duration(1, MINUTES) ⇒ "1 minute"
+ case Duration(x, MINUTES) ⇒ x + " minutes"
+ case Duration(1, SECONDS) ⇒ "1 second"
+ case Duration(x, SECONDS) ⇒ x + " seconds"
+ case Duration(1, MILLISECONDS) ⇒ "1 millisecond"
+ case Duration(x, MILLISECONDS) ⇒ x + " milliseconds"
+ case Duration(1, MICROSECONDS) ⇒ "1 microsecond"
+ case Duration(x, MICROSECONDS) ⇒ x + " microseconds"
+ case Duration(1, NANOSECONDS) ⇒ "1 nanosecond"
+ case Duration(x, NANOSECONDS) ⇒ x + " nanoseconds"
+ }
+
+ def printHMS = "%02d:%02d:%06.3f".format(toHours, toMinutes % 60, toMillis / 1000.0 % 60)
+
+ def <(other: Duration) = {
+ if (other.finite_?) {
+ toNanos < other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other > this
+ }
+ }
+
+ def <=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos <= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other >= this
+ }
+ }
+
+ def >(other: Duration) = {
+ if (other.finite_?) {
+ toNanos > other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other < this
+ }
+ }
+
+ def >=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos >= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other <= this
+ }
+ }
+
+ def +(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos + other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def -(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos - other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def *(factor: Double) = fromNanos(long2double(toNanos) * factor)
+
+ def /(factor: Double) = fromNanos(long2double(toNanos) / factor)
+
+ def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0
+
+ def unary_- = Duration(-length, unit)
+
+ def finite_? = true
+
+ override def equals(other: Any) =
+ other.isInstanceOf[FiniteDuration] &&
+ toNanos == other.asInstanceOf[FiniteDuration].toNanos
+
+ override def hashCode = toNanos.asInstanceOf[Int]
+}
+
+class DurationInt(n: Int) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationLong(n: Long) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationDouble(d: Double) {
+ def nanoseconds = Duration(d, NANOSECONDS)
+ def nanos = Duration(d, NANOSECONDS)
+ def nanosecond = Duration(d, NANOSECONDS)
+ def nano = Duration(d, NANOSECONDS)
+
+ def microseconds = Duration(d, MICROSECONDS)
+ def micros = Duration(d, MICROSECONDS)
+ def microsecond = Duration(d, MICROSECONDS)
+ def micro = Duration(d, MICROSECONDS)
+
+ def milliseconds = Duration(d, MILLISECONDS)
+ def millis = Duration(d, MILLISECONDS)
+ def millisecond = Duration(d, MILLISECONDS)
+ def milli = Duration(d, MILLISECONDS)
+
+ def seconds = Duration(d, SECONDS)
+ def second = Duration(d, SECONDS)
+
+ def minutes = Duration(d, MINUTES)
+ def minute = Duration(d, MINUTES)
+
+ def hours = Duration(d, HOURS)
+ def hour = Duration(d, HOURS)
+
+ def days = Duration(d, DAYS)
+ def day = Duration(d, DAYS)
+}
diff --git a/src/library/scala/util/Timeout.scala b/src/library/scala/util/Timeout.scala
new file mode 100644
index 0000000000..0190675344
--- /dev/null
+++ b/src/library/scala/util/Timeout.scala
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+package scala.util
+
+import java.util.concurrent.TimeUnit
+
+case class Timeout(duration: Duration) {
+ def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+ def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
+}
+
+object Timeout {
+ /**
+ * A timeout with zero duration, will cause most requests to always timeout.
+ */
+ val zero = new Timeout(Duration.Zero)
+
+ /**
+ * A Timeout with infinite duration. Will never timeout. Use extreme caution with this
+ * as it may cause memory leaks, blocked threads, or may not even be supported by
+ * the receiver, which would result in an exception.
+ */
+ val never = new Timeout(Duration.Inf)
+
+ def apply(timeout: Long) = new Timeout(timeout)
+ def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
+
+ implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
+ implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
+ implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
+ //implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout (have to introduce this in ActorSystem)
+}