diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/library/scala/collection/parallel/ParIterableLike.scala | 38 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/ParSeqLike.scala | 22 |
2 files changed, 40 insertions, 20 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 7c176eeee4..cc18ec895c 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -285,6 +285,8 @@ self: ParIterableLike[T, Repr, Sequential] => } def ifIs[Cmb](isbody: Cmb => Unit): Otherwise[Cmb] + def isCombiner: Boolean + def asCombiner: Combiner[Elem, To] } trait SignallingOps[PI <: DelegatedSignalling] { @@ -325,6 +327,8 @@ self: ParIterableLike[T, Repr, Sequential] => if (cb.getClass == m.erasure) isbody(cb.asInstanceOf[Cmb]) else notbody } } + def isCombiner = cb.isInstanceOf[Combiner[_, _]] + def asCombiner = cb.asInstanceOf[Combiner[Elem, To]] } protected[this] def bf2seq[S, That](bf: CanBuildFrom[Repr, S, That]) = new CanBuildFrom[Sequential, S, That] { @@ -501,17 +505,26 @@ self: ParIterableLike[T, Repr, Sequential] => reduce((x, y) => if (cmp.lteq(f(x), f(y))) x else y) } - def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { + executeAndWaitResult(new Map[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + } else seq.map(f)(bf2seq(bf)) + /*bf ifParallel { pbf => executeAndWaitResult(new Map[S, That](f, pbf, splitter) mapResult { _.result }) - } otherwise seq.map(f)(bf2seq(bf)) + } otherwise seq.map(f)(bf2seq(bf))*/ - def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { + executeAndWaitResult(new Collect[S, That](pf, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + } else seq.collect(pf)(bf2seq(bf)) + /*bf ifParallel { pbf => executeAndWaitResult(new Collect[S, That](pf, pbf, splitter) mapResult { _.result }) - } otherwise seq.collect(pf)(bf2seq(bf)) + } otherwise seq.collect(pf)(bf2seq(bf))*/ - def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { + executeAndWaitResult(new FlatMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + } else seq.flatMap(f)(bf2seq(bf)) + /*bf ifParallel { pbf => executeAndWaitResult(new FlatMap[S, That](f, pbf, splitter) mapResult { _.result }) - } otherwise seq.flatMap(f)(bf2seq(bf)) + } otherwise seq.flatMap(f)(bf2seq(bf))*/ /** Tests whether a predicate holds for all elements of this $coll. * @@ -965,27 +978,28 @@ self: ParIterableLike[T, Repr, Sequential] => override def requiresStrictSplitters = true } - protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: IterableSplitter[T]) + protected[this] class Map[S, That](f: T => S, pbf: () => Combiner[S, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[S, That], Map[S, That]] { @volatile var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr))) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf())) protected[this] def newSubtask(p: IterableSplitter[T]) = new Map(f, pbf, p) override def merge(that: Map[S, That]) = result = result combine that.result } protected[this] class Collect[S, That] - (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: IterableSplitter[T]) + (pf: PartialFunction[T, S], pbf: () => Combiner[S, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[S, That], Collect[S, That]] { @volatile var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf(self.repr)) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf()) protected[this] def newSubtask(p: IterableSplitter[T]) = new Collect(pf, pbf, p) override def merge(that: Collect[S, That]) = result = result combine that.result } - protected[this] class FlatMap[S, That](f: T => GenTraversableOnce[S], pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: IterableSplitter[T]) + protected[this] class FlatMap[S, That] + (f: T => GenTraversableOnce[S], pbf: () => Combiner[S, That], protected[this] val pit: IterableSplitter[T]) extends Transformer[Combiner[S, That], FlatMap[S, That]] { @volatile var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr)) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf()) protected[this] def newSubtask(p: IterableSplitter[T]) = new FlatMap(f, pbf, p) override def merge(that: FlatMap[S, That]) = { //debuglog("merging " + result + " and " + that.result) diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 1d4d8a13ad..bb14e35f4f 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -181,9 +181,12 @@ self => executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result }) } - def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) { + executeAndWaitResult(new ReverseMap[S, That](f, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + } else seq.reverseMap(f)(bf2seq(bf)) + /*bf ifParallel { pbf => executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result }) - } otherwise seq.reverseMap(f)(bf2seq(bf)) + } otherwise seq.reverseMap(f)(bf2seq(bf))*/ /** Tests whether this $coll contains the given sequence at a given index. * @@ -256,9 +259,12 @@ self => b.result } - def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf => + def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (bf(repr).isCombiner) { + executeAndWaitResult(new Updated(index, elem, () => bf(repr).asCombiner, splitter) mapResult { _.result }) + } else seq.updated(index, elem)(bf2seq(bf)) + /*bf ifParallel { pbf => executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result }) - } otherwise seq.updated(index, elem)(bf2seq(bf)) + } otherwise seq.updated(index, elem)(bf2seq(bf))*/ def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { patch(0, mutable.ParArray(elem), 0) @@ -423,10 +429,10 @@ self => override def merge(that: Reverse[U, This]) = result = that.result combine result } - protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: SeqSplitter[T]) + protected[this] class ReverseMap[S, That](f: T => S, pbf: () => Combiner[S, That], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[S, That], ReverseMap[S, That]] { @volatile var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf(self.repr)) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf()) protected[this] def newSubtask(p: SuperParIterator) = new ReverseMap(f, pbf, down(p)) override def merge(that: ReverseMap[S, That]) = result = that.result combine result } @@ -448,10 +454,10 @@ self => override def requiresStrictSplitters = true } - protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: SeqSplitter[T]) + protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: () => Combiner[U, That], protected[this] val pit: SeqSplitter[T]) extends Transformer[Combiner[U, That], Updated[U, That]] { @volatile var result: Combiner[U, That] = null - def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf(self.repr)) + def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf()) protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val pits = pit.split |