From eeb70cd5f422e51d2be0658c4ad0b9e7f3d7b4fb Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Wed, 20 Oct 2010 20:19:43 +0000 Subject: Refactoring certain tasks to accept empty split... Refactoring certain tasks to accept empty splitters. Adding parallel mutable hash maps. No review --- .../collection/parallel/ParIterableLike.scala | 107 ++++++++++++++------- .../scala/collection/parallel/ParSeqLike.scala | 6 ++ .../collection/parallel/RemainsIterator.scala | 3 +- .../scala/collection/parallel/Splitter.scala | 11 +++ src/library/scala/collection/parallel/Tasks.scala | 17 ---- .../collection/parallel/immutable/ParHashMap.scala | 2 +- .../collection/parallel/immutable/ParHashSet.scala | 2 +- .../collection/parallel/mutable/ParArray.scala | 4 +- .../parallel/mutable/ParArrayCombiner.scala | 2 +- .../collection/parallel/mutable/ParHashMap.scala | 55 +++++++++++ .../collection/parallel/mutable/ParHashSet.scala | 30 ++++++ .../scala/collection/parallel/mutable/ParMap.scala | 53 ++++++++++ .../collection/parallel/mutable/ParMapLike.scala | 17 ++++ 13 files changed, 251 insertions(+), 58 deletions(-) create mode 100644 src/library/scala/collection/parallel/mutable/ParHashMap.scala create mode 100644 src/library/scala/collection/parallel/mutable/ParHashSet.scala create mode 100644 src/library/scala/collection/parallel/mutable/ParMap.scala create mode 100644 src/library/scala/collection/parallel/mutable/ParMapLike.scala diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 7bbc6c09f4..4d95043c3a 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -162,10 +162,6 @@ self => } } - /** Convenience for signal context passing iterator. - */ - //type SCPI <: SignalContextPassingIterator[ParIterator] - /** Creates a new parallel iterator used to traverse the elements of this parallel collection. * This iterator is more specific than the iterator of the returned by `iterator`, and augmented * with additional accessor and transformer methods. @@ -184,6 +180,18 @@ self => def par = repr + /** Denotes whether this parallel collection has strict splitters. + * + * This is true in general, and specific collection instances may choose to + * override this method. Such collections will fail to execute methods + * which rely on splitters being strict, i.e. returning a correct value + * in the `remaining` method. + * + * This method helps ensure that such failures occur on method invocations, + * rather than later on and in unpredictable ways. + */ + def isStrictSplitterCollection = true + /** Some minimal number of elements after which this collection should be handled * sequentially by different processors. * @@ -211,19 +219,21 @@ self => */ protected def reuse[S, That](oldc: Option[Combiner[S, That]], newc: Combiner[S, That]): Combiner[S, That] = newc + type SSCTask[R, Tp] = StrictSplitterCheckTask[R, Tp] + /* convenience task operations wrapper */ - protected implicit def task2ops[R, Tp](tsk: Task[R, Tp]) = new { + protected implicit def task2ops[R, Tp](tsk: SSCTask[R, Tp]) = new { def mapResult[R1](mapping: R => R1): ResultMapping[R, Tp, R1] = new ResultMapping[R, Tp, R1](tsk) { def map(r: R): R1 = mapping(r) } - def compose[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { + def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { val ft = tsk val st = t2 def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) } - def parallel[R3, R2, Tp2](t2: Task[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, Task[R, Tp], Task[R2, Tp2]] { + def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] { val ft = tsk val st = t2 def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr) @@ -269,7 +279,7 @@ self => * if this $coll is empty. */ def reduce[U >: T](op: (U, U) => U): U = { - executeAndWaitResult(new Reduce(op, parallelIterator)) + executeAndWaitResult(new Reduce(op, parallelIterator) mapResult { _.get }) } /** Optionally reduces the elements of this sequence using the specified associative binary operator. @@ -355,7 +365,7 @@ self => * @param f function that's applied to each element */ override def foreach[U](f: T => U): Unit = { - executeAndWait(new Foreach(f, parallelIterator)) + executeAndWaitResult(new Foreach(f, parallelIterator)) } override def count(p: T => Boolean): Int = { @@ -371,11 +381,11 @@ self => } override def min[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Min(ord, parallelIterator)).asInstanceOf[T] + executeAndWaitResult(new Min(ord, parallelIterator) mapResult { _.get }).asInstanceOf[T] } override def max[U >: T](implicit ord: Ordering[U]): T = { - executeAndWaitResult(new Max(ord, parallelIterator)).asInstanceOf[T] + executeAndWaitResult(new Max(ord, parallelIterator) mapResult { _.get }).asInstanceOf[T] } override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf => @@ -428,7 +438,6 @@ self => } protected[this] def cbfactory ={ - println(newCombiner + ", " + newCombiner.getClass) () => newCombiner } @@ -604,7 +613,7 @@ self => } override def copyToArray[U >: T](xs: Array[U], start: Int, len: Int) = if (len > 0) { - executeAndWait(new CopyToArray(start, len, xs, parallelIterator)) + executeAndWaitResult(new CopyToArray(start, len, xs, parallelIterator)) } override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { @@ -647,13 +656,19 @@ self => /* tasks */ + protected trait StrictSplitterCheckTask[R, Tp] extends super.Task[R, Tp] { + def requiresStrictSplitters = false + if (requiresStrictSplitters && !isStrictSplitterCollection) + throw new UnsupportedOperationException("This collection does not provide strict splitters.") + } + /** Standard accessor task that iterates over the elements of the collection. * * @tparam R type of the result of this method (`R` for result). * @tparam Tp the representation type of the task at hand. */ protected trait Accessor[R, Tp] - extends super.Task[R, Tp] { + extends StrictSplitterCheckTask[R, Tp] { protected[this] val pit: ParIterableIterator[T] protected[this] def newSubtask(p: ParIterableIterator[T]): Accessor[R, Tp] def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) @@ -662,7 +677,7 @@ self => override def toString = "Accessor(" + pit.toString + ")" } - protected[this] trait NonDivisibleTask[R, Tp] extends super.Task[R, Tp] { + protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] { def shouldSplitFurther = false def split = throw new UnsupportedOperationException("Does not split.") override def toString = "NonDivisibleTask" @@ -670,7 +685,7 @@ self => protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] - protected[this] trait Composite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + protected[this] trait Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] { val ft: First val st: Second @@ -680,10 +695,11 @@ self => ft.signalAbort st.signalAbort } + override def requiresStrictSplitters = ft.requiresStrictSplitters || st.requiresStrictSplitters } /** Sequentially performs one task after another. */ - protected[this] trait SeqComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + protected[this] trait SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] extends Composite[FR, SR, R, First, Second] { def leaf(prevr: Option[R]) = { ft.compute @@ -693,7 +709,7 @@ self => } /** Performs two tasks in parallel, and waits for both to finish. */ - protected[this] trait ParComposite[FR, SR, R, First <: super.Task[FR, _], Second <: super.Task[SR, _]] + protected[this] trait ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]] extends Composite[FR, SR, R, First, Second] { def leaf(prevr: Option[R]) = { st.start @@ -703,7 +719,7 @@ self => } } - protected[this] abstract class ResultMapping[R, Tp, R1](val inner: Task[R, Tp]) + protected[this] abstract class ResultMapping[R, Tp, R1](val inner: StrictSplitterCheckTask[R, Tp]) extends NonDivisibleTask[R1, ResultMapping[R, Tp, R1]] { var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 @@ -714,6 +730,7 @@ self => private[parallel] override def signalAbort { inner.signalAbort } + override def requiresStrictSplitters = inner.requiresStrictSplitters } protected trait Transformer[R, Tp] extends Accessor[R, Tp] @@ -731,11 +748,14 @@ self => override def merge(that: Count) = result = result + that.result } - protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Reduce[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.reduce(op) + protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] { + var result: Option[U] = None + def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.reduce(op)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p) - override def merge(that: Reduce[U]) = result = op(result, that.result) + override def merge(that: Reduce[U]) = + if (this.result == None) result = that.result + else if (that.result != None) op(result.get, that.result.get) + override def requiresStrictSplitters = true } protected[this] class Fold[U >: T](z: U, op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Fold[U]] { @@ -767,18 +787,24 @@ self => override def merge(that: Product[U]) = result = num.times(result, that.result) } - protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Min[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.min(ord) + protected[this] class Min[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Min[U]] { + var result: Option[U] = None + def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.min(ord)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Min(ord, p) - override def merge(that: Min[U]) = result = if (ord.lteq(result, that.result)) result else that.result + override def merge(that: Min[U]) = + if (this.result == None) result = that.result + else if (that.result != None) result = if (ord.lteq(result.get, that.result.get)) result else that.result + override def requiresStrictSplitters = true } - protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[U, Max[U]] { - var result: U = null.asInstanceOf[U] - def leaf(prevr: Option[U]) = result = pit.max(ord) + protected[this] class Max[U >: T](ord: Ordering[U], protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Max[U]] { + var result: Option[U] = None + def leaf(prevr: Option[Option[U]]) = if (pit.remaining > 0) result = Some(pit.max(ord)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Max(ord, p) - override def merge(that: Max[U]) = result = if (ord.gteq(result, that.result)) result else that.result + override def merge(that: Max[U]) = + if (this.result == None) result = that.result + else if (that.result != None) result = if (ord.gteq(result.get, that.result.get)) result else that.result + override def requiresStrictSplitters = true } protected[this] class Map[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: ParIterableIterator[T]) @@ -873,6 +899,7 @@ self => } } override def merge(that: Take[U, This]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class Drop[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) @@ -889,6 +916,7 @@ self => } } override def merge(that: Drop[U, This]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class Slice[U >: T, This >: Repr](from: Int, until: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) @@ -906,6 +934,7 @@ self => } } override def merge(that: Slice[U, This]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class SplitAt[U >: T, This >: Repr](at: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) @@ -919,6 +948,7 @@ self => for ((p, untilp) <- pits zip sizes) yield new SplitAt((at max untilp min (untilp + p.remaining)) - untilp, cbf, p) } override def merge(that: SplitAt[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) + override def requiresStrictSplitters = true } protected[this] class TakeWhile[U >: T, This >: Repr] @@ -937,6 +967,7 @@ self => override def merge(that: TakeWhile[U, This]) = if (result._2) { result = (result._1 combine that.result._1, that.result._2) } + override def requiresStrictSplitters = true } protected[this] class Span[U >: T, This >: Repr] @@ -959,6 +990,7 @@ self => } else { (result._1, result._2 combine that.result._1 combine that.result._2) } + override def requiresStrictSplitters = true } protected[this] class Zip[U >: T, S, That](pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParIterableIterator[T], val othpit: ParSeqIterator[S]) @@ -973,6 +1005,7 @@ self => (pits zip opits) map { p => new Zip(pbf, p._1, p._2) } } override def merge(that: Zip[U, S, That]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class ZipAll[U >: T, S, That] @@ -992,9 +1025,10 @@ self => Seq( new ZipAll(pit.remaining, thiselem, thatelem, pbf, pit, opits(0)), // nothing wrong will happen with the cast below - elem T is never accessed new ZipAll(diff, thiselem, thatelem, pbf, immutable.repetition(thiselem, diff).parallelIterator.asInstanceOf[ParIterableIterator[T]], opits(1)) - ) + ) } override def merge(that: ZipAll[U, S, That]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class CopyToArray[U >: T, This >: Repr](from: Int, len: Int, array: Array[U], protected[this] val pit: ParIterableIterator[T]) @@ -1009,6 +1043,7 @@ self => new CopyToArray[U, This](from + untilp, plen, array, p) } } + override def requiresStrictSplitters = true } protected[this] class ScanTree[U >: T](val from: Int, val len: Int) { @@ -1059,7 +1094,7 @@ self => } protected[this] class ApplyToArray[U >: T, A >: U](elem: U, op: (U, U) => U, from: Int, len: Int, array: Array[A]) - extends super.Task[Unit, ApplyToArray[U, A]] { + extends StrictSplitterCheckTask[Unit, ApplyToArray[U, A]] { var result: Unit = () def leaf(prev: Option[Unit]) = { var i = from @@ -1082,6 +1117,7 @@ self => protected[this] class BuildScanTree[U >: T, A >: U](z: U, op: (U, U) => U, val from: Int, val len: Int, array: Array[A], protected[this] val pit: ParIterableIterator[T]) extends Accessor[ScanTree[U], BuildScanTree[U, A]] { + // TODO reimplement - there are some issues here var result: ScanTree[U] = null def leaf(prev: Option[ScanTree[U]]) = if ((prev != None && prev.get.chunkFinished) || from == 1) { val prevElem = if (from == 1) z else prev.get.value @@ -1114,10 +1150,11 @@ self => // set result result = ns } + override def requiresStrictSplitters = true } protected[this] class ScanWithScanTree[U >: T, A >: U](first: Option[U], op: (U, U) => U, st: ScanTree[U], src: Array[A], dest: Array[A]) - extends super.Task[Unit, ScanWithScanTree[U, A]] { + extends StrictSplitterCheckTask[Unit, ScanWithScanTree[U, A]] { var result = (); def leaf(prev: Option[Unit]) = scan(st, first.get) private def scan(st: ScanTree[U], elem: U): Unit = if (!st.chunkFinished) { @@ -1135,7 +1172,7 @@ self => } protected[this] class FromArray[S, A, That](array: Array[A], from: Int, len: Int, cbf: CanCombineFrom[Repr, S, That]) - extends super.Task[Combiner[S, That], FromArray[S, A, That]] { + extends StrictSplitterCheckTask[Combiner[S, That], FromArray[S, A, That]] { var result: Result = null def leaf(prev: Option[Result]) = { val cb = prev getOrElse cbf(self.repr) diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index f26ef0348c..d2e0f965d6 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -338,6 +338,7 @@ self => for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p) } override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2) + override def requiresStrictSplitters = true } protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: ParSeqIterator[T]) @@ -358,6 +359,7 @@ self => override def merge(that: IndexWhere) = result = if (result == -1) that.result else { if (that.result != -1) result min that.result else result } + override def requiresStrictSplitters = true } protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: ParSeqIterator[T]) @@ -378,6 +380,7 @@ self => override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else { if (that.result != -1) result max that.result else result } + override def requiresStrictSplitters = true } protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: ParSeqIterator[T]) @@ -410,6 +413,7 @@ self => for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op) } override def merge(that: SameElements[U]) = result = result && that.result + override def requiresStrictSplitters = true } protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: ParSeqIterator[T]) @@ -422,6 +426,7 @@ self => for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p) } override def merge(that: Updated[U, That]) = result = result combine that.result + override def requiresStrictSplitters = true } protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: ParSeqIterator[T], val otherpit: ParSeqIterator[S]) @@ -456,6 +461,7 @@ self => for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op) } override def merge(that: Corresponds[S]) = result = result && that.result + override def requiresStrictSplitters = true } } diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala index 686d08a301..4831c829ad 100644 --- a/src/library/scala/collection/parallel/RemainsIterator.scala +++ b/src/library/scala/collection/parallel/RemainsIterator.scala @@ -346,11 +346,12 @@ self => * * '''Note''': This method may be implemented to return an upper bound on the number of elements * in the iterator, instead of the exact number of elements to iterate. + * Parallel collections which have such iterators are called non-strict-splitter collections. * * In that case, 2 considerations must be taken into account: * * 1) classes that inherit `ParIterable` must reimplement methods `take`, `drop`, `slice`, `splitAt`, `copyToArray` - * and which use tasks having the iterated subset length as a ctor argument. + * and all others using this information. * * 2) if an iterator provides an upper bound on the number of elements, then after splitting the sum * of `remaining` values of split iterators must be less than or equal to this upper bound. diff --git a/src/library/scala/collection/parallel/Splitter.scala b/src/library/scala/collection/parallel/Splitter.scala index c890fdf974..e598b96c82 100644 --- a/src/library/scala/collection/parallel/Splitter.scala +++ b/src/library/scala/collection/parallel/Splitter.scala @@ -30,6 +30,17 @@ trait Splitter[+T] extends Iterator[T] { * @return a sequence of disjunct iterators of the collection */ def split: Seq[Splitter[T]] + /* + * '''Note:''' splitters in this sequence may actually be empty and it can contain a splitter + * which iterates over the same elements as the original splitter AS LONG AS calling `split` + * a finite number of times on the resulting splitters eventually returns a nontrivial partition. + * + * Note that the docs contract above yields implementations which are a subset of implementations + * defined by this fineprint. + * + * The rationale behind this is best given by the following example: + * try splitting an iterator over a linear hash table. + */ } diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 3facaea7c3..915e02f787 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -92,9 +92,6 @@ trait Tasks { /** Executes a task and returns a future. Forwards an exception if some task threw it. */ def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R - /** Executes a task and waits for it to finish. Forwards an exception if some task threw it. */ - def executeAndWait[R, Tp](task: TaskType[R, Tp]) - /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */ def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R @@ -214,20 +211,6 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { } } - /** Executes a task on a fork/join pool and waits for it to finish. - * - * $fjdispatch - */ - def executeAndWait[R, Tp](fjtask: Task[R, Tp]) { - if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { - fjtask.fork - } else { - forkJoinPool.execute(fjtask) - } - fjtask.join - fjtask.forwardException - } - /** Executes a task on a fork/join pool and waits for it to finish. * Returns its result when it does. * diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index c72eb66207..306ec68548 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -156,7 +156,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => val buckets = heads.filter(_ != null) val root = new Array[HashMap[K, V]](buckets.length) - executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length)) var bitmap = 0 var i = 0 diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala index c9bda86d67..0ef2681567 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala @@ -151,7 +151,7 @@ self: EnvironmentPassingCombiner[T, ParHashSet[T]] => val buckets = heads.filter(_ != null) val root = new Array[HashSet[T]](buckets.length) - executeAndWait(new CreateTrie(buckets, root, 0, buckets.length)) + executeAndWaitResult(new CreateTrie(buckets, root, 0, buckets.length)) var bitmap = 0 var i = 0 diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index d2123a402f..0ca6858fb2 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -541,7 +541,7 @@ extends ParSeq[T] val targetarr = targarrseq.array.asInstanceOf[Array[Any]] // fill it in parallel - executeAndWait(new Map[S](f, targetarr, 0, length)) + executeAndWaitResult(new Map[S](f, targetarr, 0, length)) // wrap it into a parallel array (new ParArray[S](targarrseq)).asInstanceOf[That] @@ -554,7 +554,7 @@ extends ParSeq[T] targetarr(0) = z // do a parallel prefix scan - executeAndWait(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st => + executeAndWaitResult(new BuildScanTree[U, Any](z, op, 1, size, targetarr, parallelIterator) mapResult { st => // println("-----------------------") // println(targetarr.toList) // st.printTree diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala index cb0e589d9b..7e6bbd9333 100644 --- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala @@ -29,7 +29,7 @@ extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]] val arrayseq = new ArraySeq[T](size) val array = arrayseq.array.asInstanceOf[Array[Any]] - executeAndWait(new CopyChainToArray(array, 0, size)) + executeAndWaitResult(new CopyChainToArray(array, 0, size)) new ParArray(arrayseq) } else { // optimisation if there is only 1 array diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala new file mode 100644 index 0000000000..db0c6ac50a --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -0,0 +1,55 @@ +package scala.collection.parallel +package mutable + + + + + + + + + +/* +class ParHashMap[K, +V] +extends ParMap[K, V] + with GenericParMapTemplate[K, V, ParHashMap] + with ParMapLike[K, V] +{ +self => + + override def mapCompanion: GenericParMapCompanion[ParHashMap] = ParHashMap + + override def empty: ParHashMap[K, V] = new ParHashMap[K, V] + + def parallelIterator = null // TODO + + def seq = null // TODO + + + +} + + + + +object ParHashMap { + +} + +*/ + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala new file mode 100644 index 0000000000..dc33ef3189 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala @@ -0,0 +1,30 @@ +package scala.collection.parallel.mutable + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParMap.scala b/src/library/scala/collection/parallel/mutable/ParMap.scala new file mode 100644 index 0000000000..c5fccf45ae --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParMap.scala @@ -0,0 +1,53 @@ +package scala.collection.parallel.mutable + + + + +import collection.generic._ +import collection.parallel.Combiner + + + +trait ParMap[K, V] +extends collection.mutable.Map[K, V] + with collection.parallel.ParMap[K, V] + with ParIterable[(K, V)] + with GenericParMapTemplate[K, V, ParMap] + with ParMapLike[K, V, ParMap[K, V], collection.mutable.Map[K, V]] +{ + + override def mapCompanion: GenericParMapCompanion[ParMap] = ParMap + + override def empty: ParMap[K, V] = null // TODO + +} + + + +object ParMap extends ParMapFactory[ParMap] { + def empty[K, V]: ParMap[K, V] = null // TODO + + def newCombiner[K, V]: Combiner[(K, V), ParMap[K, V]] = null // TODO + + implicit def canBuildFrom[K, V]: CanCombineFrom[Coll, (K, V), ParMap[K, V]] = new CanCombineFromMap[K, V] + +} + + + + + + + + + + + + + + + + + + + diff --git a/src/library/scala/collection/parallel/mutable/ParMapLike.scala b/src/library/scala/collection/parallel/mutable/ParMapLike.scala new file mode 100644 index 0000000000..902186eb2d --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParMapLike.scala @@ -0,0 +1,17 @@ +package scala.collection.parallel +package mutable + + + +import collection.generic._ +import collection.mutable.Builder + + + +trait ParMapLike[K, + V, + +Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V], + +Sequential <: collection.mutable.Map[K, V] with collection.mutable.MapLike[K, V, Sequential]] +extends collection.parallel.ParMapLike[K, V, Repr, Sequential] + with collection.mutable.MapLike[K, V, Repr] + -- cgit v1.2.3