summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-02-23 16:34:34 +0100
committerAleksandar Prokopec <axel22@gmail.com>2012-02-23 16:34:34 +0100
commit1852a7ddf7f8c5fb4a85e64b73123d333e698932 (patch)
treef9b5f405c56544b37f51c9f99b8dab1882fe2d58 /src
parent4a984f82d5bfca05123c53bd385d0299818f8a75 (diff)
downloadscala-1852a7ddf7f8c5fb4a85e64b73123d333e698932.tar.gz
scala-1852a7ddf7f8c5fb4a85e64b73123d333e698932.tar.bz2
scala-1852a7ddf7f8c5fb4a85e64b73123d333e698932.zip
Add tasksupport as a configurable field in parallel collections.
This required a bit of refactoring in the tasks objects and implementations of various operations. Combiners now hold a reference to a tasksupport object and pass it on to their result if `resultWithTaskSupport` is called. Additionally, several bugs that have to do with CanBuildFrom and combiner resolution have been fixed.
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/collection/parallel/Combiner.scala25
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala191
-rw-r--r--src/library/scala/collection/parallel/ParIterableViewLike.scala3
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala69
-rw-r--r--src/library/scala/collection/parallel/ParSeqViewLike.scala3
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala135
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala14
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala10
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala12
-rw-r--r--src/library/scala/collection/parallel/mutable/ParCtrie.scala4
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashMap.scala7
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashSet.scala10
-rw-r--r--src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala12
-rw-r--r--src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala8
-rw-r--r--src/library/scala/collection/parallel/package.scala12
15 files changed, 295 insertions, 220 deletions
diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala
index e304be92ae..69e3271d39 100644
--- a/src/library/scala/collection/parallel/Combiner.scala
+++ b/src/library/scala/collection/parallel/Combiner.scala
@@ -33,8 +33,21 @@ import scala.collection.generic.Sizing
* @since 2.9
*/
trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel {
-//self: EnvironmentPassingCombiner[Elem, To] =>
-
+
+ @transient
+ @volatile
+ var _combinerTaskSupport = defaultTaskSupport
+
+ def combinerTaskSupport = {
+ val cts = _combinerTaskSupport
+ if (cts eq null) {
+ _combinerTaskSupport = defaultTaskSupport
+ defaultTaskSupport
+ } else cts
+ }
+
+ def combinerTaskSupport_=(cts: TaskSupport) = _combinerTaskSupport = cts
+
/** Combines the contents of the receiver builder and the `other` builder,
* producing a new builder containing both their elements.
*
@@ -69,6 +82,14 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel {
*/
def canBeShared: Boolean = false
+ /** Constructs the result and sets the appropriate tasksupport object to the resulting collection
+ * if this is applicable.
+ */
+ def resultWithTaskSupport: To = {
+ val res = result
+ setTaskSupport(res, combinerTaskSupport)
+ }
+
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 7c5a835e56..cffd3bfbcf 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -28,7 +28,7 @@ import immutable.HashMapCombiner
import java.util.concurrent.atomic.AtomicBoolean
import annotation.unchecked.uncheckedVariance
-
+import annotation.unchecked.uncheckedStable
/** A template trait for parallel collections of type `ParIterable[T]`.
@@ -155,7 +155,19 @@ extends GenIterableLike[T, Repr]
{
self: ParIterableLike[T, Repr, Sequential] =>
- import tasksupport._
+ @transient
+ @volatile
+ private var _tasksupport = defaultTaskSupport
+
+ def tasksupport = {
+ val ts = _tasksupport
+ if (ts eq null) {
+ _tasksupport = defaultTaskSupport
+ defaultTaskSupport
+ } else ts
+ }
+
+ def tasksupport_=(ts: TaskSupport) = _tasksupport = ts
def seq: Sequential
@@ -306,7 +318,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* if this $coll is empty.
*/
def reduce[U >: T](op: (U, U) => U): U = {
- executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
+ tasksupport.executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get })
}
/** Optionally reduces the elements of this sequence using the specified associative binary operator.
@@ -341,7 +353,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return the result of applying fold operator `op` between all the elements and `z`
*/
def fold[U >: T](z: U)(op: (U, U) => U): U = {
- executeAndWaitResult(new Fold(z, op, splitter))
+ tasksupport.executeAndWaitResult(new Fold(z, op, splitter))
}
/** Aggregates the results of applying an operator to subsequent elements.
@@ -373,7 +385,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @param combop an associative operator used to combine results from different partitions
*/
def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
- executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
+ tasksupport.executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}
def foldLeft[S](z: S)(op: (S, T) => S): S = seq.foldLeft(z)(op)
@@ -394,27 +406,27 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @param f function applied to each element
*/
def foreach[U](f: T => U) = {
- executeAndWaitResult(new Foreach(f, splitter))
+ tasksupport.executeAndWaitResult(new Foreach(f, splitter))
}
def count(p: T => Boolean): Int = {
- executeAndWaitResult(new Count(p, splitter))
+ tasksupport.executeAndWaitResult(new Count(p, splitter))
}
def sum[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Sum[U](num, splitter))
+ tasksupport.executeAndWaitResult(new Sum[U](num, splitter))
}
def product[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Product[U](num, splitter))
+ tasksupport.executeAndWaitResult(new Product[U](num, splitter))
}
def min[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
+ tasksupport.executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T]
}
def max[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
+ tasksupport.executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T]
}
def maxBy[S](f: T => S)(implicit cmp: Ordering[S]): T = {
@@ -430,24 +442,24 @@ self: ParIterableLike[T, Repr, Sequential] =>
}
def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
- } else seq.map(f)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport })
+ } else setTaskSupport(seq.map(f)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.map(f)(bf2seq(bf))*/
def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
- } else seq.collect(pf)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport })
+ } else setTaskSupport(seq.collect(pf)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result })
} otherwise seq.collect(pf)(bf2seq(bf))*/
def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
- } else seq.flatMap(f)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport })
+ } else setTaskSupport(seq.flatMap(f)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.flatMap(f)(bf2seq(bf))*/
/** Tests whether a predicate holds for all elements of this $coll.
@@ -458,7 +470,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return true if `p` holds for all elements, false otherwise
*/
def forall(pred: T => Boolean): Boolean = {
- executeAndWaitResult(new Forall(pred, splitter assign new DefaultSignalling with VolatileAbort))
+ tasksupport.executeAndWaitResult(new Forall(pred, splitter assign new DefaultSignalling with VolatileAbort))
}
/** Tests whether a predicate holds for some element of this $coll.
@@ -469,7 +481,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return true if `p` holds for some element, false otherwise
*/
def exists(pred: T => Boolean): Boolean = {
- executeAndWaitResult(new Exists(pred, splitter assign new DefaultSignalling with VolatileAbort))
+ tasksupport.executeAndWaitResult(new Exists(pred, splitter assign new DefaultSignalling with VolatileAbort))
}
/** Finds some element in the collection for which the predicate holds, if such
@@ -484,7 +496,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* @return an option value with the element if such an element exists, or `None` otherwise
*/
def find(pred: T => Boolean): Option[T] = {
- executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort))
+ tasksupport.executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort))
}
/** Creates a combiner factory. Each combiner factory instance is used
@@ -500,6 +512,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
*/
protected[this] def combinerFactory = {
val combiner = newCombiner
+ combiner.combinerTaskSupport = tasksupport
if (combiner.canBeShared) new CombinerFactory[T, Repr] {
val shared = combiner
def apply() = shared
@@ -512,6 +525,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
protected[this] def combinerFactory[S, That](cbf: () => Combiner[S, That]) = {
val combiner = cbf()
+ combiner.combinerTaskSupport = tasksupport
if (combiner.canBeShared) new CombinerFactory[S, That] {
val shared = combiner
def apply() = shared
@@ -523,11 +537,11 @@ self: ParIterableLike[T, Repr, Sequential] =>
}
def filter(pred: T => Boolean): Repr = {
- executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
def filterNot(pred: T => Boolean): Repr = {
- executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
def ++[U >: T, That](that: GenTraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = {
@@ -542,43 +556,47 @@ self: ParIterableLike[T, Repr, Sequential] =>
tasksupport.executeAndWaitResult(othtask)
}
val task = (copythis parallel copythat) { _ combine _ } mapResult {
- _.result
+ _.resultWithTaskSupport
}
- executeAndWaitResult(task)
- } else if (bf.isParallel) {
+ tasksupport.executeAndWaitResult(task)
+ } else if (bf(repr).isCombiner) {
// println("case parallel builder, `that` not parallel")
- val pbf = bf.asParallel
- val copythis = new Copy(combinerFactory(() => pbf(repr)), splitter)
+ val copythis = new Copy(combinerFactory(() => bf(repr).asCombiner), splitter)
val copythat = wrap {
- val cb = pbf(repr)
+ val cb = bf(repr).asCombiner
for (elem <- that.seq) cb += elem
cb
}
- executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result })
+ tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.resultWithTaskSupport })
} else {
// println("case not a parallel builder")
val b = bf(repr)
this.splitter.copy2builder[U, That, Builder[U, That]](b)
for (elem <- that.seq) b += elem
- b.result
+ setTaskSupport(b.result, tasksupport)
}
}
def partition(pred: T => Boolean): (Repr, Repr) = {
- executeAndWaitResult(new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) })
+ tasksupport.executeAndWaitResult(
+ new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult {
+ p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
+ }
+ )
}
def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
- executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult {
+ val r = tasksupport.executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult {
rcb => rcb.groupByKey(() => combinerFactory())
})
+ setTaskSupport(r, tasksupport)
}
def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
- else executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
- _.result
+ else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult {
+ _.resultWithTaskSupport
})
}
@@ -591,13 +609,13 @@ self: ParIterableLike[T, Repr, Sequential] =>
cb += it.next
left -= 1
}
- cb.result
+ cb.resultWithTaskSupport
}
def drop(n: Int): Repr = {
val actualn = if (size > n) n else size
if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
- else executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.result })
+ else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
private def drop_sequential(n: Int) = {
@@ -605,14 +623,14 @@ self: ParIterableLike[T, Repr, Sequential] =>
val cb = newCombiner
cb.sizeHint(size - n)
while (it.hasNext) cb += it.next
- cb.result
+ cb.resultWithTaskSupport
}
override def slice(unc_from: Int, unc_until: Int): Repr = {
val from = unc_from min size max 0
val until = unc_until min size max from
if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
- else executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.result })
+ else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.resultWithTaskSupport })
}
private def slice_sequential(from: Int, until: Int): Repr = {
@@ -623,11 +641,15 @@ self: ParIterableLike[T, Repr, Sequential] =>
cb += it.next
left -= 1
}
- cb.result
+ cb.resultWithTaskSupport
}
def splitAt(n: Int): (Repr, Repr) = {
- executeAndWaitResult(new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) })
+ tasksupport.executeAndWaitResult(
+ new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult {
+ p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
+ }
+ )
}
/** Computes a prefix scan of the elements of the collection.
@@ -645,20 +667,19 @@ self: ParIterableLike[T, Repr, Sequential] =>
*
* @return a new $coll containing the prefix scan of the elements in this $coll
*/
- def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf.isParallel) {
- val cbf = bf.asParallel
- if (parallelismLevel > 1) {
- if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
- tree => executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => cbf(repr))) mapResult {
- cb => cb.result
+ def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) {
+ if (tasksupport.parallelismLevel > 1) {
+ if (size > 0) tasksupport.executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult {
+ tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => bf(repr).asCombiner)) mapResult {
+ cb => cb.resultWithTaskSupport
})
- }) else (cbf(self.repr) += z).result
- } else seq.scan(z)(op)(bf2seq(bf))
- } else seq.scan(z)(op)(bf2seq(bf))
+ }) else setTaskSupport((bf(repr) += z).result, tasksupport)
+ } else setTaskSupport(seq.scan(z)(op)(bf2seq(bf)), tasksupport)
+ } else setTaskSupport(seq.scan(z)(op)(bf2seq(bf)), tasksupport)
- def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = seq.scanLeft(z)(op)(bf2seq(bf))
+ def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = setTaskSupport(seq.scanLeft(z)(op)(bf2seq(bf)), tasksupport)
- def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = seq.scanRight(z)(op)(bf2seq(bf))
+ def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = setTaskSupport(seq.scanRight(z)(op)(bf2seq(bf)), tasksupport)
/** Takes the longest prefix of elements that satisfy the predicate.
*
@@ -672,11 +693,15 @@ self: ParIterableLike[T, Repr, Sequential] =>
val cbf = combinerFactory
if (cbf.doesShareCombiners) {
val parseqspan = toSeq.takeWhile(pred)
- executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult {
+ _.resultWithTaskSupport
+ })
} else {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult { _._1.result })
+ tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult {
+ _._1.resultWithTaskSupport
+ })
}
}
@@ -693,17 +718,17 @@ self: ParIterableLike[T, Repr, Sequential] =>
val cbf = combinerFactory
if (cbf.doesShareCombiners) {
val (xs, ys) = toSeq.span(pred)
- val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.result }
- val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.result }
+ val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.resultWithTaskSupport }
+ val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.resultWithTaskSupport }
val copyall = (copyxs parallel copyys) {
(xr, yr) => (xr, yr)
}
- executeAndWaitResult(copyall)
+ tasksupport.executeAndWaitResult(copyall)
} else {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
- p => (p._1.result, p._2.result)
+ tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
+ p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport)
})
}
}
@@ -721,7 +746,11 @@ self: ParIterableLike[T, Repr, Sequential] =>
def dropWhile(pred: T => Boolean): Repr = {
val cntx = new DefaultSignalling with AtomicIndexFlag
cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { _._2.result })
+ tasksupport.executeAndWaitResult(
+ new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult {
+ _._2.resultWithTaskSupport
+ }
+ )
}
def copyToArray[U >: T](xs: Array[U]) = copyToArray(xs, 0)
@@ -729,31 +758,33 @@ self: ParIterableLike[T, Repr, Sequential] =>
def copyToArray[U >: T](xs: Array[U], start: Int) = copyToArray(xs, start, xs.length - start)
def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) {
- executeAndWaitResult(new CopyToArray(start, len, xs, splitter))
+ tasksupport.executeAndWaitResult(new CopyToArray(start, len, xs, splitter))
}
def sameElements[U >: T](that: GenIterable[U]) = seq.sameElements(that)
- def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
- val pbf = bf.asParallel
+ def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
val thatseq = that.asParSeq
- executeAndWaitResult(new Zip(combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result });
- } else seq.zip(that)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult { _.resultWithTaskSupport });
+ } else setTaskSupport(seq.zip(that)(bf2seq(bf)), tasksupport)
def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false)
- def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
- val pbf = bf.asParallel
+ def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
val thatseq = that.asParSeq
- executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result });
- } else seq.zipAll(that, thisElem, thatElem)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(
+ new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult {
+ _.resultWithTaskSupport
+ }
+ );
+ } else setTaskSupport(seq.zipAll(that, thisElem, thatElem)(bf2seq(bf)), tasksupport)
protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = {
- executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.result });
+ tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.resultWithTaskSupport });
}
protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = {
- executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.resultWithTaskSupport })
}
def view = new ParIterableView[T, Repr, Sequential] {
@@ -810,7 +841,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends StrictSplitterCheckTask[R, Tp] {
protected[this] val pit: IterableSplitter[T]
protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp]
- def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel)
+ def shouldSplitFurther = pit.shouldSplitFurther(self.repr, tasksupport.parallelismLevel)
def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")"
@@ -844,8 +875,8 @@ self: ParIterableLike[T, Repr, Sequential] =>
(f: First, s: Second)
extends Composite[FR, SR, R, First, Second](f, s) {
def leaf(prevr: Option[R]) = {
- executeAndWaitResult(ft)
- executeAndWaitResult(st)
+ tasksupport.executeAndWaitResult(ft)
+ tasksupport.executeAndWaitResult(st)
mergeSubtasks
}
}
@@ -855,8 +886,8 @@ self: ParIterableLike[T, Repr, Sequential] =>
(f: First, s: Second)
extends Composite[FR, SR, R, First, Second](f, s) {
def leaf(prevr: Option[R]) = {
- val ftfuture = execute(ft)
- executeAndWaitResult(st)
+ val ftfuture = tasksupport.execute(ft)
+ tasksupport.executeAndWaitResult(st)
ftfuture()
mergeSubtasks
}
@@ -867,7 +898,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
@volatile var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
- val initialResult = executeAndWaitResult(inner)
+ val initialResult = tasksupport.executeAndWaitResult(inner)
result = map(initialResult)
}
private[parallel] override def signalAbort() {
@@ -1339,7 +1370,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
/* scan tree */
- protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1
+ protected[this] def scanBlockSize = (thresholdFromSize(size, tasksupport.parallelismLevel) / 2) max 1
protected[this] trait ScanTree[U >: T] {
def beginsAt: Int
diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala
index 1d7659922c..536139c812 100644
--- a/src/library/scala/collection/parallel/ParIterableViewLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala
@@ -47,7 +47,6 @@ extends GenIterableView[T, Coll]
with ParIterableLike[T, This, ThisSeq]
{
self =>
- import tasksupport._
override def foreach[U](f: T => U): Unit = super[ParIterableLike].foreach(f)
override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner");
@@ -135,7 +134,7 @@ self =>
newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That]
override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
- executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, ResultMapping[_, Force[U, That], That]]])
+ tasksupport.executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, ResultMapping[_, Force[U, That], That]]])
} otherwise {
val b = bf(underlying)
b ++= this.iterator
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 6a5ee5c69b..3d498ab41b 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -44,8 +44,7 @@ trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, S
extends scala.collection.GenSeqLike[T, Repr]
with ParIterableLike[T, Repr, Sequential] {
self =>
- import tasksupport._
-
+
type SuperParIterator = IterableSplitter[T]
/** A more refined version of the iterator found in the `ParallelIterable` trait,
@@ -107,7 +106,7 @@ self =>
val realfrom = if (from < 0) 0 else from
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1
+ tasksupport.executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1
}
/** Finds the first element satisfying some predicate.
@@ -125,7 +124,7 @@ self =>
val realfrom = if (from < 0) 0 else from
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))
+ tasksupport.executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))
}
/** Finds the last element satisfying some predicate.
@@ -143,18 +142,20 @@ self =>
val until = if (end >= length) length else end + 1
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MinValue)
- executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx))
+ tasksupport.executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx))
}
def reverse: Repr = {
- executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.resultWithTaskSupport })
}
def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result })
- } else seq.reverseMap(f)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(
+ new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.resultWithTaskSupport }
+ )
+ } else setTaskSupport(seq.reverseMap(f)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.reverseMap(f)(bf2seq(bf))*/
/** Tests whether this $coll contains the given sequence at a given index.
@@ -172,13 +173,15 @@ self =>
else if (pthat.length > length - offset) false
else {
val ctx = new DefaultSignalling with VolatileAbort
- executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter))
+ tasksupport.executeAndWaitResult(
+ new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter)
+ )
}
} otherwise seq.startsWith(that, offset)
override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat =>
val ctx = new DefaultSignalling with VolatileAbort
- length == pthat.length && executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter))
+ length == pthat.length && tasksupport.executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter))
} otherwise seq.sameElements(that)
/** Tests whether this $coll ends with the given parallel sequence.
@@ -195,25 +198,24 @@ self =>
else {
val ctx = new DefaultSignalling with VolatileAbort
val tlen = that.length
- executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter))
+ tasksupport.executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter))
}
} otherwise seq.endsWith(that)
def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val realreplaced = replaced min (length - from)
- if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
+ if (patch.isParSeq && bf(repr).isCombiner && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
val that = patch.asParSeq
- val pbf = bf.asParallel
val pits = splitter.psplitWithSignalling(from, replaced, length - from - realreplaced)
- val cfactory = combinerFactory(() => pbf(repr))
+ val cfactory = combinerFactory(() => bf(repr).asCombiner)
val copystart = new Copy[U, That](cfactory, pits(0))
val copymiddle = wrap {
val tsk = new that.Copy[U, That](cfactory, that.splitter)
tasksupport.executeAndWaitResult(tsk)
}
val copyend = new Copy[U, That](cfactory, pits(2))
- executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
- _.result
+ tasksupport.executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
+ _.resultWithTaskSupport
})
} else patch_sequential(from, patch.seq, replaced)
}
@@ -226,14 +228,18 @@ self =>
b ++= pits(0)
b ++= patch
b ++= pits(2)
- b.result
+ setTaskSupport(b.result, tasksupport)
}
def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) {
- executeAndWaitResult(new Updated(index, elem, () => bf(repr).asCombiner, splitter) mapResult { _.result })
- } else seq.updated(index, elem)(bf2seq(bf))
+ tasksupport.executeAndWaitResult(
+ new Updated(index, elem, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult {
+ _.resultWithTaskSupport
+ }
+ )
+ } else setTaskSupport(seq.updated(index, elem)(bf2seq(bf)), tasksupport)
/*bf ifParallel { pbf =>
- executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result })
+ tasksupport.executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result })
} otherwise seq.updated(index, elem)(bf2seq(bf))*/
def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
@@ -248,10 +254,13 @@ self =>
patch(length, new immutable.Repetition(elem, len - length), 0)
} else patch(length, Nil, 0);
- override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
- val pbf = bf.asParallel
+ override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) {
val thatseq = that.asParSeq
- executeAndWaitResult(new Zip(length min thatseq.length, pbf, splitter, thatseq.splitter) mapResult { _.result });
+ tasksupport.executeAndWaitResult(
+ new Zip(length min thatseq.length, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult {
+ _.resultWithTaskSupport
+ }
+ );
} else super.zip(that)(bf)
/** Tests whether every element of this $coll relates to the
@@ -268,7 +277,7 @@ self =>
*/
def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat =>
val ctx = new DefaultSignalling with VolatileAbort
- length == pthat.length && executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter))
+ length == pthat.length && tasksupport.executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter))
} otherwise seq.corresponds(that)(p)
def diff[U >: T](that: GenSeq[U]): Repr = sequentially {
@@ -424,7 +433,7 @@ self =>
override def requiresStrictSplitters = true
}
- protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: () => Combiner[U, That], protected[this] val pit: SeqSplitter[T])
+ protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CombinerFactory[U, That], protected[this] val pit: SeqSplitter[T])
extends Transformer[Combiner[U, That], Updated[U, That]] {
@volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf())
@@ -437,10 +446,10 @@ self =>
override def requiresStrictSplitters = true
}
- protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
+ protected[this] class Zip[U >: T, S, That](len: Int, cf: CombinerFactory[(U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
@volatile var result: Result = null
- def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr))
+ def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, cf())
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val fp = len / 2
@@ -448,8 +457,8 @@ self =>
val pits = pit.psplitWithSignalling(fp, sp)
val opits = otherpit.psplitWithSignalling(fp, sp)
Seq(
- new Zip(fp, pbf, pits(0), opits(0)),
- new Zip(sp, pbf, pits(1), opits(1))
+ new Zip(fp, cf, pits(0), opits(0)),
+ new Zip(sp, cf, pits(1), opits(1))
)
}
override def merge(that: Zip[U, S, That]) = result = result combine that.result
diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala
index 6fdc181793..e0d1a7d6ff 100644
--- a/src/library/scala/collection/parallel/ParSeqViewLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala
@@ -38,7 +38,6 @@ extends GenSeqView[T, Coll]
with ParSeqLike[T, This, ThisSeq]
{
self =>
- import tasksupport._
trait Transformed[+S] extends ParSeqView[S, Coll, CollSeq]
with super[ParIterableView].Transformed[S] with super[GenSeqViewLike].Transformed[S] {
@@ -170,7 +169,7 @@ self =>
override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisParSeq.scanRight(z)(op)).asInstanceOf[That]
override def groupBy[K](f: T => K): immutable.ParMap[K, This] = thisParSeq.groupBy(f).map(kv => (kv._1, newForced(kv._2).asInstanceOf[This]))
override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
- executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, _]])
+ tasksupport.executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, _]])
} otherwise {
val b = bf(underlying)
b ++= this.iterator
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index b705909cad..e32ac443ae 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -6,102 +6,99 @@
** |/ **
\* */
-
package scala.collection.parallel
-
import scala.concurrent.forkjoin._
import scala.util.control.Breaks._
-
import annotation.unchecked.uncheckedVariance
+trait Task[R, +Tp] {
+ type Result = R
-/** A trait that declares task execution capabilities used
- * by parallel collections.
- */
-trait Tasks {
-
- private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]()
+ def repr = this.asInstanceOf[Tp]
- private[parallel] def debuglog(s: String) = synchronized {
- debugMessages += s
- }
-
- trait Task[R, +Tp] {
- type Result = R
-
- def repr = this.asInstanceOf[Tp]
-
- /** Body of the task - non-divisible unit of work done by this task.
- * Optionally is provided with the result from the previous completed task
- * or `None` if there was no previous task (or the previous task is uncompleted or unknown).
- */
- def leaf(result: Option[R])
+ /** Body of the task - non-divisible unit of work done by this task.
+ * Optionally is provided with the result from the previous completed task
+ * or `None` if there was no previous task (or the previous task is uncompleted or unknown).
+ */
+ def leaf(result: Option[R])
- /** A result that can be accessed once the task is completed. */
- var result: R
+ /** A result that can be accessed once the task is completed. */
+ var result: R
- /** Decides whether or not this task should be split further. */
- def shouldSplitFurther: Boolean
+ /** Decides whether or not this task should be split further. */
+ def shouldSplitFurther: Boolean
- /** Splits this task into a list of smaller tasks. */
- private[parallel] def split: Seq[Task[R, Tp]]
+ /** Splits this task into a list of smaller tasks. */
+ private[parallel] def split: Seq[Task[R, Tp]]
- /** Read of results of `that` task and merge them into results of this one. */
- private[parallel] def merge(that: Tp @uncheckedVariance) {}
+ /** Read of results of `that` task and merge them into results of this one. */
+ private[parallel] def merge(that: Tp @uncheckedVariance) {}
- // exception handling mechanism
- @volatile var throwable: Throwable = null
- def forwardThrowable() = if (throwable != null) throw throwable
+ // exception handling mechanism
+ @volatile var throwable: Throwable = null
+ def forwardThrowable() = if (throwable != null) throw throwable
- // tries to do the leaf computation, storing the possible exception
- private[parallel] def tryLeaf(lastres: Option[R]) {
- try {
- tryBreakable {
- leaf(lastres)
- result = result // ensure that effects of `leaf` are visible to readers of `result`
- } catchBreak {
- signalAbort
- }
- } catch {
- case thr: Exception =>
- result = result // ensure that effects of `leaf` are visible
- throwable = thr
- signalAbort
+ // tries to do the leaf computation, storing the possible exception
+ private[parallel] def tryLeaf(lastres: Option[R]) {
+ try {
+ tryBreakable {
+ leaf(lastres)
+ result = result // ensure that effects of `leaf` are visible to readers of `result`
+ } catchBreak {
+ signalAbort
}
+ } catch {
+ case thr: Exception =>
+ result = result // ensure that effects of `leaf` are visible
+ throwable = thr
+ signalAbort
}
+ }
- private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
- val that = t.asInstanceOf[Task[R, Tp]]
- val local = result // ensure that any effects of modifying `result` are detected
- // checkMerge(that)
- if (this.throwable == null && that.throwable == null) merge(t)
- mergeThrowables(that)
- }
+ private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
+ val that = t.asInstanceOf[Task[R, Tp]]
+ val local = result // ensure that any effects of modifying `result` are detected
+ // checkMerge(that)
+ if (this.throwable == null && that.throwable == null) merge(t)
+ mergeThrowables(that)
+ }
- private def checkMerge(that: Task[R, Tp] @uncheckedVariance) {
- if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
- println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
- } else if (this.throwable != null || that.throwable != null) {
- println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
- }
+ private def checkMerge(that: Task[R, Tp] @uncheckedVariance) {
+ if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
+ println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
+ } else if (this.throwable != null || that.throwable != null) {
+ println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
}
+ }
- private[parallel] def mergeThrowables(that: Task[_, _]) {
- if (this.throwable != null && that.throwable != null) {
- // merge exceptions, since there were multiple exceptions
- this.throwable = this.throwable alongWith that.throwable
- } else if (that.throwable != null) this.throwable = that.throwable
+ private[parallel] def mergeThrowables(that: Task[_, _]) {
+ if (this.throwable != null && that.throwable != null) {
+ // merge exceptions, since there were multiple exceptions
+ this.throwable = this.throwable alongWith that.throwable
+ } else if (that.throwable != null) this.throwable = that.throwable
else this.throwable = this.throwable
- }
+ }
+
+ // override in concrete task implementations to signal abort to other tasks
+ private[parallel] def signalAbort() {}
+}
+
- // override in concrete task implementations to signal abort to other tasks
- private[parallel] def signalAbort() {}
+/** A trait that declares task execution capabilities used
+ * by parallel collections.
+ */
+trait Tasks {
+
+ private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]()
+
+ private[parallel] def debuglog(s: String) = synchronized {
+ debugMessages += s
}
trait TaskImpl[R, +Tp] {
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 7adf51cffb..266b179401 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -8,6 +8,8 @@
package scala.collection.parallel.immutable
+
+
import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.IterableSplitter
@@ -19,6 +21,9 @@ import scala.collection.generic.GenericParMapTemplate
import scala.collection.generic.GenericParMapCompanion
import scala.collection.immutable.{ HashMap, TrieIterator }
import annotation.unchecked.uncheckedVariance
+import collection.parallel.Task
+
+
/** Immutable parallel hash map, based on hash tries.
*
@@ -153,7 +158,6 @@ private[parallel] abstract class HashMapCombiner[K, V]
extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) {
//self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
- import collection.parallel.tasksupport._
val emptyTrie = HashMap.empty[K, V]
def +=(elem: (K, V)) = {
@@ -173,7 +177,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has
val bucks = buckets.filter(_ != null).map(_.headPtr)
val root = new Array[HashMap[K, V]](bucks.length)
- executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length))
+ combinerTaskSupport.executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length))
var bitmap = 0
var i = 0
@@ -195,7 +199,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has
val bucks = buckets.filter(_ != null).map(_.headPtr)
val root = new Array[HashMap[K, AnyRef]](bucks.length)
- executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length))
+ combinerTaskSupport.executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length))
var bitmap = 0
var i = 0
@@ -256,7 +260,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has
val fp = howmany / 2
List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp))
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel)
}
class CreateGroupedTrie[Repr](cbf: () => Combiner[V, Repr], bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, AnyRef]], offset: Int, howmany: Int)
@@ -321,7 +325,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has
val fp = howmany / 2
List(new CreateGroupedTrie(cbf, bucks, root, offset, fp), new CreateGroupedTrie(cbf, bucks, root, offset + fp, howmany - fp))
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel)
}
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 1cf0ccd391..0d7f04976e 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -8,6 +8,8 @@
package scala.collection.parallel.immutable
+
+
import scala.collection.parallel.ParSetLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.IterableSplitter
@@ -19,6 +21,9 @@ import scala.collection.generic.GenericParTemplate
import scala.collection.generic.GenericParCompanion
import scala.collection.generic.GenericCompanion
import scala.collection.immutable.{ HashSet, TrieIterator }
+import collection.parallel.Task
+
+
/** Immutable parallel hash set, based on hash tries.
*
@@ -127,7 +132,6 @@ private[immutable] abstract class HashSetCombiner[T]
extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) {
//self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
import HashSetCombiner._
- import collection.parallel.tasksupport._
val emptyTrie = HashSet.empty[T]
def +=(elem: T) = {
@@ -147,7 +151,7 @@ extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombine
val bucks = buckets.filter(_ != null).map(_.headPtr)
val root = new Array[HashSet[T]](bucks.length)
- executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length))
+ combinerTaskSupport.executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length))
var bitmap = 0
var i = 0
@@ -202,7 +206,7 @@ extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombine
val fp = howmany / 2
List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp))
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel)
}
}
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 72a8184b10..5c3da66be0 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -21,6 +21,7 @@ import scala.collection.generic.Sizing
import scala.collection.parallel.Combiner
import scala.collection.parallel.SeqSplitter
import scala.collection.parallel.ParSeqLike
+import scala.collection.parallel.Task
import scala.collection.parallel.CHECK_RATE
import scala.collection.mutable.ArraySeq
import scala.collection.mutable.Builder
@@ -56,7 +57,6 @@ extends ParSeq[T]
with Serializable
{
self =>
- import collection.parallel.tasksupport._
@transient private var array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]]
@@ -584,22 +584,22 @@ self =>
val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
// fill it in parallel
- executeAndWaitResult(new Map[S](f, targetarr, 0, length))
+ tasksupport.executeAndWaitResult(new Map[S](f, targetarr, 0, length))
// wrap it into a parallel array
(new ParArray[S](targarrseq)).asInstanceOf[That]
} else super.map(f)(bf)
override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanBuildFrom[ParArray[T], U, That]): That =
- if (parallelismLevel > 1 && buildsArray(cbf(repr))) {
+ if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) {
// reserve an array
val targarrseq = new ArraySeq[U](length + 1)
val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
targetarr(0) = z
// do a parallel prefix scan
- if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, splitter) mapResult {
- tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr))
+ if (length > 0) tasksupport.executeAndWaitResult(new CreateScanTree[U](0, size, z, op, splitter) mapResult {
+ tree => tasksupport.executeAndWaitResult(new ScanToArray(tree, z, op, targetarr))
})
// wrap the array into a parallel array
@@ -661,7 +661,7 @@ self =>
val fp = howmany / 2
List(new Map(f, targetarr, offset, fp), new Map(f, targetarr, offset + fp, howmany - fp))
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, tasksupport.parallelismLevel)
}
/* serialization */
diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
index cec2e6886d..178424decc 100644
--- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala
+++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
@@ -13,6 +13,7 @@ package scala.collection.parallel.mutable
import scala.collection.generic._
import scala.collection.parallel.Combiner
import scala.collection.parallel.IterableSplitter
+import scala.collection.parallel.Task
import scala.collection.mutable.BasicNode
import scala.collection.mutable.TNode
import scala.collection.mutable.LNode
@@ -40,7 +41,6 @@ extends ParMap[K, V]
with ParCtrieCombiner[K, V]
with Serializable
{
- import collection.parallel.tasksupport._
def this() = this(new Ctrie)
@@ -83,7 +83,7 @@ extends ParMap[K, V]
case tn: TNode[_, _] => tn.cachedSize(ctrie)
case ln: LNode[_, _] => ln.cachedSize(ctrie)
case cn: CNode[_, _] =>
- executeAndWaitResult(new Size(0, cn.array.length, cn.array))
+ tasksupport.executeAndWaitResult(new Size(0, cn.array.length, cn.array))
cn.cachedSize(ctrie)
}
}
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 21a5b05749..6ce6c45460 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -17,6 +17,7 @@ import collection.mutable.DefaultEntry
import collection.mutable.HashEntry
import collection.mutable.HashTable
import collection.mutable.UnrolledBuffer
+import collection.parallel.Task
@@ -156,8 +157,6 @@ private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFa
extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks)
with collection.mutable.HashTable.HashUtils[K]
{
-//self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
- import collection.parallel.tasksupport._
private var mask = ParHashMapCombiner.discriminantmask
private var nonmasklen = ParHashMapCombiner.nonmasklength
private var seedvalue = 27
@@ -179,7 +178,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
// construct table
val table = new AddingHashTable(size, tableLoadFactor, seedvalue)
val bucks = buckets.map(b => if (b ne null) b.headPtr else null)
- val insertcount = executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length))
+ val insertcount = combinerTaskSupport.executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length))
table.setSize(insertcount)
// TODO compare insertcount and size to see if compression is needed
val c = table.hashTableContents
@@ -300,7 +299,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
override def merge(that: FillBlocks) {
this.result += that.result
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, combinerTaskSupport.parallelismLevel)
}
}
diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
index 6c5f513ad0..811fc8bfe7 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
@@ -8,10 +8,15 @@
package scala.collection.parallel.mutable
+
+
import collection.generic._
import collection.mutable.FlatHashTable
import collection.parallel.Combiner
import collection.mutable.UnrolledBuffer
+import collection.parallel.Task
+
+
/** A parallel hash set.
*
@@ -113,7 +118,6 @@ private[mutable] abstract class ParHashSetCombiner[T](private val tableLoadFacto
extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, ParHashSetCombiner[T]](ParHashSetCombiner.numblocks)
with collection.mutable.FlatHashTable.HashUtils[T] {
//self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
- import collection.parallel.tasksupport._
private var mask = ParHashSetCombiner.discriminantmask
private var nonmasklen = ParHashSetCombiner.nonmasklength
private var seedvalue = 27
@@ -139,7 +143,7 @@ with collection.mutable.FlatHashTable.HashUtils[T] {
private def parPopulate: FlatHashTable.Contents[T] = {
// construct it in parallel
val table = new AddingFlatHashTable(size, tableLoadFactor, seedvalue)
- val (inserted, leftovers) = executeAndWaitResult(new FillBlocks(buckets, table, 0, buckets.length))
+ val (inserted, leftovers) = combinerTaskSupport.executeAndWaitResult(new FillBlocks(buckets, table, 0, buckets.length))
var leftinserts = 0
for (elem <- leftovers) leftinserts += table.insertEntry(0, table.tableLength, elem.asInstanceOf[T])
table.setSize(leftinserts + inserted)
@@ -304,7 +308,7 @@ with collection.mutable.FlatHashTable.HashUtils[T] {
// the total number of successfully inserted elements is adjusted accordingly
result = (this.result._1 + that.result._1 + inserted, remainingLeftovers concat that.result._2)
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, combinerTaskSupport.parallelismLevel)
}
}
diff --git a/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala
index eadc93d422..01eb17024e 100644
--- a/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala
@@ -8,18 +8,20 @@
package scala.collection.parallel.mutable
+
+
import scala.collection.generic.Sizing
import scala.collection.mutable.ArraySeq
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.TaskSupport
-//import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.parallel.unsupportedop
import scala.collection.parallel.Combiner
+import scala.collection.parallel.Task
+
+
/** An array combiner that uses a chain of arraybuffers to store elements. */
trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]] {
-//self: EnvironmentPassingCombiner[T, ParArray[T]] =>
- import collection.parallel.tasksupport._
override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
@@ -30,7 +32,7 @@ trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedA
val arrayseq = new ArraySeq[T](size)
val array = arrayseq.array.asInstanceOf[Array[Any]]
- executeAndWaitResult(new CopyChainToArray(array, 0, size))
+ combinerTaskSupport.executeAndWaitResult(new CopyChainToArray(array, 0, size))
new ParArray(arrayseq)
} else { // optimisation if there is only 1 array
@@ -79,7 +81,7 @@ trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedA
val fp = howmany / 2
List(new CopyChainToArray(array, offset, fp), new CopyChainToArray(array, offset + fp, howmany - fp))
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, combinerTaskSupport.parallelismLevel)
}
}
diff --git a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
index dc583fb4e7..410b542a68 100644
--- a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
@@ -18,9 +18,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.UnrolledBuffer
import scala.collection.mutable.UnrolledBuffer.Unrolled
import scala.collection.parallel.TaskSupport
-//import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.parallel.unsupportedop
import scala.collection.parallel.Combiner
+import scala.collection.parallel.Task
@@ -40,8 +40,6 @@ extends Combiner[T, ParArray[T]] {
// because size is doubling, random access is O(logn)!
val buff = new DoublingUnrolledBuffer[Any]
- import collection.parallel.tasksupport._
-
def +=(elem: T) = {
buff += elem
this
@@ -51,7 +49,7 @@ extends Combiner[T, ParArray[T]] {
val arrayseq = new ArraySeq[T](size)
val array = arrayseq.array.asInstanceOf[Array[Any]]
- executeAndWaitResult(new CopyUnrolledToArray(array, 0, size))
+ combinerTaskSupport.executeAndWaitResult(new CopyUnrolledToArray(array, 0, size))
new ParArray(arrayseq)
}
@@ -109,7 +107,7 @@ extends Combiner[T, ParArray[T]] {
val fp = howmany / 2
List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp))
}
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, combinerTaskSupport.parallelismLevel)
override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")"
}
}
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 8f19d0ecdb..8f49b80c93 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -46,8 +46,16 @@ package object parallel {
else new ThreadPoolTaskSupport
} else new ThreadPoolTaskSupport
- val tasksupport = getTaskSupport
-
+ val defaultTaskSupport: TaskSupport = getTaskSupport
+
+ def setTaskSupport[Coll](c: Coll, t: TaskSupport): Coll = {
+ c match {
+ case pc: ParIterableLike[_, _, _] => pc.tasksupport = t
+ case _ => // do nothing
+ }
+ c
+ }
+
/* implicit conversions */
implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] {