author    Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-10-28 12:10:00 +0000
committer Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-10-28 12:10:00 +0000
commit    8d311558f3774cd628a53fc675da93b550d06090 (patch)
tree      6811285bc37af76d356980adaa579fa1d181faaf /src/library
parent    962a348ab26f189a19dd74aeb3bbc8fd5d63061a (diff)
Debugging parallel hash tables.
No review.
Diffstat (limited to 'src/library')
-rw-r--r--  src/library/scala/collection/mutable/HashTable.scala            | 116
-rw-r--r--  src/library/scala/collection/package.scala                      |  13
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala     |   9
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala  |  92
-rw-r--r--  src/library/scala/collection/parallel/package.scala             |   2
5 files changed, 176 insertions(+), 56 deletions(-)
diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala
index 3d31c3860e..c233bd2719 100644
--- a/src/library/scala/collection/mutable/HashTable.scala
+++ b/src/library/scala/collection/mutable/HashTable.scala
@@ -31,7 +31,7 @@ package mutable
*
* @tparam A type of the elements contained in this hash table.
*/
-trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] {
+trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashUtils[A] {
import HashTable._
@transient protected var _loadFactor = defaultLoadFactor
@@ -218,10 +218,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] {
}
@transient protected var sizemap: Array[Int] = null
- protected final def sizeMapBucketBitSize = 5
- // so that:
- protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize
- protected final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize
+ private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize
/*
* The following three sizeMap* functions (Add, Remove, Reset)
@@ -298,39 +295,6 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] {
protected def elemEquals(key1: A, key2: A): Boolean = (key1 == key2)
- protected def elemHashCode(key: A) = if (key == null) 0 else key.##
-
- protected final def improve(hcode: Int) = {
- /* Murmur hash
- * m = 0x5bd1e995
- * r = 24
- * note: h = seed = 0 in mmix
- * mmix(h,k) = k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; */
- var k = hcode * 0x5bd1e995
- k ^= k >> 24
- k *= 0x5bd1e995
- k
-
- /* Jenkins hash
- * for range 0-10000, output has the msb set to zero */
- // var h = hcode + (hcode << 12)
- // h ^= (h >> 22)
- // h += (h << 4)
- // h ^= (h >> 9)
- // h += (h << 10)
- // h ^= (h >> 2)
- // h += (h << 7)
- // h ^= (h >> 12)
- // h
-
- /* OLD VERSION
- * since 2003 */
- // var h: Int = hcode + ~(hcode << 9)
- // h = h ^ (h >>> 14)
- // h = h + (h << 4)
- // h ^ (h >>> 10)
- }
-
// Note:
// we take the most significant bits of the hashcode, not the lower ones
// this is of crucial importance when populating the table in parallel
@@ -380,6 +344,68 @@ private[collection] object HashTable {
private[collection] final def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize)
+ trait HashUtils[KeyType] {
+ protected final def sizeMapBucketBitSize = 5
+ // so that:
+ protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize
+
+ protected def elemHashCode(key: KeyType) = if (key == null) 0 else key.##
+
+ protected final def improve(hcode: Int) = {
+ /* Murmur hash
+ * m = 0x5bd1e995
+ * r = 24
+ * note: h = seed = 0 in mmix
+ * mmix(h,k) = k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; */
+ // var k = hcode * 0x5bd1e995
+ // k ^= k >> 24
+ // k *= 0x5bd1e995
+ // k
+
+ /* Another fast multiplicative hash
+ * by Phil Bagwell
+ *
+ * Comment:
+ * Multiplication doesn't affect all the bits in the same way, so we want to
+ * multiply twice, "once from each side".
+ * It would be ideal to reverse all the bits after the first multiplication,
+ * however, this is more costly. We therefore restrict ourselves only to
+ * reversing the bytes before final multiplication. This yields a slightly
+ * worse entropy in the lower 8 bits, but that can be improved by adding:
+ *
+ * `i ^= i >> 6`
+ *
+ * For performance reasons, we avoid this improvement.
+ * */
+ var i = hcode * 0x9e3775cd
+ i = java.lang.Integer.reverseBytes(i)
+ i * 0x9e3775cd
+ // a slower alternative for byte reversal:
+ // i = (i << 16) | (i >> 16)
+ // i = ((i >> 8) & 0x00ff00ff) | ((i << 8) & 0xff00ff00)
+
+ /* Jenkins hash
+ * for range 0-10000, output has the msb set to zero */
+ // var h = hcode + (hcode << 12)
+ // h ^= (h >> 22)
+ // h += (h << 4)
+ // h ^= (h >> 9)
+ // h += (h << 10)
+ // h ^= (h >> 2)
+ // h += (h << 7)
+ // h ^= (h >> 12)
+ // h
+
+ /* OLD VERSION
+ * quick, but bad for sequence 0-10000 - little entropy in higher bits
+ * since 2003 */
+ // var h: Int = hcode + ~(hcode << 9)
+ // h = h ^ (h >>> 14)
+ // h = h + (h << 4)
+ // h ^ (h >>> 10)
+ }
+ }
+
/**
* Returns a power of two >= `target`.
*/
@@ -400,6 +426,18 @@ private[collection] object HashTable {
val tableSize: Int,
val threshold: Int,
val sizemap: Array[Int]
- )
+ ) {
+ import collection.DebugUtils._
+ private[collection] def debugInformation = buildString {
+ append =>
+ append("Hash table contents")
+ append("-------------------")
+ append("Table: [" + arrayString(table, 0, table.length) + "]")
+ append("Table size: " + tableSize)
+ append("Load factor: " + loadFactor)
+ append("Threshold: " + threshold)
+ append("Sizemap: [" + arrayString(sizemap, 0, sizemap.length) + "]")
+ }
+ }
}
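
A minimal, self-contained sketch of the byte-reversing multiplicative hash that this commit moves into HashTable.HashUtils. The improve body is copied from the diff; the index helper and the ImproveDemo object are illustrative only (they assume a power-of-two table length of at least 2 and are not the library's actual method):

object ImproveDemo {
  // Multiply, reverse the bytes, multiply again: the second multiplication
  // mixes "from the other side", so low input bits also reach the high
  // output bits that the table index is taken from.
  def improve(hcode: Int): Int = {
    var i = hcode * 0x9e3775cd
    i = java.lang.Integer.reverseBytes(i)
    i * 0x9e3775cd
  }

  // Take the top log2(tableLength) bits as the index, as the note in the
  // diff describes; this keeps parallel fills on contiguous, disjoint blocks.
  def index(hcode: Int, tableLength: Int): Int =
    improve(hcode) >>> (32 - java.lang.Integer.numberOfTrailingZeros(tableLength))

  def main(args: Array[String]): Unit = {
    for (k <- 0 to 5)
      println(k + " improves to 0x" + java.lang.Integer.toHexString(improve(k)) +
        ", index in a 1024-entry table: " + index(k, 1024))
  }
}

Even for the consecutive keys 0..5, the improved hash codes differ in their high bits, which is exactly what indexing by most significant bits requires.
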
diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala
index 19d65b73e2..f1eb50c5e0 100644
--- a/src/library/scala/collection/package.scala
+++ b/src/library/scala/collection/package.scala
@@ -76,4 +76,17 @@ package object collection {
new CanBuildFrom[From, T, To] { // TODO: could we just return b instead?
def apply(from: From) = b.apply() ; def apply() = b.apply()
}
+
+ private[collection] object DebugUtils {
+ /* debug utils */
+ def buildString(closure: (String => Unit) => Unit): String = {
+ var output = ""
+ def appendln(s: String) = output += s + "\n"
+ closure(appendln)
+ output
+ }
+
+ def arrayString[T](array: Array[T], from: Int, until: Int) = array.slice(from, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ")
+ }
+
}
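
A self-contained copy of the two DebugUtils helpers for experimentation (the originals are private[collection]); this sketch uses a StringBuilder instead of repeated string concatenation, but the behaviour is otherwise the same. DebugUtilsDemo is a made-up name:

object DebugUtilsDemo {
  // Collects every string passed to the callback, one per line.
  def buildString(closure: (String => Unit) => Unit): String = {
    val sb = new StringBuilder
    closure(s => sb.append(s).append('\n'))
    sb.toString
  }

  // Renders a slice of an array, printing "n/a" for null slots.
  def arrayString[T](array: Array[T], from: Int, until: Int): String =
    array.slice(from, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ")

  def main(args: Array[String]): Unit = {
    val table: Array[AnyRef] = Array("a", null, "c", null)
    print(buildString { append =>
      append("Hash table contents")
      append("-------------------")
      append("Table: [" + arrayString(table, 0, table.length) + "]")
    })
  }
}

The closure-of-append shape is what lets HashTable's debugInformation above read as a flat list of append(...) calls without threading a builder through explicitly.
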
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 348068c78c..681fe3570e 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -275,7 +275,6 @@ self =>
* if this $coll is empty.
*/
def reduce[U >: T](op: (U, U) => U): U = {
- println("------------------------------------------------")
executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get })
}
@@ -758,7 +757,7 @@ self =>
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
var result: Option[U] = None
def leaf(prevr: Option[Option[U]]) = {
- // pit.printDebugInformation
+ // pit.debugInformation
// val rem = pit.remaining
// val lst = pit.toList
// val pa = mutable.ParArray(lst: _*)
@@ -1227,6 +1226,12 @@ self =>
override def merge(that: FromArray[S, A, That]) = result = result combine that.result
}
+ /* debug information */
+
+ private[parallel] def debugInformation = "Parallel collection: " + this.getClass
+
+ private[parallel] def brokenInvariants = Seq[String]()
+
}
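
A minimal sketch of the debug-hook pattern introduced here: the base type exposes overridable debugInformation/brokenInvariants members, and concrete collections override them with real checks (as ParHashMap does in the next file). InvariantChecked and SizedBuffer are made-up illustration names, not library types:

trait InvariantChecked {
  def debugInformation: String = "Collection: " + this.getClass
  def brokenInvariants: Seq[String] = Seq.empty // empty means all invariants hold
}

class SizedBuffer(elems: Array[Int], claimedSize: Int) extends InvariantChecked {
  override def brokenInvariants: Seq[String] =
    if (claimedSize == elems.length) Nil
    else List("claimed size " + claimedSize + " does not match length " + elems.length)
}

object InvariantDemo {
  def main(args: Array[String]): Unit = {
    val buf = new SizedBuffer(Array(1, 2, 3), 4)
    buf.brokenInvariants.foreach(println) // reports the violated invariant
  }
}
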
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index c5b404e092..66c491086f 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -75,6 +75,33 @@ self =>
new ParHashMapIterator(idxFrom, idxUntil, totalSz, es) with SCPI
}
+ private[parallel] override def brokenInvariants = {
+ // bucket by bucket, count elements
+ val buckets = for (i <- 0 until (table.length / sizeMapBucketSize)) yield checkBucket(i)
+
+ // check if each element is in the position corresponding to its key
+ val elems = for (i <- 0 until table.length) yield checkEntry(i)
+
+ buckets.flatMap(x => x) ++ elems.flatMap(x => x)
+ }
+
+ private def checkBucket(i: Int) = {
+ def count(e: HashEntry[K, DefaultEntry[K, V]]): Int = if (e eq null) 0 else 1 + count(e.next)
+ val expected = sizemap(i)
+ val found = ((i * sizeMapBucketSize) until ((i + 1) * sizeMapBucketSize)).foldLeft(0) {
+ (acc, c) => acc + count(table(c))
+ }
+ if (found != expected) List("Found " + found + " elements, while sizemap showed " + expected)
+ else Nil
+ }
+
+ private def checkEntry(i: Int) = {
+ def check(e: HashEntry[K, DefaultEntry[K, V]]): List[String] = if (e eq null) Nil else
+ if (index(elemHashCode(e.key)) == i) check(e.next)
+ else ("Element " + e.key + " at " + i + " with " + elemHashCode(e.key) + " maps to " + index(elemHashCode(e.key))) :: check(e.next)
+ check(table(i))
+ }
+
}
@@ -88,14 +115,17 @@ object ParHashMap extends ParMapFactory[ParHashMap] {
private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFactor: Int)
-extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) {
+extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks)
+ with collection.mutable.HashTable.HashUtils[K]
+{
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
private var mask = ParHashMapCombiner.discriminantmask
+ private var nonmasklen = ParHashMapCombiner.nonmasklength
def +=(elem: (K, V)) = {
sz += 1
- val hc = elem._1.##
- val pos = hc & mask
+ val hc = improve(elemHashCode(elem._1))
+ val pos = (hc >>> nonmasklen)
if (lasts(pos) eq null) {
// initialize bucket
heads(pos) = new Unrolled[DefaultEntry[K, V]]
@@ -106,14 +136,28 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
this
}
- def result: ParHashMap[K, V] = {
+ def result: ParHashMap[K, V] = if (size >= (ParHashMapCombiner.numblocks * sizeMapBucketSize)) { // 1024
// construct table
val table = new AddingHashTable(size, tableLoadFactor)
-
val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks))
-
+ table.setSize(insertcount)
// TODO compare insertcount and size to see if compression is needed
-
+ val c = table.hashTableContents
+ new ParHashMap(c)
+ } else {
+ // construct a normal table and fill it sequentially
+ val table = new HashTable[K, DefaultEntry[K, V]] {
+ def insertEntry(e: DefaultEntry[K, V]) = if (super.findEntry(e.key) eq null) super.addEntry(e)
+ sizeMapInit(table.length)
+ }
+ var i = 0
+ while (i < ParHashMapCombiner.numblocks) {
+ if (heads(i) ne null) {
+ for (elem <- heads(i)) table.insertEntry(elem)
+ }
+ i += 1
+ }
+ // TODO compression
val c = table.hashTableContents
new ParHashMap(c)
}
@@ -123,17 +167,20 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
*
* Entries are added using the `insertEntry` method. This method checks whether the element
* exists and updates the size map. It returns false if the key was already in the table,
- * and true if the key was successfully inserted.
+ * and true if the key was successfully inserted. It does not update the number of elements
+ * in the table.
*/
- class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] {
+ private[ParHashMapCombiner] class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] {
import HashTable._
_loadFactor = lf
table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems)))
tableSize = 0
threshold = newThreshold(_loadFactor, table.length)
sizeMapInit(table.length)
- def insertEntry(e: DefaultEntry[K, V]) = {
+ def setSize(sz: Int) = tableSize = sz
+ def insertEntry(block: Int, e: DefaultEntry[K, V]) = {
var h = index(elemHashCode(e.key))
+ // assertCorrectBlock(h, block)
var olde = table(h).asInstanceOf[DefaultEntry[K, V]]
// check if key already exists
@@ -149,11 +196,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
if (h != -1) {
e.next = olde
table(h) = e
- tableSize = tableSize + 1
nnSizeMapAdd(h)
true
} else false
}
+ private def assertCorrectBlock(h: Int, block: Int) {
+ val blocksize = table.length / (1 << ParHashMapCombiner.discriminantbits)
+ if (!(h >= block * blocksize && h < (block + 1) * blocksize)) {
+ println("trying to put " + h + " into block no.: " + block + ", range: [" + block * blocksize + ", " + (block + 1) * blocksize + ">")
+ assert(h >= block * blocksize && h < (block + 1) * blocksize)
+ }
+ }
}
/* tasks */
@@ -166,11 +219,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
val until = offset + howmany
result = 0
while (i < until) {
- result += fillBlock(buckets(i))
+ result += fillBlock(i, buckets(i))
i += 1
}
}
- private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) = {
+ private def fillBlock(block: Int, elems: Unrolled[DefaultEntry[K, V]]) = {
var insertcount = 0
var unrolled = elems
var i = 0
@@ -180,7 +233,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
val chunksz = unrolled.size
while (i < chunksz) {
val elem = chunkarr(i)
- if (t.insertEntry(elem)) insertcount += 1
+ // assertCorrectBlock(block, elem.key)
+ if (t.insertEntry(block, elem)) insertcount += 1
i += 1
}
i = 0
@@ -188,6 +242,13 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
insertcount
}
+ private def assertCorrectBlock(block: Int, k: K) {
+ val hc = improve(elemHashCode(k))
+ if ((hc >>> nonmasklen) != block) {
+ println(hc + " goes to " + (hc >>> nonmasklen) + ", while expected block is " + block)
+ assert((hc >>> nonmasklen) == block)
+ }
+ }
def split = {
val fp = howmany / 2
List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp))
@@ -204,7 +265,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
private[mutable] object ParHashMapCombiner {
private[mutable] val discriminantbits = 5
private[mutable] val numblocks = 1 << discriminantbits
- private[mutable] val discriminantmask = ((1 << discriminantbits) - 1) << (32 - discriminantbits)
+ private[mutable] val discriminantmask = ((1 << discriminantbits) - 1);
+ private[mutable] val nonmasklength = 32 - discriminantbits
def apply[K, V] = new ParHashMapCombiner[K, V](HashTable.defaultLoadFactor) with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]]
}
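
A sketch of the combiner's new bucket assignment: instead of masking the low bits of the raw hash code, the improved hash is shifted right by (32 - discriminantbits), so the bucket is taken from the top bits and bucket i owns the i-th contiguous block of the final table. The constants mirror ParHashMapCombiner and improve is copied from the diff; BucketDemo and its histogram are just a sanity check, not library code:

object BucketDemo {
  val discriminantbits = 5
  val numblocks = 1 << discriminantbits      // 32 combiner buckets
  val nonmasklength = 32 - discriminantbits  // shift that keeps the top 5 bits

  def improve(hcode: Int): Int = {
    var i = hcode * 0x9e3775cd
    i = java.lang.Integer.reverseBytes(i)
    i * 0x9e3775cd
  }

  // Bucket = top 5 bits of the improved hash, matching the top bits that
  // index into any power-of-two table, so blocks can be filled in parallel.
  def bucket(key: Any): Int = improve(if (key == null) 0 else key.##) >>> nonmasklength

  def main(args: Array[String]): Unit = {
    val counts = (0 until 10000).groupBy(bucket).map { case (b, ks) => (b, ks.size) }
    assert(counts.keys.forall(b => b >= 0 && b < numblocks))
    counts.toSeq.sortBy(_._1).foreach { case (b, n) => println("bucket " + b + ": " + n) }
  }
}

Because improve mixes low input bits into the high output bits, even the dense key range 0-9999 spreads across all 32 buckets here, whereas taking the top bits of the raw hash codes would leave most buckets empty.
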
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index a694aeba17..1a3b35c853 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -219,6 +219,8 @@ package object parallel {
} else this
}
+
+
}