diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/actor/ActorRef.scala')
-rw-r--r-- | test/disabled/presentation/akka/src/akka/actor/ActorRef.scala | 1433 |
1 files changed, 0 insertions, 1433 deletions
diff --git a/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala b/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala deleted file mode 100644 index 97bb710e29..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala +++ /dev/null @@ -1,1433 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import akka.event.EventHandler -import akka.dispatch._ -import akka.config.Supervision._ -import akka.util._ -import ReflectiveAccess._ - -import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } -import java.util.{ Map => JMap } - -import scala.beans.BeanProperty -import scala.collection.immutable.Stack -import scala.annotation.tailrec - -private[akka] object ActorRefInternals { - - /** - * LifeCycles for ActorRefs. - */ - private[akka] sealed trait StatusType - object UNSTARTED extends StatusType - object RUNNING extends StatusType - object BEING_RESTARTED extends StatusType - object SHUTDOWN extends StatusType -} - -/** - * Abstraction for unification of sender and senderFuture for later reply. - * Can be stored away and used at a later point in time. - */ -abstract class Channel[T] { - - /** - * Scala API. <p/> - * Sends the specified message to the channel. - */ - def !(msg: T): Unit - - /** - * Java API. <p/> - * Sends the specified message to the channel. - */ - def sendOneWay(msg: T): Unit = this.!(msg) -} - -/** - * ActorRef is an immutable and serializable handle to an Actor. - * <p/> - * Create an ActorRef for an Actor by using the factory method on the Actor object. - * <p/> - * Here is an example on how to create an actor with a default constructor. - * <pre> - * import Actor._ - * - * val actor = actorOf[MyActor] - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * - * You can also create and start actors like this: - * <pre> - * val actor = actorOf[MyActor].start() - * </pre> - * - * Here is an example on how to create an actor with a non-default constructor. - * <pre> - * import Actor._ - * - * val actor = actorOf(new MyActor(...)) - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => - // Only mutable for RemoteServer in order to maintain identity across nodes - @volatile - protected[akka] var _uuid = newUuid - @volatile - protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED - - /** - * User overridable callback/setting. - * <p/> - * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - * <p/> - * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - @BeanProperty - @volatile - var id: String = _uuid.toString - - /** - * User overridable callback/setting. - * <p/> - * Defines the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - @deprecated("Will be replaced by implicit-scoped timeout on all methods that needs it, will default to timeout specified in config", "1.1") - @BeanProperty - @volatile - var timeout: Long = Actor.TIMEOUT - - /** - * User overridable callback/setting. - * <p/> - * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - @volatile - var receiveTimeout: Option[Long] = None - - /** - * Akka Java API. <p/> - * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout) - def getReceiveTimeout(): Option[Long] = receiveTimeout - - /** - * Akka Java API. <p/> - * A faultHandler defines what should be done when a linked actor signals an error. - * <p/> - * Can be one of: - * <pre> - * getContext().setFaultHandler(new AllForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange)); - * </pre> - * Or: - * <pre> - * getContext().setFaultHandler(new OneForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange)); - * </pre> - */ - def setFaultHandler(handler: FaultHandlingStrategy) - def getFaultHandler(): FaultHandlingStrategy - - /** - * Akka Java API. <p/> - * A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent) - * <p/> - * Can be one of: - * - * import static akka.config.Supervision.*; - * <pre> - * getContext().setLifeCycle(permanent()); - * </pre> - * Or: - * <pre> - * getContext().setLifeCycle(temporary()); - * </pre> - */ - def setLifeCycle(lifeCycle: LifeCycle): Unit - def getLifeCycle(): LifeCycle - - /** - * Akka Java API. <p/> - * The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>. - * This means that all actors will share the same event-driven executor based dispatcher. - * <p/> - * You can override it so it fits the specific use-case that the actor is used for. - * See the <tt>akka.dispatch.Dispatchers</tt> class for the different - * dispatchers available. - * <p/> - * The default is also that all actors that are created and spawned from within this actor - * is sharing the same dispatcher as its creator. - */ - def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher - def getDispatcher(): MessageDispatcher = dispatcher - - /** - * Returns on which node this actor lives if None it lives in the local ActorRegistry - */ - @deprecated("Remoting will become fully transparent in the future", "1.1") - def homeAddress: Option[InetSocketAddress] - - /** - * Java API. <p/> - */ - @deprecated("Remoting will become fully transparent in the future", "1.1") - def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null - - /** - * Holds the hot swapped partial function. - */ - @volatile - protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]() - - /** - * This is a reference to the message currently being processed by the actor - */ - @volatile - protected[akka] var currentMessage: MessageInvocation = null - - /** - * Comparison only takes uuid into account. - */ - def compareTo(other: ActorRef) = this.uuid compareTo other.uuid - - /** - * Returns the uuid for the actor. - */ - def getUuid() = _uuid - def uuid = _uuid - - /** - * Akka Java API. <p/> - * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. - */ - def getSender(): Option[ActorRef] = sender - - /** - * Akka Java API. <p/> - * The reference sender future of the last received message. - * Is defined if the message was sent with sent with '!!' or '!!!', else None. - */ - def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture - - /** - * Is the actor being restarted? - */ - def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED - - /** - * Is the actor running? - */ - def isRunning: Boolean = _status match { - case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true - case _ => false - } - - /** - * Is the actor shut down? - */ - def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN - - /** - * Is the actor ever started? - */ - def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED - - /** - * Is the actor able to handle the message passed in as arguments? - */ - @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1") - def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - - /** - * Only for internal use. UUID is effectively final. - */ - protected[akka] def uuid_=(uid: Uuid) = _uuid = uid - - /** - * Akka Java API. <p/> - * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - * <p/> - * <pre> - * actor.sendOneWay(message); - * </pre> - * <p/> - */ - def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null) - - /** - * Akka Java API. <p/> - * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - * <p/> - * Allows you to pass along the sender of the message. - * <p/> - * <pre> - * actor.sendOneWay(message, context); - * </pre> - * <p/> - */ - def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender)) - - /** - * Akka Java API. <p/> - * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) - * Uses the default timeout of the Actor (setTimeout()) and omits the sender reference - */ - def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null) - - /** - * Akka Java API. <p/> - * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) - * Uses the default timeout of the Actor (setTimeout()) - */ - def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender) - - /** - * Akka Java API. <p/> - * Sends a message asynchronously and waits on a future for a reply message under the hood. - * <p/> - * It waits on the reply either until it receives it or until the timeout expires - * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to - * implement request/response message exchanges. - * <p/> - * If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = { - !!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException( - "Message [" + message + - "]\n\tsent to [" + actorClassName + - "]\n\tfrom [" + (if (sender ne null) sender.actorClassName else "nowhere") + - "]\n\twith timeout [" + timeout + - "]\n\ttimed out.")) - .asInstanceOf[AnyRef] - } - - /** - * Akka Java API. <p/> - * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] - * Uses the Actors default timeout (setTimeout()) and omits the sender - */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] - - /** - * Akka Java API. <p/> - * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] - * Uses the Actors default timeout (setTimeout()) - */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] - - /** - * Akka Java API. <p/> - * Sends a message asynchronously returns a future holding the eventual reply message. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to - * implement request/response message exchanges. - * <p/> - * If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] - - /** - * Akka Java API. <p/> - * Forwards the message specified to this actor and preserves the original sender of the message - */ - def forward(message: AnyRef, sender: ActorRef): Unit = - if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") - else forward(message)(Some(sender)) - - /** - * Akka Java API. <p/> - * Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Throws an IllegalStateException if unable to determine what to reply to. - */ - def replyUnsafe(message: AnyRef) = reply(message) - - /** - * Akka Java API. <p/> - * Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - def replySafe(message: AnyRef): Boolean = reply_?(message) - - /** - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] - - /** - * Akka Java API. <p/> - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def getActorClass(): Class[_ <: Actor] = actorClass - - /** - * Returns the class name for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClassName: String - - /** - * Akka Java API. <p/> - * Returns the class name for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def getActorClassName(): String = actorClassName - - /** - * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. - */ - def dispatcher_=(md: MessageDispatcher): Unit - - /** - * Get the dispatcher for this actor. - */ - def dispatcher: MessageDispatcher - - /** - * Starts up the actor and its message queue. - */ - def start(): ActorRef - - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - def exit() = stop() - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop(): Unit - - /** - * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will - * receive a notification if the linked actor has crashed. - * <p/> - * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will - * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy - * defined by the 'faultHandler'. - */ - def link(actorRef: ActorRef): Unit - - /** - * Unlink the actor. - */ - def unlink(actorRef: ActorRef): Unit - - /** - * Atomically start and link an actor. - */ - def startLink(actorRef: ActorRef): Unit - - /** - * Atomically create (from actor class) and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.actorOf instead", "1.1") - def spawn(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1") - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Atomically create (from actor class), link and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.remote.actorOf instead and then link on success", "1.1") - def spawnLink(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote, link and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1") - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Returns the mailbox size. - */ - def mailboxSize = dispatcher.mailboxSize(this) - - /** - * Akka Java API. <p/> - * Returns the mailbox size. - */ - def getMailboxSize(): Int = mailboxSize - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] - - /** - * Akka Java API. <p/> - * Returns the supervisor, if there is one. - */ - def getSupervisor(): ActorRef = supervisor getOrElse null - - /** - * Returns an unmodifiable Java Map containing the linked actors, - * please note that the backing map is thread-safe but not immutable - */ - def linkedActors: JMap[Uuid, ActorRef] - - /** - * Java API. <p/> - * Returns an unmodifiable Java Map containing the linked actors, - * please note that the backing map is thread-safe but not immutable - */ - def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors - - /** - * Abstraction for unification of sender and senderFuture for later reply - */ - def channel: Channel[Any] = { - if (senderFuture.isDefined) { - new Channel[Any] { - val future = senderFuture.get - def !(msg: Any) = future completeWithResult msg - } - } else if (sender.isDefined) { - val someSelf = Some(this) - new Channel[Any] { - val client = sender.get - def !(msg: Any) = client.!(msg)(someSelf) - } - } else throw new IllegalActorStateException("No channel available") - } - - /** - * Java API. <p/> - * Abstraction for unification of sender and senderFuture for later reply - */ - def getChannel: Channel[Any] = channel - - protected[akka] def invoke(messageHandle: MessageInvocation): Unit - - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] - - protected[akka] def actorInstance: AtomicReference[Actor] - - protected[akka] def actor: Actor = actorInstance.get - - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit - - protected[akka] def mailbox: AnyRef - protected[akka] def mailbox_=(value: AnyRef): AnyRef - - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit - - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] - - override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) - - override def equals(that: Any): Boolean = { - that.isInstanceOf[ActorRef] && - that.asInstanceOf[ActorRef].uuid == uuid - } - - override def toString = "Actor[" + id + ":" + uuid + "]" -} - -/** - * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -class LocalActorRef private[akka] ( - private[this] val actorFactory: () => Actor, - val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean = false) - extends ActorRef with ScalaActorRef { - protected[akka] val guard = new ReentrantGuard - - @volatile - protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None - @volatile - private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] - @volatile - private[akka] var _supervisor: Option[ActorRef] = None - @volatile - private var maxNrOfRetriesCount: Int = 0 - @volatile - private var restartsWithinTimeRangeTimestamp: Long = 0L - @volatile - private var _mailbox: AnyRef = _ - @volatile - private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher - - protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } - - //If it was started inside "newActor", initialize it - if (isRunning) initializeActorInstance - - // used only for deserialization - private[akka] def this( - __uuid: Uuid, - __id: String, - __timeout: Long, - __receiveTimeout: Option[Long], - __lifeCycle: LifeCycle, - __supervisor: Option[ActorRef], - __hotswap: Stack[PartialFunction[Any, Unit]], - __factory: () => Actor, - __homeAddress: Option[InetSocketAddress]) = { - this(__factory, __homeAddress) - _uuid = __uuid - id = __id - timeout = __timeout - receiveTimeout = __receiveTimeout - lifeCycle = __lifeCycle - _supervisor = __supervisor - hotswap = __hotswap - setActorSelfFields(actor, this) - start - } - - /** - * Returns whether this actor ref is client-managed remote or not - */ - private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled - - // ========= PUBLIC FUNCTIONS ========= - - /** - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] - - /** - * Returns the class name for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClassName: String = actorClass.getName - - /** - * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. - */ - def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { - if (!isBeingRestarted) { - if (!isRunning) _dispatcher = md - else throw new ActorInitializationException( - "Can not swap dispatcher for " + toString + " after it has been started") - } - } - - /** - * Get the dispatcher for this actor. - */ - def dispatcher: MessageDispatcher = _dispatcher - - /** - * Starts up the actor and its message queue. - */ - def start(): ActorRef = guard.withGuard { - if (isShutdown) throw new ActorStartException( - "Can't restart an actor that has been shut down with 'stop' or 'exit'") - if (!isRunning) { - dispatcher.attach(this) - - _status = ActorRefInternals.RUNNING - - // If we are not currently creating this ActorRef instance - if ((actorInstance ne null) && (actorInstance.get ne null)) - initializeActorInstance - - if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - - checkReceiveTimeout //Schedule the initial Receive timeout - } - this - } - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop() = guard.withGuard { - if (isRunning) { - receiveTimeout = None - cancelReceiveTimeout - dispatcher.detach(this) - _status = ActorRefInternals.SHUTDOWN - try { - actor.postStop - } finally { - currentMessage = null - Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - Actor.remote.unregister(this) - } - setActorSelfFields(actorInstance.get, null) - } - } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") - } - - /** - * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will - * receive a notification if the linked actor has crashed. - * <p/> - * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will - * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy - * defined by the 'faultHandler'. - * <p/> - * To be invoked from within the actor itself. - */ - def link(actorRef: ActorRef): Unit = guard.withGuard { - val actorRefSupervisor = actorRef.supervisor - val hasSupervisorAlready = actorRefSupervisor.isDefined - if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy - else if (hasSupervisorAlready) throw new IllegalActorStateException( - "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - else { - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) - } - } - - /** - * Unlink the actor. - * <p/> - * To be invoked from within the actor itself. - */ - def unlink(actorRef: ActorRef) = guard.withGuard { - if (_linkedActors.remove(actorRef.uuid) eq null) - throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") - - actorRef.supervisor = None - } - - /** - * Atomically start and link an actor. - * <p/> - * To be invoked from within the actor itself. - */ - def startLink(actorRef: ActorRef): Unit = guard.withGuard { - link(actorRef) - actorRef.start() - } - - /** - * Atomically create (from actor class) and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - def spawn(clazz: Class[_ <: Actor]): ActorRef = - Actor.actorOf(clazz).start() - - /** - * Atomically create (from actor class), start and make an actor remote. - * <p/> - * To be invoked from within the actor itself. - */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val ref = Actor.remote.actorOf(clazz, hostname, port) - ref.timeout = timeout - ref.start() - } - - /** - * Atomically create (from actor class), start and link an actor. - * <p/> - * To be invoked from within the actor itself. - */ - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = { - val actor = spawn(clazz) - link(actor) - actor.start() - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - * <p/> - * To be invoked from within the actor itself. - */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val actor = Actor.remote.actorOf(clazz, hostname, port) - actor.timeout = timeout - link(actor) - actor.start() - actor - } - - /** - * Returns the mailbox. - */ - def mailbox: AnyRef = _mailbox - - protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value } - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] = _supervisor - - // ========= AKKA PROTECTED FUNCTIONS ========= - - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup - - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - if (isClientManaged_?) { - Actor.remote.send[Any]( - message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) - } else - dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - if (isClientManaged_?) { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } else { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) - dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) - future.get - } - } - - /** - * Callback for the dispatcher. This is the single entry point to the user Actor implementation. - */ - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = { - guard.lock.lock - try { - if (!isShutdown) { - currentMessage = messageHandle - try { - try { - cancelReceiveTimeout // FIXME: leave this here? - actor(messageHandle.message) - currentMessage = null // reset current message after successful invocation - } catch { - case e: InterruptedException => - currentMessage = null // received message while actor is shutting down, ignore - case e => - handleExceptionInDispatch(e, messageHandle.message) - } - finally { - checkReceiveTimeout // Reschedule receive timeout - } - } catch { - case e => - EventHandler.error(e, this, messageHandle.message.toString) - throw e - } - } - } finally { guard.lock.unlock } - } - - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { - faultHandler match { - case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => - restartLinkedActors(reason, maxRetries, within) - - case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => - dead.restart(reason, maxRetries, within) - - case _ => - if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) - else dead.stop() - } - } - - private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal - false - } else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount += 1 //Increment number of retries - maxNrOfRetriesCount > maxNrOfRetries.get - } else { // cannot restart more than N within M timerange - maxNrOfRetriesCount += 1 //Increment number of retries - val windowStart = restartsWithinTimeRangeTimestamp - val now = System.currentTimeMillis - val retries = maxNrOfRetriesCount - //We are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) false - else (now - windowStart) <= withinTimeRange.get - - //The actor is dead if it dies X times within the window of restart - val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) - - if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window - restartsWithinTimeRangeTimestamp = now - - if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired - maxNrOfRetriesCount = 1 - - unrestartable - } - - denied == false //If we weren't denied, we have a go - } - - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart() { - val failedActor = actorInstance.get - - failedActor match { - case p: Proxyable => - failedActor.preRestart(reason) - failedActor.postRestart(reason) - case _ => - failedActor.preRestart(reason) - val freshActor = newActor - setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.preStart - freshActor.postRestart(reason) - } - } - - def tooManyRestarts() { - _supervisor.foreach { sup => - // can supervisor handle the notification? - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) - if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) - } - stop - } - - @tailrec - def attemptRestart() { - val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { - guard.withGuard[Boolean] { - _status = ActorRefInternals.BEING_RESTARTED - - lifeCycle match { - case Temporary => - shutDownTemporaryActor(this) - true - - case _ => // either permanent or none where default is permanent - val success = try { - performRestart() - true - } catch { - case e => - EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) - false // an error or exception here should trigger a retry - } - finally { - currentMessage = null - } - if (success) { - _status = ActorRefInternals.RUNNING - dispatcher.resume(this) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - } - success - } - } - } else { - tooManyRestarts() - true // done - } - - if (success) () // alles gut - else attemptRestart() - } - - attemptRestart() // recur - } - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { - val i = _linkedActors.values.iterator - while (i.hasNext) { - val actorRef = i.next - actorRef.lifeCycle match { - // either permanent or none where default is permanent - case Temporary => shutDownTemporaryActor(actorRef) - case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) - } - } - } - - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { - ensureRemotingEnabled - if (_supervisor.isDefined) { - if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) - Some(_supervisor.get.uuid) - } else None - } - - def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors) - - // ========= PRIVATE FUNCTIONS ========= - - private[this] def newActor: Actor = { - try { - Actor.actorRefInCreation.set(Some(this)) - val a = actorFactory() - if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - a - } finally { - Actor.actorRefInCreation.set(None) - } - } - - private def shutDownTemporaryActor(temporaryActor: ActorRef) { - temporaryActor.stop() - _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor - // if last temporary actor is gone, then unlink me from supervisor - if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) - true - } - - private def handleExceptionInDispatch(reason: Throwable, message: Any) = { - EventHandler.error(reason, this, message.toString) - - //Prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - - senderFuture.foreach(_.completeWithException(reason)) - - if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) - else { - lifeCycle match { - case Temporary => shutDownTemporaryActor(this) - case _ => dispatcher.resume(this) //Resume processing for this actor - } - } - } - - private def notifySupervisorWithMessage(notification: LifeCycleMessage) = { - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - _supervisor.foreach { sup => - if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors - //Scoped stop all linked actors, to avoid leaking the 'i' val - { - val i = _linkedActors.values.iterator - while (i.hasNext) { - i.next.stop() - i.remove - } - } - //Stop the actor itself - stop - } else sup ! notification // else notify supervisor - } - } - - private def setActorSelfFields(actor: Actor, value: ActorRef) { - - @tailrec - def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = { - val success = try { - val selfField = clazz.getDeclaredField("self") - val someSelfField = clazz.getDeclaredField("someSelf") - selfField.setAccessible(true) - someSelfField.setAccessible(true) - selfField.set(actor, value) - someSelfField.set(actor, if (value ne null) Some(value) else null) - true - } catch { - case e: NoSuchFieldException => false - } - - if (success) true - else { - val parent = clazz.getSuperclass - if (parent eq null) - throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") - lookupAndSetSelfFields(parent, actor, value) - } - } - - lookupAndSetSelfFields(actor.getClass, actor, value) - } - - private def initializeActorInstance = { - actor.preStart // run actor preStart - Actor.registry.register(this) - } - - protected[akka] def checkReceiveTimeout = { - cancelReceiveTimeout - if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed - _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) - } - } - - protected[akka] def cancelReceiveTimeout = { - if (_futureTimeout.isDefined) { - _futureTimeout.get.cancel(true) - _futureTimeout = None - } - } -} - -/** - * System messages for RemoteActorRef. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object RemoteActorSystemMessage { - val Stop = "RemoteActorRef:stop".intern -} - -/** - * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. - * This reference is network-aware (remembers its origin) and immutable. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -private[akka] case class RemoteActorRef private[akka] ( - classOrServiceName: String, - val actorClassName: String, - val hostname: String, - val port: Int, - _timeout: Long, - loader: Option[ClassLoader], - val actorType: ActorType = ActorType.ScalaActor) - extends ActorRef with ScalaActorRef { - - ensureRemotingEnabled - - val homeAddress = Some(new InetSocketAddress(hostname, port)) - - //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed - id = classOrServiceName - //id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id - - timeout = _timeout - - start - - def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - Actor.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader) - - def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, - homeAddress.get, timeout, - false, this, None, - actorType, loader) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } - - def start: ActorRef = synchronized { - _status = ActorRefInternals.RUNNING - this - } - - def stop: Unit = synchronized { - if (_status == ActorRefInternals.RUNNING) { - _status = ActorRefInternals.SHUTDOWN - postMessageToMailbox(RemoteActorSystemMessage.Stop, None) - } - } - - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None - - // ==== NOT SUPPORTED ==== - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] = unsupported - def dispatcher_=(md: MessageDispatcher): Unit = unsupported - def dispatcher: MessageDispatcher = unsupported - def link(actorRef: ActorRef): Unit = unsupported - def unlink(actorRef: ActorRef): Unit = unsupported - def startLink(actorRef: ActorRef): Unit = unsupported - def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def supervisor: Option[ActorRef] = unsupported - def linkedActors: JMap[Uuid, ActorRef] = unsupported - protected[akka] def mailbox: AnyRef = unsupported - protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported - protected[akka] def actorInstance: AtomicReference[Actor] = unsupported - private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") -} - -/** - * This trait represents the common (external) methods for all ActorRefs - * Needed because implicit conversions aren't applied when instance imports are used - * - * i.e. - * var self: ScalaActorRef = ... - * import self._ - * //can't call ActorRef methods here unless they are declared in a common - * //superclass, which ActorRefShared is. - */ -trait ActorRefShared { - /** - * Returns the uuid for the actor. - */ - def uuid: Uuid -} - -/** - * This trait represents the Scala Actor API - * There are implicit conversions in ../actor/Implicits.scala - * from ActorRef -> ScalaActorRef and back - */ -trait ScalaActorRef extends ActorRefShared { ref: ActorRef => - - /** - * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - * <p/> - * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - def id: String - - def id_=(id: String): Unit - - /** - * User overridable callback/setting. - * <p/> - * Defines the life-cycle for a supervised actor. - */ - @volatile - @BeanProperty - var lifeCycle: LifeCycle = UndefinedLifeCycle - - /** - * User overridable callback/setting. - * <p/> - * Don't forget to supply a List of exception types to intercept (trapExit) - * <p/> - * Can be one of: - * <pre> - * faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange) - * </pre> - * Or: - * <pre> - * faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange) - * </pre> - */ - @volatile - @BeanProperty - var faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy - - /** - * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. - */ - def sender: Option[ActorRef] = { - val msg = currentMessage - if (msg eq null) None - else msg.sender - } - - /** - * The reference sender future of the last received message. - * Is defined if the message was sent with sent with '!!' or '!!!', else None. - */ - def senderFuture(): Option[CompletableFuture[Any]] = { - val msg = currentMessage - if (msg eq null) None - else msg.senderFuture - } - - /** - * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - * <p/> - * - * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. - * <p/> - * - * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable, - * if invoked from within an Actor. If not then no sender is available. - * <pre> - * actor ! message - * </pre> - * <p/> - */ - def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = { - if (isRunning) postMessageToMailbox(message, sender) - else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Sends a message asynchronously and waits on a future for a reply message. - * <p/> - * It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>) - * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to - * implement request/response message exchanges. - * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = { - if (isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None) - val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future) - else false - try { - future.await - } catch { - case e: FutureTimeoutException => - if (isMessageJoinPoint) { - EventHandler.error(e, this, e.getMessage) - throw e - } else None - } - future.resultOrException - } else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Sends a message asynchronously returns a future holding the eventual reply message. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to - * implement request/response message exchanges. - * If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = { - if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) - else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Forwards the message and passes the original sender actor as the sender. - * <p/> - * Works with '!', '!!' and '!!!'. - */ - def forward(message: Any)(implicit sender: Some[ActorRef]) = { - if (isRunning) { - if (sender.get.senderFuture.isDefined) - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, sender.get.sender, sender.get.senderFuture) - else - postMessageToMailbox(message, sender.get.sender) - } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Throws an IllegalStateException if unable to determine what to reply to. - */ - def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably: " + - "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + - "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." + - "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if success and Boolean(false) if no sender in scope") - - /** - * Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - def reply_?(message: Any): Boolean = { - if (senderFuture.isDefined) { - senderFuture.get completeWithResult message - true - } else if (sender.isDefined) { - //TODO: optimize away this allocation, perhaps by having implicit self: Option[ActorRef] in signature - sender.get.!(message)(Some(this)) - true - } else false - } - - /** - * Atomically create (from actor class) and start an actor. - */ - def spawn[T <: Actor: ClassTag]: ActorRef = - spawn(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start and make an actor remote. - */ - def spawnRemote[T <: Actor: ClassTag](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnRemote(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } - - /** - * Atomically create (from actor class), start and link an actor. - */ - def spawnLink[T <: Actor: ClassTag]: ActorRef = - spawnLink(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start, link and make an actor remote. - */ - def spawnLinkRemote[T <: Actor: ClassTag](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnLinkRemote(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } -} |