summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-03-19 22:25:26 +0100
committerVojin Jovanovic <vojin.jovanovic@epfl.ch>2012-03-20 20:31:22 +0100
commit66f0679169bd8d5dc749c2288777c5a217ae3d43 (patch)
tree66faeb3cc69b2c725788f4a20a5752493c43e9cf
parentd9d46a8bbb1b30d322057bb513ea4317bda735d3 (diff)
downloadscala-66f0679169bd8d5dc749c2288777c5a217ae3d43.tar.gz
scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.tar.bz2
scala-66f0679169bd8d5dc749c2288777c5a217ae3d43.zip
Prepared actors hierarchy for migration.
Internal nodes added so methods relevant to akka can be overridden. Review by: @phaller
-rw-r--r--src/actors/scala/actors/Actor.scala484
-rw-r--r--src/actors/scala/actors/ActorCanReply.scala2
-rw-r--r--src/actors/scala/actors/ActorTask.scala9
-rw-r--r--src/actors/scala/actors/Channel.scala2
-rw-r--r--src/actors/scala/actors/Combinators.scala2
-rw-r--r--src/actors/scala/actors/InternalActor.scala509
-rw-r--r--src/actors/scala/actors/InternalReplyReactor.scala161
-rw-r--r--src/actors/scala/actors/OutputChannel.scala2
-rw-r--r--src/actors/scala/actors/ReactChannel.scala2
-rw-r--r--src/actors/scala/actors/Reactor.scala2
-rw-r--r--src/actors/scala/actors/ReactorCanReply.scala2
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala165
-rw-r--r--src/actors/scala/actors/ReplyReactorTask.scala4
-rw-r--r--src/actors/scala/actors/UncaughtException.scala2
14 files changed, 702 insertions, 646 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index b746f68268..aab533ae8d 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -46,7 +46,7 @@ object Actor extends Combinators {
Terminated = Value
}
- private[actors] val tl = new ThreadLocal[ReplyReactor]
+ private[actors] val tl = new ThreadLocal[InternalReplyReactor]
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
@@ -59,15 +59,15 @@ object Actor extends Combinators {
*
* @return returns the currently executing actor.
*/
- def self: Actor = self(Scheduler)
+ def self: Actor = self(Scheduler).asInstanceOf[Actor]
- private[actors] def self(sched: IScheduler): Actor =
- rawSelf(sched).asInstanceOf[Actor]
+ private[actors] def self(sched: IScheduler): InternalActor =
+ rawSelf(sched).asInstanceOf[InternalActor]
- private[actors] def rawSelf: ReplyReactor =
+ private[actors] def rawSelf: InternalReplyReactor =
rawSelf(Scheduler)
- private[actors] def rawSelf(sched: IScheduler): ReplyReactor = {
+ private[actors] def rawSelf(sched: IScheduler): InternalReplyReactor = {
val s = tl.get
if (s eq null) {
val r = new ActorProxy(Thread.currentThread, sched)
@@ -245,7 +245,7 @@ object Actor extends Combinators {
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
rawSelf.react(new RecursiveProxyHandler(rawSelf, f))
- private class RecursiveProxyHandler(a: ReplyReactor, f: PartialFunction[Any, Unit])
+ private class RecursiveProxyHandler(a: InternalReplyReactor, f: PartialFunction[Any, Unit])
extends scala.runtime.AbstractPartialFunction[Any, Unit] {
def _isDefinedAt(m: Any): Boolean =
true // events are immediately removed from the mailbox
@@ -259,7 +259,7 @@ object Actor extends Combinators {
* Returns the actor which sent the last received message.
*/
def sender: OutputChannel[Any] =
- rawSelf.sender
+ rawSelf.internalSender
/**
* Sends `msg` to the actor waiting in a call to `!?`.
@@ -302,7 +302,7 @@ object Actor extends Combinators {
def andThen[b](other: => b): Unit
}
- implicit def mkBody[a](body: => a) = new Body[a] {
+ implicit def mkBody[a](body: => a) = new InternalActor.Body[a] {
def andThen[b](other: => b): Unit = rawSelf.seq(body, other)
}
@@ -397,476 +397,12 @@ object Actor extends Combinators {
* @define channel actor's mailbox
*/
@SerialVersionUID(-781154067877019505L)
-trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with InputChannel[Any] with Serializable {
-
- /* The following two fields are only used when the actor
- * suspends by blocking its underlying thread, for example,
- * when waiting in a receive or synchronous send.
- */
- @volatile
- private var isSuspended = false
-
- /* This field is used to communicate the received message from
- * the invocation of send to the place where the thread of
- * the receiving actor resumes inside receive/receiveWithin.
- */
- @volatile
- private var received: Option[Any] = None
-
- protected[actors] override def scheduler: IScheduler = Scheduler
-
- private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
- if (isSuspended) {
- () => synchronized {
- mailbox.append(msg, replyTo)
- resumeActor()
- }
- } else super.startSearch(msg, replyTo, handler)
-
- // we override this method to check `shouldExit` before suspending
- private[actors] override def searchMailbox(startMbox: MQueue[Any],
- handler: PartialFunction[Any, Any],
- resumeOnSameThread: Boolean) {
- var tmpMbox = startMbox
- var done = false
- while (!done) {
- val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- handler.isDefinedAt(msg)
- })
- if (tmpMbox ne mailbox)
- tmpMbox.foreach((m, s) => mailbox.append(m, s))
- if (null eq qel) {
- synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- tmpMbox = new MQueue[Any]("Temp")
- drainSendBuffer(tmpMbox)
- // keep going
- } else {
- // very important to check for `shouldExit` at this point
- // since linked actors might have set it after we checked
- // last time (e.g., at the beginning of `react`)
- if (shouldExit) exit()
- waitingFor = handler
- // see Reactor.searchMailbox
- throw Actor.suspendException
- }
- }
- } else {
- resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
- done = true
- }
- }
- }
-
- private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
- new ActorTask(this, fun, handler, msg)
-
- /** See the companion object's `receive` method. */
- def receive[R](f: PartialFunction[Any, R]): R = {
- assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
-
- synchronized {
- if (shouldExit) exit() // links
- drainSendBuffer(mailbox)
- }
-
- var done = false
- while (!done) {
- val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
- senders = replyTo :: senders
- val matches = f.isDefinedAt(m)
- senders = senders.tail
- matches
- })
- if (null eq qel) {
- synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- drainSendBuffer(mailbox)
- // keep going
- } else {
- waitingFor = f
- isSuspended = true
- scheduler.managedBlock(blocker)
- drainSendBuffer(mailbox)
- // keep going
- }
- }
- } else {
- received = Some(qel.msg)
- senders = qel.session :: senders
- done = true
- }
- }
-
- val result = f(received.get)
- received = None
- senders = senders.tail
- result
- }
-
- /** See the companion object's `receiveWithin` method. */
- def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
- assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
-
- synchronized {
- if (shouldExit) exit() // links
- drainSendBuffer(mailbox)
- }
-
- // first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
-
- val receiveTimeout = () => {
- if (f.isDefinedAt(TIMEOUT)) {
- received = Some(TIMEOUT)
- senders = this :: senders
- } else
- sys.error("unhandled timeout")
- }
-
- var done = false
- while (!done) {
- val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
- senders = replyTo :: senders
- val matches = f.isDefinedAt(m)
- senders = senders.tail
- matches
- })
- if (null eq qel) {
- val todo = synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- drainSendBuffer(mailbox)
- // keep going
- () => {}
- } else if (msec == 0L) {
- done = true
- receiveTimeout
- } else {
- if (onTimeout.isEmpty) {
- if (!f.isDefinedAt(TIMEOUT))
- sys.error("unhandled timeout")
-
- val thisActor = this
- onTimeout = Some(new TimerTask {
- def run() {
- thisActor.send(TIMEOUT, thisActor)
- }
- })
- Actor.timer.schedule(onTimeout.get, msec)
- }
-
- // It is possible that !onTimeout.isEmpty, but TIMEOUT is not yet in mailbox
- // See SI-4759
- waitingFor = f
- received = None
- isSuspended = true
- scheduler.managedBlock(blocker)
- drainSendBuffer(mailbox)
- // keep going
- () => {}
- }
- }
- todo()
- } else {
- synchronized {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- }
- received = Some(qel.msg)
- senders = qel.session :: senders
- done = true
- }
- }
-
- val result = f(received.get)
- received = None
- senders = senders.tail
- result
- }
-
- /** See the companion object's `react` method. */
- override def react(handler: PartialFunction[Any, Unit]): Nothing = {
- synchronized {
- if (shouldExit) exit()
- }
- super.react(handler)
- }
-
- /** See the companion object's `reactWithin` method. */
- override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
- synchronized {
- if (shouldExit) exit()
- }
- super.reactWithin(msec)(handler)
- }
-
- /** Receives the next message from the mailbox */
- def ? : Any = receive {
- case x => x
- }
-
- // guarded by lock of this
- // never throws SuspendActorControl
- private[actors] override def scheduleActor(f: PartialFunction[Any, Any], msg: Any) =
- if (f eq null) {
- // do nothing (timeout is handled instead)
- }
- else {
- val task = new ActorTask(this, null, f, msg)
- scheduler executeFromActor task
- }
-
- /* Used for notifying scheduler when blocking inside receive/receiveWithin. */
- private object blocker extends scala.concurrent.ManagedBlocker {
- def block() = {
- Actor.this.suspendActor()
- true
- }
- def isReleasable =
- !Actor.this.isSuspended
- }
-
- private def suspendActor() = synchronized {
- while (isSuspended) {
- try {
- wait()
- } catch {
- case _: InterruptedException =>
- }
- }
- // links: check if we should exit
- if (shouldExit) exit()
- }
-
- private def resumeActor() {
- isSuspended = false
- notify()
- }
-
- private[actors] override def exiting = synchronized {
- _state == Actor.State.Terminated
- }
-
- // guarded by this
- private[actors] override def dostart() {
- // Reset various flags.
- //
- // Note that we do *not* reset `trapExit`. The reason is that
- // users should be able to set the field in the constructor
- // and before `act` is called.
- exitReason = 'normal
- shouldExit = false
-
- super.dostart()
- }
+trait Actor extends InternalActor with ReplyReactor {
override def start(): Actor = synchronized {
super.start()
this
}
- /** State of this actor */
- override def getState: Actor.State.Value = synchronized {
- if (isSuspended) {
- if (onTimeout.isEmpty)
- Actor.State.Blocked
- else
- Actor.State.TimedBlocked
- } else
- super.getState
- }
-
- // guarded by this
- private[actors] var links: List[AbstractActor] = Nil
-
- /**
- * Links `self` to actor `to`.
- *
- * @param to the actor to link to
- * @return the parameter actor
- */
- def link(to: AbstractActor): AbstractActor = {
- assert(Actor.self(scheduler) == this, "link called on actor different from self")
- this linkTo to
- to linkTo this
- to
- }
-
- /**
- * Links `self` to the actor defined by `body`.
- *
- * @param body the body of the actor to link to
- * @return the parameter actor
- */
- def link(body: => Unit): Actor = {
- assert(Actor.self(scheduler) == this, "link called on actor different from self")
- val a = new Actor {
- def act() = body
- override final val scheduler: IScheduler = Actor.this.scheduler
- }
- link(a)
- a.start()
- a
- }
-
- private[actors] def linkTo(to: AbstractActor) = synchronized {
- links = to :: links
- }
-
- /**
- * Unlinks `self` from actor `from`.
- */
- def unlink(from: AbstractActor) {
- assert(Actor.self(scheduler) == this, "unlink called on actor different from self")
- this unlinkFrom from
- from unlinkFrom this
- }
-
- private[actors] def unlinkFrom(from: AbstractActor) = synchronized {
- links = links.filterNot(from.==)
- }
-
- @volatile
- var trapExit = false
- // guarded by this
- private var exitReason: AnyRef = 'normal
- // guarded by this
- private[actors] var shouldExit = false
-
- /**
- * Terminates execution of `self` with the following effect on
- * linked actors:
- *
- * For each linked actor `a` with `trapExit` set to `'''true'''`,
- * send message `Exit(self, reason)` to `a`.
- *
- * For each linked actor `a` with `trapExit` set to `'''false'''`
- * (default), call `a.exit(reason)` if `reason != 'normal`.
- */
- protected[actors] def exit(reason: AnyRef): Nothing = {
- synchronized {
- exitReason = reason
- }
- exit()
- }
-
- /**
- * Terminates with exit reason `'normal`.
- */
- protected[actors] override def exit(): Nothing = {
- val todo = synchronized {
- if (!links.isEmpty)
- exitLinked()
- else
- () => {}
- }
- todo()
- super.exit()
- }
-
- // Assume !links.isEmpty
- // guarded by this
- private[actors] def exitLinked(): () => Unit = {
- _state = Actor.State.Terminated
- // reset waitingFor, otherwise getState returns Suspended
- waitingFor = Reactor.waitingForNone
- // remove this from links
- val mylinks = links.filterNot(this.==)
- // unlink actors
- mylinks.foreach(unlinkFrom(_))
- // return closure that locks linked actors
- () => {
- mylinks.foreach((linked: AbstractActor) => {
- linked.synchronized {
- if (!linked.exiting) {
- linked.unlinkFrom(this)
- linked.exit(this, exitReason)
- }
- }
- })
- }
- }
-
- // Assume !links.isEmpty
- // guarded by this
- private[actors] def exitLinked(reason: AnyRef): () => Unit = {
- exitReason = reason
- exitLinked()
- }
-
- // Assume !this.exiting
- private[actors] def exit(from: AbstractActor, reason: AnyRef) {
- if (trapExit) {
- this ! Exit(from, reason)
- }
- else if (reason != 'normal)
- synchronized {
- shouldExit = true
- exitReason = reason
- // resume this Actor in a way that
- // causes it to exit
- // (because shouldExit == true)
- if (isSuspended)
- resumeActor()
- else if (waitingFor ne Reactor.waitingForNone) {
- waitingFor = Reactor.waitingForNone
- // it doesn't matter what partial function we are passing here
- scheduleActor(waitingFor, null)
- /* Here we should not throw a SuspendActorControl,
- since the current method is called from an actor that
- is in the process of exiting.
-
- Therefore, the contract for scheduleActor is that
- it never throws a SuspendActorControl.
- */
- }
- }
- }
-
- /** Requires qualified private, because `RemoteActor` must
- * register a termination handler.
- */
- private[actors] def onTerminate(f: => Unit) {
- scheduler.onTerminate(this) { f }
- }
}
-
-/**
- * Used as the timeout pattern in
- * <a href="Actor.html#receiveWithin(Long)" target="contentFrame">
- * <code>receiveWithin</code></a> and
- * <a href="Actor.html#reactWithin(Long)" target="contentFrame">
- * <code>reactWithin</code></a>.
- *
- * @example {{{
- * receiveWithin(500) {
- * case (x, y) => ...
- * case TIMEOUT => ...
- * }
- * }}}
- *
- * @author Philipp Haller
- */
-case object TIMEOUT
-
-
-/** Sent to an actor with `trapExit` set to `'''true'''` whenever one of its
- * linked actors terminates.
- *
- * @param from the actor that terminated
- * @param reason the reason that caused the actor to terminate
- */
-case class Exit(from: AbstractActor, reason: AnyRef)
-
-/** Manages control flow of actor executions.
- *
- * @author Philipp Haller
- */
-private[actors] class SuspendActorControl extends ControlThrowable
diff --git a/src/actors/scala/actors/ActorCanReply.scala b/src/actors/scala/actors/ActorCanReply.scala
index b307aafa57..d92fb183c0 100644
--- a/src/actors/scala/actors/ActorCanReply.scala
+++ b/src/actors/scala/actors/ActorCanReply.scala
@@ -18,7 +18,7 @@ import scala.concurrent.SyncVar
* @author Philipp Haller
*/
private[actors] trait ActorCanReply extends ReactorCanReply {
- this: AbstractActor with ReplyReactor =>
+ this: AbstractActor with InternalReplyReactor =>
override def !?(msg: Any): Any = {
val replyCh = new Channel[Any](Actor.self(scheduler))
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index 090d0448f0..bb04302238 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -17,7 +17,7 @@ package scala.actors
* changes to the underlying var invisible.) I can't figure out what's supposed
* to happen, so I renamed the constructor parameter to at least be less confusing.
*/
-private[actors] class ActorTask(actor: Actor,
+private[actors] class ActorTask(actor: InternalActor,
fun: () => Unit,
handler: PartialFunction[Any, Any],
initialMsg: Any)
@@ -32,7 +32,7 @@ private[actors] class ActorTask(actor: Actor,
}
protected override def terminateExecution(e: Throwable) {
- val senderInfo = try { Some(actor.sender) } catch {
+ val senderInfo = try { Some(actor.internalSender) } catch {
case _: Exception => None
}
// !!! If this is supposed to be setting the current contents of the
@@ -45,13 +45,16 @@ private[actors] class ActorTask(actor: Actor,
e)
val todo = actor.synchronized {
- if (!actor.links.isEmpty)
+ val res = if (!actor.links.isEmpty)
actor.exitLinked(uncaught)
else {
super.terminateExecution(e)
() => {}
}
+ actor.internalPostStop
+ res
}
+
todo()
}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 62331239e8..36cee66b42 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -34,7 +34,7 @@ case class ! [a](ch: Channel[a], msg: a)
* @define actor channel
* @define channel channel
*/
-class Channel[Msg](val receiver: Actor) extends InputChannel[Msg] with OutputChannel[Msg] with CanReply[Msg, Any] {
+class Channel[Msg](val receiver: InternalActor) extends InputChannel[Msg] with OutputChannel[Msg] with CanReply[Msg, Any] {
type Future[+P] = scala.actors.Future[P]
diff --git a/src/actors/scala/actors/Combinators.scala b/src/actors/scala/actors/Combinators.scala
index 5276c7843e..c1a9095614 100644
--- a/src/actors/scala/actors/Combinators.scala
+++ b/src/actors/scala/actors/Combinators.scala
@@ -16,7 +16,7 @@ private[actors] trait Combinators {
* Enables the composition of suspendable closures using `andThen`,
* `loop`, `loopWhile`, etc.
*/
- implicit def mkBody[a](body: => a): Actor.Body[a]
+ implicit def mkBody[a](body: => a): InternalActor.Body[a]
/**
* Repeatedly executes `body`.
diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala
new file mode 100644
index 0000000000..c94da5b9fd
--- /dev/null
+++ b/src/actors/scala/actors/InternalActor.scala
@@ -0,0 +1,509 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+package scala.actors
+import java.util.TimerTask
+import scala.util.control.ControlThrowable
+
+private[actors] object InternalActor {
+ private[actors] trait Body[a] {
+ def andThen[b](other: => b): Unit
+ }
+}
+
+private[actors] trait InternalActor extends AbstractActor with InternalReplyReactor with ActorCanReply with InputChannel[Any] with Serializable {
+
+ /* The following two fields are only used when the actor
+ * suspends by blocking its underlying thread, for example,
+ * when waiting in a receive or synchronous send.
+ */
+ @volatile
+ private[actors] var isSuspended = false
+
+ /* This field is used to communicate the received message from
+ * the invocation of send to the place where the thread of
+ * the receiving actor resumes inside receive/receiveWithin.
+ */
+ @volatile
+ private var received: Option[Any] = None
+
+ protected[actors] override def scheduler: IScheduler = Scheduler
+
+ private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
+ if (isSuspended) {
+ () =>
+ synchronized {
+ mailbox.append(msg, replyTo)
+ resumeActor()
+ }
+ } else super.startSearch(msg, replyTo, handler)
+
+ // we override this method to check `shouldExit` before suspending
+ private[actors] override def searchMailbox(startMbox: MQueue[Any],
+ handler: PartialFunction[Any, Any],
+ resumeOnSameThread: Boolean) {
+ var tmpMbox = startMbox
+ var done = false
+ while (!done) {
+ val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
+ senders = List(replyTo)
+ handler.isDefinedAt(msg)
+ })
+ if (tmpMbox ne mailbox)
+ tmpMbox.foreach((m, s) => mailbox.append(m, s))
+ if (null eq qel) {
+ synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ tmpMbox = new MQueue[Any]("Temp")
+ drainSendBuffer(tmpMbox)
+ // keep going
+ } else {
+ // very important to check for `shouldExit` at this point
+ // since linked actors might have set it after we checked
+ // last time (e.g., at the beginning of `react`)
+ if (shouldExit) exit()
+ waitingFor = handler
+ // see Reactor.searchMailbox
+ throw Actor.suspendException
+ }
+ }
+ } else {
+ resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
+ done = true
+ }
+ }
+ }
+
+ private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
+ new ActorTask(this, fun, handler, msg)
+
+ /** See the companion object's `receive` method. */
+ def receive[R](f: PartialFunction[Any, R]): R = {
+ assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
+
+ synchronized {
+ if (shouldExit) exit() // links
+ drainSendBuffer(mailbox)
+ }
+
+ var done = false
+ while (!done) {
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
+ senders = replyTo :: senders
+ val matches = f.isDefinedAt(m)
+ senders = senders.tail
+ matches
+ })
+ if (null eq qel) {
+ synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer(mailbox)
+ // keep going
+ } else {
+ waitingFor = f
+ isSuspended = true
+ scheduler.managedBlock(blocker)
+ drainSendBuffer(mailbox)
+ // keep going
+ }
+ }
+ } else {
+ received = Some(qel.msg)
+ senders = qel.session :: senders
+ done = true
+ }
+ }
+
+ val result = f(received.get)
+ received = None
+ senders = senders.tail
+ result
+ }
+
+ /** See the companion object's `receiveWithin` method. */
+ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
+ assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
+
+ synchronized {
+ if (shouldExit) exit() // links
+ drainSendBuffer(mailbox)
+ }
+
+ // first, remove spurious TIMEOUT message from mailbox if any
+ mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
+
+ val receiveTimeout = () => {
+ if (f.isDefinedAt(TIMEOUT)) {
+ received = Some(TIMEOUT)
+ senders = this :: senders
+ } else
+ sys.error("unhandled timeout")
+ }
+
+ var done = false
+ while (!done) {
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
+ senders = replyTo :: senders
+ val matches = f.isDefinedAt(m)
+ senders = senders.tail
+ matches
+ })
+ if (null eq qel) {
+ val todo = synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer(mailbox)
+ // keep going
+ () => {}
+ } else if (msec == 0L) {
+ done = true
+ receiveTimeout
+ } else {
+ if (onTimeout.isEmpty) {
+ if (!f.isDefinedAt(TIMEOUT))
+ sys.error("unhandled timeout")
+
+ val thisActor = this
+ onTimeout = Some(new TimerTask {
+ def run() {
+ thisActor.send(TIMEOUT, thisActor)
+ }
+ })
+ Actor.timer.schedule(onTimeout.get, msec)
+ }
+
+ // It is possible that !onTimeout.isEmpty, but TIMEOUT is not yet in mailbox
+ // See SI-4759
+ waitingFor = f
+ received = None
+ isSuspended = true
+ scheduler.managedBlock(blocker)
+ drainSendBuffer(mailbox)
+ // keep going
+ () => {}
+ }
+ }
+ todo()
+ } else {
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
+ }
+ received = Some(qel.msg)
+ senders = qel.session :: senders
+ done = true
+ }
+ }
+
+ val result = f(received.get)
+ received = None
+ senders = senders.tail
+ result
+ }
+
+ /** See the companion object's `react` method. */
+ override def react(handler: PartialFunction[Any, Unit]): Nothing = {
+ synchronized {
+ if (shouldExit) exit()
+ }
+ super.react(handler)
+ }
+
+ /** See the companion object's `reactWithin` method. */
+ override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
+ synchronized {
+ if (shouldExit) exit()
+ }
+ super.reactWithin(msec)(handler)
+ }
+
+ /** Receives the next message from the mailbox */
+ def ? : Any = receive {
+ case x => x
+ }
+
+ // guarded by lock of this
+ // never throws SuspendActorControl
+ private[actors] override def scheduleActor(f: PartialFunction[Any, Any], msg: Any) =
+ if (f eq null) {
+ // do nothing (timeout is handled instead)
+ } else {
+ val task = new ActorTask(this, null, f, msg)
+ scheduler executeFromActor task
+ }
+
+ /* Used for notifying scheduler when blocking inside receive/receiveWithin. */
+ private object blocker extends scala.concurrent.ManagedBlocker {
+ def block() = {
+ InternalActor.this.suspendActor()
+ true
+ }
+ def isReleasable =
+ !InternalActor.this.isSuspended
+ }
+
+ private def suspendActor() = synchronized {
+ while (isSuspended) {
+ try {
+ wait()
+ } catch {
+ case _: InterruptedException =>
+ }
+ }
+ // links: check if we should exit
+ if (shouldExit) exit()
+ }
+
+ private def resumeActor() {
+ isSuspended = false
+ notify()
+ }
+
+ private[actors] override def exiting = synchronized {
+ _state == Actor.State.Terminated
+ }
+
+ // guarded by this
+ private[actors] override def dostart() {
+ // Reset various flags.
+ //
+ // Note that we do *not* reset `trapExit`. The reason is that
+ // users should be able to set the field in the constructor
+ // and before `act` is called.
+ exitReason = 'normal
+ shouldExit = false
+
+ super.dostart()
+ }
+
+ override def start(): InternalActor = synchronized {
+ super.start()
+ this
+ }
+
+ /** State of this actor */
+ override def getState: Actor.State.Value = synchronized {
+ if (isSuspended) {
+ if (onTimeout.isEmpty)
+ Actor.State.Blocked
+ else
+ Actor.State.TimedBlocked
+ } else
+ super.getState
+ }
+
+ // guarded by this
+ private[actors] var links: List[AbstractActor] = Nil
+
+ /**
+ * Links <code>self</code> to actor <code>to</code>.
+ *
+ * @param to the actor to link to
+ * @return the parameter actor
+ */
+ def link(to: AbstractActor): AbstractActor = {
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
+ this linkTo to
+ to linkTo this
+ to
+ }
+
+ /**
+ * Links <code>self</code> to the actor defined by <code>body</code>.
+ *
+ * @param body the body of the actor to link to
+ * @return the parameter actor
+ */
+ def link(body: => Unit): Actor = {
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
+ val a = new Actor {
+ def act() = body
+ override final val scheduler: IScheduler = InternalActor.this.scheduler
+ }
+ link(a)
+ a.start()
+ a
+ }
+
+ private[actors] def linkTo(to: AbstractActor) = synchronized {
+ links = to :: links
+ }
+
+ /**
+ * Unlinks <code>self</code> from actor <code>from</code>.
+ */
+ def unlink(from: AbstractActor) {
+ assert(Actor.self(scheduler) == this, "unlink called on actor different from self")
+ this unlinkFrom from
+ from unlinkFrom this
+ }
+
+ private[actors] def unlinkFrom(from: AbstractActor) = synchronized {
+ links = links.filterNot(from.==)
+ }
+
+ @volatile
+ private[actors] var _trapExit = false
+
+ def trapExit = _trapExit
+
+ def trapExit_=(value: Boolean) = _trapExit = value
+
+ // guarded by this
+ private var exitReason: AnyRef = 'normal
+ // guarded by this
+ private[actors] var shouldExit = false
+
+ /**
+ * <p>
+ * Terminates execution of <code>self</code> with the following
+ * effect on linked actors:
+ * </p>
+ * <p>
+ * For each linked actor <code>a</code> with
+ * <code>trapExit</code> set to <code>true</code>, send message
+ * <code>Exit(self, reason)</code> to <code>a</code>.
+ * </p>
+ * <p>
+ * For each linked actor <code>a</code> with
+ * <code>trapExit</code> set to <code>false</code> (default),
+ * call <code>a.exit(reason)</code> if
+ * <code>reason != 'normal</code>.
+ * </p>
+ */
+ protected[actors] def exit(reason: AnyRef): Nothing = {
+ synchronized {
+ exitReason = reason
+ }
+ exit()
+ }
+
+ /**
+ * Terminates with exit reason <code>'normal</code>.
+ */
+ protected[actors] override def exit(): Nothing = {
+ val todo = synchronized {
+ if (!links.isEmpty)
+ exitLinked()
+ else
+ () => {}
+ }
+ todo()
+ super.exit()
+ }
+
+ // Assume !links.isEmpty
+ // guarded by this
+ private[actors] def exitLinked(): () => Unit = {
+ _state = Actor.State.Terminated
+ // reset waitingFor, otherwise getState returns Suspended
+ waitingFor = Reactor.waitingForNone
+ // remove this from links
+ val mylinks = links.filterNot(this.==)
+ // unlink actors
+ mylinks.foreach(unlinkFrom(_))
+ // return closure that locks linked actors
+ () => {
+ mylinks.foreach((linked: AbstractActor) => {
+ linked.synchronized {
+ if (!linked.exiting) {
+ linked.unlinkFrom(this)
+ linked.exit(this, exitReason)
+ }
+ }
+ })
+ }
+ }
+
+ // Assume !links.isEmpty
+ // guarded by this
+ private[actors] def exitLinked(reason: AnyRef): () => Unit = {
+ exitReason = reason
+ exitLinked()
+ }
+
+ // Assume !this.exiting
+ private[actors] def exit(from: AbstractActor, reason: AnyRef) {
+ if (trapExit) {
+ this ! Exit(from, reason)
+ } else if (reason != 'normal)
+ stop(reason)
+ }
+
+ /* Requires qualified private, because <code>RemoteActor</code> must
+ * register a termination handler.
+ */
+ private[actors] def onTerminate(f: => Unit) {
+ scheduler.onTerminate(this) { f }
+ }
+
+ private[actors] def internalPostStop() = {}
+
+ private[actors] def stop(reason: AnyRef): Unit = {
+ synchronized {
+ shouldExit = true
+ exitReason = reason
+ // resume this Actor in a way that
+ // causes it to exit
+ // (because shouldExit == true)
+ if (isSuspended)
+ resumeActor()
+ else if (waitingFor ne Reactor.waitingForNone) {
+ waitingFor = Reactor.waitingForNone
+ // it doesn't matter what partial function we are passing here
+ val task = new ActorTask(this, null, waitingFor, null)
+ scheduler execute task
+ /* Here we should not throw a SuspendActorControl,
+ since the current method is called from an actor that
+ is in the process of exiting.
+
+ Therefore, the contract for scheduleActor is that
+ it never throws a SuspendActorControl.
+ */
+ }
+ }
+ }
+}
+
+/**
+ * Used as the timeout pattern in
+ * <a href="Actor.html#receiveWithin(Long)" target="contentFrame">
+ * <code>receiveWithin</code></a> and
+ * <a href="Actor.html#reactWithin(Long)" target="contentFrame">
+ * <code>reactWithin</code></a>.
+ *
+ * @example {{{
+ * receiveWithin(500) {
+ * case (x, y) => ...
+ * case TIMEOUT => ...
+ * }
+ * }}}
+ *
+ * @author Philipp Haller
+ */
+case object TIMEOUT
+
+/**
+ * Sent to an actor
+ * with `trapExit` set to `true` whenever one of its linked actors
+ * terminates.
+ *
+ * @param from the actor that terminated
+ * @param reason the reason that caused the actor to terminate
+ */
+case class Exit(from: AbstractActor, reason: AnyRef)
+
+/**
+ * Manages control flow of actor executions.
+ *
+ * @author Philipp Haller
+ */
+private[actors] class SuspendActorControl extends ControlThrowable
diff --git a/src/actors/scala/actors/InternalReplyReactor.scala b/src/actors/scala/actors/InternalReplyReactor.scala
new file mode 100644
index 0000000000..38295138d4
--- /dev/null
+++ b/src/actors/scala/actors/InternalReplyReactor.scala
@@ -0,0 +1,161 @@
+package scala.actors
+
+import java.util.{TimerTask}
+
+/**
+ * Extends the [[scala.actors.Reactor]]
+ * trait with methods to reply to the sender of a message.
+ * Sending a message to a <code>ReplyReactor</code> implicitly
+ * passes a reference to the sender together with the message.
+ *
+ * @author Philipp Haller
+ *
+ * @define actor `ReplyReactor`
+ */
+trait InternalReplyReactor extends Reactor[Any] with ReactorCanReply {
+
+ /* A list of the current senders. The head of the list is
+ * the sender of the message that was received last.
+ */
+ @volatile
+ private[actors] var senders: List[OutputChannel[Any]] = List()
+
+ /* This option holds a TimerTask when the actor waits in a
+ * reactWithin. The TimerTask is cancelled when the actor
+ * resumes.
+ *
+ * guarded by this
+ */
+ private[actors] var onTimeout: Option[TimerTask] = None
+
+ /**
+ * Returns the $actor which sent the last received message.
+ */
+ protected[actors] def internalSender: OutputChannel[Any] = senders.head
+
+ /**
+ * Replies with <code>msg</code> to the sender.
+ */
+ protected[actors] def reply(msg: Any) {
+ internalSender ! msg
+ }
+
+ override def !(msg: Any) {
+ send(msg, Actor.rawSelf(scheduler))
+ }
+
+ override def forward(msg: Any) {
+ send(msg, Actor.sender)
+ }
+
+ private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
+ }
+ senders = List(item._2)
+ super.resumeReceiver(item, handler, onSameThread)
+ }
+
+ private[actors] override def searchMailbox(startMbox: MQueue[Any],
+ handler: PartialFunction[Any, Any],
+ resumeOnSameThread: Boolean) {
+ var tmpMbox = startMbox
+ var done = false
+ while (!done) {
+ val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
+ senders = List(replyTo)
+ handler.isDefinedAt(msg)
+ })
+ if (tmpMbox ne mailbox)
+ tmpMbox.foreach((m, s) => mailbox.append(m, s))
+ if (null eq qel) {
+ synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ tmpMbox = new MQueue[Any]("Temp")
+ drainSendBuffer(tmpMbox)
+ // keep going
+ } else {
+ waitingFor = handler
+ // see Reactor.searchMailbox
+ throw Actor.suspendException
+ }
+ }
+ } else {
+ resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
+ done = true
+ }
+ }
+ }
+
+ private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
+ new ReplyReactorTask(this, fun, handler, msg)
+
+ protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+ super.react(handler)
+ }
+
+
+ /**
+ * Receives a message from this $actor's mailbox within a certain
+ * time span.
+ *
+ * This method never returns. Therefore, the rest of the computation
+ * has to be contained in the actions of the partial function.
+ *
+ * @param msec the time span before timeout
+ * @param handler a partial function with message patterns and actions
+ */
+ protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+
+ synchronized { drainSendBuffer(mailbox) }
+
+ // first, remove spurious TIMEOUT message from mailbox if any
+ mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
+
+ while (true) {
+ val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
+ senders = List(replyTo)
+ handler isDefinedAt m
+ })
+ if (null eq qel) {
+ synchronized {
+ // in mean time new messages might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer(mailbox)
+ // keep going
+ } else if (msec == 0L) {
+ // throws Actor.suspendException
+ resumeReceiver((TIMEOUT, this), handler, false)
+ } else {
+ waitingFor = handler
+ val thisActor = this
+ onTimeout = Some(new TimerTask {
+ def run() { thisActor.send(TIMEOUT, thisActor) }
+ })
+ Actor.timer.schedule(onTimeout.get, msec)
+ throw Actor.suspendException
+ }
+ }
+ } else
+ resumeReceiver((qel.msg, qel.session), handler, false)
+ }
+ throw Actor.suspendException
+ }
+
+ override def getState: Actor.State.Value = synchronized {
+ if (waitingFor ne Reactor.waitingForNone) {
+ if (onTimeout.isEmpty)
+ Actor.State.Suspended
+ else
+ Actor.State.TimedSuspended
+ } else
+ _state
+ }
+
+}
diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala
index 089b3d0981..1fba684975 100644
--- a/src/actors/scala/actors/OutputChannel.scala
+++ b/src/actors/scala/actors/OutputChannel.scala
@@ -43,5 +43,5 @@ trait OutputChannel[-Msg] {
/**
* Returns the `Actor` that is receiving from this $actor.
*/
- def receiver: Actor
+ def receiver: InternalActor
}
diff --git a/src/actors/scala/actors/ReactChannel.scala b/src/actors/scala/actors/ReactChannel.scala
index fccde34272..81a166c1a4 100644
--- a/src/actors/scala/actors/ReactChannel.scala
+++ b/src/actors/scala/actors/ReactChannel.scala
@@ -12,7 +12,7 @@ package scala.actors
/**
* @author Philipp Haller
*/
-private[actors] class ReactChannel[Msg](receiver: ReplyReactor) extends InputChannel[Msg] {
+private[actors] class ReactChannel[Msg](receiver: InternalReplyReactor) extends InputChannel[Msg] {
private case class SendToReactor(channel: ReactChannel[Msg], msg: Msg)
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 7d21e9f91e..8fc7578344 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -253,7 +253,7 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
_state
}
- implicit def mkBody[A](body: => A) = new Actor.Body[A] {
+ implicit def mkBody[A](body: => A) = new InternalActor.Body[A] {
def andThen[B](other: => B): Unit = Reactor.this.seq(body, other)
}
diff --git a/src/actors/scala/actors/ReactorCanReply.scala b/src/actors/scala/actors/ReactorCanReply.scala
index 68f9999776..dabd0832f0 100644
--- a/src/actors/scala/actors/ReactorCanReply.scala
+++ b/src/actors/scala/actors/ReactorCanReply.scala
@@ -16,7 +16,7 @@ package scala.actors
* @author Philipp Haller
*/
private[actors] trait ReactorCanReply extends CanReply[Any, Any] {
- _: ReplyReactor =>
+ _: InternalReplyReactor =>
type Future[+P] = scala.actors.Future[P]
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 0e5ce00c91..0ffbbd3cce 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -5,165 +5,12 @@
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
-
package scala.actors
-import java.util.{Timer, TimerTask}
-
-/**
- * Extends the [[scala.actors.Reactor]] trait with methods to reply to the
- * sender of a message.
- *
- * Sending a message to a `ReplyReactor` implicitly passes a reference to
- * the sender together with the message.
- *
- * @author Philipp Haller
- *
- * @define actor `ReplyReactor`
- */
-trait ReplyReactor extends Reactor[Any] with ReactorCanReply {
-
- /* A list of the current senders. The head of the list is
- * the sender of the message that was received last.
- */
- @volatile
- private[actors] var senders: List[OutputChannel[Any]] = List()
-
- /* This option holds a TimerTask when the actor waits in a
- * reactWithin. The TimerTask is cancelled when the actor
- * resumes.
- *
- * guarded by this
- */
- private[actors] var onTimeout: Option[TimerTask] = None
-
- /**
- * Returns the $actor which sent the last received message.
- */
- protected[actors] def sender: OutputChannel[Any] = senders.head
-
- /**
- * Replies with `msg` to the sender.
- */
- protected[actors] def reply(msg: Any) {
- sender ! msg
- }
-
- override def !(msg: Any) {
- send(msg, Actor.rawSelf(scheduler))
- }
-
- override def forward(msg: Any) {
- send(msg, Actor.sender)
- }
-
- private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
- synchronized {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- }
- senders = List(item._2)
- super.resumeReceiver(item, handler, onSameThread)
- }
-
- private[actors] override def searchMailbox(startMbox: MQueue[Any],
- handler: PartialFunction[Any, Any],
- resumeOnSameThread: Boolean) {
- var tmpMbox = startMbox
- var done = false
- while (!done) {
- val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- handler.isDefinedAt(msg)
- })
- if (tmpMbox ne mailbox)
- tmpMbox.foreach((m, s) => mailbox.append(m, s))
- if (null eq qel) {
- synchronized {
- // in mean time new stuff might have arrived
- if (!sendBuffer.isEmpty) {
- tmpMbox = new MQueue[Any]("Temp")
- drainSendBuffer(tmpMbox)
- // keep going
- } else {
- waitingFor = handler
- // see Reactor.searchMailbox
- throw Actor.suspendException
- }
- }
- } else {
- resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
- done = true
- }
- }
- }
-
- private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
- new ReplyReactorTask(this, fun, handler, msg)
-
- protected[actors] override def react(handler: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
- super.react(handler)
- }
-
- /**
- * Receives a message from this $actor's mailbox within a certain
- * time span.
- *
- * This method never returns. Therefore, the rest of the computation
- * has to be contained in the actions of the partial function.
- *
- * @param msec the time span before timeout
- * @param handler a partial function with message patterns and actions
- */
- protected[actors] def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
-
- synchronized { drainSendBuffer(mailbox) }
-
- // first, remove spurious TIMEOUT message from mailbox if any
- mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)
-
- while (true) {
- val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
- senders = List(replyTo)
- handler isDefinedAt m
- })
- if (null eq qel) {
- synchronized {
- // in mean time new messages might have arrived
- if (!sendBuffer.isEmpty) {
- drainSendBuffer(mailbox)
- // keep going
- } else if (msec == 0L) {
- // throws Actor.suspendException
- resumeReceiver((TIMEOUT, this), handler, false)
- } else {
- waitingFor = handler
- val thisActor = this
- onTimeout = Some(new TimerTask {
- def run() { thisActor.send(TIMEOUT, thisActor) }
- })
- Actor.timer.schedule(onTimeout.get, msec)
- throw Actor.suspendException
- }
- }
- } else
- resumeReceiver((qel.msg, qel.session), handler, false)
- }
- throw Actor.suspendException
- }
-
- override def getState: Actor.State.Value = synchronized {
- if (waitingFor ne Reactor.waitingForNone) {
- if (onTimeout.isEmpty)
- Actor.State.Suspended
- else
- Actor.State.TimedSuspended
- } else
- _state
- }
-
+@deprecated("Scala Actors are beeing removed from the standard library. Please refer to the migration guide.", "2.10")
+trait ReplyReactor extends InternalReplyReactor {
+
+ protected[actors] def sender: OutputChannel[Any] = super.internalSender
+
}
+
diff --git a/src/actors/scala/actors/ReplyReactorTask.scala b/src/actors/scala/actors/ReplyReactorTask.scala
index cb63d7e000..d38eb50381 100644
--- a/src/actors/scala/actors/ReplyReactorTask.scala
+++ b/src/actors/scala/actors/ReplyReactorTask.scala
@@ -17,13 +17,13 @@ package scala.actors
* changes to the underlying var invisible.) I can't figure out what's supposed
* to happen, so I renamed the constructor parameter to at least be less confusing.
*/
-private[actors] class ReplyReactorTask(replyReactor: ReplyReactor,
+private[actors] class ReplyReactorTask(replyReactor: InternalReplyReactor,
fun: () => Unit,
handler: PartialFunction[Any, Any],
msg: Any)
extends ReactorTask(replyReactor, fun, handler, msg) {
- var saved: ReplyReactor = _
+ var saved: InternalReplyReactor = _
protected override def beginExecution() {
saved = Actor.tl.get
diff --git a/src/actors/scala/actors/UncaughtException.scala b/src/actors/scala/actors/UncaughtException.scala
index 3e6efe3b7c..a3e7f795f1 100644
--- a/src/actors/scala/actors/UncaughtException.scala
+++ b/src/actors/scala/actors/UncaughtException.scala
@@ -20,7 +20,7 @@ package scala.actors
* @author Philipp Haller
* @author Erik Engbrecht
*/
-case class UncaughtException(actor: Actor,
+case class UncaughtException(actor: InternalActor,
message: Option[Any],
sender: Option[OutputChannel[Any]],
thread: Thread,