diff options
author | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-25 15:34:18 +0100 |
---|---|---|
committer | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-25 15:41:51 +0100 |
commit | 4abec1f64da57268ada7126f22894d1b50ebdbd8 (patch) | |
tree | 72565438745c371928768ce258973b4dcfae97ef /src | |
parent | cc3ad6e00ad3a5cfcbbc47bf62f425a31911f79f (diff) | |
download | scala-4abec1f64da57268ada7126f22894d1b50ebdbd8.tar.gz scala-4abec1f64da57268ada7126f22894d1b50ebdbd8.tar.bz2 scala-4abec1f64da57268ada7126f22894d1b50ebdbd8.zip |
Fix for SI-5375.
Changed CompositeThrowable to inherit Exception instead of Throwable.
A few minor fixes for the jdk1.5 parallel collection tasks.
Diffstat (limited to 'src')
-rw-r--r-- | src/library/scala/collection/parallel/ParIterableLike.scala | 3 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 57 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/package.scala | 2 |
3 files changed, 35 insertions, 27 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 90b64c17f9..390bd72ab5 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -895,7 +895,8 @@ self: ParIterableLike[T, Repr, Sequential] => @volatile var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 def leaf(prevr: Option[R1]) = { - result = map(executeAndWaitResult(inner)) + val initialResult = executeAndWaitResult(inner) + result = map(initialResult) } private[parallel] override def signalAbort() { inner.signalAbort diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 873291fb2d..b705909cad 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -88,7 +88,7 @@ trait Tasks { if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) { println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable) } else if (this.throwable != null || that.throwable != null) { - println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) + println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) } } @@ -118,7 +118,7 @@ trait Tasks { /** Try to cancel the task. * @return `true` if cancellation is successful. */ - def tryCancel: Boolean + def tryCancel(): Boolean /** If the task has been cancelled successfully, those syncing on it may * automatically be notified, depending on the implementation. If they * aren't, this release method should be called after processing the @@ -161,32 +161,39 @@ trait AdaptiveWorkStealingTasks extends Tasks { def split: Seq[TaskImpl[R, Tp]] - def compute() = if (body.shouldSplitFurther) internal else body.tryLeaf(None) + def compute() = if (body.shouldSplitFurther) { + internal() + release() + } else { + body.tryLeaf(None) + release() + } def internal() = { var last = spawnSubtasks() - + last.body.tryLeaf(None) + last.release() body.result = last.body.result body.throwable = last.body.throwable - + while (last.next != null) { // val lastresult = Option(last.body.result) val beforelast = last last = last.next - if (last.tryCancel) { + if (last.tryCancel()) { // println("Done with " + beforelast.body + ", next direct is " + last.body) last.body.tryLeaf(Some(body.result)) - last.release + last.release() } else { // println("Done with " + beforelast.body + ", next sync is " + last.body) - last.sync + last.sync() } // println("Merging " + body + " with " + last.body) body.tryMerge(last.body.repr) } } - + def spawnSubtasks() = { var last: TaskImpl[R, Tp] = null var head: TaskImpl[R, Tp] = this @@ -196,7 +203,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { for (t <- subtasks.tail.reverse) { t.next = last last = t - t.start + t.start() } } while (head.body.shouldSplitFurther); head.next = last @@ -230,12 +237,12 @@ trait ThreadPoolTasks extends Tasks { // utb: var future: Future[_] = null @volatile var owned = false @volatile var completed = false - + def start() = synchronized { // debuglog("Starting " + body) // utb: future = executor.submit(this) executor.synchronized { - incrTasks + incrTasks() executor.submit(this) } } @@ -249,9 +256,9 @@ trait ThreadPoolTasks extends Tasks { //assert(executor.getCorePoolSize == (coresize + 1)) } } - if (!completed) this.wait + while (!completed) this.wait } - def tryCancel = synchronized { + def tryCancel() = synchronized { // utb: future.cancel(false) if (!owned) { // debuglog("Cancelling " + body) @@ -259,7 +266,7 @@ trait ThreadPoolTasks extends Tasks { true } else false } - def run = { + def run() = { // utb: compute var isOkToRun = false synchronized { @@ -270,17 +277,17 @@ trait ThreadPoolTasks extends Tasks { } if (isOkToRun) { // debuglog("Running body of " + body) - compute - release + compute() } else { // just skip // debuglog("skipping body of " + body) } } - override def release = synchronized { + override def release() = synchronized { + //println("releasing: " + this + ", body: " + this.body) completed = true executor.synchronized { - decrTasks + decrTasks() } this.notifyAll } @@ -305,10 +312,10 @@ trait ThreadPoolTasks extends Tasks { val t = newTaskImpl(task) // debuglog("-----------> Executing without wait: " + task) - t.start + t.start() () => { - t.sync + t.sync() t.body.forwardThrowable t.body.result } @@ -318,9 +325,9 @@ trait ThreadPoolTasks extends Tasks { val t = newTaskImpl(task) // debuglog("-----------> Executing with wait: " + task) - t.start - - t.sync + t.start() + + t.sync() t.body.forwardThrowable t.body.result } @@ -369,7 +376,7 @@ trait FutureThreadPoolTasks extends Tasks { def sync() = future.get def tryCancel = false def run = { - compute + compute() } } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index addc366072..f152629c50 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -117,7 +117,7 @@ package parallel { /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */ final case class CompositeThrowable( val throwables: Set[Throwable] - ) extends Throwable( + ) extends Exception( "Multiple exceptions thrown during a parallel computation: " + throwables.map(t => t + "\n" + t.getStackTrace.take(10).++("...").mkString("\n")).mkString("\n\n") ) |