From 52b863dd86ae854328f74d1d05ca71b2915fa7d7 Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Tue, 29 Jun 2010 14:05:59 +0000 Subject: Implemented lazy combiners for parallel hash trie. --- .../scala/collection/immutable/HashMap.scala | 2 +- .../parallel/immutable/ParallelHashTrie.scala | 131 ++- .../scala/collection/Parallel.scala | 17 - .../scala/collection/Parallelizable.scala | 40 - .../scala/collection/Sequentializable.scala | 15 - .../collection/generic/CanBuildFromParallel.scala | 28 - .../generic/GenericParallelCompanion.scala | 29 - .../generic/GenericParallelTemplate.scala | 66 -- .../scala/collection/generic/HasNewCombiner.scala | 26 - .../scala/collection/generic/ParallelFactory.scala | 43 - .../collection/generic/ParallelMapFactory.scala | 42 - .../scala/collection/generic/Signalling.scala | 192 ----- .../scala/collection/generic/Sizing.scala | 9 - .../scala/collection/immutable/package.scala | 81 -- .../scala/collection/parallel/Combiners.scala | 66 -- .../scala/collection/parallel/Iterators.scala | 443 ---------- .../collection/parallel/ParallelIterable.scala | 49 -- .../collection/parallel/ParallelIterableLike.scala | 940 --------------------- .../collection/parallel/ParallelIterableView.scala | 33 - .../parallel/ParallelIterableViewLike.scala | 59 -- .../scala/collection/parallel/ParallelMap.scala | 71 -- .../collection/parallel/ParallelMapLike.scala | 43 - .../scala/collection/parallel/ParallelSeq.scala | 64 -- .../collection/parallel/ParallelSeqLike.scala | 473 ----------- .../collection/parallel/ParallelSeqView.scala | 64 -- .../collection/parallel/ParallelSeqViewLike.scala | 192 ----- .../scala/collection/parallel/Splitters.scala | 86 -- .../scala/collection/parallel/TaskSupport.scala | 27 - .../scala/collection/parallel/Tasks.scala | 230 ----- .../parallel/immutable/ParallelHashTrie.scala | 137 --- .../parallel/immutable/ParallelIterable.scala | 56 -- .../parallel/immutable/ParallelRange.scala | 88 -- .../parallel/immutable/ParallelSeq.scala | 47 -- .../collection/parallel/immutable/package.scala | 56 -- .../collection/parallel/mutable/LazyCombiner.scala | 43 - .../parallel/mutable/ParallelArray.scala | 561 ------------ .../parallel/mutable/ParallelArrayCombiner.scala | 105 --- .../parallel/mutable/ParallelIterable.scala | 51 -- .../collection/parallel/mutable/ParallelSeq.scala | 61 -- .../collection/parallel/mutable/package.scala | 32 - .../scala/collection/parallel/package.scala | 70 -- 41 files changed, 121 insertions(+), 4747 deletions(-) delete mode 100644 src/parallel-collections/scala/collection/Parallel.scala delete mode 100644 src/parallel-collections/scala/collection/Parallelizable.scala delete mode 100644 src/parallel-collections/scala/collection/Sequentializable.scala delete mode 100644 src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala delete mode 100644 src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala delete mode 100644 src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala delete mode 100644 src/parallel-collections/scala/collection/generic/HasNewCombiner.scala delete mode 100644 src/parallel-collections/scala/collection/generic/ParallelFactory.scala delete mode 100644 src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala delete mode 100644 src/parallel-collections/scala/collection/generic/Signalling.scala delete mode 100644 src/parallel-collections/scala/collection/generic/Sizing.scala delete mode 100644 src/parallel-collections/scala/collection/immutable/package.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/Combiners.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/Iterators.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelIterable.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelMap.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelSeq.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/Splitters.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/TaskSupport.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/Tasks.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/immutable/package.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/mutable/package.scala delete mode 100644 src/parallel-collections/scala/collection/parallel/package.scala (limited to 'src') diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala index 8b4bc070ab..f40905428e 100644 --- a/src/library/scala/collection/immutable/HashMap.scala +++ b/src/library/scala/collection/immutable/HashMap.scala @@ -75,7 +75,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par protected def get0(key: A, hash: Int, level: Int): Option[B] = None - protected def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1)): HashMap[A, B1] = + def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1)): HashMap[A, B1] = new HashMap.HashMap1(key, hash, value, kv) protected def removed0(key: A, hash: Int, level: Int): HashMap[A, B] = this diff --git a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala index e29e9dfa98..a9e08913ea 100644 --- a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala +++ b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala @@ -21,6 +21,10 @@ import scala.collection.immutable.HashMap +/** Parallel hash trie map. + * + * @author prokopec + */ class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) extends ParallelMap[K, V] with GenericParallelMapTemplate[K, V, ParallelHashTrie] @@ -87,7 +91,7 @@ object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] { new CanCombineFromMap[K, V] } - def fromTrie[K, V](trie: HashMap[K, V]): ParallelHashTrie[K, V] = new ParallelHashTrie(trie) + def fromTrie[K, V](t: HashMap[K, V]) = new ParallelHashTrie(t) var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0) } @@ -96,31 +100,136 @@ object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] { trait HashTrieCombiner[K, V] extends Combiner[(K, V), ParallelHashTrie[K, V]] { self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] => - private var trie: HashMap[K, V] = HashMap.empty[K, V] - - def size: Int = trie.size - - def clear = trie = HashMap.empty[K, V] + import HashTrieCombiner._ + var heads = new Array[Unrolled[K, V]](rootsize) + var lasts = new Array[Unrolled[K, V]](rootsize) + var size: Int = 0 + + def clear = { + heads = new Array[Unrolled[K, V]](rootsize) + lasts = new Array[Unrolled[K, V]](rootsize) + } - def +=(elem: (K, V)) = { trie += elem; this } + def +=(elem: (K, V)) = { + size += 1 + val hc = elem._1.## + val pos = hc & 0x1f + if (lasts(pos) eq null) { + // initialize bucket + heads(pos) = new Unrolled[K, V] + lasts(pos) = heads(pos) + } + // add to bucket + lasts(pos) = lasts(pos).add(elem) + this + } def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { // ParallelHashTrie.totalcombines.incrementAndGet if (other.isInstanceOf[HashTrieCombiner[_, _]]) { val that = other.asInstanceOf[HashTrieCombiner[K, V]] - val ncombiner = HashTrieCombiner[K, V] - ncombiner.trie = this.trie merge that.trie - ncombiner + var i = 0 + while (i < rootsize) { + if (lasts(i) eq null) { + heads(i) = that.heads(i) + lasts(i) = that.lasts(i) + } else { + lasts(i).next = that.heads(i) + if (that.lasts(i) ne null) lasts(i) = that.lasts(i) + } + i += 1 + } + size = size + that.size + this } else error("Unexpected combiner type.") } else this - def result = new ParallelHashTrie[K, V](trie) + def result = { + val buckets = heads.filter(_ != null) + val root = new Array[HashMap[K, V]](buckets.length) + + executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + + var bitmap = 0 + var i = 0 + while (i < rootsize) { + if (heads(i) ne null) bitmap |= 1 << i + i += 1 + } + val sz = root.foldLeft(0)(_ + _.size) + + if (sz == 0) new ParallelHashTrie[K, V] + else if (sz == 1) new ParallelHashTrie[K, V](root(0)) + else { + val trie = new HashMap.HashTrieMap(bitmap, root, sz) + new ParallelHashTrie[K, V](trie) + } + } + + /* tasks */ + + class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + var result = () + def leaf(prev: Option[Unit]) = { + var i = offset + val until = offset + howmany + while (i < until) { + root(i) = createTrie(buckets(i)) + i += 1 + } + } + private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = { + var trie = new HashMap[K, V] + + var unrolled = elems + var i = 0 + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val kv = chunkarr(i) + val hc = kv._1.## + trie = trie.updated0(kv._1, hc, rootbits, kv._2, kv) + i += 1 + } + i = 0 + unrolled = unrolled.next + } + + trie + } + def split = { + val fp = howmany / 2 + List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + } } object HashTrieCombiner { def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {} + + private[immutable] val rootbits = 5 + private[immutable] val rootsize = 1 << 5 + private[immutable] val unrolledsize = 16 + + private[immutable] class Unrolled[K, V] { + var size = 0 + var array = new Array[(K, V)](unrolledsize) + var next: Unrolled[K, V] = null + // adds and returns itself or the new unrolled if full + def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) { + array(size) = elem + size += 1 + this + } else { + next = new Unrolled[K, V] + next.add(elem) + } + override def toString = "Unrolled(" + array.mkString(", ") + ")" + } } diff --git a/src/parallel-collections/scala/collection/Parallel.scala b/src/parallel-collections/scala/collection/Parallel.scala deleted file mode 100644 index e500817745..0000000000 --- a/src/parallel-collections/scala/collection/Parallel.scala +++ /dev/null @@ -1,17 +0,0 @@ -package scala.collection - - - - - - -/** A marker trait for objects with parallelised operations. - * - * @since 2.8 - * @author prokopec - */ -trait Parallel - - - - diff --git a/src/parallel-collections/scala/collection/Parallelizable.scala b/src/parallel-collections/scala/collection/Parallelizable.scala deleted file mode 100644 index bfca4a41d7..0000000000 --- a/src/parallel-collections/scala/collection/Parallelizable.scala +++ /dev/null @@ -1,40 +0,0 @@ -package scala.collection - - - -import parallel.ParallelIterableLike - - - -/** - * This trait describes collections which can be turned into parallel collections - * by invoking the method `par`. Parallelizable collections may be parametrized with - * a target type different than their own. - */ -trait Parallelizable[+T, +ParRepr <: Parallel] { - - /** - * Returns a parallel implementation of a collection. - */ - def par: ParRepr - -} - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/Sequentializable.scala b/src/parallel-collections/scala/collection/Sequentializable.scala deleted file mode 100644 index 61fb24571a..0000000000 --- a/src/parallel-collections/scala/collection/Sequentializable.scala +++ /dev/null @@ -1,15 +0,0 @@ -package scala.collection - - - - -trait Sequentializable[+T, +Repr] { - - /** A view of this parallel collection, but with all - * of the operations implemented sequentially (i.e. in a single-threaded manner). - * - * @return a sequential view of the collection. - */ - def seq: Repr - -} \ No newline at end of file diff --git a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala b/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala deleted file mode 100644 index fcbcd6295e..0000000000 --- a/src/parallel-collections/scala/collection/generic/CanBuildFromParallel.scala +++ /dev/null @@ -1,28 +0,0 @@ -package scala.collection -package generic - - - -import scala.collection.parallel._ - - - - -/** - * A base trait for parallel builder factories. - * - * @tparam From the type of the underlying collection that requests a builder to be created - * @tparam Elem the element type of the collection to be created - * @tparam To the type of the collection to be created - */ -trait CanCombineFrom[-From, -Elem, +To] extends CanBuildFrom[From, Elem, To] with Parallel { - def apply(from: From): Combiner[Elem, To] - def apply(): Combiner[Elem, To] -} - - - - - - - diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala b/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala deleted file mode 100644 index e5ba36f846..0000000000 --- a/src/parallel-collections/scala/collection/generic/GenericParallelCompanion.scala +++ /dev/null @@ -1,29 +0,0 @@ -package scala.collection.generic - - -import scala.collection.parallel.Combiner -import scala.collection.parallel.ParallelIterable -import scala.collection.parallel.ParallelMap - - - -/** A template class for companion objects of parallel collection classes. - * They should be mixed in together with `GenericCompanion` type. - * @tparam CC the type constructor representing the collection class - * @since 2.8 - */ -trait GenericParallelCompanion[+CC[X] <: ParallelIterable[X]] { - /** The default builder for $Coll objects. - */ - def newBuilder[A]: Combiner[A, CC[A]] - - /** The parallel builder for $Coll objects. - */ - def newCombiner[A]: Combiner[A, CC[A]] -} - -trait GenericParallelMapCompanion[+CC[P, Q] <: ParallelMap[P, Q]] { - def newCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] -} - - diff --git a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala b/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala deleted file mode 100644 index e98c13fa36..0000000000 --- a/src/parallel-collections/scala/collection/generic/GenericParallelTemplate.scala +++ /dev/null @@ -1,66 +0,0 @@ -package scala.collection.generic - - - -import scala.collection.parallel.Combiner -import scala.collection.parallel.ParallelIterable -import scala.collection.parallel.ParallelMap -import scala.collection.parallel.TaskSupport - - -import annotation.unchecked.uncheckedVariance - - - - - - -/** A template trait for collections having a companion. - * - * @tparam A the element type of the collection - * @tparam CC the type constructor representing the collection class - * @since 2.8 - * @author prokopec - */ -trait GenericParallelTemplate[+A, +CC[X] <: ParallelIterable[X]] -extends GenericTraversableTemplate[A, CC] - with HasNewCombiner[A, CC[A] @uncheckedVariance] - with TaskSupport -{ - def companion: GenericCompanion[CC] with GenericParallelCompanion[CC] - - protected[this] override def newBuilder: collection.mutable.Builder[A, CC[A]] = newCombiner - - protected[this] override def newCombiner: Combiner[A, CC[A]] = { - val cb = companion.newCombiner[A] - cb.environment = environment - cb - } - - override def genericBuilder[B]: Combiner[B, CC[B]] = genericCombiner[B] - - def genericCombiner[B]: Combiner[B, CC[B]] = { - val cb = companion.newCombiner[B] - cb.environment = environment - cb - } - -} - - -trait GenericParallelMapTemplate[K, +V, +CC[X, Y] <: ParallelMap[X, Y]] -extends TaskSupport -{ - def mapCompanion: GenericParallelMapCompanion[CC] - - def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = { - val cb = mapCompanion.newCombiner[P, Q] - cb.environment = environment - cb - } -} - - - - - diff --git a/src/parallel-collections/scala/collection/generic/HasNewCombiner.scala b/src/parallel-collections/scala/collection/generic/HasNewCombiner.scala deleted file mode 100644 index 2c24b437d8..0000000000 --- a/src/parallel-collections/scala/collection/generic/HasNewCombiner.scala +++ /dev/null @@ -1,26 +0,0 @@ -package scala.collection.generic - - - -import scala.collection.parallel.Combiner - - - -trait HasNewCombiner[+T, +Repr] { - protected[this] def newCombiner: Combiner[T, Repr] -} - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelFactory.scala deleted file mode 100644 index 0b9e92aa10..0000000000 --- a/src/parallel-collections/scala/collection/generic/ParallelFactory.scala +++ /dev/null @@ -1,43 +0,0 @@ -package scala.collection.generic - - -import scala.collection.parallel.ParallelIterable -import scala.collection.parallel.Combiner - - - -/** A template class for companion objects of `ParallelIterable` and subclasses thereof. - * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects. - * - * @define $coll parallel collection - * @define $Coll ParallelIterable - */ -abstract class ParallelFactory[CC[X] <: ParallelIterable[X] with GenericParallelTemplate[X, CC]] -extends TraversableFactory[CC] - with GenericParallelCompanion[CC] { - - type EPC[T, C] = collection.parallel.EnvironmentPassingCombiner[T, C] - - /** - * A generic implementation of the `CanBuildFromParallel` trait, which forwards all calls to - * `apply(from)` to the `genericParallelBuilder` method of the $coll `from`, and calls to `apply()` - * to this factory. - */ - class GenericCanCombineFrom[A] extends GenericCanBuildFrom[A] with CanCombineFrom[CC[_], A, CC[A]] { - override def apply(from: Coll) = from.genericCombiner - override def apply() = newBuilder[A] - } -} - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala b/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala deleted file mode 100644 index 8f779b4029..0000000000 --- a/src/parallel-collections/scala/collection/generic/ParallelMapFactory.scala +++ /dev/null @@ -1,42 +0,0 @@ -package scala.collection.generic - - - -import scala.collection.parallel.ParallelMap -import scala.collection.parallel.ParallelMapLike -import scala.collection.parallel.Combiner -import scala.collection.mutable.Builder - - - - -/** A template class for companion objects of `ParallelMap` and subclasses thereof. - * This class extends `TraversableFactory` and provides a set of operations to create `$Coll` objects. - * - * @define $coll parallel map - * @define $Coll ParallelMap - */ -abstract class ParallelMapFactory[CC[X, Y] <: ParallelMap[X, Y] with ParallelMapLike[X, Y, CC[X, Y], _]] -extends MapFactory[CC] - with GenericParallelMapCompanion[CC] { - - type MapColl = CC[_, _] - - /** The default builder for $Coll objects. - * @tparam K the type of the keys - * @tparam V the type of the associated values - */ - override def newBuilder[K, V]: Builder[(K, V), CC[K, V]] = newCombiner[K, V] - - /** The default combiner for $Coll objects. - * @tparam K the type of the keys - * @tparam V the type of the associated values - */ - def newCombiner[K, V]: Combiner[(K, V), CC[K, V]] - - class CanCombineFromMap[K, V] extends CanCombineFrom[CC[_, _], (K, V), CC[K, V]] { - def apply(from: MapColl) = from.genericMapCombiner[K, V].asInstanceOf[Combiner[(K, V), CC[K, V]]] - def apply() = newCombiner[K, V] - } - -} diff --git a/src/parallel-collections/scala/collection/generic/Signalling.scala b/src/parallel-collections/scala/collection/generic/Signalling.scala deleted file mode 100644 index 1dac4297b7..0000000000 --- a/src/parallel-collections/scala/collection/generic/Signalling.scala +++ /dev/null @@ -1,192 +0,0 @@ -package scala.collection.generic - - -import java.util.concurrent.atomic.AtomicInteger - - - - - -/** - * A message interface serves as a unique interface to the - * part of the collection capable of receiving messages from - * a different task. - * - * One example of use of this is the `find` method, which can use the - * signalling interface to inform worker threads that an element has - * been found and no further search is necessary. - * - * @author prokopec - * - * @define abortflag - * Abort flag being true means that a worker can abort and produce whatever result, - * since its result will not affect the final result of computation. An example - * of operations using this are `find`, `forall` and `exists` methods. - * - * @define indexflag - * The index flag holds an integer which carries some operation-specific meaning. For - * instance, `takeWhile` operation sets the index flag to the position of the element - * where the predicate fails. Other workers may check this index against the indices - * they are working on and return if this index is smaller than their index. Examples - * of operations using this are `takeWhile`, `dropWhile`, `span` and `indexOf`. - */ -trait Signalling { - /** - * Checks whether an abort signal has been issued. - * - * $abortflag - * @return the state of the abort - */ - def isAborted: Boolean - - /** - * Sends an abort signal to other workers. - * - * $abortflag - */ - def abort: Unit - - /** - * Returns the value of the index flag. - * - * $indexflag - * @return the value of the index flag - */ - def indexFlag: Int - - /** - * Sets the value of the index flag. - * - * $indexflag - * @param f the value to which the index flag is set. - */ - def setIndexFlag(f: Int) - - /** - * Sets the value of the index flag if argument is greater than current value. - * This method does this atomically. - * - * $indexflag - * @param f the value to which the index flag is set - */ - def setIndexFlagIfGreater(f: Int) - - /** - * Sets the value of the index flag if argument is lesser than current value. - * This method does this atomically. - * - * $indexflag - * @param f the value to which the index flag is set - */ - def setIndexFlagIfLesser(f: Int) - - /** - * A read only tag specific to the signalling object. It is used to give - * specific workers information on the part of the collection being operated on. - */ - def tag: Int -} - - -/** - * This signalling implementation returns default values and ignores received signals. - */ -class DefaultSignalling extends Signalling { - def isAborted = false - def abort {} - - def indexFlag = -1 - def setIndexFlag(f: Int) {} - def setIndexFlagIfGreater(f: Int) {} - def setIndexFlagIfLesser(f: Int) {} - - def tag = -1 -} - - -/** - * An object that returns default values and ignores received signals. - */ -object IdleSignalling extends DefaultSignalling - - -/** - * A mixin trait that implements abort flag behaviour using volatile variables. - */ -trait VolatileAbort extends Signalling { - @volatile private var abortflag = false - abstract override def isAborted = abortflag - abstract override def abort = abortflag = true -} - - -/** - * A mixin trait that implements index flag behaviour using atomic integers. - * The `setIndex` operation is wait-free, while conditional set operations `setIndexIfGreater` - * and `setIndexIfLesser` are lock-free and support only monotonic changes. - */ -trait AtomicIndexFlag extends Signalling { - private val intflag: AtomicInteger = new AtomicInteger(-1) - abstract override def indexFlag = intflag.get - abstract override def setIndexFlag(f: Int) = intflag.set(f) - abstract override def setIndexFlagIfGreater(f: Int) = { - var loop = true - do { - val old = intflag.get - if (f <= old) loop = false - else if (intflag.compareAndSet(old, f)) loop = false - } while (loop); - } - abstract override def setIndexFlagIfLesser(f: Int) = { - var loop = true - do { - val old = intflag.get - if (f >= old) loop = false - else if (intflag.compareAndSet(old, f)) loop = false - } while (loop); - } -} - - -/** - * An implementation of the signalling interface using delegates. - */ -trait DelegatedSignalling extends Signalling { - /** - * A delegate that method calls are redirected to. - */ - var signalDelegate: Signalling - - def isAborted = signalDelegate.isAborted - def abort = signalDelegate.abort - - def indexFlag = signalDelegate.indexFlag - def setIndexFlag(f: Int) = signalDelegate.setIndexFlag(f) - def setIndexFlagIfGreater(f: Int) = signalDelegate.setIndexFlagIfGreater(f) - def setIndexFlagIfLesser(f: Int) = signalDelegate.setIndexFlagIfLesser(f) - - def tag = signalDelegate.tag -} - - -/** - * Class implementing delegated signalling. - */ -class DelegatedContext(var signalDelegate: Signalling) extends DelegatedSignalling - - -/** - * Class implementing delegated signalling, but having its own distinct `tag`. - */ -class TaggedDelegatedContext(deleg: Signalling, override val tag: Int) extends DelegatedContext(deleg) - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/generic/Sizing.scala b/src/parallel-collections/scala/collection/generic/Sizing.scala deleted file mode 100644 index bf801302ae..0000000000 --- a/src/parallel-collections/scala/collection/generic/Sizing.scala +++ /dev/null @@ -1,9 +0,0 @@ -package scala.collection.generic - - - -/** A trait for objects which have a size. - */ -trait Sizing { - def size: Int -} \ No newline at end of file diff --git a/src/parallel-collections/scala/collection/immutable/package.scala b/src/parallel-collections/scala/collection/immutable/package.scala deleted file mode 100644 index 5ff9fa223d..0000000000 --- a/src/parallel-collections/scala/collection/immutable/package.scala +++ /dev/null @@ -1,81 +0,0 @@ -package scala.collection - - - - - - - - - -package object immutable { - - trait RangeUtils[+Repr <: RangeUtils[Repr]] { - - def start: Int - def end: Int - def step: Int - def inclusive: Boolean - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean): Repr - - private final def inclusiveLast: Int = { - val size = end.toLong - start.toLong - (size / step.toLong * step.toLong + start.toLong).toInt - } - - final def _last: Int = if (!inclusive) { - if (step == 1 || step == -1) end - step - else { - val inclast = inclusiveLast - if ((end.toLong - start.toLong) % step == 0) inclast - step else inclast - } - } else { - if (step == 1 || step == -1) end - else inclusiveLast - } - - final def _foreach[U](f: Int => U) = if (_length > 0) { - var i = start - val last = _last - while (i != last) { - f(i) - i += step - } - } - - final def _length: Int = if (!inclusive) { - if (end > start == step > 0 && start != end) { - (_last.toLong - start.toLong) / step.toLong + 1 - } else 0 - }.toInt else { - if (end > start == step > 0 || start == end) { - (_last.toLong - start.toLong) / step.toLong + 1 - } else 0 - }.toInt - - final def _apply(idx: Int): Int = { - if (idx < 0 || idx >= _length) throw new IndexOutOfBoundsException(idx.toString) - start + idx * step - } - - private def locationAfterN(n: Int) = if (n > 0) { - if (step > 0) ((start.toLong + step.toLong * n.toLong) min _last.toLong).toInt - else ((start.toLong + step.toLong * n.toLong) max _last.toLong).toInt - } else start - - final def _take(n: Int) = if (n > 0 && _length > 0) { - create(start, locationAfterN(n), step, true) - } else create(start, start, step, false) - - final def _drop(n: Int) = create(locationAfterN(n), end, step, inclusive) - - final def _slice(from: Int, until: Int) = _drop(from)._take(until - from) - - } - -} - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/Combiners.scala b/src/parallel-collections/scala/collection/parallel/Combiners.scala deleted file mode 100644 index a37f642d42..0000000000 --- a/src/parallel-collections/scala/collection/parallel/Combiners.scala +++ /dev/null @@ -1,66 +0,0 @@ -package scala.collection.parallel - - -import scala.collection.Parallel -import scala.collection.mutable.Builder -import scala.collection.generic.Sizing - - - -/** The base trait for all combiners. - * A combiner lets one construct collections incrementally just like - * a regular builder, but also implements an efficient merge operation of two builders - * via `combine` method. Once the collection is constructed, it may be obtained by invoking - * the `result` method. - * - * @tparam Elem the type of the elements added to the builder - * @tparam To the type of the collection the builder produces - * - * @author prokopec - */ -trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport { - self: EnvironmentPassingCombiner[Elem, To] => - - type EPC = EnvironmentPassingCombiner[Elem, To] - - /** Combines the contents of the receiver builder and the `other` builder, - * producing a new builder containing both their elements. - * - * This method may combine the two builders by copying them into a larger collection, - * by producing a lazy view that gets evaluated once `result` is invoked, or use - * a merge operation specific to the data structure in question. - * - * Note that both the receiver builder and `other` builder become invalidated - * after the invocation of this method, and should be cleared (see `clear`) - * if they are to be used again. - * - * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is, - * they are the same objects in memories, always does nothing and returns the first combiner. - * - * @tparam N the type of elements contained by the `other` builder - * @tparam NewTo the type of collection produced by the `other` builder - * @param other the other builder - * @return the parallel builder containing both the elements of this and the `other` builder - */ - def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] - -} - - -trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] { - abstract override def result = { - val res = super.result -// res.environment = environment - res - } -} - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/Iterators.scala b/src/parallel-collections/scala/collection/parallel/Iterators.scala deleted file mode 100644 index bfebff994c..0000000000 --- a/src/parallel-collections/scala/collection/parallel/Iterators.scala +++ /dev/null @@ -1,443 +0,0 @@ -package scala.collection.parallel - - - -import scala.collection.Parallel -import scala.collection.generic.Signalling -import scala.collection.generic.DelegatedSignalling -import scala.collection.generic.CanCombineFrom -import scala.collection.mutable.Builder -import scala.collection.Iterator.empty - - - - - - -trait RemainsIterator[+T] extends Iterator[T] { - /** The number of elements this iterator has yet to iterate. - * This method doesn't change the state of the iterator. - */ - def remaining: Int -} - - -/** Augments iterators with additional methods, mostly transformers, - * assuming they iterate an iterable collection. - * - * @param T type of the elements iterated. - * @param Repr type of the collection iterator iterates. - */ -trait AugmentedIterableIterator[+T, +Repr <: Parallel] extends RemainsIterator[T] { - - def repr: Repr - - /* accessors */ - - override def count(p: T => Boolean): Int = { - var i = 0 - while (hasNext) if (p(next)) i += 1 - i - } - - def reduce[U >: T](op: (U, U) => U): U = { - var r: U = next - while (hasNext) r = op(r, next) - r - } - - def fold[U >: T](z: U)(op: (U, U) => U): U = { - var r = z - while (hasNext) r = op(r, next) - r - } - - override def sum[U >: T](implicit num: Numeric[U]): U = { - var r: U = num.zero - while (hasNext) r = num.plus(r, next) - r - } - - override def product[U >: T](implicit num: Numeric[U]): U = { - var r: U = num.one - while (hasNext) r = num.times(r, next) - r - } - - override def min[U >: T](implicit ord: Ordering[U]): T = { - var r = next - while (hasNext) { - val curr = next - if (ord.lteq(curr, r)) r = curr - } - r - } - - override def max[U >: T](implicit ord: Ordering[U]): T = { - var r = next - while (hasNext) { - val curr = next - if (ord.gteq(curr, r)) r = curr - } - r - } - - override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) { - var i = from - val until = from + len - while (i < until && hasNext) { - array(i) = next - i += 1 - } - } - - /* transformers to combiners */ - - def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { - //val cb = pbf(repr) - cb.sizeHint(remaining) - while (hasNext) cb += f(next) - cb - } - - def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { - val cb = pbf(repr) - while (hasNext) { - val curr = next - if (pf.isDefinedAt(curr)) cb += pf(curr) - } - cb - } - - def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { - val cb = pbf(repr) - while (hasNext) { - val traversable = f(next) - if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator - else cb ++= traversable - } - cb - } - - def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = { - b.sizeHint(remaining) - while (hasNext) b += next - b - } - - def filter2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = { - while (hasNext) { - val curr = next - if (pred(curr)) cb += curr - } - cb - } - - def filterNot2combiner[U >: T, This >: Repr](pred: T => Boolean, cb: Combiner[U, This]): Combiner[U, This] = { - while (hasNext) { - val curr = next - if (!pred(curr)) cb += curr - } - cb - } - - def partition2combiners[U >: T, This >: Repr](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = { - while (hasNext) { - val curr = next - if (pred(curr)) btrue += curr - else bfalse += curr - } - (btrue, bfalse) - } - - def take2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { - cb.sizeHint(n) - var left = n - while (left > 0) { - cb += next - left -= 1 - } - cb - } - - def drop2combiner[U >: T, This >: Repr](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { - drop(n) - cb.sizeHint(remaining) - while (hasNext) cb += next - cb - } - - def slice2combiner[U >: T, This >: Repr](from: Int, until: Int, cb: Combiner[U, This]): Combiner[U, This] = { - drop(from) - var left = until - from - cb.sizeHint(left) - while (left > 0) { - cb += next - left -= 1 - } - cb - } - - def splitAt2combiners[U >: T, This >: Repr](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = { - before.sizeHint(at) - after.sizeHint(remaining - at) - var left = at - while (left > 0) { - before += next - left -= 1 - } - while (hasNext) after += next - (before, after) - } - - def takeWhile2combiner[U >: T, This >: Repr](p: T => Boolean, cb: Combiner[U, This]) = { - var loop = true - while (hasNext && loop) { - val curr = next - if (p(curr)) cb += curr - else loop = false - } - (cb, loop) - } - - def span2combiners[U >: T, This >: Repr](p: T => Boolean, before: Combiner[U, This], after: Combiner[U, This]) = { - var isBefore = true - while (hasNext && isBefore) { - val curr = next - if (p(curr)) before += curr - else { - after.sizeHint(remaining + 1) - after += curr - isBefore = false - } - } - while (hasNext) after += next - (before, after) - } -} - - -trait AugmentedSeqIterator[+T, +Repr <: Parallel] extends AugmentedIterableIterator[T, Repr] { - - /** The exact number of elements this iterator has yet to iterate. - * This method doesn't change the state of the iterator. - */ - def remaining: Int - - /* accessors */ - - def prefixLength(pred: T => Boolean): Int = { - var total = 0 - var loop = true - while (hasNext && loop) { - if (pred(next)) total += 1 - else loop = false - } - total - } - - override def indexWhere(pred: T => Boolean): Int = { - var i = 0 - var loop = true - while (hasNext && loop) { - if (pred(next)) loop = false - else i += 1 - } - if (loop) -1 else i - } - - def lastIndexWhere(pred: T => Boolean): Int = { - var pos = -1 - var i = 0 - while (hasNext) { - if (pred(next)) pos = i - i += 1 - } - pos - } - - def corresponds[S](corr: (T, S) => Boolean)(that: Iterator[S]): Boolean = { - while (hasNext && that.hasNext) { - if (!corr(next, that.next)) return false - } - hasNext == that.hasNext - } - - /* transformers */ - - def reverse2combiner[U >: T, This >: Repr](cb: Combiner[U, This]): Combiner[U, This] = { - cb.sizeHint(remaining) - var lst = List[T]() - while (hasNext) lst ::= next - while (lst != Nil) { - cb += lst.head - lst = lst.tail - } - cb - } - - def reverseMap2combiner[S, That](f: T => S, cbf: CanCombineFrom[Repr, S, That]): Combiner[S, That] = { - val cb = cbf(repr) - cb.sizeHint(remaining) - var lst = List[S]() - while (hasNext) lst ::= f(next) - while (lst != Nil) { - cb += lst.head - lst = lst.tail - } - cb - } - - def updated2combiner[U >: T, That](index: Int, elem: U, cbf: CanCombineFrom[Repr, U, That]): Combiner[U, That] = { - val cb = cbf(repr) - cb.sizeHint(remaining) - var j = 0 - while (hasNext) { - if (j == index) { - cb += elem - next - } else cb += next - j += 1 - } - cb - } - -} - - - -trait ParallelIterableIterator[+T, +Repr <: Parallel] -extends AugmentedIterableIterator[T, Repr] - with Splitter[T] - with Signalling - with DelegatedSignalling -{ - def split: Seq[ParallelIterableIterator[T, Repr]] - - /** The number of elements this iterator has yet to traverse. This method - * doesn't change the state of the iterator. - * - * This method is used to provide size hints to builders and combiners, and - * to approximate positions of iterators within a data structure. - * - * '''Note''': This method may be implemented to return an upper bound on the number of elements - * in the iterator, instead of the exact number of elements to iterate. - * - * In that case, 2 considerations must be taken into account: - * - * 1) classes that inherit `ParallelIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt` and `copyToArray`. - * - * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum - * of `remaining` values of split iterators must be less than or equal to this upper bound. - */ - def remaining: Int -} - - -trait ParallelSeqIterator[+T, +Repr <: Parallel] -extends ParallelIterableIterator[T, Repr] - with AugmentedSeqIterator[T, Repr] - with PreciseSplitter[T] -{ - def split: Seq[ParallelSeqIterator[T, Repr]] - def psplit(sizes: Int*): Seq[ParallelSeqIterator[T, Repr]] - - /** The number of elements this iterator has yet to traverse. This method - * doesn't change the state of the iterator. Unlike the version of this method in the supertrait, - * method `remaining` in `ParallelSeqLike.this.ParallelIterator` must return an exact number - * of elements remaining in the iterator. - * - * @return an exact number of elements this iterator has yet to iterate - */ - def remaining: Int -} - - -trait DelegatedIterator[+T, +Delegate <: Iterator[T]] extends RemainsIterator[T] { - val delegate: Delegate - def next = delegate.next - def hasNext = delegate.hasNext -} - - -trait Counting[+T] extends RemainsIterator[T] { - val initialSize: Int - def remaining = initialSize - traversed - var traversed = 0 - abstract override def next = { - val n = super.next - traversed += 1 - n - } -} - - -/** A mixin for iterators that traverse only filtered elements of a delegate. - */ -trait FilteredIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] { - protected[this] val pred: T => Boolean - - private[this] var hd: T = _ - private var hdDefined = false - - override def hasNext: Boolean = hdDefined || { - do { - if (!delegate.hasNext) return false - hd = delegate.next - } while (!pred(hd)) - hdDefined = true - true - } - - override def next = if (hasNext) { hdDefined = false; hd } else empty.next -} - - -/** A mixin for iterators that traverse elements of the delegate iterator, and of another collection. - */ -trait AppendedIterator[+T, +Delegate <: Iterator[T]] extends DelegatedIterator[T, Delegate] { - // `rest` should never alias `delegate` - protected[this] val rest: Iterator[T] - - private[this] var current: Iterator[T] = delegate - - override def hasNext = (current.hasNext) || (current == delegate && rest.hasNext) - - override def next = { - if (!current.hasNext) current = rest - current.next - } - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala deleted file mode 100644 index 4882dc19ee..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelIterable.scala +++ /dev/null @@ -1,49 +0,0 @@ -package scala.collection.parallel - - -import scala.collection.generic._ -import scala.collection.parallel.mutable.ParallelArrayCombiner -import scala.collection.parallel.mutable.ParallelArray - - -/** A template trait for parallel iterable collections. - * - * $paralleliterableinfo - * - * $sideeffects - * - * @tparam T the element type of the collection - * - * @author prokopec - * @since 2.8 - */ -trait ParallelIterable[+T] extends Iterable[T] - with GenericParallelTemplate[T, ParallelIterable] - with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] { - override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable -} - -/** $factoryinfo - */ -object ParallelIterable extends ParallelFactory[ParallelIterable] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] = - new GenericCanCombineFrom[T] - - def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] - - def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] -} - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala deleted file mode 100644 index 01a108eea0..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelIterableLike.scala +++ /dev/null @@ -1,940 +0,0 @@ -package scala.collection.parallel - - - - -import scala.collection.mutable.Builder -import scala.collection.mutable.ListBuffer -import scala.collection.IterableLike -import scala.collection.Parallel -import scala.collection.Parallelizable -import scala.collection.Sequentializable -import scala.collection.generic._ - - - - -// TODO update docs!! -/** A template trait for parallel collections of type `ParallelIterable[T]`. - * - * $paralleliterableinfo - * - * $sideeffects - * - * @tparam T the element type of the collection - * @tparam Repr the type of the actual collection containing the elements - * - * @define paralleliterableinfo - * This is a base trait for Scala parallel collections. It defines behaviour - * common to all parallel collections. The actual parallel operation implementation - * is found in the `ParallelIterableFJImpl` trait extending this trait. Concrete - * parallel collections should inherit both this and that trait. - * - * Parallel operations are implemented with divide and conquer style algorithms that - * parallelize well. The basic idea is to split the collection into smaller parts until - * they are small enough to be operated on sequentially. - * - * All of the parallel operations are implemented in terms of several methods. The first is: - * {{{ - * def split: Seq[Repr] - * }}} - * which splits the collection into a sequence of disjunct views. This is typically a - * very fast operation which simply creates wrappers around the receiver collection. - * These views can then be split recursively into smaller views and so on. Each of - * the views is still a parallel collection. - * - * The next method is: - * {{{ - * def combine[OtherRepr >: Repr](other: OtherRepr): OtherRepr - * }}} - * which combines this collection with the argument collection and returns a collection - * containing both the elements of this collection and the argument collection. This behaviour - * may be implemented by producing a view that iterates over both collections, by aggressively - * copying all the elements into the new collection or by lazily creating a wrapper over both - * collections that gets evaluated once it's needed. It is recommended to avoid copying all of - * the elements for performance reasons, although that cost might be negligible depending on - * the use case. - * - * Methods: - * {{{ - * def seq: Repr - * }}} - * and - * {{{ - * def par: Repr - * }}} - * produce a view of the collection that has sequential or parallel operations, respectively. - * - * The method: - * {{{ - * def threshold(sz: Int, p: Int): Int - * }}} - * provides an estimate on the minimum number of elements the collection has before - * the splitting stops and depends on the number of elements in the collection. A rule of the - * thumb is the number of elements divided by 8 times the parallelism level. This method may - * be overridden in concrete implementations if necessary. - * - * Finally, method `newParallelBuilder` produces a new parallel builder. - * - * Since this trait extends the `Iterable` trait, methods like `size` and `iterator` must also - * be implemented. - * - * Each parallel collection is bound to a specific fork/join pool, on which dormant worker - * threads are kept. One can change a fork/join pool of a collection any time except during - * some method being invoked. The fork/join pool contains other information such as the parallelism - * level, that is, the number of processors used. When a collection is created, it is assigned the - * default fork/join pool found in the `scala.collection.parallel` package object. - * - * Parallel collections may or may not be strict, and they are not ordered in terms of the `foreach` - * operation (see `Traversable`). In terms of the iterator of the collection, some collections - * are ordered (for instance, parallel sequences). - * - * @author prokopec - * @since 2.8 - * - * @define sideeffects - * The higher-order functions passed to certain operations may contain side-effects. Since implementations - * of operations may not be sequential, this means that side-effects may not be predictable and may - * produce data-races, deadlocks or invalidation of state if care is not taken. It is up to the programmer - * to either avoid using side-effects or to use some form of synchronization when accessing mutable data. - * - * @define undefinedorder - * The order in which the operations on elements are performed is unspecified and may be nondeterministic. - * - * @define pbfinfo - * An implicit value of class `CanCombineFrom` which determines the - * result class `That` from the current representation type `Repr` and - * and the new element type `B`. This builder factory can provide a parallel - * builder for the resulting collection. - * - * @define abortsignalling - * This method will provide sequential views it produces with `abort` signalling capabilities. This means - * that sequential views may send and read `abort` signals. - * - * @define indexsignalling - * This method will provide sequential views it produces with `indexFlag` signalling capabilities. This means - * that sequential views may set and read `indexFlag` state. - */ -trait ParallelIterableLike[+T, +Repr <: Parallel, +SequentialView <: Iterable[T]] -extends IterableLike[T, Repr] - with Parallelizable[T, Repr] - with Sequentializable[T, SequentialView] - with Parallel - with HasNewCombiner[T, Repr] - with TaskSupport { - self => - - /** Parallel iterators are split iterators that have additional accessor and - * transformer methods defined in terms of methods `next` and `hasNext`. - * When creating a new parallel collection, one might want to override these - * new methods to make them more efficient. - * - * Parallel iterators are augmented with signalling capabilities. This means - * that a signalling object can be assigned to them as needed. - * - * The self-type ensures that signal context passing behaviour gets mixed in - * a concrete object instance. - */ - trait ParallelIterator extends ParallelIterableIterator[T, Repr] { - me: SignalContextPassingIterator[ParallelIterator] => - var signalDelegate: Signalling = IdleSignalling - def repr = self.repr - def split: Seq[ParallelIterator] - } - - /** A stackable modification that ensures signal contexts get passed along the iterators. - * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into - * concrete iterators. - */ - trait SignalContextPassingIterator[+IterRepr <: ParallelIterator] extends ParallelIterator { - // Note: This functionality must be factored out to this inner trait to avoid boilerplate. - // Also, one could omit the cast below. However, this leads to return type inconsistencies, - // due to inability to override the return type of _abstract overrides_. - // Be aware that this stackable modification has to be subclassed, so it shouldn't be rigid - // on the type of iterators it splits. - // The alternative is some boilerplate - better to tradeoff some type safety to avoid it here. - abstract override def split: Seq[IterRepr] = { - val pits = super.split - pits foreach { _.signalDelegate = signalDelegate } - pits.asInstanceOf[Seq[IterRepr]] - } - } - - /** Convenience for signal context passing iterator. - */ - type SCPI <: SignalContextPassingIterator[ParallelIterator] - - /** Creates a new parallel iterator used to traverse the elements of this parallel collection. - * This iterator is more specific than the iterator of the returned by `iterator`, and augmented - * with additional accessor and transformer methods. - * - * @return a parallel iterator - */ - protected def parallelIterator: ParallelIterator - - /** Creates a new split iterator used to traverse the elements of this collection. - * - * By default, this method is implemented in terms of the protected `parallelIterator` method. - * - * @return a split iterator - */ - def iterator: Splitter[T] = parallelIterator - - def par = repr - - /** Some minimal number of elements after which this collection should be handled - * sequentially by different processors. - * - * This method depends on the size of the collection and the parallelism level, which - * are both specified as arguments. - * - * @param sz the size based on which to compute the threshold - * @param p the parallelism level based on which to compute the threshold - * @return the maximum number of elements for performing operations sequentially - */ - def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p) - - /** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool. - * This method forwards the call to `newCombiner`. - */ - protected[this] override def newBuilder: collection.mutable.Builder[T, Repr] = newCombiner - - /** Optionally reuses existing combiner for better performance. By default it doesn't - subclasses may override this behaviour. - * The provided combiner `oldc` that can potentially be reused will be either some combiner from the previous computational task, or `None` if there - * was no previous phase (in which case this method must return `newc`). - * - * @param oldc The combiner that is the result of the previous task, or `None` if there was no previous task. - * @param newc The new, empty combiner that can be used. - * @return Either `newc` or `oldc`. - */ - protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc - - /* convenience task operations wrapper */ - protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new { - def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) { - def map(r: R): R1 = mapping(r) - } - - def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { - val ft = tsk - val st = t2 - def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) - } - - def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { - val ft = tsk - val st = t2 - def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) - } - } - - protected def wrap[R](body: => R) = new NonDivisible[R] { - def leaf(prevr: Option[R]) = result = body - var result: R = null.asInstanceOf[R] - } - - /* convenience iterator operations wrapper */ - protected implicit def iterator2ops[PI <: ParallelIterator](it: PI) = new { - def assign(cntx: Signalling): PI = { - it.signalDelegate = cntx - it - } - } - - protected implicit def builder2ops[Elem, To](cb: Builder[Elem, To]) = new { - def ifIs[Cmb](isbody: Cmb => Unit) = new { - def otherwise(notbody: => Unit)(implicit m: ClassManifest[Cmb]) { - if (cb.getClass == m.erasure) isbody(cb.asInstanceOf[Cmb]) else notbody - } - } - } - - override def toString = seq.mkString(stringPrefix + "(", ", ", ")") - - /** Reduces the elements of this sequence using the specified associative binary operator. - * - * $undefinedorder - * - * Note this method has a different signature than the `reduceLeft` - * and `reduceRight` methods of the trait `Traversable`. - * The result of reducing may only be a supertype of this parallel collection's - * type parameter `T`. - * - * @tparam U A type parameter for the binary operator, a supertype of `T`. - * @param op A binary operator that must be associative. - * @return The result of applying reduce operator `op` between all the elements if the collection is nonempty. - * @throws UnsupportedOperationException - * if this $coll is empty. - */ - def reduce[U >: T](op: (U, U) => U): U = { - executeAndWaitResult(new Reduce(op, parallelIterator)) - } - - /** Optionally reduces the elements of this sequence using the specified associative binary operator. - * - * $undefinedorder - * - * Note this method has a different signature than the `reduceLeftOption` - * and `reduceRightOption` methods of the trait `Traversable`. - * The result of reducing may only be a supertype of this parallel collection's - * type parameter `T`. - * - * @tparam U A type parameter for the binary operator, a supertype of `T`. - * @param op A binary operator that must be associative. - * @return An option value containing result of applying reduce operator `op` between all - * the elements if the collection is nonempty, and `None` otherwise. - */ - def reduceOption[U >: T](op: (U, U) => U): Option[U] = if (isEmpty) None else Some(reduce(op)) - - /** Folds the elements of this sequence using the specified associative binary operator. - * The order in which the elements are reduced is unspecified and may be nondeterministic. - * - * Note this method has a different signature than the `foldLeft` - * and `foldRight` methods of the trait `Traversable`. - * The result of folding may only be a supertype of this parallel collection's - * type parameter `T`. - * - * @tparam U a type parameter for the binary operator, a supertype of `T`. - * @param z a neutral element for the fold operation, it may be added to the result - * an arbitrary number of times, not changing the result (e.g. `Nil` for list concatenation, - * 0 for addition, or 1 for multiplication) - * @param op a binary operator that must be associative - * @return the result of applying fold operator `op` between all the elements and `z` - */ - def fold[U >: T](z: U)(op: (U, U) => U): U = { - executeAndWaitResult(new Fold(z, op, parallelIterator)) - } - - /** Aggregates the results of applying an operator to subsequent elements. - * - * This is a more general form of `fold` and `reduce`. It has similar semantics, but does - * not require the result to be a supertype of the element type. It traverses the elements in - * different partitions sequentially, using `seqop` to update the result, and then - * applies `combop` to results from different partitions. The implementation of this - * operation may operate on an arbitrary number of collection partitions, so `combop` - * may be invoked arbitrary number of times. - * - * For example, one might want to process some elements and then produce a `Set`. In this - * case, `seqop` would process an element and append it to the list, while `combop` - * would concatenate two lists from different partitions together. The initial value - * `z` would be an empty set. - * - * {{{ - * pc.aggregate(Set[Int]())(_ += process(_), _ ++ _) - * }}} - * - * Another example is calculating geometric mean from a collection of doubles - * (one would typically require big doubles for this). - * - * @tparam S the type of accumulated results - * @param z the initial value for the accumulated result of the partition - this - * will typically be the neutral element for the `seqop` operator (e.g. - * `Nil` for list concatenation or `0` for summation) - * @param seqop an operator used to accumulate results within a partition - * @param combop an associative operator used to combine results from different partitions - */ - def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = { - executeAndWaitResult(new Aggregate(z, seqop, combop, parallelIterator)) - } - - /** Applies a function `f` to all the elements of the receiver. - * - * $undefinedorder - * - * @tparam U the result type of the function applied to each element, which is always discarded - * @param f function that's applied to each element - */ - override def foreach[U](f: T => U): Unit = { - executeAndWait(new Foreach(f, parallelIterator)) - } - - override def count(p: T => Boolean): Int = { - executeAndWaitResult(new Count(p, parallelIterator)) - } - - override def sum[U >: T](implicit num: Numeric[U]): U = { - executeAndWaitResult(new Sum[U](num, parallelIterator)) - } - - override def product[U >: T](implicit num: Numeric[U]): U = { - executeAndWaitResult(new Product[U](num, parallelIterator)) - } - - override def min[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T] - } - - override def max[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T] - } - - override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new Map[S, That](f, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.map(f)(bf) - - override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new Collect[S, That](pf, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.collect(pf)(bf) - - override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new FlatMap[S, That](f, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.flatMap(f)(bf) - - /** Tests whether a predicate holds for all elements of this $coll. - * - * $abortsignalling - * - * @param p a predicate used to test elements - * @return true if `p` holds for all elements, false otherwise - */ - override def forall(pred: T => Boolean): Boolean = { - executeAndWaitResult(new Forall(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) - } - - /** Tests whether a predicate holds for some element of this $coll. - * - * $abortsignalling - * - * @param p a predicate used to test elements - * @return true if `p` holds for some element, false otherwise - */ - override def exists(pred: T => Boolean): Boolean = { - executeAndWaitResult(new Exists(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) - } - - /** Finds some element in the collection for which the predicate holds, if such - * an element exists. The element may not necessarily be the first such element - * in the iteration order. - * - * If there are multiple elements obeying the predicate, the choice is nondeterministic. - * - * $abortsignalling - * - * @param p predicate used to test the elements - * @return an option value with the element if such an element exists, or `None` otherwise - */ - override def find(pred: T => Boolean): Option[T] = { - executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort)) - } - - protected[this] def cbfactory = () => newCombiner - - override def filter(pred: T => Boolean): Repr = { - executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result }) - } - - override def filterNot(pred: T => Boolean): Repr = { - executeAndWaitResult(new FilterNot(pred, cbfactory, parallelIterator) mapResult { _.result }) - } - - override def ++[U >: T, That](that: TraversableOnce[U])(implicit bf: CanBuildFrom[Repr, U, That]): That = { - if (that.isParallel && bf.isParallel) { - // println("case both are parallel") - val other = that.asParallelIterable - val pbf = bf.asParallel - val copythis = new Copy(() => pbf(repr), parallelIterator) - val copythat = wrap { - val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator) - othtask.compute - othtask.result - } - val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result } - executeAndWaitResult(task) - } else if (bf.isParallel) { - // println("case parallel builder, `that` not parallel") - val pbf = bf.asParallel - val copythis = new Copy(() => pbf(repr), parallelIterator) - val copythat = wrap { - val cb = pbf(repr) - for (elem <- that) cb += elem - cb - } - executeAndWaitResult((copythis parallel copythat) { _ combine _ } mapResult { _.result }) - } else { - // println("case not a parallel builder") - val b = bf(repr) - this.parallelIterator.copy2builder[U, That, Builder[U, That]](b) - if (that.isInstanceOf[Parallel]) for (elem <- that.asInstanceOf[Iterable[U]].iterator) b += elem - else for (elem <- that) b += elem - b.result - } - } - - override def partition(pred: T => Boolean): (Repr, Repr) = { - executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) - } - - override def take(n: Int): Repr = { - val actualn = if (size > n) n else size - if (actualn < MIN_FOR_COPY) take_sequential(actualn) - else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result }) - } - - private def take_sequential(n: Int) = { - val cb = newCombiner - cb.sizeHint(n) - val it = parallelIterator - var left = n - while (left > 0) { - cb += it.next - left -= 1 - } - cb.result - } - - override def drop(n: Int): Repr = { - val actualn = if (size > n) n else size - if ((size - actualn) < MIN_FOR_COPY) drop_sequential(actualn) - else executeAndWaitResult(new Drop(actualn, cbfactory, parallelIterator) mapResult { _.result }) - } - - private def drop_sequential(n: Int) = { - val it = parallelIterator drop n - val cb = newCombiner - cb.sizeHint(size - n) - while (it.hasNext) cb += it.next - cb.result - } - - override def slice(unc_from: Int, unc_until: Int): Repr = { - val from = unc_from min size max 0 - val until = unc_until min size max from - if ((until - from) <= MIN_FOR_COPY) slice_sequential(from, until) - else executeAndWaitResult(new Slice(from, until, cbfactory, parallelIterator) mapResult { _.result }) - } - - private def slice_sequential(from: Int, until: Int): Repr = { - val cb = newCombiner - var left = until - from - val it = parallelIterator drop from - while (left > 0) { - cb += it.next - left -= 1 - } - cb.result - } - - override def splitAt(n: Int): (Repr, Repr) = { - executeAndWaitResult(new SplitAt(n, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) - } - - /** Takes the longest prefix of elements that satisfy the predicate. - * - * $indexsignalling - * The index flag is initially set to maximum integer value. - * - * @param pred the predicate used to test the elements - * @return the longest prefix of this $coll of elements that satisy the predicate `pred` - */ - override def takeWhile(pred: T => Boolean): Repr = { - val cntx = new DefaultSignalling with AtomicIndexFlag - cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new TakeWhile(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._1.result }) - } - - /** Splits this $coll into a prefix/suffix pair according to a predicate. - * - * $indexsignalling - * The index flag is initially set to maximum integer value. - * - * @param pred the predicate used to test the elements - * @return a pair consisting of the longest prefix of the collection for which all - * the elements satisfy `pred`, and the rest of the collection - */ - override def span(pred: T => Boolean): (Repr, Repr) = { - val cntx = new DefaultSignalling with AtomicIndexFlag - cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult { - p => (p._1.result, p._2.result) - }) - } - - /** Drops all elements in the longest prefix of elements that satisfy the predicate, - * and returns a collection composed of the remaining elements. - * - * $indexsignalling - * The index flag is initially set to maximum integer value. - * - * @param pred the predicate used to test the elements - * @return a collection composed of all the elements after the longest prefix of elements - * in this $coll that satisfy the predicate `pred` - */ - override def dropWhile(pred: T => Boolean): Repr = { - val cntx = new DefaultSignalling with AtomicIndexFlag - cntx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new Span(0, pred, cbfactory, parallelIterator assign cntx) mapResult { _._2.result }) - } - - override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) { - executeAndWait(new CopyToArray(start, len, xs, parallelIterator)) - } - - override def toIterable: Iterable[T] = seq.drop(0).asInstanceOf[Iterable[T]] - - override def toArray[U >: T: ClassManifest]: Array[U] = { - val arr = new Array[U](size) - copyToArray(arr) - arr - } - - override def toList: List[T] = seq.toList - - override def toIndexedSeq[S >: T]: collection.immutable.IndexedSeq[S] = seq.toIndexedSeq[S] - - override def toStream: Stream[T] = seq.toStream - - override def toSet[S >: T]: collection.immutable.Set[S] = seq.toSet - - override def toSeq: Seq[T] = seq.toSeq - - /* tasks */ - - /** Standard accessor task that iterates over the elements of the collection. - * - * @tparam R type of the result of this method (`R` for result). - * @tparam Tp the representation type of the task at hand. - */ - protected trait Accessor[R, Tp] - extends super.Task[R, Tp] { - val pit: ParallelIterator - def newSubtask(p: ParallelIterator): Accessor[R, Tp] - def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) - def split = pit.split.map(newSubtask(_)) // default split procedure - override def toString = "Accessor(" + pit.toString + ")" - } - - protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] { - def shouldSplitFurther = false - def split = throw new UnsupportedOperationException("Does not split.") - override def toString = "NonDivisibleTask" - } - - protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] - - protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] - extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { - val ft: First - val st: Second - def combineResults(fr: FR, sr: SR): R - var result: R = null.asInstanceOf[R] - } - - /** Sequentially performs one task after another. */ - protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] - extends Composite[FR, SR, R, First, Second] { - def leaf(prevr: Option[R]) = { - ft.compute - st.compute - result = combineResults(ft.result, st.result) - } - } - - /** Performs two tasks in parallel, and waits for both to finish. */ - protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] - extends Composite[FR, SR, R, First, Second] { - def leaf(prevr: Option[R]) = { - st.start - ft.compute - st.sync - result = combineResults(ft.result, st.result) - } - } - - protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp]) - extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] { - var result: R1 = null.asInstanceOf[R1] - def map(r: R): R1 - def leaf(prevr: Option[R1]) = { - inner.compute - result = map(inner.result) - } - } - - protected trait Transformer[R, Tp] extends Accessor[R, Tp] - - protected[this] class Foreach[S](op: T => S, val pit: ParallelIterator) extends Accessor[Unit, Foreach[S]] { - var result: Unit = () - def leaf(prevr: Option[Unit]) = pit.foreach(op) - def newSubtask(p: ParallelIterator) = new Foreach[S](op, p) - } - - protected[this] class Count(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Int, Count] { - var result: Int = 0 - def leaf(prevr: Option[Int]) = result = pit.count(pred) - def newSubtask(p: ParallelIterator) = new Count(pred, p) - override def merge(that: Count) = result = result + that.result - } - - protected[this] class Reduce[U >: T](op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Reduce[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.reduce(op) - def newSubtask(p: ParallelIterator) = new Reduce(op, p) - override def merge(that: Reduce[U]) = result = op(result, that.result) - } - - protected[this] class Fold[U >: T](z: U, op: (U, U) => U, val pit: ParallelIterator) extends Accessor[U, Fold[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.fold(z)(op) - def newSubtask(p: ParallelIterator) = new Fold(z, op, p) - override def merge(that: Fold[U]) = result = op(result, that.result) - } - - protected[this] class Aggregate[S](z: S, seqop: (S, T) => S, combop: (S, S) => S, val pit: ParallelIterator) - extends Accessor[S, Aggregate[S]] { - var result: S = null.asInstanceOf[S] - def leaf(prevr: Option[S]) = result = pit.foldLeft(z)(seqop) - def newSubtask(p: ParallelIterator) = new Aggregate(z, seqop, combop, p) - override def merge(that: Aggregate[S]) = result = combop(result, that.result) - } - - protected[this] class Sum[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Sum[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.sum(num) - def newSubtask(p: ParallelIterator) = new Sum(num, p) - override def merge(that: Sum[U]) = result = num.plus(result, that.result) - } - - protected[this] class Product[U >: T](num: Numeric[U], val pit: ParallelIterator) extends Accessor[U, Product[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.product(num) - def newSubtask(p: ParallelIterator) = new Product(num, p) - override def merge(that: Product[U]) = result = num.times(result, that.result) - } - - protected[this] class Min[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Min[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.min(ord) - def newSubtask(p: ParallelIterator) = new Min(ord, p) - override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result - } - - protected[this] class Max[U >: T](ord: Ordering[U], val pit: ParallelIterator) extends Accessor[U, Max[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.max(ord) - def newSubtask(p: ParallelIterator) = new Max(ord, p) - override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result - } - - protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) - extends Transformer[Combiner[S, That], Map[S, That]] { - var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.map2combiner(f, reuse(prev, pbf(self.repr))) - def newSubtask(p: ParallelIterator) = new Map(f, pbf, p) - override def merge(that: Map[S, That]) = result = result combine that.result - } - - protected[this] class Collect[S, That] - (pf: PartialFunction[T, S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) - extends Transformer[Combiner[S, That], Collect[S, That]] { - var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.collect2combiner[S, That](pf, pbf) // TODO - def newSubtask(p: ParallelIterator) = new Collect(pf, pbf, p) - override def merge(that: Collect[S, That]) = result = result combine that.result - } - - protected[this] class FlatMap[S, That](f: T => Traversable[S], pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) - extends Transformer[Combiner[S, That], FlatMap[S, That]] { - var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf) // TODO - def newSubtask(p: ParallelIterator) = new FlatMap(f, pbf, p) - override def merge(that: FlatMap[S, That]) = result = result combine that.result - } - - protected[this] class Forall(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Forall] { - var result: Boolean = true - def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.forall(pred); if (result == false) pit.abort } - def newSubtask(p: ParallelIterator) = new Forall(pred, p) - override def merge(that: Forall) = result = result && that.result - } - - protected[this] class Exists(pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Boolean, Exists] { - var result: Boolean = false - def leaf(prev: Option[Boolean]) = { if (!pit.isAborted) result = pit.exists(pred); if (result == true) pit.abort } - def newSubtask(p: ParallelIterator) = new Exists(pred, p) - override def merge(that: Exists) = result = result || that.result - } - - protected[this] class Find[U >: T](pred: T => Boolean, val pit: ParallelIterator) extends Accessor[Option[U], Find[U]] { - var result: Option[U] = None - def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } - def newSubtask(p: ParallelIterator) = new Find(pred, p) - override def merge(that: Find[U]) = if (this.result == None) result = that.result - } - - protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[Combiner[U, This], Filter[U, This]] { - var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf())) - def newSubtask(p: ParallelIterator) = new Filter(pred, cbf, p) - override def merge(that: Filter[U, This]) = result = result combine that.result - } - - protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[Combiner[U, This], FilterNot[U, This]] { - var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf())) - def newSubtask(p: ParallelIterator) = new FilterNot(pred, cbf, p) - override def merge(that: FilterNot[U, This]) = result = result combine that.result - } - - protected class Copy[U >: T, That](cfactory: () => Combiner[U, That], val pit: ParallelIterator) - extends Transformer[Combiner[U, That], Copy[U, That]] { - var result: Combiner[U, That] = null - def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cfactory())) - def newSubtask(p: ParallelIterator) = new Copy[U, That](cfactory, p) - override def merge(that: Copy[U, That]) = result = result combine that.result - } - - protected[this] class Partition[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[(Combiner[U, This], Combiner[U, This]), Partition[U, This]] { - var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.partition2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) - def newSubtask(p: ParallelIterator) = new Partition(pred, cbf, p) - override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) - } - - protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[Combiner[U, This], Take[U, This]] { - var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf())) - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - val sizes = pits.scanLeft(0)(_ + _.remaining) - for ((p, untilp) <- pits zip sizes; if untilp <= n) yield { - if (untilp + p.remaining < n) new Take(p.remaining, cbf, p) - else new Take(n - untilp, cbf, p) - } - } - override def merge(that: Take[U, This]) = result = result combine that.result - } - - protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[Combiner[U, This], Drop[U, This]] { - var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.drop2combiner(n, reuse(prev, cbf())) - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - val sizes = pits.scanLeft(0)(_ + _.remaining) - for ((p, withp) <- pits zip sizes.tail; if withp >= n) yield { - if (withp - p.remaining > n) new Drop(0, cbf, p) - else new Drop(n - withp + p.remaining, cbf, p) - } - } - override def merge(that: Drop[U, This]) = result = result combine that.result - } - - protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[Combiner[U, This], Slice[U, This]] { - var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.slice2combiner(from, until, reuse(prev, cbf())) - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - val sizes = pits.scanLeft(0)(_ + _.remaining) - for ((p, untilp) <- pits zip sizes; if untilp + p.remaining >= from || untilp <= until) yield { - val f = (from max untilp) - untilp - val u = (until min (untilp + p.remaining)) - untilp - new Slice(f, u, cbf, p) - } - } - override def merge(that: Slice[U, This]) = result = result combine that.result - } - - protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[(Combiner[U, This], Combiner[U, This]), SplitAt[U, This]] { - var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = result = pit.splitAt2combiners(at, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - val sizes = pits.scanLeft(0)(_ + _.remaining) - for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p) - } - override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) - } - - protected[this] class TakeWhile[U >: T, This >: Repr] - (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[(Combiner[U, This], Boolean), TakeWhile[U, This]] { - var result: (Combiner[U, This], Boolean) = null - def leaf(prev: Option[(Combiner[U, This], Boolean)]) = if (pos < pit.indexFlag) { - result = pit.takeWhile2combiner(pred, reuse(prev.map(_._1), cbf())) - if (!result._2) pit.setIndexFlagIfLesser(pos) - } else result = (reuse(prev.map(_._1), cbf()), false) - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new TakeWhile(pos + untilp, pred, cbf, p) - } - override def merge(that: TakeWhile[U, This]) = if (result._2) { - result = (result._1 combine that.result._1, that.result._2) - } - } - - protected[this] class Span[U >: T, This >: Repr] - (pos: Int, pred: T => Boolean, cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[(Combiner[U, This], Combiner[U, This]), Span[U, This]] { - var result: (Combiner[U, This], Combiner[U, This]) = null - def leaf(prev: Option[(Combiner[U, This], Combiner[U, This])]) = if (pos < pit.indexFlag) { - result = pit.span2combiners(pred, reuse(prev.map(_._1), cbf()), reuse(prev.map(_._2), cbf())) - if (result._2.size > 0) pit.setIndexFlagIfLesser(pos) - } else { - result = (reuse(prev.map(_._2), cbf()), pit.copy2builder[U, This, Combiner[U, This]](reuse(prev.map(_._2), cbf()))) - } - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Span(pos + untilp, pred, cbf, p) - } - override def merge(that: Span[U, This]) = result = if (result._2.size == 0) { - (result._1 combine that.result._1, that.result._2) - } else { - (result._1, result._2 combine that.result._1 combine that.result._2) - } - } - - protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], val pit: ParallelIterator) - extends Accessor[Unit, CopyToArray[U, This]] { - var result: Unit = () - def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len) - def newSubtask(p: ParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield { - val plen = p.remaining min (len - untilp) - new CopyToArray[U, This](from + untilp, plen, array, p) - } - } - } - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala deleted file mode 100644 index f40f02eb3b..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelIterableView.scala +++ /dev/null @@ -1,33 +0,0 @@ -package scala.collection.parallel - - - - -import scala.collection.Parallel -import scala.collection.TraversableViewLike -import scala.collection.IterableView - - - - -/** A template view of a non-strict view of a parallel iterable collection. - * - * @tparam T ... - * @tparam Coll ... - * - * @since 2.8 - */ -trait ParallelIterableView[+T, +Coll <: Parallel, +CollSeq] -extends ParallelIterableViewLike[T, Coll, CollSeq, ParallelIterableView[T, Coll, CollSeq], IterableView[T, CollSeq]] - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala deleted file mode 100644 index 024eb48d25..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelIterableViewLike.scala +++ /dev/null @@ -1,59 +0,0 @@ -package scala.collection.parallel - - - - -import scala.collection.Parallel -import scala.collection.TraversableViewLike -import scala.collection.IterableView -import scala.collection.IterableViewLike - - - - - -/** A template view of a non-strict view of parallel iterable collection. - * - * '''Note:''' Regular view traits have type parameters used to carry information - * about the type of the elements, type of the collection they are derived from and - * their own actual representation type. Parallel views have an additional parameter - * which carries information about the type of the sequential version of the view. - * - * @tparam T the type of the elements this view can traverse - * @tparam Coll the type of the collection this view is derived from - * @tparam CollSeq TODO - * @tparam This the actual representation type of this view - * @tparam ThisSeq the type of the sequential representation of this view - * - * @since 2.8 - */ -trait ParallelIterableViewLike[+T, - +Coll <: Parallel, - +CollSeq, - +This <: ParallelIterableView[T, Coll, CollSeq] with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq], - +ThisSeq <: IterableView[T, CollSeq] with IterableViewLike[T, CollSeq, ThisSeq]] -extends IterableView[T, Coll] - with IterableViewLike[T, Coll, This] - with ParallelIterable[T] - with ParallelIterableLike[T, This, ThisSeq] -{ - self => - - override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner"); - - //type SCPI = SignalContextPassingIterator[ParallelIterator] // complains when overriden further in inh. hier., TODO check it out - type CPI = SignalContextPassingIterator[ParallelIterator] - - trait Transformed[+S] extends ParallelIterableView[S, Coll, CollSeq] with super.Transformed[S] - -} - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala b/src/parallel-collections/scala/collection/parallel/ParallelMap.scala deleted file mode 100644 index 5ce61469bc..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelMap.scala +++ /dev/null @@ -1,71 +0,0 @@ -package scala.collection.parallel - - - - - -import scala.collection.Map -import scala.collection.mutable.Builder -import scala.collection.generic.ParallelMapFactory -import scala.collection.generic.GenericParallelMapTemplate -import scala.collection.generic.GenericParallelMapCompanion -import scala.collection.generic.CanCombineFrom - - - - - - -trait ParallelMap[K, +V] -extends Map[K, V] - with GenericParallelMapTemplate[K, V, ParallelMap] - with ParallelIterable[(K, V)] - with ParallelMapLike[K, V, ParallelMap[K, V], Map[K, V]] -{ -self => - - def mapCompanion: GenericParallelMapCompanion[ParallelMap] = ParallelMap - - override def empty: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V] - - override def stringPrefix = "ParallelMap" -} - - - -object ParallelMap extends ParallelMapFactory[ParallelMap] { - def empty[K, V]: ParallelMap[K, V] = new immutable.ParallelHashTrie[K, V] - - def newCombiner[K, V]: Combiner[(K, V), ParallelMap[K, V]] = immutable.HashTrieCombiner[K, V] - - implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelMap[K, V]] = new CanCombineFromMap[K, V] - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala deleted file mode 100644 index 8a0b54525f..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelMapLike.scala +++ /dev/null @@ -1,43 +0,0 @@ -package scala.collection.parallel - - - - -import scala.collection.MapLike -import scala.collection.Map -import scala.collection.mutable.Builder - - - - - - - - -trait ParallelMapLike[K, - +V, - +Repr <: ParallelMapLike[K, V, Repr, SequentialView] with ParallelMap[K, V], - +SequentialView <: Map[K, V]] -extends MapLike[K, V, Repr] - with ParallelIterableLike[(K, V), Repr, SequentialView] -{ self => - - protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner - - protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.") - - override def empty: Repr - -} - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala deleted file mode 100644 index 71b802cd11..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeq.scala +++ /dev/null @@ -1,64 +0,0 @@ -package scala.collection.parallel - - - -import scala.collection.generic.GenericCompanion -import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.GenericParallelTemplate -import scala.collection.generic.ParallelFactory -import scala.collection.generic.CanCombineFrom -import scala.collection.parallel.mutable.ParallelArrayCombiner -import scala.collection.parallel.mutable.ParallelArray - - -/** A template trait for parallel sequences. - * - * $parallelseqinfo - * - * $sideeffects - */ -trait ParallelSeq[+T] extends Seq[T] - with ParallelIterable[T] - with GenericParallelTemplate[T, ParallelSeq] - with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] { - override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq - - def apply(i: Int): T - - override def toString = super[ParallelIterable].toString -} - - -object ParallelSeq extends ParallelFactory[ParallelSeq] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T] - - def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] - - def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] -} - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala deleted file mode 100644 index 18b0c83f23..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeqLike.scala +++ /dev/null @@ -1,473 +0,0 @@ -package scala.collection.parallel - - -import scala.collection.Parallel -import scala.collection.SeqLike -import scala.collection.generic.DefaultSignalling -import scala.collection.generic.AtomicIndexFlag -import scala.collection.generic.CanBuildFrom -import scala.collection.generic.CanCombineFrom -import scala.collection.generic.VolatileAbort - - - - -// TODO update docs!! -/** A template trait for sequences of type `ParallelSeq[T]`, representing - * parallel sequences with element type `T`. - * - * $parallelseqinfo - * - * @tparam T the type of the elements contained in this collection - * @tparam Repr the type of the actual collection containing the elements - * - * @define parallelseqinfo - * Parallel sequences inherit the `IndexedSeq` trait. This means they provide - * efficient indexing and length computations. Like their sequential counterparts - * they always have a defined order of elements. This means they will produce resulting - * parallel sequences in the same way sequential sequences do. However, the order - * in which they iterate over elements to produce results is not defined and is generally - * nondeterministic. If the higher-order functions given to them produce no sideeffects, - * then this won't be noticeable. - * - * This trait defines a new, more general `split` operation and reimplements the `split` - * operation of `ParallelIterable` trait using the new `split` operation. - * - * @author prokopec - * @since 2.8 - */ -trait ParallelSeqLike[+T, +Repr <: Parallel, +Sequential <: Seq[T] with SeqLike[T, Sequential]] -extends scala.collection.SeqLike[T, Repr] - with ParallelIterableLike[T, Repr, Sequential] { - self => - - type SuperParallelIterator = super.ParallelIterator - - /** An iterator that can be split into arbitrary subsets of iterators. - * The self-type requirement ensures that the signal context passing behaviour gets mixed in - * the concrete iterator instance in some concrete collection. - * - * '''Note:''' In concrete collection classes, collection implementers might want to override the iterator - * `reverse2builder` method to ensure higher efficiency. - */ - trait ParallelIterator extends ParallelSeqIterator[T, Repr] with super.ParallelIterator { - me: SignalContextPassingIterator[ParallelIterator] => - def split: Seq[ParallelIterator] - def psplit(sizes: Int*): Seq[ParallelIterator] - } - - /** A stackable modification that ensures signal contexts get passed along the iterators. - * A self-type requirement in `ParallelIterator` ensures that this trait gets mixed into - * concrete iterators. - */ - trait SignalContextPassingIterator[+IterRepr <: ParallelIterator] - extends ParallelIterator with super.SignalContextPassingIterator[IterRepr] { - // Note: See explanation in `ParallelIterableLike.this.SignalContextPassingIterator` - // to understand why we do the cast here, and have a type parameter. - // Bottomline: avoiding boilerplate and fighting against inability to override stackable modifications. - abstract override def psplit(sizes: Int*): Seq[IterRepr] = { - val pits = super.psplit(sizes: _*) - pits foreach { _.signalDelegate = signalDelegate } - pits.asInstanceOf[Seq[IterRepr]] - } - } - - /** A convenient shorthand for the signal context passing stackable modification. - */ - type SCPI <: SignalContextPassingIterator[ParallelIterator] - - /** A more refined version of the iterator found in the `ParallelIterable` trait, - * this iterator can be split into arbitrary subsets of iterators. - * - * @return an iterator that can be split into subsets of precise size - */ - protected def parallelIterator: ParallelIterator - - override def iterator: PreciseSplitter[T] = parallelIterator - - override def size = length - - /** Used to iterate elements using indices */ - protected abstract class Elements(start: Int, val end: Int) extends ParallelIterator with BufferedIterator[T] { - me: SignalContextPassingIterator[ParallelIterator] => - - private var i = start - - def hasNext = i < end - - def next: T = if (i < end) { - val x = self(i) - i += 1 - x - } else Iterator.empty.next - - def head = self(i) - - final def remaining = end - i - - def split = psplit(remaining / 2, remaining - remaining / 2) - - def psplit(sizes: Int*) = { - val incr = sizes.scanLeft(0)(_ + _) - for ((from, until) <- incr.init zip incr.tail) yield { - new Elements(start + from, (start + until) min end) with SignalContextPassingIterator[ParallelIterator] - } - } - - override def toString = "Elements(" + start + ", " + end + ")" - } - - /* ParallelSeq methods */ - - /** Returns the length of the longest segment of elements starting at - * a given position satisfying some predicate. - * - * $indexsignalling - * - * The index flag is initially set to maximum integer value. - * - * @param p the predicate used to test the elements - * @param from the starting offset for the search - * @return the length of the longest segment of elements starting at `from` and - * satisfying the predicate - */ - override def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else { - val realfrom = if (from < 0) 0 else from - val ctx = new DefaultSignalling with AtomicIndexFlag - ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new SegmentLength(p, 0, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx))._1 - } - - override def prefixLength(p: T => Boolean) = segmentLength(p, 0) - - /** Finds the first element satisfying some predicate. - * - * $indexsignalling - * - * The index flag is initially set to maximum integer value. - * - * @param p the predicate used to test the elements - * @param from the starting offset for the search - * @return the index `>= from` of the first element of this $coll that satisfies the predicate `p`, - * or `-1`, if none exists - */ - override def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else { - val realfrom = if (from < 0) 0 else from - val ctx = new DefaultSignalling with AtomicIndexFlag - ctx.setIndexFlag(Int.MaxValue) - executeAndWaitResult(new IndexWhere(p, realfrom, parallelIterator.psplit(realfrom, length - realfrom)(1) assign ctx)) - } - - override def indexWhere(p: T => Boolean): Int = indexWhere(p, 0) - - override def findIndexOf(p: T => Boolean): Int = indexWhere(p, 0) - - override def indexOf[U >: T](elem: U): Int = indexOf(elem, 0) - - override def indexOf[U >: T](elem: U, from: Int): Int = indexWhere(elem ==, from) - - /** Finds the last element satisfying some predicate. - * - * $indexsignalling - * - * The index flag is initially set to minimum integer value. - * - * @param p the predicate used to test the elements - * @param end the maximum offset for the search - * @return the index `<= end` of the first element of this $coll that satisfies the predicate `p`, - * or `-1`, if none exists - */ - override def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else { - val until = if (end >= length) length else end + 1 - val ctx = new DefaultSignalling with AtomicIndexFlag - ctx.setIndexFlag(Int.MinValue) - executeAndWaitResult(new LastIndexWhere(p, 0, parallelIterator.psplit(until, length - until)(0) assign ctx)) - } - - override def reverse: Repr = { - executeAndWaitResult(new Reverse(() => newCombiner, parallelIterator) mapResult { _.result }) - } - - override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new ReverseMap[S, That](f, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.reverseMap(f)(bf) - - override def startsWith[S](that: Seq[S]): Boolean = startsWith(that, 0) - - /** Tests whether this $coll contains the given sequence at a given index. - * - * $abortsignalling - * - * @tparam U the element type of `that` parallel sequence - * @param that the parallel sequence this sequence is being searched for - * @param offset the starting offset for the search - * @return `true` if there is a sequence `that` starting at `offset` in this sequence, `false` otherwise - */ - override def startsWith[S](that: Seq[S], offset: Int): Boolean = that ifParallelSeq { pthat => - if (offset < 0 || offset >= length) offset == length && pthat.length == 0 - else if (pthat.length == 0) true - else if (pthat.length > length - offset) false - else { - val ctx = new DefaultSignalling with VolatileAbort - executeAndWaitResult(new SameElements(parallelIterator.psplit(offset, pthat.length)(1) assign ctx, pthat.parallelIterator)) - } - } otherwise super.startsWith(that, offset) - - override def sameElements[U >: T](that: Iterable[U]): Boolean = that ifParallelSeq { pthat => - val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new SameElements(parallelIterator assign ctx, pthat.parallelIterator)) - } otherwise super.sameElements(that) - - /** Tests whether this $coll ends with the given parallel sequence - * - * $abortsignalling - * - * @tparam S the type of the elements of `that` sequence - * @param that the sequence to test - * @return `true` if this $coll has `that` as a suffix, `false` otherwise - */ - override def endsWith[S](that: Seq[S]): Boolean = that ifParallelSeq { pthat => - if (that.length == 0) true - else if (that.length > length) false - else { - val ctx = new DefaultSignalling with VolatileAbort - val tlen = that.length - executeAndWaitResult(new SameElements(parallelIterator.psplit(length - tlen, tlen)(1) assign ctx, pthat.parallelIterator)) - } - } otherwise super.endsWith(that) - - override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int) - (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParallelSeq && bf.isParallel) { - val that = patch.asParallelSeq - val pbf = bf.asParallel - val realreplaced = replaced min (length - from) - val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced) - val copystart = new Copy[U, That](() => pbf(repr), pits(0)) - val copymiddle = wrap { - val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) - tsk.compute - tsk.result - } - val copyend = new Copy[U, That](() => pbf(repr), pits(2)) - executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result }) - } else patch_sequential(from, patch, replaced) - - private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { - val b = bf(repr) - val repl = r min (length - from) - val pits = parallelIterator.psplit(from, repl, length - from - repl) - b ++= pits(0) - b ++= patch.iterator - b ++= pits(2) - b.result - } - - override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf => - executeAndWaitResult(new Updated(index, elem, pbf, parallelIterator) mapResult { _.result }) - } otherwise super.updated(index, elem) - - override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { - patch(0, mutable.ParallelArray(elem), 0) - } - - override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = { - patch(length, mutable.ParallelArray(elem), 0) - } - - override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) { - patch(length, new immutable.Repetition(elem, len - length), 0) - } else patch(length, Nil, 0) - - /** Tests whether every element of this $coll relates to the - * corresponding element of another parallel sequence by satisfying a test predicate. - * - * $abortsignalling - * - * @param that the other parallel sequence - * @param p the test predicate, which relates elements from both sequences - * @tparam S the type of the elements of `that` - * @return `true` if both parallel sequences have the same length and - * `p(x, y)` is `true` for all corresponding elements `x` of this $coll - * and `y` of `that`, otherwise `false` - */ - override def corresponds[S](that: Seq[S])(p: (T, S) => Boolean): Boolean = that ifParallelSeq { pthat => - val ctx = new DefaultSignalling with VolatileAbort - length == pthat.length && executeAndWaitResult(new Corresponds(p, parallelIterator assign ctx, pthat.parallelIterator)) - } otherwise super.corresponds(that)(p) - - override def toString = seq.mkString(stringPrefix + "(", ", ", ")") - - override def view = new ParallelSeqView[T, Repr, Sequential] { - protected lazy val underlying = self.repr - def length = self.length - def apply(idx: Int) = self(idx) - def seq = self.seq.view - def parallelIterator = new Elements(0, length) with SCPI {} - } - - override def view(from: Int, until: Int) = view.slice(from, until) - - /* tasks */ - - protected def down(p: SuperParallelIterator) = p.asInstanceOf[ParallelIterator] - - protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] { - val pit: ParallelIterator - } - - protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp] - - protected[this] class SegmentLength(pred: T => Boolean, from: Int, val pit: ParallelIterator) - extends Accessor[(Int, Boolean), SegmentLength] { - var result: (Int, Boolean) = null - def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) { - val itsize = pit.remaining - val seglen = pit.prefixLength(pred) - result = (seglen, itsize == seglen) - if (!result._2) pit.setIndexFlagIfLesser(from) - } else result = (0, false) - def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p) - } - override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2) - } - - protected[this] class IndexWhere(pred: T => Boolean, from: Int, val pit: ParallelIterator) - extends Accessor[Int, IndexWhere] { - var result: Int = -1 - def leaf(prev: Option[Int]) = if (from < pit.indexFlag) { - val r = pit.indexWhere(pred) - if (r != -1) { - result = from + r - pit.setIndexFlagIfLesser(from) - } - } - def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p) - } - override def merge(that: IndexWhere) = result = if (result == -1) that.result else { - if (that.result != -1) result min that.result else result - } - } - - protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, val pit: ParallelIterator) - extends Accessor[Int, LastIndexWhere] { - var result: Int = -1 - def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) { - val r = pit.lastIndexWhere(pred) - if (r != -1) { - result = pos + r - pit.setIndexFlagIfGreater(pos) - } - } - def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p) - } - override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else { - if (that.result != -1) result max that.result else result - } - } - - protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], val pit: ParallelIterator) - extends Transformer[Combiner[U, This], Reverse[U, This]] { - var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf())) - def newSubtask(p: SuperParallelIterator) = new Reverse(cbf, down(p)) - override def merge(that: Reverse[U, This]) = result = that.result combine result - } - - protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], val pit: ParallelIterator) - extends Transformer[Combiner[S, That], ReverseMap[S, That]] { - var result: Combiner[S, That] = null - def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf) // TODO - def newSubtask(p: SuperParallelIterator) = new ReverseMap(f, pbf, down(p)) - override def merge(that: ReverseMap[S, That]) = result = that.result combine result - } - - protected[this] class SameElements[U >: T](val pit: ParallelIterator, val otherpit: PreciseSplitter[U]) - extends Accessor[Boolean, SameElements[U]] { - var result: Boolean = true - def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { - result = pit.sameElements(otherpit) - if (!result) pit.abort - } - def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException - override def split = { - val fp = pit.remaining / 2 - val sp = pit.remaining - fp - for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op) - } - override def merge(that: SameElements[U]) = result = result && that.result - } - - protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], val pit: ParallelIterator) - extends Transformer[Combiner[U, That], Updated[U, That]] { - var result: Combiner[U, That] = null - def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf) // TODO - def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException - override def split = { - val pits = pit.split - for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p) - } - override def merge(that: Updated[U, That]) = result = result combine that.result - } - - protected[this] class Corresponds[S](corr: (T, S) => Boolean, val pit: ParallelIterator, val otherpit: PreciseSplitter[S]) - extends Accessor[Boolean, Corresponds[S]] { - var result: Boolean = true - def leaf(prev: Option[Boolean]) = if (!pit.isAborted) { - result = pit.corresponds(corr)(otherpit) - if (!result) pit.abort - } - def newSubtask(p: SuperParallelIterator) = throw new UnsupportedOperationException - override def split = { - val fp = pit.remaining / 2 - val sp = pit.remaining - fp - for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op) - } - override def merge(that: Corresponds[S]) = result = result && that.result - } - -} - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala deleted file mode 100644 index 7862e99f44..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeqView.scala +++ /dev/null @@ -1,64 +0,0 @@ -package scala.collection.parallel - - - - -import scala.collection.TraversableView -import scala.collection.SeqView -import scala.collection.Parallel -import scala.collection.generic.CanCombineFrom - - - - - -/** A template view of a non-strict view of a parallel sequence. - * - * @tparam T - * @tparam Coll - * - * @since 2.8 - */ -trait ParallelSeqView[+T, +Coll <: Parallel, +CollSeq] -extends ParallelSeqViewLike[T, Coll, CollSeq, ParallelSeqView[T, Coll, CollSeq], SeqView[T, CollSeq]] - - - -object ParallelSeqView { - abstract class NoCombiner[T] extends Combiner[T, Nothing] { - self: EnvironmentPassingCombiner[T, Nothing] => - def +=(elem: T): this.type = this - def iterator: Iterator[T] = Iterator.empty - def result() = throw new UnsupportedOperationException("ParallelSeqView.Combiner.result") - def size = throw new UnsupportedOperationException("ParallelSeqView.Combiner.size") - def clear() {} - def combine[N <: T, NewTo >: Nothing](other: Combiner[N, NewTo]) = - throw new UnsupportedOperationException("ParallelSeqView.Combiner.result") - } - - type Coll = ParallelSeqView[_, C, _] forSome { type C <: ParallelSeq[_] } - - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] = - new CanCombineFrom[Coll, T, ParallelSeqView[T, ParallelSeq[T], Seq[T]]] { - def apply(from: Coll) = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing] - def apply() = new NoCombiner[T] with EnvironmentPassingCombiner[T, Nothing] - } -} - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala b/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala deleted file mode 100644 index eab4d7ad5f..0000000000 --- a/src/parallel-collections/scala/collection/parallel/ParallelSeqViewLike.scala +++ /dev/null @@ -1,192 +0,0 @@ -package scala.collection.parallel - - - - - -import scala.collection.SeqView -import scala.collection.SeqViewLike -import scala.collection.Parallel -import scala.collection.generic.CanBuildFrom -import scala.collection.generic.CanCombineFrom - - - - - - - -/** A template view of a non-strict view of parallel sequence. - * - * @tparam T the type of the elements in this view - * @tparam Coll type of the collection this view is derived from - * @tparam CollSeq TODO - * @tparam This actual representation type of this view - * @tparam ThisSeq type of the sequential version of this view - * - * @since 2.8 - */ -trait ParallelSeqViewLike[+T, - +Coll <: Parallel, - +CollSeq, - +This <: ParallelSeqView[T, Coll, CollSeq] with ParallelSeqViewLike[T, Coll, CollSeq, This, ThisSeq], - +ThisSeq <: SeqView[T, CollSeq] with SeqViewLike[T, CollSeq, ThisSeq]] -extends SeqView[T, Coll] - with SeqViewLike[T, Coll, This] - with ParallelIterableView[T, Coll, CollSeq] - with ParallelIterableViewLike[T, Coll, CollSeq, This, ThisSeq] - with ParallelSeq[T] - with ParallelSeqLike[T, This, ThisSeq] -{ - self => - - type SCPI = SignalContextPassingIterator[ParallelIterator] - - trait Transformed[+S] extends ParallelSeqView[S, Coll, CollSeq] - with super[ParallelIterableView].Transformed[S] with super[SeqView].Transformed[S] { - override def parallelIterator = new Elements(0, length) with SCPI {} - override def iterator = parallelIterator - environment = self.environment - } - - trait Forced[S] extends super.Forced[S] with Transformed[S] { - // cheating here - knowing that `underlying` of `self.seq` is of type `CollSeq`, - // we use it to obtain a view of the correct type - not the most efficient thing - // in the universe, but without making `newForced` more accessible, or adding - // a `forced` method to `SeqView`, this is the best we can do - def seq = self.seq.take(0).++(forced).asInstanceOf[SeqView[S, CollSeq]] - } - - trait Filtered extends super.Filtered with Transformed[T] { - def seq = self.seq filter pred - } - - trait Sliced extends super.Sliced with Transformed[T] { - override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This] - def seq = self.seq.slice(from, until) - } - - trait Appended[U >: T] extends super.Appended[U] with Transformed[U] { - def seq = self.seq.++(rest).asInstanceOf[SeqView[U, CollSeq]] - } - - trait Mapped[S] extends super.Mapped[S] with Transformed[S]{ - def seq = self.seq.map(mapping).asInstanceOf[SeqView[S, CollSeq]] - } - - trait FlatMapped[S] extends super.FlatMapped[S] with Transformed[S] { - def seq = self.seq.flatMap(mapping).asInstanceOf[SeqView[S, CollSeq]] - } - - trait TakenWhile extends super.TakenWhile with Transformed[T] { - def seq = self.seq takeWhile pred - } - - trait DroppedWhile extends super.DroppedWhile with Transformed[T] { - def seq = self.seq dropWhile pred - } - - trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] { - def seq = (self.seq zip other).asInstanceOf[SeqView[(T, S), CollSeq]] - } - - trait ZippedAll[T1 >: T, S] extends super.ZippedAll[T1, S] with Transformed[(T1, S)] { - def seq = self.seq.zipAll(other, thisElem, thatElem).asInstanceOf[SeqView[(T1, S), CollSeq]] - } - - trait Reversed extends super.Reversed with Transformed[T] { - def seq = self.seq.reverse - } - - trait Patched[U >: T] extends super.Patched[U] with Transformed[U] { - def seq = self.seq.patch(from, patch, replaced).asInstanceOf[SeqView[U, CollSeq]] - } - - trait Prepended[U >: T] extends super.Prepended[U] with Transformed[U] { - def seq = (fst +: self.seq).asInstanceOf[SeqView[U, CollSeq]] - } - - protected override def newFiltered(p: T => Boolean): Transformed[T] = new Filtered { val pred = p } - protected override def newSliced(f: Int, u: Int): Transformed[T] = new Sliced { val from = f; val until = u } - protected override def newAppended[U >: T](that: Traversable[U]): Transformed[U] = new Appended[U] { val rest = that } - protected override def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f } - protected override def newFlatMapped[S](f: T => Traversable[S]): Transformed[S] = new FlatMapped[S] { val mapping = f } - protected override def newDroppedWhile(p: T => Boolean): Transformed[T] = new DroppedWhile { val pred = p } - protected override def newTakenWhile(p: T => Boolean): Transformed[T] = new TakenWhile { val pred = p } - protected override def newZipped[S](that: Iterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that } - protected override def newZippedAll[T1 >: T, S](that: Iterable[S], _thisElem: T1, _thatElem: S): Transformed[(T1, S)] = new ZippedAll[T1, S] { val other = that; val thisElem = _thisElem; val thatElem = _thatElem } - protected override def newReversed: Transformed[T] = new Reversed { } - protected override def newPatched[U >: T](_from: Int, _patch: Seq[U], _replaced: Int): Transformed[U] = new Patched[U] { val from = _from; val patch = _patch; val replaced = _replaced } - protected override def newPrepended[U >: T](elem: U): Transformed[U] = new Prepended[U] { protected[this] val fst = elem } - - override def filter(p: T => Boolean): This = newFiltered(p).asInstanceOf[This] - override def filterNot(p: T => Boolean): This = newFiltered(!p(_)).asInstanceOf[This] - override def partition(p: T => Boolean): (This, This) = (filter(p), filterNot(p)) - override def slice(from: Int, until: Int): This = newSliced(from, until).asInstanceOf[This] - override def take(n: Int): This = newSliced(0, n).asInstanceOf[This] - override def drop(n: Int): This = newSliced(n, length).asInstanceOf[This] - override def splitAt(n: Int): (This, This) = (take(n), drop(n)) - override def ++[U >: T, That](xs: TraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppended(xs.toTraversable).asInstanceOf[That] - override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That] - override def flatMap[S, That](f: T => Traversable[S])(implicit bf: CanBuildFrom[This, S, That]): That = newFlatMapped(f).asInstanceOf[That] - override def collect[S, That](pf: PartialFunction[T, S])(implicit bf: CanBuildFrom[This, S, That]): That = filter(pf.isDefinedAt).map(pf)(bf) - override def takeWhile(p: T => Boolean): This = newTakenWhile(p).asInstanceOf[This] - override def dropWhile(p: T => Boolean): This = newDroppedWhile(p).asInstanceOf[This] - override def span(p: T => Boolean): (This, This) = (takeWhile(p), dropWhile(p)) - override def scanLeft[S, That](z: S)(op: (S, T) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanLeft(z)(op)).asInstanceOf[That] - override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisSeq.scanRight(z)(op)).asInstanceOf[That] - override def groupBy[K](f: T => K): collection.immutable.Map[K, This] = thisSeq.groupBy(f).mapValues(xs => newForced(xs).asInstanceOf[This]) - override def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = newPrepended(elem).asInstanceOf[That] - override def reverse: This = newReversed.asInstanceOf[This] - override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[This, U, That]): That = newPatched(from, patch, replaced).asInstanceOf[That] - override def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = patch(length, Seq.fill(len - length)(elem), 0) - override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f) - override def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = { - require(0 <= index && index < length) - patch(index, List(elem), 1)(bf) - } - override def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[This, U, That]): That = ++(Iterator.single(elem))(bf) - override def union[U >: T, That](that: Seq[U])(implicit bf: CanBuildFrom[This, U, That]): That = this ++ that - override def diff[U >: T](that: Seq[U]): This = newForced(thisSeq diff that).asInstanceOf[This] - override def intersect[U >: T](that: Seq[U]): This = newForced(thisSeq intersect that).asInstanceOf[This] - override def sorted[U >: T](implicit ord: Ordering[U]): This = newForced(thisSeq sorted ord).asInstanceOf[This] - - override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf => - executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result }) - } otherwise { - val b = bf(underlying) - b ++= this.iterator - b.result - } - - /* tasks */ - - protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], val pit: ParallelIterator) - extends Transformer[Combiner[U, That], Force[U, That]] { - var result: Combiner[U, That] = null - def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying))) - def newSubtask(p: SuperParallelIterator) = new Force(cbf, down(p)) - override def merge(that: Force[U, That]) = result = result combine that.result - } - -} - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/Splitters.scala b/src/parallel-collections/scala/collection/parallel/Splitters.scala deleted file mode 100644 index b3cad6d67a..0000000000 --- a/src/parallel-collections/scala/collection/parallel/Splitters.scala +++ /dev/null @@ -1,86 +0,0 @@ -package scala.collection.parallel - - -import scala.collection.Seq - - -/** A splitter (or a split iterator) can be split into more splitters that traverse over - * disjoint subsets of elements. - * - * @tparam T type of the elements this parallel iterator traverses - * - * @since 2.8.1 - * @author prokopec - */ -trait Splitter[+T] extends Iterator[T] { - - /** Splits the iterator into a sequence of disjunct views. - * - * Returns a sequence of split iterators, each iterating over some subset of the - * elements in the collection. These subsets are disjoint and should be approximately - * equal in size. These subsets are not empty, unless the iterator is empty in which - * case this method returns a sequence with a single empty iterator. If the iterator has - * more than two elements, this method will return two or more iterators. - * - * Implementors are advised to keep this partition relatively small - two iterators are - * already enough when partitioning the collection, although there may be a few more. - * - * '''Note:''' this method actually invalidates the current iterator. - * - * @return a sequence of disjunct iterators of the collection - */ - def split: Seq[Splitter[T]] - -} - - -/** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters - * that traverse disjoint subsets of arbitrary sizes. - * - * Implementors might want to override the parameterless `split` method for efficiency. - * - * @tparam T type of the elements this parallel iterator traverses - * - * @since 2.8.1 - * @author prokopec - */ -trait PreciseSplitter[+T] extends Splitter[T] { - - /** Splits the iterator into disjunct views. - * - * This overloaded version of the `split` method is specific to precise parallel iterators. - * It returns a sequence of parallel iterators, each iterating some subset of the - * elements in this iterator. The sizes of the subiterators in the partition is equal to - * the size in the corresponding argument, as long as there are enough elements in this - * iterator to split it that way. - * - * If there aren't enough elements, a zero element iterator is appended for each additional argument. - * If there are additional elements, an additional iterator is appended at the end to compensate. - * - * For example, say we have a parallel iterator `ps` with 100 elements. Invoking: - * {{{ - * ps.split(50, 25, 25, 10, 5) - * }}} - * will return a sequence of five iterators, last two views being empty. On the other hand, calling: - * {{{ - * ps.split(50, 40) - * }}} - * will return a sequence of three iterators, last of them containing ten elements. - * - * '''Note:''' this method actually invalidates the current iterator. - * - * Unlike the case with `split` found in parallel iterable iterators, views returned by this method can be empty. - * - * @param sizes the sizes used to split this split iterator into iterators that traverse disjunct subsets - * @return a sequence of disjunct subsequence iterators of this parallel iterator - */ - def psplit(sizes: Int*): Seq[PreciseSplitter[T]] - - def split: Seq[PreciseSplitter[T]] - -} - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/TaskSupport.scala b/src/parallel-collections/scala/collection/parallel/TaskSupport.scala deleted file mode 100644 index 8a072b22aa..0000000000 --- a/src/parallel-collections/scala/collection/parallel/TaskSupport.scala +++ /dev/null @@ -1,27 +0,0 @@ -package scala.collection.parallel - - - - - - - -trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/Tasks.scala b/src/parallel-collections/scala/collection/parallel/Tasks.scala deleted file mode 100644 index 3ef60f8c7a..0000000000 --- a/src/parallel-collections/scala/collection/parallel/Tasks.scala +++ /dev/null @@ -1,230 +0,0 @@ -package scala.collection.parallel - - - - -import scala.concurrent.forkjoin._ - - - - - - - - - - -/** A trait that declares task execution capabilities used - * by parallel collections. Parallel collections inherit a subtrait - * of this trait. - * - * One implementation trait of `TaskExecution` is `ForkJoinTaskExecution`. - */ -trait Tasks { - - /** A task abstraction which allows starting a task with `start`, - * waiting for it to finish with `sync` and attempting to cancel - * the task with `tryCancel`. - * It also defines a method `leaf` which must be called once the - * the task is started and defines what this task actually does. - * Method `split` allows splitting this task into smaller subtasks, - * and method `shouldSplitFurther` decides if the task should be - * partitioned further. - * Method `merge` allows merging the results of the 2 tasks. It updates - * the result of the receiver. - * Finally, it defines the task result of type `U`. - */ - trait Task[R, +Tp] { - def repr = this.asInstanceOf[Tp] - /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ - def compute - /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task - * or `None` if there was no previous task. - */ - def leaf(result: Option[R]) - /** Start task. */ - def start - /** Wait for task to finish. */ - def sync - /** Try to cancel the task. - * @return `true` if cancellation is successful. - */ - def tryCancel: Boolean - /** A result that can be accessed once the task is completed. */ - def result: R - /** Decides whether or not this task should be split further. */ - def shouldSplitFurther: Boolean - /** Splits this task into a list of smaller tasks. */ - protected[this] def split: Seq[Task[R, Tp]] - /** Read of results of `that` task and merge them into results of this one. */ - protected[this] def merge(that: Tp) {} - } - - type TaskType[R, +Tp] <: Task[R, Tp] - type ExecutionEnvironment - - var environment: ExecutionEnvironment - - /** Executes a task and waits for it to finish. */ - def executeAndWait[R, Tp](task: TaskType[R, Tp]) - - /** Executes a result task, waits for it to finish, then returns its result. */ - def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R - - /** Retrieves the parallelism level of the task execution environment. */ - def parallelismLevel: Int - -} - - -/** This trait implements scheduling by employing - * an adaptive work stealing technique. - */ -trait AdaptiveWorkStealingTasks extends Tasks { - - trait Task[R, Tp] extends super.Task[R, Tp] { - var next: Task[R, Tp] = null - var shouldWaitFor = true - var result: R - - def split: Seq[Task[R, Tp]] - - /** The actual leaf computation. */ - def leaf(result: Option[R]): Unit - - def compute = if (shouldSplitFurther) internal else leaf(None) - - def internal = { - var last = spawnSubtasks - - last.leaf(None) - result = last.result - - while (last.next != null) { - val lastresult = Option(last.result) - last = last.next - if (last.tryCancel) last.leaf(lastresult) else last.sync - merge(last.repr) - } - } - - def spawnSubtasks = { - var last: Task[R, Tp] = null - var head: Task[R, Tp] = this - do { - val subtasks = head.split - head = subtasks.head - for (t <- subtasks.tail) { - t.next = last - last = t - t.start - } - } while (head.shouldSplitFurther); - head.next = last - head - } - - def printChain = { - var curr = this - var chain = "chain: " - while (curr != null) { - chain += curr + " ---> " - curr = curr.next - } - println(chain) - } - } - -} - - -/** - * A trait describing objects that provide a fork/join pool. - */ -trait HavingForkJoinPool { - def forkJoinPool: ForkJoinPool -} - - - -/** An implementation trait for parallel tasks based on the fork/join framework. - * - * @define fjdispatch - * If the current thread is a fork/join worker thread, the task's `fork` method will - * be invoked. Otherwise, the task will be executed on the fork/join pool. - */ -trait ForkJoinTasks extends Tasks with HavingForkJoinPool { - - trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] { - def start = fork - def sync = join - def tryCancel = tryUnfork - var result: R - } - - type TaskType[R, +Tp] = Task[R, Tp] - type ExecutionEnvironment = ForkJoinPool - - /** The fork/join pool of this collection. - */ - def forkJoinPool: ForkJoinPool = environment - var environment = ForkJoinTasks.defaultForkJoinPool - - /** Executes a task on a fork/join pool and waits for it to finish. - * - * $fjdispatch - */ - def executeAndWait[R, Tp](fjtask: Task[R, Tp]) { - if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { - fjtask.fork - } else { - forkJoinPool.execute(fjtask) - } - fjtask.join - } - - /** Executes a task on a fork/join pool and waits for it to finish. - * Returns its result when it does. - * - * $fjdispatch - * - * @return the result of the task - */ - def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = { - if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { - fjtask.fork - } else { - forkJoinPool.execute(fjtask) - } - fjtask.join - fjtask.result - } - - def parallelismLevel = forkJoinPool.getParallelism - -} - -object ForkJoinTasks { - val defaultForkJoinPool = new ForkJoinPool - defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) - defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) -} - - -/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them. - */ -trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks { - - trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp] - -} - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala deleted file mode 100644 index 0a39dd200c..0000000000 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelHashTrie.scala +++ /dev/null @@ -1,137 +0,0 @@ -package scala.collection.parallel.immutable - - - - - - - -import scala.collection.parallel.ParallelMap -import scala.collection.parallel.ParallelMapLike -import scala.collection.parallel.Combiner -import scala.collection.parallel.EnvironmentPassingCombiner -import scala.collection.generic.ParallelMapFactory -import scala.collection.generic.CanCombineFrom -import scala.collection.generic.GenericParallelMapTemplate -import scala.collection.generic.GenericParallelMapCompanion -import scala.collection.immutable.HashMap - - - - - - -class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V]) -extends ParallelMap[K, V] - with GenericParallelMapTemplate[K, V, ParallelHashTrie] - with ParallelMapLike[K, V, ParallelHashTrie[K, V], HashMap[K, V]] -{ -self => - - def this() = this(HashMap.empty[K, V]) - - override def mapCompanion: GenericParallelMapCompanion[ParallelHashTrie] = ParallelHashTrie - - override def empty: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V] - - def parallelIterator = new ParallelHashTrieIterator(trie) with SCPI - - def seq = trie - - def -(k: K) = new ParallelHashTrie(trie - k) - - def +[U >: V](kv: (K, U)) = new ParallelHashTrie(trie + kv) - - def get(k: K) = trie.get(k) - - override def size = trie.size - - protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match { - case Some(old) => old - case None => newc - } - - type SCPI = SignalContextPassingIterator[ParallelHashTrieIterator] - - class ParallelHashTrieIterator(val ht: HashMap[K, V]) - extends super.ParallelIterator { - self: SignalContextPassingIterator[ParallelHashTrieIterator] => - // println("created iterator " + ht) - var i = 0 - lazy val triter = ht.iterator - def split: Seq[ParallelIterator] = { - // println("splitting " + ht + " into " + ht.split.map(new ParallelHashTrieIterator(_) with SCPI).map(_.toList)) - ht.split.map(new ParallelHashTrieIterator(_) with SCPI) - } - def next: (K, V) = { - // println("taking next after " + i + ", in " + ht) - i += 1 - triter.next - } - def hasNext: Boolean = { - // println("hasNext: " + i + ", " + ht.size + ", " + ht) - i < ht.size - } - def remaining = ht.size - i - } - -} - - -object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] { - def empty[K, V]: ParallelHashTrie[K, V] = new ParallelHashTrie[K, V] - - def newCombiner[K, V]: Combiner[(K, V), ParallelHashTrie[K, V]] = HashTrieCombiner[K, V] - - implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParallelHashTrie[K, V]] = { - new CanCombineFromMap[K, V] - } - - var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0) -} - - -trait HashTrieCombiner[K, V] -extends Combiner[(K, V), ParallelHashTrie[K, V]] { -self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] => - private var trie: HashMap[K, V] = HashMap.empty[K, V] - - def size: Int = trie.size - - def clear = trie = HashMap.empty[K, V] - - def +=(elem: (K, V)) = { trie += elem; this } - - def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { - // ParallelHashTrie.totalcombines.incrementAndGet - if (other.isInstanceOf[HashTrieCombiner[_, _]]) { - val that = other.asInstanceOf[HashTrieCombiner[K, V]] - val ncombiner = HashTrieCombiner[K, V] - ncombiner.trie = this.trie combine that.trie - ncombiner - } else error("Unexpected combiner type.") - } else this - - def result = new ParallelHashTrie[K, V](trie) - -} - - -object HashTrieCombiner { - def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {} -} - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala deleted file mode 100644 index 92bf5ab706..0000000000 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelIterable.scala +++ /dev/null @@ -1,56 +0,0 @@ -package scala.collection.parallel.immutable - - -import scala.collection.generic._ - -import scala.collection.parallel.ParallelIterableLike -import scala.collection.parallel.Combiner - - - - - -// TODO uncomment when we add parallel vectors - -///** A template trait for immutable parallel iterable collections. -// * -// * $paralleliterableinfo -// * -// * $sideeffects -// * -// * @tparam A the element type of the collection -// * -// * @author prokopec -// * @since 2.8 -// */ -//trait ParallelIterable[A] extends collection.immutable.Iterable[A] -// with collection.parallel.ParallelIterable[A] -// with GenericParallelTemplate[A, ParallelIterable] -// with ParallelIterableLike[A, ParallelIterable[A], Iterable[A]] { -// override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable -//} -// -///** $factoryinfo -// */ -//object ParallelIterable extends ParallelFactory[ParallelIterable] { -// implicit def canBuildFrom[A]: CanBuildFromParallel[Coll, A, ParallelIterable[A]] = -// new GenericCanBuildFromParallel[A] -// -// def newBuilder[A]: Combiner[A, ParallelIterable[A]] = null // TODO -// -// def newCombiner[A]: Combiner[A, ParallelIterable[A]] = null // TODO -//} - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala deleted file mode 100644 index 85a33c7431..0000000000 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelRange.scala +++ /dev/null @@ -1,88 +0,0 @@ -package scala.collection.parallel.immutable - - - -import scala.collection.immutable.Range -import scala.collection.immutable.RangeUtils -import scala.collection.parallel.ParallelSeq -import scala.collection.parallel.Combiner -import scala.collection.generic.CanCombineFrom - - - -class ParallelRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean) -extends ParallelSeq[Int] - with RangeUtils[ParallelRange] { - self => - - def seq = new Range(start, end, step) - - def length = _length - - def apply(idx: Int) = _apply(idx) - - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParallelRange(_start, _end, _step, _inclusive) - - def parallelIterator = new ParallelRangeIterator with SCPI - - override def toString = seq.toString // TODO - - type SCPI = SignalContextPassingIterator[ParallelRangeIterator] - - class ParallelRangeIterator - (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive) - extends ParallelIterator with RangeUtils[ParallelRangeIterator] { - me: SignalContextPassingIterator[ParallelRangeIterator] => - def remaining = _length - def next = { val r = start; start += step; r } - def hasNext = remaining > 0 - def split: Seq[ParallelIterator] = psplit(remaining / 2, remaining - remaining / 2) - def psplit(sizes: Int*): Seq[ParallelIterator] = { - val incr = sizes.scanLeft(0)(_ + _) - for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until) - } - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = { - new ParallelRangeIterator(_start, _end, _step, _inclusive) with SCPI - } - - override def toString = "ParallelRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")" - - /* accessors */ - - override def foreach[U](f: Int => U): Unit = { - _foreach(f) - start = end + step - } - - override def reduce[U >: Int](op: (U, U) => U): U = { - var sum = next - for (elem <- this) sum += elem - sum - } - - /* transformers */ - - override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = { - //val cb = pbf(self.repr) - val sz = remaining - cb.sizeHint(sz) - if (sz > 0) { - val last = _last - while (start != last) { - f(start) - start += step - } - } - cb - } - - } - -} - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala deleted file mode 100644 index ceb0dcc13d..0000000000 --- a/src/parallel-collections/scala/collection/parallel/immutable/ParallelSeq.scala +++ /dev/null @@ -1,47 +0,0 @@ -package scala.collection.parallel.immutable - - -import scala.collection.generic.GenericParallelTemplate -import scala.collection.generic.GenericCompanion -import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.CanCombineFrom -import scala.collection.generic.ParallelFactory -import scala.collection.parallel.ParallelSeqLike -import scala.collection.parallel.Combiner - - - - - - -// TODO uncomment when we add parallel vectors - -///** An immutable variant of `ParallelSeq`. -// * -// * @define Coll mutable.ParallelSeq -// * @define coll mutable parallel sequence -// */ -//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A] -// with ParallelIterable[A] -// with collection.parallel.ParallelSeq[A] -// with GenericParallelTemplate[A, ParallelSeq] -// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] { -// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq -// -//} -// -// -///** $factoryInfo -// * @define Coll mutable.ParallelSeq -// * @define coll mutable parallel sequence -// */ -//object ParallelSeq extends ParallelFactory[ParallelSeq] { -// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T] -// -// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO -// -// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO -//} - - - diff --git a/src/parallel-collections/scala/collection/parallel/immutable/package.scala b/src/parallel-collections/scala/collection/parallel/immutable/package.scala deleted file mode 100644 index 054786afaf..0000000000 --- a/src/parallel-collections/scala/collection/parallel/immutable/package.scala +++ /dev/null @@ -1,56 +0,0 @@ -package scala.collection.parallel - - - - - - - - - - - -package object immutable { - - /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method. - * - * @tparam T type of the elements - * @param elem the element in the repetition - * @param length the length of the collection - */ - private[parallel] class Repetition[T](elem: T, val length: Int) extends ParallelSeq[T] { - self => - - def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException - def seq = throw new UnsupportedOperationException - def update(idx: Int, elem: T) = throw new UnsupportedOperationException - - type SCPI = SignalContextPassingIterator[ParallelIterator] - - class ParallelIterator(var i: Int = 0, val until: Int = length, elem: T = self.elem) extends super.ParallelIterator { - me: SignalContextPassingIterator[ParallelIterator] => - def remaining = until - i - def hasNext = i < until - def next = { i += 1; elem } - def psplit(sizes: Int*) = { - val incr = sizes.scanLeft(0)(_ + _) - for ((start, end) <- incr.init zip incr.tail) yield new ParallelIterator(i + start, (i + end) min until, elem) with SCPI - } - def split = psplit(remaining / 2, remaining - remaining / 2) - } - - def parallelIterator = new ParallelIterator with SCPI - - } - -} - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala deleted file mode 100644 index bd17d24ea8..0000000000 --- a/src/parallel-collections/scala/collection/parallel/mutable/LazyCombiner.scala +++ /dev/null @@ -1,43 +0,0 @@ -package scala.collection.parallel.mutable - - - - -import scala.collection.generic.Growable -import scala.collection.generic.Sizing -import scala.collection.mutable.ArrayBuffer -import scala.collection.parallel.Combiner - - - - -/** Implements combining contents of two combiners - * by postponing the operation until `result` method is called. It chains - * the leaf results together instead of evaluating the actual collection. - * - * @tparam Elem the type of the elements in the combiner - * @tparam To the type of the collection the combiner produces - * @tparam Buff the type of the buffers that contain leaf results and this combiner chains together - */ -trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combiner[Elem, To] -{ - self: collection.parallel.EnvironmentPassingCombiner[Elem, To] => - val chain: ArrayBuffer[Buff] - val lastbuff = chain.last - def +=(elem: Elem) = { lastbuff += elem; this } - def result: To = allocateAndCopy - def clear = { chain.clear } - def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { - if (other.isInstanceOf[LazyCombiner[_, _, _]]) { - val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]] - newLazyCombiner(chain ++= that.chain) - } else throw new UnsupportedOperationException("Cannot combine with combiner of different type.") - } else this - def size = chain.foldLeft(0)(_ + _.size) - - /** Method that allocates the data structure and copies elements into it using - * `size` and `chain` members. - */ - def allocateAndCopy: To - def newLazyCombiner(buffchain: ArrayBuffer[Buff]): LazyCombiner[Elem, To, Buff] -} diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala deleted file mode 100644 index 30b4b109b2..0000000000 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArray.scala +++ /dev/null @@ -1,561 +0,0 @@ -package scala.collection.parallel.mutable - - - -import scala.collection.generic.GenericParallelTemplate -import scala.collection.generic.GenericCompanion -import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.CanCombineFrom -import scala.collection.generic.ParallelFactory -import scala.collection.generic.Sizing -import scala.collection.parallel.Combiner -import scala.collection.parallel.ParallelSeqLike -import scala.collection.parallel.CHECK_RATE -import scala.collection.mutable.ArraySeq -import scala.collection.mutable.Builder -import scala.collection.Sequentializable - - - - -/** Parallel sequence holding elements in a linear array. - * - * `ParallelArray` is a parallel sequence with a predefined size. The size of the array - * cannot be changed after it's been created. - * - * `ParallelArray` internally keeps an array containing the elements. This means that - * bulk operations based on traversal are fast, but those returning a parallel array as a result - * are slightly slower. The reason for this is that `ParallelArray` uses lazy builders that - * create the internal data array only after the size of the array is known. The fragments - * are then copied into the resulting data array in parallel using fast array copy operations. - * Operations for which the resulting array size is known in advance are optimised to use this - * information. - * - * @tparam T type of the elements in the array - * - * @define Coll ParallelArray - * @define coll parallel array - */ -class ParallelArray[T] private[mutable] (val arrayseq: ArraySeq[T]) -extends ParallelSeq[T] - with GenericParallelTemplate[T, ParallelArray] - with ParallelSeqLike[T, ParallelArray[T], ArraySeq[T]] -{ - self => - - private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]] - - override def companion: GenericCompanion[ParallelArray] with GenericParallelCompanion[ParallelArray] = ParallelArray - - def this(sz: Int) = this { - require(sz >= 0) - new ArraySeq[T](sz) - } - - def apply(i: Int) = array(i).asInstanceOf[T] - - def update(i: Int, elem: T) = array(i) = elem - - def length = arrayseq.length - - def seq = arrayseq - - type SCPI = SignalContextPassingIterator[ParallelArrayIterator] - - def parallelIterator: ParallelArrayIterator = { - val pit = new ParallelArrayIterator with SCPI - pit - } - - class ParallelArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array) - extends super.ParallelIterator { - me: SignalContextPassingIterator[ParallelArrayIterator] => - - def hasNext = i < until - - def next = { - val elem = arr(i) - i += 1 - elem.asInstanceOf[T] - } - - def remaining = until - i - - def psplit(sizesIncomplete: Int*): Seq[ParallelIterator] = { - var traversed = i - val total = sizesIncomplete.reduceLeft(_ + _) - val left = remaining - val sizes = if (total >= left) sizesIncomplete else sizesIncomplete :+ (left - total) - for (sz <- sizes) yield if (traversed < until) { - val start = traversed - val end = (traversed + sz) min until - traversed = end - new ParallelArrayIterator(start, end, arr) with SCPI - } else { - new ParallelArrayIterator(traversed, traversed, arr) with SCPI - } - } - - override def split: Seq[ParallelIterator] = { - val left = remaining - if (left >= 2) { - val splitpoint = left / 2 - Seq(new ParallelArrayIterator(i, i + splitpoint, arr) with SCPI, - new ParallelArrayIterator(i + splitpoint, until, arr) with SCPI) - } else { - Seq(this) - } - } - - override def toString = "ParallelArrayIterator(" + i + ", " + until + ")" - - /* overrides for efficiency */ - - /* accessors */ - - override def foreach[U](f: T => U) = { - foreach_quick(f, arr, until, i) - i = until - } - - private def foreach_quick[U](f: T => U, a: Array[Any], ntil: Int, from: Int) = { - var j = from - while (j < ntil) { - f(a(j).asInstanceOf[T]) - j += 1 - } - } - - override def count(p: T => Boolean) = { - val c = count_quick(p, arr, until, i) - i = until - c - } - - private def count_quick(p: T => Boolean, a: Array[Any], ntil: Int, from: Int) = { - var cnt = 0 - var j = from - while (j < ntil) { - if (p(a(j).asInstanceOf[T])) cnt += 1 - j += 1 - } - cnt - } - - override def foldLeft[S](z: S)(op: (S, T) => S): S = { - val r = foldLeft_quick(arr, until, op, z) - i = until - r - } - - private def foldLeft_quick[S](a: Array[Any], ntil: Int, op: (S, T) => S, z: S): S = { - var j = i - var sum = z - while (j < ntil) { - sum = op(sum, a(j).asInstanceOf[T]) - j += 1 - } - sum - } - - def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = foldLeft[S](z)(seqop) - - override def sum[U >: T](implicit num: Numeric[U]): U = { - var s = sum_quick(num, arr, until, i, num.zero) - i = until - s - } - - private def sum_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, zero: U): U = { - var j = from - var sum = zero - while (j < ntil) { - sum = num.plus(sum, a(j).asInstanceOf[T]) - j += 1 - } - sum - } - - override def product[U >: T](implicit num: Numeric[U]): U = { - var p = product_quick(num, arr, until, i, num.one) - i = until - p - } - - private def product_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, one: U): U = { - var j = from - var prod = one - while (j < ntil) { - prod = num.times(prod, a(j).asInstanceOf[T]) - j += 1 - } - prod - } - - override def forall(p: T => Boolean): Boolean = { - if (isAborted) return false - - var all = true - while (i < until) { - val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE - - all = forall_quick(p, array, nextuntil, i) - if (all) i = nextuntil - else { - i = until - abort - } - - if (isAborted) return false - } - all - } - - // it's faster to use a separate small method - private def forall_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = { - var j = start - while (j < nextuntil) { - if (p(a(j).asInstanceOf[T])) j += 1 - else return false - } - return true - } - - override def exists(p: T => Boolean): Boolean = { - if (isAborted) return true - - var some = false - while (i < until) { - val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE - - some = exists_quick(p, array, nextuntil, i) - if (some) { - i = until - abort - } else i = nextuntil - - if (isAborted) return true - } - some - } - - // faster to use separate small method - private def exists_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = { - var j = start - while (j < nextuntil) { - if (p(a(j).asInstanceOf[T])) return true - else j += 1 - } - return false - } - - override def find(p: T => Boolean): Option[T] = { - if (isAborted) return None - - var r: Option[T] = None - while (i < until) { - val nextuntil = if ((i + CHECK_RATE) < until) (i + CHECK_RATE) else until - - r = find_quick(p, array, nextuntil, i) - - if (r != None) { - i = until - abort - } else i = nextuntil - - if (isAborted) return r - } - r - } - - private def find_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Option[T] = { - var j = start - while (j < nextuntil) { - val elem = a(j).asInstanceOf[T] - if (p(elem)) return Some(elem) - else j += 1 - } - return None - } - - override def drop(n: Int): ParallelArrayIterator = { - i += n - this - } - - override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) { - val totallen = (self.length - i) min len min (array.length - from) - Array.copy(arr, i, array, from, totallen) - i += totallen - } - - override def prefixLength(pred: T => Boolean): Int = { - val r = prefixLength_quick(pred, arr, until, i) - i += r + 1 - r - } - - private def prefixLength_quick(pred: T => Boolean, a: Array[Any], ntil: Int, startpos: Int): Int = { - var j = startpos - var endpos = ntil - while (j < endpos) { - if (pred(a(j).asInstanceOf[T])) j += 1 - else endpos = j - } - endpos - startpos - } - - override def indexWhere(pred: T => Boolean): Int = { - val r = indexWhere_quick(pred, arr, until, i) - val ret = if (r != -1) r - i else r - i = until - ret - } - - private def indexWhere_quick(pred: T => Boolean, a: Array[Any], ntil: Int, from: Int): Int = { - var j = from - var pos = -1 - while (j < ntil) { - if (pred(a(j).asInstanceOf[T])) { - pos = j - j = ntil - } else j += 1 - } - pos - } - - override def lastIndexWhere(pred: T => Boolean): Int = { - val r = lastIndexWhere_quick(pred, arr, i, until) - val ret = if (r != -1) r - i else r - i = until - ret - } - - private def lastIndexWhere_quick(pred: T => Boolean, a: Array[Any], from: Int, ntil: Int): Int = { - var pos = -1 - var j = ntil - 1 - while (j >= from) { - if (pred(a(j).asInstanceOf[T])) { - pos = j - j = -1 - } else j -= 1 - } - pos - } - - override def sameElements(that: Iterator[_]): Boolean = { - var same = true - while (i < until && that.hasNext) { - if (arr(i) != that.next) { - i = until - same = false - } - i += 1 - } - same - } - - /* transformers */ - - override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { - //val cb = cbf(self.repr) - cb.sizeHint(remaining) - map2combiner_quick(f, arr, cb, until, i) - i = until - cb - } - - private def map2combiner_quick[S, That](f: T => S, a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) { - var j = from - while (j < ntil) { - cb += f(a(j).asInstanceOf[T]) - j += 1 - } - } - - override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = { - val cb = pbf(self.repr) - collect2combiner_quick(pf, arr, cb, until, i) - i = until - cb - } - - private def collect2combiner_quick[S, That](pf: PartialFunction[T, S], a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) { - var j = from - while (j < ntil) { - val curr = a(j).asInstanceOf[T] - if (pf.isDefinedAt(curr)) cb += pf(curr) - j += 1 - } - } - - override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParallelArray[T], S, That]): Combiner[S, That] = { - val cb = pbf(self.repr) - while (i < until) { - val traversable = f(arr(i).asInstanceOf[T]) - if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator - else cb ++= traversable - i += 1 - } - cb - } - - override def filter2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = { - filter2combiner_quick(pred, cb, arr, until, i) - i = until - cb - } - - private def filter2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { - var j = i - while(j < ntil) { - var curr = a(j).asInstanceOf[T] - if (pred(curr)) cb += curr - j += 1 - } - } - - override def filterNot2combiner[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = { - filterNot2combiner_quick(pred, cb, arr, until, i) - i = until - cb - } - - private def filterNot2combiner_quick[U >: T, This >: ParallelArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { - var j = i - while(j < ntil) { - var curr = a(j).asInstanceOf[T] - if (!pred(curr)) cb += curr - j += 1 - } - } - - override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = { - cb.sizeHint(remaining) - cb.ifIs[ParallelArrayCombiner[T]] { pac => - val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] - Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i) - pac.lastbuff.setInternalSize(remaining) - } otherwise { - copy2builder_quick(cb, arr, until, i) - i = until - } - cb - } - - private def copy2builder_quick[U >: T, Coll](b: Builder[U, Coll], a: Array[Any], ntil: Int, from: Int) { - var j = from - while (j < ntil) { - b += a(j).asInstanceOf[T] - j += 1 - } - } - - override def partition2combiners[U >: T, This >: ParallelArray[T]](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = { - partition2combiners_quick(pred, btrue, bfalse, arr, until, i) - i = until - (btrue, bfalse) - } - - private def partition2combiners_quick[U >: T, This >: ParallelArray[T]](p: T => Boolean, btrue: Builder[U, This], bfalse: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { - var j = from - while (j < ntil) { - val curr = a(j).asInstanceOf[T] - if (p(curr)) btrue += curr else bfalse += curr - j += 1 - } - } - - override def take2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = { - cb.sizeHint(n) - val ntil = i + n - val a = arr - while (i < ntil) { - cb += a(i).asInstanceOf[T] - i += 1 - } - cb - } - - override def drop2combiner[U >: T, This >: ParallelArray[T]](n: Int, cb: Combiner[U, This]) = { - drop(n) - cb.sizeHint(remaining) - while (i < until) { - cb += arr(i).asInstanceOf[T] - i += 1 - } - cb - } - - override def reverse2combiner[U >: T, This >: ParallelArray[T]](cb: Combiner[U, This]): Combiner[U, This] = { - cb.ifIs[ParallelArrayCombiner[T]] { pac => - val sz = remaining - pac.sizeHint(sz) - val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] - reverse2combiner_quick(targetarr, arr, i, until) - pac.lastbuff.setInternalSize(sz) - pac - } otherwise super.reverse2combiner(cb) - cb - } - - private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) { - var j = from - var k = ntil - from - 1 - while (j < ntil) { - targ(k) = a(j) - j += 1 - k -= 1 - } - } - - } - -} - - - - - -object ParallelArray extends ParallelFactory[ParallelArray] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelArray[T]] = new GenericCanCombineFrom[T] - def newBuilder[T]: Combiner[T, ParallelArray[T]] = newCombiner - def newCombiner[T]: Combiner[T, ParallelArray[T]] = ParallelArrayCombiner[T] - - /** Creates a new parallel array by wrapping the specified array. - */ - def handoff[T <: AnyRef](arr: Array[T]): ParallelArray[T] = { - new ParallelArray[T](new ExposedArraySeq[T](arr.asInstanceOf[Array[AnyRef]], arr.length)) - } - - def createFromCopy[T <: AnyRef : ClassManifest](arr: Array[T]): ParallelArray[T] = { - val newarr = new Array[T](arr.length) - Array.copy(arr, 0, newarr, 0, arr.length) - handoff(newarr) - } - -} - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala deleted file mode 100644 index 2991344be2..0000000000 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelArrayCombiner.scala +++ /dev/null @@ -1,105 +0,0 @@ -package scala.collection.parallel.mutable - - - - - -import scala.collection.generic.Sizing -import scala.collection.mutable.ArraySeq -import scala.collection.mutable.ArrayBuffer -import scala.collection.parallel.TaskSupport -import scala.collection.parallel.EnvironmentPassingCombiner - - - - - - - -trait ParallelArrayCombiner[T] -extends LazyCombiner[T, ParallelArray[T], ExposedArrayBuffer[T]] - with TaskSupport { - self: EnvironmentPassingCombiner[T, ParallelArray[T]] => - - override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz) - - def newLazyCombiner(c: ArrayBuffer[ExposedArrayBuffer[T]]) = ParallelArrayCombiner(c) - - def allocateAndCopy = if (chain.size > 1) { - val arrayseq = new ArraySeq[T](size) - val array = arrayseq.array.asInstanceOf[Array[Any]] - - executeAndWait(new CopyChainToArray(array, 0, size)) - - new ParallelArray(arrayseq) - } else { // optimisation if there is only 1 array - val pa = new ParallelArray(new ExposedArraySeq[T](chain(0).internalArray, size)) - pa - } - - override def toString = "ParallelArrayCombiner(" + size + "): " + chain - - /* tasks */ - - class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] { - var result = () - def leaf(prev: Option[Unit]) = if (howmany > 0) { - var totalleft = howmany - val (stbuff, stind) = findStart(offset) - var buffind = stbuff - var ind = stind - var arrayIndex = offset - while (totalleft > 0) { - val currbuff = chain(buffind) - val chunksize = if (totalleft < (currbuff.size - ind)) totalleft else currbuff.size - ind - val until = ind + chunksize - - copyChunk(currbuff.internalArray, ind, array, arrayIndex, until) - arrayIndex += chunksize - ind += chunksize - - totalleft -= chunksize - buffind += 1 - ind = 0 - } - } - private def copyChunk(buffarr: Array[AnyRef], buffStart: Int, ra: Array[Any], arrayStart: Int, until: Int) { - Array.copy(buffarr, buffStart, ra, arrayStart, until - buffStart) - } - private def findStart(pos: Int) = { - var left = pos - var buffind = 0 - while (left >= chain(buffind).size) { - left -= chain(buffind).size - buffind += 1 - } - (buffind, left) - } - def split = { - val fp = howmany / 2 - List(new CopyChainToArray(array, offset, fp), new CopyChainToArray(array, offset + fp, howmany - fp)) - } - def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(size, parallelismLevel) - } - -} - - -object ParallelArrayCombiner { - def apply[T](c: ArrayBuffer[ExposedArrayBuffer[T]]): ParallelArrayCombiner[T] = { - new { val chain = c } with ParallelArrayCombiner[T] with EnvironmentPassingCombiner[T, ParallelArray[T]] - } - def apply[T]: ParallelArrayCombiner[T] = apply(new ArrayBuffer[ExposedArrayBuffer[T]] += new ExposedArrayBuffer[T]) -} - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala deleted file mode 100644 index bd0a46bc43..0000000000 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelIterable.scala +++ /dev/null @@ -1,51 +0,0 @@ -package scala.collection.parallel.mutable - - -import scala.collection.generic._ - -import scala.collection.parallel.ParallelIterableLike -import scala.collection.parallel.Combiner - - -/** A template trait for parallel iterable collections. - * - * $paralleliterableinfo - * - * $sideeffects - * - * @tparam T the element type of the collection - * - * @author prokopec - * @since 2.8 - */ -trait ParallelIterable[T] extends collection.mutable.Iterable[T] - with collection.parallel.ParallelIterable[T] - with GenericParallelTemplate[T, ParallelIterable] - with ParallelIterableLike[T, ParallelIterable[T], Iterable[T]] { - override def companion: GenericCompanion[ParallelIterable] with GenericParallelCompanion[ParallelIterable] = ParallelIterable -} - -/** $factoryinfo - */ -object ParallelIterable extends ParallelFactory[ParallelIterable] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelIterable[T]] = - new GenericCanCombineFrom[T] - - def newBuilder[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] - - def newCombiner[T]: Combiner[T, ParallelIterable[T]] = ParallelArrayCombiner[T] -} - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala b/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala deleted file mode 100644 index 636ba1ac3d..0000000000 --- a/src/parallel-collections/scala/collection/parallel/mutable/ParallelSeq.scala +++ /dev/null @@ -1,61 +0,0 @@ -package scala.collection.parallel.mutable - - -import scala.collection.generic.GenericParallelTemplate -import scala.collection.generic.GenericCompanion -import scala.collection.generic.GenericParallelCompanion -import scala.collection.generic.CanCombineFrom -import scala.collection.generic.ParallelFactory -import scala.collection.parallel.ParallelSeqLike -import scala.collection.parallel.Combiner - - - - - - - -/** A mutable variant of `ParallelSeq`. - * - * @define Coll mutable.ParallelSeq - * @define coll mutable parallel sequence - */ -trait ParallelSeq[T] extends collection.mutable.Seq[T] - with ParallelIterable[T] - with collection.parallel.ParallelSeq[T] - with GenericParallelTemplate[T, ParallelSeq] - with ParallelSeqLike[T, ParallelSeq[T], Seq[T]] { - self => - override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq - - def update(i: Int, elem: T): Unit - -} - - -/** $factoryInfo - * @define Coll mutable.ParallelSeq - * @define coll mutable parallel sequence - */ -object ParallelSeq extends ParallelFactory[ParallelSeq] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParallelSeq[T]] = new GenericCanCombineFrom[T] - - def newBuilder[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] - - def newCombiner[T]: Combiner[T, ParallelSeq[T]] = ParallelArrayCombiner[T] -} - - - - - - - - - - - - - - - diff --git a/src/parallel-collections/scala/collection/parallel/mutable/package.scala b/src/parallel-collections/scala/collection/parallel/mutable/package.scala deleted file mode 100644 index f670c7b7c5..0000000000 --- a/src/parallel-collections/scala/collection/parallel/mutable/package.scala +++ /dev/null @@ -1,32 +0,0 @@ -package scala.collection.parallel - - - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.ArraySeq -import scala.collection.generic.Sizing - - - -package object mutable { - - /* hack-arounds */ - - private[mutable] class ExposedArrayBuffer[T] extends ArrayBuffer[T] with Sizing { - def internalArray = array - def setInternalSize(s: Int) = size0 = s - override def sizeHint(len: Int) = { // delete once we start using 2.8.RC1+ - if (len > size && len >= 1) { - val newarray = new Array[AnyRef](len) - Array.copy(array, 0, newarray, 0, size0) - array = newarray - } - } - } - - private[mutable] class ExposedArraySeq[T](arr: Array[AnyRef], sz: Int) extends ArraySeq[T](sz) { - override val array = arr - override val length = sz - } - -} \ No newline at end of file diff --git a/src/parallel-collections/scala/collection/parallel/package.scala b/src/parallel-collections/scala/collection/parallel/package.scala deleted file mode 100644 index 3b297f1cd1..0000000000 --- a/src/parallel-collections/scala/collection/parallel/package.scala +++ /dev/null @@ -1,70 +0,0 @@ -package scala.collection - - -import java.lang.Thread._ - -import scala.collection.generic.CanBuildFrom -import scala.collection.generic.CanCombineFrom - - -/** Package object for parallel collections. - */ -package object parallel { - val MIN_FOR_COPY = -1 // TODO: set to 5000 - val CHECK_RATE = 512 - - /** Computes threshold from the size of the collection and the parallelism level. - */ - def thresholdFromSize(sz: Int, parallelismLevel: Int) = { - val p = parallelismLevel - if (p > 1) 1 + sz / (8 * p) - else sz - } - - /** An implicit conversion providing arrays with a `par` method, which - * returns a parallel array. - * - * @tparam T type of the elements in the array, which is a subtype of AnyRef - * @param array the array to be parallelized - * @return a `Parallelizable` object with a `par` method - */ - implicit def array2ParallelArray[T <: AnyRef](array: Array[T]) = new Parallelizable[T, mutable.ParallelArray[T]] { - def par = mutable.ParallelArray.handoff[T](array) - } - - implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new { - def isParallel = bf.isInstanceOf[Parallel] - def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]] - def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new { - def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody - } - } - - implicit def traversable2ops[T](t: TraversableOnce[T]) = new { - def isParallel = t.isInstanceOf[Parallel] - def isParallelIterable = t.isInstanceOf[ParallelIterable[_]] - def asParallelIterable = t.asInstanceOf[ParallelIterable[T]] - def isParallelSeq = t.isInstanceOf[ParallelSeq[_]] - def asParallelSeq = t.asInstanceOf[ParallelSeq[T]] - def ifParallelSeq[R](isbody: ParallelSeq[T] => R) = new { - def otherwise(notbody: => R) = if (isParallel) isbody(asParallelSeq) else notbody - } - } - -} - - - - - - - - - - - - - - - - -- cgit v1.2.3