summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-22 14:26:54 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-22 14:26:54 +0000
commit6b26cdf4fc9d324079c65a694b6c55ef2edf3f26 (patch)
tree96c61624aa537233aa0f6189e5ba081e1d52451e
parent36a2c0d43b2b5e9ed8e7233649d7c22e5555086c (diff)
downloadscala-6b26cdf4fc9d324079c65a694b6c55ef2edf3f26.tar.gz
scala-6b26cdf4fc9d324079c65a694b6c55ef2edf3f26.tar.bz2
scala-6b26cdf4fc9d324079c65a694b6c55ef2edf3f26.zip
Added JDK 5 Executor-based schedulers.
-rw-r--r--src/actors/scala/actors/Actor.scala20
-rw-r--r--src/actors/scala/actors/Channel.scala2
-rw-r--r--src/actors/scala/actors/DefaultExecutorScheduler.scala74
-rw-r--r--src/actors/scala/actors/ExecutorScheduler.scala35
-rw-r--r--src/actors/scala/actors/Scheduler.scala8
-rw-r--r--src/actors/scala/actors/SchedulerService.scala18
6 files changed, 134 insertions, 23 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 3b07aae124..63d36eb7f8 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -211,7 +211,7 @@ object Actor {
true // events are immediately removed from the mailbox
def apply(m: Any) {
if (f.isDefinedAt(m)) f(m)
- self.react(this)
+ a.react(this)
}
}
@@ -431,7 +431,7 @@ trait Actor extends AbstractActor {
* @return result of processing the received value
*/
def receive[R](f: PartialFunction[Any, R]): R = {
- assert(Actor.self == this, "receive from channel belonging to other actor")
+ assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
this.synchronized {
if (shouldExit) exit() // links
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
@@ -458,7 +458,7 @@ trait Actor extends AbstractActor {
* @return result of processing the received value
*/
def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
- assert(Actor.self == this, "receive from channel belonging to other actor")
+ assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
this.synchronized {
if (shouldExit) exit() // links
@@ -510,7 +510,7 @@ trait Actor extends AbstractActor {
* @param f a partial function with message patterns and actions
*/
def react(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self == this, "react on channel belonging to other actor")
+ assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
this.synchronized {
if (shouldExit) exit() // links
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
@@ -537,7 +537,7 @@ trait Actor extends AbstractActor {
* @param f a partial function with message patterns and actions
*/
def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self == this, "react on channel belonging to other actor")
+ assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
this.synchronized {
if (shouldExit) exit() // links
// first, remove spurious TIMEOUT message from mailbox if any
@@ -907,7 +907,7 @@ trait Actor extends AbstractActor {
* @return ...
*/
def link(to: AbstractActor): AbstractActor = {
- assert(Actor.self == this, "link called on actor different from self")
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
this linkTo to
to linkTo this
to
@@ -917,7 +917,7 @@ trait Actor extends AbstractActor {
* Links <code>self</code> to actor defined by <code>body</code>.
*/
def link(body: => Unit): Actor = {
- assert(Actor.self == this, "link called on actor different from self")
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
val a = new Actor {
def act() = body
override final val scheduler: IScheduler = Actor.this.scheduler
@@ -935,7 +935,7 @@ trait Actor extends AbstractActor {
* Unlinks <code>self</code> from actor <code>from</code>.
*/
def unlink(from: AbstractActor) {
- assert(Actor.self == this, "unlink called on actor different from self")
+ assert(Actor.self(scheduler) == this, "unlink called on actor different from self")
this unlinkFrom from
from unlinkFrom this
}
@@ -965,7 +965,7 @@ trait Actor extends AbstractActor {
* <code>reason != 'normal</code>.
* </p>
*/
- def exit(reason: AnyRef): Nothing = {
+ protected[actors] def exit(reason: AnyRef): Nothing = {
exitReason = reason
exit()
}
@@ -973,7 +973,7 @@ trait Actor extends AbstractActor {
/**
* Terminates with exit reason <code>'normal</code>.
*/
- def exit(): Nothing = {
+ protected[actors] def exit(): Nothing = {
// links
if (!links.isEmpty)
exitLinked()
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index dddf3fa903..da41318138 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -40,7 +40,7 @@ case class ! [a](ch: Channel[a], msg: a)
*/
class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
- private[actors] var recv: Actor = {
+ private var recv: Actor = {
// basically Actor.self, but can be null
Actor.tl.get.asInstanceOf[Actor]
}
diff --git a/src/actors/scala/actors/DefaultExecutorScheduler.scala b/src/actors/scala/actors/DefaultExecutorScheduler.scala
new file mode 100644
index 0000000000..22da121bbc
--- /dev/null
+++ b/src/actors/scala/actors/DefaultExecutorScheduler.scala
@@ -0,0 +1,74 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue}
+
+/**
+ * The <code>DefaultExecutorScheduler</code> class uses a default
+ * <code>ThreadPoolExecutor</code> for executing <code>Actor</code>s.
+ *
+ * It can be configured using the two JVM properties
+ * <code>actors.corePoolSize</code> and
+ * <code>actors.maxPoolSize</code> that control the initial and
+ * maximum size of the thread pool, respectively.
+ *
+ * @author Philipp Haller
+ */
+class DefaultExecutorScheduler extends {
+
+ private val rt = Runtime.getRuntime()
+ private val minNumThreads = 4
+
+ /** The value of the actors.corePoolSize JVM property. This property
+ * determines the initial thread pool size.
+ */
+ private val coreProp = try {
+ System.getProperty("actors.corePoolSize")
+ } catch {
+ case ace: java.security.AccessControlException =>
+ null
+ }
+
+ private val maxProp =
+ try {
+ System.getProperty("actors.maxPoolSize")
+ } catch {
+ case ace: java.security.AccessControlException =>
+ null
+ }
+
+ private val initCoreSize =
+ if (null ne coreProp) Integer.parseInt(coreProp)
+ else {
+ val numCores = rt.availableProcessors()
+ if (2 * numCores > minNumThreads)
+ 2 * numCores
+ else
+ minNumThreads
+ }
+
+ private val maxSize =
+ if (null ne maxProp) Integer.parseInt(maxProp)
+ else 256
+
+ private val coreSize = initCoreSize
+
+ private val workQueue = new LinkedBlockingQueue[Runnable]
+
+ private val threadPool = new ThreadPoolExecutor(coreSize,
+ maxSize,
+ 50L,
+ TimeUnit.MILLISECONDS,
+ workQueue)
+ } with ExecutorScheduler(threadPool) {
+ override val CHECK_FREQ = 50
+ }
diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala
new file mode 100644
index 0000000000..98bd953118
--- /dev/null
+++ b/src/actors/scala/actors/ExecutorScheduler.scala
@@ -0,0 +1,35 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+import java.util.concurrent.ExecutorService
+
+/**
+ * The <code>ExecutorScheduler</code> class uses an
+ * <code>ExecutorService</code> to execute <code>Actor</code>s.
+ *
+ * @author Philipp Haller
+ */
+class ExecutorScheduler(executor: ExecutorService) extends SchedulerService {
+
+ /** Submits a <code>Runnable</code> for execution.
+ *
+ * @param task the task to be executed
+ */
+ def execute(task: Runnable): Unit =
+ executor execute task
+
+ /** This method is called when the <code>SchedulerService</code>
+ * shuts down.
+ */
+ def onShutdown(): Unit =
+ executor.shutdown()
+}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index c4ebe85861..d16107fdb2 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -14,8 +14,8 @@ import compat.Platform
import java.lang.Runnable
/**
- * The <code>Scheduler</code> object is used by
- * <code>Actor</code> to execute tasks of an execution of an actor.
+ * The <code>Scheduler</code> object is used by <code>Actor</code> to
+ * execute tasks of an execution of an actor.
*
* @version 0.9.18
* @author Philipp Haller
@@ -25,7 +25,7 @@ object Scheduler extends IScheduler {
Debug.info("initializing "+this+"...")
private var sched: IScheduler = {
- val s = new FJTaskScheduler2
+ val s = new DefaultExecutorScheduler
s.start()
s
}
@@ -108,7 +108,7 @@ object Scheduler extends IScheduler {
* for all schedulers used to execute actor tasks.
*
* Subclasses of <code>Actor</code> that override its
- * <code>scheduler</code> member value must provide
+ * <code>scheduler</code> member must provide
* an implementation of the <code>IScheduler</code>
* trait.
*
diff --git a/src/actors/scala/actors/SchedulerService.scala b/src/actors/scala/actors/SchedulerService.scala
index f885e30b02..7b0640896a 100644
--- a/src/actors/scala/actors/SchedulerService.scala
+++ b/src/actors/scala/actors/SchedulerService.scala
@@ -13,10 +13,10 @@ package scala.actors
import java.lang.{Runnable, Thread, InterruptedException}
/**
- * The abstract <code>SchedulerService</code> class allows
- * subclasses to implement a custom <code>onShutdown</code>
- * method, which is invoked when the runtime system has detected
- * that all actors have been terminated.
+ * The abstract <code>SchedulerService</code> class allows subclasses
+ * to implement a custom <code>onShutdown</code> method, which is
+ * invoked when the runtime system has detected that all actors have
+ * been terminated.
*
* @version 0.9.18
* @author Philipp Haller
@@ -78,12 +78,14 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler
}
}
- /** Submits a <code>Runnable</code> for execution.
+ /** Submits a closure for execution.
*
- * @param task the task to be executed
+ * @param fun the closure to be executed
*/
- def execute(task: Runnable): Unit =
- execute { task.run() }
+ def execute(fun: => Unit): Unit =
+ execute(new Runnable {
+ def run() { fun }
+ })
/** Shuts down the scheduler.
*/