diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-08-03 15:56:44 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-08-03 15:56:44 +0000 |
commit | ac89702827559cab8835edecb35cc09a1ca3fe10 (patch) | |
tree | 7d8b01c12aeb8f42192ac68cce3289b3a4078310 /src/library | |
parent | cf7a2f64f1357dcfa8ecf78ae8f29880c9fab214 (diff) | |
download | scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.gz scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.bz2 scala-ac89702827559cab8835edecb35cc09a1ca3fe10.zip |
Added the scala.concurrent.TaskRunner and scala...
Added the scala.concurrent.TaskRunner and
scala.concurrent.AsyncInvokable abstractions with corresponding
refactorings in scala.actors and scala.concurrent.
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/concurrent/AsyncInvokable.scala | 13 | ||||
-rw-r--r-- | src/library/scala/concurrent/DelayedLazyVal.scala | 5 | ||||
-rw-r--r-- | src/library/scala/concurrent/JavaConversions.scala | 44 | ||||
-rw-r--r-- | src/library/scala/concurrent/ManagedBlocker.scala | 24 | ||||
-rw-r--r-- | src/library/scala/concurrent/TaskRunner.scala | 29 | ||||
-rw-r--r-- | src/library/scala/concurrent/TaskRunners.scala | 27 | ||||
-rw-r--r-- | src/library/scala/concurrent/ThreadPoolRunner.scala | 44 | ||||
-rw-r--r-- | src/library/scala/concurrent/ThreadRunner.scala | 33 | ||||
-rw-r--r-- | src/library/scala/concurrent/jolib.scala | 3 | ||||
-rw-r--r-- | src/library/scala/concurrent/ops.scala | 16 | ||||
-rw-r--r-- | src/library/scala/concurrent/pilib.scala | 3 |
11 files changed, 230 insertions, 11 deletions
diff --git a/src/library/scala/concurrent/AsyncInvokable.scala b/src/library/scala/concurrent/AsyncInvokable.scala new file mode 100644 index 0000000000..ae84042689 --- /dev/null +++ b/src/library/scala/concurrent/AsyncInvokable.scala @@ -0,0 +1,13 @@ +package scala.concurrent + +/** The <code>AsyncInvokable</code> trait... + * + * @author Philipp Haller + */ +trait AsyncInvokable[-T, +R] { + + type Future[+S] <: () => S + + def !!(task: T): Future[R] + +} diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala index 0fa3c1660b..63477b4b3c 100644 --- a/src/library/scala/concurrent/DelayedLazyVal.scala +++ b/src/library/scala/concurrent/DelayedLazyVal.scala @@ -11,6 +11,7 @@ package scala.concurrent import annotation.experimental +import ops._ /** A <code>DelayedLazyVal</code> is a wrapper for lengthy * computations which have a valid partially computed result. @@ -37,8 +38,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) { */ def apply(): T = if (isDone) complete else f() - ops.future { + future { body isDone = true } -}
\ No newline at end of file +} diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala new file mode 100644 index 0000000000..9fde489ced --- /dev/null +++ b/src/library/scala/concurrent/JavaConversions.scala @@ -0,0 +1,44 @@ +package scala.concurrent + +import java.util.concurrent.{ExecutorService, Executor} + +/** The <code>JavaConversions</code> object... + * + * @author Philipp Haller + */ +object JavaConversions { + + implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] = + new ThreadPoolRunner[Unit] { + override protected def executor = + exec + + def shutdown() = + exec.shutdown() + } + + implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] = + new TaskRunner[Unit] { + type Future[+R] = () => R + + def submit(task: () => Unit): this.Future[Unit] = { + val result = new SyncVar[Either[Unit, Throwable]] + val runnable = new Runnable { + def run() { result set tryCatch(task()) } + } + exec.execute(runnable) + () => result.get match { + case Left(a) => a + case Right(t) => throw t + } + } + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + def shutdown() { + // do nothing + } + } +} diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala new file mode 100644 index 0000000000..c77f97285e --- /dev/null +++ b/src/library/scala/concurrent/ManagedBlocker.scala @@ -0,0 +1,24 @@ +package scala.concurrent + +/** The <code>ManagedBlocker</code> trait... + * + * @author Philipp Haller + */ +trait ManagedBlocker { + + /** + * Possibly blocks the current thread, for example waiting for + * a lock or condition. + * @return true if no additional blocking is necessary (i.e., + * if isReleasable would return true). + * @throws InterruptedException if interrupted while waiting + * (the method is not required to do so, but is allowed to). + */ + def block(): Boolean + + /** + * Returns true if blocking is unnecessary. + */ + def isReleasable: Boolean + +} diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala new file mode 100644 index 0000000000..d29e8ff12f --- /dev/null +++ b/src/library/scala/concurrent/TaskRunner.scala @@ -0,0 +1,29 @@ +package scala.concurrent + +/** The <code>TaskRunner</code> trait... + * + * @author Philipp Haller + */ +trait TaskRunner[T] extends AsyncInvokable[() => T, T] { + + def submit(task: () => T): Future[T] + + def shutdown(): Unit + + def !!(task: () => T): Future[T] = + submit(task) + + def managedBlock(blocker: ManagedBlocker): Unit + + /** If expression computed successfully return it in <code>Left</code>, + * otherwise return exception in <code>Right</code>. + */ + protected def tryCatch[A](left: => A): Either[A, Exception] = { + try { + Left(left) + } catch { + case e: Exception => Right(e) + } + } + +} diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala new file mode 100644 index 0000000000..8219d9d169 --- /dev/null +++ b/src/library/scala/concurrent/TaskRunners.scala @@ -0,0 +1,27 @@ +package scala.concurrent + +import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit} + +/** The <code>TaskRunners</code> object... + * + * @author Philipp Haller + */ +object TaskRunners { + + implicit val threadRunner: TaskRunner[Unit] = + new ThreadRunner[Unit] + + implicit val threadPoolRunner: TaskRunner[Unit] = { + val numCores = Runtime.getRuntime().availableProcessors() + val keepAliveTime = 60000L + val workQueue = new LinkedBlockingQueue[Runnable] + val exec = new ThreadPoolExecutor(numCores, + numCores, + keepAliveTime, + TimeUnit.MILLISECONDS, + workQueue, + new ThreadPoolExecutor.CallerRunsPolicy) + JavaConversions.asTaskRunner(exec) + } + +} diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala new file mode 100644 index 0000000000..cbc5ebb293 --- /dev/null +++ b/src/library/scala/concurrent/ThreadPoolRunner.scala @@ -0,0 +1,44 @@ +package scala.concurrent + +import java.util.concurrent.{ExecutorService, Callable, TimeUnit} + +import scala.annotation.unchecked.uncheckedVariance + +/** The <code>ThreadPoolRunner</code> trait... + * + * @author Philipp Haller + */ +trait ThreadPoolRunner[T] extends TaskRunner[T] { + + type Future[+R] = RichFuture[R] + + trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance] + with (() => S) + + protected def executor: ExecutorService + + def submit(task: () => T): this.Future[T] = { + val callable = new Callable[T] { + def call() = task() + } + toRichFuture(executor.submit[T](callable)) + } + + def execute(task: Runnable): Unit = + executor execute task + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + private def toRichFuture[S](future: java.util.concurrent.Future[S]) = + new RichFuture[S] { + def cancel(mayInterrupt: Boolean) = future cancel mayInterrupt + def get() = future.get() + def get(timeout: Long, unit: TimeUnit) = future.get(timeout, unit) + def isCancelled() = future.isCancelled() + def isDone() = future.isDone() + def apply() = future.get() + } + +} diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala new file mode 100644 index 0000000000..7fb653a326 --- /dev/null +++ b/src/library/scala/concurrent/ThreadRunner.scala @@ -0,0 +1,33 @@ +package scala.concurrent + +import java.lang.Thread + +/** The <code>ThreadRunner</code> trait... + * + * @author Philipp Haller + */ +class ThreadRunner[T] extends TaskRunner[T] { + + type Future[+S] = () => S + + def submit(task: () => T): this.Future[T] = { + val result = new SyncVar[Either[T, Exception]] + val runnable = new Runnable { + def run() { result set tryCatch(task()) } + } + (new Thread(runnable)).start() + () => result.get match { + case Left(a) => a + case Right(t) => throw t + } + } + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + def shutdown() { + // do nothing + } + +} diff --git a/src/library/scala/concurrent/jolib.scala b/src/library/scala/concurrent/jolib.scala index 615996695b..d53f90f744 100644 --- a/src/library/scala/concurrent/jolib.scala +++ b/src/library/scala/concurrent/jolib.scala @@ -11,6 +11,7 @@ package scala.concurrent +import ops._ /** * Library for using join-calculus concurrent primitives in Scala. @@ -44,7 +45,7 @@ package scala.concurrent case None => () => () case Some((p, r)) => { val args = values(p) - () => concurrent.ops.spawn(r(args)) + () => spawn(r(args)) } } diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala index 939ed6f575..f3be1475a7 100644 --- a/src/library/scala/concurrent/ops.scala +++ b/src/library/scala/concurrent/ops.scala @@ -16,10 +16,13 @@ import java.lang.Thread /** The object <code>ops</code> ... * - * @author Martin Odersky, Stepan Koltsov - * @version 1.0, 12/03/2003 + * @author Martin Odersky, Stepan Koltsov, Philipp Haller */ object ops { + + implicit val defaultRunner: TaskRunner[Unit] = + TaskRunners.threadRunner + /** * If expression computed successfully return it in <code>Left</code>, * otherwise return exception in <code>Right</code>. @@ -36,18 +39,17 @@ object ops { * * @param p the expression to evaluate */ - def spawn(p: => Unit) = { - val t = new Thread() { override def run() = p } - t.start() + def spawn(p: => Unit)(implicit runner: TaskRunner[Unit]): Unit = { + runner submit (() => p) } /** * @param p ... * @return ... */ - def future[A](p: => A): () => A = { + def future[A](p: => A)(implicit runner: TaskRunner[Unit]): () => A = { val result = new SyncVar[Either[A, Throwable]] - spawn { result set tryCatch(p) } + spawn({ result set tryCatch(p) })(runner) () => result.get match { case Left(a) => a case Right(t) => throw t diff --git a/src/library/scala/concurrent/pilib.scala b/src/library/scala/concurrent/pilib.scala index a510f41055..246f7e2c54 100644 --- a/src/library/scala/concurrent/pilib.scala +++ b/src/library/scala/concurrent/pilib.scala @@ -11,7 +11,6 @@ package scala.concurrent - /** <p> * Library for using Pi-calculus concurrent primitives in * <a href="http://scala-lang.org/" target="_top">Scala</a>. As an @@ -33,6 +32,8 @@ package scala.concurrent */ object pilib { + import TaskRunners.threadRunner + //////////////////////////////// SPAWN ///////////////////////////////// /** |