diff options
author | Adriaan Moors <adriaan.moors@typesafe.com> | 2012-12-13 15:02:51 -0800 |
---|---|---|
committer | Adriaan Moors <adriaan.moors@typesafe.com> | 2012-12-13 15:02:51 -0800 |
commit | eda88c84daae182266565d21b31ad3c93d9b3be5 (patch) | |
tree | f64dbc7b0a2f7986a7bbd4b9ff2b3bff56599e95 | |
parent | 8e3d23a950ced345eea108926cf35838e6c4befc (diff) | |
parent | f0bc3f765416b2ff89ebb5af5677ba08010306d3 (diff) | |
download | scala-eda88c84daae182266565d21b31ad3c93d9b3be5.tar.gz scala-eda88c84daae182266565d21b31ad3c93d9b3be5.tar.bz2 scala-eda88c84daae182266565d21b31ad3c93d9b3be5.zip |
Merge pull request #1764 from adriaanm/backport/sip14
SIP-14 backport to 2.9.x
32 files changed, 4925 insertions, 15 deletions
@@ -286,9 +286,9 @@ INITIALISATION <property name="scalac.args.optimise" value=""/> <!-- scalac.args.quickonly are added to quick.* targets but not others (particularly, locker.) This is to facilitate testing new command line options which do not yet exist in starr. --> - <property name="scalac.args.quickonly" value=""/> + <property name="scalac.args.quickonly" value="-Ydependent-method-types"/> <property name="scalac.args.all" value="${scalac.args} ${scalac.args.optimise}"/> - <property name="scalac.args.quick" value="${scalac.args.all} ${scalac.args.quickonly}"/> + <property name="scalac.args.quick" value="${scalac.args.all} ${scalac.args.quickonly}"/> <!-- Setting-up Ant contrib tasks --> <taskdef resource="net/sf/antcontrib/antlib.xml" classpath="${lib.dir}/ant/ant-contrib.jar"/> <!-- This is the start time for the distribution --> @@ -376,8 +376,9 @@ LOCAL REFERENCE BUILD (LOCKER) <javac srcdir="${src.dir}/library" destdir="${build-locker.dir}/classes/library" - classpath="${build-locker.dir}/classes/library" + classpathref="quick.compilation.path" includes="**/*.java" + excludes="scala/concurrent/**/*.java" target="1.5" source="1.5"> <compilerarg line="${javac.args}"/> </javac> @@ -392,6 +393,17 @@ LOCAL REFERENCE BUILD (LOCKER) srcdir="${src.dir}/library" jvmargs="${scalacfork.jvmargs}"> <include name="**/*.scala"/> + <!-- Exclude duration package and everything that depends on it --> + <exclude name="scala/concurrent/duration/**/*.scala"/> + <exclude name="scala/concurrent/package.scala"/> + <exclude name="scala/concurrent/Awaitable.scala"/> + <exclude name="scala/concurrent/Future.scala"/> + <exclude name="scala/concurrent/Promise.scala"/> + <exclude name="scala/concurrent/ExecutionContext.scala"/> + <exclude name="scala/concurrent/BlockContext.scala"/> + <exclude name="scala/concurrent/impl/Future.scala"/> + <exclude name="scala/concurrent/impl/Promise.scala"/> + <exclude name="scala/concurrent/impl/ExecutionContextImpl.scala"/> <compilationpath> <pathelement location="${build-locker.dir}/classes/library"/> <pathelement location="${lib.dir}/forkjoin.jar"/> @@ -546,7 +558,7 @@ QUICK BUILD (QUICK) <javac srcdir="${src.dir}/library" destdir="${build-quick.dir}/classes/library" - classpath="${build-quick.dir}/classes/library" + classpathref="quick.compilation.path" includes="**/*.java" target="1.5" source="1.5"> <compilerarg line="${javac.args}"/> @@ -1083,7 +1095,7 @@ BOOTSTRAPPING BUILD (STRAP) <javac srcdir="${src.dir}/library" destdir="${build-strap.dir}/classes/library" - classpath="${build-strap.dir}/classes/library" + classpathref="strap.compilation.path" includes="**/*.java" target="1.5" source="1.5"> <compilerarg line="${javac.args}"/> @@ -1100,7 +1112,7 @@ BOOTSTRAPPING BUILD (STRAP) destdir="${build-strap.dir}/classes/library" compilerpathref="pack.classpath" srcpath="${src.dir}/library" - params="${scalac.args.all}" + params="${scalac.args.quick}" srcdir="${src.dir}/library" jvmargs="${scalacfork.jvmargs}"> <include name="**/*.scala"/> @@ -1109,7 +1121,7 @@ BOOTSTRAPPING BUILD (STRAP) <scalacfork destdir="${build-strap.dir}/classes/library" compilerpathref="pack.classpath" - params="${scalac.args.all}" + params="${scalac.args.quick}" srcdir="${src.dir}/actors" jvmargs="${scalacfork.jvmargs}"> <include name="**/*.scala"/> @@ -1118,7 +1130,7 @@ BOOTSTRAPPING BUILD (STRAP) <scalacfork destdir="${build-strap.dir}/classes/library" compilerpathref="pack.classpath" - params="${scalac.args.all}" + params="${scalac.args.quick}" srcdir="${src.dir}/dbc" jvmargs="${scalacfork.jvmargs}"> <include name="**/*.scala"/> @@ -1127,7 +1139,7 @@ BOOTSTRAPPING BUILD (STRAP) <scalacfork destdir="${build-strap.dir}/classes/library" compilerpathref="pack.classpath" - params="${scalac.args.all}" + params="${scalac.args.quick}" srcdir="${src.dir}/swing" jvmargs="${scalacfork.jvmargs}"> <include name="**/*.scala"/> @@ -1163,7 +1175,7 @@ BOOTSTRAPPING BUILD (STRAP) <scalacfork destdir="${build-strap.dir}/classes/compiler" compilerpathref="pack.classpath" - params="${scalac.args.all}" + params="${scalac.args.quick}" srcdir="${src.dir}/compiler" jvmargs="${scalacfork.jvmargs}"> <include name="**/*.scala"/> @@ -1531,6 +1543,7 @@ DOCUMENTATION docUncompilable="${src.dir}/library-aux" sourcepath="${src.dir}" classpathref="pack.classpath" + addparams="-Ydependent-method-types" docRootContent="${build-docs.dir}/library/lib/rootdoc.txt"> <src> <files includes="${src.dir}/actors"/> diff --git a/src/compiler/scala/tools/nsc/ast/TreeGen.scala b/src/compiler/scala/tools/nsc/ast/TreeGen.scala index 268e104309..63c5b66243 100644 --- a/src/compiler/scala/tools/nsc/ast/TreeGen.scala +++ b/src/compiler/scala/tools/nsc/ast/TreeGen.scala @@ -124,6 +124,8 @@ abstract class TreeGen { /** Computes stable type for a tree if possible */ def stableTypeFor(tree: Tree): Option[Type] = tree match { + case This(_) if tree.symbol != null && !tree.symbol.isError => + Some(ThisType(tree.symbol)) case Ident(_) if tree.symbol.isStable => Some(singleType(tree.symbol.owner.thisType, tree.symbol)) case Select(qual, _) if ((tree.symbol ne null) && (qual.tpe ne null)) && // turned assert into guard for #4064 diff --git a/src/compiler/scala/tools/nsc/backend/icode/Members.scala b/src/compiler/scala/tools/nsc/backend/icode/Members.scala index 630f109bb6..54f66b96d8 100644 --- a/src/compiler/scala/tools/nsc/backend/icode/Members.scala +++ b/src/compiler/scala/tools/nsc/backend/icode/Members.scala @@ -154,6 +154,7 @@ trait Members { self: ICodes => var returnType: TypeKind = _ var recursive: Boolean = false + var bytecodeHasEHs = false // set by ICodeReader only, used by Inliner to prevent inlining (SI-6188) /** local variables and method parameters */ var locals: List[Local] = Nil diff --git a/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala b/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala index 82ca2d07d1..b08e849099 100644 --- a/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala +++ b/src/compiler/scala/tools/nsc/backend/opt/Inliners.scala @@ -311,7 +311,7 @@ abstract class Inliners extends SubComponent { def isRecursive = m.recursive def hasCode = m.code != null def hasSourceFile = m.sourceFile != null - def hasHandlers = handlers.nonEmpty + def hasHandlers = handlers.nonEmpty || m.bytecodeHasEHs // the number of inlined calls in 'm', used by 'shouldInline' var inlinedCalls = 0 @@ -495,7 +495,7 @@ abstract class Inliners extends SubComponent { } def isStampedForInlining(stack: TypeStack) = - !sameSymbols && inc.hasCode && shouldInline && isSafeToInline(stack) + !sameSymbols && inc.hasCode && !inc.m.bytecodeHasEHs && shouldInline && isSafeToInline(stack) def logFailure(stack: TypeStack) = log( """|inline failed for %s: diff --git a/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala b/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala index ac2cd9e996..94de765f31 100644 --- a/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala +++ b/src/compiler/scala/tools/nsc/symtab/classfile/ICodeReader.scala @@ -618,6 +618,7 @@ abstract class ICodeReader extends ClassfileParser { while (pc < codeLength) parseInstruction val exceptionEntries = in.nextChar.toInt + code.containsEHs = (exceptionEntries != 0) var i = 0 while (i < exceptionEntries) { // skip start end PC @@ -669,6 +670,7 @@ abstract class ICodeReader extends ClassfileParser { var containsDUPX = false var containsNEW = false + var containsEHs = false def emit(i: Instruction) { instrs += ((pc, i)) @@ -686,6 +688,7 @@ abstract class ICodeReader extends ClassfileParser { val code = new Code(method) method.setCode(code) + method.bytecodeHasEHs = containsEHs var bb = code.startBlock def makeBasicBlocks: mutable.Map[Int, BasicBlock] = diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala new file mode 100644 index 0000000000..652a23471f --- /dev/null +++ b/src/library/scala/concurrent/Awaitable.scala @@ -0,0 +1,64 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import scala.concurrent.duration.Duration + + + +/** + * An object that may eventually be completed with a result value of type `T` which may be + * awaited using blocking methods. + * + * The [[Await]] object provides methods that allow accessing the result of an `Awaitable` + * by blocking the current thread until the `Awaitable` has been completed or a timeout has + * occurred. + */ +trait Awaitable[+T] { + + /** + * Await the "completed" state of this `Awaitable`. + * + * '''''This method should not be called directly; use [[Await.ready]] instead.''''' + * + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return this `Awaitable` + * @throws InterruptedException if the current thread is interrupted while waiting + * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready + * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]] + */ + @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) + def ready(atMost: Duration)(implicit permit: CanAwait): this.type + + /** + * Await and return the result (of type `T`) of this `Awaitable`. + * + * '''''This method should not be called directly; use [[Await.result]] instead.''''' + * + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the result value if the `Awaitable` is completed within the specific maximum wait time + * @throws InterruptedException if the current thread is interrupted while waiting + * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready + * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]] + */ + @throws(classOf[Exception]) + def result(atMost: Duration)(implicit permit: CanAwait): T +} + + + diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala new file mode 100644 index 0000000000..747cc393c3 --- /dev/null +++ b/src/library/scala/concurrent/BlockContext.scala @@ -0,0 +1,77 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +/** + * A context to be notified by `scala.concurrent.blocking` when + * a thread is about to block. In effect this trait provides + * the implementation for `scala.concurrent.Await`. + * `scala.concurrent.Await.result()` and `scala.concurrent.Await.ready()` + * locates an instance of `BlockContext` by first looking for one + * provided through `BlockContext.withBlockContext()` and failing that, + * checking whether `Thread.currentThread` is an instance of `BlockContext`. + * So a thread pool can have its `java.lang.Thread` instances implement + * `BlockContext`. There's a default `BlockContext` used if the thread + * doesn't implement `BlockContext`. + * + * Typically, you'll want to chain to the previous `BlockContext`, + * like this: + * {{{ + * val oldContext = BlockContext.current + * val myContext = new BlockContext { + * override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { + * // you'd have code here doing whatever you need to do + * // when the thread is about to block. + * // Then you'd chain to the previous context: + * oldContext.blockOn(thunk) + * } + * } + * BlockContext.withBlockContext(myContext) { + * // then this block runs with myContext as the handler + * // for scala.concurrent.blocking + * } + * }}} + */ +trait BlockContext { + + /** Used internally by the framework; + * Designates (and eventually executes) a thunk which potentially blocks the calling `Thread`. + * + * Clients must use `scala.concurrent.blocking` or `scala.concurrent.Await` instead. + */ + def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T +} + +object BlockContext { + private object DefaultBlockContext extends BlockContext { + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk + } + + private val contextLocal = new ThreadLocal[BlockContext]() + + /** Obtain the current thread's current `BlockContext`. */ + def current: BlockContext = contextLocal.get match { + case null => Thread.currentThread match { + case ctx: BlockContext => ctx + case _ => DefaultBlockContext + } + case some => some + } + + /** Pushes a current `BlockContext` while executing `body`. */ + def withBlockContext[T](blockContext: BlockContext)(body: => T): T = { + val old = contextLocal.get // can be null + try { + contextLocal.set(blockContext) + body + } finally { + contextLocal.set(old) + } + } +} diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala index 2a143aca72..e04c4f7fc9 100644 --- a/src/library/scala/concurrent/DelayedLazyVal.scala +++ b/src/library/scala/concurrent/DelayedLazyVal.scala @@ -9,8 +9,6 @@ package scala.concurrent -import ops.future - /** A <code>DelayedLazyVal</code> is a wrapper for lengthy * computations which have a valid partially computed result. * The first argument is a function for obtaining the result @@ -41,7 +39,7 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) { */ def apply(): T = if (isDone) complete else f() - future { + ops.future { body _isDone = true } diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala new file mode 100644 index 0000000000..e3a912d217 --- /dev/null +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -0,0 +1,88 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + +import java.util.concurrent.{ Executor, ExecutorService } +import scala.annotation.implicitNotFound + +/** + * An `ExecutionContext` is an abstraction over an entity that can execute program logic. + */ +@implicitNotFound("Cannot find an implicit ExecutionContext, either require one yourself or import ExecutionContext.Implicits.global") +trait ExecutionContext { + + /** Runs a block of code on this execution context. + */ + def execute(runnable: Runnable): Unit + + /** Reports that an asynchronous computation failed. + */ + def reportFailure(t: Throwable): Unit + + /** Prepares for the execution of a task. Returns the prepared + * execution context. A valid implementation of `prepare` is one + * that simply returns `this`. + */ + def prepare(): ExecutionContext = this + +} + +/** + * Union interface since Java does not support union types + */ +trait ExecutionContextExecutor extends ExecutionContext with Executor + +/** + * Union interface since Java does not support union types + */ +trait ExecutionContextExecutorService extends ExecutionContextExecutor with ExecutorService + + +/** Contains factory methods for creating execution contexts. + */ +object ExecutionContext { + /** + * This is the explicit global ExecutionContext, + * call this when you want to provide the global ExecutionContext explicitly + */ + def global: ExecutionContextExecutor = Implicits.global + + object Implicits { + /** + * This is the implicit global ExecutionContext, + * import this when you want to provide the global ExecutionContext implicitly + */ + implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) + } + + /** Creates an `ExecutionContext` from the given `ExecutorService`. + */ + def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService = + impl.ExecutionContextImpl.fromExecutorService(e, reporter) + + /** Creates an `ExecutionContext` from the given `ExecutorService` with the default Reporter. + */ + def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter) + + /** Creates an `ExecutionContext` from the given `Executor`. + */ + def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor = + impl.ExecutionContextImpl.fromExecutor(e, reporter) + + /** Creates an `ExecutionContext` from the given `Executor` with the default Reporter. + */ + def fromExecutor(e: Executor): ExecutionContextExecutor = fromExecutor(e, defaultReporter) + + /** The default reporter simply prints the stack trace of the `Throwable` to System.err. + */ + def defaultReporter: Throwable => Unit = (t: Throwable) => t.printStackTrace() +} + + diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala new file mode 100644 index 0000000000..ddd8e53578 --- /dev/null +++ b/src/library/scala/concurrent/Future.scala @@ -0,0 +1,691 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } +import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } +import java.lang.{ Iterable => JIterable } +import java.util.{ LinkedList => JLinkedList } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } + +import scala.util.control.NonFatal +import scala.Option +import scala.util.{Try, Success, Failure} + +import scala.annotation.tailrec +import scala.collection.mutable.Builder +import scala.collection.generic.CanBuildFrom + + + +/** The trait that represents futures. + * + * Asynchronous computations that yield futures are created with the `future` call: + * + * {{{ + * val s = "Hello" + * val f: Future[String] = future { + * s + " future!" + * } + * f onSuccess { + * case msg => println(msg) + * } + * }}} + * + * @author Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang + * + * @define multipleCallbacks + * Multiple callbacks may be registered; there is no guarantee that they will be + * executed in a particular order. + * + * @define caughtThrowables + * The future may contain a throwable object and this means that the future failed. + * Futures obtained through combinators have the same exception as the future they were obtained from. + * The following throwable objects are not contained in the future: + * - `Error` - errors are not contained within futures + * - `InterruptedException` - not contained within futures + * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures + * + * Instead, the future is completed with a ExecutionException with one of the exceptions above + * as the cause. + * If a future is failed with a `scala.runtime.NonLocalReturnControl`, + * it is completed with a value from that throwable instead. + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + * + * @define forComprehensionExamples + * Example: + * + * {{{ + * val f = future { 5 } + * val g = future { 3 } + * val h = for { + * x: Int <- f // returns Future(5) + * y: Int <- g // returns Future(5) + * } yield x + y + * }}} + * + * is translated to: + * + * {{{ + * f flatMap { (x: Int) => g map { (y: Int) => x + y } } + * }}} + * + * @define callbackInContext + * The provided callback always runs in the provided implicit + *`ExecutionContext`, though there is no guarantee that the + * `execute()` method on the `ExecutionContext` will be called once + * per callback or that `execute()` will be called in the current + * thread. That is, the implementation may run multiple callbacks + * in a batch within a single `execute()` and it may run + * `execute()` either immediately or asynchronously. + */ +trait Future[+T] extends Awaitable[T] { + + // The executor within the lexical scope + // of the Future trait. Note that this will + // (modulo bugs) _never_ execute a callback + // other than those below in this same file. + // As a nice side benefit, having this implicit + // here forces an ambiguity in those methods + // that also have an executor parameter, which + // keeps us from accidentally forgetting to use + // the executor parameter. + private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor + + /* Callbacks */ + + /** When this future is completed successfully (i.e. with a value), + * apply the provided partial function to the value if the partial function + * is defined at that value. + * + * If the future has already been completed with a value, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + * $callbackInContext + */ + def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete { + case Success(v) if pf isDefinedAt v => pf(v) + case _ => + }(executor) + + /** When this future is completed with a failure (i.e. with a throwable), + * apply the provided callback to the throwable. + * + * $caughtThrowables + * + * If the future has already been completed with a failure, + * this will either be applied immediately or be scheduled asynchronously. + * + * Will not be called in case that the future is completed with a value. + * + * $multipleCallbacks + * $callbackInContext + */ + def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { + case Failure(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t) + case _ => + }(executor) + + /** When this future is completed, either through an exception, or a value, + * apply the provided function. + * + * If the future has already been completed, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + * $callbackInContext + */ + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit + + + /* Miscellaneous */ + + /** Returns whether the future has already been completed with + * a value or an exception. + * + * $nonDeterministic + * + * @return `true` if the future is already completed, `false` otherwise + */ + def isCompleted: Boolean + + /** The value of this `Future`. + * + * If the future is not completed the returned value will be `None`. + * If the future is completed the value will be `Some(Success(t))` + * if it contains a valid result, or `Some(Failure(error))` if it contains + * an exception. + */ + def value: Option[Try[T]] + + + /* Projections */ + + /** Returns a failed projection of this future. + * + * The failed projection is a future holding a value of type `Throwable`. + * + * It is completed with a value which is the throwable of the original future + * in case the original future is failed. + * + * It is failed with a `NoSuchElementException` if the original future is completed successfully. + * + * Blocking on this future returns a value if the original future is completed with an exception + * and throws a corresponding exception if the original future fails. + */ + def failed: Future[Throwable] = { + val p = Promise[Throwable]() + + onComplete { + case Failure(t) => p success t + case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) + } + + p.future + } + + + /* Monadic operations */ + + /** Asynchronously processes the value in the future once the value becomes available. + * + * Will not be called if the future fails. + */ + def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { + case Success(r) => f(r) + case _ => // do nothing + }(executor) + + /** Creates a new future by applying the 's' function to the successful result of + * this future, or the 'f' function to the failed result. If there is any non-fatal + * exception thrown when 's' or 'f' is applied, that exception will be propagated + * to the resulting future. + * + * @param s function that transforms a successful result of the receiver into a + * successful result of the returned future + * @param f function that transforms a failure of the receiver into a failure of + * the returned future + * @return a future that will be completed with the transformed value + */ + def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = { + val p = Promise[S]() + + onComplete { + case result => + try { + result match { + case Failure(t) => p failure f(t) + case Success(r) => p success s(r) + } + } catch { + case NonFatal(t) => p failure t + } + }(executor) + + p.future + } + + /** Creates a new future by applying a function to the successful result of + * this future. If this future is completed with an exception then the new + * future will also contain this exception. + * + * $forComprehensionExamples + */ + def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) + val p = Promise[S]() + + onComplete { + case result => + try { + result match { + case Success(r) => p success f(r) + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + } + } catch { + case NonFatal(t) => p failure t + } + }(executor) + + p.future + } + + /** Creates a new future by applying a function to the successful result of + * this future, and returns the result of the function as the new future. + * If this future is completed with an exception then the new future will + * also contain this exception. + * + * $forComprehensionExamples + */ + def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { + val p = Promise[S]() + + onComplete { + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(v) => + try { + f(v).onComplete({ + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(v) => p success v + })(internalExecutor) + } catch { + case NonFatal(t) => p failure t + } + }(executor) + + p.future + } + + /** Creates a new future by filtering the value of the current future with a predicate. + * + * If the current future contains a value which satisfies the predicate, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails, then the resulting future also fails. + * + * Example: + * {{{ + * val f = future { 5 } + * val g = f filter { _ % 2 == 1 } + * val h = f filter { _ % 2 == 0 } + * await(g, 0) // evaluates to 5 + * await(h, 0) // throw a NoSuchElementException + * }}} + */ + def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + + onComplete { + case f: Failure[_] => p complete f.asInstanceOf[Failure[T]] + case Success(v) => + try { + if (pred(v)) p success v + else p failure new NoSuchElementException("Future.filter predicate is not satisfied") + } catch { + case NonFatal(t) => p failure t + } + }(executor) + + p.future + } + + /** Used by for-comprehensions. + */ + final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor) + // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) + + // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) { + // def foreach(f: S => Unit): Unit = self filter p foreach f + // def map[R](f: S => R) = self filter p map f + // def flatMap[R](f: S => Future[R]) = self filter p flatMap f + // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x)) + // } + + /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value. + * + * If the current future contains a value for which the partial function is defined, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails, then the resulting future also fails. + * + * Example: + * {{{ + * val f = future { -5 } + * val g = f collect { + * case x if x < 0 => -x + * } + * val h = f collect { + * case x if x > 0 => x * 2 + * } + * await(g, 0) // evaluates to 5 + * await(h, 0) // throw a NoSuchElementException + * }}} + */ + def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = { + val p = Promise[S]() + + onComplete { + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(v) => + try { + if (pf.isDefinedAt(v)) p success pf(v) + else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + } catch { + case NonFatal(t) => p failure t + } + }(executor) + + p.future + } + + /** Creates a new future that will handle any matching throwable that this + * future might contain. If there is no match, or if this future contains + * a valid result then the new future will contain the same. + * + * Example: + * + * {{{ + * future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0 + * future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception + * future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 + * }}} + */ + def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { + val p = Promise[U]() + + onComplete { case tr => p.complete(tr recover pf) }(executor) + + p.future + } + + /** Creates a new future that will handle any matching throwable that this + * future might contain by assigning it a value of another future. + * + * If there is no match, or if this future contains + * a valid result then the new future will contain the same result. + * + * Example: + * + * {{{ + * val f = future { Int.MaxValue } + * future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue + * }}} + */ + def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = { + val p = Promise[U]() + + onComplete { + case Failure(t) if pf isDefinedAt t => + try { + p completeWith pf(t) + } catch { + case NonFatal(t) => p failure t + } + case otherwise => p complete otherwise + }(executor) + + p.future + } + + /** Zips the values of `this` and `that` future, and creates + * a new future holding the tuple of their results. + * + * If `this` future fails, the resulting future is failed + * with the throwable stored in `this`. + * Otherwise, if `that` future fails, the resulting future is failed + * with the throwable stored in `that`. + */ + def zip[U](that: Future[U]): Future[(T, U)] = { + val p = Promise[(T, U)]() + + this onComplete { + case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]] + case Success(r) => + that onSuccess { + case r2 => p success ((r, r2)) + } + that onFailure { + case f => p failure f + } + } + + p.future + } + + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, + * the result of the `that` future if `that` is completed successfully. + * If both futures are failed, the resulting future holds the throwable object of the first future. + * + * Using this method will not cause concurrent programs to become nondeterministic. + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f fallbackTo g + * await(h, 0) // evaluates to 5 + * }}} + */ + def fallbackTo[U >: T](that: Future[U]): Future[U] = { + val p = Promise[U]() + onComplete { + case s @ Success(_) => p complete s + case _ => p completeWith that + } + p.future + } + + /** Creates a new `Future[S]` which is completed with this `Future`'s result if + * that conforms to `S`'s erased type or a `ClassCastException` otherwise. + */ + def mapTo[S](implicit tag: ClassManifest[S]): Future[S] = { + def boxedType(c: Class[_]): Class[_] = { + if (c.isPrimitive) Future.toBoxed(c) else c + } + + val p = Promise[S]() + + onComplete { + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(t) => + p complete (try { + Success(boxedType(tag.erasure).cast(t).asInstanceOf[S]) + } catch { + case e: ClassCastException => Failure(e) + }) + } + + p.future + } + + /** Applies the side-effecting function to the result of this future, and returns + * a new future with the result of this future. + * + * This method allows one to enforce that the callbacks are executed in a + * specified order. + * + * Note that if one of the chained `andThen` callbacks throws + * an exception, that exception is not propagated to the subsequent `andThen` + * callbacks. Instead, the subsequent `andThen` callbacks are given the original + * value of this future. + * + * The following example prints out `5`: + * + * {{{ + * val f = future { 5 } + * f andThen { + * case r => sys.error("runtime exception") + * } andThen { + * case Failure(t) => println(t) + * case Success(v) => println(v) + * } + * }}} + */ + def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + + onComplete { + case r => try if (pf isDefinedAt r) pf(r) finally p complete r + }(executor) + + p.future + } + +} + + + +/** Future companion object. + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + */ +object Future { + + private[concurrent] val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[java.lang.Boolean], + classOf[Byte] -> classOf[java.lang.Byte], + classOf[Char] -> classOf[java.lang.Character], + classOf[Short] -> classOf[java.lang.Short], + classOf[Int] -> classOf[java.lang.Integer], + classOf[Long] -> classOf[java.lang.Long], + classOf[Float] -> classOf[java.lang.Float], + classOf[Double] -> classOf[java.lang.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit] + ) + + /** Creates an already completed Future with the specified exception. + * + * @tparam T the type of the value in the future + * @return the newly created `Future` object + */ + def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future + + /** Creates an already completed Future with the specified result. + * + * @tparam T the type of the value in the future + * @return the newly created `Future` object + */ + def successful[T](result: T): Future[T] = Promise.successful(result).future + + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ + def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body) + + /** Simple version of `Futures.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`. + * Useful for reducing many `Future`s into a single `Future`. + */ + def sequence[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[A], A, M[A]], executor: ExecutionContext): Future[M[A]] = { + in.foldLeft(Promise.successful(cbf()).future) { + (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a) + } map (_.result) + } + + /** Returns a `Future` to the result of the first future in the list that is completed. + */ + def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + + val completeFirst: Try[T] => Unit = p tryComplete _ + futures.foreach(_ onComplete completeFirst) + + p.future + } + + /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate. + */ + def find[T](futurestravonce: TraversableOnce[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + val futures = futurestravonce.toBuffer + if (futures.isEmpty) Promise.successful[Option[T]](None).future + else { + val result = Promise[Option[T]]() + val ref = new AtomicInteger(futures.size) + val search: Try[T] => Unit = v => try { + v match { + case Success(r) => if (predicate(r)) result tryComplete Success(Some(r)) + case _ => + } + } finally { + if (ref.decrementAndGet == 0) { + result tryComplete Success(None) + } + } + + futures.foreach(_ onComplete search) + + result.future + } + } + + /** A non-blocking fold over the specified futures, with the start value of the given zero. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * + * Example: + * {{{ + * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) + * }}} + */ + def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + if (futures.isEmpty) Promise.successful(zero).future + else sequence(futures).map(_.foldLeft(zero)(foldFun)) + } + + /** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first. + * + * Example: + * {{{ + * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds) + * }}} + */ + def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future + else sequence(futures).map(_ reduceLeft op) + } + + /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel: + * + * {{{ + * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) + * }}} + */ + def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) => + val fb = fn(a.asInstanceOf[A]) + for (r <- fr; b <- fb) yield (r += b) + }.map(_.result) + + // This is used to run callbacks which are internal + // to scala.concurrent; our own callbacks are only + // ever used to eventually run another callback, + // and that other callback will have its own + // executor because all callbacks come with + // an executor. Our own callbacks never block + // and have no "expected" exceptions. + // As a result, this executor can do nothing; + // some other executor will always come after + // it (and sometimes one will be before it), + // and those will be performing the "real" + // dispatch to code outside scala.concurrent. + // Because this exists, ExecutionContext.defaultExecutionContext + // isn't instantiated by Future internals, so + // if some code for some reason wants to avoid + // ever starting up the default context, it can do so + // by just not ever using it itself. scala.concurrent + // doesn't need to create defaultExecutionContext as + // a side effect. + private[concurrent] object InternalCallbackExecutor extends ExecutionContext { + override def execute(runnable: Runnable): Unit = + runnable.run() + override def reportFailure(t: Throwable): Unit = + throw new IllegalStateException("problem in scala.concurrent internal callback", t) + } +} + +/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext` + * wraps a callback provided to `Future.onComplete`. + * All callbacks provided to a `Future` end up going through `onComplete`, so this allows an + * `ExecutionContext` to special-case callbacks that were executed by `Future` if desired. + */ +trait OnCompleteRunnable { + self: Runnable => +} + diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala new file mode 100644 index 0000000000..8355a73a1f --- /dev/null +++ b/src/library/scala/concurrent/Promise.scala @@ -0,0 +1,152 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + +import scala.util.{ Try, Success, Failure } + +/** Promise is an object which can be completed with a value or failed + * with an exception. + * + * @define promiseCompletion + * If the promise has already been fulfilled, failed or has timed out, + * calling this method will throw an IllegalStateException. + * + * @define allowedThrowables + * If the throwable used to fail this promise is an error, a control exception + * or an interrupted exception, it will be wrapped as a cause within an + * `ExecutionException` which will fail the promise. + * + * @define nonDeterministic + * Note: Using this method may result in non-deterministic concurrent programs. + */ +trait Promise[T] { + + // used for internal callbacks defined in + // the lexical scope of this trait; + // _never_ for application callbacks. + private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor + + /** Future containing the value of this promise. + */ + def future: Future[T] + + /** Returns whether the promise has already been completed with + * a value or an exception. + * + * $nonDeterministic + * + * @return `true` if the promise is already completed, `false` otherwise + */ + def isCompleted: Boolean + + /** Completes the promise with either an exception or a value. + * + * @param result Either the value or the exception to complete the promise with. + * + * $promiseCompletion + */ + def complete(result: Try[T]): this.type = + if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.") + + /** Tries to complete the promise with either a value or the exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryComplete(result: Try[T]): Boolean + + /** Completes this promise with the specified future, once that future is completed. + * + * @return This promise + */ + final def completeWith(other: Future[T]): this.type = { + other onComplete { this complete _ } + this + } + + /** Attempts to complete this promise with the specified future, once that future is completed. + * + * @return This promise + */ + final def tryCompleteWith(other: Future[T]): this.type = { + other onComplete { this tryComplete _ } + this + } + + /** Completes the promise with a value. + * + * @param v The value to complete the promise with. + * + * $promiseCompletion + */ + def success(v: T): this.type = complete(Success(v)) + + /** Tries to complete the promise with a value. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def trySuccess(value: T): Boolean = tryComplete(Success(value)) + + /** Completes the promise with an exception. + * + * @param t The throwable to complete the promise with. + * + * $allowedThrowables + * + * $promiseCompletion + */ + def failure(t: Throwable): this.type = complete(Failure(t)) + + /** Tries to complete the promise with an exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryFailure(t: Throwable): Boolean = tryComplete(Failure(t)) +} + + + +object Promise { + + /** Creates a promise object which can be completed with a value. + * + * @tparam T the type of the value in the promise + * @return the newly created `Promise` object + */ + def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]() + + /** Creates an already completed Promise with the specified exception. + * + * @tparam T the type of the value in the promise + * @return the newly created `Promise` object + */ + def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception)) + + /** Creates an already completed Promise with the specified result. + * + * @tparam T the type of the value in the promise + * @return the newly created `Promise` object + */ + def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result)) + +} + + + + + + + + + diff --git a/src/library/scala/concurrent/duration/Deadline.scala b/src/library/scala/concurrent/duration/Deadline.scala new file mode 100644 index 0000000000..61cbe47530 --- /dev/null +++ b/src/library/scala/concurrent/duration/Deadline.scala @@ -0,0 +1,81 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.duration + +/** + * This class stores a deadline, as obtained via `Deadline.now` or the + * duration DSL: + * + * {{{ + * import scala.concurrent.duration._ + * 3.seconds.fromNow + * }}} + * + * Its main purpose is to manage repeated attempts to achieve something (like + * awaiting a condition) by offering the methods `hasTimeLeft` and `timeLeft`. All + * durations are measured according to `System.nanoTime` aka wall-time; this + * does not take into account changes to the system clock (such as leap + * seconds). + */ +case class Deadline private (time: FiniteDuration) extends Ordered[Deadline] { + /** + * Return a deadline advanced (i.e. moved into the future) by the given duration. + */ + def +(other: FiniteDuration): Deadline = copy(time = time + other) + /** + * Return a deadline moved backwards (i.e. towards the past) by the given duration. + */ + def -(other: FiniteDuration): Deadline = copy(time = time - other) + /** + * Calculate time difference between this and the other deadline, where the result is directed (i.e. may be negative). + */ + def -(other: Deadline): FiniteDuration = time - other.time + /** + * Calculate time difference between this duration and now; the result is negative if the deadline has passed. + * + * '''''Note that on some systems this operation is costly because it entails a system call.''''' + * Check `System.nanoTime` for your platform. + */ + def timeLeft: FiniteDuration = this - Deadline.now + /** + * Determine whether the deadline still lies in the future at the point where this method is called. + * + * '''''Note that on some systems this operation is costly because it entails a system call.''''' + * Check `System.nanoTime` for your platform. + */ + def hasTimeLeft(): Boolean = !isOverdue() + /** + * Determine whether the deadline lies in the past at the point where this method is called. + * + * '''''Note that on some systems this operation is costly because it entails a system call.''''' + * Check `System.nanoTime` for your platform. + */ + def isOverdue(): Boolean = (time.toNanos - System.nanoTime()) < 0 + /** + * The natural ordering for deadline is determined by the natural order of the underlying (finite) duration. + */ + def compare(other: Deadline) = time compare other.time +} + +object Deadline { + /** + * Construct a deadline due exactly at the point where this method is called. Useful for then + * advancing it to obtain a future deadline, or for sampling the current time exactly once and + * then comparing it to multiple deadlines (using subtraction). + */ + def now: Deadline = Deadline(Duration(System.nanoTime, NANOSECONDS)) + + /** + * The natural ordering for deadline is determined by the natural order of the underlying (finite) duration. + */ + implicit object DeadlineIsOrdered extends Ordering[Deadline] { + def compare(a: Deadline, b: Deadline) = a compare b + } + +} diff --git a/src/library/scala/concurrent/duration/Duration.scala b/src/library/scala/concurrent/duration/Duration.scala new file mode 100644 index 0000000000..5ac9887ae3 --- /dev/null +++ b/src/library/scala/concurrent/duration/Duration.scala @@ -0,0 +1,697 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.duration + +import java.lang.{ Double => JDouble, Long => JLong } + +object Duration { + + /** + * Construct a Duration from the given length and unit. Observe that nanosecond precision may be lost if + * + * - the unit is NANOSECONDS + * - and the length has an absolute value greater than 2^53 + * + * Infinite inputs (and NaN) are converted into [[Duration.Inf]], [[Duration.MinusInf]] and [[Duration.Undefined]], respectively. + * + * @throws IllegalArgumentException if the length was finite but the resulting duration cannot be expressed as a [[FiniteDuration]] + */ + def apply(length: Double, unit: TimeUnit): Duration = fromNanos(unit.toNanos(1) * length) + + /** + * Construct a finite duration from the given length and time unit. The unit given is retained + * throughout calculations as long as possible, so that it can be retrieved later. + */ + def apply(length: Long, unit: TimeUnit): FiniteDuration = new FiniteDuration(length, unit) + + /** + * Construct a finite duration from the given length and time unit, where the latter is + * looked up in a list of string representation. Valid choices are: + * + * `d, day, h, hour, min, minute, s, sec, second, ms, milli, millisecond, µs, micro, microsecond, ns, nano, nanosecond` + * and their pluralized forms (for every but the first mentioned form of each unit, i.e. no "ds", but "days"). + */ + def apply(length: Long, unit: String): FiniteDuration = new FiniteDuration(length, Duration.timeUnit(unit)) + + // Double stores 52 bits mantissa, but there is an implied '1' in front, making the limit 2^53 + private[this] final val maxPreciseDouble = 9007199254740992d + + /** + * Parse String into Duration. Format is `"<length><unit>"`, where + * whitespace is allowed before, between and after the parts. Infinities are + * designated by `"Inf"`, `"PlusInf"`, `"+Inf"` and `"-Inf"` or `"MinusInf"`. + * + * @throws NumberFormatException if format is not parseable + */ + def apply(s: String): Duration = { + val s1: String = s filterNot (_.isWhitespace) + s1 match { + case "Inf" | "PlusInf" | "+Inf" => Inf + case "MinusInf" | "-Inf" => MinusInf + case _ => + val unitName = s1.reverse takeWhile (_.isLetter) reverse; + timeUnit get unitName match { + case Some(unit) => + val valueStr = s1 dropRight unitName.length + val valueD = JDouble.parseDouble(valueStr) + if (valueD >= -maxPreciseDouble && valueD <= maxPreciseDouble) Duration(valueD, unit) + else Duration(JLong.parseLong(valueStr), unit) + case _ => throw new NumberFormatException("format error " + s) + } + } + } + + // "ms milli millisecond" -> List("ms", "milli", "millis", "millisecond", "milliseconds") + private[this] def words(s: String) = (s.trim split "\\s+").toList + private[this] def expandLabels(labels: String): List[String] = { + val hd :: rest = words(labels) + hd :: rest.flatMap(s => List(s, s + "s")) + } + private[this] val timeUnitLabels = List( + DAYS -> "d day", + HOURS -> "h hour", + MINUTES -> "min minute", + SECONDS -> "s sec second", + MILLISECONDS -> "ms milli millisecond", + MICROSECONDS -> "µs micro microsecond", + NANOSECONDS -> "ns nano nanosecond" + ) + + // TimeUnit => standard label + protected[duration] val timeUnitName: Map[TimeUnit, String] = + timeUnitLabels.toMap mapValues (s => words(s).last) toMap + + // Label => TimeUnit + protected[duration] val timeUnit: Map[String, TimeUnit] = + timeUnitLabels flatMap { case (unit, names) => expandLabels(names) map (_ -> unit) } toMap + + /** + * Extract length and time unit out of a string, where the format must match the description for [[Duration$.apply(String):Duration apply(String)]]. + * The extractor will not match for malformed strings or non-finite durations. + */ + def unapply(s: String): Option[(Long, TimeUnit)] = + ( try Some(apply(s)) catch { case _: RuntimeException => None } ) flatMap unapply + + /** + * Extract length and time unit out of a duration, if it is finite. + */ + def unapply(d: Duration): Option[(Long, TimeUnit)] = + if (d.isFinite) Some((d.length, d.unit)) else None + + /** + * Construct a possibly infinite or undefined Duration from the given number of nanoseconds. + * + * - `Double.PositiveInfinity` is mapped to [[Duration.Inf]] + * - `Double.NegativeInfinity` is mapped to [[Duration.MinusInf]] + * - `Double.NaN` is mapped to [[Duration.Undefined]] + * - `-0d` is mapped to [[Duration.Zero]] (exactly like `0d`) + * + * The semantics of the resulting Duration objects matches the semantics of their Double + * counterparts with respect to arithmetic operations. + * + * @throws IllegalArgumentException if the length was finite but the resulting duration cannot be expressed as a [[FiniteDuration]] + */ + def fromNanos(nanos: Double): Duration = { + if (nanos.isInfinite) + if (nanos > 0) Inf else MinusInf + else if (nanos.isNaN) + Undefined + else if (nanos > Long.MaxValue || nanos < Long.MinValue) + throw new IllegalArgumentException("trying to construct too large duration with " + nanos + "ns") + else + fromNanos((nanos + 0.5).toLong) + } + + private[this] final val µs_per_ns = 1000L + private[this] final val ms_per_ns = µs_per_ns * 1000 + private[this] final val s_per_ns = ms_per_ns * 1000 + private[this] final val min_per_ns = s_per_ns * 60 + private[this] final val h_per_ns = min_per_ns * 60 + private[this] final val d_per_ns = h_per_ns * 24 + + /** + * Construct a finite duration from the given number of nanoseconds. The + * result will have the coarsest possible time unit which can exactly express + * this duration. + * + * @throws IllegalArgumentException for `Long.MinValue` since that would lead to inconsistent behavior afterwards (cannot be negated) + */ + def fromNanos(nanos: Long): FiniteDuration = { + if (nanos % d_per_ns == 0) Duration(nanos / d_per_ns, DAYS) + else if (nanos % h_per_ns == 0) Duration(nanos / h_per_ns, HOURS) + else if (nanos % min_per_ns == 0) Duration(nanos / min_per_ns, MINUTES) + else if (nanos % s_per_ns == 0) Duration(nanos / s_per_ns, SECONDS) + else if (nanos % ms_per_ns == 0) Duration(nanos / ms_per_ns, MILLISECONDS) + else if (nanos % µs_per_ns == 0) Duration(nanos / µs_per_ns, MICROSECONDS) + else Duration(nanos, NANOSECONDS) + } + + /** + * Preconstructed value of `0.days`. + */ + // unit as coarse as possible to keep (_ + Zero) sane unit-wise + val Zero: FiniteDuration = new FiniteDuration(0, DAYS) + + /** + * The Undefined value corresponds closely to Double.NaN: + * + * - it is the result of otherwise invalid operations + * - it does not equal itself (according to `equals()`) + * - it compares greater than any other Duration apart from itself (for which `compare` returns 0) + * + * The particular comparison semantics mirror those of Double.NaN. + * + * '''''Use `eq` when checking an input of a method against this value.''''' + */ + val Undefined: Infinite = new Infinite { + override def toString = "Duration.Undefined" + override def equals(other: Any) = false + override def +(other: Duration): Duration = this + override def -(other: Duration): Duration = this + override def *(factor: Double): Duration = this + override def /(factor: Double): Duration = this + override def /(other: Duration): Double = Double.NaN + def compare(other: Duration) = if (other eq this) 0 else 1 + def unary_- : Duration = this + def toUnit(unit: TimeUnit): Double = Double.NaN + } + + sealed abstract class Infinite extends Duration { + def +(other: Duration): Duration = other match { + case x if x eq Undefined => Undefined + case x: Infinite if x ne this => Undefined + case _ => this + } + def -(other: Duration): Duration = other match { + case x if x eq Undefined => Undefined + case x: Infinite if x eq this => Undefined + case _ => this + } + + def *(factor: Double): Duration = + if (factor == 0d || factor.isNaN) Undefined + else if (factor < 0d) -this + else this + def /(divisor: Double): Duration = + if (divisor.isNaN || divisor.isInfinite) Undefined + else if ((divisor compare 0d) < 0) -this + else this + def /(divisor: Duration): Double = divisor match { + case _: Infinite => Double.NaN + case x => Double.PositiveInfinity * (if ((this > Zero) ^ (divisor >= Zero)) -1 else 1) + } + + final def isFinite() = false + + private[this] def fail(what: String) = throw new IllegalArgumentException(what + " not allowed on infinite Durations") + final def length: Long = fail("length") + final def unit: TimeUnit = fail("unit") + final def toNanos: Long = fail("toNanos") + final def toMicros: Long = fail("toMicros") + final def toMillis: Long = fail("toMillis") + final def toSeconds: Long = fail("toSeconds") + final def toMinutes: Long = fail("toMinutes") + final def toHours: Long = fail("toHours") + final def toDays: Long = fail("toDays") + } + + /** + * Infinite duration: greater than any other (apart from Undefined) and not equal to any other + * but itself. This value closely corresponds to Double.PositiveInfinity, + * matching its semantics in arithmetic operations. + */ + val Inf: Infinite = new Infinite { + override def toString = "Duration.Inf" + def compare(other: Duration) = other match { + case x if x eq Undefined => -1 // Undefined != Undefined + case x if x eq this => 0 // `case Inf` will include null checks in the byte code + case _ => 1 + } + def unary_- : Duration = MinusInf + def toUnit(unit: TimeUnit): Double = Double.PositiveInfinity + } + + /** + * Infinite duration: less than any other and not equal to any other + * but itself. This value closely corresponds to Double.NegativeInfinity, + * matching its semantics in arithmetic operations. + */ + val MinusInf: Infinite = new Infinite { + override def toString = "Duration.MinusInf" + def compare(other: Duration) = if (other eq this) 0 else -1 + def unary_- : Duration = Inf + def toUnit(unit: TimeUnit): Double = Double.NegativeInfinity + } + + // Java Factories + + /** + * Construct a finite duration from the given length and time unit. The unit given is retained + * throughout calculations as long as possible, so that it can be retrieved later. + */ + def create(length: Long, unit: TimeUnit): FiniteDuration = apply(length, unit) + /** + * Construct a Duration from the given length and unit. Observe that nanosecond precision may be lost if + * + * - the unit is NANOSECONDS + * - and the length has an absolute value greater than 2^53 + * + * Infinite inputs (and NaN) are converted into [[Duration.Inf]], [[Duration.MinusInf]] and [[Duration.Undefined]], respectively. + * + * @throws IllegalArgumentException if the length was finite but the resulting duration cannot be expressed as a [[FiniteDuration]] + */ + def create(length: Double, unit: TimeUnit): Duration = apply(length, unit) + /** + * Construct a finite duration from the given length and time unit, where the latter is + * looked up in a list of string representation. Valid choices are: + * + * `d, day, h, hour, min, minute, s, sec, second, ms, milli, millisecond, µs, micro, microsecond, ns, nano, nanosecond` + * and their pluralized forms (for every but the first mentioned form of each unit, i.e. no "ds", but "days"). + */ + def create(length: Long, unit: String): FiniteDuration = apply(length, unit) + /** + * Parse String into Duration. Format is `"<length><unit>"`, where + * whitespace is allowed before, between and after the parts. Infinities are + * designated by `"Inf"`, `"PlusInf"`, `"+Inf"` and `"-Inf"` or `"MinusInf"`. + * + * @throws NumberFormatException if format is not parseable + */ + def create(s: String): Duration = apply(s) + + /** + * The natural ordering of durations matches the natural ordering for Double, including non-finite values. + */ + implicit object DurationIsOrdered extends Ordering[Duration] { + def compare(a: Duration, b: Duration) = a compare b + } +} + +/** + * <h2>Utility for working with java.util.concurrent.TimeUnit durations.</h2> + * + * '''''This class is not meant as a general purpose representation of time, it is + * optimized for the needs of `scala.concurrent`.''''' + * + * <h2>Basic Usage</h2> + * + * <p/> + * Examples: + * {{{ + * import scala.concurrent.duration._ + * + * val duration = Duration(100, MILLISECONDS) + * val duration = Duration(100, "millis") + * + * duration.toNanos + * duration < 1.second + * duration <= Duration.Inf + * }}} + * + * '''''Invoking inexpressible conversions (like calling `toSeconds` on an infinite duration) will throw an IllegalArgumentException.''''' + * + * <p/> + * Implicits are also provided for Int, Long and Double. Example usage: + * {{{ + * import scala.concurrent.duration._ + * + * val duration = 100 millis + * }}} + * + * '''''The DSL provided by the implicit conversions always allows construction of finite durations, even for infinite Double inputs; use Duration.Inf instead.''''' + * + * Extractors, parsing and arithmetic are also included: + * {{{ + * val d = Duration("1.2 µs") + * val Duration(length, unit) = 5 millis + * val d2 = d * 2.5 + * val d3 = d2 + 1.millisecond + * }}} + * + * <h2>Handling of Time Units</h2> + * + * Calculations performed on finite durations always retain the more precise unit of either operand, no matter + * whether a coarser unit would be able to exactly express the same duration. This means that Duration can be + * used as a lossless container for a (length, unit) pair if it is constructed using the corresponding methods + * and no arithmetic is performed on it; adding/subtracting durations should in that case be done with care. + * + * <h2>Correspondence to Double Semantics</h2> + * + * The semantics of arithmetic operations on Duration are two-fold: + * + * - exact addition/subtraction with nanosecond resolution for finite durations, independent of the summands' magnitude + * - isomorphic to `java.lang.Double` when it comes to infinite or undefined values + * + * The conversion between Duration and Double is done using [[Duration.toUnit]] (with unit NANOSECONDS) + * and [[Duration$.fromNanos(Double):Duration Duration.fromNanos(Double)]]. + * + * <h2>Ordering</h2> + * + * The default ordering is consistent with the ordering of Double numbers, which means that Undefined is + * considered greater than all other durations, including [[Duration.Inf]]. + * + * @define exc @throws IllegalArgumentException when invoked on a non-finite duration + * + * @define ovf @throws IllegalArgumentException in case of a finite overflow: the range of a finite duration is +-(2^63-1)ns, and no conversion to infinite durations takes place. + */ +sealed abstract class Duration extends Serializable with Ordered[Duration] { + /** + * Obtain the length of this Duration measured in the unit obtained by the `unit` method. + * + * $exc + */ + def length: Long + /** + * Obtain the time unit in which the length of this duration is measured. + * + * $exc + */ + def unit: TimeUnit + /** + * Return the length of this duration measured in whole nanoseconds, rounding towards zero. + * + * $exc + */ + def toNanos: Long + /** + * Return the length of this duration measured in whole microseconds, rounding towards zero. + * + * $exc + */ + def toMicros: Long + /** + * Return the length of this duration measured in whole milliseconds, rounding towards zero. + * + * $exc + */ + def toMillis: Long + /** + * Return the length of this duration measured in whole seconds, rounding towards zero. + * + * $exc + */ + def toSeconds: Long + /** + * Return the length of this duration measured in whole minutes, rounding towards zero. + * + * $exc + */ + def toMinutes: Long + /** + * Return the length of this duration measured in whole hours, rounding towards zero. + * + * $exc + */ + def toHours: Long + /** + * Return the length of this duration measured in whole days, rounding towards zero. + * + * $exc + */ + def toDays: Long + /** + * Return the number of nanoseconds as floating point number, scaled down to the given unit. + * The result may not precisely represent this duration due to the Double datatype's inherent + * limitations (mantissa size effectively 53 bits). Non-finite durations are represented as + * - [[Duration.Undefined]] is mapped to Double.NaN + * - [[Duration.Inf]] is mapped to Double.PositiveInfinity + * - [[Duration.MinusInf]] is mapped to Double.NegativeInfinity + */ + def toUnit(unit: TimeUnit): Double + + /** + * Return the sum of that duration and this. When involving non-finite summands the semantics match those + * of Double. + * + * $ovf + */ + def +(other: Duration): Duration + /** + * Return the difference of that duration and this. When involving non-finite summands the semantics match those + * of Double. + * + * $ovf + */ + def -(other: Duration): Duration + /** + * Return this duration multiplied by the scalar factor. When involving non-finite factors the semantics match those + * of Double. + * + * $ovf + */ + def *(factor: Double): Duration + /** + * Return this duration divided by the scalar factor. When involving non-finite factors the semantics match those + * of Double. + * + * $ovf + */ + def /(divisor: Double): Duration + /** + * Return the quotient of this and that duration as floating-point number. The semantics are + * determined by Double as if calculating the quotient of the nanosecond lengths of both factors. + */ + def /(divisor: Duration): Double + /** + * Negate this duration. The only two values which are mapped to themselves are [[Duration.Zero]] and [[Duration.Undefined]]. + */ + def unary_- : Duration + /** + * This method returns whether this duration is finite, which is not the same as + * `!isInfinite` for Double because this method also returns `false` for [[Duration.Undefined]]. + */ + def isFinite(): Boolean + /** + * Return the smaller of this and that duration as determined by the natural ordering. + */ + def min(other: Duration): Duration = if (this < other) this else other + /** + * Return the larger of this and that duration as determined by the natural ordering. + */ + def max(other: Duration): Duration = if (this > other) this else other + + // Java API + + /** + * Return this duration divided by the scalar factor. When involving non-finite factors the semantics match those + * of Double. + * + * $ovf + */ + def div(divisor: Double) = this / divisor + /** + * Return the quotient of this and that duration as floating-point number. The semantics are + * determined by Double as if calculating the quotient of the nanosecond lengths of both factors. + */ + def div(other: Duration) = this / other + def gt(other: Duration) = this > other + def gteq(other: Duration) = this >= other + def lt(other: Duration) = this < other + def lteq(other: Duration) = this <= other + /** + * Return the difference of that duration and this. When involving non-finite summands the semantics match those + * of Double. + * + * $ovf + */ + def minus(other: Duration) = this - other + /** + * Return this duration multiplied by the scalar factor. When involving non-finite factors the semantics match those + * of Double. + * + * $ovf + */ + def mul(factor: Double) = this * factor + /** + * Negate this duration. The only two values which are mapped to themselves are [[Duration.Zero]] and [[Duration.Undefined]]. + */ + def neg() = -this + /** + * Return the sum of that duration and this. When involving non-finite summands the semantics match those + * of Double. + * + * $ovf + */ + def plus(other: Duration) = this + other +} + +object FiniteDuration { + + implicit object FiniteDurationIsOrdered extends Ordering[FiniteDuration] { + def compare(a: FiniteDuration, b: FiniteDuration) = a compare b + } + + def apply(length: Long, unit: TimeUnit) = new FiniteDuration(length, unit) + def apply(length: Long, unit: String) = new FiniteDuration(length, Duration.timeUnit(unit)) + + // limit on abs. value of durations in their units + private final val max_ns = Long.MaxValue + private final val max_µs = max_ns / 1000 + private final val max_ms = max_µs / 1000 + private final val max_s = max_ms / 1000 + private final val max_min= max_s / 60 + private final val max_h = max_min / 60 + private final val max_d = max_h / 24 +} + +/** + * This class represents a finite duration. Its addition and subtraction operators are overloaded to retain + * this guarantee statically. The range of this class is limited to +-(2^63-1)ns, which is roughly 292 years. + */ +final class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration { + import FiniteDuration._ + import Duration._ + + private[this] def bounded(max: Long) = -max <= length && length <= max + + require(unit match { + /* + * enforce the 2^63-1 ns limit, must be pos/neg symmetrical because of unary_- + */ + case NANOSECONDS ⇒ bounded(max_ns) + case MICROSECONDS ⇒ bounded(max_µs) + case MILLISECONDS ⇒ bounded(max_ms) + case SECONDS ⇒ bounded(max_s) + case MINUTES ⇒ bounded(max_min) + case HOURS ⇒ bounded(max_h) + case DAYS ⇒ bounded(max_d) + case _ ⇒ + val v = DAYS.convert(length, unit) + -max_d <= v && v <= max_d + }, "Duration is limited to +-(2^63-1)ns (ca. 292 years)") + + def toNanos = unit.toNanos(length) + def toMicros = unit.toMicros(length) + def toMillis = unit.toMillis(length) + def toSeconds = unit.toSeconds(length) + def toMinutes = unit.toMinutes(length) + def toHours = unit.toHours(length) + def toDays = unit.toDays(length) + def toUnit(u: TimeUnit) = toNanos.toDouble / NANOSECONDS.convert(1, u) + + /** + * Construct a [[Deadline]] from this duration by adding it to the current instant `Deadline.now`. + */ + def fromNow: Deadline = Deadline.now + this + + private[this] def unitString = timeUnitName(unit) + ( if (length == 1) "" else "s" ) + override def toString = "" + length + " " + unitString + + def compare(other: Duration) = other match { + case x: FiniteDuration => toNanos compare x.toNanos + case _ => -(other compare this) + } + + // see https://www.securecoding.cert.org/confluence/display/java/NUM00-J.+Detect+or+prevent+integer+overflow + private[this] def safeAdd(a: Long, b: Long): Long = { + if ((b > 0) && (a > Long.MaxValue - b) || + (b < 0) && (a < Long.MinValue - b)) throw new IllegalArgumentException("integer overflow") + a + b + } + private[this] def add(otherLength: Long, otherUnit: TimeUnit): FiniteDuration = { + val commonUnit = if (otherUnit.convert(1, unit) == 0) unit else otherUnit + val totalLength = safeAdd(commonUnit.convert(length, unit), commonUnit.convert(otherLength, otherUnit)) + new FiniteDuration(totalLength, commonUnit) + } + + def +(other: Duration) = other match { + case x: FiniteDuration => add(x.length, x.unit) + case _ => other + } + def -(other: Duration) = other match { + case x: FiniteDuration => add(-x.length, x.unit) + case _ => other + } + + def *(factor: Double) = + if (!factor.isInfinite) fromNanos(toNanos * factor) + else if (factor.isNaN) Undefined + else if ((factor > 0) ^ (this < Zero)) Inf + else MinusInf + + def /(divisor: Double) = + if (!divisor.isInfinite) fromNanos(toNanos / divisor) + else if (divisor.isNaN) Undefined + else Zero + + // if this is made a constant, then scalac will elide the conditional and always return +0.0, SI-6331 + private[this] def minusZero = -0d + def /(divisor: Duration): Double = + if (divisor.isFinite) toNanos.toDouble / divisor.toNanos + else if (divisor eq Undefined) Double.NaN + else if ((length < 0) ^ (divisor > Zero)) 0d + else minusZero + + // overloaded methods taking FiniteDurations, so that you can calculate while statically staying finite + def +(other: FiniteDuration) = add(other.length, other.unit) + def -(other: FiniteDuration) = add(-other.length, other.unit) + def plus(other: FiniteDuration) = this + other + def minus(other: FiniteDuration) = this - other + def min(other: FiniteDuration) = if (this < other) this else other + def max(other: FiniteDuration) = if (this > other) this else other + + // overloaded methods taking Long so that you can calculate while statically staying finite + + /** + * Return the quotient of this duration and the given integer factor. + * + * @throws ArithmeticException if the factor is 0 + */ + def /(divisor: Long) = fromNanos(toNanos / divisor) + + /** + * Return the product of this duration and the given integer factor. + * + * @throws IllegalArgumentException if the result would overflow the range of FiniteDuration + */ + def *(factor: Long) = new FiniteDuration(safeMul(length, factor), unit) + + /* + * This method avoids the use of Long division, which saves 95% of the time spent, + * by checking that there are enough leading zeros so that the result has a chance + * to fit into a Long again; the remaining edge cases are caught by using the sign + * of the product for overflow detection. + * + * This method is not general purpose because it disallows the (otherwise legal) + * case of Long.MinValue * 1, but that is okay for use in FiniteDuration, since + * Long.MinValue is not a legal `length` anyway. + */ + private def safeMul(_a: Long, _b: Long): Long = { + val a = math.abs(_a) + val b = math.abs(_b) + import java.lang.Long.{ numberOfLeadingZeros => leading } + if (leading(a) + leading(b) < 64) throw new IllegalArgumentException("multiplication overflow") + val product = a * b + if (product < 0) throw new IllegalArgumentException("multiplication overflow") + if (a == _a ^ b == _b) -product else product + } + + /** + * Return the quotient of this duration and the given integer factor. + * + * @throws ArithmeticException if the factor is 0 + */ + def div(divisor: Long) = this / divisor + + /** + * Return the product of this duration and the given integer factor. + * + * @throws IllegalArgumentException if the result would overflow the range of FiniteDuration + */ + def mul(factor: Long) = this * factor + + def unary_- = Duration(-length, unit) + + final def isFinite() = true + + override def equals(other: Any) = other match { + case x: FiniteDuration => toNanos == x.toNanos + case _ => super.equals(other) + } + override def hashCode = toNanos.toInt +} diff --git a/src/library/scala/concurrent/duration/DurationConversions.scala b/src/library/scala/concurrent/duration/DurationConversions.scala new file mode 100644 index 0000000000..2e5ff4752b --- /dev/null +++ b/src/library/scala/concurrent/duration/DurationConversions.scala @@ -0,0 +1,92 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.duration + +import DurationConversions._ + +// Would be nice to limit the visibility of this trait a little bit, +// but it crashes scalac to do so. +trait DurationConversions { + protected def durationIn(unit: TimeUnit): FiniteDuration + + def nanoseconds = durationIn(NANOSECONDS) + def nanos = nanoseconds + def nanosecond = nanoseconds + def nano = nanoseconds + + def microseconds = durationIn(MICROSECONDS) + def micros = microseconds + def microsecond = microseconds + def micro = microseconds + + def milliseconds = durationIn(MILLISECONDS) + def millis = milliseconds + def millisecond = milliseconds + def milli = milliseconds + + def seconds = durationIn(SECONDS) + def second = seconds + + def minutes = durationIn(MINUTES) + def minute = minutes + + def hours = durationIn(HOURS) + def hour = hours + + def days = durationIn(DAYS) + def day = days + + def nanoseconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(nanoseconds) + def nanos[C](c: C)(implicit ev: Classifier[C]): ev.R = nanoseconds(c) + def nanosecond[C](c: C)(implicit ev: Classifier[C]): ev.R = nanoseconds(c) + def nano[C](c: C)(implicit ev: Classifier[C]): ev.R = nanoseconds(c) + + def microseconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(microseconds) + def micros[C](c: C)(implicit ev: Classifier[C]): ev.R = microseconds(c) + def microsecond[C](c: C)(implicit ev: Classifier[C]): ev.R = microseconds(c) + def micro[C](c: C)(implicit ev: Classifier[C]): ev.R = microseconds(c) + + def milliseconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(milliseconds) + def millis[C](c: C)(implicit ev: Classifier[C]): ev.R = milliseconds(c) + def millisecond[C](c: C)(implicit ev: Classifier[C]): ev.R = milliseconds(c) + def milli[C](c: C)(implicit ev: Classifier[C]): ev.R = milliseconds(c) + + def seconds[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(seconds) + def second[C](c: C)(implicit ev: Classifier[C]): ev.R = seconds(c) + + def minutes[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(minutes) + def minute[C](c: C)(implicit ev: Classifier[C]): ev.R = minutes(c) + + def hours[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(hours) + def hour[C](c: C)(implicit ev: Classifier[C]): ev.R = hours(c) + + def days[C](c: C)(implicit ev: Classifier[C]): ev.R = ev.convert(days) + def day[C](c: C)(implicit ev: Classifier[C]): ev.R = days(c) +} + +/** + * This object just holds some cogs which make the DSL machine work, not for direct consumption. + */ +object DurationConversions { + trait Classifier[C] { + type R + def convert(d: FiniteDuration): R + } + + implicit object spanConvert extends Classifier[span.type] { + type R = FiniteDuration + def convert(d: FiniteDuration) = d + } + + implicit object fromNowConvert extends Classifier[fromNow.type] { + type R = Deadline + def convert(d: FiniteDuration) = Deadline.now + d + } + +} diff --git a/src/library/scala/concurrent/duration/package.scala b/src/library/scala/concurrent/duration/package.scala new file mode 100644 index 0000000000..1b3461414e --- /dev/null +++ b/src/library/scala/concurrent/duration/package.scala @@ -0,0 +1,79 @@ +package scala.concurrent + +package object duration { + /** + * This object can be used as closing token if you prefer dot-less style but do not want + * to enable language.postfixOps: + * + * {{{ + * import scala.concurrent.duration._ + * + * val duration = 2 seconds span + * }}} + */ + object span + + /** + * This object can be used as closing token for declaring a deadline at some future point + * in time: + * + * {{{ + * import scala.concurrent.duration._ + * + * val deadline = 3 seconds fromNow + * }}} + */ + object fromNow + + type TimeUnit = java.util.concurrent.TimeUnit + final val DAYS = java.util.concurrent.TimeUnit.DAYS + final val HOURS = java.util.concurrent.TimeUnit.HOURS + final val MICROSECONDS = java.util.concurrent.TimeUnit.MICROSECONDS + final val MILLISECONDS = java.util.concurrent.TimeUnit.MILLISECONDS + final val MINUTES = java.util.concurrent.TimeUnit.MINUTES + final val NANOSECONDS = java.util.concurrent.TimeUnit.NANOSECONDS + final val SECONDS = java.util.concurrent.TimeUnit.SECONDS + + implicit def pairIntToDuration(p: (Int, TimeUnit)): Duration = Duration(p._1, p._2) + implicit def pairLongToDuration(p: (Long, TimeUnit)): FiniteDuration = Duration(p._1, p._2) + implicit def durationToPair(d: Duration): (Long, TimeUnit) = (d.length, d.unit) + + final class DurationInt(val n: Int) extends DurationConversions { + override protected def durationIn(unit: TimeUnit): FiniteDuration = Duration(n, unit) + } + implicit def intToDurationInt(n: Int) = new DurationInt(n) + + final class DurationLong(val n: Long) extends DurationConversions { + override protected def durationIn(unit: TimeUnit): FiniteDuration = Duration(n, unit) + } + implicit def longToDurationLong(n: Long) = new DurationLong(n) + + final class DurationDouble(val d: Double) extends DurationConversions { + override protected def durationIn(unit: TimeUnit): FiniteDuration = + Duration(d, unit) match { + case f: FiniteDuration => f + case _ => throw new IllegalArgumentException("Duration DSL not applicable to " + d) + } + } + implicit def doubleToDurationDouble(d: Double) = new DurationDouble(d) + + /* + * Avoid reflection based invocation by using non-duck type + */ + final class IntMult(val i: Int) { + def *(d: Duration) = d * i + def *(d: FiniteDuration) = d * i + } + implicit def intToIntMult(i: Int) = new IntMult(i) + + final class LongMult(val i: Long) { + def *(d: Duration) = d * i + def *(d: FiniteDuration) = d * i + } + implicit def longToLongMult(i: Long) = new LongMult(i) + + final class DoubleMult(val f: Double) { + def *(d: Duration) = d * f + } + implicit def doubleToDoubleMult(f: Double) = new DoubleMult(f) +} diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java new file mode 100644 index 0000000000..b8165b6cde --- /dev/null +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -0,0 +1,40 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl; + + +import scala.concurrent.util.Unsafe; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class AbstractPromise { + private volatile Object _ref; + + final static long _refoffset; + + static { + try { + _refoffset = Unsafe.instance.objectFieldOffset(AbstractPromise.class.getDeclaredField("_ref")); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } + + protected final boolean updateState(Object oldState, Object newState) { + return Unsafe.instance.compareAndSwapObject(this, _refoffset, oldState, newState); + } + + protected final Object getState() { + return _ref; + } + + protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater = + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +}
\ No newline at end of file diff --git a/src/library/scala/concurrent/impl/AdaptedRunnableAction.java b/src/library/scala/concurrent/impl/AdaptedRunnableAction.java new file mode 100644 index 0000000000..3dff9a1895 --- /dev/null +++ b/src/library/scala/concurrent/impl/AdaptedRunnableAction.java @@ -0,0 +1,21 @@ +package scala.concurrent.impl; + +import scala.concurrent.forkjoin.ForkJoinTask; +import java.util.concurrent.RunnableFuture; + +/** + * Adaptor for Runnables without results + */ +final class AdaptedRunnableAction extends ForkJoinTask<Void> + /*implements RunnableFuture<Void>*/ { + final Runnable runnable; + AdaptedRunnableAction(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; + } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + public final boolean exec() { runnable.run(); return true; } + public final void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; +} diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala new file mode 100644 index 0000000000..9a03b1d5df --- /dev/null +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -0,0 +1,135 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor } +import java.util.Collection +import scala.concurrent.forkjoin._ +import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } +import scala.util.control.NonFatal + + + +private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor { + + val executor: Either[ForkJoinPool, Executor] = es match { + case null => createExecutor + case some => Right(some) + } + + // Implement BlockContext on FJP threads + class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { + def wire[T <: Thread](thread: T): T = { + thread.setDaemon(daemonic) + //Potentially set things like uncaught exception handler, name etc + thread + } + + def newThread(runnable: Runnable): Thread = wire(new Thread(runnable)) + + def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { + var result: T = null.asInstanceOf[T] + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { + @volatile var isdone = false + override def block(): Boolean = { + result = try thunk finally { isdone = true } + true + } + override def isReleasable = isdone + }, true) + result + } + }) + } + + def createExecutor: Either[ForkJoinPool, Executor] = { + + def getInt(name: String, f: String => Int): Int = + try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors } + def range(floor: Int, desired: Int, ceiling: Int): Int = + if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling) + + val desiredParallelism = range( + getInt("scala.concurrent.context.minThreads", _.toInt), + getInt("scala.concurrent.context.numThreads", { + case null | "" => Runtime.getRuntime.availableProcessors + case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt + case other => other.toInt + }), + getInt("scala.concurrent.context.maxThreads", _.toInt)) + + val threadFactory = new DefaultThreadFactory(daemonic = true) + + try { + val pool = new ForkJoinPool( + desiredParallelism, + threadFactory) + pool.setAsyncMode(true) + Left(pool) + } catch { + case NonFatal(t) => + System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to ThreadPoolExecutor") + t.printStackTrace(System.err) + val exec = new ThreadPoolExecutor( + desiredParallelism, + desiredParallelism, + 5L, + TimeUnit.MINUTES, + new LinkedBlockingQueue[Runnable], + threadFactory + ) + exec.allowCoreThreadTimeOut(true) + Right(exec) + } + } + + def execute(runnable: Runnable): Unit = executor match { + case Left(fj) => + Thread.currentThread match { + case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => + (runnable match { + case fjt: ForkJoinTask[_] => fjt + case _ => new AdaptedRunnableAction(runnable) + }).fork + case _ => fj.execute(runnable) + } + case Right(generic) => generic execute runnable + } + + def reportFailure(t: Throwable) = reporter(t) +} + + +private[concurrent] object ExecutionContextImpl { + + def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) + + def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService = + new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService { + final def asExecutorService: ExecutorService = executor.right.asInstanceOf[ExecutorService] + override def execute(command: Runnable) = executor.right.get.execute(command) + override def shutdown() { asExecutorService.shutdown() } + override def shutdownNow() = asExecutorService.shutdownNow() + override def isShutdown = asExecutorService.isShutdown + override def isTerminated = asExecutorService.isTerminated + override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit) + override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable) + override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t) + override def submit(runnable: Runnable) = asExecutorService.submit(runnable) + override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables) + override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit) + override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables) + override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit) + } +} + + diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala new file mode 100644 index 0000000000..8c2a77c75f --- /dev/null +++ b/src/library/scala/concurrent/impl/Future.scala @@ -0,0 +1,34 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal +import scala.util.{Try, Success, Failure} + + +private[concurrent] object Future { + class PromiseCompletingRunnable[T](body: => T) extends Runnable { + val promise = new Promise.DefaultPromise[T]() + + override def run() = { + promise complete { + try Success(body) catch { case NonFatal(e) => Failure(e) } + } + } + } + + def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { + val runnable = new PromiseCompletingRunnable(body) + executor.execute(runnable) + runnable.promise.future + } +} diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala new file mode 100644 index 0000000000..e9da45a079 --- /dev/null +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -0,0 +1,174 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + +import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, TimeoutException, ExecutionException } +import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration, NANOSECONDS } +import scala.annotation.tailrec +import scala.util.control.NonFatal +import scala.util.{ Try, Success, Failure } + +private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { + def future: this.type = this +} + +/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`. + */ +private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { + // must be filled in before running it + var value: Try[T] = null + + override def run() = { + require(value ne null) // must set value to non-null before running! + try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } + } + + def executeWithValue(v: Try[T]): Unit = { + require(value eq null) // can't complete it twice + value = v + // Note that we cannot prepare the ExecutionContext at this point, since we might + // already be running on a different thread! + executor.execute(this) + } +} + +private[concurrent] object Promise { + + private def resolveTry[T](source: Try[T]): Try[T] = source match { + case Failure(t) => resolver(t) + case _ => source + } + + private def resolver[T](throwable: Throwable): Try[T] = throwable match { + case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T]) + case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t)) + case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t)) + case e: Error => Failure(new ExecutionException("Boxed Error", e)) + case t => Failure(t) + } + + /** Default promise implementation. + */ + class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => + updateState(null, Nil) // Start at "No callbacks" + + protected final def tryAwait(atMost: Duration): Boolean = { + @tailrec + def awaitUnsafe(deadline: Deadline, nextWait: FiniteDuration): Boolean = { + if (!isCompleted && nextWait > Duration.Zero) { + val ms = nextWait.toMillis + val ns = (nextWait.toNanos % 1000000l).toInt // as per object.wait spec + + synchronized { if (!isCompleted) wait(ms, ns) } + + awaitUnsafe(deadline, deadline.timeLeft) + } else + isCompleted + } + @tailrec + def awaitUnbounded(): Boolean = { + if (isCompleted) true + else { + synchronized { if (!isCompleted) wait() } + awaitUnbounded() + } + } + + import Duration.Undefined + atMost match { + case u if u eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period") + case Duration.Inf => awaitUnbounded + case Duration.MinusInf => isCompleted + case f: FiniteDuration => if (f > Duration.Zero) awaitUnsafe(f.fromNow, f) else isCompleted + } + } + + @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + if (isCompleted || tryAwait(atMost)) this + else throw new TimeoutException("Futures timed out after [" + atMost + "]") + + @throws(classOf[Exception]) + def result(atMost: Duration)(implicit permit: CanAwait): T = + ready(atMost).value.get match { + case Failure(e) => throw e + case Success(r) => r + } + + def value: Option[Try[T]] = getState match { + case c: Try[_] => Some(c.asInstanceOf[Try[T]]) + case _ => None + } + + override def isCompleted: Boolean = getState match { // Cheaper than boxing result into Option due to "def value" + case _: Try[_] => true + case _ => false + } + + def tryComplete(value: Try[T]): Boolean = { + val resolved = resolveTry(value) + (try { + @tailrec + def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = { + getState match { + case raw: List[_] => + val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] + if (updateState(cur, v)) cur else tryComplete(v) + case _ => null + } + } + tryComplete(resolved) + } finally { + synchronized { notifyAll() } //Notify any evil blockers + }) match { + case null => false + case rs if rs.isEmpty => true + case rs => rs.foreach(r => r.executeWithValue(resolved)); true + } + } + + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { + val preparedEC = executor.prepare + val runnable = new CallbackRunnable[T](preparedEC, func) + + @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed + def dispatchOrAddCallback(): Unit = + getState match { + case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) + case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback() + } + dispatchOrAddCallback() + } + } + + /** An already completed Future is given its result at creation. + * + * Useful in Future-composition when a value to contribute is already available. + */ + final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] { + + val value = Some(resolveTry(suppliedValue)) + + override def isCompleted: Boolean = true + + def tryComplete(value: Try[T]): Boolean = false + + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { + val completedAs = value.get + val preparedEC = executor.prepare + (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) + } + + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get + } + +} diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala new file mode 100644 index 0000000000..cc69b0011c --- /dev/null +++ b/src/library/scala/concurrent/package.scala @@ -0,0 +1,109 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala + +import scala.concurrent.duration.Duration +import scala.annotation.implicitNotFound + +/** This package object contains primitives for concurrent and parallel programming. + */ +package object concurrent { + type ExecutionException = java.util.concurrent.ExecutionException + type CancellationException = java.util.concurrent.CancellationException + type TimeoutException = java.util.concurrent.TimeoutException + + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asynchronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ + def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body) + + /** Creates a promise object which can be completed with a value or an exception. + * + * @tparam T the type of the value in the promise + * @return the newly created `Promise` object + */ + def promise[T](): Promise[T] = Promise[T]() + + /** Used to designate a piece of code which potentially blocks, allowing the current [[BlockContext]] to adjust + * the runtime's behavior. + * Properly marking blocking code may improve performance or avoid deadlocks. + * + * Blocking on an [[Awaitable]] should be done using [[Await.result]] instead of `blocking`. + * + * @param body A piece of code which contains potentially blocking or long running calls. + * @throws `CancellationException` if the computation was cancelled + * @throws `InterruptedException` in the case that a wait within the blocking `body` was interrupted + */ + @throws(classOf[Exception]) + def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission) +} + +package concurrent { + @implicitNotFound("Don't call `Awaitable` methods directly, use the `Await` object.") + sealed trait CanAwait + + /** + * Internal usage only, implementation detail. + */ + private[concurrent] object AwaitPermission extends CanAwait + + /** + * `Await` is what is used to ensure proper handling of blocking for `Awaitable` instances. + */ + object Await { + /** + * Await the "completed" state of an `Awaitable`. + * + * Although this method is blocking, the internal use of [[scala.concurrent.blocking blocking]] ensures that + * the underlying [[ExecutionContext]] is prepared to properly manage the blocking. + * + * @param awaitable + * the `Awaitable` to be awaited + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the `awaitable` + * @throws InterruptedException if the current thread is interrupted while waiting + * @throws TimeoutException if after waiting for the specified time this `Awaitable` is still not ready + * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]] + */ + @throws(classOf[TimeoutException]) + @throws(classOf[InterruptedException]) + def ready[T](awaitable: Awaitable[T], atMost: Duration): /*awaitable.type*/Awaitable[T] = + blocking(awaitable.ready(atMost)(AwaitPermission)) + + /** + * Await and return the result (of type `T`) of an `Awaitable`. + * + * Although this method is blocking, the internal use of [[scala.concurrent.blocking blocking]] ensures that + * the underlying [[ExecutionContext]] to properly detect blocking and ensure that there are no deadlocks. + * + * @param awaitable + * the `Awaitable` to be awaited + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, or a finite positive + * duration + * @return the result value if `awaitable` is completed within the specific maximum wait time + * @throws InterruptedException if the current thread is interrupted while waiting + * @throws TimeoutException if after waiting for the specified time `awaitable` is still not ready + * @throws IllegalArgumentException if `atMost` is [[scala.concurrent.duration.Duration.Undefined Duration.Undefined]] + */ + @throws(classOf[Exception]) + def result[T](awaitable: Awaitable[T], atMost: Duration): T = + blocking(awaitable.result(atMost)(AwaitPermission)) + } +} diff --git a/src/library/scala/concurrent/util/Unsafe.java b/src/library/scala/concurrent/util/Unsafe.java new file mode 100644 index 0000000000..ef893c94d9 --- /dev/null +++ b/src/library/scala/concurrent/util/Unsafe.java @@ -0,0 +1,35 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.util; + + + +import java.lang.reflect.Field; + + + +public final class Unsafe { + public final static sun.misc.Unsafe instance; + static { + try { + sun.misc.Unsafe found = null; + for(Field field : sun.misc.Unsafe.class.getDeclaredFields()) { + if (field.getType() == sun.misc.Unsafe.class) { + field.setAccessible(true); + found = (sun.misc.Unsafe) field.get(null); + break; + } + } + if (found == null) throw new IllegalStateException("Can't find instance of sun.misc.Unsafe"); + else instance = found; + } catch(Throwable t) { + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala new file mode 100644 index 0000000000..4bcccb6969 --- /dev/null +++ b/src/library/scala/util/Try.scala @@ -0,0 +1,216 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2008-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.util + +import scala.collection.Seq +import scala.util.control.NonFatal + +/** + * The `Try` type represents a computation that may either result in an exception, or return a + * successfully computed value. It's similar to, but semantically different from the [[scala.util.Either]] type. + * + * Instances of `Try[T]`, are either an instance of [[scala.util.Success]][T] or [[scala.util.Failure]][T]. + * + * For example, `Try` can be used to perform division on a user-defined input, without the need to do explicit + * exception-handling in all of the places that an exception might occur. + * + * Example: + * {{{ + * import scala.util.{Try, Success, Failure} + * + * def divide: Try[Int] = { + * val dividend = Try(Console.readLine("Enter an Int that you'd like to divide:\n").toInt) + * val divisor = Try(Console.readLine("Enter an Int that you'd like to divide by:\n").toInt) + * val problem = dividend.flatMap(x => divisor.map(y => x/y)) + * problem match { + * case Success(v) => + * println("Result of " + dividend.get + "/"+ divisor.get +" is: " + v) + * Success(v) + * case Failure(e) => + * println("You must've divided by zero or entered something that's not an Int. Try again!") + * println("Info from the exception: " + e.getMessage) + * divide + * } + * } + * + * }}} + * + * An important property of `Try` shown in the above example is its ability to ''pipeline'', or chain, operations, + * catching exceptions along the way. The `flatMap` and `map` combinators in the above example each essentially + * pass off either their successfully completed value, wrapped in the `Success` type for it to be further operated + * upon by the next combinator in the chain, or the exception wrapped in the `Failure` type usually to be simply + * passed on down the chain. Combinators such as `rescue` and `recover` are designed to provide some type of + * default behavior in the case of failure. + * + * ''Note'': only non-fatal exceptions are caught by the combinators on `Try` (see [[scala.util.control.NonFatal]]). + * Serious system errors, on the other hand, will be thrown. + * + * ''Note:'': all Try combinators will catch exceptions and return failure unless otherwise specified in the documentation. + * + * `Try` comes to the Scala standard library after years of use as an integral part of Twitter's stack. + * + * @author based on Twitter's original implementation in com.twitter.util. + * @since 2.10 + */ +sealed abstract class Try[+T] { + + /** Returns `true` if the `Try` is a `Failure`, `false` otherwise. + */ + def isFailure: Boolean + + /** Returns `true` if the `Try` is a `Success`, `false` otherwise. + */ + def isSuccess: Boolean + + /** Returns the value from this `Success` or the given `default` argument if this is a `Failure`. + * + * ''Note:'': This will throw an exception if it is not a success and default throws an exception. + */ + def getOrElse[U >: T](default: => U): U = + if (isSuccess) get else default + + /** Returns this `Try` if it's a `Success` or the given `default` argument if this is a `Failure`. + */ + def orElse[U >: T](default: => Try[U]): Try[U] = + try if (isSuccess) this else default + catch { + case NonFatal(e) => Failure(e) + } + + /** Returns the value from this `Success` or throws the exception if this is a `Failure`. + */ + def get: T + + /** + * Applies the given function `f` if this is a `Success`, otherwise returns `Unit` if this is a `Failure`. + * + * ''Note:'' If `f` throws, then this method may throw an exception. + */ + def foreach[U](f: T => U): Unit + + /** + * Returns the given function applied to the value from this `Success` or returns this if this is a `Failure`. + */ + def flatMap[U](f: T => Try[U]): Try[U] + + /** + * Maps the given function to the value from this `Success` or returns this if this is a `Failure`. + */ + def map[U](f: T => U): Try[U] + + /** + * Converts this to a `Failure` if the predicate is not satisfied. + */ + def filter(p: T => Boolean): Try[T] + + /** + * Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`. + * This is like `flatMap` for the exception. + */ + def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] + + /** + * Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`. + * This is like map for the exception. + */ + def recover[U >: T](f: PartialFunction[Throwable, U]): Try[U] + + /** + * Returns `None` if this is a `Failure` or a `Some` containing the value if this is a `Success`. + */ + def toOption: Option[T] = if (isSuccess) Some(get) else None + + /** + * Transforms a nested `Try`, ie, a `Try` of type `Try[Try[T]]`, + * into an un-nested `Try`, ie, a `Try` of type `Try[T]`. + */ + def flatten[U](implicit ev: T <:< Try[U]): Try[U] + + /** + * Completes this `Try` with an exception wrapped in a `Success`. The exception is either the exception that the + * `Try` failed with (if a `Failure`) or an `UnsupportedOperationException`. + */ + def failed: Try[Throwable] + + /** Completes this `Try` by applying the function `f` to this if this is of type `Failure`, or conversely, by applying + * `s` if this is a `Success`. + */ + def transform[U](s: T => Try[U], f: Throwable => Try[U]): Try[U] = + try this match { + case Success(v) => s(v) + case Failure(e) => f(e) + } catch { + case NonFatal(e) => Failure(e) + } + +} + +object Try { + /** Constructs a `Try` using the by-name parameter. This + * method will ensure any non-fatal exception is caught and a + * `Failure` object is returned. + */ + def apply[T](r: => T): Try[T] = + try Success(r) catch { + case NonFatal(e) => Failure(e) + } + +} + +final case class Failure[+T](val exception: Throwable) extends Try[T] { + def isFailure: Boolean = true + def isSuccess: Boolean = false + def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = + try { + if (f isDefinedAt exception) f(exception) else this + } catch { + case NonFatal(e) => Failure(e) + } + def get: T = throw exception + def flatMap[U](f: T => Try[U]): Try[U] = this.asInstanceOf[Try[U]] + def flatten[U](implicit ev: T <:< Try[U]): Try[U] = this.asInstanceOf[Try[U]] + def foreach[U](f: T => U): Unit = () + def map[U](f: T => U): Try[U] = this.asInstanceOf[Try[U]] + def filter(p: T => Boolean): Try[T] = this + def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U] = + try { + if (rescueException isDefinedAt exception) { + Try(rescueException(exception)) + } else this + } catch { + case NonFatal(e) => Failure(e) + } + def failed: Try[Throwable] = Success(exception) +} + + +final case class Success[+T](value: T) extends Try[T] { + def isFailure: Boolean = false + def isSuccess: Boolean = true + def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = this + def get = value + def flatMap[U](f: T => Try[U]): Try[U] = + try f(value) + catch { + case NonFatal(e) => Failure(e) + } + def flatten[U](implicit ev: T <:< Try[U]): Try[U] = value + def foreach[U](f: T => U): Unit = f(value) + def map[U](f: T => U): Try[U] = Try[U](f(value)) + def filter(p: T => Boolean): Try[T] = { + try { + if (p(value)) this + else Failure(new NoSuchElementException("Predicate does not hold for " + value)) + } catch { + case NonFatal(e) => Failure(e) + } + } + def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U] = this + def failed: Try[Throwable] = Failure(new UnsupportedOperationException("Success.failed")) +} diff --git a/src/library/scala/util/control/NonFatal.scala b/src/library/scala/util/control/NonFatal.scala new file mode 100644 index 0000000000..f364b3504c --- /dev/null +++ b/src/library/scala/util/control/NonFatal.scala @@ -0,0 +1,45 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.util.control + +/** + * Extractor of non-fatal Throwables. Will not match fatal errors like `VirtualMachineError` + * (for example, `OutOfMemoryError`, a subclass of `VirtualMachineError`), `ThreadDeath`, + * `LinkageError`, `InterruptedException`, `ControlThrowable`, or `NotImplementedError`. + * However, `StackOverflowError` is matched, i.e. considered non-fatal. + * + * Note that [[scala.util.control.ControlThrowable]], an internal Throwable, is not matched by + * `NonFatal` (and would therefore be thrown). + * + * For example, all harmless Throwables can be caught by: + * {{{ + * try { + * // dangerous stuff + * } catch { + * case NonFatal(e) => log.error(e, "Something not that bad.") + * // or + * case e if NonFatal(e) => log.error(e, "Something not that bad.") + * } + * }}} + */ +object NonFatal { + /** + * Returns true if the provided `Throwable` is to be considered non-fatal, or false if it is to be considered fatal + */ + def apply(t: Throwable): Boolean = t match { + case _: StackOverflowError => true // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError | _: ControlThrowable => false + case _ => true + } + /** + * Returns Some(t) if NonFatal(t) == true, otherwise None + */ + def unapply(t: Throwable): Option[Throwable] = if (apply(t)) Some(t) else None +} diff --git a/test/files/jvm/future-spec.flags b/test/files/jvm/future-spec.flags new file mode 100644 index 0000000000..a7ed61704f --- /dev/null +++ b/test/files/jvm/future-spec.flags @@ -0,0 +1 @@ +-Ydependent-method-types diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala new file mode 100644 index 0000000000..e5e3ef9e16 --- /dev/null +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -0,0 +1,524 @@ + + + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.duration.Duration.Inf +import scala.collection._ +import scala.runtime.NonLocalReturnControl +import scala.util.{Try,Success,Failure} + + + +object FutureTests extends MinimalScalaTest { + + /* some utils */ + + def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match { + case "Hello" => future { "World" } + case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance")) + case "NoReply" => Promise[String]().future + } + + val defaultTimeout = 5 seconds + + /* future specification */ + + "A future with custom ExecutionContext" should { + "shouldHandleThrowables" in { + val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable] + val threadPool = java.util.concurrent.Executors.newFixedThreadPool(4) + implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(threadPool, { + t => + ms += t + }) + + class ThrowableTest(m: String) extends Throwable(m) + + val f1 = future[Any] { + throw new ThrowableTest("test") + } + + intercept[ThrowableTest] { + Await.result(f1, defaultTimeout) + } + + val latch = new TestLatch + val f2 = future { + Await.ready(latch, 5 seconds) + "success" + } + val f3 = f2 map { s => s.toUpperCase } + + f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") } + + latch.open() + + Await.result(f2, defaultTimeout) mustBe ("success") + + f2 foreach { _ => throw new ThrowableTest("current thread foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") } + + Await.result(f3, defaultTimeout) mustBe ("SUCCESS") + + val waiting = future { + Thread.sleep(1000) + } + Await.ready(waiting, 2000 millis) + + ms.size mustBe (4) + //FIXME should check + threadPool.shutdown() + } + } + + "A future with global ExecutionContext" should { + import ExecutionContext.Implicits._ + + "compose with for-comprehensions" in { + def async(x: Int) = future { (x * 2).toString } + val future0 = future[Any] { + "five!".length + } + + val future1 = for { + a <- future0.mapTo[Int] // returns 5 + b <- async(a) // returns "10" + c <- async(7) // returns "14" + } yield b + "-" + c + + val future2 = for { + a <- future0.mapTo[Int] + b <- (future { (a * 2).toString }).mapTo[Int] + c <- future { (7 * 2).toString } + } yield b + "-" + c + + Await.result(future1, defaultTimeout) mustBe ("10-14") + assert(checkType(future1, manifest[String])) + intercept[ClassCastException] { Await.result(future2, defaultTimeout) } + } + + "support pattern matching within a for-comprehension" in { + case class Req[T](req: T) + case class Res[T](res: T) + def async[T](req: Req[T]) = req match { + case Req(s: String) => future { Res(s.length) } + case Req(i: Int) => future { Res((i * 2).toString) } + } + + val future1 = for { + Res(a: Int) <- async(Req("Hello")) + Res(b: String) <- async(Req(a)) + Res(c: String) <- async(Req(7)) + } yield b + "-" + c + + val future2 = for { + Res(a: Int) <- async(Req("Hello")) + Res(b: Int) <- async(Req(a)) + Res(c: Int) <- async(Req(7)) + } yield b + "-" + c + + Await.result(future1, defaultTimeout) mustBe ("10-14") + intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) } + } + + "recover from exceptions" in { + val future1 = Future(5) + val future2 = future1 map (_ / 0) + val future3 = future2 map (_.toString) + + val future4 = future1 recover { + case e: ArithmeticException => 0 + } map (_.toString) + + val future5 = future2 recover { + case e: ArithmeticException => 0 + } map (_.toString) + + val future6 = future2 recover { + case e: MatchError => 0 + } map (_.toString) + + val future7 = future3 recover { + case e: ArithmeticException => "You got ERROR" + } + + val future8 = testAsync("Failure") + val future9 = testAsync("Failure") recover { + case e: RuntimeException => "FAIL!" + } + val future10 = testAsync("Hello") recover { + case e: RuntimeException => "FAIL!" + } + val future11 = testAsync("Failure") recover { + case _ => "Oops!" + } + + Await.result(future1, defaultTimeout) mustBe (5) + intercept[ArithmeticException] { Await.result(future2, defaultTimeout) } + intercept[ArithmeticException] { Await.result(future3, defaultTimeout) } + Await.result(future4, defaultTimeout) mustBe ("5") + Await.result(future5, defaultTimeout) mustBe ("0") + intercept[ArithmeticException] { Await.result(future6, defaultTimeout) } + Await.result(future7, defaultTimeout) mustBe ("You got ERROR") + intercept[RuntimeException] { Await.result(future8, defaultTimeout) } + Await.result(future9, defaultTimeout) mustBe ("FAIL!") + Await.result(future10, defaultTimeout) mustBe ("World") + Await.result(future11, defaultTimeout) mustBe ("Oops!") + } + + "recoverWith from exceptions" in { + val o = new IllegalStateException("original") + val r = new IllegalStateException("recovered") + + intercept[IllegalStateException] { + val failed = Future.failed[String](o) recoverWith { + case _ if false == true => Future.successful("yay!") + } + Await.result(failed, defaultTimeout) + } mustBe (o) + + val recovered = Future.failed[String](o) recoverWith { + case _ => Future.successful("yay!") + } + Await.result(recovered, defaultTimeout) mustBe ("yay!") + + intercept[IllegalStateException] { + val refailed = Future.failed[String](o) recoverWith { + case _ => Future.failed[String](r) + } + Await.result(refailed, defaultTimeout) + } mustBe (r) + } + + "andThen like a boss" in { + val q = new java.util.concurrent.LinkedBlockingQueue[Int] + for (i <- 1 to 1000) { + val chained = future { + q.add(1); 3 + } andThen { + case _ => q.add(2) + } andThen { + case Success(0) => q.add(Int.MaxValue) + } andThen { + case _ => q.add(3); + } + Await.result(chained, defaultTimeout) mustBe (3) + q.poll() mustBe (1) + q.poll() mustBe (2) + q.poll() mustBe (3) + q.clear() + } + } + + "firstCompletedOf" in { + def futures = Vector.fill[Future[Int]](10) { + Promise[Int]().future + } :+ Future.successful[Int](5) + + Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5) + Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5) + } + + "find" in { + val futures = for (i <- 1 to 10) yield future { + i + } + + val result = Future.find[Int](futures)(_ == 3) + Await.result(result, defaultTimeout) mustBe (Some(3)) + + val notFound = Future.find[Int](futures.iterator)(_ == 11) + Await.result(notFound, defaultTimeout) mustBe (None) + } + + "zip" in { + val timeout = 10000 millis + val f = new IllegalStateException("test") + intercept[IllegalStateException] { + val failed = Future.failed[String](f) zip Future.successful("foo") + Await.result(failed, timeout) + } mustBe (f) + + intercept[IllegalStateException] { + val failed = Future.successful("foo") zip Future.failed[String](f) + Await.result(failed, timeout) + } mustBe (f) + + intercept[IllegalStateException] { + val failed = Future.failed[String](f) zip Future.failed[String](f) + Await.result(failed, timeout) + } mustBe (f) + + val successful = Future.successful("foo") zip Future.successful("foo") + Await.result(successful, timeout) mustBe (("foo", "foo")) + } + + "fold" in { + val timeout = 10000 millis + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + add + } + + val futures = (0 to 9) map { + idx => async(idx, idx * 20) + } + val folded = Future.fold(futures)(0)(_ + _) + Await.result(folded, timeout) mustBe (45) + + val futuresit = (0 to 9) map { + idx => async(idx, idx * 20) + } + val foldedit = Future.fold(futures)(0)(_ + _) + Await.result(foldedit, timeout) mustBe (45) + } + + "fold by composing" in { + val timeout = 10000 millis + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + add + } + def futures = (0 to 9) map { + idx => async(idx, idx * 20) + } + val folded = futures.foldLeft(Future(0)) { + case (fr, fa) => for (r <- fr; a <- fa) yield (r + a) + } + Await.result(folded, timeout) mustBe (45) + } + + "fold with an exception" in { + val timeout = 10000 millis + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") + add + } + def futures = (0 to 9) map { + idx => async(idx, idx * 10) + } + val folded = Future.fold(futures)(0)(_ + _) + intercept[IllegalArgumentException] { + Await.result(folded, timeout) + }.getMessage mustBe ("shouldFoldResultsWithException: expected") + } + + "fold mutable zeroes safely" in { + import scala.collection.mutable.ArrayBuffer + def test(testNumber: Int) { + val fs = (0 to 1000) map (i => Future(i)) + val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { + case (l, i) if i % 2 == 0 => l += i.asInstanceOf[AnyRef] + case (l, _) => l + } + val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum + + assert(result == 250500) + } + + (1 to 100) foreach test //Make sure it tries to provoke the problem + } + + "return zero value if folding empty list" in { + val zero = Future.fold(List[Future[Int]]())(0)(_ + _) + Await.result(zero, defaultTimeout) mustBe (0) + } + + "shouldReduceResults" in { + def async(idx: Int) = future { + Thread.sleep(idx * 20) + idx + } + val timeout = 10000 millis + + val futures = (0 to 9) map { async } + val reduced = Future.reduce(futures)(_ + _) + Await.result(reduced, timeout) mustBe (45) + + val futuresit = (0 to 9) map { async } + val reducedit = Future.reduce(futuresit)(_ + _) + Await.result(reducedit, timeout) mustBe (45) + } + + "shouldReduceResultsWithException" in { + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") + else add + } + val timeout = 10000 millis + def futures = (1 to 10) map { + idx => async(idx, idx * 10) + } + val failed = Future.reduce(futures)(_ + _) + intercept[IllegalArgumentException] { + Await.result(failed, timeout) + }.getMessage mustBe ("shouldFoldResultsWithException: expected") + } + + "shouldReduceThrowNSEEOnEmptyInput" in { + intercept[java.util.NoSuchElementException] { + val emptyreduced = Future.reduce(List[Future[Int]]())(_ + _) + Await.result(emptyreduced, defaultTimeout) + } + } + + "shouldTraverseFutures" in { + object counter { + var count = -1 + def incAndGet() = counter.synchronized { + count += 2 + count + } + } + + val oddFutures = List.fill(100)(future { counter.incAndGet() }) + val traversed = Future.sequence(oddFutures) + Await.result(traversed, defaultTimeout).sum mustBe (10000) + + val list = (1 to 100).toList + val traversedList = Future.traverse(list)(x => Future(x * 2 - 1)) + Await.result(traversedList, defaultTimeout).sum mustBe (10000) + } + + "shouldBlockUntilResult" in { + val latch = new TestLatch + + val f = future { + Await.ready(latch, 5 seconds) + 5 + } + val f2 = future { + val res = Await.result(f, Inf) + res + 9 + } + + intercept[TimeoutException] { + Await.ready(f2, 100 millis) + } + + latch.open() + + Await.result(f2, defaultTimeout) mustBe (14) + + val f3 = future { + Thread.sleep(100) + 5 + } + + intercept[TimeoutException] { + Await.ready(f3, 0 millis) + } + } + + "run callbacks async" in { + val latch = Vector.fill(10)(new TestLatch) + + val f1 = future { + latch(0).open() + Await.ready(latch(1), TestLatch.DefaultTimeout) + "Hello" + } + val f2 = f1 map { + s => + latch(2).open() + Await.ready(latch(3), TestLatch.DefaultTimeout) + s.length + } + for (_ <- f2) latch(4).open() + + Await.ready(latch(0), TestLatch.DefaultTimeout) + + f1.isCompleted mustBe (false) + f2.isCompleted mustBe (false) + + latch(1).open() + Await.ready(latch(2), TestLatch.DefaultTimeout) + + f1.isCompleted mustBe (true) + f2.isCompleted mustBe (false) + + val f3 = f1 map { + s => + latch(5).open() + Await.ready(latch(6), TestLatch.DefaultTimeout) + s.length * 2 + } + for (_ <- f3) latch(3).open() + + Await.ready(latch(5), TestLatch.DefaultTimeout) + + f3.isCompleted mustBe (false) + + latch(6).open() + Await.ready(latch(4), TestLatch.DefaultTimeout) + + f2.isCompleted mustBe (true) + f3.isCompleted mustBe (true) + + val p1 = Promise[String]() + val f4 = p1.future map { + s => + latch(7).open() + Await.ready(latch(8), TestLatch.DefaultTimeout) + s.length + } + for (_ <- f4) latch(9).open() + + p1.future.isCompleted mustBe (false) + f4.isCompleted mustBe (false) + + p1 complete Success("Hello") + + Await.ready(latch(7), TestLatch.DefaultTimeout) + + p1.future.isCompleted mustBe (true) + f4.isCompleted mustBe (false) + + latch(8).open() + Await.ready(latch(9), TestLatch.DefaultTimeout) + + Await.ready(f4, defaultTimeout) + f4.isCompleted mustBe (true) + } + + "should not deadlock with nested await (ticket 1313)" in { + val simple = Future() map { + _ => + val unit = Future(()) + val umap = unit map { _ => () } + Await.result(umap, Inf) + } + Await.ready(simple, Inf) + simple.isCompleted mustBe (true) + + val l1, l2 = new TestLatch + val complex = Future() map { + _ => + blocking { + val nested = Future(()) + for (_ <- nested) l1.open() + Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed + for (_ <- nested) l2.open() + Await.ready(l2, TestLatch.DefaultTimeout) + } + } + Await.ready(complex, defaultTimeout) + complex.isCompleted mustBe (true) + } + + "should not throw when Await.ready" in { + val expected = try Success(5 / 0) catch { case a: ArithmeticException => Failure(a) } + val f = future(5).map(_ / 0) + Await.ready(f, defaultTimeout) + f.value.get.toString mustBe expected.toString + } + + } + +} + + diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala new file mode 100644 index 0000000000..0bd48f9dd6 --- /dev/null +++ b/test/files/jvm/future-spec/PromiseTests.scala @@ -0,0 +1,246 @@ + + + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.duration.Duration.Inf +import scala.collection._ +import scala.runtime.NonLocalReturnControl +import scala.util.{Try,Success,Failure} + + +object PromiseTests extends MinimalScalaTest { + import ExecutionContext.Implicits._ + + val defaultTimeout = Inf + + /* promise specification */ + + "An empty Promise" should { + + "not be completed" in { + val p = Promise() + p.future.isCompleted mustBe (false) + p.isCompleted mustBe (false) + } + + "have no value" in { + val p = Promise() + p.future.value mustBe (None) + p.isCompleted mustBe (false) + } + + "return supplied value on timeout" in { + val failure = Promise.failed[String](new RuntimeException("br0ken")).future + val otherFailure = Promise.failed[String](new RuntimeException("last")).future + val empty = Promise[String]().future + val timedOut = Promise.successful[String]("Timedout").future + + Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout") + Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout") + Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout") + intercept[RuntimeException] { + Await.result(failure fallbackTo otherFailure, defaultTimeout) + }.getMessage mustBe ("last") + } + + } + + "A successful Promise" should { + val result = "test value" + val promise = Promise[String]().complete(Success(result)) + promise.isCompleted mustBe (true) + futureWithResult(_(promise.future, result)) + } + + "A failed Promise" should { + val message = "Expected Exception" + val promise = Promise[String]().complete(Failure(new RuntimeException(message))) + promise.isCompleted mustBe (true) + futureWithException[RuntimeException](_(promise.future, message)) + } + + "An interrupted Promise" should { + val message = "Boxed InterruptedException" + val future = Promise[String]().complete(Failure(new InterruptedException(message))).future + futureWithException[ExecutionException](_(future, message)) + } + + "A NonLocalReturnControl failed Promise" should { + val result = "test value" + val future = Promise[String]().complete(Failure(new NonLocalReturnControl[String]("test", result))).future + futureWithResult(_(future, result)) + } + + def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) { + + "be completed" in { f((future, _) => future.isCompleted mustBe (true)) } + + "contain a value" in { f((future, result) => future.value mustBe (Some(Success(result)))) } + + "return when ready with 'Await.ready'" in { f((future, result) => { Await.ready(future, defaultTimeout); future.isCompleted mustBe (true) }) } + + "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) } + + "not timeout" in { f((future, _) => Await.ready(future, 0 millis)) } + + "filter result" in { + f { + (future, result) => + Await.result((future filter (_ => true)), defaultTimeout) mustBe (result) + intercept[NoSuchElementException] { + Await.result((future filter (_ => false)), defaultTimeout) + } + } + } + + "transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) } + + "compose result with flatMap" in { + f { (future, result) => + val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p + Await.result(r, defaultTimeout) mustBe (result.toString + "foo") + } + } + + "perform action with foreach" in { + f { + (future, result) => + val p = Promise[Any]() + future foreach p.success + Await.result(p.future, defaultTimeout) mustBe (result) + } + } + + "zip properly" in { + f { + (future, result) => + Await.result(future zip Promise.successful("foo").future, defaultTimeout) mustBe ((result, "foo")) + intercept[RuntimeException] { + Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, defaultTimeout) + }.getMessage mustBe ("ohnoes") + } + } + + "not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) } + + "perform action on result" in { + f { + (future, result) => + val p = Promise[Any]() + future.onSuccess { case x => p.success(x) } + Await.result(p.future, defaultTimeout) mustBe (result) + } + } + + "not project a failure" in { + f { + (future, result) => + intercept[NoSuchElementException] { + Await.result(future.failed, defaultTimeout) + }.getMessage mustBe ("Future.failed not completed with a throwable.") + } + } + + "cast using mapTo" in { + f { + (future, result) => + Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException => false }), defaultTimeout) mustBe (false) + } + } + + } + + def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) { + + "be completed" in { + f((future, _) => future.isCompleted mustBe (true)) + } + + "contain a value" in { + f((future, message) => { + future.value.get.failed.get.getMessage mustBe (message) + }) + } + + "throw not throw exception with 'Await.ready'" in { + f { + (future, message) => Await.ready(future, defaultTimeout); future.isCompleted mustBe (true) + } + } + + "throw exception with 'Await.result'" in { + f { + (future, message) => + intercept[E] { + Await.result(future, defaultTimeout) + }.getMessage mustBe (message) + } + } + + "retain exception with filter" in { + f { + (future, message) => + intercept[E] { Await.result(future filter (_ => true), defaultTimeout) }.getMessage mustBe (message) + intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message) + } + } + + "retain exception with map" in { + f { + (future, message) => + intercept[E] { Await.result(future map (_.toString.length), defaultTimeout) }.getMessage mustBe (message) + } + } + + "retain exception with flatMap" in { + f { + (future, message) => + intercept[E] { Await.result(future flatMap (_ => Promise.successful("foo").future), defaultTimeout) }.getMessage mustBe (message) + } + } + + "zip properly" in { + f { + (future, message) => + intercept[E] { + Await.result(future zip Promise.successful("foo").future, defaultTimeout) + }.getMessage mustBe (message) + } + } + + "recover from exception" in { + f { + (future, message) => + Await.result(future.recover({ case e if e.getMessage == message => "pigdog" }), defaultTimeout) mustBe ("pigdog") + } + } + + "project a failure" in { + f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message)) + } + + "perform action on exception" in { + f { + (future, message) => + val p = Promise[Any]() + future.onFailure { case _ => p.success(message) } + Await.result(p.future, defaultTimeout) mustBe (message) + } + } + + "always cast successfully using mapTo" in { + f { + (future, message) => + intercept[E] { Await.result(future.mapTo[java.lang.Thread], defaultTimeout) }.getMessage mustBe (message) + } + } + } +} + + + + + + + diff --git a/test/files/jvm/future-spec/TryTests.scala b/test/files/jvm/future-spec/TryTests.scala new file mode 100644 index 0000000000..5d1b9b84b4 --- /dev/null +++ b/test/files/jvm/future-spec/TryTests.scala @@ -0,0 +1,130 @@ +// This is a port of the com.twitter.util Try spec. +// -- +// It lives in the future-spec directory simply because it requires a specs-like +// DSL which has already been minimally implemented for the future spec tests. + +import scala.util.{Try,Success,Failure} + +object TryTests extends MinimalScalaTest { + class MyException extends Exception + val e = new Exception("this is an exception") + + "Try()" should { + "catch exceptions and lift into the Try type" in { + Try[Int](1) mustEqual Success(1) + Try[Int] { throw e } mustEqual Failure(e) + } + } + + "Try" should { + "recoverWith" in { + val myException = new MyException + Success(1) recoverWith { case _ => Success(2) } mustEqual Success(1) + Failure(e) recoverWith { case _ => Success(2) } mustEqual Success(2) + Failure(e) recoverWith { case _ => Failure(e) } mustEqual Failure(e) + } + + "getOrElse" in { + Success(1) getOrElse 2 mustEqual 1 + Failure(e) getOrElse 2 mustEqual 2 + } + + "orElse" in { + Success(1) orElse Success(2) mustEqual Success(1) + Failure(e) orElse Success(2) mustEqual Success(2) + } + + "map" in { + "when there is no exception" in { + Success(1) map(1+) mustEqual Success(2) + Failure[Int](e) map(1+) mustEqual Failure(e) + } + + "when there is an exception" in { + Success(1) map(_ => throw e) mustEqual Failure(e) + + val e2 = new Exception + Failure[Int](e) map(_ => throw e2) mustEqual Failure(e) + } + "when there is a fatal exception" in { + val e3 = new ThreadDeath + intercept[ThreadDeath] { + Success(1) map (_ => throw e3) + } + } + } + + "flatMap" in { + "when there is no exception" in { + Success(1) flatMap(x => Success(1 + x)) mustEqual Success(2) + Failure[Int](e) flatMap(x => Success(1 + x)) mustEqual Failure(e) + } + + "when there is an exception" in { + Success(1).flatMap[Int](_ => throw e) mustEqual Failure(e) + + val e2 = new Exception + Failure[Int](e).flatMap[Int](_ => throw e2) mustEqual Failure(e) + } + "when there is a fatal exception" in { + val e3 = new ThreadDeath + intercept[ThreadDeath] { + Success(1).flatMap[Int](_ => throw e3) + } + } + } + + "flatten" in { + "is a Success(Success)" in { + Success(Success(1)).flatten mustEqual Success(1) + } + + "is a Success(Failure)" in { + val e = new Exception + Success(Failure(e)).flatten mustEqual Failure(e) + } + + "is a Throw" in { + val e = new Exception + Failure[Try[Int]](e).flatten mustEqual Failure(e) + } + } + + "for" in { + "with no Failure values" in { + val result = for { + i <- Success(1) + j <- Success(1) + } yield (i + j) + result mustEqual Success(2) + } + + "with Failure values" in { + "throws before" in { + val result = for { + i <- Failure[Int](e) + j <- Success(1) + } yield (i + j) + result mustEqual Failure(e) + } + + "throws after" in { + val result = for { + i <- Success(1) + j <- Failure[Int](e) + } yield (i + j) + result mustEqual Failure(e) + } + + "returns the FIRST Failure" in { + val e2 = new Exception + val result = for { + i <- Failure[Int](e) + j <- Failure[Int](e2) + } yield (i + j) + result mustEqual Failure(e) + } + } + } + } +} diff --git a/test/files/jvm/future-spec/main.scala b/test/files/jvm/future-spec/main.scala new file mode 100644 index 0000000000..90048ccda0 --- /dev/null +++ b/test/files/jvm/future-spec/main.scala @@ -0,0 +1,110 @@ + + + +import scala.collection._ +import scala.concurrent._ +import scala.concurrent.duration.Duration +import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } + + +object Test { + + def main(args: Array[String]) { + FutureTests.check() + PromiseTests.check() + TryTests.check() + } + +} + + +trait Output { + val buffer = new StringBuilder + + def bufferPrintln(a: Any) = buffer.synchronized { + buffer.append(a.toString + "\n") + } +} + + +trait MinimalScalaTest extends Output { + + val throwables = mutable.ArrayBuffer[Throwable]() + + def check() { + if (throwables.nonEmpty) println(buffer.toString) + } + + implicit def stringops(s: String) = new { + + def should[U](snippets: =>U) = { + bufferPrintln(s + " should:") + snippets + } + + def in[U](snippet: =>U) = { + try { + bufferPrintln("- " + s) + snippet + bufferPrintln("[OK] Test passed.") + } catch { + case e: Throwable => + bufferPrintln("[FAILED] " + e) + bufferPrintln(e.getStackTrace().mkString("\n")) + throwables += e + } + } + + } + + implicit def objectops(obj: Any) = new { + + def mustBe(other: Any) = assert(obj == other, obj + " is not " + other) + def mustEqual(other: Any) = mustBe(other) + + } + + def intercept[T <: Throwable: Manifest](body: =>Any): T = { + try { + body + throw new Exception("Exception of type %s was not thrown".format(manifest[T])) + } catch { + case t: Throwable => + if (manifest[T].erasure != t.getClass) throw t + else t.asInstanceOf[T] + } + } + + def checkType[T: Manifest, S](in: Future[T], refmanifest: Manifest[S]): Boolean = manifest[T] == refmanifest +} + + +object TestLatch { + val DefaultTimeout = Duration(5, TimeUnit.SECONDS) + + def apply(count: Int = 1) = new TestLatch(count) +} + + +class TestLatch(count: Int = 1) extends Awaitable[Unit] { + private var latch = new CountDownLatch(count) + + def countDown() = latch.countDown() + def isOpen: Boolean = latch.getCount == 0 + def open() = while (!isOpen) countDown() + def reset() = latch = new CountDownLatch(count) + + @throws(classOf[TimeoutException]) + def ready(atMost: Duration)(implicit permit: CanAwait) = { + val opened = latch.await(atMost.toNanos, TimeUnit.NANOSECONDS) + if (!opened) throw new TimeoutException("Timeout of %s." format (atMost.toString)) + this + } + + @throws(classOf[Exception]) + def result(atMost: Duration)(implicit permit: CanAwait): Unit = { + ready(atMost) + } + +} + diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala new file mode 100644 index 0000000000..00ef91539d --- /dev/null +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -0,0 +1,1036 @@ +import scala.concurrent.{ + Future, + Promise, + TimeoutException, + SyncVar, + ExecutionException, + ExecutionContext, + CanAwait, + Await +} +import scala.concurrent.{ future, promise, blocking } +import scala.util.{ Try, Success, Failure } +import scala.concurrent.duration.Duration + +trait TestBase { + + def intercept[T <: Exception](code: => Unit)(implicit cm: ClassManifest[Exception]): Unit = + try { + code + assert(false, "did not throw " + cm) + } catch { + case ex: Exception if cm.erasure isInstance ex => + } + + def once(body: (() => Unit) => Unit) { + val sv = new SyncVar[Boolean] + body(() => sv put true) + //sv.take(2000) + sv.take() + } + + // def assert(cond: => Boolean) { + // try { + // Predef.assert(cond) + // } catch { + // case e => e.printStackTrace() + // } + // } + +} + + +trait FutureCallbacks extends TestBase { + import ExecutionContext.Implicits._ + + def testOnSuccess(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { + case _ => + done() + assert(x == 1) + } + } + + def testOnSuccessWhenCompleted(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { + case _ => + assert(x == 1) + x = 2 + f onSuccess { + case _ => + assert(x == 2) + done() + } + } + } + + def testOnSuccessWhenFailed(): Unit = once { + done => + val f = future[Unit] { + done() + throw new Exception + } + f onSuccess { + case _ => assert(false) + } + } + + def testOnFailure(): Unit = once { + done => + var x = 0 + val f = future[Unit] { + x = 1 + throw new Exception + } + f onSuccess { + case _ => + done() + assert(false) + } + f onFailure { + case _ => + done() + assert(x == 1) + } + } + + def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { + done => + val f = future[Unit] { + throw cause + } + f onSuccess { + case _ => + done() + assert(false) + } + f onFailure { + case e: ExecutionException if (e.getCause == cause) => + done() + case _ => + done() + assert(false) + } + } + + def testOnFailureWhenTimeoutException(): Unit = once { + done => + val f = future[Unit] { + throw new TimeoutException() + } + f onSuccess { + case _ => + done() + assert(false) + } + f onFailure { + case e: TimeoutException => + done() + case other => + done() + assert(false) + } + } + + testOnSuccess() + testOnSuccessWhenCompleted() + testOnSuccessWhenFailed() + testOnFailure() + testOnFailureWhenSpecialThrowable(5, new Error) + // testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) + //TODO: this test is currently problematic, because NonFatal does not match InterruptedException + //testOnFailureWhenSpecialThrowable(7, new InterruptedException) + testOnFailureWhenTimeoutException() + +} + + +trait FutureCombinators extends TestBase { + import ExecutionContext.Implicits._ + + def testMapSuccess(): Unit = once { + done => + val f = future { 5 } + val g = f map { x => "result: " + x } + g onSuccess { + case s => + done() + assert(s == "result: 5") + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testMapFailure(): Unit = once { + done => + val f = future { + throw new Exception("exception message") + } + val g = f map { x => "result: " + x } + g onSuccess { + case _ => + done() + assert(false) + } + g onFailure { + case t => + done() + assert(t.getMessage() == "exception message") + } + } + + def testMapSuccessPF(): Unit = once { + done => + val f = future { 5 } + val g = f map { case r => "result: " + r } + g onSuccess { + case s => + done() + assert(s == "result: 5") + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testTransformSuccess(): Unit = once { + done => + val f = future { 5 } + val g = f.transform(r => "result: " + r, identity) + g onSuccess { + case s => + done() + assert(s == "result: 5") + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testTransformSuccessPF(): Unit = once { + done => + val f = future { 5 } + val g = f.transform( { case r => "result: " + r }, identity) + g onSuccess { + case s => + done() + assert(s == "result: 5") + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testFoldFailure(): Unit = once { + done => + val f = future { + throw new Exception("exception message") + } + val g = f.transform(r => "result: " + r, identity) + g onSuccess { + case _ => + done() + assert(false) + } + g onFailure { + case t => + done() + assert(t.getMessage() == "exception message") + } + } + + def testFlatMapSuccess(): Unit = once { + done => + val f = future { 5 } + val g = f flatMap { _ => future { 10 } } + g onSuccess { + case x => + done() + assert(x == 10) + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testFlatMapFailure(): Unit = once { + done => + val f = future { + throw new Exception("exception message") + } + val g = f flatMap { _ => future { 10 } } + g onSuccess { + case _ => + done() + assert(false) + } + g onFailure { + case t => + done() + assert(t.getMessage() == "exception message") + } + } + + def testFilterSuccess(): Unit = once { + done => + val f = future { 4 } + val g = f filter { _ % 2 == 0 } + g onSuccess { + case x: Int => + done() + assert(x == 4) + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testFilterFailure(): Unit = once { + done => + val f = future { 4 } + val g = f filter { _ % 2 == 1 } + g onSuccess { + case x: Int => + done() + assert(false) + } + g onFailure { + case e: NoSuchElementException => + done() + assert(true) + case _ => + done() + assert(false) + } + } + + def testCollectSuccess(): Unit = once { + done => + val f = future { -5 } + val g = f collect { + case x if x < 0 => -x + } + g onSuccess { + case x: Int => + done() + assert(x == 5) + } + g onFailure { + case _ => + done() + assert(false) + } + } + + def testCollectFailure(): Unit = once { + done => + val f = future { -5 } + val g = f collect { + case x if x > 0 => x * 2 + } + g onSuccess { + case _ => + done() + assert(false) + } + g onFailure { + case e: NoSuchElementException => + done() + assert(true) + case _ => + done() + assert(false) + } + } + + /* TODO: Test for NonFatal in collect (more of a regression test at this point). + */ + + def testForeachSuccess(): Unit = once { + done => + val p = promise[Int]() + val f = future[Int] { 5 } + f foreach { x => p.success(x * 2) } + val g = p.future + + g.onSuccess { + case res: Int => + done() + assert(res == 10) + } + g.onFailure { + case _ => + done() + assert(false) + } + } + + def testForeachFailure(): Unit = once { + done => + val p = promise[Int]() + val f = future[Int] { throw new Exception } + f foreach { x => p.success(x * 2) } + f onFailure { case _ => p.failure(new Exception) } + val g = p.future + + g.onSuccess { + case _ => + done() + assert(false) + } + g.onFailure { + case _ => + done() + assert(true) + } + } + + def testRecoverSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case re: RuntimeException => + "recovered" + } + f onSuccess { + case x => + done() + assert(x == "recovered") + } + f onFailure { case any => + done() + assert(false) + } + f + } + + def testRecoverFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case te: TimeoutException => "timeout" + } + f onSuccess { + case x => + done() + assert(false) + } + f onFailure { case any => + done() + assert(any == cause) + } + } + + def testRecoverWithSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } recoverWith { + case re: RuntimeException => + future { "recovered" } + } + f onSuccess { + case x => + done() + assert(x == "recovered") + } + f onFailure { case any => + done() + assert(false) + } + } + + def testRecoverWithFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } recoverWith { + case te: TimeoutException => + future { "timeout" } + } + f onSuccess { + case x => + done() + assert(false) + } + f onFailure { case any => + done() + assert(any == cause) + } + } + + def testZipSuccess(): Unit = once { + done => + val f = future { 5 } + val g = future { 6 } + val h = f zip g + h onSuccess { + case (l: Int, r: Int) => + done() + assert(l+r == 11) + } + h onFailure { + case _ => + done() + assert(false) + } + } + + def testZipFailureLeft(): Unit = once { + done => + val cause = new Exception("exception message") + val f = future { throw cause } + val g = future { 6 } + val h = f zip g + h onSuccess { + case _ => + done() + assert(false) + } + h onFailure { + case e: Exception => + done() + assert(e.getMessage == "exception message") + } + } + + def testZipFailureRight(): Unit = once { + done => + val cause = new Exception("exception message") + val f = future { 5 } + val g = future { throw cause } + val h = f zip g + h onSuccess { + case _ => + done() + assert(false) + } + h onFailure { + case e: Exception => + done() + assert(e.getMessage == "exception message") + } + } + + def testFallbackTo(): Unit = once { + done => + val f = future { sys.error("failed") } + val g = future { 5 } + val h = f fallbackTo g + + h onSuccess { + case x: Int => + done() + assert(x == 5) + } + h onFailure { + case _ => + done() + assert(false) + } + } + + def testFallbackToFailure(): Unit = once { + done => + val cause = new Exception + val f = future { sys.error("failed") } + val g = future { throw cause } + val h = f fallbackTo g + + h onSuccess { + case _ => + done() + assert(false) + } + h onFailure { + case e: Exception => + done() + assert(e == cause) + } + } + + testMapSuccess() + testMapFailure() + testFlatMapSuccess() + testFlatMapFailure() + testFilterSuccess() + testFilterFailure() + testCollectSuccess() + testCollectFailure() + testForeachSuccess() + testForeachFailure() + testRecoverSuccess() + testRecoverFailure() + testRecoverWithSuccess() + testRecoverWithFailure() + testZipSuccess() + testZipFailureLeft() + testZipFailureRight() + testFallbackTo() + testFallbackToFailure() +} + + +trait FutureProjections extends TestBase { + import ExecutionContext.Implicits._ + + def testFailedFailureOnComplete(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onComplete { + case Success(t) => + assert(t == cause) + done() + case Failure(t) => + assert(false) + } + } + + def testFailedFailureOnSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onSuccess { + case t => + assert(t == cause) + done() + } + } + + def testFailedSuccessOnComplete(): Unit = once { + done => + val f = future { 0 } + f.failed onComplete { + case Success(t) => + assert(false) + case Failure(t) => + assert(t.isInstanceOf[NoSuchElementException]) + done() + } + } + + def testFailedSuccessOnFailure(): Unit = once { + done => + val f = future { 0 } + f.failed onFailure { + case nsee: NoSuchElementException => + done() + } + } + + def testFailedFailureAwait(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + assert(Await.result(f.failed, Duration(500, "ms")) == cause) + done() + } + + def testFailedSuccessAwait(): Unit = once { + done => + val f = future { 0 } + try { + Await.result(f.failed, Duration(500, "ms")) + assert(false) + } catch { + case nsee: NoSuchElementException => done() + } + } + + def testAwaitPositiveDuration(): Unit = once { done => + val p = Promise[Int]() + val f = p.future + future { + intercept[IllegalArgumentException] { Await.ready(f, Duration.Undefined) } + p.success(0) + Await.ready(f, Duration.Zero) + Await.ready(f, Duration(500, "ms")) + Await.ready(f, Duration.Inf) + done() + } onFailure { case x => throw x } + } + + def testAwaitNegativeDuration(): Unit = once { done => + val f = Promise().future + future { + intercept[TimeoutException] { Await.ready(f, Duration.Zero) } + intercept[TimeoutException] { Await.ready(f, Duration.MinusInf) } + intercept[TimeoutException] { Await.ready(f, Duration(-500, "ms")) } + done() + } onFailure { case x => throw x } + } + + testFailedFailureOnComplete() + testFailedFailureOnSuccess() + testFailedSuccessOnComplete() + testFailedSuccessOnFailure() + testFailedFailureAwait() + testFailedSuccessAwait() + testAwaitPositiveDuration() + testAwaitNegativeDuration() + +} + + +trait Blocking extends TestBase { + import ExecutionContext.Implicits._ + + def testAwaitSuccess(): Unit = once { + done => + val f = future { 0 } + Await.result(f, Duration(500, "ms")) + done() + } + + def testAwaitFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + try { + Await.result(f, Duration(500, "ms")) + assert(false) + } catch { + case t => + assert(t == cause) + done() + } + } + + def testFQCNForAwaitAPI(): Unit = once { + done => + + assert(classOf[CanAwait].getName == "scala.concurrent.CanAwait") + assert(Await.getClass.getName == "scala.concurrent.Await") + + done() + } + + testAwaitSuccess() + testAwaitFailure() + testFQCNForAwaitAPI() +} + +trait BlockContexts extends TestBase { + import ExecutionContext.Implicits._ + import scala.concurrent.{ Await, Awaitable, BlockContext } + + private def getBlockContext(body: => BlockContext): BlockContext = { + Await.result(Future { body }, Duration(500, "ms")) + } + + // test outside of an ExecutionContext + def testDefaultOutsideFuture(): Unit = { + val bc = BlockContext.current + assert(bc.getClass.getName.contains("DefaultBlockContext")) + } + + // test BlockContext in our default ExecutionContext + def testDefaultFJP(): Unit = { + val bc = getBlockContext(BlockContext.current) + assert(bc.isInstanceOf[scala.concurrent.forkjoin.ForkJoinWorkerThread]) + } + + // test BlockContext inside BlockContext.withBlockContext + def testPushCustom(): Unit = { + val orig = BlockContext.current + val customBC = new BlockContext() { + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk) + } + + val bc = getBlockContext({ + BlockContext.withBlockContext(customBC) { + BlockContext.current + } + }) + + assert(bc eq customBC) + } + + // test BlockContext after a BlockContext.push + def testPopCustom(): Unit = { + val orig = BlockContext.current + val customBC = new BlockContext() { + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk) + } + + val bc = getBlockContext({ + BlockContext.withBlockContext(customBC) {} + BlockContext.current + }) + + assert(bc ne customBC) + } + + testDefaultOutsideFuture() + testDefaultFJP() + testPushCustom() + testPopCustom() +} + +trait Promises extends TestBase { + import ExecutionContext.Implicits._ + + def testSuccess(): Unit = once { + done => + val p = promise[Int]() + val f = p.future + + f onSuccess { + case x => + done() + assert(x == 5) + } + f onFailure { + case any => + done() + assert(false) + } + + p.success(5) + } + + testSuccess() + +} + + +trait Exceptions extends TestBase { + import ExecutionContext.Implicits._ + +} + +trait CustomExecutionContext extends TestBase { + import scala.concurrent.{ ExecutionContext, Awaitable } + + def defaultEC = ExecutionContext.global + + val inEC = new java.lang.ThreadLocal[Int]() { + override def initialValue = 0 + } + + def enterEC() = inEC.set(inEC.get + 1) + def leaveEC() = inEC.set(inEC.get - 1) + def assertEC() = assert(inEC.get > 0) + def assertNoEC() = assert(inEC.get == 0) + + class CountingExecutionContext extends ExecutionContext { + val _count = new java.util.concurrent.atomic.AtomicInteger(0) + def count = _count.get + + def delegate = ExecutionContext.global + + override def execute(runnable: Runnable) = { + _count.incrementAndGet() + val wrapper = new Runnable() { + override def run() = { + enterEC() + try { + runnable.run() + } finally { + leaveEC() + } + } + } + delegate.execute(wrapper) + } + + override def reportFailure(t: Throwable): Unit = { + System.err.println("Failure: " + t.getClass.getSimpleName + ": " + t.getMessage) + delegate.reportFailure(t) + } + } + + def countExecs(block: (ExecutionContext) => Unit): Int = { + val context = new CountingExecutionContext() + block(context) + context.count + } + + def testOnSuccessCustomEC(): Unit = { + val count = countExecs { implicit ec => + blocking { + once { done => + val f = future({ assertNoEC() })(defaultEC) + f onSuccess { + case _ => + assertEC() + done() + } + assertNoEC() + } + } + } + + // should be onSuccess, but not future body + assert(count == 1) + } + + def testKeptPromiseCustomEC(): Unit = { + val count = countExecs { implicit ec => + blocking { + once { done => + val f = Promise.successful(10).future + f onSuccess { + case _ => + assertEC() + done() + } + } + } + } + + // should be onSuccess called once in proper EC + assert(count == 1) + } + + def testCallbackChainCustomEC(): Unit = { + val count = countExecs { implicit ec => + blocking { + once { done => + assertNoEC() + val addOne = { x: Int => assertEC(); x + 1 } + val f = Promise.successful(10).future + f.map(addOne).filter { x => + assertEC() + x == 11 + } flatMap { x => + Promise.successful(x + 1).future.map(addOne).map(addOne) + } onComplete { + case Failure(t) => + try { + throw new AssertionError("error in test: " + t.getMessage, t) + } finally { + done() + } + case Success(x) => + assertEC() + assert(x == 14) + done() + } + assertNoEC() + } + } + } + + // the count is not defined (other than >=1) + // due to the batching optimizations. + assert(count >= 1) + } + + testOnSuccessCustomEC() + testKeptPromiseCustomEC() + testCallbackChainCustomEC() +} + +trait ExecutionContextPrepare extends TestBase { + val theLocal = new ThreadLocal[String] { + override protected def initialValue(): String = "" + } + + class PreparingExecutionContext extends ExecutionContext { + def delegate = ExecutionContext.global + + override def execute(runnable: Runnable): Unit = + delegate.execute(runnable) + + override def prepare(): ExecutionContext = { + // save object stored in ThreadLocal storage + val localData = theLocal.get + new PreparingExecutionContext { + override def execute(runnable: Runnable): Unit = { + val wrapper = new Runnable { + override def run(): Unit = { + // now we're on the new thread + // put localData into theLocal + theLocal.set(localData) + runnable.run() + } + } + delegate.execute(wrapper) + } + } + } + + override def reportFailure(t: Throwable): Unit = + delegate.reportFailure(t) + } + + implicit val ec = new PreparingExecutionContext + + def testOnComplete(): Unit = once { + done => + theLocal.set("secret") + val fut = future { 42 } + fut onComplete { + case _ => + assert(theLocal.get == "secret") + done() + } + } + + def testMap(): Unit = once { + done => + theLocal.set("secret2") + val fut = future { 42 } + fut map { x => + assert(theLocal.get == "secret2") + done() + } + } + + testOnComplete() + testMap() +} + +object Test +extends App +with FutureCallbacks +with FutureCombinators +with FutureProjections +with Promises +with BlockContexts +with Exceptions +with CustomExecutionContext +with ExecutionContextPrepare +{ + System.exit(0) +} + diff --git a/test/files/pos/t5958.flags b/test/files/pos/t5958.flags new file mode 100644 index 0000000000..a7ed61704f --- /dev/null +++ b/test/files/pos/t5958.flags @@ -0,0 +1 @@ +-Ydependent-method-types diff --git a/test/files/pos/t5958.scala b/test/files/pos/t5958.scala new file mode 100644 index 0000000000..b7eb8cf8df --- /dev/null +++ b/test/files/pos/t5958.scala @@ -0,0 +1,15 @@ +class Test { + def newComponent(u: Universe): u.Component = throw new Exception("not implemented") + + class Universe { self => + class Component + + newComponent(this): this.Component // error, but should be fine since this is a stable reference + newComponent(self): self.Component // error, but should be fine since this is a stable reference + newComponent(self): this.Component // error, but should be fine since this is a stable reference + newComponent(this): self.Component // error, but should be fine since this is a stable reference + + val u = this + newComponent(u): u.Component // ok + } +} |