summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-03-24 15:00:41 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-03-24 15:00:41 +0000
commitd0e519a3091ce7e14781a8b929c35a3e228194d8 (patch)
tree161f5c08de8d018d85cd73a663f2929b9a3eeaac /src/library
parentaf011572ee74162202b2a66a98bf5e480b5b435b (diff)
downloadscala-d0e519a3091ce7e14781a8b929c35a3e228194d8.tar.gz
scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.tar.bz2
scala-d0e519a3091ce7e14781a8b929c35a3e228194d8.zip
Added a temporary fix for #4351, but disabled i...
Added a temporary fix for #4351, but disabled it because the extend specialized class with nonspecialized type-parameters is used in the stdlib already. Disabling scala.parallel package, adding the currently disabled scala.concurrent package which will be implemented in some of the next releases. Review by phaller.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala70
-rw-r--r--src/library/scala/concurrent/package.scala.disabled108
-rw-r--r--src/library/scala/parallel/package.scala.disabled (renamed from src/library/scala/parallel/package.scala)44
3 files changed, 198 insertions, 24 deletions
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 0b220a020f..80cdd31fa1 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -279,7 +279,9 @@ trait ThreadPoolTasks extends Tasks {
}
override def release = synchronized {
completed = true
- decrTasks
+ executor.synchronized {
+ decrTasks
+ }
this.notifyAll
}
}
@@ -352,6 +354,70 @@ object ThreadPoolTasks {
}
+/** An implementation of tasks objects based on the Java thread pooling API and synchronization using futures. */
+trait FutureThreadPoolTasks extends Tasks {
+ import java.util.concurrent._
+
+ trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
+ @volatile var future: Future[_] = null
+
+ def start = {
+ executor.synchronized {
+ future = executor.submit(this)
+ }
+ }
+ def sync = future.get
+ def tryCancel = false
+ def run = {
+ compute
+ }
+ }
+
+ protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
+
+ var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool
+ def executor = environment.asInstanceOf[ThreadPoolExecutor]
+
+ def execute[R, Tp](task: Task[R, Tp]): () => R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing without wait: " + task)
+ t.start
+
+ () => {
+ t.sync
+ t.body.forwardThrowable
+ t.body.result
+ }
+ }
+
+ def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
+ val t = newTaskImpl(task)
+
+ // debuglog("-----------> Executing with wait: " + task)
+ t.start
+
+ t.sync
+ t.body.forwardThrowable
+ t.body.result
+ }
+
+ def parallelismLevel = FutureThreadPoolTasks.numCores
+
+}
+
+object FutureThreadPoolTasks {
+ import java.util.concurrent._
+
+ val numCores = Runtime.getRuntime.availableProcessors
+
+ val tcount = new atomic.AtomicLong(0L)
+
+ val defaultThreadPool = Executors.newCachedThreadPool()
+}
+
+
+
/**
* A trait describing objects that provide a fork/join pool.
*/
@@ -430,7 +496,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
object ForkJoinTasks {
- val defaultForkJoinPool: ForkJoinPool = scala.parallel.forkjoinpool
+ val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() // scala.parallel.forkjoinpool
// defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
// defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
diff --git a/src/library/scala/concurrent/package.scala.disabled b/src/library/scala/concurrent/package.scala.disabled
new file mode 100644
index 0000000000..42b4bf954c
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala.disabled
@@ -0,0 +1,108 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala
+
+
+
+
+/** This package object contains primitives for parallel programming.
+ */
+package object concurrent {
+
+ /** Performs a call which can potentially block execution.
+ *
+ * Example:
+ * {{{
+ * val lock = new ReentrantLock
+ *
+ * // ... do something ...
+ *
+ * blocking {
+ * if (!lock.hasLock) lock.lock()
+ * }
+ * }}}
+ *
+ * '''Note:''' calling methods that wait arbitrary amounts of time
+ * (e.g. for I/O operations or locks) may severely decrease performance
+ * or even result in deadlocks. This does not include waiting for
+ * results of futures.
+ *
+ * @tparam T the result type of the blocking operation
+ * @param body the blocking operation
+ * @param runner the runner used for parallel computations
+ * @return the result of the potentially blocking operation
+ */
+ def blocking[T](body: =>T)(implicit runner: TaskRunner): T = {
+ null.asInstanceOf[T]
+ }
+
+ /** Invokes a computation asynchronously. Does not wait for the computation
+ * to finish.
+ *
+ * @tparam U the result type of the operation
+ * @param p the computation to be invoked asynchronously
+ * @param runner the runner used for parallel computations
+ */
+ def spawn[U](p: =>U)(implicit runner: TaskRunner): Unit = {
+ }
+
+ /** Starts 2 parallel computations and returns once they are completed.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param runner the runner used for parallel computations
+ * @return a tuple of results corresponding to parallel computations
+ */
+ def par[T1, T2](b1: =>T1)(b2: =>T2)(implicit runner: TaskRunner): (T1, T2) = {
+ null
+ }
+
+ /** Starts 3 parallel computations and returns once they are completed.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @tparam T3 the type of the result of 3rd the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param b3 the 3rd computation to be invoked in parallel
+ * @param runner the runner used for parallel computations
+ * @return a tuple of results corresponding to parallel computations
+ */
+ def par[T1, T2, T3](b1: =>T1)(b2: =>T2)(b3: =>T3)(implicit runner: TaskRunner): (T1, T2, T3) = {
+ null
+ }
+
+ /** Starts 4 parallel computations and returns once they are completed.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @tparam T3 the type of the result of 3rd the parallel computation
+ * @tparam T4 the type of the result of 4th the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param b3 the 3rd computation to be invoked in parallel
+ * @param b4 the 4th computation to be invoked in parallel
+ * @param runner the runner used for parallel computations
+ * @return a tuple of results corresponding to parallel computations
+ */
+ def par[T1, T2, T3, T4](b1: =>T1)(b2: =>T2)(b3: =>T3)(b4: =>T4)(implicit runner: TaskRunner): (T1, T2, T3, T4) = {
+ null
+ }
+
+}
diff --git a/src/library/scala/parallel/package.scala b/src/library/scala/parallel/package.scala.disabled
index 4cae1ad4b1..45f5470d03 100644
--- a/src/library/scala/parallel/package.scala
+++ b/src/library/scala/parallel/package.scala.disabled
@@ -15,13 +15,13 @@ import scala.concurrent.forkjoin._
* chain obtained by querying results of unfinished futures can have
* arbitrary lengths. However, care must be taken not to create a
* circular dependency, as this will result in a deadlock.
- *
+ *
* Additionally, if the parallel computation performs a blocking call
* (e.g. an I/O operation or waiting for a lock) other than waiting for a future,
* it should do so by invoking the `block` method. This is another
* form of waiting that could potentially create a circular dependency,
* an the user should take care not to do this.
- *
+ *
* Users should be aware that invoking a parallel computation has a
* certain overhead. Parallel computations should not be invoked for
* small computations, as this can lead to bad performance. A rule of the
@@ -31,36 +31,36 @@ import scala.concurrent.forkjoin._
* computationally equivalent to a loop with 10000 arithmetic operations.
*/
package object parallel {
-
+
private[scala] val forkjoinpool = new ForkJoinPool()
-
+
private class Task[T](body: =>T) extends RecursiveTask[T] with Future[T] {
def compute = body
def apply() = join()
}
-
+
private final def newTask[T](body: =>T) = new Task[T](body)
-
+
private final def executeTask[T](task: RecursiveTask[T]) {
if (Thread.currentThread().isInstanceOf[ForkJoinWorkerThread]) task.fork
else forkjoinpool.execute(task)
}
-
+
/* public methods */
-
+
/** Performs a call which can potentially block execution.
- *
+ *
* Example:
* {{{
* val lock = new ReentrantLock
- *
+ *
* // ... do something ...
- *
+ *
* blocking {
* if (!lock.hasLock) lock.lock()
* }
* }}}
- *
+ *
* '''Note:''' calling methods that wait arbitrary amounts of time
* (e.g. for I/O operations or locks) may severely decrease performance
* or even result in deadlocks. This does not include waiting for
@@ -82,11 +82,11 @@ package object parallel {
blocker.result.asInstanceOf[T]
} else body
}
-
+
/** Starts a parallel computation and returns a future.
- *
+ *
* $invokingPar
- *
+ *
* @tparam T the type of the result of the parallel computation
* @param body the computation to be invoked in parallel
* @return a future with the result
@@ -96,9 +96,9 @@ package object parallel {
executeTask(task)
task
}
-
+
/** Starts 2 parallel computations and returns a future.
- *
+ *
* $invokingPar
*
* @tparam T1 the type of the result of 1st the parallel computation
@@ -114,9 +114,9 @@ package object parallel {
executeTask(t2)
(t1, t2)
}
-
+
/** Starts 3 parallel computations and returns a future.
- *
+ *
* $invokingPar
*
* @tparam T1 the type of the result of 1st the parallel computation
@@ -136,9 +136,9 @@ package object parallel {
executeTask(t3)
(t1, t2, t3)
}
-
+
/** Starts 4 parallel computations and returns a future.
- *
+ *
* $invokingPar
*
* @tparam T1 the type of the result of 1st the parallel computation
@@ -162,7 +162,7 @@ package object parallel {
executeTask(t4)
(t1, t2, t3, t4)
}
-
+
}