diff options
Diffstat (limited to 'src/library/scala/collection/parallel/ParSeqLike.scala')
-rw-r--r-- | src/library/scala/collection/parallel/ParSeqLike.scala | 166 |
1 files changed, 102 insertions, 64 deletions
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 189db237b9..3081acdc18 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -6,12 +6,14 @@ ** |/ ** \* */ - package scala.collection.parallel import scala.collection.Parallel import scala.collection.SeqLike +import scala.collection.GenSeqLike +import scala.collection.GenSeq +import scala.collection.GenIterable import scala.collection.generic.DefaultSignalling import scala.collection.generic.AtomicIndexFlag import scala.collection.generic.CanBuildFrom @@ -46,12 +48,12 @@ import scala.collection.generic.VolatileAbort * @since 2.9 */ trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, Sequential]] -extends scala.collection.SeqLike[T, Repr] +extends scala.collection.GenSeqLike[T, Repr] with ParIterableLike[T, Repr, Sequential] { self => import tasksupport._ - type SuperParIterator = ParIterableIterator[T] + type SuperParIterator = IterableSplitter[T] /** An iterator that can be split into arbitrary subsets of iterators. * The self-type requirement ensures that the signal context passing behaviour gets mixed in @@ -60,14 +62,14 @@ self => * '''Note:''' In concrete collection classes, collection implementers might want to override the iterator * `reverse2builder` method to ensure higher efficiency. */ - trait ParIterator extends ParSeqIterator[T] with super.ParIterator { + trait ParIterator extends SeqSplitter[T] with super.ParIterator { me: SignalContextPassingIterator[ParIterator] => def split: Seq[ParIterator] def psplit(sizes: Int*): Seq[ParIterator] } /** A stackable modification that ensures signal contexts get passed along the iterators. - * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into + * A self-type requirement in `ParIterator` ensures that this trait gets mixed into * concrete iterators. */ trait SignalContextPassingIterator[+IterRepr <: ParIterator] @@ -87,9 +89,9 @@ self => * * @return an iterator that can be split into subsets of precise size */ - def parallelIterator: ParSeqIterator[T] + protected[parallel] def splitter: SeqSplitter[T] - override def iterator: PreciseSplitter[T] = parallelIterator + override def iterator: PreciseSplitter[T] = splitter override def size = length @@ -139,14 +141,14 @@ self => * @return the length of the longest segment of elements starting at `from` and * satisfying the predicate */ - override def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else { + def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else { val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new SegmentLength(p, 0, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))._1 + executeAndWaitResult(new SegmentLength(p, 0, splitter.psplit(realfrom, length - realfrom)(1) assign ctx))._1 } - override def prefixLength(p: T => Boolean) = segmentLength(p, 0) + def prefixLength(p: T => Boolean) = segmentLength(p, 0) /** Finds the first element satisfying some predicate. * @@ -159,20 +161,20 @@ self => * @return the index `>= from` of the first element of this $coll that satisfies the predicate `p`, * or `-1`, if none exists */ - override def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else { + def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else { val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new IndexWhere(p, realfrom, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx)) + executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplit(realfrom, length - realfrom)(1) assign ctx)) } - override def indexWhere(p: T => Boolean): Int = indexWhere(p, 0) + def indexWhere(p: T => Boolean): Int = indexWhere(p, 0) - override def findIndexOf(p: T => Boolean): Int = indexWhere(p, 0) + def findIndexOf(p: T => Boolean): Int = indexWhere(p, 0) - override def indexOf[U >: T](elem: U): Int = indexOf(elem, 0) + def indexOf[U >: T](elem: U): Int = indexOf(elem, 0) - override def indexOf[U >: T](elem: U, from: Int): Int = indexWhere(elem ==, from) + def indexOf[U >: T](elem: U, from: Int): Int = indexWhere(elem ==, from) /** Finds the last element satisfying some predicate. * @@ -185,22 +187,22 @@ self => * @return the index `<= end` of the first element of this $coll that satisfies the predicate `p`, * or `-1`, if none exists */ - override def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else { + def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else { val until = if (end >= length) length else end + 1 val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MinValue) - executeAndWaitResult(new LastIndexWhere(p, 0, parallelIterator.psplit(until, length - until)(0) assign ctx)) + executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplit(until, length - until)(0) assign ctx)) } - override def reverse: Repr = { - executeAndWaitResult(new Reverse(() => newCombiner, parallelIterator) mapResult { _.result }) + def reverse: Repr = { + executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result }) } - override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new ReverseMap[S, That](f, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.reverseMap(f)(bf) + def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result }) + } otherwise seq.reverseMap(f)(bf2seq(bf)) - override def startsWith[S](that: Seq[S]): Boolean = startsWith(that, 0) + def startsWith[S](that: GenSeq[S]): Boolean = startsWith(that, 0) /** Tests whether this $coll contains the given sequence at a given index. * @@ -211,20 +213,20 @@ self => * @param offset the starting offset for the search * @return `true` if there is a sequence `that` starting at `offset` in this sequence, `false` otherwise */ - override def startsWith[S](that: Seq[S], offset: Int): Boolean = that ifParSeq { pthat => + def startsWith[S](that: GenSeq[S], offset: Int): Boolean = that ifParSeq { pthat => if (offset < 0 || offset >= length) offset == length && pthat.length == 0 else if (pthat.length == 0) true else if (pthat.length > length - offset) false else { val ctx = new DefaultSignalling with VolatileAbort - executeAndWaitResult(new SameElements(parallelIterator.psplit(offset, pthat.length)(1) assign ctx, pthat.parallelIterator)) + executeAndWaitResult(new SameElements(splitter.psplit(offset, pthat.length)(1) assign ctx, pthat.splitter)) } - } otherwise super.startsWith(that, offset) + } otherwise seq.startsWith(that, offset) - override def sameElements[U >: T](that: Iterable[U]): Boolean = that ifParSeq { pthat => + override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat => val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new SameElements(parallelIterator assign ctx, pthat.parallelIterator)) - } otherwise super.sameElements(that) + length == pthat.length && executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter)) + } otherwise seq.sameElements(that) /** Tests whether this $coll ends with the given parallel sequence * @@ -234,65 +236,65 @@ self => * @param that the sequence to test * @return `true` if this $coll has `that` as a suffix, `false` otherwise */ - override def endsWith[S](that: Seq[S]): Boolean = that ifParSeq { pthat => + def endsWith[S](that: GenSeq[S]): Boolean = that ifParSeq { pthat => if (that.length == 0) true else if (that.length > length) false else { val ctx = new DefaultSignalling with VolatileAbort val tlen = that.length - executeAndWaitResult(new SameElements(parallelIterator.psplit(length - tlen, tlen)(1) assign ctx, pthat.parallelIterator)) + executeAndWaitResult(new SameElements(splitter.psplit(length - tlen, tlen)(1) assign ctx, pthat.splitter)) } - } otherwise super.endsWith(that) + } otherwise seq.endsWith(that) - override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { + def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { val realreplaced = replaced min (length - from) if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) { val that = patch.asParSeq val pbf = bf.asParallel - val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced) + val pits = splitter.psplit(from, replaced, length - from - realreplaced) val copystart = new Copy[U, That](() => pbf(repr), pits(0)) val copymiddle = wrap { - val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) + val tsk = new that.Copy[U, That](() => pbf(repr), that.splitter) tasksupport.executeAndWaitResult(tsk) } val copyend = new Copy[U, That](() => pbf(repr), pits(2)) executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result }) - } else patch_sequential(from, patch, replaced) + } else patch_sequential(from, patch.seq, replaced) } private def patch_sequential[U >: T, That](fromarg: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { val from = 0 max fromarg val b = bf(repr) val repl = (r min (length - from)) max 0 - val pits = parallelIterator.psplit(from, repl, length - from - repl) + val pits = splitter.psplit(from, repl, length - from - repl) b ++= pits(0) - b ++= patch.iterator + b ++= patch b ++= pits(2) b.result } - override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new Updated(index, elem, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.updated(index, elem) + def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result }) + } otherwise seq.updated(index, elem)(bf2seq(bf)) - override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { + def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { patch(0, mutable.ParArray(elem), 0) } - override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { + def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { patch(length, mutable.ParArray(elem), 0) } - override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) { + def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) { patch(length, new immutable.Repetition(elem, len - length), 0) } else patch(length, Nil, 0); - override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { + override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { val pbf = bf.asParallel val thatseq = that.asParSeq - executeAndWaitResult(new Zip(length min thatseq.length, pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result }); + executeAndWaitResult(new Zip(length min thatseq.length, pbf, splitter, thatseq.splitter) mapResult { _.result }); } else super.zip(that)(bf) /** Tests whether every element of this $coll relates to the @@ -307,10 +309,46 @@ self => * `p(x, y)` is `true` for all corresponding elements `x` of this $coll * and `y` of `that`, otherwise `false` */ - override def corresponds[S](that: Seq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat => + def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat => val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new Corresponds(p, parallelIterator assign ctx, pthat.parallelIterator)) - } otherwise super.corresponds(that)(p) + length == pthat.length && executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter)) + } otherwise seq.corresponds(that)(p) + + def diff[U >: T](that: GenSeq[U]): Repr = sequentially { + _ diff that + } + + /** Computes the multiset intersection between this $coll and another sequence. + * $mayNotTerminateInf + * + * @param that the sequence of elements to intersect with. + * @tparam B the element type of the returned $coll. + * @tparam That $thatinfo + * @param bf $bfinfo + * @return a new collection of type `That` which contains all elements of this $coll + * which also appear in `that`. + * If an element value `x` appears + * ''n'' times in `that`, then the first ''n'' occurrences of `x` will be retained + * in the result, but any following occurrences will be omitted. + * @usecase def intersect(that: Seq[A]): $Coll[A] + * @return a new $coll which contains all elements of this $coll + * which also appear in `that`. + * If an element value `x` appears + * ''n'' times in `that`, then the first ''n'' occurrences of `x` will be retained + * in the result, but any following occurrences will be omitted. + */ + def intersect[U >: T](that: GenSeq[U]) = sequentially { + _ intersect that + } + + /** Builds a new $coll from this $coll without any duplicate elements. + * $willNotTerminateInf + * + * @return A new $coll which contains the first occurrence of every element of this $coll. + */ + def distinct: Repr = sequentially { + _.distinct + } override def toString = seq.mkString(stringPrefix + "(", ", ", ")") @@ -318,25 +356,25 @@ self => override def view = new ParSeqView[T, Repr, Sequential] { protected lazy val underlying = self.repr + protected[this] def viewIdentifier = "" + protected[this] def viewIdString = "" def length = self.length def apply(idx: Int) = self(idx) override def seq = self.seq.view - def parallelIterator = self.parallelIterator + def splitter = self.splitter } - override def view(from: Int, until: Int) = view.slice(from, until) - /* tasks */ - protected[this] def down(p: ParIterableIterator[_]) = p.asInstanceOf[ParSeqIterator[T]] + protected[this] def down(p: IterableSplitter[_]) = p.asInstanceOf[SeqSplitter[T]] protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] { - protected[this] val pit: ParSeqIterator[T] + protected[this] val pit: SeqSplitter[T] } protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp] - protected[this] class SegmentLength(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T]) + protected[this] class SegmentLength(pred: T => Boolean, from: Int, protected[this] val pit: SeqSplitter[T]) extends Accessor[(Int, Boolean), SegmentLength] { @volatile var result: (Int, Boolean) = null def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) { @@ -354,7 +392,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T]) + protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: SeqSplitter[T]) extends Accessor[Int, IndexWhere] { @volatile var result: Int = -1 def leaf(prev: Option[Int]) = if (from < pit.indexFlag) { @@ -375,7 +413,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: ParSeqIterator[T]) + protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: SeqSplitter[T]) extends Accessor[Int, LastIndexWhere] { @volatile var result: Int = -1 def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) { @@ -396,7 +434,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: ParSeqIterator[T]) + protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[U, This], Reverse[U, This]] { @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf())) @@ -404,7 +442,7 @@ self => override def merge(that: Reverse[U, This]) = result = that.result combine result } - protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParSeqIterator[T]) + protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[S, That], ReverseMap[S, That]] { @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf(self.repr)) @@ -412,7 +450,7 @@ self => override def merge(that: ReverseMap[S, That]) = result = that.result combine result } - protected[this] class SameElements[U >: T](protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[U]) + protected[this] class SameElements[U >: T](protected[this] val pit: SeqSplitter[T], val otherpit: PreciseSplitter[U]) extends Accessor[Boolean, SameElements[U]] { @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { @@ -429,7 +467,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: ParSeqIterator[T]) + protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[U, That], Updated[U, That]] { @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf(self.repr)) @@ -442,7 +480,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S]) + protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { @volatile var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) @@ -460,7 +498,7 @@ self => override def merge(that: Zip[U, S, That]) = result = result combine that.result } - protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) + protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: SeqSplitter[T], val otherpit: PreciseSplitter[S]) extends Accessor[Boolean, Corresponds[S]] { @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { |