diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 15:06:17 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-06-18 15:06:17 +0000 |
commit | 9923b97157725ae1f7853a4834ef5e31283a1b98 (patch) | |
tree | 6252cf350a91d6bed178b07ed3ddc7fdd21d2890 | |
parent | ceec792d1af5bb7b2d618f27f6fd48cdf75cf92f (diff) | |
download | scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.gz scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.bz2 scala-9923b97157725ae1f7853a4834ef5e31283a1b98.zip |
Moved parallel collections to library dir, chan...
Moved parallel collections to library dir, changed sabbus script. Added
`par` to some of the classes. No review.
45 files changed, 4788 insertions, 28 deletions
@@ -304,6 +304,7 @@ LOCAL REFERENCE BUILD (LOCKER) <include name="**/*.scala"/> <compilationpath> <pathelement location="${build-locker.dir}/classes/library"/> + <pathelement location="${lib.dir}/forkjoin.jar"/> </compilationpath> </scalacfork> <propertyfile file="${build-locker.dir}/classes/library/library.properties"> @@ -444,7 +445,7 @@ QUICK BUILD (QUICK) <include name="library/**"/> <include name="dbc/**"/> <include name="actors/**"/> - <include name="parallel-collections/**"/> + <!--<include name="parallel-collections/**"/>--> <include name="continuations/**"/> <include name="swing/**"/> </srcfiles> @@ -480,6 +481,7 @@ QUICK BUILD (QUICK) <include name="**/*.scala"/> <compilationpath> <pathelement location="${build-quick.dir}/classes/library"/> + <pathelement location="${lib.dir}/forkjoin.jar"/> </compilationpath> </scalacfork> <scalacfork @@ -494,7 +496,7 @@ QUICK BUILD (QUICK) <pathelement location="${lib.dir}/forkjoin.jar"/> </compilationpath> </scalacfork> - <scalacfork + <!--<scalacfork destdir="${build-quick.dir}/classes/library" compilerpathref="locker.classpath" params="${scalac.args.quick}" @@ -505,7 +507,7 @@ QUICK BUILD (QUICK) <pathelement location="${build-quick.dir}/classes/library"/> <pathelement location="${lib.dir}/forkjoin.jar"/> </compilationpath> - </scalacfork> + </scalacfork>--> <scalacfork destdir="${build-quick.dir}/classes/library" compilerpathref="locker.classpath" @@ -1011,6 +1013,7 @@ BOOTSTRAPPING BUILD (STRAP) <include name="**/*.scala"/> <compilationpath> <pathelement location="${build-strap.dir}/classes/library"/> + <pathelement location="${forkjoin.jar}"/> </compilationpath> </scalacfork> <scalacfork @@ -1025,7 +1028,7 @@ BOOTSTRAPPING BUILD (STRAP) <pathelement location="${forkjoin.jar}"/> </compilationpath> </scalacfork> - <scalacfork + <!--<scalacfork destdir="${build-strap.dir}/classes/library" compilerpathref="pack.classpath" params="${scalac.args.all}" @@ -1036,7 +1039,7 @@ BOOTSTRAPPING BUILD (STRAP) <pathelement location="${build-strap.dir}/classes/library"/> <pathelement location="${forkjoin.jar}"/> </compilationpath> - </scalacfork> + </scalacfork>--> <scalacfork destdir="${build-strap.dir}/classes/library" compilerpathref="pack.classpath" @@ -1393,7 +1396,7 @@ DOCUMENTATION <include name="library/**"/> <include name="dbc/**"/> <include name="actors/**"/> - <include name="parallel-collections/**"/> + <!--<include name="parallel-collections/**"/>--> <include name="swing/**"/> </srcfiles> </uptodate> @@ -1411,7 +1414,7 @@ DOCUMENTATION classpathref="pack.classpath"> <src> <files includes="${src.dir}/actors"/> - <files includes="${src.dir}/parallel-collections"/> + <!--<files includes="${src.dir}/parallel-collections"/>--> <files includes="${src.dir}/library/scala"/> <files includes="${src.dir}/swing"/> <files includes="${src.dir}/continuations/library"/> @@ -1650,7 +1653,7 @@ DISTRIBUTION <jar destfile="${dist.dir}/src/scala-library-src.jar"> <fileset dir="${src.dir}/library"/> <fileset dir="${src.dir}/actors"/> - <fileset dir="${src.dir}/parallel-collections"/> + <!--<fileset dir="${src.dir}/parallel-collections"/>--> <fileset dir="${src.dir}/continuations/library"/> </jar> <jar destfile="${dist.dir}/src/scala-dbc-src.jar"> @@ -1739,7 +1742,7 @@ STABLE REFERENCE (STARR) <jar destfile="${basedir}/lib/scala-library-src.jar"> <fileset dir="${basedir}/src/library"/> <fileset dir="${basedir}/src/actors"/> - <fileset dir="${basedir}/src/parallel-collections"/> + <!--<fileset dir="${basedir}/src/parallel-collections"/>--> <fileset dir="${basedir}/src/swing"/> <fileset dir="${basedir}/src/dbc"/> </jar> diff --git a/src/library/scala/collection/Parallel.scala b/src/library/scala/collection/Parallel.scala new file mode 100644 index 0000000000..e500817745 --- /dev/null +++ b/src/library/scala/collection/Parallel.scala @@ -0,0 +1,17 @@ +package scala.collection + + + + + + +/** A marker trait for objects with parallelised operations. + * + * @since 2.8 + * @author prokopec + */ +trait Parallel + + + + diff --git a/src/library/scala/collection/Parallelizable.scala b/src/library/scala/collection/Parallelizable.scala new file mode 100644 index 0000000000..405c005c55 --- /dev/null +++ b/src/library/scala/collection/Parallelizable.scala @@ -0,0 +1,38 @@ +package scala.collection + + + +import parallel.ParallelIterableLike + + + +/** This trait describes collections which can be turned into parallel collections + * by invoking the method `par`. Parallelizable collections may be parametrized with + * a target type different than their own. + */ +trait Parallelizable[+ParRepr <: Parallel] { + + /** Returns a parallel implementation of a collection. + */ + def par: ParRepr + +} + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/Sequentializable.scala b/src/library/scala/collection/Sequentializable.scala new file mode 100644 index 0000000000..61fb24571a --- /dev/null +++ b/src/library/scala/collection/Sequentializable.scala @@ -0,0 +1,15 @@ +package scala.collection + + + + +trait Sequentializable[+T, +Repr] { + + /** A view of this parallel collection, but with all + * of the operations implemented sequentially (i.e. in a single-threaded manner). + * + * @return a sequential view of the collection. + */ + def seq: Repr + +}
\ No newline at end of file diff --git a/src/library/scala/collection/generic/CanBuildFromParallel.scala b/src/library/scala/collection/generic/CanBuildFromParallel.scala new file mode 100644 index 0000000000..fcbcd6295e --- /dev/null +++ b/src/library/scala/collection/generic/CanBuildFromParallel.scala @@ -0,0 +1,28 @@ +package scala.collection +package generic + + + +import scala.collection.parallel._ + + + + +/** + * A base trait for parallel builder factories. + * + * @tparam From the type of the underlying collection that requests a builder to be created + * @tparam Elem the element type of the collection to be created + * @tparam To the type of the collection to be created + */ +trait CanCombineFrom[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel { + def apply(from: From): Combiner[Elem, To] + def apply(): Combiner[Elem, To] +} + + + + + + + diff --git a/src/library/scala/collection/generic/GenericParallelCompanion.scala b/src/library/scala/collection/generic/GenericParallelCompanion.scala new file mode 100644 index 0000000000..e5ba36f846 --- /dev/null +++ b/src/library/scala/collection/generic/GenericParallelCompanion.scala @@ -0,0 +1,29 @@ +package scala.collection.generic + + +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParallelIterable +import scala.collection.parallel.ParallelMap + + + +/** A template class for companion objects of parallel collection classes. + * They should be mixed in together with `GenericCompanion` type. + * @tparam CC the type constructor representing the collection class + * @since 2.8 + */ +trait GenericParallelCompanion[+CC[X] <: ParallelIterable[X]] { + /** The default builder for $Coll objects. + */ + def newBuilder[A]: Combiner[A, CC[A]] + + /** The parallel builder for $Coll objects. + */ + def newCombiner[A]: Combiner[A, CC[A]] +} + +trait GenericParallelMapCompanion[+CC[P, Q] <: ParallelMap[P, Q]] { + def newCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] +} + + diff --git a/src/library/scala/collection/generic/GenericParallelTemplate.scala b/src/library/scala/collection/generic/GenericParallelTemplate.scala new file mode 100644 index 0000000000..e98c13fa36 --- /dev/null +++ b/src/library/scala/collection/generic/GenericParallelTemplate.scala @@ -0,0 +1,66 @@ +package scala.collection.generic + + + +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParallelIterable +import scala.collection.parallel.ParallelMap +import scala.collection.parallel.TaskSupport + + +import annotation.unchecked.uncheckedVariance + + + + + + +/** A template trait for collections having a companion. + * + * @tparam A the element type of the collection + * @tparam CC the type constructor representing the collection class + * @since 2.8 + * @author prokopec + */ +trait GenericParallelTemplate[+A, +CC[X] <: ParallelIterable[X]] +extends GenericTraversableTemplate[A, CC] + with HasNewCombiner[A, CC[A] @uncheckedVariance] + with TaskSupport +{ + def companion: GenericCompanion[CC] with GenericParallelCompanion[CC] + + protected[this] override def newBuilder: collection.mutable.Builder[A, CC[A]] = newCombiner + + protected[this] override def newCombiner: Combiner[A, CC[A]] = { + val cb = companion.newCombiner[A] + cb.environment = environment + cb + } + + override def genericBuilder[B]: Combiner[B, CC[B]] = genericCombiner[B] + + def genericCombiner[B]: Combiner[B, CC[B]] = { + val cb = companion.newCombiner[B] + cb.environment = environment + cb + } + +} + + +trait GenericParallelMapTemplate[K, +V, +CC[X, Y] <: ParallelMap[X, Y]] +extends TaskSupport +{ + def mapCompanion: GenericParallelMapCompanion[CC] + + def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = { + val cb = mapCompanion.newCombiner[P, Q] + cb.environment = environment + cb + } +} + + + + + diff --git a/src/library/scala/collection/generic/HasNewCombiner.scala b/src/library/scala/collection/generic/HasNewCombiner.scala new file mode 100644 index 0000000000..2c24b437d8 --- /dev/null +++ b/src/library/scala/collection/generic/HasNewCombiner.scala @@ -0,0 +1,26 @@ +package scala.collection.generic + + + +import scala.collection.parallel.Combiner + + + +trait HasNewCombiner[+T, +Repr] { + protected[this] def newCombiner: Combiner[T, Repr] +} + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/generic/ParallelFactory.scala b/src/library/scala/collection/generic/ParallelFactory.scala new file mode 100644 index 0000000000..0b9e92aa10 --- /dev/null +++ b/src/library/scala/collection/generic/ParallelFactory.scala @@ -0,0 +1,43 @@ +package scala.collection.generic + + +import scala.collection.parallel.ParallelIterable +import scala.collection.parallel.Combiner + + + +/** A template class for companion objects of `ParallelIterable` and subclasses thereof. + * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects. + * + * @define $coll parallel collection + * @define $Coll ParallelIterable + */ +abstract class ParallelFactory[CC[X] <: ParallelIterable[X] with GenericParallelTemplate[X, CC]] +extends TraversableFactory[CC] + with GenericParallelCompanion[CC] { + + type EPC[T, C] = collection.parallel.EnvironmentPassingCombiner[T, C] + + /** + * A generic implementation of the `CanBuildFromParallel` trait, which forwards all calls to + * `apply(from)` to the `genericParallelBuilder` method of the $coll `from`, and calls to `apply()` + * to this factory. + */ + class GenericCanCombineFrom[A] extends GenericCanBuildFrom[A] with CanCombineFrom[CC[_], A, CC[A]] { + override def apply(from: Coll) = from.genericCombiner + override def apply() = newBuilder[A] + } +} + + + + + + + + + + + + + diff --git a/src/library/scala/collection/generic/ParallelMapFactory.scala b/src/library/scala/collection/generic/ParallelMapFactory.scala new file mode 100644 index 0000000000..8f779b4029 --- /dev/null +++ b/src/library/scala/collection/generic/ParallelMapFactory.scala @@ -0,0 +1,42 @@ +package scala.collection.generic + + + +import scala.collection.parallel.ParallelMap +import scala.collection.parallel.ParallelMapLike +import scala.collection.parallel.Combiner +import scala.collection.mutable.Builder + + + + +/** A template class for companion objects of `ParallelMap` and subclasses thereof. + * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects. + * + * @define $coll parallel map + * @define $Coll ParallelMap + */ +abstract class ParallelMapFactory[CC[X, Y] <: ParallelMap[X, Y] with ParallelMapLike[X, Y, CC[X, Y], _]] +extends MapFactory[CC] + with GenericParallelMapCompanion[CC] { + + type MapColl = CC[_, _] + + /** The default builder for $Coll objects. + * @tparam K the type of the keys + * @tparam V the type of the associated values + */ + override def newBuilder[K, V]: Builder[(K, V), CC[K, V]] = newCombiner[K, V] + + /** The default combiner for $Coll objects. + * @tparam K the type of the keys + * @tparam V the type of the associated values + */ + def newCombiner[K, V]: Combiner[(K, V), CC[K, V]] + + class CanCombineFromMap[K, V] extends CanCombineFrom[CC[_, _], (K, V), CC[K, V]] { + def apply(from: MapColl) = from.genericMapCombiner[K, V].asInstanceOf[Combiner[(K, V), CC[K, V]]] + def apply() = newCombiner[K, V] + } + +} diff --git a/src/library/scala/collection/generic/Signalling.scala b/src/library/scala/collection/generic/Signalling.scala new file mode 100644 index 0000000000..1dac4297b7 --- /dev/null +++ b/src/library/scala/collection/generic/Signalling.scala @@ -0,0 +1,192 @@ +package scala.collection.generic + + +import java.util.concurrent.atomic.AtomicInteger + + + + + +/** + * A message interface serves as a unique interface to the + * part of the collection capable of receiving messages from + * a different task. + * + * One example of use of this is the `find` method, which can use the + * signalling interface to inform worker threads that an element has + * been found and no further search is necessary. + * + * @author prokopec + * + * @define abortflag + * Abort flag being true means that a worker can abort and produce whatever result, + * since its result will not affect the final result of computation. An example + * of operations using this are `find`, `forall` and `exists` methods. + * + * @define indexflag + * The index flag holds an integer which carries some operation-specific meaning. For + * instance, `takeWhile` operation sets the index flag to the position of the element + * where the predicate fails. Other workers may check this index against the indices + * they are working on and return if this index is smaller than their index. Examples + * of operations using this are `takeWhile`, `dropWhile`, `span` and `indexOf`. + */ +trait Signalling { + /** + * Checks whether an abort signal has been issued. + * + * $abortflag + * @return the state of the abort + */ + def isAborted: Boolean + + /** + * Sends an abort signal to other workers. + * + * $abortflag + */ + def abort: Unit + + /** + * Returns the value of the index flag. + * + * $indexflag + * @return the value of the index flag + */ + def indexFlag: Int + + /** + * Sets the value of the index flag. + * + * $indexflag + * @param f the value to which the index flag is set. + */ + def setIndexFlag(f: Int) + + /** + * Sets the value of the index flag if argument is greater than current value. + * This method does this atomically. + * + * $indexflag + * @param f the value to which the index flag is set + */ + def setIndexFlagIfGreater(f: Int) + + /** + * Sets the value of the index flag if argument is lesser than current value. + * This method does this atomically. + * + * $indexflag + * @param f the value to which the index flag is set + */ + def setIndexFlagIfLesser(f: Int) + + /** + * A read only tag specific to the signalling object. It is used to give + * specific workers information on the part of the collection being operated on. + */ + def tag: Int +} + + +/** + * This signalling implementation returns default values and ignores received signals. + */ +class DefaultSignalling extends Signalling { + def isAborted = false + def abort {} + + def indexFlag = -1 + def setIndexFlag(f: Int) {} + def setIndexFlagIfGreater(f: Int) {} + def setIndexFlagIfLesser(f: Int) {} + + def tag = -1 +} + + +/** + * An object that returns default values and ignores received signals. + */ +object IdleSignalling extends DefaultSignalling + + +/** + * A mixin trait that implements abort flag behaviour using volatile variables. + */ +trait VolatileAbort extends Signalling { + @volatile private var abortflag = false + abstract override def isAborted = abortflag + abstract override def abort = abortflag = true +} + + +/** + * A mixin trait that implements index flag behaviour using atomic integers. + * The `setIndex` operation is wait-free, while conditional set operations `setIndexIfGreater` + * and `setIndexIfLesser` are lock-free and support only monotonic changes. + */ +trait AtomicIndexFlag extends Signalling { + private val intflag: AtomicInteger = new AtomicInteger(-1) + abstract override def indexFlag = intflag.get + abstract override def setIndexFlag(f: Int) = intflag.set(f) + abstract override def setIndexFlagIfGreater(f: Int) = { + var loop = true + do { + val old = intflag.get + if (f <= old) loop = false + else if (intflag.compareAndSet(old, f)) loop = false + } while (loop); + } + abstract override def setIndexFlagIfLesser(f: Int) = { + var loop = true + do { + val old = intflag.get + if (f >= old) loop = false + else if (intflag.compareAndSet(old, f)) loop = false + } while (loop); + } +} + + +/** + * An implementation of the signalling interface using delegates. + */ +trait DelegatedSignalling extends Signalling { + /** + * A delegate that method calls are redirected to. + */ + var signalDelegate: Signalling + + def isAborted = signalDelegate.isAborted + def abort = signalDelegate.abort + + def indexFlag = signalDelegate.indexFlag + def setIndexFlag(f: Int) = signalDelegate.setIndexFlag(f) + def setIndexFlagIfGreater(f: Int) = signalDelegate.setIndexFlagIfGreater(f) + def setIndexFlagIfLesser(f: Int) = signalDelegate.setIndexFlagIfLesser(f) + + def tag = signalDelegate.tag +} + + +/** + * Class implementing delegated signalling. + */ +class DelegatedContext(var signalDelegate: Signalling) extends DelegatedSignalling + + +/** + * Class implementing delegated signalling, but having its own distinct `tag`. + */ +class TaggedDelegatedContext(deleg: Signalling, override val tag: Int) extends DelegatedContext(deleg) + + + + + + + + + + + diff --git a/src/library/scala/collection/generic/Sizing.scala b/src/library/scala/collection/generic/Sizing.scala new file mode 100644 index 0000000000..bf801302ae --- /dev/null +++ b/src/library/scala/collection/generic/Sizing.scala @@ -0,0 +1,9 @@ +package scala.collection.generic + + + +/** A trait for objects which have a size. + */ +trait Sizing { + def size: Int +}
\ No newline at end of file diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala index 22760b88c2..8b4bc070ab 100644 --- a/src/library/scala/collection/immutable/HashMap.scala +++ b/src/library/scala/collection/immutable/HashMap.scala @@ -14,6 +14,10 @@ package immutable import generic._ import annotation.unchecked.uncheckedVariance + +import parallel.immutable.ParallelHashTrie + + /** This class implements immutable maps using a hash trie. * * '''Note:''' the builder of a hash map returns specialized representations EmptyMap,Map1,..., Map4 @@ -32,7 +36,7 @@ import annotation.unchecked.uncheckedVariance * @define willNotTerminateInf */ @serializable @SerialVersionUID(2L) -class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] { +class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParallelHashTrie[A, B]] { override def size: Int = 0 @@ -80,9 +84,11 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] { def split: Seq[HashMap[A, B]] = Seq(this) - def combine[B1 >: B](that: HashMap[A, B1]): HashMap[A, B1] = combine0(that, 0) + def merge[B1 >: B](that: HashMap[A, B1]): HashMap[A, B1] = merge0(that, 0) + + protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that - protected def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that + def par = ParallelHashTrie.fromTrie(this) } @@ -170,7 +176,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { override def iterator: Iterator[(A,B)] = Iterator(ensurePair) override def foreach[U](f: ((A, B)) => U): Unit = f(ensurePair) private[HashMap] def ensurePair: (A,B) = if (kv ne null) kv else { kv = (key, value); kv } - protected override def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = { + protected override def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = { // if (that.isInstanceOf[HashMap1[_, _]]) bothsingle += 1 // else onetrie += 1 that.updated0(key, hash, level, value, kv) @@ -209,7 +215,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { def newhm(lm: ListMap[A, B @uncheckedVariance]) = new HashMapCollision1(hash, lm) List(newhm(x), newhm(y)) } - protected override def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = { + protected override def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = { // this can be made more efficient by passing the entire ListMap at once var m = that for (p <- kvs) m = m.updated0(p._1, this.hash, level, p._2, p) @@ -453,7 +459,7 @@ time { mNew.iterator.foreach( p => ()) } } else elems(0).split } - protected override def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that match { + protected override def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that match { case hm: HashMap1[_, _] => // onetrie += 1 this.updated0(hm.key, hm.hash, level, hm.value.asInstanceOf[B1], hm.kv) @@ -469,7 +475,7 @@ time { mNew.iterator.foreach( p => ()) } val subcount = Integer.bitCount(thisbm | thatbm) // construct a new array of appropriate size - val combined = new Array[HashMap[A, B1]](subcount) + val merged = new Array[HashMap[A, B1]](subcount) // run through both bitmaps and add elements to it var i = 0 @@ -486,9 +492,9 @@ time { mNew.iterator.foreach( p => ()) } // } if (thislsb == thatlsb) { // println("a collision") - val m = thiselems(thisi).combine0(thatelems(thati), level + 5) + val m = thiselems(thisi).merge0(thatelems(thati), level + 5) totalelems += m.size - combined(i) = m + merged(i) = m thisbm = thisbm & ~thislsb thatbm = thatbm & ~thatlsb thati += 1 @@ -507,14 +513,14 @@ time { mNew.iterator.foreach( p => ()) } // println("an element from this trie") val m = thiselems(thisi) totalelems += m.size - combined(i) = m + merged(i) = m thisbm = thisbm & ~thislsb thisi += 1 } else { // println("an element from that trie") val m = thatelems(thati) totalelems += m.size - combined(i) = m + merged(i) = m thatbm = thatbm & ~thatlsb thati += 1 } @@ -522,8 +528,8 @@ time { mNew.iterator.foreach( p => ()) } i += 1 } - new HashTrieMap[A, B1](this.bitmap | that.bitmap, combined, totalelems) - case hm: HashMapCollision1[_, _] => that.combine0(this, level) + new HashTrieMap[A, B1](this.bitmap | that.bitmap, merged, totalelems) + case hm: HashMapCollision1[_, _] => that.merge0(this, level) case _ => error("section supposed to be unreachable.") } diff --git a/src/library/scala/collection/immutable/package.scala b/src/library/scala/collection/immutable/package.scala new file mode 100644 index 0000000000..5ff9fa223d --- /dev/null +++ b/src/library/scala/collection/immutable/package.scala @@ -0,0 +1,81 @@ +package scala.collection + + + + + + + + + +package object immutable { + + trait RangeUtils[+Repr <: RangeUtils[Repr]] { + + def start: Int + def end: Int + def step: Int + def inclusive: Boolean + def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean): Repr + + private final def inclusiveLast: Int = { + val size = end.toLong - start.toLong + (size / step.toLong * step.toLong + start.toLong).toInt + } + + final def _last: Int = if (!inclusive) { + if (step == 1 || step == -1) end - step + else { + val inclast = inclusiveLast + if ((end.toLong - start.toLong) % step == 0) inclast - step else inclast + } + } else { + if (step == 1 || step == -1) end + else inclusiveLast + } + + final def _foreach[U](f: Int => U) = if (_length > 0) { + var i = start + val last = _last + while (i != last) { + f(i) + i += step + } + } + + final def _length: Int = if (!inclusive) { + if (end > start == step > 0 && start != end) { + (_last.toLong - start.toLong) / step.toLong + 1 + } else 0 + }.toInt else { + if (end > start == step > 0 || start == end) { + (_last.toLong - start.toLong) / step.toLong + 1 + } else 0 + }.toInt + + final def _apply(idx: Int): Int = { + if (idx < 0 || idx >= _length) throw new IndexOutOfBoundsException(idx.toString) + start + idx * step + } + + private def locationAfterN(n: Int) = if (n > 0) { + if (step > 0) ((start.toLong + step.toLong * n.toLong) min _last.toLong).toInt + else ((start.toLong + step.toLong * n.toLong) max _last.toLong).toInt + } else start + + final def _take(n: Int) = if (n > 0 && _length > 0) { + create(start, locationAfterN(n), step, true) + } else create(start, start, step, false) + + final def _drop(n: Int) = create(locationAfterN(n), end, step, inclusive) + + final def _slice(from: Int, until: Int) = _drop(from)._take(until - from) + + } + +} + + + + + diff --git a/src/library/scala/collection/mutable/ArrayBuffer.scala b/src/library/scala/collection/mutable/ArrayBuffer.scala index 6412a21531..a59a0db2e1 100644 --- a/src/library/scala/collection/mutable/ArrayBuffer.scala +++ b/src/library/scala/collection/mutable/ArrayBuffer.scala @@ -12,6 +12,7 @@ package scala.collection package mutable import generic._ +import parallel.mutable.ParallelArray /** An implementation of the `Buffer` class using an array to * represent the assembled sequence internally. Append, update and random @@ -46,7 +47,8 @@ class ArrayBuffer[A](override protected val initialSize: Int) with BufferLike[A, ArrayBuffer[A]] with IndexedSeqOptimized[A, ArrayBuffer[A]] with Builder[A, ArrayBuffer[A]] - with ResizableArray[A] { + with ResizableArray[A] + with Parallelizable[ParallelArray[A]] { override def companion: GenericCompanion[ArrayBuffer] = ArrayBuffer @@ -64,6 +66,8 @@ class ArrayBuffer[A](override protected val initialSize: Int) } } + def par = ParallelArray.handoff[A](array.asInstanceOf[Array[A]], size) + /** Appends a single element to this buffer and returns * the identity of the buffer. It takes constant amortized time. * diff --git a/src/library/scala/collection/mutable/ArrayOps.scala b/src/library/scala/collection/mutable/ArrayOps.scala index 00e8697b53..3cf6a642d2 100644 --- a/src/library/scala/collection/mutable/ArrayOps.scala +++ b/src/library/scala/collection/mutable/ArrayOps.scala @@ -14,6 +14,9 @@ import compat.Platform.arraycopy import scala.reflect.ClassManifest +import parallel.mutable.ParallelArray + + /** This class serves as a wrapper for `Array`s with all the operations found in * indexed sequences. Where needed, instances of arrays are implicitly converted * into this class. @@ -32,7 +35,7 @@ import scala.reflect.ClassManifest * @define mayNotTerminateInf * @define willNotTerminateInf */ -abstract class ArrayOps[T] extends ArrayLike[T, Array[T]] { +abstract class ArrayOps[T] extends ArrayLike[T, Array[T]] with Parallelizable[ParallelArray[T]] { private def rowBuilder[U]: Builder[U, Array[U]] = Array.newBuilder( @@ -52,6 +55,8 @@ abstract class ArrayOps[T] extends ArrayLike[T, Array[T]] { else super.toArray[U] + def par = ParallelArray.handoff(repr) + /** Flattens a two-dimensional array by concatenating all its rows * into a single array. * diff --git a/src/library/scala/collection/parallel/Combiners.scala b/src/library/scala/collection/parallel/Combiners.scala new file mode 100644 index 0000000000..a37f642d42 --- /dev/null +++ b/src/library/scala/collection/parallel/Combiners.scala @@ -0,0 +1,66 @@ +package scala.collection.parallel + + +import scala.collection.Parallel +import scala.collection.mutable.Builder +import scala.collection.generic.Sizing + + + +/** The base trait for all combiners. + * A combiner lets one construct collections incrementally just like + * a regular builder, but also implements an efficient merge operation of two builders + * via `combine` method. Once the collection is constructed, it may be obtained by invoking + * the `result` method. + * + * @tparam Elem the type of the elements added to the builder + * @tparam To the type of the collection the builder produces + * + * @author prokopec + */ +trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport { + self: EnvironmentPassingCombiner[Elem, To] => + + type EPC = EnvironmentPassingCombiner[Elem, To] + + /** Combines the contents of the receiver builder and the `other` builder, + * producing a new builder containing both their elements. + * + * This method may combine the two builders by copying them into a larger collection, + * by producing a lazy view that gets evaluated once `result` is invoked, or use + * a merge operation specific to the data structure in question. + * + * Note that both the receiver builder and `other` builder become invalidated + * after the invocation of this method, and should be cleared (see `clear`) + * if they are to be used again. + * + * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is, + * they are the same objects in memories, always does nothing and returns the first combiner. + * + * @tparam N the type of elements contained by the `other` builder + * @tparam NewTo the type of collection produced by the `other` builder + * @param other the other builder + * @return the parallel builder containing both the elements of this and the `other` builder + */ + def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] + +} + + +trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] { + abstract override def result = { + val res = super.result +// res.environment = environment + res + } +} + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/Iterators.scala b/src/library/scala/collection/parallel/Iterators.scala new file mode 100644 index 0000000000..bfebff994c --- /dev/null +++ b/src/library/scala/collection/parallel/Iterators.scala @@ -0,0 +1,443 @@ +package scala.collection.parallel + + + +import scala.collection.Parallel +import scala.collection.generic.Signalling +import scala.collection.generic.DelegatedSignalling +import scala.collection.generic.CanCombineFrom +import scala.collection.mutable.Builder +import scala.collection.Iterator.empty + + + + + + +trait RemainsIterator[+T] extends Iterator[T] { + /** The number of elements this iterator has yet to iterate. + * This method doesn't change the state of the iterator. + */ + def remaining: Int +} + + +/** Augments iterators with additional methods, mostly transformers, + * assuming they iterate an iterable collection. + * + * @param T type of the elements iterated. + * @param Repr type of the collection iterator iterates. + */ +trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T] { + + def repr: Repr + + /* accessors */ + + override def count(p: T => Boolean): Int = { + var i = 0 + while (hasNext) if (p(next)) i += 1 + i + } + + def reduce[U >: T](op: (U, U) => U): U = { + var r: U = next + while (hasNext) r = op(r, next) + r + } + + def fold[U >: T](z: U)(op: (U, U) => U): U = { + var r = z + while (hasNext) r = op(r, next) + r + } + + override def sum[U >: T](implicit num: Numeric[U]): U = { + var r: U = num.zero + while (hasNext) r = num.plus(r, next) + r + } + + override def product[U >: T](implicit num: Numeric[U]): U = { + var r: U = num.one + while (hasNext) r = num.times(r, next) + r + } + + override def min[U >: T](implicit ord: Ordering[U]): T = { + var r = next + while (hasNext) { + val curr = next + if (ord.lteq(curr, r)) r = curr + } + r + } + + override def max[U >: T](implicit ord: Ordering[U]): T = { + var r = next + while (hasNext) { + val curr = next + if (ord.gteq(curr, r)) r = curr + } + r + } + + override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) { + var i = from + val until = from + len + while (i < until && hasNext) { + array(i) = next + i += 1 + } + } + + /* transformers to combiners */ + + def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = pbf(repr) + cb.sizeHint(remaining) + while (hasNext) cb += f(next) + cb + } + + def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { + val cb = pbf(repr) + while (hasNext) { + val curr = next + if (pf.isDefinedAt(curr)) cb += pf(curr) + } + cb + } + + def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { + val cb = pbf(repr) + while (hasNext) { + val traversable = f(next) + if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator + else cb ++= traversable + } + cb + } + + def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = { + b.sizeHint(remaining) + while (hasNext) b += next + b + } + + def filter2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = { + while (hasNext) { + val curr = next + if (pred(curr)) cb += curr + } + cb + } + + def filterNot2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = { + while (hasNext) { + val curr = next + if (!pred(curr)) cb += curr + } + cb + } + + def partition2combiners[U >: T, This >: Repr](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = { + while (hasNext) { + val curr = next + if (pred(curr)) btrue += curr + else bfalse += curr + } + (btrue, bfalse) + } + + def take2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { + cb.sizeHint(n) + var left = n + while (left > 0) { + cb += next + left -= 1 + } + cb + } + + def drop2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { + drop(n) + cb.sizeHint(remaining) + while (hasNext) cb += next + cb + } + + def slice2combiner[U >: T, This >: Repr](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = { + drop(from) + var left = until - from + cb.sizeHint(left) + while (left > 0) { + cb += next + left -= 1 + } + cb + } + + def splitAt2combiners[U >: T, This >: Repr](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = { + before.sizeHint(at) + after.sizeHint(remaining - at) + var left = at + while (left > 0) { + before += next + left -= 1 + } + while (hasNext) after += next + (before, after) + } + + def takeWhile2combiner[U >: T, This >: Repr](p: T => Boolean, cb: Combiner[U, This]) = { + var loop = true + while (hasNext && loop) { + val curr = next + if (p(curr)) cb += curr + else loop = false + } + (cb, loop) + } + + def span2combiners[U >: T, This >: Repr](p: T => Boolean, before: Combiner[U, This], after: Combiner[U, This]) = { + var isBefore = true + while (hasNext && isBefore) { + val curr = next + if (p(curr)) before += curr + else { + after.sizeHint(remaining + 1) + after += curr + isBefore = false + } + } + while (hasNext) after += next + (before, after) + } +} + + +trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableIterator[T, Repr] { + + /** The exact number of elements this iterator has yet to iterate. + * This method doesn't change the state of the iterator. + */ + def remaining: Int + + /* accessors */ + + def prefixLength(pred: T => Boolean): Int = { + var total = 0 + var loop = true + while (hasNext && loop) { + if (pred(next)) total += 1 + else loop = false + } + total + } + + override def indexWhere(pred: T => Boolean): Int = { + var i = 0 + var loop = true + while (hasNext && loop) { + if (pred(next)) loop = false + else i += 1 + } + if (loop) -1 else i + } + + def lastIndexWhere(pred: T => Boolean): Int = { + var pos = -1 + var i = 0 + while (hasNext) { + if (pred(next)) pos = i + i += 1 + } + pos + } + + def corresponds[S](corr: (T, S) => Boolean)(that: Iterator[S]): Boolean = { + while (hasNext && that.hasNext) { + if (!corr(next, that.next)) return false + } + hasNext == that.hasNext + } + + /* transformers */ + + def reverse2combiner[U >: T, This >: Repr](cb: Combiner[U, This]): Combiner[U, This] = { + cb.sizeHint(remaining) + var lst = List[T]() + while (hasNext) lst ::= next + while (lst != Nil) { + cb += lst.head + lst = lst.tail + } + cb + } + + def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { + val cb = cbf(repr) + cb.sizeHint(remaining) + var lst = List[S]() + while (hasNext) lst ::= f(next) + while (lst != Nil) { + cb += lst.head + lst = lst.tail + } + cb + } + + def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = { + val cb = cbf(repr) + cb.sizeHint(remaining) + var j = 0 + while (hasNext) { + if (j == index) { + cb += elem + next + } else cb += next + j += 1 + } + cb + } + +} + + + +trait ParallelIterableIterator[+T, +Repr <: Parallel] +extends AugmentedIterableIterator[T, Repr] + with Splitter[T] + with Signalling + with DelegatedSignalling +{ + def split: Seq[ParallelIterableIterator[T, Repr]] + + /** The number of elements this iterator has yet to traverse. This method + * doesn't change the state of the iterator. + * + * This method is used to provide size hints to builders and combiners, and + * to approximate positions of iterators within a data structure. + * + * '''Note''': This method may be implemented to return an upper bound on the number of elements + * in the iterator, instead of the exact number of elements to iterate. + * + * In that case, 2 considerations must be taken into account: + * + * 1) classes that inherit `ParallelIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`. + * + * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum + * of `remaining` values of split iterators must be less than or equal to this upper bound. + */ + def remaining: Int +} + + +trait ParallelSeqIterator[+T, +Repr <: Parallel] +extends ParallelIterableIterator[T, Repr] + with AugmentedSeqIterator[T, Repr] + with PreciseSplitter[T] +{ + def split: Seq[ParallelSeqIterator[T, Repr]] + def psplit(sizes: Int*): Seq[ParallelSeqIterator[T, Repr]] + + /** The number of elements this iterator has yet to traverse. This method + * doesn't change the state of the iterator. Unlike the version of this method in the supertrait, + * method `remaining` in `ParallelSeqLike.this.ParallelIterator` must return an exact number + * of elements remaining in the iterator. + * + * @return an exact number of elements this iterator has yet to iterate + */ + def remaining: Int +} + + +trait DelegatedIterator[+T, +Delegate <: Iterator[T]] extends RemainsIterator[T] { + val delegate: Delegate + def next = delegate.next + def hasNext = delegate.hasNext +} + + +trait Counting[+T] extends RemainsIterator[T] { + val initialSize: Int + def remaining = initialSize - traversed + var traversed = 0 + abstract override def next = { + val n = super.next + traversed += 1 + n + } +} + + +/** A mixin for iterators that traverse only filtered elements of a delegate. + */ +trait FilteredIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] { + protected[this] val pred: T => Boolean + + private[this] var hd: T = _ + private var hdDefined = false + + override def hasNext: Boolean = hdDefined || { + do { + if (!delegate.hasNext) return false + hd = delegate.next + } while (!pred(hd)) + hdDefined = true + true + } + + override def next = if (hasNext) { hdDefined = false; hd } else empty.next +} + + +/** A mixin for iterators that traverse elements of the delegate iterator, and of another collection. + */ +trait AppendedIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] { + // `rest` should never alias `delegate` + protected[this] val rest: Iterator[T] + + private[this] var current: Iterator[T] = delegate + + override def hasNext = (current.hasNext) || (current == delegate && rest.hasNext) + + override def next = { + if (!current.hasNext) current = rest + current.next + } + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelIterable.scala b/src/library/scala/collection/parallel/ParallelIterable.scala new file mode 100644 index 0000000000..4882dc19ee --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelIterable.scala @@ -0,0 +1,49 @@ +package scala.collection.parallel + + +import scala.collection.generic._ +import scala.collection.parallel.mutable.ParallelArrayCombiner +import scala.collection.parallel.mutable.ParallelArray + + +/** A template trait for parallel iterable collections. + * + * $paralleliterableinfo + * + * $sideeffects + * + * @tparam T the element type of the collection + * + * @author prokopec + * @since 2.8 + */ +trait ParallelIterable[+T] extends Iterable[T] + with GenericParallelTemplate[T, ParallelIterable] + with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] { + override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable +} + +/** $factoryinfo + */ +object ParallelIterable extends ParallelFactory[ParallelIterable] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] = + new GenericCanCombineFrom[T] + + def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] + + def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] +} + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelIterableLike.scala b/src/library/scala/collection/parallel/ParallelIterableLike.scala new file mode 100644 index 0000000000..7ac2713b55 --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelIterableLike.scala @@ -0,0 +1,940 @@ +package scala.collection.parallel + + + + +import scala.collection.mutable.Builder +import scala.collection.mutable.ListBuffer +import scala.collection.IterableLike +import scala.collection.Parallel +import scala.collection.Parallelizable +import scala.collection.Sequentializable +import scala.collection.generic._ + + + + +// TODO update docs!! +/** A template trait for parallel collections of type `ParallelIterable[T]`. + * + * $paralleliterableinfo + * + * $sideeffects + * + * @tparam T the element type of the collection + * @tparam Repr the type of the actual collection containing the elements + * + * @define paralleliterableinfo + * This is a base trait for Scala parallel collections. It defines behaviour + * common to all parallel collections. The actual parallel operation implementation + * is found in the `ParallelIterableFJImpl` trait extending this trait. Concrete + * parallel collections should inherit both this and that trait. + * + * Parallel operations are implemented with divide and conquer style algorithms that + * parallelize well. The basic idea is to split the collection into smaller parts until + * they are small enough to be operated on sequentially. + * + * All of the parallel operations are implemented in terms of several methods. The first is: + * {{{ + * def split: Seq[Repr] + * }}} + * which splits the collection into a sequence of disjunct views. This is typically a + * very fast operation which simply creates wrappers around the receiver collection. + * These views can then be split recursively into smaller views and so on. Each of + * the views is still a parallel collection. + * + * The next method is: + * {{{ + * def combine[OtherRepr >: Repr](other: OtherRepr): OtherRepr + * }}} + * which combines this collection with the argument collection and returns a collection + * containing both the elements of this collection and the argument collection. This behaviour + * may be implemented by producing a view that iterates over both collections, by aggressively + * copying all the elements into the new collection or by lazily creating a wrapper over both + * collections that gets evaluated once it's needed. It is recommended to avoid copying all of + * the elements for performance reasons, although that cost might be negligible depending on + * the use case. + * + * Methods: + * {{{ + * def seq: Repr + * }}} + * and + * {{{ + * def par: Repr + * }}} + * produce a view of the collection that has sequential or parallel operations, respectively. + * + * The method: + * {{{ + * def threshold(sz: Int, p: Int): Int + * }}} + * provides an estimate on the minimum number of elements the collection has before + * the splitting stops and depends on the number of elements in the collection. A rule of the + * thumb is the number of elements divided by 8 times the parallelism level. This method may + * be overridden in concrete implementations if necessary. + * + * Finally, method `newParallelBuilder` produces a new parallel builder. + * + * Since this trait extends the `Iterable` trait, methods like `size` and `iterator` must also + * be implemented. + * + * Each parallel collection is bound to a specific fork/join pool, on which dormant worker + * threads are kept. One can change a fork/join pool of a collection any time except during + * some method being invoked. The fork/join pool contains other information such as the parallelism + * level, that is, the number of processors used. When a collection is created, it is assigned the + * default fork/join pool found in the `scala.collection.parallel` package object. + * + * Parallel collections may or may not be strict, and they are not ordered in terms of the `foreach` + * operation (see `Traversable`). In terms of the iterator of the collection, some collections + * are ordered (for instance, parallel sequences). + * + * @author prokopec + * @since 2.8 + * + * @define sideeffects + * The higher-order functions passed to certain operations may contain side-effects. Since implementations + * of operations may not be sequential, this means that side-effects may not be predictable and may + * produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer + * to either avoid using side-effects or to use some form of synchronization when accessing mutable data. + * + * @define undefinedorder + * The order in which the operations on elements are performed is unspecified and may be nondeterministic. + * + * @define pbfinfo + * An implicit value of class `CanCombineFrom` which determines the + * result class `That` from the current representation type `Repr` and + * and the new element type `B`. This builder factory can provide a parallel + * builder for the resulting collection. + * + * @define abortsignalling + * This method will provide sequential views it produces with `abort` signalling capabilities. This means + * that sequential views may send and read `abort` signals. + * + * @define indexsignalling + * This method will provide sequential views it produces with `indexFlag` signalling capabilities. This means + * that sequential views may set and read `indexFlag` state. + */ +trait ParallelIterableLike[+T, +Repr <: Parallel, +SequentialView <: Iterable[T]] +extends IterableLike[T, Repr] + with Parallelizable[Repr] + with Sequentializable[T, SequentialView] + with Parallel + with HasNewCombiner[T, Repr] + with TaskSupport { + self => + + /** Parallel iterators are split iterators that have additional accessor and + * transformer methods defined in terms of methods `next` and `hasNext`. + * When creating a new parallel collection, one might want to override these + * new methods to make them more efficient. + * + * Parallel iterators are augmented with signalling capabilities. This means + * that a signalling object can be assigned to them as needed. + * + * The self-type ensures that signal context passing behaviour gets mixed in + * a concrete object instance. + */ + trait ParallelIterator extends ParallelIterableIterator[T, Repr] { + me: SignalContextPassingIterator[ParallelIterator] => + var signalDelegate: Signalling = IdleSignalling + def repr = self.repr + def split: Seq[ParallelIterator] + } + + /** A stackable modification that ensures signal contexts get passed along the iterators. + * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into + * concrete iterators. + */ + trait SignalContextPassingIterator[+IterRepr <: ParallelIterator] extends ParallelIterator { + // Note: This functionality must be factored out to this inner trait to avoid boilerplate. + // Also, one could omit the cast below. However, this leads to return type inconsistencies, + // due to inability to override the return type of _abstract overrides_. + // Be aware that this stackable modification has to be subclassed, so it shouldn't be rigid + // on the type of iterators it splits. + // The alternative is some boilerplate - better to tradeoff some type safety to avoid it here. + abstract override def split: Seq[IterRepr] = { + val pits = super.split + pits foreach { _.signalDelegate = signalDelegate } + pits.asInstanceOf[Seq[IterRepr]] + } + } + + /** Convenience for signal context passing iterator. + */ + type SCPI <: SignalContextPassingIterator[ParallelIterator] + + /** Creates a new parallel iterator used to traverse the elements of this parallel collection. + * This iterator is more specific than the iterator of the returned by `iterator`, and augmented + * with additional accessor and transformer methods. + * + * @return a parallel iterator + */ + protected def parallelIterator: ParallelIterator + + /** Creates a new split iterator used to traverse the elements of this collection. + * + * By default, this method is implemented in terms of the protected `parallelIterator` method. + * + * @return a split iterator + */ + def iterator: Splitter[T] = parallelIterator + + def par = repr + + /** Some minimal number of elements after which this collection should be handled + * sequentially by different processors. + * + * This method depends on the size of the collection and the parallelism level, which + * are both specified as arguments. + * + * @param sz the size based on which to compute the threshold + * @param p the parallelism level based on which to compute the threshold + * @return the maximum number of elements for performing operations sequentially + */ + def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p) + + /** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool. + * This method forwards the call to `newCombiner`. + */ + protected[this] override def newBuilder: collection.mutable.Builder[T, Repr] = newCombiner + + /** Optionally reuses existing combiner for better performance. By default it doesn't - subclasses may override this behaviour. + * The provided combiner `oldc` that can potentially be reused will be either some combiner from the previous computational task, or `None` if there + * was no previous phase (in which case this method must return `newc`). + * + * @param oldc The combiner that is the result of the previous task, or `None` if there was no previous task. + * @param newc The new, empty combiner that can be used. + * @return Either `newc` or `oldc`. + */ + protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc + + /* convenience task operations wrapper */ + protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new { + def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) { + def map(r: R): R1 = mapping(r) + } + + def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { + val ft = tsk + val st = t2 + def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) + } + + def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { + val ft = tsk + val st = t2 + def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) + } + } + + protected def wrap[R](body: => R) = new NonDivisible[R] { + def leaf(prevr: Option[R]) = result = body + var result: R = null.asInstanceOf[R] + } + + /* convenience iterator operations wrapper */ + protected implicit def iterator2ops[PI <: ParallelIterator](it: PI) = new { + def assign(cntx: Signalling): PI = { + it.signalDelegate = cntx + it + } + } + + protected implicit def builder2ops[Elem, To](cb: Builder[Elem, To]) = new { + def ifIs[Cmb](isbody: Cmb => Unit) = new { + def otherwise(notbody: => Unit)(implicit m: ClassManifest[Cmb]) { + if (cb.getClass == m.erasure) isbody(cb.asInstanceOf[Cmb]) else notbody + } + } + } + + override def toString = seq.mkString(stringPrefix + "(", ", ", ")") + + /** Reduces the elements of this sequence using the specified associative binary operator. + * + * $undefinedorder + * + * Note this method has a different signature than the `reduceLeft` + * and `reduceRight` methods of the trait `Traversable`. + * The result of reducing may only be a supertype of this parallel collection's + * type parameter `T`. + * + * @tparam U A type parameter for the binary operator, a supertype of `T`. + * @param op A binary operator that must be associative. + * @return The result of applying reduce operator `op` between all the elements if the collection is nonempty. + * @throws UnsupportedOperationException + * if this $coll is empty. + */ + def reduce[U >: T](op: (U, U) => U): U = { + executeAndWaitResult(new Reduce(op, parallelIterator)) + } + + /** Optionally reduces the elements of this sequence using the specified associative binary operator. + * + * $undefinedorder + * + * Note this method has a different signature than the `reduceLeftOption` + * and `reduceRightOption` methods of the trait `Traversable`. + * The result of reducing may only be a supertype of this parallel collection's + * type parameter `T`. + * + * @tparam U A type parameter for the binary operator, a supertype of `T`. + * @param op A binary operator that must be associative. + * @return An option value containing result of applying reduce operator `op` between all + * the elements if the collection is nonempty, and `None` otherwise. + */ + def reduceOption[U >: T](op: (U, U) => U): Option[U] = if (isEmpty) None else Some(reduce(op)) + + /** Folds the elements of this sequence using the specified associative binary operator. + * The order in which the elements are reduced is unspecified and may be nondeterministic. + * + * Note this method has a different signature than the `foldLeft` + * and `foldRight` methods of the trait `Traversable`. + * The result of folding may only be a supertype of this parallel collection's + * type parameter `T`. + * + * @tparam U a type parameter for the binary operator, a supertype of `T`. + * @param z a neutral element for the fold operation, it may be added to the result + * an arbitrary number of times, not changing the result (e.g. `Nil` for list concatenation, + * 0 for addition, or 1 for multiplication) + * @param op a binary operator that must be associative + * @return the result of applying fold operator `op` between all the elements and `z` + */ + def fold[U >: T](z: U)(op: (U, U) => U): U = { + executeAndWaitResult(new Fold(z, op, parallelIterator)) + } + + /** Aggregates the results of applying an operator to subsequent elements. + * + * This is a more general form of `fold` and `reduce`. It has similar semantics, but does + * not require the result to be a supertype of the element type. It traverses the elements in + * different partitions sequentially, using `seqop` to update the result, and then + * applies `combop` to results from different partitions. The implementation of this + * operation may operate on an arbitrary number of collection partitions, so `combop` + * may be invoked arbitrary number of times. + * + * For example, one might want to process some elements and then produce a `Set`. In this + * case, `seqop` would process an element and append it to the list, while `combop` + * would concatenate two lists from different partitions together. The initial value + * `z` would be an empty set. + * + * {{{ + * pc.aggregate(Set[Int]())(_ += process(_), _ ++ _) + * }}} + * + * Another example is calculating geometric mean from a collection of doubles + * (one would typically require big doubles for this). + * + * @tparam S the type of accumulated results + * @param z the initial value for the accumulated result of the partition - this + * will typically be the neutral element for the `seqop` operator (e.g. + * `Nil` for list concatenation or `0` for summation) + * @param seqop an operator used to accumulate results within a partition + * @param combop an associative operator used to combine results from different partitions + */ + def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = { + executeAndWaitResult(new Aggregate(z, seqop, combop, parallelIterator)) + } + + /** Applies a function `f` to all the elements of the receiver. + * + * $undefinedorder + * + * @tparam U the result type of the function applied to each element, which is always discarded + * @param f function that's applied to each element + */ + override def foreach[U](f: T => U): Unit = { + executeAndWait(new Foreach(f, parallelIterator)) + } + + override def count(p: T => Boolean): Int = { + executeAndWaitResult(new Count(p, parallelIterator)) + } + + override def sum[U >: T](implicit num: Numeric[U]): U = { + executeAndWaitResult(new Sum[U](num, parallelIterator)) + } + + override def product[U >: T](implicit num: Numeric[U]): U = { + executeAndWaitResult(new Product[U](num, parallelIterator)) + } + + override def min[U >: T](implicit ord: Ordering[U]): T = { + executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T] + } + + override def max[U >: T](implicit ord: Ordering[U]): T = { + executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T] + } + + override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new Map[S, That](f, pbf, parallelIterator) mapResult { _.result }) + } otherwise super.map(f)(bf) + + override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new Collect[S, That](pf, pbf, parallelIterator) mapResult { _.result }) + } otherwise super.collect(pf)(bf) + + override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new FlatMap[S, That](f, pbf, parallelIterator) mapResult { _.result }) + } otherwise super.flatMap(f)(bf) + + /** Tests whether a predicate holds for all elements of this $coll. + * + * $abortsignalling + * + * @param p a predicate used to test elements + * @return true if `p` holds for all elements, false otherwise + */ + override def forall(pred: T => Boolean): Boolean = { + executeAndWaitResult(new Forall(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) + } + + /** Tests whether a predicate holds for some element of this $coll. + * + * $abortsignalling + * + * @param p a predicate used to test elements + * @return true if `p` holds for some element, false otherwise + */ + override def exists(pred: T => Boolean): Boolean = { + executeAndWaitResult(new Exists(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) + } + + /** Finds some element in the collection for which the predicate holds, if such + * an element exists. The element may not necessarily be the first such element + * in the iteration order. + * + * If there are multiple elements obeying the predicate, the choice is nondeterministic. + * + * $abortsignalling + * + * @param p predicate used to test the elements + * @return an option value with the element if such an element exists, or `None` otherwise + */ + override def find(pred: T => Boolean): Option[T] = { + executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) + } + + protected[this] def cbfactory = () => newCombiner + + override def filter(pred: T => Boolean): Repr = { + executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result }) + } + + override def filterNot(pred: T => Boolean): Repr = { + executeAndWaitResult(new FilterNot(pred, cbfactory, parallelIterator) mapResult { _.result }) + } + + override def ++[U >: T, That](that: TraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = { + if (that.isParallel && bf.isParallel) { + // println("case both are parallel") + val other = that.asParallelIterable + val pbf = bf.asParallel + val copythis = new Copy(() => pbf(repr), parallelIterator) + val copythat = wrap { + val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator) + othtask.compute + othtask.result + } + val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result } + executeAndWaitResult(task) + } else if (bf.isParallel) { + // println("case parallel builder, `that` not parallel") + val pbf = bf.asParallel + val copythis = new Copy(() => pbf(repr), parallelIterator) + val copythat = wrap { + val cb = pbf(repr) + for (elem <- that) cb += elem + cb + } + executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result }) + } else { + // println("case not a parallel builder") + val b = bf(repr) + this.parallelIterator.copy2builder[U, That, Builder[U, That]](b) + if (that.isInstanceOf[Parallel]) for (elem <- that.asInstanceOf[Iterable[U]].iterator) b += elem + else for (elem <- that) b += elem + b.result + } + } + + override def partition(pred: T => Boolean): (Repr, Repr) = { + executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) + } + + override def take(n: Int): Repr = { + val actualn = if (size > n) n else size + if (actualn < MIN_FOR_COPY) take_sequential(actualn) + else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result }) + } + + private def take_sequential(n: Int) = { + val cb = newCombiner + cb.sizeHint(n) + val it = parallelIterator + var left = n + while (left > 0) { + cb += it.next + left -= 1 + } + cb.result + } + + override def drop(n: Int): Repr = { + val actualn = if (size > n) n else size + if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn) + else executeAndWaitResult(new Drop(actualn, cbfactory, parallelIterator) mapResult { _.result }) + } + + private def drop_sequential(n: Int) = { + val it = parallelIterator drop n + val cb = newCombiner + cb.sizeHint(size - n) + while (it.hasNext) cb += it.next + cb.result + } + + override def slice(unc_from: Int, unc_until: Int): Repr = { + val from = unc_from min size max 0 + val until = unc_until min size max from + if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until) + else executeAndWaitResult(new Slice(from, until, cbfactory, parallelIterator) mapResult { _.result }) + } + + private def slice_sequential(from: Int, until: Int): Repr = { + val cb = newCombiner + var left = until - from + val it = parallelIterator drop from + while (left > 0) { + cb += it.next + left -= 1 + } + cb.result + } + + override def splitAt(n: Int): (Repr, Repr) = { + executeAndWaitResult(new SplitAt(n, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) + } + + /** Takes the longest prefix of elements that satisfy the predicate. + * + * $indexsignalling + * The index flag is initially set to maximum integer value. + * + * @param pred the predicate used to test the elements + * @return the longest prefix of this $coll of elements that satisy the predicate `pred` + */ + override def takeWhile(pred: T => Boolean): Repr = { + val cntx = new DefaultSignalling with AtomicIndexFlag + cntx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new TakeWhile(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._1.result }) + } + + /** Splits this $coll into a prefix/suffix pair according to a predicate. + * + * $indexsignalling + * The index flag is initially set to maximum integer value. + * + * @param pred the predicate used to test the elements + * @return a pair consisting of the longest prefix of the collection for which all + * the elements satisfy `pred`, and the rest of the collection + */ + override def span(pred: T => Boolean): (Repr, Repr) = { + val cntx = new DefaultSignalling with AtomicIndexFlag + cntx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult { + p => (p._1.result, p._2.result) + }) + } + + /** Drops all elements in the longest prefix of elements that satisfy the predicate, + * and returns a collection composed of the remaining elements. + * + * $indexsignalling + * The index flag is initially set to maximum integer value. + * + * @param pred the predicate used to test the elements + * @return a collection composed of all the elements after the longest prefix of elements + * in this $coll that satisfy the predicate `pred` + */ + override def dropWhile(pred: T => Boolean): Repr = { + val cntx = new DefaultSignalling with AtomicIndexFlag + cntx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._2.result }) + } + + override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) { + executeAndWait(new CopyToArray(start, len, xs, parallelIterator)) + } + + override def toIterable: Iterable[T] = seq.drop(0).asInstanceOf[Iterable[T]] + + override def toArray[U >: T: ClassManifest]: Array[U] = { + val arr = new Array[U](size) + copyToArray(arr) + arr + } + + override def toList: List[T] = seq.toList + + override def toIndexedSeq[S >: T]: collection.immutable.IndexedSeq[S] = seq.toIndexedSeq[S] + + override def toStream: Stream[T] = seq.toStream + + override def toSet[S >: T]: collection.immutable.Set[S] = seq.toSet + + override def toSeq: Seq[T] = seq.toSeq + + /* tasks */ + + /** Standard accessor task that iterates over the elements of the collection. + * + * @tparam R type of the result of this method (`R` for result). + * @tparam Tp the representation type of the task at hand. + */ + protected trait Accessor[R, Tp] + extends super.Task[R, Tp] { + val pit: ParallelIterator + def newSubtask(p: ParallelIterator): Accessor[R, Tp] + def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) + def split = pit.split.map(newSubtask(_)) // default split procedure + override def toString = "Accessor(" + pit.toString + ")" + } + + protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] { + def shouldSplitFurther = false + def split = throw new UnsupportedOperationException("Does not split.") + override def toString = "NonDivisibleTask" + } + + protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] + + protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { + val ft: First + val st: Second + def combineResults(fr: FR, sr: SR): R + var result: R = null.asInstanceOf[R] + } + + /** Sequentially performs one task after another. */ + protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + extends Composite[FR, SR, R, First, Second] { + def leaf(prevr: Option[R]) = { + ft.compute + st.compute + result = combineResults(ft.result, st.result) + } + } + + /** Performs two tasks in parallel, and waits for both to finish. */ + protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + extends Composite[FR, SR, R, First, Second] { + def leaf(prevr: Option[R]) = { + st.start + ft.compute + st.sync + result = combineResults(ft.result, st.result) + } + } + + protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp]) + extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] { + var result: R1 = null.asInstanceOf[R1] + def map(r: R): R1 + def leaf(prevr: Option[R1]) = { + inner.compute + result = map(inner.result) + } + } + + protected trait Transformer[R, Tp] extends Accessor[R, Tp] + + protected[this] class Foreach[S](op: T => S, val pit: ParallelIterator) extends Accessor[Unit, Foreach[S]] { + var result: Unit = () + def leaf(prevr: Option[Unit]) = pit.foreach(op) + def newSubtask(p: ParallelIterator) = new Foreach[S](op, p) + } + + protected[this] class Count(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Int, Count] { + var result: Int = 0 + def leaf(prevr: Option[Int]) = result = pit.count(pred) + def newSubtask(p: ParallelIterator) = new Count(pred, p) + override def merge(that: Count) = result = result + that.result + } + + protected[this] class Reduce[U >: T](op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Reduce[U]] { + var result: U = null.asInstanceOf[U] + def leaf(prevr: Option[U]) = result = pit.reduce(op) + def newSubtask(p: ParallelIterator) = new Reduce(op, p) + override def merge(that: Reduce[U]) = result = op(result, that.result) + } + + protected[this] class Fold[U >: T](z: U, op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Fold[U]] { + var result: U = null.asInstanceOf[U] + def leaf(prevr: Option[U]) = result = pit.fold(z)(op) + def newSubtask(p: ParallelIterator) = new Fold(z, op, p) + override def merge(that: Fold[U]) = result = op(result, that.result) + } + + protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, val pit: ParallelIterator) + extends Accessor[S, Aggregate[S]] { + var result: S = null.asInstanceOf[S] + def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop) + def newSubtask(p: ParallelIterator) = new Aggregate(z, seqop, combop, p) + override def merge(that: Aggregate[S]) = result = combop(result, that.result) + } + + protected[this] class Sum[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Sum[U]] { + var result: U = null.asInstanceOf[U] + def leaf(prevr: Option[U]) = result = pit.sum(num) + def newSubtask(p: ParallelIterator) = new Sum(num, p) + override def merge(that: Sum[U]) = result = num.plus(result, that.result) + } + + protected[this] class Product[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Product[U]] { + var result: U = null.asInstanceOf[U] + def leaf(prevr: Option[U]) = result = pit.product(num) + def newSubtask(p: ParallelIterator) = new Product(num, p) + override def merge(that: Product[U]) = result = num.times(result, that.result) + } + + protected[this] class Min[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Min[U]] { + var result: U = null.asInstanceOf[U] + def leaf(prevr: Option[U]) = result = pit.min(ord) + def newSubtask(p: ParallelIterator) = new Min(ord, p) + override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result + } + + protected[this] class Max[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Max[U]] { + var result: U = null.asInstanceOf[U] + def leaf(prevr: Option[U]) = result = pit.max(ord) + def newSubtask(p: ParallelIterator) = new Max(ord, p) + override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result + } + + protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) + extends Transformer[Combiner[S, That], Map[S, That]] { + var result: Combiner[S, That] = null + def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr))) + def newSubtask(p: ParallelIterator) = new Map(f, pbf, p) + override def merge(that: Map[S, That]) = result = result combine that.result + } + + protected[this] class Collect[S, That] + (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) + extends Transformer[Combiner[S, That], Collect[S, That]] { + var result: Combiner[S, That] = null + def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf) // TODO + def newSubtask(p: ParallelIterator) = new Collect(pf, pbf, p) + override def merge(that: Collect[S, That]) = result = result combine that.result + } + + protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) + extends Transformer[Combiner[S, That], FlatMap[S, That]] { + var result: Combiner[S, That] = null + def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf) // TODO + def newSubtask(p: ParallelIterator) = new FlatMap(f, pbf, p) + override def merge(that: FlatMap[S, That]) = result = result combine that.result + } + + protected[this] class Forall(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Forall] { + var result: Boolean = true + def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort } + def newSubtask(p: ParallelIterator) = new Forall(pred, p) + override def merge(that: Forall) = result = result && that.result + } + + protected[this] class Exists(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Exists] { + var result: Boolean = false + def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort } + def newSubtask(p: ParallelIterator) = new Exists(pred, p) + override def merge(that: Exists) = result = result || that.result + } + + protected[this] class Find[U >: T](pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Option[U], Find[U]] { + var result: Option[U] = None + def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } + def newSubtask(p: ParallelIterator) = new Find(pred, p) + override def merge(that: Find[U]) = if (this.result == None) result = that.result + } + + protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[Combiner[U, This], Filter[U, This]] { + var result: Combiner[U, This] = null + def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf())) + def newSubtask(p: ParallelIterator) = new Filter(pred, cbf, p) + override def merge(that: Filter[U, This]) = result = result combine that.result + } + + protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[Combiner[U, This], FilterNot[U, This]] { + var result: Combiner[U, This] = null + def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf())) + def newSubtask(p: ParallelIterator) = new FilterNot(pred, cbf, p) + override def merge(that: FilterNot[U, This]) = result = result combine that.result + } + + protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], val pit: ParallelIterator) + extends Transformer[Combiner[U, That], Copy[U, That]] { + var result: Combiner[U, That] = null + def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory())) + def newSubtask(p: ParallelIterator) = new Copy[U, That](cfactory, p) + override def merge(that: Copy[U, That]) = result = result combine that.result + } + + protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] { + var result: (Combiner[U, This], Combiner[U, This]) = null + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) + def newSubtask(p: ParallelIterator) = new Partition(pred, cbf, p) + override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) + } + + protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[Combiner[U, This], Take[U, This]] { + var result: Combiner[U, This] = null + def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf())) + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + val sizes = pits.scanLeft(0)(_ + _.remaining) + for ((p, untilp) <- pits zip sizes; if untilp <= n) yield { + if (untilp + p.remaining < n) new Take(p.remaining, cbf, p) + else new Take(n - untilp, cbf, p) + } + } + override def merge(that: Take[U, This]) = result = result combine that.result + } + + protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[Combiner[U, This], Drop[U, This]] { + var result: Combiner[U, This] = null + def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf())) + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + val sizes = pits.scanLeft(0)(_ + _.remaining) + for ((p, withp) <- pits zip sizes.tail; if withp >= n) yield { + if (withp - p.remaining > n) new Drop(0, cbf, p) + else new Drop(n - withp + p.remaining, cbf, p) + } + } + override def merge(that: Drop[U, This]) = result = result combine that.result + } + + protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[Combiner[U, This], Slice[U, This]] { + var result: Combiner[U, This] = null + def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf())) + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + val sizes = pits.scanLeft(0)(_ + _.remaining) + for ((p, untilp) <- pits zip sizes; if untilp + p.remaining >= from || untilp <= until) yield { + val f = (from max untilp) - untilp + val u = (until min (untilp + p.remaining)) - untilp + new Slice(f, u, cbf, p) + } + } + override def merge(that: Slice[U, This]) = result = result combine that.result + } + + protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] { + var result: (Combiner[U, This], Combiner[U, This]) = null + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + val sizes = pits.scanLeft(0)(_ + _.remaining) + for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p) + } + override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) + } + + protected[this] class TakeWhile[U >: T, This >: Repr] + (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] { + var result: (Combiner[U, This], Boolean) = null + def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) { + result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf())) + if (!result._2) pit.setIndexFlagIfLesser(pos) + } else result = (reuse(prev.map(_._1), cbf()), false) + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new TakeWhile(pos + untilp, pred, cbf, p) + } + override def merge(that: TakeWhile[U, This]) = if (result._2) { + result = (result._1 combine that.result._1, that.result._2) + } + } + + protected[this] class Span[U >: T, This >: Repr] + (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] { + var result: (Combiner[U, This], Combiner[U, This]) = null + def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) { + result = pit.span2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) + if (result._2.size > 0) pit.setIndexFlagIfLesser(pos) + } else { + result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf()))) + } + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Span(pos + untilp, pred, cbf, p) + } + override def merge(that: Span[U, This]) = result = if (result._2.size == 0) { + (result._1 combine that.result._1, that.result._2) + } else { + (result._1, result._2 combine that.result._1 combine that.result._2) + } + } + + protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], val pit: ParallelIterator) + extends Accessor[Unit, CopyToArray[U, This]] { + var result: Unit = () + def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len) + def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield { + val plen = p.remaining min (len - untilp) + new CopyToArray[U, This](from + untilp, plen, array, p) + } + } + } + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelIterableView.scala b/src/library/scala/collection/parallel/ParallelIterableView.scala new file mode 100644 index 0000000000..f40f02eb3b --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelIterableView.scala @@ -0,0 +1,33 @@ +package scala.collection.parallel + + + + +import scala.collection.Parallel +import scala.collection.TraversableViewLike +import scala.collection.IterableView + + + + +/** A template view of a non-strict view of a parallel iterable collection. + * + * @tparam T ... + * @tparam Coll ... + * + * @since 2.8 + */ +trait ParallelIterableView[+T, +Coll <: Parallel, +CollSeq] +extends ParallelIterableViewLike[T, Coll, CollSeq, ParallelIterableView[T, Coll, CollSeq], IterableView[T, CollSeq]] + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelIterableViewLike.scala b/src/library/scala/collection/parallel/ParallelIterableViewLike.scala new file mode 100644 index 0000000000..024eb48d25 --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelIterableViewLike.scala @@ -0,0 +1,59 @@ +package scala.collection.parallel + + + + +import scala.collection.Parallel +import scala.collection.TraversableViewLike +import scala.collection.IterableView +import scala.collection.IterableViewLike + + + + + +/** A template view of a non-strict view of parallel iterable collection. + * + * '''Note:''' Regular view traits have type parameters used to carry information + * about the type of the elements, type of the collection they are derived from and + * their own actual representation type. Parallel views have an additional parameter + * which carries information about the type of the sequential version of the view. + * + * @tparam T the type of the elements this view can traverse + * @tparam Coll the type of the collection this view is derived from + * @tparam CollSeq TODO + * @tparam This the actual representation type of this view + * @tparam ThisSeq the type of the sequential representation of this view + * + * @since 2.8 + */ +trait ParallelIterableViewLike[+T, + +Coll <: Parallel, + +CollSeq, + +This <: ParallelIterableView[T, Coll, CollSeq] with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq], + +ThisSeq <: IterableView[T, CollSeq] with IterableViewLike[T, CollSeq, ThisSeq]] +extends IterableView[T, Coll] + with IterableViewLike[T, Coll, This] + with ParallelIterable[T] + with ParallelIterableLike[T, This, ThisSeq] +{ + self => + + override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner"); + + //type SCPI = SignalContextPassingIterator[ParallelIterator] // complains when overriden further in inh. hier., TODO check it out + type CPI = SignalContextPassingIterator[ParallelIterator] + + trait Transformed[+S] extends ParallelIterableView[S, Coll, CollSeq] with super.Transformed[S] + +} + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelMap.scala b/src/library/scala/collection/parallel/ParallelMap.scala new file mode 100644 index 0000000000..5ce61469bc --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelMap.scala @@ -0,0 +1,71 @@ +package scala.collection.parallel + + + + + +import scala.collection.Map +import scala.collection.mutable.Builder +import scala.collection.generic.ParallelMapFactory +import scala.collection.generic.GenericParallelMapTemplate +import scala.collection.generic.GenericParallelMapCompanion +import scala.collection.generic.CanCombineFrom + + + + + + +trait ParallelMap[K, +V] +extends Map[K, V] + with GenericParallelMapTemplate[K, V, ParallelMap] + with ParallelIterable[(K, V)] + with ParallelMapLike[K, V, ParallelMap[K, V], Map[K, V]] +{ +self => + + def mapCompanion: GenericParallelMapCompanion[ParallelMap] = ParallelMap + + override def empty: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V] + + override def stringPrefix = "ParallelMap" +} + + + +object ParallelMap extends ParallelMapFactory[ParallelMap] { + def empty[K, V]: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V] + + def newCombiner[K, V]: Combiner[(K, V), ParallelMap[K, V]] = immutable.HashTrieCombiner[K, V] + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelMap[K, V]] = new CanCombineFromMap[K, V] + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelMapLike.scala b/src/library/scala/collection/parallel/ParallelMapLike.scala new file mode 100644 index 0000000000..8a0b54525f --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelMapLike.scala @@ -0,0 +1,43 @@ +package scala.collection.parallel + + + + +import scala.collection.MapLike +import scala.collection.Map +import scala.collection.mutable.Builder + + + + + + + + +trait ParallelMapLike[K, + +V, + +Repr <: ParallelMapLike[K, V, Repr, SequentialView] with ParallelMap[K, V], + +SequentialView <: Map[K, V]] +extends MapLike[K, V, Repr] + with ParallelIterableLike[(K, V), Repr, SequentialView] +{ self => + + protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner + + protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.") + + override def empty: Repr + +} + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelSeq.scala b/src/library/scala/collection/parallel/ParallelSeq.scala new file mode 100644 index 0000000000..71b802cd11 --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelSeq.scala @@ -0,0 +1,64 @@ +package scala.collection.parallel + + + +import scala.collection.generic.GenericCompanion +import scala.collection.generic.GenericParallelCompanion +import scala.collection.generic.GenericParallelTemplate +import scala.collection.generic.ParallelFactory +import scala.collection.generic.CanCombineFrom +import scala.collection.parallel.mutable.ParallelArrayCombiner +import scala.collection.parallel.mutable.ParallelArray + + +/** A template trait for parallel sequences. + * + * $parallelseqinfo + * + * $sideeffects + */ +trait ParallelSeq[+T] extends Seq[T] + with ParallelIterable[T] + with GenericParallelTemplate[T, ParallelSeq] + with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] { + override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq + + def apply(i: Int): T + + override def toString = super[ParallelIterable].toString +} + + +object ParallelSeq extends ParallelFactory[ParallelSeq] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T] + + def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] + + def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] +} + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelSeqLike.scala b/src/library/scala/collection/parallel/ParallelSeqLike.scala new file mode 100644 index 0000000000..18b0c83f23 --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelSeqLike.scala @@ -0,0 +1,473 @@ +package scala.collection.parallel + + +import scala.collection.Parallel +import scala.collection.SeqLike +import scala.collection.generic.DefaultSignalling +import scala.collection.generic.AtomicIndexFlag +import scala.collection.generic.CanBuildFrom +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.VolatileAbort + + + + +// TODO update docs!! +/** A template trait for sequences of type `ParallelSeq[T]`, representing + * parallel sequences with element type `T`. + * + * $parallelseqinfo + * + * @tparam T the type of the elements contained in this collection + * @tparam Repr the type of the actual collection containing the elements + * + * @define parallelseqinfo + * Parallel sequences inherit the `IndexedSeq` trait. This means they provide + * efficient indexing and length computations. Like their sequential counterparts + * they always have a defined order of elements. This means they will produce resulting + * parallel sequences in the same way sequential sequences do. However, the order + * in which they iterate over elements to produce results is not defined and is generally + * nondeterministic. If the higher-order functions given to them produce no sideeffects, + * then this won't be noticeable. + * + * This trait defines a new, more general `split` operation and reimplements the `split` + * operation of `ParallelIterable` trait using the new `split` operation. + * + * @author prokopec + * @since 2.8 + */ +trait ParallelSeqLike[+T, +Repr <: Parallel, +Sequential <: Seq[T] with SeqLike[T, Sequential]] +extends scala.collection.SeqLike[T, Repr] + with ParallelIterableLike[T, Repr, Sequential] { + self => + + type SuperParallelIterator = super.ParallelIterator + + /** An iterator that can be split into arbitrary subsets of iterators. + * The self-type requirement ensures that the signal context passing behaviour gets mixed in + * the concrete iterator instance in some concrete collection. + * + * '''Note:''' In concrete collection classes, collection implementers might want to override the iterator + * `reverse2builder` method to ensure higher efficiency. + */ + trait ParallelIterator extends ParallelSeqIterator[T, Repr] with super.ParallelIterator { + me: SignalContextPassingIterator[ParallelIterator] => + def split: Seq[ParallelIterator] + def psplit(sizes: Int*): Seq[ParallelIterator] + } + + /** A stackable modification that ensures signal contexts get passed along the iterators. + * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into + * concrete iterators. + */ + trait SignalContextPassingIterator[+IterRepr <: ParallelIterator] + extends ParallelIterator with super.SignalContextPassingIterator[IterRepr] { + // Note: See explanation in `ParallelIterableLike.this.SignalContextPassingIterator` + // to understand why we do the cast here, and have a type parameter. + // Bottomline: avoiding boilerplate and fighting against inability to override stackable modifications. + abstract override def psplit(sizes: Int*): Seq[IterRepr] = { + val pits = super.psplit(sizes: _*) + pits foreach { _.signalDelegate = signalDelegate } + pits.asInstanceOf[Seq[IterRepr]] + } + } + + /** A convenient shorthand for the signal context passing stackable modification. + */ + type SCPI <: SignalContextPassingIterator[ParallelIterator] + + /** A more refined version of the iterator found in the `ParallelIterable` trait, + * this iterator can be split into arbitrary subsets of iterators. + * + * @return an iterator that can be split into subsets of precise size + */ + protected def parallelIterator: ParallelIterator + + override def iterator: PreciseSplitter[T] = parallelIterator + + override def size = length + + /** Used to iterate elements using indices */ + protected abstract class Elements(start: Int, val end: Int) extends ParallelIterator with BufferedIterator[T] { + me: SignalContextPassingIterator[ParallelIterator] => + + private var i = start + + def hasNext = i < end + + def next: T = if (i < end) { + val x = self(i) + i += 1 + x + } else Iterator.empty.next + + def head = self(i) + + final def remaining = end - i + + def split = psplit(remaining / 2, remaining - remaining / 2) + + def psplit(sizes: Int*) = { + val incr = sizes.scanLeft(0)(_ + _) + for ((from, until) <- incr.init zip incr.tail) yield { + new Elements(start + from, (start + until) min end) with SignalContextPassingIterator[ParallelIterator] + } + } + + override def toString = "Elements(" + start + ", " + end + ")" + } + + /* ParallelSeq methods */ + + /** Returns the length of the longest segment of elements starting at + * a given position satisfying some predicate. + * + * $indexsignalling + * + * The index flag is initially set to maximum integer value. + * + * @param p the predicate used to test the elements + * @param from the starting offset for the search + * @return the length of the longest segment of elements starting at `from` and + * satisfying the predicate + */ + override def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else { + val realfrom = if (from < 0) 0 else from + val ctx = new DefaultSignalling with AtomicIndexFlag + ctx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new SegmentLength(p, 0, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))._1 + } + + override def prefixLength(p: T => Boolean) = segmentLength(p, 0) + + /** Finds the first element satisfying some predicate. + * + * $indexsignalling + * + * The index flag is initially set to maximum integer value. + * + * @param p the predicate used to test the elements + * @param from the starting offset for the search + * @return the index `>= from` of the first element of this $coll that satisfies the predicate `p`, + * or `-1`, if none exists + */ + override def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else { + val realfrom = if (from < 0) 0 else from + val ctx = new DefaultSignalling with AtomicIndexFlag + ctx.setIndexFlag(Int.MaxValue) + executeAndWaitResult(new IndexWhere(p, realfrom, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx)) + } + + override def indexWhere(p: T => Boolean): Int = indexWhere(p, 0) + + override def findIndexOf(p: T => Boolean): Int = indexWhere(p, 0) + + override def indexOf[U >: T](elem: U): Int = indexOf(elem, 0) + + override def indexOf[U >: T](elem: U, from: Int): Int = indexWhere(elem ==, from) + + /** Finds the last element satisfying some predicate. + * + * $indexsignalling + * + * The index flag is initially set to minimum integer value. + * + * @param p the predicate used to test the elements + * @param end the maximum offset for the search + * @return the index `<= end` of the first element of this $coll that satisfies the predicate `p`, + * or `-1`, if none exists + */ + override def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else { + val until = if (end >= length) length else end + 1 + val ctx = new DefaultSignalling with AtomicIndexFlag + ctx.setIndexFlag(Int.MinValue) + executeAndWaitResult(new LastIndexWhere(p, 0, parallelIterator.psplit(until, length - until)(0) assign ctx)) + } + + override def reverse: Repr = { + executeAndWaitResult(new Reverse(() => newCombiner, parallelIterator) mapResult { _.result }) + } + + override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new ReverseMap[S, That](f, pbf, parallelIterator) mapResult { _.result }) + } otherwise super.reverseMap(f)(bf) + + override def startsWith[S](that: Seq[S]): Boolean = startsWith(that, 0) + + /** Tests whether this $coll contains the given sequence at a given index. + * + * $abortsignalling + * + * @tparam U the element type of `that` parallel sequence + * @param that the parallel sequence this sequence is being searched for + * @param offset the starting offset for the search + * @return `true` if there is a sequence `that` starting at `offset` in this sequence, `false` otherwise + */ + override def startsWith[S](that: Seq[S], offset: Int): Boolean = that ifParallelSeq { pthat => + if (offset < 0 || offset >= length) offset == length && pthat.length == 0 + else if (pthat.length == 0) true + else if (pthat.length > length - offset) false + else { + val ctx = new DefaultSignalling with VolatileAbort + executeAndWaitResult(new SameElements(parallelIterator.psplit(offset, pthat.length)(1) assign ctx, pthat.parallelIterator)) + } + } otherwise super.startsWith(that, offset) + + override def sameElements[U >: T](that: Iterable[U]): Boolean = that ifParallelSeq { pthat => + val ctx = new DefaultSignalling with VolatileAbort + length == pthat.length && executeAndWaitResult(new SameElements(parallelIterator assign ctx, pthat.parallelIterator)) + } otherwise super.sameElements(that) + + /** Tests whether this $coll ends with the given parallel sequence + * + * $abortsignalling + * + * @tparam S the type of the elements of `that` sequence + * @param that the sequence to test + * @return `true` if this $coll has `that` as a suffix, `false` otherwise + */ + override def endsWith[S](that: Seq[S]): Boolean = that ifParallelSeq { pthat => + if (that.length == 0) true + else if (that.length > length) false + else { + val ctx = new DefaultSignalling with VolatileAbort + val tlen = that.length + executeAndWaitResult(new SameElements(parallelIterator.psplit(length - tlen, tlen)(1) assign ctx, pthat.parallelIterator)) + } + } otherwise super.endsWith(that) + + override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int) + (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParallelSeq && bf.isParallel) { + val that = patch.asParallelSeq + val pbf = bf.asParallel + val realreplaced = replaced min (length - from) + val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced) + val copystart = new Copy[U, That](() => pbf(repr), pits(0)) + val copymiddle = wrap { + val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) + tsk.compute + tsk.result + } + val copyend = new Copy[U, That](() => pbf(repr), pits(2)) + executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result }) + } else patch_sequential(from, patch, replaced) + + private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { + val b = bf(repr) + val repl = r min (length - from) + val pits = parallelIterator.psplit(from, repl, length - from - repl) + b ++= pits(0) + b ++= patch.iterator + b ++= pits(2) + b.result + } + + override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf => + executeAndWaitResult(new Updated(index, elem, pbf, parallelIterator) mapResult { _.result }) + } otherwise super.updated(index, elem) + + override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { + patch(0, mutable.ParallelArray(elem), 0) + } + + override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { + patch(length, mutable.ParallelArray(elem), 0) + } + + override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) { + patch(length, new immutable.Repetition(elem, len - length), 0) + } else patch(length, Nil, 0) + + /** Tests whether every element of this $coll relates to the + * corresponding element of another parallel sequence by satisfying a test predicate. + * + * $abortsignalling + * + * @param that the other parallel sequence + * @param p the test predicate, which relates elements from both sequences + * @tparam S the type of the elements of `that` + * @return `true` if both parallel sequences have the same length and + * `p(x, y)` is `true` for all corresponding elements `x` of this $coll + * and `y` of `that`, otherwise `false` + */ + override def corresponds[S](that: Seq[S])(p: (T, S) => Boolean): Boolean = that ifParallelSeq { pthat => + val ctx = new DefaultSignalling with VolatileAbort + length == pthat.length && executeAndWaitResult(new Corresponds(p, parallelIterator assign ctx, pthat.parallelIterator)) + } otherwise super.corresponds(that)(p) + + override def toString = seq.mkString(stringPrefix + "(", ", ", ")") + + override def view = new ParallelSeqView[T, Repr, Sequential] { + protected lazy val underlying = self.repr + def length = self.length + def apply(idx: Int) = self(idx) + def seq = self.seq.view + def parallelIterator = new Elements(0, length) with SCPI {} + } + + override def view(from: Int, until: Int) = view.slice(from, until) + + /* tasks */ + + protected def down(p: SuperParallelIterator) = p.asInstanceOf[ParallelIterator] + + protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] { + val pit: ParallelIterator + } + + protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp] + + protected[this] class SegmentLength(pred: T => Boolean, from: Int, val pit: ParallelIterator) + extends Accessor[(Int, Boolean), SegmentLength] { + var result: (Int, Boolean) = null + def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) { + val itsize = pit.remaining + val seglen = pit.prefixLength(pred) + result = (seglen, itsize == seglen) + if (!result._2) pit.setIndexFlagIfLesser(from) + } else result = (0, false) + def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p) + } + override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2) + } + + protected[this] class IndexWhere(pred: T => Boolean, from: Int, val pit: ParallelIterator) + extends Accessor[Int, IndexWhere] { + var result: Int = -1 + def leaf(prev: Option[Int]) = if (from < pit.indexFlag) { + val r = pit.indexWhere(pred) + if (r != -1) { + result = from + r + pit.setIndexFlagIfLesser(from) + } + } + def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p) + } + override def merge(that: IndexWhere) = result = if (result == -1) that.result else { + if (that.result != -1) result min that.result else result + } + } + + protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, val pit: ParallelIterator) + extends Accessor[Int, LastIndexWhere] { + var result: Int = -1 + def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) { + val r = pit.lastIndexWhere(pred) + if (r != -1) { + result = pos + r + pit.setIndexFlagIfGreater(pos) + } + } + def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p) + } + override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else { + if (that.result != -1) result max that.result else result + } + } + + protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], val pit: ParallelIterator) + extends Transformer[Combiner[U, This], Reverse[U, This]] { + var result: Combiner[U, This] = null + def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf())) + def newSubtask(p: SuperParallelIterator) = new Reverse(cbf, down(p)) + override def merge(that: Reverse[U, This]) = result = that.result combine result + } + + protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) + extends Transformer[Combiner[S, That], ReverseMap[S, That]] { + var result: Combiner[S, That] = null + def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf) // TODO + def newSubtask(p: SuperParallelIterator) = new ReverseMap(f, pbf, down(p)) + override def merge(that: ReverseMap[S, That]) = result = that.result combine result + } + + protected[this] class SameElements[U >: T](val pit: ParallelIterator, val otherpit: PreciseSplitter[U]) + extends Accessor[Boolean, SameElements[U]] { + var result: Boolean = true + def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { + result = pit.sameElements(otherpit) + if (!result) pit.abort + } + def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException + override def split = { + val fp = pit.remaining / 2 + val sp = pit.remaining - fp + for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op) + } + override def merge(that: SameElements[U]) = result = result && that.result + } + + protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], val pit: ParallelIterator) + extends Transformer[Combiner[U, That], Updated[U, That]] { + var result: Combiner[U, That] = null + def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf) // TODO + def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException + override def split = { + val pits = pit.split + for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p) + } + override def merge(that: Updated[U, That]) = result = result combine that.result + } + + protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParallelIterator, val otherpit: PreciseSplitter[S]) + extends Accessor[Boolean, Corresponds[S]] { + var result: Boolean = true + def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { + result = pit.corresponds(corr)(otherpit) + if (!result) pit.abort + } + def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException + override def split = { + val fp = pit.remaining / 2 + val sp = pit.remaining - fp + for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op) + } + override def merge(that: Corresponds[S]) = result = result && that.result + } + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelSeqView.scala b/src/library/scala/collection/parallel/ParallelSeqView.scala new file mode 100644 index 0000000000..7862e99f44 --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelSeqView.scala @@ -0,0 +1,64 @@ +package scala.collection.parallel + + + + +import scala.collection.TraversableView +import scala.collection.SeqView +import scala.collection.Parallel +import scala.collection.generic.CanCombineFrom + + + + + +/** A template view of a non-strict view of a parallel sequence. + * + * @tparam T + * @tparam Coll + * + * @since 2.8 + */ +trait ParallelSeqView[+T, +Coll <: Parallel, +CollSeq] +extends ParallelSeqViewLike[T, Coll, CollSeq, ParallelSeqView[T, Coll, CollSeq], SeqView[T, CollSeq]] + + + +object ParallelSeqView { + abstract class NoCombiner[T] extends Combiner[T, Nothing] { + self: EnvironmentPassingCombiner[T, Nothing] => + def +=(elem: T): this.type = this + def iterator: Iterator[T] = Iterator.empty + def result() = throw new UnsupportedOperationException("ParallelSeqView.Combiner.result") + def size = throw new UnsupportedOperationException("ParallelSeqView.Combiner.size") + def clear() {} + def combine[N <: T, NewTo >: Nothing](other: Combiner[N, NewTo]) = + throw new UnsupportedOperationException("ParallelSeqView.Combiner.result") + } + + type Coll = ParallelSeqView[_, C, _] forSome { type C <: ParallelSeq[_] } + + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] = + new CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] { + def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing] + def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing] + } +} + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/ParallelSeqViewLike.scala b/src/library/scala/collection/parallel/ParallelSeqViewLike.scala new file mode 100644 index 0000000000..eab4d7ad5f --- /dev/null +++ b/src/library/scala/collection/parallel/ParallelSeqViewLike.scala @@ -0,0 +1,192 @@ +package scala.collection.parallel + + + + + +import scala.collection.SeqView +import scala.collection.SeqViewLike +import scala.collection.Parallel +import scala.collection.generic.CanBuildFrom +import scala.collection.generic.CanCombineFrom + + + + + + + +/** A template view of a non-strict view of parallel sequence. + * + * @tparam T the type of the elements in this view + * @tparam Coll type of the collection this view is derived from + * @tparam CollSeq TODO + * @tparam This actual representation type of this view + * @tparam ThisSeq type of the sequential version of this view + * + * @since 2.8 + */ +trait ParallelSeqViewLike[+T, + +Coll <: Parallel, + +CollSeq, + +This <: ParallelSeqView[T, Coll, CollSeq] with ParallelSeqViewLike[T, Coll, CollSeq, This, ThisSeq], + +ThisSeq <: SeqView[T, CollSeq] with SeqViewLike[T, CollSeq, ThisSeq]] +extends SeqView[T, Coll] + with SeqViewLike[T, Coll, This] + with ParallelIterableView[T, Coll, CollSeq] + with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq] + with ParallelSeq[T] + with ParallelSeqLike[T, This, ThisSeq] +{ + self => + + type SCPI = SignalContextPassingIterator[ParallelIterator] + + trait Transformed[+S] extends ParallelSeqView[S, Coll, CollSeq] + with super[ParallelIterableView].Transformed[S] with super[SeqView].Transformed[S] { + override def parallelIterator = new Elements(0, length) with SCPI {} + override def iterator = parallelIterator + environment = self.environment + } + + trait Forced[S] extends super.Forced[S] with Transformed[S] { + // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, + // we use it to obtain a view of the correct type - not the most efficient thing + // in the universe, but without making `newForced` more accessible, or adding + // a `forced` method to `SeqView`, this is the best we can do + def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]] + } + + trait Filtered extends super.Filtered with Transformed[T] { + def seq = self.seq filter pred + } + + trait Sliced extends super.Sliced with Transformed[T] { + override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] + def seq = self.seq.slice(from, until) + } + + trait Appended[U >: T] extends super.Appended[U] with Transformed[U] { + def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]] + } + + trait Mapped[S] extends super.Mapped[S] with Transformed[S]{ + def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]] + } + + trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] { + def seq = self.seq.flatMap(mapping).asInstanceOf[SeqView[S, CollSeq]] + } + + trait TakenWhile extends super.TakenWhile with Transformed[T] { + def seq = self.seq takeWhile pred + } + + trait DroppedWhile extends super.DroppedWhile with Transformed[T] { + def seq = self.seq dropWhile pred + } + + trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] { + def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]] + } + + trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] { + def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]] + } + + trait Reversed extends super.Reversed with Transformed[T] { + def seq = self.seq.reverse + } + + trait Patched[U >: T] extends super.Patched[U] with Transformed[U] { + def seq = self.seq.patch(from, patch, replaced).asInstanceOf[SeqView[U, CollSeq]] + } + + trait Prepended[U >: T] extends super.Prepended[U] with Transformed[U] { + def seq = (fst +: self.seq).asInstanceOf[SeqView[U, CollSeq]] + } + + protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p } + protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u } + protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that } + protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } + protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f } + protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p } + protected override def newTakenWhile(p: T => Boolean): Transformed[T] = new TakenWhile { val pred = p } + protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that } + protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem } + protected override def newReversed: Transformed[T] = new Reversed { } + protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced } + protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem } + + override def filter(p: T => Boolean): This = newFiltered(p).asInstanceOf[This] + override def filterNot(p: T => Boolean): This = newFiltered(!p(_)).asInstanceOf[This] + override def partition(p: T => Boolean): (This, This) = (filter(p), filterNot(p)) + override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This] + override def take(n: Int): This = newSliced(0, n).asInstanceOf[This] + override def drop(n: Int): This = newSliced(n, length).asInstanceOf[This] + override def splitAt(n: Int): (This, This) = (take(n), drop(n)) + override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That] + override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That] + override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[This, S, That]): That = newFlatMapped(f).asInstanceOf[That] + override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[This, S, That]): That = filter(pf.isDefinedAt).map(pf)(bf) + override def takeWhile(p: T => Boolean): This = newTakenWhile(p).asInstanceOf[This] + override def dropWhile(p: T => Boolean): This = newDroppedWhile(p).asInstanceOf[This] + override def span(p: T => Boolean): (This, This) = (takeWhile(p), dropWhile(p)) + override def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanLeft(z)(op)).asInstanceOf[That] + override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanRight(z)(op)).asInstanceOf[That] + override def groupBy[K](f: T => K): collection.immutable.Map[K, This] = thisSeq.groupBy(f).mapValues(xs => newForced(xs).asInstanceOf[This]) + override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = newPrepended(elem).asInstanceOf[That] + override def reverse: This = newReversed.asInstanceOf[This] + override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[This, U, That]): That = newPatched(from, patch, replaced).asInstanceOf[That] + override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = patch(length, Seq.fill(len - length)(elem), 0) + override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f) + override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = { + require(0 <= index && index < length) + patch(index, List(elem), 1)(bf) + } + override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = ++(Iterator.single(elem))(bf) + override def union[U >: T, That](that: Seq[U])(implicit bf: CanBuildFrom[This, U, That]): That = this ++ that + override def diff[U >: T](that: Seq[U]): This = newForced(thisSeq diff that).asInstanceOf[This] + override def intersect[U >: T](that: Seq[U]): This = newForced(thisSeq intersect that).asInstanceOf[This] + override def sorted[U >: T](implicit ord: Ordering[U]): This = newForced(thisSeq sorted ord).asInstanceOf[This] + + override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf => + executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result }) + } otherwise { + val b = bf(underlying) + b ++= this.iterator + b.result + } + + /* tasks */ + + protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], val pit: ParallelIterator) + extends Transformer[Combiner[U, That], Force[U, That]] { + var result: Combiner[U, That] = null + def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying))) + def newSubtask(p: SuperParallelIterator) = new Force(cbf, down(p)) + override def merge(that: Force[U, That]) = result = result combine that.result + } + +} + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/Splitters.scala b/src/library/scala/collection/parallel/Splitters.scala new file mode 100644 index 0000000000..b3cad6d67a --- /dev/null +++ b/src/library/scala/collection/parallel/Splitters.scala @@ -0,0 +1,86 @@ +package scala.collection.parallel + + +import scala.collection.Seq + + +/** A splitter (or a split iterator) can be split into more splitters that traverse over + * disjoint subsets of elements. + * + * @tparam T type of the elements this parallel iterator traverses + * + * @since 2.8.1 + * @author prokopec + */ +trait Splitter[+T] extends Iterator[T] { + + /** Splits the iterator into a sequence of disjunct views. + * + * Returns a sequence of split iterators, each iterating over some subset of the + * elements in the collection. These subsets are disjoint and should be approximately + * equal in size. These subsets are not empty, unless the iterator is empty in which + * case this method returns a sequence with a single empty iterator. If the iterator has + * more than two elements, this method will return two or more iterators. + * + * Implementors are advised to keep this partition relatively small - two iterators are + * already enough when partitioning the collection, although there may be a few more. + * + * '''Note:''' this method actually invalidates the current iterator. + * + * @return a sequence of disjunct iterators of the collection + */ + def split: Seq[Splitter[T]] + +} + + +/** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters + * that traverse disjoint subsets of arbitrary sizes. + * + * Implementors might want to override the parameterless `split` method for efficiency. + * + * @tparam T type of the elements this parallel iterator traverses + * + * @since 2.8.1 + * @author prokopec + */ +trait PreciseSplitter[+T] extends Splitter[T] { + + /** Splits the iterator into disjunct views. + * + * This overloaded version of the `split` method is specific to precise parallel iterators. + * It returns a sequence of parallel iterators, each iterating some subset of the + * elements in this iterator. The sizes of the subiterators in the partition is equal to + * the size in the corresponding argument, as long as there are enough elements in this + * iterator to split it that way. + * + * If there aren't enough elements, a zero element iterator is appended for each additional argument. + * If there are additional elements, an additional iterator is appended at the end to compensate. + * + * For example, say we have a parallel iterator `ps` with 100 elements. Invoking: + * {{{ + * ps.split(50, 25, 25, 10, 5) + * }}} + * will return a sequence of five iterators, last two views being empty. On the other hand, calling: + * {{{ + * ps.split(50, 40) + * }}} + * will return a sequence of three iterators, last of them containing ten elements. + * + * '''Note:''' this method actually invalidates the current iterator. + * + * Unlike the case with `split` found in parallel iterable iterators, views returned by this method can be empty. + * + * @param sizes the sizes used to split this split iterator into iterators that traverse disjunct subsets + * @return a sequence of disjunct subsequence iterators of this parallel iterator + */ + def psplit(sizes: Int*): Seq[PreciseSplitter[T]] + + def split: Seq[PreciseSplitter[T]] + +} + + + + + diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala new file mode 100644 index 0000000000..8a072b22aa --- /dev/null +++ b/src/library/scala/collection/parallel/TaskSupport.scala @@ -0,0 +1,27 @@ +package scala.collection.parallel + + + + + + + +trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala new file mode 100644 index 0000000000..3ef60f8c7a --- /dev/null +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -0,0 +1,230 @@ +package scala.collection.parallel + + + + +import scala.concurrent.forkjoin._ + + + + + + + + + + +/** A trait that declares task execution capabilities used + * by parallel collections. Parallel collections inherit a subtrait + * of this trait. + * + * One implementation trait of `TaskExecution` is `ForkJoinTaskExecution`. + */ +trait Tasks { + + /** A task abstraction which allows starting a task with `start`, + * waiting for it to finish with `sync` and attempting to cancel + * the task with `tryCancel`. + * It also defines a method `leaf` which must be called once the + * the task is started and defines what this task actually does. + * Method `split` allows splitting this task into smaller subtasks, + * and method `shouldSplitFurther` decides if the task should be + * partitioned further. + * Method `merge` allows merging the results of the 2 tasks. It updates + * the result of the receiver. + * Finally, it defines the task result of type `U`. + */ + trait Task[R, +Tp] { + def repr = this.asInstanceOf[Tp] + /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ + def compute + /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task + * or `None` if there was no previous task. + */ + def leaf(result: Option[R]) + /** Start task. */ + def start + /** Wait for task to finish. */ + def sync + /** Try to cancel the task. + * @return `true` if cancellation is successful. + */ + def tryCancel: Boolean + /** A result that can be accessed once the task is completed. */ + def result: R + /** Decides whether or not this task should be split further. */ + def shouldSplitFurther: Boolean + /** Splits this task into a list of smaller tasks. */ + protected[this] def split: Seq[Task[R, Tp]] + /** Read of results of `that` task and merge them into results of this one. */ + protected[this] def merge(that: Tp) {} + } + + type TaskType[R, +Tp] <: Task[R, Tp] + type ExecutionEnvironment + + var environment: ExecutionEnvironment + + /** Executes a task and waits for it to finish. */ + def executeAndWait[R, Tp](task: TaskType[R, Tp]) + + /** Executes a result task, waits for it to finish, then returns its result. */ + def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R + + /** Retrieves the parallelism level of the task execution environment. */ + def parallelismLevel: Int + +} + + +/** This trait implements scheduling by employing + * an adaptive work stealing technique. + */ +trait AdaptiveWorkStealingTasks extends Tasks { + + trait Task[R, Tp] extends super.Task[R, Tp] { + var next: Task[R, Tp] = null + var shouldWaitFor = true + var result: R + + def split: Seq[Task[R, Tp]] + + /** The actual leaf computation. */ + def leaf(result: Option[R]): Unit + + def compute = if (shouldSplitFurther) internal else leaf(None) + + def internal = { + var last = spawnSubtasks + + last.leaf(None) + result = last.result + + while (last.next != null) { + val lastresult = Option(last.result) + last = last.next + if (last.tryCancel) last.leaf(lastresult) else last.sync + merge(last.repr) + } + } + + def spawnSubtasks = { + var last: Task[R, Tp] = null + var head: Task[R, Tp] = this + do { + val subtasks = head.split + head = subtasks.head + for (t <- subtasks.tail) { + t.next = last + last = t + t.start + } + } while (head.shouldSplitFurther); + head.next = last + head + } + + def printChain = { + var curr = this + var chain = "chain: " + while (curr != null) { + chain += curr + " ---> " + curr = curr.next + } + println(chain) + } + } + +} + + +/** + * A trait describing objects that provide a fork/join pool. + */ +trait HavingForkJoinPool { + def forkJoinPool: ForkJoinPool +} + + + +/** An implementation trait for parallel tasks based on the fork/join framework. + * + * @define fjdispatch + * If the current thread is a fork/join worker thread, the task's `fork` method will + * be invoked. Otherwise, the task will be executed on the fork/join pool. + */ +trait ForkJoinTasks extends Tasks with HavingForkJoinPool { + + trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] { + def start = fork + def sync = join + def tryCancel = tryUnfork + var result: R + } + + type TaskType[R, +Tp] = Task[R, Tp] + type ExecutionEnvironment = ForkJoinPool + + /** The fork/join pool of this collection. + */ + def forkJoinPool: ForkJoinPool = environment + var environment = ForkJoinTasks.defaultForkJoinPool + + /** Executes a task on a fork/join pool and waits for it to finish. + * + * $fjdispatch + */ + def executeAndWait[R, Tp](fjtask: Task[R, Tp]) { + if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { + fjtask.fork + } else { + forkJoinPool.execute(fjtask) + } + fjtask.join + } + + /** Executes a task on a fork/join pool and waits for it to finish. + * Returns its result when it does. + * + * $fjdispatch + * + * @return the result of the task + */ + def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = { + if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { + fjtask.fork + } else { + forkJoinPool.execute(fjtask) + } + fjtask.join + fjtask.result + } + + def parallelismLevel = forkJoinPool.getParallelism + +} + +object ForkJoinTasks { + val defaultForkJoinPool = new ForkJoinPool + defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) + defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) +} + + +/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them. + */ +trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks { + + trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp] + +} + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala new file mode 100644 index 0000000000..e29e9dfa98 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala @@ -0,0 +1,139 @@ +package scala.collection.parallel.immutable + + + + + + + +import scala.collection.parallel.ParallelMap +import scala.collection.parallel.ParallelMapLike +import scala.collection.parallel.Combiner +import scala.collection.parallel.EnvironmentPassingCombiner +import scala.collection.generic.ParallelMapFactory +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.GenericParallelMapTemplate +import scala.collection.generic.GenericParallelMapCompanion +import scala.collection.immutable.HashMap + + + + + + +class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) +extends ParallelMap[K, V] + with GenericParallelMapTemplate[K, V, ParallelHashTrie] + with ParallelMapLike[K, V, ParallelHashTrie[K, V], HashMap[K, V]] +{ +self => + + def this() = this(HashMap.empty[K, V]) + + override def mapCompanion: GenericParallelMapCompanion[ParallelHashTrie] = ParallelHashTrie + + override def empty: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V] + + def parallelIterator = new ParallelHashTrieIterator(trie) with SCPI + + def seq = trie + + def -(k: K) = new ParallelHashTrie(trie - k) + + def +[U >: V](kv: (K, U)) = new ParallelHashTrie(trie + kv) + + def get(k: K) = trie.get(k) + + override def size = trie.size + + protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match { + case Some(old) => old + case None => newc + } + + type SCPI = SignalContextPassingIterator[ParallelHashTrieIterator] + + class ParallelHashTrieIterator(val ht: HashMap[K, V]) + extends super.ParallelIterator { + self: SignalContextPassingIterator[ParallelHashTrieIterator] => + // println("created iterator " + ht) + var i = 0 + lazy val triter = ht.iterator + def split: Seq[ParallelIterator] = { + // println("splitting " + ht + " into " + ht.split.map(new ParallelHashTrieIterator(_) with SCPI).map(_.toList)) + ht.split.map(new ParallelHashTrieIterator(_) with SCPI) + } + def next: (K, V) = { + // println("taking next after " + i + ", in " + ht) + i += 1 + triter.next + } + def hasNext: Boolean = { + // println("hasNext: " + i + ", " + ht.size + ", " + ht) + i < ht.size + } + def remaining = ht.size - i + } + +} + + +object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] { + def empty[K, V]: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V] + + def newCombiner[K, V]: Combiner[(K, V), ParallelHashTrie[K, V]] = HashTrieCombiner[K, V] + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelHashTrie[K, V]] = { + new CanCombineFromMap[K, V] + } + + def fromTrie[K, V](trie: HashMap[K, V]): ParallelHashTrie[K, V] = new ParallelHashTrie(trie) + + var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0) +} + + +trait HashTrieCombiner[K, V] +extends Combiner[(K, V), ParallelHashTrie[K, V]] { +self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] => + private var trie: HashMap[K, V] = HashMap.empty[K, V] + + def size: Int = trie.size + + def clear = trie = HashMap.empty[K, V] + + def +=(elem: (K, V)) = { trie += elem; this } + + def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + // ParallelHashTrie.totalcombines.incrementAndGet + if (other.isInstanceOf[HashTrieCombiner[_, _]]) { + val that = other.asInstanceOf[HashTrieCombiner[K, V]] + val ncombiner = HashTrieCombiner[K, V] + ncombiner.trie = this.trie merge that.trie + ncombiner + } else error("Unexpected combiner type.") + } else this + + def result = new ParallelHashTrie[K, V](trie) + +} + + +object HashTrieCombiner { + def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {} +} + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/ParallelIterable.scala b/src/library/scala/collection/parallel/immutable/ParallelIterable.scala new file mode 100644 index 0000000000..92bf5ab706 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParallelIterable.scala @@ -0,0 +1,56 @@ +package scala.collection.parallel.immutable + + +import scala.collection.generic._ + +import scala.collection.parallel.ParallelIterableLike +import scala.collection.parallel.Combiner + + + + + +// TODO uncomment when we add parallel vectors + +///** A template trait for immutable parallel iterable collections. +// * +// * $paralleliterableinfo +// * +// * $sideeffects +// * +// * @tparam A the element type of the collection +// * +// * @author prokopec +// * @since 2.8 +// */ +//trait ParallelIterable[A] extends collection.immutable.Iterable[A] +// with collection.parallel.ParallelIterable[A] +// with GenericParallelTemplate[A, ParallelIterable] +// with ParallelIterableLike[A, ParallelIterable[A], Iterable[A]] { +// override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable +//} +// +///** $factoryinfo +// */ +//object ParallelIterable extends ParallelFactory[ParallelIterable] { +// implicit def canBuildFrom[A]: CanBuildFromParallel[Coll, A, ParallelIterable[A]] = +// new GenericCanBuildFromParallel[A] +// +// def newBuilder[A]: Combiner[A, ParallelIterable[A]] = null // TODO +// +// def newCombiner[A]: Combiner[A, ParallelIterable[A]] = null // TODO +//} + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/ParallelRange.scala b/src/library/scala/collection/parallel/immutable/ParallelRange.scala new file mode 100644 index 0000000000..85a33c7431 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParallelRange.scala @@ -0,0 +1,88 @@ +package scala.collection.parallel.immutable + + + +import scala.collection.immutable.Range +import scala.collection.immutable.RangeUtils +import scala.collection.parallel.ParallelSeq +import scala.collection.parallel.Combiner +import scala.collection.generic.CanCombineFrom + + + +class ParallelRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean) +extends ParallelSeq[Int] + with RangeUtils[ParallelRange] { + self => + + def seq = new Range(start, end, step) + + def length = _length + + def apply(idx: Int) = _apply(idx) + + def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParallelRange(_start, _end, _step, _inclusive) + + def parallelIterator = new ParallelRangeIterator with SCPI + + override def toString = seq.toString // TODO + + type SCPI = SignalContextPassingIterator[ParallelRangeIterator] + + class ParallelRangeIterator + (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive) + extends ParallelIterator with RangeUtils[ParallelRangeIterator] { + me: SignalContextPassingIterator[ParallelRangeIterator] => + def remaining = _length + def next = { val r = start; start += step; r } + def hasNext = remaining > 0 + def split: Seq[ParallelIterator] = psplit(remaining / 2, remaining - remaining / 2) + def psplit(sizes: Int*): Seq[ParallelIterator] = { + val incr = sizes.scanLeft(0)(_ + _) + for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until) + } + def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = { + new ParallelRangeIterator(_start, _end, _step, _inclusive) with SCPI + } + + override def toString = "ParallelRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")" + + /* accessors */ + + override def foreach[U](f: Int => U): Unit = { + _foreach(f) + start = end + step + } + + override def reduce[U >: Int](op: (U, U) => U): U = { + var sum = next + for (elem <- this) sum += elem + sum + } + + /* transformers */ + + override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = pbf(self.repr) + val sz = remaining + cb.sizeHint(sz) + if (sz > 0) { + val last = _last + while (start != last) { + f(start) + start += step + } + } + cb + } + + } + +} + + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala new file mode 100644 index 0000000000..ceb0dcc13d --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala @@ -0,0 +1,47 @@ +package scala.collection.parallel.immutable + + +import scala.collection.generic.GenericParallelTemplate +import scala.collection.generic.GenericCompanion +import scala.collection.generic.GenericParallelCompanion +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.ParallelFactory +import scala.collection.parallel.ParallelSeqLike +import scala.collection.parallel.Combiner + + + + + + +// TODO uncomment when we add parallel vectors + +///** An immutable variant of `ParallelSeq`. +// * +// * @define Coll mutable.ParallelSeq +// * @define coll mutable parallel sequence +// */ +//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A] +// with ParallelIterable[A] +// with collection.parallel.ParallelSeq[A] +// with GenericParallelTemplate[A, ParallelSeq] +// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] { +// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq +// +//} +// +// +///** $factoryInfo +// * @define Coll mutable.ParallelSeq +// * @define coll mutable parallel sequence +// */ +//object ParallelSeq extends ParallelFactory[ParallelSeq] { +// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T] +// +// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO +// +// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO +//} + + + diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala new file mode 100644 index 0000000000..054786afaf --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/package.scala @@ -0,0 +1,56 @@ +package scala.collection.parallel + + + + + + + + + + + +package object immutable { + + /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method. + * + * @tparam T type of the elements + * @param elem the element in the repetition + * @param length the length of the collection + */ + private[parallel] class Repetition[T](elem: T, val length: Int) extends ParallelSeq[T] { + self => + + def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException + def seq = throw new UnsupportedOperationException + def update(idx: Int, elem: T) = throw new UnsupportedOperationException + + type SCPI = SignalContextPassingIterator[ParallelIterator] + + class ParallelIterator(var i: Int = 0, val until: Int = length, elem: T = self.elem) extends super.ParallelIterator { + me: SignalContextPassingIterator[ParallelIterator] => + def remaining = until - i + def hasNext = i < until + def next = { i += 1; elem } + def psplit(sizes: Int*) = { + val incr = sizes.scanLeft(0)(_ + _) + for ((start, end) <- incr.init zip incr.tail) yield new ParallelIterator(i + start, (i + end) min until, elem) with SCPI + } + def split = psplit(remaining / 2, remaining - remaining / 2) + } + + def parallelIterator = new ParallelIterator with SCPI + + } + +} + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala new file mode 100644 index 0000000000..bd17d24ea8 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala @@ -0,0 +1,43 @@ +package scala.collection.parallel.mutable + + + + +import scala.collection.generic.Growable +import scala.collection.generic.Sizing +import scala.collection.mutable.ArrayBuffer +import scala.collection.parallel.Combiner + + + + +/** Implements combining contents of two combiners + * by postponing the operation until `result` method is called. It chains + * the leaf results together instead of evaluating the actual collection. + * + * @tparam Elem the type of the elements in the combiner + * @tparam To the type of the collection the combiner produces + * @tparam Buff the type of the buffers that contain leaf results and this combiner chains together + */ +trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To] +{ + self: collection.parallel.EnvironmentPassingCombiner[Elem, To] => + val chain: ArrayBuffer[Buff] + val lastbuff = chain.last + def +=(elem: Elem) = { lastbuff += elem; this } + def result: To = allocateAndCopy + def clear = { chain.clear } + def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + if (other.isInstanceOf[LazyCombiner[_, _, _]]) { + val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]] + newLazyCombiner(chain ++= that.chain) + } else throw new UnsupportedOperationException("Cannot combine with combiner of different type.") + } else this + def size = chain.foldLeft(0)(_ + _.size) + + /** Method that allocates the data structure and copies elements into it using + * `size` and `chain` members. + */ + def allocateAndCopy: To + def newLazyCombiner(buffchain: ArrayBuffer[Buff]): LazyCombiner[Elem, To, Buff] +} diff --git a/src/library/scala/collection/parallel/mutable/ParallelArray.scala b/src/library/scala/collection/parallel/mutable/ParallelArray.scala new file mode 100644 index 0000000000..3331c2dfd2 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParallelArray.scala @@ -0,0 +1,568 @@ +package scala.collection.parallel.mutable + + + +import scala.collection.generic.GenericParallelTemplate +import scala.collection.generic.GenericCompanion +import scala.collection.generic.GenericParallelCompanion +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.ParallelFactory +import scala.collection.generic.Sizing +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParallelSeqLike +import scala.collection.parallel.CHECK_RATE +import scala.collection.mutable.ArraySeq +import scala.collection.mutable.Builder +import scala.collection.Sequentializable + + + + +/** Parallel sequence holding elements in a linear array. + * + * `ParallelArray` is a parallel sequence with a predefined size. The size of the array + * cannot be changed after it's been created. + * + * `ParallelArray` internally keeps an array containing the elements. This means that + * bulk operations based on traversal are fast, but those returning a parallel array as a result + * are slightly slower. The reason for this is that `ParallelArray` uses lazy builders that + * create the internal data array only after the size of the array is known. The fragments + * are then copied into the resulting data array in parallel using fast array copy operations. + * Operations for which the resulting array size is known in advance are optimised to use this + * information. + * + * @tparam T type of the elements in the array + * + * @define Coll ParallelArray + * @define coll parallel array + */ +class ParallelArray[T] private[mutable] (val arrayseq: ArraySeq[T]) +extends ParallelSeq[T] + with GenericParallelTemplate[T, ParallelArray] + with ParallelSeqLike[T, ParallelArray[T], ArraySeq[T]] +{ + self => + + private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]] + + override def companion: GenericCompanion[ParallelArray] with GenericParallelCompanion[ParallelArray] = ParallelArray + + def this(sz: Int) = this { + require(sz >= 0) + new ArraySeq[T](sz) + } + + def apply(i: Int) = array(i).asInstanceOf[T] + + def update(i: Int, elem: T) = array(i) = elem + + def length = arrayseq.length + + def seq = arrayseq + + type SCPI = SignalContextPassingIterator[ParallelArrayIterator] + + def parallelIterator: ParallelArrayIterator = { + val pit = new ParallelArrayIterator with SCPI + pit + } + + class ParallelArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array) + extends super.ParallelIterator { + me: SignalContextPassingIterator[ParallelArrayIterator] => + + def hasNext = i < until + + def next = { + val elem = arr(i) + i += 1 + elem.asInstanceOf[T] + } + + def remaining = until - i + + def psplit(sizesIncomplete: Int*): Seq[ParallelIterator] = { + var traversed = i + val total = sizesIncomplete.reduceLeft(_ + _) + val left = remaining + val sizes = if (total >= left) sizesIncomplete else sizesIncomplete :+ (left - total) + for (sz <- sizes) yield if (traversed < until) { + val start = traversed + val end = (traversed + sz) min until + traversed = end + new ParallelArrayIterator(start, end, arr) with SCPI + } else { + new ParallelArrayIterator(traversed, traversed, arr) with SCPI + } + } + + override def split: Seq[ParallelIterator] = { + val left = remaining + if (left >= 2) { + val splitpoint = left / 2 + Seq(new ParallelArrayIterator(i, i + splitpoint, arr) with SCPI, + new ParallelArrayIterator(i + splitpoint, until, arr) with SCPI) + } else { + Seq(this) + } + } + + override def toString = "ParallelArrayIterator(" + i + ", " + until + ")" + + /* overrides for efficiency */ + + /* accessors */ + + override def foreach[U](f: T => U) = { + foreach_quick(f, arr, until, i) + i = until + } + + private def foreach_quick[U](f: T => U, a: Array[Any], ntil: Int, from: Int) = { + var j = from + while (j < ntil) { + f(a(j).asInstanceOf[T]) + j += 1 + } + } + + override def count(p: T => Boolean) = { + val c = count_quick(p, arr, until, i) + i = until + c + } + + private def count_quick(p: T => Boolean, a: Array[Any], ntil: Int, from: Int) = { + var cnt = 0 + var j = from + while (j < ntil) { + if (p(a(j).asInstanceOf[T])) cnt += 1 + j += 1 + } + cnt + } + + override def foldLeft[S](z: S)(op: (S, T) => S): S = { + val r = foldLeft_quick(arr, until, op, z) + i = until + r + } + + private def foldLeft_quick[S](a: Array[Any], ntil: Int, op: (S, T) => S, z: S): S = { + var j = i + var sum = z + while (j < ntil) { + sum = op(sum, a(j).asInstanceOf[T]) + j += 1 + } + sum + } + + def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = foldLeft[S](z)(seqop) + + override def sum[U >: T](implicit num: Numeric[U]): U = { + var s = sum_quick(num, arr, until, i, num.zero) + i = until + s + } + + private def sum_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, zero: U): U = { + var j = from + var sum = zero + while (j < ntil) { + sum = num.plus(sum, a(j).asInstanceOf[T]) + j += 1 + } + sum + } + + override def product[U >: T](implicit num: Numeric[U]): U = { + var p = product_quick(num, arr, until, i, num.one) + i = until + p + } + + private def product_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, one: U): U = { + var j = from + var prod = one + while (j < ntil) { + prod = num.times(prod, a(j).asInstanceOf[T]) + j += 1 + } + prod + } + + override def forall(p: T => Boolean): Boolean = { + if (isAborted) return false + + var all = true + while (i < until) { + val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE + + all = forall_quick(p, array, nextuntil, i) + if (all) i = nextuntil + else { + i = until + abort + } + + if (isAborted) return false + } + all + } + + // it's faster to use a separate small method + private def forall_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = { + var j = start + while (j < nextuntil) { + if (p(a(j).asInstanceOf[T])) j += 1 + else return false + } + return true + } + + override def exists(p: T => Boolean): Boolean = { + if (isAborted) return true + + var some = false + while (i < until) { + val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE + + some = exists_quick(p, array, nextuntil, i) + if (some) { + i = until + abort + } else i = nextuntil + + if (isAborted) return true + } + some + } + + // faster to use separate small method + private def exists_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = { + var j = start + while (j < nextuntil) { + if (p(a(j).asInstanceOf[T])) return true + else j += 1 + } + return false + } + + override def find(p: T => Boolean): Option[T] = { + if (isAborted) return None + + var r: Option[T] = None + while (i < until) { + val nextuntil = if ((i + CHECK_RATE) < until) (i + CHECK_RATE) else until + + r = find_quick(p, array, nextuntil, i) + + if (r != None) { + i = until + abort + } else i = nextuntil + + if (isAborted) return r + } + r + } + + private def find_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Option[T] = { + var j = start + while (j < nextuntil) { + val elem = a(j).asInstanceOf[T] + if (p(elem)) return Some(elem) + else j += 1 + } + return None + } + + override def drop(n: Int): ParallelArrayIterator = { + i += n + this + } + + override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) { + val totallen = (self.length - i) min len min (array.length - from) + Array.copy(arr, i, array, from, totallen) + i += totallen + } + + override def prefixLength(pred: T => Boolean): Int = { + val r = prefixLength_quick(pred, arr, until, i) + i += r + 1 + r + } + + private def prefixLength_quick(pred: T => Boolean, a: Array[Any], ntil: Int, startpos: Int): Int = { + var j = startpos + var endpos = ntil + while (j < endpos) { + if (pred(a(j).asInstanceOf[T])) j += 1 + else endpos = j + } + endpos - startpos + } + + override def indexWhere(pred: T => Boolean): Int = { + val r = indexWhere_quick(pred, arr, until, i) + val ret = if (r != -1) r - i else r + i = until + ret + } + + private def indexWhere_quick(pred: T => Boolean, a: Array[Any], ntil: Int, from: Int): Int = { + var j = from + var pos = -1 + while (j < ntil) { + if (pred(a(j).asInstanceOf[T])) { + pos = j + j = ntil + } else j += 1 + } + pos + } + + override def lastIndexWhere(pred: T => Boolean): Int = { + val r = lastIndexWhere_quick(pred, arr, i, until) + val ret = if (r != -1) r - i else r + i = until + ret + } + + private def lastIndexWhere_quick(pred: T => Boolean, a: Array[Any], from: Int, ntil: Int): Int = { + var pos = -1 + var j = ntil - 1 + while (j >= from) { + if (pred(a(j).asInstanceOf[T])) { + pos = j + j = -1 + } else j -= 1 + } + pos + } + + override def sameElements(that: Iterator[_]): Boolean = { + var same = true + while (i < until && that.hasNext) { + if (arr(i) != that.next) { + i = until + same = false + } + i += 1 + } + same + } + + /* transformers */ + + override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = cbf(self.repr) + cb.sizeHint(remaining) + map2combiner_quick(f, arr, cb, until, i) + i = until + cb + } + + private def map2combiner_quick[S, That](f: T => S, a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + cb += f(a(j).asInstanceOf[T]) + j += 1 + } + } + + override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = { + val cb = pbf(self.repr) + collect2combiner_quick(pf, arr, cb, until, i) + i = until + cb + } + + private def collect2combiner_quick[S, That](pf: PartialFunction[T, S], a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + val curr = a(j).asInstanceOf[T] + if (pf.isDefinedAt(curr)) cb += pf(curr) + j += 1 + } + } + + override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = { + val cb = pbf(self.repr) + while (i < until) { + val traversable = f(arr(i).asInstanceOf[T]) + if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator + else cb ++= traversable + i += 1 + } + cb + } + + override def filter2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = { + filter2combiner_quick(pred, cb, arr, until, i) + i = until + cb + } + + private def filter2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { + var j = i + while(j < ntil) { + var curr = a(j).asInstanceOf[T] + if (pred(curr)) cb += curr + j += 1 + } + } + + override def filterNot2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = { + filterNot2combiner_quick(pred, cb, arr, until, i) + i = until + cb + } + + private def filterNot2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { + var j = i + while(j < ntil) { + var curr = a(j).asInstanceOf[T] + if (!pred(curr)) cb += curr + j += 1 + } + } + + override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = { + cb.sizeHint(remaining) + cb.ifIs[ParallelArrayCombiner[T]] { pac => + val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] + Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i) + pac.lastbuff.setInternalSize(remaining) + } otherwise { + copy2builder_quick(cb, arr, until, i) + i = until + } + cb + } + + private def copy2builder_quick[U >: T, Coll](b: Builder[U, Coll], a: Array[Any], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + b += a(j).asInstanceOf[T] + j += 1 + } + } + + override def partition2combiners[U >: T, This >: ParallelArray[T]](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = { + partition2combiners_quick(pred, btrue, bfalse, arr, until, i) + i = until + (btrue, bfalse) + } + + private def partition2combiners_quick[U >: T, This >: ParallelArray[T]](p: T => Boolean, btrue: Builder[U, This], bfalse: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + val curr = a(j).asInstanceOf[T] + if (p(curr)) btrue += curr else bfalse += curr + j += 1 + } + } + + override def take2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = { + cb.sizeHint(n) + val ntil = i + n + val a = arr + while (i < ntil) { + cb += a(i).asInstanceOf[T] + i += 1 + } + cb + } + + override def drop2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = { + drop(n) + cb.sizeHint(remaining) + while (i < until) { + cb += arr(i).asInstanceOf[T] + i += 1 + } + cb + } + + override def reverse2combiner[U >: T, This >: ParallelArray[T]](cb: Combiner[U, This]): Combiner[U, This] = { + cb.ifIs[ParallelArrayCombiner[T]] { pac => + val sz = remaining + pac.sizeHint(sz) + val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] + reverse2combiner_quick(targetarr, arr, i, until) + pac.lastbuff.setInternalSize(sz) + pac + } otherwise super.reverse2combiner(cb) + cb + } + + private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) { + var j = from + var k = ntil - from - 1 + while (j < ntil) { + targ(k) = a(j) + j += 1 + k -= 1 + } + } + + } + +} + + + + + +object ParallelArray extends ParallelFactory[ParallelArray] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelArray[T]] = new GenericCanCombineFrom[T] + def newBuilder[T]: Combiner[T, ParallelArray[T]] = newCombiner + def newCombiner[T]: Combiner[T, ParallelArray[T]] = ParallelArrayCombiner[T] + + /** Creates a new parallel array by wrapping the specified array. + */ + def handoff[T](arr: Array[T]): ParallelArray[T] = wrapOrRebuild(arr, arr.length) + + /** Creates a new parallel array by wrapping a part of the specified array. + */ + def handoff[T](arr: Array[T], sz: Int): ParallelArray[T] = wrapOrRebuild(arr, sz) + + private def wrapOrRebuild[T](arr: AnyRef, sz: Int) = arr match { + case arr: Array[AnyRef] => new ParallelArray[T](new ExposedArraySeq[T](arr, sz)) + case _ => new ParallelArray[T](new ExposedArraySeq[T](runtime.ScalaRunTime.toObjectArray(arr), sz)) + } + + def createFromCopy[T <: AnyRef : ClassManifest](arr: Array[T]): ParallelArray[T] = { + val newarr = new Array[T](arr.length) + Array.copy(arr, 0, newarr, 0, arr.length) + handoff(newarr) + } + +} + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala new file mode 100644 index 0000000000..2991344be2 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala @@ -0,0 +1,105 @@ +package scala.collection.parallel.mutable + + + + + +import scala.collection.generic.Sizing +import scala.collection.mutable.ArraySeq +import scala.collection.mutable.ArrayBuffer +import scala.collection.parallel.TaskSupport +import scala.collection.parallel.EnvironmentPassingCombiner + + + + + + + +trait ParallelArrayCombiner[T] +extends LazyCombiner[T, ParallelArray[T], ExposedArrayBuffer[T]] + with TaskSupport { + self: EnvironmentPassingCombiner[T, ParallelArray[T]] => + + override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz) + + def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParallelArrayCombiner(c) + + def allocateAndCopy = if (chain.size > 1) { + val arrayseq = new ArraySeq[T](size) + val array = arrayseq.array.asInstanceOf[Array[Any]] + + executeAndWait(new CopyChainToArray(array, 0, size)) + + new ParallelArray(arrayseq) + } else { // optimisation if there is only 1 array + val pa = new ParallelArray(new ExposedArraySeq[T](chain(0).internalArray, size)) + pa + } + + override def toString = "ParallelArrayCombiner(" + size + "): " + chain + + /* tasks */ + + class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] { + var result = () + def leaf(prev: Option[Unit]) = if (howmany > 0) { + var totalleft = howmany + val (stbuff, stind) = findStart(offset) + var buffind = stbuff + var ind = stind + var arrayIndex = offset + while (totalleft > 0) { + val currbuff = chain(buffind) + val chunksize = if (totalleft < (currbuff.size - ind)) totalleft else currbuff.size - ind + val until = ind + chunksize + + copyChunk(currbuff.internalArray, ind, array, arrayIndex, until) + arrayIndex += chunksize + ind += chunksize + + totalleft -= chunksize + buffind += 1 + ind = 0 + } + } + private def copyChunk(buffarr: Array[AnyRef], buffStart: Int, ra: Array[Any], arrayStart: Int, until: Int) { + Array.copy(buffarr, buffStart, ra, arrayStart, until - buffStart) + } + private def findStart(pos: Int) = { + var left = pos + var buffind = 0 + while (left >= chain(buffind).size) { + left -= chain(buffind).size + buffind += 1 + } + (buffind, left) + } + def split = { + val fp = howmany / 2 + List(new CopyChainToArray(array, offset, fp), new CopyChainToArray(array, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel) + } + +} + + +object ParallelArrayCombiner { + def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParallelArrayCombiner[T] = { + new { val chain = c } with ParallelArrayCombiner[T] with EnvironmentPassingCombiner[T, ParallelArray[T]] + } + def apply[T]: ParallelArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) +} + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParallelIterable.scala b/src/library/scala/collection/parallel/mutable/ParallelIterable.scala new file mode 100644 index 0000000000..bd0a46bc43 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParallelIterable.scala @@ -0,0 +1,51 @@ +package scala.collection.parallel.mutable + + +import scala.collection.generic._ + +import scala.collection.parallel.ParallelIterableLike +import scala.collection.parallel.Combiner + + +/** A template trait for parallel iterable collections. + * + * $paralleliterableinfo + * + * $sideeffects + * + * @tparam T the element type of the collection + * + * @author prokopec + * @since 2.8 + */ +trait ParallelIterable[T] extends collection.mutable.Iterable[T] + with collection.parallel.ParallelIterable[T] + with GenericParallelTemplate[T, ParallelIterable] + with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] { + override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable +} + +/** $factoryinfo + */ +object ParallelIterable extends ParallelFactory[ParallelIterable] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] = + new GenericCanCombineFrom[T] + + def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] + + def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] +} + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParallelSeq.scala b/src/library/scala/collection/parallel/mutable/ParallelSeq.scala new file mode 100644 index 0000000000..636ba1ac3d --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParallelSeq.scala @@ -0,0 +1,61 @@ +package scala.collection.parallel.mutable + + +import scala.collection.generic.GenericParallelTemplate +import scala.collection.generic.GenericCompanion +import scala.collection.generic.GenericParallelCompanion +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.ParallelFactory +import scala.collection.parallel.ParallelSeqLike +import scala.collection.parallel.Combiner + + + + + + + +/** A mutable variant of `ParallelSeq`. + * + * @define Coll mutable.ParallelSeq + * @define coll mutable parallel sequence + */ +trait ParallelSeq[T] extends collection.mutable.Seq[T] + with ParallelIterable[T] + with collection.parallel.ParallelSeq[T] + with GenericParallelTemplate[T, ParallelSeq] + with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] { + self => + override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq + + def update(i: Int, elem: T): Unit + +} + + +/** $factoryInfo + * @define Coll mutable.ParallelSeq + * @define coll mutable parallel sequence + */ +object ParallelSeq extends ParallelFactory[ParallelSeq] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T] + + def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] + + def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] +} + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala new file mode 100644 index 0000000000..f670c7b7c5 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/package.scala @@ -0,0 +1,32 @@ +package scala.collection.parallel + + + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.ArraySeq +import scala.collection.generic.Sizing + + + +package object mutable { + + /* hack-arounds */ + + private[mutable] class ExposedArrayBuffer[T] extends ArrayBuffer[T] with Sizing { + def internalArray = array + def setInternalSize(s: Int) = size0 = s + override def sizeHint(len: Int) = { // delete once we start using 2.8.RC1+ + if (len > size && len >= 1) { + val newarray = new Array[AnyRef](len) + Array.copy(array, 0, newarray, 0, size0) + array = newarray + } + } + } + + private[mutable] class ExposedArraySeq[T](arr: Array[AnyRef], sz: Int) extends ArraySeq[T](sz) { + override val array = arr + override val length = sz + } + +}
\ No newline at end of file diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala new file mode 100644 index 0000000000..cddf098966 --- /dev/null +++ b/src/library/scala/collection/parallel/package.scala @@ -0,0 +1,70 @@ +package scala.collection + + +import java.lang.Thread._ + +import scala.collection.generic.CanBuildFrom +import scala.collection.generic.CanCombineFrom + + +/** Package object for parallel collections. + */ +package object parallel { + val MIN_FOR_COPY = -1 // TODO: set to 5000 + val CHECK_RATE = 512 + + /** Computes threshold from the size of the collection and the parallelism level. + */ + def thresholdFromSize(sz: Int, parallelismLevel: Int) = { + val p = parallelismLevel + if (p > 1) 1 + sz / (8 * p) + else sz + } + + /** An implicit conversion providing arrays with a `par` method, which + * returns a parallel array. + * + * @tparam T type of the elements in the array, which is a subtype of AnyRef + * @param array the array to be parallelized + * @return a `Parallelizable` object with a `par` method + */ + implicit def array2ParallelArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParallelArray[T]] { + def par = mutable.ParallelArray.handoff[T](array) + } + + implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { + def isParallel = bf.isInstanceOf[Parallel] + def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] + def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { + def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody + } + } + + implicit def traversable2ops[T](t: TraversableOnce[T]) = new { + def isParallel = t.isInstanceOf[Parallel] + def isParallelIterable = t.isInstanceOf[ParallelIterable[_]] + def asParallelIterable = t.asInstanceOf[ParallelIterable[T]] + def isParallelSeq = t.isInstanceOf[ParallelSeq[_]] + def asParallelSeq = t.asInstanceOf[ParallelSeq[T]] + def ifParallelSeq[R](isbody: ParallelSeq[T] => R) = new { + def otherwise(notbody: => R) = if (isParallel) isbody(asParallelSeq) else notbody + } + } + +} + + + + + + + + + + + + + + + + diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala index 16f791a710..3a070fb6ff 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala @@ -21,7 +21,7 @@ class Combine(val size: Int, val parallelism: Int, val runWhat: String) extends def runpar = throw new UnsupportedOperationException def runseq = runhashtrie def runhashtrie = { - hashtrie combine thattrie + hashtrie merge thattrie // println // println("both tries: " + HashTrie.bothtries) // println("one trie, one item: " + HashTrie.onetrie) @@ -29,7 +29,7 @@ class Combine(val size: Int, val parallelism: Int, val runWhat: String) extends // System exit 1 } def rundestructive = { - hashtrie combine thattrie + hashtrie merge thattrie } def runappendtrie = hashtrie ++ thattrie def runhashmap = hashmap ++ thatmap diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala index a944a7fb39..033c211849 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala @@ -37,7 +37,7 @@ class MultipleCombine(val size: Int, val parallelism: Int, val runWhat: String) def runhashtrie = { initHashTrie var trie = hashtrie - for (r <- 0 until combines) trie = trie combine thattries(r) + for (r <- 0 until combines) trie = trie merge thattries(r) } def runappendtrie = { initHashTrie @@ -52,7 +52,7 @@ class MultipleCombine(val size: Int, val parallelism: Int, val runWhat: String) def rundestructive = { initHashTrie var trie = hashtrie - for (r <- 0 until combines) trie = trie combine thattries(r) + for (r <- 0 until combines) trie = trie merge thattries(r) } def companion = MultipleCombine def comparisonMap = Map("hashtrie" -> runhashtrie _, "hashmap" -> runhashmap _, "appendtrie" -> runappendtrie _, "destruct" -> rundestructive _) |