summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala')
-rw-r--r--src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala131
1 files changed, 120 insertions, 11 deletions
diff --git a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
index e29e9dfa98..a9e08913ea 100644
--- a/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
+++ b/src/library/scala/collection/parallel/immutable/ParallelHashTrie.scala
@@ -21,6 +21,10 @@ import scala.collection.immutable.HashMap
+/** Parallel hash trie map.
+ *
+ * @author prokopec
+ */
class ParallelHashTrie[K, +V] private[immutable] (private[this] val trie: HashMap[K, V])
extends ParallelMap[K, V]
with GenericParallelMapTemplate[K, V, ParallelHashTrie]
@@ -87,7 +91,7 @@ object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
new CanCombineFromMap[K, V]
}
- def fromTrie[K, V](trie: HashMap[K, V]): ParallelHashTrie[K, V] = new ParallelHashTrie(trie)
+ def fromTrie[K, V](t: HashMap[K, V]) = new ParallelHashTrie(t)
var totalcombines = new java.util.concurrent.atomic.AtomicInteger(0)
}
@@ -96,31 +100,136 @@ object ParallelHashTrie extends ParallelMapFactory[ParallelHashTrie] {
trait HashTrieCombiner[K, V]
extends Combiner[(K, V), ParallelHashTrie[K, V]] {
self: EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] =>
- private var trie: HashMap[K, V] = HashMap.empty[K, V]
-
- def size: Int = trie.size
-
- def clear = trie = HashMap.empty[K, V]
+ import HashTrieCombiner._
+ var heads = new Array[Unrolled[K, V]](rootsize)
+ var lasts = new Array[Unrolled[K, V]](rootsize)
+ var size: Int = 0
+
+ def clear = {
+ heads = new Array[Unrolled[K, V]](rootsize)
+ lasts = new Array[Unrolled[K, V]](rootsize)
+ }
- def +=(elem: (K, V)) = { trie += elem; this }
+ def +=(elem: (K, V)) = {
+ size += 1
+ val hc = elem._1.##
+ val pos = hc & 0x1f
+ if (lasts(pos) eq null) {
+ // initialize bucket
+ heads(pos) = new Unrolled[K, V]
+ lasts(pos) = heads(pos)
+ }
+ // add to bucket
+ lasts(pos) = lasts(pos).add(elem)
+ this
+ }
def combine[N <: (K, V), NewTo >: ParallelHashTrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
// ParallelHashTrie.totalcombines.incrementAndGet
if (other.isInstanceOf[HashTrieCombiner[_, _]]) {
val that = other.asInstanceOf[HashTrieCombiner[K, V]]
- val ncombiner = HashTrieCombiner[K, V]
- ncombiner.trie = this.trie merge that.trie
- ncombiner
+ var i = 0
+ while (i < rootsize) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ size = size + that.size
+ this
} else error("Unexpected combiner type.")
} else this
- def result = new ParallelHashTrie[K, V](trie)
+ def result = {
+ val buckets = heads.filter(_ != null)
+ val root = new Array[HashMap[K, V]](buckets.length)
+
+ executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
+
+ var bitmap = 0
+ var i = 0
+ while (i < rootsize) {
+ if (heads(i) ne null) bitmap |= 1 << i
+ i += 1
+ }
+ val sz = root.foldLeft(0)(_ + _.size)
+
+ if (sz == 0) new ParallelHashTrie[K, V]
+ else if (sz == 1) new ParallelHashTrie[K, V](root(0))
+ else {
+ val trie = new HashMap.HashTrieMap(bitmap, root, sz)
+ new ParallelHashTrie[K, V](trie)
+ }
+ }
+
+ /* tasks */
+
+ class CreateTrie(buckets: Array[Unrolled[K, V]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ var result = ()
+ def leaf(prev: Option[Unit]) = {
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ root(i) = createTrie(buckets(i))
+ i += 1
+ }
+ }
+ private def createTrie(elems: Unrolled[K, V]): HashMap[K, V] = {
+ var trie = new HashMap[K, V]
+
+ var unrolled = elems
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val kv = chunkarr(i)
+ val hc = kv._1.##
+ trie = trie.updated0(kv._1, hc, rootbits, kv._2, kv)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+
+ trie
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new CreateTrie(buckets, root, offset, fp), new CreateTrie(buckets, root, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(root.length, parallelismLevel)
+ }
}
object HashTrieCombiner {
def apply[K, V] = new HashTrieCombiner[K, V] with EnvironmentPassingCombiner[(K, V), ParallelHashTrie[K, V]] {}
+
+ private[immutable] val rootbits = 5
+ private[immutable] val rootsize = 1 << 5
+ private[immutable] val unrolledsize = 16
+
+ private[immutable] class Unrolled[K, V] {
+ var size = 0
+ var array = new Array[(K, V)](unrolledsize)
+ var next: Unrolled[K, V] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: (K, V)): Unrolled[K, V] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[K, V]
+ next.add(elem)
+ }
+ override def toString = "Unrolled(" + array.mkString(", ") + ")"
+ }
}