summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-10 10:04:32 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-10 10:04:32 +0000
commit1828ef43100374982b6313ae94b885616ccf85a1 (patch)
tree1c10ffd31919f1acf4c4979bf3f8648f2cc79785 /src/actors
parentf75cbd338f81a00ab9696fb0482fd561ce1a0826 (diff)
downloadscala-1828ef43100374982b6313ae94b885616ccf85a1.tar.gz
scala-1828ef43100374982b6313ae94b885616ccf85a1.tar.bz2
scala-1828ef43100374982b6313ae94b885616ccf85a1.zip
Fixed bug in andThen.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala37
-rw-r--r--src/actors/scala/actors/ActorProxy.scala6
-rw-r--r--src/actors/scala/actors/Channel.scala153
-rw-r--r--src/actors/scala/actors/Reaction.scala29
-rw-r--r--src/actors/scala/actors/Scheduler.scala15
5 files changed, 171 insertions, 69 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 3e8bf190c3..478466f916 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -171,12 +171,12 @@ object Actor {
private[actors] trait Body[a] {
def orElse[b >: a](other: => b): b
- def andThen[b >: a](other: => b): b
+ def andThen[b](other: => b): Nothing
}
implicit def mkBody[a](body: => a) = new Body[a] {
def orElse[b >: a](other: => b): b = choose(body, other)
- def andThen[b >: a](other: => b): b = seq(body, other)
+ def andThen[b](other: => b): Nothing = seq(body, other)
}
private[actors] def choose[a, b >: a](alt1: => a, alt2: => b): b = {
@@ -223,6 +223,7 @@ object Actor {
val s = self
s.kill = () => { body; s.kill() }
body
+ exit("normal")
}
/**
@@ -232,11 +233,12 @@ object Actor {
* @param first ...
* @param next ...
*/
- def seq[a, b >: a](first: => a, next: => b): b = {
+ def seq[a, b](first: => a, next: => b): Nothing = {
val s = self
val killNext = s.kill
s.kill = () => { s.kill = killNext; next; s.kill() }
first
+ exit("normal")
}
/**
@@ -279,7 +281,7 @@ object Actor {
* <code>!reason.equals("normal")</code>.
* </p>
*/
- def exit(reason: String): Unit = self.exit(reason)
+ def exit(reason: String): Nothing = self.exit(reason)
}
case class Request[a](msg: a) {
@@ -401,7 +403,7 @@ trait Actor extends OutputChannel[Any] {
try {
wait()
} catch {
- case t: InterruptedException =>
+ case _: InterruptedException =>
}
}
}
@@ -418,7 +420,7 @@ trait Actor extends OutputChannel[Any] {
fromExc = false
wait(waittime)
} catch {
- case t: InterruptedException => {
+ case _: InterruptedException => {
fromExc = true
val now = Platform.currentTime
val waited = now-ts
@@ -450,23 +452,6 @@ trait Actor extends OutputChannel[Any] {
def start(): Unit =
Scheduler start new Reaction(this)
-
- /*
- * Debugging support.
- */
- private[actors] var name = ""
-
- private var childCnt = 0
-
- private[actors] def nextChildName = {
- val s = childCnt + name
- childCnt = childCnt + 1
- s
- }
-
- private[actors] def setName(n: String) =
- name = n
-
private val links = new HashSet[Actor]
/**
@@ -528,9 +513,11 @@ trait Actor extends OutputChannel[Any] {
* <code>!reason.equals("normal")</code>.
* </p>
*/
- def exit(reason: String): Unit = {
+ def exit(reason: String): Nothing = {
+ kill()
exitReason = reason
- currentThread.interrupt()
+ //currentThread.interrupt()
+ throw new ExitActorException
}
private[actors] def exit(from: Actor, reason: String): Unit = {
diff --git a/src/actors/scala/actors/ActorProxy.scala b/src/actors/scala/actors/ActorProxy.scala
index 2cfd12cb0a..0cdbcde587 100644
--- a/src/actors/scala/actors/ActorProxy.scala
+++ b/src/actors/scala/actors/ActorProxy.scala
@@ -46,8 +46,10 @@ private[actors] class ActorProxy(t: Thread) extends Actor {
*
* @param reason the exit reason of the interrupted thread.
*/
- override def exit(reason: String): Unit = {
+ override def exit(reason: String): Nothing = {
+ kill()
exitReason = reason
- t.interrupt()
+ exitLinked()
+ throw new InterruptedException
}
}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index cae5f3d1b9..5b5318f615 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -47,9 +47,55 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
private[actors] var isSuspended = false
+ type WaitingPart = Tuple4[Msg=>boolean, // waitingFor
+ Option[Actor], // sender
+ Option[PartialFunction[Any, Unit]],
+ Option[Actor]]
+ var waitingParts: List[WaitingPart] = Nil
+
+ var waitingPart: WaitingPart
+
+ def waitingFor(msg: Msg, sender: Actor): boolean = {
+ waitingPart = waitingParts.find(p => p._1(msg) &&
+ (if (!p._2.isEmpty) p._2.get == sender
+ else true))
+ }
+
+ def waitForNothing() =
+ waitingParts = Nil
+
+ def resumeWaitingPart(msg: Msg, sender: Actor) = {
+ receiver.pushSender(sender)
+ waitingPart._3 match {
+ case Some(cont) => receiver.scheduleActor(cont, msg)
+ case None => // resume blocked thread
+ received = Some(msg)
+ p._4.get.resumeActor()
+ }
+ }
+
//private val messageQueue = new MessageQueue[Msg]
private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]]
+/*
+ private def send(msg: Msg, sender: Actor) = receiver.synchronized {
+ receiver.tick()
+ if (waitingFor(msg, sender)) {
+ waitForNothing()
+
+ if (receiver.timeoutPending) {
+ receiver.timeoutPending = false
+ TimerThread.trashRequest(receiver)
+ }
+
+ resumeWaitingPart(msg, sender)
+ } else {
+ //messageQueue.append(msg, sender)
+ mailbox += Pair(msg, sender)
+ }
+ }
+*/
+
private def send(msg: Msg, sender: Actor) = receiver.synchronized {
receiver.tick()
if (waitingFor(msg) && ((waitingForSender eq null) ||
@@ -99,6 +145,16 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
}
+ def !?(msec: long, msg: Msg): Option[Any] = {
+ Debug.info("rpc with timeout "+msec)
+ Actor.self.freshReply()
+ this ! msg
+ Actor.self.reply.receiveWithinFrom(msec)(receiver) {
+ case TIMEOUT => None
+ case x => Some(x)
+ }
+ }
+
/**
* Forwards <code>msg</code> to <code>this</code> keeping the
* last sender as sender instead of <code>self</code>.
@@ -112,7 +168,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
assert(Actor.self == receiver, "receive from channel belonging to other actor")
receiver.synchronized {
receiver.tick()
- waitingFor = f.isDefinedAt
+
/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
@@ -136,14 +192,17 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
case None => {
// acquire lock because we might call wait()
this.synchronized {
+ waitingFor = f.isDefinedAt
isSuspended = true
+ //val wp = Tuple4(f.isDefinedAt, None, None, Some(receiver))
+ //waitingParts = wp :: waitingParts
receiver.suspendActor()
}
}
}
- isSuspended = false
waitingFor = waitingForNone
+ isSuspended = false
}
val result = f(received.get)
receiver.popSender()
@@ -154,8 +213,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
assert(Actor.self == receiver, "receive from channel belonging to other actor")
receiver.synchronized {
receiver.tick()
- waitingFor = f.isDefinedAt
- waitingForSender = r
+
/*
var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => {
waitingFor(item.msg) && item.sender == r
@@ -180,15 +238,17 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
case None => {
// acquire lock because we might call wait()
this.synchronized {
+ waitingFor = f.isDefinedAt
+ waitingForSender = r
isSuspended = true
receiver.suspendActor()
}
}
}
- isSuspended = false
waitingFor = waitingForNone
waitingForSender = null
+ isSuspended = false
}
val result = f(received.get)
receiver.popSender()
@@ -205,7 +265,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
assert(Actor.self == receiver, "receive from channel belonging to other actor")
receiver.synchronized {
receiver.tick()
- waitingFor = f.isDefinedAt
+
/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
@@ -236,6 +296,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
case None => {
// acquire lock because we might call wait()
this.synchronized {
+ waitingFor = f.isDefinedAt
isSuspended = true
received = None
receiver.suspendActorFor(msec)
@@ -255,8 +316,75 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
}
+ waitingFor = waitingForNone
isSuspended = false
+ }
+ val result = f(received.get)
+ receiver.popSender()
+ result
+ }
+
+ def receiveWithinFrom[R](msec: long)(r: Actor)(f: PartialFunction[Any, R]): R = {
+ assert(Actor.self == receiver, "receive from channel belonging to other actor")
+ receiver.synchronized {
+ receiver.tick()
+
+/*
+ val q = messageQueue.extractFirst(waitingFor)
+ if (q != null) {
+ received = q.msg
+ receiver.pushSender(q.sender)
+ }
+ else synchronized {
+ waitingFor = f.isDefinedAt
+ waitingForSender = r
+ isSuspended = true
+ receiver.suspendActorFor(msec)
+ if (received eq null)
+ if (f.isDefinedAt(TIMEOUT)) {
+ isSuspended = false
+ val result = f(TIMEOUT)
+ return result
+ }
+ else
+ error("unhandled timeout")
+ }
+*/
+
+ mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
+ waitingFor(p._1) && p._2 == r
+ }) match {
+ case Some(Pair(msg, sender)) => {
+ received = Some(msg)
+ receiver.pushSender(sender)
+ }
+ case None => {
+ // acquire lock because we might call wait()
+ this.synchronized {
+ waitingFor = f.isDefinedAt
+ waitingForSender = r
+ isSuspended = true
+ received = None
+ receiver.suspendActorFor(msec)
+ Debug.info("received: "+received)
+ if (received.isEmpty) {
+ Debug.info("no message received after "+msec+" millis")
+ if (f.isDefinedAt(TIMEOUT)) {
+ Debug.info("executing TIMEOUT action")
+ isSuspended = false
+ val result = f(TIMEOUT)
+ return result
+ }
+ else
+ error("unhandled timeout")
+ }
+ }
+ }
+ }
+
waitingFor = waitingForNone
+ waitingForSender = null
+ isSuspended = false
}
val result = f(received.get)
receiver.popSender()
@@ -271,7 +399,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
Scheduler.pendReaction
receiver.synchronized {
receiver.tick()
- waitingFor = f.isDefinedAt
+
/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
@@ -289,14 +417,15 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
waitingFor(p._1)
}) match {
case Some(Pair(msg, sender)) => {
- received = Some(msg)
receiver.pushSender(sender)
- waitingFor = waitingForNone
- receiver.scheduleActor(f, received.get)
+ receiver.scheduleActor(f, msg)
}
case None => {
this.synchronized {
//Scheduler.detached(receiver)
+ waitingFor = f.isDefinedAt
+ //val wp = Tuple4(f.isDefinedAt, None, Some(f), None)
+ //waitingParts = wp :: waitingParts
receiver.detachActor(f)
}
}
@@ -314,7 +443,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
Scheduler.pendReaction
receiver.synchronized {
receiver.tick()
- waitingFor = f.isDefinedAt
+
/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
@@ -324,6 +453,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver.scheduleActor(f, received)
}
else synchronized {
+ waitingFor = f.isDefinedAt
TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec)
receiver.asInstanceOf[Reactor].timeoutPending = true
receiver.detachActor(f)
@@ -341,6 +471,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
case None => {
this.synchronized {
+ waitingFor = f.isDefinedAt
TimerThread.requestTimeout(receiver, f, msec)
receiver.timeoutPending = true
receiver.detachActor(f)
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 06410ff899..107b5e7847 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -14,6 +14,8 @@ package scala.actors
import java.lang.{InterruptedException, Runnable}
import java.util.logging.{Level, Logger}
+class ExitActorException extends Throwable
+
/**
* The abstract class <code>Reaction</code> associates
* an instance of an <code>Actor</code> with a
@@ -52,23 +54,16 @@ private[actors] class Reaction(a: Actor,
Scheduler.unPendReaction
a.isDetached = false
try {
- if (f == null)
- a.act()
- else
- f(msg)
-
- if (currentThread.isInterrupted())
- throw new InterruptedException
-
- a.kill()
-
- if (currentThread.isInterrupted())
- throw new InterruptedException
-
- a.exit("normal")
-
- if (currentThread.isInterrupted())
- throw new InterruptedException
+ try {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+ a.exit("normal")
+ } catch {
+ case _: ExitActorException =>
+ throw new InterruptedException
+ }
}
catch {
case ie: InterruptedException => {
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index b6696f2793..a4b5eb3368 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -121,7 +121,7 @@ class TickedScheduler extends Thread with IScheduler {
def printActorDump {
var num = 0
for (val a <- alive.elements) {
- Console.println("Actor `"+a.name+"' ("+num+"): "+a)
+ Console.println("Actor ("+num+"): "+a)
if (a.isDetached)
Console.println("Detached")
else {
@@ -143,19 +143,6 @@ class TickedScheduler extends Thread with IScheduler {
def start(task: Reaction): unit = synchronized {
Debug.info("Starting " + task.actor)
alive += task.actor
-
- // determine name of actor
- val creator = Actor.self
- if (creator.isInstanceOf[ActorProxy]) {
- // created by Java thread
- // only ok, if it is the main thread
- val tname = currentThread.toString()
- if (tname.indexOf("main") == -1) {
- // print/log warning
- Console.println("Warning: Some debugging features not available if actors are created by non-main Java threads.")
- } else task.actor.name = creator.nextChildName
- } else task.actor.name = creator.nextChildName
-
execute(task)
}