diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/actor')
10 files changed, 0 insertions, 3486 deletions
diff --git a/test/disabled/presentation/akka/src/akka/actor/Actor.scala b/test/disabled/presentation/akka/src/akka/actor/Actor.scala deleted file mode 100644 index b9bc51b635..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/Actor.scala +++ /dev/null @@ -1,503 +0,0 @@ -/** Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import akka.dispatch._ -import akka.config.Config._ -import akka.util.Helpers.{ narrow, narrowSilently } -import akka.util.ListenerManagement -import akka.AkkaException - -import scala.beans.BeanProperty -import akka.util.{ ReflectiveAccess, Duration } -import akka.remoteinterface.RemoteSupport -import akka.japi.{ Creator, Procedure } -import java.lang.reflect.InvocationTargetException - -/** Life-cycle messages for the Actors - */ -sealed trait LifeCycleMessage extends Serializable - -/* Marker trait to show which Messages are automatically handled by Akka */ -sealed trait AutoReceivedMessage { self: LifeCycleMessage => } - -case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true) - extends AutoReceivedMessage with LifeCycleMessage { - - /** Java API - */ - def this(code: akka.japi.Function[ActorRef, Procedure[Any]], discardOld: Boolean) = - this((self: ActorRef) => { - val behavior = code(self) - val result: Actor.Receive = { case msg => behavior(msg) } - result - }, discardOld) - - /** Java API with default non-stacking behavior - */ - def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true) -} - -case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage - -case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage - -case class Exit(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage - -case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage - -case class Unlink(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage - -case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage - -case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage - -case object Kill extends AutoReceivedMessage with LifeCycleMessage - -case object ReceiveTimeout extends LifeCycleMessage - -case class MaximumNumberOfRestartsWithinTimeRangeReached( - @BeanProperty val victim: ActorRef, - @BeanProperty val maxNrOfRetries: Option[Int], - @BeanProperty val withinTimeRange: Option[Int], - @BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage - -// Exceptions for Actors -class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) -class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) -class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) -class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) -class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) -class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) - -/** This message is thrown by default when an Actors behavior doesn't match a message - */ -case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception { - override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg) - override def fillInStackTrace() = this //Don't waste cycles generating stack trace -} - -/** Actor factory module with factory methods for creating various kinds of Actors. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object Actor extends ListenerManagement { - - /** Add shutdown cleanups - */ - private[akka] lazy val shutdownHook = { - val hook = new Runnable { - override def run { - // Clear Thread.subclassAudits - val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") - tf.setAccessible(true) - val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_, _]] - subclassAudits synchronized { subclassAudits.clear } - } - } - Runtime.getRuntime.addShutdownHook(new Thread(hook)) - hook - } - - val registry = new ActorRegistry - - lazy val remote: RemoteSupport = { - ReflectiveAccess - .Remote - .defaultRemoteSupport - .map(_()) - .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath")) - } - - private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis - private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) - - /** A Receive is a convenience type that defines actor message behavior currently modeled as - * a PartialFunction[Any, Unit]. - */ - type Receive = PartialFunction[Any, Unit] - - private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] { - override def initialValue = None - } - - /** Creates an ActorRef out of the Actor with type T. - * <pre> - * import Actor._ - * val actor = actorOf[MyActor] - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * You can create and start the actor in one statement like this: - * <pre> - * val actor = actorOf[MyActor].start() - * </pre> - */ - def actorOf[T <: Actor: ClassTag]: ActorRef = actorOf(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** Creates an ActorRef out of the Actor of the specified Class. - * <pre> - * import Actor._ - * val actor = actorOf(classOf[MyActor]) - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * You can create and start the actor in one statement like this: - * <pre> - * val actor = actorOf(classOf[MyActor]).start() - * </pre> - */ - def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => { - import ReflectiveAccess.{ createInstance, noParams, noArgs } - createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs) match { - case Right(actor) => actor - case Left(exception) => - val cause = exception match { - case i: InvocationTargetException => i.getTargetException - case _ => exception - } - - throw new ActorInitializationException( - "Could not instantiate Actor of " + clazz + - "\nMake sure Actor is NOT defined inside a class/trait," + - "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause) - } - - }, None) - - /** Creates an ActorRef out of the Actor. Allows you to pass in a factory function - * that creates the Actor. Please note that this function can be invoked multiple - * times if for example the Actor is supervised and needs to be restarted. - * <p/> - * This function should <b>NOT</b> be used for remote actors. - * <pre> - * import Actor._ - * val actor = actorOf(new MyActor) - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * You can create and start the actor in one statement like this: - * <pre> - * val actor = actorOf(new MyActor).start() - * </pre> - */ - def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None) - - /** Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>) - * that creates the Actor. Please note that this function can be invoked multiple - * times if for example the Actor is supervised and needs to be restarted. - * <p/> - * This function should <b>NOT</b> be used for remote actors. - * JAVA API - */ - def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None) - - /** Use to spawn out a block of code in an event-driven actor. Will shut actor down when - * the block has been executed. - * <p/> - * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since - * there is a method 'spawn[ActorType]' in the Actor trait already. - * Example: - * <pre> - * import Actor.{spawn} - * - * spawn { - * ... // do stuff - * } - * </pre> - */ - def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = { - case object Spawn - actorOf(new Actor() { - self.dispatcher = dispatcher - def receive = { - case Spawn => try { body } finally { self.stop() } - } - }).start() ! Spawn - } - - /** Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code> - * to convert an Option[Any] to an Option[T]. - */ - implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption) - - /** Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code> - * to convert an Option[Any] to an Option[T]. - * This means that the following code is equivalent: - * (actor !! "foo").as[Int] (Deprecated) - * and - * (actor !!! "foo").as[Int] (Recommended) - */ - implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({ - try { anyFuture.await } catch { case t: FutureTimeoutException => } - anyFuture.resultOrException - }) -} - -/** Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': - * <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a> - * <p/> - * An actor has a well-defined (non-cyclic) life-cycle. - * <pre> - * => NEW (newly created actor) - can't receive messages (yet) - * => STARTED (when 'start' is invoked) - can receive messages - * => SHUT DOWN (when 'exit' is invoked) - can't do anything - * </pre> - * - * <p/> - * The Actor's API is available in the 'self' member variable. - * - * <p/> - * Here you find functions like: - * - !, !!, !!! and forward - * - link, unlink, startLink, spawnLink etc - * - makeRemote etc. - * - start, stop - * - etc. - * - * <p/> - * Here you also find fields like - * - dispatcher = ... - * - id = ... - * - lifeCycle = ... - * - faultHandler = ... - * - trapExit = ... - * - etc. - * - * <p/> - * This means that to use them you have to prefix them with 'self', like this: <tt>self ! Message</tt> - * - * However, for convenience you can import these functions and fields like below, which will allow you do - * drop the 'self' prefix: - * <pre> - * class MyActor extends Actor { - * import self._ - * id = ... - * dispatcher = ... - * spawnLink[OtherActor] - * ... - * } - * </pre> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait Actor { - - /** Type alias because traits cannot have companion objects. - */ - type Receive = Actor.Receive - - /* - * Some[ActorRef] representation of the 'self' ActorRef reference. - * <p/> - * Mainly for internal use, functions as the implicit sender references when invoking - * the 'forward' function. - */ - @transient - implicit val someSelf: Some[ActorRef] = { - val optRef = Actor.actorRefInCreation.get - if (optRef.isEmpty) throw new ActorInitializationException( - "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + - "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + - "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + - "\n\tEither use:" + - "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + - "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") - Actor.actorRefInCreation.set(None) - optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed? - optRef.asInstanceOf[Some[ActorRef]] - } - - /* - * Option[ActorRef] representation of the 'self' ActorRef reference. - * <p/> - * Mainly for internal use, functions as the implicit sender references when invoking - * one of the message send functions ('!', '!!' and '!!!'). - */ - implicit def optionSelf: Option[ActorRef] = someSelf - - /** The 'self' field holds the ActorRef for this actor. - * <p/> - * Can be used to send messages to itself: - * <pre> - * self ! message - * </pre> - * Here you also find most of the Actor API. - * <p/> - * For example fields like: - * <pre> - * self.dispatcher = ... - * self.trapExit = ... - * self.faultHandler = ... - * self.lifeCycle = ... - * self.sender - * </pre> - * <p/> - * Here you also find methods like: - * <pre> - * self.reply(..) - * self.link(..) - * self.unlink(..) - * self.start(..) - * self.stop(..) - * </pre> - */ - @transient - val self: ScalaActorRef = someSelf.get - - /** User overridable callback/setting. - * <p/> - * Partial function implementing the actor logic. - * To be implemented by concrete actor class. - * <p/> - * Example code: - * <pre> - * def receive = { - * case Ping => - * println("got a 'Ping' message") - * self.reply("pong") - * - * case OneWay => - * println("got a 'OneWay' message") - * - * case unknown => - * println("unknown message: " + unknown) - * } - * </pre> - */ - protected def receive: Receive - - /** User overridable callback. - * <p/> - * Is called when an Actor is started by invoking 'actor.start()'. - */ - def preStart() {} - - /** User overridable callback. - * <p/> - * Is called when 'actor.stop()' is invoked. - */ - def postStop() {} - - /** User overridable callback. - * <p/> - * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. - */ - def preRestart(reason: Throwable) {} - - /** User overridable callback. - * <p/> - * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. - */ - def postRestart(reason: Throwable) {} - - /** User overridable callback. - * <p/> - * Is called when a message isn't handled by the current behavior of the actor - * by default it throws an UnhandledMessageException - */ - def unhandled(msg: Any) { - throw new UnhandledMessageException(msg, self) - } - - /** Is the actor able to handle the message passed in as arguments? - */ - def isDefinedAt(message: Any): Boolean = { - val behaviorStack = self.hotswap - message match { //Same logic as apply(msg) but without the unhandled catch-all - case l: AutoReceivedMessage => true - case msg if behaviorStack.nonEmpty && - behaviorStack.head.isDefinedAt(msg) => true - case msg if behaviorStack.isEmpty && - processingBehavior.isDefinedAt(msg) => true - case _ => false - } - } - - /** Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. - * Puts the behavior on top of the hotswap stack. - * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack - */ - def become(behavior: Receive, discardOld: Boolean = true) { - if (discardOld) unbecome() - self.hotswap = self.hotswap.push(behavior) - } - - /** Reverts the Actor behavior to the previous one in the hotswap stack. - */ - def unbecome(): Unit = { - val h = self.hotswap - if (h.nonEmpty) self.hotswap = h.pop - } - - // ========================================= - // ==== INTERNAL IMPLEMENTATION DETAILS ==== - // ========================================= - - private[akka] final def apply(msg: Any) = { - if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null)) - throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null") - val behaviorStack = self.hotswap - msg match { - case l: AutoReceivedMessage => autoReceiveMessage(l) - case msg if behaviorStack.nonEmpty && - behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg) - case msg if behaviorStack.isEmpty && - processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg) - case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior - } - } - - private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { - case HotSwap(code, discardOld) => become(code(self), discardOld) - case RevertHotSwap => unbecome() - case Exit(dead, reason) => self.handleTrapExit(dead, reason) - case Link(child) => self.link(child) - case Unlink(child) => self.unlink(child) - case UnlinkAndStop(child) => self.unlink(child); child.stop() - case Restart(reason) => throw reason - case Kill => throw new ActorKilledException("Kill") - case PoisonPill => - val f = self.senderFuture - self.stop() - if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) - } - - private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior -} - -private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) { - - /** Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException - * if the actual type is not assignable from the given one. - */ - def as[T]: Option[T] = narrow[T](anyOption) - - /** Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible - * ClassCastException and return None in that case. - */ - def asSilently[T: ClassTag]: Option[T] = narrowSilently[T](anyOption) -} - -/** Marker interface for proxyable actors (such as typed actor). - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait Proxyable { - private[actor] def swapProxiedActor(newInstance: Actor) -} - -/** Represents the different Actor types. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -sealed trait ActorType -object ActorType { - case object ScalaActor extends ActorType - case object TypedActor extends ActorType -} diff --git a/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala b/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala deleted file mode 100644 index 97bb710e29..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala +++ /dev/null @@ -1,1433 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import akka.event.EventHandler -import akka.dispatch._ -import akka.config.Supervision._ -import akka.util._ -import ReflectiveAccess._ - -import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } -import java.util.{ Map => JMap } - -import scala.beans.BeanProperty -import scala.collection.immutable.Stack -import scala.annotation.tailrec - -private[akka] object ActorRefInternals { - - /** - * LifeCycles for ActorRefs. - */ - private[akka] sealed trait StatusType - object UNSTARTED extends StatusType - object RUNNING extends StatusType - object BEING_RESTARTED extends StatusType - object SHUTDOWN extends StatusType -} - -/** - * Abstraction for unification of sender and senderFuture for later reply. - * Can be stored away and used at a later point in time. - */ -abstract class Channel[T] { - - /** - * Scala API. <p/> - * Sends the specified message to the channel. - */ - def !(msg: T): Unit - - /** - * Java API. <p/> - * Sends the specified message to the channel. - */ - def sendOneWay(msg: T): Unit = this.!(msg) -} - -/** - * ActorRef is an immutable and serializable handle to an Actor. - * <p/> - * Create an ActorRef for an Actor by using the factory method on the Actor object. - * <p/> - * Here is an example on how to create an actor with a default constructor. - * <pre> - * import Actor._ - * - * val actor = actorOf[MyActor] - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * - * You can also create and start actors like this: - * <pre> - * val actor = actorOf[MyActor].start() - * </pre> - * - * Here is an example on how to create an actor with a non-default constructor. - * <pre> - * import Actor._ - * - * val actor = actorOf(new MyActor(...)) - * actor.start() - * actor ! message - * actor.stop() - * </pre> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef => - // Only mutable for RemoteServer in order to maintain identity across nodes - @volatile - protected[akka] var _uuid = newUuid - @volatile - protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED - - /** - * User overridable callback/setting. - * <p/> - * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - * <p/> - * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - @BeanProperty - @volatile - var id: String = _uuid.toString - - /** - * User overridable callback/setting. - * <p/> - * Defines the default timeout for '!!' and '!!!' invocations, - * e.g. the timeout for the future returned by the call to '!!' and '!!!'. - */ - @deprecated("Will be replaced by implicit-scoped timeout on all methods that needs it, will default to timeout specified in config", "1.1") - @BeanProperty - @volatile - var timeout: Long = Actor.TIMEOUT - - /** - * User overridable callback/setting. - * <p/> - * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - @volatile - var receiveTimeout: Option[Long] = None - - /** - * Akka Java API. <p/> - * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout) - def getReceiveTimeout(): Option[Long] = receiveTimeout - - /** - * Akka Java API. <p/> - * A faultHandler defines what should be done when a linked actor signals an error. - * <p/> - * Can be one of: - * <pre> - * getContext().setFaultHandler(new AllForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange)); - * </pre> - * Or: - * <pre> - * getContext().setFaultHandler(new OneForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange)); - * </pre> - */ - def setFaultHandler(handler: FaultHandlingStrategy) - def getFaultHandler(): FaultHandlingStrategy - - /** - * Akka Java API. <p/> - * A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent) - * <p/> - * Can be one of: - * - * import static akka.config.Supervision.*; - * <pre> - * getContext().setLifeCycle(permanent()); - * </pre> - * Or: - * <pre> - * getContext().setLifeCycle(temporary()); - * </pre> - */ - def setLifeCycle(lifeCycle: LifeCycle): Unit - def getLifeCycle(): LifeCycle - - /** - * Akka Java API. <p/> - * The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>. - * This means that all actors will share the same event-driven executor based dispatcher. - * <p/> - * You can override it so it fits the specific use-case that the actor is used for. - * See the <tt>akka.dispatch.Dispatchers</tt> class for the different - * dispatchers available. - * <p/> - * The default is also that all actors that are created and spawned from within this actor - * is sharing the same dispatcher as its creator. - */ - def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher - def getDispatcher(): MessageDispatcher = dispatcher - - /** - * Returns on which node this actor lives if None it lives in the local ActorRegistry - */ - @deprecated("Remoting will become fully transparent in the future", "1.1") - def homeAddress: Option[InetSocketAddress] - - /** - * Java API. <p/> - */ - @deprecated("Remoting will become fully transparent in the future", "1.1") - def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null - - /** - * Holds the hot swapped partial function. - */ - @volatile - protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]() - - /** - * This is a reference to the message currently being processed by the actor - */ - @volatile - protected[akka] var currentMessage: MessageInvocation = null - - /** - * Comparison only takes uuid into account. - */ - def compareTo(other: ActorRef) = this.uuid compareTo other.uuid - - /** - * Returns the uuid for the actor. - */ - def getUuid() = _uuid - def uuid = _uuid - - /** - * Akka Java API. <p/> - * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. - */ - def getSender(): Option[ActorRef] = sender - - /** - * Akka Java API. <p/> - * The reference sender future of the last received message. - * Is defined if the message was sent with sent with '!!' or '!!!', else None. - */ - def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture - - /** - * Is the actor being restarted? - */ - def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED - - /** - * Is the actor running? - */ - def isRunning: Boolean = _status match { - case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true - case _ => false - } - - /** - * Is the actor shut down? - */ - def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN - - /** - * Is the actor ever started? - */ - def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED - - /** - * Is the actor able to handle the message passed in as arguments? - */ - @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1") - def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - - /** - * Only for internal use. UUID is effectively final. - */ - protected[akka] def uuid_=(uid: Uuid) = _uuid = uid - - /** - * Akka Java API. <p/> - * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - * <p/> - * <pre> - * actor.sendOneWay(message); - * </pre> - * <p/> - */ - def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null) - - /** - * Akka Java API. <p/> - * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - * <p/> - * Allows you to pass along the sender of the message. - * <p/> - * <pre> - * actor.sendOneWay(message, context); - * </pre> - * <p/> - */ - def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender)) - - /** - * Akka Java API. <p/> - * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) - * Uses the default timeout of the Actor (setTimeout()) and omits the sender reference - */ - def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null) - - /** - * Akka Java API. <p/> - * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef) - * Uses the default timeout of the Actor (setTimeout()) - */ - def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender) - - /** - * Akka Java API. <p/> - * Sends a message asynchronously and waits on a future for a reply message under the hood. - * <p/> - * It waits on the reply either until it receives it or until the timeout expires - * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to - * implement request/response message exchanges. - * <p/> - * If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = { - !!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException( - "Message [" + message + - "]\n\tsent to [" + actorClassName + - "]\n\tfrom [" + (if (sender ne null) sender.actorClassName else "nowhere") + - "]\n\twith timeout [" + timeout + - "]\n\ttimed out.")) - .asInstanceOf[AnyRef] - } - - /** - * Akka Java API. <p/> - * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] - * Uses the Actors default timeout (setTimeout()) and omits the sender - */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] - - /** - * Akka Java API. <p/> - * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] - * Uses the Actors default timeout (setTimeout()) - */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] - - /** - * Akka Java API. <p/> - * Sends a message asynchronously returns a future holding the eventual reply message. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to - * implement request/response message exchanges. - * <p/> - * If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] - - /** - * Akka Java API. <p/> - * Forwards the message specified to this actor and preserves the original sender of the message - */ - def forward(message: AnyRef, sender: ActorRef): Unit = - if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") - else forward(message)(Some(sender)) - - /** - * Akka Java API. <p/> - * Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Throws an IllegalStateException if unable to determine what to reply to. - */ - def replyUnsafe(message: AnyRef) = reply(message) - - /** - * Akka Java API. <p/> - * Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - def replySafe(message: AnyRef): Boolean = reply_?(message) - - /** - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] - - /** - * Akka Java API. <p/> - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def getActorClass(): Class[_ <: Actor] = actorClass - - /** - * Returns the class name for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClassName: String - - /** - * Akka Java API. <p/> - * Returns the class name for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def getActorClassName(): String = actorClassName - - /** - * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. - */ - def dispatcher_=(md: MessageDispatcher): Unit - - /** - * Get the dispatcher for this actor. - */ - def dispatcher: MessageDispatcher - - /** - * Starts up the actor and its message queue. - */ - def start(): ActorRef - - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - def exit() = stop() - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop(): Unit - - /** - * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will - * receive a notification if the linked actor has crashed. - * <p/> - * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will - * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy - * defined by the 'faultHandler'. - */ - def link(actorRef: ActorRef): Unit - - /** - * Unlink the actor. - */ - def unlink(actorRef: ActorRef): Unit - - /** - * Atomically start and link an actor. - */ - def startLink(actorRef: ActorRef): Unit - - /** - * Atomically create (from actor class) and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.actorOf instead", "1.1") - def spawn(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1") - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Atomically create (from actor class), link and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, use Actor.remote.actorOf instead and then link on success", "1.1") - def spawnLink(clazz: Class[_ <: Actor]): ActorRef - - /** - * Atomically create (from actor class), make it remote, link and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1") - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef - - /** - * Returns the mailbox size. - */ - def mailboxSize = dispatcher.mailboxSize(this) - - /** - * Akka Java API. <p/> - * Returns the mailbox size. - */ - def getMailboxSize(): Int = mailboxSize - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] - - /** - * Akka Java API. <p/> - * Returns the supervisor, if there is one. - */ - def getSupervisor(): ActorRef = supervisor getOrElse null - - /** - * Returns an unmodifiable Java Map containing the linked actors, - * please note that the backing map is thread-safe but not immutable - */ - def linkedActors: JMap[Uuid, ActorRef] - - /** - * Java API. <p/> - * Returns an unmodifiable Java Map containing the linked actors, - * please note that the backing map is thread-safe but not immutable - */ - def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors - - /** - * Abstraction for unification of sender and senderFuture for later reply - */ - def channel: Channel[Any] = { - if (senderFuture.isDefined) { - new Channel[Any] { - val future = senderFuture.get - def !(msg: Any) = future completeWithResult msg - } - } else if (sender.isDefined) { - val someSelf = Some(this) - new Channel[Any] { - val client = sender.get - def !(msg: Any) = client.!(msg)(someSelf) - } - } else throw new IllegalActorStateException("No channel available") - } - - /** - * Java API. <p/> - * Abstraction for unification of sender and senderFuture for later reply - */ - def getChannel: Channel[Any] = channel - - protected[akka] def invoke(messageHandle: MessageInvocation): Unit - - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] - - protected[akka] def actorInstance: AtomicReference[Actor] - - protected[akka] def actor: Actor = actorInstance.get - - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit - - protected[akka] def mailbox: AnyRef - protected[akka] def mailbox_=(value: AnyRef): AnyRef - - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit - - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit - - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] - - override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) - - override def equals(that: Any): Boolean = { - that.isInstanceOf[ActorRef] && - that.asInstanceOf[ActorRef].uuid == uuid - } - - override def toString = "Actor[" + id + ":" + uuid + "]" -} - -/** - * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -class LocalActorRef private[akka] ( - private[this] val actorFactory: () => Actor, - val homeAddress: Option[InetSocketAddress], - val clientManaged: Boolean = false) - extends ActorRef with ScalaActorRef { - protected[akka] val guard = new ReentrantGuard - - @volatile - protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None - @volatile - private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] - @volatile - private[akka] var _supervisor: Option[ActorRef] = None - @volatile - private var maxNrOfRetriesCount: Int = 0 - @volatile - private var restartsWithinTimeRangeTimestamp: Long = 0L - @volatile - private var _mailbox: AnyRef = _ - @volatile - private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher - - protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } - - //If it was started inside "newActor", initialize it - if (isRunning) initializeActorInstance - - // used only for deserialization - private[akka] def this( - __uuid: Uuid, - __id: String, - __timeout: Long, - __receiveTimeout: Option[Long], - __lifeCycle: LifeCycle, - __supervisor: Option[ActorRef], - __hotswap: Stack[PartialFunction[Any, Unit]], - __factory: () => Actor, - __homeAddress: Option[InetSocketAddress]) = { - this(__factory, __homeAddress) - _uuid = __uuid - id = __id - timeout = __timeout - receiveTimeout = __receiveTimeout - lifeCycle = __lifeCycle - _supervisor = __supervisor - hotswap = __hotswap - setActorSelfFields(actor, this) - start - } - - /** - * Returns whether this actor ref is client-managed remote or not - */ - private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled - - // ========= PUBLIC FUNCTIONS ========= - - /** - * Returns the class for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] - - /** - * Returns the class name for the Actor instance that is managed by the ActorRef. - */ - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClassName: String = actorClass.getName - - /** - * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. - */ - def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { - if (!isBeingRestarted) { - if (!isRunning) _dispatcher = md - else throw new ActorInitializationException( - "Can not swap dispatcher for " + toString + " after it has been started") - } - } - - /** - * Get the dispatcher for this actor. - */ - def dispatcher: MessageDispatcher = _dispatcher - - /** - * Starts up the actor and its message queue. - */ - def start(): ActorRef = guard.withGuard { - if (isShutdown) throw new ActorStartException( - "Can't restart an actor that has been shut down with 'stop' or 'exit'") - if (!isRunning) { - dispatcher.attach(this) - - _status = ActorRefInternals.RUNNING - - // If we are not currently creating this ActorRef instance - if ((actorInstance ne null) && (actorInstance.get ne null)) - initializeActorInstance - - if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - - checkReceiveTimeout //Schedule the initial Receive timeout - } - this - } - - /** - * Shuts down the actor its dispatcher and message queue. - */ - def stop() = guard.withGuard { - if (isRunning) { - receiveTimeout = None - cancelReceiveTimeout - dispatcher.detach(this) - _status = ActorRefInternals.SHUTDOWN - try { - actor.postStop - } finally { - currentMessage = null - Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid) - Actor.remote.unregister(this) - } - setActorSelfFields(actorInstance.get, null) - } - } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") - } - - /** - * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will - * receive a notification if the linked actor has crashed. - * <p/> - * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will - * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy - * defined by the 'faultHandler'. - * <p/> - * To be invoked from within the actor itself. - */ - def link(actorRef: ActorRef): Unit = guard.withGuard { - val actorRefSupervisor = actorRef.supervisor - val hasSupervisorAlready = actorRefSupervisor.isDefined - if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy - else if (hasSupervisorAlready) throw new IllegalActorStateException( - "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - else { - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) - } - } - - /** - * Unlink the actor. - * <p/> - * To be invoked from within the actor itself. - */ - def unlink(actorRef: ActorRef) = guard.withGuard { - if (_linkedActors.remove(actorRef.uuid) eq null) - throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") - - actorRef.supervisor = None - } - - /** - * Atomically start and link an actor. - * <p/> - * To be invoked from within the actor itself. - */ - def startLink(actorRef: ActorRef): Unit = guard.withGuard { - link(actorRef) - actorRef.start() - } - - /** - * Atomically create (from actor class) and start an actor. - * <p/> - * To be invoked from within the actor itself. - */ - def spawn(clazz: Class[_ <: Actor]): ActorRef = - Actor.actorOf(clazz).start() - - /** - * Atomically create (from actor class), start and make an actor remote. - * <p/> - * To be invoked from within the actor itself. - */ - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val ref = Actor.remote.actorOf(clazz, hostname, port) - ref.timeout = timeout - ref.start() - } - - /** - * Atomically create (from actor class), start and link an actor. - * <p/> - * To be invoked from within the actor itself. - */ - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = { - val actor = spawn(clazz) - link(actor) - actor.start() - actor - } - - /** - * Atomically create (from actor class), start, link and make an actor remote. - * <p/> - * To be invoked from within the actor itself. - */ - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = { - ensureRemotingEnabled - val actor = Actor.remote.actorOf(clazz, hostname, port) - actor.timeout = timeout - link(actor) - actor.start() - actor - } - - /** - * Returns the mailbox. - */ - def mailbox: AnyRef = _mailbox - - protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value } - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] = _supervisor - - // ========= AKKA PROTECTED FUNCTIONS ========= - - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup - - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - if (isClientManaged_?) { - Actor.remote.send[Any]( - message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None) - } else - dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - if (isClientManaged_?) { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } else { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) - dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) - future.get - } - } - - /** - * Callback for the dispatcher. This is the single entry point to the user Actor implementation. - */ - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = { - guard.lock.lock - try { - if (!isShutdown) { - currentMessage = messageHandle - try { - try { - cancelReceiveTimeout // FIXME: leave this here? - actor(messageHandle.message) - currentMessage = null // reset current message after successful invocation - } catch { - case e: InterruptedException => - currentMessage = null // received message while actor is shutting down, ignore - case e => - handleExceptionInDispatch(e, messageHandle.message) - } - finally { - checkReceiveTimeout // Reschedule receive timeout - } - } catch { - case e => - EventHandler.error(e, this, messageHandle.message.toString) - throw e - } - } - } finally { guard.lock.unlock } - } - - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { - faultHandler match { - case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => - restartLinkedActors(reason, maxRetries, within) - - case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => - dead.restart(reason, maxRetries, within) - - case _ => - if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) - else dead.stop() - } - } - - private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal - false - } else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount += 1 //Increment number of retries - maxNrOfRetriesCount > maxNrOfRetries.get - } else { // cannot restart more than N within M timerange - maxNrOfRetriesCount += 1 //Increment number of retries - val windowStart = restartsWithinTimeRangeTimestamp - val now = System.currentTimeMillis - val retries = maxNrOfRetriesCount - //We are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) false - else (now - windowStart) <= withinTimeRange.get - - //The actor is dead if it dies X times within the window of restart - val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) - - if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window - restartsWithinTimeRangeTimestamp = now - - if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired - maxNrOfRetriesCount = 1 - - unrestartable - } - - denied == false //If we weren't denied, we have a go - } - - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart() { - val failedActor = actorInstance.get - - failedActor match { - case p: Proxyable => - failedActor.preRestart(reason) - failedActor.postRestart(reason) - case _ => - failedActor.preRestart(reason) - val freshActor = newActor - setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.preStart - freshActor.postRestart(reason) - } - } - - def tooManyRestarts() { - _supervisor.foreach { sup => - // can supervisor handle the notification? - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) - if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) - } - stop - } - - @tailrec - def attemptRestart() { - val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { - guard.withGuard[Boolean] { - _status = ActorRefInternals.BEING_RESTARTED - - lifeCycle match { - case Temporary => - shutDownTemporaryActor(this) - true - - case _ => // either permanent or none where default is permanent - val success = try { - performRestart() - true - } catch { - case e => - EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) - false // an error or exception here should trigger a retry - } - finally { - currentMessage = null - } - if (success) { - _status = ActorRefInternals.RUNNING - dispatcher.resume(this) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - } - success - } - } - } else { - tooManyRestarts() - true // done - } - - if (success) () // alles gut - else attemptRestart() - } - - attemptRestart() // recur - } - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { - val i = _linkedActors.values.iterator - while (i.hasNext) { - val actorRef = i.next - actorRef.lifeCycle match { - // either permanent or none where default is permanent - case Temporary => shutDownTemporaryActor(actorRef) - case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) - } - } - } - - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard { - ensureRemotingEnabled - if (_supervisor.isDefined) { - if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this) - Some(_supervisor.get.uuid) - } else None - } - - def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors) - - // ========= PRIVATE FUNCTIONS ========= - - private[this] def newActor: Actor = { - try { - Actor.actorRefInCreation.set(Some(this)) - val a = actorFactory() - if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - a - } finally { - Actor.actorRefInCreation.set(None) - } - } - - private def shutDownTemporaryActor(temporaryActor: ActorRef) { - temporaryActor.stop() - _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor - // if last temporary actor is gone, then unlink me from supervisor - if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) - true - } - - private def handleExceptionInDispatch(reason: Throwable, message: Any) = { - EventHandler.error(reason, this, message.toString) - - //Prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - - senderFuture.foreach(_.completeWithException(reason)) - - if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason)) - else { - lifeCycle match { - case Temporary => shutDownTemporaryActor(this) - case _ => dispatcher.resume(this) //Resume processing for this actor - } - } - } - - private def notifySupervisorWithMessage(notification: LifeCycleMessage) = { - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - _supervisor.foreach { sup => - if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors - //Scoped stop all linked actors, to avoid leaking the 'i' val - { - val i = _linkedActors.values.iterator - while (i.hasNext) { - i.next.stop() - i.remove - } - } - //Stop the actor itself - stop - } else sup ! notification // else notify supervisor - } - } - - private def setActorSelfFields(actor: Actor, value: ActorRef) { - - @tailrec - def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = { - val success = try { - val selfField = clazz.getDeclaredField("self") - val someSelfField = clazz.getDeclaredField("someSelf") - selfField.setAccessible(true) - someSelfField.setAccessible(true) - selfField.set(actor, value) - someSelfField.set(actor, if (value ne null) Some(value) else null) - true - } catch { - case e: NoSuchFieldException => false - } - - if (success) true - else { - val parent = clazz.getSuperclass - if (parent eq null) - throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") - lookupAndSetSelfFields(parent, actor, value) - } - } - - lookupAndSetSelfFields(actor.getClass, actor, value) - } - - private def initializeActorInstance = { - actor.preStart // run actor preStart - Actor.registry.register(this) - } - - protected[akka] def checkReceiveTimeout = { - cancelReceiveTimeout - if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed - _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) - } - } - - protected[akka] def cancelReceiveTimeout = { - if (_futureTimeout.isDefined) { - _futureTimeout.get.cancel(true) - _futureTimeout = None - } - } -} - -/** - * System messages for RemoteActorRef. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object RemoteActorSystemMessage { - val Stop = "RemoteActorRef:stop".intern -} - -/** - * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. - * This reference is network-aware (remembers its origin) and immutable. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -private[akka] case class RemoteActorRef private[akka] ( - classOrServiceName: String, - val actorClassName: String, - val hostname: String, - val port: Int, - _timeout: Long, - loader: Option[ClassLoader], - val actorType: ActorType = ActorType.ScalaActor) - extends ActorRef with ScalaActorRef { - - ensureRemotingEnabled - - val homeAddress = Some(new InetSocketAddress(hostname, port)) - - //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed - id = classOrServiceName - //id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id - - timeout = _timeout - - start - - def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - Actor.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader) - - def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = Actor.remote.send[T]( - message, senderOption, senderFuture, - homeAddress.get, timeout, - false, this, None, - actorType, loader) - if (future.isDefined) future.get - else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) - } - - def start: ActorRef = synchronized { - _status = ActorRefInternals.RUNNING - this - } - - def stop: Unit = synchronized { - if (_status == ActorRefInternals.RUNNING) { - _status = ActorRefInternals.SHUTDOWN - postMessageToMailbox(RemoteActorSystemMessage.Stop, None) - } - } - - protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None - - // ==== NOT SUPPORTED ==== - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] = unsupported - def dispatcher_=(md: MessageDispatcher): Unit = unsupported - def dispatcher: MessageDispatcher = unsupported - def link(actorRef: ActorRef): Unit = unsupported - def unlink(actorRef: ActorRef): Unit = unsupported - def startLink(actorRef: ActorRef): Unit = unsupported - def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported - def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported - def supervisor: Option[ActorRef] = unsupported - def linkedActors: JMap[Uuid, ActorRef] = unsupported - protected[akka] def mailbox: AnyRef = unsupported - protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported - protected[akka] def actorInstance: AtomicReference[Actor] = unsupported - private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") -} - -/** - * This trait represents the common (external) methods for all ActorRefs - * Needed because implicit conversions aren't applied when instance imports are used - * - * i.e. - * var self: ScalaActorRef = ... - * import self._ - * //can't call ActorRef methods here unless they are declared in a common - * //superclass, which ActorRefShared is. - */ -trait ActorRefShared { - /** - * Returns the uuid for the actor. - */ - def uuid: Uuid -} - -/** - * This trait represents the Scala Actor API - * There are implicit conversions in ../actor/Implicits.scala - * from ActorRef -> ScalaActorRef and back - */ -trait ScalaActorRef extends ActorRefShared { ref: ActorRef => - - /** - * Identifier for actor, does not have to be a unique one. Default is the 'uuid'. - * <p/> - * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote - * actor in RemoteServer etc.But also as the identifier for persistence, which means - * that you can use a custom name to be able to retrieve the "correct" persisted state - * upon restart, remote restart etc. - */ - def id: String - - def id_=(id: String): Unit - - /** - * User overridable callback/setting. - * <p/> - * Defines the life-cycle for a supervised actor. - */ - @volatile - @BeanProperty - var lifeCycle: LifeCycle = UndefinedLifeCycle - - /** - * User overridable callback/setting. - * <p/> - * Don't forget to supply a List of exception types to intercept (trapExit) - * <p/> - * Can be one of: - * <pre> - * faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange) - * </pre> - * Or: - * <pre> - * faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange) - * </pre> - */ - @volatile - @BeanProperty - var faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy - - /** - * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. - */ - def sender: Option[ActorRef] = { - val msg = currentMessage - if (msg eq null) None - else msg.sender - } - - /** - * The reference sender future of the last received message. - * Is defined if the message was sent with sent with '!!' or '!!!', else None. - */ - def senderFuture(): Option[CompletableFuture[Any]] = { - val msg = currentMessage - if (msg eq null) None - else msg.senderFuture - } - - /** - * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. - * <p/> - * - * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument. - * <p/> - * - * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable, - * if invoked from within an Actor. If not then no sender is available. - * <pre> - * actor ! message - * </pre> - * <p/> - */ - def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = { - if (isRunning) postMessageToMailbox(message, sender) - else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Sends a message asynchronously and waits on a future for a reply message. - * <p/> - * It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>) - * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to - * implement request/response message exchanges. - * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = { - if (isRunning) { - val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None) - val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future) - else false - try { - future.await - } catch { - case e: FutureTimeoutException => - if (isMessageJoinPoint) { - EventHandler.error(e, this, e.getMessage) - throw e - } else None - } - future.resultOrException - } else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Sends a message asynchronously returns a future holding the eventual reply message. - * <p/> - * <b>NOTE:</b> - * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to - * implement request/response message exchanges. - * If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code> - * to send a reply message to the original sender. If not then the sender will block until the timeout expires. - */ - def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = { - if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None) - else throw new ActorInitializationException( - "Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Forwards the message and passes the original sender actor as the sender. - * <p/> - * Works with '!', '!!' and '!!!'. - */ - def forward(message: Any)(implicit sender: Some[ActorRef]) = { - if (isRunning) { - if (sender.get.senderFuture.isDefined) - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, sender.get.sender, sender.get.senderFuture) - else - postMessageToMailbox(message, sender.get.sender) - } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it") - } - - /** - * Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Throws an IllegalStateException if unable to determine what to reply to. - */ - def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably: " + - "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + - "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." + - "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if success and Boolean(false) if no sender in scope") - - /** - * Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently - * being processed. - * <p/> - * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - def reply_?(message: Any): Boolean = { - if (senderFuture.isDefined) { - senderFuture.get completeWithResult message - true - } else if (sender.isDefined) { - //TODO: optimize away this allocation, perhaps by having implicit self: Option[ActorRef] in signature - sender.get.!(message)(Some(this)) - true - } else false - } - - /** - * Atomically create (from actor class) and start an actor. - */ - def spawn[T <: Actor: ClassTag]: ActorRef = - spawn(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start and make an actor remote. - */ - def spawnRemote[T <: Actor: ClassTag](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnRemote(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } - - /** - * Atomically create (from actor class), start and link an actor. - */ - def spawnLink[T <: Actor: ClassTag]: ActorRef = - spawnLink(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]]) - - /** - * Atomically create (from actor class), start, link and make an actor remote. - */ - def spawnLinkRemote[T <: Actor: ClassTag](hostname: String, port: Int, timeout: Long): ActorRef = { - ensureRemotingEnabled - spawnLinkRemote(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout) - } -} diff --git a/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala b/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala deleted file mode 100644 index 5d649fcd36..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala +++ /dev/null @@ -1,389 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import scala.collection.mutable.{ ListBuffer, Map } -import scala.reflect.ArrayTag - -import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } -import java.util.{ Set => JSet } - -import annotation.tailrec -import akka.util.ReflectiveAccess._ -import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement } - -/** - * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -sealed trait ActorRegistryEvent -case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent -case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent - -/** - * Registry holding all Actor instances in the whole system. - * Mapped by: - * <ul> - * <li>the Actor's UUID</li> - * <li>the Actor's id field (which can be set by user-code)</li> - * <li>the Actor's class</li> - * <li>all Actors that are subtypes of a specific type</li> - * <ul> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ - -final class ActorRegistry private[actor] () extends ListenerManagement { - - private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef] - private val actorsById = new Index[String, ActorRef] - private val guard = new ReadWriteGuard - - /** - * Returns all actors in the system. - */ - def actors: Array[ActorRef] = filter(_ => true) - - /** - * Returns the number of actors in the system. - */ - def size: Int = actorsByUUID.size - - /** - * Invokes a function for all actors. - */ - def foreach(f: (ActorRef) => Unit) = { - val elements = actorsByUUID.elements - while (elements.hasMoreElements) f(elements.nextElement) - } - - /** - * Invokes the function on all known actors until it returns Some - * Returns None if the function never returns Some - */ - def find[T](f: PartialFunction[ActorRef, T]): Option[T] = { - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val element = elements.nextElement - if (f isDefinedAt element) return Some(f(element)) - } - None - } - - /** - * Finds all actors that are subtypes of the class passed in as the ClassTag argument and supporting passed message. - */ - def actorsFor[T <: Actor](message: Any)(implicit classTag: ClassTag[T]): Array[ActorRef] = - filter(a => classTag.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message)) - - /** - * Finds all actors that satisfy a predicate. - */ - def filter(p: ActorRef => Boolean): Array[ActorRef] = { - val all = new ListBuffer[ActorRef] - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val actorId = elements.nextElement - if (p(actorId)) all += actorId - } - all.toArray - } - - /** - * Finds all actors that are subtypes of the class passed in as the ClassTag argument. - */ - def actorsFor[T <: Actor](implicit classTag: ClassTag[T]): Array[ActorRef] = - actorsFor[T](classTag.erasure.asInstanceOf[Class[T]]) - - /** - * Finds any actor that matches T. Very expensive, traverses ALL alive actors. - */ - def actorFor[T <: Actor](implicit classTag: ClassTag[T]): Option[ActorRef] = - find({ case a: ActorRef if classTag.erasure.isAssignableFrom(a.actor.getClass) => a }) - - /** - * Finds all actors of type or sub-type specified by the class passed in as the Class argument. - */ - def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] = - filter(a => clazz.isAssignableFrom(a.actor.getClass)) - - /** - * Finds all actors that has a specific id. - */ - def actorsFor(id: String): Array[ActorRef] = actorsById values id - - /** - * Finds the actor that has a specific UUID. - */ - def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid) - - /** - * Returns all typed actors in the system. - */ - def typedActors: Array[AnyRef] = filterTypedActors(_ => true) - - /** - * Invokes a function for all typed actors. - */ - def foreachTypedActor(f: (AnyRef) => Unit) = { - TypedActorModule.ensureEnabled - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined) f(proxy.get) - } - } - - /** - * Invokes the function on all known typed actors until it returns Some - * Returns None if the function never returns Some - */ - def findTypedActor[T](f: PartialFunction[AnyRef, T]): Option[T] = { - TypedActorModule.ensureEnabled - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy)) - } - None - } - - /** - * Finds all typed actors that satisfy a predicate. - */ - def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = { - TypedActorModule.ensureEnabled - val all = new ListBuffer[AnyRef] - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val proxy = typedActorFor(elements.nextElement) - if (proxy.isDefined && p(proxy.get)) all += proxy.get - } - all.toArray - } - - /** - * Finds all typed actors that are subtypes of the class passed in as the ClassTag argument. - */ - def typedActorsFor[T <: AnyRef](implicit classTag: ClassTag[T]): Array[AnyRef] = { - TypedActorModule.ensureEnabled - typedActorsFor[T](classTag.erasure.asInstanceOf[Class[T]]) - } - - /** - * Finds any typed actor that matches T. - */ - def typedActorFor[T <: AnyRef](implicit classTag: ClassTag[T]): Option[AnyRef] = { - TypedActorModule.ensureEnabled - def predicate(proxy: AnyRef): Boolean = { - val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) - actorRef.isDefined && classTag.erasure.isAssignableFrom(actorRef.get.actor.getClass) - } - findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a }) - } - - /** - * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument. - */ - def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = { - TypedActorModule.ensureEnabled - def predicate(proxy: AnyRef): Boolean = { - val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy) - actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass) - } - filterTypedActors(predicate) - } - - /** - * Finds all typed actors that have a specific id. - */ - def typedActorsFor(id: String): Array[AnyRef] = { - TypedActorModule.ensureEnabled - val actorRefs = actorsById values id - actorRefs.flatMap(typedActorFor(_)) - } - - /** - * Finds the typed actor that has a specific UUID. - */ - def typedActorFor(uuid: Uuid): Option[AnyRef] = { - TypedActorModule.ensureEnabled - val actorRef = actorsByUUID get uuid - if (actorRef eq null) None - else typedActorFor(actorRef) - } - - /** - * Get the typed actor proxy for a given typed actor ref. - */ - private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = { - TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef) - } - - /** - * Registers an actor in the ActorRegistry. - */ - private[akka] def register(actor: ActorRef) { - val id = actor.id - val uuid = actor.uuid - - actorsById.put(id, actor) - actorsByUUID.put(uuid, actor) - - // notify listeners - notifyListeners(ActorRegistered(actor)) - } - - /** - * Unregisters an actor in the ActorRegistry. - */ - private[akka] def unregister(actor: ActorRef) { - val id = actor.id - val uuid = actor.uuid - - actorsByUUID remove uuid - actorsById.remove(id, actor) - - // notify listeners - notifyListeners(ActorUnregistered(actor)) - } - - /** - * Shuts down and unregisters all actors in the system. - */ - def shutdownAll() { - if (TypedActorModule.isEnabled) { - val elements = actorsByUUID.elements - while (elements.hasMoreElements) { - val actorRef = elements.nextElement - val proxy = typedActorFor(actorRef) - if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get) - else actorRef.stop() - } - } else foreach(_.stop()) - if (Remote.isEnabled) { - Actor.remote.clear //TODO: REVISIT: Should this be here? - } - actorsByUUID.clear - actorsById.clear - } -} - -/** - * An implementation of a ConcurrentMultiMap - * Adds/remove is serialized over the specified key - * Reads are fully concurrent <-- el-cheapo - * - * @author Viktor Klang - */ -class Index[K <: AnyRef, V <: AnyRef: ArrayTag] { - private val Naught = Array[V]() //Nil for Arrays - private val container = new ConcurrentHashMap[K, JSet[V]] - private val emptySet = new ConcurrentSkipListSet[V] - - /** - * Associates the value of type V with the key of type K - * @return true if the value didn't exist for the key previously, and false otherwise - */ - def put(key: K, value: V): Boolean = { - //Tailrecursive spin-locking put - @tailrec - def spinPut(k: K, v: V): Boolean = { - var retry = false - var added = false - val set = container get k - - if (set ne null) { - set.synchronized { - if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry - else { //Else add the value to the set and signal that retry is not needed - added = set add v - retry = false - } - } - } else { - val newSet = new ConcurrentSkipListSet[V] - newSet add v - - // Parry for two simultaneous putIfAbsent(id,newSet) - val oldSet = container.putIfAbsent(k, newSet) - if (oldSet ne null) { - oldSet.synchronized { - if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry - else { //Else try to add the value to the set and signal that retry is not needed - added = oldSet add v - retry = false - } - } - } else added = true - } - - if (retry) spinPut(k, v) - else added - } - - spinPut(key, value) - } - - /** - * @return a _new_ array of all existing values for the given key at the time of the call - */ - def values(key: K): Array[V] = { - val set: JSet[V] = container get key - val result = if (set ne null) set toArray Naught else Naught - result.asInstanceOf[Array[V]] - } - - /** - * @return Some(value) for the first matching value where the supplied function returns true for the given key, - * if no matches it returns None - */ - def findValue(key: K)(f: (V) => Boolean): Option[V] = { - import scala.collection.JavaConversions._ - val set = container get key - if (set ne null) set.iterator.find(f) - else None - } - - /** - * Applies the supplied function to all keys and their values - */ - def foreach(fun: (K, V) => Unit) { - import scala.collection.JavaConversions._ - container.entrySet foreach { (e) => - e.getValue.foreach(fun(e.getKey, _)) - } - } - - /** - * Disassociates the value of type V from the key of type K - * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key - */ - def remove(key: K, value: V): Boolean = { - val set = container get key - - if (set ne null) { - set.synchronized { - if (set.remove(value)) { //If we can remove the value - if (set.isEmpty) //and the set becomes empty - container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set - - true //Remove succeeded - } else false //Remove failed - } - } else false //Remove failed - } - - /** - * @return true if the underlying containers is empty, may report false negatives when the last remove is underway - */ - def isEmpty: Boolean = container.isEmpty - - /** - * Removes all keys and all values - */ - def clear = foreach { case (k, v) => remove(k, v) } -} diff --git a/test/disabled/presentation/akka/src/akka/actor/Actors.java b/test/disabled/presentation/akka/src/akka/actor/Actors.java deleted file mode 100644 index a5ec9f37dc..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/Actors.java +++ /dev/null @@ -1,108 +0,0 @@ -package akka.actor; - -import akka.japi.Creator; -import akka.remoteinterface.RemoteSupport; - -/** - * JAVA API for - * - creating actors, - * - creating remote actors, - * - locating actors - */ -public class Actors { - /** - * - * @return The actor registry - */ - public static ActorRegistry registry() { - return Actor$.MODULE$.registry(); - } - - /** - * - * @return - * @throws UnsupportedOperationException If remoting isn't configured - * @throws ModuleNotAvailableException If the class for the remote support cannot be loaded - */ - public static RemoteSupport remote() { - return Actor$.MODULE$.remote(); - } - - /** - * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the - * UntypedActor instance directly, but only through its 'ActorRef' wrapper reference. - * <p/> - * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor. - * Only use this method when you need to pass in constructor arguments into the 'UntypedActor'. - * <p/> - * You use it by implementing the UntypedActorFactory interface. - * Example in Java: - * <pre> - * ActorRef actor = Actors.actorOf(new UntypedActorFactory() { - * public UntypedActor create() { - * return new MyUntypedActor("service:name", 5); - * } - * }); - * actor.start(); - * actor.sendOneWay(message, context); - * actor.stop(); - * </pre> - */ - public static ActorRef actorOf(final Creator<Actor> factory) { - return Actor$.MODULE$.actorOf(factory); - } - - /** - * Creates an ActorRef out of the Actor type represented by the class provided. - * Example in Java: - * <pre> - * ActorRef actor = Actors.actorOf(MyUntypedActor.class); - * actor.start(); - * actor.sendOneWay(message, context); - * actor.stop(); - * </pre> - * You can create and start the actor in one statement like this: - * <pre> - * val actor = Actors.actorOf(MyActor.class).start(); - * </pre> - */ - public static ActorRef actorOf(final Class<? extends Actor> type) { - return Actor$.MODULE$.actorOf(type); - } - - /** - * The message that is sent when an Actor gets a receive timeout. - * <pre> - * if( message == receiveTimeout() ) { - * //Timed out - * } - * </pre> - * @return the single instance of ReceiveTimeout - */ - public final static ReceiveTimeout$ receiveTimeout() { - return ReceiveTimeout$.MODULE$; - } - - /** - * The message that when sent to an Actor kills it by throwing an exception. - * <pre> - * actor.sendOneWay(kill()); - * </pre> - * @return the single instance of Kill - */ - public final static Kill$ kill() { - return Kill$.MODULE$; - } - - - /** - * The message that when sent to an Actor shuts it down by calling 'stop'. - * <pre> - * actor.sendOneWay(poisonPill()); - * </pre> - * @return the single instance of PoisonPill - */ - public final static PoisonPill$ poisonPill() { - return PoisonPill$.MODULE$; - } -} diff --git a/test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala b/test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala deleted file mode 100644 index a54fca9ac7..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import java.io.File -import java.net.{ URL, URLClassLoader } -import java.util.jar.JarFile - -import akka.util.{ Bootable } -import akka.config.Config._ - -/** - * Handles all modules in the deploy directory (load and unload) - */ -trait BootableActorLoaderService extends Bootable { - - val BOOT_CLASSES = config.getList("akka.boot") - lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader - - protected def createApplicationClassLoader: Option[ClassLoader] = Some({ - if (HOME.isDefined) { - val DEPLOY = HOME.get + "/deploy" - val DEPLOY_DIR = new File(DEPLOY) - if (!DEPLOY_DIR.exists) { - System.exit(-1) - } - val filesToDeploy = DEPLOY_DIR.listFiles.toArray.toList - .asInstanceOf[List[File]].filter(_.getName.endsWith(".jar")) - var dependencyJars: List[URL] = Nil - filesToDeploy.map { file => - val jarFile = new JarFile(file) - val en = jarFile.entries - while (en.hasMoreElements) { - val name = en.nextElement.getName - if (name.endsWith(".jar")) dependencyJars ::= new File( - String.format("jar:file:%s!/%s", jarFile.getName, name)).toURI.toURL - } - } - val toDeploy = filesToDeploy.map(_.toURI.toURL) - val allJars = toDeploy ::: dependencyJars - - new URLClassLoader(allJars.toArray, Thread.currentThread.getContextClassLoader) - } else Thread.currentThread.getContextClassLoader - }) - - abstract override def onLoad = { - super.onLoad - - for (loader ← applicationLoader; clazz ← BOOT_CLASSES) { - loader.loadClass(clazz).newInstance - } - } - - abstract override def onUnload = { - super.onUnload - Actor.registry.shutdownAll() - } -} diff --git a/test/disabled/presentation/akka/src/akka/actor/FSM.scala b/test/disabled/presentation/akka/src/akka/actor/FSM.scala deleted file mode 100644 index d9cd9a9ca2..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/FSM.scala +++ /dev/null @@ -1,527 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ -package akka.actor - -import akka.util._ -import akka.event.EventHandler - -import scala.collection.mutable -import java.util.concurrent.ScheduledFuture - -object FSM { - - object NullFunction extends PartialFunction[Any, Nothing] { - def isDefinedAt(o: Any) = false - def apply(o: Any) = sys.error("undefined") - } - - case class CurrentState[S](fsmRef: ActorRef, state: S) - case class Transition[S](fsmRef: ActorRef, from: S, to: S) - case class SubscribeTransitionCallBack(actorRef: ActorRef) - case class UnsubscribeTransitionCallBack(actorRef: ActorRef) - - sealed trait Reason - case object Normal extends Reason - case object Shutdown extends Reason - case class Failure(cause: Any) extends Reason - - case object StateTimeout - case class TimeoutMarker(generation: Long) - - case class Timer(name: String, msg: AnyRef, repeat: Boolean, generation: Int) { - private var ref: Option[ScheduledFuture[AnyRef]] = _ - - def schedule(actor: ActorRef, timeout: Duration) { - if (repeat) { - ref = Some(Scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) - } else { - ref = Some(Scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) - } - } - - def cancel { - if (ref.isDefined) { - ref.get.cancel(true) - ref = None - } - } - } - - /* - * This extractor is just convenience for matching a (S, S) pair, including a - * reminder what the new state is. - */ - object -> { - def unapply[S](in: (S, S)) = Some(in) - } - - /* - * With these implicits in scope, you can write "5 seconds" anywhere a - * Duration or Option[Duration] is expected. This is conveniently true - * for derived classes. - */ - implicit def d2od(d: Duration): Option[Duration] = Some(d) -} - -/** - * Finite State Machine actor trait. Use as follows: - * - * <pre> - * object A { - * trait State - * case class One extends State - * case class Two extends State - * - * case class Data(i : Int) - * } - * - * class A extends Actor with FSM[A.State, A.Data] { - * import A._ - * - * startWith(One, Data(42)) - * when(One) { - * case Event(SomeMsg, Data(x)) => ... - * case Ev(SomeMsg) => ... // convenience when data not needed - * } - * when(Two, stateTimeout = 5 seconds) { ... } - * initialize - * } - * </pre> - * - * Within the partial function the following values are returned for effecting - * state transitions: - * - * - <code>stay</code> for staying in the same state - * - <code>stay using Data(...)</code> for staying in the same state, but with - * different data - * - <code>stay forMax 5.millis</code> for staying with a state timeout; can be - * combined with <code>using</code> - * - <code>goto(...)</code> for changing into a different state; also supports - * <code>using</code> and <code>forMax</code> - * - <code>stop</code> for terminating this FSM actor - * - * Each of the above also supports the method <code>replying(AnyRef)</code> for - * sending a reply before changing state. - * - * While changing state, custom handlers may be invoked which are registered - * using <code>onTransition</code>. This is meant to enable concentrating - * different concerns in different places; you may choose to use - * <code>when</code> for describing the properties of a state, including of - * course initiating transitions, but you can describe the transitions using - * <code>onTransition</code> to avoid having to duplicate that code among - * multiple paths which lead to a transition: - * - * <pre> - * onTransition { - * case Active -> _ => cancelTimer("activeTimer") - * } - * </pre> - * - * Multiple such blocks are supported and all of them will be called, not only - * the first matching one. - * - * Another feature is that other actors may subscribe for transition events by - * sending a <code>SubscribeTransitionCallback</code> message to this actor; - * use <code>UnsubscribeTransitionCallback</code> before stopping the other - * actor. - * - * State timeouts set an upper bound to the time which may pass before another - * message is received in the current state. If no external message is - * available, then upon expiry of the timeout a StateTimeout message is sent. - * Note that this message will only be received in the state for which the - * timeout was set and that any message received will cancel the timeout - * (possibly to be started again by the next transition). - * - * Another feature is the ability to install and cancel single-shot as well as - * repeated timers which arrange for the sending of a user-specified message: - * - * <pre> - * setTimer("tock", TockMsg, 1 second, true) // repeating - * setTimer("lifetime", TerminateMsg, 1 hour, false) // single-shot - * cancelTimer("tock") - * timerActive_? ("tock") - * </pre> - */ -trait FSM[S, D] extends ListenerManagement { - this: Actor => - - import FSM._ - - type StateFunction = scala.PartialFunction[Event[D], State] - type Timeout = Option[Duration] - type TransitionHandler = PartialFunction[(S, S), Unit] - - /** - * **************************************** - * DSL - * **************************************** - */ - - /** - * Insert a new StateFunction at the end of the processing chain for the - * given state. If the stateTimeout parameter is set, entering this state - * without a differing explicit timeout setting will trigger a StateTimeout - * event; the same is true when using #stay. - * - * @param stateName designator for the state - * @param stateTimeout default state timeout for this state - * @param stateFunction partial function describing response to input - */ - protected final def when(stateName: S, stateTimeout: Timeout = None)(stateFunction: StateFunction) = { - register(stateName, stateFunction, stateTimeout) - } - - /** - * Set initial state. Call this method from the constructor before the #initialize method. - * - * @param stateName initial state designator - * @param stateData initial state data - * @param timeout state timeout for the initial state, overriding the default timeout for that state - */ - protected final def startWith(stateName: S, - stateData: D, - timeout: Timeout = None) = { - currentState = State(stateName, stateData, timeout) - } - - /** - * Produce transition to other state. Return this from a state function in - * order to effect the transition. - * - * @param nextStateName state designator for the next state - * @return state transition descriptor - */ - protected final def goto(nextStateName: S): State = { - State(nextStateName, currentState.stateData) - } - - /** - * Produce "empty" transition descriptor. Return this from a state function - * when no state change is to be effected. - * - * @return descriptor for staying in current state - */ - protected final def stay(): State = { - // cannot directly use currentState because of the timeout field - goto(currentState.stateName) - } - - /** - * Produce change descriptor to stop this FSM actor with reason "Normal". - */ - protected final def stop(): State = { - stop(Normal) - } - - /** - * Produce change descriptor to stop this FSM actor including specified reason. - */ - protected final def stop(reason: Reason): State = { - stop(reason, currentState.stateData) - } - - /** - * Produce change descriptor to stop this FSM actor including specified reason. - */ - protected final def stop(reason: Reason, stateData: D): State = { - stay using stateData withStopReason (reason) - } - - /** - * Schedule named timer to deliver message after given delay, possibly repeating. - * @param name identifier to be used with cancelTimer() - * @param msg message to be delivered - * @param timeout delay of first message delivery and between subsequent messages - * @param repeat send once if false, scheduleAtFixedRate if true - * @return current state descriptor - */ - protected final def setTimer(name: String, msg: AnyRef, timeout: Duration, repeat: Boolean): State = { - if (timers contains name) { - timers(name).cancel - } - val timer = Timer(name, msg, repeat, timerGen.next) - timer.schedule(self, timeout) - timers(name) = timer - stay - } - - /** - * Cancel named timer, ensuring that the message is not subsequently delivered (no race). - * @param name of the timer to cancel - */ - protected final def cancelTimer(name: String) = { - if (timers contains name) { - timers(name).cancel - timers -= name - } - } - - /** - * Inquire whether the named timer is still active. Returns true unless the - * timer does not exist, has previously been canceled or if it was a - * single-shot timer whose message was already received. - */ - protected final def timerActive_?(name: String) = timers contains name - - /** - * Set state timeout explicitly. This method can safely be used from within a - * state handler. - */ - protected final def setStateTimeout(state: S, timeout: Timeout) { - stateTimeouts(state) = timeout - } - - /** - * Set handler which is called upon each state transition, i.e. not when - * staying in the same state. This may use the pair extractor defined in the - * FSM companion object like so: - * - * <pre> - * onTransition { - * case Old -> New => doSomething - * } - * </pre> - * - * It is also possible to supply a 2-ary function object: - * - * <pre> - * onTransition(handler _) - * - * private def handler(from: S, to: S) { ... } - * </pre> - * - * The underscore is unfortunately necessary to enable the nicer syntax shown - * above (it uses the implicit conversion total2pf under the hood). - * - * <b>Multiple handlers may be installed, and every one of them will be - * called, not only the first one matching.</b> - */ - protected final def onTransition(transitionHandler: TransitionHandler) { - transitionEvent :+= transitionHandler - } - - /** - * Convenience wrapper for using a total function instead of a partial - * function literal. To be used with onTransition. - */ - implicit protected final def total2pf(transitionHandler: (S, S) => Unit) = - new PartialFunction[(S, S), Unit] { - def isDefinedAt(in: (S, S)) = true - def apply(in: (S, S)) { transitionHandler(in._1, in._2) } - } - - /** - * Set handler which is called upon termination of this FSM actor. - */ - protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]) = { - terminateEvent = terminationHandler - } - - /** - * Set handler which is called upon reception of unhandled messages. - */ - protected final def whenUnhandled(stateFunction: StateFunction) = { - handleEvent = stateFunction orElse handleEventDefault - } - - /** - * Verify existence of initial state and setup timers. This should be the - * last call within the constructor. - */ - def initialize { - makeTransition(currentState) - } - - /** - * **************************************************************** - * PRIVATE IMPLEMENTATION DETAILS - * **************************************************************** - */ - - /* - * FSM State data and current timeout handling - */ - private var currentState: State = _ - private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None - private var generation: Long = 0L - - /* - * Timer handling - */ - private val timers = mutable.Map[String, Timer]() - private val timerGen = Iterator from 0 - - /* - * State definitions - */ - private val stateFunctions = mutable.Map[S, StateFunction]() - private val stateTimeouts = mutable.Map[S, Timeout]() - - private def register(name: S, function: StateFunction, timeout: Timeout) { - if (stateFunctions contains name) { - stateFunctions(name) = stateFunctions(name) orElse function - stateTimeouts(name) = timeout orElse stateTimeouts(name) - } else { - stateFunctions(name) = function - stateTimeouts(name) = timeout - } - } - - /* - * unhandled event handler - */ - private val handleEventDefault: StateFunction = { - case Event(value, stateData) => - stay - } - private var handleEvent: StateFunction = handleEventDefault - - /* - * termination handling - */ - private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = { - case StopEvent(Failure(cause), _, _) => - case StopEvent(reason, _, _) => - } - - /* - * transition handling - */ - private var transitionEvent: List[TransitionHandler] = Nil - private def handleTransition(prev: S, next: S) { - val tuple = (prev, next) - for (te ← transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) } - } - - // ListenerManagement shall not start() or stop() listener actors - override protected val manageLifeCycleOfListeners = false - - /** - * ******************************************* - * Main actor receive() method - * ******************************************* - */ - override final protected def receive: Receive = { - case TimeoutMarker(gen) => - if (generation == gen) { - processEvent(StateTimeout) - } - case t@Timer(name, msg, repeat, generation) => - if ((timers contains name) && (timers(name).generation == generation)) { - processEvent(msg) - if (!repeat) { - timers -= name - } - } - case SubscribeTransitionCallBack(actorRef) => - addListener(actorRef) - // send current state back as reference point - try { - actorRef ! CurrentState(self, currentState.stateName) - } catch { - case e: ActorInitializationException => - EventHandler.warning(this, "trying to register not running listener") - } - case UnsubscribeTransitionCallBack(actorRef) => - removeListener(actorRef) - case value => { - if (timeoutFuture.isDefined) { - timeoutFuture.get.cancel(true) - timeoutFuture = None - } - generation += 1 - processEvent(value) - } - } - - private def processEvent(value: Any) = { - val event = Event(value, currentState.stateData) - val stateFunc = stateFunctions(currentState.stateName) - val nextState = if (stateFunc isDefinedAt event) { - stateFunc(event) - } else { - // handleEventDefault ensures that this is always defined - handleEvent(event) - } - nextState.stopReason match { - case Some(reason) => terminate(reason) - case None => makeTransition(nextState) - } - } - - private def makeTransition(nextState: State) = { - if (!stateFunctions.contains(nextState.stateName)) { - terminate(Failure("Next state %s does not exist".format(nextState.stateName))) - } else { - if (currentState.stateName != nextState.stateName) { - handleTransition(currentState.stateName, nextState.stateName) - notifyListeners(Transition(self, currentState.stateName, nextState.stateName)) - } - applyState(nextState) - } - } - - private def applyState(nextState: State) = { - currentState = nextState - val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName) - if (timeout.isDefined) { - val t = timeout.get - if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) - } - } - } - - private def terminate(reason: Reason) = { - terminateEvent.apply(StopEvent(reason, currentState.stateName, currentState.stateData)) - self.stop() - } - - case class Event[D](event: Any, stateData: D) - object Ev { - def unapply[D](e: Event[D]): Option[Any] = Some(e.event) - } - - case class State(stateName: S, stateData: D, timeout: Timeout = None) { - - /** - * Modify state transition descriptor to include a state timeout for the - * next state. This timeout overrides any default timeout set for the next - * state. - */ - def forMax(timeout: Duration): State = { - copy(timeout = Some(timeout)) - } - - /** - * Send reply to sender of the current message, if available. - * - * @return this state transition descriptor - */ - def replying(replyValue: Any): State = { - self.sender match { - case Some(sender) => sender ! replyValue - case None => - } - this - } - - /** - * Modify state transition descriptor with new state data. The data will be - * set when transitioning to the new state. - */ - def using(nextStateDate: D): State = { - copy(stateData = nextStateDate) - } - - private[akka] var stopReason: Option[Reason] = None - - private[akka] def withStopReason(reason: Reason): State = { - stopReason = Some(reason) - this - } - } - - case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) -} diff --git a/test/disabled/presentation/akka/src/akka/actor/Scheduler.scala b/test/disabled/presentation/akka/src/akka/actor/Scheduler.scala deleted file mode 100644 index 128584f3c5..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/Scheduler.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2007 WorldWide Conferencing, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Rework of David Pollak's ActorPing class in the Lift Project - * which is licensed under the Apache 2 License. - */ -package akka.actor - -import scala.collection.JavaConversions - -import java.util.concurrent._ - -import akka.event.EventHandler -import akka.AkkaException - -object Scheduler { - import Actor._ - - case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e) - - @volatile - private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - - /** - * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay - */ - def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.scheduleAtFixedRate( - new Runnable { def run = receiver ! message }, - initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception => - val error = SchedulerException(message + " could not be scheduled on " + receiver, e) - EventHandler.error(error, this, "%s @ %s".format(receiver, message)) - throw error - } - } - - /** - * Schedules to run specified function to the receiver after initialDelay and then repeated after delay, - * avoid blocking operations since this is executed in the schedulers thread - */ - def schedule(f: () => Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = - schedule(new Runnable { def run = f() }, initialDelay, delay, timeUnit) - - /** - * Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay, - * avoid blocking operations since this is executed in the schedulers thread - */ - def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception => - val error = SchedulerException("Failed to schedule a Runnable", e) - EventHandler.error(error, this, error.getMessage) - throw error - } - } - - /** - * Schedules to send the specified message to the receiver after delay - */ - def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.schedule( - new Runnable { def run = receiver ! message }, - delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception => - val error = SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e) - EventHandler.error(e, this, receiver + " @ " + message) - throw error - } - } - - /** - * Schedules a function to be run after delay, - * avoid blocking operations since the runnable is executed in the schedulers thread - */ - def scheduleOnce(f: () => Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = - scheduleOnce(new Runnable { def run = f() }, delay, timeUnit) - - /** - * Schedules a runnable to be run after delay, - * avoid blocking operations since the runnable is executed in the schedulers thread - */ - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { - try { - service.schedule(runnable, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - } catch { - case e: Exception => - val error = SchedulerException("Failed to scheduleOnce a Runnable", e) - EventHandler.error(e, this, error.getMessage) - throw error - } - } - - def shutdown() { - synchronized { - service.shutdown() - } - } - - def restart() { - synchronized { - shutdown() - service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - } - } -} - -private object SchedulerThreadFactory extends ThreadFactory { - private var count = 0 - val threadFactory = Executors.defaultThreadFactory() - - def newThread(r: Runnable): Thread = { - val thread = threadFactory.newThread(r) - thread.setName("akka:scheduler-" + count) - thread.setDaemon(true) - thread - } -} diff --git a/test/disabled/presentation/akka/src/akka/actor/Supervisor.scala b/test/disabled/presentation/akka/src/akka/actor/Supervisor.scala deleted file mode 100644 index bec3c83f1a..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/Supervisor.scala +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import akka.AkkaException -import akka.util._ -import ReflectiveAccess._ -import Actor._ - -import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap } -import java.net.InetSocketAddress -import akka.config.Supervision._ - -class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) - -/** - * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class. - * These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor' - * factory object. - * <p/> - * - * Here is a sample on how to use it: - * <pre> - * val supervisor = Supervisor( - * SupervisorConfig( - * RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]), - * Supervise( - * myFirstActor, - * Permanent) :: - * Supervise( - * mySecondActor, - * Permanent) :: - * Nil)) - * </pre> - * - * You dynamically link and unlink child children using the 'link' and 'unlink' methods. - * <pre> - * supervisor.link(child) - * supervisor.unlink(child) - * </pre> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object Supervisor { - def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start -} - -/** - * Use this factory instead of the Supervisor factory object if you want to control - * instantiation and starting of the Supervisor, if not then it is easier and better - * to use the Supervisor factory object. - * <p> - * Example usage: - * <pre> - * val factory = SupervisorFactory( - * SupervisorConfig( - * RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]), - * Supervise( - * myFirstActor, - * Permanent) :: - * Supervise( - * mySecondActor, - * Permanent) :: - * Nil)) - * </pre> - * - * Then create a new Supervisor tree with the concrete Services we have defined. - * - * <pre> - * val supervisor = factory.newInstance - * supervisor.start // start up all managed servers - * </pre> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -case class SupervisorFactory(val config: SupervisorConfig) { - - def newInstance: Supervisor = newInstanceFor(config) - - def newInstanceFor(config: SupervisorConfig): Supervisor = { - val supervisor = new Supervisor(config.restartStrategy, config.maxRestartsHandler) - supervisor.configure(config) - supervisor.start - supervisor - } -} - -/** - * <b>NOTE:</b> - * <p/> - * The supervisor class is only used for the configuration system when configuring supervisor - * hierarchies declaratively. Should not be used as part of the regular programming API. Instead - * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the - * children that should trap error signals and trigger restart. - * <p/> - * See the Scaladoc for the SupervisorFactory for an example on how to declaratively wire up children. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit) { - import Supervisor._ - - private val _childActors = new ConcurrentHashMap[String, List[ActorRef]] - private val _childSupervisors = new CopyOnWriteArrayList[Supervisor] - - private[akka] val supervisor = actorOf(new SupervisorActor(handler, maxRestartsHandler)).start() - - def uuid = supervisor.uuid - - def start: Supervisor = { - this - } - - def shutdown(): Unit = supervisor.stop() - - def link(child: ActorRef) = supervisor.link(child) - - def unlink(child: ActorRef) = supervisor.unlink(child) - - def children: List[ActorRef] = - _childActors.values.toArray.toList.asInstanceOf[List[List[ActorRef]]].flatten - - def childSupervisors: List[Supervisor] = - _childActors.values.toArray.toList.asInstanceOf[List[Supervisor]] - - def configure(config: SupervisorConfig): Unit = config match { - case SupervisorConfig(_, servers, _) => - - servers.map(server => - server match { - case Supervise(actorRef, lifeCycle, registerAsRemoteService) => - actorRef.start() - val className = actorRef.actor.getClass.getName - val currentActors = { - val list = _childActors.get(className) - if (list eq null) List[ActorRef]() - else list - } - _childActors.put(className, actorRef :: currentActors) - actorRef.lifeCycle = lifeCycle - supervisor.link(actorRef) - if (registerAsRemoteService) - Actor.remote.register(actorRef) - case supervisorConfig@SupervisorConfig(_, _, _) => // recursive supervisor configuration - val childSupervisor = Supervisor(supervisorConfig) - supervisor.link(childSupervisor.supervisor) - _childSupervisors.add(childSupervisor) - }) - } -} - -/** - * For internal use only. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -final class SupervisorActor private[akka] (handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit) extends Actor { - self.faultHandler = handler - - override def postStop(): Unit = { - val i = self.linkedActors.values.iterator - while (i.hasNext) { - val ref = i.next - ref.stop() - self.unlink(ref) - } - } - - def receive = { - case max@MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) => maxRestartsHandler(self, max) - case unknown => throw new SupervisorException( - "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") - } -} diff --git a/test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala b/test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala deleted file mode 100644 index cbc43f22f8..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.actor - -import akka.japi.{ Creator, Procedure } - -/** - * Subclass this abstract class to create a MDB-style untyped actor. - * <p/> - * This class is meant to be used from Java. - * <p/> - * Here is an example on how to create and use an UntypedActor: - * <pre> - * public class SampleUntypedActor extends UntypedActor { - * public void onReceive(Object message) throws Exception { - * if (message instanceof String) { - * String msg = (String)message; - * - * if (msg.equals("UseReply")) { - * // Reply to original sender of message using the 'replyUnsafe' method - * getContext().replyUnsafe(msg + ":" + getContext().getUuid()); - * - * } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) { - * // Reply to original sender of message using the sender reference - * // also passing along my own reference (the context) - * getContext().getSender().get().sendOneWay(msg, context); - * - * } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) { - * // Reply to original sender of message using the sender future reference - * getContext().getSenderFuture().get().completeWithResult(msg); - * - * } else if (msg.equals("SendToSelf")) { - * // Send message to the actor itself recursively - * getContext().sendOneWay(msg) - * - * } else if (msg.equals("ForwardMessage")) { - * // Retrieve an actor from the ActorRegistry by ID and get an ActorRef back - * ActorRef actorRef = Actor.registry.actorsFor("some-actor-id").head(); - * - * } else throw new IllegalArgumentException("Unknown message: " + message); - * } else throw new IllegalArgumentException("Unknown message: " + message); - * } - * - * public static void main(String[] args) { - * ActorRef actor = Actors.actorOf(SampleUntypedActor.class); - * actor.start(); - * actor.sendOneWay("SendToSelf"); - * actor.stop(); - * } - * } - * </pre> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -abstract class UntypedActor extends Actor { - - /** - * To be implemented by concrete UntypedActor. Defines the message handler. - */ - @throws(classOf[Exception]) - def onReceive(message: Any): Unit - - /** - * Returns the 'self' reference with the API. - */ - def getContext(): ActorRef = self - - /** - * Returns the 'self' reference with the API. - */ - def context(): ActorRef = self - - /** - * Java API for become - */ - def become(behavior: Procedure[Any]): Unit = become(behavior, false) - - /* - * Java API for become with optional discardOld - */ - def become(behavior: Procedure[Any], discardOld: Boolean): Unit = - super.become({ case msg => behavior.apply(msg) }, discardOld) - - /** - * User overridable callback. - * <p/> - * Is called when an Actor is started by invoking 'actor.start()'. - */ - override def preStart() {} - - /** - * User overridable callback. - * <p/> - * Is called when 'actor.stop()' is invoked. - */ - override def postStop() {} - - /** - * User overridable callback. - * <p/> - * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. - */ - override def preRestart(reason: Throwable) {} - - /** - * User overridable callback. - * <p/> - * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. - */ - override def postRestart(reason: Throwable) {} - - /** - * User overridable callback. - * <p/> - * Is called when a message isn't handled by the current behavior of the actor - * by default it throws an UnhandledMessageException - */ - override def unhandled(msg: Any) { - throw new UnhandledMessageException(msg, self) - } - - final protected def receive = { - case msg => onReceive(msg) - } -} - -/** - * Factory closure for an UntypedActor, to be used with 'Actors.actorOf(factory)'. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait UntypedActorFactory extends Creator[Actor] diff --git a/test/disabled/presentation/akka/src/akka/actor/package.scala b/test/disabled/presentation/akka/src/akka/actor/package.scala deleted file mode 100644 index fbeeed49cb..0000000000 --- a/test/disabled/presentation/akka/src/akka/actor/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka - -import actor.{ ScalaActorRef, ActorRef } - -package object actor { - implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = - ref.asInstanceOf[ScalaActorRef] - - implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = - ref.asInstanceOf[ActorRef] - - type Uuid = com.eaio.uuid.UUID - - def newUuid(): Uuid = new Uuid() - - def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode) - - def uuidFrom(uuid: String): Uuid = new Uuid(uuid) -} |