From 285d2182f1aeb113aba55be804eefa2f61ce2624 Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Wed, 22 Sep 2010 14:18:19 +0000 Subject: Reimplementing parallel views to solve several ... Reimplementing parallel views to solve several performance glitches. No review. --- .../collection/parallel/ParIterableLike.scala | 16 +- .../collection/parallel/ParIterableViewLike.scala | 55 ++++++- .../scala/collection/parallel/ParMapLike.scala | 6 +- .../scala/collection/parallel/ParSeqLike.scala | 14 +- .../scala/collection/parallel/ParSeqViewLike.scala | 46 +++--- .../collection/parallel/RemainsIterator.scala | 116 +++++++++++++- .../collection/parallel/mutable/ParArray.scala | 9 ++ .../parallel/mutable/ParArrayCombiner.scala | 2 +- .../scala/collection/parallel/package.scala | 7 + .../scala/collection/parallel/Benchmarking.scala | 4 + .../parallel/benchmarks/generic/Dummy.scala | 13 +- .../parallel/benchmarks/generic/Operators.scala | 5 + .../benchmarks/generic/ParallelBenches.scala | 167 +++++++++++++++------ .../benchmarks/hashtries/ParallelHashTries.scala | 14 +- .../benchmarks/parallel_range/RangeBenches.scala | 18 +-- .../benchmarks/parallel_view/SeqViewBenches.scala | 5 +- 16 files changed, 390 insertions(+), 107 deletions(-) diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 8fc232579a..e3fde884e2 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -117,10 +117,10 @@ import java.util.concurrent.atomic.AtomicBoolean * This method will provide sequential views it produces with `indexFlag` signalling capabilities. This means * that sequential views may set and read `indexFlag` state. */ -trait ParIterableLike[+T, +Repr <: Parallel, +SequentialView <: Iterable[T]] +trait ParIterableLike[+T, +Repr <: Parallel, +Sequential <: Iterable[T] with IterableLike[T, Sequential]] extends IterableLike[T, Repr] with Parallelizable[Repr] - with Sequentializable[T, SequentialView] + with Sequentializable[T, Sequential] with Parallel with HasNewCombiner[T, Repr] with TaskSupport { @@ -172,7 +172,7 @@ self => * * @return a parallel iterator */ - protected def parallelIterator: ParIterator + def parallelIterator: ParIterableIterator[T] /** Creates a new split iterator used to traverse the elements of this collection. 
* @@ -235,8 +235,8 @@ self => var result: R = null.asInstanceOf[R] } - /* convenience iterator operations wrapper */ - protected implicit def iterator2ops[PI <: ParIterator](it: PI) = new { + /* convenience signalling operations wrapper */ + protected implicit def delegatedSignalling2ops[PI <: DelegatedSignalling](it: PI) = new { def assign(cntx: Signalling): PI = { it.signalDelegate = cntx it @@ -604,6 +604,12 @@ self => executeAndWait(new CopyToArray(start, len, xs, parallelIterator)) } + override def view = new ParIterableView[T, Repr, Sequential] { + protected lazy val underlying = self.repr + def seq = self.seq.view + def parallelIterator = self.parallelIterator + } + override def toIterable: Iterable[T] = seq.drop(0).asInstanceOf[Iterable[T]] override def toArray[U >: T: ClassManifest]: Array[U] = { diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 18ae0ae097..1f7ea9b694 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -7,7 +7,7 @@ import scala.collection.Parallel import scala.collection.TraversableViewLike import scala.collection.IterableView import scala.collection.IterableViewLike - +import scala.collection.generic.CanBuildFrom @@ -43,7 +43,58 @@ extends IterableView[T, Coll] type CPI = SignalContextPassingIterator[ParIterator] - trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] + /* wrappers */ + + trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] { + override def parallelIterator: ParIterableIterator[S] + override def iterator = parallelIterator + environment = self.environment + } + + trait Sliced extends super.Sliced with Transformed[T] { + override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] + def parallelIterator: ParIterableIterator[T] = self.parallelIterator.slice(from, until) + def seq = self.seq.slice(from, until) + } + + trait Mapped[S] extends super.Mapped[S] with Transformed[S]{ + def parallelIterator: ParIterableIterator[S] = self.parallelIterator.map(mapping) + def seq = self.seq.map(mapping).asInstanceOf[IterableView[S, CollSeq]] + } + + // only use if other is a ParIterable, otherwise force + trait Appended[U >: T] extends super.Appended[U] with Transformed[U] { + def restAsParIterable: ParIterable[U] = rest.asParIterable + def parallelIterator: ParIterableIterator[U] = self.parallelIterator.appendIterable[U, ParIterableIterator[U]](restAsParIterable.parallelIterator) + def seq = self.seq.++(rest).asInstanceOf[IterableView[U, CollSeq]] + } + + trait Forced[S] extends super.Forced[S] with Transformed[S] { + def forcedPar: ParIterable[S] = forced.asParIterable + def parallelIterator: ParIterableIterator[S] = forcedPar.parallelIterator + // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, + // we use it to obtain a view of the correct type - not the most efficient thing + // in the universe, but without making `newForced` more accessible, or adding + // a `forced` method to `SeqView`, this is the best we can do + def seq = self.seq.take(0).++(forced).asInstanceOf[IterableView[S, CollSeq]] + } + + /* operation overrides */ + + override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This] + override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = 
newMapped(f).asInstanceOf[That] + override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That] + + /* wrapper virtual ctors */ + + protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u } + protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } + protected override def newForced[S](xs: => Seq[S]): Transformed[S] = new Forced[S] { val forced = xs } + protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = { + // we only append if `that` is a parallel iterable, i.e. it has a splitter + if (that.isParIterable) new Appended[U] { val rest = that } + else newForced(mutable.ParArray.fromTraversables(this, that)) + } } diff --git a/src/library/scala/collection/parallel/ParMapLike.scala b/src/library/scala/collection/parallel/ParMapLike.scala index 252ca2446f..e6944953b5 100644 --- a/src/library/scala/collection/parallel/ParMapLike.scala +++ b/src/library/scala/collection/parallel/ParMapLike.scala @@ -16,10 +16,10 @@ import scala.collection.mutable.Builder trait ParMapLike[K, +V, - +Repr <: ParMapLike[K, V, Repr, SequentialView] with ParMap[K, V], - +SequentialView <: Map[K, V]] + +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], + +Sequential <: Map[K, V] with MapLike[K, V, Sequential]] extends MapLike[K, V, Repr] - with ParIterableLike[(K, V), Repr, SequentialView] + with ParIterableLike[(K, V), Repr, Sequential] { self => protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 7e8b9d6129..9ea647fd9f 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -81,7 +81,7 @@ self => * * @return an iterator that can be split into subsets of precise size */ - protected def parallelIterator: ParIterator + def parallelIterator: ParSeqIterator[T] override def iterator: PreciseSplitter[T] = parallelIterator @@ -308,7 +308,7 @@ self => def length = self.length def apply(idx: Int) = self(idx) def seq = self.seq.view - def parallelIterator = new Elements(0, length) with SCPI {} + def parallelIterator = self.parallelIterator } override def view(from: Int, until: Int) = view.slice(from, until) @@ -350,7 +350,7 @@ self => pit.setIndexFlagIfLesser(from) } } - protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException + protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val pits = pit.split for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p) @@ -370,7 +370,7 @@ self => pit.setIndexFlagIfGreater(pos) } } - protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException + protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val pits = pit.split for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p) @@ -396,7 +396,7 @@ self => override def merge(that: ReverseMap[S, That]) = result = that.result combine result } - protected[this] class SameElements[U >: T](val pit: ParIterator, val otherpit: PreciseSplitter[U]) + protected[this] class SameElements[U >: T](protected[this] val pit: ParSeqIterator[T], val otherpit: 
PreciseSplitter[U]) extends Accessor[Boolean, SameElements[U]] { var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { @@ -424,7 +424,7 @@ self => override def merge(that: Updated[U, That]) = result = result combine that.result } - protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], val pit: ParIterator, val otherpit: PreciseSplitter[S]) + protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) @@ -442,7 +442,7 @@ self => override def merge(that: Zip[U, S, That]) = result = result combine that.result } - protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParIterator, val otherpit: PreciseSplitter[S]) + protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) extends Accessor[Boolean, Corresponds[S]] { var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala index 31b90936d7..1aac72767a 100644 --- a/src/library/scala/collection/parallel/ParSeqViewLike.scala +++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala @@ -44,34 +44,38 @@ extends SeqView[T, Coll] trait Transformed[+S] extends ParSeqView[S, Coll, CollSeq] with super[ParIterableView].Transformed[S] with super[SeqView].Transformed[S] { - override def parallelIterator = new Elements(0, length) with SCPI {} + override def parallelIterator: ParSeqIterator[S] = new Elements(0, length) with SCPI {} override def iterator = parallelIterator - environment = self.environment } - trait Forced[S] extends super.Forced[S] with Transformed[S] { - // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, - // we use it to obtain a view of the correct type - not the most efficient thing - // in the universe, but without making `newForced` more accessible, or adding - // a `forced` method to `SeqView`, this is the best we can do - def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]] + trait Sliced extends super[SeqViewLike].Sliced with super[ParIterableViewLike].Sliced with Transformed[T] { + override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] + override def parallelIterator = self.parallelIterator.psplit(from, until - from)(1) } - trait Filtered extends super.Filtered with Transformed[T] { - def seq = self.seq filter pred + trait Mapped[S] extends super[SeqViewLike].Mapped[S] with super[ParIterableViewLike].Mapped[S] with Transformed[S] { + override def parallelIterator = self.parallelIterator.map(mapping) + override def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]] } - trait Sliced extends super.Sliced with Transformed[T] { - override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] - def seq = self.seq.slice(from, until) + trait Appended[U >: T] extends super[SeqViewLike].Appended[U] with super[ParIterableViewLike].Appended[U] with Transformed[U] { + def restAsParSeq: ParSeq[U] = rest.asInstanceOf[ParSeq[U]] + override def parallelIterator = 
self.parallelIterator.appendSeq[U, ParSeqIterator[U]](restAsParSeq.parallelIterator) + override def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]] } - trait Appended[U >: T] extends super.Appended[U] with Transformed[U] { - def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]] + trait Forced[S] extends super[SeqViewLike].Forced[S] with super[ParIterableViewLike].Forced[S] with Transformed[S] { + override def forcedPar: ParSeq[S] = forced.asParSeq + override def parallelIterator: ParSeqIterator[S] = forcedPar.parallelIterator + // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, + // we use it to obtain a view of the correct type - not the most efficient thing + // in the universe, but without making `newForced` more accessible, or adding + // a `forced` method to `SeqView`, this is the best we can do + override def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]] } - trait Mapped[S] extends super.Mapped[S] with Transformed[S]{ - def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]] + trait Filtered extends super.Filtered with Transformed[T] { + def seq = self.seq filter pred } trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] { @@ -108,7 +112,13 @@ extends SeqView[T, Coll] protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p } protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u } - protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that } + protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = { + // we only append if `that` is a parallel sequence, i.e. it has a precise splitter + if (that.isParSeq) new Appended[U] { val rest = that } + else newForced(mutable.ParArray.fromTraversables(this, that)) + } + protected override def newForced[S](xs: => Seq[S]): Transformed[S] = new Forced[S] { val forced = xs } + protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f } protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p } diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 8296d92e59..43acf3b41e 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -21,7 +21,8 @@ trait RemainsIterator[+T] extends Iterator[T] { /** Augments iterators with additional methods, mostly transformers, * assuming they iterate an iterable collection. * - * @param T type of the elements iterated. + * @param T type of the elements iterated. + * @param IterRepr iterator type. */ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { @@ -326,6 +327,8 @@ extends AugmentedIterableIterator[T] with Signalling with DelegatedSignalling { +self => + def split: Seq[ParIterableIterator[T]] /** The number of elements this iterator has yet to traverse. This method @@ -339,12 +342,66 @@ extends AugmentedIterableIterator[T] * * In that case, 2 considerations must be taken into account: * - * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`. 
+ * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray` + * and which use tasks having the iterated subset length as a ctor argument. * * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum * of `remaining` values of split iterators must be less than or equal to this upper bound. */ def remaining: Int + + /* iterator transformers */ + + class Taken(taken: Int) extends ParIterableIterator[T] { + var signalDelegate = self.signalDelegate + var remaining = taken min self.remaining + def hasNext = remaining > 0 + def next = { remaining -= 1; self.next } + def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } + protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { + val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield + if (total < remaining) it else taker(it, total - remaining) + shortened filter { _.remaining > 0 } + } + } + + override def take(n: Int) = new Taken(n) + + override def slice(from1: Int, until1: Int) = { + val it = new Taken(until1) + var todrop = from1 + while (todrop > 0 && it.hasNext) it.next + it + } + + class Mapped[S](f: T => S) extends ParIterableIterator[S] { + var signalDelegate = self.signalDelegate + def hasNext = self.hasNext + def next = f(self.next) + def remaining = self.remaining + def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f } + } + + override def map[S](f: T => S) = new Mapped(f) + + class Appended[U >: T, PI <: ParIterableIterator[U]](protected val that: PI) extends ParIterableIterator[U] { + var signalDelegate = self.signalDelegate + protected var curr: ParIterableIterator[U] = self + def hasNext = if (curr.hasNext) true else if (curr eq self) { + curr = that + curr.hasNext + } else false + def next = if (curr eq self) { + hasNext + curr.next + } else curr.next + def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining + protected def firstNonEmpty = (curr eq self) && curr.hasNext + def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split + } + + def appendIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that) + } @@ -353,6 +410,7 @@ extends ParIterableIterator[T] with AugmentedSeqIterator[T] with PreciseSplitter[T] { +self => def split: Seq[ParSeqIterator[T]] def psplit(sizes: Int*): Seq[ParSeqIterator[T]] @@ -364,6 +422,60 @@ extends ParIterableIterator[T] * @return an exact number of elements this iterator has yet to iterate */ def remaining: Int + + /* iterator transformers */ + + class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] { + override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) } + } + + override def take(n: Int) = new Taken(n) + + override def slice(from1: Int, until1: Int) = { + val it = new Taken(until1) + var todrop = from1 + while (todrop > 0 && it.hasNext) it.next + it + } + + class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] { + override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f } + } + + override def map[S](f: T => S) = new Mapped(f) + + class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) 
extends super.Appended[U, PI](it) with ParSeqIterator[U] { + override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) { + val selfrem = self.remaining + + // split sizes + var appendMiddle = false + val szcum = sizes.scanLeft(0)(_ + _) + val splitsizes = sizes.zip(szcum.init zip szcum.tail).flatMap { t => + val (sz, (from, until)) = t + if (from < selfrem && until > selfrem) { + appendMiddle = true + Seq(selfrem - from, until - selfrem) + } else Seq(sz) + } + val (selfszfrom, thatszfrom) = splitsizes.zip(szcum.init).span(_._2 < selfrem) + val (selfsizes, thatsizes) = (selfszfrom map { _._1 }, thatszfrom map { _._1 }); + + // split iterators + val selfs = self.psplit(selfsizes: _*) + val thats = that.psplit(thatsizes: _*) + + // appended last in self with first in rest if necessary + if (appendMiddle) selfs.init ++ Seq(selfs.last.appendSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail + else selfs ++ thats + } else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*) + } + + def appendSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that) + } diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 9b0049cb10..d2123a402f 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -617,6 +617,15 @@ object ParArray extends ParFactory[ParArray] { handoff(newarr) } + def fromTraversables[T](xss: TraversableOnce[T]*) = { + val cb = ParArrayCombiner[T]() + for (xs <- xss) { + val it = xs.toIterator + while (it.hasNext) cb += it.next + } + cb.result + } + } diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala index 4cef521cf8..cb0e589d9b 100644 --- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala @@ -89,7 +89,7 @@ object ParArrayCombiner { def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParArrayCombiner[T] = { new { val chain = c } with ParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]] } - def apply[T]: ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) + def apply[T](): ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 3d8e7208ae..76677a1148 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -5,6 +5,7 @@ import java.lang.Thread._ import scala.collection.generic.CanBuildFrom import scala.collection.generic.CanCombineFrom +import scala.collection.parallel.mutable.ParArray /** Package object for parallel collections. 
@@ -56,6 +57,12 @@ package object parallel { def ifParSeq[R](isbody: ParSeq[T] => R) = new { def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody } + def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else { + val it = t.toIterator + val cb = mutable.ParArrayCombiner[T]() + while (it.hasNext) cb += it.next + cb.result + } } } diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala index 2cc3abaf27..f733f4154b 100644 --- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala +++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala @@ -72,6 +72,10 @@ trait BenchmarkRegister { register(parallel_view.DummyViewBenchList.MediumReduce) register(parallel_view.DummyViewBenchList.ModifyThenReduce) register(parallel_view.DummyViewBenchList.ModifyThenForce) + register(parallel_view.DummyViewBenchList.Iteration) + register(parallel_view.DummyViewBenchList.IterationS) + register(parallel_view.DummyViewBenchList.IterationM) + register(parallel_view.DummyViewBenchList.IterationA) // parallel ranges register(parallel_range.RangeBenches.Reduce) diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala index 2b2ad81af6..ae93c7adf4 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala @@ -7,6 +7,8 @@ class Dummy(val in: Int) { var num = in override def toString = in.toString override def hashCode = in + def dummy = num + in + def one = "1".length } @@ -37,7 +39,7 @@ object DummyOperators extends Operators[Dummy] { a.in % 2 == 0 } val mapper = (a: Dummy) => { - a.num = a.in % 2 + a.num = a.dummy + a.num + a.in + a.one a } val heavymapper = (a: Dummy) => { @@ -51,9 +53,16 @@ object DummyOperators extends Operators[Dummy] { val taker = (a: Dummy) => { a.in >= 0 } + val eachFun: Dummy => Unit = (d: Dummy) => { + d.dummy + } + override def sequence(sz: Int): Seq[Dummy] = { + val pa = new collection.parallel.mutable.ParArray[Dummy](sz) + for (i <- 0 until sz) pa(i) = new Dummy(i) + pa + } } - diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala index 1268f94bac..4bd693a933 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala @@ -14,6 +14,8 @@ trait Operators[T] { def mapper2: T => T = error("unsupported") def heavymapper: T => T def taker: T => Boolean + def eachFun: T => Unit + def sequence(sz: Int): Seq[T] = error("unsupported") } @@ -40,6 +42,9 @@ trait IntOperators extends Operators[Int] { n + sum } val taker: Int => Boolean = _ < 10000 + val eachFun: Int => Unit = { n => + n % 2 == 0 + } } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala index da5a2a63ca..fd4e87ab4c 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala @@ -4,55 +4,56 @@ package generic +import scala.collection.SeqView - -trait 
ParIterableBench[T, Coll <: ParIterable[T]] extends collection.parallel.benchmarks.Bench { +trait ParIterableBenches[T, Coll <: ParIterable[T]] { self => - protected var seqcoll: Iterable[T] = null - protected var parcoll: Coll = null.asInstanceOf[Coll] - - reset - - def reset = runWhat match { - case "seq" => this.seqcoll = createSequential(size, parallelism) - case "par" => this.parcoll = createParallel(size, parallelism) - case _ => - } - - def nameOfCollection: String - def operators: Operators[T] def createSequential(sz: Int, p: Int): Iterable[T] def createParallel(sz: Int, p: Int): Coll + def nameOfCollection: String + def operators: Operators[T] trait IterableBenchCompanion extends BenchCompanion { def collectionName = self.nameOfCollection } - trait IterableBench extends ParIterableBench[T, Coll] { + trait IterableBench extends collection.parallel.benchmarks.Bench { + protected var seqcoll: Iterable[T] = null + protected var parcoll: Coll = null.asInstanceOf[Coll] + + reset + + def reset = runWhat match { + case "seq" => this.seqcoll = createSequential(size, parallelism) + case "par" => this.parcoll = createParallel(size, parallelism) + case _ => + } + def nameOfCollection = self.nameOfCollection def operators = self.operators def createSequential(sz: Int, p: Int) = self.createSequential(size, parallelism) def createParallel(sz: Int, p: Int) = self.createParallel(size, parallelism) def forkJoinPool: scala.concurrent.forkjoin.ForkJoinPool = self.forkJoinPool - } - def forkJoinPool: scala.concurrent.forkjoin.ForkJoinPool + override def printResults { + println(" --- Fork join pool state --- ") + println("Parallelism: " + forkJoinPool.getParallelism) + println("Active threads: " + forkJoinPool.getActiveThreadCount) + println("Work stealings: " + forkJoinPool.getStealCount) + } - override def printResults { - println(" --- Fork join pool state --- ") - println("Parallelism: " + forkJoinPool.getParallelism) - println("Active threads: " + forkJoinPool.getActiveThreadCount) - println("Work stealings: " + forkJoinPool.getStealCount) } + def forkJoinPool: scala.concurrent.forkjoin.ForkJoinPool + } -trait ParSeqBench[T, Coll <: ParSeq[T]] extends ParIterableBench[T, Coll] { - self => +trait ParSeqBenches[T, Coll <: ParSeq[T]] extends ParIterableBenches[T, Coll] { +self => def createSequential(sz: Int, p: Int): Seq[T] @@ -60,27 +61,18 @@ trait ParSeqBench[T, Coll <: ParSeq[T]] extends ParIterableBench[T, Coll] { def collectionName = self.nameOfCollection } - trait SeqBench extends IterableBench with ParSeqBench[T, Coll] { - override def createSequential(sz: Int, p: Int) = self.createSequential(size, parallelism) + trait SeqBench extends IterableBench { + override def createSequential(sz: Int, p: Int) = self.createSequential(sz, p) } } -trait NotBenchmark { - lazy val runWhat = ""; - val size = -1 - val parallelism = -1 - def runpar {} - def runseq {} - def companion = throw new UnsupportedOperationException -} -/** - * Standard benchmarks for collections. +/** Standard benchmarks for collections. 
*/ -trait StandardParIterableBench[T, Coll <: ParIterable[T]] extends ParIterableBench[T, Coll] { +trait StandardParIterableBenches[T, Coll <: ParIterable[T]] extends ParIterableBenches[T, Coll] { object Reduce extends IterableBenchCompanion { override def defaultSize = 50000 @@ -89,7 +81,7 @@ trait StandardParIterableBench[T, Coll <: ParIterable[T]] extends ParIterableBen } class Reduce(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[T, Coll] { + extends IterableBench { def comparisonMap = collection.Map() def runseq = this.seqcoll.reduceLeft(operators.reducer) def runpar = this.parcoll.reduce(operators.reducer) @@ -103,7 +95,7 @@ trait StandardParIterableBench[T, Coll <: ParIterable[T]] extends ParIterableBen } class ReduceMedium(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[T, Coll] { + extends IterableBench { def comparisonMap = collection.Map() def runseq = this.seqcoll.reduceLeft(operators.mediumreducer) def runpar = this.parcoll.reduce(operators.mediumreducer) @@ -117,7 +109,7 @@ trait StandardParIterableBench[T, Coll <: ParIterable[T]] extends ParIterableBen } class Map(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[T, Coll] { + extends IterableBench { def comparisonMap = collection.Map() def runseq = this.seqcoll.map(operators.mapper) def runpar = this.parcoll.map(operators.mapper) @@ -128,33 +120,108 @@ trait StandardParIterableBench[T, Coll <: ParIterable[T]] extends ParIterableBen -/** - * Benchmarks for sequence views. +/** Benchmarks for sequence views. */ -trait ParSeqViewBench[T, Coll <: ParSeqView[T, ParSeq[T], CollSeq], CollSeq] extends ParSeqBench[T, Coll] { +trait ParSeqViewBenches[T, Coll <: ParSeqView[T, ParSeq[T], CollSeq], CollSeq] extends ParSeqBenches[T, Coll] { +self => - object Reduce extends IterableBenchCompanion { + trait SeqViewBench extends SeqBench { + lazy val seqview: SeqView[T, Seq[T]] = createSeqView(size, parallelism) + + def createSeqView(sz: Int, p: Int) = self.createSeqView(sz, p) + } + + def createSeqView(sz: Int, p: Int): SeqView[T, Seq[T]] + + object Iteration extends SeqBenchCompanion { + override def defaultSize = 250000 + def benchName = "iter" + def apply(sz: Int, p: Int, w: String) = new Iteration(sz, p, w) + } + + class Iteration(val size: Int, val parallelism: Int, val runWhat: String) + extends SeqBench with SeqViewBench { + def comparisonMap = collection.Map("seqview" -> runseqview _) + def runseq = this.seqcoll.foreach(operators.eachFun) + def runpar = this.parcoll.foreach(operators.eachFun) + def runseqview = { + this.seqview.foreach(operators.eachFun) + } + def companion = Iteration + } + + object IterationS extends SeqBenchCompanion { + override def defaultSize = 250000 + def benchName = "iter-s" + def apply(sz: Int, p: Int, w: String) = new IterationS(sz, p, w) + } + + class IterationS(val size: Int, val parallelism: Int, val runWhat: String) + extends SeqBench with SeqViewBench { + def comparisonMap = collection.Map("seqview" -> runseqview _) + def runseq = this.seqcoll.slice(0, size / 2).foreach(operators.eachFun) + def runpar = this.parcoll.slice(0, size / 2).foreach(operators.eachFun) + def runseqview = this.seqview.slice(0, size / 2).foreach(operators.eachFun) + def companion = IterationS + } + + object IterationM extends SeqBenchCompanion { + override def defaultSize = 100000 + def benchName = "iter-m" + def apply(sz: Int, p: Int, w: 
String) = new IterationM(sz, p, w) + } + + class IterationM(val size: Int, val parallelism: Int, val runWhat: String) + extends SeqBench with SeqViewBench { + def comparisonMap = collection.Map("seqview" -> runseqview _) + def runseq = this.seqcoll.map(operators.mapper).foreach(operators.eachFun) + def runpar = this.parcoll.map(operators.mapper).foreach(operators.eachFun) + def runseqview = this.seqview.map(operators.mapper).foreach(operators.eachFun) + def companion = IterationM + } + + object IterationA extends SeqBenchCompanion { + override def defaultSize = 50000 + def benchName = "iter-a" + def apply(sz: Int, p: Int, w: String) = new IterationA(sz, p, w) + } + + class IterationA(val size: Int, val parallelism: Int, val runWhat: String) + extends SeqBench with SeqViewBench { + val appended = operators.sequence(size) + val sqappended = appended.toSeq + def comparisonMap = collection.Map("seqview" -> runseqview _) + def runseq = { + val withapp = this.seqcoll.++(sqappended) + withapp.foreach(operators.eachFun) + } + def runpar = this.parcoll.++(appended).foreach(operators.eachFun) + def runseqview = this.seqview.++(appended).foreach(operators.eachFun) + def companion = IterationA + } + + object Reduce extends SeqBenchCompanion { override def defaultSize = 50000 def benchName = "reduce"; def apply(sz: Int, p: Int, w: String) = new Reduce(sz, p, w) } class Reduce(val size: Int, val parallelism: Int, val runWhat: String) - extends SeqBench with ParSeqViewBench[T, Coll, CollSeq] { + extends SeqBench with SeqViewBench { def comparisonMap = collection.Map() def runseq = this.seqcoll.reduceLeft(operators.reducer) def runpar = this.parcoll.reduce(operators.reducer) def companion = Reduce } - object MediumReduce extends IterableBenchCompanion { + object MediumReduce extends SeqBenchCompanion { override def defaultSize = 50000 def benchName = "reduce-medium"; def apply(sz: Int, p: Int, w: String) = new MediumReduce(sz, p, w) } class MediumReduce(val size: Int, val parallelism: Int, val runWhat: String) - extends SeqBench with ParSeqViewBench[T, Coll, CollSeq] { + extends SeqBench with SeqViewBench { def comparisonMap = collection.Map() def runseq = this.seqcoll.reduceLeft(operators.mediumreducer) def runpar = this.parcoll.reduce(operators.mediumreducer) @@ -168,7 +235,7 @@ trait ParSeqViewBench[T, Coll <: ParSeqView[T, ParSeq[T], CollSeq], CollSeq] ext } class ModifyThenReduce(val size: Int, val parallelism: Int, val runWhat: String) - extends SeqBench with ParSeqViewBench[T, Coll, CollSeq] { + extends SeqBench with SeqViewBench { val toadd = createSequential(size, parallelism) def comparisonMap = collection.Map() def runseq = { @@ -189,7 +256,7 @@ trait ParSeqViewBench[T, Coll <: ParSeqView[T, ParSeq[T], CollSeq], CollSeq] ext } class ModifyThenForce(val size: Int, val parallelism: Int, val runWhat: String) - extends SeqBench with ParSeqViewBench[T, Coll, CollSeq] { + extends SeqBench with SeqViewBench { val toadd = createSequential(size, parallelism) def comparisonMap = collection.Map() def runseq = (seqcoll ++ toadd).drop(size).map(operators.mapper).++(toadd).take(size) diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala index c68bce93b2..11726b05bb 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala @@ 
-3,8 +3,7 @@ package scala.collection.parallel.benchmarks.hashtries -import scala.collection.parallel.benchmarks.generic.StandardParIterableBench -import scala.collection.parallel.benchmarks.generic.NotBenchmark +import scala.collection.parallel.benchmarks.generic.StandardParIterableBenches import scala.collection.parallel.benchmarks.generic.Dummy import scala.collection.parallel.benchmarks.generic.Operators import scala.collection.parallel.immutable.ParHashTrie @@ -13,7 +12,7 @@ import scala.collection.parallel.immutable.ParHashTrie -trait ParHashTrieBenches[K, V] extends StandardParIterableBench[(K, V), ParHashTrie[K, V]] { +trait ParHashTrieBenches[K, V] extends StandardParIterableBenches[(K, V), ParHashTrie[K, V]] { def nameOfCollection = "ParHashTrie" def comparisonMap = collection.Map() @@ -27,7 +26,7 @@ trait ParHashTrieBenches[K, V] extends StandardParIterableBench[(K, V), ParHashT } class Map2(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[(K, V), ParHashTrie[K, V]] { + extends IterableBench { var result: Int = 0 def comparisonMap = collection.Map("jhashtable" -> runjhashtable _, "hashtable" -> runhashtable _) def runseq = { @@ -78,7 +77,7 @@ trait ParHashTrieBenches[K, V] extends StandardParIterableBench[(K, V), ParHashT } class Reduce2(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[(K, V), ParHashTrie[K, V]] { + extends IterableBench { private var ht: collection.mutable.HashMap[K, V] = _ def comparisonMap = collection.Map("hashtable" -> runhashtable _) def runseq = this.seqcoll.reduceLeft(operators.reducer) @@ -99,7 +98,7 @@ trait ParHashTrieBenches[K, V] extends StandardParIterableBench[(K, V), ParHashT -object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] with NotBenchmark { +object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] { type DPair = (Dummy, Dummy) @@ -149,6 +148,9 @@ object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] with NotBe (a, p._2) } val taker = (p: DPair) => true + val eachFun: DPair => Unit = { dp => + dp._1.dummy + } } def createSequential(sz: Int, p: Int) = { diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala index 8a01d668fb..14a6259a38 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala @@ -6,13 +6,13 @@ package scala.collection.parallel.benchmarks.parallel_range import scala.collection.parallel.benchmarks.generic._ import scala.collection.parallel.immutable.ParRange +import scala.collection.parallel.benchmarks.generic.StandardParIterableBenches - -object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenchmark { +object RangeBenches extends StandardParIterableBenches[Int, ParRange] { def nameOfCollection = "ParRange" def operators = new IntOperators {} @@ -33,7 +33,7 @@ object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenc } class MapLight(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { def calc(n: Int) = n % 2 + 1 def comparisonMap = collection.Map() @@ -49,7 +49,7 @@ object RangeBenches extends StandardParIterableBench[Int, 
ParRange] with NotBenc } class MapMedium(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { def calc(n: Int) = { var i = 0 var sum = n @@ -73,7 +73,7 @@ object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenc } class ForeachModify(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { val array = new Array[Int](size) def modify(n: Int) = array(n) += 1 @@ -90,7 +90,7 @@ object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenc } class ForeachModifyMedium(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { val array = new Array[Int](size) def modify(n: Int) = array(n) = { var i = 0 @@ -115,7 +115,7 @@ object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenc } class ForeachModifyHeavy(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { val array = new Array[Int](size) def modify(n: Int) = array(n) = collatz(10000 + array(n)) @@ -133,7 +133,7 @@ object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenc } class ForeachAdd(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { val cmap = new java.util.concurrent.ConcurrentHashMap[Int, Int] val hmap = new java.util.HashMap[Int, Int] @@ -157,7 +157,7 @@ object RangeBenches extends StandardParIterableBench[Int, ParRange] with NotBenc } class ForeachAddCollatz(val size: Int, val parallelism: Int, val runWhat: String) - extends IterableBench with StandardParIterableBench[Int, ParRange] { + extends IterableBench { val cmap = new java.util.concurrent.ConcurrentHashMap[Int, Int] val hmap = new java.util.HashMap[Int, Int] diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala index e84b25971b..eed62fc5c1 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala @@ -16,7 +16,7 @@ import scala.collection.SeqView trait DummyViewBenches -extends ParSeqViewBench[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], Seq[Dummy]] { +extends ParSeqViewBenches[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], Seq[Dummy]] { def nameOfCollection = "ParView" def operators = DummyOperators def comparisonMap = collection.Map() @@ -34,10 +34,11 @@ extends ParSeqViewBench[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], Seq v.environment = forkJoinPool v } + def createSeqView(sz: Int, p: Int) = createSequential(sz, p).view } -object DummyViewBenchList extends DummyViewBenches with NotBenchmark +object DummyViewBenchList extends DummyViewBenches -- cgit v1.2.3
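
For readers skimming the patch, the sketch below illustrates how the reworked views introduced here are meant to be used. It is not part of the commit: the object name and sample data are made up, and it only relies on members that appear in this diff (ParArray.fromTraversables, the parallelIterator-backed view, lazy map/slice/++ and force), so it is a usage sketch against this revision of the library rather than a guaranteed-compiling example for any later release.

    // Hypothetical usage of the view machinery added in this patch.
    import scala.collection.parallel.mutable.ParArray

    object ParViewSketch {
      def main(args: Array[String]): Unit = {
        // fromTraversables (added in this patch) copies its arguments into one ParArray.
        val pa: ParArray[Int] = ParArray.fromTraversables(1 to 5, 6 to 10)

        // `view` now wraps the collection's parallelIterator, so map and slice stay
        // lazy and are carried out by the iterator transformers (Mapped, Taken).
        val v = pa.view.map(_ * 2).slice(2, 8)

        // Appending a plain sequential collection has no splitter to work with, so
        // newAppended falls back to forcing everything into a ParArray; appending
        // another parallel collection would keep both sides lazy via Appended.
        val appended = v ++ List(100, 200)

        // force evaluates the stacked transformations using the parallel iterators.
        println(appended.force)
      }
    }

The design choice this illustrates is the core of the commit: instead of building sequential Elements iterators for every wrapped view, each wrapper (Sliced, Mapped, Appended, Forced) now exposes a parallelIterator derived from the underlying one, so chained view operations keep their splitting behaviour and avoid the performance glitches mentioned in the commit message.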