diff options
Diffstat (limited to 'src/library/scala/collection/parallel/ParIterableLike.scala')
-rw-r--r-- | src/library/scala/collection/parallel/ParIterableLike.scala | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 3008f93ebd..9f894c0af8 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -10,12 +10,12 @@ import scala.collection.Parallel import scala.collection.Parallelizable import scala.collection.Sequentializable import scala.collection.generic._ - +import immutable.HashMapCombiner import java.util.concurrent.atomic.AtomicBoolean +import annotation.unchecked.uncheckedVariance -import annotation.unchecked.uncheckedStable // TODO update docs!! @@ -520,6 +520,12 @@ self => executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) }) } + // override def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = { + // executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], parallelIterator) mapResult { + // rcb => rcb.groupByKey(cbfactory) + // }) + // } + override def take(n: Int): Repr = { val actualn = if (size > n) n else size if (actualn < MIN_FOR_COPY) take_sequential(actualn) @@ -893,9 +899,9 @@ self => def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr)) protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p) override def merge(that: FlatMap[S, That]) = { - debuglog("merging " + result + " and " + that.result) + //debuglog("merging " + result + " and " + that.result) result = result combine that.result - debuglog("merged into " + result) + //debuglog("merged into " + result) } } @@ -956,6 +962,29 @@ self => override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2) } + protected[this] class GroupBy[K, U >: T]( + f: U => K, + mcf: () => HashMapCombiner[K, U], + protected[this] val pit: ParIterableIterator[T] + ) extends Transformer[HashMapCombiner[K, U], GroupBy[K, U]] { + @volatile var result: Result = null + final def leaf(prev: Option[Result]) = { + // note: HashMapCombiner doesn't merge same keys until evaluation + val cb = mcf() + while (pit.hasNext) { + val elem = pit.next + cb += f(elem) -> elem + } + result = cb + } + protected[this] def newSubtask(p: ParIterableIterator[T]) = new GroupBy(f, mcf, p) + override def merge(that: GroupBy[K, U]) = { + // note: this works because we know that a HashMapCombiner doesn't merge same keys until evaluation + // --> we know we're not dropping any mappings + result = (result combine that.result).asInstanceOf[HashMapCombiner[K, U]] + } + } + protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Take[U, This]] { @volatile var result: Combiner[U, This] = null @@ -1264,9 +1293,9 @@ self => private[parallel] def brokenInvariants = Seq[String]() - private val dbbuff = ArrayBuffer[String]() - def debugBuffer: ArrayBuffer[String] = dbbuff - // def debugBuffer: ArrayBuffer[String] = null + // private val dbbuff = ArrayBuffer[String]() + // def debugBuffer: ArrayBuffer[String] = dbbuff + def debugBuffer: ArrayBuffer[String] = null private[parallel] def debugclear() = synchronized { debugBuffer.clear |