summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/RemainsIterator.scala
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-04-13 16:31:42 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-04-13 16:31:42 +0000
commit3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6 (patch)
tree2794a7bd176b315a9f4bdc3f5ef5553254b7dd47 /src/library/scala/collection/parallel/RemainsIterator.scala
parent9b5cb18dbd2d3e87def5da47ae76adb2e776487e (diff)
downloadscala-3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6.tar.gz
scala-3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6.tar.bz2
scala-3de96153e5bfbde16dcc89bfbd71ff6e8cf1f6c6.zip
Refactoring the collections api to support diff...
Refactoring the collections api to support differentiation between referring to a sequential collection and a parallel collection, and to support referring to both types of collections. New set of traits Gen* are now superclasses of both their * and Par* subclasses. For example, GenIterable is a superclass of both Iterable and ParIterable. Iterable and ParIterable are not in a subclassing relation. The new class hierarchy is illustrated below (simplified, not all relations and classes are shown): TraversableOnce --> GenTraversableOnce ^ ^ | | Traversable --> GenTraversable ^ ^ | | Iterable --> GenIterable <-- ParIterable ^ ^ ^ | | | Seq --> GenSeq <-- ParSeq (the *Like, *View and *ViewLike traits have a similar hierarchy) General views extract common view functionality from parallel and sequential collections. This design also allows for more flexible extensions to the collections framework. It also allows slowly factoring out common functionality up into Gen* traits. From now on, it is possible to write this: import collection._ val p = parallel.ParSeq(1, 2, 3) val g: GenSeq[Int] = p // meaning a General Sequence val s = g.seq // type of s is Seq[Int] for (elem <- g) { // do something without guarantees on sequentiality of foreach // this foreach may be executed in parallel } for (elem <- s) { // do something with a guarantee that foreach is executed in order, sequentially } for (elem <- p) { // do something concurrently, in parallel } This also means that some signatures had to be changed. For example, method `flatMap` now takes `A => GenTraversableOnce[B]`, and `zip` takes a `GenIterable[B]`. Also, there are mutable & immutable Gen* trait variants. They have generic companion functionality.
Diffstat (limited to 'src/library/scala/collection/parallel/RemainsIterator.scala')
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala141
1 files changed, 71 insertions, 70 deletions
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 508bc46a72..e04e0e9c72 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -17,6 +17,7 @@ import scala.collection.generic.DelegatedSignalling
import scala.collection.generic.CanCombineFrom
import scala.collection.mutable.Builder
import scala.collection.Iterator.empty
+import scala.collection.GenTraversableOnce
import scala.collection.parallel.immutable.repetition
@@ -45,13 +46,13 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
i
}
- def reduce[U >: T](op: (U, U) => U): U = {
+ override def reduce[U >: T](op: (U, U) => U): U = {
var r: U = next
while (hasNext) r = op(r, next)
r
}
- def fold[U >: T](z: U)(op: (U, U) => U): U = {
+ override def fold[U >: T](z: U)(op: (U, U) => U): U = {
var r = z
while (hasNext) r = op(r, next)
r
@@ -124,10 +125,10 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
cb
}
- def flatmap2combiner[S, That](f: T => TraversableOnce[S], cb: Combiner[S, That]): Combiner[S, That] = {
+ def flatmap2combiner[S, That](f: T => GenTraversableOnce[S], cb: Combiner[S, That]): Combiner[S, That] = {
//val cb = pbf(repr)
while (hasNext) {
- val traversable = f(next)
+ val traversable = f(next).seq
if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator
else cb ++= traversable
}
@@ -279,7 +280,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
-trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] {
+private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] {
/** The exact number of elements this iterator has yet to iterate.
* This method doesn't change the state of the iterator.
@@ -372,7 +373,7 @@ trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] {
*
* @param T type of the elements iterated.
*/
-trait ParIterableIterator[+T]
+trait IterableSplitter[+T]
extends AugmentedIterableIterator[T]
with Splitter[T]
with Signalling
@@ -381,9 +382,9 @@ extends AugmentedIterableIterator[T]
self =>
/** Creates a copy of this iterator. */
- def dup: ParIterableIterator[T]
+ def dup: IterableSplitter[T]
- def split: Seq[ParIterableIterator[T]]
+ def split: Seq[IterableSplitter[T]]
/** The number of elements this iterator has yet to traverse. This method
* doesn't change the state of the iterator.
@@ -419,14 +420,14 @@ self =>
/* iterator transformers */
- class Taken(taken: Int) extends ParIterableIterator[T] {
+ class Taken(taken: Int) extends IterableSplitter[T] {
var signalDelegate = self.signalDelegate
var remaining = taken min self.remaining
def hasNext = remaining > 0
def next = { remaining -= 1; self.next }
- def dup: ParIterableIterator[T] = self.dup.take(taken)
- def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
- protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
+ def dup: IterableSplitter[T] = self.dup.take(taken)
+ def split: Seq[IterableSplitter[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
+ protected[this] def takeSeq[PI <: IterableSplitter[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
val sizes = sq.scanLeft(0)(_ + _.remaining)
val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield
if (until < remaining) it else taker(it, remaining - from)
@@ -445,23 +446,23 @@ self =>
}
it
}
- override def take(n: Int): ParIterableIterator[T] = newTaken(n)
- override def slice(from1: Int, until1: Int): ParIterableIterator[T] = newSliceInternal(newTaken(until1), from1)
+ override def take(n: Int): IterableSplitter[T] = newTaken(n)
+ override def slice(from1: Int, until1: Int): IterableSplitter[T] = newSliceInternal(newTaken(until1), from1)
- class Mapped[S](f: T => S) extends ParIterableIterator[S] {
+ class Mapped[S](f: T => S) extends IterableSplitter[S] {
var signalDelegate = self.signalDelegate
def hasNext = self.hasNext
def next = f(self.next)
def remaining = self.remaining
- def dup: ParIterableIterator[S] = self.dup map f
- def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f }
+ def dup: IterableSplitter[S] = self.dup map f
+ def split: Seq[IterableSplitter[S]] = self.split.map { _ map f }
}
override def map[S](f: T => S) = new Mapped(f)
- class Appended[U >: T, PI <: ParIterableIterator[U]](protected val that: PI) extends ParIterableIterator[U] {
+ class Appended[U >: T, PI <: IterableSplitter[U]](protected val that: PI) extends IterableSplitter[U] {
var signalDelegate = self.signalDelegate
- protected var curr: ParIterableIterator[U] = self
+ protected var curr: IterableSplitter[U] = self
def hasNext = if (curr.hasNext) true else if (curr eq self) {
curr = that
curr.hasNext
@@ -472,19 +473,19 @@ self =>
} else curr.next
def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining
protected def firstNonEmpty = (curr eq self) && curr.hasNext
- def dup: ParIterableIterator[U] = self.dup.appendParIterable[U, PI](that)
- def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split
+ def dup: IterableSplitter[U] = self.dup.appendParIterable[U, PI](that)
+ def split: Seq[IterableSplitter[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split
}
- def appendParIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that)
+ def appendParIterable[U >: T, PI <: IterableSplitter[U]](that: PI) = new Appended[U, PI](that)
- class Zipped[S](protected val that: ParSeqIterator[S]) extends ParIterableIterator[(T, S)] {
+ class Zipped[S](protected val that: SeqSplitter[S]) extends IterableSplitter[(T, S)] {
var signalDelegate = self.signalDelegate
def hasNext = self.hasNext && that.hasNext
def next = (self.next, that.next)
def remaining = self.remaining min that.remaining
- def dup: ParIterableIterator[(T, S)] = self.dup.zipParSeq(that)
- def split: Seq[ParIterableIterator[(T, S)]] = {
+ def dup: IterableSplitter[(T, S)] = self.dup.zipParSeq(that)
+ def split: Seq[IterableSplitter[(T, S)]] = {
val selfs = self.split
val sizes = selfs.map(_.remaining)
val thats = that.psplit(sizes: _*)
@@ -492,10 +493,10 @@ self =>
}
}
- def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
+ def zipParSeq[S](that: SeqSplitter[S]) = new Zipped(that)
- class ZippedAll[U >: T, S](protected val that: ParSeqIterator[S], protected val thiselem: U, protected val thatelem: S)
- extends ParIterableIterator[(U, S)] {
+ class ZippedAll[U >: T, S](protected val that: SeqSplitter[S], protected val thiselem: U, protected val thatelem: S)
+ extends IterableSplitter[(U, S)] {
var signalDelegate = self.signalDelegate
def hasNext = self.hasNext || that.hasNext
def next = if (self.hasNext) {
@@ -503,18 +504,18 @@ self =>
else (self.next, thatelem)
} else (thiselem, that.next);
def remaining = self.remaining max that.remaining
- def dup: ParIterableIterator[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem)
- def split: Seq[ParIterableIterator[(U, S)]] = {
+ def dup: IterableSplitter[(U, S)] = self.dup.zipAllParSeq(that, thiselem, thatelem)
+ def split: Seq[IterableSplitter[(U, S)]] = {
val selfrem = self.remaining
val thatrem = that.remaining
- val thisit = if (selfrem < thatrem) self.appendParIterable[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self
- val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that
+ val thisit = if (selfrem < thatrem) self.appendParIterable[U, SeqSplitter[U]](repetition[U](thiselem, thatrem - selfrem).splitter) else self
+ val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).splitter) else that
val zipped = thisit zipParSeq thatit
zipped.split
}
}
- def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem)
+ def zipAllParSeq[S, U >: T, R >: S](that: SeqSplitter[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem)
}
@@ -523,15 +524,15 @@ self =>
*
* @param T type of the elements iterated.
*/
-trait ParSeqIterator[+T]
-extends ParIterableIterator[T]
+trait SeqSplitter[+T]
+extends IterableSplitter[T]
with AugmentedSeqIterator[T]
with PreciseSplitter[T]
{
self =>
- def dup: ParSeqIterator[T]
- def split: Seq[ParSeqIterator[T]]
- def psplit(sizes: Int*): Seq[ParSeqIterator[T]]
+ def dup: SeqSplitter[T]
+ def split: Seq[SeqSplitter[T]]
+ def psplit(sizes: Int*): Seq[SeqSplitter[T]]
/** The number of elements this iterator has yet to traverse. This method
* doesn't change the state of the iterator. Unlike the version of this method in the supertrait,
@@ -544,27 +545,27 @@ self =>
/* iterator transformers */
- class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] {
- override def dup = super.dup.asInstanceOf[ParSeqIterator[T]]
- override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]]
- def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) }
+ class Taken(tk: Int) extends super.Taken(tk) with SeqSplitter[T] {
+ override def dup = super.dup.asInstanceOf[SeqSplitter[T]]
+ override def split: Seq[SeqSplitter[T]] = super.split.asInstanceOf[Seq[SeqSplitter[T]]]
+ def psplit(sizes: Int*): Seq[SeqSplitter[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) }
}
override private[collection] def newTaken(until: Int): Taken = new Taken(until)
- override def take(n: Int): ParSeqIterator[T] = newTaken(n)
- override def slice(from1: Int, until1: Int): ParSeqIterator[T] = newSliceInternal(newTaken(until1), from1)
+ override def take(n: Int): SeqSplitter[T] = newTaken(n)
+ override def slice(from1: Int, until1: Int): SeqSplitter[T] = newSliceInternal(newTaken(until1), from1)
- class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] {
- override def dup = super.dup.asInstanceOf[ParSeqIterator[S]]
- override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]]
- def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f }
+ class Mapped[S](f: T => S) extends super.Mapped[S](f) with SeqSplitter[S] {
+ override def dup = super.dup.asInstanceOf[SeqSplitter[S]]
+ override def split: Seq[SeqSplitter[S]] = super.split.asInstanceOf[Seq[SeqSplitter[S]]]
+ def psplit(sizes: Int*): Seq[SeqSplitter[S]] = self.psplit(sizes: _*).map { _ map f }
}
override def map[S](f: T => S) = new Mapped(f)
- class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] {
- override def dup = super.dup.asInstanceOf[ParSeqIterator[U]]
- override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]]
- def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) {
+ class Appended[U >: T, PI <: SeqSplitter[U]](it: PI) extends super.Appended[U, PI](it) with SeqSplitter[U] {
+ override def dup = super.dup.asInstanceOf[SeqSplitter[U]]
+ override def split: Seq[SeqSplitter[U]] = super.split.asInstanceOf[Seq[SeqSplitter[U]]]
+ def psplit(sizes: Int*): Seq[SeqSplitter[U]] = if (firstNonEmpty) {
val selfrem = self.remaining
// split sizes
@@ -585,56 +586,56 @@ self =>
val thats = that.psplit(thatsizes: _*)
// appended last in self with first in rest if necessary
- if (appendMiddle) selfs.init ++ Seq(selfs.last.appendParSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail
+ if (appendMiddle) selfs.init ++ Seq(selfs.last.appendParSeq[U, SeqSplitter[U]](thats.head)) ++ thats.tail
else selfs ++ thats
- } else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*)
+ } else curr.asInstanceOf[SeqSplitter[U]].psplit(sizes: _*)
}
- def appendParSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that)
+ def appendParSeq[U >: T, PI <: SeqSplitter[U]](that: PI) = new Appended[U, PI](that)
- class Zipped[S](ti: ParSeqIterator[S]) extends super.Zipped[S](ti) with ParSeqIterator[(T, S)] {
- override def dup = super.dup.asInstanceOf[ParSeqIterator[(T, S)]]
- override def split: Seq[ParSeqIterator[(T, S)]] = super.split.asInstanceOf[Seq[ParSeqIterator[(T, S)]]]
+ class Zipped[S](ti: SeqSplitter[S]) extends super.Zipped[S](ti) with SeqSplitter[(T, S)] {
+ override def dup = super.dup.asInstanceOf[SeqSplitter[(T, S)]]
+ override def split: Seq[SeqSplitter[(T, S)]] = super.split.asInstanceOf[Seq[SeqSplitter[(T, S)]]]
def psplit(szs: Int*) = (self.psplit(szs: _*) zip that.psplit(szs: _*)) map { p => p._1 zipParSeq p._2 }
}
- override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
+ override def zipParSeq[S](that: SeqSplitter[S]) = new Zipped(that)
- class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] {
- override def dup = super.dup.asInstanceOf[ParSeqIterator[(U, S)]]
+ class ZippedAll[U >: T, S](ti: SeqSplitter[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with SeqSplitter[(U, S)] {
+ override def dup = super.dup.asInstanceOf[SeqSplitter[(U, S)]]
private def patchem = {
val selfrem = self.remaining
val thatrem = that.remaining
- val thisit = if (selfrem < thatrem) self.appendParSeq[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self
- val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that
+ val thisit = if (selfrem < thatrem) self.appendParSeq[U, SeqSplitter[U]](repetition[U](thiselem, thatrem - selfrem).splitter) else self
+ val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).splitter) else that
(thisit, thatit)
}
- override def split: Seq[ParSeqIterator[(U, S)]] = {
+ override def split: Seq[SeqSplitter[(U, S)]] = {
val (thisit, thatit) = patchem
val zipped = thisit zipParSeq thatit
zipped.split
}
- def psplit(sizes: Int*): Seq[ParSeqIterator[(U, S)]] = {
+ def psplit(sizes: Int*): Seq[SeqSplitter[(U, S)]] = {
val (thisit, thatit) = patchem
val zipped = thisit zipParSeq thatit
zipped.psplit(sizes: _*)
}
}
- override def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem)
+ override def zipAllParSeq[S, U >: T, R >: S](that: SeqSplitter[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem)
- def reverse: ParSeqIterator[T] = {
+ def reverse: SeqSplitter[T] = {
val pa = mutable.ParArray.fromTraversables(self).reverse
new pa.ParArrayIterator with pa.SCPI {
override def reverse = self
}
}
- class Patched[U >: T](from: Int, patch: ParSeqIterator[U], replaced: Int) extends ParSeqIterator[U] {
+ class Patched[U >: T](from: Int, patch: SeqSplitter[U], replaced: Int) extends SeqSplitter[U] {
var signalDelegate = self.signalDelegate
private[this] val trio = {
val pits = self.psplit(from, replaced, self.remaining - from - replaced)
- (pits(0).appendParSeq[U, ParSeqIterator[U]](patch)) appendParSeq pits(2)
+ (pits(0).appendParSeq[U, SeqSplitter[U]](patch)) appendParSeq pits(2)
}
def hasNext = trio.hasNext
def next = trio.next
@@ -644,7 +645,7 @@ self =>
def psplit(sizes: Int*) = trio.psplit(sizes: _*)
}
- def patchParSeq[U >: T](from: Int, patchElems: ParSeqIterator[U], replaced: Int) = new Patched(from, patchElems, replaced)
+ def patchParSeq[U >: T](from: Int, patchElems: SeqSplitter[U], replaced: Int) = new Patched(from, patchElems, replaced)
}