summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/ParIterableLike.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/parallel/ParIterableLike.scala')
-rw-r--r-- src/library/scala/collection/parallel/ParIterableLike.scala | 43
1 file changed, 36 insertions, 7 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 3008f93ebd..9f894c0af8 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -10,12 +10,12 @@ import scala.collection.Parallel
import scala.collection.Parallelizable
import scala.collection.Sequentializable
import scala.collection.generic._
-
+import immutable.HashMapCombiner
import java.util.concurrent.atomic.AtomicBoolean
+import annotation.unchecked.uncheckedVariance
-import annotation.unchecked.uncheckedStable
// TODO update docs!!
@@ -520,6 +520,12 @@ self =>
executeAndWaitResult(new Partition(pred, cbfactory, parallelIterator) mapResult { p => (p._1.result, p._2.result) })
}
+ // override def groupBy[K](f: T => K): immutable.ParMap[K, Repr] = {
+ // executeAndWaitResult(new GroupBy(f, () => HashMapCombiner[K, T], parallelIterator) mapResult {
+ // rcb => rcb.groupByKey(cbfactory)
+ // })
+ // }
+
override def take(n: Int): Repr = {
val actualn = if (size > n) n else size
if (actualn < MIN_FOR_COPY) take_sequential(actualn)
@@ -893,9 +899,9 @@ self =>
def leaf(prev: Option[Combiner[S, That]]) = result = pit.flatmap2combiner(f, pbf(self.repr))
protected[this] def newSubtask(p: ParIterableIterator[T]) = new FlatMap(f, pbf, p)
override def merge(that: FlatMap[S, That]) = {
- debuglog("merging " + result + " and " + that.result)
+ //debuglog("merging " + result + " and " + that.result)
result = result combine that.result
- debuglog("merged into " + result)
+ //debuglog("merged into " + result)
}
}
@@ -956,6 +962,29 @@ self =>
override def merge(that: Partition[U, This]) = result = (result._1 combine that.result._1, result._2 combine that.result._2)
}
+ protected[this] class GroupBy[K, U >: T](
+ f: U => K,
+ mcf: () => HashMapCombiner[K, U],
+ protected[this] val pit: ParIterableIterator[T]
+ ) extends Transformer[HashMapCombiner[K, U], GroupBy[K, U]] {
+ @volatile var result: Result = null
+ final def leaf(prev: Option[Result]) = {
+ // note: HashMapCombiner doesn't merge same keys until evaluation
+ val cb = mcf()
+ while (pit.hasNext) {
+ val elem = pit.next
+ cb += f(elem) -> elem
+ }
+ result = cb
+ }
+ protected[this] def newSubtask(p: ParIterableIterator[T]) = new GroupBy(f, mcf, p)
+ override def merge(that: GroupBy[K, U]) = {
+ // note: this works because we know that a HashMapCombiner doesn't merge same keys until evaluation
+ // --> we know we're not dropping any mappings
+ result = (result combine that.result).asInstanceOf[HashMapCombiner[K, U]]
+ }
+ }
+
protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Take[U, This]] {
@volatile var result: Combiner[U, This] = null
@@ -1264,9 +1293,9 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
- private val dbbuff = ArrayBuffer[String]()
- def debugBuffer: ArrayBuffer[String] = dbbuff
- // def debugBuffer: ArrayBuffer[String] = null
+ // private val dbbuff = ArrayBuffer[String]()
+ // def debugBuffer: ArrayBuffer[String] = dbbuff
+ def debugBuffer: ArrayBuffer[String] = null
private[parallel] def debugclear() = synchronized {
debugBuffer.clear