diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-05-22 14:26:54 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-05-22 14:26:54 +0000 |
commit | 6b26cdf4fc9d324079c65a694b6c55ef2edf3f26 (patch) | |
tree | 96c61624aa537233aa0f6189e5ba081e1d52451e /src/actors | |
parent | 36a2c0d43b2b5e9ed8e7233649d7c22e5555086c (diff) | |
download | scala-6b26cdf4fc9d324079c65a694b6c55ef2edf3f26.tar.gz scala-6b26cdf4fc9d324079c65a694b6c55ef2edf3f26.tar.bz2 scala-6b26cdf4fc9d324079c65a694b6c55ef2edf3f26.zip |
Added JDK 5 Executor-based schedulers.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 20 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 2 | ||||
-rw-r--r-- | src/actors/scala/actors/DefaultExecutorScheduler.scala | 74 | ||||
-rw-r--r-- | src/actors/scala/actors/ExecutorScheduler.scala | 35 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/SchedulerService.scala | 18 |
6 files changed, 134 insertions, 23 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 3b07aae124..63d36eb7f8 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -211,7 +211,7 @@ object Actor { true // events are immediately removed from the mailbox def apply(m: Any) { if (f.isDefinedAt(m)) f(m) - self.react(this) + a.react(this) } } @@ -431,7 +431,7 @@ trait Actor extends AbstractActor { * @return result of processing the received value */ def receive[R](f: PartialFunction[Any, R]): R = { - assert(Actor.self == this, "receive from channel belonging to other actor") + assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) @@ -458,7 +458,7 @@ trait Actor extends AbstractActor { * @return result of processing the received value */ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { - assert(Actor.self == this, "receive from channel belonging to other actor") + assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links @@ -510,7 +510,7 @@ trait Actor extends AbstractActor { * @param f a partial function with message patterns and actions */ def react(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.self == this, "react on channel belonging to other actor") + assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) @@ -537,7 +537,7 @@ trait Actor extends AbstractActor { * @param f a partial function with message patterns and actions */ def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.self == this, "react on channel belonging to other actor") + assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") this.synchronized { if (shouldExit) exit() // links // first, remove spurious TIMEOUT message from mailbox if any @@ -907,7 +907,7 @@ trait Actor extends AbstractActor { * @return ... */ def link(to: AbstractActor): AbstractActor = { - assert(Actor.self == this, "link called on actor different from self") + assert(Actor.self(scheduler) == this, "link called on actor different from self") this linkTo to to linkTo this to @@ -917,7 +917,7 @@ trait Actor extends AbstractActor { * Links <code>self</code> to actor defined by <code>body</code>. */ def link(body: => Unit): Actor = { - assert(Actor.self == this, "link called on actor different from self") + assert(Actor.self(scheduler) == this, "link called on actor different from self") val a = new Actor { def act() = body override final val scheduler: IScheduler = Actor.this.scheduler @@ -935,7 +935,7 @@ trait Actor extends AbstractActor { * Unlinks <code>self</code> from actor <code>from</code>. */ def unlink(from: AbstractActor) { - assert(Actor.self == this, "unlink called on actor different from self") + assert(Actor.self(scheduler) == this, "unlink called on actor different from self") this unlinkFrom from from unlinkFrom this } @@ -965,7 +965,7 @@ trait Actor extends AbstractActor { * <code>reason != 'normal</code>. * </p> */ - def exit(reason: AnyRef): Nothing = { + protected[actors] def exit(reason: AnyRef): Nothing = { exitReason = reason exit() } @@ -973,7 +973,7 @@ trait Actor extends AbstractActor { /** * Terminates with exit reason <code>'normal</code>. */ - def exit(): Nothing = { + protected[actors] def exit(): Nothing = { // links if (!links.isEmpty) exitLinked() diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index dddf3fa903..da41318138 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -40,7 +40,7 @@ case class ! [a](ch: Channel[a], msg: a) */ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { - private[actors] var recv: Actor = { + private var recv: Actor = { // basically Actor.self, but can be null Actor.tl.get.asInstanceOf[Actor] } diff --git a/src/actors/scala/actors/DefaultExecutorScheduler.scala b/src/actors/scala/actors/DefaultExecutorScheduler.scala new file mode 100644 index 0000000000..22da121bbc --- /dev/null +++ b/src/actors/scala/actors/DefaultExecutorScheduler.scala @@ -0,0 +1,74 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue} + +/** + * The <code>DefaultExecutorScheduler</code> class uses a default + * <code>ThreadPoolExecutor</code> for executing <code>Actor</code>s. + * + * It can be configured using the two JVM properties + * <code>actors.corePoolSize</code> and + * <code>actors.maxPoolSize</code> that control the initial and + * maximum size of the thread pool, respectively. + * + * @author Philipp Haller + */ +class DefaultExecutorScheduler extends { + + private val rt = Runtime.getRuntime() + private val minNumThreads = 4 + + /** The value of the actors.corePoolSize JVM property. This property + * determines the initial thread pool size. + */ + private val coreProp = try { + System.getProperty("actors.corePoolSize") + } catch { + case ace: java.security.AccessControlException => + null + } + + private val maxProp = + try { + System.getProperty("actors.maxPoolSize") + } catch { + case ace: java.security.AccessControlException => + null + } + + private val initCoreSize = + if (null ne coreProp) Integer.parseInt(coreProp) + else { + val numCores = rt.availableProcessors() + if (2 * numCores > minNumThreads) + 2 * numCores + else + minNumThreads + } + + private val maxSize = + if (null ne maxProp) Integer.parseInt(maxProp) + else 256 + + private val coreSize = initCoreSize + + private val workQueue = new LinkedBlockingQueue[Runnable] + + private val threadPool = new ThreadPoolExecutor(coreSize, + maxSize, + 50L, + TimeUnit.MILLISECONDS, + workQueue) + } with ExecutorScheduler(threadPool) { + override val CHECK_FREQ = 50 + } diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala new file mode 100644 index 0000000000..98bd953118 --- /dev/null +++ b/src/actors/scala/actors/ExecutorScheduler.scala @@ -0,0 +1,35 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +import java.util.concurrent.ExecutorService + +/** + * The <code>ExecutorScheduler</code> class uses an + * <code>ExecutorService</code> to execute <code>Actor</code>s. + * + * @author Philipp Haller + */ +class ExecutorScheduler(executor: ExecutorService) extends SchedulerService { + + /** Submits a <code>Runnable</code> for execution. + * + * @param task the task to be executed + */ + def execute(task: Runnable): Unit = + executor execute task + + /** This method is called when the <code>SchedulerService</code> + * shuts down. + */ + def onShutdown(): Unit = + executor.shutdown() +} diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index c4ebe85861..d16107fdb2 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -14,8 +14,8 @@ import compat.Platform import java.lang.Runnable /** - * The <code>Scheduler</code> object is used by - * <code>Actor</code> to execute tasks of an execution of an actor. + * The <code>Scheduler</code> object is used by <code>Actor</code> to + * execute tasks of an execution of an actor. * * @version 0.9.18 * @author Philipp Haller @@ -25,7 +25,7 @@ object Scheduler extends IScheduler { Debug.info("initializing "+this+"...") private var sched: IScheduler = { - val s = new FJTaskScheduler2 + val s = new DefaultExecutorScheduler s.start() s } @@ -108,7 +108,7 @@ object Scheduler extends IScheduler { * for all schedulers used to execute actor tasks. * * Subclasses of <code>Actor</code> that override its - * <code>scheduler</code> member value must provide + * <code>scheduler</code> member must provide * an implementation of the <code>IScheduler</code> * trait. * diff --git a/src/actors/scala/actors/SchedulerService.scala b/src/actors/scala/actors/SchedulerService.scala index f885e30b02..7b0640896a 100644 --- a/src/actors/scala/actors/SchedulerService.scala +++ b/src/actors/scala/actors/SchedulerService.scala @@ -13,10 +13,10 @@ package scala.actors import java.lang.{Runnable, Thread, InterruptedException} /** - * The abstract <code>SchedulerService</code> class allows - * subclasses to implement a custom <code>onShutdown</code> - * method, which is invoked when the runtime system has detected - * that all actors have been terminated. + * The abstract <code>SchedulerService</code> class allows subclasses + * to implement a custom <code>onShutdown</code> method, which is + * invoked when the runtime system has detected that all actors have + * been terminated. * * @version 0.9.18 * @author Philipp Haller @@ -78,12 +78,14 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler } } - /** Submits a <code>Runnable</code> for execution. + /** Submits a closure for execution. * - * @param task the task to be executed + * @param fun the closure to be executed */ - def execute(task: Runnable): Unit = - execute { task.run() } + def execute(fun: => Unit): Unit = + execute(new Runnable { + def run() { fun } + }) /** Shuts down the scheduler. */ |