summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-02-15 15:37:42 +0100
committerAleksandar Prokopec <axel22@gmail.com>2012-02-15 15:38:06 +0100
commitfe6c9e3f1693f2e6db5ae69517893894bbac6afb (patch)
treef90deb499f8ffa009eb26e55ce7c233098b010cc
parentada6771679aa63e8aa57a294dfb268b2a7a32df4 (diff)
downloadscala-fe6c9e3f1693f2e6db5ae69517893894bbac6afb.tar.gz
scala-fe6c9e3f1693f2e6db5ae69517893894bbac6afb.tar.bz2
scala-fe6c9e3f1693f2e6db5ae69517893894bbac6afb.zip
Add parallel size computation for ParCtrie.
Also modified size computation for Ctrie so that concurrent `size` invocations can be parallelized more efficiently.
-rw-r--r--src/library/scala/collection/mutable/BasicNode.java2
-rw-r--r--src/library/scala/collection/mutable/Ctrie.scala19
-rw-r--r--src/library/scala/collection/parallel/mutable/ParCtrie.scala47
-rw-r--r--test/files/run/ctries/concmap.scala19
4 files changed, 78 insertions, 9 deletions
diff --git a/src/library/scala/collection/mutable/BasicNode.java b/src/library/scala/collection/mutable/BasicNode.java
index b934aed24f..c05009470a 100644
--- a/src/library/scala/collection/mutable/BasicNode.java
+++ b/src/library/scala/collection/mutable/BasicNode.java
@@ -13,7 +13,7 @@ package scala.collection.mutable;
-abstract class BasicNode {
+public abstract class BasicNode {
public abstract String string(int lev);
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala
index f208d6555e..9a8f4bf276 100644
--- a/src/library/scala/collection/mutable/Ctrie.scala
+++ b/src/library/scala/collection/mutable/Ctrie.scala
@@ -20,7 +20,7 @@ import annotation.switch
-private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) {
+private[collection] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) {
import INodeBase._
WRITE(bn)
@@ -405,7 +405,7 @@ private[mutable] trait KVNode[K, V] {
}
-private[mutable] final class SNode[K, V](final val k: K, final val v: V, final val hc: Int)
+private[collection] final class SNode[K, V](final val k: K, final val v: V, final val hc: Int)
extends BasicNode with KVNode[K, V] {
final def copy = new SNode(k, v, hc)
final def copyTombed = new TNode(k, v, hc)
@@ -415,7 +415,7 @@ extends BasicNode with KVNode[K, V] {
}
-private[mutable] final class TNode[K, V](final val k: K, final val v: V, final val hc: Int)
+private[collection] final class TNode[K, V](final val k: K, final val v: V, final val hc: Int)
extends MainNode[K, V] with KVNode[K, V] {
final def copy = new TNode(k, v, hc)
final def copyTombed = new TNode(k, v, hc)
@@ -426,7 +426,7 @@ extends MainNode[K, V] with KVNode[K, V] {
}
-private[mutable] final class LNode[K, V](final val listmap: ImmutableListMap[K, V])
+private[collection] final class LNode[K, V](final val listmap: ImmutableListMap[K, V])
extends MainNode[K, V] {
def this(k: K, v: V) = this(ImmutableListMap(k -> v))
def this(k1: K, v1: V, k2: K, v2: V) = this(ImmutableListMap(k1 -> v1, k2 -> v2))
@@ -445,7 +445,7 @@ extends MainNode[K, V] {
}
-private[mutable] final class CNode[K, V](final val bitmap: Int, final val array: Array[BasicNode], final val gen: Gen)
+private[collection] final class CNode[K, V](final val bitmap: Int, final val array: Array[BasicNode], final val gen: Gen)
extends CNodeBase[K, V] {
// this should only be called from within read-only snapshots
@@ -459,11 +459,18 @@ extends CNodeBase[K, V] {
}
}
+ // lends itself towards being parallelizable by choosing
+ // a random starting offset in the array
+ // => if there are concurrent size computations, they start
+ // at different positions, so they are more likely to
+ // to be independent
private def computeSize(ct: Ctrie[K, V]): Int = {
var i = 0
var sz = 0
+ val offset = math.abs(util.Random.nextInt()) % array.length
while (i < array.length) {
- array(i) match {
+ val pos = (i + offset) % array.length
+ array(pos) match {
case sn: SNode[_, _] => sz += 1
case in: INode[K, V] => sz += in.cachedSize(ct)
}
diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
index 37add60df9..1e11b85da5 100644
--- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala
+++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
@@ -13,6 +13,12 @@ package scala.collection.parallel.mutable
import scala.collection.generic._
import scala.collection.parallel.Combiner
import scala.collection.parallel.IterableSplitter
+import scala.collection.mutable.BasicNode
+import scala.collection.mutable.TNode
+import scala.collection.mutable.LNode
+import scala.collection.mutable.CNode
+import scala.collection.mutable.SNode
+import scala.collection.mutable.INode
import scala.collection.mutable.Ctrie
import scala.collection.mutable.CtrieIterator
@@ -34,6 +40,7 @@ extends ParMap[K, V]
with ParCtrieCombiner[K, V]
with Serializable
{
+ import collection.parallel.tasksupport._
def this() = this(new Ctrie)
@@ -47,8 +54,6 @@ extends ParMap[K, V]
def splitter = new ParCtrieSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
- override def size = ctrie.size
-
override def clear() = ctrie.clear()
def result = this
@@ -71,8 +76,46 @@ extends ParMap[K, V]
this
}
+ override def size = {
+ val in = ctrie.RDCSS_READ_ROOT()
+ val r = in.GCAS_READ(ctrie)
+ r match {
+ case tn: TNode[_, _] => tn.cachedSize(ctrie)
+ case ln: LNode[_, _] => ln.cachedSize(ctrie)
+ case cn: CNode[_, _] =>
+ executeAndWaitResult(new Size(0, cn.array.length, cn.array))
+ cn.cachedSize(ctrie)
+ }
+ }
+
override def stringPrefix = "ParCtrie"
+ /* tasks */
+
+ /** Computes Ctrie size in parallel. */
+ class Size(offset: Int, howmany: Int, array: Array[BasicNode]) extends Task[Int, Size] {
+ var result = -1
+ def leaf(prev: Option[Int]) = {
+ var sz = 0
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ array(i) match {
+ case sn: SNode[_, _] => sz += 1
+ case in: INode[K, V] => sz += in.cachedSize(ctrie)
+ }
+ i += 1
+ }
+ result = sz
+ }
+ def split = {
+ val fp = howmany / 2
+ Seq(new Size(offset, fp, array), new Size(offset + fp, howmany - fp, array))
+ }
+ def shouldSplitFurther = howmany > 1
+ override def merge(that: Size) = result = result + that.result
+ }
+
}
diff --git a/test/files/run/ctries/concmap.scala b/test/files/run/ctries/concmap.scala
index 85a305ce5b..d73e33182a 100644
--- a/test/files/run/ctries/concmap.scala
+++ b/test/files/run/ctries/concmap.scala
@@ -164,6 +164,25 @@ object ConcurrentMapSpec extends Spec {
for (i <- 0 until sz) assertEqual(ct.get(new Wrap(i)), None)
}
+ "compute size correctly" in {
+ val ct = new Ctrie[Wrap, Int]
+ val sz = 36450
+ for (i <- 0 until sz) ct(new Wrap(i)) = i
+
+ assertEqual(ct.size, sz)
+ assertEqual(ct.size, sz)
+ }
+
+ "compute size correctly in parallel" in {
+ val ct = new Ctrie[Wrap, Int]
+ val sz = 36450
+ for (i <- 0 until sz) ct(new Wrap(i)) = i
+ val pct = ct.par
+
+ assertEqual(pct.size, sz)
+ assertEqual(pct.size, sz)
+ }
+
}
}