summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAdriaan Moors <adriaan.moors@typesafe.com>2013-11-05 10:26:33 -0800
committerAdriaan Moors <adriaan.moors@typesafe.com>2013-11-05 10:26:33 -0800
commitb33adf47330b510d7d15971fa75aa11c7b9503ad (patch)
treeb7e2546191b867a74eb840e6f9413770c631afd5 /src/library
parent92ec2cafb3333a710746a1a1bc1351000b2a3372 (diff)
parent344ac60c3f34cc0a1c6e6aae1479878fe63476eb (diff)
downloadscala-b33adf47330b510d7d15971fa75aa11c7b9503ad.tar.gz
scala-b33adf47330b510d7d15971fa75aa11c7b9503ad.tar.bz2
scala-b33adf47330b510d7d15971fa75aa11c7b9503ad.zip
Merge pull request #3086 from axel22/topic/pc-execution-context
- parallel collections should use default ExecutionContext
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala95
-rw-r--r--src/library/scala/collection/parallel/package.scala2
2 files changed, 87 insertions, 10 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index f8d0c6043a..fcf0dff846 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -52,7 +52,7 @@ trait Task[R, +Tp] {
signalAbort()
}
} catch {
- case thr: Exception =>
+ case thr: Throwable =>
result = result // ensure that effects of `leaf` are visible
throwable = thr
signalAbort()
@@ -433,9 +433,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
object ForkJoinTasks {
- val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool
- // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
- // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
+ lazy val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool()
}
/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them.
@@ -461,19 +459,98 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW
def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
}
+/** An implementation of the `Tasks` that uses Scala `Future`s to compute
+ * the work encapsulated in each task.
+ */
+private[parallel] final class FutureTasks(executor: ExecutionContext) extends Tasks {
+ import scala.concurrent._
+ import scala.util._
+
+ private val maxdepth = (math.log(parallelismLevel) / math.log(2) + 1).toInt
+
+ val environment: ExecutionContext = executor
+
+ /** Divides this task into a lot of small tasks and executes them asynchronously
+ * using futures.
+ * Folds the futures and merges them asynchronously.
+ */
+ private def exec[R, Tp](topLevelTask: Task[R, Tp]): Future[R] = {
+ implicit val ec = environment
+
+ /** Constructs a tree of futures where tasks can be reasonably split.
+ */
+ def compute(task: Task[R, Tp], depth: Int): Future[Task[R, Tp]] = {
+ if (task.shouldSplitFurther && depth < maxdepth) {
+ val subtasks = task.split
+ val subfutures = for (subtask <- subtasks.iterator) yield compute(subtask, depth + 1)
+ subfutures.reduceLeft { (firstFuture, nextFuture) =>
+ for {
+ firstTask <- firstFuture
+ nextTask <- nextFuture
+ } yield {
+ firstTask tryMerge nextTask.repr
+ firstTask
+ }
+ } andThen {
+ case Success(firstTask) =>
+ task.throwable = firstTask.throwable
+ task.result = firstTask.result
+ case Failure(exception) =>
+ task.throwable = exception
+ }
+ } else Future {
+ task.tryLeaf(None)
+ task
+ }
+ }
+
+ compute(topLevelTask, 0) map { t =>
+ t.forwardThrowable()
+ t.result
+ }
+ }
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val future = exec(task)
+ val callback = () => {
+ Await.result(future, scala.concurrent.duration.Duration.Inf)
+ }
+ callback
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ execute(task)()
+ }
+
+ def parallelismLevel = Runtime.getRuntime.availableProcessors
+}
+
+/** This tasks implementation uses execution contexts to spawn a parallel computation.
+ *
+ * As an optimization, it internally checks whether the execution context is the
+ * standard implementation based on fork/join pools, and if it is, creates a
+ * `ForkJoinTaskSupport` that shares the same pool to forward its request to it.
+ *
+ * Otherwise, it uses an execution context exclusive `Tasks` implementation to
+ * divide the tasks into smaller chunks and execute operations on it.
+ */
trait ExecutionContextTasks extends Tasks {
def executionContext = environment
val environment: ExecutionContext
- // this part is a hack which allows switching
- val driver: Tasks = executionContext match {
+ /** A driver serves as a target for this proxy `Tasks` object.
+ *
+ * If the execution context has the standard implementation and uses fork/join pools,
+ * the driver is `ForkJoinTaskSupport` with the same pool, as an optimization.
+ * Otherwise, the driver will be a Scala `Future`-based implementation.
+ */
+ private val driver: Tasks = executionContext match {
case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match {
case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp)
- case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
- case _ => ???
+ case _ => new FutureTasks(environment)
}
- case _ => ???
+ case _ => new FutureTasks(environment)
}
def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index b25553d2c8..923e21e5a7 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -41,7 +41,7 @@ package object parallel {
private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
- private[parallel] def getTaskSupport: TaskSupport = new ForkJoinTaskSupport
+ private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
val defaultTaskSupport: TaskSupport = getTaskSupport