summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:20 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:20 +0000
commitf2ecbd04691b1914e2f77c60afc2b296aa6826ae (patch)
tree539b543eb173cfc7b0bbde4ca5f2c5bb187297df /src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
parent492b22576f2ad46b300ce8dc31c5b672aaf517e4 (diff)
downloadscala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.gz
scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.tar.bz2
scala-f2ecbd04691b1914e2f77c60afc2b296aa6826ae.zip
Array combiners implementation changed from arr...
Array combiners implementation changed from array buffers to doubling unrolled buffers to avoid excessive copying. Still evaluating the benefits of this. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala')
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala112
1 files changed, 105 insertions, 7 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index 760f8b09ce..339f827aef 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -9,14 +9,112 @@ import scala.collection.mutable.ArraySeq
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.TaskSupport
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.unsupportedop
+import scala.collection.parallel.UnrolledBuffer
+import scala.collection.parallel.UnrolledBuffer.Unrolled
+import scala.collection.parallel.Combiner
+private[mutable] class DoublingUnrolledBuffer[T](implicit m: ClassManifest[T]) extends UnrolledBuffer[T]()(m) {
+ override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz
+ protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this)
+}
+
+
+
+/** An array combiner that uses doubling unrolled buffers to store elements. */
+trait UnrolledParArrayCombiner[T]
+extends Combiner[T, ParArray[T]] {
+self: EnvironmentPassingCombiner[T, ParArray[T]] =>
+ // because size is doubling, random access is O(logn)!
+ val buff = new DoublingUnrolledBuffer[Any]
+
+ import tasksupport._
+
+ def +=(elem: T) = {
+ buff += elem
+ this
+ }
+
+ def result = {
+ val arrayseq = new ArraySeq[T](size)
+ val array = arrayseq.array.asInstanceOf[Array[Any]]
+
+ executeAndWaitResult(new CopyUnrolledToArray(array, 0, size))
+
+ new ParArray(arrayseq)
+ }
+
+ def clear {
+ buff.clear
+ }
+
+ override def sizeHint(sz: Int) = {
+ buff.lastPtr.next = new Unrolled(0, new Array[Any](sz), null, buff)
+ buff.lastPtr = buff.lastPtr.next
+ }
+
+ def combine[N <: T, NewTo >: ParArray[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = other match {
+ case that if that eq this => this // just return this
+ case that: UnrolledParArrayCombiner[t] =>
+ buff concat that.buff
+ this
+ case _ => unsupportedop("Cannot combine with combiner of different type.")
+ }
+
+ def size = buff.size
+
+ /* tasks */
+
+ class CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int)
+ extends Task[Unit, CopyUnrolledToArray] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = if (howmany > 0) {
+ var totalleft = howmany
+ val (startnode, startpos) = findStart(offset)
+ var curr = startnode
+ var pos = startpos
+ var arroffset = offset
+ while (totalleft > 0) {
+ val lefthere = math.min(totalleft, curr.size - pos)
+ Array.copy(curr.array, pos, array, arroffset, lefthere)
+ // println("from: " + arroffset + " elems " + lefthere + " - " + pos + ", " + curr + " -> " + array.toList + " by " + this + " !! " + buff.headPtr)
+ totalleft -= lefthere
+ arroffset += lefthere
+ pos = 0
+ curr = curr.next
+ }
+ }
+ private def findStart(pos: Int) = {
+ var left = pos
+ var node = buff.headPtr
+ while ((left - node.size) >= 0) {
+ left -= node.size
+ node = node.next
+ }
+ (node, left)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
+ override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")"
+ }
+}
+
+
+
+object UnrolledParArrayCombiner {
+ def apply[T](): UnrolledParArrayCombiner[T] = new UnrolledParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
+}
-trait ParArrayCombiner[T]
+/** An array combiner that uses a chain of arraybuffers to store elements. */
+trait ResizableParArrayCombiner[T]
extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]]
{
self: EnvironmentPassingCombiner[T, ParArray[T]] =>
@@ -24,7 +122,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
- def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParArrayCombiner(c)
+ def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ResizableParArrayCombiner(c)
def allocateAndCopy = if (chain.size > 1) {
val arrayseq = new ArraySeq[T](size)
@@ -38,7 +136,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
pa
}
- override def toString = "ParArrayCombiner(" + size + "): " //+ chain
+ override def toString = "ResizableParArrayCombiner(" + size + "): " //+ chain
/* tasks */
@@ -86,11 +184,11 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
}
-object ParArrayCombiner {
- def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParArrayCombiner[T] = {
- new { val chain = c } with ParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
+object ResizableParArrayCombiner {
+ def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ResizableParArrayCombiner[T] = {
+ new { val chain = c } with ResizableParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]]
}
- def apply[T](): ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
+ def apply[T](): ResizableParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
}