From a730fb5cc6cea39a29e9ff4cd666fa8498f6adec Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Thu, 9 Dec 2010 10:08:11 +0000 Subject: Fixing jvm 1.5 support for parallel collections. Special cased with thread pool executor scheduling. Fixed an ugly concurrency bug where futures returned by a thread pool executor didn't remove the task from the queue when cancel was called. Note to self and others: don't cancel futures returned by thread pool executors, it might lead to unexpected behaviour. Modified the executor to add new threads if all the active threads are syncing, in order to avoid deadlocks. Fixed a hidden bug in AdaptiveWorkStealingTasks, where correct behaviour depended on the execution order of the tasks. This didn't fail before with ForkJoinTasks, since there the execution order is well-defined. Scalachecked 1.5 & 1.6 support. No review. --- .../parallel-collections/IntOperators.scala | 24 ++++++++--- .../parallel-collections/ParallelArrayCheck.scala | 4 +- .../ParallelArrayViewCheck.scala | 4 +- .../ParallelHashMapCheck.scala | 6 +-- .../ParallelHashSetCheck.scala | 4 +- .../ParallelHashTrieCheck.scala | 8 ++-- .../ParallelIterableCheck.scala | 49 ++++++++++++++++++++-- .../parallel-collections/ParallelRangeCheck.scala | 4 +- .../files/scalacheck/parallel-collections/pc.scala | 14 +++---- 9 files changed, 85 insertions(+), 32 deletions(-) (limited to 'test/files/scalacheck/parallel-collections') diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala index 8d214b614f..690ee34cca 100644 --- a/test/files/scalacheck/parallel-collections/IntOperators.scala +++ b/test/files/scalacheck/parallel-collections/IntOperators.scala @@ -5,8 +5,14 @@ import scala.collection.parallel._ trait IntOperators extends Operators[Int] { - def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _)) - def countPredicates = List(_ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99) + def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _), _ ^ _) + def countPredicates = List( + x => true, + _ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99, + x => x > 50 && x < 150, + x => x > 350 && x < 550, + x => (x > 1000 && x < 1500) || (x > 400 && x < 500) + ) def forallPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ != 55, _ != 505, _ != 5005) def existsPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ == 55, _ == 505, _ == 5005) def findPredicates = List(_ >= 0, _ % 2 == 0, _ < 0, _ == 50, _ == 500, _ == 5000) @@ -18,9 +24,14 @@ trait IntOperators extends Operators[Int] { (n: Int) => if (n == 0) List(1, 2, 3, 4, 5) else if (n < 0) List(1, 2, 3) else List() ) def filterPredicates = List( - _ % 2 == 0, _ % 3 == 0, _ % 4 != 0, _ % 17 != 0, n => n > 50 && n < 100, _ >= 0, _ < 0, _ == 99, - _ > 500, _ > 5000, _ > 50000, _ < 500, _ < 50, _ < -50, _ < -5e5, - x => true, x => false, x => x % 53 == 0 && x % 17 == 0 + _ % 2 == 0, _ % 3 == 0, + _ % 4 != 0, _ % 17 != 0, + n => n > 50 && n < 100, + _ >= 0, _ < 0, _ == 99, + _ > 500, _ > 5000, _ > 50000, + _ < 500, _ < 50, _ < -50, _ < -5e5, + x => true, x => false, + x => x % 53 == 0 && x % 17 == 0 ) def filterNotPredicates = filterPredicates def partitionPredicates = filterPredicates @@ -30,7 +41,8 @@ trait IntOperators extends Operators[Int] { _ < -100, _ < -1000, _ > -200, _ > -50, n => -90 < n && n < -10, n => 50 < n && n < 550, - n => 5000 < n && n < 7500 + n => 5000 < n && n < 7500, + n => -50 < n && n < 450 ) def dropWhilePredicates = takeWhilePredicates def spanPredicates = takeWhilePredicates diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala index 019e8c4fde..255c04498e 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParArray[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParArray[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala index a98331dc86..9805e2644f 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala @@ -22,8 +22,8 @@ // abstract class ParallelArrayViewCheck[T](tp: String) // extends ParallelSeqCheck[T]("ParallelSeqView[" + tp + ", ParallelArray[" + tp + "]]") { -// ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) -// ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) +// // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) +// // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) // type CollType = ParallelSeqView[T, ParallelArray[T], ArraySeq[T]] diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala index d53c0ba9d6..061bb08d9b 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("mutable.ParHashMap[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashMap[K, V] @@ -64,7 +64,7 @@ with PairValues[Int, Int] } override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match { - case pm: ParHashMap[k, v] => + case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster val invs = pm.brokenInvariants val containsall = (for ((k, v) <- orig) yield { diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala index 973a5cdf4b..be70a7c7a3 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("mutable.ParHashSet[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashSet[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala index 10329c19f2..bbec52dc92 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("immutable.ParHashMap[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashMap[K, V] @@ -67,8 +67,8 @@ with PairValues[Int, Int] abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("immutable.ParHashSet[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashSet[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index e8838de3f5..8b5d72ea01 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -82,6 +82,33 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col case _ => t1 == t2 && t2 == t1 } + def printDebugInfo(coll: ParIterableLike[_, _, _]) { + println("Collection debug info: ") + coll.printDebugBuffer + println("Task debug info: ") + println(coll.tasksupport.debugMessages.mkString("\n")) + } + + def printComparison(t: Traversable[_], coll: ParIterable[_], tf: Traversable[_], cf: ParIterable[_], ind: Int) { + printDebugInfo(coll) + println("Operator: " + ind) + println("sz: " + t.size) + println(t) + println + println("sz: " + coll.size) + println(coll) + println("transformed to:") + println + println("size: " + tf.size) + println(tf) + println + println("size: " + cf.size) + println(cf) + println + println("tf == cf - " + (tf == cf)) + println("cf == tf - " + (cf == tf)) + } + property("reductions must be equal for assoc. operators") = forAll(collectionPairs) { case (t, coll) => if (t.size != 0) { val results = for ((op, ind) <- reduceOperators.zipWithIndex) yield { @@ -105,10 +132,11 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col val tc = t.count(pred) val cc = coll.count(pred) if (tc != cc) { - println("from: " + t) - println("and: " + coll.toList) + println("from: " + t + " - size: " + t.size) + println("and: " + coll + " - size: " + coll.toList.size) println(tc) println(cc) + printDebugInfo(coll) } ("op index: " + ind) |: tc == cc } @@ -184,11 +212,20 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col val cf = coll.filter(p) val invs = checkDataStructureInvariants(tf, cf) if (tf != cf || cf != tf || !invs) { + printDebugInfo(coll) + println("Operator: " + ind) + println("sz: " + t.size) println(t) + println + println("sz: " + coll.size) println(coll) + println println("filtered to:") + println println(cf) + println println(tf) + println println("tf == cf - " + (tf == cf)) println("cf == tf - " + (cf == tf)) printDataStructureDebugInfo(cf) @@ -199,8 +236,12 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col } property("filterNots must be equal") = forAll(collectionPairs) { case (t, coll) => - (for ((p, ind) <- filterNotPredicates.zipWithIndex) - yield ("op index: " + ind) |: t.filterNot(p) == coll.filterNot(p)).reduceLeft(_ && _) + (for ((p, ind) <- filterNotPredicates.zipWithIndex) yield { + val tf = t.filterNot(p) + val cf = coll.filterNot(p) + if (tf != cf || cf != tf) printComparison(t, coll, tf, cf, ind) + ("op index: " + ind) |: tf == cf && cf == tf + }).reduceLeft(_ && _) } if (!isCheckingViews) property("partitions must be equal") = forAll(collectionPairs) { case (t, coll) => diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala index 850a5d5473..c34fb872aa 100644 --- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala @@ -18,8 +18,8 @@ import scala.collection.parallel.ops._ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") with ops.IntSeqOperators { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = collection.parallel.ParSeq[Int] diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index efc393889e..590da6dba4 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -11,22 +11,22 @@ class ParCollProperties extends Properties("Parallel collections") { /* Collections */ // parallel arrays - //include(mutable.IntParallelArrayCheck) + include(mutable.IntParallelArrayCheck) // parallel ranges - //include(immutable.ParallelRangeCheck) + include(immutable.ParallelRangeCheck) // parallel immutable hash maps (tries) - //include(immutable.IntIntParallelHashMapCheck) + include(immutable.IntIntParallelHashMapCheck) // parallel immutable hash sets (tries) - //include(immutable.IntParallelHashSetCheck) + include(immutable.IntParallelHashSetCheck) // parallel mutable hash maps (tables) - //include(mutable.IntIntParallelHashMapCheck) + include(mutable.IntIntParallelHashMapCheck) // parallel mutable hash sets (tables) - //include(mutable.IntParallelHashSetCheck) + include(mutable.IntParallelHashSetCheck) // parallel vectors @@ -52,7 +52,7 @@ object Test { workers = 1, minSize = 0, maxSize = 4000, - minSuccessfulTests = 150 + minSuccessfulTests = 120 ), pc ) -- cgit v1.2.3