summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/package.scala
diff options
context:
space:
mode:
authorAleksandar Prokopec <aleksandar.prokopec@epfl.ch>2010-10-20 20:19:56 +0000
committerAleksandar Prokopec <aleksandar.prokopec@epfl.ch>2010-10-20 20:19:56 +0000
commitd3d218e5ea77584489437f0dfa8148ee3764d6f7 (patch)
tree881fba9234da6654e8d914c8b56ddadd100c5cba /src/library/scala/collection/parallel/package.scala
parentd13a2529aa8218836d13ee04303da4f3325933c2 (diff)
downloadscala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.tar.gz
scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.tar.bz2
scala-d3d218e5ea77584489437f0dfa8148ee3764d6f7.zip
Further work on parallel mutable hash maps.
Changed HashTable interface. Fixed one test. Implemented hash map iterators. Implementing hash map combiners. Extracting common functionality of bucket-based combiners. No review.
Diffstat (limited to 'src/library/scala/collection/parallel/package.scala')
-rw-r--r--src/library/scala/collection/parallel/package.scala141
1 files changed, 137 insertions, 4 deletions
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 76677a1148..a30d564039 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -7,13 +7,21 @@ import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
+import annotation.unchecked.uncheckedVariance
+
/** Package object for parallel collections.
*/
package object parallel {
- val MIN_FOR_COPY = -1 // TODO: set to 5000
+
+ /* constants */
+ val MIN_FOR_COPY = -1
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
+ val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
+ private[parallel] val unrolledsize = 16
+
+ /* functions */
/** Computes threshold from the size of the collection and the parallelism level.
*/
@@ -23,11 +31,136 @@ package object parallel {
else sz
}
- val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
+ private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
+
+ private[parallel] def unsupported = throw new UnsupportedOperationException
+
+ /* classes */
- def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
+ /** Unrolled list node.
+ */
+ private[parallel] class Unrolled[T: ClassManifest] {
+ var size = 0
+ var array = new Array[T](unrolledsize)
+ var next: Unrolled[T] = null
+ // adds and returns itself or the new unrolled if full
+ def add(elem: T): Unrolled[T] = if (size < unrolledsize) {
+ array(size) = elem
+ size += 1
+ this
+ } else {
+ next = new Unrolled[T]
+ next.add(elem)
+ }
+ def foreach[U](f: T => U) {
+ var unrolled = this
+ var i = 0
+ while (unrolled ne null) {
+ val chunkarr = unrolled.array
+ val chunksz = unrolled.size
+ while (i < chunksz) {
+ val elem = chunkarr(i)
+ f(elem)
+ i += 1
+ }
+ i = 0
+ unrolled = unrolled.next
+ }
+ }
+ override def toString = array.mkString("Unrolled(", ", ", ")")
+ }
+
+ /** A helper iterator for iterating very small array buffers.
+ * Automatically forwards the signal delegate when splitting.
+ */
+ private[parallel] class BufferIterator[T]
+ (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
+ extends ParIterableIterator[T] {
+ def hasNext = index < until
+ def next = {
+ val r = buffer(index)
+ index += 1
+ r
+ }
+ def remaining = until - index
+ def split: Seq[ParIterableIterator[T]] = if (remaining > 1) {
+ val divsz = (until - index) / 2
+ Seq(
+ new BufferIterator(buffer, index, index + divsz, signalDelegate),
+ new BufferIterator(buffer, index + divsz, until, signalDelegate)
+ )
+ } else Seq(this)
+ }
+
+ /** A helper combiner which contains an array of buckets. Buckets themselves
+ * are unrolled linked lists. Some parallel collections are constructed by
+ * sorting their result set according to some criteria.
+ *
+ * A reference `heads` to bucket heads is maintained, as well as a reference
+ * `lasts` to the last unrolled list node. Size is kept in `sz` and maintained
+ * whenever 2 bucket combiners are combined.
+ *
+ * Clients decide how to maintain these by implementing `+=` and `result`.
+ * Populating and using the buckets is up to the client.
+ * Note that in general the type of the elements contained in the buckets `Buck`
+ * doesn't have to correspond to combiner element type `Elem`.
+ *
+ * This class simply gives an efficient `combine` for free - it chains
+ * the buckets together. Since the `combine` contract states that the receiver (`this`)
+ * becomes invalidated, `combine` reuses the receiver and returns it.
+ *
+ * Methods `beforeCombine` and `afterCombine` are called before and after
+ * combining the buckets, respectively, given that the argument to `combine`
+ * is not `this` (as required by the `combine` contract).
+ * They can be overridden in subclasses to provide custom behaviour by modifying
+ * the receiver (which will be the return value).
+ */
+ private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
+ (private val bucketnumber: Int)
+ extends Combiner[Elem, To] {
+ self: EnvironmentPassingCombiner[Elem, To] =>
+ protected var heads: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var lasts: Array[Unrolled[Buck]] @uncheckedVariance = new Array[Unrolled[Buck]](bucketnumber)
+ protected var sz: Int = 0
+
+ def size = sz
+
+ def clear = {
+ heads = new Array[Unrolled[Buck]](bucketnumber)
+ lasts = new Array[Unrolled[Buck]](bucketnumber)
+ sz = 0
+ }
+
+ def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+ def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
+
+ def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
+ if (other.isInstanceOf[BucketCombiner[_, _, _, _]]) {
+ beforeCombine(other)
+
+ val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
+ var i = 0
+ while (i < bucketnumber) {
+ if (lasts(i) eq null) {
+ heads(i) = that.heads(i)
+ lasts(i) = that.lasts(i)
+ } else {
+ lasts(i).next = that.heads(i)
+ if (that.lasts(i) ne null) lasts(i) = that.lasts(i)
+ }
+ i += 1
+ }
+ sz = sz + that.size
+
+ afterCombine(other)
+
+ this
+ } else error("Unexpected combiner type.")
+ } else this
+
+ }
- def unsupported = throw new UnsupportedOperationException
+ /* implicit conversions */
/** An implicit conversion providing arrays with a `par` method, which
* returns a parallel array.