summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/ParIterableLike.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/parallel/ParIterableLike.scala')
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala247
1 files changed, 141 insertions, 106 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 7c5a835e56..7e0fa366ab 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -28,7 +28,7 @@ import immutable.HashMapCombiner
import java.util.concurrent.atomic.AtomicBoolean
import annotation.unchecked.uncheckedVariance
-
+import annotation.unchecked.uncheckedStable
/** A template trait for parallel collections of type `ParIterable[T]`.
@@ -58,7 +58,7 @@ import annotation.unchecked.uncheckedVariance
* }}}
*
* which returns an instance of `IterableSplitter[T]`, which is a subtype of `Splitter[T]`.
- * Parallel iterators have a method `remaining` to check the remaining number of elements,
+ * Splitters have a method `remaining` to check the remaining number of elements,
* and method `split` which is defined by splitters. Method `split` divides the splitters
* iterate over into disjunct subsets:
*
@@ -96,7 +96,7 @@ import annotation.unchecked.uncheckedVariance
* The combination of methods `toMap`, `toSeq` or `toSet` along with `par` and `seq` is a flexible
* way to change between different collection types.
*
- * Since this trait extends the `Iterable` trait, methods like `size` must also
+ * Since this trait extends the `GenIterable` trait, methods like `size` must also
* be implemented in concrete collections, while `iterator` forwards to `splitter` by
* default.
*
@@ -116,7 +116,7 @@ import annotation.unchecked.uncheckedVariance
* which do not know the number of elements remaining. To do this, the new collection implementation must override
* `isStrictSplitterCollection` to `false`. This will make some operations unavailable.
*
- * To create a new parallel collection, extend the `ParIterable` trait, and implement `size`, `parallelIterator`,
+ * To create a new parallel collection, extend the `ParIterable` trait, and implement `size`, `splitter`,
* `newCombiner` and `seq`. Having an implicit combiner factory requires extending this trait in addition, as
* well as providing a companion object, as with regular collections.
*
@@ -155,7 +155,23 @@ extends GenIterableLike[T, Repr]
{
self: ParIterableLike[T, Repr, Sequential] =>
- import tasksupport._
+ @transient
+ @volatile
+ private var _tasksupport = defaultTaskSupport
+
+ protected def initTaskSupport() {
+ _tasksupport = defaultTaskSupport
+ }
+
+ def tasksupport = {
+ val ts = _tasksupport
+ if (ts eq null) {
+ _tasksupport = defaultTaskSupport
+ defaultTaskSupport
+ } else ts
+ }
+
+ def tasksupport_=(ts: TaskSupport) = _tasksupport = ts
def seq: Sequential
@@ -164,7 +180,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
def hasDefiniteSize = true
def nonEmpty = size != 0
-
+
/** Creates a new parallel iterator used to traverse the elements of this parallel collection.
* This iterator is more specific than the iterator of the returned by `iterator`, and augmented
* with additional accessor and transformer methods.
@@ -234,7 +250,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
trait SignallingOps[PI <: DelegatedSignalling] {
def assign(cntx: Signalling): PI
}
-
+
/* convenience task operations wrapper */
protected implicit def task2ops[R, Tp](tsk: SSCTask[R, Tp]) = new TaskOps[R, Tp] {
def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) {
@@ -262,7 +278,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
it
}
}
-
+
protected implicit def builder2ops[Elem, To](cb: Builder[Elem, To]) = new BuilderOps[Elem, To] {
def ifIs[Cmb](isbody: Cmb => Unit) = new Otherwise[Cmb] {
def otherwise(notbody: => Unit)(implicit m: ClassManifest[Cmb]) {
@@ -272,12 +288,12 @@ self: ParIterableLike[T, Repr, Sequential] =>
def isCombiner = cb.isInstanceOf[Combiner[_, _]]
def asCombiner = cb.asInstanceOf[Combiner[Elem, To]]
}
-
+
protected[this] def bf2seq[S, That](bf: CanBuildFrom[Repr, S, That]) = new CanBuildFrom[Sequential, S, That] {
def apply(from: Sequential) = bf.apply(from.par.asInstanceOf[Repr]) // !!! we only use this on `this.seq`, and know that `this.seq.par.getClass == this.getClass`
def apply() = bf.apply()
}
-
+
protected[this] def sequentially[S, That <: Parallel](b: Sequential => Parallelizable[S, That]) = b(seq).par.asInstanceOf[Repr]
def mkString(start: String, sep: String, end: String): String = seq.mkString(start, sep, end)
@@ -287,7 +303,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
def mkString: String = seq.mkString("")
override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
-
+
def canEqual(other: Any) = true
/** Reduces the elements of this sequence using the specified associative binary operator.
@@ -306,7 +322,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* if this $coll is empty.
*/
def reduce[U >: T](op: (U, U) => U): U = {
- executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
+ tasksupport.executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
}
/** Optionally reduces the elements of this sequence using the specified associative binary operator.
@@ -324,7 +340,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* the elements if the collection is nonempty, and `None` otherwise.
*/
def reduceOption[U >: T](op: (U, U) => U): Option[U] = if (isEmpty) None else Some(reduce(op))
-
+
/** Folds the elements of this sequence using the specified associative binary operator.
* The order in which the elements are reduced is unspecified and may be nondeterministic.
*
@@ -341,7 +357,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return the result of applying fold operator `op` between all the elements and `z`
*/
def fold[U >: T](z: U)(op: (U, U) => U): U = {
- executeAndWaitResult(new Fold(z, op, splitter))
+ tasksupport.executeAndWaitResult(new Fold(z, op, splitter))
}
/** Aggregates the results of applying an operator to subsequent elements.
@@ -373,13 +389,13 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @param combop an associative operator used to combine results from different partitions
*/
def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
- executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
+ tasksupport.executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}
-
+
def foldLeft[S](z: S)(op: (S, T) => S): S = seq.foldLeft(z)(op)
-
+
def foldRight[S](z: S)(op: (T, S) => S): S = seq.foldRight(z)(op)
-
+
def reduceLeft[U >: T](op: (U, T) => U): U = seq.reduceLeft(op)
def reduceRight[U >: T](op: (T, U) => U): U = seq.reduceRight(op)
@@ -394,27 +410,27 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @param f function applied to each element
*/
def foreach[U](f: T => U) = {
- executeAndWaitResult(new Foreach(f, splitter))
+ tasksupport.executeAndWaitResult(new Foreach(f, splitter))
}
def count(p: T => Boolean): Int = {
- executeAndWaitResult(new Count(p, splitter))
+ tasksupport.executeAndWaitResult(new Count(p, splitter))
}
def sum[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Sum[U](num, splitter))
+ tasksupport.executeAndWaitResult(new Sum[U](num, splitter))
}
def product[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Product[U](num, splitter))
+ tasksupport.executeAndWaitResult(new Product[U](num, splitter))
}
def min[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
+ tasksupport.executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
}
def max[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
+ tasksupport.executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
}
def maxBy[S](f: T => S)(implicit cmp: Ordering[S]): T = {
@@ -428,26 +444,26 @@ self: ParIterableLike[T, Repr, Sequential] =>
reduce((x, y) => if (cmp.lteq(f(x), f(y))) x else y)
}
-
+
def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
- } else seq.map(f)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport })
+ } else setTaskSupport(seq.map(f)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.map(f)(bf2seq(bf))*/
def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
- } else seq.collect(pf)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport })
+ } else setTaskSupport(seq.collect(pf)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result })
} otherwise seq.collect(pf)(bf2seq(bf))*/
def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
- } else seq.flatMap(f)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport })
+ } else setTaskSupport(seq.flatMap(f)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.flatMap(f)(bf2seq(bf))*/
/** Tests whether a predicate holds for all elements of this $coll.
@@ -458,7 +474,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return true if `p` holds for all elements, false otherwise
*/
def forall(pred: T => Boolean): Boolean = {
- executeAndWaitResult(new Forall(pred, splitter assign new DefaultSignalling with VolatileAbort))
+ tasksupport.executeAndWaitResult(new Forall(pred, splitter assign new DefaultSignalling with VolatileAbort))
}
/** Tests whether a predicate holds for some element of this $coll.
@@ -469,7 +485,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return true if `p` holds for some element, false otherwise
*/
def exists(pred: T => Boolean): Boolean = {
- executeAndWaitResult(new Exists(pred, splitter assign new DefaultSignalling with VolatileAbort))
+ tasksupport.executeAndWaitResult(new Exists(pred, splitter assign new DefaultSignalling with VolatileAbort))
}
/** Finds some element in the collection for which the predicate holds, if such
@@ -484,13 +500,13 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return an option value with the element if such an element exists, or `None` otherwise
*/
def find(pred: T => Boolean): Option[T] = {
- executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort))
+ tasksupport.executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort))
}
-
+
/** Creates a combiner factory. Each combiner factory instance is used
* once per invocation of a parallel transformer method for a single
* collection.
- *
+ *
* The default combiner factory creates a new combiner every time it
* is requested, unless the combiner is thread-safe as indicated by its
* `canBeShared` method. In this case, the method returns a factory which
@@ -500,6 +516,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
*/
protected[this] def combinerFactory = {
val combiner = newCombiner
+ combiner.combinerTaskSupport = tasksupport
if (combiner.canBeShared) new CombinerFactory[T, Repr] {
val shared = combiner
def apply() = shared
@@ -509,9 +526,10 @@ self: ParIterableLike[T, Repr, Sequential] =>
def doesShareCombiners = false
}
}
-
+
protected[this] def combinerFactory[S, That](cbf: () => Combiner[S, That]) = {
val combiner = cbf()
+ combiner.combinerTaskSupport = tasksupport
if (combiner.canBeShared) new CombinerFactory[S, That] {
val shared = combiner
def apply() = shared
@@ -521,13 +539,13 @@ self: ParIterableLike[T, Repr, Sequential] =>
def doesShareCombiners = false
}
}
-
+
def filter(pred: T => Boolean): Repr = {
- executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
def filterNot(pred: T => Boolean): Repr = {
- executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
def ++[U >: T, That](that: GenTraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = {
@@ -542,43 +560,47 @@ self: ParIterableLike[T, Repr, Sequential] =>
tasksupport.executeAndWaitResult(othtask)
}
val task = (copythis parallel copythat) { _ combine _ } mapResult {
- _.result
+ _.resultWithTaskSupport
}
- executeAndWaitResult(task)
- } else if (bf.isParallel) {
+ tasksupport.executeAndWaitResult(task)
+ } else if (bf(repr).isCombiner) {
// println("case parallel builder, `that` not parallel")
- val pbf = bf.asParallel
- val copythis = new Copy(combinerFactory(() => pbf(repr)), splitter)
+ val copythis = new Copy(combinerFactory(() => bf(repr).asCombiner), splitter)
val copythat = wrap {
- val cb = pbf(repr)
+ val cb = bf(repr).asCombiner
for (elem <- that.seq) cb += elem
cb
}
- executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result })
+ tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.resultWithTaskSupport })
} else {
// println("case not a parallel builder")
val b = bf(repr)
this.splitter.copy2builder[U, That, Builder[U, That]](b)
for (elem <- that.seq) b += elem
- b.result
+ setTaskSupport(b.result, tasksupport)
}
}
def partition(pred: T => Boolean): (Repr, Repr) = {
- executeAndWaitResult(new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) })
+ tasksupport.executeAndWaitResult(
+ new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult {
+ p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
+ }
+ )
}
def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
- executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult {
+ val r = tasksupport.executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult {
rcb => rcb.groupByKey(() => combinerFactory())
})
+ setTaskSupport(r, tasksupport)
}
def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
- else executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
- _.result
+ else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
+ _.resultWithTaskSupport
})
}
@@ -591,13 +613,13 @@ self: ParIterableLike[T, Repr, Sequential] =>
cb += it.next
left -= 1
}
- cb.result
+ cb.resultWithTaskSupport
}
def drop(n: Int): Repr = {
val actualn = if (size > n) n else size
if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
- else executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.result })
+ else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
private def drop_sequential(n: Int) = {
@@ -605,14 +627,14 @@ self: ParIterableLike[T, Repr, Sequential] =>
val cb = newCombiner
cb.sizeHint(size - n)
while (it.hasNext) cb += it.next
- cb.result
+ cb.resultWithTaskSupport
}
override def slice(unc_from: Int, unc_until: Int): Repr = {
val from = unc_from min size max 0
val until = unc_until min size max from
if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
- else executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.result })
+ else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
private def slice_sequential(from: Int, until: Int): Repr = {
@@ -623,11 +645,15 @@ self: ParIterableLike[T, Repr, Sequential] =>
cb += it.next
left -= 1
}
- cb.result
+ cb.resultWithTaskSupport
}
def splitAt(n: Int): (Repr, Repr) = {
- executeAndWaitResult(new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) })
+ tasksupport.executeAndWaitResult(
+ new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult {
+ p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
+ }
+ )
}
/** Computes a prefix scan of the elements of the collection.
@@ -645,20 +671,19 @@ self: ParIterableLike[T, Repr, Sequential] =>
*
* @return a new $coll containing the prefix scan of the elements in this $coll
*/
- def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf.isParallel) {
- val cbf = bf.asParallel
- if (parallelismLevel > 1) {
- if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
- tree => executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => cbf(repr))) mapResult {
- cb => cb.result
+ def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) {
+ if (tasksupport.parallelismLevel > 1) {
+ if (size > 0) tasksupport.executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
+ tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => bf(repr).asCombiner)) mapResult {
+ cb => cb.resultWithTaskSupport
})
- }) else (cbf(self.repr) += z).result
- } else seq.scan(z)(op)(bf2seq(bf))
- } else seq.scan(z)(op)(bf2seq(bf))
+ }) else setTaskSupport((bf(repr) += z).result, tasksupport)
+ } else setTaskSupport(seq.scan(z)(op)(bf2seq(bf)), tasksupport)
+ } else setTaskSupport(seq.scan(z)(op)(bf2seq(bf)), tasksupport)
- def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = seq.scanLeft(z)(op)(bf2seq(bf))
+ def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = setTaskSupport(seq.scanLeft(z)(op)(bf2seq(bf)), tasksupport)
- def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = seq.scanRight(z)(op)(bf2seq(bf))
+ def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = setTaskSupport(seq.scanRight(z)(op)(bf2seq(bf)), tasksupport)
/** Takes the longest prefix of elements that satisfy the predicate.
*
@@ -672,11 +697,15 @@ self: ParIterableLike[T, Repr, Sequential] =>
val cbf = combinerFactory
if (cbf.doesShareCombiners) {
val parseqspan = toSeq.takeWhile(pred)
- executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult {
+ _.resultWithTaskSupport
+ })
} else {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult { _._1.result })
+ tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult {
+ _._1.resultWithTaskSupport
+ })
}
}
@@ -693,17 +722,17 @@ self: ParIterableLike[T, Repr, Sequential] =>
val cbf = combinerFactory
if (cbf.doesShareCombiners) {
val (xs, ys) = toSeq.span(pred)
- val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.result }
- val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.result }
+ val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.resultWithTaskSupport }
+ val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.resultWithTaskSupport }
val copyall = (copyxs parallel copyys) {
(xr, yr) => (xr, yr)
}
- executeAndWaitResult(copyall)
+ tasksupport.executeAndWaitResult(copyall)
} else {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
- p => (p._1.result, p._2.result)
+ tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
+ p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
})
}
}
@@ -721,7 +750,11 @@ self: ParIterableLike[T, Repr, Sequential] =>
def dropWhile(pred: T => Boolean): Repr = {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { _._2.result })
+ tasksupport.executeAndWaitResult(
+ new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
+ _._2.resultWithTaskSupport
+ }
+ )
}
def copyToArray[U >: T](xs: Array[U]) = copyToArray(xs, 0)
@@ -729,31 +762,33 @@ self: ParIterableLike[T, Repr, Sequential] =>
def copyToArray[U >: T](xs: Array[U], start: Int) = copyToArray(xs, start, xs.length - start)
def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) {
- executeAndWaitResult(new CopyToArray(start, len, xs, splitter))
+ tasksupport.executeAndWaitResult(new CopyToArray(start, len, xs, splitter))
}
def sameElements[U >: T](that: GenIterable[U]) = seq.sameElements(that)
- def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
- val pbf = bf.asParallel
+ def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
val thatseq = that.asParSeq
- executeAndWaitResult(new Zip(combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result });
- } else seq.zip(that)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult { _.resultWithTaskSupport });
+ } else setTaskSupport(seq.zip(that)(bf2seq(bf)), tasksupport)
def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false)
- def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
- val pbf = bf.asParallel
+ def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
val thatseq = that.asParSeq
- executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result });
- } else seq.zipAll(that, thisElem, thatElem)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(
+ new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult {
+ _.resultWithTaskSupport
+ }
+ );
+ } else setTaskSupport(seq.zipAll(that, thisElem, thatElem)(bf2seq(bf)), tasksupport)
protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = {
- executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.result });
+ tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.resultWithTaskSupport });
}
protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = {
- executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.resultWithTaskSupport })
}
def view = new ParIterableView[T, Repr, Sequential] {
@@ -810,7 +845,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends StrictSplitterCheckTask[R, Tp] {
protected[this] val pit: IterableSplitter[T]
protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp]
- def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel)
+ def shouldSplitFurther = pit.shouldSplitFurther(self.repr, tasksupport.parallelismLevel)
def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")"
@@ -844,8 +879,8 @@ self: ParIterableLike[T, Repr, Sequential] =>
(f: First, s: Second)
extends Composite[FR, SR, R, First, Second](f, s) {
def leaf(prevr: Option[R]) = {
- executeAndWaitResult(ft)
- executeAndWaitResult(st)
+ tasksupport.executeAndWaitResult(ft)
+ tasksupport.executeAndWaitResult(st)
mergeSubtasks
}
}
@@ -855,8 +890,8 @@ self: ParIterableLike[T, Repr, Sequential] =>
(f: First, s: Second)
extends Composite[FR, SR, R, First, Second](f, s) {
def leaf(prevr: Option[R]) = {
- val ftfuture = execute(ft)
- executeAndWaitResult(st)
+ val ftfuture = tasksupport.execute(ft)
+ tasksupport.executeAndWaitResult(st)
ftfuture()
mergeSubtasks
}
@@ -867,7 +902,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
@volatile var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
- val initialResult = executeAndWaitResult(inner)
+ val initialResult = tasksupport.executeAndWaitResult(inner)
result = map(initialResult)
}
private[parallel] override def signalAbort() {
@@ -875,9 +910,9 @@ self: ParIterableLike[T, Repr, Sequential] =>
}
override def requiresStrictSplitters = inner.requiresStrictSplitters
}
-
+
protected trait Transformer[R, Tp] extends Accessor[R, Tp]
-
+
protected[this] class Foreach[S](op: T => S, protected[this] val pit: IterableSplitter[T])
extends Accessor[Unit, Foreach[S]] {
@volatile var result: Unit = ()
@@ -894,7 +929,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
override def merge(that: Count) = result = result + that.result
// override def toString = "CountTask(" + pittxt + ")"
}
-
+
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: IterableSplitter[T])
extends Accessor[Option[U], Reduce[U]] {
@volatile var result: Option[U] = None
@@ -1303,7 +1338,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
} else result = that.result
override def requiresStrictSplitters = true
}
-
+
protected[this] class FromScanTree[U >: T, That]
(tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CombinerFactory[U, That])
extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] {
@@ -1339,7 +1374,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
/* scan tree */
- protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1
+ protected[this] def scanBlockSize = (thresholdFromSize(size, tasksupport.parallelismLevel) / 2) max 1
protected[this] trait ScanTree[U >: T] {
def beginsAt: Int
@@ -1379,13 +1414,13 @@ self: ParIterableLike[T, Repr, Sequential] =>
def rightmost = this
def print(depth: Int) = println((" " * depth) + this)
}
-
+
/* alias methods */
-
+
def /:[S](z: S)(op: (S, T) => S): S = foldLeft(z)(op);
-
+
def :\[S](z: S)(op: (T, S) => S): S = foldRight(z)(op);
-
+
/* debug information */
private[parallel] def debugInformation = "Parallel collection: " + this.getClass