diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-08-03 15:56:44 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-08-03 15:56:44 +0000 |
commit | ac89702827559cab8835edecb35cc09a1ca3fe10 (patch) | |
tree | 7d8b01c12aeb8f42192ac68cce3289b3a4078310 /src | |
parent | cf7a2f64f1357dcfa8ecf78ae8f29880c9fab214 (diff) | |
download | scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.gz scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.bz2 scala-ac89702827559cab8835edecb35cc09a1ca3fe10.zip |
Added the scala.concurrent.TaskRunner and scala...
Added the scala.concurrent.TaskRunner and
scala.concurrent.AsyncInvokable abstractions with corresponding
refactorings in scala.actors and scala.concurrent.
Diffstat (limited to 'src')
28 files changed, 285 insertions, 88 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index f9e79ce98f..aa65f5a6ac 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -643,7 +643,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { scheduler executeFromActor task } - private class ActorBlocker(timeout: Long) extends ManagedBlocker { + private class ActorBlocker(timeout: Long) extends scala.concurrent.ManagedBlocker { def block() = { if (timeout > 0) Actor.this.suspendActorFor(timeout) @@ -651,7 +651,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { Actor.this.suspendActor() true } - def isReleasable() = + def isReleasable = !Actor.this.isSuspended } diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index e789e96be8..b12ef2d925 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -121,9 +121,6 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) def execute(task: Runnable): Unit = executor execute task - def executeFromActor(task: Runnable) = - execute(task) - def execute(fun: => Unit): Unit = executor.execute(new Runnable { def run() { fun } @@ -142,4 +139,7 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) def isActive = !terminating && !suspending + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } } diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala index 8f3b243b71..f4034fbfab 100644 --- a/src/actors/scala/actors/IScheduler.scala +++ b/src/actors/scala/actors/IScheduler.scala @@ -16,8 +16,7 @@ package scala.actors * * Subclasses of <code>Actor</code> that override its * <code>scheduler</code> member must provide - * an implementation of the <code>IScheduler</code> - * trait. + * an <code>IScheduler</code> implementation. * * @author Philipp Haller */ @@ -35,7 +34,8 @@ trait IScheduler { */ def execute(task: Runnable): Unit - def executeFromActor(task: Runnable): Unit + def executeFromActor(task: Runnable): Unit = + execute(task) /** Shuts down the scheduler. */ @@ -66,7 +66,5 @@ trait IScheduler { */ def onTerminate(a: Reactor)(f: => Unit): Unit - def managedBlock(blocker: ManagedBlocker) { - blocker.block() - } + def managedBlock(blocker: scala.concurrent.ManagedBlocker): Unit } diff --git a/src/actors/scala/actors/ManagedBlocker.scala b/src/actors/scala/actors/ManagedBlocker.scala deleted file mode 100644 index f3fd08301b..0000000000 --- a/src/actors/scala/actors/ManagedBlocker.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors - -import forkjoin.ForkJoinPool - -/** - * The <code>ManagedBlocker</code> trait... - * - * @author Philipp Haller - */ -trait ManagedBlocker extends ForkJoinPool.ManagedBlocker diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala index 99c65d29f3..39d87241a6 100644 --- a/src/actors/scala/actors/Replyable.scala +++ b/src/actors/scala/actors/Replyable.scala @@ -16,8 +16,11 @@ package scala.actors * * @author Philipp Haller */ -trait Replyable[-T, +R] { - +trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { +/* + def apply(msg: T): this.Future[R] = + this !! msg +*/ /** * Sends <code>msg</code> to this Replyable and awaits reply * (synchronous). @@ -45,8 +48,8 @@ trait Replyable[-T, +R] { * @param msg the message to be sent * @return the future */ - def !!(msg: T): (() => R) = - () => this !? msg + def !!(msg: T): this.Future[R] +// () => this !? msg /** * Sends <code>msg</code> to this actor and immediately @@ -59,7 +62,7 @@ trait Replyable[-T, +R] { * @param f the function to be applied to the response * @return the future */ - def !![P](msg: T, f: PartialFunction[R, P]): (() => P) = - () => f(this !? msg) + def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P] +// () => f(this !? msg) } diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala index 1e1487bf39..8ba4f13842 100644 --- a/src/actors/scala/actors/ReplyableActor.scala +++ b/src/actors/scala/actors/ReplyableActor.scala @@ -84,6 +84,7 @@ trait ReplyableActor extends ReplyableReactor { override def !!(msg: Any): Future[Any] = { val ftch = new Channel[Any](Actor.self(thiz.scheduler)) val linkedChannel = new AbstractActor { + type Future[+R] = scala.actors.Future[R] def !(msg: Any) = ftch ! msg def send(msg: Any, replyTo: OutputChannel[Any]) = diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index 84168abe0a..6752fbb7b4 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -20,6 +20,8 @@ package scala.actors trait ReplyableReactor extends Replyable[Any, Any] { thiz: ReplyReactor => + type Future[+S] = scala.actors.Future[S] + /** * Sends <code>msg</code> to this actor and awaits reply * (synchronous). diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index cb764e963b..894dbc93e8 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -24,12 +24,13 @@ object Scheduler extends DelegatingScheduler { Debug.info("initializing "+this+"...") def makeNewScheduler: IScheduler = { - val workQueue = new LinkedBlockingQueue[Runnable](100000) + val workQueue = new LinkedBlockingQueue[Runnable] val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, - 50L, + 60000L, TimeUnit.MILLISECONDS, - workQueue) + workQueue, + new ThreadPoolExecutor.CallerRunsPolicy) val s = new ThreadPoolScheduler(threadPool, true) //val s = new ForkJoinScheduler Debug.info(this+": starting new "+s+" ["+s.getClass+"]") diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index 6355ee1ace..ba7822c372 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -28,9 +28,6 @@ trait SchedulerAdapter extends IScheduler { def execute(task: Runnable): Unit = execute { task.run() } - def executeFromActor(task: Runnable): Unit = - execute(task) - /** Shuts down the scheduler. */ def shutdown(): Unit = @@ -64,4 +61,8 @@ trait SchedulerAdapter extends IScheduler { */ def onTerminate(a: Reactor)(f: => Unit) = Scheduler.onTerminate(a)(f) + + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } } diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala index d99a8d3c1c..60bcd34221 100644 --- a/src/actors/scala/actors/remote/Proxy.scala +++ b/src/actors/scala/actors/remote/Proxy.scala @@ -20,6 +20,8 @@ import scala.collection.mutable.HashMap private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor { import java.io.{IOException, ObjectOutputStream, ObjectInputStream} + type Future[+R] = scala.actors.Future[R] + @transient private[remote] var del: Actor = null startDelegate() diff --git a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala index 88ccb6dce8..4a8f9d034d 100644 --- a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala @@ -42,12 +42,12 @@ class DefaultExecutorScheduler(daemon: Boolean) private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, - 50L, + 60000L, TimeUnit.MILLISECONDS, workQueue, threadFactory) val executor = threadPool - override val CHECK_FREQ = 50 + override val CHECK_FREQ = 10 } diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala index 4bd2d09bd0..e72acd09d8 100644 --- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala +++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala @@ -9,6 +9,8 @@ package scala.actors package scheduler +import scala.concurrent.ManagedBlocker + /** * @author Erik Engbrecht */ @@ -37,7 +39,7 @@ trait DelegatingScheduler extends IScheduler { def execute(task: Runnable) = impl.execute(task) - def executeFromActor(task: Runnable) = impl.executeFromActor(task) + override def executeFromActor(task: Runnable) = impl.executeFromActor(task) def shutdown(): Unit = synchronized { if (sched ne null) { diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala index 8e032242f6..07a014f92d 100644 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -11,7 +11,7 @@ package scala.actors package scheduler -import java.util.concurrent.{ExecutorService, RejectedExecutionException} +import scala.concurrent.ThreadPoolRunner /** * The <code>ExecutorScheduler</code> class uses an @@ -19,29 +19,9 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException} * * @author Philipp Haller */ -trait ExecutorScheduler extends IScheduler { +trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] { - protected def executor: ExecutorService - - /** Submits a <code>Runnable</code> for execution. - * - * @param task the task to be executed - */ - def execute(task: Runnable) { - try { - executor execute task - } catch { - case ree: RejectedExecutionException => - // run task on current thread - task.run() - } - } - - def executeFromActor(task: Runnable) = - execute(task) - - /** This method is called when the <code>SchedulerService</code> - * shuts down. + /** This method is called when the scheduler shuts down. */ def onShutdown(): Unit = executor.shutdown() @@ -51,4 +31,5 @@ trait ExecutorScheduler extends IScheduler { */ def isActive = (executor ne null) && !executor.isShutdown + } diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala index 5d6d676b1a..82e8f5c2fd 100644 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala @@ -94,7 +94,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor pool.execute(task) } - def executeFromActor(task: Runnable) { + override def executeFromActor(task: Runnable) { val recAction = new RecursiveAction { def compute() = task.run() } @@ -110,8 +110,11 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor def run() { fun } }) - override def managedBlock(blocker: ManagedBlocker) { - ForkJoinPool.managedBlock(blocker, true) + override def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { + def block = blocker.block() + def isReleasable() = blocker.isReleasable + }, true) } /** Shuts down the scheduler. diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala index c7e588f2ed..4ad865a15d 100644 --- a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala +++ b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala @@ -24,9 +24,6 @@ class SingleThreadedScheduler extends IScheduler { task.run() } - def executeFromActor(task: Runnable) = - execute(task) - def execute(fun: => Unit): Unit = execute(new Runnable { def run() { fun } @@ -39,4 +36,8 @@ class SingleThreadedScheduler extends IScheduler { def onTerminate(actor: Reactor)(f: => Unit) {} def isActive = true + + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } } diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala index 4826d44383..fd8a0e6a64 100644 --- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -12,6 +12,7 @@ package scala.actors package scheduler import java.util.concurrent.ThreadPoolExecutor +import scala.concurrent.ManagedBlocker /** * The <code>ThreadPoolScheduler</code> class uses an @@ -44,7 +45,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, override def managedBlock(blocker: ManagedBlocker) { val coreSize = executor.getCorePoolSize() - if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) { executor.setCorePoolSize(coreSize + 1) } blocker.block() @@ -67,7 +68,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, throw new QuitException val coreSize = executor.getCorePoolSize() - if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) { executor.setCorePoolSize(coreSize + 1) } } diff --git a/src/compiler/scala/tools/nsc/InterpreterLoop.scala b/src/compiler/scala/tools/nsc/InterpreterLoop.scala index 9d4e734a40..69db5c98b2 100644 --- a/src/compiler/scala/tools/nsc/InterpreterLoop.scala +++ b/src/compiler/scala/tools/nsc/InterpreterLoop.scala @@ -55,6 +55,8 @@ object InterpreterControl { } import InterpreterControl._ +import scala.concurrent.ops.defaultRunner + /** The * <a href="http://scala-lang.org/" target="_top">Scala</a> * interactive shell. It provides a read-eval-print loop around diff --git a/src/library/scala/concurrent/AsyncInvokable.scala b/src/library/scala/concurrent/AsyncInvokable.scala new file mode 100644 index 0000000000..ae84042689 --- /dev/null +++ b/src/library/scala/concurrent/AsyncInvokable.scala @@ -0,0 +1,13 @@ +package scala.concurrent + +/** The <code>AsyncInvokable</code> trait... + * + * @author Philipp Haller + */ +trait AsyncInvokable[-T, +R] { + + type Future[+S] <: () => S + + def !!(task: T): Future[R] + +} diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala index 0fa3c1660b..63477b4b3c 100644 --- a/src/library/scala/concurrent/DelayedLazyVal.scala +++ b/src/library/scala/concurrent/DelayedLazyVal.scala @@ -11,6 +11,7 @@ package scala.concurrent import annotation.experimental +import ops._ /** A <code>DelayedLazyVal</code> is a wrapper for lengthy * computations which have a valid partially computed result. @@ -37,8 +38,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) { */ def apply(): T = if (isDone) complete else f() - ops.future { + future { body isDone = true } -}
\ No newline at end of file +} diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala new file mode 100644 index 0000000000..9fde489ced --- /dev/null +++ b/src/library/scala/concurrent/JavaConversions.scala @@ -0,0 +1,44 @@ +package scala.concurrent + +import java.util.concurrent.{ExecutorService, Executor} + +/** The <code>JavaConversions</code> object... + * + * @author Philipp Haller + */ +object JavaConversions { + + implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] = + new ThreadPoolRunner[Unit] { + override protected def executor = + exec + + def shutdown() = + exec.shutdown() + } + + implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] = + new TaskRunner[Unit] { + type Future[+R] = () => R + + def submit(task: () => Unit): this.Future[Unit] = { + val result = new SyncVar[Either[Unit, Throwable]] + val runnable = new Runnable { + def run() { result set tryCatch(task()) } + } + exec.execute(runnable) + () => result.get match { + case Left(a) => a + case Right(t) => throw t + } + } + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + def shutdown() { + // do nothing + } + } +} diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala new file mode 100644 index 0000000000..c77f97285e --- /dev/null +++ b/src/library/scala/concurrent/ManagedBlocker.scala @@ -0,0 +1,24 @@ +package scala.concurrent + +/** The <code>ManagedBlocker</code> trait... + * + * @author Philipp Haller + */ +trait ManagedBlocker { + + /** + * Possibly blocks the current thread, for example waiting for + * a lock or condition. + * @return true if no additional blocking is necessary (i.e., + * if isReleasable would return true). + * @throws InterruptedException if interrupted while waiting + * (the method is not required to do so, but is allowed to). + */ + def block(): Boolean + + /** + * Returns true if blocking is unnecessary. + */ + def isReleasable: Boolean + +} diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala new file mode 100644 index 0000000000..d29e8ff12f --- /dev/null +++ b/src/library/scala/concurrent/TaskRunner.scala @@ -0,0 +1,29 @@ +package scala.concurrent + +/** The <code>TaskRunner</code> trait... + * + * @author Philipp Haller + */ +trait TaskRunner[T] extends AsyncInvokable[() => T, T] { + + def submit(task: () => T): Future[T] + + def shutdown(): Unit + + def !!(task: () => T): Future[T] = + submit(task) + + def managedBlock(blocker: ManagedBlocker): Unit + + /** If expression computed successfully return it in <code>Left</code>, + * otherwise return exception in <code>Right</code>. + */ + protected def tryCatch[A](left: => A): Either[A, Exception] = { + try { + Left(left) + } catch { + case e: Exception => Right(e) + } + } + +} diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala new file mode 100644 index 0000000000..8219d9d169 --- /dev/null +++ b/src/library/scala/concurrent/TaskRunners.scala @@ -0,0 +1,27 @@ +package scala.concurrent + +import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit} + +/** The <code>TaskRunners</code> object... + * + * @author Philipp Haller + */ +object TaskRunners { + + implicit val threadRunner: TaskRunner[Unit] = + new ThreadRunner[Unit] + + implicit val threadPoolRunner: TaskRunner[Unit] = { + val numCores = Runtime.getRuntime().availableProcessors() + val keepAliveTime = 60000L + val workQueue = new LinkedBlockingQueue[Runnable] + val exec = new ThreadPoolExecutor(numCores, + numCores, + keepAliveTime, + TimeUnit.MILLISECONDS, + workQueue, + new ThreadPoolExecutor.CallerRunsPolicy) + JavaConversions.asTaskRunner(exec) + } + +} diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala new file mode 100644 index 0000000000..cbc5ebb293 --- /dev/null +++ b/src/library/scala/concurrent/ThreadPoolRunner.scala @@ -0,0 +1,44 @@ +package scala.concurrent + +import java.util.concurrent.{ExecutorService, Callable, TimeUnit} + +import scala.annotation.unchecked.uncheckedVariance + +/** The <code>ThreadPoolRunner</code> trait... + * + * @author Philipp Haller + */ +trait ThreadPoolRunner[T] extends TaskRunner[T] { + + type Future[+R] = RichFuture[R] + + trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance] + with (() => S) + + protected def executor: ExecutorService + + def submit(task: () => T): this.Future[T] = { + val callable = new Callable[T] { + def call() = task() + } + toRichFuture(executor.submit[T](callable)) + } + + def execute(task: Runnable): Unit = + executor execute task + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + private def toRichFuture[S](future: java.util.concurrent.Future[S]) = + new RichFuture[S] { + def cancel(mayInterrupt: Boolean) = future cancel mayInterrupt + def get() = future.get() + def get(timeout: Long, unit: TimeUnit) = future.get(timeout, unit) + def isCancelled() = future.isCancelled() + def isDone() = future.isDone() + def apply() = future.get() + } + +} diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala new file mode 100644 index 0000000000..7fb653a326 --- /dev/null +++ b/src/library/scala/concurrent/ThreadRunner.scala @@ -0,0 +1,33 @@ +package scala.concurrent + +import java.lang.Thread + +/** The <code>ThreadRunner</code> trait... + * + * @author Philipp Haller + */ +class ThreadRunner[T] extends TaskRunner[T] { + + type Future[+S] = () => S + + def submit(task: () => T): this.Future[T] = { + val result = new SyncVar[Either[T, Exception]] + val runnable = new Runnable { + def run() { result set tryCatch(task()) } + } + (new Thread(runnable)).start() + () => result.get match { + case Left(a) => a + case Right(t) => throw t + } + } + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + def shutdown() { + // do nothing + } + +} diff --git a/src/library/scala/concurrent/jolib.scala b/src/library/scala/concurrent/jolib.scala index 615996695b..d53f90f744 100644 --- a/src/library/scala/concurrent/jolib.scala +++ b/src/library/scala/concurrent/jolib.scala @@ -11,6 +11,7 @@ package scala.concurrent +import ops._ /** * Library for using join-calculus concurrent primitives in Scala. @@ -44,7 +45,7 @@ package scala.concurrent case None => () => () case Some((p, r)) => { val args = values(p) - () => concurrent.ops.spawn(r(args)) + () => spawn(r(args)) } } diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala index 939ed6f575..f3be1475a7 100644 --- a/src/library/scala/concurrent/ops.scala +++ b/src/library/scala/concurrent/ops.scala @@ -16,10 +16,13 @@ import java.lang.Thread /** The object <code>ops</code> ... * - * @author Martin Odersky, Stepan Koltsov - * @version 1.0, 12/03/2003 + * @author Martin Odersky, Stepan Koltsov, Philipp Haller */ object ops { + + implicit val defaultRunner: TaskRunner[Unit] = + TaskRunners.threadRunner + /** * If expression computed successfully return it in <code>Left</code>, * otherwise return exception in <code>Right</code>. @@ -36,18 +39,17 @@ object ops { * * @param p the expression to evaluate */ - def spawn(p: => Unit) = { - val t = new Thread() { override def run() = p } - t.start() + def spawn(p: => Unit)(implicit runner: TaskRunner[Unit]): Unit = { + runner submit (() => p) } /** * @param p ... * @return ... */ - def future[A](p: => A): () => A = { + def future[A](p: => A)(implicit runner: TaskRunner[Unit]): () => A = { val result = new SyncVar[Either[A, Throwable]] - spawn { result set tryCatch(p) } + spawn({ result set tryCatch(p) })(runner) () => result.get match { case Left(a) => a case Right(t) => throw t diff --git a/src/library/scala/concurrent/pilib.scala b/src/library/scala/concurrent/pilib.scala index a510f41055..246f7e2c54 100644 --- a/src/library/scala/concurrent/pilib.scala +++ b/src/library/scala/concurrent/pilib.scala @@ -11,7 +11,6 @@ package scala.concurrent - /** <p> * Library for using Pi-calculus concurrent primitives in * <a href="http://scala-lang.org/" target="_top">Scala</a>. As an @@ -33,6 +32,8 @@ package scala.concurrent */ object pilib { + import TaskRunners.threadRunner + //////////////////////////////// SPAWN ///////////////////////////////// /** |