summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/parallel')
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala20
-rw-r--r--src/library/scala/collection/parallel/ParMap.scala1
-rw-r--r--src/library/scala/collection/parallel/ParMapLike.scala4
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala15
-rw-r--r--src/library/scala/collection/parallel/RemainsIterator.scala12
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala4
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala13
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala2
-rw-r--r--src/library/scala/collection/parallel/immutable/ParMap.scala1
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala1
-rw-r--r--src/library/scala/collection/parallel/mutable/LazyCombiner.scala1
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala1
-rw-r--r--src/library/scala/collection/parallel/mutable/ParTrieMap.scala15
-rw-r--r--src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala8
-rw-r--r--src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala16
-rw-r--r--src/library/scala/collection/parallel/package.scala12
16 files changed, 41 insertions, 85 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 8c9b959569..2ed7bc075e 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -9,6 +9,8 @@
package scala
package collection.parallel
+import scala.language.{ higherKinds, implicitConversions }
+
import scala.collection.mutable.Builder
import scala.collection.mutable.ArrayBuffer
import scala.collection.IterableLike
@@ -21,13 +23,9 @@ import scala.collection.GenIterable
import scala.collection.GenTraversableOnce
import scala.collection.GenTraversable
import immutable.HashMapCombiner
-import scala.reflect.{ClassTag, classTag}
-
-import java.util.concurrent.atomic.AtomicBoolean
+import scala.reflect.ClassTag
import scala.annotation.unchecked.uncheckedVariance
-import scala.annotation.unchecked.uncheckedStable
-import scala.language.{ higherKinds, implicitConversions }
import scala.collection.parallel.ParallelCollectionImplicits._
@@ -195,7 +193,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
* import scala.collection.parallel._
* val pc = mutable.ParArray(1, 2, 3)
* pc.tasksupport = new ForkJoinTaskSupport(
- * new scala.concurrent.forkjoin.ForkJoinPool(2))
+ * new java.util.concurrent.ForkJoinPool(2))
* }}}
*
* @see [[scala.collection.parallel.TaskSupport]]
@@ -1284,7 +1282,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
@volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](othpit, pbf())
- protected[this] def newSubtask(p: IterableSplitter[T]) = unsupported
+ protected[this] def newSubtask(p: IterableSplitter[T]) = throw new UnsupportedOperationException
override def split = {
val pits = pit.splitWithSignalling
val sizes = pits.map(_.remaining)
@@ -1300,7 +1298,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends Transformer[Combiner[(U, S), That], ZipAll[U, S, That]] {
@volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zipAll2combiner[U, S, That](othpit, thiselem, thatelem, pbf())
- protected[this] def newSubtask(p: IterableSplitter[T]) = unsupported
+ protected[this] def newSubtask(p: IterableSplitter[T]) = throw new UnsupportedOperationException
override def split = if (pit.remaining <= len) {
val pits = pit.splitWithSignalling
val sizes = pits.map(_.remaining)
@@ -1322,7 +1320,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
extends Accessor[Unit, CopyToArray[U, This]] {
@volatile var result: Unit = ()
def leaf(prev: Option[Unit]) = pit.copyToArray(array, from, len)
- protected[this] def newSubtask(p: IterableSplitter[T]) = unsupported
+ protected[this] def newSubtask(p: IterableSplitter[T]) = throw new UnsupportedOperationException
override def split = {
val pits = pit.splitWithSignalling
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining); if untilp < len) yield {
@@ -1379,7 +1377,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
val half = howmany / 2
ScanNode(mergeTrees(trees, from, half), mergeTrees(trees, from + half, howmany - half))
} else trees(from)
- protected[this] def newSubtask(pit: IterableSplitter[T]) = unsupported
+ protected[this] def newSubtask(pit: IterableSplitter[T]) = throw new UnsupportedOperationException
override def split = {
val pits = pit.splitWithSignalling
for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield {
@@ -1416,7 +1414,7 @@ self: ParIterableLike[T, Repr, Sequential] =>
new FromScanTree(left, z, op, cbf),
new FromScanTree(right, z, op, cbf)
)
- case _ => unsupportedop("Cannot be split further")
+ case _ => throw new UnsupportedOperationException("Cannot be split further")
}
def shouldSplitFurther = tree match {
case ScanNode(_, _) => true
diff --git a/src/library/scala/collection/parallel/ParMap.scala b/src/library/scala/collection/parallel/ParMap.scala
index 9f92e6c1e8..70afe5174b 100644
--- a/src/library/scala/collection/parallel/ParMap.scala
+++ b/src/library/scala/collection/parallel/ParMap.scala
@@ -11,7 +11,6 @@ package collection.parallel
import scala.collection.Map
import scala.collection.GenMap
-import scala.collection.mutable.Builder
import scala.collection.generic.ParMapFactory
import scala.collection.generic.GenericParMapTemplate
import scala.collection.generic.GenericParMapCompanion
diff --git a/src/library/scala/collection/parallel/ParMapLike.scala b/src/library/scala/collection/parallel/ParMapLike.scala
index 0a671fb085..a3ac388587 100644
--- a/src/library/scala/collection/parallel/ParMapLike.scala
+++ b/src/library/scala/collection/parallel/ParMapLike.scala
@@ -12,10 +12,8 @@ package collection.parallel
import scala.collection.MapLike
import scala.collection.GenMapLike
import scala.collection.Map
-import scala.collection.mutable.Builder
+
import scala.annotation.unchecked.uncheckedVariance
-import scala.collection.generic.IdleSignalling
-import scala.collection.generic.Signalling
/** A template trait for mutable parallel maps. This trait is to be mixed in
* with concrete parallel maps to override the representation type.
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 0b6fec364e..60fa1858e7 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -9,11 +9,10 @@
package scala
package collection.parallel
-import scala.collection.{ Parallel, SeqLike, GenSeqLike, GenSeq, GenIterable, Iterator }
+import scala.collection.{ SeqLike, GenSeq, GenIterable, Iterator }
import scala.collection.generic.DefaultSignalling
import scala.collection.generic.AtomicIndexFlag
import scala.collection.generic.CanBuildFrom
-import scala.collection.generic.CanCombineFrom
import scala.collection.generic.VolatileAbort
import scala.collection.parallel.ParallelCollectionImplicits._
@@ -365,7 +364,7 @@ self =>
pit.setIndexFlagIfLesser(from)
}
}
- protected[this] def newSubtask(p: SuperParIterator) = unsupported
+ protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.splitWithSignalling
for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p)
@@ -386,7 +385,7 @@ self =>
pit.setIndexFlagIfGreater(pos)
}
}
- protected[this] def newSubtask(p: SuperParIterator) = unsupported
+ protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.splitWithSignalling
for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p)
@@ -420,7 +419,7 @@ self =>
result = pit.sameElements(otherpit)
if (!result) pit.abort()
}
- protected[this] def newSubtask(p: SuperParIterator) = unsupported
+ protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val fp = pit.remaining / 2
val sp = pit.remaining - fp
@@ -434,7 +433,7 @@ self =>
extends Transformer[Combiner[U, That], Updated[U, That]] {
@volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf())
- protected[this] def newSubtask(p: SuperParIterator) = unsupported
+ protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.splitWithSignalling
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p)
@@ -447,7 +446,7 @@ self =>
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
@volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, cf())
- protected[this] def newSubtask(p: SuperParIterator) = unsupported
+ protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val fp = len / 2
val sp = len - len / 2
@@ -468,7 +467,7 @@ self =>
result = pit.corresponds(corr)(otherpit)
if (!result) pit.abort()
}
- protected[this] def newSubtask(p: SuperParIterator) = unsupported
+ protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val fp = pit.remaining / 2
val sp = pit.remaining - fp
diff --git a/src/library/scala/collection/parallel/RemainsIterator.scala b/src/library/scala/collection/parallel/RemainsIterator.scala
index 5f2ceac0e0..63d63d9ef3 100644
--- a/src/library/scala/collection/parallel/RemainsIterator.scala
+++ b/src/library/scala/collection/parallel/RemainsIterator.scala
@@ -9,13 +9,10 @@
package scala
package collection.parallel
-import scala.collection.Parallel
import scala.collection.generic.Signalling
import scala.collection.generic.DelegatedSignalling
import scala.collection.generic.IdleSignalling
-import scala.collection.generic.CanCombineFrom
import scala.collection.mutable.Builder
-import scala.collection.Iterator.empty
import scala.collection.GenTraversableOnce
import scala.collection.parallel.immutable.repetition
@@ -456,6 +453,15 @@ self =>
}
it
}
+ /** Drop implemented as simple eager consumption. */
+ override def drop(n: Int): IterableSplitter[T] = {
+ var i = 0
+ while (i < n && hasNext) {
+ next()
+ i += 1
+ }
+ this
+ }
override def take(n: Int): IterableSplitter[T] = newTaken(n)
override def slice(from1: Int, until1: Int): IterableSplitter[T] = newSliceInternal(newTaken(until1), from1)
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 9064018d46..6ab694de04 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -10,7 +10,7 @@ package scala
package collection.parallel
import java.util.concurrent.ThreadPoolExecutor
-import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.ForkJoinPool
import scala.concurrent.ExecutionContext
/** A trait implementing the scheduling of a parallel collection operation.
@@ -41,7 +41,7 @@ import scala.concurrent.ExecutionContext
* import scala.collection.parallel._
* val pc = mutable.ParArray(1, 2, 3)
* pc.tasksupport = new ForkJoinTaskSupport(
- * new scala.concurrent.forkjoin.ForkJoinPool(2))
+ * new java.util.concurrent.ForkJoinPool(2))
* }}}
*
* @see [[http://docs.scala-lang.org/overviews/parallel-collections/configuration.html Configuring Parallel Collections]] section
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index fcf0dff846..2a4e40dd16 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -10,7 +10,7 @@ package scala
package collection.parallel
import java.util.concurrent.ThreadPoolExecutor
-import scala.concurrent.forkjoin._
+import java.util.concurrent.{ForkJoinPool, RecursiveAction, ForkJoinWorkerThread}
import scala.concurrent.ExecutionContext
import scala.util.control.Breaks._
import scala.annotation.unchecked.uncheckedVariance
@@ -66,13 +66,10 @@ trait Task[R, +Tp] {
}
private[parallel] def mergeThrowables(that: Task[_, _]) {
- // TODO: As soon as we target Java >= 7, use Throwable#addSuppressed
- // to pass additional Throwables to the caller, e. g.
- // if (this.throwable != null && that.throwable != null)
- // this.throwable.addSuppressed(that.throwable)
- // For now, we just use whatever Throwable comes across “first”.
- if (this.throwable == null && that.throwable != null)
- this.throwable = that.throwable
+ if (this.throwable != null && that.throwable != null)
+ this.throwable.addSuppressed(that.throwable)
+ else if (this.throwable == null && that.throwable != null)
+ this.throwable = that.throwable
}
// override in concrete task implementations to signal abort to other tasks
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 65a632470e..3a1ec7fff8 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -197,7 +197,7 @@ extends scala.collection.parallel.BucketCombiner[T, ParHashSet[T], Any, HashSetC
while (i < chunksz) {
val v = chunkarr(i).asInstanceOf[T]
val hc = trie.computeHash(v)
- trie = trie.updated0(v, hc, rootbits)
+ trie = trie.updated0(v, hc, rootbits) // internal API, private[collection]
i += 1
}
i = 0
diff --git a/src/library/scala/collection/parallel/immutable/ParMap.scala b/src/library/scala/collection/parallel/immutable/ParMap.scala
index 2956c2a883..65bb2e12c5 100644
--- a/src/library/scala/collection/parallel/immutable/ParMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParMap.scala
@@ -16,7 +16,6 @@ import scala.collection.generic.GenericParMapCompanion
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
-import scala.collection.GenMapLike
/** A template trait for immutable parallel maps.
*
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index ec90de3a7d..8fd5382ce9 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -12,7 +12,6 @@ package collection.parallel.immutable
import scala.collection.immutable.Range
import scala.collection.parallel.Combiner
import scala.collection.parallel.SeqSplitter
-import scala.collection.generic.CanCombineFrom
import scala.collection.Iterator
/** Parallel ranges.
diff --git a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
index 5ab2bb81c6..cc25b5b4b2 100644
--- a/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/LazyCombiner.scala
@@ -30,7 +30,6 @@ trait LazyCombiner[Elem, +To, Buff <: Growable[Elem] with Sizing] extends Combin
def result: To = allocateAndCopy
def clear() = { chain.clear() }
def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
- import language.existentials // FIXME: See SI-7750
if (other.isInstanceOf[LazyCombiner[_, _, _]]) {
val that = other.asInstanceOf[LazyCombiner[Elem, To, Buff]]
newLazyCombiner(chain ++= that.chain)
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index d0d022db4b..8a2cf2716a 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -18,7 +18,6 @@ import scala.collection.generic.GenericParCompanion
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.ParFactory
-import scala.collection.generic.Sizing
import scala.collection.parallel.Combiner
import scala.collection.parallel.SeqSplitter
import scala.collection.parallel.ParSeqLike
diff --git a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala
index a1dc37cec9..2faf223b99 100644
--- a/src/library/scala/collection/parallel/mutable/ParTrieMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParTrieMap.scala
@@ -152,18 +152,9 @@ extends TrieMapIterator[K, V](lev, ct, mustInit)
/** Only used within the `ParTrieMap`. */
private[mutable] trait ParTrieMapCombiner[K, V] extends Combiner[(K, V), ParTrieMap[K, V]] {
- def combine[N <: (K, V), NewTo >: ParTrieMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this eq other) this else {
- throw new UnsupportedOperationException("This shouldn't have been called in the first place.")
-
- val thiz = this.asInstanceOf[ParTrieMap[K, V]]
- val that = other.asInstanceOf[ParTrieMap[K, V]]
- val result = new ParTrieMap[K, V]
-
- result ++= thiz.iterator
- result ++= that.iterator
-
- result
- }
+ def combine[N <: (K, V), NewTo >: ParTrieMap[K, V]](other: Combiner[N, NewTo]): Combiner[N, NewTo] =
+ if (this eq other) this
+ else throw new UnsupportedOperationException("This shouldn't have been called in the first place.")
override def canBeShared = true
}
diff --git a/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala
index 79322c85b1..6883457fef 100644
--- a/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ResizableParArrayCombiner.scala
@@ -9,18 +9,10 @@
package scala
package collection.parallel.mutable
-
-
-import scala.collection.generic.Sizing
import scala.collection.mutable.ArraySeq
import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.TaskSupport
-import scala.collection.parallel.unsupportedop
-import scala.collection.parallel.Combiner
import scala.collection.parallel.Task
-
-
/** An array combiner that uses a chain of arraybuffers to store elements. */
trait ResizableParArrayCombiner[T] extends LazyCombiner[T, ParArray[T], ExposedArrayBuffer[T]] {
diff --git a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
index d1379cde11..e71e61f2f1 100644
--- a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
@@ -9,23 +9,11 @@
package scala
package collection.parallel.mutable
-import scala.collection.generic.Sizing
import scala.collection.mutable.ArraySeq
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.UnrolledBuffer
+import scala.collection.mutable.DoublingUnrolledBuffer
import scala.collection.mutable.UnrolledBuffer.Unrolled
-import scala.collection.parallel.TaskSupport
-import scala.collection.parallel.unsupportedop
import scala.collection.parallel.Combiner
import scala.collection.parallel.Task
-import scala.reflect.ClassTag
-
-// Todo -- revisit whether inheritance is the best way to achieve this functionality
-private[mutable] class DoublingUnrolledBuffer[T](implicit t: ClassTag[T]) extends UnrolledBuffer[T]()(t) {
- override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz
- protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this)
-}
-
/** An array combiner that uses doubling unrolled buffers to store elements. */
trait UnrolledParArrayCombiner[T]
@@ -62,7 +50,7 @@ extends Combiner[T, ParArray[T]] {
case that: UnrolledParArrayCombiner[t] =>
buff concat that.buff
this
- case _ => unsupportedop("Cannot combine with combiner of different type.")
+ case _ => throw new UnsupportedOperationException("Cannot combine with combiner of different type.")
}
def size = buff.size
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index d77dcb0658..ba64ca505b 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -35,15 +35,7 @@ package object parallel {
else sz
}
- private[parallel] def unsupported = throw new UnsupportedOperationException
-
- private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg)
-
- private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
-
- private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
-
- val defaultTaskSupport: TaskSupport = getTaskSupport
+ val defaultTaskSupport: TaskSupport = new ExecutionContextTaskSupport
def setTaskSupport[Coll](c: Coll, t: TaskSupport): Coll = {
c match {
@@ -98,7 +90,7 @@ package parallel {
}
}
}
-
+
trait FactoryOps[From, Elem, To] {
trait Otherwise[R] {
def otherwise(notbody: => R): R