summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-01-07 12:05:31 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-01-07 12:05:31 +0000
commit733a3d756943e67d7608dbfd92aac445a080d69d (patch)
tree6ce9bb74cee997212845c6fac15c8923b54c55ec /src/library
parent07edcee629fb00808fb695ba4ec68a26f3e99490 (diff)
downloadscala-733a3d756943e67d7608dbfd92aac445a080d69d.tar.gz
scala-733a3d756943e67d7608dbfd92aac445a080d69d.tar.bz2
scala-733a3d756943e67d7608dbfd92aac445a080d69d.zip
Implemented a (slower) workaround for parallel ...
Implemented a (slower) workaround for parallel vectors. Implemented group by. No review.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/immutable/HashMap.scala8
-rw-r--r--src/library/scala/collection/immutable/Vector.scala27
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala43
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala31
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala82
-rw-r--r--src/library/scala/collection/parallel/immutable/ParIterable.scala8
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala5
-rw-r--r--src/library/scala/collection/parallel/immutable/ParSeq.scala46
-rw-r--r--src/library/scala/collection/parallel/immutable/ParVector.scala115
-rw-r--r--src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled44
-rw-r--r--src/library/scala/collection/parallel/package.scala13
11 files changed, 323 insertions, 99 deletions
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 5b20638a84..a21e71158e 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -75,7 +75,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected type Merger[B1] = ((A, B1), (A, B1)) => (A, B1)
- protected def get0(key: A, hash: Int, level: Int): Option[B] = None
+ private[collection] def get0(key: A, hash: Int, level: Int): Option[B] = None
private[collection] def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1), merger: Merger[B1]): HashMap[A, B1] =
new HashMap.HashMap1(key, hash, value, kv)
@@ -117,7 +117,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
// TODO: add HashMap2, HashMap3, ...
- class HashMap1[A,+B](private[HashMap] var key: A, private[HashMap] var hash: Int, private[HashMap] var value: (B @uncheckedVariance), private[HashMap] var kv: (A,B @uncheckedVariance)) extends HashMap[A,B] {
+ class HashMap1[A,+B](private[HashMap] var key: A, private[HashMap] var hash: Int, private[collection] var value: (B @uncheckedVariance), private[collection] var kv: (A,B @uncheckedVariance)) extends HashMap[A,B] {
override def size = 1
private[collection] def getKey = key
@@ -188,7 +188,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
- private class HashMapCollision1[A,+B](private[HashMap] var hash: Int, var kvs: ListMap[A,B @uncheckedVariance]) extends HashMap[A,B] {
+ private[collection] class HashMapCollision1[A,+B](private[HashMap] var hash: Int, var kvs: ListMap[A,B @uncheckedVariance]) extends HashMap[A,B] {
override def size = kvs.size
override def get0(key: A, hash: Int, level: Int): Option[B] =
@@ -230,7 +230,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
- class HashTrieMap[A,+B](private[HashMap] var bitmap: Int, private[HashMap] var elems: Array[HashMap[A,B @uncheckedVariance]],
+ class HashTrieMap[A,+B](private[HashMap] var bitmap: Int, private[collection] var elems: Array[HashMap[A,B @uncheckedVariance]],
private[HashMap] var size0: Int) extends HashMap[A,B] {
/*
def this (level: Int, m1: HashMap1[A,B], m2: HashMap1[A,B]) = {
diff --git a/src/library/scala/collection/immutable/Vector.scala b/src/library/scala/collection/immutable/Vector.scala
index aeb3b82147..faee7aa60e 100644
--- a/src/library/scala/collection/immutable/Vector.scala
+++ b/src/library/scala/collection/immutable/Vector.scala
@@ -15,6 +15,7 @@ import compat.Platform
import scala.collection.generic._
import scala.collection.mutable.Builder
+import scala.collection.parallel.immutable.ParVector
object Vector extends SeqFactory[Vector] {
@@ -32,11 +33,14 @@ object Vector extends SeqFactory[Vector] {
// in principle, most members should be private. however, access privileges must
// be carefully chosen to not prevent method inlining
-final class Vector[+A](startIndex: Int, endIndex: Int, focus: Int) extends IndexedSeq[A]
- with GenericTraversableTemplate[A, Vector]
- with IndexedSeqLike[A, Vector[A]]
- with VectorPointer[A @uncheckedVariance]
- with Serializable { self =>
+final class Vector[+A](private[collection] val startIndex: Int, private[collection] val endIndex: Int, focus: Int)
+extends IndexedSeq[A]
+ with GenericTraversableTemplate[A, Vector]
+ with IndexedSeqLike[A, Vector[A]]
+ with VectorPointer[A @uncheckedVariance]
+ with Serializable
+ with Parallelizable[ParVector[A]]
+{ self =>
override def companion: GenericCompanion[Vector] = Vector
@@ -49,14 +53,19 @@ override def companion: GenericCompanion[Vector] = Vector
def length = endIndex - startIndex
- override def lengthCompare(len: Int): Int = length - len
+ def par = new ParVector(this)
+ override def lengthCompare(len: Int): Int = length - len
- @inline override def iterator: VectorIterator[A] = {
- val s = new VectorIterator[A](startIndex, endIndex)
+ private[collection] final def initIterator[B >: A](s: VectorIterator[B]) {
s.initFrom(this)
if (dirty) s.stabilize(focus)
if (s.depth > 1) s.gotoPos(startIndex, startIndex ^ focus)
+ }
+
+ @inline override def iterator: VectorIterator[A] = {
+ val s = new VectorIterator[A](startIndex, endIndex)
+ initIterator(s)
s
}
@@ -602,7 +611,7 @@ override def companion: GenericCompanion[Vector] = Vector
}
-final class VectorIterator[+A](_startIndex: Int, _endIndex: Int) extends Iterator[A] with VectorPointer[A @uncheckedVariance] {
+class VectorIterator[+A](_startIndex: Int, _endIndex: Int) extends Iterator[A] with VectorPointer[A @uncheckedVariance] {
private var blockIndex: Int = _startIndex & ~31
private var lo: Int = _startIndex & 31
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 3008f93ebd..9f894c0af8 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -10,12 +10,12 @@ import scala.collection.Parallel
import scala.collection.Parallelizable
import scala.collection.Sequentializable
import scala.collection.generic._
-
+import immutable.HashMapCombiner
import java.util.concurrent.atomic.AtomicBoolean
+import annotation.unchecked.uncheckedVariance
-import annotation.unchecked.uncheckedStable
// TODO update docs!!
@@ -520,6 +520,12 @@ self =>
executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
}
+ // override def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
+ // executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], parallelIterator) mapResult {
+ // rcb => rcb.groupByKey(cbfactory)
+ // })
+ // }
+
override def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
@@ -893,9 +899,9 @@ self =>
def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p)
override def merge(that: FlatMap[S, That]) = {
- debuglog("merging " + result + " and " + that.result)
+ //debuglog("merging " + result + " and " + that.result)
result = result combine that.result
- debuglog("merged into " + result)
+ //debuglog("merged into " + result)
}
}
@@ -956,6 +962,29 @@ self =>
override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
}
+ protected[this] class GroupBy[K, U >: T](
+ f: U => K,
+ mcf: () => HashMapCombiner[K, U],
+ protected[this] val pit: ParIterableIterator[T]
+ ) extends Transformer[HashMapCombiner[K, U], GroupBy[K, U]] {
+ @volatile var result: Result = null
+ final def leaf(prev: Option[Result]) = {
+ // note: HashMapCombiner doesn't merge same keys until evaluation
+ val cb = mcf()
+ while (pit.hasNext) {
+ val elem = pit.next
+ cb += f(elem) -> elem
+ }
+ result = cb
+ }
+ protected[this] def newSubtask(p: ParIterableIterator[T]) = new GroupBy(f, mcf, p)
+ override def merge(that: GroupBy[K, U]) = {
+ // note: this works because we know that a HashMapCombiner doesn't merge same keys until evaluation
+ // --> we know we're not dropping any mappings
+ result = (result combine that.result).asInstanceOf[HashMapCombiner[K, U]]
+ }
+ }
+
protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Take[U, This]] {
@volatile var result: Combiner[U, This] = null
@@ -1264,9 +1293,9 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
- private val dbbuff = ArrayBuffer[String]()
- def debugBuffer: ArrayBuffer[String] = dbbuff
- // def debugBuffer: ArrayBuffer[String] = null
+ // private val dbbuff = ArrayBuffer[String]()
+ // def debugBuffer: ArrayBuffer[String] = dbbuff
+ def debugBuffer: ArrayBuffer[String] = null
private[parallel] def debugclear() = synchronized {
debugBuffer.clear
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 063a8cab7d..8a4f15fe6d 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -235,22 +235,23 @@ self =>
}
} otherwise super.endsWith(that)
- override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)
- (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParSeq && bf.isParallel) {
- val that = patch.asParSeq
- val pbf = bf.asParallel
+ override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val realreplaced = replaced min (length - from)
- val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced)
- val copystart = new Copy[U, That](() => pbf(repr), pits(0))
- val copymiddle = wrap {
- val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
- tasksupport.executeAndWaitResult(tsk)
- }
- val copyend = new Copy[U, That](() => pbf(repr), pits(2))
- executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
- _.result
- })
- } else patch_sequential(from, patch, replaced)
+ if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
+ val that = patch.asParSeq
+ val pbf = bf.asParallel
+ val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced)
+ val copystart = new Copy[U, That](() => pbf(repr), pits(0))
+ val copymiddle = wrap {
+ val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
+ tasksupport.executeAndWaitResult(tsk)
+ }
+ val copyend = new Copy[U, That](() => pbf(repr), pits(2))
+ executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
+ _.result
+ })
+ } else patch_sequential(from, patch, replaced)
+ }
private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val b = bf(repr)
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 812a2ed94d..d60c2d39e8 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -141,7 +141,7 @@ object ParHashMap extends ParMapFactory[ParHashMap] {
}
-private[immutable] abstract class HashMapCombiner[K, V]
+private[parallel] abstract class HashMapCombiner[K, V]
extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) {
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
@@ -183,6 +183,28 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
}
+ def groupByKey[Repr](cbf: () => Combiner[V, Repr]): ParHashMap[K, Repr] = {
+ val bucks = buckets.filter(_ != null).map(_.headPtr)
+ val root = new Array[HashMap[K, AnyRef]](bucks.length)
+
+ executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length))
+
+ var bitmap = 0
+ var i = 0
+ while (i < rootsize) {
+ if (buckets(i) ne null) bitmap |= 1 << i
+ i += 1
+ }
+ val sz = root.foldLeft(0)(_ + _.size)
+
+ if (sz == 0) new ParHashMap[K, Repr]
+ else if (sz == 1) new ParHashMap[K, Repr](root(0).asInstanceOf[HashMap[K, Repr]])
+ else {
+ val trie = new HashMap.HashTrieMap(bitmap, root.asInstanceOf[Array[HashMap[K, Repr]]], sz)
+ new ParHashMap[K, Repr](trie)
+ }
+ }
+
override def toString = {
"HashTrieCombiner(sz: " + size + ")"
//"HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
@@ -229,6 +251,64 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
}
+ class CreateGroupedTrie[Repr](cbf: () => Combiner[V, Repr], bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, AnyRef]], offset: Int, howmany: Int)
+ extends Task[Unit, CreateGroupedTrie[Repr]] {
+ @volatile var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ root(i) = createGroupedTrie(bucks(i)).asInstanceOf[HashMap[K, AnyRef]]
+ i += 1
+ }
+ result = result
+ }
+ private def createGroupedTrie(elems: Unrolled[(K, V)]): HashMap[K, Repr] = {
+ var trie = new HashMap[K, Combiner[V, Repr]]
+
+ var unrolled = elems
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val kv = chunkarr(i)
+ val hc = trie.computeHash(kv._1)
+
+ // check to see if already present
+ val cmb: Combiner[V, Repr] = trie.get0(kv._1, hc, rootbits) match {
+ case Some(cmb) => cmb
+ case None =>
+ val cmb: Combiner[V, Repr] = cbf()
+ trie = trie.updated0[Combiner[V, Repr]](kv._1, hc, rootbits, cmb, null, null)
+ cmb
+ }
+ cmb += kv._2
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+
+ evaluateCombiners(trie)
+ trie.asInstanceOf[HashMap[K, Repr]]
+ }
+ private def evaluateCombiners(trie: HashMap[K, Combiner[V, Repr]]): Unit = trie match {
+ case hm1: HashMap.HashMap1[_, _] =>
+ hm1.asInstanceOf[HashMap.HashMap1[K, Repr]].value = hm1.value.result
+ hm1.kv = null
+ case hmc: HashMap.HashMapCollision1[_, _] =>
+ hmc.asInstanceOf[HashMap.HashMapCollision1[K, Repr]].kvs = hmc.kvs map { p => (p._1, p._2.result) }
+ case htm: HashMap.HashTrieMap[_, _] =>
+ for (hm <- htm.elems) evaluateCombiners(hm)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CreateGroupedTrie(cbf, bucks, root, offset, fp), new CreateGroupedTrie(cbf, bucks, root, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ }
+
}
diff --git a/src/library/scala/collection/parallel/immutable/ParIterable.scala b/src/library/scala/collection/parallel/immutable/ParIterable.scala
index 48f2bdb439..00a8c02fd6 100644
--- a/src/library/scala/collection/parallel/immutable/ParIterable.scala
+++ b/src/library/scala/collection/parallel/immutable/ParIterable.scala
@@ -40,12 +40,12 @@ extends collection.immutable.Iterable[T]
/** $factoryinfo
*/
object ParIterable extends ParFactory[ParIterable] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
- new GenericCanCombineFrom[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
+ new GenericCanCombineFrom[T]
- def newBuilder[T]: Combiner[T, ParIterable[T]] = HashSetCombiner[T] // TODO vector
+ def newBuilder[T]: Combiner[T, ParIterable[T]] = ParVector.newBuilder[T]
- def newCombiner[T]: Combiner[T, ParIterable[T]] = HashSetCombiner[T] // TODO vector
+ def newCombiner[T]: Combiner[T, ParIterable[T]] = ParVector.newCombiner[T]
}
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index b1be7ffab5..2456c2beda 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -3,7 +3,6 @@ package scala.collection.parallel.immutable
import scala.collection.immutable.Range
-import scala.collection.parallel.ParSeq
import scala.collection.parallel.Combiner
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.ParIterableIterator
@@ -27,9 +26,9 @@ self =>
type SCPI = SignalContextPassingIterator[ParRangeIterator]
- override def toParSeq = this // TODO remove when we have ParSeq, when ParVector is in place
+ override def toParSeq = this
- override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U]) // TODO remove when we have ParSeq, when ParVector is in place
+ override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U])
class ParRangeIterator(range: Range = self.range)
extends ParIterator {
diff --git a/src/library/scala/collection/parallel/immutable/ParSeq.scala b/src/library/scala/collection/parallel/immutable/ParSeq.scala
new file mode 100644
index 0000000000..68ed9a3139
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParSeq.scala
@@ -0,0 +1,46 @@
+package scala.collection
+package parallel.immutable
+
+
+import scala.collection.generic.GenericParTemplate
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParCompanion
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.ParFactory
+import scala.collection.parallel.ParSeqLike
+import scala.collection.parallel.Combiner
+
+
+
+
+/** An immutable variant of `ParSeq`.
+ *
+ * @define Coll mutable.ParSeq
+ * @define coll mutable parallel sequence
+ */
+trait ParSeq[+T]
+extends collection.immutable.Seq[T]
+ with collection.parallel.ParSeq[T]
+ with ParIterable[T]
+ with GenericParTemplate[T, ParSeq]
+ with ParSeqLike[T, ParSeq[T], Seq[T]]
+{
+ override def companion: GenericCompanion[ParSeq] with GenericParCompanion[ParSeq] = ParSeq
+ override def toSeq: ParSeq[T] = this
+}
+
+
+/** $factoryInfo
+ * @define Coll mutable.ParSeq
+ * @define coll mutable parallel sequence
+ */
+object ParSeq extends ParFactory[ParSeq] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSeq[T]] = new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParSeq[T]] = ParVector.newBuilder[T]
+
+ def newCombiner[T]: Combiner[T, ParSeq[T]] = ParVector.newCombiner[T]
+}
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/ParVector.scala b/src/library/scala/collection/parallel/immutable/ParVector.scala
new file mode 100644
index 0000000000..663621e060
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParVector.scala
@@ -0,0 +1,115 @@
+package scala.collection
+package parallel.immutable
+
+
+
+
+import scala.collection.generic.{GenericParTemplate, CanCombineFrom, ParFactory}
+import scala.collection.parallel.ParSeqLike
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParSeqIterator
+import scala.collection.parallel.EnvironmentPassingCombiner
+import mutable.ArrayBuffer
+import immutable.Vector
+import immutable.VectorBuilder
+import immutable.VectorIterator
+
+
+
+class ParVector[+T](private[this] val vector: Vector[T])
+extends ParSeq[T]
+ with GenericParTemplate[T, ParVector]
+ with ParSeqLike[T, ParVector[T], Vector[T]]
+ with Serializable
+{
+ override def companion = ParVector
+
+ def this() = this(Vector())
+
+ type SCPI = SignalContextPassingIterator[ParVectorIterator]
+
+ def apply(idx: Int) = vector.apply(idx)
+ def length = vector.length
+ def parallelIterator: ParSeqIterator[T] = {
+ val pit = new ParVectorIterator(vector.startIndex, vector.endIndex) with SCPI
+ vector.initIterator(pit)
+ pit
+ }
+ def seq: Vector[T] = vector
+
+ class ParVectorIterator(_start: Int, _end: Int) extends VectorIterator[T](_start, _end) with ParIterator {
+ self: SCPI =>
+ def remaining: Int = remainingElementCount
+ def dup: ParSeqIterator[T] = (new ParVector(remainingVector)).parallelIterator
+ def split: Seq[ParVectorIterator] = {
+ val rem = remaining
+ if (rem >= 2) psplit(rem / 2, rem - rem / 2)
+ else Seq(this)
+ }
+ def psplit(sizes: Int*): Seq[ParVectorIterator] = {
+ var remvector = remainingVector
+ val splitted = new ArrayBuffer[Vector[T]]
+ for (sz <- sizes) {
+ splitted += remvector.take(sz)
+ remvector = remvector.drop(sz)
+ }
+ splitted.map(v => new ParVector(v).parallelIterator.asInstanceOf[ParVectorIterator])
+ }
+ }
+
+}
+
+
+
+object ParVector extends ParFactory[ParVector] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParVector[T]] =
+ new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParVector[T]] = new LazyParVectorCombiner[T] with EPC[T, ParVector[T]]
+
+ def newCombiner[T]: Combiner[T, ParVector[T]] = new LazyParVectorCombiner[T] with EPC[T, ParVector[T]]
+}
+
+
+
+private[immutable] class LazyParVectorCombiner[T] extends Combiner[T, ParVector[T]] {
+self: EnvironmentPassingCombiner[T, ParVector[T]] =>
+ var sz = 0
+ val vectors = new ArrayBuffer[VectorBuilder[T]] += new VectorBuilder[T]
+
+ def size: Int = sz
+
+ def +=(elem: T): this.type = {
+ vectors.last += elem
+ sz += 1
+ this
+ }
+
+ def clear = {
+ vectors.clear
+ vectors += new VectorBuilder[T]
+ sz = 0
+ }
+
+ def result: ParVector[T] = {
+ val rvb = new VectorBuilder[T]
+ for (vb <- vectors) {
+ rvb ++= vb.result
+ }
+ new ParVector(rvb.result)
+ }
+
+ def combine[U <: T, NewTo >: ParVector[T]](other: Combiner[U, NewTo]) = if (other eq this) this else {
+ val that = other.asInstanceOf[LazyParVectorCombiner[T]]
+ sz += that.sz
+ vectors ++= that.vectors
+ this
+ }
+
+}
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled
deleted file mode 100644
index e0e4e2ce54..0000000000
--- a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled
+++ /dev/null
@@ -1,44 +0,0 @@
-package scala.collection.parallel.immutable
-
-
-import scala.collection.generic.GenericParTemplate
-import scala.collection.generic.GenericCompanion
-import scala.collection.generic.GenericParCompanion
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.ParFactory
-import scala.collection.parallel.ParSeqLike
-import scala.collection.parallel.Combiner
-
-
-
-// TODO uncomment when we add parallel vectors
-
-///** An immutable variant of `ParallelSeq`.
-// *
-// * @define Coll mutable.ParallelSeq
-// * @define coll mutable parallel sequence
-// */
-//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A]
-// with ParallelIterable[A]
-// with collection.parallel.ParallelSeq[A]
-// with GenericParallelTemplate[A, ParallelSeq]
-// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] {
-// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
-//
-//}
-//
-//
-///** $factoryInfo
-// * @define Coll mutable.ParallelSeq
-// * @define coll mutable parallel sequence
-// */
-//object ParallelSeq extends ParallelFactory[ParallelSeq] {
-// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T]
-//
-// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO
-//
-// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO
-//}
-
-
-
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 1e545fd882..757f5d2686 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -15,7 +15,7 @@ import annotation.unchecked.uncheckedVariance
package object parallel {
/* constants */
- val MIN_FOR_COPY = -1
+ val MIN_FOR_COPY = 512
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
@@ -47,17 +47,6 @@ package object parallel {
/* implicit conversions */
- /** An implicit conversion providing arrays with a `par` method, which
- * returns a parallel array.
- *
- * @tparam T type of the elements in the array, which is a subtype of AnyRef
- * @param array the array to be parallelized
- * @return a `Parallelizable` object with a `par` method=
- */
- implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] {
- def par = mutable.ParArray.handoff[T](array)
- }
-
trait FactoryOps[From, Elem, To] {
trait Otherwise[R] {
def otherwise(notbody: => R): R