author     Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>   2010-11-27 22:13:27 +0000
committer  Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>   2010-11-27 22:13:27 +0000
commit     49d47cb3723c9d3160e3f215808b571a05d1b252 (patch)
tree       631787bc9e2f67613402eff45207a3fbd098fd12 /src
parent     14e47d131b1d395a3c636bef815706dd60171011 (diff)
Refactored parallel collections for pluggable task support implementations. No review.
Diffstat (limited to 'src')
-rw-r--r--  src/library/scala/collection/generic/GenericParTemplate.scala        | 12
-rw-r--r--  src/library/scala/collection/parallel/Combiner.scala                  |  8
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala          | 29
-rw-r--r--  src/library/scala/collection/parallel/ParIterableViewLike.scala      |  5
-rw-r--r--  src/library/scala/collection/parallel/ParSeqLike.scala               |  1
-rw-r--r--  src/library/scala/collection/parallel/ParSeqViewLike.scala           |  3
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashMap.scala     |  4
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashSet.scala     |  3
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParArray.scala         | 17
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala | 17
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala       |  3
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashSet.scala       |  5
-rw-r--r--  src/library/scala/collection/parallel/package.scala                  |  2
13 files changed, 50 insertions(+), 49 deletions(-)
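
The change turns task scheduling into a pluggable component: instead of parallel collections and combiners mixing in TaskSupport directly, each now holds a tasksupport member (obtained through getTaskSupport) whose operations are imported where needed, so a different execution backend can be substituted later. Below is a minimal Scala sketch of that design, not the library's actual API; TaskSupportSketch, SimpleTaskSupport, ThreadedTaskSupport and ParCollectionSketch are illustrative names only.

// Minimal sketch of "pluggable task support" (illustrative, not scala.collection.parallel's API).
trait TaskSupportSketch {
  // runs a unit of work and waits for its result; a real implementation would
  // split the work into tasks and schedule them on a pool of workers
  def executeAndWaitResult[R](body: () => R): R
}

// trivial backend: runs the work on the calling thread
class SimpleTaskSupport extends TaskSupportSketch {
  def executeAndWaitResult[R](body: () => R): R = body()
}

// alternative backend, showing that the execution strategy can be swapped:
// runs the work on a freshly spawned thread and joins it
class ThreadedTaskSupport extends TaskSupportSketch {
  def executeAndWaitResult[R](body: () => R): R = {
    var res: Option[R] = None
    val t = new Thread { override def run(): Unit = { res = Some(body()) } }
    t.start(); t.join()
    res.get
  }
}

// a collection that *has* a task support object instead of *being* one
class ParCollectionSketch[T](elems: Seq[T]) {
  var tasksupport: TaskSupportSketch = new SimpleTaskSupport   // pluggable
  def reduceSketch(op: (T, T) => T): T =
    tasksupport.executeAndWaitResult(() => elems.reduce(op))
}

object TaskSupportDemo extends App {
  val xs = new ParCollectionSketch(1 to 10)
  println(xs.reduceSketch(_ + _))   // 55, via SimpleTaskSupport
  xs.tasksupport = new ThreadedTaskSupport
  println(xs.reduceSketch(_ + _))   // 55 again, via the swapped-in backend
}

In the commit itself, the collections obtain their backend through getTaskSupport and expose it as the private[collection] tasksupport field visible in the diffs below.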
diff --git a/src/library/scala/collection/generic/GenericParTemplate.scala b/src/library/scala/collection/generic/GenericParTemplate.scala
index 1aac428438..1c3c43269d 100644
--- a/src/library/scala/collection/generic/GenericParTemplate.scala
+++ b/src/library/scala/collection/generic/GenericParTemplate.scala
@@ -25,15 +25,16 @@ import annotation.unchecked.uncheckedVariance
trait GenericParTemplate[+A, +CC[X] <: ParIterable[X]]
extends GenericTraversableTemplate[A, CC]
with HasNewCombiner[A, CC[A] @uncheckedVariance]
- with TaskSupport
{
+ private[collection] def tasksupport: TaskSupport
+
def companion: GenericCompanion[CC] with GenericParCompanion[CC]
protected[this] override def newBuilder: collection.mutable.Builder[A, CC[A]] = newCombiner
protected[this] override def newCombiner: Combiner[A, CC[A]] = {
val cb = companion.newCombiner[A]
- cb.environment = environment
+ cb.tasksupport.environment = tasksupport.environment
cb
}
@@ -41,7 +42,7 @@ extends GenericTraversableTemplate[A, CC]
def genericCombiner[B]: Combiner[B, CC[B]] = {
val cb = companion.newCombiner[B]
- cb.environment = environment
+ cb.tasksupport.environment = tasksupport.environment
cb
}
@@ -49,13 +50,14 @@ extends GenericTraversableTemplate[A, CC]
trait GenericParMapTemplate[K, +V, +CC[X, Y] <: ParMap[X, Y]]
-extends TaskSupport
{
+ private[collection] def tasksupport: TaskSupport
+
def mapCompanion: GenericParMapCompanion[CC]
def genericMapCombiner[P, Q]: Combiner[(P, Q), CC[P, Q]] = {
val cb = mapCompanion.newCombiner[P, Q]
- cb.environment = environment
+ cb.tasksupport.environment = tasksupport.environment
cb
}
}
diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala
index f47f92457f..93522185fb 100644
--- a/src/library/scala/collection/parallel/Combiner.scala
+++ b/src/library/scala/collection/parallel/Combiner.scala
@@ -18,8 +18,9 @@ import scala.collection.generic.Sizing
*
* @author prokopec
*/
-trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel with TaskSupport {
+trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel {
self: EnvironmentPassingCombiner[Elem, To] =>
+ private[collection] final val tasksupport = getTaskSupport
type EPC = EnvironmentPassingCombiner[Elem, To]
@@ -56,7 +57,10 @@ self: EnvironmentPassingCombiner[Elem, To] =>
trait EnvironmentPassingCombiner[-Elem, +To] extends Combiner[Elem, To] {
abstract override def result = {
val res = super.result
- if (res.isInstanceOf[TaskSupport]) res.asInstanceOf[TaskSupport].environment = environment
+ res match {
+ case pc: ParIterableLike[_, _, _] => pc.tasksupport.environment = tasksupport.environment
+ case _ =>
+ }
res
}
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index b3e7ea6072..9cbe6fc655 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -123,9 +123,12 @@ extends IterableLike[T, Repr]
with Sequentializable[T, Sequential]
with Parallel
with HasNewCombiner[T, Repr]
- with TaskSupport {
+{
self =>
+ private[collection] final val tasksupport: TaskSupport = getTaskSupport
+ import tasksupport._
+
/** Parallel iterators are split iterators that have additional accessor and
* transformer methods defined in terms of methods `next` and `hasNext`.
* When creating a new parallel collection, one might want to override these
@@ -682,7 +685,7 @@ self =>
override def toMap[K, V](implicit ev: T <:< (K, V)): collection.immutable.Map[K, V] = seq.toMap
- override def toParIterable = this.asInstanceOf[ParIterable[T]] // TODO add a type bound on Repr
+ override def toParIterable = this.asInstanceOf[ParIterable[T]]
override def toParSeq = seq.toParSeq
@@ -692,7 +695,7 @@ self =>
/* tasks */
- protected trait StrictSplitterCheckTask[R, Tp] extends super.Task[R, Tp] {
+ protected trait StrictSplitterCheckTask[R, Tp] extends Task[R, Tp] {
def requiresStrictSplitters = false
if (requiresStrictSplitters && !isStrictSplitterCollection)
throw new UnsupportedOperationException("This collection does not provide strict splitters.")
@@ -792,26 +795,8 @@ self =>
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
var result: Option[U] = None
- def leaf(prevr: Option[Option[U]]) = {
- // pit.debugInformation
- // val rem = pit.remaining
- // val lst = pit.toList
- // val pa = mutable.ParArray(lst: _*)
- // val str = "At leaf we will iterate " + rem + " elems: " + pa.parallelIterator.toList
- // val p2 = pa.parallelIterator
- if (pit.remaining > 0) result = Some(pit.reduce(op))
- // println(str)
- }
+ def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p)
- // override def split = {
- // var str = pit.debugInformation
- // val pits = pit.split
- // str += "\nsplitting: " + pits.map(_.remaining) + "\n"
- // str += pits.map(_.debugInformation).mkString("\n")
- // str += "=========================================\n"
- // println(str)
- // pits map { p => newSubtask(p) }
- // }
override def merge(that: Reduce[U]) =
if (this.result == None) result = that.result
else if (that.result != None) result = Some(op(result.get, that.result.get))
diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala
index c5b33fb7b6..4192b0be5c 100644
--- a/src/library/scala/collection/parallel/ParIterableViewLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala
@@ -40,6 +40,7 @@ extends IterableView[T, Coll]
with ParIterableLike[T, This, ThisSeq]
{
self =>
+ import tasksupport._
override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner");
@@ -48,7 +49,7 @@ self =>
trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] {
override def parallelIterator: ParIterableIterator[S]
override def iterator = parallelIterator
- environment = self.environment
+ tasksupport.environment = self.tasksupport.environment
}
trait Sliced extends super.Sliced with Transformed[T] {
@@ -121,7 +122,7 @@ self =>
newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That]
override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
- executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
+ executeAndWaitResult(new Force(pbf, parallelIterator).mapResult(_.result).asInstanceOf[Task[That, ResultMapping[_, Force[U, That], That]]])
} otherwise {
val b = bf(underlying)
b ++= this.iterator
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 5cc78e272b..9e24b83d8a 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -40,6 +40,7 @@ trait ParSeqLike[+T, +Repr <: Parallel, +Sequential <: Seq[T] with SeqLike[T, Se
extends scala.collection.SeqLike[T, Repr]
with ParIterableLike[T, Repr, Sequential] {
self =>
+ import tasksupport._
type SuperParIterator = ParIterableIterator[T]
diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala
index 0334cda6a8..2c323ecade 100644
--- a/src/library/scala/collection/parallel/ParSeqViewLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala
@@ -39,6 +39,7 @@ extends SeqView[T, Coll]
with ParSeqLike[T, This, ThisSeq]
{
self =>
+ import tasksupport._
trait Transformed[+S] extends ParSeqView[S, Coll, CollSeq]
with super[ParIterableView].Transformed[S] with super[SeqView].Transformed[S] {
@@ -161,7 +162,7 @@ self =>
override def scanRight[S, That](z: S)(op: (T, S) => S)(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisParSeq.scanRight(z)(op)).asInstanceOf[That]
override def groupBy[K](f: T => K): collection.immutable.Map[K, This] = thisParSeq.groupBy(f).mapValues(xs => newForced(xs).asInstanceOf[This])
override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
- executeAndWaitResult(new Force(pbf, parallelIterator) mapResult { _.result })
+ executeAndWaitResult(new Force(pbf, parallelIterator).mapResult(_.result).asInstanceOf[Task[That, _]])
} otherwise {
val b = bf(underlying)
b ++= this.iterator
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 79dddc7c8b..5c3720a3bf 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -129,6 +129,7 @@ private[immutable] abstract class HashMapCombiner[K, V]
extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], (K, V), HashMapCombiner[K, V]](HashMapCombiner.rootsize) {
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import HashMapCombiner._
+ import tasksupport._
val emptyTrie = HashMap.empty[K, V]
def +=(elem: (K, V)) = {
@@ -172,7 +173,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
/* tasks */
- class CreateTrie(bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int) extends super.Task[Unit, CreateTrie] {
+ class CreateTrie(bucks: Array[Unrolled[(K, V)]], root: Array[HashMap[K, V]], offset: Int, howmany: Int)
+ extends Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 66ded02397..747ed3eed3 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -107,6 +107,7 @@ private[immutable] abstract class HashSetCombiner[T]
extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetCombiner[T]](HashSetCombiner.rootsize) {
self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
import HashSetCombiner._
+ import tasksupport._
val emptyTrie = HashSet.empty[T]
def +=(elem: T) = {
@@ -147,7 +148,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
/* tasks */
class CreateTrie(bucks: Array[Unrolled[Any]], root: Array[HashSet[T]], offset: Int, howmany: Int)
- extends super.Task[Unit, CreateTrie] {
+ extends Task[Unit, CreateTrie] {
var result = ()
def leaf(prev: Option[Unit]) = {
var i = offset
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index 0ca6858fb2..f02775f332 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -42,7 +42,8 @@ extends ParSeq[T]
with GenericParTemplate[T, ParArray]
with ParSeqLike[T, ParArray[T], ArraySeq[T]]
{
- self =>
+self =>
+ import tasksupport._
private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]]
@@ -533,6 +534,8 @@ extends ParSeq[T]
/* operations */
+ private def asTask[R, Tp](t: Any) = t.asInstanceOf[Task[R, Tp]]
+
private def buildsArray[S, That](c: Builder[S, That]) = c.isInstanceOf[ParArrayCombiner[_]]
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[ParArray[T], S, That]) = if (buildsArray(bf(repr))) {
@@ -554,13 +557,9 @@ extends ParSeq[T]
targetarr(0) = z
// do a parallel prefix scan
- executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st =>
- // println("-----------------------")
- // println(targetarr.toList)
- // st.printTree
- executeAndWaitResult(new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr))
- })
- // println(targetarr.toList)
+ executeAndWaitResult(asTask[That, Task[That, _]](new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator).mapResult(st =>
+ executeAndWaitResult(asTask[That, Task[That, _]](new ScanWithScanTree[U, Any](Some(z), op, st, array, targetarr)))
+ )))
// wrap the array into a parallel array
(new ParArray[U](targarrseq)).asInstanceOf[That]
@@ -568,7 +567,7 @@ extends ParSeq[T]
/* tasks */
- class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, Map[S]] {
+ class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends Task[Unit, Map[S]] {
var result = ();
def leaf(prev: Option[Unit]) = {
val tarr = targetarr
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index 7e6bbd9333..d95e478fec 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -18,8 +18,9 @@ import scala.collection.parallel.EnvironmentPassingCombiner
trait ParArrayCombiner[T]
extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]]
- with TaskSupport {
- self: EnvironmentPassingCombiner[T, ParArray[T]] =>
+{
+self: EnvironmentPassingCombiner[T, ParArray[T]] =>
+ import tasksupport._
override def sizeHint(sz: Int) = if (chain.length == 1) chain(0).sizeHint(sz)
@@ -41,7 +42,7 @@ extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]]
/* tasks */
- class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, CopyChainToArray] {
+ class CopyChainToArray(array: Array[Any], offset: Int, howmany: Int) extends Task[Unit, CopyChainToArray] {
var result = ()
def leaf(prev: Option[Unit]) = if (howmany > 0) {
var totalleft = howmany
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index f4b59186d5..537c442e23 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -123,6 +123,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
with collection.mutable.HashTable.HashUtils[K]
{
self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
+ import tasksupport._
private var mask = ParHashMapCombiner.discriminantmask
private var nonmasklen = ParHashMapCombiner.nonmasklength
@@ -220,7 +221,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
import UnrolledBuffer.Unrolled
class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int)
- extends super.Task[Int, FillBlocks] {
+ extends Task[Int, FillBlocks] {
var result = Int.MinValue
def leaf(prev: Option[Int]) = {
var i = offset
diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
index ef173a23ab..66303862d3 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
@@ -90,6 +90,7 @@ private[mutable] abstract class ParHashSetCombiner[T](private val tableLoadFacto
extends collection.parallel.BucketCombiner[T, ParHashSet[T], Any, ParHashSetCombiner[T]](ParHashSetCombiner.numblocks)
with collection.mutable.FlatHashTable.HashUtils[T] {
self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
+ import tasksupport._
private var mask = ParHashSetCombiner.discriminantmask
private var nonmasklen = ParHashSetCombiner.nonmasklength
@@ -189,7 +190,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
//
// tableSize = tableSize + 1
//
- // interestingly, it completely bogs down the parallel
+ // furthermore, it completely bogs down the parallel
// execution when there are multiple workers
nnSizeMapAdd(h)
@@ -200,7 +201,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
/* tasks */
class FillBlocks(buckets: Array[UnrolledBuffer[Any]], table: AddingFlatHashTable, val offset: Int, val howmany: Int)
- extends super.Task[(Int, UnrolledBuffer[Any]), FillBlocks] {
+ extends Task[(Int, UnrolledBuffer[Any]), FillBlocks] {
var result = (Int.MinValue, new UnrolledBuffer[Any]);
def leaf(prev: Option[(Int, UnrolledBuffer[Any])]) {
var i = offset
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 0872ccc423..e1556a1092 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -36,6 +36,8 @@ package object parallel {
private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
+ private[parallel] def getTaskSupport: TaskSupport = new TaskSupport {}
+
/* implicit conversions */
/** An implicit conversion providing arrays with a `par` method, which