From 1b6468cd6178249919885847ae55bf3f33372a2a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Jul 2012 17:24:41 +0200 Subject: Squashed critical enhancements to SIP-14: daemonizing global EC, giving EC to DelayedLazyVal, removing currentExecutionContext, removing impl.Future.isFutureThrowable, implementing asExecutionContext, giving a decent fallback if ForkJoinPool cannot be created --- .../scala/concurrent/ConcurrentPackageObject.scala | 66 ------------------- src/library/scala/concurrent/DelayedLazyVal.scala | 9 +-- .../scala/concurrent/ExecutionContext.scala | 5 -- src/library/scala/concurrent/Future.scala | 2 +- src/library/scala/concurrent/JavaConversions.scala | 12 +++- .../concurrent/default/TaskImpl.scala.disabled | 4 +- .../concurrent/impl/ExecutionContextImpl.scala | 75 ++++++++++++++-------- src/library/scala/concurrent/impl/Future.scala | 15 +---- src/library/scala/concurrent/impl/Promise.scala | 3 +- src/library/scala/concurrent/package.scala | 51 +++++++++++++-- 10 files changed, 110 insertions(+), 132 deletions(-) delete mode 100644 src/library/scala/concurrent/ConcurrentPackageObject.scala diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala deleted file mode 100644 index 86a86966ef..0000000000 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - -import java.util.concurrent.{ Executors, Executor, ThreadFactory } -import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread } -import scala.concurrent.util.Duration -import language.implicitConversions - - -/** This package object contains primitives for concurrent and parallel programming. - */ -abstract class ConcurrentPackageObject { - - /* concurrency constructs */ - - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. - * - * The result becomes available once the asynchronous computation is completed. - * - * @tparam T the type of the result - * @param body the asychronous computation - * @param execctx the execution context on which the future is run - * @return the `Future` holding the result of the computation - */ - def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body) - - /** Creates a promise object which can be completed with a value. - * - * @tparam T the type of the value in the promise - * @param execctx the execution context on which the promise is created on - * @return the newly created `Promise` object - */ - def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]() - - /** Used to block on a piece of code which potentially blocks. - * - * @param body A piece of code which contains potentially blocking or long running calls. - * - * Calling this method may throw the following exceptions: - * - CancellationException - if the computation was cancelled - * - InterruptedException - in the case that a wait within the blockable object was interrupted - * - TimeoutException - in the case that the blockable object timed out - */ - def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf) - - /** Blocks on an awaitable object. - * - * @param awaitable An object with a `block` method which runs potentially blocking or long running calls. - * - * Calling this method may throw the following exceptions: - * - CancellationException - if the computation was cancelled - * - InterruptedException - in the case that a wait within the blockable object was interrupted - * - TimeoutException - in the case that the blockable object timed out - */ - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = - BlockContext.current.internalBlockingCall(awaitable, atMost) - - @inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x) -} diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala index 91e41748f5..6d262ea9a2 100644 --- a/src/library/scala/concurrent/DelayedLazyVal.scala +++ b/src/library/scala/concurrent/DelayedLazyVal.scala @@ -23,7 +23,7 @@ package scala.concurrent * @author Paul Phillips * @version 2.8 */ -class DelayedLazyVal[T](f: () => T, body: => Unit){ +class DelayedLazyVal[T](f: () => T, body: => Unit)(implicit exec: ExecutionContext){ @volatile private[this] var _isDone = false private[this] lazy val complete = f() @@ -39,10 +39,5 @@ class DelayedLazyVal[T](f: () => T, body: => Unit){ */ def apply(): T = if (isDone) complete else f() - // FIXME need to take ExecutionContext in constructor - import ExecutionContext.Implicits.global - future { - body - _isDone = true - } + exec.execute(new Runnable { def run = { body; _isDone = true } }) } diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index b486e5269e..debfc226db 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -43,11 +43,6 @@ trait ExecutionContextExecutorService extends ExecutionContextExecutor with Exec /** Contains factory methods for creating execution contexts. */ object ExecutionContext { - /** - * The `ExecutionContext` associated with the current `Thread` - */ - val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set - /** * This is the explicit global ExecutionContext, * call this when you want to provide the global ExecutionContext explicitly diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 75a83d6ef8..e556be4fe3 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -136,7 +136,7 @@ trait Future[+T] extends Awaitable[T] { * $callbackInContext */ def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete { - case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t) + case Left(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t) case _ => }(executor) diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala index 9b5e741549..ffb9926fef 100644 --- a/src/library/scala/concurrent/JavaConversions.scala +++ b/src/library/scala/concurrent/JavaConversions.scala @@ -50,8 +50,16 @@ object JavaConversions { } } - implicit def asExecutionContext(exec: ExecutorService): ExecutionContext = null // TODO + /** + * Creates a new `ExecutionContext` which uses the provided `ExecutorService`. + */ + implicit def asExecutionContext(exec: ExecutorService): ExecutionContextExecutorService = + ExecutionContext.fromExecutorService(exec) - implicit def asExecutionContext(exec: Executor): ExecutionContext = null // TODO + /** + * Creates a new `ExecutionContext` which uses the provided `Executor`. + */ + implicit def asExecutionContext(exec: Executor): ExecutionContextExecutor = + ExecutionContext.fromExecutor(exec) } diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled index 50753a7154..8b4eb12d4f 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala.disabled +++ b/src/library/scala/concurrent/default/TaskImpl.scala.disabled @@ -9,7 +9,7 @@ import scala.util.Try import scala.util import scala.concurrent.util.Duration import scala.annotation.tailrec - +import scala.util.control.NonFatal private[concurrent] trait Completable[T] { @@ -167,7 +167,7 @@ extends RecursiveAction with Task[T] with Future[T] with Completable[T] { val res = body processCallbacks(tryCompleteState(Success(res)), util.Success(res)) } catch { - case t if isFutureThrowable(t) => + case t if NonFatal(t) => processCallbacks(tryCompleteState(Failure(t)), util.Failure(t)) case t => val ee = new ExecutionException(t) diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 551a444425..ccac565eea 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -10,7 +10,7 @@ package scala.concurrent.impl -import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit } +import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor } import java.util.Collection import scala.concurrent.forkjoin._ import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService } @@ -27,48 +27,71 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: } // Implement BlockContext on FJP threads - def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory { - def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext { + class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { + def wire[T <: Thread](thread: T): T = { + thread.setDaemon(daemonic) + //Potentially set things like uncaught exception handler, name etc + thread + } + + def newThread(runnable: Runnable): Thread = wire(new Thread()) + + def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { var result: T = null.asInstanceOf[T] ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false def block(): Boolean = { - result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here? - isdone = true + result = try awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) finally { isdone = true } true } def isReleasable = isdone }) result } - } + }) } - def createExecutorService: ExecutorService = try { + def createExecutorService: ExecutorService = { + def getInt(name: String, f: String => Int): Int = - try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors } + try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors } def range(floor: Int, desired: Int, ceiling: Int): Int = if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling) + + val minThreads = getInt("scala.concurrent.ec.minThreads", _.toInt) + val maxThreads = getInt("scala.concurrent.ec.maxThreads", _.toInt) + val numThreads = getInt("scala.concurrent.ec.numThreads", { + case null | "" => Runtime.getRuntime.availableProcessors + case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt + case other => other.toInt + }) + + val desiredParallelism = range(minThreads, numThreads, maxThreads) + + val threadFactory = new DefaultThreadFactory(daemonic = true) - new ForkJoinPool( - range( - getInt("scala.concurrent.ec.minThreads", _.toInt), - getInt("scala.concurrent.ec.numThreads", { - case null | "" => Runtime.getRuntime.availableProcessors - case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt - case other => other.toInt - }), - getInt("scala.concurrent.ec.maxThreads", _.toInt) - ), - forkJoinPoolThreadFactory, - null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does - true) //FIXME I really think this should be async... - } catch { - case NonFatal(t) => - System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool") - t.printStackTrace(System.err) - Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too? + try { + new ForkJoinPool( + desiredParallelism, + threadFactory, + null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does + true) // Async all the way baby + } catch { + case NonFatal(t) => + System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to ThreadPoolExecutor") + t.printStackTrace(System.err) + val exec = new ThreadPoolExecutor( + desiredParallelism, + desiredParallelism, + 5L, + TimeUnit.MINUTES, + new LinkedBlockingQueue[Runnable], + threadFactory + ) + exec.allowCoreThreadTimeOut(true) + exec + } } def execute(runnable: Runnable): Unit = executor match { diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 073e6c4c9f..0c031743db 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -46,26 +46,13 @@ private[concurrent] object Future { def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c - // TODO rename appropriately and make public - private[concurrent] def isFutureThrowable(t: Throwable) = t match { - case e: Error => false - case t: scala.util.control.ControlThrowable => false - case i: InterruptedException => false - case _ => true - } - private[impl] class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { promise complete { - try Right(body) catch { - case NonFatal(e) => - // Commenting out reporting for now, since it produces too much output in the tests - //executor.reportFailure(e) - Left(e) - } + try Right(body) catch { case NonFatal(e) => Left(e) } } } } diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 3ac34bef8a..ccfcd30c97 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -59,7 +59,7 @@ object Promise { /** Default promise implementation. */ class DefaultPromise[T] extends AbstractPromise with Promise[T] { self => - updateState(null, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU + updateState(null, Nil) // Start at "No callbacks" protected final def tryAwait(atMost: Duration): Boolean = { @tailrec @@ -80,7 +80,6 @@ object Promise { } else isCompleted } - //FIXME do not do this if there'll be no waiting awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index e8921ef531..76703bf081 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -8,17 +8,59 @@ package scala -import scala.util.{ Try, Success, Failure } import scala.concurrent.util.Duration /** This package object contains primitives for concurrent and parallel programming. */ -package object concurrent extends scala.concurrent.ConcurrentPackageObject { +package object concurrent { type ExecutionException = java.util.concurrent.ExecutionException type CancellationException = java.util.concurrent.CancellationException type TimeoutException = java.util.concurrent.TimeoutException + + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ + def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body) + + /** Creates a promise object which can be completed with a value. + * + * @tparam T the type of the value in the promise + * @param execctx the execution context on which the promise is created on + * @return the newly created `Promise` object + */ + def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]() + + /** Used to block on a piece of code which potentially blocks. + * + * @param body A piece of code which contains potentially blocking or long running calls. + * + * Calling this method may throw the following exceptions: + * - CancellationException - if the computation was cancelled + * - InterruptedException - in the case that a wait within the blockable object was interrupted + * - TimeoutException - in the case that the blockable object timed out + */ + def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf) + + /** Blocks on an awaitable object. + * + * @param awaitable An object with a `block` method which runs potentially blocking or long running calls. + * + * Calling this method may throw the following exceptions: + * - CancellationException - if the computation was cancelled + * - InterruptedException - in the case that a wait within the blockable object was interrupted + * - TimeoutException - in the case that the blockable object timed out + */ + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = + BlockContext.current.internalBlockingCall(awaitable, atMost) } +/* concurrency constructs */ package concurrent { sealed trait CanAwait @@ -36,9 +78,4 @@ package concurrent { } } - - final class DurationOps private[concurrent] (x: Int) { - // TODO ADD OTHERS - def ns = util.Duration.fromNanos(0) - } } -- cgit v1.2.3 From 47652e692a5bdcbc18de2881e86267d37757751d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Jul 2012 18:11:53 +0200 Subject: Fixing oversight in propagating the runnable into the newly created thread. --- src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index ccac565eea..6c4145e2d5 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -34,7 +34,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: thread } - def newThread(runnable: Runnable): Thread = wire(new Thread()) + def newThread(runnable: Runnable): Thread = wire(new Thread(runnable)) def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { -- cgit v1.2.3 From 7e0b9e8effaebbb6413e4cc63789b8f86428a684 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Jul 2012 18:16:31 +0200 Subject: Changing to scala.concurrent.context. as namespace for the system properties for the global execution context --- src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 6c4145e2d5..98f821652f 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -59,15 +59,14 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: def range(floor: Int, desired: Int, ceiling: Int): Int = if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling) - val minThreads = getInt("scala.concurrent.ec.minThreads", _.toInt) - val maxThreads = getInt("scala.concurrent.ec.maxThreads", _.toInt) - val numThreads = getInt("scala.concurrent.ec.numThreads", { + val desiredParallelism = range( + getInt("scala.concurrent.context.minThreads", _.toInt), + getInt("scala.concurrent.context.numThreads", { case null | "" => Runtime.getRuntime.availableProcessors case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt case other => other.toInt - }) - - val desiredParallelism = range(minThreads, numThreads, maxThreads) + }), + getInt("scala.concurrent.context.maxThreads", _.toInt)) val threadFactory = new DefaultThreadFactory(daemonic = true) -- cgit v1.2.3