diff options
4 files changed, 193 insertions, 119 deletions
diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala index d1453c9ce9..a2cab7eb5d 100644 --- a/src/library/scala/collection/parallel/Combiner.scala +++ b/src/library/scala/collection/parallel/Combiner.scala @@ -62,7 +62,14 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel { * @return the parallel builder containing both the elements of this and the `other` builder */ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] - + + /** Returns `true` if this combiner has a thread-safe `+=` and is meant to be shared + * across several threads constructing the collection. + * + * By default, this method returns `false`. + */ + def canBeShared: Boolean = false + } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 390bd72ab5..75f4552076 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -165,13 +165,13 @@ extends GenIterableLike[T, Repr] with HasNewCombiner[T, Repr] { self: ParIterableLike[T, Repr, Sequential] => - + import tasksupport._ - + def seq: Sequential def repr: Repr = this.asInstanceOf[Repr] - + /** Parallel iterators are split iterators that have additional accessor and * transformer methods defined in terms of methods `next` and `hasNext`. * When creating a new parallel collection, one might want to override these @@ -189,7 +189,7 @@ self: ParIterableLike[T, Repr, Sequential] => def repr = self.repr def split: Seq[IterableSplitter[T]] } - + /** A stackable modification that ensures signal contexts get passed along the iterators. * A self-type requirement in `ParIterator` ensures that this trait gets mixed into * concrete iterators. @@ -211,7 +211,7 @@ self: ParIterableLike[T, Repr, Sequential] => def hasDefiniteSize = true def nonEmpty = size != 0 - + /** Creates a new parallel iterator used to traverse the elements of this parallel collection. * This iterator is more specific than the iterator of the returned by `iterator`, and augmented * with additional accessor and transformer methods. @@ -293,7 +293,7 @@ self: ParIterableLike[T, Repr, Sequential] => trait SignallingOps[PI <: DelegatedSignalling] { def assign(cntx: Signalling): PI } - + /* convenience task operations wrapper */ protected implicit def task2ops[R, Tp](tsk: SSCTask[R, Tp]) = new TaskOps[R, Tp] { def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) { @@ -321,7 +321,7 @@ self: ParIterableLike[T, Repr, Sequential] => it } } - + protected implicit def builder2ops[Elem, To](cb: Builder[Elem, To]) = new BuilderOps[Elem, To] { def ifIs[Cmb](isbody: Cmb => Unit) = new Otherwise[Cmb] { def otherwise(notbody: => Unit)(implicit m: ClassManifest[Cmb]) { @@ -331,12 +331,12 @@ self: ParIterableLike[T, Repr, Sequential] => def isCombiner = cb.isInstanceOf[Combiner[_, _]] def asCombiner = cb.asInstanceOf[Combiner[Elem, To]] } - + protected[this] def bf2seq[S, That](bf: CanBuildFrom[Repr, S, That]) = new CanBuildFrom[Sequential, S, That] { def apply(from: Sequential) = bf.apply(from.par.asInstanceOf[Repr]) // !!! we only use this on `this.seq`, and know that `this.seq.par.getClass == this.getClass` def apply() = bf.apply() } - + protected[this] def sequentially[S, That <: Parallel](b: Sequential => Parallelizable[S, That]) = b(seq).par.asInstanceOf[Repr] def mkString(start: String, sep: String, end: String): String = seq.mkString(start, sep, end) @@ -346,7 +346,7 @@ self: ParIterableLike[T, Repr, Sequential] => def mkString: String = seq.mkString("") override def toString = seq.mkString(stringPrefix + "(", ", ", ")") - + def canEqual(other: Any) = true /** Reduces the elements of this sequence using the specified associative binary operator. @@ -383,7 +383,7 @@ self: ParIterableLike[T, Repr, Sequential] => * the elements if the collection is nonempty, and `None` otherwise. */ def reduceOption[U >: T](op: (U, U) => U): Option[U] = if (isEmpty) None else Some(reduce(op)) - + /** Folds the elements of this sequence using the specified associative binary operator. * The order in which the elements are reduced is unspecified and may be nondeterministic. * @@ -434,15 +434,11 @@ self: ParIterableLike[T, Repr, Sequential] => def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = { executeAndWaitResult(new Aggregate(z, seqop, combop, splitter)) } - - def /:[S](z: S)(op: (S, T) => S): S = foldLeft(z)(op) - - def :\[S](z: S)(op: (T, S) => S): S = foldRight(z)(op) - + def foldLeft[S](z: S)(op: (S, T) => S): S = seq.foldLeft(z)(op) - + def foldRight[S](z: S)(op: (T, S) => S): S = seq.foldRight(z)(op) - + def reduceLeft[U >: T](op: (U, T) => U): U = seq.reduceLeft(op) def reduceRight[U >: T](op: (T, U) => U): U = seq.reduceRight(op) @@ -451,20 +447,6 @@ self: ParIterableLike[T, Repr, Sequential] => def reduceRightOption[U >: T](op: (T, U) => U): Option[U] = seq.reduceRightOption(op) - /* - /** Applies a function `f` to all the elements of $coll. Does so in a nondefined order, - * and in parallel. - * - * $undefinedorder - * - * @tparam U the result type of the function applied to each element, which is always discarded - * @param f function applied to each element - */ - def pareach[U](f: T => U): Unit = { - executeAndWaitResult(new Foreach(f, splitter)) - } - */ - /** Applies a function `f` to all the elements of $coll in a sequential order. * * @tparam U the result type of the function applied to each element, which is always discarded @@ -507,21 +489,21 @@ self: ParIterableLike[T, Repr, Sequential] => } def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new Map[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result }) } else seq.map(f)(bf2seq(bf)) /*bf ifParallel { pbf => executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result }) } otherwise seq.map(f)(bf2seq(bf))*/ def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new Collect[S, That](pf, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + executeAndWaitResult(new Collect[S, That](pf, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result }) } else seq.collect(pf)(bf2seq(bf)) /*bf ifParallel { pbf => executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result }) } otherwise seq.collect(pf)(bf2seq(bf))*/ def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { - executeAndWaitResult(new FlatMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + executeAndWaitResult(new FlatMap[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result }) } else seq.flatMap(f)(bf2seq(bf)) /*bf ifParallel { pbf => executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result }) @@ -563,17 +545,48 @@ self: ParIterableLike[T, Repr, Sequential] => def find(pred: T => Boolean): Option[T] = { executeAndWaitResult(new Find(pred, splitter assign new DefaultSignalling with VolatileAbort)) } - - protected[this] def cbfactory ={ - () => newCombiner + + /** Creates a combiner factory. Each combiner factory instance is used + * once per invocation of a parallel transformer method for a single + * collection. + * + * The default combiner factory creates a new combiner every time it + * is requested, unless the combiner is thread-safe as indicated by its + * `canBeShared` method. In this case, the method returns a factory which + * returns the same combiner each time. This is typically done for + * concurrent parallel collections, the combiners of which allow + * thread safe access. + */ + protected[this] def combinerFactory = { + val combiner = newCombiner + if (combiner.canBeShared) new CombinerFactory[T, Repr] { + val shared = combiner + def apply() = shared + def doesShareCombiners = true + } else new CombinerFactory[T, Repr] { + def apply() = newCombiner + def doesShareCombiners = false + } } - + + protected[this] def combinerFactory[S, That](cbf: () => Combiner[S, That]) = { + val combiner = cbf() + if (combiner.canBeShared) new CombinerFactory[S, That] { + val shared = combiner + def apply() = shared + def doesShareCombiners = true + } else new CombinerFactory[S, That] { + def apply() = cbf() + def doesShareCombiners = false + } + } + def filter(pred: T => Boolean): Repr = { - executeAndWaitResult(new Filter(pred, cbfactory, splitter) mapResult { _.result }) + executeAndWaitResult(new Filter(pred, combinerFactory, splitter) mapResult { _.result }) } def filterNot(pred: T => Boolean): Repr = { - executeAndWaitResult(new FilterNot(pred, cbfactory, splitter) mapResult { _.result }) + executeAndWaitResult(new FilterNot(pred, combinerFactory, splitter) mapResult { _.result }) } def ++[U >: T, That](that: GenTraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = { @@ -581,9 +594,10 @@ self: ParIterableLike[T, Repr, Sequential] => // println("case both are parallel") val other = that.asParIterable val pbf = bf.asParallel - val copythis = new Copy(() => pbf(repr), splitter) + val cfactory = combinerFactory(() => pbf(repr)) + val copythis = new Copy(cfactory, splitter) val copythat = wrap { - val othtask = new other.Copy(() => pbf(self.repr), other.splitter) + val othtask = new other.Copy(cfactory, other.splitter) tasksupport.executeAndWaitResult(othtask) } val task = (copythis parallel copythat) { _ combine _ } mapResult { @@ -593,7 +607,7 @@ self: ParIterableLike[T, Repr, Sequential] => } else if (bf.isParallel) { // println("case parallel builder, `that` not parallel") val pbf = bf.asParallel - val copythis = new Copy(() => pbf(repr), splitter) + val copythis = new Copy(combinerFactory(() => pbf(repr)), splitter) val copythat = wrap { val cb = pbf(repr) for (elem <- that.seq) cb += elem @@ -610,19 +624,19 @@ self: ParIterableLike[T, Repr, Sequential] => } def partition(pred: T => Boolean): (Repr, Repr) = { - executeAndWaitResult(new Partition(pred, cbfactory, splitter) mapResult { p => (p._1.result, p._2.result) }) + executeAndWaitResult(new Partition(pred, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) }) } def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = { executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], splitter) mapResult { - rcb => rcb.groupByKey(cbfactory) + rcb => rcb.groupByKey(() => combinerFactory()) }) } def take(n: Int): Repr = { val actualn = if (size > n) n else size if (actualn < MIN_FOR_COPY) take_sequential(actualn) - else executeAndWaitResult(new Take(actualn, cbfactory, splitter) mapResult { + else executeAndWaitResult(new Take(actualn, combinerFactory, splitter) mapResult { _.result }) } @@ -642,7 +656,7 @@ self: ParIterableLike[T, Repr, Sequential] => def drop(n: Int): Repr = { val actualn = if (size > n) n else size if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn) - else executeAndWaitResult(new Drop(actualn, cbfactory, splitter) mapResult { _.result }) + else executeAndWaitResult(new Drop(actualn, combinerFactory, splitter) mapResult { _.result }) } private def drop_sequential(n: Int) = { @@ -657,7 +671,7 @@ self: ParIterableLike[T, Repr, Sequential] => val from = unc_from min size max 0 val until = unc_until min size max from if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until) - else executeAndWaitResult(new Slice(from, until, cbfactory, splitter) mapResult { _.result }) + else executeAndWaitResult(new Slice(from, until, combinerFactory, splitter) mapResult { _.result }) } private def slice_sequential(from: Int, until: Int): Repr = { @@ -672,7 +686,7 @@ self: ParIterableLike[T, Repr, Sequential] => } def splitAt(n: Int): (Repr, Repr) = { - executeAndWaitResult(new SplitAt(n, cbfactory, splitter) mapResult { p => (p._1.result, p._2.result) }) + executeAndWaitResult(new SplitAt(n, combinerFactory, combinerFactory, splitter) mapResult { p => (p._1.result, p._2.result) }) } /** Computes a prefix scan of the elements of the collection. @@ -694,7 +708,7 @@ self: ParIterableLike[T, Repr, Sequential] => val cbf = bf.asParallel if (parallelismLevel > 1) { if (size > 0) executeAndWaitResult(new CreateScanTree(0, size, z, op, splitter) mapResult { - tree => executeAndWaitResult(new FromScanTree(tree, z, op, cbf) mapResult { + tree => executeAndWaitResult(new FromScanTree(tree, z, op, combinerFactory(() => cbf(repr))) mapResult { cb => cb.result }) }) else (cbf(self.repr) += z).result @@ -714,9 +728,15 @@ self: ParIterableLike[T, Repr, Sequential] => * @return the longest prefix of this $coll of elements that satisy the predicate `pred` */ def takeWhile(pred: T => Boolean): Repr = { - val cntx = new DefaultSignalling with AtomicIndexFlag - cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new TakeWhile(0, pred, cbfactory, splitter assign cntx) mapResult { _._1.result }) + val cbf = combinerFactory + if (cbf.doesShareCombiners) { + val parseqspan = toSeq.takeWhile(pred) + executeAndWaitResult(new Copy(combinerFactory, parseqspan.splitter) mapResult { _.result }) + } else { + val cntx = new DefaultSignalling with AtomicIndexFlag + cntx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new TakeWhile(0, pred, combinerFactory, splitter assign cntx) mapResult { _._1.result }) + } } /** Splits this $coll into a prefix/suffix pair according to a predicate. @@ -729,11 +749,22 @@ self: ParIterableLike[T, Repr, Sequential] => * the elements satisfy `pred`, and the rest of the collection */ def span(pred: T => Boolean): (Repr, Repr) = { - val cntx = new DefaultSignalling with AtomicIndexFlag - cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new Span(0, pred, cbfactory, splitter assign cntx) mapResult { - p => (p._1.result, p._2.result) - }) + val cbf = combinerFactory + if (cbf.doesShareCombiners) { + val (xs, ys) = toSeq.span(pred) + val copyxs = new Copy(combinerFactory, xs.splitter) mapResult { _.result } + val copyys = new Copy(combinerFactory, ys.splitter) mapResult { _.result } + val copyall = (copyxs parallel copyys) { + (xr, yr) => (xr, yr) + } + executeAndWaitResult(copyall) + } else { + val cntx = new DefaultSignalling with AtomicIndexFlag + cntx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { + p => (p._1.result, p._2.result) + }) + } } /** Drops all elements in the longest prefix of elements that satisfy the predicate, @@ -749,7 +780,7 @@ self: ParIterableLike[T, Repr, Sequential] => def dropWhile(pred: T => Boolean): Repr = { val cntx = new DefaultSignalling with AtomicIndexFlag cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new Span(0, pred, cbfactory, splitter assign cntx) mapResult { _._2.result }) + executeAndWaitResult(new Span(0, pred, combinerFactory, combinerFactory, splitter assign cntx) mapResult { _._2.result }) } def copyToArray[U >: T](xs: Array[U]) = copyToArray(xs, 0) @@ -765,7 +796,7 @@ self: ParIterableLike[T, Repr, Sequential] => def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { val pbf = bf.asParallel val thatseq = that.asParSeq - executeAndWaitResult(new Zip(pbf, splitter, thatseq.splitter) mapResult { _.result }); + executeAndWaitResult(new Zip(combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result }); } else seq.zip(that)(bf2seq(bf)) def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false) @@ -773,15 +804,15 @@ self: ParIterableLike[T, Repr, Sequential] => def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { val pbf = bf.asParallel val thatseq = that.asParSeq - executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, pbf, splitter, thatseq.splitter) mapResult { _.result }); + executeAndWaitResult(new ZipAll(size max thatseq.length, thisElem, thatElem, combinerFactory(() => pbf(repr)), splitter, thatseq.splitter) mapResult { _.result }); } else seq.zipAll(that, thisElem, thatElem)(bf2seq(bf)) protected def toParCollection[U >: T, That](cbf: () => Combiner[U, That]): That = { - executeAndWaitResult(new ToParCollection(cbf, splitter) mapResult { _.result }); + executeAndWaitResult(new ToParCollection(combinerFactory(cbf), splitter) mapResult { _.result }); } protected def toParMap[K, V, That](cbf: () => Combiner[(K, V), That])(implicit ev: T <:< (K, V)): That = { - executeAndWaitResult(new ToParMap(cbf, splitter)(ev) mapResult { _.result }) + executeAndWaitResult(new ToParMap(combinerFactory(cbf), splitter)(ev) mapResult { _.result }) } def view = new ParIterableView[T, Repr, Sequential] { @@ -869,7 +900,7 @@ self: ParIterableLike[T, Repr, Sequential] => /** Sequentially performs one task after another. */ protected[this] abstract class SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] - (f: First, s: Second) + (f: First, s: Second) extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { executeAndWaitResult(ft) @@ -880,7 +911,7 @@ self: ParIterableLike[T, Repr, Sequential] => /** Performs two tasks in parallel, and waits for both to finish. */ protected[this] abstract class ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] - (f: First, s: Second) + (f: First, s: Second) extends Composite[FR, SR, R, First, Second](f, s) { def leaf(prevr: Option[R]) = { val ftfuture = execute(ft) @@ -903,16 +934,18 @@ self: ParIterableLike[T, Repr, Sequential] => } override def requiresStrictSplitters = inner.requiresStrictSplitters } - + protected trait Transformer[R, Tp] extends Accessor[R, Tp] - - protected[this] class Foreach[S](op: T => S, protected[this] val pit: IterableSplitter[T]) extends Accessor[Unit, Foreach[S]] { + + protected[this] class Foreach[S](op: T => S, protected[this] val pit: IterableSplitter[T]) + extends Accessor[Unit, Foreach[S]] { @volatile var result: Unit = () def leaf(prevr: Option[Unit]) = pit.foreach(op) protected[this] def newSubtask(p: IterableSplitter[T]) = new Foreach[S](op, p) } - protected[this] class Count(pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Int, Count] { + protected[this] class Count(pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) + extends Accessor[Int, Count] { // val pittxt = pit.toString @volatile var result: Int = 0 def leaf(prevr: Option[Int]) = result = pit.count(pred) @@ -920,8 +953,9 @@ self: ParIterableLike[T, Repr, Sequential] => override def merge(that: Count) = result = result + that.result // override def toString = "CountTask(" + pittxt + ")" } - - protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Reduce[U]] { + + protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: IterableSplitter[T]) + extends Accessor[Option[U], Reduce[U]] { @volatile var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op)) protected[this] def newSubtask(p: IterableSplitter[T]) = new Reduce(op, p) @@ -931,7 +965,8 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: IterableSplitter[T]) extends Accessor[U, Fold[U]] { + protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: IterableSplitter[T]) + extends Accessor[U, Fold[U]] { @volatile var result: U = null.asInstanceOf[U] def leaf(prevr: Option[U]) = result = pit.fold(z)(op) protected[this] def newSubtask(p: IterableSplitter[T]) = new Fold(z, op, p) @@ -946,21 +981,24 @@ self: ParIterableLike[T, Repr, Sequential] => override def merge(that: Aggregate[S]) = result = combop(result, that.result) } - protected[this] class Sum[U >: T](num: Numeric[U], protected[this] val pit: IterableSplitter[T]) extends Accessor[U, Sum[U]] { + protected[this] class Sum[U >: T](num: Numeric[U], protected[this] val pit: IterableSplitter[T]) + extends Accessor[U, Sum[U]] { @volatile var result: U = null.asInstanceOf[U] def leaf(prevr: Option[U]) = result = pit.sum(num) protected[this] def newSubtask(p: IterableSplitter[T]) = new Sum(num, p) override def merge(that: Sum[U]) = result = num.plus(result, that.result) } - protected[this] class Product[U >: T](num: Numeric[U], protected[this] val pit: IterableSplitter[T]) extends Accessor[U, Product[U]] { + protected[this] class Product[U >: T](num: Numeric[U], protected[this] val pit: IterableSplitter[T]) + extends Accessor[U, Product[U]] { @volatile var result: U = null.asInstanceOf[U] def leaf(prevr: Option[U]) = result = pit.product(num) protected[this] def newSubtask(p: IterableSplitter[T]) = new Product(num, p) override def merge(that: Product[U]) = result = num.times(result, that.result) } - protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Min[U]] { + protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: IterableSplitter[T]) + extends Accessor[Option[U], Min[U]] { @volatile var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.min(ord)) protected[this] def newSubtask(p: IterableSplitter[T]) = new Min(ord, p) @@ -970,7 +1008,8 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Max[U]] { + protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: IterableSplitter[T]) + extends Accessor[Option[U], Max[U]] { @volatile var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.max(ord)) protected[this] def newSubtask(p: IterableSplitter[T]) = new Max(ord, p) @@ -980,16 +1019,16 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Map[S, That](f: T => S, pbf: () => Combiner[S, That], protected[this] val pit: IterableSplitter[T]) + protected[this] class Map[S, That](f: T => S, cbf: CombinerFactory[S, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[S, That], Map[S, That]] { @volatile var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf())) - protected[this] def newSubtask(p: IterableSplitter[T]) = new Map(f, pbf, p) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, cbf())) + protected[this] def newSubtask(p: IterableSplitter[T]) = new Map(f, cbf, p) override def merge(that: Map[S, That]) = result = result combine that.result } protected[this] class Collect[S, That] - (pf: PartialFunction[T, S], pbf: () => Combiner[S, That], protected[this] val pit: IterableSplitter[T]) + (pf: PartialFunction[T, S], pbf: CombinerFactory[S, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[S, That], Collect[S, That]] { @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf()) @@ -998,7 +1037,7 @@ self: ParIterableLike[T, Repr, Sequential] => } protected[this] class FlatMap[S, That] - (f: T => GenTraversableOnce[S], pbf: () => Combiner[S, That], protected[this] val pit: IterableSplitter[T]) + (f: T => GenTraversableOnce[S], pbf: CombinerFactory[S, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[S, That], FlatMap[S, That]] { @volatile var result: Combiner[S, That] = null def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf()) @@ -1010,28 +1049,31 @@ self: ParIterableLike[T, Repr, Sequential] => } } - protected[this] class Forall(pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Boolean, Forall] { + protected[this] class Forall(pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) + extends Accessor[Boolean, Forall] { @volatile var result: Boolean = true def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort } protected[this] def newSubtask(p: IterableSplitter[T]) = new Forall(pred, p) override def merge(that: Forall) = result = result && that.result } - protected[this] class Exists(pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Boolean, Exists] { + protected[this] class Exists(pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) + extends Accessor[Boolean, Exists] { @volatile var result: Boolean = false def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort } protected[this] def newSubtask(p: IterableSplitter[T]) = new Exists(pred, p) override def merge(that: Exists) = result = result || that.result } - protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Find[U]] { + protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) + extends Accessor[Option[U], Find[U]] { @volatile var result: Option[U] = None def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } protected[this] def newSubtask(p: IterableSplitter[T]) = new Find(pred, p) override def merge(that: Find[U]) = if (this.result == None) result = that.result } - protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, This], Filter[U, This]] { @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = { @@ -1041,7 +1083,7 @@ self: ParIterableLike[T, Repr, Sequential] => override def merge(that: Filter[U, This]) = result = result combine that.result } - protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, This], FilterNot[U, This]] { @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = { @@ -1051,7 +1093,7 @@ self: ParIterableLike[T, Repr, Sequential] => override def merge(that: FilterNot[U, This]) = result = result combine that.result } - protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], protected[this] val pit: IterableSplitter[T]) + protected class Copy[U >: T, That](cfactory: CombinerFactory[U, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, That], Copy[U, That]] { @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory())) @@ -1059,11 +1101,12 @@ self: ParIterableLike[T, Repr, Sequential] => override def merge(that: Copy[U, That]) = result = result combine that.result } - protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class Partition[U >: T, This >: Repr] + (pred: T => Boolean, cbfTrue: CombinerFactory[U, This], cbfFalse: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] { @volatile var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) - protected[this] def newSubtask(p: IterableSplitter[T]) = new Partition(pred, cbf, p) + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbfTrue()), reuse(prev.map(_._2), cbfFalse())) + protected[this] def newSubtask(p: IterableSplitter[T]) = new Partition(pred, cbfTrue, cbfFalse, p) override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) } @@ -1090,7 +1133,8 @@ self: ParIterableLike[T, Repr, Sequential] => } } - protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class Take[U >: T, This >: Repr] + (n: Int, cbf: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, This], Take[U, This]] { @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = { @@ -1109,7 +1153,8 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class Drop[U >: T, This >: Repr] + (n: Int, cbf: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, This], Drop[U, This]] { @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf())) @@ -1126,7 +1171,8 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class Slice[U >: T, This >: Repr] + (from: Int, until: Int, cbf: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, This], Slice[U, This]] { @volatile var result: Combiner[U, This] = null def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf())) @@ -1144,22 +1190,23 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + protected[this] class SplitAt[U >: T, This >: Repr] + (at: Int, cbfBefore: CombinerFactory[U, This], cbfAfter: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] { @volatile var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbfBefore()), reuse(prev.map(_._2), cbfAfter())) protected[this] def newSubtask(p: IterableSplitter[T]) = throw new UnsupportedOperationException override def split = { val pits = pit.split val sizes = pits.scanLeft(0)(_ + _.remaining) - for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p) + for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbfBefore, cbfAfter, p) } override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) override def requiresStrictSplitters = true } protected[this] class TakeWhile[U >: T, This >: Repr] - (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + (pos: Int, pred: T => Boolean, cbf: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] { @volatile var result: (Combiner[U, This], Boolean) = null def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) { @@ -1178,23 +1225,23 @@ self: ParIterableLike[T, Repr, Sequential] => } protected[this] class Span[U >: T, This >: Repr] - (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: IterableSplitter[T]) + (pos: Int, pred: T => Boolean, cbfBefore: CombinerFactory[U, This], cbfAfter: CombinerFactory[U, This], protected[this] val pit: IterableSplitter[T]) extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] { @volatile var result: (Combiner[U, This], Combiner[U, This]) = null def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) { // val lst = pit.toList // val pa = mutable.ParArray(lst: _*) // val str = "At leaf we will iterate: " + pa.splitter.toList - result = pit.span2combiners(pred, cbf(), cbf()) // do NOT reuse old combiners here, lest ye be surprised + result = pit.span2combiners(pred, cbfBefore(), cbfAfter()) // do NOT reuse old combiners here, lest ye be surprised // println("\nAt leaf result is: " + result) if (result._2.size > 0) pit.setIndexFlagIfLesser(pos) } else { - result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf()))) + result = (reuse(prev.map(_._2), cbfBefore()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbfAfter()))) } protected[this] def newSubtask(p: IterableSplitter[T]) = throw new UnsupportedOperationException override def split = { val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Span(pos + untilp, pred, cbf, p) + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Span(pos + untilp, pred, cbfBefore, cbfAfter, p) } override def merge(that: Span[U, This]) = result = if (result._2.size == 0) { (result._1 combine that.result._1, that.result._2) @@ -1204,10 +1251,10 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: IterableSplitter[T], val othpit: SeqSplitter[S]) + protected[this] class Zip[U >: T, S, That](pbf: CombinerFactory[(U, S), That], protected[this] val pit: IterableSplitter[T], val othpit: SeqSplitter[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { @volatile var result: Result = null - def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf(self.repr)) + def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf()) protected[this] def newSubtask(p: IterableSplitter[T]) = unsupported override def split = { val pits = pit.split @@ -1220,10 +1267,10 @@ self: ParIterableLike[T, Repr, Sequential] => } protected[this] class ZipAll[U >: T, S, That] - (len: Int, thiselem: U, thatelem: S, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: IterableSplitter[T], val othpit: SeqSplitter[S]) + (len: Int, thiselem: U, thatelem: S, pbf: CombinerFactory[(U, S), That], protected[this] val pit: IterableSplitter[T], val othpit: SeqSplitter[S]) extends Transformer[Combiner[(U, S), That], ZipAll[U, S, That]] { @volatile var result: Result = null - def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf(self.repr)) + def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf()) protected[this] def newSubtask(p: IterableSplitter[T]) = unsupported override def split = if (pit.remaining <= len) { val pits = pit.split @@ -1257,7 +1304,7 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class ToParCollection[U >: T, That](cbf: () => Combiner[U, That], protected[this] val pit: IterableSplitter[T]) + protected[this] class ToParCollection[U >: T, That](cbf: CombinerFactory[U, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[U, That], ToParCollection[U, That]] { @volatile var result: Result = null def leaf(prev: Option[Combiner[U, That]]) { @@ -1268,7 +1315,7 @@ self: ParIterableLike[T, Repr, Sequential] => override def merge(that: ToParCollection[U, That]) = result = result combine that.result } - protected[this] class ToParMap[K, V, That](cbf: () => Combiner[(K, V), That], protected[this] val pit: IterableSplitter[T])(implicit ev: T <:< (K, V)) + protected[this] class ToParMap[K, V, That](cbf: CombinerFactory[(K, V), That], protected[this] val pit: IterableSplitter[T])(implicit ev: T <:< (K, V)) extends Transformer[Combiner[(K, V), That], ToParMap[K, V, That]] { @volatile var result: Result = null def leaf(prev: Option[Combiner[(K, V), That]]) { @@ -1315,13 +1362,13 @@ self: ParIterableLike[T, Repr, Sequential] => } else result = that.result override def requiresStrictSplitters = true } - + protected[this] class FromScanTree[U >: T, That] - (tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CanCombineFrom[Repr, U, That]) + (tree: ScanTree[U], z: U, op: (U, U) => U, cbf: CombinerFactory[U, That]) extends StrictSplitterCheckTask[Combiner[U, That], FromScanTree[U, That]] { @volatile var result: Combiner[U, That] = null def leaf(prev: Option[Combiner[U, That]]) { - val cb = reuse(prev, cbf(self.repr)) + val cb = reuse(prev, cbf()) iterate(tree, cb) result = cb } @@ -1391,7 +1438,13 @@ self: ParIterableLike[T, Repr, Sequential] => def rightmost = this def print(depth: Int) = println((" " * depth) + this) } - + + /* alias methods */ + + def /:[S](z: S)(op: (S, T) => S): S = foldLeft(z)(op); + + def :\[S](z: S)(op: (T, S) => S): S = foldRight(z)(op); + /* debug information */ private[parallel] def debugInformation = "Parallel collection: " + this.getClass diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index d0f38b30dc..22c587b498 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -213,9 +213,9 @@ self => } otherwise seq.sameElements(that) /** Tests whether this $coll ends with the given parallel sequence. - * + * * $abortsignalling - * + * * @tparam S the type of the elements of `that` sequence * @param that the sequence to test * @return `true` if this $coll has `that` as a suffix, `false` otherwise @@ -236,12 +236,13 @@ self => val that = patch.asParSeq val pbf = bf.asParallel val pits = splitter.psplit(from, replaced, length - from - realreplaced) - val copystart = new Copy[U, That](() => pbf(repr), pits(0)) + val cfactory = combinerFactory(() => pbf(repr)) + val copystart = new Copy[U, That](cfactory, pits(0)) val copymiddle = wrap { - val tsk = new that.Copy[U, That](() => pbf(repr), that.splitter) + val tsk = new that.Copy[U, That](cfactory, that.splitter) tasksupport.executeAndWaitResult(tsk) } - val copyend = new Copy[U, That](() => pbf(repr), pits(2)) + val copyend = new Copy[U, That](cfactory, pits(2)) executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result }) diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index f152629c50..f154019bac 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -83,6 +83,7 @@ package object parallel { } } + package parallel { trait FactoryOps[From, Elem, To] { trait Otherwise[R] { @@ -113,7 +114,19 @@ package parallel { } /* classes */ - + + trait CombinerFactory[U, Repr] { + /** Provides a combiner used to construct a collection. */ + def apply(): Combiner[U, Repr] + /** The call to the `apply` method can create a new combiner each time. + * If it does, this method returns `false`. + * The same combiner factory may be used each time (typically, this is + * the case for concurrent collections, which are thread safe). + * If so, the method returns `true`. + */ + def doesShareCombiners: Boolean + } + /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */ final case class CompositeThrowable( val throwables: Set[Throwable] |