summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build.xml6
-rw-r--r--src/library/scala/collection/generic/ParSetFactory.scala33
-rw-r--r--src/library/scala/collection/generic/Signalling.scala9
-rw-r--r--src/library/scala/collection/immutable/HashMap.scala198
-rw-r--r--src/library/scala/collection/immutable/HashSet.scala188
-rw-r--r--src/library/scala/collection/mutable/ConcurrentMap.scala6
-rw-r--r--src/library/scala/collection/parallel/ParIterable.scala3
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala14
-rw-r--r--src/library/scala/collection/parallel/ParMap.scala6
-rw-r--r--src/library/scala/collection/parallel/ParSet.scala83
-rw-r--r--src/library/scala/collection/parallel/ParSetLike.scala81
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala5
-rw-r--r--src/library/scala/collection/parallel/Splitter.scala9
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala50
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala (renamed from src/library/scala/collection/parallel/immutable/ParHashTrie.scala)117
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala279
-rw-r--r--src/library/scala/collection/parallel/immutable/package.scala21
-rw-r--r--src/library/scala/util/control/Breaks.scala10
-rw-r--r--test/files/scalacheck/HashTrieSplit.scala47
19 files changed, 940 insertions, 225 deletions
diff --git a/build.xml b/build.xml
index 4086a9eadf..e430e9693a 100644
--- a/build.xml
+++ b/build.xml
@@ -1563,12 +1563,12 @@ BOOTRAPING TEST AND TEST SUITE
<include name="run/**/*.scala"/>
</runtests>
<jvmtests dir="${partest.dir}/${partest.srcdir}/jvm" includes="*.scala"/>
- <scalachecktests dir="${partest.dir}/${partest.srcdir}/scalacheck">
- <include name="*.scala"/>
- </scalachecktests>
<residenttests dir="${partest.dir}/${partest.srcdir}/res" includes="*.res"/>
<buildmanagertests dir="${partest.dir}/${partest.srcdir}/buildmanager" includes="*"/>
<scalaptests dir="${partest.dir}/${partest.srcdir}/scalap" includes="**/*.scala"/>
+ <scalachecktests dir="${partest.dir}/${partest.srcdir}/scalacheck">
+ <include name="*.scala"/>
+ </scalachecktests>
<!-- <scripttests dir="${partest.dir}/${partest.srcdir}/script" includes="*.scala"/> -->
</partest>
</target>
diff --git a/src/library/scala/collection/generic/ParSetFactory.scala b/src/library/scala/collection/generic/ParSetFactory.scala
new file mode 100644
index 0000000000..7c43b29bf4
--- /dev/null
+++ b/src/library/scala/collection/generic/ParSetFactory.scala
@@ -0,0 +1,33 @@
+package scala.collection.generic
+
+
+
+
+
+import collection.mutable.Builder
+import collection.parallel.Combiner
+import collection.parallel.ParSet
+import collection.parallel.ParSetLike
+
+
+
+
+
+
+abstract class ParSetFactory[CC[X] <: ParSet[X] with ParSetLike[X, CC[X], _] with GenericParTemplate[X, CC]]
+ extends SetFactory[CC]
+ with GenericParCompanion[CC]
+{
+ def newBuilder[A]: Combiner[A, CC[A]] = newCombiner[A]
+
+ def newCombiner[A]: Combiner[A, CC[A]]
+
+ class GenericCanCombineFrom[A] extends CanCombineFrom[CC[_], A, CC[A]] {
+ override def apply(from: Coll) = from.genericCombiner[A]
+ override def apply() = newCombiner[A]
+ }
+}
+
+
+
+
diff --git a/src/library/scala/collection/generic/Signalling.scala b/src/library/scala/collection/generic/Signalling.scala
index 1dac4297b7..eaf945a4b7 100644
--- a/src/library/scala/collection/generic/Signalling.scala
+++ b/src/library/scala/collection/generic/Signalling.scala
@@ -91,10 +91,7 @@ trait Signalling {
/**
* This signalling implementation returns default values and ignores received signals.
*/
-class DefaultSignalling extends Signalling {
- def isAborted = false
- def abort {}
-
+class DefaultSignalling extends Signalling with VolatileAbort {
def indexFlag = -1
def setIndexFlag(f: Int) {}
def setIndexFlagIfGreater(f: Int) {}
@@ -115,8 +112,8 @@ object IdleSignalling extends DefaultSignalling
*/
trait VolatileAbort extends Signalling {
@volatile private var abortflag = false
- abstract override def isAborted = abortflag
- abstract override def abort = abortflag = true
+ override def isAborted = abortflag
+ override def abort = abortflag = true
}
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 1084e04101..d167e6d6a7 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -15,7 +15,7 @@ import generic._
import annotation.unchecked.uncheckedVariance
-import parallel.immutable.ParHashTrie
+import parallel.immutable.ParHashMap
/** This class implements immutable maps using a hash trie.
@@ -36,7 +36,7 @@ import parallel.immutable.ParHashTrie
* @define willNotTerminateInf
*/
@serializable @SerialVersionUID(2L)
-class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashTrie[A, B]] {
+class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashMap[A, B]] {
override def size: Int = 0
@@ -90,7 +90,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int, merger: Merger[B1]): HashMap[A, B1] = that
- def par = ParHashTrie.fromTrie(this)
+ def par = ParHashMap.fromTrie(this)
}
@@ -307,78 +307,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
-/*
- override def iterator = { // TODO: optimize (use a stack to keep track of pos)
-
- def iter(m: HashTrieMap[A,B], k: => Stream[(A,B)]): Stream[(A,B)] = {
- def horiz(elems: Array[HashMap[A,B]], i: Int, k: => Stream[(A,B)]): Stream[(A,B)] = {
- if (i < elems.length) {
- elems(i) match {
- case m: HashTrieMap[A,B] => iter(m, horiz(elems, i+1, k))
- case m: HashMap1[A,B] => new Stream.Cons(m.ensurePair, horiz(elems, i+1, k))
- }
- } else k
- }
- horiz(m.elems, 0, k)
- }
- iter(this, Stream.empty).iterator
- }
-*/
-
-
- override def iterator = new Iterator[(A,B)] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[(A,B)] = null // to traverse collision nodes
-
- def hasNext = (subIter ne null) || depth >= 0
-
- def next: (A,B) = {
- if (subIter ne null) {
- val el = subIter.next
- if (!subIter.hasNext)
- subIter = null
- el
- } else
- next0(arrayD, posD)
- }
-
- @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = {
- if (i == elems.length-1) { // reached end of level, pop stack
- depth -= 1
- if (depth >= 0) {
- arrayD = arrayStack(depth)
- posD = posStack(depth)
- arrayStack(depth) = null
- } else {
- arrayD = null
- posD = 0
- }
- } else
- posD += 1
-
- elems(i) match {
- case m: HashTrieMap[A,B] => // push current pos onto stack and descend
- if (depth >= 0) {
- arrayStack(depth) = arrayD
- posStack(depth) = posD
- }
- depth += 1
- arrayD = m.elems
- posD = 0
- next0(m.elems, 0)
- case m: HashMap1[A,B] => m.ensurePair
- case m =>
- subIter = m.iterator
- subIter.next
- }
- }
- }
+ override def iterator = new TrieIterator[A, B](elems)
/*
@@ -534,6 +463,125 @@ time { mNew.iterator.foreach( p => ()) }
}
+ class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] {
+ private[this] var depth = 0
+ private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
+ private[this] var posStack = new Array[Int](6)
+
+ private[this] var arrayD = elems
+ private[this] var posD = 0
+
+ private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes
+
+ def hasNext = (subIter ne null) || depth >= 0
+
+ def next: (A,B) = {
+ if (subIter ne null) {
+ val el = subIter.next
+ if (!subIter.hasNext)
+ subIter = null
+ el
+ } else
+ next0(arrayD, posD)
+ }
+
+ @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = {
+ if (i == elems.length-1) { // reached end of level, pop stack
+ depth -= 1
+ if (depth >= 0) {
+ arrayD = arrayStack(depth)
+ posD = posStack(depth)
+ arrayStack(depth) = null
+ } else {
+ arrayD = null
+ posD = 0
+ }
+ } else
+ posD += 1
+
+ elems(i) match {
+ case m: HashTrieMap[A,B] => // push current pos onto stack and descend
+ if (depth >= 0) {
+ arrayStack(depth) = arrayD
+ posStack(depth) = posD
+ }
+ depth += 1
+ arrayD = m.elems
+ posD = 0
+ next0(m.elems, 0)
+ case m: HashMap1[A,B] => m.ensurePair
+ case m =>
+ subIter = m.iterator
+ subIter.next
+ }
+ }
+
+ // assumption: contains 2 or more elements
+ // splits this iterator into 2 iterators
+ // returns the 1st iterator, its number of elements, and the second iterator
+ def split: ((Iterator[(A, B)], Int), Iterator[(A, B)]) = {
+ // 0) simple case: no elements have been iterated - simply divide arrayD
+ if (arrayD != null && depth == 0 && posD == 0) {
+ val (fst, snd) = arrayD.splitAt(arrayD.length / 2)
+ val szfst = fst.foldLeft(0)(_ + _.size)
+ return ((new TrieIterator(fst), szfst), new TrieIterator(snd))
+ }
+
+ // otherwise, some elements have been iterated over
+ // 1) collision case: if we have a subIter, we return subIter and elements after it
+ if (subIter ne null) {
+ val buff = subIter.toBuffer
+ subIter = null
+ ((buff.iterator, buff.length), this)
+ } else {
+ // otherwise find the topmost array stack element
+ if (depth > 0) {
+ // 2) topmost comes before (is not) arrayD
+ // steal a portion of top to create a new iterator
+ val topmost = arrayStack(0)
+ if (posStack(0) == arrayStack(0).length - 1) {
+ // 2a) only a single entry left on top
+ // this means we have to modify this iterator - pop topmost
+ val snd = Array(arrayStack(0).last)
+ val szsnd = snd(0).size
+ // modify this - pop
+ depth -= 1
+ arrayStack = arrayStack.tail ++ Array[Array[HashMap[A, B]]](null)
+ posStack = posStack.tail ++ Array[Int](0)
+ // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty
+ ((new TrieIterator[A, B](snd), szsnd), this)
+ } else {
+ // 2b) more than a single entry left on top
+ val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2)
+ arrayStack(0) = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A, B](snd), szsnd), this)
+ }
+ } else {
+ // 3) no topmost element (arrayD is at the top)
+ // steal a portion of it and update this iterator
+ if (posD == arrayD.length - 1) {
+ // 3a) positioned at the last element of arrayD
+ val arr: Array[HashMap[A, B]] = arrayD(posD) match {
+ case c: HashMapCollision1[_, _] => c.asInstanceOf[HashMapCollision1[A, B]].kvs.toArray map { HashMap() + _ }
+ case ht: HashTrieMap[_, _] => ht.asInstanceOf[HashTrieMap[A, B]].elems
+ case _ => error("cannot divide single element")
+ }
+ val (fst, snd) = arr.splitAt(arr.length / 2)
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator(snd), szsnd), new TrieIterator(fst))
+ } else {
+ // 3b) arrayD has more free elements
+ val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2)
+ arrayD = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A, B](snd), szsnd), this)
+ }
+ }
+ }
+ }
+ }
+
private def check[K](x: HashMap[K, _], y: HashMap[K, _], xy: HashMap[K, _]) = { // TODO remove this debugging helper
var xs = Set[K]()
for (elem <- x) xs += elem._1
diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala
index 08e64d6709..3f04d63b40 100644
--- a/src/library/scala/collection/immutable/HashSet.scala
+++ b/src/library/scala/collection/immutable/HashSet.scala
@@ -14,6 +14,8 @@ package immutable
import generic._
import annotation.unchecked.uncheckedVariance
+import collection.parallel.immutable.ParHashSet
+
/** This class implements immutable sets using a hash trie.
*
* '''Note:''' the builder of a hash set returns specialized representations `EmptySet`,`Set1`,..., `Set4`
@@ -31,7 +33,9 @@ import annotation.unchecked.uncheckedVariance
@serializable @SerialVersionUID(2L)
class HashSet[A] extends Set[A]
with GenericSetTemplate[A, HashSet]
- with SetLike[A, HashSet[A]] {
+ with SetLike[A, HashSet[A]]
+ with Parallelizable[ParHashSet[A]]
+{
override def companion: GenericCompanion[HashSet] = HashSet
//class HashSet[A] extends Set[A] with SetLike[A, HashSet[A]] {
@@ -55,6 +59,8 @@ class HashSet[A] extends Set[A]
def - (e: A): HashSet[A] =
removed0(e, computeHash(e), 0)
+ def par = ParHashSet.fromTrie(this)
+
protected def elemHashCode(key: A) = if (key == null) 0 else key.##
protected final def improve(hcode: Int) = {
@@ -68,7 +74,7 @@ class HashSet[A] extends Set[A]
protected def get0(key: A, hash: Int, level: Int): Boolean = false
- protected def updated0(key: A, hash: Int, level: Int): HashSet[A] =
+ def updated0(key: A, hash: Int, level: Int): HashSet[A] =
new HashSet.HashSet1(key, hash)
protected def removed0(key: A, hash: Int, level: Int): HashSet[A] = this
@@ -169,7 +175,7 @@ object HashSet extends ImmutableSetFactory[HashSet] {
}
- class HashTrieSet[A](private var bitmap: Int, private var elems: Array[HashSet[A]],
+ class HashTrieSet[A](private var bitmap: Int, private[HashSet] var elems: Array[HashSet[A]],
private var size0: Int) extends HashSet[A] {
override def size = size0
@@ -239,60 +245,7 @@ object HashSet extends ImmutableSetFactory[HashSet] {
}
}
-
- override def iterator = new Iterator[A] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[A] = null // to traverse collision nodes
-
- def hasNext = (subIter ne null) || depth >= 0
-
- def next: A = {
- if (subIter ne null) {
- val el = subIter.next
- if (!subIter.hasNext)
- subIter = null
- el
- } else
- next0(arrayD, posD)
- }
-
- @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = {
- if (i == elems.length-1) { // reached end of level, pop stack
- depth -= 1
- if (depth >= 0) {
- arrayD = arrayStack(depth)
- posD = posStack(depth)
- arrayStack(depth) = null
- } else {
- arrayD = null
- posD = 0
- }
- } else
- posD += 1
-
- elems(i) match {
- case m: HashTrieSet[A] => // push current pos onto stack and descend
- if (depth >= 0) {
- arrayStack(depth) = arrayD
- posStack(depth) = posD
- }
- depth += 1
- arrayD = m.elems
- posD = 0
- next0(m.elems, 0)
- case m: HashSet1[A] => m.key
- case m =>
- subIter = m.iterator
- subIter.next
- }
- }
- }
+ override def iterator = new TrieIterator[A](elems)
/*
@@ -315,7 +268,6 @@ time { mNew.iterator.foreach( p => ()) }
*/
-
override def foreach[U](f: A => U): Unit = {
var i = 0;
while (i < elems.length) {
@@ -325,6 +277,126 @@ time { mNew.iterator.foreach( p => ()) }
}
}
+
+ class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] {
+ private[this] var depth = 0
+ private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
+ private[this] var posStack = new Array[Int](6)
+
+ private[this] var arrayD = elems
+ private[this] var posD = 0
+
+ private[this] var subIter: Iterator[A] = null // to traverse collision nodes
+
+ def hasNext = (subIter ne null) || depth >= 0
+
+ def next: A = {
+ if (subIter ne null) {
+ val el = subIter.next
+ if (!subIter.hasNext)
+ subIter = null
+ el
+ } else
+ next0(arrayD, posD)
+ }
+
+ @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = {
+ if (i == elems.length-1) { // reached end of level, pop stack
+ depth -= 1
+ if (depth >= 0) {
+ arrayD = arrayStack(depth)
+ posD = posStack(depth)
+ arrayStack(depth) = null
+ } else {
+ arrayD = null
+ posD = 0
+ }
+ } else
+ posD += 1
+
+ elems(i) match {
+ case m: HashTrieSet[A] => // push current pos onto stack and descend
+ if (depth >= 0) {
+ arrayStack(depth) = arrayD
+ posStack(depth) = posD
+ }
+ depth += 1
+ arrayD = m.elems
+ posD = 0
+ next0(m.elems, 0)
+ case m: HashSet1[A] => m.key
+ case m =>
+ subIter = m.iterator
+ subIter.next
+ }
+ }
+
+ // assumption: contains 2 or more elements
+ // splits this iterator into 2 iterators
+ // returns the 1st iterator, its number of elements, and the second iterator
+ def split: ((Iterator[A], Int), Iterator[A]) = {
+ // 0) simple case: no elements have been iterated - simply divide arrayD
+ if (arrayD != null && depth == 0 && posD == 0) {
+ val (fst, snd) = arrayD.splitAt(arrayD.length / 2)
+ val szfst = fst.foldLeft(0)(_ + _.size)
+ return ((new TrieIterator(fst), szfst), new TrieIterator(snd))
+ }
+
+ // otherwise, some elements have been iterated over
+ // 1) collision case: if we have a subIter, we return subIter and elements after it
+ if (subIter ne null) {
+ val buff = subIter.toBuffer
+ subIter = null
+ ((buff.iterator, buff.length), this)
+ } else {
+ // otherwise find the topmost array stack element
+ if (depth > 0) {
+ // 2) topmost comes before (is not) arrayD
+ // steal a portion of top to create a new iterator
+ val topmost = arrayStack(0)
+ if (posStack(0) == arrayStack(0).length - 1) {
+ // 2a) only a single entry left on top
+ // this means we have to modify this iterator - pop topmost
+ val snd = Array(arrayStack(0).last)
+ val szsnd = snd(0).size
+ // modify this - pop
+ depth -= 1
+ arrayStack = arrayStack.tail ++ Array[Array[HashSet[A]]](null)
+ posStack = posStack.tail ++ Array[Int](0)
+ // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty
+ ((new TrieIterator[A](snd), szsnd), this)
+ } else {
+ // 2b) more than a single entry left on top
+ val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2)
+ arrayStack(0) = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A](snd), szsnd), this)
+ }
+ } else {
+ // 3) no topmost element (arrayD is at the top)
+ // steal a portion of it and update this iterator
+ if (posD == arrayD.length - 1) {
+ // 3a) positioned at the last element of arrayD
+ val arr: Array[HashSet[A]] = arrayD(posD) match {
+ case c: HashSetCollision1[_] => c.asInstanceOf[HashSetCollision1[A]].ks.toList map { HashSet() + _ } toArray
+ case ht: HashTrieSet[_] => ht.asInstanceOf[HashTrieSet[A]].elems
+ case _ => error("cannot divide single element")
+ }
+ val (fst, snd) = arr.splitAt(arr.length / 2)
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator(snd), szsnd), new TrieIterator(fst))
+ } else {
+ // 3b) arrayD has more free elements
+ val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2)
+ arrayD = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A](snd), szsnd), this)
+ }
+ }
+ }
+ }
+ }
+
@serializable @SerialVersionUID(2L) private class SerializationProxy[A,B](@transient private var orig: HashSet[A]) {
private def writeObject(out: java.io.ObjectOutputStream) {
val s = orig.size
diff --git a/src/library/scala/collection/mutable/ConcurrentMap.scala b/src/library/scala/collection/mutable/ConcurrentMap.scala
index f9505f694b..a88679a8c9 100644
--- a/src/library/scala/collection/mutable/ConcurrentMap.scala
+++ b/src/library/scala/collection/mutable/ConcurrentMap.scala
@@ -20,12 +20,13 @@ package mutable
* Note: The concurrent maps do not accept `null` for keys or values.
*
* @define atomicop
- * This is done atomically.
+ * This is an atomic operation.
*/
trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Associates the given key with a given value, unless the key was already associated with some other value.
+ *
* $atomicop
*
* @param k key with which the specified value is to be associated with
@@ -37,6 +38,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Removes the entry for the specified key if its currently mapped to the specified value.
+ *
* $atomicop
*
* @param k key for which the entry should be removed
@@ -47,6 +49,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Replaces the entry for the given key only if it was previously mapped to a given value.
+ *
* $atomicop
*
* @param k key for which the entry should be replaced
@@ -58,6 +61,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Replaces the entry for the given key only if it was previously mapped to some value.
+ *
* $atomicop
*
* @param k key for which the entry should be replaced
diff --git a/src/library/scala/collection/parallel/ParIterable.scala b/src/library/scala/collection/parallel/ParIterable.scala
index 7dd9b3038a..7f5b39d3e4 100644
--- a/src/library/scala/collection/parallel/ParIterable.scala
+++ b/src/library/scala/collection/parallel/ParIterable.scala
@@ -26,8 +26,7 @@ trait ParIterable[+T] extends Iterable[T]
/** $factoryinfo
*/
object ParIterable extends ParFactory[ParIterable] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
- new GenericCanCombineFrom[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParIterable[T]] = ParArrayCombiner[T]
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 907a3d84da..bdfb92a405 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -427,7 +427,10 @@ self =>
executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
}
- protected[this] def cbfactory = () => newCombiner
+ protected[this] def cbfactory ={
+ println(newCombiner + ", " + newCombiner.getClass)
+ () => newCombiner
+ }
override def filter(pred: T => Boolean): Repr = {
executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result })
@@ -655,6 +658,7 @@ self =>
protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp]
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
def split = pit.split.map(newSubtask(_)) // default split procedure
+ private[parallel] override def signalAbort = pit.abort
override def toString = "Accessor(" + pit.toString + ")"
}
@@ -672,6 +676,10 @@ self =>
val st: Second
def combineResults(fr: FR, sr: SR): R
var result: R = null.asInstanceOf[R]
+ private[parallel] override def signalAbort {
+ ft.signalAbort
+ st.signalAbort
+ }
}
/** Sequentially performs one task after another. */
@@ -703,6 +711,9 @@ self =>
inner.compute
result = map(inner.result)
}
+ private[parallel] override def signalAbort {
+ inner.signalAbort
+ }
}
protected trait Transformer[R, Tp] extends Accessor[R, Tp]
@@ -1187,6 +1198,7 @@ self =>
new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest)
)
def shouldSplitFurther = (st.left ne null) && (st.right ne null)
+
}
protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That])
diff --git a/src/library/scala/collection/parallel/ParMap.scala b/src/library/scala/collection/parallel/ParMap.scala
index bf6d9ef644..b33b27c42b 100644
--- a/src/library/scala/collection/parallel/ParMap.scala
+++ b/src/library/scala/collection/parallel/ParMap.scala
@@ -26,7 +26,7 @@ self =>
def mapCompanion: GenericParMapCompanion[ParMap] = ParMap
- override def empty: ParMap[K, V] = new immutable.ParHashTrie[K, V]
+ override def empty: ParMap[K, V] = new immutable.ParHashMap[K, V]
override def stringPrefix = "ParMap"
}
@@ -34,9 +34,9 @@ self =>
object ParMap extends ParMapFactory[ParMap] {
- def empty[K, V]: ParMap[K, V] = new immutable.ParHashTrie[K, V]
+ def empty[K, V]: ParMap[K, V] = new immutable.ParHashMap[K, V]
- def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashTrieCombiner[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashMapCombiner[K, V]
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V]
diff --git a/src/library/scala/collection/parallel/ParSet.scala b/src/library/scala/collection/parallel/ParSet.scala
new file mode 100644
index 0000000000..cd59bc0a78
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParSet.scala
@@ -0,0 +1,83 @@
+package scala.collection.parallel
+
+
+
+
+
+
+
+import scala.collection.Map
+import scala.collection.mutable.Builder
+import scala.collection.generic._
+
+
+
+
+
+
+trait ParSet[T]
+extends Set[T]
+ with GenericParTemplate[T, ParSet]
+ with ParIterable[T]
+ with ParSetLike[T, ParSet[T], Set[T]]
+{
+self =>
+ override def empty: ParSet[T] = immutable.ParHashSet[T]()
+
+ override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet
+
+ override def stringPrefix = "ParSet"
+}
+
+
+
+object ParSet extends ParSetFactory[ParSet] {
+ def newCombiner[T]: Combiner[T, ParSet[T]] = immutable.HashSetCombiner[T]
+
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParSetLike.scala b/src/library/scala/collection/parallel/ParSetLike.scala
new file mode 100644
index 0000000000..444117368c
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParSetLike.scala
@@ -0,0 +1,81 @@
+package scala.collection.parallel
+
+
+
+import scala.collection.SetLike
+import scala.collection.Set
+import scala.collection.mutable.Builder
+
+
+
+
+
+
+
+
+trait ParSetLike[T,
+ +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T],
+ +Sequential <: Set[T] with SetLike[T, Sequential]]
+extends SetLike[T, Repr]
+ with ParIterableLike[T, Repr, Sequential]
+{ self =>
+
+ protected[this] override def newBuilder: Builder[T, Repr] = newCombiner
+
+ protected[this] override def newCombiner: Combiner[T, Repr]
+
+ override def empty: Repr
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index ae19ed8744..686d08a301 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -366,8 +366,9 @@ self =>
def next = { remaining -= 1; self.next }
def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
- val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield
- if (total < remaining) it else taker(it, total - remaining)
+ val sizes = sq.scanLeft(0)(_ + _.remaining)
+ val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield
+ if (until < remaining) it else taker(it, remaining - from)
shortened filter { _.remaining > 0 }
}
}
diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala
index b3cad6d67a..c890fdf974 100644
--- a/src/library/scala/collection/parallel/Splitter.scala
+++ b/src/library/scala/collection/parallel/Splitter.scala
@@ -34,6 +34,15 @@ trait Splitter[+T] extends Iterator[T] {
}
+object Splitter {
+ def empty[T]: Splitter[T] = new Splitter[T] {
+ def hasNext = false
+ def next = Iterator.empty.next
+ def split = Seq(this)
+ }
+}
+
+
/** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters
* that traverse disjoint subsets of arbitrary sizes.
*
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index dabb4e813f..3facaea7c3 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -4,9 +4,7 @@ package scala.collection.parallel
import scala.concurrent.forkjoin._
-
-
-
+import scala.util.control.Breaks._
@@ -59,6 +57,31 @@ trait Tasks {
protected[this] def split: Seq[Task[R, Tp]]
/** Read of results of `that` task and merge them into results of this one. */
protected[this] def merge(that: Tp) {}
+
+ // exception handling mechanism
+ var exception: Exception = null
+ def forwardException = if (exception != null) throw exception
+ // tries to do the leaf computation, storing the possible exception
+ protected def tryLeaf(result: Option[R]) {
+ try {
+ tryBreakable {
+ leaf(result)
+ } catchBreak {
+ signalAbort
+ }
+ } catch {
+ case e: Exception =>
+ exception = e
+ signalAbort
+ }
+ }
+ protected[this] def tryMerge(t: Tp) {
+ val that = t.asInstanceOf[Task[R, Tp]]
+ if (this.exception == null && that.exception == null) merge(that.repr)
+ else if (that.exception != null) this.exception = that.exception
+ }
+ // override in concrete task implementations to signal abort to other tasks
+ private[parallel] def signalAbort {}
}
type TaskType[R, +Tp] <: Task[R, Tp]
@@ -66,13 +89,13 @@ trait Tasks {
var environment: ExecutionEnvironment
- /** Executes a task and returns a future. */
+ /** Executes a task and returns a future. Forwards an exception if some task threw it. */
def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R
- /** Executes a task and waits for it to finish. */
+ /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */
def executeAndWait[R, Tp](task: TaskType[R, Tp])
- /** Executes a result task, waits for it to finish, then returns its result. */
+ /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */
def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
/** Retrieves the parallelism level of the task execution environment. */
@@ -96,19 +119,19 @@ trait AdaptiveWorkStealingTasks extends Tasks {
/** The actual leaf computation. */
def leaf(result: Option[R]): Unit
- def compute = if (shouldSplitFurther) internal else leaf(None)
+ def compute = if (shouldSplitFurther) internal else tryLeaf(None)
def internal = {
var last = spawnSubtasks
- last.leaf(None)
+ last.tryLeaf(None)
result = last.result
while (last.next != null) {
val lastresult = Option(last.result)
last = last.next
- if (last.tryCancel) last.leaf(lastresult) else last.sync
- merge(last.repr)
+ if (last.tryCancel) last.tryLeaf(lastresult) else last.sync
+ tryMerge(last.repr)
}
}
@@ -150,7 +173,6 @@ trait HavingForkJoinPool {
}
-
/** An implementation trait for parallel tasks based on the fork/join framework.
*
* @define fjdispatch
@@ -187,6 +209,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
() => {
fjtask.join
+ fjtask.forwardException
fjtask.result
}
}
@@ -202,6 +225,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
+ fjtask.forwardException
}
/** Executes a task on a fork/join pool and waits for it to finish.
@@ -218,6 +242,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
+ fjtask.forwardException
fjtask.result
}
@@ -225,8 +250,9 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
+
object ForkJoinTasks {
- val defaultForkJoinPool = new ForkJoinPool
+ val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool
defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index cb835c7fcd..c72eb66207 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -9,6 +9,7 @@ package scala.collection.parallel.immutable
import scala.collection.parallel.ParMap
import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
@@ -25,26 +26,26 @@ import scala.collection.immutable.HashMap
*
* @author prokopec
*/
-class ParHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
+class ParHashMap[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
extends ParMap[K, V]
- with GenericParMapTemplate[K, V, ParHashTrie]
- with ParMapLike[K, V, ParHashTrie[K, V], HashMap[K, V]]
+ with GenericParMapTemplate[K, V, ParHashMap]
+ with ParMapLike[K, V, ParHashMap[K, V], HashMap[K, V]]
{
self =>
def this() = this(HashMap.empty[K, V])
- override def mapCompanion: GenericParMapCompanion[ParHashTrie] = ParHashTrie
+ override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap
- override def empty: ParHashTrie[K, V] = new ParHashTrie[K, V]
+ override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
- def parallelIterator = new ParHashTrieIterator(trie) with SCPI
+ def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI
def seq = trie
- def -(k: K) = new ParHashTrie(trie - k)
+ def -(k: K) = new ParHashMap(trie - k)
- def +[U >: V](kv: (K, U)) = new ParHashTrie(trie + kv)
+ def +[U >: V](kv: (K, U)) = new ParHashMap(trie + kv)
def get(k: K) = trie.get(k)
@@ -55,59 +56,66 @@ self =>
case None => newc
}
- type SCPI = SignalContextPassingIterator[ParHashTrieIterator]
+ type SCPI = SignalContextPassingIterator[ParHashMapIterator]
- class ParHashTrieIterator(val ht: HashMap[K, V])
+ class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int)
extends super.ParIterator {
- self: SignalContextPassingIterator[ParHashTrieIterator] =>
- // println("created iterator " + ht)
+ self: SignalContextPassingIterator[ParHashMapIterator] =>
var i = 0
- lazy val triter = ht.iterator
- def split: Seq[ParIterator] = {
- // println("splitting " + ht + " into " + ht.split.map(new ParHashTrieIterator(_) with SCPI).map(_.toList))
- ht.split.map(new ParHashTrieIterator(_) with SCPI)
+ def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
+ case t: HashMap.TrieIterator[_, _] =>
+ val previousRemaining = remaining
+ val ((fst, fstlength), snd) = t.asInstanceOf[HashMap.TrieIterator[K, V]].split
+ val sndlength = previousRemaining - fstlength
+ Seq(
+ new ParHashMapIterator(fst, fstlength) with SCPI,
+ new ParHashMapIterator(snd, sndlength) with SCPI
+ )
+ case _ =>
+ // iterator of the collision map case
+ val buff = triter.toBuffer
+ val (fp, sp) = buff.splitAt(buff.length / 2)
+ Seq(fp, sp) map { b => new ParHashMapIterator(b.iterator, b.length) with SCPI }
}
def next: (K, V) = {
- // println("taking next after " + i + ", in " + ht)
i += 1
triter.next
}
def hasNext: Boolean = {
- // println("hasNext: " + i + ", " + ht.size + ", " + ht)
- i < ht.size
+ i < sz
}
- def remaining = ht.size - i
+ def remaining = sz - i
}
}
-object ParHashTrie extends ParMapFactory[ParHashTrie] {
- def empty[K, V]: ParHashTrie[K, V] = new ParHashTrie[K, V]
+object ParHashMap extends ParMapFactory[ParHashMap] {
+ def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
- def newCombiner[K, V]: Combiner[(K, V), ParHashTrie[K, V]] = HashTrieCombiner[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = HashMapCombiner[K, V]
- implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashTrie[K, V]] = {
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = {
new CanCombineFromMap[K, V]
}
- def fromTrie[K, V](t: HashMap[K, V]) = new ParHashTrie(t)
+ def fromTrie[K, V](t: HashMap[K, V]) = new ParHashMap(t)
var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
}
-trait HashTrieCombiner[K, V]
-extends Combiner[(K, V), ParHashTrie[K, V]] {
-self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
- import HashTrieCombiner._
- var heads = new Array[Unrolled[K, V]](rootsize)
- var lasts = new Array[Unrolled[K, V]](rootsize)
+private[immutable] trait HashMapCombiner[K, V]
+extends Combiner[(K, V), ParHashMap[K, V]] {
+self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
+ import HashMapCombiner._
+ var heads = new Array[Unrolled[(K, V)]](rootsize)
+ var lasts = new Array[Unrolled[(K, V)]](rootsize)
var size: Int = 0
def clear = {
- heads = new Array[Unrolled[K, V]](rootsize)
- lasts = new Array[Unrolled[K, V]](rootsize)
+ heads = new Array[Unrolled[(K, V)]](rootsize)
+ lasts = new Array[Unrolled[(K, V)]](rootsize)
}
def +=(elem: (K, V)) = {
@@ -116,7 +124,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
val pos = hc & 0x1f
if (lasts(pos) eq null) {
// initialize bucket
- heads(pos) = new Unrolled[K, V]
+ heads(pos) = new Unrolled[(K, V)]
lasts(pos) = heads(pos)
}
// add to bucket
@@ -124,10 +132,10 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
this
}
- def combine[N <: (K, V), NewTo >: ParHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- // ParHashTrie.totalcombines.incrementAndGet
- if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
- val that = other.asInstanceOf[HashTrieCombiner[K, V]]
+ def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ // ParHashMap.totalcombines.incrementAndGet
+ if (other.isInstanceOf[HashMapCombiner[_, _]]) {
+ val that = other.asInstanceOf[HashMapCombiner[K, V]]
var i = 0
while (i < rootsize) {
if (lasts(i) eq null) {
@@ -158,17 +166,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
}
val sz = root.foldLeft(0)(_ + _.size)
- if (sz == 0) new ParHashTrie[K, V]
- else if (sz == 1) new ParHashTrie[K, V](root(0))
+ if (sz == 0) new ParHashMap[K, V]
+ else if (sz == 1) new ParHashMap[K, V](root(0))
else {
val trie = new HashMap.HashTrieMap(bitmap, root, sz)
- new ParHashTrie[K, V](trie)
+ new ParHashMap[K, V](trie)
}
}
/* tasks */
- class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
@@ -178,7 +186,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
i += 1
}
}
- private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = {
+ private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = {
var trie = new HashMap[K, V]
var unrolled = elems
@@ -208,28 +216,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
}
-object HashTrieCombiner {
- def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] {}
+object HashMapCombiner {
+ def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {}
private[immutable] val rootbits = 5
private[immutable] val rootsize = 1 << 5
- private[immutable] val unrolledsize = 16
-
- private[immutable] class Unrolled[K, V] {
- var size = 0
- var array = new Array[(K, V)](unrolledsize)
- var next: Unrolled[K, V] = null
- // adds and returns itself or the new unrolled if full
- def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) {
- array(size) = elem
- size += 1
- this
- } else {
- next = new Unrolled[K, V]
- next.add(elem)
- }
- override def toString = "Unrolled(" + array.mkString(", ") + ")"
- }
}
@@ -246,3 +237,5 @@ object HashTrieCombiner {
+
+
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
new file mode 100644
index 0000000000..c9bda86d67
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -0,0 +1,279 @@
+package scala.collection.parallel.immutable
+
+
+
+
+
+
+
+import scala.collection.parallel.ParSet
+import scala.collection.parallel.ParSetLike
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParIterableIterator
+import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.generic.ParSetFactory
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.GenericParTemplate
+import scala.collection.generic.GenericParCompanion
+import scala.collection.generic.GenericCompanion
+import scala.collection.immutable.HashSet
+
+
+
+
+
+
+/** Parallel hash trie set.
+ *
+ * @author prokopec
+ */
+class ParHashSet[T] private[immutable] (private[this] val trie: HashSet[T])
+extends ParSet[T]
+ with GenericParTemplate[T, ParHashSet]
+ with ParSetLike[T, ParHashSet[T], HashSet[T]]
+{
+self =>
+
+ def this() = this(HashSet.empty[T])
+
+ override def companion: GenericCompanion[ParHashSet] with GenericParCompanion[ParHashSet] = ParHashSet
+
+ override def empty: ParHashSet[T] = new ParHashSet[T]
+
+ def parallelIterator: ParIterableIterator[T] = new ParHashSetIterator(trie.iterator, trie.size) with SCPI
+
+ def seq = trie
+
+ def -(e: T) = new ParHashSet(trie - e)
+
+ def +(e: T) = new ParHashSet(trie + e)
+
+ def contains(e: T): Boolean = trie.contains(e)
+
+ override def size = trie.size
+
+ protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match {
+ case Some(old) => old
+ case None => newc
+ }
+
+ type SCPI = SignalContextPassingIterator[ParHashSetIterator]
+
+ class ParHashSetIterator(val triter: Iterator[T], val sz: Int)
+ extends super.ParIterator {
+ self: SignalContextPassingIterator[ParHashSetIterator] =>
+ var i = 0
+ def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
+ case t: HashSet.TrieIterator[_] =>
+ val previousRemaining = remaining
+ val ((fst, fstlength), snd) = t.asInstanceOf[HashSet.TrieIterator[T]].split
+ val sndlength = previousRemaining - fstlength
+ Seq(
+ new ParHashSetIterator(fst, fstlength) with SCPI,
+ new ParHashSetIterator(snd, sndlength) with SCPI
+ )
+ case _ =>
+ // iterator of the collision map case
+ val buff = triter.toBuffer
+ val (fp, sp) = buff.splitAt(buff.length / 2)
+ Seq(fp, sp) map { b => new ParHashSetIterator(b.iterator, b.length) with SCPI }
+ }
+ def next: T = {
+ i += 1
+ triter.next
+ }
+ def hasNext: Boolean = {
+ i < sz
+ }
+ def remaining = sz - i
+ }
+
+}
+
+
+object ParHashSet extends ParSetFactory[ParHashSet] {
+ def newCombiner[T]: Combiner[T, ParHashSet[T]] = HashSetCombiner[T]
+
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] =
+ new GenericCanCombineFrom[T]
+
+ def fromTrie[T](t: HashSet[T]) = new ParHashSet(t)
+}
+
+
+private[immutable] trait HashSetCombiner[T]
+extends Combiner[T, ParHashSet[T]] {
+self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
+ import HashSetCombiner._
+ var heads = new Array[Unrolled[Any]](rootsize)
+ var lasts = new Array[Unrolled[Any]](rootsize)
+ var size: Int = 0
+
+ def clear = {
+ heads = new Array[Unrolled[Any]](rootsize)
+ lasts = new Array[Unrolled[Any]](rootsize)
+ }
+
+ def +=(elem: T) = {
+ size += 1
+ val hc = elem.##
+ val pos = hc & 0x1f
+ if (lasts(pos) eq null) {
+ // initialize bucket
+ heads(pos) = new Unrolled[Any]
+ lasts(pos) = heads(pos)
+ }
+ // add to bucket
+ lasts(pos) = lasts(pos).add(elem)
+ this
+ }
+
+ def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ if (other.isInstanceOf[HashSetCombiner[_]]) {
+ val that = other.asInstanceOf[HashSetCombiner[T]]
+ var i = 0
+ while (i < rootsize) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ size = size + that.size
+ this
+ } else error("Unexpected combiner type.")
+ } else this
+
+ def result = {
+ val buckets = heads.filter(_ != null)
+ val root = new Array[HashSet[T]](buckets.length)
+
+ executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
+
+ var bitmap = 0
+ var i = 0
+ while (i < rootsize) {
+ if (heads(i) ne null) bitmap |= 1 << i
+ i += 1
+ }
+ val sz = root.foldLeft(0)(_ + _.size)
+
+ if (sz == 0) new ParHashSet[T]
+ else if (sz == 1) new ParHashSet[T](root(0))
+ else {
+ val trie = new HashSet.HashTrieSet(bitmap, root, sz)
+ new ParHashSet[T](trie)
+ }
+ }
+
+ /* tasks */
+
+ class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ root(i) = createTrie(buckets(i))
+ i += 1
+ }
+ }
+ private def createTrie(elems: Unrolled[Any]): HashSet[T] = {
+ var trie = new HashSet[T]
+
+ var unrolled = elems
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val v = chunkarr(i).asInstanceOf[T]
+ val hc = v.##
+ trie = trie.updated0(v, hc, rootbits)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+
+ trie
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ }
+
+}
+
+
+object HashSetCombiner {
+ def apply[T] = new HashSetCombiner[T] with EnvironmentPassingCombiner[T, ParHashSet[T]] {}
+
+ private[immutable] val rootbits = 5
+ private[immutable] val rootsize = 1 << 5
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index d529d223c9..c54875ecd3 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -12,8 +12,29 @@ package scala.collection.parallel
package object immutable {
+ /* package level methods */
def repetition[T](elem: T, len: Int) = new Repetition(elem, len)
+ /* properties */
+ private[immutable] val unrolledsize = 16
+
+ /* classes */
+ private[immutable] class Unrolled[T: ClassManifest] {
+ var size = 0
+ var array = new Array[T](unrolledsize)
+ var next: Unrolled[T] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[T]
+ next.add(elem)
+ }
+ override def toString = "Unrolled(" + array.mkString(", ") + ")"
+ }
+
/** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
*
* @tparam T type of the elements
diff --git a/src/library/scala/util/control/Breaks.scala b/src/library/scala/util/control/Breaks.scala
index dedf721cd4..0b83ac6d3b 100644
--- a/src/library/scala/util/control/Breaks.scala
+++ b/src/library/scala/util/control/Breaks.scala
@@ -39,6 +39,16 @@ class Breaks {
}
}
+ def tryBreakable(op: => Unit) = new {
+ def catchBreak(onBreak: => Unit) = try {
+ op
+ } catch {
+ case ex: BreakControl =>
+ if (ex ne breakException) throw ex
+ onBreak
+ }
+ }
+
/* Break from dynamically closest enclosing breakable block
* @note this might be different than the statically closest enclosing
* block!
diff --git a/test/files/scalacheck/HashTrieSplit.scala b/test/files/scalacheck/HashTrieSplit.scala
new file mode 100644
index 0000000000..6b20efe12b
--- /dev/null
+++ b/test/files/scalacheck/HashTrieSplit.scala
@@ -0,0 +1,47 @@
+
+
+
+
+
+import collection._
+
+
+
+
+// checks whether hash tries split their iterators correctly
+// even after some elements have been traversed
+object Test {
+ def main(args: Array[String]) {
+ doesSplitOk
+ }
+
+ def doesSplitOk = {
+ val sz = 2000
+ var ht = new parallel.immutable.ParHashMap[Int, Int]
+ // println("creating trie")
+ for (i <- 0 until sz) ht += ((i + sz, i))
+ // println("created trie")
+ for (n <- 0 until (sz - 1)) {
+ // println("---------> n = " + n)
+ val pit = ht.parallelIterator
+ val pit2 = ht.parallelIterator
+ var i = 0
+ while (i < n) {
+ pit.next
+ pit2.next
+ i += 1
+ }
+ // println("splitting")
+ val pits = pit.split
+ val fst = pits(0).toSet
+ val snd = pits(1).toSet
+ val orig = pit2.toSet
+ if (orig.size != (fst.size + snd.size) || orig != (fst ++ snd)) {
+ println("Original: " + orig)
+ println("First: " + fst)
+ println("Second: " + snd)
+ assert(false)
+ }
+ }
+ }
+}