summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:20 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:20 +0000
commitf2ecbd04691b1914e2f77c60afc2b296aa6826ae (patch)
tree539b543eb173cfc7b0bbde4ca5f2c5bb187297df /src/library
parent492b22576f2ad46b300ce8dc31c5b672aaf517e4 (diff)
downloadscala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.gz
scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.bz2
scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.zip
Array combiners implementation changed from arr...
Array combiners implementation changed from array buffers to doubling unrolled buffers to avoid excessive copying. Still evaluating the benefits of this. No review.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/immutable/HashMap.scala27
-rw-r--r--src/library/scala/collection/immutable/HashSet.scala27
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala238
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala2
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala48
-rw-r--r--src/library/scala/collection/parallel/UnrolledBuffer.scala30
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala18
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala16
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala2
-rw-r--r--src/library/scala/collection/parallel/immutable/package.scala1
-rw-r--r--src/library/scala/collection/parallel/mutable/LazyCombiner.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala112
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala112
-rw-r--r--src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala1
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashTable.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/package.scala5
-rw-r--r--src/library/scala/collection/parallel/package.scala9
17 files changed, 449 insertions, 203 deletions
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 4a7a10c6c1..f6e17e9630 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -473,14 +473,25 @@ time { mNew.iterator.foreach( p => ()) }
}
class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes
+ protected var depth = 0
+ protected var arrayStack: Array[Array[HashMap[A, B @uncheckedVariance]]] = new Array[Array[HashMap[A,B]]](6)
+ protected var posStack = new Array[Int](6)
+
+ protected var arrayD: Array[HashMap[A, B @uncheckedVariance]] = elems
+ protected var posD = 0
+
+ protected var subIter: Iterator[(A, B @uncheckedVariance)] = null // to traverse collision nodes
+
+ def dupIterator: TrieIterator[A, B] = {
+ val t = new TrieIterator(elems)
+ t.depth = depth
+ t.arrayStack = arrayStack
+ t.posStack = posStack
+ t.arrayD = arrayD
+ t.posD = posD
+ t.subIter = subIter
+ t
+ }
def hasNext = (subIter ne null) || depth >= 0
diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala
index 8d52908349..720ea29639 100644
--- a/src/library/scala/collection/immutable/HashSet.scala
+++ b/src/library/scala/collection/immutable/HashSet.scala
@@ -285,14 +285,25 @@ time { mNew.iterator.foreach( p => ()) }
class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[A] = null // to traverse collision nodes
+ protected var depth = 0
+ protected var arrayStack = new Array[Array[HashSet[A]]](6)
+ protected var posStack = new Array[Int](6)
+
+ protected var arrayD = elems
+ protected var posD = 0
+
+ protected var subIter: Iterator[A] = null // to traverse collision nodes
+
+ def dupIterator: TrieIterator[A] = {
+ val t = new TrieIterator(elems)
+ t.depth = depth
+ t.arrayStack = arrayStack
+ t.posStack = posStack
+ t.arrayD = arrayD
+ t.posD = posD
+ t.subIter = subIter
+ t
+ }
def hasNext = (subIter ne null) || depth >= 0
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index ed757655f5..d3e6eb42d4 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -4,7 +4,7 @@ package scala.collection.parallel
import scala.collection.mutable.Builder
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
import scala.collection.IterableLike
import scala.collection.Parallel
import scala.collection.Parallelizable
@@ -577,15 +577,13 @@ self =>
*
* @return a new $coll containing the prefix scan of the elements in this $coll
*/
- def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = {
- val array = new Array[Any](size + 1)
- array(0) = z
- executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, array, parallelIterator) mapResult { st =>
- executeAndWaitResult(new ScanWithScanTree[U, Any](Some(z), op, st, array, array) mapResult { u =>
- executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result })
+ def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = if (parallelismLevel > 1) {
+ if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, parallelIterator) mapResult {
+ tree => executeAndWaitResult(new FromScanTree(tree, z, op, cbf) mapResult {
+ cb => cb.result
})
- })
- }
+ }) else (cbf(self.repr) += z).result
+ } else super.scanLeft(z)(op)(cbf)
/** Takes the longest prefix of elements that satisfy the predicate.
*
@@ -1099,154 +1097,117 @@ self =>
override def requiresStrictSplitters = true
}
- protected[this] class ScanTree[U >: T](val from: Int, val len: Int) {
- var value: U = _
- var left: ScanTree[U] = null
- var right: ScanTree[U] = null
- @volatile var chunkFinished = false
- var activeScan: () => Unit = null
-
- def isApplying = activeScan ne null
- def isLeaf = (left eq null) && (right eq null)
- def shouldApply = !chunkFinished && !isApplying
- def applyToInterval[A >: U](elem: U, op: (U, U) => U, array: Array[A]) = {
- //executeAndWait(new ApplyToArray(elem, op, from, len, array))
+ protected[this] class CreateScanTree[U >: T](from: Int, len: Int, z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T])
+ extends Transformer[ScanTree[U], CreateScanTree[U]] {
+ var result: ScanTree[U] = null
+ def leaf(prev: Option[ScanTree[U]]) = if (pit.remaining > 0) {
+ val trees = ArrayBuffer[ScanTree[U]]()
var i = from
val until = from + len
+ val blocksize = scanBlockSize
while (i < until) {
- array(i) = op(elem, array(i).asInstanceOf[U])
- i += 1
+ trees += scanBlock(i, math.min(blocksize, pit.remaining))
+ i += blocksize
}
+
+ // merge trees
+ result = mergeTrees(trees, 0, trees.length)
+ } else result = null // no elements to scan (merge will take care of `null`s)
+ private def scanBlock(from: Int, len: Int): ScanTree[U] = {
+ val pitdup = pit.dup
+ new ScanLeaf(pitdup, op, from, len, None, pit.reduceLeft(len, op))
}
- def scanInterval[A >: U](elem: U, op: (U, U) => U, srcA: Array[A], destA: Array[A]) = {
- val src = srcA.asInstanceOf[Array[Any]]
- val dest = destA.asInstanceOf[Array[Any]]
- var last = elem
- var i = from
- val until = from + len
- while (i < until) {
- last = op(last, src(i - 1).asInstanceOf[U])
- dest(i) = last
- i += 1
+ private def mergeTrees(trees: ArrayBuffer[ScanTree[U]], from: Int, howmany: Int): ScanTree[U] = if (howmany > 1) {
+ val half = howmany / 2
+ ScanNode(mergeTrees(trees, from, half), mergeTrees(trees, from + half, howmany - half))
+ } else trees(from)
+ protected[this] def newSubtask(pit: ParIterableIterator[T]) = unsupported
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield {
+ new CreateScanTree(untilp, p.remaining, z, op, p)
}
}
- def pushDown(v: U, op: (U, U) => U) {
- value = op(v, value)
- if (left ne null) left.pushDown(v, op)
- if (right ne null) right.pushDown(v, op)
- }
- def pushDownOnRight(v: U, op: (U, U) => U) = if (right ne null) right.pushDown(v, op)
- def printTree: Unit = printTree(0)
- private def printTree(t: Int): Unit = {
- for (i <- 0 until t) print(" ")
- if (isLeaf) print("(l) ")
- println(value + ": from " + from + " until " + (from + len))
- if (left ne null) left.printTree(t + 1)
- if (right ne null) right.printTree(t + 1)
- }
+ override def merge(that: CreateScanTree[U]) = if (this.result != null) {
+ if (that.result != null) result = ScanNode(result, that.result)
+ } else result = that.result
+ override def requiresStrictSplitters = true
}
- protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A])
- extends StrictSplitterCheckTask[Unit, ApplyToArray[U, A]] {
- var result: Unit = ()
- def leaf(prev: Option[Unit]) = {
- var i = from
- val until = from + len
- while (i < until) {
- array(i) = op(elem, array(i).asInstanceOf[U])
- i += 1
- }
+ protected[this] class FromScanTree[U >: T, That]
+ (tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CanCombineFrom[Repr, U, That])
+ extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] {
+ var result: Combiner[U, That] = null
+ def leaf(prev: Option[Combiner[U, That]]) {
+ val cb = reuse(prev, cbf(self.repr))
+ iterate(tree, cb)
+ result = cb
}
- def shouldSplitFurther = len > threshold(size, parallelismLevel min availableProcessors)
- def split = {
- val fp = len / 2
- val sp = len - fp
- Seq(
- new ApplyToArray(elem, op, from, fp, array),
- new ApplyToArray(elem, op, from + fp, sp, array)
+ private def iterate(tree: ScanTree[U], cb: Combiner[U, That]): Unit = tree match {
+ case ScanNode(left, right) =>
+ iterate(left, cb)
+ iterate(right, cb)
+ case ScanLeaf(p, _, _, len, Some(prev), _) =>
+ p.scanToCombiner(len, prev.acc, op, cb)
+ case ScanLeaf(p, _, _, len, None, _) =>
+ cb += z
+ p.scanToCombiner(len, z, op, cb)
+ }
+ def split = tree match {
+ case ScanNode(left, right) => Seq(
+ new FromScanTree(left, z, op, cbf),
+ new FromScanTree(right, z, op, cbf)
)
+ case _ => unsupportedop("Cannot be split further")
}
+ def shouldSplitFurther = tree match {
+ case ScanNode(_, _) => true
+ case ScanLeaf(_, _, _, _, _, _) => false
+ }
+ override def merge(that: FromScanTree[U, That]) = result = result combine that.result
}
- protected[this] class BuildScanTree[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T])
- extends Accessor[ScanTree[U], BuildScanTree[U, A]] {
- // TODO reimplement - there are some issues here
- var result: ScanTree[U] = null
- def leaf(prev: Option[ScanTree[U]]) = if ((prev != None && prev.get.chunkFinished) || from == 1) {
- val prevElem = if (from == 1) z else prev.get.value
- result = new ScanTree[U](from, len)
- pit.scanToArray(prevElem, op, array, from)
- result.value = array(from + len - 1).asInstanceOf[U]
- result.chunkFinished = true
- } else {
- result = new ScanTree[U](from, len)
- result.value = pit.fold(z)(op)
- }
- protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield {
- val plen = p.remaining min (len - untilp)
- new BuildScanTree[U, A](z, op, from + untilp, plen, array, p)
- }
- }
- override def merge(that: BuildScanTree[U, A]) = {
- // create scan tree node
- val left = result
- val right = that.result
- val ns = new ScanTree[U](left.from, left.len + right.len)
- ns.left = left
- ns.right = right
- ns.value = op(left.value, right.value)
- ns.pushDownOnRight(left.value, op)
-
- // set result
- result = ns
- }
- override def requiresStrictSplitters = true
+ /* scan tree */
+
+ protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1
+
+ protected[this] trait ScanTree[U >: T] {
+ def beginsAt: Int
+ def pushdown(v: U): Unit
+ def leftmost: ScanLeaf[U]
+ def rightmost: ScanLeaf[U]
+ def print(depth: Int = 0): Unit
}
- protected[this] class ScanWithScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], src: Array[A], dest: Array[A])
- extends StrictSplitterCheckTask[Unit, ScanWithScanTree[U, A]] {
- var result = ();
- def leaf(prev: Option[Unit]) = scan(st, first.get)
- private def scan(st: ScanTree[U], elem: U): Unit = if (!st.chunkFinished) {
- if (st.isLeaf) st.scanInterval(elem, op, src, dest)
- else {
- scan(st.left, elem)
- scan(st.right, st.left.value)
- }
+ protected[this] case class ScanNode[U >: T](left: ScanTree[U], right: ScanTree[U]) extends ScanTree[U] {
+ right.pushdown(left.rightmost.acc)
+ right.leftmost.prev = Some(left.rightmost)
+
+ val leftmost = left.leftmost
+ val rightmost = right.rightmost
+
+ def beginsAt = left.beginsAt
+ def pushdown(v: U) {
+ left.pushdown(v)
+ right.pushdown(v)
+ }
+ def print(depth: Int) {
+ println((" " * depth) + "ScanNode, begins at " + beginsAt)
+ left.print(depth + 1)
+ right.print(depth + 1)
}
- def split = collection.mutable.ArrayBuffer(
- new ScanWithScanTree(first, op, st.left, src, dest),
- new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest)
- )
- def shouldSplitFurther = (st.left ne null) && (st.right ne null)
}
- protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That])
- extends StrictSplitterCheckTask[Combiner[S, That], FromArray[S, A, That]] {
- var result: Result = null
- def leaf(prev: Option[Result]) = {
- val cb = prev getOrElse cbf(self.repr)
- var i = from
- val until = from + len
- while (i < until) {
- cb += array(i).asInstanceOf[S]
- i += 1
- }
- result = cb
- }
- def shouldSplitFurther = len > threshold(size, parallelismLevel)
- def split = {
- val fp = len / 2
- val sp = len - fp
- Seq(
- new FromArray(array, from, fp, cbf),
- new FromArray(array, from + fp, sp, cbf)
- )
+ protected[this] case class ScanLeaf[U >: T]
+ (pit: ParIterableIterator[U], op: (U, U) => U, from: Int, len: Int, var prev: Option[ScanLeaf[U]], var acc: U)
+ extends ScanTree[U] {
+ def beginsAt = from
+ def pushdown(v: U) = {
+ acc = op(v, acc)
}
- override def merge(that: FromArray[S, A, That]) = result = result combine that.result
+ def leftmost = this
+ def rightmost = this
+ def print(depth: Int) = println((" " * depth) + this)
}
/* debug information */
@@ -1255,7 +1216,8 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
- private val debugBuffer = collection.mutable.ArrayBuffer[String]()
+ // private val dbbuff = ArrayBuffer[String]()
+ def debugBuffer: ArrayBuffer[String] = null // dbbuff
private[parallel] def debugclear() = synchronized {
debugBuffer.clear
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 0ea33d0e39..91e15fa946 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -102,6 +102,8 @@ self =>
final def remaining = end - i
+ def dup = new Elements(i, end) with SignalContextPassingIterator[ParIterator]
+
def split = psplit(remaining / 2, remaining - remaining / 2)
def psplit(sizes: Int*) = {
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index a33702a527..3363186b7d 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -87,6 +87,16 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
}
}
+ def reduceLeft[U >: T](howmany: Int, op: (U, U) => U): U = {
+ var i = howmany - 1
+ var u: U = next
+ while (i > 0 && hasNext) {
+ u = op(u, next)
+ i -= 1
+ }
+ u
+ }
+
/* transformers to combiners */
def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
@@ -165,7 +175,7 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
def slice2combiner[U >: T, This](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = {
drop(from)
- var left = until - from
+ var left = math.max(until - from, 0)
cb.sizeHint(left)
while (left > 0) {
cb += next
@@ -221,6 +231,26 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
}
}
+ def scanToCombiner[U >: T, That](startValue: U, op: (U, U) => U, cb: Combiner[U, That]) = {
+ var curr = startValue
+ while (hasNext) {
+ curr = op(curr, next)
+ cb += curr
+ }
+ cb
+ }
+
+ def scanToCombiner[U >: T, That](howmany: Int, startValue: U, op: (U, U) => U, cb: Combiner[U, That]) = {
+ var curr = startValue
+ var left = howmany
+ while (left > 0) {
+ curr = op(curr, next)
+ cb += curr
+ left -= 1
+ }
+ cb
+ }
+
def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
cb.sizeHint(remaining min otherpit.remaining)
while (hasNext && otherpit.hasNext) {
@@ -336,6 +366,9 @@ extends AugmentedIterableIterator[T]
{
self =>
+ /** Creates a copy of this iterator. */
+ def dup: ParIterableIterator[T]
+
def split: Seq[ParIterableIterator[T]]
/** The number of elements this iterator has yet to traverse. This method
@@ -377,6 +410,7 @@ self =>
var remaining = taken min self.remaining
def hasNext = remaining > 0
def next = { remaining -= 1; self.next }
+ def dup: ParIterableIterator[T] = self.dup.take(taken)
def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
val sizes = sq.scanLeft(0)(_ + _.remaining)
@@ -400,6 +434,7 @@ self =>
def hasNext = self.hasNext
def next = f(self.next)
def remaining = self.remaining
+ def dup: ParIterableIterator[S] = self.dup map f
def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f }
}
@@ -418,6 +453,7 @@ self =>
} else curr.next
def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining
protected def firstNonEmpty = (curr eq self) && curr.hasNext
+ def dup: ParIterableIterator[U] = self.dup.appendParIterable[U, PI](that)
def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split
}
@@ -428,6 +464,7 @@ self =>
def hasNext = self.hasNext && that.hasNext
def next = (self.next, that.next)
def remaining = self.remaining min that.remaining
+ def dup: ParIterableIterator[(T, S)] = self.dup.zipParSeq(that)
def split: Seq[ParIterableIterator[(T, S)]] = {
val selfs = self.split
val sizes = selfs.map(_.remaining)
@@ -447,6 +484,7 @@ self =>
else (self.next, thatelem)
} else (thiselem, that.next);
def remaining = self.remaining max that.remaining
+ def dup: ParIterableIterator[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem)
def split: Seq[ParIterableIterator[(U, S)]] = {
val selfrem = self.remaining
val thatrem = that.remaining
@@ -468,6 +506,8 @@ extends ParIterableIterator[T]
with PreciseSplitter[T]
{
self =>
+ def dup: ParSeqIterator[T]
+
def split: Seq[ParSeqIterator[T]]
def psplit(sizes: Int*): Seq[ParSeqIterator[T]]
@@ -483,6 +523,7 @@ self =>
/* iterator transformers */
class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[T]]
override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]]
def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) }
}
@@ -497,6 +538,7 @@ self =>
}
class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[S]]
override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]]
def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f }
}
@@ -504,6 +546,7 @@ self =>
override def map[S](f: T => S) = new Mapped(f)
class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[U]]
override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]]
def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) {
val selfrem = self.remaining
@@ -534,6 +577,7 @@ self =>
def appendParSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that)
class Zipped[S](ti: ParSeqIterator[S]) extends super.Zipped[S](ti) with ParSeqIterator[(T, S)] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[(T, S)]]
override def split: Seq[ParSeqIterator[(T, S)]] = super.split.asInstanceOf[Seq[ParSeqIterator[(T, S)]]]
def psplit(szs: Int*) = (self.psplit(szs: _*) zip that.psplit(szs: _*)) map { p => p._1 zipParSeq p._2 }
}
@@ -541,6 +585,7 @@ self =>
override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] {
+ override def dup = super.dup.asInstanceOf[ParSeqIterator[(U, S)]]
private def patchem = {
val selfrem = self.remaining
val thatrem = that.remaining
@@ -578,6 +623,7 @@ self =>
def hasNext = trio.hasNext
def next = trio.next
def remaining = trio.remaining
+ def dup = self.dup.patchParSeq(from, patch, replaced)
def split = trio.split
def psplit(sizes: Int*) = trio.psplit(sizes: _*)
}
diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala
index d29c24822c..c7a8b388bd 100644
--- a/src/library/scala/collection/parallel/UnrolledBuffer.scala
+++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala
@@ -46,7 +46,7 @@ extends collection.mutable.Buffer[T]
{
import UnrolledBuffer.Unrolled
- private var headptr = new Unrolled[T]
+ private var headptr = newUnrolled
private var lastptr = headptr
private var sz = 0
@@ -54,9 +54,14 @@ extends collection.mutable.Buffer[T]
private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head
private[parallel] def lastPtr = lastptr
private[parallel] def lastPtr_=(last: Unrolled[T]) = lastptr = last
+ private[parallel] def size_=(s: Int) = sz = s
protected[this] override def newBuilder = new UnrolledBuffer[T]
+ protected def newUnrolled = new Unrolled[T](this)
+
+ private[collection] def calcNextLength(sz: Int) = sz
+
def classManifestCompanion = UnrolledBuffer
def concat(that: UnrolledBuffer[T]) = {
@@ -82,7 +87,7 @@ extends collection.mutable.Buffer[T]
}
def clear() {
- headptr = new Unrolled[T]
+ headptr = newUnrolled
lastptr = headptr
sz = 0
}
@@ -153,20 +158,23 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
val waterline = 50
val waterlineDelim = 100
- private[parallel] val unrolledsize = 32
+ private[parallel] val unrolledlength = 32
/** Unrolled buffer node.
*/
- class Unrolled[T: ClassManifest] private (var size: Int, var array: Array[T], var next: Unrolled[T]) {
- def this() = this(0, new Array[T](UnrolledBuffer.unrolledsize), null)
+ class Unrolled[T: ClassManifest] private[parallel] (var size: Int, var array: Array[T], var next: Unrolled[T], val buff: UnrolledBuffer[T] = null) {
+ private[parallel] def this() = this(0, new Array[T](unrolledlength), null, null)
+ private[parallel] def this(b: UnrolledBuffer[T]) = this(0, new Array[T](unrolledlength), null, b)
+
+ private def nextlength = if (buff eq null) unrolledlength else buff.calcNextLength(array.length)
// adds and returns itself or the new unrolled if full
- @tailrec final def append(elem: T): Unrolled[T] = if (size < UnrolledBuffer.unrolledsize) {
+ @tailrec final def append(elem: T): Unrolled[T] = if (size < array.length) {
array(size) = elem
size += 1
this
} else {
- next = new Unrolled[T]
+ next = new Unrolled[T](0, new Array[T](nextlength), null, buff)
next.append(elem)
}
def foreach[U](f: T => U) {
@@ -200,7 +208,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
} else {
// allocate a new node and store element
// then make it point to this
- val newhead = new Unrolled[T]
+ val newhead = new Unrolled[T](buff)
newhead.append(elem)
newhead.next = this
newhead
@@ -244,7 +252,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
@tailrec final def insertAll(idx: Int, t: Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) {
// divide this node at the appropriate position and insert all into head
// update new next
- val newnextnode = new Unrolled[T]
+ val newnextnode = new Unrolled[T](0, new Array(array.length), null, buff)
Array.copy(array, idx, newnextnode.array, 0, size - idx)
newnextnode.size = size - idx
newnextnode.next = next
@@ -265,7 +273,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
private def nullout(from: Int, until: Int) {
var idx = from
while (idx < until) {
- array(idx) = null.asInstanceOf[T] // !!
+ array(idx) = null.asInstanceOf[T] // TODO find a way to assign a default here!!
idx += 1
}
}
@@ -279,7 +287,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
tryMergeWithNext()
}
- override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + " -> " + (if (next ne null) next.toString else "")
+ override def toString = array.take(size).mkString("Unrolled[" + array.length + "](", ", ", ")") + " -> " + (if (next ne null) next.toString else "")
}
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 58dce1aef4..a411a1cc44 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -20,7 +20,7 @@ import scala.collection.generic.GenericParMapCompanion
import scala.collection.immutable.HashMap
-
+import annotation.unchecked.uncheckedVariance
@@ -62,10 +62,24 @@ self =>
type SCPI = SignalContextPassingIterator[ParHashMapIterator]
- class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int)
+ class ParHashMapIterator(var triter: Iterator[(K, V @uncheckedVariance)], val sz: Int)
extends super.ParIterator {
self: SignalContextPassingIterator[ParHashMapIterator] =>
var i = 0
+ def dup = triter match {
+ case t: HashMap.TrieIterator[_, _] =>
+ val dupt = t.dupIterator.asInstanceOf[Iterator[(K, V)]]
+ dupFromIterator(dupt)
+ case _ =>
+ val buff = triter.toBuffer
+ triter = buff.iterator
+ dupFromIterator(buff.iterator)
+ }
+ private def dupFromIterator(it: Iterator[(K, V @uncheckedVariance)]) = {
+ val phit = new ParHashMapIterator(it, sz) with SCPI
+ phit.i = i
+ phit
+ }
def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
case t: HashMap.TrieIterator[_, _] =>
val previousRemaining = remaining
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 747ed3eed3..0b1f9c5b7e 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -61,10 +61,24 @@ self =>
type SCPI = SignalContextPassingIterator[ParHashSetIterator]
- class ParHashSetIterator(val triter: Iterator[T], val sz: Int)
+ class ParHashSetIterator(var triter: Iterator[T], val sz: Int)
extends super.ParIterator {
self: SignalContextPassingIterator[ParHashSetIterator] =>
var i = 0
+ def dup = triter match {
+ case t: HashSet.TrieIterator[_] =>
+ val dupt = t.dupIterator.asInstanceOf[Iterator[T]]
+ dupFromIterator(dupt)
+ case _ =>
+ val buff = triter.toBuffer
+ triter = buff.iterator
+ dupFromIterator(buff.iterator)
+ }
+ private def dupFromIterator(it: Iterator[T]) = {
+ val phit = new ParHashSetIterator(it, sz) with SCPI
+ phit.i = i
+ phit
+ }
def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
case t: HashSet.TrieIterator[_] =>
val previousRemaining = remaining
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index dbd5449362..ab5e509515 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -46,6 +46,8 @@ self =>
private def rangeleft = range.drop(ind)
+ def dup = new ParRangeIterator(rangeleft) with SCPI
+
def split = {
val rleft = rangeleft
val elemleft = rleft.length
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index 9c6bbae8dd..142e554178 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -38,6 +38,7 @@ package object immutable {
def remaining = until - i
def hasNext = i < until
def next = { i += 1; elem }
+ def dup = new ParIterator(i, until, elem) with SCPI
def psplit(sizes: Int*) = {
val incr = sizes.scanLeft(0)(_ + _)
for ((start, end) <- incr.init zip incr.tail) yield new ParIterator(i + start, (i + end) min until, elem) with SCPI
diff --git a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
index bd17d24ea8..955698fdd4 100644
--- a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
@@ -21,7 +21,7 @@ import scala.collection.parallel.Combiner
*/
trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To]
{
- self: collection.parallel.EnvironmentPassingCombiner[Elem, To] =>
+self: collection.parallel.EnvironmentPassingCombiner[Elem, To] =>
val chain: ArrayBuffer[Buff]
val lastbuff = chain.last
def +=(elem: Elem) = { lastbuff += elem; this }
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 909b8eb5d7..8f70547a03 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -71,7 +71,7 @@ self =>
class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
extends super.ParIterator {
- me: SignalContextPassingIterator[ParArrayIterator] =>
+ me: SignalContextPassingIterator[ParArrayIterator] =>
def hasNext = i < until
@@ -83,6 +83,8 @@ self =>
def remaining = until - i
+ def dup = new ParArrayIterator(i, until, arr) with SCPI
+
def psplit(sizesIncomplete: Int*): Seq[ParIterator] = {
var traversed = i
val total = sizesIncomplete.reduceLeft(_ + _)
@@ -439,13 +441,25 @@ self =>
override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
cb.sizeHint(remaining)
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
pac.lastbuff.setInternalSize(remaining)
} otherwise {
- copy2builder_quick(cb, arr, until, i)
- i = until
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ Array.copy(arr, i, targetarr, 0, until - i)
+ pac.buff.size = pac.buff.size + until - i
+ pac.buff.lastPtr.size = until - i
+ pac
+ } otherwise {
+ copy2builder_quick(cb, arr, until, i)
+ i = until
+ }
}
cb
}
@@ -495,21 +509,35 @@ self =>
}
override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val sz = remaining
pac.sizeHint(sz)
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
- reverse2combiner_quick(targetarr, arr, i, until)
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
pac.lastbuff.setInternalSize(sz)
pac
- } otherwise super.reverse2combiner(cb)
+ } otherwise {
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val sz = remaining
+ pac.sizeHint(sz)
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
+ pac.buff.size = pac.buff.size + sz
+ pac.buff.lastPtr.size = sz
+ pac
+ } otherwise super.reverse2combiner(cb)
+ }
cb
}
- private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
- var j = from
- var k = ntil - from - 1
- while (j < ntil) {
+ private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) {
+ var j = srcfrom
+ var k = targfrom + srcuntil - srcfrom - 1
+ while (j < srcuntil) {
targ(k) = a(j)
j += 1
k -= 1
@@ -553,23 +581,61 @@ self =>
(new ParArray[S](targarrseq)).asInstanceOf[That]
} else super.map(f)(bf)
- override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) {
- // reserve an array
- val targarrseq = new ArraySeq[U](length + 1)
- val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
- targetarr(0) = z
+ override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That =
+ if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) {
+ // reserve an array
+ val targarrseq = new ArraySeq[U](length + 1)
+ val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
+ targetarr(0) = z
- // do a parallel prefix scan
- executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st =>
- executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr)))
- )))
+ // do a parallel prefix scan
+ if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult {
+ tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr))
+ })
- // wrap the array into a parallel array
- (new ParArray[U](targarrseq)).asInstanceOf[That]
- } else super.scan(z)(op)(cbf)
+ // wrap the array into a parallel array
+ (new ParArray[U](targarrseq)).asInstanceOf[That]
+ } else super.scan(z)(op)(cbf)
/* tasks */
+ class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any])
+ extends Task[Unit, ScanToArray[U]] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = iterate(tree)
+ private def iterate(tree: ScanTree[U]): Unit = tree match {
+ case ScanNode(left, right) =>
+ iterate(left)
+ iterate(right)
+ case ScanLeaf(_, _, from, len, Some(prev), _) =>
+ scanLeaf(array, targetarr, from, len, prev.acc)
+ case ScanLeaf(_, _, from, len, None, _) =>
+ scanLeaf(array, targetarr, from, len, z)
+ }
+ private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) {
+ var i = from
+ val until = from + len
+ var curr = startval
+ val operation = op
+ while (i < until) {
+ curr = operation(curr, srcarr(i).asInstanceOf[U])
+ i += 1
+ targetarr(i) = curr
+ }
+ }
+ def split = tree match {
+ case ScanNode(left, right) => Seq(
+ new ScanToArray(left, z, op, targetarr),
+ new ScanToArray(right, z, op, targetarr)
+ )
+ case _ => system.error("Can only split scan tree internal nodes.")
+ }
+ def shouldSplitFurther = tree match {
+ case ScanNode(_, _) => true
+ case _ => false
+ }
+ }
+
class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] {
var result = ();
def leaf(prev: Option[Unit]) = {
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index 760f8b09ce..339f827aef 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -9,14 +9,112 @@ import scala.collection.mutable.ArraySeq
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.TaskSupport
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.unsupportedop
+import scala.collection.parallel.UnrolledBuffer
+import scala.collection.parallel.UnrolledBuffer.Unrolled
+import scala.collection.parallel.Combiner
+private[mutable] class DoublingUnrolledBuffer[T](implicit m: ClassManifest[T]) extends UnrolledBuffer[T]()(m) {
+ override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz
+ protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this)
+}
+
+
+
+/** An array combiner that uses doubling unrolled buffers to store elements. */
+trait UnrolledParArrayCombiner[T]
+extends Combiner[T, ParArray[T]] {
+self: EnvironmentPassingCombiner[T, ParArray[T]] =>
+ // because size is doubling, random access is O(logn)!
+ val buff = new DoublingUnrolledBuffer[Any]
+
+ import tasksupport._
+
+ def +=(elem: T) = {
+ buff += elem
+ this
+ }
+
+ def result = {
+ val arrayseq = new ArraySeq[T](size)
+ val array = arrayseq.array.asInstanceOf[Array[Any]]
+
+ executeAndWaitResult(new CopyUnrolledToArray(array, 0, size))
+
+ new ParArray(arrayseq)
+ }
+
+ def clear {
+ buff.clear
+ }
+
+ override def sizeHint(sz: Int) = {
+ buff.lastPtr.next = new Unrolled(0, new Array[Any](sz), null, buff)
+ buff.lastPtr = buff.lastPtr.next
+ }
+
+ def combine[N <: T, NewTo >: ParArray[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = other match {
+ case that if that eq this => this // just return this
+ case that: UnrolledParArrayCombiner[t] =>
+ buff concat that.buff
+ this
+ case _ => unsupportedop("Cannot combine with combiner of different type.")
+ }
+
+ def size = buff.size
+
+ /* tasks */
+
+ class CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int)
+ extends Task[Unit, CopyUnrolledToArray] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = if (howmany > 0) {
+ var totalleft = howmany
+ val (startnode, startpos) = findStart(offset)
+ var curr = startnode
+ var pos = startpos
+ var arroffset = offset
+ while (totalleft > 0) {
+ val lefthere = math.min(totalleft, curr.size - pos)
+ Array.copy(curr.array, pos, array, arroffset, lefthere)
+ // println("from: " + arroffset + " elems " + lefthere + " - " + pos + ", " + curr + " -> " + array.toList + " by " + this + " !! " + buff.headPtr)
+ totalleft -= lefthere
+ arroffset += lefthere
+ pos = 0
+ curr = curr.next
+ }
+ }
+ private def findStart(pos: Int) = {
+ var left = pos
+ var node = buff.headPtr
+ while ((left - node.size) >= 0) {
+ left -= node.size
+ node = node.next
+ }
+ (node, left)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
+ override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")"
+ }
+}
+
+
+
+object UnrolledParArrayCombiner {
+ def apply[T](): UnrolledParArrayCombiner[T] = new UnrolledParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
+}
-trait ParArrayCombiner[T]
+/** An array combiner that uses a chain of arraybuffers to store elements. */
+trait ResizableParArrayCombiner[T]
extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]]
{
self: EnvironmentPassingCombiner[T, ParArray[T]] =>
@@ -24,7 +122,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
- def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParArrayCombiner(c)
+ def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ResizableParArrayCombiner(c)
def allocateAndCopy = if (chain.size > 1) {
val arrayseq = new ArraySeq[T](size)
@@ -38,7 +136,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
pa
}
- override def toString = "ParArrayCombiner(" + size + "): " //+ chain
+ override def toString = "ResizableParArrayCombiner(" + size + "): " //+ chain
/* tasks */
@@ -86,11 +184,11 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
}
-object ParArrayCombiner {
- def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParArrayCombiner[T] = {
- new { val chain = c } with ParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
+object ResizableParArrayCombiner {
+ def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ResizableParArrayCombiner[T] = {
+ new { val chain = c } with ResizableParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
}
- def apply[T](): ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
+ def apply[T](): ResizableParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
}
diff --git a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
index fc63d51b33..82c69b4c53 100644
--- a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
@@ -43,6 +43,7 @@ trait ParFlatHashTable[T] extends collection.mutable.FlatHashTable[T] {
if (hasNext) scan()
r
} else Iterator.empty.next
+ def dup = newIterator(idx, until, totalsize)
def split = if (remaining > 1) {
val divpt = (until + idx) / 2
diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
index efba6c8d9c..b238ce09de 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
@@ -67,6 +67,8 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
}
}
+ def dup = newIterator(idx, until, totalsize, es)
+
def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
if (until > idx) {
// there is at least one more slot for the next iterator
diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala
index 89544c8bdd..1efe79b00d 100644
--- a/src/library/scala/collection/parallel/mutable/package.scala
+++ b/src/library/scala/collection/parallel/mutable/package.scala
@@ -10,6 +10,11 @@ import scala.collection.generic.Sizing
package object mutable {
+ /* aliases */
+
+ type ParArrayCombiner[T] = ResizableParArrayCombiner[T]
+ val ParArrayCombiner = ResizableParArrayCombiner
+
/* classes and traits */
private[mutable] trait SizeMapUtils {
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 19ae9aef5d..6b3f3bf448 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -107,8 +107,10 @@ package object parallel {
}
implicit def throwable2ops(self: Throwable) = new ThrowableOps {
- def alongWith(that: Throwable) = self match {
- case ct: CompositeThrowable => new CompositeThrowable(ct.throwables + that)
+ def alongWith(that: Throwable) = (self, that) match {
+ case (self: CompositeThrowable, that: CompositeThrowable) => new CompositeThrowable(self.throwables ++ that.throwables)
+ case (self: CompositeThrowable, _) => new CompositeThrowable(self.throwables + that)
+ case (_, that: CompositeThrowable) => new CompositeThrowable(that.throwables + self)
case _ => new CompositeThrowable(Set(self, that))
}
}
@@ -117,7 +119,7 @@ package object parallel {
/** Composite throwable - thrown when multiple exceptions are thrown at the same time. */
final class CompositeThrowable(val throwables: Set[Throwable])
- extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", "))
+ extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.map(t => (t, t.getStackTrace.toList)).mkString(", "))
/** A helper iterator for iterating very small array buffers.
@@ -133,6 +135,7 @@ package object parallel {
r
}
def remaining = until - index
+ def dup = new BufferIterator(buffer, index, until, signalDelegate)
def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
val divsz = (until - index) / 2
Seq(