summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-06-29 14:05:59 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-06-29 14:05:59 +0000
commit52b863dd86ae854328f74d1d05ca71b2915fa7d7 (patch)
treed088283359939e87e4c9998f6873bc2ccc127782
parentb2abe22c97e0c588f7a5f68e970dbc78f87c6b29 (diff)
downloadscala-52b863dd86ae854328f74d1d05ca71b2915fa7d7.tar.gz
scala-52b863dd86ae854328f74d1d05ca71b2915fa7d7.tar.bz2
scala-52b863dd86ae854328f74d1d05ca71b2915fa7d7.zip
Implemented lazy combiners for parallel hash trie.
-rw-r--r--src/library/scala/collection/immutable/HashMap.scala2
-rw-r--r--src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala131
-rw-r--r--src/parallel-collections/scala/collection/Parallel.scala17
-rw-r--r--src/parallel-collections/scala/collection/Parallelizable.scala40
-rw-r--r--src/parallel-collections/scala/collection/Sequentializable.scala15
-rw-r--r--src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala28
-rw-r--r--src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala29
-rw-r--r--src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala66
-rw-r--r--src/parallel-collections/scala/collection/generic/HasNewCombiner.scala26
-rw-r--r--src/parallel-collections/scala/collection/generic/ParallelFactory.scala43
-rw-r--r--src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala42
-rw-r--r--src/parallel-collections/scala/collection/generic/Signalling.scala192
-rw-r--r--src/parallel-collections/scala/collection/generic/Sizing.scala9
-rw-r--r--src/parallel-collections/scala/collection/immutable/package.scala81
-rw-r--r--src/parallel-collections/scala/collection/parallel/Combiners.scala66
-rw-r--r--src/parallel-collections/scala/collection/parallel/Iterators.scala443
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelIterable.scala49
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala940
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala33
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala59
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelMap.scala71
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala43
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeq.scala64
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala473
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala64
-rw-r--r--src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala192
-rw-r--r--src/parallel-collections/scala/collection/parallel/Splitters.scala86
-rw-r--r--src/parallel-collections/scala/collection/parallel/TaskSupport.scala27
-rw-r--r--src/parallel-collections/scala/collection/parallel/Tasks.scala230
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala137
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala56
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala88
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala47
-rw-r--r--src/parallel-collections/scala/collection/parallel/immutable/package.scala56
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala43
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala561
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala105
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala51
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala61
-rw-r--r--src/parallel-collections/scala/collection/parallel/mutable/package.scala32
-rw-r--r--src/parallel-collections/scala/collection/parallel/package.scala70
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala5
42 files changed, 125 insertions, 4748 deletions
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 8b4bc070ab..f40905428e 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -75,7 +75,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected def get0(key: A, hash: Int, level: Int): Option[B] = None
- protected def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1)): HashMap[A, B1] =
+ def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1)): HashMap[A, B1] =
new HashMap.HashMap1(key, hash, value, kv)
protected def removed0(key: A, hash: Int, level: Int): HashMap[A, B] = this
diff --git a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
index e29e9dfa98..a9e08913ea 100644
--- a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
+++ b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
@@ -21,6 +21,10 @@ import scala.collection.immutable.HashMap
+/** Parallel hash trie map.
+ *
+ * @author prokopec
+ */
class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
extends ParallelMap[K, V]
with GenericParallelMapTemplate[K, V, ParallelHashTrie]
@@ -87,7 +91,7 @@ object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
new CanCombineFromMap[K, V]
}
- def fromTrie[K, V](trie: HashMap[K, V]): ParallelHashTrie[K, V] = new ParallelHashTrie(trie)
+ def fromTrie[K, V](t: HashMap[K, V]) = new ParallelHashTrie(t)
var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
}
@@ -96,31 +100,136 @@ object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
trait HashTrieCombiner[K, V]
extends Combiner[(K, V), ParallelHashTrie[K, V]] {
self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] =>
- private var trie: HashMap[K, V] = HashMap.empty[K, V]
-
- def size: Int = trie.size
-
- def clear = trie = HashMap.empty[K, V]
+ import HashTrieCombiner._
+ var heads = new Array[Unrolled[K, V]](rootsize)
+ var lasts = new Array[Unrolled[K, V]](rootsize)
+ var size: Int = 0
+
+ def clear = {
+ heads = new Array[Unrolled[K, V]](rootsize)
+ lasts = new Array[Unrolled[K, V]](rootsize)
+ }
- def +=(elem: (K, V)) = { trie += elem; this }
+ def +=(elem: (K, V)) = {
+ size += 1
+ val hc = elem._1.##
+ val pos = hc & 0x1f
+ if (lasts(pos) eq null) {
+ // initialize bucket
+ heads(pos) = new Unrolled[K, V]
+ lasts(pos) = heads(pos)
+ }
+ // add to bucket
+ lasts(pos) = lasts(pos).add(elem)
+ this
+ }
def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
// ParallelHashTrie.totalcombines.incrementAndGet
if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
val that = other.asInstanceOf[HashTrieCombiner[K, V]]
- val ncombiner = HashTrieCombiner[K, V]
- ncombiner.trie = this.trie merge that.trie
- ncombiner
+ var i = 0
+ while (i < rootsize) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ size = size + that.size
+ this
} else error("Unexpected combiner type.")
} else this
- def result = new ParallelHashTrie[K, V](trie)
+ def result = {
+ val buckets = heads.filter(_ != null)
+ val root = new Array[HashMap[K, V]](buckets.length)
+
+ executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
+
+ var bitmap = 0
+ var i = 0
+ while (i < rootsize) {
+ if (heads(i) ne null) bitmap |= 1 << i
+ i += 1
+ }
+ val sz = root.foldLeft(0)(_ + _.size)
+
+ if (sz == 0) new ParallelHashTrie[K, V]
+ else if (sz == 1) new ParallelHashTrie[K, V](root(0))
+ else {
+ val trie = new HashMap.HashTrieMap(bitmap, root, sz)
+ new ParallelHashTrie[K, V](trie)
+ }
+ }
+
+ /* tasks */
+
+ class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ root(i) = createTrie(buckets(i))
+ i += 1
+ }
+ }
+ private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = {
+ var trie = new HashMap[K, V]
+
+ var unrolled = elems
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val kv = chunkarr(i)
+ val hc = kv._1.##
+ trie = trie.updated0(kv._1, hc, rootbits, kv._2, kv)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+
+ trie
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ }
}
object HashTrieCombiner {
def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {}
+
+ private[immutable] val rootbits = 5
+ private[immutable] val rootsize = 1 << 5
+ private[immutable] val unrolledsize = 16
+
+ private[immutable] class Unrolled[K, V] {
+ var size = 0
+ var array = new Array[(K, V)](unrolledsize)
+ var next: Unrolled[K, V] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[K, V]
+ next.add(elem)
+ }
+ override def toString = "Unrolled(" + array.mkString(", ") + ")"
+ }
}
diff --git a/src/parallel-collections/scala/collection/Parallel.scala b/src/parallel-collections/scala/collection/Parallel.scala
deleted file mode 100644
index e500817745..0000000000
--- a/src/parallel-collections/scala/collection/Parallel.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package scala.collection
-
-
-
-
-
-
-/** A marker trait for objects with parallelised operations.
- *
- * @since 2.8
- * @author prokopec
- */
-trait Parallel
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/Parallelizable.scala b/src/parallel-collections/scala/collection/Parallelizable.scala
deleted file mode 100644
index bfca4a41d7..0000000000
--- a/src/parallel-collections/scala/collection/Parallelizable.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package scala.collection
-
-
-
-import parallel.ParallelIterableLike
-
-
-
-/**
- * This trait describes collections which can be turned into parallel collections
- * by invoking the method `par`. Parallelizable collections may be parametrized with
- * a target type different than their own.
- */
-trait Parallelizable[+T, +ParRepr <: Parallel] {
-
- /**
- * Returns a parallel implementation of a collection.
- */
- def par: ParRepr
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/Sequentializable.scala b/src/parallel-collections/scala/collection/Sequentializable.scala
deleted file mode 100644
index 61fb24571a..0000000000
--- a/src/parallel-collections/scala/collection/Sequentializable.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package scala.collection
-
-
-
-
-trait Sequentializable[+T, +Repr] {
-
- /** A view of this parallel collection, but with all
- * of the operations implemented sequentially (i.e. in a single-threaded manner).
- *
- * @return a sequential view of the collection.
- */
- def seq: Repr
-
-} \ No newline at end of file
diff --git a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala b/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala
deleted file mode 100644
index fcbcd6295e..0000000000
--- a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package scala.collection
-package generic
-
-
-
-import scala.collection.parallel._
-
-
-
-
-/**
- * A base trait for parallel builder factories.
- *
- * @tparam From the type of the underlying collection that requests a builder to be created
- * @tparam Elem the element type of the collection to be created
- * @tparam To the type of the collection to be created
- */
-trait CanCombineFrom[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel {
- def apply(from: From): Combiner[Elem, To]
- def apply(): Combiner[Elem, To]
-}
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala b/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala
deleted file mode 100644
index e5ba36f846..0000000000
--- a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package scala.collection.generic
-
-
-import scala.collection.parallel.Combiner
-import scala.collection.parallel.ParallelIterable
-import scala.collection.parallel.ParallelMap
-
-
-
-/** A template class for companion objects of parallel collection classes.
- * They should be mixed in together with `GenericCompanion` type.
- * @tparam CC the type constructor representing the collection class
- * @since 2.8
- */
-trait GenericParallelCompanion[+CC[X] <: ParallelIterable[X]] {
- /** The default builder for $Coll objects.
- */
- def newBuilder[A]: Combiner[A, CC[A]]
-
- /** The parallel builder for $Coll objects.
- */
- def newCombiner[A]: Combiner[A, CC[A]]
-}
-
-trait GenericParallelMapCompanion[+CC[P, Q] <: ParallelMap[P, Q]] {
- def newCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]]
-}
-
-
diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala b/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala
deleted file mode 100644
index e98c13fa36..0000000000
--- a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-package scala.collection.generic
-
-
-
-import scala.collection.parallel.Combiner
-import scala.collection.parallel.ParallelIterable
-import scala.collection.parallel.ParallelMap
-import scala.collection.parallel.TaskSupport
-
-
-import annotation.unchecked.uncheckedVariance
-
-
-
-
-
-
-/** A template trait for collections having a companion.
- *
- * @tparam A the element type of the collection
- * @tparam CC the type constructor representing the collection class
- * @since 2.8
- * @author prokopec
- */
-trait GenericParallelTemplate[+A, +CC[X] <: ParallelIterable[X]]
-extends GenericTraversableTemplate[A, CC]
- with HasNewCombiner[A, CC[A] @uncheckedVariance]
- with TaskSupport
-{
- def companion: GenericCompanion[CC] with GenericParallelCompanion[CC]
-
- protected[this] override def newBuilder: collection.mutable.Builder[A, CC[A]] = newCombiner
-
- protected[this] override def newCombiner: Combiner[A, CC[A]] = {
- val cb = companion.newCombiner[A]
- cb.environment = environment
- cb
- }
-
- override def genericBuilder[B]: Combiner[B, CC[B]] = genericCombiner[B]
-
- def genericCombiner[B]: Combiner[B, CC[B]] = {
- val cb = companion.newCombiner[B]
- cb.environment = environment
- cb
- }
-
-}
-
-
-trait GenericParallelMapTemplate[K, +V, +CC[X, Y] <: ParallelMap[X, Y]]
-extends TaskSupport
-{
- def mapCompanion: GenericParallelMapCompanion[CC]
-
- def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = {
- val cb = mapCompanion.newCombiner[P, Q]
- cb.environment = environment
- cb
- }
-}
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/generic/HasNewCombiner.scala b/src/parallel-collections/scala/collection/generic/HasNewCombiner.scala
deleted file mode 100644
index 2c24b437d8..0000000000
--- a/src/parallel-collections/scala/collection/generic/HasNewCombiner.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-package scala.collection.generic
-
-
-
-import scala.collection.parallel.Combiner
-
-
-
-trait HasNewCombiner[+T, +Repr] {
- protected[this] def newCombiner: Combiner[T, Repr]
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelFactory.scala
deleted file mode 100644
index 0b9e92aa10..0000000000
--- a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package scala.collection.generic
-
-
-import scala.collection.parallel.ParallelIterable
-import scala.collection.parallel.Combiner
-
-
-
-/** A template class for companion objects of `ParallelIterable` and subclasses thereof.
- * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects.
- *
- * @define $coll parallel collection
- * @define $Coll ParallelIterable
- */
-abstract class ParallelFactory[CC[X] <: ParallelIterable[X] with GenericParallelTemplate[X, CC]]
-extends TraversableFactory[CC]
- with GenericParallelCompanion[CC] {
-
- type EPC[T, C] = collection.parallel.EnvironmentPassingCombiner[T, C]
-
- /**
- * A generic implementation of the `CanBuildFromParallel` trait, which forwards all calls to
- * `apply(from)` to the `genericParallelBuilder` method of the $coll `from`, and calls to `apply()`
- * to this factory.
- */
- class GenericCanCombineFrom[A] extends GenericCanBuildFrom[A] with CanCombineFrom[CC[_], A, CC[A]] {
- override def apply(from: Coll) = from.genericCombiner
- override def apply() = newBuilder[A]
- }
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala
deleted file mode 100644
index 8f779b4029..0000000000
--- a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package scala.collection.generic
-
-
-
-import scala.collection.parallel.ParallelMap
-import scala.collection.parallel.ParallelMapLike
-import scala.collection.parallel.Combiner
-import scala.collection.mutable.Builder
-
-
-
-
-/** A template class for companion objects of `ParallelMap` and subclasses thereof.
- * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects.
- *
- * @define $coll parallel map
- * @define $Coll ParallelMap
- */
-abstract class ParallelMapFactory[CC[X, Y] <: ParallelMap[X, Y] with ParallelMapLike[X, Y, CC[X, Y], _]]
-extends MapFactory[CC]
- with GenericParallelMapCompanion[CC] {
-
- type MapColl = CC[_, _]
-
- /** The default builder for $Coll objects.
- * @tparam K the type of the keys
- * @tparam V the type of the associated values
- */
- override def newBuilder[K, V]: Builder[(K, V), CC[K, V]] = newCombiner[K, V]
-
- /** The default combiner for $Coll objects.
- * @tparam K the type of the keys
- * @tparam V the type of the associated values
- */
- def newCombiner[K, V]: Combiner[(K, V), CC[K, V]]
-
- class CanCombineFromMap[K, V] extends CanCombineFrom[CC[_, _], (K, V), CC[K, V]] {
- def apply(from: MapColl) = from.genericMapCombiner[K, V].asInstanceOf[Combiner[(K, V), CC[K, V]]]
- def apply() = newCombiner[K, V]
- }
-
-}
diff --git a/src/parallel-collections/scala/collection/generic/Signalling.scala b/src/parallel-collections/scala/collection/generic/Signalling.scala
deleted file mode 100644
index 1dac4297b7..0000000000
--- a/src/parallel-collections/scala/collection/generic/Signalling.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-package scala.collection.generic
-
-
-import java.util.concurrent.atomic.AtomicInteger
-
-
-
-
-
-/**
- * A message interface serves as a unique interface to the
- * part of the collection capable of receiving messages from
- * a different task.
- *
- * One example of use of this is the `find` method, which can use the
- * signalling interface to inform worker threads that an element has
- * been found and no further search is necessary.
- *
- * @author prokopec
- *
- * @define abortflag
- * Abort flag being true means that a worker can abort and produce whatever result,
- * since its result will not affect the final result of computation. An example
- * of operations using this are `find`, `forall` and `exists` methods.
- *
- * @define indexflag
- * The index flag holds an integer which carries some operation-specific meaning. For
- * instance, `takeWhile` operation sets the index flag to the position of the element
- * where the predicate fails. Other workers may check this index against the indices
- * they are working on and return if this index is smaller than their index. Examples
- * of operations using this are `takeWhile`, `dropWhile`, `span` and `indexOf`.
- */
-trait Signalling {
- /**
- * Checks whether an abort signal has been issued.
- *
- * $abortflag
- * @return the state of the abort
- */
- def isAborted: Boolean
-
- /**
- * Sends an abort signal to other workers.
- *
- * $abortflag
- */
- def abort: Unit
-
- /**
- * Returns the value of the index flag.
- *
- * $indexflag
- * @return the value of the index flag
- */
- def indexFlag: Int
-
- /**
- * Sets the value of the index flag.
- *
- * $indexflag
- * @param f the value to which the index flag is set.
- */
- def setIndexFlag(f: Int)
-
- /**
- * Sets the value of the index flag if argument is greater than current value.
- * This method does this atomically.
- *
- * $indexflag
- * @param f the value to which the index flag is set
- */
- def setIndexFlagIfGreater(f: Int)
-
- /**
- * Sets the value of the index flag if argument is lesser than current value.
- * This method does this atomically.
- *
- * $indexflag
- * @param f the value to which the index flag is set
- */
- def setIndexFlagIfLesser(f: Int)
-
- /**
- * A read only tag specific to the signalling object. It is used to give
- * specific workers information on the part of the collection being operated on.
- */
- def tag: Int
-}
-
-
-/**
- * This signalling implementation returns default values and ignores received signals.
- */
-class DefaultSignalling extends Signalling {
- def isAborted = false
- def abort {}
-
- def indexFlag = -1
- def setIndexFlag(f: Int) {}
- def setIndexFlagIfGreater(f: Int) {}
- def setIndexFlagIfLesser(f: Int) {}
-
- def tag = -1
-}
-
-
-/**
- * An object that returns default values and ignores received signals.
- */
-object IdleSignalling extends DefaultSignalling
-
-
-/**
- * A mixin trait that implements abort flag behaviour using volatile variables.
- */
-trait VolatileAbort extends Signalling {
- @volatile private var abortflag = false
- abstract override def isAborted = abortflag
- abstract override def abort = abortflag = true
-}
-
-
-/**
- * A mixin trait that implements index flag behaviour using atomic integers.
- * The `setIndex` operation is wait-free, while conditional set operations `setIndexIfGreater`
- * and `setIndexIfLesser` are lock-free and support only monotonic changes.
- */
-trait AtomicIndexFlag extends Signalling {
- private val intflag: AtomicInteger = new AtomicInteger(-1)
- abstract override def indexFlag = intflag.get
- abstract override def setIndexFlag(f: Int) = intflag.set(f)
- abstract override def setIndexFlagIfGreater(f: Int) = {
- var loop = true
- do {
- val old = intflag.get
- if (f <= old) loop = false
- else if (intflag.compareAndSet(old, f)) loop = false
- } while (loop);
- }
- abstract override def setIndexFlagIfLesser(f: Int) = {
- var loop = true
- do {
- val old = intflag.get
- if (f >= old) loop = false
- else if (intflag.compareAndSet(old, f)) loop = false
- } while (loop);
- }
-}
-
-
-/**
- * An implementation of the signalling interface using delegates.
- */
-trait DelegatedSignalling extends Signalling {
- /**
- * A delegate that method calls are redirected to.
- */
- var signalDelegate: Signalling
-
- def isAborted = signalDelegate.isAborted
- def abort = signalDelegate.abort
-
- def indexFlag = signalDelegate.indexFlag
- def setIndexFlag(f: Int) = signalDelegate.setIndexFlag(f)
- def setIndexFlagIfGreater(f: Int) = signalDelegate.setIndexFlagIfGreater(f)
- def setIndexFlagIfLesser(f: Int) = signalDelegate.setIndexFlagIfLesser(f)
-
- def tag = signalDelegate.tag
-}
-
-
-/**
- * Class implementing delegated signalling.
- */
-class DelegatedContext(var signalDelegate: Signalling) extends DelegatedSignalling
-
-
-/**
- * Class implementing delegated signalling, but having its own distinct `tag`.
- */
-class TaggedDelegatedContext(deleg: Signalling, override val tag: Int) extends DelegatedContext(deleg)
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/generic/Sizing.scala b/src/parallel-collections/scala/collection/generic/Sizing.scala
deleted file mode 100644
index bf801302ae..0000000000
--- a/src/parallel-collections/scala/collection/generic/Sizing.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package scala.collection.generic
-
-
-
-/** A trait for objects which have a size.
- */
-trait Sizing {
- def size: Int
-} \ No newline at end of file
diff --git a/src/parallel-collections/scala/collection/immutable/package.scala b/src/parallel-collections/scala/collection/immutable/package.scala
deleted file mode 100644
index 5ff9fa223d..0000000000
--- a/src/parallel-collections/scala/collection/immutable/package.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-package scala.collection
-
-
-
-
-
-
-
-
-
-package object immutable {
-
- trait RangeUtils[+Repr <: RangeUtils[Repr]] {
-
- def start: Int
- def end: Int
- def step: Int
- def inclusive: Boolean
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean): Repr
-
- private final def inclusiveLast: Int = {
- val size = end.toLong - start.toLong
- (size / step.toLong * step.toLong + start.toLong).toInt
- }
-
- final def _last: Int = if (!inclusive) {
- if (step == 1 || step == -1) end - step
- else {
- val inclast = inclusiveLast
- if ((end.toLong - start.toLong) % step == 0) inclast - step else inclast
- }
- } else {
- if (step == 1 || step == -1) end
- else inclusiveLast
- }
-
- final def _foreach[U](f: Int => U) = if (_length > 0) {
- var i = start
- val last = _last
- while (i != last) {
- f(i)
- i += step
- }
- }
-
- final def _length: Int = if (!inclusive) {
- if (end > start == step > 0 && start != end) {
- (_last.toLong - start.toLong) / step.toLong + 1
- } else 0
- }.toInt else {
- if (end > start == step > 0 || start == end) {
- (_last.toLong - start.toLong) / step.toLong + 1
- } else 0
- }.toInt
-
- final def _apply(idx: Int): Int = {
- if (idx < 0 || idx >= _length) throw new IndexOutOfBoundsException(idx.toString)
- start + idx * step
- }
-
- private def locationAfterN(n: Int) = if (n > 0) {
- if (step > 0) ((start.toLong + step.toLong * n.toLong) min _last.toLong).toInt
- else ((start.toLong + step.toLong * n.toLong) max _last.toLong).toInt
- } else start
-
- final def _take(n: Int) = if (n > 0 && _length > 0) {
- create(start, locationAfterN(n), step, true)
- } else create(start, start, step, false)
-
- final def _drop(n: Int) = create(locationAfterN(n), end, step, inclusive)
-
- final def _slice(from: Int, until: Int) = _drop(from)._take(until - from)
-
- }
-
-}
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/Combiners.scala b/src/parallel-collections/scala/collection/parallel/Combiners.scala
deleted file mode 100644
index a37f642d42..0000000000
--- a/src/parallel-collections/scala/collection/parallel/Combiners.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-package scala.collection.parallel
-
-
-import scala.collection.Parallel
-import scala.collection.mutable.Builder
-import scala.collection.generic.Sizing
-
-
-
-/** The base trait for all combiners.
- * A combiner lets one construct collections incrementally just like
- * a regular builder, but also implements an efficient merge operation of two builders
- * via `combine` method. Once the collection is constructed, it may be obtained by invoking
- * the `result` method.
- *
- * @tparam Elem the type of the elements added to the builder
- * @tparam To the type of the collection the builder produces
- *
- * @author prokopec
- */
-trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport {
- self: EnvironmentPassingCombiner[Elem, To] =>
-
- type EPC = EnvironmentPassingCombiner[Elem, To]
-
- /** Combines the contents of the receiver builder and the `other` builder,
- * producing a new builder containing both their elements.
- *
- * This method may combine the two builders by copying them into a larger collection,
- * by producing a lazy view that gets evaluated once `result` is invoked, or use
- * a merge operation specific to the data structure in question.
- *
- * Note that both the receiver builder and `other` builder become invalidated
- * after the invocation of this method, and should be cleared (see `clear`)
- * if they are to be used again.
- *
- * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is,
- * they are the same objects in memories, always does nothing and returns the first combiner.
- *
- * @tparam N the type of elements contained by the `other` builder
- * @tparam NewTo the type of collection produced by the `other` builder
- * @param other the other builder
- * @return the parallel builder containing both the elements of this and the `other` builder
- */
- def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo]
-
-}
-
-
-trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] {
- abstract override def result = {
- val res = super.result
-// res.environment = environment
- res
- }
-}
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/Iterators.scala b/src/parallel-collections/scala/collection/parallel/Iterators.scala
deleted file mode 100644
index bfebff994c..0000000000
--- a/src/parallel-collections/scala/collection/parallel/Iterators.scala
+++ /dev/null
@@ -1,443 +0,0 @@
-package scala.collection.parallel
-
-
-
-import scala.collection.Parallel
-import scala.collection.generic.Signalling
-import scala.collection.generic.DelegatedSignalling
-import scala.collection.generic.CanCombineFrom
-import scala.collection.mutable.Builder
-import scala.collection.Iterator.empty
-
-
-
-
-
-
-trait RemainsIterator[+T] extends Iterator[T] {
- /** The number of elements this iterator has yet to iterate.
- * This method doesn't change the state of the iterator.
- */
- def remaining: Int
-}
-
-
-/** Augments iterators with additional methods, mostly transformers,
- * assuming they iterate an iterable collection.
- *
- * @param T type of the elements iterated.
- * @param Repr type of the collection iterator iterates.
- */
-trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T] {
-
- def repr: Repr
-
- /* accessors */
-
- override def count(p: T => Boolean): Int = {
- var i = 0
- while (hasNext) if (p(next)) i += 1
- i
- }
-
- def reduce[U >: T](op: (U, U) => U): U = {
- var r: U = next
- while (hasNext) r = op(r, next)
- r
- }
-
- def fold[U >: T](z: U)(op: (U, U) => U): U = {
- var r = z
- while (hasNext) r = op(r, next)
- r
- }
-
- override def sum[U >: T](implicit num: Numeric[U]): U = {
- var r: U = num.zero
- while (hasNext) r = num.plus(r, next)
- r
- }
-
- override def product[U >: T](implicit num: Numeric[U]): U = {
- var r: U = num.one
- while (hasNext) r = num.times(r, next)
- r
- }
-
- override def min[U >: T](implicit ord: Ordering[U]): T = {
- var r = next
- while (hasNext) {
- val curr = next
- if (ord.lteq(curr, r)) r = curr
- }
- r
- }
-
- override def max[U >: T](implicit ord: Ordering[U]): T = {
- var r = next
- while (hasNext) {
- val curr = next
- if (ord.gteq(curr, r)) r = curr
- }
- r
- }
-
- override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) {
- var i = from
- val until = from + len
- while (i < until && hasNext) {
- array(i) = next
- i += 1
- }
- }
-
- /* transformers to combiners */
-
- def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
- //val cb = pbf(repr)
- cb.sizeHint(remaining)
- while (hasNext) cb += f(next)
- cb
- }
-
- def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
- val cb = pbf(repr)
- while (hasNext) {
- val curr = next
- if (pf.isDefinedAt(curr)) cb += pf(curr)
- }
- cb
- }
-
- def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
- val cb = pbf(repr)
- while (hasNext) {
- val traversable = f(next)
- if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator
- else cb ++= traversable
- }
- cb
- }
-
- def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = {
- b.sizeHint(remaining)
- while (hasNext) b += next
- b
- }
-
- def filter2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = {
- while (hasNext) {
- val curr = next
- if (pred(curr)) cb += curr
- }
- cb
- }
-
- def filterNot2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = {
- while (hasNext) {
- val curr = next
- if (!pred(curr)) cb += curr
- }
- cb
- }
-
- def partition2combiners[U >: T, This >: Repr](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = {
- while (hasNext) {
- val curr = next
- if (pred(curr)) btrue += curr
- else bfalse += curr
- }
- (btrue, bfalse)
- }
-
- def take2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
- cb.sizeHint(n)
- var left = n
- while (left > 0) {
- cb += next
- left -= 1
- }
- cb
- }
-
- def drop2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
- drop(n)
- cb.sizeHint(remaining)
- while (hasNext) cb += next
- cb
- }
-
- def slice2combiner[U >: T, This >: Repr](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = {
- drop(from)
- var left = until - from
- cb.sizeHint(left)
- while (left > 0) {
- cb += next
- left -= 1
- }
- cb
- }
-
- def splitAt2combiners[U >: T, This >: Repr](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = {
- before.sizeHint(at)
- after.sizeHint(remaining - at)
- var left = at
- while (left > 0) {
- before += next
- left -= 1
- }
- while (hasNext) after += next
- (before, after)
- }
-
- def takeWhile2combiner[U >: T, This >: Repr](p: T => Boolean, cb: Combiner[U, This]) = {
- var loop = true
- while (hasNext && loop) {
- val curr = next
- if (p(curr)) cb += curr
- else loop = false
- }
- (cb, loop)
- }
-
- def span2combiners[U >: T, This >: Repr](p: T => Boolean, before: Combiner[U, This], after: Combiner[U, This]) = {
- var isBefore = true
- while (hasNext && isBefore) {
- val curr = next
- if (p(curr)) before += curr
- else {
- after.sizeHint(remaining + 1)
- after += curr
- isBefore = false
- }
- }
- while (hasNext) after += next
- (before, after)
- }
-}
-
-
-trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableIterator[T, Repr] {
-
- /** The exact number of elements this iterator has yet to iterate.
- * This method doesn't change the state of the iterator.
- */
- def remaining: Int
-
- /* accessors */
-
- def prefixLength(pred: T => Boolean): Int = {
- var total = 0
- var loop = true
- while (hasNext && loop) {
- if (pred(next)) total += 1
- else loop = false
- }
- total
- }
-
- override def indexWhere(pred: T => Boolean): Int = {
- var i = 0
- var loop = true
- while (hasNext && loop) {
- if (pred(next)) loop = false
- else i += 1
- }
- if (loop) -1 else i
- }
-
- def lastIndexWhere(pred: T => Boolean): Int = {
- var pos = -1
- var i = 0
- while (hasNext) {
- if (pred(next)) pos = i
- i += 1
- }
- pos
- }
-
- def corresponds[S](corr: (T, S) => Boolean)(that: Iterator[S]): Boolean = {
- while (hasNext && that.hasNext) {
- if (!corr(next, that.next)) return false
- }
- hasNext == that.hasNext
- }
-
- /* transformers */
-
- def reverse2combiner[U >: T, This >: Repr](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.sizeHint(remaining)
- var lst = List[T]()
- while (hasNext) lst ::= next
- while (lst != Nil) {
- cb += lst.head
- lst = lst.tail
- }
- cb
- }
-
- def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = {
- val cb = cbf(repr)
- cb.sizeHint(remaining)
- var lst = List[S]()
- while (hasNext) lst ::= f(next)
- while (lst != Nil) {
- cb += lst.head
- lst = lst.tail
- }
- cb
- }
-
- def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = {
- val cb = cbf(repr)
- cb.sizeHint(remaining)
- var j = 0
- while (hasNext) {
- if (j == index) {
- cb += elem
- next
- } else cb += next
- j += 1
- }
- cb
- }
-
-}
-
-
-
-trait ParallelIterableIterator[+T, +Repr <: Parallel]
-extends AugmentedIterableIterator[T, Repr]
- with Splitter[T]
- with Signalling
- with DelegatedSignalling
-{
- def split: Seq[ParallelIterableIterator[T, Repr]]
-
- /** The number of elements this iterator has yet to traverse. This method
- * doesn't change the state of the iterator.
- *
- * This method is used to provide size hints to builders and combiners, and
- * to approximate positions of iterators within a data structure.
- *
- * '''Note''': This method may be implemented to return an upper bound on the number of elements
- * in the iterator, instead of the exact number of elements to iterate.
- *
- * In that case, 2 considerations must be taken into account:
- *
- * 1) classes that inherit `ParallelIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`.
- *
- * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum
- * of `remaining` values of split iterators must be less than or equal to this upper bound.
- */
- def remaining: Int
-}
-
-
-trait ParallelSeqIterator[+T, +Repr <: Parallel]
-extends ParallelIterableIterator[T, Repr]
- with AugmentedSeqIterator[T, Repr]
- with PreciseSplitter[T]
-{
- def split: Seq[ParallelSeqIterator[T, Repr]]
- def psplit(sizes: Int*): Seq[ParallelSeqIterator[T, Repr]]
-
- /** The number of elements this iterator has yet to traverse. This method
- * doesn't change the state of the iterator. Unlike the version of this method in the supertrait,
- * method `remaining` in `ParallelSeqLike.this.ParallelIterator` must return an exact number
- * of elements remaining in the iterator.
- *
- * @return an exact number of elements this iterator has yet to iterate
- */
- def remaining: Int
-}
-
-
-trait DelegatedIterator[+T, +Delegate <: Iterator[T]] extends RemainsIterator[T] {
- val delegate: Delegate
- def next = delegate.next
- def hasNext = delegate.hasNext
-}
-
-
-trait Counting[+T] extends RemainsIterator[T] {
- val initialSize: Int
- def remaining = initialSize - traversed
- var traversed = 0
- abstract override def next = {
- val n = super.next
- traversed += 1
- n
- }
-}
-
-
-/** A mixin for iterators that traverse only filtered elements of a delegate.
- */
-trait FilteredIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] {
- protected[this] val pred: T => Boolean
-
- private[this] var hd: T = _
- private var hdDefined = false
-
- override def hasNext: Boolean = hdDefined || {
- do {
- if (!delegate.hasNext) return false
- hd = delegate.next
- } while (!pred(hd))
- hdDefined = true
- true
- }
-
- override def next = if (hasNext) { hdDefined = false; hd } else empty.next
-}
-
-
-/** A mixin for iterators that traverse elements of the delegate iterator, and of another collection.
- */
-trait AppendedIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] {
- // `rest` should never alias `delegate`
- protected[this] val rest: Iterator[T]
-
- private[this] var current: Iterator[T] = delegate
-
- override def hasNext = (current.hasNext) || (current == delegate && rest.hasNext)
-
- override def next = {
- if (!current.hasNext) current = rest
- current.next
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala
deleted file mode 100644
index 4882dc19ee..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package scala.collection.parallel
-
-
-import scala.collection.generic._
-import scala.collection.parallel.mutable.ParallelArrayCombiner
-import scala.collection.parallel.mutable.ParallelArray
-
-
-/** A template trait for parallel iterable collections.
- *
- * $paralleliterableinfo
- *
- * $sideeffects
- *
- * @tparam T the element type of the collection
- *
- * @author prokopec
- * @since 2.8
- */
-trait ParallelIterable[+T] extends Iterable[T]
- with GenericParallelTemplate[T, ParallelIterable]
- with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] {
- override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable
-}
-
-/** $factoryinfo
- */
-object ParallelIterable extends ParallelFactory[ParallelIterable] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] =
- new GenericCanCombineFrom[T]
-
- def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
-
- def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala
deleted file mode 100644
index 01a108eea0..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala
+++ /dev/null
@@ -1,940 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-import scala.collection.mutable.Builder
-import scala.collection.mutable.ListBuffer
-import scala.collection.IterableLike
-import scala.collection.Parallel
-import scala.collection.Parallelizable
-import scala.collection.Sequentializable
-import scala.collection.generic._
-
-
-
-
-// TODO update docs!!
-/** A template trait for parallel collections of type `ParallelIterable[T]`.
- *
- * $paralleliterableinfo
- *
- * $sideeffects
- *
- * @tparam T the element type of the collection
- * @tparam Repr the type of the actual collection containing the elements
- *
- * @define paralleliterableinfo
- * This is a base trait for Scala parallel collections. It defines behaviour
- * common to all parallel collections. The actual parallel operation implementation
- * is found in the `ParallelIterableFJImpl` trait extending this trait. Concrete
- * parallel collections should inherit both this and that trait.
- *
- * Parallel operations are implemented with divide and conquer style algorithms that
- * parallelize well. The basic idea is to split the collection into smaller parts until
- * they are small enough to be operated on sequentially.
- *
- * All of the parallel operations are implemented in terms of several methods. The first is:
- * {{{
- * def split: Seq[Repr]
- * }}}
- * which splits the collection into a sequence of disjunct views. This is typically a
- * very fast operation which simply creates wrappers around the receiver collection.
- * These views can then be split recursively into smaller views and so on. Each of
- * the views is still a parallel collection.
- *
- * The next method is:
- * {{{
- * def combine[OtherRepr >: Repr](other: OtherRepr): OtherRepr
- * }}}
- * which combines this collection with the argument collection and returns a collection
- * containing both the elements of this collection and the argument collection. This behaviour
- * may be implemented by producing a view that iterates over both collections, by aggressively
- * copying all the elements into the new collection or by lazily creating a wrapper over both
- * collections that gets evaluated once it's needed. It is recommended to avoid copying all of
- * the elements for performance reasons, although that cost might be negligible depending on
- * the use case.
- *
- * Methods:
- * {{{
- * def seq: Repr
- * }}}
- * and
- * {{{
- * def par: Repr
- * }}}
- * produce a view of the collection that has sequential or parallel operations, respectively.
- *
- * The method:
- * {{{
- * def threshold(sz: Int, p: Int): Int
- * }}}
- * provides an estimate on the minimum number of elements the collection has before
- * the splitting stops and depends on the number of elements in the collection. A rule of the
- * thumb is the number of elements divided by 8 times the parallelism level. This method may
- * be overridden in concrete implementations if necessary.
- *
- * Finally, method `newParallelBuilder` produces a new parallel builder.
- *
- * Since this trait extends the `Iterable` trait, methods like `size` and `iterator` must also
- * be implemented.
- *
- * Each parallel collection is bound to a specific fork/join pool, on which dormant worker
- * threads are kept. One can change a fork/join pool of a collection any time except during
- * some method being invoked. The fork/join pool contains other information such as the parallelism
- * level, that is, the number of processors used. When a collection is created, it is assigned the
- * default fork/join pool found in the `scala.collection.parallel` package object.
- *
- * Parallel collections may or may not be strict, and they are not ordered in terms of the `foreach`
- * operation (see `Traversable`). In terms of the iterator of the collection, some collections
- * are ordered (for instance, parallel sequences).
- *
- * @author prokopec
- * @since 2.8
- *
- * @define sideeffects
- * The higher-order functions passed to certain operations may contain side-effects. Since implementations
- * of operations may not be sequential, this means that side-effects may not be predictable and may
- * produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer
- * to either avoid using side-effects or to use some form of synchronization when accessing mutable data.
- *
- * @define undefinedorder
- * The order in which the operations on elements are performed is unspecified and may be nondeterministic.
- *
- * @define pbfinfo
- * An implicit value of class `CanCombineFrom` which determines the
- * result class `That` from the current representation type `Repr` and
- * and the new element type `B`. This builder factory can provide a parallel
- * builder for the resulting collection.
- *
- * @define abortsignalling
- * This method will provide sequential views it produces with `abort` signalling capabilities. This means
- * that sequential views may send and read `abort` signals.
- *
- * @define indexsignalling
- * This method will provide sequential views it produces with `indexFlag` signalling capabilities. This means
- * that sequential views may set and read `indexFlag` state.
- */
-trait ParallelIterableLike[+T, +Repr <: Parallel, +SequentialView <: Iterable[T]]
-extends IterableLike[T, Repr]
- with Parallelizable[T, Repr]
- with Sequentializable[T, SequentialView]
- with Parallel
- with HasNewCombiner[T, Repr]
- with TaskSupport {
- self =>
-
- /** Parallel iterators are split iterators that have additional accessor and
- * transformer methods defined in terms of methods `next` and `hasNext`.
- * When creating a new parallel collection, one might want to override these
- * new methods to make them more efficient.
- *
- * Parallel iterators are augmented with signalling capabilities. This means
- * that a signalling object can be assigned to them as needed.
- *
- * The self-type ensures that signal context passing behaviour gets mixed in
- * a concrete object instance.
- */
- trait ParallelIterator extends ParallelIterableIterator[T, Repr] {
- me: SignalContextPassingIterator[ParallelIterator] =>
- var signalDelegate: Signalling = IdleSignalling
- def repr = self.repr
- def split: Seq[ParallelIterator]
- }
-
- /** A stackable modification that ensures signal contexts get passed along the iterators.
- * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into
- * concrete iterators.
- */
- trait SignalContextPassingIterator[+IterRepr <: ParallelIterator] extends ParallelIterator {
- // Note: This functionality must be factored out to this inner trait to avoid boilerplate.
- // Also, one could omit the cast below. However, this leads to return type inconsistencies,
- // due to inability to override the return type of _abstract overrides_.
- // Be aware that this stackable modification has to be subclassed, so it shouldn't be rigid
- // on the type of iterators it splits.
- // The alternative is some boilerplate - better to tradeoff some type safety to avoid it here.
- abstract override def split: Seq[IterRepr] = {
- val pits = super.split
- pits foreach { _.signalDelegate = signalDelegate }
- pits.asInstanceOf[Seq[IterRepr]]
- }
- }
-
- /** Convenience for signal context passing iterator.
- */
- type SCPI <: SignalContextPassingIterator[ParallelIterator]
-
- /** Creates a new parallel iterator used to traverse the elements of this parallel collection.
- * This iterator is more specific than the iterator of the returned by `iterator`, and augmented
- * with additional accessor and transformer methods.
- *
- * @return a parallel iterator
- */
- protected def parallelIterator: ParallelIterator
-
- /** Creates a new split iterator used to traverse the elements of this collection.
- *
- * By default, this method is implemented in terms of the protected `parallelIterator` method.
- *
- * @return a split iterator
- */
- def iterator: Splitter[T] = parallelIterator
-
- def par = repr
-
- /** Some minimal number of elements after which this collection should be handled
- * sequentially by different processors.
- *
- * This method depends on the size of the collection and the parallelism level, which
- * are both specified as arguments.
- *
- * @param sz the size based on which to compute the threshold
- * @param p the parallelism level based on which to compute the threshold
- * @return the maximum number of elements for performing operations sequentially
- */
- def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p)
-
- /** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool.
- * This method forwards the call to `newCombiner`.
- */
- protected[this] override def newBuilder: collection.mutable.Builder[T, Repr] = newCombiner
-
- /** Optionally reuses existing combiner for better performance. By default it doesn't - subclasses may override this behaviour.
- * The provided combiner `oldc` that can potentially be reused will be either some combiner from the previous computational task, or `None` if there
- * was no previous phase (in which case this method must return `newc`).
- *
- * @param oldc The combiner that is the result of the previous task, or `None` if there was no previous task.
- * @param newc The new, empty combiner that can be used.
- * @return Either `newc` or `oldc`.
- */
- protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc
-
- /* convenience task operations wrapper */
- protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new {
- def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) {
- def map(r: R): R1 = mapping(r)
- }
-
- def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] {
- val ft = tsk
- val st = t2
- def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
- }
-
- def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] {
- val ft = tsk
- val st = t2
- def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
- }
- }
-
- protected def wrap[R](body: => R) = new NonDivisible[R] {
- def leaf(prevr: Option[R]) = result = body
- var result: R = null.asInstanceOf[R]
- }
-
- /* convenience iterator operations wrapper */
- protected implicit def iterator2ops[PI <: ParallelIterator](it: PI) = new {
- def assign(cntx: Signalling): PI = {
- it.signalDelegate = cntx
- it
- }
- }
-
- protected implicit def builder2ops[Elem, To](cb: Builder[Elem, To]) = new {
- def ifIs[Cmb](isbody: Cmb => Unit) = new {
- def otherwise(notbody: => Unit)(implicit m: ClassManifest[Cmb]) {
- if (cb.getClass == m.erasure) isbody(cb.asInstanceOf[Cmb]) else notbody
- }
- }
- }
-
- override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
-
- /** Reduces the elements of this sequence using the specified associative binary operator.
- *
- * $undefinedorder
- *
- * Note this method has a different signature than the `reduceLeft`
- * and `reduceRight` methods of the trait `Traversable`.
- * The result of reducing may only be a supertype of this parallel collection's
- * type parameter `T`.
- *
- * @tparam U A type parameter for the binary operator, a supertype of `T`.
- * @param op A binary operator that must be associative.
- * @return The result of applying reduce operator `op` between all the elements if the collection is nonempty.
- * @throws UnsupportedOperationException
- * if this $coll is empty.
- */
- def reduce[U >: T](op: (U, U) => U): U = {
- executeAndWaitResult(new Reduce(op, parallelIterator))
- }
-
- /** Optionally reduces the elements of this sequence using the specified associative binary operator.
- *
- * $undefinedorder
- *
- * Note this method has a different signature than the `reduceLeftOption`
- * and `reduceRightOption` methods of the trait `Traversable`.
- * The result of reducing may only be a supertype of this parallel collection's
- * type parameter `T`.
- *
- * @tparam U A type parameter for the binary operator, a supertype of `T`.
- * @param op A binary operator that must be associative.
- * @return An option value containing result of applying reduce operator `op` between all
- * the elements if the collection is nonempty, and `None` otherwise.
- */
- def reduceOption[U >: T](op: (U, U) => U): Option[U] = if (isEmpty) None else Some(reduce(op))
-
- /** Folds the elements of this sequence using the specified associative binary operator.
- * The order in which the elements are reduced is unspecified and may be nondeterministic.
- *
- * Note this method has a different signature than the `foldLeft`
- * and `foldRight` methods of the trait `Traversable`.
- * The result of folding may only be a supertype of this parallel collection's
- * type parameter `T`.
- *
- * @tparam U a type parameter for the binary operator, a supertype of `T`.
- * @param z a neutral element for the fold operation, it may be added to the result
- * an arbitrary number of times, not changing the result (e.g. `Nil` for list concatenation,
- * 0 for addition, or 1 for multiplication)
- * @param op a binary operator that must be associative
- * @return the result of applying fold operator `op` between all the elements and `z`
- */
- def fold[U >: T](z: U)(op: (U, U) => U): U = {
- executeAndWaitResult(new Fold(z, op, parallelIterator))
- }
-
- /** Aggregates the results of applying an operator to subsequent elements.
- *
- * This is a more general form of `fold` and `reduce`. It has similar semantics, but does
- * not require the result to be a supertype of the element type. It traverses the elements in
- * different partitions sequentially, using `seqop` to update the result, and then
- * applies `combop` to results from different partitions. The implementation of this
- * operation may operate on an arbitrary number of collection partitions, so `combop`
- * may be invoked arbitrary number of times.
- *
- * For example, one might want to process some elements and then produce a `Set`. In this
- * case, `seqop` would process an element and append it to the list, while `combop`
- * would concatenate two lists from different partitions together. The initial value
- * `z` would be an empty set.
- *
- * {{{
- * pc.aggregate(Set[Int]())(_ += process(_), _ ++ _)
- * }}}
- *
- * Another example is calculating geometric mean from a collection of doubles
- * (one would typically require big doubles for this).
- *
- * @tparam S the type of accumulated results
- * @param z the initial value for the accumulated result of the partition - this
- * will typically be the neutral element for the `seqop` operator (e.g.
- * `Nil` for list concatenation or `0` for summation)
- * @param seqop an operator used to accumulate results within a partition
- * @param combop an associative operator used to combine results from different partitions
- */
- def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
- executeAndWaitResult(new Aggregate(z, seqop, combop, parallelIterator))
- }
-
- /** Applies a function `f` to all the elements of the receiver.
- *
- * $undefinedorder
- *
- * @tparam U the result type of the function applied to each element, which is always discarded
- * @param f function that's applied to each element
- */
- override def foreach[U](f: T => U): Unit = {
- executeAndWait(new Foreach(f, parallelIterator))
- }
-
- override def count(p: T => Boolean): Int = {
- executeAndWaitResult(new Count(p, parallelIterator))
- }
-
- override def sum[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Sum[U](num, parallelIterator))
- }
-
- override def product[U >: T](implicit num: Numeric[U]): U = {
- executeAndWaitResult(new Product[U](num, parallelIterator))
- }
-
- override def min[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T]
- }
-
- override def max[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T]
- }
-
- override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
- executeAndWaitResult(new Map[S, That](f, pbf, parallelIterator) mapResult { _.result })
- } otherwise super.map(f)(bf)
-
- override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
- executeAndWaitResult(new Collect[S, That](pf, pbf, parallelIterator) mapResult { _.result })
- } otherwise super.collect(pf)(bf)
-
- override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
- executeAndWaitResult(new FlatMap[S, That](f, pbf, parallelIterator) mapResult { _.result })
- } otherwise super.flatMap(f)(bf)
-
- /** Tests whether a predicate holds for all elements of this $coll.
- *
- * $abortsignalling
- *
- * @param p a predicate used to test elements
- * @return true if `p` holds for all elements, false otherwise
- */
- override def forall(pred: T => Boolean): Boolean = {
- executeAndWaitResult(new Forall(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
- }
-
- /** Tests whether a predicate holds for some element of this $coll.
- *
- * $abortsignalling
- *
- * @param p a predicate used to test elements
- * @return true if `p` holds for some element, false otherwise
- */
- override def exists(pred: T => Boolean): Boolean = {
- executeAndWaitResult(new Exists(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
- }
-
- /** Finds some element in the collection for which the predicate holds, if such
- * an element exists. The element may not necessarily be the first such element
- * in the iteration order.
- *
- * If there are multiple elements obeying the predicate, the choice is nondeterministic.
- *
- * $abortsignalling
- *
- * @param p predicate used to test the elements
- * @return an option value with the element if such an element exists, or `None` otherwise
- */
- override def find(pred: T => Boolean): Option[T] = {
- executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
- }
-
- protected[this] def cbfactory = () => newCombiner
-
- override def filter(pred: T => Boolean): Repr = {
- executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result })
- }
-
- override def filterNot(pred: T => Boolean): Repr = {
- executeAndWaitResult(new FilterNot(pred, cbfactory, parallelIterator) mapResult { _.result })
- }
-
- override def ++[U >: T, That](that: TraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = {
- if (that.isParallel && bf.isParallel) {
- // println("case both are parallel")
- val other = that.asParallelIterable
- val pbf = bf.asParallel
- val copythis = new Copy(() => pbf(repr), parallelIterator)
- val copythat = wrap {
- val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator)
- othtask.compute
- othtask.result
- }
- val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result }
- executeAndWaitResult(task)
- } else if (bf.isParallel) {
- // println("case parallel builder, `that` not parallel")
- val pbf = bf.asParallel
- val copythis = new Copy(() => pbf(repr), parallelIterator)
- val copythat = wrap {
- val cb = pbf(repr)
- for (elem <- that) cb += elem
- cb
- }
- executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result })
- } else {
- // println("case not a parallel builder")
- val b = bf(repr)
- this.parallelIterator.copy2builder[U, That, Builder[U, That]](b)
- if (that.isInstanceOf[Parallel]) for (elem <- that.asInstanceOf[Iterable[U]].iterator) b += elem
- else for (elem <- that) b += elem
- b.result
- }
- }
-
- override def partition(pred: T => Boolean): (Repr, Repr) = {
- executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
- }
-
- override def take(n: Int): Repr = {
- val actualn = if (size > n) n else size
- if (actualn < MIN_FOR_COPY) take_sequential(actualn)
- else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result })
- }
-
- private def take_sequential(n: Int) = {
- val cb = newCombiner
- cb.sizeHint(n)
- val it = parallelIterator
- var left = n
- while (left > 0) {
- cb += it.next
- left -= 1
- }
- cb.result
- }
-
- override def drop(n: Int): Repr = {
- val actualn = if (size > n) n else size
- if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn)
- else executeAndWaitResult(new Drop(actualn, cbfactory, parallelIterator) mapResult { _.result })
- }
-
- private def drop_sequential(n: Int) = {
- val it = parallelIterator drop n
- val cb = newCombiner
- cb.sizeHint(size - n)
- while (it.hasNext) cb += it.next
- cb.result
- }
-
- override def slice(unc_from: Int, unc_until: Int): Repr = {
- val from = unc_from min size max 0
- val until = unc_until min size max from
- if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until)
- else executeAndWaitResult(new Slice(from, until, cbfactory, parallelIterator) mapResult { _.result })
- }
-
- private def slice_sequential(from: Int, until: Int): Repr = {
- val cb = newCombiner
- var left = until - from
- val it = parallelIterator drop from
- while (left > 0) {
- cb += it.next
- left -= 1
- }
- cb.result
- }
-
- override def splitAt(n: Int): (Repr, Repr) = {
- executeAndWaitResult(new SplitAt(n, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
- }
-
- /** Takes the longest prefix of elements that satisfy the predicate.
- *
- * $indexsignalling
- * The index flag is initially set to maximum integer value.
- *
- * @param pred the predicate used to test the elements
- * @return the longest prefix of this $coll of elements that satisy the predicate `pred`
- */
- override def takeWhile(pred: T => Boolean): Repr = {
- val cntx = new DefaultSignalling with AtomicIndexFlag
- cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new TakeWhile(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._1.result })
- }
-
- /** Splits this $coll into a prefix/suffix pair according to a predicate.
- *
- * $indexsignalling
- * The index flag is initially set to maximum integer value.
- *
- * @param pred the predicate used to test the elements
- * @return a pair consisting of the longest prefix of the collection for which all
- * the elements satisfy `pred`, and the rest of the collection
- */
- override def span(pred: T => Boolean): (Repr, Repr) = {
- val cntx = new DefaultSignalling with AtomicIndexFlag
- cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult {
- p => (p._1.result, p._2.result)
- })
- }
-
- /** Drops all elements in the longest prefix of elements that satisfy the predicate,
- * and returns a collection composed of the remaining elements.
- *
- * $indexsignalling
- * The index flag is initially set to maximum integer value.
- *
- * @param pred the predicate used to test the elements
- * @return a collection composed of all the elements after the longest prefix of elements
- * in this $coll that satisfy the predicate `pred`
- */
- override def dropWhile(pred: T => Boolean): Repr = {
- val cntx = new DefaultSignalling with AtomicIndexFlag
- cntx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._2.result })
- }
-
- override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) {
- executeAndWait(new CopyToArray(start, len, xs, parallelIterator))
- }
-
- override def toIterable: Iterable[T] = seq.drop(0).asInstanceOf[Iterable[T]]
-
- override def toArray[U >: T: ClassManifest]: Array[U] = {
- val arr = new Array[U](size)
- copyToArray(arr)
- arr
- }
-
- override def toList: List[T] = seq.toList
-
- override def toIndexedSeq[S >: T]: collection.immutable.IndexedSeq[S] = seq.toIndexedSeq[S]
-
- override def toStream: Stream[T] = seq.toStream
-
- override def toSet[S >: T]: collection.immutable.Set[S] = seq.toSet
-
- override def toSeq: Seq[T] = seq.toSeq
-
- /* tasks */
-
- /** Standard accessor task that iterates over the elements of the collection.
- *
- * @tparam R type of the result of this method (`R` for result).
- * @tparam Tp the representation type of the task at hand.
- */
- protected trait Accessor[R, Tp]
- extends super.Task[R, Tp] {
- val pit: ParallelIterator
- def newSubtask(p: ParallelIterator): Accessor[R, Tp]
- def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
- def split = pit.split.map(newSubtask(_)) // default split procedure
- override def toString = "Accessor(" + pit.toString + ")"
- }
-
- protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] {
- def shouldSplitFurther = false
- def split = throw new UnsupportedOperationException("Does not split.")
- override def toString = "NonDivisibleTask"
- }
-
- protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]]
-
- protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
- extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] {
- val ft: First
- val st: Second
- def combineResults(fr: FR, sr: SR): R
- var result: R = null.asInstanceOf[R]
- }
-
- /** Sequentially performs one task after another. */
- protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
- extends Composite[FR, SR, R, First, Second] {
- def leaf(prevr: Option[R]) = {
- ft.compute
- st.compute
- result = combineResults(ft.result, st.result)
- }
- }
-
- /** Performs two tasks in parallel, and waits for both to finish. */
- protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
- extends Composite[FR, SR, R, First, Second] {
- def leaf(prevr: Option[R]) = {
- st.start
- ft.compute
- st.sync
- result = combineResults(ft.result, st.result)
- }
- }
-
- protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp])
- extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] {
- var result: R1 = null.asInstanceOf[R1]
- def map(r: R): R1
- def leaf(prevr: Option[R1]) = {
- inner.compute
- result = map(inner.result)
- }
- }
-
- protected trait Transformer[R, Tp] extends Accessor[R, Tp]
-
- protected[this] class Foreach[S](op: T => S, val pit: ParallelIterator) extends Accessor[Unit, Foreach[S]] {
- var result: Unit = ()
- def leaf(prevr: Option[Unit]) = pit.foreach(op)
- def newSubtask(p: ParallelIterator) = new Foreach[S](op, p)
- }
-
- protected[this] class Count(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Int, Count] {
- var result: Int = 0
- def leaf(prevr: Option[Int]) = result = pit.count(pred)
- def newSubtask(p: ParallelIterator) = new Count(pred, p)
- override def merge(that: Count) = result = result + that.result
- }
-
- protected[this] class Reduce[U >: T](op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Reduce[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.reduce(op)
- def newSubtask(p: ParallelIterator) = new Reduce(op, p)
- override def merge(that: Reduce[U]) = result = op(result, that.result)
- }
-
- protected[this] class Fold[U >: T](z: U, op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Fold[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.fold(z)(op)
- def newSubtask(p: ParallelIterator) = new Fold(z, op, p)
- override def merge(that: Fold[U]) = result = op(result, that.result)
- }
-
- protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, val pit: ParallelIterator)
- extends Accessor[S, Aggregate[S]] {
- var result: S = null.asInstanceOf[S]
- def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop)
- def newSubtask(p: ParallelIterator) = new Aggregate(z, seqop, combop, p)
- override def merge(that: Aggregate[S]) = result = combop(result, that.result)
- }
-
- protected[this] class Sum[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Sum[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.sum(num)
- def newSubtask(p: ParallelIterator) = new Sum(num, p)
- override def merge(that: Sum[U]) = result = num.plus(result, that.result)
- }
-
- protected[this] class Product[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Product[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.product(num)
- def newSubtask(p: ParallelIterator) = new Product(num, p)
- override def merge(that: Product[U]) = result = num.times(result, that.result)
- }
-
- protected[this] class Min[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Min[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.min(ord)
- def newSubtask(p: ParallelIterator) = new Min(ord, p)
- override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result
- }
-
- protected[this] class Max[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Max[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.max(ord)
- def newSubtask(p: ParallelIterator) = new Max(ord, p)
- override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result
- }
-
- protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
- extends Transformer[Combiner[S, That], Map[S, That]] {
- var result: Combiner[S, That] = null
- def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr)))
- def newSubtask(p: ParallelIterator) = new Map(f, pbf, p)
- override def merge(that: Map[S, That]) = result = result combine that.result
- }
-
- protected[this] class Collect[S, That]
- (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
- extends Transformer[Combiner[S, That], Collect[S, That]] {
- var result: Combiner[S, That] = null
- def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf) // TODO
- def newSubtask(p: ParallelIterator) = new Collect(pf, pbf, p)
- override def merge(that: Collect[S, That]) = result = result combine that.result
- }
-
- protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
- extends Transformer[Combiner[S, That], FlatMap[S, That]] {
- var result: Combiner[S, That] = null
- def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf) // TODO
- def newSubtask(p: ParallelIterator) = new FlatMap(f, pbf, p)
- override def merge(that: FlatMap[S, That]) = result = result combine that.result
- }
-
- protected[this] class Forall(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Forall] {
- var result: Boolean = true
- def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort }
- def newSubtask(p: ParallelIterator) = new Forall(pred, p)
- override def merge(that: Forall) = result = result && that.result
- }
-
- protected[this] class Exists(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Exists] {
- var result: Boolean = false
- def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort }
- def newSubtask(p: ParallelIterator) = new Exists(pred, p)
- override def merge(that: Exists) = result = result || that.result
- }
-
- protected[this] class Find[U >: T](pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Option[U], Find[U]] {
- var result: Option[U] = None
- def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
- def newSubtask(p: ParallelIterator) = new Find(pred, p)
- override def merge(that: Find[U]) = if (this.result == None) result = that.result
- }
-
- protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[Combiner[U, This], Filter[U, This]] {
- var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf()))
- def newSubtask(p: ParallelIterator) = new Filter(pred, cbf, p)
- override def merge(that: Filter[U, This]) = result = result combine that.result
- }
-
- protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[Combiner[U, This], FilterNot[U, This]] {
- var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
- def newSubtask(p: ParallelIterator) = new FilterNot(pred, cbf, p)
- override def merge(that: FilterNot[U, This]) = result = result combine that.result
- }
-
- protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], val pit: ParallelIterator)
- extends Transformer[Combiner[U, That], Copy[U, That]] {
- var result: Combiner[U, That] = null
- def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory()))
- def newSubtask(p: ParallelIterator) = new Copy[U, That](cfactory, p)
- override def merge(that: Copy[U, That]) = result = result combine that.result
- }
-
- protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] {
- var result: (Combiner[U, This], Combiner[U, This]) = null
- def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
- def newSubtask(p: ParallelIterator) = new Partition(pred, cbf, p)
- override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
- }
-
- protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[Combiner[U, This], Take[U, This]] {
- var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf()))
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- val sizes = pits.scanLeft(0)(_ + _.remaining)
- for ((p, untilp) <- pits zip sizes; if untilp <= n) yield {
- if (untilp + p.remaining < n) new Take(p.remaining, cbf, p)
- else new Take(n - untilp, cbf, p)
- }
- }
- override def merge(that: Take[U, This]) = result = result combine that.result
- }
-
- protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[Combiner[U, This], Drop[U, This]] {
- var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf()))
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- val sizes = pits.scanLeft(0)(_ + _.remaining)
- for ((p, withp) <- pits zip sizes.tail; if withp >= n) yield {
- if (withp - p.remaining > n) new Drop(0, cbf, p)
- else new Drop(n - withp + p.remaining, cbf, p)
- }
- }
- override def merge(that: Drop[U, This]) = result = result combine that.result
- }
-
- protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[Combiner[U, This], Slice[U, This]] {
- var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf()))
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- val sizes = pits.scanLeft(0)(_ + _.remaining)
- for ((p, untilp) <- pits zip sizes; if untilp + p.remaining >= from || untilp <= until) yield {
- val f = (from max untilp) - untilp
- val u = (until min (untilp + p.remaining)) - untilp
- new Slice(f, u, cbf, p)
- }
- }
- override def merge(that: Slice[U, This]) = result = result combine that.result
- }
-
- protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] {
- var result: (Combiner[U, This], Combiner[U, This]) = null
- def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- val sizes = pits.scanLeft(0)(_ + _.remaining)
- for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p)
- }
- override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
- }
-
- protected[this] class TakeWhile[U >: T, This >: Repr]
- (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] {
- var result: (Combiner[U, This], Boolean) = null
- def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) {
- result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf()))
- if (!result._2) pit.setIndexFlagIfLesser(pos)
- } else result = (reuse(prev.map(_._1), cbf()), false)
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new TakeWhile(pos + untilp, pred, cbf, p)
- }
- override def merge(that: TakeWhile[U, This]) = if (result._2) {
- result = (result._1 combine that.result._1, that.result._2)
- }
- }
-
- protected[this] class Span[U >: T, This >: Repr]
- (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] {
- var result: (Combiner[U, This], Combiner[U, This]) = null
- def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) {
- result = pit.span2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf()))
- if (result._2.size > 0) pit.setIndexFlagIfLesser(pos)
- } else {
- result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf())))
- }
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Span(pos + untilp, pred, cbf, p)
- }
- override def merge(that: Span[U, This]) = result = if (result._2.size == 0) {
- (result._1 combine that.result._1, that.result._2)
- } else {
- (result._1, result._2 combine that.result._1 combine that.result._2)
- }
- }
-
- protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], val pit: ParallelIterator)
- extends Accessor[Unit, CopyToArray[U, This]] {
- var result: Unit = ()
- def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len)
- def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield {
- val plen = p.remaining min (len - untilp)
- new CopyToArray[U, This](from + untilp, plen, array, p)
- }
- }
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala
deleted file mode 100644
index f40f02eb3b..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-import scala.collection.Parallel
-import scala.collection.TraversableViewLike
-import scala.collection.IterableView
-
-
-
-
-/** A template view of a non-strict view of a parallel iterable collection.
- *
- * @tparam T ...
- * @tparam Coll ...
- *
- * @since 2.8
- */
-trait ParallelIterableView[+T, +Coll <: Parallel, +CollSeq]
-extends ParallelIterableViewLike[T, Coll, CollSeq, ParallelIterableView[T, Coll, CollSeq], IterableView[T, CollSeq]]
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala
deleted file mode 100644
index 024eb48d25..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-import scala.collection.Parallel
-import scala.collection.TraversableViewLike
-import scala.collection.IterableView
-import scala.collection.IterableViewLike
-
-
-
-
-
-/** A template view of a non-strict view of parallel iterable collection.
- *
- * '''Note:''' Regular view traits have type parameters used to carry information
- * about the type of the elements, type of the collection they are derived from and
- * their own actual representation type. Parallel views have an additional parameter
- * which carries information about the type of the sequential version of the view.
- *
- * @tparam T the type of the elements this view can traverse
- * @tparam Coll the type of the collection this view is derived from
- * @tparam CollSeq TODO
- * @tparam This the actual representation type of this view
- * @tparam ThisSeq the type of the sequential representation of this view
- *
- * @since 2.8
- */
-trait ParallelIterableViewLike[+T,
- +Coll <: Parallel,
- +CollSeq,
- +This <: ParallelIterableView[T, Coll, CollSeq] with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq],
- +ThisSeq <: IterableView[T, CollSeq] with IterableViewLike[T, CollSeq, ThisSeq]]
-extends IterableView[T, Coll]
- with IterableViewLike[T, Coll, This]
- with ParallelIterable[T]
- with ParallelIterableLike[T, This, ThisSeq]
-{
- self =>
-
- override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner");
-
- //type SCPI = SignalContextPassingIterator[ParallelIterator] // complains when overriden further in inh. hier., TODO check it out
- type CPI = SignalContextPassingIterator[ParallelIterator]
-
- trait Transformed[+S] extends ParallelIterableView[S, Coll, CollSeq] with super.Transformed[S]
-
-}
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala b/src/parallel-collections/scala/collection/parallel/ParallelMap.scala
deleted file mode 100644
index 5ce61469bc..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-
-import scala.collection.Map
-import scala.collection.mutable.Builder
-import scala.collection.generic.ParallelMapFactory
-import scala.collection.generic.GenericParallelMapTemplate
-import scala.collection.generic.GenericParallelMapCompanion
-import scala.collection.generic.CanCombineFrom
-
-
-
-
-
-
-trait ParallelMap[K, +V]
-extends Map[K, V]
- with GenericParallelMapTemplate[K, V, ParallelMap]
- with ParallelIterable[(K, V)]
- with ParallelMapLike[K, V, ParallelMap[K, V], Map[K, V]]
-{
-self =>
-
- def mapCompanion: GenericParallelMapCompanion[ParallelMap] = ParallelMap
-
- override def empty: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V]
-
- override def stringPrefix = "ParallelMap"
-}
-
-
-
-object ParallelMap extends ParallelMapFactory[ParallelMap] {
- def empty[K, V]: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V]
-
- def newCombiner[K, V]: Combiner[(K, V), ParallelMap[K, V]] = immutable.HashTrieCombiner[K, V]
-
- implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelMap[K, V]] = new CanCombineFromMap[K, V]
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala
deleted file mode 100644
index 8a0b54525f..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-import scala.collection.MapLike
-import scala.collection.Map
-import scala.collection.mutable.Builder
-
-
-
-
-
-
-
-
-trait ParallelMapLike[K,
- +V,
- +Repr <: ParallelMapLike[K, V, Repr, SequentialView] with ParallelMap[K, V],
- +SequentialView <: Map[K, V]]
-extends MapLike[K, V, Repr]
- with ParallelIterableLike[(K, V), Repr, SequentialView]
-{ self =>
-
- protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner
-
- protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.")
-
- override def empty: Repr
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala
deleted file mode 100644
index 71b802cd11..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package scala.collection.parallel
-
-
-
-import scala.collection.generic.GenericCompanion
-import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.GenericParallelTemplate
-import scala.collection.generic.ParallelFactory
-import scala.collection.generic.CanCombineFrom
-import scala.collection.parallel.mutable.ParallelArrayCombiner
-import scala.collection.parallel.mutable.ParallelArray
-
-
-/** A template trait for parallel sequences.
- *
- * $parallelseqinfo
- *
- * $sideeffects
- */
-trait ParallelSeq[+T] extends Seq[T]
- with ParallelIterable[T]
- with GenericParallelTemplate[T, ParallelSeq]
- with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] {
- override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
-
- def apply(i: Int): T
-
- override def toString = super[ParallelIterable].toString
-}
-
-
-object ParallelSeq extends ParallelFactory[ParallelSeq] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T]
-
- def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
-
- def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala
deleted file mode 100644
index 18b0c83f23..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala
+++ /dev/null
@@ -1,473 +0,0 @@
-package scala.collection.parallel
-
-
-import scala.collection.Parallel
-import scala.collection.SeqLike
-import scala.collection.generic.DefaultSignalling
-import scala.collection.generic.AtomicIndexFlag
-import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.VolatileAbort
-
-
-
-
-// TODO update docs!!
-/** A template trait for sequences of type `ParallelSeq[T]`, representing
- * parallel sequences with element type `T`.
- *
- * $parallelseqinfo
- *
- * @tparam T the type of the elements contained in this collection
- * @tparam Repr the type of the actual collection containing the elements
- *
- * @define parallelseqinfo
- * Parallel sequences inherit the `IndexedSeq` trait. This means they provide
- * efficient indexing and length computations. Like their sequential counterparts
- * they always have a defined order of elements. This means they will produce resulting
- * parallel sequences in the same way sequential sequences do. However, the order
- * in which they iterate over elements to produce results is not defined and is generally
- * nondeterministic. If the higher-order functions given to them produce no sideeffects,
- * then this won't be noticeable.
- *
- * This trait defines a new, more general `split` operation and reimplements the `split`
- * operation of `ParallelIterable` trait using the new `split` operation.
- *
- * @author prokopec
- * @since 2.8
- */
-trait ParallelSeqLike[+T, +Repr <: Parallel, +Sequential <: Seq[T] with SeqLike[T, Sequential]]
-extends scala.collection.SeqLike[T, Repr]
- with ParallelIterableLike[T, Repr, Sequential] {
- self =>
-
- type SuperParallelIterator = super.ParallelIterator
-
- /** An iterator that can be split into arbitrary subsets of iterators.
- * The self-type requirement ensures that the signal context passing behaviour gets mixed in
- * the concrete iterator instance in some concrete collection.
- *
- * '''Note:''' In concrete collection classes, collection implementers might want to override the iterator
- * `reverse2builder` method to ensure higher efficiency.
- */
- trait ParallelIterator extends ParallelSeqIterator[T, Repr] with super.ParallelIterator {
- me: SignalContextPassingIterator[ParallelIterator] =>
- def split: Seq[ParallelIterator]
- def psplit(sizes: Int*): Seq[ParallelIterator]
- }
-
- /** A stackable modification that ensures signal contexts get passed along the iterators.
- * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into
- * concrete iterators.
- */
- trait SignalContextPassingIterator[+IterRepr <: ParallelIterator]
- extends ParallelIterator with super.SignalContextPassingIterator[IterRepr] {
- // Note: See explanation in `ParallelIterableLike.this.SignalContextPassingIterator`
- // to understand why we do the cast here, and have a type parameter.
- // Bottomline: avoiding boilerplate and fighting against inability to override stackable modifications.
- abstract override def psplit(sizes: Int*): Seq[IterRepr] = {
- val pits = super.psplit(sizes: _*)
- pits foreach { _.signalDelegate = signalDelegate }
- pits.asInstanceOf[Seq[IterRepr]]
- }
- }
-
- /** A convenient shorthand for the signal context passing stackable modification.
- */
- type SCPI <: SignalContextPassingIterator[ParallelIterator]
-
- /** A more refined version of the iterator found in the `ParallelIterable` trait,
- * this iterator can be split into arbitrary subsets of iterators.
- *
- * @return an iterator that can be split into subsets of precise size
- */
- protected def parallelIterator: ParallelIterator
-
- override def iterator: PreciseSplitter[T] = parallelIterator
-
- override def size = length
-
- /** Used to iterate elements using indices */
- protected abstract class Elements(start: Int, val end: Int) extends ParallelIterator with BufferedIterator[T] {
- me: SignalContextPassingIterator[ParallelIterator] =>
-
- private var i = start
-
- def hasNext = i < end
-
- def next: T = if (i < end) {
- val x = self(i)
- i += 1
- x
- } else Iterator.empty.next
-
- def head = self(i)
-
- final def remaining = end - i
-
- def split = psplit(remaining / 2, remaining - remaining / 2)
-
- def psplit(sizes: Int*) = {
- val incr = sizes.scanLeft(0)(_ + _)
- for ((from, until) <- incr.init zip incr.tail) yield {
- new Elements(start + from, (start + until) min end) with SignalContextPassingIterator[ParallelIterator]
- }
- }
-
- override def toString = "Elements(" + start + ", " + end + ")"
- }
-
- /* ParallelSeq methods */
-
- /** Returns the length of the longest segment of elements starting at
- * a given position satisfying some predicate.
- *
- * $indexsignalling
- *
- * The index flag is initially set to maximum integer value.
- *
- * @param p the predicate used to test the elements
- * @param from the starting offset for the search
- * @return the length of the longest segment of elements starting at `from` and
- * satisfying the predicate
- */
- override def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else {
- val realfrom = if (from < 0) 0 else from
- val ctx = new DefaultSignalling with AtomicIndexFlag
- ctx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new SegmentLength(p, 0, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))._1
- }
-
- override def prefixLength(p: T => Boolean) = segmentLength(p, 0)
-
- /** Finds the first element satisfying some predicate.
- *
- * $indexsignalling
- *
- * The index flag is initially set to maximum integer value.
- *
- * @param p the predicate used to test the elements
- * @param from the starting offset for the search
- * @return the index `>= from` of the first element of this $coll that satisfies the predicate `p`,
- * or `-1`, if none exists
- */
- override def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else {
- val realfrom = if (from < 0) 0 else from
- val ctx = new DefaultSignalling with AtomicIndexFlag
- ctx.setIndexFlag(Int.MaxValue)
- executeAndWaitResult(new IndexWhere(p, realfrom, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))
- }
-
- override def indexWhere(p: T => Boolean): Int = indexWhere(p, 0)
-
- override def findIndexOf(p: T => Boolean): Int = indexWhere(p, 0)
-
- override def indexOf[U >: T](elem: U): Int = indexOf(elem, 0)
-
- override def indexOf[U >: T](elem: U, from: Int): Int = indexWhere(elem ==, from)
-
- /** Finds the last element satisfying some predicate.
- *
- * $indexsignalling
- *
- * The index flag is initially set to minimum integer value.
- *
- * @param p the predicate used to test the elements
- * @param end the maximum offset for the search
- * @return the index `<= end` of the first element of this $coll that satisfies the predicate `p`,
- * or `-1`, if none exists
- */
- override def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else {
- val until = if (end >= length) length else end + 1
- val ctx = new DefaultSignalling with AtomicIndexFlag
- ctx.setIndexFlag(Int.MinValue)
- executeAndWaitResult(new LastIndexWhere(p, 0, parallelIterator.psplit(until, length - until)(0) assign ctx))
- }
-
- override def reverse: Repr = {
- executeAndWaitResult(new Reverse(() => newCombiner, parallelIterator) mapResult { _.result })
- }
-
- override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
- executeAndWaitResult(new ReverseMap[S, That](f, pbf, parallelIterator) mapResult { _.result })
- } otherwise super.reverseMap(f)(bf)
-
- override def startsWith[S](that: Seq[S]): Boolean = startsWith(that, 0)
-
- /** Tests whether this $coll contains the given sequence at a given index.
- *
- * $abortsignalling
- *
- * @tparam U the element type of `that` parallel sequence
- * @param that the parallel sequence this sequence is being searched for
- * @param offset the starting offset for the search
- * @return `true` if there is a sequence `that` starting at `offset` in this sequence, `false` otherwise
- */
- override def startsWith[S](that: Seq[S], offset: Int): Boolean = that ifParallelSeq { pthat =>
- if (offset < 0 || offset >= length) offset == length && pthat.length == 0
- else if (pthat.length == 0) true
- else if (pthat.length > length - offset) false
- else {
- val ctx = new DefaultSignalling with VolatileAbort
- executeAndWaitResult(new SameElements(parallelIterator.psplit(offset, pthat.length)(1) assign ctx, pthat.parallelIterator))
- }
- } otherwise super.startsWith(that, offset)
-
- override def sameElements[U >: T](that: Iterable[U]): Boolean = that ifParallelSeq { pthat =>
- val ctx = new DefaultSignalling with VolatileAbort
- length == pthat.length && executeAndWaitResult(new SameElements(parallelIterator assign ctx, pthat.parallelIterator))
- } otherwise super.sameElements(that)
-
- /** Tests whether this $coll ends with the given parallel sequence
- *
- * $abortsignalling
- *
- * @tparam S the type of the elements of `that` sequence
- * @param that the sequence to test
- * @return `true` if this $coll has `that` as a suffix, `false` otherwise
- */
- override def endsWith[S](that: Seq[S]): Boolean = that ifParallelSeq { pthat =>
- if (that.length == 0) true
- else if (that.length > length) false
- else {
- val ctx = new DefaultSignalling with VolatileAbort
- val tlen = that.length
- executeAndWaitResult(new SameElements(parallelIterator.psplit(length - tlen, tlen)(1) assign ctx, pthat.parallelIterator))
- }
- } otherwise super.endsWith(that)
-
- override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)
- (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParallelSeq && bf.isParallel) {
- val that = patch.asParallelSeq
- val pbf = bf.asParallel
- val realreplaced = replaced min (length - from)
- val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced)
- val copystart = new Copy[U, That](() => pbf(repr), pits(0))
- val copymiddle = wrap {
- val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
- tsk.compute
- tsk.result
- }
- val copyend = new Copy[U, That](() => pbf(repr), pits(2))
- executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result })
- } else patch_sequential(from, patch, replaced)
-
- private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
- val b = bf(repr)
- val repl = r min (length - from)
- val pits = parallelIterator.psplit(from, repl, length - from - repl)
- b ++= pits(0)
- b ++= patch.iterator
- b ++= pits(2)
- b.result
- }
-
- override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf =>
- executeAndWaitResult(new Updated(index, elem, pbf, parallelIterator) mapResult { _.result })
- } otherwise super.updated(index, elem)
-
- override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
- patch(0, mutable.ParallelArray(elem), 0)
- }
-
- override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
- patch(length, mutable.ParallelArray(elem), 0)
- }
-
- override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) {
- patch(length, new immutable.Repetition(elem, len - length), 0)
- } else patch(length, Nil, 0)
-
- /** Tests whether every element of this $coll relates to the
- * corresponding element of another parallel sequence by satisfying a test predicate.
- *
- * $abortsignalling
- *
- * @param that the other parallel sequence
- * @param p the test predicate, which relates elements from both sequences
- * @tparam S the type of the elements of `that`
- * @return `true` if both parallel sequences have the same length and
- * `p(x, y)` is `true` for all corresponding elements `x` of this $coll
- * and `y` of `that`, otherwise `false`
- */
- override def corresponds[S](that: Seq[S])(p: (T, S) => Boolean): Boolean = that ifParallelSeq { pthat =>
- val ctx = new DefaultSignalling with VolatileAbort
- length == pthat.length && executeAndWaitResult(new Corresponds(p, parallelIterator assign ctx, pthat.parallelIterator))
- } otherwise super.corresponds(that)(p)
-
- override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
-
- override def view = new ParallelSeqView[T, Repr, Sequential] {
- protected lazy val underlying = self.repr
- def length = self.length
- def apply(idx: Int) = self(idx)
- def seq = self.seq.view
- def parallelIterator = new Elements(0, length) with SCPI {}
- }
-
- override def view(from: Int, until: Int) = view.slice(from, until)
-
- /* tasks */
-
- protected def down(p: SuperParallelIterator) = p.asInstanceOf[ParallelIterator]
-
- protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] {
- val pit: ParallelIterator
- }
-
- protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp]
-
- protected[this] class SegmentLength(pred: T => Boolean, from: Int, val pit: ParallelIterator)
- extends Accessor[(Int, Boolean), SegmentLength] {
- var result: (Int, Boolean) = null
- def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) {
- val itsize = pit.remaining
- val seglen = pit.prefixLength(pred)
- result = (seglen, itsize == seglen)
- if (!result._2) pit.setIndexFlagIfLesser(from)
- } else result = (0, false)
- def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p)
- }
- override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2)
- }
-
- protected[this] class IndexWhere(pred: T => Boolean, from: Int, val pit: ParallelIterator)
- extends Accessor[Int, IndexWhere] {
- var result: Int = -1
- def leaf(prev: Option[Int]) = if (from < pit.indexFlag) {
- val r = pit.indexWhere(pred)
- if (r != -1) {
- result = from + r
- pit.setIndexFlagIfLesser(from)
- }
- }
- def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p)
- }
- override def merge(that: IndexWhere) = result = if (result == -1) that.result else {
- if (that.result != -1) result min that.result else result
- }
- }
-
- protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, val pit: ParallelIterator)
- extends Accessor[Int, LastIndexWhere] {
- var result: Int = -1
- def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) {
- val r = pit.lastIndexWhere(pred)
- if (r != -1) {
- result = pos + r
- pit.setIndexFlagIfGreater(pos)
- }
- }
- def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p)
- }
- override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else {
- if (that.result != -1) result max that.result else result
- }
- }
-
- protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], val pit: ParallelIterator)
- extends Transformer[Combiner[U, This], Reverse[U, This]] {
- var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf()))
- def newSubtask(p: SuperParallelIterator) = new Reverse(cbf, down(p))
- override def merge(that: Reverse[U, This]) = result = that.result combine result
- }
-
- protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator)
- extends Transformer[Combiner[S, That], ReverseMap[S, That]] {
- var result: Combiner[S, That] = null
- def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf) // TODO
- def newSubtask(p: SuperParallelIterator) = new ReverseMap(f, pbf, down(p))
- override def merge(that: ReverseMap[S, That]) = result = that.result combine result
- }
-
- protected[this] class SameElements[U >: T](val pit: ParallelIterator, val otherpit: PreciseSplitter[U])
- extends Accessor[Boolean, SameElements[U]] {
- var result: Boolean = true
- def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
- result = pit.sameElements(otherpit)
- if (!result) pit.abort
- }
- def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val fp = pit.remaining / 2
- val sp = pit.remaining - fp
- for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op)
- }
- override def merge(that: SameElements[U]) = result = result && that.result
- }
-
- protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], val pit: ParallelIterator)
- extends Transformer[Combiner[U, That], Updated[U, That]] {
- var result: Combiner[U, That] = null
- def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf) // TODO
- def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val pits = pit.split
- for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p)
- }
- override def merge(that: Updated[U, That]) = result = result combine that.result
- }
-
- protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParallelIterator, val otherpit: PreciseSplitter[S])
- extends Accessor[Boolean, Corresponds[S]] {
- var result: Boolean = true
- def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
- result = pit.corresponds(corr)(otherpit)
- if (!result) pit.abort
- }
- def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException
- override def split = {
- val fp = pit.remaining / 2
- val sp = pit.remaining - fp
- for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op)
- }
- override def merge(that: Corresponds[S]) = result = result && that.result
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala
deleted file mode 100644
index 7862e99f44..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-import scala.collection.TraversableView
-import scala.collection.SeqView
-import scala.collection.Parallel
-import scala.collection.generic.CanCombineFrom
-
-
-
-
-
-/** A template view of a non-strict view of a parallel sequence.
- *
- * @tparam T
- * @tparam Coll
- *
- * @since 2.8
- */
-trait ParallelSeqView[+T, +Coll <: Parallel, +CollSeq]
-extends ParallelSeqViewLike[T, Coll, CollSeq, ParallelSeqView[T, Coll, CollSeq], SeqView[T, CollSeq]]
-
-
-
-object ParallelSeqView {
- abstract class NoCombiner[T] extends Combiner[T, Nothing] {
- self: EnvironmentPassingCombiner[T, Nothing] =>
- def +=(elem: T): this.type = this
- def iterator: Iterator[T] = Iterator.empty
- def result() = throw new UnsupportedOperationException("ParallelSeqView.Combiner.result")
- def size = throw new UnsupportedOperationException("ParallelSeqView.Combiner.size")
- def clear() {}
- def combine[N <: T, NewTo >: Nothing](other: Combiner[N, NewTo]) =
- throw new UnsupportedOperationException("ParallelSeqView.Combiner.result")
- }
-
- type Coll = ParallelSeqView[_, C, _] forSome { type C <: ParallelSeq[_] }
-
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] =
- new CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] {
- def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
- def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing]
- }
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala
deleted file mode 100644
index eab4d7ad5f..0000000000
--- a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-
-import scala.collection.SeqView
-import scala.collection.SeqViewLike
-import scala.collection.Parallel
-import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanCombineFrom
-
-
-
-
-
-
-
-/** A template view of a non-strict view of parallel sequence.
- *
- * @tparam T the type of the elements in this view
- * @tparam Coll type of the collection this view is derived from
- * @tparam CollSeq TODO
- * @tparam This actual representation type of this view
- * @tparam ThisSeq type of the sequential version of this view
- *
- * @since 2.8
- */
-trait ParallelSeqViewLike[+T,
- +Coll <: Parallel,
- +CollSeq,
- +This <: ParallelSeqView[T, Coll, CollSeq] with ParallelSeqViewLike[T, Coll, CollSeq, This, ThisSeq],
- +ThisSeq <: SeqView[T, CollSeq] with SeqViewLike[T, CollSeq, ThisSeq]]
-extends SeqView[T, Coll]
- with SeqViewLike[T, Coll, This]
- with ParallelIterableView[T, Coll, CollSeq]
- with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq]
- with ParallelSeq[T]
- with ParallelSeqLike[T, This, ThisSeq]
-{
- self =>
-
- type SCPI = SignalContextPassingIterator[ParallelIterator]
-
- trait Transformed[+S] extends ParallelSeqView[S, Coll, CollSeq]
- with super[ParallelIterableView].Transformed[S] with super[SeqView].Transformed[S] {
- override def parallelIterator = new Elements(0, length) with SCPI {}
- override def iterator = parallelIterator
- environment = self.environment
- }
-
- trait Forced[S] extends super.Forced[S] with Transformed[S] {
- // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`,
- // we use it to obtain a view of the correct type - not the most efficient thing
- // in the universe, but without making `newForced` more accessible, or adding
- // a `forced` method to `SeqView`, this is the best we can do
- def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]]
- }
-
- trait Filtered extends super.Filtered with Transformed[T] {
- def seq = self.seq filter pred
- }
-
- trait Sliced extends super.Sliced with Transformed[T] {
- override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This]
- def seq = self.seq.slice(from, until)
- }
-
- trait Appended[U >: T] extends super.Appended[U] with Transformed[U] {
- def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]]
- }
-
- trait Mapped[S] extends super.Mapped[S] with Transformed[S]{
- def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]]
- }
-
- trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] {
- def seq = self.seq.flatMap(mapping).asInstanceOf[SeqView[S, CollSeq]]
- }
-
- trait TakenWhile extends super.TakenWhile with Transformed[T] {
- def seq = self.seq takeWhile pred
- }
-
- trait DroppedWhile extends super.DroppedWhile with Transformed[T] {
- def seq = self.seq dropWhile pred
- }
-
- trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] {
- def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]]
- }
-
- trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] {
- def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]]
- }
-
- trait Reversed extends super.Reversed with Transformed[T] {
- def seq = self.seq.reverse
- }
-
- trait Patched[U >: T] extends super.Patched[U] with Transformed[U] {
- def seq = self.seq.patch(from, patch, replaced).asInstanceOf[SeqView[U, CollSeq]]
- }
-
- trait Prepended[U >: T] extends super.Prepended[U] with Transformed[U] {
- def seq = (fst +: self.seq).asInstanceOf[SeqView[U, CollSeq]]
- }
-
- protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p }
- protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u }
- protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that }
- protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f }
- protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f }
- protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p }
- protected override def newTakenWhile(p: T => Boolean): Transformed[T] = new TakenWhile { val pred = p }
- protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
- protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem }
- protected override def newReversed: Transformed[T] = new Reversed { }
- protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced }
- protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem }
-
- override def filter(p: T => Boolean): This = newFiltered(p).asInstanceOf[This]
- override def filterNot(p: T => Boolean): This = newFiltered(!p(_)).asInstanceOf[This]
- override def partition(p: T => Boolean): (This, This) = (filter(p), filterNot(p))
- override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This]
- override def take(n: Int): This = newSliced(0, n).asInstanceOf[This]
- override def drop(n: Int): This = newSliced(n, length).asInstanceOf[This]
- override def splitAt(n: Int): (This, This) = (take(n), drop(n))
- override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That]
- override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
- override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[This, S, That]): That = newFlatMapped(f).asInstanceOf[That]
- override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[This, S, That]): That = filter(pf.isDefinedAt).map(pf)(bf)
- override def takeWhile(p: T => Boolean): This = newTakenWhile(p).asInstanceOf[This]
- override def dropWhile(p: T => Boolean): This = newDroppedWhile(p).asInstanceOf[This]
- override def span(p: T => Boolean): (This, This) = (takeWhile(p), dropWhile(p))
- override def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanLeft(z)(op)).asInstanceOf[That]
- override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanRight(z)(op)).asInstanceOf[That]
- override def groupBy[K](f: T => K): collection.immutable.Map[K, This] = thisSeq.groupBy(f).mapValues(xs => newForced(xs).asInstanceOf[This])
- override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = newPrepended(elem).asInstanceOf[That]
- override def reverse: This = newReversed.asInstanceOf[This]
- override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[This, U, That]): That = newPatched(from, patch, replaced).asInstanceOf[That]
- override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = patch(length, Seq.fill(len - length)(elem), 0)
- override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f)
- override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = {
- require(0 <= index && index < length)
- patch(index, List(elem), 1)(bf)
- }
- override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = ++(Iterator.single(elem))(bf)
- override def union[U >: T, That](that: Seq[U])(implicit bf: CanBuildFrom[This, U, That]): That = this ++ that
- override def diff[U >: T](that: Seq[U]): This = newForced(thisSeq diff that).asInstanceOf[This]
- override def intersect[U >: T](that: Seq[U]): This = newForced(thisSeq intersect that).asInstanceOf[This]
- override def sorted[U >: T](implicit ord: Ordering[U]): This = newForced(thisSeq sorted ord).asInstanceOf[This]
-
- override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
- executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
- } otherwise {
- val b = bf(underlying)
- b ++= this.iterator
- b.result
- }
-
- /* tasks */
-
- protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], val pit: ParallelIterator)
- extends Transformer[Combiner[U, That], Force[U, That]] {
- var result: Combiner[U, That] = null
- def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying)))
- def newSubtask(p: SuperParallelIterator) = new Force(cbf, down(p))
- override def merge(that: Force[U, That]) = result = result combine that.result
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/Splitters.scala b/src/parallel-collections/scala/collection/parallel/Splitters.scala
deleted file mode 100644
index b3cad6d67a..0000000000
--- a/src/parallel-collections/scala/collection/parallel/Splitters.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package scala.collection.parallel
-
-
-import scala.collection.Seq
-
-
-/** A splitter (or a split iterator) can be split into more splitters that traverse over
- * disjoint subsets of elements.
- *
- * @tparam T type of the elements this parallel iterator traverses
- *
- * @since 2.8.1
- * @author prokopec
- */
-trait Splitter[+T] extends Iterator[T] {
-
- /** Splits the iterator into a sequence of disjunct views.
- *
- * Returns a sequence of split iterators, each iterating over some subset of the
- * elements in the collection. These subsets are disjoint and should be approximately
- * equal in size. These subsets are not empty, unless the iterator is empty in which
- * case this method returns a sequence with a single empty iterator. If the iterator has
- * more than two elements, this method will return two or more iterators.
- *
- * Implementors are advised to keep this partition relatively small - two iterators are
- * already enough when partitioning the collection, although there may be a few more.
- *
- * '''Note:''' this method actually invalidates the current iterator.
- *
- * @return a sequence of disjunct iterators of the collection
- */
- def split: Seq[Splitter[T]]
-
-}
-
-
-/** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters
- * that traverse disjoint subsets of arbitrary sizes.
- *
- * Implementors might want to override the parameterless `split` method for efficiency.
- *
- * @tparam T type of the elements this parallel iterator traverses
- *
- * @since 2.8.1
- * @author prokopec
- */
-trait PreciseSplitter[+T] extends Splitter[T] {
-
- /** Splits the iterator into disjunct views.
- *
- * This overloaded version of the `split` method is specific to precise parallel iterators.
- * It returns a sequence of parallel iterators, each iterating some subset of the
- * elements in this iterator. The sizes of the subiterators in the partition is equal to
- * the size in the corresponding argument, as long as there are enough elements in this
- * iterator to split it that way.
- *
- * If there aren't enough elements, a zero element iterator is appended for each additional argument.
- * If there are additional elements, an additional iterator is appended at the end to compensate.
- *
- * For example, say we have a parallel iterator `ps` with 100 elements. Invoking:
- * {{{
- * ps.split(50, 25, 25, 10, 5)
- * }}}
- * will return a sequence of five iterators, last two views being empty. On the other hand, calling:
- * {{{
- * ps.split(50, 40)
- * }}}
- * will return a sequence of three iterators, last of them containing ten elements.
- *
- * '''Note:''' this method actually invalidates the current iterator.
- *
- * Unlike the case with `split` found in parallel iterable iterators, views returned by this method can be empty.
- *
- * @param sizes the sizes used to split this split iterator into iterators that traverse disjunct subsets
- * @return a sequence of disjunct subsequence iterators of this parallel iterator
- */
- def psplit(sizes: Int*): Seq[PreciseSplitter[T]]
-
- def split: Seq[PreciseSplitter[T]]
-
-}
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/TaskSupport.scala b/src/parallel-collections/scala/collection/parallel/TaskSupport.scala
deleted file mode 100644
index 8a072b22aa..0000000000
--- a/src/parallel-collections/scala/collection/parallel/TaskSupport.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-
-
-
-trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/Tasks.scala b/src/parallel-collections/scala/collection/parallel/Tasks.scala
deleted file mode 100644
index 3ef60f8c7a..0000000000
--- a/src/parallel-collections/scala/collection/parallel/Tasks.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-import scala.concurrent.forkjoin._
-
-
-
-
-
-
-
-
-
-
-/** A trait that declares task execution capabilities used
- * by parallel collections. Parallel collections inherit a subtrait
- * of this trait.
- *
- * One implementation trait of `TaskExecution` is `ForkJoinTaskExecution`.
- */
-trait Tasks {
-
- /** A task abstraction which allows starting a task with `start`,
- * waiting for it to finish with `sync` and attempting to cancel
- * the task with `tryCancel`.
- * It also defines a method `leaf` which must be called once the
- * the task is started and defines what this task actually does.
- * Method `split` allows splitting this task into smaller subtasks,
- * and method `shouldSplitFurther` decides if the task should be
- * partitioned further.
- * Method `merge` allows merging the results of the 2 tasks. It updates
- * the result of the receiver.
- * Finally, it defines the task result of type `U`.
- */
- trait Task[R, +Tp] {
- def repr = this.asInstanceOf[Tp]
- /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
- def compute
- /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
- * or `None` if there was no previous task.
- */
- def leaf(result: Option[R])
- /** Start task. */
- def start
- /** Wait for task to finish. */
- def sync
- /** Try to cancel the task.
- * @return `true` if cancellation is successful.
- */
- def tryCancel: Boolean
- /** A result that can be accessed once the task is completed. */
- def result: R
- /** Decides whether or not this task should be split further. */
- def shouldSplitFurther: Boolean
- /** Splits this task into a list of smaller tasks. */
- protected[this] def split: Seq[Task[R, Tp]]
- /** Read of results of `that` task and merge them into results of this one. */
- protected[this] def merge(that: Tp) {}
- }
-
- type TaskType[R, +Tp] <: Task[R, Tp]
- type ExecutionEnvironment
-
- var environment: ExecutionEnvironment
-
- /** Executes a task and waits for it to finish. */
- def executeAndWait[R, Tp](task: TaskType[R, Tp])
-
- /** Executes a result task, waits for it to finish, then returns its result. */
- def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
-
- /** Retrieves the parallelism level of the task execution environment. */
- def parallelismLevel: Int
-
-}
-
-
-/** This trait implements scheduling by employing
- * an adaptive work stealing technique.
- */
-trait AdaptiveWorkStealingTasks extends Tasks {
-
- trait Task[R, Tp] extends super.Task[R, Tp] {
- var next: Task[R, Tp] = null
- var shouldWaitFor = true
- var result: R
-
- def split: Seq[Task[R, Tp]]
-
- /** The actual leaf computation. */
- def leaf(result: Option[R]): Unit
-
- def compute = if (shouldSplitFurther) internal else leaf(None)
-
- def internal = {
- var last = spawnSubtasks
-
- last.leaf(None)
- result = last.result
-
- while (last.next != null) {
- val lastresult = Option(last.result)
- last = last.next
- if (last.tryCancel) last.leaf(lastresult) else last.sync
- merge(last.repr)
- }
- }
-
- def spawnSubtasks = {
- var last: Task[R, Tp] = null
- var head: Task[R, Tp] = this
- do {
- val subtasks = head.split
- head = subtasks.head
- for (t <- subtasks.tail) {
- t.next = last
- last = t
- t.start
- }
- } while (head.shouldSplitFurther);
- head.next = last
- head
- }
-
- def printChain = {
- var curr = this
- var chain = "chain: "
- while (curr != null) {
- chain += curr + " ---> "
- curr = curr.next
- }
- println(chain)
- }
- }
-
-}
-
-
-/**
- * A trait describing objects that provide a fork/join pool.
- */
-trait HavingForkJoinPool {
- def forkJoinPool: ForkJoinPool
-}
-
-
-
-/** An implementation trait for parallel tasks based on the fork/join framework.
- *
- * @define fjdispatch
- * If the current thread is a fork/join worker thread, the task's `fork` method will
- * be invoked. Otherwise, the task will be executed on the fork/join pool.
- */
-trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
-
- trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] {
- def start = fork
- def sync = join
- def tryCancel = tryUnfork
- var result: R
- }
-
- type TaskType[R, +Tp] = Task[R, Tp]
- type ExecutionEnvironment = ForkJoinPool
-
- /** The fork/join pool of this collection.
- */
- def forkJoinPool: ForkJoinPool = environment
- var environment = ForkJoinTasks.defaultForkJoinPool
-
- /** Executes a task on a fork/join pool and waits for it to finish.
- *
- * $fjdispatch
- */
- def executeAndWait[R, Tp](fjtask: Task[R, Tp]) {
- if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
- fjtask.fork
- } else {
- forkJoinPool.execute(fjtask)
- }
- fjtask.join
- }
-
- /** Executes a task on a fork/join pool and waits for it to finish.
- * Returns its result when it does.
- *
- * $fjdispatch
- *
- * @return the result of the task
- */
- def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = {
- if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
- fjtask.fork
- } else {
- forkJoinPool.execute(fjtask)
- }
- fjtask.join
- fjtask.result
- }
-
- def parallelismLevel = forkJoinPool.getParallelism
-
-}
-
-object ForkJoinTasks {
- val defaultForkJoinPool = new ForkJoinPool
- defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
- defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
-}
-
-
-/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them.
- */
-trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
-
- trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp]
-
-}
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala
deleted file mode 100644
index 0a39dd200c..0000000000
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-package scala.collection.parallel.immutable
-
-
-
-
-
-
-
-import scala.collection.parallel.ParallelMap
-import scala.collection.parallel.ParallelMapLike
-import scala.collection.parallel.Combiner
-import scala.collection.parallel.EnvironmentPassingCombiner
-import scala.collection.generic.ParallelMapFactory
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.GenericParallelMapTemplate
-import scala.collection.generic.GenericParallelMapCompanion
-import scala.collection.immutable.HashMap
-
-
-
-
-
-
-class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
-extends ParallelMap[K, V]
- with GenericParallelMapTemplate[K, V, ParallelHashTrie]
- with ParallelMapLike[K, V, ParallelHashTrie[K, V], HashMap[K, V]]
-{
-self =>
-
- def this() = this(HashMap.empty[K, V])
-
- override def mapCompanion: GenericParallelMapCompanion[ParallelHashTrie] = ParallelHashTrie
-
- override def empty: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V]
-
- def parallelIterator = new ParallelHashTrieIterator(trie) with SCPI
-
- def seq = trie
-
- def -(k: K) = new ParallelHashTrie(trie - k)
-
- def +[U >: V](kv: (K, U)) = new ParallelHashTrie(trie + kv)
-
- def get(k: K) = trie.get(k)
-
- override def size = trie.size
-
- protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match {
- case Some(old) => old
- case None => newc
- }
-
- type SCPI = SignalContextPassingIterator[ParallelHashTrieIterator]
-
- class ParallelHashTrieIterator(val ht: HashMap[K, V])
- extends super.ParallelIterator {
- self: SignalContextPassingIterator[ParallelHashTrieIterator] =>
- // println("created iterator " + ht)
- var i = 0
- lazy val triter = ht.iterator
- def split: Seq[ParallelIterator] = {
- // println("splitting " + ht + " into " + ht.split.map(new ParallelHashTrieIterator(_) with SCPI).map(_.toList))
- ht.split.map(new ParallelHashTrieIterator(_) with SCPI)
- }
- def next: (K, V) = {
- // println("taking next after " + i + ", in " + ht)
- i += 1
- triter.next
- }
- def hasNext: Boolean = {
- // println("hasNext: " + i + ", " + ht.size + ", " + ht)
- i < ht.size
- }
- def remaining = ht.size - i
- }
-
-}
-
-
-object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
- def empty[K, V]: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V]
-
- def newCombiner[K, V]: Combiner[(K, V), ParallelHashTrie[K, V]] = HashTrieCombiner[K, V]
-
- implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelHashTrie[K, V]] = {
- new CanCombineFromMap[K, V]
- }
-
- var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
-}
-
-
-trait HashTrieCombiner[K, V]
-extends Combiner[(K, V), ParallelHashTrie[K, V]] {
-self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] =>
- private var trie: HashMap[K, V] = HashMap.empty[K, V]
-
- def size: Int = trie.size
-
- def clear = trie = HashMap.empty[K, V]
-
- def +=(elem: (K, V)) = { trie += elem; this }
-
- def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- // ParallelHashTrie.totalcombines.incrementAndGet
- if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
- val that = other.asInstanceOf[HashTrieCombiner[K, V]]
- val ncombiner = HashTrieCombiner[K, V]
- ncombiner.trie = this.trie combine that.trie
- ncombiner
- } else error("Unexpected combiner type.")
- } else this
-
- def result = new ParallelHashTrie[K, V](trie)
-
-}
-
-
-object HashTrieCombiner {
- def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {}
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala
deleted file mode 100644
index 92bf5ab706..0000000000
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package scala.collection.parallel.immutable
-
-
-import scala.collection.generic._
-
-import scala.collection.parallel.ParallelIterableLike
-import scala.collection.parallel.Combiner
-
-
-
-
-
-// TODO uncomment when we add parallel vectors
-
-///** A template trait for immutable parallel iterable collections.
-// *
-// * $paralleliterableinfo
-// *
-// * $sideeffects
-// *
-// * @tparam A the element type of the collection
-// *
-// * @author prokopec
-// * @since 2.8
-// */
-//trait ParallelIterable[A] extends collection.immutable.Iterable[A]
-// with collection.parallel.ParallelIterable[A]
-// with GenericParallelTemplate[A, ParallelIterable]
-// with ParallelIterableLike[A, ParallelIterable[A], Iterable[A]] {
-// override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable
-//}
-//
-///** $factoryinfo
-// */
-//object ParallelIterable extends ParallelFactory[ParallelIterable] {
-// implicit def canBuildFrom[A]: CanBuildFromParallel[Coll, A, ParallelIterable[A]] =
-// new GenericCanBuildFromParallel[A]
-//
-// def newBuilder[A]: Combiner[A, ParallelIterable[A]] = null // TODO
-//
-// def newCombiner[A]: Combiner[A, ParallelIterable[A]] = null // TODO
-//}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala
deleted file mode 100644
index 85a33c7431..0000000000
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-package scala.collection.parallel.immutable
-
-
-
-import scala.collection.immutable.Range
-import scala.collection.immutable.RangeUtils
-import scala.collection.parallel.ParallelSeq
-import scala.collection.parallel.Combiner
-import scala.collection.generic.CanCombineFrom
-
-
-
-class ParallelRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean)
-extends ParallelSeq[Int]
- with RangeUtils[ParallelRange] {
- self =>
-
- def seq = new Range(start, end, step)
-
- def length = _length
-
- def apply(idx: Int) = _apply(idx)
-
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParallelRange(_start, _end, _step, _inclusive)
-
- def parallelIterator = new ParallelRangeIterator with SCPI
-
- override def toString = seq.toString // TODO
-
- type SCPI = SignalContextPassingIterator[ParallelRangeIterator]
-
- class ParallelRangeIterator
- (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive)
- extends ParallelIterator with RangeUtils[ParallelRangeIterator] {
- me: SignalContextPassingIterator[ParallelRangeIterator] =>
- def remaining = _length
- def next = { val r = start; start += step; r }
- def hasNext = remaining > 0
- def split: Seq[ParallelIterator] = psplit(remaining / 2, remaining - remaining / 2)
- def psplit(sizes: Int*): Seq[ParallelIterator] = {
- val incr = sizes.scanLeft(0)(_ + _)
- for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until)
- }
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = {
- new ParallelRangeIterator(_start, _end, _step, _inclusive) with SCPI
- }
-
- override def toString = "ParallelRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")"
-
- /* accessors */
-
- override def foreach[U](f: Int => U): Unit = {
- _foreach(f)
- start = end + step
- }
-
- override def reduce[U >: Int](op: (U, U) => U): U = {
- var sum = next
- for (elem <- this) sum += elem
- sum
- }
-
- /* transformers */
-
- override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = {
- //val cb = pbf(self.repr)
- val sz = remaining
- cb.sizeHint(sz)
- if (sz > 0) {
- val last = _last
- while (start != last) {
- f(start)
- start += step
- }
- }
- cb
- }
-
- }
-
-}
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala
deleted file mode 100644
index ceb0dcc13d..0000000000
--- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package scala.collection.parallel.immutable
-
-
-import scala.collection.generic.GenericParallelTemplate
-import scala.collection.generic.GenericCompanion
-import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.ParallelFactory
-import scala.collection.parallel.ParallelSeqLike
-import scala.collection.parallel.Combiner
-
-
-
-
-
-
-// TODO uncomment when we add parallel vectors
-
-///** An immutable variant of `ParallelSeq`.
-// *
-// * @define Coll mutable.ParallelSeq
-// * @define coll mutable parallel sequence
-// */
-//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A]
-// with ParallelIterable[A]
-// with collection.parallel.ParallelSeq[A]
-// with GenericParallelTemplate[A, ParallelSeq]
-// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] {
-// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
-//
-//}
-//
-//
-///** $factoryInfo
-// * @define Coll mutable.ParallelSeq
-// * @define coll mutable parallel sequence
-// */
-//object ParallelSeq extends ParallelFactory[ParallelSeq] {
-// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T]
-//
-// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO
-//
-// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO
-//}
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/immutable/package.scala b/src/parallel-collections/scala/collection/parallel/immutable/package.scala
deleted file mode 100644
index 054786afaf..0000000000
--- a/src/parallel-collections/scala/collection/parallel/immutable/package.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package scala.collection.parallel
-
-
-
-
-
-
-
-
-
-
-
-package object immutable {
-
- /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
- *
- * @tparam T type of the elements
- * @param elem the element in the repetition
- * @param length the length of the collection
- */
- private[parallel] class Repetition[T](elem: T, val length: Int) extends ParallelSeq[T] {
- self =>
-
- def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException
- def seq = throw new UnsupportedOperationException
- def update(idx: Int, elem: T) = throw new UnsupportedOperationException
-
- type SCPI = SignalContextPassingIterator[ParallelIterator]
-
- class ParallelIterator(var i: Int = 0, val until: Int = length, elem: T = self.elem) extends super.ParallelIterator {
- me: SignalContextPassingIterator[ParallelIterator] =>
- def remaining = until - i
- def hasNext = i < until
- def next = { i += 1; elem }
- def psplit(sizes: Int*) = {
- val incr = sizes.scanLeft(0)(_ + _)
- for ((start, end) <- incr.init zip incr.tail) yield new ParallelIterator(i + start, (i + end) min until, elem) with SCPI
- }
- def split = psplit(remaining / 2, remaining - remaining / 2)
- }
-
- def parallelIterator = new ParallelIterator with SCPI
-
- }
-
-}
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala
deleted file mode 100644
index bd17d24ea8..0000000000
--- a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package scala.collection.parallel.mutable
-
-
-
-
-import scala.collection.generic.Growable
-import scala.collection.generic.Sizing
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.Combiner
-
-
-
-
-/** Implements combining contents of two combiners
- * by postponing the operation until `result` method is called. It chains
- * the leaf results together instead of evaluating the actual collection.
- *
- * @tparam Elem the type of the elements in the combiner
- * @tparam To the type of the collection the combiner produces
- * @tparam Buff the type of the buffers that contain leaf results and this combiner chains together
- */
-trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To]
-{
- self: collection.parallel.EnvironmentPassingCombiner[Elem, To] =>
- val chain: ArrayBuffer[Buff]
- val lastbuff = chain.last
- def +=(elem: Elem) = { lastbuff += elem; this }
- def result: To = allocateAndCopy
- def clear = { chain.clear }
- def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- if (other.isInstanceOf[LazyCombiner[_, _, _]]) {
- val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]]
- newLazyCombiner(chain ++= that.chain)
- } else throw new UnsupportedOperationException("Cannot combine with combiner of different type.")
- } else this
- def size = chain.foldLeft(0)(_ + _.size)
-
- /** Method that allocates the data structure and copies elements into it using
- * `size` and `chain` members.
- */
- def allocateAndCopy: To
- def newLazyCombiner(buffchain: ArrayBuffer[Buff]): LazyCombiner[Elem, To, Buff]
-}
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala
deleted file mode 100644
index 30b4b109b2..0000000000
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala
+++ /dev/null
@@ -1,561 +0,0 @@
-package scala.collection.parallel.mutable
-
-
-
-import scala.collection.generic.GenericParallelTemplate
-import scala.collection.generic.GenericCompanion
-import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.ParallelFactory
-import scala.collection.generic.Sizing
-import scala.collection.parallel.Combiner
-import scala.collection.parallel.ParallelSeqLike
-import scala.collection.parallel.CHECK_RATE
-import scala.collection.mutable.ArraySeq
-import scala.collection.mutable.Builder
-import scala.collection.Sequentializable
-
-
-
-
-/** Parallel sequence holding elements in a linear array.
- *
- * `ParallelArray` is a parallel sequence with a predefined size. The size of the array
- * cannot be changed after it's been created.
- *
- * `ParallelArray` internally keeps an array containing the elements. This means that
- * bulk operations based on traversal are fast, but those returning a parallel array as a result
- * are slightly slower. The reason for this is that `ParallelArray` uses lazy builders that
- * create the internal data array only after the size of the array is known. The fragments
- * are then copied into the resulting data array in parallel using fast array copy operations.
- * Operations for which the resulting array size is known in advance are optimised to use this
- * information.
- *
- * @tparam T type of the elements in the array
- *
- * @define Coll ParallelArray
- * @define coll parallel array
- */
-class ParallelArray[T] private[mutable] (val arrayseq: ArraySeq[T])
-extends ParallelSeq[T]
- with GenericParallelTemplate[T, ParallelArray]
- with ParallelSeqLike[T, ParallelArray[T], ArraySeq[T]]
-{
- self =>
-
- private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]]
-
- override def companion: GenericCompanion[ParallelArray] with GenericParallelCompanion[ParallelArray] = ParallelArray
-
- def this(sz: Int) = this {
- require(sz >= 0)
- new ArraySeq[T](sz)
- }
-
- def apply(i: Int) = array(i).asInstanceOf[T]
-
- def update(i: Int, elem: T) = array(i) = elem
-
- def length = arrayseq.length
-
- def seq = arrayseq
-
- type SCPI = SignalContextPassingIterator[ParallelArrayIterator]
-
- def parallelIterator: ParallelArrayIterator = {
- val pit = new ParallelArrayIterator with SCPI
- pit
- }
-
- class ParallelArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
- extends super.ParallelIterator {
- me: SignalContextPassingIterator[ParallelArrayIterator] =>
-
- def hasNext = i < until
-
- def next = {
- val elem = arr(i)
- i += 1
- elem.asInstanceOf[T]
- }
-
- def remaining = until - i
-
- def psplit(sizesIncomplete: Int*): Seq[ParallelIterator] = {
- var traversed = i
- val total = sizesIncomplete.reduceLeft(_ + _)
- val left = remaining
- val sizes = if (total >= left) sizesIncomplete else sizesIncomplete :+ (left - total)
- for (sz <- sizes) yield if (traversed < until) {
- val start = traversed
- val end = (traversed + sz) min until
- traversed = end
- new ParallelArrayIterator(start, end, arr) with SCPI
- } else {
- new ParallelArrayIterator(traversed, traversed, arr) with SCPI
- }
- }
-
- override def split: Seq[ParallelIterator] = {
- val left = remaining
- if (left >= 2) {
- val splitpoint = left / 2
- Seq(new ParallelArrayIterator(i, i + splitpoint, arr) with SCPI,
- new ParallelArrayIterator(i + splitpoint, until, arr) with SCPI)
- } else {
- Seq(this)
- }
- }
-
- override def toString = "ParallelArrayIterator(" + i + ", " + until + ")"
-
- /* overrides for efficiency */
-
- /* accessors */
-
- override def foreach[U](f: T => U) = {
- foreach_quick(f, arr, until, i)
- i = until
- }
-
- private def foreach_quick[U](f: T => U, a: Array[Any], ntil: Int, from: Int) = {
- var j = from
- while (j < ntil) {
- f(a(j).asInstanceOf[T])
- j += 1
- }
- }
-
- override def count(p: T => Boolean) = {
- val c = count_quick(p, arr, until, i)
- i = until
- c
- }
-
- private def count_quick(p: T => Boolean, a: Array[Any], ntil: Int, from: Int) = {
- var cnt = 0
- var j = from
- while (j < ntil) {
- if (p(a(j).asInstanceOf[T])) cnt += 1
- j += 1
- }
- cnt
- }
-
- override def foldLeft[S](z: S)(op: (S, T) => S): S = {
- val r = foldLeft_quick(arr, until, op, z)
- i = until
- r
- }
-
- private def foldLeft_quick[S](a: Array[Any], ntil: Int, op: (S, T) => S, z: S): S = {
- var j = i
- var sum = z
- while (j < ntil) {
- sum = op(sum, a(j).asInstanceOf[T])
- j += 1
- }
- sum
- }
-
- def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = foldLeft[S](z)(seqop)
-
- override def sum[U >: T](implicit num: Numeric[U]): U = {
- var s = sum_quick(num, arr, until, i, num.zero)
- i = until
- s
- }
-
- private def sum_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, zero: U): U = {
- var j = from
- var sum = zero
- while (j < ntil) {
- sum = num.plus(sum, a(j).asInstanceOf[T])
- j += 1
- }
- sum
- }
-
- override def product[U >: T](implicit num: Numeric[U]): U = {
- var p = product_quick(num, arr, until, i, num.one)
- i = until
- p
- }
-
- private def product_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, one: U): U = {
- var j = from
- var prod = one
- while (j < ntil) {
- prod = num.times(prod, a(j).asInstanceOf[T])
- j += 1
- }
- prod
- }
-
- override def forall(p: T => Boolean): Boolean = {
- if (isAborted) return false
-
- var all = true
- while (i < until) {
- val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE
-
- all = forall_quick(p, array, nextuntil, i)
- if (all) i = nextuntil
- else {
- i = until
- abort
- }
-
- if (isAborted) return false
- }
- all
- }
-
- // it's faster to use a separate small method
- private def forall_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = {
- var j = start
- while (j < nextuntil) {
- if (p(a(j).asInstanceOf[T])) j += 1
- else return false
- }
- return true
- }
-
- override def exists(p: T => Boolean): Boolean = {
- if (isAborted) return true
-
- var some = false
- while (i < until) {
- val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE
-
- some = exists_quick(p, array, nextuntil, i)
- if (some) {
- i = until
- abort
- } else i = nextuntil
-
- if (isAborted) return true
- }
- some
- }
-
- // faster to use separate small method
- private def exists_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = {
- var j = start
- while (j < nextuntil) {
- if (p(a(j).asInstanceOf[T])) return true
- else j += 1
- }
- return false
- }
-
- override def find(p: T => Boolean): Option[T] = {
- if (isAborted) return None
-
- var r: Option[T] = None
- while (i < until) {
- val nextuntil = if ((i + CHECK_RATE) < until) (i + CHECK_RATE) else until
-
- r = find_quick(p, array, nextuntil, i)
-
- if (r != None) {
- i = until
- abort
- } else i = nextuntil
-
- if (isAborted) return r
- }
- r
- }
-
- private def find_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Option[T] = {
- var j = start
- while (j < nextuntil) {
- val elem = a(j).asInstanceOf[T]
- if (p(elem)) return Some(elem)
- else j += 1
- }
- return None
- }
-
- override def drop(n: Int): ParallelArrayIterator = {
- i += n
- this
- }
-
- override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) {
- val totallen = (self.length - i) min len min (array.length - from)
- Array.copy(arr, i, array, from, totallen)
- i += totallen
- }
-
- override def prefixLength(pred: T => Boolean): Int = {
- val r = prefixLength_quick(pred, arr, until, i)
- i += r + 1
- r
- }
-
- private def prefixLength_quick(pred: T => Boolean, a: Array[Any], ntil: Int, startpos: Int): Int = {
- var j = startpos
- var endpos = ntil
- while (j < endpos) {
- if (pred(a(j).asInstanceOf[T])) j += 1
- else endpos = j
- }
- endpos - startpos
- }
-
- override def indexWhere(pred: T => Boolean): Int = {
- val r = indexWhere_quick(pred, arr, until, i)
- val ret = if (r != -1) r - i else r
- i = until
- ret
- }
-
- private def indexWhere_quick(pred: T => Boolean, a: Array[Any], ntil: Int, from: Int): Int = {
- var j = from
- var pos = -1
- while (j < ntil) {
- if (pred(a(j).asInstanceOf[T])) {
- pos = j
- j = ntil
- } else j += 1
- }
- pos
- }
-
- override def lastIndexWhere(pred: T => Boolean): Int = {
- val r = lastIndexWhere_quick(pred, arr, i, until)
- val ret = if (r != -1) r - i else r
- i = until
- ret
- }
-
- private def lastIndexWhere_quick(pred: T => Boolean, a: Array[Any], from: Int, ntil: Int): Int = {
- var pos = -1
- var j = ntil - 1
- while (j >= from) {
- if (pred(a(j).asInstanceOf[T])) {
- pos = j
- j = -1
- } else j -= 1
- }
- pos
- }
-
- override def sameElements(that: Iterator[_]): Boolean = {
- var same = true
- while (i < until && that.hasNext) {
- if (arr(i) != that.next) {
- i = until
- same = false
- }
- i += 1
- }
- same
- }
-
- /* transformers */
-
- override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
- //val cb = cbf(self.repr)
- cb.sizeHint(remaining)
- map2combiner_quick(f, arr, cb, until, i)
- i = until
- cb
- }
-
- private def map2combiner_quick[S, That](f: T => S, a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) {
- var j = from
- while (j < ntil) {
- cb += f(a(j).asInstanceOf[T])
- j += 1
- }
- }
-
- override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = {
- val cb = pbf(self.repr)
- collect2combiner_quick(pf, arr, cb, until, i)
- i = until
- cb
- }
-
- private def collect2combiner_quick[S, That](pf: PartialFunction[T, S], a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) {
- var j = from
- while (j < ntil) {
- val curr = a(j).asInstanceOf[T]
- if (pf.isDefinedAt(curr)) cb += pf(curr)
- j += 1
- }
- }
-
- override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = {
- val cb = pbf(self.repr)
- while (i < until) {
- val traversable = f(arr(i).asInstanceOf[T])
- if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator
- else cb ++= traversable
- i += 1
- }
- cb
- }
-
- override def filter2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = {
- filter2combiner_quick(pred, cb, arr, until, i)
- i = until
- cb
- }
-
- private def filter2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
- var j = i
- while(j < ntil) {
- var curr = a(j).asInstanceOf[T]
- if (pred(curr)) cb += curr
- j += 1
- }
- }
-
- override def filterNot2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = {
- filterNot2combiner_quick(pred, cb, arr, until, i)
- i = until
- cb
- }
-
- private def filterNot2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
- var j = i
- while(j < ntil) {
- var curr = a(j).asInstanceOf[T]
- if (!pred(curr)) cb += curr
- j += 1
- }
- }
-
- override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
- cb.sizeHint(remaining)
- cb.ifIs[ParallelArrayCombiner[T]] { pac =>
- val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
- Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
- pac.lastbuff.setInternalSize(remaining)
- } otherwise {
- copy2builder_quick(cb, arr, until, i)
- i = until
- }
- cb
- }
-
- private def copy2builder_quick[U >: T, Coll](b: Builder[U, Coll], a: Array[Any], ntil: Int, from: Int) {
- var j = from
- while (j < ntil) {
- b += a(j).asInstanceOf[T]
- j += 1
- }
- }
-
- override def partition2combiners[U >: T, This >: ParallelArray[T]](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = {
- partition2combiners_quick(pred, btrue, bfalse, arr, until, i)
- i = until
- (btrue, bfalse)
- }
-
- private def partition2combiners_quick[U >: T, This >: ParallelArray[T]](p: T => Boolean, btrue: Builder[U, This], bfalse: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
- var j = from
- while (j < ntil) {
- val curr = a(j).asInstanceOf[T]
- if (p(curr)) btrue += curr else bfalse += curr
- j += 1
- }
- }
-
- override def take2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = {
- cb.sizeHint(n)
- val ntil = i + n
- val a = arr
- while (i < ntil) {
- cb += a(i).asInstanceOf[T]
- i += 1
- }
- cb
- }
-
- override def drop2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = {
- drop(n)
- cb.sizeHint(remaining)
- while (i < until) {
- cb += arr(i).asInstanceOf[T]
- i += 1
- }
- cb
- }
-
- override def reverse2combiner[U >: T, This >: ParallelArray[T]](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.ifIs[ParallelArrayCombiner[T]] { pac =>
- val sz = remaining
- pac.sizeHint(sz)
- val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
- reverse2combiner_quick(targetarr, arr, i, until)
- pac.lastbuff.setInternalSize(sz)
- pac
- } otherwise super.reverse2combiner(cb)
- cb
- }
-
- private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
- var j = from
- var k = ntil - from - 1
- while (j < ntil) {
- targ(k) = a(j)
- j += 1
- k -= 1
- }
- }
-
- }
-
-}
-
-
-
-
-
-object ParallelArray extends ParallelFactory[ParallelArray] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelArray[T]] = new GenericCanCombineFrom[T]
- def newBuilder[T]: Combiner[T, ParallelArray[T]] = newCombiner
- def newCombiner[T]: Combiner[T, ParallelArray[T]] = ParallelArrayCombiner[T]
-
- /** Creates a new parallel array by wrapping the specified array.
- */
- def handoff[T <: AnyRef](arr: Array[T]): ParallelArray[T] = {
- new ParallelArray[T](new ExposedArraySeq[T](arr.asInstanceOf[Array[AnyRef]], arr.length))
- }
-
- def createFromCopy[T <: AnyRef : ClassManifest](arr: Array[T]): ParallelArray[T] = {
- val newarr = new Array[T](arr.length)
- Array.copy(arr, 0, newarr, 0, arr.length)
- handoff(newarr)
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
deleted file mode 100644
index 2991344be2..0000000000
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-package scala.collection.parallel.mutable
-
-
-
-
-
-import scala.collection.generic.Sizing
-import scala.collection.mutable.ArraySeq
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.TaskSupport
-import scala.collection.parallel.EnvironmentPassingCombiner
-
-
-
-
-
-
-
-trait ParallelArrayCombiner[T]
-extends LazyCombiner[T, ParallelArray[T], ExposedArrayBuffer[T]]
- with TaskSupport {
- self: EnvironmentPassingCombiner[T, ParallelArray[T]] =>
-
- override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
-
- def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParallelArrayCombiner(c)
-
- def allocateAndCopy = if (chain.size > 1) {
- val arrayseq = new ArraySeq[T](size)
- val array = arrayseq.array.asInstanceOf[Array[Any]]
-
- executeAndWait(new CopyChainToArray(array, 0, size))
-
- new ParallelArray(arrayseq)
- } else { // optimisation if there is only 1 array
- val pa = new ParallelArray(new ExposedArraySeq[T](chain(0).internalArray, size))
- pa
- }
-
- override def toString = "ParallelArrayCombiner(" + size + "): " + chain
-
- /* tasks */
-
- class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] {
- var result = ()
- def leaf(prev: Option[Unit]) = if (howmany > 0) {
- var totalleft = howmany
- val (stbuff, stind) = findStart(offset)
- var buffind = stbuff
- var ind = stind
- var arrayIndex = offset
- while (totalleft > 0) {
- val currbuff = chain(buffind)
- val chunksize = if (totalleft < (currbuff.size - ind)) totalleft else currbuff.size - ind
- val until = ind + chunksize
-
- copyChunk(currbuff.internalArray, ind, array, arrayIndex, until)
- arrayIndex += chunksize
- ind += chunksize
-
- totalleft -= chunksize
- buffind += 1
- ind = 0
- }
- }
- private def copyChunk(buffarr: Array[AnyRef], buffStart: Int, ra: Array[Any], arrayStart: Int, until: Int) {
- Array.copy(buffarr, buffStart, ra, arrayStart, until - buffStart)
- }
- private def findStart(pos: Int) = {
- var left = pos
- var buffind = 0
- while (left >= chain(buffind).size) {
- left -= chain(buffind).size
- buffind += 1
- }
- (buffind, left)
- }
- def split = {
- val fp = howmany / 2
- List(new CopyChainToArray(array, offset, fp), new CopyChainToArray(array, offset + fp, howmany - fp))
- }
- def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel)
- }
-
-}
-
-
-object ParallelArrayCombiner {
- def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParallelArrayCombiner[T] = {
- new { val chain = c } with ParallelArrayCombiner[T] with EnvironmentPassingCombiner[T, ParallelArray[T]]
- }
- def apply[T]: ParallelArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T])
-}
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala
deleted file mode 100644
index bd0a46bc43..0000000000
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-package scala.collection.parallel.mutable
-
-
-import scala.collection.generic._
-
-import scala.collection.parallel.ParallelIterableLike
-import scala.collection.parallel.Combiner
-
-
-/** A template trait for parallel iterable collections.
- *
- * $paralleliterableinfo
- *
- * $sideeffects
- *
- * @tparam T the element type of the collection
- *
- * @author prokopec
- * @since 2.8
- */
-trait ParallelIterable[T] extends collection.mutable.Iterable[T]
- with collection.parallel.ParallelIterable[T]
- with GenericParallelTemplate[T, ParallelIterable]
- with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] {
- override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable
-}
-
-/** $factoryinfo
- */
-object ParallelIterable extends ParallelFactory[ParallelIterable] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] =
- new GenericCanCombineFrom[T]
-
- def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
-
- def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T]
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala
deleted file mode 100644
index 636ba1ac3d..0000000000
--- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-package scala.collection.parallel.mutable
-
-
-import scala.collection.generic.GenericParallelTemplate
-import scala.collection.generic.GenericCompanion
-import scala.collection.generic.GenericParallelCompanion
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.ParallelFactory
-import scala.collection.parallel.ParallelSeqLike
-import scala.collection.parallel.Combiner
-
-
-
-
-
-
-
-/** A mutable variant of `ParallelSeq`.
- *
- * @define Coll mutable.ParallelSeq
- * @define coll mutable parallel sequence
- */
-trait ParallelSeq[T] extends collection.mutable.Seq[T]
- with ParallelIterable[T]
- with collection.parallel.ParallelSeq[T]
- with GenericParallelTemplate[T, ParallelSeq]
- with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] {
- self =>
- override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
-
- def update(i: Int, elem: T): Unit
-
-}
-
-
-/** $factoryInfo
- * @define Coll mutable.ParallelSeq
- * @define coll mutable parallel sequence
- */
-object ParallelSeq extends ParallelFactory[ParallelSeq] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T]
-
- def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
-
- def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T]
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/parallel-collections/scala/collection/parallel/mutable/package.scala b/src/parallel-collections/scala/collection/parallel/mutable/package.scala
deleted file mode 100644
index f670c7b7c5..0000000000
--- a/src/parallel-collections/scala/collection/parallel/mutable/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-package scala.collection.parallel
-
-
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ArraySeq
-import scala.collection.generic.Sizing
-
-
-
-package object mutable {
-
- /* hack-arounds */
-
- private[mutable] class ExposedArrayBuffer[T] extends ArrayBuffer[T] with Sizing {
- def internalArray = array
- def setInternalSize(s: Int) = size0 = s
- override def sizeHint(len: Int) = { // delete once we start using 2.8.RC1+
- if (len > size && len >= 1) {
- val newarray = new Array[AnyRef](len)
- Array.copy(array, 0, newarray, 0, size0)
- array = newarray
- }
- }
- }
-
- private[mutable] class ExposedArraySeq[T](arr: Array[AnyRef], sz: Int) extends ArraySeq[T](sz) {
- override val array = arr
- override val length = sz
- }
-
-} \ No newline at end of file
diff --git a/src/parallel-collections/scala/collection/parallel/package.scala b/src/parallel-collections/scala/collection/parallel/package.scala
deleted file mode 100644
index 3b297f1cd1..0000000000
--- a/src/parallel-collections/scala/collection/parallel/package.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-package scala.collection
-
-
-import java.lang.Thread._
-
-import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanCombineFrom
-
-
-/** Package object for parallel collections.
- */
-package object parallel {
- val MIN_FOR_COPY = -1 // TODO: set to 5000
- val CHECK_RATE = 512
-
- /** Computes threshold from the size of the collection and the parallelism level.
- */
- def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
- val p = parallelismLevel
- if (p > 1) 1 + sz / (8 * p)
- else sz
- }
-
- /** An implicit conversion providing arrays with a `par` method, which
- * returns a parallel array.
- *
- * @tparam T type of the elements in the array, which is a subtype of AnyRef
- * @param array the array to be parallelized
- * @return a `Parallelizable` object with a `par` method
- */
- implicit def array2ParallelArray[T <: AnyRef](array: Array[T]) = new Parallelizable[T, mutable.ParallelArray[T]] {
- def par = mutable.ParallelArray.handoff[T](array)
- }
-
- implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new {
- def isParallel = bf.isInstanceOf[Parallel]
- def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
- def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new {
- def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
- }
- }
-
- implicit def traversable2ops[T](t: TraversableOnce[T]) = new {
- def isParallel = t.isInstanceOf[Parallel]
- def isParallelIterable = t.isInstanceOf[ParallelIterable[_]]
- def asParallelIterable = t.asInstanceOf[ParallelIterable[T]]
- def isParallelSeq = t.isInstanceOf[ParallelSeq[_]]
- def asParallelSeq = t.asInstanceOf[ParallelSeq[T]]
- def ifParallelSeq[R](isbody: ParallelSeq[T] => R) = new {
- def otherwise(notbody: => R) = if (isParallel) isbody(asParallelSeq) else notbody
- }
- }
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
index 747178c1a4..fd6e1a8559 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
@@ -29,7 +29,10 @@ trait ParallelHashTrieBenches[K, V] extends StandardParallelIterableBench[(K, V)
extends IterableBench with StandardParallelIterableBench[(K, V), ParallelHashTrie[K, V]] {
var result: Int = 0
def comparisonMap = collection.Map()
- def runseq = result = this.seqcoll.map(operators.mapper2).size
+ def runseq = {
+ val r = this.seqcoll.asInstanceOf[collection.immutable.HashMap[K, V]].map(operators.mapper2)
+ result = r.size
+ }
def runpar = {
result = this.parcoll.map(operators.mapper2).size
//println(collection.parallel.immutable.ParallelHashTrie.totalcombines)