From 733a3d756943e67d7608dbfd92aac445a080d69d Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Fri, 7 Jan 2011 12:05:31 +0000 Subject: Implemented a (slower) workaround for parallel ... Implemented a (slower) workaround for parallel vectors. Implemented group by. No review. --- .../scala/collection/immutable/HashMap.scala | 8 +- .../scala/collection/immutable/Vector.scala | 27 +++-- .../collection/parallel/ParIterableLike.scala | 43 ++++++-- .../scala/collection/parallel/ParSeqLike.scala | 31 +++--- .../collection/parallel/immutable/ParHashMap.scala | 82 ++++++++++++++- .../parallel/immutable/ParIterable.scala | 8 +- .../collection/parallel/immutable/ParRange.scala | 5 +- .../collection/parallel/immutable/ParSeq.scala | 46 +++++++++ .../collection/parallel/immutable/ParVector.scala | 115 +++++++++++++++++++++ .../parallel/immutable/ParallelSeq.scala.disabled | 44 -------- .../scala/collection/parallel/package.scala | 13 +-- test/benchmarks/source.list | 2 + .../scala/collection/parallel/Benchmarking.scala | 1 + .../parallel/benchmarks/misc/Coder.scala | 79 ++++++++------ .../parallel/benchmarks/misc/Dictionary.scala | 3 +- .../parallel/benchmarks/misc/Loader.scala | 64 ++++++++++++ .../benchmarks/parallel_array/GroupBy.scala | 45 ++++++++ .../parallel-collections/IntOperators.scala | 4 + .../parallel-collections/Operators.scala | 1 + .../parallel-collections/PairOperators.scala | 4 + .../ParallelIterableCheck.scala | 16 +++ .../parallel-collections/ParallelVectorCheck.scala | 61 +++++++++++ .../files/scalacheck/parallel-collections/pc.scala | 1 + 23 files changed, 573 insertions(+), 130 deletions(-) create mode 100644 src/library/scala/collection/parallel/immutable/ParSeq.scala create mode 100644 src/library/scala/collection/parallel/immutable/ParVector.scala delete mode 100644 src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled create mode 100644 test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala create mode 100644 test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala create mode 100644 test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala index 5b20638a84..a21e71158e 100644 --- a/src/library/scala/collection/immutable/HashMap.scala +++ b/src/library/scala/collection/immutable/HashMap.scala @@ -75,7 +75,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par protected type Merger[B1] = ((A, B1), (A, B1)) => (A, B1) - protected def get0(key: A, hash: Int, level: Int): Option[B] = None + private[collection] def get0(key: A, hash: Int, level: Int): Option[B] = None private[collection] def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1), merger: Merger[B1]): HashMap[A, B1] = new HashMap.HashMap1(key, hash, value, kv) @@ -117,7 +117,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { // TODO: add HashMap2, HashMap3, ... - class HashMap1[A,+B](private[HashMap] var key: A, private[HashMap] var hash: Int, private[HashMap] var value: (B @uncheckedVariance), private[HashMap] var kv: (A,B @uncheckedVariance)) extends HashMap[A,B] { + class HashMap1[A,+B](private[HashMap] var key: A, private[HashMap] var hash: Int, private[collection] var value: (B @uncheckedVariance), private[collection] var kv: (A,B @uncheckedVariance)) extends HashMap[A,B] { override def size = 1 private[collection] def getKey = key @@ -188,7 +188,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { } } - private class HashMapCollision1[A,+B](private[HashMap] var hash: Int, var kvs: ListMap[A,B @uncheckedVariance]) extends HashMap[A,B] { + private[collection] class HashMapCollision1[A,+B](private[HashMap] var hash: Int, var kvs: ListMap[A,B @uncheckedVariance]) extends HashMap[A,B] { override def size = kvs.size override def get0(key: A, hash: Int, level: Int): Option[B] = @@ -230,7 +230,7 @@ object HashMap extends ImmutableMapFactory[HashMap] { } } - class HashTrieMap[A,+B](private[HashMap] var bitmap: Int, private[HashMap] var elems: Array[HashMap[A,B @uncheckedVariance]], + class HashTrieMap[A,+B](private[HashMap] var bitmap: Int, private[collection] var elems: Array[HashMap[A,B @uncheckedVariance]], private[HashMap] var size0: Int) extends HashMap[A,B] { /* def this (level: Int, m1: HashMap1[A,B], m2: HashMap1[A,B]) = { diff --git a/src/library/scala/collection/immutable/Vector.scala b/src/library/scala/collection/immutable/Vector.scala index aeb3b82147..faee7aa60e 100644 --- a/src/library/scala/collection/immutable/Vector.scala +++ b/src/library/scala/collection/immutable/Vector.scala @@ -15,6 +15,7 @@ import compat.Platform import scala.collection.generic._ import scala.collection.mutable.Builder +import scala.collection.parallel.immutable.ParVector object Vector extends SeqFactory[Vector] { @@ -32,11 +33,14 @@ object Vector extends SeqFactory[Vector] { // in principle, most members should be private. however, access privileges must // be carefully chosen to not prevent method inlining -final class Vector[+A](startIndex: Int, endIndex: Int, focus: Int) extends IndexedSeq[A] - with GenericTraversableTemplate[A, Vector] - with IndexedSeqLike[A, Vector[A]] - with VectorPointer[A @uncheckedVariance] - with Serializable { self => +final class Vector[+A](private[collection] val startIndex: Int, private[collection] val endIndex: Int, focus: Int) +extends IndexedSeq[A] + with GenericTraversableTemplate[A, Vector] + with IndexedSeqLike[A, Vector[A]] + with VectorPointer[A @uncheckedVariance] + with Serializable + with Parallelizable[ParVector[A]] +{ self => override def companion: GenericCompanion[Vector] = Vector @@ -49,14 +53,19 @@ override def companion: GenericCompanion[Vector] = Vector def length = endIndex - startIndex - override def lengthCompare(len: Int): Int = length - len + def par = new ParVector(this) + override def lengthCompare(len: Int): Int = length - len - @inline override def iterator: VectorIterator[A] = { - val s = new VectorIterator[A](startIndex, endIndex) + private[collection] final def initIterator[B >: A](s: VectorIterator[B]) { s.initFrom(this) if (dirty) s.stabilize(focus) if (s.depth > 1) s.gotoPos(startIndex, startIndex ^ focus) + } + + @inline override def iterator: VectorIterator[A] = { + val s = new VectorIterator[A](startIndex, endIndex) + initIterator(s) s } @@ -602,7 +611,7 @@ override def companion: GenericCompanion[Vector] = Vector } -final class VectorIterator[+A](_startIndex: Int, _endIndex: Int) extends Iterator[A] with VectorPointer[A @uncheckedVariance] { +class VectorIterator[+A](_startIndex: Int, _endIndex: Int) extends Iterator[A] with VectorPointer[A @uncheckedVariance] { private var blockIndex: Int = _startIndex & ~31 private var lo: Int = _startIndex & 31 diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 3008f93ebd..9f894c0af8 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -10,12 +10,12 @@ import scala.collection.Parallel import scala.collection.Parallelizable import scala.collection.Sequentializable import scala.collection.generic._ - +import immutable.HashMapCombiner import java.util.concurrent.atomic.AtomicBoolean +import annotation.unchecked.uncheckedVariance -import annotation.unchecked.uncheckedStable // TODO update docs!! @@ -520,6 +520,12 @@ self => executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) } + // override def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = { + // executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], parallelIterator) mapResult { + // rcb => rcb.groupByKey(cbfactory) + // }) + // } + override def take(n: Int): Repr = { val actualn = if (size > n) n else size if (actualn < MIN_FOR_COPY) take_sequential(actualn) @@ -893,9 +899,9 @@ self => def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p) override def merge(that: FlatMap[S, That]) = { - debuglog("merging " + result + " and " + that.result) + //debuglog("merging " + result + " and " + that.result) result = result combine that.result - debuglog("merged into " + result) + //debuglog("merged into " + result) } } @@ -956,6 +962,29 @@ self => override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) } + protected[this] class GroupBy[K, U >: T]( + f: U => K, + mcf: () => HashMapCombiner[K, U], + protected[this] val pit: ParIterableIterator[T] + ) extends Transformer[HashMapCombiner[K, U], GroupBy[K, U]] { + @volatile var result: Result = null + final def leaf(prev: Option[Result]) = { + // note: HashMapCombiner doesn't merge same keys until evaluation + val cb = mcf() + while (pit.hasNext) { + val elem = pit.next + cb += f(elem) -> elem + } + result = cb + } + protected[this] def newSubtask(p: ParIterableIterator[T]) = new GroupBy(f, mcf, p) + override def merge(that: GroupBy[K, U]) = { + // note: this works because we know that a HashMapCombiner doesn't merge same keys until evaluation + // --> we know we're not dropping any mappings + result = (result combine that.result).asInstanceOf[HashMapCombiner[K, U]] + } + } + protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Take[U, This]] { @volatile var result: Combiner[U, This] = null @@ -1264,9 +1293,9 @@ self => private[parallel] def brokenInvariants = Seq[String]() - private val dbbuff = ArrayBuffer[String]() - def debugBuffer: ArrayBuffer[String] = dbbuff - // def debugBuffer: ArrayBuffer[String] = null + // private val dbbuff = ArrayBuffer[String]() + // def debugBuffer: ArrayBuffer[String] = dbbuff + def debugBuffer: ArrayBuffer[String] = null private[parallel] def debugclear() = synchronized { debugBuffer.clear diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 063a8cab7d..8a4f15fe6d 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -235,22 +235,23 @@ self => } } otherwise super.endsWith(that) - override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int) - (implicit bf: CanBuildFrom[Repr, U, That]): That = if (patch.isParSeq && bf.isParallel) { - val that = patch.asParSeq - val pbf = bf.asParallel + override def patch[U >: T, That](from: Int, patch: Seq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { val realreplaced = replaced min (length - from) - val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced) - val copystart = new Copy[U, That](() => pbf(repr), pits(0)) - val copymiddle = wrap { - val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) - tasksupport.executeAndWaitResult(tsk) - } - val copyend = new Copy[U, That](() => pbf(repr), pits(2)) - executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { - _.result - }) - } else patch_sequential(from, patch, replaced) + if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) { + val that = patch.asParSeq + val pbf = bf.asParallel + val pits = parallelIterator.psplit(from, replaced, length - from - realreplaced) + val copystart = new Copy[U, That](() => pbf(repr), pits(0)) + val copymiddle = wrap { + val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) + tasksupport.executeAndWaitResult(tsk) + } + val copyend = new Copy[U, That](() => pbf(repr), pits(2)) + executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { + _.result + }) + } else patch_sequential(from, patch, replaced) + } private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = { val b = bf(repr) diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 812a2ed94d..d60c2d39e8 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -141,7 +141,7 @@ object ParHashMap extends ParMapFactory[ParHashMap] { } -private[immutable] abstract class HashMapCombiner[K, V] +private[parallel] abstract class HashMapCombiner[K, V] extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) { self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => import HashMapCombiner._ @@ -183,6 +183,28 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => } } + def groupByKey[Repr](cbf: () => Combiner[V, Repr]): ParHashMap[K, Repr] = { + val bucks = buckets.filter(_ != null).map(_.headPtr) + val root = new Array[HashMap[K, AnyRef]](bucks.length) + + executeAndWaitResult(new CreateGroupedTrie(cbf, bucks, root, 0, bucks.length)) + + var bitmap = 0 + var i = 0 + while (i < rootsize) { + if (buckets(i) ne null) bitmap |= 1 << i + i += 1 + } + val sz = root.foldLeft(0)(_ + _.size) + + if (sz == 0) new ParHashMap[K, Repr] + else if (sz == 1) new ParHashMap[K, Repr](root(0).asInstanceOf[HashMap[K, Repr]]) + else { + val trie = new HashMap.HashTrieMap(bitmap, root.asInstanceOf[Array[HashMap[K, Repr]]], sz) + new ParHashMap[K, Repr](trie) + } + } + override def toString = { "HashTrieCombiner(sz: " + size + ")" //"HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n" @@ -229,6 +251,64 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) } + class CreateGroupedTrie[Repr](cbf: () => Combiner[V, Repr], bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, AnyRef]], offset: Int, howmany: Int) + extends Task[Unit, CreateGroupedTrie[Repr]] { + @volatile var result = () + def leaf(prev: Option[Unit]) = { + var i = offset + val until = offset + howmany + while (i < until) { + root(i) = createGroupedTrie(bucks(i)).asInstanceOf[HashMap[K, AnyRef]] + i += 1 + } + result = result + } + private def createGroupedTrie(elems: Unrolled[(K, V)]): HashMap[K, Repr] = { + var trie = new HashMap[K, Combiner[V, Repr]] + + var unrolled = elems + var i = 0 + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val kv = chunkarr(i) + val hc = trie.computeHash(kv._1) + + // check to see if already present + val cmb: Combiner[V, Repr] = trie.get0(kv._1, hc, rootbits) match { + case Some(cmb) => cmb + case None => + val cmb: Combiner[V, Repr] = cbf() + trie = trie.updated0[Combiner[V, Repr]](kv._1, hc, rootbits, cmb, null, null) + cmb + } + cmb += kv._2 + i += 1 + } + i = 0 + unrolled = unrolled.next + } + + evaluateCombiners(trie) + trie.asInstanceOf[HashMap[K, Repr]] + } + private def evaluateCombiners(trie: HashMap[K, Combiner[V, Repr]]): Unit = trie match { + case hm1: HashMap.HashMap1[_, _] => + hm1.asInstanceOf[HashMap.HashMap1[K, Repr]].value = hm1.value.result + hm1.kv = null + case hmc: HashMap.HashMapCollision1[_, _] => + hmc.asInstanceOf[HashMap.HashMapCollision1[K, Repr]].kvs = hmc.kvs map { p => (p._1, p._2.result) } + case htm: HashMap.HashTrieMap[_, _] => + for (hm <- htm.elems) evaluateCombiners(hm) + } + def split = { + val fp = howmany / 2 + List(new CreateGroupedTrie(cbf, bucks, root, offset, fp), new CreateGroupedTrie(cbf, bucks, root, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) + } + } diff --git a/src/library/scala/collection/parallel/immutable/ParIterable.scala b/src/library/scala/collection/parallel/immutable/ParIterable.scala index 48f2bdb439..00a8c02fd6 100644 --- a/src/library/scala/collection/parallel/immutable/ParIterable.scala +++ b/src/library/scala/collection/parallel/immutable/ParIterable.scala @@ -40,12 +40,12 @@ extends collection.immutable.Iterable[T] /** $factoryinfo */ object ParIterable extends ParFactory[ParIterable] { - implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = - new GenericCanCombineFrom[T] + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = + new GenericCanCombineFrom[T] - def newBuilder[T]: Combiner[T, ParIterable[T]] = HashSetCombiner[T] // TODO vector + def newBuilder[T]: Combiner[T, ParIterable[T]] = ParVector.newBuilder[T] - def newCombiner[T]: Combiner[T, ParIterable[T]] = HashSetCombiner[T] // TODO vector + def newCombiner[T]: Combiner[T, ParIterable[T]] = ParVector.newCombiner[T] } diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index b1be7ffab5..2456c2beda 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -3,7 +3,6 @@ package scala.collection.parallel.immutable import scala.collection.immutable.Range -import scala.collection.parallel.ParSeq import scala.collection.parallel.Combiner import scala.collection.generic.CanCombineFrom import scala.collection.parallel.ParIterableIterator @@ -27,9 +26,9 @@ self => type SCPI = SignalContextPassingIterator[ParRangeIterator] - override def toParSeq = this // TODO remove when we have ParSeq, when ParVector is in place + override def toParSeq = this - override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U]) // TODO remove when we have ParSeq, when ParVector is in place + override def toParSet[U >: Int] = toParCollection[U, ParSet[U]](() => HashSetCombiner[U]) class ParRangeIterator(range: Range = self.range) extends ParIterator { diff --git a/src/library/scala/collection/parallel/immutable/ParSeq.scala b/src/library/scala/collection/parallel/immutable/ParSeq.scala new file mode 100644 index 0000000000..68ed9a3139 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParSeq.scala @@ -0,0 +1,46 @@ +package scala.collection +package parallel.immutable + + +import scala.collection.generic.GenericParTemplate +import scala.collection.generic.GenericCompanion +import scala.collection.generic.GenericParCompanion +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.ParFactory +import scala.collection.parallel.ParSeqLike +import scala.collection.parallel.Combiner + + + + +/** An immutable variant of `ParSeq`. + * + * @define Coll mutable.ParSeq + * @define coll mutable parallel sequence + */ +trait ParSeq[+T] +extends collection.immutable.Seq[T] + with collection.parallel.ParSeq[T] + with ParIterable[T] + with GenericParTemplate[T, ParSeq] + with ParSeqLike[T, ParSeq[T], Seq[T]] +{ + override def companion: GenericCompanion[ParSeq] with GenericParCompanion[ParSeq] = ParSeq + override def toSeq: ParSeq[T] = this +} + + +/** $factoryInfo + * @define Coll mutable.ParSeq + * @define coll mutable parallel sequence + */ +object ParSeq extends ParFactory[ParSeq] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSeq[T]] = new GenericCanCombineFrom[T] + + def newBuilder[T]: Combiner[T, ParSeq[T]] = ParVector.newBuilder[T] + + def newCombiner[T]: Combiner[T, ParSeq[T]] = ParVector.newCombiner[T] +} + + + diff --git a/src/library/scala/collection/parallel/immutable/ParVector.scala b/src/library/scala/collection/parallel/immutable/ParVector.scala new file mode 100644 index 0000000000..663621e060 --- /dev/null +++ b/src/library/scala/collection/parallel/immutable/ParVector.scala @@ -0,0 +1,115 @@ +package scala.collection +package parallel.immutable + + + + +import scala.collection.generic.{GenericParTemplate, CanCombineFrom, ParFactory} +import scala.collection.parallel.ParSeqLike +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParSeqIterator +import scala.collection.parallel.EnvironmentPassingCombiner +import mutable.ArrayBuffer +import immutable.Vector +import immutable.VectorBuilder +import immutable.VectorIterator + + + +class ParVector[+T](private[this] val vector: Vector[T]) +extends ParSeq[T] + with GenericParTemplate[T, ParVector] + with ParSeqLike[T, ParVector[T], Vector[T]] + with Serializable +{ + override def companion = ParVector + + def this() = this(Vector()) + + type SCPI = SignalContextPassingIterator[ParVectorIterator] + + def apply(idx: Int) = vector.apply(idx) + def length = vector.length + def parallelIterator: ParSeqIterator[T] = { + val pit = new ParVectorIterator(vector.startIndex, vector.endIndex) with SCPI + vector.initIterator(pit) + pit + } + def seq: Vector[T] = vector + + class ParVectorIterator(_start: Int, _end: Int) extends VectorIterator[T](_start, _end) with ParIterator { + self: SCPI => + def remaining: Int = remainingElementCount + def dup: ParSeqIterator[T] = (new ParVector(remainingVector)).parallelIterator + def split: Seq[ParVectorIterator] = { + val rem = remaining + if (rem >= 2) psplit(rem / 2, rem - rem / 2) + else Seq(this) + } + def psplit(sizes: Int*): Seq[ParVectorIterator] = { + var remvector = remainingVector + val splitted = new ArrayBuffer[Vector[T]] + for (sz <- sizes) { + splitted += remvector.take(sz) + remvector = remvector.drop(sz) + } + splitted.map(v => new ParVector(v).parallelIterator.asInstanceOf[ParVectorIterator]) + } + } + +} + + + +object ParVector extends ParFactory[ParVector] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParVector[T]] = + new GenericCanCombineFrom[T] + + def newBuilder[T]: Combiner[T, ParVector[T]] = new LazyParVectorCombiner[T] with EPC[T, ParVector[T]] + + def newCombiner[T]: Combiner[T, ParVector[T]] = new LazyParVectorCombiner[T] with EPC[T, ParVector[T]] +} + + + +private[immutable] class LazyParVectorCombiner[T] extends Combiner[T, ParVector[T]] { +self: EnvironmentPassingCombiner[T, ParVector[T]] => + var sz = 0 + val vectors = new ArrayBuffer[VectorBuilder[T]] += new VectorBuilder[T] + + def size: Int = sz + + def +=(elem: T): this.type = { + vectors.last += elem + sz += 1 + this + } + + def clear = { + vectors.clear + vectors += new VectorBuilder[T] + sz = 0 + } + + def result: ParVector[T] = { + val rvb = new VectorBuilder[T] + for (vb <- vectors) { + rvb ++= vb.result + } + new ParVector(rvb.result) + } + + def combine[U <: T, NewTo >: ParVector[T]](other: Combiner[U, NewTo]) = if (other eq this) this else { + val that = other.asInstanceOf[LazyParVectorCombiner[T]] + sz += that.sz + vectors ++= that.vectors + this + } + +} + + + + + + diff --git a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled b/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled deleted file mode 100644 index e0e4e2ce54..0000000000 --- a/src/library/scala/collection/parallel/immutable/ParallelSeq.scala.disabled +++ /dev/null @@ -1,44 +0,0 @@ -package scala.collection.parallel.immutable - - -import scala.collection.generic.GenericParTemplate -import scala.collection.generic.GenericCompanion -import scala.collection.generic.GenericParCompanion -import scala.collection.generic.CanCombineFrom -import scala.collection.generic.ParFactory -import scala.collection.parallel.ParSeqLike -import scala.collection.parallel.Combiner - - - -// TODO uncomment when we add parallel vectors - -///** An immutable variant of `ParallelSeq`. -// * -// * @define Coll mutable.ParallelSeq -// * @define coll mutable parallel sequence -// */ -//trait ParallelSeq[A] extends collection.immutable.IndexedSeq[A] -// with ParallelIterable[A] -// with collection.parallel.ParallelSeq[A] -// with GenericParallelTemplate[A, ParallelSeq] -// with ParallelSeqLike[A, ParallelSeq[A], Seq[A]] { -// override def companion: GenericCompanion[ParallelSeq] with GenericParallelCompanion[ParallelSeq] = ParallelSeq -// -//} -// -// -///** $factoryInfo -// * @define Coll mutable.ParallelSeq -// * @define coll mutable parallel sequence -// */ -//object ParallelSeq extends ParallelFactory[ParallelSeq] { -// implicit def canBuildFrom[T]: CanBuildFromParallel[Coll, T, ParallelSeq[T]] = new GenericCanBuildFromParallel[T] -// -// def newBuilder[A]: Combiner[A, ParallelSeq[A]] = null // TODO -// -// def newCombiner[A]: Combiner[A, ParallelSeq[A]] = null // TODO -//} - - - diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 1e545fd882..757f5d2686 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -15,7 +15,7 @@ import annotation.unchecked.uncheckedVariance package object parallel { /* constants */ - val MIN_FOR_COPY = -1 + val MIN_FOR_COPY = 512 val CHECK_RATE = 512 val SQRT2 = math.sqrt(2) val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors @@ -47,17 +47,6 @@ package object parallel { /* implicit conversions */ - /** An implicit conversion providing arrays with a `par` method, which - * returns a parallel array. - * - * @tparam T type of the elements in the array, which is a subtype of AnyRef - * @param array the array to be parallelized - * @return a `Parallelizable` object with a `par` method= - */ - implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] { - def par = mutable.ParArray.handoff[T](array) - } - trait FactoryOps[From, Elem, To] { trait Otherwise[R] { def otherwise(notbody: => R): R diff --git a/test/benchmarks/source.list b/test/benchmarks/source.list index 65ab4b9ca9..c5d5f7f8fe 100644 --- a/test/benchmarks/source.list +++ b/test/benchmarks/source.list @@ -22,6 +22,7 @@ src/scala/collection/parallel/benchmarks/parallel_array/SumLight.scala src/scala/collection/parallel/benchmarks/parallel_array/MinLight.scala src/scala/collection/parallel/benchmarks/parallel_array/CountList.scala src/scala/collection/parallel/benchmarks/parallel_array/PatchHalf.scala +src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala src/scala/collection/parallel/benchmarks/parallel_array/DiffHalf.scala src/scala/collection/parallel/benchmarks/parallel_array/TakeMany.scala src/scala/collection/parallel/benchmarks/parallel_array/PartialMapLight.scala @@ -65,6 +66,7 @@ src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala src/scala/collection/parallel/benchmarks/generic/Dummy.scala src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala src/scala/collection/parallel/benchmarks/misc/Dictionary.scala +src/scala/collection/parallel/benchmarks/misc/Loader.scala src/scala/collection/parallel/benchmarks/misc/Coder.scala src/scala/collection/parallel/benchmarks/Bench.scala src/scala/collection/parallel/benchmarks/hashtries/Foreach.scala diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala index 65b9be4ca3..cbda3551e0 100644 --- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala +++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala @@ -66,6 +66,7 @@ trait BenchmarkRegister { register(parallel_array.AggregateLight) register(parallel_array.ScanLight) register(parallel_array.ScanMedium) + register(parallel_array.GroupByLight) register(parallel_array.MatrixMultiplication) // parallel views diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala index 4f809cf734..5ed0ca317d 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Coder.scala @@ -2,8 +2,12 @@ package scala.collection.parallel.benchmarks package misc -import collection.immutable._ -import collection.parallel.immutable._ + + + + +import collection._ //immutable._ +import collection.parallel._//immutable._ class SeqCoder(words: List[String]) { @@ -23,24 +27,32 @@ class SeqCoder(words: List[String]) { /** A map from digit strings to the words that represent * them e.g. `5282` -> List(`Java`, `Kata`, `Lava`, ...) */ - val wordsForNum: Map[String, List[String]] = - words groupBy wordCode withDefaultValue List() + val wordsForNum: Map[String, Seq[String]] = + (words groupBy wordCode).map(t => (t._1, t._2.toSeq)) withDefaultValue Seq() /** All ways to encode a number as a list of words */ - def encode(number: String): Set[List[String]] = - if (number.isEmpty) Set(List()) + def encode(number: String): Set[Seq[String]] = + if (number.isEmpty) Set(Seq()) else { val splits = (1 to number.length).toSet - for { - split <- splits - word <- wordsForNum(number take split) - rest <- encode(number drop split) - } yield word :: rest + // for { + // split <- splits + // word <- wordsForNum(number take split) + // rest <- encode(number drop split) + // } yield word :: rest + val r = splits.flatMap(split => { + val wfn = wordsForNum(number take split).flatMap(word => { + val subs = encode(number drop split) + subs.map(rest => word +: rest) + }) + wfn + }) + r } /** Maps a number to a list of all word phrases that can * represent it */ - def translate(number: String): Set[String] = encode(number) map (_ mkString " ") + def translate(number: String) = encode(number)// map (_ mkString " ") def ??? : Nothing = throw new UnsupportedOperationException } @@ -62,37 +74,43 @@ class ParCoder(words: List[String]) { /** A map from digit strings to the words that represent * them e.g. `5282` -> List(`Java`, `Kata`, `Lava`, ...) */ - val wordsForNum: Map[String, List[String]] = - words groupBy wordCode withDefaultValue List() + val wordsForNum: Map[String, Seq[String]] = + (words groupBy wordCode).map(t => (t._1, t._2)) withDefaultValue Seq() /** All ways to encode a number as a list of words */ - def encode(number: String): ParSet[List[String]] = - if (number.isEmpty) ParSet(List()) + def encode(number: String): Set[Seq[String]] = if (number.length > 12) { + if (number.isEmpty) ParSet(ParSeq()) else { val splits = (1 to number.length).toParSet for { - split <- splits - word <- wordsForNum(number take split) - rest <- encode(number drop split) - } yield word :: rest + split <- splits + word <- wordsForNum(number take split) + rest <- encode(number drop split) + } yield word +: rest + } + } else { + if (number.isEmpty) Set(Seq()) + else { + val splits = (1 to number.length).toSet + for { + split <- splits + word <- wordsForNum(number take split) + rest <- encode(number drop split) + } yield word +: rest } + } /** Maps a number to a list of all word phrases that can * represent it */ - def translate(number: String): ParSet[String] = encode(number) map (_ mkString " ") + def translate(number: String) = { + encode(number)// map (_ mkString " ") + } def ??? : Nothing = throw new UnsupportedOperationException } -/** Test code */ -object Main { - def main(args : Array[String]) : Unit = { - val coder = new SeqCoder(List("Scala", "Python", "Ruby", "Java", "Kata", "Lava", "a", "rocks", "pack", "rack", "sucks", "works")) - println(coder.wordsForNum) - println(coder.translate("7225276257")) - } -} + object Coder extends BenchCompanion { @@ -110,7 +128,7 @@ class Coder(val size: Int, val parallelism: Int, val runWhat: String) extends Be override def repetitionsPerRun = 1 - val code = "2328437472947362626"//33"//837976"//"6477323986225453446" + val code = "23284374729473626268379762538" reset @@ -131,6 +149,7 @@ class Coder(val size: Int, val parallelism: Int, val runWhat: String) extends Be println("Translation check: " + t.size) //println(t) case "par" => + collection.parallel.tasksupport.environment.asInstanceOf[concurrent.forkjoin.ForkJoinPool].setParallelism(parallelism) parcoder = new ParCoder(Dictionary.wordlist) val t = parcoder.translate(code) println("Translation check: " + t.size) diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala index 7ab5d94e93..e6ff55d234 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Dictionary.scala @@ -4,7 +4,8 @@ package scala.collection.parallel.benchmarks.misc object Dictionary { - val words = wordlines.split(System.getProperty("line.separator")).filter(_.trim != "").toList + val wordlist = wordlines.split(System.getProperty("line.separator")).filter(_.trim != "").toList + val wordarray = wordlist.toArray def wordlines = { val is = getClass.getClassLoader.getResourceAsStream("scala/collection/parallel/benchmarks/misc/dict.txt") scala.io.Source.fromInputStream(is).mkString diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala new file mode 100644 index 0000000000..2a9fc2c3ef --- /dev/null +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/misc/Loader.scala @@ -0,0 +1,64 @@ +package scala.collection.parallel.benchmarks +package misc + + + + + + +import collection._ //immutable._ +import collection.parallel._//immutable._ + + + + + + + +object Loader extends BenchCompanion { + def benchName = "Loader" + def collectionName = "General" + def apply(sz: Int, p: Int, what: String) = new Loader(sz, p, what) + override def defaultSize = 100 +} + + +class Loader(val size: Int, val parallelism: Int, val runWhat: String) extends Bench { + def companion = Loader + + override def repetitionsPerRun = 1 + + reset + + def runseq { + val m = Map( + '2' -> "ABC", '3' -> "DEF", '4' -> "GHI", '5' -> "JKL", + '6' -> "MNO", '7' -> "PQRS", '8' -> "TUV", '9' -> "WXYZ" + ) + val charCode: Map[Char, Char] = for ((digit, letters) <- m; letter <- letters) yield letter -> digit + def wordCode(word: String): String = word.toUpperCase map charCode + + Dictionary.wordarray groupBy wordCode + } + + def runpar { + val m = Map( + '2' -> "ABC", '3' -> "DEF", '4' -> "GHI", '5' -> "JKL", + '6' -> "MNO", '7' -> "PQRS", '8' -> "TUV", '9' -> "WXYZ" + ) + val charCode: Map[Char, Char] = for ((digit, letters) <- m; letter <- letters) yield letter -> digit + def wordCode(word: String): String = word.toUpperCase map charCode + + Dictionary.wordarray.par groupBy wordCode + } + + def reset = runWhat match { + case "seq" => + case "par" => + collection.parallel.tasksupport.environment.asInstanceOf[concurrent.forkjoin.ForkJoinPool].setParallelism(parallelism) + } + + def comparisonMap = Map() + +} + diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala new file mode 100644 index 0000000000..abfba42b78 --- /dev/null +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/GroupBy.scala @@ -0,0 +1,45 @@ +package scala.collection.parallel.benchmarks +package parallel_array + + + +object GroupByLight extends Companion { + def benchName = "groupby-light"; + def apply(sz: Int, parallelism: Int, what: String) = new GroupByLight(sz, parallelism, what) + override def comparisons = List() + override def defaultSize = 10000 + + val fun = (a: Cont) => a.in % 2 +} + + +class GroupByLight(sz: Int, p: Int, what: String) +extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) +with HavingResult[Int] { + def companion = GroupByLight + runresult = -1 + + val array = new Array[Cont](sz) + for (i <- 0 until sz) array(i) = new Cont(i) + + def runpar = runresult = pa.groupBy(GroupByLight.fun).size + def runseq = runresult = array.asInstanceOf[Array[Cont]].groupBy(GroupByLight.fun).size + def comparisonMap = collection.Map() +} + + + + + + + + + + + + + + + + + diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala index 24330d7670..4a74b91da8 100644 --- a/test/files/scalacheck/parallel-collections/IntOperators.scala +++ b/test/files/scalacheck/parallel-collections/IntOperators.scala @@ -60,6 +60,10 @@ trait IntOperators extends Operators[Int] { Array.fill(1000)(1).toSeq ) def newArray(sz: Int) = new Array[Int](sz) + def groupByFunctions = List( + _ % 2, _ % 3, _ % 5, _ % 10, _ % 154, _% 3217, + _ * 2, _ + 1 + ) } diff --git a/test/files/scalacheck/parallel-collections/Operators.scala b/test/files/scalacheck/parallel-collections/Operators.scala index b4321cf805..72133a5009 100644 --- a/test/files/scalacheck/parallel-collections/Operators.scala +++ b/test/files/scalacheck/parallel-collections/Operators.scala @@ -21,6 +21,7 @@ trait Operators[T] { def foldArguments: List[(T, (T, T) => T)] def addAllTraversables: List[Traversable[T]] def newArray(sz: Int): Array[T] + def groupByFunctions: List[T => T] } diff --git a/test/files/scalacheck/parallel-collections/PairOperators.scala b/test/files/scalacheck/parallel-collections/PairOperators.scala index 2055c29d38..fe851114be 100644 --- a/test/files/scalacheck/parallel-collections/PairOperators.scala +++ b/test/files/scalacheck/parallel-collections/PairOperators.scala @@ -73,6 +73,10 @@ trait PairOperators[K, V] extends Operators[(K, V)] { def newArray(sz: Int) = new Array[(K, V)](sz) + def groupByFunctions = (koperators.groupByFunctions zip voperators.groupByFunctions) map { + opt => { (p: (K, V)) => (opt._1(p._1), opt._2(p._2)) } + } + } diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index 60e8c8b1f2..9ddd5781b9 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -414,6 +414,22 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col }).reduceLeft(_ && _) } + // property("groupBy must be equal") = forAll(collectionPairs) { + // case (t, coll) => + // (for ((f, ind) <- groupByFunctions.zipWithIndex) yield { + // val tgroup = t.groupBy(f) + // val cgroup = coll.groupBy(f) + // if (tgroup != cgroup || cgroup != tgroup) { + // println("from: " + t) + // println("and: " + coll) + // println("groups are: ") + // println(tgroup) + // println(cgroup) + // } + // ("operator " + ind) |: tgroup == cgroup && cgroup == tgroup + // }).reduceLeft(_ && _) + // } + } diff --git a/test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala b/test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala new file mode 100644 index 0000000000..a2b6cef96d --- /dev/null +++ b/test/files/scalacheck/parallel-collections/ParallelVectorCheck.scala @@ -0,0 +1,61 @@ +package scala.collection +package parallel.immutable + + + +import org.scalacheck._ +import org.scalacheck.Gen +import org.scalacheck.Gen._ +import org.scalacheck.Prop._ +import org.scalacheck.Properties +import org.scalacheck.Arbitrary._ + +import scala.collection._ +import scala.collection.parallel.ops._ + + +import immutable.Vector +import immutable.VectorBuilder + + + + +abstract class ParallelVectorCheck[T](tp: String) extends collection.parallel.ParallelSeqCheck[T]("ParVector[" + tp + "]") { + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + + type CollType = ParVector[T] + + def isCheckingViews = false + + def hasStrictOrder = true + + def ofSize(vals: Seq[Gen[T]], sz: Int) = { + val vb = new immutable.VectorBuilder[T]() + val gen = vals(rnd.nextInt(vals.size)) + for (i <- 0 until sz) vb += sample(gen) + vb.result + } + + def fromSeq(a: Seq[T]) = { + val pa = ParVector.newCombiner[T] + for (elem <- a.toList) pa += elem + pa.result + } + +} + + + +object IntParallelVectorCheck extends ParallelVectorCheck[Int]("Int") with IntSeqOperators with IntValues { + override def instances(vals: Seq[Gen[Int]]) = oneOf(super.instances(vals), sized { sz => + (0 until sz).toArray.toSeq + }, sized { sz => + (-sz until 0).toArray.toSeq + }) +} + + + + + diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index 075a76ca6a..598c5a3751 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -29,6 +29,7 @@ class ParCollProperties extends Properties("Parallel collections") { include(mutable.IntParallelHashSetCheck) // parallel vectors + include(immutable.IntParallelVectorCheck) /* Views */ -- cgit v1.2.3