summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/AbstractActor.scala5
-rw-r--r--src/actors/scala/actors/Actor.scala201
-rw-r--r--src/actors/scala/actors/IScheduler.scala6
-rw-r--r--src/actors/scala/actors/MessageQueue.scala48
-rw-r--r--src/actors/scala/actors/ReactChannel.scala2
-rw-r--r--src/actors/scala/actors/Reactor.scala79
-rw-r--r--src/actors/scala/actors/ReactorTask.scala2
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala102
-rw-r--r--src/actors/scala/actors/ReplyReactorTask.scala2
-rw-r--r--src/actors/scala/actors/Replyable.scala34
-rw-r--r--src/actors/scala/actors/ReplyableActor.scala10
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala12
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala6
-rw-r--r--src/actors/scala/actors/package.scala3
-rw-r--r--src/actors/scala/actors/scheduler/ActorGC.scala16
-rw-r--r--src/actors/scala/actors/scheduler/DelegatingScheduler.scala6
-rw-r--r--src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala6
-rw-r--r--src/actors/scala/actors/scheduler/TerminationMonitor.scala8
-rw-r--r--test/files/jvm/actor-getstate.check2
-rw-r--r--test/files/jvm/actor-getstate.scala73
-rw-r--r--test/files/jvm/reactor-exceptionOnSend.scala4
-rw-r--r--test/files/jvm/reactor-producer-consumer.scala10
-rw-r--r--test/files/jvm/reactor.scala6
-rw-r--r--test/files/jvm/replyablereactor2.scala2
-rw-r--r--test/files/jvm/replyablereactor3.scala2
-rw-r--r--test/files/jvm/replyablereactor4.scala2
26 files changed, 396 insertions, 253 deletions
diff --git a/src/actors/scala/actors/AbstractActor.scala b/src/actors/scala/actors/AbstractActor.scala
index 9c72b307fa..9cc62a1cde 100644
--- a/src/actors/scala/actors/AbstractActor.scala
+++ b/src/actors/scala/actors/AbstractActor.scala
@@ -13,12 +13,11 @@ package scala.actors
/**
* The <code>AbstractActor</code> trait.
*
- * @version 0.9.18
* @author Philipp Haller
*/
-trait AbstractActor extends OutputChannel[Any] with Replyable[Any, Any] {
+trait AbstractActor extends OutputChannel[Any] with CanReply[Any, Any] {
- private[actors] var exiting = false
+ private[actors] def exiting: Boolean = false
private[actors] def linkTo(to: AbstractActor): Unit
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 6c8647daaa..69e3bd243e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -24,7 +24,35 @@ import java.util.concurrent.{ExecutionException, Callable}
*/
object Actor extends Combinators {
- private[actors] val tl = new ThreadLocal[Reactor]
+ /** An actor state. An actor can be in one of the following states:
+ * <ul>
+ * <li>New<br>
+ * An actor that has not yet started is in this state.</li>
+ * <li>Runnable<br>
+ * An actor executing is in this state.</li>
+ * <li>Suspended<br>
+ * An actor that is suspended waiting in a react is in this state.</li>
+ * <li>TimedSuspended<br>
+ * An actor that is suspended waiting in a reactWithin is in this state.</li>
+ * <li>Blocked<br>
+ * An actor that is blocked waiting in a receive is in this state.</li>
+ * <li>TimedBlocked<br>
+ * An actor that is blocked waiting in a receiveWithin is in this state.</li>
+ * <li>Terminated<br>
+ * An actor that has terminated is in this state.</li>
+ * </ul>
+ */
+ object State extends Enumeration {
+ val New,
+ Runnable,
+ Suspended,
+ TimedSuspended,
+ Blocked,
+ TimedBlocked,
+ Terminated = Value
+ }
+
+ private[actors] val tl = new ThreadLocal[ReplyReactor]
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
@@ -43,9 +71,10 @@ object Actor extends Combinators {
private[actors] def self(sched: IScheduler): Actor =
rawSelf(sched).asInstanceOf[Actor]
- private[actors] def rawSelf: Reactor = rawSelf(Scheduler)
+ private[actors] def rawSelf: ReplyReactor =
+ rawSelf(Scheduler)
- private[actors] def rawSelf(sched: IScheduler): Reactor = {
+ private[actors] def rawSelf(sched: IScheduler): ReplyReactor = {
val s = tl.get
if (s eq null) {
val r = new ActorProxy(currentThread, sched)
@@ -208,7 +237,7 @@ object Actor extends Combinators {
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
rawSelf.react(new RecursiveProxyHandler(rawSelf, f))
- private class RecursiveProxyHandler(a: Reactor, f: PartialFunction[Any, Unit])
+ private class RecursiveProxyHandler(a: ReplyReactor, f: PartialFunction[Any, Unit])
extends PartialFunction[Any, Unit] {
def isDefinedAt(m: Any): Boolean =
true // events are immediately removed from the mailbox
@@ -222,21 +251,21 @@ object Actor extends Combinators {
* Returns the actor which sent the last received message.
*/
def sender: OutputChannel[Any] =
- rawSelf.asInstanceOf[ReplyReactor].sender
+ rawSelf.sender
/**
* Send <code>msg</code> to the actor waiting in a call to
* <code>!?</code>.
*/
def reply(msg: Any): Unit =
- rawSelf.asInstanceOf[ReplyReactor].reply(msg)
+ rawSelf.reply(msg)
/**
* Send <code>()</code> to the actor waiting in a call to
* <code>!?</code>.
*/
def reply(): Unit =
- rawSelf.asInstanceOf[ReplyReactor].reply(())
+ rawSelf.reply(())
/**
* Returns the number of messages in <code>self</code>'s mailbox
@@ -328,7 +357,7 @@ object Actor extends Combinators {
* <code>Exit(self, 'normal)</code> to <code>a</code>.
* </p>
*/
- def exit(): Nothing = self.exit()
+ def exit(): Nothing = rawSelf.exit()
}
@@ -371,12 +400,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
@volatile
private var received: Option[Any] = None
- /* This option holds a TimerTask when the actor waits in a
- * reactWithin/receiveWithin. The TimerTask is cancelled when
- * the actor can continue.
- */
- private var onTimeout: Option[TimerTask] = None
-
protected[actors] override def scheduler: IScheduler = Scheduler
private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
@@ -390,17 +413,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
private[actors] override def makeReaction(fun: () => Unit): Runnable =
new ActorTask(this, fun)
- private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
- synchronized {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- }
- senders = List(item._2)
- super.resumeReceiver(item, handler, onSameThread)
- }
-
/**
* Receives a message from this actor's mailbox.
*
@@ -530,89 +542,18 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
result
}
- /**
- * Receives a message from this actor's mailbox.
- * <p>
- * This method never returns. Therefore, the rest of the computation
- * has to be contained in the actions of the partial function.
- *
- * @param f a partial function with message patterns and actions
- */
- override def react(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
+ override def react(handler: PartialFunction[Any, Unit]): Nothing = {
synchronized {
- if (shouldExit) exit() // links
- drainSendBuffer(mailbox)
+ if (shouldExit) exit()
}
- searchMailbox(mailbox, f, false)
- throw Actor.suspendException
+ super.react(handler)
}
- /**
- * Receives a message from this actor's mailbox within a certain
- * time span.
- * <p>
- * This method never returns. Therefore, the rest of the computation
- * has to be contained in the actions of the partial function.
- *
- * @param msec the time span before timeout
- * @param f a partial function with message patterns and actions
- */
- def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
-
+ override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
synchronized {
- if (shouldExit) exit() // links
- drainSendBuffer(mailbox)
- }
-
- // first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
-
- val receiveTimeout = () => {
- if (f.isDefinedAt(TIMEOUT)) {
- senders = List(this)
- scheduleActor(f, TIMEOUT)
- } else
- error("unhandled timeout")
- }
-
- var done = false
- while (!done) {
- val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- f.isDefinedAt(m)
- })
- if (null eq qel) {
- val todo = synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- drainSendBuffer(mailbox)
- // keep going
- () => {}
- } else if (msec == 0L) {
- done = true
- receiveTimeout
- } else {
- waitingFor = f
- val thisActor = this
- onTimeout = Some(new TimerTask {
- def run() { thisActor.send(TIMEOUT, thisActor) }
- })
- Actor.timer.schedule(onTimeout.get, msec)
- done = true
- () => {}
- }
- }
- todo()
- } else {
- senders = List(qel.session)
- scheduleActor(f, qel.msg)
- done = true
- }
+ if (shouldExit) exit()
}
-
- throw Actor.suspendException
+ super.reactWithin(msec)(handler)
}
/**
@@ -660,25 +601,44 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
notify()
}
+ private[actors] override def exiting = synchronized {
+ _state == Actor.State.Terminated
+ }
+
/**
* Starts this actor.
*/
override def start(): Actor = synchronized {
- // Reset various flags.
- //
- // Note that we do *not* reset `trapExit`. The reason is that
- // users should be able to set the field in the constructor
- // and before `act` is called.
- exitReason = 'normal
- exiting = false
- shouldExit = false
+ if (_state == Actor.State.New) {
+ _state = Actor.State.Runnable
+
+ // Reset various flags.
+ //
+ // Note that we do *not* reset `trapExit`. The reason is that
+ // users should be able to set the field in the constructor
+ // and before `act` is called.
+ exitReason = 'normal
+ shouldExit = false
- scheduler newActor this
- scheduler execute (new Reaction(this))
+ scheduler newActor this
+ scheduler execute (new Reaction(this))
- this
+ this
+ } else
+ this
}
+ override def getState: Actor.State.Value = synchronized {
+ if (isSuspended) {
+ if (onTimeout.isEmpty)
+ Actor.State.Blocked
+ else
+ Actor.State.TimedBlocked
+ } else
+ super.getState
+ }
+
+ // guarded by this
private[actors] var links: List[AbstractActor] = Nil
/**
@@ -728,8 +688,11 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
links = links.filterNot(from.==)
}
+ @volatile
var trapExit = false
+ // guarded by this
private var exitReason: AnyRef = 'normal
+ // guarded by this
private[actors] var shouldExit = false
/**
@@ -749,7 +712,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
* <code>reason != 'normal</code>.
* </p>
*/
- protected[actors] def exit(reason: AnyRef): Nothing = {
+ protected[actors] def exit(reason: AnyRef): Nothing = synchronized {
exitReason = reason
exit()
}
@@ -757,17 +720,16 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
/**
* Terminates with exit reason <code>'normal</code>.
*/
- protected[actors] override def exit(): Nothing = {
- // links
+ protected[actors] override def exit(): Nothing = synchronized {
if (!links.isEmpty)
exitLinked()
- terminated()
- throw Actor.suspendException
+ super.exit()
}
// Assume !links.isEmpty
+ // guarded by this
private[actors] def exitLinked() {
- exiting = true
+ _state = Actor.State.Terminated
// remove this from links
val mylinks = links.filterNot(this.==)
// exit linked processes
@@ -779,6 +741,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// Assume !links.isEmpty
+ // guarded by this
private[actors] def exitLinked(reason: AnyRef) {
exitReason = reason
exitLinked()
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 7a47c670e7..1718dab045 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -49,14 +49,14 @@ trait IScheduler {
*
* @param a the actor to be registered
*/
- def newActor(a: Reactor): Unit
+ def newActor(a: TrackedReactor): Unit
/** Unregisters an actor from this scheduler, because it
* has terminated.
*
* @param a the actor to be registered
*/
- def terminated(a: Reactor): Unit
+ def terminated(a: TrackedReactor): Unit
/** Registers a closure to be executed when the specified
* actor terminates.
@@ -64,7 +64,7 @@ trait IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: Reactor)(f: => Unit): Unit
+ def onTerminate(a: TrackedReactor)(f: => Unit): Unit
def managedBlock(blocker: scala.concurrent.ManagedBlocker): Unit
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index 000ff1bfc6..2c1c2446e6 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -18,14 +18,14 @@ package scala.actors
*/
@serializable @SerialVersionUID(7124278808020037465L)
@deprecated("this class is going to be removed in a future release")
-class MessageQueueElement(msg: Any, session: OutputChannel[Any], next: MessageQueueElement) extends MQueueElement(msg, session, next) {
+class MessageQueueElement(msg: Any, session: OutputChannel[Any], next: MessageQueueElement) extends MQueueElement[Any](msg, session, next) {
def this() = this(null, null, null)
def this(msg: Any, session: OutputChannel[Any]) = this(msg, session, null)
}
-private[actors] class MQueueElement(val msg: Any, val session: OutputChannel[Any], var next: MQueueElement) {
+private[actors] class MQueueElement[Msg >: Null](val msg: Msg, val session: OutputChannel[Any], var next: MQueueElement[Msg]) {
def this() = this(null, null, null)
- def this(msg: Any, session: OutputChannel[Any]) = this(msg, session, null)
+ def this(msg: Msg, session: OutputChannel[Any]) = this(msg, session, null)
}
/**
@@ -38,11 +38,11 @@ private[actors] class MQueueElement(val msg: Any, val session: OutputChannel[Any
*/
@serializable @SerialVersionUID(2168935872884095767L)
@deprecated("this class is going to be removed in a future release")
-class MessageQueue(label: String) extends MQueue(label)
+class MessageQueue(label: String) extends MQueue[Any](label)
-private[actors] class MQueue(protected val label: String) {
- protected var first: MQueueElement = null
- protected var last: MQueueElement = null // last eq null iff list is empty
+private[actors] class MQueue[Msg >: Null](protected val label: String) {
+ protected var first: MQueueElement[Msg] = null
+ protected var last: MQueueElement[Msg] = null // last eq null iff list is empty
private var _size = 0
def size = _size
@@ -52,7 +52,7 @@ private[actors] class MQueue(protected val label: String) {
_size += diff
}
- def append(msg: Any, session: OutputChannel[Any]) {
+ def append(msg: Msg, session: OutputChannel[Any]) {
changeSize(1) // size always increases by 1
val el = new MQueueElement(msg, session)
@@ -62,7 +62,7 @@ private[actors] class MQueue(protected val label: String) {
last = el
}
- def append(el: MQueueElement) {
+ def append(el: MQueueElement[Msg]) {
changeSize(1) // size always increases by 1
if (isEmpty) first = el
@@ -71,7 +71,7 @@ private[actors] class MQueue(protected val label: String) {
last = el
}
- def foreach(f: (Any, OutputChannel[Any]) => Unit) {
+ def foreach(f: (Msg, OutputChannel[Any]) => Unit) {
var curr = first
while (curr != null) {
f(curr.msg, curr.session)
@@ -79,7 +79,7 @@ private[actors] class MQueue(protected val label: String) {
}
}
- def foreachAppend(target: MQueue) {
+ def foreachAppend(target: MQueue[Msg]) {
var curr = first
while (curr != null) {
target.append(curr)
@@ -87,7 +87,7 @@ private[actors] class MQueue(protected val label: String) {
}
}
- def foreachDequeue(target: MQueue) {
+ def foreachDequeue(target: MQueue[Msg]) {
var curr = first
while (curr != null) {
target.append(curr)
@@ -98,7 +98,7 @@ private[actors] class MQueue(protected val label: String) {
_size = 0
}
- def foldLeft[B](z: B)(f: (B, Any) => B): B = {
+ def foldLeft[B](z: B)(f: (B, Msg) => B): B = {
var acc = z
var curr = first
while (curr != null) {
@@ -111,10 +111,10 @@ private[actors] class MQueue(protected val label: String) {
/** Returns the n-th message that satisfies the predicate <code>p</code>
* without removing it.
*/
- def get(n: Int)(p: Any => Boolean): Option[Any] = {
+ def get(n: Int)(p: Msg => Boolean): Option[Msg] = {
var pos = 0
- def test(msg: Any): Boolean =
+ def test(msg: Msg): Boolean =
p(msg) && (pos == n || { pos += 1; false })
var curr = first
@@ -127,16 +127,16 @@ private[actors] class MQueue(protected val label: String) {
/** Removes the n-th message that satisfies the predicate <code>p</code>.
*/
- def remove(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[(Any, OutputChannel[Any])] =
+ def remove(n: Int)(p: (Msg, OutputChannel[Any]) => Boolean): Option[(Msg, OutputChannel[Any])] =
removeInternal(n)(p) map (x => (x.msg, x.session))
/** Extracts the first message that satisfies the predicate <code>p</code>
* or <code>null</code> if <code>p</code> fails for all of them.
*/
- def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MQueueElement =
+ def extractFirst(p: (Msg, OutputChannel[Any]) => Boolean): MQueueElement[Msg] =
removeInternal(0)(p) orNull
- def extractFirst(pf: PartialFunction[Any, Any]): MQueueElement = {
+ def extractFirst(pf: PartialFunction[Msg, Any]): MQueueElement[Msg] = {
if (isEmpty) // early return
return null
@@ -173,14 +173,14 @@ private[actors] class MQueue(protected val label: String) {
}
}
- private def removeInternal(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[MQueueElement] = {
+ private def removeInternal(n: Int)(p: (Msg, OutputChannel[Any]) => Boolean): Option[MQueueElement[Msg]] = {
var pos = 0
- def foundMsg(x: MQueueElement) = {
+ def foundMsg(x: MQueueElement[Msg]) = {
changeSize(-1)
Some(x)
}
- def test(msg: Any, session: OutputChannel[Any]): Boolean =
+ def test(msg: Msg, session: OutputChannel[Any]): Boolean =
p(msg, session) && (pos == n || { pos += 1 ; false })
if (isEmpty) // early return
@@ -220,7 +220,7 @@ private[actors] class MQueue(protected val label: String) {
/** Debugging trait.
*/
-private[actors] trait MessageQueueTracer extends MQueue
+private[actors] trait MessageQueueTracer extends MQueue[Any]
{
private val queueNumber = MessageQueueTracer.getQueueNumber
@@ -238,7 +238,7 @@ private[actors] trait MessageQueueTracer extends MQueue
printQueue("REMOVE %s" format res)
res
}
- override def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MQueueElement = {
+ override def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MQueueElement[Any] = {
val res = super.extractFirst(p)
printQueue("EXTRACT_FIRST %s" format res)
res
@@ -253,7 +253,7 @@ private[actors] trait MessageQueueTracer extends MQueue
override def toString() = "%s:%d".format(label, queueNumber)
}
-object MessageQueueTracer {
+private[actors] object MessageQueueTracer {
// for tracing purposes
private var queueNumberAssigner = 0
private def getQueueNumber = synchronized {
diff --git a/src/actors/scala/actors/ReactChannel.scala b/src/actors/scala/actors/ReactChannel.scala
index 8bbbc04f53..dc31e99711 100644
--- a/src/actors/scala/actors/ReactChannel.scala
+++ b/src/actors/scala/actors/ReactChannel.scala
@@ -15,7 +15,7 @@ package scala.actors
*
* @author Philipp Haller
*/
-private[actors] class ReactChannel[Msg](receiver: Reactor) extends InputChannel[Msg] {
+private[actors] class ReactChannel[Msg](receiver: ReplyReactor) extends InputChannel[Msg] {
private case class SendToReactor(channel: ReactChannel[Msg], msg: Msg)
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 71ecd00e0a..c20f6dd41a 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -51,13 +51,13 @@ private[actors] object Reactor {
*
* @author Philipp Haller
*/
-trait Reactor extends OutputChannel[Any] with Combinators {
+trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
/* The actor's mailbox. */
- private[actors] val mailbox = new MQueue("Reactor")
+ private[actors] val mailbox = new MQueue[Msg]("Reactor")
// guarded by this
- private[actors] val sendBuffer = new MQueue("SendBuffer")
+ private[actors] val sendBuffer = new MQueue[Msg]("SendBuffer")
/* Whenever this actor executes on some thread, `waitingFor` is
* guaranteed to be equal to `Reactor.waitingForNone`.
@@ -71,12 +71,15 @@ trait Reactor extends OutputChannel[Any] with Combinators {
*
* guarded by this
*/
- private[actors] var waitingFor: PartialFunction[Any, Any] =
+ private[actors] var waitingFor: PartialFunction[Msg, Any] =
Reactor.waitingForNone
+ // guarded by this
+ private[actors] var _state: Actor.State.Value = Actor.State.New
+
/**
- * The behavior of an actor is specified by implementing this
- * abstract method.
+ * The behavior of a <code>Reactor</code> is specified by implementing
+ * this method.
*/
def act(): Unit
@@ -96,7 +99,7 @@ trait Reactor extends OutputChannel[Any] with Combinators {
* @param msg the message to send
* @param replyTo the reply destination
*/
- def send(msg: Any, replyTo: OutputChannel[Any]) {
+ def send(msg: Msg, replyTo: OutputChannel[Any]) {
val todo = synchronized {
if (waitingFor ne Reactor.waitingForNone) {
val savedWaitingFor = waitingFor
@@ -110,9 +113,9 @@ trait Reactor extends OutputChannel[Any] with Combinators {
todo()
}
- private[actors] def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
+ private[actors] def startSearch(msg: Msg, replyTo: OutputChannel[Any], handler: PartialFunction[Msg, Any]) =
() => scheduler execute (makeReaction(() => {
- val startMbox = new MQueue("Start")
+ val startMbox = new MQueue[Msg]("Start")
synchronized { startMbox.append(msg, replyTo) }
searchMailbox(startMbox, handler, true)
}))
@@ -120,7 +123,7 @@ trait Reactor extends OutputChannel[Any] with Combinators {
private[actors] def makeReaction(fun: () => Unit): Runnable =
new ReactorTask(this, fun)
- private[actors] def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
+ private[actors] def resumeReceiver(item: (Msg, OutputChannel[Any]), handler: PartialFunction[Msg, Any], onSameThread: Boolean) {
if (onSameThread)
handler(item._1)
else {
@@ -136,23 +139,23 @@ trait Reactor extends OutputChannel[Any] with Combinators {
}
}
- def !(msg: Any) {
+ def !(msg: Msg) {
send(msg, null)
}
- def forward(msg: Any) {
+ def forward(msg: Msg) {
send(msg, null)
}
def receiver: Actor = this.asInstanceOf[Actor]
// guarded by this
- private[actors] def drainSendBuffer(mbox: MQueue) {
+ private[actors] def drainSendBuffer(mbox: MQueue[Msg]) {
sendBuffer.foreachDequeue(mbox)
}
- private[actors] def searchMailbox(startMbox: MQueue,
- handler: PartialFunction[Any, Any],
+ private[actors] def searchMailbox(startMbox: MQueue[Msg],
+ handler: PartialFunction[Msg, Any],
resumeOnSameThread: Boolean) {
var tmpMbox = startMbox
var done = false
@@ -164,7 +167,7 @@ trait Reactor extends OutputChannel[Any] with Combinators {
synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- tmpMbox = new MQueue("Temp")
+ tmpMbox = new MQueue[Msg]("Temp")
drainSendBuffer(tmpMbox)
// keep going
} else {
@@ -186,9 +189,17 @@ trait Reactor extends OutputChannel[Any] with Combinators {
}
}
- protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = {
+ /**
+ * Receives a message from this actor's mailbox.
+ * <p>
+ * This method never returns. Therefore, the rest of the computation
+ * has to be contained in the actions of the partial function.
+ *
+ * @param handler a partial function with message patterns and actions
+ */
+ protected[actors] def react(handler: PartialFunction[Msg, Unit]): Nothing = {
synchronized { drainSendBuffer(mailbox) }
- searchMailbox(mailbox, f, false)
+ searchMailbox(mailbox, handler, false)
throw Actor.suspendException
}
@@ -199,15 +210,30 @@ trait Reactor extends OutputChannel[Any] with Combinators {
*
* never throws SuspendActorControl
*/
- private[actors] def scheduleActor(handler: PartialFunction[Any, Any], msg: Any) = {
+ private[actors] def scheduleActor(handler: PartialFunction[Msg, Any], msg: Msg) = {
val fun = () => handler(msg): Unit
scheduler executeFromActor makeReaction(fun)
}
- def start(): Reactor = {
- scheduler newActor this
- scheduler execute makeReaction(() => act())
- this
+ def start(): Reactor[Msg] = synchronized {
+ if (_state == Actor.State.New) {
+ _state = Actor.State.Runnable
+ scheduler newActor this
+ scheduler execute makeReaction(() => act())
+ this
+ } else
+ this
+ }
+
+ /** Returns the execution state of this actor.
+ *
+ * @return the execution state
+ */
+ def getState: Actor.State.Value = synchronized {
+ if (waitingFor ne Reactor.waitingForNone)
+ Actor.State.Suspended
+ else
+ _state
}
implicit def mkBody[A](body: => A) = new Actor.Body[A] {
@@ -230,19 +256,22 @@ trait Reactor extends OutputChannel[Any] with Combinators {
// to avoid stack overflow:
// instead of directly executing `next`,
// schedule as continuation
- scheduleActor({ case _ => next }, 1)
+ scheduleActor({ case _ => next }, null)
throw Actor.suspendException
}
first
throw new KillActorControl
}
- protected[this] def exit(): Nothing = {
+ protected[actors] def exit(): Nothing = {
terminated()
throw Actor.suspendException
}
private[actors] def terminated() {
+ synchronized {
+ _state = Actor.State.Terminated
+ }
scheduler.terminated(this)
}
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
index 87ec7834fa..7d4409199a 100644
--- a/src/actors/scala/actors/ReactorTask.scala
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -21,7 +21,7 @@ import scala.concurrent.forkjoin.RecursiveAction
*
* @author Philipp Haller
*/
-private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun: () => Any)
+private[actors] class ReactorTask[T >: Null <: TrackedReactor](var reactor: T, var fun: () => Any)
extends RecursiveAction with Callable[Unit] with Runnable {
def run() {
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 05e4590162..196f7b1f4c 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -10,6 +10,8 @@
package scala.actors
+import java.util.{Timer, TimerTask}
+
/** <p>
* The <code>ReplyReactor</code> trait extends the <code>Reactor</code>
* trait with methods to reply to the sender of a message.
@@ -19,17 +21,26 @@ package scala.actors
*
* @author Philipp Haller
*/
-trait ReplyReactor extends Reactor with ReplyableReactor {
+trait ReplyReactor extends Reactor[Any] with ReplyableReactor {
/* A list of the current senders. The head of the list is
* the sender of the message that was received last.
*/
@volatile
- private[actors] var senders: List[OutputChannel[Any]] =
- Nil
+ private[actors] var senders: List[OutputChannel[Any]] = List()
+
+ /* This option holds a TimerTask when the actor waits in a
+ * reactWithin. The TimerTask is cancelled when the actor
+ * resumes.
+ *
+ * guarded by this
+ */
+ private[actors] var onTimeout: Option[TimerTask] = None
- protected[actors] def sender: OutputChannel[Any] =
- senders.head
+ /**
+ * Returns the actor which sent the last received message.
+ */
+ protected[actors] def sender: OutputChannel[Any] = senders.head
/**
* Replies with <code>msg</code> to the sender.
@@ -53,17 +64,17 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
}
private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
- senders = List(item._2)
- if (onSameThread)
- handler(item._1)
- else {
- scheduleActor(handler, item._1)
- // see Reactor.resumeReceiver
- throw Actor.suspendException
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
}
+ senders = List(item._2)
+ super.resumeReceiver(item, handler, onSameThread)
}
- private[actors] override def searchMailbox(startMbox: MQueue,
+ private[actors] override def searchMailbox(startMbox: MQueue[Any],
handler: PartialFunction[Any, Any],
resumeOnSameThread: Boolean) {
var tmpMbox = startMbox
@@ -79,7 +90,7 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
synchronized {
// in mean time new stuff might have arrived
if (!sendBuffer.isEmpty) {
- tmpMbox = new MQueue("Temp")
+ tmpMbox = new MQueue[Any]("Temp")
drainSendBuffer(tmpMbox)
// keep going
} else {
@@ -98,4 +109,67 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
private[actors] override def makeReaction(fun: () => Unit): Runnable =
new ReplyReactorTask(this, fun)
+ protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+ super.react(handler)
+ }
+
+ /**
+ * Receives a message from this actor's mailbox within a certain
+ * time span.
+ * <p>
+ * This method never returns. Therefore, the rest of the computation
+ * has to be contained in the actions of the partial function.
+ *
+ * @param msec the time span before timeout
+ * @param handler a partial function with message patterns and actions
+ */
+ protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+
+ synchronized { drainSendBuffer(mailbox) }
+
+ // first, remove spurious TIMEOUT message from mailbox if any
+ mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
+
+ while (true) {
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
+ senders = List(replyTo)
+ handler isDefinedAt m
+ })
+ if (null eq qel) {
+ synchronized {
+ // in mean time new messages might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer(mailbox)
+ // keep going
+ } else if (msec == 0L) {
+ // throws Actor.suspendException
+ resumeReceiver((TIMEOUT, this), handler, false)
+ } else {
+ waitingFor = handler
+ val thisActor = this
+ onTimeout = Some(new TimerTask {
+ def run() { thisActor.send(TIMEOUT, thisActor) }
+ })
+ Actor.timer.schedule(onTimeout.get, msec)
+ throw Actor.suspendException
+ }
+ }
+ } else
+ resumeReceiver((qel.msg, qel.session), handler, false)
+ }
+ throw Actor.suspendException
+ }
+
+ override def getState: Actor.State.Value = synchronized {
+ if (waitingFor ne Reactor.waitingForNone) {
+ if (onTimeout.isEmpty)
+ Actor.State.Suspended
+ else
+ Actor.State.TimedSuspended
+ } else
+ _state
+ }
+
}
diff --git a/src/actors/scala/actors/ReplyReactorTask.scala b/src/actors/scala/actors/ReplyReactorTask.scala
index 934a87435b..2ad0884166 100644
--- a/src/actors/scala/actors/ReplyReactorTask.scala
+++ b/src/actors/scala/actors/ReplyReactorTask.scala
@@ -18,7 +18,7 @@ package scala.actors
*/
private[actors] class ReplyReactorTask[T >: Null <: ReplyReactor](reactor: T, fun: () => Unit) extends ReactorTask[ReplyReactor](reactor, fun) {
- var saved: Reactor = _
+ var saved: ReplyReactor = _
protected override def beginExecution() {
saved = Actor.tl.get
diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala
index 2c7e55e06a..23f0a5319b 100644
--- a/src/actors/scala/actors/Replyable.scala
+++ b/src/actors/scala/actors/Replyable.scala
@@ -11,16 +11,15 @@
package scala.actors
/**
- * The Replyable trait defines result-bearing message send operations
- * on replyable actors.
+ * The <code>CanReply</code> trait defines result-bearing message send operations.
*
* @author Philipp Haller
*/
-trait Replyable[-T, +R] {
+trait CanReply[-T, +R] {
/**
- * Sends <code>msg</code> to this Replyable and awaits reply
- * (synchronous).
+ * Sends <code>msg</code> to this <code>CanReply</code> and
+ * awaits reply (synchronous).
*
* @param msg the message to be sent
* @return the reply
@@ -28,8 +27,9 @@ trait Replyable[-T, +R] {
def !?(msg: T): R
/**
- * Sends <code>msg</code> to this Replyable and awaits reply
- * (synchronous) within <code>msec</code> milliseconds.
+ * Sends <code>msg</code> to this <code>CanReply</code> and
+ * awaits reply (synchronous) within <code>msec</code>
+ * milliseconds.
*
* @param msec the time span before timeout
* @param msg the message to be sent
@@ -39,8 +39,8 @@ trait Replyable[-T, +R] {
def !?(msec: Long, msg: T): Option[R]
/**
- * Sends <code>msg</code> to this actor and immediately
- * returns a future representing the reply value.
+ * Sends <code>msg</code> to this <code>CanReply</code> and
+ * immediately returns a future representing the reply value.
*
* @param msg the message to be sent
* @return the future
@@ -49,17 +49,17 @@ trait Replyable[-T, +R] {
() => this !? msg
/**
- * Sends <code>msg</code> to this actor and immediately
- * returns a future representing the reply value.
+ * Sends <code>msg</code> to this <code>CanReply</code> and
+ * immediately returns a future representing the reply value.
* The reply is post-processed using the partial function
- * <code>f</code>. This also allows to recover a more
+ * <code>handler</code>. This also allows to recover a more
* precise type for the reply value.
*
- * @param msg the message to be sent
- * @param f the function to be applied to the response
- * @return the future
+ * @param msg the message to be sent
+ * @param handler the function to be applied to the response
+ * @return the future
*/
- def !![P](msg: T, f: PartialFunction[R, P]): () => P =
- () => f(this !? msg)
+ def !![P](msg: T, handler: PartialFunction[R, P]): () => P =
+ () => handler(this !? msg)
}
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala
index 2122dd854b..fa4fa92f96 100644
--- a/src/actors/scala/actors/ReplyableActor.scala
+++ b/src/actors/scala/actors/ReplyableActor.scala
@@ -59,18 +59,18 @@ private[actors] trait ReplyableActor extends ReplyableReactor {
* Sends <code>msg</code> to this actor and immediately
* returns a future representing the reply value.
* The reply is post-processed using the partial function
- * <code>f</code>. This also allows to recover a more
+ * <code>handler</code>. This also allows to recover a more
* precise type for the reply value.
*/
- override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = {
val ftch = new Channel[A](Actor.self(thiz.scheduler))
thiz.send(msg, new OutputChannel[Any] {
def !(msg: Any) =
- ftch ! f(msg)
+ ftch ! handler(msg)
def send(msg: Any, replyTo: OutputChannel[Any]) =
- ftch.send(f(msg), replyTo)
+ ftch.send(handler(msg), replyTo)
def forward(msg: Any) =
- ftch.forward(f(msg))
+ ftch.forward(handler(msg))
def receiver =
ftch.receiver
})
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
index ecca50e26d..bfe16eb395 100644
--- a/src/actors/scala/actors/ReplyableReactor.scala
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -17,7 +17,7 @@ package scala.actors
*
* @author Philipp Haller
*/
-private[actors] trait ReplyableReactor extends Replyable[Any, Any] {
+private[actors] trait ReplyableReactor extends CanReply[Any, Any] {
_: ReplyReactor =>
/**
@@ -67,27 +67,27 @@ private[actors] trait ReplyableReactor extends Replyable[Any, Any] {
* Sends <code>msg</code> to this actor and immediately
* returns a future representing the reply value.
* The reply is post-processed using the partial function
- * <code>f</code>. This also allows to recover a more
+ * <code>handler</code>. This also allows to recover a more
* precise type for the reply value.
*/
- override def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = {
+ override def !![A](msg: Any, handler: PartialFunction[Any, A]): Future[A] = {
val myself = Actor.rawSelf(this.scheduler)
val ftch = new ReactChannel[A](myself)
val res = new scala.concurrent.SyncVar[A]
val out = new OutputChannel[Any] {
def !(msg: Any) = {
- val msg1 = f(msg)
+ val msg1 = handler(msg)
ftch ! msg1
res set msg1
}
def send(msg: Any, replyTo: OutputChannel[Any]) = {
- val msg1 = f(msg)
+ val msg1 = handler(msg)
ftch.send(msg1, replyTo)
res set msg1
}
def forward(msg: Any) = {
- val msg1 = f(msg)
+ val msg1 = handler(msg)
ftch forward msg1
res set msg1
}
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index f88d15bb38..698096c556 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -42,7 +42,7 @@ trait SchedulerAdapter extends IScheduler {
*
* @param a the actor to be registered
*/
- def newActor(a: Reactor) =
+ def newActor(a: TrackedReactor) =
Scheduler.newActor(a)
/** Unregisters an actor from this scheduler, because it
@@ -50,7 +50,7 @@ trait SchedulerAdapter extends IScheduler {
*
* @param a the actor to be unregistered
*/
- def terminated(a: Reactor) =
+ def terminated(a: TrackedReactor) =
Scheduler.terminated(a)
/** Registers a closure to be executed when the specified
@@ -59,7 +59,7 @@ trait SchedulerAdapter extends IScheduler {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: Reactor)(f: => Unit) =
+ def onTerminate(a: TrackedReactor)(f: => Unit) =
Scheduler.onTerminate(a)(f)
def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
diff --git a/src/actors/scala/actors/package.scala b/src/actors/scala/actors/package.scala
index eadc75c433..6ffd17da14 100644
--- a/src/actors/scala/actors/package.scala
+++ b/src/actors/scala/actors/package.scala
@@ -2,6 +2,9 @@ package scala
package object actors {
+ // type of Reactors tracked by termination detector
+ private[actors] type TrackedReactor = Reactor[A] forSome { type A >: Null }
+
@deprecated("use scala.actors.scheduler.ForkJoinScheduler instead")
type FJTaskScheduler2 = scala.actors.scheduler.ForkJoinScheduler
diff --git a/src/actors/scala/actors/scheduler/ActorGC.scala b/src/actors/scala/actors/scheduler/ActorGC.scala
index 09fe2f1209..0fe94b09c9 100644
--- a/src/actors/scala/actors/scheduler/ActorGC.scala
+++ b/src/actors/scala/actors/scheduler/ActorGC.scala
@@ -29,19 +29,19 @@ trait ActorGC extends TerminationMonitor {
self: IScheduler =>
/** Actors are added to refQ in newActor. */
- private val refQ = new ReferenceQueue[Reactor]
+ private val refQ = new ReferenceQueue[TrackedReactor]
/**
* This is a set of references to all the actors registered with
* this ActorGC. It is maintained so that the WeakReferences will not be GC'd
* before the actors to which they point.
*/
- private val refSet = new HashSet[Reference[t] forSome { type t <: Reactor }]
+ private val refSet = new HashSet[Reference[t] forSome { type t <: TrackedReactor }]
/** newActor is invoked whenever a new actor is started. */
- override def newActor(a: Reactor) = synchronized {
+ override def newActor(a: TrackedReactor) = synchronized {
// registers a reference to the actor with the ReferenceQueue
- val wr = new WeakReference[Reactor](a, refQ)
+ val wr = new WeakReference[TrackedReactor](a, refQ)
refSet += wr
activeActors += 1
}
@@ -71,20 +71,20 @@ trait ActorGC extends TerminationMonitor {
activeActors <= 0
}
- override def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized {
+ override def onTerminate(a: TrackedReactor)(f: => Unit): Unit = synchronized {
terminationHandlers += (a -> (() => f))
}
- override def terminated(a: Reactor) = {
+ override def terminated(a: TrackedReactor) = {
super.terminated(a)
synchronized {
// find the weak reference that points to the terminated actor, if any
- refSet.find((ref: Reference[t] forSome { type t <: Reactor }) => ref.get() == a) match {
+ refSet.find((ref: Reference[t] forSome { type t <: TrackedReactor }) => ref.get() == a) match {
case Some(r) =>
// invoking clear will not cause r to be enqueued
r.clear()
- refSet -= r.asInstanceOf[Reference[t] forSome { type t <: Reactor }]
+ refSet -= r.asInstanceOf[Reference[t] forSome { type t <: TrackedReactor }]
case None =>
// do nothing
}
diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
index d1c99d7c13..193b1e3e2b 100644
--- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
@@ -48,7 +48,7 @@ private[actors] trait DelegatingScheduler extends IScheduler {
}
}
- def newActor(actor: Reactor) = synchronized {
+ def newActor(actor: TrackedReactor) = synchronized {
val createNew = if (sched eq null)
true
else sched.synchronized {
@@ -65,9 +65,9 @@ private[actors] trait DelegatingScheduler extends IScheduler {
}
}
- def terminated(actor: Reactor) = impl.terminated(actor)
+ def terminated(actor: TrackedReactor) = impl.terminated(actor)
- def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f)
+ def onTerminate(actor: TrackedReactor)(f: => Unit) = impl.onTerminate(actor)(f)
override def managedBlock(blocker: ManagedBlocker): Unit =
impl.managedBlock(blocker)
diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
index 9eca972e90..f91351d1f5 100644
--- a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
+++ b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
@@ -54,11 +54,11 @@ class SingleThreadedScheduler extends IScheduler {
isShutdown = true
}
- def newActor(actor: Reactor) {}
- def terminated(actor: Reactor) {}
+ def newActor(actor: TrackedReactor) {}
+ def terminated(actor: TrackedReactor) {}
// TODO: run termination handlers at end of shutdown.
- def onTerminate(actor: Reactor)(f: => Unit) {}
+ def onTerminate(actor: TrackedReactor)(f: => Unit) {}
def isActive =
!isShutdown
diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala
index 67f72371a3..82897f7afd 100644
--- a/src/actors/scala/actors/scheduler/TerminationMonitor.scala
+++ b/src/actors/scala/actors/scheduler/TerminationMonitor.scala
@@ -17,11 +17,11 @@ private[scheduler] trait TerminationMonitor {
_: IScheduler =>
protected var activeActors = 0
- protected val terminationHandlers = new HashMap[Reactor, () => Unit]
+ protected val terminationHandlers = new HashMap[TrackedReactor, () => Unit]
private var started = false
/** newActor is invoked whenever a new actor is started. */
- def newActor(a: Reactor) = synchronized {
+ def newActor(a: TrackedReactor) = synchronized {
activeActors += 1
if (!started)
started = true
@@ -33,7 +33,7 @@ private[scheduler] trait TerminationMonitor {
* @param a the actor
* @param f the closure to be registered
*/
- def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized {
+ def onTerminate(a: TrackedReactor)(f: => Unit): Unit = synchronized {
terminationHandlers += (a -> (() => f))
}
@@ -41,7 +41,7 @@ private[scheduler] trait TerminationMonitor {
*
* @param a the actor that has terminated
*/
- def terminated(a: Reactor) = {
+ def terminated(a: TrackedReactor) = {
// obtain termination handler (if any)
val todo = synchronized {
terminationHandlers.get(a) match {
diff --git a/test/files/jvm/actor-getstate.check b/test/files/jvm/actor-getstate.check
new file mode 100644
index 0000000000..2c94e48371
--- /dev/null
+++ b/test/files/jvm/actor-getstate.check
@@ -0,0 +1,2 @@
+OK
+OK
diff --git a/test/files/jvm/actor-getstate.scala b/test/files/jvm/actor-getstate.scala
new file mode 100644
index 0000000000..9de3247653
--- /dev/null
+++ b/test/files/jvm/actor-getstate.scala
@@ -0,0 +1,73 @@
+import scala.actors.{Reactor, Actor, TIMEOUT}
+
+object Test {
+
+ def assert(cond: => Boolean) {
+ if (!cond)
+ println("FAIL")
+ }
+
+ def main(args: Array[String]) {
+ val a = new Reactor[Any] {
+ def act() {
+ assert(getState == Actor.State.Runnable)
+ react {
+ case 'go =>
+ println("OK")
+ }
+ }
+ }
+ assert(a.getState == Actor.State.New)
+
+ a.start()
+ Thread.sleep(100)
+ assert(a.getState == Actor.State.Suspended)
+
+ a ! 'go
+ Thread.sleep(100)
+ assert(a.getState == Actor.State.Terminated)
+
+ val b = new Actor {
+ def act() {
+ assert(getState == Actor.State.Runnable)
+ react {
+ case 'go =>
+ reactWithin(100000) {
+ case TIMEOUT =>
+ case 'go =>
+ receive {
+ case 'go =>
+ }
+ receiveWithin(100000) {
+ case TIMEOUT =>
+ case 'go =>
+ println("OK")
+ }
+ }
+ }
+ }
+ }
+ assert(b.getState == Actor.State.New)
+
+ b.start()
+ Thread.sleep(100)
+ assert(b.getState == Actor.State.Suspended)
+
+ b ! 'go
+ Thread.sleep(100)
+ assert(b.getState == Actor.State.TimedSuspended)
+
+ b ! 'go
+ Thread.sleep(100)
+ assert(b.getState == Actor.State.Blocked)
+
+ b ! 'go
+ Thread.sleep(100)
+ assert(b.getState == Actor.State.TimedBlocked)
+
+ b ! 'go
+ Thread.sleep(100)
+ assert(b.getState == Actor.State.Terminated)
+ }
+
+}
diff --git a/test/files/jvm/reactor-exceptionOnSend.scala b/test/files/jvm/reactor-exceptionOnSend.scala
index 3684943b9b..c89aab334b 100644
--- a/test/files/jvm/reactor-exceptionOnSend.scala
+++ b/test/files/jvm/reactor-exceptionOnSend.scala
@@ -3,7 +3,7 @@ import scala.actors.Actor._
case class MyException(text: String) extends Exception(text)
-object A extends Reactor {
+object A extends Reactor[Any] {
override def exceptionHandler = {
case MyException(text) =>
println("receiver handles exception")
@@ -29,7 +29,7 @@ object A extends Reactor {
}
}
-object B extends Reactor {
+object B extends Reactor[Any] {
def act() {
A.start()
A ! 'hello
diff --git a/test/files/jvm/reactor-producer-consumer.scala b/test/files/jvm/reactor-producer-consumer.scala
index 946e1561ce..0d33043fc6 100644
--- a/test/files/jvm/reactor-producer-consumer.scala
+++ b/test/files/jvm/reactor-producer-consumer.scala
@@ -2,10 +2,10 @@ import scala.actors.Reactor
object Test {
case class Stop()
- case class Get(from: Reactor)
+ case class Get(from: Reactor[Any])
case class Put(x: Int)
- class UnboundedBuffer extends Reactor {
+ class UnboundedBuffer extends Reactor[Any] {
def act() {
react {
case Stop() =>
@@ -20,7 +20,7 @@ object Test {
}
}
- class Producer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor {
+ class Producer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor[Any]) extends Reactor[Any] {
def act() {
var i = 0
while (i < n) {
@@ -32,7 +32,7 @@ object Test {
}
}
- class Consumer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor) extends Reactor {
+ class Consumer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor[Any]) extends Reactor[Any] {
val step = n / 10
var i = 0
def act() {
@@ -53,7 +53,7 @@ object Test {
}
def main(args: Array[String]) {
- val parent = new Reactor {
+ val parent = new Reactor[Any] {
def act() {
val buffer = new UnboundedBuffer
buffer.start()
diff --git a/test/files/jvm/reactor.scala b/test/files/jvm/reactor.scala
index 919263b65a..12d5c7c221 100644
--- a/test/files/jvm/reactor.scala
+++ b/test/files/jvm/reactor.scala
@@ -1,7 +1,7 @@
import scala.actors.Reactor
-case class Ping(from: Reactor)
+case class Ping(from: Reactor[Any])
case object Pong
case object Stop
@@ -19,7 +19,7 @@ object Test {
}
}
-class PingActor(count: Int, pong: Reactor) extends Reactor {
+class PingActor(count: Int, pong: Reactor[Any]) extends Reactor[Any] {
def act() {
var pingsLeft = count - 1
pong ! Ping(this)
@@ -41,7 +41,7 @@ class PingActor(count: Int, pong: Reactor) extends Reactor {
}
}
-class PongActor extends Reactor {
+class PongActor extends Reactor[Any] {
def act() {
var pongCount = 0
loop {
diff --git a/test/files/jvm/replyablereactor2.scala b/test/files/jvm/replyablereactor2.scala
index 22622274dd..57b7cfe201 100644
--- a/test/files/jvm/replyablereactor2.scala
+++ b/test/files/jvm/replyablereactor2.scala
@@ -19,7 +19,7 @@ object Test {
val a = new MyActor
a.start()
- val b = new Reactor {
+ val b = new Reactor[Any] {
def act() {
react {
case r: MyActor =>
diff --git a/test/files/jvm/replyablereactor3.scala b/test/files/jvm/replyablereactor3.scala
index 676ffe98e6..b33db811e2 100644
--- a/test/files/jvm/replyablereactor3.scala
+++ b/test/files/jvm/replyablereactor3.scala
@@ -19,7 +19,7 @@ object Test {
val a = new MyActor
a.start()
- val b = new Reactor {
+ val b = new Reactor[Any] {
def act() {
react {
case r: MyActor =>
diff --git a/test/files/jvm/replyablereactor4.scala b/test/files/jvm/replyablereactor4.scala
index d61fb64287..dc24f5e88d 100644
--- a/test/files/jvm/replyablereactor4.scala
+++ b/test/files/jvm/replyablereactor4.scala
@@ -19,7 +19,7 @@ object Test {
val a = new MyActor
a.start()
- val b = new Reactor {
+ val b = new Reactor[Any] {
def act() {
react {
case r: MyActor =>