From f2ecbd04691b1914e2f77c60afc2b296aa6826ae Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Thu, 9 Dec 2010 10:08:20 +0000 Subject: Array combiners implementation changed from arr... Array combiners implementation changed from array buffers to doubling unrolled buffers to avoid excessive copying. Still evaluating the benefits of this. No review. --- .../scala/collection/immutable/HashMap.scala | 27 ++- .../scala/collection/immutable/HashSet.scala | 27 ++- .../collection/parallel/ParIterableLike.scala | 238 +++++++++------------ .../scala/collection/parallel/ParSeqLike.scala | 2 + .../collection/parallel/RemainsIterator.scala | 48 ++++- .../scala/collection/parallel/UnrolledBuffer.scala | 30 ++- .../collection/parallel/immutable/ParHashMap.scala | 18 +- .../collection/parallel/immutable/ParHashSet.scala | 16 +- .../collection/parallel/immutable/ParRange.scala | 2 + .../collection/parallel/immutable/package.scala | 1 + .../collection/parallel/mutable/LazyCombiner.scala | 2 +- .../collection/parallel/mutable/ParArray.scala | 112 ++++++++-- .../parallel/mutable/ParArrayCombiner.scala | 112 +++++++++- .../parallel/mutable/ParFlatHashTable.scala | 1 + .../collection/parallel/mutable/ParHashTable.scala | 2 + .../collection/parallel/mutable/package.scala | 5 + .../scala/collection/parallel/package.scala | 9 +- test/benchmarks/source.list | 1 + .../scala/collection/parallel/Benchmarking.scala | 1 + .../benchmarks/parallel_array/ForeachHeavy.scala | 3 + .../benchmarks/parallel_array/ScanMedium.scala | 55 +++++ .../parallel-collections/IntOperators.scala | 3 +- .../parallel-collections/Operators.scala | 2 +- .../ParallelHashMapCheck.scala | 34 +-- .../ParallelHashSetCheck.scala | 44 ++-- .../ParallelIterableCheck.scala | 16 ++ 26 files changed, 567 insertions(+), 244 deletions(-) create mode 100644 test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala index 4a7a10c6c1..f6e17e9630 100644 --- a/src/library/scala/collection/immutable/HashMap.scala +++ b/src/library/scala/collection/immutable/HashMap.scala @@ -473,14 +473,25 @@ time { mNew.iterator.foreach( p => ()) } } class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] { - private[this] var depth = 0 - private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6) - private[this] var posStack = new Array[Int](6) - - private[this] var arrayD = elems - private[this] var posD = 0 - - private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes + protected var depth = 0 + protected var arrayStack: Array[Array[HashMap[A, B @uncheckedVariance]]] = new Array[Array[HashMap[A,B]]](6) + protected var posStack = new Array[Int](6) + + protected var arrayD: Array[HashMap[A, B @uncheckedVariance]] = elems + protected var posD = 0 + + protected var subIter: Iterator[(A, B @uncheckedVariance)] = null // to traverse collision nodes + + def dupIterator: TrieIterator[A, B] = { + val t = new TrieIterator(elems) + t.depth = depth + t.arrayStack = arrayStack + t.posStack = posStack + t.arrayD = arrayD + t.posD = posD + t.subIter = subIter + t + } def hasNext = (subIter ne null) || depth >= 0 diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala index 8d52908349..720ea29639 100644 --- a/src/library/scala/collection/immutable/HashSet.scala +++ b/src/library/scala/collection/immutable/HashSet.scala @@ -285,14 +285,25 @@ time { mNew.iterator.foreach( p => ()) } class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] { - private[this] var depth = 0 - private[this] var arrayStack = new Array[Array[HashSet[A]]](6) - private[this] var posStack = new Array[Int](6) - - private[this] var arrayD = elems - private[this] var posD = 0 - - private[this] var subIter: Iterator[A] = null // to traverse collision nodes + protected var depth = 0 + protected var arrayStack = new Array[Array[HashSet[A]]](6) + protected var posStack = new Array[Int](6) + + protected var arrayD = elems + protected var posD = 0 + + protected var subIter: Iterator[A] = null // to traverse collision nodes + + def dupIterator: TrieIterator[A] = { + val t = new TrieIterator(elems) + t.depth = depth + t.arrayStack = arrayStack + t.posStack = posStack + t.arrayD = arrayD + t.posD = posD + t.subIter = subIter + t + } def hasNext = (subIter ne null) || depth >= 0 diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index ed757655f5..d3e6eb42d4 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -4,7 +4,7 @@ package scala.collection.parallel import scala.collection.mutable.Builder -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.ArrayBuffer import scala.collection.IterableLike import scala.collection.Parallel import scala.collection.Parallelizable @@ -577,15 +577,13 @@ self => * * @return a new $coll containing the prefix scan of the elements in this $coll */ - def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = { - val array = new Array[Any](size + 1) - array(0) = z - executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, array, parallelIterator) mapResult { st => - executeAndWaitResult(new ScanWithScanTree[U, Any](Some(z), op, st, array, array) mapResult { u => - executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result }) + def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = if (parallelismLevel > 1) { + if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, parallelIterator) mapResult { + tree => executeAndWaitResult(new FromScanTree(tree, z, op, cbf) mapResult { + cb => cb.result }) - }) - } + }) else (cbf(self.repr) += z).result + } else super.scanLeft(z)(op)(cbf) /** Takes the longest prefix of elements that satisfy the predicate. * @@ -1099,154 +1097,117 @@ self => override def requiresStrictSplitters = true } - protected[this] class ScanTree[U >: T](val from: Int, val len: Int) { - var value: U = _ - var left: ScanTree[U] = null - var right: ScanTree[U] = null - @volatile var chunkFinished = false - var activeScan: () => Unit = null - - def isApplying = activeScan ne null - def isLeaf = (left eq null) && (right eq null) - def shouldApply = !chunkFinished && !isApplying - def applyToInterval[A >: U](elem: U, op: (U, U) => U, array: Array[A]) = { - //executeAndWait(new ApplyToArray(elem, op, from, len, array)) + protected[this] class CreateScanTree[U >: T](from: Int, len: Int, z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) + extends Transformer[ScanTree[U], CreateScanTree[U]] { + var result: ScanTree[U] = null + def leaf(prev: Option[ScanTree[U]]) = if (pit.remaining > 0) { + val trees = ArrayBuffer[ScanTree[U]]() var i = from val until = from + len + val blocksize = scanBlockSize while (i < until) { - array(i) = op(elem, array(i).asInstanceOf[U]) - i += 1 + trees += scanBlock(i, math.min(blocksize, pit.remaining)) + i += blocksize } + + // merge trees + result = mergeTrees(trees, 0, trees.length) + } else result = null // no elements to scan (merge will take care of `null`s) + private def scanBlock(from: Int, len: Int): ScanTree[U] = { + val pitdup = pit.dup + new ScanLeaf(pitdup, op, from, len, None, pit.reduceLeft(len, op)) } - def scanInterval[A >: U](elem: U, op: (U, U) => U, srcA: Array[A], destA: Array[A]) = { - val src = srcA.asInstanceOf[Array[Any]] - val dest = destA.asInstanceOf[Array[Any]] - var last = elem - var i = from - val until = from + len - while (i < until) { - last = op(last, src(i - 1).asInstanceOf[U]) - dest(i) = last - i += 1 + private def mergeTrees(trees: ArrayBuffer[ScanTree[U]], from: Int, howmany: Int): ScanTree[U] = if (howmany > 1) { + val half = howmany / 2 + ScanNode(mergeTrees(trees, from, half), mergeTrees(trees, from + half, howmany - half)) + } else trees(from) + protected[this] def newSubtask(pit: ParIterableIterator[T]) = unsupported + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield { + new CreateScanTree(untilp, p.remaining, z, op, p) } } - def pushDown(v: U, op: (U, U) => U) { - value = op(v, value) - if (left ne null) left.pushDown(v, op) - if (right ne null) right.pushDown(v, op) - } - def pushDownOnRight(v: U, op: (U, U) => U) = if (right ne null) right.pushDown(v, op) - def printTree: Unit = printTree(0) - private def printTree(t: Int): Unit = { - for (i <- 0 until t) print(" ") - if (isLeaf) print("(l) ") - println(value + ": from " + from + " until " + (from + len)) - if (left ne null) left.printTree(t + 1) - if (right ne null) right.printTree(t + 1) - } + override def merge(that: CreateScanTree[U]) = if (this.result != null) { + if (that.result != null) result = ScanNode(result, that.result) + } else result = that.result + override def requiresStrictSplitters = true } - protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A]) - extends StrictSplitterCheckTask[Unit, ApplyToArray[U, A]] { - var result: Unit = () - def leaf(prev: Option[Unit]) = { - var i = from - val until = from + len - while (i < until) { - array(i) = op(elem, array(i).asInstanceOf[U]) - i += 1 - } + protected[this] class FromScanTree[U >: T, That] + (tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CanCombineFrom[Repr, U, That]) + extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] { + var result: Combiner[U, That] = null + def leaf(prev: Option[Combiner[U, That]]) { + val cb = reuse(prev, cbf(self.repr)) + iterate(tree, cb) + result = cb } - def shouldSplitFurther = len > threshold(size, parallelismLevel min availableProcessors) - def split = { - val fp = len / 2 - val sp = len - fp - Seq( - new ApplyToArray(elem, op, from, fp, array), - new ApplyToArray(elem, op, from + fp, sp, array) + private def iterate(tree: ScanTree[U], cb: Combiner[U, That]): Unit = tree match { + case ScanNode(left, right) => + iterate(left, cb) + iterate(right, cb) + case ScanLeaf(p, _, _, len, Some(prev), _) => + p.scanToCombiner(len, prev.acc, op, cb) + case ScanLeaf(p, _, _, len, None, _) => + cb += z + p.scanToCombiner(len, z, op, cb) + } + def split = tree match { + case ScanNode(left, right) => Seq( + new FromScanTree(left, z, op, cbf), + new FromScanTree(right, z, op, cbf) ) + case _ => unsupportedop("Cannot be split further") } + def shouldSplitFurther = tree match { + case ScanNode(_, _) => true + case ScanLeaf(_, _, _, _, _, _) => false + } + override def merge(that: FromScanTree[U, That]) = result = result combine that.result } - protected[this] class BuildScanTree[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T]) - extends Accessor[ScanTree[U], BuildScanTree[U, A]] { - // TODO reimplement - there are some issues here - var result: ScanTree[U] = null - def leaf(prev: Option[ScanTree[U]]) = if ((prev != None && prev.get.chunkFinished) || from == 1) { - val prevElem = if (from == 1) z else prev.get.value - result = new ScanTree[U](from, len) - pit.scanToArray(prevElem, op, array, from) - result.value = array(from + len - 1).asInstanceOf[U] - result.chunkFinished = true - } else { - result = new ScanTree[U](from, len) - result.value = pit.fold(z)(op) - } - protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield { - val plen = p.remaining min (len - untilp) - new BuildScanTree[U, A](z, op, from + untilp, plen, array, p) - } - } - override def merge(that: BuildScanTree[U, A]) = { - // create scan tree node - val left = result - val right = that.result - val ns = new ScanTree[U](left.from, left.len + right.len) - ns.left = left - ns.right = right - ns.value = op(left.value, right.value) - ns.pushDownOnRight(left.value, op) - - // set result - result = ns - } - override def requiresStrictSplitters = true + /* scan tree */ + + protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1 + + protected[this] trait ScanTree[U >: T] { + def beginsAt: Int + def pushdown(v: U): Unit + def leftmost: ScanLeaf[U] + def rightmost: ScanLeaf[U] + def print(depth: Int = 0): Unit } - protected[this] class ScanWithScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], src: Array[A], dest: Array[A]) - extends StrictSplitterCheckTask[Unit, ScanWithScanTree[U, A]] { - var result = (); - def leaf(prev: Option[Unit]) = scan(st, first.get) - private def scan(st: ScanTree[U], elem: U): Unit = if (!st.chunkFinished) { - if (st.isLeaf) st.scanInterval(elem, op, src, dest) - else { - scan(st.left, elem) - scan(st.right, st.left.value) - } + protected[this] case class ScanNode[U >: T](left: ScanTree[U], right: ScanTree[U]) extends ScanTree[U] { + right.pushdown(left.rightmost.acc) + right.leftmost.prev = Some(left.rightmost) + + val leftmost = left.leftmost + val rightmost = right.rightmost + + def beginsAt = left.beginsAt + def pushdown(v: U) { + left.pushdown(v) + right.pushdown(v) + } + def print(depth: Int) { + println((" " * depth) + "ScanNode, begins at " + beginsAt) + left.print(depth + 1) + right.print(depth + 1) } - def split = collection.mutable.ArrayBuffer( - new ScanWithScanTree(first, op, st.left, src, dest), - new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest) - ) - def shouldSplitFurther = (st.left ne null) && (st.right ne null) } - protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That]) - extends StrictSplitterCheckTask[Combiner[S, That], FromArray[S, A, That]] { - var result: Result = null - def leaf(prev: Option[Result]) = { - val cb = prev getOrElse cbf(self.repr) - var i = from - val until = from + len - while (i < until) { - cb += array(i).asInstanceOf[S] - i += 1 - } - result = cb - } - def shouldSplitFurther = len > threshold(size, parallelismLevel) - def split = { - val fp = len / 2 - val sp = len - fp - Seq( - new FromArray(array, from, fp, cbf), - new FromArray(array, from + fp, sp, cbf) - ) + protected[this] case class ScanLeaf[U >: T] + (pit: ParIterableIterator[U], op: (U, U) => U, from: Int, len: Int, var prev: Option[ScanLeaf[U]], var acc: U) + extends ScanTree[U] { + def beginsAt = from + def pushdown(v: U) = { + acc = op(v, acc) } - override def merge(that: FromArray[S, A, That]) = result = result combine that.result + def leftmost = this + def rightmost = this + def print(depth: Int) = println((" " * depth) + this) } /* debug information */ @@ -1255,7 +1216,8 @@ self => private[parallel] def brokenInvariants = Seq[String]() - private val debugBuffer = collection.mutable.ArrayBuffer[String]() + // private val dbbuff = ArrayBuffer[String]() + def debugBuffer: ArrayBuffer[String] = null // dbbuff private[parallel] def debugclear() = synchronized { debugBuffer.clear diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 0ea33d0e39..91e15fa946 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -102,6 +102,8 @@ self => final def remaining = end - i + def dup = new Elements(i, end) with SignalContextPassingIterator[ParIterator] + def split = psplit(remaining / 2, remaining - remaining / 2) def psplit(sizes: Int*) = { diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index a33702a527..3363186b7d 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -87,6 +87,16 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { } } + def reduceLeft[U >: T](howmany: Int, op: (U, U) => U): U = { + var i = howmany - 1 + var u: U = next + while (i > 0 && hasNext) { + u = op(u, next) + i -= 1 + } + u + } + /* transformers to combiners */ def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { @@ -165,7 +175,7 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { def slice2combiner[U >: T, This](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = { drop(from) - var left = until - from + var left = math.max(until - from, 0) cb.sizeHint(left) while (left > 0) { cb += next @@ -221,6 +231,26 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { } } + def scanToCombiner[U >: T, That](startValue: U, op: (U, U) => U, cb: Combiner[U, That]) = { + var curr = startValue + while (hasNext) { + curr = op(curr, next) + cb += curr + } + cb + } + + def scanToCombiner[U >: T, That](howmany: Int, startValue: U, op: (U, U) => U, cb: Combiner[U, That]) = { + var curr = startValue + var left = howmany + while (left > 0) { + curr = op(curr, next) + cb += curr + left -= 1 + } + cb + } + def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = { cb.sizeHint(remaining min otherpit.remaining) while (hasNext && otherpit.hasNext) { @@ -336,6 +366,9 @@ extends AugmentedIterableIterator[T] { self => + /** Creates a copy of this iterator. */ + def dup: ParIterableIterator[T] + def split: Seq[ParIterableIterator[T]] /** The number of elements this iterator has yet to traverse. This method @@ -377,6 +410,7 @@ self => var remaining = taken min self.remaining def hasNext = remaining > 0 def next = { remaining -= 1; self.next } + def dup: ParIterableIterator[T] = self.dup.take(taken) def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { val sizes = sq.scanLeft(0)(_ + _.remaining) @@ -400,6 +434,7 @@ self => def hasNext = self.hasNext def next = f(self.next) def remaining = self.remaining + def dup: ParIterableIterator[S] = self.dup map f def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f } } @@ -418,6 +453,7 @@ self => } else curr.next def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining protected def firstNonEmpty = (curr eq self) && curr.hasNext + def dup: ParIterableIterator[U] = self.dup.appendParIterable[U, PI](that) def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split } @@ -428,6 +464,7 @@ self => def hasNext = self.hasNext && that.hasNext def next = (self.next, that.next) def remaining = self.remaining min that.remaining + def dup: ParIterableIterator[(T, S)] = self.dup.zipParSeq(that) def split: Seq[ParIterableIterator[(T, S)]] = { val selfs = self.split val sizes = selfs.map(_.remaining) @@ -447,6 +484,7 @@ self => else (self.next, thatelem) } else (thiselem, that.next); def remaining = self.remaining max that.remaining + def dup: ParIterableIterator[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem) def split: Seq[ParIterableIterator[(U, S)]] = { val selfrem = self.remaining val thatrem = that.remaining @@ -468,6 +506,8 @@ extends ParIterableIterator[T] with PreciseSplitter[T] { self => + def dup: ParSeqIterator[T] + def split: Seq[ParSeqIterator[T]] def psplit(sizes: Int*): Seq[ParSeqIterator[T]] @@ -483,6 +523,7 @@ self => /* iterator transformers */ class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] { + override def dup = super.dup.asInstanceOf[ParSeqIterator[T]] override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]] def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) } } @@ -497,6 +538,7 @@ self => } class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] { + override def dup = super.dup.asInstanceOf[ParSeqIterator[S]] override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]] def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f } } @@ -504,6 +546,7 @@ self => override def map[S](f: T => S) = new Mapped(f) class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] { + override def dup = super.dup.asInstanceOf[ParSeqIterator[U]] override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]] def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) { val selfrem = self.remaining @@ -534,6 +577,7 @@ self => def appendParSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that) class Zipped[S](ti: ParSeqIterator[S]) extends super.Zipped[S](ti) with ParSeqIterator[(T, S)] { + override def dup = super.dup.asInstanceOf[ParSeqIterator[(T, S)]] override def split: Seq[ParSeqIterator[(T, S)]] = super.split.asInstanceOf[Seq[ParSeqIterator[(T, S)]]] def psplit(szs: Int*) = (self.psplit(szs: _*) zip that.psplit(szs: _*)) map { p => p._1 zipParSeq p._2 } } @@ -541,6 +585,7 @@ self => override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that) class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] { + override def dup = super.dup.asInstanceOf[ParSeqIterator[(U, S)]] private def patchem = { val selfrem = self.remaining val thatrem = that.remaining @@ -578,6 +623,7 @@ self => def hasNext = trio.hasNext def next = trio.next def remaining = trio.remaining + def dup = self.dup.patchParSeq(from, patch, replaced) def split = trio.split def psplit(sizes: Int*) = trio.psplit(sizes: _*) } diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala index d29c24822c..c7a8b388bd 100644 --- a/src/library/scala/collection/parallel/UnrolledBuffer.scala +++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala @@ -46,7 +46,7 @@ extends collection.mutable.Buffer[T] { import UnrolledBuffer.Unrolled - private var headptr = new Unrolled[T] + private var headptr = newUnrolled private var lastptr = headptr private var sz = 0 @@ -54,9 +54,14 @@ extends collection.mutable.Buffer[T] private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head private[parallel] def lastPtr = lastptr private[parallel] def lastPtr_=(last: Unrolled[T]) = lastptr = last + private[parallel] def size_=(s: Int) = sz = s protected[this] override def newBuilder = new UnrolledBuffer[T] + protected def newUnrolled = new Unrolled[T](this) + + private[collection] def calcNextLength(sz: Int) = sz + def classManifestCompanion = UnrolledBuffer def concat(that: UnrolledBuffer[T]) = { @@ -82,7 +87,7 @@ extends collection.mutable.Buffer[T] } def clear() { - headptr = new Unrolled[T] + headptr = newUnrolled lastptr = headptr sz = 0 } @@ -153,20 +158,23 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] { val waterline = 50 val waterlineDelim = 100 - private[parallel] val unrolledsize = 32 + private[parallel] val unrolledlength = 32 /** Unrolled buffer node. */ - class Unrolled[T: ClassManifest] private (var size: Int, var array: Array[T], var next: Unrolled[T]) { - def this() = this(0, new Array[T](UnrolledBuffer.unrolledsize), null) + class Unrolled[T: ClassManifest] private[parallel] (var size: Int, var array: Array[T], var next: Unrolled[T], val buff: UnrolledBuffer[T] = null) { + private[parallel] def this() = this(0, new Array[T](unrolledlength), null, null) + private[parallel] def this(b: UnrolledBuffer[T]) = this(0, new Array[T](unrolledlength), null, b) + + private def nextlength = if (buff eq null) unrolledlength else buff.calcNextLength(array.length) // adds and returns itself or the new unrolled if full - @tailrec final def append(elem: T): Unrolled[T] = if (size < UnrolledBuffer.unrolledsize) { + @tailrec final def append(elem: T): Unrolled[T] = if (size < array.length) { array(size) = elem size += 1 this } else { - next = new Unrolled[T] + next = new Unrolled[T](0, new Array[T](nextlength), null, buff) next.append(elem) } def foreach[U](f: T => U) { @@ -200,7 +208,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] { } else { // allocate a new node and store element // then make it point to this - val newhead = new Unrolled[T] + val newhead = new Unrolled[T](buff) newhead.append(elem) newhead.next = this newhead @@ -244,7 +252,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] { @tailrec final def insertAll(idx: Int, t: Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) { // divide this node at the appropriate position and insert all into head // update new next - val newnextnode = new Unrolled[T] + val newnextnode = new Unrolled[T](0, new Array(array.length), null, buff) Array.copy(array, idx, newnextnode.array, 0, size - idx) newnextnode.size = size - idx newnextnode.next = next @@ -265,7 +273,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] { private def nullout(from: Int, until: Int) { var idx = from while (idx < until) { - array(idx) = null.asInstanceOf[T] // !! + array(idx) = null.asInstanceOf[T] // TODO find a way to assign a default here!! idx += 1 } } @@ -279,7 +287,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] { tryMergeWithNext() } - override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + " -> " + (if (next ne null) next.toString else "") + override def toString = array.take(size).mkString("Unrolled[" + array.length + "](", ", ", ")") + " -> " + (if (next ne null) next.toString else "") } } diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 58dce1aef4..a411a1cc44 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -20,7 +20,7 @@ import scala.collection.generic.GenericParMapCompanion import scala.collection.immutable.HashMap - +import annotation.unchecked.uncheckedVariance @@ -62,10 +62,24 @@ self => type SCPI = SignalContextPassingIterator[ParHashMapIterator] - class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int) + class ParHashMapIterator(var triter: Iterator[(K, V @uncheckedVariance)], val sz: Int) extends super.ParIterator { self: SignalContextPassingIterator[ParHashMapIterator] => var i = 0 + def dup = triter match { + case t: HashMap.TrieIterator[_, _] => + val dupt = t.dupIterator.asInstanceOf[Iterator[(K, V)]] + dupFromIterator(dupt) + case _ => + val buff = triter.toBuffer + triter = buff.iterator + dupFromIterator(buff.iterator) + } + private def dupFromIterator(it: Iterator[(K, V @uncheckedVariance)]) = { + val phit = new ParHashMapIterator(it, sz) with SCPI + phit.i = i + phit + } def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match { case t: HashMap.TrieIterator[_, _] => val previousRemaining = remaining diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 747ed3eed3..0b1f9c5b7e 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -61,10 +61,24 @@ self => type SCPI = SignalContextPassingIterator[ParHashSetIterator] - class ParHashSetIterator(val triter: Iterator[T], val sz: Int) + class ParHashSetIterator(var triter: Iterator[T], val sz: Int) extends super.ParIterator { self: SignalContextPassingIterator[ParHashSetIterator] => var i = 0 + def dup = triter match { + case t: HashSet.TrieIterator[_] => + val dupt = t.dupIterator.asInstanceOf[Iterator[T]] + dupFromIterator(dupt) + case _ => + val buff = triter.toBuffer + triter = buff.iterator + dupFromIterator(buff.iterator) + } + private def dupFromIterator(it: Iterator[T]) = { + val phit = new ParHashSetIterator(it, sz) with SCPI + phit.i = i + phit + } def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match { case t: HashSet.TrieIterator[_] => val previousRemaining = remaining diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index dbd5449362..ab5e509515 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -46,6 +46,8 @@ self => private def rangeleft = range.drop(ind) + def dup = new ParRangeIterator(rangeleft) with SCPI + def split = { val rleft = rangeleft val elemleft = rleft.length diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala index 9c6bbae8dd..142e554178 100644 --- a/src/library/scala/collection/parallel/immutable/package.scala +++ b/src/library/scala/collection/parallel/immutable/package.scala @@ -38,6 +38,7 @@ package object immutable { def remaining = until - i def hasNext = i < until def next = { i += 1; elem } + def dup = new ParIterator(i, until, elem) with SCPI def psplit(sizes: Int*) = { val incr = sizes.scanLeft(0)(_ + _) for ((start, end) <- incr.init zip incr.tail) yield new ParIterator(i + start, (i + end) min until, elem) with SCPI diff --git a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala index bd17d24ea8..955698fdd4 100644 --- a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala @@ -21,7 +21,7 @@ import scala.collection.parallel.Combiner */ trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To] { - self: collection.parallel.EnvironmentPassingCombiner[Elem, To] => +self: collection.parallel.EnvironmentPassingCombiner[Elem, To] => val chain: ArrayBuffer[Buff] val lastbuff = chain.last def +=(elem: Elem) = { lastbuff += elem; this } diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 909b8eb5d7..8f70547a03 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -71,7 +71,7 @@ self => class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array) extends super.ParIterator { - me: SignalContextPassingIterator[ParArrayIterator] => + me: SignalContextPassingIterator[ParArrayIterator] => def hasNext = i < until @@ -83,6 +83,8 @@ self => def remaining = until - i + def dup = new ParArrayIterator(i, until, arr) with SCPI + def psplit(sizesIncomplete: Int*): Seq[ParIterator] = { var traversed = i val total = sizesIncomplete.reduceLeft(_ + _) @@ -439,13 +441,25 @@ self => override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = { cb.sizeHint(remaining) - cb.ifIs[ParArrayCombiner[T]] { pac => + cb.ifIs[ResizableParArrayCombiner[T]] { + pac => + // with res. combiner: val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i) pac.lastbuff.setInternalSize(remaining) } otherwise { - copy2builder_quick(cb, arr, until, i) - i = until + cb.ifIs[UnrolledParArrayCombiner[T]] { + pac => + // with unr. combiner: + val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]] + Array.copy(arr, i, targetarr, 0, until - i) + pac.buff.size = pac.buff.size + until - i + pac.buff.lastPtr.size = until - i + pac + } otherwise { + copy2builder_quick(cb, arr, until, i) + i = until + } } cb } @@ -495,21 +509,35 @@ self => } override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = { - cb.ifIs[ParArrayCombiner[T]] { pac => + cb.ifIs[ResizableParArrayCombiner[T]] { + pac => + // with res. combiner: val sz = remaining pac.sizeHint(sz) val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] - reverse2combiner_quick(targetarr, arr, i, until) + reverse2combiner_quick(targetarr, arr, 0, i, until) pac.lastbuff.setInternalSize(sz) pac - } otherwise super.reverse2combiner(cb) + } otherwise { + cb.ifIs[UnrolledParArrayCombiner[T]] { + pac => + // with unr. combiner: + val sz = remaining + pac.sizeHint(sz) + val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]] + reverse2combiner_quick(targetarr, arr, 0, i, until) + pac.buff.size = pac.buff.size + sz + pac.buff.lastPtr.size = sz + pac + } otherwise super.reverse2combiner(cb) + } cb } - private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) { - var j = from - var k = ntil - from - 1 - while (j < ntil) { + private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) { + var j = srcfrom + var k = targfrom + srcuntil - srcfrom - 1 + while (j < srcuntil) { targ(k) = a(j) j += 1 k -= 1 @@ -553,23 +581,61 @@ self => (new ParArray[S](targarrseq)).asInstanceOf[That] } else super.map(f)(bf) - override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) { - // reserve an array - val targarrseq = new ArraySeq[U](length + 1) - val targetarr = targarrseq.array.asInstanceOf[Array[Any]] - targetarr(0) = z + override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = + if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) { + // reserve an array + val targarrseq = new ArraySeq[U](length + 1) + val targetarr = targarrseq.array.asInstanceOf[Array[Any]] + targetarr(0) = z - // do a parallel prefix scan - executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st => - executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr))) - ))) + // do a parallel prefix scan + if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult { + tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr)) + }) - // wrap the array into a parallel array - (new ParArray[U](targarrseq)).asInstanceOf[That] - } else super.scan(z)(op)(cbf) + // wrap the array into a parallel array + (new ParArray[U](targarrseq)).asInstanceOf[That] + } else super.scan(z)(op)(cbf) /* tasks */ + class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any]) + extends Task[Unit, ScanToArray[U]] { + var result = (); + def leaf(prev: Option[Unit]) = iterate(tree) + private def iterate(tree: ScanTree[U]): Unit = tree match { + case ScanNode(left, right) => + iterate(left) + iterate(right) + case ScanLeaf(_, _, from, len, Some(prev), _) => + scanLeaf(array, targetarr, from, len, prev.acc) + case ScanLeaf(_, _, from, len, None, _) => + scanLeaf(array, targetarr, from, len, z) + } + private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) { + var i = from + val until = from + len + var curr = startval + val operation = op + while (i < until) { + curr = operation(curr, srcarr(i).asInstanceOf[U]) + i += 1 + targetarr(i) = curr + } + } + def split = tree match { + case ScanNode(left, right) => Seq( + new ScanToArray(left, z, op, targetarr), + new ScanToArray(right, z, op, targetarr) + ) + case _ => system.error("Can only split scan tree internal nodes.") + } + def shouldSplitFurther = tree match { + case ScanNode(_, _) => true + case _ => false + } + } + class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] { var result = (); def leaf(prev: Option[Unit]) = { diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala index 760f8b09ce..339f827aef 100644 --- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala @@ -9,14 +9,112 @@ import scala.collection.mutable.ArraySeq import scala.collection.mutable.ArrayBuffer import scala.collection.parallel.TaskSupport import scala.collection.parallel.EnvironmentPassingCombiner +import scala.collection.parallel.unsupportedop +import scala.collection.parallel.UnrolledBuffer +import scala.collection.parallel.UnrolledBuffer.Unrolled +import scala.collection.parallel.Combiner +private[mutable] class DoublingUnrolledBuffer[T](implicit m: ClassManifest[T]) extends UnrolledBuffer[T]()(m) { + override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz + protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this) +} + + + +/** An array combiner that uses doubling unrolled buffers to store elements. */ +trait UnrolledParArrayCombiner[T] +extends Combiner[T, ParArray[T]] { +self: EnvironmentPassingCombiner[T, ParArray[T]] => + // because size is doubling, random access is O(logn)! + val buff = new DoublingUnrolledBuffer[Any] + + import tasksupport._ + + def +=(elem: T) = { + buff += elem + this + } + + def result = { + val arrayseq = new ArraySeq[T](size) + val array = arrayseq.array.asInstanceOf[Array[Any]] + + executeAndWaitResult(new CopyUnrolledToArray(array, 0, size)) + + new ParArray(arrayseq) + } + + def clear { + buff.clear + } + + override def sizeHint(sz: Int) = { + buff.lastPtr.next = new Unrolled(0, new Array[Any](sz), null, buff) + buff.lastPtr = buff.lastPtr.next + } + + def combine[N <: T, NewTo >: ParArray[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = other match { + case that if that eq this => this // just return this + case that: UnrolledParArrayCombiner[t] => + buff concat that.buff + this + case _ => unsupportedop("Cannot combine with combiner of different type.") + } + + def size = buff.size + + /* tasks */ + + class CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int) + extends Task[Unit, CopyUnrolledToArray] { + var result = (); + def leaf(prev: Option[Unit]) = if (howmany > 0) { + var totalleft = howmany + val (startnode, startpos) = findStart(offset) + var curr = startnode + var pos = startpos + var arroffset = offset + while (totalleft > 0) { + val lefthere = math.min(totalleft, curr.size - pos) + Array.copy(curr.array, pos, array, arroffset, lefthere) + // println("from: " + arroffset + " elems " + lefthere + " - " + pos + ", " + curr + " -> " + array.toList + " by " + this + " !! " + buff.headPtr) + totalleft -= lefthere + arroffset += lefthere + pos = 0 + curr = curr.next + } + } + private def findStart(pos: Int) = { + var left = pos + var node = buff.headPtr + while ((left - node.size) >= 0) { + left -= node.size + node = node.next + } + (node, left) + } + def split = { + val fp = howmany / 2 + List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel) + override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")" + } +} + + + +object UnrolledParArrayCombiner { + def apply[T](): UnrolledParArrayCombiner[T] = new UnrolledParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]] +} -trait ParArrayCombiner[T] +/** An array combiner that uses a chain of arraybuffers to store elements. */ +trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]] { self: EnvironmentPassingCombiner[T, ParArray[T]] => @@ -24,7 +122,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] => override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz) - def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParArrayCombiner(c) + def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ResizableParArrayCombiner(c) def allocateAndCopy = if (chain.size > 1) { val arrayseq = new ArraySeq[T](size) @@ -38,7 +136,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] => pa } - override def toString = "ParArrayCombiner(" + size + "): " //+ chain + override def toString = "ResizableParArrayCombiner(" + size + "): " //+ chain /* tasks */ @@ -86,11 +184,11 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] => } -object ParArrayCombiner { - def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParArrayCombiner[T] = { - new { val chain = c } with ParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]] +object ResizableParArrayCombiner { + def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ResizableParArrayCombiner[T] = { + new { val chain = c } with ResizableParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]] } - def apply[T](): ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) + def apply[T](): ResizableParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) } diff --git a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala index fc63d51b33..82c69b4c53 100644 --- a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala +++ b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala @@ -43,6 +43,7 @@ trait ParFlatHashTable[T] extends collection.mutable.FlatHashTable[T] { if (hasNext) scan() r } else Iterator.empty.next + def dup = newIterator(idx, until, totalsize) def split = if (remaining > 1) { val divpt = (until + idx) / 2 diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala index efba6c8d9c..b238ce09de 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala @@ -67,6 +67,8 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m } } + def dup = newIterator(idx, until, totalsize, es) + def split: Seq[ParIterableIterator[T]] = if (remaining > 1) { if (until > idx) { // there is at least one more slot for the next iterator diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala index 89544c8bdd..1efe79b00d 100644 --- a/src/library/scala/collection/parallel/mutable/package.scala +++ b/src/library/scala/collection/parallel/mutable/package.scala @@ -10,6 +10,11 @@ import scala.collection.generic.Sizing package object mutable { + /* aliases */ + + type ParArrayCombiner[T] = ResizableParArrayCombiner[T] + val ParArrayCombiner = ResizableParArrayCombiner + /* classes and traits */ private[mutable] trait SizeMapUtils { diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 19ae9aef5d..6b3f3bf448 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -107,8 +107,10 @@ package object parallel { } implicit def throwable2ops(self: Throwable) = new ThrowableOps { - def alongWith(that: Throwable) = self match { - case ct: CompositeThrowable => new CompositeThrowable(ct.throwables + that) + def alongWith(that: Throwable) = (self, that) match { + case (self: CompositeThrowable, that: CompositeThrowable) => new CompositeThrowable(self.throwables ++ that.throwables) + case (self: CompositeThrowable, _) => new CompositeThrowable(self.throwables + that) + case (_, that: CompositeThrowable) => new CompositeThrowable(that.throwables + self) case _ => new CompositeThrowable(Set(self, that)) } } @@ -117,7 +119,7 @@ package object parallel { /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */ final class CompositeThrowable(val throwables: Set[Throwable]) - extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", ")) + extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.map(t => (t, t.getStackTrace.toList)).mkString(", ")) /** A helper iterator for iterating very small array buffers. @@ -133,6 +135,7 @@ package object parallel { r } def remaining = until - index + def dup = new BufferIterator(buffer, index, until, signalDelegate) def split: Seq[ParIterableIterator[T]] = if (remaining > 1) { val divsz = (until - index) / 2 Seq( diff --git a/test/benchmarks/source.list b/test/benchmarks/source.list index dffd5ef4ea..9382e996dc 100644 --- a/test/benchmarks/source.list +++ b/test/benchmarks/source.list @@ -35,6 +35,7 @@ src/scala/collection/parallel/benchmarks/parallel_array/ReducePrime.scala src/scala/collection/parallel/benchmarks/parallel_array/DropMany.scala src/scala/collection/parallel/benchmarks/parallel_array/ReduceList.scala src/scala/collection/parallel/benchmarks/parallel_array/ForeachLight.scala +src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala src/scala/collection/parallel/benchmarks/parallel_array/SliceMedium.scala src/scala/collection/parallel/benchmarks/parallel_array/ReverseMap.scala diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala index 32c3ca154f..9eacc7ff1f 100644 --- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala +++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala @@ -65,6 +65,7 @@ trait BenchmarkRegister { register(parallel_array.PadToDouble) register(parallel_array.AggregateLight) register(parallel_array.ScanLight) + register(parallel_array.ScanMedium) register(parallel_array.MatrixMultiplication) // parallel views diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala index b5dcfca872..f151158ad9 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala @@ -9,6 +9,8 @@ object ForeachHeavy extends Companion { override def comparisons = List("jsr") override def defaultSize = 2048 + @volatile var z = 0 + val fun = (a: Cont) => heavyOperation(a) val funjsr = new extra166y.Ops.Procedure[Cont] { def op(a: Cont) = heavyOperation(a) @@ -26,6 +28,7 @@ object ForeachHeavy extends Companion { if (n % i == 0) isPrime = false i += 1 } + if (isPrime && (n.toString == z)) z += 1 isPrime } } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala new file mode 100644 index 0000000000..73a237189f --- /dev/null +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala @@ -0,0 +1,55 @@ +package scala.collection.parallel.benchmarks.parallel_array + + +import scala.collection.parallel.benchmarks._ +import scala.collection.parallel.mutable.ParArray + + +object ScanMedium extends Companion { + def benchName = "scan-medium"; + def apply(sz: Int, parallelism: Int, what: String) = new ScanMedium(sz, parallelism, what) + override def comparisons = List("jsr") + override def defaultSize = 5000 + + val op = (a: Cont, b: Cont) => { + operation(a, b) + } + def operation(a: Cont, b: Cont) = { + val m = if (a.in < 0) 1 else 0 + val k = calc(a.in, b.in, m) + new Cont(a.in + b.in + k * m * (0 until 2).reduceLeft(_ + _)) + } + private def calc(x: Int, y: Int, n: Int) = { + var sum = x + for (i <- 0 until 500) { + sum += y + (if (sum % 2 == 0) n * x else y) + if (sum % 5 == 0) sum -= x * y - n * (x + y) + } + sum + } +} + + +class ScanMedium(sz: Int, p: Int, what: String) +extends Resettable[Cont](sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { + def companion = ScanMedium + override def repetitionsPerRun = 50 + override val runs = 12 + + def runpar = pa.scan(new Cont(0))(ScanMedium.op) + def runseq = sequentialScan(new Cont(0), ScanMedium.op, sz) + def runjsr = jsrarr.cumulate(new extra166y.Ops.Reducer[Cont] { + def op(a: Cont, b: Cont) = ScanMedium.operation(a, b) + }, new Cont(0)) + override def comparisonMap = collection.Map("jsr" -> runjsr _) +} + + + + + + + + + + diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala index 690ee34cca..24330d7670 100644 --- a/test/files/scalacheck/parallel-collections/IntOperators.scala +++ b/test/files/scalacheck/parallel-collections/IntOperators.scala @@ -49,7 +49,8 @@ trait IntOperators extends Operators[Int] { def foldArguments = List( (0, _ + _), (1, _ * _), - (Int.MinValue, math.max(_, _)) + (Int.MinValue, math.max(_, _)), + (Int.MaxValue, math.min(_, _)) ) def addAllTraversables = List( List[Int](), diff --git a/test/files/scalacheck/parallel-collections/Operators.scala b/test/files/scalacheck/parallel-collections/Operators.scala index 538cc23325..b4321cf805 100644 --- a/test/files/scalacheck/parallel-collections/Operators.scala +++ b/test/files/scalacheck/parallel-collections/Operators.scala @@ -32,4 +32,4 @@ trait SeqOperators[T] extends Operators[T] { def reverseMapFunctions: List[T => T] def sameElementsSeqs: List[Seq[T]] def startEndSeqs: List[Seq[T]] -} \ No newline at end of file +} diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala index 061bb08d9b..9299a201a1 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala @@ -64,23 +64,23 @@ with PairValues[Int, Int] } override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match { - case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster - val invs = pm.brokenInvariants - - val containsall = (for ((k, v) <- orig) yield { - if (pm.asInstanceOf[ParHashMap[Int, Int]].get(k) == Some(v)) true - else { - println("Does not contain original element: " + (k, v)) - false - } - }).foldLeft(true)(_ && _) - - - if (invs.isEmpty) containsall - else { - println("Invariants broken:\n" + invs.mkString("\n")) - false - } + // case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster + // val invs = pm.brokenInvariants + + // val containsall = (for ((k, v) <- orig) yield { + // if (pm.asInstanceOf[ParHashMap[Int, Int]].get(k) == Some(v)) true + // else { + // println("Does not contain original element: " + (k, v)) + // false + // } + // }).foldLeft(true)(_ && _) + + + // if (invs.isEmpty) containsall + // else { + // println("Invariants broken:\n" + invs.mkString("\n")) + // false + // } case _ => true } diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala index be70a7c7a3..8b41908a26 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala @@ -56,28 +56,28 @@ with IntValues } override def checkDataStructureInvariants(orig: Traversable[Int], ds: AnyRef) = ds match { - case pm: ParHashSet[t] => - // for an example of how not to write code proceed below - val invs = pm.brokenInvariants - - val containsall = (for (elem <- orig) yield { - if (pm.asInstanceOf[ParHashSet[Int]](elem) == true) true - else { - println("Does not contain original element: " + elem) - println(pm.hashTableContents.table.find(_ == elem)) - println(pm.hashTableContents.table.indexOf(elem)) - false - } - }).foldLeft(true)(_ && _) - - - if (invs.isEmpty) { - if (!containsall) println(pm.debugInformation) - containsall - } else { - println("Invariants broken:\n" + invs.mkString("\n")) - false - } + // case pm: ParHashSet[t] if 1 == 0 => + // // for an example of how not to write code proceed below + // val invs = pm.brokenInvariants + + // val containsall = (for (elem <- orig) yield { + // if (pm.asInstanceOf[ParHashSet[Int]](elem) == true) true + // else { + // println("Does not contain original element: " + elem) + // println(pm.hashTableContents.table.find(_ == elem)) + // println(pm.hashTableContents.table.indexOf(elem)) + // false + // } + // }).foldLeft(true)(_ && _) + + + // if (invs.isEmpty) { + // if (!containsall) println(pm.debugInformation) + // containsall + // } else { + // println("Invariants broken:\n" + invs.mkString("\n")) + // false + // } case _ => true } diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index 8b5d72ea01..0dcd877ecb 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -398,6 +398,22 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col tarr.toSeq == collarr.toSeq } + if (hasStrictOrder) property("scans must be equal") = forAll(collectionPairs) { + case (t, coll) => + (for (((first, op), ind) <- foldArguments.zipWithIndex) yield { + val tscan = t.scanLeft(first)(op) + val cscan = coll.scan(first)(op) + if (tscan != cscan || cscan != tscan) { + println("from: " + t) + println("and: " + coll) + println("scans are: ") + println(tscan) + println(cscan) + } + ("operator " + ind) |: tscan == cscan && cscan == tscan + }).reduceLeft(_ && _) + } + } -- cgit v1.2.3