diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2012-04-27 19:00:59 +0200 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2012-04-27 19:00:59 +0200 |
commit | 9c4baa93d906b161f501ae04f1552e1b7d448436 (patch) | |
tree | 4b11bf4193a08598c7d6d13a5803ac1bc385dedf /test | |
parent | 8fc543b5dd7e6a8fa1827cc9e9d65e721cae140e (diff) | |
download | scala-9c4baa93d906b161f501ae04f1552e1b7d448436.tar.gz scala-9c4baa93d906b161f501ae04f1552e1b7d448436.tar.bz2 scala-9c4baa93d906b161f501ae04f1552e1b7d448436.zip |
Porting akka future tests.
Fixed a bug in Future.zip.
Diffstat (limited to 'test')
-rw-r--r-- | test/files/jvm/future-spec/FutureTests.scala | 364 | ||||
-rw-r--r-- | test/files/jvm/future-spec/main.scala | 107 |
2 files changed, 471 insertions, 0 deletions
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala new file mode 100644 index 0000000000..aa308945a1 --- /dev/null +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -0,0 +1,364 @@ + + + +import scala.concurrent._ +import scala.concurrent.util.duration._ +import scala.concurrent.util.Duration.Inf + + + +object FutureTests extends MinimalScalaTest { + + /* some utils */ + + def testAsync(s: String): Future[String] = s match { + case "Hello" => future { "World" } + case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future + case "NoReply" => Promise[String]().future + } + + val defaultTimeout = Inf + + /* future specification */ + + "A future" should { + + "compose with for-comprehensions" in { + def async(x: Int) = future { (x * 2).toString } + val future0 = future[Any] { + "five!".length + } + + val future1 = for { + a <- future0.mapTo[Int] // returns 5 + b <- async(a) // returns "10" + c <- async(7) // returns "14" + } yield b + "-" + c + + val future2 = for { + a <- future0.mapTo[Int] + b <- (future { (a * 2).toString }).mapTo[Int] + c <- future { (7 * 2).toString } + } yield b + "-" + c + + Await.result(future1, defaultTimeout) mustBe ("10-14") + assert(checkType(future1, manifest[String])) + intercept[ClassCastException] { Await.result(future2, defaultTimeout) } + } + + "support pattern matching within a for-comprehension" in { + case class Req[T](req: T) + case class Res[T](res: T) + def async[T](req: Req[T]) = req match { + case Req(s: String) => future { Res(s.length) } + case Req(i: Int) => future { Res((i * 2).toString) } + } + + val future1 = for { + Res(a: Int) <- async(Req("Hello")) + Res(b: String) <- async(Req(a)) + Res(c: String) <- async(Req(7)) + } yield b + "-" + c + + val future2 = for { + Res(a: Int) <- async(Req("Hello")) + Res(b: Int) <- async(Req(a)) + Res(c: Int) <- async(Req(7)) + } yield b + "-" + c + + Await.result(future1, defaultTimeout) mustBe ("10-14") + intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) } + } + + "recover from exceptions" in { + val future1 = Future(5) + val future2 = future1 map (_ / 0) + val future3 = future2 map (_.toString) + + val future4 = future1 recover { + case e: ArithmeticException => 0 + } map (_.toString) + + val future5 = future2 recover { + case e: ArithmeticException => 0 + } map (_.toString) + + val future6 = future2 recover { + case e: MatchError => 0 + } map (_.toString) + + val future7 = future3 recover { + case e: ArithmeticException => "You got ERROR" + } + + val future8 = testAsync("Failure") + val future9 = testAsync("Failure") recover { + case e: RuntimeException => "FAIL!" + } + val future10 = testAsync("Hello") recover { + case e: RuntimeException => "FAIL!" + } + val future11 = testAsync("Failure") recover { + case _ => "Oops!" + } + + Await.result(future1, defaultTimeout) mustBe (5) + intercept[ArithmeticException] { Await.result(future2, defaultTimeout) } + intercept[ArithmeticException] { Await.result(future3, defaultTimeout) } + Await.result(future4, defaultTimeout) mustBe ("5") + Await.result(future5, defaultTimeout) mustBe ("0") + intercept[ArithmeticException] { Await.result(future6, defaultTimeout) } + Await.result(future7, defaultTimeout) mustBe ("You got ERROR") + intercept[RuntimeException] { Await.result(future8, defaultTimeout) } + Await.result(future9, defaultTimeout) mustBe ("FAIL!") + Await.result(future10, defaultTimeout) mustBe ("World") + Await.result(future11, defaultTimeout) mustBe ("Oops!") + } + + "recoverWith from exceptions" in { + val o = new IllegalStateException("original") + val r = new IllegalStateException("recovered") + + intercept[IllegalStateException] { + val failed = Promise.failed[String](o).future recoverWith { + case _ if false == true => Promise.successful("yay!").future + } + Await.result(failed, defaultTimeout) + } mustBe (o) + + val recovered = Promise.failed[String](o).future recoverWith { + case _ => Promise.successful("yay!").future + } + Await.result(recovered, defaultTimeout) mustBe ("yay!") + + intercept[IllegalStateException] { + val refailed = Promise.failed[String](o).future recoverWith { + case _ => Promise.failed[String](r).future + } + Await.result(refailed, defaultTimeout) + } mustBe (r) + } + + "andThen like a boss" in { + val q = new java.util.concurrent.LinkedBlockingQueue[Int] + for (i <- 1 to 1000) { + val chained = future { + q.add(1); 3 + } andThen { + case _ => q.add(2) + } andThen { + case Right(0) => q.add(Int.MaxValue) + } andThen { + case _ => q.add(3); + } + Await.result(chained, defaultTimeout) mustBe (3) + q.poll() mustBe (1) + q.poll() mustBe (2) + q.poll() mustBe (3) + q.clear() + } + } + + "firstCompletedOf" in { + val futures = Vector.fill[Future[Int]](10) { + Promise[Int]().future + } :+ Promise.successful[Int](5).future + Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5) + } + + "find" in { + val futures = for (i <- 1 to 10) yield future { + i + } + + val result = Future.find[Int](futures)(_ == 3) + Await.result(result, defaultTimeout) mustBe (Some(3)) + + val notFound = Future.find[Int](futures)(_ == 11) + Await.result(notFound, defaultTimeout) mustBe (None) + } + + "zip" in { + val timeout = 10000 millis + val f = new IllegalStateException("test") + intercept[IllegalStateException] { + val failed = Promise.failed[String](f).future zip Promise.successful("foo").future + Await.result(failed, timeout) + } mustBe (f) + + intercept[IllegalStateException] { + val failed = Promise.successful("foo").future zip Promise.failed[String](f).future + Await.result(failed, timeout) + } mustBe (f) + + intercept[IllegalStateException] { + val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future + Await.result(failed, timeout) + } mustBe (f) + + val successful = Promise.successful("foo").future zip Promise.successful("foo").future + Await.result(successful, timeout) mustBe (("foo", "foo")) + } + + "fold" in { + val timeout = 10000 millis + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + add + } + def futures = (0 to 9) map { + idx => async(idx, idx * 200) + } + def folded = Future.fold(futures)(0)(_ + _) + Await.result(folded, timeout) mustBe (45) + } + + "fold by composing" in { + val timeout = 10000 millis + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + add + } + def futures = (0 to 9) map { + idx => async(idx, idx * 200) + } + val folded = futures.foldLeft(Future(0)) { + case (fr, fa) => for (r <- fr; a <- fa) yield (r + a) + } + Await.result(folded, timeout) mustBe (45) + } + + "fold with an exception" in { + val timeout = 10000 millis + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") + add + } + def futures = (0 to 9) map { + idx => async(idx, idx * 100) + } + val folded = Future.fold(futures)(0)(_ + _) + intercept[IllegalArgumentException] { + Await.result(folded, timeout) + }.getMessage mustBe ("shouldFoldResultsWithException: expected") + } + + "fold mutable zeroes safely" in { + import scala.collection.mutable.ArrayBuffer + def test(testNumber: Int) { + val fs = (0 to 1000) map (i => Future(i)) + val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { + case (l, i) if i % 2 == 0 => l += i.asInstanceOf[AnyRef] + case (l, _) => l + } + val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum + + assert(result == 250500) + } + + (1 to 100) foreach test //Make sure it tries to provoke the problem + } + + "return zero value if folding empty list" in { + val zero = Future.fold(List[Future[Int]]())(0)(_ + _) + Await.result(zero, defaultTimeout) mustBe (0) + } + + "shouldReduceResults" in { + def async(idx: Int) = future { + Thread.sleep(idx * 200) + idx + } + val timeout = 10000 millis + val futures = (0 to 9) map { async } + val reduced = Future.reduce(futures)(_ + _) + Await.result(reduced, timeout) mustBe (45) + } + + "shouldReduceResultsWithException" in { + def async(add: Int, wait: Int) = future { + Thread.sleep(wait) + if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected") + else add + } + val timeout = 10000 millis + def futures = (1 to 10) map { + idx => async(idx, idx * 100) + } + val failed = Future.reduce(futures)(_ + _) + intercept[IllegalArgumentException] { + Await.result(failed, timeout) + }.getMessage mustBe ("shouldFoldResultsWithException: expected") + } + + "shouldReduceThrowNSEEOnEmptyInput" in { + intercept[java.util.NoSuchElementException] { + val emptyreduced = Future.reduce(List[Future[Int]]())(_ + _) + Await.result(emptyreduced, defaultTimeout) + } + } + + "shouldTraverseFutures" in { + object counter { + var count = -1 + def incAndGet() = counter.synchronized { + count += 2 + count + } + } + + val oddFutures = List.fill(100)(future { counter.incAndGet() }) + val traversed = Future.sequence(oddFutures) + Await.result(traversed, defaultTimeout).sum mustBe (10000) + + val list = (1 to 100).toList + val traversedList = Future.traverse(list)(x => Future(x * 2 - 1)) + Await.result(traversedList, defaultTimeout).sum mustBe (10000) + } + + /* need configurable execution contexts here + "shouldHandleThrowables" in { + class ThrowableTest(m: String) extends Throwable(m) + + val f1 = future[Any] { + throw new ThrowableTest("test") + } + + intercept[ThrowableTest] { + Await.result(f1, defaultTimeout) + } + + val latch = new TestLatch + val f2 = future { + Await.ready(latch, 5 seconds) + "success" + } + val f3 = f2 map { s => s.toUpperCase } + + f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") } + + latch.open() + + Await.result(f2, defaultTimeout) mustBe ("success") + + f2 foreach { _ => throw new ThrowableTest("current thread foreach") } + f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") } + + Await.result(f3, defaultTimeout) mustBe ("SUCCESS") + } + */ + } + +} + + + + + + + + + + diff --git a/test/files/jvm/future-spec/main.scala b/test/files/jvm/future-spec/main.scala new file mode 100644 index 0000000000..053dfbdff7 --- /dev/null +++ b/test/files/jvm/future-spec/main.scala @@ -0,0 +1,107 @@ + + + +import scala.collection._ +import scala.concurrent._ +import scala.concurrent.util.Duration +import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } + + +object Test { + + def main(args: Array[String]) { + FutureTests.check() + } + +} + + +trait Output { + val buffer = new StringBuilder + + def bufferPrintln(a: Any) = buffer.synchronized { + buffer.append(a.toString + "\n") + } +} + + +trait MinimalScalaTest extends Output { + + val throwables = mutable.ArrayBuffer[Throwable]() + + def check() { + if (throwables.nonEmpty) println(buffer.toString) + } + + implicit def stringops(s: String) = new { + + def should[U](snippets: =>U) = { + bufferPrintln(s + " should:") + snippets + } + + def in[U](snippet: =>U) = { + try { + bufferPrintln("- " + s) + snippet + bufferPrintln("[OK] Test passed.") + } catch { + case e => + bufferPrintln("[FAILED] " + e) + bufferPrintln(e.getStackTrace().mkString("\n")) + throwables += e + } + } + + } + + implicit def objectops(obj: Any) = new { + + def mustBe(other: Any) = assert(obj == other, obj + " is not " + other) + + } + + def intercept[T <: Throwable: Manifest](body: =>Any): T = { + try { + body + throw new Exception("Exception of type %s was not thrown".format(manifest[T])) + } catch { + case t: Throwable => + if (manifest[T].erasure != t.getClass) throw t + else t.asInstanceOf[T] + } + } + + def checkType[T: Manifest, S](in: Future[T], refmanifest: Manifest[S]): Boolean = manifest[T] == refmanifest +} + + +object TestLatch { + val DefaultTimeout = Duration(5, TimeUnit.SECONDS) + + def apply(count: Int = 1) = new TestLatch(count) +} + + +class TestLatch(count: Int = 1) extends Awaitable[Unit] { + private var latch = new CountDownLatch(count) + + def countDown() = latch.countDown() + def isOpen: Boolean = latch.getCount == 0 + def open() = while (!isOpen) countDown() + def reset() = latch = new CountDownLatch(count) + + @throws(classOf[TimeoutException]) + def ready(atMost: Duration)(implicit permit: CanAwait) = { + val opened = latch.await(atMost.toNanos, TimeUnit.NANOSECONDS) + if (!opened) throw new TimeoutException("Timeout of %s." format (atMost.toString)) + this + } + + @throws(classOf[Exception]) + def result(atMost: Duration)(implicit permit: CanAwait): Unit = { + ready(atMost) + } + +} + |