summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-02-02 19:59:12 +0100
committerAleksandar Prokopec <axel22@gmail.com>2012-02-02 19:59:12 +0100
commit2d9dfe3077fa2b43a336548cad98a522215c52a9 (patch)
treec0d8a801230fc8b0984e0ae416a4bd21c4ded35f
parentc3d19c58d8a94b7232718321f6994c001257cc96 (diff)
downloadscala-2d9dfe3077fa2b43a336548cad98a522215c52a9.tar.gz
scala-2d9dfe3077fa2b43a336548cad98a522215c52a9.tar.bz2
scala-2d9dfe3077fa2b43a336548cad98a522215c52a9.zip
Add parallel Ctrie parallel collection.
-rw-r--r--src/library/scala/collection/mutable/Ctrie.scala8
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParCtrie.scala139
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashMap.scala1
-rw-r--r--src/library/scala/collection/parallel/package.scala11
5 files changed, 151 insertions, 10 deletions
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala
index 84cceb44eb..e1a72d9511 100644
--- a/src/library/scala/collection/mutable/Ctrie.scala
+++ b/src/library/scala/collection/mutable/Ctrie.scala
@@ -844,7 +844,7 @@ object Ctrie extends MutableMapFactory[Ctrie] {
}
-private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
+private[collection] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
var stack = new Array[Array[BasicNode]](7)
var stackpos = new Array[Int](7)
var depth = -1
@@ -910,10 +910,12 @@ private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean =
}
} else current = null
+ protected def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_ct, _mustInit)
+
/** Returns a sequence of iterators over subsets of this iterator.
* It's used to ease the implementation of splitters for a parallel version of the Ctrie.
*/
- protected def subdivide: Seq[Iterator[(K, V)]] = if (subiter ne null) {
+ protected def subdivide(): Seq[Iterator[(K, V)]] = if (subiter ne null) {
// the case where an LNode is being iterated
val it = subiter
subiter = null
@@ -927,7 +929,7 @@ private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean =
val (arr1, arr2) = stack(d).drop(stackpos(d) + 1).splitAt(rem / 2)
stack(d) = arr1
stackpos(d) = -1
- val it = new CtrieIterator[K, V](ct, false)
+ val it = newIterator(ct, false)
it.stack(0) = arr2
it.stackpos(0) = -1
it.depth = 0
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index b24497371d..32e0e8a8ed 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -451,7 +451,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
reduce((x, y) => if (cmp.lteq(f(x), f(y))) x else y)
}
-
+
def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = if (bf(repr).isCombiner) {
executeAndWaitResult(new Map[S, That](f, combinerFactory(() => bf(repr).asCombiner), splitter) mapResult { _.result })
} else seq.map(f)(bf2seq(bf))
diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
new file mode 100644
index 0000000000..d8c060e719
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
@@ -0,0 +1,139 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.collection.parallel.mutable
+
+
+
+import scala.collection.generic._
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.IterableSplitter
+import scala.collection.mutable.Ctrie
+import scala.collection.mutable.CtrieIterator
+
+
+
+/** Parallel Ctrie collection.
+ *
+ * It has its bulk operations parallelized, but uses the snapshot operation
+ * to create the splitter. This means that parallel bulk operations can be
+ * called concurrently with the modifications.
+ *
+ * @author Aleksandar Prokopec
+ * @since 2.10
+ */
+final class ParCtrie[K, V] private[mutable] (private val ctrie: Ctrie[K, V])
+extends ParMap[K, V]
+ with GenericParMapTemplate[K, V, ParCtrie]
+ with ParMapLike[K, V, ParCtrie[K, V], Ctrie[K, V]]
+ with ParCtrieCombiner[K, V]
+ with Serializable
+{
+
+ def this() = this(new Ctrie)
+
+ override def mapCompanion: GenericParMapCompanion[ParCtrie] = ParCtrie
+
+ override def empty: ParCtrie[K, V] = ParCtrie.empty
+
+ protected[this] override def newCombiner = ParCtrie.newCombiner
+
+ override def seq = ctrie
+
+ def splitter = new ParCtrieSplitter(ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
+
+ override def size = ctrie.size
+
+ override def clear() = ctrie.clear()
+
+ def result = this
+
+ def get(key: K): Option[V] = ctrie.get(key)
+
+ def put(key: K, value: V): Option[V] = ctrie.put(key, value)
+
+ def update(key: K, value: V): Unit = ctrie.update(key, value)
+
+ def remove(key: K): Option[V] = ctrie.remove(key)
+
+ def +=(kv: (K, V)): this.type = {
+ ctrie.+=(kv)
+ this
+ }
+
+ def -=(key: K): this.type = {
+ ctrie.-=(key)
+ this
+ }
+
+ override def stringPrefix = "ParCtrie"
+
+}
+
+
+private[collection] class ParCtrieSplitter[K, V](ct: Ctrie[K, V], mustInit: Boolean)
+extends CtrieIterator[K, V](ct, mustInit)
+ with IterableSplitter[(K, V)]
+{
+ // only evaluated if `remaining` is invoked (which is not used by most tasks)
+ lazy val totalsize = ct.iterator.size // TODO improve to lazily compute sizes
+ var iterated = 0
+
+ protected override def newIterator(_ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_ct, _mustInit)
+
+ def dup = null // TODO necessary for views
+
+ override def next() = {
+ iterated += 1
+ super.next()
+ }
+
+ def split: Seq[IterableSplitter[(K, V)]] = subdivide().asInstanceOf[Seq[IterableSplitter[(K, V)]]]
+
+ def remaining: Int = totalsize - iterated
+}
+
+
+/** Only used within the `ParCtrie`. */
+private[mutable] trait ParCtrieCombiner[K, V] extends Combiner[(K, V), ParCtrie[K, V]] {
+
+ def combine[N <: (K, V), NewTo >: ParCtrie[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this eq other) this else {
+ throw new UnsupportedOperationException("This shouldn't have been called in the first place.")
+
+ val thiz = this.asInstanceOf[ParCtrie[K, V]]
+ val that = other.asInstanceOf[ParCtrie[K, V]]
+ val result = new ParCtrie[K, V]
+
+ result ++= thiz.iterator
+ result ++= that.iterator
+
+ result
+ }
+
+ override def canBeShared = true
+
+}
+
+
+object ParCtrie extends ParMapFactory[ParCtrie] {
+
+ def empty[K, V]: ParCtrie[K, V] = new ParCtrie[K, V]
+
+ def newCombiner[K, V]: Combiner[(K, V), ParCtrie[K, V]] = new ParCtrie[K, V]
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParCtrie[K, V]] = new CanCombineFromMap[K, V]
+
+}
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 3b4d3dc0b0..15ffd3fdd2 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -12,7 +12,6 @@ package mutable
-
import collection.generic._
import collection.mutable.DefaultEntry
import collection.mutable.HashEntry
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index cdb9944fdc..8f19d0ecdb 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -196,22 +196,23 @@ package parallel {
* the receiver (which will be the return value).
*/
private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
- (private val bucketnumber: Int)
+ (private val bucketnumber: Int)
extends Combiner[Elem, To] {
//self: EnvironmentPassingCombiner[Elem, To] =>
protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
protected var sz: Int = 0
-
+
def size = sz
-
+
def clear() = {
buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
sz = 0
}
-
+
def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+
def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
-
+
def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = {
if (this eq other) this
else other match {