diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-20 20:20:00 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-20 20:20:00 +0000 |
commit | e7ca142b45255f6b41582c25fe590a664d5fc8b9 (patch) | |
tree | a674b7cc69ad247330d444f4011a55d6a7ce61e2 /src | |
parent | d3d218e5ea77584489437f0dfa8148ee3764d6f7 (diff) | |
download | scala-e7ca142b45255f6b41582c25fe590a664d5fc8b9.tar.gz scala-e7ca142b45255f6b41582c25fe590a664d5fc8b9.tar.bz2 scala-e7ca142b45255f6b41582c25fe590a664d5fc8b9.zip |
Some exception handling fixes in parallel colle...
Some exception handling fixes in parallel collections. Fixed some
regressions. Fixed some tests.
No review.
Diffstat (limited to 'src')
9 files changed, 127 insertions, 89 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 881ab80038..3d839119b0 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -227,15 +227,11 @@ self => def map(r: R): R1 = mapping(r) } - def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { - val ft = tsk - val st = t2 + def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]](tsk, t2) { def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) } - def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { - val ft = tsk - val st = t2 + def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]](tsk, t2) { def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) } } @@ -460,7 +456,9 @@ self => othtask.compute othtask.result } - val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result } + val task = (copythis parallel copythat) { _ combine _ } mapResult { + _.result + } executeAndWaitResult(task) } else if (bf.isParallel) { // println("case parallel builder, `that` not parallel") @@ -687,37 +685,42 @@ self => protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] - protected[this] trait Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + protected[this] abstract class Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + (val ft: First, val st: Second) extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { - val ft: First - val st: Second def combineResults(fr: FR, sr: SR): R var result: R = null.asInstanceOf[R] private[parallel] override def signalAbort { ft.signalAbort st.signalAbort } + protected def mergeSubtasks { + ft mergeThrowables st + if (throwable eq null) result = combineResults(ft.result, st.result) + } override def requiresStrictSplitters = ft.requiresStrictSplitters || st.requiresStrictSplitters } /** Sequentially performs one task after another. */ - protected[this] trait SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] - extends Composite[FR, SR, R, First, Second] { + protected[this] abstract class SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + (f: First, s: Second) + extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { - ft.compute - st.compute - result = combineResults(ft.result, st.result) + executeAndWaitResult(ft) + executeAndWaitResult(st) + mergeSubtasks } } /** Performs two tasks in parallel, and waits for both to finish. */ - protected[this] trait ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] - extends Composite[FR, SR, R, First, Second] { + protected[this] abstract class ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] + (f: First, s: Second) + extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { - st.start - ft.compute - st.sync - result = combineResults(ft.result, st.result) + val ftfuture = execute(ft) + executeAndWaitResult(st) + ftfuture() + mergeSubtasks } } @@ -727,7 +730,8 @@ self => def map(r: R): R1 def leaf(prevr: Option[R1]) = { inner.compute - result = map(inner.result) + throwable = inner.throwable + if (throwable eq null) result = map(inner.result) } private[parallel] override def signalAbort { inner.signalAbort @@ -756,7 +760,7 @@ self => protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p) override def merge(that: Reduce[U]) = if (this.result == None) result = that.result - else if (that.result != None) op(result.get, that.result.get) + else if (that.result != None) result = Some(op(result.get, that.result.get)) override def requiresStrictSplitters = true } @@ -890,7 +894,9 @@ self => protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Take[U, This]] { var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf())) + def leaf(prev: Option[Combiner[U, This]]) = { + result = pit.take2combiner(n, reuse(prev, cbf())) + } protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException override def split = { val pits = pit.split diff --git a/src/library/scala/collection/parallel/ParMapLike.scala b/src/library/scala/collection/parallel/ParMapLike.scala index e6944953b5..6eb621c76a 100644 --- a/src/library/scala/collection/parallel/ParMapLike.scala +++ b/src/library/scala/collection/parallel/ParMapLike.scala @@ -24,7 +24,7 @@ extends MapLike[K, V, Repr] protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner - protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.") + protected[this] override def newCombiner: Combiner[(K, V), Repr] = unsupportedop("Must implement `newCombiner` in concrete collections.") override def empty: Repr diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index d2e0f965d6..b3d0593ecd 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -249,7 +249,9 @@ self => tsk.result } val copyend = new Copy[U, That](() => pbf(repr), pits(2)) - executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result }) + executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { + _.result + }) } else patch_sequential(from, patch, replaced) private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 915e02f787..c58bc8b734 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -59,8 +59,8 @@ trait Tasks { protected[this] def merge(that: Tp) {} // exception handling mechanism - var exception: Exception = null - def forwardException = if (exception != null) throw exception + var throwable: Throwable = null + def forwardThrowable = if (throwable != null) throw throwable // tries to do the leaf computation, storing the possible exception protected def tryLeaf(result: Option[R]) { try { @@ -70,15 +70,21 @@ trait Tasks { signalAbort } } catch { - case e: Exception => - exception = e + case thr: Throwable => + throwable = thr signalAbort } } protected[this] def tryMerge(t: Tp) { val that = t.asInstanceOf[Task[R, Tp]] - if (this.exception == null && that.exception == null) merge(that.repr) - else if (that.exception != null) this.exception = that.exception + if (this.throwable == null && that.throwable == null) merge(t) + mergeThrowables(that) + } + private[parallel] def mergeThrowables(that: Task[_, _]) { + if (this.throwable != null && that.throwable != null) { + // merge exceptions, since there were multiple exceptions + this.throwable = this.throwable alongWith that.throwable + } else if (that.throwable != null) this.throwable = that.throwable } // override in concrete task implementations to signal abort to other tasks private[parallel] def signalAbort {} @@ -206,7 +212,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { () => { fjtask.join - fjtask.forwardException + fjtask.forwardThrowable fjtask.result } } @@ -225,7 +231,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { forkJoinPool.execute(fjtask) } fjtask.join - fjtask.forwardException + fjtask.forwardThrowable fjtask.result } diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 37b52b7a40..7cc0adbb4f 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -40,6 +40,8 @@ self => override def empty: ParHashMap[K, V] = new ParHashMap[K, V] + protected[this] override def newCombiner = HashMapCombiner[K, V] + def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI def seq = trie diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index fb4119bddc..3648945857 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -28,6 +28,8 @@ self => override def empty: ParHashMap[K, V] = new ParHashMap[K, V] + protected[this] override def newCombiner = ParHashMapCombiner[K, V] + def seq = new collection.mutable.HashMap[K, V](hashTableContents) def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI @@ -108,7 +110,9 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => // construct table val table = new AddingHashTable(size, tableLoadFactor) - executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) + val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) + + // TODO compare insertcount and size to see if compression is needed val c = table.hashTableContents new ParHashMap(c) @@ -118,7 +122,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => * it allocates the table of the required size when created. * * Entries are added using the `insertEntry` method. This method checks whether the element - * exists and updates the size map. + * exists and updates the size map. It returns false if the key was already in the table, + * and true if the key was successfully inserted. */ class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] { import HashTable._ @@ -127,7 +132,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => tableSize = 0 threshold = newThreshold(_loadFactor, table.length) sizeMapInit(table.length) - def insertEntry(e: DefaultEntry[K, V]) { + def insertEntry(e: DefaultEntry[K, V]) = { var h = index(elemHashCode(e.key)) var olde = table(h).asInstanceOf[DefaultEntry[K, V]] @@ -146,24 +151,27 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => table(h) = e tableSize = tableSize + 1 nnSizeMapAdd(h) - } + true + } else false } } /* tasks */ class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int) - extends super.Task[Unit, FillBlocks] { - var result = () - def leaf(prev: Option[Unit]) = { + extends super.Task[Int, FillBlocks] { + var result = Int.MinValue + def leaf(prev: Option[Int]) = { var i = offset val until = offset + howmany + result = 0 while (i < until) { - fillBlock(buckets(i)) + result += fillBlock(buckets(i)) i += 1 } } - private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) { + private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) = { + var insertcount = 0 var unrolled = elems var i = 0 val t = table @@ -172,17 +180,21 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => val chunksz = unrolled.size while (i < chunksz) { val elem = chunkarr(i) - t.insertEntry(elem) + if (t.insertEntry(elem)) insertcount += 1 i += 1 } i = 0 unrolled = unrolled.next } + insertcount } def split = { val fp = howmany / 2 List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp)) } + override def merge(that: FillBlocks) { + this.result += that.result + } def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel) } diff --git a/src/library/scala/collection/parallel/mutable/ParMapLike.scala b/src/library/scala/collection/parallel/mutable/ParMapLike.scala index 902186eb2d..25b5c3b5f7 100644 --- a/src/library/scala/collection/parallel/mutable/ParMapLike.scala +++ b/src/library/scala/collection/parallel/mutable/ParMapLike.scala @@ -12,6 +12,6 @@ trait ParMapLike[K, V, +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], +Sequential <: collection.mutable.Map[K, V] with collection.mutable.MapLike[K, V, Sequential]] -extends collection.parallel.ParMapLike[K, V, Repr, Sequential] - with collection.mutable.MapLike[K, V, Repr] +extends collection.mutable.MapLike[K, V, Repr] + with collection.parallel.ParMapLike[K, V, Repr, Sequential] diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index a30d564039..bb53dcdaaa 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -31,12 +31,61 @@ package object parallel { else sz } - private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg) - private[parallel] def unsupported = throw new UnsupportedOperationException + private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg) + + /* implicit conversions */ + + /** An implicit conversion providing arrays with a `par` method, which + * returns a parallel array. + * + * @tparam T type of the elements in the array, which is a subtype of AnyRef + * @param array the array to be parallelized + * @return a `Parallelizable` object with a `par` method= + */ + implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] { + def par = mutable.ParArray.handoff[T](array) + } + + implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { + def isParallel = bf.isInstanceOf[Parallel] + def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] + def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { + def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody + } + } + + implicit def traversable2ops[T](t: TraversableOnce[T]) = new { + def isParallel = t.isInstanceOf[Parallel] + def isParIterable = t.isInstanceOf[ParIterable[_]] + def asParIterable = t.asInstanceOf[ParIterable[T]] + def isParSeq = t.isInstanceOf[ParSeq[_]] + def asParSeq = t.asInstanceOf[ParSeq[T]] + def ifParSeq[R](isbody: ParSeq[T] => R) = new { + def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody + } + def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else { + val it = t.toIterator + val cb = mutable.ParArrayCombiner[T]() + while (it.hasNext) cb += it.next + cb.result + } + } + + implicit def throwable2ops(self: Throwable) = new { + def alongWith(that: Throwable) = self match { + case ct: CompositeThrowable => new CompositeThrowable(ct.throwables + that) + case _ => new CompositeThrowable(Set(self, that)) + } + } + /* classes */ + /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */ + final class CompositeThrowable(val throwables: Set[Throwable]) + extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", ")) + /** Unrolled list node. */ private[parallel] class Unrolled[T: ClassManifest] { @@ -159,45 +208,6 @@ package object parallel { } else this } - - /* implicit conversions */ - - /** An implicit conversion providing arrays with a `par` method, which - * returns a parallel array. - * - * @tparam T type of the elements in the array, which is a subtype of AnyRef - * @param array the array to be parallelized - * @return a `Parallelizable` object with a `par` method= - */ - implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] { - def par = mutable.ParArray.handoff[T](array) - } - - implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { - def isParallel = bf.isInstanceOf[Parallel] - def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] - def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { - def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody - } - } - - implicit def traversable2ops[T](t: TraversableOnce[T]) = new { - def isParallel = t.isInstanceOf[Parallel] - def isParIterable = t.isInstanceOf[ParIterable[_]] - def asParIterable = t.asInstanceOf[ParIterable[T]] - def isParSeq = t.isInstanceOf[ParSeq[_]] - def asParSeq = t.asInstanceOf[ParSeq[T]] - def ifParSeq[R](isbody: ParSeq[T] => R) = new { - def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody - } - def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else { - val it = t.toIterator - val cb = mutable.ParArrayCombiner[T]() - while (it.hasNext) cb += it.next - cb.result - } - } - } diff --git a/src/partest/scala/tools/partest/nest/Worker.scala b/src/partest/scala/tools/partest/nest/Worker.scala index fba24394a5..6e8ccff723 100644 --- a/src/partest/scala/tools/partest/nest/Worker.scala +++ b/src/partest/scala/tools/partest/nest/Worker.scala @@ -538,8 +538,8 @@ class Worker(val fileManager: FileManager, params: TestRunParams) extends Actor val lines = SFile(logFile).lines.filter(_.trim != "").toBuffer succeeded = { val failures = lines filter (_ startsWith "!") - val passedok = lines filter (_ startsWith "+") forall (_ contains "OK") - failures.isEmpty && passedok + //val passedok = lines filter (_ startsWith "+") forall (_ contains "OK") - OK may wrap!! + failures.isEmpty } if (!succeeded) { NestUI.normal("ScalaCheck test failed. Output:\n") |