summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-10-05 16:22:21 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-10-05 16:22:21 +0000
commit23c6d4f98541820ffd4085aa3a57dffc88de4786 (patch)
treeb3c28909dc8c9e976f049f4c045b6e8e832bc0e3
parent7553e6901d52ace00bfcb670336c480766c8301c (diff)
downloadscala-23c6d4f98541820ffd4085aa3a57dffc88de4786.tar.gz
scala-23c6d4f98541820ffd4085aa3a57dffc88de4786.tar.bz2
scala-23c6d4f98541820ffd4085aa3a57dffc88de4786.zip
Adding immutable parallel hashsets.
Fixing an issue with hashset splitters where the splitting does not work if some elements have already been iterated. Added parallel collections exception handling. Added parallel collections break control. Renaming ParHashTrie -> ParHashMap. The part with immutable.{HashSet, HashMap} - review by rompf
-rw-r--r--build.xml6
-rw-r--r--src/library/scala/collection/generic/ParSetFactory.scala33
-rw-r--r--src/library/scala/collection/generic/Signalling.scala9
-rw-r--r--src/library/scala/collection/immutable/HashMap.scala198
-rw-r--r--src/library/scala/collection/immutable/HashSet.scala188
-rw-r--r--src/library/scala/collection/mutable/ConcurrentMap.scala6
-rw-r--r--src/library/scala/collection/parallel/ParIterable.scala3
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala14
-rw-r--r--src/library/scala/collection/parallel/ParMap.scala6
-rw-r--r--src/library/scala/collection/parallel/ParSet.scala83
-rw-r--r--src/library/scala/collection/parallel/ParSetLike.scala81
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala5
-rw-r--r--src/library/scala/collection/parallel/Splitter.scala9
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala50
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala (renamed from src/library/scala/collection/parallel/immutable/ParHashTrie.scala)117
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala279
-rw-r--r--src/library/scala/collection/parallel/immutable/package.scala21
-rw-r--r--src/library/scala/util/control/Breaks.scala10
-rw-r--r--test/files/scalacheck/HashTrieSplit.scala47
19 files changed, 940 insertions, 225 deletions
diff --git a/build.xml b/build.xml
index 4086a9eadf..e430e9693a 100644
--- a/build.xml
+++ b/build.xml
@@ -1563,12 +1563,12 @@ BOOTRAPING TEST AND TEST SUITE
<include name="run/**/*.scala"/>
</runtests>
<jvmtests dir="${partest.dir}/${partest.srcdir}/jvm" includes="*.scala"/>
- <scalachecktests dir="${partest.dir}/${partest.srcdir}/scalacheck">
- <include name="*.scala"/>
- </scalachecktests>
<residenttests dir="${partest.dir}/${partest.srcdir}/res" includes="*.res"/>
<buildmanagertests dir="${partest.dir}/${partest.srcdir}/buildmanager" includes="*"/>
<scalaptests dir="${partest.dir}/${partest.srcdir}/scalap" includes="**/*.scala"/>
+ <scalachecktests dir="${partest.dir}/${partest.srcdir}/scalacheck">
+ <include name="*.scala"/>
+ </scalachecktests>
<!-- <scripttests dir="${partest.dir}/${partest.srcdir}/script" includes="*.scala"/> -->
</partest>
</target>
diff --git a/src/library/scala/collection/generic/ParSetFactory.scala b/src/library/scala/collection/generic/ParSetFactory.scala
new file mode 100644
index 0000000000..7c43b29bf4
--- /dev/null
+++ b/src/library/scala/collection/generic/ParSetFactory.scala
@@ -0,0 +1,33 @@
+package scala.collection.generic
+
+
+
+
+
+import collection.mutable.Builder
+import collection.parallel.Combiner
+import collection.parallel.ParSet
+import collection.parallel.ParSetLike
+
+
+
+
+
+
+abstract class ParSetFactory[CC[X] <: ParSet[X] with ParSetLike[X, CC[X], _] with GenericParTemplate[X, CC]]
+ extends SetFactory[CC]
+ with GenericParCompanion[CC]
+{
+ def newBuilder[A]: Combiner[A, CC[A]] = newCombiner[A]
+
+ def newCombiner[A]: Combiner[A, CC[A]]
+
+ class GenericCanCombineFrom[A] extends CanCombineFrom[CC[_], A, CC[A]] {
+ override def apply(from: Coll) = from.genericCombiner[A]
+ override def apply() = newCombiner[A]
+ }
+}
+
+
+
+
diff --git a/src/library/scala/collection/generic/Signalling.scala b/src/library/scala/collection/generic/Signalling.scala
index 1dac4297b7..eaf945a4b7 100644
--- a/src/library/scala/collection/generic/Signalling.scala
+++ b/src/library/scala/collection/generic/Signalling.scala
@@ -91,10 +91,7 @@ trait Signalling {
/**
* This signalling implementation returns default values and ignores received signals.
*/
-class DefaultSignalling extends Signalling {
- def isAborted = false
- def abort {}
-
+class DefaultSignalling extends Signalling with VolatileAbort {
def indexFlag = -1
def setIndexFlag(f: Int) {}
def setIndexFlagIfGreater(f: Int) {}
@@ -115,8 +112,8 @@ object IdleSignalling extends DefaultSignalling
*/
trait VolatileAbort extends Signalling {
@volatile private var abortflag = false
- abstract override def isAborted = abortflag
- abstract override def abort = abortflag = true
+ override def isAborted = abortflag
+ override def abort = abortflag = true
}
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index 1084e04101..d167e6d6a7 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -15,7 +15,7 @@ import generic._
import annotation.unchecked.uncheckedVariance
-import parallel.immutable.ParHashTrie
+import parallel.immutable.ParHashMap
/** This class implements immutable maps using a hash trie.
@@ -36,7 +36,7 @@ import parallel.immutable.ParHashTrie
* @define willNotTerminateInf
*/
@serializable @SerialVersionUID(2L)
-class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashTrie[A, B]] {
+class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Parallelizable[ParHashMap[A, B]] {
override def size: Int = 0
@@ -90,7 +90,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected def merge0[B1 >: B](that: HashMap[A, B1], level: Int, merger: Merger[B1]): HashMap[A, B1] = that
- def par = ParHashTrie.fromTrie(this)
+ def par = ParHashMap.fromTrie(this)
}
@@ -307,78 +307,7 @@ object HashMap extends ImmutableMapFactory[HashMap] {
}
}
-/*
- override def iterator = { // TODO: optimize (use a stack to keep track of pos)
-
- def iter(m: HashTrieMap[A,B], k: => Stream[(A,B)]): Stream[(A,B)] = {
- def horiz(elems: Array[HashMap[A,B]], i: Int, k: => Stream[(A,B)]): Stream[(A,B)] = {
- if (i < elems.length) {
- elems(i) match {
- case m: HashTrieMap[A,B] => iter(m, horiz(elems, i+1, k))
- case m: HashMap1[A,B] => new Stream.Cons(m.ensurePair, horiz(elems, i+1, k))
- }
- } else k
- }
- horiz(m.elems, 0, k)
- }
- iter(this, Stream.empty).iterator
- }
-*/
-
-
- override def iterator = new Iterator[(A,B)] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[(A,B)] = null // to traverse collision nodes
-
- def hasNext = (subIter ne null) || depth >= 0
-
- def next: (A,B) = {
- if (subIter ne null) {
- val el = subIter.next
- if (!subIter.hasNext)
- subIter = null
- el
- } else
- next0(arrayD, posD)
- }
-
- @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = {
- if (i == elems.length-1) { // reached end of level, pop stack
- depth -= 1
- if (depth >= 0) {
- arrayD = arrayStack(depth)
- posD = posStack(depth)
- arrayStack(depth) = null
- } else {
- arrayD = null
- posD = 0
- }
- } else
- posD += 1
-
- elems(i) match {
- case m: HashTrieMap[A,B] => // push current pos onto stack and descend
- if (depth >= 0) {
- arrayStack(depth) = arrayD
- posStack(depth) = posD
- }
- depth += 1
- arrayD = m.elems
- posD = 0
- next0(m.elems, 0)
- case m: HashMap1[A,B] => m.ensurePair
- case m =>
- subIter = m.iterator
- subIter.next
- }
- }
- }
+ override def iterator = new TrieIterator[A, B](elems)
/*
@@ -534,6 +463,125 @@ time { mNew.iterator.foreach( p => ()) }
}
+ class TrieIterator[A, +B](elems: Array[HashMap[A, B]]) extends Iterator[(A, B)] {
+ private[this] var depth = 0
+ private[this] var arrayStack = new Array[Array[HashMap[A,B]]](6)
+ private[this] var posStack = new Array[Int](6)
+
+ private[this] var arrayD = elems
+ private[this] var posD = 0
+
+ private[this] var subIter: Iterator[(A, B)] = null // to traverse collision nodes
+
+ def hasNext = (subIter ne null) || depth >= 0
+
+ def next: (A,B) = {
+ if (subIter ne null) {
+ val el = subIter.next
+ if (!subIter.hasNext)
+ subIter = null
+ el
+ } else
+ next0(arrayD, posD)
+ }
+
+ @scala.annotation.tailrec private[this] def next0(elems: Array[HashMap[A,B]], i: Int): (A,B) = {
+ if (i == elems.length-1) { // reached end of level, pop stack
+ depth -= 1
+ if (depth >= 0) {
+ arrayD = arrayStack(depth)
+ posD = posStack(depth)
+ arrayStack(depth) = null
+ } else {
+ arrayD = null
+ posD = 0
+ }
+ } else
+ posD += 1
+
+ elems(i) match {
+ case m: HashTrieMap[A,B] => // push current pos onto stack and descend
+ if (depth >= 0) {
+ arrayStack(depth) = arrayD
+ posStack(depth) = posD
+ }
+ depth += 1
+ arrayD = m.elems
+ posD = 0
+ next0(m.elems, 0)
+ case m: HashMap1[A,B] => m.ensurePair
+ case m =>
+ subIter = m.iterator
+ subIter.next
+ }
+ }
+
+ // assumption: contains 2 or more elements
+ // splits this iterator into 2 iterators
+ // returns the 1st iterator, its number of elements, and the second iterator
+ def split: ((Iterator[(A, B)], Int), Iterator[(A, B)]) = {
+ // 0) simple case: no elements have been iterated - simply divide arrayD
+ if (arrayD != null && depth == 0 && posD == 0) {
+ val (fst, snd) = arrayD.splitAt(arrayD.length / 2)
+ val szfst = fst.foldLeft(0)(_ + _.size)
+ return ((new TrieIterator(fst), szfst), new TrieIterator(snd))
+ }
+
+ // otherwise, some elements have been iterated over
+ // 1) collision case: if we have a subIter, we return subIter and elements after it
+ if (subIter ne null) {
+ val buff = subIter.toBuffer
+ subIter = null
+ ((buff.iterator, buff.length), this)
+ } else {
+ // otherwise find the topmost array stack element
+ if (depth > 0) {
+ // 2) topmost comes before (is not) arrayD
+ // steal a portion of top to create a new iterator
+ val topmost = arrayStack(0)
+ if (posStack(0) == arrayStack(0).length - 1) {
+ // 2a) only a single entry left on top
+ // this means we have to modify this iterator - pop topmost
+ val snd = Array(arrayStack(0).last)
+ val szsnd = snd(0).size
+ // modify this - pop
+ depth -= 1
+ arrayStack = arrayStack.tail ++ Array[Array[HashMap[A, B]]](null)
+ posStack = posStack.tail ++ Array[Int](0)
+ // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty
+ ((new TrieIterator[A, B](snd), szsnd), this)
+ } else {
+ // 2b) more than a single entry left on top
+ val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2)
+ arrayStack(0) = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A, B](snd), szsnd), this)
+ }
+ } else {
+ // 3) no topmost element (arrayD is at the top)
+ // steal a portion of it and update this iterator
+ if (posD == arrayD.length - 1) {
+ // 3a) positioned at the last element of arrayD
+ val arr: Array[HashMap[A, B]] = arrayD(posD) match {
+ case c: HashMapCollision1[_, _] => c.asInstanceOf[HashMapCollision1[A, B]].kvs.toArray map { HashMap() + _ }
+ case ht: HashTrieMap[_, _] => ht.asInstanceOf[HashTrieMap[A, B]].elems
+ case _ => error("cannot divide single element")
+ }
+ val (fst, snd) = arr.splitAt(arr.length / 2)
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator(snd), szsnd), new TrieIterator(fst))
+ } else {
+ // 3b) arrayD has more free elements
+ val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2)
+ arrayD = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A, B](snd), szsnd), this)
+ }
+ }
+ }
+ }
+ }
+
private def check[K](x: HashMap[K, _], y: HashMap[K, _], xy: HashMap[K, _]) = { // TODO remove this debugging helper
var xs = Set[K]()
for (elem <- x) xs += elem._1
diff --git a/src/library/scala/collection/immutable/HashSet.scala b/src/library/scala/collection/immutable/HashSet.scala
index 08e64d6709..3f04d63b40 100644
--- a/src/library/scala/collection/immutable/HashSet.scala
+++ b/src/library/scala/collection/immutable/HashSet.scala
@@ -14,6 +14,8 @@ package immutable
import generic._
import annotation.unchecked.uncheckedVariance
+import collection.parallel.immutable.ParHashSet
+
/** This class implements immutable sets using a hash trie.
*
* '''Note:''' the builder of a hash set returns specialized representations `EmptySet`,`Set1`,..., `Set4`
@@ -31,7 +33,9 @@ import annotation.unchecked.uncheckedVariance
@serializable @SerialVersionUID(2L)
class HashSet[A] extends Set[A]
with GenericSetTemplate[A, HashSet]
- with SetLike[A, HashSet[A]] {
+ with SetLike[A, HashSet[A]]
+ with Parallelizable[ParHashSet[A]]
+{
override def companion: GenericCompanion[HashSet] = HashSet
//class HashSet[A] extends Set[A] with SetLike[A, HashSet[A]] {
@@ -55,6 +59,8 @@ class HashSet[A] extends Set[A]
def - (e: A): HashSet[A] =
removed0(e, computeHash(e), 0)
+ def par = ParHashSet.fromTrie(this)
+
protected def elemHashCode(key: A) = if (key == null) 0 else key.##
protected final def improve(hcode: Int) = {
@@ -68,7 +74,7 @@ class HashSet[A] extends Set[A]
protected def get0(key: A, hash: Int, level: Int): Boolean = false
- protected def updated0(key: A, hash: Int, level: Int): HashSet[A] =
+ def updated0(key: A, hash: Int, level: Int): HashSet[A] =
new HashSet.HashSet1(key, hash)
protected def removed0(key: A, hash: Int, level: Int): HashSet[A] = this
@@ -169,7 +175,7 @@ object HashSet extends ImmutableSetFactory[HashSet] {
}
- class HashTrieSet[A](private var bitmap: Int, private var elems: Array[HashSet[A]],
+ class HashTrieSet[A](private var bitmap: Int, private[HashSet] var elems: Array[HashSet[A]],
private var size0: Int) extends HashSet[A] {
override def size = size0
@@ -239,60 +245,7 @@ object HashSet extends ImmutableSetFactory[HashSet] {
}
}
-
- override def iterator = new Iterator[A] {
- private[this] var depth = 0
- private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
- private[this] var posStack = new Array[Int](6)
-
- private[this] var arrayD = elems
- private[this] var posD = 0
-
- private[this] var subIter: Iterator[A] = null // to traverse collision nodes
-
- def hasNext = (subIter ne null) || depth >= 0
-
- def next: A = {
- if (subIter ne null) {
- val el = subIter.next
- if (!subIter.hasNext)
- subIter = null
- el
- } else
- next0(arrayD, posD)
- }
-
- @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = {
- if (i == elems.length-1) { // reached end of level, pop stack
- depth -= 1
- if (depth >= 0) {
- arrayD = arrayStack(depth)
- posD = posStack(depth)
- arrayStack(depth) = null
- } else {
- arrayD = null
- posD = 0
- }
- } else
- posD += 1
-
- elems(i) match {
- case m: HashTrieSet[A] => // push current pos onto stack and descend
- if (depth >= 0) {
- arrayStack(depth) = arrayD
- posStack(depth) = posD
- }
- depth += 1
- arrayD = m.elems
- posD = 0
- next0(m.elems, 0)
- case m: HashSet1[A] => m.key
- case m =>
- subIter = m.iterator
- subIter.next
- }
- }
- }
+ override def iterator = new TrieIterator[A](elems)
/*
@@ -315,7 +268,6 @@ time { mNew.iterator.foreach( p => ()) }
*/
-
override def foreach[U](f: A => U): Unit = {
var i = 0;
while (i < elems.length) {
@@ -325,6 +277,126 @@ time { mNew.iterator.foreach( p => ()) }
}
}
+
+ class TrieIterator[A](elems: Array[HashSet[A]]) extends Iterator[A] {
+ private[this] var depth = 0
+ private[this] var arrayStack = new Array[Array[HashSet[A]]](6)
+ private[this] var posStack = new Array[Int](6)
+
+ private[this] var arrayD = elems
+ private[this] var posD = 0
+
+ private[this] var subIter: Iterator[A] = null // to traverse collision nodes
+
+ def hasNext = (subIter ne null) || depth >= 0
+
+ def next: A = {
+ if (subIter ne null) {
+ val el = subIter.next
+ if (!subIter.hasNext)
+ subIter = null
+ el
+ } else
+ next0(arrayD, posD)
+ }
+
+ @scala.annotation.tailrec private[this] def next0(elems: Array[HashSet[A]], i: Int): A = {
+ if (i == elems.length-1) { // reached end of level, pop stack
+ depth -= 1
+ if (depth >= 0) {
+ arrayD = arrayStack(depth)
+ posD = posStack(depth)
+ arrayStack(depth) = null
+ } else {
+ arrayD = null
+ posD = 0
+ }
+ } else
+ posD += 1
+
+ elems(i) match {
+ case m: HashTrieSet[A] => // push current pos onto stack and descend
+ if (depth >= 0) {
+ arrayStack(depth) = arrayD
+ posStack(depth) = posD
+ }
+ depth += 1
+ arrayD = m.elems
+ posD = 0
+ next0(m.elems, 0)
+ case m: HashSet1[A] => m.key
+ case m =>
+ subIter = m.iterator
+ subIter.next
+ }
+ }
+
+ // assumption: contains 2 or more elements
+ // splits this iterator into 2 iterators
+ // returns the 1st iterator, its number of elements, and the second iterator
+ def split: ((Iterator[A], Int), Iterator[A]) = {
+ // 0) simple case: no elements have been iterated - simply divide arrayD
+ if (arrayD != null && depth == 0 && posD == 0) {
+ val (fst, snd) = arrayD.splitAt(arrayD.length / 2)
+ val szfst = fst.foldLeft(0)(_ + _.size)
+ return ((new TrieIterator(fst), szfst), new TrieIterator(snd))
+ }
+
+ // otherwise, some elements have been iterated over
+ // 1) collision case: if we have a subIter, we return subIter and elements after it
+ if (subIter ne null) {
+ val buff = subIter.toBuffer
+ subIter = null
+ ((buff.iterator, buff.length), this)
+ } else {
+ // otherwise find the topmost array stack element
+ if (depth > 0) {
+ // 2) topmost comes before (is not) arrayD
+ // steal a portion of top to create a new iterator
+ val topmost = arrayStack(0)
+ if (posStack(0) == arrayStack(0).length - 1) {
+ // 2a) only a single entry left on top
+ // this means we have to modify this iterator - pop topmost
+ val snd = Array(arrayStack(0).last)
+ val szsnd = snd(0).size
+ // modify this - pop
+ depth -= 1
+ arrayStack = arrayStack.tail ++ Array[Array[HashSet[A]]](null)
+ posStack = posStack.tail ++ Array[Int](0)
+ // we know that `this` is not empty, since it had something on the arrayStack and arrayStack elements are always non-empty
+ ((new TrieIterator[A](snd), szsnd), this)
+ } else {
+ // 2b) more than a single entry left on top
+ val (fst, snd) = arrayStack(0).splitAt(arrayStack(0).length - (arrayStack(0).length - posStack(0) + 1) / 2)
+ arrayStack(0) = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A](snd), szsnd), this)
+ }
+ } else {
+ // 3) no topmost element (arrayD is at the top)
+ // steal a portion of it and update this iterator
+ if (posD == arrayD.length - 1) {
+ // 3a) positioned at the last element of arrayD
+ val arr: Array[HashSet[A]] = arrayD(posD) match {
+ case c: HashSetCollision1[_] => c.asInstanceOf[HashSetCollision1[A]].ks.toList map { HashSet() + _ } toArray
+ case ht: HashTrieSet[_] => ht.asInstanceOf[HashTrieSet[A]].elems
+ case _ => error("cannot divide single element")
+ }
+ val (fst, snd) = arr.splitAt(arr.length / 2)
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator(snd), szsnd), new TrieIterator(fst))
+ } else {
+ // 3b) arrayD has more free elements
+ val (fst, snd) = arrayD.splitAt(arrayD.length - (arrayD.length - posD + 1) / 2)
+ arrayD = fst
+ val szsnd = snd.foldLeft(0)(_ + _.size)
+ ((new TrieIterator[A](snd), szsnd), this)
+ }
+ }
+ }
+ }
+ }
+
@serializable @SerialVersionUID(2L) private class SerializationProxy[A,B](@transient private var orig: HashSet[A]) {
private def writeObject(out: java.io.ObjectOutputStream) {
val s = orig.size
diff --git a/src/library/scala/collection/mutable/ConcurrentMap.scala b/src/library/scala/collection/mutable/ConcurrentMap.scala
index f9505f694b..a88679a8c9 100644
--- a/src/library/scala/collection/mutable/ConcurrentMap.scala
+++ b/src/library/scala/collection/mutable/ConcurrentMap.scala
@@ -20,12 +20,13 @@ package mutable
* Note: The concurrent maps do not accept `null` for keys or values.
*
* @define atomicop
- * This is done atomically.
+ * This is an atomic operation.
*/
trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Associates the given key with a given value, unless the key was already associated with some other value.
+ *
* $atomicop
*
* @param k key with which the specified value is to be associated with
@@ -37,6 +38,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Removes the entry for the specified key if its currently mapped to the specified value.
+ *
* $atomicop
*
* @param k key for which the entry should be removed
@@ -47,6 +49,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Replaces the entry for the given key only if it was previously mapped to a given value.
+ *
* $atomicop
*
* @param k key for which the entry should be replaced
@@ -58,6 +61,7 @@ trait ConcurrentMap[A, B] extends Map[A, B] {
/**
* Replaces the entry for the given key only if it was previously mapped to some value.
+ *
* $atomicop
*
* @param k key for which the entry should be replaced
diff --git a/src/library/scala/collection/parallel/ParIterable.scala b/src/library/scala/collection/parallel/ParIterable.scala
index 7dd9b3038a..7f5b39d3e4 100644
--- a/src/library/scala/collection/parallel/ParIterable.scala
+++ b/src/library/scala/collection/parallel/ParIterable.scala
@@ -26,8 +26,7 @@ trait ParIterable[+T] extends Iterable[T]
/** $factoryinfo
*/
object ParIterable extends ParFactory[ParIterable] {
- implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] =
- new GenericCanCombineFrom[T]
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParIterable[T]] = new GenericCanCombineFrom[T]
def newBuilder[T]: Combiner[T, ParIterable[T]] = ParArrayCombiner[T]
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 907a3d84da..bdfb92a405 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -427,7 +427,10 @@ self =>
executeAndWaitResult(new Find(pred, parallelIterator assign new DefaultSignalling with VolatileAbort))
}
- protected[this] def cbfactory = () => newCombiner
+ protected[this] def cbfactory ={
+ println(newCombiner + ", " + newCombiner.getClass)
+ () => newCombiner
+ }
override def filter(pred: T => Boolean): Repr = {
executeAndWaitResult(new Filter(pred, cbfactory, parallelIterator) mapResult { _.result })
@@ -655,6 +658,7 @@ self =>
protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp]
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
def split = pit.split.map(newSubtask(_)) // default split procedure
+ private[parallel] override def signalAbort = pit.abort
override def toString = "Accessor(" + pit.toString + ")"
}
@@ -672,6 +676,10 @@ self =>
val st: Second
def combineResults(fr: FR, sr: SR): R
var result: R = null.asInstanceOf[R]
+ private[parallel] override def signalAbort {
+ ft.signalAbort
+ st.signalAbort
+ }
}
/** Sequentially performs one task after another. */
@@ -703,6 +711,9 @@ self =>
inner.compute
result = map(inner.result)
}
+ private[parallel] override def signalAbort {
+ inner.signalAbort
+ }
}
protected trait Transformer[R, Tp] extends Accessor[R, Tp]
@@ -1187,6 +1198,7 @@ self =>
new ScanWithScanTree(Some(st.left.value), op, st.right, src, dest)
)
def shouldSplitFurther = (st.left ne null) && (st.right ne null)
+
}
protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That])
diff --git a/src/library/scala/collection/parallel/ParMap.scala b/src/library/scala/collection/parallel/ParMap.scala
index bf6d9ef644..b33b27c42b 100644
--- a/src/library/scala/collection/parallel/ParMap.scala
+++ b/src/library/scala/collection/parallel/ParMap.scala
@@ -26,7 +26,7 @@ self =>
def mapCompanion: GenericParMapCompanion[ParMap] = ParMap
- override def empty: ParMap[K, V] = new immutable.ParHashTrie[K, V]
+ override def empty: ParMap[K, V] = new immutable.ParHashMap[K, V]
override def stringPrefix = "ParMap"
}
@@ -34,9 +34,9 @@ self =>
object ParMap extends ParMapFactory[ParMap] {
- def empty[K, V]: ParMap[K, V] = new immutable.ParHashTrie[K, V]
+ def empty[K, V]: ParMap[K, V] = new immutable.ParHashMap[K, V]
- def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashTrieCombiner[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = immutable.HashMapCombiner[K, V]
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V]
diff --git a/src/library/scala/collection/parallel/ParSet.scala b/src/library/scala/collection/parallel/ParSet.scala
new file mode 100644
index 0000000000..cd59bc0a78
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParSet.scala
@@ -0,0 +1,83 @@
+package scala.collection.parallel
+
+
+
+
+
+
+
+import scala.collection.Map
+import scala.collection.mutable.Builder
+import scala.collection.generic._
+
+
+
+
+
+
+trait ParSet[T]
+extends Set[T]
+ with GenericParTemplate[T, ParSet]
+ with ParIterable[T]
+ with ParSetLike[T, ParSet[T], Set[T]]
+{
+self =>
+ override def empty: ParSet[T] = immutable.ParHashSet[T]()
+
+ override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet
+
+ override def stringPrefix = "ParSet"
+}
+
+
+
+object ParSet extends ParSetFactory[ParSet] {
+ def newCombiner[T]: Combiner[T, ParSet[T]] = immutable.HashSetCombiner[T]
+
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T]
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/ParSetLike.scala b/src/library/scala/collection/parallel/ParSetLike.scala
new file mode 100644
index 0000000000..444117368c
--- /dev/null
+++ b/src/library/scala/collection/parallel/ParSetLike.scala
@@ -0,0 +1,81 @@
+package scala.collection.parallel
+
+
+
+import scala.collection.SetLike
+import scala.collection.Set
+import scala.collection.mutable.Builder
+
+
+
+
+
+
+
+
+trait ParSetLike[T,
+ +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T],
+ +Sequential <: Set[T] with SetLike[T, Sequential]]
+extends SetLike[T, Repr]
+ with ParIterableLike[T, Repr, Sequential]
+{ self =>
+
+ protected[this] override def newBuilder: Builder[T, Repr] = newCombiner
+
+ protected[this] override def newCombiner: Combiner[T, Repr]
+
+ override def empty: Repr
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index ae19ed8744..686d08a301 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -366,8 +366,9 @@ self =>
def next = { remaining -= 1; self.next }
def split: Seq[ParIterableIterator[T]] = takeSeq(self.split) { (p, n) => p.take(n) }
protected[this] def takeSeq[PI <: ParIterableIterator[T]](sq: Seq[PI])(taker: (PI, Int) => PI) = {
- val shortened = for ((it, total) <- sq zip sq.scanLeft(0)(_ + _.remaining).tail) yield
- if (total < remaining) it else taker(it, total - remaining)
+ val sizes = sq.scanLeft(0)(_ + _.remaining)
+ val shortened = for ((it, (from, until)) <- sq zip (sizes.init zip sizes.tail)) yield
+ if (until < remaining) it else taker(it, remaining - from)
shortened filter { _.remaining > 0 }
}
}
diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala
index b3cad6d67a..c890fdf974 100644
--- a/src/library/scala/collection/parallel/Splitter.scala
+++ b/src/library/scala/collection/parallel/Splitter.scala
@@ -34,6 +34,15 @@ trait Splitter[+T] extends Iterator[T] {
}
+object Splitter {
+ def empty[T]: Splitter[T] = new Splitter[T] {
+ def hasNext = false
+ def next = Iterator.empty.next
+ def split = Seq(this)
+ }
+}
+
+
/** A precise splitter (or a precise split iterator) can be split into arbitrary number of splitters
* that traverse disjoint subsets of arbitrary sizes.
*
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index dabb4e813f..3facaea7c3 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -4,9 +4,7 @@ package scala.collection.parallel
import scala.concurrent.forkjoin._
-
-
-
+import scala.util.control.Breaks._
@@ -59,6 +57,31 @@ trait Tasks {
protected[this] def split: Seq[Task[R, Tp]]
/** Read of results of `that` task and merge them into results of this one. */
protected[this] def merge(that: Tp) {}
+
+ // exception handling mechanism
+ var exception: Exception = null
+ def forwardException = if (exception != null) throw exception
+ // tries to do the leaf computation, storing the possible exception
+ protected def tryLeaf(result: Option[R]) {
+ try {
+ tryBreakable {
+ leaf(result)
+ } catchBreak {
+ signalAbort
+ }
+ } catch {
+ case e: Exception =>
+ exception = e
+ signalAbort
+ }
+ }
+ protected[this] def tryMerge(t: Tp) {
+ val that = t.asInstanceOf[Task[R, Tp]]
+ if (this.exception == null && that.exception == null) merge(that.repr)
+ else if (that.exception != null) this.exception = that.exception
+ }
+ // override in concrete task implementations to signal abort to other tasks
+ private[parallel] def signalAbort {}
}
type TaskType[R, +Tp] <: Task[R, Tp]
@@ -66,13 +89,13 @@ trait Tasks {
var environment: ExecutionEnvironment
- /** Executes a task and returns a future. */
+ /** Executes a task and returns a future. Forwards an exception if some task threw it. */
def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R
- /** Executes a task and waits for it to finish. */
+ /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */
def executeAndWait[R, Tp](task: TaskType[R, Tp])
- /** Executes a result task, waits for it to finish, then returns its result. */
+ /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */
def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
/** Retrieves the parallelism level of the task execution environment. */
@@ -96,19 +119,19 @@ trait AdaptiveWorkStealingTasks extends Tasks {
/** The actual leaf computation. */
def leaf(result: Option[R]): Unit
- def compute = if (shouldSplitFurther) internal else leaf(None)
+ def compute = if (shouldSplitFurther) internal else tryLeaf(None)
def internal = {
var last = spawnSubtasks
- last.leaf(None)
+ last.tryLeaf(None)
result = last.result
while (last.next != null) {
val lastresult = Option(last.result)
last = last.next
- if (last.tryCancel) last.leaf(lastresult) else last.sync
- merge(last.repr)
+ if (last.tryCancel) last.tryLeaf(lastresult) else last.sync
+ tryMerge(last.repr)
}
}
@@ -150,7 +173,6 @@ trait HavingForkJoinPool {
}
-
/** An implementation trait for parallel tasks based on the fork/join framework.
*
* @define fjdispatch
@@ -187,6 +209,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
() => {
fjtask.join
+ fjtask.forwardException
fjtask.result
}
}
@@ -202,6 +225,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
+ fjtask.forwardException
}
/** Executes a task on a fork/join pool and waits for it to finish.
@@ -218,6 +242,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
+ fjtask.forwardException
fjtask.result
}
@@ -225,8 +250,9 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
+
object ForkJoinTasks {
- val defaultForkJoinPool = new ForkJoinPool
+ val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool
defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index cb835c7fcd..c72eb66207 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashTrie.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -9,6 +9,7 @@ package scala.collection.parallel.immutable
import scala.collection.parallel.ParMap
import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
@@ -25,26 +26,26 @@ import scala.collection.immutable.HashMap
*
* @author prokopec
*/
-class ParHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
+class ParHashMap[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
extends ParMap[K, V]
- with GenericParMapTemplate[K, V, ParHashTrie]
- with ParMapLike[K, V, ParHashTrie[K, V], HashMap[K, V]]
+ with GenericParMapTemplate[K, V, ParHashMap]
+ with ParMapLike[K, V, ParHashMap[K, V], HashMap[K, V]]
{
self =>
def this() = this(HashMap.empty[K, V])
- override def mapCompanion: GenericParMapCompanion[ParHashTrie] = ParHashTrie
+ override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap
- override def empty: ParHashTrie[K, V] = new ParHashTrie[K, V]
+ override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
- def parallelIterator = new ParHashTrieIterator(trie) with SCPI
+ def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI
def seq = trie
- def -(k: K) = new ParHashTrie(trie - k)
+ def -(k: K) = new ParHashMap(trie - k)
- def +[U >: V](kv: (K, U)) = new ParHashTrie(trie + kv)
+ def +[U >: V](kv: (K, U)) = new ParHashMap(trie + kv)
def get(k: K) = trie.get(k)
@@ -55,59 +56,66 @@ self =>
case None => newc
}
- type SCPI = SignalContextPassingIterator[ParHashTrieIterator]
+ type SCPI = SignalContextPassingIterator[ParHashMapIterator]
- class ParHashTrieIterator(val ht: HashMap[K, V])
+ class ParHashMapIterator(val triter: Iterator[(K, V)], val sz: Int)
extends super.ParIterator {
- self: SignalContextPassingIterator[ParHashTrieIterator] =>
- // println("created iterator " + ht)
+ self: SignalContextPassingIterator[ParHashMapIterator] =>
var i = 0
- lazy val triter = ht.iterator
- def split: Seq[ParIterator] = {
- // println("splitting " + ht + " into " + ht.split.map(new ParHashTrieIterator(_) with SCPI).map(_.toList))
- ht.split.map(new ParHashTrieIterator(_) with SCPI)
+ def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
+ case t: HashMap.TrieIterator[_, _] =>
+ val previousRemaining = remaining
+ val ((fst, fstlength), snd) = t.asInstanceOf[HashMap.TrieIterator[K, V]].split
+ val sndlength = previousRemaining - fstlength
+ Seq(
+ new ParHashMapIterator(fst, fstlength) with SCPI,
+ new ParHashMapIterator(snd, sndlength) with SCPI
+ )
+ case _ =>
+ // iterator of the collision map case
+ val buff = triter.toBuffer
+ val (fp, sp) = buff.splitAt(buff.length / 2)
+ Seq(fp, sp) map { b => new ParHashMapIterator(b.iterator, b.length) with SCPI }
}
def next: (K, V) = {
- // println("taking next after " + i + ", in " + ht)
i += 1
triter.next
}
def hasNext: Boolean = {
- // println("hasNext: " + i + ", " + ht.size + ", " + ht)
- i < ht.size
+ i < sz
}
- def remaining = ht.size - i
+ def remaining = sz - i
}
}
-object ParHashTrie extends ParMapFactory[ParHashTrie] {
- def empty[K, V]: ParHashTrie[K, V] = new ParHashTrie[K, V]
+object ParHashMap extends ParMapFactory[ParHashMap] {
+ def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
- def newCombiner[K, V]: Combiner[(K, V), ParHashTrie[K, V]] = HashTrieCombiner[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = HashMapCombiner[K, V]
- implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashTrie[K, V]] = {
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = {
new CanCombineFromMap[K, V]
}
- def fromTrie[K, V](t: HashMap[K, V]) = new ParHashTrie(t)
+ def fromTrie[K, V](t: HashMap[K, V]) = new ParHashMap(t)
var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
}
-trait HashTrieCombiner[K, V]
-extends Combiner[(K, V), ParHashTrie[K, V]] {
-self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
- import HashTrieCombiner._
- var heads = new Array[Unrolled[K, V]](rootsize)
- var lasts = new Array[Unrolled[K, V]](rootsize)
+private[immutable] trait HashMapCombiner[K, V]
+extends Combiner[(K, V), ParHashMap[K, V]] {
+self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
+ import HashMapCombiner._
+ var heads = new Array[Unrolled[(K, V)]](rootsize)
+ var lasts = new Array[Unrolled[(K, V)]](rootsize)
var size: Int = 0
def clear = {
- heads = new Array[Unrolled[K, V]](rootsize)
- lasts = new Array[Unrolled[K, V]](rootsize)
+ heads = new Array[Unrolled[(K, V)]](rootsize)
+ lasts = new Array[Unrolled[(K, V)]](rootsize)
}
def +=(elem: (K, V)) = {
@@ -116,7 +124,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
val pos = hc & 0x1f
if (lasts(pos) eq null) {
// initialize bucket
- heads(pos) = new Unrolled[K, V]
+ heads(pos) = new Unrolled[(K, V)]
lasts(pos) = heads(pos)
}
// add to bucket
@@ -124,10 +132,10 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
this
}
- def combine[N <: (K, V), NewTo >: ParHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- // ParHashTrie.totalcombines.incrementAndGet
- if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
- val that = other.asInstanceOf[HashTrieCombiner[K, V]]
+ def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ // ParHashMap.totalcombines.incrementAndGet
+ if (other.isInstanceOf[HashMapCombiner[_, _]]) {
+ val that = other.asInstanceOf[HashMapCombiner[K, V]]
var i = 0
while (i < rootsize) {
if (lasts(i) eq null) {
@@ -158,17 +166,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
}
val sz = root.foldLeft(0)(_ + _.size)
- if (sz == 0) new ParHashTrie[K, V]
- else if (sz == 1) new ParHashTrie[K, V](root(0))
+ if (sz == 0) new ParHashMap[K, V]
+ else if (sz == 1) new ParHashMap[K, V](root(0))
else {
val trie = new HashMap.HashTrieMap(bitmap, root, sz)
- new ParHashTrie[K, V](trie)
+ new ParHashMap[K, V](trie)
}
}
/* tasks */
- class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
@@ -178,7 +186,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
i += 1
}
}
- private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = {
+ private def createTrie(elems: Unrolled[(K, V)]): HashMap[K, V] = {
var trie = new HashMap[K, V]
var unrolled = elems
@@ -208,28 +216,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] =>
}
-object HashTrieCombiner {
- def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashTrie[K, V]] {}
+object HashMapCombiner {
+ def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {}
private[immutable] val rootbits = 5
private[immutable] val rootsize = 1 << 5
- private[immutable] val unrolledsize = 16
-
- private[immutable] class Unrolled[K, V] {
- var size = 0
- var array = new Array[(K, V)](unrolledsize)
- var next: Unrolled[K, V] = null
- // adds and returns itself or the new unrolled if full
- def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) {
- array(size) = elem
- size += 1
- this
- } else {
- next = new Unrolled[K, V]
- next.add(elem)
- }
- override def toString = "Unrolled(" + array.mkString(", ") + ")"
- }
}
@@ -246,3 +237,5 @@ object HashTrieCombiner {
+
+
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
new file mode 100644
index 0000000000..c9bda86d67
--- /dev/null
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -0,0 +1,279 @@
+package scala.collection.parallel.immutable
+
+
+
+
+
+
+
+import scala.collection.parallel.ParSet
+import scala.collection.parallel.ParSetLike
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParIterableIterator
+import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.generic.ParSetFactory
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.GenericParTemplate
+import scala.collection.generic.GenericParCompanion
+import scala.collection.generic.GenericCompanion
+import scala.collection.immutable.HashSet
+
+
+
+
+
+
+/** Parallel hash trie set.
+ *
+ * @author prokopec
+ */
+class ParHashSet[T] private[immutable] (private[this] val trie: HashSet[T])
+extends ParSet[T]
+ with GenericParTemplate[T, ParHashSet]
+ with ParSetLike[T, ParHashSet[T], HashSet[T]]
+{
+self =>
+
+ def this() = this(HashSet.empty[T])
+
+ override def companion: GenericCompanion[ParHashSet] with GenericParCompanion[ParHashSet] = ParHashSet
+
+ override def empty: ParHashSet[T] = new ParHashSet[T]
+
+ def parallelIterator: ParIterableIterator[T] = new ParHashSetIterator(trie.iterator, trie.size) with SCPI
+
+ def seq = trie
+
+ def -(e: T) = new ParHashSet(trie - e)
+
+ def +(e: T) = new ParHashSet(trie + e)
+
+ def contains(e: T): Boolean = trie.contains(e)
+
+ override def size = trie.size
+
+ protected override def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]) = oldc match {
+ case Some(old) => old
+ case None => newc
+ }
+
+ type SCPI = SignalContextPassingIterator[ParHashSetIterator]
+
+ class ParHashSetIterator(val triter: Iterator[T], val sz: Int)
+ extends super.ParIterator {
+ self: SignalContextPassingIterator[ParHashSetIterator] =>
+ var i = 0
+ def split: Seq[ParIterator] = if (remaining < 2) Seq(this) else triter match {
+ case t: HashSet.TrieIterator[_] =>
+ val previousRemaining = remaining
+ val ((fst, fstlength), snd) = t.asInstanceOf[HashSet.TrieIterator[T]].split
+ val sndlength = previousRemaining - fstlength
+ Seq(
+ new ParHashSetIterator(fst, fstlength) with SCPI,
+ new ParHashSetIterator(snd, sndlength) with SCPI
+ )
+ case _ =>
+ // iterator of the collision map case
+ val buff = triter.toBuffer
+ val (fp, sp) = buff.splitAt(buff.length / 2)
+ Seq(fp, sp) map { b => new ParHashSetIterator(b.iterator, b.length) with SCPI }
+ }
+ def next: T = {
+ i += 1
+ triter.next
+ }
+ def hasNext: Boolean = {
+ i < sz
+ }
+ def remaining = sz - i
+ }
+
+}
+
+
+object ParHashSet extends ParSetFactory[ParHashSet] {
+ def newCombiner[T]: Combiner[T, ParHashSet[T]] = HashSetCombiner[T]
+
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] =
+ new GenericCanCombineFrom[T]
+
+ def fromTrie[T](t: HashSet[T]) = new ParHashSet(t)
+}
+
+
+private[immutable] trait HashSetCombiner[T]
+extends Combiner[T, ParHashSet[T]] {
+self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
+ import HashSetCombiner._
+ var heads = new Array[Unrolled[Any]](rootsize)
+ var lasts = new Array[Unrolled[Any]](rootsize)
+ var size: Int = 0
+
+ def clear = {
+ heads = new Array[Unrolled[Any]](rootsize)
+ lasts = new Array[Unrolled[Any]](rootsize)
+ }
+
+ def +=(elem: T) = {
+ size += 1
+ val hc = elem.##
+ val pos = hc & 0x1f
+ if (lasts(pos) eq null) {
+ // initialize bucket
+ heads(pos) = new Unrolled[Any]
+ lasts(pos) = heads(pos)
+ }
+ // add to bucket
+ lasts(pos) = lasts(pos).add(elem)
+ this
+ }
+
+ def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ if (other.isInstanceOf[HashSetCombiner[_]]) {
+ val that = other.asInstanceOf[HashSetCombiner[T]]
+ var i = 0
+ while (i < rootsize) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ size = size + that.size
+ this
+ } else error("Unexpected combiner type.")
+ } else this
+
+ def result = {
+ val buckets = heads.filter(_ != null)
+ val root = new Array[HashSet[T]](buckets.length)
+
+ executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
+
+ var bitmap = 0
+ var i = 0
+ while (i < rootsize) {
+ if (heads(i) ne null) bitmap |= 1 << i
+ i += 1
+ }
+ val sz = root.foldLeft(0)(_ + _.size)
+
+ if (sz == 0) new ParHashSet[T]
+ else if (sz == 1) new ParHashSet[T](root(0))
+ else {
+ val trie = new HashSet.HashTrieSet(bitmap, root, sz)
+ new ParHashSet[T](trie)
+ }
+ }
+
+ /* tasks */
+
+ class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ root(i) = createTrie(buckets(i))
+ i += 1
+ }
+ }
+ private def createTrie(elems: Unrolled[Any]): HashSet[T] = {
+ var trie = new HashSet[T]
+
+ var unrolled = elems
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val v = chunkarr(i).asInstanceOf[T]
+ val hc = v.##
+ trie = trie.updated0(v, hc, rootbits)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+
+ trie
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ }
+
+}
+
+
+object HashSetCombiner {
+ def apply[T] = new HashSetCombiner[T] with EnvironmentPassingCombiner[T, ParHashSet[T]] {}
+
+ private[immutable] val rootbits = 5
+ private[immutable] val rootsize = 1 << 5
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index d529d223c9..c54875ecd3 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -12,8 +12,29 @@ package scala.collection.parallel
package object immutable {
+ /* package level methods */
def repetition[T](elem: T, len: Int) = new Repetition(elem, len)
+ /* properties */
+ private[immutable] val unrolledsize = 16
+
+ /* classes */
+ private[immutable] class Unrolled[T: ClassManifest] {
+ var size = 0
+ var array = new Array[T](unrolledsize)
+ var next: Unrolled[T] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[T]
+ next.add(elem)
+ }
+ override def toString = "Unrolled(" + array.mkString(", ") + ")"
+ }
+
/** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
*
* @tparam T type of the elements
diff --git a/src/library/scala/util/control/Breaks.scala b/src/library/scala/util/control/Breaks.scala
index dedf721cd4..0b83ac6d3b 100644
--- a/src/library/scala/util/control/Breaks.scala
+++ b/src/library/scala/util/control/Breaks.scala
@@ -39,6 +39,16 @@ class Breaks {
}
}
+ def tryBreakable(op: => Unit) = new {
+ def catchBreak(onBreak: => Unit) = try {
+ op
+ } catch {
+ case ex: BreakControl =>
+ if (ex ne breakException) throw ex
+ onBreak
+ }
+ }
+
/* Break from dynamically closest enclosing breakable block
* @note this might be different than the statically closest enclosing
* block!
diff --git a/test/files/scalacheck/HashTrieSplit.scala b/test/files/scalacheck/HashTrieSplit.scala
new file mode 100644
index 0000000000..6b20efe12b
--- /dev/null
+++ b/test/files/scalacheck/HashTrieSplit.scala
@@ -0,0 +1,47 @@
+
+
+
+
+
+import collection._
+
+
+
+
+// checks whether hash tries split their iterators correctly
+// even after some elements have been traversed
+object Test {
+ def main(args: Array[String]) {
+ doesSplitOk
+ }
+
+ def doesSplitOk = {
+ val sz = 2000
+ var ht = new parallel.immutable.ParHashMap[Int, Int]
+ // println("creating trie")
+ for (i <- 0 until sz) ht += ((i + sz, i))
+ // println("created trie")
+ for (n <- 0 until (sz - 1)) {
+ // println("---------> n = " + n)
+ val pit = ht.parallelIterator
+ val pit2 = ht.parallelIterator
+ var i = 0
+ while (i < n) {
+ pit.next
+ pit2.next
+ i += 1
+ }
+ // println("splitting")
+ val pits = pit.split
+ val fst = pits(0).toSet
+ val snd = pits(1).toSet
+ val orig = pit2.toSet
+ if (orig.size != (fst.size + snd.size) || orig != (fst ++ snd)) {
+ println("Original: " + orig)
+ println("First: " + fst)
+ println("Second: " + snd)
+ assert(false)
+ }
+ }
+ }
+}