summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2013-10-28 11:20:23 +0100
committerAleksandar Prokopec <axel22@gmail.com>2013-10-29 12:15:10 +0100
commit344ac60c3f34cc0a1c6e6aae1479878fe63476eb (patch)
tree97e5de64b81b5cb76913e8773171a1ad93b43244 /src
parent1819af77fd4ecc66c89a84ea321aa7d6f92285ec (diff)
downloadscala-344ac60c3f34cc0a1c6e6aae1479878fe63476eb.tar.gz
scala-344ac60c3f34cc0a1c6e6aae1479878fe63476eb.tar.bz2
scala-344ac60c3f34cc0a1c6e6aae1479878fe63476eb.zip
SI-7938 - parallel collections should use default ExecutionContext
Parallel collections now use `scala.concurrent.ExecutionContext` by default. The `ExecutionContextTaskSupport` is optimized to use the `ForkJoinPool` underlying the `ExecutionContext` if possible. Otherwise, a fallback `TaskSupport` that creates a reduction tree and execute an operation through `Future`s is used.
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala95
-rw-r--r--src/library/scala/collection/parallel/package.scala2
2 files changed, 87 insertions, 10 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index f8d0c6043a..fcf0dff846 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -52,7 +52,7 @@ trait Task[R, +Tp] {
signalAbort()
}
} catch {
- case thr: Exception =>
+ case thr: Throwable =>
result = result // ensure that effects of `leaf` are visible
throwable = thr
signalAbort()
@@ -433,9 +433,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
object ForkJoinTasks {
- val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool
- // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
- // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
+ lazy val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool()
}
/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them.
@@ -461,19 +459,98 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW
def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
}
+/** An implementation of the `Tasks` that uses Scala `Future`s to compute
+ * the work encapsulated in each task.
+ */
+private[parallel] final class FutureTasks(executor: ExecutionContext) extends Tasks {
+ import scala.concurrent._
+ import scala.util._
+
+ private val maxdepth = (math.log(parallelismLevel) / math.log(2) + 1).toInt
+
+ val environment: ExecutionContext = executor
+
+ /** Divides this task into a lot of small tasks and executes them asynchronously
+ * using futures.
+ * Folds the futures and merges them asynchronously.
+ */
+ private def exec[R, Tp](topLevelTask: Task[R, Tp]): Future[R] = {
+ implicit val ec = environment
+
+ /** Constructs a tree of futures where tasks can be reasonably split.
+ */
+ def compute(task: Task[R, Tp], depth: Int): Future[Task[R, Tp]] = {
+ if (task.shouldSplitFurther && depth < maxdepth) {
+ val subtasks = task.split
+ val subfutures = for (subtask <- subtasks.iterator) yield compute(subtask, depth + 1)
+ subfutures.reduceLeft { (firstFuture, nextFuture) =>
+ for {
+ firstTask <- firstFuture
+ nextTask <- nextFuture
+ } yield {
+ firstTask tryMerge nextTask.repr
+ firstTask
+ }
+ } andThen {
+ case Success(firstTask) =>
+ task.throwable = firstTask.throwable
+ task.result = firstTask.result
+ case Failure(exception) =>
+ task.throwable = exception
+ }
+ } else Future {
+ task.tryLeaf(None)
+ task
+ }
+ }
+
+ compute(topLevelTask, 0) map { t =>
+ t.forwardThrowable()
+ t.result
+ }
+ }
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val future = exec(task)
+ val callback = () => {
+ Await.result(future, scala.concurrent.duration.Duration.Inf)
+ }
+ callback
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ execute(task)()
+ }
+
+ def parallelismLevel = Runtime.getRuntime.availableProcessors
+}
+
+/** This tasks implementation uses execution contexts to spawn a parallel computation.
+ *
+ * As an optimization, it internally checks whether the execution context is the
+ * standard implementation based on fork/join pools, and if it is, creates a
+ * `ForkJoinTaskSupport` that shares the same pool to forward its request to it.
+ *
+ * Otherwise, it uses an execution context exclusive `Tasks` implementation to
+ * divide the tasks into smaller chunks and execute operations on it.
+ */
trait ExecutionContextTasks extends Tasks {
def executionContext = environment
val environment: ExecutionContext
- // this part is a hack which allows switching
- val driver: Tasks = executionContext match {
+ /** A driver serves as a target for this proxy `Tasks` object.
+ *
+ * If the execution context has the standard implementation and uses fork/join pools,
+ * the driver is `ForkJoinTaskSupport` with the same pool, as an optimization.
+ * Otherwise, the driver will be a Scala `Future`-based implementation.
+ */
+ private val driver: Tasks = executionContext match {
case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match {
case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp)
- case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
- case _ => ???
+ case _ => new FutureTasks(environment)
}
- case _ => ???
+ case _ => new FutureTasks(environment)
}
def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index b25553d2c8..923e21e5a7 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -41,7 +41,7 @@ package object parallel {
private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
- private[parallel] def getTaskSupport: TaskSupport = new ForkJoinTaskSupport
+ private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
val defaultTaskSupport: TaskSupport = getTaskSupport