diff options
author | phaller <hallerp@gmail.com> | 2012-05-17 00:20:45 +0200 |
---|---|---|
committer | phaller <hallerp@gmail.com> | 2012-05-17 00:20:45 +0200 |
commit | 2a36246342c17044bf5aafbf71fe1f2147ffe60a (patch) | |
tree | 68ae9d33231ffc7d3699e9ac51d6c2a759bc73c8 /src/library/scala/concurrent/impl | |
parent | 9fe251e16b93d4bdc8a496f3edce90ef2e207ee8 (diff) | |
download | scala-2a36246342c17044bf5aafbf71fe1f2147ffe60a.tar.gz scala-2a36246342c17044bf5aafbf71fe1f2147ffe60a.tar.bz2 scala-2a36246342c17044bf5aafbf71fe1f2147ffe60a.zip |
SIP-14: clean ups and fixes
Diffstat (limited to 'src/library/scala/concurrent/impl')
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 80 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 2 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 8 |
3 files changed, 52 insertions, 38 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 3ed960c7ab..1083a93439 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -10,18 +10,20 @@ package scala.concurrent.impl -import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory } +import java.util.concurrent.{ Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit } +import java.util.Collection import scala.concurrent.forkjoin._ import scala.concurrent.{ ExecutionContext, Awaitable } import scala.concurrent.util.Duration -private[scala] class ExecutionContextImpl(es: AnyRef, reporter: Throwable => Unit = ExecutionContext.defaultReporter) -extends ExecutionContext with Executor { - import ExecutionContextImpl._ - - val executorService: AnyRef = if (es eq null) getExecutorService else es +private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContext with Executor { + + val executor: Executor = es match { + case null => createExecutorService + case some => some + } // to ensure that the current execution context thread local is properly set def executorsThreadFactory = new ThreadFactory { @@ -42,51 +44,46 @@ extends ExecutionContext with Executor { } } - def getExecutorService: AnyRef = - if (scala.util.Properties.isJavaAtLeast("1.6")) { - val vendor = scala.util.Properties.javaVmVendor - if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) - new ForkJoinPool( - Runtime.getRuntime.availableProcessors(), + def createExecutorService: ExecutorService = try { new ForkJoinPool( + Runtime.getRuntime.availableProcessors(), //FIXME from config forkJoinPoolThreadFactory, - null, - false) - else - Executors.newCachedThreadPool(executorsThreadFactory) - } else Executors.newCachedThreadPool(executorsThreadFactory) + null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does + true) //FIXME I really think this should be async... + } catch { + case NonFatal(t) => + System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool") + t.printStackTrace(System.err) + Executors.newCachedThreadPool(executorsThreadFactory) + } - def execute(runnable: Runnable): Unit = executorService match { + def execute(runnable: Runnable): Unit = executor match { case fj: ForkJoinPool => Thread.currentThread match { case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => - val fjtask = runnable match { + (runnable match { case fjt: ForkJoinTask[_] => fjt case _ => ForkJoinTask.adapt(runnable) - } - fjtask.fork - case _ => - fj.execute(runnable) + }).fork + case _ => fj.execute(runnable) } - case executor: Executor => - executor execute runnable + case generic => generic execute runnable } def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { Future.releaseStack(this) - executorService match { + executor match { case fj: ForkJoinPool => var result: T = null.asInstanceOf[T] - val managedBlocker = new ForkJoinPool.ManagedBlocker { + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false - def block() = { - result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) + def block(): Boolean = { + result = awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) // FIXME what happens if there's an exception thrown here? isdone = true true } def isReleasable = isdone - } - ForkJoinPool.managedBlock(managedBlocker) + }) result case _ => awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) @@ -94,12 +91,29 @@ extends ExecutionContext with Executor { } def reportFailure(t: Throwable) = reporter(t) - } private[concurrent] object ExecutionContextImpl { - + + def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) + def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutorService = + new ExecutionContextImpl(es, reporter) with ExecutorService { + final def asExecutorService: ExecutorService = executor.asInstanceOf[ExecutorService] + override def execute(command: Runnable) = executor.execute(command) + override def shutdown() { asExecutorService.shutdown() } + override def shutdownNow() = asExecutorService.shutdownNow() + override def isShutdown = asExecutorService.isShutdown + override def isTerminated = asExecutorService.isTerminated + override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit) + override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable) + override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t) + override def submit(runnable: Runnable) = asExecutorService.submit(runnable) + override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables) + override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit) + override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables) + override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit) + } } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index bf136b6195..a54e81bd05 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -65,6 +65,8 @@ private[concurrent] object Future { promise.future } + private[impl] val throwableId: Throwable => Throwable = identity _ + // an optimization for batching futures // TODO we should replace this with a public queue, // so that it can be stolen from diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 5a5b893f16..78053f5117 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -112,7 +112,7 @@ object Promise { } } - def onComplete[U](func: Either[Throwable, T] => U): this.type = { + def onComplete[U](func: Either[Throwable, T] => U): Unit = { @tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed def dispatchOrAddCallback(): Unit = getState match { @@ -120,7 +120,6 @@ object Promise { case listeners: List[_] => if (updateState(listeners, func :: listeners)) () else dispatchOrAddCallback() } dispatchOrAddCallback() - this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { @@ -144,10 +143,9 @@ object Promise { def tryComplete(value: Either[Throwable, T]): Boolean = false - def onComplete[U](func: Either[Throwable, T] => U): this.type = { - val completedAs = value.get + def onComplete[U](func: Either[Throwable, T] => U): Unit = { + val completedAs = value.get // Avoid closing over "this" Future.dispatchFuture(executor, () => func(completedAs)) - this } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this |