diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 07:49:14 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 07:49:14 +0000 |
commit | 18ad78dd73b29c0c8b34e970c58cd86232cdc4f5 (patch) | |
tree | 12ff8e92f2601ff2127e1f2c6957d95e28c76e18 /src | |
parent | b9fb76d09d0a6e63bfb6f332079ab7d05f1233ca (diff) | |
download | scala-18ad78dd73b29c0c8b34e970c58cd86232cdc4f5.tar.gz scala-18ad78dd73b29c0c8b34e970c58cd86232cdc4f5.tar.bz2 scala-18ad78dd73b29c0c8b34e970c58cd86232cdc4f5.zip |
Refactorings and hash trie combiners. No review.
Diffstat (limited to 'src')
26 files changed, 180 insertions, 127 deletions
diff --git a/src/parallel-collections/scala/collection/Parallelizable.scala b/src/parallel-collections/scala/collection/Parallelizable.scala index 206285459d..bfca4a41d7 100644 --- a/src/parallel-collections/scala/collection/Parallelizable.scala +++ b/src/parallel-collections/scala/collection/Parallelizable.scala @@ -11,12 +11,12 @@ import parallel.ParallelIterableLike * by invoking the method `par`. Parallelizable collections may be parametrized with * a target type different than their own. */ -trait Parallelizable[+T, +Repr <: Parallel] { +trait Parallelizable[+T, +ParRepr <: Parallel] { /** * Returns a parallel implementation of a collection. */ - def par: Repr + def par: ParRepr } diff --git a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala b/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala index 404201b1c2..fcbcd6295e 100644 --- a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala +++ b/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala @@ -15,7 +15,7 @@ import scala.collection.parallel._ * @tparam Elem the element type of the collection to be created * @tparam To the type of the collection to be created */ -trait CanBuildFromParallel[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel { +trait CanCombineFrom[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel { def apply(from: From): Combiner[Elem, To] def apply(): Combiner[Elem, To] } diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala b/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala index 14eb9ab282..e5ba36f846 100644 --- a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala +++ b/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala @@ -3,29 +3,27 @@ package scala.collection.generic import scala.collection.parallel.Combiner import scala.collection.parallel.ParallelIterable +import scala.collection.parallel.ParallelMap -/** - * A template class for companion objects of parallel collection classes. - * They should be mixed in together with `GenericCompanion` type. - * @tparam CC the type constructor representing the collection class - * @since 2.8 +/** A template class for companion objects of parallel collection classes. + * They should be mixed in together with `GenericCompanion` type. + * @tparam CC the type constructor representing the collection class + * @since 2.8 */ trait GenericParallelCompanion[+CC[X] <: ParallelIterable[X]] { - /** - * The default builder for $Coll objects. + /** The default builder for $Coll objects. */ def newBuilder[A]: Combiner[A, CC[A]] - /** - * The parallel builder for $Coll objects. + /** The parallel builder for $Coll objects. */ def newCombiner[A]: Combiner[A, CC[A]] } - - - +trait GenericParallelMapCompanion[+CC[P, Q] <: ParallelMap[P, Q]] { + def newCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] +} diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala b/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala index 58454be04e..e98c13fa36 100644 --- a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala +++ b/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala @@ -4,6 +4,7 @@ package scala.collection.generic import scala.collection.parallel.Combiner import scala.collection.parallel.ParallelIterable +import scala.collection.parallel.ParallelMap import scala.collection.parallel.TaskSupport @@ -47,7 +48,17 @@ extends GenericTraversableTemplate[A, CC] } +trait GenericParallelMapTemplate[K, +V, +CC[X, Y] <: ParallelMap[X, Y]] +extends TaskSupport +{ + def mapCompanion: GenericParallelMapCompanion[CC] + def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = { + val cb = mapCompanion.newCombiner[P, Q] + cb.environment = environment + cb + } +} diff --git a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelFactory.scala index 86a5fdf822..0b9e92aa10 100644 --- a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala +++ b/src/parallel-collections/scala/collection/generic/ParallelFactory.scala @@ -23,7 +23,7 @@ extends TraversableFactory[CC] * `apply(from)` to the `genericParallelBuilder` method of the $coll `from`, and calls to `apply()` * to this factory. */ - class GenericCanBuildFromParallel[A] extends GenericCanBuildFrom[A] with CanBuildFromParallel[CC[_], A, CC[A]] { + class GenericCanCombineFrom[A] extends GenericCanBuildFrom[A] with CanCombineFrom[CC[_], A, CC[A]] { override def apply(from: Coll) = from.genericCombiner override def apply() = newBuilder[A] } diff --git a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala index ceda9d1155..8f779b4029 100644 --- a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala +++ b/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala @@ -17,7 +17,10 @@ import scala.collection.mutable.Builder * @define $Coll ParallelMap */ abstract class ParallelMapFactory[CC[X, Y] <: ParallelMap[X, Y] with ParallelMapLike[X, Y, CC[X, Y], _]] -extends MapFactory[CC] { +extends MapFactory[CC] + with GenericParallelMapCompanion[CC] { + + type MapColl = CC[_, _] /** The default builder for $Coll objects. * @tparam K the type of the keys @@ -29,10 +32,10 @@ extends MapFactory[CC] { * @tparam K the type of the keys * @tparam V the type of the associated values */ - def newCombiner[K, V]: Combiner[(K, V), CC[K, V]] = null // TODO + def newCombiner[K, V]: Combiner[(K, V), CC[K, V]] - class ParallelMapCanBuildFrom[K, V] extends CanBuildFromParallel[CC[_, _], (K, V), CC[K, V]] { - def apply(from: CC[_, _]) = newCombiner[K, V] + class CanCombineFromMap[K, V] extends CanCombineFrom[CC[_, _], (K, V), CC[K, V]] { + def apply(from: MapColl) = from.genericMapCombiner[K, V].asInstanceOf[Combiner[(K, V), CC[K, V]]] def apply() = newCombiner[K, V] } diff --git a/src/parallel-collections/scala/collection/parallel/Combiners.scala b/src/parallel-collections/scala/collection/parallel/Combiners.scala index 80966f3435..a37f642d42 100644 --- a/src/parallel-collections/scala/collection/parallel/Combiners.scala +++ b/src/parallel-collections/scala/collection/parallel/Combiners.scala @@ -34,6 +34,9 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w * after the invocation of this method, and should be cleared (see `clear`) * if they are to be used again. * + * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is, + * they are the same objects in memories, always does nothing and returns the first combiner. + * * @tparam N the type of elements contained by the `other` builder * @tparam NewTo the type of collection produced by the `other` builder * @param other the other builder diff --git a/src/parallel-collections/scala/collection/parallel/Iterators.scala b/src/parallel-collections/scala/collection/parallel/Iterators.scala index 30aca2965b..bfebff994c 100644 --- a/src/parallel-collections/scala/collection/parallel/Iterators.scala +++ b/src/parallel-collections/scala/collection/parallel/Iterators.scala @@ -5,7 +5,7 @@ package scala.collection.parallel import scala.collection.Parallel import scala.collection.generic.Signalling import scala.collection.generic.DelegatedSignalling -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom import scala.collection.mutable.Builder import scala.collection.Iterator.empty @@ -93,14 +93,14 @@ trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T /* transformers to combiners */ - def map2combiner[S, That](f: T => S, pbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = { - val cb = pbf(repr) + def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = pbf(repr) cb.sizeHint(remaining) while (hasNext) cb += f(next) cb } - def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = { + def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { val cb = pbf(repr) while (hasNext) { val curr = next @@ -109,7 +109,7 @@ trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T cb } - def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = { + def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { val cb = pbf(repr) while (hasNext) { val traversable = f(next) @@ -276,7 +276,7 @@ trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableItera cb } - def reverseMap2combiner[S, That](f: T => S, cbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = { + def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { val cb = cbf(repr) cb.sizeHint(remaining) var lst = List[S]() @@ -288,7 +288,7 @@ trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableItera cb } - def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanBuildFromParallel[Repr, U, That]): Combiner[U, That] = { + def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = { val cb = cbf(repr) cb.sizeHint(remaining) var j = 0 diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala index 83cb37f9c8..4882dc19ee 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala @@ -26,8 +26,8 @@ trait ParallelIterable[+T] extends Iterable[T] /** $factoryinfo */ object ParallelIterable extends ParallelFactory[ParallelIterable] { - implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelIterable[T]] = - new GenericCanBuildFromParallel[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] = + new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala index 5ed6d10195..01a108eea0 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala @@ -102,7 +102,7 @@ import scala.collection.generic._ * The order in which the operations on elements are performed is unspecified and may be nondeterministic. * * @define pbfinfo - * An implicit value of class `CanBuildFromParallel` which determines the + * An implicit value of class `CanCombineFrom` which determines the * result class `That` from the current representation type `Repr` and * and the new element type `B`. This builder factory can provide a parallel * builder for the resulting collection. @@ -199,6 +199,16 @@ extends IterableLike[T, Repr] */ protected[this] override def newBuilder: collection.mutable.Builder[T, Repr] = newCombiner + /** Optionally reuses existing combiner for better performance. By default it doesn't - subclasses may override this behaviour. + * The provided combiner `oldc` that can potentially be reused will be either some combiner from the previous computational task, or `None` if there + * was no previous phase (in which case this method must return `newc`). + * + * @param oldc The combiner that is the result of the previous task, or `None` if there was no previous task. + * @param newc The new, empty combiner that can be used. + * @return Either `newc` or `oldc`. + */ + protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc + /* convenience task operations wrapper */ protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new { def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) { @@ -219,7 +229,7 @@ extends IterableLike[T, Repr] } protected def wrap[R](body: => R) = new NonDivisible[R] { - def leaf = result = body + def leaf(prevr: Option[R]) = result = body var result: R = null.asInstanceOf[R] } @@ -347,7 +357,7 @@ extends IterableLike[T, Repr] } override def product[U >: T](implicit num: Numeric[U]): U = { - executeAndWaitResult(new Product[U](num, parallelIterator)) + executeAndWaitResult(new Product[U](num, parallelIterator)) } override def min[U >: T](implicit ord: Ordering[U]): T = { @@ -612,7 +622,7 @@ extends IterableLike[T, Repr] /** Sequentially performs one task after another. */ protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] extends Composite[FR, SR, R, First, Second] { - def leaf = { + def leaf(prevr: Option[R]) = { ft.compute st.compute result = combineResults(ft.result, st.result) @@ -622,7 +632,7 @@ extends IterableLike[T, Repr] /** Performs two tasks in parallel, and waits for both to finish. */ protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] extends Composite[FR, SR, R, First, Second] { - def leaf = { + def leaf(prevr: Option[R]) = { st.start ft.compute st.sync @@ -634,7 +644,7 @@ extends IterableLike[T, Repr] extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] { var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 - def leaf = { + def leaf(prevr: Option[R1]) = { inner.compute result = map(inner.result) } @@ -644,27 +654,27 @@ extends IterableLike[T, Repr] protected[this] class Foreach[S](op: T => S, val pit: ParallelIterator) extends Accessor[Unit, Foreach[S]] { var result: Unit = () - def leaf = pit.foreach(op) + def leaf(prevr: Option[Unit]) = pit.foreach(op) def newSubtask(p: ParallelIterator) = new Foreach[S](op, p) } protected[this] class Count(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Int, Count] { var result: Int = 0 - def leaf = result = pit.count(pred) + def leaf(prevr: Option[Int]) = result = pit.count(pred) def newSubtask(p: ParallelIterator) = new Count(pred, p) override def merge(that: Count) = result = result + that.result } protected[this] class Reduce[U >: T](op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Reduce[U]] { var result: U = null.asInstanceOf[U] - def leaf = result = pit.reduce(op) + def leaf(prevr: Option[U]) = result = pit.reduce(op) def newSubtask(p: ParallelIterator) = new Reduce(op, p) override def merge(that: Reduce[U]) = result = op(result, that.result) } protected[this] class Fold[U >: T](z: U, op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Fold[U]] { var result: U = null.asInstanceOf[U] - def leaf = result = pit.fold(z)(op) + def leaf(prevr: Option[U]) = result = pit.fold(z)(op) def newSubtask(p: ParallelIterator) = new Fold(z, op, p) override def merge(that: Fold[U]) = result = op(result, that.result) } @@ -672,81 +682,81 @@ extends IterableLike[T, Repr] protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, val pit: ParallelIterator) extends Accessor[S, Aggregate[S]] { var result: S = null.asInstanceOf[S] - def leaf = result = pit.foldLeft(z)(seqop) + def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop) def newSubtask(p: ParallelIterator) = new Aggregate(z, seqop, combop, p) override def merge(that: Aggregate[S]) = result = combop(result, that.result) } protected[this] class Sum[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Sum[U]] { var result: U = null.asInstanceOf[U] - def leaf = result = pit.sum(num) + def leaf(prevr: Option[U]) = result = pit.sum(num) def newSubtask(p: ParallelIterator) = new Sum(num, p) override def merge(that: Sum[U]) = result = num.plus(result, that.result) } protected[this] class Product[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Product[U]] { var result: U = null.asInstanceOf[U] - def leaf = result = pit.product(num) + def leaf(prevr: Option[U]) = result = pit.product(num) def newSubtask(p: ParallelIterator) = new Product(num, p) override def merge(that: Product[U]) = result = num.times(result, that.result) } protected[this] class Min[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Min[U]] { var result: U = null.asInstanceOf[U] - def leaf = result = pit.min(ord) + def leaf(prevr: Option[U]) = result = pit.min(ord) def newSubtask(p: ParallelIterator) = new Min(ord, p) override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result } protected[this] class Max[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Max[U]] { var result: U = null.asInstanceOf[U] - def leaf = result = pit.max(ord) + def leaf(prevr: Option[U]) = result = pit.max(ord) def newSubtask(p: ParallelIterator) = new Max(ord, p) override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result } - protected[this] class Map[S, That](f: T => S, pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator) + protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) extends Transformer[Combiner[S, That], Map[S, That]] { var result: Combiner[S, That] = null - def leaf = result = pit.map2combiner(f, pbf) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr))) def newSubtask(p: ParallelIterator) = new Map(f, pbf, p) override def merge(that: Map[S, That]) = result = result combine that.result } protected[this] class Collect[S, That] - (pf: PartialFunction[T, S], pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator) + (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) extends Transformer[Combiner[S, That], Collect[S, That]] { var result: Combiner[S, That] = null - def leaf = result = pit.collect2combiner[S, That](pf, pbf) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf) // TODO def newSubtask(p: ParallelIterator) = new Collect(pf, pbf, p) override def merge(that: Collect[S, That]) = result = result combine that.result } - protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator) + protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) extends Transformer[Combiner[S, That], FlatMap[S, That]] { var result: Combiner[S, That] = null - def leaf = result = pit.flatmap2combiner(f, pbf) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf) // TODO def newSubtask(p: ParallelIterator) = new FlatMap(f, pbf, p) override def merge(that: FlatMap[S, That]) = result = result combine that.result } protected[this] class Forall(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Forall] { var result: Boolean = true - def leaf = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort } + def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort } def newSubtask(p: ParallelIterator) = new Forall(pred, p) override def merge(that: Forall) = result = result && that.result } protected[this] class Exists(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Exists] { var result: Boolean = false - def leaf = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort } + def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort } def newSubtask(p: ParallelIterator) = new Exists(pred, p) override def merge(that: Exists) = result = result || that.result } protected[this] class Find[U >: T](pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Option[U], Find[U]] { var result: Option[U] = None - def leaf = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } + def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } def newSubtask(p: ParallelIterator) = new Find(pred, p) override def merge(that: Find[U]) = if (this.result == None) result = that.result } @@ -754,7 +764,7 @@ extends IterableLike[T, Repr] protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[Combiner[U, This], Filter[U, This]] { var result: Combiner[U, This] = null - def leaf = result = pit.filter2combiner(pred, cbf()) + def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf())) def newSubtask(p: ParallelIterator) = new Filter(pred, cbf, p) override def merge(that: Filter[U, This]) = result = result combine that.result } @@ -762,7 +772,7 @@ extends IterableLike[T, Repr] protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[Combiner[U, This], FilterNot[U, This]] { var result: Combiner[U, This] = null - def leaf = result = pit.filterNot2combiner(pred, cbf()) + def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf())) def newSubtask(p: ParallelIterator) = new FilterNot(pred, cbf, p) override def merge(that: FilterNot[U, This]) = result = result combine that.result } @@ -770,7 +780,7 @@ extends IterableLike[T, Repr] protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], val pit: ParallelIterator) extends Transformer[Combiner[U, That], Copy[U, That]] { var result: Combiner[U, That] = null - def leaf = result = pit.copy2builder[U, That, Combiner[U, That]](cfactory()) + def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory())) def newSubtask(p: ParallelIterator) = new Copy[U, That](cfactory, p) override def merge(that: Copy[U, That]) = result = result combine that.result } @@ -778,7 +788,7 @@ extends IterableLike[T, Repr] protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] { var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf = result = pit.partition2combiners(pred, cbf(), cbf()) + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) def newSubtask(p: ParallelIterator) = new Partition(pred, cbf, p) override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) } @@ -786,7 +796,7 @@ extends IterableLike[T, Repr] protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[Combiner[U, This], Take[U, This]] { var result: Combiner[U, This] = null - def leaf = result = pit.take2combiner(n, cbf()) + def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf())) def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split @@ -802,7 +812,7 @@ extends IterableLike[T, Repr] protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[Combiner[U, This], Drop[U, This]] { var result: Combiner[U, This] = null - def leaf = result = pit.drop2combiner(n, cbf()) + def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf())) def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split @@ -818,7 +828,7 @@ extends IterableLike[T, Repr] protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[Combiner[U, This], Slice[U, This]] { var result: Combiner[U, This] = null - def leaf = result = pit.slice2combiner(from, until, cbf()) + def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf())) def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split @@ -835,7 +845,7 @@ extends IterableLike[T, Repr] protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] { var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf = result = pit.splitAt2combiners(at, cbf(), cbf()) + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split @@ -849,10 +859,10 @@ extends IterableLike[T, Repr] (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] { var result: (Combiner[U, This], Boolean) = null - def leaf = if (pos < pit.indexFlag) { - result = pit.takeWhile2combiner(pred, cbf()) + def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) { + result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf())) if (!result._2) pit.setIndexFlagIfLesser(pos) - } else result = (cbf(), false) + } else result = (reuse(prev.map(_._1), cbf()), false) def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split @@ -867,11 +877,11 @@ extends IterableLike[T, Repr] (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] { var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf = if (pos < pit.indexFlag) { - result = pit.span2combiners(pred, cbf(), cbf()) + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) { + result = pit.span2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) if (result._2.size > 0) pit.setIndexFlagIfLesser(pos) } else { - result = (cbf(), pit.copy2builder[U, This, Combiner[U, This]](cbf())) + result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf()))) } def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { @@ -888,7 +898,7 @@ extends IterableLike[T, Repr] protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], val pit: ParallelIterator) extends Accessor[Unit, CopyToArray[U, This]] { var result: Unit = () - def leaf = pit.copyToArray(array, from, len) + def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len) def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala b/src/parallel-collections/scala/collection/parallel/ParallelMap.scala index 2eaf474b07..5ce61469bc 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelMap.scala @@ -7,7 +7,9 @@ package scala.collection.parallel import scala.collection.Map import scala.collection.mutable.Builder import scala.collection.generic.ParallelMapFactory -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.GenericParallelMapTemplate +import scala.collection.generic.GenericParallelMapCompanion +import scala.collection.generic.CanCombineFrom @@ -16,14 +18,17 @@ import scala.collection.generic.CanBuildFromParallel trait ParallelMap[K, +V] extends Map[K, V] + with GenericParallelMapTemplate[K, V, ParallelMap] with ParallelIterable[(K, V)] with ParallelMapLike[K, V, ParallelMap[K, V], Map[K, V]] -{ self => +{ +self => + + def mapCompanion: GenericParallelMapCompanion[ParallelMap] = ParallelMap override def empty: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V] override def stringPrefix = "ParallelMap" - } @@ -31,7 +36,9 @@ extends Map[K, V] object ParallelMap extends ParallelMapFactory[ParallelMap] { def empty[K, V]: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V] - implicit def canBuildFrom[K, V]: CanBuildFromParallel[Coll, (K, V), ParallelMap[K, V]] = new ParallelMapCanBuildFrom[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParallelMap[K, V]] = immutable.HashTrieCombiner[K, V] + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelMap[K, V]] = new CanCombineFromMap[K, V] } diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala index eddc2963fa..8a0b54525f 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala @@ -22,9 +22,9 @@ extends MapLike[K, V, Repr] with ParallelIterableLike[(K, V), Repr, SequentialView] { self => - protected[this] override def newBuilder: Builder[(K, V), Repr] = null // TODO + protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner - protected[this] override def newCombiner: Combiner[(K, V), Repr] = null // TODO + protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.") override def empty: Repr diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala index 3e85b8dff6..71b802cd11 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala @@ -6,7 +6,7 @@ import scala.collection.generic.GenericCompanion import scala.collection.generic.GenericParallelCompanion import scala.collection.generic.GenericParallelTemplate import scala.collection.generic.ParallelFactory -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom import scala.collection.parallel.mutable.ParallelArrayCombiner import scala.collection.parallel.mutable.ParallelArray @@ -30,7 +30,7 @@ trait ParallelSeq[+T] extends Seq[T] object ParallelSeq extends ParallelFactory[ParallelSeq] { - implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala index fedc9f56ac..18b0c83f23 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala @@ -6,7 +6,7 @@ import scala.collection.SeqLike import scala.collection.generic.DefaultSignalling import scala.collection.generic.AtomicIndexFlag import scala.collection.generic.CanBuildFrom -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom import scala.collection.generic.VolatileAbort @@ -320,7 +320,7 @@ extends scala.collection.SeqLike[T, Repr] protected[this] class SegmentLength(pred: T => Boolean, from: Int, val pit: ParallelIterator) extends Accessor[(Int, Boolean), SegmentLength] { var result: (Int, Boolean) = null - def leaf = if (from < pit.indexFlag) { + def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) { val itsize = pit.remaining val seglen = pit.prefixLength(pred) result = (seglen, itsize == seglen) @@ -337,7 +337,7 @@ extends scala.collection.SeqLike[T, Repr] protected[this] class IndexWhere(pred: T => Boolean, from: Int, val pit: ParallelIterator) extends Accessor[Int, IndexWhere] { var result: Int = -1 - def leaf = if (from < pit.indexFlag) { + def leaf(prev: Option[Int]) = if (from < pit.indexFlag) { val r = pit.indexWhere(pred) if (r != -1) { result = from + r @@ -357,7 +357,7 @@ extends scala.collection.SeqLike[T, Repr] protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, val pit: ParallelIterator) extends Accessor[Int, LastIndexWhere] { var result: Int = -1 - def leaf = if (pos > pit.indexFlag) { + def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) { val r = pit.lastIndexWhere(pred) if (r != -1) { result = pos + r @@ -377,15 +377,15 @@ extends scala.collection.SeqLike[T, Repr] protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], val pit: ParallelIterator) extends Transformer[Combiner[U, This], Reverse[U, This]] { var result: Combiner[U, This] = null - def leaf = result = pit.reverse2combiner(cbf()) + def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf())) def newSubtask(p: SuperParallelIterator) = new Reverse(cbf, down(p)) override def merge(that: Reverse[U, This]) = result = that.result combine result } - protected[this] class ReverseMap[S, That](f: T => S, pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator) + protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) extends Transformer[Combiner[S, That], ReverseMap[S, That]] { var result: Combiner[S, That] = null - def leaf = result = pit.reverseMap2combiner(f, pbf) + def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf) // TODO def newSubtask(p: SuperParallelIterator) = new ReverseMap(f, pbf, down(p)) override def merge(that: ReverseMap[S, That]) = result = that.result combine result } @@ -393,7 +393,7 @@ extends scala.collection.SeqLike[T, Repr] protected[this] class SameElements[U >: T](val pit: ParallelIterator, val otherpit: PreciseSplitter[U]) extends Accessor[Boolean, SameElements[U]] { var result: Boolean = true - def leaf = if (!pit.isAborted) { + def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { result = pit.sameElements(otherpit) if (!result) pit.abort } @@ -406,10 +406,10 @@ extends scala.collection.SeqLike[T, Repr] override def merge(that: SameElements[U]) = result = result && that.result } - protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanBuildFromParallel[Repr, U, That], val pit: ParallelIterator) + protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], val pit: ParallelIterator) extends Transformer[Combiner[U, That], Updated[U, That]] { var result: Combiner[U, That] = null - def leaf = result = pit.updated2combiner(pos, elem, pbf) + def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf) // TODO def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException override def split = { val pits = pit.split @@ -421,7 +421,7 @@ extends scala.collection.SeqLike[T, Repr] protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParallelIterator, val otherpit: PreciseSplitter[S]) extends Accessor[Boolean, Corresponds[S]] { var result: Boolean = true - def leaf = if (!pit.isAborted) { + def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { result = pit.corresponds(corr)(otherpit) if (!result) pit.abort } diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala index d0faa942ef..7862e99f44 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala @@ -6,7 +6,7 @@ package scala.collection.parallel import scala.collection.TraversableView import scala.collection.SeqView import scala.collection.Parallel -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom @@ -38,8 +38,8 @@ object ParallelSeqView { type Coll = ParallelSeqView[_, C, _] forSome { type C <: ParallelSeq[_] } - implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] = - new CanBuildFromParallel[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] = + new CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] { def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing] def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing] } diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala index 2e9ebb1df3..eab4d7ad5f 100644 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala +++ b/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala @@ -8,7 +8,7 @@ import scala.collection.SeqView import scala.collection.SeqViewLike import scala.collection.Parallel import scala.collection.generic.CanBuildFrom -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom @@ -161,10 +161,10 @@ extends SeqView[T, Coll] /* tasks */ - protected[this] class Force[U >: T, That](cbf: CanBuildFromParallel[Coll, U, That], val pit: ParallelIterator) + protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], val pit: ParallelIterator) extends Transformer[Combiner[U, That], Force[U, That]] { var result: Combiner[U, That] = null - def leaf = result = pit.copy2builder[U, That, Combiner[U, That]](cbf(self.underlying)) + def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying))) def newSubtask(p: SuperParallelIterator) = new Force(cbf, down(p)) override def merge(that: Force[U, That]) = result = result combine that.result } diff --git a/src/parallel-collections/scala/collection/parallel/Tasks.scala b/src/parallel-collections/scala/collection/parallel/Tasks.scala index d21113cf64..3ef60f8c7a 100644 --- a/src/parallel-collections/scala/collection/parallel/Tasks.scala +++ b/src/parallel-collections/scala/collection/parallel/Tasks.scala @@ -38,8 +38,10 @@ trait Tasks { def repr = this.asInstanceOf[Tp] /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ def compute - /** Body of the task - non-divisible unit of work done by this task. */ - def leaf + /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task + * or `None` if there was no previous task. + */ + def leaf(result: Option[R]) /** Start task. */ def start /** Wait for task to finish. */ @@ -88,19 +90,20 @@ trait AdaptiveWorkStealingTasks extends Tasks { def split: Seq[Task[R, Tp]] /** The actual leaf computation. */ - def leaf: Unit + def leaf(result: Option[R]): Unit - def compute = if (shouldSplitFurther) internal else leaf + def compute = if (shouldSplitFurther) internal else leaf(None) def internal = { var last = spawnSubtasks - last.leaf + last.leaf(None) result = last.result while (last.next != null) { + val lastresult = Option(last.result) last = last.next - if (last.tryCancel) last.leaf else last.sync + if (last.tryCancel) last.leaf(lastresult) else last.sync merge(last.repr) } } diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala index 3f73c8fe18..0a39dd200c 100644 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala +++ b/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala @@ -11,7 +11,9 @@ import scala.collection.parallel.ParallelMapLike import scala.collection.parallel.Combiner import scala.collection.parallel.EnvironmentPassingCombiner import scala.collection.generic.ParallelMapFactory -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.GenericParallelMapTemplate +import scala.collection.generic.GenericParallelMapCompanion import scala.collection.immutable.HashMap @@ -21,11 +23,15 @@ import scala.collection.immutable.HashMap class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) extends ParallelMap[K, V] + with GenericParallelMapTemplate[K, V, ParallelHashTrie] with ParallelMapLike[K, V, ParallelHashTrie[K, V], HashMap[K, V]] -{ self => +{ +self => def this() = this(HashMap.empty[K, V]) + override def mapCompanion: GenericParallelMapCompanion[ParallelHashTrie] = ParallelHashTrie + override def empty: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V] def parallelIterator = new ParallelHashTrieIterator(trie) with SCPI @@ -40,6 +46,11 @@ extends ParallelMap[K, V] override def size = trie.size + protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match { + case Some(old) => old + case None => newc + } + type SCPI = SignalContextPassingIterator[ParallelHashTrieIterator] class ParallelHashTrieIterator(val ht: HashMap[K, V]) @@ -70,7 +81,13 @@ extends ParallelMap[K, V] object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] { def empty[K, V]: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V] - implicit def canBuildFrom[K, V]: CanBuildFromParallel[Coll, (K, V), ParallelHashTrie[K, V]] = new ParallelMapCanBuildFrom[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParallelHashTrie[K, V]] = HashTrieCombiner[K, V] + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelHashTrie[K, V]] = { + new CanCombineFromMap[K, V] + } + + var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0) } @@ -85,16 +102,17 @@ self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] => def +=(elem: (K, V)) = { trie += elem; this } - def result = new ParallelHashTrie[K, V](trie) - - def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = { + def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + // ParallelHashTrie.totalcombines.incrementAndGet if (other.isInstanceOf[HashTrieCombiner[_, _]]) { val that = other.asInstanceOf[HashTrieCombiner[K, V]] val ncombiner = HashTrieCombiner[K, V] ncombiner.trie = this.trie combine that.trie ncombiner } else error("Unexpected combiner type.") - } + } else this + + def result = new ParallelHashTrie[K, V](trie) } diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala index a07db9c39c..85a33c7431 100644 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala +++ b/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala @@ -6,7 +6,7 @@ import scala.collection.immutable.Range import scala.collection.immutable.RangeUtils import scala.collection.parallel.ParallelSeq import scala.collection.parallel.Combiner -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom @@ -62,8 +62,8 @@ extends ParallelSeq[Int] /* transformers */ - override def map2combiner[S, That](f: Int => S, pbf: CanBuildFromParallel[ParallelSeq[Int], S, That]): Combiner[S, That] = { - val cb = pbf(self.repr) + override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = pbf(self.repr) val sz = remaining cb.sizeHint(sz) if (sz > 0) { diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala index f3a26e8682..ceb0dcc13d 100644 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala +++ b/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala @@ -4,7 +4,7 @@ package scala.collection.parallel.immutable import scala.collection.generic.GenericParallelTemplate import scala.collection.generic.GenericCompanion import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom import scala.collection.generic.ParallelFactory import scala.collection.parallel.ParallelSeqLike import scala.collection.parallel.Combiner diff --git a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala index e45b3f156a..bd17d24ea8 100644 --- a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala +++ b/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala @@ -27,12 +27,12 @@ trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combin def +=(elem: Elem) = { lastbuff += elem; this } def result: To = allocateAndCopy def clear = { chain.clear } - def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = { + def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { if (other.isInstanceOf[LazyCombiner[_, _, _]]) { val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]] newLazyCombiner(chain ++= that.chain) } else throw new UnsupportedOperationException("Cannot combine with combiner of different type.") - } + } else this def size = chain.foldLeft(0)(_ + _.size) /** Method that allocates the data structure and copies elements into it using @@ -40,4 +40,4 @@ trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combin */ def allocateAndCopy: To def newLazyCombiner(buffchain: ArrayBuffer[Buff]): LazyCombiner[Elem, To, Buff] -}
\ No newline at end of file +} diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala index 7ea5499aa6..30b4b109b2 100644 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala +++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala @@ -5,7 +5,7 @@ package scala.collection.parallel.mutable import scala.collection.generic.GenericParallelTemplate import scala.collection.generic.GenericCompanion import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom import scala.collection.generic.ParallelFactory import scala.collection.generic.Sizing import scala.collection.parallel.Combiner @@ -357,8 +357,8 @@ extends ParallelSeq[T] /* transformers */ - override def map2combiner[S, That](f: T => S, cbf: CanBuildFromParallel[ParallelArray[T], S, That]): Combiner[S, That] = { - val cb = cbf(self.repr) + override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = cbf(self.repr) cb.sizeHint(remaining) map2combiner_quick(f, arr, cb, until, i) i = until @@ -373,7 +373,7 @@ extends ParallelSeq[T] } } - override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanBuildFromParallel[ParallelArray[T], S, That]): Combiner[S, That] = { + override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = { val cb = pbf(self.repr) collect2combiner_quick(pf, arr, cb, until, i) i = until @@ -389,7 +389,7 @@ extends ParallelSeq[T] } } - override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanBuildFromParallel[ParallelArray[T], S, That]): Combiner[S, That] = { + override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = { val cb = pbf(self.repr) while (i < until) { val traversable = f(arr(i).asInstanceOf[T]) @@ -518,7 +518,7 @@ extends ParallelSeq[T] object ParallelArray extends ParallelFactory[ParallelArray] { - implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelArray[T]] = new GenericCanBuildFromParallel[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelArray[T]] = new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParallelArray[T]] = newCombiner def newCombiner[T]: Combiner[T, ParallelArray[T]] = ParallelArrayCombiner[T] diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala index 9bbad7035e..2991344be2 100644 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala +++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala @@ -43,7 +43,7 @@ extends LazyCombiner[T, ParallelArray[T], ExposedArrayBuffer[T]] class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] { var result = () - def leaf = if (howmany > 0) { + def leaf(prev: Option[Unit]) = if (howmany > 0) { var totalleft = howmany val (stbuff, stind) = findStart(offset) var buffind = stbuff diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala index f7ba44b67e..bd0a46bc43 100644 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala +++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala @@ -28,8 +28,8 @@ trait ParallelIterable[T] extends collection.mutable.Iterable[T] /** $factoryinfo */ object ParallelIterable extends ParallelFactory[ParallelIterable] { - implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelIterable[T]] = - new GenericCanBuildFromParallel[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] = + new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala index ed08b59962..636ba1ac3d 100644 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala +++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala @@ -4,7 +4,7 @@ package scala.collection.parallel.mutable import scala.collection.generic.GenericParallelTemplate import scala.collection.generic.GenericCompanion import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom import scala.collection.generic.ParallelFactory import scala.collection.parallel.ParallelSeqLike import scala.collection.parallel.Combiner @@ -38,7 +38,7 @@ trait ParallelSeq[T] extends collection.mutable.Seq[T] * @define coll mutable parallel sequence */ object ParallelSeq extends ParallelFactory[ParallelSeq] { - implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T] def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] diff --git a/src/parallel-collections/scala/collection/parallel/package.scala b/src/parallel-collections/scala/collection/parallel/package.scala index 6e8cbe8633..3b297f1cd1 100644 --- a/src/parallel-collections/scala/collection/parallel/package.scala +++ b/src/parallel-collections/scala/collection/parallel/package.scala @@ -4,7 +4,7 @@ package scala.collection import java.lang.Thread._ import scala.collection.generic.CanBuildFrom -import scala.collection.generic.CanBuildFromParallel +import scala.collection.generic.CanCombineFrom /** Package object for parallel collections. @@ -34,8 +34,8 @@ package object parallel { implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { def isParallel = bf.isInstanceOf[Parallel] - def asParallel = bf.asInstanceOf[CanBuildFromParallel[From, Elem, To]] - def ifParallel[R](isbody: CanBuildFromParallel[From, Elem, To] => R) = new { + def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] + def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody } } |