author     Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-10-20 20:19:43 +0000
committer  Aleksandar Pokopec <aleksandar.prokopec@epfl.ch>  2010-10-20 20:19:43 +0000
commit     eeb70cd5f422e51d2be0658c4ad0b9e7f3d7b4fb (patch)
tree       cd9d135f97ec3dc433287ef99fa57b7e31d17925
parent     2014160121a62681bdc0e873a3f7e9b5e3bbae16 (diff)
Refactoring certain tasks to accept empty splitters. Adding parallel mutable hash maps. No review
-rw-r--r--  src/library/scala/collection/parallel/ParIterableLike.scala           107
-rw-r--r--  src/library/scala/collection/parallel/ParSeqLike.scala                   6
-rw-r--r--  src/library/scala/collection/parallel/RemainsIterator.scala              3
-rw-r--r--  src/library/scala/collection/parallel/Splitter.scala                    11
-rw-r--r--  src/library/scala/collection/parallel/Tasks.scala                       17
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashMap.scala         2
-rw-r--r--  src/library/scala/collection/parallel/immutable/ParHashSet.scala         2
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParArray.scala             4
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala     2
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashMap.scala          55
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParHashSet.scala          30
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParMap.scala              53
-rw-r--r--  src/library/scala/collection/parallel/mutable/ParMapLike.scala          17
13 files changed, 251 insertions, 58 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 7bbc6c09f4..4d95043c3a 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -162,10 +162,6 @@ self =>
}
}
- /** Convenience for signal context passing iterator.
- */
- //type SCPI <: SignalContextPassingIterator[ParIterator]
-
/** Creates a new parallel iterator used to traverse the elements of this parallel collection.
* This iterator is more specific than the one returned by `iterator`, and augmented
* with additional accessor and transformer methods.
@@ -184,6 +180,18 @@ self =>
def par = repr
+ /** Denotes whether this parallel collection has strict splitters.
+ *
+ * This is true in general, but specific collection instances may choose to
+ * override this method to return `false`. Such collections will fail to execute
+ * methods which rely on splitters being strict, i.e. returning a correct value
+ * in the `remaining` method.
+ *
+ * This method helps ensure that such failures occur on method invocation,
+ * rather than later, in unpredictable ways.
+ */
+ def isStrictSplitterCollection = true
+
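
A minimal sketch of how a collection opts out (both names below are stand-ins,
not library code; in the library the flag lives on ParIterableLike itself):

    // The flag defaults to true; a collection whose splitters can only report
    // an upper bound on `remaining` overrides it to false.
    trait StrictFlag { def isStrictSplitterCollection = true }
    class ParBucketTable extends StrictFlag {
      override def isStrictSplitterCollection = false
    }
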
/** Some minimal number of elements after which this collection should be handled
* sequentially by different processors.
*
@@ -211,19 +219,21 @@ self =>
*/
protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc
+ type SSCTask[R, Tp] = StrictSplitterCheckTask[R, Tp]
+
/* convenience task operations wrapper */
- protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new {
+ protected implicit def task2ops[R, Tp](tsk: SSCTask[R, Tp]) = new {
def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) {
def map(r: R): R1 = mapping(r)
}
- def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] {
+ def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] {
val ft = tsk
val st = t2
def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
}
- def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] {
+ def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] {
val ft = tsk
val st = t2
def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
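
A standalone sketch of the pattern behind `compose` (MiniTask and seqComposite
are illustrative simplifications, not the library's Task machinery):

    // Runs the first computation, then the second, and combines the results -
    // the shape of SeqComposite; ParComposite forks the second task instead.
    trait MiniTask[R] { def compute(): R }
    def seqComposite[R1, R2, R3](ft: MiniTask[R1], st: MiniTask[R2])
                                (comb: (R1, R2) => R3): MiniTask[R3] =
      new MiniTask[R3] { def compute() = comb(ft.compute(), st.compute()) }
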
@@ -269,7 +279,7 @@ self =>
* if this $coll is empty.
*/
def reduce[U >: T](op: (U, U) => U): U = {
- executeAndWaitResult(new Reduce(op, parallelIterator))
+ executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get })
}
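
The `mapResult { _.get }` step converts the task's new Option-valued result back
to a plain value. The sequential `reduceOption` from the standard library
illustrates the same split between an Option-valued worker and a throwing wrapper:

    List(1, 2, 3).reduceOption(_ + _)       // Some(6)
    List.empty[Int].reduceOption(_ + _)     // None
    List.empty[Int].reduceOption(_ + _).get // throws NoSuchElementException
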
/** Optionally reduces the elements of this sequence using the specified associative binary operator.
@@ -355,7 +365,7 @@ self =>
* @param f function that's applied to each element
*/
override def foreach[U](f: T => U): Unit = {
- executeAndWait(new Foreach(f, parallelIterator))
+ executeAndWaitResult(new Foreach(f, parallelIterator))
}
override def count(p: T => Boolean): Int = {
@@ -371,11 +381,11 @@ self =>
}
override def min[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T]
+ executeAndWaitResult(new Min(ord, parallelIterator) mapResult { _.get }).asInstanceOf[T]
}
override def max[U >: T](implicit ord: Ordering[U]): T = {
- executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T]
+ executeAndWaitResult(new Max(ord, parallelIterator) mapResult { _.get }).asInstanceOf[T]
}
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
@@ -428,7 +438,6 @@ self =>
}
protected[this] def cbfactory ={
- println(newCombiner + ", " + newCombiner.getClass)
() => newCombiner
}
@@ -604,7 +613,7 @@ self =>
}
override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) {
- executeAndWait(new CopyToArray(start, len, xs, parallelIterator))
+ executeAndWaitResult(new CopyToArray(start, len, xs, parallelIterator))
}
override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
@@ -647,13 +656,19 @@ self =>
/* tasks */
+ protected trait StrictSplitterCheckTask[R, Tp] extends super.Task[R, Tp] {
+ def requiresStrictSplitters = false
+ if (requiresStrictSplitters && !isStrictSplitterCollection)
+ throw new UnsupportedOperationException("This collection does not provide strict splitters.")
+ }
+
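
A self-contained sketch of the fail-fast check (simplified: the real code reads
isStrictSplitterCollection from the enclosing collection; StrictCheckDemo is
hypothetical):

    object StrictCheckDemo extends App {
      val isStrictSplitterCollection = false  // pretend the collection is non-strict
      trait CheckedTask {
        def requiresStrictSplitters = false
        // trait initializer: runs as soon as a task instance is constructed
        if (requiresStrictSplitters && !isStrictSplitterCollection)
          throw new UnsupportedOperationException("This collection does not provide strict splitters.")
      }
      new CheckedTask {} // fine: does not need strict splitters
      try new CheckedTask { override def requiresStrictSplitters = true }
      catch { case e: UnsupportedOperationException => println("fails at construction: " + e.getMessage) }
    }
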
/** Standard accessor task that iterates over the elements of the collection.
*
* @tparam R type of the result of this method (`R` for result).
* @tparam Tp the representation type of the task at hand.
*/
protected trait Accessor[R, Tp]
- extends super.Task[R, Tp] {
+ extends StrictSplitterCheckTask[R, Tp] {
protected[this] val pit: ParIterableIterator[T]
protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp]
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
@@ -662,7 +677,7 @@ self =>
override def toString = "Accessor(" + pit.toString + ")"
}
- protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] {
+ protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] {
def shouldSplitFurther = false
def split = throw new UnsupportedOperationException("Does not split.")
override def toString = "NonDivisibleTask"
@@ -670,7 +685,7 @@ self =>
protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]]
- protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
+ protected[this] trait Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] {
val ft: First
val st: Second
@@ -680,10 +695,11 @@ self =>
ft.signalAbort
st.signalAbort
}
+ override def requiresStrictSplitters = ft.requiresStrictSplitters || st.requiresStrictSplitters
}
/** Sequentially performs one task after another. */
- protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
+ protected[this] trait SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
extends Composite[FR, SR, R, First, Second] {
def leaf(prevr: Option[R]) = {
ft.compute
@@ -693,7 +709,7 @@ self =>
}
/** Performs two tasks in parallel, and waits for both to finish. */
- protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]]
+ protected[this] trait ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
extends Composite[FR, SR, R, First, Second] {
def leaf(prevr: Option[R]) = {
st.start
@@ -703,7 +719,7 @@ self =>
}
}
- protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp])
+ protected[this] abstract class ResultMapping[R, Tp, R1](val inner: StrictSplitterCheckTask[R, Tp])
extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] {
var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
@@ -714,6 +730,7 @@ self =>
private[parallel] override def signalAbort {
inner.signalAbort
}
+ override def requiresStrictSplitters = inner.requiresStrictSplitters
}
protected trait Transformer[R, Tp] extends Accessor[R, Tp]
@@ -731,11 +748,14 @@ self =>
override def merge(that: Count) = result = result + that.result
}
- protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Reduce[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.reduce(op)
+ protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
+ var result: Option[U] = None
+ def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p)
- override def merge(that: Reduce[U]) = result = op(result, that.result)
+ override def merge(that: Reduce[U]) =
+ if (this.result == None) result = that.result
+ else if (that.result != None) result = Some(op(result.get, that.result.get))
+ override def requiresStrictSplitters = true
}
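
The same None-skipping merge recurs in Min and Max below. A standalone sketch of
the logic (mergeOptions is a plain function, not a task):

    // None marks a partial result coming from an empty splitter; it is
    // skipped rather than fed to the operator.
    def mergeOptions[U](op: (U, U) => U)(a: Option[U], b: Option[U]): Option[U] =
      (a, b) match {
        case (None, r)          => r
        case (l, None)          => l
        case (Some(x), Some(y)) => Some(op(x, y))
      }
    // mergeOptions[Int](_ + _)(None, Some(3))    == Some(3)
    // mergeOptions[Int](_ + _)(Some(2), Some(3)) == Some(5)
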
protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Fold[U]] {
@@ -767,18 +787,24 @@ self =>
override def merge(that: Product[U]) = result = num.times(result, that.result)
}
- protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Min[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.min(ord)
+ protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Min[U]] {
+ var result: Option[U] = None
+ def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.min(ord))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Min(ord, p)
- override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result
+ override def merge(that: Min[U]) =
+ if (this.result == None) result = that.result
+ else if (that.result != None) result = if (ord.lteq(result.get, that.result.get)) result else that.result
+ override def requiresStrictSplitters = true
}
- protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Max[U]] {
- var result: U = null.asInstanceOf[U]
- def leaf(prevr: Option[U]) = result = pit.max(ord)
+ protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Max[U]] {
+ var result: Option[U] = None
+ def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.max(ord))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Max(ord, p)
- override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result
+ override def merge(that: Max[U]) =
+ if (this.result == None) result = that.result
+ else if (that.result != None) result = if (ord.gteq(result.get, that.result.get)) result else that.result
+ override def requiresStrictSplitters = true
}
protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T])
@@ -873,6 +899,7 @@ self =>
}
}
override def merge(that: Take[U, This]) = result = result combine that.result
+ override def requiresStrictSplitters = true
}
protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
@@ -889,6 +916,7 @@ self =>
}
}
override def merge(that: Drop[U, This]) = result = result combine that.result
+ override def requiresStrictSplitters = true
}
protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
@@ -906,6 +934,7 @@ self =>
}
}
override def merge(that: Slice[U, This]) = result = result combine that.result
+ override def requiresStrictSplitters = true
}
protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
@@ -919,6 +948,7 @@ self =>
for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p)
}
override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
+ override def requiresStrictSplitters = true
}
protected[this] class TakeWhile[U >: T, This >: Repr]
@@ -937,6 +967,7 @@ self =>
override def merge(that: TakeWhile[U, This]) = if (result._2) {
result = (result._1 combine that.result._1, that.result._2)
}
+ override def requiresStrictSplitters = true
}
protected[this] class Span[U >: T, This >: Repr]
@@ -959,6 +990,7 @@ self =>
} else {
(result._1, result._2 combine that.result._1 combine that.result._2)
}
+ override def requiresStrictSplitters = true
}
protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S])
@@ -973,6 +1005,7 @@ self =>
(pits zip opits) map { p => new Zip(pbf, p._1, p._2) }
}
override def merge(that: Zip[U, S, That]) = result = result combine that.result
+ override def requiresStrictSplitters = true
}
protected[this] class ZipAll[U >: T, S, That]
@@ -992,9 +1025,10 @@ self =>
Seq(
new ZipAll(pit.remaining, thiselem, thatelem, pbf, pit, opits(0)), // nothing wrong will happen with the cast below - elem T is never accessed
new ZipAll(diff, thiselem, thatelem, pbf, immutable.repetition(thiselem, diff).parallelIterator.asInstanceOf[ParIterableIterator[T]], opits(1))
- )
+ )
}
override def merge(that: ZipAll[U, S, That]) = result = result combine that.result
+ override def requiresStrictSplitters = true
}
protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T])
@@ -1009,6 +1043,7 @@ self =>
new CopyToArray[U, This](from + untilp, plen, array, p)
}
}
+ override def requiresStrictSplitters = true
}
protected[this] class ScanTree[U >: T](val from: Int, val len: Int) {
@@ -1059,7 +1094,7 @@ self =>
}
protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A])
- extends super.Task[Unit, ApplyToArray[U, A]] {
+ extends StrictSplitterCheckTask[Unit, ApplyToArray[U, A]] {
var result: Unit = ()
def leaf(prev: Option[Unit]) = {
var i = from
@@ -1082,6 +1117,7 @@ self =>
protected[this] class BuildScanTree[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T])
extends Accessor[ScanTree[U], BuildScanTree[U, A]] {
+ // TODO reimplement - there are some issues here
var result: ScanTree[U] = null
def leaf(prev: Option[ScanTree[U]]) = if ((prev != None && prev.get.chunkFinished) || from == 1) {
val prevElem = if (from == 1) z else prev.get.value
@@ -1114,10 +1150,11 @@ self =>
// set result
result = ns
}
+ override def requiresStrictSplitters = true
}
protected[this] class ScanWithScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], src: Array[A], dest: Array[A])
- extends super.Task[Unit, ScanWithScanTree[U, A]] {
+ extends StrictSplitterCheckTask[Unit, ScanWithScanTree[U, A]] {
var result = ();
def leaf(prev: Option[Unit]) = scan(st, first.get)
private def scan(st: ScanTree[U], elem: U): Unit = if (!st.chunkFinished) {
@@ -1135,7 +1172,7 @@ self =>
}
protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That])
- extends super.Task[Combiner[S, That], FromArray[S, A, That]] {
+ extends StrictSplitterCheckTask[Combiner[S, That], FromArray[S, A, That]] {
var result: Result = null
def leaf(prev: Option[Result]) = {
val cb = prev getOrElse cbf(self.repr)
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index f26ef0348c..d2e0f965d6 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -338,6 +338,7 @@ self =>
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p)
}
override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2)
+ override def requiresStrictSplitters = true
}
protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T])
@@ -358,6 +359,7 @@ self =>
override def merge(that: IndexWhere) = result = if (result == -1) that.result else {
if (that.result != -1) result min that.result else result
}
+ override def requiresStrictSplitters = true
}
protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: ParSeqIterator[T])
@@ -378,6 +380,7 @@ self =>
override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else {
if (that.result != -1) result max that.result else result
}
+ override def requiresStrictSplitters = true
}
protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: ParSeqIterator[T])
@@ -410,6 +413,7 @@ self =>
for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op)
}
override def merge(that: SameElements[U]) = result = result && that.result
+ override def requiresStrictSplitters = true
}
protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: ParSeqIterator[T])
@@ -422,6 +426,7 @@ self =>
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p)
}
override def merge(that: Updated[U, That]) = result = result combine that.result
+ override def requiresStrictSplitters = true
}
protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S])
@@ -456,6 +461,7 @@ self =>
for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op)
}
override def merge(that: Corresponds[S]) = result = result && that.result
+ override def requiresStrictSplitters = true
}
}
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 686d08a301..4831c829ad 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -346,11 +346,12 @@ self =>
*
* '''Note''': This method may be implemented to return an upper bound on the number of elements
* in the iterator, instead of the exact number of elements to iterate.
+ * Parallel collections which have such iterators are called non-strict-splitter collections.
*
* In that case, 2 considerations must be taken into account:
*
* 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray`
- * and which use tasks having the iterated subset length as a ctor argument.
+ * and all others using this information.
*
* 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum
* of `remaining` values of split iterators must be less than or equal to this upper bound.
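
A hypothetical property check for consideration 2), assuming an iterator that
has both `remaining` and `split` (as ParIterableIterator does):

    // The children's upper bounds may not add up to more than the parent's
    // upper bound. checkSplitInvariant is illustrative, not library code.
    def checkSplitInvariant[T](it: ParIterableIterator[T]): Unit = {
      val bound = it.remaining
      assert(it.split.map(_.remaining).sum <= bound)
    }
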
diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala
index c890fdf974..e598b96c82 100644
--- a/src/library/scala/collection/parallel/Splitter.scala
+++ b/src/library/scala/collection/parallel/Splitter.scala
@@ -30,6 +30,17 @@ trait Splitter[+T] extends Iterator[T] {
* @return a sequence of disjunct iterators of the collection
*/
def split: Seq[Splitter[T]]
+ /*
+ * '''Note:''' splitters in this sequence may actually be empty, and the sequence may contain
+ * a splitter which iterates over the same elements as the original splitter, AS LONG AS calling
+ * `split` a finite number of times on the resulting splitters eventually returns a nontrivial
+ * partition.
+ *
+ * Note that implementations satisfying the contract documented above form a subset of the
+ * implementations permitted by this fine print.
+ *
+ * The rationale behind this is best given by the following example:
+ * try splitting an iterator over a linear hash table.
+ */
}
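
A self-contained illustration of that rationale (TableSplitter is hypothetical,
not library code): splitting a sparsely filled table by index range can produce
an empty half, yet the ranges shrink on every split, so repeated splitting
eventually reaches a nontrivial partition.

    class TableSplitter[T](table: Array[Option[T]], from: Int, until: Int)
        extends Iterator[T] {
      private var i = from
      private def skipEmpty() = while (i < until && table(i).isEmpty) i += 1
      def hasNext = { skipEmpty(); i < until }
      def next() = { skipEmpty(); val elem = table(i).get; i += 1; elem }
      // halves the index range - one half may contain no elements at all
      def split: Seq[TableSplitter[T]] =
        Seq(new TableSplitter(table, i, (i + until) / 2),
            new TableSplitter(table, (i + until) / 2, until))
    }
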
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 3facaea7c3..915e02f787 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -92,9 +92,6 @@ trait Tasks {
/** Executes a task and returns a future. Forwards an exception if some task threw it. */
def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R
- /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */
- def executeAndWait[R, Tp](task: TaskType[R, Tp])
-
/** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */
def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
@@ -215,20 +212,6 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
}
/** Executes a task on a fork/join pool and waits for it to finish.
- *
- * $fjdispatch
- */
- def executeAndWait[R, Tp](fjtask: Task[R, Tp]) {
- if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
- fjtask.fork
- } else {
- forkJoinPool.execute(fjtask)
- }
- fjtask.join
- fjtask.forwardException
- }
-
- /** Executes a task on a fork/join pool and waits for it to finish.
* Returns its result when it does.
*
* $fjdispatch
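
With executeAndWait gone, side-effecting tasks go through the same entry point
and their Unit result is simply discarded, as the call sites changed above show:

    // before: executeAndWait(new Foreach(f, parallelIterator))
    executeAndWaitResult(new Foreach(f, parallelIterator))
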
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index c72eb66207..306ec68548 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -156,7 +156,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
val buckets = heads.filter(_ != null)
val root = new Array[HashMap[K, V]](buckets.length)
- executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
+ executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length))
var bitmap = 0
var i = 0
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index c9bda86d67..0ef2681567 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -151,7 +151,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] =>
val buckets = heads.filter(_ != null)
val root = new Array[HashSet[T]](buckets.length)
- executeAndWait(new CreateTrie(buckets, root, 0, buckets.length))
+ executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length))
var bitmap = 0
var i = 0
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index d2123a402f..0ca6858fb2 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -541,7 +541,7 @@ extends ParSeq[T]
val targetarr = targarrseq.array.asInstanceOf[Array[Any]]
// fill it in parallel
- executeAndWait(new Map[S](f, targetarr, 0, length))
+ executeAndWaitResult(new Map[S](f, targetarr, 0, length))
// wrap it into a parallel array
(new ParArray[S](targarrseq)).asInstanceOf[That]
@@ -554,7 +554,7 @@ extends ParSeq[T]
targetarr(0) = z
// do a parallel prefix scan
- executeAndWait(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st =>
+ executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st =>
// println("-----------------------")
// println(targetarr.toList)
// st.printTree
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index cb0e589d9b..7e6bbd9333 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -29,7 +29,7 @@ extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]]
val arrayseq = new ArraySeq[T](size)
val array = arrayseq.array.asInstanceOf[Array[Any]]
- executeAndWait(new CopyChainToArray(array, 0, size))
+ executeAndWaitResult(new CopyChainToArray(array, 0, size))
new ParArray(arrayseq)
} else { // optimisation if there is only 1 array
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
new file mode 100644
index 0000000000..db0c6ac50a
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -0,0 +1,55 @@
+package scala.collection.parallel
+package mutable
+
+
+
+
+
+
+
+
+
+/*
+class ParHashMap[K, +V]
+extends ParMap[K, V]
+ with GenericParMapTemplate[K, V, ParHashMap]
+ with ParMapLike[K, V]
+{
+self =>
+
+ override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap
+
+ override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
+
+ def parallelIterator = null // TODO
+
+ def seq = null // TODO
+
+
+
+}
+
+
+
+
+object ParHashMap {
+
+}
+
+*/
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
new file mode 100644
index 0000000000..dc33ef3189
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
@@ -0,0 +1,30 @@
+package scala.collection.parallel.mutable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/ParMap.scala b/src/library/scala/collection/parallel/mutable/ParMap.scala
new file mode 100644
index 0000000000..c5fccf45ae
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParMap.scala
@@ -0,0 +1,53 @@
+package scala.collection.parallel.mutable
+
+
+
+
+import collection.generic._
+import collection.parallel.Combiner
+
+
+
+trait ParMap[K, V]
+extends collection.mutable.Map[K, V]
+ with collection.parallel.ParMap[K, V]
+ with ParIterable[(K, V)]
+ with GenericParMapTemplate[K, V, ParMap]
+ with ParMapLike[K, V, ParMap[K, V], collection.mutable.Map[K, V]]
+{
+
+ override def mapCompanion: GenericParMapCompanion[ParMap] = ParMap
+
+ override def empty: ParMap[K, V] = null // TODO
+
+}
+
+
+
+object ParMap extends ParMapFactory[ParMap] {
+ def empty[K, V]: ParMap[K, V] = null // TODO
+
+ def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = null // TODO
+
+ implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V]
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/collection/parallel/mutable/ParMapLike.scala b/src/library/scala/collection/parallel/mutable/ParMapLike.scala
new file mode 100644
index 0000000000..902186eb2d
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParMapLike.scala
@@ -0,0 +1,17 @@
+package scala.collection.parallel
+package mutable
+
+
+
+import collection.generic._
+import collection.mutable.Builder
+
+
+
+trait ParMapLike[K,
+ V,
+ +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V],
+ +Sequential <: collection.mutable.Map[K, V] with collection.mutable.MapLike[K, V, Sequential]]
+extends collection.parallel.ParMapLike[K, V, Repr, Sequential]
+ with collection.mutable.MapLike[K, V, Repr]
+