From a730fb5cc6cea39a29e9ff4cd666fa8498f6adec Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Thu, 9 Dec 2010 10:08:11 +0000 Subject: Fixing jvm 1.5 support for parallel collections. Special cased with thread pool executor scheduling. Fixed an ugly concurrency bug where futures returned by a thread pool executor didn't remove the task from the queue when cancel was called. Note to self and others: don't cancel futures returned by thread pool executors, it might lead to unexpected behaviour. Modified the executor to add new threads if all the active threads are syncing, in order to avoid deadlocks. Fixed a hidden bug in AdaptiveWorkStealingTasks, where correct behaviour depended on the execution order of the tasks. This didn't fail before with ForkJoinTasks, since there the execution order is well-defined. Scalachecked 1.5 & 1.6 support. No review. --- .../parallel/benchmarks/hashtables/ParallelHashTableSets.scala | 2 +- .../parallel/benchmarks/hashtables/ParallelHashTables.scala | 2 +- .../collection/parallel/benchmarks/hashtries/ParallelHashTries.scala | 2 +- .../collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala | 4 ++-- .../parallel/benchmarks/parallel_array/MatrixMultiplication.scala | 2 +- .../collection/parallel/benchmarks/parallel_array/Resettable.scala | 2 +- .../collection/parallel/benchmarks/parallel_range/RangeBenches.scala | 2 +- .../collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) (limited to 'test/benchmarks') diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala index d0b7bae834..6ac8e7a3ad 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala @@ -137,7 +137,7 @@ object RefParHashTableSetBenches extends ParHashTableSetBenches[Dummy] { val phm = new ParHashSet[Dummy] for (i <- 0 until sz) phm += new Dummy(i) forkJoinPool.setParallelism(p) - phm.environment = forkJoinPool + phm.tasksupport.environment = forkJoinPool phm } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala index 291f7ec62d..83e3177324 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala @@ -206,7 +206,7 @@ object RefParHashTableBenches extends ParHashTableBenches[Dummy, Dummy] { val phm = new ParHashMap[Dummy, Dummy] for (i <- 0 until sz) phm += ((new Dummy(i), new Dummy(i))) forkJoinPool.setParallelism(p) - phm.environment = forkJoinPool + phm.tasksupport.environment = forkJoinPool phm } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala index 3e37086361..87a34e1e0e 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala @@ -164,7 +164,7 @@ object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] { var pht = new ParHashMap[Dummy, Dummy] for (i <- 0 until sz) pht += ((new Dummy(i), new Dummy(i))) forkJoinPool.setParallelism(p) - pht.environment = forkJoinPool + pht.tasksupport.environment = forkJoinPool pht } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala index 17ad2f9882..b5dcfca872 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala @@ -7,7 +7,7 @@ object ForeachHeavy extends Companion { def benchName = "foreach-heavy"; def apply(sz: Int, parallelism: Int, what: String) = new ForeachHeavy(sz, parallelism, what) override def comparisons = List("jsr") - override def defaultSize = 16 + override def defaultSize = 2048 val fun = (a: Cont) => heavyOperation(a) val funjsr = new extra166y.Ops.Procedure[Cont] { @@ -21,7 +21,7 @@ object ForeachHeavy extends Companion { def checkPrime(n: Int) = { var isPrime = true var i = 2 - val until = scala.math.sqrt(n).toInt + 1 + val until = 550 while (i < until) { if (n % i == 0) isPrime = false i += 1 diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala index be49995589..e4eb51d83b 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala @@ -39,7 +39,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { def assignProduct(a: Matrix[T], b: Matrix[T]) = { val range = new ParRange(0, n * n, 1, false) - range.environment = forkjoinpool + range.tasksupport.environment = forkjoinpool for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n); } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala index 68ceac2b53..c75432360b 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala @@ -77,7 +77,7 @@ extends Bench with SequentialOps[T] { for (i <- 0 until size) arr(i) = elemcreator(i) case "par" => pa = new ParArray[T](size) - pa.environment = forkjoinpool + pa.tasksupport.environment = forkjoinpool for (i <- 0 until size) pa(i) = elemcreator(i) case "jsr" => jsrarr = JSR166Array.create(size, cls, papool) diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala index 14a6259a38..6cd1d74c5e 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala @@ -22,7 +22,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { def createParallel(sz: Int, p: Int) = { val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false) forkJoinPool.setParallelism(p) - pr.environment = forkJoinPool + pr.tasksupport.environment = forkJoinPool pr } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala index eed62fc5c1..abd9b7838f 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala @@ -31,7 +31,7 @@ extends ParSeqViewBenches[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], S forkJoinPool.setParallelism(p) for (i <- 0 until sz) pa(i) = new Dummy(i) val v = pa.view - v.environment = forkJoinPool + v.tasksupport.environment = forkJoinPool v } def createSeqView(sz: Int, p: Int) = createSequential(sz, p).view -- cgit v1.2.3