diff options
author | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-11-05 10:26:33 -0800 |
---|---|---|
committer | Adriaan Moors <adriaan.moors@typesafe.com> | 2013-11-05 10:26:33 -0800 |
commit | b33adf47330b510d7d15971fa75aa11c7b9503ad (patch) | |
tree | b7e2546191b867a74eb840e6f9413770c631afd5 /src/library | |
parent | 92ec2cafb3333a710746a1a1bc1351000b2a3372 (diff) | |
parent | 344ac60c3f34cc0a1c6e6aae1479878fe63476eb (diff) | |
download | scala-b33adf47330b510d7d15971fa75aa11c7b9503ad.tar.gz scala-b33adf47330b510d7d15971fa75aa11c7b9503ad.tar.bz2 scala-b33adf47330b510d7d15971fa75aa11c7b9503ad.zip |
Merge pull request #3086 from axel22/topic/pc-execution-context
- parallel collections should use default ExecutionContext
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 95 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/package.scala | 2 |
2 files changed, 87 insertions, 10 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index f8d0c6043a..fcf0dff846 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -52,7 +52,7 @@ trait Task[R, +Tp] { signalAbort() } } catch { - case thr: Exception => + case thr: Throwable => result = result // ensure that effects of `leaf` are visible throwable = thr signalAbort() @@ -433,9 +433,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } object ForkJoinTasks { - val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool - // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) - // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) + lazy val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() } /* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them. @@ -461,19 +459,98 @@ trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveW def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b) } +/** An implementation of the `Tasks` that uses Scala `Future`s to compute + * the work encapsulated in each task. + */ +private[parallel] final class FutureTasks(executor: ExecutionContext) extends Tasks { + import scala.concurrent._ + import scala.util._ + + private val maxdepth = (math.log(parallelismLevel) / math.log(2) + 1).toInt + + val environment: ExecutionContext = executor + + /** Divides this task into a lot of small tasks and executes them asynchronously + * using futures. + * Folds the futures and merges them asynchronously. + */ + private def exec[R, Tp](topLevelTask: Task[R, Tp]): Future[R] = { + implicit val ec = environment + + /** Constructs a tree of futures where tasks can be reasonably split. + */ + def compute(task: Task[R, Tp], depth: Int): Future[Task[R, Tp]] = { + if (task.shouldSplitFurther && depth < maxdepth) { + val subtasks = task.split + val subfutures = for (subtask <- subtasks.iterator) yield compute(subtask, depth + 1) + subfutures.reduceLeft { (firstFuture, nextFuture) => + for { + firstTask <- firstFuture + nextTask <- nextFuture + } yield { + firstTask tryMerge nextTask.repr + firstTask + } + } andThen { + case Success(firstTask) => + task.throwable = firstTask.throwable + task.result = firstTask.result + case Failure(exception) => + task.throwable = exception + } + } else Future { + task.tryLeaf(None) + task + } + } + + compute(topLevelTask, 0) map { t => + t.forwardThrowable() + t.result + } + } + + def execute[R, Tp](task: Task[R, Tp]): () => R = { + val future = exec(task) + val callback = () => { + Await.result(future, scala.concurrent.duration.Duration.Inf) + } + callback + } + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { + execute(task)() + } + + def parallelismLevel = Runtime.getRuntime.availableProcessors +} + +/** This tasks implementation uses execution contexts to spawn a parallel computation. + * + * As an optimization, it internally checks whether the execution context is the + * standard implementation based on fork/join pools, and if it is, creates a + * `ForkJoinTaskSupport` that shares the same pool to forward its request to it. + * + * Otherwise, it uses an execution context exclusive `Tasks` implementation to + * divide the tasks into smaller chunks and execute operations on it. + */ trait ExecutionContextTasks extends Tasks { def executionContext = environment val environment: ExecutionContext - // this part is a hack which allows switching - val driver: Tasks = executionContext match { + /** A driver serves as a target for this proxy `Tasks` object. + * + * If the execution context has the standard implementation and uses fork/join pools, + * the driver is `ForkJoinTaskSupport` with the same pool, as an optimization. + * Otherwise, the driver will be a Scala `Future`-based implementation. + */ + private val driver: Tasks = executionContext match { case eci: scala.concurrent.impl.ExecutionContextImpl => eci.executor match { case fjp: ForkJoinPool => new ForkJoinTaskSupport(fjp) - case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe) - case _ => ??? + case _ => new FutureTasks(environment) } - case _ => ??? + case _ => new FutureTasks(environment) } def execute[R, Tp](task: Task[R, Tp]): () => R = driver execute task diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index b25553d2c8..923e21e5a7 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -41,7 +41,7 @@ package object parallel { private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString) - private[parallel] def getTaskSupport: TaskSupport = new ForkJoinTaskSupport + private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport val defaultTaskSupport: TaskSupport = getTaskSupport |