From c6cc8c72820a5d540b9bfa4a8dc52eb0923936f1 Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Mon, 26 Jul 2010 16:31:47 +0000 Subject: Refactored benchmarks. --- .../collection/parallel/ParIterableLike.scala | 95 +++++++++++++++++++++- .../collection/parallel/RemainsIterator.scala | 11 +++ .../collection/parallel/mutable/ParArray.scala | 42 +++++++++- 3 files changed, 146 insertions(+), 2 deletions(-) (limited to 'src/library') diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 82e1b60e25..0769181150 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -518,6 +518,29 @@ extends IterableLike[T, Repr] executeAndWaitResult(new SplitAt(n, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) } + /** Computes a prefix scan of the elements of the collection. + * + * Note: The neutral element `z` may be applied more than once. + * + * @tparam U element type of the resulting collection + * @tparam That type of the resulting collection + * @param z neutral element for the operator `op` + * @param op the associative operator for the scan + * @param cbf combiner factory which provides a combiner + * @return a collection containing the prefix scan of the elements in the original collection + * + * @usecase def scan(z: T)(op: (T, T) => T): $Coll[T] + * + * @return a new $coll containing the prefix scan of the elements in this $coll + */ + def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = { + val array = new Array[Any](size + 1) + array(0) = z + executeAndWaitResult(new ScanToArray[U, Any](z, op, 1, size, array, parallelIterator) mapResult { u => + executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result }) + }) + } + /** Takes the longest prefix of elements that satisfy the predicate. * * $indexsignalling @@ -899,7 +922,7 @@ extends IterableLike[T, Repr] extends Accessor[Unit, CopyToArray[U, This]] { var result: Unit = () def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len) - def newSubtask(p: ParIterator) = throw new UnsupportedOperationException + def newSubtask(p: ParIterator) = unsupported override def split = { val pits = pit.split for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield { @@ -909,6 +932,76 @@ extends IterableLike[T, Repr] } } + protected[this] class ScanToArray[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], val pit: ParIterator) + extends Accessor[Boolean, ScanToArray[U, A]] { + var result: Boolean = false // whether it was prefix-scanned, because previous result was already available + def leaf(prev: Option[Boolean]) = if (prev.isDefined) { // use prev result as an optimisation + val lastelem = array(from - 1) + pit.scanToArray(lastelem.asInstanceOf[U], op, array, from) + result = true + } else pit.scanToArray(z, op, array, from) + def newSubtask(p: ParIterator) = unsupported + override def shouldSplitFurther = len > size / 2 + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield { + val plen = p.remaining min (len - untilp) + new ScanToArray[U, A](z, op, from + untilp, plen, array, p) + } + } + override def merge(that: ScanToArray[U, A]) = if (!that.result) { // if previous result wasn't available when task was initiated + // apply the rightmost element of this array part to all the elements of `that` + executeAndWait(new ApplyToArray(array(that.from - 1).asInstanceOf[U], op, that.from, that.len, array)) + } + } + + protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A]) + extends super.Task[Unit, ApplyToArray[U, A]] { + var result: Unit = () + def leaf(prev: Option[Unit]) = { + var i = from + val until = from + len + while (i < until) { + array(i) = op(elem, array(i).asInstanceOf[U]) + i += 1 + } + } + def shouldSplitFurther = len > threshold(size, parallelismLevel) + def split = { + val fp = len / 2 + val sp = len - fp + Seq( + new ApplyToArray(elem, op, from, fp, array), + new ApplyToArray(elem, op, from + fp, sp, array) + ) + } + } + + protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That]) + extends super.Task[Combiner[S, That], FromArray[S, A, That]] { + var result: Result = null + def leaf(prev: Option[Result]) = { + val cb = prev getOrElse cbf(self.repr) + var i = from + val until = from + len + while (i < until) { + cb += array(i).asInstanceOf[S] + i += 1 + } + result = cb + } + def shouldSplitFurther = len > threshold(size, parallelismLevel) + def split = { + val fp = len / 2 + val sp = len - fp + Seq( + new FromArray(array, from, fp, cbf), + new FromArray(array, from + fp, sp, cbf) + ) + } + override def merge(that: FromArray[S, A, That]) = result = result combine that.result + } + } diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index bf8ae4a834..18878a3bba 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -209,6 +209,17 @@ trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T while (hasNext) after += next (before, after) } + + def scanToArray[U >: T, A >: U](z: U, op: (U, U) => U, array: Array[A], from: Int) { + var last = z + var i = from + while (hasNext) { + last = op(last, next) + array(i) = last + i += 1 + } + } + } diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 2443888465..da99db860b 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -510,6 +510,34 @@ extends ParSeq[T] } } + override def scanToArray[U >: T, A >: U](z: U, op: (U, U) => U, destarr: Array[A], from: Int) { + // var last = z + // var j = from + // var k = i + // val ntil = until + // val a = arr + // while (k < ntil) { + // last = op(last, a(k).asInstanceOf[U]) + // destarr(j) = last + // k += 1 + // } + // i = k + scanToArray_quick[U](array, destarr.asInstanceOf[Array[Any]], op, z, i, until, from) + i = until + } + + protected def scanToArray_quick[U](srcarr: Array[Any], destarr: Array[Any], op: (U, U) => U, z: U, srcfrom: Int, srcntil: Int, destfrom: Int) { + var last = z + var j = srcfrom + var k = destfrom + while (j < srcntil) { + last = op(last, srcarr(j).asInstanceOf[U]) + destarr(k) = last + j += 1 + k += 1 + } + } + } /* operations */ @@ -517,7 +545,7 @@ extends ParSeq[T] private def buildsArray[S, That](c: Builder[S, That]) = c.isInstanceOf[ParArrayCombiner[_]] override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[ParArray[T], S, That]) = if (buildsArray(bf(repr))) { - // reserve array + // reserve an array val targetarr = new Array[Any](length) // fill it in parallel @@ -527,6 +555,18 @@ extends ParSeq[T] (new ParArray[S](new ExposedArraySeq[S](targetarr.asInstanceOf[Array[AnyRef]], length))).asInstanceOf[That] } else super.map(f)(bf) + override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) { + // reserve an array + val targetarr = new Array[Any](length + 1) + targetarr(0) = z + + // do a parallel prefix scan + executeAndWait(new ScanToArray[U, Any](z, op, 1, size, targetarr, parallelIterator)) + + // wrap the array into a parallel array + (new ParArray[U](new ExposedArraySeq[U](targetarr.asInstanceOf[Array[AnyRef]], length + 1))).asInstanceOf[That] + } else super.scan(z)(op)(cbf) + /* tasks */ class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, Map[S]] { -- cgit v1.2.3