diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-20 20:19:43 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-10-20 20:19:43 +0000 |
commit | eeb70cd5f422e51d2be0658c4ad0b9e7f3d7b4fb (patch) | |
tree | cd9d135f97ec3dc433287ef99fa57b7e31d17925 /src | |
parent | 2014160121a62681bdc0e873a3f7e9b5e3bbae16 (diff) | |
download | scala-eeb70cd5f422e51d2be0658c4ad0b9e7f3d7b4fb.tar.gz scala-eeb70cd5f422e51d2be0658c4ad0b9e7f3d7b4fb.tar.bz2 scala-eeb70cd5f422e51d2be0658c4ad0b9e7f3d7b4fb.zip |
Refactoring certain tasks to accept empty split...
Refactoring certain tasks to accept empty splitters. Adding parallel
mutable hash maps. No review
Diffstat (limited to 'src')
13 files changed, 251 insertions, 58 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 7bbc6c09f4..4d95043c3a 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -162,10 +162,6 @@ self => } } - /** Convenience for signal context passing iterator. - */ - //type SCPI <: SignalContextPassingIterator[ParIterator] - /** Creates a new parallel iterator used to traverse the elements of this parallel collection. * This iterator is more specific than the iterator of the returned by `iterator`, and augmented * with additional accessor and transformer methods. @@ -184,6 +180,18 @@ self => def par = repr + /** Denotes whether this parallel collection has strict splitters. + * + * This is true in general, and specific collection instances may choose to + * override this method. Such collections will fail to execute methods + * which rely on splitters being strict, i.e. returning a correct value + * in the `remaining` method. + * + * This method helps ensure that such failures occur on method invocations, + * rather than later on and in unpredictable ways. + */ + def isStrictSplitterCollection = true + /** Some minimal number of elements after which this collection should be handled * sequentially by different processors. * @@ -211,19 +219,21 @@ self => */ protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc + type SSCTask[R, Tp] = StrictSplitterCheckTask[R, Tp] + /* convenience task operations wrapper */ - protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new { + protected implicit def task2ops[R, Tp](tsk: SSCTask[R, Tp]) = new { def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) { def map(r: R): R1 = mapping(r) } - def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { + def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { val ft = tsk val st = t2 def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) } - def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { + def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { val ft = tsk val st = t2 def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) @@ -269,7 +279,7 @@ self => * if this $coll is empty. */ def reduce[U >: T](op: (U, U) => U): U = { - executeAndWaitResult(new Reduce(op, parallelIterator)) + executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get }) } /** Optionally reduces the elements of this sequence using the specified associative binary operator. @@ -355,7 +365,7 @@ self => * @param f function that's applied to each element */ override def foreach[U](f: T => U): Unit = { - executeAndWait(new Foreach(f, parallelIterator)) + executeAndWaitResult(new Foreach(f, parallelIterator)) } override def count(p: T => Boolean): Int = { @@ -371,11 +381,11 @@ self => } override def min[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T] + executeAndWaitResult(new Min(ord, parallelIterator) mapResult { _.get }).asInstanceOf[T] } override def max[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T] + executeAndWaitResult(new Max(ord, parallelIterator) mapResult { _.get }).asInstanceOf[T] } override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => @@ -428,7 +438,6 @@ self => } protected[this] def cbfactory ={ - println(newCombiner + ", " + newCombiner.getClass) () => newCombiner } @@ -604,7 +613,7 @@ self => } override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) { - executeAndWait(new CopyToArray(start, len, xs, parallelIterator)) + executeAndWaitResult(new CopyToArray(start, len, xs, parallelIterator)) } override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { @@ -647,13 +656,19 @@ self => /* tasks */ + protected trait StrictSplitterCheckTask[R, Tp] extends super.Task[R, Tp] { + def requiresStrictSplitters = false + if (requiresStrictSplitters && !isStrictSplitterCollection) + throw new UnsupportedOperationException("This collection does not provide strict splitters.") + } + /** Standard accessor task that iterates over the elements of the collection. * * @tparam R type of the result of this method (`R` for result). * @tparam Tp the representation type of the task at hand. */ protected trait Accessor[R, Tp] - extends super.Task[R, Tp] { + extends StrictSplitterCheckTask[R, Tp] { protected[this] val pit: ParIterableIterator[T] protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp] def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) @@ -662,7 +677,7 @@ self => override def toString = "Accessor(" + pit.toString + ")" } - protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] { + protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] { def shouldSplitFurther = false def split = throw new UnsupportedOperationException("Does not split.") override def toString = "NonDivisibleTask" @@ -670,7 +685,7 @@ self => protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] - protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + protected[this] trait Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { val ft: First val st: Second @@ -680,10 +695,11 @@ self => ft.signalAbort st.signalAbort } + override def requiresStrictSplitters = ft.requiresStrictSplitters || st.requiresStrictSplitters } /** Sequentially performs one task after another. */ - protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + protected[this] trait SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] extends Composite[FR, SR, R, First, Second] { def leaf(prevr: Option[R]) = { ft.compute @@ -693,7 +709,7 @@ self => } /** Performs two tasks in parallel, and waits for both to finish. */ - protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + protected[this] trait ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] extends Composite[FR, SR, R, First, Second] { def leaf(prevr: Option[R]) = { st.start @@ -703,7 +719,7 @@ self => } } - protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp]) + protected[this] abstract class ResultMapping[R, Tp, R1](val inner: StrictSplitterCheckTask[R, Tp]) extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] { var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 @@ -714,6 +730,7 @@ self => private[parallel] override def signalAbort { inner.signalAbort } + override def requiresStrictSplitters = inner.requiresStrictSplitters } protected trait Transformer[R, Tp] extends Accessor[R, Tp] @@ -731,11 +748,14 @@ self => override def merge(that: Count) = result = result + that.result } - protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Reduce[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.reduce(op) + protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] { + var result: Option[U] = None + def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p) - override def merge(that: Reduce[U]) = result = op(result, that.result) + override def merge(that: Reduce[U]) = + if (this.result == None) result = that.result + else if (that.result != None) op(result.get, that.result.get) + override def requiresStrictSplitters = true } protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Fold[U]] { @@ -767,18 +787,24 @@ self => override def merge(that: Product[U]) = result = num.times(result, that.result) } - protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Min[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.min(ord) + protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Min[U]] { + var result: Option[U] = None + def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.min(ord)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Min(ord, p) - override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result + override def merge(that: Min[U]) = + if (this.result == None) result = that.result + else if (that.result != None) result = if (ord.lteq(result.get, that.result.get)) result else that.result + override def requiresStrictSplitters = true } - protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Max[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.max(ord) + protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Max[U]] { + var result: Option[U] = None + def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.max(ord)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Max(ord, p) - override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result + override def merge(that: Max[U]) = + if (this.result == None) result = that.result + else if (that.result != None) result = if (ord.gteq(result.get, that.result.get)) result else that.result + override def requiresStrictSplitters = true } protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T]) @@ -873,6 +899,7 @@ self => } } override def merge(that: Take[U, This]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) @@ -889,6 +916,7 @@ self => } } override def merge(that: Drop[U, This]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) @@ -906,6 +934,7 @@ self => } } override def merge(that: Slice[U, This]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) @@ -919,6 +948,7 @@ self => for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p) } override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) + override def requiresStrictSplitters = true } protected[this] class TakeWhile[U >: T, This >: Repr] @@ -937,6 +967,7 @@ self => override def merge(that: TakeWhile[U, This]) = if (result._2) { result = (result._1 combine that.result._1, that.result._2) } + override def requiresStrictSplitters = true } protected[this] class Span[U >: T, This >: Repr] @@ -959,6 +990,7 @@ self => } else { (result._1, result._2 combine that.result._1 combine that.result._2) } + override def requiresStrictSplitters = true } protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S]) @@ -973,6 +1005,7 @@ self => (pits zip opits) map { p => new Zip(pbf, p._1, p._2) } } override def merge(that: Zip[U, S, That]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class ZipAll[U >: T, S, That] @@ -992,9 +1025,10 @@ self => Seq( new ZipAll(pit.remaining, thiselem, thatelem, pbf, pit, opits(0)), // nothing wrong will happen with the cast below - elem T is never accessed new ZipAll(diff, thiselem, thatelem, pbf, immutable.repetition(thiselem, diff).parallelIterator.asInstanceOf[ParIterableIterator[T]], opits(1)) - ) + ) } override def merge(that: ZipAll[U, S, That]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T]) @@ -1009,6 +1043,7 @@ self => new CopyToArray[U, This](from + untilp, plen, array, p) } } + override def requiresStrictSplitters = true } protected[this] class ScanTree[U >: T](val from: Int, val len: Int) { @@ -1059,7 +1094,7 @@ self => } protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A]) - extends super.Task[Unit, ApplyToArray[U, A]] { + extends StrictSplitterCheckTask[Unit, ApplyToArray[U, A]] { var result: Unit = () def leaf(prev: Option[Unit]) = { var i = from @@ -1082,6 +1117,7 @@ self => protected[this] class BuildScanTree[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T]) extends Accessor[ScanTree[U], BuildScanTree[U, A]] { + // TODO reimplement - there are some issues here var result: ScanTree[U] = null def leaf(prev: Option[ScanTree[U]]) = if ((prev != None && prev.get.chunkFinished) || from == 1) { val prevElem = if (from == 1) z else prev.get.value @@ -1114,10 +1150,11 @@ self => // set result result = ns } + override def requiresStrictSplitters = true } protected[this] class ScanWithScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], src: Array[A], dest: Array[A]) - extends super.Task[Unit, ScanWithScanTree[U, A]] { + extends StrictSplitterCheckTask[Unit, ScanWithScanTree[U, A]] { var result = (); def leaf(prev: Option[Unit]) = scan(st, first.get) private def scan(st: ScanTree[U], elem: U): Unit = if (!st.chunkFinished) { @@ -1135,7 +1172,7 @@ self => } protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That]) - extends super.Task[Combiner[S, That], FromArray[S, A, That]] { + extends StrictSplitterCheckTask[Combiner[S, That], FromArray[S, A, That]] { var result: Result = null def leaf(prev: Option[Result]) = { val cb = prev getOrElse cbf(self.repr) diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index f26ef0348c..d2e0f965d6 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -338,6 +338,7 @@ self => for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p) } override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2) + override def requiresStrictSplitters = true } protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T]) @@ -358,6 +359,7 @@ self => override def merge(that: IndexWhere) = result = if (result == -1) that.result else { if (that.result != -1) result min that.result else result } + override def requiresStrictSplitters = true } protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: ParSeqIterator[T]) @@ -378,6 +380,7 @@ self => override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else { if (that.result != -1) result max that.result else result } + override def requiresStrictSplitters = true } protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: ParSeqIterator[T]) @@ -410,6 +413,7 @@ self => for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op) } override def merge(that: SameElements[U]) = result = result && that.result + override def requiresStrictSplitters = true } protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: ParSeqIterator[T]) @@ -422,6 +426,7 @@ self => for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p) } override def merge(that: Updated[U, That]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S]) @@ -456,6 +461,7 @@ self => for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op) } override def merge(that: Corresponds[S]) = result = result && that.result + override def requiresStrictSplitters = true } } diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 686d08a301..4831c829ad 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -346,11 +346,12 @@ self => * * '''Note''': This method may be implemented to return an upper bound on the number of elements * in the iterator, instead of the exact number of elements to iterate. + * Parallel collections which have such iterators are called non-strict-splitter collections. * * In that case, 2 considerations must be taken into account: * * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray` - * and which use tasks having the iterated subset length as a ctor argument. + * and all others using this information. * * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum * of `remaining` values of split iterators must be less than or equal to this upper bound. diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala index c890fdf974..e598b96c82 100644 --- a/src/library/scala/collection/parallel/Splitter.scala +++ b/src/library/scala/collection/parallel/Splitter.scala @@ -30,6 +30,17 @@ trait Splitter[+T] extends Iterator[T] { * @return a sequence of disjunct iterators of the collection */ def split: Seq[Splitter[T]] + /* + * '''Note:''' splitters in this sequence may actually be empty and it can contain a splitter + * which iterates over the same elements as the original splitter AS LONG AS calling `split` + * a finite number of times on the resulting splitters eventually returns a nontrivial partition. + * + * Note that the docs contract above yields implementations which are a subset of implementations + * defined by this fineprint. + * + * The rationale behind this is best given by the following example: + * try splitting an iterator over a linear hash table. + */ } diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 3facaea7c3..915e02f787 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -92,9 +92,6 @@ trait Tasks { /** Executes a task and returns a future. Forwards an exception if some task threw it. */ def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R - /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */ - def executeAndWait[R, Tp](task: TaskType[R, Tp]) - /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */ def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R @@ -215,20 +212,6 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } /** Executes a task on a fork/join pool and waits for it to finish. - * - * $fjdispatch - */ - def executeAndWait[R, Tp](fjtask: Task[R, Tp]) { - if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { - fjtask.fork - } else { - forkJoinPool.execute(fjtask) - } - fjtask.join - fjtask.forwardException - } - - /** Executes a task on a fork/join pool and waits for it to finish. * Returns its result when it does. * * $fjdispatch diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index c72eb66207..306ec68548 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -156,7 +156,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => val buckets = heads.filter(_ != null) val root = new Array[HashMap[K, V]](buckets.length) - executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length)) var bitmap = 0 var i = 0 diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index c9bda86d67..0ef2681567 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -151,7 +151,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => val buckets = heads.filter(_ != null) val root = new Array[HashSet[T]](buckets.length) - executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length)) var bitmap = 0 var i = 0 diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index d2123a402f..0ca6858fb2 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -541,7 +541,7 @@ extends ParSeq[T] val targetarr = targarrseq.array.asInstanceOf[Array[Any]] // fill it in parallel - executeAndWait(new Map[S](f, targetarr, 0, length)) + executeAndWaitResult(new Map[S](f, targetarr, 0, length)) // wrap it into a parallel array (new ParArray[S](targarrseq)).asInstanceOf[That] @@ -554,7 +554,7 @@ extends ParSeq[T] targetarr(0) = z // do a parallel prefix scan - executeAndWait(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st => + executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st => // println("-----------------------") // println(targetarr.toList) // st.printTree diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala index cb0e589d9b..7e6bbd9333 100644 --- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala @@ -29,7 +29,7 @@ extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]] val arrayseq = new ArraySeq[T](size) val array = arrayseq.array.asInstanceOf[Array[Any]] - executeAndWait(new CopyChainToArray(array, 0, size)) + executeAndWaitResult(new CopyChainToArray(array, 0, size)) new ParArray(arrayseq) } else { // optimisation if there is only 1 array diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala new file mode 100644 index 0000000000..db0c6ac50a --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -0,0 +1,55 @@ +package scala.collection.parallel +package mutable + + + + + + + + + +/* +class ParHashMap[K, +V] +extends ParMap[K, V] + with GenericParMapTemplate[K, V, ParHashMap] + with ParMapLike[K, V] +{ +self => + + override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap + + override def empty: ParHashMap[K, V] = new ParHashMap[K, V] + + def parallelIterator = null // TODO + + def seq = null // TODO + + + +} + + + + +object ParHashMap { + +} + +*/ + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala new file mode 100644 index 0000000000..dc33ef3189 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala @@ -0,0 +1,30 @@ +package scala.collection.parallel.mutable + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParMap.scala b/src/library/scala/collection/parallel/mutable/ParMap.scala new file mode 100644 index 0000000000..c5fccf45ae --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParMap.scala @@ -0,0 +1,53 @@ +package scala.collection.parallel.mutable + + + + +import collection.generic._ +import collection.parallel.Combiner + + + +trait ParMap[K, V] +extends collection.mutable.Map[K, V] + with collection.parallel.ParMap[K, V] + with ParIterable[(K, V)] + with GenericParMapTemplate[K, V, ParMap] + with ParMapLike[K, V, ParMap[K, V], collection.mutable.Map[K, V]] +{ + + override def mapCompanion: GenericParMapCompanion[ParMap] = ParMap + + override def empty: ParMap[K, V] = null // TODO + +} + + + +object ParMap extends ParMapFactory[ParMap] { + def empty[K, V]: ParMap[K, V] = null // TODO + + def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = null // TODO + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V] + +} + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParMapLike.scala b/src/library/scala/collection/parallel/mutable/ParMapLike.scala new file mode 100644 index 0000000000..902186eb2d --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParMapLike.scala @@ -0,0 +1,17 @@ +package scala.collection.parallel +package mutable + + + +import collection.generic._ +import collection.mutable.Builder + + + +trait ParMapLike[K, + V, + +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], + +Sequential <: collection.mutable.Map[K, V] with collection.mutable.MapLike[K, V, Sequential]] +extends collection.parallel.ParMapLike[K, V, Repr, Sequential] + with collection.mutable.MapLike[K, V, Repr] + |