summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/parallel/Tasks.scala
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-03-24 15:00:41 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-03-24 15:00:41 +0000
commitd0e519a3091ce7e14781a8b929c35a3e228194d8 (patch)
tree161f5c08de8d018d85cd73a663f2929b9a3eeaac /src/library/scala/collection/parallel/Tasks.scala
parentaf011572ee74162202b2a66a98bf5e480b5b435b (diff)
downloadscala-d0e519a3091ce7e14781a8b929c35a3e228194d8.tar.gz
scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.tar.bz2
scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.zip
Added a temporary fix for #4351, but disabled i...
Added a temporary fix for #4351, but disabled it because the extend specialized class with nonspecialized type-parameters is used in the stdlib already. Disabling scala.parallel package, adding the currently disabled scala.concurrent package which will be implemented in some of the next releases. Review by phaller.
Diffstat (limited to 'src/library/scala/collection/parallel/Tasks.scala')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala70
1 files changed, 68 insertions, 2 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 0b220a020f..80cdd31fa1 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -279,7 +279,9 @@ trait ThreadPoolTasks extends Tasks {
}
override def release = synchronized {
completed = true
- decrTasks
+ executor.synchronized {
+ decrTasks
+ }
this.notifyAll
}
}
@@ -352,6 +354,70 @@ object ThreadPoolTasks {
}
+/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */
+trait FutureThreadPoolTasks extends Tasks {
+ import java.util.concurrent._
+
+ trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ @volatile var future: Future[_] = null
+
+ def start = {
+ executor.synchronized {
+ future = executor.submit(this)
+ }
+ }
+ def sync = future.get
+ def tryCancel = false
+ def run = {
+ compute
+ }
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
+ var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
+ def executor = environment.asInstanceOf[ThreadPoolExecutor]
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing without wait: " + task)
+ t.start
+
+ () => {
+ t.sync
+ t.body.forwardThrowable
+ t.body.result
+ }
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing with wait: " + task)
+ t.start
+
+ t.sync
+ t.body.forwardThrowable
+ t.body.result
+ }
+
+ def parallelismLevel = FutureThreadPoolTasks.numCores
+
+}
+
+object FutureThreadPoolTasks {
+ import java.util.concurrent._
+
+ val numCores = Runtime.getRuntime.availableProcessors
+
+ val tcount = new atomic.AtomicLong(0L)
+
+ val defaultThreadPool = Executors.newCachedThreadPool()
+}
+
+
+
/**
* A trait describing objects that provide a fork/join pool.
*/
@@ -430,7 +496,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
object ForkJoinTasks {
- val defaultForkJoinPool: ForkJoinPool = scala.parallel.forkjoinpool
+ val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool
// defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
// defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}