summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala100
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala18
-rw-r--r--src/library/scala/collection/parallel/mutable/package.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala14
5 files changed, 102 insertions, 34 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index baff4de36a..02c3302c3c 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -12,6 +12,8 @@ import scala.collection.Sequentializable
import scala.collection.generic._
+import java.util.concurrent.atomic.AtomicBoolean
+
// TODO update docs!!
@@ -544,8 +546,10 @@ extends IterableLike[T, Repr]
def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[Repr, U, That]): That = {
val array = new Array[Any](size + 1)
array(0) = z
- executeAndWaitResult(new ScanToArray[U, Any](z, op, 1, size, array, parallelIterator) mapResult { u =>
- executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result })
+ executeAndWaitResult(new PartialScan[U, Any](z, op, 1, size, array, parallelIterator) mapResult { st =>
+ executeAndWaitResult(new ApplyScanTree[U, Any](None, op, st, array) mapResult { u =>
+ executeAndWaitResult(new FromArray(array, 0, size + 1, cbf) mapResult { _.result });
+ })
})
}
@@ -940,34 +944,84 @@ extends IterableLike[T, Repr]
}
}
- protected[this] class ScanToArray[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], val pit: ParIterator)
- extends Accessor[Boolean, ScanToArray[U, A]] {
- var result: Boolean = false // whether it was prefix-scanned, because previous result was already available
- def leaf(prev: Option[Boolean]) = if (prev.isDefined) { // use prev result as an optimisation
- val lastelem = array(from - 1)
- pit.scanToArray(lastelem.asInstanceOf[U], op, array, from)
- result = true
- } else pit.scanToArray(z, op, array, from)
- def newSubtask(p: ParIterator) = unsupported
- override lazy val shouldSplitFurther = {
- // we want less work stealings while prefix scanning
- // and keep processors busier with merging
- // (work stealing while prefix scanning means more work later)
- val processors = parallelismLevel min availableProcessors
- len > (((size + 1) / tweak(processors)) max 1)
+ protected[this] class ScanTree[U >: T](val from: Int, val len: Int) {
+ var value: U = _
+ var left: ScanTree[U] = null
+ var right: ScanTree[U] = null
+ var shouldApply = true
+
+ def isLeaf = (left eq null) && (right eq null)
+ def applyToInterval[A >: U](elem: U, op: (U, U) => U, array: Array[A]) = {
+ //executeAndWait(new ApplyToArray(elem, op, from, len, array))
+ var i = from
+ val until = from + len
+ while (i < until) {
+ array(i) = op(elem, array(i).asInstanceOf[U])
+ i += 1
+ }
+ }
+ def pushDown(v: U, op: (U, U) => U) {
+ value = op(v, value)
+ if (left ne null) left.pushDown(v, op)
+ if (right ne null) right.pushDown(v, op)
}
- private def tweak(p: Int) = 2 // if (p < 4) p else p / 2
+ def pushDownOnRight(v: U, op: (U, U) => U) = if (right ne null) right.pushDown(v, op)
+ def printTree: Unit = printTree(0)
+ private def printTree(t: Int): Unit = {
+ for (i <- 0 until t) print(" ")
+ if (isLeaf) print("(l) ")
+ println(value + ": from " + from + " until " + (from + len))
+ if (left ne null) left.printTree(t + 1)
+ if (right ne null) right.printTree(t + 1)
+ }
+ }
+
+ protected[this] class PartialScan[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], val pit: ParIterator)
+ extends Accessor[ScanTree[U], PartialScan[U, A]] {
+ var result: ScanTree[U] = null
+ def leaf(prev: Option[ScanTree[U]]) = {
+ pit.scanToArray(z, op, array, from)
+ result = new ScanTree[U](from, len)
+ result.value = array(from + len - 1).asInstanceOf[U]
+ if (from == 1) result.shouldApply = false
+ }
+ def newSubtask(p: ParIterator) = unsupported
override def split = {
val pits = pit.split
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield {
val plen = p.remaining min (len - untilp)
- new ScanToArray[U, A](z, op, from + untilp, plen, array, p)
+ new PartialScan[U, A](z, op, from + untilp, plen, array, p)
}
}
- override def merge(that: ScanToArray[U, A]) = if (!that.result) { // if previous result wasn't available when task was initiated
- // apply the rightmost element of this array part to all the elements of `that`
- executeAndWait(new ApplyToArray(array(that.from - 1).asInstanceOf[U], op, that.from, that.len, array))
+ override def merge(that: PartialScan[U, A]) = {
+ val left = result
+ val right = that.result
+ val ns = new ScanTree[U](left.from, left.len + right.len)
+ ns.left = left
+ ns.right = right
+ ns.value = op(left.value, right.value)
+ ns.pushDownOnRight(left.value, op)
+
+ result = ns
+ }
+ }
+
+ protected[this] class ApplyScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], array: Array[A])
+ extends super.Task[Unit, ApplyScanTree[U, A]] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = if (st.shouldApply) apply(st, first.get)
+ private def apply(st: ScanTree[U], elem: U) {
+ if (st.isLeaf) st.applyToInterval(elem, op, array)
+ else {
+ apply(st.left, elem)
+ apply(st.right, st.left.value)
+ }
}
+ def split = collection.mutable.ArrayBuffer(
+ new ApplyScanTree(first, op, st.left, array),
+ new ApplyScanTree(Some(st.left.value), op, st.right, array)
+ )
+ def shouldSplitFurther = (st.left ne null) && (st.right ne null)
}
protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A])
@@ -981,7 +1035,7 @@ extends IterableLike[T, Repr]
i += 1
}
}
- def shouldSplitFurther = len > (parallelismLevel min availableProcessors) // threshold(size, parallelismLevel)
+ def shouldSplitFurther = len > threshold(size, parallelismLevel min availableProcessors)
def split = {
val fp = len / 2
val sp = len - fp
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index bbd894f89b..cbc85e43de 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -115,7 +115,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
do {
val subtasks = head.split
head = subtasks.head
- for (t <- subtasks.tail) {
+ for (t <- subtasks.tail.reverse) {
t.next = last
last = t
t.start
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 0d0b045912..aa0eb03365 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -535,25 +535,33 @@ extends ParSeq[T]
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[ParArray[T], S, That]) = if (buildsArray(bf(repr))) {
// reserve an array
- val targetarr = new Array[Any](length)
+ val targarrseq = new ArraySeq[S](length)
+ val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
// fill it in parallel
executeAndWait(new Map[S](f, targetarr, 0, length))
// wrap it into a parallel array
- (new ParArray[S](new ExposedArraySeq[S](targetarr.asInstanceOf[Array[AnyRef]], length))).asInstanceOf[That]
+ (new ParArray[S](targarrseq)).asInstanceOf[That]
} else super.map(f)(bf)
override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanCombineFrom[ParArray[T], U, That]): That = if (buildsArray(cbf(repr))) {
// reserve an array
- val targetarr = new Array[Any](length + 1)
+ val targarrseq = new ArraySeq[U](length + 1)
+ val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
targetarr(0) = z
// do a parallel prefix scan
- executeAndWait(new ScanToArray[U, Any](z, op, 1, size, targetarr, parallelIterator))
+ executeAndWait(new PartialScan[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st =>
+ //println("-----------------------")
+ //println(targetarr.toList)
+ //st.printTree
+ executeAndWaitResult(new ApplyScanTree[U, Any](None, op, st, targetarr))
+ })
+ //println(targetarr.toList)
// wrap the array into a parallel array
- (new ParArray[U](new ExposedArraySeq[U](targetarr.asInstanceOf[Array[AnyRef]], length + 1))).asInstanceOf[That]
+ (new ParArray[U](targarrseq)).asInstanceOf[That]
} else super.scan(z)(op)(cbf)
/* tasks */
diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala
index f670c7b7c5..0590539a29 100644
--- a/src/library/scala/collection/parallel/mutable/package.scala
+++ b/src/library/scala/collection/parallel/mutable/package.scala
@@ -29,4 +29,4 @@ package object mutable {
override val length = sz
}
-} \ No newline at end of file
+}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala
index b18b98a208..02a52ac207 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ScanLight.scala
@@ -8,10 +8,13 @@ import scala.collection.parallel.mutable.ParArray
object ScanLight extends Companion {
def benchName = "scan-light";
def apply(sz: Int, parallelism: Int, what: String) = new ScanLight(sz, parallelism, what)
- override def comparisons = List()
+ override def comparisons = List("jsr")
override def defaultSize = 40000
val op = (a: Cont, b: Cont) => {
+ operation(a, b)
+ }
+ def operation(a: Cont, b: Cont) = {
val m = if (a.in < 0) 1 else 0
new Cont(a.in + b.in + m * (0 until 10).reduceLeft(_ + _))
}
@@ -21,12 +24,15 @@ object ScanLight extends Companion {
class ScanLight(sz: Int, p: Int, what: String)
extends Resettable[Cont](sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) {
def companion = ScanLight
- override def repetitionsPerRun = 10
- override val runs = 10
+ override def repetitionsPerRun = 50
+ override val runs = 12
def runpar = pa.scan(new Cont(0))(ScanLight.op)
def runseq = sequentialScan(new Cont(0), ScanLight.op, sz)
- override def comparisonMap = collection.Map()
+ def runjsr = jsrarr.cumulate(new extra166y.Ops.Reducer[Cont] {
+ def op(a: Cont, b: Cont) = ScanLight.operation(a, b)
+ }, new Cont(0))
+ override def comparisonMap = collection.Map("jsr" -> runjsr _)
}