diff options
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 70 | ||||
-rw-r--r-- | src/library/scala/concurrent/package.scala.disabled | 108 | ||||
-rw-r--r-- | src/library/scala/parallel/package.scala.disabled (renamed from src/library/scala/parallel/package.scala) | 44 |
3 files changed, 198 insertions, 24 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 0b220a020f..80cdd31fa1 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -279,7 +279,9 @@ trait ThreadPoolTasks extends Tasks { } override def release = synchronized { completed = true - decrTasks + executor.synchronized { + decrTasks + } this.notifyAll } } @@ -352,6 +354,70 @@ object ThreadPoolTasks { } +/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */ +trait FutureThreadPoolTasks extends Tasks { + import java.util.concurrent._ + + trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] { + @volatile var future: Future[_] = null + + def start = { + executor.synchronized { + future = executor.submit(this) + } + } + def sync = future.get + def tryCancel = false + def run = { + compute + } + } + + protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] + + var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool + def executor = environment.asInstanceOf[ThreadPoolExecutor] + + def execute[R, Tp](task: Task[R, Tp]): () => R = { + val t = newTaskImpl(task) + + // debuglog("-----------> Executing without wait: " + task) + t.start + + () => { + t.sync + t.body.forwardThrowable + t.body.result + } + } + + def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = { + val t = newTaskImpl(task) + + // debuglog("-----------> Executing with wait: " + task) + t.start + + t.sync + t.body.forwardThrowable + t.body.result + } + + def parallelismLevel = FutureThreadPoolTasks.numCores + +} + +object FutureThreadPoolTasks { + import java.util.concurrent._ + + val numCores = Runtime.getRuntime.availableProcessors + + val tcount = new atomic.AtomicLong(0L) + + val defaultThreadPool = Executors.newCachedThreadPool() +} + + + /** * A trait describing objects that provide a fork/join pool. */ @@ -430,7 +496,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool { object ForkJoinTasks { - val defaultForkJoinPool: ForkJoinPool = scala.parallel.forkjoinpool + val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors) // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors) } diff --git a/src/library/scala/concurrent/package.scala.disabled b/src/library/scala/concurrent/package.scala.disabled new file mode 100644 index 0000000000..42b4bf954c --- /dev/null +++ b/src/library/scala/concurrent/package.scala.disabled @@ -0,0 +1,108 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + + +package scala + + + + +/** This package object contains primitives for parallel programming. + */ +package object concurrent { + + /** Performs a call which can potentially block execution. + * + * Example: + * {{{ + * val lock = new ReentrantLock + * + * // ... do something ... + * + * blocking { + * if (!lock.hasLock) lock.lock() + * } + * }}} + * + * '''Note:''' calling methods that wait arbitrary amounts of time + * (e.g. for I/O operations or locks) may severely decrease performance + * or even result in deadlocks. This does not include waiting for + * results of futures. + * + * @tparam T the result type of the blocking operation + * @param body the blocking operation + * @param runner the runner used for parallel computations + * @return the result of the potentially blocking operation + */ + def blocking[T](body: =>T)(implicit runner: TaskRunner): T = { + null.asInstanceOf[T] + } + + /** Invokes a computation asynchronously. Does not wait for the computation + * to finish. + * + * @tparam U the result type of the operation + * @param p the computation to be invoked asynchronously + * @param runner the runner used for parallel computations + */ + def spawn[U](p: =>U)(implicit runner: TaskRunner): Unit = { + } + + /** Starts 2 parallel computations and returns once they are completed. + * + * $invokingPar + * + * @tparam T1 the type of the result of 1st the parallel computation + * @tparam T2 the type of the result of 2nd the parallel computation + * @param b1 the 1st computation to be invoked in parallel + * @param b2 the 2nd computation to be invoked in parallel + * @param runner the runner used for parallel computations + * @return a tuple of results corresponding to parallel computations + */ + def par[T1, T2](b1: =>T1)(b2: =>T2)(implicit runner: TaskRunner): (T1, T2) = { + null + } + + /** Starts 3 parallel computations and returns once they are completed. + * + * $invokingPar + * + * @tparam T1 the type of the result of 1st the parallel computation + * @tparam T2 the type of the result of 2nd the parallel computation + * @tparam T3 the type of the result of 3rd the parallel computation + * @param b1 the 1st computation to be invoked in parallel + * @param b2 the 2nd computation to be invoked in parallel + * @param b3 the 3rd computation to be invoked in parallel + * @param runner the runner used for parallel computations + * @return a tuple of results corresponding to parallel computations + */ + def par[T1, T2, T3](b1: =>T1)(b2: =>T2)(b3: =>T3)(implicit runner: TaskRunner): (T1, T2, T3) = { + null + } + + /** Starts 4 parallel computations and returns once they are completed. + * + * $invokingPar + * + * @tparam T1 the type of the result of 1st the parallel computation + * @tparam T2 the type of the result of 2nd the parallel computation + * @tparam T3 the type of the result of 3rd the parallel computation + * @tparam T4 the type of the result of 4th the parallel computation + * @param b1 the 1st computation to be invoked in parallel + * @param b2 the 2nd computation to be invoked in parallel + * @param b3 the 3rd computation to be invoked in parallel + * @param b4 the 4th computation to be invoked in parallel + * @param runner the runner used for parallel computations + * @return a tuple of results corresponding to parallel computations + */ + def par[T1, T2, T3, T4](b1: =>T1)(b2: =>T2)(b3: =>T3)(b4: =>T4)(implicit runner: TaskRunner): (T1, T2, T3, T4) = { + null + } + +} diff --git a/src/library/scala/parallel/package.scala b/src/library/scala/parallel/package.scala.disabled index 4cae1ad4b1..45f5470d03 100644 --- a/src/library/scala/parallel/package.scala +++ b/src/library/scala/parallel/package.scala.disabled @@ -15,13 +15,13 @@ import scala.concurrent.forkjoin._ * chain obtained by querying results of unfinished futures can have * arbitrary lengths. However, care must be taken not to create a * circular dependency, as this will result in a deadlock. - * + * * Additionally, if the parallel computation performs a blocking call * (e.g. an I/O operation or waiting for a lock) other than waiting for a future, * it should do so by invoking the `block` method. This is another * form of waiting that could potentially create a circular dependency, * an the user should take care not to do this. - * + * * Users should be aware that invoking a parallel computation has a * certain overhead. Parallel computations should not be invoked for * small computations, as this can lead to bad performance. A rule of the @@ -31,36 +31,36 @@ import scala.concurrent.forkjoin._ * computationally equivalent to a loop with 10000 arithmetic operations. */ package object parallel { - + private[scala] val forkjoinpool = new ForkJoinPool() - + private class Task[T](body: =>T) extends RecursiveTask[T] with Future[T] { def compute = body def apply() = join() } - + private final def newTask[T](body: =>T) = new Task[T](body) - + private final def executeTask[T](task: RecursiveTask[T]) { if (Thread.currentThread().isInstanceOf[ForkJoinWorkerThread]) task.fork else forkjoinpool.execute(task) } - + /* public methods */ - + /** Performs a call which can potentially block execution. - * + * * Example: * {{{ * val lock = new ReentrantLock - * + * * // ... do something ... - * + * * blocking { * if (!lock.hasLock) lock.lock() * } * }}} - * + * * '''Note:''' calling methods that wait arbitrary amounts of time * (e.g. for I/O operations or locks) may severely decrease performance * or even result in deadlocks. This does not include waiting for @@ -82,11 +82,11 @@ package object parallel { blocker.result.asInstanceOf[T] } else body } - + /** Starts a parallel computation and returns a future. - * + * * $invokingPar - * + * * @tparam T the type of the result of the parallel computation * @param body the computation to be invoked in parallel * @return a future with the result @@ -96,9 +96,9 @@ package object parallel { executeTask(task) task } - + /** Starts 2 parallel computations and returns a future. - * + * * $invokingPar * * @tparam T1 the type of the result of 1st the parallel computation @@ -114,9 +114,9 @@ package object parallel { executeTask(t2) (t1, t2) } - + /** Starts 3 parallel computations and returns a future. - * + * * $invokingPar * * @tparam T1 the type of the result of 1st the parallel computation @@ -136,9 +136,9 @@ package object parallel { executeTask(t3) (t1, t2, t3) } - + /** Starts 4 parallel computations and returns a future. - * + * * $invokingPar * * @tparam T1 the type of the result of 1st the parallel computation @@ -162,7 +162,7 @@ package object parallel { executeTask(t4) (t1, t2, t3, t4) } - + } |