summaryrefslogblamecommitdiff
path: root/test/files/jvm/future-spec/FutureTests.scala
blob: aa308945a15afe79b9808689e2da64697e699474 (plain) (tree)











































































































































































































































































































































































                                                                                                                
import scala.concurrent._
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration.Inf



object FutureTests extends MinimalScalaTest {
  
  /* some utils */
  
  def testAsync(s: String): Future[String] = s match {
    case "Hello"   => future { "World" }
    case "Failure" => Promise.failed(new RuntimeException("Expected exception; to test fault-tolerance")).future
    case "NoReply" => Promise[String]().future
  }
  
  val defaultTimeout = Inf
  
  /* future specification */
  
  "A future" should {
    
    "compose with for-comprehensions" in {
      def async(x: Int) = future { (x * 2).toString }
      val future0 = future[Any] {
        "five!".length
      }
      
      val future1 = for {
        a <- future0.mapTo[Int]  // returns 5
        b <- async(a)            // returns "10"
        c <- async(7)            // returns "14"
      } yield b + "-" + c
      
      val future2 = for {
        a <- future0.mapTo[Int]
        b <- (future { (a * 2).toString }).mapTo[Int]
        c <- future { (7 * 2).toString } 
      } yield b + "-" + c
      
      Await.result(future1, defaultTimeout) mustBe ("10-14")
      assert(checkType(future1, manifest[String]))
      intercept[ClassCastException] { Await.result(future2, defaultTimeout) }
    }
    
    "support pattern matching within a for-comprehension" in {
      case class Req[T](req: T)
      case class Res[T](res: T)
      def async[T](req: Req[T]) = req match {
        case Req(s: String) => future { Res(s.length) }
        case Req(i: Int)    => future { Res((i * 2).toString) }
      }
      
      val future1 = for {
        Res(a: Int) <- async(Req("Hello"))
        Res(b: String) <- async(Req(a))
        Res(c: String) <- async(Req(7))
      } yield b + "-" + c
      
      val future2 = for {
        Res(a: Int) <- async(Req("Hello"))
        Res(b: Int) <- async(Req(a))
        Res(c: Int) <- async(Req(7))
      } yield b + "-" + c
      
      Await.result(future1, defaultTimeout) mustBe ("10-14")
      intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) }
    }
    
    "recover from exceptions" in {
      val future1 = Future(5)
      val future2 = future1 map (_ / 0)
      val future3 = future2 map (_.toString)
      
      val future4 = future1 recover {
        case e: ArithmeticException => 0
      } map (_.toString)
      
      val future5 = future2 recover {
        case e: ArithmeticException => 0
      } map (_.toString)
      
      val future6 = future2 recover {
        case e: MatchError => 0
      } map (_.toString)
      
      val future7 = future3 recover {
        case e: ArithmeticException => "You got ERROR"
      }
      
      val future8 = testAsync("Failure")
      val future9 = testAsync("Failure") recover {
        case e: RuntimeException => "FAIL!"
      }
      val future10 = testAsync("Hello") recover {
        case e: RuntimeException => "FAIL!"
      }
      val future11 = testAsync("Failure") recover {
        case _ => "Oops!"
      }
      
      Await.result(future1, defaultTimeout) mustBe (5)
      intercept[ArithmeticException] { Await.result(future2, defaultTimeout) }
      intercept[ArithmeticException] { Await.result(future3, defaultTimeout) }
      Await.result(future4, defaultTimeout) mustBe ("5")
      Await.result(future5, defaultTimeout) mustBe ("0")
      intercept[ArithmeticException] { Await.result(future6, defaultTimeout) }
      Await.result(future7, defaultTimeout) mustBe ("You got ERROR")
      intercept[RuntimeException] { Await.result(future8, defaultTimeout) }
      Await.result(future9, defaultTimeout) mustBe ("FAIL!")
      Await.result(future10, defaultTimeout) mustBe ("World")
      Await.result(future11, defaultTimeout) mustBe ("Oops!")
    }
    
    "recoverWith from exceptions" in {
      val o = new IllegalStateException("original")
      val r = new IllegalStateException("recovered")
      
      intercept[IllegalStateException] {
        val failed = Promise.failed[String](o).future recoverWith {
          case _ if false == true => Promise.successful("yay!").future
        }
        Await.result(failed, defaultTimeout)
      } mustBe (o)
      
      val recovered = Promise.failed[String](o).future recoverWith {
        case _ => Promise.successful("yay!").future
      }
      Await.result(recovered, defaultTimeout) mustBe ("yay!")
      
      intercept[IllegalStateException] {
        val refailed = Promise.failed[String](o).future recoverWith {
          case _ => Promise.failed[String](r).future
        }
        Await.result(refailed, defaultTimeout)
      } mustBe (r)
    }
    
    "andThen like a boss" in {
      val q = new java.util.concurrent.LinkedBlockingQueue[Int]
      for (i <- 1 to 1000) {
        val chained = future {
          q.add(1); 3
        } andThen {
          case _ => q.add(2)
        } andThen {
          case Right(0) => q.add(Int.MaxValue)
        } andThen {
          case _ => q.add(3);
        }
        Await.result(chained, defaultTimeout) mustBe (3)
        q.poll() mustBe (1)
        q.poll() mustBe (2)
        q.poll() mustBe (3)
        q.clear()
      }
    }
    
    "firstCompletedOf" in {
      val futures = Vector.fill[Future[Int]](10) {
        Promise[Int]().future
      } :+ Promise.successful[Int](5).future
      Await.result(Future.firstCompletedOf(futures), defaultTimeout) mustBe (5)
    }
    
    "find" in {
      val futures = for (i <- 1 to 10) yield future {
        i
      }
      
      val result = Future.find[Int](futures)(_ == 3)
      Await.result(result, defaultTimeout) mustBe (Some(3))

      val notFound = Future.find[Int](futures)(_ == 11)
      Await.result(notFound, defaultTimeout) mustBe (None)
    }
    
    "zip" in {
      val timeout = 10000 millis
      val f = new IllegalStateException("test")
      intercept[IllegalStateException] {
        val failed = Promise.failed[String](f).future zip Promise.successful("foo").future
        Await.result(failed, timeout)
      } mustBe (f)
      
      intercept[IllegalStateException] {
        val failed = Promise.successful("foo").future zip Promise.failed[String](f).future
        Await.result(failed, timeout)
      } mustBe (f)
      
      intercept[IllegalStateException] {
        val failed = Promise.failed[String](f).future zip Promise.failed[String](f).future
        Await.result(failed, timeout)
      } mustBe (f)
      
      val successful = Promise.successful("foo").future zip Promise.successful("foo").future
      Await.result(successful, timeout) mustBe (("foo", "foo"))
    }
    
    "fold" in {
      val timeout = 10000 millis
      def async(add: Int, wait: Int) = future {
        Thread.sleep(wait)
        add
      }
      def futures = (0 to 9) map {
        idx => async(idx, idx * 200)
      }
      def folded = Future.fold(futures)(0)(_ + _)
      Await.result(folded, timeout) mustBe (45)
    }
    
    "fold by composing" in {
      val timeout = 10000 millis
      def async(add: Int, wait: Int) = future {
        Thread.sleep(wait)
        add
      }
      def futures = (0 to 9) map { 
        idx => async(idx, idx * 200)
      }
      val folded = futures.foldLeft(Future(0)) {
        case (fr, fa) => for (r <- fr; a <- fa) yield (r + a)
      }
      Await.result(folded, timeout) mustBe (45)
    }
    
    "fold with an exception" in {
      val timeout = 10000 millis
      def async(add: Int, wait: Int) = future {
        Thread.sleep(wait)
        if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
        add
      }
      def futures = (0 to 9) map {
        idx => async(idx, idx * 100)
      }
      val folded = Future.fold(futures)(0)(_ + _)
      intercept[IllegalArgumentException] {
        Await.result(folded, timeout)
      }.getMessage mustBe ("shouldFoldResultsWithException: expected")
    }
    
    "fold mutable zeroes safely" in {
      import scala.collection.mutable.ArrayBuffer
      def test(testNumber: Int) {
        val fs = (0 to 1000) map (i => Future(i))
        val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) {
          case (l, i) if i % 2 == 0 => l += i.asInstanceOf[AnyRef]
          case (l, _)               => l
        }
        val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum
        
        assert(result == 250500)
      }

      (1 to 100) foreach test //Make sure it tries to provoke the problem
    }
    
    "return zero value if folding empty list" in {
      val zero = Future.fold(List[Future[Int]]())(0)(_ + _)
      Await.result(zero, defaultTimeout) mustBe (0)
    }
    
    "shouldReduceResults" in {
      def async(idx: Int) = future {
        Thread.sleep(idx * 200)
        idx
      }
      val timeout = 10000 millis
      val futures = (0 to 9) map { async }
      val reduced = Future.reduce(futures)(_ + _)
      Await.result(reduced, timeout) mustBe (45)
    }
    
    "shouldReduceResultsWithException" in {
      def async(add: Int, wait: Int) = future {
        Thread.sleep(wait)
        if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
        else add
      }
      val timeout = 10000 millis
      def futures = (1 to 10) map {
        idx => async(idx, idx * 100)
      }
      val failed = Future.reduce(futures)(_ + _)
      intercept[IllegalArgumentException] {
        Await.result(failed, timeout)
      }.getMessage mustBe ("shouldFoldResultsWithException: expected")
    }
    
    "shouldReduceThrowNSEEOnEmptyInput" in {
      intercept[java.util.NoSuchElementException] {
        val emptyreduced = Future.reduce(List[Future[Int]]())(_ + _)
        Await.result(emptyreduced, defaultTimeout)
      }
    }
    
    "shouldTraverseFutures" in {
      object counter {
        var count = -1
        def incAndGet() = counter.synchronized {
          count += 2
          count
        }
      }
      
      val oddFutures = List.fill(100)(future { counter.incAndGet() })
      val traversed = Future.sequence(oddFutures)
      Await.result(traversed, defaultTimeout).sum mustBe (10000)
      
      val list = (1 to 100).toList
      val traversedList = Future.traverse(list)(x => Future(x * 2 - 1))
      Await.result(traversedList, defaultTimeout).sum mustBe (10000)
    }
    
    /* need configurable execution contexts here
    "shouldHandleThrowables" in {
      class ThrowableTest(m: String) extends Throwable(m)
      
      val f1 = future[Any] {
        throw new ThrowableTest("test")
      }
      
      intercept[ThrowableTest] {
        Await.result(f1, defaultTimeout)
      }
      
      val latch = new TestLatch
      val f2 = future {
        Await.ready(latch, 5 seconds)
        "success"
      }
      val f3 = f2 map { s => s.toUpperCase }
      
      f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
      f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }
      
      latch.open()
      
      Await.result(f2, defaultTimeout) mustBe ("success")
      
      f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
      f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }
      
      Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
    }
    */
  }
  
}