summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-09-23 16:08:22 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-09-23 16:08:22 +0000
commit6e710c26ea5b660b168d9ccdae4d6670f731ad6a (patch)
treead6af5b9764ddcd8f9aa56d559d06cc498978aab
parentd7739fc01492e674a2806e08558a8112a885ee1b (diff)
downloadscala-6e710c26ea5b660b168d9ccdae4d6670f731ad6a.tar.gz
scala-6e710c26ea5b660b168d9ccdae4d6670f731ad6a.tar.bz2
scala-6e710c26ea5b660b168d9ccdae4d6670f731ad6a.zip
zippedWithIndex and zippedAll added to ParItera...
zippedWithIndex and zippedAll added to ParIterable. ZippedAll view reimplemented. No review
-rw-r--r--src/library/scala/collection/IterableViewLike.scala7
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala32
-rw-r--r--src/library/scala/collection/parallel/ParIterableViewLike.scala21
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala2
-rw-r--r--src/library/scala/collection/parallel/ParSeqViewLike.scala13
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala57
-rw-r--r--src/library/scala/collection/parallel/immutable/package.scala2
7 files changed, 117 insertions, 17 deletions
diff --git a/src/library/scala/collection/IterableViewLike.scala b/src/library/scala/collection/IterableViewLike.scala
index 77d3ac770b..9f77c6965b 100644
--- a/src/library/scala/collection/IterableViewLike.scala
+++ b/src/library/scala/collection/IterableViewLike.scala
@@ -95,6 +95,9 @@ extends Iterable[A] with IterableLike[A, This] with TraversableView[A, Coll] wit
override def zipAll[B, A1 >: A, That](that: Iterable[B], thisElem: A1, thatElem: B)(implicit bf: CanBuildFrom[This, (A1, B), That]): That =
newZippedAll(that, thisElem, thatElem).asInstanceOf[That]
+ /** Boilerplate method, to override in each subclass
+ * This method could be eliminated if Scala had virtual classes
+ */
protected def newZipped[B](that: Iterable[B]): Transformed[(A, B)] = new Zipped[B] {
val other = that
}
@@ -103,10 +106,6 @@ extends Iterable[A] with IterableLike[A, This] with TraversableView[A, Coll] wit
val thisElem = _thisElem
val thatElem = _thatElem
}
-
- /** Boilerplate method, to override in each subclass
- * This method could be eliminated if Scala had virtual classes
- */
protected override def newForced[B](xs: => Seq[B]): Transformed[B] = new Forced[B] { val forced = xs }
protected override def newAppended[B >: A](that: Traversable[B]): Transformed[B] = new Appended[B] { val rest = that }
protected override def newMapped[B](f: A => B): Transformed[B] = new Mapped[B] { val mapping = f }
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 0d6e9147aa..05978d2f87 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -610,6 +610,14 @@ self =>
executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result });
} else super.zip(that)(bf)
+ override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip new immutable.ParRange(0, size, 1, false)
+
+ override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
+ val pbf = bf.asParallel
+ val thatseq = that.asParSeq
+ executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result });
+ } else super.zipAll(that, thisElem, thatElem)(bf)
+
override def view = new ParIterableView[T, Repr, Sequential] {
protected lazy val underlying = self.repr
def seq = self.seq.view
@@ -942,7 +950,7 @@ self =>
}
}
- protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: PreciseSplitter[S])
+ protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf(self.repr))
@@ -956,6 +964,28 @@ self =>
override def merge(that: Zip[U, S, That]) = result = result combine that.result
}
+ protected[this] class ZipAll[U >: T, S, That]
+ (len: Int, thiselem: U, thatelem: S, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S])
+ extends Transformer[Combiner[(U, S), That], ZipAll[U, S, That]] {
+ var result: Result = null
+ def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf(self.repr))
+ protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported
+ override def split = if (pit.remaining <= len) {
+ val pits = pit.split
+ val sizes = pits.map(_.remaining)
+ val opits = othpit.psplit(sizes: _*)
+ ((pits zip opits) zip sizes) map { t => new ZipAll(t._2, thiselem, thatelem, pbf, t._1._1, t._1._2) }
+ } else {
+ val opits = othpit.psplit(pit.remaining)
+ val diff = len - pit.remaining
+ Seq(
+ new ZipAll(pit.remaining, thiselem, thatelem, pbf, pit, opits(0)), // nothing wrong will happen with the cast below - elem T is never accessed
+ new ZipAll(diff, thiselem, thatelem, pbf, immutable.repetition(thiselem, diff).parallelIterator.asInstanceOf[ParIterableIterator[T]], opits(1))
+ )
+ }
+ override def merge(that: ZipAll[U, S, That]) = result = result combine that.result
+ }
+
protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T])
extends Accessor[Unit, CopyToArray[U, This]] {
var result: Unit = ()
diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala
index 90accfdb4a..c7737a7fab 100644
--- a/src/library/scala/collection/parallel/ParIterableViewLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala
@@ -85,11 +85,11 @@ self =>
}
// only use if other is a ParSeq, otherwise force
- // trait ZippedAll[U >: T, S] extends Transformed[(U, S)] {
- // def otherPar: ParSeq[S] = other.asParSeq
- // def parallelIterator: ParIterableIterator[(T, S)] = self.parallelIterator zipAllParSeq otherPar.parallelIterator
- // def seq =
- // }
+ trait ZippedAll[U >: T, S] extends super.ZippedAll[U, S] with Transformed[(U, S)] {
+ def otherPar: ParSeq[S] = other.asParSeq
+ def parallelIterator: ParIterableIterator[(U, S)] = self.parallelIterator.zipAllParSeq(otherPar.parallelIterator, thisElem, thatElem)
+ def seq = (self.seq.zipAll(other, thisElem, thatElem)).asInstanceOf[IterableView[(U, S), CollSeq]]
+ }
protected[this] def thisParSeq: ParSeq[T] = mutable.ParArray.fromTraversables(this.iterator)
@@ -119,6 +119,8 @@ self =>
override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+ override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That =
+ newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That]
override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
@@ -139,6 +141,11 @@ self =>
protected override def newFlatMapped[S](f: T => Traversable[S]) = unsupported
protected override def newFiltered(p: T => Boolean) = unsupported
protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
+ protected override def newZippedAll[U >: T, S](that: Iterable[S], _thisElem: U, _thatElem: S): Transformed[(U, S)] = new ZippedAll[U, S] {
+ val other = that
+ val thisElem = _thisElem
+ val thatElem = _thatElem
+ }
/* argument sequence dependent ctors */
@@ -155,6 +162,10 @@ self =>
if (that.isParSeq) newZipped[S](that)
else newZipped[S](mutable.ParArray.fromTraversables(that))
}
+ protected def newZippedAllTryParSeq[S, U >: T](that: Iterable[S], thisElem: U, thatElem: S): Transformed[(U, S)] = {
+ if (that.isParSeq) newZippedAll(that, thisElem, thatElem)
+ else newZippedAll(mutable.ParArray.fromTraversables(that), thisElem, thatElem)
+ }
/* tasks */
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 9ea647fd9f..cc68191f9d 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -424,7 +424,7 @@ self =>
override def merge(that: Updated[U, That]) = result = result combine that.result
}
- protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S])
+ protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr))
diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala
index 809897d7ed..2116d4f4b4 100644
--- a/src/library/scala/collection/parallel/ParSeqViewLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala
@@ -75,11 +75,12 @@ self =>
override def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]]
}
- // TODO from
- trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] {
- def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]]
+ trait ZippedAll[U >: T, S] extends super[SeqViewLike].ZippedAll[U, S] with super[ParIterableViewLike].ZippedAll[U, S] with Transformed[(U, S)] {
+ override def parallelIterator: ParSeqIterator[(U, S)] = self.parallelIterator.zipAllParSeq(otherPar.parallelIterator, thisElem, thatElem)
+ override def seq = (self.seq.zipAll(other, thisElem, thatElem)).asInstanceOf[SeqView[(U, S), CollSeq]]
}
+ // TODO from
trait Reversed extends super.Reversed with Transformed[T] {
def seq = self.seq.reverse
}
@@ -107,9 +108,13 @@ self =>
}
protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f }
protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
+ protected override def newZippedAll[U >: T, S](that: Iterable[S], _thisElem: U, _thatElem: S): Transformed[(U, S)] = new ZippedAll[U, S] {
+ val other = that
+ val thisElem = _thisElem
+ val thatElem = _thatElem
+ }
// TODO from here
- protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem }
protected override def newReversed: Transformed[T] = new Reversed { }
protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced }
protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem }
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 2652fbfb53..56eaf5a451 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -8,6 +8,8 @@ import scala.collection.generic.DelegatedSignalling
import scala.collection.generic.CanCombineFrom
import scala.collection.mutable.Builder
import scala.collection.Iterator.empty
+import scala.collection.parallel.immutable.repetition
+
trait RemainsIterator[+T] extends Iterator[T] {
@@ -219,14 +221,22 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] {
}
}
- def zip2combiner[U >: T, S, That](otherpit: Iterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
- cb.sizeHint(remaining)
+ def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
+ cb.sizeHint(remaining min otherpit.remaining)
while (hasNext && otherpit.hasNext) {
cb += ((next, otherpit.next))
}
cb
}
+ def zipAll2combiner[U >: T, S, That](that: RemainsIterator[S], thiselem: U, thatelem: S, cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
+ cb.sizeHint(remaining max that.remaining)
+ while (this.hasNext && that.hasNext) cb += ((this.next, that.next))
+ while (this.hasNext) cb += ((this.next, thatelem))
+ while (that.hasNext) cb += ((thiselem, that.next))
+ cb
+ }
+
}
@@ -414,6 +424,27 @@ self =>
def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
+ class ZippedAll[U >: T, S](protected val that: ParSeqIterator[S], protected val thiselem: U, protected val thatelem: S)
+ extends ParIterableIterator[(U, S)] {
+ var signalDelegate = self.signalDelegate
+ def hasNext = self.hasNext || that.hasNext
+ def next = if (self.hasNext) {
+ if (that.hasNext) (self.next, that.next)
+ else (self.next, thatelem)
+ } else (thiselem, that.next);
+ def remaining = self.remaining max that.remaining
+ def split: Seq[ParIterableIterator[(U, S)]] = {
+ val selfrem = self.remaining
+ val thatrem = that.remaining
+ val thisit = if (selfrem < thatrem) self.appendParIterable[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self
+ val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that
+ val zipped = thisit zipParSeq thatit
+ zipped.split
+ }
+ }
+
+ def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem)
+
}
@@ -495,6 +526,28 @@ self =>
override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that)
+ class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] {
+ private def patchem = {
+ val selfrem = self.remaining
+ val thatrem = that.remaining
+ val thisit = if (selfrem < thatrem) self.appendParSeq[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self
+ val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that
+ (thisit, thatit)
+ }
+ override def split: Seq[ParSeqIterator[(U, S)]] = {
+ val (thisit, thatit) = patchem
+ val zipped = thisit zipParSeq thatit
+ zipped.split
+ }
+ def psplit(sizes: Int*): Seq[ParSeqIterator[(U, S)]] = {
+ val (thisit, thatit) = patchem
+ val zipped = thisit zipParSeq thatit
+ zipped.psplit(sizes: _*)
+ }
+ }
+
+ override def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem)
+
}
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index 6049eaee15..d529d223c9 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -12,6 +12,8 @@ package scala.collection.parallel
package object immutable {
+ def repetition[T](elem: T, len: Int) = new Repetition(elem, len)
+
/** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
*
* @tparam T type of the elements