author     Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>   2010-10-20 20:19:56 +0000
committer  Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>   2010-10-20 20:19:56 +0000
commit     d3d218e5ea77584489437f0dfa8148ee3764d6f7
tree       881fba9234da6654e8d914c8b56ddadd100c5cba
parent     d13a2529aa8218836d13ee04303da4f3325933c2
Further work on parallel mutable hash maps.
Changed the HashTable interface. Fixed one test. Implemented hash map iterators. Implementing hash map combiners. Extracting common functionality of bucket-based combiners. No review.
Diffstat (limited to 'src')
-rw-r--r--  src/library/scala/collection/immutable/HashMap.scala              |   2
-rw-r--r--  src/library/scala/collection/mutable/HashMap.scala                 |  27
-rw-r--r--  src/library/scala/collection/mutable/HashTable.scala               | 164
-rw-r--r--  src/library/scala/collection/mutable/LinkedHashMap.scala           |   2
-rw-r--r--  src/library/scala/collection/parallel/Combiner.scala               |  12
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala        |   6
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashMap.scala   |  39
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashSet.scala   |  40
-rw-r--r--  src/library/scala/collection/parallel/immutable/package.scala      |  21
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala     | 173
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashTable.scala   | 131
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParMap.scala         |   6
-rw-r--r--  src/library/scala/collection/parallel/package.scala                | 141
13 files changed, 595 insertions(+), 169 deletions(-)
diff --git a/src/library/scala/collection/immutable/HashMap.scala b/src/library/scala/collection/immutable/HashMap.scala
index d167e6d6a7..684f9bced4 100644
--- a/src/library/scala/collection/immutable/HashMap.scala
+++ b/src/library/scala/collection/immutable/HashMap.scala
@@ -77,7 +77,7 @@ class HashMap[A, +B] extends Map[A,B] with MapLike[A, B, HashMap[A, B]] with Par
protected def get0(key: A, hash: Int, level: Int): Option[B] = None
- def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1), merger: Merger[B1]): HashMap[A, B1] =
+ private[collection] def updated0[B1 >: B](key: A, hash: Int, level: Int, value: B1, kv: (A, B1), merger: Merger[B1]): HashMap[A, B1] =
new HashMap.HashMap1(key, hash, value, kv)
protected def removed0(key: A, hash: Int, level: Int): HashMap[A, B] = this
diff --git a/src/library/scala/collection/mutable/HashMap.scala b/src/library/scala/collection/mutable/HashMap.scala
index 1d605fe444..44461eaf74 100644
--- a/src/library/scala/collection/mutable/HashMap.scala
+++ b/src/library/scala/collection/mutable/HashMap.scala
@@ -14,6 +14,10 @@ package mutable
import generic._
+import scala.collection.parallel.mutable.ParHashMap
+
+
+
/** This class implements mutable maps using a hashtable.
*
* @since 1
@@ -36,15 +40,23 @@ import generic._
* @define willNotTerminateInf
*/
@serializable @SerialVersionUID(1L)
-class HashMap[A, B] extends Map[A, B]
- with MapLike[A, B, HashMap[A, B]]
- with HashTable[A] {
+class HashMap[A, B] private[collection] (contents: HashTable.Contents[A, DefaultEntry[A, B]])
+extends Map[A, B]
+ with MapLike[A, B, HashMap[A, B]]
+ with HashTable[A, DefaultEntry[A, B]]
+ with Parallelizable[ParHashMap[A, B]]
+{
+ initWithContents(contents)
+
+ type Entry = DefaultEntry[A, B]
override def empty: HashMap[A, B] = HashMap.empty[A, B]
override def clear() = clearTable()
override def size: Int = tableSize
- type Entry = DefaultEntry[A, B]
+ def this() = this(null)
+
+ def par = new ParHashMap[A, B](contents)
def get(key: A): Option[B] = {
val e = findEntry(key)
@@ -103,6 +115,12 @@ class HashMap[A, B] extends Map[A, B]
def next = iter.next.value
}
+ /** Toggles whether a size map is used to track hash map statistics.
+ */
+ def useSizeMap(t: Boolean) = if (t) {
+ if (!isSizeMapDefined) sizeMapInitAndRebuild
+ } else sizeMapDisable
+
private def writeObject(out: java.io.ObjectOutputStream) {
serializeTo(out, _.value)
}
@@ -110,6 +128,7 @@ class HashMap[A, B] extends Map[A, B]
private def readObject(in: java.io.ObjectInputStream) {
init[B](in, new Entry(_, _))
}
+
}
/** $factoryInfo
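
For orientation, a minimal usage sketch of the two methods this file gains: `par`, which wraps the existing HashTable.Contents in a ParHashMap without copying, and `useSizeMap`, which toggles the size map used for parallel iteration. The values below are hypothetical and assume the parallel-collections API as it stands in this commit.

    import scala.collection.mutable.HashMap

    val m = HashMap(1 -> "one", 2 -> "two", 3 -> "three")
    m.useSizeMap(true)   // build the size map eagerly; `par` would otherwise trigger it on conversion
    val pm = m.par       // a ParHashMap sharing the same HashTable.Contents, no copying
    println(pm.size)     // 3
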
diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala
index f4a3d8dfa5..5293e40c06 100644
--- a/src/library/scala/collection/mutable/HashTable.scala
+++ b/src/library/scala/collection/mutable/HashTable.scala
@@ -31,25 +31,10 @@ package mutable
*
* @tparam A type of the elements contained in this hash table.
*/
-trait HashTable[A] {
+trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] {
import HashTable._
- protected type Entry >: Null <: HashEntry[A, Entry]
-
- /** The load factor for the hash table (in 0.001 step).
- */
- protected def loadFactor: Int = 750 // corresponds to 75%
- protected final val loadFactorDenum = 1000;
-
- /** The initial size of the hash table.
- */
- protected def initialSize: Int = 16
-
- /** The initial threshold
- */
- protected def initialThreshold: Int = newThreshold(initialCapacity)
-
- @transient private[collection] var _loadFactor = loadFactor
+ @transient protected var _loadFactor = defaultLoadFactor
/** The actual hash table.
*/
@@ -61,9 +46,7 @@ trait HashTable[A] {
/** The next size value at which to resize (capacity * load factor).
*/
- @transient protected var threshold: Int = initialThreshold
-
- private def initialCapacity = capacity(initialSize)
+ @transient protected var threshold: Int = initialThreshold(_loadFactor)
/**
* Initializes the collection from the input stream. `f` will be called for each key/value pair
@@ -79,12 +62,12 @@ trait HashTable[A] {
val size = in.readInt
assert(size >= 0)
- //val smDefined = in.readBoolean
+ val smDefined = in.readBoolean
- table = new Array(capacity(size * loadFactorDenum / _loadFactor))
- threshold = newThreshold(table.size)
+ table = new Array(capacity(sizeForThreshold(_loadFactor, size)))
+ threshold = newThreshold(_loadFactor, table.size)
- //if (smDefined) sizeMapInit(table.size)
+ if (smDefined) sizeMapInit(table.size)
var index = 0
while (index < size) {
@@ -103,17 +86,15 @@ trait HashTable[A] {
*/
private[collection] def serializeTo[B](out: java.io.ObjectOutputStream, value: Entry => B) {
out.defaultWriteObject
- out.writeInt(loadFactor)
+ out.writeInt(_loadFactor)
out.writeInt(tableSize)
- //out.writeBoolean(isSizeMapDefined)
+ out.writeBoolean(isSizeMapDefined)
foreachEntry { entry =>
out.writeObject(entry.key)
out.writeObject(value(entry))
}
}
- private def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize)
-
/** Find entry with given key in table, null if not found.
*/
protected def findEntry(key: A): Entry = {
@@ -131,7 +112,7 @@ trait HashTable[A] {
e.next = table(h).asInstanceOf[Entry]
table(h) = e
tableSize = tableSize + 1
- sizeMapAdd(h)
+ nnSizeMapAdd(h)
if (tableSize > threshold)
resize(2 * table.length)
}
@@ -145,7 +126,7 @@ trait HashTable[A] {
if (elemEquals(e.key, key)) {
table(h) = e.next
tableSize = tableSize - 1
- sizeMapRemove(h)
+ nnSizeMapRemove(h)
return e
} else {
var e1 = e.next
@@ -156,7 +137,7 @@ trait HashTable[A] {
if (e1 != null) {
e.next = e1.next
tableSize = tableSize - 1
- sizeMapRemove(h)
+ nnSizeMapRemove(h)
return e1
}
}
@@ -211,16 +192,13 @@ trait HashTable[A] {
var i = table.length - 1
while (i >= 0) { table(i) = null; i = i - 1 }
tableSize = 0
- sizeMapReset(0)
+ nnSizeMapReset(0)
}
- private def newThreshold(size: Int) =
- ((size.toLong * _loadFactor)/loadFactorDenum).toInt
-
private def resize(newSize: Int) {
val oldTable = table
table = new Array(newSize)
- sizeMapReset(table.length)
+ nnSizeMapReset(table.length)
var i = oldTable.length - 1
while (i >= 0) {
var e = oldTable(i)
@@ -230,15 +208,18 @@ trait HashTable[A] {
e.next = table(h).asInstanceOf[Entry]
table(h) = e
e = e1
- sizeMapAdd(h)
+ nnSizeMapAdd(h)
}
i = i - 1
}
- threshold = newThreshold(newSize)
+ threshold = newThreshold(_loadFactor, newSize)
}
- protected def sizemap: Array[Int] = null
- private final def sizeMapBucketSize = 1 << 5
+ @transient protected var sizemap: Array[Int] = null
+ protected final def sizeMapBucketBitSize = 5
+ // so that:
+ protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize
+ protected final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize
/*
* The following three sizeMap* functions (Add, Remove, Reset)
@@ -247,27 +228,69 @@ trait HashTable[A] {
* The size map logically divides the hash table into `sizeMapBucketSize` element buckets
* by keeping an integer entry for each such bucket. Each integer entry simply denotes
* the number of elements in the corresponding bucket.
+ * Best understood through an example, see:
+ * table = [/, 1, /, 6, 90, /, -3, 5] (8 entries)
+ * sizemap = [ 2 | 3 ] (2 entries)
+ * where sizeMapBucketSize == 4.
*
* By default the size map is not initialized, so these methods don't do anything, thus,
* their impact on hash table performance is negligible. However, if the hash table
* is converted into a parallel hash table, the size map is initialized, as it will be needed
* there.
*/
- protected def sizeMapAdd(h: Int) = if (sizemap ne null) {
- // TODO
+ protected def nnSizeMapAdd(h: Int) = if (sizemap ne null) {
+ sizemap(h >> sizeMapBucketBitSize) += 1
}
- protected def sizeMapRemove(h: Int) = if (sizemap ne null) {
- // TODO
+ protected def nnSizeMapRemove(h: Int) = if (sizemap ne null) {
+ sizemap(h >> sizeMapBucketBitSize) -= 1
}
- protected def sizeMapReset(nTableLength: Int) = if (sizemap ne null) {
- // TODO
+ protected def nnSizeMapReset(tableLength: Int) = if (sizemap ne null) {
+ val nsize = calcSizeMapSize(tableLength)
+ if (sizemap.length != nsize) sizemap = new Array[Int](nsize)
+ else java.util.Arrays.fill(sizemap, 0)
}
- // protected def sizeMapInit(nTableLength: Int) = sizemap = new Array[Int](nTableLength / sizeMapBucketSize)
+ protected def calcSizeMapSize(tableLength: Int) = (tableLength >> sizeMapBucketBitSize) + 1
- // protected def sizeMapDisable = sizemap = null
+ // discards the previous sizemap and only allocates a new one
+ protected def sizeMapInit(tableLength: Int) {
+ sizemap = new Array[Int](calcSizeMapSize(tableLength))
+ }
+
+ // discards the previous sizemap and populates the new one
+ protected def sizeMapInitAndRebuild {
+ sizeMapInit(table.length)
+
+ // go through the buckets, count elements
+ var tableidx = 0
+ var bucketidx = 0
+ val tbl = table
+ var tableuntil = 0
+ if (tbl.length < sizeMapBucketSize) tableuntil = tbl.length else tableuntil = sizeMapBucketSize
+ val totalbuckets = totalSizeMapBuckets
+ while (bucketidx < totalbuckets) {
+ var currbucketsize = 0
+ while (tableidx < tableuntil) {
+ var e = tbl(tableidx)
+ while (e ne null) {
+ currbucketsize += 1
+ e = e.next
+ }
+ tableidx += 1
+ }
+ sizemap(bucketidx) = currbucketsize
+ tableuntil += sizeMapBucketSize
+ bucketidx += 1
+ }
+ }
+
+ def printSizeMap {
+ println(sizemap.toList)
+ }
+
+ protected def sizeMapDisable = sizemap = null
protected def isSizeMapDefined = sizemap ne null
@@ -289,9 +312,45 @@ trait HashTable[A] {
val ones = table.length - 1
(improve(hcode) >> (32 - java.lang.Integer.bitCount(ones))) & ones
}
+
+ protected def initWithContents(c: HashTable.Contents[A, Entry]) = if (c != null) {
+ _loadFactor = c.loadFactor
+ table = c.table
+ tableSize = c.tableSize
+ threshold = c.threshold
+ sizemap = c.sizemap
+ }
+
+ private[collection] def hashTableContents = new HashTable.Contents(
+ _loadFactor,
+ table,
+ tableSize,
+ threshold,
+ sizemap
+ )
}
private[collection] object HashTable {
+ /** The load factor for the hash table (in 0.001 step).
+ */
+ private[collection] final def defaultLoadFactor: Int = 750 // corresponds to 75%
+ private[collection] final def loadFactorDenum = 1000;
+
+ /** The initial size of the hash table.
+ */
+ private[collection] final def initialSize: Int = 16
+
+ /** The initial threshold.
+ */
+ private[collection] final def initialThreshold(_loadFactor: Int): Int = newThreshold(_loadFactor, initialCapacity)
+
+ private[collection] final def initialCapacity = capacity(initialSize)
+
+ private[collection] final def newThreshold(_loadFactor: Int, size: Int) = ((size.toLong * _loadFactor) / loadFactorDenum).toInt
+
+ private[collection] final def sizeForThreshold(_loadFactor: Int, thr: Int) = thr * loadFactorDenum / _loadFactor
+
+ private[collection] final def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize)
/**
* Returns a power of two >= `target`.
@@ -306,4 +365,13 @@ private[collection] object HashTable {
c |= c >>> 16;
c + 1;
}
+
+ class Contents[A, Entry >: Null <: HashEntry[A, Entry]](
+ val loadFactor: Int,
+ val table: Array[HashEntry[A, Entry]],
+ val tableSize: Int,
+ val threshold: Int,
+ val sizemap: Array[Int]
+ )
+
}
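
To make the new size-map bookkeeping concrete, here is a small self-contained sketch (illustrative only, not part of the diff) of how a table index maps to a size-map slot and how nnSizeMapAdd/nnSizeMapRemove update the counters, using the same sizeMapBucketBitSize = 5:

    // Each size-map slot covers 1 << 5 = 32 consecutive table indices.
    val sizeMapBucketBitSize = 5
    val sizeMapBucketSize = 1 << sizeMapBucketBitSize

    val tableLength = 128
    val sizemap = new Array[Int]((tableLength >> sizeMapBucketBitSize) + 1)   // mirrors calcSizeMapSize

    def add(h: Int)    { sizemap(h >> sizeMapBucketBitSize) += 1 }   // mirrors nnSizeMapAdd
    def remove(h: Int) { sizemap(h >> sizeMapBucketBitSize) -= 1 }   // mirrors nnSizeMapRemove

    add(3); add(40); add(41); remove(3)
    println(sizemap.toList)   // List(0, 2, 0, 0, 0): indices 40 and 41 both fall into slot 1
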
diff --git a/src/library/scala/collection/mutable/LinkedHashMap.scala b/src/library/scala/collection/mutable/LinkedHashMap.scala
index be6442561c..203adcc966 100644
--- a/src/library/scala/collection/mutable/LinkedHashMap.scala
+++ b/src/library/scala/collection/mutable/LinkedHashMap.scala
@@ -47,7 +47,7 @@ object LinkedHashMap extends MutableMapFactory[LinkedHashMap] {
@serializable @SerialVersionUID(1L)
class LinkedHashMap[A, B] extends Map[A, B]
with MapLike[A, B, LinkedHashMap[A, B]]
- with HashTable[A] {
+ with HashTable[A, LinkedEntry[A, B]] {
override def empty = LinkedHashMap.empty[A, B]
override def size = tableSize
diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala
index a37f642d42..f47f92457f 100644
--- a/src/library/scala/collection/parallel/Combiner.scala
+++ b/src/library/scala/collection/parallel/Combiner.scala
@@ -19,7 +19,7 @@ import scala.collection.generic.Sizing
* @author prokopec
*/
trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport {
- self: EnvironmentPassingCombiner[Elem, To] =>
+self: EnvironmentPassingCombiner[Elem, To] =>
type EPC = EnvironmentPassingCombiner[Elem, To]
@@ -35,7 +35,13 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w
* if they are to be used again.
*
* Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is,
- * they are the same objects in memories, always does nothing and returns the first combiner.
+ * they are the same objects in memory:
+ *
+ * {{{
+ * c1.combine(c2)
+ * }}}
+ *
+ * always does nothing and returns `c1`.
*
* @tparam N the type of elements contained by the `other` builder
* @tparam NewTo the type of collection produced by the `other` builder
@@ -50,7 +56,7 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w
trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] {
abstract override def result = {
val res = super.result
-// res.environment = environment
+ if (res.isInstanceOf[TaskSupport]) res.asInstanceOf[TaskSupport].environment = environment
res
}
}
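
The contract documented in the combine comment above can be restated as a short hedged sketch; `c1` and `c2` below are hypothetical combiners of matching type, and nothing here is introduced by the commit itself:

    import scala.collection.parallel.Combiner

    // After `combine`, only the returned combiner may be used; both operands are
    // considered invalidated, unless they are the very same object.
    def merge[E, To](c1: Combiner[E, To], c2: Combiner[E, To]): Combiner[E, To] =
      if (c1 eq c2) c1       // combining a combiner with itself does nothing and returns `c1`
      else c1 combine c2     // chains the contents; do not reuse `c1` or `c2` afterwards
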
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 4d95043c3a..881ab80038 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -141,7 +141,7 @@ self =>
me: SignalContextPassingIterator[ParIterator] =>
var signalDelegate: Signalling = IdleSignalling
def repr = self.repr
- def split: Seq[ParIterator]
+ def split: Seq[ParIterableIterator[T]]
}
/** A stackable modification that ensures signal contexts get passed along the iterators.
@@ -489,7 +489,9 @@ self =>
override def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
- else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result })
+ else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult {
+ _.result
+ })
}
private def take_sequential(n: Int) = {
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 306ec68548..37b52b7a40 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -11,6 +11,7 @@ import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.Unrolled
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParMapTemplate
@@ -105,21 +106,13 @@ object ParHashMap extends ParMapFactory[ParHashMap] {
}
-private[immutable] trait HashMapCombiner[K, V]
-extends Combiner[(K, V), ParHashMap[K, V]] {
+private[immutable] abstract class HashMapCombiner[K, V]
+extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) {
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
- var heads = new Array[Unrolled[(K, V)]](rootsize)
- var lasts = new Array[Unrolled[(K, V)]](rootsize)
- var size: Int = 0
-
- def clear = {
- heads = new Array[Unrolled[(K, V)]](rootsize)
- lasts = new Array[Unrolled[(K, V)]](rootsize)
- }
def +=(elem: (K, V)) = {
- size += 1
+ sz += 1
val hc = elem._1.##
val pos = hc & 0x1f
if (lasts(pos) eq null) {
@@ -132,26 +125,6 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
this
}
- def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- // ParHashMap.totalcombines.incrementAndGet
- if (other.isInstanceOf[HashMapCombiner[_, _]]) {
- val that = other.asInstanceOf[HashMapCombiner[K, V]]
- var i = 0
- while (i < rootsize) {
- if (lasts(i) eq null) {
- heads(i) = that.heads(i)
- lasts(i) = that.lasts(i)
- } else {
- lasts(i).next = that.heads(i)
- if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
- }
- i += 1
- }
- size = size + that.size
- this
- } else error("Unexpected combiner type.")
- } else this
-
def result = {
val buckets = heads.filter(_ != null)
val root = new Array[HashMap[K, V]](buckets.length)
@@ -216,8 +189,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
-object HashMapCombiner {
- def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {}
+private[parallel] object HashMapCombiner {
+ def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]]
private[immutable] val rootbits = 5
private[immutable] val rootsize = 1 << 5
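
As a small illustration (not part of the diff), the bucketing step in `+=` above groups elements by the lowest five bits of the key's hash code, which is the root position they will occupy in the resulting hash trie, so each of the 32 buckets can later be turned into a root subtrie independently:

    // rootsize = 1 << 5 buckets, one per possible root position of the hash trie
    def rootPos(key: Any): Int = key.## & 0x1f

    println(rootPos("parallel"))   // some index in [0, 31]; keys with equal rootPos share a bucket
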
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 0ef2681567..c9554ae1eb 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -11,6 +11,7 @@ import scala.collection.parallel.ParSetLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.Unrolled
import scala.collection.generic.ParSetFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParTemplate
@@ -101,21 +102,13 @@ object ParHashSet extends ParSetFactory[ParHashSet] {
}
-private[immutable] trait HashSetCombiner[T]
-extends Combiner[T, ParHashSet[T]] {
+private[immutable] abstract class HashSetCombiner[T]
+extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) {
self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
import HashSetCombiner._
- var heads = new Array[Unrolled[Any]](rootsize)
- var lasts = new Array[Unrolled[Any]](rootsize)
- var size: Int = 0
-
- def clear = {
- heads = new Array[Unrolled[Any]](rootsize)
- lasts = new Array[Unrolled[Any]](rootsize)
- }
def +=(elem: T) = {
- size += 1
+ sz += 1
val hc = elem.##
val pos = hc & 0x1f
if (lasts(pos) eq null) {
@@ -128,25 +121,6 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
this
}
- def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- if (other.isInstanceOf[HashSetCombiner[_]]) {
- val that = other.asInstanceOf[HashSetCombiner[T]]
- var i = 0
- while (i < rootsize) {
- if (lasts(i) eq null) {
- heads(i) = that.heads(i)
- lasts(i) = that.lasts(i)
- } else {
- lasts(i).next = that.heads(i)
- if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
- }
- i += 1
- }
- size = size + that.size
- this
- } else error("Unexpected combiner type.")
- } else this
-
def result = {
val buckets = heads.filter(_ != null)
val root = new Array[HashSet[T]](buckets.length)
@@ -171,7 +145,8 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
/* tasks */
- class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int)
+ extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
@@ -274,6 +249,3 @@ object HashSetCombiner {
-
-
-
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index c54875ecd3..9c6bbae8dd 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -15,25 +15,9 @@ package object immutable {
/* package level methods */
def repetition[T](elem: T, len: Int) = new Repetition(elem, len)
- /* properties */
- private[immutable] val unrolledsize = 16
+ /* constants */
/* classes */
- private[immutable] class Unrolled[T: ClassManifest] {
- var size = 0
- var array = new Array[T](unrolledsize)
- var next: Unrolled[T] = null
- // adds and returns itself or the new unrolled if full
- def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
- array(size) = elem
- size += 1
- this
- } else {
- next = new Unrolled[T]
- next.add(elem)
- }
- override def toString = "Unrolled(" + array.mkString(", ") + ")"
- }
/** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
*
@@ -42,8 +26,7 @@ package object immutable {
* @param length the length of the collection
*/
private[parallel] class Repetition[T](elem: T, val length: Int) extends ParSeq[T] {
- self =>
-
+ self =>
def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException
def seq = throw new UnsupportedOperationException
def update(idx: Int, elem: T) = throw new UnsupportedOperationException
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 057faa66e1..fb4119bddc 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -5,53 +5,198 @@ package mutable
import collection.generic._
+import collection.mutable.DefaultEntry
+import collection.mutable.HashEntry
+import collection.mutable.HashTable
-
-
-class ParHashMap[K, V]
+class ParHashMap[K, V] private[collection] (contents: HashTable.Contents[K, DefaultEntry[K, V]])
extends ParMap[K, V]
with GenericParMapTemplate[K, V, ParHashMap]
with ParMapLike[K, V, ParHashMap[K, V], collection.mutable.HashMap[K, V]]
+ with ParHashTable[K, DefaultEntry[K, V]]
{
self =>
+ initWithContents(contents)
+
+ type Entry = collection.mutable.DefaultEntry[K, V]
+
+ def this() = this(null)
override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap
override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
- def parallelIterator = null // TODO
+ def seq = new collection.mutable.HashMap[K, V](hashTableContents)
- def seq = null // TODO
+ def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI
- def get(k: K): Option[V] = null // TODO
+ override def size = tableSize
- def +=(kv: (K, V)) = null // TODO
+ def get(key: K): Option[V] = {
+ val e = findEntry(key)
+ if (e == null) None
+ else Some(e.value)
+ }
- def -=(k: K) = null // TODO
+ override def put(key: K, value: V): Option[V] = {
+ val e = findEntry(key)
+ if (e == null) { addEntry(new Entry(key, value)); None }
+ else { val v = e.value; e.value = value; Some(v) }
+ }
- override def size = 0 // TODO
+ override def update(key: K, value: V): Unit = put(key, value)
-}
+ override def remove(key: K): Option[V] = {
+ val e = removeEntry(key)
+ if (e ne null) Some(e.value)
+ else None
+ }
+ def += (kv: (K, V)): this.type = {
+ val e = findEntry(kv._1)
+ if (e == null) addEntry(new Entry(kv._1, kv._2))
+ else e.value = kv._2
+ this
+ }
+ def -=(key: K): this.type = { removeEntry(key); this }
+ type SCPI = SignalContextPassingIterator[ParHashMapIterator]
-object ParHashMap extends ParMapFactory[ParHashMap] {
+ class ParHashMapIterator(start: Int, untilIdx: Int, totalSize: Int, e: DefaultEntry[K, V])
+ extends EntryIterator[(K, V), ParHashMapIterator](start, untilIdx, totalSize, e) with ParIterator {
+ me: SCPI =>
+ def entry2item(entry: DefaultEntry[K, V]) = (entry.key, entry.value);
+ def newIterator(idxFrom: Int, idxUntil: Int, totalSz: Int, es: DefaultEntry[K, V]) =
+ new ParHashMapIterator(idxFrom, idxUntil, totalSz, es) with SCPI
+ }
- def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
+}
- def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = null // TODO
- implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = null // TODO
+object ParHashMap extends ParMapFactory[ParHashMap] {
+ def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = ParHashMapCombiner.apply[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = new CanCombineFromMap[K, V]
}
+private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFactor: Int)
+extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) {
+self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
+ private var mask = ParHashMapCombiner.discriminantmask
+
+ def +=(elem: (K, V)) = {
+ sz += 1
+ val hc = elem._1.##
+ val pos = hc & mask
+ if (lasts(pos) eq null) {
+ // initialize bucket
+ heads(pos) = new Unrolled[DefaultEntry[K, V]]
+ lasts(pos) = heads(pos)
+ }
+ // add to bucket
+ lasts(pos) = lasts(pos).add(new DefaultEntry(elem._1, elem._2))
+ this
+ }
+
+ def result: ParHashMap[K, V] = {
+ // construct table
+ val table = new AddingHashTable(size, tableLoadFactor)
+
+ executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks))
+
+ val c = table.hashTableContents
+ new ParHashMap(c)
+ }
+
+ /** A hash table which will never resize itself. Knowing the number of elements in advance,
+ * it allocates the table of the required size when created.
+ *
+ * Entries are added using the `insertEntry` method. This method checks whether the element
+ * exists and updates the size map.
+ */
+ class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] {
+ import HashTable._
+ _loadFactor = lf
+ table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems)))
+ tableSize = 0
+ threshold = newThreshold(_loadFactor, table.length)
+ sizeMapInit(table.length)
+ def insertEntry(e: DefaultEntry[K, V]) {
+ var h = index(elemHashCode(e.key))
+ var olde = table(h).asInstanceOf[DefaultEntry[K, V]]
+
+ // check if key already exists
+ var ce = olde
+ while (ce ne null) {
+ if (ce.key == e.key) {
+ h = -1
+ ce = null
+ } else ce = ce.next
+ }
+
+ // if key does not already exist
+ if (h != -1) {
+ e.next = olde
+ table(h) = e
+ tableSize = tableSize + 1
+ nnSizeMapAdd(h)
+ }
+ }
+ }
+
+ /* tasks */
+
+ class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int)
+ extends super.Task[Unit, FillBlocks] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ fillBlock(buckets(i))
+ i += 1
+ }
+ }
+ private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) {
+ var unrolled = elems
+ var i = 0
+ val t = table
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val elem = chunkarr(i)
+ t.insertEntry(elem)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel)
+ }
+}
+private[mutable] object ParHashMapCombiner {
+ private[mutable] val discriminantbits = 5
+ private[mutable] val numblocks = 1 << discriminantbits
+ private[mutable] val discriminantmask = ((1 << discriminantbits) - 1) << (32 - discriminantbits)
+
+ def apply[K, V] = new ParHashMapCombiner[K, V](HashTable.defaultLoadFactor) with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]]
+}
+
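
For intuition, a simplified sketch (illustrative only; the combiner itself keeps the unshifted mask form shown above) of the two-phase construction: elements are first binned by the top `discriminantbits` bits of their hash code, and `FillBlocks` then writes the buckets in parallel into an `AddingHashTable` that is allocated at its final capacity and therefore never resizes.

    // Phase 1: pick one of numblocks = 32 buckets from the top 5 bits of the hash code.
    val discriminantbits = 5
    val numblocks = 1 << discriminantbits                              // 32
    def blockFor(key: Any): Int = key.## >>> (32 - discriminantbits)   // an index in [0, 32)

    // Phase 2 (FillBlocks): since the total size is known up front, the table is sized once
    // and parallel insertion via insertEntry never triggers a resize.
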
diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
index 9e356b7fb4..2617685a3d 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
@@ -5,7 +5,7 @@ package parallel.mutable
import collection.mutable.HashEntry
-
+import collection.parallel.ParIterableIterator
@@ -13,9 +13,134 @@ import collection.mutable.HashEntry
* enriching the data structure by fulfilling certain requirements
* for their parallel construction and iteration.
*/
-trait ParHashTable[K] {
+trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.mutable.HashTable[K, Entry] {
+
+ // always initialize size map
+ if (!isSizeMapDefined) sizeMapInitAndRebuild
+
+ /** A parallel iterator returning all the entries.
+ */
+ abstract class EntryIterator[T, +IterRepr <: ParIterableIterator[T]]
+ (private var idx: Int, private val until: Int, private val totalsize: Int, private var es: Entry)
+ extends ParIterableIterator[T] {
+ private val itertable = table
+ private var traversed = 0
+ scan()
+
+ def entry2item(e: Entry): T
+ def newIterator(idxFrom: Int, idxUntil: Int, totalSize: Int, es: Entry): IterRepr
+
+ def hasNext = es != null
+
+ def next = {
+ val res = es
+ es = es.next
+ scan()
+ traversed += 1
+ entry2item(res)
+ }
+
+ def scan() {
+ while (es == null && idx < until) {
+ es = itertable(idx).asInstanceOf[Entry]
+ idx = idx + 1
+ }
+ }
+
+ def remaining = totalsize - traversed
+
+ def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
+ if ((until - idx) > 1) {
+ // there is at least one more slot for the next iterator
+ // divide the rest of the table
+ val divsz = (until - idx) / 2
+
+ // second iterator params
+ val sidx = idx + divsz
+ val suntil = until
+ val ses = itertable(sidx).asInstanceOf[Entry]
+ val stotal = calcNumElems(sidx, suntil)
+
+ // first iterator params
+ val fidx = idx
+ val funtil = idx + divsz
+ val fes = es
+ val ftotal = totalsize - stotal
+
+ Seq(
+ newIterator(fidx, funtil, ftotal, fes),
+ newIterator(sidx, suntil, stotal, ses)
+ )
+ } else {
+ // otherwise, this is the last entry in the table - all what remains is the chain
+ // so split the rest of the chain
+ val arr = convertToArrayBuffer(es)
+ val arrpit = new collection.parallel.BufferIterator[T](arr, 0, arr.length, signalDelegate)
+ arrpit.split
+ }
+ } else Seq(this.asInstanceOf[IterRepr])
+
+ private def convertToArrayBuffer(chainhead: Entry): mutable.ArrayBuffer[T] = {
+ var buff = mutable.ArrayBuffer[Entry]()
+ var curr = chainhead
+ while (curr != null) {
+ buff += curr
+ curr = curr.next
+ }
+ buff map { e => entry2item(e) }
+ }
+
+ private def calcNumElems(from: Int, until: Int) = {
+ // find the first bucket
+ val fbindex = from / sizeMapBucketSize
+
+ // find the last bucket
+ val lbindex = from / sizeMapBucketSize
+
+ if (fbindex == lbindex) {
+ // if first and last are the same, just count between `from` and `until`
+ // return this count
+ countElems(from, until)
+ } else {
+ // otherwise count in first, then count in last
+ val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min itertable.length
+ val fbcount = countElems(from, fbuntil)
+ val lbstart = lbindex * sizeMapBucketSize
+ val lbcount = countElems(lbstart, until)
+
+ // and finally count the elements in all the buckets between first and last using a sizemap
+ val inbetween = countBucketSizes(fbindex + 1, lbindex)
+
+ // return the sum
+ fbcount + inbetween + lbcount
+ }
+ }
+
+ private def countElems(from: Int, until: Int) = {
+ var c = 0
+ var idx = from
+ var es: Entry = null
+ while (idx < until) {
+ es = itertable(idx).asInstanceOf[Entry]
+ while (es ne null) {
+ c += 1
+ es = es.next
+ }
+ idx += 1
+ }
+ c
+ }
- protected type Entry >: Null <: HashEntry[K, Entry]
+ private def countBucketSizes(fromBucket: Int, untilBucket: Int) = {
+ var c = 0
+ var idx = fromBucket
+ while (idx < untilBucket) {
+ c += sizemap(idx)
+ idx += 1
+ }
+ c
+ }
+ }
}
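
The counting logic in calcNumElems is where the size map pays off: when a split covers whole size-map buckets, only the partial buckets at either end need an element-by-element scan, and everything in between is summed directly from `sizemap`. A standalone sketch of the same arithmetic (all names hypothetical; `chainLens(i)` stands in for the length of the entry chain at table slot `i`):

    val bucketBits = 5
    val bucketSize = 1 << bucketBits

    // scan table slots [from, until) one by one
    def countElems(chainLens: Array[Int], from: Int, until: Int): Int =
      Range(from, until).map(i => chainLens(i)).sum

    // use the size map for every bucket that is fully covered by [from, until)
    def countWithSizeMap(chainLens: Array[Int], sizemap: Array[Int], from: Int, until: Int): Int = {
      val fb = from >> bucketBits    // first, possibly partial, bucket
      val lb = until >> bucketBits   // last, possibly partial, bucket
      if (fb == lb) countElems(chainLens, from, until)
      else {
        val first = countElems(chainLens, from, (fb + 1) * bucketSize)
        val last  = countElems(chainLens, lb * bucketSize, until)
        val mid   = Range(fb + 1, lb).map(b => sizemap(b)).sum
        first + mid + last
      }
    }
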
diff --git a/src/library/scala/collection/parallel/mutable/ParMap.scala b/src/library/scala/collection/parallel/mutable/ParMap.scala
index 63342fa1bc..cb6014289d 100644
--- a/src/library/scala/collection/parallel/mutable/ParMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParMap.scala
@@ -18,16 +18,16 @@ extends collection.mutable.Map[K, V]
override def mapCompanion: GenericParMapCompanion[ParMap] = ParMap
- override def empty: ParMap[K, V] = null // TODO
+ override def empty: ParMap[K, V] = new ParHashMap[K, V]
}
object ParMap extends ParMapFactory[ParMap] {
- def empty[K, V]: ParMap[K, V] = null // TODO
+ def empty[K, V]: ParMap[K, V] = new ParHashMap[K, V]
- def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = null // TODO
+ def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = ParHashMapCombiner.apply[K, V]
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V]
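
With the stubs above filled in, the mutable ParMap factory now produces the new ParHashMap by default. A minimal usage sketch (hypothetical; assumes the API as it stands in this work-in-progress commit):

    import scala.collection.parallel.mutable.ParMap

    val pm = ParMap.empty[Int, String]   // backed by the new ParHashMap
    pm += (1 -> "one")
    pm += (2 -> "two")
    println(pm.size)                     // 2
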
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 76677a1148..a30d564039 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -7,13 +7,21 @@ import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
+import annotation.unchecked.uncheckedVariance
+
/** Package object for parallel collections.
*/
package object parallel {
- val MIN_FOR_COPY = -1 // TODO: set to 5000
+
+ /* constants */
+ val MIN_FOR_COPY = -1
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
+ val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
+ private[parallel] val unrolledsize = 16
+
+ /* functions */
/** Computes threshold from the size of the collection and the parallelism level.
*/
@@ -23,11 +31,136 @@ package object parallel {
else sz
}
- val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
+ private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
+
+ private[parallel] def unsupported = throw new UnsupportedOperationException
+
+ /* classes */
- def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
+ /** Unrolled list node.
+ */
+ private[parallel] class Unrolled[T: ClassManifest] {
+ var size = 0
+ var array = new Array[T](unrolledsize)
+ var next: Unrolled[T] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[T]
+ next.add(elem)
+ }
+ def foreach[U](f: T => U) {
+ var unrolled = this
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val elem = chunkarr(i)
+ f(elem)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+ }
+ override def toString = array.mkString("Unrolled(", ", ", ")")
+ }
+
+ /** A helper iterator for iterating very small array buffers.
+ * Automatically forwards the signal delegate when splitting.
+ */
+ private[parallel] class BufferIterator[T]
+ (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
+ extends ParIterableIterator[T] {
+ def hasNext = index < until
+ def next = {
+ val r = buffer(index)
+ index += 1
+ r
+ }
+ def remaining = until - index
+ def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
+ val divsz = (until - index) / 2
+ Seq(
+ new BufferIterator(buffer, index, index + divsz, signalDelegate),
+ new BufferIterator(buffer, index + divsz, until, signalDelegate)
+ )
+ } else Seq(this)
+ }
+
+ /** A helper combiner which contains an array of buckets. Buckets themselves
+ * are unrolled linked lists. Some parallel collections are constructed by
+ * sorting their result set according to some criteria.
+ *
+ * A reference `heads` to bucket heads is maintained, as well as a reference
+ * `lasts` to the last unrolled list node. Size is kept in `sz` and maintained
+ * whenever 2 bucket combiners are combined.
+ *
+ * Clients decide how to maintain these by implementing `+=` and `result`.
+ * Populating and using the buckets is up to the client.
+ * Note that in general the type of the elements contained in the buckets `Buck`
+ * doesn't have to correspond to combiner element type `Elem`.
+ *
+ * This class simply gives an efficient `combine` for free - it chains
+ * the buckets together. Since the `combine` contract states that the receiver (`this`)
+ * becomes invalidated, `combine` reuses the receiver and returns it.
+ *
+ * Methods `beforeCombine` and `afterCombine` are called before and after
+ * combining the buckets, respectively, given that the argument to `combine`
+ * is not `this` (as required by the `combine` contract).
+ * They can be overriden in subclasses to provide custom behaviour by modifying
+ * the receiver (which will be the return value).
+ */
+ private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
+ (private val bucketnumber: Int)
+ extends Combiner[Elem, To] {
+ self: EnvironmentPassingCombiner[Elem, To] =>
+ protected var heads: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var lasts: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var sz: Int = 0
+
+ def size = sz
+
+ def clear = {
+ heads = new Array[Unrolled[Buck]](bucketnumber)
+ lasts = new Array[Unrolled[Buck]](bucketnumber)
+ sz = 0
+ }
+
+ def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+ def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+
+ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ if (other.isInstanceOf[BucketCombiner[_, _, _, _]]) {
+ beforeCombine(other)
+
+ val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
+ var i = 0
+ while (i < bucketnumber) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ sz = sz + that.size
+
+ afterCombine(other)
+
+ this
+ } else error("Unexpected combiner type.")
+ } else this
+
+ }
- def unsupported = throw new UnsupportedOperationException
+ /* implicit conversions */
/** An implicit conversion providing arrays with a `par` method, which
* returns a parallel array.