summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-08-03 15:56:44 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-08-03 15:56:44 +0000
commitac89702827559cab8835edecb35cc09a1ca3fe10 (patch)
tree7d8b01c12aeb8f42192ac68cce3289b3a4078310 /src/library
parentcf7a2f64f1357dcfa8ecf78ae8f29880c9fab214 (diff)
downloadscala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.gz
scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.bz2
scala-ac89702827559cab8835edecb35cc09a1ca3fe10.zip
Added the scala.concurrent.TaskRunner and scala...
Added the scala.concurrent.TaskRunner and scala.concurrent.AsyncInvokable abstractions with corresponding refactorings in scala.actors and scala.concurrent.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/AsyncInvokable.scala13
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala5
-rw-r--r--src/library/scala/concurrent/JavaConversions.scala44
-rw-r--r--src/library/scala/concurrent/ManagedBlocker.scala24
-rw-r--r--src/library/scala/concurrent/TaskRunner.scala29
-rw-r--r--src/library/scala/concurrent/TaskRunners.scala27
-rw-r--r--src/library/scala/concurrent/ThreadPoolRunner.scala44
-rw-r--r--src/library/scala/concurrent/ThreadRunner.scala33
-rw-r--r--src/library/scala/concurrent/jolib.scala3
-rw-r--r--src/library/scala/concurrent/ops.scala16
-rw-r--r--src/library/scala/concurrent/pilib.scala3
11 files changed, 230 insertions, 11 deletions
diff --git a/src/library/scala/concurrent/AsyncInvokable.scala b/src/library/scala/concurrent/AsyncInvokable.scala
new file mode 100644
index 0000000000..ae84042689
--- /dev/null
+++ b/src/library/scala/concurrent/AsyncInvokable.scala
@@ -0,0 +1,13 @@
+package scala.concurrent
+
+/** The <code>AsyncInvokable</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait AsyncInvokable[-T, +R] {
+
+ type Future[+S] <: () => S
+
+ def !!(task: T): Future[R]
+
+}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 0fa3c1660b..63477b4b3c 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -11,6 +11,7 @@
package scala.concurrent
import annotation.experimental
+import ops._
/** A <code>DelayedLazyVal</code> is a wrapper for lengthy
* computations which have a valid partially computed result.
@@ -37,8 +38,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- ops.future {
+ future {
body
isDone = true
}
-} \ No newline at end of file
+}
diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala
new file mode 100644
index 0000000000..9fde489ced
--- /dev/null
+++ b/src/library/scala/concurrent/JavaConversions.scala
@@ -0,0 +1,44 @@
+package scala.concurrent
+
+import java.util.concurrent.{ExecutorService, Executor}
+
+/** The <code>JavaConversions</code> object...
+ *
+ * @author Philipp Haller
+ */
+object JavaConversions {
+
+ implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] =
+ new ThreadPoolRunner[Unit] {
+ override protected def executor =
+ exec
+
+ def shutdown() =
+ exec.shutdown()
+ }
+
+ implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] =
+ new TaskRunner[Unit] {
+ type Future[+R] = () => R
+
+ def submit(task: () => Unit): this.Future[Unit] = {
+ val result = new SyncVar[Either[Unit, Throwable]]
+ val runnable = new Runnable {
+ def run() { result set tryCatch(task()) }
+ }
+ exec.execute(runnable)
+ () => result.get match {
+ case Left(a) => a
+ case Right(t) => throw t
+ }
+ }
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ def shutdown() {
+ // do nothing
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala
new file mode 100644
index 0000000000..c77f97285e
--- /dev/null
+++ b/src/library/scala/concurrent/ManagedBlocker.scala
@@ -0,0 +1,24 @@
+package scala.concurrent
+
+/** The <code>ManagedBlocker</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait ManagedBlocker {
+
+ /**
+ * Possibly blocks the current thread, for example waiting for
+ * a lock or condition.
+ * @return true if no additional blocking is necessary (i.e.,
+ * if isReleasable would return true).
+ * @throws InterruptedException if interrupted while waiting
+ * (the method is not required to do so, but is allowed to).
+ */
+ def block(): Boolean
+
+ /**
+ * Returns true if blocking is unnecessary.
+ */
+ def isReleasable: Boolean
+
+}
diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala
new file mode 100644
index 0000000000..d29e8ff12f
--- /dev/null
+++ b/src/library/scala/concurrent/TaskRunner.scala
@@ -0,0 +1,29 @@
+package scala.concurrent
+
+/** The <code>TaskRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait TaskRunner[T] extends AsyncInvokable[() => T, T] {
+
+ def submit(task: () => T): Future[T]
+
+ def shutdown(): Unit
+
+ def !!(task: () => T): Future[T] =
+ submit(task)
+
+ def managedBlock(blocker: ManagedBlocker): Unit
+
+ /** If expression computed successfully return it in <code>Left</code>,
+ * otherwise return exception in <code>Right</code>.
+ */
+ protected def tryCatch[A](left: => A): Either[A, Exception] = {
+ try {
+ Left(left)
+ } catch {
+ case e: Exception => Right(e)
+ }
+ }
+
+}
diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala
new file mode 100644
index 0000000000..8219d9d169
--- /dev/null
+++ b/src/library/scala/concurrent/TaskRunners.scala
@@ -0,0 +1,27 @@
+package scala.concurrent
+
+import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}
+
+/** The <code>TaskRunners</code> object...
+ *
+ * @author Philipp Haller
+ */
+object TaskRunners {
+
+ implicit val threadRunner: TaskRunner[Unit] =
+ new ThreadRunner[Unit]
+
+ implicit val threadPoolRunner: TaskRunner[Unit] = {
+ val numCores = Runtime.getRuntime().availableProcessors()
+ val keepAliveTime = 60000L
+ val workQueue = new LinkedBlockingQueue[Runnable]
+ val exec = new ThreadPoolExecutor(numCores,
+ numCores,
+ keepAliveTime,
+ TimeUnit.MILLISECONDS,
+ workQueue,
+ new ThreadPoolExecutor.CallerRunsPolicy)
+ JavaConversions.asTaskRunner(exec)
+ }
+
+}
diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala
new file mode 100644
index 0000000000..cbc5ebb293
--- /dev/null
+++ b/src/library/scala/concurrent/ThreadPoolRunner.scala
@@ -0,0 +1,44 @@
+package scala.concurrent
+
+import java.util.concurrent.{ExecutorService, Callable, TimeUnit}
+
+import scala.annotation.unchecked.uncheckedVariance
+
+/** The <code>ThreadPoolRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait ThreadPoolRunner[T] extends TaskRunner[T] {
+
+ type Future[+R] = RichFuture[R]
+
+ trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance]
+ with (() => S)
+
+ protected def executor: ExecutorService
+
+ def submit(task: () => T): this.Future[T] = {
+ val callable = new Callable[T] {
+ def call() = task()
+ }
+ toRichFuture(executor.submit[T](callable))
+ }
+
+ def execute(task: Runnable): Unit =
+ executor execute task
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ private def toRichFuture[S](future: java.util.concurrent.Future[S]) =
+ new RichFuture[S] {
+ def cancel(mayInterrupt: Boolean) = future cancel mayInterrupt
+ def get() = future.get()
+ def get(timeout: Long, unit: TimeUnit) = future.get(timeout, unit)
+ def isCancelled() = future.isCancelled()
+ def isDone() = future.isDone()
+ def apply() = future.get()
+ }
+
+}
diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala
new file mode 100644
index 0000000000..7fb653a326
--- /dev/null
+++ b/src/library/scala/concurrent/ThreadRunner.scala
@@ -0,0 +1,33 @@
+package scala.concurrent
+
+import java.lang.Thread
+
+/** The <code>ThreadRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+class ThreadRunner[T] extends TaskRunner[T] {
+
+ type Future[+S] = () => S
+
+ def submit(task: () => T): this.Future[T] = {
+ val result = new SyncVar[Either[T, Exception]]
+ val runnable = new Runnable {
+ def run() { result set tryCatch(task()) }
+ }
+ (new Thread(runnable)).start()
+ () => result.get match {
+ case Left(a) => a
+ case Right(t) => throw t
+ }
+ }
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ def shutdown() {
+ // do nothing
+ }
+
+}
diff --git a/src/library/scala/concurrent/jolib.scala b/src/library/scala/concurrent/jolib.scala
index 615996695b..d53f90f744 100644
--- a/src/library/scala/concurrent/jolib.scala
+++ b/src/library/scala/concurrent/jolib.scala
@@ -11,6 +11,7 @@
package scala.concurrent
+import ops._
/**
* Library for using join-calculus concurrent primitives in Scala.
@@ -44,7 +45,7 @@ package scala.concurrent
case None => () => ()
case Some((p, r)) => {
val args = values(p)
- () => concurrent.ops.spawn(r(args))
+ () => spawn(r(args))
}
}
diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala
index 939ed6f575..f3be1475a7 100644
--- a/src/library/scala/concurrent/ops.scala
+++ b/src/library/scala/concurrent/ops.scala
@@ -16,10 +16,13 @@ import java.lang.Thread
/** The object <code>ops</code> ...
*
- * @author Martin Odersky, Stepan Koltsov
- * @version 1.0, 12/03/2003
+ * @author Martin Odersky, Stepan Koltsov, Philipp Haller
*/
object ops {
+
+ implicit val defaultRunner: TaskRunner[Unit] =
+ TaskRunners.threadRunner
+
/**
* If expression computed successfully return it in <code>Left</code>,
* otherwise return exception in <code>Right</code>.
@@ -36,18 +39,17 @@ object ops {
*
* @param p the expression to evaluate
*/
- def spawn(p: => Unit) = {
- val t = new Thread() { override def run() = p }
- t.start()
+ def spawn(p: => Unit)(implicit runner: TaskRunner[Unit]): Unit = {
+ runner submit (() => p)
}
/**
* @param p ...
* @return ...
*/
- def future[A](p: => A): () => A = {
+ def future[A](p: => A)(implicit runner: TaskRunner[Unit]): () => A = {
val result = new SyncVar[Either[A, Throwable]]
- spawn { result set tryCatch(p) }
+ spawn({ result set tryCatch(p) })(runner)
() => result.get match {
case Left(a) => a
case Right(t) => throw t
diff --git a/src/library/scala/concurrent/pilib.scala b/src/library/scala/concurrent/pilib.scala
index a510f41055..246f7e2c54 100644
--- a/src/library/scala/concurrent/pilib.scala
+++ b/src/library/scala/concurrent/pilib.scala
@@ -11,7 +11,6 @@
package scala.concurrent
-
/** <p>
* Library for using Pi-calculus concurrent primitives in
* <a href="http://scala-lang.org/" target="_top">Scala</a>. As an
@@ -33,6 +32,8 @@ package scala.concurrent
*/
object pilib {
+ import TaskRunners.threadRunner
+
//////////////////////////////// SPAWN /////////////////////////////////
/**