summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala66
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala9
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala5
-rw-r--r--src/library/scala/concurrent/Future.scala2
-rw-r--r--src/library/scala/concurrent/JavaConversions.scala12
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala.disabled4
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala74
-rw-r--r--src/library/scala/concurrent/impl/Future.scala15
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala3
-rw-r--r--src/library/scala/concurrent/package.scala51
10 files changed, 109 insertions, 132 deletions
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
deleted file mode 100644
index 86a86966ef..0000000000
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-package scala.concurrent
-
-import java.util.concurrent.{ Executors, Executor, ThreadFactory }
-import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread }
-import scala.concurrent.util.Duration
-import language.implicitConversions
-
-
-/** This package object contains primitives for concurrent and parallel programming.
- */
-abstract class ConcurrentPackageObject {
-
- /* concurrency constructs */
-
- /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
- *
- * The result becomes available once the asynchronous computation is completed.
- *
- * @tparam T the type of the result
- * @param body the asychronous computation
- * @param execctx the execution context on which the future is run
- * @return the `Future` holding the result of the computation
- */
- def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
-
- /** Creates a promise object which can be completed with a value.
- *
- * @tparam T the type of the value in the promise
- * @param execctx the execution context on which the promise is created on
- * @return the newly created `Promise` object
- */
- def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
-
- /** Used to block on a piece of code which potentially blocks.
- *
- * @param body A piece of code which contains potentially blocking or long running calls.
- *
- * Calling this method may throw the following exceptions:
- * - CancellationException - if the computation was cancelled
- * - InterruptedException - in the case that a wait within the blockable object was interrupted
- * - TimeoutException - in the case that the blockable object timed out
- */
- def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
-
- /** Blocks on an awaitable object.
- *
- * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
- *
- * Calling this method may throw the following exceptions:
- * - CancellationException - if the computation was cancelled
- * - InterruptedException - in the case that a wait within the blockable object was interrupted
- * - TimeoutException - in the case that the blockable object timed out
- */
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
- BlockContext.current.internalBlockingCall(awaitable, atMost)
-
- @inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
-}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 91e41748f5..6d262ea9a2 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -23,7 +23,7 @@ package scala.concurrent
* @author Paul Phillips
* @version 2.8
*/
-class DelayedLazyVal[T](f: () => T, body: => Unit){
+class DelayedLazyVal[T](f: () => T, body: => Unit)(implicit exec: ExecutionContext){
@volatile private[this] var _isDone = false
private[this] lazy val complete = f()
@@ -39,10 +39,5 @@ class DelayedLazyVal[T](f: () => T, body: => Unit){
*/
def apply(): T = if (isDone) complete else f()
- // FIXME need to take ExecutionContext in constructor
- import ExecutionContext.Implicits.global
- future {
- body
- _isDone = true
- }
+ exec.execute(new Runnable { def run = { body; _isDone = true } })
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index b486e5269e..debfc226db 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -44,11 +44,6 @@ trait ExecutionContextExecutorService extends ExecutionContextExecutor with Exec
*/
object ExecutionContext {
/**
- * The `ExecutionContext` associated with the current `Thread`
- */
- val currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal //FIXME might want to set the initial value to an executionContext that throws an exception on execute and warns that it's not set
-
- /**
* This is the explicit global ExecutionContext,
* call this when you want to provide the global ExecutionContext explicitly
*/
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 75a83d6ef8..e556be4fe3 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -136,7 +136,7 @@ trait Future[+T] extends Awaitable[T] {
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
- case Left(t) if (impl.Future.isFutureThrowable(t) && callback.isDefinedAt(t)) => callback(t)
+ case Left(t) if NonFatal(t) && callback.isDefinedAt(t) => callback(t)
case _ =>
}(executor)
diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala
index 9b5e741549..ffb9926fef 100644
--- a/src/library/scala/concurrent/JavaConversions.scala
+++ b/src/library/scala/concurrent/JavaConversions.scala
@@ -50,8 +50,16 @@ object JavaConversions {
}
}
- implicit def asExecutionContext(exec: ExecutorService): ExecutionContext = null // TODO
+ /**
+ * Creates a new `ExecutionContext` which uses the provided `ExecutorService`.
+ */
+ implicit def asExecutionContext(exec: ExecutorService): ExecutionContextExecutorService =
+ ExecutionContext.fromExecutorService(exec)
- implicit def asExecutionContext(exec: Executor): ExecutionContext = null // TODO
+ /**
+ * Creates a new `ExecutionContext` which uses the provided `Executor`.
+ */
+ implicit def asExecutionContext(exec: Executor): ExecutionContextExecutor =
+ ExecutionContext.fromExecutor(exec)
}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled
index 50753a7154..8b4eb12d4f 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala.disabled
+++ b/src/library/scala/concurrent/default/TaskImpl.scala.disabled
@@ -9,7 +9,7 @@ import scala.util.Try
import scala.util
import scala.concurrent.util.Duration
import scala.annotation.tailrec
-
+import scala.util.control.NonFatal
private[concurrent] trait Completable[T] {
@@ -167,7 +167,7 @@ extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
val res = body
processCallbacks(tryCompleteState(Success(res)), util.Success(res))
} catch {
- case t if isFutureThrowable(t) =>
+ case t if NonFatal(t) =>
processCallbacks(tryCompleteState(Failure(t)), util.Failure(t))
case t =>
val ee = new ExecutionException(t)
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 551a444425..98f821652f 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -10,7 +10,7 @@ package scala.concurrent.impl
-import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
+import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor }
import java.util.Collection
import scala.concurrent.forkjoin._
import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
@@ -27,48 +27,70 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
}
// Implement BlockContext on FJP threads
- def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
- def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) with BlockContext {
+ class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
+ def wire[T <: Thread](thread: T): T = {
+ thread.setDaemon(daemonic)
+ //Potentially set things like uncaught exception handler, name etc
+ thread
+ }
+
+ def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))
+
+ def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
var result: T = null.asInstanceOf[T]
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@volatile var isdone = false
def block(): Boolean = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
- isdone = true
+ result = try awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) finally { isdone = true }
true
}
def isReleasable = isdone
})
result
}
- }
+ })
}
- def createExecutorService: ExecutorService = try {
+ def createExecutorService: ExecutorService = {
+
def getInt(name: String, f: String => Int): Int =
- try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors }
+ try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors }
def range(floor: Int, desired: Int, ceiling: Int): Int =
if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling)
+
+ val desiredParallelism = range(
+ getInt("scala.concurrent.context.minThreads", _.toInt),
+ getInt("scala.concurrent.context.numThreads", {
+ case null | "" => Runtime.getRuntime.availableProcessors
+ case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
+ case other => other.toInt
+ }),
+ getInt("scala.concurrent.context.maxThreads", _.toInt))
+
+ val threadFactory = new DefaultThreadFactory(daemonic = true)
- new ForkJoinPool(
- range(
- getInt("scala.concurrent.ec.minThreads", _.toInt),
- getInt("scala.concurrent.ec.numThreads", {
- case null | "" => Runtime.getRuntime.availableProcessors
- case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
- case other => other.toInt
- }),
- getInt("scala.concurrent.ec.maxThreads", _.toInt)
- ),
- forkJoinPoolThreadFactory,
- null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does
- true) //FIXME I really think this should be async...
- } catch {
- case NonFatal(t) =>
- System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
- t.printStackTrace(System.err)
- Executors.newCachedThreadPool() //FIXME use the same desired parallelism here too?
+ try {
+ new ForkJoinPool(
+ desiredParallelism,
+ threadFactory,
+ null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does
+ true) // Async all the way baby
+ } catch {
+ case NonFatal(t) =>
+ System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to ThreadPoolExecutor")
+ t.printStackTrace(System.err)
+ val exec = new ThreadPoolExecutor(
+ desiredParallelism,
+ desiredParallelism,
+ 5L,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue[Runnable],
+ threadFactory
+ )
+ exec.allowCoreThreadTimeOut(true)
+ exec
+ }
}
def execute(runnable: Runnable): Unit = executor match {
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 073e6c4c9f..0c031743db 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -46,26 +46,13 @@ private[concurrent] object Future {
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
- // TODO rename appropriately and make public
- private[concurrent] def isFutureThrowable(t: Throwable) = t match {
- case e: Error => false
- case t: scala.util.control.ControlThrowable => false
- case i: InterruptedException => false
- case _ => true
- }
-
private[impl] class PromiseCompletingRunnable[T](body: => T)
extends Runnable {
val promise = new Promise.DefaultPromise[T]()
override def run() = {
promise complete {
- try Right(body) catch {
- case NonFatal(e) =>
- // Commenting out reporting for now, since it produces too much output in the tests
- //executor.reportFailure(e)
- Left(e)
- }
+ try Right(body) catch { case NonFatal(e) => Left(e) }
}
}
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 3ac34bef8a..ccfcd30c97 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -59,7 +59,7 @@ object Promise {
/** Default promise implementation.
*/
class DefaultPromise[T] extends AbstractPromise with Promise[T] { self =>
- updateState(null, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU
+ updateState(null, Nil) // Start at "No callbacks"
protected final def tryAwait(atMost: Duration): Boolean = {
@tailrec
@@ -80,7 +80,6 @@ object Promise {
} else
isCompleted
}
- //FIXME do not do this if there'll be no waiting
awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index e8921ef531..76703bf081 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -8,17 +8,59 @@
package scala
-import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.Duration
/** This package object contains primitives for concurrent and parallel programming.
*/
-package object concurrent extends scala.concurrent.ConcurrentPackageObject {
+package object concurrent {
type ExecutionException = java.util.concurrent.ExecutionException
type CancellationException = java.util.concurrent.CancellationException
type TimeoutException = java.util.concurrent.TimeoutException
+
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asychronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
+ def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)
+
+ /** Creates a promise object which can be completed with a value.
+ *
+ * @tparam T the type of the value in the promise
+ * @param execctx the execution context on which the promise is created on
+ * @return the newly created `Promise` object
+ */
+ def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
+
+ /** Used to block on a piece of code which potentially blocks.
+ *
+ * @param body A piece of code which contains potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
+
+ /** Blocks on an awaitable object.
+ *
+ * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
+ BlockContext.current.internalBlockingCall(awaitable, atMost)
}
+/* concurrency constructs */
package concurrent {
sealed trait CanAwait
@@ -36,9 +78,4 @@ package concurrent {
}
}
-
- final class DurationOps private[concurrent] (x: Int) {
- // TODO ADD OTHERS
- def ns = util.Duration.fromNanos(0)
- }
}