diff options
Diffstat (limited to 'test/files/presentation/akka/src/akka/actor/ActorRef.scala')
-rw-r--r-- | test/files/presentation/akka/src/akka/actor/ActorRef.scala | 1433 |
1 files changed, 1433 insertions, 0 deletions
diff --git a/test/files/presentation/akka/src/akka/actor/ActorRef.scala b/test/files/presentation/akka/src/akka/actor/ActorRef.scala new file mode 100644 index 0000000000..4ce14512b8 --- /dev/null +++ b/test/files/presentation/akka/src/akka/actor/ActorRef.scala @@ -0,0 +1,1433 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.actor + +import akka.event.EventHandler +import akka.dispatch._ +import akka.config.Supervision._ +import akka.util._ +import ReflectiveAccess._ + +import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } +import java.util.{ Map => JMap } + +import scala.reflect.BeanProperty +import scala.collection.immutable.Stack +import scala.annotation.tailrec + +private[akka] object ActorRefInternals { + + /** + * LifeCycles for ActorRefs. + */ + private[akka] sealed trait StatusType + object UNSTARTED extends StatusType + object RUNNING extends StatusType + object BEING_RESTARTED extends StatusType + object SHUTDOWN extends StatusType +} + +/** + * Abstraction for unification of sender and senderFuture for later reply. + * Can be stored away and used at a later point in time. + */ +abstract class Channel[T] { + + /** + * Scala API. <p/> + * Sends the specified message to the channel. + */ + def !(msg: T): Unit + + /** + * Java API. <p/> + * Sends the specified message to the channel. + */ + def sendOneWay(msg: T): Unit = this.!(msg) +} + +/** + * ActorRef is an immutable and serializable handle to an Actor. + * <p/> + * Create an ActorRef for an Actor by using the factory method on the Actor object. + * <p/> + * Here is an example on how to create an actor with a default constructor. + * <pre> + * import Actor._ + * + * val actor = actorOf[MyActor] + * actor.start() + * actor ! message + * actor.stop() + * </pre> + * + * You can also create and start actors like this: + * <pre> + * val actor = actorOf[MyActor].start() + * </pre> + * + * Here is an example on how to create an actor with a non-default constructor. + * <pre> + * import Actor._ + * + * val actor = actorOf(new MyActor(...)) + * actor.start() + * actor ! message + * actor.stop() + * </pre> + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => + // Only mutable for RemoteServer in order to maintain identity across nodes + @volatile + protected[akka] var _uuid = newUuid + @volatile + protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED + + /** + * User overridable callback/setting. + * <p/> + * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. + * <p/> + * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote + * actor in RemoteServer etc.But also as the identifier for persistence, which means + * that you can use a custom name to be able to retrieve the "correct" persisted state + * upon restart, remote restart etc. + */ + @BeanProperty + @volatile + var id: String = _uuid.toString + + /** + * User overridable callback/setting. + * <p/> + * Defines the default timeout for '!!' and '!!!' invocations, + * e.g. the timeout for the future returned by the call to '!!' and '!!!'. + */ + @deprecated("Will be replaced by implicit-scoped timeout on all methods that needs it, will default to timeout specified in config", "1.1") + @BeanProperty + @volatile + var timeout: Long = Actor.TIMEOUT + + /** + * User overridable callback/setting. + * <p/> + * Defines the default timeout for an initial receive invocation. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. + */ + @volatile + var receiveTimeout: Option[Long] = None + + /** + * Akka Java API. <p/> + * Defines the default timeout for an initial receive invocation. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. + */ + def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout) + def getReceiveTimeout(): Option[Long] = receiveTimeout + + /** + * Akka Java API. <p/> + * A faultHandler defines what should be done when a linked actor signals an error. + * <p/> + * Can be one of: + * <pre> + * getContext().setFaultHandler(new AllForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange)); + * </pre> + * Or: + * <pre> + * getContext().setFaultHandler(new OneForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange)); + * </pre> + */ + def setFaultHandler(handler: FaultHandlingStrategy) + def getFaultHandler(): FaultHandlingStrategy + + /** + * Akka Java API. <p/> + * A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent) + * <p/> + * Can be one of: + * + * import static akka.config.Supervision.*; + * <pre> + * getContext().setLifeCycle(permanent()); + * </pre> + * Or: + * <pre> + * getContext().setLifeCycle(temporary()); + * </pre> + */ + def setLifeCycle(lifeCycle: LifeCycle): Unit + def getLifeCycle(): LifeCycle + + /** + * Akka Java API. <p/> + * The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>. + * This means that all actors will share the same event-driven executor based dispatcher. + * <p/> + * You can override it so it fits the specific use-case that the actor is used for. + * See the <tt>akka.dispatch.Dispatchers</tt> class for the different + * dispatchers available. + * <p/> + * The default is also that all actors that are created and spawned from within this actor + * is sharing the same dispatcher as its creator. + */ + def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher + def getDispatcher(): MessageDispatcher = dispatcher + + /** + * Returns on which node this actor lives if None it lives in the local ActorRegistry + */ + @deprecated("Remoting will become fully transparent in the future", "1.1") + def homeAddress: Option[InetSocketAddress] + + /** + * Java API. <p/> + */ + @deprecated("Remoting will become fully transparent in the future", "1.1") + def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null + + /** + * Holds the hot swapped partial function. + */ + @volatile + protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]() + + /** + * This is a reference to the message currently being processed by the actor + */ + @volatile + protected[akka] var currentMessage: MessageInvocation = null + + /** + * Comparison only takes uuid into account. + */ + def compareTo(other: ActorRef) = this.uuid compareTo other.uuid + + /** + * Returns the uuid for the actor. + */ + def getUuid() = _uuid + def uuid = _uuid + + /** + * Akka Java API. <p/> + * The reference sender Actor of the last received message. + * Is defined if the message was sent from another Actor, else None. + */ + def getSender(): Option[ActorRef] = sender + + /** + * Akka Java API. <p/> + * The reference sender future of the last received message. + * Is defined if the message was sent with sent with '!!' or '!!!', else None. + */ + def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture + + /** + * Is the actor being restarted? + */ + def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED + + /** + * Is the actor running? + */ + def isRunning: Boolean = _status match { + case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true + case _ => false + } + + /** + * Is the actor shut down? + */ + def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN + + /** + * Is the actor ever started? + */ + def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED + + /** + * Is the actor able to handle the message passed in as arguments? + */ + @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1") + def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) + + /** + * Only for internal use. UUID is effectively final. + */ + protected[akka] def uuid_=(uid: Uuid) = _uuid = uid + + /** + * Akka Java API. <p/> + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + * <p/> + * <pre> + * actor.sendOneWay(message); + * </pre> + * <p/> + */ + def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null) + + /** + * Akka Java API. <p/> + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + * <p/> + * Allows you to pass along the sender of the message. + * <p/> + * <pre> + * actor.sendOneWay(message, context); + * </pre> + * <p/> + */ + def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender)) + + /** + * Akka Java API. <p/> + * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) + * Uses the default timeout of the Actor (setTimeout()) and omits the sender reference + */ + def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null) + + /** + * Akka Java API. <p/> + * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) + * Uses the default timeout of the Actor (setTimeout()) + */ + def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender) + + /** + * Akka Java API. <p/> + * Sends a message asynchronously and waits on a future for a reply message under the hood. + * <p/> + * It waits on the reply either until it receives it or until the timeout expires + * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. + * <p/> + * <b>NOTE:</b> + * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to + * implement request/response message exchanges. + * <p/> + * If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code> + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = { + !!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException( + "Message [" + message + + "]\n\tsent to [" + actorClassName + + "]\n\tfrom [" + (if (sender ne null) sender.actorClassName else "nowhere") + + "]\n\twith timeout [" + timeout + + "]\n\ttimed out.")) + .asInstanceOf[AnyRef] + } + + /** + * Akka Java API. <p/> + * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] + * Uses the Actors default timeout (setTimeout()) and omits the sender + */ + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] + + /** + * Akka Java API. <p/> + * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] + * Uses the Actors default timeout (setTimeout()) + */ + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] + + /** + * Akka Java API. <p/> + * Sends a message asynchronously returns a future holding the eventual reply message. + * <p/> + * <b>NOTE:</b> + * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to + * implement request/response message exchanges. + * <p/> + * If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code> + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] + + /** + * Akka Java API. <p/> + * Forwards the message specified to this actor and preserves the original sender of the message + */ + def forward(message: AnyRef, sender: ActorRef): Unit = + if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") + else forward(message)(Some(sender)) + + /** + * Akka Java API. <p/> + * Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently + * being processed. + * <p/> + * Throws an IllegalStateException if unable to determine what to reply to. + */ + def replyUnsafe(message: AnyRef) = reply(message) + + /** + * Akka Java API. <p/> + * Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently + * being processed. + * <p/> + * Returns true if reply was sent, and false if unable to determine what to reply to. + */ + def replySafe(message: AnyRef): Boolean = reply_?(message) + + /** + * Returns the class for the Actor instance that is managed by the ActorRef. + */ + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def actorClass: Class[_ <: Actor] + + /** + * Akka Java API. <p/> + * Returns the class for the Actor instance that is managed by the ActorRef. + */ + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def getActorClass(): Class[_ <: Actor] = actorClass + + /** + * Returns the class name for the Actor instance that is managed by the ActorRef. + */ + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def actorClassName: String + + /** + * Akka Java API. <p/> + * Returns the class name for the Actor instance that is managed by the ActorRef. + */ + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def getActorClassName(): String = actorClassName + + /** + * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. + */ + def dispatcher_=(md: MessageDispatcher): Unit + + /** + * Get the dispatcher for this actor. + */ + def dispatcher: MessageDispatcher + + /** + * Starts up the actor and its message queue. + */ + def start(): ActorRef + + /** + * Shuts down the actor its dispatcher and message queue. + * Alias for 'stop'. + */ + def exit() = stop() + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop(): Unit + + /** + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will + * receive a notification if the linked actor has crashed. + * <p/> + * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will + * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy + * defined by the 'faultHandler'. + */ + def link(actorRef: ActorRef): Unit + + /** + * Unlink the actor. + */ + def unlink(actorRef: ActorRef): Unit + + /** + * Atomically start and link an actor. + */ + def startLink(actorRef: ActorRef): Unit + + /** + * Atomically create (from actor class) and start an actor. + * <p/> + * To be invoked from within the actor itself. + */ + @deprecated("Will be removed after 1.1, use Actor.actorOf instead", "1.1") + def spawn(clazz: Class[_ <: Actor]): ActorRef + + /** + * Atomically create (from actor class), make it remote and start an actor. + * <p/> + * To be invoked from within the actor itself. + */ + @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1") + def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef + + /** + * Atomically create (from actor class), link and start an actor. + * <p/> + * To be invoked from within the actor itself. + */ + @deprecated("Will be removed after 1.1, use use Actor.remote.actorOf instead and then link on success", "1.1") + def spawnLink(clazz: Class[_ <: Actor]): ActorRef + + /** + * Atomically create (from actor class), make it remote, link and start an actor. + * <p/> + * To be invoked from within the actor itself. + */ + @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1") + def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef + + /** + * Returns the mailbox size. + */ + def mailboxSize = dispatcher.mailboxSize(this) + + /** + * Akka Java API. <p/> + * Returns the mailbox size. + */ + def getMailboxSize(): Int = mailboxSize + + /** + * Returns the supervisor, if there is one. + */ + def supervisor: Option[ActorRef] + + /** + * Akka Java API. <p/> + * Returns the supervisor, if there is one. + */ + def getSupervisor(): ActorRef = supervisor getOrElse null + + /** + * Returns an unmodifiable Java Map containing the linked actors, + * please note that the backing map is thread-safe but not immutable + */ + def linkedActors: JMap[Uuid, ActorRef] + + /** + * Java API. <p/> + * Returns an unmodifiable Java Map containing the linked actors, + * please note that the backing map is thread-safe but not immutable + */ + def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors + + /** + * Abstraction for unification of sender and senderFuture for later reply + */ + def channel: Channel[Any] = { + if (senderFuture.isDefined) { + new Channel[Any] { + val future = senderFuture.get + def !(msg: Any) = future completeWithResult msg + } + } else if (sender.isDefined) { + val someSelf = Some(this) + new Channel[Any] { + val client = sender.get + def !(msg: Any) = client.!(msg)(someSelf) + } + } else throw new IllegalActorStateException("No channel available") + } + + /** + * Java API. <p/> + * Abstraction for unification of sender and senderFuture for later reply + */ + def getChannel: Channel[Any] = channel + + protected[akka] def invoke(messageHandle: MessageInvocation): Unit + + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + + protected[akka] def actorInstance: AtomicReference[Actor] + + protected[akka] def actor: Actor = actorInstance.get + + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit + + protected[akka] def mailbox: AnyRef + protected[akka] def mailbox_=(value: AnyRef): AnyRef + + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit + + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit + + protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] + + override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) + + override def equals(that: Any): Boolean = { + that.isInstanceOf[ActorRef] && + that.asInstanceOf[ActorRef].uuid == uuid + } + + override def toString = "Actor[" + id + ":" + uuid + "]" +} + +/** + * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +class LocalActorRef private[akka] ( + private[this] val actorFactory: () => Actor, + val homeAddress: Option[InetSocketAddress], + val clientManaged: Boolean = false) + extends ActorRef with ScalaActorRef { + protected[akka] val guard = new ReentrantGuard + + @volatile + protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None + @volatile + private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] + @volatile + private[akka] var _supervisor: Option[ActorRef] = None + @volatile + private var maxNrOfRetriesCount: Int = 0 + @volatile + private var restartsWithinTimeRangeTimestamp: Long = 0L + @volatile + private var _mailbox: AnyRef = _ + @volatile + private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher + + protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } + + //If it was started inside "newActor", initialize it + if (isRunning) initializeActorInstance + + // used only for deserialization + private[akka] def this( + __uuid: Uuid, + __id: String, + __timeout: Long, + __receiveTimeout: Option[Long], + __lifeCycle: LifeCycle, + __supervisor: Option[ActorRef], + __hotswap: Stack[PartialFunction[Any, Unit]], + __factory: () => Actor, + __homeAddress: Option[InetSocketAddress]) = { + this(__factory, __homeAddress) + _uuid = __uuid + id = __id + timeout = __timeout + receiveTimeout = __receiveTimeout + lifeCycle = __lifeCycle + _supervisor = __supervisor + hotswap = __hotswap + setActorSelfFields(actor, this) + start + } + + /** + * Returns whether this actor ref is client-managed remote or not + */ + private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled + + // ========= PUBLIC FUNCTIONS ========= + + /** + * Returns the class for the Actor instance that is managed by the ActorRef. + */ + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] + + /** + * Returns the class name for the Actor instance that is managed by the ActorRef. + */ + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def actorClassName: String = actorClass.getName + + /** + * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. + */ + def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { + if (!isBeingRestarted) { + if (!isRunning) _dispatcher = md + else throw new ActorInitializationException( + "Can not swap dispatcher for " + toString + " after it has been started") + } + } + + /** + * Get the dispatcher for this actor. + */ + def dispatcher: MessageDispatcher = _dispatcher + + /** + * Starts up the actor and its message queue. + */ + def start(): ActorRef = guard.withGuard { + if (isShutdown) throw new ActorStartException( + "Can't restart an actor that has been shut down with 'stop' or 'exit'") + if (!isRunning) { + dispatcher.attach(this) + + _status = ActorRefInternals.RUNNING + + // If we are not currently creating this ActorRef instance + if ((actorInstance ne null) && (actorInstance.get ne null)) + initializeActorInstance + + if (isClientManaged_?) + Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) + + checkReceiveTimeout //Schedule the initial Receive timeout + } + this + } + + /** + * Shuts down the actor its dispatcher and message queue. + */ + def stop() = guard.withGuard { + if (isRunning) { + receiveTimeout = None + cancelReceiveTimeout + dispatcher.detach(this) + _status = ActorRefInternals.SHUTDOWN + try { + actor.postStop + } finally { + currentMessage = null + Actor.registry.unregister(this) + if (isRemotingEnabled) { + if (isClientManaged_?) + Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) + Actor.remote.unregister(this) + } + setActorSelfFields(actorInstance.get, null) + } + } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") + } + + /** + * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will + * receive a notification if the linked actor has crashed. + * <p/> + * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will + * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy + * defined by the 'faultHandler'. + * <p/> + * To be invoked from within the actor itself. + */ + def link(actorRef: ActorRef): Unit = guard.withGuard { + val actorRefSupervisor = actorRef.supervisor + val hasSupervisorAlready = actorRefSupervisor.isDefined + if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy + else if (hasSupervisorAlready) throw new IllegalActorStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + else { + _linkedActors.put(actorRef.uuid, actorRef) + actorRef.supervisor = Some(this) + } + } + + /** + * Unlink the actor. + * <p/> + * To be invoked from within the actor itself. + */ + def unlink(actorRef: ActorRef) = guard.withGuard { + if (_linkedActors.remove(actorRef.uuid) eq null) + throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") + + actorRef.supervisor = None + } + + /** + * Atomically start and link an actor. + * <p/> + * To be invoked from within the actor itself. + */ + def startLink(actorRef: ActorRef): Unit = guard.withGuard { + link(actorRef) + actorRef.start() + } + + /** + * Atomically create (from actor class) and start an actor. + * <p/> + * To be invoked from within the actor itself. + */ + def spawn(clazz: Class[_ <: Actor]): ActorRef = + Actor.actorOf(clazz).start() + + /** + * Atomically create (from actor class), start and make an actor remote. + * <p/> + * To be invoked from within the actor itself. + */ + def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { + ensureRemotingEnabled + val ref = Actor.remote.actorOf(clazz, hostname, port) + ref.timeout = timeout + ref.start() + } + + /** + * Atomically create (from actor class), start and link an actor. + * <p/> + * To be invoked from within the actor itself. + */ + def spawnLink(clazz: Class[_ <: Actor]): ActorRef = { + val actor = spawn(clazz) + link(actor) + actor.start() + actor + } + + /** + * Atomically create (from actor class), start, link and make an actor remote. + * <p/> + * To be invoked from within the actor itself. + */ + def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { + ensureRemotingEnabled + val actor = Actor.remote.actorOf(clazz, hostname, port) + actor.timeout = timeout + link(actor) + actor.start() + actor + } + + /** + * Returns the mailbox. + */ + def mailbox: AnyRef = _mailbox + + protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value } + + /** + * Returns the supervisor, if there is one. + */ + def supervisor: Option[ActorRef] = _supervisor + + // ========= AKKA PROTECTED FUNCTIONS ========= + + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup + + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = + if (isClientManaged_?) { + Actor.remote.send[Any]( + message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) + } else + dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + if (isClientManaged_?) { + val future = Actor.remote.send[T]( + message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) + if (future.isDefined) future.get + else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) + } else { + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + dispatcher dispatchMessage new MessageInvocation( + this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + future.get + } + } + + /** + * Callback for the dispatcher. This is the single entry point to the user Actor implementation. + */ + protected[akka] def invoke(messageHandle: MessageInvocation): Unit = { + guard.lock.lock + try { + if (!isShutdown) { + currentMessage = messageHandle + try { + try { + cancelReceiveTimeout // FIXME: leave this here? + actor(messageHandle.message) + currentMessage = null // reset current message after successful invocation + } catch { + case e: InterruptedException => + currentMessage = null // received message while actor is shutting down, ignore + case e => + handleExceptionInDispatch(e, messageHandle.message) + } + finally { + checkReceiveTimeout // Reschedule receive timeout + } + } catch { + case e => + EventHandler.error(e, this, messageHandle.message.toString) + throw e + } + } + } finally { guard.lock.unlock } + } + + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { + faultHandler match { + case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => + restartLinkedActors(reason, maxRetries, within) + + case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => + dead.restart(reason, maxRetries, within) + + case _ => + if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) + else dead.stop() + } + } + + private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { + val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal + false + } else if (withinTimeRange.isEmpty) { // restrict number of restarts + maxNrOfRetriesCount += 1 //Increment number of retries + maxNrOfRetriesCount > maxNrOfRetries.get + } else { // cannot restart more than N within M timerange + maxNrOfRetriesCount += 1 //Increment number of retries + val windowStart = restartsWithinTimeRangeTimestamp + val now = System.currentTimeMillis + val retries = maxNrOfRetriesCount + //We are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) false + else (now - windowStart) <= withinTimeRange.get + + //The actor is dead if it dies X times within the window of restart + val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) + + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartsWithinTimeRangeTimestamp = now + + if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired + maxNrOfRetriesCount = 1 + + unrestartable + } + + denied == false //If we weren't denied, we have a go + } + + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + def performRestart() { + val failedActor = actorInstance.get + + failedActor match { + case p: Proxyable => + failedActor.preRestart(reason) + failedActor.postRestart(reason) + case _ => + failedActor.preRestart(reason) + val freshActor = newActor + setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor + actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call + freshActor.preStart + freshActor.postRestart(reason) + } + } + + def tooManyRestarts() { + _supervisor.foreach { sup => + // can supervisor handle the notification? + val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) + if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) + } + stop + } + + @tailrec + def attemptRestart() { + val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { + guard.withGuard[Boolean] { + _status = ActorRefInternals.BEING_RESTARTED + + lifeCycle match { + case Temporary => + shutDownTemporaryActor(this) + true + + case _ => // either permanent or none where default is permanent + val success = try { + performRestart() + true + } catch { + case e => + EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) + false // an error or exception here should trigger a retry + } + finally { + currentMessage = null + } + if (success) { + _status = ActorRefInternals.RUNNING + dispatcher.resume(this) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + } + success + } + } + } else { + tooManyRestarts() + true // done + } + + if (success) () // alles gut + else attemptRestart() + } + + attemptRestart() // recur + } + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { + val i = _linkedActors.values.iterator + while (i.hasNext) { + val actorRef = i.next + actorRef.lifeCycle match { + // either permanent or none where default is permanent + case Temporary => shutDownTemporaryActor(actorRef) + case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) + } + } + } + + protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { + ensureRemotingEnabled + if (_supervisor.isDefined) { + if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) + Some(_supervisor.get.uuid) + } else None + } + + def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors) + + // ========= PRIVATE FUNCTIONS ========= + + private[this] def newActor: Actor = { + try { + Actor.actorRefInCreation.set(Some(this)) + val a = actorFactory() + if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") + a + } finally { + Actor.actorRefInCreation.set(None) + } + } + + private def shutDownTemporaryActor(temporaryActor: ActorRef) { + temporaryActor.stop() + _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor + // if last temporary actor is gone, then unlink me from supervisor + if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) + true + } + + private def handleExceptionInDispatch(reason: Throwable, message: Any) = { + EventHandler.error(reason, this, message.toString) + + //Prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + + senderFuture.foreach(_.completeWithException(reason)) + + if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) + else { + lifeCycle match { + case Temporary => shutDownTemporaryActor(this) + case _ => dispatcher.resume(this) //Resume processing for this actor + } + } + } + + private def notifySupervisorWithMessage(notification: LifeCycleMessage) = { + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + _supervisor.foreach { sup => + if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors + //Scoped stop all linked actors, to avoid leaking the 'i' val + { + val i = _linkedActors.values.iterator + while (i.hasNext) { + i.next.stop() + i.remove + } + } + //Stop the actor itself + stop + } else sup ! notification // else notify supervisor + } + } + + private def setActorSelfFields(actor: Actor, value: ActorRef) { + + @tailrec + def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = { + val success = try { + val selfField = clazz.getDeclaredField("self") + val someSelfField = clazz.getDeclaredField("someSelf") + selfField.setAccessible(true) + someSelfField.setAccessible(true) + selfField.set(actor, value) + someSelfField.set(actor, if (value ne null) Some(value) else null) + true + } catch { + case e: NoSuchFieldException => false + } + + if (success) true + else { + val parent = clazz.getSuperclass + if (parent eq null) + throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") + lookupAndSetSelfFields(parent, actor, value) + } + } + + lookupAndSetSelfFields(actor.getClass, actor, value) + } + + private def initializeActorInstance = { + actor.preStart // run actor preStart + Actor.registry.register(this) + } + + protected[akka] def checkReceiveTimeout = { + cancelReceiveTimeout + if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed + _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) + } + } + + protected[akka] def cancelReceiveTimeout = { + if (_futureTimeout.isDefined) { + _futureTimeout.get.cancel(true) + _futureTimeout = None + } + } +} + +/** + * System messages for RemoteActorRef. + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +object RemoteActorSystemMessage { + val Stop = "RemoteActorRef:stop".intern +} + +/** + * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. + * This reference is network-aware (remembers its origin) and immutable. + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +private[akka] case class RemoteActorRef private[akka] ( + classOrServiceName: String, + val actorClassName: String, + val hostname: String, + val port: Int, + _timeout: Long, + loader: Option[ClassLoader], + val actorType: ActorType = ActorType.ScalaActor) + extends ActorRef with ScalaActorRef { + + ensureRemotingEnabled + + val homeAddress = Some(new InetSocketAddress(hostname, port)) + + //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed + id = classOrServiceName + //id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id + + timeout = _timeout + + start + + def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = + Actor.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader) + + def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + val future = Actor.remote.send[T]( + message, senderOption, senderFuture, + homeAddress.get, timeout, + false, this, None, + actorType, loader) + if (future.isDefined) future.get + else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) + } + + def start: ActorRef = synchronized { + _status = ActorRefInternals.RUNNING + this + } + + def stop: Unit = synchronized { + if (_status == ActorRefInternals.RUNNING) { + _status = ActorRefInternals.SHUTDOWN + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + } + } + + protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None + + // ==== NOT SUPPORTED ==== + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") + def actorClass: Class[_ <: Actor] = unsupported + def dispatcher_=(md: MessageDispatcher): Unit = unsupported + def dispatcher: MessageDispatcher = unsupported + def link(actorRef: ActorRef): Unit = unsupported + def unlink(actorRef: ActorRef): Unit = unsupported + def startLink(actorRef: ActorRef): Unit = unsupported + def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported + def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported + def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported + def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported + def supervisor: Option[ActorRef] = unsupported + def linkedActors: JMap[Uuid, ActorRef] = unsupported + protected[akka] def mailbox: AnyRef = unsupported + protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported + protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported + protected[akka] def actorInstance: AtomicReference[Actor] = unsupported + private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") +} + +/** + * This trait represents the common (external) methods for all ActorRefs + * Needed because implicit conversions aren't applied when instance imports are used + * + * i.e. + * var self: ScalaActorRef = ... + * import self._ + * //can't call ActorRef methods here unless they are declared in a common + * //superclass, which ActorRefShared is. + */ +trait ActorRefShared { + /** + * Returns the uuid for the actor. + */ + def uuid: Uuid +} + +/** + * This trait represents the Scala Actor API + * There are implicit conversions in ../actor/Implicits.scala + * from ActorRef -> ScalaActorRef and back + */ +trait ScalaActorRef extends ActorRefShared { ref: ActorRef => + + /** + * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. + * <p/> + * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote + * actor in RemoteServer etc.But also as the identifier for persistence, which means + * that you can use a custom name to be able to retrieve the "correct" persisted state + * upon restart, remote restart etc. + */ + def id: String + + def id_=(id: String): Unit + + /** + * User overridable callback/setting. + * <p/> + * Defines the life-cycle for a supervised actor. + */ + @volatile + @BeanProperty + var lifeCycle: LifeCycle = UndefinedLifeCycle + + /** + * User overridable callback/setting. + * <p/> + * Don't forget to supply a List of exception types to intercept (trapExit) + * <p/> + * Can be one of: + * <pre> + * faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange) + * </pre> + * Or: + * <pre> + * faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange) + * </pre> + */ + @volatile + @BeanProperty + var faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy + + /** + * The reference sender Actor of the last received message. + * Is defined if the message was sent from another Actor, else None. + */ + def sender: Option[ActorRef] = { + val msg = currentMessage + if (msg eq null) None + else msg.sender + } + + /** + * The reference sender future of the last received message. + * Is defined if the message was sent with sent with '!!' or '!!!', else None. + */ + def senderFuture(): Option[CompletableFuture[Any]] = { + val msg = currentMessage + if (msg eq null) None + else msg.senderFuture + } + + /** + * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. + * <p/> + * + * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. + * <p/> + * + * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable, + * if invoked from within an Actor. If not then no sender is available. + * <pre> + * actor ! message + * </pre> + * <p/> + */ + def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = { + if (isRunning) postMessageToMailbox(message, sender) + else throw new ActorInitializationException( + "Actor has not been started, you need to invoke 'actor.start()' before using it") + } + + /** + * Sends a message asynchronously and waits on a future for a reply message. + * <p/> + * It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>) + * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics. + * <p/> + * <b>NOTE:</b> + * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to + * implement request/response message exchanges. + * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code> + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = { + if (isRunning) { + val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None) + val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future) + else false + try { + future.await + } catch { + case e: FutureTimeoutException => + if (isMessageJoinPoint) { + EventHandler.error(e, this, e.getMessage) + throw e + } else None + } + future.resultOrException + } else throw new ActorInitializationException( + "Actor has not been started, you need to invoke 'actor.start()' before using it") + } + + /** + * Sends a message asynchronously returns a future holding the eventual reply message. + * <p/> + * <b>NOTE:</b> + * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to + * implement request/response message exchanges. + * If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code> + * to send a reply message to the original sender. If not then the sender will block until the timeout expires. + */ + def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = { + if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) + else throw new ActorInitializationException( + "Actor has not been started, you need to invoke 'actor.start()' before using it") + } + + /** + * Forwards the message and passes the original sender actor as the sender. + * <p/> + * Works with '!', '!!' and '!!!'. + */ + def forward(message: Any)(implicit sender: Some[ActorRef]) = { + if (isRunning) { + if (sender.get.senderFuture.isDefined) + postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, sender.get.sender, sender.get.senderFuture) + else + postMessageToMailbox(message, sender.get.sender) + } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it") + } + + /** + * Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently + * being processed. + * <p/> + * Throws an IllegalStateException if unable to determine what to reply to. + */ + def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException( + "\n\tNo sender in scope, can't reply. " + + "\n\tYou have probably: " + + "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + + "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." + + "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope") + + /** + * Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently + * being processed. + * <p/> + * Returns true if reply was sent, and false if unable to determine what to reply to. + */ + def reply_?(message: Any): Boolean = { + if (senderFuture.isDefined) { + senderFuture.get completeWithResult message + true + } else if (sender.isDefined) { + //TODO: optimize away this allocation, perhaps by having implicit self: Option[ActorRef] in signature + sender.get.!(message)(Some(this)) + true + } else false + } + + /** + * Atomically create (from actor class) and start an actor. + */ + def spawn[T <: Actor: Manifest]: ActorRef = + spawn(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + + /** + * Atomically create (from actor class), start and make an actor remote. + */ + def spawnRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { + ensureRemotingEnabled + spawnRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) + } + + /** + * Atomically create (from actor class), start and link an actor. + */ + def spawnLink[T <: Actor: Manifest]: ActorRef = + spawnLink(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]]) + + /** + * Atomically create (from actor class), start, link and make an actor remote. + */ + def spawnLinkRemote[T <: Actor: Manifest](hostname: String, port: Int, timeout: Long): ActorRef = { + ensureRemotingEnabled + spawnLinkRemote(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) + } +} |