diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2011-04-13 16:31:42 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2011-04-13 16:31:42 +0000 |
commit | 3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6 (patch) | |
tree | 2794a7bd176b315a9f4bdc3f5ef5553254b7dd47 /src/library/scala/collection/parallel/RemainsIterator.scala | |
parent | 9b5cb18dbd2d3e87def5da47ae76adb2e776487e (diff) | |
download | scala-3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6.tar.gz scala-3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6.tar.bz2 scala-3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6.zip |
Refactoring the collections api to support diff...
Refactoring the collections api to support differentiation between
referring to a sequential collection and a parallel collection, and to
support referring to both types of collections.
New set of traits Gen* are now superclasses of both their * and Par* subclasses. For example, GenIterable is a superclass of both Iterable and ParIterable. Iterable and ParIterable are not in a subclassing relation. The new class hierarchy is illustrated below (simplified, not all relations and classes are shown):
TraversableOnce --> GenTraversableOnce
^ ^
| |
Traversable --> GenTraversable
^ ^
| |
Iterable --> GenIterable <-- ParIterable
^ ^ ^
| | |
Seq --> GenSeq <-- ParSeq
(the *Like, *View and *ViewLike traits have a similar hierarchy)
General views extract common view functionality from parallel and
sequential collections.
This design also allows for more flexible extensions to the collections
framework. It also allows slowly factoring out common functionality up
into Gen* traits.
From now on, it is possible to write this:
import collection._
val p = parallel.ParSeq(1, 2, 3)
val g: GenSeq[Int] = p // meaning a General Sequence
val s = g.seq // type of s is Seq[Int]
for (elem <- g) {
// do something without guarantees on sequentiality of foreach
// this foreach may be executed in parallel
}
for (elem <- s) {
// do something with a guarantee that foreach is executed in order, sequentially
}
for (elem <- p) {
// do something concurrently, in parallel
}
This also means that some signatures had to be changed. For example,
method `flatMap` now takes `A => GenTraversableOnce[B]`, and `zip` takes
a `GenIterable[B]`.
Also, there are mutable & immutable Gen* trait variants. They have
generic companion functionality.
Diffstat (limited to 'src/library/scala/collection/parallel/RemainsIterator.scala')
-rw-r--r-- | src/library/scala/collection/parallel/RemainsIterator.scala | 141 |
1 files changed, 71 insertions, 70 deletions
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 508bc46a72..e04e0e9c72 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -17,6 +17,7 @@ import scala.collection.generic.DelegatedSignalling import scala.collection.generic.CanCombineFrom import scala.collection.mutable.Builder import scala.collection.Iterator.empty +import scala.collection.GenTraversableOnce import scala.collection.parallel.immutable.repetition @@ -45,13 +46,13 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ i } - def reduce[U >: T](op: (U, U) => U): U = { + override def reduce[U >: T](op: (U, U) => U): U = { var r: U = next while (hasNext) r = op(r, next) r } - def fold[U >: T](z: U)(op: (U, U) => U): U = { + override def fold[U >: T](z: U)(op: (U, U) => U): U = { var r = z while (hasNext) r = op(r, next) r @@ -124,10 +125,10 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ cb } - def flatmap2combiner[S, That](f: T => TraversableOnce[S], cb: Combiner[S, That]): Combiner[S, That] = { + def flatmap2combiner[S, That](f: T => GenTraversableOnce[S], cb: Combiner[S, That]): Combiner[S, That] = { //val cb = pbf(repr) while (hasNext) { - val traversable = f(next) + val traversable = f(next).seq if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator else cb ++= traversable } @@ -279,7 +280,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ } -trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] { +private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] { /** The exact number of elements this iterator has yet to iterate. * This method doesn't change the state of the iterator. @@ -372,7 +373,7 @@ trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] { * * @param T type of the elements iterated. */ -trait ParIterableIterator[+T] +trait IterableSplitter[+T] extends AugmentedIterableIterator[T] with Splitter[T] with Signalling @@ -381,9 +382,9 @@ extends AugmentedIterableIterator[T] self => /** Creates a copy of this iterator. */ - def dup: ParIterableIterator[T] + def dup: IterableSplitter[T] - def split: Seq[ParIterableIterator[T]] + def split: Seq[IterableSplitter[T]] /** The number of elements this iterator has yet to traverse. This method * doesn't change the state of the iterator. @@ -419,14 +420,14 @@ self => /* iterator transformers */ - class Taken(taken: Int) extends ParIterableIterator[T] { + class Taken(taken: Int) extends IterableSplitter[T] { var signalDelegate = self.signalDelegate var remaining = taken min self.remaining def hasNext = remaining > 0 def next = { remaining -= 1; self.next } - def dup: ParIterableIterator[T] = self.dup.take(taken) - def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } - protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { + def dup: IterableSplitter[T] = self.dup.take(taken) + def split: Seq[IterableSplitter[T]] = takeSeq(self.split) { (p, n) => p.take(n) } + protected[this] def takeSeq[PI <: IterableSplitter[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { val sizes = sq.scanLeft(0)(_ + _.remaining) val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield if (until < remaining) it else taker(it, remaining - from) @@ -445,23 +446,23 @@ self => } it } - override def take(n: Int): ParIterableIterator[T] = newTaken(n) - override def slice(from1: Int, until1: Int): ParIterableIterator[T] = newSliceInternal(newTaken(until1), from1) + override def take(n: Int): IterableSplitter[T] = newTaken(n) + override def slice(from1: Int, until1: Int): IterableSplitter[T] = newSliceInternal(newTaken(until1), from1) - class Mapped[S](f: T => S) extends ParIterableIterator[S] { + class Mapped[S](f: T => S) extends IterableSplitter[S] { var signalDelegate = self.signalDelegate def hasNext = self.hasNext def next = f(self.next) def remaining = self.remaining - def dup: ParIterableIterator[S] = self.dup map f - def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f } + def dup: IterableSplitter[S] = self.dup map f + def split: Seq[IterableSplitter[S]] = self.split.map { _ map f } } override def map[S](f: T => S) = new Mapped(f) - class Appended[U >: T, PI <: ParIterableIterator[U]](protected val that: PI) extends ParIterableIterator[U] { + class Appended[U >: T, PI <: IterableSplitter[U]](protected val that: PI) extends IterableSplitter[U] { var signalDelegate = self.signalDelegate - protected var curr: ParIterableIterator[U] = self + protected var curr: IterableSplitter[U] = self def hasNext = if (curr.hasNext) true else if (curr eq self) { curr = that curr.hasNext @@ -472,19 +473,19 @@ self => } else curr.next def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining protected def firstNonEmpty = (curr eq self) && curr.hasNext - def dup: ParIterableIterator[U] = self.dup.appendParIterable[U, PI](that) - def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split + def dup: IterableSplitter[U] = self.dup.appendParIterable[U, PI](that) + def split: Seq[IterableSplitter[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split } - def appendParIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that) + def appendParIterable[U >: T, PI <: IterableSplitter[U]](that: PI) = new Appended[U, PI](that) - class Zipped[S](protected val that: ParSeqIterator[S]) extends ParIterableIterator[(T, S)] { + class Zipped[S](protected val that: SeqSplitter[S]) extends IterableSplitter[(T, S)] { var signalDelegate = self.signalDelegate def hasNext = self.hasNext && that.hasNext def next = (self.next, that.next) def remaining = self.remaining min that.remaining - def dup: ParIterableIterator[(T, S)] = self.dup.zipParSeq(that) - def split: Seq[ParIterableIterator[(T, S)]] = { + def dup: IterableSplitter[(T, S)] = self.dup.zipParSeq(that) + def split: Seq[IterableSplitter[(T, S)]] = { val selfs = self.split val sizes = selfs.map(_.remaining) val thats = that.psplit(sizes: _*) @@ -492,10 +493,10 @@ self => } } - def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that) + def zipParSeq[S](that: SeqSplitter[S]) = new Zipped(that) - class ZippedAll[U >: T, S](protected val that: ParSeqIterator[S], protected val thiselem: U, protected val thatelem: S) - extends ParIterableIterator[(U, S)] { + class ZippedAll[U >: T, S](protected val that: SeqSplitter[S], protected val thiselem: U, protected val thatelem: S) + extends IterableSplitter[(U, S)] { var signalDelegate = self.signalDelegate def hasNext = self.hasNext || that.hasNext def next = if (self.hasNext) { @@ -503,18 +504,18 @@ self => else (self.next, thatelem) } else (thiselem, that.next); def remaining = self.remaining max that.remaining - def dup: ParIterableIterator[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem) - def split: Seq[ParIterableIterator[(U, S)]] = { + def dup: IterableSplitter[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem) + def split: Seq[IterableSplitter[(U, S)]] = { val selfrem = self.remaining val thatrem = that.remaining - val thisit = if (selfrem < thatrem) self.appendParIterable[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self - val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that + val thisit = if (selfrem < thatrem) self.appendParIterable[U, SeqSplitter[U]](repetition[U](thiselem, thatrem - selfrem).splitter) else self + val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).splitter) else that val zipped = thisit zipParSeq thatit zipped.split } } - def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem) + def zipAllParSeq[S, U >: T, R >: S](that: SeqSplitter[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem) } @@ -523,15 +524,15 @@ self => * * @param T type of the elements iterated. */ -trait ParSeqIterator[+T] -extends ParIterableIterator[T] +trait SeqSplitter[+T] +extends IterableSplitter[T] with AugmentedSeqIterator[T] with PreciseSplitter[T] { self => - def dup: ParSeqIterator[T] - def split: Seq[ParSeqIterator[T]] - def psplit(sizes: Int*): Seq[ParSeqIterator[T]] + def dup: SeqSplitter[T] + def split: Seq[SeqSplitter[T]] + def psplit(sizes: Int*): Seq[SeqSplitter[T]] /** The number of elements this iterator has yet to traverse. This method * doesn't change the state of the iterator. Unlike the version of this method in the supertrait, @@ -544,27 +545,27 @@ self => /* iterator transformers */ - class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] { - override def dup = super.dup.asInstanceOf[ParSeqIterator[T]] - override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]] - def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) } + class Taken(tk: Int) extends super.Taken(tk) with SeqSplitter[T] { + override def dup = super.dup.asInstanceOf[SeqSplitter[T]] + override def split: Seq[SeqSplitter[T]] = super.split.asInstanceOf[Seq[SeqSplitter[T]]] + def psplit(sizes: Int*): Seq[SeqSplitter[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) } } override private[collection] def newTaken(until: Int): Taken = new Taken(until) - override def take(n: Int): ParSeqIterator[T] = newTaken(n) - override def slice(from1: Int, until1: Int): ParSeqIterator[T] = newSliceInternal(newTaken(until1), from1) + override def take(n: Int): SeqSplitter[T] = newTaken(n) + override def slice(from1: Int, until1: Int): SeqSplitter[T] = newSliceInternal(newTaken(until1), from1) - class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] { - override def dup = super.dup.asInstanceOf[ParSeqIterator[S]] - override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]] - def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f } + class Mapped[S](f: T => S) extends super.Mapped[S](f) with SeqSplitter[S] { + override def dup = super.dup.asInstanceOf[SeqSplitter[S]] + override def split: Seq[SeqSplitter[S]] = super.split.asInstanceOf[Seq[SeqSplitter[S]]] + def psplit(sizes: Int*): Seq[SeqSplitter[S]] = self.psplit(sizes: _*).map { _ map f } } override def map[S](f: T => S) = new Mapped(f) - class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] { - override def dup = super.dup.asInstanceOf[ParSeqIterator[U]] - override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]] - def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) { + class Appended[U >: T, PI <: SeqSplitter[U]](it: PI) extends super.Appended[U, PI](it) with SeqSplitter[U] { + override def dup = super.dup.asInstanceOf[SeqSplitter[U]] + override def split: Seq[SeqSplitter[U]] = super.split.asInstanceOf[Seq[SeqSplitter[U]]] + def psplit(sizes: Int*): Seq[SeqSplitter[U]] = if (firstNonEmpty) { val selfrem = self.remaining // split sizes @@ -585,56 +586,56 @@ self => val thats = that.psplit(thatsizes: _*) // appended last in self with first in rest if necessary - if (appendMiddle) selfs.init ++ Seq(selfs.last.appendParSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail + if (appendMiddle) selfs.init ++ Seq(selfs.last.appendParSeq[U, SeqSplitter[U]](thats.head)) ++ thats.tail else selfs ++ thats - } else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*) + } else curr.asInstanceOf[SeqSplitter[U]].psplit(sizes: _*) } - def appendParSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that) + def appendParSeq[U >: T, PI <: SeqSplitter[U]](that: PI) = new Appended[U, PI](that) - class Zipped[S](ti: ParSeqIterator[S]) extends super.Zipped[S](ti) with ParSeqIterator[(T, S)] { - override def dup = super.dup.asInstanceOf[ParSeqIterator[(T, S)]] - override def split: Seq[ParSeqIterator[(T, S)]] = super.split.asInstanceOf[Seq[ParSeqIterator[(T, S)]]] + class Zipped[S](ti: SeqSplitter[S]) extends super.Zipped[S](ti) with SeqSplitter[(T, S)] { + override def dup = super.dup.asInstanceOf[SeqSplitter[(T, S)]] + override def split: Seq[SeqSplitter[(T, S)]] = super.split.asInstanceOf[Seq[SeqSplitter[(T, S)]]] def psplit(szs: Int*) = (self.psplit(szs: _*) zip that.psplit(szs: _*)) map { p => p._1 zipParSeq p._2 } } - override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that) + override def zipParSeq[S](that: SeqSplitter[S]) = new Zipped(that) - class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] { - override def dup = super.dup.asInstanceOf[ParSeqIterator[(U, S)]] + class ZippedAll[U >: T, S](ti: SeqSplitter[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with SeqSplitter[(U, S)] { + override def dup = super.dup.asInstanceOf[SeqSplitter[(U, S)]] private def patchem = { val selfrem = self.remaining val thatrem = that.remaining - val thisit = if (selfrem < thatrem) self.appendParSeq[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self - val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that + val thisit = if (selfrem < thatrem) self.appendParSeq[U, SeqSplitter[U]](repetition[U](thiselem, thatrem - selfrem).splitter) else self + val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).splitter) else that (thisit, thatit) } - override def split: Seq[ParSeqIterator[(U, S)]] = { + override def split: Seq[SeqSplitter[(U, S)]] = { val (thisit, thatit) = patchem val zipped = thisit zipParSeq thatit zipped.split } - def psplit(sizes: Int*): Seq[ParSeqIterator[(U, S)]] = { + def psplit(sizes: Int*): Seq[SeqSplitter[(U, S)]] = { val (thisit, thatit) = patchem val zipped = thisit zipParSeq thatit zipped.psplit(sizes: _*) } } - override def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem) + override def zipAllParSeq[S, U >: T, R >: S](that: SeqSplitter[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem) - def reverse: ParSeqIterator[T] = { + def reverse: SeqSplitter[T] = { val pa = mutable.ParArray.fromTraversables(self).reverse new pa.ParArrayIterator with pa.SCPI { override def reverse = self } } - class Patched[U >: T](from: Int, patch: ParSeqIterator[U], replaced: Int) extends ParSeqIterator[U] { + class Patched[U >: T](from: Int, patch: SeqSplitter[U], replaced: Int) extends SeqSplitter[U] { var signalDelegate = self.signalDelegate private[this] val trio = { val pits = self.psplit(from, replaced, self.remaining - from - replaced) - (pits(0).appendParSeq[U, ParSeqIterator[U]](patch)) appendParSeq pits(2) + (pits(0).appendParSeq[U, SeqSplitter[U]](patch)) appendParSeq pits(2) } def hasNext = trio.hasNext def next = trio.next @@ -644,7 +645,7 @@ self => def psplit(sizes: Int*) = trio.psplit(sizes: _*) } - def patchParSeq[U >: T](from: Int, patchElems: ParSeqIterator[U], replaced: Int) = new Patched(from, patchElems, replaced) + def patchParSeq[U >: T](from: Int, patchElems: SeqSplitter[U], replaced: Int) = new Patched(from, patchElems, replaced) } |