From 2a36246342c17044bf5aafbf71fe1f2147ffe60a Mon Sep 17 00:00:00 2001 From: phaller Date: Thu, 17 May 2012 00:20:45 +0200 Subject: SIP-14: clean ups and fixes --- src/library/scala/collection/parallel/Tasks.scala | 2 +- .../scala/concurrent/ConcurrentPackageObject.scala | 4 +- .../scala/concurrent/ExecutionContext.scala | 6 +- src/library/scala/concurrent/Future.scala | 82 ++++++++++++++-------- .../concurrent/impl/ExecutionContextImpl.scala | 80 ++++++++++++--------- src/library/scala/concurrent/impl/Future.scala | 2 + src/library/scala/concurrent/impl/Promise.scala | 8 +-- 7 files changed, 113 insertions(+), 71 deletions(-) (limited to 'src') diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 4a581f219e..7a0116b3b3 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -541,7 +541,7 @@ trait ExecutionContextTasks extends Tasks { // this part is a hack which allows switching val driver: Tasks = executionContext match { - case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match { + case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match { case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp) case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe) case _ => ??? diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index c3c329121c..330a2f0e25 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -8,7 +8,7 @@ package scala.concurrent -import java.util.concurrent.{ Executors, ExecutorService, ThreadFactory } +import java.util.concurrent.{ Executors, Executor, ThreadFactory } import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread } import scala.concurrent.util.Duration import language.implicitConversions @@ -19,7 +19,7 @@ import language.implicitConversions abstract class ConcurrentPackageObject { /** A global execution environment for executing lightweight tasks. */ - lazy val defaultExecutionContext = new impl.ExecutionContextImpl(null) + lazy val defaultExecutionContext: ExecutionContext with Executor = impl.ExecutionContextImpl.fromExecutor(null: Executor) val currentExecutionContext = new ThreadLocal[ExecutionContext] diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index d2a2d5e8a8..436a17a33b 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -47,11 +47,13 @@ object ExecutionContext { /** Creates an `ExecutionContext` from the given `ExecutorService`. */ - def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = new impl.ExecutionContextImpl(e, reporter) + def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit = defaultReporter): ExecutionContext with ExecutorService = + impl.ExecutionContextImpl.fromExecutorService(e, reporter) /** Creates an `ExecutionContext` from the given `Executor`. */ - def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = new impl.ExecutionContextImpl(e, reporter) + def fromExecutor(e: Executor, reporter: Throwable => Unit = defaultReporter): ExecutionContext with Executor = + impl.ExecutionContextImpl.fromExecutor(e, reporter) def defaultReporter: Throwable => Unit = { // re-throwing `Error`s here causes an exception handling test to fail. diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 2f90afe056..4df2bb63af 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -96,9 +96,9 @@ trait Future[+T] extends Awaitable[T] { * * $multipleCallbacks */ - def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete { - case Left(t) => // do nothing - case Right(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ } + def onSuccess[U](pf: PartialFunction[T, U]): Unit = onComplete { + case Right(v) if pf isDefinedAt v => pf(v) + case _ => } /** When this future is completed with a failure (i.e. with a throwable), @@ -113,9 +113,9 @@ trait Future[+T] extends Awaitable[T] { * * $multipleCallbacks */ - def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete { - case Left(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ } - case Right(v) => // do nothing + def onFailure[U](callback: PartialFunction[Throwable, U]): Unit = onComplete { + case Left(t) if (isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t) + case _ => } /** When this future is completed, either through an exception, or a value, @@ -126,7 +126,7 @@ trait Future[+T] extends Awaitable[T] { * * $multipleCallbacks */ - def onComplete[U](func: Either[Throwable, T] => U): this.type + def onComplete[U](func: Either[Throwable, T] => U): Unit /* Miscellaneous */ @@ -169,7 +169,7 @@ trait Future[+T] extends Awaitable[T] { onComplete { case Left(t) => p success t - case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)) + case Right(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable.")) } p.future @@ -184,7 +184,36 @@ trait Future[+T] extends Awaitable[T] { */ def foreach[U](f: T => U): Unit = onComplete { case Right(r) => f(r) - case Left(_) => // do nothing + case _ => // do nothing + } + + /** Creates a new future by applying the 's' function to the successful result of + * this future, or the 'f' function to the failed result. If there is any non-fatal + * exception thrown when 's' or 'f' is applied, that exception will be propagated + * to the resulting future. + * + * @param s function that transforms a successful result of the receiver into a + * successful result of the returned future + * @param f function that transforms a failure of the receiver into a failure of + * the returned future + * @return a future that will be completed with the transformed value + */ + def transform[S](s: T => S, f: Throwable => Throwable): Future[S] = { + val p = Promise[S]() + + onComplete { + case result => + try { + result match { + case Left(t) => p failure f(t) + case Right(r) => p success s(r) + } + } catch { + case NonFatal(t) => p failure t + } + } + + p.future } /** Creates a new future by applying a function to the successful result of @@ -193,14 +222,17 @@ trait Future[+T] extends Awaitable[T] { * * $forComprehensionExamples */ - def map[S](f: T => S): Future[S] = { + def map[S](f: T => S): Future[S] = { // transform(f, identity) val p = Promise[S]() onComplete { - case Left(t) => p failure t - case Right(v) => - try p success f(v) - catch { + case result => + try { + result match { + case Right(r) => p success f(r) + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] + } + } catch { case NonFatal(t) => p failure t } } @@ -219,11 +251,11 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[S]() onComplete { - case Left(t) => p failure t + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] case Right(v) => try { f(v) onComplete { - case Left(t) => p failure t + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] case Right(v) => p success v } } catch { @@ -254,7 +286,7 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[T]() onComplete { - case Left(t) => p failure t + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, T]] case Right(v) => try { if (pred(v)) p success v @@ -303,7 +335,7 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[S]() onComplete { - case Left(t) => p failure t + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] case Right(v) => try { if (pf.isDefinedAt(v)) p success pf(v) @@ -384,7 +416,7 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[(T, U)]() this onComplete { - case Left(t) => p failure t + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, (T, U)]] case Right(r) => that onSuccess { case r2 => p success ((r, r2)) @@ -431,7 +463,7 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[S]() onComplete { - case l: Left[Throwable, _] => p complete l.asInstanceOf[Either[Throwable, S]] + case l: Left[_, _] => p complete l.asInstanceOf[Left[Throwable, S]] case Right(t) => p complete (try { Right(boxedType(tag.erasure).cast(t).asInstanceOf[S]) @@ -470,9 +502,7 @@ trait Future[+T] extends Awaitable[T] { val p = Promise[T]() onComplete { - case r => - try if (pf isDefinedAt r) pf(r) - finally p complete r + case r => try if (pf isDefinedAt r) pf(r) finally p complete r } p.future @@ -493,11 +523,7 @@ trait Future[+T] extends Awaitable[T] { */ def either[U >: T](that: Future[U]): Future[U] = { val p = Promise[U]() - - val completePromise: PartialFunction[Either[Throwable, U], _] = { - case Left(t) => p tryFailure t - case Right(v) => p trySuccess v - } + val completePromise: PartialFunction[Either[Throwable, U], _] = { case result => p tryComplete result } this onComplete completePromise that onComplete completePromise diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 3ed960c7ab..1083a93439 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -10,18 +10,20 @@ package scala.concurrent.impl -import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory } +import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit } +import java.util.Collection import scala.concurrent.forkjoin._ import scala.concurrent.{ ExecutionContext, Awaitable } import scala.concurrent.util.Duration -private[scala] class ExecutionContextImpl(es: AnyRef, reporter: Throwable => Unit = ExecutionContext.defaultReporter) -extends ExecutionContext with Executor { - import ExecutionContextImpl._ - - val executorService: AnyRef = if (es eq null) getExecutorService else es +private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor { + + val executor: Executor = es match { + case null => createExecutorService + case some => some + } // to ensure that the current execution context thread local is properly set def executorsThreadFactory = new ThreadFactory { @@ -42,51 +44,46 @@ extends ExecutionContext with Executor { } } - def getExecutorService: AnyRef = - if (scala.util.Properties.isJavaAtLeast("1.6")) { - val vendor = scala.util.Properties.javaVmVendor - if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) - new ForkJoinPool( - Runtime.getRuntime.availableProcessors(), + def createExecutorService: ExecutorService = try { new ForkJoinPool( + Runtime.getRuntime.availableProcessors(), //FIXME from config forkJoinPoolThreadFactory, - null, - false) - else - Executors.newCachedThreadPool(executorsThreadFactory) - } else Executors.newCachedThreadPool(executorsThreadFactory) + null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does + true) //FIXME I really think this should be async... + } catch { + case NonFatal(t) => + System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool") + t.printStackTrace(System.err) + Executors.newCachedThreadPool(executorsThreadFactory) + } - def execute(runnable: Runnable): Unit = executorService match { + def execute(runnable: Runnable): Unit = executor match { case fj: ForkJoinPool => Thread.currentThread match { case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => - val fjtask = runnable match { + (runnable match { case fjt: ForkJoinTask[_] => fjt case _ => ForkJoinTask.adapt(runnable) - } - fjtask.fork - case _ => - fj.execute(runnable) + }).fork + case _ => fj.execute(runnable) } - case executor: Executor => - executor execute runnable + case generic => generic execute runnable } def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { Future.releaseStack(this) - executorService match { + executor match { case fj: ForkJoinPool => var result: T = null.asInstanceOf[T] - val managedBlocker = new ForkJoinPool.ManagedBlocker { + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false - def block() = { - result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) + def block(): Boolean = { + result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here? isdone = true true } def isReleasable = isdone - } - ForkJoinPool.managedBlock(managedBlocker) + }) result case _ => awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) @@ -94,12 +91,29 @@ extends ExecutionContext with Executor { } def reportFailure(t: Throwable) = reporter(t) - } private[concurrent] object ExecutionContextImpl { - + + def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) + def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService = + new ExecutionContextImpl(es, reporter) with ExecutorService { + final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService] + override def execute(command: Runnable) = executor.execute(command) + override def shutdown() { asExecutorService.shutdown() } + override def shutdownNow() = asExecutorService.shutdownNow() + override def isShutdown = asExecutorService.isShutdown + override def isTerminated = asExecutorService.isTerminated + override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit) + override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable) + override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t) + override def submit(runnable: Runnable) = asExecutorService.submit(runnable) + override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables) + override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit) + override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables) + override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit) + } } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index bf136b6195..a54e81bd05 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -65,6 +65,8 @@ private[concurrent] object Future { promise.future } + private[impl] val throwableId: Throwable => Throwable = identity _ + // an optimization for batching futures // TODO we should replace this with a public queue, // so that it can be stolen from diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 5a5b893f16..78053f5117 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -112,7 +112,7 @@ object Promise { } } - def onComplete[U](func: Either[Throwable, T] => U): this.type = { + def onComplete[U](func: Either[Throwable, T] => U): Unit = { @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { @@ -120,7 +120,6 @@ object Promise { case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() - this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { @@ -144,10 +143,9 @@ object Promise { def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] => U): this.type = { - val completedAs = value.get + def onComplete[U](func: Either[Throwable, T] => U): Unit = { + val completedAs = value.get // Avoid closing over "this" Future.dispatchFuture(executor, () => func(completedAs)) - this } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this -- cgit v1.2.3