author    Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-12-09 10:08:20 +0000
committer Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-12-09 10:08:20 +0000
commit    f2ecbd04691b1914e2f77c60afc2b296aa6826ae (patch)
tree      539b543eb173cfc7b0bbde4ca5f2c5bb187297df
parent    492b22576f2ad46b300ce8dc31c5b672aaf517e4 (diff)
Array combiners implementation changed from array buffers to doubling unrolled buffers to avoid excessive copying. Still evaluating the benefits of this. No review.
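
For context: an unrolled buffer keeps its elements in a linked chain of fixed arrays, so growing the buffer never copies existing elements, and two buffers can be concatenated by relinking node pointers. A minimal standalone sketch of the doubling variant (hypothetical code, not the library's UnrolledBuffer; it uses ClassTag where the 2010-era library used ClassManifest):

    import scala.reflect.ClassTag

    // One node of the chain: a fixed array plus a link to its successor.
    class UNode[T: ClassTag](val capacity: Int) {
      val array = new Array[T](capacity)
      var size = 0
      var next: UNode[T] = null
      // Append in place, or allocate a successor with doubled capacity.
      def append(elem: T): UNode[T] =
        if (size < capacity) { array(size) = elem; size += 1; this }
        else { next = new UNode[T](capacity * 2); next.append(elem) }
    }

    class DoublingUnrolled[T: ClassTag] {
      private val head = new UNode[T](4)
      private var last = head
      def +=(elem: T): this.type = { last = last.append(elem); this }
      def toList: List[T] = {
        val b = List.newBuilder[T]
        var n = head
        while (n ne null) { b ++= n.array.take(n.size); n = n.next }
        b.result()
      }
    }

Appending n elements allocates only O(log n) nodes and never copies an already-stored element, which is the copying this commit avoids relative to ArrayBuffer's resize-and-copy growth.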
 src/library/scala/collection/immutable/HashMap.scala | 27
 src/library/scala/collection/immutable/HashSet.scala | 27
 src/library/scala/collection/parallel/ParIterableLike.scala | 238
 src/library/scala/collection/parallel/ParSeqLike.scala | 2
 src/library/scala/collection/parallel/RemainsIterator.scala | 48
 src/library/scala/collection/parallel/UnrolledBuffer.scala | 30
 src/library/scala/collection/parallel/immutable/ParHashMap.scala | 18
 src/library/scala/collection/parallel/immutable/ParHashSet.scala | 16
 src/library/scala/collection/parallel/immutable/ParRange.scala | 2
 src/library/scala/collection/parallel/immutable/package.scala | 1
 src/library/scala/collection/parallel/mutable/LazyCombiner.scala | 2
 src/library/scala/collection/parallel/mutable/ParArray.scala | 112
 src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala | 112
 src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala | 1
 src/library/scala/collection/parallel/mutable/ParHashTable.scala | 2
 src/library/scala/collection/parallel/mutable/package.scala | 5
 src/library/scala/collection/parallel/package.scala | 9
 test/benchmarks/source.list | 1
 test/benchmarks/src/scala/collection/parallel/Benchmarking.scala | 1
 test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala | 3
 test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala | 55
 test/files/scalacheck/parallel-collections/IntOperators.scala | 3
 test/files/scalacheck/parallel-collections/Operators.scala | 2
 test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala | 34
 test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala | 44
 test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala | 16
 26 files changed, 567 insertions(+), 244 deletions(-)
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 4a7a10c6c1..f6e17e9630 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -473,14 +473,25 @@ time { mNew.iterator.foreach( p => ()) }
}
class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes
+ protected var depth = 0
+ protected var arrayStack: Array[Array[HashMap[A, B @uncheckedVariance]]] = new Array[Array[HashMap[A,B]]](6)
+ protected var posStack = new Array[Int](6)
+
+ protected var arrayD: Array[HashMap[A, B @uncheckedVariance]] = elems
+ protected var posD = 0
+
+ protected var subIter: Iterator[(A, B @uncheckedVariance)] = null // to traverse collision nodes
+
+ def dupIterator: TrieIterator[A, B] = {
+ val t = new TrieIterator(elems)
+ t.depth = depth
+ t.arrayStack = arrayStack
+ t.posStack = posStack
+ t.arrayD = arrayD
+ t.posD = posD
+ t.subIter = subIter
+ t
+ }
def hasNext = (subIter ne null) || depth >= 0
diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala
index 8d52908349..720ea29639 100644
--- a/src/library/scala/collection/immutable/HashSet.scala
+++ b/src/library/scala/collection/immutable/HashSet.scala
@@ -285,14 +285,25 @@ time { mNew.iterator.foreach( p => ()) }
class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[A] = null // to traverse collision nodes
+ protected var depth = 0
+ protected var arrayStack = new Array[Array[HashSet[A]]](6)
+ protected var posStack = new Array[Int](6)
+
+ protected var arrayD = elems
+ protected var posD = 0
+
+ protected var subIter: Iterator[A] = null // to traverse collision nodes
+
+ def dupIterator: TrieIterator[A] = {
+ val t = new TrieIterator(elems)
+ t.depth = depth
+ t.arrayStack = arrayStack
+ t.posStack = posStack
+ t.arrayD = arrayD
+ t.posD = posD
+ t.subIter = subIter
+ t
+ }
def hasNext = (subIter ne null) || depth >= 0
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index ed757655f5..d3e6eb42d4 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -4,7 +4,7 @@ package scala.collection.parallel
import scala.collection.mutable.Builder
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.collection.IterableLike
import scala.collection.Parallel
import scala.collection.Parallelizable
@@ -577,15 +577,13 @@ self =>
*
* @return a new $coll containing the prefix scan of the elements in this $coll
*/
- def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = {
- val array = new Array[Any](size + 1)
- array(0) = z
- executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, array, parallelIterator) mapResult { st =>
- executeAndWaitResult(new ScanWithScanTree[U, Any](Some(z), op, st, array, array) mapResult { u =>
- executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result })
+ def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = if (parallelismLevel > 1) {
+ if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, parallelIterator) mapResult {
+ tree => executeAndWaitResult(new FromScanTree(tree, z, op, cbf) mapResult {
+ cb => cb.result
})
- })
- }
+ }) else (cbf(self.repr) += z).result
+ } else super.scanLeft(z)(op)(cbf)
/** Takes the longest prefix of elements that satisfy the predicate.
*
@@ -1099,154 +1097,117 @@ self =>
override def requiresStrictSplitters = true
}
- protected[this] class ScanTree[U >: T](val from: Int, val len: Int) {
- var value: U = _
- var left: ScanTree[U] = null
- var right: ScanTree[U] = null
- @volatile var chunkFinished = false
- var activeScan: () => Unit = null
-
- def isApplying = activeScan ne null
- def isLeaf = (left eq null) && (right eq null)
- def shouldApply = !chunkFinished && !isApplying
- def applyToInterval[A >: U](elem: U, op: (U, U) => U, array: Array[A]) = {
- //executeAndWait(new ApplyToArray(elem, op, from, len, array))
+ protected[this] class CreateScanTree[U >: T](from: Int, len: Int, z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T])
+ extends Transformer[ScanTree[U], CreateScanTree[U]] {
+ var result: ScanTree[U] = null
+ def leaf(prev: Option[ScanTree[U]]) = if (pit.remaining > 0) {
+ val trees = ArrayBuffer[ScanTree[U]]()
var i = from
val until = from + len
+ val blocksize = scanBlockSize
while (i < until) {
- array(i) = op(elem, array(i).asInstanceOf[U])
- i += 1
+ trees += scanBlock(i, math.min(blocksize, pit.remaining))
+ i += blocksize
}
+
+ // merge trees
+ result = mergeTrees(trees, 0, trees.length)
+ } else result = null // no elements to scan (merge will take care of `null`s)
+ private def scanBlock(from: Int, len: Int): ScanTree[U] = {
+ val pitdup = pit.dup
+ new ScanLeaf(pitdup, op, from, len, None, pit.reduceLeft(len, op))
}
- def scanInterval[A >: U](elem: U, op: (U, U) => U, srcA: Array[A], destA: Array[A]) = {
- val src = srcA.asInstanceOf[Array[Any]]
- val dest = destA.asInstanceOf[Array[Any]]
- var last = elem
- var i = from
- val until = from + len
- while (i < until) {
- last = op(last, src(i - 1).asInstanceOf[U])
- dest(i) = last
- i += 1
+ private def mergeTrees(trees: ArrayBuffer[ScanTree[U]], from: Int, howmany: Int): ScanTree[U] = if (howmany > 1) {
+ val half = howmany / 2
+ ScanNode(mergeTrees(trees, from, half), mergeTrees(trees, from + half, howmany - half))
+ } else trees(from)
+ protected[this] def newSubtask(pit: ParIterableIterator[T]) = unsupported
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield {
+ new CreateScanTree(untilp, p.remaining, z, op, p)
}
}
- def pushDown(v: U, op: (U, U) => U) {
- value = op(v, value)
- if (left ne null) left.pushDown(v, op)
- if (right ne null) right.pushDown(v, op)
- }
- def pushDownOnRight(v: U, op: (U, U) => U) = if (right ne null) right.pushDown(v, op)
- def printTree: Unit = printTree(0)
- private def printTree(t: Int): Unit = {
- for (i <- 0 until t) print(" ")
- if (isLeaf) print("(l) ")
- println(value + ": from " + from + " until " + (from + len))
- if (left ne null) left.printTree(t + 1)
- if (right ne null) right.printTree(t + 1)
- }
+ override def merge(that: CreateScanTree[U]) = if (this.result != null) {
+ if (that.result != null) result = ScanNode(result, that.result)
+ } else result = that.result
+ override def requiresStrictSplitters = true
}
- protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A])
- extends StrictSplitterCheckTask[Unit, ApplyToArray[U, A]] {
- var result: Unit = ()
- def leaf(prev: Option[Unit]) = {
- var i = from
- val until = from + len
- while (i < until) {
- array(i) = op(elem, array(i).asInstanceOf[U])
- i += 1
- }
+ protected[this] class FromScanTree[U >: T, That]
+ (tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CanCombineFrom[Repr, U, That])
+ extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] {
+ var result: Combiner[U, That] = null
+ def leaf(prev: Option[Combiner[U, That]]) {
+ val cb = reuse(prev, cbf(self.repr))
+ iterate(tree, cb)
+ result = cb
}
- def shouldSplitFurther = len > threshold(size, parallelismLevel min availableProcessors)
- def split = {
- val fp = len / 2
- val sp = len - fp
- Seq(
- new ApplyToArray(elem, op, from, fp, array),
- new ApplyToArray(elem, op, from + fp, sp, array)
+ private def iterate(tree: ScanTree[U], cb: Combiner[U, That]): Unit = tree match {
+ case ScanNode(left, right) =>
+ iterate(left, cb)
+ iterate(right, cb)
+ case ScanLeaf(p, _, _, len, Some(prev), _) =>
+ p.scanToCombiner(len, prev.acc, op, cb)
+ case ScanLeaf(p, _, _, len, None, _) =>
+ cb += z
+ p.scanToCombiner(len, z, op, cb)
+ }
+ def split = tree match {
+ case ScanNode(left, right) => Seq(
+ new FromScanTree(left, z, op, cbf),
+ new FromScanTree(right, z, op, cbf)
)
+ case _ => unsupportedop("Cannot be split further")
}
+ def shouldSplitFurther = tree match {
+ case ScanNode(_, _) => true
+ case ScanLeaf(_, _, _, _, _, _) => false
+ }
+ override def merge(that: FromScanTree[U, That]) = result = result combine that.result
}
- protected[this] class BuildScanTree[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T])
- extends Accessor[ScanTree[U], BuildScanTree[U, A]] {
- // TODO reimplement - there are some issues here
- var result: ScanTree[U] = null
- def leaf(prev: Option[ScanTree[U]]) = if ((prev != None && prev.get.chunkFinished) || from == 1) {
- val prevElem = if (from == 1) z else prev.get.value
- result = new ScanTree[U](from, len)
- pit.scanToArray(prevElem, op, array, from)
- result.value = array(from + len - 1).asInstanceOf[U]
- result.chunkFinished = true
- } else {
- result = new ScanTree[U](from, len)
- result.value = pit.fold(z)(op)
- }
- protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield {
- val plen = p.remaining min (len - untilp)
- new BuildScanTree[U, A](z, op, from + untilp, plen, array, p)
- }
- }
- override def merge(that: BuildScanTree[U, A]) = {
- // create scan tree node
- val left = result
- val right = that.result
- val ns = new ScanTree[U](left.from, left.len + right.len)
- ns.left = left
- ns.right = right
- ns.value = op(left.value, right.value)
- ns.pushDownOnRight(left.value, op)
-
- // set result
- result = ns
- }
- override def requiresStrictSplitters = true
+ /* scan tree */
+
+ protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1
+
+ protected[this] trait ScanTree[U >: T] {
+ def beginsAt: Int
+ def pushdown(v: U): Unit
+ def leftmost: ScanLeaf[U]
+ def rightmost: ScanLeaf[U]
+ def print(depth: Int = 0): Unit
}
- protected[this] class ScanWithScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], src: Array[A], dest: Array[A])
- extends StrictSplitterCheckTask[Unit, ScanWithScanTree[U, A]] {
- var result = ();
- def leaf(prev: Option[Unit]) = scan(st, first.get)
- private def scan(st: ScanTree[U], elem: U): Unit = if (!st.chunkFinished) {
- if (st.isLeaf) st.scanInterval(elem, op, src, dest)
- else {
- scan(st.left, elem)
- scan(st.right, st.left.value)
- }
+ protected[this] case class ScanNode[U >: T](left: ScanTree[U], right: ScanTree[U]) extends ScanTree[U] {
+ right.pushdown(left.rightmost.acc)
+ right.leftmost.prev = Some(left.rightmost)
+
+ val leftmost = left.leftmost
+ val rightmost = right.rightmost
+
+ def beginsAt = left.beginsAt
+ def pushdown(v: U) {
+ left.pushdown(v)
+ right.pushdown(v)
+ }
+ def print(depth: Int) {
+ println((" " * depth) + "ScanNode, begins at " + beginsAt)
+ left.print(depth + 1)
+ right.print(depth + 1)
}
- def split = collection.mutable.ArrayBuffer(
- new ScanWithScanTree(first, op, st.left, src, dest),
- new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest)
- )
- def shouldSplitFurther = (st.left ne null) && (st.right ne null)
}
- protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That])
- extends StrictSplitterCheckTask[Combiner[S, That], FromArray[S, A, That]] {
- var result: Result = null
- def leaf(prev: Option[Result]) = {
- val cb = prev getOrElse cbf(self.repr)
- var i = from
- val until = from + len
- while (i < until) {
- cb += array(i).asInstanceOf[S]
- i += 1
- }
- result = cb
- }
- def shouldSplitFurther = len > threshold(size, parallelismLevel)
- def split = {
- val fp = len / 2
- val sp = len - fp
- Seq(
- new FromArray(array, from, fp, cbf),
- new FromArray(array, from + fp, sp, cbf)
- )
+ protected[this] case class ScanLeaf[U >: T]
+ (pit: ParIterableIterator[U], op: (U, U) => U, from: Int, len: Int, var prev: Option[ScanLeaf[U]], var acc: U)
+ extends ScanTree[U] {
+ def beginsAt = from
+ def pushdown(v: U) = {
+ acc = op(v, acc)
}
- override def merge(that: FromArray[S, A, That]) = result = result combine that.result
+ def leftmost = this
+ def rightmost = this
+ def print(depth: Int) = println((" " * depth) + this)
}
/* debug information */
@@ -1255,7 +1216,8 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
- private val debugBuffer = collection.mutable.ArrayBuffer[String]()
+ // private val dbbuff = ArrayBuffer[String]()
+ def debugBuffer: ArrayBuffer[String] = null // dbbuff
private[parallel] def debugclear() = synchronized {
debugBuffer.clear
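
The new scan works in two phases over a block tree: CreateScanTree reduces each block of scanBlockSize elements to a ScanLeaf and merges leaves into ScanNodes, pushing accumulated prefixes down the right subtree, and FromScanTree then rescans each block into a combiner, seeded with the accumulated result of everything to its left. A sequential sketch of the same scheme, with hypothetical names:

    // Two-phase block scan: phase 1 reduces blocks, phase 2 rescans each
    // block seeded with the combined result of all blocks before it.
    def blockScan[U](xs: Seq[U], z: U, op: (U, U) => U, blockSize: Int): Seq[U] = {
      val blocks = xs.grouped(blockSize).toVector
      val sums   = blocks.map(_.reduceLeft(op))    // phase 1 (CreateScanTree)
      val seeds  = sums.scanLeft(z)(op)            // prefixes pushed down to the leaves
      // phase 2 (FromScanTree): independent per-block rescans
      z +: blocks.zip(seeds).flatMap { case (b, s) => b.scanLeft(s)(op).tail }
    }

For instance, blockScan(1 to 8, 0, (_: Int) + (_: Int), 3) equals (1 to 8).scanLeft(0)(_ + _). Both phases parallelize per block; the price is that op is applied roughly twice per element.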
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 0ea33d0e39..91e15fa946 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -102,6 +102,8 @@ self =>
final def remaining = end - i
+ def dup = new Elements(i, end) with SignalContextPassingIterator[ParIterator]
+
def split = psplit(remaining / 2, remaining - remaining / 2)
def psplit(sizes: Int*) = {
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index a33702a527..3363186b7d 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -87,6 +87,16 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
}
}
+ def reduceLeft[U >: T](howmany: Int, op: (U, U) => U): U = {
+ var i = howmany - 1
+ var u: U = next
+ while (i > 0 && hasNext) {
+ u = op(u, next)
+ i -= 1
+ }
+ u
+ }
+
/* transformers to combiners */
def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
@@ -165,7 +175,7 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
def slice2combiner[U >: T, This](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = {
drop(from)
- var left = until - from
+ var left = math.max(until - from, 0)
cb.sizeHint(left)
while (left > 0) {
cb += next
@@ -221,6 +231,26 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
}
}
+ def scanToCombiner[U >: T, That](startValue: U, op: (U, U) => U, cb: Combiner[U, That]) = {
+ var curr = startValue
+ while (hasNext) {
+ curr = op(curr, next)
+ cb += curr
+ }
+ cb
+ }
+
+ def scanToCombiner[U >: T, That](howmany: Int, startValue: U, op: (U, U) => U, cb: Combiner[U, That]) = {
+ var curr = startValue
+ var left = howmany
+ while (left > 0) {
+ curr = op(curr, next)
+ cb += curr
+ left -= 1
+ }
+ cb
+ }
+
def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
cb.sizeHint(remaining min otherpit.remaining)
while (hasNext && otherpit.hasNext) {
@@ -336,6 +366,9 @@ extends AugmentedIterableIterator[T]
{
self =>
+ /** Creates a copy of this iterator. */
+ def dup: ParIterableIterator[T]
+
def split: Seq[ParIterableIterator[T]]
/** The number of elements this iterator has yet to traverse. This method
@@ -377,6 +410,7 @@ self =>
var remaining = taken min self.remaining
def hasNext = remaining > 0
def next = { remaining -= 1; self.next }
+ def dup: ParIterableIterator[T] = self.dup.take(taken)
def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
val sizes = sq.scanLeft(0)(_ + _.remaining)
@@ -400,6 +434,7 @@ self =>
def hasNext = self.hasNext
def next = f(self.next)
def remaining = self.remaining
+ def dup: ParIterableIterator[S] = self.dup map f
def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f }
}
@@ -418,6 +453,7 @@ self =>
} else curr.next
def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining
protected def firstNonEmpty = (curr eq self) && curr.hasNext
+ def dup: ParIterableIterator[U] = self.dup.appendParIterable[U, PI](that)
def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split
}
@@ -428,6 +464,7 @@ self =>
def hasNext = self.hasNext && that.hasNext
def next = (self.next, that.next)
def remaining = self.remaining min that.remaining
+ def dup: ParIterableIterator[(T, S)] = self.dup.zipParSeq(that)
def split: Seq[ParIterableIterator[(T, S)]] = {
val selfs = self.split
val sizes = selfs.map(_.remaining)
@@ -447,6 +484,7 @@ self =>
else (self.next, thatelem)
} else (thiselem, that.next);
def remaining = self.remaining max that.remaining
+ def dup: ParIterableIterator[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem)
def split: Seq[ParIterableIterator[(U, S)]] = {
val selfrem = self.remaining
val thatrem = that.remaining
@@ -468,6 +506,8 @@ extends ParIterableIterator[T]
with PreciseSplitter[T]
{
self =>
+ def dup: ParSeqIterator[T]
+
def split: Seq[ParSeqIterator[T]]
def psplit(sizes: Int*): Seq[ParSeqIterator[T]]
@@ -483,6 +523,7 @@ self =>
/* iterator transformers */
class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[T]]
override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]]
def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) }
}
@@ -497,6 +538,7 @@ self =>
}
class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[S]]
override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]]
def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f }
}
@@ -504,6 +546,7 @@ self =>
override def map[S](f: T => S) = new Mapped(f)
class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[U]]
override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]]
def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) {
val selfrem = self.remaining
@@ -534,6 +577,7 @@ self =>
def appendParSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that)
class Zipped[S](ti: ParSeqIterator[S]) extends super.Zipped[S](ti) with ParSeqIterator[(T, S)] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[(T, S)]]
override def split: Seq[ParSeqIterator[(T, S)]] = super.split.asInstanceOf[Seq[ParSeqIterator[(T, S)]]]
def psplit(szs: Int*) = (self.psplit(szs: _*) zip that.psplit(szs: _*)) map { p => p._1 zipParSeq p._2 }
}
@@ -541,6 +585,7 @@ self =>
override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[(U, S)]]
private def patchem = {
val selfrem = self.remaining
val thatrem = that.remaining
@@ -578,6 +623,7 @@ self =>
def hasNext = trio.hasNext
def next = trio.next
def remaining = trio.remaining
+ def dup = self.dup.patchParSeq(from, patch, replaced)
def split = trio.split
def psplit(sizes: Int*) = trio.psplit(sizes: _*)
}
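
The dup method threaded through all the iterator transformers above exists because the new scan traverses the same elements twice: scanBlock first reduces a block with reduceLeft, then the retained duplicate replays it through scanToCombiner. For position-based iterators dup is an O(1) copy of the cursor; a minimal sketch (hypothetical class, not the library's ParIterableIterator):

    // dup copies only the cursor; both iterators then advance independently
    // over the same underlying (read-only) data.
    class IndexedIter[T](private var i: Int, until: Int, xs: IndexedSeq[T])
    extends Iterator[T] {
      def hasNext   = i < until
      def next()    = { val r = xs(i); i += 1; r }
      def remaining = until - i
      def dup       = new IndexedIter(i, until, xs)
    }

    val it  = new IndexedIter(0, 4, Vector(1, 2, 3, 4))
    val sum = it.dup.sum   // consumes only the copy: 10
    it.next()              // the original still starts at 0 and yields 1

Trie iterators have no such cheap cursor, hence the dupIterator support added to HashMap and HashSet above and the toBuffer fallback in ParHashMapIterator.dup for unknown iterator types.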
diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala
index d29c24822c..c7a8b388bd 100644
--- a/src/library/scala/collection/parallel/UnrolledBuffer.scala
+++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala
@@ -46,7 +46,7 @@ extends collection.mutable.Buffer[T]
{
import UnrolledBuffer.Unrolled
- private var headptr = new Unrolled[T]
+ private var headptr = newUnrolled
private var lastptr = headptr
private var sz = 0
@@ -54,9 +54,14 @@ extends collection.mutable.Buffer[T]
private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head
private[parallel] def lastPtr = lastptr
private[parallel] def lastPtr_=(last: Unrolled[T]) = lastptr = last
+ private[parallel] def size_=(s: Int) = sz = s
protected[this] override def newBuilder = new UnrolledBuffer[T]
+ protected def newUnrolled = new Unrolled[T](this)
+
+ private[collection] def calcNextLength(sz: Int) = sz
+
def classManifestCompanion = UnrolledBuffer
def concat(that: UnrolledBuffer[T]) = {
@@ -82,7 +87,7 @@ extends collection.mutable.Buffer[T]
}
def clear() {
- headptr = new Unrolled[T]
+ headptr = newUnrolled
lastptr = headptr
sz = 0
}
@@ -153,20 +158,23 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
val waterline = 50
val waterlineDelim = 100
- private[parallel] val unrolledsize = 32
+ private[parallel] val unrolledlength = 32
/** Unrolled buffer node.
*/
- class Unrolled[T: ClassManifest] private (var size: Int, var array: Array[T], var next: Unrolled[T]) {
- def this() = this(0, new Array[T](UnrolledBuffer.unrolledsize), null)
+ class Unrolled[T: ClassManifest] private[parallel] (var size: Int, var array: Array[T], var next: Unrolled[T], val buff: UnrolledBuffer[T] = null) {
+ private[parallel] def this() = this(0, new Array[T](unrolledlength), null, null)
+ private[parallel] def this(b: UnrolledBuffer[T]) = this(0, new Array[T](unrolledlength), null, b)
+
+ private def nextlength = if (buff eq null) unrolledlength else buff.calcNextLength(array.length)
// adds and returns itself or the new unrolled if full
- @tailrec final def append(elem: T): Unrolled[T] = if (size < UnrolledBuffer.unrolledsize) {
+ @tailrec final def append(elem: T): Unrolled[T] = if (size < array.length) {
array(size) = elem
size += 1
this
} else {
- next = new Unrolled[T]
+ next = new Unrolled[T](0, new Array[T](nextlength), null, buff)
next.append(elem)
}
def foreach[U](f: T => U) {
@@ -200,7 +208,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
} else {
// allocate a new node and store element
// then make it point to this
- val newhead = new Unrolled[T]
+ val newhead = new Unrolled[T](buff)
newhead.append(elem)
newhead.next = this
newhead
@@ -244,7 +252,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
@tailrec final def insertAll(idx: Int, t: Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) {
// divide this node at the appropriate position and insert all into head
// update new next
- val newnextnode = new Unrolled[T]
+ val newnextnode = new Unrolled[T](0, new Array(array.length), null, buff)
Array.copy(array, idx, newnextnode.array, 0, size - idx)
newnextnode.size = size - idx
newnextnode.next = next
@@ -265,7 +273,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
private def nullout(from: Int, until: Int) {
var idx = from
while (idx < until) {
- array(idx) = null.asInstanceOf[T] // !!
+ array(idx) = null.asInstanceOf[T] // TODO find a way to assign a default here!!
idx += 1
}
}
@@ -279,7 +287,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
tryMergeWithNext()
}
- override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + " -> " + (if (next ne null) next.toString else "")
+ override def toString = array.take(size).mkString("Unrolled[" + array.length + "](", ", ", ")") + " -> " + (if (next ne null) next.toString else "")
}
}
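
The two protected hooks introduced here, newUnrolled and calcNextLength, let a subclass pick the node-growth policy without touching append: a full node asks its owning buffer, via the new buff back-pointer, how large the next node should be, and the base buffer answers with the same length, i.e. constant-size nodes. A stripped-down sketch of the hook (hypothetical names):

    // Node growth as a template method: append never hardcodes a size,
    // it asks the owning buffer for the capacity of the next node.
    abstract class ChainBuffer {
      def calcNextLength(current: Int): Int
    }
    class FixedChainBuffer extends ChainBuffer {
      def calcNextLength(current: Int) = current          // constant-size nodes
    }
    class DoublingChainBuffer extends ChainBuffer {
      def calcNextLength(current: Int) =
        if (current < 10000) current * 2 else current     // double, then cap
    }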
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 58dce1aef4..a411a1cc44 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -20,7 +20,7 @@ import scala.collection.generic.GenericParMapCompanion
import scala.collection.immutable.HashMap
-
+import annotation.unchecked.uncheckedVariance
@@ -62,10 +62,24 @@ self =>
type SCPI = SignalContextPassingIterator[ParHashMapIterator]
- class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int)
+ class ParHashMapIterator(var triter: Iterator[(K, V @uncheckedVariance)], val sz: Int)
extends super.ParIterator {
self: SignalContextPassingIterator[ParHashMapIterator] =>
var i = 0
+ def dup = triter match {
+ case t: HashMap.TrieIterator[_, _] =>
+ val dupt = t.dupIterator.asInstanceOf[Iterator[(K, V)]]
+ dupFromIterator(dupt)
+ case _ =>
+ val buff = triter.toBuffer
+ triter = buff.iterator
+ dupFromIterator(buff.iterator)
+ }
+ private def dupFromIterator(it: Iterator[(K, V @uncheckedVariance)]) = {
+ val phit = new ParHashMapIterator(it, sz) with SCPI
+ phit.i = i
+ phit
+ }
def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
case t: HashMap.TrieIterator[_, _] =>
val previousRemaining = remaining
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 747ed3eed3..0b1f9c5b7e 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -61,10 +61,24 @@ self =>
type SCPI = SignalContextPassingIterator[ParHashSetIterator]
- class ParHashSetIterator(val triter: Iterator[T], val sz: Int)
+ class ParHashSetIterator(var triter: Iterator[T], val sz: Int)
extends super.ParIterator {
self: SignalContextPassingIterator[ParHashSetIterator] =>
var i = 0
+ def dup = triter match {
+ case t: HashSet.TrieIterator[_] =>
+ val dupt = t.dupIterator.asInstanceOf[Iterator[T]]
+ dupFromIterator(dupt)
+ case _ =>
+ val buff = triter.toBuffer
+ triter = buff.iterator
+ dupFromIterator(buff.iterator)
+ }
+ private def dupFromIterator(it: Iterator[T]) = {
+ val phit = new ParHashSetIterator(it, sz) with SCPI
+ phit.i = i
+ phit
+ }
def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
case t: HashSet.TrieIterator[_] =>
val previousRemaining = remaining
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index dbd5449362..ab5e509515 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -46,6 +46,8 @@ self =>
private def rangeleft = range.drop(ind)
+ def dup = new ParRangeIterator(rangeleft) with SCPI
+
def split = {
val rleft = rangeleft
val elemleft = rleft.length
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index 9c6bbae8dd..142e554178 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -38,6 +38,7 @@ package object immutable {
def remaining = until - i
def hasNext = i < until
def next = { i += 1; elem }
+ def dup = new ParIterator(i, until, elem) with SCPI
def psplit(sizes: Int*) = {
val incr = sizes.scanLeft(0)(_ + _)
for ((start, end) <- incr.init zip incr.tail) yield new ParIterator(i + start, (i + end) min until, elem) with SCPI
diff --git a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
index bd17d24ea8..955698fdd4 100644
--- a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
@@ -21,7 +21,7 @@ import scala.collection.parallel.Combiner
*/
trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To]
{
- self: collection.parallel.EnvironmentPassingCombiner[Elem, To] =>
+self: collection.parallel.EnvironmentPassingCombiner[Elem, To] =>
val chain: ArrayBuffer[Buff]
val lastbuff = chain.last
def +=(elem: Elem) = { lastbuff += elem; this }
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 909b8eb5d7..8f70547a03 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -71,7 +71,7 @@ self =>
class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
extends super.ParIterator {
- me: SignalContextPassingIterator[ParArrayIterator] =>
+ me: SignalContextPassingIterator[ParArrayIterator] =>
def hasNext = i < until
@@ -83,6 +83,8 @@ self =>
def remaining = until - i
+ def dup = new ParArrayIterator(i, until, arr) with SCPI
+
def psplit(sizesIncomplete: Int*): Seq[ParIterator] = {
var traversed = i
val total = sizesIncomplete.reduceLeft(_ + _)
@@ -439,13 +441,25 @@ self =>
override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
cb.sizeHint(remaining)
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
pac.lastbuff.setInternalSize(remaining)
} otherwise {
- copy2builder_quick(cb, arr, until, i)
- i = until
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ Array.copy(arr, i, targetarr, 0, until - i)
+ pac.buff.size = pac.buff.size + until - i
+ pac.buff.lastPtr.size = until - i
+ pac
+ } otherwise {
+ copy2builder_quick(cb, arr, until, i)
+ i = until
+ }
}
cb
}
@@ -495,21 +509,35 @@ self =>
}
override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val sz = remaining
pac.sizeHint(sz)
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
- reverse2combiner_quick(targetarr, arr, i, until)
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
pac.lastbuff.setInternalSize(sz)
pac
- } otherwise super.reverse2combiner(cb)
+ } otherwise {
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val sz = remaining
+ pac.sizeHint(sz)
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
+ pac.buff.size = pac.buff.size + sz
+ pac.buff.lastPtr.size = sz
+ pac
+ } otherwise super.reverse2combiner(cb)
+ }
cb
}
- private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
- var j = from
- var k = ntil - from - 1
- while (j < ntil) {
+ private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) {
+ var j = srcfrom
+ var k = targfrom + srcuntil - srcfrom - 1
+ while (j < srcuntil) {
targ(k) = a(j)
j += 1
k -= 1
@@ -553,23 +581,61 @@ self =>
(new ParArray[S](targarrseq)).asInstanceOf[That]
} else super.map(f)(bf)
- override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) {
- // reserve an array
- val targarrseq = new ArraySeq[U](length + 1)
- val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
- targetarr(0) = z
+ override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That =
+ if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) {
+ // reserve an array
+ val targarrseq = new ArraySeq[U](length + 1)
+ val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
+ targetarr(0) = z
- // do a parallel prefix scan
- executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st =>
- executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr)))
- )))
+ // do a parallel prefix scan
+ if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult {
+ tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr))
+ })
- // wrap the array into a parallel array
- (new ParArray[U](targarrseq)).asInstanceOf[That]
- } else super.scan(z)(op)(cbf)
+ // wrap the array into a parallel array
+ (new ParArray[U](targarrseq)).asInstanceOf[That]
+ } else super.scan(z)(op)(cbf)
/* tasks */
+ class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any])
+ extends Task[Unit, ScanToArray[U]] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = iterate(tree)
+ private def iterate(tree: ScanTree[U]): Unit = tree match {
+ case ScanNode(left, right) =>
+ iterate(left)
+ iterate(right)
+ case ScanLeaf(_, _, from, len, Some(prev), _) =>
+ scanLeaf(array, targetarr, from, len, prev.acc)
+ case ScanLeaf(_, _, from, len, None, _) =>
+ scanLeaf(array, targetarr, from, len, z)
+ }
+ private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) {
+ var i = from
+ val until = from + len
+ var curr = startval
+ val operation = op
+ while (i < until) {
+ curr = operation(curr, srcarr(i).asInstanceOf[U])
+ i += 1
+ targetarr(i) = curr
+ }
+ }
+ def split = tree match {
+ case ScanNode(left, right) => Seq(
+ new ScanToArray(left, z, op, targetarr),
+ new ScanToArray(right, z, op, targetarr)
+ )
+ case _ => system.error("Can only split scan tree internal nodes.")
+ }
+ def shouldSplitFurther = tree match {
+ case ScanNode(_, _) => true
+ case _ => false
+ }
+ }
+
class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] {
var result = ();
def leaf(prev: Option[Unit]) = {
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index 760f8b09ce..339f827aef 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -9,14 +9,112 @@ import scala.collection.mutable.ArraySeq
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.TaskSupport
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.unsupportedop
+import scala.collection.parallel.UnrolledBuffer
+import scala.collection.parallel.UnrolledBuffer.Unrolled
+import scala.collection.parallel.Combiner
+private[mutable] class DoublingUnrolledBuffer[T](implicit m: ClassManifest[T]) extends UnrolledBuffer[T]()(m) {
+ override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz
+ protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this)
+}
+
+
+
+/** An array combiner that uses doubling unrolled buffers to store elements. */
+trait UnrolledParArrayCombiner[T]
+extends Combiner[T, ParArray[T]] {
+self: EnvironmentPassingCombiner[T, ParArray[T]] =>
+ // because size is doubling, random access is O(logn)!
+ val buff = new DoublingUnrolledBuffer[Any]
+
+ import tasksupport._
+
+ def +=(elem: T) = {
+ buff += elem
+ this
+ }
+
+ def result = {
+ val arrayseq = new ArraySeq[T](size)
+ val array = arrayseq.array.asInstanceOf[Array[Any]]
+
+ executeAndWaitResult(new CopyUnrolledToArray(array, 0, size))
+
+ new ParArray(arrayseq)
+ }
+
+ def clear {
+ buff.clear
+ }
+
+ override def sizeHint(sz: Int) = {
+ buff.lastPtr.next = new Unrolled(0, new Array[Any](sz), null, buff)
+ buff.lastPtr = buff.lastPtr.next
+ }
+
+ def combine[N <: T, NewTo >: ParArray[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = other match {
+ case that if that eq this => this // just return this
+ case that: UnrolledParArrayCombiner[t] =>
+ buff concat that.buff
+ this
+ case _ => unsupportedop("Cannot combine with combiner of different type.")
+ }
+
+ def size = buff.size
+
+ /* tasks */
+
+ class CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int)
+ extends Task[Unit, CopyUnrolledToArray] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = if (howmany > 0) {
+ var totalleft = howmany
+ val (startnode, startpos) = findStart(offset)
+ var curr = startnode
+ var pos = startpos
+ var arroffset = offset
+ while (totalleft > 0) {
+ val lefthere = math.min(totalleft, curr.size - pos)
+ Array.copy(curr.array, pos, array, arroffset, lefthere)
+ // println("from: " + arroffset + " elems " + lefthere + " - " + pos + ", " + curr + " -> " + array.toList + " by " + this + " !! " + buff.headPtr)
+ totalleft -= lefthere
+ arroffset += lefthere
+ pos = 0
+ curr = curr.next
+ }
+ }
+ private def findStart(pos: Int) = {
+ var left = pos
+ var node = buff.headPtr
+ while ((left - node.size) >= 0) {
+ left -= node.size
+ node = node.next
+ }
+ (node, left)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
+ override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")"
+ }
+}
+
+
+
+object UnrolledParArrayCombiner {
+ def apply[T](): UnrolledParArrayCombiner[T] = new UnrolledParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
+}
-trait ParArrayCombiner[T]
+/** An array combiner that uses a chain of arraybuffers to store elements. */
+trait ResizableParArrayCombiner[T]
extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]]
{
self: EnvironmentPassingCombiner[T, ParArray[T]] =>
@@ -24,7 +122,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
- def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParArrayCombiner(c)
+ def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ResizableParArrayCombiner(c)
def allocateAndCopy = if (chain.size > 1) {
val arrayseq = new ArraySeq[T](size)
@@ -38,7 +136,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
pa
}
- override def toString = "ParArrayCombiner(" + size + "): " //+ chain
+ override def toString = "ResizableParArrayCombiner(" + size + "): " //+ chain
/* tasks */
@@ -86,11 +184,11 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
}
-object ParArrayCombiner {
- def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParArrayCombiner[T] = {
- new { val chain = c } with ParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
+object ResizableParArrayCombiner {
+ def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ResizableParArrayCombiner[T] = {
+ new { val chain = c } with ResizableParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
}
- def apply[T](): ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
+ def apply[T](): ResizableParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
}
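
The payoff of the unrolled representation shows up in combine: two UnrolledParArrayCombiners merge by concatenating their node chains, an O(1) relinking with no element copying, and only result performs a single parallel copy into the final array. A sketch of the relinking step (hypothetical simplified chain, not the library's Unrolled):

    // O(1) chain concatenation: link this chain's last node to the other
    // chain's head, then adopt its last pointer and size.
    final class Node[T](var elems: Vector[T], var next: Node[T] = null)
    final class Chain[T](var head: Node[T], var last: Node[T], var size: Int) {
      def concat(that: Chain[T]): this.type = {
        last.next = that.head   // relink; no elements are copied
        last      = that.last
        size     += that.size
        this
      }
    }

The resizable variant keeps a chain of buffers too, so its combine is equally cheap, but each ExposedArrayBuffer inside it still copies its contents whenever it outgrows its array during +=, which is the copying this commit set out to avoid.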
diff --git a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
index fc63d51b33..82c69b4c53 100644
--- a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
@@ -43,6 +43,7 @@ trait ParFlatHashTable[T] extends collection.mutable.FlatHashTable[T] {
if (hasNext) scan()
r
} else Iterator.empty.next
+ def dup = newIterator(idx, until, totalsize)
def split = if (remaining > 1) {
val divpt = (until + idx) / 2
diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
index efba6c8d9c..b238ce09de 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
@@ -67,6 +67,8 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
}
}
+ def dup = newIterator(idx, until, totalsize, es)
+
def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
if (until > idx) {
// there is at least one more slot for the next iterator
diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala
index 89544c8bdd..1efe79b00d 100644
--- a/src/library/scala/collection/parallel/mutable/package.scala
+++ b/src/library/scala/collection/parallel/mutable/package.scala
@@ -10,6 +10,11 @@ import scala.collection.generic.Sizing
package object mutable {
+ /* aliases */
+
+ type ParArrayCombiner[T] = ResizableParArrayCombiner[T]
+ val ParArrayCombiner = ResizableParArrayCombiner
+
/* classes and traits */
private[mutable] trait SizeMapUtils {
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 19ae9aef5d..6b3f3bf448 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -107,8 +107,10 @@ package object parallel {
}
implicit def throwable2ops(self: Throwable) = new ThrowableOps {
- def alongWith(that: Throwable) = self match {
- case ct: CompositeThrowable => new CompositeThrowable(ct.throwables + that)
+ def alongWith(that: Throwable) = (self, that) match {
+ case (self: CompositeThrowable, that: CompositeThrowable) => new CompositeThrowable(self.throwables ++ that.throwables)
+ case (self: CompositeThrowable, _) => new CompositeThrowable(self.throwables + that)
+ case (_, that: CompositeThrowable) => new CompositeThrowable(that.throwables + self)
case _ => new CompositeThrowable(Set(self, that))
}
}
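
The old alongWith only inspected the receiver, so a CompositeThrowable arriving on the right-hand side ended up nested inside the new set; matching on both sides keeps the set of throwables flat. A reduced model of the four cases (hypothetical types, strings standing in for throwables):

    sealed trait Err
    case class Single(msg: String) extends Err
    case class Composite(msgs: Set[String]) extends Err

    // Mirrors the four cases of the fixed alongWith: unions flatten,
    // singles are absorbed, and two singles start a fresh composite.
    def alongWith(a: Err, b: Err): Composite = (a, b) match {
      case (Composite(x), Composite(y)) => Composite(x ++ y)
      case (Composite(x), Single(y))    => Composite(x + y)
      case (Single(x), Composite(y))    => Composite(y + x)
      case (Single(x), Single(y))       => Composite(Set(x, y))
    }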
@@ -117,7 +119,7 @@ package object parallel {
/** Composite throwable - thrown when multiple exceptions are thrown at the same time. */
final class CompositeThrowable(val throwables: Set[Throwable])
- extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", "))
+ extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.map(t => (t, t.getStackTrace.toList)).mkString(", "))
/** A helper iterator for iterating very small array buffers.
@@ -133,6 +135,7 @@ package object parallel {
r
}
def remaining = until - index
+ def dup = new BufferIterator(buffer, index, until, signalDelegate)
def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
val divsz = (until - index) / 2
Seq(
diff --git a/test/benchmarks/source.list b/test/benchmarks/source.list
index dffd5ef4ea..9382e996dc 100644
--- a/test/benchmarks/source.list
+++ b/test/benchmarks/source.list
@@ -35,6 +35,7 @@ src/scala/collection/parallel/benchmarks/parallel_array/ReducePrime.scala
src/scala/collection/parallel/benchmarks/parallel_array/DropMany.scala
src/scala/collection/parallel/benchmarks/parallel_array/ReduceList.scala
src/scala/collection/parallel/benchmarks/parallel_array/ForeachLight.scala
+src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala
src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
src/scala/collection/parallel/benchmarks/parallel_array/SliceMedium.scala
src/scala/collection/parallel/benchmarks/parallel_array/ReverseMap.scala
diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
index 32c3ca154f..9eacc7ff1f 100644
--- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
+++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
@@ -65,6 +65,7 @@ trait BenchmarkRegister {
register(parallel_array.PadToDouble)
register(parallel_array.AggregateLight)
register(parallel_array.ScanLight)
+ register(parallel_array.ScanMedium)
register(parallel_array.MatrixMultiplication)
// parallel views
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala
index b5dcfca872..f151158ad9 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala
@@ -9,6 +9,8 @@ object ForeachHeavy extends Companion {
override def comparisons = List("jsr")
override def defaultSize = 2048
+ @volatile var z = 0
+
val fun = (a: Cont) => heavyOperation(a)
val funjsr = new extra166y.Ops.Procedure[Cont] {
def op(a: Cont) = heavyOperation(a)
@@ -26,6 +28,7 @@ object ForeachHeavy extends Companion {
if (n % i == 0) isPrime = false
i += 1
}
+ if (isPrime && (n.toString == z)) z += 1
isPrime
}
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala
new file mode 100644
index 0000000000..73a237189f
--- /dev/null
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanMedium.scala
@@ -0,0 +1,55 @@
+package scala.collection.parallel.benchmarks.parallel_array
+
+
+import scala.collection.parallel.benchmarks._
+import scala.collection.parallel.mutable.ParArray
+
+
+object ScanMedium extends Companion {
+ def benchName = "scan-medium";
+ def apply(sz: Int, parallelism: Int, what: String) = new ScanMedium(sz, parallelism, what)
+ override def comparisons = List("jsr")
+ override def defaultSize = 5000
+
+ val op = (a: Cont, b: Cont) => {
+ operation(a, b)
+ }
+ def operation(a: Cont, b: Cont) = {
+ val m = if (a.in < 0) 1 else 0
+ val k = calc(a.in, b.in, m)
+ new Cont(a.in + b.in + k * m * (0 until 2).reduceLeft(_ + _))
+ }
+ private def calc(x: Int, y: Int, n: Int) = {
+ var sum = x
+ for (i <- 0 until 500) {
+ sum += y + (if (sum % 2 == 0) n * x else y)
+ if (sum % 5 == 0) sum -= x * y - n * (x + y)
+ }
+ sum
+ }
+}
+
+
+class ScanMedium(sz: Int, p: Int, what: String)
+extends Resettable[Cont](sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) {
+ def companion = ScanMedium
+ override def repetitionsPerRun = 50
+ override val runs = 12
+
+ def runpar = pa.scan(new Cont(0))(ScanMedium.op)
+ def runseq = sequentialScan(new Cont(0), ScanMedium.op, sz)
+ def runjsr = jsrarr.cumulate(new extra166y.Ops.Reducer[Cont] {
+ def op(a: Cont, b: Cont) = ScanMedium.operation(a, b)
+ }, new Cont(0))
+ override def comparisonMap = collection.Map("jsr" -> runjsr _)
+}
diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala
index 690ee34cca..24330d7670 100644
--- a/test/files/scalacheck/parallel-collections/IntOperators.scala
+++ b/test/files/scalacheck/parallel-collections/IntOperators.scala
@@ -49,7 +49,8 @@ trait IntOperators extends Operators[Int] {
def foldArguments = List(
(0, _ + _),
(1, _ * _),
- (Int.MinValue, math.max(_, _))
+ (Int.MinValue, math.max(_, _)),
+ (Int.MaxValue, math.min(_, _))
)
def addAllTraversables = List(
List[Int](),
diff --git a/test/files/scalacheck/parallel-collections/Operators.scala b/test/files/scalacheck/parallel-collections/Operators.scala
index 538cc23325..b4321cf805 100644
--- a/test/files/scalacheck/parallel-collections/Operators.scala
+++ b/test/files/scalacheck/parallel-collections/Operators.scala
@@ -32,4 +32,4 @@ trait SeqOperators[T] extends Operators[T] {
def reverseMapFunctions: List[T => T]
def sameElementsSeqs: List[Seq[T]]
def startEndSeqs: List[Seq[T]]
-}
\ No newline at end of file
+}
diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
index 061bb08d9b..9299a201a1 100644
--- a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
@@ -64,23 +64,23 @@ with PairValues[Int, Int]
}
override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match {
- case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster
- val invs = pm.brokenInvariants
-
- val containsall = (for ((k, v) <- orig) yield {
- if (pm.asInstanceOf[ParHashMap[Int, Int]].get(k) == Some(v)) true
- else {
- println("Does not contain original element: " + (k, v))
- false
- }
- }).foldLeft(true)(_ && _)
-
-
- if (invs.isEmpty) containsall
- else {
- println("Invariants broken:\n" + invs.mkString("\n"))
- false
- }
+ // case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster
+ // val invs = pm.brokenInvariants
+
+ // val containsall = (for ((k, v) <- orig) yield {
+ // if (pm.asInstanceOf[ParHashMap[Int, Int]].get(k) == Some(v)) true
+ // else {
+ // println("Does not contain original element: " + (k, v))
+ // false
+ // }
+ // }).foldLeft(true)(_ && _)
+
+
+ // if (invs.isEmpty) containsall
+ // else {
+ // println("Invariants broken:\n" + invs.mkString("\n"))
+ // false
+ // }
case _ => true
}
diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala
index be70a7c7a3..8b41908a26 100644
--- a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala
@@ -56,28 +56,28 @@ with IntValues
}
override def checkDataStructureInvariants(orig: Traversable[Int], ds: AnyRef) = ds match {
- case pm: ParHashSet[t] =>
- // for an example of how not to write code proceed below
- val invs = pm.brokenInvariants
-
- val containsall = (for (elem <- orig) yield {
- if (pm.asInstanceOf[ParHashSet[Int]](elem) == true) true
- else {
- println("Does not contain original element: " + elem)
- println(pm.hashTableContents.table.find(_ == elem))
- println(pm.hashTableContents.table.indexOf(elem))
- false
- }
- }).foldLeft(true)(_ && _)
-
-
- if (invs.isEmpty) {
- if (!containsall) println(pm.debugInformation)
- containsall
- } else {
- println("Invariants broken:\n" + invs.mkString("\n"))
- false
- }
+ // case pm: ParHashSet[t] if 1 == 0 =>
+ // // for an example of how not to write code proceed below
+ // val invs = pm.brokenInvariants
+
+ // val containsall = (for (elem <- orig) yield {
+ // if (pm.asInstanceOf[ParHashSet[Int]](elem) == true) true
+ // else {
+ // println("Does not contain original element: " + elem)
+ // println(pm.hashTableContents.table.find(_ == elem))
+ // println(pm.hashTableContents.table.indexOf(elem))
+ // false
+ // }
+ // }).foldLeft(true)(_ && _)
+
+
+ // if (invs.isEmpty) {
+ // if (!containsall) println(pm.debugInformation)
+ // containsall
+ // } else {
+ // println("Invariants broken:\n" + invs.mkString("\n"))
+ // false
+ // }
case _ => true
}
diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
index 8b5d72ea01..0dcd877ecb 100644
--- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
@@ -398,6 +398,22 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
tarr.toSeq == collarr.toSeq
}
+ if (hasStrictOrder) property("scans must be equal") = forAll(collectionPairs) {
+ case (t, coll) =>
+ (for (((first, op), ind) <- foldArguments.zipWithIndex) yield {
+ val tscan = t.scanLeft(first)(op)
+ val cscan = coll.scan(first)(op)
+ if (tscan != cscan || cscan != tscan) {
+ println("from: " + t)
+ println("and: " + coll)
+ println("scans are: ")
+ println(tscan)
+ println(cscan)
+ }
+ ("operator " + ind) |: tscan == cscan && cscan == tscan
+ }).reduceLeft(_ && _)
+ }
+
}