diff options
author | Aleksandar Prokopec <aleksandar.prokopec@epfl.ch> | 2010-09-13 16:07:33 +0000 |
---|---|---|
committer | Aleksandar Prokopec <aleksandar.prokopec@epfl.ch> | 2010-09-13 16:07:33 +0000 |
commit | cfb6168dc5eef906b2552c7b40765db1d6720652 (patch) | |
tree | 4ba1ffd338f7dac34a7c08ac29a03e005b2ca34f | |
parent | f7a6c8823b39c622a1f96f9aaafb828427d4edda (diff) | |
download | scala-cfb6168dc5eef906b2552c7b40765db1d6720652.tar.gz scala-cfb6168dc5eef906b2552c7b40765db1d6720652.tar.bz2 scala-cfb6168dc5eef906b2552c7b40765db1d6720652.zip |
Improving parallel scan. No review
5 files changed, 102 insertions, 34 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index baff4de36a..02c3302c3c 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -12,6 +12,8 @@ import scala.collection.Sequentializable import scala.collection.generic._ +import java.util.concurrent.atomic.AtomicBoolean + // TODO update docs!! @@ -544,8 +546,10 @@ extends IterableLike[T, Repr] def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = { val array = new Array[Any](size + 1) array(0) = z - executeAndWaitResult(new ScanToArray[U, Any](z, op, 1, size, array, parallelIterator) mapResult { u => - executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result }) + executeAndWaitResult(new PartialScan[U, Any](z, op, 1, size, array, parallelIterator) mapResult { st => + executeAndWaitResult(new ApplyScanTree[U, Any](None, op, st, array) mapResult { u => + executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result }); + }) }) } @@ -940,34 +944,84 @@ extends IterableLike[T, Repr] } } - protected[this] class ScanToArray[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], val pit: ParIterator) - extends Accessor[Boolean, ScanToArray[U, A]] { - var result: Boolean = false // whether it was prefix-scanned, because previous result was already available - def leaf(prev: Option[Boolean]) = if (prev.isDefined) { // use prev result as an optimisation - val lastelem = array(from - 1) - pit.scanToArray(lastelem.asInstanceOf[U], op, array, from) - result = true - } else pit.scanToArray(z, op, array, from) - def newSubtask(p: ParIterator) = unsupported - override lazy val shouldSplitFurther = { - // we want less work stealings while prefix scanning - // and keep processors busier with merging - // (work stealing while prefix scanning means 
more work later) - val processors = parallelismLevel min availableProcessors - len > (((size + 1) / tweak(processors)) max 1) + protected[this] class ScanTree[U >: T](val from: Int, val len: Int) { + var value: U = _ + var left: ScanTree[U] = null + var right: ScanTree[U] = null + var shouldApply = true + + def isLeaf = (left eq null) && (right eq null) + def applyToInterval[A >: U](elem: U, op: (U, U) => U, array: Array[A]) = { + //executeAndWait(new ApplyToArray(elem, op, from, len, array)) + var i = from + val until = from + len + while (i < until) { + array(i) = op(elem, array(i).asInstanceOf[U]) + i += 1 + } + } + def pushDown(v: U, op: (U, U) => U) { + value = op(v, value) + if (left ne null) left.pushDown(v, op) + if (right ne null) right.pushDown(v, op) } - private def tweak(p: Int) = 2 // if (p < 4) p else p / 2 + def pushDownOnRight(v: U, op: (U, U) => U) = if (right ne null) right.pushDown(v, op) + def printTree: Unit = printTree(0) + private def printTree(t: Int): Unit = { + for (i <- 0 until t) print(" ") + if (isLeaf) print("(l) ") + println(value + ": from " + from + " until " + (from + len)) + if (left ne null) left.printTree(t + 1) + if (right ne null) right.printTree(t + 1) + } + } + + protected[this] class PartialScan[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], val pit: ParIterator) + extends Accessor[ScanTree[U], PartialScan[U, A]] { + var result: ScanTree[U] = null + def leaf(prev: Option[ScanTree[U]]) = { + pit.scanToArray(z, op, array, from) + result = new ScanTree[U](from, len) + result.value = array(from + len - 1).asInstanceOf[U] + if (from == 1) result.shouldApply = false + } + def newSubtask(p: ParIterator) = unsupported override def split = { val pits = pit.split for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield { val plen = p.remaining min (len - untilp) - new ScanToArray[U, A](z, op, from + untilp, plen, array, p) + new PartialScan[U, A](z, op, from + 
untilp, plen, array, p) } } - override def merge(that: ScanToArray[U, A]) = if (!that.result) { // if previous result wasn't available when task was initiated - // apply the rightmost element of this array part to all the elements of `that` - executeAndWait(new ApplyToArray(array(that.from - 1).asInstanceOf[U], op, that.from, that.len, array)) + override def merge(that: PartialScan[U, A]) = { + val left = result + val right = that.result + val ns = new ScanTree[U](left.from, left.len + right.len) + ns.left = left + ns.right = right + ns.value = op(left.value, right.value) + ns.pushDownOnRight(left.value, op) + + result = ns + } + } + + protected[this] class ApplyScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], array: Array[A]) + extends super.Task[Unit, ApplyScanTree[U, A]] { + var result = (); + def leaf(prev: Option[Unit]) = if (st.shouldApply) apply(st, first.get) + private def apply(st: ScanTree[U], elem: U) { + if (st.isLeaf) st.applyToInterval(elem, op, array) + else { + apply(st.left, elem) + apply(st.right, st.left.value) + } } + def split = collection.mutable.ArrayBuffer( + new ApplyScanTree(first, op, st.left, array), + new ApplyScanTree(Some(st.left.value), op, st.right, array) + ) + def shouldSplitFurther = (st.left ne null) && (st.right ne null) } protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A]) @@ -981,7 +1035,7 @@ extends IterableLike[T, Repr] i += 1 } } - def shouldSplitFurther = len > (parallelismLevel min availableProcessors) // threshold(size, parallelismLevel) + def shouldSplitFurther = len > threshold(size, parallelismLevel min availableProcessors) def split = { val fp = len / 2 val sp = len - fp diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index bbd894f89b..cbc85e43de 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ 
b/src/library/scala/collection/parallel/Tasks.scala @@ -115,7 +115,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { do { val subtasks = head.split head = subtasks.head - for (t <- subtasks.tail) { + for (t <- subtasks.tail.reverse) { t.next = last last = t t.start diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 0d0b045912..aa0eb03365 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -535,25 +535,33 @@ extends ParSeq[T] override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[ParArray[T], S, That]) = if (buildsArray(bf(repr))) { // reserve an array - val targetarr = new Array[Any](length) + val targarrseq = new ArraySeq[S](length) + val targetarr = targarrseq.array.asInstanceOf[Array[Any]] // fill it in parallel executeAndWait(new Map[S](f, targetarr, 0, length)) // wrap it into a parallel array - (new ParArray[S](new ExposedArraySeq[S](targetarr.asInstanceOf[Array[AnyRef]], length))).asInstanceOf[That] + (new ParArray[S](targarrseq)).asInstanceOf[That] } else super.map(f)(bf) override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) { // reserve an array - val targetarr = new Array[Any](length + 1) + val targarrseq = new ArraySeq[U](length + 1) + val targetarr = targarrseq.array.asInstanceOf[Array[Any]] targetarr(0) = z // do a parallel prefix scan - executeAndWait(new ScanToArray[U, Any](z, op, 1, size, targetarr, parallelIterator)) + executeAndWait(new PartialScan[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st => + //println("-----------------------") + //println(targetarr.toList) + //st.printTree + executeAndWaitResult(new ApplyScanTree[U, Any](None, op, st, targetarr)) + }) + //println(targetarr.toList) // wrap the array into a parallel array - (new 
ParArray[U](new ExposedArraySeq[U](targetarr.asInstanceOf[Array[AnyRef]], length + 1))).asInstanceOf[That] + (new ParArray[U](targarrseq)).asInstanceOf[That] } else super.scan(z)(op)(cbf) /* tasks */ diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala index f670c7b7c5..0590539a29 100644 --- a/src/library/scala/collection/parallel/mutable/package.scala +++ b/src/library/scala/collection/parallel/mutable/package.scala @@ -29,4 +29,4 @@ package object mutable { override val length = sz } -}
\ No newline at end of file +} diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala index b18b98a208..02a52ac207 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala @@ -8,10 +8,13 @@ import scala.collection.parallel.mutable.ParArray object ScanLight extends Companion { def benchName = "scan-light"; def apply(sz: Int, parallelism: Int, what: String) = new ScanLight(sz, parallelism, what) - override def comparisons = List() + override def comparisons = List("jsr") override def defaultSize = 40000 val op = (a: Cont, b: Cont) => { + operation(a, b) + } + def operation(a: Cont, b: Cont) = { val m = if (a.in < 0) 1 else 0 new Cont(a.in + b.in + m * (0 until 10).reduceLeft(_ + _)) } @@ -21,12 +24,15 @@ object ScanLight extends Companion { class ScanLight(sz: Int, p: Int, what: String) extends Resettable[Cont](sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { def companion = ScanLight - override def repetitionsPerRun = 10 - override val runs = 10 + override def repetitionsPerRun = 50 + override val runs = 12 def runpar = pa.scan(new Cont(0))(ScanLight.op) def runseq = sequentialScan(new Cont(0), ScanLight.op, sz) - override def comparisonMap = collection.Map() + def runjsr = jsrarr.cumulate(new extra166y.Ops.Reducer[Cont] { + def op(a: Cont, b: Cont) = ScanLight.operation(a, b) + }, new Cont(0)) + override def comparisonMap = collection.Map("jsr" -> runjsr _) } |