summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/ActorRef.scala119
-rw-r--r--src/actors/scala/actors/ActorTask.scala1
-rw-r--r--src/actors/scala/actors/InternalActor.scala53
-rw-r--r--src/actors/scala/actors/MQueue.scala14
-rw-r--r--src/actors/scala/actors/Reactor.scala10
5 files changed, 186 insertions, 11 deletions
diff --git a/src/actors/scala/actors/ActorRef.scala b/src/actors/scala/actors/ActorRef.scala
new file mode 100644
index 0000000000..8f70b13e01
--- /dev/null
+++ b/src/actors/scala/actors/ActorRef.scala
@@ -0,0 +1,119 @@
+package scala.actors
+
+import java.util.concurrent.TimeoutException
+import scala.concurrent.util.Duration
+
+/**
+ * Trait used for migration of Scala actors to Akka.
+ */
+@deprecated("ActorRef ought to be used only with the Actor Migration Kit.")
+trait ActorRef {
+
+ /**
+ * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
+ * <p/>
+ *
+ * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
+ * <p/>
+ *
+ * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
+ * if invoked from within an Actor. If not then no sender is available.
+ * <pre>
+ * actor ! message
+ * </pre>
+ * <p/>
+ */
+ def !(message: Any)(implicit sender: ActorRef = null): Unit
+
+ /**
+ * Sends a message asynchronously, returning a future which may eventually hold the reply.
+ */
+ private[actors] def ?(message: Any, timeout: Duration): Future[Any]
+
+ /**
+ * Forwards the message and passes the original sender actor as the sender.
+ * <p/>
+ * Works with '!' and '?'.
+ */
+ def forward(message: Any)
+
+ private[actors] def localActor: AbstractActor
+
+}
+
+private[actors] class OutputChannelRef(val actor: OutputChannel[Any]) extends ActorRef {
+
+ override private[actors] def ?(message: Any, timeout: Duration): Future[Any] =
+ throw new UnsupportedOperationException("Output channel does not support ?")
+
+ /**
+ * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
+ * <p/>
+ *
+ * <p/>
+ * <pre>
+ * actor ! message
+ * </pre>
+ * <p/>
+ */
+ def !(message: Any)(implicit sender: ActorRef = null): Unit =
+ if (sender != null)
+ actor.send(message, sender.localActor)
+ else
+ actor ! message
+
+ override def equals(that: Any) =
+ that.isInstanceOf[OutputChannelRef] && that.asInstanceOf[OutputChannelRef].actor == this.actor
+
+ private[actors] override def localActor: AbstractActor =
+ throw new UnsupportedOperationException("Output channel does not have an instance of the actor")
+
+ def forward(message: Any): Unit = throw new UnsupportedOperationException("OutputChannel does not support forward.")
+
+}
+
+private[actors] class ReactorRef(override val actor: Reactor[Any]) extends OutputChannelRef(actor) {
+
+ /**
+ * Forwards the message and passes the original sender actor as the sender.
+ * <p/>
+ * Works with '!' and '?'.
+ */
+ override def forward(message: Any) = actor.forward(message)
+
+}
+
+private[actors] final class InternalActorRef(override val actor: InternalActor) extends ReactorRef(actor) {
+
+ /**
+ * Sends a message asynchronously, returning a future which may eventually hold the reply.
+ */
+ override private[actors] def ?(message: Any, timeout: Duration): Future[Any] =
+ Futures.future {
+ val dur = if (timeout.isFinite()) timeout.toMillis else (java.lang.Long.MAX_VALUE >> 2)
+ actor !? (dur, message) match {
+ case Some(x) => x
+ case None => new AskTimeoutException("? operation timed out.")
+ }
+ }
+
+ override def !(message: Any)(implicit sender: ActorRef = null): Unit =
+ if (message == PoisonPill)
+ actor.stop('normal)
+ else if (sender != null)
+ actor.send(message, sender.localActor)
+ else
+ actor ! message
+
+ private[actors] override def localActor: InternalActor = this.actor
+}
+
+/**
+ * This is what is used to complete a Future that is returned from an ask/? call,
+ * when it times out.
+ */
+class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
+ def this(message: String) = this(message, null: Throwable)
+}
+
+object PoisonPill
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index bb04302238..045b00f5f2 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -51,7 +51,6 @@ private[actors] class ActorTask(actor: InternalActor,
super.terminateExecution(e)
() => {}
}
- actor.internalPostStop
res
}
diff --git a/src/actors/scala/actors/InternalActor.scala b/src/actors/scala/actors/InternalActor.scala
index c94da5b9fd..cb66021d1c 100644
--- a/src/actors/scala/actors/InternalActor.scala
+++ b/src/actors/scala/actors/InternalActor.scala
@@ -153,7 +153,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
val matches = f.isDefinedAt(m)
senders = senders.tail
matches
- })
+ })
if (null eq qel) {
val todo = synchronized {
// in mean time new stuff might have arrived
@@ -317,6 +317,35 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
}
/**
+ * Links <code>self</code> to actor <code>to</code>.
+ *
+ * @param to the actor to link to
+ * @return the parameter actor
+ */
+ def link(to: ActorRef): ActorRef = {
+ this.link(to.localActor)
+ to
+ }
+
+ /**
+ * Unidirectional linking. For migration purposes only
+ */
+ private[actors] def watch(subject: ActorRef): ActorRef = {
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
+ subject.localActor linkTo this
+ subject
+ }
+
+ /**
+ * Unidirectional linking. For migration purposes only
+ */
+ private[actors] def unwatch(subject: ActorRef): ActorRef = {
+ assert(Actor.self(scheduler) == this, "link called on actor different from self")
+ subject.localActor unlinkFrom this
+ subject
+ }
+
+ /**
* Links <code>self</code> to the actor defined by <code>body</code>.
*
* @param body the body of the actor to link to
@@ -346,17 +375,24 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
from unlinkFrom this
}
+ /**
+ * Unlinks <code>self</code> from actor <code>from</code>.
+ */
+ def unlink(from: ActorRef) {
+ unlink(from.localActor)
+ }
+
private[actors] def unlinkFrom(from: AbstractActor) = synchronized {
links = links.filterNot(from.==)
}
- @volatile
+ @volatile
private[actors] var _trapExit = false
-
+
def trapExit = _trapExit
-
+
def trapExit_=(value: Boolean) = _trapExit = value
-
+
// guarded by this
private var exitReason: AnyRef = 'normal
// guarded by this
@@ -445,12 +481,11 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
scheduler.onTerminate(this) { f }
}
- private[actors] def internalPostStop() = {}
- private[actors] def stop(reason: AnyRef): Unit = {
+ private[actors] def stop(reason: AnyRef): Unit = {
synchronized {
shouldExit = true
- exitReason = reason
+ exitReason = reason
// resume this Actor in a way that
// causes it to exit
// (because shouldExit == true)
@@ -464,7 +499,7 @@ private[actors] trait InternalActor extends AbstractActor with InternalReplyReac
/* Here we should not throw a SuspendActorControl,
since the current method is called from an actor that
is in the process of exiting.
-
+
Therefore, the contract for scheduleActor is that
it never throws a SuspendActorControl.
*/
diff --git a/src/actors/scala/actors/MQueue.scala b/src/actors/scala/actors/MQueue.scala
index 65427d68c5..4a148d2cb3 100644
--- a/src/actors/scala/actors/MQueue.scala
+++ b/src/actors/scala/actors/MQueue.scala
@@ -25,6 +25,20 @@ private[actors] class MQueue[Msg >: Null](protected val label: String) {
_size += diff
}
+ def prepend(other: MQueue[Msg]) {
+ if (!other.isEmpty) {
+ other.last.next = first
+ first = other.first
+ }
+ }
+
+ def clear() {
+ first = null
+ last = null
+ _size = 0
+ }
+
+
def append(msg: Msg, session: OutputChannel[Any]) {
changeSize(1) // size always increases by 1
val el = new MQueueElement(msg, session)
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 206a97d97c..7a8d738758 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -214,11 +214,16 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
scheduler executeFromActor makeReaction(null, handler, msg)
}
+ private[actors] def preAct() = {}
+
// guarded by this
private[actors] def dostart() {
_state = Actor.State.Runnable
scheduler newActor this
- scheduler execute makeReaction(() => act(), null, null)
+ scheduler execute makeReaction(() => {
+ preAct()
+ act()
+ }, null, null)
}
/**
@@ -285,12 +290,15 @@ trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators {
throw Actor.suspendException
}
+ private[actors] def internalPostStop() = {}
+
private[actors] def terminated() {
synchronized {
_state = Actor.State.Terminated
// reset waitingFor, otherwise getState returns Suspended
waitingFor = Reactor.waitingForNone
}
+ internalPostStop()
scheduler.terminated(this)
}