summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/Tasks.scala
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-03-02 17:47:38 +0100
committerAleksandar Prokopec <axel22@gmail.com>2012-03-02 17:47:38 +0100
commit6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9 (patch)
treeaccab2ad11bccacf37ef5c66b1d1387292e20716 /src/library/scala/collection/parallel/Tasks.scala
parent66271b123807340632c24d3dc83bb833f411cf30 (diff)
downloadscala-6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9.tar.gz
scala-6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9.tar.bz2
scala-6aca8a074ac7c05d3bb2429bffa7ce922f9c8bd9.zip
Adding execution context based task support implementation.
Parallel collections now get the execution context task support which by default picks the execution context from the scala concurrent package. This execution context task support forwards calls to either a fork join task support or the thread pool task support. Additionally, the default execution context now uses either a fork join pool or a thread pool executor, depending on the JVM vendor and version.
Diffstat (limited to 'src/library/scala/collection/parallel/Tasks.scala')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala101
1 files changed, 65 insertions, 36 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index a7f2c586a7..60a8bb1ed6 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -10,7 +10,10 @@ package scala.collection.parallel
+import java.util.concurrent.ThreadPoolExecutor
+
import scala.concurrent.forkjoin._
+import scala.concurrent.ExecutionContext
import scala.util.control.Breaks._
import annotation.unchecked.uncheckedVariance
@@ -101,11 +104,11 @@ trait Tasks {
debugMessages += s
}
- trait TaskImpl[R, +Tp] {
+ trait WrappedTask[R, +Tp] {
/** the body of this task - what it executes, how it gets split and how results are merged. */
val body: Task[R, Tp]
- def split: Seq[TaskImpl[R, Tp]]
+ def split: Seq[WrappedTask[R, Tp]]
/** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
def compute()
/** Start task. */
@@ -126,13 +129,10 @@ trait Tasks {
def release() {}
}
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
-
/* task control */
- // safe to assume it will always have the same type,
- // because the `tasksupport` in parallel iterable is final
- var environment: AnyRef
+ /** The type of the environment is more specific in the implementations. */
+ val environment: AnyRef
/** Executes a task and returns a future. Forwards an exception if some task threw it. */
def execute[R, Tp](fjtask: Task[R, Tp]): () => R
@@ -152,11 +152,11 @@ trait Tasks {
*/
trait AdaptiveWorkStealingTasks extends Tasks {
- trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
- @volatile var next: TaskImpl[R, Tp] = null
+ trait WrappedTask[R, Tp] extends super.WrappedTask[R, Tp] {
+ @volatile var next: WrappedTask[R, Tp] = null
@volatile var shouldWaitFor = true
- def split: Seq[TaskImpl[R, Tp]]
+ def split: Seq[WrappedTask[R, Tp]]
def compute() = if (body.shouldSplitFurther) {
internal()
@@ -192,8 +192,8 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
def spawnSubtasks() = {
- var last: TaskImpl[R, Tp] = null
- var head: TaskImpl[R, Tp] = this
+ var last: WrappedTask[R, Tp] = null
+ var head: WrappedTask[R, Tp] = this
do {
val subtasks = head.split
head = subtasks.head
@@ -219,7 +219,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
// specialize ctor
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
}
@@ -228,7 +228,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
trait ThreadPoolTasks extends Tasks {
import java.util.concurrent._
- trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] {
// initially, this is null
// once the task is started, this future is set and used for `sync`
// utb: var future: Future[_] = null
@@ -290,9 +290,9 @@ trait ThreadPoolTasks extends Tasks {
}
}
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
- var environment: AnyRef = ThreadPoolTasks.defaultThreadPool
+ val environment: ThreadPoolExecutor
def executor = environment.asInstanceOf[ThreadPoolExecutor]
def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]
@volatile var totaltasks = 0
@@ -306,7 +306,7 @@ trait ThreadPoolTasks extends Tasks {
}
def execute[R, Tp](task: Task[R, Tp]): () => R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing without wait: " + task)
t.start()
@@ -319,7 +319,7 @@ trait ThreadPoolTasks extends Tasks {
}
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing with wait: " + task)
t.start()
@@ -359,10 +359,11 @@ object ThreadPoolTasks {
/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */
+@deprecated("This implementation is not used.")
trait FutureThreadPoolTasks extends Tasks {
import java.util.concurrent._
- trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] {
@volatile var future: Future[_] = null
def start() = {
@@ -377,13 +378,13 @@ trait FutureThreadPoolTasks extends Tasks {
}
}
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
- var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
+ val environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
def executor = environment.asInstanceOf[ThreadPoolExecutor]
def execute[R, Tp](task: Task[R, Tp]): () => R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing without wait: " + task)
t.start
@@ -396,7 +397,7 @@ trait FutureThreadPoolTasks extends Tasks {
}
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing with wait: " + task)
t.start
@@ -438,26 +439,26 @@ trait HavingForkJoinPool {
*/
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
- trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] {
+ trait WrappedTask[R, +Tp] extends RecursiveAction with super.WrappedTask[R, Tp] {
def start() = fork
def sync() = join
def tryCancel = tryUnfork
}
// specialize ctor
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
/** The fork/join pool of this collection.
*/
def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]
- var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool
+ val environment: ForkJoinPool
/** Executes a task and does not wait for it to finish - instead returns a future.
*
* $fjdispatch
*/
def execute[R, Tp](task: Task[R, Tp]): () => R = {
- val fjtask = newTaskImpl(task)
+ val fjtask = newWrappedTask(task)
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
@@ -480,7 +481,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
* @return the result of the task
*/
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
- val fjtask = newTaskImpl(task)
+ val fjtask = newWrappedTask(task)
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
@@ -510,25 +511,50 @@ object ForkJoinTasks {
*/
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
- class TaskImpl[R, Tp](val body: Task[R, Tp])
- extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
- def split = body.split.map(b => newTaskImpl(b))
+ class WrappedTask[R, Tp](val body: Task[R, Tp])
+ extends super[ForkJoinTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] {
+ def split = body.split.map(b => newWrappedTask(b))
}
- def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
+ def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
}
trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {
- class TaskImpl[R, Tp](val body: Task[R, Tp])
- extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
- def split = body.split.map(b => newTaskImpl(b))
+ class WrappedTask[R, Tp](val body: Task[R, Tp])
+ extends super[ThreadPoolTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] {
+ def split = body.split.map(b => newWrappedTask(b))
}
- def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
+ def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
+
+}
+
+trait ExecutionContextTasks extends Tasks {
+
+ def executionContext = environment
+
+ val environment: ExecutionContext
+
+ // this part is a hack which allows switching
+ val driver: Tasks = executionContext match {
+ case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match {
+ case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp)
+ case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
+ case _ => ???
+ }
+ case _ => ???
+ }
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = driver executeAndWaitResult task
+
+ def parallelismLevel = driver.parallelismLevel
+
}
@@ -538,3 +564,6 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW
+
+
+