summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-08-04 14:53:04 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-08-04 14:53:04 +0000
commit8b7c4138c6d90cd4bcac90867141e568041c9de0 (patch)
treee9a1831ffe41aee4520493d9aced6777c8d1d01a
parent699e811f1a1a9970ebed6ee5ec366aa8aefea6cf (diff)
downloadscala-8b7c4138c6d90cd4bcac90867141e568041c9de0.tar.gz
scala-8b7c4138c6d90cd4bcac90867141e568041c9de0.tar.bz2
scala-8b7c4138c6d90cd4bcac90867141e568041c9de0.zip
Actor trait abstracts from scheduling strategy.
-rw-r--r--src/actors/scala/actors/Actor.scala19
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala28
-rw-r--r--src/actors/scala/actors/Scheduler.scala137
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala49
-rw-r--r--src/actors/scala/actors/TickedScheduler.scala36
5 files changed, 164 insertions, 105 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 2edb360ae9..3bd27be895 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -19,7 +19,7 @@ import scala.compat.Platform
* <code>receive</code>, <code>react</code>, <code>reply</code>,
* etc.
*
- * @version 0.9.14
+ * @version 0.9.18
* @author Philipp Haller
*/
object Actor {
@@ -350,7 +350,7 @@ object Actor {
* </li>
* </ul>
*
- * @version 0.9.16
+ * @version 0.9.18
* @author Philipp Haller
*/
@serializable
@@ -365,6 +365,9 @@ trait Actor extends AbstractActor {
protected val mailbox = new MessageQueue
private var sessions: List[OutputChannel[Any]] = Nil
+ protected val scheduler: IScheduler =
+ Scheduler
+
/**
* Returns the number of messages in this actor's mailbox
*
@@ -401,7 +404,7 @@ trait Actor extends AbstractActor {
if (isSuspended)
resumeActor()
else // assert continuation != null
- Scheduler.execute(new Reaction(this, continuation, msg))
+ scheduler.execute(new Reaction(this, continuation, msg))
} else {
mailbox.append(msg, replyTo)
}
@@ -708,11 +711,11 @@ trait Actor extends AbstractActor {
val task = new Reaction(this,
if (f eq null) continuation else f,
msg)
- Scheduler execute task
+ scheduler execute task
}
private def tick(): Unit =
- Scheduler tick this
+ scheduler tick this
private[actors] var kill: () => Unit = () => {}
@@ -772,7 +775,11 @@ trait Actor extends AbstractActor {
exiting = false
shouldExit = false
- Scheduler start new Reaction(this)
+ scheduler execute {
+ ActorGC.newActor(Actor.this)
+ (new Reaction(Actor.this)).run()
+ }
+
this
}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 34095da299..e7d1d7656d 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -20,7 +20,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
/**
* FJTaskScheduler2
*
- * @version 0.9.12
+ * @version 0.9.18
* @author Philipp Haller
*/
class FJTaskScheduler2 extends Thread with IScheduler {
@@ -139,28 +139,18 @@ class FJTaskScheduler2 extends Thread with IScheduler {
}
/**
- * @param item the task to be executed.
+ * @param task the task to be executed
*/
- def execute(task: Runnable) {
- executor.execute(task)
- }
-
- def start(task: Runnable) {
- if (task.isInstanceOf[Reaction]) {
- val reaction = task.asInstanceOf[Reaction]
- ActorGC.newActor(reaction.a)
- }
- executor.execute(task)
- }
+ def execute(task: Runnable): Unit =
+ executor execute task
- /**
- * @param worker the worker thread executing tasks
- * @return the executed task
- */
- def getTask(worker: WorkerThread) = null
+ def execute(fun: => Unit): Unit =
+ executor.execute(new Runnable {
+ def run() { fun }
+ })
/**
- * @param a the actor
+ * @param a the actor
*/
def tick(a: Actor) {
lastActivity = Platform.currentTime
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 41a69d12e0..12429e83e1 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -8,7 +8,6 @@
// $Id$
-
package scala.actors
import compat.Platform
@@ -22,12 +21,13 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* The <code>Scheduler</code> object is used by
* <code>Actor</code> to execute tasks of an execution of an actor.
*
- * @version 0.9.10
+ * @version 0.9.18
* @author Philipp Haller
*/
-object Scheduler {
+object Scheduler extends IScheduler {
+
private var sched: IScheduler = {
- var s: IScheduler = new FJTaskScheduler2
+ val s = new FJTaskScheduler2
s.start()
s
}
@@ -35,24 +35,29 @@ object Scheduler {
def impl = sched
def impl_= (scheduler: IScheduler) = {
sched = scheduler
- sched.start()
}
private var tasks: LinkedQueue = null
private var pendingCount = 0
- def snapshot(): Unit = {
- tasks = sched.snapshot()
- pendingCount = ActorGC.getPendingCount
- sched.shutdown()
- }
+ /* Assumes <code>sched</code> holds an instance
+ * of <code>FJTaskScheduler2</code>.
+ */
+ def snapshot(): Unit =
+ if (sched.isInstanceOf[FJTaskScheduler2]) {
+ val fjts = sched.asInstanceOf[FJTaskScheduler2]
+ tasks = fjts.snapshot()
+ pendingCount = ActorGC.getPendingCount
+ fjts.shutdown()
+ } else
+ error("snapshot operation not supported.")
/* Creates an instance of class <code>FJTaskScheduler2</code>
* and submits <code>tasks</code> for execution.
*/
def restart(): Unit = synchronized {
sched = {
- var s: IScheduler = new FJTaskScheduler2
+ val s = new FJTaskScheduler2
ActorGC.setPendingCount(pendingCount)
s.start()
s
@@ -64,30 +69,32 @@ object Scheduler {
tasks = null
}
- /* The following two methods (<code>start</code> and
- * <code>execute</code>) are called from within
- * <code>Actor</code> to submit tasks for execution.
- */
- def start(task: Runnable) = sched.start(task)
+ def execute(task: Runnable) {
+ val t = currentThread
+ if (t.isInstanceOf[FJTaskRunner]) {
+ val tr = t.asInstanceOf[FJTaskRunner]
+ tr.push(new FJTask {
+ def run() { task.run() }
+ })
+ } else
+ sched execute task
+ }
- def execute(task: Runnable) = {
+ def execute(fun: => Unit) {
val t = currentThread
if (t.isInstanceOf[FJTaskRunner]) {
val tr = t.asInstanceOf[FJTaskRunner]
tr.push(new FJTask {
- def run() {
- task.run()
- }
+ def run() { fun }
})
- } else sched.execute(task)
+ } else
+ sched execute { fun }
}
/* This method is used to notify the scheduler
* of library activity by the argument Actor.
- *
- * It is only called from within <code>Actor</code>.
*/
- def tick(a: Actor) = sched.tick(a)
+ def tick(a: Actor) = sched tick a
def shutdown() = sched.shutdown()
@@ -98,22 +105,40 @@ object Scheduler {
/**
- * This abstract class provides a common interface for all
- * schedulers used to execute actor tasks.
+ * The <code>IScheduler</code> trait provides a common interface
+ * for all schedulers used to execute actor tasks.
*
- * @version 0.9.8
+ * Subclasses of <code>Actor</code> that override its
+ * <code>scheduler</code> member value must provide
+ * an implementation of the <code>IScheduler</code>
+ * trait.
+ *
+ * @version 0.9.18
* @author Philipp Haller
*/
trait IScheduler {
- def start(): Unit
- def start(task: Runnable): Unit
+ /** Submits a closure for execution.
+ *
+ * @param fun the closure to be executed
+ */
+ def execute(fun: => Unit): Unit
+
+ /** Submits a <code>Runnable</code> for execution.
+ *
+ * @param task the task to be executed
+ */
def execute(task: Runnable): Unit
- def getTask(worker: WorkerThread): Runnable
+ /** Notifies the scheduler about activity of the
+ * executing actor.
+ *
+ * @param a the active actor
+ */
def tick(a: Actor): Unit
- def snapshot(): LinkedQueue
+ /** Shuts down the scheduler.
+ */
def shutdown(): Unit
def onLockup(handler: () => Unit): Unit
@@ -127,47 +152,36 @@ trait IScheduler {
}
+trait WorkerThreadScheduler extends IScheduler {
+ /**
+ * @param worker the worker thread executing tasks
+ * @return the task to be executed
+ */
+ def getTask(worker: WorkerThread): Runnable
+}
+
+
/**
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
*
- * @version 0.9.9
+ * @version 0.9.18
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
- def start() {}
- val taskQ = new scala.collection.mutable.Queue[Runnable]
-
- def start(task: Runnable) {
- // execute task immediately on same thread
+ def execute(task: Runnable) {
task.run()
- while (taskQ.length > 0) {
- val nextTask = taskQ.dequeue
- nextTask.run()
- }
}
- def execute(task: Runnable) {
- val a = Actor.tl.get.asInstanceOf[Actor]
- if ((null ne a) && a.isInstanceOf[ActorProxy]) {
- // execute task immediately on same thread
- task.run()
- while (taskQ.length > 0) {
- val nextTask = taskQ.dequeue
- nextTask.run()
- }
- } else {
- // queue task for later execution
- taskQ += task
- }
- }
+ def execute(fun: => Unit): Unit =
+ execute(new Runnable {
+ def run() { fun }
+ })
- def getTask(worker: WorkerThread): Runnable = null
def tick(a: Actor) {}
def shutdown() {}
- def snapshot(): LinkedQueue = { null }
def onLockup(handler: () => Unit) {}
def onLockup(millis: Int)(handler: () => Unit) {}
@@ -176,7 +190,7 @@ class SingleThreadedScheduler extends IScheduler {
/**
- * The <code>QuickException</code> class is used to manage control flow
+ * The <code>QuitException</code> class is used to manage control flow
* of certain schedulers and worker threads.
*
* @version 0.9.8
@@ -190,6 +204,7 @@ private[actors] class QuitException extends Throwable {
override def fillInStackTrace(): Throwable = this
}
+
/**
* <p>
* The class <code>WorkerThread</code> is used by schedulers to execute
@@ -239,10 +254,10 @@ private[actors] class QuitException extends Throwable {
* execution. QED
* </p>
*
- * @version 0.9.8
+ * @version 0.9.18
* @author Philipp Haller
*/
-class WorkerThread(sched: IScheduler) extends Thread {
+class WorkerThread(sched: WorkerThreadScheduler) extends Thread {
private var task: Runnable = null
private[actors] var running = true
@@ -263,7 +278,7 @@ class WorkerThread(sched: IScheduler) extends Thread {
}
}
this.synchronized {
- task = sched.getTask(this)
+ task = sched getTask this
while (task eq null) {
try {
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
new file mode 100644
index 0000000000..dc1e442f9a
--- /dev/null
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -0,0 +1,49 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2008, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.actors
+
+/** The <code>SchedulerAdapter</code> trait is used to adapt
+ * the behavior of the standard <code>Scheduler</code> object.
+ *
+ * Providing an implementation for the
+ * <code>execute(f: => Unit)</code> method is sufficient to
+ * obtain a concrete <code>IScheduler</code> class.
+ *
+ * @version 0.9.18
+ * @author Philipp Haller
+ */
+trait SchedulerAdapter extends IScheduler {
+
+ /** Submits a <code>Runnable</code> for execution.
+ *
+ * @param task the task to be executed
+ */
+ def execute(task: Runnable): Unit =
+ execute { task.run() }
+
+ /** Notifies the scheduler about activity of the
+ * executing actor.
+ *
+ * @param a the active actor
+ */
+ def tick(a: Actor): Unit =
+ Scheduler tick a
+
+ /** Shuts down the scheduler.
+ */
+ def shutdown(): Unit =
+ Scheduler.shutdown()
+
+ def onLockup(handler: () => Unit) {}
+
+ def onLockup(millis: Int)(handler: () => Unit) {}
+
+ def printActorDump {}
+
+}
diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala
index 2d90a906cd..3ce1fab731 100644
--- a/src/actors/scala/actors/TickedScheduler.scala
+++ b/src/actors/scala/actors/TickedScheduler.scala
@@ -20,10 +20,16 @@ import scala.compat.Platform
* <p>This scheduler uses a thread pool to execute tasks that are generated
* by the execution of actors.</p>
*
- * @version 0.9.8
+ * Use class <code>FJTaskScheduler2</code> instead.
+ *
+ * @version 0.9.18
* @author Philipp Haller
*/
-class TickedScheduler extends Thread with IScheduler {
+@deprecated
+class TickedScheduler extends Thread with WorkerThreadScheduler {
+ // as long as this thread runs, JVM should not exit
+ setDaemon(false)
+
private val tasks = new Queue[Runnable]
// Worker threads
@@ -35,23 +41,8 @@ class TickedScheduler extends Thread with IScheduler {
private var lastActivity = Platform.currentTime
- private var pendingReactions = 0
- def pendReaction: Unit = synchronized {
- pendingReactions += 1
- }
- def unPendReaction: Unit = synchronized {
- pendingReactions -= 1
- }
-
def printActorDump {}
- def start(task: Runnable): Unit = synchronized {
- pendingReactions += 1
- execute(task)
- }
-
- def terminated(a: Actor) {}
-
private var TICK_FREQ = 5
private var CHECK_FREQ = 50
@@ -83,6 +74,8 @@ class TickedScheduler extends Thread with IScheduler {
if (terminating) throw new QuitException
}
+ ActorGC.gc()
+
if (tasks.length > 0) {
// check if we need more threads
if (Platform.currentTime - lastActivity >= TICK_FREQ) {
@@ -97,9 +90,11 @@ class TickedScheduler extends Thread with IScheduler {
}
} // tasks.length > 0
else {
- if (pendingReactions == 0) {
+ if (ActorGC.allTerminated) {
// if all worker threads idle terminate
if (workers.length == idle.length) {
+ Debug.info(this+": initiating shutdown...")
+
val idleThreads = idle.elements
while (idleThreads.hasNext) {
val worker = idleThreads.next
@@ -135,7 +130,10 @@ class TickedScheduler extends Thread with IScheduler {
}
}
- def snapshot(): LinkedQueue = null
+ def execute(fun: => Unit): Unit =
+ execute(new Runnable {
+ def run() { fun }
+ })
/**
* @param worker the worker thread executing tasks