summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/Tasks.scala
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-06-18 15:06:17 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-06-18 15:06:17 +0000
commit9923b97157725ae1f7853a4834ef5e31283a1b98 (patch)
tree6252cf350a91d6bed178b07ed3ddc7fdd21d2890 /src/library/scala/collection/parallel/Tasks.scala
parentceec792d1af5bb7b2d618f27f6fd48cdf75cf92f (diff)
downloadscala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.gz
scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.bz2
scala-9923b97157725ae1f7853a4834ef5e31283a1b98.zip
Moved parallel collections to library dir, chan...
Moved parallel collections to library dir, changed sabbus script. Added `par` to some of the classes. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/Tasks.scala')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala230
1 files changed, 230 insertions, 0 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
new file mode 100644
index 0000000000..3ef60f8c7a
--- /dev/null
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -0,0 +1,230 @@
+package scala.collection.parallel
+
+
+
+
+import scala.concurrent.forkjoin._
+
+
+
+
+
+
+
+
+
+
+/** A trait that declares task execution capabilities used
+ * by parallel collections. Parallel collections inherit a subtrait
+ * of this trait.
+ *
+ * One implementation trait of `TaskExecution` is `ForkJoinTaskExecution`.
+ */
+trait Tasks {
+
+ /** A task abstraction which allows starting a task with `start`,
+ * waiting for it to finish with `sync` and attempting to cancel
+ * the task with `tryCancel`.
+ * It also defines a method `leaf` which must be called once the
+ * the task is started and defines what this task actually does.
+ * Method `split` allows splitting this task into smaller subtasks,
+ * and method `shouldSplitFurther` decides if the task should be
+ * partitioned further.
+ * Method `merge` allows merging the results of the 2 tasks. It updates
+ * the result of the receiver.
+ * Finally, it defines the task result of type `U`.
+ */
+ trait Task[R, +Tp] {
+ def repr = this.asInstanceOf[Tp]
+ /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
+ def compute
+ /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
+ * or `None` if there was no previous task.
+ */
+ def leaf(result: Option[R])
+ /** Start task. */
+ def start
+ /** Wait for task to finish. */
+ def sync
+ /** Try to cancel the task.
+ * @return `true` if cancellation is successful.
+ */
+ def tryCancel: Boolean
+ /** A result that can be accessed once the task is completed. */
+ def result: R
+ /** Decides whether or not this task should be split further. */
+ def shouldSplitFurther: Boolean
+ /** Splits this task into a list of smaller tasks. */
+ protected[this] def split: Seq[Task[R, Tp]]
+ /** Read of results of `that` task and merge them into results of this one. */
+ protected[this] def merge(that: Tp) {}
+ }
+
+ type TaskType[R, +Tp] <: Task[R, Tp]
+ type ExecutionEnvironment
+
+ var environment: ExecutionEnvironment
+
+ /** Executes a task and waits for it to finish. */
+ def executeAndWait[R, Tp](task: TaskType[R, Tp])
+
+ /** Executes a result task, waits for it to finish, then returns its result. */
+ def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
+
+ /** Retrieves the parallelism level of the task execution environment. */
+ def parallelismLevel: Int
+
+}
+
+
+/** This trait implements scheduling by employing
+ * an adaptive work stealing technique.
+ */
+trait AdaptiveWorkStealingTasks extends Tasks {
+
+ trait Task[R, Tp] extends super.Task[R, Tp] {
+ var next: Task[R, Tp] = null
+ var shouldWaitFor = true
+ var result: R
+
+ def split: Seq[Task[R, Tp]]
+
+ /** The actual leaf computation. */
+ def leaf(result: Option[R]): Unit
+
+ def compute = if (shouldSplitFurther) internal else leaf(None)
+
+ def internal = {
+ var last = spawnSubtasks
+
+ last.leaf(None)
+ result = last.result
+
+ while (last.next != null) {
+ val lastresult = Option(last.result)
+ last = last.next
+ if (last.tryCancel) last.leaf(lastresult) else last.sync
+ merge(last.repr)
+ }
+ }
+
+ def spawnSubtasks = {
+ var last: Task[R, Tp] = null
+ var head: Task[R, Tp] = this
+ do {
+ val subtasks = head.split
+ head = subtasks.head
+ for (t <- subtasks.tail) {
+ t.next = last
+ last = t
+ t.start
+ }
+ } while (head.shouldSplitFurther);
+ head.next = last
+ head
+ }
+
+ def printChain = {
+ var curr = this
+ var chain = "chain: "
+ while (curr != null) {
+ chain += curr + " ---> "
+ curr = curr.next
+ }
+ println(chain)
+ }
+ }
+
+}
+
+
+/**
+ * A trait describing objects that provide a fork/join pool.
+ */
+trait HavingForkJoinPool {
+ def forkJoinPool: ForkJoinPool
+}
+
+
+
+/** An implementation trait for parallel tasks based on the fork/join framework.
+ *
+ * @define fjdispatch
+ * If the current thread is a fork/join worker thread, the task's `fork` method will
+ * be invoked. Otherwise, the task will be executed on the fork/join pool.
+ */
+trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
+
+ trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] {
+ def start = fork
+ def sync = join
+ def tryCancel = tryUnfork
+ var result: R
+ }
+
+ type TaskType[R, +Tp] = Task[R, Tp]
+ type ExecutionEnvironment = ForkJoinPool
+
+ /** The fork/join pool of this collection.
+ */
+ def forkJoinPool: ForkJoinPool = environment
+ var environment = ForkJoinTasks.defaultForkJoinPool
+
+ /** Executes a task on a fork/join pool and waits for it to finish.
+ *
+ * $fjdispatch
+ */
+ def executeAndWait[R, Tp](fjtask: Task[R, Tp]) {
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ fjtask.fork
+ } else {
+ forkJoinPool.execute(fjtask)
+ }
+ fjtask.join
+ }
+
+ /** Executes a task on a fork/join pool and waits for it to finish.
+ * Returns its result when it does.
+ *
+ * $fjdispatch
+ *
+ * @return the result of the task
+ */
+ def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = {
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ fjtask.fork
+ } else {
+ forkJoinPool.execute(fjtask)
+ }
+ fjtask.join
+ fjtask.result
+ }
+
+ def parallelismLevel = forkJoinPool.getParallelism
+
+}
+
+object ForkJoinTasks {
+ val defaultForkJoinPool = new ForkJoinPool
+ defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
+ defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
+}
+
+
+/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them.
+ */
+trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
+
+ trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp]
+
+}
+
+
+
+
+
+
+
+
+
+