summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-11-13 16:51:14 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-11-13 16:51:14 +0000
commit96c144a972ce52014a6b6a6ebc465052fb3c891d (patch)
tree72a74e3ee7806f25730eec20e21b275e87e0dbba
parentdff1d96421a130e9b3a0cc5dc01cd606a1d2d131 (diff)
downloadscala-96c144a972ce52014a6b6a6ebc465052fb3c891d.tar.gz
scala-96c144a972ce52014a6b6a6ebc465052fb3c891d.tar.bz2
scala-96c144a972ce52014a6b6a6ebc465052fb3c891d.zip
Made other schedulers non-abstract again.
-rw-r--r--src/actors/scala/actors/Actor.scala24
-rw-r--r--src/actors/scala/actors/ActorProxy.scala2
-rw-r--r--src/actors/scala/actors/Channel.scala2
-rw-r--r--src/actors/scala/actors/Reaction.scala6
-rw-r--r--src/actors/scala/actors/Scheduler.scala206
-rw-r--r--src/actors/scala/actors/TimerThread.scala5
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala2
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala2
8 files changed, 129 insertions, 120 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 419f7f30fb..3efa508d65 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -19,7 +19,7 @@ import compat.Platform
* <code>receive</code>, <code>react</code>, <code>reply</code>,
* etc.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
object Actor {
@@ -282,6 +282,13 @@ object Actor {
def exit(reason: String): Unit = self.exit(reason)
}
+case class Request[a](msg: a) {
+ private[actors] val in = new Channel[a]
+ def reply(resp: a): unit = {
+ in ! resp
+ }
+}
+
/**
* <p>
* This class provides (together with <code>Channel</code>) an
@@ -293,7 +300,7 @@ object Actor {
* Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i>
* </p>
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
trait Actor extends OutputChannel[Any] {
@@ -318,8 +325,8 @@ trait Actor extends OutputChannel[Any] {
/**
* The behavior of an actor is specified by implementing this
* abstract method. Note that the preferred way to create actors
- * is through the <code>actor</code> and <code>reactor</code>
- * methods defined in object <code>Actor</code>.
+ * is through the <code>actor</code> method
+ * defined in object <code>Actor</code>.
*/
def act(): Unit
@@ -336,6 +343,13 @@ trait Actor extends OutputChannel[Any] {
*/
def !?(msg: Any): Any = in !? msg
+ def rpc[a](msg: a): a = {
+ Debug.info("Actor.!? called by "+Actor.self)
+ val req = Request(msg)
+ in ! req
+ req.in.?
+ }
+
private val lastSenders = new Stack[Actor]
private[actors] def sender: Actor = {
@@ -545,7 +559,7 @@ trait Actor extends OutputChannel[Any] {
* <code>b</code> terminates and <code>a</code> has
* <code>trapExit</code> set to <code>true</code>.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
case class Exit(from: Actor, reason: String)
diff --git a/src/actors/scala/actors/ActorProxy.scala b/src/actors/scala/actors/ActorProxy.scala
index 6e727e01f0..8609ceab85 100644
--- a/src/actors/scala/actors/ActorProxy.scala
+++ b/src/actors/scala/actors/ActorProxy.scala
@@ -18,7 +18,7 @@ import java.lang.Thread
* This class provides a dynamic actor proxy for normal Java
* threads.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
private[actors] class ActorProxy(t: Thread) extends Actor {
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 85949db53a..65eccb8ba0 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -29,7 +29,7 @@ class SuspendActorException extends Throwable {
* actors. Only the actor creating an instance of a
* <code>Channel</code> may receive from it.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 5b7c5e0f66..7fb4372a36 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -24,7 +24,7 @@ import java.util.logging.Level
* the different kinds of tasks used for the execution of
* event-based <code>Actor</code>s.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
private[actors] abstract class Reaction extends Runnable {
@@ -48,7 +48,7 @@ private[actors] abstract class Reaction extends Runnable {
* This class represents task items used to start the execution
* of <code>Actor</code>s.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
private[actors] class StartTask(a: Actor) extends Reaction {
@@ -92,7 +92,7 @@ private[actors] class StartTask(a: Actor) extends Reaction {
* specified in arguments of <code>react</code> and
* <code>reactWithin</code>.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
private[actors] class ActorTask(a: Actor,
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 314f1e2311..e869e2a31e 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -11,14 +11,11 @@
package scala.actors
-import java.lang.{Runnable, Thread}
-import java.lang.InterruptedException
+import compat.Platform
-import java.util.logging.Logger
-import java.util.logging.FileHandler
-import java.util.logging.Level
+import java.lang.{Runnable, Thread, InterruptedException}
+import java.util.logging.{Logger, FileHandler, Level}
-import compat.Platform
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack}
/**
@@ -26,12 +23,11 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack}
* <code>Actor</code> to execute tasks of an execution of a
* reactor.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
object Scheduler {
private var sched: IScheduler =
- //new SpareWorkerScheduler
{
val logger = Logger.getLogger("Scheduler")
logger.addHandler(new FileHandler("sched.log"))
@@ -52,7 +48,7 @@ object Scheduler {
def tick(a: Actor) = sched.tick(a)
- def shutdown(): Unit = sched.shutdown()
+ def shutdown(): unit = sched.shutdown()
def pendReaction: unit = sched.pendReaction
def unPendReaction: unit = sched.unPendReaction
@@ -62,19 +58,19 @@ object Scheduler {
* This abstract class provides a common interface for all
* schedulers used to execute reactors.
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
trait IScheduler {
- def execute(task: Reaction): Unit
+ def execute(task: Reaction): unit
def getTask(worker: WorkerThread): Runnable
- def tick(a: Actor): Unit
+ def tick(a: Actor): unit
- def shutdown(): Unit
+ def shutdown(): unit
val QUIT_TASK = new Reaction() {
def actor: Actor = null
- def run(): Unit = {}
+ def run(): unit = {}
override def toString() = "QUIT_TASK"
}
@@ -83,92 +79,6 @@ trait IScheduler {
}
/**
- * This scheduler executes the tasks of a reactor on a single
- * thread (the current thread).
- *
- * @version Beta2
- * @author Philipp Haller
- */
-abstract class SingleThreadedScheduler extends IScheduler {
- def execute(task: Reaction): Unit = {
- // execute task immediately on same thread
- task.run()
- }
-
- def getTask(worker: WorkerThread): Runnable = null
-
- def tick(a: Actor): Unit = {}
-
- def shutdown(): Unit = {}
-}
-
-/**
- * This scheduler creates additional threads whenever there is no
- * idle thread available.
- *
- * @version Beta2
- * @author Philipp Haller
- */
-abstract class SpareWorkerScheduler extends IScheduler {
- private val tasks = new Queue[Reaction]
- private val idle = new Queue[WorkerThread]
- private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
-
- private var terminating = false
-
- def init() = {
- for (val i <- 0 until 2) {
- val worker = new WorkerThread(this)
- workers += worker
- worker.start()
- }
- }
- init()
-
- def execute(task: Reaction): Unit = synchronized {
- if (!terminating) {
- if (idle.length == 0) {
- tasks += task
- val newWorker = new WorkerThread(this)
- workers += newWorker
- newWorker.start()
- }
- else {
- val worker = idle.dequeue
- worker.execute(task)
- }
- }
- }
-
- def getTask(worker: WorkerThread) = synchronized {
- if (terminating)
- QUIT_TASK
- else {
- if (tasks.length > 0) tasks.dequeue
- else {
- idle += worker
- null
- }
- }
- }
-
- def tick(a: Actor): Unit = {}
-
- def shutdown(): Unit = synchronized {
- terminating = true
-
- val idleThreads = idle.elements
- while (idleThreads.hasNext) {
- val worker = idleThreads.next
- worker.running = false
- worker.interrupt()
- // caused deadlock (tries to acquire lock of worker)
- //worker.join()
- }
- }
-}
-
-/**
* The class <code>TickedScheduler</code> ...
*
* @author Philipp Haller
@@ -311,7 +221,7 @@ class TickedScheduler extends Thread with IScheduler {
}
}
- def shutdown(): Unit = synchronized {
+ def shutdown(): unit = synchronized {
terminating = true
val idleThreads = idle.elements
@@ -338,6 +248,96 @@ class QuitException extends Throwable {
/**
+ * This scheduler executes the tasks of a reactor on a single
+ * thread (the current thread).
+ *
+ * @version 0.9.0
+ * @author Philipp Haller
+ */
+class SingleThreadedScheduler extends IScheduler {
+ def execute(task: Reaction): unit = {
+ // execute task immediately on same thread
+ task.run()
+ }
+
+ def getTask(worker: WorkerThread): Runnable = null
+ def tick(a: Actor): Unit = {}
+ def shutdown(): Unit = {}
+ def pendReaction: unit = {}
+ def unPendReaction: unit = {}
+}
+
+/**
+ * This scheduler creates additional threads whenever there is no
+ * idle thread available.
+ *
+ * @version 0.9.0
+ * @author Philipp Haller
+ */
+class SpareWorkerScheduler extends IScheduler {
+ private val tasks = new Queue[Reaction]
+ private val idle = new Queue[WorkerThread]
+ private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
+
+ private var terminating = false
+
+ def init() = {
+ for (val i <- 0 until 2) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+ }
+ init()
+
+ def execute(task: Reaction): unit = synchronized {
+ if (!terminating) {
+ if (idle.length == 0) {
+ tasks += task
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+ newWorker.start()
+ }
+ else {
+ val worker = idle.dequeue
+ worker.execute(task)
+ }
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (terminating)
+ QUIT_TASK
+ else {
+ if (tasks.length > 0) tasks.dequeue
+ else {
+ idle += worker
+ null
+ }
+ }
+ }
+
+ def tick(a: Actor): unit = {}
+
+ def shutdown(): unit = synchronized {
+ terminating = true
+
+ val idleThreads = idle.elements
+ while (idleThreads.hasNext) {
+ val worker = idleThreads.next
+ worker.running = false
+ worker.interrupt()
+ // caused deadlock (tries to acquire lock of worker)
+ //worker.join()
+ }
+ }
+
+ def pendReaction: unit = {}
+ def unPendReaction: unit = {}
+}
+
+
+/**
* <p>
* The class <code>WorkerThread</code> is used by schedulers to execute
* actor tasks on multiple threads.
@@ -386,7 +386,7 @@ class QuitException extends Throwable {
* execution. QED
* </p>
*
- * @version Beta2
+ * @version 0.9.0
* @author Philipp Haller
*/
class WorkerThread(sched: IScheduler) extends Thread {
@@ -398,7 +398,7 @@ class WorkerThread(sched: IScheduler) extends Thread {
notify()
}
- override def run(): Unit =
+ override def run(): unit =
try {
while (running) {
if (task ne null) {
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala
index e4588d4a12..0bd51a2c1c 100644
--- a/src/actors/scala/actors/TimerThread.scala
+++ b/src/actors/scala/actors/TimerThread.scala
@@ -15,8 +15,7 @@ package scala.actors
import java.lang.{Runnable, Thread}
import java.lang.InterruptedException
-import java.util.logging.Logger
-import java.util.logging.Level
+import java.util.logging.{Logger, Level}
/**
* This class allows the (local) sending of a message to an actor after
@@ -24,7 +23,7 @@ import java.util.logging.Level
* Note that the library deletes non-received <code>TIMEOUT</code> message if a
* message is received before the time-out occurs.
*
- * @version Beta2
+ * @version 0.9.0
* @author Sebastien Noir
* @author Philipp Haller
*/
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 020fc1747a..db97b5dd4d 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -53,8 +53,6 @@ class NetKernel(service: Service) {
namedSend(senderNode, receiver, senderName, refmsg)
}
}
- override def !?(msg: Any): Any =
- error("!? not implemented for remote actors.")
}
senderProxy.start()
}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 3b4e437ac8..2d3f8d22cc 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -95,8 +95,6 @@ object RemoteActor {
case other =>
error("Cannot send non-AnyRef value remotely.")
}
- override def !?(msg: Any): Any =
- error("!? not implemented for remote actors.")
}
}