summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/RemainsIterator.scala
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-09-22 14:18:19 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-09-22 14:18:19 +0000
commit285d2182f1aeb113aba55be804eefa2f61ce2624 (patch)
treed6fab90800c9428e946d913b42ac662785d4e15b /src/library/scala/collection/parallel/RemainsIterator.scala
parenta5d47fb693d9b88ea9ed414762f16e027be64ada (diff)
downloadscala-285d2182f1aeb113aba55be804eefa2f61ce2624.tar.gz
scala-285d2182f1aeb113aba55be804eefa2f61ce2624.tar.bz2
scala-285d2182f1aeb113aba55be804eefa2f61ce2624.zip
Reimplementing parallel views to solve several ...
Reimplementing parallel views to solve several performance glitches. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/RemainsIterator.scala')
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala116
1 files changed, 114 insertions, 2 deletions
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 8296d92e59..43acf3b41e 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -21,7 +21,8 @@ trait RemainsIterator[+T] extends Iterator[T] {
/** Augments iterators with additional methods, mostly transformers,
* assuming they iterate an iterable collection.
*
- * @param T type of the elements iterated.
+ * @param T type of the elements iterated.
+ * @param IterRepr iterator type.
*/
trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
@@ -326,6 +327,8 @@ extends AugmentedIterableIterator[T]
with Signalling
with DelegatedSignalling
{
+self =>
+
def split: Seq[ParIterableIterator[T]]
/** The number of elements this iterator has yet to traverse. This method
@@ -339,12 +342,66 @@ extends AugmentedIterableIterator[T]
*
* In that case, 2 considerations must be taken into account:
*
- * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`.
+ * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray`
+ * and which use tasks having the iterated subset length as a ctor argument.
*
* 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum
* of `remaining` values of split iterators must be less than or equal to this upper bound.
*/
def remaining: Int
+
+ /* iterator transformers */
+
+ class Taken(taken: Int) extends ParIterableIterator[T] {
+ var signalDelegate = self.signalDelegate
+ var remaining = taken min self.remaining
+ def hasNext = remaining > 0
+ def next = { remaining -= 1; self.next }
+ def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
+ protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
+ val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield
+ if (total < remaining) it else taker(it, total - remaining)
+ shortened filter { _.remaining > 0 }
+ }
+ }
+
+ override def take(n: Int) = new Taken(n)
+
+ override def slice(from1: Int, until1: Int) = {
+ val it = new Taken(until1)
+ var todrop = from1
+ while (todrop > 0 && it.hasNext) it.next
+ it
+ }
+
+ class Mapped[S](f: T => S) extends ParIterableIterator[S] {
+ var signalDelegate = self.signalDelegate
+ def hasNext = self.hasNext
+ def next = f(self.next)
+ def remaining = self.remaining
+ def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f }
+ }
+
+ override def map[S](f: T => S) = new Mapped(f)
+
+ class Appended[U >: T, PI <: ParIterableIterator[U]](protected val that: PI) extends ParIterableIterator[U] {
+ var signalDelegate = self.signalDelegate
+ protected var curr: ParIterableIterator[U] = self
+ def hasNext = if (curr.hasNext) true else if (curr eq self) {
+ curr = that
+ curr.hasNext
+ } else false
+ def next = if (curr eq self) {
+ hasNext
+ curr.next
+ } else curr.next
+ def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining
+ protected def firstNonEmpty = (curr eq self) && curr.hasNext
+ def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split
+ }
+
+ def appendIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that)
+
}
@@ -353,6 +410,7 @@ extends ParIterableIterator[T]
with AugmentedSeqIterator[T]
with PreciseSplitter[T]
{
+self =>
def split: Seq[ParSeqIterator[T]]
def psplit(sizes: Int*): Seq[ParSeqIterator[T]]
@@ -364,6 +422,60 @@ extends ParIterableIterator[T]
* @return an exact number of elements this iterator has yet to iterate
*/
def remaining: Int
+
+ /* iterator transformers */
+
+ class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] {
+ override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]]
+ def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) }
+ }
+
+ override def take(n: Int) = new Taken(n)
+
+ override def slice(from1: Int, until1: Int) = {
+ val it = new Taken(until1)
+ var todrop = from1
+ while (todrop > 0 && it.hasNext) it.next
+ it
+ }
+
+ class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] {
+ override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]]
+ def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f }
+ }
+
+ override def map[S](f: T => S) = new Mapped(f)
+
+ class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] {
+ override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]]
+ def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) {
+ val selfrem = self.remaining
+
+ // split sizes
+ var appendMiddle = false
+ val szcum = sizes.scanLeft(0)(_ + _)
+ val splitsizes = sizes.zip(szcum.init zip szcum.tail).flatMap { t =>
+ val (sz, (from, until)) = t
+ if (from < selfrem && until > selfrem) {
+ appendMiddle = true
+ Seq(selfrem - from, until - selfrem)
+ } else Seq(sz)
+ }
+ val (selfszfrom, thatszfrom) = splitsizes.zip(szcum.init).span(_._2 < selfrem)
+ val (selfsizes, thatsizes) = (selfszfrom map { _._1 }, thatszfrom map { _._1 });
+
+ // split iterators
+ val selfs = self.psplit(selfsizes: _*)
+ val thats = that.psplit(thatsizes: _*)
+
+ // appended last in self with first in rest if necessary
+ if (appendMiddle) selfs.init ++ Seq(selfs.last.appendSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail
+ else selfs ++ thats
+ } else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*)
+ }
+
+ def appendSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that)
+
}