diff options
Diffstat (limited to 'src/library/scala/collection/parallel/ParSeqLike.scala')
-rw-r--r-- | src/library/scala/collection/parallel/ParSeqLike.scala | 73 |
1 files changed, 41 insertions, 32 deletions
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 6a5ee5c69b..9f28a286ca 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -44,8 +44,7 @@ trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, S extends scala.collection.GenSeqLike[T, Repr] with ParIterableLike[T, Repr, Sequential] { self => - import tasksupport._ - + type SuperParIterator = IterableSplitter[T] /** A more refined version of the iterator found in the `ParallelIterable` trait, @@ -107,7 +106,7 @@ self => val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1 + tasksupport.executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1 } /** Finds the first element satisfying some predicate. @@ -125,7 +124,7 @@ self => val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx)) + tasksupport.executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx)) } /** Finds the last element satisfying some predicate. @@ -143,18 +142,20 @@ self => val until = if (end >= length) length else end + 1 val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MinValue) - executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx)) + tasksupport.executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx)) } def reverse: Repr = { - executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.resultWithTaskSupport }) } def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) - } else seq.reverseMap(f)(bf2seq(bf)) + tasksupport.executeAndWaitResult( + new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.resultWithTaskSupport } + ) + } else setTaskSupport(seq.reverseMap(f)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result }) } otherwise seq.reverseMap(f)(bf2seq(bf))*/ /** Tests whether this $coll contains the given sequence at a given index. @@ -172,19 +173,21 @@ self => else if (pthat.length > length - offset) false else { val ctx = new DefaultSignalling with VolatileAbort - executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter)) + tasksupport.executeAndWaitResult( + new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter) + ) } } otherwise seq.startsWith(that, offset) override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat => val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter)) + length == pthat.length && tasksupport.executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter)) } otherwise seq.sameElements(that) /** Tests whether this $coll ends with the given parallel sequence. - * + * * $abortsignalling - * + * * @tparam S the type of the elements of `that` sequence * @param that the sequence to test * @return `true` if this $coll has `that` as a suffix, `false` otherwise @@ -195,25 +198,24 @@ self => else { val ctx = new DefaultSignalling with VolatileAbort val tlen = that.length - executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter)) + tasksupport.executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter)) } } otherwise seq.endsWith(that) def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { val realreplaced = replaced min (length - from) - if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) { + if (patch.isParSeq && bf(repr).isCombiner && (size - realreplaced + patch.size) > MIN_FOR_COPY) { val that = patch.asParSeq - val pbf = bf.asParallel val pits = splitter.psplitWithSignalling(from, replaced, length - from - realreplaced) - val cfactory = combinerFactory(() => pbf(repr)) + val cfactory = combinerFactory(() => bf(repr).asCombiner) val copystart = new Copy[U, That](cfactory, pits(0)) val copymiddle = wrap { val tsk = new that.Copy[U, That](cfactory, that.splitter) tasksupport.executeAndWaitResult(tsk) } val copyend = new Copy[U, That](cfactory, pits(2)) - executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { - _.result + tasksupport.executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { + _.resultWithTaskSupport }) } else patch_sequential(from, patch.seq, replaced) } @@ -226,14 +228,18 @@ self => b ++= pits(0) b ++= patch b ++= pits(2) - b.result + setTaskSupport(b.result, tasksupport) } def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new Updated(index, elem, () => bf(repr).asCombiner, splitter) mapResult { _.result }) - } else seq.updated(index, elem)(bf2seq(bf)) + tasksupport.executeAndWaitResult( + new Updated(index, elem, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { + _.resultWithTaskSupport + } + ) + } else setTaskSupport(seq.updated(index, elem)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result }) } otherwise seq.updated(index, elem)(bf2seq(bf))*/ def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { @@ -248,10 +254,13 @@ self => patch(length, new immutable.Repetition(elem, len - length), 0) } else patch(length, Nil, 0); - override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { - val pbf = bf.asParallel + override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) { val thatseq = that.asParSeq - executeAndWaitResult(new Zip(length min thatseq.length, pbf, splitter, thatseq.splitter) mapResult { _.result }); + tasksupport.executeAndWaitResult( + new Zip(length min thatseq.length, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult { + _.resultWithTaskSupport + } + ); } else super.zip(that)(bf) /** Tests whether every element of this $coll relates to the @@ -268,7 +277,7 @@ self => */ def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat => val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter)) + length == pthat.length && tasksupport.executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter)) } otherwise seq.corresponds(that)(p) def diff[U >: T](that: GenSeq[U]): Repr = sequentially { @@ -424,7 +433,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: () => Combiner[U, That], protected[this] val pit: SeqSplitter[T]) + protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CombinerFactory[U, That], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[U, That], Updated[U, That]] { @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf()) @@ -437,10 +446,10 @@ self => override def requiresStrictSplitters = true } - protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S]) + protected[this] class Zip[U >: T, S, That](len: Int, cf: CombinerFactory[(U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { @volatile var result: Result = null - def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) + def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, cf()) protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val fp = len / 2 @@ -448,8 +457,8 @@ self => val pits = pit.psplitWithSignalling(fp, sp) val opits = otherpit.psplitWithSignalling(fp, sp) Seq( - new Zip(fp, pbf, pits(0), opits(0)), - new Zip(sp, pbf, pits(1), opits(1)) + new Zip(fp, cf, pits(0), opits(0)), + new Zip(sp, cf, pits(1), opits(1)) ) } override def merge(that: Zip[U, S, That]) = result = result combine that.result |