diff options
Diffstat (limited to 'src')
4 files changed, 52 insertions, 46 deletions
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala index e1a72d9511..6ed3a516c4 100644 --- a/src/library/scala/collection/mutable/Ctrie.scala +++ b/src/library/scala/collection/mutable/Ctrie.scala @@ -13,6 +13,7 @@ package mutable import java.util.concurrent.atomic._ import collection.immutable.{ ListMap => ImmutableListMap } +import collection.parallel.mutable.ParCtrie import generic._ import annotation.tailrec import annotation.switch @@ -578,6 +579,8 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai * iterator and clear operations. The cost of evaluating the (lazy) snapshot is * distributed across subsequent updates, thus making snapshot evaluation horizontally scalable. * + * For details, see: http://lampwww.epfl.ch/~prokopec/ctries-snapshot.pdf + * * @author Aleksandar Prokopec * @since 2.10 */ @@ -585,6 +588,7 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai final class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef]) extends ConcurrentMap[K, V] with MapLike[K, V, Ctrie[K, V]] + with CustomParallelizable[(K, V), ParCtrie[K, V]] with Serializable { import Ctrie.computeHash @@ -710,6 +714,10 @@ extends ConcurrentMap[K, V] /* public methods */ + override def seq = this + + override def par = new ParCtrie(this) + override def empty: Ctrie[K, V] = new Ctrie[K, V] @inline final def isReadOnly = rootupdater eq null @@ -820,7 +828,7 @@ extends ConcurrentMap[K, V] def iterator: Iterator[(K, V)] = if (nonReadOnly) readOnlySnapshot().iterator - else new CtrieIterator(this) + else new CtrieIterator(0, this) override def stringPrefix = "Ctrie" @@ -844,7 +852,7 @@ object Ctrie extends MutableMapFactory[Ctrie] { } -private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] { +private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] { var stack = new Array[Array[BasicNode]](7) var stackpos = new Array[Int](7) var depth = -1 @@ -910,7 +918,7 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean } } else current = null - protected def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_ct, _mustInit) + protected def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_lev, _ct, _mustInit) /** Returns a sequence of iterators over subsets of this iterator. * It's used to ease the implementation of splitters for a parallel version of the Ctrie. @@ -920,8 +928,12 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean val it = subiter subiter = null advance() + this.level += 1 Seq(it, this) - } else if (depth == -1) Seq(this) else { + } else if (depth == -1) { + this.level += 1 + Seq(this) + } else { var d = 0 while (d <= depth) { val rem = stack(d).length - 1 - stackpos(d) @@ -929,15 +941,17 @@ private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2) stack(d) = arr1 stackpos(d) = -1 - val it = newIterator(ct, false) + val it = newIterator(level + 1, ct, false) it.stack(0) = arr2 it.stackpos(0) = -1 it.depth = 0 it.advance() // <-- fix it + this.level += 1 return Seq(this, it) } d += 1 } + this.level += 1 Seq(this) } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 32e0e8a8ed..7c5a835e56 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -96,17 +96,6 @@ import annotation.unchecked.uncheckedVariance * The combination of methods `toMap`, `toSeq` or `toSet` along with `par` and `seq` is a flexible * way to change between different collection types. * - * The method: - * - * {{{ - * def threshold(sz: Int, p: Int): Int - * }}} - * - * provides an estimate on the minimum number of elements the collection has before - * the splitting stops and depends on the number of elements in the collection. A rule of the - * thumb is the number of elements divided by 8 times the parallelism level. This method may - * be overridden in concrete implementations if necessary. - * * Since this trait extends the `Iterable` trait, methods like `size` must also * be implemented in concrete collections, while `iterator` forwards to `splitter` by * default. @@ -206,18 +195,6 @@ self: ParIterableLike[T, Repr, Sequential] => */ def isStrictSplitterCollection = true - /** Some minimal number of elements after which this collection should be handled - * sequentially by different processors. - * - * This method depends on the size of the collection and the parallelism level, which - * are both specified as arguments. - * - * @param sz the size based on which to compute the threshold - * @param p the parallelism level based on which to compute the threshold - * @return the maximum number of elements for performing operations sequentially - */ - def threshold(sz: Int, p: Int): Int = thresholdFromSize(sz, p) - /** The `newBuilder` operation returns a parallel builder assigned to this collection's fork/join pool. * This method forwards the call to `newCombiner`. */ @@ -833,7 +810,7 @@ self: ParIterableLike[T, Repr, Sequential] => extends StrictSplitterCheckTask[R, Tp] { protected[this] val pit: IterableSplitter[T] protected[this] def newSubtask(p: IterableSplitter[T]): Accessor[R, Tp] - def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) + def shouldSplitFurther = pit.shouldSplitFurther(self.repr, parallelismLevel) def split = pit.splitWithSignalling.map(newSubtask(_)) // default split procedure private[parallel] override def signalAbort = pit.abort override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")(supername: " + super.toString + ")" @@ -1362,7 +1339,7 @@ self: ParIterableLike[T, Repr, Sequential] => /* scan tree */ - protected[this] def scanBlockSize = (threshold(size, parallelismLevel) / 2) max 1 + protected[this] def scanBlockSize = (thresholdFromSize(size, parallelismLevel) / 2) max 1 protected[this] trait ScanTree[U >: T] { def beginsAt: Int diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index e8b332da89..8ed4583419 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -28,6 +28,11 @@ private[collection] trait RemainsIterator[+T] extends Iterator[T] { * This method doesn't change the state of the iterator. */ def remaining: Int + + /** For most collections, this is a cheap operation. + * Exceptions can override this method. + */ + def isRemainingCheap = true } @@ -112,7 +117,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { //val cb = pbf(repr) - cb.sizeHint(remaining) + if (isRemainingCheap) cb.sizeHint(remaining) while (hasNext) cb += f(next) cb } @@ -137,7 +142,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ } def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](b: Bld): Bld = { - b.sizeHint(remaining) + if (isRemainingCheap) b.sizeHint(remaining) while (hasNext) b += next b } @@ -179,7 +184,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ def drop2combiner[U >: T, This](n: Int, cb: Combiner[U, This]): Combiner[U, This] = { drop(n) - cb.sizeHint(remaining) + if (isRemainingCheap) cb.sizeHint(remaining) while (hasNext) cb += next cb } @@ -197,7 +202,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ def splitAt2combiners[U >: T, This](at: Int, before: Combiner[U, This], after: Combiner[U, This]) = { before.sizeHint(at) - after.sizeHint(remaining - at) + if (isRemainingCheap) after.sizeHint(remaining - at) var left = at while (left > 0) { before += next @@ -223,7 +228,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ val curr = next if (p(curr)) before += curr else { - after.sizeHint(remaining + 1) + if (isRemainingCheap) after.sizeHint(remaining + 1) after += curr isBefore = false } @@ -263,7 +268,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ } def zip2combiner[U >: T, S, That](otherpit: RemainsIterator[S], cb: Combiner[(U, S), That]): Combiner[(U, S), That] = { - cb.sizeHint(remaining min otherpit.remaining) + if (isRemainingCheap && otherpit.isRemainingCheap) cb.sizeHint(remaining min otherpit.remaining) while (hasNext && otherpit.hasNext) { cb += ((next, otherpit.next)) } @@ -271,7 +276,7 @@ private[collection] trait AugmentedIterableIterator[+T] extends RemainsIterator[ } def zipAll2combiner[U >: T, S, That](that: RemainsIterator[S], thiselem: U, thatelem: S, cb: Combiner[(U, S), That]): Combiner[(U, S), That] = { - cb.sizeHint(remaining max that.remaining) + if (isRemainingCheap && that.isRemainingCheap) cb.sizeHint(remaining max that.remaining) while (this.hasNext && that.hasNext) cb += ((this.next, that.next)) while (this.hasNext) cb += ((this.next, thatelem)) while (that.hasNext) cb += ((thiselem, that.next)) @@ -330,7 +335,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter /* transformers */ def reverse2combiner[U >: T, This](cb: Combiner[U, This]): Combiner[U, This] = { - cb.sizeHint(remaining) + if (isRemainingCheap) cb.sizeHint(remaining) var lst = List[T]() while (hasNext) lst ::= next while (lst != Nil) { @@ -342,7 +347,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter def reverseMap2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { //val cb = cbf(repr) - cb.sizeHint(remaining) + if (isRemainingCheap) cb.sizeHint(remaining) var lst = List[S]() while (hasNext) lst ::= f(next) while (lst != Nil) { @@ -354,7 +359,7 @@ private[collection] trait AugmentedSeqIterator[+T] extends AugmentedIterableIter def updated2combiner[U >: T, That](index: Int, elem: U, cb: Combiner[U, That]): Combiner[U, That] = { //val cb = cbf(repr) - cb.sizeHint(remaining) + if (isRemainingCheap) cb.sizeHint(remaining) var j = 0 while (hasNext) { if (j == index) { @@ -395,6 +400,8 @@ self => pits } + def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) = remaining > thresholdFromSize(coll.size, parallelismLevel) + /** The number of elements this iterator has yet to traverse. This method * doesn't change the state of the iterator. * diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala index d8c060e719..86624500fd 100644 --- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala +++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala @@ -27,7 +27,7 @@ import scala.collection.mutable.CtrieIterator * @author Aleksandar Prokopec * @since 2.10 */ -final class ParCtrie[K, V] private[mutable] (private val ctrie: Ctrie[K, V]) +final class ParCtrie[K, V] private[collection] (private val ctrie: Ctrie[K, V]) extends ParMap[K, V] with GenericParMapTemplate[K, V, ParCtrie] with ParMapLike[K, V, ParCtrie[K, V], Ctrie[K, V]] @@ -45,7 +45,7 @@ extends ParMap[K, V] override def seq = ctrie - def splitter = new ParCtrieSplitter(ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true) + def splitter = new ParCtrieSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true) override def size = ctrie.size @@ -76,15 +76,21 @@ extends ParMap[K, V] } -private[collection] class ParCtrieSplitter[K, V](ct: Ctrie[K, V], mustInit: Boolean) -extends CtrieIterator[K, V](ct, mustInit) +private[collection] class ParCtrieSplitter[K, V](lev: Int, ct: Ctrie[K, V], mustInit: Boolean) +extends CtrieIterator[K, V](lev, ct, mustInit) with IterableSplitter[(K, V)] { // only evaluated if `remaining` is invoked (which is not used by most tasks) - lazy val totalsize = ct.iterator.size // TODO improve to lazily compute sizes + //lazy val totalsize = ct.iterator.size /* TODO improve to lazily compute sizes */ + def totalsize: Int = throw new UnsupportedOperationException var iterated = 0 - protected override def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_ct, _mustInit) + protected override def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_lev, _ct, _mustInit) + + override def shouldSplitFurther[S](coll: collection.parallel.ParIterable[S], parallelismLevel: Int) = { + val maxsplits = 3 + Integer.highestOneBit(parallelismLevel) + level < maxsplits + } def dup = null // TODO necessary for views @@ -95,6 +101,8 @@ extends CtrieIterator[K, V](ct, mustInit) def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]] + override def isRemainingCheap = false + def remaining: Int = totalsize - iterated } |