diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-11-08 08:52:20 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-11-08 08:52:20 +0000 |
commit | 09ed9d12c343ee972861c8439fd10596903efe59 (patch) | |
tree | ea2735b13b43d4132664d8b3d6a9c23e2b709b7e | |
parent | 056663c3f22b8c03f222856305ef99e3ed029889 (diff) | |
download | scala-09ed9d12c343ee972861c8439fd10596903efe59.tar.gz scala-09ed9d12c343ee972861c8439fd10596903efe59.tar.bz2 scala-09ed9d12c343ee972861c8439fd10596903efe59.zip |
Added size maps to flat hash tables.
Added parallel mutable hash sets.
Implemented parallel mutable hash set iterators.
Implemented parallel mutable hash set combiners.
Factored out unrolled linked lists into a separate class UnrolledBuffer, added tests.
Added parallel mutable hash set tests, and debugged hashsets.
No review.
26 files changed, 1364 insertions, 168 deletions
diff --git a/src/library/scala/collection/IterableLike.scala b/src/library/scala/collection/IterableLike.scala index 538fd09c0e..97787c5867 100644 --- a/src/library/scala/collection/IterableLike.scala +++ b/src/library/scala/collection/IterableLike.scala @@ -193,7 +193,7 @@ self => * $orderDependent * * @param n The number of elements to take - * @return a $coll consisting of all elements of this $coll except the first `n` ones, or else the + * @return a $coll consisting of all elements of this $coll except the last `n` ones, or else the * empty $coll, if this $coll has less than `n` elements. */ def dropRight(n: Int): Repr = { diff --git a/src/library/scala/collection/generic/ClassManifestTraversableFactory.scala b/src/library/scala/collection/generic/ClassManifestTraversableFactory.scala new file mode 100644 index 0000000000..f9097eeca0 --- /dev/null +++ b/src/library/scala/collection/generic/ClassManifestTraversableFactory.scala @@ -0,0 +1,25 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2006-2010, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + +package scala.collection +package generic + + + + + +abstract class ClassManifestTraversableFactory[CC[X] <: Traversable[X] with GenericClassManifestTraversableTemplate[X, CC]] +extends GenericClassManifestCompanion[CC] { + + class GenericCanBuildFrom[A](implicit manif: ClassManifest[A]) extends CanBuildFrom[CC[_], A, CC[A]] { + def apply(from: CC[_]) = from.genericClassManifestBuilder[A] + def apply = newBuilder[A] + } + +} diff --git a/src/library/scala/collection/generic/GenericClassManifestCompanion.scala b/src/library/scala/collection/generic/GenericClassManifestCompanion.scala new file mode 100644 index 0000000000..98cafd3841 --- /dev/null +++ b/src/library/scala/collection/generic/GenericClassManifestCompanion.scala @@ -0,0 +1,33 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ 
| / / / _ | (c) 2003-2010, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + + +package scala.collection +package generic + +import mutable.Builder + + + + + +abstract class GenericClassManifestCompanion[+CC[X] <: Traversable[X]] { + type Coll = CC[_] + + def newBuilder[A](implicit ord: ClassManifest[A]): Builder[A, CC[A]] + + def empty[A: ClassManifest]: CC[A] = newBuilder[A].result + + def apply[A](elems: A*)(implicit ord: ClassManifest[A]): CC[A] = { + val b = newBuilder[A] + b ++= elems + b.result + } +} + diff --git a/src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala b/src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala new file mode 100644 index 0000000000..bf167bd6ab --- /dev/null +++ b/src/library/scala/collection/generic/GenericClassManifestTraversableTemplate.scala @@ -0,0 +1,25 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2010, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + + +package scala.collection +package generic + +import mutable.Builder +import annotation.unchecked.uncheckedVariance + + + + +trait GenericClassManifestTraversableTemplate[+A, +CC[X] <: Traversable[X]] extends HasNewBuilder[A, CC[A] @uncheckedVariance] { + implicit protected[this] val manifest: ClassManifest[A] + def classManifestCompanion: GenericClassManifestCompanion[CC] + def genericClassManifestBuilder[B](implicit man: ClassManifest[B]): Builder[B, CC[B]] = classManifestCompanion.newBuilder[B] +} + diff --git a/src/library/scala/collection/mutable/FlatHashTable.scala b/src/library/scala/collection/mutable/FlatHashTable.scala index aa2cd2deba..d0d9ce2cf5 100644 --- a/src/library/scala/collection/mutable/FlatHashTable.scala +++ b/src/library/scala/collection/mutable/FlatHashTable.scala @@ -20,20 +20,12 @@ package 
mutable * * @tparam A the type of the elements contained in the flat hash table. */ -trait FlatHashTable[A] { - - /** The load factor for the hash table; must be < 500 (0.5) - */ - protected def loadFactor: Int = 450 - protected final def loadFactorDenum = 1000 - - /** The initial size of the hash table. - */ - protected def initialSize: Int = 16 +trait FlatHashTable[A] extends FlatHashTable.HashUtils[A] { + import FlatHashTable._ private final val tableDebug = false - @transient private[collection] var _loadFactor = loadFactor + @transient private[collection] var _loadFactor = defaultLoadFactor /** The actual hash table. */ @@ -45,10 +37,14 @@ trait FlatHashTable[A] { /** The next size value at which to resize (capacity * load factor). */ - @transient protected var threshold: Int = newThreshold(initialCapacity) + @transient protected var threshold: Int = newThreshold(_loadFactor, initialCapacity) + + /** The array keeping track of number of elements in 32 element blocks. + */ + @transient protected var sizemap: Array[Int] = null import HashTable.powerOfTwo - private def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize) + protected def capacity(expectedSize: Int) = if (expectedSize == 0) 1 else powerOfTwo(expectedSize) private def initialCapacity = capacity(initialSize) /** @@ -68,8 +64,11 @@ trait FlatHashTable[A] { tableSize = 0 assert(size >= 0) - table = new Array(capacity(size * loadFactorDenum / _loadFactor)) - threshold = newThreshold(table.size) + table = new Array(capacity(sizeForThreshold(size, _loadFactor))) + threshold = newThreshold(_loadFactor, table.size) + + val smDefined = in.readBoolean + if (smDefined) sizeMapInit(table.length) var index = 0 while (index < size) { @@ -89,6 +88,7 @@ trait FlatHashTable[A] { out.defaultWriteObject out.writeInt(_loadFactor) out.writeInt(tableSize) + out.writeBoolean(isSizeMapDefined) iterator.foreach(out.writeObject) } @@ -127,6 +127,7 @@ trait FlatHashTable[A] { } table(h) = 
elem.asInstanceOf[AnyRef] tableSize = tableSize + 1 + nnSizeMapAdd(h) if (tableSize >= threshold) growTable() true } @@ -157,6 +158,7 @@ trait FlatHashTable[A] { } table(h0) = null tableSize -= 1 + nnSizeMapRemove(h0) if (tableDebug) checkConsistent() return Some(entry.asInstanceOf[A]) } @@ -181,7 +183,8 @@ trait FlatHashTable[A] { val oldtable = table table = new Array[AnyRef](table.length * 2) tableSize = 0 - threshold = newThreshold(table.length) + nnSizeMapReset(table.length) + threshold = newThreshold(_loadFactor, table.length) var i = 0 while (i < oldtable.length) { val entry = oldtable(i) @@ -197,26 +200,161 @@ trait FlatHashTable[A] { assert(false, i+" "+table(i)+" "+table.mkString) } - protected def elemHashCode(elem: A) = if (elem == null) 0 else elem.hashCode() + /* Size map handling code */ - protected final def improve(hcode: Int) = { - var h: Int = hcode + ~(hcode << 9) - h = h ^ (h >>> 14) - h = h + (h << 4) - h ^ (h >>> 10) + /* + * The following three methods (nn*) modify a size map only if it has been + * initialized, that is, if it's not set to null. + * + * The size map logically divides the hash table into `sizeMapBucketSize` element buckets + * by keeping an integer entry for each such bucket. Each integer entry simply denotes + * the number of elements in the corresponding bucket. + * Best understood through an example, see: + * table = [/, 1, /, 6, 90, /, -3, 5] (8 entries) + * sizemap = [ 2 | 3 ] (2 entries) + * where sizeMapBucketSize == 4. 
+ * + */ + protected def nnSizeMapAdd(h: Int) = if (sizemap ne null) { + val p = h >> sizeMapBucketBitSize + sizemap(p) += 1 } - protected final def index(hcode: Int) = improve(hcode) & (table.length - 1) + protected def nnSizeMapRemove(h: Int) = if (sizemap ne null) { + sizemap(h >> sizeMapBucketBitSize) -= 1 + } - private def newThreshold(size: Int) = { - val lf = _loadFactor - assert(lf < (loadFactorDenum / 2), "loadFactor too large; must be < 0.5") - (size.toLong * lf / loadFactorDenum ).toInt + protected def nnSizeMapReset(tableLength: Int) = if (sizemap ne null) { + val nsize = calcSizeMapSize(tableLength) + if (sizemap.length != nsize) sizemap = new Array[Int](nsize) + else java.util.Arrays.fill(sizemap, 0) + } + + private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize + + protected def calcSizeMapSize(tableLength: Int) = (tableLength >> sizeMapBucketBitSize) + 1 + + // discards the previous sizemap and only allocates a new one + protected def sizeMapInit(tableLength: Int) { + sizemap = new Array[Int](calcSizeMapSize(tableLength)) + } + + // discards the previous sizemap and populates the new one + protected def sizeMapInitAndRebuild { + // first allocate + sizeMapInit(table.length) + + // rebuild + val totalbuckets = totalSizeMapBuckets + var bucketidx = 0 + var tableidx = 0 + var tbl = table + var tableuntil = sizeMapBucketSize min tbl.length + while (bucketidx < totalbuckets) { + var currbucketsz = 0 + while (tableidx < tableuntil) { + if (tbl(tableidx) ne null) currbucketsz += 1 + tableidx += 1 + } + sizemap(bucketidx) = currbucketsz + tableuntil += sizeMapBucketSize + bucketidx += 1 + } + } + + private[collection] def printSizeMap { + println(sizemap.toList) + } + + protected def sizeMapDisable = sizemap = null + + protected def isSizeMapDefined = sizemap ne null + + protected def alwaysInitSizeMap = false + + /* End of size map handling code */ + + protected final def 
index(hcode: Int) = { + // improve(hcode) & (table.length - 1) + val improved = improve(hcode) + val ones = table.length - 1 + (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones } protected def clearTable() { var i = table.length - 1 while (i >= 0) { table(i) = null; i -= 1 } tableSize = 0 + nnSizeMapReset(table.length) + } + + private[collection] def hashTableContents = new FlatHashTable.Contents[A]( + _loadFactor, + table, + tableSize, + threshold, + sizemap + ) + + protected def initWithContents(c: FlatHashTable.Contents[A]) = { + if (c != null) { + _loadFactor = c.loadFactor + table = c.table + tableSize = c.tableSize + threshold = c.threshold + sizemap = c.sizemap + } + if (alwaysInitSizeMap && sizemap == null) sizeMapInitAndRebuild } + } + + + +private[collection] object FlatHashTable { + + /** The load factor for the hash table; must be < 500 (0.5) + */ + private[collection] def defaultLoadFactor: Int = 450 + private[collection] final def loadFactorDenum = 1000 + + /** The initial size of the hash table. 
+ */ + private[collection] def initialSize: Int = 16 + + private[collection] def sizeForThreshold(size: Int, _loadFactor: Int) = size * loadFactorDenum / _loadFactor + + private[collection] def newThreshold(_loadFactor: Int, size: Int) = { + val lf = _loadFactor + assert(lf < (loadFactorDenum / 2), "loadFactor too large; must be < 0.5") + (size.toLong * lf / loadFactorDenum ).toInt + } + + class Contents[A]( + val loadFactor: Int, + val table: Array[AnyRef], + val tableSize: Int, + val threshold: Int, + val sizemap: Array[Int] + ) + + trait HashUtils[A] { + protected final def sizeMapBucketBitSize = 5 + // so that: + protected final def sizeMapBucketSize = 1 << sizeMapBucketBitSize + + protected def elemHashCode(elem: A) = if (elem == null) 0 else elem.hashCode() + + protected final def improve(hcode: Int) = { + // var h: Int = hcode + ~(hcode << 9) + // h = h ^ (h >>> 14) + // h = h + (h << 4) + // h ^ (h >>> 10) + var i = hcode * 0x9e3775cd + i = java.lang.Integer.reverseBytes(i) + i * 0x9e3775cd + } + } + +} + diff --git a/src/library/scala/collection/mutable/HashSet.scala b/src/library/scala/collection/mutable/HashSet.scala index ebfeaa29ad..684faaabf6 100644 --- a/src/library/scala/collection/mutable/HashSet.scala +++ b/src/library/scala/collection/mutable/HashSet.scala @@ -12,6 +12,7 @@ package scala.collection package mutable import generic._ +import collection.parallel.mutable.ParHashSet /** This class implements mutable sets using a hashtable. 
* @@ -35,10 +36,17 @@ import generic._ * @define willNotTerminateInf */ @serializable @SerialVersionUID(1L) -class HashSet[A] extends Set[A] - with GenericSetTemplate[A, HashSet] - with SetLike[A, HashSet[A]] - with FlatHashTable[A] { +class HashSet[A] private[collection] (contents: FlatHashTable.Contents[A]) +extends Set[A] + with GenericSetTemplate[A, HashSet] + with SetLike[A, HashSet[A]] + with FlatHashTable[A] + with Parallelizable[ParHashSet[A]] +{ + initWithContents(contents) + + def this() = this(null) + override def companion: GenericCompanion[HashSet] = HashSet override def size = tableSize @@ -48,6 +56,8 @@ class HashSet[A] extends Set[A] def += (elem: A): this.type = { addEntry(elem); this } def -= (elem: A): this.type = { removeEntry(elem); this } + def par = new ParHashSet(hashTableContents) + override def add(elem: A): Boolean = addEntry(elem) override def remove(elem: A): Boolean = removeEntry(elem).isDefined @@ -72,6 +82,13 @@ class HashSet[A] extends Set[A] private def readObject(in: java.io.ObjectInputStream) { init(in, x => x) } + + /** Toggles whether a size map is used to track hash map statistics. + */ + def useSizeMap(t: Boolean) = if (t) { + if (!isSizeMapDefined) sizeMapInitAndRebuild + } else sizeMapDisable + } /** $factoryInfo diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala index ceb6d6e0e8..02d09f3081 100644 --- a/src/library/scala/collection/mutable/HashTable.scala +++ b/src/library/scala/collection/mutable/HashTable.scala @@ -48,6 +48,10 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU */ @transient protected var threshold: Int = initialThreshold(_loadFactor) + /** The array keeping track of the number of elements in 32 element blocks. 
+ */ + @transient protected var sizemap: Array[Int] = null + protected def initialSize: Int = HashTable.initialSize /** @@ -70,7 +74,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU table = new Array(capacity(sizeForThreshold(_loadFactor, size))) threshold = newThreshold(_loadFactor, table.size) - if (smDefined) sizeMapInit(table.size) else sizemap = null + if (smDefined) sizeMapInit(table.length) else sizemap = null var index = 0 while (index < size) { @@ -218,8 +222,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU threshold = newThreshold(_loadFactor, newSize) } - @transient protected var sizemap: Array[Int] = null - private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize + /* Size map handling code */ /* * The following three sizeMap* functions (Add, Remove, Reset) @@ -252,6 +255,8 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU else java.util.Arrays.fill(sizemap, 0) } + private[collection] final def totalSizeMapBuckets = if (sizeMapBucketSize < table.length) 1 else table.length / sizeMapBucketSize + protected def calcSizeMapSize(tableLength: Int) = (tableLength >> sizeMapBucketBitSize) + 1 // discards the previous sizemap and only allocates a new one @@ -286,7 +291,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU } } - def printSizeMap { + private[collection] def printSizeMap { println(sizemap.toList) } @@ -294,6 +299,11 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU protected def isSizeMapDefined = sizemap ne null + // override to automatically initialize the size map + protected def alwaysInitSizeMap = false + + /* End of size map handling code */ + protected def elemEquals(key1: A, key2: A): Boolean = (key1 == key2) // Note: @@ -306,12 +316,15 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] 
extends HashTable.HashU shifted } - protected def initWithContents(c: HashTable.Contents[A, Entry]) = if (c != null) { - _loadFactor = c.loadFactor - table = c.table - tableSize = c.tableSize - threshold = c.threshold - sizemap = c.sizemap + protected def initWithContents(c: HashTable.Contents[A, Entry]) = { + if (c != null) { + _loadFactor = c.loadFactor + table = c.table + tableSize = c.tableSize + threshold = c.threshold + sizemap = c.sizemap + } + if (alwaysInitSizeMap && sizemap == null) sizeMapInitAndRebuild } private[collection] def hashTableContents = new HashTable.Contents( diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala index f1eb50c5e0..13b6f22826 100644 --- a/src/library/scala/collection/package.scala +++ b/src/library/scala/collection/package.scala @@ -79,9 +79,9 @@ package object collection { private[collection] object DebugUtils { /* debug utils */ - def buildString(closure: (String => Unit) => Unit): String = { + def buildString(closure: (Any => Unit) => Unit): String = { var output = "" - def appendln(s: String) = output += s + "\n" + def appendln(s: Any) = output += s + "\n" closure(appendln) output } diff --git a/src/library/scala/collection/parallel/ParSet.scala b/src/library/scala/collection/parallel/ParSet.scala index cd59bc0a78..80bc65fe43 100644 --- a/src/library/scala/collection/parallel/ParSet.scala +++ b/src/library/scala/collection/parallel/ParSet.scala @@ -6,7 +6,7 @@ package scala.collection.parallel -import scala.collection.Map +import scala.collection.Set import scala.collection.mutable.Builder import scala.collection.generic._ diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala new file mode 100644 index 0000000000..2c12069e1c --- /dev/null +++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala @@ -0,0 +1,258 @@ +package scala.collection.parallel + + + +import collection.generic._ +import 
collection.mutable.Builder + +import annotation.tailrec + + + + +class UnrolledBuffer[T](implicit val manifest: ClassManifest[T]) +extends collection.mutable.Buffer[T] + with collection.mutable.BufferLike[T, UnrolledBuffer[T]] + with GenericClassManifestTraversableTemplate[T, UnrolledBuffer] + with collection.mutable.Builder[T, UnrolledBuffer[T]] +{ + import UnrolledBuffer.Unrolled + + private var headptr = new Unrolled[T] + private var lastptr = headptr + private var sz = 0 + + private[parallel] def headPtr = headptr + private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head + private[parallel] def lastPtr = lastptr + private[parallel] def lastPtr_=(last: Unrolled[T]) = lastptr = last + + protected[this] override def newBuilder = new UnrolledBuffer[T] + + def classManifestCompanion = UnrolledBuffer + + def concat(that: UnrolledBuffer[T]) = { + // bind the two together + if (!lastptr.bind(that.headptr)) lastptr = that.lastPtr + + // update size + sz += that.sz + + // `that` is no longer usable, so clear it + // here we rely on the fact that `clear` allocates + // new nodes instead of modifying the previous ones + that.clear + + // return a reference to this + this + } + + def +=(elem: T) = { + lastptr = lastptr.append(elem) + sz += 1 + this + } + + def clear() { + headptr = new Unrolled[T] + lastptr = headptr + sz = 0 + } + + def iterator = new Iterator[T] { + var pos: Int = -1 + var node: Unrolled[T] = headptr + scan() + + private def scan() { + pos += 1 + while (pos >= node.size) { + pos = 0 + node = node.next + if (node eq null) return + } + } + def hasNext = node ne null + def next = if (hasNext) { + val r = node.array(pos) + scan() + r + } else Iterator.empty.next + } + + // this should be faster than the iterator + override def foreach[U](f: T => U) = headptr.foreach(f) + + def result = this + + def length = sz + + def apply(idx: Int) = + if (idx >= 0 && idx < sz) headptr(idx) + else outofbounds(idx) + + def update(idx: Int, newelem: T) = + if (idx 
>= 0 && idx < sz) headptr(idx) = newelem + else outofbounds(idx) + + def remove(idx: Int) = + if (idx >= 0 && idx < sz) { + sz -= 1 + headptr.remove(idx, this) + } else outofbounds(idx) + + def +=:(elem: T) = { + headptr = headptr.prepend(elem) + sz += 1 + this + } + + def insertAll(idx: Int, elems: Traversable[T]) = + if (idx >= 0 && idx <= sz) { + headptr.insertAll(idx, elems, this) + sz += elems.size + } else outofbounds(idx) + + override def stringPrefix = "UnrolledBuffer" +} + + +object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] { + /** $genericCanBuildFromInfo */ + implicit def canBuildFrom[T](implicit m: ClassManifest[T]): CanBuildFrom[Coll, T, UnrolledBuffer[T]] = + new GenericCanBuildFrom[T] + def newBuilder[T](implicit m: ClassManifest[T]): Builder[T, UnrolledBuffer[T]] = new UnrolledBuffer[T] + + val waterline = 50 + val waterlineDelim = 100 + private[parallel] val unrolledsize = 32 + + /** Unrolled buffer node. + */ + class Unrolled[T: ClassManifest] private (var size: Int, var array: Array[T], var next: Unrolled[T]) { + def this() = this(0, new Array[T](UnrolledBuffer.unrolledsize), null) + + // adds and returns itself or the new unrolled if full + @tailrec final def append(elem: T): Unrolled[T] = if (size < UnrolledBuffer.unrolledsize) { + array(size) = elem + size += 1 + this + } else { + next = new Unrolled[T] + next.append(elem) + } + def foreach[U](f: T => U) { + var unrolled = this + var i = 0 + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val elem = chunkarr(i) + f(elem) + i += 1 + } + i = 0 + unrolled = unrolled.next + } + } + @tailrec final def apply(idx: Int): T = + if (idx < size) array(idx) else next.apply(idx - size) + @tailrec final def update(idx: Int, newelem: T): Unit = + if (idx < size) array(idx) = newelem else next.update(idx - size, newelem) + @tailrec final def locate(idx: Int): Unrolled[T] = + if (idx < size) this else 
next.locate(idx - size) + def prepend(elem: T) = if (size < array.length) { + // shift the elements of the array right + // then insert the element + shiftright() + array(0) = elem + size += 1 + this + } else { + // allocate a new node and store element + // then make it point to this + val newhead = new Unrolled[T] + newhead.append(elem) + newhead.next = this + newhead + } + // shifts right assuming enough space + private def shiftright() { + var i = size - 1 + while (i >= 0) { + array(i + 1) = array(i) + i -= 1 + } + } + // returns pointer to new last if changed + @tailrec final def remove(idx: Int, buffer: UnrolledBuffer[T]): T = + if (idx < size) { + // remove the element + // then try to merge with the next bucket + val r = array(idx) + shiftleft(idx) + size -= 1 + if (tryMergeWithNext()) buffer.lastPtr = this + r + } else next.remove(idx - size, buffer) + // shifts left elements after `leftb` (overwrites `leftb`) + private def shiftleft(leftb: Int) { + var i = leftb + while (i < (size - 1)) { + array(i) = array(i + 1) + i += 1 + } + nullout(i, i + 1) + } + protected def tryMergeWithNext() = if (next != null && (size + next.size) < (array.length * waterline / waterlineDelim)) { + // copy the next array, then discard the next node + Array.copy(next.array, 0, array, size, next.size) + size = size + next.size + next = next.next + if (next eq null) true else false // checks if last node was thrown out + } else false + + @tailrec final def insertAll(idx: Int, t: Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) { + // divide this node at the appropriate position and insert all into head + // update new next + val newnextnode = new Unrolled[T] + Array.copy(array, idx, newnextnode.array, 0, size - idx) + newnextnode.size = size - idx + newnextnode.next = next + + // update this + nullout(idx, size) + size = idx + next = null + + // insert everything from iterable to this + var curr = this + for (elem <- t) curr = curr append elem + curr.next = 
newnextnode + + // try to merge the last node of this with the newnextnode + if (curr.tryMergeWithNext()) buffer.lastPtr = curr + } else insertAll(idx - size, t, buffer) + private def nullout(from: Int, until: Int) { + var idx = from + while (idx < until) { + array(idx) = null.asInstanceOf[T] // !! + idx += 1 + } + } + + // assumes this is the last node + // `thathead` and `thatlast` are head and last node + // of the other unrolled list, respectively + def bind(thathead: Unrolled[T]) = { + assert(next eq null) + next = thathead + tryMergeWithNext() + } + + override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + " -> " + (if (next ne null) next.toString else "") + } + +} + diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 07caafb417..79dddc7c8b 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -11,7 +11,8 @@ import scala.collection.parallel.ParMapLike import scala.collection.parallel.Combiner import scala.collection.parallel.ParIterableIterator import scala.collection.parallel.EnvironmentPassingCombiner -import scala.collection.parallel.Unrolled +import scala.collection.parallel.UnrolledBuffer.Unrolled +import scala.collection.parallel.UnrolledBuffer import scala.collection.generic.ParMapFactory import scala.collection.generic.CanCombineFrom import scala.collection.generic.GenericParMapTemplate @@ -134,26 +135,25 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => sz += 1 val hc = emptyTrie.computeHash(elem._1) val pos = hc & 0x1f - if (lasts(pos) eq null) { + if (buckets(pos) eq null) { // initialize bucket - heads(pos) = new Unrolled[(K, V)] - lasts(pos) = heads(pos) + buckets(pos) = new UnrolledBuffer[(K, V)] } // add to bucket - lasts(pos) = lasts(pos).add(elem) + buckets(pos) += elem this } def result = { - val buckets = 
heads.filter(_ != null) - val root = new Array[HashMap[K, V]](buckets.length) + val bucks = buckets.filter(_ != null).map(_.headPtr) + val root = new Array[HashMap[K, V]](bucks.length) - executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length)) + executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 while (i < rootsize) { - if (heads(i) ne null) bitmap |= 1 << i + if (buckets(i) ne null) bitmap |= 1 << i i += 1 } val sz = root.foldLeft(0)(_ + _.size) @@ -167,18 +167,18 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => } override def toString = { - "HashTrieCombiner(buckets:\n\t" + heads.filter(_ != null).mkString("\n\t") + ")\n" + "HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n" } /* tasks */ - class CreateTrie(buckets: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + class CreateTrie(bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { var result = () def leaf(prev: Option[Unit]) = { var i = offset val until = offset + howmany while (i < until) { - root(i) = createTrie(buckets(i)) + root(i) = createTrie(bucks(i)) i += 1 } } @@ -204,7 +204,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => } def split = { val fp = howmany / 2 - List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp)) + List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp)) } def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) } diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 33e2e7102a..66ded02397 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ 
b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -11,7 +11,8 @@ import scala.collection.parallel.ParSetLike import scala.collection.parallel.Combiner import scala.collection.parallel.ParIterableIterator import scala.collection.parallel.EnvironmentPassingCombiner -import scala.collection.parallel.Unrolled +import scala.collection.parallel.UnrolledBuffer.Unrolled +import scala.collection.parallel.UnrolledBuffer import scala.collection.generic.ParSetFactory import scala.collection.generic.CanCombineFrom import scala.collection.generic.GenericParTemplate @@ -112,26 +113,25 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => sz += 1 val hc = emptyTrie.computeHash(elem) val pos = hc & 0x1f - if (lasts(pos) eq null) { + if (buckets(pos) eq null) { // initialize bucket - heads(pos) = new Unrolled[Any] - lasts(pos) = heads(pos) + buckets(pos) = new UnrolledBuffer[Any] } // add to bucket - lasts(pos) = lasts(pos).add(elem) + buckets(pos) += elem this } def result = { - val buckets = heads.filter(_ != null) - val root = new Array[HashSet[T]](buckets.length) + val bucks = buckets.filter(_ != null).map(_.headPtr) + val root = new Array[HashSet[T]](bucks.length) - executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length)) + executeAndWaitResult(new CreateTrie(bucks, root, 0, bucks.length)) var bitmap = 0 var i = 0 while (i < rootsize) { - if (heads(i) ne null) bitmap |= 1 << i + if (buckets(i) ne null) bitmap |= 1 << i i += 1 } val sz = root.foldLeft(0)(_ + _.size) @@ -146,14 +146,14 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => /* tasks */ - class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) + class CreateTrie(bucks: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { var result = () def leaf(prev: Option[Unit]) = { var i = offset val until = offset + howmany while (i < until) { - root(i) = createTrie(buckets(i)) + 
root(i) = createTrie(bucks(i)) i += 1 } } @@ -179,7 +179,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => } def split = { val fp = howmany / 2 - List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp)) + List(new CreateTrie(bucks, root, offset, fp), new CreateTrie(bucks, root, offset + fp, howmany - fp)) } def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel) } diff --git a/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala new file mode 100644 index 0000000000..f52bfc8544 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParFlatHashTable.scala @@ -0,0 +1,95 @@ +package scala.collection +package parallel.mutable + + + + +import collection.parallel.ParIterableIterator + + + + +trait ParFlatHashTable[T] extends collection.mutable.FlatHashTable[T] { + + override def alwaysInitSizeMap = true + + abstract class ParFlatHashTableIterator(var idx: Int, val until: Int, val totalsize: Int) + extends ParIterableIterator[T] with SizeMapUtils { + import collection.DebugUtils._ + + private var traversed = 0 + private val itertable = table + + if (hasNext) scan() + + private def scan() { + while (itertable(idx) eq null) idx += 1 + } + + def newIterator(index: Int, until: Int, totalsize: Int): ParIterableIterator[T] + + def remaining = totalsize - traversed + def hasNext = traversed < totalsize + def next = if (hasNext) { + val r = itertable(idx).asInstanceOf[T] + traversed += 1 + idx += 1 + if (hasNext) scan() + r + } else Iterator.empty.next + def split = if (remaining > 1) { + val divpt = (until + idx) / 2 + + val fstidx = idx + val fstuntil = divpt + val fsttotal = calcNumElems(idx, divpt, itertable.length, sizeMapBucketSize) + val fstit = newIterator(fstidx, fstuntil, fsttotal) + + val sndidx = divpt + val snduntil = until + val sndtotal = remaining - fsttotal + val 
sndit = newIterator(sndidx, snduntil, sndtotal) + + Seq(fstit, sndit) + } else Seq(this) + + override def debugInformation = buildString { + append => + append("Parallel flat hash table iterator") + append("---------------------------------") + append("Traversed/total: " + traversed + " / " + totalsize) + append("Table idx/until: " + idx + " / " + until) + append("Table length: " + itertable.length) + append("Table: ") + append(arrayString(itertable, 0, itertable.length)) + append("Sizemap: ") + append(arrayString(sizemap, 0, sizemap.length)) + } + + protected def countElems(from: Int, until: Int) = { + var count = 0 + var i = from + while (i < until) { + if (itertable(i) ne null) count += 1 + i += 1 + } + count + } + + protected def countBucketSizes(frombucket: Int, untilbucket: Int) = { + var count = 0 + var i = frombucket + while (i < untilbucket) { + count += sizemap(i) + i += 1 + } + count + } + + private def check = if (table.slice(idx, until).count(_ != null) != remaining) { + println("Invariant broken: " + debugInformation) + assert(false) + } + } + +} diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 7e4e26d758..a737c03db1 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -128,42 +128,44 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => sz += 1 val hc = improve(elemHashCode(elem._1)) val pos = (hc >>> nonmasklen) - if (lasts(pos) eq null) { + if (buckets(pos) eq null) { // initialize bucket - heads(pos) = new Unrolled[DefaultEntry[K, V]] - lasts(pos) = heads(pos) + buckets(pos) = new UnrolledBuffer[DefaultEntry[K, V]]() } // add to bucket - lasts(pos) = lasts(pos).add(new DefaultEntry(elem._1, elem._2)) + buckets(pos) += new DefaultEntry(elem._1, elem._2) this } def result: ParHashMap[K, V] = if (size >= (ParHashMapCombiner.numblocks * 
sizeMapBucketSize)) { // 1024 // construct table val table = new AddingHashTable(size, tableLoadFactor) - val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) + val bucks = buckets.map(b => if (b ne null) b.headPtr else null) + val insertcount = executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length)) table.setSize(insertcount) // TODO compare insertcount and size to see if compression is needed val c = table.hashTableContents new ParHashMap(c) } else { // construct a normal table and fill it sequentially + // TODO parallelize by keeping separate sizemaps and merging them val table = new HashTable[K, DefaultEntry[K, V]] { def insertEntry(e: DefaultEntry[K, V]) = if (super.findEntry(e.key) eq null) super.addEntry(e) sizeMapInit(table.length) } var i = 0 while (i < ParHashMapCombiner.numblocks) { - if (heads(i) ne null) { - for (elem <- heads(i)) table.insertEntry(elem) + if (buckets(i) ne null) { + for (elem <- buckets(i)) table.insertEntry(elem) } i += 1 } - // TODO compression val c = table.hashTableContents new ParHashMap(c) } + /* classes */ + /** A hash table which will never resize itself. Knowing the number of elements in advance, * it allocates the table of the required size when created. 
* @@ -213,6 +215,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => /* tasks */ + import UnrolledBuffer.Unrolled + class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int) extends super.Task[Int, FillBlocks] { var result = Int.MinValue diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala index dc33ef3189..e14fbd7305 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala @@ -3,17 +3,265 @@ package scala.collection.parallel.mutable +import collection.generic._ +import collection.mutable.HashSet +import collection.mutable.FlatHashTable +import collection.parallel.Combiner +import collection.parallel.EnvironmentPassingCombiner +import collection.parallel.UnrolledBuffer +class ParHashSet[T] private[collection] (contents: FlatHashTable.Contents[T]) +extends ParSet[T] + with GenericParTemplate[T, ParHashSet] + with ParSetLike[T, ParHashSet[T], collection.mutable.HashSet[T]] + with ParFlatHashTable[T] +{ + initWithContents(contents) + // println("----> new par hash set!") + // java.lang.Thread.dumpStack + // println(debugInformation) + def this() = this(null) + override def companion = ParHashSet + override def empty = new ParHashSet + override def iterator = parallelIterator + override def size = tableSize + def seq = new HashSet(hashTableContents) + def +=(elem: T) = { + addEntry(elem) + this + } + + def -=(elem: T) = { + removeEntry(elem) + this + } + + def contains(elem: T) = containsEntry(elem) + + def parallelIterator = new ParHashSetIterator(0, table.length, size) with SCPI + + type SCPI = SignalContextPassingIterator[ParHashSetIterator] + + class ParHashSetIterator(start: Int, iteratesUntil: Int, totalElements: Int) + extends ParFlatHashTableIterator(start, iteratesUntil, totalElements) with ParIterator { + me: SCPI => 
+ def newIterator(start: Int, until: Int, total: Int) = new ParHashSetIterator(start, until, total) with SCPI + } + + import collection.DebugUtils._ + override def debugInformation = buildString { + append => + append("Parallel flat hash table set") + append("No. elems: " + tableSize) + append("Table length: " + table.length) + append("Table: ") + append(arrayString(table, 0, table.length)) + append("Sizemap: ") + append(arrayString(sizemap, 0, sizemap.length)) + } + +} + + +/** $factoryInfo + * @define Coll mutable.ParSet + * @define coll mutable parallel set + */ +object ParHashSet extends ParSetFactory[ParHashSet] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParHashSet[T]] = new GenericCanCombineFrom[T] + + override def newBuilder[T]: Combiner[T, ParHashSet[T]] = newCombiner + + override def newCombiner[T]: Combiner[T, ParHashSet[T]] = ParHashSetCombiner.apply[T] +} + + +private[mutable] abstract class ParHashSetCombiner[T](private val tableLoadFactor: Int) +extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, ParHashSetCombiner[T]](ParHashSetCombiner.numblocks) +with collection.mutable.FlatHashTable.HashUtils[T] { +self: EnvironmentPassingCombiner[T, ParHashSet[T]] => + private var mask = ParHashSetCombiner.discriminantmask + private var nonmasklen = ParHashSetCombiner.nonmasklength + + def +=(elem: T) = { + sz += 1 + val hc = improve(elemHashCode(elem)) + val pos = hc >>> nonmasklen + if (buckets(pos) eq null) { + // initialize bucket + buckets(pos) = new UnrolledBuffer[Any] + } + // add to bucket + buckets(pos) += elem + this + } + + def result: ParHashSet[T] = { + val contents = if (size >= ParHashSetCombiner.numblocks * sizeMapBucketSize) parPopulate else seqPopulate + new ParHashSet(contents) + } + + private def parPopulate: FlatHashTable.Contents[T] = { + // construct it in parallel + val table = new AddingFlatHashTable(size, tableLoadFactor) + val (inserted, leftovers) = executeAndWaitResult(new FillBlocks(buckets, table, 0, 
buckets.length)) + var leftinserts = 0 + for (elem <- leftovers) leftinserts += table.insertEntry(0, table.tableLength, elem.asInstanceOf[T]) + table.setSize(leftinserts + inserted) + table.hashTableContents + } + + private def seqPopulate: FlatHashTable.Contents[T] = { + // construct it sequentially + // TODO parallelize by keeping separate size maps and merging them + val tbl = new FlatHashTable[T] { + sizeMapInit(table.length) + } + for { + buffer <- buckets; + if buffer ne null; + elem <- buffer + } tbl.addEntry(elem.asInstanceOf[T]) + tbl.hashTableContents + } + + /* classes */ + + /** A flat hash table which doesn't resize itself. It accepts the number of elements + * it has to take and allocates the underlying hash table in advance. + * Elements can only be added to it. The final size has to be adjusted manually. + * It is internal to `ParHashSet` combiners. + * + */ + class AddingFlatHashTable(numelems: Int, lf: Int) extends FlatHashTable[T] { + _loadFactor = lf + table = new Array[AnyRef](capacity(FlatHashTable.sizeForThreshold(numelems, _loadFactor))) + tableSize = 0 + threshold = FlatHashTable.newThreshold(_loadFactor, table.length) + sizeMapInit(table.length) + + def tableLength = table.length + + def setSize(sz: Int) = tableSize = sz + + /** + * The elements are added using the `insertEntry` method. This method accepts three + * arguments: + * + * @param insertAt where to add the element (set to -1 to use its hashcode) + * @param comesBefore the position before which the element must be added + * @param elem the element to be added + * + * If the element is to be inserted at the position corresponding to its hash code, + * the table will try to add the element in such a position if possible. Collisions are resolved + * using linear probing, so the element may actually have to be added to a position + * that follows the specified one. 
In the case that the first unoccupied position + * comes after `comesBefore`, the element is not added and the method simply returns `-1`, + * indicating that it couldn't add the element in a position that comes before the + * specified one. + * If the element is already present in the hash table, it is not added, and this method + * returns 0. If the element is added, it returns 1. + */ + def insertEntry(insertAt: Int, comesBefore: Int, elem: T): Int = { + var h = insertAt + if (h == -1) h = index(elemHashCode(elem)) + var entry = table(h) + while (null != entry) { + if (entry == elem) return 0 + h = (h + 1) // we *do not* do `(h + 1) % table.length` here, because we don't overlap!! + if (h >= comesBefore) return -1 + entry = table(h) + } + table(h) = elem.asInstanceOf[AnyRef] + tableSize = tableSize + 1 + nnSizeMapAdd(h) + 1 + } + } + + /* tasks */ + + class FillBlocks(buckets: Array[UnrolledBuffer[Any]], table: AddingFlatHashTable, val offset: Int, val howmany: Int) + extends super.Task[(Int, UnrolledBuffer[Any]), FillBlocks] { + var result = (Int.MinValue, new UnrolledBuffer[Any]); + def leaf(prev: Option[(Int, UnrolledBuffer[Any])]) { + var i = offset + var totalinserts = 0 + var leftover = new UnrolledBuffer[Any]() + while (i < (offset + howmany)) { + val (inserted, intonextblock) = fillBlock(i, buckets(i), leftover) + totalinserts += inserted + leftover = intonextblock + i += 1 + } + result = (totalinserts, leftover) + } + private val blocksize = table.tableLength >> ParHashSetCombiner.discriminantbits + private def blockStart(block: Int) = block * blocksize + private def nextBlockStart(block: Int) = (block + 1) * blocksize + private def fillBlock(block: Int, elems: UnrolledBuffer[Any], leftovers: UnrolledBuffer[Any]): (Int, UnrolledBuffer[Any]) = { + val beforePos = nextBlockStart(block) + + // store the elems + val (elemsIn, elemsLeft) = if (elems != null) insertAll(-1, beforePos, elems) else (0, UnrolledBuffer[Any]()) + + // store the leftovers + val 
(leftoversIn, leftoversLeft) = insertAll(blockStart(block), beforePos, leftovers) + + // return the no. of stored elements tupled with leftovers + (elemsIn + leftoversIn, elemsLeft concat leftoversLeft) + } + private def insertAll(atPos: Int, beforePos: Int, elems: UnrolledBuffer[Any]): (Int, UnrolledBuffer[Any]) = { + val it = elems.iterator + val leftovers = new UnrolledBuffer[Any] + var inserted = 0 + while (it.hasNext) { + val elem = it.next + val res = table.insertEntry(atPos, beforePos, elem.asInstanceOf[T]) + if (res >= 0) inserted += res + else leftovers += elem + } + (inserted, leftovers) + } + def split = { + val fp = howmany / 2 + List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp)) + } + override def merge(that: FillBlocks) { + // take the leftovers from the left task, store them into the block of the right task + val atPos = blockStart(that.offset) + val beforePos = blockStart(that.offset + that.howmany) + val (inserted, remainingLeftovers) = insertAll(atPos, beforePos, this.result._2) + + // anything left after trying to store the left leftovers is added to the right task leftovers + // and a new leftovers set is produced in this way + // the total number of successfully inserted elements is adjusted accordingly + result = (this.result._1 + that.result._1 + inserted, remainingLeftovers concat that.result._2) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashSetCombiner.numblocks, parallelismLevel) + } + +} + + +private[mutable] object ParHashSetCombiner { + private[mutable] val discriminantbits = 5 + private[mutable] val numblocks = 1 << discriminantbits + private[mutable] val discriminantmask = ((1 << discriminantbits) - 1); + private[mutable] val nonmasklength = 32 - discriminantbits + + def apply[T] = new ParHashSetCombiner[T](FlatHashTable.defaultLoadFactor) with EnvironmentPassingCombiner[T, ParHashSet[T]] +} diff --git 
a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala index a9ab577b55..efba6c8d9c 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala @@ -15,14 +15,13 @@ import collection.parallel.ParIterableIterator */ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.mutable.HashTable[K, Entry] { - // always initialize size map - if (!isSizeMapDefined) sizeMapInitAndRebuild + override def alwaysInitSizeMap = true /** A parallel iterator returning all the entries. */ abstract class EntryIterator[T, +IterRepr <: ParIterableIterator[T]] (private var idx: Int, private val until: Int, private val totalsize: Int, private var es: Entry) - extends ParIterableIterator[T] { + extends ParIterableIterator[T] with SizeMapUtils { private val itertable = table private var traversed = 0 scan() @@ -78,7 +77,7 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m val sidx = idx + divsz + 1 // + 1 preserves iteration invariant val suntil = until val ses = itertable(sidx - 1).asInstanceOf[Entry] // sidx - 1 ensures counting from the right spot - val stotal = calcNumElems(sidx - 1, suntil) + val stotal = calcNumElems(sidx - 1, suntil, table.length, sizeMapBucketSize) // first iterator params val fidx = idx @@ -110,35 +109,7 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m buff map { e => entry2item(e) } } - private def calcNumElems(from: Int, until: Int) = { - // find the first bucket - val fbindex = from / sizeMapBucketSize - - // find the last bucket - val lbindex = until / sizeMapBucketSize - // note to self: FYI if you define lbindex as from / sizeMapBucketSize, the first branch - // below always triggers and tests pass, so you spend a great day benchmarking and profiling - - if (fbindex == lbindex) { - // if first and last are 
the same, just count between `from` and `until` - // return this count - countElems(from, until) - } else { - // otherwise count in first, then count in last - val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min itertable.length - val fbcount = countElems(from, fbuntil) - val lbstart = lbindex * sizeMapBucketSize - val lbcount = countElems(lbstart, until) - - // and finally count the elements in all the buckets between first and last using a sizemap - val inbetween = countBucketSizes(fbindex + 1, lbindex) - - // return the sum - fbcount + inbetween + lbcount - } - } - - private def countElems(from: Int, until: Int) = { + protected def countElems(from: Int, until: Int) = { var c = 0 var idx = from var es: Entry = null @@ -153,7 +124,7 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m c } - private def countBucketSizes(fromBucket: Int, untilBucket: Int) = { + protected def countBucketSizes(fromBucket: Int, untilBucket: Int) = { var c = 0 var idx = fromBucket while (idx < untilBucket) { @@ -168,7 +139,6 @@ trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.m -object ParHashTable { - var iters = 0 -} + + diff --git a/src/library/scala/collection/parallel/mutable/ParSet.scala b/src/library/scala/collection/parallel/mutable/ParSet.scala new file mode 100644 index 0000000000..e700bd97d7 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParSet.scala @@ -0,0 +1,42 @@ +package scala.collection.parallel.mutable + + + +import scala.collection.generic._ +import scala.collection.parallel.Combiner + + + + + + +/** A mutable variant of `ParSet`. 
+ * + * @define Coll mutable.ParSet + * @define coll mutable parallel set + */ +trait ParSet[T] +extends collection.mutable.Set[T] + with ParIterable[T] + with collection.parallel.ParSet[T] + with GenericParTemplate[T, ParSet] + with ParSetLike[T, ParSet[T], collection.mutable.Set[T]] +{ +self => + override def companion: GenericCompanion[ParSet] with GenericParCompanion[ParSet] = ParSet; + override def empty: ParSet[T] = ParHashSet() +} + + +/** $factoryInfo + * @define Coll mutable.ParSet + * @define coll mutable parallel set + */ +object ParSet extends ParSetFactory[ParSet] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParSet[T]] = new GenericCanCombineFrom[T] + + override def newBuilder[T]: Combiner[T, ParSet[T]] = ParHashSet.newBuilder + + override def newCombiner[T]: Combiner[T, ParSet[T]] = ParHashSet.newCombiner +} + diff --git a/src/library/scala/collection/parallel/mutable/ParSetLike.scala b/src/library/scala/collection/parallel/mutable/ParSetLike.scala new file mode 100644 index 0000000000..d3fab5a4db --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParSetLike.scala @@ -0,0 +1,70 @@ +package scala.collection +package parallel.mutable + + + +import scala.collection.mutable.Set +import scala.collection.mutable.Builder + + + + + + + + +trait ParSetLike[T, + +Repr <: ParSetLike[T, Repr, Sequential] with ParSet[T], + +Sequential <: mutable.Set[T] with mutable.SetLike[T, Sequential]] +extends mutable.SetLike[T, Repr] + with collection.parallel.ParIterableLike[T, Repr, Sequential] + with collection.parallel.ParSetLike[T, Repr, Sequential] +{ self => + + protected[this] override def newBuilder: Builder[T, Repr] = newCombiner + + protected[this] override def newCombiner: parallel.Combiner[T, Repr] + + override def empty: Repr + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/package.scala 
b/src/library/scala/collection/parallel/mutable/package.scala index 0590539a29..cddd09f696 100644 --- a/src/library/scala/collection/parallel/mutable/package.scala +++ b/src/library/scala/collection/parallel/mutable/package.scala @@ -10,6 +10,43 @@ import scala.collection.generic.Sizing package object mutable { + /* classes and traits */ + + private[mutable] trait SizeMapUtils { + + protected def calcNumElems(from: Int, until: Int, tableLength: Int, sizeMapBucketSize: Int) = { + // find the first bucket + val fbindex = from / sizeMapBucketSize + + // find the last bucket + val lbindex = until / sizeMapBucketSize + // note to self: FYI if you define lbindex as from / sizeMapBucketSize, the first branch + // below always triggers and tests pass, so you spend a great day benchmarking and profiling + + if (fbindex == lbindex) { + // if first and last are the same, just count between `from` and `until` + // return this count + countElems(from, until) + } else { + // otherwise count in first, then count in last + val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min tableLength + val fbcount = countElems(from, fbuntil) + val lbstart = lbindex * sizeMapBucketSize + val lbcount = countElems(lbstart, until) + + // and finally count the elements in all the buckets between first and last using a sizemap + val inbetween = countBucketSizes(fbindex + 1, lbindex) + + // return the sum + fbcount + inbetween + lbcount + } + } + + protected def countElems(from: Int, until: Int): Int + + protected def countBucketSizes(fromBucket: Int, untilBucket: Int): Int + } + /* hack-arounds */ private[mutable] class ExposedArrayBuffer[T] extends ArrayBuffer[T] with Sizing { diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 93f20132e5..0872ccc423 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -19,7 +19,6 @@ package object parallel { val 
CHECK_RATE = 512 val SQRT2 = math.sqrt(2) val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors - private[parallel] val unrolledsize = 16 /* functions */ @@ -35,6 +34,8 @@ package object parallel { private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg) + private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString) + /* implicit conversions */ /** An implicit conversion providing arrays with a `par` method, which @@ -114,38 +115,6 @@ package object parallel { final class CompositeThrowable(val throwables: Set[Throwable]) extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", ")) - /** Unrolled list node. - */ - private[parallel] class Unrolled[T: ClassManifest] { - var size = 0 - var array = new Array[T](unrolledsize) - var next: Unrolled[T] = null - // adds and returns itself or the new unrolled if full - def add(elem: T): Unrolled[T] = if (size < unrolledsize) { - array(size) = elem - size += 1 - this - } else { - next = new Unrolled[T] - next.add(elem) - } - def foreach[U](f: T => U) { - var unrolled = this - var i = 0 - while (unrolled ne null) { - val chunkarr = unrolled.array - val chunksz = unrolled.size - while (i < chunksz) { - val elem = chunkarr(i) - f(elem) - i += 1 - } - i = 0 - unrolled = unrolled.next - } - } - override def toString = array.take(size).mkString("Unrolled(", ", ", ")") + (if (next ne null) next.toString else "") - } /** A helper iterator for iterating very small array buffers. * Automatically forwards the signal delegate when splitting. @@ -184,12 +153,13 @@ package object parallel { * are unrolled linked lists. Some parallel collections are constructed by * sorting their result set according to some criteria. * - * A reference `heads` to bucket heads is maintained, as well as a reference - * `lasts` to the last unrolled list node. 
Size is kept in `sz` and maintained - * whenever 2 bucket combiners are combined. + * A reference `buckets` to buckets is maintained. Total size of all buckets + * is kept in `sz` and maintained whenever 2 bucket combiners are combined. * * Clients decide how to maintain these by implementing `+=` and `result`. - * Populating and using the buckets is up to the client. + * Populating and using the buckets is up to the client. While populating them, + * the client should update `sz` accordingly. Note that a bucket is by default + * set to `null` to save space - the client should initialize it. * Note that in general the type of the elements contained in the buckets `Buck` * doesn't have to correspond to combiner element type `Elem`. * @@ -207,15 +177,13 @@ package object parallel { (private val bucketnumber: Int) extends Combiner[Elem, To] { self: EnvironmentPassingCombiner[Elem, To] => - protected var heads: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber) - protected var lasts: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber) + protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber) protected var sz: Int = 0 def size = sz def clear = { - heads = new Array[Unrolled[Buck]](bucketnumber) - lasts = new Array[Unrolled[Buck]](bucketnumber) + buckets = new Array[UnrolledBuffer[Buck]](bucketnumber) sz = 0 } @@ -229,12 +197,10 @@ package object parallel { val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]] var i = 0 while (i < bucketnumber) { - if (lasts(i) eq null) { - heads(i) = that.heads(i) - lasts(i) = that.lasts(i) + if (buckets(i) eq null) { + buckets(i) = that.buckets(i) } else { - lasts(i).next = that.heads(i) - if (that.lasts(i) ne null) lasts(i) = that.lasts(i) + if (that.buckets(i) ne null) buckets(i) concat that.buckets(i) } i += 1 } diff --git a/test/files/run/UnrolledBuffer.scala 
b/test/files/run/UnrolledBuffer.scala new file mode 100644 index 0000000000..7e113c3e04 --- /dev/null +++ b/test/files/run/UnrolledBuffer.scala @@ -0,0 +1,125 @@ + + + + +import collection.parallel.UnrolledBuffer + + + +object Test { + + def main(args: Array[String]) { + val u1 = new UnrolledBuffer[Int] + assert(u1.isEmpty) + assert(u1.size == 0) + + u1 += 1 + u1 += 2 + u1 += 3 + assert(u1 == UnrolledBuffer(1, 2, 3)) + assert(u1.toList == List(1, 2, 3)) + assert(u1.nonEmpty) + assert(u1.size == 3) + + u1.clear + assert(u1.isEmpty) + assert(u1.size == 0) + + u1 += 1 + u1 += 2 + u1 += 3 + u1.remove(1) + assert(u1.nonEmpty) + assert(u1.size == 2) + assert(u1 == UnrolledBuffer(1, 3)) + assert(u1.toList == List(1, 3)) + + u1 concat UnrolledBuffer(5, 7, 9) + assert(u1 == UnrolledBuffer(1, 3, 5, 7, 9)) + + val u2 = u1 map { x => (x - 1) / 2 } + assert(u2 == UnrolledBuffer(0, 1, 2, 3, 4)) + + u1.clear + u2.clear + assert(u1.size == 0) + assert(u2.size == 0) + + for (i <- 0 until 500) u1 += i + for (i <- 500 until 1000) u2 += i + assert(u1.size == 500) + assert(u2.size == 500) + assert(u1.iterator.toList == (0 until 500).toList) + assert((for (elem <- u1) yield elem) sameElements (0 until 500)) + + u1 concat u2 + assert(u1.size == 1000) + assert(u2.size == 0) + assertCorrect(u1) + + u1 concat UnrolledBuffer() + assertCorrect(u1) + + val u3 = u1 map { x => x } + var i = 0 + for (elem <- u1) { + assert(elem == u3(i)) + i += 1 + } + + u1.remove(999) + assert(u1.size == 999) + assertCorrect(u1) + + u1.remove(500) + assert(u1.size == 998) + assertCorrect(u1) + + u1.remove(5) + assert(u1.size == 997) + assertCorrect(u1) + + u1.remove(0) + assert(u1.size == 996) + assertCorrect(u1) + + u1.insert(0, 0) + assert(u1.size == 997) + assertCorrect(u1) + + u1.insert(5, 5) + assert(u1.size == 998) + assertCorrect(u1) + + u1.insert(500, 500) + assert(u1.size == 999) + assertCorrect(u1) + + u1.insert(999, 999) + assert(u1.size == 1000) + assertCorrect(u1) + + for (i <- -100 until 0) { + i 
+=: u1 + assertCorrect(u1) + } + assert(u1.size == 1100) + assertCorrect(u1) + } + + def assertCorrect(u1: UnrolledBuffer[Int]) { + val sz = u1.size + val store = new Array[Int](sz) + for (i <- 0 until sz) { + store(i) = u1(i) + u1(i) = sz - i + } + for (i <- 0 until sz) assert(u1(i) == (sz - i)) + for (i <- 0 until sz) u1(i) = store(i) + for (i <- 0 until sz) assert(store(i) == u1(i)) + + assert((u1 map { x => x }) == u1) + assert(u1.iterator.toSeq.size == u1.size) + } + +} diff --git a/test/files/scalacheck/Unrolled.scala b/test/files/scalacheck/Unrolled.scala new file mode 100644 index 0000000000..d69e62dd01 --- /dev/null +++ b/test/files/scalacheck/Unrolled.scala @@ -0,0 +1,26 @@ +import org.scalacheck._ +import Prop._ +import Gen._ + +import collection.parallel.UnrolledBuffer + +object Test extends Properties("UnrolledBuffer") { + + property("concat size") = forAll { (l1: List[Int], l2: List[Int]) => + val u1 = new UnrolledBuffer[Int] + u1 ++= l1 + val u2 = new UnrolledBuffer[Int] + u2 ++= l2 + val totalsz = u1.size + u2.size + u1 concat u2 + totalsz == u1.size + } + + property("adding") = forAll { (l: List[Int]) => + val u = new UnrolledBuffer[Int] + u ++= l + u == l + } + +} + diff --git a/test/files/scalacheck/parallel-collections/PairOperators.scala b/test/files/scalacheck/parallel-collections/PairOperators.scala index 48cbd136e5..2055c29d38 100644 --- a/test/files/scalacheck/parallel-collections/PairOperators.scala +++ b/test/files/scalacheck/parallel-collections/PairOperators.scala @@ -49,7 +49,7 @@ trait PairOperators[K, V] extends Operators[(K, V)] { def apply(kv: (K, V)) = kfm(kv._1).toIterable zip vfm(kv._2).toIterable } - def filterPredicates = zipPredicates(koperators.filterPredicates, voperators.existsPredicates) + def filterPredicates = zipPredicates(koperators.filterPredicates, voperators.filterPredicates) def filterNotPredicates = filterPredicates diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala 
b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala new file mode 100644 index 0000000000..973a5cdf4b --- /dev/null +++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala @@ -0,0 +1,94 @@ +package scala.collection.parallel +package mutable + + + +import org.scalacheck._ +import org.scalacheck.Gen +import org.scalacheck.Gen._ +import org.scalacheck.Prop._ +import org.scalacheck.Properties +import org.scalacheck.Arbitrary._ + +import scala.collection._ +import scala.collection.parallel.ops._ + + +abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("mutable.ParHashSet[" + tp + "]") { + ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + + type CollType = ParHashSet[T] + + def isCheckingViews = false + + def hasStrictOrder = false + + def ofSize(vals: Seq[Gen[T]], sz: Int) = { + val hm = new mutable.HashSet[T] + val gen = vals(rnd.nextInt(vals.size)) + for (i <- 0 until sz) hm += sample(gen) + hm + } + + def fromTraversable(t: Traversable[T]) = { + val phm = new ParHashSet[T] + var i = 0 + for (kv <- t.toList) { + phm += kv + i += 1 + } + phm + } + +} + + +object IntParallelHashSetCheck extends ParallelHashSetCheck[Int]("Int") +with IntOperators +with IntValues +{ + override def printDataStructureDebugInfo(ds: AnyRef) = ds match { + case pm: ParHashSet[t] => + println("Mutable parallel hash set") + case _ => + println("could not match data structure type: " + ds.getClass) + } + + override def checkDataStructureInvariants(orig: Traversable[Int], ds: AnyRef) = ds match { + case pm: ParHashSet[t] => + // for an example of how not to write code proceed below + val invs = pm.brokenInvariants + + val containsall = (for (elem <- orig) yield { + if (pm.asInstanceOf[ParHashSet[Int]](elem) == true) true + else { + println("Does not contain original element: " + elem) + 
println(pm.hashTableContents.table.find(_ == elem)) + println(pm.hashTableContents.table.indexOf(elem)) + false + } + }).foldLeft(true)(_ && _) + + + if (invs.isEmpty) { + if (!containsall) println(pm.debugInformation) + containsall + } else { + println("Invariants broken:\n" + invs.mkString("\n")) + false + } + case _ => true + } + +} + + + + + + + + + + diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index 0acdb2b0a7..d2d6119997 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -146,7 +146,10 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col println("mapped to: ") println(ms) println(mp) - println("valid: " + !checkDataStructureInvariants(ms, mp)) + println("sizes: ") + println(ms.size) + println(mp.size) + println("valid: " + checkDataStructureInvariants(ms, mp)) } ("op index: " + ind) |: (areEqual(ms, mp) && checkDataStructureInvariants(ms, mp)) } diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index 24e7d09918..4e2d5611f0 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -11,20 +11,25 @@ class ParCollProperties extends Properties("Parallel collections") { /* Collections */ // parallel arrays - //include(mutable.IntParallelArrayCheck) + // include(mutable.IntParallelArrayCheck) // parallel ranges - //include(immutable.ParallelRangeCheck) + // include(immutable.ParallelRangeCheck) // parallel immutable hash maps (tries) - //include(immutable.IntIntParallelHashMapCheck) + // include(immutable.IntIntParallelHashMapCheck) // parallel immutable hash sets (tries) - //include(immutable.IntParallelHashSetCheck) + // include(immutable.IntParallelHashSetCheck) // parallel 
mutable hash maps (tables) // include(mutable.IntIntParallelHashMapCheck) + // parallel mutable hash sets (tables) + include(mutable.IntParallelHashSetCheck) + + // parallel vectors + /* Views */ // parallel array views @@ -32,6 +37,8 @@ class ParCollProperties extends Properties("Parallel collections") { // parallel immutable hash map views // parallel mutable hash map views + + // parallel vector views } @@ -45,7 +52,7 @@ object Test { workers = 1, minSize = 0, maxSize = 4000, - minSuccessfulTests = 100 + minSuccessfulTests = 250 ), pc ) |