From 3cb0e784a05db7d0b542cec9bf4c5fbf3772a6cf Mon Sep 17 00:00:00 2001 From: Heather Miller Date: Sat, 4 Aug 2012 21:55:35 +0200 Subject: Basing Futures on Try instead of Either --- src/library/scala/concurrent/Future.scala | 85 ++++++++++++------------- src/library/scala/concurrent/Promise.scala | 18 +++--- src/library/scala/concurrent/impl/Future.scala | 4 +- src/library/scala/concurrent/impl/Promise.scala | 62 +++++++++--------- src/library/scala/util/Try.scala | 23 ++----- 5 files changed, 87 insertions(+), 105 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index d24fdbf005..09e29d971d 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -117,7 +117,7 @@ trait Future[+T] extends Awaitable[T] { * $callbackInContext */ def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete { - case Right(v) if pf isDefinedAt v => pf(v) + case Success(v) if pf isDefinedAt v => pf(v) case _ => }(executor) @@ -135,7 +135,7 @@ trait Future[+T] extends Awaitable[T] { * $callbackInContext */ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { - case Left(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t) + case Failure(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t) case _ => }(executor) @@ -148,7 +148,7 @@ trait Future[+T] extends Awaitable[T] { * $multipleCallbacks * $callbackInContext */ - def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit /* Miscellaneous */ @@ -169,7 +169,7 @@ trait Future[+T] extends Awaitable[T] { * if it contains a valid result, or `Some(Failure(error))` if it contains * an exception. */ - def value: Option[Either[Throwable, T]] + def value: Option[Try[T]] /* Projections */ @@ -190,8 +190,8 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[Throwable]() onComplete { - case Left(t) => p success t - case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) + case Failure(t) => p success t + case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) } p.future @@ -205,7 +205,7 @@ trait Future[+T] extends Awaitable[T] { * Will not be called if the future fails. */ def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { - case Right(r) => f(r) + case Success(r) => f(r) case _ => // do nothing }(executor) @@ -227,8 +227,8 @@ trait Future[+T] extends Awaitable[T] { case result => try { result match { - case Left(t) => p failure f(t) - case Right(r) => p success s(r) + case Failure(t) => p failure f(t) + case Success(r) => p success s(r) } } catch { case NonFatal(t) => p failure t @@ -251,8 +251,8 @@ trait Future[+T] extends Awaitable[T] { case result => try { result match { - case Right(r) => p success f(r) - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] + case Success(r) => p success f(r) + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] } } catch { case NonFatal(t) => p failure t @@ -273,12 +273,12 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[S]() onComplete { - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] - case Right(v) => + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(v) => try { f(v).onComplete({ - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] - case Right(v) => p success v + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(v) => p success v })(internalExecutor) } catch { case NonFatal(t) => p failure t @@ -308,8 +308,8 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[T]() onComplete { - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, T]] - case Right(v) => + case f: Failure[_] => p complete f.asInstanceOf[Failure[T]] + case Success(v) => try { if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) @@ -357,8 +357,8 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[S]() onComplete { - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] - case Right(v) => + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(v) => try { if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) @@ -377,22 +377,15 @@ trait Future[+T] extends Awaitable[T] { * Example: * * {{{ - * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0 - * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception - * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3 + * future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0 + * future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception + * future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 * }}} */ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { val p = Promise[U]() - onComplete { - case Left(t) if pf isDefinedAt t => - try { p success pf(t) } - catch { - case NonFatal(t) => p failure t - } - case otherwise => p complete otherwise - }(executor) + onComplete { case tr => p.complete(tr recover pf) }(executor) p.future } @@ -414,7 +407,7 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[U]() onComplete { - case Left(t) if pf isDefinedAt t => + case Failure(t) if pf isDefinedAt t => try { p completeWith pf(t) } catch { @@ -438,8 +431,8 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[(T, U)]() this onComplete { - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, (T, U)]] - case Right(r) => + case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]] + case Success(r) => that onSuccess { case r2 => p success ((r, r2)) } @@ -468,8 +461,8 @@ trait Future[+T] extends Awaitable[T] { def fallbackTo[U >: T](that: Future[U]): Future[U] = { val p = Promise[U]() onComplete { - case r @ Right(_) ⇒ p complete r - case _ ⇒ p completeWith that + case s @ Success(_) => p complete s + case _ => p completeWith that } p.future } @@ -485,12 +478,12 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[S]() onComplete { - case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] - case Right(t) => + case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] + case Success(t) => p complete (try { - Right(boxedType(tag.runtimeClass).cast(t).asInstanceOf[S]) + Success(boxedType(tag.runtimeClass).cast(t).asInstanceOf[S]) } catch { - case e: ClassCastException => Left(e) + case e: ClassCastException => Failure(e) }) } @@ -520,7 +513,7 @@ trait Future[+T] extends Awaitable[T] { * } * }}} */ - def andThen[U](pf: PartialFunction[Either[Throwable, T], U])(implicit executor: ExecutionContext): Future[T] = { + def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() onComplete { @@ -545,7 +538,7 @@ trait Future[+T] extends Awaitable[T] { */ def either[U >: T](that: Future[U]): Future[U] = { val p = Promise[U]() - val completePromise: PartialFunction[Either[Throwable, U], _] = { case result => p tryComplete result } + val completePromise: PartialFunction[Try[U], _] = { case result => p tryComplete result } this onComplete completePromise that onComplete completePromise @@ -615,7 +608,7 @@ object Future { def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() - val completeFirst: Either[Throwable, T] => Unit = p tryComplete _ + val completeFirst: Try[T] => Unit = p tryComplete _ futures.foreach(_ onComplete completeFirst) p.future @@ -629,14 +622,14 @@ object Future { else { val result = Promise[Option[T]]() val ref = new AtomicInteger(futures.size) - val search: Either[Throwable, T] => Unit = v => try { + val search: Try[T] => Unit = v => try { v match { - case Right(r) => if (predicate(r)) result tryComplete Right(Some(r)) - case _ => + case Success(r) => if (predicate(r)) result tryComplete Success(Some(r)) + case _ => } } finally { if (ref.decrementAndGet == 0) { - result tryComplete Right(None) + result tryComplete Success(None) } } diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 5d1b2c00b6..b873939c15 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -8,6 +8,8 @@ package scala.concurrent +import scala.util.{ Try, Success, Failure } + /** Promise is an object which can be completed with a value or failed * with an exception. * @@ -49,7 +51,7 @@ trait Promise[T] { * * $promiseCompletion */ - def complete(result: Either[Throwable, T]): this.type = + def complete(result: Try[T]): this.type = if (tryComplete(result)) this else throw new IllegalStateException("Promise already completed.") /** Tries to complete the promise with either a value or the exception. @@ -58,7 +60,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def tryComplete(result: Either[Throwable, T]): Boolean + def tryComplete(result: Try[T]): Boolean /** Completes this promise with the specified future, once that future is completed. * @@ -84,7 +86,7 @@ trait Promise[T] { * * $promiseCompletion */ - def success(v: T): this.type = complete(Right(v)) + def success(v: T): this.type = complete(Success(v)) /** Tries to complete the promise with a value. * @@ -92,7 +94,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def trySuccess(value: T): Boolean = tryComplete(Right(value)) + def trySuccess(value: T): Boolean = tryComplete(Success(value)) /** Completes the promise with an exception. * @@ -102,7 +104,7 @@ trait Promise[T] { * * $promiseCompletion */ - def failure(t: Throwable): this.type = complete(Left(t)) + def failure(t: Throwable): this.type = complete(Failure(t)) /** Tries to complete the promise with an exception. * @@ -110,7 +112,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def tryFailure(t: Throwable): Boolean = tryComplete(Left(t)) + def tryFailure(t: Throwable): Boolean = tryComplete(Failure(t)) } @@ -129,14 +131,14 @@ object Promise { * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ - def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception)) + def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception)) /** Creates an already completed Promise with the specified result. * * @tparam T the type of the value in the promise * @return the newly created `Promise` object */ - def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Right(result)) + def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result)) } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 098008e958..d92691901f 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -12,7 +12,7 @@ package scala.concurrent.impl import scala.concurrent.ExecutionContext import scala.util.control.NonFatal - +import scala.util.{Try, Success, Failure} private[concurrent] object Future { @@ -21,7 +21,7 @@ private[concurrent] object Future { override def run() = { promise complete { - try Right(body) catch { case NonFatal(e) => Left(e) } + try Success(body) catch { case NonFatal(e) => Failure(e) } } } } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index c2df9ac296..fab6b55c52 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -15,23 +15,23 @@ import scala.concurrent.{ ExecutionContext, CanAwait, OnCompleteRunnable, Timeou import scala.concurrent.util.Duration import scala.annotation.tailrec import scala.util.control.NonFatal - +import scala.util.{ Try, Success, Failure } private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this } -private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) extends Runnable with OnCompleteRunnable { +private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Try[T]) => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it - var value: Either[Throwable, T] = null + var value: Try[T] = null override def run() = { require(value ne null) // must set value to non-null before running! try onComplete(value) catch { case NonFatal(e) => executor reportFailure e } } - def executeWithValue(v: Either[Throwable, T]): Unit = { + def executeWithValue(v: Try[T]): Unit = { require(value eq null) // can't complete it twice value = v executor.execute(this) @@ -40,17 +40,17 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete private[concurrent] object Promise { - private def resolveEither[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { - case Left(t) => resolver(t) - case _ => source + private def resolveTry[T](source: Try[T]): Try[T] = source match { + case Failure(t) => resolver(t) + case _ => source } - private def resolver[T](throwable: Throwable): Either[Throwable, T] = throwable match { - case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value.asInstanceOf[T]) - case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) - case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) - case e: Error => Left(new ExecutionException("Boxed Error", e)) - case t => Left(t) + private def resolver[T](throwable: Throwable): Try[T] = throwable match { + case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value.asInstanceOf[T]) + case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t)) + case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t)) + case e: Error => Failure(new ExecutionException("Boxed Error", e)) + case t => Failure(t) } /** Default promise implementation. @@ -88,25 +88,25 @@ private[concurrent] object Promise { @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { - case Left(e) => throw e - case Right(r) => r + case Failure(e) => throw e + case Success(r) => r } - def value: Option[Either[Throwable, T]] = getState match { - case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]]) - case _ => None + def value: Option[Try[T]] = getState match { + case c: Try[_] => Some(c.asInstanceOf[Try[T]]) + case _ => None } override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" - case _: Either[_, _] => true - case _ => false + case _: Try[_] => true + case _ => false } - def tryComplete(value: Either[Throwable, T]): Boolean = { - val resolved = resolveEither(value) + def tryComplete(value: Try[T]): Boolean = { + val resolved = resolveTry(value) (try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[CallbackRunnable[T]] = { + def tryComplete(v: Try[T]): List[CallbackRunnable[T]] = { getState match { case raw: List[_] => val cur = raw.asInstanceOf[List[CallbackRunnable[T]]] @@ -124,13 +124,13 @@ private[concurrent] object Promise { } } - def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val runnable = new CallbackRunnable[T](executor, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { - case r: Either[_, _] => runnable.executeWithValue(r.asInstanceOf[Either[Throwable, T]]) + case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]]) case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() @@ -141,15 +141,15 @@ private[concurrent] object Promise { * * Useful in Future-composition when a value to contribute is already available. */ - final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { + final class KeptPromise[T](suppliedValue: Try[T]) extends Promise[T] { - val value = Some(resolveEither(suppliedValue)) + val value = Some(resolveTry(suppliedValue)) override def isCompleted(): Boolean = true - def tryComplete(value: Either[Throwable, T]): Boolean = false + def tryComplete(value: Try[T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): Unit = { + def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get (new CallbackRunnable(executor, func)).executeWithValue(completedAs) } @@ -157,8 +157,8 @@ private[concurrent] object Promise { def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { - case Left(e) => throw e - case Right(r) => r + case Failure(e) => throw e + case Success(r) => r } } diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala index f85bac0b84..487ddaced3 100644 --- a/src/library/scala/util/Try.scala +++ b/src/library/scala/util/Try.scala @@ -54,6 +54,7 @@ import language.implicitConversions * * `Try` comes to the Scala standard library after years of use as an integral part of Twitter's stack. * + * @author based on Marius Eriksen's original implementation in com.twitter.util. * @since 2.10 */ sealed abstract class Try[+T] { @@ -102,7 +103,7 @@ sealed abstract class Try[+T] { * Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`. * This is like `flatMap` for the exception. */ - def rescue[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] + def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] /** * Applies the given function `f` if this is a `Failure`, otherwise returns this if this is a `Success`. @@ -150,20 +151,6 @@ sealed abstract class Try[+T] { object Try { - implicit def try2either[T](tr: Try[T]): Either[Throwable, T] = { - tr match { - case Success(v) => Right(v) - case Failure(t) => Left(t) - } - } - - implicit def either2try[T](ei: Either[Throwable, T]): Try[T] = { - ei match { - case Right(v) => Success(v) - case Left(t) => Failure(t) - } - } - def apply[T](r: => T): Try[T] = { try { Success(r) } catch { case NonFatal(e) => Failure(e) @@ -175,7 +162,7 @@ object Try { final case class Failure[+T](val exception: Throwable) extends Try[T] { def isFailure: Boolean = true def isSuccess: Boolean = false - def rescue[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = + def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = if (f.isDefinedAt(exception)) f(exception) else this def get: T = throw exception def flatMap[U](f: T => Try[U]): Try[U] = Failure[U](exception) @@ -201,12 +188,12 @@ final case class Failure[+T](val exception: Throwable) extends Try[T] { final case class Success[+T](value: T) extends Try[T] { def isFailure: Boolean = false def isSuccess: Boolean = true - def rescue[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = Success(value) + def recoverWith[U >: T](f: PartialFunction[Throwable, Try[U]]): Try[U] = Success(value) def get = value def flatMap[U](f: T => Try[U]): Try[U] = try f(value) catch { - case e: Throwable => Failure(e) + case NonFatal(e) => Failure(e) } def flatten[U](implicit ev: T <:< Try[U]): Try[U] = value def foreach[U](f: T => U): Unit = f(value) -- cgit v1.2.3 From 5b82a9702de3ffd5b131caf8c550877b476e8f9c Mon Sep 17 00:00:00 2001 From: phaller Date: Sun, 5 Aug 2012 00:20:01 +0200 Subject: SI-6185 - add "prepare" hook to ExecutionContext Enables important abstractions to be built on top of futures, such as Twitter's "Local" for handling data local to a callback chain. --- .../scala/concurrent/ExecutionContext.scala | 8 +++ src/library/scala/concurrent/impl/Promise.scala | 12 +++-- test/files/jvm/scala-concurrent-tck.scala | 61 ++++++++++++++++++++++ 3 files changed, 78 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 8081bb32da..ac462ac9d2 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -12,6 +12,7 @@ package scala.concurrent import java.util.concurrent.{ ExecutorService, Executor } import scala.concurrent.util.Duration import scala.annotation.implicitNotFound +import scala.util.Try /** * An `ExecutionContext` is an abstraction over an entity that can execute program logic. @@ -27,6 +28,13 @@ trait ExecutionContext { */ def reportFailure(t: Throwable): Unit + /** Prepares for the execution of callback `f`. Returns the prepared + * execution context which should be used to schedule the execution + * of the task associated with `f`. + */ + def prepare[T, U](f: Try[T] => U): ExecutionContext = + this + } /** diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index fab6b55c52..9f30529dc8 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -22,7 +22,9 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with sc def future: this.type = this } -private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: (Try[T]) => Any) extends Runnable with OnCompleteRunnable { +/* Precondition: `executor` is prepared, i.e., `executor` has been returned from invocation of `prepare` on some other `ExecutionContext`. + */ +private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable { // must be filled in before running it var value: Try[T] = null @@ -34,6 +36,8 @@ private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete def executeWithValue(v: Try[T]): Unit = { require(value eq null) // can't complete it twice value = v + // Note that we cannot prepare the ExecutionContext at this point, since we might + // already be running on a different thread! executor.execute(this) } } @@ -125,7 +129,8 @@ private[concurrent] object Promise { } def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val runnable = new CallbackRunnable[T](executor, func) + val preparedEC = executor.prepare(func) + val runnable = new CallbackRunnable[T](preparedEC, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = @@ -151,7 +156,8 @@ private[concurrent] object Promise { def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get - (new CallbackRunnable(executor, func)).executeWithValue(completedAs) + val preparedEC = executor.prepare(func) + (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 36ab910593..976d98a337 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -926,6 +926,66 @@ trait CustomExecutionContext extends TestBase { testCallbackChainCustomEC() } +trait ExecutionContextPrepare extends TestBase { + val theLocal = new ThreadLocal[String] { + override protected def initialValue(): String = "" + } + + class PreparingExecutionContext extends ExecutionContext { + def delegate = ExecutionContext.global + + override def execute(runnable: Runnable): Unit = + delegate.execute(runnable) + + override def prepare[T, U](f: Try[T] => U): ExecutionContext = { + // save object stored in ThreadLocal storage + val localData = theLocal.get + new PreparingExecutionContext { + override def execute(runnable: Runnable): Unit = { + val wrapper = new Runnable { + override def run(): Unit = { + // now we're on the new thread + // put localData into theLocal + theLocal.set(localData) + runnable.run() + } + } + delegate.execute(wrapper) + } + } + } + + override def reportFailure(t: Throwable): Unit = + delegate.reportFailure(t) + } + + implicit val ec = new PreparingExecutionContext + + def testOnComplete(): Unit = once { + done => + theLocal.set("secret") + val fut = future { 42 } + fut onComplete { + case _ => + assert(theLocal.get == "secret") + done() + } + } + + def testMap(): Unit = once { + done => + theLocal.set("secret2") + val fut = future { 42 } + fut map { x => + assert(theLocal.get == "secret2") + done() + } + } + + testOnComplete() + testMap() +} + object Test extends App with FutureCallbacks @@ -935,6 +995,7 @@ with Promises with BlockContexts with Exceptions with CustomExecutionContext +with ExecutionContextPrepare { System.exit(0) } -- cgit v1.2.3 From cb6066ee6136d78ceb8c3b5c06cefac998966dd0 Mon Sep 17 00:00:00 2001 From: Heather Miller Date: Sun, 5 Aug 2012 12:38:13 +0200 Subject: Temporarily skips failing test due to optimizer bug SI-6188 Also swaps the arguments to method transform on Try, so as to mirror transform on scala.concurrent.Future. --- src/library/scala/util/Try.scala | 2 +- test/files/jvm/future-spec/TryTests.scala | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala index 487ddaced3..c834f6d514 100644 --- a/src/library/scala/util/Try.scala +++ b/src/library/scala/util/Try.scala @@ -142,7 +142,7 @@ sealed abstract class Try[+T] { /** Completes this `Try` by applying the function `f` to this if this is of type `Failure`, or conversely, by applying * `s` if this is a `Success`. */ - def transform[U](f: Throwable => Try[U], s: T => Try[U]): Try[U] = this match { + def transform[U](s: T => Try[U], f: Throwable => Try[U]): Try[U] = this match { case Success(v) => s(v) case Failure(e) => f(e) } diff --git a/test/files/jvm/future-spec/TryTests.scala b/test/files/jvm/future-spec/TryTests.scala index 9d749c44ba..47ce9c2e15 100644 --- a/test/files/jvm/future-spec/TryTests.scala +++ b/test/files/jvm/future-spec/TryTests.scala @@ -55,12 +55,12 @@ object TryTests extends MinimalScalaTest { Failure[Int](e) flatMap(x => Success(1 + x)) mustEqual Failure(e) } - "when there is an exception" in { - Success(1).flatMap[Int](_ => throw e) mustEqual Failure(e) + // "when there is an exception" in { + // Success(1).flatMap[Int](_ => throw e) mustEqual Failure(e) - val e2 = new Exception - Failure[Int](e).flatMap[Int](_ => throw e2) mustEqual Failure(e) - } + // val e2 = new Exception + // Failure[Int](e).flatMap[Int](_ => throw e2) mustEqual Failure(e) + // } } "flatten" in { -- cgit v1.2.3 From ea81ab3314359c98986e6fac74c807fa1accdfb6 Mon Sep 17 00:00:00 2001 From: Heather Miller Date: Tue, 7 Aug 2012 18:36:36 +0200 Subject: Added tests, removal of unnecessary methods, fixes prepare --- src/library/scala/concurrent/ExecutionContext.scala | 8 +++----- src/library/scala/concurrent/impl/Promise.scala | 9 +++------ src/library/scala/util/Try.scala | 13 +------------ test/files/jvm/future-spec/TryTests.scala | 12 ++++++------ test/files/jvm/try-type-tests.scala | 14 ++++++++++++++ 5 files changed, 27 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index ac462ac9d2..82c1cff4b1 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -28,12 +28,10 @@ trait ExecutionContext { */ def reportFailure(t: Throwable): Unit - /** Prepares for the execution of callback `f`. Returns the prepared - * execution context which should be used to schedule the execution - * of the task associated with `f`. + /** Prepares for the execution of a task. Returns the prepared + * execution context. */ - def prepare[T, U](f: Try[T] => U): ExecutionContext = - this + def prepare(): ExecutionContext = this } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 9f30529dc8..b19bed004b 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -129,7 +129,7 @@ private[concurrent] object Promise { } def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { - val preparedEC = executor.prepare(func) + val preparedEC = executor.prepare val runnable = new CallbackRunnable[T](preparedEC, func) @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed @@ -156,16 +156,13 @@ private[concurrent] object Promise { def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = { val completedAs = value.get - val preparedEC = executor.prepare(func) + val preparedEC = executor.prepare (new CallbackRunnable(preparedEC, func)).executeWithValue(completedAs) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { - case Failure(e) => throw e - case Success(r) => r - } + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get.get } } diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala index c834f6d514..f381a18b0c 100644 --- a/src/library/scala/util/Try.scala +++ b/src/library/scala/util/Try.scala @@ -54,7 +54,7 @@ import language.implicitConversions * * `Try` comes to the Scala standard library after years of use as an integral part of Twitter's stack. * - * @author based on Marius Eriksen's original implementation in com.twitter.util. + * @author based on Twitter's original implementation in com.twitter.util. * @since 2.10 */ sealed abstract class Try[+T] { @@ -116,17 +116,6 @@ sealed abstract class Try[+T] { */ def toOption = if (isSuccess) Some(get) else None - /** - * Returns an empty `Seq` (usually a `List`) if this is a `Failure` or a `Seq` containing the value if this is a `Success`. - */ - def toSeq = if (isSuccess) Seq(get) else Seq() - - /** - * Returns the given function applied to the value from this `Success` or returns this if this is a `Failure`. - * Alias for `flatMap`. - */ - def andThen[U](f: T => Try[U]): Try[U] = flatMap(f) - /** * Transforms a nested `Try`, ie, a `Try` of type `Try[Try[T]]`, * into an un-nested `Try`, ie, a `Try` of type `Try[T]`. diff --git a/test/files/jvm/future-spec/TryTests.scala b/test/files/jvm/future-spec/TryTests.scala index 47ce9c2e15..db0be0dfff 100644 --- a/test/files/jvm/future-spec/TryTests.scala +++ b/test/files/jvm/future-spec/TryTests.scala @@ -1,4 +1,4 @@ - +x // This is a port of the com.twitter.util Try spec. // -- // It lives in the future-spec directory simply because it requires a specs-like @@ -55,12 +55,12 @@ object TryTests extends MinimalScalaTest { Failure[Int](e) flatMap(x => Success(1 + x)) mustEqual Failure(e) } - // "when there is an exception" in { - // Success(1).flatMap[Int](_ => throw e) mustEqual Failure(e) + "when there is an exception" in { + Success(1).flatMap[Int](_ => throw e) mustEqual Failure(e) - // val e2 = new Exception - // Failure[Int](e).flatMap[Int](_ => throw e2) mustEqual Failure(e) - // } + val e2 = new Exception + Failure[Int](e).flatMap[Int](_ => throw e2) mustEqual Failure(e) + } } "flatten" in { diff --git a/test/files/jvm/try-type-tests.scala b/test/files/jvm/try-type-tests.scala index 9dece8d6d8..351f02b183 100644 --- a/test/files/jvm/try-type-tests.scala +++ b/test/files/jvm/try-type-tests.scala @@ -103,6 +103,20 @@ trait TryStandard { } } + def testSuccessTransform(): Unit = { + val s = Success(1) + val succ = (x: Int) => Success(x * 10) + val fail = (x: Throwable) => Success(0) + assert(s.transform(succ, fail).get == s.get) + } + + def testFailureTransform(): Unit = { + val f = Failure(new Exception("foo")) + val succ = (x: Int) => Success(x * 10) + val fail = (x: Throwable) => Success(0) + assert(f.transform(succ, fail).get == 0) + } + testForeachSuccess() testForeachFailure() testFlatMapSuccess() -- cgit v1.2.3 From 540706a95d15b43dd94b5258477d36468e76474b Mon Sep 17 00:00:00 2001 From: Heather Miller Date: Wed, 8 Aug 2012 08:03:17 +0200 Subject: Doc fix on exec ctx prepare method, fix to tests --- src/library/scala/concurrent/ExecutionContext.scala | 3 ++- src/library/scala/concurrent/Future.scala | 2 +- test/files/jvm/future-spec/TryTests.scala | 1 - test/files/jvm/try-type-tests.scala | 4 +++- 4 files changed, 6 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 82c1cff4b1..1be6050303 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -29,7 +29,8 @@ trait ExecutionContext { def reportFailure(t: Throwable): Unit /** Prepares for the execution of a task. Returns the prepared - * execution context. + * execution context. A valid implementation of `prepare` is one + * that simply returns `this`. */ def prepare(): ExecutionContext = this diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 09e29d971d..bc0b437a33 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -312,7 +312,7 @@ trait Future[+T] extends Awaitable[T] { case Success(v) => try { if (pred(v)) p success v - else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) + else p failure new NoSuchElementException("Future.filter predicate is not satisfied") } catch { case NonFatal(t) => p failure t } diff --git a/test/files/jvm/future-spec/TryTests.scala b/test/files/jvm/future-spec/TryTests.scala index db0be0dfff..82ca12276f 100644 --- a/test/files/jvm/future-spec/TryTests.scala +++ b/test/files/jvm/future-spec/TryTests.scala @@ -1,4 +1,3 @@ -x // This is a port of the com.twitter.util Try spec. // -- // It lives in the future-spec directory simply because it requires a specs-like diff --git a/test/files/jvm/try-type-tests.scala b/test/files/jvm/try-type-tests.scala index 351f02b183..17811f64c5 100644 --- a/test/files/jvm/try-type-tests.scala +++ b/test/files/jvm/try-type-tests.scala @@ -107,7 +107,7 @@ trait TryStandard { val s = Success(1) val succ = (x: Int) => Success(x * 10) val fail = (x: Throwable) => Success(0) - assert(s.transform(succ, fail).get == s.get) + assert(s.transform(succ, fail).get == 10) } def testFailureTransform(): Unit = { @@ -133,6 +133,8 @@ trait TryStandard { testFlattenSuccess() testFailedSuccess() testFailedFailure() + testSuccessTransform() + testFailureTransform() } object Test -- cgit v1.2.3