summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/mutable/ParArray.scala
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:20 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:20 +0000
commitf2ecbd04691b1914e2f77c60afc2b296aa6826ae (patch)
tree539b543eb173cfc7b0bbde4ca5f2c5bb187297df /src/library/scala/collection/parallel/mutable/ParArray.scala
parent492b22576f2ad46b300ce8dc31c5b672aaf517e4 (diff)
downloadscala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.gz
scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.bz2
scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.zip
Array combiners implementation changed from arr...
Array combiners implementation changed from array buffers to doubling unrolled buffers to avoid excessive copying. Still evaluating the benefits of this. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArray.scala')
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala112
1 files changed, 89 insertions, 23 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 909b8eb5d7..8f70547a03 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -71,7 +71,7 @@ self =>
class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
extends super.ParIterator {
- me: SignalContextPassingIterator[ParArrayIterator] =>
+ me: SignalContextPassingIterator[ParArrayIterator] =>
def hasNext = i < until
@@ -83,6 +83,8 @@ self =>
def remaining = until - i
+ def dup = new ParArrayIterator(i, until, arr) with SCPI
+
def psplit(sizesIncomplete: Int*): Seq[ParIterator] = {
var traversed = i
val total = sizesIncomplete.reduceLeft(_ + _)
@@ -439,13 +441,25 @@ self =>
override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
cb.sizeHint(remaining)
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
pac.lastbuff.setInternalSize(remaining)
} otherwise {
- copy2builder_quick(cb, arr, until, i)
- i = until
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ Array.copy(arr, i, targetarr, 0, until - i)
+ pac.buff.size = pac.buff.size + until - i
+ pac.buff.lastPtr.size = until - i
+ pac
+ } otherwise {
+ copy2builder_quick(cb, arr, until, i)
+ i = until
+ }
}
cb
}
@@ -495,21 +509,35 @@ self =>
}
override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.ifIs[ParArrayCombiner[T]] { pac =>
+ cb.ifIs[ResizableParArrayCombiner[T]] {
+ pac =>
+ // with res. combiner:
val sz = remaining
pac.sizeHint(sz)
val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
- reverse2combiner_quick(targetarr, arr, i, until)
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
pac.lastbuff.setInternalSize(sz)
pac
- } otherwise super.reverse2combiner(cb)
+ } otherwise {
+ cb.ifIs[UnrolledParArrayCombiner[T]] {
+ pac =>
+ // with unr. combiner:
+ val sz = remaining
+ pac.sizeHint(sz)
+ val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]]
+ reverse2combiner_quick(targetarr, arr, 0, i, until)
+ pac.buff.size = pac.buff.size + sz
+ pac.buff.lastPtr.size = sz
+ pac
+ } otherwise super.reverse2combiner(cb)
+ }
cb
}
- private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
- var j = from
- var k = ntil - from - 1
- while (j < ntil) {
+ private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) {
+ var j = srcfrom
+ var k = targfrom + srcuntil - srcfrom - 1
+ while (j < srcuntil) {
targ(k) = a(j)
j += 1
k -= 1
@@ -553,23 +581,61 @@ self =>
(new ParArray[S](targarrseq)).asInstanceOf[That]
} else super.map(f)(bf)
- override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) {
- // reserve an array
- val targarrseq = new ArraySeq[U](length + 1)
- val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
- targetarr(0) = z
+ override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That =
+ if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) {
+ // reserve an array
+ val targarrseq = new ArraySeq[U](length + 1)
+ val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
+ targetarr(0) = z
- // do a parallel prefix scan
- executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st =>
- executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr)))
- )))
+ // do a parallel prefix scan
+ if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult {
+ tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr))
+ })
- // wrap the array into a parallel array
- (new ParArray[U](targarrseq)).asInstanceOf[That]
- } else super.scan(z)(op)(cbf)
+ // wrap the array into a parallel array
+ (new ParArray[U](targarrseq)).asInstanceOf[That]
+ } else super.scan(z)(op)(cbf)
/* tasks */
+ class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any])
+ extends Task[Unit, ScanToArray[U]] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = iterate(tree)
+ private def iterate(tree: ScanTree[U]): Unit = tree match {
+ case ScanNode(left, right) =>
+ iterate(left)
+ iterate(right)
+ case ScanLeaf(_, _, from, len, Some(prev), _) =>
+ scanLeaf(array, targetarr, from, len, prev.acc)
+ case ScanLeaf(_, _, from, len, None, _) =>
+ scanLeaf(array, targetarr, from, len, z)
+ }
+ private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) {
+ var i = from
+ val until = from + len
+ var curr = startval
+ val operation = op
+ while (i < until) {
+ curr = operation(curr, srcarr(i).asInstanceOf[U])
+ i += 1
+ targetarr(i) = curr
+ }
+ }
+ def split = tree match {
+ case ScanNode(left, right) => Seq(
+ new ScanToArray(left, z, op, targetarr),
+ new ScanToArray(right, z, op, targetarr)
+ )
+ case _ => system.error("Can only split scan tree internal nodes.")
+ }
+ def shouldSplitFurther = tree match {
+ case ScanNode(_, _) => true
+ case _ => false
+ }
+ }
+
class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] {
var result = ();
def leaf(prev: Option[Unit]) = {