From 5379eba504efc84adf80c25c8c75708076c6c505 Mon Sep 17 00:00:00 2001 From: Heather Miller Date: Sun, 17 Feb 2013 23:21:54 +0100 Subject: Removing disabled, unneeded futures tests --- test/disabled/jvm/scala-concurrent-tck-akka.scala | 391 ---------- .../akka/src/akka/dispatch/Future.scala | 832 --------------------- test/disabled/presentation/akka/src/pi.scala | 108 --- 3 files changed, 1331 deletions(-) delete mode 100644 test/disabled/jvm/scala-concurrent-tck-akka.scala delete mode 100644 test/disabled/presentation/akka/src/akka/dispatch/Future.scala delete mode 100644 test/disabled/presentation/akka/src/pi.scala diff --git a/test/disabled/jvm/scala-concurrent-tck-akka.scala b/test/disabled/jvm/scala-concurrent-tck-akka.scala deleted file mode 100644 index dfd906e59e..0000000000 --- a/test/disabled/jvm/scala-concurrent-tck-akka.scala +++ /dev/null @@ -1,391 +0,0 @@ - - -import akka.dispatch.{ - Future => future, - Promise => promise -} -import akka.dispatch.Await.{result => await} - -// Duration required for await -import akka.util.Duration -import java.util.concurrent.TimeUnit -import TimeUnit._ - -import scala.concurrent.{ - TimeoutException, - SyncVar, - ExecutionException -} -//import scala.concurrent.future -//import scala.concurrent.promise -//import scala.concurrent.await - - - -trait TestBase { - - implicit val disp = akka.actor.ActorSystem().dispatcher - - def once(body: (() => Unit) => Unit) { - val sv = new SyncVar[Boolean] - body(() => sv put true) - sv.take() - } - -} - - -trait FutureCallbacks extends TestBase { - - def testOnSuccess(): Unit = once { - done => - var x = 0 - val f = future { - x = 1 - } - f onSuccess { case any => - done() - assert(x == 1) - } - } - - def testOnSuccessWhenCompleted(): Unit = once { - done => - var x = 0 - val f = future { - x = 1 - } - f onSuccess { case any => - assert(x == 1) - x = 2 - f onSuccess { case any => - assert(x == 2) - done() - } - } - } - - def testOnSuccessWhenFailed(): Unit = once { - done => - val f = future[Unit] { - done() - throw new Exception - } - f onSuccess { case any => - assert(false) - } - } - - def testOnFailure(): Unit = once { - done => - var x = 0 - val f = future[Unit] { - x = 1 - throw new Exception - } - f onSuccess { case any => - done() - assert(false) - } - f onFailure { - case _ => - done() - assert(x == 1) - } - } - - def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { - done => - val f = future[Unit] { - throw cause - } - f onSuccess { case any => - done() - assert(false) - } - f onFailure { - case e: ExecutionException if (e.getCause == cause) => - done() - case _ => - done() - assert(false) - } - } - - def testOnFailureWhenTimeoutException(): Unit = once { - done => - val f = future[Unit] { - throw new TimeoutException() - } - f onSuccess { case any => - done() - assert(false) - } - f onFailure { - case e: TimeoutException => - done() - case other => - done() - assert(false) - } - } - - testOnSuccess() - testOnSuccessWhenCompleted() - testOnSuccessWhenFailed() - testOnFailure() -// testOnFailureWhenSpecialThrowable(5, new Error) -// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) -// testOnFailureWhenSpecialThrowable(7, new InterruptedException) -// testOnFailureWhenTimeoutException() - -} - - -trait FutureCombinators extends TestBase { - - // map: stub - def testMapSuccess(): Unit = once { - done => - done() - } - - def testMapFailure(): Unit = once { - done => - done() - } - - // flatMap: stub - def testFlatMapSuccess(): Unit = once { - done => - done() - } - - def testFlatMapFailure(): Unit = once { - done => - done() - } - - // filter: stub - def testFilterSuccess(): Unit = once { - done => - done() - } - - def testFilterFailure(): Unit = once { - done => - done() - } - - // foreach: stub - def testForeachSuccess(): Unit = once { - done => - done() - } - - def testForeachFailure(): Unit = once { - done => - done() - } - - def testRecoverSuccess(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } recover { - case re: RuntimeException => - "recovered" - } onSuccess { case x => - done() - assert(x == "recovered") - } onFailure { case any => - done() - assert(false) - } - } - - def testRecoverFailure(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } recover { - case te: TimeoutException => "timeout" - } onSuccess { case x => - done() - assert(false) - } onFailure { case any => - done() - assert(any == cause) - } - } - - testMapSuccess() - testMapFailure() - testFlatMapSuccess() - testFlatMapFailure() - testFilterSuccess() - testFilterFailure() - testForeachSuccess() - testForeachFailure() - testRecoverSuccess() - testRecoverFailure() - -} - -/* -trait FutureProjections extends TestBase { - - def testFailedFailureOnComplete(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - f.failed onComplete { - case Right(t) => - assert(t == cause) - done() - case Left(t) => - assert(false) - } - } - - def testFailedFailureOnSuccess(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - f.failed onSuccess { - t => - assert(t == cause) - done() - } - } - - def testFailedSuccessOnComplete(): Unit = once { - done => - val f = future { 0 } - f.failed onComplete { - case Right(t) => - assert(false) - case Left(t) => - assert(t.isInstanceOf[NoSuchElementException]) - done() - } - } - - def testFailedSuccessOnFailure(): Unit = once { - done => - val f = future { 0 } - f.failed onFailure { - case nsee: NoSuchElementException => - done() - } - } - - def testFailedFailureAwait(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - assert(await(0, f.failed) == cause) - done() - } - - def testFailedSuccessAwait(): Unit = once { - done => - val f = future { 0 } - try { - println(await(0, f.failed)) - assert(false) - } catch { - case nsee: NoSuchElementException => done() - } - } - - testFailedFailureOnComplete() - testFailedFailureOnSuccess() - testFailedSuccessOnComplete() - testFailedSuccessOnFailure() - testFailedFailureAwait() - //testFailedSuccessAwait() - -} -*/ - -trait Blocking extends TestBase { - - def testAwaitSuccess(): Unit = once { - done => - val f = future { 0 } - await(f, Duration(500, "ms")) - done() - } - - def testAwaitFailure(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - try { - await(f, Duration(500, "ms")) - assert(false) - } catch { - case t => - assert(t == cause) - done() - } - } - - testAwaitSuccess() - testAwaitFailure() - -} - -/* -trait Promises extends TestBase { - - def testSuccess(): Unit = once { - done => - val p = promise[Int]() - val f = p.future - - f.onSuccess { x => - done() - assert(x == 5) - } onFailure { case any => - done() - assert(false) - } - - p.success(5) - } - - testSuccess() - -} -*/ - -trait Exceptions extends TestBase { - -} - - -object Test -extends App -with FutureCallbacks -with FutureCombinators -/*with FutureProjections*/ -/*with Promises*/ -with Blocking -with Exceptions -{ - System.exit(0) -} - - diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Future.scala b/test/disabled/presentation/akka/src/akka/dispatch/Future.scala deleted file mode 100644 index 1ad304d726..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/Future.scala +++ /dev/null @@ -1,832 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dispatch - -import akka.AkkaException -import akka.event.EventHandler -import akka.actor.{ Actor, Channel } -import akka.util.Duration -import akka.japi.{ Procedure, Function => JFunc } - -import scala.util.continuations._ - -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS => MILLIS } -import java.util.concurrent.atomic.{ AtomicBoolean } -import java.lang.{ Iterable => JIterable } -import java.util.{ LinkedList => JLinkedList } -import scala.collection.mutable.Stack -import annotation.tailrec - -class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) - -object Futures { - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T]): Future[T] = - Future(body.call) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], timeout: Long): Future[T] = - Future(body.call, timeout) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = - Future(body.call)(dispatcher) - - /** - * Java API, equivalent to Future.apply - */ - def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = - Future(body.call, timeout)(dispatcher) - - /** - * Returns a Future to the result of the first future in the list that is completed - */ - def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { - val futureResult = new DefaultCompletableFuture[T](timeout) - - val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _) - for (f ← futures) f onComplete completeFirst - - futureResult - } - - /** - * Java API. - * Returns a Future to the result of the first future in the list that is completed - */ - def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) - - /** - * A non-blocking fold over the specified futures. - * The fold is performed on the thread where the last future is completed, - * the result will be the first failure of any of the futures, or any failure in the actual fold, - * or the result of the fold. - * Example: - *
-   *   val result = Futures.fold(0)(futures)(_ + _).await.result
-   * 
- */ - def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { - if (futures.isEmpty) { - new AlreadyCompletedFuture[R](Right(zero)) - } else { - val result = new DefaultCompletableFuture[R](timeout) - val results = new ConcurrentLinkedQueue[T]() - val allDone = futures.size - - val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature? - f.value.get match { - case r: Right[Throwable, T] => - results add r.b - if (results.size == allDone) { //Only one thread can get here - try { - result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun) - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - result completeWithException e - } - finally { - results.clear - } - } - case l: Left[Throwable, T] => - result completeWithException l.a - results.clear - } - } - - futures foreach { _ onComplete aggregate } - result - } - } - - /** - * Java API - * A non-blocking fold over the specified futures. - * The fold is performed on the thread where the last future is completed, - * the result will be the first failure of any of the futures, or any failure in the actual fold, - * or the result of the fold. - */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) - - /** - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first - * Example: - *
-   *   val result = Futures.reduce(futures)(_ + _).await.result
-   * 
- */ - def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) => T): Future[R] = { - if (futures.isEmpty) - new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) - else { - val result = new DefaultCompletableFuture[R](timeout) - val seedFound = new AtomicBoolean(false) - val seedFold: Future[T] => Unit = f => { - if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold - f.value.get match { - case r: Right[Throwable, T] => - result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op)) - case l: Left[Throwable, T] => - result.completeWithException(l.a) - } - } - } - for (f ← futures) f onComplete seedFold //Attach the listener to the Futures - result - } - } - - /** - * Java API. - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first - */ - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) - - /** - * Java API. - * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]]. - * Useful for reducing many Futures into a single Future. - */ - def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JLinkedList[A]] = - scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) => - for (r ← fr; a ← fa) yield { - r add a - r - }) - - /** - * Java API. - * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]]. - * Useful for reducing many Futures into a single Future. - */ - def sequence[A](in: JIterable[Future[A]]): Future[JLinkedList[A]] = sequence(in, Actor.TIMEOUT) - - /** - * Java API. - * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B]. - * This is useful for performing a parallel map. For example, to apply a function to all items of a list - * in parallel. - */ - def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] = - scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) => - val fb = fn(a) - for (r ← fr; b ← fb) yield { - r add b - r - } - } - - /** - * Java API. - * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B]. - * This is useful for performing a parallel map. For example, to apply a function to all items of a list - * in parallel. - */ - def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn) - - // ===================================== - // Deprecations - // ===================================== - - /** - * (Blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)", "1.1") - def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) - - /** - * Returns the First Future that is completed (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await", "1.1") - def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await - - /** - * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed - */ - @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }", "1.1") - def awaitMap[A, B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = - in map { f => fun(f.await) } - - /** - * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) - */ - @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException", "1.1") - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1, f2)).await.resultOrException -} - -object Future { - /** - * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body - * The execution is performed by the specified Dispatcher. - */ - def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = - dispatcher.dispatchFuture(() => body, timeout) - - /** - * Construct a completable channel - */ - def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { - val future = empty[Any](timeout) - def !(msg: Any) = future completeWithResult msg - } - - /** - * Create an empty Future with default timeout - */ - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout) - - import scala.collection.mutable.Builder - import scala.collection.generic.CanBuildFrom - - /** - * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. - * Useful for reducing many Futures into a single Future. - */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) - - /** - * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. - * This is useful for performing a parallel map. For example, to apply a function to all items of a list - * in parallel: - *
-   * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
-   * 
- */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => - val fb = fn(a.asInstanceOf[A]) - for (r ← fr; b ← fb) yield (r += b) - }.map(_.result) - - /** - * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited - * Continuations plugin. - * - * Within the block, the result of a Future may be accessed by calling Future.apply. At that point - * execution is suspended with the rest of the block being stored in a continuation until the result - * of the Future is available. If an Exception is thrown while processing, it will be contained - * within the resulting Future. - * - * This allows working with Futures in an imperative style without blocking for each result. - * - * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the - * value of the other Future is available. - * - * The Delimited Continuations compiler plugin must be enabled in order to use this method. - */ - def flow[A](body: => A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { - val future = Promise[A](timeout) - (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f => - val opte = f.exception - if (opte.isDefined) future completeWithException (opte.get) - } - future - } - - private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() => Unit]]]() { - override def initialValue = None - } -} - -sealed trait Future[+T] { - - /** - * For use only within a Future.flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Future without blocking, by suspending execution and storing it as a - * continuation until the result is available. - * - * If this Future is untyped (a Future[Nothing]), a type parameter must be explicitly provided or - * execution will fail. The normal result of getting a Future from an ActorRef using !!! will return - * an untyped Future. - */ - def apply[A >: T](): A @cps[Future[Any]] = shift(this flatMap (_: A => Future[Any])) - - /** - * Blocks awaiting completion of this Future, then returns the resulting value, - * or throws the completed exception - * - * Scala & Java API - * - * throws FutureTimeoutException if this Future times out when waiting for completion - */ - def get: T = this.await.resultOrException.get - - /** - * Blocks the current thread until the Future has been completed or the - * timeout has expired. In the case of the timeout expiring a - * FutureTimeoutException will be thrown. - */ - def await: Future[T] - - /** - * Blocks the current thread until the Future has been completed or the - * timeout has expired. The timeout will be the least value of 'atMost' and the timeout - * supplied at the constructuion of this Future. - * In the case of the timeout expiring a FutureTimeoutException will be thrown. - */ - def await(atMost: Duration): Future[T] - - /** - * Blocks the current thread until the Future has been completed. Use - * caution with this method as it ignores the timeout and will block - * indefinitely if the Future is never completed. - */ - @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1") - def awaitBlocking: Future[T] - - /** - * Tests whether this Future has been completed. - */ - final def isCompleted: Boolean = value.isDefined - - /** - * Tests whether this Future's timeout has expired. - * - * Note that an expired Future may still contain a value, or it may be - * completed with a value. - */ - def isExpired: Boolean - - /** - * This Future's timeout in nanoseconds. - */ - def timeoutInNanos: Long - - /** - * The contained value of this Future. Before this Future is completed - * the value will be None. After completion the value will be Some(Right(t)) - * if it contains a valid result, or Some(Left(error)) if it contains - * an exception. - */ - def value: Option[Either[Throwable, T]] - - /** - * Returns the successful result of this Future if it exists. - */ - final def result: Option[T] = { - val v = value - if (v.isDefined) v.get.right.toOption - else None - } - - /** - * Returns the contained exception of this Future if it exists. - */ - final def exception: Option[Throwable] = { - val v = value - if (v.isDefined) v.get.left.toOption - else None - } - - /** - * When this Future is completed, apply the provided function to the - * Future. If the Future has already been completed, this will apply - * immediately. - */ - def onComplete(func: Future[T] => Unit): Future[T] - - /** - * When the future is completed with a valid result, apply the provided - * PartialFunction to the result. - *
-   *   val result = future receive {
-   *     case Foo => "foo"
-   *     case Bar => "bar"
-   *   }.await.result
-   * 
- */ - final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f => - val optr = f.result - if (optr.isDefined) { - val r = optr.get - if (pf.isDefinedAt(r)) pf(r) - } - } - - /** - * Creates a new Future by applying a PartialFunction to the successful - * result of this Future if a match is found, or else return a MatchError. - * If this Future is completed with an exception then the new Future will - * also contain this exception. - * Example: - *
-   * val future1 = for {
-   *   a <- actor !!! Req("Hello") collect { case Res(x: Int)    => x }
-   *   b <- actor !!! Req(a)       collect { case Res(x: String) => x }
-   *   c <- actor !!! Req(7)       collect { case Res(x: String) => x }
-   * } yield b + "-" + c
-   * 
- */ - final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) - onComplete { ft => - val v = ft.value.get - fa complete { - if (v.isLeft) v.asInstanceOf[Either[Throwable, A]] - else { - try { - val r = v.right.get - if (pf isDefinedAt r) Right(pf(r)) - else Left(new MatchError(r)) - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - Left(e) - } - } - } - } - fa - } - - /** - * Creates a new Future that will handle any matching Throwable that this - * Future might contain. If there is no match, or if this Future contains - * a valid result then the new Future will contain the same. - * Example: - *
-   * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
-   * Future(6 / 0) failure { case e: NotFoundException   => 0 } // result: exception
-   * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
-   * 
- */ - final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) - onComplete { ft => - val opte = ft.exception - fa complete { - if (opte.isDefined) { - val e = opte.get - try { - if (pf isDefinedAt e) Right(pf(e)) - else Left(e) - } catch { - case x: Exception => Left(x) - } - } else ft.value.get - } - } - fa - } - - /** - * Creates a new Future by applying a function to the successful result of - * this Future. If this Future is completed with an exception then the new - * Future will also contain this exception. - * Example: - *
-   * val future1 = for {
-   *   a: Int    <- actor !!! "Hello" // returns 5
-   *   b: String <- actor !!! a       // returns "10"
-   *   c: String <- actor !!! 7       // returns "14"
-   * } yield b + "-" + c
-   * 
- */ - final def map[A](f: T => A): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) - onComplete { ft => - val optv = ft.value - if (optv.isDefined) { - val v = optv.get - if (v.isLeft) - fa complete v.asInstanceOf[Either[Throwable, A]] - else { - fa complete (try { - Right(f(v.right.get)) - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - Left(e) - }) - } - } - } - fa - } - - /** - * Creates a new Future by applying a function to the successful result of - * this Future, and returns the result of the function as the new Future. - * If this Future is completed with an exception then the new Future will - * also contain this exception. - * Example: - *
-   * val future1 = for {
-   *   a: Int    <- actor !!! "Hello" // returns 5
-   *   b: String <- actor !!! a       // returns "10"
-   *   c: String <- actor !!! 7       // returns "14"
-   * } yield b + "-" + c
-   * 
- */ - final def flatMap[A](f: T => Future[A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) - onComplete { ft => - val optv = ft.value - if (optv.isDefined) { - val v = optv.get - if (v.isLeft) - fa complete v.asInstanceOf[Either[Throwable, A]] - else { - try { - fa.completeWith(f(v.right.get)) - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - fa completeWithException e - } - } - } - } - fa - } - - final def foreach(f: T => Unit): Unit = onComplete { ft => - val optr = ft.result - if (optr.isDefined) - f(optr.get) - } - - final def filter(p: Any => Boolean): Future[Any] = { - val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS) - onComplete { ft => - val optv = ft.value - if (optv.isDefined) { - val v = optv.get - if (v.isLeft) - f complete v - else { - val r = v.right.get - f complete (try { - if (p(r)) Right(r) - else Left(new MatchError(r)) - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - Left(e) - }) - } - } - } - f - } - - /** - * Returns the current result, throws the exception is one has been raised, else returns None - */ - final def resultOrException: Option[T] = { - val v = value - if (v.isDefined) { - val r = v.get - if (r.isLeft) throw r.left.get - else r.right.toOption - } else None - } - - /* Java API */ - final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_)) - - final def map[A >: T, B](f: JFunc[A, B]): Future[B] = map(f(_)) - - final def flatMap[A >: T, B](f: JFunc[A, Future[B]]): Future[B] = flatMap(f(_)) - - final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_)) - - final def filter(p: JFunc[Any, Boolean]): Future[Any] = filter(p(_)) - -} - -object Promise { - - def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout) - - def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT) - -} - -/** - * Essentially this is the Promise (or write-side) of a Future (read-side). - */ -trait CompletableFuture[T] extends Future[T] { - /** - * Completes this Future with the specified result, if not already completed. - * @return this - */ - def complete(value: Either[Throwable, T]): Future[T] - - /** - * Completes this Future with the specified result, if not already completed. - * @return this - */ - final def completeWithResult(result: T): Future[T] = complete(Right(result)) - - /** - * Completes this Future with the specified exception, if not already completed. - * @return this - */ - final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception)) - - /** - * Completes this Future with the specified other Future, when that Future is completed, - * unless this Future has already been completed. - * @return this. - */ - final def completeWith(other: Future[T]): Future[T] = { - other onComplete { f => complete(f.value.get) } - this - } - - final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => cont(complete(Right(value))) } - - final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => - val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT) - this completeWith other onComplete { f => - try { - fr completeWith cont(f) - } catch { - case e: Exception => - EventHandler.error(e, this, e.getMessage) - fr completeWithException e - } - } - fr - } - -} - -/** - * The default concrete Future implementation. - */ -class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { - - def this() = this(0, MILLIS) - - def this(timeout: Long) = this(timeout, MILLIS) - - val timeoutInNanos = timeunit.toNanos(timeout) - private val _startTimeInNanos = currentTimeInNanos - private val _lock = new ReentrantLock - private val _signal = _lock.newCondition - private var _value: Option[Either[Throwable, T]] = None - private var _listeners: List[Future[T] => Unit] = Nil - - /** - * Must be called inside _lock.lock<->_lock.unlock - */ - @tailrec - private def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (_value.isEmpty && waitTimeNanos > 0) { - val start = currentTimeInNanos - val remainingNanos = try { - _signal.awaitNanos(waitTimeNanos) - } catch { - case e: InterruptedException => - waitTimeNanos - (currentTimeInNanos - start) - } - awaitUnsafe(remainingNanos) - } else { - _value.isDefined - } - } - - def await(atMost: Duration) = { - _lock.lock - if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") - } - - def await = { - _lock.lock - if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") - } - - def awaitBlocking = { - _lock.lock - try { - while (_value.isEmpty) { - _signal.await - } - this - } finally { - _lock.unlock - } - } - - def isExpired: Boolean = timeLeft() <= 0 - - def value: Option[Either[Throwable, T]] = { - _lock.lock - try { - _value - } finally { - _lock.unlock - } - } - - def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { - _lock.lock - val notifyTheseListeners = try { - if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired - _value = Some(value) - val existingListeners = _listeners - _listeners = Nil - existingListeners - } else Nil - } finally { - _signal.signalAll - _lock.unlock - } - - if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation - @tailrec - def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { - if (rest.nonEmpty) { - notifyCompleted(rest.head) - while (callbacks.nonEmpty) { callbacks.pop().apply() } - runCallbacks(rest.tail, callbacks) - } - } - - val pending = Future.callbacksPendingExecution.get - if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) - pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute - val doNotify = notifyCompleted _ //Hoist closure to avoid garbage - notifyTheseListeners foreach doNotify - }) - } else { - try { - val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks - Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator - runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated - } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup - } - } - - this - } - - def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { - _lock.lock - val notifyNow = try { - if (_value.isEmpty) { - if (!isExpired) { //Only add the listener if the future isn't expired - _listeners ::= func - false - } else false //Will never run the callback since the future is expired - } else true - } finally { - _lock.unlock - } - - if (notifyNow) notifyCompleted(func) - - this - } - - private def notifyCompleted(func: Future[T] => Unit) { - try { - func(this) - } catch { - case e => EventHandler notify EventHandler.Error(e, this) - } - } - - @inline - private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) - @inline - private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) -} - -/** - * An already completed Future is seeded with it's result at creation, is useful for when you are participating in - * a Future-composition but you already have a value to contribute. - */ -sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { - val value = Some(suppliedValue) - - def complete(value: Either[Throwable, T]): CompletableFuture[T] = this - def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } - def await(atMost: Duration): Future[T] = this - def await: Future[T] = this - def awaitBlocking: Future[T] = this - def isExpired: Boolean = true - def timeoutInNanos: Long = 0 -} diff --git a/test/disabled/presentation/akka/src/pi.scala b/test/disabled/presentation/akka/src/pi.scala deleted file mode 100644 index b4c644052c..0000000000 --- a/test/disabled/presentation/akka/src/pi.scala +++ /dev/null @@ -1,108 +0,0 @@ - -import akka.actor.{Actor, PoisonPill} -import Actor._ -import akka.routing.{Routing, CyclicIterator} -import Routing._ - -import java.util.concurrent.CountDownLatch - -object Pi extends App { - - calculate/*#*/(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) - - // ==================== - // ===== Messages ===== - // ==================== - sealed trait PiMessage - case object Calculate extends PiMessage/*#*/ - case class Work(start: Int, nrOfElements: Int) extends PiMessage - case class Result(value: Double) extends PiMessage - - // ================== - // ===== Worker ===== - // ================== - class Worker extends Actor/*#*/ { - - // define the work - def calculatePiFor(start: Int, nrOfElements: Int): Double = { - var acc = 0.0 - for (i <- start until (start + nrOfElements)) - acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1) - acc - } - - def receive /*?*/ = { - case Work(start, nrOfElements) => - self reply/*#*/ Result(calculatePiFor(start, nrOfElements)) // perform the work // TODO: this currently returns wrong position for the symbol - } - } - - // ================== - // ===== Master ===== - // ================== - class Master( - nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) - extends Actor { - - var pi: Double = _ - var nrOfResults: Int = _ - var start: Long = _ - - // create the workers - val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]./*!*/start()) - - // wrap them with a load-balancing router - val router = Routing./*!*/loadBalancerActor(CyclicIterator(workers))./*!*/start() - - // message handler - def receive = { - case Calculate => - // schedule work - //for (start <- 0 until nrOfMessages) router ! Work(start, nrOfElements) - for (i <- 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements) - - // send a PoisonPill to all workers telling them to shut down themselves - router./*!*/!(Broadcast(PoisonPill)) - - // send a PoisonPill to the router, telling him to shut himself down - router ! PoisonPill - - case Result(value) => - // handle result from the worker - pi += value - nrOfResults/*#*/ += 1 - if (nrOfResults == nrOfMessages) self./*!*/stop() - } - - override def preStart() { - start = System.currentTimeMillis - } - - override def postStop() { - // tell the world that the calculation is complete - println( - "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" - .format(pi, (System.currentTimeMillis - start))) - latch/*#*/.countDown() - } - } - - // ================== - // ===== Run it ===== - // ================== - def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - - // this latch is only plumbing to know when the calculation is completed - val latch = new CountDownLatch(1) - - // create the master - val master = actorOf( - new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start() - - // start the calculation - master ! Calculate - - // wait for master to shut down - latch.await() - } -} -- cgit v1.2.3