diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2013-10-28 11:20:23 +0100 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2013-10-29 12:15:10 +0100 |
commit | 344ac60c3f34cc0a1c6e6aae1479878fe63476eb (patch) | |
tree | 97e5de64b81b5cb76913e8773171a1ad93b43244 /src/library | |
parent | 1819af77fd4ecc66c89a84ea321aa7d6f92285ec (diff) | |
download | scala-344ac60c3f34cc0a1c6e6aae1479878fe63476eb.tar.gz scala-344ac60c3f34cc0a1c6e6aae1479878fe63476eb.tar.bz2 scala-344ac60c3f34cc0a1c6e6aae1479878fe63476eb.zip |
SI-7938 - parallel collections should use default ExecutionContext
Parallel collections now use `scala.concurrent.ExecutionContext`
by default.
The `ExecutionContextTaskSupport` is optimized to use the
`ForkJoinPool` underlying the `ExecutionContext` if possible.
Otherwise, a fallback `TaskSupport` that creates a reduction tree and
execute an operation through `Future`s is used.
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 95 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/package.scala | 2 |
2 files changed, 87 insertions, 10 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index f8d0c6043a..fcf0dff846 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -52,7 +52,7 @@ trait Task[R, +Tp] { signalAbort() } } catch { - case thr: Exception => + case thr: Throwable => result = result // ensure that effects of `leaf` are visible throwable = thr signalAbort() @@ -433,9 +433,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } object ForkJoinTasks { - val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool - // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) - // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) + lazy val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() } /* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them. @@ -461,19 +459,98 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b) } +/** An implementation of the `Tasks` that uses Scala `Future`s to compute + * the work encapsulated in each task. + */ +private[parallel] final class FutureTasks(executor: ExecutionContext) extends Tasks { + import scala.concurrent._ + import scala.util._ + + private val maxdepth = (math.log(parallelismLevel) / math.log(2) + 1).toInt + + val environment: ExecutionContext = executor + + /** Divides this task into a lot of small tasks and executes them asynchronously + * using futures. + * Folds the futures and merges them asynchronously. + */ + private def exec[R, Tp](topLevelTask: Task[R, Tp]): Future[R] = { + implicit val ec = environment + + /** Constructs a tree of futures where tasks can be reasonably split. + */ + def compute(task: Task[R, Tp], depth: Int): Future[Task[R, Tp]] = { + if (task.shouldSplitFurther && depth < maxdepth) { + val subtasks = task.split + val subfutures = for (subtask <- subtasks.iterator) yield compute(subtask, depth + 1) + subfutures.reduceLeft { (firstFuture, nextFuture) => + for { + firstTask <- firstFuture + nextTask <- nextFuture + } yield { + firstTask tryMerge nextTask.repr + firstTask + } + } andThen { + case Success(firstTask) => + task.throwable = firstTask.throwable + task.result = firstTask.result + case Failure(exception) => + task.throwable = exception + } + } else Future { + task.tryLeaf(None) + task + } + } + + compute(topLevelTask, 0) map { t => + t.forwardThrowable() + t.result + } + } + + def execute[R, Tp](task: Task[R, Tp]): () => R = { + val future = exec(task) + val callback = () => { + Await.result(future, scala.concurrent.duration.Duration.Inf) + } + callback + } + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { + execute(task)() + } + + def parallelismLevel = Runtime.getRuntime.availableProcessors +} + +/** This tasks implementation uses execution contexts to spawn a parallel computation. + * + * As an optimization, it internally checks whether the execution context is the + * standard implementation based on fork/join pools, and if it is, creates a + * `ForkJoinTaskSupport` that shares the same pool to forward its request to it. + * + * Otherwise, it uses an execution context exclusive `Tasks` implementation to + * divide the tasks into smaller chunks and execute operations on it. + */ trait ExecutionContextTasks extends Tasks { def executionContext = environment val environment: ExecutionContext - // this part is a hack which allows switching - val driver: Tasks = executionContext match { + /** A driver serves as a target for this proxy `Tasks` object. + * + * If the execution context has the standard implementation and uses fork/join pools, + * the driver is `ForkJoinTaskSupport` with the same pool, as an optimization. + * Otherwise, the driver will be a Scala `Future`-based implementation. + */ + private val driver: Tasks = executionContext match { case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match { case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp) - case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe) - case _ => ??? + case _ => new FutureTasks(environment) } - case _ => ??? + case _ => new FutureTasks(environment) } def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index b25553d2c8..923e21e5a7 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -41,7 +41,7 @@ package object parallel { private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString) - private[parallel] def getTaskSupport: TaskSupport = new ForkJoinTaskSupport + private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport val defaultTaskSupport: TaskSupport = getTaskSupport |