From da54f34a6526b49b9e13e60c4fce242325c1c36e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 19 Jun 2013 23:16:53 +0200 Subject: Cleaning up method implementations in Future Optimizations: 1) Avoiding isDefinedAt + apply and using applyOrElse to allow for optimizations later 2) Reducing method sizes to be more JIT + inliner friendly 3) Reusing core combinators to reuse inliner/JIT optimizations and be more code-cache friendly --- src/library/scala/concurrent/Future.scala | 181 ++++++++---------------------- 1 file changed, 44 insertions(+), 137 deletions(-) (limited to 'src') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 6b6ad29074..bc3a241ce7 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -14,7 +14,7 @@ import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } import java.lang.{ Iterable => JIterable } import java.util.{ LinkedList => JLinkedList } -import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong, AtomicBoolean } import scala.util.control.NonFatal import scala.Option @@ -101,7 +101,7 @@ trait Future[+T] extends Awaitable[T] { // that also have an executor parameter, which // keeps us from accidentally forgetting to use // the executor parameter. - private implicit def internalExecutor: ExecutionContext = Future.InternalCallbackExecutor + private def internalExecutor = Future.InternalCallbackExecutor /* Callbacks */ @@ -116,9 +116,10 @@ trait Future[+T] extends Awaitable[T] { * $callbackInContext */ def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete { - case Success(v) if pf isDefinedAt v => pf(v) + case Success(v) => + pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError case _ => - }(executor) + } /** When this future is completed with a failure (i.e. with a throwable), * apply the provided callback to the throwable. @@ -134,9 +135,10 @@ trait Future[+T] extends Awaitable[T] { * $callbackInContext */ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { - case Failure(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t) + case Failure(t) => + callback.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable]) // Exploiting the cached function to avoid MatchError case _ => - }(executor) + } /** When this future is completed, either through an exception, or a value, * apply the provided function. @@ -186,13 +188,12 @@ trait Future[+T] extends Awaitable[T] { * and throws a corresponding exception if the original future fails. */ def failed: Future[Throwable] = { + implicit val ec = internalExecutor val p = Promise[Throwable]() - onComplete { case Failure(t) => p success t case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) } - p.future } @@ -203,10 +204,7 @@ trait Future[+T] extends Awaitable[T] { * * Will not be called if the future fails. */ - def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { - case Success(r) => f(r) - case _ => // do nothing - }(executor) + def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f } /** Creates a new future by applying the 's' function to the successful result of * this future, or the 'f' function to the failed result. If there is any non-fatal @@ -221,19 +219,11 @@ trait Future[+T] extends Awaitable[T] { */ def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = { val p = Promise[S]() - + // transform on Try has the wrong shape for us here onComplete { - case result => - try { - result match { - case Failure(t) => p failure f(t) - case Success(r) => p success s(r) - } - } catch { - case NonFatal(t) => p failure t - } - }(executor) - + case Success(r) => p complete Try(s(r)) + case Failure(t) => p complete Try(throw f(t)) // will throw fatal errors! + } p.future } @@ -245,19 +235,7 @@ trait Future[+T] extends Awaitable[T] { */ def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) val p = Promise[S]() - - onComplete { - case result => - try { - result match { - case Success(r) => p success f(r) - case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - } - } catch { - case NonFatal(t) => p failure t - } - }(executor) - + onComplete { v => p complete (v map f) } p.future } @@ -270,20 +248,10 @@ trait Future[+T] extends Awaitable[T] { */ def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { val p = Promise[S]() - onComplete { case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(v) => - try { - f(v).onComplete({ - case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(v) => p success v - })(internalExecutor) - } catch { - case NonFatal(t) => p failure t - } - }(executor) - + case Success(v) => try f(v) onComplete p.complete catch { case NonFatal(t) => p failure t } + } p.future } @@ -303,34 +271,14 @@ trait Future[+T] extends Awaitable[T] { * Await.result(h, Duration.Zero) // throw a NoSuchElementException * }}} */ - def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = { - val p = Promise[T]() - - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[T]] - case Success(v) => - try { - if (pred(v)) p success v - else p failure new NoSuchElementException("Future.filter predicate is not satisfied") - } catch { - case NonFatal(t) => p failure t - } - }(executor) - - p.future - } + def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] = + map { + r => if (pred(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied") + } /** Used by for-comprehensions. */ final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor) - // final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) - - // final class FutureWithFilter[+S](self: Future[S], p: S => Boolean) { - // def foreach(f: S => Unit): Unit = self filter p foreach f - // def map[R](f: S => R) = self filter p map f - // def flatMap[R](f: S => Future[R]) = self filter p flatMap f - // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x)) - // } /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value. * @@ -352,22 +300,10 @@ trait Future[+T] extends Awaitable[T] { * Await.result(h, Duration.Zero) // throw a NoSuchElementException * }}} */ - def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = { - val p = Promise[S]() - - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(v) => - try { - if (pf.isDefinedAt(v)) p success pf(v) - else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) - } catch { - case NonFatal(t) => p failure t - } - }(executor) - - p.future - } + def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] = + map { + r => pf.applyOrElse(r, (t: T) => throw new NoSuchElementException("Future.collect partial function is not defined at: " + t)) + } /** Creates a new future that will handle any matching throwable that this * future might contain. If there is no match, or if this future contains @@ -383,9 +319,7 @@ trait Future[+T] extends Awaitable[T] { */ def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = { val p = Promise[U]() - - onComplete { case tr => p.complete(tr recover pf) }(executor) - + onComplete { v => p complete (v recover pf) } p.future } @@ -404,17 +338,10 @@ trait Future[+T] extends Awaitable[T] { */ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = { val p = Promise[U]() - onComplete { - case Failure(t) if pf isDefinedAt t => - try { - p completeWith pf(t) - } catch { - case NonFatal(t) => p failure t - } - case otherwise => p complete otherwise - }(executor) - + case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this) onComplete p.complete catch { case NonFatal(t) => p failure t } + case other => p complete other + } p.future } @@ -427,19 +354,12 @@ trait Future[+T] extends Awaitable[T] { * with the throwable stored in `that`. */ def zip[U](that: Future[U]): Future[(T, U)] = { + implicit val ec = internalExecutor val p = Promise[(T, U)]() - - this onComplete { + onComplete { case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]] - case Success(r) => - that onSuccess { - case r2 => p success ((r, r2)) - } - that onFailure { - case f => p failure f - } + case Success(s) => that onComplete { c => p.complete(c map { s2 => (s, s2) }) } } - p.future } @@ -458,6 +378,7 @@ trait Future[+T] extends Awaitable[T] { * }}} */ def fallbackTo[U >: T](that: Future[U]): Future[U] = { + implicit val ec = internalExecutor val p = Promise[U]() onComplete { case s @ Success(_) => p complete s @@ -470,23 +391,13 @@ trait Future[+T] extends Awaitable[T] { * that conforms to `S`'s erased type or a `ClassCastException` otherwise. */ def mapTo[S](implicit tag: ClassTag[S]): Future[S] = { - def boxedType(c: Class[_]): Class[_] = { + implicit val ec = internalExecutor + val boxedClass = { + val c = tag.runtimeClass if (c.isPrimitive) Future.toBoxed(c) else c } - - val p = Promise[S]() - - onComplete { - case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] - case Success(t) => - p complete (try { - Success(boxedType(tag.runtimeClass).cast(t).asInstanceOf[S]) - } catch { - case e: ClassCastException => Failure(e) - }) - } - - p.future + require(boxedClass ne null) + map(s => boxedClass.cast(s).asInstanceOf[S]) } /** Applies the side-effecting function to the result of this future, and returns @@ -514,11 +425,9 @@ trait Future[+T] extends Awaitable[T] { */ def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() - onComplete { - case r => try if (pf isDefinedAt r) pf(r) finally p complete r - }(executor) - + case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r + } p.future } @@ -579,14 +488,12 @@ object Future { } map (_.result) } - /** Returns a `Future` to the result of the first future in the list that is completed. + /** Returns a new `Future` to the result of the first future in the list that is completed. */ def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val p = Promise[T]() - val completeFirst: Try[T] => Unit = p tryComplete _ - futures.foreach(_ onComplete completeFirst) - + futures foreach { _ onComplete completeFirst } p.future } @@ -626,7 +533,7 @@ object Future { * }}} */ def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { - if (futures.isEmpty) Promise.successful(zero).future + if (futures.isEmpty) Future.successful(zero) else sequence(futures).map(_.foldLeft(zero)(foldFun)) } @@ -638,7 +545,7 @@ object Future { * }}} */ def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = { - if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")).future + if (futures.isEmpty) Future.failed(new NoSuchElementException("reduce attempted on empty collection")) else sequence(futures).map(_ reduceLeft op) } -- cgit v1.2.3