diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2012-03-02 17:47:38 +0100 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2012-03-02 17:47:38 +0100 |
commit | 6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9 (patch) | |
tree | accab2ad11bccacf37ef5c66b1d1387292e20716 /src | |
parent | 66271b123807340632c24d3dc83bb833f411cf30 (diff) | |
download | scala-6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9.tar.gz scala-6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9.tar.bz2 scala-6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9.zip |
Adding execution context based task support implementation.
Parallel collections now get the execution context task support
which by default picks the execution context from the scala concurrent
package. This execution context task support forwards calls to
either a fork join task support or the thread pool task support.
Additionally, the default execution context now uses either a fork join pool
or a thread pool executor, depending on the JVM vendor and version.
Diffstat (limited to 'src')
4 files changed, 104 insertions, 51 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala index 20800250b4..fc99347316 100644 --- a/src/library/scala/collection/parallel/TaskSupport.scala +++ b/src/library/scala/collection/parallel/TaskSupport.scala @@ -11,15 +11,26 @@ package scala.collection.parallel - +import java.util.concurrent.ThreadPoolExecutor +import scala.concurrent.forkjoin.ForkJoinPool +import scala.concurrent.ExecutionContext trait TaskSupport extends Tasks -private[collection] class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks -private[collection] class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks +private[collection] class ForkJoinTaskSupport(val environment: ForkJoinPool = ForkJoinTasks.defaultForkJoinPool) +extends TaskSupport with AdaptiveWorkStealingForkJoinTasks + + +private[collection] class ThreadPoolTaskSupport(val environment: ThreadPoolExecutor = ThreadPoolTasks.defaultThreadPool) +extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks + + +private[collection] class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.executionContext) +extends TaskSupport with ExecutionContextTasks + diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index a7f2c586a7..60a8bb1ed6 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -10,7 +10,10 @@ package scala.collection.parallel +import java.util.concurrent.ThreadPoolExecutor + import scala.concurrent.forkjoin._ +import scala.concurrent.ExecutionContext import scala.util.control.Breaks._ import annotation.unchecked.uncheckedVariance @@ -101,11 +104,11 @@ trait Tasks { debugMessages += s } - trait TaskImpl[R, +Tp] { + trait WrappedTask[R, +Tp] { /** the body of this task - what it executes, how it gets split and how results are merged. */ val body: Task[R, Tp] - def split: Seq[TaskImpl[R, Tp]] + def split: Seq[WrappedTask[R, Tp]] /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ def compute() /** Start task. */ @@ -126,13 +129,10 @@ trait Tasks { def release() {} } - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] - /* task control */ - // safe to assume it will always have the same type, - // because the `tasksupport` in parallel iterable is final - var environment: AnyRef + /** The type of the environment is more specific in the implementations. */ + val environment: AnyRef /** Executes a task and returns a future. Forwards an exception if some task threw it. */ def execute[R, Tp](fjtask: Task[R, Tp]): () => R @@ -152,11 +152,11 @@ trait Tasks { */ trait AdaptiveWorkStealingTasks extends Tasks { - trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] { - @volatile var next: TaskImpl[R, Tp] = null + trait WrappedTask[R, Tp] extends super.WrappedTask[R, Tp] { + @volatile var next: WrappedTask[R, Tp] = null @volatile var shouldWaitFor = true - def split: Seq[TaskImpl[R, Tp]] + def split: Seq[WrappedTask[R, Tp]] def compute() = if (body.shouldSplitFurther) { internal() @@ -192,8 +192,8 @@ trait AdaptiveWorkStealingTasks extends Tasks { } def spawnSubtasks() = { - var last: TaskImpl[R, Tp] = null - var head: TaskImpl[R, Tp] = this + var last: WrappedTask[R, Tp] = null + var head: WrappedTask[R, Tp] = this do { val subtasks = head.split head = subtasks.head @@ -219,7 +219,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { } // specialize ctor - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] } @@ -228,7 +228,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { trait ThreadPoolTasks extends Tasks { import java.util.concurrent._ - trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] { // initially, this is null // once the task is started, this future is set and used for `sync` // utb: var future: Future[_] = null @@ -290,9 +290,9 @@ trait ThreadPoolTasks extends Tasks { } } - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] - var environment: AnyRef = ThreadPoolTasks.defaultThreadPool + val environment: ThreadPoolExecutor def executor = environment.asInstanceOf[ThreadPoolExecutor] def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]] @volatile var totaltasks = 0 @@ -306,7 +306,7 @@ trait ThreadPoolTasks extends Tasks { } def execute[R, Tp](task: Task[R, Tp]): () => R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing without wait: " + task) t.start() @@ -319,7 +319,7 @@ trait ThreadPoolTasks extends Tasks { } def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing with wait: " + task) t.start() @@ -359,10 +359,11 @@ object ThreadPoolTasks { /** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */ +@deprecated("This implementation is not used.") trait FutureThreadPoolTasks extends Tasks { import java.util.concurrent._ - trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] { @volatile var future: Future[_] = null def start() = { @@ -377,13 +378,13 @@ trait FutureThreadPoolTasks extends Tasks { } } - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] - var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool + val environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool def executor = environment.asInstanceOf[ThreadPoolExecutor] def execute[R, Tp](task: Task[R, Tp]): () => R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing without wait: " + task) t.start @@ -396,7 +397,7 @@ trait FutureThreadPoolTasks extends Tasks { } def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing with wait: " + task) t.start @@ -438,26 +439,26 @@ trait HavingForkJoinPool { */ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { - trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] { + trait WrappedTask[R, +Tp] extends RecursiveAction with super.WrappedTask[R, Tp] { def start() = fork def sync() = join def tryCancel = tryUnfork } // specialize ctor - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] /** The fork/join pool of this collection. */ def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool] - var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool + val environment: ForkJoinPool /** Executes a task and does not wait for it to finish - instead returns a future. * * $fjdispatch */ def execute[R, Tp](task: Task[R, Tp]): () => R = { - val fjtask = newTaskImpl(task) + val fjtask = newWrappedTask(task) if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { fjtask.fork @@ -480,7 +481,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { * @return the result of the task */ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { - val fjtask = newTaskImpl(task) + val fjtask = newWrappedTask(task) if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { fjtask.fork @@ -510,25 +511,50 @@ object ForkJoinTasks { */ trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks { - class TaskImpl[R, Tp](val body: Task[R, Tp]) - extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] { - def split = body.split.map(b => newTaskImpl(b)) + class WrappedTask[R, Tp](val body: Task[R, Tp]) + extends super[ForkJoinTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] { + def split = body.split.map(b => newWrappedTask(b)) } - def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b) + def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b) } trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks { - class TaskImpl[R, Tp](val body: Task[R, Tp]) - extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] { - def split = body.split.map(b => newTaskImpl(b)) + class WrappedTask[R, Tp](val body: Task[R, Tp]) + extends super[ThreadPoolTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] { + def split = body.split.map(b => newWrappedTask(b)) } - def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b) + def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b) + +} + +trait ExecutionContextTasks extends Tasks { + + def executionContext = environment + + val environment: ExecutionContext + + // this part is a hack which allows switching + val driver: Tasks = executionContext match { + case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match { + case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp) + case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe) + case _ => ??? + } + case _ => ??? + } + + def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = driver executeAndWaitResult task + + def parallelismLevel = driver.parallelismLevel + } @@ -538,3 +564,6 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW + + + diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index ae17c7e032..7d005838d3 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -8,22 +8,30 @@ package scala.concurrent + + +import java.util.concurrent.{ Executors, ExecutorService } +import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{ Duration, Try, Success, Failure } import ConcurrentPackageObject._ + + /** This package object contains primitives for concurrent and parallel programming. */ abstract class ConcurrentPackageObject { /** A global execution environment for executing lightweight tasks. */ lazy val executionContext = - new impl.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool()) - - /** A global service for scheduling tasks for execution. - */ - // lazy val scheduler = - // new default.SchedulerImpl - + new impl.ExecutionContextImpl(getExecutorService) + + private[concurrent] def getExecutorService: AnyRef = + if (util.Properties.isJavaAtLeast("1.6")) { + val vendor = util.Properties.javaVmVendor + if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinPool + else Executors.newCachedThreadPool() + } else Executors.newCachedThreadPool() + val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t } diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 7b44d02612..7984aa02b7 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -11,20 +11,25 @@ package scala.concurrent.impl import java.util.concurrent.{Callable, ExecutorService} +import scala.concurrent.forkjoin._ import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} import scala.util.{ Duration, Try, Success, Failure } import scala.collection.mutable.Stack -class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { +class ExecutionContextImpl(val executorService: AnyRef) extends ExecutionContext { import ExecutionContextImpl._ def execute(runnable: Runnable): Unit = executorService match { - // case fj: ForkJoinPool => - // TODO fork if more applicable - // executorService execute runnable - case _ => + case fj: ForkJoinPool => + if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { + val fjtask = ForkJoinTask.adapt(runnable) + fjtask.fork + } else { + fj.execute(runnable) + } + case executorService: ExecutorService => executorService execute runnable } |