diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-09-22 14:18:19 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-09-22 14:18:19 +0000 |
commit | 285d2182f1aeb113aba55be804eefa2f61ce2624 (patch) | |
tree | d6fab90800c9428e946d913b42ac662785d4e15b /src/library | |
parent | a5d47fb693d9b88ea9ed414762f16e027be64ada (diff) | |
download | scala-285d2182f1aeb113aba55be804eefa2f61ce2624.tar.gz scala-285d2182f1aeb113aba55be804eefa2f61ce2624.tar.bz2 scala-285d2182f1aeb113aba55be804eefa2f61ce2624.zip |
Reimplementing parallel views to solve several ...
Reimplementing parallel views to solve several performance glitches. No
review.
Diffstat (limited to 'src/library')
9 files changed, 233 insertions, 38 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 8fc232579a..e3fde884e2 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -117,10 +117,10 @@ import java.util.concurrent.atomic.AtomicBoolean * This method will provide sequential views it produces with `indexFlag` signalling capabilities. This means * that sequential views may set and read `indexFlag` state. */ -trait ParIterableLike[+T, +Repr <: Parallel, +SequentialView <: Iterable[T]] +trait ParIterableLike[+T, +Repr <: Parallel, +Sequential <: Iterable[T] with IterableLike[T, Sequential]] extends IterableLike[T, Repr] with Parallelizable[Repr] - with Sequentializable[T, SequentialView] + with Sequentializable[T, Sequential] with Parallel with HasNewCombiner[T, Repr] with TaskSupport { @@ -172,7 +172,7 @@ self => * * @return a parallel iterator */ - protected def parallelIterator: ParIterator + def parallelIterator: ParIterableIterator[T] /** Creates a new split iterator used to traverse the elements of this collection. * @@ -235,8 +235,8 @@ self => var result: R = null.asInstanceOf[R] } - /* convenience iterator operations wrapper */ - protected implicit def iterator2ops[PI <: ParIterator](it: PI) = new { + /* convenience signalling operations wrapper */ + protected implicit def delegatedSignalling2ops[PI <: DelegatedSignalling](it: PI) = new { def assign(cntx: Signalling): PI = { it.signalDelegate = cntx it @@ -604,6 +604,12 @@ self => executeAndWait(new CopyToArray(start, len, xs, parallelIterator)) } + override def view = new ParIterableView[T, Repr, Sequential] { + protected lazy val underlying = self.repr + def seq = self.seq.view + def parallelIterator = self.parallelIterator + } + override def toIterable: Iterable[T] = seq.drop(0).asInstanceOf[Iterable[T]] override def toArray[U >: T: ClassManifest]: Array[U] = { diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 18ae0ae097..1f7ea9b694 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -7,7 +7,7 @@ import scala.collection.Parallel import scala.collection.TraversableViewLike import scala.collection.IterableView import scala.collection.IterableViewLike - +import scala.collection.generic.CanBuildFrom @@ -43,7 +43,58 @@ extends IterableView[T, Coll] type CPI = SignalContextPassingIterator[ParIterator] - trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] + /* wrappers */ + + trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] { + override def parallelIterator: ParIterableIterator[S] + override def iterator = parallelIterator + environment = self.environment + } + + trait Sliced extends super.Sliced with Transformed[T] { + override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] + def parallelIterator: ParIterableIterator[T] = self.parallelIterator.slice(from, until) + def seq = self.seq.slice(from, until) + } + + trait Mapped[S] extends super.Mapped[S] with Transformed[S]{ + def parallelIterator: ParIterableIterator[S] = self.parallelIterator.map(mapping) + def seq = self.seq.map(mapping).asInstanceOf[IterableView[S, CollSeq]] + } + + // only use if other is a ParIterable, otherwise force + trait Appended[U >: T] extends super.Appended[U] with Transformed[U] { + def restAsParIterable: ParIterable[U] = rest.asParIterable + def parallelIterator: ParIterableIterator[U] = self.parallelIterator.appendIterable[U, ParIterableIterator[U]](restAsParIterable.parallelIterator) + def seq = self.seq.++(rest).asInstanceOf[IterableView[U, CollSeq]] + } + + trait Forced[S] extends super.Forced[S] with Transformed[S] { + def forcedPar: ParIterable[S] = forced.asParIterable + def parallelIterator: ParIterableIterator[S] = forcedPar.parallelIterator + // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, + // we use it to obtain a view of the correct type - not the most efficient thing + // in the universe, but without making `newForced` more accessible, or adding + // a `forced` method to `SeqView`, this is the best we can do + def seq = self.seq.take(0).++(forced).asInstanceOf[IterableView[S, CollSeq]] + } + + /* operation overrides */ + + override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This] + override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That] + override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That] + + /* wrapper virtual ctors */ + + protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u } + protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } + protected override def newForced[S](xs: => Seq[S]): Transformed[S] = new Forced[S] { val forced = xs } + protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = { + // we only append if `that` is a parallel iterable, i.e. it has a splitter + if (that.isParIterable) new Appended[U] { val rest = that } + else newForced(mutable.ParArray.fromTraversables(this, that)) + } } diff --git a/src/library/scala/collection/parallel/ParMapLike.scala b/src/library/scala/collection/parallel/ParMapLike.scala index 252ca2446f..e6944953b5 100644 --- a/src/library/scala/collection/parallel/ParMapLike.scala +++ b/src/library/scala/collection/parallel/ParMapLike.scala @@ -16,10 +16,10 @@ import scala.collection.mutable.Builder trait ParMapLike[K, +V, - +Repr <: ParMapLike[K, V, Repr, SequentialView] with ParMap[K, V], - +SequentialView <: Map[K, V]] + +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], + +Sequential <: Map[K, V] with MapLike[K, V, Sequential]] extends MapLike[K, V, Repr] - with ParIterableLike[(K, V), Repr, SequentialView] + with ParIterableLike[(K, V), Repr, Sequential] { self => protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 7e8b9d6129..9ea647fd9f 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -81,7 +81,7 @@ self => * * @return an iterator that can be split into subsets of precise size */ - protected def parallelIterator: ParIterator + def parallelIterator: ParSeqIterator[T] override def iterator: PreciseSplitter[T] = parallelIterator @@ -308,7 +308,7 @@ self => def length = self.length def apply(idx: Int) = self(idx) def seq = self.seq.view - def parallelIterator = new Elements(0, length) with SCPI {} + def parallelIterator = self.parallelIterator } override def view(from: Int, until: Int) = view.slice(from, until) @@ -350,7 +350,7 @@ self => pit.setIndexFlagIfLesser(from) } } - protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException + protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val pits = pit.split for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p) @@ -370,7 +370,7 @@ self => pit.setIndexFlagIfGreater(pos) } } - protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException + protected[this] def newSubtask(p: SuperParIterator) = unsupported override def split = { val pits = pit.split for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p) @@ -396,7 +396,7 @@ self => override def merge(that: ReverseMap[S, That]) = result = that.result combine result } - protected[this] class SameElements[U >: T](val pit: ParIterator, val otherpit: PreciseSplitter[U]) + protected[this] class SameElements[U >: T](protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[U]) extends Accessor[Boolean, SameElements[U]] { var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { @@ -424,7 +424,7 @@ self => override def merge(that: Updated[U, That]) = result = result combine that.result } - protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], val pit: ParIterator, val otherpit: PreciseSplitter[S]) + protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] { var result: Result = null def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr)) @@ -442,7 +442,7 @@ self => override def merge(that: Zip[U, S, That]) = result = result combine that.result } - protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParIterator, val otherpit: PreciseSplitter[S]) + protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: ParSeqIterator[T], val otherpit: PreciseSplitter[S]) extends Accessor[Boolean, Corresponds[S]] { var result: Boolean = true def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala index 31b90936d7..1aac72767a 100644 --- a/src/library/scala/collection/parallel/ParSeqViewLike.scala +++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala @@ -44,34 +44,38 @@ extends SeqView[T, Coll] trait Transformed[+S] extends ParSeqView[S, Coll, CollSeq] with super[ParIterableView].Transformed[S] with super[SeqView].Transformed[S] { - override def parallelIterator = new Elements(0, length) with SCPI {} + override def parallelIterator: ParSeqIterator[S] = new Elements(0, length) with SCPI {} override def iterator = parallelIterator - environment = self.environment } - trait Forced[S] extends super.Forced[S] with Transformed[S] { - // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, - // we use it to obtain a view of the correct type - not the most efficient thing - // in the universe, but without making `newForced` more accessible, or adding - // a `forced` method to `SeqView`, this is the best we can do - def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]] + trait Sliced extends super[SeqViewLike].Sliced with super[ParIterableViewLike].Sliced with Transformed[T] { + override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] + override def parallelIterator = self.parallelIterator.psplit(from, until - from)(1) } - trait Filtered extends super.Filtered with Transformed[T] { - def seq = self.seq filter pred + trait Mapped[S] extends super[SeqViewLike].Mapped[S] with super[ParIterableViewLike].Mapped[S] with Transformed[S] { + override def parallelIterator = self.parallelIterator.map(mapping) + override def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]] } - trait Sliced extends super.Sliced with Transformed[T] { - override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] - def seq = self.seq.slice(from, until) + trait Appended[U >: T] extends super[SeqViewLike].Appended[U] with super[ParIterableViewLike].Appended[U] with Transformed[U] { + def restAsParSeq: ParSeq[U] = rest.asInstanceOf[ParSeq[U]] + override def parallelIterator = self.parallelIterator.appendSeq[U, ParSeqIterator[U]](restAsParSeq.parallelIterator) + override def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]] } - trait Appended[U >: T] extends super.Appended[U] with Transformed[U] { - def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]] + trait Forced[S] extends super[SeqViewLike].Forced[S] with super[ParIterableViewLike].Forced[S] with Transformed[S] { + override def forcedPar: ParSeq[S] = forced.asParSeq + override def parallelIterator: ParSeqIterator[S] = forcedPar.parallelIterator + // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, + // we use it to obtain a view of the correct type - not the most efficient thing + // in the universe, but without making `newForced` more accessible, or adding + // a `forced` method to `SeqView`, this is the best we can do + override def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]] } - trait Mapped[S] extends super.Mapped[S] with Transformed[S]{ - def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]] + trait Filtered extends super.Filtered with Transformed[T] { + def seq = self.seq filter pred } trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] { @@ -108,7 +112,13 @@ extends SeqView[T, Coll] protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p } protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u } - protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that } + protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = { + // we only append if `that` is a parallel sequence, i.e. it has a precise splitter + if (that.isParSeq) new Appended[U] { val rest = that } + else newForced(mutable.ParArray.fromTraversables(this, that)) + } + protected override def newForced[S](xs: => Seq[S]): Transformed[S] = new Forced[S] { val forced = xs } + protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f } protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p } diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 8296d92e59..43acf3b41e 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -21,7 +21,8 @@ trait RemainsIterator[+T] extends Iterator[T] { /** Augments iterators with additional methods, mostly transformers, * assuming they iterate an iterable collection. * - * @param T type of the elements iterated. + * @param T type of the elements iterated. + * @param IterRepr iterator type. */ trait AugmentedIterableIterator[+T] extends RemainsIterator[T] { @@ -326,6 +327,8 @@ extends AugmentedIterableIterator[T] with Signalling with DelegatedSignalling { +self => + def split: Seq[ParIterableIterator[T]] /** The number of elements this iterator has yet to traverse. This method @@ -339,12 +342,66 @@ extends AugmentedIterableIterator[T] * * In that case, 2 considerations must be taken into account: * - * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`. + * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray` + * and which use tasks having the iterated subset length as a ctor argument. * * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum * of `remaining` values of split iterators must be less than or equal to this upper bound. */ def remaining: Int + + /* iterator transformers */ + + class Taken(taken: Int) extends ParIterableIterator[T] { + var signalDelegate = self.signalDelegate + var remaining = taken min self.remaining + def hasNext = remaining > 0 + def next = { remaining -= 1; self.next } + def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) } + protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = { + val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield + if (total < remaining) it else taker(it, total - remaining) + shortened filter { _.remaining > 0 } + } + } + + override def take(n: Int) = new Taken(n) + + override def slice(from1: Int, until1: Int) = { + val it = new Taken(until1) + var todrop = from1 + while (todrop > 0 && it.hasNext) it.next + it + } + + class Mapped[S](f: T => S) extends ParIterableIterator[S] { + var signalDelegate = self.signalDelegate + def hasNext = self.hasNext + def next = f(self.next) + def remaining = self.remaining + def split: Seq[ParIterableIterator[S]] = self.split.map { _ map f } + } + + override def map[S](f: T => S) = new Mapped(f) + + class Appended[U >: T, PI <: ParIterableIterator[U]](protected val that: PI) extends ParIterableIterator[U] { + var signalDelegate = self.signalDelegate + protected var curr: ParIterableIterator[U] = self + def hasNext = if (curr.hasNext) true else if (curr eq self) { + curr = that + curr.hasNext + } else false + def next = if (curr eq self) { + hasNext + curr.next + } else curr.next + def remaining = if (curr eq self) curr.remaining + that.remaining else curr.remaining + protected def firstNonEmpty = (curr eq self) && curr.hasNext + def split: Seq[ParIterableIterator[U]] = if (firstNonEmpty) Seq(curr, that) else curr.split + } + + def appendIterable[U >: T, PI <: ParIterableIterator[U]](that: PI) = new Appended[U, PI](that) + } @@ -353,6 +410,7 @@ extends ParIterableIterator[T] with AugmentedSeqIterator[T] with PreciseSplitter[T] { +self => def split: Seq[ParSeqIterator[T]] def psplit(sizes: Int*): Seq[ParSeqIterator[T]] @@ -364,6 +422,60 @@ extends ParIterableIterator[T] * @return an exact number of elements this iterator has yet to iterate */ def remaining: Int + + /* iterator transformers */ + + class Taken(tk: Int) extends super.Taken(tk) with ParSeqIterator[T] { + override def split: Seq[ParSeqIterator[T]] = super.split.asInstanceOf[Seq[ParSeqIterator[T]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[T]] = takeSeq(self.psplit(sizes: _*)) { (p, n) => p.take(n) } + } + + override def take(n: Int) = new Taken(n) + + override def slice(from1: Int, until1: Int) = { + val it = new Taken(until1) + var todrop = from1 + while (todrop > 0 && it.hasNext) it.next + it + } + + class Mapped[S](f: T => S) extends super.Mapped[S](f) with ParSeqIterator[S] { + override def split: Seq[ParSeqIterator[S]] = super.split.asInstanceOf[Seq[ParSeqIterator[S]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[S]] = self.psplit(sizes: _*).map { _ map f } + } + + override def map[S](f: T => S) = new Mapped(f) + + class Appended[U >: T, PI <: ParSeqIterator[U]](it: PI) extends super.Appended[U, PI](it) with ParSeqIterator[U] { + override def split: Seq[ParSeqIterator[U]] = super.split.asInstanceOf[Seq[ParSeqIterator[U]]] + def psplit(sizes: Int*): Seq[ParSeqIterator[U]] = if (firstNonEmpty) { + val selfrem = self.remaining + + // split sizes + var appendMiddle = false + val szcum = sizes.scanLeft(0)(_ + _) + val splitsizes = sizes.zip(szcum.init zip szcum.tail).flatMap { t => + val (sz, (from, until)) = t + if (from < selfrem && until > selfrem) { + appendMiddle = true + Seq(selfrem - from, until - selfrem) + } else Seq(sz) + } + val (selfszfrom, thatszfrom) = splitsizes.zip(szcum.init).span(_._2 < selfrem) + val (selfsizes, thatsizes) = (selfszfrom map { _._1 }, thatszfrom map { _._1 }); + + // split iterators + val selfs = self.psplit(selfsizes: _*) + val thats = that.psplit(thatsizes: _*) + + // appended last in self with first in rest if necessary + if (appendMiddle) selfs.init ++ Seq(selfs.last.appendSeq[U, ParSeqIterator[U]](thats.head)) ++ thats.tail + else selfs ++ thats + } else curr.asInstanceOf[ParSeqIterator[U]].psplit(sizes: _*) + } + + def appendSeq[U >: T, PI <: ParSeqIterator[U]](that: PI) = new Appended[U, PI](that) + } diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index 9b0049cb10..d2123a402f 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -617,6 +617,15 @@ object ParArray extends ParFactory[ParArray] { handoff(newarr) } + def fromTraversables[T](xss: TraversableOnce[T]*) = { + val cb = ParArrayCombiner[T]() + for (xs <- xss) { + val it = xs.toIterator + while (it.hasNext) cb += it.next + } + cb.result + } + } diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala index 4cef521cf8..cb0e589d9b 100644 --- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala @@ -89,7 +89,7 @@ object ParArrayCombiner { def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParArrayCombiner[T] = { new { val chain = c } with ParArrayCombiner[T] with EnvironmentPassingCombiner[T, ParArray[T]] } - def apply[T]: ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) + def apply[T](): ParArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 3d8e7208ae..76677a1148 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -5,6 +5,7 @@ import java.lang.Thread._ import scala.collection.generic.CanBuildFrom import scala.collection.generic.CanCombineFrom +import scala.collection.parallel.mutable.ParArray /** Package object for parallel collections. @@ -56,6 +57,12 @@ package object parallel { def ifParSeq[R](isbody: ParSeq[T] => R) = new { def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody } + def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else { + val it = t.toIterator + val cb = mutable.ParArrayCombiner[T]() + while (it.hasNext) cb += it.next + cb.result + } } } |