diff options
author | Aleksandar Prokopec <aleksandar.prokopec@epfl.ch> | 2010-10-20 20:19:56 +0000 |
---|---|---|
committer | Aleksandar Prokopec <aleksandar.prokopec@epfl.ch> | 2010-10-20 20:19:56 +0000 |
commit | d3d218e5ea77584489437f0dfa8148ee3764d6f7 (patch) | |
tree | 881fba9234da6654e8d914c8b56ddadd100c5cba /src/library/scala/collection/parallel | |
parent | d13a2529aa8218836d13ee04303da4f3325933c2 (diff) | |
download | scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.tar.gz scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.tar.bz2 scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.zip |
Further work on parallel mutable hash maps.
Changed HashTable interface.
Fixed one test.
Implemented hash map iterators.
Implementing hash map combiners.
Extracting common functionalities of bucket-based combiners.
No review.
Diffstat (limited to 'src/library/scala/collection/parallel')
9 files changed, 454 insertions, 115 deletions
diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala index a37f642d42..f47f92457f 100644 --- a/src/library/scala/collection/parallel/Combiner.scala +++ b/src/library/scala/collection/parallel/Combiner.scala @@ -19,7 +19,7 @@ import scala.collection.generic.Sizing * @author prokopec */ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport { - self: EnvironmentPassingCombiner[Elem, To] => +self: EnvironmentPassingCombiner[Elem, To] => type EPC = EnvironmentPassingCombiner[Elem, To] @@ -35,7 +35,13 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w * if they are to be used again. * * Also, combining two combiners `c1` and `c2` for which `c1 eq c2` is `true`, that is, - * they are the same objects in memories, always does nothing and returns the first combiner. + * they are the same objects in memory: + * + * {{{ + * c1.combine(c2) + * }}} + * + * always does nothing and returns `c1`. 
* * @tparam N the type of elements contained by the `other` builder * @tparam NewTo the type of collection produced by the `other` builder @@ -50,7 +56,7 @@ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel w trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] { abstract override def result = { val res = super.result -// res.environment = environment + if (res.isInstanceOf[TaskSupport]) res.asInstanceOf[TaskSupport].environment = environment res } } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 4d95043c3a..881ab80038 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -141,7 +141,7 @@ self => me: SignalContextPassingIterator[ParIterator] => var signalDelegate: Signalling = IdleSignalling def repr = self.repr - def split: Seq[ParIterator] + def split: Seq[ParIterableIterator[T]] } /** A stackable modification that ensures signal contexts get passed along the iterators. 
@@ -489,7 +489,9 @@ self => override def take(n: Int): Repr = { val actualn = if (size > n) n else size if (actualn < MIN_FOR_COPY) take_sequential(actualn) - else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { _.result }) + else executeAndWaitResult(new Take(actualn, cbfactory, parallelIterator) mapResult { + _.result + }) } private def take_sequential(n: Int) = { diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 306ec68548..37b52b7a40 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -11,6 +11,7 @@ import scala.collection.parallel.ParMapLike import scala.collection.parallel.Combiner import scala.collection.parallel.ParIterableIterator import scala.collection.parallel.EnvironmentPassingCombiner +import scala.collection.parallel.Unrolled import scala.collection.generic.ParMapFactory import scala.collection.generic.CanCombineFrom import scala.collection.generic.GenericParMapTemplate @@ -105,21 +106,13 @@ object ParHashMap extends ParMapFactory[ParHashMap] { } -private[immutable] trait HashMapCombiner[K, V] -extends Combiner[(K, V), ParHashMap[K, V]] { +private[immutable] abstract class HashMapCombiner[K, V] +extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) { self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => import HashMapCombiner._ - var heads = new Array[Unrolled[(K, V)]](rootsize) - var lasts = new Array[Unrolled[(K, V)]](rootsize) - var size: Int = 0 - - def clear = { - heads = new Array[Unrolled[(K, V)]](rootsize) - lasts = new Array[Unrolled[(K, V)]](rootsize) - } def +=(elem: (K, V)) = { - size += 1 + sz += 1 val hc = elem._1.## val pos = hc & 0x1f if (lasts(pos) eq null) { @@ -132,26 +125,6 @@ self: EnvironmentPassingCombiner[(K, V), 
ParHashMap[K, V]] => this } - def combine[N <: (K, V), NewTo >: ParHashMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { - // ParHashMap.totalcombines.incrementAndGet - if (other.isInstanceOf[HashMapCombiner[_, _]]) { - val that = other.asInstanceOf[HashMapCombiner[K, V]] - var i = 0 - while (i < rootsize) { - if (lasts(i) eq null) { - heads(i) = that.heads(i) - lasts(i) = that.lasts(i) - } else { - lasts(i).next = that.heads(i) - if (that.lasts(i) ne null) lasts(i) = that.lasts(i) - } - i += 1 - } - size = size + that.size - this - } else error("Unexpected combiner type.") - } else this - def result = { val buckets = heads.filter(_ != null) val root = new Array[HashMap[K, V]](buckets.length) @@ -216,8 +189,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => } -object HashMapCombiner { - def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] {} +private[parallel] object HashMapCombiner { + def apply[K, V] = new HashMapCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] private[immutable] val rootbits = 5 private[immutable] val rootsize = 1 << 5 diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index 0ef2681567..c9554ae1eb 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -11,6 +11,7 @@ import scala.collection.parallel.ParSetLike import scala.collection.parallel.Combiner import scala.collection.parallel.ParIterableIterator import scala.collection.parallel.EnvironmentPassingCombiner +import scala.collection.parallel.Unrolled import scala.collection.generic.ParSetFactory import scala.collection.generic.CanCombineFrom import scala.collection.generic.GenericParTemplate @@ -101,21 +102,13 @@ object ParHashSet extends ParSetFactory[ParHashSet] { } -private[immutable] 
trait HashSetCombiner[T] -extends Combiner[T, ParHashSet[T]] { +private[immutable] abstract class HashSetCombiner[T] +extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) { self: EnvironmentPassingCombiner[T, ParHashSet[T]] => import HashSetCombiner._ - var heads = new Array[Unrolled[Any]](rootsize) - var lasts = new Array[Unrolled[Any]](rootsize) - var size: Int = 0 - - def clear = { - heads = new Array[Unrolled[Any]](rootsize) - lasts = new Array[Unrolled[Any]](rootsize) - } def +=(elem: T) = { - size += 1 + sz += 1 val hc = elem.## val pos = hc & 0x1f if (lasts(pos) eq null) { @@ -128,25 +121,6 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => this } - def combine[N <: T, NewTo >: ParHashSet[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { - if (other.isInstanceOf[HashSetCombiner[_]]) { - val that = other.asInstanceOf[HashSetCombiner[T]] - var i = 0 - while (i < rootsize) { - if (lasts(i) eq null) { - heads(i) = that.heads(i) - lasts(i) = that.lasts(i) - } else { - lasts(i).next = that.heads(i) - if (that.lasts(i) ne null) lasts(i) = that.lasts(i) - } - i += 1 - } - size = size + that.size - this - } else error("Unexpected combiner type.") - } else this - def result = { val buckets = heads.filter(_ != null) val root = new Array[HashSet[T]](buckets.length) @@ -171,7 +145,8 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => /* tasks */ - class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] { + class CreateTrie(buckets: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int) + extends super.Task[Unit, CreateTrie] { var result = () def leaf(prev: Option[Unit]) = { var i = offset @@ -274,6 +249,3 @@ object HashSetCombiner { - - - diff --git a/src/library/scala/collection/parallel/immutable/package.scala b/src/library/scala/collection/parallel/immutable/package.scala index 
c54875ecd3..9c6bbae8dd 100644 --- a/src/library/scala/collection/parallel/immutable/package.scala +++ b/src/library/scala/collection/parallel/immutable/package.scala @@ -15,25 +15,9 @@ package object immutable { /* package level methods */ def repetition[T](elem: T, len: Int) = new Repetition(elem, len) - /* properties */ - private[immutable] val unrolledsize = 16 + /* constants */ /* classes */ - private[immutable] class Unrolled[T: ClassManifest] { - var size = 0 - var array = new Array[T](unrolledsize) - var next: Unrolled[T] = null - // adds and returns itself or the new unrolled if full - def add(elem: T): Unrolled[T] = if (size < unrolledsize) { - array(size) = elem - size += 1 - this - } else { - next = new Unrolled[T] - next.add(elem) - } - override def toString = "Unrolled(" + array.mkString(", ") + ")" - } /** A (parallel) sequence consisting of `length` elements `elem`. Used in the `padTo` method. * @@ -42,8 +26,7 @@ package object immutable { * @param length the length of the collection */ private[parallel] class Repetition[T](elem: T, val length: Int) extends ParSeq[T] { - self => - + self => def apply(idx: Int) = if (0 <= idx && idx < length) elem else throw new IndexOutOfBoundsException def seq = throw new UnsupportedOperationException def update(idx: Int, elem: T) = throw new UnsupportedOperationException diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 057faa66e1..fb4119bddc 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -5,53 +5,198 @@ package mutable import collection.generic._ +import collection.mutable.DefaultEntry +import collection.mutable.HashEntry +import collection.mutable.HashTable - - -class ParHashMap[K, V] +class ParHashMap[K, V] private[collection] (contents: HashTable.Contents[K, DefaultEntry[K, V]]) extends ParMap[K, V] with 
GenericParMapTemplate[K, V, ParHashMap] with ParMapLike[K, V, ParHashMap[K, V], collection.mutable.HashMap[K, V]] + with ParHashTable[K, DefaultEntry[K, V]] { self => + initWithContents(contents) + + type Entry = collection.mutable.DefaultEntry[K, V] + + def this() = this(null) override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap override def empty: ParHashMap[K, V] = new ParHashMap[K, V] - def parallelIterator = null // TODO + def seq = new collection.mutable.HashMap[K, V](hashTableContents) - def seq = null // TODO + def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI - def get(k: K): Option[V] = null // TODO + override def size = tableSize - def +=(kv: (K, V)) = null // TODO + def get(key: K): Option[V] = { + val e = findEntry(key) + if (e == null) None + else Some(e.value) + } - def -=(k: K) = null // TODO + override def put(key: K, value: V): Option[V] = { + val e = findEntry(key) + if (e == null) { addEntry(new Entry(key, value)); None } + else { val v = e.value; e.value = value; Some(v) } + } - override def size = 0 // TODO + override def update(key: K, value: V): Unit = put(key, value) -} + override def remove(key: K): Option[V] = { + val e = removeEntry(key) + if (e ne null) Some(e.value) + else None + } + def += (kv: (K, V)): this.type = { + val e = findEntry(kv._1) + if (e == null) addEntry(new Entry(kv._1, kv._2)) + else e.value = kv._2 + this + } + def -=(key: K): this.type = { removeEntry(key); this } + type SCPI = SignalContextPassingIterator[ParHashMapIterator] -object ParHashMap extends ParMapFactory[ParHashMap] { + class ParHashMapIterator(start: Int, untilIdx: Int, totalSize: Int, e: DefaultEntry[K, V]) + extends EntryIterator[(K, V), ParHashMapIterator](start, untilIdx, totalSize, e) with ParIterator { + me: SCPI => + def entry2item(entry: DefaultEntry[K, V]) = (entry.key, entry.value); + def newIterator(idxFrom: Int, idxUntil: Int, totalSz: Int, es: 
DefaultEntry[K, V]) = + new ParHashMapIterator(idxFrom, idxUntil, totalSz, es) with SCPI + } - def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V] +} - def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = null // TODO - implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = null // TODO +object ParHashMap extends ParMapFactory[ParHashMap] { + def empty[K, V]: ParHashMap[K, V] = new ParHashMap[K, V] + def newCombiner[K, V]: Combiner[(K, V), ParHashMap[K, V]] = ParHashMapCombiner.apply[K, V] + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParHashMap[K, V]] = new CanCombineFromMap[K, V] } +private[mutable] abstract class ParHashMapCombiner[K, V](private val tableLoadFactor: Int) +extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntry[K, V], ParHashMapCombiner[K, V]](ParHashMapCombiner.numblocks) { +self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => + private var mask = ParHashMapCombiner.discriminantmask + + def +=(elem: (K, V)) = { + sz += 1 + val hc = elem._1.## + val pos = hc & mask + if (lasts(pos) eq null) { + // initialize bucket + heads(pos) = new Unrolled[DefaultEntry[K, V]] + lasts(pos) = heads(pos) + } + // add to bucket + lasts(pos) = lasts(pos).add(new DefaultEntry(elem._1, elem._2)) + this + } + + def result: ParHashMap[K, V] = { + // construct table + val table = new AddingHashTable(size, tableLoadFactor) + + executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks)) + + val c = table.hashTableContents + new ParHashMap(c) + } + + /** A hash table which will never resize itself. Knowing the number of elements in advance, + * it allocates the table of the required size when created. + * + * Entries are added using the `insertEntry` method. This method checks whether the element + * exists and updates the size map. 
+ */ + class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] { + import HashTable._ + _loadFactor = lf + table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems))) + tableSize = 0 + threshold = newThreshold(_loadFactor, table.length) + sizeMapInit(table.length) + def insertEntry(e: DefaultEntry[K, V]) { + var h = index(elemHashCode(e.key)) + var olde = table(h).asInstanceOf[DefaultEntry[K, V]] + + // check if key already exists + var ce = olde + while (ce ne null) { + if (ce.key == e.key) { + h = -1 + ce = null + } else ce = ce.next + } + + // if key does not already exist + if (h != -1) { + e.next = olde + table(h) = e + tableSize = tableSize + 1 + nnSizeMapAdd(h) + } + } + } + + /* tasks */ + + class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int) + extends super.Task[Unit, FillBlocks] { + var result = () + def leaf(prev: Option[Unit]) = { + var i = offset + val until = offset + howmany + while (i < until) { + fillBlock(buckets(i)) + i += 1 + } + } + private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) { + var unrolled = elems + var i = 0 + val t = table + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val elem = chunkarr(i) + t.insertEntry(elem) + i += 1 + } + i = 0 + unrolled = unrolled.next + } + } + def split = { + val fp = howmany / 2 + List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel) + } +} +private[mutable] object ParHashMapCombiner { + private[mutable] val discriminantbits = 5 + private[mutable] val numblocks = 1 << discriminantbits + private[mutable] val discriminantmask = ((1 << discriminantbits) - 1) << (32 - discriminantbits) + + def apply[K, V] = new 
ParHashMapCombiner[K, V](HashTable.defaultLoadFactor) with EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] +} + diff --git a/src/library/scala/collection/parallel/mutable/ParHashTable.scala b/src/library/scala/collection/parallel/mutable/ParHashTable.scala index 9e356b7fb4..2617685a3d 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashTable.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashTable.scala @@ -5,7 +5,7 @@ package parallel.mutable import collection.mutable.HashEntry - +import collection.parallel.ParIterableIterator @@ -13,9 +13,134 @@ import collection.mutable.HashEntry * enriching the data structure by fulfilling certain requirements * for their parallel construction and iteration. */ -trait ParHashTable[K] { +trait ParHashTable[K, Entry >: Null <: HashEntry[K, Entry]] extends collection.mutable.HashTable[K, Entry] { + + // always initialize size map + if (!isSizeMapDefined) sizeMapInitAndRebuild + + /** A parallel iterator returning all the entries. 
+ */ + abstract class EntryIterator[T, +IterRepr <: ParIterableIterator[T]] + (private var idx: Int, private val until: Int, private val totalsize: Int, private var es: Entry) + extends ParIterableIterator[T] { + private val itertable = table + private var traversed = 0 + scan() + + def entry2item(e: Entry): T + def newIterator(idxFrom: Int, idxUntil: Int, totalSize: Int, es: Entry): IterRepr + + def hasNext = es != null + + def next = { + val res = es + es = es.next + scan() + traversed += 1 + entry2item(res) + } + + def scan() { + while (es == null && idx < until) { + es = itertable(idx).asInstanceOf[Entry] + idx = idx + 1 + } + } + + def remaining = totalsize - traversed + + def split: Seq[ParIterableIterator[T]] = if (remaining > 1) { + if ((until - idx) > 1) { + // there is at least one more slot for the next iterator + // divide the rest of the table + val divsz = (until - idx) / 2 + + // second iterator params + val sidx = idx + divsz + val suntil = until + val ses = itertable(sidx).asInstanceOf[Entry] + val stotal = calcNumElems(sidx, suntil) + + // first iterator params + val fidx = idx + val funtil = idx + divsz + val fes = es + val ftotal = totalsize - stotal + + Seq( + newIterator(fidx, funtil, ftotal, fes), + newIterator(sidx, suntil, stotal, ses) + ) + } else { + // otherwise, this is the last entry in the table - all what remains is the chain + // so split the rest of the chain + val arr = convertToArrayBuffer(es) + val arrpit = new collection.parallel.BufferIterator[T](arr, 0, arr.length, signalDelegate) + arrpit.split + } + } else Seq(this.asInstanceOf[IterRepr]) + + private def convertToArrayBuffer(chainhead: Entry): mutable.ArrayBuffer[T] = { + var buff = mutable.ArrayBuffer[Entry]() + var curr = chainhead + while (curr != null) { + buff += curr + curr = curr.next + } + buff map { e => entry2item(e) } + } + + private def calcNumElems(from: Int, until: Int) = { + // find the first bucket + val fbindex = from / sizeMapBucketSize + + // find the 
last bucket + val lbindex = from / sizeMapBucketSize + + if (fbindex == lbindex) { + // if first and last are the same, just count between `from` and `until` + // return this count + countElems(from, until) + } else { + // otherwise count in first, then count in last + val fbuntil = ((fbindex + 1) * sizeMapBucketSize) min itertable.length + val fbcount = countElems(from, fbuntil) + val lbstart = lbindex * sizeMapBucketSize + val lbcount = countElems(lbstart, until) + + // and finally count the elements in all the buckets between first and last using a sizemap + val inbetween = countBucketSizes(fbindex + 1, lbindex) + + // return the sum + fbcount + inbetween + lbcount + } + } + + private def countElems(from: Int, until: Int) = { + var c = 0 + var idx = from + var es: Entry = null + while (idx < until) { + es = itertable(idx).asInstanceOf[Entry] + while (es ne null) { + c += 1 + es = es.next + } + idx += 1 + } + c + } - protected type Entry >: Null <: HashEntry[K, Entry] + private def countBucketSizes(fromBucket: Int, untilBucket: Int) = { + var c = 0 + var idx = fromBucket + while (idx < untilBucket) { + c += sizemap(idx) + idx += 1 + } + c + } + } } diff --git a/src/library/scala/collection/parallel/mutable/ParMap.scala b/src/library/scala/collection/parallel/mutable/ParMap.scala index 63342fa1bc..cb6014289d 100644 --- a/src/library/scala/collection/parallel/mutable/ParMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParMap.scala @@ -18,16 +18,16 @@ extends collection.mutable.Map[K, V] override def mapCompanion: GenericParMapCompanion[ParMap] = ParMap - override def empty: ParMap[K, V] = null // TODO + override def empty: ParMap[K, V] = new ParHashMap[K, V] } object ParMap extends ParMapFactory[ParMap] { - def empty[K, V]: ParMap[K, V] = null // TODO + def empty[K, V]: ParMap[K, V] = new ParHashMap[K, V] - def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = null // TODO + def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = 
ParHashMapCombiner.apply[K, V] implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V] diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 76677a1148..a30d564039 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -7,13 +7,21 @@ import scala.collection.generic.CanBuildFrom import scala.collection.generic.CanCombineFrom import scala.collection.parallel.mutable.ParArray +import annotation.unchecked.uncheckedVariance + /** Package object for parallel collections. */ package object parallel { - val MIN_FOR_COPY = -1 // TODO: set to 5000 + + /* constants */ + val MIN_FOR_COPY = -1 val CHECK_RATE = 512 val SQRT2 = math.sqrt(2) + val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors + private[parallel] val unrolledsize = 16 + + /* functions */ /** Computes threshold from the size of the collection and the parallelism level. */ @@ -23,11 +31,136 @@ package object parallel { else sz } - val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors + private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg) + + private[parallel] def unsupported = throw new UnsupportedOperationException + + /* classes */ - def unsupported(msg: String) = throw new UnsupportedOperationException(msg) + /** Unrolled list node. 
+ */ + private[parallel] class Unrolled[T: ClassManifest] { + var size = 0 + var array = new Array[T](unrolledsize) + var next: Unrolled[T] = null + // adds and returns itself or the new unrolled if full + def add(elem: T): Unrolled[T] = if (size < unrolledsize) { + array(size) = elem + size += 1 + this + } else { + next = new Unrolled[T] + next.add(elem) + } + def foreach[U](f: T => U) { + var unrolled = this + var i = 0 + while (unrolled ne null) { + val chunkarr = unrolled.array + val chunksz = unrolled.size + while (i < chunksz) { + val elem = chunkarr(i) + f(elem) + i += 1 + } + i = 0 + unrolled = unrolled.next + } + } + override def toString = array.mkString("Unrolled(", ", ", ")") + } + + /** A helper iterator for iterating very small array buffers. + * Automatically forwards the signal delegate when splitting. + */ + private[parallel] class BufferIterator[T] + (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling) + extends ParIterableIterator[T] { + def hasNext = index < until + def next = { + val r = buffer(index) + index += 1 + r + } + def remaining = until - index + def split: Seq[ParIterableIterator[T]] = if (remaining > 1) { + val divsz = (until - index) / 2 + Seq( + new BufferIterator(buffer, index, index + divsz, signalDelegate), + new BufferIterator(buffer, index + divsz, until, signalDelegate) + ) + } else Seq(this) + } + + /** A helper combiner which contains an array of buckets. Buckets themselves + * are unrolled linked lists. Some parallel collections are constructed by + * sorting their result set according to some criteria. + * + * A reference `heads` to bucket heads is maintained, as well as a reference + * `lasts` to the last unrolled list node. Size is kept in `sz` and maintained + * whenever 2 bucket combiners are combined. + * + * Clients decide how to maintain these by implementing `+=` and `result`. 
+ * Populating and using the buckets is up to the client. + * Note that in general the type of the elements contained in the buckets `Buck` + * doesn't have to correspond to combiner element type `Elem`. + * + * This class simply gives an efficient `combine` for free - it chains + * the buckets together. Since the `combine` contract states that the receiver (`this`) + * becomes invalidated, `combine` reuses the receiver and returns it. + * + * Methods `beforeCombine` and `afterCombine` are called before and after + * combining the buckets, respectively, given that the argument to `combine` + * is not `this` (as required by the `combine` contract). + * They can be overriden in subclasses to provide custom behaviour by modifying + * the receiver (which will be the return value). + */ + private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]] + (private val bucketnumber: Int) + extends Combiner[Elem, To] { + self: EnvironmentPassingCombiner[Elem, To] => + protected var heads: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber) + protected var lasts: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber) + protected var sz: Int = 0 + + def size = sz + + def clear = { + heads = new Array[Unrolled[Buck]](bucketnumber) + lasts = new Array[Unrolled[Buck]](bucketnumber) + sz = 0 + } + + def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {} + def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {} + + def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) { + if (other.isInstanceOf[BucketCombiner[_, _, _, _]]) { + beforeCombine(other) + + val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]] + var i = 0 + while (i < bucketnumber) { + if (lasts(i) eq null) { + heads(i) = that.heads(i) + lasts(i) = that.lasts(i) + } else { + lasts(i).next = 
that.heads(i) + if (that.lasts(i) ne null) lasts(i) = that.lasts(i) + } + i += 1 + } + sz = sz + that.size + + afterCombine(other) + + this + } else error("Unexpected combiner type.") + } else this + + } - def unsupported = throw new UnsupportedOperationException + /* implicit conversions */ /** An implicit conversion providing arrays with a `par` method, which * returns a parallel array. |