summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/mutable/ParArray.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArray.scala')
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala112
1 files changed, 89 insertions, 23 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 909b8eb5d7..8f70547a03 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -71,7 +71,7 @@ self =>
class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
extends super.ParIterator {
- me: SignalContextPassingIterator[ParArrayIterator] =>
+ me: SignalContextPassingIterator[ParArrayIterator] =>
def hasNext = i < until
@@ -83,6 +83,8 @@ self =>
def remaining = until - i
+ def dup = new ParArrayIterator(i, until, arr) with SCPI
+
def psplit(sizesIncomplete: Int*): Seq[ParIterator] = {
var traversed = i
val total = sizesIncomplete.reduceLeft(_ + _)
@@ -439,13 +441,25 @@ self =>
override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
cb.sizeHint(remaining)
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
pac.lastbuff.setInternalSize(remaining)
} otherwise {
- copy2builder_quick(cb, arr, until, i)
- i = until
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ Array.copy(arr, i, targetarr, 0, until - i)
+ pac.buff.size = pac.buff.size + until - i
+ pac.buff.lastPtr.size = until - i
+ pac
+ } otherwise {
+ copy2builder_quick(cb, arr, until, i)
+ i = until
+ }
}
cb
}
@@ -495,21 +509,35 @@ self =>
}
override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val sz = remaining
pac.sizeHint(sz)
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
- reverse2combiner_quick(targetarr, arr, i, until)
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
pac.lastbuff.setInternalSize(sz)
pac
- } otherwise super.reverse2combiner(cb)
+ } otherwise {
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val sz = remaining
+ pac.sizeHint(sz)
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
+ pac.buff.size = pac.buff.size + sz
+ pac.buff.lastPtr.size = sz
+ pac
+ } otherwise super.reverse2combiner(cb)
+ }
cb
}
- private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
- var j = from
- var k = ntil - from - 1
- while (j < ntil) {
+ private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) {
+ var j = srcfrom
+ var k = targfrom + srcuntil - srcfrom - 1
+ while (j < srcuntil) {
targ(k) = a(j)
j += 1
k -= 1
@@ -553,23 +581,61 @@ self =>
(new ParArray[S](targarrseq)).asInstanceOf[That]
} else super.map(f)(bf)
- override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) {
- // reserve an array
- val targarrseq = new ArraySeq[U](length + 1)
- val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
- targetarr(0) = z
+ override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That =
+ if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) {
+ // reserve an array
+ val targarrseq = new ArraySeq[U](length + 1)
+ val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
+ targetarr(0) = z
- // do a parallel prefix scan
- executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st =>
- executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr)))
- )))
+ // do a parallel prefix scan
+ if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult {
+ tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr))
+ })
- // wrap the array into a parallel array
- (new ParArray[U](targarrseq)).asInstanceOf[That]
- } else super.scan(z)(op)(cbf)
+ // wrap the array into a parallel array
+ (new ParArray[U](targarrseq)).asInstanceOf[That]
+ } else super.scan(z)(op)(cbf)
/* tasks */
+ class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any])
+ extends Task[Unit, ScanToArray[U]] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = iterate(tree)
+ private def iterate(tree: ScanTree[U]): Unit = tree match {
+ case ScanNode(left, right) =>
+ iterate(left)
+ iterate(right)
+ case ScanLeaf(_, _, from, len, Some(prev), _) =>
+ scanLeaf(array, targetarr, from, len, prev.acc)
+ case ScanLeaf(_, _, from, len, None, _) =>
+ scanLeaf(array, targetarr, from, len, z)
+ }
+ private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) {
+ var i = from
+ val until = from + len
+ var curr = startval
+ val operation = op
+ while (i < until) {
+ curr = operation(curr, srcarr(i).asInstanceOf[U])
+ i += 1
+ targetarr(i) = curr
+ }
+ }
+ def split = tree match {
+ case ScanNode(left, right) => Seq(
+ new ScanToArray(left, z, op, targetarr),
+ new ScanToArray(right, z, op, targetarr)
+ )
+ case _ => system.error("Can only split scan tree internal nodes.")
+ }
+ def shouldSplitFurther = tree match {
+ case ScanNode(_, _) => true
+ case _ => false
+ }
+ }
+
class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] {
var result = ();
def leaf(prev: Option[Unit]) = {