diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-09-22 14:18:19 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-09-22 14:18:19 +0000 |
commit | 285d2182f1aeb113aba55be804eefa2f61ce2624 (patch) | |
tree | d6fab90800c9428e946d913b42ac662785d4e15b /src/library/scala/collection/parallel/RemainsIterator.scala | |
parent | a5d47fb693d9b88ea9ed414762f16e027be64ada (diff) | |
download | scala-285d2182f1aeb113aba55be804eefa2f61ce2624.tar.gz scala-285d2182f1aeb113aba55be804eefa2f61ce2624.tar.bz2 scala-285d2182f1aeb113aba55be804eefa2f61ce2624.zip |
Reimplementing parallel views to solve several ...
Reimplementing parallel views to solve several performance glitches. No
review.
Diffstat (limited to 'src/library/scala/collection/parallel/RemainsIterator.scala')
-rw-r--r-- | src/library/scala/collection/parallel/RemainsIterator.scala | 116 |
1 files changed, 114 insertions, 2 deletions
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 8296d92e59..43acf3b41e 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -21,7 +21,8 @@ trait RemainsIterator[+T] extends Iterator[T] { /** Augments iterators with additional methods, mostly transformers, * assuming they iterate an iterable collection. * - * @param T type of the elements iterated. + * @param T type of the elements iterated. + * @param IterRepr iterator type. */ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { @@ -326,6 +327,8 @@ extends AugmentedIterableIterator[T] with Signalling with DelegatedSignalling { +self => + def split: Seq[ParIterableIterator[T]] /** The number of elements this iterator has yet to traverse. This method @@ -339,12 +342,66 @@ extends AugmentedIterableIterator[T] * * In that case, 2 considerations must be taken into account: * - * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`. + * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray` + * and which use tasks having the iterated subset length as a ctor argument. * * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum * of `remaining` values of split iterators must be less than or equal to this upper bound. */ def remaining: Int + + /* iterator transformers */ + + class Taken(taken: Int) extends ParIterableIterator[T] { + var signalDelegate = self.signalDelegate + var remaining = taken min self.remaining + def hasNext = remaining > 0 + def next = { remaining -= 1; self.next } + def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } + protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { + val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield + if (total < remaining) it else taker(it, total - remaining) + shortened filter { _.remaining > 0 } + } + } + + override def take(n: Int) = new Taken(n) + + override def slice(from1: Int, until1: Int) = { + val it = new Taken(until1) + var todrop = from1 + while (todrop > 0 && it.hasNext) it.next + it + } + + class Mapped[S](f: T => S) extends ParIterableIterator[S] { + var signalDelegate = self.signalDelegate + def hasNext = self.hasNext + def next = f(self.next) + def remaining = self.remaining + def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f } + } + + override def map[S](f: T => S) = new Mapped(f) + + class Appended[U >: T, PI <: ParIterableIterator[U]](protected val that: PI) extends ParIterableIterator[U] { + var signalDelegate = self.signalDelegate + protected var curr: ParIterableIterator[U] = self + def hasNext = if (curr.hasNext) true else if (curr eq self) { + curr = that + curr.hasNext + } else false + def next = if (curr eq self) { + hasNext + curr.next + } else curr.next + def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining + protected def firstNonEmpty = (curr eq self) && curr.hasNext + def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split + } + + def appendIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that) + } @@ -353,6 +410,7 @@ extends ParIterableIterator[T] with AugmentedSeqIterator[T] with PreciseSplitter[T] { +self => def split: Seq[ParSeqIterator[T]] def psplit(sizes: Int*): Seq[ParSeqIterator[T]] @@ -364,6 +422,60 @@ extends ParIterableIterator[T] * @return an exact number of elements this iterator has yet to iterate */ def remaining: Int + + /* iterator transformers */ + + class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] { + override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) } + } + + override def take(n: Int) = new Taken(n) + + override def slice(from1: Int, until1: Int) = { + val it = new Taken(until1) + var todrop = from1 + while (todrop > 0 && it.hasNext) it.next + it + } + + class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] { + override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f } + } + + override def map[S](f: T => S) = new Mapped(f) + + class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] { + override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) { + val selfrem = self.remaining + + // split sizes + var appendMiddle = false + val szcum = sizes.scanLeft(0)(_ + _) + val splitsizes = sizes.zip(szcum.init zip szcum.tail).flatMap { t => + val (sz, (from, until)) = t + if (from < selfrem && until > selfrem) { + appendMiddle = true + Seq(selfrem - from, until - selfrem) + } else Seq(sz) + } + val (selfszfrom, thatszfrom) = splitsizes.zip(szcum.init).span(_._2 < selfrem) + val (selfsizes, thatsizes) = (selfszfrom map { _._1 }, thatszfrom map { _._1 }); + + // split iterators + val selfs = self.psplit(selfsizes: _*) + val thats = that.psplit(thatsizes: _*) + + // appended last in self with first in rest if necessary + if (appendMiddle) selfs.init ++ Seq(selfs.last.appendSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail + else selfs ++ thats + } else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*) + } + + def appendSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that) + } |