diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 15:06:17 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 15:06:17 +0000 |
commit | 9923b97157725ae1f7853a4834ef5e31283a1b98 (patch) | |
tree | 6252cf350a91d6bed178b07ed3ddc7fdd21d2890 /src/library/scala/collection/parallel/Tasks.scala | |
parent | ceec792d1af5bb7b2d618f27f6fd48cdf75cf92f (diff) | |
download | scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.gz scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.bz2 scala-9923b97157725ae1f7853a4834ef5e31283a1b98.zip |
Moved parallel collections to library dir, chan...
Moved parallel collections to library dir, changed sabbus script. Added
`par` to some of the classes. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/Tasks.scala')
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala new file mode 100644 index 0000000000..3ef60f8c7a --- /dev/null +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -0,0 +1,230 @@ +package scala.collection.parallel + + + + +import scala.concurrent.forkjoin._ + + + + + + + + + + +/** A trait that declares task execution capabilities used + * by parallel collections. Parallel collections inherit a subtrait + * of this trait. + * + * One implementation trait of `TaskExecution` is `ForkJoinTaskExecution`. + */ +trait Tasks { + + /** A task abstraction which allows starting a task with `start`, + * waiting for it to finish with `sync` and attempting to cancel + * the task with `tryCancel`. + * It also defines a method `leaf` which must be called once the + * the task is started and defines what this task actually does. + * Method `split` allows splitting this task into smaller subtasks, + * and method `shouldSplitFurther` decides if the task should be + * partitioned further. + * Method `merge` allows merging the results of the 2 tasks. It updates + * the result of the receiver. + * Finally, it defines the task result of type `U`. + */ + trait Task[R, +Tp] { + def repr = this.asInstanceOf[Tp] + /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ + def compute + /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task + * or `None` if there was no previous task. + */ + def leaf(result: Option[R]) + /** Start task. */ + def start + /** Wait for task to finish. */ + def sync + /** Try to cancel the task. + * @return `true` if cancellation is successful. + */ + def tryCancel: Boolean + /** A result that can be accessed once the task is completed. */ + def result: R + /** Decides whether or not this task should be split further. */ + def shouldSplitFurther: Boolean + /** Splits this task into a list of smaller tasks. */ + protected[this] def split: Seq[Task[R, Tp]] + /** Read of results of `that` task and merge them into results of this one. */ + protected[this] def merge(that: Tp) {} + } + + type TaskType[R, +Tp] <: Task[R, Tp] + type ExecutionEnvironment + + var environment: ExecutionEnvironment + + /** Executes a task and waits for it to finish. */ + def executeAndWait[R, Tp](task: TaskType[R, Tp]) + + /** Executes a result task, waits for it to finish, then returns its result. */ + def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R + + /** Retrieves the parallelism level of the task execution environment. */ + def parallelismLevel: Int + +} + + +/** This trait implements scheduling by employing + * an adaptive work stealing technique. + */ +trait AdaptiveWorkStealingTasks extends Tasks { + + trait Task[R, Tp] extends super.Task[R, Tp] { + var next: Task[R, Tp] = null + var shouldWaitFor = true + var result: R + + def split: Seq[Task[R, Tp]] + + /** The actual leaf computation. */ + def leaf(result: Option[R]): Unit + + def compute = if (shouldSplitFurther) internal else leaf(None) + + def internal = { + var last = spawnSubtasks + + last.leaf(None) + result = last.result + + while (last.next != null) { + val lastresult = Option(last.result) + last = last.next + if (last.tryCancel) last.leaf(lastresult) else last.sync + merge(last.repr) + } + } + + def spawnSubtasks = { + var last: Task[R, Tp] = null + var head: Task[R, Tp] = this + do { + val subtasks = head.split + head = subtasks.head + for (t <- subtasks.tail) { + t.next = last + last = t + t.start + } + } while (head.shouldSplitFurther); + head.next = last + head + } + + def printChain = { + var curr = this + var chain = "chain: " + while (curr != null) { + chain += curr + " ---> " + curr = curr.next + } + println(chain) + } + } + +} + + +/** + * A trait describing objects that provide a fork/join pool. + */ +trait HavingForkJoinPool { + def forkJoinPool: ForkJoinPool +} + + + +/** An implementation trait for parallel tasks based on the fork/join framework. + * + * @define fjdispatch + * If the current thread is a fork/join worker thread, the task's `fork` method will + * be invoked. Otherwise, the task will be executed on the fork/join pool. + */ +trait ForkJoinTasks extends Tasks with HavingForkJoinPool { + + trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] { + def start = fork + def sync = join + def tryCancel = tryUnfork + var result: R + } + + type TaskType[R, +Tp] = Task[R, Tp] + type ExecutionEnvironment = ForkJoinPool + + /** The fork/join pool of this collection. + */ + def forkJoinPool: ForkJoinPool = environment + var environment = ForkJoinTasks.defaultForkJoinPool + + /** Executes a task on a fork/join pool and waits for it to finish. + * + * $fjdispatch + */ + def executeAndWait[R, Tp](fjtask: Task[R, Tp]) { + if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { + fjtask.fork + } else { + forkJoinPool.execute(fjtask) + } + fjtask.join + } + + /** Executes a task on a fork/join pool and waits for it to finish. + * Returns its result when it does. + * + * $fjdispatch + * + * @return the result of the task + */ + def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = { + if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { + fjtask.fork + } else { + forkJoinPool.execute(fjtask) + } + fjtask.join + fjtask.result + } + + def parallelismLevel = forkJoinPool.getParallelism + +} + +object ForkJoinTasks { + val defaultForkJoinPool = new ForkJoinPool + defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) + defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) +} + + +/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them. + */ +trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks { + + trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp] + +} + + + + + + + + + + |