diff options
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArray.scala')
-rw-r--r-- | src/library/scala/collection/parallel/mutable/ParArray.scala | 112 |
1 files changed, 89 insertions, 23 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 909b8eb5d7..8f70547a03 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -71,7 +71,7 @@ self => class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array) extends super.ParIterator { - me: SignalContextPassingIterator[ParArrayIterator] => + me: SignalContextPassingIterator[ParArrayIterator] => def hasNext = i < until @@ -83,6 +83,8 @@ self => def remaining = until - i + def dup = new ParArrayIterator(i, until, arr) with SCPI + def psplit(sizesIncomplete: Int*): Seq[ParIterator] = { var traversed = i val total = sizesIncomplete.reduceLeft(_ + _) @@ -439,13 +441,25 @@ self => override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = { cb.sizeHint(remaining) - cb.ifIs[ParArrayCombiner[T]] { pac => + cb.ifIs[ResizableParArrayCombiner[T]] { + pac => + // with res. combiner: val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i) pac.lastbuff.setInternalSize(remaining) } otherwise { - copy2builder_quick(cb, arr, until, i) - i = until + cb.ifIs[UnrolledParArrayCombiner[T]] { + pac => + // with unr. combiner: + val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]] + Array.copy(arr, i, targetarr, 0, until - i) + pac.buff.size = pac.buff.size + until - i + pac.buff.lastPtr.size = until - i + pac + } otherwise { + copy2builder_quick(cb, arr, until, i) + i = until + } } cb } @@ -495,21 +509,35 @@ self => } override def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = { - cb.ifIs[ParArrayCombiner[T]] { pac => + cb.ifIs[ResizableParArrayCombiner[T]] { + pac => + // with res. combiner: val sz = remaining pac.sizeHint(sz) val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] - reverse2combiner_quick(targetarr, arr, i, until) + reverse2combiner_quick(targetarr, arr, 0, i, until) pac.lastbuff.setInternalSize(sz) pac - } otherwise super.reverse2combiner(cb) + } otherwise { + cb.ifIs[UnrolledParArrayCombiner[T]] { + pac => + // with unr. combiner: + val sz = remaining + pac.sizeHint(sz) + val targetarr: Array[Any] = pac.buff.lastPtr.array.asInstanceOf[Array[Any]] + reverse2combiner_quick(targetarr, arr, 0, i, until) + pac.buff.size = pac.buff.size + sz + pac.buff.lastPtr.size = sz + pac + } otherwise super.reverse2combiner(cb) + } cb } - private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) { - var j = from - var k = ntil - from - 1 - while (j < ntil) { + private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], targfrom: Int, srcfrom: Int, srcuntil: Int) { + var j = srcfrom + var k = targfrom + srcuntil - srcfrom - 1 + while (j < srcuntil) { targ(k) = a(j) j += 1 k -= 1 @@ -553,23 +581,61 @@ self => (new ParArray[S](targarrseq)).asInstanceOf[That] } else super.map(f)(bf) - override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) { - // reserve an array - val targarrseq = new ArraySeq[U](length + 1) - val targetarr = targarrseq.array.asInstanceOf[Array[Any]] - targetarr(0) = z + override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = + if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) { + // reserve an array + val targarrseq = new ArraySeq[U](length + 1) + val targetarr = targarrseq.array.asInstanceOf[Array[Any]] + targetarr(0) = z - // do a parallel prefix scan - executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st => - executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr))) - ))) + // do a parallel prefix scan + if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, parallelIterator) mapResult { + tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr)) + }) - // wrap the array into a parallel array - (new ParArray[U](targarrseq)).asInstanceOf[That] - } else super.scan(z)(op)(cbf) + // wrap the array into a parallel array + (new ParArray[U](targarrseq)).asInstanceOf[That] + } else super.scan(z)(op)(cbf) /* tasks */ + class ScanToArray[U >: T](tree: ScanTree[U], z: U, op: (U, U) => U, targetarr: Array[Any]) + extends Task[Unit, ScanToArray[U]] { + var result = (); + def leaf(prev: Option[Unit]) = iterate(tree) + private def iterate(tree: ScanTree[U]): Unit = tree match { + case ScanNode(left, right) => + iterate(left) + iterate(right) + case ScanLeaf(_, _, from, len, Some(prev), _) => + scanLeaf(array, targetarr, from, len, prev.acc) + case ScanLeaf(_, _, from, len, None, _) => + scanLeaf(array, targetarr, from, len, z) + } + private def scanLeaf(srcarr: Array[Any], targetarr: Array[Any], from: Int, len: Int, startval: U) { + var i = from + val until = from + len + var curr = startval + val operation = op + while (i < until) { + curr = operation(curr, srcarr(i).asInstanceOf[U]) + i += 1 + targetarr(i) = curr + } + } + def split = tree match { + case ScanNode(left, right) => Seq( + new ScanToArray(left, z, op, targetarr), + new ScanToArray(right, z, op, targetarr) + ) + case _ => system.error("Can only split scan tree internal nodes.") + } + def shouldSplitFurther = tree match { + case ScanNode(_, _) => true + case _ => false + } + } + class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] { var result = (); def leaf(prev: Option[Unit]) = { |