summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:11 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:11 +0000
commita730fb5cc6cea39a29e9ff4cd666fa8498f6adec (patch)
treea271e1b7cdd2bd044344fa7aef19d6820637b3a5 /src/library
parent4dbe72f83f7ade1517bad7444009d3d0c8a69bd5 (diff)
downloadscala-a730fb5cc6cea39a29e9ff4cd666fa8498f6adec.tar.gz
scala-a730fb5cc6cea39a29e9ff4cd666fa8498f6adec.tar.bz2
scala-a730fb5cc6cea39a29e9ff4cd666fa8498f6adec.zip
Fixing jvm 1.5 support for parallel collections.
Special cased with thread pool executor scheduling. Fixed an ugly concurrency bug where futures returned by a thread pool executor didn't remove the task from the queue when cancel was called. Note to self and others: don't cancel futures returned by thread pool executors, it might lead to unexpected behaviour. Modified the executor to add new threads if all the active threads are syncing, in order to avoid deadlocks. Fixed a hidden bug in AdaptiveWorkStealingTasks, where correct behaviour depended on the execution order of the tasks. This didn't fail before with ForkJoinTasks, since there the execution order is well-defined. Scalachecked 1.5 & 1.6 support. No review.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/mutable/PriorityQueue.scala4
-rw-r--r--src/library/scala/collection/package.scala1
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala38
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala3
-rw-r--r--src/library/scala/collection/parallel/TaskSupport.scala6
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala295
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala4
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArray.scala7
-rw-r--r--src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala2
-rw-r--r--src/library/scala/collection/parallel/package.scala4
10 files changed, 277 insertions, 87 deletions
diff --git a/src/library/scala/collection/mutable/PriorityQueue.scala b/src/library/scala/collection/mutable/PriorityQueue.scala
index 45f1d1f06f..453bed54e6 100644
--- a/src/library/scala/collection/mutable/PriorityQueue.scala
+++ b/src/library/scala/collection/mutable/PriorityQueue.scala
@@ -15,8 +15,8 @@ import generic._
import annotation.migration
/** This class implements priority queues using a heap.
- * To prioritize elements of type T there must be an implicit
- * Ordering[T] available at creation.
+ * To prioritize elements of type A there must be an implicit
+ * Ordering[A] available at creation.
*
* @tparam A type of the elements in this priority queue.
* @param ord implicit ordering used to compare the elements of type `A`.
diff --git a/src/library/scala/collection/package.scala b/src/library/scala/collection/package.scala
index 13b6f22826..31cea84ab8 100644
--- a/src/library/scala/collection/package.scala
+++ b/src/library/scala/collection/package.scala
@@ -87,6 +87,7 @@ package object collection {
}
def arrayString[T](array: Array[T], from: Int, until: Int) = array.slice(from, until).map(x => if (x != null) x.toString else "n/a").mkString(" | ")
+
}
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 53a219ca28..83e5c6cb59 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -476,8 +476,7 @@ self =>
val copythis = new Copy(() => pbf(repr), parallelIterator)
val copythat = wrap {
val othtask = new other.Copy(() => pbf(self.repr), other.parallelIterator)
- othtask.compute
- othtask.result
+ other.tasksupport.executeAndWaitResult(othtask)
}
val task = (copythis parallel copythat) { _ combine _ } mapResult {
_.result
@@ -713,13 +712,12 @@ self =>
def shouldSplitFurther = pit.remaining > threshold(size, parallelismLevel)
def split = pit.split.map(newSubtask(_)) // default split procedure
private[parallel] override def signalAbort = pit.abort
- override def toString = "Accessor(" + pit.toString + ")"
+ override def toString = this.getClass.getSimpleName + "(" + pit.toString + ")(" + result + ")"
}
protected[this] trait NonDivisibleTask[R, Tp] extends StrictSplitterCheckTask[R, Tp] {
def shouldSplitFurther = false
def split = throw new UnsupportedOperationException("Does not split.")
- override def toString = "NonDivisibleTask"
}
protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]]
@@ -768,9 +766,7 @@ self =>
var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
- inner.compute
- throwable = inner.throwable
- if (throwable eq null) result = map(inner.result)
+ result = map(executeAndWaitResult(inner))
}
private[parallel] override def signalAbort {
inner.signalAbort
@@ -787,10 +783,12 @@ self =>
}
protected[this] class Count(pred: T => Boolean, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Int, Count] {
+ // val pittxt = pit.toString
var result: Int = 0
def leaf(prevr: Option[Int]) = result = pit.count(pred)
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Count(pred, p)
override def merge(that: Count) = result = result + that.result
+ // override def toString = "CountTask(" + pittxt + ")"
}
protected[this] class Reduce[U >: T](op: (U, U) => U, protected[this] val pit: ParIterableIterator[T]) extends Accessor[Option[U], Reduce[U]] {
@@ -901,7 +899,9 @@ self =>
protected[this] class Filter[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Filter[U, This]] {
var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.filter2combiner(pred, reuse(prev, cbf()))
+ def leaf(prev: Option[Combiner[U, This]]) = {
+ result = pit.filter2combiner(pred, reuse(prev, cbf()))
+ }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Filter(pred, cbf, p)
override def merge(that: Filter[U, This]) = result = result combine that.result
}
@@ -909,7 +909,9 @@ self =>
protected[this] class FilterNot[U >: T, This >: Repr](pred: T => Boolean, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], FilterNot[U, This]] {
var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
+ def leaf(prev: Option[Combiner[U, This]]) = {
+ result = pit.filterNot2combiner(pred, reuse(prev, cbf()))
+ }
protected[this] def newSubtask(p: ParIterableIterator[T]) = new FilterNot(pred, cbf, p)
override def merge(that: FilterNot[U, This]) = result = result combine that.result
}
@@ -1253,6 +1255,24 @@ self =>
private[parallel] def brokenInvariants = Seq[String]()
+ private val debugBuffer = collection.mutable.ArrayBuffer[String]()
+
+ private[parallel] def debugclear() = synchronized {
+ debugBuffer.clear
+ }
+
+ private[parallel] def debuglog(s: String) = synchronized {
+ debugBuffer += s
+ }
+
+ import collection.DebugUtils._
+ private[parallel] def printDebugBuffer = println(buildString {
+ append =>
+ for (s <- debugBuffer) {
+ append(s)
+ }
+ })
+
}
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index 9e24b83d8a..0ea33d0e39 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -242,8 +242,7 @@ self =>
val copystart = new Copy[U, That](() => pbf(repr), pits(0))
val copymiddle = wrap {
val tsk = new that.Copy[U, That](() => pbf(repr), that.parallelIterator)
- tsk.compute
- tsk.result
+ that.tasksupport.executeAndWaitResult(tsk)
}
val copyend = new Copy[U, That](() => pbf(repr), pits(2))
executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
diff --git a/src/library/scala/collection/parallel/TaskSupport.scala b/src/library/scala/collection/parallel/TaskSupport.scala
index 8a072b22aa..1b1ee6e469 100644
--- a/src/library/scala/collection/parallel/TaskSupport.scala
+++ b/src/library/scala/collection/parallel/TaskSupport.scala
@@ -6,7 +6,11 @@ package scala.collection.parallel
-trait TaskSupport extends AdaptiveWorkStealingForkJoinTasks
+trait TaskSupport extends Tasks
+
+class ForkJoinTaskSupport extends TaskSupport with AdaptiveWorkStealingForkJoinTasks
+
+class ThreadPoolTaskSupport extends TaskSupport with AdaptiveWorkStealingThreadPoolTasks
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 6f0fe47a6a..ec38513d9b 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -7,7 +7,7 @@ import scala.concurrent.forkjoin._
import scala.util.control.Breaks._
-
+import annotation.unchecked.uncheckedVariance
@@ -20,49 +20,35 @@ import scala.util.control.Breaks._
*/
trait Tasks {
- /** A task abstraction which allows starting a task with `start`,
- * waiting for it to finish with `sync` and attempting to cancel
- * the task with `tryCancel`.
- * It also defines a method `leaf` which must be called once the
- * the task is started and defines what this task actually does.
- * Method `split` allows splitting this task into smaller subtasks,
- * and method `shouldSplitFurther` decides if the task should be
- * partitioned further.
- * Method `merge` allows merging the results of the 2 tasks. It updates
- * the result of the receiver.
- * Finally, it defines the task result of type `U`.
- */
+ private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]()
+
+ private[parallel] def debuglog(s: String) = synchronized {
+ debugMessages += s
+ }
+
trait Task[R, +Tp] {
type Result = R
+
def repr = this.asInstanceOf[Tp]
- /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
- def compute
- /** Body of the task - non-divisible unit of work done by this task. Optionally is provided with the result from the previous task
- * or `None` if there was no previous task.
+ /** Body of the task - non-divisible unit of work done by this task.
+ * Optionally is provided with the result from the previous completed task
+ * or `None` if there was no previous task (or the previous task is uncompleted or unknown).
*/
def leaf(result: Option[R])
- /** Start task. */
- def start
- /** Wait for task to finish. */
- def sync
- /** Try to cancel the task.
- * @return `true` if cancellation is successful.
- */
- def tryCancel: Boolean
/** A result that can be accessed once the task is completed. */
- def result: R
+ var result: R
/** Decides whether or not this task should be split further. */
def shouldSplitFurther: Boolean
/** Splits this task into a list of smaller tasks. */
- protected[this] def split: Seq[Task[R, Tp]]
+ private[parallel] def split: Seq[Task[R, Tp]]
/** Read of results of `that` task and merge them into results of this one. */
- protected[this] def merge(that: Tp) {}
+ private[parallel] def merge(that: Tp @uncheckedVariance) {}
// exception handling mechanism
var throwable: Throwable = null
def forwardThrowable = if (throwable != null) throw throwable
// tries to do the leaf computation, storing the possible exception
- protected def tryLeaf(result: Option[R]) {
+ private[parallel] def tryLeaf(result: Option[R]) {
try {
tryBreakable {
leaf(result)
@@ -75,7 +61,7 @@ trait Tasks {
signalAbort
}
}
- protected[this] def tryMerge(t: Tp) {
+ private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
val that = t.asInstanceOf[Task[R, Tp]]
if (this.throwable == null && that.throwable == null) merge(t)
mergeThrowables(that)
@@ -90,16 +76,44 @@ trait Tasks {
private[parallel] def signalAbort {}
}
- type TaskType[R, +Tp] <: Task[R, Tp]
- type ExecutionEnvironment
+ trait TaskImpl[R, +Tp] {
+ /** the body of this task - what it executes, how it gets split and how results are merged. */
+ val body: Task[R, Tp]
+
+ def split: Seq[TaskImpl[R, Tp]]
+ /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
+ def compute
+ /** Start task. */
+ def start
+ /** Wait for task to finish. */
+ def sync
+ /** Try to cancel the task.
+ * @return `true` if cancellation is successful.
+ */
+ def tryCancel: Boolean
+ /** If the task has been cancelled successfully, those syncing on it may
+ * automatically be notified, depending on the implementation. If they
+ * aren't, this release method should be called after processing the
+ * cancelled task.
+ *
+ * This method may be overridden.
+ */
+ def release {}
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
- var environment: ExecutionEnvironment
+ /* task control */
+
+ // safe to assume it will always have the same type,
+ // because the `tasksupport` in parallel iterable is final
+ var environment: AnyRef
/** Executes a task and returns a future. Forwards an exception if some task threw it. */
- def execute[R, Tp](fjtask: TaskType[R, Tp]): () => R
+ def execute[R, Tp](fjtask: Task[R, Tp]): () => R
/** Executes a result task, waits for it to finish, then returns its result. Forwards an exception if some task threw it. */
- def executeAndWaitResult[R, Tp](task: TaskType[R, Tp]): R
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R
/** Retrieves the parallelism level of the task execution environment. */
def parallelismLevel: Int
@@ -107,40 +121,46 @@ trait Tasks {
}
+
/** This trait implements scheduling by employing
* an adaptive work stealing technique.
*/
trait AdaptiveWorkStealingTasks extends Tasks {
- trait Task[R, Tp] extends super.Task[R, Tp] {
- var next: Task[R, Tp] = null
+ trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
+ var next: TaskImpl[R, Tp] = null
var shouldWaitFor = true
- var result: R
-
- def split: Seq[Task[R, Tp]]
- /** The actual leaf computation. */
- def leaf(result: Option[R]): Unit
+ def split: Seq[TaskImpl[R, Tp]]
- def compute = if (shouldSplitFurther) internal else tryLeaf(None)
+ def compute = if (body.shouldSplitFurther) internal else body.tryLeaf(None)
def internal = {
var last = spawnSubtasks
- last.tryLeaf(None)
- result = last.result
+ last.body.tryLeaf(None)
+ body.result = last.body.result
while (last.next != null) {
- val lastresult = Option(last.result)
+ // val lastresult = Option(last.body.result)
+ val beforelast = last
last = last.next
- if (last.tryCancel) last.tryLeaf(lastresult) else last.sync
- tryMerge(last.repr)
+ if (last.tryCancel) {
+ // debuglog("Done with " + beforelast.body + ", next direct is " + last.body)
+ last.body.tryLeaf(Some(body.result))
+ last.release
+ } else {
+ // debuglog("Done with " + beforelast.body + ", next sync is " + last.body)
+ last.sync
+ }
+ // debuglog("Merging " + body + " with " + last.body)
+ body.tryMerge(last.body.repr)
}
}
def spawnSubtasks = {
- var last: Task[R, Tp] = null
- var head: Task[R, Tp] = this
+ var last: TaskImpl[R, Tp] = null
+ var head: TaskImpl[R, Tp] = this
do {
val subtasks = head.split
head = subtasks.head
@@ -149,7 +169,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
last = t
t.start
}
- } while (head.shouldSplitFurther);
+ } while (head.body.shouldSplitFurther);
head.next = last
head
}
@@ -165,6 +185,9 @@ trait AdaptiveWorkStealingTasks extends Tasks {
}
}
+ // specialize ctor
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
}
@@ -176,6 +199,123 @@ trait HavingForkJoinPool {
}
+trait ThreadPoolTasks extends Tasks {
+ import java.util.concurrent._
+
+ trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ // initially, this is null
+ // once the task is started, this future is set and used for `sync`
+ // utb: var future: Future[_] = null
+ @volatile var owned = false
+ @volatile var completed = false
+
+ def start = synchronized {
+ // debuglog("Starting " + body)
+ // utb: future = executor.submit(this)
+ executor.synchronized {
+ incrTasks
+ executor.submit(this)
+ }
+ }
+ def sync = synchronized {
+ // debuglog("Syncing on " + body)
+ // utb: future.get()
+ executor.synchronized {
+ val coresize = executor.getCorePoolSize
+ if (coresize < totaltasks) executor.setCorePoolSize(coresize + 1)
+ }
+ if (!completed) this.wait
+ }
+ def tryCancel = synchronized {
+ // utb: future.cancel(false)
+ if (!owned) {
+ // debuglog("Cancelling " + body)
+ owned = true
+ true
+ } else false
+ }
+ def run = {
+ // utb: compute
+ var isOkToRun = false
+ synchronized {
+ if (!owned) {
+ owned = true
+ isOkToRun = true
+ }
+ }
+ if (isOkToRun) {
+ // debuglog("Running body of " + body)
+ compute
+ release
+ } else {
+ // just skip
+ // debuglog("skipping body of " + body)
+ }
+ }
+ override def release = synchronized {
+ completed = true
+ decrTasks
+ this.notifyAll
+ }
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
+ var environment: AnyRef = ThreadPoolTasks.defaultThreadPool
+ def executor = environment.asInstanceOf[ThreadPoolExecutor]
+ def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]
+ var totaltasks = 0
+
+ private def incrTasks = synchronized {
+ totaltasks += 1
+ }
+
+ private def decrTasks = synchronized {
+ totaltasks -= 1
+ }
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing without wait: " + task)
+ t.start
+
+ () => {
+ t.sync
+ t.body.result
+ }
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing with wait: " + task)
+ t.start
+
+ t.sync
+ t.body.result
+ }
+
+ def parallelismLevel = ThreadPoolTasks.numCores
+
+}
+
+object ThreadPoolTasks {
+ import java.util.concurrent._
+
+ val numCores = Runtime.getRuntime.availableProcessors
+
+ val defaultThreadPool = new ThreadPoolExecutor(
+ numCores,
+ Int.MaxValue,
+ 60L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue[Runnable],
+ new ThreadPoolExecutor.CallerRunsPolicy
+ )
+}
+
+
+
/** An implementation trait for parallel tasks based on the fork/join framework.
*
* @define fjdispatch
@@ -184,36 +324,37 @@ trait HavingForkJoinPool {
*/
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
- trait Task[R, +Tp] extends RecursiveAction with super.Task[R, Tp] {
+ trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] {
def start = fork
def sync = join
def tryCancel = tryUnfork
- var result: R
}
- type TaskType[R, +Tp] = Task[R, Tp]
- type ExecutionEnvironment = ForkJoinPool
+ // specialize ctor
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
/** The fork/join pool of this collection.
*/
- def forkJoinPool: ForkJoinPool = environment
- var environment = ForkJoinTasks.defaultForkJoinPool
+ def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]
+ var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool
/** Executes a task and does not wait for it to finish - instead returns a future.
*
* $fjdispatch
*/
- def execute[R, Tp](fjtask: Task[R, Tp]): () => R = {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val fjtask = newTaskImpl(task)
+
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
} else {
forkJoinPool.execute(fjtask)
}
() => {
- fjtask.join
- fjtask.forwardThrowable
- fjtask.result
+ fjtask.sync
+ fjtask.body.forwardThrowable
+ fjtask.body.result
}
}
@@ -224,15 +365,18 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
*
* @return the result of the task
*/
- def executeAndWaitResult[R, Tp](fjtask: Task[R, Tp]): R = {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val fjtask = newTaskImpl(task)
+
+ if (currentThread.isInstanceOf[ForkJoinWorkerThread]) {
fjtask.fork
} else {
forkJoinPool.execute(fjtask)
}
- fjtask.join
- fjtask.forwardThrowable
- fjtask.result
+
+ fjtask.sync
+ fjtask.body.forwardThrowable
+ fjtask.body.result
}
def parallelismLevel = forkJoinPool.getParallelism
@@ -251,11 +395,26 @@ object ForkJoinTasks {
*/
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {
- trait Task[R, Tp] extends super[ForkJoinTasks].Task[R, Tp] with super[AdaptiveWorkStealingTasks].Task[R, Tp]
+ class TaskImpl[R, Tp](val body: Task[R, Tp])
+ extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
+ def split = body.split.map(b => newTaskImpl(b))
+ }
+
+ def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
}
+trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {
+
+ class TaskImpl[R, Tp](val body: Task[R, Tp])
+ extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
+ def split = body.split.map(b => newTaskImpl(b))
+ }
+
+ def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)
+
+}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 5c3720a3bf..58dce1aef4 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -90,6 +90,7 @@ self =>
i < sz
}
def remaining = sz - i
+ override def toString = "HashTrieIterator(" + sz + ")"
}
private[parallel] def printDebugInfo {
@@ -168,7 +169,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
}
override def toString = {
- "HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
+ "HashTrieCombiner(sz: " + size + ")"
+ //"HashTrieCombiner(buckets:\n\t" + buckets.filter(_ != null).mkString("\n\t") + ")\n"
}
/* tasks */
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
index a1164b7b80..909b8eb5d7 100644
--- a/src/library/scala/collection/parallel/mutable/ParArray.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -102,8 +102,11 @@ self =>
val left = remaining
if (left >= 2) {
val splitpoint = left / 2
- Seq(new ParArrayIterator(i, i + splitpoint, arr) with SCPI,
- new ParArrayIterator(i + splitpoint, until, arr) with SCPI)
+ val sq = Seq(
+ new ParArrayIterator(i, i + splitpoint, arr) with SCPI,
+ new ParArrayIterator(i + splitpoint, until, arr) with SCPI)
+ i = until
+ sq
} else {
Seq(this)
}
diff --git a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
index d95e478fec..760f8b09ce 100644
--- a/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/ParArrayCombiner.scala
@@ -38,7 +38,7 @@ self: EnvironmentPassingCombiner[T, ParArray[T]] =>
pa
}
- override def toString = "ParArrayCombiner(" + size + "): " + chain
+ override def toString = "ParArrayCombiner(" + size + "): " //+ chain
/* tasks */
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index 8cd54e6083..19ae9aef5d 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -36,7 +36,9 @@ package object parallel {
private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
- private[parallel] def getTaskSupport: TaskSupport = new TaskSupport {}
+ private[parallel] def getTaskSupport: TaskSupport =
+ if (util.Properties.isJavaAtLeast("1.6")) new ForkJoinTaskSupport
+ else new ThreadPoolTaskSupport
/* implicit conversions */