From 1852a7ddf7f8c5fb4a85e64b73123d333e698932 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Thu, 23 Feb 2012 16:34:34 +0100 Subject: Add tasksupport as a configurable field in parallel collections. This required a bit of refactoring in the tasks objects and implementations of various operations. Combiners now hold a reference to a tasksupport object and pass it on to their result if `resultWithTaskSupport` is called. Additionally, several bugs that have to do with CanBuildFrom and combiner resolution have been fixed. --- .../scala/collection/parallel/Combiner.scala | 25 ++- .../collection/parallel/ParIterableLike.scala | 191 ++++++++++++--------- .../collection/parallel/ParIterableViewLike.scala | 3 +- .../scala/collection/parallel/ParSeqLike.scala | 69 ++++---- .../scala/collection/parallel/ParSeqViewLike.scala | 3 +- src/library/scala/collection/parallel/Tasks.scala | 135 +++++++-------- .../collection/parallel/immutable/ParHashMap.scala | 14 +- .../collection/parallel/immutable/ParHashSet.scala | 10 +- .../collection/parallel/mutable/ParArray.scala | 12 +- .../collection/parallel/mutable/ParCtrie.scala | 4 +- .../collection/parallel/mutable/ParHashMap.scala | 7 +- .../collection/parallel/mutable/ParHashSet.scala | 10 +- .../mutable/ResizableParArrayCombiner.scala | 12 +- .../mutable/UnrolledParArrayCombiner.scala | 8 +- .../scala/collection/parallel/package.scala | 12 +- 15 files changed, 295 insertions(+), 220 deletions(-) (limited to 'src') diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala index e304be92ae..69e3271d39 100644 --- a/src/library/scala/collection/parallel/Combiner.scala +++ b/src/library/scala/collection/parallel/Combiner.scala @@ -33,8 +33,21 @@ import scala.collection.generic.Sizing * @since 2.9 */ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel { -//self: EnvironmentPassingCombiner[Elem, To] => - + + @transient + @volatile + var _combinerTaskSupport = defaultTaskSupport + + def combinerTaskSupport = { + val cts = _combinerTaskSupport + if (cts eq null) { + _combinerTaskSupport = defaultTaskSupport + defaultTaskSupport + } else cts + } + + def combinerTaskSupport_=(cts: TaskSupport) = _combinerTaskSupport = cts + /** Combines the contents of the receiver builder and the `other` builder, * producing a new builder containing both their elements. * @@ -69,6 +82,14 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel { */ def canBeShared: Boolean = false + /** Constructs the result and sets the appropriate tasksupport object to the resulting collection + * if this is applicable. + */ + def resultWithTaskSupport: To = { + val res = result + setTaskSupport(res, combinerTaskSupport) + } + } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 7c5a835e56..cffd3bfbcf 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -28,7 +28,7 @@ import immutable.HashMapCombiner import java.util.concurrent.atomic.AtomicBoolean import annotation.unchecked.uncheckedVariance - +import annotation.unchecked.uncheckedStable /** A template trait for parallel collections of type `ParIterable[T]`. @@ -155,7 +155,19 @@ extends GenIterableLike[T, Repr] { self: ParIterableLike[T, Repr, Sequential] => - import tasksupport._ + @transient + @volatile + private var _tasksupport = defaultTaskSupport + + def tasksupport = { + val ts = _tasksupport + if (ts eq null) { + _tasksupport = defaultTaskSupport + defaultTaskSupport + } else ts + } + + def tasksupport_=(ts: TaskSupport) = _tasksupport = ts def seq: Sequential @@ -306,7 +318,7 @@ self: ParIterableLike[T, Repr, Sequential] => * if this $coll is empty. */ def reduce[U >: T](op: (U, U) => U): U = { - executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get }) + tasksupport.executeAndWaitResult(new Reduce(op, splitter) mapResult { _.get }) } /** Optionally reduces the elements of this sequence using the specified associative binary operator. @@ -341,7 +353,7 @@ self: ParIterableLike[T, Repr, Sequential] => * @return the result of applying fold operator `op` between all the elements and `z` */ def fold[U >: T](z: U)(op: (U, U) => U): U = { - executeAndWaitResult(new Fold(z, op, splitter)) + tasksupport.executeAndWaitResult(new Fold(z, op, splitter)) } /** Aggregates the results of applying an operator to subsequent elements. @@ -373,7 +385,7 @@ self: ParIterableLike[T, Repr, Sequential] => * @param combop an associative operator used to combine results from different partitions */ def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = { - executeAndWaitResult(new Aggregate(z, seqop, combop, splitter)) + tasksupport.executeAndWaitResult(new Aggregate(z, seqop, combop, splitter)) } def foldLeft[S](z: S)(op: (S, T) => S): S = seq.foldLeft(z)(op) @@ -394,27 +406,27 @@ self: ParIterableLike[T, Repr, Sequential] => * @param f function applied to each element */ def foreach[U](f: T => U) = { - executeAndWaitResult(new Foreach(f, splitter)) + tasksupport.executeAndWaitResult(new Foreach(f, splitter)) } def count(p: T => Boolean): Int = { - executeAndWaitResult(new Count(p, splitter)) + tasksupport.executeAndWaitResult(new Count(p, splitter)) } def sum[U >: T](implicit num: Numeric[U]): U = { - executeAndWaitResult(new Sum[U](num, splitter)) + tasksupport.executeAndWaitResult(new Sum[U](num, splitter)) } def product[U >: T](implicit num: Numeric[U]): U = { - executeAndWaitResult(new Product[U](num, splitter)) + tasksupport.executeAndWaitResult(new Product[U](num, splitter)) } def min[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T] + tasksupport.executeAndWaitResult(new Min(ord, splitter) mapResult { _.get }).asInstanceOf[T] } def max[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T] + tasksupport.executeAndWaitResult(new Max(ord, splitter) mapResult { _.get }).asInstanceOf[T] } def maxBy[S](f: T => S)(implicit cmp: Ordering[S]): T = { @@ -430,24 +442,24 @@ self: ParIterableLike[T, Repr, Sequential] => } def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result }) - } else seq.map(f)(bf2seq(bf)) + tasksupport.executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport }) + } else setTaskSupport(seq.map(f)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result }) } otherwise seq.map(f)(bf2seq(bf))*/ def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result }) - } else seq.collect(pf)(bf2seq(bf)) + tasksupport.executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport }) + } else setTaskSupport(seq.collect(pf)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result }) } otherwise seq.collect(pf)(bf2seq(bf))*/ def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result }) - } else seq.flatMap(f)(bf2seq(bf)) + tasksupport.executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.resultWithTaskSupport }) + } else setTaskSupport(seq.flatMap(f)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result }) } otherwise seq.flatMap(f)(bf2seq(bf))*/ /** Tests whether a predicate holds for all elements of this $coll. @@ -458,7 +470,7 @@ self: ParIterableLike[T, Repr, Sequential] => * @return true if `p` holds for all elements, false otherwise */ def forall(pred: T => Boolean): Boolean = { - executeAndWaitResult(new Forall(pred, splitter assign new DefaultSignalling with VolatileAbort)) + tasksupport.executeAndWaitResult(new Forall(pred, splitter assign new DefaultSignalling with VolatileAbort)) } /** Tests whether a predicate holds for some element of this $coll. @@ -469,7 +481,7 @@ self: ParIterableLike[T, Repr, Sequential] => * @return true if `p` holds for some element, false otherwise */ def exists(pred: T => Boolean): Boolean = { - executeAndWaitResult(new Exists(pred, splitter assign new DefaultSignalling with VolatileAbort)) + tasksupport.executeAndWaitResult(new Exists(pred, splitter assign new DefaultSignalling with VolatileAbort)) } /** Finds some element in the collection for which the predicate holds, if such @@ -484,7 +496,7 @@ self: ParIterableLike[T, Repr, Sequential] => * @return an option value with the element if such an element exists, or `None` otherwise */ def find(pred: T => Boolean): Option[T] = { - executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort)) + tasksupport.executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort)) } /** Creates a combiner factory. Each combiner factory instance is used @@ -500,6 +512,7 @@ self: ParIterableLike[T, Repr, Sequential] => */ protected[this] def combinerFactory = { val combiner = newCombiner + combiner.combinerTaskSupport = tasksupport if (combiner.canBeShared) new CombinerFactory[T, Repr] { val shared = combiner def apply() = shared @@ -512,6 +525,7 @@ self: ParIterableLike[T, Repr, Sequential] => protected[this] def combinerFactory[S, That](cbf: () => Combiner[S, That]) = { val combiner = cbf() + combiner.combinerTaskSupport = tasksupport if (combiner.canBeShared) new CombinerFactory[S, That] { val shared = combiner def apply() = shared @@ -523,11 +537,11 @@ self: ParIterableLike[T, Repr, Sequential] => } def filter(pred: T => Boolean): Repr = { - executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport }) } def filterNot(pred: T => Boolean): Repr = { - executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.resultWithTaskSupport }) } def ++[U >: T, That](that: GenTraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = { @@ -542,43 +556,47 @@ self: ParIterableLike[T, Repr, Sequential] => tasksupport.executeAndWaitResult(othtask) } val task = (copythis parallel copythat) { _ combine _ } mapResult { - _.result + _.resultWithTaskSupport } - executeAndWaitResult(task) - } else if (bf.isParallel) { + tasksupport.executeAndWaitResult(task) + } else if (bf(repr).isCombiner) { // println("case parallel builder, `that` not parallel") - val pbf = bf.asParallel - val copythis = new Copy(combinerFactory(() => pbf(repr)), splitter) + val copythis = new Copy(combinerFactory(() => bf(repr).asCombiner), splitter) val copythat = wrap { - val cb = pbf(repr) + val cb = bf(repr).asCombiner for (elem <- that.seq) cb += elem cb } - executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result }) + tasksupport.executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.resultWithTaskSupport }) } else { // println("case not a parallel builder") val b = bf(repr) this.splitter.copy2builder[U, That, Builder[U, That]](b) for (elem <- that.seq) b += elem - b.result + setTaskSupport(b.result, tasksupport) } } def partition(pred: T => Boolean): (Repr, Repr) = { - executeAndWaitResult(new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) }) + tasksupport.executeAndWaitResult( + new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult { + p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport) + } + ) } def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = { - executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult { + val r = tasksupport.executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult { rcb => rcb.groupByKey(() => combinerFactory()) }) + setTaskSupport(r, tasksupport) } def take(n: Int): Repr = { val actualn = if (size > n) n else size if (actualn < MIN_FOR_COPY) take_sequential(actualn) - else executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult { - _.result + else tasksupport.executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult { + _.resultWithTaskSupport }) } @@ -591,13 +609,13 @@ self: ParIterableLike[T, Repr, Sequential] => cb += it.next left -= 1 } - cb.result + cb.resultWithTaskSupport } def drop(n: Int): Repr = { val actualn = if (size > n) n else size if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn) - else executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.result }) + else tasksupport.executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.resultWithTaskSupport }) } private def drop_sequential(n: Int) = { @@ -605,14 +623,14 @@ self: ParIterableLike[T, Repr, Sequential] => val cb = newCombiner cb.sizeHint(size - n) while (it.hasNext) cb += it.next - cb.result + cb.resultWithTaskSupport } override def slice(unc_from: Int, unc_until: Int): Repr = { val from = unc_from min size max 0 val until = unc_until min size max from if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until) - else executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.result }) + else tasksupport.executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.resultWithTaskSupport }) } private def slice_sequential(from: Int, until: Int): Repr = { @@ -623,11 +641,15 @@ self: ParIterableLike[T, Repr, Sequential] => cb += it.next left -= 1 } - cb.result + cb.resultWithTaskSupport } def splitAt(n: Int): (Repr, Repr) = { - executeAndWaitResult(new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) }) + tasksupport.executeAndWaitResult( + new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult { + p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport) + } + ) } /** Computes a prefix scan of the elements of the collection. @@ -645,20 +667,19 @@ self: ParIterableLike[T, Repr, Sequential] => * * @return a new $coll containing the prefix scan of the elements in this $coll */ - def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf.isParallel) { - val cbf = bf.asParallel - if (parallelismLevel > 1) { - if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult { - tree => executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => cbf(repr))) mapResult { - cb => cb.result + def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) { + if (tasksupport.parallelismLevel > 1) { + if (size > 0) tasksupport.executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult { + tree => tasksupport.executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => bf(repr).asCombiner)) mapResult { + cb => cb.resultWithTaskSupport }) - }) else (cbf(self.repr) += z).result - } else seq.scan(z)(op)(bf2seq(bf)) - } else seq.scan(z)(op)(bf2seq(bf)) + }) else setTaskSupport((bf(repr) += z).result, tasksupport) + } else setTaskSupport(seq.scan(z)(op)(bf2seq(bf)), tasksupport) + } else setTaskSupport(seq.scan(z)(op)(bf2seq(bf)), tasksupport) - def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = seq.scanLeft(z)(op)(bf2seq(bf)) + def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = setTaskSupport(seq.scanLeft(z)(op)(bf2seq(bf)), tasksupport) - def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = seq.scanRight(z)(op)(bf2seq(bf)) + def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[Repr, S, That]) = setTaskSupport(seq.scanRight(z)(op)(bf2seq(bf)), tasksupport) /** Takes the longest prefix of elements that satisfy the predicate. * @@ -672,11 +693,15 @@ self: ParIterableLike[T, Repr, Sequential] => val cbf = combinerFactory if (cbf.doesShareCombiners) { val parseqspan = toSeq.takeWhile(pred) - executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult { + _.resultWithTaskSupport + }) } else { val cntx = new DefaultSignalling with AtomicIndexFlag cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult { _._1.result }) + tasksupport.executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult { + _._1.resultWithTaskSupport + }) } } @@ -693,17 +718,17 @@ self: ParIterableLike[T, Repr, Sequential] => val cbf = combinerFactory if (cbf.doesShareCombiners) { val (xs, ys) = toSeq.span(pred) - val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.result } - val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.result } + val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.resultWithTaskSupport } + val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.resultWithTaskSupport } val copyall = (copyxs parallel copyys) { (xr, yr) => (xr, yr) } - executeAndWaitResult(copyall) + tasksupport.executeAndWaitResult(copyall) } else { val cntx = new DefaultSignalling with AtomicIndexFlag cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { - p => (p._1.result, p._2.result) + tasksupport.executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { + p => (p._1.resultWithTaskSupport, p._2.resultWithTaskSupport) }) } } @@ -721,7 +746,11 @@ self: ParIterableLike[T, Repr, Sequential] => def dropWhile(pred: T => Boolean): Repr = { val cntx = new DefaultSignalling with AtomicIndexFlag cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { _._2.result }) + tasksupport.executeAndWaitResult( + new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { + _._2.resultWithTaskSupport + } + ) } def copyToArray[U >: T](xs: Array[U]) = copyToArray(xs, 0) @@ -729,31 +758,33 @@ self: ParIterableLike[T, Repr, Sequential] => def copyToArray[U >: T](xs: Array[U], start: Int) = copyToArray(xs, start, xs.length - start) def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) { - executeAndWaitResult(new CopyToArray(start, len, xs, splitter)) + tasksupport.executeAndWaitResult(new CopyToArray(start, len, xs, splitter)) } def sameElements[U >: T](that: GenIterable[U]) = seq.sameElements(that) - def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { - val pbf = bf.asParallel + def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) { val thatseq = that.asParSeq - executeAndWaitResult(new Zip(combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result }); - } else seq.zip(that)(bf2seq(bf)) + tasksupport.executeAndWaitResult(new Zip(combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult { _.resultWithTaskSupport }); + } else setTaskSupport(seq.zip(that)(bf2seq(bf)), tasksupport) def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false) - def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { - val pbf = bf.asParallel + def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) { val thatseq = that.asParSeq - executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result }); - } else seq.zipAll(that, thisElem, thatElem)(bf2seq(bf)) + tasksupport.executeAndWaitResult( + new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult { + _.resultWithTaskSupport + } + ); + } else setTaskSupport(seq.zipAll(that, thisElem, thatElem)(bf2seq(bf)), tasksupport) protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = { - executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.result }); + tasksupport.executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.resultWithTaskSupport }); } protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = { - executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.result }) + tasksupport.executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.resultWithTaskSupport }) } def view = new ParIterableView[T, Repr, Sequential] { @@ -810,7 +841,7 @@ self: ParIterableLike[T, Repr, Sequential] => extends StrictSplitterCheckTask[R, Tp] { protected[this] val pit: IterableSplitter[T] protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp] - def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel) + def shouldSplitFurther = pit.shouldSplitFurther(self.repr, tasksupport.parallelismLevel) def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure private[parallel] override def signalAbort = pit.abort override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")" @@ -844,8 +875,8 @@ self: ParIterableLike[T, Repr, Sequential] => (f: First, s: Second) extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { - executeAndWaitResult(ft) - executeAndWaitResult(st) + tasksupport.executeAndWaitResult(ft) + tasksupport.executeAndWaitResult(st) mergeSubtasks } } @@ -855,8 +886,8 @@ self: ParIterableLike[T, Repr, Sequential] => (f: First, s: Second) extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { - val ftfuture = execute(ft) - executeAndWaitResult(st) + val ftfuture = tasksupport.execute(ft) + tasksupport.executeAndWaitResult(st) ftfuture() mergeSubtasks } @@ -867,7 +898,7 @@ self: ParIterableLike[T, Repr, Sequential] => @volatile var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 def leaf(prevr: Option[R1]) = { - val initialResult = executeAndWaitResult(inner) + val initialResult = tasksupport.executeAndWaitResult(inner) result = map(initialResult) } private[parallel] override def signalAbort() { @@ -1339,7 +1370,7 @@ self: ParIterableLike[T, Repr, Sequential] => /* scan tree */ - protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1 + protected[this] def scanBlockSize = (thresholdFromSize(size, tasksupport.parallelismLevel) / 2) max 1 protected[this] trait ScanTree[U >: T] { def beginsAt: Int diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 1d7659922c..536139c812 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -47,7 +47,6 @@ extends GenIterableView[T, Coll] with ParIterableLike[T, This, ThisSeq] { self => - import tasksupport._ override def foreach[U](f: T => U): Unit = super[ParIterableLike].foreach(f) override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner"); @@ -135,7 +134,7 @@ self => newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That] override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf => - executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, ResultMapping[_, Force[U, That], That]]]) + tasksupport.executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, ResultMapping[_, Force[U, That], That]]]) } otherwise { val b = bf(underlying) b ++= this.iterator diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 6a5ee5c69b..3d498ab41b 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -44,8 +44,7 @@ trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, S extends scala.collection.GenSeqLike[T, Repr] with ParIterableLike[T, Repr, Sequential] { self => - import tasksupport._ - + type SuperParIterator = IterableSplitter[T] /** A more refined version of the iterator found in the `ParallelIterable` trait, @@ -107,7 +106,7 @@ self => val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1 + tasksupport.executeAndWaitResult(new SegmentLength(p, 0, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx))._1 } /** Finds the first element satisfying some predicate. @@ -125,7 +124,7 @@ self => val realfrom = if (from < 0) 0 else from val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx)) + tasksupport.executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplitWithSignalling(realfrom, length - realfrom)(1) assign ctx)) } /** Finds the last element satisfying some predicate. @@ -143,18 +142,20 @@ self => val until = if (end >= length) length else end + 1 val ctx = new DefaultSignalling with AtomicIndexFlag ctx.setIndexFlag(Int.MinValue) - executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx)) + tasksupport.executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplitWithSignalling(until, length - until)(0) assign ctx)) } def reverse: Repr = { - executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.resultWithTaskSupport }) } def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) - } else seq.reverseMap(f)(bf2seq(bf)) + tasksupport.executeAndWaitResult( + new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.resultWithTaskSupport } + ) + } else setTaskSupport(seq.reverseMap(f)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result }) } otherwise seq.reverseMap(f)(bf2seq(bf))*/ /** Tests whether this $coll contains the given sequence at a given index. @@ -172,13 +173,15 @@ self => else if (pthat.length > length - offset) false else { val ctx = new DefaultSignalling with VolatileAbort - executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter)) + tasksupport.executeAndWaitResult( + new SameElements(splitter.psplitWithSignalling(offset, pthat.length)(1) assign ctx, pthat.splitter) + ) } } otherwise seq.startsWith(that, offset) override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat => val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter)) + length == pthat.length && tasksupport.executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter)) } otherwise seq.sameElements(that) /** Tests whether this $coll ends with the given parallel sequence. @@ -195,25 +198,24 @@ self => else { val ctx = new DefaultSignalling with VolatileAbort val tlen = that.length - executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter)) + tasksupport.executeAndWaitResult(new SameElements(splitter.psplitWithSignalling(length - tlen, tlen)(1) assign ctx, pthat.splitter)) } } otherwise seq.endsWith(that) def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { val realreplaced = replaced min (length - from) - if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) { + if (patch.isParSeq && bf(repr).isCombiner && (size - realreplaced + patch.size) > MIN_FOR_COPY) { val that = patch.asParSeq - val pbf = bf.asParallel val pits = splitter.psplitWithSignalling(from, replaced, length - from - realreplaced) - val cfactory = combinerFactory(() => pbf(repr)) + val cfactory = combinerFactory(() => bf(repr).asCombiner) val copystart = new Copy[U, That](cfactory, pits(0)) val copymiddle = wrap { val tsk = new that.Copy[U, That](cfactory, that.splitter) tasksupport.executeAndWaitResult(tsk) } val copyend = new Copy[U, That](cfactory, pits(2)) - executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { - _.result + tasksupport.executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { + _.resultWithTaskSupport }) } else patch_sequential(from, patch.seq, replaced) } @@ -226,14 +228,18 @@ self => b ++= pits(0) b ++= patch b ++= pits(2) - b.result + setTaskSupport(b.result, tasksupport) } def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new Updated(index, elem, () => bf(repr).asCombiner, splitter) mapResult { _.result }) - } else seq.updated(index, elem)(bf2seq(bf)) + tasksupport.executeAndWaitResult( + new Updated(index, elem, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { + _.resultWithTaskSupport + } + ) + } else setTaskSupport(seq.updated(index, elem)(bf2seq(bf)), tasksupport) /*bf ifParallel { pbf => - executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result }) + tasksupport.executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result }) } otherwise seq.updated(index, elem)(bf2seq(bf))*/ def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { @@ -248,10 +254,13 @@ self => patch(length, new immutable.Repetition(elem, len - length), 0) } else patch(length, Nil, 0); - override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { - val pbf = bf.asParallel + override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf(repr).isCombiner && that.isParSeq) { val thatseq = that.asParSeq - executeAndWaitResult(new Zip(length min thatseq.length, pbf, splitter, thatseq.splitter) mapResult { _.result }); + tasksupport.executeAndWaitResult( + new Zip(length min thatseq.length, combinerFactory(() => bf(repr).asCombiner), splitter, thatseq.splitter) mapResult { + _.resultWithTaskSupport + } + ); } else super.zip(that)(bf) /** Tests whether every element of this $coll relates to the @@ -268,7 +277,7 @@ self => */ def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat => val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter)) + length == pthat.length && tasksupport.executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter)) } otherwise seq.corresponds(that)(p) def diff[U >: T](that: GenSeq[U]): Repr = sequentially { @@ -424,7 +433,7 @@ self => override def requiresStrictSplitters = true } - protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: () => Combiner[U, That], protected[this] val pit: SeqSplitter[T]) + protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CombinerFactory[U, That], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[U, That], Updated[U, That]] { @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf()) @@ -437,10 +446,10 @@ self => override def requiresStrictSplitters = true } - protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S]) + protected[this] class Zip[U >: T, S, That](len: Int, cf: CombinerFactory[(U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { @volatile var result: Result = null - def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) + def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, cf()) protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val fp = len / 2 @@ -448,8 +457,8 @@ self => val pits = pit.psplitWithSignalling(fp, sp) val opits = otherpit.psplitWithSignalling(fp, sp) Seq( - new Zip(fp, pbf, pits(0), opits(0)), - new Zip(sp, pbf, pits(1), opits(1)) + new Zip(fp, cf, pits(0), opits(0)), + new Zip(sp, cf, pits(1), opits(1)) ) } override def merge(that: Zip[U, S, That]) = result = result combine that.result diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala index 6fdc181793..e0d1a7d6ff 100644 --- a/src/library/scala/collection/parallel/ParSeqViewLike.scala +++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala @@ -38,7 +38,6 @@ extends GenSeqView[T, Coll] with ParSeqLike[T, This, ThisSeq] { self => - import tasksupport._ trait Transformed[+S] extends ParSeqView[S, Coll, CollSeq] with super[ParIterableView].Transformed[S] with super[GenSeqViewLike].Transformed[S] { @@ -170,7 +169,7 @@ self => override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisParSeq.scanRight(z)(op)).asInstanceOf[That] override def groupBy[K](f: T => K): immutable.ParMap[K, This] = thisParSeq.groupBy(f).map(kv => (kv._1, newForced(kv._2).asInstanceOf[This])) override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf => - executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, _]]) + tasksupport.executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, _]]) } otherwise { val b = bf(underlying) b ++= this.iterator diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index b705909cad..e32ac443ae 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -6,102 +6,99 @@ ** |/ ** \* */ - package scala.collection.parallel - import scala.concurrent.forkjoin._ import scala.util.control.Breaks._ - import annotation.unchecked.uncheckedVariance +trait Task[R, +Tp] { + type Result = R -/** A trait that declares task execution capabilities used - * by parallel collections. - */ -trait Tasks { - - private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]() + def repr = this.asInstanceOf[Tp] - private[parallel] def debuglog(s: String) = synchronized { - debugMessages += s - } - - trait Task[R, +Tp] { - type Result = R - - def repr = this.asInstanceOf[Tp] - - /** Body of the task - non-divisible unit of work done by this task. - * Optionally is provided with the result from the previous completed task - * or `None` if there was no previous task (or the previous task is uncompleted or unknown). - */ - def leaf(result: Option[R]) + /** Body of the task - non-divisible unit of work done by this task. + * Optionally is provided with the result from the previous completed task + * or `None` if there was no previous task (or the previous task is uncompleted or unknown). + */ + def leaf(result: Option[R]) - /** A result that can be accessed once the task is completed. */ - var result: R + /** A result that can be accessed once the task is completed. */ + var result: R - /** Decides whether or not this task should be split further. */ - def shouldSplitFurther: Boolean + /** Decides whether or not this task should be split further. */ + def shouldSplitFurther: Boolean - /** Splits this task into a list of smaller tasks. */ - private[parallel] def split: Seq[Task[R, Tp]] + /** Splits this task into a list of smaller tasks. */ + private[parallel] def split: Seq[Task[R, Tp]] - /** Read of results of `that` task and merge them into results of this one. */ - private[parallel] def merge(that: Tp @uncheckedVariance) {} + /** Read of results of `that` task and merge them into results of this one. */ + private[parallel] def merge(that: Tp @uncheckedVariance) {} - // exception handling mechanism - @volatile var throwable: Throwable = null - def forwardThrowable() = if (throwable != null) throw throwable + // exception handling mechanism + @volatile var throwable: Throwable = null + def forwardThrowable() = if (throwable != null) throw throwable - // tries to do the leaf computation, storing the possible exception - private[parallel] def tryLeaf(lastres: Option[R]) { - try { - tryBreakable { - leaf(lastres) - result = result // ensure that effects of `leaf` are visible to readers of `result` - } catchBreak { - signalAbort - } - } catch { - case thr: Exception => - result = result // ensure that effects of `leaf` are visible - throwable = thr - signalAbort + // tries to do the leaf computation, storing the possible exception + private[parallel] def tryLeaf(lastres: Option[R]) { + try { + tryBreakable { + leaf(lastres) + result = result // ensure that effects of `leaf` are visible to readers of `result` + } catchBreak { + signalAbort } + } catch { + case thr: Exception => + result = result // ensure that effects of `leaf` are visible + throwable = thr + signalAbort } + } - private[parallel] def tryMerge(t: Tp @uncheckedVariance) { - val that = t.asInstanceOf[Task[R, Tp]] - val local = result // ensure that any effects of modifying `result` are detected - // checkMerge(that) - if (this.throwable == null && that.throwable == null) merge(t) - mergeThrowables(that) - } + private[parallel] def tryMerge(t: Tp @uncheckedVariance) { + val that = t.asInstanceOf[Task[R, Tp]] + val local = result // ensure that any effects of modifying `result` are detected + // checkMerge(that) + if (this.throwable == null && that.throwable == null) merge(t) + mergeThrowables(that) + } - private def checkMerge(that: Task[R, Tp] @uncheckedVariance) { - if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) { - println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable) - } else if (this.throwable != null || that.throwable != null) { - println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) - } + private def checkMerge(that: Task[R, Tp] @uncheckedVariance) { + if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) { + println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable) + } else if (this.throwable != null || that.throwable != null) { + println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) } + } - private[parallel] def mergeThrowables(that: Task[_, _]) { - if (this.throwable != null && that.throwable != null) { - // merge exceptions, since there were multiple exceptions - this.throwable = this.throwable alongWith that.throwable - } else if (that.throwable != null) this.throwable = that.throwable + private[parallel] def mergeThrowables(that: Task[_, _]) { + if (this.throwable != null && that.throwable != null) { + // merge exceptions, since there were multiple exceptions + this.throwable = this.throwable alongWith that.throwable + } else if (that.throwable != null) this.throwable = that.throwable else this.throwable = this.throwable - } + } + + // override in concrete task implementations to signal abort to other tasks + private[parallel] def signalAbort() {} +} + - // override in concrete task implementations to signal abort to other tasks - private[parallel] def signalAbort() {} +/** A trait that declares task execution capabilities used + * by parallel collections. + */ +trait Tasks { + + private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]() + + private[parallel] def debuglog(s: String) = synchronized { + debugMessages += s } trait TaskImpl[R, +Tp] { diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 7adf51cffb..266b179401 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -8,6 +8,8 @@ package scala.collection.parallel.immutable + + import scala.collection.parallel.ParMapLike import scala.collection.parallel.Combiner import scala.collection.parallel.IterableSplitter @@ -19,6 +21,9 @@ import scala.collection.generic.GenericParMapTemplate import scala.collection.generic.GenericParMapCompanion import scala.collection.immutable.{ HashMap, TrieIterator } import annotation.unchecked.uncheckedVariance +import collection.parallel.Task + + /** Immutable parallel hash map, based on hash tries. * @@ -153,7 +158,6 @@ private[parallel] abstract class HashMapCombiner[K, V] extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) { //self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => import HashMapCombiner._ - import collection.parallel.tasksupport._ val emptyTrie = HashMap.empty[K, V] def +=(elem: (K, V)) = { @@ -173,7 +177,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val bucks = buckets.filter(_ != null).map(_.headPtr) val root = new Array[HashMap[K, V]](bucks.length) - executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) + combinerTaskSupport.executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 @@ -195,7 +199,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val bucks = buckets.filter(_ != null).map(_.headPtr) val root = new Array[HashMap[K, AnyRef]](bucks.length) - executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length)) + combinerTaskSupport.executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 @@ -256,7 +260,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val fp = howmany / 2 List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel) } class CreateGroupedTrie[Repr](cbf: () => Combiner[V, Repr], bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, AnyRef]], offset: Int, howmany: Int) @@ -321,7 +325,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), Has val fp = howmany / 2 List(new CreateGroupedTrie(cbf, bucks, root, offset, fp), new CreateGroupedTrie(cbf, bucks, root, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel) } } diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 1cf0ccd391..0d7f04976e 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -8,6 +8,8 @@ package scala.collection.parallel.immutable + + import scala.collection.parallel.ParSetLike import scala.collection.parallel.Combiner import scala.collection.parallel.IterableSplitter @@ -19,6 +21,9 @@ import scala.collection.generic.GenericParTemplate import scala.collection.generic.GenericParCompanion import scala.collection.generic.GenericCompanion import scala.collection.immutable.{ HashSet, TrieIterator } +import collection.parallel.Task + + /** Immutable parallel hash set, based on hash tries. * @@ -127,7 +132,6 @@ private[immutable] abstract class HashSetCombiner[T] extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) { //self: EnvironmentPassingCombiner[T, ParHashSet[T]] => import HashSetCombiner._ - import collection.parallel.tasksupport._ val emptyTrie = HashSet.empty[T] def +=(elem: T) = { @@ -147,7 +151,7 @@ extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombine val bucks = buckets.filter(_ != null).map(_.headPtr) val root = new Array[HashSet[T]](bucks.length) - executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) + combinerTaskSupport.executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 @@ -202,7 +206,7 @@ extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombine val fp = howmany / 2 List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, combinerTaskSupport.parallelismLevel) } } diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 72a8184b10..5c3da66be0 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -21,6 +21,7 @@ import scala.collection.generic.Sizing import scala.collection.parallel.Combiner import scala.collection.parallel.SeqSplitter import scala.collection.parallel.ParSeqLike +import scala.collection.parallel.Task import scala.collection.parallel.CHECK_RATE import scala.collection.mutable.ArraySeq import scala.collection.mutable.Builder @@ -56,7 +57,6 @@ extends ParSeq[T] with Serializable { self => - import collection.parallel.tasksupport._ @transient private var array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]] @@ -584,22 +584,22 @@ self => val targetarr = targarrseq.array.asInstanceOf[Array[Any]] // fill it in parallel - executeAndWaitResult(new Map[S](f, targetarr, 0, length)) + tasksupport.executeAndWaitResult(new Map[S](f, targetarr, 0, length)) // wrap it into a parallel array (new ParArray[S](targarrseq)).asInstanceOf[That] } else super.map(f)(bf) override def scan[U >: T, That](z: U)(op: (U, U) => U)(implicit cbf: CanBuildFrom[ParArray[T], U, That]): That = - if (parallelismLevel > 1 && buildsArray(cbf(repr))) { + if (tasksupport.parallelismLevel > 1 && buildsArray(cbf(repr))) { // reserve an array val targarrseq = new ArraySeq[U](length + 1) val targetarr = targarrseq.array.asInstanceOf[Array[Any]] targetarr(0) = z // do a parallel prefix scan - if (length > 0) executeAndWaitResult(new CreateScanTree[U](0, size, z, op, splitter) mapResult { - tree => executeAndWaitResult(new ScanToArray(tree, z, op, targetarr)) + if (length > 0) tasksupport.executeAndWaitResult(new CreateScanTree[U](0, size, z, op, splitter) mapResult { + tree => tasksupport.executeAndWaitResult(new ScanToArray(tree, z, op, targetarr)) }) // wrap the array into a parallel array @@ -661,7 +661,7 @@ self => val fp = howmany / 2 List(new Map(f, targetarr, offset, fp), new Map(f, targetarr, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, tasksupport.parallelismLevel) } /* serialization */ diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala index cec2e6886d..178424decc 100644 --- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala +++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala @@ -13,6 +13,7 @@ package scala.collection.parallel.mutable import scala.collection.generic._ import scala.collection.parallel.Combiner import scala.collection.parallel.IterableSplitter +import scala.collection.parallel.Task import scala.collection.mutable.BasicNode import scala.collection.mutable.TNode import scala.collection.mutable.LNode @@ -40,7 +41,6 @@ extends ParMap[K, V] with ParCtrieCombiner[K, V] with Serializable { - import collection.parallel.tasksupport._ def this() = this(new Ctrie) @@ -83,7 +83,7 @@ extends ParMap[K, V] case tn: TNode[_, _] => tn.cachedSize(ctrie) case ln: LNode[_, _] => ln.cachedSize(ctrie) case cn: CNode[_, _] => - executeAndWaitResult(new Size(0, cn.array.length, cn.array)) + tasksupport.executeAndWaitResult(new Size(0, cn.array.length, cn.array)) cn.cachedSize(ctrie) } } diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 21a5b05749..6ce6c45460 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -17,6 +17,7 @@ import collection.mutable.DefaultEntry import collection.mutable.HashEntry import collection.mutable.HashTable import collection.mutable.UnrolledBuffer +import collection.parallel.Task @@ -156,8 +157,6 @@ private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFa extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) with collection.mutable.HashTable.HashUtils[K] { -//self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => - import collection.parallel.tasksupport._ private var mask = ParHashMapCombiner.discriminantmask private var nonmasklen = ParHashMapCombiner.nonmasklength private var seedvalue = 27 @@ -179,7 +178,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr // construct table val table = new AddingHashTable(size, tableLoadFactor, seedvalue) val bucks = buckets.map(b => if (b ne null) b.headPtr else null) - val insertcount = executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length)) + val insertcount = combinerTaskSupport.executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length)) table.setSize(insertcount) // TODO compare insertcount and size to see if compression is needed val c = table.hashTableContents @@ -300,7 +299,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr override def merge(that: FillBlocks) { this.result += that.result } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, combinerTaskSupport.parallelismLevel) } } diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala index 6c5f513ad0..811fc8bfe7 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala @@ -8,10 +8,15 @@ package scala.collection.parallel.mutable + + import collection.generic._ import collection.mutable.FlatHashTable import collection.parallel.Combiner import collection.mutable.UnrolledBuffer +import collection.parallel.Task + + /** A parallel hash set. * @@ -113,7 +118,6 @@ private[mutable] abstract class ParHashSetCombiner[T](private val tableLoadFacto extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, ParHashSetCombiner[T]](ParHashSetCombiner.numblocks) with collection.mutable.FlatHashTable.HashUtils[T] { //self: EnvironmentPassingCombiner[T, ParHashSet[T]] => - import collection.parallel.tasksupport._ private var mask = ParHashSetCombiner.discriminantmask private var nonmasklen = ParHashSetCombiner.nonmasklength private var seedvalue = 27 @@ -139,7 +143,7 @@ with collection.mutable.FlatHashTable.HashUtils[T] { private def parPopulate: FlatHashTable.Contents[T] = { // construct it in parallel val table = new AddingFlatHashTable(size, tableLoadFactor, seedvalue) - val (inserted, leftovers) = executeAndWaitResult(new FillBlocks(buckets, table, 0, buckets.length)) + val (inserted, leftovers) = combinerTaskSupport.executeAndWaitResult(new FillBlocks(buckets, table, 0, buckets.length)) var leftinserts = 0 for (elem <- leftovers) leftinserts += table.insertEntry(0, table.tableLength, elem.asInstanceOf[T]) table.setSize(leftinserts + inserted) @@ -304,7 +308,7 @@ with collection.mutable.FlatHashTable.HashUtils[T] { // the total number of successfully inserted elements is adjusted accordingly result = (this.result._1 + that.result._1 + inserted, remainingLeftovers concat that.result._2) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, combinerTaskSupport.parallelismLevel) } } diff --git a/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala index eadc93d422..01eb17024e 100644 --- a/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala @@ -8,18 +8,20 @@ package scala.collection.parallel.mutable + + import scala.collection.generic.Sizing import scala.collection.mutable.ArraySeq import scala.collection.mutable.ArrayBuffer import scala.collection.parallel.TaskSupport -//import scala.collection.parallel.EnvironmentPassingCombiner import scala.collection.parallel.unsupportedop import scala.collection.parallel.Combiner +import scala.collection.parallel.Task + + /** An array combiner that uses a chain of arraybuffers to store elements. */ trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]] { -//self: EnvironmentPassingCombiner[T, ParArray[T]] => - import collection.parallel.tasksupport._ override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz) @@ -30,7 +32,7 @@ trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedA val arrayseq = new ArraySeq[T](size) val array = arrayseq.array.asInstanceOf[Array[Any]] - executeAndWaitResult(new CopyChainToArray(array, 0, size)) + combinerTaskSupport.executeAndWaitResult(new CopyChainToArray(array, 0, size)) new ParArray(arrayseq) } else { // optimisation if there is only 1 array @@ -79,7 +81,7 @@ trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedA val fp = howmany / 2 List(new CopyChainToArray(array, offset, fp), new CopyChainToArray(array, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, combinerTaskSupport.parallelismLevel) } } diff --git a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala index dc583fb4e7..410b542a68 100644 --- a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala @@ -18,9 +18,9 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.UnrolledBuffer import scala.collection.mutable.UnrolledBuffer.Unrolled import scala.collection.parallel.TaskSupport -//import scala.collection.parallel.EnvironmentPassingCombiner import scala.collection.parallel.unsupportedop import scala.collection.parallel.Combiner +import scala.collection.parallel.Task @@ -40,8 +40,6 @@ extends Combiner[T, ParArray[T]] { // because size is doubling, random access is O(logn)! val buff = new DoublingUnrolledBuffer[Any] - import collection.parallel.tasksupport._ - def +=(elem: T) = { buff += elem this @@ -51,7 +49,7 @@ extends Combiner[T, ParArray[T]] { val arrayseq = new ArraySeq[T](size) val array = arrayseq.array.asInstanceOf[Array[Any]] - executeAndWaitResult(new CopyUnrolledToArray(array, 0, size)) + combinerTaskSupport.executeAndWaitResult(new CopyUnrolledToArray(array, 0, size)) new ParArray(arrayseq) } @@ -109,7 +107,7 @@ extends Combiner[T, ParArray[T]] { val fp = howmany / 2 List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp)) } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel) + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, combinerTaskSupport.parallelismLevel) override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")" } } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 8f19d0ecdb..8f49b80c93 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -46,8 +46,16 @@ package object parallel { else new ThreadPoolTaskSupport } else new ThreadPoolTaskSupport - val tasksupport = getTaskSupport - + val defaultTaskSupport: TaskSupport = getTaskSupport + + def setTaskSupport[Coll](c: Coll, t: TaskSupport): Coll = { + c match { + case pc: ParIterableLike[_, _, _] => pc.tasksupport = t + case _ => // do nothing + } + c + } + /* implicit conversions */ implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] { -- cgit v1.2.3