author    Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>    2010-10-20 20:19:56 +0000
committer Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>    2010-10-20 20:19:56 +0000
commit    d3d218e5ea77584489437f0dfa8148ee3764d6f7 (patch)
tree      881fba9234da6654e8d914c8b56ddadd100c5cba /src/library/scala/collection/parallel
parent    d13a2529aa8218836d13ee04303da4f3325933c2 (diff)
download  scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.tar.gz
          scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.tar.bz2
          scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.zip
Further work on parallel mutable hash maps.
Changed the HashTable interface. Fixed one test. Implemented hash map iterators. Started implementing hash map combiners and extracting the common functionality of bucket-based combiners. No review.
Diffstat (limited to 'src/library/scala/collection/parallel')
-rw-r--r--  src/library/scala/collection/parallel/Combiner.scala             |  12
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala      |   6
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashMap.scala |  39
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashSet.scala |  40
-rw-r--r--  src/library/scala/collection/parallel/immutable/package.scala    |  21
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala   | 173
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashTable.scala | 131
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParMap.scala       |   6
-rw-r--r--  src/library/scala/collection/parallel/package.scala              | 141
9 files changed, 454 insertions(+), 115 deletions(-)
diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala
index a37f642d42..f47f92457f 100644
--- a/src/library/scala/collection/parallel/Combiner.scala
+++ b/src/library/scala/collection/parallel/Combiner.scala
@@ -19,7 +19,7 @@ import scala.collection.generic.Sizing
* @author prokopec
*/
trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport {
- self: EnvironmentPassingCombiner[Elem, To] =>
+self: EnvironmentPassingCombiner[Elem, To] =>
type EPC = EnvironmentPassingCombiner[Elem, To]
@@ -35,7 +35,13 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w
* if they are to be used again.
*
* Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is,
- * they are the same objects in memories, always does nothing and returns the first combiner.
+ * they are the same objects in memory:
+ *
+ * {{{
+ * c1.combine(c2)
+ * }}}
+ *
+ * always does nothing and returns `c1`.
*
* @tparam N the type of elements contained by the `other` builder
* @tparam NewTo the type of collection produced by the `other` builder
@@ -50,7 +56,7 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w
trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] {
abstract override def result = {
val res = super.result
-// res.environment = environment
+ if (res.isInstanceOf[TaskSupport]) res.asInstanceOf[TaskSupport].environment = environment
res
}
}
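
Note: the contract documented above can be illustrated with a small standalone sketch. `SimpleCombiner` and `ListCombiner` below are hypothetical stand-ins for the real `Combiner` hierarchy, reduced to the two operations the contract talks about:

{{{
// Hypothetical, simplified model of the combine contract (not the real API).
trait SimpleCombiner[-Elem, +To] {
  def +=(elem: Elem): this.type
  def result: To
  // May invalidate both the receiver and the argument; callers must
  // continue only with the combiner that is returned.
  def combine[N <: Elem, NewTo >: To](other: SimpleCombiner[N, NewTo]): SimpleCombiner[N, NewTo]
}

class ListCombiner[T] extends SimpleCombiner[T, List[T]] {
  private var elems: List[T] = Nil // elements in reverse insertion order
  def +=(elem: T): this.type = { elems ::= elem; this }
  def result: List[T] = elems.reverse
  def combine[N <: T, NewTo >: List[T]](other: SimpleCombiner[N, NewTo]): SimpleCombiner[N, NewTo] =
    if (this eq other) this // same object in memory: do nothing, return the receiver
    else other match {
      case that: ListCombiner[T] =>
        elems = that.elems ::: elems // receiver absorbs the argument's elements
        this
      case _ => throw new IllegalArgumentException("unexpected combiner type")
    }
}
}}}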
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 4d95043c3a..881ab80038 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -141,7 +141,7 @@ self =>
me: SignalContextPassingIterator[ParIterator] =>
var signalDelegate: Signalling = IdleSignalling
def repr = self.repr
- def split: Seq[ParIterator]
+ def split: Seq[ParIterableIterator[T]]
}
/** A stackable modification that ensures signal contexts get passed along the iterators.
@@ -489,7 +489,9 @@ self =>
override def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
- else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result })
+ else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult {
+ _.result
+ })
}
private def take_sequential(n: Int) = {
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 306ec68548..37b52b7a40 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -11,6 +11,7 @@ import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.Unrolled
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParMapTemplate
@@ -105,21 +106,13 @@ object ParHashMap extends ParMapFactory[ParHashMap] {
}
-private[immutable] trait HashMapCombiner[K, V]
-extends Combiner[(K, V), ParHashMap[K, V]] {
+private[immutable] abstract class HashMapCombiner[K, V]
+extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) {
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
- var heads = new Array[Unrolled[(K, V)]](rootsize)
- var lasts = new Array[Unrolled[(K, V)]](rootsize)
- var size: Int = 0
-
- def clear = {
- heads = new Array[Unrolled[(K, V)]](rootsize)
- lasts = new Array[Unrolled[(K, V)]](rootsize)
- }
def +=(elem: (K, V)) = {
- size += 1
+ sz += 1
val hc = elem._1.##
val pos = hc & 0x1f
if (lasts(pos) eq null) {
@@ -132,26 +125,6 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
this
}
- def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- // ParHashMap.totalcombines.incrementAndGet
- if (other.isInstanceOf[HashMapCombiner[_, _]]) {
- val that = other.asInstanceOf[HashMapCombiner[K, V]]
- var i = 0
- while (i < rootsize) {
- if (lasts(i) eq null) {
- heads(i) = that.heads(i)
- lasts(i) = that.lasts(i)
- } else {
- lasts(i).next = that.heads(i)
- if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
- }
- i += 1
- }
- size = size + that.size
- this
- } else error("Unexpected combiner type.")
- } else this
-
def result = {
val buckets = heads.filter(_ != null)
val root = new Array[HashMap[K, V]](buckets.length)
@@ -216,8 +189,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
-object HashMapCombiner {
- def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {}
+private[parallel] object HashMapCombiner {
+ def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]]
private[immutable] val rootbits = 5
private[immutable] val rootsize = 1 << 5
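
Note: `rootbits`/`rootsize` mirror the branching factor of the immutable `HashMap` trie: the combiner scatters entries into one of 32 buckets keyed by the lowest 5 bits of the hash code, so each bucket can later become one root position of the trie independently, and in parallel. A minimal sketch of the bucket selection:

{{{
// Sketch: selecting a root bucket from the low 5 bits of a hash code.
val rootbits = 5
val rootsize = 1 << rootbits // 32 buckets
def bucketOf(key: Any): Int = key.## & (rootsize - 1) // same as `hc & 0x1f`

assert(bucketOf(33) == 1) // 33 = 0b100001, low 5 bits are 00001
assert(bucketOf(65) == 1) // agrees in the low 5 bits, same bucket
}}}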
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 0ef2681567..c9554ae1eb 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -11,6 +11,7 @@ import scala.collection.parallel.ParSetLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
+import scala.collection.parallel.Unrolled
import scala.collection.generic.ParSetFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParTemplate
@@ -101,21 +102,13 @@ object ParHashSet extends ParSetFactory[ParHashSet] {
}
-private[immutable] trait HashSetCombiner[T]
-extends Combiner[T, ParHashSet[T]] {
+private[immutable] abstract class HashSetCombiner[T]
+extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) {
self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
import HashSetCombiner._
- var heads = new Array[Unrolled[Any]](rootsize)
- var lasts = new Array[Unrolled[Any]](rootsize)
- var size: Int = 0
-
- def clear = {
- heads = new Array[Unrolled[Any]](rootsize)
- lasts = new Array[Unrolled[Any]](rootsize)
- }
def +=(elem: T) = {
- size += 1
+ sz += 1
val hc = elem.##
val pos = hc & 0x1f
if (lasts(pos) eq null) {
@@ -128,25 +121,6 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
this
}
- def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- if (other.isInstanceOf[HashSetCombiner[_]]) {
- val that = other.asInstanceOf[HashSetCombiner[T]]
- var i = 0
- while (i < rootsize) {
- if (lasts(i) eq null) {
- heads(i) = that.heads(i)
- lasts(i) = that.lasts(i)
- } else {
- lasts(i).next = that.heads(i)
- if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
- }
- i += 1
- }
- size = size + that.size
- this
- } else error("Unexpected combiner type.")
- } else this
-
def result = {
val buckets = heads.filter(_ != null)
val root = new Array[HashSet[T]](buckets.length)
@@ -171,7 +145,8 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
/* tasks */
- class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int)
+ extends super.Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
@@ -274,6 +249,3 @@ object HashSetCombiner {
-
-
-
diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala
index c54875ecd3..9c6bbae8dd 100644
--- a/src/library/scala/collection/parallel/immutable/package.scala
+++ b/src/library/scala/collection/parallel/immutable/package.scala
@@ -15,25 +15,9 @@ package object immutable {
/* package level methods */
def repetition[T](elem: T, len: Int) = new Repetition(elem, len)
- /* properties */
- private[immutable] val unrolledsize = 16
+ /* constants */
/* classes */
- private[immutable] class Unrolled[T: ClassManifest] {
- var size = 0
- var array = new Array[T](unrolledsize)
- var next: Unrolled[T] = null
- // adds and returns itself or the new unrolled if full
- def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
- array(size) = elem
- size += 1
- this
- } else {
- next = new Unrolled[T]
- next.add(elem)
- }
- override def toString = "Unrolled(" + array.mkString(", ") + ")"
- }
/** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method.
*
@@ -42,8 +26,7 @@ package object immutable {
* @param length the length of the collection
*/
private[parallel] class Repetition[T](elem: T, val length: Int) extends ParSeq[T] {
- self =>
-
+ self =>
def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException
def seq = throw new UnsupportedOperationException
def update(idx: Int, elem: T) = throw new UnsupportedOperationException
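
Note: `Repetition` above is an index-based constant sequence; `padTo` can use it because every position simply answers `elem`, without allocating `length` copies. A standalone sketch of the same idea (hypothetical `ConstSeq`, outside the parallel hierarchy):

{{{
// Sketch: a constant sequence defined purely by indexing.
class ConstSeq[T](elem: T, val length: Int) extends scala.collection.immutable.IndexedSeq[T] {
  def apply(idx: Int): T =
    if (0 <= idx && idx < length) elem
    else throw new IndexOutOfBoundsException(idx.toString)
}

val padded = Seq(1, 2, 3) ++ new ConstSeq(0, 2) // behaves like padTo(5, 0)
// padded == List(1, 2, 3, 0, 0)
}}}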
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 057faa66e1..fb4119bddc 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -5,53 +5,198 @@ package mutable
import collection.generic._
+import collection.mutable.DefaultEntry
+import collection.mutable.HashEntry
+import collection.mutable.HashTable
-
-
-class ParHashMap[K, V]
+class ParHashMap[K, V] private[collection] (contents: HashTable.Contents[K, DefaultEntry[K, V]])
extends ParMap[K, V]
with GenericParMapTemplate[K, V, ParHashMap]
with ParMapLike[K, V, ParHashMap[K, V], collection.mutable.HashMap[K, V]]
+ with ParHashTable[K, DefaultEntry[K, V]]
{
self =>
+ initWithContents(contents)
+
+ type Entry = collection.mutable.DefaultEntry[K, V]
+
+ def this() = this(null)
override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap
override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
- def parallelIterator = null // TODO
+ def seq = new collection.mutable.HashMap[K, V](hashTableContents)
- def seq = null // TODO
+ def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI
- def get(k: K): Option[V] = null // TODO
+ override def size = tableSize
- def +=(kv: (K, V)) = null // TODO
+ def get(key: K): Option[V] = {
+ val e = findEntry(key)
+ if (e == null) None
+ else Some(e.value)
+ }
- def -=(k: K) = null // TODO
+ override def put(key: K, value: V): Option[V] = {
+ val e = findEntry(key)
+ if (e == null) { addEntry(new Entry(key, value)); None }
+ else { val v = e.value; e.value = value; Some(v) }
+ }
- override def size = 0 // TODO
+ override def update(key: K, value: V): Unit = put(key, value)
-}
+ override def remove(key: K): Option[V] = {
+ val e = removeEntry(key)
+ if (e ne null) Some(e.value)
+ else None
+ }
+ def += (kv: (K, V)): this.type = {
+ val e = findEntry(kv._1)
+ if (e == null) addEntry(new Entry(kv._1, kv._2))
+ else e.value = kv._2
+ this
+ }
+ def -=(key: K): this.type = { removeEntry(key); this }
+ type SCPI = SignalContextPassingIterator[ParHashMapIterator]
-object ParHashMap extends ParMapFactory[ParHashMap] {
+ class ParHashMapIterator(start: Int, untilIdx: Int, totalSize: Int, e: DefaultEntry[K, V])
+ extends EntryIterator[(K, V), ParHashMapIterator](start, untilIdx, totalSize, e) with ParIterator {
+ me: SCPI =>
+ def entry2item(entry: DefaultEntry[K, V]) = (entry.key, entry.value);
+ def newIterator(idxFrom: Int, idxUntil: Int, totalSz: Int, es: DefaultEntry[K, V]) =
+ new ParHashMapIterator(idxFrom, idxUntil, totalSz, es) with SCPI
+ }
- def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
+}
- def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = null // TODO
- implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = null // TODO
+object ParHashMap extends ParMapFactory[ParHashMap] {
+ def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V]
+ def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = ParHashMapCombiner.apply[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = new CanCombineFromMap[K, V]
}
+private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFactor: Int)
+extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) {
+self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
+ private var mask = ParHashMapCombiner.discriminantmask
+
+ def +=(elem: (K, V)) = {
+ sz += 1
+ val hc = elem._1.##
+    // shift the masked high bits down so the block index lies in [0, numblocks)
+    val pos = (hc & mask) >>> (32 - ParHashMapCombiner.discriminantbits)
+ if (lasts(pos) eq null) {
+ // initialize bucket
+ heads(pos) = new Unrolled[DefaultEntry[K, V]]
+ lasts(pos) = heads(pos)
+ }
+ // add to bucket
+ lasts(pos) = lasts(pos).add(new DefaultEntry(elem._1, elem._2))
+ this
+ }
+
+ def result: ParHashMap[K, V] = {
+ // construct table
+ val table = new AddingHashTable(size, tableLoadFactor)
+
+ executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks))
+
+ val c = table.hashTableContents
+ new ParHashMap(c)
+ }
+
+ /** A hash table which will never resize itself. Knowing the number of elements in advance,
+ * it allocates the table of the required size when created.
+ *
+ * Entries are added using the `insertEntry` method. This method checks whether the element
+ * exists and updates the size map.
+ */
+ class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] {
+ import HashTable._
+ _loadFactor = lf
+ table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems)))
+ tableSize = 0
+ threshold = newThreshold(_loadFactor, table.length)
+ sizeMapInit(table.length)
+ def insertEntry(e: DefaultEntry[K, V]) {
+ var h = index(elemHashCode(e.key))
+ var olde = table(h).asInstanceOf[DefaultEntry[K, V]]
+
+ // check if key already exists
+ var ce = olde
+ while (ce ne null) {
+ if (ce.key == e.key) {
+ h = -1
+ ce = null
+ } else ce = ce.next
+ }
+
+ // if key does not already exist
+ if (h != -1) {
+ e.next = olde
+ table(h) = e
+ tableSize = tableSize + 1
+ nnSizeMapAdd(h)
+ }
+ }
+ }
+
+ /* tasks */
+
+ class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int)
+ extends super.Task[Unit, FillBlocks] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ fillBlock(buckets(i))
+ i += 1
+ }
+ }
+ private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) {
+ var unrolled = elems
+ var i = 0
+ val t = table
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val elem = chunkarr(i)
+ t.insertEntry(elem)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel)
+ }
+}
+private[mutable] object ParHashMapCombiner {
+ private[mutable] val discriminantbits = 5
+ private[mutable] val numblocks = 1 << discriminantbits
+ private[mutable] val discriminantmask = ((1 << discriminantbits) - 1) << (32 - discriminantbits)
+
+ def apply[K, V] = new ParHashMapCombiner[K, V](HashTable.defaultLoadFactor) with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]]
+}
+
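
Note: the combiner above builds its result in two phases. While elements are added, they are scattered into one of `numblocks = 32` buckets chosen by the top `discriminantbits = 5` bits of the hash code; since the total size is then known exactly, `result` can allocate an `AddingHashTable` of the final size once and let `FillBlocks` insert entries in parallel without ever resizing. A sketch of the pre-sizing arithmetic, assuming `HashTable`'s per-mille load factor convention (750 means the table resizes at 75% fill):

{{{
// Sketch of pre-sizing a table for `numelems` entries (assumed arithmetic
// mirroring HashTable's per-mille load factors; not the library code).
def nextPowerOfTwo(n: Int): Int = {
  var c = 1
  while (c < n) c <<= 1
  c
}

def preSizedCapacity(numelems: Int, loadFactor: Int = 750): Int = {
  // smallest power-of-two capacity whose threshold (capacity * lf / 1000)
  // is not exceeded by numelems
  val needed = (numelems.toLong * 1000 / loadFactor).toInt + 1
  nextPowerOfTwo(needed)
}

println(preSizedCapacity(10000)) // 16384: 10000 entries fit under 75% of 16384
}}}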
diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
index 9e356b7fb4..2617685a3d 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala
@@ -5,7 +5,7 @@ package parallel.mutable
import collection.mutable.HashEntry
-
+import collection.parallel.ParIterableIterator
@@ -13,9 +13,134 @@ import collection.mutable.HashEntry
* enriching the data structure by fulfilling certain requirements
* for their parallel construction and iteration.
*/
-trait ParHashTable[K] {
+trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.mutable.HashTable[K, Entry] {
+
+ // always initialize size map
+ if (!isSizeMapDefined) sizeMapInitAndRebuild
+
+ /** A parallel iterator returning all the entries.
+ */
+ abstract class EntryIterator[T, +IterRepr <: ParIterableIterator[T]]
+ (private var idx: Int, private val until: Int, private val totalsize: Int, private var es: Entry)
+ extends ParIterableIterator[T] {
+ private val itertable = table
+ private var traversed = 0
+ scan()
+
+ def entry2item(e: Entry): T
+ def newIterator(idxFrom: Int, idxUntil: Int, totalSize: Int, es: Entry): IterRepr
+
+ def hasNext = es != null
+
+ def next = {
+ val res = es
+ es = es.next
+ scan()
+ traversed += 1
+ entry2item(res)
+ }
+
+ def scan() {
+ while (es == null && idx < until) {
+ es = itertable(idx).asInstanceOf[Entry]
+ idx = idx + 1
+ }
+ }
+
+ def remaining = totalsize - traversed
+
+ def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
+ if ((until - idx) > 1) {
+ // there is at least one more slot for the next iterator
+ // divide the rest of the table
+ val divsz = (until - idx) / 2
+
+ // second iterator params
+ val sidx = idx + divsz
+ val suntil = until
+ val ses = itertable(sidx).asInstanceOf[Entry]
+ val stotal = calcNumElems(sidx, suntil)
+
+ // first iterator params
+ val fidx = idx
+ val funtil = idx + divsz
+ val fes = es
+ val ftotal = totalsize - stotal
+
+ Seq(
+ newIterator(fidx, funtil, ftotal, fes),
+ newIterator(sidx, suntil, stotal, ses)
+ )
+ } else {
+        // otherwise, this is the last entry in the table - all that remains is the chain
+ // so split the rest of the chain
+ val arr = convertToArrayBuffer(es)
+ val arrpit = new collection.parallel.BufferIterator[T](arr, 0, arr.length, signalDelegate)
+ arrpit.split
+ }
+ } else Seq(this.asInstanceOf[IterRepr])
+
+ private def convertToArrayBuffer(chainhead: Entry): mutable.ArrayBuffer[T] = {
+ var buff = mutable.ArrayBuffer[Entry]()
+ var curr = chainhead
+ while (curr != null) {
+ buff += curr
+ curr = curr.next
+ }
+ buff map { e => entry2item(e) }
+ }
+
+ private def calcNumElems(from: Int, until: Int) = {
+ // find the first bucket
+ val fbindex = from / sizeMapBucketSize
+
+ // find the last bucket
+    val lbindex = until / sizeMapBucketSize
+
+ if (fbindex == lbindex) {
+ // if first and last are the same, just count between `from` and `until`
+ // return this count
+ countElems(from, until)
+ } else {
+ // otherwise count in first, then count in last
+ val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min itertable.length
+ val fbcount = countElems(from, fbuntil)
+ val lbstart = lbindex * sizeMapBucketSize
+ val lbcount = countElems(lbstart, until)
+
+ // and finally count the elements in all the buckets between first and last using a sizemap
+ val inbetween = countBucketSizes(fbindex + 1, lbindex)
+
+ // return the sum
+ fbcount + inbetween + lbcount
+ }
+ }
+
+ private def countElems(from: Int, until: Int) = {
+ var c = 0
+ var idx = from
+ var es: Entry = null
+ while (idx < until) {
+ es = itertable(idx).asInstanceOf[Entry]
+ while (es ne null) {
+ c += 1
+ es = es.next
+ }
+ idx += 1
+ }
+ c
+ }
- protected type Entry >: Null <: HashEntry[K, Entry]
+ private def countBucketSizes(fromBucket: Int, untilBucket: Int) = {
+ var c = 0
+ var idx = fromBucket
+ while (idx < untilBucket) {
+ c += sizemap(idx)
+ idx += 1
+ }
+ c
+ }
+ }
}
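
Note: `calcNumElems` above is what makes `split` cheap on a hash table with a size map: only the partial first and last size-map buckets are counted chain by chain, while every bucket in between contributes a single precomputed `sizemap` entry. A self-contained sketch of that counting scheme, with a plain `Array[Int]` of chain lengths standing in for the real table:

{{{
// Sketch: counting entries in table slots [from, until) via a size map.
val bucketSize = 64 // slots covered by one size-map entry
val chainLengths = Array.tabulate(1024)(i => i % 3) // entries per table slot
val sizemap = chainLengths.grouped(bucketSize).map(_.sum).toArray

def count(from: Int, until: Int): Int = {
  val fb = from / bucketSize  // first size-map bucket
  val lb = until / bucketSize // last size-map bucket
  if (fb == lb) chainLengths.slice(from, until).sum
  else {
    val first = chainLengths.slice(from, (fb + 1) * bucketSize).sum
    val last  = chainLengths.slice(lb * bucketSize, until).sum
    val mid   = (fb + 1 until lb).map(sizemap).sum // one lookup per full bucket
    first + mid + last
  }
}

assert(count(100, 900) == chainLengths.slice(100, 900).sum) // agrees with direct count
}}}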
diff --git a/src/library/scala/collection/parallel/mutable/ParMap.scala b/src/library/scala/collection/parallel/mutable/ParMap.scala
index 63342fa1bc..cb6014289d 100644
--- a/src/library/scala/collection/parallel/mutable/ParMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParMap.scala
@@ -18,16 +18,16 @@ extends collection.mutable.Map[K, V]
override def mapCompanion: GenericParMapCompanion[ParMap] = ParMap
- override def empty: ParMap[K, V] = null // TODO
+ override def empty: ParMap[K, V] = new ParHashMap[K, V]
}
object ParMap extends ParMapFactory[ParMap] {
- def empty[K, V]: ParMap[K, V] = null // TODO
+ def empty[K, V]: ParMap[K, V] = new ParHashMap[K, V]
- def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = null // TODO
+ def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = ParHashMapCombiner.apply[K, V]
implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V]
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 76677a1148..a30d564039 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -7,13 +7,21 @@ import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
+import annotation.unchecked.uncheckedVariance
+
/** Package object for parallel collections.
*/
package object parallel {
- val MIN_FOR_COPY = -1 // TODO: set to 5000
+
+ /* constants */
+ val MIN_FOR_COPY = -1
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
+ val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
+ private[parallel] val unrolledsize = 16
+
+ /* functions */
/** Computes threshold from the size of the collection and the parallelism level.
*/
@@ -23,11 +31,136 @@ package object parallel {
else sz
}
- val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
+ private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
+
+ private[parallel] def unsupported = throw new UnsupportedOperationException
+
+ /* classes */
- def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
+ /** Unrolled list node.
+ */
+ private[parallel] class Unrolled[T: ClassManifest] {
+ var size = 0
+ var array = new Array[T](unrolledsize)
+ var next: Unrolled[T] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[T]
+ next.add(elem)
+ }
+ def foreach[U](f: T => U) {
+ var unrolled = this
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val elem = chunkarr(i)
+ f(elem)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+ }
+ override def toString = array.mkString("Unrolled(", ", ", ")")
+ }
+
+ /** A helper iterator for iterating very small array buffers.
+ * Automatically forwards the signal delegate when splitting.
+ */
+ private[parallel] class BufferIterator[T]
+ (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
+ extends ParIterableIterator[T] {
+ def hasNext = index < until
+ def next = {
+ val r = buffer(index)
+ index += 1
+ r
+ }
+ def remaining = until - index
+ def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
+ val divsz = (until - index) / 2
+ Seq(
+ new BufferIterator(buffer, index, index + divsz, signalDelegate),
+ new BufferIterator(buffer, index + divsz, until, signalDelegate)
+ )
+ } else Seq(this)
+ }
+
+ /** A helper combiner which contains an array of buckets. Buckets themselves
+ * are unrolled linked lists. Some parallel collections are constructed by
+ * sorting their result set according to some criteria.
+ *
+ * A reference `heads` to bucket heads is maintained, as well as a reference
+ * `lasts` to the last unrolled list node. Size is kept in `sz` and maintained
+ * whenever 2 bucket combiners are combined.
+ *
+ * Clients decide how to maintain these by implementing `+=` and `result`.
+ * Populating and using the buckets is up to the client.
+ * Note that in general the type of the elements contained in the buckets `Buck`
+ * doesn't have to correspond to combiner element type `Elem`.
+ *
+ * This class simply gives an efficient `combine` for free - it chains
+ * the buckets together. Since the `combine` contract states that the receiver (`this`)
+ * becomes invalidated, `combine` reuses the receiver and returns it.
+ *
+ * Methods `beforeCombine` and `afterCombine` are called before and after
+ * combining the buckets, respectively, given that the argument to `combine`
+ * is not `this` (as required by the `combine` contract).
+   * They can be overridden in subclasses to provide custom behaviour by modifying
+ * the receiver (which will be the return value).
+ */
+ private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
+ (private val bucketnumber: Int)
+ extends Combiner[Elem, To] {
+ self: EnvironmentPassingCombiner[Elem, To] =>
+ protected var heads: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var lasts: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var sz: Int = 0
+
+ def size = sz
+
+ def clear = {
+ heads = new Array[Unrolled[Buck]](bucketnumber)
+ lasts = new Array[Unrolled[Buck]](bucketnumber)
+ sz = 0
+ }
+
+ def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+ def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+
+ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ if (other.isInstanceOf[BucketCombiner[_, _, _, _]]) {
+ beforeCombine(other)
+
+ val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
+ var i = 0
+ while (i < bucketnumber) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ sz = sz + that.size
+
+ afterCombine(other)
+
+ this
+ } else error("Unexpected combiner type.")
+ } else this
+
+ }
- def unsupported = throw new UnsupportedOperationException
+ /* implicit conversions */
/** An implicit conversion providing arrays with a `par` method, which
* returns a parallel array.
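
Note: `Unrolled` and `BucketCombiner` together are what make `combine` effectively free: each bucket is an unrolled linked list, so two combiners merge by linking each bucket's last node to the other combiner's head node - a constant number of pointer updates per bucket, independent of how many elements the combiners hold. A stripped-down sketch of that chaining step (hypothetical names, one bucket only):

{{{
// Sketch: O(1) chaining of two unrolled-list buckets (receiver is reused).
class Node[T](var next: Node[T] = null) // payload array elided for brevity

class Bucket[T](var head: Node[T], var last: Node[T]) {
  def chainWith(that: Bucket[T]): this.type = {
    if (last eq null) { head = that.head; last = that.last } // this bucket empty
    else {
      last.next = that.head // splice the other list on
      if (that.last ne null) last = that.last
    }
    this // per the combine contract, `that` must not be used afterwards
  }
}
}}}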