From c7a71c2d5c5afbb3dc047bca20c4b8c72e5c94c9 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 12 Apr 2012 20:04:57 +0200 Subject: Making changes in the scala.concurrent package. --- .../scala/concurrent/ConcurrentPackageObject.scala | 60 ++++++++++++---------- .../scala/concurrent/ExecutionContext.scala | 21 ++++---- src/library/scala/concurrent/Future.scala | 9 ++++ src/library/scala/concurrent/Promise.scala | 18 +++++-- .../concurrent/impl/ExecutionContextImpl.scala | 20 ++++---- src/library/scala/concurrent/impl/Future.scala | 13 ++++- src/library/scala/concurrent/impl/Promise.scala | 8 +-- 7 files changed, 96 insertions(+), 53 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index d185ade8a4..789738e6ec 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -36,34 +36,42 @@ abstract class ConcurrentPackageObject { case _ => true } - private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { - case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T]) - case Left(t: scala.util.control.ControlThrowable) => Left(new ExecutionException("Boxed ControlThrowable", t)) - case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t)) - case Left(e: Error) => Left(new ExecutionException("Boxed Error", e)) - case _ => source + private[concurrent] def resolveEither[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { + case Left(t) => resolver(t) + case _ => source } - private[concurrent] def resolver[T] = - resolverFunction.asInstanceOf[PartialFunction[Throwable, Either[Throwable, T]]] - + private[concurrent] def resolver[T](throwable: Throwable): Either[Throwable, T] = throwable match { + case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value.asInstanceOf[T]) + case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) + case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) + case e: Error => Left(new ExecutionException("Boxed Error", e)) + case t => Left(t) + } + /* concurrency constructs */ + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] = Future[T](body) + /** Creates a promise object which can be completed with a value. + * + * @tparam T the type of the value in the promise + * @param execctx the execution context on which the promise is created on + * @return the newly created `Promise` object + */ def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] = Promise[T]() - /** Wraps a block of code into an awaitable object. */ - def body2awaitable[T](body: =>T) = new Awaitable[T] { - def ready(atMost: Duration)(implicit permit: CanAwait) = { - body - this - } - def result(atMost: Duration)(implicit permit: CanAwait) = body - } - /** Used to block on a piece of code which potentially blocks. * * @param body A piece of code which contains potentially blocking or long running calls. @@ -74,7 +82,7 @@ abstract class ConcurrentPackageObject { * - TimeoutException - in the case that the blockable object timed out */ def blocking[T](body: =>T): T = - blocking(body2awaitable(body), Duration.fromNanos(0)) + blocking(impl.Future.body2awaitable(body), Duration.fromNanos(0)) /** Blocks on an awaitable object. * @@ -100,11 +108,11 @@ private[concurrent] object ConcurrentPackageObject { // compiling a subset of sources; it seems that the wildcard is not // properly handled, and you get messages like "type _$1 defined twice". // This is consistent with other package object breakdowns. - private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = { - case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value) - case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) - case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) - case e: Error => Left(new ExecutionException("Boxed Error", e)) - case t => Left(t) - } + // private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = { + // case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value) + // case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) + // case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) + // case e: Error => Left(new ExecutionException("Boxed Error", e)) + // case t => Left(t) + // } } diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index e1d4276396..3f62f58bf8 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -20,19 +20,22 @@ import collection._ trait ExecutionContext { - + + /** Runs a block of code on this execution context. + */ def execute(runnable: Runnable): Unit - - def execute[U](body: () => U): Unit - + + /** Used internally by the framework - blocks execution for at most `atMost` time while waiting + * for an `awaitable` object to become ready. + * + * Clients should use `scala.concurrent.blocking` instead. + */ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T + /** Reports that an asynchronous computation failed. + */ def reportFailure(t: Throwable): Unit - - /* implementations follow */ - - private implicit val executionContext = this - + } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 5bc9ad783f..1463dbcebf 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -512,6 +512,15 @@ trait Future[+T] extends Awaitable[T] { */ object Future { + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) import scala.collection.mutable.Builder diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index 8f2bce5d1a..cd22a55ce7 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -107,15 +107,27 @@ trait Promise[T] { object Promise { - /** Creates a new promise. + /** Creates a promise object which can be completed with a value. + * + * @tparam T the type of the value in the promise + * @param execctx the execution context on which the promise is created on + * @return the newly created `Promise` object */ def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]() - /** Creates an already completed Promise with the specified exception + /** Creates an already completed Promise with the specified exception. + * + * @tparam T the type of the value in the promise + * @param execctx the execution context on which the promise is created on + * @return the newly created `Promise` object */ def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception)) - /** Creates an already completed Promise with the specified result + /** Creates an already completed Promise with the specified result. + * + * @tparam T the type of the value in the promise + * @param execctx the execution context on which the promise is created on + * @return the newly created `Promise` object */ def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Right(result)) diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index c5062267dc..d15a9b828b 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -12,7 +12,7 @@ package scala.concurrent.impl import java.util.concurrent.{Callable, Executor, ExecutorService, Executors, ThreadFactory} import scala.concurrent.forkjoin._ -import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} +import scala.concurrent.{ExecutionContext, resolver, Awaitable} import scala.concurrent.util.{ Duration } @@ -56,20 +56,20 @@ private[scala] class ExecutionContextImpl(es: AnyRef) extends ExecutionContext w def execute(runnable: Runnable): Unit = executorService match { case fj: ForkJoinPool => - if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { - val fjtask = ForkJoinTask.adapt(runnable) - fjtask.fork - } else { - fj.execute(runnable) + Thread.currentThread match { + case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => + val fjtask = runnable match { + case fjt: ForkJoinTask[_] => fjt + case _ => ForkJoinTask.adapt(runnable) + } + fjtask.fork + case _ => + fj.execute(runnable) } case executor: Executor => executor execute runnable } - def execute[U](body: () => U): Unit = execute(new Runnable { - def run() = body() - }) - def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { Future.releaseStack(this) diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 615ab061a5..a3c8ed3095 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -10,9 +10,11 @@ package scala.concurrent.impl -import scala.concurrent.{Awaitable, ExecutionContext} +import scala.concurrent.util.Duration +import scala.concurrent.{Awaitable, ExecutionContext, CanAwait} import scala.collection.mutable.Stack + private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { implicit def executor: ExecutionContext @@ -54,6 +56,15 @@ object Future { classOf[Unit] -> classOf[scala.runtime.BoxedUnit] ) + /** Wraps a block of code into an awaitable object. */ + private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] { + def ready(atMost: Duration)(implicit permit: CanAwait) = { + body + this + } + def result(atMost: Duration)(implicit permit: CanAwait) = body + } + def boxedType(c: Class[_]): Class[_] = { if (c.isPrimitive) toBoxed(c) else c } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index f7e073cb78..07b6d1f278 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -12,7 +12,7 @@ package scala.concurrent.impl import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.util.concurrent.atomic.AtomicReferenceFieldUpdater -import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException} +import scala.concurrent.{Awaitable, ExecutionContext, resolveEither, resolver, blocking, CanAwait, TimeoutException} //import scala.util.continuations._ import scala.concurrent.util.Duration import scala.util @@ -126,7 +126,7 @@ object Promise { value.isDefined } - blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) + blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = @@ -166,7 +166,7 @@ object Promise { case _ => null } } - tryComplete(resolve(value)) + tryComplete(resolveEither(value)) } finally { synchronized { notifyAll() } // notify any blockers from `tryAwait` } @@ -220,7 +220,7 @@ object Promise { */ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { - val value = Some(resolve(suppliedValue)) + val value = Some(resolveEither(suppliedValue)) def tryComplete(value: Either[Throwable, T]): Boolean = false -- cgit v1.2.3