summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/ParSeqLike.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/parallel/ParSeqLike.scala')
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala73
1 files changed, 41 insertions, 32 deletions
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 6a5ee5c69b..9f28a286ca 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -44,8 +44,7 @@ trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, S
extends scala.collection.GenSeqLike[T, Repr]
with ParIterableLike[T, Repr, Sequential] {
self =>
- import tasksupport._
-
+
type SuperParIterator = IterableSplitter[T]
/** A more refined version of the iterator found in the `ParallelIterable` trait,
@@ -107,7 +106,7 @@ self =>
val realfrom = if (from < 0) 0 else from
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1
+ tasksupport.executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1
}
/** Finds the first element satisfying some predicate.
@@ -125,7 +124,7 @@ self =>
val realfrom = if (from < 0) 0 else from
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))
+ tasksupport.executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))
}
/** Finds the last element satisfying some predicate.
@@ -143,18 +142,20 @@ self =>
val until = if (end >= length) length else end + 1
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MinValue)
- executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx))
+ tasksupport.executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx))
}
def reverse: Repr = {
- executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.resultWithTaskSupport })
}
def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result })
- } else seq.reverseMap(f)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(
+ new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.resultWithTaskSupport }
+ )
+ } else setTaskSupport(seq.reverseMap(f)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.reverseMap(f)(bf2seq(bf))*/
/** Tests whether this $coll contains the given sequence at a given index.
@@ -172,19 +173,21 @@ self =>
else if (pthat.length > length - offset) false
else {
val ctx = new DefaultSignalling with VolatileAbort
- executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter))
+ tasksupport.executeAndWaitResult(
+ new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter)
+ )
}
} otherwise seq.startsWith(that, offset)
override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat =>
val ctx = new DefaultSignalling with VolatileAbort
- length == pthat.length && executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter))
+ length == pthat.length && tasksupport.executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter))
} otherwise seq.sameElements(that)
/** Tests whether this $coll ends with the given parallel sequence.
- *
+ *
* $abortsignalling
- *
+ *
* @tparam S the type of the elements of `that` sequence
* @param that the sequence to test
* @return `true` if this $coll has `that` as a suffix, `false` otherwise
@@ -195,25 +198,24 @@ self =>
else {
val ctx = new DefaultSignalling with VolatileAbort
val tlen = that.length
- executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter))
+ tasksupport.executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter))
}
} otherwise seq.endsWith(that)
def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val realreplaced = replaced min (length - from)
- if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
+ if (patch.isParSeq && bf(repr).isCombiner && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
val that = patch.asParSeq
- val pbf = bf.asParallel
val pits = splitter.psplitWithSignalling(from, replaced, length - from - realreplaced)
- val cfactory = combinerFactory(() => pbf(repr))
+ val cfactory = combinerFactory(() => bf(repr).asCombiner)
val copystart = new Copy[U, That](cfactory, pits(0))
val copymiddle = wrap {
val tsk = new that.Copy[U, That](cfactory, that.splitter)
tasksupport.executeAndWaitResult(tsk)
}
val copyend = new Copy[U, That](cfactory, pits(2))
- executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
- _.result
+ tasksupport.executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
+ _.resultWithTaskSupport
})
} else patch_sequential(from, patch.seq, replaced)
}
@@ -226,14 +228,18 @@ self =>
b ++= pits(0)
b ++= patch
b ++= pits(2)
- b.result
+ setTaskSupport(b.result, tasksupport)
}
def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new Updated(index, elem, () => bf(repr).asCombiner, splitter) mapResult { _.result })
- } else seq.updated(index, elem)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(
+ new Updated(index, elem, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult {
+ _.resultWithTaskSupport
+ }
+ )
+ } else setTaskSupport(seq.updated(index, elem)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result })
} otherwise seq.updated(index, elem)(bf2seq(bf))*/
def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
@@ -248,10 +254,13 @@ self =>
patch(length, new immutable.Repetition(elem, len - length), 0)
} else patch(length, Nil, 0);
- override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
- val pbf = bf.asParallel
+ override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
val thatseq = that.asParSeq
- executeAndWaitResult(new Zip(length min thatseq.length, pbf, splitter, thatseq.splitter) mapResult { _.result });
+ tasksupport.executeAndWaitResult(
+ new Zip(length min thatseq.length, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult {
+ _.resultWithTaskSupport
+ }
+ );
} else super.zip(that)(bf)
/** Tests whether every element of this $coll relates to the
@@ -268,7 +277,7 @@ self =>
*/
def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat =>
val ctx = new DefaultSignalling with VolatileAbort
- length == pthat.length && executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter))
+ length == pthat.length && tasksupport.executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter))
} otherwise seq.corresponds(that)(p)
def diff[U >: T](that: GenSeq[U]): Repr = sequentially {
@@ -424,7 +433,7 @@ self =>
override def requiresStrictSplitters = true
}
- protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: () => Combiner[U, That], protected[this] val pit: SeqSplitter[T])
+ protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CombinerFactory[U, That], protected[this] val pit: SeqSplitter[T])
extends Transformer[Combiner[U, That], Updated[U, That]] {
@volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf())
@@ -437,10 +446,10 @@ self =>
override def requiresStrictSplitters = true
}
- protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
+ protected[this] class Zip[U >: T, S, That](len: Int, cf: CombinerFactory[(U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
@volatile var result: Result = null
- def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr))
+ def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, cf())
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val fp = len / 2
@@ -448,8 +457,8 @@ self =>
val pits = pit.psplitWithSignalling(fp, sp)
val opits = otherpit.psplitWithSignalling(fp, sp)
Seq(
- new Zip(fp, pbf, pits(0), opits(0)),
- new Zip(sp, pbf, pits(1), opits(1))
+ new Zip(fp, cf, pits(0), opits(0)),
+ new Zip(sp, cf, pits(1), opits(1))
)
}
override def merge(that: Zip[U, S, That]) = result = result combine that.result