author     Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>    2010-10-28 12:09:52 +0000
committer  Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>    2010-10-28 12:09:52 +0000
commit     e9b61ff9fc769fd94f427902ec0a65ee23db6b85 (patch)
tree       10851e95fb0ff1eed4949b5ff241b036e1035b57 /src/library
parent     f388aaaf52dab4ceaf8e5f26c72eb4a0d1d3b3e7 (diff)
Some serious bugfixes in parallel hash tables.
No review.
Diffstat (limited to 'src/library')
-rw-r--r--  src/library/scala/collection/mutable/DefaultEntry.scala          |  7
-rw-r--r--  src/library/scala/collection/mutable/HashTable.scala             |  6
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala      | 21
-rw-r--r--  src/library/scala/collection/parallel/RemainsIterator.scala      | 12
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala   |  2
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashTable.scala | 28
-rw-r--r--  src/library/scala/collection/parallel/package.scala              | 11
7 files changed, 79 insertions, 8 deletions
diff --git a/src/library/scala/collection/mutable/DefaultEntry.scala b/src/library/scala/collection/mutable/DefaultEntry.scala
index 5144f4f590..44695c9ebe 100644
--- a/src/library/scala/collection/mutable/DefaultEntry.scala
+++ b/src/library/scala/collection/mutable/DefaultEntry.scala
@@ -19,3 +19,10 @@ package mutable
@serializable
final class DefaultEntry[A, B](val key: A, var value: B)
extends HashEntry[A, DefaultEntry[A, B]]
+{
+ override def toString = chainString
+
+ def chainString = {
+ "(kv: " + key + ", " + value + ")" + (if (next != null) " -> " + next.toString else "")
+ }
+}
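
For illustration only (not part of the commit): a standalone sketch of what the new chainString produces for a bucket's collision chain. DemoEntry below is a hypothetical stand-in for DefaultEntry.

// Hypothetical mirror of DefaultEntry; illustration only.
final class DemoEntry[A, B](val key: A, var value: B) {
  var next: DemoEntry[A, B] = null
  override def toString = chainString
  def chainString: String =
    "(kv: " + key + ", " + value + ")" + (if (next != null) " -> " + next.toString else "")
}

object ChainStringDemo {
  def main(args: Array[String]): Unit = {
    // build a three-entry collision chain: a -> b -> c
    val a = new DemoEntry("a", 1)
    val b = new DemoEntry("b", 2)
    val c = new DemoEntry("c", 3)
    a.next = b
    b.next = c
    println(a) // prints: (kv: a, 1) -> (kv: b, 2) -> (kv: c, 3)
  }
}
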
diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala
index 7aae961a62..a445b6d6d1 100644
--- a/src/library/scala/collection/mutable/HashTable.scala
+++ b/src/library/scala/collection/mutable/HashTable.scala
@@ -113,6 +113,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] {
val h = index(elemHashCode(e.key))
e.next = table(h).asInstanceOf[Entry]
table(h) = e
+ if (this.isInstanceOf[collection.parallel.mutable.ParHashMap[_, _]]) println("adding at pos: " + h)
tableSize = tableSize + 1
nnSizeMapAdd(h)
if (tableSize > threshold)
@@ -312,7 +313,10 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] {
// this is of crucial importance when populating the table in parallel
protected final def index(hcode: Int) = {
val ones = table.length - 1
- (improve(hcode) >> (32 - java.lang.Integer.bitCount(ones))) & ones
+ val improved = improve(hcode)
+ val shifted = (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones
+ if (this.isInstanceOf[collection.parallel.mutable.ParHashMap[_, _]]) println("computing hash code for: " + hcode + " -> " + improved + " -> " + shifted)
+ shifted
}
protected def initWithContents(c: HashTable.Contents[A, Entry]) = if (c != null) {
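
A hedged, standalone sketch of the index computation rewritten in the hunk above. The improve mixer below is a stand-in, not necessarily the library's implementation; the point of the shift is that the slot comes from the topmost bits of the improved hash code, which keeps buckets ordered by improved hash code, the property the surrounding comment calls crucial when populating the table in parallel.

// Standalone sketch; improve is a stand-in bit mixer, not the library's code.
object IndexSketch {
  def improve(hcode: Int): Int = {
    var i = hcode * 0x9e3775cd
    i = java.lang.Integer.reverseBytes(i)
    i * 0x9e3775cd
  }

  // tableLength must be a power of two, so ones is a mask of its low bits
  def index(hcode: Int, tableLength: Int): Int = {
    val ones = tableLength - 1
    val improved = improve(hcode)
    // keep only the top bitCount(ones) bits, then mask into [0, ones]
    (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones
  }

  def main(args: Array[String]): Unit = {
    val len = 1024 // 2^10, so the slot is taken from the top 10 bits
    Seq("a", "b", "c") foreach { s => println(s + " -> " + index(s.hashCode, len)) }
  }
}
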
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 3a83140a53..348068c78c 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -275,6 +275,7 @@ self =>
* if this $coll is empty.
*/
def reduce[U >: T](op: (U, U) => U): U = {
+ println("------------------------------------------------")
executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get })
}
@@ -756,8 +757,26 @@ self =>
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
var result: Option[U] = None
- def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op))
+ def leaf(prevr: Option[Option[U]]) = {
+ // pit.printDebugInformation
+ // val rem = pit.remaining
+ // val lst = pit.toList
+ // val pa = mutable.ParArray(lst: _*)
+ // val str = "At leaf we will iterate " + rem + " elems: " + pa.parallelIterator.toList
+ // val p2 = pa.parallelIterator
+ if (pit.remaining > 0) result = Some(pit.reduce(op))
+ // println(str)
+ }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p)
+ // override def split = {
+ // var str = pit.debugInformation
+ // val pits = pit.split
+ // str += "\nsplitting: " + pits.map(_.remaining) + "\n"
+ // str += pits.map(_.debugInformation).mkString("\n")
+ // str += "=========================================\n"
+ // println(str)
+ // pits map { p => newSubtask(p) }
+ // }
override def merge(that: Reduce[U]) =
if (this.result == None) result = that.result
else if (that.result != None) result = Some(op(result.get, that.result.get))
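
Not the library code: a plain-function model of how Reduce.merge above combines the optional partial results of two subtasks.

// Plain-function model of the merge logic; illustration only.
object ReduceMergeSketch {
  def mergeResults[U](left: Option[U], right: Option[U], op: (U, U) => U): Option[U] =
    (left, right) match {
      case (None, r)          => r              // left leaf saw no elements
      case (l, None)          => l              // right leaf saw no elements
      case (Some(a), Some(b)) => Some(op(a, b)) // both leaves produced a value
    }

  def main(args: Array[String]): Unit = {
    val plus = (a: Int, b: Int) => a + b
    println(mergeResults(Some(3), Some(4), plus)) // Some(7)
    println(mergeResults(None, Some(4), plus))    // Some(4)
    println(mergeResults(None, None, plus))       // None
  }
}
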
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 4831c829ad..cc0f2c0669 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -358,6 +358,18 @@ self =>
*/
def remaining: Int
+ protected def buildString(closure: (String => Unit) => Unit): String = {
+ var output = ""
+ def appendln(s: String) = output += s + "\n"
+ closure(appendln)
+ output
+ }
+
+ private[parallel] def debugInformation = {
+ // can be overridden in subclasses
+ "Parallel iterator: " + this.getClass
+ }
+
/* iterator transformers */
class Taken(taken: Int) extends ParIterableIterator[T] {
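
The buildString helper added above simply collects appended lines into one string; here is a self-contained copy with a small usage example (illustration only).

// Self-contained copy of the helper, plus a usage example.
object BuildStringSketch {
  def buildString(closure: (String => Unit) => Unit): String = {
    var output = ""
    def appendln(s: String) = output += s + "\n"
    closure(appendln)
    output
  }

  def main(args: Array[String]): Unit = {
    val info = buildString { append =>
      append("Parallel iterator: demo")
      append("remaining: 42")
    }
    print(info)
    // Parallel iterator: demo
    // remaining: 42
  }
}
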
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 3648945857..c5b404e092 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -32,7 +32,7 @@ self =>
def seq = new collection.mutable.HashMap[K, V](hashTableContents)
- def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI
+ def parallelIterator = new ParHashMapIterator(1, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI
override def size = tableSize
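
A hedged model of the invariant this one-character change appears to restore (an inference from the diff, not library code): the iterator already holds the chain of the bucket just before idx, so with es = table(0) the starting index must be 1, otherwise bucket 0 would be loaded a second time once its chain runs out. The model below uses List[T] chains instead of entry chains.

// Simplified model with List[T] chains; illustration only.
final class BucketIterator[T](table: Array[List[T]]) extends Iterator[T] {
  // invariant: chain is the (rest of the) chain of bucket idx - 1
  private var idx = 1
  private var chain: List[T] = table(0)

  def hasNext: Boolean = {
    while (chain.isEmpty && idx < table.length) {
      chain = table(idx) // load the next bucket ...
      idx += 1           // ... keeping the invariant: chain belongs to idx - 1
    }
    chain.nonEmpty
  }

  def next(): T = {
    val h = chain.head
    chain = chain.tail
    h
  }
}

object BucketIteratorDemo {
  def main(args: Array[String]): Unit = {
    val table = Array(List(1, 2), List.empty[Int], List(3))
    println(new BucketIterator(table).toList) // List(1, 2, 3)
  }
}
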
diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
index 2617685a3d..ab29045444 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
@@ -49,17 +49,34 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
def remaining = totalsize - traversed
+ private[parallel] override def debugInformation = {
+ buildString {
+ append =>
+ append("/-------------------\\")
+ append("Parallel hash table entry iterator")
+ append("total hash table elements: " + tableSize)
+ append("pos: " + idx)
+ append("until: " + until)
+ append("traversed: " + traversed)
+ append("totalsize: " + totalsize)
+ append("current entry: " + es)
+ append("underlying from " + idx + " until " + until)
+ append(itertable.slice(idx, until).map(x => if (x != null) x.toString else "n/a").mkString(" | "))
+ append("\\-------------------/")
+ }
+ }
+
def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
- if ((until - idx) > 1) {
+ if (until > idx) {
// there is at least one more slot for the next iterator
// divide the rest of the table
val divsz = (until - idx) / 2
// second iterator params
- val sidx = idx + divsz
+ val sidx = idx + divsz + 1 // + 1 preserves iteration invariant
val suntil = until
- val ses = itertable(sidx).asInstanceOf[Entry]
- val stotal = calcNumElems(sidx, suntil)
+ val ses = itertable(sidx - 1).asInstanceOf[Entry] // sidx - 1 ensures counting from the right spot
+ val stotal = calcNumElems(sidx - 1, suntil)
// first iterator params
val fidx = idx
@@ -83,10 +100,11 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m
private def convertToArrayBuffer(chainhead: Entry): mutable.ArrayBuffer[T] = {
var buff = mutable.ArrayBuffer[Entry]()
var curr = chainhead
- while (curr != null) {
+ while (curr ne null) {
buff += curr
curr = curr.next
}
+ // println("converted " + remaining + " element iterator into buffer: " + buff)
buff map { e => entry2item(e) }
}
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index de3a790727..a694aeba17 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -139,6 +139,17 @@ package object parallel {
new BufferIterator(buffer, index + divsz, until, signalDelegate)
)
} else Seq(this)
+ private[parallel] override def debugInformation = {
+ buildString {
+ append =>
+ append("---------------")
+ append("Buffer iterator")
+ append("buffer: " + buffer)
+ append("index: " + index)
+ append("until: " + until)
+ append("---------------")
+ }
+ }
}
/** A helper combiner which contains an array of buckets. Buckets themselves