summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/mutable/Ctrie.scala24
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala27
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala27
-rw-r--r--src/library/scala/collection/parallel/mutable/ParCtrie.scala20
-rw-r--r--test/benchmarking/ParCtrie-map.scala21
-rw-r--r--test/benchmarking/TreeSetInsert.scala2
6 files changed, 75 insertions, 46 deletions
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala
index e1a72d9511..6ed3a516c4 100644
--- a/src/library/scala/collection/mutable/Ctrie.scala
+++ b/src/library/scala/collection/mutable/Ctrie.scala
@@ -13,6 +13,7 @@ package mutable
import java.util.concurrent.atomic._
import collection.immutable.{ ListMap => ImmutableListMap }
+import collection.parallel.mutable.ParCtrie
import generic._
import annotation.tailrec
import annotation.switch
@@ -578,6 +579,8 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai
* iterator and clear operations. The cost of evaluating the (lazy) snapshot is
* distributed across subsequent updates, thus making snapshot evaluation horizontally scalable.
*
+ * For details, see: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf
+ *
* @author Aleksandar Prokopec
* @since 2.10
*/
@@ -585,6 +588,7 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai
final class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef])
extends ConcurrentMap[K, V]
with MapLike[K, V, Ctrie[K, V]]
+ with CustomParallelizable[(K, V), ParCtrie[K, V]]
with Serializable
{
import Ctrie.computeHash
@@ -710,6 +714,10 @@ extends ConcurrentMap[K, V]
/* public methods */
+ override def seq = this
+
+ override def par = new ParCtrie(this)
+
override def empty: Ctrie[K, V] = new Ctrie[K, V]
@inline final def isReadOnly = rootupdater eq null
@@ -820,7 +828,7 @@ extends ConcurrentMap[K, V]
def iterator: Iterator[(K, V)] =
if (nonReadOnly) readOnlySnapshot().iterator
- else new CtrieIterator(this)
+ else new CtrieIterator(0, this)
override def stringPrefix = "Ctrie"
@@ -844,7 +852,7 @@ object Ctrie extends MutableMapFactory[Ctrie] {
}
-private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
+private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
var stack = new Array[Array[BasicNode]](7)
var stackpos = new Array[Int](7)
var depth = -1
@@ -910,7 +918,7 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
}
} else current = null
- protected def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_ct, _mustInit)
+ protected def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_lev, _ct, _mustInit)
/** Returns a sequence of iterators over subsets of this iterator.
* It's used to ease the implementation of splitters for a parallel version of the Ctrie.
@@ -920,8 +928,12 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
val it = subiter
subiter = null
advance()
+ this.level += 1
Seq(it, this)
- } else if (depth == -1) Seq(this) else {
+ } else if (depth == -1) {
+ this.level += 1
+ Seq(this)
+ } else {
var d = 0
while (d <= depth) {
val rem = stack(d).length - 1 - stackpos(d)
@@ -929,15 +941,17 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean
val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2)
stack(d) = arr1
stackpos(d) = -1
- val it = newIterator(ct, false)
+ val it = newIterator(level + 1, ct, false)
it.stack(0) = arr2
it.stackpos(0) = -1
it.depth = 0
it.advance() // <-- fix it
+ this.level += 1
return Seq(this, it)
}
d += 1
}
+ this.level += 1
Seq(this)
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 32e0e8a8ed..7c5a835e56 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -96,17 +96,6 @@ import annotation.unchecked.uncheckedVariance
* The combination of methods `toMap`, `toSeq` or `toSet` along with `par` and `seq` is a flexible
* way to change between different collection types.
*
- * The method:
- *
- * {{{
- * def threshold(sz: Int, p: Int): Int
- * }}}
- *
- * provides an estimate on the minimum number of elements the collection has before
- * the splitting stops and depends on the number of elements in the collection. A rule of the
- * thumb is the number of elements divided by 8 times the parallelism level. This method may
- * be overridden in concrete implementations if necessary.
- *
* Since this trait extends the `Iterable` trait, methods like `size` must also
* be implemented in concrete collections, while `iterator` forwards to `splitter` by
* default.
@@ -206,18 +195,6 @@ self: ParIterableLike[T, Repr, Sequential] =>
*/
def isStrictSplitterCollection = true
- /** Some minimal number of elements after which this collection should be handled
- * sequentially by different processors.
- *
- * This method depends on the size of the collection and the parallelism level, which
- * are both specified as arguments.
- *
- * @param sz the size based on which to compute the threshold
- * @param p the parallelism level based on which to compute the threshold
- * @return the maximum number of elements for performing operations sequentially
- */
- def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p)
-
/** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool.
* This method forwards the call to `newCombiner`.
*/
@@ -833,7 +810,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends StrictSplitterCheckTask[R, Tp] {
protected[this] val pit: IterableSplitter[T]
protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp]
- def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
+ def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel)
def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")"
@@ -1362,7 +1339,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
/* scan tree */
- protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1
+ protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1
protected[this] trait ScanTree[U >: T] {
def beginsAt: Int
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index e8b332da89..8ed4583419 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -28,6 +28,11 @@ private[collection] trait RemainsIterator[+T] extends Iterator[T] {
* This method doesn't change the state of the iterator.
*/
def remaining: Int
+
+ /** For most collections, this is a cheap operation.
+ * Exceptions can override this method.
+ */
+ def isRemainingCheap = true
}
@@ -112,7 +117,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
//val cb = pbf(repr)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
while (hasNext) cb += f(next)
cb
}
@@ -137,7 +142,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = {
- b.sizeHint(remaining)
+ if (isRemainingCheap) b.sizeHint(remaining)
while (hasNext) b += next
b
}
@@ -179,7 +184,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
def drop2combiner[U >: T, This](n: Int, cb: Combiner[U, This]): Combiner[U, This] = {
drop(n)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
while (hasNext) cb += next
cb
}
@@ -197,7 +202,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
def splitAt2combiners[U >: T, This](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = {
before.sizeHint(at)
- after.sizeHint(remaining - at)
+ if (isRemainingCheap) after.sizeHint(remaining - at)
var left = at
while (left > 0) {
before += next
@@ -223,7 +228,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
val curr = next
if (p(curr)) before += curr
else {
- after.sizeHint(remaining + 1)
+ if (isRemainingCheap) after.sizeHint(remaining + 1)
after += curr
isBefore = false
}
@@ -263,7 +268,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
- cb.sizeHint(remaining min otherpit.remaining)
+ if (isRemainingCheap && otherpit.isRemainingCheap) cb.sizeHint(remaining min otherpit.remaining)
while (hasNext && otherpit.hasNext) {
cb += ((next, otherpit.next))
}
@@ -271,7 +276,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[
}
def zipAll2combiner[U >: T, S, That](that: RemainsIterator[S], thiselem: U, thatelem: S, cb: Combiner[(U, S), That]): Combiner[(U, S), That] = {
- cb.sizeHint(remaining max that.remaining)
+ if (isRemainingCheap && that.isRemainingCheap) cb.sizeHint(remaining max that.remaining)
while (this.hasNext && that.hasNext) cb += ((this.next, that.next))
while (this.hasNext) cb += ((this.next, thatelem))
while (that.hasNext) cb += ((thiselem, that.next))
@@ -330,7 +335,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
/* transformers */
def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = {
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
var lst = List[T]()
while (hasNext) lst ::= next
while (lst != Nil) {
@@ -342,7 +347,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
def reverseMap2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
//val cb = cbf(repr)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
var lst = List[S]()
while (hasNext) lst ::= f(next)
while (lst != Nil) {
@@ -354,7 +359,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter
def updated2combiner[U >: T, That](index: Int, elem: U, cb: Combiner[U, That]): Combiner[U, That] = {
//val cb = cbf(repr)
- cb.sizeHint(remaining)
+ if (isRemainingCheap) cb.sizeHint(remaining)
var j = 0
while (hasNext) {
if (j == index) {
@@ -395,6 +400,8 @@ self =>
pits
}
+ def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) = remaining > thresholdFromSize(coll.size, parallelismLevel)
+
/** The number of elements this iterator has yet to traverse. This method
* doesn't change the state of the iterator.
*
diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
index d8c060e719..86624500fd 100644
--- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala
+++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.CtrieIterator
* @author Aleksandar Prokopec
* @since 2.10
*/
-final class ParCtrie[K, V] private[mutable] (private val ctrie: Ctrie[K, V])
+final class ParCtrie[K, V] private[collection] (private val ctrie: Ctrie[K, V])
extends ParMap[K, V]
with GenericParMapTemplate[K, V, ParCtrie]
with ParMapLike[K, V, ParCtrie[K, V], Ctrie[K, V]]
@@ -45,7 +45,7 @@ extends ParMap[K, V]
override def seq = ctrie
- def splitter = new ParCtrieSplitter(ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
+ def splitter = new ParCtrieSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
override def size = ctrie.size
@@ -76,15 +76,21 @@ extends ParMap[K, V]
}
-private[collection] class ParCtrieSplitter[K, V](ct: Ctrie[K, V], mustInit: Boolean)
-extends CtrieIterator[K, V](ct, mustInit)
+private[collection] class ParCtrieSplitter[K, V](lev: Int, ct: Ctrie[K, V], mustInit: Boolean)
+extends CtrieIterator[K, V](lev, ct, mustInit)
with IterableSplitter[(K, V)]
{
// only evaluated if `remaining` is invoked (which is not used by most tasks)
- lazy val totalsize = ct.iterator.size // TODO improve to lazily compute sizes
+ //lazy val totalsize = ct.iterator.size /* TODO improve to lazily compute sizes */
+ def totalsize: Int = throw new UnsupportedOperationException
var iterated = 0
- protected override def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_ct, _mustInit)
+ protected override def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_lev, _ct, _mustInit)
+
+ override def shouldSplitFurther[S](coll: collection.parallel.ParIterable[S], parallelismLevel: Int) = {
+ val maxsplits = 3 + Integer.highestOneBit(parallelismLevel)
+ level < maxsplits
+ }
def dup = null // TODO necessary for views
@@ -95,6 +101,8 @@ extends CtrieIterator[K, V](ct, mustInit)
def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]]
+ override def isRemainingCheap = false
+
def remaining: Int = totalsize - iterated
}
diff --git a/test/benchmarking/ParCtrie-map.scala b/test/benchmarking/ParCtrie-map.scala
new file mode 100644
index 0000000000..c8de99f33e
--- /dev/null
+++ b/test/benchmarking/ParCtrie-map.scala
@@ -0,0 +1,21 @@
+
+
+
+import collection.parallel.mutable.ParCtrie
+
+
+
+object Map extends testing.Benchmark {
+ val length = sys.props("length").toInt
+ val par = sys.props("par").toInt
+ val parctrie = ParCtrie((0 until length) zip (0 until length): _*)
+
+ collection.parallel.ForkJoinTasks.defaultForkJoinPool.setParallelism(par)
+
+ def run = {
+ parctrie map {
+ kv => kv
+ }
+ }
+}
+
diff --git a/test/benchmarking/TreeSetInsert.scala b/test/benchmarking/TreeSetInsert.scala
index 9ede8aedc5..23444aa305 100644
--- a/test/benchmarking/TreeSetInsert.scala
+++ b/test/benchmarking/TreeSetInsert.scala
@@ -33,6 +33,7 @@ object JavaUtilTS extends testing.Benchmark {
}
}
+
object MutableTS extends testing.Benchmark {
val length = sys.props("length").toInt
var data: Array[Dummy] = (0 until length) map { a => new Dummy(a) } toArray
@@ -50,6 +51,7 @@ object MutableTS extends testing.Benchmark {
}
}
+
object ImmutableTS extends testing.Benchmark {
val length = sys.props("length").toInt
var data: Array[Dummy] = (0 until length) map { a => new Dummy(a) } toArray