diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-09-23 16:08:22 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-09-23 16:08:22 +0000 |
commit | 6e710c26ea5b660b168d9ccdae4d6670f731ad6a (patch) | |
tree | ad6af5b9764ddcd8f9aa56d559d06cc498978aab | |
parent | d7739fc01492e674a2806e08558a8112a885ee1b (diff) | |
download | scala-6e710c26ea5b660b168d9ccdae4d6670f731ad6a.tar.gz scala-6e710c26ea5b660b168d9ccdae4d6670f731ad6a.tar.bz2 scala-6e710c26ea5b660b168d9ccdae4d6670f731ad6a.zip |
zippedWithIndex and zippedAll added to ParItera...
zippedWithIndex and zippedAll added to ParIterable. ZippedAll view
reimplemented. No review
7 files changed, 117 insertions, 17 deletions
diff --git a/src/library/scala/collection/IterableViewLike.scala b/src/library/scala/collection/IterableViewLike.scala index 77d3ac770b..9f77c6965b 100644 --- a/src/library/scala/collection/IterableViewLike.scala +++ b/src/library/scala/collection/IterableViewLike.scala @@ -95,6 +95,9 @@ extends Iterable[A] with IterableLike[A, This] with TraversableView[A, Coll] wit override def zipAll[B, A1 >: A, That](that: Iterable[B], thisElem: A1, thatElem: B)(implicit bf: CanBuildFrom[This, (A1, B), That]): That = newZippedAll(that, thisElem, thatElem).asInstanceOf[That] + /** Boilerplate method, to override in each subclass + * This method could be eliminated if Scala had virtual classes + */ protected def newZipped[B](that: Iterable[B]): Transformed[(A, B)] = new Zipped[B] { val other = that } @@ -103,10 +106,6 @@ extends Iterable[A] with IterableLike[A, This] with TraversableView[A, Coll] wit val thisElem = _thisElem val thatElem = _thatElem } - - /** Boilerplate method, to override in each subclass - * This method could be eliminated if Scala had virtual classes - */ protected override def newForced[B](xs: => Seq[B]): Transformed[B] = new Forced[B] { val forced = xs } protected override def newAppended[B >: A](that: Traversable[B]): Transformed[B] = new Appended[B] { val rest = that } protected override def newMapped[B](f: A => B): Transformed[B] = new Mapped[B] { val mapping = f } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 0d6e9147aa..05978d2f87 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -610,6 +610,14 @@ self => executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result }); } else super.zip(that)(bf) + override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip new immutable.ParRange(0, size, 1, false) + + override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { + val pbf = bf.asParallel + val thatseq = that.asParSeq + executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result }); + } else super.zipAll(that, thisElem, thatElem)(bf) + override def view = new ParIterableView[T, Repr, Sequential] { protected lazy val underlying = self.repr def seq = self.seq.view @@ -942,7 +950,7 @@ self => } } - protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: PreciseSplitter[S]) + protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf(self.repr)) @@ -956,6 +964,28 @@ self => override def merge(that: Zip[U, S, That]) = result = result combine that.result } + protected[this] class ZipAll[U >: T, S, That] + (len: Int, thiselem: U, thatelem: S, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S]) + extends Transformer[Combiner[(U, S), That], ZipAll[U, S, That]] { + var result: Result = null + def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf(self.repr)) + protected[this] def newSubtask(p: ParIterableIterator[T]) = unsupported + override def split = if (pit.remaining <= len) { + val pits = pit.split + val sizes = pits.map(_.remaining) + val opits = othpit.psplit(sizes: _*) + ((pits zip opits) zip sizes) map { t => new ZipAll(t._2, thiselem, thatelem, pbf, t._1._1, t._1._2) } + } else { + val opits = othpit.psplit(pit.remaining) + val diff = len - pit.remaining + Seq( + new ZipAll(pit.remaining, thiselem, thatelem, pbf, pit, opits(0)), // nothing wrong will happen with the cast below - elem T is never accessed + new ZipAll(diff, thiselem, thatelem, pbf, immutable.repetition(thiselem, diff).parallelIterator.asInstanceOf[ParIterableIterator[T]], opits(1)) + ) + } + override def merge(that: ZipAll[U, S, That]) = result = result combine that.result + } + protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Unit, CopyToArray[U, This]] { var result: Unit = () diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 90accfdb4a..c7737a7fab 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -85,11 +85,11 @@ self => } // only use if other is a ParSeq, otherwise force - // trait ZippedAll[U >: T, S] extends Transformed[(U, S)] { - // def otherPar: ParSeq[S] = other.asParSeq - // def parallelIterator: ParIterableIterator[(T, S)] = self.parallelIterator zipAllParSeq otherPar.parallelIterator - // def seq = - // } + trait ZippedAll[U >: T, S] extends super.ZippedAll[U, S] with Transformed[(U, S)] { + def otherPar: ParSeq[S] = other.asParSeq + def parallelIterator: ParIterableIterator[(U, S)] = self.parallelIterator.zipAllParSeq(otherPar.parallelIterator, thisElem, thatElem) + def seq = (self.seq.zipAll(other, thisElem, thatElem)).asInstanceOf[IterableView[(U, S), CollSeq]] + } protected[this] def thisParSeq: ParSeq[T] = mutable.ParArray.fromTraversables(this.iterator) @@ -119,6 +119,8 @@ self => override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That] override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That = newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] + override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That = + newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That] override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf => executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result }) @@ -139,6 +141,11 @@ self => protected override def newFlatMapped[S](f: T => Traversable[S]) = unsupported protected override def newFiltered(p: T => Boolean) = unsupported protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that } + protected override def newZippedAll[U >: T, S](that: Iterable[S], _thisElem: U, _thatElem: S): Transformed[(U, S)] = new ZippedAll[U, S] { + val other = that + val thisElem = _thisElem + val thatElem = _thatElem + } /* argument sequence dependent ctors */ @@ -155,6 +162,10 @@ self => if (that.isParSeq) newZipped[S](that) else newZipped[S](mutable.ParArray.fromTraversables(that)) } + protected def newZippedAllTryParSeq[S, U >: T](that: Iterable[S], thisElem: U, thatElem: S): Transformed[(U, S)] = { + if (that.isParSeq) newZippedAll(that, thisElem, thatElem) + else newZippedAll(mutable.ParArray.fromTraversables(that), thisElem, thatElem) + } /* tasks */ diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 9ea647fd9f..cc68191f9d 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -424,7 +424,7 @@ self => override def merge(that: Updated[U, That]) = result = result combine that.result } - protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) + protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala index 809897d7ed..2116d4f4b4 100644 --- a/src/library/scala/collection/parallel/ParSeqViewLike.scala +++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala @@ -75,11 +75,12 @@ self => override def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]] } - // TODO from - trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] { - def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]] + trait ZippedAll[U >: T, S] extends super[SeqViewLike].ZippedAll[U, S] with super[ParIterableViewLike].ZippedAll[U, S] with Transformed[(U, S)] { + override def parallelIterator: ParSeqIterator[(U, S)] = self.parallelIterator.zipAllParSeq(otherPar.parallelIterator, thisElem, thatElem) + override def seq = (self.seq.zipAll(other, thisElem, thatElem)).asInstanceOf[SeqView[(U, S), CollSeq]] } + // TODO from trait Reversed extends super.Reversed with Transformed[T] { def seq = self.seq.reverse } @@ -107,9 +108,13 @@ self => } protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that } + protected override def newZippedAll[U >: T, S](that: Iterable[S], _thisElem: U, _thatElem: S): Transformed[(U, S)] = new ZippedAll[U, S] { + val other = that + val thisElem = _thisElem + val thatElem = _thatElem + } // TODO from here - protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem } protected override def newReversed: Transformed[T] = new Reversed { } protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced } protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem } diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 2652fbfb53..56eaf5a451 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -8,6 +8,8 @@ import scala.collection.generic.DelegatedSignalling import scala.collection.generic.CanCombineFrom import scala.collection.mutable.Builder import scala.collection.Iterator.empty +import scala.collection.parallel.immutable.repetition + trait RemainsIterator[+T] extends Iterator[T] { @@ -219,14 +221,22 @@ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { } } - def zip2combiner[U >: T, S, That](otherpit: Iterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = { - cb.sizeHint(remaining) + def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = { + cb.sizeHint(remaining min otherpit.remaining) while (hasNext && otherpit.hasNext) { cb += ((next, otherpit.next)) } cb } + def zipAll2combiner[U >: T, S, That](that: RemainsIterator[S], thiselem: U, thatelem: S, cb: Combiner[(U, S), That]): Combiner[(U, S), That] = { + cb.sizeHint(remaining max that.remaining) + while (this.hasNext && that.hasNext) cb += ((this.next, that.next)) + while (this.hasNext) cb += ((this.next, thatelem)) + while (that.hasNext) cb += ((thiselem, that.next)) + cb + } + } @@ -414,6 +424,27 @@ self => def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that) + class ZippedAll[U >: T, S](protected val that: ParSeqIterator[S], protected val thiselem: U, protected val thatelem: S) + extends ParIterableIterator[(U, S)] { + var signalDelegate = self.signalDelegate + def hasNext = self.hasNext || that.hasNext + def next = if (self.hasNext) { + if (that.hasNext) (self.next, that.next) + else (self.next, thatelem) + } else (thiselem, that.next); + def remaining = self.remaining max that.remaining + def split: Seq[ParIterableIterator[(U, S)]] = { + val selfrem = self.remaining + val thatrem = that.remaining + val thisit = if (selfrem < thatrem) self.appendParIterable[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self + val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that + val zipped = thisit zipParSeq thatit + zipped.split + } + } + + def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem) + } @@ -495,6 +526,28 @@ self => override def zipParSeq[S](that: ParSeqIterator[S]) = new Zipped(that) + class ZippedAll[U >: T, S](ti: ParSeqIterator[S], thise: U, thate: S) extends super.ZippedAll[U, S](ti, thise, thate) with ParSeqIterator[(U, S)] { + private def patchem = { + val selfrem = self.remaining + val thatrem = that.remaining + val thisit = if (selfrem < thatrem) self.appendParSeq[U, ParSeqIterator[U]](repetition[U](thiselem, thatrem - selfrem).parallelIterator) else self + val thatit = if (selfrem > thatrem) that.appendParSeq(repetition(thatelem, selfrem - thatrem).parallelIterator) else that + (thisit, thatit) + } + override def split: Seq[ParSeqIterator[(U, S)]] = { + val (thisit, thatit) = patchem + val zipped = thisit zipParSeq thatit + zipped.split + } + def psplit(sizes: Int*): Seq[ParSeqIterator[(U, S)]] = { + val (thisit, thatit) = patchem + val zipped = thisit zipParSeq thatit + zipped.psplit(sizes: _*) + } + } + + override def zipAllParSeq[S, U >: T, R >: S](that: ParSeqIterator[S], thisElem: U, thatElem: R) = new ZippedAll[U, R](that, thisElem, thatElem) + } diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala index 6049eaee15..d529d223c9 100644 --- a/src/library/scala/collection/parallel/immutable/package.scala +++ b/src/library/scala/collection/parallel/immutable/package.scala @@ -12,6 +12,8 @@ package scala.collection.parallel package object immutable { + def repetition[T](elem: T, len: Int) = new Repetition(elem, len) + /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method. * * @tparam T type of the elements |