summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/AbstractPromise.java.disabled17
-rw-r--r--src/library/scala/concurrent/Blockable.scala23
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala117
-rw-r--r--src/library/scala/concurrent/Future.scala214
-rw-r--r--src/library/scala/concurrent/Promise.scala30
-rw-r--r--src/library/scala/concurrent/package.scala49
-rw-r--r--src/library/scala/util/Duration.scala485
-rw-r--r--src/library/scala/util/Timeout.scala33
8 files changed, 968 insertions, 0 deletions
diff --git a/src/library/scala/concurrent/AbstractPromise.java.disabled b/src/library/scala/concurrent/AbstractPromise.java.disabled
new file mode 100644
index 0000000000..726e6a3156
--- /dev/null
+++ b/src/library/scala/concurrent/AbstractPromise.java.disabled
@@ -0,0 +1,17 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.concurrent;
+/*
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import scala.concurrent.forkjoin.*;
+
+abstract class AbstractPromise {
+ //private volatile Object _ref = DefaultPromise.EmptyPending();
+ protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+
+ protected void compute() { }
+}
+*/
diff --git a/src/library/scala/concurrent/Blockable.scala b/src/library/scala/concurrent/Blockable.scala
new file mode 100644
index 0000000000..1ad02c7469
--- /dev/null
+++ b/src/library/scala/concurrent/Blockable.scala
@@ -0,0 +1,23 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import scala.annotation.implicitNotFound
+
+
+
+trait Blockable[+T] {
+ @implicitNotFound(msg = "Blocking must be done by calling `block on b`, where `b` is the Blockable object.")
+ def block()(implicit canblock: CanBlock): T
+}
+
+
+
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
new file mode 100644
index 0000000000..972a76a95a
--- /dev/null
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -0,0 +1,117 @@
+package scala.concurrent
+
+import java.util.concurrent.{ Executors, Future => JFuture }
+import scala.util.{ Duration, Timeout }
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+
+trait ExecutionContext {
+
+ protected implicit object CanBlockEvidence extends CanBlock
+
+ def execute(task: Runnable): Unit
+
+ def makeTask[T](task: () => T)(implicit timeout: Timeout): Task[T]
+
+ def makePromise[T](timeout: Timeout): Promise[T]
+
+ def blockingCall[T](body: Blockable[T]): T
+
+}
+
+trait Task[T] {
+
+ def start(): Unit
+ def future: Future[T]
+
+}
+
+/* DONE: The challenge is to make ForkJoinPromise inherit from RecursiveAction
+ * to avoid an object allocation per promise. This requires turning DefaultPromise
+ * into a trait, i.e., removing its constructor parameters.
+ */
+private[concurrent] class ForkJoinTaskImpl[T](context: ForkJoinExecutionContext, body: () => T, within: Timeout) extends FJTask[T] with Task[T] {
+
+ val timeout = within
+ implicit val dispatcher = context
+
+ // body of RecursiveTask
+ def compute(): T =
+ body()
+
+ def start(): Unit =
+ fork()
+
+ def future: Future[T] = {
+ null
+ }
+
+ // TODO FIXME: handle timeouts
+ def await(atMost: Duration): this.type =
+ await
+
+ def await: this.type = {
+ this.join()
+ this
+ }
+
+ def tryCancel(): Unit =
+ tryUnfork()
+}
+
+private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
+ val pool = new ForkJoinPool
+
+ @inline
+ private def executeForkJoinTask(task: RecursiveAction) {
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
+ task.fork()
+ else
+ pool execute task
+ }
+
+ def execute(task: Runnable) {
+ val action = new RecursiveAction { def compute() { task.run() } }
+ executeForkJoinTask(action)
+ }
+
+ def makeTask[T](body: () => T)(implicit timeout: Timeout): Task[T] = {
+ new ForkJoinTaskImpl(this, body, timeout)
+ }
+
+ def makePromise[T](timeout: Timeout): Promise[T] =
+ null
+
+ def blockingCall[T](body: Blockable[T]): T =
+ body.block()(CanBlockEvidence)
+
+}
+
+/**
+ * Implements a blocking execution context
+ */
+/*
+private[concurrent] class BlockingExecutionContext extends ExecutionContext {
+ //val pool = makeCachedThreadPool // TODO FIXME: need to merge thread pool factory methods from Heather's parcolls repo
+
+ def execute(task: Runnable) {
+ /* TODO
+ val p = newPromise(task.run())
+ p.start()
+ pool execute p
+ */
+ }
+
+ // TODO FIXME: implement
+ def newPromise[T](body: => T): Promise[T] = {
+ throw new Exception("not yet implemented")
+ }
+}
+*/
+
+object ExecutionContext {
+
+ lazy val forNonBlocking = new ForkJoinExecutionContext
+
+ //lazy val forBlocking = new BlockingExecutionContext
+
+}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
new file mode 100644
index 0000000000..222c5d18b6
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala
@@ -0,0 +1,214 @@
+
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.concurrent
+
+//import akka.AkkaException (replaced with Exception)
+//import akka.event.Logging.Error (removed all logging)
+import scala.util.{ Timeout, Duration }
+import scala.Option
+//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods)
+
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable ⇒ JIterable }
+import java.util.{ LinkedList ⇒ JLinkedList }
+
+import scala.annotation.tailrec
+import scala.collection.mutable.Stack
+//import akka.util.Switch (commented method)
+import java.{ lang ⇒ jl }
+import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
+
+
+
+/** The trait that represents futures.
+ *
+ * @define futureTimeout
+ * The timeout of the future is:
+ * - if this future was obtained from a task (i.e. by calling `task.future`), the timeout associated with that task
+ * - if this future was obtained from a promise (i.e. by calling `promise.future`), the timeout associated with that promise
+ * - if this future was obtained from a combinator on some other future `g` (e.g. by calling `g.map(_)`), the timeout of `g`
+ * - if this future was obtained from a combinator on multiple futures `g0`, ..., `g1`, the minimum of the timeouts of these futures
+ *
+ * @define multipleCallbacks
+ * Multiple callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ *
+ * @define caughtThrowables
+ * The future may contain a throwable object and this means that the future failed.
+ * Futures obtained through combinators have the same exception as the future they were obtained from.
+ * The following throwable objects are treated differently:
+ * - Error - errors are not contained within futures
+ * - NonLocalControlException - not contained within futures
+ * - InterruptedException - not contained within futures
+ *
+ * @define forComprehensionExamples
+ * Example:
+ *
+ * {{{
+ * val f = future { 5 }
+ * val g = future { 3 }
+ * val h = for {
+ * x: Int <- f // returns Future(5)
+ * y: Int <- g // returns Future(5)
+ * } yield x + y
+ * }}}
+ *
+ * is translated to:
+ *
+ * {{{
+ * f flatMap { (x: Int) => g map { (y: Int) => x + y } }
+ * }}}
+ */
+trait Future[+T] extends Blockable[T] {
+
+ /* Callbacks */
+
+ /** When this future is completed successfully (i.e. with a value),
+ * apply the provided function to the value.
+ *
+ * If the future has already been completed with a value,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case of a timeout.
+ *
+ * Will not be called in case of an exception.
+ *
+ * $multipleCallbacks
+ */
+ def onSuccess[U](func: T => U): this.type
+
+ /** When this future is completed with a failure (i.e. with a throwable),
+ * apply the provided function to the throwable.
+ *
+ * $caughtThrowables
+ *
+ * If the future has already been completed with a failure,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case of a timeout.
+ *
+ * Will not be called in case of an exception.
+ *
+ * $multipleCallbacks
+ */
+ def onFailure[U](func: Throwable => U): this.type
+
+ /** When this future times out, apply the provided function.
+ *
+ * If the future has already timed out,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onTimeout[U](func: => U): this.type
+
+ /** When this future is completed, either through an exception, a timeout, or a value,
+ * apply the provided function.
+ *
+ * If the future has already been completed,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onComplete[U](func: Either[Throwable, T] => U): this.type
+
+
+ /* Various info */
+
+ /** Tests whether this Future's timeout has expired.
+ *
+ * $futureTimeout
+ *
+ * Note that an expired Future may still contain a value, or it may be
+ * completed with a value.
+ */
+ def isTimedout: Boolean
+
+ /** This future's timeout.
+ *
+ * $futureTimeout
+ */
+ def timeout: Timeout
+
+ /** This future's timeout in nanoseconds.
+ *
+ * $futureTimeout
+ */
+ def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
+
+
+ /* Projections */
+
+ def failed: Future[Exception]
+
+ def timedout: Future[Timeout]
+
+
+ /* Monadic operations */
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain. If there is no match, or if this future contains
+ * a valid result then the new future will contain the same.
+ *
+ * Example:
+ *
+ * {{{
+ * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
+ * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
+ * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * }}}
+ */
+ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit timeout: Timeout): Future[U]
+
+ /** Asynchronously processes the value in the future once the value becomes available.
+ *
+ * Will not be called if the future times out or fails.
+ *
+ * This method typically registers an `onResult` callback.
+ */
+ def foreach[U](f: T => U): Unit
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future. If this future is completed with an exception then the new
+ * future will also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def map[S](f: T => S)(implicit timeout: Timeout): Future[S]
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future, and returns the result of the function as the new future.
+ * If this future is completed with an exception then the new future will
+ * also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def flatMap[S](f: T => Future[S])(implicit timeout: Timeout): Future[S]
+
+ /** Creates a new future by filtering the value of the current future with a predicate.
+ *
+ * If the current future contains a value which satisfies the predicate, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { 5 }
+ * val g = g filter { _ % 2 == 1 }
+ * val h = f filter { _ % 2 == 0 }
+ * block on g // evaluates to 5
+ * block on h // throw a NoSuchElementException
+ * }}}
+ */
+ def filter(p: T => Boolean)(implicit timeout: Timeout): Future[T]
+
+}
+
+class FutureTimeoutException(message: String, cause: Throwable = null) extends Exception(message, cause) {
+ def this(message: String) = this(message, null)
+}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
new file mode 100644
index 0000000000..109f6b4eba
--- /dev/null
+++ b/src/library/scala/concurrent/Promise.scala
@@ -0,0 +1,30 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+trait Promise[T] {
+ def future: Future[T]
+
+ def fulfill(value: T): Unit
+ def fail(t: Throwable): Unit
+}
+
+object Promise {
+ /*
+ /**
+ * Creates a non-completed, new, Promise with the supplied timeout in milliseconds
+ */
+ def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = DefaultPromise[A](timeout)
+
+ /**
+ * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
+ */
+ def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout)
+ */
+}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
new file mode 100644
index 0000000000..2a1ff9b799
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala
@@ -0,0 +1,49 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala
+
+
+
+
+/** This package object contains primitives for parallel programming.
+ */
+package object concurrent {
+
+ type MessageDispatcher = ExecutionContext // TODO FIXME: change futures to use execution context
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
+ override protected def initialValue = null
+ }
+
+ object block {
+ def on[T](body: =>T): T = on(new Blockable[T] {
+ def block()(implicit cb: CanBlock) = body
+ })
+
+ def on[T](blockable: Blockable[T]): T = {
+ currentExecutionContext.get match {
+ case null => blockable.block()(null) // outside
+ case x => x.blockingCall(blockable) // inside an execution context thread
+ }
+ }
+ }
+
+
+ def future[T](body: =>T): Future[T] = null // TODO
+
+}
+
+
+package concurrent {
+
+ private[concurrent] trait CanBlock
+
+}
diff --git a/src/library/scala/util/Duration.scala b/src/library/scala/util/Duration.scala
new file mode 100644
index 0000000000..4c118f8b3b
--- /dev/null
+++ b/src/library/scala/util/Duration.scala
@@ -0,0 +1,485 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.util
+
+import java.util.concurrent.TimeUnit
+import TimeUnit._
+import java.lang.{ Long ⇒ JLong, Double ⇒ JDouble }
+//import akka.actor.ActorSystem (commented methods)
+
+class TimerException(message: String) extends RuntimeException(message)
+
+/**
+ * Simple timer class.
+ * Usage:
+ * <pre>
+ * import akka.util.duration._
+ * import akka.util.Timer
+ *
+ * val timer = Timer(30.seconds)
+ * while (timer.isTicking) { ... }
+ * </pre>
+ */
+case class Timer(duration: Duration, throwExceptionOnTimeout: Boolean = false) {
+ val startTimeInMillis = System.currentTimeMillis
+ val timeoutInMillis = duration.toMillis
+
+ /**
+ * Returns true while the timer is ticking. After that it either throws and exception or
+ * returns false. Depending on if the 'throwExceptionOnTimeout' argument is true or false.
+ */
+ def isTicking: Boolean = {
+ if (!(timeoutInMillis > (System.currentTimeMillis - startTimeInMillis))) {
+ if (throwExceptionOnTimeout) throw new TimerException("Time out after " + duration)
+ else false
+ } else true
+ }
+}
+
+object Duration {
+ def apply(length: Long, unit: TimeUnit): Duration = new FiniteDuration(length, unit)
+ def apply(length: Double, unit: TimeUnit): Duration = fromNanos(unit.toNanos(1) * length)
+ def apply(length: Long, unit: String): Duration = new FiniteDuration(length, timeUnit(unit))
+
+ def fromNanos(nanos: Long): Duration = {
+ if (nanos % 86400000000000L == 0) {
+ Duration(nanos / 86400000000000L, DAYS)
+ } else if (nanos % 3600000000000L == 0) {
+ Duration(nanos / 3600000000000L, HOURS)
+ } else if (nanos % 60000000000L == 0) {
+ Duration(nanos / 60000000000L, MINUTES)
+ } else if (nanos % 1000000000L == 0) {
+ Duration(nanos / 1000000000L, SECONDS)
+ } else if (nanos % 1000000L == 0) {
+ Duration(nanos / 1000000L, MILLISECONDS)
+ } else if (nanos % 1000L == 0) {
+ Duration(nanos / 1000L, MICROSECONDS)
+ } else {
+ Duration(nanos, NANOSECONDS)
+ }
+ }
+
+ def fromNanos(nanos: Double): Duration = fromNanos((nanos + 0.5).asInstanceOf[Long])
+
+ /**
+ * Construct a Duration by parsing a String. In case of a format error, a
+ * RuntimeException is thrown. See `unapply(String)` for more information.
+ */
+ def apply(s: String): Duration = unapply(s) getOrElse sys.error("format error")
+
+ /**
+ * Deconstruct a Duration into length and unit if it is finite.
+ */
+ def unapply(d: Duration): Option[(Long, TimeUnit)] = {
+ if (d.finite_?) {
+ Some((d.length, d.unit))
+ } else {
+ None
+ }
+ }
+
+ private val RE = ("""^\s*(\d+(?:\.\d+)?)\s*""" + // length part
+ "(?:" + // units are distinguished in separate match groups
+ "(d|day|days)|" +
+ "(h|hour|hours)|" +
+ "(min|minute|minutes)|" +
+ "(s|sec|second|seconds)|" +
+ "(ms|milli|millis|millisecond|milliseconds)|" +
+ "(µs|micro|micros|microsecond|microseconds)|" +
+ "(ns|nano|nanos|nanosecond|nanoseconds)" +
+ """)\s*$""").r // close the non-capturing group
+ private val REinf = """^\s*Inf\s*$""".r
+ private val REminf = """^\s*(?:-\s*|Minus)Inf\s*""".r
+
+ /**
+ * Parse String, return None if no match. Format is `"<length><unit>"`, where
+ * whitespace is allowed before, between and after the parts. Infinities are
+ * designated by `"Inf"` and `"-Inf"` or `"MinusInf"`.
+ */
+ def unapply(s: String): Option[Duration] = s match {
+ case RE(length, d, h, m, s, ms, mus, ns) ⇒
+ if (d ne null) Some(Duration(JDouble.parseDouble(length), DAYS)) else if (h ne null) Some(Duration(JDouble.parseDouble(length), HOURS)) else if (m ne null) Some(Duration(JDouble.parseDouble(length), MINUTES)) else if (s ne null) Some(Duration(JDouble.parseDouble(length), SECONDS)) else if (ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if (ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else
+ sys.error("made some error in regex (should not be possible)")
+ case REinf() ⇒ Some(Inf)
+ case REminf() ⇒ Some(MinusInf)
+ case _ ⇒ None
+ }
+
+ /**
+ * Parse TimeUnit from string representation.
+ */
+ def timeUnit(unit: String) = unit.toLowerCase match {
+ case "d" | "day" | "days" ⇒ DAYS
+ case "h" | "hour" | "hours" ⇒ HOURS
+ case "min" | "minute" | "minutes" ⇒ MINUTES
+ case "s" | "sec" | "second" | "seconds" ⇒ SECONDS
+ case "ms" | "milli" | "millis" | "millisecond" | "milliseconds" ⇒ MILLISECONDS
+ case "µs" | "micro" | "micros" | "microsecond" | "microseconds" ⇒ MICROSECONDS
+ case "ns" | "nano" | "nanos" | "nanosecond" | "nanoseconds" ⇒ NANOSECONDS
+ }
+
+ val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
+ val Undefined: Duration = new Duration with Infinite {
+ override def toString = "Duration.Undefined"
+ override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this
+ override def +(other: Duration): Duration = throw new IllegalArgumentException("cannot add Undefined duration")
+ override def -(other: Duration): Duration = throw new IllegalArgumentException("cannot subtract Undefined duration")
+ override def *(factor: Double): Duration = throw new IllegalArgumentException("cannot multiply Undefined duration")
+ override def /(factor: Double): Duration = throw new IllegalArgumentException("cannot divide Undefined duration")
+ override def /(other: Duration): Double = throw new IllegalArgumentException("cannot divide Undefined duration")
+ def >(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def >=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def unary_- : Duration = throw new IllegalArgumentException("cannot negate Undefined duration")
+ }
+
+ trait Infinite {
+ this: Duration ⇒
+
+ override def equals(other: Any) = false
+
+ def +(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ this
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal addition of infinities")
+ case _ ⇒ this
+ }
+ def -(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ throw new IllegalArgumentException("illegal subtraction of infinities")
+ case _ ⇒ this
+ }
+ def *(factor: Double): Duration = this
+ def /(factor: Double): Duration = this
+ def /(other: Duration): Double =
+ other match {
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal division of infinities")
+ // maybe questionable but pragmatic: Inf / 0 => Inf
+ case x ⇒ Double.PositiveInfinity * (if ((this > Zero) ^ (other >= Zero)) -1 else 1)
+ }
+
+ def finite_? = false
+
+ def length: Long = throw new IllegalArgumentException("length not allowed on infinite Durations")
+ def unit: TimeUnit = throw new IllegalArgumentException("unit not allowed on infinite Durations")
+ def toNanos: Long = throw new IllegalArgumentException("toNanos not allowed on infinite Durations")
+ def toMicros: Long = throw new IllegalArgumentException("toMicros not allowed on infinite Durations")
+ def toMillis: Long = throw new IllegalArgumentException("toMillis not allowed on infinite Durations")
+ def toSeconds: Long = throw new IllegalArgumentException("toSeconds not allowed on infinite Durations")
+ def toMinutes: Long = throw new IllegalArgumentException("toMinutes not allowed on infinite Durations")
+ def toHours: Long = throw new IllegalArgumentException("toHours not allowed on infinite Durations")
+ def toDays: Long = throw new IllegalArgumentException("toDays not allowed on infinite Durations")
+ def toUnit(unit: TimeUnit): Double = throw new IllegalArgumentException("toUnit not allowed on infinite Durations")
+
+ def printHMS = toString
+ }
+
+ /**
+ * Infinite duration: greater than any other and not equal to any other,
+ * including itself.
+ */
+ val Inf: Duration = new Duration with Infinite {
+ override def toString = "Duration.Inf"
+ def >(other: Duration) = true
+ def >=(other: Duration) = true
+ def <(other: Duration) = false
+ def <=(other: Duration) = false
+ def unary_- : Duration = MinusInf
+ }
+
+ /**
+ * Infinite negative duration: lesser than any other and not equal to any other,
+ * including itself.
+ */
+ val MinusInf: Duration = new Duration with Infinite {
+ override def toString = "Duration.MinusInf"
+ def >(other: Duration) = false
+ def >=(other: Duration) = false
+ def <(other: Duration) = true
+ def <=(other: Duration) = true
+ def unary_- : Duration = Inf
+ }
+
+ // Java Factories
+ def create(length: Long, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Double, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Long, unit: String): Duration = apply(length, unit)
+ def parse(s: String): Duration = unapply(s).get
+}
+
+/**
+ * Utility for working with java.util.concurrent.TimeUnit durations.
+ *
+ * <p/>
+ * Examples of usage from Java:
+ * <pre>
+ * import akka.util.FiniteDuration;
+ * import java.util.concurrent.TimeUnit;
+ *
+ * Duration duration = new FiniteDuration(100, MILLISECONDS);
+ * Duration duration = new FiniteDuration(5, "seconds");
+ *
+ * duration.toNanos();
+ * </pre>
+ *
+ * <p/>
+ * Examples of usage from Scala:
+ * <pre>
+ * import akka.util.Duration
+ * import java.util.concurrent.TimeUnit
+ *
+ * val duration = Duration(100, MILLISECONDS)
+ * val duration = Duration(100, "millis")
+ *
+ * duration.toNanos
+ * duration < 1.second
+ * duration <= Duration.Inf
+ * </pre>
+ *
+ * <p/>
+ * Implicits are also provided for Int, Long and Double. Example usage:
+ * <pre>
+ * import akka.util.duration._
+ *
+ * val duration = 100 millis
+ * </pre>
+ *
+ * Extractors, parsing and arithmetic are also included:
+ * <pre>
+ * val d = Duration("1.2 µs")
+ * val Duration(length, unit) = 5 millis
+ * val d2 = d * 2.5
+ * val d3 = d2 + 1.millisecond
+ * </pre>
+ */
+abstract class Duration extends Serializable {
+ def length: Long
+ def unit: TimeUnit
+ def toNanos: Long
+ def toMicros: Long
+ def toMillis: Long
+ def toSeconds: Long
+ def toMinutes: Long
+ def toHours: Long
+ def toDays: Long
+ def toUnit(unit: TimeUnit): Double
+ def printHMS: String
+ def <(other: Duration): Boolean
+ def <=(other: Duration): Boolean
+ def >(other: Duration): Boolean
+ def >=(other: Duration): Boolean
+ def +(other: Duration): Duration
+ def -(other: Duration): Duration
+ def *(factor: Double): Duration
+ def /(factor: Double): Duration
+ def /(other: Duration): Double
+ def unary_- : Duration
+ def finite_? : Boolean
+// def dilated(implicit system: ActorSystem): Duration = this * system.settings.TestTimeFactor
+ def min(other: Duration): Duration = if (this < other) this else other
+ def max(other: Duration): Duration = if (this > other) this else other
+ def sleep(): Unit = Thread.sleep(toMillis)
+
+ // Java API
+ def lt(other: Duration) = this < other
+ def lteq(other: Duration) = this <= other
+ def gt(other: Duration) = this > other
+ def gteq(other: Duration) = this >= other
+ def plus(other: Duration) = this + other
+ def minus(other: Duration) = this - other
+ def mul(factor: Double) = this * factor
+ def div(factor: Double) = this / factor
+ def div(other: Duration) = this / other
+ def neg() = -this
+ def isFinite() = finite_?
+}
+
+class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
+ import Duration._
+
+ def this(length: Long, unit: String) = this(length, Duration.timeUnit(unit))
+
+ def toNanos = unit.toNanos(length)
+ def toMicros = unit.toMicros(length)
+ def toMillis = unit.toMillis(length)
+ def toSeconds = unit.toSeconds(length)
+ def toMinutes = unit.toMinutes(length)
+ def toHours = unit.toHours(length)
+ def toDays = unit.toDays(length)
+ def toUnit(u: TimeUnit) = long2double(toNanos) / NANOSECONDS.convert(1, u)
+
+ override def toString = this match {
+ case Duration(1, DAYS) ⇒ "1 day"
+ case Duration(x, DAYS) ⇒ x + " days"
+ case Duration(1, HOURS) ⇒ "1 hour"
+ case Duration(x, HOURS) ⇒ x + " hours"
+ case Duration(1, MINUTES) ⇒ "1 minute"
+ case Duration(x, MINUTES) ⇒ x + " minutes"
+ case Duration(1, SECONDS) ⇒ "1 second"
+ case Duration(x, SECONDS) ⇒ x + " seconds"
+ case Duration(1, MILLISECONDS) ⇒ "1 millisecond"
+ case Duration(x, MILLISECONDS) ⇒ x + " milliseconds"
+ case Duration(1, MICROSECONDS) ⇒ "1 microsecond"
+ case Duration(x, MICROSECONDS) ⇒ x + " microseconds"
+ case Duration(1, NANOSECONDS) ⇒ "1 nanosecond"
+ case Duration(x, NANOSECONDS) ⇒ x + " nanoseconds"
+ }
+
+ def printHMS = "%02d:%02d:%06.3f".format(toHours, toMinutes % 60, toMillis / 1000.0 % 60)
+
+ def <(other: Duration) = {
+ if (other.finite_?) {
+ toNanos < other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other > this
+ }
+ }
+
+ def <=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos <= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other >= this
+ }
+ }
+
+ def >(other: Duration) = {
+ if (other.finite_?) {
+ toNanos > other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other < this
+ }
+ }
+
+ def >=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos >= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other <= this
+ }
+ }
+
+ def +(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos + other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def -(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos - other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def *(factor: Double) = fromNanos(long2double(toNanos) * factor)
+
+ def /(factor: Double) = fromNanos(long2double(toNanos) / factor)
+
+ def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0
+
+ def unary_- = Duration(-length, unit)
+
+ def finite_? = true
+
+ override def equals(other: Any) =
+ other.isInstanceOf[FiniteDuration] &&
+ toNanos == other.asInstanceOf[FiniteDuration].toNanos
+
+ override def hashCode = toNanos.asInstanceOf[Int]
+}
+
+class DurationInt(n: Int) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationLong(n: Long) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationDouble(d: Double) {
+ def nanoseconds = Duration(d, NANOSECONDS)
+ def nanos = Duration(d, NANOSECONDS)
+ def nanosecond = Duration(d, NANOSECONDS)
+ def nano = Duration(d, NANOSECONDS)
+
+ def microseconds = Duration(d, MICROSECONDS)
+ def micros = Duration(d, MICROSECONDS)
+ def microsecond = Duration(d, MICROSECONDS)
+ def micro = Duration(d, MICROSECONDS)
+
+ def milliseconds = Duration(d, MILLISECONDS)
+ def millis = Duration(d, MILLISECONDS)
+ def millisecond = Duration(d, MILLISECONDS)
+ def milli = Duration(d, MILLISECONDS)
+
+ def seconds = Duration(d, SECONDS)
+ def second = Duration(d, SECONDS)
+
+ def minutes = Duration(d, MINUTES)
+ def minute = Duration(d, MINUTES)
+
+ def hours = Duration(d, HOURS)
+ def hour = Duration(d, HOURS)
+
+ def days = Duration(d, DAYS)
+ def day = Duration(d, DAYS)
+}
diff --git a/src/library/scala/util/Timeout.scala b/src/library/scala/util/Timeout.scala
new file mode 100644
index 0000000000..0190675344
--- /dev/null
+++ b/src/library/scala/util/Timeout.scala
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+package scala.util
+
+import java.util.concurrent.TimeUnit
+
+case class Timeout(duration: Duration) {
+ def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+ def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
+}
+
+object Timeout {
+ /**
+ * A timeout with zero duration, will cause most requests to always timeout.
+ */
+ val zero = new Timeout(Duration.Zero)
+
+ /**
+ * A Timeout with infinite duration. Will never timeout. Use extreme caution with this
+ * as it may cause memory leaks, blocked threads, or may not even be supported by
+ * the receiver, which would result in an exception.
+ */
+ val never = new Timeout(Duration.Inf)
+
+ def apply(timeout: Long) = new Timeout(timeout)
+ def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
+
+ implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
+ implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
+ implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
+ //implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout (have to introduce this in ActorSystem)
+}