summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdriaan Moors <adriaan.moors@typesafe.com>2012-12-13 15:02:51 -0800
committerAdriaan Moors <adriaan.moors@typesafe.com>2012-12-13 15:02:51 -0800
commiteda88c84daae182266565d21b31ad3c93d9b3be5 (patch)
treef64dbc7b0a2f7986a7bbd4b9ff2b3bff56599e95
parent8e3d23a950ced345eea108926cf35838e6c4befc (diff)
parentf0bc3f765416b2ff89ebb5af5677ba08010306d3 (diff)
downloadscala-eda88c84daae182266565d21b31ad3c93d9b3be5.tar.gz
scala-eda88c84daae182266565d21b31ad3c93d9b3be5.tar.bz2
scala-eda88c84daae182266565d21b31ad3c93d9b3be5.zip
Merge pull request #1764 from adriaanm/backport/sip14
SIP-14 backport to 2.9.x
-rw-r--r--build.xml33
-rw-r--r--src/compiler/scala/tools/nsc/ast/TreeGen.scala2
-rw-r--r--src/compiler/scala/tools/nsc/backend/icode/Members.scala1
-rw-r--r--src/compiler/scala/tools/nsc/backend/opt/Inliners.scala4
-rw-r--r--src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala3
-rw-r--r--src/library/scala/concurrent/Awaitable.scala64
-rw-r--r--src/library/scala/concurrent/BlockContext.scala77
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala4
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala88
-rw-r--r--src/library/scala/concurrent/Future.scala691
-rw-r--r--src/library/scala/concurrent/Promise.scala152
-rw-r--r--src/library/scala/concurrent/duration/Deadline.scala81
-rw-r--r--src/library/scala/concurrent/duration/Duration.scala697
-rw-r--r--src/library/scala/concurrent/duration/DurationConversions.scala92
-rw-r--r--src/library/scala/concurrent/duration/package.scala79
-rw-r--r--src/library/scala/concurrent/impl/AbstractPromise.java40
-rw-r--r--src/library/scala/concurrent/impl/AdaptedRunnableAction.java21
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala135
-rw-r--r--src/library/scala/concurrent/impl/Future.scala34
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala174
-rw-r--r--src/library/scala/concurrent/package.scala109
-rw-r--r--src/library/scala/concurrent/util/Unsafe.java35
-rw-r--r--src/library/scala/util/Try.scala216
-rw-r--r--src/library/scala/util/control/NonFatal.scala45
-rw-r--r--test/files/jvm/future-spec.flags1
-rw-r--r--test/files/jvm/future-spec/FutureTests.scala524
-rw-r--r--test/files/jvm/future-spec/PromiseTests.scala246
-rw-r--r--test/files/jvm/future-spec/TryTests.scala130
-rw-r--r--test/files/jvm/future-spec/main.scala110
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala1036
-rw-r--r--test/files/pos/t5958.flags1
-rw-r--r--test/files/pos/t5958.scala15
32 files changed, 4925 insertions, 15 deletions
diff --git a/build.xml b/build.xml
index d9be96e436..0f62615f89 100644
--- a/build.xml
+++ b/build.xml
@@ -286,9 +286,9 @@ INITIALISATION
<property name="scalac.args.optimise" value=""/>
<!-- scalac.args.quickonly are added to quick.* targets but not others (particularly, locker.)
This is to facilitate testing new command line options which do not yet exist in starr. -->
- <property name="scalac.args.quickonly" value=""/>
+ <property name="scalac.args.quickonly" value="-Ydependent-method-types"/>
<property name="scalac.args.all" value="${scalac.args} ${scalac.args.optimise}"/>
- <property name="scalac.args.quick" value="${scalac.args.all} ${scalac.args.quickonly}"/>
+ <property name="scalac.args.quick" value="${scalac.args.all} ${scalac.args.quickonly}"/>
<!-- Setting-up Ant contrib tasks -->
<taskdef resource="net/sf/antcontrib/antlib.xml" classpath="${lib.dir}/ant/ant-contrib.jar"/>
<!-- This is the start time for the distribution -->
@@ -376,8 +376,9 @@ LOCAL REFERENCE BUILD (LOCKER)
<javac
srcdir="${src.dir}/library"
destdir="${build-locker.dir}/classes/library"
- classpath="${build-locker.dir}/classes/library"
+ classpathref="quick.compilation.path"
includes="**/*.java"
+ excludes="scala/concurrent/**/*.java"
target="1.5" source="1.5">
<compilerarg line="${javac.args}"/>
</javac>
@@ -392,6 +393,17 @@ LOCAL REFERENCE BUILD (LOCKER)
srcdir="${src.dir}/library"
jvmargs="${scalacfork.jvmargs}">
<include name="**/*.scala"/>
+ <!-- Exclude duration package and everything that depends on it -->
+ <exclude name="scala/concurrent/duration/**/*.scala"/>
+ <exclude name="scala/concurrent/package.scala"/>
+ <exclude name="scala/concurrent/Awaitable.scala"/>
+ <exclude name="scala/concurrent/Future.scala"/>
+ <exclude name="scala/concurrent/Promise.scala"/>
+ <exclude name="scala/concurrent/ExecutionContext.scala"/>
+ <exclude name="scala/concurrent/BlockContext.scala"/>
+ <exclude name="scala/concurrent/impl/Future.scala"/>
+ <exclude name="scala/concurrent/impl/Promise.scala"/>
+ <exclude name="scala/concurrent/impl/ExecutionContextImpl.scala"/>
<compilationpath>
<pathelement location="${build-locker.dir}/classes/library"/>
<pathelement location="${lib.dir}/forkjoin.jar"/>
@@ -546,7 +558,7 @@ QUICK BUILD (QUICK)
<javac
srcdir="${src.dir}/library"
destdir="${build-quick.dir}/classes/library"
- classpath="${build-quick.dir}/classes/library"
+ classpathref="quick.compilation.path"
includes="**/*.java"
target="1.5" source="1.5">
<compilerarg line="${javac.args}"/>
@@ -1083,7 +1095,7 @@ BOOTSTRAPPING BUILD (STRAP)
<javac
srcdir="${src.dir}/library"
destdir="${build-strap.dir}/classes/library"
- classpath="${build-strap.dir}/classes/library"
+ classpathref="strap.compilation.path"
includes="**/*.java"
target="1.5" source="1.5">
<compilerarg line="${javac.args}"/>
@@ -1100,7 +1112,7 @@ BOOTSTRAPPING BUILD (STRAP)
destdir="${build-strap.dir}/classes/library"
compilerpathref="pack.classpath"
srcpath="${src.dir}/library"
- params="${scalac.args.all}"
+ params="${scalac.args.quick}"
srcdir="${src.dir}/library"
jvmargs="${scalacfork.jvmargs}">
<include name="**/*.scala"/>
@@ -1109,7 +1121,7 @@ BOOTSTRAPPING BUILD (STRAP)
<scalacfork
destdir="${build-strap.dir}/classes/library"
compilerpathref="pack.classpath"
- params="${scalac.args.all}"
+ params="${scalac.args.quick}"
srcdir="${src.dir}/actors"
jvmargs="${scalacfork.jvmargs}">
<include name="**/*.scala"/>
@@ -1118,7 +1130,7 @@ BOOTSTRAPPING BUILD (STRAP)
<scalacfork
destdir="${build-strap.dir}/classes/library"
compilerpathref="pack.classpath"
- params="${scalac.args.all}"
+ params="${scalac.args.quick}"
srcdir="${src.dir}/dbc"
jvmargs="${scalacfork.jvmargs}">
<include name="**/*.scala"/>
@@ -1127,7 +1139,7 @@ BOOTSTRAPPING BUILD (STRAP)
<scalacfork
destdir="${build-strap.dir}/classes/library"
compilerpathref="pack.classpath"
- params="${scalac.args.all}"
+ params="${scalac.args.quick}"
srcdir="${src.dir}/swing"
jvmargs="${scalacfork.jvmargs}">
<include name="**/*.scala"/>
@@ -1163,7 +1175,7 @@ BOOTSTRAPPING BUILD (STRAP)
<scalacfork
destdir="${build-strap.dir}/classes/compiler"
compilerpathref="pack.classpath"
- params="${scalac.args.all}"
+ params="${scalac.args.quick}"
srcdir="${src.dir}/compiler"
jvmargs="${scalacfork.jvmargs}">
<include name="**/*.scala"/>
@@ -1531,6 +1543,7 @@ DOCUMENTATION
docUncompilable="${src.dir}/library-aux"
sourcepath="${src.dir}"
classpathref="pack.classpath"
+ addparams="-Ydependent-method-types"
docRootContent="${build-docs.dir}/library/lib/rootdoc.txt">
<src>
<files includes="${src.dir}/actors"/>
diff --git a/src/compiler/scala/tools/nsc/ast/TreeGen.scala b/src/compiler/scala/tools/nsc/ast/TreeGen.scala
index 268e104309..63c5b66243 100644
--- a/src/compiler/scala/tools/nsc/ast/TreeGen.scala
+++ b/src/compiler/scala/tools/nsc/ast/TreeGen.scala
@@ -124,6 +124,8 @@ abstract class TreeGen {
/** Computes stable type for a tree if possible */
def stableTypeFor(tree: Tree): Option[Type] = tree match {
+ case This(_) if tree.symbol != null && !tree.symbol.isError =>
+ Some(ThisType(tree.symbol))
case Ident(_) if tree.symbol.isStable =>
Some(singleType(tree.symbol.owner.thisType, tree.symbol))
case Select(qual, _) if ((tree.symbol ne null) && (qual.tpe ne null)) && // turned assert into guard for #4064
diff --git a/src/compiler/scala/tools/nsc/backend/icode/Members.scala b/src/compiler/scala/tools/nsc/backend/icode/Members.scala
index 630f109bb6..54f66b96d8 100644
--- a/src/compiler/scala/tools/nsc/backend/icode/Members.scala
+++ b/src/compiler/scala/tools/nsc/backend/icode/Members.scala
@@ -154,6 +154,7 @@ trait Members { self: ICodes =>
var returnType: TypeKind = _
var recursive: Boolean = false
+ var bytecodeHasEHs = false // set by ICodeReader only, used by Inliner to prevent inlining (SI-6188)
/** local variables and method parameters */
var locals: List[Local] = Nil
diff --git a/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala b/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala
index 82ca2d07d1..b08e849099 100644
--- a/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala
+++ b/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala
@@ -311,7 +311,7 @@ abstract class Inliners extends SubComponent {
def isRecursive = m.recursive
def hasCode = m.code != null
def hasSourceFile = m.sourceFile != null
- def hasHandlers = handlers.nonEmpty
+ def hasHandlers = handlers.nonEmpty || m.bytecodeHasEHs
// the number of inlined calls in 'm', used by 'shouldInline'
var inlinedCalls = 0
@@ -495,7 +495,7 @@ abstract class Inliners extends SubComponent {
}
def isStampedForInlining(stack: TypeStack) =
- !sameSymbols && inc.hasCode && shouldInline && isSafeToInline(stack)
+ !sameSymbols && inc.hasCode && !inc.m.bytecodeHasEHs && shouldInline && isSafeToInline(stack)
def logFailure(stack: TypeStack) = log(
"""|inline failed for %s:
diff --git a/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala b/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala
index ac2cd9e996..94de765f31 100644
--- a/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala
+++ b/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala
@@ -618,6 +618,7 @@ abstract class ICodeReader extends ClassfileParser {
while (pc < codeLength) parseInstruction
val exceptionEntries = in.nextChar.toInt
+ code.containsEHs = (exceptionEntries != 0)
var i = 0
while (i < exceptionEntries) {
// skip start end PC
@@ -669,6 +670,7 @@ abstract class ICodeReader extends ClassfileParser {
var containsDUPX = false
var containsNEW = false
+ var containsEHs = false
def emit(i: Instruction) {
instrs += ((pc, i))
@@ -686,6 +688,7 @@ abstract class ICodeReader extends ClassfileParser {
val code = new Code(method)
method.setCode(code)
+ method.bytecodeHasEHs = containsEHs
var bb = code.startBlock
def makeBasicBlocks: mutable.Map[Int, BasicBlock] =
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
new file mode 100644
index 0000000000..652a23471f
--- /dev/null
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -0,0 +1,64 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import scala.concurrent.duration.Duration
+
+
+
+/**
+ * An object that may eventually be completed with a result value of type `T` which may be
+ * awaited using blocking methods.
+ *
+ * The [[Await]] object provides methods that allow accessing the result of an `Awaitable`
+ * by blocking the current thread until the `Awaitable` has been completed or a timeout has
+ * occurred.
+ */
+trait Awaitable[+T] {
+
+ /**
+ * Await the "completed" state of this `Awaitable`.
+ *
+ * '''''This method should not be called directly; use [[Await.ready]] instead.'''''
+ *
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return this `Awaitable`
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
+ */
+ @throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type
+
+ /**
+ * Await and return the result (of type `T`) of this `Awaitable`.
+ *
+ * '''''This method should not be called directly; use [[Await.result]] instead.'''''
+ *
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the result value if the `Awaitable` is completed within the specific maximum wait time
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
+ */
+ @throws(classOf[Exception])
+ def result(atMost: Duration)(implicit permit: CanAwait): T
+}
+
+
+
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
new file mode 100644
index 0000000000..747cc393c3
--- /dev/null
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -0,0 +1,77 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+/**
+ * A context to be notified by `scala.concurrent.blocking` when
+ * a thread is about to block. In effect this trait provides
+ * the implementation for `scala.concurrent.Await`.
+ * `scala.concurrent.Await.result()` and `scala.concurrent.Await.ready()`
+ * locates an instance of `BlockContext` by first looking for one
+ * provided through `BlockContext.withBlockContext()` and failing that,
+ * checking whether `Thread.currentThread` is an instance of `BlockContext`.
+ * So a thread pool can have its `java.lang.Thread` instances implement
+ * `BlockContext`. There's a default `BlockContext` used if the thread
+ * doesn't implement `BlockContext`.
+ *
+ * Typically, you'll want to chain to the previous `BlockContext`,
+ * like this:
+ * {{{
+ * val oldContext = BlockContext.current
+ * val myContext = new BlockContext {
+ * override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
+ * // you'd have code here doing whatever you need to do
+ * // when the thread is about to block.
+ * // Then you'd chain to the previous context:
+ * oldContext.blockOn(thunk)
+ * }
+ * }
+ * BlockContext.withBlockContext(myContext) {
+ * // then this block runs with myContext as the handler
+ * // for scala.concurrent.blocking
+ * }
+ * }}}
+ */
+trait BlockContext {
+
+ /** Used internally by the framework;
+ * Designates (and eventually executes) a thunk which potentially blocks the calling `Thread`.
+ *
+ * Clients must use `scala.concurrent.blocking` or `scala.concurrent.Await` instead.
+ */
+ def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T
+}
+
+object BlockContext {
+ private object DefaultBlockContext extends BlockContext {
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk
+ }
+
+ private val contextLocal = new ThreadLocal[BlockContext]()
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get match {
+ case null => Thread.currentThread match {
+ case ctx: BlockContext => ctx
+ case _ => DefaultBlockContext
+ }
+ case some => some
+ }
+
+ /** Pushes a current `BlockContext` while executing `body`. */
+ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
+ val old = contextLocal.get // can be null
+ try {
+ contextLocal.set(blockContext)
+ body
+ } finally {
+ contextLocal.set(old)
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 2a143aca72..e04c4f7fc9 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -9,8 +9,6 @@
package scala.concurrent
-import ops.future
-
/** A <code>DelayedLazyVal</code> is a wrapper for lengthy
* computations which have a valid partially computed result.
* The first argument is a function for obtaining the result
@@ -41,7 +39,7 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- future {
+ ops.future {
body
_isDone = true
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
new file mode 100644
index 0000000000..e3a912d217
--- /dev/null
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -0,0 +1,88 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+import java.util.concurrent.{ Executor, ExecutorService }
+import scala.annotation.implicitNotFound
+
+/**
+ * An `ExecutionContext` is an abstraction over an entity that can execute program logic.
+ */
+@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global")
+trait ExecutionContext {
+
+ /** Runs a block of code on this execution context.
+ */
+ def execute(runnable: Runnable): Unit
+
+ /** Reports that an asynchronous computation failed.
+ */
+ def reportFailure(t: Throwable): Unit
+
+ /** Prepares for the execution of a task. Returns the prepared
+ * execution context. A valid implementation of `prepare` is one
+ * that simply returns `this`.
+ */
+ def prepare(): ExecutionContext = this
+
+}
+
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutor extends ExecutionContext with Executor
+
+/**
+ * Union interface since Java does not support union types
+ */
+trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService
+
+
+/** Contains factory methods for creating execution contexts.
+ */
+object ExecutionContext {
+ /**
+ * This is the explicit global ExecutionContext,
+ * call this when you want to provide the global ExecutionContext explicitly
+ */
+ def global: ExecutionContextExecutor = Implicits.global
+
+ object Implicits {
+ /**
+ * This is the implicit global ExecutionContext,
+ * import this when you want to provide the global ExecutionContext implicitly
+ */
+ implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ }
+
+ /** Creates an `ExecutionContext` from the given `ExecutorService`.
+ */
+ def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
+ impl.ExecutionContextImpl.fromExecutorService(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter.
+ */
+ def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)
+
+ /** Creates an `ExecutionContext` from the given `Executor`.
+ */
+ def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
+ impl.ExecutionContextImpl.fromExecutor(e, reporter)
+
+ /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter.
+ */
+ def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter)
+
+ /** The default reporter simply prints the stack trace of the `Throwable` to System.err.
+ */
+ def defaultReporter: Throwable => Unit = (t: Throwable) => t.printStackTrace()
+}
+
+
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
new file mode 100644
index 0000000000..ddd8e53578
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala
@@ -0,0 +1,691 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable => JIterable }
+import java.util.{ LinkedList => JLinkedList }
+import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
+
+import scala.util.control.NonFatal
+import scala.Option
+import scala.util.{Try, Success, Failure}
+
+import scala.annotation.tailrec
+import scala.collection.mutable.Builder
+import scala.collection.generic.CanBuildFrom
+
+
+
+/** The trait that represents futures.
+ *
+ * Asynchronous computations that yield futures are created with the `future` call:
+ *
+ * {{{
+ * val s = "Hello"
+ * val f: Future[String] = future {
+ * s + " future!"
+ * }
+ * f onSuccess {
+ * case msg => println(msg)
+ * }
+ * }}}
+ *
+ * @author Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
+ *
+ * @define multipleCallbacks
+ * Multiple callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ *
+ * @define caughtThrowables
+ * The future may contain a throwable object and this means that the future failed.
+ * Futures obtained through combinators have the same exception as the future they were obtained from.
+ * The following throwable objects are not contained in the future:
+ * - `Error` - errors are not contained within futures
+ * - `InterruptedException` - not contained within futures
+ * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
+ *
+ * Instead, the future is completed with a ExecutionException with one of the exceptions above
+ * as the cause.
+ * If a future is failed with a `scala.runtime.NonLocalReturnControl`,
+ * it is completed with a value from that throwable instead.
+ *
+ * @define nonDeterministic
+ * Note: using this method yields nondeterministic dataflow programs.
+ *
+ * @define forComprehensionExamples
+ * Example:
+ *
+ * {{{
+ * val f = future { 5 }
+ * val g = future { 3 }
+ * val h = for {
+ * x: Int <- f // returns Future(5)
+ * y: Int <- g // returns Future(5)
+ * } yield x + y
+ * }}}
+ *
+ * is translated to:
+ *
+ * {{{
+ * f flatMap { (x: Int) => g map { (y: Int) => x + y } }
+ * }}}
+ *
+ * @define callbackInContext
+ * The provided callback always runs in the provided implicit
+ *`ExecutionContext`, though there is no guarantee that the
+ * `execute()` method on the `ExecutionContext` will be called once
+ * per callback or that `execute()` will be called in the current
+ * thread. That is, the implementation may run multiple callbacks
+ * in a batch within a single `execute()` and it may run
+ * `execute()` either immediately or asynchronously.
+ */
+trait Future[+T] extends Awaitable[T] {
+
+ // The executor within the lexical scope
+ // of the Future trait. Note that this will
+ // (modulo bugs) _never_ execute a callback
+ // other than those below in this same file.
+ // As a nice side benefit, having this implicit
+ // here forces an ambiguity in those methods
+ // that also have an executor parameter, which
+ // keeps us from accidentally forgetting to use
+ // the executor parameter.
+ private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+
+ /* Callbacks */
+
+ /** When this future is completed successfully (i.e. with a value),
+ * apply the provided partial function to the value if the partial function
+ * is defined at that value.
+ *
+ * If the future has already been completed with a value,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ * $callbackInContext
+ */
+ def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
+ case Success(v) if pf isDefinedAt v => pf(v)
+ case _ =>
+ }(executor)
+
+ /** When this future is completed with a failure (i.e. with a throwable),
+ * apply the provided callback to the throwable.
+ *
+ * $caughtThrowables
+ *
+ * If the future has already been completed with a failure,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case that the future is completed with a value.
+ *
+ * $multipleCallbacks
+ * $callbackInContext
+ */
+ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
+ case Failure(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t)
+ case _ =>
+ }(executor)
+
+ /** When this future is completed, either through an exception, or a value,
+ * apply the provided function.
+ *
+ * If the future has already been completed,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ * $callbackInContext
+ */
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit
+
+
+ /* Miscellaneous */
+
+ /** Returns whether the future has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the future is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
+ /** The value of this `Future`.
+ *
+ * If the future is not completed the returned value will be `None`.
+ * If the future is completed the value will be `Some(Success(t))`
+ * if it contains a valid result, or `Some(Failure(error))` if it contains
+ * an exception.
+ */
+ def value: Option[Try[T]]
+
+
+ /* Projections */
+
+ /** Returns a failed projection of this future.
+ *
+ * The failed projection is a future holding a value of type `Throwable`.
+ *
+ * It is completed with a value which is the throwable of the original future
+ * in case the original future is failed.
+ *
+ * It is failed with a `NoSuchElementException` if the original future is completed successfully.
+ *
+ * Blocking on this future returns a value if the original future is completed with an exception
+ * and throws a corresponding exception if the original future fails.
+ */
+ def failed: Future[Throwable] = {
+ val p = Promise[Throwable]()
+
+ onComplete {
+ case Failure(t) => p success t
+ case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
+ }
+
+ p.future
+ }
+
+
+ /* Monadic operations */
+
+ /** Asynchronously processes the value in the future once the value becomes available.
+ *
+ * Will not be called if the future fails.
+ */
+ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete {
+ case Success(r) => f(r)
+ case _ => // do nothing
+ }(executor)
+
+ /** Creates a new future by applying the 's' function to the successful result of
+ * this future, or the 'f' function to the failed result. If there is any non-fatal
+ * exception thrown when 's' or 'f' is applied, that exception will be propagated
+ * to the resulting future.
+ *
+ * @param s function that transforms a successful result of the receiver into a
+ * successful result of the returned future
+ * @param f function that transforms a failure of the receiver into a failure of
+ * the returned future
+ * @return a future that will be completed with the transformed value
+ */
+ def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = {
+ val p = Promise[S]()
+
+ onComplete {
+ case result =>
+ try {
+ result match {
+ case Failure(t) => p failure f(t)
+ case Success(r) => p success s(r)
+ }
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ }(executor)
+
+ p.future
+ }
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future. If this future is completed with an exception then the new
+ * future will also contain this exception.
+ *
+ * $forComprehensionExamples
+ */
+ def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
+ val p = Promise[S]()
+
+ onComplete {
+ case result =>
+ try {
+ result match {
+ case Success(r) => p success f(r)
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ }
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ }(executor)
+
+ p.future
+ }
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future, and returns the result of the function as the new future.
+ * If this future is completed with an exception then the new future will
+ * also contain this exception.
+ *
+ * $forComprehensionExamples
+ */
+ def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
+ val p = Promise[S]()
+
+ onComplete {
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(v) =>
+ try {
+ f(v).onComplete({
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(v) => p success v
+ })(internalExecutor)
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ }(executor)
+
+ p.future
+ }
+
+ /** Creates a new future by filtering the value of the current future with a predicate.
+ *
+ * If the current future contains a value which satisfies the predicate, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails, then the resulting future also fails.
+ *
+ * Example:
+ * {{{
+ * val f = future { 5 }
+ * val g = f filter { _ % 2 == 1 }
+ * val h = f filter { _ % 2 == 0 }
+ * await(g, 0) // evaluates to 5
+ * await(h, 0) // throw a NoSuchElementException
+ * }}}
+ */
+ def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+
+ onComplete {
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[T]]
+ case Success(v) =>
+ try {
+ if (pred(v)) p success v
+ else p failure new NoSuchElementException("Future.filter predicate is not satisfied")
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ }(executor)
+
+ p.future
+ }
+
+ /** Used by for-comprehensions.
+ */
+ final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
+ // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
+
+ // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) {
+ // def foreach(f: S => Unit): Unit = self filter p foreach f
+ // def map[R](f: S => R) = self filter p map f
+ // def flatMap[R](f: S => Future[R]) = self filter p flatMap f
+ // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x))
+ // }
+
+ /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value.
+ *
+ * If the current future contains a value for which the partial function is defined, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails, then the resulting future also fails.
+ *
+ * Example:
+ * {{{
+ * val f = future { -5 }
+ * val g = f collect {
+ * case x if x < 0 => -x
+ * }
+ * val h = f collect {
+ * case x if x > 0 => x * 2
+ * }
+ * await(g, 0) // evaluates to 5
+ * await(h, 0) // throw a NoSuchElementException
+ * }}}
+ */
+ def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = {
+ val p = Promise[S]()
+
+ onComplete {
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(v) =>
+ try {
+ if (pf.isDefinedAt(v)) p success pf(v)
+ else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ }(executor)
+
+ p.future
+ }
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain. If there is no match, or if this future contains
+ * a valid result then the new future will contain the same.
+ *
+ * Example:
+ *
+ * {{{
+ * future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
+ * future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception
+ * future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
+ * }}}
+ */
+ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
+ val p = Promise[U]()
+
+ onComplete { case tr => p.complete(tr recover pf) }(executor)
+
+ p.future
+ }
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain by assigning it a value of another future.
+ *
+ * If there is no match, or if this future contains
+ * a valid result then the new future will contain the same result.
+ *
+ * Example:
+ *
+ * {{{
+ * val f = future { Int.MaxValue }
+ * future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
+ * }}}
+ */
+ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
+ val p = Promise[U]()
+
+ onComplete {
+ case Failure(t) if pf isDefinedAt t =>
+ try {
+ p completeWith pf(t)
+ } catch {
+ case NonFatal(t) => p failure t
+ }
+ case otherwise => p complete otherwise
+ }(executor)
+
+ p.future
+ }
+
+ /** Zips the values of `this` and `that` future, and creates
+ * a new future holding the tuple of their results.
+ *
+ * If `this` future fails, the resulting future is failed
+ * with the throwable stored in `this`.
+ * Otherwise, if `that` future fails, the resulting future is failed
+ * with the throwable stored in `that`.
+ */
+ def zip[U](that: Future[U]): Future[(T, U)] = {
+ val p = Promise[(T, U)]()
+
+ this onComplete {
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]]
+ case Success(r) =>
+ that onSuccess {
+ case r2 => p success ((r, r2))
+ }
+ that onFailure {
+ case f => p failure f
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
+ * the result of the `that` future if `that` is completed successfully.
+ * If both futures are failed, the resulting future holds the throwable object of the first future.
+ *
+ * Using this method will not cause concurrent programs to become nondeterministic.
+ *
+ * Example:
+ * {{{
+ * val f = future { sys.error("failed") }
+ * val g = future { 5 }
+ * val h = f fallbackTo g
+ * await(h, 0) // evaluates to 5
+ * }}}
+ */
+ def fallbackTo[U >: T](that: Future[U]): Future[U] = {
+ val p = Promise[U]()
+ onComplete {
+ case s @ Success(_) => p complete s
+ case _ => p completeWith that
+ }
+ p.future
+ }
+
+ /** Creates a new `Future[S]` which is completed with this `Future`'s result if
+ * that conforms to `S`'s erased type or a `ClassCastException` otherwise.
+ */
+ def mapTo[S](implicit tag: ClassManifest[S]): Future[S] = {
+ def boxedType(c: Class[_]): Class[_] = {
+ if (c.isPrimitive) Future.toBoxed(c) else c
+ }
+
+ val p = Promise[S]()
+
+ onComplete {
+ case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
+ case Success(t) =>
+ p complete (try {
+ Success(boxedType(tag.erasure).cast(t).asInstanceOf[S])
+ } catch {
+ case e: ClassCastException => Failure(e)
+ })
+ }
+
+ p.future
+ }
+
+ /** Applies the side-effecting function to the result of this future, and returns
+ * a new future with the result of this future.
+ *
+ * This method allows one to enforce that the callbacks are executed in a
+ * specified order.
+ *
+ * Note that if one of the chained `andThen` callbacks throws
+ * an exception, that exception is not propagated to the subsequent `andThen`
+ * callbacks. Instead, the subsequent `andThen` callbacks are given the original
+ * value of this future.
+ *
+ * The following example prints out `5`:
+ *
+ * {{{
+ * val f = future { 5 }
+ * f andThen {
+ * case r => sys.error("runtime exception")
+ * } andThen {
+ * case Failure(t) => println(t)
+ * case Success(v) => println(v)
+ * }
+ * }}}
+ */
+ def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+
+ onComplete {
+ case r => try if (pf isDefinedAt r) pf(r) finally p complete r
+ }(executor)
+
+ p.future
+ }
+
+}
+
+
+
+/** Future companion object.
+ *
+ * @define nonDeterministic
+ * Note: using this method yields nondeterministic dataflow programs.
+ */
+object Future {
+
+ private[concurrent] val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[java.lang.Boolean],
+ classOf[Byte] -> classOf[java.lang.Byte],
+ classOf[Char] -> classOf[java.lang.Character],
+ classOf[Short] -> classOf[java.lang.Short],
+ classOf[Int] -> classOf[java.lang.Integer],
+ classOf[Long] -> classOf[java.lang.Long],
+ classOf[Float] -> classOf[java.lang.Float],
+ classOf[Double] -> classOf[java.lang.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
+ )
+
+ /** Creates an already completed Future with the specified exception.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
+
+ /** Creates an already completed Future with the specified result.
+ *
+ * @tparam T the type of the value in the future
+ * @return the newly created `Future` object
+ */
+ def successful[T](result: T): Future[T] = Promise.successful(result).future
+
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asychronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
+ def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
+
+ /** Simple version of `Futures.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`.
+ * Useful for reducing many `Future`s into a single `Future`.
+ */
+ def sequence[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[A], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
+ in.foldLeft(Promise.successful(cbf()).future) {
+ (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)
+ } map (_.result)
+ }
+
+ /** Returns a `Future` to the result of the first future in the list that is completed.
+ */
+ def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+ val p = Promise[T]()
+
+ val completeFirst: Try[T] => Unit = p tryComplete _
+ futures.foreach(_ onComplete completeFirst)
+
+ p.future
+ }
+
+ /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate.
+ */
+ def find[T](futurestravonce: TraversableOnce[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
+ val futures = futurestravonce.toBuffer
+ if (futures.isEmpty) Promise.successful[Option[T]](None).future
+ else {
+ val result = Promise[Option[T]]()
+ val ref = new AtomicInteger(futures.size)
+ val search: Try[T] => Unit = v => try {
+ v match {
+ case Success(r) => if (predicate(r)) result tryComplete Success(Some(r))
+ case _ =>
+ }
+ } finally {
+ if (ref.decrementAndGet == 0) {
+ result tryComplete Success(None)
+ }
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
+ /** A non-blocking fold over the specified futures, with the start value of the given zero.
+ * The fold is performed on the thread where the last future is completed,
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ *
+ * Example:
+ * {{{
+ * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
+ * }}}
+ */
+ def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ if (futures.isEmpty) Promise.successful(zero).future
+ else sequence(futures).map(_.foldLeft(zero)(foldFun))
+ }
+
+ /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first.
+ *
+ * Example:
+ * {{{
+ * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
+ * }}}
+ */
+ def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future
+ else sequence(futures).map(_ reduceLeft op)
+ }
+
+ /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`.
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel:
+ *
+ * {{{
+ * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
+ * }}}
+ */
+ def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
+ in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) =>
+ val fb = fn(a.asInstanceOf[A])
+ for (r <- fr; b <- fb) yield (r += b)
+ }.map(_.result)
+
+ // This is used to run callbacks which are internal
+ // to scala.concurrent; our own callbacks are only
+ // ever used to eventually run another callback,
+ // and that other callback will have its own
+ // executor because all callbacks come with
+ // an executor. Our own callbacks never block
+ // and have no "expected" exceptions.
+ // As a result, this executor can do nothing;
+ // some other executor will always come after
+ // it (and sometimes one will be before it),
+ // and those will be performing the "real"
+ // dispatch to code outside scala.concurrent.
+ // Because this exists, ExecutionContext.defaultExecutionContext
+ // isn't instantiated by Future internals, so
+ // if some code for some reason wants to avoid
+ // ever starting up the default context, it can do so
+ // by just not ever using it itself. scala.concurrent
+ // doesn't need to create defaultExecutionContext as
+ // a side effect.
+ private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
+ override def execute(runnable: Runnable): Unit =
+ runnable.run()
+ override def reportFailure(t: Throwable): Unit =
+ throw new IllegalStateException("problem in scala.concurrent internal callback", t)
+ }
+}
+
+/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
+ * wraps a callback provided to `Future.onComplete`.
+ * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
+ * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
+ */
+trait OnCompleteRunnable {
+ self: Runnable =>
+}
+
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
new file mode 100644
index 0000000000..8355a73a1f
--- /dev/null
+++ b/src/library/scala/concurrent/Promise.scala
@@ -0,0 +1,152 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import scala.util.{ Try, Success, Failure }
+
+/** Promise is an object which can be completed with a value or failed
+ * with an exception.
+ *
+ * @define promiseCompletion
+ * If the promise has already been fulfilled, failed or has timed out,
+ * calling this method will throw an IllegalStateException.
+ *
+ * @define allowedThrowables
+ * If the throwable used to fail this promise is an error, a control exception
+ * or an interrupted exception, it will be wrapped as a cause within an
+ * `ExecutionException` which will fail the promise.
+ *
+ * @define nonDeterministic
+ * Note: Using this method may result in non-deterministic concurrent programs.
+ */
+trait Promise[T] {
+
+ // used for internal callbacks defined in
+ // the lexical scope of this trait;
+ // _never_ for application callbacks.
+ private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor
+
+ /** Future containing the value of this promise.
+ */
+ def future: Future[T]
+
+ /** Returns whether the promise has already been completed with
+ * a value or an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return `true` if the promise is already completed, `false` otherwise
+ */
+ def isCompleted: Boolean
+
+ /** Completes the promise with either an exception or a value.
+ *
+ * @param result Either the value or the exception to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def complete(result: Try[T]): this.type =
+ if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.")
+
+ /** Tries to complete the promise with either a value or the exception.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def tryComplete(result: Try[T]): Boolean
+
+ /** Completes this promise with the specified future, once that future is completed.
+ *
+ * @return This promise
+ */
+ final def completeWith(other: Future[T]): this.type = {
+ other onComplete { this complete _ }
+ this
+ }
+
+ /** Attempts to complete this promise with the specified future, once that future is completed.
+ *
+ * @return This promise
+ */
+ final def tryCompleteWith(other: Future[T]): this.type = {
+ other onComplete { this tryComplete _ }
+ this
+ }
+
+ /** Completes the promise with a value.
+ *
+ * @param v The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def success(v: T): this.type = complete(Success(v))
+
+ /** Tries to complete the promise with a value.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def trySuccess(value: T): Boolean = tryComplete(Success(value))
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $allowedThrowables
+ *
+ * $promiseCompletion
+ */
+ def failure(t: Throwable): this.type = complete(Failure(t))
+
+ /** Tries to complete the promise with an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def tryFailure(t: Throwable): Boolean = tryComplete(Failure(t))
+}
+
+
+
+object Promise {
+
+ /** Creates a promise object which can be completed with a value.
+ *
+ * @tparam T the type of the value in the promise
+ * @return the newly created `Promise` object
+ */
+ def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()
+
+ /** Creates an already completed Promise with the specified exception.
+ *
+ * @tparam T the type of the value in the promise
+ * @return the newly created `Promise` object
+ */
+ def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))
+
+ /** Creates an already completed Promise with the specified result.
+ *
+ * @tparam T the type of the value in the promise
+ * @return the newly created `Promise` object
+ */
+ def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))
+
+}
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/duration/Deadline.scala b/src/library/scala/concurrent/duration/Deadline.scala
new file mode 100644
index 0000000000..61cbe47530
--- /dev/null
+++ b/src/library/scala/concurrent/duration/Deadline.scala
@@ -0,0 +1,81 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.duration
+
+/**
+ * This class stores a deadline, as obtained via `Deadline.now` or the
+ * duration DSL:
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ * 3.seconds.fromNow
+ * }}}
+ *
+ * Its main purpose is to manage repeated attempts to achieve something (like
+ * awaiting a condition) by offering the methods `hasTimeLeft` and `timeLeft`. All
+ * durations are measured according to `System.nanoTime` aka wall-time; this
+ * does not take into account changes to the system clock (such as leap
+ * seconds).
+ */
+case class Deadline private (time: FiniteDuration) extends Ordered[Deadline] {
+ /**
+ * Return a deadline advanced (i.e. moved into the future) by the given duration.
+ */
+ def +(other: FiniteDuration): Deadline = copy(time = time + other)
+ /**
+ * Return a deadline moved backwards (i.e. towards the past) by the given duration.
+ */
+ def -(other: FiniteDuration): Deadline = copy(time = time - other)
+ /**
+ * Calculate time difference between this and the other deadline, where the result is directed (i.e. may be negative).
+ */
+ def -(other: Deadline): FiniteDuration = time - other.time
+ /**
+ * Calculate time difference between this duration and now; the result is negative if the deadline has passed.
+ *
+ * '''''Note that on some systems this operation is costly because it entails a system call.'''''
+ * Check `System.nanoTime` for your platform.
+ */
+ def timeLeft: FiniteDuration = this - Deadline.now
+ /**
+ * Determine whether the deadline still lies in the future at the point where this method is called.
+ *
+ * '''''Note that on some systems this operation is costly because it entails a system call.'''''
+ * Check `System.nanoTime` for your platform.
+ */
+ def hasTimeLeft(): Boolean = !isOverdue()
+ /**
+ * Determine whether the deadline lies in the past at the point where this method is called.
+ *
+ * '''''Note that on some systems this operation is costly because it entails a system call.'''''
+ * Check `System.nanoTime` for your platform.
+ */
+ def isOverdue(): Boolean = (time.toNanos - System.nanoTime()) < 0
+ /**
+ * The natural ordering for deadline is determined by the natural order of the underlying (finite) duration.
+ */
+ def compare(other: Deadline) = time compare other.time
+}
+
+object Deadline {
+ /**
+ * Construct a deadline due exactly at the point where this method is called. Useful for then
+ * advancing it to obtain a future deadline, or for sampling the current time exactly once and
+ * then comparing it to multiple deadlines (using subtraction).
+ */
+ def now: Deadline = Deadline(Duration(System.nanoTime, NANOSECONDS))
+
+ /**
+ * The natural ordering for deadline is determined by the natural order of the underlying (finite) duration.
+ */
+ implicit object DeadlineIsOrdered extends Ordering[Deadline] {
+ def compare(a: Deadline, b: Deadline) = a compare b
+ }
+
+}
diff --git a/src/library/scala/concurrent/duration/Duration.scala b/src/library/scala/concurrent/duration/Duration.scala
new file mode 100644
index 0000000000..5ac9887ae3
--- /dev/null
+++ b/src/library/scala/concurrent/duration/Duration.scala
@@ -0,0 +1,697 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.duration
+
+import java.lang.{ Double => JDouble, Long => JLong }
+
+object Duration {
+
+ /**
+ * Construct a Duration from the given length and unit. Observe that nanosecond precision may be lost if
+ *
+ * - the unit is NANOSECONDS
+ * - and the length has an absolute value greater than 2^53
+ *
+ * Infinite inputs (and NaN) are converted into [[Duration.Inf]], [[Duration.MinusInf]] and [[Duration.Undefined]], respectively.
+ *
+ * @throws IllegalArgumentException if the length was finite but the resulting duration cannot be expressed as a [[FiniteDuration]]
+ */
+ def apply(length: Double, unit: TimeUnit): Duration = fromNanos(unit.toNanos(1) * length)
+
+ /**
+ * Construct a finite duration from the given length and time unit. The unit given is retained
+ * throughout calculations as long as possible, so that it can be retrieved later.
+ */
+ def apply(length: Long, unit: TimeUnit): FiniteDuration = new FiniteDuration(length, unit)
+
+ /**
+ * Construct a finite duration from the given length and time unit, where the latter is
+ * looked up in a list of string representation. Valid choices are:
+ *
+ * `d, day, h, hour, min, minute, s, sec, second, ms, milli, millisecond, µs, micro, microsecond, ns, nano, nanosecond`
+ * and their pluralized forms (for every but the first mentioned form of each unit, i.e. no "ds", but "days").
+ */
+ def apply(length: Long, unit: String): FiniteDuration = new FiniteDuration(length, Duration.timeUnit(unit))
+
+ // Double stores 52 bits mantissa, but there is an implied '1' in front, making the limit 2^53
+ private[this] final val maxPreciseDouble = 9007199254740992d
+
+ /**
+ * Parse String into Duration. Format is `"<length><unit>"`, where
+ * whitespace is allowed before, between and after the parts. Infinities are
+ * designated by `"Inf"`, `"PlusInf"`, `"+Inf"` and `"-Inf"` or `"MinusInf"`.
+ *
+ * @throws NumberFormatException if format is not parseable
+ */
+ def apply(s: String): Duration = {
+ val s1: String = s filterNot (_.isWhitespace)
+ s1 match {
+ case "Inf" | "PlusInf" | "+Inf" => Inf
+ case "MinusInf" | "-Inf" => MinusInf
+ case _ =>
+ val unitName = s1.reverse takeWhile (_.isLetter) reverse;
+ timeUnit get unitName match {
+ case Some(unit) =>
+ val valueStr = s1 dropRight unitName.length
+ val valueD = JDouble.parseDouble(valueStr)
+ if (valueD >= -maxPreciseDouble && valueD <= maxPreciseDouble) Duration(valueD, unit)
+ else Duration(JLong.parseLong(valueStr), unit)
+ case _ => throw new NumberFormatException("format error " + s)
+ }
+ }
+ }
+
+ // "ms milli millisecond" -> List("ms", "milli", "millis", "millisecond", "milliseconds")
+ private[this] def words(s: String) = (s.trim split "\\s+").toList
+ private[this] def expandLabels(labels: String): List[String] = {
+ val hd :: rest = words(labels)
+ hd :: rest.flatMap(s => List(s, s + "s"))
+ }
+ private[this] val timeUnitLabels = List(
+ DAYS -> "d day",
+ HOURS -> "h hour",
+ MINUTES -> "min minute",
+ SECONDS -> "s sec second",
+ MILLISECONDS -> "ms milli millisecond",
+ MICROSECONDS -> "µs micro microsecond",
+ NANOSECONDS -> "ns nano nanosecond"
+ )
+
+ // TimeUnit => standard label
+ protected[duration] val timeUnitName: Map[TimeUnit, String] =
+ timeUnitLabels.toMap mapValues (s => words(s).last) toMap
+
+ // Label => TimeUnit
+ protected[duration] val timeUnit: Map[String, TimeUnit] =
+ timeUnitLabels flatMap { case (unit, names) => expandLabels(names) map (_ -> unit) } toMap
+
+ /**
+ * Extract length and time unit out of a string, where the format must match the description for [[Duration$.apply(String):Duration apply(String)]].
+ * The extractor will not match for malformed strings or non-finite durations.
+ */
+ def unapply(s: String): Option[(Long, TimeUnit)] =
+ ( try Some(apply(s)) catch { case _: RuntimeException => None } ) flatMap unapply
+
+ /**
+ * Extract length and time unit out of a duration, if it is finite.
+ */
+ def unapply(d: Duration): Option[(Long, TimeUnit)] =
+ if (d.isFinite) Some((d.length, d.unit)) else None
+
+ /**
+ * Construct a possibly infinite or undefined Duration from the given number of nanoseconds.
+ *
+ * - `Double.PositiveInfinity` is mapped to [[Duration.Inf]]
+ * - `Double.NegativeInfinity` is mapped to [[Duration.MinusInf]]
+ * - `Double.NaN` is mapped to [[Duration.Undefined]]
+ * - `-0d` is mapped to [[Duration.Zero]] (exactly like `0d`)
+ *
+ * The semantics of the resulting Duration objects matches the semantics of their Double
+ * counterparts with respect to arithmetic operations.
+ *
+ * @throws IllegalArgumentException if the length was finite but the resulting duration cannot be expressed as a [[FiniteDuration]]
+ */
+ def fromNanos(nanos: Double): Duration = {
+ if (nanos.isInfinite)
+ if (nanos > 0) Inf else MinusInf
+ else if (nanos.isNaN)
+ Undefined
+ else if (nanos > Long.MaxValue || nanos < Long.MinValue)
+ throw new IllegalArgumentException("trying to construct too large duration with " + nanos + "ns")
+ else
+ fromNanos((nanos + 0.5).toLong)
+ }
+
+ private[this] final val µs_per_ns = 1000L
+ private[this] final val ms_per_ns = µs_per_ns * 1000
+ private[this] final val s_per_ns = ms_per_ns * 1000
+ private[this] final val min_per_ns = s_per_ns * 60
+ private[this] final val h_per_ns = min_per_ns * 60
+ private[this] final val d_per_ns = h_per_ns * 24
+
+ /**
+ * Construct a finite duration from the given number of nanoseconds. The
+ * result will have the coarsest possible time unit which can exactly express
+ * this duration.
+ *
+ * @throws IllegalArgumentException for `Long.MinValue` since that would lead to inconsistent behavior afterwards (cannot be negated)
+ */
+ def fromNanos(nanos: Long): FiniteDuration = {
+ if (nanos % d_per_ns == 0) Duration(nanos / d_per_ns, DAYS)
+ else if (nanos % h_per_ns == 0) Duration(nanos / h_per_ns, HOURS)
+ else if (nanos % min_per_ns == 0) Duration(nanos / min_per_ns, MINUTES)
+ else if (nanos % s_per_ns == 0) Duration(nanos / s_per_ns, SECONDS)
+ else if (nanos % ms_per_ns == 0) Duration(nanos / ms_per_ns, MILLISECONDS)
+ else if (nanos % µs_per_ns == 0) Duration(nanos / µs_per_ns, MICROSECONDS)
+ else Duration(nanos, NANOSECONDS)
+ }
+
+ /**
+ * Preconstructed value of `0.days`.
+ */
+ // unit as coarse as possible to keep (_ + Zero) sane unit-wise
+ val Zero: FiniteDuration = new FiniteDuration(0, DAYS)
+
+ /**
+ * The Undefined value corresponds closely to Double.NaN:
+ *
+ * - it is the result of otherwise invalid operations
+ * - it does not equal itself (according to `equals()`)
+ * - it compares greater than any other Duration apart from itself (for which `compare` returns 0)
+ *
+ * The particular comparison semantics mirror those of Double.NaN.
+ *
+ * '''''Use `eq` when checking an input of a method against this value.'''''
+ */
+ val Undefined: Infinite = new Infinite {
+ override def toString = "Duration.Undefined"
+ override def equals(other: Any) = false
+ override def +(other: Duration): Duration = this
+ override def -(other: Duration): Duration = this
+ override def *(factor: Double): Duration = this
+ override def /(factor: Double): Duration = this
+ override def /(other: Duration): Double = Double.NaN
+ def compare(other: Duration) = if (other eq this) 0 else 1
+ def unary_- : Duration = this
+ def toUnit(unit: TimeUnit): Double = Double.NaN
+ }
+
+ sealed abstract class Infinite extends Duration {
+ def +(other: Duration): Duration = other match {
+ case x if x eq Undefined => Undefined
+ case x: Infinite if x ne this => Undefined
+ case _ => this
+ }
+ def -(other: Duration): Duration = other match {
+ case x if x eq Undefined => Undefined
+ case x: Infinite if x eq this => Undefined
+ case _ => this
+ }
+
+ def *(factor: Double): Duration =
+ if (factor == 0d || factor.isNaN) Undefined
+ else if (factor < 0d) -this
+ else this
+ def /(divisor: Double): Duration =
+ if (divisor.isNaN || divisor.isInfinite) Undefined
+ else if ((divisor compare 0d) < 0) -this
+ else this
+ def /(divisor: Duration): Double = divisor match {
+ case _: Infinite => Double.NaN
+ case x => Double.PositiveInfinity * (if ((this > Zero) ^ (divisor >= Zero)) -1 else 1)
+ }
+
+ final def isFinite() = false
+
+ private[this] def fail(what: String) = throw new IllegalArgumentException(what + " not allowed on infinite Durations")
+ final def length: Long = fail("length")
+ final def unit: TimeUnit = fail("unit")
+ final def toNanos: Long = fail("toNanos")
+ final def toMicros: Long = fail("toMicros")
+ final def toMillis: Long = fail("toMillis")
+ final def toSeconds: Long = fail("toSeconds")
+ final def toMinutes: Long = fail("toMinutes")
+ final def toHours: Long = fail("toHours")
+ final def toDays: Long = fail("toDays")
+ }
+
+ /**
+ * Infinite duration: greater than any other (apart from Undefined) and not equal to any other
+ * but itself. This value closely corresponds to Double.PositiveInfinity,
+ * matching its semantics in arithmetic operations.
+ */
+ val Inf: Infinite = new Infinite {
+ override def toString = "Duration.Inf"
+ def compare(other: Duration) = other match {
+ case x if x eq Undefined => -1 // Undefined != Undefined
+ case x if x eq this => 0 // `case Inf` will include null checks in the byte code
+ case _ => 1
+ }
+ def unary_- : Duration = MinusInf
+ def toUnit(unit: TimeUnit): Double = Double.PositiveInfinity
+ }
+
+ /**
+ * Infinite duration: less than any other and not equal to any other
+ * but itself. This value closely corresponds to Double.NegativeInfinity,
+ * matching its semantics in arithmetic operations.
+ */
+ val MinusInf: Infinite = new Infinite {
+ override def toString = "Duration.MinusInf"
+ def compare(other: Duration) = if (other eq this) 0 else -1
+ def unary_- : Duration = Inf
+ def toUnit(unit: TimeUnit): Double = Double.NegativeInfinity
+ }
+
+ // Java Factories
+
+ /**
+ * Construct a finite duration from the given length and time unit. The unit given is retained
+ * throughout calculations as long as possible, so that it can be retrieved later.
+ */
+ def create(length: Long, unit: TimeUnit): FiniteDuration = apply(length, unit)
+ /**
+ * Construct a Duration from the given length and unit. Observe that nanosecond precision may be lost if
+ *
+ * - the unit is NANOSECONDS
+ * - and the length has an absolute value greater than 2^53
+ *
+ * Infinite inputs (and NaN) are converted into [[Duration.Inf]], [[Duration.MinusInf]] and [[Duration.Undefined]], respectively.
+ *
+ * @throws IllegalArgumentException if the length was finite but the resulting duration cannot be expressed as a [[FiniteDuration]]
+ */
+ def create(length: Double, unit: TimeUnit): Duration = apply(length, unit)
+ /**
+ * Construct a finite duration from the given length and time unit, where the latter is
+ * looked up in a list of string representation. Valid choices are:
+ *
+ * `d, day, h, hour, min, minute, s, sec, second, ms, milli, millisecond, µs, micro, microsecond, ns, nano, nanosecond`
+ * and their pluralized forms (for every but the first mentioned form of each unit, i.e. no "ds", but "days").
+ */
+ def create(length: Long, unit: String): FiniteDuration = apply(length, unit)
+ /**
+ * Parse String into Duration. Format is `"<length><unit>"`, where
+ * whitespace is allowed before, between and after the parts. Infinities are
+ * designated by `"Inf"`, `"PlusInf"`, `"+Inf"` and `"-Inf"` or `"MinusInf"`.
+ *
+ * @throws NumberFormatException if format is not parseable
+ */
+ def create(s: String): Duration = apply(s)
+
+ /**
+ * The natural ordering of durations matches the natural ordering for Double, including non-finite values.
+ */
+ implicit object DurationIsOrdered extends Ordering[Duration] {
+ def compare(a: Duration, b: Duration) = a compare b
+ }
+}
+
+/**
+ * <h2>Utility for working with java.util.concurrent.TimeUnit durations.</h2>
+ *
+ * '''''This class is not meant as a general purpose representation of time, it is
+ * optimized for the needs of `scala.concurrent`.'''''
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * <p/>
+ * Examples:
+ * {{{
+ * import scala.concurrent.duration._
+ *
+ * val duration = Duration(100, MILLISECONDS)
+ * val duration = Duration(100, "millis")
+ *
+ * duration.toNanos
+ * duration < 1.second
+ * duration <= Duration.Inf
+ * }}}
+ *
+ * '''''Invoking inexpressible conversions (like calling `toSeconds` on an infinite duration) will throw an IllegalArgumentException.'''''
+ *
+ * <p/>
+ * Implicits are also provided for Int, Long and Double. Example usage:
+ * {{{
+ * import scala.concurrent.duration._
+ *
+ * val duration = 100 millis
+ * }}}
+ *
+ * '''''The DSL provided by the implicit conversions always allows construction of finite durations, even for infinite Double inputs; use Duration.Inf instead.'''''
+ *
+ * Extractors, parsing and arithmetic are also included:
+ * {{{
+ * val d = Duration("1.2 µs")
+ * val Duration(length, unit) = 5 millis
+ * val d2 = d * 2.5
+ * val d3 = d2 + 1.millisecond
+ * }}}
+ *
+ * <h2>Handling of Time Units</h2>
+ *
+ * Calculations performed on finite durations always retain the more precise unit of either operand, no matter
+ * whether a coarser unit would be able to exactly express the same duration. This means that Duration can be
+ * used as a lossless container for a (length, unit) pair if it is constructed using the corresponding methods
+ * and no arithmetic is performed on it; adding/subtracting durations should in that case be done with care.
+ *
+ * <h2>Correspondence to Double Semantics</h2>
+ *
+ * The semantics of arithmetic operations on Duration are two-fold:
+ *
+ * - exact addition/subtraction with nanosecond resolution for finite durations, independent of the summands' magnitude
+ * - isomorphic to `java.lang.Double` when it comes to infinite or undefined values
+ *
+ * The conversion between Duration and Double is done using [[Duration.toUnit]] (with unit NANOSECONDS)
+ * and [[Duration$.fromNanos(Double):Duration Duration.fromNanos(Double)]].
+ *
+ * <h2>Ordering</h2>
+ *
+ * The default ordering is consistent with the ordering of Double numbers, which means that Undefined is
+ * considered greater than all other durations, including [[Duration.Inf]].
+ *
+ * @define exc @throws IllegalArgumentException when invoked on a non-finite duration
+ *
+ * @define ovf @throws IllegalArgumentException in case of a finite overflow: the range of a finite duration is +-(2^63-1)ns, and no conversion to infinite durations takes place.
+ */
+sealed abstract class Duration extends Serializable with Ordered[Duration] {
+ /**
+ * Obtain the length of this Duration measured in the unit obtained by the `unit` method.
+ *
+ * $exc
+ */
+ def length: Long
+ /**
+ * Obtain the time unit in which the length of this duration is measured.
+ *
+ * $exc
+ */
+ def unit: TimeUnit
+ /**
+ * Return the length of this duration measured in whole nanoseconds, rounding towards zero.
+ *
+ * $exc
+ */
+ def toNanos: Long
+ /**
+ * Return the length of this duration measured in whole microseconds, rounding towards zero.
+ *
+ * $exc
+ */
+ def toMicros: Long
+ /**
+ * Return the length of this duration measured in whole milliseconds, rounding towards zero.
+ *
+ * $exc
+ */
+ def toMillis: Long
+ /**
+ * Return the length of this duration measured in whole seconds, rounding towards zero.
+ *
+ * $exc
+ */
+ def toSeconds: Long
+ /**
+ * Return the length of this duration measured in whole minutes, rounding towards zero.
+ *
+ * $exc
+ */
+ def toMinutes: Long
+ /**
+ * Return the length of this duration measured in whole hours, rounding towards zero.
+ *
+ * $exc
+ */
+ def toHours: Long
+ /**
+ * Return the length of this duration measured in whole days, rounding towards zero.
+ *
+ * $exc
+ */
+ def toDays: Long
+ /**
+ * Return the number of nanoseconds as floating point number, scaled down to the given unit.
+ * The result may not precisely represent this duration due to the Double datatype's inherent
+ * limitations (mantissa size effectively 53 bits). Non-finite durations are represented as
+ * - [[Duration.Undefined]] is mapped to Double.NaN
+ * - [[Duration.Inf]] is mapped to Double.PositiveInfinity
+ * - [[Duration.MinusInf]] is mapped to Double.NegativeInfinity
+ */
+ def toUnit(unit: TimeUnit): Double
+
+ /**
+ * Return the sum of that duration and this. When involving non-finite summands the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def +(other: Duration): Duration
+ /**
+ * Return the difference of that duration and this. When involving non-finite summands the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def -(other: Duration): Duration
+ /**
+ * Return this duration multiplied by the scalar factor. When involving non-finite factors the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def *(factor: Double): Duration
+ /**
+ * Return this duration divided by the scalar factor. When involving non-finite factors the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def /(divisor: Double): Duration
+ /**
+ * Return the quotient of this and that duration as floating-point number. The semantics are
+ * determined by Double as if calculating the quotient of the nanosecond lengths of both factors.
+ */
+ def /(divisor: Duration): Double
+ /**
+ * Negate this duration. The only two values which are mapped to themselves are [[Duration.Zero]] and [[Duration.Undefined]].
+ */
+ def unary_- : Duration
+ /**
+ * This method returns whether this duration is finite, which is not the same as
+ * `!isInfinite` for Double because this method also returns `false` for [[Duration.Undefined]].
+ */
+ def isFinite(): Boolean
+ /**
+ * Return the smaller of this and that duration as determined by the natural ordering.
+ */
+ def min(other: Duration): Duration = if (this < other) this else other
+ /**
+ * Return the larger of this and that duration as determined by the natural ordering.
+ */
+ def max(other: Duration): Duration = if (this > other) this else other
+
+ // Java API
+
+ /**
+ * Return this duration divided by the scalar factor. When involving non-finite factors the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def div(divisor: Double) = this / divisor
+ /**
+ * Return the quotient of this and that duration as floating-point number. The semantics are
+ * determined by Double as if calculating the quotient of the nanosecond lengths of both factors.
+ */
+ def div(other: Duration) = this / other
+ def gt(other: Duration) = this > other
+ def gteq(other: Duration) = this >= other
+ def lt(other: Duration) = this < other
+ def lteq(other: Duration) = this <= other
+ /**
+ * Return the difference of that duration and this. When involving non-finite summands the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def minus(other: Duration) = this - other
+ /**
+ * Return this duration multiplied by the scalar factor. When involving non-finite factors the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def mul(factor: Double) = this * factor
+ /**
+ * Negate this duration. The only two values which are mapped to themselves are [[Duration.Zero]] and [[Duration.Undefined]].
+ */
+ def neg() = -this
+ /**
+ * Return the sum of that duration and this. When involving non-finite summands the semantics match those
+ * of Double.
+ *
+ * $ovf
+ */
+ def plus(other: Duration) = this + other
+}
+
+object FiniteDuration {
+
+ implicit object FiniteDurationIsOrdered extends Ordering[FiniteDuration] {
+ def compare(a: FiniteDuration, b: FiniteDuration) = a compare b
+ }
+
+ def apply(length: Long, unit: TimeUnit) = new FiniteDuration(length, unit)
+ def apply(length: Long, unit: String) = new FiniteDuration(length, Duration.timeUnit(unit))
+
+ // limit on abs. value of durations in their units
+ private final val max_ns = Long.MaxValue
+ private final val max_µs = max_ns / 1000
+ private final val max_ms = max_µs / 1000
+ private final val max_s = max_ms / 1000
+ private final val max_min= max_s / 60
+ private final val max_h = max_min / 60
+ private final val max_d = max_h / 24
+}
+
+/**
+ * This class represents a finite duration. Its addition and subtraction operators are overloaded to retain
+ * this guarantee statically. The range of this class is limited to +-(2^63-1)ns, which is roughly 292 years.
+ */
+final class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
+ import FiniteDuration._
+ import Duration._
+
+ private[this] def bounded(max: Long) = -max <= length && length <= max
+
+ require(unit match {
+ /*
+ * enforce the 2^63-1 ns limit, must be pos/neg symmetrical because of unary_-
+ */
+ case NANOSECONDS ⇒ bounded(max_ns)
+ case MICROSECONDS ⇒ bounded(max_µs)
+ case MILLISECONDS ⇒ bounded(max_ms)
+ case SECONDS ⇒ bounded(max_s)
+ case MINUTES ⇒ bounded(max_min)
+ case HOURS ⇒ bounded(max_h)
+ case DAYS ⇒ bounded(max_d)
+ case _ ⇒
+ val v = DAYS.convert(length, unit)
+ -max_d <= v && v <= max_d
+ }, "Duration is limited to +-(2^63-1)ns (ca. 292 years)")
+
+ def toNanos = unit.toNanos(length)
+ def toMicros = unit.toMicros(length)
+ def toMillis = unit.toMillis(length)
+ def toSeconds = unit.toSeconds(length)
+ def toMinutes = unit.toMinutes(length)
+ def toHours = unit.toHours(length)
+ def toDays = unit.toDays(length)
+ def toUnit(u: TimeUnit) = toNanos.toDouble / NANOSECONDS.convert(1, u)
+
+ /**
+ * Construct a [[Deadline]] from this duration by adding it to the current instant `Deadline.now`.
+ */
+ def fromNow: Deadline = Deadline.now + this
+
+ private[this] def unitString = timeUnitName(unit) + ( if (length == 1) "" else "s" )
+ override def toString = "" + length + " " + unitString
+
+ def compare(other: Duration) = other match {
+ case x: FiniteDuration => toNanos compare x.toNanos
+ case _ => -(other compare this)
+ }
+
+ // see https://www.securecoding.cert.org/confluence/display/java/NUM00-J.+Detect+or+prevent+integer+overflow
+ private[this] def safeAdd(a: Long, b: Long): Long = {
+ if ((b > 0) && (a > Long.MaxValue - b) ||
+ (b < 0) && (a < Long.MinValue - b)) throw new IllegalArgumentException("integer overflow")
+ a + b
+ }
+ private[this] def add(otherLength: Long, otherUnit: TimeUnit): FiniteDuration = {
+ val commonUnit = if (otherUnit.convert(1, unit) == 0) unit else otherUnit
+ val totalLength = safeAdd(commonUnit.convert(length, unit), commonUnit.convert(otherLength, otherUnit))
+ new FiniteDuration(totalLength, commonUnit)
+ }
+
+ def +(other: Duration) = other match {
+ case x: FiniteDuration => add(x.length, x.unit)
+ case _ => other
+ }
+ def -(other: Duration) = other match {
+ case x: FiniteDuration => add(-x.length, x.unit)
+ case _ => other
+ }
+
+ def *(factor: Double) =
+ if (!factor.isInfinite) fromNanos(toNanos * factor)
+ else if (factor.isNaN) Undefined
+ else if ((factor > 0) ^ (this < Zero)) Inf
+ else MinusInf
+
+ def /(divisor: Double) =
+ if (!divisor.isInfinite) fromNanos(toNanos / divisor)
+ else if (divisor.isNaN) Undefined
+ else Zero
+
+ // if this is made a constant, then scalac will elide the conditional and always return +0.0, SI-6331
+ private[this] def minusZero = -0d
+ def /(divisor: Duration): Double =
+ if (divisor.isFinite) toNanos.toDouble / divisor.toNanos
+ else if (divisor eq Undefined) Double.NaN
+ else if ((length < 0) ^ (divisor > Zero)) 0d
+ else minusZero
+
+ // overloaded methods taking FiniteDurations, so that you can calculate while statically staying finite
+ def +(other: FiniteDuration) = add(other.length, other.unit)
+ def -(other: FiniteDuration) = add(-other.length, other.unit)
+ def plus(other: FiniteDuration) = this + other
+ def minus(other: FiniteDuration) = this - other
+ def min(other: FiniteDuration) = if (this < other) this else other
+ def max(other: FiniteDuration) = if (this > other) this else other
+
+ // overloaded methods taking Long so that you can calculate while statically staying finite
+
+ /**
+ * Return the quotient of this duration and the given integer factor.
+ *
+ * @throws ArithmeticException if the factor is 0
+ */
+ def /(divisor: Long) = fromNanos(toNanos / divisor)
+
+ /**
+ * Return the product of this duration and the given integer factor.
+ *
+ * @throws IllegalArgumentException if the result would overflow the range of FiniteDuration
+ */
+ def *(factor: Long) = new FiniteDuration(safeMul(length, factor), unit)
+
+ /*
+ * This method avoids the use of Long division, which saves 95% of the time spent,
+ * by checking that there are enough leading zeros so that the result has a chance
+ * to fit into a Long again; the remaining edge cases are caught by using the sign
+ * of the product for overflow detection.
+ *
+ * This method is not general purpose because it disallows the (otherwise legal)
+ * case of Long.MinValue * 1, but that is okay for use in FiniteDuration, since
+ * Long.MinValue is not a legal `length` anyway.
+ */
+ private def safeMul(_a: Long, _b: Long): Long = {
+ val a = math.abs(_a)
+ val b = math.abs(_b)
+ import java.lang.Long.{ numberOfLeadingZeros => leading }
+ if (leading(a) + leading(b) < 64) throw new IllegalArgumentException("multiplication overflow")
+ val product = a * b
+ if (product < 0) throw new IllegalArgumentException("multiplication overflow")
+ if (a == _a ^ b == _b) -product else product
+ }
+
+ /**
+ * Return the quotient of this duration and the given integer factor.
+ *
+ * @throws ArithmeticException if the factor is 0
+ */
+ def div(divisor: Long) = this / divisor
+
+ /**
+ * Return the product of this duration and the given integer factor.
+ *
+ * @throws IllegalArgumentException if the result would overflow the range of FiniteDuration
+ */
+ def mul(factor: Long) = this * factor
+
+ def unary_- = Duration(-length, unit)
+
+ final def isFinite() = true
+
+ override def equals(other: Any) = other match {
+ case x: FiniteDuration => toNanos == x.toNanos
+ case _ => super.equals(other)
+ }
+ override def hashCode = toNanos.toInt
+}
diff --git a/src/library/scala/concurrent/duration/DurationConversions.scala b/src/library/scala/concurrent/duration/DurationConversions.scala
new file mode 100644
index 0000000000..2e5ff4752b
--- /dev/null
+++ b/src/library/scala/concurrent/duration/DurationConversions.scala
@@ -0,0 +1,92 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.duration
+
+import DurationConversions._
+
+// Would be nice to limit the visibility of this trait a little bit,
+// but it crashes scalac to do so.
+trait DurationConversions {
+ protected def durationIn(unit: TimeUnit): FiniteDuration
+
+ def nanoseconds = durationIn(NANOSECONDS)
+ def nanos = nanoseconds
+ def nanosecond = nanoseconds
+ def nano = nanoseconds
+
+ def microseconds = durationIn(MICROSECONDS)
+ def micros = microseconds
+ def microsecond = microseconds
+ def micro = microseconds
+
+ def milliseconds = durationIn(MILLISECONDS)
+ def millis = milliseconds
+ def millisecond = milliseconds
+ def milli = milliseconds
+
+ def seconds = durationIn(SECONDS)
+ def second = seconds
+
+ def minutes = durationIn(MINUTES)
+ def minute = minutes
+
+ def hours = durationIn(HOURS)
+ def hour = hours
+
+ def days = durationIn(DAYS)
+ def day = days
+
+ def nanoseconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(nanoseconds)
+ def nanos[C](c: C)(implicit ev: Classifier[C]): ev.R = nanoseconds(c)
+ def nanosecond[C](c: C)(implicit ev: Classifier[C]): ev.R = nanoseconds(c)
+ def nano[C](c: C)(implicit ev: Classifier[C]): ev.R = nanoseconds(c)
+
+ def microseconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(microseconds)
+ def micros[C](c: C)(implicit ev: Classifier[C]): ev.R = microseconds(c)
+ def microsecond[C](c: C)(implicit ev: Classifier[C]): ev.R = microseconds(c)
+ def micro[C](c: C)(implicit ev: Classifier[C]): ev.R = microseconds(c)
+
+ def milliseconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(milliseconds)
+ def millis[C](c: C)(implicit ev: Classifier[C]): ev.R = milliseconds(c)
+ def millisecond[C](c: C)(implicit ev: Classifier[C]): ev.R = milliseconds(c)
+ def milli[C](c: C)(implicit ev: Classifier[C]): ev.R = milliseconds(c)
+
+ def seconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(seconds)
+ def second[C](c: C)(implicit ev: Classifier[C]): ev.R = seconds(c)
+
+ def minutes[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(minutes)
+ def minute[C](c: C)(implicit ev: Classifier[C]): ev.R = minutes(c)
+
+ def hours[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(hours)
+ def hour[C](c: C)(implicit ev: Classifier[C]): ev.R = hours(c)
+
+ def days[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(days)
+ def day[C](c: C)(implicit ev: Classifier[C]): ev.R = days(c)
+}
+
+/**
+ * This object just holds some cogs which make the DSL machine work, not for direct consumption.
+ */
+object DurationConversions {
+ trait Classifier[C] {
+ type R
+ def convert(d: FiniteDuration): R
+ }
+
+ implicit object spanConvert extends Classifier[span.type] {
+ type R = FiniteDuration
+ def convert(d: FiniteDuration) = d
+ }
+
+ implicit object fromNowConvert extends Classifier[fromNow.type] {
+ type R = Deadline
+ def convert(d: FiniteDuration) = Deadline.now + d
+ }
+
+}
diff --git a/src/library/scala/concurrent/duration/package.scala b/src/library/scala/concurrent/duration/package.scala
new file mode 100644
index 0000000000..1b3461414e
--- /dev/null
+++ b/src/library/scala/concurrent/duration/package.scala
@@ -0,0 +1,79 @@
+package scala.concurrent
+
+package object duration {
+ /**
+ * This object can be used as closing token if you prefer dot-less style but do not want
+ * to enable language.postfixOps:
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ *
+ * val duration = 2 seconds span
+ * }}}
+ */
+ object span
+
+ /**
+ * This object can be used as closing token for declaring a deadline at some future point
+ * in time:
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ *
+ * val deadline = 3 seconds fromNow
+ * }}}
+ */
+ object fromNow
+
+ type TimeUnit = java.util.concurrent.TimeUnit
+ final val DAYS = java.util.concurrent.TimeUnit.DAYS
+ final val HOURS = java.util.concurrent.TimeUnit.HOURS
+ final val MICROSECONDS = java.util.concurrent.TimeUnit.MICROSECONDS
+ final val MILLISECONDS = java.util.concurrent.TimeUnit.MILLISECONDS
+ final val MINUTES = java.util.concurrent.TimeUnit.MINUTES
+ final val NANOSECONDS = java.util.concurrent.TimeUnit.NANOSECONDS
+ final val SECONDS = java.util.concurrent.TimeUnit.SECONDS
+
+ implicit def pairIntToDuration(p: (Int, TimeUnit)): Duration = Duration(p._1, p._2)
+ implicit def pairLongToDuration(p: (Long, TimeUnit)): FiniteDuration = Duration(p._1, p._2)
+ implicit def durationToPair(d: Duration): (Long, TimeUnit) = (d.length, d.unit)
+
+ final class DurationInt(val n: Int) extends DurationConversions {
+ override protected def durationIn(unit: TimeUnit): FiniteDuration = Duration(n, unit)
+ }
+ implicit def intToDurationInt(n: Int) = new DurationInt(n)
+
+ final class DurationLong(val n: Long) extends DurationConversions {
+ override protected def durationIn(unit: TimeUnit): FiniteDuration = Duration(n, unit)
+ }
+ implicit def longToDurationLong(n: Long) = new DurationLong(n)
+
+ final class DurationDouble(val d: Double) extends DurationConversions {
+ override protected def durationIn(unit: TimeUnit): FiniteDuration =
+ Duration(d, unit) match {
+ case f: FiniteDuration => f
+ case _ => throw new IllegalArgumentException("Duration DSL not applicable to " + d)
+ }
+ }
+ implicit def doubleToDurationDouble(d: Double) = new DurationDouble(d)
+
+ /*
+ * Avoid reflection based invocation by using non-duck type
+ */
+ final class IntMult(val i: Int) {
+ def *(d: Duration) = d * i
+ def *(d: FiniteDuration) = d * i
+ }
+ implicit def intToIntMult(i: Int) = new IntMult(i)
+
+ final class LongMult(val i: Long) {
+ def *(d: Duration) = d * i
+ def *(d: FiniteDuration) = d * i
+ }
+ implicit def longToLongMult(i: Long) = new LongMult(i)
+
+ final class DoubleMult(val f: Double) {
+ def *(d: Duration) = d * f
+ }
+ implicit def doubleToDoubleMult(f: Double) = new DoubleMult(f)
+}
diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java
new file mode 100644
index 0000000000..b8165b6cde
--- /dev/null
+++ b/src/library/scala/concurrent/impl/AbstractPromise.java
@@ -0,0 +1,40 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl;
+
+
+import scala.concurrent.util.Unsafe;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+
+
+abstract class AbstractPromise {
+ private volatile Object _ref;
+
+ final static long _refoffset;
+
+ static {
+ try {
+ _refoffset = Unsafe.instance.objectFieldOffset(AbstractPromise.class.getDeclaredField("_ref"));
+ } catch (Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ }
+
+ protected final boolean updateState(Object oldState, Object newState) {
+ return Unsafe.instance.compareAndSwapObject(this, _refoffset, oldState, newState);
+ }
+
+ protected final Object getState() {
+ return _ref;
+ }
+
+ protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+} \ No newline at end of file
diff --git a/src/library/scala/concurrent/impl/AdaptedRunnableAction.java b/src/library/scala/concurrent/impl/AdaptedRunnableAction.java
new file mode 100644
index 0000000000..3dff9a1895
--- /dev/null
+++ b/src/library/scala/concurrent/impl/AdaptedRunnableAction.java
@@ -0,0 +1,21 @@
+package scala.concurrent.impl;
+
+import scala.concurrent.forkjoin.ForkJoinTask;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Adaptor for Runnables without results
+ */
+final class AdaptedRunnableAction extends ForkJoinTask<Void>
+ /*implements RunnableFuture<Void>*/ {
+ final Runnable runnable;
+ AdaptedRunnableAction(Runnable runnable) {
+ if (runnable == null) throw new NullPointerException();
+ this.runnable = runnable;
+ }
+ public final Void getRawResult() { return null; }
+ public final void setRawResult(Void v) { }
+ public final boolean exec() { runnable.run(); return true; }
+ public final void run() { invoke(); }
+ private static final long serialVersionUID = 5232453952276885070L;
+}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
new file mode 100644
index 0000000000..9a03b1d5df
--- /dev/null
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -0,0 +1,135 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+
+
+import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor }
+import java.util.Collection
+import scala.concurrent.forkjoin._
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
+import scala.util.control.NonFatal
+
+
+
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {
+
+ val executor: Either[ForkJoinPool, Executor] = es match {
+ case null => createExecutor
+ case some => Right(some)
+ }
+
+ // Implement BlockContext on FJP threads
+ class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
+ def wire[T <: Thread](thread: T): T = {
+ thread.setDaemon(daemonic)
+ //Potentially set things like uncaught exception handler, name etc
+ thread
+ }
+
+ def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))
+
+ def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
+ var result: T = null.asInstanceOf[T]
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ @volatile var isdone = false
+ override def block(): Boolean = {
+ result = try thunk finally { isdone = true }
+ true
+ }
+ override def isReleasable = isdone
+ }, true)
+ result
+ }
+ })
+ }
+
+ def createExecutor: Either[ForkJoinPool, Executor] = {
+
+ def getInt(name: String, f: String => Int): Int =
+ try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors }
+ def range(floor: Int, desired: Int, ceiling: Int): Int =
+ if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling)
+
+ val desiredParallelism = range(
+ getInt("scala.concurrent.context.minThreads", _.toInt),
+ getInt("scala.concurrent.context.numThreads", {
+ case null | "" => Runtime.getRuntime.availableProcessors
+ case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
+ case other => other.toInt
+ }),
+ getInt("scala.concurrent.context.maxThreads", _.toInt))
+
+ val threadFactory = new DefaultThreadFactory(daemonic = true)
+
+ try {
+ val pool = new ForkJoinPool(
+ desiredParallelism,
+ threadFactory)
+ pool.setAsyncMode(true)
+ Left(pool)
+ } catch {
+ case NonFatal(t) =>
+ System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to ThreadPoolExecutor")
+ t.printStackTrace(System.err)
+ val exec = new ThreadPoolExecutor(
+ desiredParallelism,
+ desiredParallelism,
+ 5L,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue[Runnable],
+ threadFactory
+ )
+ exec.allowCoreThreadTimeOut(true)
+ Right(exec)
+ }
+ }
+
+ def execute(runnable: Runnable): Unit = executor match {
+ case Left(fj) =>
+ Thread.currentThread match {
+ case fjw: ForkJoinWorkerThread if fjw.getPool eq fj =>
+ (runnable match {
+ case fjt: ForkJoinTask[_] => fjt
+ case _ => new AdaptedRunnableAction(runnable)
+ }).fork
+ case _ => fj.execute(runnable)
+ }
+ case Right(generic) => generic execute runnable
+ }
+
+ def reportFailure(t: Throwable) = reporter(t)
+}
+
+
+private[concurrent] object ExecutionContextImpl {
+
+ def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
+
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
+ final def asExecutorService: ExecutorService = executor.right.asInstanceOf[ExecutorService]
+ override def execute(command: Runnable) = executor.right.get.execute(command)
+ override def shutdown() { asExecutorService.shutdown() }
+ override def shutdownNow() = asExecutorService.shutdownNow()
+ override def isShutdown = asExecutorService.isShutdown
+ override def isTerminated = asExecutorService.isTerminated
+ override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
+ override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
+ override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
+ override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
+ override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
+ override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
+ override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
+ override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
+ }
+}
+
+
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
new file mode 100644
index 0000000000..8c2a77c75f
--- /dev/null
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -0,0 +1,34 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+
+
+import scala.concurrent.ExecutionContext
+import scala.util.control.NonFatal
+import scala.util.{Try, Success, Failure}
+
+
+private[concurrent] object Future {
+ class PromiseCompletingRunnable[T](body: => T) extends Runnable {
+ val promise = new Promise.DefaultPromise[T]()
+
+ override def run() = {
+ promise complete {
+ try Success(body) catch { case NonFatal(e) => Failure(e) }
+ }
+ }
+ }
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
+ val runnable = new PromiseCompletingRunnable(body)
+ executor.execute(runnable)
+ runnable.promise.future
+ }
+}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
new file mode 100644
index 0000000000..e9da45a079
--- /dev/null
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -0,0 +1,174 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException }
+import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS }
+import scala.annotation.tailrec
+import scala.util.control.NonFatal
+import scala.util.{ Try, Success, Failure }
+
+private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
+ def future: this.type = this
+}
+
+/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`.
+ */
+private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable {
+ // must be filled in before running it
+ var value: Try[T] = null
+
+ override def run() = {
+ require(value ne null) // must set value to non-null before running!
+ try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
+ }
+
+ def executeWithValue(v: Try[T]): Unit = {
+ require(value eq null) // can't complete it twice
+ value = v
+ // Note that we cannot prepare the ExecutionContext at this point, since we might
+ // already be running on a different thread!
+ executor.execute(this)
+ }
+}
+
+private[concurrent] object Promise {
+
+ private def resolveTry[T](source: Try[T]): Try[T] = source match {
+ case Failure(t) => resolver(t)
+ case _ => source
+ }
+
+ private def resolver[T](throwable: Throwable): Try[T] = throwable match {
+ case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T])
+ case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t))
+ case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t))
+ case e: Error => Failure(new ExecutionException("Boxed Error", e))
+ case t => Failure(t)
+ }
+
+ /** Default promise implementation.
+ */
+ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
+ updateState(null, Nil) // Start at "No callbacks"
+
+ protected final def tryAwait(atMost: Duration): Boolean = {
+ @tailrec
+ def awaitUnsafe(deadline: Deadline, nextWait: FiniteDuration): Boolean = {
+ if (!isCompleted && nextWait > Duration.Zero) {
+ val ms = nextWait.toMillis
+ val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec
+
+ synchronized { if (!isCompleted) wait(ms, ns) }
+
+ awaitUnsafe(deadline, deadline.timeLeft)
+ } else
+ isCompleted
+ }
+ @tailrec
+ def awaitUnbounded(): Boolean = {
+ if (isCompleted) true
+ else {
+ synchronized { if (!isCompleted) wait() }
+ awaitUnbounded()
+ }
+ }
+
+ import Duration.Undefined
+ atMost match {
+ case u if u eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
+ case Duration.Inf => awaitUnbounded
+ case Duration.MinusInf => isCompleted
+ case f: FiniteDuration => if (f > Duration.Zero) awaitUnsafe(f.fromNow, f) else isCompleted
+ }
+ }
+
+ @throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
+ if (isCompleted || tryAwait(atMost)) this
+ else throw new TimeoutException("Futures timed out after [" + atMost + "]")
+
+ @throws(classOf[Exception])
+ def result(atMost: Duration)(implicit permit: CanAwait): T =
+ ready(atMost).value.get match {
+ case Failure(e) => throw e
+ case Success(r) => r
+ }
+
+ def value: Option[Try[T]] = getState match {
+ case c: Try[_] => Some(c.asInstanceOf[Try[T]])
+ case _ => None
+ }
+
+ override def isCompleted: Boolean = getState match { // Cheaper than boxing result into Option due to "def value"
+ case _: Try[_] => true
+ case _ => false
+ }
+
+ def tryComplete(value: Try[T]): Boolean = {
+ val resolved = resolveTry(value)
+ (try {
+ @tailrec
+ def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = {
+ getState match {
+ case raw: List[_] =>
+ val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
+ if (updateState(cur, v)) cur else tryComplete(v)
+ case _ => null
+ }
+ }
+ tryComplete(resolved)
+ } finally {
+ synchronized { notifyAll() } //Notify any evil blockers
+ }) match {
+ case null => false
+ case rs if rs.isEmpty => true
+ case rs => rs.foreach(r => r.executeWithValue(resolved)); true
+ }
+ }
+
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
+ val preparedEC = executor.prepare
+ val runnable = new CallbackRunnable[T](preparedEC, func)
+
+ @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
+ def dispatchOrAddCallback(): Unit =
+ getState match {
+ case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
+ case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback()
+ }
+ dispatchOrAddCallback()
+ }
+ }
+
+ /** An already completed Future is given its result at creation.
+ *
+ * Useful in Future-composition when a value to contribute is already available.
+ */
+ final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] {
+
+ val value = Some(resolveTry(suppliedValue))
+
+ override def isCompleted: Boolean = true
+
+ def tryComplete(value: Try[T]): Boolean = false
+
+ def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
+ val completedAs = value.get
+ val preparedEC = executor.prepare
+ (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs)
+ }
+
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get
+ }
+
+}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
new file mode 100644
index 0000000000..cc69b0011c
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala
@@ -0,0 +1,109 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala
+
+import scala.concurrent.duration.Duration
+import scala.annotation.implicitNotFound
+
+/** This package object contains primitives for concurrent and parallel programming.
+ */
+package object concurrent {
+ type ExecutionException = java.util.concurrent.ExecutionException
+ type CancellationException = java.util.concurrent.CancellationException
+ type TimeoutException = java.util.concurrent.TimeoutException
+
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asynchronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
+ def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
+
+ /** Creates a promise object which can be completed with a value or an exception.
+ *
+ * @tparam T the type of the value in the promise
+ * @return the newly created `Promise` object
+ */
+ def promise[T](): Promise[T] = Promise[T]()
+
+ /** Used to designate a piece of code which potentially blocks, allowing the current [[BlockContext]] to adjust
+ * the runtime's behavior.
+ * Properly marking blocking code may improve performance or avoid deadlocks.
+ *
+ * Blocking on an [[Awaitable]] should be done using [[Await.result]] instead of `blocking`.
+ *
+ * @param body A piece of code which contains potentially blocking or long running calls.
+ * @throws `CancellationException` if the computation was cancelled
+ * @throws `InterruptedException` in the case that a wait within the blocking `body` was interrupted
+ */
+ @throws(classOf[Exception])
+ def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)
+}
+
+package concurrent {
+ @implicitNotFound("Don't call `Awaitable` methods directly, use the `Await` object.")
+ sealed trait CanAwait
+
+ /**
+ * Internal usage only, implementation detail.
+ */
+ private[concurrent] object AwaitPermission extends CanAwait
+
+ /**
+ * `Await` is what is used to ensure proper handling of blocking for `Awaitable` instances.
+ */
+ object Await {
+ /**
+ * Await the "completed" state of an `Awaitable`.
+ *
+ * Although this method is blocking, the internal use of [[scala.concurrent.blocking blocking]] ensures that
+ * the underlying [[ExecutionContext]] is prepared to properly manage the blocking.
+ *
+ * @param awaitable
+ * the `Awaitable` to be awaited
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the `awaitable`
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
+ */
+ @throws(classOf[TimeoutException])
+ @throws(classOf[InterruptedException])
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): /*awaitable.type*/Awaitable[T] =
+ blocking(awaitable.ready(atMost)(AwaitPermission))
+
+ /**
+ * Await and return the result (of type `T`) of an `Awaitable`.
+ *
+ * Although this method is blocking, the internal use of [[scala.concurrent.blocking blocking]] ensures that
+ * the underlying [[ExecutionContext]] to properly detect blocking and ensure that there are no deadlocks.
+ *
+ * @param awaitable
+ * the `Awaitable` to be awaited
+ * @param atMost
+ * maximum wait time, which may be negative (no waiting is done),
+ * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive
+ * duration
+ * @return the result value if `awaitable` is completed within the specific maximum wait time
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ * @throws TimeoutException if after waiting for the specified time `awaitable` is still not ready
+ * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]]
+ */
+ @throws(classOf[Exception])
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T =
+ blocking(awaitable.result(atMost)(AwaitPermission))
+ }
+}
diff --git a/src/library/scala/concurrent/util/Unsafe.java b/src/library/scala/concurrent/util/Unsafe.java
new file mode 100644
index 0000000000..ef893c94d9
--- /dev/null
+++ b/src/library/scala/concurrent/util/Unsafe.java
@@ -0,0 +1,35 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.util;
+
+
+
+import java.lang.reflect.Field;
+
+
+
+public final class Unsafe {
+ public final static sun.misc.Unsafe instance;
+ static {
+ try {
+ sun.misc.Unsafe found = null;
+ for(Field field : sun.misc.Unsafe.class.getDeclaredFields()) {
+ if (field.getType() == sun.misc.Unsafe.class) {
+ field.setAccessible(true);
+ found = (sun.misc.Unsafe) field.get(null);
+ break;
+ }
+ }
+ if (found == null) throw new IllegalStateException("Can't find instance of sun.misc.Unsafe");
+ else instance = found;
+ } catch(Throwable t) {
+ throw new ExceptionInInitializerError(t);
+ }
+ }
+}
diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala
new file mode 100644
index 0000000000..4bcccb6969
--- /dev/null
+++ b/src/library/scala/util/Try.scala
@@ -0,0 +1,216 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2008-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.util
+
+import scala.collection.Seq
+import scala.util.control.NonFatal
+
+/**
+ * The `Try` type represents a computation that may either result in an exception, or return a
+ * successfully computed value. It's similar to, but semantically different from the [[scala.util.Either]] type.
+ *
+ * Instances of `Try[T]`, are either an instance of [[scala.util.Success]][T] or [[scala.util.Failure]][T].
+ *
+ * For example, `Try` can be used to perform division on a user-defined input, without the need to do explicit
+ * exception-handling in all of the places that an exception might occur.
+ *
+ * Example:
+ * {{{
+ * import scala.util.{Try, Success, Failure}
+ *
+ * def divide: Try[Int] = {
+ * val dividend = Try(Console.readLine("Enter an Int that you'd like to divide:\n").toInt)
+ * val divisor = Try(Console.readLine("Enter an Int that you'd like to divide by:\n").toInt)
+ * val problem = dividend.flatMap(x => divisor.map(y => x/y))
+ * problem match {
+ * case Success(v) =>
+ * println("Result of " + dividend.get + "/"+ divisor.get +" is: " + v)
+ * Success(v)
+ * case Failure(e) =>
+ * println("You must've divided by zero or entered something that's not an Int. Try again!")
+ * println("Info from the exception: " + e.getMessage)
+ * divide
+ * }
+ * }
+ *
+ * }}}
+ *
+ * An important property of `Try` shown in the above example is its ability to ''pipeline'', or chain, operations,
+ * catching exceptions along the way. The `flatMap` and `map` combinators in the above example each essentially
+ * pass off either their successfully completed value, wrapped in the `Success` type for it to be further operated
+ * upon by the next combinator in the chain, or the exception wrapped in the `Failure` type usually to be simply
+ * passed on down the chain. Combinators such as `rescue` and `recover` are designed to provide some type of
+ * default behavior in the case of failure.
+ *
+ * ''Note'': only non-fatal exceptions are caught by the combinators on `Try` (see [[scala.util.control.NonFatal]]).
+ * Serious system errors, on the other hand, will be thrown.
+ *
+ * ''Note:'': all Try combinators will catch exceptions and return failure unless otherwise specified in the documentation.
+ *
+ * `Try` comes to the Scala standard library after years of use as an integral part of Twitter's stack.
+ *
+ * @author based on Twitter's original implementation in com.twitter.util.
+ * @since 2.10
+ */
+sealed abstract class Try[+T] {
+
+ /** Returns `true` if the `Try` is a `Failure`, `false` otherwise.
+ */
+ def isFailure: Boolean
+
+ /** Returns `true` if the `Try` is a `Success`, `false` otherwise.
+ */
+ def isSuccess: Boolean
+
+ /** Returns the value from this `Success` or the given `default` argument if this is a `Failure`.
+ *
+ * ''Note:'': This will throw an exception if it is not a success and default throws an exception.
+ */
+ def getOrElse[U >: T](default: => U): U =
+ if (isSuccess) get else default
+
+ /** Returns this `Try` if it's a `Success` or the given `default` argument if this is a `Failure`.
+ */
+ def orElse[U >: T](default: => Try[U]): Try[U] =
+ try if (isSuccess) this else default
+ catch {
+ case NonFatal(e) => Failure(e)
+ }
+
+ /** Returns the value from this `Success` or throws the exception if this is a `Failure`.
+ */
+ def get: T
+
+ /**
+ * Applies the given function `f` if this is a `Success`, otherwise returns `Unit` if this is a `Failure`.
+ *
+ * ''Note:'' If `f` throws, then this method may throw an exception.
+ */
+ def foreach[U](f: T => U): Unit
+
+ /**
+ * Returns the given function applied to the value from this `Success` or returns this if this is a `Failure`.
+ */
+ def flatMap[U](f: T => Try[U]): Try[U]
+
+ /**
+ * Maps the given function to the value from this `Success` or returns this if this is a `Failure`.
+ */
+ def map[U](f: T => U): Try[U]
+
+ /**
+ * Converts this to a `Failure` if the predicate is not satisfied.
+ */
+ def filter(p: T => Boolean): Try[T]
+
+ /**
+ * Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`.
+ * This is like `flatMap` for the exception.
+ */
+ def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U]
+
+ /**
+ * Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`.
+ * This is like map for the exception.
+ */
+ def recover[U >: T](f: PartialFunction[Throwable, U]): Try[U]
+
+ /**
+ * Returns `None` if this is a `Failure` or a `Some` containing the value if this is a `Success`.
+ */
+ def toOption: Option[T] = if (isSuccess) Some(get) else None
+
+ /**
+ * Transforms a nested `Try`, ie, a `Try` of type `Try[Try[T]]`,
+ * into an un-nested `Try`, ie, a `Try` of type `Try[T]`.
+ */
+ def flatten[U](implicit ev: T <:< Try[U]): Try[U]
+
+ /**
+ * Completes this `Try` with an exception wrapped in a `Success`. The exception is either the exception that the
+ * `Try` failed with (if a `Failure`) or an `UnsupportedOperationException`.
+ */
+ def failed: Try[Throwable]
+
+ /** Completes this `Try` by applying the function `f` to this if this is of type `Failure`, or conversely, by applying
+ * `s` if this is a `Success`.
+ */
+ def transform[U](s: T => Try[U], f: Throwable => Try[U]): Try[U] =
+ try this match {
+ case Success(v) => s(v)
+ case Failure(e) => f(e)
+ } catch {
+ case NonFatal(e) => Failure(e)
+ }
+
+}
+
+object Try {
+ /** Constructs a `Try` using the by-name parameter. This
+ * method will ensure any non-fatal exception is caught and a
+ * `Failure` object is returned.
+ */
+ def apply[T](r: => T): Try[T] =
+ try Success(r) catch {
+ case NonFatal(e) => Failure(e)
+ }
+
+}
+
+final case class Failure[+T](val exception: Throwable) extends Try[T] {
+ def isFailure: Boolean = true
+ def isSuccess: Boolean = false
+ def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] =
+ try {
+ if (f isDefinedAt exception) f(exception) else this
+ } catch {
+ case NonFatal(e) => Failure(e)
+ }
+ def get: T = throw exception
+ def flatMap[U](f: T => Try[U]): Try[U] = this.asInstanceOf[Try[U]]
+ def flatten[U](implicit ev: T <:< Try[U]): Try[U] = this.asInstanceOf[Try[U]]
+ def foreach[U](f: T => U): Unit = ()
+ def map[U](f: T => U): Try[U] = this.asInstanceOf[Try[U]]
+ def filter(p: T => Boolean): Try[T] = this
+ def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U] =
+ try {
+ if (rescueException isDefinedAt exception) {
+ Try(rescueException(exception))
+ } else this
+ } catch {
+ case NonFatal(e) => Failure(e)
+ }
+ def failed: Try[Throwable] = Success(exception)
+}
+
+
+final case class Success[+T](value: T) extends Try[T] {
+ def isFailure: Boolean = false
+ def isSuccess: Boolean = true
+ def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = this
+ def get = value
+ def flatMap[U](f: T => Try[U]): Try[U] =
+ try f(value)
+ catch {
+ case NonFatal(e) => Failure(e)
+ }
+ def flatten[U](implicit ev: T <:< Try[U]): Try[U] = value
+ def foreach[U](f: T => U): Unit = f(value)
+ def map[U](f: T => U): Try[U] = Try[U](f(value))
+ def filter(p: T => Boolean): Try[T] = {
+ try {
+ if (p(value)) this
+ else Failure(new NoSuchElementException("Predicate does not hold for " + value))
+ } catch {
+ case NonFatal(e) => Failure(e)
+ }
+ }
+ def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U] = this
+ def failed: Try[Throwable] = Failure(new UnsupportedOperationException("Success.failed"))
+}
diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala
new file mode 100644
index 0000000000..f364b3504c
--- /dev/null
+++ b/src/library/scala/util/control/NonFatal.scala
@@ -0,0 +1,45 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.util.control
+
+/**
+ * Extractor of non-fatal Throwables. Will not match fatal errors like `VirtualMachineError`
+ * (for example, `OutOfMemoryError`, a subclass of `VirtualMachineError`), `ThreadDeath`,
+ * `LinkageError`, `InterruptedException`, `ControlThrowable`, or `NotImplementedError`.
+ * However, `StackOverflowError` is matched, i.e. considered non-fatal.
+ *
+ * Note that [[scala.util.control.ControlThrowable]], an internal Throwable, is not matched by
+ * `NonFatal` (and would therefore be thrown).
+ *
+ * For example, all harmless Throwables can be caught by:
+ * {{{
+ * try {
+ * // dangerous stuff
+ * } catch {
+ * case NonFatal(e) => log.error(e, "Something not that bad.")
+ * // or
+ * case e if NonFatal(e) => log.error(e, "Something not that bad.")
+ * }
+ * }}}
+ */
+object NonFatal {
+ /**
+ * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal
+ */
+ def apply(t: Throwable): Boolean = t match {
+ case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError
+ // VirtualMachineError includes OutOfMemoryError and other fatal errors
+ case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable => false
+ case _ => true
+ }
+ /**
+ * Returns Some(t) if NonFatal(t) == true, otherwise None
+ */
+ def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None
+}
diff --git a/test/files/jvm/future-spec.flags b/test/files/jvm/future-spec.flags
new file mode 100644
index 0000000000..a7ed61704f
--- /dev/null
+++ b/test/files/jvm/future-spec.flags
@@ -0,0 +1 @@
+-Ydependent-method-types
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala
new file mode 100644
index 0000000000..e5e3ef9e16
--- /dev/null
+++ b/test/files/jvm/future-spec/FutureTests.scala
@@ -0,0 +1,524 @@
+
+
+
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.concurrent.duration.Duration.Inf
+import scala.collection._
+import scala.runtime.NonLocalReturnControl
+import scala.util.{Try,Success,Failure}
+
+
+
+object FutureTests extends MinimalScalaTest {
+
+ /* some utils */
+
+ def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match {
+ case "Hello" => future { "World" }
+ case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance"))
+ case "NoReply" => Promise[String]().future
+ }
+
+ val defaultTimeout = 5 seconds
+
+ /* future specification */
+
+ "A future with custom ExecutionContext" should {
+ "shouldHandleThrowables" in {
+ val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable]
+ val threadPool = java.util.concurrent.Executors.newFixedThreadPool(4)
+ implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(threadPool, {
+ t =>
+ ms += t
+ })
+
+ class ThrowableTest(m: String) extends Throwable(m)
+
+ val f1 = future[Any] {
+ throw new ThrowableTest("test")
+ }
+
+ intercept[ThrowableTest] {
+ Await.result(f1, defaultTimeout)
+ }
+
+ val latch = new TestLatch
+ val f2 = future {
+ Await.ready(latch, 5 seconds)
+ "success"
+ }
+ val f3 = f2 map { s => s.toUpperCase }
+
+ f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
+ f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
+
+ latch.open()
+
+ Await.result(f2, defaultTimeout) mustBe ("success")
+
+ f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
+ f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
+
+ Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
+
+ val waiting = future {
+ Thread.sleep(1000)
+ }
+ Await.ready(waiting, 2000 millis)
+
+ ms.size mustBe (4)
+ //FIXME should check
+ threadPool.shutdown()
+ }
+ }
+
+ "A future with global ExecutionContext" should {
+ import ExecutionContext.Implicits._
+
+ "compose with for-comprehensions" in {
+ def async(x: Int) = future { (x * 2).toString }
+ val future0 = future[Any] {
+ "five!".length
+ }
+
+ val future1 = for {
+ a <- future0.mapTo[Int] // returns 5
+ b <- async(a) // returns "10"
+ c <- async(7) // returns "14"
+ } yield b + "-" + c
+
+ val future2 = for {
+ a <- future0.mapTo[Int]
+ b <- (future { (a * 2).toString }).mapTo[Int]
+ c <- future { (7 * 2).toString }
+ } yield b + "-" + c
+
+ Await.result(future1, defaultTimeout) mustBe ("10-14")
+ assert(checkType(future1, manifest[String]))
+ intercept[ClassCastException] { Await.result(future2, defaultTimeout) }
+ }
+
+ "support pattern matching within a for-comprehension" in {
+ case class Req[T](req: T)
+ case class Res[T](res: T)
+ def async[T](req: Req[T]) = req match {
+ case Req(s: String) => future { Res(s.length) }
+ case Req(i: Int) => future { Res((i * 2).toString) }
+ }
+
+ val future1 = for {
+ Res(a: Int) <- async(Req("Hello"))
+ Res(b: String) <- async(Req(a))
+ Res(c: String) <- async(Req(7))
+ } yield b + "-" + c
+
+ val future2 = for {
+ Res(a: Int) <- async(Req("Hello"))
+ Res(b: Int) <- async(Req(a))
+ Res(c: Int) <- async(Req(7))
+ } yield b + "-" + c
+
+ Await.result(future1, defaultTimeout) mustBe ("10-14")
+ intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) }
+ }
+
+ "recover from exceptions" in {
+ val future1 = Future(5)
+ val future2 = future1 map (_ / 0)
+ val future3 = future2 map (_.toString)
+
+ val future4 = future1 recover {
+ case e: ArithmeticException => 0
+ } map (_.toString)
+
+ val future5 = future2 recover {
+ case e: ArithmeticException => 0
+ } map (_.toString)
+
+ val future6 = future2 recover {
+ case e: MatchError => 0
+ } map (_.toString)
+
+ val future7 = future3 recover {
+ case e: ArithmeticException => "You got ERROR"
+ }
+
+ val future8 = testAsync("Failure")
+ val future9 = testAsync("Failure") recover {
+ case e: RuntimeException => "FAIL!"
+ }
+ val future10 = testAsync("Hello") recover {
+ case e: RuntimeException => "FAIL!"
+ }
+ val future11 = testAsync("Failure") recover {
+ case _ => "Oops!"
+ }
+
+ Await.result(future1, defaultTimeout) mustBe (5)
+ intercept[ArithmeticException] { Await.result(future2, defaultTimeout) }
+ intercept[ArithmeticException] { Await.result(future3, defaultTimeout) }
+ Await.result(future4, defaultTimeout) mustBe ("5")
+ Await.result(future5, defaultTimeout) mustBe ("0")
+ intercept[ArithmeticException] { Await.result(future6, defaultTimeout) }
+ Await.result(future7, defaultTimeout) mustBe ("You got ERROR")
+ intercept[RuntimeException] { Await.result(future8, defaultTimeout) }
+ Await.result(future9, defaultTimeout) mustBe ("FAIL!")
+ Await.result(future10, defaultTimeout) mustBe ("World")
+ Await.result(future11, defaultTimeout) mustBe ("Oops!")
+ }
+
+ "recoverWith from exceptions" in {
+ val o = new IllegalStateException("original")
+ val r = new IllegalStateException("recovered")
+
+ intercept[IllegalStateException] {
+ val failed = Future.failed[String](o) recoverWith {
+ case _ if false == true => Future.successful("yay!")
+ }
+ Await.result(failed, defaultTimeout)
+ } mustBe (o)
+
+ val recovered = Future.failed[String](o) recoverWith {
+ case _ => Future.successful("yay!")
+ }
+ Await.result(recovered, defaultTimeout) mustBe ("yay!")
+
+ intercept[IllegalStateException] {
+ val refailed = Future.failed[String](o) recoverWith {
+ case _ => Future.failed[String](r)
+ }
+ Await.result(refailed, defaultTimeout)
+ } mustBe (r)
+ }
+
+ "andThen like a boss" in {
+ val q = new java.util.concurrent.LinkedBlockingQueue[Int]
+ for (i <- 1 to 1000) {
+ val chained = future {
+ q.add(1); 3
+ } andThen {
+ case _ => q.add(2)
+ } andThen {
+ case Success(0) => q.add(Int.MaxValue)
+ } andThen {
+ case _ => q.add(3);
+ }
+ Await.result(chained, defaultTimeout) mustBe (3)
+ q.poll() mustBe (1)
+ q.poll() mustBe (2)
+ q.poll() mustBe (3)
+ q.clear()
+ }
+ }
+
+ "firstCompletedOf" in {
+ def futures = Vector.fill[Future[Int]](10) {
+ Promise[Int]().future
+ } :+ Future.successful[Int](5)
+
+ Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
+ Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5)
+ }
+
+ "find" in {
+ val futures = for (i <- 1 to 10) yield future {
+ i
+ }
+
+ val result = Future.find[Int](futures)(_ == 3)
+ Await.result(result, defaultTimeout) mustBe (Some(3))
+
+ val notFound = Future.find[Int](futures.iterator)(_ == 11)
+ Await.result(notFound, defaultTimeout) mustBe (None)
+ }
+
+ "zip" in {
+ val timeout = 10000 millis
+ val f = new IllegalStateException("test")
+ intercept[IllegalStateException] {
+ val failed = Future.failed[String](f) zip Future.successful("foo")
+ Await.result(failed, timeout)
+ } mustBe (f)
+
+ intercept[IllegalStateException] {
+ val failed = Future.successful("foo") zip Future.failed[String](f)
+ Await.result(failed, timeout)
+ } mustBe (f)
+
+ intercept[IllegalStateException] {
+ val failed = Future.failed[String](f) zip Future.failed[String](f)
+ Await.result(failed, timeout)
+ } mustBe (f)
+
+ val successful = Future.successful("foo") zip Future.successful("foo")
+ Await.result(successful, timeout) mustBe (("foo", "foo"))
+ }
+
+ "fold" in {
+ val timeout = 10000 millis
+ def async(add: Int, wait: Int) = future {
+ Thread.sleep(wait)
+ add
+ }
+
+ val futures = (0 to 9) map {
+ idx => async(idx, idx * 20)
+ }
+ val folded = Future.fold(futures)(0)(_ + _)
+ Await.result(folded, timeout) mustBe (45)
+
+ val futuresit = (0 to 9) map {
+ idx => async(idx, idx * 20)
+ }
+ val foldedit = Future.fold(futures)(0)(_ + _)
+ Await.result(foldedit, timeout) mustBe (45)
+ }
+
+ "fold by composing" in {
+ val timeout = 10000 millis
+ def async(add: Int, wait: Int) = future {
+ Thread.sleep(wait)
+ add
+ }
+ def futures = (0 to 9) map {
+ idx => async(idx, idx * 20)
+ }
+ val folded = futures.foldLeft(Future(0)) {
+ case (fr, fa) => for (r <- fr; a <- fa) yield (r + a)
+ }
+ Await.result(folded, timeout) mustBe (45)
+ }
+
+ "fold with an exception" in {
+ val timeout = 10000 millis
+ def async(add: Int, wait: Int) = future {
+ Thread.sleep(wait)
+ if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
+ add
+ }
+ def futures = (0 to 9) map {
+ idx => async(idx, idx * 10)
+ }
+ val folded = Future.fold(futures)(0)(_ + _)
+ intercept[IllegalArgumentException] {
+ Await.result(folded, timeout)
+ }.getMessage mustBe ("shouldFoldResultsWithException: expected")
+ }
+
+ "fold mutable zeroes safely" in {
+ import scala.collection.mutable.ArrayBuffer
+ def test(testNumber: Int) {
+ val fs = (0 to 1000) map (i => Future(i))
+ val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) {
+ case (l, i) if i % 2 == 0 => l += i.asInstanceOf[AnyRef]
+ case (l, _) => l
+ }
+ val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum
+
+ assert(result == 250500)
+ }
+
+ (1 to 100) foreach test //Make sure it tries to provoke the problem
+ }
+
+ "return zero value if folding empty list" in {
+ val zero = Future.fold(List[Future[Int]]())(0)(_ + _)
+ Await.result(zero, defaultTimeout) mustBe (0)
+ }
+
+ "shouldReduceResults" in {
+ def async(idx: Int) = future {
+ Thread.sleep(idx * 20)
+ idx
+ }
+ val timeout = 10000 millis
+
+ val futures = (0 to 9) map { async }
+ val reduced = Future.reduce(futures)(_ + _)
+ Await.result(reduced, timeout) mustBe (45)
+
+ val futuresit = (0 to 9) map { async }
+ val reducedit = Future.reduce(futuresit)(_ + _)
+ Await.result(reducedit, timeout) mustBe (45)
+ }
+
+ "shouldReduceResultsWithException" in {
+ def async(add: Int, wait: Int) = future {
+ Thread.sleep(wait)
+ if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
+ else add
+ }
+ val timeout = 10000 millis
+ def futures = (1 to 10) map {
+ idx => async(idx, idx * 10)
+ }
+ val failed = Future.reduce(futures)(_ + _)
+ intercept[IllegalArgumentException] {
+ Await.result(failed, timeout)
+ }.getMessage mustBe ("shouldFoldResultsWithException: expected")
+ }
+
+ "shouldReduceThrowNSEEOnEmptyInput" in {
+ intercept[java.util.NoSuchElementException] {
+ val emptyreduced = Future.reduce(List[Future[Int]]())(_ + _)
+ Await.result(emptyreduced, defaultTimeout)
+ }
+ }
+
+ "shouldTraverseFutures" in {
+ object counter {
+ var count = -1
+ def incAndGet() = counter.synchronized {
+ count += 2
+ count
+ }
+ }
+
+ val oddFutures = List.fill(100)(future { counter.incAndGet() })
+ val traversed = Future.sequence(oddFutures)
+ Await.result(traversed, defaultTimeout).sum mustBe (10000)
+
+ val list = (1 to 100).toList
+ val traversedList = Future.traverse(list)(x => Future(x * 2 - 1))
+ Await.result(traversedList, defaultTimeout).sum mustBe (10000)
+ }
+
+ "shouldBlockUntilResult" in {
+ val latch = new TestLatch
+
+ val f = future {
+ Await.ready(latch, 5 seconds)
+ 5
+ }
+ val f2 = future {
+ val res = Await.result(f, Inf)
+ res + 9
+ }
+
+ intercept[TimeoutException] {
+ Await.ready(f2, 100 millis)
+ }
+
+ latch.open()
+
+ Await.result(f2, defaultTimeout) mustBe (14)
+
+ val f3 = future {
+ Thread.sleep(100)
+ 5
+ }
+
+ intercept[TimeoutException] {
+ Await.ready(f3, 0 millis)
+ }
+ }
+
+ "run callbacks async" in {
+ val latch = Vector.fill(10)(new TestLatch)
+
+ val f1 = future {
+ latch(0).open()
+ Await.ready(latch(1), TestLatch.DefaultTimeout)
+ "Hello"
+ }
+ val f2 = f1 map {
+ s =>
+ latch(2).open()
+ Await.ready(latch(3), TestLatch.DefaultTimeout)
+ s.length
+ }
+ for (_ <- f2) latch(4).open()
+
+ Await.ready(latch(0), TestLatch.DefaultTimeout)
+
+ f1.isCompleted mustBe (false)
+ f2.isCompleted mustBe (false)
+
+ latch(1).open()
+ Await.ready(latch(2), TestLatch.DefaultTimeout)
+
+ f1.isCompleted mustBe (true)
+ f2.isCompleted mustBe (false)
+
+ val f3 = f1 map {
+ s =>
+ latch(5).open()
+ Await.ready(latch(6), TestLatch.DefaultTimeout)
+ s.length * 2
+ }
+ for (_ <- f3) latch(3).open()
+
+ Await.ready(latch(5), TestLatch.DefaultTimeout)
+
+ f3.isCompleted mustBe (false)
+
+ latch(6).open()
+ Await.ready(latch(4), TestLatch.DefaultTimeout)
+
+ f2.isCompleted mustBe (true)
+ f3.isCompleted mustBe (true)
+
+ val p1 = Promise[String]()
+ val f4 = p1.future map {
+ s =>
+ latch(7).open()
+ Await.ready(latch(8), TestLatch.DefaultTimeout)
+ s.length
+ }
+ for (_ <- f4) latch(9).open()
+
+ p1.future.isCompleted mustBe (false)
+ f4.isCompleted mustBe (false)
+
+ p1 complete Success("Hello")
+
+ Await.ready(latch(7), TestLatch.DefaultTimeout)
+
+ p1.future.isCompleted mustBe (true)
+ f4.isCompleted mustBe (false)
+
+ latch(8).open()
+ Await.ready(latch(9), TestLatch.DefaultTimeout)
+
+ Await.ready(f4, defaultTimeout)
+ f4.isCompleted mustBe (true)
+ }
+
+ "should not deadlock with nested await (ticket 1313)" in {
+ val simple = Future() map {
+ _ =>
+ val unit = Future(())
+ val umap = unit map { _ => () }
+ Await.result(umap, Inf)
+ }
+ Await.ready(simple, Inf)
+ simple.isCompleted mustBe (true)
+
+ val l1, l2 = new TestLatch
+ val complex = Future() map {
+ _ =>
+ blocking {
+ val nested = Future(())
+ for (_ <- nested) l1.open()
+ Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
+ for (_ <- nested) l2.open()
+ Await.ready(l2, TestLatch.DefaultTimeout)
+ }
+ }
+ Await.ready(complex, defaultTimeout)
+ complex.isCompleted mustBe (true)
+ }
+
+ "should not throw when Await.ready" in {
+ val expected = try Success(5 / 0) catch { case a: ArithmeticException => Failure(a) }
+ val f = future(5).map(_ / 0)
+ Await.ready(f, defaultTimeout)
+ f.value.get.toString mustBe expected.toString
+ }
+
+ }
+
+}
+
+
diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala
new file mode 100644
index 0000000000..0bd48f9dd6
--- /dev/null
+++ b/test/files/jvm/future-spec/PromiseTests.scala
@@ -0,0 +1,246 @@
+
+
+
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.concurrent.duration.Duration.Inf
+import scala.collection._
+import scala.runtime.NonLocalReturnControl
+import scala.util.{Try,Success,Failure}
+
+
+object PromiseTests extends MinimalScalaTest {
+ import ExecutionContext.Implicits._
+
+ val defaultTimeout = Inf
+
+ /* promise specification */
+
+ "An empty Promise" should {
+
+ "not be completed" in {
+ val p = Promise()
+ p.future.isCompleted mustBe (false)
+ p.isCompleted mustBe (false)
+ }
+
+ "have no value" in {
+ val p = Promise()
+ p.future.value mustBe (None)
+ p.isCompleted mustBe (false)
+ }
+
+ "return supplied value on timeout" in {
+ val failure = Promise.failed[String](new RuntimeException("br0ken")).future
+ val otherFailure = Promise.failed[String](new RuntimeException("last")).future
+ val empty = Promise[String]().future
+ val timedOut = Promise.successful[String]("Timedout").future
+
+ Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
+ Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout")
+ Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
+ intercept[RuntimeException] {
+ Await.result(failure fallbackTo otherFailure, defaultTimeout)
+ }.getMessage mustBe ("last")
+ }
+
+ }
+
+ "A successful Promise" should {
+ val result = "test value"
+ val promise = Promise[String]().complete(Success(result))
+ promise.isCompleted mustBe (true)
+ futureWithResult(_(promise.future, result))
+ }
+
+ "A failed Promise" should {
+ val message = "Expected Exception"
+ val promise = Promise[String]().complete(Failure(new RuntimeException(message)))
+ promise.isCompleted mustBe (true)
+ futureWithException[RuntimeException](_(promise.future, message))
+ }
+
+ "An interrupted Promise" should {
+ val message = "Boxed InterruptedException"
+ val future = Promise[String]().complete(Failure(new InterruptedException(message))).future
+ futureWithException[ExecutionException](_(future, message))
+ }
+
+ "A NonLocalReturnControl failed Promise" should {
+ val result = "test value"
+ val future = Promise[String]().complete(Failure(new NonLocalReturnControl[String]("test", result))).future
+ futureWithResult(_(future, result))
+ }
+
+ def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) {
+
+ "be completed" in { f((future, _) => future.isCompleted mustBe (true)) }
+
+ "contain a value" in { f((future, result) => future.value mustBe (Some(Success(result)))) }
+
+ "return when ready with 'Await.ready'" in { f((future, result) => { Await.ready(future, defaultTimeout); future.isCompleted mustBe (true) }) }
+
+ "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) }
+
+ "not timeout" in { f((future, _) => Await.ready(future, 0 millis)) }
+
+ "filter result" in {
+ f {
+ (future, result) =>
+ Await.result((future filter (_ => true)), defaultTimeout) mustBe (result)
+ intercept[NoSuchElementException] {
+ Await.result((future filter (_ => false)), defaultTimeout)
+ }
+ }
+ }
+
+ "transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) }
+
+ "compose result with flatMap" in {
+ f { (future, result) =>
+ val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p
+ Await.result(r, defaultTimeout) mustBe (result.toString + "foo")
+ }
+ }
+
+ "perform action with foreach" in {
+ f {
+ (future, result) =>
+ val p = Promise[Any]()
+ future foreach p.success
+ Await.result(p.future, defaultTimeout) mustBe (result)
+ }
+ }
+
+ "zip properly" in {
+ f {
+ (future, result) =>
+ Await.result(future zip Promise.successful("foo").future, defaultTimeout) mustBe ((result, "foo"))
+ intercept[RuntimeException] {
+ Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, defaultTimeout)
+ }.getMessage mustBe ("ohnoes")
+ }
+ }
+
+ "not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) }
+
+ "perform action on result" in {
+ f {
+ (future, result) =>
+ val p = Promise[Any]()
+ future.onSuccess { case x => p.success(x) }
+ Await.result(p.future, defaultTimeout) mustBe (result)
+ }
+ }
+
+ "not project a failure" in {
+ f {
+ (future, result) =>
+ intercept[NoSuchElementException] {
+ Await.result(future.failed, defaultTimeout)
+ }.getMessage mustBe ("Future.failed not completed with a throwable.")
+ }
+ }
+
+ "cast using mapTo" in {
+ f {
+ (future, result) =>
+ Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException => false }), defaultTimeout) mustBe (false)
+ }
+ }
+
+ }
+
+ def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) {
+
+ "be completed" in {
+ f((future, _) => future.isCompleted mustBe (true))
+ }
+
+ "contain a value" in {
+ f((future, message) => {
+ future.value.get.failed.get.getMessage mustBe (message)
+ })
+ }
+
+ "throw not throw exception with 'Await.ready'" in {
+ f {
+ (future, message) => Await.ready(future, defaultTimeout); future.isCompleted mustBe (true)
+ }
+ }
+
+ "throw exception with 'Await.result'" in {
+ f {
+ (future, message) =>
+ intercept[E] {
+ Await.result(future, defaultTimeout)
+ }.getMessage mustBe (message)
+ }
+ }
+
+ "retain exception with filter" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future filter (_ => true), defaultTimeout) }.getMessage mustBe (message)
+ intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+
+ "retain exception with map" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future map (_.toString.length), defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+
+ "retain exception with flatMap" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future flatMap (_ => Promise.successful("foo").future), defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+
+ "zip properly" in {
+ f {
+ (future, message) =>
+ intercept[E] {
+ Await.result(future zip Promise.successful("foo").future, defaultTimeout)
+ }.getMessage mustBe (message)
+ }
+ }
+
+ "recover from exception" in {
+ f {
+ (future, message) =>
+ Await.result(future.recover({ case e if e.getMessage == message => "pigdog" }), defaultTimeout) mustBe ("pigdog")
+ }
+ }
+
+ "project a failure" in {
+ f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message))
+ }
+
+ "perform action on exception" in {
+ f {
+ (future, message) =>
+ val p = Promise[Any]()
+ future.onFailure { case _ => p.success(message) }
+ Await.result(p.future, defaultTimeout) mustBe (message)
+ }
+ }
+
+ "always cast successfully using mapTo" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future.mapTo[java.lang.Thread], defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+ }
+}
+
+
+
+
+
+
+
diff --git a/test/files/jvm/future-spec/TryTests.scala b/test/files/jvm/future-spec/TryTests.scala
new file mode 100644
index 0000000000..5d1b9b84b4
--- /dev/null
+++ b/test/files/jvm/future-spec/TryTests.scala
@@ -0,0 +1,130 @@
+// This is a port of the com.twitter.util Try spec.
+// --
+// It lives in the future-spec directory simply because it requires a specs-like
+// DSL which has already been minimally implemented for the future spec tests.
+
+import scala.util.{Try,Success,Failure}
+
+object TryTests extends MinimalScalaTest {
+ class MyException extends Exception
+ val e = new Exception("this is an exception")
+
+ "Try()" should {
+ "catch exceptions and lift into the Try type" in {
+ Try[Int](1) mustEqual Success(1)
+ Try[Int] { throw e } mustEqual Failure(e)
+ }
+ }
+
+ "Try" should {
+ "recoverWith" in {
+ val myException = new MyException
+ Success(1) recoverWith { case _ => Success(2) } mustEqual Success(1)
+ Failure(e) recoverWith { case _ => Success(2) } mustEqual Success(2)
+ Failure(e) recoverWith { case _ => Failure(e) } mustEqual Failure(e)
+ }
+
+ "getOrElse" in {
+ Success(1) getOrElse 2 mustEqual 1
+ Failure(e) getOrElse 2 mustEqual 2
+ }
+
+ "orElse" in {
+ Success(1) orElse Success(2) mustEqual Success(1)
+ Failure(e) orElse Success(2) mustEqual Success(2)
+ }
+
+ "map" in {
+ "when there is no exception" in {
+ Success(1) map(1+) mustEqual Success(2)
+ Failure[Int](e) map(1+) mustEqual Failure(e)
+ }
+
+ "when there is an exception" in {
+ Success(1) map(_ => throw e) mustEqual Failure(e)
+
+ val e2 = new Exception
+ Failure[Int](e) map(_ => throw e2) mustEqual Failure(e)
+ }
+ "when there is a fatal exception" in {
+ val e3 = new ThreadDeath
+ intercept[ThreadDeath] {
+ Success(1) map (_ => throw e3)
+ }
+ }
+ }
+
+ "flatMap" in {
+ "when there is no exception" in {
+ Success(1) flatMap(x => Success(1 + x)) mustEqual Success(2)
+ Failure[Int](e) flatMap(x => Success(1 + x)) mustEqual Failure(e)
+ }
+
+ "when there is an exception" in {
+ Success(1).flatMap[Int](_ => throw e) mustEqual Failure(e)
+
+ val e2 = new Exception
+ Failure[Int](e).flatMap[Int](_ => throw e2) mustEqual Failure(e)
+ }
+ "when there is a fatal exception" in {
+ val e3 = new ThreadDeath
+ intercept[ThreadDeath] {
+ Success(1).flatMap[Int](_ => throw e3)
+ }
+ }
+ }
+
+ "flatten" in {
+ "is a Success(Success)" in {
+ Success(Success(1)).flatten mustEqual Success(1)
+ }
+
+ "is a Success(Failure)" in {
+ val e = new Exception
+ Success(Failure(e)).flatten mustEqual Failure(e)
+ }
+
+ "is a Throw" in {
+ val e = new Exception
+ Failure[Try[Int]](e).flatten mustEqual Failure(e)
+ }
+ }
+
+ "for" in {
+ "with no Failure values" in {
+ val result = for {
+ i <- Success(1)
+ j <- Success(1)
+ } yield (i + j)
+ result mustEqual Success(2)
+ }
+
+ "with Failure values" in {
+ "throws before" in {
+ val result = for {
+ i <- Failure[Int](e)
+ j <- Success(1)
+ } yield (i + j)
+ result mustEqual Failure(e)
+ }
+
+ "throws after" in {
+ val result = for {
+ i <- Success(1)
+ j <- Failure[Int](e)
+ } yield (i + j)
+ result mustEqual Failure(e)
+ }
+
+ "returns the FIRST Failure" in {
+ val e2 = new Exception
+ val result = for {
+ i <- Failure[Int](e)
+ j <- Failure[Int](e2)
+ } yield (i + j)
+ result mustEqual Failure(e)
+ }
+ }
+ }
+ }
+}
diff --git a/test/files/jvm/future-spec/main.scala b/test/files/jvm/future-spec/main.scala
new file mode 100644
index 0000000000..90048ccda0
--- /dev/null
+++ b/test/files/jvm/future-spec/main.scala
@@ -0,0 +1,110 @@
+
+
+
+import scala.collection._
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
+
+
+object Test {
+
+ def main(args: Array[String]) {
+ FutureTests.check()
+ PromiseTests.check()
+ TryTests.check()
+ }
+
+}
+
+
+trait Output {
+ val buffer = new StringBuilder
+
+ def bufferPrintln(a: Any) = buffer.synchronized {
+ buffer.append(a.toString + "\n")
+ }
+}
+
+
+trait MinimalScalaTest extends Output {
+
+ val throwables = mutable.ArrayBuffer[Throwable]()
+
+ def check() {
+ if (throwables.nonEmpty) println(buffer.toString)
+ }
+
+ implicit def stringops(s: String) = new {
+
+ def should[U](snippets: =>U) = {
+ bufferPrintln(s + " should:")
+ snippets
+ }
+
+ def in[U](snippet: =>U) = {
+ try {
+ bufferPrintln("- " + s)
+ snippet
+ bufferPrintln("[OK] Test passed.")
+ } catch {
+ case e: Throwable =>
+ bufferPrintln("[FAILED] " + e)
+ bufferPrintln(e.getStackTrace().mkString("\n"))
+ throwables += e
+ }
+ }
+
+ }
+
+ implicit def objectops(obj: Any) = new {
+
+ def mustBe(other: Any) = assert(obj == other, obj + " is not " + other)
+ def mustEqual(other: Any) = mustBe(other)
+
+ }
+
+ def intercept[T <: Throwable: Manifest](body: =>Any): T = {
+ try {
+ body
+ throw new Exception("Exception of type %s was not thrown".format(manifest[T]))
+ } catch {
+ case t: Throwable =>
+ if (manifest[T].erasure != t.getClass) throw t
+ else t.asInstanceOf[T]
+ }
+ }
+
+ def checkType[T: Manifest, S](in: Future[T], refmanifest: Manifest[S]): Boolean = manifest[T] == refmanifest
+}
+
+
+object TestLatch {
+ val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
+
+ def apply(count: Int = 1) = new TestLatch(count)
+}
+
+
+class TestLatch(count: Int = 1) extends Awaitable[Unit] {
+ private var latch = new CountDownLatch(count)
+
+ def countDown() = latch.countDown()
+ def isOpen: Boolean = latch.getCount == 0
+ def open() = while (!isOpen) countDown()
+ def reset() = latch = new CountDownLatch(count)
+
+ @throws(classOf[TimeoutException])
+ def ready(atMost: Duration)(implicit permit: CanAwait) = {
+ val opened = latch.await(atMost.toNanos, TimeUnit.NANOSECONDS)
+ if (!opened) throw new TimeoutException("Timeout of %s." format (atMost.toString))
+ this
+ }
+
+ @throws(classOf[Exception])
+ def result(atMost: Duration)(implicit permit: CanAwait): Unit = {
+ ready(atMost)
+ }
+
+}
+
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
new file mode 100644
index 0000000000..00ef91539d
--- /dev/null
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -0,0 +1,1036 @@
+import scala.concurrent.{
+ Future,
+ Promise,
+ TimeoutException,
+ SyncVar,
+ ExecutionException,
+ ExecutionContext,
+ CanAwait,
+ Await
+}
+import scala.concurrent.{ future, promise, blocking }
+import scala.util.{ Try, Success, Failure }
+import scala.concurrent.duration.Duration
+
+trait TestBase {
+
+ def intercept[T <: Exception](code: => Unit)(implicit cm: ClassManifest[Exception]): Unit =
+ try {
+ code
+ assert(false, "did not throw " + cm)
+ } catch {
+ case ex: Exception if cm.erasure isInstance ex =>
+ }
+
+ def once(body: (() => Unit) => Unit) {
+ val sv = new SyncVar[Boolean]
+ body(() => sv put true)
+ //sv.take(2000)
+ sv.take()
+ }
+
+ // def assert(cond: => Boolean) {
+ // try {
+ // Predef.assert(cond)
+ // } catch {
+ // case e => e.printStackTrace()
+ // }
+ // }
+
+}
+
+
+trait FutureCallbacks extends TestBase {
+ import ExecutionContext.Implicits._
+
+ def testOnSuccess(): Unit = once {
+ done =>
+ var x = 0
+ val f = future {
+ x = 1
+ }
+ f onSuccess {
+ case _ =>
+ done()
+ assert(x == 1)
+ }
+ }
+
+ def testOnSuccessWhenCompleted(): Unit = once {
+ done =>
+ var x = 0
+ val f = future {
+ x = 1
+ }
+ f onSuccess {
+ case _ =>
+ assert(x == 1)
+ x = 2
+ f onSuccess {
+ case _ =>
+ assert(x == 2)
+ done()
+ }
+ }
+ }
+
+ def testOnSuccessWhenFailed(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ done()
+ throw new Exception
+ }
+ f onSuccess {
+ case _ => assert(false)
+ }
+ }
+
+ def testOnFailure(): Unit = once {
+ done =>
+ var x = 0
+ val f = future[Unit] {
+ x = 1
+ throw new Exception
+ }
+ f onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case _ =>
+ done()
+ assert(x == 1)
+ }
+ }
+
+ def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once {
+ done =>
+ val f = future[Unit] {
+ throw cause
+ }
+ f onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case e: ExecutionException if (e.getCause == cause) =>
+ done()
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testOnFailureWhenTimeoutException(): Unit = once {
+ done =>
+ val f = future[Unit] {
+ throw new TimeoutException()
+ }
+ f onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ f onFailure {
+ case e: TimeoutException =>
+ done()
+ case other =>
+ done()
+ assert(false)
+ }
+ }
+
+ testOnSuccess()
+ testOnSuccessWhenCompleted()
+ testOnSuccessWhenFailed()
+ testOnFailure()
+ testOnFailureWhenSpecialThrowable(5, new Error)
+ // testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
+ //TODO: this test is currently problematic, because NonFatal does not match InterruptedException
+ //testOnFailureWhenSpecialThrowable(7, new InterruptedException)
+ testOnFailureWhenTimeoutException()
+
+}
+
+
+trait FutureCombinators extends TestBase {
+ import ExecutionContext.Implicits._
+
+ def testMapSuccess(): Unit = once {
+ done =>
+ val f = future { 5 }
+ val g = f map { x => "result: " + x }
+ g onSuccess {
+ case s =>
+ done()
+ assert(s == "result: 5")
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testMapFailure(): Unit = once {
+ done =>
+ val f = future {
+ throw new Exception("exception message")
+ }
+ val g = f map { x => "result: " + x }
+ g onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ g onFailure {
+ case t =>
+ done()
+ assert(t.getMessage() == "exception message")
+ }
+ }
+
+ def testMapSuccessPF(): Unit = once {
+ done =>
+ val f = future { 5 }
+ val g = f map { case r => "result: " + r }
+ g onSuccess {
+ case s =>
+ done()
+ assert(s == "result: 5")
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testTransformSuccess(): Unit = once {
+ done =>
+ val f = future { 5 }
+ val g = f.transform(r => "result: " + r, identity)
+ g onSuccess {
+ case s =>
+ done()
+ assert(s == "result: 5")
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testTransformSuccessPF(): Unit = once {
+ done =>
+ val f = future { 5 }
+ val g = f.transform( { case r => "result: " + r }, identity)
+ g onSuccess {
+ case s =>
+ done()
+ assert(s == "result: 5")
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testFoldFailure(): Unit = once {
+ done =>
+ val f = future {
+ throw new Exception("exception message")
+ }
+ val g = f.transform(r => "result: " + r, identity)
+ g onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ g onFailure {
+ case t =>
+ done()
+ assert(t.getMessage() == "exception message")
+ }
+ }
+
+ def testFlatMapSuccess(): Unit = once {
+ done =>
+ val f = future { 5 }
+ val g = f flatMap { _ => future { 10 } }
+ g onSuccess {
+ case x =>
+ done()
+ assert(x == 10)
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testFlatMapFailure(): Unit = once {
+ done =>
+ val f = future {
+ throw new Exception("exception message")
+ }
+ val g = f flatMap { _ => future { 10 } }
+ g onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ g onFailure {
+ case t =>
+ done()
+ assert(t.getMessage() == "exception message")
+ }
+ }
+
+ def testFilterSuccess(): Unit = once {
+ done =>
+ val f = future { 4 }
+ val g = f filter { _ % 2 == 0 }
+ g onSuccess {
+ case x: Int =>
+ done()
+ assert(x == 4)
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testFilterFailure(): Unit = once {
+ done =>
+ val f = future { 4 }
+ val g = f filter { _ % 2 == 1 }
+ g onSuccess {
+ case x: Int =>
+ done()
+ assert(false)
+ }
+ g onFailure {
+ case e: NoSuchElementException =>
+ done()
+ assert(true)
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testCollectSuccess(): Unit = once {
+ done =>
+ val f = future { -5 }
+ val g = f collect {
+ case x if x < 0 => -x
+ }
+ g onSuccess {
+ case x: Int =>
+ done()
+ assert(x == 5)
+ }
+ g onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testCollectFailure(): Unit = once {
+ done =>
+ val f = future { -5 }
+ val g = f collect {
+ case x if x > 0 => x * 2
+ }
+ g onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ g onFailure {
+ case e: NoSuchElementException =>
+ done()
+ assert(true)
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ /* TODO: Test for NonFatal in collect (more of a regression test at this point).
+ */
+
+ def testForeachSuccess(): Unit = once {
+ done =>
+ val p = promise[Int]()
+ val f = future[Int] { 5 }
+ f foreach { x => p.success(x * 2) }
+ val g = p.future
+
+ g.onSuccess {
+ case res: Int =>
+ done()
+ assert(res == 10)
+ }
+ g.onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testForeachFailure(): Unit = once {
+ done =>
+ val p = promise[Int]()
+ val f = future[Int] { throw new Exception }
+ f foreach { x => p.success(x * 2) }
+ f onFailure { case _ => p.failure(new Exception) }
+ val g = p.future
+
+ g.onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ g.onFailure {
+ case _ =>
+ done()
+ assert(true)
+ }
+ }
+
+ def testRecoverSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recover {
+ case re: RuntimeException =>
+ "recovered"
+ }
+ f onSuccess {
+ case x =>
+ done()
+ assert(x == "recovered")
+ }
+ f onFailure { case any =>
+ done()
+ assert(false)
+ }
+ f
+ }
+
+ def testRecoverFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recover {
+ case te: TimeoutException => "timeout"
+ }
+ f onSuccess {
+ case x =>
+ done()
+ assert(false)
+ }
+ f onFailure { case any =>
+ done()
+ assert(any == cause)
+ }
+ }
+
+ def testRecoverWithSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recoverWith {
+ case re: RuntimeException =>
+ future { "recovered" }
+ }
+ f onSuccess {
+ case x =>
+ done()
+ assert(x == "recovered")
+ }
+ f onFailure { case any =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testRecoverWithFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ } recoverWith {
+ case te: TimeoutException =>
+ future { "timeout" }
+ }
+ f onSuccess {
+ case x =>
+ done()
+ assert(false)
+ }
+ f onFailure { case any =>
+ done()
+ assert(any == cause)
+ }
+ }
+
+ def testZipSuccess(): Unit = once {
+ done =>
+ val f = future { 5 }
+ val g = future { 6 }
+ val h = f zip g
+ h onSuccess {
+ case (l: Int, r: Int) =>
+ done()
+ assert(l+r == 11)
+ }
+ h onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testZipFailureLeft(): Unit = once {
+ done =>
+ val cause = new Exception("exception message")
+ val f = future { throw cause }
+ val g = future { 6 }
+ val h = f zip g
+ h onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ h onFailure {
+ case e: Exception =>
+ done()
+ assert(e.getMessage == "exception message")
+ }
+ }
+
+ def testZipFailureRight(): Unit = once {
+ done =>
+ val cause = new Exception("exception message")
+ val f = future { 5 }
+ val g = future { throw cause }
+ val h = f zip g
+ h onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ h onFailure {
+ case e: Exception =>
+ done()
+ assert(e.getMessage == "exception message")
+ }
+ }
+
+ def testFallbackTo(): Unit = once {
+ done =>
+ val f = future { sys.error("failed") }
+ val g = future { 5 }
+ val h = f fallbackTo g
+
+ h onSuccess {
+ case x: Int =>
+ done()
+ assert(x == 5)
+ }
+ h onFailure {
+ case _ =>
+ done()
+ assert(false)
+ }
+ }
+
+ def testFallbackToFailure(): Unit = once {
+ done =>
+ val cause = new Exception
+ val f = future { sys.error("failed") }
+ val g = future { throw cause }
+ val h = f fallbackTo g
+
+ h onSuccess {
+ case _ =>
+ done()
+ assert(false)
+ }
+ h onFailure {
+ case e: Exception =>
+ done()
+ assert(e == cause)
+ }
+ }
+
+ testMapSuccess()
+ testMapFailure()
+ testFlatMapSuccess()
+ testFlatMapFailure()
+ testFilterSuccess()
+ testFilterFailure()
+ testCollectSuccess()
+ testCollectFailure()
+ testForeachSuccess()
+ testForeachFailure()
+ testRecoverSuccess()
+ testRecoverFailure()
+ testRecoverWithSuccess()
+ testRecoverWithFailure()
+ testZipSuccess()
+ testZipFailureLeft()
+ testZipFailureRight()
+ testFallbackTo()
+ testFallbackToFailure()
+}
+
+
+trait FutureProjections extends TestBase {
+ import ExecutionContext.Implicits._
+
+ def testFailedFailureOnComplete(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onComplete {
+ case Success(t) =>
+ assert(t == cause)
+ done()
+ case Failure(t) =>
+ assert(false)
+ }
+ }
+
+ def testFailedFailureOnSuccess(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ f.failed onSuccess {
+ case t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ def testFailedSuccessOnComplete(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onComplete {
+ case Success(t) =>
+ assert(false)
+ case Failure(t) =>
+ assert(t.isInstanceOf[NoSuchElementException])
+ done()
+ }
+ }
+
+ def testFailedSuccessOnFailure(): Unit = once {
+ done =>
+ val f = future { 0 }
+ f.failed onFailure {
+ case nsee: NoSuchElementException =>
+ done()
+ }
+ }
+
+ def testFailedFailureAwait(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ assert(Await.result(f.failed, Duration(500, "ms")) == cause)
+ done()
+ }
+
+ def testFailedSuccessAwait(): Unit = once {
+ done =>
+ val f = future { 0 }
+ try {
+ Await.result(f.failed, Duration(500, "ms"))
+ assert(false)
+ } catch {
+ case nsee: NoSuchElementException => done()
+ }
+ }
+
+ def testAwaitPositiveDuration(): Unit = once { done =>
+ val p = Promise[Int]()
+ val f = p.future
+ future {
+ intercept[IllegalArgumentException] { Await.ready(f, Duration.Undefined) }
+ p.success(0)
+ Await.ready(f, Duration.Zero)
+ Await.ready(f, Duration(500, "ms"))
+ Await.ready(f, Duration.Inf)
+ done()
+ } onFailure { case x => throw x }
+ }
+
+ def testAwaitNegativeDuration(): Unit = once { done =>
+ val f = Promise().future
+ future {
+ intercept[TimeoutException] { Await.ready(f, Duration.Zero) }
+ intercept[TimeoutException] { Await.ready(f, Duration.MinusInf) }
+ intercept[TimeoutException] { Await.ready(f, Duration(-500, "ms")) }
+ done()
+ } onFailure { case x => throw x }
+ }
+
+ testFailedFailureOnComplete()
+ testFailedFailureOnSuccess()
+ testFailedSuccessOnComplete()
+ testFailedSuccessOnFailure()
+ testFailedFailureAwait()
+ testFailedSuccessAwait()
+ testAwaitPositiveDuration()
+ testAwaitNegativeDuration()
+
+}
+
+
+trait Blocking extends TestBase {
+ import ExecutionContext.Implicits._
+
+ def testAwaitSuccess(): Unit = once {
+ done =>
+ val f = future { 0 }
+ Await.result(f, Duration(500, "ms"))
+ done()
+ }
+
+ def testAwaitFailure(): Unit = once {
+ done =>
+ val cause = new RuntimeException
+ val f = future {
+ throw cause
+ }
+ try {
+ Await.result(f, Duration(500, "ms"))
+ assert(false)
+ } catch {
+ case t =>
+ assert(t == cause)
+ done()
+ }
+ }
+
+ def testFQCNForAwaitAPI(): Unit = once {
+ done =>
+
+ assert(classOf[CanAwait].getName == "scala.concurrent.CanAwait")
+ assert(Await.getClass.getName == "scala.concurrent.Await")
+
+ done()
+ }
+
+ testAwaitSuccess()
+ testAwaitFailure()
+ testFQCNForAwaitAPI()
+}
+
+trait BlockContexts extends TestBase {
+ import ExecutionContext.Implicits._
+ import scala.concurrent.{ Await, Awaitable, BlockContext }
+
+ private def getBlockContext(body: => BlockContext): BlockContext = {
+ Await.result(Future { body }, Duration(500, "ms"))
+ }
+
+ // test outside of an ExecutionContext
+ def testDefaultOutsideFuture(): Unit = {
+ val bc = BlockContext.current
+ assert(bc.getClass.getName.contains("DefaultBlockContext"))
+ }
+
+ // test BlockContext in our default ExecutionContext
+ def testDefaultFJP(): Unit = {
+ val bc = getBlockContext(BlockContext.current)
+ assert(bc.isInstanceOf[scala.concurrent.forkjoin.ForkJoinWorkerThread])
+ }
+
+ // test BlockContext inside BlockContext.withBlockContext
+ def testPushCustom(): Unit = {
+ val orig = BlockContext.current
+ val customBC = new BlockContext() {
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk)
+ }
+
+ val bc = getBlockContext({
+ BlockContext.withBlockContext(customBC) {
+ BlockContext.current
+ }
+ })
+
+ assert(bc eq customBC)
+ }
+
+ // test BlockContext after a BlockContext.push
+ def testPopCustom(): Unit = {
+ val orig = BlockContext.current
+ val customBC = new BlockContext() {
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk)
+ }
+
+ val bc = getBlockContext({
+ BlockContext.withBlockContext(customBC) {}
+ BlockContext.current
+ })
+
+ assert(bc ne customBC)
+ }
+
+ testDefaultOutsideFuture()
+ testDefaultFJP()
+ testPushCustom()
+ testPopCustom()
+}
+
+trait Promises extends TestBase {
+ import ExecutionContext.Implicits._
+
+ def testSuccess(): Unit = once {
+ done =>
+ val p = promise[Int]()
+ val f = p.future
+
+ f onSuccess {
+ case x =>
+ done()
+ assert(x == 5)
+ }
+ f onFailure {
+ case any =>
+ done()
+ assert(false)
+ }
+
+ p.success(5)
+ }
+
+ testSuccess()
+
+}
+
+
+trait Exceptions extends TestBase {
+ import ExecutionContext.Implicits._
+
+}
+
+trait CustomExecutionContext extends TestBase {
+ import scala.concurrent.{ ExecutionContext, Awaitable }
+
+ def defaultEC = ExecutionContext.global
+
+ val inEC = new java.lang.ThreadLocal[Int]() {
+ override def initialValue = 0
+ }
+
+ def enterEC() = inEC.set(inEC.get + 1)
+ def leaveEC() = inEC.set(inEC.get - 1)
+ def assertEC() = assert(inEC.get > 0)
+ def assertNoEC() = assert(inEC.get == 0)
+
+ class CountingExecutionContext extends ExecutionContext {
+ val _count = new java.util.concurrent.atomic.AtomicInteger(0)
+ def count = _count.get
+
+ def delegate = ExecutionContext.global
+
+ override def execute(runnable: Runnable) = {
+ _count.incrementAndGet()
+ val wrapper = new Runnable() {
+ override def run() = {
+ enterEC()
+ try {
+ runnable.run()
+ } finally {
+ leaveEC()
+ }
+ }
+ }
+ delegate.execute(wrapper)
+ }
+
+ override def reportFailure(t: Throwable): Unit = {
+ System.err.println("Failure: " + t.getClass.getSimpleName + ": " + t.getMessage)
+ delegate.reportFailure(t)
+ }
+ }
+
+ def countExecs(block: (ExecutionContext) => Unit): Int = {
+ val context = new CountingExecutionContext()
+ block(context)
+ context.count
+ }
+
+ def testOnSuccessCustomEC(): Unit = {
+ val count = countExecs { implicit ec =>
+ blocking {
+ once { done =>
+ val f = future({ assertNoEC() })(defaultEC)
+ f onSuccess {
+ case _ =>
+ assertEC()
+ done()
+ }
+ assertNoEC()
+ }
+ }
+ }
+
+ // should be onSuccess, but not future body
+ assert(count == 1)
+ }
+
+ def testKeptPromiseCustomEC(): Unit = {
+ val count = countExecs { implicit ec =>
+ blocking {
+ once { done =>
+ val f = Promise.successful(10).future
+ f onSuccess {
+ case _ =>
+ assertEC()
+ done()
+ }
+ }
+ }
+ }
+
+ // should be onSuccess called once in proper EC
+ assert(count == 1)
+ }
+
+ def testCallbackChainCustomEC(): Unit = {
+ val count = countExecs { implicit ec =>
+ blocking {
+ once { done =>
+ assertNoEC()
+ val addOne = { x: Int => assertEC(); x + 1 }
+ val f = Promise.successful(10).future
+ f.map(addOne).filter { x =>
+ assertEC()
+ x == 11
+ } flatMap { x =>
+ Promise.successful(x + 1).future.map(addOne).map(addOne)
+ } onComplete {
+ case Failure(t) =>
+ try {
+ throw new AssertionError("error in test: " + t.getMessage, t)
+ } finally {
+ done()
+ }
+ case Success(x) =>
+ assertEC()
+ assert(x == 14)
+ done()
+ }
+ assertNoEC()
+ }
+ }
+ }
+
+ // the count is not defined (other than >=1)
+ // due to the batching optimizations.
+ assert(count >= 1)
+ }
+
+ testOnSuccessCustomEC()
+ testKeptPromiseCustomEC()
+ testCallbackChainCustomEC()
+}
+
+trait ExecutionContextPrepare extends TestBase {
+ val theLocal = new ThreadLocal[String] {
+ override protected def initialValue(): String = ""
+ }
+
+ class PreparingExecutionContext extends ExecutionContext {
+ def delegate = ExecutionContext.global
+
+ override def execute(runnable: Runnable): Unit =
+ delegate.execute(runnable)
+
+ override def prepare(): ExecutionContext = {
+ // save object stored in ThreadLocal storage
+ val localData = theLocal.get
+ new PreparingExecutionContext {
+ override def execute(runnable: Runnable): Unit = {
+ val wrapper = new Runnable {
+ override def run(): Unit = {
+ // now we're on the new thread
+ // put localData into theLocal
+ theLocal.set(localData)
+ runnable.run()
+ }
+ }
+ delegate.execute(wrapper)
+ }
+ }
+ }
+
+ override def reportFailure(t: Throwable): Unit =
+ delegate.reportFailure(t)
+ }
+
+ implicit val ec = new PreparingExecutionContext
+
+ def testOnComplete(): Unit = once {
+ done =>
+ theLocal.set("secret")
+ val fut = future { 42 }
+ fut onComplete {
+ case _ =>
+ assert(theLocal.get == "secret")
+ done()
+ }
+ }
+
+ def testMap(): Unit = once {
+ done =>
+ theLocal.set("secret2")
+ val fut = future { 42 }
+ fut map { x =>
+ assert(theLocal.get == "secret2")
+ done()
+ }
+ }
+
+ testOnComplete()
+ testMap()
+}
+
+object Test
+extends App
+with FutureCallbacks
+with FutureCombinators
+with FutureProjections
+with Promises
+with BlockContexts
+with Exceptions
+with CustomExecutionContext
+with ExecutionContextPrepare
+{
+ System.exit(0)
+}
+
diff --git a/test/files/pos/t5958.flags b/test/files/pos/t5958.flags
new file mode 100644
index 0000000000..a7ed61704f
--- /dev/null
+++ b/test/files/pos/t5958.flags
@@ -0,0 +1 @@
+-Ydependent-method-types
diff --git a/test/files/pos/t5958.scala b/test/files/pos/t5958.scala
new file mode 100644
index 0000000000..b7eb8cf8df
--- /dev/null
+++ b/test/files/pos/t5958.scala
@@ -0,0 +1,15 @@
+class Test {
+ def newComponent(u: Universe): u.Component = throw new Exception("not implemented")
+
+ class Universe { self =>
+ class Component
+
+ newComponent(this): this.Component // error, but should be fine since this is a stable reference
+ newComponent(self): self.Component // error, but should be fine since this is a stable reference
+ newComponent(self): this.Component // error, but should be fine since this is a stable reference
+ newComponent(this): self.Component // error, but should be fine since this is a stable reference
+
+ val u = this
+ newComponent(u): u.Component // ok
+ }
+}