diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2011-03-24 15:00:41 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2011-03-24 15:00:41 +0000 |
commit | d0e519a3091ce7e14781a8b929c35a3e228194d8 (patch) | |
tree | 161f5c08de8d018d85cd73a663f2929b9a3eeaac /src/library/scala/collection/parallel/Tasks.scala | |
parent | af011572ee74162202b2a66a98bf5e480b5b435b (diff) | |
download | scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.tar.gz scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.tar.bz2 scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.zip |
Added a temporary fix for #4351, but disabled i...
Added a temporary fix for #4351, but disabled it because the extend
specialized class with nonspecialized type-parameters is used in the
stdlib already.
Disabling scala.parallel package, adding the currently disabled
scala.concurrent package which will be implemented in some of the next
releases.
Review by phaller.
Diffstat (limited to 'src/library/scala/collection/parallel/Tasks.scala')
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 70 |
1 files changed, 68 insertions, 2 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 0b220a020f..80cdd31fa1 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -279,7 +279,9 @@ trait ThreadPoolTasks extends Tasks { } override def release = synchronized { completed = true - decrTasks + executor.synchronized { + decrTasks + } this.notifyAll } } @@ -352,6 +354,70 @@ object ThreadPoolTasks { } +/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */ +trait FutureThreadPoolTasks extends Tasks { + import java.util.concurrent._ + + trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + @volatile var future: Future[_] = null + + def start = { + executor.synchronized { + future = executor.submit(this) + } + } + def sync = future.get + def tryCancel = false + def run = { + compute + } + } + + protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + + var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool + def executor = environment.asInstanceOf[ThreadPoolExecutor] + + def execute[R, Tp](task: Task[R, Tp]): () => R = { + val t = newTaskImpl(task) + + // debuglog("-----------> Executing without wait: " + task) + t.start + + () => { + t.sync + t.body.forwardThrowable + t.body.result + } + } + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { + val t = newTaskImpl(task) + + // debuglog("-----------> Executing with wait: " + task) + t.start + + t.sync + t.body.forwardThrowable + t.body.result + } + + def parallelismLevel = FutureThreadPoolTasks.numCores + +} + +object FutureThreadPoolTasks { + import java.util.concurrent._ + + val numCores = Runtime.getRuntime.availableProcessors + + val tcount = new atomic.AtomicLong(0L) + + val defaultThreadPool = Executors.newCachedThreadPool() +} + + + /** * A trait describing objects that provide a fork/join pool. */ @@ -430,7 +496,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { object ForkJoinTasks { - val defaultForkJoinPool: ForkJoinPool = scala.parallel.forkjoinpool + val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) } |