summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-06-18 07:49:14 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-06-18 07:49:14 +0000
commit18ad78dd73b29c0c8b34e970c58cd86232cdc4f5 (patch)
tree12ff8e92f2601ff2127e1f2c6957d95e28c76e18
parentb9fb76d09d0a6e63bfb6f332079ab7d05f1233ca (diff)
downloadscala-18ad78dd73b29c0c8b34e970c58cd86232cdc4f5.tar.gz
scala-18ad78dd73b29c0c8b34e970c58cd86232cdc4f5.tar.bz2
scala-18ad78dd73b29c0c8b34e970c58cd86232cdc4f5.zip
Refactorings and hash trie combiners. No review.
-rw-r--r--src/parallel-collections/scala/collection/Parallelizable.scala4
-rw-r--r--src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala2
-rw-r--r--src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala22
-rw-r--r--src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala11
-rw-r--r--src/parallel-collections/scala/collection/generic/ParallelFactory.scala2
-rw-r--r--src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala11
-rw-r--r--src/parallel-collections/scala/collection/parallel/Combiners.scala3
-rw-r--r--src/parallel-collections/scala/collection/parallel/Iterators.scala14
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelIterable.scala4
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala88
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelMap.scala15
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala4
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeq.scala4
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala22
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala6
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala6
-rw-r--r--src/parallel-collections/scala/collection/parallel/Tasks.scala15
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala32
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala6
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala2
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala6
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala12
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala2
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala4
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala4
-rw-r--r--src/parallel-collections/scala/collection/parallel/package.scala6
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala87
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala118
28 files changed, 385 insertions, 127 deletions
diff --git a/src/parallel-collections/scala/collection/Parallelizable.scala b/src/parallel-collections/scala/collection/Parallelizable.scala
index 206285459d..bfca4a41d7 100644
--- a/src/parallel-collections/scala/collection/Parallelizable.scala
+++ b/src/parallel-collections/scala/collection/Parallelizable.scala
@@ -11,12 +11,12 @@ import parallel.ParallelIterableLike
* by invoking the method `par`. Parallelizable collections may be parametrized with
* a target type different than their own.
*/
-trait Parallelizable[+T, +Repr <: Parallel] {
+trait Parallelizable[+T, +ParRepr <: Parallel] {
/**
* Returns a parallel implementation of a collection.
*/
- def par: Repr
+ def par: ParRepr
}
diff --git a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala b/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala
index 404201b1c2..fcbcd6295e 100644
--- a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala
+++ b/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala
@@ -15,7 +15,7 @@ import scala.collection.parallel._
* @tparam Elem the element type of the collection to be created
* @tparam To the type of the collection to be created
*/
-trait CanBuildFromParallel[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel {
+trait CanCombineFrom[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel {
def apply(from: From): Combiner[Elem, To]
def apply(): Combiner[Elem, To]
}
diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala b/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala
index 14eb9ab282..e5ba36f846 100644
--- a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala
+++ b/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala
@@ -3,29 +3,27 @@ package scala.collection.generic
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParallelIterable
+import scala.collection.parallel.ParallelMap
-/**
- * A template class for companion objects of parallel collection classes.
- * They should be mixed in together with `GenericCompanion` type.
- * @tparam CC the type constructor representing the collection class
- * @since 2.8
+/** A template class for companion objects of parallel collection classes.
+ * They should be mixed in together with `GenericCompanion` type.
+ * @tparam CC the type constructor representing the collection class
+ * @since 2.8
*/
trait GenericParallelCompanion[+CC[X] <: ParallelIterable[X]] {
- /**
- * The default builder for $Coll objects.
+ /** The default builder for $Coll objects.
*/
def newBuilder[A]: Combiner[A, CC[A]]
- /**
- * The parallel builder for $Coll objects.
+ /** The parallel builder for $Coll objects.
*/
def newCombiner[A]: Combiner[A, CC[A]]
}
-
-
-
+trait GenericParallelMapCompanion[+CC[P, Q] <: ParallelMap[P, Q]] {
+ def newCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]]
+}
diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala b/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala
index 58454be04e..e98c13fa36 100644
--- a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala
+++ b/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala
@@ -4,6 +4,7 @@ package scala.collection.generic
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParallelIterable
+import scala.collection.parallel.ParallelMap
import scala.collection.parallel.TaskSupport
@@ -47,7 +48,17 @@ extends GenericTraversableTemplate[A, CC]
}
+trait GenericParallelMapTemplate[K, +V, +CC[X, Y] <: ParallelMap[X, Y]]
+extends TaskSupport
+{
+ def mapCompanion: GenericParallelMapCompanion[CC]
+ def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = {
+ val cb = mapCompanion.newCombiner[P, Q]
+ cb.environment = environment
+ cb
+ }
+}
diff --git a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelFactory.scala
index 86a5fdf822..0b9e92aa10 100644
--- a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala
+++ b/src/parallel-collections/scala/collection/generic/ParallelFactory.scala
@@ -23,7 +23,7 @@ extends TraversableFactory[CC]
* `apply(from)` to the `genericParallelBuilder` method of the $coll `from`, and calls to `apply()`
* to this factory.
*/
- class GenericCanBuildFromParallel[A] extends GenericCanBuildFrom[A] with CanBuildFromParallel[CC[_], A, CC[A]] {
+ class GenericCanCombineFrom[A] extends GenericCanBuildFrom[A] with CanCombineFrom[CC[_], A, CC[A]] {
override def apply(from: Coll) = from.genericCombiner
override def apply() = newBuilder[A]
}
diff --git a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala
index ceda9d1155..8f779b4029 100644
--- a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala
+++ b/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala
@@ -17,7 +17,10 @@ import scala.collection.mutable.Builder
* @define $Coll ParallelMap
*/
abstract class ParallelMapFactory[CC[X, Y] <: ParallelMap[X, Y] with ParallelMapLike[X, Y, CC[X, Y], _]]
-extends MapFactory[CC] {
+extends MapFactory[CC]
+ with GenericParallelMapCompanion[CC] {
+
+ type MapColl = CC[_, _]
/** The default builder for $Coll objects.
* @tparam K the type of the keys
@@ -29,10 +32,10 @@ extends MapFactory[CC] {
* @tparam K the type of the keys
* @tparam V the type of the associated values
*/
- def newCombiner[K, V]: Combiner[(K, V), CC[K, V]] = null // TODO
+ def newCombiner[K, V]: Combiner[(K, V), CC[K, V]]
- class ParallelMapCanBuildFrom[K, V] extends CanBuildFromParallel[CC[_, _], (K, V), CC[K, V]] {
- def apply(from: CC[_, _]) = newCombiner[K, V]
+ class CanCombineFromMap[K, V] extends CanCombineFrom[CC[_, _], (K, V), CC[K, V]] {
+ def apply(from: MapColl) = from.genericMapCombiner[K, V].asInstanceOf[Combiner[(K, V), CC[K, V]]]
def apply() = newCombiner[K, V]
}
diff --git a/src/parallel-collections/scala/collection/parallel/Combiners.scala b/src/parallel-collections/scala/collection/parallel/Combiners.scala
index 80966f3435..a37f642d42 100644
--- a/src/parallel-collections/scala/collection/parallel/Combiners.scala
+++ b/src/parallel-collections/scala/collection/parallel/Combiners.scala
@@ -34,6 +34,9 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w
* after the invocation of this method, and should be cleared (see `clear`)
* if they are to be used again.
*
+ * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is,
+ * they are the same objects in memories, always does nothing and returns the first combiner.
+ *
* @tparam N the type of elements contained by the `other` builder
* @tparam NewTo the type of collection produced by the `other` builder
* @param other the other builder
diff --git a/src/parallel-collections/scala/collection/parallel/Iterators.scala b/src/parallel-collections/scala/collection/parallel/Iterators.scala
index 30aca2965b..bfebff994c 100644
--- a/src/parallel-collections/scala/collection/parallel/Iterators.scala
+++ b/src/parallel-collections/scala/collection/parallel/Iterators.scala
@@ -5,7 +5,7 @@ package scala.collection.parallel
import scala.collection.Parallel
import scala.collection.generic.Signalling
import scala.collection.generic.DelegatedSignalling
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
import scala.collection.mutable.Builder
import scala.collection.Iterator.empty
@@ -93,14 +93,14 @@ trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T
/* transformers to combiners */
- def map2combiner[S, That](f: T => S, pbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = {
- val cb = pbf(repr)
+ def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = pbf(repr)
cb.sizeHint(remaining)
while (hasNext) cb += f(next)
cb
}
- def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = {
+ def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
val cb = pbf(repr)
while (hasNext) {
val curr = next
@@ -109,7 +109,7 @@ trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T
cb
}
- def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = {
+ def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
val cb = pbf(repr)
while (hasNext) {
val traversable = f(next)
@@ -276,7 +276,7 @@ trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableItera
cb
}
- def reverseMap2combiner[S, That](f: T => S, cbf: CanBuildFromParallel[Repr, S, That]): Combiner[S, That] = {
+ def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
val cb = cbf(repr)
cb.sizeHint(remaining)
var lst = List[S]()
@@ -288,7 +288,7 @@ trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableItera
cb
}
- def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanBuildFromParallel[Repr, U, That]): Combiner[U, That] = {
+ def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = {
val cb = cbf(repr)
cb.sizeHint(remaining)
var j = 0
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala
index 83cb37f9c8..4882dc19ee 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala
@@ -26,8 +26,8 @@ trait ParallelIterable[+T] extends Iterable[T]
/** $factoryinfo
*/
object ParallelIterable extends ParallelFactory[ParallelIterable] {
- implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelIterable[T]] =
- new GenericCanBuildFromParallel[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] =
+ new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala
index 5ed6d10195..01a108eea0 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala
@@ -102,7 +102,7 @@ import scala.collection.generic._
* The order in which the operations on elements are performed is unspecified and may be nondeterministic.
*
* @define pbfinfo
- * An implicit value of class `CanBuildFromParallel` which determines the
+ * An implicit value of class `CanCombineFrom` which determines the
* result class `That` from the current representation type `Repr` and
* and the new element type `B`. This builder factory can provide a parallel
* builder for the resulting collection.
@@ -199,6 +199,16 @@ extends IterableLike[T, Repr]
*/
protected[this] override def newBuilder: collection.mutable.Builder[T, Repr] = newCombiner
+ /** Optionally reuses existing combiner for better performance. By default it doesn't - subclasses may override this behaviour.
+ * The provided combiner `oldc` that can potentially be reused will be either some combiner from the previous computational task, or `None` if there
+ * was no previous phase (in which case this method must return `newc`).
+ *
+ * @param oldc The combiner that is the result of the previous task, or `None` if there was no previous task.
+ * @param newc The new, empty combiner that can be used.
+ * @return Either `newc` or `oldc`.
+ */
+ protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc
+
/* convenience task operations wrapper */
protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new {
def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) {
@@ -219,7 +229,7 @@ extends IterableLike[T, Repr]
}
protected def wrap[R](body: => R) = new NonDivisible[R] {
- def leaf = result = body
+ def leaf(prevr: Option[R]) = result = body
var result: R = null.asInstanceOf[R]
}
@@ -347,7 +357,7 @@ extends IterableLike[T, Repr]
}
override def product[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Product[U](num, parallelIterator))
+ executeAndWaitResult(new Product[U](num, parallelIterator))
}
override def min[U >: T](implicit ord: Ordering[U]): T = {
@@ -612,7 +622,7 @@ extends IterableLike[T, Repr]
/** Sequentially performs one task after another. */
protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
extends Composite[FR, SR, R, First, Second] {
- def leaf = {
+ def leaf(prevr: Option[R]) = {
ft.compute
st.compute
result = combineResults(ft.result, st.result)
@@ -622,7 +632,7 @@ extends IterableLike[T, Repr]
/** Performs two tasks in parallel, and waits for both to finish. */
protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
extends Composite[FR, SR, R, First, Second] {
- def leaf = {
+ def leaf(prevr: Option[R]) = {
st.start
ft.compute
st.sync
@@ -634,7 +644,7 @@ extends IterableLike[T, Repr]
extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] {
var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
- def leaf = {
+ def leaf(prevr: Option[R1]) = {
inner.compute
result = map(inner.result)
}
@@ -644,27 +654,27 @@ extends IterableLike[T, Repr]
protected[this] class Foreach[S](op: T => S, val pit: ParallelIterator) extends Accessor[Unit, Foreach[S]] {
var result: Unit = ()
- def leaf = pit.foreach(op)
+ def leaf(prevr: Option[Unit]) = pit.foreach(op)
def newSubtask(p: ParallelIterator) = new Foreach[S](op, p)
}
protected[this] class Count(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Int, Count] {
var result: Int = 0
- def leaf = result = pit.count(pred)
+ def leaf(prevr: Option[Int]) = result = pit.count(pred)
def newSubtask(p: ParallelIterator) = new Count(pred, p)
override def merge(that: Count) = result = result + that.result
}
protected[this] class Reduce[U >: T](op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Reduce[U]] {
var result: U = null.asInstanceOf[U]
- def leaf = result = pit.reduce(op)
+ def leaf(prevr: Option[U]) = result = pit.reduce(op)
def newSubtask(p: ParallelIterator) = new Reduce(op, p)
override def merge(that: Reduce[U]) = result = op(result, that.result)
}
protected[this] class Fold[U >: T](z: U, op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Fold[U]] {
var result: U = null.asInstanceOf[U]
- def leaf = result = pit.fold(z)(op)
+ def leaf(prevr: Option[U]) = result = pit.fold(z)(op)
def newSubtask(p: ParallelIterator) = new Fold(z, op, p)
override def merge(that: Fold[U]) = result = op(result, that.result)
}
@@ -672,81 +682,81 @@ extends IterableLike[T, Repr]
protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, val pit: ParallelIterator)
extends Accessor[S, Aggregate[S]] {
var result: S = null.asInstanceOf[S]
- def leaf = result = pit.foldLeft(z)(seqop)
+ def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
def newSubtask(p: ParallelIterator) = new Aggregate(z, seqop, combop, p)
override def merge(that: Aggregate[S]) = result = combop(result, that.result)
}
protected[this] class Sum[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Sum[U]] {
var result: U = null.asInstanceOf[U]
- def leaf = result = pit.sum(num)
+ def leaf(prevr: Option[U]) = result = pit.sum(num)
def newSubtask(p: ParallelIterator) = new Sum(num, p)
override def merge(that: Sum[U]) = result = num.plus(result, that.result)
}
protected[this] class Product[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Product[U]] {
var result: U = null.asInstanceOf[U]
- def leaf = result = pit.product(num)
+ def leaf(prevr: Option[U]) = result = pit.product(num)
def newSubtask(p: ParallelIterator) = new Product(num, p)
override def merge(that: Product[U]) = result = num.times(result, that.result)
}
protected[this] class Min[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Min[U]] {
var result: U = null.asInstanceOf[U]
- def leaf = result = pit.min(ord)
+ def leaf(prevr: Option[U]) = result = pit.min(ord)
def newSubtask(p: ParallelIterator) = new Min(ord, p)
override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result
}
protected[this] class Max[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Max[U]] {
var result: U = null.asInstanceOf[U]
- def leaf = result = pit.max(ord)
+ def leaf(prevr: Option[U]) = result = pit.max(ord)
def newSubtask(p: ParallelIterator) = new Max(ord, p)
override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result
}
- protected[this] class Map[S, That](f: T => S, pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator)
+ protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
extends Transformer[Combiner[S, That], Map[S, That]] {
var result: Combiner[S, That] = null
- def leaf = result = pit.map2combiner(f, pbf)
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr)))
def newSubtask(p: ParallelIterator) = new Map(f, pbf, p)
override def merge(that: Map[S, That]) = result = result combine that.result
}
protected[this] class Collect[S, That]
- (pf: PartialFunction[T, S], pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator)
+ (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
extends Transformer[Combiner[S, That], Collect[S, That]] {
var result: Combiner[S, That] = null
- def leaf = result = pit.collect2combiner[S, That](pf, pbf)
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf) // TODO
def newSubtask(p: ParallelIterator) = new Collect(pf, pbf, p)
override def merge(that: Collect[S, That]) = result = result combine that.result
}
- protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator)
+ protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
extends Transformer[Combiner[S, That], FlatMap[S, That]] {
var result: Combiner[S, That] = null
- def leaf = result = pit.flatmap2combiner(f, pbf)
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf) // TODO
def newSubtask(p: ParallelIterator) = new FlatMap(f, pbf, p)
override def merge(that: FlatMap[S, That]) = result = result combine that.result
}
protected[this] class Forall(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Forall] {
var result: Boolean = true
- def leaf = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort }
+ def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort }
def newSubtask(p: ParallelIterator) = new Forall(pred, p)
override def merge(that: Forall) = result = result && that.result
}
protected[this] class Exists(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Exists] {
var result: Boolean = false
- def leaf = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort }
+ def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort }
def newSubtask(p: ParallelIterator) = new Exists(pred, p)
override def merge(that: Exists) = result = result || that.result
}
protected[this] class Find[U >: T](pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Option[U], Find[U]] {
var result: Option[U] = None
- def leaf = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
+ def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
def newSubtask(p: ParallelIterator) = new Find(pred, p)
override def merge(that: Find[U]) = if (this.result == None) result = that.result
}
@@ -754,7 +764,7 @@ extends IterableLike[T, Repr]
protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[Combiner[U, This], Filter[U, This]] {
var result: Combiner[U, This] = null
- def leaf = result = pit.filter2combiner(pred, cbf())
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf()))
def newSubtask(p: ParallelIterator) = new Filter(pred, cbf, p)
override def merge(that: Filter[U, This]) = result = result combine that.result
}
@@ -762,7 +772,7 @@ extends IterableLike[T, Repr]
protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[Combiner[U, This], FilterNot[U, This]] {
var result: Combiner[U, This] = null
- def leaf = result = pit.filterNot2combiner(pred, cbf())
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
def newSubtask(p: ParallelIterator) = new FilterNot(pred, cbf, p)
override def merge(that: FilterNot[U, This]) = result = result combine that.result
}
@@ -770,7 +780,7 @@ extends IterableLike[T, Repr]
protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], val pit: ParallelIterator)
extends Transformer[Combiner[U, That], Copy[U, That]] {
var result: Combiner[U, That] = null
- def leaf = result = pit.copy2builder[U, That, Combiner[U, That]](cfactory())
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory()))
def newSubtask(p: ParallelIterator) = new Copy[U, That](cfactory, p)
override def merge(that: Copy[U, That]) = result = result combine that.result
}
@@ -778,7 +788,7 @@ extends IterableLike[T, Repr]
protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] {
var result: (Combiner[U, This], Combiner[U, This]) = null
- def leaf = result = pit.partition2combiners(pred, cbf(), cbf())
+ def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
def newSubtask(p: ParallelIterator) = new Partition(pred, cbf, p)
override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
}
@@ -786,7 +796,7 @@ extends IterableLike[T, Repr]
protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[Combiner[U, This], Take[U, This]] {
var result: Combiner[U, This] = null
- def leaf = result = pit.take2combiner(n, cbf())
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf()))
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
@@ -802,7 +812,7 @@ extends IterableLike[T, Repr]
protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[Combiner[U, This], Drop[U, This]] {
var result: Combiner[U, This] = null
- def leaf = result = pit.drop2combiner(n, cbf())
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf()))
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
@@ -818,7 +828,7 @@ extends IterableLike[T, Repr]
protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[Combiner[U, This], Slice[U, This]] {
var result: Combiner[U, This] = null
- def leaf = result = pit.slice2combiner(from, until, cbf())
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf()))
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
@@ -835,7 +845,7 @@ extends IterableLike[T, Repr]
protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] {
var result: (Combiner[U, This], Combiner[U, This]) = null
- def leaf = result = pit.splitAt2combiners(at, cbf(), cbf())
+ def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
@@ -849,10 +859,10 @@ extends IterableLike[T, Repr]
(pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] {
var result: (Combiner[U, This], Boolean) = null
- def leaf = if (pos < pit.indexFlag) {
- result = pit.takeWhile2combiner(pred, cbf())
+ def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) {
+ result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf()))
if (!result._2) pit.setIndexFlagIfLesser(pos)
- } else result = (cbf(), false)
+ } else result = (reuse(prev.map(_._1), cbf()), false)
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
@@ -867,11 +877,11 @@ extends IterableLike[T, Repr]
(pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] {
var result: (Combiner[U, This], Combiner[U, This]) = null
- def leaf = if (pos < pit.indexFlag) {
- result = pit.span2combiners(pred, cbf(), cbf())
+ def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) {
+ result = pit.span2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
if (result._2.size > 0) pit.setIndexFlagIfLesser(pos)
} else {
- result = (cbf(), pit.copy2builder[U, This, Combiner[U, This]](cbf()))
+ result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf())))
}
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
@@ -888,7 +898,7 @@ extends IterableLike[T, Repr]
protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], val pit: ParallelIterator)
extends Accessor[Unit, CopyToArray[U, This]] {
var result: Unit = ()
- def leaf = pit.copyToArray(array, from, len)
+ def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len)
def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala b/src/parallel-collections/scala/collection/parallel/ParallelMap.scala
index 2eaf474b07..5ce61469bc 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelMap.scala
@@ -7,7 +7,9 @@ package scala.collection.parallel
import scala.collection.Map
import scala.collection.mutable.Builder
import scala.collection.generic.ParallelMapFactory
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.GenericParallelMapTemplate
+import scala.collection.generic.GenericParallelMapCompanion
+import scala.collection.generic.CanCombineFrom
@@ -16,14 +18,17 @@ import scala.collection.generic.CanBuildFromParallel
trait ParallelMap[K, +V]
extends Map[K, V]
+ with GenericParallelMapTemplate[K, V, ParallelMap]
with ParallelIterable[(K, V)]
with ParallelMapLike[K, V, ParallelMap[K, V], Map[K, V]]
-{ self =>
+{
+self =>
+
+ def mapCompanion: GenericParallelMapCompanion[ParallelMap] = ParallelMap
override def empty: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V]
override def stringPrefix = "ParallelMap"
-
}
@@ -31,7 +36,9 @@ extends Map[K, V]
object ParallelMap extends ParallelMapFactory[ParallelMap] {
def empty[K, V]: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V]
- implicit def canBuildFrom[K, V]: CanBuildFromParallel[Coll, (K, V), ParallelMap[K, V]] = new ParallelMapCanBuildFrom[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParallelMap[K, V]] = immutable.HashTrieCombiner[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelMap[K, V]] = new CanCombineFromMap[K, V]
}
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala
index eddc2963fa..8a0b54525f 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala
@@ -22,9 +22,9 @@ extends MapLike[K, V, Repr]
with ParallelIterableLike[(K, V), Repr, SequentialView]
{ self =>
- protected[this] override def newBuilder: Builder[(K, V), Repr] = null // TODO
+ protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner
- protected[this] override def newCombiner: Combiner[(K, V), Repr] = null // TODO
+ protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.")
override def empty: Repr
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala
index 3e85b8dff6..71b802cd11 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala
@@ -6,7 +6,7 @@ import scala.collection.generic.GenericCompanion
import scala.collection.generic.GenericParallelCompanion
import scala.collection.generic.GenericParallelTemplate
import scala.collection.generic.ParallelFactory
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParallelArrayCombiner
import scala.collection.parallel.mutable.ParallelArray
@@ -30,7 +30,7 @@ trait ParallelSeq[+T] extends Seq[T]
object ParallelSeq extends ParallelFactory[ParallelSeq] {
- implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala
index fedc9f56ac..18b0c83f23 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala
@@ -6,7 +6,7 @@ import scala.collection.SeqLike
import scala.collection.generic.DefaultSignalling
import scala.collection.generic.AtomicIndexFlag
import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
import scala.collection.generic.VolatileAbort
@@ -320,7 +320,7 @@ extends scala.collection.SeqLike[T, Repr]
protected[this] class SegmentLength(pred: T => Boolean, from: Int, val pit: ParallelIterator)
extends Accessor[(Int, Boolean), SegmentLength] {
var result: (Int, Boolean) = null
- def leaf = if (from < pit.indexFlag) {
+ def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) {
val itsize = pit.remaining
val seglen = pit.prefixLength(pred)
result = (seglen, itsize == seglen)
@@ -337,7 +337,7 @@ extends scala.collection.SeqLike[T, Repr]
protected[this] class IndexWhere(pred: T => Boolean, from: Int, val pit: ParallelIterator)
extends Accessor[Int, IndexWhere] {
var result: Int = -1
- def leaf = if (from < pit.indexFlag) {
+ def leaf(prev: Option[Int]) = if (from < pit.indexFlag) {
val r = pit.indexWhere(pred)
if (r != -1) {
result = from + r
@@ -357,7 +357,7 @@ extends scala.collection.SeqLike[T, Repr]
protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, val pit: ParallelIterator)
extends Accessor[Int, LastIndexWhere] {
var result: Int = -1
- def leaf = if (pos > pit.indexFlag) {
+ def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) {
val r = pit.lastIndexWhere(pred)
if (r != -1) {
result = pos + r
@@ -377,15 +377,15 @@ extends scala.collection.SeqLike[T, Repr]
protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], val pit: ParallelIterator)
extends Transformer[Combiner[U, This], Reverse[U, This]] {
var result: Combiner[U, This] = null
- def leaf = result = pit.reverse2combiner(cbf())
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf()))
def newSubtask(p: SuperParallelIterator) = new Reverse(cbf, down(p))
override def merge(that: Reverse[U, This]) = result = that.result combine result
}
- protected[this] class ReverseMap[S, That](f: T => S, pbf: CanBuildFromParallel[Repr, S, That], val pit: ParallelIterator)
+ protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
extends Transformer[Combiner[S, That], ReverseMap[S, That]] {
var result: Combiner[S, That] = null
- def leaf = result = pit.reverseMap2combiner(f, pbf)
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf) // TODO
def newSubtask(p: SuperParallelIterator) = new ReverseMap(f, pbf, down(p))
override def merge(that: ReverseMap[S, That]) = result = that.result combine result
}
@@ -393,7 +393,7 @@ extends scala.collection.SeqLike[T, Repr]
protected[this] class SameElements[U >: T](val pit: ParallelIterator, val otherpit: PreciseSplitter[U])
extends Accessor[Boolean, SameElements[U]] {
var result: Boolean = true
- def leaf = if (!pit.isAborted) {
+ def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
result = pit.sameElements(otherpit)
if (!result) pit.abort
}
@@ -406,10 +406,10 @@ extends scala.collection.SeqLike[T, Repr]
override def merge(that: SameElements[U]) = result = result && that.result
}
- protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanBuildFromParallel[Repr, U, That], val pit: ParallelIterator)
+ protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], val pit: ParallelIterator)
extends Transformer[Combiner[U, That], Updated[U, That]] {
var result: Combiner[U, That] = null
- def leaf = result = pit.updated2combiner(pos, elem, pbf)
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf) // TODO
def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
@@ -421,7 +421,7 @@ extends scala.collection.SeqLike[T, Repr]
protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParallelIterator, val otherpit: PreciseSplitter[S])
extends Accessor[Boolean, Corresponds[S]] {
var result: Boolean = true
- def leaf = if (!pit.isAborted) {
+ def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
result = pit.corresponds(corr)(otherpit)
if (!result) pit.abort
}
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala
index d0faa942ef..7862e99f44 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala
@@ -6,7 +6,7 @@ package scala.collection.parallel
import scala.collection.TraversableView
import scala.collection.SeqView
import scala.collection.Parallel
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
@@ -38,8 +38,8 @@ object ParallelSeqView {
type Coll = ParallelSeqView[_, C, _] forSome { type C <: ParallelSeq[_] }
- implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] =
- new CanBuildFromParallel[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] =
+ new CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] {
def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
}
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala
index 2e9ebb1df3..eab4d7ad5f 100644
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala
+++ b/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala
@@ -8,7 +8,7 @@ import scala.collection.SeqView
import scala.collection.SeqViewLike
import scala.collection.Parallel
import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
@@ -161,10 +161,10 @@ extends SeqView[T, Coll]
/* tasks */
- protected[this] class Force[U >: T, That](cbf: CanBuildFromParallel[Coll, U, That], val pit: ParallelIterator)
+ protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], val pit: ParallelIterator)
extends Transformer[Combiner[U, That], Force[U, That]] {
var result: Combiner[U, That] = null
- def leaf = result = pit.copy2builder[U, That, Combiner[U, That]](cbf(self.underlying))
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying)))
def newSubtask(p: SuperParallelIterator) = new Force(cbf, down(p))
override def merge(that: Force[U, That]) = result = result combine that.result
}
diff --git a/src/parallel-collections/scala/collection/parallel/Tasks.scala b/src/parallel-collections/scala/collection/parallel/Tasks.scala
index d21113cf64..3ef60f8c7a 100644
--- a/src/parallel-collections/scala/collection/parallel/Tasks.scala
+++ b/src/parallel-collections/scala/collection/parallel/Tasks.scala
@@ -38,8 +38,10 @@ trait Tasks {
def repr = this.asInstanceOf[Tp]
/** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
def compute
- /** Body of the task - non-divisible unit of work done by this task. */
- def leaf
+ /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
+ * or `None` if there was no previous task.
+ */
+ def leaf(result: Option[R])
/** Start task. */
def start
/** Wait for task to finish. */
@@ -88,19 +90,20 @@ trait AdaptiveWorkStealingTasks extends Tasks {
def split: Seq[Task[R, Tp]]
/** The actual leaf computation. */
- def leaf: Unit
+ def leaf(result: Option[R]): Unit
- def compute = if (shouldSplitFurther) internal else leaf
+ def compute = if (shouldSplitFurther) internal else leaf(None)
def internal = {
var last = spawnSubtasks
- last.leaf
+ last.leaf(None)
result = last.result
while (last.next != null) {
+ val lastresult = Option(last.result)
last = last.next
- if (last.tryCancel) last.leaf else last.sync
+ if (last.tryCancel) last.leaf(lastresult) else last.sync
merge(last.repr)
}
}
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala
index 3f73c8fe18..0a39dd200c 100644
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala
+++ b/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala
@@ -11,7 +11,9 @@ import scala.collection.parallel.ParallelMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.generic.ParallelMapFactory
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.GenericParallelMapTemplate
+import scala.collection.generic.GenericParallelMapCompanion
import scala.collection.immutable.HashMap
@@ -21,11 +23,15 @@ import scala.collection.immutable.HashMap
class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
extends ParallelMap[K, V]
+ with GenericParallelMapTemplate[K, V, ParallelHashTrie]
with ParallelMapLike[K, V, ParallelHashTrie[K, V], HashMap[K, V]]
-{ self =>
+{
+self =>
def this() = this(HashMap.empty[K, V])
+ override def mapCompanion: GenericParallelMapCompanion[ParallelHashTrie] = ParallelHashTrie
+
override def empty: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V]
def parallelIterator = new ParallelHashTrieIterator(trie) with SCPI
@@ -40,6 +46,11 @@ extends ParallelMap[K, V]
override def size = trie.size
+ protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match {
+ case Some(old) => old
+ case None => newc
+ }
+
type SCPI = SignalContextPassingIterator[ParallelHashTrieIterator]
class ParallelHashTrieIterator(val ht: HashMap[K, V])
@@ -70,7 +81,13 @@ extends ParallelMap[K, V]
object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
def empty[K, V]: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V]
- implicit def canBuildFrom[K, V]: CanBuildFromParallel[Coll, (K, V), ParallelHashTrie[K, V]] = new ParallelMapCanBuildFrom[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParallelHashTrie[K, V]] = HashTrieCombiner[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelHashTrie[K, V]] = {
+ new CanCombineFromMap[K, V]
+ }
+
+ var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
}
@@ -85,16 +102,17 @@ self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] =>
def +=(elem: (K, V)) = { trie += elem; this }
- def result = new ParallelHashTrie[K, V](trie)
-
- def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = {
+ def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ // ParallelHashTrie.totalcombines.incrementAndGet
if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
val that = other.asInstanceOf[HashTrieCombiner[K, V]]
val ncombiner = HashTrieCombiner[K, V]
ncombiner.trie = this.trie combine that.trie
ncombiner
} else error("Unexpected combiner type.")
- }
+ } else this
+
+ def result = new ParallelHashTrie[K, V](trie)
}
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala
index a07db9c39c..85a33c7431 100644
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala
+++ b/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala
@@ -6,7 +6,7 @@ import scala.collection.immutable.Range
import scala.collection.immutable.RangeUtils
import scala.collection.parallel.ParallelSeq
import scala.collection.parallel.Combiner
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
@@ -62,8 +62,8 @@ extends ParallelSeq[Int]
/* transformers */
- override def map2combiner[S, That](f: Int => S, pbf: CanBuildFromParallel[ParallelSeq[Int], S, That]): Combiner[S, That] = {
- val cb = pbf(self.repr)
+ override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = pbf(self.repr)
val sz = remaining
cb.sizeHint(sz)
if (sz > 0) {
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala
index f3a26e8682..ceb0dcc13d 100644
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala
+++ b/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala
@@ -4,7 +4,7 @@ package scala.collection.parallel.immutable
import scala.collection.generic.GenericParallelTemplate
import scala.collection.generic.GenericCompanion
import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
import scala.collection.generic.ParallelFactory
import scala.collection.parallel.ParallelSeqLike
import scala.collection.parallel.Combiner
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala
index e45b3f156a..bd17d24ea8 100644
--- a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala
+++ b/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala
@@ -27,12 +27,12 @@ trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combin
def +=(elem: Elem) = { lastbuff += elem; this }
def result: To = allocateAndCopy
def clear = { chain.clear }
- def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = {
+ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
if (other.isInstanceOf[LazyCombiner[_, _, _]]) {
val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]]
newLazyCombiner(chain ++= that.chain)
} else throw new UnsupportedOperationException("Cannot combine with combiner of different type.")
- }
+ } else this
def size = chain.foldLeft(0)(_ + _.size)
/** Method that allocates the data structure and copies elements into it using
@@ -40,4 +40,4 @@ trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combin
*/
def allocateAndCopy: To
def newLazyCombiner(buffchain: ArrayBuffer[Buff]): LazyCombiner[Elem, To, Buff]
-} \ No newline at end of file
+}
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala
index 7ea5499aa6..30b4b109b2 100644
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala
+++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala
@@ -5,7 +5,7 @@ package scala.collection.parallel.mutable
import scala.collection.generic.GenericParallelTemplate
import scala.collection.generic.GenericCompanion
import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
import scala.collection.generic.ParallelFactory
import scala.collection.generic.Sizing
import scala.collection.parallel.Combiner
@@ -357,8 +357,8 @@ extends ParallelSeq[T]
/* transformers */
- override def map2combiner[S, That](f: T => S, cbf: CanBuildFromParallel[ParallelArray[T], S, That]): Combiner[S, That] = {
- val cb = cbf(self.repr)
+ override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = cbf(self.repr)
cb.sizeHint(remaining)
map2combiner_quick(f, arr, cb, until, i)
i = until
@@ -373,7 +373,7 @@ extends ParallelSeq[T]
}
}
- override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanBuildFromParallel[ParallelArray[T], S, That]): Combiner[S, That] = {
+ override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = {
val cb = pbf(self.repr)
collect2combiner_quick(pf, arr, cb, until, i)
i = until
@@ -389,7 +389,7 @@ extends ParallelSeq[T]
}
}
- override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanBuildFromParallel[ParallelArray[T], S, That]): Combiner[S, That] = {
+ override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = {
val cb = pbf(self.repr)
while (i < until) {
val traversable = f(arr(i).asInstanceOf[T])
@@ -518,7 +518,7 @@ extends ParallelSeq[T]
object ParallelArray extends ParallelFactory[ParallelArray] {
- implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelArray[T]] = new GenericCanBuildFromParallel[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelArray[T]] = new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParallelArray[T]] = newCombiner
def newCombiner[T]: Combiner[T, ParallelArray[T]] = ParallelArrayCombiner[T]
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
index 9bbad7035e..2991344be2 100644
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
+++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
@@ -43,7 +43,7 @@ extends LazyCombiner[T, ParallelArray[T], ExposedArrayBuffer[T]]
class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] {
var result = ()
- def leaf = if (howmany > 0) {
+ def leaf(prev: Option[Unit]) = if (howmany > 0) {
var totalleft = howmany
val (stbuff, stind) = findStart(offset)
var buffind = stbuff
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala
index f7ba44b67e..bd0a46bc43 100644
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala
+++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala
@@ -28,8 +28,8 @@ trait ParallelIterable[T] extends collection.mutable.Iterable[T]
/** $factoryinfo
*/
object ParallelIterable extends ParallelFactory[ParallelIterable] {
- implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelIterable[T]] =
- new GenericCanBuildFromParallel[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] =
+ new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala
index ed08b59962..636ba1ac3d 100644
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala
+++ b/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala
@@ -4,7 +4,7 @@ package scala.collection.parallel.mutable
import scala.collection.generic.GenericParallelTemplate
import scala.collection.generic.GenericCompanion
import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
import scala.collection.generic.ParallelFactory
import scala.collection.parallel.ParallelSeqLike
import scala.collection.parallel.Combiner
@@ -38,7 +38,7 @@ trait ParallelSeq[T] extends collection.mutable.Seq[T]
* @define coll mutable parallel sequence
*/
object ParallelSeq extends ParallelFactory[ParallelSeq] {
- implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
diff --git a/src/parallel-collections/scala/collection/parallel/package.scala b/src/parallel-collections/scala/collection/parallel/package.scala
index 6e8cbe8633..3b297f1cd1 100644
--- a/src/parallel-collections/scala/collection/parallel/package.scala
+++ b/src/parallel-collections/scala/collection/parallel/package.scala
@@ -4,7 +4,7 @@ package scala.collection
import java.lang.Thread._
import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanBuildFromParallel
+import scala.collection.generic.CanCombineFrom
/** Package object for parallel collections.
@@ -34,8 +34,8 @@ package object parallel {
implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new {
def isParallel = bf.isInstanceOf[Parallel]
- def asParallel = bf.asInstanceOf[CanBuildFromParallel[From, Elem, To]]
- def ifParallel[R](isbody: CanBuildFromParallel[From, Elem, To] => R) = new {
+ def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
+ def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new {
def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
}
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala
new file mode 100644
index 0000000000..a944a7fb39
--- /dev/null
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala
@@ -0,0 +1,87 @@
+package scala.collection.parallel.benchmarks
+package hashtries
+
+
+
+
+import collection.immutable.{HashMap => HashTrie}
+import collection.mutable.HashMap
+
+
+
+
+
+
+class MultipleCombine(val size: Int, val parallelism: Int, val runWhat: String) extends Bench with IntInit {
+ var combines = 10
+
+ var thattries = new Array[HashTrie[Int, Int]](combines)
+ def initTries = for (r <- 0 until combines) {
+ var thattrie = new HashTrie[Int, Int]
+ for (i <- ((r + 1) * size) until ((r + 2) * size)) thattrie += ((i, i))
+ thattries(r) = thattrie
+ }
+ initTries
+
+ val thatmaps = new Array[HashMap[Int, Int]](10)
+ def initMaps = for (r <- 0 until combines) {
+ var thatmap = new HashMap[Int, Int]
+ for (i <- ((r + 1) * size) until ((r + 2) * size)) thatmap += ((i, i))
+ thatmaps(r) = thatmap
+ }
+ initMaps
+
+ override def repetitionsPerRun = 25
+ def runpar = throw new UnsupportedOperationException
+ def runseq = runhashtrie
+ def runhashtrie = {
+ initHashTrie
+ var trie = hashtrie
+ for (r <- 0 until combines) trie = trie combine thattries(r)
+ }
+ def runappendtrie = {
+ initHashTrie
+ var trie = hashtrie
+ for (r <- 0 until combines) trie = trie ++ thattries(r)
+ }
+ def runhashmap = {
+ initHashMap
+ var map = hashmap
+ for (r <- 0 until combines) map = map ++ thatmaps(r)
+ }
+ def rundestructive = {
+ initHashTrie
+ var trie = hashtrie
+ for (r <- 0 until combines) trie = trie combine thattries(r)
+ }
+ def companion = MultipleCombine
+ def comparisonMap = Map("hashtrie" -> runhashtrie _, "hashmap" -> runhashmap _, "appendtrie" -> runappendtrie _, "destruct" -> rundestructive _)
+ override def reset = runWhat match {
+ case "appendtrie" => initHashTrie
+ case "destruct" => initHashTrie
+ case _ => super.reset
+ }
+}
+
+
+object MultipleCombine extends BenchCompanion {
+ def collectionName = "HashTrie"
+ def benchName = "multi-combine";
+ def apply(sz: Int, p: Int, what: String) = new MultipleCombine(sz, p, what)
+ override def defaultSize = 5000
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
new file mode 100644
index 0000000000..747178c1a4
--- /dev/null
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
@@ -0,0 +1,118 @@
+package scala.collection.parallel.benchmarks.hashtries
+
+
+
+
+import scala.collection.parallel.benchmarks.generic.StandardParallelIterableBench
+import scala.collection.parallel.benchmarks.generic.NotBenchmark
+import scala.collection.parallel.benchmarks.generic.Dummy
+import scala.collection.parallel.benchmarks.generic.Operators
+import scala.collection.parallel.immutable.ParallelHashTrie
+
+
+
+
+
+trait ParallelHashTrieBenches[K, V] extends StandardParallelIterableBench[(K, V), ParallelHashTrie[K, V]] {
+
+ def nameOfCollection = "ParallelHashTrie"
+ def comparisonMap = collection.Map()
+ val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool
+
+ object Map2 extends IterableBenchCompanion {
+ override def defaultSize = 5000
+ def benchName = "map2";
+ def apply(sz: Int, p: Int, w: String) = new Map2(sz, p, w)
+ }
+
+ class Map2(val size: Int, val parallelism: Int, val runWhat: String)
+ extends IterableBench with StandardParallelIterableBench[(K, V), ParallelHashTrie[K, V]] {
+ var result: Int = 0
+ def comparisonMap = collection.Map()
+ def runseq = result = this.seqcoll.map(operators.mapper2).size
+ def runpar = {
+ result = this.parcoll.map(operators.mapper2).size
+ //println(collection.parallel.immutable.ParallelHashTrie.totalcombines)
+ //System.exit(1)
+ }
+ def companion = Map2
+ override def repetitionsPerRun = 50
+ override def printResults {
+ println("Total combines: " + collection.parallel.immutable.ParallelHashTrie.totalcombines)
+ println("Size of last result: " + result)
+ }
+ }
+
+}
+
+
+
+
+
+object RefParallelHashTrieBenches extends ParallelHashTrieBenches[Dummy, Dummy] with NotBenchmark {
+
+ type DPair = (Dummy, Dummy)
+
+ object operators extends Operators[DPair] {
+ def gcd(a: Int, b: Int): Int = {
+ val result = if (b == 0) a else {
+ gcd(b, a - b * (a / b))
+ }
+ result + 1000
+ }
+ def heavy(a: Int): Int = {
+ var i = 0
+ var sum = a
+ while (i < 3000) {
+ i += 1
+ sum += a + i
+ }
+ sum
+ }
+ val reducer = (x: DPair, y: DPair) => {
+ y._2.num = x._2.in + y._2.in
+ y
+ }
+ val mediumreducer = (x: DPair, y: DPair) => {
+ y._2.num = gcd(x._2.in, y._2.in)
+ y
+ }
+ val filterer = (p: DPair) => {
+ p._1.num % 2 == 0
+ }
+ val mapper = (p: DPair) => {
+ val a = p._1
+ a.num = a.in % 2
+ (a, p._2)
+ }
+ override val mapper2 = (p: DPair) => {
+ val a = 1 //heavy(p._1.in)
+ (new Dummy(p._1.in * -2 + a), p._2)
+ }
+ val heavymapper = (p: DPair) => {
+ val a = p._1
+ var i = -100
+ while (i < 0) {
+ if (a.in < i) a.num += 1
+ i += 1
+ }
+ (a, p._2)
+ }
+ val taker = (p: DPair) => true
+ }
+
+ def createSequential(sz: Int, p: Int) = {
+ var ht = new collection.immutable.HashMap[Dummy, Dummy]
+ for (i <- 0 until sz) ht += ((new Dummy(i), new Dummy(i)))
+ ht
+ }
+
+ def createParallel(sz: Int, p: Int) = {
+ var pht = new ParallelHashTrie[Dummy, Dummy]
+ for (i <- 0 until sz) pht += ((new Dummy(i), new Dummy(i)))
+ forkJoinPool.setParallelism(p)
+ pht.environment = forkJoinPool
+ pht
+ }
+
+}