Eliminated all the trailing whitespace I could manage, with special prejudice reserved for the test cases which depended on the preservation of trailing whitespace. Was reminded I cannot figure out how to eliminate the trailing space on the "scala> " prompt in repl transcripts. At least reduced the number of such empty prompts by trimming transcript code on the way in. Routed ConsoleReporter's "printMessage" through a trailing whitespace stripping method which might help futureproof against the future of whitespace diseases. Deleted the up-to-40 lines of trailing whitespace found in various library files. It seems like only yesterday we performed whitespace surgery on the whole repo. Clearly it doesn't stick very well. I suggest it would work better to enforce a few requirements on the way in.
object FutureTests extends MinimalScalaTest {
/* some utils */
def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] = s match {
case "Hello" => future { "World" }
case "Failure" => Future.failed(new RuntimeException("Expected exception; to test fault-tolerance"))
case "NoReply" => Promise[String]().future
val defaultTimeout = 5 seconds
/* future specification */
"A future with custom ExecutionContext" should {
t =>
ms += t
class ThrowableTest(m: String) extends Throwable(m)
val f1 = future[Any] {
throw new ThrowableTest("test")
intercept[ThrowableTest] {
Await.result(f1, defaultTimeout)
val latch = new TestLatch
val f2 = future {
Await.ready(latch, 5 seconds)
val f3 = f2 map { s => s.toUpperCase }
f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
Await.result(f2, defaultTimeout) mustBe ("success")
f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
val waiting = future {
Await.ready(waiting, 2000 millis)
ms.size mustBe (4)
//FIXME should check
val future0 = future[Any] {
val future1 = for {
a <- future0.mapTo[Int] // returns 5
b <- async(a) // returns "10"
c <- async(7) // returns "14"
} yield b + "-" + c
val future2 = for {
a <- future0.mapTo[Int]
b <- (future { (a * 2).toString }).mapTo[Int]
} yield b + "-" + c
Await.result(future1, defaultTimeout) mustBe ("10-14")
assert(checkType(future1, manifest[String]))
intercept[ClassCastException] { Await.result(future2, defaultTimeout) }
"support pattern matching within a for-comprehension" in {
case class Req[T](req: T)
case class Res[T](res: T)
case Req(s: String) => future { Res(s.length) }
case Req(i: Int) => future { Res((i * 2).toString) }
val future1 = for {
Res(a: Int) <- async(Req("Hello"))
Res(b: String) <- async(Req(a))
Res(c: String) <- async(Req(7))
} yield b + "-" + c
val future2 = for {
Res(a: Int) <- async(Req("Hello"))
Res(b: Int) <- async(Req(a))
Res(c: Int) <- async(Req(7))
} yield b + "-" + c
Await.result(future1, defaultTimeout) mustBe ("10-14")
intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) }
"recover from exceptions" in {
val future1 = Future(5)
val future2 = future1 map (_ / 0)
val future3 = future2 map (_.toString)
val future4 = future1 recover {
case e: ArithmeticException => 0
} map (_.toString)
val future5 = future2 recover {
case e: ArithmeticException => 0
} map (_.toString)
val future6 = future2 recover {
case e: MatchError => 0
} map (_.toString)
val future7 = future3 recover {
case e: ArithmeticException => "You got ERROR"
val future8 = testAsync("Failure")
val future9 = testAsync("Failure") recover {
case e: RuntimeException => "FAIL!"
val future11 = testAsync("Failure") recover {
case _ => "Oops!"
Await.result(future1, defaultTimeout) mustBe (5)
intercept[ArithmeticException] { Await.result(future2, defaultTimeout) }
intercept[ArithmeticException] { Await.result(future3, defaultTimeout) }
@@ -196,23 +196,23 @@ object FutureTests extends MinimalScalaTest {
Await.result(future10, defaultTimeout) mustBe ("World")
Await.result(future11, defaultTimeout) mustBe ("Oops!")
"recoverWith from exceptions" in {
val o = new IllegalStateException("original")
val r = new IllegalStateException("recovered")
intercept[IllegalStateException] {
val failed = Future.failed[String](o) recoverWith {
case _ if false == true => Future.successful("yay!")
Await.result(failed, defaultTimeout)
} mustBe (o)
val recovered = Future.failed[String](o) recoverWith {
case _ => Future.successful("yay!")
Await.result(recovered, defaultTimeout) mustBe ("yay!")
intercept[IllegalStateException] {
val refailed = Future.failed[String](o) recoverWith {
case _ => Future.failed[String](r)
Await.result(refailed, defaultTimeout)
} mustBe (r)
"andThen like a boss" in {
val q = new java.util.concurrent.LinkedBlockingQueue[Int]
for (i <- 1 to 1000) {
"firstCompletedOf" in {
def futures = Vector.fill[Future[Int]](10) {
} :+ Future.successful[Int](5)
Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
Await.result(Future.firstCompletedOf(futures.iterator), defaultTimeout) mustBe (5)
"find" in {
val futures = for (i <- 1 to 10) yield future {
val result = Future.find[Int](futures)(_ == 3)
Await.result(result, defaultTimeout) mustBe (Some(3))
val notFound = Future.find[Int](futures.iterator)(_ == 11)
Await.result(notFound, defaultTimeout) mustBe (None)
"zip" in {
val timeout = 10000 millis
val f = new IllegalStateException("test")
val failed = Future.failed[String](f) zip Future.successful("foo")
Await.result(failed, timeout)
} mustBe (f)
intercept[IllegalStateException] {
val failed = Future.successful("foo") zip Future.failed[String](f)
Await.result(failed, timeout)
} mustBe (f)
intercept[IllegalStateException] {
val failed = Future.failed[String](f) zip Future.failed[String](f)
Await.result(failed, timeout)
} mustBe (f)
val successful = Future.successful("foo") zip Future.successful("foo")
Await.result(successful, timeout) mustBe (("foo", "foo"))
"fold" in {
val timeout = 10000 millis
def async(add: Int, wait: Int) = future {
val futures = (0 to 9) map {
idx => async(idx, idx * 20)
val folded = Future.fold(futures)(0)(_ + _)
Await.result(folded, timeout) mustBe (45)
val futuresit = (0 to 9) map {
idx => async(idx, idx * 20)
val foldedit = Future.fold(futures)(0)(_ + _)
Await.result(foldedit, timeout) mustBe (45)
"fold by composing" in {
val timeout = 10000 millis
def async(add: Int, wait: Int) = future {
idx => async(idx, idx * 20)
val folded = futures.foldLeft(Future(0)) {
@@ -318,7 +318,7 @@ object FutureTests extends MinimalScalaTest {
Await.result(folded, timeout) mustBe (45)
"fold with an exception" in {
val timeout = 10000 millis
def async(add: Int, wait: Int) = future {
Await.result(folded, timeout)
}.getMessage mustBe ("shouldFoldResultsWithException: expected")
"fold mutable zeroes safely" in {
import scala.collection.mutable.ArrayBuffer
def test(testNumber: Int) {
case (l, _) => l
val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum
assert(result == 250500)
(1 to 100) foreach test //Make sure it tries to provoke the problem
"return zero value if folding empty list" in {
val zero = Future.fold(List[Future[Int]]())(0)(_ + _)
Await.result(zero, defaultTimeout) mustBe (0)
"shouldReduceResults" in {
def async(idx: Int) = future {
Thread.sleep(idx * 20)
val timeout = 10000 millis
val futures = (0 to 9) map { async }
val reduced = Future.reduce(futures)(_ + _)
Await.result(reduced, timeout) mustBe (45)
val futuresit = (0 to 9) map { async }
val reducedit = Future.reduce(futuresit)(_ + _)
Await.result(reducedit, timeout) mustBe (45)
"shouldReduceResultsWithException" in {
def async(add: Int, wait: Int) = future {
Await.result(failed, timeout)
}.getMessage mustBe ("shouldFoldResultsWithException: expected")
"shouldReduceThrowNSEEOnEmptyInput" in {
intercept[java.util.NoSuchElementException] {
val emptyreduced = Future.reduce(List[Future[Int]]())(_ + _)
Await.result(emptyreduced, defaultTimeout)
"shouldTraverseFutures" in {
object counter {
var count = -1
@@ -403,23 +403,23 @@ object FutureTests extends MinimalScalaTest {
val oddFutures = List.fill(100)(future { counter.incAndGet() }).iterator
val traversed = Future.sequence(oddFutures)
Await.result(traversed, defaultTimeout).sum mustBe (10000)
val list = (1 to 100).toList
val traversedList = Future.traverse(list)(x => Future(x * 2 - 1))
Await.result(traversedList, defaultTimeout).sum mustBe (10000)
val iterator = (1 to 100).toList.iterator
val traversedIterator = Future.traverse(iterator)(x => Future(x * 2 - 1))
Await.result(traversedIterator, defaultTimeout).sum mustBe (10000)
"shouldBlockUntilResult" in {
val latch = new TestLatch
val f = future {
Await.ready(latch, 5 seconds)
val res = Await.result(f, Inf)
res + 9
intercept[TimeoutException] {
Await.ready(f2, 100 millis)
Await.result(f2, defaultTimeout) mustBe (14)
val f3 = future {
intercept[TimeoutException] {
Await.ready(f3, 0 millis)
"run callbacks async" in {
val latch = Vector.fill(10)(new TestLatch)
val f1 = future {
Await.ready(latch(1), TestLatch.DefaultTimeout)
for (_ <- f2) latch(4).open()
Await.ready(latch(0), TestLatch.DefaultTimeout)
f1.isCompleted mustBe (false)
f2.isCompleted mustBe (false)
Await.ready(latch(2), TestLatch.DefaultTimeout)
f1.isCompleted mustBe (true)
f2.isCompleted mustBe (false)
val f3 = f1 map {
s =>
s.length * 2
for (_ <- f3) latch(3).open()
Await.ready(latch(5), TestLatch.DefaultTimeout)
f3.isCompleted mustBe (false)
Await.ready(latch(4), TestLatch.DefaultTimeout)
f2.isCompleted mustBe (true)
f3.isCompleted mustBe (true)
val p1 = Promise[String]()
val f4 = p1.future map {
s =>
for (_ <- f4) latch(9).open()
p1.future.isCompleted mustBe (false)
f4.isCompleted mustBe (false)
p1 complete Success("Hello")
Await.ready(latch(7), TestLatch.DefaultTimeout)
p1.future.isCompleted mustBe (true)
f4.isCompleted mustBe (false)
Await.ready(latch(9), TestLatch.DefaultTimeout)
Await.ready(f4, defaultTimeout).isCompleted mustBe (true)
"should not deadlock with nested await (ticket 1313)" in {
val simple = Future() map {
_ =>
Await.result(umap, Inf)
Await.ready(simple, Inf).isCompleted mustBe (true)
val l1, l2 = new TestLatch
val complex = Future() map {
_ =>
val f = future(5).map(_ / 0)
Await.ready(f, defaultTimeout).value.get.toString mustBe expected.toString
import ExecutionContext.Implicits._
val defaultTimeout = Inf
/* promise specification */
"An empty Promise" should {
"not be completed" in {
val p = Promise()
p.future.isCompleted mustBe (false)
p.isCompleted mustBe (false)
"have no value" in {
val p = Promise()
p.future.value mustBe (None)
p.isCompleted mustBe (false)
"return supplied value on timeout" in {
val failure = Promise.failed[String](new RuntimeException("br0ken")).future
val otherFailure = Promise.failed[String](new RuntimeException("last")).future
val empty = Promise[String]().future
val timedOut = Promise.successful[String]("Timedout").future
Await.result(failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
Await.result(timedOut fallbackTo empty, defaultTimeout) mustBe ("Timedout")
Await.result(failure fallbackTo failure fallbackTo timedOut, defaultTimeout) mustBe ("Timedout")
Await.result(failure fallbackTo otherFailure, defaultTimeout)
}.getMessage mustBe ("last")
"A successful Promise" should {
val result = "test value"
val promise = Promise[String]().complete(Success(result))
promise.isCompleted mustBe (true)
futureWithResult(_(promise.future, result))
"A failed Promise" should {
val message = "Expected Exception"
val promise = Promise[String]().complete(Failure(new RuntimeException(message)))
promise.isCompleted mustBe (true)
futureWithException[RuntimeException](_(promise.future, message))
"An interrupted Promise" should {
val message = "Boxed InterruptedException"
val future = Promise[String]().complete(Failure(new InterruptedException(message))).future
futureWithException[ExecutionException](_(future, message))
"A NonLocalReturnControl failed Promise" should {
val result = "test value"
val future = Promise[String]().complete(Failure(new NonLocalReturnControl[String]("test", result))).future
futureWithResult(_(future, result))
def futureWithResult(f: ((Future[Any], Any) => Unit) => Unit) {
"be completed" in { f((future, _) => future.isCompleted mustBe (true)) }
"contain a value" in { f((future, result) => future.value mustBe (Some(Success(result)))) }
"return when ready with 'Await.ready'" in { f((future, result) => Await.ready(future, defaultTimeout).isCompleted mustBe (true)) }
"return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) }
"not timeout" in { f((future, _) => Await.ready(future, 0 millis)) }
"filter result" in {
f {
(future, result) =>
"transform result with map" in { f((future, result) => Await.result((future map (_.toString.length)), defaultTimeout) mustBe (result.toString.length)) }
"compose result with flatMap" in {
f { (future, result) =>
val r = for (r <- future; p <- Promise.successful("foo").future) yield r.toString + p
Await.result(r, defaultTimeout) mustBe (result.toString + "foo")
"perform action with foreach" in {
f {
(future, result) =>
@@ -111,7 +111,7 @@ object PromiseTests extends MinimalScalaTest {
Await.result(p.future, defaultTimeout) mustBe (result)
"zip properly" in {
f {
(future, result) =>
}.getMessage mustBe ("ohnoes")
"not recover from exception" in { f((future, result) => Await.result(future.recover({ case _ => "pigdog" }), defaultTimeout) mustBe (result)) }
"perform action on result" in {
f {
(future, result) =>
Await.result(p.future, defaultTimeout) mustBe (result)
"not project a failure" in {
f {
(future, result) =>
}.getMessage mustBe ("Future.failed not completed with a throwable.")
"cast using mapTo" in {
f {
(future, result) =>
Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException ⇒ false }), defaultTimeout) mustBe (false)
def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) => Unit) => Unit) {
"be completed" in {
f((future, _) => future.isCompleted mustBe (true))
"contain a value" in {
f((future, message) => {
future.value.get.failed.get.getMessage mustBe (message)
"throw not throw exception with 'Await.ready'" in {
f {
(future, message) => Await.ready(future, defaultTimeout).isCompleted mustBe (true)
"throw exception with 'Await.result'" in {
f {
(future, message) =>
}.getMessage mustBe (message)
"retain exception with filter" in {
f {
(future, message) =>
intercept[E] { Await.result(future filter (_ => false), defaultTimeout) }.getMessage mustBe (message)
"retain exception with map" in {
f {
(future, message) =>
"retain exception with flatMap" in {
f {
(future, message) =>
"zip properly" in {
f {
(future, message) =>
}.getMessage mustBe (message)
"recover from exception" in {
f {
(future, message) =>
Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), defaultTimeout) mustBe ("pigdog")
"project a failure" in {
f((future, message) => Await.result(future.failed, defaultTimeout).getMessage mustBe (message))
"perform action on exception" in {
f {
(future, message) =>
Await.result(p.future, defaultTimeout) mustBe (message)
"always cast successfully using mapTo" in {
f {
(future, message) =>
object Test {
def main(args: Array[String]) {
trait Features {
trait Output {
val buffer = new StringBuilder
def bufferPrintln(a: Any) = buffer.synchronized {
buffer.append(a.toString + "\n")
trait MinimalScalaTest extends Output with Features {
val throwables = mutable.ArrayBuffer[Throwable]()
def check() {
if (throwables.nonEmpty) println(buffer.toString)
implicit def stringops(s: String) = new {
def should[U](snippets: =>U) = {
bufferPrintln(s + " should:")
def in[U](snippet: =>U) = {
try {
bufferPrintln("- " + s)
throwables += e
implicit def objectops(obj: Any) = new {
def mustBe(other: Any) = assert(obj == other, obj + " is not " + other)
def mustEqual(other: Any) = mustBe(other)
def intercept[T <: Throwable: Manifest](body: =>Any): T = {
try {
else t.asInstanceOf[T]
def checkType[T: Manifest, S](in: Future[T], refmanifest: Manifest[S]): Boolean = manifest[T] == refmanifest
class TestLatch(count: Int = 1) extends Awaitable[Unit] {
private var latch = new CountDownLatch(count)
def countDown() = latch.countDown()
def isOpen: Boolean = latch.getCount == 0
def open() = while (!isOpen) countDown()
def reset() = latch = new CountDownLatch(count)
def ready(atMost: Duration)(implicit permit: CanAwait) = {
val opened = latch.await(atMost.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TimeoutException("Timeout of %s." format (atMost.toString))
def result(atMost: Duration)(implicit permit: CanAwait): Unit = {