summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/Tasks.scala
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-25 15:34:18 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-25 15:41:51 +0100
commit4abec1f64da57268ada7126f22894d1b50ebdbd8 (patch)
tree72565438745c371928768ce258973b4dcfae97ef /src/library/scala/collection/parallel/Tasks.scala
parentcc3ad6e00ad3a5cfcbbc47bf62f425a31911f79f (diff)
downloadscala-4abec1f64da57268ada7126f22894d1b50ebdbd8.tar.gz
scala-4abec1f64da57268ada7126f22894d1b50ebdbd8.tar.bz2
scala-4abec1f64da57268ada7126f22894d1b50ebdbd8.zip
Fix for SI-5375.
Changed CompositeThrowable to inherit Exception instead of Throwable. A few minor fixes for the jdk1.5 parallel collection tasks.
Diffstat (limited to 'src/library/scala/collection/parallel/Tasks.scala')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala57
1 files changed, 32 insertions, 25 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 873291fb2d..b705909cad 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -88,7 +88,7 @@ trait Tasks {
if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
} else if (this.throwable != null || that.throwable != null) {
- println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
+ println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
}
}
@@ -118,7 +118,7 @@ trait Tasks {
/** Try to cancel the task.
* @return `true` if cancellation is successful.
*/
- def tryCancel: Boolean
+ def tryCancel(): Boolean
/** If the task has been cancelled successfully, those syncing on it may
* automatically be notified, depending on the implementation. If they
* aren't, this release method should be called after processing the
@@ -161,32 +161,39 @@ trait AdaptiveWorkStealingTasks extends Tasks {
def split: Seq[TaskImpl[R, Tp]]
- def compute() = if (body.shouldSplitFurther) internal else body.tryLeaf(None)
+ def compute() = if (body.shouldSplitFurther) {
+ internal()
+ release()
+ } else {
+ body.tryLeaf(None)
+ release()
+ }
def internal() = {
var last = spawnSubtasks()
-
+
last.body.tryLeaf(None)
+ last.release()
body.result = last.body.result
body.throwable = last.body.throwable
-
+
while (last.next != null) {
// val lastresult = Option(last.body.result)
val beforelast = last
last = last.next
- if (last.tryCancel) {
+ if (last.tryCancel()) {
// println("Done with " + beforelast.body + ", next direct is " + last.body)
last.body.tryLeaf(Some(body.result))
- last.release
+ last.release()
} else {
// println("Done with " + beforelast.body + ", next sync is " + last.body)
- last.sync
+ last.sync()
}
// println("Merging " + body + " with " + last.body)
body.tryMerge(last.body.repr)
}
}
-
+
def spawnSubtasks() = {
var last: TaskImpl[R, Tp] = null
var head: TaskImpl[R, Tp] = this
@@ -196,7 +203,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
for (t <- subtasks.tail.reverse) {
t.next = last
last = t
- t.start
+ t.start()
}
} while (head.body.shouldSplitFurther);
head.next = last
@@ -230,12 +237,12 @@ trait ThreadPoolTasks extends Tasks {
// utb: var future: Future[_] = null
@volatile var owned = false
@volatile var completed = false
-
+
def start() = synchronized {
// debuglog("Starting " + body)
// utb: future = executor.submit(this)
executor.synchronized {
- incrTasks
+ incrTasks()
executor.submit(this)
}
}
@@ -249,9 +256,9 @@ trait ThreadPoolTasks extends Tasks {
//assert(executor.getCorePoolSize == (coresize + 1))
}
}
- if (!completed) this.wait
+ while (!completed) this.wait
}
- def tryCancel = synchronized {
+ def tryCancel() = synchronized {
// utb: future.cancel(false)
if (!owned) {
// debuglog("Cancelling " + body)
@@ -259,7 +266,7 @@ trait ThreadPoolTasks extends Tasks {
true
} else false
}
- def run = {
+ def run() = {
// utb: compute
var isOkToRun = false
synchronized {
@@ -270,17 +277,17 @@ trait ThreadPoolTasks extends Tasks {
}
if (isOkToRun) {
// debuglog("Running body of " + body)
- compute
- release
+ compute()
} else {
// just skip
// debuglog("skipping body of " + body)
}
}
- override def release = synchronized {
+ override def release() = synchronized {
+ //println("releasing: " + this + ", body: " + this.body)
completed = true
executor.synchronized {
- decrTasks
+ decrTasks()
}
this.notifyAll
}
@@ -305,10 +312,10 @@ trait ThreadPoolTasks extends Tasks {
val t = newTaskImpl(task)
// debuglog("-----------> Executing without wait: " + task)
- t.start
+ t.start()
() => {
- t.sync
+ t.sync()
t.body.forwardThrowable
t.body.result
}
@@ -318,9 +325,9 @@ trait ThreadPoolTasks extends Tasks {
val t = newTaskImpl(task)
// debuglog("-----------> Executing with wait: " + task)
- t.start
-
- t.sync
+ t.start()
+
+ t.sync()
t.body.forwardThrowable
t.body.result
}
@@ -369,7 +376,7 @@ trait FutureThreadPoolTasks extends Tasks {
def sync() = future.get
def tryCancel = false
def run = {
- compute
+ compute()
}
}