summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-02-03 15:05:56 +0100
committerAleksandar Prokopec <axel22@gmail.com>2012-02-03 15:15:57 +0100
commit86946630e9a1240fb9a378b2ec62e78b521f4320 (patch)
treeb731aacfed51a534bca9ca1c3eae21f960af765e
parent2d9dfe3077fa2b43a336548cad98a522215c52a9 (diff)
downloadscala-86946630e9a1240fb9a378b2ec62e78b521f4320.tar.gz
scala-86946630e9a1240fb9a378b2ec62e78b521f4320.tar.bz2
scala-86946630e9a1240fb9a378b2ec62e78b521f4320.zip
Fix some issues in parallel Ctrie.
This change resolves some issues with ParCtrie splitters and their `remaining` method, which currently evaluates the size of the Ctrie. Since this is still not done lazily, nor in parallel, it has a certain cost, which is unacceptable. Change #1: The `shouldSplitFurther` method is by default implemented by calling the `remaining` method. This method now forwards the call to the same method in the splitter which is by default implemented in the same way as before, but can be overridden by custom collections such as the ParCtrie. Change #2: ParCtrie splitter now has a `level` member which just counts how many times the method has been split. This information is used to override the default `shouldSplitFurther` implementation. Change #3: The tasks and splitters rely heavily on the `remaining` method in the splitter for most operations. There is an additional method called `isRemainingCheap` which returns true by default, but can be overridden by custom collections such as the `Ctrie`.
-rw-r--r--src/library/scala/collection/mutable/Ctrie.scala24
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala27
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala27
-rw-r--r--src/library/scala/collection/parallel/mutable/ParCtrie.scala20
-rw-r--r--test/benchmarking/ParCtrie-map.scala21
-rw-r--r--test/benchmarking/TreeSetInsert.scala2
6 files changed, 75 insertions, 46 deletions
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala
index e1a72d9511..6ed3a516c4 100644
--- a/src/library/scala/collection/mutable/Ctrie.scala
+++ b/src/library/scala/collection/mutable/Ctrie.scala
@@ -13,6 +13,7 @@ package mutable
import java.util.concurrent.atomic._
import collection.immutable.{ ListMap => ImmutableListMap }
+import collection.parallel.mutable.ParCtrie
import generic._
import annotation.tailrec
import annotation.switch
@@ -578,6 +579,8 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai
* iterator and clear operations. The cost of evaluating the (lazy) snapshot is
* distributed across subsequent updates, thus making snapshot evaluation horizontally scalable.
*
+ * For details, see: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf
+ *
* @author Aleksandar Prokopec
* @since 2.10
*/
@@ -585,6 +588,7 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai
final class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef])
extends ConcurrentMap[K, V]
with MapLike[K, V, Ctrie[K, V]]
+ with CustomParallelizable[(K, V), ParCtrie[K, V]]
with Serializable
{
import Ctrie.computeHash
@@ -710,6 +714,10 @@ extends ConcurrentMap[K, V]
/* public methods */
+ override def seq = this
+
+ override def par = new ParCtrie(this)
+
override def empty: Ctrie[K, V] = new Ctrie[K, V]
@inline final def isReadOnly = rootupdater eq null
@@ -820,7 +828,7 @@ extends ConcurrentMap[K, V]
def iterator: Iterator[(K, V)] =
if (nonReadOnly) readOnlySnapshot().iterator
- else new CtrieIterator(this)
+ else new CtrieIterator(0, this)
override def stringPrefix = "Ctrie"
@@ -844,7 +852,7 @@ object Ctrie extends MutableMapFactory[Ctrie] {
}
-private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
+private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
var stack = new Array[Array[BasicNode]](7)
var stackpos = new Array[Int](7)
var depth = -1
@@ -910,7 +918,7 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
}
} else current = null
- protected def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_ct, _mustInit)
+ protected def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_lev, _ct, _mustInit)
/** Returns a sequence of iterators over subsets of this iterator.
* It's used to ease the implementation of splitters for a parallel version of the Ctrie.
@@ -920,8 +928,12 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
val it = subiter
subiter = null
advance()
+ this.level += 1
Seq(it, this)
- } else if (depth == -1) Seq(this) else {
+ } else if (depth == -1) {
+ this.level += 1
+ Seq(this)
+ } else {
var d = 0
while (d <= depth) {
val rem = stack(d).length - 1 - stackpos(d)
@@ -929,15 +941,17 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2)
stack(d) = arr1
stackpos(d) = -1
- val it = newIterator(ct, false)
+ val it = newIterator(level + 1, ct, false)
it.stack(0) = arr2
it.stackpos(0) = -1
it.depth = 0
it.advance() // <-- fix it
+ this.level += 1
return Seq(this, it)
}
d += 1
}
+ this.level += 1
Seq(this)
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 32e0e8a8ed..7c5a835e56 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -96,17 +96,6 @@ import annotation.unchecked.uncheckedVariance
* The combination of methods `toMap`, `toSeq` or `toSet` along with `par` and `seq` is a flexible
* way to change between different collection types.
*
- * The method:
- *
- * {{{
- * def threshold(sz: Int, p: Int): Int
- * }}}
- *
- * provides an estimate on the minimum number of elements the collection has before
- * the splitting stops and depends on the number of elements in the collection. A rule of the
- * thumb is the number of elements divided by 8 times the parallelism level. This method may
- * be overridden in concrete implementations if necessary.
- *
* Since this trait extends the `Iterable` trait, methods like `size` must also
* be implemented in concrete collections, while `iterator` forwards to `splitter` by
* default.
@@ -206,18 +195,6 @@ self: ParIterableLike[T, Repr, Sequential] =>
*/
def isStrictSplitterCollection = true
- /** Some minimal number of elements after which this collection should be handled
- * sequentially by different processors.
- *
- * This method depends on the size of the collection and the parallelism level, which
- * are both specified as arguments.
- *
- * @param sz the size based on which to compute the threshold
- * @param p the parallelism level based on which to compute the threshold
- * @return the maximum number of elements for performing operations sequentially
- */
- def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p)
-
/** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool.
* This method forwards the call to `newCombiner`.
*/
@@ -833,7 +810,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends StrictSplitterCheckTask[R, Tp] {
protected[this] val pit: IterableSplitter[T]
protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp]
- def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
+ def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel)
def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")"
@@ -1362,7 +1339,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
/* scan tree */
- protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1
+ protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1
protected[this] trait ScanTree[U >: T] {
def beginsAt: Int
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index e8b332da89..8ed4583419 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -28,6 +28,11 @@ private[collection] trait RemainsIterator[+T] extends Iterator[T] {
* This method doesn't change the state of the iterator.
*/
def remaining: Int
+
+ /** For most collections, this is a cheap operation.
+ * Exceptions can override this method.
+ */
+ def isRemainingCheap = true
}
@@ -112,7 +117,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
//val cb = pbf(repr)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
while (hasNext) cb += f(next)
cb
}
@@ -137,7 +142,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = {
- b.sizeHint(remaining)
+ if (isRemainingCheap) b.sizeHint(remaining)
while (hasNext) b += next
b
}
@@ -179,7 +184,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
def drop2combiner[U >: T, This](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
drop(n)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
while (hasNext) cb += next
cb
}
@@ -197,7 +202,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
def splitAt2combiners[U >: T, This](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = {
before.sizeHint(at)
- after.sizeHint(remaining - at)
+ if (isRemainingCheap) after.sizeHint(remaining - at)
var left = at
while (left > 0) {
before += next
@@ -223,7 +228,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
val curr = next
if (p(curr)) before += curr
else {
- after.sizeHint(remaining + 1)
+ if (isRemainingCheap) after.sizeHint(remaining + 1)
after += curr
isBefore = false
}
@@ -263,7 +268,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
- cb.sizeHint(remaining min otherpit.remaining)
+ if (isRemainingCheap && otherpit.isRemainingCheap) cb.sizeHint(remaining min otherpit.remaining)
while (hasNext && otherpit.hasNext) {
cb += ((next, otherpit.next))
}
@@ -271,7 +276,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
def zipAll2combiner[U >: T, S, That](that: RemainsIterator[S], thiselem: U, thatelem: S, cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
- cb.sizeHint(remaining max that.remaining)
+ if (isRemainingCheap && that.isRemainingCheap) cb.sizeHint(remaining max that.remaining)
while (this.hasNext && that.hasNext) cb += ((this.next, that.next))
while (this.hasNext) cb += ((this.next, thatelem))
while (that.hasNext) cb += ((thiselem, that.next))
@@ -330,7 +335,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
/* transformers */
def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
var lst = List[T]()
while (hasNext) lst ::= next
while (lst != Nil) {
@@ -342,7 +347,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
def reverseMap2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
//val cb = cbf(repr)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
var lst = List[S]()
while (hasNext) lst ::= f(next)
while (lst != Nil) {
@@ -354,7 +359,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
def updated2combiner[U >: T, That](index: Int, elem: U, cb: Combiner[U, That]): Combiner[U, That] = {
//val cb = cbf(repr)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
var j = 0
while (hasNext) {
if (j == index) {
@@ -395,6 +400,8 @@ self =>
pits
}
+ def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) = remaining > thresholdFromSize(coll.size, parallelismLevel)
+
/** The number of elements this iterator has yet to traverse. This method
* doesn't change the state of the iterator.
*
diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
index d8c060e719..86624500fd 100644
--- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala
+++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.CtrieIterator
* @author Aleksandar Prokopec
* @since 2.10
*/
-final class ParCtrie[K, V] private[mutable] (private val ctrie: Ctrie[K, V])
+final class ParCtrie[K, V] private[collection] (private val ctrie: Ctrie[K, V])
extends ParMap[K, V]
with GenericParMapTemplate[K, V, ParCtrie]
with ParMapLike[K, V, ParCtrie[K, V], Ctrie[K, V]]
@@ -45,7 +45,7 @@ extends ParMap[K, V]
override def seq = ctrie
- def splitter = new ParCtrieSplitter(ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
+ def splitter = new ParCtrieSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
override def size = ctrie.size
@@ -76,15 +76,21 @@ extends ParMap[K, V]
}
-private[collection] class ParCtrieSplitter[K, V](ct: Ctrie[K, V], mustInit: Boolean)
-extends CtrieIterator[K, V](ct, mustInit)
+private[collection] class ParCtrieSplitter[K, V](lev: Int, ct: Ctrie[K, V], mustInit: Boolean)
+extends CtrieIterator[K, V](lev, ct, mustInit)
with IterableSplitter[(K, V)]
{
// only evaluated if `remaining` is invoked (which is not used by most tasks)
- lazy val totalsize = ct.iterator.size // TODO improve to lazily compute sizes
+ //lazy val totalsize = ct.iterator.size /* TODO improve to lazily compute sizes */
+ def totalsize: Int = throw new UnsupportedOperationException
var iterated = 0
- protected override def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_ct, _mustInit)
+ protected override def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_lev, _ct, _mustInit)
+
+ override def shouldSplitFurther[S](coll: collection.parallel.ParIterable[S], parallelismLevel: Int) = {
+ val maxsplits = 3 + Integer.highestOneBit(parallelismLevel)
+ level < maxsplits
+ }
def dup = null // TODO necessary for views
@@ -95,6 +101,8 @@ extends CtrieIterator[K, V](ct, mustInit)
def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]]
+ override def isRemainingCheap = false
+
def remaining: Int = totalsize - iterated
}
diff --git a/test/benchmarking/ParCtrie-map.scala b/test/benchmarking/ParCtrie-map.scala
new file mode 100644
index 0000000000..c8de99f33e
--- /dev/null
+++ b/test/benchmarking/ParCtrie-map.scala
@@ -0,0 +1,21 @@
+
+
+
+import collection.parallel.mutable.ParCtrie
+
+
+
+object Map extends testing.Benchmark {
+ val length = sys.props("length").toInt
+ val par = sys.props("par").toInt
+ val parctrie = ParCtrie((0 until length) zip (0 until length): _*)
+
+ collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(par)
+
+ def run = {
+ parctrie map {
+ kv => kv
+ }
+ }
+}
+
diff --git a/test/benchmarking/TreeSetInsert.scala b/test/benchmarking/TreeSetInsert.scala
index 9ede8aedc5..23444aa305 100644
--- a/test/benchmarking/TreeSetInsert.scala
+++ b/test/benchmarking/TreeSetInsert.scala
@@ -33,6 +33,7 @@ object JavaUtilTS extends testing.Benchmark {
}
}
+
object MutableTS extends testing.Benchmark {
val length = sys.props("length").toInt
var data: Array[Dummy] = (0 until length) map { a => new Dummy(a) } toArray
@@ -50,6 +51,7 @@ object MutableTS extends testing.Benchmark {
}
}
+
object ImmutableTS extends testing.Benchmark {
val length = sys.props("length").toInt
var data: Array[Dummy] = (0 until length) map { a => new Dummy(a) } toArray