summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala70
-rw-r--r--src/library/scala/concurrent/package.scala.disabled108
-rw-r--r--src/library/scala/parallel/package.scala.disabled (renamed from src/library/scala/parallel/package.scala)44
3 files changed, 198 insertions, 24 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 0b220a020f..80cdd31fa1 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -279,7 +279,9 @@ trait ThreadPoolTasks extends Tasks {
}
override def release = synchronized {
completed = true
- decrTasks
+ executor.synchronized {
+ decrTasks
+ }
this.notifyAll
}
}
@@ -352,6 +354,70 @@ object ThreadPoolTasks {
}
+/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */
+trait FutureThreadPoolTasks extends Tasks {
+ import java.util.concurrent._
+
+ trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ @volatile var future: Future[_] = null
+
+ def start = {
+ executor.synchronized {
+ future = executor.submit(this)
+ }
+ }
+ def sync = future.get
+ def tryCancel = false
+ def run = {
+ compute
+ }
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
+ var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
+ def executor = environment.asInstanceOf[ThreadPoolExecutor]
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing without wait: " + task)
+ t.start
+
+ () => {
+ t.sync
+ t.body.forwardThrowable
+ t.body.result
+ }
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing with wait: " + task)
+ t.start
+
+ t.sync
+ t.body.forwardThrowable
+ t.body.result
+ }
+
+ def parallelismLevel = FutureThreadPoolTasks.numCores
+
+}
+
+object FutureThreadPoolTasks {
+ import java.util.concurrent._
+
+ val numCores = Runtime.getRuntime.availableProcessors
+
+ val tcount = new atomic.AtomicLong(0L)
+
+ val defaultThreadPool = Executors.newCachedThreadPool()
+}
+
+
+
/**
* A trait describing objects that provide a fork/join pool.
*/
@@ -430,7 +496,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
object ForkJoinTasks {
- val defaultForkJoinPool: ForkJoinPool = scala.parallel.forkjoinpool
+ val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool
// defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
// defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
diff --git a/src/library/scala/concurrent/package.scala.disabled b/src/library/scala/concurrent/package.scala.disabled
new file mode 100644
index 0000000000..42b4bf954c
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala.disabled
@@ -0,0 +1,108 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala
+
+
+
+
+/** This package object contains primitives for parallel programming.
+ */
+package object concurrent {
+
+ /** Performs a call which can potentially block execution.
+ *
+ * Example:
+ * {{{
+ * val lock = new ReentrantLock
+ *
+ * // ... do something ...
+ *
+ * blocking {
+ * if (!lock.hasLock) lock.lock()
+ * }
+ * }}}
+ *
+ * '''Note:''' calling methods that wait arbitrary amounts of time
+ * (e.g. for I/O operations or locks) may severely decrease performance
+ * or even result in deadlocks. This does not include waiting for
+ * results of futures.
+ *
+ * @tparam T the result type of the blocking operation
+ * @param body the blocking operation
+ * @param runner the runner used for parallel computations
+ * @return the result of the potentially blocking operation
+ */
+ def blocking[T](body: =>T)(implicit runner: TaskRunner): T = {
+ null.asInstanceOf[T]
+ }
+
+ /** Invokes a computation asynchronously. Does not wait for the computation
+ * to finish.
+ *
+ * @tparam U the result type of the operation
+ * @param p the computation to be invoked asynchronously
+ * @param runner the runner used for parallel computations
+ */
+ def spawn[U](p: =>U)(implicit runner: TaskRunner): Unit = {
+ }
+
+ /** Starts 2 parallel computations and returns once they are completed.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param runner the runner used for parallel computations
+ * @return a tuple of results corresponding to parallel computations
+ */
+ def par[T1, T2](b1: =>T1)(b2: =>T2)(implicit runner: TaskRunner): (T1, T2) = {
+ null
+ }
+
+ /** Starts 3 parallel computations and returns once they are completed.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @tparam T3 the type of the result of 3rd the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param b3 the 3rd computation to be invoked in parallel
+ * @param runner the runner used for parallel computations
+ * @return a tuple of results corresponding to parallel computations
+ */
+ def par[T1, T2, T3](b1: =>T1)(b2: =>T2)(b3: =>T3)(implicit runner: TaskRunner): (T1, T2, T3) = {
+ null
+ }
+
+ /** Starts 4 parallel computations and returns once they are completed.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @tparam T3 the type of the result of 3rd the parallel computation
+ * @tparam T4 the type of the result of 4th the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param b3 the 3rd computation to be invoked in parallel
+ * @param b4 the 4th computation to be invoked in parallel
+ * @param runner the runner used for parallel computations
+ * @return a tuple of results corresponding to parallel computations
+ */
+ def par[T1, T2, T3, T4](b1: =>T1)(b2: =>T2)(b3: =>T3)(b4: =>T4)(implicit runner: TaskRunner): (T1, T2, T3, T4) = {
+ null
+ }
+
+}
diff --git a/src/library/scala/parallel/package.scala b/src/library/scala/parallel/package.scala.disabled
index 4cae1ad4b1..45f5470d03 100644
--- a/src/library/scala/parallel/package.scala
+++ b/src/library/scala/parallel/package.scala.disabled
@@ -15,13 +15,13 @@ import scala.concurrent.forkjoin._
* chain obtained by querying results of unfinished futures can have
* arbitrary lengths. However, care must be taken not to create a
* circular dependency, as this will result in a deadlock.
- *
+ *
* Additionally, if the parallel computation performs a blocking call
* (e.g. an I/O operation or waiting for a lock) other than waiting for a future,
* it should do so by invoking the `block` method. This is another
* form of waiting that could potentially create a circular dependency,
* an the user should take care not to do this.
- *
+ *
* Users should be aware that invoking a parallel computation has a
* certain overhead. Parallel computations should not be invoked for
* small computations, as this can lead to bad performance. A rule of the
@@ -31,36 +31,36 @@ import scala.concurrent.forkjoin._
* computationally equivalent to a loop with 10000 arithmetic operations.
*/
package object parallel {
-
+
private[scala] val forkjoinpool = new ForkJoinPool()
-
+
private class Task[T](body: =>T) extends RecursiveTask[T] with Future[T] {
def compute = body
def apply() = join()
}
-
+
private final def newTask[T](body: =>T) = new Task[T](body)
-
+
private final def executeTask[T](task: RecursiveTask[T]) {
if (Thread.currentThread().isInstanceOf[ForkJoinWorkerThread]) task.fork
else forkjoinpool.execute(task)
}
-
+
/* public methods */
-
+
/** Performs a call which can potentially block execution.
- *
+ *
* Example:
* {{{
* val lock = new ReentrantLock
- *
+ *
* // ... do something ...
- *
+ *
* blocking {
* if (!lock.hasLock) lock.lock()
* }
* }}}
- *
+ *
* '''Note:''' calling methods that wait arbitrary amounts of time
* (e.g. for I/O operations or locks) may severely decrease performance
* or even result in deadlocks. This does not include waiting for
@@ -82,11 +82,11 @@ package object parallel {
blocker.result.asInstanceOf[T]
} else body
}
-
+
/** Starts a parallel computation and returns a future.
- *
+ *
* $invokingPar
- *
+ *
* @tparam T the type of the result of the parallel computation
* @param body the computation to be invoked in parallel
* @return a future with the result
@@ -96,9 +96,9 @@ package object parallel {
executeTask(task)
task
}
-
+
/** Starts 2 parallel computations and returns a future.
- *
+ *
* $invokingPar
*
* @tparam T1 the type of the result of 1st the parallel computation
@@ -114,9 +114,9 @@ package object parallel {
executeTask(t2)
(t1, t2)
}
-
+
/** Starts 3 parallel computations and returns a future.
- *
+ *
* $invokingPar
*
* @tparam T1 the type of the result of 1st the parallel computation
@@ -136,9 +136,9 @@ package object parallel {
executeTask(t3)
(t1, t2, t3)
}
-
+
/** Starts 4 parallel computations and returns a future.
- *
+ *
* $invokingPar
*
* @tparam T1 the type of the result of 1st the parallel computation
@@ -162,7 +162,7 @@ package object parallel {
executeTask(t4)
(t1, t2, t3, t4)
}
-
+
}