author    Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2011-01-07 12:05:31 +0000
committer Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2011-01-07 12:05:31 +0000
commit    733a3d756943e67d7608dbfd92aac445a080d69d (patch)
tree      6ce9bb74cee997212845c6fac15c8923b54c55ec
parent    07edcee629fb00808fb695ba4ec68a26f3e99490 (diff)
download  scala-733a3d756943e67d7608dbfd92aac445a080d69d.tar.gz
          scala-733a3d756943e67d7608dbfd92aac445a080d69d.tar.bz2
          scala-733a3d756943e67d7608dbfd92aac445a080d69d.zip
Implemented a (slower) workaround for parallel vectors. Implemented group by. No review.
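
For orientation, a minimal sketch of what this commit enables, assuming the parallel-collections API as it appears in the diff below (`par` on Vector is added in Vector.scala; ParVector is a new file):

    import scala.collection.immutable.Vector

    val v  = Vector(1, 2, 3, 4, 5, 6)
    val pv = v.par                 // new: hands the vector off to a ParVector[Int]
    val doubled = pv map (_ * 2)   // parallel operations now split over the vector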
-rw-r--r--  src/library/scala/collection/immutable/HashMap.scala  8
-rw-r--r--  src/library/scala/collection/immutable/Vector.scala  27
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala  43
-rw-r--r--  src/library/scala/collection/parallel/ParSeqLike.scala  31
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashMap.scala  82
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParIterable.scala  8
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParRange.scala  5
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParSeq.scala  46
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParVector.scala  115
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled  44
-rw-r--r--  src/library/scala/collection/parallel/package.scala  13
-rw-r--r--  test/benchmarks/source.list  2
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/Benchmarking.scala  1
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala  79
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala  3
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala  64
-rw-r--r--  test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala  45
-rw-r--r--  test/files/scalacheck/parallel-collections/IntOperators.scala  4
-rw-r--r--  test/files/scalacheck/parallel-collections/Operators.scala  1
-rw-r--r--  test/files/scalacheck/parallel-collections/PairOperators.scala  4
-rw-r--r--  test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala  16
-rw-r--r--  test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala  61
-rw-r--r--  test/files/scalacheck/parallel-collections/pc.scala  1
23 files changed, 573 insertions, 130 deletions
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 5b20638a84..a21e71158e 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -75,7 +75,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected type Merger[B1] = ((A, B1), (A, B1)) => (A, B1)
- protected def get0(key: A, hash: Int, level: Int): Option[B] = None
+ private[collection] def get0(key: A, hash: Int, level: Int): Option[B] = None
private[collection] def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1), merger: Merger[B1]): HashMap[A, B1] =
new HashMap.HashMap1(key, hash, value, kv)
@@ -117,7 +117,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
// TODO: add HashMap2, HashMap3, ...
- class HashMap1[A,+B](private[HashMap] var key: A, private[HashMap] var hash: Int, private[HashMap] var value: (B @uncheckedVariance), private[HashMap] var kv: (A,B @uncheckedVariance)) extends HashMap[A,B] {
+ class HashMap1[A,+B](private[HashMap] var key: A, private[HashMap] var hash: Int, private[collection] var value: (B @uncheckedVariance), private[collection] var kv: (A,B @uncheckedVariance)) extends HashMap[A,B] {
override def size = 1
private[collection] def getKey = key
@@ -188,7 +188,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
- private class HashMapCollision1[A,+B](private[HashMap] var hash: Int, var kvs: ListMap[A,B @uncheckedVariance]) extends HashMap[A,B] {
+ private[collection] class HashMapCollision1[A,+B](private[HashMap] var hash: Int, var kvs: ListMap[A,B @uncheckedVariance]) extends HashMap[A,B] {
override def size = kvs.size
override def get0(key: A, hash: Int, level: Int): Option[B] =
@@ -230,7 +230,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
- class HashTrieMap[A,+B](private[HashMap] var bitmap: Int, private[HashMap] var elems: Array[HashMap[A,B @uncheckedVariance]],
+ class HashTrieMap[A,+B](private[HashMap] var bitmap: Int, private[collection] var elems: Array[HashMap[A,B @uncheckedVariance]],
private[HashMap] var size0: Int) extends HashMap[A,B] {
/*
def this (level: Int, m1: HashMap1[A,B], m2: HashMap1[A,B]) = {
diff --git a/src/library/scala/collection/immutable/Vector.scala b/src/library/scala/collection/immutable/Vector.scala
index aeb3b82147..faee7aa60e 100644
--- a/src/library/scala/collection/immutable/Vector.scala
+++ b/src/library/scala/collection/immutable/Vector.scala
@@ -15,6 +15,7 @@ import compat.Platform
import scala.collection.generic._
import scala.collection.mutable.Builder
+import scala.collection.parallel.immutable.ParVector
object Vector extends SeqFactory[Vector] {
@@ -32,11 +33,14 @@ object Vector extends SeqFactory[Vector] {
// in principle, most members should be private. however, access privileges must
// be carefully chosen to not prevent method inlining
-final class Vector[+A](startIndex: Int, endIndex: Int, focus: Int) extends IndexedSeq[A]
- with GenericTraversableTemplate[A, Vector]
- with IndexedSeqLike[A, Vector[A]]
- with VectorPointer[A @uncheckedVariance]
- with Serializable { self =>
+final class Vector[+A](private[collection] val startIndex: Int, private[collection] val endIndex: Int, focus: Int)
+extends IndexedSeq[A]
+ with GenericTraversableTemplate[A, Vector]
+ with IndexedSeqLike[A, Vector[A]]
+ with VectorPointer[A @uncheckedVariance]
+ with Serializable
+ with Parallelizable[ParVector[A]]
+{ self =>
override def companion: GenericCompanion[Vector] = Vector
@@ -49,14 +53,19 @@ override def companion: GenericCompanion[Vector] = Vector
def length = endIndex - startIndex
- override def lengthCompare(len: Int): Int = length - len
+ def par = new ParVector(this)
+ override def lengthCompare(len: Int): Int = length - len
- @inline override def iterator: VectorIterator[A] = {
- val s = new VectorIterator[A](startIndex, endIndex)
+ private[collection] final def initIterator[B >: A](s: VectorIterator[B]) {
s.initFrom(this)
if (dirty) s.stabilize(focus)
if (s.depth > 1) s.gotoPos(startIndex, startIndex ^ focus)
+ }
+
+ @inline override def iterator: VectorIterator[A] = {
+ val s = new VectorIterator[A](startIndex, endIndex)
+ initIterator(s)
s
}
@@ -602,7 +611,7 @@ override def companion: GenericCompanion[Vector] = Vector
}
-final class VectorIterator[+A](_startIndex: Int, _endIndex: Int) extends Iterator[A] with VectorPointer[A @uncheckedVariance] {
+class VectorIterator[+A](_startIndex: Int, _endIndex: Int) extends Iterator[A] with VectorPointer[A @uncheckedVariance] {
private var blockIndex: Int = _startIndex & ~31
private var lo: Int = _startIndex & 31
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 3008f93ebd..9f894c0af8 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -10,12 +10,12 @@ import scala.collection.Parallel
import scala.collection.Parallelizable
import scala.collection.Sequentializable
import scala.collection.generic._
-
+import immutable.HashMapCombiner
import java.util.concurrent.atomic.AtomicBoolean
+import annotation.unchecked.uncheckedVariance
-import annotation.unchecked.uncheckedStable
// TODO update docs!!
@@ -520,6 +520,12 @@ self =>
executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
}
+ // override def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
+ // executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], parallelIterator) mapResult {
+ // rcb => rcb.groupByKey(cbfactory)
+ // })
+ // }
+
override def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
@@ -893,9 +899,9 @@ self =>
def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p)
override def merge(that: FlatMap[S, That]) = {
- debuglog("merging " + result + " and " + that.result)
+ //debuglog("merging " + result + " and " + that.result)
result = result combine that.result
- debuglog("merged into " + result)
+ //debuglog("merged into " + result)
}
}
@@ -956,6 +962,29 @@ self =>
override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
}
+ protected[this] class GroupBy[K, U >: T](
+ f: U => K,
+ mcf: () => HashMapCombiner[K, U],
+ protected[this] val pit: ParIterableIterator[T]
+ ) extends Transformer[HashMapCombiner[K, U], GroupBy[K, U]] {
+ @volatile var result: Result = null
+ final def leaf(prev: Option[Result]) = {
+ // note: HashMapCombiner doesn't merge same keys until evaluation
+ val cb = mcf()
+ while (pit.hasNext) {
+ val elem = pit.next
+ cb += f(elem) -> elem
+ }
+ result = cb
+ }
+ protected[this] def newSubtask(p: ParIterableIterator[T]) = new GroupBy(f, mcf, p)
+ override def merge(that: GroupBy[K, U]) = {
+ // note: this works because we know that a HashMapCombiner doesn't merge same keys until evaluation
+ // --> we know we're not dropping any mappings
+ result = (result combine that.result).asInstanceOf[HashMapCombiner[K, U]]
+ }
+ }
+
protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Take[U, This]] {
@volatile var result: Combiner[U, This] = null
@@ -1264,9 +1293,9 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
- private val dbbuff = ArrayBuffer[String]()
- def debugBuffer: ArrayBuffer[String] = dbbuff
- // def debugBuffer: ArrayBuffer[String] = null
+ // private val dbbuff = ArrayBuffer[String]()
+ // def debugBuffer: ArrayBuffer[String] = dbbuff
+ def debugBuffer: ArrayBuffer[String] = null
private[parallel] def debugclear() = synchronized {
debugBuffer.clear
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 063a8cab7d..8a4f15fe6d 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -235,22 +235,23 @@ self =>
}
} otherwise super.endsWith(that)
- override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)
- (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParSeq && bf.isParallel) {
- val that = patch.asParSeq
- val pbf = bf.asParallel
+ override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val realreplaced = replaced min (length - from)
- val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced)
- val copystart = new Copy[U, That](() => pbf(repr), pits(0))
- val copymiddle = wrap {
- val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
- tasksupport.executeAndWaitResult(tsk)
- }
- val copyend = new Copy[U, That](() => pbf(repr), pits(2))
- executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
- _.result
- })
- } else patch_sequential(from, patch, replaced)
+ if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
+ val that = patch.asParSeq
+ val pbf = bf.asParallel
+ val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced)
+ val copystart = new Copy[U, That](() => pbf(repr), pits(0))
+ val copymiddle = wrap {
+ val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
+ tasksupport.executeAndWaitResult(tsk)
+ }
+ val copyend = new Copy[U, That](() => pbf(repr), pits(2))
+ executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
+ _.result
+ })
+ } else patch_sequential(from, patch, replaced)
+ }
private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val b = bf(repr)
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 812a2ed94d..d60c2d39e8 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -141,7 +141,7 @@ object ParHashMap extends ParMapFactory[ParHashMap] {
}
-private[immutable] abstract class HashMapCombiner[K, V]
+private[parallel] abstract class HashMapCombiner[K, V]
extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) {
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
@@ -183,6 +183,28 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
}
+ def groupByKey[Repr](cbf: () => Combiner[V, Repr]): ParHashMap[K, Repr] = {
+ val bucks = buckets.filter(_ != null).map(_.headPtr)
+ val root = new Array[HashMap[K, AnyRef]](bucks.length)
+
+ executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length))
+
+ var bitmap = 0
+ var i = 0
+ while (i < rootsize) {
+ if (buckets(i) ne null) bitmap |= 1 << i
+ i += 1
+ }
+ val sz = root.foldLeft(0)(_ + _.size)
+
+ if (sz == 0) new ParHashMap[K, Repr]
+ else if (sz == 1) new ParHashMap[K, Repr](root(0).asInstanceOf[HashMap[K, Repr]])
+ else {
+ val trie = new HashMap.HashTrieMap(bitmap, root.asInstanceOf[Array[HashMap[K, Repr]]], sz)
+ new ParHashMap[K, Repr](trie)
+ }
+ }
+
override def toString = {
"HashTrieCombiner(sz: " + size + ")"
//"HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
@@ -229,6 +251,64 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
}
+ class CreateGroupedTrie[Repr](cbf: () => Combiner[V, Repr], bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, AnyRef]], offset: Int, howmany: Int)
+ extends Task[Unit, CreateGroupedTrie[Repr]] {
+ @volatile var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ root(i) = createGroupedTrie(bucks(i)).asInstanceOf[HashMap[K, AnyRef]]
+ i += 1
+ }
+ result = result
+ }
+ private def createGroupedTrie(elems: Unrolled[(K, V)]): HashMap[K, Repr] = {
+ var trie = new HashMap[K, Combiner[V, Repr]]
+
+ var unrolled = elems
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val kv = chunkarr(i)
+ val hc = trie.computeHash(kv._1)
+
+ // check to see if already present
+ val cmb: Combiner[V, Repr] = trie.get0(kv._1, hc, rootbits) match {
+ case Some(cmb) => cmb
+ case None =>
+ val cmb: Combiner[V, Repr] = cbf()
+ trie = trie.updated0[Combiner[V, Repr]](kv._1, hc, rootbits, cmb, null, null)
+ cmb
+ }
+ cmb += kv._2
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+
+ evaluateCombiners(trie)
+ trie.asInstanceOf[HashMap[K, Repr]]
+ }
+ private def evaluateCombiners(trie: HashMap[K, Combiner[V, Repr]]): Unit = trie match {
+ case hm1: HashMap.HashMap1[_, _] =>
+ hm1.asInstanceOf[HashMap.HashMap1[K, Repr]].value = hm1.value.result
+ hm1.kv = null
+ case hmc: HashMap.HashMapCollision1[_, _] =>
+ hmc.asInstanceOf[HashMap.HashMapCollision1[K, Repr]].kvs = hmc.kvs map { p => (p._1, p._2.result) }
+ case htm: HashMap.HashTrieMap[_, _] =>
+ for (hm <- htm.elems) evaluateCombiners(hm)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CreateGroupedTrie(cbf, bucks, root, offset, fp), new CreateGroupedTrie(cbf, bucks, root, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ }
+
}
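
Review note: the new `groupByKey` works in two phases. It first fills a trie whose values are per-key combiners, then evaluates those combiners in place (`evaluateCombiners`); since a HashMapCombiner never merges equal keys before evaluation, combining subresults cannot drop mappings. A sequential analogue of that two-phase shape (illustration only; `Vector` stands in for an arbitrary result type):

    import scala.collection.immutable.VectorBuilder
    import scala.collection.mutable

    // Phase 1: accumulate each value into its key's builder.
    // Phase 2: evaluate every builder into the final per-key collection.
    def groupByKeySeq[K, V](pairs: Seq[(K, V)]): Map[K, Vector[V]] = {
      val builders = mutable.Map[K, VectorBuilder[V]]()
      for ((k, v) <- pairs)
        builders.getOrElseUpdate(k, new VectorBuilder[V]) += v
      builders.map({ case (k, b) => (k, b.result) }).toMap
    }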
diff --git a/src/library/scala/collection/parallel/immutable/ParIterable.scala b/src/library/scala/collection/parallel/immutable/ParIterable.scala
index 48f2bdb439..00a8c02fd6 100644
--- a/src/library/scala/collection/parallel/immutable/ParIterable.scala
+++ b/src/library/scala/collection/parallel/immutable/ParIterable.scala
@@ -40,12 +40,12 @@ extends collection.immutable.Iterable[T]
/** $factoryinfo
*/
object ParIterable extends ParFactory[ParIterable] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
- new GenericCanCombineFrom[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
+ new GenericCanCombineFrom[T]
- def newBuilder[T]: Combiner[T, ParIterable[T]] = HashSetCombiner[T] // TODO vector
+ def newBuilder[T]: Combiner[T, ParIterable[T]] = ParVector.newBuilder[T]
- def newCombiner[T]: Combiner[T, ParIterable[T]] = HashSetCombiner[T] // TODO vector
+ def newCombiner[T]: Combiner[T, ParIterable[T]] = ParVector.newCombiner[T]
}
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index b1be7ffab5..2456c2beda 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -3,7 +3,6 @@ package scala.collection.parallel.immutable
import scala.collection.immutable.Range
-import scala.collection.parallel.ParSeq
import scala.collection.parallel.Combiner
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.ParIterableIterator
@@ -27,9 +26,9 @@ self =>
type SCPI = SignalContextPassingIterator[ParRangeIterator]
- override def toParSeq = this // TODO remove when we have ParSeq, when ParVector is in place
+ override def toParSeq = this
- override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U]) // TODO remove when we have ParSeq, when ParVector is in place
+ override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U])
class ParRangeIterator(range: Range = self.range)
extends ParIterator {
diff --git a/src/library/scala/collection/parallel/immutable/ParSeq.scala b/src/library/scala/collection/parallel/immutable/ParSeq.scala
new file mode 100644
index 0000000000..68ed9a3139
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParSeq.scala
@@ -0,0 +1,46 @@
+package scala.collection
+package parallel.immutable
+
+
+import scala.collection.generic.GenericParTemplate
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParCompanion
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.ParFactory
+import scala.collection.parallel.ParSeqLike
+import scala.collection.parallel.Combiner
+
+
+
+
+/** An immutable variant of `ParSeq`.
+ *
+ * @define Coll mutable.ParSeq
+ * @define coll mutable parallel sequence
+ */
+trait ParSeq[+T]
+extends collection.immutable.Seq[T]
+ with collection.parallel.ParSeq[T]
+ with ParIterable[T]
+ with GenericParTemplate[T, ParSeq]
+ with ParSeqLike[T, ParSeq[T], Seq[T]]
+{
+ override def companion: GenericCompanion[ParSeq] with GenericParCompanion[ParSeq] = ParSeq
+ override def toSeq: ParSeq[T] = this
+}
+
+
+/** $factoryInfo
+ * @define Coll mutable.ParSeq
+ * @define coll mutable parallel sequence
+ */
+object ParSeq extends ParFactory[ParSeq] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSeq[T]] = new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParSeq[T]] = ParVector.newBuilder[T]
+
+ def newCombiner[T]: Combiner[T, ParSeq[T]] = ParVector.newCombiner[T]
+}
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/ParVector.scala b/src/library/scala/collection/parallel/immutable/ParVector.scala
new file mode 100644
index 0000000000..663621e060
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParVector.scala
@@ -0,0 +1,115 @@
+package scala.collection
+package parallel.immutable
+
+
+
+
+import scala.collection.generic.{GenericParTemplate, CanCombineFrom, ParFactory}
+import scala.collection.parallel.ParSeqLike
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParSeqIterator
+import scala.collection.parallel.EnvironmentPassingCombiner
+import mutable.ArrayBuffer
+import immutable.Vector
+import immutable.VectorBuilder
+import immutable.VectorIterator
+
+
+
+class ParVector[+T](private[this] val vector: Vector[T])
+extends ParSeq[T]
+ with GenericParTemplate[T, ParVector]
+ with ParSeqLike[T, ParVector[T], Vector[T]]
+ with Serializable
+{
+ override def companion = ParVector
+
+ def this() = this(Vector())
+
+ type SCPI = SignalContextPassingIterator[ParVectorIterator]
+
+ def apply(idx: Int) = vector.apply(idx)
+ def length = vector.length
+ def parallelIterator: ParSeqIterator[T] = {
+ val pit = new ParVectorIterator(vector.startIndex, vector.endIndex) with SCPI
+ vector.initIterator(pit)
+ pit
+ }
+ def seq: Vector[T] = vector
+
+ class ParVectorIterator(_start: Int, _end: Int) extends VectorIterator[T](_start, _end) with ParIterator {
+ self: SCPI =>
+ def remaining: Int = remainingElementCount
+ def dup: ParSeqIterator[T] = (new ParVector(remainingVector)).parallelIterator
+ def split: Seq[ParVectorIterator] = {
+ val rem = remaining
+ if (rem >= 2) psplit(rem / 2, rem - rem / 2)
+ else Seq(this)
+ }
+ def psplit(sizes: Int*): Seq[ParVectorIterator] = {
+ var remvector = remainingVector
+ val splitted = new ArrayBuffer[Vector[T]]
+ for (sz <- sizes) {
+ splitted += remvector.take(sz)
+ remvector = remvector.drop(sz)
+ }
+ splitted.map(v => new ParVector(v).parallelIterator.asInstanceOf[ParVectorIterator])
+ }
+ }
+
+}
+
+
+
+object ParVector extends ParFactory[ParVector] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParVector[T]] =
+ new GenericCanCombineFrom[T]
+
+ def newBuilder[T]: Combiner[T, ParVector[T]] = new LazyParVectorCombiner[T] with EPC[T, ParVector[T]]
+
+ def newCombiner[T]: Combiner[T, ParVector[T]] = new LazyParVectorCombiner[T] with EPC[T, ParVector[T]]
+}
+
+
+
+private[immutable] class LazyParVectorCombiner[T] extends Combiner[T, ParVector[T]] {
+self: EnvironmentPassingCombiner[T, ParVector[T]] =>
+ var sz = 0
+ val vectors = new ArrayBuffer[VectorBuilder[T]] += new VectorBuilder[T]
+
+ def size: Int = sz
+
+ def +=(elem: T): this.type = {
+ vectors.last += elem
+ sz += 1
+ this
+ }
+
+ def clear = {
+ vectors.clear
+ vectors += new VectorBuilder[T]
+ sz = 0
+ }
+
+ def result: ParVector[T] = {
+ val rvb = new VectorBuilder[T]
+ for (vb <- vectors) {
+ rvb ++= vb.result
+ }
+ new ParVector(rvb.result)
+ }
+
+ def combine[U <: T, NewTo >: ParVector[T]](other: Combiner[U, NewTo]) = if (other eq this) this else {
+ val that = other.asInstanceOf[LazyParVectorCombiner[T]]
+ sz += that.sz
+ vectors ++= that.vectors
+ this
+ }
+
+}
+
+
+
+
+
+
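
Review note: LazyParVectorCombiner looks like the "(slower) workaround" named in the commit message: `combine` is cheap because it only concatenates the lists of pending VectorBuilders, and the actual copying is deferred to one sequential pass in `result`. A sketch of that flow (hypothetical element values):

    val left  = ParVector.newCombiner[Int]
    left += 1; left += 2
    val right = ParVector.newCombiner[Int]
    right += 3
    val merged = left combine right   // cheap: appends right's builders to left's
    val pv = merged.result            // one sequential copy into the final Vector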
diff --git a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled
deleted file mode 100644
index e0e4e2ce54..0000000000
--- a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled
+++ /dev/null
@@ -1,44 +0,0 @@
-package scala.collection.parallel.immutable
-
-
-import scala.collection.generic.GenericParTemplate
-import scala.collection.generic.GenericCompanion
-import scala.collection.generic.GenericParCompanion
-import scala.collection.generic.CanCombineFrom
-import scala.collection.generic.ParFactory
-import scala.collection.parallel.ParSeqLike
-import scala.collection.parallel.Combiner
-
-
-
-// TODO uncomment when we add parallel vectors
-
-///** An immutable variant of `ParallelSeq`.
-// *
-// * @define Coll mutable.ParallelSeq
-// * @define coll mutable parallel sequence
-// */
-//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A]
-// with ParallelIterable[A]
-// with collection.parallel.ParallelSeq[A]
-// with GenericParallelTemplate[A, ParallelSeq]
-// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] {
-// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq
-//
-//}
-//
-//
-///** $factoryInfo
-// * @define Coll mutable.ParallelSeq
-// * @define coll mutable parallel sequence
-// */
-//object ParallelSeq extends ParallelFactory[ParallelSeq] {
-// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T]
-//
-// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO
-//
-// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO
-//}
-
-
-
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 1e545fd882..757f5d2686 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -15,7 +15,7 @@ import annotation.unchecked.uncheckedVariance
package object parallel {
/* constants */
- val MIN_FOR_COPY = -1
+ val MIN_FOR_COPY = 512
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
@@ -47,17 +47,6 @@ package object parallel {
/* implicit conversions */
- /** An implicit conversion providing arrays with a `par` method, which
- * returns a parallel array.
- *
- * @tparam T type of the elements in the array, which is a subtype of AnyRef
- * @param array the array to be parallelized
- * @return a `Parallelizable` object with a `par` method=
- */
- implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] {
- def par = mutable.ParArray.handoff[T](array)
- }
-
trait FactoryOps[From, Elem, To] {
trait Otherwise[R] {
def otherwise(notbody: => R): R
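
Review note: raising MIN_FOR_COPY from -1 to 512 means small results of `take` and the reworked `patch` (see the ParSeqLike.scala hunk above) now use the sequential path; only results above the threshold pay for parallel splitting and copying. The decision boils down to the following check (names hypothetical, for illustration):

    // Estimated result size of a patch, checked against the new threshold.
    def useParallelPatch(collSize: Int, replaced: Int, patchSize: Int): Boolean = {
      val MIN_FOR_COPY = 512
      collSize - replaced + patchSize > MIN_FOR_COPY
    }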
diff --git a/test/benchmarks/source.list b/test/benchmarks/source.list
index 65ab4b9ca9..c5d5f7f8fe 100644
--- a/test/benchmarks/source.list
+++ b/test/benchmarks/source.list
@@ -22,6 +22,7 @@ src/scala/collection/parallel/benchmarks/parallel_array/SumLight.scala
src/scala/collection/parallel/benchmarks/parallel_array/MinLight.scala
src/scala/collection/parallel/benchmarks/parallel_array/CountList.scala
src/scala/collection/parallel/benchmarks/parallel_array/PatchHalf.scala
+src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala
src/scala/collection/parallel/benchmarks/parallel_array/DiffHalf.scala
src/scala/collection/parallel/benchmarks/parallel_array/TakeMany.scala
src/scala/collection/parallel/benchmarks/parallel_array/PartialMapLight.scala
@@ -65,6 +66,7 @@ src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
src/scala/collection/parallel/benchmarks/generic/Dummy.scala
src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
src/scala/collection/parallel/benchmarks/misc/Dictionary.scala
+src/scala/collection/parallel/benchmarks/misc/Loader.scala
src/scala/collection/parallel/benchmarks/misc/Coder.scala
src/scala/collection/parallel/benchmarks/Bench.scala
src/scala/collection/parallel/benchmarks/hashtries/Foreach.scala
diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
index 65b9be4ca3..cbda3551e0 100644
--- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
+++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
@@ -66,6 +66,7 @@ trait BenchmarkRegister {
register(parallel_array.AggregateLight)
register(parallel_array.ScanLight)
register(parallel_array.ScanMedium)
+ register(parallel_array.GroupByLight)
register(parallel_array.MatrixMultiplication)
// parallel views
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala
index 4f809cf734..5ed0ca317d 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala
@@ -2,8 +2,12 @@ package scala.collection.parallel.benchmarks
package misc
-import collection.immutable._
-import collection.parallel.immutable._
+
+
+
+
+import collection._ //immutable._
+import collection.parallel._//immutable._
class SeqCoder(words: List[String]) {
@@ -23,24 +27,32 @@ class SeqCoder(words: List[String]) {
/** A map from digit strings to the words that represent
* them e.g. `5282` -> List(`Java`, `Kata`, `Lava`, ...)
*/
- val wordsForNum: Map[String, List[String]] =
- words groupBy wordCode withDefaultValue List()
+ val wordsForNum: Map[String, Seq[String]] =
+ (words groupBy wordCode).map(t => (t._1, t._2.toSeq)) withDefaultValue Seq()
/** All ways to encode a number as a list of words */
- def encode(number: String): Set[List[String]] =
- if (number.isEmpty) Set(List())
+ def encode(number: String): Set[Seq[String]] =
+ if (number.isEmpty) Set(Seq())
else {
val splits = (1 to number.length).toSet
- for {
- split <- splits
- word <- wordsForNum(number take split)
- rest <- encode(number drop split)
- } yield word :: rest
+ // for {
+ // split <- splits
+ // word <- wordsForNum(number take split)
+ // rest <- encode(number drop split)
+ // } yield word :: rest
+ val r = splits.flatMap(split => {
+ val wfn = wordsForNum(number take split).flatMap(word => {
+ val subs = encode(number drop split)
+ subs.map(rest => word +: rest)
+ })
+ wfn
+ })
+ r
}
/** Maps a number to a list of all word phrases that can
* represent it */
- def translate(number: String): Set[String] = encode(number) map (_ mkString " ")
+ def translate(number: String) = encode(number)// map (_ mkString " ")
def ??? : Nothing = throw new UnsupportedOperationException
}
@@ -62,37 +74,43 @@ class ParCoder(words: List[String]) {
/** A map from digit strings to the words that represent
* them e.g. `5282` -> List(`Java`, `Kata`, `Lava`, ...)
*/
- val wordsForNum: Map[String, List[String]] =
- words groupBy wordCode withDefaultValue List()
+ val wordsForNum: Map[String, Seq[String]] =
+ (words groupBy wordCode).map(t => (t._1, t._2)) withDefaultValue Seq()
/** All ways to encode a number as a list of words */
- def encode(number: String): ParSet[List[String]] =
- if (number.isEmpty) ParSet(List())
+ def encode(number: String): Set[Seq[String]] = if (number.length > 12) {
+ if (number.isEmpty) ParSet(ParSeq())
else {
val splits = (1 to number.length).toParSet
for {
- split <- splits
- word <- wordsForNum(number take split)
- rest <- encode(number drop split)
- } yield word :: rest
+ split <- splits
+ word <- wordsForNum(number take split)
+ rest <- encode(number drop split)
+ } yield word +: rest
+ }
+ } else {
+ if (number.isEmpty) Set(Seq())
+ else {
+ val splits = (1 to number.length).toSet
+ for {
+ split <- splits
+ word <- wordsForNum(number take split)
+ rest <- encode(number drop split)
+ } yield word +: rest
}
+ }
/** Maps a number to a list of all word phrases that can
* represent it */
- def translate(number: String): ParSet[String] = encode(number) map (_ mkString " ")
+ def translate(number: String) = {
+ encode(number)// map (_ mkString " ")
+ }
def ??? : Nothing = throw new UnsupportedOperationException
}
-/** Test code */
-object Main {
- def main(args : Array[String]) : Unit = {
- val coder = new SeqCoder(List("Scala", "Python", "Ruby", "Java", "Kata", "Lava", "a", "rocks", "pack", "rack", "sucks", "works"))
- println(coder.wordsForNum)
- println(coder.translate("7225276257"))
- }
-}
+
object Coder extends BenchCompanion {
@@ -110,7 +128,7 @@ class Coder(val size: Int, val parallelism: Int, val runWhat: String) extends Be
override def repetitionsPerRun = 1
- val code = "2328437472947362626"//33"//837976"//"6477323986225453446"
+ val code = "23284374729473626268379762538"
reset
@@ -131,6 +149,7 @@ class Coder(val size: Int, val parallelism: Int, val runWhat: String) extends Be
println("Translation check: " + t.size)
//println(t)
case "par" =>
+ collection.parallel.tasksupport.environment.asInstanceOf[concurrent.forkjoin.ForkJoinPool].setParallelism(parallelism)
parcoder = new ParCoder(Dictionary.wordlist)
val t = parcoder.translate(code)
println("Translation check: " + t.size)
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala
index 7ab5d94e93..e6ff55d234 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala
@@ -4,7 +4,8 @@ package scala.collection.parallel.benchmarks.misc
object Dictionary {
- val words = wordlines.split(System.getProperty("line.separator")).filter(_.trim != "").toList
+ val wordlist = wordlines.split(System.getProperty("line.separator")).filter(_.trim != "").toList
+ val wordarray = wordlist.toArray
def wordlines = {
val is = getClass.getClassLoader.getResourceAsStream("scala/collection/parallel/benchmarks/misc/dict.txt")
scala.io.Source.fromInputStream(is).mkString
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala
new file mode 100644
index 0000000000..2a9fc2c3ef
--- /dev/null
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala
@@ -0,0 +1,64 @@
+package scala.collection.parallel.benchmarks
+package misc
+
+
+
+
+
+
+import collection._ //immutable._
+import collection.parallel._//immutable._
+
+
+
+
+
+
+
+object Loader extends BenchCompanion {
+ def benchName = "Loader"
+ def collectionName = "General"
+ def apply(sz: Int, p: Int, what: String) = new Loader(sz, p, what)
+ override def defaultSize = 100
+}
+
+
+class Loader(val size: Int, val parallelism: Int, val runWhat: String) extends Bench {
+ def companion = Loader
+
+ override def repetitionsPerRun = 1
+
+ reset
+
+ def runseq {
+ val m = Map(
+ '2' -> "ABC", '3' -> "DEF", '4' -> "GHI", '5' -> "JKL",
+ '6' -> "MNO", '7' -> "PQRS", '8' -> "TUV", '9' -> "WXYZ"
+ )
+ val charCode: Map[Char, Char] = for ((digit, letters) <- m; letter <- letters) yield letter -> digit
+ def wordCode(word: String): String = word.toUpperCase map charCode
+
+ Dictionary.wordarray groupBy wordCode
+ }
+
+ def runpar {
+ val m = Map(
+ '2' -> "ABC", '3' -> "DEF", '4' -> "GHI", '5' -> "JKL",
+ '6' -> "MNO", '7' -> "PQRS", '8' -> "TUV", '9' -> "WXYZ"
+ )
+ val charCode: Map[Char, Char] = for ((digit, letters) <- m; letter <- letters) yield letter -> digit
+ def wordCode(word: String): String = word.toUpperCase map charCode
+
+ Dictionary.wordarray.par groupBy wordCode
+ }
+
+ def reset = runWhat match {
+ case "seq" =>
+ case "par" =>
+ collection.parallel.tasksupport.environment.asInstanceOf[concurrent.forkjoin.ForkJoinPool].setParallelism(parallelism)
+ }
+
+ def comparisonMap = Map()
+
+}
+
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala
new file mode 100644
index 0000000000..abfba42b78
--- /dev/null
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala
@@ -0,0 +1,45 @@
+package scala.collection.parallel.benchmarks
+package parallel_array
+
+
+
+object GroupByLight extends Companion {
+ def benchName = "groupby-light";
+ def apply(sz: Int, parallelism: Int, what: String) = new GroupByLight(sz, parallelism, what)
+ override def comparisons = List()
+ override def defaultSize = 10000
+
+ val fun = (a: Cont) => a.in % 2
+}
+
+
+class GroupByLight(sz: Int, p: Int, what: String)
+extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont])
+with HavingResult[Int] {
+ def companion = GroupByLight
+ runresult = -1
+
+ val array = new Array[Cont](sz)
+ for (i <- 0 until sz) array(i) = new Cont(i)
+
+ def runpar = runresult = pa.groupBy(GroupByLight.fun).size
+ def runseq = runresult = array.asInstanceOf[Array[Cont]].groupBy(GroupByLight.fun).size
+ def comparisonMap = collection.Map()
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala
index 24330d7670..4a74b91da8 100644
--- a/test/files/scalacheck/parallel-collections/IntOperators.scala
+++ b/test/files/scalacheck/parallel-collections/IntOperators.scala
@@ -60,6 +60,10 @@ trait IntOperators extends Operators[Int] {
Array.fill(1000)(1).toSeq
)
def newArray(sz: Int) = new Array[Int](sz)
+ def groupByFunctions = List(
+ _ % 2, _ % 3, _ % 5, _ % 10, _ % 154, _% 3217,
+ _ * 2, _ + 1
+ )
}
diff --git a/test/files/scalacheck/parallel-collections/Operators.scala b/test/files/scalacheck/parallel-collections/Operators.scala
index b4321cf805..72133a5009 100644
--- a/test/files/scalacheck/parallel-collections/Operators.scala
+++ b/test/files/scalacheck/parallel-collections/Operators.scala
@@ -21,6 +21,7 @@ trait Operators[T] {
def foldArguments: List[(T, (T, T) => T)]
def addAllTraversables: List[Traversable[T]]
def newArray(sz: Int): Array[T]
+ def groupByFunctions: List[T => T]
}
diff --git a/test/files/scalacheck/parallel-collections/PairOperators.scala b/test/files/scalacheck/parallel-collections/PairOperators.scala
index 2055c29d38..fe851114be 100644
--- a/test/files/scalacheck/parallel-collections/PairOperators.scala
+++ b/test/files/scalacheck/parallel-collections/PairOperators.scala
@@ -73,6 +73,10 @@ trait PairOperators[K, V] extends Operators[(K, V)] {
def newArray(sz: Int) = new Array[(K, V)](sz)
+ def groupByFunctions = (koperators.groupByFunctions zip voperators.groupByFunctions) map {
+ opt => { (p: (K, V)) => (opt._1(p._1), opt._2(p._2)) }
+ }
+
}
diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
index 60e8c8b1f2..9ddd5781b9 100644
--- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
@@ -414,6 +414,22 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
}).reduceLeft(_ && _)
}
+ // property("groupBy must be equal") = forAll(collectionPairs) {
+ // case (t, coll) =>
+ // (for ((f, ind) <- groupByFunctions.zipWithIndex) yield {
+ // val tgroup = t.groupBy(f)
+ // val cgroup = coll.groupBy(f)
+ // if (tgroup != cgroup || cgroup != tgroup) {
+ // println("from: " + t)
+ // println("and: " + coll)
+ // println("groups are: ")
+ // println(tgroup)
+ // println(cgroup)
+ // }
+ // ("operator " + ind) |: tgroup == cgroup && cgroup == tgroup
+ // }).reduceLeft(_ && _)
+ // }
+
}
diff --git a/test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala b/test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala
new file mode 100644
index 0000000000..a2b6cef96d
--- /dev/null
+++ b/test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala
@@ -0,0 +1,61 @@
+package scala.collection
+package parallel.immutable
+
+
+
+import org.scalacheck._
+import org.scalacheck.Gen
+import org.scalacheck.Gen._
+import org.scalacheck.Prop._
+import org.scalacheck.Properties
+import org.scalacheck.Arbitrary._
+
+import scala.collection._
+import scala.collection.parallel.ops._
+
+
+import immutable.Vector
+import immutable.VectorBuilder
+
+
+
+
+abstract class ParallelVectorCheck[T](tp: String) extends collection.parallel.ParallelSeqCheck[T]("ParVector[" + tp + "]") {
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+
+ type CollType = ParVector[T]
+
+ def isCheckingViews = false
+
+ def hasStrictOrder = true
+
+ def ofSize(vals: Seq[Gen[T]], sz: Int) = {
+ val vb = new immutable.VectorBuilder[T]()
+ val gen = vals(rnd.nextInt(vals.size))
+ for (i <- 0 until sz) vb += sample(gen)
+ vb.result
+ }
+
+ def fromSeq(a: Seq[T]) = {
+ val pa = ParVector.newCombiner[T]
+ for (elem <- a.toList) pa += elem
+ pa.result
+ }
+
+}
+
+
+
+object IntParallelVectorCheck extends ParallelVectorCheck[Int]("Int") with IntSeqOperators with IntValues {
+ override def instances(vals: Seq[Gen[Int]]) = oneOf(super.instances(vals), sized { sz =>
+ (0 until sz).toArray.toSeq
+ }, sized { sz =>
+ (-sz until 0).toArray.toSeq
+ })
+}
+
+
+
+
+
diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala
index 075a76ca6a..598c5a3751 100644
--- a/test/files/scalacheck/parallel-collections/pc.scala
+++ b/test/files/scalacheck/parallel-collections/pc.scala
@@ -29,6 +29,7 @@ class ParCollProperties extends Properties("Parallel collections") {
include(mutable.IntParallelHashSetCheck)
// parallel vectors
+ include(immutable.IntParallelVectorCheck)
/* Views */