summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/mutable/PriorityQueue.scala4
-rw-r--r--src/library/scala/collection/package.scala1
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala38
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala3
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala6
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala295
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala4
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala7
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala2
-rw-r--r--src/library/scala/collection/parallel/package.scala4
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala4
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala2
-rw-r--r--test/files/scalacheck/parallel-collections/IntOperators.scala24
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala4
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala4
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala6
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala4
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala8
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala49
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala4
-rw-r--r--test/files/scalacheck/parallel-collections/pc.scala14
27 files changed, 371 insertions, 128 deletions
diff --git a/src/library/scala/collection/mutable/PriorityQueue.scala b/src/library/scala/collection/mutable/PriorityQueue.scala
index 45f1d1f06f..453bed54e6 100644
--- a/src/library/scala/collection/mutable/PriorityQueue.scala
+++ b/src/library/scala/collection/mutable/PriorityQueue.scala
@@ -15,8 +15,8 @@ import generic._
import annotation.migration
/** This class implements priority queues using a heap.
- * To prioritize elements of type T there must be an implicit
- * Ordering[T] available at creation.
+ * To prioritize elements of type A there must be an implicit
+ * Ordering[A] available at creation.
*
* @tparam A type of the elements in this priority queue.
* @param ord implicit ordering used to compare the elements of type `A`.
diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala
index 13b6f22826..31cea84ab8 100644
--- a/src/library/scala/collection/package.scala
+++ b/src/library/scala/collection/package.scala
@@ -87,6 +87,7 @@ package object collection {
}
def arrayString[T](array: Array[T], from: Int, until: Int) = array.slice(from, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ")
+
}
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 53a219ca28..83e5c6cb59 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -476,8 +476,7 @@ self =>
val copythis = new Copy(() => pbf(repr), parallelIterator)
val copythat = wrap {
val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator)
- othtask.compute
- othtask.result
+ other.tasksupport.executeAndWaitResult(othtask)
}
val task = (copythis parallel copythat) { _ combine _ } mapResult {
_.result
@@ -713,13 +712,12 @@ self =>
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
def split = pit.split.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
- override def toString = "Accessor(" + pit.toString + ")"
+ override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")"
}
protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] {
def shouldSplitFurther = false
def split = throw new UnsupportedOperationException("Does not split.")
- override def toString = "NonDivisibleTask"
}
protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]]
@@ -768,9 +766,7 @@ self =>
var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
- inner.compute
- throwable = inner.throwable
- if (throwable eq null) result = map(inner.result)
+ result = map(executeAndWaitResult(inner))
}
private[parallel] override def signalAbort {
inner.signalAbort
@@ -787,10 +783,12 @@ self =>
}
protected[this] class Count(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Int, Count] {
+ // val pittxt = pit.toString
var result: Int = 0
def leaf(prevr: Option[Int]) = result = pit.count(pred)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Count(pred, p)
override def merge(that: Count) = result = result + that.result
+ // override def toString = "CountTask(" + pittxt + ")"
}
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
@@ -901,7 +899,9 @@ self =>
protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Filter[U, This]] {
var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf()))
+ def leaf(prev: Option[Combiner[U, This]]) = {
+ result = pit.filter2combiner(pred, reuse(prev, cbf()))
+ }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Filter(pred, cbf, p)
override def merge(that: Filter[U, This]) = result = result combine that.result
}
@@ -909,7 +909,9 @@ self =>
protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], FilterNot[U, This]] {
var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
+ def leaf(prev: Option[Combiner[U, This]]) = {
+ result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
+ }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new FilterNot(pred, cbf, p)
override def merge(that: FilterNot[U, This]) = result = result combine that.result
}
@@ -1253,6 +1255,24 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
+ private val debugBuffer = collection.mutable.ArrayBuffer[String]()
+
+ private[parallel] def debugclear() = synchronized {
+ debugBuffer.clear
+ }
+
+ private[parallel] def debuglog(s: String) = synchronized {
+ debugBuffer += s
+ }
+
+ import collection.DebugUtils._
+ private[parallel] def printDebugBuffer = println(buildString {
+ append =>
+ for (s <- debugBuffer) {
+ append(s)
+ }
+ })
+
}
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 9e24b83d8a..0ea33d0e39 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -242,8 +242,7 @@ self =>
val copystart = new Copy[U, That](() => pbf(repr), pits(0))
val copymiddle = wrap {
val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
- tsk.compute
- tsk.result
+ that.tasksupport.executeAndWaitResult(tsk)
}
val copyend = new Copy[U, That](() => pbf(repr), pits(2))
executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 8a072b22aa..1b1ee6e469 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -6,7 +6,11 @@ package scala.collection.parallel
-trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks
+trait TaskSupport extends Tasks
+
+class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks
+
+class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 6f0fe47a6a..ec38513d9b 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -7,7 +7,7 @@ import scala.concurrent.forkjoin._
import scala.util.control.Breaks._
-
+import annotation.unchecked.uncheckedVariance
@@ -20,49 +20,35 @@ import scala.util.control.Breaks._
*/
trait Tasks {
- /** A task abstraction which allows starting a task with `start`,
- * waiting for it to finish with `sync` and attempting to cancel
- * the task with `tryCancel`.
- * It also defines a method `leaf` which must be called once the
- * the task is started and defines what this task actually does.
- * Method `split` allows splitting this task into smaller subtasks,
- * and method `shouldSplitFurther` decides if the task should be
- * partitioned further.
- * Method `merge` allows merging the results of the 2 tasks. It updates
- * the result of the receiver.
- * Finally, it defines the task result of type `U`.
- */
+ private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]()
+
+ private[parallel] def debuglog(s: String) = synchronized {
+ debugMessages += s
+ }
+
trait Task[R, +Tp] {
type Result = R
+
def repr = this.asInstanceOf[Tp]
- /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
- def compute
- /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
- * or `None` if there was no previous task.
+ /** Body of the task - non-divisible unit of work done by this task.
+ * Optionally is provided with the result from the previous completed task
+ * or `None` if there was no previous task (or the previous task is uncompleted or unknown).
*/
def leaf(result: Option[R])
- /** Start task. */
- def start
- /** Wait for task to finish. */
- def sync
- /** Try to cancel the task.
- * @return `true` if cancellation is successful.
- */
- def tryCancel: Boolean
/** A result that can be accessed once the task is completed. */
- def result: R
+ var result: R
/** Decides whether or not this task should be split further. */
def shouldSplitFurther: Boolean
/** Splits this task into a list of smaller tasks. */
- protected[this] def split: Seq[Task[R, Tp]]
+ private[parallel] def split: Seq[Task[R, Tp]]
/** Read of results of `that` task and merge them into results of this one. */
- protected[this] def merge(that: Tp) {}
+ private[parallel] def merge(that: Tp @uncheckedVariance) {}
// exception handling mechanism
var throwable: Throwable = null
def forwardThrowable = if (throwable != null) throw throwable
// tries to do the leaf computation, storing the possible exception
- protected def tryLeaf(result: Option[R]) {
+ private[parallel] def tryLeaf(result: Option[R]) {
try {
tryBreakable {
leaf(result)
@@ -75,7 +61,7 @@ trait Tasks {
signalAbort
}
}
- protected[this] def tryMerge(t: Tp) {
+ private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
val that = t.asInstanceOf[Task[R, Tp]]
if (this.throwable == null && that.throwable == null) merge(t)
mergeThrowables(that)
@@ -90,16 +76,44 @@ trait Tasks {
private[parallel] def signalAbort {}
}
- type TaskType[R, +Tp] <: Task[R, Tp]
- type ExecutionEnvironment
+ trait TaskImpl[R, +Tp] {
+ /** the body of this task - what it executes, how it gets split and how results are merged. */
+ val body: Task[R, Tp]
+
+ def split: Seq[TaskImpl[R, Tp]]
+ /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
+ def compute
+ /** Start task. */
+ def start
+ /** Wait for task to finish. */
+ def sync
+ /** Try to cancel the task.
+ * @return `true` if cancellation is successful.
+ */
+ def tryCancel: Boolean
+ /** If the task has been cancelled successfully, those syncing on it may
+ * automatically be notified, depending on the implementation. If they
+ * aren't, this release method should be called after processing the
+ * cancelled task.
+ *
+ * This method may be overridden.
+ */
+ def release {}
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
- var environment: ExecutionEnvironment
+ /* task control */
+
+ // safe to assume it will always have the same type,
+ // because the `tasksupport` in parallel iterable is final
+ var environment: AnyRef
/** Executes a task and returns a future. Forwards an exception if some task threw it. */
- def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R
+ def execute[R, Tp](fjtask: Task[R, Tp]): () => R
/** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */
- def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R
/** Retrieves the parallelism level of the task execution environment. */
def parallelismLevel: Int
@@ -107,40 +121,46 @@ trait Tasks {
}
+
/** This trait implements scheduling by employing
* an adaptive work stealing technique.
*/
trait AdaptiveWorkStealingTasks extends Tasks {
- trait Task[R, Tp] extends super.Task[R, Tp] {
- var next: Task[R, Tp] = null
+ trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
+ var next: TaskImpl[R, Tp] = null
var shouldWaitFor = true
- var result: R
-
- def split: Seq[Task[R, Tp]]
- /** The actual leaf computation. */
- def leaf(result: Option[R]): Unit
+ def split: Seq[TaskImpl[R, Tp]]
- def compute = if (shouldSplitFurther) internal else tryLeaf(None)
+ def compute = if (body.shouldSplitFurther) internal else body.tryLeaf(None)
def internal = {
var last = spawnSubtasks
- last.tryLeaf(None)
- result = last.result
+ last.body.tryLeaf(None)
+ body.result = last.body.result
while (last.next != null) {
- val lastresult = Option(last.result)
+ // val lastresult = Option(last.body.result)
+ val beforelast = last
last = last.next
- if (last.tryCancel) last.tryLeaf(lastresult) else last.sync
- tryMerge(last.repr)
+ if (last.tryCancel) {
+ // debuglog("Done with " + beforelast.body + ", next direct is " + last.body)
+ last.body.tryLeaf(Some(body.result))
+ last.release
+ } else {
+ // debuglog("Done with " + beforelast.body + ", next sync is " + last.body)
+ last.sync
+ }
+ // debuglog("Merging " + body + " with " + last.body)
+ body.tryMerge(last.body.repr)
}
}
def spawnSubtasks = {
- var last: Task[R, Tp] = null
- var head: Task[R, Tp] = this
+ var last: TaskImpl[R, Tp] = null
+ var head: TaskImpl[R, Tp] = this
do {
val subtasks = head.split
head = subtasks.head
@@ -149,7 +169,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
last = t
t.start
}
- } while (head.shouldSplitFurther);
+ } while (head.body.shouldSplitFurther);
head.next = last
head
}
@@ -165,6 +185,9 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
}
+ // specialize ctor
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
}
@@ -176,6 +199,123 @@ trait HavingForkJoinPool {
}
+trait ThreadPoolTasks extends Tasks {
+ import java.util.concurrent._
+
+ trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ // initially, this is null
+ // once the task is started, this future is set and used for `sync`
+ // utb: var future: Future[_] = null
+ @volatile var owned = false
+ @volatile var completed = false
+
+ def start = synchronized {
+ // debuglog("Starting " + body)
+ // utb: future = executor.submit(this)
+ executor.synchronized {
+ incrTasks
+ executor.submit(this)
+ }
+ }
+ def sync = synchronized {
+ // debuglog("Syncing on " + body)
+ // utb: future.get()
+ executor.synchronized {
+ val coresize = executor.getCorePoolSize
+ if (coresize < totaltasks) executor.setCorePoolSize(coresize + 1)
+ }
+ if (!completed) this.wait
+ }
+ def tryCancel = synchronized {
+ // utb: future.cancel(false)
+ if (!owned) {
+ // debuglog("Cancelling " + body)
+ owned = true
+ true
+ } else false
+ }
+ def run = {
+ // utb: compute
+ var isOkToRun = false
+ synchronized {
+ if (!owned) {
+ owned = true
+ isOkToRun = true
+ }
+ }
+ if (isOkToRun) {
+ // debuglog("Running body of " + body)
+ compute
+ release
+ } else {
+ // just skip
+ // debuglog("skipping body of " + body)
+ }
+ }
+ override def release = synchronized {
+ completed = true
+ decrTasks
+ this.notifyAll
+ }
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
+ var environment: AnyRef = ThreadPoolTasks.defaultThreadPool
+ def executor = environment.asInstanceOf[ThreadPoolExecutor]
+ def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]
+ var totaltasks = 0
+
+ private def incrTasks = synchronized {
+ totaltasks += 1
+ }
+
+ private def decrTasks = synchronized {
+ totaltasks -= 1
+ }
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing without wait: " + task)
+ t.start
+
+ () => {
+ t.sync
+ t.body.result
+ }
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing with wait: " + task)
+ t.start
+
+ t.sync
+ t.body.result
+ }
+
+ def parallelismLevel = ThreadPoolTasks.numCores
+
+}
+
+object ThreadPoolTasks {
+ import java.util.concurrent._
+
+ val numCores = Runtime.getRuntime.availableProcessors
+
+ val defaultThreadPool = new ThreadPoolExecutor(
+ numCores,
+ Int.MaxValue,
+ 60L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue[Runnable],
+ new ThreadPoolExecutor.CallerRunsPolicy
+ )
+}
+
+
+
/** An implementation trait for parallel tasks based on the fork/join framework.
*
* @define fjdispatch
@@ -184,36 +324,37 @@ trait HavingForkJoinPool {
*/
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
- trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] {
+ trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] {
def start = fork
def sync = join
def tryCancel = tryUnfork
- var result: R
}
- type TaskType[R, +Tp] = Task[R, Tp]
- type ExecutionEnvironment = ForkJoinPool
+ // specialize ctor
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
/** The fork/join pool of this collection.
*/
- def forkJoinPool: ForkJoinPool = environment
- var environment = ForkJoinTasks.defaultForkJoinPool
+ def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]
+ var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool
/** Executes a task and does not wait for it to finish - instead returns a future.
*
* $fjdispatch
*/
- def execute[R, Tp](fjtask: Task[R, Tp]): () => R = {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val fjtask = newTaskImpl(task)
+
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
} else {
forkJoinPool.execute(fjtask)
}
() => {
- fjtask.join
- fjtask.forwardThrowable
- fjtask.result
+ fjtask.sync
+ fjtask.body.forwardThrowable
+ fjtask.body.result
}
}
@@ -224,15 +365,18 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
*
* @return the result of the task
*/
- def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val fjtask = newTaskImpl(task)
+
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
} else {
forkJoinPool.execute(fjtask)
}
- fjtask.join
- fjtask.forwardThrowable
- fjtask.result
+
+ fjtask.sync
+ fjtask.body.forwardThrowable
+ fjtask.body.result
}
def parallelismLevel = forkJoinPool.getParallelism
@@ -251,11 +395,26 @@ object ForkJoinTasks {
*/
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
- trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp]
+ class TaskImpl[R, Tp](val body: Task[R, Tp])
+ extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
+ def split = body.split.map(b => newTaskImpl(b))
+ }
+
+ def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
}
+trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {
+
+ class TaskImpl[R, Tp](val body: Task[R, Tp])
+ extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
+ def split = body.split.map(b => newTaskImpl(b))
+ }
+
+ def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
+
+}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 5c3720a3bf..58dce1aef4 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -90,6 +90,7 @@ self =>
i < sz
}
def remaining = sz - i
+ override def toString = "HashTrieIterator(" + sz + ")"
}
private[parallel] def printDebugInfo {
@@ -168,7 +169,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
override def toString = {
- "HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
+ "HashTrieCombiner(sz: " + size + ")"
+ //"HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
}
/* tasks */
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index a1164b7b80..909b8eb5d7 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -102,8 +102,11 @@ self =>
val left = remaining
if (left >= 2) {
val splitpoint = left / 2
- Seq(new ParArrayIterator(i, i + splitpoint, arr) with SCPI,
- new ParArrayIterator(i + splitpoint, until, arr) with SCPI)
+ val sq = Seq(
+ new ParArrayIterator(i, i + splitpoint, arr) with SCPI,
+ new ParArrayIterator(i + splitpoint, until, arr) with SCPI)
+ i = until
+ sq
} else {
Seq(this)
}
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index d95e478fec..760f8b09ce 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -38,7 +38,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
pa
}
- override def toString = "ParArrayCombiner(" + size + "): " + chain
+ override def toString = "ParArrayCombiner(" + size + "): " //+ chain
/* tasks */
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 8cd54e6083..19ae9aef5d 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -36,7 +36,9 @@ package object parallel {
private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
- private[parallel] def getTaskSupport: TaskSupport = new TaskSupport {}
+ private[parallel] def getTaskSupport: TaskSupport =
+ if (util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport
+ else new ThreadPoolTaskSupport
/* implicit conversions */
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala
index d0b7bae834..6ac8e7a3ad 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala
@@ -137,7 +137,7 @@ object RefParHashTableSetBenches extends ParHashTableSetBenches[Dummy] {
val phm = new ParHashSet[Dummy]
for (i <- 0 until sz) phm += new Dummy(i)
forkJoinPool.setParallelism(p)
- phm.environment = forkJoinPool
+ phm.tasksupport.environment = forkJoinPool
phm
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala
index 291f7ec62d..83e3177324 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala
@@ -206,7 +206,7 @@ object RefParHashTableBenches extends ParHashTableBenches[Dummy, Dummy] {
val phm = new ParHashMap[Dummy, Dummy]
for (i <- 0 until sz) phm += ((new Dummy(i), new Dummy(i)))
forkJoinPool.setParallelism(p)
- phm.environment = forkJoinPool
+ phm.tasksupport.environment = forkJoinPool
phm
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
index 3e37086361..87a34e1e0e 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
@@ -164,7 +164,7 @@ object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] {
var pht = new ParHashMap[Dummy, Dummy]
for (i <- 0 until sz) pht += ((new Dummy(i), new Dummy(i)))
forkJoinPool.setParallelism(p)
- pht.environment = forkJoinPool
+ pht.tasksupport.environment = forkJoinPool
pht
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala
index 17ad2f9882..b5dcfca872 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala
@@ -7,7 +7,7 @@ object ForeachHeavy extends Companion {
def benchName = "foreach-heavy";
def apply(sz: Int, parallelism: Int, what: String) = new ForeachHeavy(sz, parallelism, what)
override def comparisons = List("jsr")
- override def defaultSize = 16
+ override def defaultSize = 2048
val fun = (a: Cont) => heavyOperation(a)
val funjsr = new extra166y.Ops.Procedure[Cont] {
@@ -21,7 +21,7 @@ object ForeachHeavy extends Companion {
def checkPrime(n: Int) = {
var isPrime = true
var i = 2
- val until = scala.math.sqrt(n).toInt + 1
+ val until = 550
while (i < until) {
if (n % i == 0) isPrime = false
i += 1
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
index be49995589..e4eb51d83b 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
@@ -39,7 +39,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) {
def assignProduct(a: Matrix[T], b: Matrix[T]) = {
val range = new ParRange(0, n * n, 1, false)
- range.environment = forkjoinpool
+ range.tasksupport.environment = forkjoinpool
for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n);
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala
index 68ceac2b53..c75432360b 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala
@@ -77,7 +77,7 @@ extends Bench with SequentialOps[T] {
for (i <- 0 until size) arr(i) = elemcreator(i)
case "par" =>
pa = new ParArray[T](size)
- pa.environment = forkjoinpool
+ pa.tasksupport.environment = forkjoinpool
for (i <- 0 until size) pa(i) = elemcreator(i)
case "jsr" =>
jsrarr = JSR166Array.create(size, cls, papool)
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
index 14a6259a38..6cd1d74c5e 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
@@ -22,7 +22,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] {
def createParallel(sz: Int, p: Int) = {
val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false)
forkJoinPool.setParallelism(p)
- pr.environment = forkJoinPool
+ pr.tasksupport.environment = forkJoinPool
pr
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala
index eed62fc5c1..abd9b7838f 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala
@@ -31,7 +31,7 @@ extends ParSeqViewBenches[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], S
forkJoinPool.setParallelism(p)
for (i <- 0 until sz) pa(i) = new Dummy(i)
val v = pa.view
- v.environment = forkJoinPool
+ v.tasksupport.environment = forkJoinPool
v
}
def createSeqView(sz: Int, p: Int) = createSequential(sz, p).view
diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala
index 8d214b614f..690ee34cca 100644
--- a/test/files/scalacheck/parallel-collections/IntOperators.scala
+++ b/test/files/scalacheck/parallel-collections/IntOperators.scala
@@ -5,8 +5,14 @@ import scala.collection.parallel._
trait IntOperators extends Operators[Int] {
- def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _))
- def countPredicates = List(_ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99)
+ def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _), _ ^ _)
+ def countPredicates = List(
+ x => true,
+ _ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99,
+ x => x > 50 && x < 150,
+ x => x > 350 && x < 550,
+ x => (x > 1000 && x < 1500) || (x > 400 && x < 500)
+ )
def forallPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ != 55, _ != 505, _ != 5005)
def existsPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ == 55, _ == 505, _ == 5005)
def findPredicates = List(_ >= 0, _ % 2 == 0, _ < 0, _ == 50, _ == 500, _ == 5000)
@@ -18,9 +24,14 @@ trait IntOperators extends Operators[Int] {
(n: Int) => if (n == 0) List(1, 2, 3, 4, 5) else if (n < 0) List(1, 2, 3) else List()
)
def filterPredicates = List(
- _ % 2 == 0, _ % 3 == 0, _ % 4 != 0, _ % 17 != 0, n => n > 50 && n < 100, _ >= 0, _ < 0, _ == 99,
- _ > 500, _ > 5000, _ > 50000, _ < 500, _ < 50, _ < -50, _ < -5e5,
- x => true, x => false, x => x % 53 == 0 && x % 17 == 0
+ _ % 2 == 0, _ % 3 == 0,
+ _ % 4 != 0, _ % 17 != 0,
+ n => n > 50 && n < 100,
+ _ >= 0, _ < 0, _ == 99,
+ _ > 500, _ > 5000, _ > 50000,
+ _ < 500, _ < 50, _ < -50, _ < -5e5,
+ x => true, x => false,
+ x => x % 53 == 0 && x % 17 == 0
)
def filterNotPredicates = filterPredicates
def partitionPredicates = filterPredicates
@@ -30,7 +41,8 @@ trait IntOperators extends Operators[Int] {
_ < -100, _ < -1000, _ > -200, _ > -50,
n => -90 < n && n < -10,
n => 50 < n && n < 550,
- n => 5000 < n && n < 7500
+ n => 5000 < n && n < 7500,
+ n => -50 < n && n < 450
)
def dropWhilePredicates = takeWhilePredicates
def spanPredicates = takeWhilePredicates
diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala
index 019e8c4fde..255c04498e 100644
--- a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala
@@ -15,8 +15,8 @@ import scala.collection.parallel.ops._
abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParArray[" + tp + "]") {
- ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
- ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
type CollType = ParArray[T]
diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala
index a98331dc86..9805e2644f 100644
--- a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala
@@ -22,8 +22,8 @@
// abstract class ParallelArrayViewCheck[T](tp: String)
// extends ParallelSeqCheck[T]("ParallelSeqView[" + tp + ", ParallelArray[" + tp + "]]") {
-// ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
-// ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+// // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+// // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
// type CollType = ParallelSeqView[T, ParallelArray[T], ArraySeq[T]]
diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
index d53c0ba9d6..061bb08d9b 100644
--- a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
@@ -15,8 +15,8 @@ import scala.collection.parallel.ops._
abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("mutable.ParHashMap[" + tp + "]") {
- ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
- ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
type CollType = ParHashMap[K, V]
@@ -64,7 +64,7 @@ with PairValues[Int, Int]
}
override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match {
- case pm: ParHashMap[k, v] =>
+ case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster
val invs = pm.brokenInvariants
val containsall = (for ((k, v) <- orig) yield {
diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala
index 973a5cdf4b..be70a7c7a3 100644
--- a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala
@@ -15,8 +15,8 @@ import scala.collection.parallel.ops._
abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("mutable.ParHashSet[" + tp + "]") {
- ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
- ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
type CollType = ParHashSet[T]
diff --git a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala
index 10329c19f2..bbec52dc92 100644
--- a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala
@@ -15,8 +15,8 @@ import scala.collection.parallel.ops._
abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("immutable.ParHashMap[" + tp + "]") {
- ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
- ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
type CollType = ParHashMap[K, V]
@@ -67,8 +67,8 @@ with PairValues[Int, Int]
abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("immutable.ParHashSet[" + tp + "]") {
- ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
- ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
type CollType = ParHashSet[T]
diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
index e8838de3f5..8b5d72ea01 100644
--- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
@@ -82,6 +82,33 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
case _ => t1 == t2 && t2 == t1
}
+ def printDebugInfo(coll: ParIterableLike[_, _, _]) {
+ println("Collection debug info: ")
+ coll.printDebugBuffer
+ println("Task debug info: ")
+ println(coll.tasksupport.debugMessages.mkString("\n"))
+ }
+
+ def printComparison(t: Traversable[_], coll: ParIterable[_], tf: Traversable[_], cf: ParIterable[_], ind: Int) {
+ printDebugInfo(coll)
+ println("Operator: " + ind)
+ println("sz: " + t.size)
+ println(t)
+ println
+ println("sz: " + coll.size)
+ println(coll)
+ println("transformed to:")
+ println
+ println("size: " + tf.size)
+ println(tf)
+ println
+ println("size: " + cf.size)
+ println(cf)
+ println
+ println("tf == cf - " + (tf == cf))
+ println("cf == tf - " + (cf == tf))
+ }
+
property("reductions must be equal for assoc. operators") = forAll(collectionPairs) { case (t, coll) =>
if (t.size != 0) {
val results = for ((op, ind) <- reduceOperators.zipWithIndex) yield {
@@ -105,10 +132,11 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
val tc = t.count(pred)
val cc = coll.count(pred)
if (tc != cc) {
- println("from: " + t)
- println("and: " + coll.toList)
+ println("from: " + t + " - size: " + t.size)
+ println("and: " + coll + " - size: " + coll.toList.size)
println(tc)
println(cc)
+ printDebugInfo(coll)
}
("op index: " + ind) |: tc == cc
}
@@ -184,11 +212,20 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
val cf = coll.filter(p)
val invs = checkDataStructureInvariants(tf, cf)
if (tf != cf || cf != tf || !invs) {
+ printDebugInfo(coll)
+ println("Operator: " + ind)
+ println("sz: " + t.size)
println(t)
+ println
+ println("sz: " + coll.size)
println(coll)
+ println
println("filtered to:")
+ println
println(cf)
+ println
println(tf)
+ println
println("tf == cf - " + (tf == cf))
println("cf == tf - " + (cf == tf))
printDataStructureDebugInfo(cf)
@@ -199,8 +236,12 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
}
property("filterNots must be equal") = forAll(collectionPairs) { case (t, coll) =>
- (for ((p, ind) <- filterNotPredicates.zipWithIndex)
- yield ("op index: " + ind) |: t.filterNot(p) == coll.filterNot(p)).reduceLeft(_ && _)
+ (for ((p, ind) <- filterNotPredicates.zipWithIndex) yield {
+ val tf = t.filterNot(p)
+ val cf = coll.filterNot(p)
+ if (tf != cf || cf != tf) printComparison(t, coll, tf, cf, ind)
+ ("op index: " + ind) |: tf == cf && cf == tf
+ }).reduceLeft(_ && _)
}
if (!isCheckingViews) property("partitions must be equal") = forAll(collectionPairs) { case (t, coll) =>
diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
index 850a5d5473..c34fb872aa 100644
--- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
@@ -18,8 +18,8 @@ import scala.collection.parallel.ops._
object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") with ops.IntSeqOperators {
- ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
- ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
type CollType = collection.parallel.ParSeq[Int]
diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala
index efc393889e..590da6dba4 100644
--- a/test/files/scalacheck/parallel-collections/pc.scala
+++ b/test/files/scalacheck/parallel-collections/pc.scala
@@ -11,22 +11,22 @@ class ParCollProperties extends Properties("Parallel collections") {
/* Collections */
// parallel arrays
- //include(mutable.IntParallelArrayCheck)
+ include(mutable.IntParallelArrayCheck)
// parallel ranges
- //include(immutable.ParallelRangeCheck)
+ include(immutable.ParallelRangeCheck)
// parallel immutable hash maps (tries)
- //include(immutable.IntIntParallelHashMapCheck)
+ include(immutable.IntIntParallelHashMapCheck)
// parallel immutable hash sets (tries)
- //include(immutable.IntParallelHashSetCheck)
+ include(immutable.IntParallelHashSetCheck)
// parallel mutable hash maps (tables)
- //include(mutable.IntIntParallelHashMapCheck)
+ include(mutable.IntIntParallelHashMapCheck)
// parallel mutable hash sets (tables)
- //include(mutable.IntParallelHashSetCheck)
+ include(mutable.IntParallelHashSetCheck)
// parallel vectors
@@ -52,7 +52,7 @@ object Test {
workers = 1,
minSize = 0,
maxSize = 4000,
- minSuccessfulTests = 150
+ minSuccessfulTests = 120
),
pc
)