diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-12-09 10:08:20 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-12-09 10:08:20 +0000 |
commit | f2ecbd04691b1914e2f77c60afc2b296aa6826ae (patch) | |
tree | 539b543eb173cfc7b0bbde4ca5f2c5bb187297df /src/library/scala/collection/parallel/mutable/ParArray.scala | |
parent | 492b22576f2ad46b300ce8dc31c5b672aaf517e4 (diff) | |
download | scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.gz scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.bz2 scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.zip |
Array combiners implementation changed from arr...
Array combiners implementation changed from array buffers to doubling
unrolled buffers to avoid excessive copying. Still evaluating the
benefits of this.
No review.
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArray.scala')
-rw-r--r-- | src/library/scala/collection/parallel/mutable/ParArray.scala | 112 |
1 files changed, 89 insertions, 23 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 909b8eb5d7..8f70547a03 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -71,7 +71,7 @@ self => class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array) extends super.ParIterator { - me: SignalContextPassingIterator[ParArrayIterator] => + me: SignalContextPassingIterator[ParArrayIterator] => def hasNext = i < until @@ -83,6 +83,8 @@ self => def remaining = until - i + def dup = new ParArrayIterator(i, until, arr) with SCPI + def psplit(sizesIncomplete: Int*): Seq[ParIterator] = { var traversed = i val total = sizesIncomplete.reduceLeft(_ + _) @@ -439,13 +441,25 @@ self => override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = { cb.sizeHint(remaining) - cb.ifIs[ParArrayCombiner[T]] { pac => + cb.ifIs[ResizableParArrayCombiner[T]] { + pac => + // with res. combiner: val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i) pac.lastbuff.setInternalSize(remaining) } otherwise { - copy2builder_quick(cb, arr, until, i) - i = until + cb.ifIs[UnrolledParArrayCombiner[T]] { + pac => + // with unr. combiner: + val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]] + Array.copy(arr, i, targetarr, 0, until - i) + pac.buff.size = pac.buff.size + until - i + pac.buff.lastPtr.size = until - i + pac + } otherwise { + copy2builder_quick(cb, arr, until, i) + i = until + } } cb } @@ -495,21 +509,35 @@ self => } override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = { - cb.ifIs[ParArrayCombiner[T]] { pac => + cb.ifIs[ResizableParArrayCombiner[T]] { + pac => + // with res. combiner: val sz = remaining pac.sizeHint(sz) val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] - reverse2combiner_quick(targetarr, arr, i, until) + reverse2combiner_quick(targetarr, arr, 0, i, until) pac.lastbuff.setInternalSize(sz) pac - } otherwise super.reverse2combiner(cb) + } otherwise { + cb.ifIs[UnrolledParArrayCombiner[T]] { + pac => + // with unr. combiner: + val sz = remaining + pac.sizeHint(sz) + val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]] + reverse2combiner_quick(targetarr, arr, 0, i, until) + pac.buff.size = pac.buff.size + sz + pac.buff.lastPtr.size = sz + pac + } otherwise super.reverse2combiner(cb) + } cb } - private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) { - var j = from - var k = ntil - from - 1 - while (j < ntil) { + private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) { + var j = srcfrom + var k = targfrom + srcuntil - srcfrom - 1 + while (j < srcuntil) { targ(k) = a(j) j += 1 k -= 1 @@ -553,23 +581,61 @@ self => (new ParArray[S](targarrseq)).asInstanceOf[That] } else super.map(f)(bf) - override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) { - // reserve an array - val targarrseq = new ArraySeq[U](length + 1) - val targetarr = targarrseq.array.asInstanceOf[Array[Any]] - targetarr(0) = z + override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = + if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) { + // reserve an array + val targarrseq = new ArraySeq[U](length + 1) + val targetarr = targarrseq.array.asInstanceOf[Array[Any]] + targetarr(0) = z - // do a parallel prefix scan - executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st => - executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr))) - ))) + // do a parallel prefix scan + if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult { + tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr)) + }) - // wrap the array into a parallel array - (new ParArray[U](targarrseq)).asInstanceOf[That] - } else super.scan(z)(op)(cbf) + // wrap the array into a parallel array + (new ParArray[U](targarrseq)).asInstanceOf[That] + } else super.scan(z)(op)(cbf) /* tasks */ + class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any]) + extends Task[Unit, ScanToArray[U]] { + var result = (); + def leaf(prev: Option[Unit]) = iterate(tree) + private def iterate(tree: ScanTree[U]): Unit = tree match { + case ScanNode(left, right) => + iterate(left) + iterate(right) + case ScanLeaf(_, _, from, len, Some(prev), _) => + scanLeaf(array, targetarr, from, len, prev.acc) + case ScanLeaf(_, _, from, len, None, _) => + scanLeaf(array, targetarr, from, len, z) + } + private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) { + var i = from + val until = from + len + var curr = startval + val operation = op + while (i < until) { + curr = operation(curr, srcarr(i).asInstanceOf[U]) + i += 1 + targetarr(i) = curr + } + } + def split = tree match { + case ScanNode(left, right) => Seq( + new ScanToArray(left, z, op, targetarr), + new ScanToArray(right, z, op, targetarr) + ) + case _ => system.error("Can only split scan tree internal nodes.") + } + def shouldSplitFurther = tree match { + case ScanNode(_, _) => true + case _ => false + } + } + class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] { var result = (); def leaf(prev: Option[Unit]) = { |