diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-28 12:10:00 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-28 12:10:00 +0000 |
commit | 8d311558f3774cd628a53fc675da93b550d06090 (patch) | |
tree | 6811285bc37af76d356980adaa579fa1d181faaf /src/library | |
parent | 962a348ab26f189a19dd74aeb3bbc8fd5d63061a (diff) | |
download | scala-8d311558f3774cd628a53fc675da93b550d06090.tar.gz scala-8d311558f3774cd628a53fc675da93b550d06090.tar.bz2 scala-8d311558f3774cd628a53fc675da93b550d06090.zip |
Debugging parallel hash tables.
No review.
Diffstat (limited to 'src/library')
5 files changed, 176 insertions, 56 deletions
diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala index 3d31c3860e..c233bd2719 100644 --- a/src/library/scala/collection/mutable/HashTable.scala +++ b/src/library/scala/collection/mutable/HashTable.scala @@ -31,7 +31,7 @@ package mutable * * @tparam A type of the elements contained in this hash table. */ -trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] { +trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashUtils[A] { import HashTable._ @transient protected var _loadFactor = defaultLoadFactor @@ -218,10 +218,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] { } @transient protected var sizemap: Array[Int] = null - protected final def sizeMapBucketBitSize = 5 - // so that: - protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize - protected final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize + private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize /* * The following three sizeMap* functions (Add, Remove, Reset) @@ -298,39 +295,6 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] { protected def elemEquals(key1: A, key2: A): Boolean = (key1 == key2) - protected def elemHashCode(key: A) = if (key == null) 0 else key.## - - protected final def improve(hcode: Int) = { - /* Murmur hash - * m = 0x5bd1e995 - * r = 24 - * note: h = seed = 0 in mmix - * mmix(h,k) = k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; */ - var k = hcode * 0x5bd1e995 - k ^= k >> 24 - k *= 0x5bd1e995 - k - - /* Jenkins hash - * for range 0-10000, output has the msb set to zero */ - // var h = hcode + (hcode << 12) - // h ^= (h >> 22) - // h += (h << 4) - // h ^= (h >> 9) - // h += (h << 10) - // h ^= (h >> 2) - // h += (h << 7) - // h ^= (h >> 12) - // h - - /* OLD VERSION - * since 2003 */ - // var h: Int = hcode + ~(hcode << 9) - // h = h ^ (h >>> 14) - // h = h + (h << 4) - // h ^ (h >>> 10) - } - // Note: // we take the most significant bits of the hashcode, not the lower ones // this is of crucial importance when populating the table in parallel @@ -380,6 +344,68 @@ private[collection] object HashTable { private[collection] final def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize) + trait HashUtils[KeyType] { + protected final def sizeMapBucketBitSize = 5 + // so that: + protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize + + protected def elemHashCode(key: KeyType) = if (key == null) 0 else key.## + + protected final def improve(hcode: Int) = { + /* Murmur hash + * m = 0x5bd1e995 + * r = 24 + * note: h = seed = 0 in mmix + * mmix(h,k) = k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; */ + // var k = hcode * 0x5bd1e995 + // k ^= k >> 24 + // k *= 0x5bd1e995 + // k + + /* Another fast multiplicative hash + * by Phil Bagwell + * + * Comment: + * Multiplication doesn't affect all the bits in the same way, so we want to + * multiply twice, "once from each side". + * It would be ideal to reverse all the bits after the first multiplication, + * however, this is more costly. We therefore restrict ourselves only to + * reversing the bytes before final multiplication. This yields a slightly + * worse entropy in the lower 8 bits, but that can be improved by adding: + * + * `i ^= i >> 6` + * + * For performance reasons, we avoid this improvement. + * */ + var i = hcode * 0x9e3775cd + i = java.lang.Integer.reverseBytes(i) + i * 0x9e3775cd + // a slower alternative for byte reversal: + // i = (i << 16) | (i >> 16) + // i = ((i >> 8) & 0x00ff00ff) | ((i << 8) & 0xff00ff00) + + /* Jenkins hash + * for range 0-10000, output has the msb set to zero */ + // var h = hcode + (hcode << 12) + // h ^= (h >> 22) + // h += (h << 4) + // h ^= (h >> 9) + // h += (h << 10) + // h ^= (h >> 2) + // h += (h << 7) + // h ^= (h >> 12) + // h + + /* OLD VERSION + * quick, but bad for sequence 0-10000 - little enthropy in higher bits + * since 2003 */ + // var h: Int = hcode + ~(hcode << 9) + // h = h ^ (h >>> 14) + // h = h + (h << 4) + // h ^ (h >>> 10) + } + } + /** * Returns a power of two >= `target`. */ @@ -400,6 +426,18 @@ private[collection] object HashTable { val tableSize: Int, val threshold: Int, val sizemap: Array[Int] - ) + ) { + import collection.DebugUtils._ + private[collection] def debugInformation = buildString { + append => + append("Hash table contents") + append("-------------------") + append("Table: [" + arrayString(table, 0, table.length) + "]") + append("Table size: " + tableSize) + append("Load factor: " + loadFactor) + append("Threshold: " + threshold) + append("Sizemap: [" + arrayString(sizemap, 0, sizemap.length) + "]") + } + } } diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala index 19d65b73e2..f1eb50c5e0 100644 --- a/src/library/scala/collection/package.scala +++ b/src/library/scala/collection/package.scala @@ -76,4 +76,17 @@ package object collection { new CanBuildFrom[From, T, To] { // TODO: could we just return b instead? def apply(from: From) = b.apply() ; def apply() = b.apply() } + + private[collection] object DebugUtils { + /* debug utils */ + def buildString(closure: (String => Unit) => Unit): String = { + var output = "" + def appendln(s: String) = output += s + "\n" + closure(appendln) + output + } + + def arrayString[T](array: Array[T], from: Int, until: Int) = array.slice(from, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ") + } + } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 348068c78c..681fe3570e 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -275,7 +275,6 @@ self => * if this $coll is empty. */ def reduce[U >: T](op: (U, U) => U): U = { - println("------------------------------------------------") executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get }) } @@ -758,7 +757,7 @@ self => protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] { var result: Option[U] = None def leaf(prevr: Option[Option[U]]) = { - // pit.printDebugInformation + // pit.debugInformation // val rem = pit.remaining // val lst = pit.toList // val pa = mutable.ParArray(lst: _*) @@ -1227,6 +1226,12 @@ self => override def merge(that: FromArray[S, A, That]) = result = result combine that.result } + /* debug information */ + + private[parallel] def debugInformation = "Parallel collection: " + this.getClass + + private[parallel] def brokenInvariants = Seq[String]() + } diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index c5b404e092..66c491086f 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -75,6 +75,33 @@ self => new ParHashMapIterator(idxFrom, idxUntil, totalSz, es) with SCPI } + private[parallel] override def brokenInvariants = { + // bucket by bucket, count elements + val buckets = for (i <- 0 until (table.length / sizeMapBucketSize)) yield checkBucket(i) + + // check if each element is in the position corresponding to its key + val elems = for (i <- 0 until table.length) yield checkEntry(i) + + buckets.flatMap(x => x) ++ elems.flatMap(x => x) + } + + private def checkBucket(i: Int) = { + def count(e: HashEntry[K, DefaultEntry[K, V]]): Int = if (e eq null) 0 else 1 + count(e.next) + val expected = sizemap(i) + val found = ((i * sizeMapBucketSize) until ((i + 1) * sizeMapBucketSize)).foldLeft(0) { + (acc, c) => acc + count(table(c)) + } + if (found != expected) List("Found " + found + " elements, while sizemap showed " + expected) + else Nil + } + + private def checkEntry(i: Int) = { + def check(e: HashEntry[K, DefaultEntry[K, V]]): List[String] = if (e eq null) Nil else + if (index(elemHashCode(e.key)) == i) check(e.next) + else ("Element " + e.key + " at " + i + " with " + elemHashCode(e.key) + " maps to " + index(elemHashCode(e.key))) :: check(e.next) + check(table(i)) + } + } @@ -88,14 +115,17 @@ object ParHashMap extends ParMapFactory[ParHashMap] { private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFactor: Int) -extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) { +extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) + with collection.mutable.HashTable.HashUtils[K] +{ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => private var mask = ParHashMapCombiner.discriminantmask + private var nonmasklen = ParHashMapCombiner.nonmasklength def +=(elem: (K, V)) = { sz += 1 - val hc = elem._1.## - val pos = hc & mask + val hc = improve(elemHashCode(elem._1)) + val pos = (hc >>> nonmasklen) if (lasts(pos) eq null) { // initialize bucket heads(pos) = new Unrolled[DefaultEntry[K, V]] @@ -106,14 +136,28 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => this } - def result: ParHashMap[K, V] = { + def result: ParHashMap[K, V] = if (size >= (ParHashMapCombiner.numblocks * sizeMapBucketSize)) { // 1024 // construct table val table = new AddingHashTable(size, tableLoadFactor) - val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) - + table.setSize(insertcount) // TODO compare insertcount and size to see if compression is needed - + val c = table.hashTableContents + new ParHashMap(c) + } else { + // construct a normal table and fill it sequentially + val table = new HashTable[K, DefaultEntry[K, V]] { + def insertEntry(e: DefaultEntry[K, V]) = if (super.findEntry(e.key) eq null) super.addEntry(e) + sizeMapInit(table.length) + } + var i = 0 + while (i < ParHashMapCombiner.numblocks) { + if (heads(i) ne null) { + for (elem <- heads(i)) table.insertEntry(elem) + } + i += 1 + } + // TODO compression val c = table.hashTableContents new ParHashMap(c) } @@ -123,17 +167,20 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => * * Entries are added using the `insertEntry` method. This method checks whether the element * exists and updates the size map. It returns false if the key was already in the table, - * and true if the key was successfully inserted. + * and true if the key was successfully inserted. It does not update the number of elements + * in the table. */ - class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] { + private[ParHashMapCombiner] class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] { import HashTable._ _loadFactor = lf table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems))) tableSize = 0 threshold = newThreshold(_loadFactor, table.length) sizeMapInit(table.length) - def insertEntry(e: DefaultEntry[K, V]) = { + def setSize(sz: Int) = tableSize = sz + def insertEntry(block: Int, e: DefaultEntry[K, V]) = { var h = index(elemHashCode(e.key)) + // assertCorrectBlock(h, block) var olde = table(h).asInstanceOf[DefaultEntry[K, V]] // check if key already exists @@ -149,11 +196,17 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => if (h != -1) { e.next = olde table(h) = e - tableSize = tableSize + 1 nnSizeMapAdd(h) true } else false } + private def assertCorrectBlock(h: Int, block: Int) { + val blocksize = table.length / (1 << ParHashMapCombiner.discriminantbits) + if (!(h >= block * blocksize && h < (block + 1) * blocksize)) { + println("trying to put " + h + " into block no.: " + block + ", range: [" + block * blocksize + ", " + (block + 1) * blocksize + ">") + assert(h >= block * blocksize && h < (block + 1) * blocksize) + } + } } /* tasks */ @@ -166,11 +219,11 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => val until = offset + howmany result = 0 while (i < until) { - result += fillBlock(buckets(i)) + result += fillBlock(i, buckets(i)) i += 1 } } - private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) = { + private def fillBlock(block: Int, elems: Unrolled[DefaultEntry[K, V]]) = { var insertcount = 0 var unrolled = elems var i = 0 @@ -180,7 +233,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => val chunksz = unrolled.size while (i < chunksz) { val elem = chunkarr(i) - if (t.insertEntry(elem)) insertcount += 1 + // assertCorrectBlock(block, elem.key) + if (t.insertEntry(block, elem)) insertcount += 1 i += 1 } i = 0 @@ -188,6 +242,13 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => } insertcount } + private def assertCorrectBlock(block: Int, k: K) { + val hc = improve(elemHashCode(k)) + if ((hc >>> nonmasklen) != block) { + println(hc + " goes to " + (hc >>> nonmasklen) + ", while expected block is " + block) + assert((hc >>> nonmasklen) == block) + } + } def split = { val fp = howmany / 2 List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp)) @@ -204,7 +265,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => private[mutable] object ParHashMapCombiner { private[mutable] val discriminantbits = 5 private[mutable] val numblocks = 1 << discriminantbits - private[mutable] val discriminantmask = ((1 << discriminantbits) - 1) << (32 - discriminantbits) + private[mutable] val discriminantmask = ((1 << discriminantbits) - 1); + private[mutable] val nonmasklength = 32 - discriminantbits def apply[K, V] = new ParHashMapCombiner[K, V](HashTable.defaultLoadFactor) with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index a694aeba17..1a3b35c853 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -219,6 +219,8 @@ package object parallel { } else this } + + } |