diff options
27 files changed, 371 insertions, 128 deletions
diff --git a/src/library/scala/collection/mutable/PriorityQueue.scala b/src/library/scala/collection/mutable/PriorityQueue.scala index 45f1d1f06f..453bed54e6 100644 --- a/src/library/scala/collection/mutable/PriorityQueue.scala +++ b/src/library/scala/collection/mutable/PriorityQueue.scala @@ -15,8 +15,8 @@ import generic._ import annotation.migration /** This class implements priority queues using a heap. - * To prioritize elements of type T there must be an implicit - * Ordering[T] available at creation. + * To prioritize elements of type A there must be an implicit + * Ordering[A] available at creation. * * @tparam A type of the elements in this priority queue. * @param ord implicit ordering used to compare the elements of type `A`. diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala index 13b6f22826..31cea84ab8 100644 --- a/src/library/scala/collection/package.scala +++ b/src/library/scala/collection/package.scala @@ -87,6 +87,7 @@ package object collection { } def arrayString[T](array: Array[T], from: Int, until: Int) = array.slice(from, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ") + } } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 53a219ca28..83e5c6cb59 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -476,8 +476,7 @@ self => val copythis = new Copy(() => pbf(repr), parallelIterator) val copythat = wrap { val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator) - othtask.compute - othtask.result + other.tasksupport.executeAndWaitResult(othtask) } val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result @@ -713,13 +712,12 @@ self => def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel) def split = pit.split.map(newSubtask(_)) // default split procedure private[parallel] override def signalAbort = pit.abort - override def toString = "Accessor(" + pit.toString + ")" + override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")" } protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] { def shouldSplitFurther = false def split = throw new UnsupportedOperationException("Does not split.") - override def toString = "NonDivisibleTask" } protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]] @@ -768,9 +766,7 @@ self => var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 def leaf(prevr: Option[R1]) = { - inner.compute - throwable = inner.throwable - if (throwable eq null) result = map(inner.result) + result = map(executeAndWaitResult(inner)) } private[parallel] override def signalAbort { inner.signalAbort @@ -787,10 +783,12 @@ self => } protected[this] class Count(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Int, Count] { + // val pittxt = pit.toString var result: Int = 0 def leaf(prevr: Option[Int]) = result = pit.count(pred) protected[this] def newSubtask(p: ParIterableIterator[T]) = new Count(pred, p) override def merge(that: Count) = result = result + that.result + // override def toString = "CountTask(" + pittxt + ")" } protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] { @@ -901,7 +899,9 @@ self => protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], Filter[U, This]] { var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf())) + def leaf(prev: Option[Combiner[U, This]]) = { + result = pit.filter2combiner(pred, reuse(prev, cbf())) + } protected[this] def newSubtask(p: ParIterableIterator[T]) = new Filter(pred, cbf, p) override def merge(that: Filter[U, This]) = result = result combine that.result } @@ -909,7 +909,9 @@ self => protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T]) extends Transformer[Combiner[U, This], FilterNot[U, This]] { var result: Combiner[U, This] = null - def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf())) + def leaf(prev: Option[Combiner[U, This]]) = { + result = pit.filterNot2combiner(pred, reuse(prev, cbf())) + } protected[this] def newSubtask(p: ParIterableIterator[T]) = new FilterNot(pred, cbf, p) override def merge(that: FilterNot[U, This]) = result = result combine that.result } @@ -1253,6 +1255,24 @@ self => private[parallel] def brokenInvariants = Seq[String]() + private val debugBuffer = collection.mutable.ArrayBuffer[String]() + + private[parallel] def debugclear() = synchronized { + debugBuffer.clear + } + + private[parallel] def debuglog(s: String) = synchronized { + debugBuffer += s + } + + import collection.DebugUtils._ + private[parallel] def printDebugBuffer = println(buildString { + append => + for (s <- debugBuffer) { + append(s) + } + }) + } diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala index 9e24b83d8a..0ea33d0e39 100644 --- a/src/library/scala/collection/parallel/ParSeqLike.scala +++ b/src/library/scala/collection/parallel/ParSeqLike.scala @@ -242,8 +242,7 @@ self => val copystart = new Copy[U, That](() => pbf(repr), pits(0)) val copymiddle = wrap { val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator) - tsk.compute - tsk.result + that.tasksupport.executeAndWaitResult(tsk) } val copyend = new Copy[U, That](() => pbf(repr), pits(2)) executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala index 8a072b22aa..1b1ee6e469 100644 --- a/src/library/scala/collection/parallel/TaskSupport.scala +++ b/src/library/scala/collection/parallel/TaskSupport.scala @@ -6,7 +6,11 @@ package scala.collection.parallel -trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks +trait TaskSupport extends Tasks + +class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks + +class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 6f0fe47a6a..ec38513d9b 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -7,7 +7,7 @@ import scala.concurrent.forkjoin._ import scala.util.control.Breaks._ - +import annotation.unchecked.uncheckedVariance @@ -20,49 +20,35 @@ import scala.util.control.Breaks._ */ trait Tasks { - /** A task abstraction which allows starting a task with `start`, - * waiting for it to finish with `sync` and attempting to cancel - * the task with `tryCancel`. - * It also defines a method `leaf` which must be called once the - * the task is started and defines what this task actually does. - * Method `split` allows splitting this task into smaller subtasks, - * and method `shouldSplitFurther` decides if the task should be - * partitioned further. - * Method `merge` allows merging the results of the 2 tasks. It updates - * the result of the receiver. - * Finally, it defines the task result of type `U`. - */ + private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]() + + private[parallel] def debuglog(s: String) = synchronized { + debugMessages += s + } + trait Task[R, +Tp] { type Result = R + def repr = this.asInstanceOf[Tp] - /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ - def compute - /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task - * or `None` if there was no previous task. + /** Body of the task - non-divisible unit of work done by this task. + * Optionally is provided with the result from the previous completed task + * or `None` if there was no previous task (or the previous task is uncompleted or unknown). */ def leaf(result: Option[R]) - /** Start task. */ - def start - /** Wait for task to finish. */ - def sync - /** Try to cancel the task. - * @return `true` if cancellation is successful. - */ - def tryCancel: Boolean /** A result that can be accessed once the task is completed. */ - def result: R + var result: R /** Decides whether or not this task should be split further. */ def shouldSplitFurther: Boolean /** Splits this task into a list of smaller tasks. */ - protected[this] def split: Seq[Task[R, Tp]] + private[parallel] def split: Seq[Task[R, Tp]] /** Read of results of `that` task and merge them into results of this one. */ - protected[this] def merge(that: Tp) {} + private[parallel] def merge(that: Tp @uncheckedVariance) {} // exception handling mechanism var throwable: Throwable = null def forwardThrowable = if (throwable != null) throw throwable // tries to do the leaf computation, storing the possible exception - protected def tryLeaf(result: Option[R]) { + private[parallel] def tryLeaf(result: Option[R]) { try { tryBreakable { leaf(result) @@ -75,7 +61,7 @@ trait Tasks { signalAbort } } - protected[this] def tryMerge(t: Tp) { + private[parallel] def tryMerge(t: Tp @uncheckedVariance) { val that = t.asInstanceOf[Task[R, Tp]] if (this.throwable == null && that.throwable == null) merge(t) mergeThrowables(that) @@ -90,16 +76,44 @@ trait Tasks { private[parallel] def signalAbort {} } - type TaskType[R, +Tp] <: Task[R, Tp] - type ExecutionEnvironment + trait TaskImpl[R, +Tp] { + /** the body of this task - what it executes, how it gets split and how results are merged. */ + val body: Task[R, Tp] + + def split: Seq[TaskImpl[R, Tp]] + /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */ + def compute + /** Start task. */ + def start + /** Wait for task to finish. */ + def sync + /** Try to cancel the task. + * @return `true` if cancellation is successful. + */ + def tryCancel: Boolean + /** If the task has been cancelled successfully, those syncing on it may + * automatically be notified, depending on the implementation. If they + * aren't, this release method should be called after processing the + * cancelled task. + * + * This method may be overridden. + */ + def release {} + } + + protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] - var environment: ExecutionEnvironment + /* task control */ + + // safe to assume it will always have the same type, + // because the `tasksupport` in parallel iterable is final + var environment: AnyRef /** Executes a task and returns a future. Forwards an exception if some task threw it. */ - def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R + def execute[R, Tp](fjtask: Task[R, Tp]): () => R /** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */ - def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R /** Retrieves the parallelism level of the task execution environment. */ def parallelismLevel: Int @@ -107,40 +121,46 @@ trait Tasks { } + /** This trait implements scheduling by employing * an adaptive work stealing technique. */ trait AdaptiveWorkStealingTasks extends Tasks { - trait Task[R, Tp] extends super.Task[R, Tp] { - var next: Task[R, Tp] = null + trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] { + var next: TaskImpl[R, Tp] = null var shouldWaitFor = true - var result: R - - def split: Seq[Task[R, Tp]] - /** The actual leaf computation. */ - def leaf(result: Option[R]): Unit + def split: Seq[TaskImpl[R, Tp]] - def compute = if (shouldSplitFurther) internal else tryLeaf(None) + def compute = if (body.shouldSplitFurther) internal else body.tryLeaf(None) def internal = { var last = spawnSubtasks - last.tryLeaf(None) - result = last.result + last.body.tryLeaf(None) + body.result = last.body.result while (last.next != null) { - val lastresult = Option(last.result) + // val lastresult = Option(last.body.result) + val beforelast = last last = last.next - if (last.tryCancel) last.tryLeaf(lastresult) else last.sync - tryMerge(last.repr) + if (last.tryCancel) { + // debuglog("Done with " + beforelast.body + ", next direct is " + last.body) + last.body.tryLeaf(Some(body.result)) + last.release + } else { + // debuglog("Done with " + beforelast.body + ", next sync is " + last.body) + last.sync + } + // debuglog("Merging " + body + " with " + last.body) + body.tryMerge(last.body.repr) } } def spawnSubtasks = { - var last: Task[R, Tp] = null - var head: Task[R, Tp] = this + var last: TaskImpl[R, Tp] = null + var head: TaskImpl[R, Tp] = this do { val subtasks = head.split head = subtasks.head @@ -149,7 +169,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { last = t t.start } - } while (head.shouldSplitFurther); + } while (head.body.shouldSplitFurther); head.next = last head } @@ -165,6 +185,9 @@ trait AdaptiveWorkStealingTasks extends Tasks { } } + // specialize ctor + protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + } @@ -176,6 +199,123 @@ trait HavingForkJoinPool { } +trait ThreadPoolTasks extends Tasks { + import java.util.concurrent._ + + trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + // initially, this is null + // once the task is started, this future is set and used for `sync` + // utb: var future: Future[_] = null + @volatile var owned = false + @volatile var completed = false + + def start = synchronized { + // debuglog("Starting " + body) + // utb: future = executor.submit(this) + executor.synchronized { + incrTasks + executor.submit(this) + } + } + def sync = synchronized { + // debuglog("Syncing on " + body) + // utb: future.get() + executor.synchronized { + val coresize = executor.getCorePoolSize + if (coresize < totaltasks) executor.setCorePoolSize(coresize + 1) + } + if (!completed) this.wait + } + def tryCancel = synchronized { + // utb: future.cancel(false) + if (!owned) { + // debuglog("Cancelling " + body) + owned = true + true + } else false + } + def run = { + // utb: compute + var isOkToRun = false + synchronized { + if (!owned) { + owned = true + isOkToRun = true + } + } + if (isOkToRun) { + // debuglog("Running body of " + body) + compute + release + } else { + // just skip + // debuglog("skipping body of " + body) + } + } + override def release = synchronized { + completed = true + decrTasks + this.notifyAll + } + } + + protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + + var environment: AnyRef = ThreadPoolTasks.defaultThreadPool + def executor = environment.asInstanceOf[ThreadPoolExecutor] + def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]] + var totaltasks = 0 + + private def incrTasks = synchronized { + totaltasks += 1 + } + + private def decrTasks = synchronized { + totaltasks -= 1 + } + + def execute[R, Tp](task: Task[R, Tp]): () => R = { + val t = newTaskImpl(task) + + // debuglog("-----------> Executing without wait: " + task) + t.start + + () => { + t.sync + t.body.result + } + } + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { + val t = newTaskImpl(task) + + // debuglog("-----------> Executing with wait: " + task) + t.start + + t.sync + t.body.result + } + + def parallelismLevel = ThreadPoolTasks.numCores + +} + +object ThreadPoolTasks { + import java.util.concurrent._ + + val numCores = Runtime.getRuntime.availableProcessors + + val defaultThreadPool = new ThreadPoolExecutor( + numCores, + Int.MaxValue, + 60L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue[Runnable], + new ThreadPoolExecutor.CallerRunsPolicy + ) +} + + + /** An implementation trait for parallel tasks based on the fork/join framework. * * @define fjdispatch @@ -184,36 +324,37 @@ trait HavingForkJoinPool { */ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { - trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] { + trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] { def start = fork def sync = join def tryCancel = tryUnfork - var result: R } - type TaskType[R, +Tp] = Task[R, Tp] - type ExecutionEnvironment = ForkJoinPool + // specialize ctor + protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] /** The fork/join pool of this collection. */ - def forkJoinPool: ForkJoinPool = environment - var environment = ForkJoinTasks.defaultForkJoinPool + def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool] + var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool /** Executes a task and does not wait for it to finish - instead returns a future. * * $fjdispatch */ - def execute[R, Tp](fjtask: Task[R, Tp]): () => R = { - if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { + def execute[R, Tp](task: Task[R, Tp]): () => R = { + val fjtask = newTaskImpl(task) + + if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { fjtask.fork } else { forkJoinPool.execute(fjtask) } () => { - fjtask.join - fjtask.forwardThrowable - fjtask.result + fjtask.sync + fjtask.body.forwardThrowable + fjtask.body.result } } @@ -224,15 +365,18 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { * * @return the result of the task */ - def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = { - if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) { + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { + val fjtask = newTaskImpl(task) + + if (currentThread.isInstanceOf[ForkJoinWorkerThread]) { fjtask.fork } else { forkJoinPool.execute(fjtask) } - fjtask.join - fjtask.forwardThrowable - fjtask.result + + fjtask.sync + fjtask.body.forwardThrowable + fjtask.body.result } def parallelismLevel = forkJoinPool.getParallelism @@ -251,11 +395,26 @@ object ForkJoinTasks { */ trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks { - trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp] + class TaskImpl[R, Tp](val body: Task[R, Tp]) + extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] { + def split = body.split.map(b => newTaskImpl(b)) + } + + def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b) } +trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks { + + class TaskImpl[R, Tp](val body: Task[R, Tp]) + extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] { + def split = body.split.map(b => newTaskImpl(b)) + } + + def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b) + +} diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala index 5c3720a3bf..58dce1aef4 100644 --- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala @@ -90,6 +90,7 @@ self => i < sz } def remaining = sz - i + override def toString = "HashTrieIterator(" + sz + ")" } private[parallel] def printDebugInfo { @@ -168,7 +169,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] => } override def toString = { - "HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n" + "HashTrieCombiner(sz: " + size + ")" + //"HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n" } /* tasks */ diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala index a1164b7b80..909b8eb5d7 100644 --- a/src/library/scala/collection/parallel/mutable/ParArray.scala +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -102,8 +102,11 @@ self => val left = remaining if (left >= 2) { val splitpoint = left / 2 - Seq(new ParArrayIterator(i, i + splitpoint, arr) with SCPI, - new ParArrayIterator(i + splitpoint, until, arr) with SCPI) + val sq = Seq( + new ParArrayIterator(i, i + splitpoint, arr) with SCPI, + new ParArrayIterator(i + splitpoint, until, arr) with SCPI) + i = until + sq } else { Seq(this) } diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala index d95e478fec..760f8b09ce 100644 --- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala +++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala @@ -38,7 +38,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] => pa } - override def toString = "ParArrayCombiner(" + size + "): " + chain + override def toString = "ParArrayCombiner(" + size + "): " //+ chain /* tasks */ diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index 8cd54e6083..19ae9aef5d 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -36,7 +36,9 @@ package object parallel { private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString) - private[parallel] def getTaskSupport: TaskSupport = new TaskSupport {} + private[parallel] def getTaskSupport: TaskSupport = + if (util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport + else new ThreadPoolTaskSupport /* implicit conversions */ diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala index d0b7bae834..6ac8e7a3ad 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTableSets.scala @@ -137,7 +137,7 @@ object RefParHashTableSetBenches extends ParHashTableSetBenches[Dummy] { val phm = new ParHashSet[Dummy] for (i <- 0 until sz) phm += new Dummy(i) forkJoinPool.setParallelism(p) - phm.environment = forkJoinPool + phm.tasksupport.environment = forkJoinPool phm } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala index 291f7ec62d..83e3177324 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala @@ -206,7 +206,7 @@ object RefParHashTableBenches extends ParHashTableBenches[Dummy, Dummy] { val phm = new ParHashMap[Dummy, Dummy] for (i <- 0 until sz) phm += ((new Dummy(i), new Dummy(i))) forkJoinPool.setParallelism(p) - phm.environment = forkJoinPool + phm.tasksupport.environment = forkJoinPool phm } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala index 3e37086361..87a34e1e0e 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala @@ -164,7 +164,7 @@ object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] { var pht = new ParHashMap[Dummy, Dummy] for (i <- 0 until sz) pht += ((new Dummy(i), new Dummy(i))) forkJoinPool.setParallelism(p) - pht.environment = forkJoinPool + pht.tasksupport.environment = forkJoinPool pht } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala index 17ad2f9882..b5dcfca872 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/ForeachHeavy.scala @@ -7,7 +7,7 @@ object ForeachHeavy extends Companion { def benchName = "foreach-heavy"; def apply(sz: Int, parallelism: Int, what: String) = new ForeachHeavy(sz, parallelism, what) override def comparisons = List("jsr") - override def defaultSize = 16 + override def defaultSize = 2048 val fun = (a: Cont) => heavyOperation(a) val funjsr = new extra166y.Ops.Procedure[Cont] { @@ -21,7 +21,7 @@ object ForeachHeavy extends Companion { def checkPrime(n: Int) = { var isPrime = true var i = 2 - val until = scala.math.sqrt(n).toInt + 1 + val until = 550 while (i < until) { if (n % i == 0) isPrime = false i += 1 diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala index be49995589..e4eb51d83b 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala @@ -39,7 +39,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { def assignProduct(a: Matrix[T], b: Matrix[T]) = { val range = new ParRange(0, n * n, 1, false) - range.environment = forkjoinpool + range.tasksupport.environment = forkjoinpool for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n); } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala index 68ceac2b53..c75432360b 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/Resettable.scala @@ -77,7 +77,7 @@ extends Bench with SequentialOps[T] { for (i <- 0 until size) arr(i) = elemcreator(i) case "par" => pa = new ParArray[T](size) - pa.environment = forkjoinpool + pa.tasksupport.environment = forkjoinpool for (i <- 0 until size) pa(i) = elemcreator(i) case "jsr" => jsrarr = JSR166Array.create(size, cls, papool) diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala index 14a6259a38..6cd1d74c5e 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala @@ -22,7 +22,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { def createParallel(sz: Int, p: Int) = { val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false) forkJoinPool.setParallelism(p) - pr.environment = forkJoinPool + pr.tasksupport.environment = forkJoinPool pr } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala index eed62fc5c1..abd9b7838f 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_view/SeqViewBenches.scala @@ -31,7 +31,7 @@ extends ParSeqViewBenches[Dummy, ParSeqView[Dummy, ParSeq[Dummy], Seq[Dummy]], S forkJoinPool.setParallelism(p) for (i <- 0 until sz) pa(i) = new Dummy(i) val v = pa.view - v.environment = forkJoinPool + v.tasksupport.environment = forkJoinPool v } def createSeqView(sz: Int, p: Int) = createSequential(sz, p).view diff --git a/test/files/scalacheck/parallel-collections/IntOperators.scala b/test/files/scalacheck/parallel-collections/IntOperators.scala index 8d214b614f..690ee34cca 100644 --- a/test/files/scalacheck/parallel-collections/IntOperators.scala +++ b/test/files/scalacheck/parallel-collections/IntOperators.scala @@ -5,8 +5,14 @@ import scala.collection.parallel._ trait IntOperators extends Operators[Int] { - def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _)) - def countPredicates = List(_ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99) + def reduceOperators = List(_ + _, _ * _, math.min(_, _), math.max(_, _), _ ^ _) + def countPredicates = List( + x => true, + _ >= 0, _ < 0, _ < 50, _ < 500, _ < 5000, _ < 50000, _ % 2 == 0, _ == 99, + x => x > 50 && x < 150, + x => x > 350 && x < 550, + x => (x > 1000 && x < 1500) || (x > 400 && x < 500) + ) def forallPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ != 55, _ != 505, _ != 5005) def existsPredicates = List(_ >= 0, _ < 0, _ % 2 == 0, _ == 55, _ == 505, _ == 5005) def findPredicates = List(_ >= 0, _ % 2 == 0, _ < 0, _ == 50, _ == 500, _ == 5000) @@ -18,9 +24,14 @@ trait IntOperators extends Operators[Int] { (n: Int) => if (n == 0) List(1, 2, 3, 4, 5) else if (n < 0) List(1, 2, 3) else List() ) def filterPredicates = List( - _ % 2 == 0, _ % 3 == 0, _ % 4 != 0, _ % 17 != 0, n => n > 50 && n < 100, _ >= 0, _ < 0, _ == 99, - _ > 500, _ > 5000, _ > 50000, _ < 500, _ < 50, _ < -50, _ < -5e5, - x => true, x => false, x => x % 53 == 0 && x % 17 == 0 + _ % 2 == 0, _ % 3 == 0, + _ % 4 != 0, _ % 17 != 0, + n => n > 50 && n < 100, + _ >= 0, _ < 0, _ == 99, + _ > 500, _ > 5000, _ > 50000, + _ < 500, _ < 50, _ < -50, _ < -5e5, + x => true, x => false, + x => x % 53 == 0 && x % 17 == 0 ) def filterNotPredicates = filterPredicates def partitionPredicates = filterPredicates @@ -30,7 +41,8 @@ trait IntOperators extends Operators[Int] { _ < -100, _ < -1000, _ > -200, _ > -50, n => -90 < n && n < -10, n => 50 < n && n < 550, - n => 5000 < n && n < 7500 + n => 5000 < n && n < 7500, + n => -50 < n && n < 450 ) def dropWhilePredicates = takeWhilePredicates def spanPredicates = takeWhilePredicates diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala index 019e8c4fde..255c04498e 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParArray[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParArray[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala index a98331dc86..9805e2644f 100644 --- a/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelArrayViewCheck.scala @@ -22,8 +22,8 @@ // abstract class ParallelArrayViewCheck[T](tp: String) // extends ParallelSeqCheck[T]("ParallelSeqView[" + tp + ", ParallelArray[" + tp + "]]") { -// ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) -// ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) +// // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) +// // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) // type CollType = ParallelSeqView[T, ParallelArray[T], ArraySeq[T]] diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala index d53c0ba9d6..061bb08d9b 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("mutable.ParHashMap[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashMap[K, V] @@ -64,7 +64,7 @@ with PairValues[Int, Int] } override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match { - case pm: ParHashMap[k, v] => + case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster val invs = pm.brokenInvariants val containsall = (for ((k, v) <- orig) yield { diff --git a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala index 973a5cdf4b..be70a7c7a3 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashSetCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("mutable.ParHashSet[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashSet[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala index 10329c19f2..bbec52dc92 100644 --- a/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelHashTrieCheck.scala @@ -15,8 +15,8 @@ import scala.collection.parallel.ops._ abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("immutable.ParHashMap[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashMap[K, V] @@ -67,8 +67,8 @@ with PairValues[Int, Int] abstract class ParallelHashSetCheck[T](tp: String) extends ParallelSetCheck[T]("immutable.ParHashSet[" + tp + "]") { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = ParHashSet[T] diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala index e8838de3f5..8b5d72ea01 100644 --- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala @@ -82,6 +82,33 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col case _ => t1 == t2 && t2 == t1 } + def printDebugInfo(coll: ParIterableLike[_, _, _]) { + println("Collection debug info: ") + coll.printDebugBuffer + println("Task debug info: ") + println(coll.tasksupport.debugMessages.mkString("\n")) + } + + def printComparison(t: Traversable[_], coll: ParIterable[_], tf: Traversable[_], cf: ParIterable[_], ind: Int) { + printDebugInfo(coll) + println("Operator: " + ind) + println("sz: " + t.size) + println(t) + println + println("sz: " + coll.size) + println(coll) + println("transformed to:") + println + println("size: " + tf.size) + println(tf) + println + println("size: " + cf.size) + println(cf) + println + println("tf == cf - " + (tf == cf)) + println("cf == tf - " + (cf == tf)) + } + property("reductions must be equal for assoc. operators") = forAll(collectionPairs) { case (t, coll) => if (t.size != 0) { val results = for ((op, ind) <- reduceOperators.zipWithIndex) yield { @@ -105,10 +132,11 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col val tc = t.count(pred) val cc = coll.count(pred) if (tc != cc) { - println("from: " + t) - println("and: " + coll.toList) + println("from: " + t + " - size: " + t.size) + println("and: " + coll + " - size: " + coll.toList.size) println(tc) println(cc) + printDebugInfo(coll) } ("op index: " + ind) |: tc == cc } @@ -184,11 +212,20 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col val cf = coll.filter(p) val invs = checkDataStructureInvariants(tf, cf) if (tf != cf || cf != tf || !invs) { + printDebugInfo(coll) + println("Operator: " + ind) + println("sz: " + t.size) println(t) + println + println("sz: " + coll.size) println(coll) + println println("filtered to:") + println println(cf) + println println(tf) + println println("tf == cf - " + (tf == cf)) println("cf == tf - " + (cf == tf)) printDataStructureDebugInfo(cf) @@ -199,8 +236,12 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col } property("filterNots must be equal") = forAll(collectionPairs) { case (t, coll) => - (for ((p, ind) <- filterNotPredicates.zipWithIndex) - yield ("op index: " + ind) |: t.filterNot(p) == coll.filterNot(p)).reduceLeft(_ && _) + (for ((p, ind) <- filterNotPredicates.zipWithIndex) yield { + val tf = t.filterNot(p) + val cf = coll.filterNot(p) + if (tf != cf || cf != tf) printComparison(t, coll, tf, cf, ind) + ("op index: " + ind) |: tf == cf && cf == tf + }).reduceLeft(_ && _) } if (!isCheckingViews) property("partitions must be equal") = forAll(collectionPairs) { case (t, coll) => diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala index 850a5d5473..c34fb872aa 100644 --- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala @@ -18,8 +18,8 @@ import scala.collection.parallel.ops._ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") with ops.IntSeqOperators { - ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) - ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) type CollType = collection.parallel.ParSeq[Int] diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index efc393889e..590da6dba4 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -11,22 +11,22 @@ class ParCollProperties extends Properties("Parallel collections") { /* Collections */ // parallel arrays - //include(mutable.IntParallelArrayCheck) + include(mutable.IntParallelArrayCheck) // parallel ranges - //include(immutable.ParallelRangeCheck) + include(immutable.ParallelRangeCheck) // parallel immutable hash maps (tries) - //include(immutable.IntIntParallelHashMapCheck) + include(immutable.IntIntParallelHashMapCheck) // parallel immutable hash sets (tries) - //include(immutable.IntParallelHashSetCheck) + include(immutable.IntParallelHashSetCheck) // parallel mutable hash maps (tables) - //include(mutable.IntIntParallelHashMapCheck) + include(mutable.IntIntParallelHashMapCheck) // parallel mutable hash sets (tables) - //include(mutable.IntParallelHashSetCheck) + include(mutable.IntParallelHashSetCheck) // parallel vectors @@ -52,7 +52,7 @@ object Test { workers = 1, minSize = 0, maxSize = 4000, - minSuccessfulTests = 150 + minSuccessfulTests = 120 ), pc ) |