summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala17
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala101
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala22
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala15
4 files changed, 104 insertions, 51 deletions
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 20800250b4..fc99347316 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -11,15 +11,26 @@ package scala.collection.parallel
-
+import java.util.concurrent.ThreadPoolExecutor
+import scala.concurrent.forkjoin.ForkJoinPool
+import scala.concurrent.ExecutionContext
trait TaskSupport extends Tasks
-private[collection] class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks
-private[collection] class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
+private[collection] class ForkJoinTaskSupport(val environment: ForkJoinPool = ForkJoinTasks.defaultForkJoinPool)
+extends TaskSupport with AdaptiveWorkStealingForkJoinTasks
+
+
+private[collection] class ThreadPoolTaskSupport(val environment: ThreadPoolExecutor = ThreadPoolTasks.defaultThreadPool)
+extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
+
+
+private[collection] class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.executionContext)
+extends TaskSupport with ExecutionContextTasks
+
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index a7f2c586a7..60a8bb1ed6 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -10,7 +10,10 @@ package scala.collection.parallel
+import java.util.concurrent.ThreadPoolExecutor
+
import scala.concurrent.forkjoin._
+import scala.concurrent.ExecutionContext
import scala.util.control.Breaks._
import annotation.unchecked.uncheckedVariance
@@ -101,11 +104,11 @@ trait Tasks {
debugMessages += s
}
- trait TaskImpl[R, +Tp] {
+ trait WrappedTask[R, +Tp] {
/** the body of this task - what it executes, how it gets split and how results are merged. */
val body: Task[R, Tp]
- def split: Seq[TaskImpl[R, Tp]]
+ def split: Seq[WrappedTask[R, Tp]]
/** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
def compute()
/** Start task. */
@@ -126,13 +129,10 @@ trait Tasks {
def release() {}
}
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
-
/* task control */
- // safe to assume it will always have the same type,
- // because the `tasksupport` in parallel iterable is final
- var environment: AnyRef
+ /** The type of the environment is more specific in the implementations. */
+ val environment: AnyRef
/** Executes a task and returns a future. Forwards an exception if some task threw it. */
def execute[R, Tp](fjtask: Task[R, Tp]): () => R
@@ -152,11 +152,11 @@ trait Tasks {
*/
trait AdaptiveWorkStealingTasks extends Tasks {
- trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
- @volatile var next: TaskImpl[R, Tp] = null
+ trait WrappedTask[R, Tp] extends super.WrappedTask[R, Tp] {
+ @volatile var next: WrappedTask[R, Tp] = null
@volatile var shouldWaitFor = true
- def split: Seq[TaskImpl[R, Tp]]
+ def split: Seq[WrappedTask[R, Tp]]
def compute() = if (body.shouldSplitFurther) {
internal()
@@ -192,8 +192,8 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
def spawnSubtasks() = {
- var last: TaskImpl[R, Tp] = null
- var head: TaskImpl[R, Tp] = this
+ var last: WrappedTask[R, Tp] = null
+ var head: WrappedTask[R, Tp] = this
do {
val subtasks = head.split
head = subtasks.head
@@ -219,7 +219,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
// specialize ctor
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
}
@@ -228,7 +228,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
trait ThreadPoolTasks extends Tasks {
import java.util.concurrent._
- trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] {
// initially, this is null
// once the task is started, this future is set and used for `sync`
// utb: var future: Future[_] = null
@@ -290,9 +290,9 @@ trait ThreadPoolTasks extends Tasks {
}
}
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
- var environment: AnyRef = ThreadPoolTasks.defaultThreadPool
+ val environment: ThreadPoolExecutor
def executor = environment.asInstanceOf[ThreadPoolExecutor]
def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]
@volatile var totaltasks = 0
@@ -306,7 +306,7 @@ trait ThreadPoolTasks extends Tasks {
}
def execute[R, Tp](task: Task[R, Tp]): () => R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing without wait: " + task)
t.start()
@@ -319,7 +319,7 @@ trait ThreadPoolTasks extends Tasks {
}
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing with wait: " + task)
t.start()
@@ -359,10 +359,11 @@ object ThreadPoolTasks {
/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */
+@deprecated("This implementation is not used.")
trait FutureThreadPoolTasks extends Tasks {
import java.util.concurrent._
- trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] {
@volatile var future: Future[_] = null
def start() = {
@@ -377,13 +378,13 @@ trait FutureThreadPoolTasks extends Tasks {
}
}
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
- var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
+ val environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
def executor = environment.asInstanceOf[ThreadPoolExecutor]
def execute[R, Tp](task: Task[R, Tp]): () => R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing without wait: " + task)
t.start
@@ -396,7 +397,7 @@ trait FutureThreadPoolTasks extends Tasks {
}
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
- val t = newTaskImpl(task)
+ val t = newWrappedTask(task)
// debuglog("-----------> Executing with wait: " + task)
t.start
@@ -438,26 +439,26 @@ trait HavingForkJoinPool {
*/
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
- trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] {
+ trait WrappedTask[R, +Tp] extends RecursiveAction with super.WrappedTask[R, Tp] {
def start() = fork
def sync() = join
def tryCancel = tryUnfork
}
// specialize ctor
- protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+ protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
/** The fork/join pool of this collection.
*/
def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]
- var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool
+ val environment: ForkJoinPool
/** Executes a task and does not wait for it to finish - instead returns a future.
*
* $fjdispatch
*/
def execute[R, Tp](task: Task[R, Tp]): () => R = {
- val fjtask = newTaskImpl(task)
+ val fjtask = newWrappedTask(task)
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
@@ -480,7 +481,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
* @return the result of the task
*/
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
- val fjtask = newTaskImpl(task)
+ val fjtask = newWrappedTask(task)
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
@@ -510,25 +511,50 @@ object ForkJoinTasks {
*/
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
- class TaskImpl[R, Tp](val body: Task[R, Tp])
- extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
- def split = body.split.map(b => newTaskImpl(b))
+ class WrappedTask[R, Tp](val body: Task[R, Tp])
+ extends super[ForkJoinTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] {
+ def split = body.split.map(b => newWrappedTask(b))
}
- def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
+ def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
}
trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {
- class TaskImpl[R, Tp](val body: Task[R, Tp])
- extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
- def split = body.split.map(b => newTaskImpl(b))
+ class WrappedTask[R, Tp](val body: Task[R, Tp])
+ extends super[ThreadPoolTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] {
+ def split = body.split.map(b => newWrappedTask(b))
}
- def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
+ def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
+
+}
+
+trait ExecutionContextTasks extends Tasks {
+
+ def executionContext = environment
+
+ val environment: ExecutionContext
+
+ // this part is a hack which allows switching
+ val driver: Tasks = executionContext match {
+ case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executorService match {
+ case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp)
+ case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
+ case _ => ???
+ }
+ case _ => ???
+ }
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = driver executeAndWaitResult task
+
+ def parallelismLevel = driver.parallelismLevel
+
}
@@ -538,3 +564,6 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW
+
+
+
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index ae17c7e032..7d005838d3 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -8,22 +8,30 @@
package scala.concurrent
+
+
+import java.util.concurrent.{ Executors, ExecutorService }
+import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{ Duration, Try, Success, Failure }
import ConcurrentPackageObject._
+
+
/** This package object contains primitives for concurrent and parallel programming.
*/
abstract class ConcurrentPackageObject {
/** A global execution environment for executing lightweight tasks.
*/
lazy val executionContext =
- new impl.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool())
-
- /** A global service for scheduling tasks for execution.
- */
- // lazy val scheduler =
- // new default.SchedulerImpl
-
+ new impl.ExecutionContextImpl(getExecutorService)
+
+ private[concurrent] def getExecutorService: AnyRef =
+ if (util.Properties.isJavaAtLeast("1.6")) {
+ val vendor = util.Properties.javaVmVendor
+ if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinPool
+ else Executors.newCachedThreadPool()
+ } else Executors.newCachedThreadPool()
+
val handledFutureException: PartialFunction[Throwable, Throwable] = {
case t: Throwable if isFutureThrowable(t) => t
}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 7b44d02612..7984aa02b7 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -11,20 +11,25 @@ package scala.concurrent.impl
import java.util.concurrent.{Callable, ExecutorService}
+import scala.concurrent.forkjoin._
import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
import scala.util.{ Duration, Try, Success, Failure }
import scala.collection.mutable.Stack
-class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext {
+class ExecutionContextImpl(val executorService: AnyRef) extends ExecutionContext {
import ExecutionContextImpl._
def execute(runnable: Runnable): Unit = executorService match {
- // case fj: ForkJoinPool =>
- // TODO fork if more applicable
- // executorService execute runnable
- case _ =>
+ case fj: ForkJoinPool =>
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ val fjtask = ForkJoinTask.adapt(runnable)
+ fjtask.fork
+ } else {
+ fj.execute(runnable)
+ }
+ case executorService: ExecutorService =>
executorService execute runnable
}