diff options
Diffstat (limited to 'src')
4 files changed, 104 insertions, 51 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala index 20800250b4..fc99347316 100644 --- a/src/library/scala/collection/parallel/TaskSupport.scala +++ b/src/library/scala/collection/parallel/TaskSupport.scala @@ -11,15 +11,26 @@ package scala.collection.parallel - +import java.util.concurrent.ThreadPoolExecutor +import scala.concurrent.forkjoin.ForkJoinPool +import scala.concurrent.ExecutionContext trait TaskSupport extends Tasks -private[collection] class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks -private[collection] class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks +private[collection] class ForkJoinTaskSupport(val environment: ForkJoinPool = ForkJoinTasks.defaultForkJoinPool) +extends TaskSupport with AdaptiveWorkStealingForkJoinTasks + + +private[collection] class ThreadPoolTaskSupport(val environment: ThreadPoolExecutor = ThreadPoolTasks.defaultThreadPool) +extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks + + +private[collection] class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.executionContext) +extends TaskSupport with ExecutionContextTasks + diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index a7f2c586a7..60a8bb1ed6 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -10,7 +10,10 @@ package scala.collection.parallel +import java.util.concurrent.ThreadPoolExecutor + import scala.concurrent.forkjoin._ +import scala.concurrent.ExecutionContext import scala.util.control.Breaks._ import annotation.unchecked.uncheckedVariance @@ -101,11 +104,11 @@ trait Tasks { debugMessages += s } - trait TaskImpl[R, +Tp] { + trait WrappedTask[R, +Tp] { /** the body of this task - what it executes, how it gets split and how results are merged. */ val body: Task[R, Tp] - def split: Seq[TaskImpl[R, Tp]] + def split: Seq[WrappedTask[R, Tp]] /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ def compute() /** Start task. */ @@ -126,13 +129,10 @@ trait Tasks { def release() {} } - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] - /* task control */ - // safe to assume it will always have the same type, - // because the `tasksupport` in parallel iterable is final - var environment: AnyRef + /** The type of the environment is more specific in the implementations. */ + val environment: AnyRef /** Executes a task and returns a future. Forwards an exception if some task threw it. */ def execute[R, Tp](fjtask: Task[R, Tp]): () => R @@ -152,11 +152,11 @@ trait Tasks { */ trait AdaptiveWorkStealingTasks extends Tasks { - trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] { - @volatile var next: TaskImpl[R, Tp] = null + trait WrappedTask[R, Tp] extends super.WrappedTask[R, Tp] { + @volatile var next: WrappedTask[R, Tp] = null @volatile var shouldWaitFor = true - def split: Seq[TaskImpl[R, Tp]] + def split: Seq[WrappedTask[R, Tp]] def compute() = if (body.shouldSplitFurther) { internal() @@ -192,8 +192,8 @@ trait AdaptiveWorkStealingTasks extends Tasks { } def spawnSubtasks() = { - var last: TaskImpl[R, Tp] = null - var head: TaskImpl[R, Tp] = this + var last: WrappedTask[R, Tp] = null + var head: WrappedTask[R, Tp] = this do { val subtasks = head.split head = subtasks.head @@ -219,7 +219,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { } // specialize ctor - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] } @@ -228,7 +228,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { trait ThreadPoolTasks extends Tasks { import java.util.concurrent._ - trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] { // initially, this is null // once the task is started, this future is set and used for `sync` // utb: var future: Future[_] = null @@ -290,9 +290,9 @@ trait ThreadPoolTasks extends Tasks { } } - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] - var environment: AnyRef = ThreadPoolTasks.defaultThreadPool + val environment: ThreadPoolExecutor def executor = environment.asInstanceOf[ThreadPoolExecutor] def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]] @volatile var totaltasks = 0 @@ -306,7 +306,7 @@ trait ThreadPoolTasks extends Tasks { } def execute[R, Tp](task: Task[R, Tp]): () => R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing without wait: " + task) t.start() @@ -319,7 +319,7 @@ trait ThreadPoolTasks extends Tasks { } def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing with wait: " + task) t.start() @@ -359,10 +359,11 @@ object ThreadPoolTasks { /** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */ +@deprecated("This implementation is not used.") trait FutureThreadPoolTasks extends Tasks { import java.util.concurrent._ - trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] { @volatile var future: Future[_] = null def start() = { @@ -377,13 +378,13 @@ trait FutureThreadPoolTasks extends Tasks { } } - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] - var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool + val environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool def executor = environment.asInstanceOf[ThreadPoolExecutor] def execute[R, Tp](task: Task[R, Tp]): () => R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing without wait: " + task) t.start @@ -396,7 +397,7 @@ trait FutureThreadPoolTasks extends Tasks { } def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { - val t = newTaskImpl(task) + val t = newWrappedTask(task) // debuglog("-----------> Executing with wait: " + task) t.start @@ -438,26 +439,26 @@ trait HavingForkJoinPool { */ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { - trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] { + trait WrappedTask[R, +Tp] extends RecursiveAction with super.WrappedTask[R, Tp] { def start() = fork def sync() = join def tryCancel = tryUnfork } // specialize ctor - protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp] /** The fork/join pool of this collection. */ def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool] - var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool + val environment: ForkJoinPool /** Executes a task and does not wait for it to finish - instead returns a future. * * $fjdispatch */ def execute[R, Tp](task: Task[R, Tp]): () => R = { - val fjtask = newTaskImpl(task) + val fjtask = newWrappedTask(task) if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { fjtask.fork @@ -480,7 +481,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { * @return the result of the task */ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { - val fjtask = newTaskImpl(task) + val fjtask = newWrappedTask(task) if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { fjtask.fork @@ -510,25 +511,50 @@ object ForkJoinTasks { */ trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks { - class TaskImpl[R, Tp](val body: Task[R, Tp]) - extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] { - def split = body.split.map(b => newTaskImpl(b)) + class WrappedTask[R, Tp](val body: Task[R, Tp]) + extends super[ForkJoinTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] { + def split = body.split.map(b => newWrappedTask(b)) } - def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b) + def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b) } trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks { - class TaskImpl[R, Tp](val body: Task[R, Tp]) - extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] { - def split = body.split.map(b => newTaskImpl(b)) + class WrappedTask[R, Tp](val body: Task[R, Tp]) + extends super[ThreadPoolTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] { + def split = body.split.map(b => newWrappedTask(b)) } - def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b) + def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b) + +} + +trait ExecutionContextTasks extends Tasks { + + def executionContext = environment + + val environment: ExecutionContext + + // this part is a hack which allows switching + val driver: Tasks = executionContext match { + case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match { + case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp) + case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe) + case _ => ??? + } + case _ => ??? + } + + def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = driver executeAndWaitResult task + + def parallelismLevel = driver.parallelismLevel + } @@ -538,3 +564,6 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW + + + diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index ae17c7e032..7d005838d3 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -8,22 +8,30 @@ package scala.concurrent + + +import java.util.concurrent.{ Executors, ExecutorService } +import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{ Duration, Try, Success, Failure } import ConcurrentPackageObject._ + + /** This package object contains primitives for concurrent and parallel programming. */ abstract class ConcurrentPackageObject { /** A global execution environment for executing lightweight tasks. */ lazy val executionContext = - new impl.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool()) - - /** A global service for scheduling tasks for execution. - */ - // lazy val scheduler = - // new default.SchedulerImpl - + new impl.ExecutionContextImpl(getExecutorService) + + private[concurrent] def getExecutorService: AnyRef = + if (util.Properties.isJavaAtLeast("1.6")) { + val vendor = util.Properties.javaVmVendor + if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinPool + else Executors.newCachedThreadPool() + } else Executors.newCachedThreadPool() + val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t } diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 7b44d02612..7984aa02b7 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -11,20 +11,25 @@ package scala.concurrent.impl import java.util.concurrent.{Callable, ExecutorService} +import scala.concurrent.forkjoin._ import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} import scala.util.{ Duration, Try, Success, Failure } import scala.collection.mutable.Stack -class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { +class ExecutionContextImpl(val executorService: AnyRef) extends ExecutionContext { import ExecutionContextImpl._ def execute(runnable: Runnable): Unit = executorService match { - // case fj: ForkJoinPool => - // TODO fork if more applicable - // executorService execute runnable - case _ => + case fj: ForkJoinPool => + if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { + val fjtask = ForkJoinTask.adapt(runnable) + fjtask.fork + } else { + fj.execute(runnable) + } + case executorService: ExecutorService => executorService execute runnable } |