/** Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import akka.dispatch._
import akka.config.Config._
import akka.util.Helpers.{ narrow, narrowSilently }
import akka.util.ListenerManagement
import akka.AkkaException
import scala.beans.BeanProperty
import scala.reflect.{ ClassTag, classTag }
import akka.util.{ ReflectiveAccess, Duration }
import akka.remoteinterface.RemoteSupport
import akka.japi.{ Creator, Procedure }
import java.lang.reflect.InvocationTargetException
/** Base type of the actor life-cycle messages declared in this file.
*
* The trait is sealed, so every subtype lives in this source file, and it extends
* `Serializable`.
*/
sealed trait LifeCycleMessage extends Serializable
/* Marker trait to show which messages are automatically handled by Akka.
* `Actor.apply` routes any `AutoReceivedMessage` to `autoReceiveMessage` instead of the
* user-defined behavior. The self-type forces every implementor to also be a `LifeCycleMessage`.
*/
sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
/** Auto-received message asking an actor to replace its current behavior.
  *
  * When received, the actor calls `become(code(self), discardOld)`.
  *
  * @param code       builds the new `Receive` from the receiving actor's `ActorRef`
  * @param discardOld forwarded to `become`; when true the current top behavior is dropped
  *                   before the new one is pushed
  */
case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
  extends AutoReceivedMessage with LifeCycleMessage {

  /** Java API: adapts a `Function[ActorRef, Procedure[Any]]` into the Scala representation.
    *
    * The resulting partial function is defined for every message and simply forwards it
    * to the procedure produced by `code`.
    */
  def this(code: akka.japi.Function[ActorRef, Procedure[Any]], discardOld: Boolean) =
    this((actor: ActorRef) => {
      // Resolve the Java procedure once per swap, then wrap it as a catch-all Receive.
      val procedure = code(actor)
      val asReceive: Actor.Receive = { case message => procedure(message) }
      asReceive
    }, discardOld)

  /** Java API with default non-stacking behavior (`discardOld = true`).
    */
  def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
}
/** Auto-received: the actor reverts to the previous behavior by calling `unbecome()`. */
case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: the actor rethrows `reason` from its message handler. */
case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: forwarded to `self.handleTrapExit(dead, killer)`. */
case class Exit(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: the actor calls `self.link(child)`. */
case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: the actor calls `self.unlink(child)`. */
case class Unlink(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: the actor calls `self.unlink(child)` and then `child.stop()`. */
case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: the actor stops itself; a pending sender future, if any, is completed
* with an `ActorKilledException("PoisonPill")`.
*/
case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage
/** Auto-received: the actor throws an `ActorKilledException("Kill")` from its message handler. */
case object Kill extends AutoReceivedMessage with LifeCycleMessage
/** Life-cycle message that is NOT auto-received, so it is dispatched to the actor's own behavior.
* NOTE(review): presumably sent when the actor's receive timeout elapses - confirm against the sender.
*/
case object ReceiveTimeout extends LifeCycleMessage
/** Life-cycle message describing an actor that exhausted its restart budget.
*
* It is not an `AutoReceivedMessage`, so it reaches the receiving actor's own behavior.
* Every field carries `@BeanProperty`, which generates Java-style getters.
* NOTE(review): presumably delivered to the supervisor of `victim`; the unit of
* `withinTimeRange` is not visible in this file - confirm both against the sender.
*/
case class MaximumNumberOfRestartsWithinTimeRangeReached(
@BeanProperty val victim: ActorRef,
@BeanProperty val maxNrOfRetries: Option[Int],
@BeanProperty val withinTimeRange: Option[Int],
@BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
// Exceptions for Actors.
// All of them extend AkkaException and have constructors that are private to the akka package,
// so only Akka itself can instantiate them. `cause` defaults to null (no underlying cause).

/** NOTE(review): not thrown in this file; presumably signals a failure while starting an actor - confirm. */
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
/** NOTE(review): not thrown in this file; presumably signals an operation invalid for the actor's current state - confirm. */
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
/** Thrown when an actor processes `Kill`, and used to fail the sender future when it processes `PoisonPill`. */
class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
/** Thrown when an actor instance cannot be created, e.g. reflective instantiation fails or
* an `Actor` is constructed outside of one of the `Actor.actorOf` factory methods.
*/
class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
/** NOTE(review): not thrown in this file; presumably signals a timed-out actor operation - confirm. */
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
/** Thrown by `Actor.apply` for an invalid (null) message. */
class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
/** Exception thrown by the default `Actor.unhandled` when the actor's behavior does not
  * match a message.
  *
  * @param msg the message that was not handled
  * @param ref the actor that failed to handle it
  */
case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {

  /** Human-readable description naming the actor and the offending message. */
  override def getMessage(): String = {
    "Actor %s does not handle [%s]".format(ref, msg)
  }

  /** Skips stack-trace capture: returns this instance untouched so no cycles are spent on it. */
  override def fillInStackTrace(): UnhandledMessageException = {
    this
  }
}
/** Actor factory module with factory methods for creating various kinds of Actors.
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
object Actor extends ListenerManagement {

  /** Add shutdown cleanups.
    *
    * Lazily registers a JVM shutdown hook (on first access) and returns the hook's `Runnable`.
    * The hook reflectively clears the private static `subclassAudits` map of `java.lang.Thread`.
    * NOTE(review): this relies on a private JDK field; it will throw inside the hook thread on
    * JDKs where `Thread` does not declare that field or forbids the reflective access - confirm
    * against the supported JDK versions.
    */
  private[akka] lazy val shutdownHook = {
    val hook = new Runnable {
      override def run(): Unit = {
        // Clear Thread.subclassAudits
        val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
        tf.setAccessible(true)
        // Static field, hence the null receiver.
        val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_, _]]
        subclassAudits.synchronized { subclassAudits.clear() }
      }
    }
    Runtime.getRuntime.addShutdownHook(new Thread(hook))
    hook
  }

  /** The registry instance shared by users of this object. */
  val registry = new ActorRegistry

  /** Remote support, resolved reflectively on first use.
    *
    * @throws UnsupportedOperationException if no default remote support is available
    *         (akka-remote.jar is not on the classpath)
    */
  lazy val remote: RemoteSupport = {
    ReflectiveAccess
      .Remote
      .defaultRemoteSupport
      .map(_())
      .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath"))
  }

  /** Default actor timeout in milliseconds, read from 'akka.actor.timeout' (default 5, in `TIME_UNIT`). */
  private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis

  /** Value of the 'akka.actor.serialize-messages' setting (default false). */
  private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)

  /** A Receive is a convenience type that defines actor message behavior currently modeled as
    * a PartialFunction[Any, Unit].
    */
  type Receive = PartialFunction[Any, Unit]

  /** Per-thread slot holding the ActorRef of the actor currently being constructed.
    * The `Actor` trait reads and then clears it while initializing `someSelf`.
    */
  private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] {
    override def initialValue: Option[ActorRef] = None
  }

  /** Creates an ActorRef out of the Actor with type T.
    * <pre>
    *   import Actor._
    *   val actor = actorOf[MyActor]
    *   actor.start()
    *   actor ! message
    *   actor.stop()
    * </pre>
    * You can create and start the actor in one statement like this:
    * <pre>
    *   val actor = actorOf[MyActor].start()
    * </pre>
    */
  def actorOf[T <: Actor: ClassTag]: ActorRef =
    // `runtimeClass` replaces the deprecated `ClassTag.erasure`.
    actorOf(classTag[T].runtimeClass.asInstanceOf[Class[_ <: Actor]])

  /** Creates an ActorRef out of the Actor of the specified Class.
    * <pre>
    *   import Actor._
    *   val actor = actorOf(classOf[MyActor])
    *   actor.start()
    *   actor ! message
    *   actor.stop()
    * </pre>
    * You can create and start the actor in one statement like this:
    * <pre>
    *   val actor = actorOf(classOf[MyActor]).start()
    * </pre>
    *
    * The class is instantiated reflectively through its no-argument constructor each time the
    * factory is invoked. A failure is rethrown as an `ActorInitializationException`; an
    * `InvocationTargetException` is unwrapped so the constructor's own exception is the cause.
    */
  def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
    import ReflectiveAccess.{ createInstance, noParams, noArgs }
    createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs) match {
      case Right(actor) => actor
      case Left(exception) =>
        val cause = exception match {
          case i: InvocationTargetException => i.getTargetException
          case _                            => exception
        }
        throw new ActorInitializationException(
          "Could not instantiate Actor of " + clazz +
            "\nMake sure Actor is NOT defined inside a class/trait," +
            "\nif so put it outside the class/trait, f.e. in a companion object," +
            "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause)
    }
  }, None)

  /** Creates an ActorRef out of the Actor. Allows you to pass in a factory function
    * that creates the Actor. Please note that this function can be invoked multiple
    * times if for example the Actor is supervised and needs to be restarted.
    * <p/>
    * This function should <b>NOT</b> be used for remote actors.
    * <pre>
    *   import Actor._
    *   val actor = actorOf(new MyActor)
    *   actor.start()
    *   actor ! message
    *   actor.stop()
    * </pre>
    * You can create and start the actor in one statement like this:
    * <pre>
    *   val actor = actorOf(new MyActor).start()
    * </pre>
    */
  def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None)

  /** Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
    * that creates the Actor. Please note that this function can be invoked multiple
    * times if for example the Actor is supervised and needs to be restarted.
    * <p/>
    * This function should <b>NOT</b> be used for remote actors.
    * JAVA API
    */
  def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None)

  /** Use to spawn out a block of code in an event-driven actor. Will shut actor down when
    * the block has been executed (also when the block throws).
    * <p/>
    * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
    * there is a method 'spawn[ActorType]' in the Actor trait already.
    * Example:
    * <pre>
    *   import Actor.{spawn}
    *
    *   spawn {
    *     ... // do stuff
    *   }
    * </pre>
    *
    * @param body       the code to run inside the temporary actor
    * @param dispatcher dispatcher assigned to the temporary actor; defaults to the global dispatcher
    */
  def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
    // Local trigger message: only this method can send it.
    case object Spawn
    actorOf(new Actor() {
      self.dispatcher = dispatcher
      def receive = {
        case Spawn => try { body } finally { self.stop() }
      }
    }).start() ! Spawn
  }

  /** Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
    * to convert an Option[Any] to an Option[T].
    */
  implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]): AnyOptionAsTypedOption =
    new AnyOptionAsTypedOption(anyOption)

  /** Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
    * to convert an Option[Any] to an Option[T].
    * This means that the following code is equivalent:
    *   (actor !! "foo").as[Int] (Deprecated)
    * and
    *   (actor !!! "foo").as[Int] (Recommended)
    *
    * The conversion awaits the future first; a `FutureTimeoutException` raised while waiting is
    * deliberately ignored and `resultOrException` is consulted afterwards in either case.
    */
  implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]): AnyOptionAsTypedOption =
    new AnyOptionAsTypedOption({
      // Best effort: a timeout is not an error here, the outcome is read from the future below.
      try { anyFuture.await } catch { case _: FutureTimeoutException => () }
      anyFuture.resultOrException
    })
}
/** Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
* <p/>
* An actor has a well-defined (non-cyclic) life-cycle.
* <pre>
* => NEW (newly created actor) - can't receive messages (yet)
* => STARTED (when 'start' is invoked) - can receive messages
* => SHUT DOWN (when 'exit' is invoked) - can't do anything
* </pre>
*
* <p/>
* The Actor's API is available in the 'self' member variable.
*
* <p/>
* Here you find functions like:
* - !, !!, !!! and forward
* - link, unlink, startLink, spawnLink etc
* - makeRemote etc.
* - start, stop
* - etc.
*
* <p/>
* Here you also find fields like
* - dispatcher = ...
* - id = ...
* - lifeCycle = ...
* - faultHandler = ...
* - trapExit = ...
* - etc.
*
* <p/>
* This means that to use them you have to prefix them with 'self', like this: <tt>self ! Message</tt>
*
* However, for convenience you can import these functions and fields like below, which will allow you to
* drop the 'self' prefix:
* <pre>
* class MyActor extends Actor {
* import self._
* id = ...
* dispatcher = ...
* spawnLink[OtherActor]
* ...
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
trait Actor {

  /** Type alias because traits cannot have companion objects.
    */
  type Receive = Actor.Receive

  /*
   * Some[ActorRef] representation of the 'self' ActorRef reference.
   * <p/>
   * Mainly for internal use, functions as the implicit sender references when invoking
   * the 'forward' function.
   * <p/>
   * Initialized from the thread-local 'Actor.actorRefInCreation'. The slot is cleared once it
   * has been consumed; if it is empty the actor was instantiated outside of the factory
   * methods and an ActorInitializationException is thrown.
   */
  @transient
  implicit val someSelf: Some[ActorRef] = Actor.actorRefInCreation.get match {
    case ref @ Some(actorRef) =>
      Actor.actorRefInCreation.set(None)
      actorRef.id = getClass.getName //FIXME: Is this needed?
      ref
    case None =>
      throw new ActorInitializationException(
        "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
          "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
          "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
          "\n\tEither use:" +
          "\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
          "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'")
  }

  /*
   * Option[ActorRef] representation of the 'self' ActorRef reference.
   * <p/>
   * Mainly for internal use, functions as the implicit sender references when invoking
   * one of the message send functions ('!', '!!' and '!!!').
   */
  implicit def optionSelf: Option[ActorRef] = someSelf

  /** The 'self' field holds the ActorRef for this actor.
    * <p/>
    * Can be used to send messages to itself:
    * <pre>
    * self ! message
    * </pre>
    * Here you also find most of the Actor API.
    * <p/>
    * For example fields like:
    * <pre>
    * self.dispatcher = ...
    * self.trapExit = ...
    * self.faultHandler = ...
    * self.lifeCycle = ...
    * self.sender
    * </pre>
    * <p/>
    * Here you also find methods like:
    * <pre>
    * self.reply(..)
    * self.link(..)
    * self.unlink(..)
    * self.start(..)
    * self.stop(..)
    * </pre>
    */
  @transient
  val self: ScalaActorRef = someSelf.get

  /** User overridable callback/setting.
    * <p/>
    * Partial function implementing the actor logic.
    * To be implemented by concrete actor class.
    * <p/>
    * Example code:
    * <pre>
    *   def receive = {
    *     case Ping =>
    *       println("got a 'Ping' message")
    *       self.reply("pong")
    *
    *     case OneWay =>
    *       println("got a 'OneWay' message")
    *
    *     case unknown =>
    *       println("unknown message: " + unknown)
    * }
    * </pre>
    */
  protected def receive: Receive

  /** User overridable callback.
    * <p/>
    * Is called when an Actor is started by invoking 'actor.start()'.
    */
  def preStart(): Unit = {}

  /** User overridable callback.
    * <p/>
    * Is called when 'actor.stop()' is invoked.
    */
  def postStop(): Unit = {}

  /** User overridable callback.
    * <p/>
    * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
    */
  def preRestart(reason: Throwable): Unit = {}

  /** User overridable callback.
    * <p/>
    * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
    */
  def postRestart(reason: Throwable): Unit = {}

  /** User overridable callback.
    * <p/>
    * Is called when a message isn't handled by the current behavior of the actor
    * by default it throws an UnhandledMessageException
    */
  def unhandled(msg: Any): Unit = {
    throw new UnhandledMessageException(msg, self)
  }

  /** Is the actor able to handle the message passed in as arguments?
    * <p/>
    * Auto-received messages are always handled. Otherwise the top of the hotswap stack decides
    * when the stack is non-empty, and the original 'receive' behavior decides when it is empty.
    */
  def isDefinedAt(message: Any): Boolean = {
    val behaviorStack = self.hotswap
    message match { //Same logic as apply(msg) but without the unhandled catch-all
      case _: AutoReceivedMessage => true
      case msg if behaviorStack.nonEmpty &&
        behaviorStack.head.isDefinedAt(msg) => true
      case msg if behaviorStack.isEmpty &&
        processingBehavior.isDefinedAt(msg) => true
      case _ => false
    }
  }

  /** Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
    * Puts the behavior on top of the hotswap stack.
    * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
    */
  def become(behavior: Receive, discardOld: Boolean = true): Unit = {
    if (discardOld) unbecome()
    self.hotswap = self.hotswap.push(behavior)
  }

  /** Reverts the Actor behavior to the previous one in the hotswap stack.
    * Does nothing when the stack is already empty.
    */
  def unbecome(): Unit = {
    val h = self.hotswap
    if (h.nonEmpty) self.hotswap = h.pop
  }

  // =========================================
  // ==== INTERNAL IMPLEMENTATION DETAILS ====
  // =========================================

  /** Dispatches one message: auto-received messages are handled internally, everything else
    * goes to the current behavior, and unmatched messages end up in 'unhandled'.
    * <p/>
    * A null message is rejected with an InvalidMessageException. The check is a plain
    * 'msg == null': 'isInstanceOf' is always false for null, so guarding the null test with
    * 'msg.isInstanceOf[AnyRef]' would make it unreachable.
    */
  private[akka] final def apply(msg: Any): Unit = {
    if (msg == null)
      throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
    val behaviorStack = self.hotswap
    msg match {
      case l: AutoReceivedMessage => autoReceiveMessage(l)
      case msg if behaviorStack.nonEmpty &&
        behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
      case msg if behaviorStack.isEmpty &&
        processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg)
      case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior
    }
  }

  /** Built-in handling of the messages Akka processes on behalf of the actor. */
  private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match {
    case HotSwap(code, discardOld) => become(code(self), discardOld)
    case RevertHotSwap             => unbecome()
    case Exit(dead, reason)        => self.handleTrapExit(dead, reason)
    case Link(child)               => self.link(child)
    case Unlink(child)             => self.unlink(child)
    case UnlinkAndStop(child) =>
      self.unlink(child)
      child.stop()
    case Restart(reason) => throw reason
    case Kill            => throw new ActorKilledException("Kill")
    case PoisonPill =>
      // Capture the sender future before stopping, then fail it so the sender is not left waiting.
      val f = self.senderFuture
      self.stop()
      f.foreach(_.completeWithException(new ActorKilledException("PoisonPill")))
  }

  private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior
}
/** Wrapper around an `Option[Any]` that adds typed narrowing helpers.
  * Instances are produced by the implicit conversions in the `Actor` object.
  */
private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {

  /** Convenience helper to cast the wrapped Option of Any to an Option of the given type.
    * Delegates to `narrow`; a `ClassCastException` surfaces when the contained value does not
    * have the requested type.
    */
  def as[T]: Option[T] = {
    narrow[T](anyOption)
  }

  /** Convenience helper to cast the wrapped Option of Any to an Option of the given type.
    * Delegates to `narrowSilently`, which swallows a possible `ClassCastException` and yields
    * `None` in that case.
    */
  def asSilently[T: ClassTag]: Option[T] = {
    narrowSilently[T](anyOption)
  }
}
/** Marker interface for proxyable actors (such as typed actor).
  *
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
trait Proxyable {

  /** Replaces the actor instance behind the proxy with `newInstance`. */
  private[actor] def swapProxiedActor(newInstance: Actor): Unit
}
/** Represents the different Actor types.
*
* The trait is sealed; its only values are the case objects in the companion object.
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
sealed trait ActorType
/** Holds the `ActorType` values. */
object ActorType {
/** Actor type value named for Scala actors. */
case object ScalaActor extends ActorType
/** Actor type value named for typed actors. */
case object TypedActor extends ActorType
}