summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/Iterator.scala8
-rw-r--r--src/library/scala/collection/TraversableOnce.scala29
-rw-r--r--src/library/scala/concurrent/Future.scala52
-rw-r--r--src/library/scala/concurrent/util/Duration.scala12
-rw-r--r--test/files/jvm/future-spec/FutureTests.scala261
-rw-r--r--test/files/jvm/future-spec/PromiseTests.scala244
6 files changed, 329 insertions, 277 deletions
diff --git a/src/library/scala/collection/Iterator.scala b/src/library/scala/collection/Iterator.scala
index 65de60c8fe..af83291a46 100644
--- a/src/library/scala/collection/Iterator.scala
+++ b/src/library/scala/collection/Iterator.scala
@@ -21,6 +21,14 @@ import immutable.Stream
*/
object Iterator {
+ /** With the advent of `TraversableOnce` and `Iterator`, it can be useful to have a builder which
+ * operates on `Iterator`s so they can be treated uniformly along with the collections.
+ * See `scala.util.Random.shuffle` for an example.
+ */
+ implicit def IteratorCanBuildFrom[A] = new TraversableOnce.BufferedCanBuildFrom[A, Iterator] {
+ def toColl[B](coll: ArrayBuffer[B]) = coll.iterator
+ }
+
/** The iterator which produces no values. */
val empty: Iterator[Nothing] = new AbstractIterator[Nothing] {
def hasNext: Boolean = false
diff --git a/src/library/scala/collection/TraversableOnce.scala b/src/library/scala/collection/TraversableOnce.scala
index 5b5cee7f1b..b34daed2be 100644
--- a/src/library/scala/collection/TraversableOnce.scala
+++ b/src/library/scala/collection/TraversableOnce.scala
@@ -8,7 +8,7 @@
package scala.collection
-import mutable.{ Buffer, ListBuffer, ArrayBuffer }
+import mutable.{ Buffer, Builder, ListBuffer, ArrayBuffer }
import annotation.unchecked.{ uncheckedVariance => uV }
import language.{implicitConversions, higherKinds}
@@ -357,7 +357,6 @@ trait TraversableOnce[+A] extends Any with GenTraversableOnce[A] {
}
-
object TraversableOnce {
@deprecated("use OnceCanBuildFrom instead")
def traversableOnceCanBuildFrom[T] = new OnceCanBuildFrom[T]
@@ -367,26 +366,34 @@ object TraversableOnce {
implicit def alternateImplicit[A](trav: TraversableOnce[A]) = new ForceImplicitAmbiguity
implicit def flattenTraversableOnce[A, CC[_]](travs: TraversableOnce[CC[A]])(implicit ev: CC[A] => TraversableOnce[A]) =
new FlattenOps[A](travs map ev)
-
- /** With the advent of TraversableOnce, it can be useful to have a builder which
- * operates on Iterators so they can be treated uniformly along with the collections.
- * See scala.util.Random.shuffle for an example.
- */
- implicit class OnceCanBuildFrom[A] extends generic.CanBuildFrom[TraversableOnce[A], A, TraversableOnce[A]] {
- def newIterator = new ArrayBuffer[A] mapResult (_.iterator)
+
+ abstract class BufferedCanBuildFrom[A, Coll[X] <: TraversableOnce[X]] extends generic.CanBuildFrom[Coll[_], A, Coll[A]] {
+ def toColl[B](buff: ArrayBuffer[B]): Coll[B]
+
+ def newIterator: Builder[A, Coll[A]] = new ArrayBuffer[A] mapResult toColl
/** Creates a new builder on request of a collection.
* @param from the collection requesting the builder to be created.
* @return the result of invoking the `genericBuilder` method on `from`.
*/
- def apply(from: TraversableOnce[A]) = newIterator
+ def apply(from: Coll[_]): Builder[A, Coll[A]] = newIterator
/** Creates a new builder from scratch
* @return the result of invoking the `newBuilder` method of this factory.
*/
def apply() = newIterator
}
-
+
+ /** With the advent of `TraversableOnce`, it can be useful to have a builder which
+ * operates on `Iterator`s so they can be treated uniformly along with the collections.
+ * See `scala.util.Random.shuffle` or `scala.concurrent.Future.sequence` for an example.
+ */
+ class OnceCanBuildFrom[A] extends BufferedCanBuildFrom[A, TraversableOnce] {
+ def toColl[B](buff: ArrayBuffer[B]) = buff.iterator
+ }
+
+ implicit def OnceCanBuildFrom[A] = new OnceCanBuildFrom[A]
+
class FlattenOps[A](travs: TraversableOnce[TraversableOnce[A]]) {
def flatten: Iterator[A] = new AbstractIterator[A] {
val its = travs.toIterator
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 55c91619fa..11505e4146 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -426,21 +426,8 @@ trait Future[+T] extends Awaitable[T] {
* that conforms to `S`'s erased type or a `ClassCastException` otherwise.
*/
def mapTo[S](implicit tag: ClassTag[S]): Future[S] = {
- import java.{ lang => jl }
- val toBoxed = Map[Class[_], Class[_]](
- classOf[Boolean] -> classOf[jl.Boolean],
- classOf[Byte] -> classOf[jl.Byte],
- classOf[Char] -> classOf[jl.Character],
- classOf[Short] -> classOf[jl.Short],
- classOf[Int] -> classOf[jl.Integer],
- classOf[Long] -> classOf[jl.Long],
- classOf[Float] -> classOf[jl.Float],
- classOf[Double] -> classOf[jl.Double],
- classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
- )
-
def boxedType(c: Class[_]): Class[_] = {
- if (c.isPrimitive) toBoxed(c) else c
+ if (c.isPrimitive) Future.toBoxed(c) else c
}
val p = newPromise[S]
@@ -530,7 +517,22 @@ trait Future[+T] extends Awaitable[T] {
* Note: using this method yields nondeterministic dataflow programs.
*/
object Future {
- /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+
+ import java.{ lang => jl }
+
+ private[concurrent] val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[jl.Boolean],
+ classOf[Byte] -> classOf[jl.Byte],
+ classOf[Char] -> classOf[jl.Character],
+ classOf[Short] -> classOf[jl.Short],
+ classOf[Int] -> classOf[jl.Integer],
+ classOf[Long] -> classOf[jl.Long],
+ classOf[Float] -> classOf[jl.Float],
+ classOf[Double] -> classOf[jl.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
+ )
+
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
* The result becomes available once the asynchronous computation is completed.
*
@@ -544,10 +546,10 @@ object Future {
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
- /** Simple version of `Futures.traverse`. Transforms a `Traversable[Future[A]]` into a `Future[Traversable[A]]`.
+ /** Simple version of `Futures.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`.
* Useful for reducing many `Future`s into a single `Future`.
*/
- def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
+ def sequence[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
in.foldLeft(Promise.successful(cbf(in)).future) {
(fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)
} map (_.result)
@@ -555,7 +557,7 @@ object Future {
/** Returns a `Future` to the result of the first future in the list that is completed.
*/
- def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
+ def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val completeFirst: Either[Throwable, T] => Unit = p tryComplete _
@@ -566,7 +568,8 @@ object Future {
/** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate.
*/
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
+ def find[T](futurestravonce: TraversableOnce[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
+ val futures = futurestravonce.toBuffer
if (futures.isEmpty) Promise.successful[Option[T]](None).future
else {
val result = Promise[Option[T]]()
@@ -577,8 +580,9 @@ object Future {
case _ =>
}
} finally {
- if (ref.decrementAndGet == 0)
+ if (ref.decrementAndGet == 0) {
result tryComplete Right(None)
+ }
}
futures.foreach(_ onComplete search)
@@ -597,7 +601,7 @@ object Future {
* val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
* }}}
*/
- def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise.successful(zero).future
else sequence(futures).map(_.foldLeft(zero)(foldFun))
}
@@ -609,12 +613,12 @@ object Future {
* val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
* }}}
*/
- def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
+ def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future
else sequence(futures).map(_ reduceLeft op)
}
- /** Transforms a `Traversable[A]` into a `Future[Traversable[B]]` using the provided function `A => Future[B]`.
+ /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`.
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
*
@@ -622,7 +626,7 @@ object Future {
* val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
* }}}
*/
- def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
+ def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) =>
val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <- fb) yield (r += b)
diff --git a/src/library/scala/concurrent/util/Duration.scala b/src/library/scala/concurrent/util/Duration.scala
index 0e39fdffa3..bab664727e 100644
--- a/src/library/scala/concurrent/util/Duration.scala
+++ b/src/library/scala/concurrent/util/Duration.scala
@@ -1,12 +1,16 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
- */
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
package scala.concurrent.util
import java.util.concurrent.TimeUnit
import TimeUnit._
-import java.lang.{ Double ⇒ JDouble }
+import java.lang.{ Double => JDouble }
import language.implicitConversions
case class Deadline private (time: Duration) {
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala
index 90ed10b93a..9a9cf951bb 100644
--- a/test/files/jvm/future-spec/FutureTests.scala
+++ b/test/files/jvm/future-spec/FutureTests.scala
@@ -162,10 +162,12 @@ object FutureTests extends MinimalScalaTest {
}
"firstCompletedOf" in {
- val futures = Vector.fill[Future[Int]](10) {
+ def futures = Vector.fill[Future[Int]](10) {
Promise[Int]().future
} :+ Promise.successful[Int](5).future
+
Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
+ Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5)
}
"find" in {
@@ -176,7 +178,7 @@ object FutureTests extends MinimalScalaTest {
val result = Future.find[Int](futures)(_ == 3)
Await.result(result, defaultTimeout) mustBe (Some(3))
- val notFound = Future.find[Int](futures)(_ == 11)
+ val notFound = Future.find[Int](futures.iterator)(_ == 11)
Await.result(notFound, defaultTimeout) mustBe (None)
}
@@ -208,11 +210,18 @@ object FutureTests extends MinimalScalaTest {
Thread.sleep(wait)
add
}
- def futures = (0 to 9) map {
+
+ val futures = (0 to 9) map {
idx => async(idx, idx * 200)
}
- def folded = Future.fold(futures)(0)(_ + _)
+ val folded = Future.fold(futures)(0)(_ + _)
Await.result(folded, timeout) mustBe (45)
+
+ val futuresit = (0 to 9) map {
+ idx => async(idx, idx * 200)
+ }
+ val foldedit = Future.fold(futures)(0)(_ + _)
+ Await.result(foldedit, timeout) mustBe (45)
}
"fold by composing" in {
@@ -273,9 +282,14 @@ object FutureTests extends MinimalScalaTest {
idx
}
val timeout = 10000 millis
+
val futures = (0 to 9) map { async }
val reduced = Future.reduce(futures)(_ + _)
Await.result(reduced, timeout) mustBe (45)
+
+ val futuresit = (0 to 9) map { async }
+ val reducedit = Future.reduce(futuresit)(_ + _)
+ Await.result(reducedit, timeout) mustBe (45)
}
"shouldReduceResultsWithException" in {
@@ -310,13 +324,17 @@ object FutureTests extends MinimalScalaTest {
}
}
- val oddFutures = List.fill(100)(future { counter.incAndGet() })
+ val oddFutures = List.fill(100)(future { counter.incAndGet() }).iterator
val traversed = Future.sequence(oddFutures)
Await.result(traversed, defaultTimeout).sum mustBe (10000)
val list = (1 to 100).toList
val traversedList = Future.traverse(list)(x => Future(x * 2 - 1))
Await.result(traversedList, defaultTimeout).sum mustBe (10000)
+
+ val iterator = (1 to 100).toList.iterator
+ val traversedIterator = Future.traverse(iterator)(x => Future(x * 2 - 1))
+ Await.result(traversedIterator, defaultTimeout).sum mustBe (10000)
}
"shouldHandleThrowables" in {
@@ -491,236 +509,3 @@ object FutureTests extends MinimalScalaTest {
}
-object PromiseTests extends MinimalScalaTest {
-
- val defaultTimeout = Inf
-
- /* promise specification */
-
- "An empty Promise" should {
-
- "not be completed" in {
- val p = Promise()
- p.future.isCompleted mustBe (false)
- }
-
- "have no value" in {
- val p = Promise()
- p.future.value mustBe (None)
- }
-
- "return supplied value on timeout" in {
- val failure = Promise.failed[String](new RuntimeException("br0ken")).future
- val otherFailure = Promise.failed[String](new RuntimeException("last")).future
- val empty = Promise[String]().future
- val timedOut = Promise.successful[String]("Timedout").future
-
- Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
- Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout")
- Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
- intercept[RuntimeException] {
- Await.result(failure fallbackTo otherFailure, defaultTimeout)
- }.getMessage mustBe ("last")
- }
-
- }
-
- "A successful Promise" should {
- val result = "test value"
- val future = Promise[String]().complete(Right(result)).future
- futureWithResult(_(future, result))
- }
-
- "A failed Promise" should {
- val message = "Expected Exception"
- val future = Promise[String]().complete(Left(new RuntimeException(message))).future
- futureWithException[RuntimeException](_(future, message))
- }
-
- "An interrupted Promise" should {
- val message = "Boxed InterruptedException"
- val future = Promise[String]().complete(Left(new InterruptedException(message))).future
- futureWithException[ExecutionException](_(future, message))
- }
-
- "A NonLocalReturnControl failed Promise" should {
- val result = "test value"
- val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future
- futureWithResult(_(future, result))
- }
-
- def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) {
-
- "be completed" in { f((future, _) => future.isCompleted mustBe (true)) }
-
- "contain a value" in { f((future, result) => future.value mustBe (Some(Right(result)))) }
-
- "return result with 'blocking'" in { f((future, result) => blocking(future, defaultTimeout) mustBe (result)) }
-
- "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) }
-
- "not timeout" in { f((future, _) => Await.ready(future, 0 millis)) }
-
- "filter result" in {
- f {
- (future, result) =>
- Await.result((future filter (_ => true)), defaultTimeout) mustBe (result)
- intercept[NoSuchElementException] {
- Await.result((future filter (_ => false)), defaultTimeout)
- }
- }
- }
-
- "transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) }
-
- "compose result with flatMap" in {
- f { (future, result) =>
- val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p
- Await.result(r, defaultTimeout) mustBe (result.toString + "foo")
- }
- }
-
- "perform action with foreach" in {
- f {
- (future, result) =>
- val p = Promise[Any]()
- future foreach p.success
- Await.result(p.future, defaultTimeout) mustBe (result)
- }
- }
-
- "zip properly" in {
- f {
- (future, result) =>
- Await.result(future zip Promise.successful("foo").future, defaultTimeout) mustBe ((result, "foo"))
- intercept[RuntimeException] {
- Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, defaultTimeout)
- }.getMessage mustBe ("ohnoes")
- }
- }
-
- "not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) }
-
- "perform action on result" in {
- f {
- (future, result) =>
- val p = Promise[Any]()
- future.onSuccess { case x => p.success(x) }
- Await.result(p.future, defaultTimeout) mustBe (result)
- }
- }
-
- "not project a failure" in {
- f {
- (future, result) =>
- intercept[NoSuchElementException] {
- Await.result(future.failed, defaultTimeout)
- }.getMessage mustBe ("Future.failed not completed with a throwable. Instead completed with: " + result)
- }
- }
-
- "cast using mapTo" in {
- f {
- (future, result) =>
- Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), defaultTimeout) mustBe (false)
- }
- }
-
- }
-
- def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) {
-
- "be completed" in {
- f((future, _) => future.isCompleted mustBe (true))
- }
-
- "contain a value" in {
- f((future, message) => {
- future.value.get.left.get.getMessage mustBe (message)
- })
- }
-
- "throw exception with 'blocking'" in {
- f {
- (future, message) =>
- intercept[E] {
- blocking(future, defaultTimeout)
- }.getMessage mustBe (message)
- }
- }
-
- "throw exception with 'Await.result'" in {
- f {
- (future, message) =>
- intercept[E] {
- Await.result(future, defaultTimeout)
- }.getMessage mustBe (message)
- }
- }
-
- "retain exception with filter" in {
- f {
- (future, message) =>
- intercept[E] { Await.result(future filter (_ => true), defaultTimeout) }.getMessage mustBe (message)
- intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message)
- }
- }
-
- "retain exception with map" in {
- f {
- (future, message) =>
- intercept[E] { Await.result(future map (_.toString.length), defaultTimeout) }.getMessage mustBe (message)
- }
- }
-
- "retain exception with flatMap" in {
- f {
- (future, message) =>
- intercept[E] { Await.result(future flatMap (_ => Promise.successful("foo").future), defaultTimeout) }.getMessage mustBe (message)
- }
- }
-
- "zip properly" in {
- f {
- (future, message) =>
- intercept[E] {
- Await.result(future zip Promise.successful("foo").future, defaultTimeout)
- }.getMessage mustBe (message)
- }
- }
-
- "recover from exception" in {
- f {
- (future, message) =>
- Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), defaultTimeout) mustBe ("pigdog")
- }
- }
-
- "project a failure" in {
- f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message))
- }
-
- "perform action on exception" in {
- f {
- (future, message) =>
- val p = Promise[Any]()
- future.onFailure { case _ => p.success(message) }
- Await.result(p.future, defaultTimeout) mustBe (message)
- }
- }
-
- "always cast successfully using mapTo" in {
- f {
- (future, message) =>
- intercept[E] { Await.result(future.mapTo[java.lang.Thread], defaultTimeout) }.getMessage mustBe (message)
- }
- }
- }
-}
-
-
-
-
-
-
-
diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala
new file mode 100644
index 0000000000..6016746a23
--- /dev/null
+++ b/test/files/jvm/future-spec/PromiseTests.scala
@@ -0,0 +1,244 @@
+
+
+
+import scala.concurrent._
+import scala.concurrent.util.duration._
+import scala.concurrent.util.Duration.Inf
+import scala.collection._
+import scala.runtime.NonLocalReturnControl
+
+
+
+object PromiseTests extends MinimalScalaTest {
+
+ val defaultTimeout = Inf
+
+ /* promise specification */
+
+ "An empty Promise" should {
+
+ "not be completed" in {
+ val p = Promise()
+ p.future.isCompleted mustBe (false)
+ }
+
+ "have no value" in {
+ val p = Promise()
+ p.future.value mustBe (None)
+ }
+
+ "return supplied value on timeout" in {
+ val failure = Promise.failed[String](new RuntimeException("br0ken")).future
+ val otherFailure = Promise.failed[String](new RuntimeException("last")).future
+ val empty = Promise[String]().future
+ val timedOut = Promise.successful[String]("Timedout").future
+
+ Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
+ Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout")
+ Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
+ intercept[RuntimeException] {
+ Await.result(failure fallbackTo otherFailure, defaultTimeout)
+ }.getMessage mustBe ("last")
+ }
+
+ }
+
+ "A successful Promise" should {
+ val result = "test value"
+ val future = Promise[String]().complete(Right(result)).future
+ futureWithResult(_(future, result))
+ }
+
+ "A failed Promise" should {
+ val message = "Expected Exception"
+ val future = Promise[String]().complete(Left(new RuntimeException(message))).future
+ futureWithException[RuntimeException](_(future, message))
+ }
+
+ "An interrupted Promise" should {
+ val message = "Boxed InterruptedException"
+ val future = Promise[String]().complete(Left(new InterruptedException(message))).future
+ futureWithException[ExecutionException](_(future, message))
+ }
+
+ "A NonLocalReturnControl failed Promise" should {
+ val result = "test value"
+ val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future
+ futureWithResult(_(future, result))
+ }
+
+ def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) {
+
+ "be completed" in { f((future, _) => future.isCompleted mustBe (true)) }
+
+ "contain a value" in { f((future, result) => future.value mustBe (Some(Right(result)))) }
+
+ "return result with 'blocking'" in { f((future, result) => blocking(future, defaultTimeout) mustBe (result)) }
+
+ "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) }
+
+ "not timeout" in { f((future, _) => Await.ready(future, 0 millis)) }
+
+ "filter result" in {
+ f {
+ (future, result) =>
+ Await.result((future filter (_ => true)), defaultTimeout) mustBe (result)
+ intercept[NoSuchElementException] {
+ Await.result((future filter (_ => false)), defaultTimeout)
+ }
+ }
+ }
+
+ "transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) }
+
+ "compose result with flatMap" in {
+ f { (future, result) =>
+ val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p
+ Await.result(r, defaultTimeout) mustBe (result.toString + "foo")
+ }
+ }
+
+ "perform action with foreach" in {
+ f {
+ (future, result) =>
+ val p = Promise[Any]()
+ future foreach p.success
+ Await.result(p.future, defaultTimeout) mustBe (result)
+ }
+ }
+
+ "zip properly" in {
+ f {
+ (future, result) =>
+ Await.result(future zip Promise.successful("foo").future, defaultTimeout) mustBe ((result, "foo"))
+ intercept[RuntimeException] {
+ Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, defaultTimeout)
+ }.getMessage mustBe ("ohnoes")
+ }
+ }
+
+ "not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) }
+
+ "perform action on result" in {
+ f {
+ (future, result) =>
+ val p = Promise[Any]()
+ future.onSuccess { case x => p.success(x) }
+ Await.result(p.future, defaultTimeout) mustBe (result)
+ }
+ }
+
+ "not project a failure" in {
+ f {
+ (future, result) =>
+ intercept[NoSuchElementException] {
+ Await.result(future.failed, defaultTimeout)
+ }.getMessage mustBe ("Future.failed not completed with a throwable. Instead completed with: " + result)
+ }
+ }
+
+ "cast using mapTo" in {
+ f {
+ (future, result) =>
+ Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), defaultTimeout) mustBe (false)
+ }
+ }
+
+ }
+
+ def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) {
+
+ "be completed" in {
+ f((future, _) => future.isCompleted mustBe (true))
+ }
+
+ "contain a value" in {
+ f((future, message) => {
+ future.value.get.left.get.getMessage mustBe (message)
+ })
+ }
+
+ "throw exception with 'blocking'" in {
+ f {
+ (future, message) =>
+ intercept[E] {
+ blocking(future, defaultTimeout)
+ }.getMessage mustBe (message)
+ }
+ }
+
+ "throw exception with 'Await.result'" in {
+ f {
+ (future, message) =>
+ intercept[E] {
+ Await.result(future, defaultTimeout)
+ }.getMessage mustBe (message)
+ }
+ }
+
+ "retain exception with filter" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future filter (_ => true), defaultTimeout) }.getMessage mustBe (message)
+ intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+
+ "retain exception with map" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future map (_.toString.length), defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+
+ "retain exception with flatMap" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future flatMap (_ => Promise.successful("foo").future), defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+
+ "zip properly" in {
+ f {
+ (future, message) =>
+ intercept[E] {
+ Await.result(future zip Promise.successful("foo").future, defaultTimeout)
+ }.getMessage mustBe (message)
+ }
+ }
+
+ "recover from exception" in {
+ f {
+ (future, message) =>
+ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), defaultTimeout) mustBe ("pigdog")
+ }
+ }
+
+ "project a failure" in {
+ f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message))
+ }
+
+ "perform action on exception" in {
+ f {
+ (future, message) =>
+ val p = Promise[Any]()
+ future.onFailure { case _ => p.success(message) }
+ Await.result(p.future, defaultTimeout) mustBe (message)
+ }
+ }
+
+ "always cast successfully using mapTo" in {
+ f {
+ (future, message) =>
+ intercept[E] { Await.result(future.mapTo[java.lang.Thread], defaultTimeout) }.getMessage mustBe (message)
+ }
+ }
+ }
+}
+
+
+
+
+
+
+