// Origin: root/test/files/jvm/future-spec/FutureTests.scala
// Blob:   efe9c59d7a87603bd62fd869ebe334638a54029a
                         

                                             
                         
                                          
                                       


 
                                            
 
                  
 
                                                                                     
                                        
                                                                                                        

                                              
 
                                
 
                            



                                                                                     
                                                                                                                             


               
 
                                                         
 
                            

                                       
 


                                        
 
                               
                       



                                            
 

                                                                              
 
                  
 
                                                         
 

                                                                                  
 
                                                         
 
                            


                                       
 
                        
                      

     
 


















                                                                                  




                                                                                  
 
                                        

                                                
     



                                                  
 
                                          

                                                     

                      
 




                                                
 

                               

                                                     
                         
 



                                                                             
 



                                                              

                                                               
       
 




                                          
 




                                          
 


                                                                                 
 



                                            
 


                                        
 


                                        
 


                                     
 


                                                      
 









                                                   
 











                                                                              
 


                                                    
 
                                        

                                                              


                                            
 

                                                            

                                                             
 
                                        

                                                             



                                              
 


                                                               
                              



                            
                                                









                                                        
 
                           
                                                  
                             
                                    
 
                                                                               
                                                                                        
     
 
               
                                                     

         
 


                                                           
                                                                

                                                          
 



                                               
                                                                          

                                     
 
                                        
                                                                          

                                     
 
                                        
                                                                          

                                     
 
                                                                            

                                                               
 

                                
                                               


                          
 
                                  
                                   
       
                                                 
                                               
 
                                    
                                   


                                                   
     
 

                                
                                               


                          
                                  
                                   





                                                             
 

                                 
                                               




                                                                                                    
                                   





                                                                      
 








                                                                              
 




                                                                         
 



                                                           
 
                              
                                    
                              


                                
 


                                                 
 


                                                     
     
 
                                           
                                               





                                                                                                    
                                   





                                                                      
 





                                                                    
 







                                                
 
                                                                              

                                                                
 


                                                                       
 


                                                                               
     
 

                                 
 
                      


                                     
                       


                                      
 


                                   
 
                  
 
                                                  
 
                       


                         
 


                                   
     
 

                                                
 
                       










                                                       
 
                                                     
 

                                   
 

                                                     
 

                                   
 






                                                       
 
                                                     
 
                                   
 

                                                     
 

                                  
 







                                                       
 

                                          
 
                                  
 
                                                     
 

                                         
 

                                                     
 

                                                               
 
                                                              
                                   





                                                        
 
                                
                                    










                                                                                    

                                            
                                                                                           
                                  

                                                                                
 















                                                                                                         
   
 
 
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.duration.Duration.Inf
import scala.collection._
import scala.runtime.NonLocalReturnControl
import scala.util.{Try,Success,Failure}



class FutureTests extends MinimalScalaTest {

  /* some utils */

  /**
   * Turns a canned request string into the future the specs expect for it.
   *
   *  - "Hello"   gives a future that is completed with "World" on `ec`
   *  - "Failure" gives an already-failed future carrying a RuntimeException
   *  - "NoReply" gives the future of a fresh promise that is never completed here
   *
   * Any other input is rejected with a MatchError.
   */
  def testAsync(s: String)(implicit ec: ExecutionContext): Future[String] =
    if (s == "Hello") Future("World")
    else if (s == "Failure") Future.failed(new RuntimeException("Expected exception; to test fault-tolerance"))
    else if (s == "NoReply") Promise[String]().future
    else throw new MatchError(s)

  // Upper bound the specs use when awaiting a future that is expected to complete.
  // Written with method-call syntax so it does not depend on the postfixOps language
  // feature, which this file never imports.
  val defaultTimeout: FiniteDuration = 5.seconds

  /* future specification */

  "A future with custom ExecutionContext" should {
    // Throwables escaping callbacks must be handed to the ExecutionContext's failure
    // reporter, while a throwable raised inside a Future body must fail that future.
    "shouldHandleThrowables" in {
      // Every throwable passed to the reporter is collected here.
      val ms = new mutable.HashSet[Throwable] with mutable.SynchronizedSet[Throwable]
      implicit val ec = scala.concurrent.ExecutionContext.fromExecutorService(new scala.concurrent.forkjoin.ForkJoinPool(), {
        t =>
        ms += t
      })

      // The pool is created for this test alone, so release it even when an assertion fails.
      try {
        class ThrowableTest(m: String) extends Throwable(m)

        // Thrown from the body: surfaces through the future, not through the reporter.
        val f1 = Future[Any] {
          throw new ThrowableTest("test")
        }

        intercept[ThrowableTest] {
          Await.result(f1, defaultTimeout)
        }

        val latch = new TestLatch
        val f2 = Future {
          Await.ready(latch, 5.seconds)
          "success"
        }
        val f3 = f2 map { s => s.toUpperCase }

        // Registered while f2 is still waiting on the latch.
        f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
        f2 onSuccess { case _ => throw new ThrowableTest("dispatcher receive") }

        latch.open()

        Await.result(f2, defaultTimeout) mustBe ("success")

        // Registered after f2 has already completed.
        f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
        f2 onSuccess { case _ => throw new ThrowableTest("current thread receive") }

        Await.result(f3, defaultTimeout) mustBe ("SUCCESS")

        // Leave the callbacks time to run before counting what was reported.
        val waiting = Future {
          Thread.sleep(1000)
        }
        Await.ready(waiting, 2000.millis)

        // One report for each of the four throwing callbacks registered above.
        ms.size mustBe (4)
      } finally {
        ec.shutdownNow()
      }
    }
  }

  "The Future companion object" should {
    // Future.apply is expected to call prepare() on the ExecutionContext it is handed.
    "call ExecutionContext.prepare on apply" in {
      val prepared = Promise[Boolean]()
      // Forwards everything to the global context and records that prepare() ran.
      val recordingEc = new ExecutionContext {
        val delegate = ExecutionContext.global
        override def execute(r: Runnable) = delegate.execute(r)
        override def reportFailure(t: Throwable): Unit = delegate.reportFailure(t)
        override def prepare(): ExecutionContext = {
          prepared.success(true)
          delegate.prepare()
        }
      }

      val greeting = Future("foo")(recordingEc)
      Await.result(greeting, defaultTimeout) mustBe ("foo")
      Await.result(prepared.future, defaultTimeout) mustBe (true)
    }
  }

  "The default ExecutionContext" should {
    // The reporter given to fromExecutor must be invoked with the InterruptedException
    // thrown from the Future body.
    "report uncaught exceptions" in {
      val reported = Promise[Throwable]()
      val reporter: Throwable => Unit = cause => reported.trySuccess(cause)
      // No executor is supplied (null); only the reporter is customised.
      val ec: ExecutionContext = ExecutionContext.fromExecutor(null, reporter)

      val interruption = new InterruptedException()
      Future(throw interruption)(ec)
      Await.result(reported.future, 2.seconds) mustBe interruption
    }
  }

  "A future with global ExecutionContext" should {
    import ExecutionContext.Implicits._

    // Values flow from one generator to the next; a generator that fails (here through
    // a bad mapTo cast) fails the whole composed future.
    "compose with for-comprehensions" in {
      def doubled(n: Int) = Future { (n * 2).toString }
      val length = Future[Any] { "five!".length }

      val joined = for {
        len   <- length.mapTo[Int] // 5
        left  <- doubled(len)      // "10"
        right <- doubled(7)        // "14"
      } yield left + "-" + right

      val badCast = for {
        len   <- length.mapTo[Int]
        left  <- (Future { (len * 2).toString }).mapTo[Int] // a String cannot be cast to Int
        right <- Future { (7 * 2).toString }
      } yield left + "-" + right

      Await.result(joined, defaultTimeout) mustBe ("10-14")
      assert(checkType(joined, manifest[String]))
      intercept[ClassCastException] { Await.result(badCast, defaultTimeout) }
    }

    // Generators in a for-comprehension over futures may be patterns: a matching
    // pattern binds its value, a non-matching one is expected to fail the future
    // with NoSuchElementException.
    "support pattern matching within a for-comprehension" in {
      case class Req[T](req: T)
      case class Res[T](res: T)
      // String requests answer with their length, Int requests with the doubled
      // value rendered as a String.
      def async[T](req: Req[T]) = req match {
        case Req(s: String) => Future { Res(s.length) }
        case Req(i: Int)    => Future { Res((i * 2).toString) }
      }

      // Every pattern matches: "Hello" -> 5, 5 -> "10", 7 -> "14".
      val future1 = for {
        Res(a: Int) <- async(Req("Hello"))
        Res(b: String) <- async(Req(a))
        Res(c: String) <- async(Req(7))
      } yield b + "-" + c

      // async(Req(a)) answers with a String payload, so `Res(b: Int)` cannot match.
      val future2 = for {
        Res(a: Int) <- async(Req("Hello"))
        Res(b: Int) <- async(Req(a))
        Res(c: Int) <- async(Req(7))
      } yield b + "-" + c

      Await.result(future1, defaultTimeout) mustBe ("10-14")
      intercept[NoSuchElementException] { Await.result(future2, defaultTimeout) }
    }

    // recover: a matching handler replaces the failure, a non-matching handler leaves
    // it untouched, and a handler on a successful future is never consulted.
    "recover from exceptions" in {
      val five        = Future(5)
      val divided     = five map (_ / 0)
      val dividedText = divided map (_.toString)

      // handler present but nothing to recover from
      val untouched = five.recover { case _: ArithmeticException => 0 }.map(_.toString)
      // handler matches the division failure
      val zeroed = divided.recover { case _: ArithmeticException => 0 }.map(_.toString)
      // handler is for a different exception type
      val unmatched = divided.recover { case _: MatchError => 0 }.map(_.toString)
      // failure propagated through map can still be recovered downstream
      val explained = dividedText recover {
        case _: ArithmeticException => "You got ERROR"
      }

      val failing = testAsync("Failure")
      val failingRecovered = testAsync("Failure") recover {
        case _: RuntimeException => "FAIL!"
      }
      val helloRecovered = testAsync("Hello") recover {
        case _: RuntimeException => "FAIL!"
      }
      val catchAll = testAsync("Failure") recover {
        case _ => "Oops!"
      }

      Await.result(five, defaultTimeout) mustBe (5)
      intercept[ArithmeticException] { Await.result(divided, defaultTimeout) }
      intercept[ArithmeticException] { Await.result(dividedText, defaultTimeout) }
      Await.result(untouched, defaultTimeout) mustBe ("5")
      Await.result(zeroed, defaultTimeout) mustBe ("0")
      intercept[ArithmeticException] { Await.result(unmatched, defaultTimeout) }
      Await.result(explained, defaultTimeout) mustBe ("You got ERROR")
      intercept[RuntimeException] { Await.result(failing, defaultTimeout) }
      Await.result(failingRecovered, defaultTimeout) mustBe ("FAIL!")
      Await.result(helloRecovered, defaultTimeout) mustBe ("World")
      Await.result(catchAll, defaultTimeout) mustBe ("Oops!")
    }

    // recoverWith: an inapplicable handler keeps the original failure, an applicable
    // one substitutes the outcome (success or failure) of the future it returns.
    "recoverWith from exceptions" in {
      val original    = new IllegalStateException("original")
      val replacement = new IllegalStateException("recovered")

      // The guard never holds, so the handler does not apply.
      val kept = intercept[IllegalStateException] {
        val notRecovered = Future.failed[String](original) recoverWith {
          case _ if false == true => Future.successful("yay!")
        }
        Await.result(notRecovered, defaultTimeout)
      }
      kept mustBe (original)

      val recovered = Future.failed[String](original) recoverWith {
        case _ => Future.successful("yay!")
      }
      Await.result(recovered, defaultTimeout) mustBe ("yay!")

      val swapped = intercept[IllegalStateException] {
        val refailed = Future.failed[String](original) recoverWith {
          case _ => Future.failed[String](replacement)
        }
        Await.result(refailed, defaultTimeout)
      }
      swapped mustBe (replacement)
    }

    // andThen callbacks are expected to run in registration order, a callback whose
    // pattern does not match is skipped, and the original result is passed through.
    "andThen like a boss" in {
      val events = new java.util.concurrent.LinkedBlockingQueue[Int]
      (1 to 1000) foreach { _ =>
        val chained = Future {
          events.add(1)
          3
        } andThen {
          case _ => events.add(2)
        } andThen {
          // the future yields 3, so this case never applies
          case Success(0) => events.add(Int.MaxValue)
        } andThen {
          case _ => events.add(3)
        }
        Await.result(chained, defaultTimeout) mustBe (3)
        events.poll() mustBe (1)
        events.poll() mustBe (2)
        events.poll() mustBe (3)
        events.clear()
      }
    }

    // Among ten futures that are never completed and one already-successful future,
    // firstCompletedOf must yield the successful value, for a Vector and for an Iterator.
    "firstCompletedOf" in {
      def candidates = {
        val pending = Vector.fill[Future[Int]](10)(Promise[Int]().future)
        pending :+ Future.successful[Int](5)
      }

      val fromVector = Future.firstCompletedOf(candidates)
      Await.result(fromVector, defaultTimeout) mustBe (5)

      val fromIterator = Future.firstCompletedOf(candidates.iterator)
      Await.result(fromIterator, defaultTimeout) mustBe (5)
    }

    // Future.find yields Some(first matching result) or None when nothing matches.
    "find" in {
      val futures = (1 to 10) map { i => Future(i) }

      val found = Future.find[Int](futures)(_ == 3)
      Await.result(found, defaultTimeout) mustBe (Some(3))

      // same search through an iterator, with a value none of the futures produce
      val missing = Future.find[Int](futures.iterator)(_ == 11)
      Await.result(missing, defaultTimeout) mustBe (None)
    }

    // zip fails with the given exception when either side (or both) failed and pairs
    // the two values when both succeeded.
    "zip" in {
      val timeout = 10000.millis
      val cause = new IllegalStateException("test")

      // Builds the zipped future inside intercept and checks the exact exception instance.
      def mustFailWithCause(zipped: => Future[(String, String)]) =
        intercept[IllegalStateException] {
          val failed = zipped
          Await.result(failed, timeout)
        } mustBe (cause)

      mustFailWithCause(Future.failed[String](cause) zip Future.successful("foo"))
      mustFailWithCause(Future.successful("foo") zip Future.failed[String](cause))
      mustFailWithCause(Future.failed[String](cause) zip Future.failed[String](cause))

      val both = Future.successful("foo") zip Future.successful("foo")
      Await.result(both, timeout) mustBe (("foo", "foo"))
    }

    // Future.fold sums the results of futures that complete after staggered delays.
    "fold" in {
      val timeout = 10000.millis
      // Completes with `add` after sleeping `wait` milliseconds.
      def async(add: Int, wait: Int) = Future {
        Thread.sleep(wait)
        add
      }

      val futures = (0 to 9) map {
        idx => async(idx, idx * 20)
      }
      val folded = Future.fold(futures)(0)(_ + _)
      Await.result(folded, timeout) mustBe (45)

      // Second, freshly created batch. The fold below used to be applied to `futures`
      // again, which left this batch unused and merely repeated the first check.
      val futuresit = (0 to 9) map {
        idx => async(idx, idx * 20)
      }
      val foldedit = Future.fold(futuresit)(0)(_ + _)
      Await.result(foldedit, timeout) mustBe (45)
    }

    // The same sum as Future.fold, built by chaining the futures with flatMap/map.
    "fold by composing" in {
      val timeout = 10000.millis
      def async(add: Int, wait: Int) = Future {
        Thread.sleep(wait)
        add
      }
      def futures = (0 to 9) map { idx => async(idx, idx * 20) }

      val folded = futures.foldLeft(Future(0)) { (running, next) =>
        running.flatMap(sum => next.map(value => sum + value))
      }
      Await.result(folded, timeout) mustBe (45)
    }

    // One failing future among the inputs makes the folded future fail with its exception.
    "fold with an exception" in {
      val timeout = 10000.millis
      // Fails for add == 6, otherwise completes with `add` after `wait` milliseconds.
      def async(add: Int, wait: Int) = Future {
        Thread.sleep(wait)
        if (add != 6) add
        else throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
      }
      def futures = (0 to 9) map { idx => async(idx, idx * 10) }

      val folded = Future.fold(futures)(0)(_ + _)
      val thrown = intercept[IllegalArgumentException] {
        Await.result(folded, timeout)
      }
      thrown.getMessage mustBe ("shouldFoldResultsWithException: expected")
    }

    // Folds into a mutable ArrayBuffer used as the zero value and checks that every
    // even number 0..1000 was added exactly once (their sum is 250500).
    "fold mutable zeroes safely" in {
      import scala.collection.mutable.ArrayBuffer
      def test(testNumber: Int): Unit = {
        val fs = (0 to 1000) map (i => Future(i))
        val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) { (evens, i) =>
          if (i % 2 == 0) evens += i.asInstanceOf[AnyRef]
          else evens
        }
        val collected = Await.result(f.mapTo[ArrayBuffer[Int]], 10000.millis)
        val result = collected.sum

        assert(result == 250500)
      }

      // Repeated so that a problem has a real chance of showing up
      (1 to 100) foreach test
    }

    // With no futures to fold, the result is the supplied zero value.
    "return zero value if folding empty list" in {
      val nothingToFold = List.empty[Future[Int]]
      val zero = Future.fold(nothingToFold)(0)(_ + _)
      Await.result(zero, defaultTimeout) mustBe (0)
    }

    // Future.reduce sums the results of futures completing after staggered delays;
    // checked on two independently created batches.
    "shouldReduceResults" in {
      val timeout = 10000.millis
      // Completes with `idx` after sleeping idx * 20 milliseconds.
      def async(idx: Int) = Future {
        Thread.sleep(idx * 20)
        idx
      }

      val futures = (0 to 9).map(idx => async(idx))
      val reduced = Future.reduce(futures)(_ + _)
      Await.result(reduced, timeout) mustBe (45)

      val futuresit = (0 to 9).map(idx => async(idx))
      val reducedit = Future.reduce(futuresit)(_ + _)
      Await.result(reducedit, timeout) mustBe (45)
    }

    // One failing future among the inputs makes the reduced future fail with its exception.
    "shouldReduceResultsWithException" in {
      val timeout = 10000.millis
      // Fails for add == 6, otherwise completes with `add` after `wait` milliseconds.
      def async(add: Int, wait: Int) = Future {
        Thread.sleep(wait)
        if (add != 6) add
        else throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
      }
      def futures = (1 to 10) map { idx => async(idx, idx * 10) }

      val failed = Future.reduce(futures)(_ + _)
      val thrown = intercept[IllegalArgumentException] {
        Await.result(failed, timeout)
      }
      thrown.getMessage mustBe ("shouldFoldResultsWithException: expected")
    }

    // Reducing an empty collection of futures is expected to end in NoSuchElementException.
    "shouldReduceThrowNSEEOnEmptyInput" in {
      intercept[java.util.NoSuchElementException] {
        val nothingToReduce = List.empty[Future[Int]]
        val emptyreduced = Future.reduce(nothingToReduce)(_ + _)
        Await.result(emptyreduced, defaultTimeout)
      }
    }

    // sequence and traverse over the first 100 odd numbers (whose sum is 10000),
    // fed from an Iterator of futures, a List and an Iterator of plain values.
    "shouldTraverseFutures" in {
      // Hands out 1, 3, 5, ... under the object's own lock.
      object counter {
        var count = -1
        def incAndGet() = synchronized {
          count += 2
          count
        }
      }

      val oddFutures = List.fill(100)(Future { counter.incAndGet() }).iterator
      val sequenced = Future.sequence(oddFutures)
      Await.result(sequenced, defaultTimeout).sum mustBe (10000)

      val numbers = (1 to 100).toList
      val fromList = Future.traverse(numbers)(n => Future(n * 2 - 1))
      Await.result(fromList, defaultTimeout).sum mustBe (10000)

      val numbersIt = (1 to 100).toList.iterator
      val fromIterator = Future.traverse(numbersIt)(n => Future(n * 2 - 1))
      Await.result(fromIterator, defaultTimeout).sum mustBe (10000)
    }

    // Await must time out while the awaited future is still pending and must deliver
    // the result once it has been produced.
    "shouldBlockUntilResult" in {
      val gate = new TestLatch

      val gated = Future {
        Await.ready(gate, 5.seconds)
        5
      }
      val dependent = Future {
        Await.result(gated, Inf) + 9
      }

      // gated is still waiting on the latch, so dependent cannot be ready yet
      intercept[TimeoutException] {
        Await.ready(dependent, 100.millis)
      }

      gate.open()
      Await.result(dependent, defaultTimeout) mustBe (14)

      val slow = Future {
        Thread.sleep(100)
        5
      }
      // a zero timeout is expected to give up on the sleeping future
      intercept[TimeoutException] {
        Await.ready(slow, 0.millis)
      }
    }

    // Uses ten latches to step through chained futures and checks, at each step,
    // which futures are completed and which are still held back by a latch.
    "run callbacks async" in {
      val latch = Vector.fill(10)(new TestLatch)

      // f1 signals that it started (latch 0) and then waits for latch 1.
      val f1 = Future {
        latch(0).open()
        Await.ready(latch(1), TestLatch.DefaultTimeout)
        "Hello"
      }
      // f2's function signals latch 2 and then waits for latch 3.
      val f2 = f1 map {
        s =>
        latch(2).open()
        Await.ready(latch(3), TestLatch.DefaultTimeout)
        s.length
      }
      // latch 4 opens from a callback on f2.
      for (_ <- f2) latch(4).open()

      Await.ready(latch(0), TestLatch.DefaultTimeout)

      // f1 has started but is held on latch 1.
      f1.isCompleted mustBe (false)
      f2.isCompleted mustBe (false)

      // Release f1 and wait until f2's function has started.
      latch(1).open()
      Await.ready(latch(2), TestLatch.DefaultTimeout)

      f1.isCompleted mustBe (true)
      f2.isCompleted mustBe (false)

      // f3 is mapped from the already completed f1; its function signals latch 5
      // and then waits for latch 6.
      val f3 = f1 map {
        s =>
        latch(5).open()
        Await.ready(latch(6), TestLatch.DefaultTimeout)
        s.length * 2
      }
      // latch 3, which f2's function waits on, opens from a callback on f3.
      for (_ <- f3) latch(3).open()

      Await.ready(latch(5), TestLatch.DefaultTimeout)

      f3.isCompleted mustBe (false)

      // Releasing f3 opens latch 3 via its callback, which lets f2 finish and
      // open latch 4 via its own callback.
      latch(6).open()
      Await.ready(latch(4), TestLatch.DefaultTimeout)

      f2.isCompleted mustBe (true)
      f3.isCompleted mustBe (true)

      // Same pattern on a promise-backed future: f4's function signals latch 7 and
      // waits for latch 8; latch 9 opens from a callback on f4.
      val p1 = Promise[String]()
      val f4 = p1.future map {
        s =>
        latch(7).open()
        Await.ready(latch(8), TestLatch.DefaultTimeout)
        s.length
      }
      for (_ <- f4) latch(9).open()

      p1.future.isCompleted mustBe (false)
      f4.isCompleted mustBe (false)

      p1 complete Success("Hello")

      Await.ready(latch(7), TestLatch.DefaultTimeout)

      // The promise is completed while f4's function is still held on latch 8.
      p1.future.isCompleted mustBe (true)
      f4.isCompleted mustBe (false)

      latch(8).open()
      Await.ready(latch(9), TestLatch.DefaultTimeout)

      Await.ready(f4, defaultTimeout).isCompleted mustBe (true)
    }

    // Awaiting futures from inside a running future's callback must still complete.
    "should not deadlock with nested await (ticket 1313)" in {
      // Simple case: the map function creates another future, maps it and awaits
      // the mapped result without a time limit.
      val simple = Future(()) map {
        _ =>
        val unit = Future(())
        val umap = unit map { _ => () }
        Await.result(umap, Inf)
      }
      Await.ready(simple, Inf).isCompleted mustBe (true)

      // Complex case: inside `blocking`, a callback registered on the nested future
      // opens l1; once l1 is open a second callback is registered on the same
      // future and has to open l2.
      val l1, l2 = new TestLatch
      val complex = Future(()) map {
        _ =>
        blocking {
          val nested = Future(())
          for (_ <- nested) l1.open()
          Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
          for (_ <- nested) l2.open()
          Await.ready(l2, TestLatch.DefaultTimeout)
        }
      }
      Await.ready(complex, defaultTimeout).isCompleted mustBe (true)
    }

    // Await.ready hands back the failed future instead of throwing; its value must
    // render like the failure obtained from running the same division directly.
    "should not throw when Await.ready" in {
      val expected = Try(5 / 0)
      val divided = Future(5).map(_ / 0)
      val outcome = Await.ready(divided, defaultTimeout).value.get
      outcome.toString mustBe expected.toString
    }

    // Futures and promises must render as Future(Success(..)), Future(Failure(..))
    // or Future(<not completed>) depending on their state.
    "should have a decent toString representation" in {
      val number = scala.concurrent.forkjoin.ThreadLocalRandom.current.nextInt()
      val failure = new Exception(number.toString)
      val expectedSuccess = "Future(Success(" + number + "))"
      val expectedFailure = "Future(Failure(" + failure + "))"
      val expectedPending = "Future(<not completed>)"

      // futures built directly in a completed state
      Future.successful(number).toString mustBe expectedSuccess
      Future.failed[Int](failure).toString mustBe expectedFailure

      // promises: pending, then completed either way
      Promise[Int]().toString mustBe expectedPending
      Promise[Int]().success(number).toString mustBe expectedSuccess
      Promise[Int]().failure(failure).toString mustBe expectedFailure

      // futures that ran on the global context and were awaited
      val ranOk = Await.ready(Future(number)(ExecutionContext.global), defaultTimeout)
      ranOk.toString mustBe expectedSuccess
      val ranFailed = Await.ready(Future(throw failure)(ExecutionContext.global), defaultTimeout)
      ranFailed.toString mustBe expectedFailure
    }

  }

}