diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-12-09 10:08:11 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-12-09 10:08:11 +0000 |
commit | a730fb5cc6cea39a29e9ff4cd666fa8498f6adec (patch) | |
tree | a271e1b7cdd2bd044344fa7aef19d6820637b3a5 /test | |
parent | 4dbe72f83f7ade1517bad7444009d3d0c8a69bd5 (diff) | |
download | scala-a730fb5cc6cea39a29e9ff4cd666fa8498f6adec.tar.gz scala-a730fb5cc6cea39a29e9ff4cd666fa8498f6adec.tar.bz2 scala-a730fb5cc6cea39a29e9ff4cd666fa8498f6adec.zip |
Fixing jvm 1.5 support for parallel collections.
Special cased with thread pool executor scheduling. Fixed an ugly
concurrency bug where futures returned by a thread pool executor didn't
remove the task from the queue when cancel was called. Note to self and
others: don't cancel futures returned by thread pool executors, it might
lead to unexpected behaviour. Modified the executor to add new threads
if all the active threads are syncing, in order to avoid deadlocks.
Fixed a hidden bug in AdaptiveWorkStealingTasks, where correct behaviour
depended on the execution order of the tasks. This didn't fail before
with ForkJoinTasks, since there the execution order is well-defined.
Scalachecked 1.5 & 1.6 support.
No review.
Diffstat (limited to 'test')
17 files changed, 94 insertions, 41 deletions
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala index d0b7bae834..6ac8e7a3ad 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala @@ -137,7 +137,7 @@ object RefParHashTableSetBenches extends ParHashTableSetBenches[Dummy] { val phm = new ParHashSet[Dummy] for (i <- 0 until sz) phm += new Dummy(i) forkJoinPool.setParallelism(p) - phm.environment = forkJoinPool + phm.tasksupport.environment = forkJoinPool phm } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala index 291f7ec62d..83e3177324 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala @@ -206,7 +206,7 @@ object RefParHashTableBenches extends ParHashTableBenches[Dummy, Dummy] { val phm = new ParHashMap[Dummy, Dummy] for (i <- 0 until sz) phm += ((new Dummy(i), new Dummy(i))) forkJoinPool.setParallelism(p) - phm.environment = forkJoinPool + phm.tasksupport.environment = forkJoinPool phm } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala index 3e37086361..87a34e1e0e 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala @@ -164,7 +164,7 @@ object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] { var pht = new ParHashMap[Dummy, Dummy] for (i <- 0 until sz) pht += ((new Dummy(i), new Dummy(i))) forkJoinPool.setParallelism(p) - pht.environment = forkJoinPool + pht.tasksupport.environment = forkJoinPool pht } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala index 17ad2f9882..b5dcfca872 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala @@ -7,7 +7,7 @@ object ForeachHeavy extends Companion { def benchName = "foreach-heavy"; def apply(sz: Int, parallelism: Int, what: String) = new ForeachHeavy(sz, parallelism, what) override def comparisons = List("jsr") - override def defaultSize = 16 + override def defaultSize = 2048 val fun = (a: Cont) => heavyOperation(a) val funjsr = new extra166y.Ops.Procedure[Cont] { @@ -21,7 +21,7 @@ object ForeachHeavy extends Companion { def checkPrime(n: Int) = { var isPrime = true var i = 2 - val until = scala.math.sqrt(n).toInt + 1 + val until = 550 while (i < until) { if (n % i == 0) isPrime = false i += 1 diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala index be49995589..e4eb51d83b 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala @@ -39,7 +39,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { def assignProduct(a: Matrix[T], b: Matrix[T]) = { val range = new ParRange(0, n * n, 1, false) - range.environment = forkjoinpool + range.tasksupport.environment = forkjoinpool for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n); } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala index 68ceac2b53..c75432360b 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala @@ -77,7 +77,7 @@ extends Bench with SequentialOps[T] { for (i <- 0 until size) arr(i) = elemcreator(i) case "par" => pa = new ParArray[T](size) - pa.environment = forkjoinpool + pa.tasksupport.environment = forkjoinpool for (i <- 0 until size) pa(i) = elemcreator(i) case "jsr" => jsrarr = JSR166Array.create(size, cls, papool) diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala index 14a6259a38..6cd1d74c5e 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala @@ -22,7 +22,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { def createParallel(sz: Int, p: Int) = { val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false) forkJoinPool.setParallelism(p) - pr.environment = forkJoinPool + pr.tasksupport.environment = forkJoinPool pr } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala index eed62fc5c1..abd9b7838f 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala @@ -31,7 +31,7 @@ extends ParSeqViewBenches[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], S forkJoinPool.setParallelism(p) for (i <- 0 until sz) pa(i) = new Dummy(i) val v = pa.view - v.environment = forkJoinPool + v.tasksupport.environment = forkJoinPool v } def createSeqView(sz: Int, p: Int) = createSequential(sz, p).view diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala index 8d214b614f..690ee34cca 100644 --- a/test/files/scalacheck/parallel-collections/IntOperators.scala +++ b/test/files/scalacheck/parallel-collections/IntOperators.scala @@ -5,8 +5,14 @@ import scala.collection.parallel._ trait IntOperators extends Operators[Int] { - def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _)) - def countPredicates = List(_ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99) + def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _), _ ^ _) + def countPredicates = List( + x => true, + _ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99, + x => x > 50 && x < 150, + x => x > 350 && x < 550, + x => (x > 1000 && x < 1500) || (x > 400 && x < 500) + ) def forallPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ != 55, _ != 505, _ != 5005) def existsPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ == 55, _ == 505, _ == 5005) def findPredicates = List(_ >= 0, _ % 2 == 0, _ < 0, _ == 50, _ == 500, _ == 5000) @@ -18,9 +24,14 @@ trait IntOperators extends Operators[Int] { (n: Int) => if (n == 0) List(1, 2, 3, 4, 5) else if (n < 0) List(1, 2, 3) else List() ) def filterPredicates = List( - _ % 2 == 0, _ % 3 == 0, _ % 4 != 0, _ % 17 != 0, n => n > 50 && n < 100, _ >= 0, _ < 0, _ == 99, - _ > 500, _ > 5000, _ > 50000, _ < 500, _ < 50, _ < -50, _ < -5e5, - x => true, x => false, x => x % 53 == 0 && x % 17 == 0 + _ % 2 == 0, _ % 3 == 0, + _ % 4 != 0, _ % 17 != 0, + n => n > 50 && n < 100, + _ >= 0, _ < 0, _ == 99, + _ > 500, _ > 5000, _ > 50000, + _ < 500, _ < 50, _ < -50, _ < -5e5, + x => true, x => false, + x => x % 53 == 0 && x % 17 == 0 ) def filterNotPredicates = filterPredicates def partitionPredicates = filterPredicates @@ -30,7 +41,8 @@ trait IntOperators extends Operators[Int] { _ < -100, _ < -1000, _ > -200, _ > -50, n => -90 < n && n < -10, n => 50 < n && n < 550, - n => 5000 < n && n < 7500 + n => 5000 < n && n < 7500, + n => -50 < n && n < 450 ) def dropWhilePredicates = takeWhilePredicates def spanPredicates = takeWhilePredicates diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala index 019e8c4fde..255c04498e 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParArray[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParArray[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala index a98331dc86..9805e2644f 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala @@ -22,8 +22,8 @@ // abstract class ParallelArrayViewCheck[T](tp: String) // extends ParallelSeqCheck[T]("ParallelSeqView[" + tp + ", ParallelArray[" + tp + "]]") { -// ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) -// ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) +// // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) +// // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) // type CollType = ParallelSeqView[T, ParallelArray[T], ArraySeq[T]] diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala index d53c0ba9d6..061bb08d9b 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("mutable.ParHashMap[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashMap[K, V] @@ -64,7 +64,7 @@ with PairValues[Int, Int] } override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match { - case pm: ParHashMap[k, v] => + case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster val invs = pm.brokenInvariants val containsall = (for ((k, v) <- orig) yield { diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala index 973a5cdf4b..be70a7c7a3 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("mutable.ParHashSet[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashSet[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala index 10329c19f2..bbec52dc92 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("immutable.ParHashMap[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashMap[K, V] @@ -67,8 +67,8 @@ with PairValues[Int, Int] abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("immutable.ParHashSet[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashSet[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index e8838de3f5..8b5d72ea01 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -82,6 +82,33 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col case _ => t1 == t2 && t2 == t1 } + def printDebugInfo(coll: ParIterableLike[_, _, _]) { + println("Collection debug info: ") + coll.printDebugBuffer + println("Task debug info: ") + println(coll.tasksupport.debugMessages.mkString("\n")) + } + + def printComparison(t: Traversable[_], coll: ParIterable[_], tf: Traversable[_], cf: ParIterable[_], ind: Int) { + printDebugInfo(coll) + println("Operator: " + ind) + println("sz: " + t.size) + println(t) + println + println("sz: " + coll.size) + println(coll) + println("transformed to:") + println + println("size: " + tf.size) + println(tf) + println + println("size: " + cf.size) + println(cf) + println + println("tf == cf - " + (tf == cf)) + println("cf == tf - " + (cf == tf)) + } + property("reductions must be equal for assoc. operators") = forAll(collectionPairs) { case (t, coll) => if (t.size != 0) { val results = for ((op, ind) <- reduceOperators.zipWithIndex) yield { @@ -105,10 +132,11 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col val tc = t.count(pred) val cc = coll.count(pred) if (tc != cc) { - println("from: " + t) - println("and: " + coll.toList) + println("from: " + t + " - size: " + t.size) + println("and: " + coll + " - size: " + coll.toList.size) println(tc) println(cc) + printDebugInfo(coll) } ("op index: " + ind) |: tc == cc } @@ -184,11 +212,20 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col val cf = coll.filter(p) val invs = checkDataStructureInvariants(tf, cf) if (tf != cf || cf != tf || !invs) { + printDebugInfo(coll) + println("Operator: " + ind) + println("sz: " + t.size) println(t) + println + println("sz: " + coll.size) println(coll) + println println("filtered to:") + println println(cf) + println println(tf) + println println("tf == cf - " + (tf == cf)) println("cf == tf - " + (cf == tf)) printDataStructureDebugInfo(cf) @@ -199,8 +236,12 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col } property("filterNots must be equal") = forAll(collectionPairs) { case (t, coll) => - (for ((p, ind) <- filterNotPredicates.zipWithIndex) - yield ("op index: " + ind) |: t.filterNot(p) == coll.filterNot(p)).reduceLeft(_ && _) + (for ((p, ind) <- filterNotPredicates.zipWithIndex) yield { + val tf = t.filterNot(p) + val cf = coll.filterNot(p) + if (tf != cf || cf != tf) printComparison(t, coll, tf, cf, ind) + ("op index: " + ind) |: tf == cf && cf == tf + }).reduceLeft(_ && _) } if (!isCheckingViews) property("partitions must be equal") = forAll(collectionPairs) { case (t, coll) => diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala index 850a5d5473..c34fb872aa 100644 --- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala @@ -18,8 +18,8 @@ import scala.collection.parallel.ops._ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") with ops.IntSeqOperators { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = collection.parallel.ParSeq[Int] diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index efc393889e..590da6dba4 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -11,22 +11,22 @@ class ParCollProperties extends Properties("Parallel collections") { /* Collections */ // parallel arrays - //include(mutable.IntParallelArrayCheck) + include(mutable.IntParallelArrayCheck) // parallel ranges - //include(immutable.ParallelRangeCheck) + include(immutable.ParallelRangeCheck) // parallel immutable hash maps (tries) - //include(immutable.IntIntParallelHashMapCheck) + include(immutable.IntIntParallelHashMapCheck) // parallel immutable hash sets (tries) - //include(immutable.IntParallelHashSetCheck) + include(immutable.IntParallelHashSetCheck) // parallel mutable hash maps (tables) - //include(mutable.IntIntParallelHashMapCheck) + include(mutable.IntIntParallelHashMapCheck) // parallel mutable hash sets (tables) - //include(mutable.IntParallelHashSetCheck) + include(mutable.IntParallelHashSetCheck) // parallel vectors @@ -52,7 +52,7 @@ object Test { workers = 1, minSize = 0, maxSize = 4000, - minSuccessfulTests = 150 + minSuccessfulTests = 120 ), pc ) |