diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 15:06:17 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 15:06:17 +0000 |
commit | 9923b97157725ae1f7853a4834ef5e31283a1b98 (patch) | |
tree | 6252cf350a91d6bed178b07ed3ddc7fdd21d2890 /src/library/scala/collection/parallel/Iterators.scala | |
parent | ceec792d1af5bb7b2d618f27f6fd48cdf75cf92f (diff) | |
download | scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.gz scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.bz2 scala-9923b97157725ae1f7853a4834ef5e31283a1b98.zip |
Moved parallel collections to library dir, chan...
Moved parallel collections to library dir, changed sabbus script. Added
`par` to some of the classes. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/Iterators.scala')
-rw-r--r-- | src/library/scala/collection/parallel/Iterators.scala | 443 |
1 files changed, 443 insertions, 0 deletions
diff --git a/src/library/scala/collection/parallel/Iterators.scala b/src/library/scala/collection/parallel/Iterators.scala new file mode 100644 index 0000000000..bfebff994c --- /dev/null +++ b/src/library/scala/collection/parallel/Iterators.scala @@ -0,0 +1,443 @@ +package scala.collection.parallel + + + +import scala.collection.Parallel +import scala.collection.generic.Signalling +import scala.collection.generic.DelegatedSignalling +import scala.collection.generic.CanCombineFrom +import scala.collection.mutable.Builder +import scala.collection.Iterator.empty + + + + + + +trait RemainsIterator[+T] extends Iterator[T] { + /** The number of elements this iterator has yet to iterate. + * This method doesn't change the state of the iterator. + */ + def remaining: Int +} + + +/** Augments iterators with additional methods, mostly transformers, + * assuming they iterate an iterable collection. + * + * @param T type of the elements iterated. + * @param Repr type of the collection iterator iterates. + */ +trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T] { + + def repr: Repr + + /* accessors */ + + override def count(p: T => Boolean): Int = { + var i = 0 + while (hasNext) if (p(next)) i += 1 + i + } + + def reduce[U >: T](op: (U, U) => U): U = { + var r: U = next + while (hasNext) r = op(r, next) + r + } + + def fold[U >: T](z: U)(op: (U, U) => U): U = { + var r = z + while (hasNext) r = op(r, next) + r + } + + override def sum[U >: T](implicit num: Numeric[U]): U = { + var r: U = num.zero + while (hasNext) r = num.plus(r, next) + r + } + + override def product[U >: T](implicit num: Numeric[U]): U = { + var r: U = num.one + while (hasNext) r = num.times(r, next) + r + } + + override def min[U >: T](implicit ord: Ordering[U]): T = { + var r = next + while (hasNext) { + val curr = next + if (ord.lteq(curr, r)) r = curr + } + r + } + + override def max[U >: T](implicit ord: Ordering[U]): T = { + var r = next + while (hasNext) { + val curr = next + if (ord.gteq(curr, r)) r = curr + } + r + } + + override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) { + var i = from + val until = from + len + while (i < until && hasNext) { + array(i) = next + i += 1 + } + } + + /* transformers to combiners */ + + def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = pbf(repr) + cb.sizeHint(remaining) + while (hasNext) cb += f(next) + cb + } + + def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { + val cb = pbf(repr) + while (hasNext) { + val curr = next + if (pf.isDefinedAt(curr)) cb += pf(curr) + } + cb + } + + def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { + val cb = pbf(repr) + while (hasNext) { + val traversable = f(next) + if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator + else cb ++= traversable + } + cb + } + + def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = { + b.sizeHint(remaining) + while (hasNext) b += next + b + } + + def filter2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = { + while (hasNext) { + val curr = next + if (pred(curr)) cb += curr + } + cb + } + + def filterNot2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = { + while (hasNext) { + val curr = next + if (!pred(curr)) cb += curr + } + cb + } + + def partition2combiners[U >: T, This >: Repr](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = { + while (hasNext) { + val curr = next + if (pred(curr)) btrue += curr + else bfalse += curr + } + (btrue, bfalse) + } + + def take2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { + cb.sizeHint(n) + var left = n + while (left > 0) { + cb += next + left -= 1 + } + cb + } + + def drop2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { + drop(n) + cb.sizeHint(remaining) + while (hasNext) cb += next + cb + } + + def slice2combiner[U >: T, This >: Repr](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = { + drop(from) + var left = until - from + cb.sizeHint(left) + while (left > 0) { + cb += next + left -= 1 + } + cb + } + + def splitAt2combiners[U >: T, This >: Repr](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = { + before.sizeHint(at) + after.sizeHint(remaining - at) + var left = at + while (left > 0) { + before += next + left -= 1 + } + while (hasNext) after += next + (before, after) + } + + def takeWhile2combiner[U >: T, This >: Repr](p: T => Boolean, cb: Combiner[U, This]) = { + var loop = true + while (hasNext && loop) { + val curr = next + if (p(curr)) cb += curr + else loop = false + } + (cb, loop) + } + + def span2combiners[U >: T, This >: Repr](p: T => Boolean, before: Combiner[U, This], after: Combiner[U, This]) = { + var isBefore = true + while (hasNext && isBefore) { + val curr = next + if (p(curr)) before += curr + else { + after.sizeHint(remaining + 1) + after += curr + isBefore = false + } + } + while (hasNext) after += next + (before, after) + } +} + + +trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableIterator[T, Repr] { + + /** The exact number of elements this iterator has yet to iterate. + * This method doesn't change the state of the iterator. + */ + def remaining: Int + + /* accessors */ + + def prefixLength(pred: T => Boolean): Int = { + var total = 0 + var loop = true + while (hasNext && loop) { + if (pred(next)) total += 1 + else loop = false + } + total + } + + override def indexWhere(pred: T => Boolean): Int = { + var i = 0 + var loop = true + while (hasNext && loop) { + if (pred(next)) loop = false + else i += 1 + } + if (loop) -1 else i + } + + def lastIndexWhere(pred: T => Boolean): Int = { + var pos = -1 + var i = 0 + while (hasNext) { + if (pred(next)) pos = i + i += 1 + } + pos + } + + def corresponds[S](corr: (T, S) => Boolean)(that: Iterator[S]): Boolean = { + while (hasNext && that.hasNext) { + if (!corr(next, that.next)) return false + } + hasNext == that.hasNext + } + + /* transformers */ + + def reverse2combiner[U >: T, This >: Repr](cb: Combiner[U, This]): Combiner[U, This] = { + cb.sizeHint(remaining) + var lst = List[T]() + while (hasNext) lst ::= next + while (lst != Nil) { + cb += lst.head + lst = lst.tail + } + cb + } + + def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { + val cb = cbf(repr) + cb.sizeHint(remaining) + var lst = List[S]() + while (hasNext) lst ::= f(next) + while (lst != Nil) { + cb += lst.head + lst = lst.tail + } + cb + } + + def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = { + val cb = cbf(repr) + cb.sizeHint(remaining) + var j = 0 + while (hasNext) { + if (j == index) { + cb += elem + next + } else cb += next + j += 1 + } + cb + } + +} + + + +trait ParallelIterableIterator[+T, +Repr <: Parallel] +extends AugmentedIterableIterator[T, Repr] + with Splitter[T] + with Signalling + with DelegatedSignalling +{ + def split: Seq[ParallelIterableIterator[T, Repr]] + + /** The number of elements this iterator has yet to traverse. This method + * doesn't change the state of the iterator. + * + * This method is used to provide size hints to builders and combiners, and + * to approximate positions of iterators within a data structure. + * + * '''Note''': This method may be implemented to return an upper bound on the number of elements + * in the iterator, instead of the exact number of elements to iterate. + * + * In that case, 2 considerations must be taken into account: + * + * 1) classes that inherit `ParallelIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`. + * + * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum + * of `remaining` values of split iterators must be less than or equal to this upper bound. + */ + def remaining: Int +} + + +trait ParallelSeqIterator[+T, +Repr <: Parallel] +extends ParallelIterableIterator[T, Repr] + with AugmentedSeqIterator[T, Repr] + with PreciseSplitter[T] +{ + def split: Seq[ParallelSeqIterator[T, Repr]] + def psplit(sizes: Int*): Seq[ParallelSeqIterator[T, Repr]] + + /** The number of elements this iterator has yet to traverse. This method + * doesn't change the state of the iterator. Unlike the version of this method in the supertrait, + * method `remaining` in `ParallelSeqLike.this.ParallelIterator` must return an exact number + * of elements remaining in the iterator. + * + * @return an exact number of elements this iterator has yet to iterate + */ + def remaining: Int +} + + +trait DelegatedIterator[+T, +Delegate <: Iterator[T]] extends RemainsIterator[T] { + val delegate: Delegate + def next = delegate.next + def hasNext = delegate.hasNext +} + + +trait Counting[+T] extends RemainsIterator[T] { + val initialSize: Int + def remaining = initialSize - traversed + var traversed = 0 + abstract override def next = { + val n = super.next + traversed += 1 + n + } +} + + +/** A mixin for iterators that traverse only filtered elements of a delegate. + */ +trait FilteredIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] { + protected[this] val pred: T => Boolean + + private[this] var hd: T = _ + private var hdDefined = false + + override def hasNext: Boolean = hdDefined || { + do { + if (!delegate.hasNext) return false + hd = delegate.next + } while (!pred(hd)) + hdDefined = true + true + } + + override def next = if (hasNext) { hdDefined = false; hd } else empty.next +} + + +/** A mixin for iterators that traverse elements of the delegate iterator, and of another collection. + */ +trait AppendedIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] { + // `rest` should never alias `delegate` + protected[this] val rest: Iterator[T] + + private[this] var current: Iterator[T] = delegate + + override def hasNext = (current.hasNext) || (current == delegate && rest.hasNext) + + override def next = { + if (!current.hasNext) current = rest + current.next + } + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + |