diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-28 12:09:52 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-28 12:09:52 +0000 |
commit | e9b61ff9fc769fd94f427902ec0a65ee23db6b85 (patch) | |
tree | 10851e95fb0ff1eed4949b5ff241b036e1035b57 /src/library | |
parent | f388aaaf52dab4ceaf8e5f26c72eb4a0d1d3b3e7 (diff) | |
download | scala-e9b61ff9fc769fd94f427902ec0a65ee23db6b85.tar.gz scala-e9b61ff9fc769fd94f427902ec0a65ee23db6b85.tar.bz2 scala-e9b61ff9fc769fd94f427902ec0a65ee23db6b85.zip |
Some serious bugfixes in parallel hash tables.
No review.
Diffstat (limited to 'src/library')
7 files changed, 79 insertions, 8 deletions
diff --git a/src/library/scala/collection/mutable/DefaultEntry.scala b/src/library/scala/collection/mutable/DefaultEntry.scala index 5144f4f590..44695c9ebe 100644 --- a/src/library/scala/collection/mutable/DefaultEntry.scala +++ b/src/library/scala/collection/mutable/DefaultEntry.scala @@ -19,3 +19,10 @@ package mutable @serializable final class DefaultEntry[A, B](val key: A, var value: B) extends HashEntry[A, DefaultEntry[A, B]] +{ + override def toString = chainString + + def chainString = { + "(kv: " + key + ", " + value + ")" + (if (next != null) " -> " + next.toString else "") + } +} diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala index 7aae961a62..a445b6d6d1 100644 --- a/src/library/scala/collection/mutable/HashTable.scala +++ b/src/library/scala/collection/mutable/HashTable.scala @@ -113,6 +113,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] { val h = index(elemHashCode(e.key)) e.next = table(h).asInstanceOf[Entry] table(h) = e + if (this.isInstanceOf[collection.parallel.mutable.ParHashMap[_, _]]) println("adding at pos: " + h) tableSize = tableSize + 1 nnSizeMapAdd(h) if (tableSize > threshold) @@ -312,7 +313,10 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] { // this is of crucial importance when populating the table in parallel protected final def index(hcode: Int) = { val ones = table.length - 1 - (improve(hcode) >> (32 - java.lang.Integer.bitCount(ones))) & ones + val improved = improve(hcode) + val shifted = (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones + if (this.isInstanceOf[collection.parallel.mutable.ParHashMap[_, _]]) println("computing hash code for: " + hcode + " -> " + improved + " -> " + shifted) + shifted } protected def initWithContents(c: HashTable.Contents[A, Entry]) = if (c != null) { diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 3a83140a53..348068c78c 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -275,6 +275,7 @@ self => * if this $coll is empty. */ def reduce[U >: T](op: (U, U) => U): U = { + println("------------------------------------------------") executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get }) } @@ -756,8 +757,26 @@ self => protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] { var result: Option[U] = None - def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op)) + def leaf(prevr: Option[Option[U]]) = { + // pit.printDebugInformation + // val rem = pit.remaining + // val lst = pit.toList + // val pa = mutable.ParArray(lst: _*) + // val str = "At leaf we will iterate " + rem + " elems: " + pa.parallelIterator.toList + // val p2 = pa.parallelIterator + if (pit.remaining > 0) result = Some(pit.reduce(op)) + // println(str) + } protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p) + // override def split = { + // var str = pit.debugInformation + // val pits = pit.split + // str += "\nsplitting: " + pits.map(_.remaining) + "\n" + // str += pits.map(_.debugInformation).mkString("\n") + // str += "=========================================\n" + // println(str) + // pits map { p => newSubtask(p) } + // } override def merge(that: Reduce[U]) = if (this.result == None) result = that.result else if (that.result != None) result = Some(op(result.get, that.result.get)) diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 4831c829ad..cc0f2c0669 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -358,6 +358,18 @@ self => */ def remaining: Int + protected def buildString(closure: (String => Unit) => Unit): String = { + var output = "" + def appendln(s: String) = output += s + "\n" + closure(appendln) + output + } + + private[parallel] def debugInformation = { + // can be overridden in subclasses + "Parallel iterator: " + this.getClass + } + /* iterator transformers */ class Taken(taken: Int) extends ParIterableIterator[T] { diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 3648945857..c5b404e092 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -32,7 +32,7 @@ self => def seq = new collection.mutable.HashMap[K, V](hashTableContents) - def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI + def parallelIterator = new ParHashMapIterator(1, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI override def size = tableSize diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala index 2617685a3d..ab29045444 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala @@ -49,17 +49,34 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m def remaining = totalsize - traversed + private[parallel] override def debugInformation = { + buildString { + append => + append("/-------------------\\") + append("Parallel hash table entry iterator") + append("total hash table elements: " + tableSize) + append("pos: " + idx) + append("until: " + until) + append("traversed: " + traversed) + append("totalsize: " + totalsize) + append("current entry: " + es) + append("underlying from " + idx + " until " + until) + append(itertable.slice(idx, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ")) + append("\\-------------------/") + } + } + def split: Seq[ParIterableIterator[T]] = if (remaining > 1) { - if ((until - idx) > 1) { + if (until > idx) { // there is at least one more slot for the next iterator // divide the rest of the table val divsz = (until - idx) / 2 // second iterator params - val sidx = idx + divsz + val sidx = idx + divsz + 1 // + 1 preserves iteration invariant val suntil = until - val ses = itertable(sidx).asInstanceOf[Entry] - val stotal = calcNumElems(sidx, suntil) + val ses = itertable(sidx - 1).asInstanceOf[Entry] // sidx - 1 ensures counting from the right spot + val stotal = calcNumElems(sidx - 1, suntil) // first iterator params val fidx = idx @@ -83,10 +100,11 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m private def convertToArrayBuffer(chainhead: Entry): mutable.ArrayBuffer[T] = { var buff = mutable.ArrayBuffer[Entry]() var curr = chainhead - while (curr != null) { + while (curr ne null) { buff += curr curr = curr.next } + // println("converted " + remaining + " element iterator into buffer: " + buff) buff map { e => entry2item(e) } } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index de3a790727..a694aeba17 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -139,6 +139,17 @@ package object parallel { new BufferIterator(buffer, index + divsz, until, signalDelegate) ) } else Seq(this) + private[parallel] override def debugInformation = { + buildString { + append => + append("---------------") + append("Buffer iterator") + append("buffer: " + buffer) + append("index: " + index) + append("until: " + until) + append("---------------") + } + } } /** A helper combiner which contains an array of buckets. Buckets themselves |