author     Aleksandar Prokopec <aleksandar.prokopec@epfl.ch>  2010-11-08 08:52:20 +0000
committer  Aleksandar Prokopec <aleksandar.prokopec@epfl.ch>  2010-11-08 08:52:20 +0000
commit     09ed9d12c343ee972861c8439fd10596903efe59 (patch)
tree       ea2735b13b43d4132664d8b3d6a9c23e2b709b7e /src/library/scala
parent     056663c3f22b8c03f222856305ef99e3ed029889 (diff)
download   scala-09ed9d12c343ee972861c8439fd10596903efe59.tar.gz
           scala-09ed9d12c343ee972861c8439fd10596903efe59.tar.bz2
           scala-09ed9d12c343ee972861c8439fd10596903efe59.zip
Added size maps to flat hash tables.
Added parallel mutable hash sets, and implemented their parallel iterators and combiners. Factored out unrolled linked lists into a separate class, UnrolledBuffer, with tests. Added parallel mutable hash set tests and debugged the hash sets. No review.
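
The headline change is the size map: alongside the flat hash table's element array, an auxiliary array records how many slots are occupied in each 32-slot block, so parallel iterators can apportion elements between split halves without scanning the table. A minimal standalone sketch of the idea (hypothetical names, not the committed API):

    // Toy flat table with a per-block occupancy count (sketch only).
    class ToySizeMappedTable(tableLength: Int) {
      private val bucketBits = 5                               // 1 << 5 = 32 slots per block
      private val table = new Array[AnyRef](tableLength)
      private val sizemap = new Array[Int]((tableLength >> bucketBits) + 1)

      def put(slot: Int, elem: AnyRef) {
        if (table(slot) eq null) sizemap(slot >> bucketBits) += 1
        table(slot) = elem                                     // count tracked in O(1)
      }

      def elemsInBlock(block: Int): Int = sizemap(block)       // no table scan needed
    }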
Diffstat (limited to 'src/library/scala')
-rw-r--r--  src/library/scala/collection/IterableLike.scala | 2
-rw-r--r--  src/library/scala/collection/generic/ClassManifestTraversableFactory.scala | 25
-rw-r--r--  src/library/scala/collection/generic/GenericClassManifestCompanion.scala | 33
-rw-r--r--  src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala | 25
-rw-r--r--  src/library/scala/collection/mutable/FlatHashTable.scala | 192
-rw-r--r--  src/library/scala/collection/mutable/HashSet.scala | 25
-rw-r--r--  src/library/scala/collection/mutable/HashTable.scala | 33
-rw-r--r--  src/library/scala/collection/package.scala | 4
-rw-r--r--  src/library/scala/collection/parallel/ParSet.scala | 2
-rw-r--r--  src/library/scala/collection/parallel/UnrolledBuffer.scala | 258
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashMap.scala | 26
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashSet.scala | 24
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala | 95
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala | 20
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashSet.scala | 248
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashTable.scala | 44
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParSet.scala | 42
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParSetLike.scala | 70
-rw-r--r--  src/library/scala/collection/parallel/mutable/package.scala | 37
-rw-r--r--  src/library/scala/collection/parallel/package.scala | 58
20 files changed, 1102 insertions, 161 deletions
diff --git a/src/library/scala/collection/IterableLike.scala b/src/library/scala/collection/IterableLike.scala
index 538fd09c0e..97787c5867 100644
--- a/src/library/scala/collection/IterableLike.scala
+++ b/src/library/scala/collection/IterableLike.scala
@@ -193,7 +193,7 @@ self =>
* $orderDependent
*
* @param n The number of elements to take
- * @return a $coll consisting of all elements of this $coll except the first `n` ones, or else the
+ * @return a $coll consisting of all elements of this $coll except the last `n` ones, or else the
* empty $coll, if this $coll has less than `n` elements.
*/
def dropRight(n: Int): Repr = {
diff --git a/src/library/scala/collection/generic/ClassManifestTraversableFactory.scala b/src/library/scala/collection/generic/ClassManifestTraversableFactory.scala
new file mode 100644
index 0000000000..f9097eeca0
--- /dev/null
+++ b/src/library/scala/collection/generic/ClassManifestTraversableFactory.scala
@@ -0,0 +1,25 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2006-2010, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+package scala.collection
+package generic
+
+
+
+
+
+abstract class ClassManifestTraversableFactory[CC[X] <: Traversable[X] with GenericClassManifestTraversableTemplate[X, CC]]
+extends GenericClassManifestCompanion[CC] {
+
+ class GenericCanBuildFrom[A](implicit manif: ClassManifest[A]) extends CanBuildFrom[CC[_], A, CC[A]] {
+ def apply(from: CC[_]) = from.genericClassManifestBuilder[A]
+ def apply = newBuilder[A]
+ }
+
+}
diff --git a/src/library/scala/collection/generic/GenericClassManifestCompanion.scala b/src/library/scala/collection/generic/GenericClassManifestCompanion.scala
new file mode 100644
index 0000000000..98cafd3841
--- /dev/null
+++ b/src/library/scala/collection/generic/GenericClassManifestCompanion.scala
@@ -0,0 +1,33 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2010, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala.collection
+package generic
+
+import mutable.Builder
+
+
+
+
+
+abstract class GenericClassManifestCompanion[+CC[X] <: Traversable[X]] {
+ type Coll = CC[_]
+
+ def newBuilder[A](implicit ord: ClassManifest[A]): Builder[A, CC[A]]
+
+ def empty[A: ClassManifest]: CC[A] = newBuilder[A].result
+
+ def apply[A](elems: A*)(implicit ord: ClassManifest[A]): CC[A] = {
+ val b = newBuilder[A]
+ b ++= elems
+ b.result
+ }
+}
+
diff --git a/src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala b/src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala
new file mode 100644
index 0000000000..bf167bd6ab
--- /dev/null
+++ b/src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala
@@ -0,0 +1,25 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2010, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala.collection
+package generic
+
+import mutable.Builder
+import annotation.unchecked.uncheckedVariance
+
+
+
+
+trait GenericClassManifestTraversableTemplate[+A, +CC[X] <: Traversable[X]] extends HasNewBuilder[A, CC[A] @uncheckedVariance] {
+ implicit protected[this] val manifest: ClassManifest[A]
+ def classManifestCompanion: GenericClassManifestCompanion[CC]
+ def genericClassManifestBuilder[B](implicit man: ClassManifest[B]): Builder[B, CC[B]] = classManifestCompanion.newBuilder[B]
+}
+
diff --git a/src/library/scala/collection/mutable/FlatHashTable.scala b/src/library/scala/collection/mutable/FlatHashTable.scala
index aa2cd2deba..d0d9ce2cf5 100644
--- a/src/library/scala/collection/mutable/FlatHashTable.scala
+++ b/src/library/scala/collection/mutable/FlatHashTable.scala
@@ -20,20 +20,12 @@ package mutable
*
* @tparam A the type of the elements contained in the flat hash table.
*/
-trait FlatHashTable[A] {
-
- /** The load factor for the hash table; must be < 500 (0.5)
- */
- protected def loadFactor: Int = 450
- protected final def loadFactorDenum = 1000
-
- /** The initial size of the hash table.
- */
- protected def initialSize: Int = 16
+trait FlatHashTable[A] extends FlatHashTable.HashUtils[A] {
+ import FlatHashTable._
private final val tableDebug = false
- @transient private[collection] var _loadFactor = loadFactor
+ @transient private[collection] var _loadFactor = defaultLoadFactor
/** The actual hash table.
*/
@@ -45,10 +37,14 @@ trait FlatHashTable[A] {
/** The next size value at which to resize (capacity * load factor).
*/
- @transient protected var threshold: Int = newThreshold(initialCapacity)
+ @transient protected var threshold: Int = newThreshold(_loadFactor, initialCapacity)
+
+ /** The array keeping track of number of elements in 32 element blocks.
+ */
+ @transient protected var sizemap: Array[Int] = null
import HashTable.powerOfTwo
- private def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize)
+ protected def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize)
private def initialCapacity = capacity(initialSize)
/**
@@ -68,8 +64,11 @@ trait FlatHashTable[A] {
tableSize = 0
assert(size >= 0)
- table = new Array(capacity(size * loadFactorDenum / _loadFactor))
- threshold = newThreshold(table.size)
+ table = new Array(capacity(sizeForThreshold(size, _loadFactor)))
+ threshold = newThreshold(_loadFactor, table.size)
+
+ val smDefined = in.readBoolean
+ if (smDefined) sizeMapInit(table.length)
var index = 0
while (index < size) {
@@ -89,6 +88,7 @@ trait FlatHashTable[A] {
out.defaultWriteObject
out.writeInt(_loadFactor)
out.writeInt(tableSize)
+ out.writeBoolean(isSizeMapDefined)
iterator.foreach(out.writeObject)
}
@@ -127,6 +127,7 @@ trait FlatHashTable[A] {
}
table(h) = elem.asInstanceOf[AnyRef]
tableSize = tableSize + 1
+ nnSizeMapAdd(h)
if (tableSize >= threshold) growTable()
true
}
@@ -157,6 +158,7 @@ trait FlatHashTable[A] {
}
table(h0) = null
tableSize -= 1
+ nnSizeMapRemove(h0)
if (tableDebug) checkConsistent()
return Some(entry.asInstanceOf[A])
}
@@ -181,7 +183,8 @@ trait FlatHashTable[A] {
val oldtable = table
table = new Array[AnyRef](table.length * 2)
tableSize = 0
- threshold = newThreshold(table.length)
+ nnSizeMapReset(table.length)
+ threshold = newThreshold(_loadFactor, table.length)
var i = 0
while (i < oldtable.length) {
val entry = oldtable(i)
@@ -197,26 +200,161 @@ trait FlatHashTable[A] {
assert(false, i+" "+table(i)+" "+table.mkString)
}
- protected def elemHashCode(elem: A) = if (elem == null) 0 else elem.hashCode()
+ /* Size map handling code */
- protected final def improve(hcode: Int) = {
- var h: Int = hcode + ~(hcode << 9)
- h = h ^ (h >>> 14)
- h = h + (h << 4)
- h ^ (h >>> 10)
+ /*
+ * The following three methods (nn*) modify a size map only if it has been
+ * initialized, that is, if it's not set to null.
+ *
+ * The size map logically divides the hash table into `sizeMapBucketSize` element buckets
+ * by keeping an integer entry for each such bucket. Each integer entry simply denotes
+ * the number of elements in the corresponding bucket.
+ * Best understood through an example, see:
+ * table = [/, 1, /, 6, 90, /, -3, 5] (8 entries)
+ * sizemap = [ 2 | 3 ] (2 entries)
+ * where sizeMapBucketSize == 4.
+ *
+ */
+ protected def nnSizeMapAdd(h: Int) = if (sizemap ne null) {
+ val p = h >> sizeMapBucketBitSize
+ sizemap(p) += 1
}
- protected final def index(hcode: Int) = improve(hcode) & (table.length - 1)
+ protected def nnSizeMapRemove(h: Int) = if (sizemap ne null) {
+ sizemap(h >> sizeMapBucketBitSize) -= 1
+ }
- private def newThreshold(size: Int) = {
- val lf = _loadFactor
- assert(lf < (loadFactorDenum / 2), "loadFactor too large; must be < 0.5")
- (size.toLong * lf / loadFactorDenum ).toInt
+ protected def nnSizeMapReset(tableLength: Int) = if (sizemap ne null) {
+ val nsize = calcSizeMapSize(tableLength)
+ if (sizemap.length != nsize) sizemap = new Array[Int](nsize)
+ else java.util.Arrays.fill(sizemap, 0)
+ }
+
+ private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize
+
+ protected def calcSizeMapSize(tableLength: Int) = (tableLength >> sizeMapBucketBitSize) + 1
+
+ // discards the previous sizemap and only allocates a new one
+ protected def sizeMapInit(tableLength: Int) {
+ sizemap = new Array[Int](calcSizeMapSize(tableLength))
+ }
+
+ // discards the previous sizemap and populates the new one
+ protected def sizeMapInitAndRebuild {
+ // first allocate
+ sizeMapInit(table.length)
+
+ // rebuild
+ val totalbuckets = totalSizeMapBuckets
+ var bucketidx = 0
+ var tableidx = 0
+ var tbl = table
+ var tableuntil = sizeMapBucketSize min tbl.length
+ while (bucketidx < totalbuckets) {
+ var currbucketsz = 0
+ while (tableidx < tableuntil) {
+ if (tbl(tableidx) ne null) currbucketsz += 1
+ tableidx += 1
+ }
+ sizemap(bucketidx) = currbucketsz
+ tableuntil += sizeMapBucketSize
+ bucketidx += 1
+ }
+ }
+
+ private[collection] def printSizeMap {
+ println(sizemap.toList)
+ }
+
+ protected def sizeMapDisable = sizemap = null
+
+ protected def isSizeMapDefined = sizemap ne null
+
+ protected def alwaysInitSizeMap = false
+
+ /* End of size map handling code */
+
+ protected final def index(hcode: Int) = {
+ // improve(hcode) & (table.length - 1)
+ val improved = improve(hcode)
+ val ones = table.length - 1
+ (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones
}
protected def clearTable() {
var i = table.length - 1
while (i >= 0) { table(i) = null; i -= 1 }
tableSize = 0
+ nnSizeMapReset(table.length)
+ }
+
+ private[collection] def hashTableContents = new FlatHashTable.Contents[A](
+ _loadFactor,
+ table,
+ tableSize,
+ threshold,
+ sizemap
+ )
+
+ protected def initWithContents(c: FlatHashTable.Contents[A]) = {
+ if (c != null) {
+ _loadFactor = c.loadFactor
+ table = c.table
+ tableSize = c.tableSize
+ threshold = c.threshold
+ sizemap = c.sizemap
+ }
+ if (alwaysInitSizeMap && sizemap == null) sizeMapInitAndRebuild
}
+
}
+
+
+
+private[collection] object FlatHashTable {
+
+ /** The load factor for the hash table; must be < 500 (0.5)
+ */
+ private[collection] def defaultLoadFactor: Int = 450
+ private[collection] final def loadFactorDenum = 1000
+
+ /** The initial size of the hash table.
+ */
+ private[collection] def initialSize: Int = 16
+
+ private[collection] def sizeForThreshold(size: Int, _loadFactor: Int) = size * loadFactorDenum / _loadFactor
+
+ private[collection] def newThreshold(_loadFactor: Int, size: Int) = {
+ val lf = _loadFactor
+ assert(lf < (loadFactorDenum / 2), "loadFactor too large; must be < 0.5")
+ (size.toLong * lf / loadFactorDenum ).toInt
+ }
+
+ class Contents[A](
+ val loadFactor: Int,
+ val table: Array[AnyRef],
+ val tableSize: Int,
+ val threshold: Int,
+ val sizemap: Array[Int]
+ )
+
+ trait HashUtils[A] {
+ protected final def sizeMapBucketBitSize = 5
+ // so that:
+ protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize
+
+ protected def elemHashCode(elem: A) = if (elem == null) 0 else elem.hashCode()
+
+ protected final def improve(hcode: Int) = {
+ // var h: Int = hcode + ~(hcode << 9)
+ // h = h ^ (h >>> 14)
+ // h = h + (h << 4)
+ // h ^ (h >>> 10)
+ var i = hcode * 0x9e3775cd
+ i = java.lang.Integer.reverseBytes(i)
+ i * 0x9e3775cd
+ }
+ }
+
+}
+
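The hashing scheme also changed: instead of the old shift-and-xor mixer, `improve` now multiplies by a constant and reverses the byte order, and `index` takes the top bits of the improved code rather than the bottom ones. The snippet below restates the committed logic as a standalone pair of functions so the bit arithmetic is easier to try out:

    // Standalone restatement of the new improve/index pair (mirrors the diff above).
    def improve(hcode: Int): Int = {
      var i = hcode * 0x9e3775cd                     // multiplicative mixing
      i = java.lang.Integer.reverseBytes(i)          // spread entropy across all bytes
      i * 0x9e3775cd
    }

    // For a power-of-two table the mask `ones` has bitCount(ones) low bits set,
    // so shifting right by (32 - bitCount) selects the topmost bits of the code.
    def index(hcode: Int, tableLength: Int): Int = {
      val ones = tableLength - 1
      (improve(hcode) >> (32 - java.lang.Integer.bitCount(ones))) & ones
    }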
diff --git a/src/library/scala/collection/mutable/HashSet.scala b/src/library/scala/collection/mutable/HashSet.scala
index ebfeaa29ad..684faaabf6 100644
--- a/src/library/scala/collection/mutable/HashSet.scala
+++ b/src/library/scala/collection/mutable/HashSet.scala
@@ -12,6 +12,7 @@ package scala.collection
package mutable
import generic._
+import collection.parallel.mutable.ParHashSet
/** This class implements mutable sets using a hashtable.
*
@@ -35,10 +36,17 @@ import generic._
* @define willNotTerminateInf
*/
@serializable @SerialVersionUID(1L)
-class HashSet[A] extends Set[A]
- with GenericSetTemplate[A, HashSet]
- with SetLike[A, HashSet[A]]
- with FlatHashTable[A] {
+class HashSet[A] private[collection] (contents: FlatHashTable.Contents[A])
+extends Set[A]
+ with GenericSetTemplate[A, HashSet]
+ with SetLike[A, HashSet[A]]
+ with FlatHashTable[A]
+ with Parallelizable[ParHashSet[A]]
+{
+ initWithContents(contents)
+
+ def this() = this(null)
+
override def companion: GenericCompanion[HashSet] = HashSet
override def size = tableSize
@@ -48,6 +56,8 @@ class HashSet[A] extends Set[A]
def += (elem: A): this.type = { addEntry(elem); this }
def -= (elem: A): this.type = { removeEntry(elem); this }
+ def par = new ParHashSet(hashTableContents)
+
override def add(elem: A): Boolean = addEntry(elem)
override def remove(elem: A): Boolean = removeEntry(elem).isDefined
@@ -72,6 +82,13 @@ class HashSet[A] extends Set[A]
private def readObject(in: java.io.ObjectInputStream) {
init(in, x => x)
}
+
+ /** Toggles whether a size map is used to track hash map statistics.
+ */
+ def useSizeMap(t: Boolean) = if (t) {
+ if (!isSizeMapDefined) sizeMapInitAndRebuild
+ } else sizeMapDisable
+
}
/** $factoryInfo
diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala
index ceb6d6e0e8..02d09f3081 100644
--- a/src/library/scala/collection/mutable/HashTable.scala
+++ b/src/library/scala/collection/mutable/HashTable.scala
@@ -48,6 +48,10 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
*/
@transient protected var threshold: Int = initialThreshold(_loadFactor)
+ /** The array keeping track of the number of elements in 32 element blocks.
+ */
+ @transient protected var sizemap: Array[Int] = null
+
protected def initialSize: Int = HashTable.initialSize
/**
@@ -70,7 +74,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
table = new Array(capacity(sizeForThreshold(_loadFactor, size)))
threshold = newThreshold(_loadFactor, table.size)
- if (smDefined) sizeMapInit(table.size) else sizemap = null
+ if (smDefined) sizeMapInit(table.length) else sizemap = null
var index = 0
while (index < size) {
@@ -218,8 +222,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
threshold = newThreshold(_loadFactor, newSize)
}
- @transient protected var sizemap: Array[Int] = null
- private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize
+ /* Size map handling code */
/*
* The following three sizeMap* functions (Add, Remove, Reset)
@@ -252,6 +255,8 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
else java.util.Arrays.fill(sizemap, 0)
}
+ private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize
+
protected def calcSizeMapSize(tableLength: Int) = (tableLength >> sizeMapBucketBitSize) + 1
// discards the previous sizemap and only allocates a new one
@@ -286,7 +291,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
}
}
- def printSizeMap {
+ private[collection] def printSizeMap {
println(sizemap.toList)
}
@@ -294,6 +299,11 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
protected def isSizeMapDefined = sizemap ne null
+ // override to automatically initialize the size map
+ protected def alwaysInitSizeMap = false
+
+ /* End of size map handling code */
+
protected def elemEquals(key1: A, key2: A): Boolean = (key1 == key2)
// Note:
@@ -306,12 +316,15 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
shifted
}
- protected def initWithContents(c: HashTable.Contents[A, Entry]) = if (c != null) {
- _loadFactor = c.loadFactor
- table = c.table
- tableSize = c.tableSize
- threshold = c.threshold
- sizemap = c.sizemap
+ protected def initWithContents(c: HashTable.Contents[A, Entry]) = {
+ if (c != null) {
+ _loadFactor = c.loadFactor
+ table = c.table
+ tableSize = c.tableSize
+ threshold = c.threshold
+ sizemap = c.sizemap
+ }
+ if (alwaysInitSizeMap && sizemap == null) sizeMapInitAndRebuild
}
private[collection] def hashTableContents = new HashTable.Contents(
diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala
index f1eb50c5e0..13b6f22826 100644
--- a/src/library/scala/collection/package.scala
+++ b/src/library/scala/collection/package.scala
@@ -79,9 +79,9 @@ package object collection {
private[collection] object DebugUtils {
/* debug utils */
- def buildString(closure: (String => Unit) => Unit): String = {
+ def buildString(closure: (Any => Unit) => Unit): String = {
var output = ""
- def appendln(s: String) = output += s + "\n"
+ def appendln(s: Any) = output += s + "\n"
closure(appendln)
output
}
diff --git a/src/library/scala/collection/parallel/ParSet.scala b/src/library/scala/collection/parallel/ParSet.scala
index cd59bc0a78..80bc65fe43 100644
--- a/src/library/scala/collection/parallel/ParSet.scala
+++ b/src/library/scala/collection/parallel/ParSet.scala
@@ -6,7 +6,7 @@ package scala.collection.parallel
-import scala.collection.Map
+import scala.collection.Set
import scala.collection.mutable.Builder
import scala.collection.generic._
diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala
new file mode 100644
index 0000000000..2c12069e1c
--- /dev/null
+++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala
@@ -0,0 +1,258 @@
+package scala.collection.parallel
+
+
+
+import collection.generic._
+import collection.mutable.Builder
+
+import annotation.tailrec
+
+
+
+
+class UnrolledBuffer[T](implicit val manifest: ClassManifest[T])
+extends collection.mutable.Buffer[T]
+ with collection.mutable.BufferLike[T, UnrolledBuffer[T]]
+ with GenericClassManifestTraversableTemplate[T, UnrolledBuffer]
+ with collection.mutable.Builder[T, UnrolledBuffer[T]]
+{
+ import UnrolledBuffer.Unrolled
+
+ private var headptr = new Unrolled[T]
+ private var lastptr = headptr
+ private var sz = 0
+
+ private[parallel] def headPtr = headptr
+ private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head
+ private[parallel] def lastPtr = lastptr
+ private[parallel] def lastPtr_=(last: Unrolled[T]) = lastptr = last
+
+ protected[this] override def newBuilder = new UnrolledBuffer[T]
+
+ def classManifestCompanion = UnrolledBuffer
+
+ def concat(that: UnrolledBuffer[T]) = {
+ // bind the two together
+ if (!lastptr.bind(that.headptr)) lastptr = that.lastPtr
+
+ // update size
+ sz += that.sz
+
+ // `that` is no longer usable, so clear it
+ // here we rely on the fact that `clear` allocates
+ // new nodes instead of modifying the previous ones
+ that.clear
+
+ // return a reference to this
+ this
+ }
+
+ def +=(elem: T) = {
+ lastptr = lastptr.append(elem)
+ sz += 1
+ this
+ }
+
+ def clear() {
+ headptr = new Unrolled[T]
+ lastptr = headptr
+ sz = 0
+ }
+
+ def iterator = new Iterator[T] {
+ var pos: Int = -1
+ var node: Unrolled[T] = headptr
+ scan()
+
+ private def scan() {
+ pos += 1
+ while (pos >= node.size) {
+ pos = 0
+ node = node.next
+ if (node eq null) return
+ }
+ }
+ def hasNext = node ne null
+ def next = if (hasNext) {
+ val r = node.array(pos)
+ scan()
+ r
+ } else Iterator.empty.next
+ }
+
+ // this should be faster than the iterator
+ override def foreach[U](f: T => U) = headptr.foreach(f)
+
+ def result = this
+
+ def length = sz
+
+ def apply(idx: Int) =
+ if (idx >= 0 && idx < sz) headptr(idx)
+ else outofbounds(idx)
+
+ def update(idx: Int, newelem: T) =
+ if (idx >= 0 && idx < sz) headptr(idx) = newelem
+ else outofbounds(idx)
+
+ def remove(idx: Int) =
+ if (idx >= 0 && idx < sz) {
+ sz -= 1
+ headptr.remove(idx, this)
+ } else outofbounds(idx)
+
+ def +=:(elem: T) = {
+ headptr = headptr.prepend(elem)
+ sz += 1
+ this
+ }
+
+ def insertAll(idx: Int, elems: Traversable[T]) =
+ if (idx >= 0 && idx <= sz) {
+ headptr.insertAll(idx, elems, this)
+ sz += elems.size
+ } else outofbounds(idx)
+
+ override def stringPrefix = "UnrolledBuffer"
+}
+
+
+object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
+ /** $genericCanBuildFromInfo */
+ implicit def canBuildFrom[T](implicit m: ClassManifest[T]): CanBuildFrom[Coll, T, UnrolledBuffer[T]] =
+ new GenericCanBuildFrom[T]
+ def newBuilder[T](implicit m: ClassManifest[T]): Builder[T, UnrolledBuffer[T]] = new UnrolledBuffer[T]
+
+ val waterline = 50
+ val waterlineDelim = 100
+ private[parallel] val unrolledsize = 32
+
+ /** Unrolled buffer node.
+ */
+ class Unrolled[T: ClassManifest] private (var size: Int, var array: Array[T], var next: Unrolled[T]) {
+ def this() = this(0, new Array[T](UnrolledBuffer.unrolledsize), null)
+
+ // adds and returns itself or the new unrolled if full
+ @tailrec final def append(elem: T): Unrolled[T] = if (size < UnrolledBuffer.unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[T]
+ next.append(elem)
+ }
+ def foreach[U](f: T => U) {
+ var unrolled = this
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val elem = chunkarr(i)
+ f(elem)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+ }
+ @tailrec final def apply(idx: Int): T =
+ if (idx < size) array(idx) else next.apply(idx - size)
+ @tailrec final def update(idx: Int, newelem: T): Unit =
+ if (idx < size) array(idx) = newelem else next.update(idx - size, newelem)
+ @tailrec final def locate(idx: Int): Unrolled[T] =
+ if (idx < size) this else next.locate(idx - size)
+ def prepend(elem: T) = if (size < array.length) {
+ // shift the elements of the array right
+ // then insert the element
+ shiftright()
+ array(0) = elem
+ size += 1
+ this
+ } else {
+ // allocate a new node and store element
+ // then make it point to this
+ val newhead = new Unrolled[T]
+ newhead.append(elem)
+ newhead.next = this
+ newhead
+ }
+ // shifts right assuming enough space
+ private def shiftright() {
+ var i = size - 1
+ while (i >= 0) {
+ array(i + 1) = array(i)
+ i -= 1
+ }
+ }
+ // returns pointer to new last if changed
+ @tailrec final def remove(idx: Int, buffer: UnrolledBuffer[T]): T =
+ if (idx < size) {
+ // remove the element
+ // then try to merge with the next bucket
+ val r = array(idx)
+ shiftleft(idx)
+ size -= 1
+ if (tryMergeWithNext()) buffer.lastPtr = this
+ r
+ } else next.remove(idx - size, buffer)
+ // shifts left elements after `leftb` (overwrites `leftb`)
+ private def shiftleft(leftb: Int) {
+ var i = leftb
+ while (i < (size - 1)) {
+ array(i) = array(i + 1)
+ i += 1
+ }
+ nullout(i, i + 1)
+ }
+ protected def tryMergeWithNext() = if (next != null && (size + next.size) < (array.length * waterline / waterlineDelim)) {
+ // copy the next array, then discard the next node
+ Array.copy(next.array, 0, array, size, next.size)
+ size = size + next.size
+ next = next.next
+ if (next eq null) true else false // checks if last node was thrown out
+ } else false
+
+ @tailrec final def insertAll(idx: Int, t: Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) {
+ // divide this node at the appropriate position and insert all into head
+ // update new next
+ val newnextnode = new Unrolled[T]
+ Array.copy(array, idx, newnextnode.array, 0, size - idx)
+ newnextnode.size = size - idx
+ newnextnode.next = next
+
+ // update this
+ nullout(idx, size)
+ size = idx
+ next = null
+
+ // insert everything from iterable to this
+ var curr = this
+ for (elem <- t) curr = curr append elem
+ curr.next = newnextnode
+
+ // try to merge the last node of this with the newnextnode
+ if (curr.tryMergeWithNext()) buffer.lastPtr = curr
+ } else insertAll(idx - size, t, buffer)
+ private def nullout(from: Int, until: Int) {
+ var idx = from
+ while (idx < until) {
+ array(idx) = null.asInstanceOf[T] // !!
+ idx += 1
+ }
+ }
+
+ // assumes this is the last node
+ // `thathead` and `thatlast` are head and last node
+ // of the other unrolled list, respectively
+ def bind(thathead: Unrolled[T]) = {
+ assert(next eq null)
+ next = thathead
+ tryMergeWithNext()
+ }
+
+ override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + " -> " + (if (next ne null) next.toString else "")
+ }
+
+}
+
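A short usage sketch of the new buffer, using only operations defined above (`+=`, `concat`, `foreach`, `length`). Note that `concat` is destructive on its argument: it binds the other buffer's node chain onto this one and then clears it:

    import scala.collection.parallel.UnrolledBuffer

    val a = new UnrolledBuffer[Int]
    a += 1; a += 2
    val b = new UnrolledBuffer[Int]
    b += 3; b += 4

    val ab = a concat b      // O(1): links b's nodes after a's last node
    ab foreach println       // prints 1, 2, 3, 4
    assert(b.length == 0)    // b was cleared and must not be reused for a's data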
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 07caafb417..79dddc7c8b 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -11,7 +11,8 @@ import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
-import scala.collection.parallel.Unrolled
+import scala.collection.parallel.UnrolledBuffer.Unrolled
+import scala.collection.parallel.UnrolledBuffer
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParMapTemplate
@@ -134,26 +135,25 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
sz += 1
val hc = emptyTrie.computeHash(elem._1)
val pos = hc & 0x1f
- if (lasts(pos) eq null) {
+ if (buckets(pos) eq null) {
// initialize bucket
- heads(pos) = new Unrolled[(K, V)]
- lasts(pos) = heads(pos)
+ buckets(pos) = new UnrolledBuffer[(K, V)]
}
// add to bucket
- lasts(pos) = lasts(pos).add(elem)
+ buckets(pos) += elem
this
}
def result = {
- val buckets = heads.filter(_ != null)
- val root = new Array[HashMap[K, V]](buckets.length)
+ val bucks = buckets.filter(_ != null).map(_.headPtr)
+ val root = new Array[HashMap[K, V]](bucks.length)
- executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length))
+ executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length))
var bitmap = 0
var i = 0
while (i < rootsize) {
- if (heads(i) ne null) bitmap |= 1 << i
+ if (buckets(i) ne null) bitmap |= 1 << i
i += 1
}
val sz = root.foldLeft(0)(_ + _.size)
@@ -167,18 +167,18 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
override def toString = {
- "HashTrieCombiner(buckets:\n\t" + heads.filter(_ != null).mkString("\n\t") + ")\n"
+ "HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
}
/* tasks */
- class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ class CreateTrie(bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
val until = offset + howmany
while (i < until) {
- root(i) = createTrie(buckets(i))
+ root(i) = createTrie(bucks(i))
i += 1
}
}
@@ -204,7 +204,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
def split = {
val fp = howmany / 2
- List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
+ List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp))
}
def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 33e2e7102a..66ded02397 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -11,7 +11,8 @@ import scala.collection.parallel.ParSetLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
-import scala.collection.parallel.Unrolled
+import scala.collection.parallel.UnrolledBuffer.Unrolled
+import scala.collection.parallel.UnrolledBuffer
import scala.collection.generic.ParSetFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParTemplate
@@ -112,26 +113,25 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
sz += 1
val hc = emptyTrie.computeHash(elem)
val pos = hc & 0x1f
- if (lasts(pos) eq null) {
+ if (buckets(pos) eq null) {
// initialize bucket
- heads(pos) = new Unrolled[Any]
- lasts(pos) = heads(pos)
+ buckets(pos) = new UnrolledBuffer[Any]
}
// add to bucket
- lasts(pos) = lasts(pos).add(elem)
+ buckets(pos) += elem
this
}
def result = {
- val buckets = heads.filter(_ != null)
- val root = new Array[HashSet[T]](buckets.length)
+ val bucks = buckets.filter(_ != null).map(_.headPtr)
+ val root = new Array[HashSet[T]](bucks.length)
- executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length))
+ executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length))
var bitmap = 0
var i = 0
while (i < rootsize) {
- if (heads(i) ne null) bitmap |= 1 << i
+ if (buckets(i) ne null) bitmap |= 1 << i
i += 1
}
val sz = root.foldLeft(0)(_ + _.size)
@@ -146,14 +146,14 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
/* tasks */
- class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int)
+ class CreateTrie(bucks: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int)
extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
val until = offset + howmany
while (i < until) {
- root(i) = createTrie(buckets(i))
+ root(i) = createTrie(bucks(i))
i += 1
}
}
@@ -179,7 +179,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
}
def split = {
val fp = howmany / 2
- List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
+ List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp))
}
def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
}
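In both immutable combiners the destination bucket is the low five bits of the element's (trie-improved) hash code, `hc & 0x1f`, matching the 32-way branching of the hash trie's root. A one-line illustration with an arbitrary hash value:

    // Root-bucket selection used by the immutable combiners above.
    def rootPos(hc: Int): Int = hc & 0x1f       // low 5 bits: one of 32 root buckets
    assert(rootPos(0xdeadbeef) == 0x0f)         // 0xef & 0x1f = 0x0f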
diff --git a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
new file mode 100644
index 0000000000..f52bfc8544
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala
@@ -0,0 +1,95 @@
+package scala.collection
+package parallel.mutable
+
+
+
+
+import collection.parallel.ParIterableIterator
+
+
+
+
+trait ParFlatHashTable[T] extends collection.mutable.FlatHashTable[T] {
+
+ override def alwaysInitSizeMap = true
+
+ abstract class ParFlatHashTableIterator(var idx: Int, val until: Int, val totalsize: Int)
+ extends ParIterableIterator[T] with SizeMapUtils {
+ import collection.DebugUtils._
+
+ private var traversed = 0
+ private val itertable = table
+
+ if (hasNext) scan()
+
+ private def scan() {
+ while (itertable(idx) eq null) idx += 1
+ }
+
+ def newIterator(index: Int, until: Int, totalsize: Int): ParIterableIterator[T]
+
+ def remaining = totalsize - traversed
+ def hasNext = traversed < totalsize
+ def next = if (hasNext) {
+ val r = itertable(idx).asInstanceOf[T]
+ traversed += 1
+ idx += 1
+ if (hasNext) scan()
+ r
+ } else Iterator.empty.next
+ def split = if (remaining > 1) {
+ val divpt = (until + idx) / 2
+
+ val fstidx = idx
+ val fstuntil = divpt
+ val fsttotal = calcNumElems(idx, divpt, itertable.length, sizeMapBucketSize)
+ val fstit = newIterator(fstidx, fstuntil, fsttotal)
+
+ val sndidx = divpt
+ val snduntil = until
+ val sndtotal = remaining - fsttotal
+ val sndit = newIterator(sndidx, snduntil, sndtotal)
+
+ Seq(fstit, sndit)
+ } else Seq(this)
+
+ override def debugInformation = buildString {
+ append =>
+ append("Parallel flat hash table iterator")
+ append("---------------------------------")
+ append("Traversed/total: " + traversed + " / " + totalsize)
+ append("Table idx/until: " + idx + " / " + until)
+ append("Table length: " + itertable.length)
+ append("Table: ")
+ append(arrayString(itertable, 0, itertable.length))
+ append("Sizemap: ")
+ append(arrayString(sizemap, 0, sizemap.length))
+ }
+
+ protected def countElems(from: Int, until: Int) = {
+ var count = 0
+ var i = from
+ while (i < until) {
+ if (itertable(i) ne null) count += 1
+ i += 1
+ }
+ count
+ }
+
+ protected def countBucketSizes(frombucket: Int, untilbucket: Int) = {
+ var count = 0
+ var i = frombucket
+ while (i < untilbucket) {
+ count += sizemap(i)
+ i += 1
+ }
+ count
+ }
+
+ private def check = if (table.slice(idx, until).count(_ != null) != remaining) {
+ println("Invariant broken: " + debugInformation)
+ assert(false)
+ }
+ }
+
+}
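The iterator's `split` halves the remaining index range at `divpt`; only the first half's element count needs a (size-map assisted) `calcNumElems` pass, since the second half's count falls out by subtraction. Schematically, with made-up state:

    // Split arithmetic from ParFlatHashTableIterator.split (hypothetical numbers).
    val idx = 8; val until = 64                 // current table index range
    val remaining = 20                          // elements not yet traversed
    val divpt = (until + idx) / 2               // = 36, the midpoint
    val fsttotal = 7                            // pretend calcNumElems(8, 36, ...) = 7
    val sndtotal = remaining - fsttotal         // = 13, computed without a table scan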
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 7e4e26d758..a737c03db1 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -128,42 +128,44 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
sz += 1
val hc = improve(elemHashCode(elem._1))
val pos = (hc >>> nonmasklen)
- if (lasts(pos) eq null) {
+ if (buckets(pos) eq null) {
// initialize bucket
- heads(pos) = new Unrolled[DefaultEntry[K, V]]
- lasts(pos) = heads(pos)
+ buckets(pos) = new UnrolledBuffer[DefaultEntry[K, V]]()
}
// add to bucket
- lasts(pos) = lasts(pos).add(new DefaultEntry(elem._1, elem._2))
+ buckets(pos) += new DefaultEntry(elem._1, elem._2)
this
}
def result: ParHashMap[K, V] = if (size >= (ParHashMapCombiner.numblocks * sizeMapBucketSize)) { // 1024
// construct table
val table = new AddingHashTable(size, tableLoadFactor)
- val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks))
+ val bucks = buckets.map(b => if (b ne null) b.headPtr else null)
+ val insertcount = executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length))
table.setSize(insertcount)
// TODO compare insertcount and size to see if compression is needed
val c = table.hashTableContents
new ParHashMap(c)
} else {
// construct a normal table and fill it sequentially
+ // TODO parallelize by keeping separate sizemaps and merging them
val table = new HashTable[K, DefaultEntry[K, V]] {
def insertEntry(e: DefaultEntry[K, V]) = if (super.findEntry(e.key) eq null) super.addEntry(e)
sizeMapInit(table.length)
}
var i = 0
while (i < ParHashMapCombiner.numblocks) {
- if (heads(i) ne null) {
- for (elem <- heads(i)) table.insertEntry(elem)
+ if (buckets(i) ne null) {
+ for (elem <- buckets(i)) table.insertEntry(elem)
}
i += 1
}
- // TODO compression
val c = table.hashTableContents
new ParHashMap(c)
}
+ /* classes */
+
/** A hash table which will never resize itself. Knowing the number of elements in advance,
* it allocates the table of the required size when created.
*
@@ -213,6 +215,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
/* tasks */
+ import UnrolledBuffer.Unrolled
+
class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int)
extends super.Task[Int, FillBlocks] {
var result = Int.MinValue
diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
index dc33ef3189..e14fbd7305 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
@@ -3,17 +3,265 @@ package scala.collection.parallel.mutable
+import collection.generic._
+import collection.mutable.HashSet
+import collection.mutable.FlatHashTable
+import collection.parallel.Combiner
+import collection.parallel.EnvironmentPassingCombiner
+import collection.parallel.UnrolledBuffer
+class ParHashSet[T] private[collection] (contents: FlatHashTable.Contents[T])
+extends ParSet[T]
+ with GenericParTemplate[T, ParHashSet]
+ with ParSetLike[T, ParHashSet[T], collection.mutable.HashSet[T]]
+ with ParFlatHashTable[T]
+{
+ initWithContents(contents)
+ // println("----> new par hash set!")
+ // java.lang.Thread.dumpStack
+ // println(debugInformation)
+ def this() = this(null)
+ override def companion = ParHashSet
+ override def empty = new ParHashSet
+ override def iterator = parallelIterator
+ override def size = tableSize
+ def seq = new HashSet(hashTableContents)
+ def +=(elem: T) = {
+ addEntry(elem)
+ this
+ }
+
+ def -=(elem: T) = {
+ removeEntry(elem)
+ this
+ }
+
+ def contains(elem: T) = containsEntry(elem)
+
+ def parallelIterator = new ParHashSetIterator(0, table.length, size) with SCPI
+
+ type SCPI = SignalContextPassingIterator[ParHashSetIterator]
+
+ class ParHashSetIterator(start: Int, iteratesUntil: Int, totalElements: Int)
+ extends ParFlatHashTableIterator(start, iteratesUntil, totalElements) with ParIterator {
+ me: SCPI =>
+ def newIterator(start: Int, until: Int, total: Int) = new ParHashSetIterator(start, until, total) with SCPI
+ }
+
+ import collection.DebugUtils._
+ override def debugInformation = buildString {
+ append =>
+ append("Parallel flat hash table set")
+ append("No. elems: " + tableSize)
+ append("Table length: " + table.length)
+ append("Table: ")
+ append(arrayString(table, 0, table.length))
+ append("Sizemap: ")
+ append(arrayString(sizemap, 0, sizemap.length))
+ }
+
+}
+
+
+/** $factoryInfo
+ * @define Coll mutable.ParSet
+ * @define coll mutable parallel set
+ */
+object ParHashSet extends ParSetFactory[ParHashSet] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] = new GenericCanCombineFrom[T]
+
+ override def newBuilder[T]: Combiner[T, ParHashSet[T]] = newCombiner
+
+ override def newCombiner[T]: Combiner[T, ParHashSet[T]] = ParHashSetCombiner.apply[T]
+}
+
+
+private[mutable] abstract class ParHashSetCombiner[T](private val tableLoadFactor: Int)
+extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, ParHashSetCombiner[T]](ParHashSetCombiner.numblocks)
+with collection.mutable.FlatHashTable.HashUtils[T] {
+self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
+ private var mask = ParHashSetCombiner.discriminantmask
+ private var nonmasklen = ParHashSetCombiner.nonmasklength
+
+ def +=(elem: T) = {
+ sz += 1
+ val hc = improve(elemHashCode(elem))
+ val pos = hc >>> nonmasklen
+ if (buckets(pos) eq null) {
+ // initialize bucket
+ buckets(pos) = new UnrolledBuffer[Any]
+ }
+ // add to bucket
+ buckets(pos) += elem
+ this
+ }
+
+ def result: ParHashSet[T] = {
+ val contents = if (size >= ParHashSetCombiner.numblocks * sizeMapBucketSize) parPopulate else seqPopulate
+ new ParHashSet(contents)
+ }
+
+ private def parPopulate: FlatHashTable.Contents[T] = {
+ // construct it in parallel
+ val table = new AddingFlatHashTable(size, tableLoadFactor)
+ val (inserted, leftovers) = executeAndWaitResult(new FillBlocks(buckets, table, 0, buckets.length))
+ var leftinserts = 0
+ for (elem <- leftovers) leftinserts += table.insertEntry(0, table.tableLength, elem.asInstanceOf[T])
+ table.setSize(leftinserts + inserted)
+ table.hashTableContents
+ }
+
+ private def seqPopulate: FlatHashTable.Contents[T] = {
+ // construct it sequentially
+ // TODO parallelize by keeping separate size maps and merging them
+ val tbl = new FlatHashTable[T] {
+ sizeMapInit(table.length)
+ }
+ for {
+ buffer <- buckets;
+ if buffer ne null;
+ elem <- buffer
+ } tbl.addEntry(elem.asInstanceOf[T])
+ tbl.hashTableContents
+ }
+
+ /* classes */
+
+ /** A flat hash table which doesn't resize itself. It accepts the number of elements
+ * it has to take and allocates the underlying hash table in advance.
+ * Elements can only be added to it. The final size has to be adjusted manually.
+ * It is internal to `ParHashSet` combiners.
+ *
+ */
+ class AddingFlatHashTable(numelems: Int, lf: Int) extends FlatHashTable[T] {
+ _loadFactor = lf
+ table = new Array[AnyRef](capacity(FlatHashTable.sizeForThreshold(numelems, _loadFactor)))
+ tableSize = 0
+ threshold = FlatHashTable.newThreshold(_loadFactor, table.length)
+ sizeMapInit(table.length)
+
+ def tableLength = table.length
+
+ def setSize(sz: Int) = tableSize = sz
+
+ /**
+ * The elements are added using the `insertEntry` method. This method accepts three
+ * arguments:
+ *
+ * @param insertAt where to add the element (set to -1 to use its hashcode)
+ * @param comesBefore the position before which the element should be added to
+ * @param elem the element to be added
+ *
+ * If the element is to be inserted at the position corresponding to its hash code,
+ * the table will try to add the element in such a position if possible. Collisions are resolved
+ * using linear hashing, so the element may actually have to be added to a position
+ * that follows the specified one. In the case that the first unoccupied position
+ * comes after `comesBefore`, the element is not added and the method simply returns `-1`,
+ * indicating that it couldn't add the element in a position that comes before the
+ * specified one.
+ * If the element is already present in the hash table, it is not added, and this method
+ * returns 0. If the element is added, it returns 1.
+ */
+ def insertEntry(insertAt: Int, comesBefore: Int, elem: T): Int = {
+ var h = insertAt
+ if (h == -1) h = index(elemHashCode(elem))
+ var entry = table(h)
+ while (null != entry) {
+ if (entry == elem) return 0
+ h = (h + 1) // we *do not* do `(h + 1) % table.length` here, because we don't overlap!!
+ if (h >= comesBefore) return -1
+ entry = table(h)
+ }
+ table(h) = elem.asInstanceOf[AnyRef]
+ tableSize = tableSize + 1
+ nnSizeMapAdd(h)
+ 1
+ }
+ }
+
+ /* tasks */
+
+ class FillBlocks(buckets: Array[UnrolledBuffer[Any]], table: AddingFlatHashTable, val offset: Int, val howmany: Int)
+ extends super.Task[(Int, UnrolledBuffer[Any]), FillBlocks] {
+ var result = (Int.MinValue, new UnrolledBuffer[Any]);
+ def leaf(prev: Option[(Int, UnrolledBuffer[Any])]) {
+ var i = offset
+ var totalinserts = 0
+ var leftover = new UnrolledBuffer[Any]()
+ while (i < (offset + howmany)) {
+ val (inserted, intonextblock) = fillBlock(i, buckets(i), leftover)
+ totalinserts += inserted
+ leftover = intonextblock
+ i += 1
+ }
+ result = (totalinserts, leftover)
+ }
+ private val blocksize = table.tableLength >> ParHashSetCombiner.discriminantbits
+ private def blockStart(block: Int) = block * blocksize
+ private def nextBlockStart(block: Int) = (block + 1) * blocksize
+ private def fillBlock(block: Int, elems: UnrolledBuffer[Any], leftovers: UnrolledBuffer[Any]): (Int, UnrolledBuffer[Any]) = {
+ val beforePos = nextBlockStart(block)
+
+ // store the elems
+ val (elemsIn, elemsLeft) = if (elems != null) insertAll(-1, beforePos, elems) else (0, UnrolledBuffer[Any]())
+
+ // store the leftovers
+ val (leftoversIn, leftoversLeft) = insertAll(blockStart(block), beforePos, leftovers)
+
+ // return the no. of stored elements tupled with leftovers
+ (elemsIn + leftoversIn, elemsLeft concat leftoversLeft)
+ }
+ private def insertAll(atPos: Int, beforePos: Int, elems: UnrolledBuffer[Any]): (Int, UnrolledBuffer[Any]) = {
+ var it = elems.iterator
+ var leftovers = new UnrolledBuffer[Any]
+ var inserted = 0
+ while (it.hasNext) {
+ val elem = it.next
+ val res = table.insertEntry(atPos, beforePos, elem.asInstanceOf[T])
+ if (res >= 0) inserted += res
+ else leftovers += elem
+ }
+ (inserted, leftovers)
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp))
+ }
+ override def merge(that: FillBlocks) {
+ // take the leftovers from the left task, store them into the block of the right task
+ val atPos = blockStart(that.offset)
+ val beforePos = blockStart(that.offset + that.howmany)
+ val (inserted, remainingLeftovers) = insertAll(atPos, beforePos, this.result._2)
+
+ // anything left after trying the store the left leftovers is added to the right task leftovers
+ // and a new leftovers set is produced in this way
+ // the total number of successfully inserted elements is adjusted accordingly
+ result = (this.result._1 + that.result._1 + inserted, remainingLeftovers concat that.result._2)
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel)
+ }
+
+}
+
+
+private[mutable] object ParHashSetCombiner {
+ private[mutable] val discriminantbits = 5
+ private[mutable] val numblocks = 1 << discriminantbits
+ private[mutable] val discriminantmask = ((1 << discriminantbits) - 1);
+ private[mutable] val nonmasklength = 32 - discriminantbits
+
+ def apply[T] = new ParHashSetCombiner[T](FlatHashTable.defaultLoadFactor) with EnvironmentPassingCombiner[T, ParHashSet[T]]
+}
diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
index a9ab577b55..efba6c8d9c 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
@@ -15,14 +15,13 @@ import collection.parallel.ParIterableIterator
*/
trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.mutable.HashTable[K, Entry] {
- // always initialize size map
- if (!isSizeMapDefined) sizeMapInitAndRebuild
+ override def alwaysInitSizeMap = true
/** A parallel iterator returning all the entries.
*/
abstract class EntryIterator[T, +IterRepr <: ParIterableIterator[T]]
(private var idx: Int, private val until: Int, private val totalsize: Int, private var es: Entry)
- extends ParIterableIterator[T] {
+ extends ParIterableIterator[T] with SizeMapUtils {
private val itertable = table
private var traversed = 0
scan()
@@ -78,7 +77,7 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
val sidx = idx + divsz + 1 // + 1 preserves iteration invariant
val suntil = until
val ses = itertable(sidx - 1).asInstanceOf[Entry] // sidx - 1 ensures counting from the right spot
- val stotal = calcNumElems(sidx - 1, suntil)
+ val stotal = calcNumElems(sidx - 1, suntil, table.length, sizeMapBucketSize)
// first iterator params
val fidx = idx
@@ -110,35 +109,7 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
buff map { e => entry2item(e) }
}
- private def calcNumElems(from: Int, until: Int) = {
- // find the first bucket
- val fbindex = from / sizeMapBucketSize
-
- // find the last bucket
- val lbindex = until / sizeMapBucketSize
- // note to self: FYI if you define lbindex as from / sizeMapBucketSize, the first branch
- // below always triggers and tests pass, so you spend a great day benchmarking and profiling
-
- if (fbindex == lbindex) {
- // if first and last are the same, just count between `from` and `until`
- // return this count
- countElems(from, until)
- } else {
- // otherwise count in first, then count in last
- val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min itertable.length
- val fbcount = countElems(from, fbuntil)
- val lbstart = lbindex * sizeMapBucketSize
- val lbcount = countElems(lbstart, until)
-
- // and finally count the elements in all the buckets between first and last using a sizemap
- val inbetween = countBucketSizes(fbindex + 1, lbindex)
-
- // return the sum
- fbcount + inbetween + lbcount
- }
- }
-
- private def countElems(from: Int, until: Int) = {
+ protected def countElems(from: Int, until: Int) = {
var c = 0
var idx = from
var es: Entry = null
@@ -153,7 +124,7 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
c
}
- private def countBucketSizes(fromBucket: Int, untilBucket: Int) = {
+ protected def countBucketSizes(fromBucket: Int, untilBucket: Int) = {
var c = 0
var idx = fromBucket
while (idx < untilBucket) {
@@ -168,7 +139,6 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
-object ParHashTable {
- var iters = 0
-}
+
+
diff --git a/src/library/scala/collection/parallel/mutable/ParSet.scala b/src/library/scala/collection/parallel/mutable/ParSet.scala
new file mode 100644
index 0000000000..e700bd97d7
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParSet.scala
@@ -0,0 +1,42 @@
+package scala.collection.parallel.mutable
+
+
+
+import scala.collection.generic._
+import scala.collection.parallel.Combiner
+
+
+
+
+
+
+/** A mutable variant of `ParSet`.
+ *
+ * @define Coll mutable.ParSet
+ * @define coll mutable parallel set
+ */
+trait ParSet[T]
+extends collection.mutable.Set[T]
+ with ParIterable[T]
+ with collection.parallel.ParSet[T]
+ with GenericParTemplate[T, ParSet]
+ with ParSetLike[T, ParSet[T], collection.mutable.Set[T]]
+{
+self =>
+ override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet;
+ override def empty: ParSet[T] = ParHashSet()
+}
+
+
+/** $factoryInfo
+ * @define Coll mutable.ParSet
+ * @define coll mutable parallel set
+ */
+object ParSet extends ParSetFactory[ParSet] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T]
+
+ override def newBuilder[T]: Combiner[T, ParSet[T]] = ParHashSet.newBuilder
+
+ override def newCombiner[T]: Combiner[T, ParSet[T]] = ParHashSet.newCombiner
+}
+
diff --git a/src/library/scala/collection/parallel/mutable/ParSetLike.scala b/src/library/scala/collection/parallel/mutable/ParSetLike.scala
new file mode 100644
index 0000000000..d3fab5a4db
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParSetLike.scala
@@ -0,0 +1,70 @@
+package scala.collection
+package parallel.mutable
+
+
+
+import scala.collection.mutable.Set
+import scala.collection.mutable.Builder
+
+
+
+
+
+
+
+
+trait ParSetLike[T,
+ +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T],
+ +Sequential <: mutable.Set[T] with mutable.SetLike[T, Sequential]]
+extends mutable.SetLike[T, Repr]
+ with collection.parallel.ParIterableLike[T, Repr, Sequential]
+ with collection.parallel.ParSetLike[T, Repr, Sequential]
+{ self =>
+
+ protected[this] override def newBuilder: Builder[T, Repr] = newCombiner
+
+ protected[this] override def newCombiner: parallel.Combiner[T, Repr]
+
+ override def empty: Repr
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/package.scala b/src/library/scala/collection/parallel/mutable/package.scala
index 0590539a29..cddd09f696 100644
--- a/src/library/scala/collection/parallel/mutable/package.scala
+++ b/src/library/scala/collection/parallel/mutable/package.scala
@@ -10,6 +10,43 @@ import scala.collection.generic.Sizing
package object mutable {
+ /* classes and traits */
+
+ private[mutable] trait SizeMapUtils {
+
+ protected def calcNumElems(from: Int, until: Int, tableLength: Int, sizeMapBucketSize: Int) = {
+ // find the first bucket
+ val fbindex = from / sizeMapBucketSize
+
+ // find the last bucket
+ val lbindex = until / sizeMapBucketSize
+ // note to self: FYI if you define lbindex as from / sizeMapBucketSize, the first branch
+ // below always triggers and tests pass, so you spend a great day benchmarking and profiling
+
+ if (fbindex == lbindex) {
+ // if first and last are the same, just count between `from` and `until`
+ // return this count
+ countElems(from, until)
+ } else {
+ // otherwise count in first, then count in last
+ val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min tableLength
+ val fbcount = countElems(from, fbuntil)
+ val lbstart = lbindex * sizeMapBucketSize
+ val lbcount = countElems(lbstart, until)
+
+ // and finally count the elements in all the buckets between first and last using a sizemap
+ val inbetween = countBucketSizes(fbindex + 1, lbindex)
+
+ // return the sum
+ fbcount + inbetween + lbcount
+ }
+ }
+
+ protected def countElems(from: Int, until: Int): Int
+
+ protected def countBucketSizes(fromBucket: Int, untilBucket: Int): Int
+ }
+
/* hack-arounds */
private[mutable] class ExposedArrayBuffer[T] extends ArrayBuffer[T] with Sizing {
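A self-contained check of the `calcNumElems` scheme against a brute-force count, on toy occupancy data with a 32-slot bucket size (all names below are local to the sketch):

    // calcNumElems versus a direct scan, on random toy occupancy data.
    val bucketSize = 32
    val occupied = Array.fill(224)(scala.util.Random.nextBoolean)
    val sizemap = occupied.grouped(bucketSize).map(_.count(identity)).toArray

    def countElems(from: Int, until: Int) = (from until until).count(i => occupied(i))

    def calcNumElems(from: Int, until: Int) = {
      val fbindex = from / bucketSize
      val lbindex = until / bucketSize
      if (fbindex == lbindex) countElems(from, until)        // same bucket: scan directly
      else {
        val fbcount = countElems(from, ((fbindex + 1) * bucketSize) min occupied.length)
        val inbetween = (fbindex + 1 until lbindex).map(i => sizemap(i)).sum
        val lbcount = countElems(lbindex * bucketSize, until)
        fbcount + inbetween + lbcount                        // partial + whole + partial
      }
    }

    assert(calcNumElems(40, 200) == countElems(40, 200))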
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 93f20132e5..0872ccc423 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -19,7 +19,6 @@ package object parallel {
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
- private[parallel] val unrolledsize = 16
/* functions */
@@ -35,6 +34,8 @@ package object parallel {
private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg)
+ private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
+
/* implicit conversions */
/** An implicit conversion providing arrays with a `par` method, which
@@ -114,38 +115,6 @@ package object parallel {
final class CompositeThrowable(val throwables: Set[Throwable])
extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", "))
- /** Unrolled list node.
- */
- private[parallel] class Unrolled[T: ClassManifest] {
- var size = 0
- var array = new Array[T](unrolledsize)
- var next: Unrolled[T] = null
- // adds and returns itself or the new unrolled if full
- def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
- array(size) = elem
- size += 1
- this
- } else {
- next = new Unrolled[T]
- next.add(elem)
- }
- def foreach[U](f: T => U) {
- var unrolled = this
- var i = 0
- while (unrolled ne null) {
- val chunkarr = unrolled.array
- val chunksz = unrolled.size
- while (i < chunksz) {
- val elem = chunkarr(i)
- f(elem)
- i += 1
- }
- i = 0
- unrolled = unrolled.next
- }
- }
- override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + (if (next ne null) next.toString else "")
- }
/** A helper iterator for iterating very small array buffers.
* Automatically forwards the signal delegate when splitting.
@@ -184,12 +153,13 @@ package object parallel {
* are unrolled linked lists. Some parallel collections are constructed by
* sorting their result set according to some criteria.
*
- * A reference `heads` to bucket heads is maintained, as well as a reference
- * `lasts` to the last unrolled list node. Size is kept in `sz` and maintained
- * whenever 2 bucket combiners are combined.
+ * A reference `buckets` to buckets is maintained. Total size of all buckets
+ * is kept in `sz` and maintained whenever 2 bucket combiners are combined.
*
* Clients decide how to maintain these by implementing `+=` and `result`.
- * Populating and using the buckets is up to the client.
+ * Populating and using the buckets is up to the client. While populating them,
+ * the client should update `sz` accordingly. Note that a bucket is by default
+ * set to `null` to save space - the client should initialize it.
* Note that in general the type of the elements contained in the buckets `Buck`
* doesn't have to correspond to combiner element type `Elem`.
*
@@ -207,15 +177,13 @@ package object parallel {
(private val bucketnumber: Int)
extends Combiner[Elem, To] {
self: EnvironmentPassingCombiner[Elem, To] =>
- protected var heads: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
- protected var lasts: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
protected var sz: Int = 0
def size = sz
def clear = {
- heads = new Array[Unrolled[Buck]](bucketnumber)
- lasts = new Array[Unrolled[Buck]](bucketnumber)
+ buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
sz = 0
}
@@ -229,12 +197,10 @@ package object parallel {
val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
var i = 0
while (i < bucketnumber) {
- if (lasts(i) eq null) {
- heads(i) = that.heads(i)
- lasts(i) = that.lasts(i)
+ if (buckets(i) eq null) {
+ buckets(i) = that.buckets(i)
} else {
- lasts(i).next = that.heads(i)
- if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ if (that.buckets(i) ne null) buckets(i) concat that.buckets(i)
}
i += 1
}
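
With buckets now represented as whole `UnrolledBuffer`s instead of separate head/last node arrays, combining two bucket combiners reduces to a per-slot `concat`, which just links node chains. A sketch of the merge loop under this representation:

    // Merging two bucket arrays (restates BucketCombiner.combine above).
    import scala.collection.parallel.UnrolledBuffer

    def mergeBuckets[B](into: Array[UnrolledBuffer[B]], from: Array[UnrolledBuffer[B]]) {
      var i = 0
      while (i < into.length) {
        if (into(i) eq null) into(i) = from(i)             // adopt the other bucket
        else if (from(i) ne null) into(i) concat from(i)   // O(1) chain binding
        i += 1
      }
    }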