summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl
diff options
context:
space:
mode:
authorphaller <hallerp@gmail.com>2012-05-17 00:20:45 +0200
committerphaller <hallerp@gmail.com>2012-05-17 00:20:45 +0200
commit2a36246342c17044bf5aafbf71fe1f2147ffe60a (patch)
tree68ae9d33231ffc7d3699e9ac51d6c2a759bc73c8 /src/library/scala/concurrent/impl
parent9fe251e16b93d4bdc8a496f3edce90ef2e207ee8 (diff)
downloadscala-2a36246342c17044bf5aafbf71fe1f2147ffe60a.tar.gz
scala-2a36246342c17044bf5aafbf71fe1f2147ffe60a.tar.bz2
scala-2a36246342c17044bf5aafbf71fe1f2147ffe60a.zip
SIP-14: clean ups and fixes
Diffstat (limited to 'src/library/scala/concurrent/impl')
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala80
-rw-r--r--src/library/scala/concurrent/impl/Future.scala2
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala8
3 files changed, 52 insertions, 38 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 3ed960c7ab..1083a93439 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -10,18 +10,20 @@ package scala.concurrent.impl
-import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory }
+import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit }
+import java.util.Collection
import scala.concurrent.forkjoin._
import scala.concurrent.{ ExecutionContext, Awaitable }
import scala.concurrent.util.Duration
-private[scala] class ExecutionContextImpl(es: AnyRef, reporter: Throwable => Unit = ExecutionContext.defaultReporter)
-extends ExecutionContext with Executor {
- import ExecutionContextImpl._
-
- val executorService: AnyRef = if (es eq null) getExecutorService else es
+private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor {
+
+ val executor: Executor = es match {
+ case null => createExecutorService
+ case some => some
+ }
// to ensure that the current execution context thread local is properly set
def executorsThreadFactory = new ThreadFactory {
@@ -42,51 +44,46 @@ extends ExecutionContext with Executor {
}
}
- def getExecutorService: AnyRef =
- if (scala.util.Properties.isJavaAtLeast("1.6")) {
- val vendor = scala.util.Properties.javaVmVendor
- if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple"))
- new ForkJoinPool(
- Runtime.getRuntime.availableProcessors(),
+ def createExecutorService: ExecutorService = try { new ForkJoinPool(
+ Runtime.getRuntime.availableProcessors(), //FIXME from config
forkJoinPoolThreadFactory,
- null,
- false)
- else
- Executors.newCachedThreadPool(executorsThreadFactory)
- } else Executors.newCachedThreadPool(executorsThreadFactory)
+ null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does
+ true) //FIXME I really think this should be async...
+ } catch {
+ case NonFatal(t) =>
+ System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool")
+ t.printStackTrace(System.err)
+ Executors.newCachedThreadPool(executorsThreadFactory)
+ }
- def execute(runnable: Runnable): Unit = executorService match {
+ def execute(runnable: Runnable): Unit = executor match {
case fj: ForkJoinPool =>
Thread.currentThread match {
case fjw: ForkJoinWorkerThread if fjw.getPool eq fj =>
- val fjtask = runnable match {
+ (runnable match {
case fjt: ForkJoinTask[_] => fjt
case _ => ForkJoinTask.adapt(runnable)
- }
- fjtask.fork
- case _ =>
- fj.execute(runnable)
+ }).fork
+ case _ => fj.execute(runnable)
}
- case executor: Executor =>
- executor execute runnable
+ case generic => generic execute runnable
}
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
Future.releaseStack(this)
- executorService match {
+ executor match {
case fj: ForkJoinPool =>
var result: T = null.asInstanceOf[T]
- val managedBlocker = new ForkJoinPool.ManagedBlocker {
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@volatile var isdone = false
- def block() = {
- result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
+ def block(): Boolean = {
+ result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here?
isdone = true
true
}
def isReleasable = isdone
- }
- ForkJoinPool.managedBlock(managedBlocker)
+ })
result
case _ =>
awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
@@ -94,12 +91,29 @@ extends ExecutionContext with Executor {
}
def reportFailure(t: Throwable) = reporter(t)
-
}
private[concurrent] object ExecutionContextImpl {
-
+
+ def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
+ def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService =
+ new ExecutionContextImpl(es, reporter) with ExecutorService {
+ final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService]
+ override def execute(command: Runnable) = executor.execute(command)
+ override def shutdown() { asExecutorService.shutdown() }
+ override def shutdownNow() = asExecutorService.shutdownNow()
+ override def isShutdown = asExecutorService.isShutdown
+ override def isTerminated = asExecutorService.isTerminated
+ override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
+ override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
+ override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
+ override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
+ override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
+ override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
+ override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
+ override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
+ }
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index bf136b6195..a54e81bd05 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -65,6 +65,8 @@ private[concurrent] object Future {
promise.future
}
+ private[impl] val throwableId: Throwable => Throwable = identity _
+
// an optimization for batching futures
// TODO we should replace this with a public queue,
// so that it can be stolen from
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 5a5b893f16..78053f5117 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -112,7 +112,7 @@ object Promise {
}
}
- def onComplete[U](func: Either[Throwable, T] => U): this.type = {
+ def onComplete[U](func: Either[Throwable, T] => U): Unit = {
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
@@ -120,7 +120,6 @@ object Promise {
case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
- this
}
private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) {
@@ -144,10 +143,9 @@ object Promise {
def tryComplete(value: Either[Throwable, T]): Boolean = false
- def onComplete[U](func: Either[Throwable, T] => U): this.type = {
- val completedAs = value.get
+ def onComplete[U](func: Either[Throwable, T] => U): Unit = {
+ val completedAs = value.get // Avoid closing over "this"
Future.dispatchFuture(executor, () => func(completedAs))
- this
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this