summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/Tasks.scala
blob: 3ef60f8c7a9d6cc60833bb6769d769bd5debfd47 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package scala.collection.parallel




import scala.concurrent.forkjoin._










/** A trait that declares task execution capabilities used
 *  by parallel collections. Parallel collections inherit a subtrait
 *  of this trait.
 *
 *  One implementation trait of `TaskExecution` is `ForkJoinTaskExecution`.
 */
trait Tasks {

  /** A task abstraction which allows starting a task with `start`,
   *  waiting for it to finish with `sync` and attempting to cancel
   *  the task with `tryCancel`.
   *  It also defines a method `leaf` which must be called once the
   *  the task is started and defines what this task actually does.
   *  Method `split` allows splitting this task into smaller subtasks,
   *  and method `shouldSplitFurther` decides if the task should be
   *  partitioned further.
   *  Method `merge` allows merging the results of the 2 tasks. It updates
   *  the result of the receiver.
   *  Finally, it defines the task result of type `U`.
   */
  trait Task[R, +Tp] {
    def repr = this.asInstanceOf[Tp]
    /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
    def compute
    /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
     *  or `None` if there was no previous task.
     */
    def leaf(result: Option[R])
    /** Start task. */
    def start
    /** Wait for task to finish. */
    def sync
    /** Try to cancel the task.
     *  @return     `true` if cancellation is successful.
     */
    def tryCancel: Boolean
    /** A result that can be accessed once the task is completed. */
    def result: R
    /** Decides whether or not this task should be split further. */
    def shouldSplitFurther: Boolean
    /** Splits this task into a list of smaller tasks. */
    protected[this] def split: Seq[Task[R, Tp]]
    /** Read of results of `that` task and merge them into results of this one. */
    protected[this] def merge(that: Tp) {}
  }

  type TaskType[R, +Tp] <: Task[R, Tp]
  type ExecutionEnvironment

  var environment: ExecutionEnvironment

  /** Executes a task and waits for it to finish. */
  def executeAndWait[R, Tp](task: TaskType[R, Tp])

  /** Executes a result task, waits for it to finish, then returns its result. */
  def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R

  /** Retrieves the parallelism level of the task execution environment. */
  def parallelismLevel: Int

}


/** This trait implements scheduling by employing
 *  an adaptive work stealing technique.
 */
trait AdaptiveWorkStealingTasks extends Tasks {

  trait Task[R, Tp] extends super.Task[R, Tp] {
    var next: Task[R, Tp] = null
    var shouldWaitFor = true
    var result: R

    def split: Seq[Task[R, Tp]]

    /** The actual leaf computation. */
    def leaf(result: Option[R]): Unit

    def compute = if (shouldSplitFurther) internal else leaf(None)

    def internal = {
      var last = spawnSubtasks

      last.leaf(None)
      result = last.result

      while (last.next != null) {
        val lastresult = Option(last.result)
        last = last.next
        if (last.tryCancel) last.leaf(lastresult) else last.sync
        merge(last.repr)
      }
    }

    def spawnSubtasks = {
      var last: Task[R, Tp] = null
      var head: Task[R, Tp] = this
      do {
        val subtasks = head.split
        head = subtasks.head
        for (t <- subtasks.tail) {
          t.next = last
          last = t
          t.start
        }
      } while (head.shouldSplitFurther);
      head.next = last
      head
    }

    def printChain = {
      var curr = this
      var chain = "chain: "
      while (curr != null) {
        chain += curr + " ---> "
        curr = curr.next
      }
      println(chain)
    }
  }

}


/**
 * A trait describing objects that provide a fork/join pool.
 */
trait HavingForkJoinPool {
  def forkJoinPool: ForkJoinPool
}



/** An implementation trait for parallel tasks based on the fork/join framework.
 *
 *  @define fjdispatch
 *  If the current thread is a fork/join worker thread, the task's `fork` method will
 *  be invoked. Otherwise, the task will be executed on the fork/join pool.
 */
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {

  trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] {
    def start = fork
    def sync = join
    def tryCancel = tryUnfork
    var result: R
  }

  type TaskType[R, +Tp] = Task[R, Tp]
  type ExecutionEnvironment = ForkJoinPool

  /** The fork/join pool of this collection.
   */
  def forkJoinPool: ForkJoinPool = environment
  var environment = ForkJoinTasks.defaultForkJoinPool

  /** Executes a task on a fork/join pool and waits for it to finish.
   *
   *  $fjdispatch
   */
  def executeAndWait[R, Tp](fjtask: Task[R, Tp]) {
    if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
      fjtask.fork
    } else {
      forkJoinPool.execute(fjtask)
    }
    fjtask.join
  }

  /** Executes a task on a fork/join pool and waits for it to finish.
   *  Returns its result when it does.
   *
   *  $fjdispatch
   *
   *  @return    the result of the task
   */
  def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = {
    if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
      fjtask.fork
    } else {
      forkJoinPool.execute(fjtask)
    }
    fjtask.join
    fjtask.result
  }

  def parallelismLevel = forkJoinPool.getParallelism

}

object ForkJoinTasks {
  val defaultForkJoinPool = new ForkJoinPool
  defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
  defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}


/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them.
 */
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {

  trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp]

}