diff options
-rw-r--r-- | src/library/scala/collection/Iterator.scala | 8 | ||||
-rw-r--r-- | src/library/scala/collection/TraversableOnce.scala | 29 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 52 | ||||
-rw-r--r-- | src/library/scala/concurrent/util/Duration.scala | 12 | ||||
-rw-r--r-- | test/files/jvm/future-spec/FutureTests.scala | 261 | ||||
-rw-r--r-- | test/files/jvm/future-spec/PromiseTests.scala | 244 |
6 files changed, 329 insertions, 277 deletions
diff --git a/src/library/scala/collection/Iterator.scala b/src/library/scala/collection/Iterator.scala index 65de60c8fe..af83291a46 100644 --- a/src/library/scala/collection/Iterator.scala +++ b/src/library/scala/collection/Iterator.scala @@ -21,6 +21,14 @@ import immutable.Stream */ object Iterator { + /** With the advent of `TraversableOnce` and `Iterator`, it can be useful to have a builder which + * operates on `Iterator`s so they can be treated uniformly along with the collections. + * See `scala.util.Random.shuffle` for an example. + */ + implicit def IteratorCanBuildFrom[A] = new TraversableOnce.BufferedCanBuildFrom[A, Iterator] { + def toColl[B](coll: ArrayBuffer[B]) = coll.iterator + } + /** The iterator which produces no values. */ val empty: Iterator[Nothing] = new AbstractIterator[Nothing] { def hasNext: Boolean = false diff --git a/src/library/scala/collection/TraversableOnce.scala b/src/library/scala/collection/TraversableOnce.scala index 5b5cee7f1b..b34daed2be 100644 --- a/src/library/scala/collection/TraversableOnce.scala +++ b/src/library/scala/collection/TraversableOnce.scala @@ -8,7 +8,7 @@ package scala.collection -import mutable.{ Buffer, ListBuffer, ArrayBuffer } +import mutable.{ Buffer, Builder, ListBuffer, ArrayBuffer } import annotation.unchecked.{ uncheckedVariance => uV } import language.{implicitConversions, higherKinds} @@ -357,7 +357,6 @@ trait TraversableOnce[+A] extends Any with GenTraversableOnce[A] { } - object TraversableOnce { @deprecated("use OnceCanBuildFrom instead") def traversableOnceCanBuildFrom[T] = new OnceCanBuildFrom[T] @@ -367,26 +366,34 @@ object TraversableOnce { implicit def alternateImplicit[A](trav: TraversableOnce[A]) = new ForceImplicitAmbiguity implicit def flattenTraversableOnce[A, CC[_]](travs: TraversableOnce[CC[A]])(implicit ev: CC[A] => TraversableOnce[A]) = new FlattenOps[A](travs map ev) - - /** With the advent of TraversableOnce, it can be useful to have a builder which - * operates on Iterators so they can be treated uniformly along with the collections. - * See scala.util.Random.shuffle for an example. - */ - implicit class OnceCanBuildFrom[A] extends generic.CanBuildFrom[TraversableOnce[A], A, TraversableOnce[A]] { - def newIterator = new ArrayBuffer[A] mapResult (_.iterator) + + abstract class BufferedCanBuildFrom[A, Coll[X] <: TraversableOnce[X]] extends generic.CanBuildFrom[Coll[_], A, Coll[A]] { + def toColl[B](buff: ArrayBuffer[B]): Coll[B] + + def newIterator: Builder[A, Coll[A]] = new ArrayBuffer[A] mapResult toColl /** Creates a new builder on request of a collection. * @param from the collection requesting the builder to be created. * @return the result of invoking the `genericBuilder` method on `from`. */ - def apply(from: TraversableOnce[A]) = newIterator + def apply(from: Coll[_]): Builder[A, Coll[A]] = newIterator /** Creates a new builder from scratch * @return the result of invoking the `newBuilder` method of this factory. */ def apply() = newIterator } - + + /** With the advent of `TraversableOnce`, it can be useful to have a builder which + * operates on `Iterator`s so they can be treated uniformly along with the collections. + * See `scala.util.Random.shuffle` or `scala.concurrent.Future.sequence` for an example. + */ + class OnceCanBuildFrom[A] extends BufferedCanBuildFrom[A, TraversableOnce] { + def toColl[B](buff: ArrayBuffer[B]) = buff.iterator + } + + implicit def OnceCanBuildFrom[A] = new OnceCanBuildFrom[A] + class FlattenOps[A](travs: TraversableOnce[TraversableOnce[A]]) { def flatten: Iterator[A] = new AbstractIterator[A] { val its = travs.toIterator diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 55c91619fa..11505e4146 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -426,21 +426,8 @@ trait Future[+T] extends Awaitable[T] { * that conforms to `S`'s erased type or a `ClassCastException` otherwise. */ def mapTo[S](implicit tag: ClassTag[S]): Future[S] = { - import java.{ lang => jl } - val toBoxed = Map[Class[_], Class[_]]( - classOf[Boolean] -> classOf[jl.Boolean], - classOf[Byte] -> classOf[jl.Byte], - classOf[Char] -> classOf[jl.Character], - classOf[Short] -> classOf[jl.Short], - classOf[Int] -> classOf[jl.Integer], - classOf[Long] -> classOf[jl.Long], - classOf[Float] -> classOf[jl.Float], - classOf[Double] -> classOf[jl.Double], - classOf[Unit] -> classOf[scala.runtime.BoxedUnit] - ) - def boxedType(c: Class[_]): Class[_] = { - if (c.isPrimitive) toBoxed(c) else c + if (c.isPrimitive) Future.toBoxed(c) else c } val p = newPromise[S] @@ -530,7 +517,22 @@ trait Future[+T] extends Awaitable[T] { * Note: using this method yields nondeterministic dataflow programs. */ object Future { - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + + import java.{ lang => jl } + + private[concurrent] val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit] + ) + + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. * * The result becomes available once the asynchronous computation is completed. * @@ -544,10 +546,10 @@ object Future { import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom - /** Simple version of `Futures.traverse`. Transforms a `Traversable[Future[A]]` into a `Future[Traversable[A]]`. + /** Simple version of `Futures.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`. * Useful for reducing many `Future`s into a single `Future`. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { + def sequence[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { in.foldLeft(Promise.successful(cbf(in)).future) { (fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a) } map (_.result) @@ -555,7 +557,7 @@ object Future { /** Returns a `Future` to the result of the first future in the list that is completed. */ - def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() val completeFirst: Either[Throwable, T] => Unit = p tryComplete _ @@ -566,7 +568,8 @@ object Future { /** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate. */ - def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + def find[T](futurestravonce: TraversableOnce[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { + val futures = futurestravonce.toBuffer if (futures.isEmpty) Promise.successful[Option[T]](None).future else { val result = Promise[Option[T]]() @@ -577,8 +580,9 @@ object Future { case _ => } } finally { - if (ref.decrementAndGet == 0) + if (ref.decrementAndGet == 0) { result tryComplete Right(None) + } } futures.foreach(_ onComplete search) @@ -597,7 +601,7 @@ object Future { * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) * }}} */ - def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) Promise.successful(zero).future else sequence(futures).map(_.foldLeft(zero)(foldFun)) } @@ -609,12 +613,12 @@ object Future { * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds) * }}} */ - def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { + def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future else sequence(futures).map(_ reduceLeft op) } - /** Transforms a `Traversable[A]` into a `Future[Traversable[B]]` using the provided function `A => Future[B]`. + /** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel: * @@ -622,7 +626,7 @@ object Future { * val myFutureList = Future.traverse(myList)(x => Future(myFunc(x))) * }}} */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = + def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) => val fb = fn(a.asInstanceOf[A]) for (r <- fr; b <- fb) yield (r += b) diff --git a/src/library/scala/concurrent/util/Duration.scala b/src/library/scala/concurrent/util/Duration.scala index 0e39fdffa3..bab664727e 100644 --- a/src/library/scala/concurrent/util/Duration.scala +++ b/src/library/scala/concurrent/util/Duration.scala @@ -1,12 +1,16 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> - */ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ package scala.concurrent.util import java.util.concurrent.TimeUnit import TimeUnit._ -import java.lang.{ Double ⇒ JDouble } +import java.lang.{ Double => JDouble } import language.implicitConversions case class Deadline private (time: Duration) { diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index 90ed10b93a..9a9cf951bb 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -162,10 +162,12 @@ object FutureTests extends MinimalScalaTest { } "firstCompletedOf" in { - val futures = Vector.fill[Future[Int]](10) { + def futures = Vector.fill[Future[Int]](10) { Promise[Int]().future } :+ Promise.successful[Int](5).future + Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5) + Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5) } "find" in { @@ -176,7 +178,7 @@ object FutureTests extends MinimalScalaTest { val result = Future.find[Int](futures)(_ == 3) Await.result(result, defaultTimeout) mustBe (Some(3)) - val notFound = Future.find[Int](futures)(_ == 11) + val notFound = Future.find[Int](futures.iterator)(_ == 11) Await.result(notFound, defaultTimeout) mustBe (None) } @@ -208,11 +210,18 @@ object FutureTests extends MinimalScalaTest { Thread.sleep(wait) add } - def futures = (0 to 9) map { + + val futures = (0 to 9) map { idx => async(idx, idx * 200) } - def folded = Future.fold(futures)(0)(_ + _) + val folded = Future.fold(futures)(0)(_ + _) Await.result(folded, timeout) mustBe (45) + + val futuresit = (0 to 9) map { + idx => async(idx, idx * 200) + } + val foldedit = Future.fold(futures)(0)(_ + _) + Await.result(foldedit, timeout) mustBe (45) } "fold by composing" in { @@ -273,9 +282,14 @@ object FutureTests extends MinimalScalaTest { idx } val timeout = 10000 millis + val futures = (0 to 9) map { async } val reduced = Future.reduce(futures)(_ + _) Await.result(reduced, timeout) mustBe (45) + + val futuresit = (0 to 9) map { async } + val reducedit = Future.reduce(futuresit)(_ + _) + Await.result(reducedit, timeout) mustBe (45) } "shouldReduceResultsWithException" in { @@ -310,13 +324,17 @@ object FutureTests extends MinimalScalaTest { } } - val oddFutures = List.fill(100)(future { counter.incAndGet() }) + val oddFutures = List.fill(100)(future { counter.incAndGet() }).iterator val traversed = Future.sequence(oddFutures) Await.result(traversed, defaultTimeout).sum mustBe (10000) val list = (1 to 100).toList val traversedList = Future.traverse(list)(x => Future(x * 2 - 1)) Await.result(traversedList, defaultTimeout).sum mustBe (10000) + + val iterator = (1 to 100).toList.iterator + val traversedIterator = Future.traverse(iterator)(x => Future(x * 2 - 1)) + Await.result(traversedIterator, defaultTimeout).sum mustBe (10000) } "shouldHandleThrowables" in { @@ -491,236 +509,3 @@ object FutureTests extends MinimalScalaTest { } -object PromiseTests extends MinimalScalaTest { - - val defaultTimeout = Inf - - /* promise specification */ - - "An empty Promise" should { - - "not be completed" in { - val p = Promise() - p.future.isCompleted mustBe (false) - } - - "have no value" in { - val p = Promise() - p.future.value mustBe (None) - } - - "return supplied value on timeout" in { - val failure = Promise.failed[String](new RuntimeException("br0ken")).future - val otherFailure = Promise.failed[String](new RuntimeException("last")).future - val empty = Promise[String]().future - val timedOut = Promise.successful[String]("Timedout").future - - Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout") - Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout") - Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout") - intercept[RuntimeException] { - Await.result(failure fallbackTo otherFailure, defaultTimeout) - }.getMessage mustBe ("last") - } - - } - - "A successful Promise" should { - val result = "test value" - val future = Promise[String]().complete(Right(result)).future - futureWithResult(_(future, result)) - } - - "A failed Promise" should { - val message = "Expected Exception" - val future = Promise[String]().complete(Left(new RuntimeException(message))).future - futureWithException[RuntimeException](_(future, message)) - } - - "An interrupted Promise" should { - val message = "Boxed InterruptedException" - val future = Promise[String]().complete(Left(new InterruptedException(message))).future - futureWithException[ExecutionException](_(future, message)) - } - - "A NonLocalReturnControl failed Promise" should { - val result = "test value" - val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future - futureWithResult(_(future, result)) - } - - def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) { - - "be completed" in { f((future, _) => future.isCompleted mustBe (true)) } - - "contain a value" in { f((future, result) => future.value mustBe (Some(Right(result)))) } - - "return result with 'blocking'" in { f((future, result) => blocking(future, defaultTimeout) mustBe (result)) } - - "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) } - - "not timeout" in { f((future, _) => Await.ready(future, 0 millis)) } - - "filter result" in { - f { - (future, result) => - Await.result((future filter (_ => true)), defaultTimeout) mustBe (result) - intercept[NoSuchElementException] { - Await.result((future filter (_ => false)), defaultTimeout) - } - } - } - - "transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) } - - "compose result with flatMap" in { - f { (future, result) => - val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p - Await.result(r, defaultTimeout) mustBe (result.toString + "foo") - } - } - - "perform action with foreach" in { - f { - (future, result) => - val p = Promise[Any]() - future foreach p.success - Await.result(p.future, defaultTimeout) mustBe (result) - } - } - - "zip properly" in { - f { - (future, result) => - Await.result(future zip Promise.successful("foo").future, defaultTimeout) mustBe ((result, "foo")) - intercept[RuntimeException] { - Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, defaultTimeout) - }.getMessage mustBe ("ohnoes") - } - } - - "not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) } - - "perform action on result" in { - f { - (future, result) => - val p = Promise[Any]() - future.onSuccess { case x => p.success(x) } - Await.result(p.future, defaultTimeout) mustBe (result) - } - } - - "not project a failure" in { - f { - (future, result) => - intercept[NoSuchElementException] { - Await.result(future.failed, defaultTimeout) - }.getMessage mustBe ("Future.failed not completed with a throwable. Instead completed with: " + result) - } - } - - "cast using mapTo" in { - f { - (future, result) => - Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), defaultTimeout) mustBe (false) - } - } - - } - - def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) { - - "be completed" in { - f((future, _) => future.isCompleted mustBe (true)) - } - - "contain a value" in { - f((future, message) => { - future.value.get.left.get.getMessage mustBe (message) - }) - } - - "throw exception with 'blocking'" in { - f { - (future, message) => - intercept[E] { - blocking(future, defaultTimeout) - }.getMessage mustBe (message) - } - } - - "throw exception with 'Await.result'" in { - f { - (future, message) => - intercept[E] { - Await.result(future, defaultTimeout) - }.getMessage mustBe (message) - } - } - - "retain exception with filter" in { - f { - (future, message) => - intercept[E] { Await.result(future filter (_ => true), defaultTimeout) }.getMessage mustBe (message) - intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message) - } - } - - "retain exception with map" in { - f { - (future, message) => - intercept[E] { Await.result(future map (_.toString.length), defaultTimeout) }.getMessage mustBe (message) - } - } - - "retain exception with flatMap" in { - f { - (future, message) => - intercept[E] { Await.result(future flatMap (_ => Promise.successful("foo").future), defaultTimeout) }.getMessage mustBe (message) - } - } - - "zip properly" in { - f { - (future, message) => - intercept[E] { - Await.result(future zip Promise.successful("foo").future, defaultTimeout) - }.getMessage mustBe (message) - } - } - - "recover from exception" in { - f { - (future, message) => - Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), defaultTimeout) mustBe ("pigdog") - } - } - - "project a failure" in { - f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message)) - } - - "perform action on exception" in { - f { - (future, message) => - val p = Promise[Any]() - future.onFailure { case _ => p.success(message) } - Await.result(p.future, defaultTimeout) mustBe (message) - } - } - - "always cast successfully using mapTo" in { - f { - (future, message) => - intercept[E] { Await.result(future.mapTo[java.lang.Thread], defaultTimeout) }.getMessage mustBe (message) - } - } - } -} - - - - - - - diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala new file mode 100644 index 0000000000..6016746a23 --- /dev/null +++ b/test/files/jvm/future-spec/PromiseTests.scala @@ -0,0 +1,244 @@ + + + +import scala.concurrent._ +import scala.concurrent.util.duration._ +import scala.concurrent.util.Duration.Inf +import scala.collection._ +import scala.runtime.NonLocalReturnControl + + + +object PromiseTests extends MinimalScalaTest { + + val defaultTimeout = Inf + + /* promise specification */ + + "An empty Promise" should { + + "not be completed" in { + val p = Promise() + p.future.isCompleted mustBe (false) + } + + "have no value" in { + val p = Promise() + p.future.value mustBe (None) + } + + "return supplied value on timeout" in { + val failure = Promise.failed[String](new RuntimeException("br0ken")).future + val otherFailure = Promise.failed[String](new RuntimeException("last")).future + val empty = Promise[String]().future + val timedOut = Promise.successful[String]("Timedout").future + + Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout") + Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout") + Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout") + intercept[RuntimeException] { + Await.result(failure fallbackTo otherFailure, defaultTimeout) + }.getMessage mustBe ("last") + } + + } + + "A successful Promise" should { + val result = "test value" + val future = Promise[String]().complete(Right(result)).future + futureWithResult(_(future, result)) + } + + "A failed Promise" should { + val message = "Expected Exception" + val future = Promise[String]().complete(Left(new RuntimeException(message))).future + futureWithException[RuntimeException](_(future, message)) + } + + "An interrupted Promise" should { + val message = "Boxed InterruptedException" + val future = Promise[String]().complete(Left(new InterruptedException(message))).future + futureWithException[ExecutionException](_(future, message)) + } + + "A NonLocalReturnControl failed Promise" should { + val result = "test value" + val future = Promise[String]().complete(Left(new NonLocalReturnControl[String]("test", result))).future + futureWithResult(_(future, result)) + } + + def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) { + + "be completed" in { f((future, _) => future.isCompleted mustBe (true)) } + + "contain a value" in { f((future, result) => future.value mustBe (Some(Right(result)))) } + + "return result with 'blocking'" in { f((future, result) => blocking(future, defaultTimeout) mustBe (result)) } + + "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) } + + "not timeout" in { f((future, _) => Await.ready(future, 0 millis)) } + + "filter result" in { + f { + (future, result) => + Await.result((future filter (_ => true)), defaultTimeout) mustBe (result) + intercept[NoSuchElementException] { + Await.result((future filter (_ => false)), defaultTimeout) + } + } + } + + "transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) } + + "compose result with flatMap" in { + f { (future, result) => + val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p + Await.result(r, defaultTimeout) mustBe (result.toString + "foo") + } + } + + "perform action with foreach" in { + f { + (future, result) => + val p = Promise[Any]() + future foreach p.success + Await.result(p.future, defaultTimeout) mustBe (result) + } + } + + "zip properly" in { + f { + (future, result) => + Await.result(future zip Promise.successful("foo").future, defaultTimeout) mustBe ((result, "foo")) + intercept[RuntimeException] { + Await.result(future zip Promise.failed(new RuntimeException("ohnoes")).future, defaultTimeout) + }.getMessage mustBe ("ohnoes") + } + } + + "not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) } + + "perform action on result" in { + f { + (future, result) => + val p = Promise[Any]() + future.onSuccess { case x => p.success(x) } + Await.result(p.future, defaultTimeout) mustBe (result) + } + } + + "not project a failure" in { + f { + (future, result) => + intercept[NoSuchElementException] { + Await.result(future.failed, defaultTimeout) + }.getMessage mustBe ("Future.failed not completed with a throwable. Instead completed with: " + result) + } + } + + "cast using mapTo" in { + f { + (future, result) => + Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), defaultTimeout) mustBe (false) + } + } + + } + + def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) { + + "be completed" in { + f((future, _) => future.isCompleted mustBe (true)) + } + + "contain a value" in { + f((future, message) => { + future.value.get.left.get.getMessage mustBe (message) + }) + } + + "throw exception with 'blocking'" in { + f { + (future, message) => + intercept[E] { + blocking(future, defaultTimeout) + }.getMessage mustBe (message) + } + } + + "throw exception with 'Await.result'" in { + f { + (future, message) => + intercept[E] { + Await.result(future, defaultTimeout) + }.getMessage mustBe (message) + } + } + + "retain exception with filter" in { + f { + (future, message) => + intercept[E] { Await.result(future filter (_ => true), defaultTimeout) }.getMessage mustBe (message) + intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message) + } + } + + "retain exception with map" in { + f { + (future, message) => + intercept[E] { Await.result(future map (_.toString.length), defaultTimeout) }.getMessage mustBe (message) + } + } + + "retain exception with flatMap" in { + f { + (future, message) => + intercept[E] { Await.result(future flatMap (_ => Promise.successful("foo").future), defaultTimeout) }.getMessage mustBe (message) + } + } + + "zip properly" in { + f { + (future, message) => + intercept[E] { + Await.result(future zip Promise.successful("foo").future, defaultTimeout) + }.getMessage mustBe (message) + } + } + + "recover from exception" in { + f { + (future, message) => + Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), defaultTimeout) mustBe ("pigdog") + } + } + + "project a failure" in { + f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message)) + } + + "perform action on exception" in { + f { + (future, message) => + val p = Promise[Any]() + future.onFailure { case _ => p.success(message) } + Await.result(p.future, defaultTimeout) mustBe (message) + } + } + + "always cast successfully using mapTo" in { + f { + (future, message) => + intercept[E] { Await.result(future.mapTo[java.lang.Thread], defaultTimeout) }.getMessage mustBe (message) + } + } + } +} + + + + + + + |