summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-09-23 13:46:26 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-09-23 13:46:26 +0000
commita3aa8993d2ed9314206c1fbd2d5b56879f91bb0f (patch)
tree6ba553f9842c45a40a9131a051ef9dd48739b510
parentd7420203456f4369a490310170a2597cb4c32fe6 (diff)
downloadscala-a3aa8993d2ed9314206c1fbd2d5b56879f91bb0f.tar.gz
scala-a3aa8993d2ed9314206c1fbd2d5b56879f91bb0f.tar.bz2
scala-a3aa8993d2ed9314206c1fbd2d5b56879f91bb0f.zip
Adds a zip for ParIterables + a new Zipped view...
Adds a zip for ParIterables + a new Zipped view for ParSeqView and ParIterableView + a bench test. No review
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala22
-rw-r--r--src/library/scala/collection/parallel/ParIterableView.scala23
-rw-r--r--src/library/scala/collection/parallel/ParIterableViewLike.scala94
-rw-r--r--src/library/scala/collection/parallel/ParSeqViewLike.scala83
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala47
-rw-r--r--test/benchmarks/src/scala/collection/parallel/Benchmarking.scala1
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala4
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala1
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala19
9 files changed, 220 insertions, 74 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index e3fde884e2..0d6e9147aa 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -604,6 +604,12 @@ self =>
executeAndWait(new CopyToArray(start, len, xs, parallelIterator))
}
+ override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
+ val pbf = bf.asParallel
+ val thatseq = that.asParSeq
+ executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result });
+ } else super.zip(that)(bf)
+
override def view = new ParIterableView[T, Repr, Sequential] {
protected lazy val underlying = self.repr
def seq = self.seq.view
@@ -936,6 +942,20 @@ self =>
}
}
+ protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: PreciseSplitter[S])
+ extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
+ var result: Result = null
+ def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf(self.repr))
+ protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
+ override def split = {
+ val pits = pit.split
+ val sizes = pits.map(_.remaining)
+ val opits = othpit.psplit(sizes: _*)
+ (pits zip opits) map { p => new Zip(pbf, p._1, p._2) }
+ }
+ override def merge(that: Zip[U, S, That]) = result = result combine that.result
+ }
+
protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T])
extends Accessor[Unit, CopyToArray[U, This]] {
var result: Unit = ()
@@ -997,6 +1017,7 @@ self =>
}
}
+ @deprecated
protected[this] class PartialScan[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T])
extends Accessor[ScanTree[U], PartialScan[U, A]] {
var result: ScanTree[U] = null
@@ -1043,6 +1064,7 @@ self =>
}
}
+ @deprecated
protected[this] class ApplyScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], array: Array[A])
extends super.Task[Unit, ApplyScanTree[U, A]] {
var result = ();
diff --git a/src/library/scala/collection/parallel/ParIterableView.scala b/src/library/scala/collection/parallel/ParIterableView.scala
index dd703b5c8b..5efe87c9ee 100644
--- a/src/library/scala/collection/parallel/ParIterableView.scala
+++ b/src/library/scala/collection/parallel/ParIterableView.scala
@@ -6,6 +6,7 @@ package scala.collection.parallel
import scala.collection.Parallel
import scala.collection.TraversableViewLike
import scala.collection.IterableView
+import scala.collection.generic.CanCombineFrom
@@ -23,6 +24,28 @@ extends ParIterableViewLike[T, Coll, CollSeq, ParIterableView[T, Coll, CollSeq],
+object ParIterableView {
+ abstract class NoCombiner[T] extends Combiner[T, Nothing] {
+ self: EnvironmentPassingCombiner[T, Nothing] =>
+ def +=(elem: T): this.type = this
+ def iterator: Iterator[T] = Iterator.empty
+ def result() = throw new UnsupportedOperationException("ParIterableView.Combiner.result")
+ def size = throw new UnsupportedOperationException("ParIterableView.Combiner.size")
+ def clear() {}
+ def combine[N <: T, NewTo >: Nothing](other: Combiner[N, NewTo]) =
+ throw new UnsupportedOperationException("ParIterableView.Combiner.result")
+ }
+
+ type Coll = ParIterableView[_, C, _] forSome { type C <: ParIterable[_] }
+
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterableView[T, ParIterable[T], Iterable[T]]] =
+ new CanCombineFrom[Coll, T, ParIterableView[T, ParIterable[T], Iterable[T]]] {
+ def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
+ def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
+ }
+}
+
+
diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala
index 1f7ea9b694..90accfdb4a 100644
--- a/src/library/scala/collection/parallel/ParIterableViewLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala
@@ -8,6 +8,8 @@ import scala.collection.TraversableViewLike
import scala.collection.IterableView
import scala.collection.IterableViewLike
import scala.collection.generic.CanBuildFrom
+import scala.collection.generic.CanCombineFrom
+import scala.collection.parallel.immutable.ParRange
@@ -37,7 +39,7 @@ extends IterableView[T, Coll]
with ParIterable[T]
with ParIterableLike[T, This, ThisSeq]
{
- self =>
+self =>
override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner");
@@ -64,36 +66,104 @@ extends IterableView[T, Coll]
// only use if other is a ParIterable, otherwise force
trait Appended[U >: T] extends super.Appended[U] with Transformed[U] {
- def restAsParIterable: ParIterable[U] = rest.asParIterable
- def parallelIterator: ParIterableIterator[U] = self.parallelIterator.appendIterable[U, ParIterableIterator[U]](restAsParIterable.parallelIterator)
+ def restPar: ParIterable[U] = rest.asParIterable
+ def parallelIterator: ParIterableIterator[U] = self.parallelIterator.appendParIterable[U, ParIterableIterator[U]](restPar.parallelIterator)
def seq = self.seq.++(rest).asInstanceOf[IterableView[U, CollSeq]]
}
trait Forced[S] extends super.Forced[S] with Transformed[S] {
def forcedPar: ParIterable[S] = forced.asParIterable
def parallelIterator: ParIterableIterator[S] = forcedPar.parallelIterator
- // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`,
- // we use it to obtain a view of the correct type - not the most efficient thing
- // in the universe, but without making `newForced` more accessible, or adding
- // a `forced` method to `SeqView`, this is the best we can do
- def seq = self.seq.take(0).++(forced).asInstanceOf[IterableView[S, CollSeq]]
+ def seq = forcedPar.seq.view.asInstanceOf[IterableView[S, CollSeq]]
}
+ // only use if other is a ParSeq, otherwise force
+ trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] {
+ def otherPar: ParSeq[S] = other.asParSeq
+ def parallelIterator: ParIterableIterator[(T, S)] = self.parallelIterator zipParSeq otherPar.parallelIterator
+ def seq = (self.seq zip other).asInstanceOf[IterableView[(T, S), CollSeq]]
+ }
+
+ // only use if other is a ParSeq, otherwise force
+ // trait ZippedAll[U >: T, S] extends Transformed[(U, S)] {
+ // def otherPar: ParSeq[S] = other.asParSeq
+ // def parallelIterator: ParIterableIterator[(T, S)] = self.parallelIterator zipAllParSeq otherPar.parallelIterator
+ // def seq =
+ // }
+
+ protected[this] def thisParSeq: ParSeq[T] = mutable.ParArray.fromTraversables(this.iterator)
+
/* operation overrides */
+ override def take(n: Int): This = newSliced(0, n).asInstanceOf[This]
+ override def drop(n: Int): This = newSliced(n, parallelIterator.remaining).asInstanceOf[This]
+ override def splitAt(n: Int): (This, This) = (take(n), drop(n))
override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This]
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
- override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That]
+ override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppendedTryParIterable(xs.toTraversable).asInstanceOf[That]
+
+ override def filter(p: T => Boolean): This = newForced(thisParSeq.filter(p)).asInstanceOf[This]
+ override def filterNot(p: T => Boolean): This = newForced(thisParSeq.filterNot(p)).asInstanceOf[This]
+ override def partition(p: T => Boolean): (This, This) = {
+ val (t, f) = thisParSeq.partition(p)
+ (newForced(t).asInstanceOf[This], newForced(f).asInstanceOf[This])
+ }
+ override def takeWhile(p: T => Boolean): This = newForced(thisParSeq.takeWhile(p)).asInstanceOf[This]
+ override def dropWhile(p: T => Boolean): This = newForced(thisParSeq.dropWhile(p)).asInstanceOf[This]
+ override def span(p: T => Boolean): (This, This) = {
+ val (pref, suff) = thisParSeq.span(p)
+ (newForced(pref).asInstanceOf[This], newForced(suff).asInstanceOf[This])
+ }
+ override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisParSeq.flatMap(f)).asInstanceOf[That]
+
+ override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
+ override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
+ newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+
+ override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
+ executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
+ } otherwise {
+ val b = bf(underlying)
+ b ++= this.iterator
+ b.result
+ }
/* wrapper virtual ctors */
protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u }
protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f }
protected override def newForced[S](xs: => Seq[S]): Transformed[S] = new Forced[S] { val forced = xs }
- protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = {
+ protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that }
+ protected override def newDroppedWhile(p: T => Boolean) = unsupported
+ protected override def newTakenWhile(p: T => Boolean) = unsupported
+ protected override def newFlatMapped[S](f: T => Traversable[S]) = unsupported
+ protected override def newFiltered(p: T => Boolean) = unsupported
+ protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
+
+ /* argument sequence dependent ctors */
+
+ protected def newForcedTryParIterable[S](xs: => Seq[S]): Transformed[S] = {
+ if (xs.isParIterable) newForced[S](xs)
+ else newForced(mutable.ParArray.fromTraversables(xs))
+ }
+ protected def newAppendedTryParIterable[U >: T](that: Traversable[U]): Transformed[U] = {
// we only append if `that` is a parallel iterable, i.e. it has a splitter
- if (that.isParIterable) new Appended[U] { val rest = that }
- else newForced(mutable.ParArray.fromTraversables(this, that))
+ if (that.isParIterable) newAppended(that)
+ else newAppended(mutable.ParArray.fromTraversables(that))
+ }
+ protected def newZippedTryParSeq[S](that: Iterable[S]): Transformed[(T, S)] = {
+ if (that.isParSeq) newZipped[S](that)
+ else newZipped[S](mutable.ParArray.fromTraversables(that))
+ }
+
+ /* tasks */
+
+ protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], protected[this] val pit: ParIterableIterator[T])
+ extends Transformer[Combiner[U, That], Force[U, That]] {
+ var result: Combiner[U, That] = null
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying)))
+ protected[this] def newSubtask(p: ParIterableIterator[T]) = new Force(cbf, p)
+ override def merge(that: Force[U, That]) = result = result combine that.result
}
}
diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala
index 1aac72767a..809897d7ed 100644
--- a/src/library/scala/collection/parallel/ParSeqViewLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala
@@ -9,7 +9,7 @@ import scala.collection.SeqViewLike
import scala.collection.Parallel
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
-
+import scala.collection.parallel.immutable.ParRange
@@ -38,7 +38,7 @@ extends SeqView[T, Coll]
with ParSeq[T]
with ParSeqLike[T, This, ThisSeq]
{
- self =>
+self =>
type SCPI = SignalContextPassingIterator[ParIterator]
@@ -59,41 +59,23 @@ extends SeqView[T, Coll]
}
trait Appended[U >: T] extends super[SeqViewLike].Appended[U] with super[ParIterableViewLike].Appended[U] with Transformed[U] {
- def restAsParSeq: ParSeq[U] = rest.asInstanceOf[ParSeq[U]]
- override def parallelIterator = self.parallelIterator.appendSeq[U, ParSeqIterator[U]](restAsParSeq.parallelIterator)
+ override def restPar: ParSeq[U] = rest.asParSeq
+ override def parallelIterator = self.parallelIterator.appendParSeq[U, ParSeqIterator[U]](restPar.parallelIterator)
override def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]]
}
trait Forced[S] extends super[SeqViewLike].Forced[S] with super[ParIterableViewLike].Forced[S] with Transformed[S] {
override def forcedPar: ParSeq[S] = forced.asParSeq
override def parallelIterator: ParSeqIterator[S] = forcedPar.parallelIterator
- // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`,
- // we use it to obtain a view of the correct type - not the most efficient thing
- // in the universe, but without making `newForced` more accessible, or adding
- // a `forced` method to `SeqView`, this is the best we can do
- override def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]]
- }
-
- trait Filtered extends super.Filtered with Transformed[T] {
- def seq = self.seq filter pred
- }
-
- trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] {
- def seq = self.seq.flatMap(mapping).asInstanceOf[SeqView[S, CollSeq]]
- }
-
- trait TakenWhile extends super.TakenWhile with Transformed[T] {
- def seq = self.seq takeWhile pred
- }
-
- trait DroppedWhile extends super.DroppedWhile with Transformed[T] {
- def seq = self.seq dropWhile pred
+ override def seq = forcedPar.seq.view.asInstanceOf[SeqView[S, CollSeq]]
}
- trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] {
- def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]]
+ trait Zipped[S] extends super[SeqViewLike].Zipped[S] with super[ParIterableViewLike].Zipped[S] with Transformed[(T, S)] {
+ override def parallelIterator = self.parallelIterator zipParSeq otherPar.parallelIterator
+ override def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]]
}
+ // TODO from
trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] {
def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]]
}
@@ -109,40 +91,52 @@ extends SeqView[T, Coll]
trait Prepended[U >: T] extends super.Prepended[U] with Transformed[U] {
def seq = (fst +: self.seq).asInstanceOf[SeqView[U, CollSeq]]
}
+ // TODO until
+
+ /* wrapper virtual ctors */
- protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p }
protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u }
protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = {
// we only append if `that` is a parallel sequence, i.e. it has a precise splitter
if (that.isParSeq) new Appended[U] { val rest = that }
else newForced(mutable.ParArray.fromTraversables(this, that))
}
- protected override def newForced[S](xs: => Seq[S]): Transformed[S] = new Forced[S] { val forced = xs }
-
+ protected override def newForced[S](xs: => Seq[S]): Transformed[S] = {
+ if (xs.isParSeq) new Forced[S] { val forced = xs }
+ else new Forced[S] { val forced = mutable.ParArray.fromTraversables(xs) }
+ }
protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f }
- protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f }
- protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p }
- protected override def newTakenWhile(p: T => Boolean): Transformed[T] = new TakenWhile { val pred = p }
protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
+
+ // TODO from here
protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem }
protected override def newReversed: Transformed[T] = new Reversed { }
protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced }
protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem }
+ // TODO until here
+
+ /* operation overrides */
- override def filter(p: T => Boolean): This = newFiltered(p).asInstanceOf[This]
- override def filterNot(p: T => Boolean): This = newFiltered(!p(_)).asInstanceOf[This]
- override def partition(p: T => Boolean): (This, This) = (filter(p), filterNot(p))
override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This]
override def take(n: Int): This = newSliced(0, n).asInstanceOf[This]
override def drop(n: Int): This = newSliced(n, length).asInstanceOf[This]
override def splitAt(n: Int): (This, This) = (take(n), drop(n))
override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That]
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
- override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[This, S, That]): That = newFlatMapped(f).asInstanceOf[That]
+ override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
+ override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
+ newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+
+ override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
+ executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
+ } otherwise {
+ val b = bf(underlying)
+ b ++= this.iterator
+ b.result
+ }
+
+ // TODO from here
override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[This, S, That]): That = filter(pf.isDefinedAt).map(pf)(bf)
- override def takeWhile(p: T => Boolean): This = newTakenWhile(p).asInstanceOf[This]
- override def dropWhile(p: T => Boolean): This = newDroppedWhile(p).asInstanceOf[This]
- override def span(p: T => Boolean): (This, This) = (takeWhile(p), dropWhile(p))
override def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanLeft(z)(op)).asInstanceOf[That]
override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanRight(z)(op)).asInstanceOf[That]
override def groupBy[K](f: T => K): collection.immutable.Map[K, This] = thisSeq.groupBy(f).mapValues(xs => newForced(xs).asInstanceOf[This])
@@ -161,14 +155,6 @@ extends SeqView[T, Coll]
override def intersect[U >: T](that: Seq[U]): This = newForced(thisSeq intersect that).asInstanceOf[This]
override def sorted[U >: T](implicit ord: Ordering[U]): This = newForced(thisSeq sorted ord).asInstanceOf[This]
- override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
- executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
- } otherwise {
- val b = bf(underlying)
- b ++= this.iterator
- b.result
- }
-
/* tasks */
protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], protected[this] val pit: ParSeqIterator[T])
@@ -178,6 +164,7 @@ extends SeqView[T, Coll]
protected[this] def newSubtask(p: SuperParIterator) = new Force(cbf, down(p))
override def merge(that: Force[U, That]) = result = result combine that.result
}
+ // TODO until here
}
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 43acf3b41e..2652fbfb53 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -219,6 +219,14 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
}
}
+ def zip2combiner[U >: T, S, That](otherpit: Iterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
+ cb.sizeHint(remaining)
+ while (hasNext && otherpit.hasNext) {
+ cb += ((next, otherpit.next))
+ }
+ cb
+ }
+
}
@@ -307,17 +315,6 @@ trait AugmentedSeqIterator[+T] extends AugmentedIterableIterator[T] {
cb
}
- /** Iterator `otherpit` must have equal or more elements.
- */
- def zip2combiner[U >: T, S, That](otherpit: Iterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
- //val cb = cbf(repr)
- cb.sizeHint(remaining)
- while (hasNext) {
- cb += ((next, otherpit.next))
- }
- cb
- }
-
}
@@ -400,7 +397,22 @@ self =>
def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split
}
- def appendIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that)
+ def appendParIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that)
+
+ class Zipped[S](protected val that: ParSeqIterator[S]) extends ParIterableIterator[(T, S)] {
+ var signalDelegate = self.signalDelegate
+ def hasNext = self.hasNext && that.hasNext
+ def next = (self.next, that.next)
+ def remaining = self.remaining min that.remaining
+ def split: Seq[ParIterableIterator[(T, S)]] = {
+ val selfs = self.split
+ val sizes = selfs.map(_.remaining)
+ val thats = that.psplit(sizes: _*)
+ (selfs zip thats) map { p => p._1 zipParSeq p._2 }
+ }
+ }
+
+ def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
}
@@ -469,12 +481,19 @@ self =>
val thats = that.psplit(thatsizes: _*)
// appended last in self with first in rest if necessary
- if (appendMiddle) selfs.init ++ Seq(selfs.last.appendSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail
+ if (appendMiddle) selfs.init ++ Seq(selfs.last.appendParSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail
else selfs ++ thats
} else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*)
}
- def appendSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that)
+ def appendParSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that)
+
+ class Zipped[S](ti: ParSeqIterator[S]) extends super.Zipped[S](ti) with ParSeqIterator[(T, S)] {
+ override def split: Seq[ParSeqIterator[(T, S)]] = super.split.asInstanceOf[Seq[ParSeqIterator[(T, S)]]]
+ def psplit(szs: Int*) = (self.psplit(szs: _*) zip that.psplit(szs: _*)) map { p => p._1 zipParSeq p._2 }
+ }
+
+ override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
}
diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
index f733f4154b..954ee727a0 100644
--- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
+++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
@@ -76,6 +76,7 @@ trait BenchmarkRegister {
register(parallel_view.DummyViewBenchList.IterationS)
register(parallel_view.DummyViewBenchList.IterationM)
register(parallel_view.DummyViewBenchList.IterationA)
+ register(parallel_view.DummyViewBenchList.IterationZ)
// parallel ranges
register(parallel_range.RangeBenches.Reduce)
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala
index ae93c7adf4..84cc25610b 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala
@@ -56,6 +56,10 @@ object DummyOperators extends Operators[Dummy] {
val eachFun: Dummy => Unit = (d: Dummy) => {
d.dummy
}
+ override val eachPairFun: ((Dummy, Dummy)) => Unit = p => {
+ p._1.dummy
+ p._2.dummy
+ }
override def sequence(sz: Int): Seq[Dummy] = {
val pa = new collection.parallel.mutable.ParArray[Dummy](sz)
for (i <- 0 until sz) pa(i) = new Dummy(i)
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala
index 4bd693a933..c86fe66e63 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala
@@ -15,6 +15,7 @@ trait Operators[T] {
def heavymapper: T => T
def taker: T => Boolean
def eachFun: T => Unit
+ def eachPairFun: ((T, T)) => Unit = error("unsupported")
def sequence(sz: Int): Seq[T] = error("unsupported")
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
index fd4e87ab4c..8516b77191 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
@@ -200,6 +200,25 @@ self =>
def companion = IterationA
}
+ object IterationZ extends SeqBenchCompanion {
+ override def defaultSize = 50000
+ def benchName = "iter-z"
+ def apply(sz: Int, p: Int, w: String) = new IterationZ(sz, p, w)
+ }
+
+ class IterationZ(val size: Int, val parallelism: Int, val runWhat: String)
+ extends SeqBench with SeqViewBench {
+ val zipped = operators.sequence(size)
+ def comparisonMap = collection.Map("seqview" -> runseqview _)
+ def runseq = {
+ val withzip = this.seqcoll.zip(zipped)
+ withzip.foreach(operators.eachPairFun)
+ }
+ def runpar = this.parcoll.zip(zipped).foreach(operators.eachPairFun)
+ def runseqview = this.seqview.zip(zipped).foreach(operators.eachPairFun)
+ def companion = IterationZ
+ }
+
object Reduce extends SeqBenchCompanion {
override def defaultSize = 50000
def benchName = "reduce";