author    Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-06-18 15:06:17 +0000
committer Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-06-18 15:06:17 +0000
commit    9923b97157725ae1f7853a4834ef5e31283a1b98 (patch)
tree      6252cf350a91d6bed178b07ed3ddc7fdd21d2890
parent    ceec792d1af5bb7b2d618f27f6fd48cdf75cf92f (diff)
download  scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.gz
          scala-9923b97157725ae1f7853a4834ef5e31283a1b98.tar.bz2
          scala-9923b97157725ae1f7853a4834ef5e31283a1b98.zip
Moved parallel collections to library dir, changed sabbus script. Added `par` to some of the classes. No review.
-rw-r--r--  build.xml  21
-rw-r--r--  src/library/scala/collection/Parallel.scala  17
-rw-r--r--  src/library/scala/collection/Parallelizable.scala  38
-rw-r--r--  src/library/scala/collection/Sequentializable.scala  15
-rw-r--r--  src/library/scala/collection/generic/CanBuildFromParallel.scala  28
-rw-r--r--  src/library/scala/collection/generic/GenericParallelCompanion.scala  29
-rw-r--r--  src/library/scala/collection/generic/GenericParallelTemplate.scala  66
-rw-r--r--  src/library/scala/collection/generic/HasNewCombiner.scala  26
-rw-r--r--  src/library/scala/collection/generic/ParallelFactory.scala  43
-rw-r--r--  src/library/scala/collection/generic/ParallelMapFactory.scala  42
-rw-r--r--  src/library/scala/collection/generic/Signalling.scala  192
-rw-r--r--  src/library/scala/collection/generic/Sizing.scala  9
-rw-r--r--  src/library/scala/collection/immutable/HashMap.scala  32
-rw-r--r--  src/library/scala/collection/immutable/package.scala  81
-rw-r--r--  src/library/scala/collection/mutable/ArrayBuffer.scala  6
-rw-r--r--  src/library/scala/collection/mutable/ArrayOps.scala  7
-rw-r--r--  src/library/scala/collection/parallel/Combiners.scala  66
-rw-r--r--  src/library/scala/collection/parallel/Iterators.scala  443
-rw-r--r--  src/library/scala/collection/parallel/ParallelIterable.scala  49
-rw-r--r--  src/library/scala/collection/parallel/ParallelIterableLike.scala  940
-rw-r--r--  src/library/scala/collection/parallel/ParallelIterableView.scala  33
-rw-r--r--  src/library/scala/collection/parallel/ParallelIterableViewLike.scala  59
-rw-r--r--  src/library/scala/collection/parallel/ParallelMap.scala  71
-rw-r--r--  src/library/scala/collection/parallel/ParallelMapLike.scala  43
-rw-r--r--  src/library/scala/collection/parallel/ParallelSeq.scala  64
-rw-r--r--  src/library/scala/collection/parallel/ParallelSeqLike.scala  473
-rw-r--r--  src/library/scala/collection/parallel/ParallelSeqView.scala  64
-rw-r--r--  src/library/scala/collection/parallel/ParallelSeqViewLike.scala  192
-rw-r--r--  src/library/scala/collection/parallel/Splitters.scala  86
-rw-r--r--  src/library/scala/collection/parallel/TaskSupport.scala  27
-rw-r--r--  src/library/scala/collection/parallel/Tasks.scala  230
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala  139
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParallelIterable.scala  56
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParallelRange.scala  88
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParallelSeq.scala  47
-rw-r--r--  src/library/scala/collection/parallel/immutable/package.scala  56
-rw-r--r--  src/library/scala/collection/parallel/mutable/LazyCombiner.scala  43
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParallelArray.scala  568
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala  105
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParallelIterable.scala  51
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParallelSeq.scala  61
-rw-r--r--  src/library/scala/collection/parallel/mutable/package.scala  32
-rw-r--r--  src/library/scala/collection/parallel/package.scala  70
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala  4
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala  4
45 files changed, 4788 insertions, 28 deletions
diff --git a/build.xml b/build.xml
index c0593415fc..228f6faf21 100644
--- a/build.xml
+++ b/build.xml
@@ -304,6 +304,7 @@ LOCAL REFERENCE BUILD (LOCKER)
<include name="**/*.scala"/>
<compilationpath>
<pathelement location="${build-locker.dir}/classes/library"/>
+ <pathelement location="${lib.dir}/forkjoin.jar"/>
</compilationpath>
</scalacfork>
<propertyfile file="${build-locker.dir}/classes/library/library.properties">
@@ -444,7 +445,7 @@ QUICK BUILD (QUICK)
<include name="library/**"/>
<include name="dbc/**"/>
<include name="actors/**"/>
- <include name="parallel-collections/**"/>
+ <!--<include name="parallel-collections/**"/>-->
<include name="continuations/**"/>
<include name="swing/**"/>
</srcfiles>
@@ -480,6 +481,7 @@ QUICK BUILD (QUICK)
<include name="**/*.scala"/>
<compilationpath>
<pathelement location="${build-quick.dir}/classes/library"/>
+ <pathelement location="${lib.dir}/forkjoin.jar"/>
</compilationpath>
</scalacfork>
<scalacfork
@@ -494,7 +496,7 @@ QUICK BUILD (QUICK)
<pathelement location="${lib.dir}/forkjoin.jar"/>
</compilationpath>
</scalacfork>
- <scalacfork
+ <!--<scalacfork
destdir="${build-quick.dir}/classes/library"
compilerpathref="locker.classpath"
params="${scalac.args.quick}"
@@ -505,7 +507,7 @@ QUICK BUILD (QUICK)
<pathelement location="${build-quick.dir}/classes/library"/>
<pathelement location="${lib.dir}/forkjoin.jar"/>
</compilationpath>
- </scalacfork>
+ </scalacfork>-->
<scalacfork
destdir="${build-quick.dir}/classes/library"
compilerpathref="locker.classpath"
@@ -1011,6 +1013,7 @@ BOOTSTRAPPING BUILD (STRAP)
<include name="**/*.scala"/>
<compilationpath>
<pathelement location="${build-strap.dir}/classes/library"/>
+ <pathelement location="${forkjoin.jar}"/>
</compilationpath>
</scalacfork>
<scalacfork
@@ -1025,7 +1028,7 @@ BOOTSTRAPPING BUILD (STRAP)
<pathelement location="${forkjoin.jar}"/>
</compilationpath>
</scalacfork>
- <scalacfork
+ <!--<scalacfork
destdir="${build-strap.dir}/classes/library"
compilerpathref="pack.classpath"
params="${scalac.args.all}"
@@ -1036,7 +1039,7 @@ BOOTSTRAPPING BUILD (STRAP)
<pathelement location="${build-strap.dir}/classes/library"/>
<pathelement location="${forkjoin.jar}"/>
</compilationpath>
- </scalacfork>
+ </scalacfork>-->
<scalacfork
destdir="${build-strap.dir}/classes/library"
compilerpathref="pack.classpath"
@@ -1393,7 +1396,7 @@ DOCUMENTATION
<include name="library/**"/>
<include name="dbc/**"/>
<include name="actors/**"/>
- <include name="parallel-collections/**"/>
+ <!--<include name="parallel-collections/**"/>-->
<include name="swing/**"/>
</srcfiles>
</uptodate>
@@ -1411,7 +1414,7 @@ DOCUMENTATION
classpathref="pack.classpath">
<src>
<files includes="${src.dir}/actors"/>
- <files includes="${src.dir}/parallel-collections"/>
+ <!--<files includes="${src.dir}/parallel-collections"/>-->
<files includes="${src.dir}/library/scala"/>
<files includes="${src.dir}/swing"/>
<files includes="${src.dir}/continuations/library"/>
@@ -1650,7 +1653,7 @@ DISTRIBUTION
<jar destfile="${dist.dir}/src/scala-library-src.jar">
<fileset dir="${src.dir}/library"/>
<fileset dir="${src.dir}/actors"/>
- <fileset dir="${src.dir}/parallel-collections"/>
+ <!--<fileset dir="${src.dir}/parallel-collections"/>-->
<fileset dir="${src.dir}/continuations/library"/>
</jar>
<jar destfile="${dist.dir}/src/scala-dbc-src.jar">
@@ -1739,7 +1742,7 @@ STABLE REFERENCE (STARR)
<jar destfile="${basedir}/lib/scala-library-src.jar">
<fileset dir="${basedir}/src/library"/>
<fileset dir="${basedir}/src/actors"/>
- <fileset dir="${basedir}/src/parallel-collections"/>
+ <!--<fileset dir="${basedir}/src/parallel-collections"/>-->
<fileset dir="${basedir}/src/swing"/>
<fileset dir="${basedir}/src/dbc"/>
</jar>
diff --git a/src/library/scala/collection/Parallel.scala b/src/library/scala/collection/Parallel.scala
new file mode 100644
index 0000000000..e500817745
--- /dev/null
+++ b/src/library/scala/collection/Parallel.scala
@@ -0,0 +1,17 @@
+package scala.collection
+
+
+
+
+
+
+/** A marker trait for objects with parallelised operations.
+ *
+ * @since 2.8
+ * @author prokopec
+ */
+trait Parallel
+
+
+
+
diff --git a/src/library/scala/collection/Parallelizable.scala b/src/library/scala/collection/Parallelizable.scala
new file mode 100644
index 0000000000..405c005c55
--- /dev/null
+++ b/src/library/scala/collection/Parallelizable.scala
@@ -0,0 +1,38 @@
+package scala.collection
+
+
+
+import parallel.ParallelIterableLike
+
+
+
+/** This trait describes collections which can be turned into parallel collections
+ * by invoking the method `par`. Parallelizable collections may be parametrized with
+ * a target type different from their own.
+ */
+trait Parallelizable[+ParRepr <: Parallel] {
+
+ /** Returns a parallel implementation of a collection.
+ */
+ def par: ParRepr
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
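For illustration (not part of the patch): a minimal sketch of how `Parallelizable` is meant to be used once a collection implements `par`. `ArrayBuffer` gains such an implementation later in this patch, and the parallel `reduce` shown is the one defined in `ParallelIterableLike`.
{{{
import scala.collection.mutable.ArrayBuffer

val buf  = ArrayBuffer(1, 2, 3, 4)
val pbuf = buf.par            // a ParallelArray[Int] view of the buffer
val sum  = pbuf.reduce(_ + _) // may now be computed in parallel
}}}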
diff --git a/src/library/scala/collection/Sequentializable.scala b/src/library/scala/collection/Sequentializable.scala
new file mode 100644
index 0000000000..61fb24571a
--- /dev/null
+++ b/src/library/scala/collection/Sequentializable.scala
@@ -0,0 +1,15 @@
+package scala.collection
+
+
+
+
+trait Sequentializable[+T, +Repr] {
+
+ /** A view of this parallel collection, but with all
+ * of the operations implemented sequentially (i.e. in a single-threaded manner).
+ *
+ * @return a sequential view of the collection.
+ */
+ def seq: Repr
+
+}
\ No newline at end of file
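Again purely illustrative: `seq` is the inverse view of `par`, so code that needs a deterministic, single-threaded traversal can opt back out of parallelism. A sketch, assuming a collection that mixes in both traits:
{{{
import scala.collection.mutable.ArrayBuffer

val pxs = ArrayBuffer(1, 2, 3).par // parallel view of the buffer
pxs.seq foreach println            // sequential, single-threaded traversal
}}}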
diff --git a/src/library/scala/collection/generic/CanBuildFromParallel.scala b/src/library/scala/collection/generic/CanBuildFromParallel.scala
new file mode 100644
index 0000000000..fcbcd6295e
--- /dev/null
+++ b/src/library/scala/collection/generic/CanBuildFromParallel.scala
@@ -0,0 +1,28 @@
+package scala.collection
+package generic
+
+
+
+import scala.collection.parallel._
+
+
+
+
+/**
+ * A base trait for parallel builder factories.
+ *
+ * @tparam From the type of the underlying collection that requests a builder to be created
+ * @tparam Elem the element type of the collection to be created
+ * @tparam To the type of the collection to be created
+ */
+trait CanCombineFrom[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel {
+ def apply(from: From): Combiner[Elem, To]
+ def apply(): Combiner[Elem, To]
+}
+
+
+
+
+
+
+
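An illustrative sketch: since `CanCombineFrom` extends `CanBuildFrom`, ordinary builder-factory resolution still applies, but a parallel operation can test for the refined type to obtain a `Combiner` rather than a plain `Builder`. The helper below is hypothetical, not part of the patch:
{{{
import scala.collection.generic.CanBuildFrom
import scala.collection.parallel.Combiner

def builderFor[From, Elem, To](from: From)(implicit bf: CanBuildFrom[From, Elem, To]) =
  bf match {
    case cf: CanCombineFrom[From, Elem, To] => cf(from) // a parallel Combiner (erased check, fine for a sketch)
    case _                                  => bf(from) // a sequential Builder
  }
}}}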
diff --git a/src/library/scala/collection/generic/GenericParallelCompanion.scala b/src/library/scala/collection/generic/GenericParallelCompanion.scala
new file mode 100644
index 0000000000..e5ba36f846
--- /dev/null
+++ b/src/library/scala/collection/generic/GenericParallelCompanion.scala
@@ -0,0 +1,29 @@
+package scala.collection.generic
+
+
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParallelIterable
+import scala.collection.parallel.ParallelMap
+
+
+
+/** A template trait for companion objects of parallel collection classes.
+ * They should be mixed in together with the `GenericCompanion` type.
+ * @tparam CC the type constructor representing the collection class
+ * @since 2.8
+ */
+trait GenericParallelCompanion[+CC[X] <: ParallelIterable[X]] {
+ /** The default builder for $Coll objects.
+ */
+ def newBuilder[A]: Combiner[A, CC[A]]
+
+ /** The parallel builder for $Coll objects.
+ */
+ def newCombiner[A]: Combiner[A, CC[A]]
+}
+
+trait GenericParallelMapCompanion[+CC[P, Q] <: ParallelMap[P, Q]] {
+ def newCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]]
+}
+
+
diff --git a/src/library/scala/collection/generic/GenericParallelTemplate.scala b/src/library/scala/collection/generic/GenericParallelTemplate.scala
new file mode 100644
index 0000000000..e98c13fa36
--- /dev/null
+++ b/src/library/scala/collection/generic/GenericParallelTemplate.scala
@@ -0,0 +1,66 @@
+package scala.collection.generic
+
+
+
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParallelIterable
+import scala.collection.parallel.ParallelMap
+import scala.collection.parallel.TaskSupport
+
+
+import annotation.unchecked.uncheckedVariance
+
+
+
+
+
+
+/** A template trait for collections having a companion.
+ *
+ * @tparam A the element type of the collection
+ * @tparam CC the type constructor representing the collection class
+ * @since 2.8
+ * @author prokopec
+ */
+trait GenericParallelTemplate[+A, +CC[X] <: ParallelIterable[X]]
+extends GenericTraversableTemplate[A, CC]
+ with HasNewCombiner[A, CC[A] @uncheckedVariance]
+ with TaskSupport
+{
+ def companion: GenericCompanion[CC] with GenericParallelCompanion[CC]
+
+ protected[this] override def newBuilder: collection.mutable.Builder[A, CC[A]] = newCombiner
+
+ protected[this] override def newCombiner: Combiner[A, CC[A]] = {
+ val cb = companion.newCombiner[A]
+ cb.environment = environment
+ cb
+ }
+
+ override def genericBuilder[B]: Combiner[B, CC[B]] = genericCombiner[B]
+
+ def genericCombiner[B]: Combiner[B, CC[B]] = {
+ val cb = companion.newCombiner[B]
+ cb.environment = environment
+ cb
+ }
+
+}
+
+
+trait GenericParallelMapTemplate[K, +V, +CC[X, Y] <: ParallelMap[X, Y]]
+extends TaskSupport
+{
+ def mapCompanion: GenericParallelMapCompanion[CC]
+
+ def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = {
+ val cb = mapCompanion.newCombiner[P, Q]
+ cb.environment = environment
+ cb
+ }
+}
+
+
+
+
+
diff --git a/src/library/scala/collection/generic/HasNewCombiner.scala b/src/library/scala/collection/generic/HasNewCombiner.scala
new file mode 100644
index 0000000000..2c24b437d8
--- /dev/null
+++ b/src/library/scala/collection/generic/HasNewCombiner.scala
@@ -0,0 +1,26 @@
+package scala.collection.generic
+
+
+
+import scala.collection.parallel.Combiner
+
+
+
+trait HasNewCombiner[+T, +Repr] {
+ protected[this] def newCombiner: Combiner[T, Repr]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/generic/ParallelFactory.scala b/src/library/scala/collection/generic/ParallelFactory.scala
new file mode 100644
index 0000000000..0b9e92aa10
--- /dev/null
+++ b/src/library/scala/collection/generic/ParallelFactory.scala
@@ -0,0 +1,43 @@
+package scala.collection.generic
+
+
+import scala.collection.parallel.ParallelIterable
+import scala.collection.parallel.Combiner
+
+
+
+/** A template class for companion objects of `ParallelIterable` and subclasses thereof.
+ * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects.
+ *
+ * @define coll parallel collection
+ * @define Coll ParallelIterable
+ */
+abstract class ParallelFactory[CC[X] <: ParallelIterable[X] with GenericParallelTemplate[X, CC]]
+extends TraversableFactory[CC]
+ with GenericParallelCompanion[CC] {
+
+ type EPC[T, C] = collection.parallel.EnvironmentPassingCombiner[T, C]
+
+ /**
+ * A generic implementation of the `CanCombineFrom` trait, which forwards all calls to
+ * `apply(from)` to the `genericCombiner` method of the $coll `from`, and calls to `apply()`
+ * to this factory.
+ */
+ class GenericCanCombineFrom[A] extends GenericCanBuildFrom[A] with CanCombineFrom[CC[_], A, CC[A]] {
+ override def apply(from: Coll) = from.genericCombiner
+ override def apply() = newBuilder[A]
+ }
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/generic/ParallelMapFactory.scala b/src/library/scala/collection/generic/ParallelMapFactory.scala
new file mode 100644
index 0000000000..8f779b4029
--- /dev/null
+++ b/src/library/scala/collection/generic/ParallelMapFactory.scala
@@ -0,0 +1,42 @@
+package scala.collection.generic
+
+
+
+import scala.collection.parallel.ParallelMap
+import scala.collection.parallel.ParallelMapLike
+import scala.collection.parallel.Combiner
+import scala.collection.mutable.Builder
+
+
+
+
+/** A template class for companion objects of `ParallelMap` and subclasses thereof.
+ * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects.
+ *
+ * @define coll parallel map
+ * @define Coll ParallelMap
+ */
+abstract class ParallelMapFactory[CC[X, Y] <: ParallelMap[X, Y] with ParallelMapLike[X, Y, CC[X, Y], _]]
+extends MapFactory[CC]
+ with GenericParallelMapCompanion[CC] {
+
+ type MapColl = CC[_, _]
+
+ /** The default builder for $Coll objects.
+ * @tparam K the type of the keys
+ * @tparam V the type of the associated values
+ */
+ override def newBuilder[K, V]: Builder[(K, V), CC[K, V]] = newCombiner[K, V]
+
+ /** The default combiner for $Coll objects.
+ * @tparam K the type of the keys
+ * @tparam V the type of the associated values
+ */
+ def newCombiner[K, V]: Combiner[(K, V), CC[K, V]]
+
+ class CanCombineFromMap[K, V] extends CanCombineFrom[CC[_, _], (K, V), CC[K, V]] {
+ def apply(from: MapColl) = from.genericMapCombiner[K, V].asInstanceOf[Combiner[(K, V), CC[K, V]]]
+ def apply() = newCombiner[K, V]
+ }
+
+}
diff --git a/src/library/scala/collection/generic/Signalling.scala b/src/library/scala/collection/generic/Signalling.scala
new file mode 100644
index 0000000000..1dac4297b7
--- /dev/null
+++ b/src/library/scala/collection/generic/Signalling.scala
@@ -0,0 +1,192 @@
+package scala.collection.generic
+
+
+import java.util.concurrent.atomic.AtomicInteger
+
+
+
+
+
+/**
+ * A signalling interface serves as a unique access point through which
+ * a part of a collection can receive messages from a different task.
+ *
+ * One example of its use is the `find` method, which can use the
+ * signalling interface to inform worker threads that an element has
+ * been found and no further search is necessary.
+ *
+ * @author prokopec
+ *
+ * @define abortflag
+ * If the abort flag is set, a worker may abort and produce an arbitrary result,
+ * since its result will not affect the final result of the computation. Examples
+ * of operations using this are the `find`, `forall` and `exists` methods.
+ *
+ * @define indexflag
+ * The index flag holds an integer which carries some operation-specific meaning. For
+ * instance, the `takeWhile` operation sets the index flag to the position of the element
+ * where the predicate fails. Other workers may check this index against the indices
+ * they are working on and return if this index is smaller than their index. Examples
+ * of operations using this are `takeWhile`, `dropWhile`, `span` and `indexOf`.
+ */
+trait Signalling {
+ /**
+ * Checks whether an abort signal has been issued.
+ *
+ * $abortflag
+ * @return the state of the abort
+ */
+ def isAborted: Boolean
+
+ /**
+ * Sends an abort signal to other workers.
+ *
+ * $abortflag
+ */
+ def abort: Unit
+
+ /**
+ * Returns the value of the index flag.
+ *
+ * $indexflag
+ * @return the value of the index flag
+ */
+ def indexFlag: Int
+
+ /**
+ * Sets the value of the index flag.
+ *
+ * $indexflag
+ * @param f the value to which the index flag is set.
+ */
+ def setIndexFlag(f: Int)
+
+ /**
+ * Sets the value of the index flag if the argument is greater than the current value.
+ * This is done atomically.
+ *
+ * $indexflag
+ * @param f the value to which the index flag is set
+ */
+ def setIndexFlagIfGreater(f: Int)
+
+ /**
+ * Sets the value of the index flag if the argument is less than the current value.
+ * This is done atomically.
+ *
+ * $indexflag
+ * @param f the value to which the index flag is set
+ */
+ def setIndexFlagIfLesser(f: Int)
+
+ /**
+ * A read-only tag specific to the signalling object. It is used to give
+ * specific workers information on the part of the collection being operated on.
+ */
+ def tag: Int
+}
+
+
+/**
+ * This signalling implementation returns default values and ignores received signals.
+ */
+class DefaultSignalling extends Signalling {
+ def isAborted = false
+ def abort {}
+
+ def indexFlag = -1
+ def setIndexFlag(f: Int) {}
+ def setIndexFlagIfGreater(f: Int) {}
+ def setIndexFlagIfLesser(f: Int) {}
+
+ def tag = -1
+}
+
+
+/**
+ * An object that returns default values and ignores received signals.
+ */
+object IdleSignalling extends DefaultSignalling
+
+
+/**
+ * A mixin trait that implements abort flag behaviour using volatile variables.
+ */
+trait VolatileAbort extends Signalling {
+ @volatile private var abortflag = false
+ abstract override def isAborted = abortflag
+ abstract override def abort = abortflag = true
+}
+
+
+/**
+ * A mixin trait that implements index flag behaviour using atomic integers.
+ * The `setIndexFlag` operation is wait-free, while the conditional set operations `setIndexFlagIfGreater`
+ * and `setIndexFlagIfLesser` are lock-free and support only monotonic changes.
+ */
+trait AtomicIndexFlag extends Signalling {
+ private val intflag: AtomicInteger = new AtomicInteger(-1)
+ abstract override def indexFlag = intflag.get
+ abstract override def setIndexFlag(f: Int) = intflag.set(f)
+ abstract override def setIndexFlagIfGreater(f: Int) = {
+ var loop = true
+ do {
+ val old = intflag.get
+ if (f <= old) loop = false
+ else if (intflag.compareAndSet(old, f)) loop = false
+ } while (loop);
+ }
+ abstract override def setIndexFlagIfLesser(f: Int) = {
+ var loop = true
+ do {
+ val old = intflag.get
+ if (f >= old) loop = false
+ else if (intflag.compareAndSet(old, f)) loop = false
+ } while (loop);
+ }
+}
+
+
+/**
+ * An implementation of the signalling interface using delegates.
+ */
+trait DelegatedSignalling extends Signalling {
+ /**
+ * A delegate that method calls are redirected to.
+ */
+ var signalDelegate: Signalling
+
+ def isAborted = signalDelegate.isAborted
+ def abort = signalDelegate.abort
+
+ def indexFlag = signalDelegate.indexFlag
+ def setIndexFlag(f: Int) = signalDelegate.setIndexFlag(f)
+ def setIndexFlagIfGreater(f: Int) = signalDelegate.setIndexFlagIfGreater(f)
+ def setIndexFlagIfLesser(f: Int) = signalDelegate.setIndexFlagIfLesser(f)
+
+ def tag = signalDelegate.tag
+}
+
+
+/**
+ * Class implementing delegated signalling.
+ */
+class DelegatedContext(var signalDelegate: Signalling) extends DelegatedSignalling
+
+
+/**
+ * Class implementing delegated signalling, but having its own distinct `tag`.
+ */
+class TaggedDelegatedContext(deleg: Signalling, override val tag: Int) extends DelegatedContext(deleg)
+
+
+
+
+
+
+
+
+
+
+
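For illustration, a hypothetical `find`-style worker cooperating through the abort flag; in the real implementation the flag is shared between split iterators via `DelegatedSignalling`:
{{{
import scala.collection.generic.Signalling

def findIn[T](it: Iterator[T], p: T => Boolean, signal: Signalling): Option[T] = {
  while (it.hasNext && !signal.isAborted) { // poll the shared abort flag
    val elem = it.next
    if (p(elem)) {
      signal.abort          // tell the other workers to stop searching
      return Some(elem)
    }
  }
  None
}
}}}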
diff --git a/src/library/scala/collection/generic/Sizing.scala b/src/library/scala/collection/generic/Sizing.scala
new file mode 100644
index 0000000000..bf801302ae
--- /dev/null
+++ b/src/library/scala/collection/generic/Sizing.scala
@@ -0,0 +1,9 @@
+package scala.collection.generic
+
+
+
+/** A trait for objects which have a size.
+ */
+trait Sizing {
+ def size: Int
+}
\ No newline at end of file
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 22760b88c2..8b4bc070ab 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -14,6 +14,10 @@ package immutable
import generic._
import annotation.unchecked.uncheckedVariance
+
+import parallel.immutable.ParallelHashTrie
+
+
/** This class implements immutable maps using a hash trie.
*
* '''Note:''' the builder of a hash map returns specialized representations EmptyMap,Map1,..., Map4
@@ -32,7 +36,7 @@ import annotation.unchecked.uncheckedVariance
* @define willNotTerminateInf
*/
@serializable @SerialVersionUID(2L)
-class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] {
+class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParallelHashTrie[A, B]] {
override def size: Int = 0
@@ -80,9 +84,11 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] {
def split: Seq[HashMap[A, B]] = Seq(this)
- def combine[B1 >: B](that: HashMap[A, B1]): HashMap[A, B1] = combine0(that, 0)
+ def merge[B1 >: B](that: HashMap[A, B1]): HashMap[A, B1] = merge0(that, 0)
+
+ protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that
- protected def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that
+ def par = ParallelHashTrie.fromTrie(this)
}
@@ -170,7 +176,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
override def iterator: Iterator[(A,B)] = Iterator(ensurePair)
override def foreach[U](f: ((A, B)) => U): Unit = f(ensurePair)
private[HashMap] def ensurePair: (A,B) = if (kv ne null) kv else { kv = (key, value); kv }
- protected override def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = {
+ protected override def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = {
// if (that.isInstanceOf[HashMap1[_, _]]) bothsingle += 1
// else onetrie += 1
that.updated0(key, hash, level, value, kv)
@@ -209,7 +215,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
def newhm(lm: ListMap[A, B @uncheckedVariance]) = new HashMapCollision1(hash, lm)
List(newhm(x), newhm(y))
}
- protected override def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = {
+ protected override def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = {
// this can be made more efficient by passing the entire ListMap at once
var m = that
for (p <- kvs) m = m.updated0(p._1, this.hash, level, p._2, p)
@@ -453,7 +459,7 @@ time { mNew.iterator.foreach( p => ()) }
} else elems(0).split
}
- protected override def combine0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that match {
+ protected override def merge0[B1 >: B](that: HashMap[A, B1], level: Int): HashMap[A, B1] = that match {
case hm: HashMap1[_, _] =>
// onetrie += 1
this.updated0(hm.key, hm.hash, level, hm.value.asInstanceOf[B1], hm.kv)
@@ -469,7 +475,7 @@ time { mNew.iterator.foreach( p => ()) }
val subcount = Integer.bitCount(thisbm | thatbm)
// construct a new array of appropriate size
- val combined = new Array[HashMap[A, B1]](subcount)
+ val merged = new Array[HashMap[A, B1]](subcount)
// run through both bitmaps and add elements to it
var i = 0
@@ -486,9 +492,9 @@ time { mNew.iterator.foreach( p => ()) }
// }
if (thislsb == thatlsb) {
// println("a collision")
- val m = thiselems(thisi).combine0(thatelems(thati), level + 5)
+ val m = thiselems(thisi).merge0(thatelems(thati), level + 5)
totalelems += m.size
- combined(i) = m
+ merged(i) = m
thisbm = thisbm & ~thislsb
thatbm = thatbm & ~thatlsb
thati += 1
@@ -507,14 +513,14 @@ time { mNew.iterator.foreach( p => ()) }
// println("an element from this trie")
val m = thiselems(thisi)
totalelems += m.size
- combined(i) = m
+ merged(i) = m
thisbm = thisbm & ~thislsb
thisi += 1
} else {
// println("an element from that trie")
val m = thatelems(thati)
totalelems += m.size
- combined(i) = m
+ merged(i) = m
thatbm = thatbm & ~thatlsb
thati += 1
}
@@ -522,8 +528,8 @@ time { mNew.iterator.foreach( p => ()) }
i += 1
}
- new HashTrieMap[A, B1](this.bitmap | that.bitmap, combined, totalelems)
- case hm: HashMapCollision1[_, _] => that.combine0(this, level)
+ new HashTrieMap[A, B1](this.bitmap | that.bitmap, merged, totalelems)
+ case hm: HashMapCollision1[_, _] => that.merge0(this, level)
case _ => error("section supposed to be unreachable.")
}
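A rough usage sketch of the renamed operation (illustrative values): `merge` fuses two hash tries structurally, reusing subtries where possible, which is expected to be cheaper for large maps than inserting the elements of one map into the other one by one.
{{{
import scala.collection.immutable.HashMap

val m1 = HashMap(1 -> "a", 2 -> "b")
val m2 = HashMap(2 -> "c", 3 -> "d")
val m3 = m1 merge m2 // contains the keys 1, 2 and 3
}}}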
diff --git a/src/library/scala/collection/immutable/package.scala b/src/library/scala/collection/immutable/package.scala
new file mode 100644
index 0000000000..5ff9fa223d
--- /dev/null
+++ b/src/library/scala/collection/immutable/package.scala
@@ -0,0 +1,81 @@
+package scala.collection
+
+
+
+
+
+
+
+
+
+package object immutable {
+
+ trait RangeUtils[+Repr <: RangeUtils[Repr]] {
+
+ def start: Int
+ def end: Int
+ def step: Int
+ def inclusive: Boolean
+ def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean): Repr
+
+ private final def inclusiveLast: Int = {
+ val size = end.toLong - start.toLong
+ (size / step.toLong * step.toLong + start.toLong).toInt
+ }
+
+ final def _last: Int = if (!inclusive) {
+ if (step == 1 || step == -1) end - step
+ else {
+ val inclast = inclusiveLast
+ if ((end.toLong - start.toLong) % step == 0) inclast - step else inclast
+ }
+ } else {
+ if (step == 1 || step == -1) end
+ else inclusiveLast
+ }
+
+ final def _foreach[U](f: Int => U) = if (_length > 0) {
+ var i = start
+ val last = _last
+ while (i != last) {
+ f(i)
+ i += step
+ }
+ f(last) // `_last` is itself an element of the range, so apply `f` to it as well
+ }
+
+ final def _length: Int = if (!inclusive) {
+ if (end > start == step > 0 && start != end) {
+ (_last.toLong - start.toLong) / step.toLong + 1
+ } else 0
+ }.toInt else {
+ if (end > start == step > 0 || start == end) {
+ (_last.toLong - start.toLong) / step.toLong + 1
+ } else 0
+ }.toInt
+
+ final def _apply(idx: Int): Int = {
+ if (idx < 0 || idx >= _length) throw new IndexOutOfBoundsException(idx.toString)
+ start + idx * step
+ }
+
+ private def locationAfterN(n: Int) = if (n > 0) {
+ if (step > 0) ((start.toLong + step.toLong * n.toLong) min _last.toLong).toInt
+ else ((start.toLong + step.toLong * n.toLong) max _last.toLong).toInt
+ } else start
+
+ final def _take(n: Int) = if (n > 0 && _length > 0) {
+ create(start, locationAfterN(n), step, true)
+ } else create(start, start, step, false)
+
+ final def _drop(n: Int) = create(locationAfterN(n), end, step, inclusive)
+
+ final def _slice(from: Int, until: Int) = _drop(from)._take(until - from)
+
+ }
+
+}
+
+
+
+
+
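A worked example of the `_length` arithmetic above (illustrative, checked against the standard `Range`): for `start = 0`, `end = 10`, `step = 3`, non-inclusive, `inclusiveLast` is `10 / 3 * 3 + 0 = 9`; since `(10 - 0) % 3 != 0`, `_last = 9`, and `_length = (9 - 0) / 3 + 1 = 4`.
{{{
assert((0 until 10 by 3).length == 4) // elements 0, 3, 6, 9
}}}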
diff --git a/src/library/scala/collection/mutable/ArrayBuffer.scala b/src/library/scala/collection/mutable/ArrayBuffer.scala
index 6412a21531..a59a0db2e1 100644
--- a/src/library/scala/collection/mutable/ArrayBuffer.scala
+++ b/src/library/scala/collection/mutable/ArrayBuffer.scala
@@ -12,6 +12,7 @@ package scala.collection
package mutable
import generic._
+import parallel.mutable.ParallelArray
/** An implementation of the `Buffer` class using an array to
* represent the assembled sequence internally. Append, update and random
@@ -46,7 +47,8 @@ class ArrayBuffer[A](override protected val initialSize: Int)
with BufferLike[A, ArrayBuffer[A]]
with IndexedSeqOptimized[A, ArrayBuffer[A]]
with Builder[A, ArrayBuffer[A]]
- with ResizableArray[A] {
+ with ResizableArray[A]
+ with Parallelizable[ParallelArray[A]] {
override def companion: GenericCompanion[ArrayBuffer] = ArrayBuffer
@@ -64,6 +66,8 @@ class ArrayBuffer[A](override protected val initialSize: Int)
}
}
+ def par = ParallelArray.handoff[A](array.asInstanceOf[Array[A]], size)
+
/** Appends a single element to this buffer and returns
* the identity of the buffer. It takes constant amortized time.
*
diff --git a/src/library/scala/collection/mutable/ArrayOps.scala b/src/library/scala/collection/mutable/ArrayOps.scala
index 00e8697b53..3cf6a642d2 100644
--- a/src/library/scala/collection/mutable/ArrayOps.scala
+++ b/src/library/scala/collection/mutable/ArrayOps.scala
@@ -14,6 +14,9 @@ import compat.Platform.arraycopy
import scala.reflect.ClassManifest
+import parallel.mutable.ParallelArray
+
+
/** This class serves as a wrapper for `Array`s with all the operations found in
* indexed sequences. Where needed, instances of arrays are implicitly converted
* into this class.
@@ -32,7 +35,7 @@ import scala.reflect.ClassManifest
* @define mayNotTerminateInf
* @define willNotTerminateInf
*/
-abstract class ArrayOps[T] extends ArrayLike[T, Array[T]] {
+abstract class ArrayOps[T] extends ArrayLike[T, Array[T]] with Parallelizable[ParallelArray[T]] {
private def rowBuilder[U]: Builder[U, Array[U]] =
Array.newBuilder(
@@ -52,6 +55,8 @@ abstract class ArrayOps[T] extends ArrayLike[T, Array[T]] {
else
super.toArray[U]
+ def par = ParallelArray.handoff(repr)
+
/** Flattens a two-dimensional array by concatenating all its rows
* into a single array.
*
diff --git a/src/library/scala/collection/parallel/Combiners.scala b/src/library/scala/collection/parallel/Combiners.scala
new file mode 100644
index 0000000000..a37f642d42
--- /dev/null
+++ b/src/library/scala/collection/parallel/Combiners.scala
@@ -0,0 +1,66 @@
+package scala.collection.parallel
+
+
+import scala.collection.Parallel
+import scala.collection.mutable.Builder
+import scala.collection.generic.Sizing
+
+
+
+/** The base trait for all combiners.
+ * A combiner lets one construct collections incrementally just like
+ * a regular builder, but also implements an efficient merge operation of two builders
+ * via `combine` method. Once the collection is constructed, it may be obtained by invoking
+ * the `result` method.
+ *
+ * @tparam Elem the type of the elements added to the builder
+ * @tparam To the type of the collection the builder produces
+ *
+ * @author prokopec
+ */
+trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport {
+ self: EnvironmentPassingCombiner[Elem, To] =>
+
+ type EPC = EnvironmentPassingCombiner[Elem, To]
+
+ /** Combines the contents of the receiver builder and the `other` builder,
+ * producing a new builder containing both their elements.
+ *
+ * This method may combine the two builders by copying them into a larger collection,
+ * by producing a lazy view that gets evaluated once `result` is invoked, or use
+ * a merge operation specific to the data structure in question.
+ *
+ * Note that both the receiver builder and `other` builder become invalidated
+ * after the invocation of this method, and should be cleared (see `clear`)
+ * if they are to be used again.
+ *
+ * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is,
+ * they are the same object in memory, always does nothing and returns the first combiner.
+ *
+ * @tparam N the type of elements contained by the `other` builder
+ * @tparam NewTo the type of collection produced by the `other` builder
+ * @param other the other builder
+ * @return the parallel builder containing both the elements of this and the `other` builder
+ */
+ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo]
+
+}
+
+
+trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] {
+ abstract override def result = {
+ val res = super.result
+// res.environment = environment
+ res
+ }
+}
+
+
+
+
+
+
+
+
+
+
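A hedged sketch of the intended divide-and-conquer use of `combine`: each worker fills its own combiner, and the partial results are merged pairwise. `ParallelArrayCombiner` is the array-backed combiner added later in this patch.
{{{
import scala.collection.parallel.mutable.ParallelArrayCombiner

val left  = ParallelArrayCombiner[Int]
val right = ParallelArrayCombiner[Int]
left += 1; left += 2    // filled by one worker
right += 3; right += 4  // filled by another worker
// combining invalidates both inputs; only the returned combiner is used
val all = left combine right
val arr = all.result    // a parallel array containing 1, 2, 3, 4
}}}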
diff --git a/src/library/scala/collection/parallel/Iterators.scala b/src/library/scala/collection/parallel/Iterators.scala
new file mode 100644
index 0000000000..bfebff994c
--- /dev/null
+++ b/src/library/scala/collection/parallel/Iterators.scala
@@ -0,0 +1,443 @@
+package scala.collection.parallel
+
+
+
+import scala.collection.Parallel
+import scala.collection.generic.Signalling
+import scala.collection.generic.DelegatedSignalling
+import scala.collection.generic.CanCombineFrom
+import scala.collection.mutable.Builder
+import scala.collection.Iterator.empty
+
+
+
+
+
+
+trait RemainsIterator[+T] extends Iterator[T] {
+ /** The number of elements this iterator has yet to iterate.
+ * This method doesn't change the state of the iterator.
+ */
+ def remaining: Int
+}
+
+
+/** Augments iterators with additional methods, mostly transformers,
+ * assuming they iterate an iterable collection.
+ *
+ * @tparam T type of the elements iterated.
+ * @tparam Repr type of the collection the iterator traverses.
+ */
+trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T] {
+
+ def repr: Repr
+
+ /* accessors */
+
+ override def count(p: T => Boolean): Int = {
+ var i = 0
+ while (hasNext) if (p(next)) i += 1
+ i
+ }
+
+ def reduce[U >: T](op: (U, U) => U): U = {
+ var r: U = next
+ while (hasNext) r = op(r, next)
+ r
+ }
+
+ def fold[U >: T](z: U)(op: (U, U) => U): U = {
+ var r = z
+ while (hasNext) r = op(r, next)
+ r
+ }
+
+ override def sum[U >: T](implicit num: Numeric[U]): U = {
+ var r: U = num.zero
+ while (hasNext) r = num.plus(r, next)
+ r
+ }
+
+ override def product[U >: T](implicit num: Numeric[U]): U = {
+ var r: U = num.one
+ while (hasNext) r = num.times(r, next)
+ r
+ }
+
+ override def min[U >: T](implicit ord: Ordering[U]): T = {
+ var r = next
+ while (hasNext) {
+ val curr = next
+ if (ord.lteq(curr, r)) r = curr
+ }
+ r
+ }
+
+ override def max[U >: T](implicit ord: Ordering[U]): T = {
+ var r = next
+ while (hasNext) {
+ val curr = next
+ if (ord.gteq(curr, r)) r = curr
+ }
+ r
+ }
+
+ override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) {
+ var i = from
+ val until = from + len
+ while (i < until && hasNext) {
+ array(i) = next
+ i += 1
+ }
+ }
+
+ /* transformers to combiners */
+
+ def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = pbf(repr)
+ cb.sizeHint(remaining)
+ while (hasNext) cb += f(next)
+ cb
+ }
+
+ def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
+ val cb = pbf(repr)
+ while (hasNext) {
+ val curr = next
+ if (pf.isDefinedAt(curr)) cb += pf(curr)
+ }
+ cb
+ }
+
+ def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
+ val cb = pbf(repr)
+ while (hasNext) {
+ val traversable = f(next)
+ if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator
+ else cb ++= traversable
+ }
+ cb
+ }
+
+ def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = {
+ b.sizeHint(remaining)
+ while (hasNext) b += next
+ b
+ }
+
+ def filter2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = {
+ while (hasNext) {
+ val curr = next
+ if (pred(curr)) cb += curr
+ }
+ cb
+ }
+
+ def filterNot2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = {
+ while (hasNext) {
+ val curr = next
+ if (!pred(curr)) cb += curr
+ }
+ cb
+ }
+
+ def partition2combiners[U >: T, This >: Repr](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = {
+ while (hasNext) {
+ val curr = next
+ if (pred(curr)) btrue += curr
+ else bfalse += curr
+ }
+ (btrue, bfalse)
+ }
+
+ def take2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
+ cb.sizeHint(n)
+ var left = n
+ while (left > 0) {
+ cb += next
+ left -= 1
+ }
+ cb
+ }
+
+ def drop2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
+ drop(n)
+ cb.sizeHint(remaining)
+ while (hasNext) cb += next
+ cb
+ }
+
+ def slice2combiner[U >: T, This >: Repr](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = {
+ drop(from)
+ var left = until - from
+ cb.sizeHint(left)
+ while (left > 0) {
+ cb += next
+ left -= 1
+ }
+ cb
+ }
+
+ def splitAt2combiners[U >: T, This >: Repr](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = {
+ before.sizeHint(at)
+ after.sizeHint(remaining - at)
+ var left = at
+ while (left > 0) {
+ before += next
+ left -= 1
+ }
+ while (hasNext) after += next
+ (before, after)
+ }
+
+ def takeWhile2combiner[U >: T, This >: Repr](p: T => Boolean, cb: Combiner[U, This]) = {
+ var loop = true
+ while (hasNext && loop) {
+ val curr = next
+ if (p(curr)) cb += curr
+ else loop = false
+ }
+ (cb, loop)
+ }
+
+ def span2combiners[U >: T, This >: Repr](p: T => Boolean, before: Combiner[U, This], after: Combiner[U, This]) = {
+ var isBefore = true
+ while (hasNext && isBefore) {
+ val curr = next
+ if (p(curr)) before += curr
+ else {
+ after.sizeHint(remaining + 1)
+ after += curr
+ isBefore = false
+ }
+ }
+ while (hasNext) after += next
+ (before, after)
+ }
+}
+
+
+trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableIterator[T, Repr] {
+
+ /** The exact number of elements this iterator has yet to iterate.
+ * This method doesn't change the state of the iterator.
+ */
+ def remaining: Int
+
+ /* accessors */
+
+ def prefixLength(pred: T => Boolean): Int = {
+ var total = 0
+ var loop = true
+ while (hasNext && loop) {
+ if (pred(next)) total += 1
+ else loop = false
+ }
+ total
+ }
+
+ override def indexWhere(pred: T => Boolean): Int = {
+ var i = 0
+ var loop = true
+ while (hasNext && loop) {
+ if (pred(next)) loop = false
+ else i += 1
+ }
+ if (loop) -1 else i
+ }
+
+ def lastIndexWhere(pred: T => Boolean): Int = {
+ var pos = -1
+ var i = 0
+ while (hasNext) {
+ if (pred(next)) pos = i
+ i += 1
+ }
+ pos
+ }
+
+ def corresponds[S](corr: (T, S) => Boolean)(that: Iterator[S]): Boolean = {
+ while (hasNext && that.hasNext) {
+ if (!corr(next, that.next)) return false
+ }
+ hasNext == that.hasNext
+ }
+
+ /* transformers */
+
+ def reverse2combiner[U >: T, This >: Repr](cb: Combiner[U, This]): Combiner[U, This] = {
+ cb.sizeHint(remaining)
+ var lst = List[T]()
+ while (hasNext) lst ::= next
+ while (lst != Nil) {
+ cb += lst.head
+ lst = lst.tail
+ }
+ cb
+ }
+
+ def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
+ val cb = cbf(repr)
+ cb.sizeHint(remaining)
+ var lst = List[S]()
+ while (hasNext) lst ::= f(next)
+ while (lst != Nil) {
+ cb += lst.head
+ lst = lst.tail
+ }
+ cb
+ }
+
+ def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = {
+ val cb = cbf(repr)
+ cb.sizeHint(remaining)
+ var j = 0
+ while (hasNext) {
+ if (j == index) {
+ cb += elem
+ next
+ } else cb += next
+ j += 1
+ }
+ cb
+ }
+
+}
+
+
+
+trait ParallelIterableIterator[+T, +Repr <: Parallel]
+extends AugmentedIterableIterator[T, Repr]
+ with Splitter[T]
+ with Signalling
+ with DelegatedSignalling
+{
+ def split: Seq[ParallelIterableIterator[T, Repr]]
+
+ /** The number of elements this iterator has yet to traverse. This method
+ * doesn't change the state of the iterator.
+ *
+ * This method is used to provide size hints to builders and combiners, and
+ * to approximate positions of iterators within a data structure.
+ *
+ * '''Note''': This method may be implemented to return an upper bound on the number of elements
+ * in the iterator, instead of the exact number of elements to iterate.
+ *
+ * In that case, two considerations must be taken into account:
+ *
+ * 1) classes that inherit `ParallelIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`.
+ *
+ * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum
+ * of `remaining` values of split iterators must be less than or equal to this upper bound.
+ */
+ def remaining: Int
+}
+
+
+trait ParallelSeqIterator[+T, +Repr <: Parallel]
+extends ParallelIterableIterator[T, Repr]
+ with AugmentedSeqIterator[T, Repr]
+ with PreciseSplitter[T]
+{
+ def split: Seq[ParallelSeqIterator[T, Repr]]
+ def psplit(sizes: Int*): Seq[ParallelSeqIterator[T, Repr]]
+
+ /** The number of elements this iterator has yet to traverse. This method
+ * doesn't change the state of the iterator. Unlike the version of this method in the supertrait,
+ * method `remaining` in `ParallelSeqLike.this.ParallelIterator` must return an exact number
+ * of elements remaining in the iterator.
+ *
+ * @return an exact number of elements this iterator has yet to iterate
+ */
+ def remaining: Int
+}
+
+
+trait DelegatedIterator[+T, +Delegate <: Iterator[T]] extends RemainsIterator[T] {
+ val delegate: Delegate
+ def next = delegate.next
+ def hasNext = delegate.hasNext
+}
+
+
+trait Counting[+T] extends RemainsIterator[T] {
+ val initialSize: Int
+ def remaining = initialSize - traversed
+ var traversed = 0
+ abstract override def next = {
+ val n = super.next
+ traversed += 1
+ n
+ }
+}
+
+
+/** A mixin for iterators that traverse only filtered elements of a delegate.
+ */
+trait FilteredIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] {
+ protected[this] val pred: T => Boolean
+
+ private[this] var hd: T = _
+ private var hdDefined = false
+
+ override def hasNext: Boolean = hdDefined || {
+ do {
+ if (!delegate.hasNext) return false
+ hd = delegate.next
+ } while (!pred(hd))
+ hdDefined = true
+ true
+ }
+
+ override def next = if (hasNext) { hdDefined = false; hd } else empty.next
+}
+
+
+/** A mixin for iterators that traverse elements of the delegate iterator, and of another collection.
+ */
+trait AppendedIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] {
+ // `rest` should never alias `delegate`
+ protected[this] val rest: Iterator[T]
+
+ private[this] var current: Iterator[T] = delegate
+
+ override def hasNext = (current.hasNext) || (current == delegate && rest.hasNext)
+
+ override def next = {
+ if (!current.hasNext) current = rest
+ current.next
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
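For illustration, a minimal hypothetical `RemainsIterator` over an array slice, showing the contract that `remaining` reports the number of elements left without advancing the iterator:
{{{
import scala.collection.parallel.RemainsIterator

class ArraySliceIterator[T](a: Array[T], from: Int, until: Int)
extends RemainsIterator[T] {
  private var i = from
  def hasNext = i < until
  def next = { val r = a(i); i += 1; r }
  def remaining = until - i // exact, and does not change the iterator state
}
}}}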
diff --git a/src/library/scala/collection/parallel/ParallelIterable.scala b/src/library/scala/collection/parallel/ParallelIterable.scala
new file mode 100644
index 0000000000..4882dc19ee
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelIterable.scala
@@ -0,0 +1,49 @@
+package scala.collection.parallel
+
+
+import scala.collection.generic._
+import scala.collection.parallel.mutable.ParallelArrayCombiner
+import scala.collection.parallel.mutable.ParallelArray
+
+
+/** A template trait for parallel iterable collections.
+ *
+ * $paralleliterableinfo
+ *
+ * $sideeffects
+ *
+ * @tparam T the element type of the collection
+ *
+ * @author prokopec
+ * @since 2.8
+ */
+trait ParallelIterable[+T] extends Iterable[T]
+ with GenericParallelTemplate[T, ParallelIterable]
+ with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] {
+ override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable
+}
+
+/** $factoryinfo
+ */
+object ParallelIterable extends ParallelFactory[ParallelIterable] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] =
+ new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
+
+ def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
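A small usage sketch of the companion object above: both the default builder and the combiner are array-backed, so element-by-element construction produces a parallel array under the hood.
{{{
import scala.collection.parallel.ParallelIterable

val cb = ParallelIterable.newCombiner[Int]
cb += 1; cb += 2; cb += 3
val pi: ParallelIterable[Int] = cb.result
}}}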
diff --git a/src/library/scala/collection/parallel/ParallelIterableLike.scala b/src/library/scala/collection/parallel/ParallelIterableLike.scala
new file mode 100644
index 0000000000..7ac2713b55
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelIterableLike.scala
@@ -0,0 +1,940 @@
+package scala.collection.parallel
+
+
+
+
+import scala.collection.mutable.Builder
+import scala.collection.mutable.ListBuffer
+import scala.collection.IterableLike
+import scala.collection.Parallel
+import scala.collection.Parallelizable
+import scala.collection.Sequentializable
+import scala.collection.generic._
+
+
+
+
+// TODO update docs!!
+/** A template trait for parallel collections of type `ParallelIterable[T]`.
+ *
+ * $paralleliterableinfo
+ *
+ * $sideeffects
+ *
+ * @tparam T the element type of the collection
+ * @tparam Repr the type of the actual collection containing the elements
+ *
+ * @define paralleliterableinfo
+ * This is a base trait for Scala parallel collections. It defines behaviour
+ * common to all parallel collections. The actual parallel operation implementation
+ * is found in the `ParallelIterableFJImpl` trait extending this trait. Concrete
+ * parallel collections should inherit both this and that trait.
+ *
+ * Parallel operations are implemented with divide and conquer style algorithms that
+ * parallelize well. The basic idea is to split the collection into smaller parts until
+ * they are small enough to be operated on sequentially.
+ *
+ * All of the parallel operations are implemented in terms of several methods. The first is:
+ * {{{
+ * def split: Seq[Repr]
+ * }}}
+ * which splits the collection into a sequence of disjoint views. This is typically a
+ * very fast operation which simply creates wrappers around the receiver collection.
+ * These views can then be split recursively into smaller views and so on. Each of
+ * the views is still a parallel collection.
+ *
+ * The next method is:
+ * {{{
+ * def combine[OtherRepr >: Repr](other: OtherRepr): OtherRepr
+ * }}}
+ * which combines this collection with the argument collection and returns a collection
+ * containing both the elements of this collection and the argument collection. This behaviour
+ * may be implemented by producing a view that iterates over both collections, by aggressively
+ * copying all the elements into the new collection or by lazily creating a wrapper over both
+ * collections that gets evaluated once it's needed. It is recommended to avoid copying all of
+ * the elements for performance reasons, although that cost might be negligible depending on
+ * the use case.
+ *
+ * Methods:
+ * {{{
+ * def seq: Repr
+ * }}}
+ * and
+ * {{{
+ * def par: Repr
+ * }}}
+ * produce a view of the collection that has sequential or parallel operations, respectively.
+ *
+ * The method:
+ * {{{
+ * def threshold(sz: Int, p: Int): Int
+ * }}}
+ * provides an estimate on the minimum number of elements the collection has before
+ * the splitting stops and depends on the number of elements in the collection. A rule of
+ * thumb is the number of elements divided by 8 times the parallelism level. This method may
+ * be overridden in concrete implementations if necessary.
+ *
+ * Finally, method `newParallelBuilder` produces a new parallel builder.
+ *
+ * Since this trait extends the `Iterable` trait, methods like `size` and `iterator` must also
+ * be implemented.
+ *
+ * Each parallel collection is bound to a specific fork/join pool, on which dormant worker
+ * threads are kept. The fork/join pool of a collection can be changed at any time, except
+ * while one of its methods is being invoked. The fork/join pool contains other information such as the parallelism
+ * level, that is, the number of processors used. When a collection is created, it is assigned the
+ * default fork/join pool found in the `scala.collection.parallel` package object.
+ *
+ * Parallel collections may or may not be strict, and they are not ordered in terms of the `foreach`
+ * operation (see `Traversable`). In terms of the iterator of the collection, some collections
+ * are ordered (for instance, parallel sequences).
+ *
+ * @author prokopec
+ * @since 2.8
+ *
+ * @define sideeffects
+ * The higher-order functions passed to certain operations may contain side-effects. Since implementations
+ * of operations may not be sequential, this means that side-effects may not be predictable and may
+ * produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer
+ * to either avoid using side-effects or to use some form of synchronization when accessing mutable data.
+ *
+ * @define undefinedorder
+ * The order in which the operations on elements are performed is unspecified and may be nondeterministic.
+ *
+ * @define pbfinfo
+ * An implicit value of class `CanCombineFrom` which determines the
+ * result class `That` from the current representation type `Repr` and
+ * the new element type `B`. This builder factory can provide a parallel
+ * builder for the resulting collection.
+ *
+ * @define abortsignalling
+ * This method will provide sequential views it produces with `abort` signalling capabilities. This means
+ * that sequential views may send and read `abort` signals.
+ *
+ * @define indexsignalling
+ * This method will provide sequential views it produces with `indexFlag` signalling capabilities. This means
+ * that sequential views may set and read `indexFlag` state.
+ */
+trait ParallelIterableLike[+T, +Repr <: Parallel, +SequentialView <: Iterable[T]]
+extends IterableLike[T, Repr]
+ with Parallelizable[Repr]
+ with Sequentializable[T, SequentialView]
+ with Parallel
+ with HasNewCombiner[T, Repr]
+ with TaskSupport {
+ self =>
+
+ /** Parallel iterators are split iterators that have additional accessor and
+ * transformer methods defined in terms of methods `next` and `hasNext`.
+ * When creating a new parallel collection, one might want to override these
+ * new methods to make them more efficient.
+ *
+ * Parallel iterators are augmented with signalling capabilities. This means
+ * that a signalling object can be assigned to them as needed.
+ *
+ * The self-type ensures that signal context passing behaviour gets mixed into
+ * a concrete object instance.
+ */
+ trait ParallelIterator extends ParallelIterableIterator[T, Repr] {
+ me: SignalContextPassingIterator[ParallelIterator] =>
+ var signalDelegate: Signalling = IdleSignalling
+ def repr = self.repr
+ def split: Seq[ParallelIterator]
+ }
+
+ /** A stackable modification that ensures signal contexts get passed along the iterators.
+ * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into
+ * concrete iterators.
+ */
+ trait SignalContextPassingIterator[+IterRepr <: ParallelIterator] extends ParallelIterator {
+ // Note: This functionality must be factored out to this inner trait to avoid boilerplate.
+ // Also, one could omit the cast below. However, this leads to return type inconsistencies,
+ // due to inability to override the return type of _abstract overrides_.
+ // Be aware that this stackable modification has to be subclassed, so it shouldn't be rigid
+ // on the type of iterators it splits.
+ // The alternative is some boilerplate - better to trade off some type safety to avoid it here.
+ abstract override def split: Seq[IterRepr] = {
+ val pits = super.split
+ pits foreach { _.signalDelegate = signalDelegate }
+ pits.asInstanceOf[Seq[IterRepr]]
+ }
+ }
+
+ /** Convenience for signal context passing iterator.
+ */
+ type SCPI <: SignalContextPassingIterator[ParallelIterator]
+
+ /** Creates a new parallel iterator used to traverse the elements of this parallel collection.
+ * This iterator is more specific than the one returned by `iterator`, and augmented
+ * with additional accessor and transformer methods.
+ *
+ * @return a parallel iterator
+ */
+ protected def parallelIterator: ParallelIterator
+
+ /** Creates a new split iterator used to traverse the elements of this collection.
+ *
+ * By default, this method is implemented in terms of the protected `parallelIterator` method.
+ *
+ * @return a split iterator
+ */
+ def iterator: Splitter[T] = parallelIterator
+
+ def par = repr
+
+ /** The size threshold at or below which parts of this collection are handled
+ * sequentially by individual processors.
+ *
+ * This method depends on the size of the collection and the parallelism level, which
+ * are both specified as arguments.
+ *
+ * @param sz the size based on which to compute the threshold
+ * @param p the parallelism level based on which to compute the threshold
+ * @return the maximum number of elements for performing operations sequentially
+ */
+ def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p)
+
+ /** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool.
+ * This method forwards the call to `newCombiner`.
+ */
+ protected[this] override def newBuilder: collection.mutable.Builder[T, Repr] = newCombiner
+
+ /** Optionally reuses an existing combiner for better performance. By default it does not - subclasses may override this behaviour.
+ * The provided combiner `oldc` that can potentially be reused will be either some combiner from the previous computational task, or `None` if there
+ * was no previous phase (in which case this method must return `newc`).
+ *
+ * @param oldc The combiner that is the result of the previous task, or `None` if there was no previous task.
+ * @param newc The new, empty combiner that can be used.
+ * @return Either `newc` or `oldc`.
+ */
+ protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc
+
+ /* convenience task operations wrapper */
+ protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new {
+ def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) {
+ def map(r: R): R1 = mapping(r)
+ }
+
+ def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] {
+ val ft = tsk
+ val st = t2
+ def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
+ }
+
+ def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] {
+ val ft = tsk
+ val st = t2
+ def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
+ }
+ }
+
+ protected def wrap[R](body: => R) = new NonDivisible[R] {
+ def leaf(prevr: Option[R]) = result = body
+ var result: R = null.asInstanceOf[R]
+ }
+
+ /* convenience iterator operations wrapper */
+ protected implicit def iterator2ops[PI <: ParallelIterator](it: PI) = new {
+ def assign(cntx: Signalling): PI = {
+ it.signalDelegate = cntx
+ it
+ }
+ }
+
+ protected implicit def builder2ops[Elem, To](cb: Builder[Elem, To]) = new {
+ def ifIs[Cmb](isbody: Cmb => Unit) = new {
+ def otherwise(notbody: => Unit)(implicit m: ClassManifest[Cmb]) {
+ if (cb.getClass == m.erasure) isbody(cb.asInstanceOf[Cmb]) else notbody
+ }
+ }
+ }
+
+ override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
+
+ /** Reduces the elements of this sequence using the specified associative binary operator.
+ *
+ * $undefinedorder
+ *
+ * Note this method has a different signature than the `reduceLeft`
+ * and `reduceRight` methods of the trait `Traversable`.
+ * The result of reducing may only be a supertype of this parallel collection's
+ * type parameter `T`.
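+ *
+ * For example, assuming `pc` is an illustrative parallel collection of integers,
+ * their sum can be computed with:
+ *
+ * {{{
+ * pc.reduce(_ + _)
+ * }}}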
+ *
+ * @tparam U A type parameter for the binary operator, a supertype of `T`.
+ * @param op A binary operator that must be associative.
+ * @return The result of applying reduce operator `op` between all the elements if the collection is nonempty.
+ * @throws UnsupportedOperationException
+ * if this $coll is empty.
+ */
+ def reduce[U >: T](op: (U, U) => U): U = {
+ executeAndWaitResult(new Reduce(op, parallelIterator))
+ }
+
+ /** Optionally reduces the elements of this $coll using the specified associative binary operator.
+ *
+ * $undefinedorder
+ *
+ * Note this method has a different signature than the `reduceLeftOption`
+ * and `reduceRightOption` methods of the trait `Traversable`.
+ * The result of reducing may only be a supertype of this parallel collection's
+ * type parameter `T`.
+ *
+ * @tparam U A type parameter for the binary operator, a supertype of `T`.
+ * @param op A binary operator that must be associative.
+ * @return An option value containing result of applying reduce operator `op` between all
+ * the elements if the collection is nonempty, and `None` otherwise.
+ */
+ def reduceOption[U >: T](op: (U, U) => U): Option[U] = if (isEmpty) None else Some(reduce(op))
+
+ /** Folds the elements of this $coll using the specified associative binary operator.
+ * The order in which the elements are reduced is unspecified and may be nondeterministic.
+ *
+ * Note this method has a different signature than the `foldLeft`
+ * and `foldRight` methods of the trait `Traversable`.
+ * The result of folding may only be a supertype of this parallel collection's
+ * type parameter `T`.
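+ *
+ * For example, assuming `pc` is an illustrative parallel collection of integers,
+ * summation with a neutral element can be written as:
+ *
+ * {{{
+ * pc.fold(0)(_ + _)
+ * }}}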
+ *
+ * @tparam U a type parameter for the binary operator, a supertype of `T`.
+ * @param z a neutral element for the fold operation, it may be added to the result
+ * an arbitrary number of times, not changing the result (e.g. `Nil` for list concatenation,
+ * 0 for addition, or 1 for multiplication)
+ * @param op a binary operator that must be associative
+ * @return the result of applying fold operator `op` between all the elements and `z`
+ */
+ def fold[U >: T](z: U)(op: (U, U) => U): U = {
+ executeAndWaitResult(new Fold(z, op, parallelIterator))
+ }
+
+ /** Aggregates the results of applying an operator to subsequent elements.
+ *
+ * This is a more general form of `fold` and `reduce`. It has similar semantics, but does
+ * not require the result to be a supertype of the element type. It traverses the elements in
+ * different partitions sequentially, using `seqop` to update the result, and then
+ * applies `combop` to results from different partitions. The implementation of this
+ * operation may operate on an arbitrary number of collection partitions, so `combop`
+ * may be invoked an arbitrary number of times.
+ *
+ * For example, one might want to process some elements and then produce a `Set`. In this
+ * case, `seqop` would process an element and add it to the set, while `combop`
+ * would take the union of two sets from different partitions. The initial value
+ * `z` would be an empty set.
+ *
+ * {{{
+ * pc.aggregate(Set[Int]())(_ + process(_), _ ++ _)
+ * }}}
+ *
+ * Another example is calculating the geometric mean of a collection of doubles
+ * (computed naively through a running product, this typically requires arbitrary-precision numbers to avoid overflow).
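+ *
+ * As a sketch, assuming `pc` is a parallel collection of positive doubles, the
+ * geometric mean can instead be obtained by aggregating logarithms:
+ *
+ * {{{
+ * math.exp(pc.aggregate(0.0)((s, x) => s + math.log(x), _ + _) / pc.size)
+ * }}}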
+ *
+ * @tparam S the type of accumulated results
+ * @param z the initial value for the accumulated result of the partition - this
+ * will typically be the neutral element for the `seqop` operator (e.g.
+ * `Nil` for list concatenation or `0` for summation)
+ * @param seqop an operator used to accumulate results within a partition
+ * @param combop an associative operator used to combine results from different partitions
+ */
+ def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
+ executeAndWaitResult(new Aggregate(z, seqop, combop, parallelIterator))
+ }
+
+ /** Applies a function `f` to all the elements of the receiver.
+ *
+ * $undefinedorder
+ *
+ * @tparam U the result type of the function applied to each element, which is always discarded
+ * @param f the function applied to each element
+ */
+ override def foreach[U](f: T => U): Unit = {
+ executeAndWait(new Foreach(f, parallelIterator))
+ }
+
+ override def count(p: T => Boolean): Int = {
+ executeAndWaitResult(new Count(p, parallelIterator))
+ }
+
+ override def sum[U >: T](implicit num: Numeric[U]): U = {
+ executeAndWaitResult(new Sum[U](num, parallelIterator))
+ }
+
+ override def product[U >: T](implicit num: Numeric[U]): U = {
+ executeAndWaitResult(new Product[U](num, parallelIterator))
+ }
+
+ override def min[U >: T](implicit ord: Ordering[U]): T = {
+ executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T]
+ }
+
+ override def max[U >: T](implicit ord: Ordering[U]): T = {
+ executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T]
+ }
+
+ override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
+ executeAndWaitResult(new Map[S, That](f, pbf, parallelIterator) mapResult { _.result })
+ } otherwise super.map(f)(bf)
+
+ override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
+ executeAndWaitResult(new Collect[S, That](pf, pbf, parallelIterator) mapResult { _.result })
+ } otherwise super.collect(pf)(bf)
+
+ override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
+ executeAndWaitResult(new FlatMap[S, That](f, pbf, parallelIterator) mapResult { _.result })
+ } otherwise super.flatMap(f)(bf)
+
+ /** Tests whether a predicate holds for all elements of this $coll.
+ *
+ * $abortsignalling
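+ *
+ * For example, assuming `pc` is an illustrative parallel collection holding `1`, `2` and `3`:
+ *
+ * {{{
+ * pc.forall(_ > 0) // true
+ * pc.forall(_ > 2) // false
+ * }}}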
+ *
+ * @param pred a predicate used to test elements
+ * @return `true` if `pred` holds for all elements, `false` otherwise
+ */
+ override def forall(pred: T => Boolean): Boolean = {
+ executeAndWaitResult(new Forall(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
+ }
+
+ /** Tests whether a predicate holds for some element of this $coll.
+ *
+ * $abortsignalling
+ *
+ * @param pred a predicate used to test elements
+ * @return `true` if `pred` holds for some element, `false` otherwise
+ */
+ override def exists(pred: T => Boolean): Boolean = {
+ executeAndWaitResult(new Exists(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
+ }
+
+ /** Finds some element in the collection for which the predicate holds, if such
+ * an element exists. The element may not necessarily be the first such element
+ * in the iteration order.
+ *
+ * If there are multiple elements obeying the predicate, the choice is nondeterministic.
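+ *
+ * For example, assuming `pc` is an illustrative parallel collection holding `1`, `2`, `3` and `4`:
+ *
+ * {{{
+ * pc.find(_ % 2 == 0) // may be Some(2) or Some(4)
+ * }}}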
+ *
+ * $abortsignalling
+ *
+ * @param pred the predicate used to test the elements
+ * @return an option value containing such an element if one exists, or `None` otherwise
+ */
+ override def find(pred: T => Boolean): Option[T] = {
+ executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
+ }
+
+ protected[this] def cbfactory = () => newCombiner
+
+ override def filter(pred: T => Boolean): Repr = {
+ executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result })
+ }
+
+ override def filterNot(pred: T => Boolean): Repr = {
+ executeAndWaitResult(new FilterNot(pred, cbfactory, parallelIterator) mapResult { _.result })
+ }
+
+ override def ++[U >: T, That](that: TraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = {
+ if (that.isParallel && bf.isParallel) {
+ // println("case both are parallel")
+ val other = that.asParallelIterable
+ val pbf = bf.asParallel
+ val copythis = new Copy(() => pbf(repr), parallelIterator)
+ val copythat = wrap {
+ val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator)
+ othtask.compute
+ othtask.result
+ }
+ val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result }
+ executeAndWaitResult(task)
+ } else if (bf.isParallel) {
+ // println("case parallel builder, `that` not parallel")
+ val pbf = bf.asParallel
+ val copythis = new Copy(() => pbf(repr), parallelIterator)
+ val copythat = wrap {
+ val cb = pbf(repr)
+ for (elem <- that) cb += elem
+ cb
+ }
+ executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result })
+ } else {
+ // println("case not a parallel builder")
+ val b = bf(repr)
+ this.parallelIterator.copy2builder[U, That, Builder[U, That]](b)
+ if (that.isInstanceOf[Parallel]) for (elem <- that.asInstanceOf[Iterable[U]].iterator) b += elem
+ else for (elem <- that) b += elem
+ b.result
+ }
+ }
+
+ override def partition(pred: T => Boolean): (Repr, Repr) = {
+ executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
+ }
+
+ override def take(n: Int): Repr = {
+ val actualn = if (size > n) n else size
+ if (actualn < MIN_FOR_COPY) take_sequential(actualn)
+ else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result })
+ }
+
+ private def take_sequential(n: Int) = {
+ val cb = newCombiner
+ cb.sizeHint(n)
+ val it = parallelIterator
+ var left = n
+ while (left > 0) {
+ cb += it.next
+ left -= 1
+ }
+ cb.result
+ }
+
+ override def drop(n: Int): Repr = {
+ val actualn = if (size > n) n else size
+ if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
+ else executeAndWaitResult(new Drop(actualn, cbfactory, parallelIterator) mapResult { _.result })
+ }
+
+ private def drop_sequential(n: Int) = {
+ val it = parallelIterator drop n
+ val cb = newCombiner
+ cb.sizeHint(size - n)
+ while (it.hasNext) cb += it.next
+ cb.result
+ }
+
+ override def slice(unc_from: Int, unc_until: Int): Repr = {
+ val from = unc_from min size max 0
+ val until = unc_until min size max from
+ if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
+ else executeAndWaitResult(new Slice(from, until, cbfactory, parallelIterator) mapResult { _.result })
+ }
+
+ private def slice_sequential(from: Int, until: Int): Repr = {
+ val cb = newCombiner
+ var left = until - from
+ val it = parallelIterator drop from
+ while (left > 0) {
+ cb += it.next
+ left -= 1
+ }
+ cb.result
+ }
+
+ override def splitAt(n: Int): (Repr, Repr) = {
+ executeAndWaitResult(new SplitAt(n, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
+ }
+
+ /** Takes the longest prefix of elements that satisfy the predicate.
+ *
+ * $indexsignalling
+ * The index flag is initially set to maximum integer value.
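+ *
+ * For example, assuming `pc` is an illustrative parallel collection holding `1`, `2`, `-3` and `4`:
+ *
+ * {{{
+ * pc.takeWhile(_ > 0) // the prefix holding 1 and 2
+ * }}}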
+ *
+ * @param pred the predicate used to test the elements
+ * @return the longest prefix of this $coll of elements that satisfy the predicate `pred`
+ */
+ override def takeWhile(pred: T => Boolean): Repr = {
+ val cntx = new DefaultSignalling with AtomicIndexFlag
+ cntx.setIndexFlag(Int.MaxValue)
+ executeAndWaitResult(new TakeWhile(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._1.result })
+ }
+
+ /** Splits this $coll into a prefix/suffix pair according to a predicate.
+ *
+ * $indexsignalling
+ * The index flag is initially set to maximum integer value.
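+ *
+ * For example, assuming `pc` is an illustrative parallel collection holding `1`, `2`, `-3` and `4`:
+ *
+ * {{{
+ * pc.span(_ > 0) // (a collection of 1 and 2, a collection of -3 and 4)
+ * }}}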
+ *
+ * @param pred the predicate used to test the elements
+ * @return a pair consisting of the longest prefix of the collection for which all
+ * the elements satisfy `pred`, and the rest of the collection
+ */
+ override def span(pred: T => Boolean): (Repr, Repr) = {
+ val cntx = new DefaultSignalling with AtomicIndexFlag
+ cntx.setIndexFlag(Int.MaxValue)
+ executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult {
+ p => (p._1.result, p._2.result)
+ })
+ }
+
+ /** Drops all elements in the longest prefix of elements that satisfy the predicate,
+ * and returns a collection composed of the remaining elements.
+ *
+ * $indexsignalling
+ * The index flag is initially set to maximum integer value.
+ *
+ * @param pred the predicate used to test the elements
+ * @return a collection composed of all the elements after the longest prefix of elements
+ * in this $coll that satisfy the predicate `pred`
+ */
+ override def dropWhile(pred: T => Boolean): Repr = {
+ val cntx = new DefaultSignalling with AtomicIndexFlag
+ cntx.setIndexFlag(Int.MaxValue)
+ executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._2.result })
+ }
+
+ override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) {
+ executeAndWait(new CopyToArray(start, len, xs, parallelIterator))
+ }
+
+ override def toIterable: Iterable[T] = seq.drop(0).asInstanceOf[Iterable[T]]
+
+ override def toArray[U >: T: ClassManifest]: Array[U] = {
+ val arr = new Array[U](size)
+ copyToArray(arr)
+ arr
+ }
+
+ override def toList: List[T] = seq.toList
+
+ override def toIndexedSeq[S >: T]: collection.immutable.IndexedSeq[S] = seq.toIndexedSeq[S]
+
+ override def toStream: Stream[T] = seq.toStream
+
+ override def toSet[S >: T]: collection.immutable.Set[S] = seq.toSet
+
+ override def toSeq: Seq[T] = seq.toSeq
+
+ /* tasks */
+
+ /** Standard accessor task that iterates over the elements of the collection.
+ *
+ * @tparam R type of the result of this method (`R` for result).
+ * @tparam Tp the representation type of the task at hand.
+ */
+ protected trait Accessor[R, Tp]
+ extends super.Task[R, Tp] {
+ val pit: ParallelIterator
+ def newSubtask(p: ParallelIterator): Accessor[R, Tp]
+ def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
+ def split = pit.split.map(newSubtask(_)) // default split procedure
+ override def toString = "Accessor(" + pit.toString + ")"
+ }
+
+ protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] {
+ def shouldSplitFurther = false
+ def split = throw new UnsupportedOperationException("Does not split.")
+ override def toString = "NonDivisibleTask"
+ }
+
+ protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]]
+
+ protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
+ extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] {
+ val ft: First
+ val st: Second
+ def combineResults(fr: FR, sr: SR): R
+ var result: R = null.asInstanceOf[R]
+ }
+
+ /** Sequentially performs one task after another. */
+ protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
+ extends Composite[FR, SR, R, First, Second] {
+ def leaf(prevr: Option[R]) = {
+ ft.compute
+ st.compute
+ result = combineResults(ft.result, st.result)
+ }
+ }
+
+ /** Performs two tasks in parallel, and waits for both to finish. */
+ protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
+ extends Composite[FR, SR, R, First, Second] {
+ def leaf(prevr: Option[R]) = {
+ st.start
+ ft.compute
+ st.sync
+ result = combineResults(ft.result, st.result)
+ }
+ }
+
+ protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp])
+ extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] {
+ var result: R1 = null.asInstanceOf[R1]
+ def map(r: R): R1
+ def leaf(prevr: Option[R1]) = {
+ inner.compute
+ result = map(inner.result)
+ }
+ }
+
+ protected trait Transformer[R, Tp] extends Accessor[R, Tp]
+
+ protected[this] class Foreach[S](op: T => S, val pit: ParallelIterator) extends Accessor[Unit, Foreach[S]] {
+ var result: Unit = ()
+ def leaf(prevr: Option[Unit]) = pit.foreach(op)
+ def newSubtask(p: ParallelIterator) = new Foreach[S](op, p)
+ }
+
+ protected[this] class Count(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Int, Count] {
+ var result: Int = 0
+ def leaf(prevr: Option[Int]) = result = pit.count(pred)
+ def newSubtask(p: ParallelIterator) = new Count(pred, p)
+ override def merge(that: Count) = result = result + that.result
+ }
+
+ protected[this] class Reduce[U >: T](op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Reduce[U]] {
+ var result: U = null.asInstanceOf[U]
+ def leaf(prevr: Option[U]) = result = pit.reduce(op)
+ def newSubtask(p: ParallelIterator) = new Reduce(op, p)
+ override def merge(that: Reduce[U]) = result = op(result, that.result)
+ }
+
+ protected[this] class Fold[U >: T](z: U, op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Fold[U]] {
+ var result: U = null.asInstanceOf[U]
+ def leaf(prevr: Option[U]) = result = pit.fold(z)(op)
+ def newSubtask(p: ParallelIterator) = new Fold(z, op, p)
+ override def merge(that: Fold[U]) = result = op(result, that.result)
+ }
+
+ protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, val pit: ParallelIterator)
+ extends Accessor[S, Aggregate[S]] {
+ var result: S = null.asInstanceOf[S]
+ def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
+ def newSubtask(p: ParallelIterator) = new Aggregate(z, seqop, combop, p)
+ override def merge(that: Aggregate[S]) = result = combop(result, that.result)
+ }
+
+ protected[this] class Sum[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Sum[U]] {
+ var result: U = null.asInstanceOf[U]
+ def leaf(prevr: Option[U]) = result = pit.sum(num)
+ def newSubtask(p: ParallelIterator) = new Sum(num, p)
+ override def merge(that: Sum[U]) = result = num.plus(result, that.result)
+ }
+
+ protected[this] class Product[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Product[U]] {
+ var result: U = null.asInstanceOf[U]
+ def leaf(prevr: Option[U]) = result = pit.product(num)
+ def newSubtask(p: ParallelIterator) = new Product(num, p)
+ override def merge(that: Product[U]) = result = num.times(result, that.result)
+ }
+
+ protected[this] class Min[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Min[U]] {
+ var result: U = null.asInstanceOf[U]
+ def leaf(prevr: Option[U]) = result = pit.min(ord)
+ def newSubtask(p: ParallelIterator) = new Min(ord, p)
+ override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result
+ }
+
+ protected[this] class Max[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Max[U]] {
+ var result: U = null.asInstanceOf[U]
+ def leaf(prevr: Option[U]) = result = pit.max(ord)
+ def newSubtask(p: ParallelIterator) = new Max(ord, p)
+ override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result
+ }
+
+ protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[S, That], Map[S, That]] {
+ var result: Combiner[S, That] = null
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr)))
+ def newSubtask(p: ParallelIterator) = new Map(f, pbf, p)
+ override def merge(that: Map[S, That]) = result = result combine that.result
+ }
+
+ protected[this] class Collect[S, That]
+ (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[S, That], Collect[S, That]] {
+ var result: Combiner[S, That] = null
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf) // TODO
+ def newSubtask(p: ParallelIterator) = new Collect(pf, pbf, p)
+ override def merge(that: Collect[S, That]) = result = result combine that.result
+ }
+
+ protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[S, That], FlatMap[S, That]] {
+ var result: Combiner[S, That] = null
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf) // TODO
+ def newSubtask(p: ParallelIterator) = new FlatMap(f, pbf, p)
+ override def merge(that: FlatMap[S, That]) = result = result combine that.result
+ }
+
+ protected[this] class Forall(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Forall] {
+ var result: Boolean = true
+ def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort }
+ def newSubtask(p: ParallelIterator) = new Forall(pred, p)
+ override def merge(that: Forall) = result = result && that.result
+ }
+
+ protected[this] class Exists(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Exists] {
+ var result: Boolean = false
+ def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort }
+ def newSubtask(p: ParallelIterator) = new Exists(pred, p)
+ override def merge(that: Exists) = result = result || that.result
+ }
+
+ protected[this] class Find[U >: T](pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Option[U], Find[U]] {
+ var result: Option[U] = None
+ def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
+ def newSubtask(p: ParallelIterator) = new Find(pred, p)
+ override def merge(that: Find[U]) = if (this.result == None) result = that.result
+ }
+
+ protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, This], Filter[U, This]] {
+ var result: Combiner[U, This] = null
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf()))
+ def newSubtask(p: ParallelIterator) = new Filter(pred, cbf, p)
+ override def merge(that: Filter[U, This]) = result = result combine that.result
+ }
+
+ protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, This], FilterNot[U, This]] {
+ var result: Combiner[U, This] = null
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
+ def newSubtask(p: ParallelIterator) = new FilterNot(pred, cbf, p)
+ override def merge(that: FilterNot[U, This]) = result = result combine that.result
+ }
+
+ protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, That], Copy[U, That]] {
+ var result: Combiner[U, That] = null
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory()))
+ def newSubtask(p: ParallelIterator) = new Copy[U, That](cfactory, p)
+ override def merge(that: Copy[U, That]) = result = result combine that.result
+ }
+
+ protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] {
+ var result: (Combiner[U, This], Combiner[U, This]) = null
+ def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
+ def newSubtask(p: ParallelIterator) = new Partition(pred, cbf, p)
+ override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
+ }
+
+ protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, This], Take[U, This]] {
+ var result: Combiner[U, This] = null
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf()))
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ val sizes = pits.scanLeft(0)(_ + _.remaining)
+ for ((p, untilp) <- pits zip sizes; if untilp <= n) yield {
+ if (untilp + p.remaining < n) new Take(p.remaining, cbf, p)
+ else new Take(n - untilp, cbf, p)
+ }
+ }
+ override def merge(that: Take[U, This]) = result = result combine that.result
+ }
+
+ protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, This], Drop[U, This]] {
+ var result: Combiner[U, This] = null
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf()))
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ val sizes = pits.scanLeft(0)(_ + _.remaining)
+ for ((p, withp) <- pits zip sizes.tail; if withp >= n) yield {
+ if (withp - p.remaining > n) new Drop(0, cbf, p)
+ else new Drop(n - withp + p.remaining, cbf, p)
+ }
+ }
+ override def merge(that: Drop[U, This]) = result = result combine that.result
+ }
+
+ protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, This], Slice[U, This]] {
+ var result: Combiner[U, This] = null
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf()))
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ val sizes = pits.scanLeft(0)(_ + _.remaining)
+ for ((p, untilp) <- pits zip sizes; if untilp + p.remaining >= from || untilp <= until) yield {
+ val f = (from max untilp) - untilp
+ val u = (until min (untilp + p.remaining)) - untilp
+ new Slice(f, u, cbf, p)
+ }
+ }
+ override def merge(that: Slice[U, This]) = result = result combine that.result
+ }
+
+ protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] {
+ var result: (Combiner[U, This], Combiner[U, This]) = null
+ def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ val sizes = pits.scanLeft(0)(_ + _.remaining)
+ for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p)
+ }
+ override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
+ }
+
+ protected[this] class TakeWhile[U >: T, This >: Repr]
+ (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] {
+ var result: (Combiner[U, This], Boolean) = null
+ def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) {
+ result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf()))
+ if (!result._2) pit.setIndexFlagIfLesser(pos)
+ } else result = (reuse(prev.map(_._1), cbf()), false)
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new TakeWhile(pos + untilp, pred, cbf, p)
+ }
+ override def merge(that: TakeWhile[U, This]) = if (result._2) {
+ result = (result._1 combine that.result._1, that.result._2)
+ }
+ }
+
+ protected[this] class Span[U >: T, This >: Repr]
+ (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] {
+ var result: (Combiner[U, This], Combiner[U, This]) = null
+ def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) {
+ result = pit.span2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
+ if (result._2.size > 0) pit.setIndexFlagIfLesser(pos)
+ } else {
+ result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf())))
+ }
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Span(pos + untilp, pred, cbf, p)
+ }
+ override def merge(that: Span[U, This]) = result = if (result._2.size == 0) {
+ (result._1 combine that.result._1, that.result._2)
+ } else {
+ (result._1, result._2 combine that.result._1 combine that.result._2)
+ }
+ }
+
+ protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], val pit: ParallelIterator)
+ extends Accessor[Unit, CopyToArray[U, This]] {
+ var result: Unit = ()
+ def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len)
+ def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield {
+ val plen = p.remaining min (len - untilp)
+ new CopyToArray[U, This](from + untilp, plen, array, p)
+ }
+ }
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelIterableView.scala b/src/library/scala/collection/parallel/ParallelIterableView.scala
new file mode 100644
index 0000000000..f40f02eb3b
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelIterableView.scala
@@ -0,0 +1,33 @@
+package scala.collection.parallel
+
+
+
+
+import scala.collection.Parallel
+import scala.collection.TraversableViewLike
+import scala.collection.IterableView
+
+
+
+
+/** A non-strict view of a parallel iterable collection.
+ *
+ * @tparam T the type of the elements this view can traverse
+ * @tparam Coll the type of the collection this view is derived from
+ *
+ * @since 2.8
+ */
+trait ParallelIterableView[+T, +Coll <: Parallel, +CollSeq]
+extends ParallelIterableViewLike[T, Coll, CollSeq, ParallelIterableView[T, Coll, CollSeq], IterableView[T, CollSeq]]
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelIterableViewLike.scala b/src/library/scala/collection/parallel/ParallelIterableViewLike.scala
new file mode 100644
index 0000000000..024eb48d25
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelIterableViewLike.scala
@@ -0,0 +1,59 @@
+package scala.collection.parallel
+
+
+
+
+import scala.collection.Parallel
+import scala.collection.TraversableViewLike
+import scala.collection.IterableView
+import scala.collection.IterableViewLike
+
+
+
+
+
+/** A template trait for non-strict views of parallel iterable collections.
+ *
+ * '''Note:''' Regular view traits have type parameters used to carry information
+ * about the type of the elements, type of the collection they are derived from and
+ * their own actual representation type. Parallel views have an additional parameter
+ * which carries information about the type of the sequential version of the view.
+ *
+ * @tparam T the type of the elements this view can traverse
+ * @tparam Coll the type of the collection this view is derived from
+ * @tparam CollSeq TODO
+ * @tparam This the actual representation type of this view
+ * @tparam ThisSeq the type of the sequential representation of this view
+ *
+ * @since 2.8
+ */
+trait ParallelIterableViewLike[+T,
+ +Coll <: Parallel,
+ +CollSeq,
+ +This <: ParallelIterableView[T, Coll, CollSeq] with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq],
+ +ThisSeq <: IterableView[T, CollSeq] with IterableViewLike[T, CollSeq, ThisSeq]]
+extends IterableView[T, Coll]
+ with IterableViewLike[T, Coll, This]
+ with ParallelIterable[T]
+ with ParallelIterableLike[T, This, ThisSeq]
+{
+ self =>
+
+ override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner")
+
+ //type SCPI = SignalContextPassingIterator[ParallelIterator] // complains when overridden further in the inheritance hierarchy, TODO check it out
+ type CPI = SignalContextPassingIterator[ParallelIterator]
+
+ trait Transformed[+S] extends ParallelIterableView[S, Coll, CollSeq] with super.Transformed[S]
+
+}
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelMap.scala b/src/library/scala/collection/parallel/ParallelMap.scala
new file mode 100644
index 0000000000..5ce61469bc
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelMap.scala
@@ -0,0 +1,71 @@
+package scala.collection.parallel
+
+
+
+
+
+import scala.collection.Map
+import scala.collection.mutable.Builder
+import scala.collection.generic.ParallelMapFactory
+import scala.collection.generic.GenericParallelMapTemplate
+import scala.collection.generic.GenericParallelMapCompanion
+import scala.collection.generic.CanCombineFrom
+
+
+
+
+
+
+trait ParallelMap[K, +V]
+extends Map[K, V]
+ with GenericParallelMapTemplate[K, V, ParallelMap]
+ with ParallelIterable[(K, V)]
+ with ParallelMapLike[K, V, ParallelMap[K, V], Map[K, V]]
+{
+self =>
+
+ def mapCompanion: GenericParallelMapCompanion[ParallelMap] = ParallelMap
+
+ override def empty: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V]
+
+ override def stringPrefix = "ParallelMap"
+}
+
+
+
+object ParallelMap extends ParallelMapFactory[ParallelMap] {
+ def empty[K, V]: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V]
+
+ def newCombiner[K, V]: Combiner[(K, V), ParallelMap[K, V]] = immutable.HashTrieCombiner[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelMap[K, V]] = new CanCombineFromMap[K, V]
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelMapLike.scala b/src/library/scala/collection/parallel/ParallelMapLike.scala
new file mode 100644
index 0000000000..8a0b54525f
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelMapLike.scala
@@ -0,0 +1,43 @@
+package scala.collection.parallel
+
+
+
+
+import scala.collection.MapLike
+import scala.collection.Map
+import scala.collection.mutable.Builder
+
+
+
+
+
+
+
+
+trait ParallelMapLike[K,
+ +V,
+ +Repr <: ParallelMapLike[K, V, Repr, SequentialView] with ParallelMap[K, V],
+ +SequentialView <: Map[K, V]]
+extends MapLike[K, V, Repr]
+ with ParallelIterableLike[(K, V), Repr, SequentialView]
+{ self =>
+
+ protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner
+
+ protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.")
+
+ override def empty: Repr
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelSeq.scala b/src/library/scala/collection/parallel/ParallelSeq.scala
new file mode 100644
index 0000000000..71b802cd11
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelSeq.scala
@@ -0,0 +1,64 @@
+package scala.collection.parallel
+
+
+
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParallelCompanion
+import scala.collection.generic.GenericParallelTemplate
+import scala.collection.generic.ParallelFactory
+import scala.collection.generic.CanCombineFrom
+import scala.collection.parallel.mutable.ParallelArrayCombiner
+import scala.collection.parallel.mutable.ParallelArray
+
+
+/** A template trait for parallel sequences.
+ *
+ * $parallelseqinfo
+ *
+ * $sideeffects
+ */
+trait ParallelSeq[+T] extends Seq[T]
+ with ParallelIterable[T]
+ with GenericParallelTemplate[T, ParallelSeq]
+ with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] {
+ override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
+
+ def apply(i: Int): T
+
+ override def toString = super[ParallelIterable].toString
+}
+
+
+object ParallelSeq extends ParallelFactory[ParallelSeq] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
+
+ def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelSeqLike.scala b/src/library/scala/collection/parallel/ParallelSeqLike.scala
new file mode 100644
index 0000000000..18b0c83f23
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelSeqLike.scala
@@ -0,0 +1,473 @@
+package scala.collection.parallel
+
+
+import scala.collection.Parallel
+import scala.collection.SeqLike
+import scala.collection.generic.DefaultSignalling
+import scala.collection.generic.AtomicIndexFlag
+import scala.collection.generic.CanBuildFrom
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.VolatileAbort
+
+
+
+
+// TODO update docs!!
+/** A template trait for sequences of type `ParallelSeq[T]`, representing
+ * parallel sequences with element type `T`.
+ *
+ * $parallelseqinfo
+ *
+ * @tparam T the type of the elements contained in this collection
+ * @tparam Repr the type of the actual collection containing the elements
+ *
+ * @define parallelseqinfo
+ * Parallel sequences provide efficient indexing and length computations. Like their
+ * sequential counterparts, they always have a defined order of elements. This means
+ * they produce resulting parallel sequences in the same way sequential sequences do.
+ * However, the order in which they iterate over elements to produce results is not
+ * defined and is generally nondeterministic. If the higher-order functions given to
+ * them have no side effects, then this won't be noticeable.
+ *
+ * This trait defines a new, more general splitting operation, `psplit`, and reimplements
+ * the `split` operation of the `ParallelIterable` trait in terms of it.
+ *
+ * @author prokopec
+ * @since 2.8
+ */
+trait ParallelSeqLike[+T, +Repr <: Parallel, +Sequential <: Seq[T] with SeqLike[T, Sequential]]
+extends scala.collection.SeqLike[T, Repr]
+ with ParallelIterableLike[T, Repr, Sequential] {
+ self =>
+
+ type SuperParallelIterator = super.ParallelIterator
+
+ /** An iterator that can be split into arbitrary subsets of iterators.
+ * The self-type requirement ensures that the signal context passing behaviour gets mixed in
+ * the concrete iterator instance in some concrete collection.
+ *
+ * '''Note:''' In concrete collection classes, collection implementers might want to override the iterator's
+ * `reverse2builder` method to ensure higher efficiency.
+ */
+ trait ParallelIterator extends ParallelSeqIterator[T, Repr] with super.ParallelIterator {
+ me: SignalContextPassingIterator[ParallelIterator] =>
+ def split: Seq[ParallelIterator]
+ def psplit(sizes: Int*): Seq[ParallelIterator]
+ }
+
+ /** A stackable modification that ensures signal contexts get passed along the iterators.
+ * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into
+ * concrete iterators.
+ */
+ trait SignalContextPassingIterator[+IterRepr <: ParallelIterator]
+ extends ParallelIterator with super.SignalContextPassingIterator[IterRepr] {
+ // Note: See explanation in `ParallelIterableLike.this.SignalContextPassingIterator`
+ // to understand why we do the cast here, and have a type parameter.
+ // Bottom line: avoiding boilerplate and fighting against the inability to override stackable modifications.
+ abstract override def psplit(sizes: Int*): Seq[IterRepr] = {
+ val pits = super.psplit(sizes: _*)
+ pits foreach { _.signalDelegate = signalDelegate }
+ pits.asInstanceOf[Seq[IterRepr]]
+ }
+ }
+
+ /** A convenient shorthand for the signal context passing stackable modification.
+ */
+ type SCPI <: SignalContextPassingIterator[ParallelIterator]
+
+ /** A more refined version of the iterator found in the `ParallelIterable` trait,
+ * this iterator can be split into arbitrary subsets of iterators.
+ *
+ * @return an iterator that can be split into subsets of precise size
+ */
+ protected def parallelIterator: ParallelIterator
+
+ override def iterator: PreciseSplitter[T] = parallelIterator
+
+ override def size = length
+
+ /** Used to iterate elements using indices */
+ protected abstract class Elements(start: Int, val end: Int) extends ParallelIterator with BufferedIterator[T] {
+ me: SignalContextPassingIterator[ParallelIterator] =>
+
+ private var i = start
+
+ def hasNext = i < end
+
+ def next: T = if (i < end) {
+ val x = self(i)
+ i += 1
+ x
+ } else Iterator.empty.next
+
+ def head = self(i)
+
+ final def remaining = end - i
+
+ def split = psplit(remaining / 2, remaining - remaining / 2)
+
+ def psplit(sizes: Int*) = {
+ val incr = sizes.scanLeft(0)(_ + _)
+ for ((from, until) <- incr.init zip incr.tail) yield {
+ new Elements(start + from, (start + until) min end) with SignalContextPassingIterator[ParallelIterator]
+ }
+ }
+
+ override def toString = "Elements(" + start + ", " + end + ")"
+ }
+
+ /* ParallelSeq methods */
+
+ /** Returns the length of the longest segment of elements starting at
+ * a given position satisfying some predicate.
+ *
+ * $indexsignalling
+ *
+ * The index flag is initially set to maximum integer value.
+ *
+ * @param p the predicate used to test the elements
+ * @param from the starting offset for the search
+ * @return the length of the longest segment of elements starting at `from` and
+ * satisfying the predicate
+ */
+ override def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else {
+ val realfrom = if (from < 0) 0 else from
+ val ctx = new DefaultSignalling with AtomicIndexFlag
+ ctx.setIndexFlag(Int.MaxValue)
+ executeAndWaitResult(new SegmentLength(p, 0, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))._1
+ }
+
+ override def prefixLength(p: T => Boolean) = segmentLength(p, 0)
+
+ /** Finds the first element satisfying some predicate.
+ *
+ * $indexsignalling
+ *
+ * The index flag is initially set to maximum integer value.
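+ *
+ * For example, assuming `ps` is an illustrative parallel sequence holding `5`, `6`, `7` and `8`:
+ *
+ * {{{
+ * ps.indexWhere(_ % 2 == 0, 2) // 3, since 8 is the first even element at or past index 2
+ * }}}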
+ *
+ * @param p the predicate used to test the elements
+ * @param from the starting offset for the search
+ * @return the index `>= from` of the first element of this $coll that satisfies the predicate `p`,
+ * or `-1`, if none exists
+ */
+ override def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else {
+ val realfrom = if (from < 0) 0 else from
+ val ctx = new DefaultSignalling with AtomicIndexFlag
+ ctx.setIndexFlag(Int.MaxValue)
+ executeAndWaitResult(new IndexWhere(p, realfrom, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))
+ }
+
+ override def indexWhere(p: T => Boolean): Int = indexWhere(p, 0)
+
+ override def findIndexOf(p: T => Boolean): Int = indexWhere(p, 0)
+
+ override def indexOf[U >: T](elem: U): Int = indexOf(elem, 0)
+
+ override def indexOf[U >: T](elem: U, from: Int): Int = indexWhere(elem ==, from)
+
+ /** Finds the last element satisfying some predicate.
+ *
+ * $indexsignalling
+ *
+ * The index flag is initially set to minimum integer value.
+ *
+ * @param p the predicate used to test the elements
+ * @param end the maximum offset for the search
+ * @return the index `<= end` of the last element of this $coll that satisfies the predicate `p`,
+ * or `-1`, if none exists
+ */
+ override def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else {
+ val until = if (end >= length) length else end + 1
+ val ctx = new DefaultSignalling with AtomicIndexFlag
+ ctx.setIndexFlag(Int.MinValue)
+ executeAndWaitResult(new LastIndexWhere(p, 0, parallelIterator.psplit(until, length - until)(0) assign ctx))
+ }
+
+ override def reverse: Repr = {
+ executeAndWaitResult(new Reverse(() => newCombiner, parallelIterator) mapResult { _.result })
+ }
+
+ override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
+ executeAndWaitResult(new ReverseMap[S, That](f, pbf, parallelIterator) mapResult { _.result })
+ } otherwise super.reverseMap(f)(bf)
+
+ override def startsWith[S](that: Seq[S]): Boolean = startsWith(that, 0)
+
+ /** Tests whether this $coll contains the given sequence at a given index.
+ *
+ * $abortsignalling
+ *
+ * @tparam S the element type of `that` sequence
+ * @param that the sequence searched for at the given offset in this sequence
+ * @param offset the starting offset for the search
+ * @return `true` if there is a sequence `that` starting at `offset` in this sequence, `false` otherwise
+ */
+ override def startsWith[S](that: Seq[S], offset: Int): Boolean = that ifParallelSeq { pthat =>
+ if (offset < 0 || offset >= length) offset == length && pthat.length == 0
+ else if (pthat.length == 0) true
+ else if (pthat.length > length - offset) false
+ else {
+ val ctx = new DefaultSignalling with VolatileAbort
+ executeAndWaitResult(new SameElements(parallelIterator.psplit(offset, pthat.length)(1) assign ctx, pthat.parallelIterator))
+ }
+ } otherwise super.startsWith(that, offset)
+
+ override def sameElements[U >: T](that: Iterable[U]): Boolean = that ifParallelSeq { pthat =>
+ val ctx = new DefaultSignalling with VolatileAbort
+ length == pthat.length && executeAndWaitResult(new SameElements(parallelIterator assign ctx, pthat.parallelIterator))
+ } otherwise super.sameElements(that)
+
+ /** Tests whether this $coll ends with the given parallel sequence.
+ *
+ * $abortsignalling
+ *
+ * @tparam S the type of the elements of `that` sequence
+ * @param that the sequence to test
+ * @return `true` if this $coll has `that` as a suffix, `false` otherwise
+ */
+ override def endsWith[S](that: Seq[S]): Boolean = that ifParallelSeq { pthat =>
+ if (that.length == 0) true
+ else if (that.length > length) false
+ else {
+ val ctx = new DefaultSignalling with VolatileAbort
+ val tlen = that.length
+ executeAndWaitResult(new SameElements(parallelIterator.psplit(length - tlen, tlen)(1) assign ctx, pthat.parallelIterator))
+ }
+ } otherwise super.endsWith(that)
+
+ override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)
+ (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParallelSeq && bf.isParallel) {
+ val that = patch.asParallelSeq
+ val pbf = bf.asParallel
+ val realreplaced = replaced min (length - from)
+ val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced)
+ val copystart = new Copy[U, That](() => pbf(repr), pits(0))
+ val copymiddle = wrap {
+ val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
+ tsk.compute
+ tsk.result
+ }
+ val copyend = new Copy[U, That](() => pbf(repr), pits(2))
+ executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result })
+ } else patch_sequential(from, patch, replaced)
+
+ private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
+ val b = bf(repr)
+ val repl = r min (length - from)
+ val pits = parallelIterator.psplit(from, repl, length - from - repl)
+ b ++= pits(0)
+ b ++= patch.iterator
+ b ++= pits(2)
+ b.result
+ }
+
+ override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf =>
+ executeAndWaitResult(new Updated(index, elem, pbf, parallelIterator) mapResult { _.result })
+ } otherwise super.updated(index, elem)
+
+ override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
+ patch(0, mutable.ParallelArray(elem), 0)
+ }
+
+ override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
+ patch(length, mutable.ParallelArray(elem), 0)
+ }
+
+ override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) {
+ patch(length, new immutable.Repetition(elem, len - length), 0)
+ } else patch(length, Nil, 0)
+
+ /** Tests whether every element of this $coll relates to the
+ * corresponding element of another parallel sequence by satisfying a test predicate.
+ *
+ * $abortsignalling
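+ *
+ * For example, assuming `ps` is an illustrative parallel sequence holding `1`, `2` and `3`:
+ *
+ * {{{
+ * ps.corresponds(Seq("a", "ab", "abc"))(_ == _.length) // true
+ * }}}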
+ *
+ * @param that the other parallel sequence
+ * @param p the test predicate, which relates elements from both sequences
+ * @tparam S the type of the elements of `that`
+ * @return `true` if both parallel sequences have the same length and
+ * `p(x, y)` is `true` for all corresponding elements `x` of this $coll
+ * and `y` of `that`, otherwise `false`
+ */
+ override def corresponds[S](that: Seq[S])(p: (T, S) => Boolean): Boolean = that ifParallelSeq { pthat =>
+ val ctx = new DefaultSignalling with VolatileAbort
+ length == pthat.length && executeAndWaitResult(new Corresponds(p, parallelIterator assign ctx, pthat.parallelIterator))
+ } otherwise super.corresponds(that)(p)
+
+ override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
+
+ override def view = new ParallelSeqView[T, Repr, Sequential] {
+ protected lazy val underlying = self.repr
+ def length = self.length
+ def apply(idx: Int) = self(idx)
+ def seq = self.seq.view
+ def parallelIterator = new Elements(0, length) with SCPI {}
+ }
+
+ override def view(from: Int, until: Int) = view.slice(from, until)
+
+ /* tasks */
+
+ protected def down(p: SuperParallelIterator) = p.asInstanceOf[ParallelIterator]
+
+ protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] {
+ val pit: ParallelIterator
+ }
+
+ protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp]
+
+ protected[this] class SegmentLength(pred: T => Boolean, from: Int, val pit: ParallelIterator)
+ extends Accessor[(Int, Boolean), SegmentLength] {
+ var result: (Int, Boolean) = null
+ def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) {
+ val itsize = pit.remaining
+ val seglen = pit.prefixLength(pred)
+ result = (seglen, itsize == seglen)
+ if (!result._2) pit.setIndexFlagIfLesser(from)
+ } else result = (0, false)
+ def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p)
+ }
+ override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2)
+ }
+
+ protected[this] class IndexWhere(pred: T => Boolean, from: Int, val pit: ParallelIterator)
+ extends Accessor[Int, IndexWhere] {
+ var result: Int = -1
+ def leaf(prev: Option[Int]) = if (from < pit.indexFlag) {
+ val r = pit.indexWhere(pred)
+ if (r != -1) {
+ result = from + r
+ pit.setIndexFlagIfLesser(from)
+ }
+ }
+ def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p)
+ }
+ override def merge(that: IndexWhere) = result = if (result == -1) that.result else {
+ if (that.result != -1) result min that.result else result
+ }
+ }
+
+ protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, val pit: ParallelIterator)
+ extends Accessor[Int, LastIndexWhere] {
+ var result: Int = -1
+ def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) {
+ val r = pit.lastIndexWhere(pred)
+ if (r != -1) {
+ result = pos + r
+ pit.setIndexFlagIfGreater(pos)
+ }
+ }
+ def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p)
+ }
+ override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else {
+ if (that.result != -1) result max that.result else result
+ }
+ }
+
+ protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, This], Reverse[U, This]] {
+ var result: Combiner[U, This] = null
+ def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf()))
+ def newSubtask(p: SuperParallelIterator) = new Reverse(cbf, down(p))
+ override def merge(that: Reverse[U, This]) = result = that.result combine result
+ }
+
+ protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[S, That], ReverseMap[S, That]] {
+ var result: Combiner[S, That] = null
+ def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf) // TODO
+ def newSubtask(p: SuperParallelIterator) = new ReverseMap(f, pbf, down(p))
+ override def merge(that: ReverseMap[S, That]) = result = that.result combine result
+ }
+
+ protected[this] class SameElements[U >: T](val pit: ParallelIterator, val otherpit: PreciseSplitter[U])
+ extends Accessor[Boolean, SameElements[U]] {
+ var result: Boolean = true
+ def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
+ result = pit.sameElements(otherpit)
+ if (!result) pit.abort
+ }
+ def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val fp = pit.remaining / 2
+ val sp = pit.remaining - fp
+ for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op)
+ }
+ override def merge(that: SameElements[U]) = result = result && that.result
+ }
+
+ protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, That], Updated[U, That]] {
+ var result: Combiner[U, That] = null
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf) // TODO
+ def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val pits = pit.split
+ for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p)
+ }
+ override def merge(that: Updated[U, That]) = result = result combine that.result
+ }
+
+ protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParallelIterator, val otherpit: PreciseSplitter[S])
+ extends Accessor[Boolean, Corresponds[S]] {
+ var result: Boolean = true
+ def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
+ result = pit.corresponds(corr)(otherpit)
+ if (!result) pit.abort
+ }
+ def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
+ override def split = {
+ val fp = pit.remaining / 2
+ val sp = pit.remaining - fp
+ for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op)
+ }
+ override def merge(that: Corresponds[S]) = result = result && that.result
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelSeqView.scala b/src/library/scala/collection/parallel/ParallelSeqView.scala
new file mode 100644
index 0000000000..7862e99f44
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelSeqView.scala
@@ -0,0 +1,64 @@
+package scala.collection.parallel
+
+
+
+
+import scala.collection.TraversableView
+import scala.collection.SeqView
+import scala.collection.Parallel
+import scala.collection.generic.CanCombineFrom
+
+
+
+
+
+/** A non-strict view of a parallel sequence.
+ *
+ * @tparam T the type of the elements in this view
+ * @tparam Coll the type of the collection this view is derived from
+ *
+ * @since 2.8
+ */
+trait ParallelSeqView[+T, +Coll <: Parallel, +CollSeq]
+extends ParallelSeqViewLike[T, Coll, CollSeq, ParallelSeqView[T, Coll, CollSeq], SeqView[T, CollSeq]]
+
+
+
+object ParallelSeqView {
+ abstract class NoCombiner[T] extends Combiner[T, Nothing] {
+ self: EnvironmentPassingCombiner[T, Nothing] =>
+ def +=(elem: T): this.type = this
+ def iterator: Iterator[T] = Iterator.empty
+ def result() = throw new UnsupportedOperationException("ParallelSeqView.Combiner.result")
+ def size = throw new UnsupportedOperationException("ParallelSeqView.Combiner.size")
+ def clear() {}
+ def combine[N <: T, NewTo >: Nothing](other: Combiner[N, NewTo]) =
+ throw new UnsupportedOperationException("ParallelSeqView.Combiner.combine")
+ }
+
+ type Coll = ParallelSeqView[_, C, _] forSome { type C <: ParallelSeq[_] }
+
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] =
+ new CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] {
+ def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
+ def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
+ }
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParallelSeqViewLike.scala b/src/library/scala/collection/parallel/ParallelSeqViewLike.scala
new file mode 100644
index 0000000000..eab4d7ad5f
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParallelSeqViewLike.scala
@@ -0,0 +1,192 @@
+package scala.collection.parallel
+
+
+
+
+
+import scala.collection.SeqView
+import scala.collection.SeqViewLike
+import scala.collection.Parallel
+import scala.collection.generic.CanBuildFrom
+import scala.collection.generic.CanCombineFrom
+
+
+
+
+
+
+
+/** A template trait for non-strict views of parallel sequences.
+ *
+ * @tparam T the type of the elements in this view
+ * @tparam Coll type of the collection this view is derived from
+ * @tparam CollSeq TODO
+ * @tparam This actual representation type of this view
+ * @tparam ThisSeq type of the sequential version of this view
+ *
+ * @since 2.8
+ */
+trait ParallelSeqViewLike[+T,
+ +Coll <: Parallel,
+ +CollSeq,
+ +This <: ParallelSeqView[T, Coll, CollSeq] with ParallelSeqViewLike[T, Coll, CollSeq, This, ThisSeq],
+ +ThisSeq <: SeqView[T, CollSeq] with SeqViewLike[T, CollSeq, ThisSeq]]
+extends SeqView[T, Coll]
+ with SeqViewLike[T, Coll, This]
+ with ParallelIterableView[T, Coll, CollSeq]
+ with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq]
+ with ParallelSeq[T]
+ with ParallelSeqLike[T, This, ThisSeq]
+{
+ self =>
+
+ type SCPI = SignalContextPassingIterator[ParallelIterator]
+
+ trait Transformed[+S] extends ParallelSeqView[S, Coll, CollSeq]
+ with super[ParallelIterableView].Transformed[S] with super[SeqView].Transformed[S] {
+ override def parallelIterator = new Elements(0, length) with SCPI {}
+ override def iterator = parallelIterator
+ environment = self.environment
+ }
+
+ trait Forced[S] extends super.Forced[S] with Transformed[S] {
+ // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`,
+ // we use it to obtain a view of the correct type - not the most efficient thing
+ // in the universe, but without making `newForced` more accessible, or adding
+ // a `forced` method to `SeqView`, this is the best we can do
+ def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]]
+ }
+
+ trait Filtered extends super.Filtered with Transformed[T] {
+ def seq = self.seq filter pred
+ }
+
+ trait Sliced extends super.Sliced with Transformed[T] {
+ override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This]
+ def seq = self.seq.slice(from, until)
+ }
+
+ trait Appended[U >: T] extends super.Appended[U] with Transformed[U] {
+ def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]]
+ }
+
+ trait Mapped[S] extends super.Mapped[S] with Transformed[S]{
+ def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]]
+ }
+
+ trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] {
+ def seq = self.seq.flatMap(mapping).asInstanceOf[SeqView[S, CollSeq]]
+ }
+
+ trait TakenWhile extends super.TakenWhile with Transformed[T] {
+ def seq = self.seq takeWhile pred
+ }
+
+ trait DroppedWhile extends super.DroppedWhile with Transformed[T] {
+ def seq = self.seq dropWhile pred
+ }
+
+ trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] {
+ def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]]
+ }
+
+ trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] {
+ def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]]
+ }
+
+ trait Reversed extends super.Reversed with Transformed[T] {
+ def seq = self.seq.reverse
+ }
+
+ trait Patched[U >: T] extends super.Patched[U] with Transformed[U] {
+ def seq = self.seq.patch(from, patch, replaced).asInstanceOf[SeqView[U, CollSeq]]
+ }
+
+ trait Prepended[U >: T] extends super.Prepended[U] with Transformed[U] {
+ def seq = (fst +: self.seq).asInstanceOf[SeqView[U, CollSeq]]
+ }
+
+ protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p }
+ protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u }
+ protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that }
+ protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f }
+ protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f }
+ protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p }
+ protected override def newTakenWhile(p: T => Boolean): Transformed[T] = new TakenWhile { val pred = p }
+ protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
+ protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem }
+ protected override def newReversed: Transformed[T] = new Reversed { }
+ protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced }
+ protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem }
+
+ override def filter(p: T => Boolean): This = newFiltered(p).asInstanceOf[This]
+ override def filterNot(p: T => Boolean): This = newFiltered(!p(_)).asInstanceOf[This]
+ override def partition(p: T => Boolean): (This, This) = (filter(p), filterNot(p))
+ override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This]
+ override def take(n: Int): This = newSliced(0, n).asInstanceOf[This]
+ override def drop(n: Int): This = newSliced(n, length).asInstanceOf[This]
+ override def splitAt(n: Int): (This, This) = (take(n), drop(n))
+ override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That]
+ override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
+ override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[This, S, That]): That = newFlatMapped(f).asInstanceOf[That]
+ override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[This, S, That]): That = filter(pf.isDefinedAt).map(pf)(bf)
+ override def takeWhile(p: T => Boolean): This = newTakenWhile(p).asInstanceOf[This]
+ override def dropWhile(p: T => Boolean): This = newDroppedWhile(p).asInstanceOf[This]
+ override def span(p: T => Boolean): (This, This) = (takeWhile(p), dropWhile(p))
+ override def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanLeft(z)(op)).asInstanceOf[That]
+ override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanRight(z)(op)).asInstanceOf[That]
+ override def groupBy[K](f: T => K): collection.immutable.Map[K, This] = thisSeq.groupBy(f).mapValues(xs => newForced(xs).asInstanceOf[This])
+ override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = newPrepended(elem).asInstanceOf[That]
+ override def reverse: This = newReversed.asInstanceOf[This]
+ override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[This, U, That]): That = newPatched(from, patch, replaced).asInstanceOf[That]
+ override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = patch(length, Seq.fill(len - length)(elem), 0)
+ override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f)
+ override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = {
+ require(0 <= index && index < length)
+ patch(index, List(elem), 1)(bf)
+ }
+ override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = ++(Iterator.single(elem))(bf)
+ override def union[U >: T, That](that: Seq[U])(implicit bf: CanBuildFrom[This, U, That]): That = this ++ that
+ override def diff[U >: T](that: Seq[U]): This = newForced(thisSeq diff that).asInstanceOf[This]
+ override def intersect[U >: T](that: Seq[U]): This = newForced(thisSeq intersect that).asInstanceOf[This]
+ override def sorted[U >: T](implicit ord: Ordering[U]): This = newForced(thisSeq sorted ord).asInstanceOf[This]
+
+ override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
+ executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
+ } otherwise {
+ val b = bf(underlying)
+ b ++= this.iterator
+ b.result
+ }
+
+ /* tasks */
+
+ protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], val pit: ParallelIterator)
+ extends Transformer[Combiner[U, That], Force[U, That]] {
+ var result: Combiner[U, That] = null
+ def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying)))
+ def newSubtask(p: SuperParallelIterator) = new Force(cbf, down(p))
+ override def merge(that: Force[U, That]) = result = result combine that.result
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/Splitters.scala b/src/library/scala/collection/parallel/Splitters.scala
new file mode 100644
index 0000000000..b3cad6d67a
--- /dev/null
+++ b/src/library/scala/collection/parallel/Splitters.scala
@@ -0,0 +1,86 @@
+package scala.collection.parallel
+
+
+import scala.collection.Seq
+
+
+/** A splitter (or a split iterator) can be split into more splitters that traverse
+ * disjoint subsets of elements.
+ *
+ * @tparam T type of the elements this parallel iterator traverses
+ *
+ * @since 2.8.1
+ * @author prokopec
+ */
+trait Splitter[+T] extends Iterator[T] {
+
+ /** Splits the iterator into a sequence of disjoint views.
+ *
+ * Returns a sequence of split iterators, each iterating over some subset of the
+ * elements in the collection. These subsets are disjoint and should be approximately
+ * equal in size. They are never empty, unless the iterator itself is empty, in which
+ * case this method returns a sequence with a single empty iterator. If the iterator has
+ * more than two elements, this method will return two or more iterators.
+ *
+ * Implementors are advised to keep this partition relatively small - two iterators are
+ * usually enough when partitioning the collection, although there may be a few more.
+ *
+ * '''Note:''' this method actually invalidates the current iterator.
+ *
+ * @return a sequence of disjoint iterators over the collection
+ */
+ def split: Seq[Splitter[T]]
+
+}
+
+
+/** A precise splitter (or a precise split iterator) can be split into an arbitrary number of splitters
+ * that traverse disjoint subsets of arbitrary sizes.
+ *
+ * Implementors might want to override the parameterless `split` method for efficiency.
+ *
+ * @tparam T type of the elements this parallel iterator traverses
+ *
+ * @since 2.8.1
+ * @author prokopec
+ */
+trait PreciseSplitter[+T] extends Splitter[T] {
+
+ /** Splits the iterator into disjoint views.
+ *
+ * This variant of the `split` method is specific to precise parallel iterators.
+ * It returns a sequence of parallel iterators, each iterating over some subset of the
+ * elements in this iterator. The size of each subiterator in the partition is equal to
+ * the size given in the corresponding argument, as long as there are enough elements in this
+ * iterator to split it that way.
+ *
+ * If there aren't enough elements, an empty iterator is appended for each additional argument.
+ * If there are elements left over, an additional iterator is appended at the end to hold them.
+ *
+ * For example, say we have a parallel iterator `ps` with 100 elements. Invoking:
+ * {{{
+ * ps.psplit(50, 25, 25, 10, 5)
+ * }}}
+ * will return a sequence of five iterators, the last two views being empty. On the other hand, calling:
+ * {{{
+ * ps.psplit(50, 40)
+ * }}}
+ * will return a sequence of three iterators, the last of them containing the remaining ten elements.
+ *
+ * '''Note:''' this method actually invalidates the current iterator.
+ *
+ * Unlike the `split` found in parallel iterable iterators, the views returned by this method can be empty.
+ *
+ * @param sizes the sizes used to split this split iterator into iterators that traverse disjoint subsets
+ * @return a sequence of disjoint subsequence iterators of this parallel iterator
+ */
+ def psplit(sizes: Int*): Seq[PreciseSplitter[T]]
+
+ def split: Seq[PreciseSplitter[T]]
+
+}
+
+
+
+
+
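To make the `split` contract above concrete, here is a minimal sketch of a splitter over an array slice (illustrative, not part of this patch): splitting halves the remaining range, and a splitter with fewer than two elements returns itself.
{{{
// A toy Splitter over the slice [i, until) of an array.
class ArraySplitter[T](a: Array[T], var i: Int, until: Int) extends Splitter[T] {
  def hasNext = i < until
  def next() = { val elem = a(i); i += 1; elem }
  def split: Seq[Splitter[T]] =
    if (until - i < 2) Seq(this)            // too small - do not split
    else {
      val mid = (i + until) / 2
      Seq(new ArraySplitter(a, i, mid),     // disjoint halves
          new ArraySplitter(a, mid, until))
    }
}
}}}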
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
new file mode 100644
index 0000000000..8a072b22aa
--- /dev/null
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -0,0 +1,27 @@
+package scala.collection.parallel
+
+
+
+
+
+
+
+trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
new file mode 100644
index 0000000000..3ef60f8c7a
--- /dev/null
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -0,0 +1,230 @@
+package scala.collection.parallel
+
+
+
+
+import scala.concurrent.forkjoin._
+
+
+
+
+
+
+
+
+
+
+/** A trait that declares task execution capabilities used
+ * by parallel collections. Parallel collections inherit a subtrait
+ * of this trait.
+ *
+ * One implementation trait of `Tasks` is `ForkJoinTasks`.
+ */
+trait Tasks {
+
+ /** A task abstraction which allows starting a task with `start`,
+ * waiting for it to finish with `sync` and attempting to cancel
+ * the task with `tryCancel`.
+ * It also defines a method `leaf` which must be called once the
+ * task is started, and which defines what this task actually does.
+ * Method `split` allows splitting this task into smaller subtasks,
+ * and method `shouldSplitFurther` decides if the task should be
+ * partitioned further.
+ * Method `merge` allows merging the results of the two tasks. It updates
+ * the result of the receiver.
+ * Finally, it defines the task result of type `R`.
+ */
+ trait Task[R, +Tp] {
+ def repr = this.asInstanceOf[Tp]
+ /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
+ def compute
+ /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
+ * or `None` if there was no previous task.
+ */
+ def leaf(result: Option[R])
+ /** Start task. */
+ def start
+ /** Wait for task to finish. */
+ def sync
+ /** Try to cancel the task.
+ * @return `true` if cancellation is successful.
+ */
+ def tryCancel: Boolean
+ /** A result that can be accessed once the task is completed. */
+ def result: R
+ /** Decides whether or not this task should be split further. */
+ def shouldSplitFurther: Boolean
+ /** Splits this task into a list of smaller tasks. */
+ protected[this] def split: Seq[Task[R, Tp]]
+ /** Reads the results of `that` task and merges them into the results of this one. */
+ protected[this] def merge(that: Tp) {}
+ }
+
+ type TaskType[R, +Tp] <: Task[R, Tp]
+ type ExecutionEnvironment
+
+ var environment: ExecutionEnvironment
+
+ /** Executes a task and waits for it to finish. */
+ def executeAndWait[R, Tp](task: TaskType[R, Tp])
+
+ /** Executes a result task, waits for it to finish, then returns its result. */
+ def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
+
+ /** Retrieves the parallelism level of the task execution environment. */
+ def parallelismLevel: Int
+
+}
+
+
+/** This trait implements scheduling by employing
+ * an adaptive work stealing technique.
+ */
+trait AdaptiveWorkStealingTasks extends Tasks {
+
+ trait Task[R, Tp] extends super.Task[R, Tp] {
+ var next: Task[R, Tp] = null
+ var shouldWaitFor = true
+ var result: R
+
+ def split: Seq[Task[R, Tp]]
+
+ /** The actual leaf computation. */
+ def leaf(result: Option[R]): Unit
+
+ def compute = if (shouldSplitFurther) internal else leaf(None)
+
+ def internal = {
+ var last = spawnSubtasks
+
+ last.leaf(None)
+ result = last.result
+
+ while (last.next != null) {
+ val lastresult = Option(last.result)
+ last = last.next
+ if (last.tryCancel) last.leaf(lastresult) else last.sync
+ merge(last.repr)
+ }
+ }
+
+ def spawnSubtasks = {
+ var last: Task[R, Tp] = null
+ var head: Task[R, Tp] = this
+ do {
+ val subtasks = head.split
+ head = subtasks.head
+ for (t <- subtasks.tail) {
+ t.next = last
+ last = t
+ t.start
+ }
+ } while (head.shouldSplitFurther);
+ head.next = last
+ head
+ }
+
+ def printChain = {
+ var curr = this
+ var chain = "chain: "
+ while (curr != null) {
+ chain += curr + " ---> "
+ curr = curr.next
+ }
+ println(chain)
+ }
+ }
+
+}
+
+
+/**
+ * A trait describing objects that provide a fork/join pool.
+ */
+trait HavingForkJoinPool {
+ def forkJoinPool: ForkJoinPool
+}
+
+
+
+/** An implementation trait for parallel tasks based on the fork/join framework.
+ *
+ * @define fjdispatch
+ * If the current thread is a fork/join worker thread, the task's `fork` method will
+ * be invoked. Otherwise, the task will be executed on the fork/join pool.
+ */
+trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
+
+ trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] {
+ def start = fork
+ def sync = join
+ def tryCancel = tryUnfork
+ var result: R
+ }
+
+ type TaskType[R, +Tp] = Task[R, Tp]
+ type ExecutionEnvironment = ForkJoinPool
+
+ /** The fork/join pool of this collection.
+ */
+ def forkJoinPool: ForkJoinPool = environment
+ var environment = ForkJoinTasks.defaultForkJoinPool
+
+ /** Executes a task on a fork/join pool and waits for it to finish.
+ *
+ * $fjdispatch
+ */
+ def executeAndWait[R, Tp](fjtask: Task[R, Tp]) {
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ fjtask.fork
+ } else {
+ forkJoinPool.execute(fjtask)
+ }
+ fjtask.join
+ }
+
+ /** Executes a task on a fork/join pool and waits for it to finish.
+ * Returns its result when it does.
+ *
+ * $fjdispatch
+ *
+ * @return the result of the task
+ */
+ def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = {
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ fjtask.fork
+ } else {
+ forkJoinPool.execute(fjtask)
+ }
+ fjtask.join
+ fjtask.result
+ }
+
+ def parallelismLevel = forkJoinPool.getParallelism
+
+}
+
+object ForkJoinTasks {
+ val defaultForkJoinPool = new ForkJoinPool
+ defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
+ defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
+}
+
+
+/* Some boilerplate is required because there is no deep mixin composition. Not sure if it can be done differently.
+ */
+trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
+
+ trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp]
+
+}
+
+
+
+
+
+
+
+
+
+
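To illustrate the split/merge contract of `Task`, here is a hedged sketch of a reduction phrased as a task. It is illustrative only - a real task would also mix in the `Task` trait of the enclosing `Tasks` implementation, which is omitted here, and the threshold of 1000 is an arbitrary placeholder.
{{{
// Sketch: summing an array slice, shaped like a Task[Int, Sum].
class Sum(arr: Array[Int], from: Int, until: Int) {
  var result: Int = 0
  def leaf(prev: Option[Int]) = {               // sequential unit of work
    var s = 0; var j = from
    while (j < until) { s += arr(j); j += 1 }
    result = s
  }
  def shouldSplitFurther = until - from > 1000  // placeholder threshold
  def split = {
    val mid = (from + until) / 2
    Seq(new Sum(arr, from, mid), new Sum(arr, mid, until))
  }
  def merge(that: Sum) { result += that.result } // fold a sibling's result in
}
}}}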
diff --git a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
new file mode 100644
index 0000000000..e29e9dfa98
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
@@ -0,0 +1,139 @@
+package scala.collection.parallel.immutable
+
+
+
+
+
+
+
+import scala.collection.parallel.ParallelMap
+import scala.collection.parallel.ParallelMapLike
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.generic.ParallelMapFactory
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.GenericParallelMapTemplate
+import scala.collection.generic.GenericParallelMapCompanion
+import scala.collection.immutable.HashMap
+
+
+
+
+
+
+class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
+extends ParallelMap[K, V]
+ with GenericParallelMapTemplate[K, V, ParallelHashTrie]
+ with ParallelMapLike[K, V, ParallelHashTrie[K, V], HashMap[K, V]]
+{
+self =>
+
+ def this() = this(HashMap.empty[K, V])
+
+ override def mapCompanion: GenericParallelMapCompanion[ParallelHashTrie] = ParallelHashTrie
+
+ override def empty: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V]
+
+ def parallelIterator = new ParallelHashTrieIterator(trie) with SCPI
+
+ def seq = trie
+
+ def -(k: K) = new ParallelHashTrie(trie - k)
+
+ def +[U >: V](kv: (K, U)) = new ParallelHashTrie(trie + kv)
+
+ def get(k: K) = trie.get(k)
+
+ override def size = trie.size
+
+ protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match {
+ case Some(old) => old
+ case None => newc
+ }
+
+ type SCPI = SignalContextPassingIterator[ParallelHashTrieIterator]
+
+ class ParallelHashTrieIterator(val ht: HashMap[K, V])
+ extends super.ParallelIterator {
+ self: SignalContextPassingIterator[ParallelHashTrieIterator] =>
+ // println("created iterator " + ht)
+ var i = 0
+ lazy val triter = ht.iterator
+ def split: Seq[ParallelIterator] = {
+ // println("splitting " + ht + " into " + ht.split.map(new ParallelHashTrieIterator(_) with SCPI).map(_.toList))
+ ht.split.map(new ParallelHashTrieIterator(_) with SCPI)
+ }
+ def next: (K, V) = {
+ // println("taking next after " + i + ", in " + ht)
+ i += 1
+ triter.next
+ }
+ def hasNext: Boolean = {
+ // println("hasNext: " + i + ", " + ht.size + ", " + ht)
+ i < ht.size
+ }
+ def remaining = ht.size - i
+ }
+
+}
+
+
+object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
+ def empty[K, V]: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V]
+
+ def newCombiner[K, V]: Combiner[(K, V), ParallelHashTrie[K, V]] = HashTrieCombiner[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelHashTrie[K, V]] = {
+ new CanCombineFromMap[K, V]
+ }
+
+ def fromTrie[K, V](trie: HashMap[K, V]): ParallelHashTrie[K, V] = new ParallelHashTrie(trie)
+
+ var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
+}
+
+
+trait HashTrieCombiner[K, V]
+extends Combiner[(K, V), ParallelHashTrie[K, V]] {
+self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] =>
+ private var trie: HashMap[K, V] = HashMap.empty[K, V]
+
+ def size: Int = trie.size
+
+ def clear = trie = HashMap.empty[K, V]
+
+ def +=(elem: (K, V)) = { trie += elem; this }
+
+ def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ // ParallelHashTrie.totalcombines.incrementAndGet
+ if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
+ val that = other.asInstanceOf[HashTrieCombiner[K, V]]
+ val ncombiner = HashTrieCombiner[K, V]
+ ncombiner.trie = this.trie merge that.trie
+ ncombiner
+ } else error("Unexpected combiner type.")
+ } else this
+
+ def result = new ParallelHashTrie[K, V](trie)
+
+}
+
+
+object HashTrieCombiner {
+ def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {}
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
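A brief usage sketch of the factory above (values are hypothetical): `fromTrie` wraps an existing `HashMap` without copying, and updates delegate to the underlying trie.
{{{
import scala.collection.immutable.HashMap

val m    = HashMap(1 -> "one", 2 -> "two")
val pht  = ParallelHashTrie.fromTrie(m)   // zero-copy wrap
val pht2 = pht + (3 -> "three")           // delegates to trie + kv
val v    = pht2.get(2)                    // Some("two")
}}}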
diff --git a/src/library/scala/collection/parallel/immutable/ParallelIterable.scala b/src/library/scala/collection/parallel/immutable/ParallelIterable.scala
new file mode 100644
index 0000000000..92bf5ab706
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParallelIterable.scala
@@ -0,0 +1,56 @@
+package scala.collection.parallel.immutable
+
+
+import scala.collection.generic._
+
+import scala.collection.parallel.ParallelIterableLike
+import scala.collection.parallel.Combiner
+
+
+
+
+
+// TODO uncomment when we add parallel vectors
+
+///** A template trait for immutable parallel iterable collections.
+// *
+// * $paralleliterableinfo
+// *
+// * $sideeffects
+// *
+// * @tparam A the element type of the collection
+// *
+// * @author prokopec
+// * @since 2.8
+// */
+//trait ParallelIterable[A] extends collection.immutable.Iterable[A]
+// with collection.parallel.ParallelIterable[A]
+// with GenericParallelTemplate[A, ParallelIterable]
+// with ParallelIterableLike[A, ParallelIterable[A], Iterable[A]] {
+// override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable
+//}
+//
+///** $factoryinfo
+// */
+//object ParallelIterable extends ParallelFactory[ParallelIterable] {
+// implicit def canBuildFrom[A]: CanBuildFromParallel[Coll, A, ParallelIterable[A]] =
+// new GenericCanBuildFromParallel[A]
+//
+// def newBuilder[A]: Combiner[A, ParallelIterable[A]] = null // TODO
+//
+// def newCombiner[A]: Combiner[A, ParallelIterable[A]] = null // TODO
+//}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/ParallelRange.scala b/src/library/scala/collection/parallel/immutable/ParallelRange.scala
new file mode 100644
index 0000000000..85a33c7431
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParallelRange.scala
@@ -0,0 +1,88 @@
+package scala.collection.parallel.immutable
+
+
+
+import scala.collection.immutable.Range
+import scala.collection.immutable.RangeUtils
+import scala.collection.parallel.ParallelSeq
+import scala.collection.parallel.Combiner
+import scala.collection.generic.CanCombineFrom
+
+
+
+class ParallelRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean)
+extends ParallelSeq[Int]
+ with RangeUtils[ParallelRange] {
+ self =>
+
+ def seq = new Range(start, end, step)
+
+ def length = _length
+
+ def apply(idx: Int) = _apply(idx)
+
+ def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParallelRange(_start, _end, _step, _inclusive)
+
+ def parallelIterator = new ParallelRangeIterator with SCPI
+
+ override def toString = seq.toString // TODO
+
+ type SCPI = SignalContextPassingIterator[ParallelRangeIterator]
+
+ class ParallelRangeIterator
+ (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive)
+ extends ParallelIterator with RangeUtils[ParallelRangeIterator] {
+ me: SignalContextPassingIterator[ParallelRangeIterator] =>
+ def remaining = _length
+ def next = { val r = start; start += step; r }
+ def hasNext = remaining > 0
+ def split: Seq[ParallelIterator] = psplit(remaining / 2, remaining - remaining / 2)
+ def psplit(sizes: Int*): Seq[ParallelIterator] = {
+ val incr = sizes.scanLeft(0)(_ + _)
+ for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until)
+ }
+ def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = {
+ new ParallelRangeIterator(_start, _end, _step, _inclusive) with SCPI
+ }
+
+ override def toString = "ParallelRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")"
+
+ /* accessors */
+
+ override def foreach[U](f: Int => U): Unit = {
+ _foreach(f)
+ start = end + step
+ }
+
+ override def reduce[U >: Int](op: (U, U) => U): U = {
+ var sum = next
+ for (elem <- this) sum += elem
+ sum
+ }
+
+ /* transformers */
+
+ override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = pbf(self.repr)
+ val sz = remaining
+ cb.sizeHint(sz)
+ if (sz > 0) {
+ val last = _last
+ while (start != last) {
+ f(start)
+ start += step
+ }
+ }
+ cb
+ }
+
+ }
+
+}
+
+
+
+
+
+
+
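The `psplit` above derives slice boundaries from cumulative sizes via `scanLeft`. The same idiom in isolation (plain Scala, illustrative):
{{{
val sizes  = Seq(50, 25, 25)
val incr   = sizes.scanLeft(0)(_ + _)  // Seq(0, 50, 75, 100)
val bounds = incr.init zip incr.tail   // Seq((0,50), (50,75), (75,100))
// each (from, until) pair becomes one subiterator slice
}}}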
diff --git a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala
new file mode 100644
index 0000000000..ceb0dcc13d
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala
@@ -0,0 +1,47 @@
+package scala.collection.parallel.immutable
+
+
+import scala.collection.generic.GenericParallelTemplate
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParallelCompanion
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.ParallelFactory
+import scala.collection.parallel.ParallelSeqLike
+import scala.collection.parallel.Combiner
+
+
+
+
+
+
+// TODO uncomment when we add parallel vectors
+
+///** An immutable variant of `ParallelSeq`.
+// *
+// * @define Coll mutable.ParallelSeq
+// * @define coll mutable parallel sequence
+// */
+//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A]
+// with ParallelIterable[A]
+// with collection.parallel.ParallelSeq[A]
+// with GenericParallelTemplate[A, ParallelSeq]
+// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] {
+// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
+//
+//}
+//
+//
+///** $factoryInfo
+// * @define Coll mutable.ParallelSeq
+// * @define coll mutable parallel sequence
+// */
+//object ParallelSeq extends ParallelFactory[ParallelSeq] {
+// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T]
+//
+// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO
+//
+// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO
+//}
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
new file mode 100644
index 0000000000..054786afaf
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -0,0 +1,56 @@
+package scala.collection.parallel
+
+
+
+
+
+
+
+
+
+
+
+package object immutable {
+
+ /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
+ *
+ * @tparam T type of the elements
+ * @param elem the element in the repetition
+ * @param length the length of the collection
+ */
+ private[parallel] class Repetition[T](elem: T, val length: Int) extends ParallelSeq[T] {
+ self =>
+
+ def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException
+ def seq = throw new UnsupportedOperationException
+ def update(idx: Int, elem: T) = throw new UnsupportedOperationException
+
+ type SCPI = SignalContextPassingIterator[ParallelIterator]
+
+ class ParallelIterator(var i: Int = 0, val until: Int = length, elem: T = self.elem) extends super.ParallelIterator {
+ me: SignalContextPassingIterator[ParallelIterator] =>
+ def remaining = until - i
+ def hasNext = i < until
+ def next = { i += 1; elem }
+ def psplit(sizes: Int*) = {
+ val incr = sizes.scanLeft(0)(_ + _)
+ for ((start, end) <- incr.init zip incr.tail) yield new ParallelIterator(i + start, (i + end) min until, elem) with SCPI
+ }
+ def split = psplit(remaining / 2, remaining - remaining / 2)
+ }
+
+ def parallelIterator = new ParallelIterator with SCPI
+
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
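Since `Repetition` is only ever read, the underlying idea reduces to a constant sequence. A minimal sketch outside the collection hierarchy (hypothetical, for illustration):
{{{
// A constant "sequence" of length n - the shape Repetition gives padTo.
class Const[T](elem: T, val length: Int) {
  def apply(i: Int): T =
    if (0 <= i && i < length) elem
    else throw new IndexOutOfBoundsException(i.toString)
}
}}}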
diff --git a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
new file mode 100644
index 0000000000..bd17d24ea8
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
@@ -0,0 +1,43 @@
+package scala.collection.parallel.mutable
+
+
+
+
+import scala.collection.generic.Growable
+import scala.collection.generic.Sizing
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.Combiner
+
+
+
+
+/** Implements combining the contents of two combiners
+ * by postponing the operation until the `result` method is called. It chains
+ * the leaf results together instead of evaluating the actual collection.
+ *
+ * @tparam Elem the type of the elements in the combiner
+ * @tparam To the type of the collection the combiner produces
+ * @tparam Buff the type of the buffers that contain leaf results and that this combiner chains together
+ */
+trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To]
+{
+ self: collection.parallel.EnvironmentPassingCombiner[Elem, To] =>
+ val chain: ArrayBuffer[Buff]
+ val lastbuff = chain.last
+ def +=(elem: Elem) = { lastbuff += elem; this }
+ def result: To = allocateAndCopy
+ def clear = { chain.clear }
+ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ if (other.isInstanceOf[LazyCombiner[_, _, _]]) {
+ val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]]
+ newLazyCombiner(chain ++= that.chain)
+ } else throw new UnsupportedOperationException("Cannot combine with combiner of different type.")
+ } else this
+ def size = chain.foldLeft(0)(_ + _.size)
+
+ /** Method that allocates the data structure and copies elements into it using
+ * `size` and `chain` members.
+ */
+ def allocateAndCopy: To
+ def newLazyCombiner(buffchain: ArrayBuffer[Buff]): LazyCombiner[Elem, To, Buff]
+}
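The point of the chaining scheme is that `combine` is cheap buffer-list concatenation - no elements move until `allocateAndCopy` runs. A standalone sketch of the bookkeeping (plain Scala, illustrative):
{{{
import scala.collection.mutable.ArrayBuffer

val left  = ArrayBuffer(ArrayBuffer(1, 2), ArrayBuffer(3)) // one combiner's chain
val right = ArrayBuffer(ArrayBuffer(4, 5))                 // another combiner's chain
val chain = left ++= right                 // chains concatenated, elements untouched
val size  = chain.foldLeft(0)(_ + _.size)  // 5, as in LazyCombiner.size
}}}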
diff --git a/src/library/scala/collection/parallel/mutable/ParallelArray.scala b/src/library/scala/collection/parallel/mutable/ParallelArray.scala
new file mode 100644
index 0000000000..3331c2dfd2
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParallelArray.scala
@@ -0,0 +1,568 @@
+package scala.collection.parallel.mutable
+
+
+
+import scala.collection.generic.GenericParallelTemplate
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParallelCompanion
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.ParallelFactory
+import scala.collection.generic.Sizing
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParallelSeqLike
+import scala.collection.parallel.CHECK_RATE
+import scala.collection.mutable.ArraySeq
+import scala.collection.mutable.Builder
+import scala.collection.Sequentializable
+
+
+
+
+/** Parallel sequence holding elements in a linear array.
+ *
+ * `ParallelArray` is a parallel sequence with a predefined size. The size of the array
+ * cannot be changed after it's been created.
+ *
+ * `ParallelArray` internally keeps an array containing the elements. This means that
+ * bulk operations based on traversal are fast, but those returning a parallel array as a result
+ * are slightly slower. The reason for this is that `ParallelArray` uses lazy builders that
+ * create the internal data array only after the size of the array is known. The fragments
+ * are then copied into the resulting data array in parallel using fast array copy operations.
+ * Operations for which the resulting array size is known in advance are optimised to use this
+ * information.
+ *
+ * @tparam T type of the elements in the array
+ *
+ * @define Coll ParallelArray
+ * @define coll parallel array
+ */
+class ParallelArray[T] private[mutable] (val arrayseq: ArraySeq[T])
+extends ParallelSeq[T]
+ with GenericParallelTemplate[T, ParallelArray]
+ with ParallelSeqLike[T, ParallelArray[T], ArraySeq[T]]
+{
+ self =>
+
+ private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]]
+
+ override def companion: GenericCompanion[ParallelArray] with GenericParallelCompanion[ParallelArray] = ParallelArray
+
+ def this(sz: Int) = this {
+ require(sz >= 0)
+ new ArraySeq[T](sz)
+ }
+
+ def apply(i: Int) = array(i).asInstanceOf[T]
+
+ def update(i: Int, elem: T) = array(i) = elem
+
+ def length = arrayseq.length
+
+ def seq = arrayseq
+
+ type SCPI = SignalContextPassingIterator[ParallelArrayIterator]
+
+ def parallelIterator: ParallelArrayIterator = {
+ val pit = new ParallelArrayIterator with SCPI
+ pit
+ }
+
+ class ParallelArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
+ extends super.ParallelIterator {
+ me: SignalContextPassingIterator[ParallelArrayIterator] =>
+
+ def hasNext = i < until
+
+ def next = {
+ val elem = arr(i)
+ i += 1
+ elem.asInstanceOf[T]
+ }
+
+ def remaining = until - i
+
+ def psplit(sizesIncomplete: Int*): Seq[ParallelIterator] = {
+ var traversed = i
+ val total = sizesIncomplete.reduceLeft(_ + _)
+ val left = remaining
+ val sizes = if (total >= left) sizesIncomplete else sizesIncomplete :+ (left - total)
+ for (sz <- sizes) yield if (traversed < until) {
+ val start = traversed
+ val end = (traversed + sz) min until
+ traversed = end
+ new ParallelArrayIterator(start, end, arr) with SCPI
+ } else {
+ new ParallelArrayIterator(traversed, traversed, arr) with SCPI
+ }
+ }
+
+ override def split: Seq[ParallelIterator] = {
+ val left = remaining
+ if (left >= 2) {
+ val splitpoint = left / 2
+ Seq(new ParallelArrayIterator(i, i + splitpoint, arr) with SCPI,
+ new ParallelArrayIterator(i + splitpoint, until, arr) with SCPI)
+ } else {
+ Seq(this)
+ }
+ }
+
+ override def toString = "ParallelArrayIterator(" + i + ", " + until + ")"
+
+ /* overrides for efficiency */
+
+ /* accessors */
+
+ override def foreach[U](f: T => U) = {
+ foreach_quick(f, arr, until, i)
+ i = until
+ }
+
+ private def foreach_quick[U](f: T => U, a: Array[Any], ntil: Int, from: Int) = {
+ var j = from
+ while (j < ntil) {
+ f(a(j).asInstanceOf[T])
+ j += 1
+ }
+ }
+
+ override def count(p: T => Boolean) = {
+ val c = count_quick(p, arr, until, i)
+ i = until
+ c
+ }
+
+ private def count_quick(p: T => Boolean, a: Array[Any], ntil: Int, from: Int) = {
+ var cnt = 0
+ var j = from
+ while (j < ntil) {
+ if (p(a(j).asInstanceOf[T])) cnt += 1
+ j += 1
+ }
+ cnt
+ }
+
+ override def foldLeft[S](z: S)(op: (S, T) => S): S = {
+ val r = foldLeft_quick(arr, until, op, z)
+ i = until
+ r
+ }
+
+ private def foldLeft_quick[S](a: Array[Any], ntil: Int, op: (S, T) => S, z: S): S = {
+ var j = i
+ var sum = z
+ while (j < ntil) {
+ sum = op(sum, a(j).asInstanceOf[T])
+ j += 1
+ }
+ sum
+ }
+
+ def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = foldLeft[S](z)(seqop)
+
+ override def sum[U >: T](implicit num: Numeric[U]): U = {
+ var s = sum_quick(num, arr, until, i, num.zero)
+ i = until
+ s
+ }
+
+ private def sum_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, zero: U): U = {
+ var j = from
+ var sum = zero
+ while (j < ntil) {
+ sum = num.plus(sum, a(j).asInstanceOf[T])
+ j += 1
+ }
+ sum
+ }
+
+ override def product[U >: T](implicit num: Numeric[U]): U = {
+ var p = product_quick(num, arr, until, i, num.one)
+ i = until
+ p
+ }
+
+ private def product_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, one: U): U = {
+ var j = from
+ var prod = one
+ while (j < ntil) {
+ prod = num.times(prod, a(j).asInstanceOf[T])
+ j += 1
+ }
+ prod
+ }
+
+ override def forall(p: T => Boolean): Boolean = {
+ if (isAborted) return false
+
+ var all = true
+ while (i < until) {
+ val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE
+
+ all = forall_quick(p, array, nextuntil, i)
+ if (all) i = nextuntil
+ else {
+ i = until
+ abort
+ }
+
+ if (isAborted) return false
+ }
+ all
+ }
+
+ // it's faster to use a separate small method
+ private def forall_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = {
+ var j = start
+ while (j < nextuntil) {
+ if (p(a(j).asInstanceOf[T])) j += 1
+ else return false
+ }
+ return true
+ }
+
+ override def exists(p: T => Boolean): Boolean = {
+ if (isAborted) return true
+
+ var some = false
+ while (i < until) {
+ val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE
+
+ some = exists_quick(p, array, nextuntil, i)
+ if (some) {
+ i = until
+ abort
+ } else i = nextuntil
+
+ if (isAborted) return true
+ }
+ some
+ }
+
+ // faster to use separate small method
+ private def exists_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = {
+ var j = start
+ while (j < nextuntil) {
+ if (p(a(j).asInstanceOf[T])) return true
+ else j += 1
+ }
+ return false
+ }
+
+ override def find(p: T => Boolean): Option[T] = {
+ if (isAborted) return None
+
+ var r: Option[T] = None
+ while (i < until) {
+ val nextuntil = if ((i + CHECK_RATE) < until) (i + CHECK_RATE) else until
+
+ r = find_quick(p, array, nextuntil, i)
+
+ if (r != None) {
+ i = until
+ abort
+ } else i = nextuntil
+
+ if (isAborted) return r
+ }
+ r
+ }
+
+ private def find_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Option[T] = {
+ var j = start
+ while (j < nextuntil) {
+ val elem = a(j).asInstanceOf[T]
+ if (p(elem)) return Some(elem)
+ else j += 1
+ }
+ return None
+ }
+
+ override def drop(n: Int): ParallelArrayIterator = {
+ i += n
+ this
+ }
+
+ override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) {
+ val totallen = (self.length - i) min len min (array.length - from)
+ Array.copy(arr, i, array, from, totallen)
+ i += totallen
+ }
+
+ override def prefixLength(pred: T => Boolean): Int = {
+ val r = prefixLength_quick(pred, arr, until, i)
+ i += r + 1
+ r
+ }
+
+ private def prefixLength_quick(pred: T => Boolean, a: Array[Any], ntil: Int, startpos: Int): Int = {
+ var j = startpos
+ var endpos = ntil
+ while (j < endpos) {
+ if (pred(a(j).asInstanceOf[T])) j += 1
+ else endpos = j
+ }
+ endpos - startpos
+ }
+
+ override def indexWhere(pred: T => Boolean): Int = {
+ val r = indexWhere_quick(pred, arr, until, i)
+ val ret = if (r != -1) r - i else r
+ i = until
+ ret
+ }
+
+ private def indexWhere_quick(pred: T => Boolean, a: Array[Any], ntil: Int, from: Int): Int = {
+ var j = from
+ var pos = -1
+ while (j < ntil) {
+ if (pred(a(j).asInstanceOf[T])) {
+ pos = j
+ j = ntil
+ } else j += 1
+ }
+ pos
+ }
+
+ override def lastIndexWhere(pred: T => Boolean): Int = {
+ val r = lastIndexWhere_quick(pred, arr, i, until)
+ val ret = if (r != -1) r - i else r
+ i = until
+ ret
+ }
+
+ private def lastIndexWhere_quick(pred: T => Boolean, a: Array[Any], from: Int, ntil: Int): Int = {
+ var pos = -1
+ var j = ntil - 1
+ while (j >= from) {
+ if (pred(a(j).asInstanceOf[T])) {
+ pos = j
+ j = -1
+ } else j -= 1
+ }
+ pos
+ }
+
+ override def sameElements(that: Iterator[_]): Boolean = {
+ var same = true
+ while (i < until && that.hasNext) {
+ if (arr(i) != that.next) {
+ i = until
+ same = false
+ }
+ i += 1
+ }
+ same
+ }
+
+ /* transformers */
+
+ override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = cbf(self.repr)
+ cb.sizeHint(remaining)
+ map2combiner_quick(f, arr, cb, until, i)
+ i = until
+ cb
+ }
+
+ private def map2combiner_quick[S, That](f: T => S, a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ cb += f(a(j).asInstanceOf[T])
+ j += 1
+ }
+ }
+
+ override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = {
+ val cb = pbf(self.repr)
+ collect2combiner_quick(pf, arr, cb, until, i)
+ i = until
+ cb
+ }
+
+ private def collect2combiner_quick[S, That](pf: PartialFunction[T, S], a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ val curr = a(j).asInstanceOf[T]
+ if (pf.isDefinedAt(curr)) cb += pf(curr)
+ j += 1
+ }
+ }
+
+ override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = {
+ val cb = pbf(self.repr)
+ while (i < until) {
+ val traversable = f(arr(i).asInstanceOf[T])
+ if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator
+ else cb ++= traversable
+ i += 1
+ }
+ cb
+ }
+
+ override def filter2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = {
+ filter2combiner_quick(pred, cb, arr, until, i)
+ i = until
+ cb
+ }
+
+ private def filter2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
+ var j = i
+ while(j < ntil) {
+ var curr = a(j).asInstanceOf[T]
+ if (pred(curr)) cb += curr
+ j += 1
+ }
+ }
+
+ override def filterNot2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = {
+ filterNot2combiner_quick(pred, cb, arr, until, i)
+ i = until
+ cb
+ }
+
+ private def filterNot2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
+ var j = i
+ while(j < ntil) {
+ var curr = a(j).asInstanceOf[T]
+ if (!pred(curr)) cb += curr
+ j += 1
+ }
+ }
+
+ override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
+ cb.sizeHint(remaining)
+ cb.ifIs[ParallelArrayCombiner[T]] { pac =>
+ val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
+ Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
+ pac.lastbuff.setInternalSize(remaining)
+ } otherwise {
+ copy2builder_quick(cb, arr, until, i)
+ i = until
+ }
+ cb
+ }
+
+ private def copy2builder_quick[U >: T, Coll](b: Builder[U, Coll], a: Array[Any], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ b += a(j).asInstanceOf[T]
+ j += 1
+ }
+ }
+
+ override def partition2combiners[U >: T, This >: ParallelArray[T]](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = {
+ partition2combiners_quick(pred, btrue, bfalse, arr, until, i)
+ i = until
+ (btrue, bfalse)
+ }
+
+ private def partition2combiners_quick[U >: T, This >: ParallelArray[T]](p: T => Boolean, btrue: Builder[U, This], bfalse: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ val curr = a(j).asInstanceOf[T]
+ if (p(curr)) btrue += curr else bfalse += curr
+ j += 1
+ }
+ }
+
+ override def take2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = {
+ cb.sizeHint(n)
+ val ntil = i + n
+ val a = arr
+ while (i < ntil) {
+ cb += a(i).asInstanceOf[T]
+ i += 1
+ }
+ cb
+ }
+
+ override def drop2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = {
+ drop(n)
+ cb.sizeHint(remaining)
+ while (i < until) {
+ cb += arr(i).asInstanceOf[T]
+ i += 1
+ }
+ cb
+ }
+
+ override def reverse2combiner[U >: T, This >: ParallelArray[T]](cb: Combiner[U, This]): Combiner[U, This] = {
+ cb.ifIs[ParallelArrayCombiner[T]] { pac =>
+ val sz = remaining
+ pac.sizeHint(sz)
+ val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
+ reverse2combiner_quick(targetarr, arr, i, until)
+ pac.lastbuff.setInternalSize(sz)
+ pac
+ } otherwise super.reverse2combiner(cb)
+ cb
+ }
+
+ private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
+ var j = from
+ var k = ntil - from - 1
+ while (j < ntil) {
+ targ(k) = a(j)
+ j += 1
+ k -= 1
+ }
+ }
+
+ }
+
+}
+
+
+
+
+
+object ParallelArray extends ParallelFactory[ParallelArray] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelArray[T]] = new GenericCanCombineFrom[T]
+ def newBuilder[T]: Combiner[T, ParallelArray[T]] = newCombiner
+ def newCombiner[T]: Combiner[T, ParallelArray[T]] = ParallelArrayCombiner[T]
+
+ /** Creates a new parallel array by wrapping the specified array.
+ */
+ def handoff[T](arr: Array[T]): ParallelArray[T] = wrapOrRebuild(arr, arr.length)
+
+ /** Creates a new parallel array by wrapping a part of the specified array.
+ */
+ def handoff[T](arr: Array[T], sz: Int): ParallelArray[T] = wrapOrRebuild(arr, sz)
+
+ private def wrapOrRebuild[T](arr: AnyRef, sz: Int) = arr match {
+ case arr: Array[AnyRef] => new ParallelArray[T](new ExposedArraySeq[T](arr, sz))
+ case _ => new ParallelArray[T](new ExposedArraySeq[T](runtime.ScalaRunTime.toObjectArray(arr), sz))
+ }
+
+ def createFromCopy[T <: AnyRef : ClassManifest](arr: Array[T]): ParallelArray[T] = {
+ val newarr = new Array[T](arr.length)
+ Array.copy(arr, 0, newarr, 0, arr.length)
+ handoff(newarr)
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
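A short usage sketch of the factory methods above (values are hypothetical): `handoff` wraps the given array without copying - the caller must not mutate it afterwards - while `createFromCopy` copies defensively.
{{{
val xs = Array("a", "b", "c")
val pa = ParallelArray.handoff(xs)        // zero-copy wrap
val pb = ParallelArray.createFromCopy(xs) // defensive copy
}}}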
diff --git a/src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
new file mode 100644
index 0000000000..2991344be2
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
@@ -0,0 +1,105 @@
+package scala.collection.parallel.mutable
+
+
+
+
+
+import scala.collection.generic.Sizing
+import scala.collection.mutable.ArraySeq
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.TaskSupport
+import scala.collection.parallel.EnvironmentPassingCombiner
+
+
+
+
+
+
+
+trait ParallelArrayCombiner[T]
+extends LazyCombiner[T, ParallelArray[T], ExposedArrayBuffer[T]]
+ with TaskSupport {
+ self: EnvironmentPassingCombiner[T, ParallelArray[T]] =>
+
+ override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
+
+ def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParallelArrayCombiner(c)
+
+ def allocateAndCopy = if (chain.size > 1) {
+ val arrayseq = new ArraySeq[T](size)
+ val array = arrayseq.array.asInstanceOf[Array[Any]]
+
+ executeAndWait(new CopyChainToArray(array, 0, size))
+
+ new ParallelArray(arrayseq)
+ } else { // optimisation if there is only 1 array
+ val pa = new ParallelArray(new ExposedArraySeq[T](chain(0).internalArray, size))
+ pa
+ }
+
+ override def toString = "ParallelArrayCombiner(" + size + "): " + chain
+
+ /* tasks */
+
+ class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = if (howmany > 0) {
+ var totalleft = howmany
+ val (stbuff, stind) = findStart(offset)
+ var buffind = stbuff
+ var ind = stind
+ var arrayIndex = offset
+ while (totalleft > 0) {
+ val currbuff = chain(buffind)
+ val chunksize = if (totalleft < (currbuff.size - ind)) totalleft else currbuff.size - ind
+ val until = ind + chunksize
+
+ copyChunk(currbuff.internalArray, ind, array, arrayIndex, until)
+ arrayIndex += chunksize
+ ind += chunksize
+
+ totalleft -= chunksize
+ buffind += 1
+ ind = 0
+ }
+ }
+ private def copyChunk(buffarr: Array[AnyRef], buffStart: Int, ra: Array[Any], arrayStart: Int, until: Int) {
+ Array.copy(buffarr, buffStart, ra, arrayStart, until - buffStart)
+ }
+ private def findStart(pos: Int) = {
+ var left = pos
+ var buffind = 0
+ while (left >= chain(buffind).size) {
+ left -= chain(buffind).size
+ buffind += 1
+ }
+ (buffind, left)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CopyChainToArray(array, offset, fp), new CopyChainToArray(array, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
+ }
+
+}
+
+
+object ParallelArrayCombiner {
+ def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParallelArrayCombiner[T] = {
+ new { val chain = c } with ParallelArrayCombiner[T] with EnvironmentPassingCombiner[T, ParallelArray[T]]
+ }
+ def apply[T]: ParallelArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
+}
+
+
+
+
+
+
+
+
+
+
+
+
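`findStart` above maps a global offset into the chain to a (buffer index, local index) pair. The arithmetic in isolation (illustrative; assumes the offset is strictly smaller than the total size, as in the task above):
{{{
def findStart(sizes: Seq[Int], pos: Int): (Int, Int) = {
  var left = pos; var b = 0
  while (left >= sizes(b)) { left -= sizes(b); b += 1 }
  (b, left)
}
findStart(Seq(4, 4, 4), 9) // (2, 1): third buffer, second slot
}}}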
diff --git a/src/library/scala/collection/parallel/mutable/ParallelIterable.scala b/src/library/scala/collection/parallel/mutable/ParallelIterable.scala
new file mode 100644
index 0000000000..bd0a46bc43
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParallelIterable.scala
@@ -0,0 +1,51 @@
+package scala.collection.parallel.mutable
+
+
+import scala.collection.generic._
+
+import scala.collection.parallel.ParallelIterableLike
+import scala.collection.parallel.Combiner
+
+
+/** A template trait for parallel iterable collections.
+ *
+ * $paralleliterableinfo
+ *
+ * $sideeffects
+ *
+ * @tparam T the element type of the collection
+ *
+ * @author prokopec
+ * @since 2.8
+ */
+trait ParallelIterable[T] extends collection.mutable.Iterable[T]
+ with collection.parallel.ParallelIterable[T]
+ with GenericParallelTemplate[T, ParallelIterable]
+ with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] {
+ override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable
+}
+
+/** $factoryinfo
+ */
+object ParallelIterable extends ParallelFactory[ParallelIterable] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] =
+ new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
+
+ def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/ParallelSeq.scala b/src/library/scala/collection/parallel/mutable/ParallelSeq.scala
new file mode 100644
index 0000000000..636ba1ac3d
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParallelSeq.scala
@@ -0,0 +1,61 @@
+package scala.collection.parallel.mutable
+
+
+import scala.collection.generic.GenericParallelTemplate
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParallelCompanion
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.ParallelFactory
+import scala.collection.parallel.ParallelSeqLike
+import scala.collection.parallel.Combiner
+
+
+
+
+
+
+
+/** A mutable variant of `ParallelSeq`.
+ *
+ * @define Coll mutable.ParallelSeq
+ * @define coll mutable parallel sequence
+ */
+trait ParallelSeq[T] extends collection.mutable.Seq[T]
+ with ParallelIterable[T]
+ with collection.parallel.ParallelSeq[T]
+ with GenericParallelTemplate[T, ParallelSeq]
+ with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] {
+ self =>
+ override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
+
+ def update(i: Int, elem: T): Unit
+
+}
+
+
+/** $factoryInfo
+ * @define Coll mutable.ParallelSeq
+ * @define coll mutable parallel sequence
+ */
+object ParallelSeq extends ParallelFactory[ParallelSeq] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
+
+ def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala
new file mode 100644
index 0000000000..f670c7b7c5
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/package.scala
@@ -0,0 +1,32 @@
+package scala.collection.parallel
+
+
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArraySeq
+import scala.collection.generic.Sizing
+
+
+
+package object mutable {
+
+ /* hack-arounds */
+
+ private[mutable] class ExposedArrayBuffer[T] extends ArrayBuffer[T] with Sizing {
+ def internalArray = array
+ def setInternalSize(s: Int) = size0 = s
+ override def sizeHint(len: Int) = { // delete once we start using 2.8.RC1+
+ if (len > size && len >= 1) {
+ val newarray = new Array[AnyRef](len)
+ Array.copy(array, 0, newarray, 0, size0)
+ array = newarray
+ }
+ }
+ }
+
+ private[mutable] class ExposedArraySeq[T](arr: Array[AnyRef], sz: Int) extends ArraySeq[T](sz) {
+ override val array = arr
+ override val length = sz
+ }
+
+} \ No newline at end of file
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
new file mode 100644
index 0000000000..cddf098966
--- /dev/null
+++ b/src/library/scala/collection/parallel/package.scala
@@ -0,0 +1,70 @@
+package scala.collection
+
+
+import java.lang.Thread._
+
+import scala.collection.generic.CanBuildFrom
+import scala.collection.generic.CanCombineFrom
+
+
+/** Package object for parallel collections.
+ */
+package object parallel {
+ val MIN_FOR_COPY = -1 // TODO: set to 5000
+ val CHECK_RATE = 512
+
+ /** Computes the threshold from the size of the collection and the parallelism level.
+ */
+ def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
+ val p = parallelismLevel
+ if (p > 1) 1 + sz / (8 * p)
+ else sz
+ }
+
+ /** An implicit conversion providing arrays with a `par` method, which
+ * returns a parallel array.
+ *
+ * @tparam T type of the elements in the array, which is a subtype of AnyRef
+ * @param array the array to be parallelized
+ * @return a `Parallelizable` object with a `par` method
+ */
+ implicit def array2ParallelArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParallelArray[T]] {
+ def par = mutable.ParallelArray.handoff[T](array)
+ }
+
+ implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new {
+ def isParallel = bf.isInstanceOf[Parallel]
+ def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
+ def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new {
+ def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
+ }
+ }
+
+ implicit def traversable2ops[T](t: TraversableOnce[T]) = new {
+ def isParallel = t.isInstanceOf[Parallel]
+ def isParallelIterable = t.isInstanceOf[ParallelIterable[_]]
+ def asParallelIterable = t.asInstanceOf[ParallelIterable[T]]
+ def isParallelSeq = t.isInstanceOf[ParallelSeq[_]]
+ def asParallelSeq = t.asInstanceOf[ParallelSeq[T]]
+ def ifParallelSeq[R](isbody: ParallelSeq[T] => R) = new {
+ def otherwise(notbody: => R) = if (isParallel) isbody(asParallelSeq) else notbody
+ }
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
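A worked instance of `thresholdFromSize`: with `sz = 8000` elements and parallelism level `p = 4`, the threshold is `1 + 8000 / (8 * 4) = 251`, so tasks stop splitting once they cover roughly 251 elements or fewer. The `ifParallel`/`otherwise` implicit above is used in the same guarded style throughout the patch.
{{{
val t = thresholdFromSize(8000, 4) // 251

// Dispatch on whether a CanBuildFrom is actually parallel (sketch):
// bf ifParallel { pbf => /* use the combiner factory */ } otherwise { /* sequential path */ }
}}}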
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala
index 16f791a710..3a070fb6ff 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/Combine.scala
@@ -21,7 +21,7 @@ class Combine(val size: Int, val parallelism: Int, val runWhat: String) extends
def runpar = throw new UnsupportedOperationException
def runseq = runhashtrie
def runhashtrie = {
- hashtrie combine thattrie
+ hashtrie merge thattrie
// println
// println("both tries: " + HashTrie.bothtries)
// println("one trie, one item: " + HashTrie.onetrie)
@@ -29,7 +29,7 @@ class Combine(val size: Int, val parallelism: Int, val runWhat: String) extends
// System exit 1
}
def rundestructive = {
- hashtrie combine thattrie
+ hashtrie merge thattrie
}
def runappendtrie = hashtrie ++ thattrie
def runhashmap = hashmap ++ thatmap
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala
index a944a7fb39..033c211849 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/MultipleCombine.scala
@@ -37,7 +37,7 @@ class MultipleCombine(val size: Int, val parallelism: Int, val runWhat: String)
def runhashtrie = {
initHashTrie
var trie = hashtrie
- for (r <- 0 until combines) trie = trie combine thattries(r)
+ for (r <- 0 until combines) trie = trie merge thattries(r)
}
def runappendtrie = {
initHashTrie
@@ -52,7 +52,7 @@ class MultipleCombine(val size: Int, val parallelism: Int, val runWhat: String)
def rundestructive = {
initHashTrie
var trie = hashtrie
- for (r <- 0 until combines) trie = trie combine thattries(r)
+ for (r <- 0 until combines) trie = trie merge thattries(r)
}
def companion = MultipleCombine
def comparisonMap = Map("hashtrie" -> runhashtrie _, "hashmap" -> runhashmap _, "appendtrie" -> runappendtrie _, "destruct" -> rundestructive _)