summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/actor
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/actor')
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/Actor.scala503
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/ActorRef.scala1433
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala389
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/Actors.java108
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala60
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/FSM.scala527
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/Scheduler.scala133
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/Supervisor.scala176
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala134
-rw-r--r--test/disabled/presentation/akka/src/akka/actor/package.scala23
10 files changed, 0 insertions, 3486 deletions
diff --git a/test/disabled/presentation/akka/src/akka/actor/Actor.scala b/test/disabled/presentation/akka/src/akka/actor/Actor.scala
deleted file mode 100644
index b9bc51b635..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/Actor.scala
+++ /dev/null
@@ -1,503 +0,0 @@
-/** Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.actor
-
-import akka.dispatch._
-import akka.config.Config._
-import akka.util.Helpers.{ narrow, narrowSilently }
-import akka.util.ListenerManagement
-import akka.AkkaException
-
-import scala.beans.BeanProperty
-import akka.util.{ ReflectiveAccess, Duration }
-import akka.remoteinterface.RemoteSupport
-import akka.japi.{ Creator, Procedure }
-import java.lang.reflect.InvocationTargetException
-
-/** Life-cycle messages for the Actors
- */
-sealed trait LifeCycleMessage extends Serializable
-
-/* Marker trait to show which Messages are automatically handled by Akka */
-sealed trait AutoReceivedMessage { self: LifeCycleMessage => }
-
-case class HotSwap(code: ActorRef => Actor.Receive, discardOld: Boolean = true)
- extends AutoReceivedMessage with LifeCycleMessage {
-
- /** Java API
- */
- def this(code: akka.japi.Function[ActorRef, Procedure[Any]], discardOld: Boolean) =
- this((self: ActorRef) => {
- val behavior = code(self)
- val result: Actor.Receive = { case msg => behavior(msg) }
- result
- }, discardOld)
-
- /** Java API with default non-stacking behavior
- */
- def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
-}
-
-case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage
-
-case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage
-
-case class Exit(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage
-
-case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
-
-case class Unlink(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
-
-case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage
-
-case object PoisonPill extends AutoReceivedMessage with LifeCycleMessage
-
-case object Kill extends AutoReceivedMessage with LifeCycleMessage
-
-case object ReceiveTimeout extends LifeCycleMessage
-
-case class MaximumNumberOfRestartsWithinTimeRangeReached(
- @BeanProperty val victim: ActorRef,
- @BeanProperty val maxNrOfRetries: Option[Int],
- @BeanProperty val withinTimeRange: Option[Int],
- @BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
-
-// Exceptions for Actors
-class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-
-/** This message is thrown by default when an Actors behavior doesn't match a message
- */
-case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {
- override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg)
- override def fillInStackTrace() = this //Don't waste cycles generating stack trace
-}
-
-/** Actor factory module with factory methods for creating various kinds of Actors.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-object Actor extends ListenerManagement {
-
- /** Add shutdown cleanups
- */
- private[akka] lazy val shutdownHook = {
- val hook = new Runnable {
- override def run {
- // Clear Thread.subclassAudits
- val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
- tf.setAccessible(true)
- val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_, _]]
- subclassAudits synchronized { subclassAudits.clear }
- }
- }
- Runtime.getRuntime.addShutdownHook(new Thread(hook))
- hook
- }
-
- val registry = new ActorRegistry
-
- lazy val remote: RemoteSupport = {
- ReflectiveAccess
- .Remote
- .defaultRemoteSupport
- .map(_())
- .getOrElse(throw new UnsupportedOperationException("You need to have akka-remote.jar on classpath"))
- }
-
- private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
- private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
-
- /** A Receive is a convenience type that defines actor message behavior currently modeled as
- * a PartialFunction[Any, Unit].
- */
- type Receive = PartialFunction[Any, Unit]
-
- private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] {
- override def initialValue = None
- }
-
- /** Creates an ActorRef out of the Actor with type T.
- * <pre>
- * import Actor._
- * val actor = actorOf[MyActor]
- * actor.start()
- * actor ! message
- * actor.stop()
- * </pre>
- * You can create and start the actor in one statement like this:
- * <pre>
- * val actor = actorOf[MyActor].start()
- * </pre>
- */
- def actorOf[T <: Actor: ClassTag]: ActorRef = actorOf(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]])
-
- /** Creates an ActorRef out of the Actor of the specified Class.
- * <pre>
- * import Actor._
- * val actor = actorOf(classOf[MyActor])
- * actor.start()
- * actor ! message
- * actor.stop()
- * </pre>
- * You can create and start the actor in one statement like this:
- * <pre>
- * val actor = actorOf(classOf[MyActor]).start()
- * </pre>
- */
- def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
- import ReflectiveAccess.{ createInstance, noParams, noArgs }
- createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs) match {
- case Right(actor) => actor
- case Left(exception) =>
- val cause = exception match {
- case i: InvocationTargetException => i.getTargetException
- case _ => exception
- }
-
- throw new ActorInitializationException(
- "Could not instantiate Actor of " + clazz +
- "\nMake sure Actor is NOT defined inside a class/trait," +
- "\nif so put it outside the class/trait, f.e. in a companion object," +
- "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause)
- }
-
- }, None)
-
- /** Creates an ActorRef out of the Actor. Allows you to pass in a factory function
- * that creates the Actor. Please note that this function can be invoked multiple
- * times if for example the Actor is supervised and needs to be restarted.
- * <p/>
- * This function should <b>NOT</b> be used for remote actors.
- * <pre>
- * import Actor._
- * val actor = actorOf(new MyActor)
- * actor.start()
- * actor ! message
- * actor.stop()
- * </pre>
- * You can create and start the actor in one statement like this:
- * <pre>
- * val actor = actorOf(new MyActor).start()
- * </pre>
- */
- def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory, None)
-
- /** Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
- * that creates the Actor. Please note that this function can be invoked multiple
- * times if for example the Actor is supervised and needs to be restarted.
- * <p/>
- * This function should <b>NOT</b> be used for remote actors.
- * JAVA API
- */
- def actorOf(creator: Creator[Actor]): ActorRef = new LocalActorRef(() => creator.create, None)
-
- /** Use to spawn out a block of code in an event-driven actor. Will shut actor down when
- * the block has been executed.
- * <p/>
- * NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
- * there is a method 'spawn[ActorType]' in the Actor trait already.
- * Example:
- * <pre>
- * import Actor.{spawn}
- *
- * spawn {
- * ... // do stuff
- * }
- * </pre>
- */
- def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
- case object Spawn
- actorOf(new Actor() {
- self.dispatcher = dispatcher
- def receive = {
- case Spawn => try { body } finally { self.stop() }
- }
- }).start() ! Spawn
- }
-
- /** Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
- * to convert an Option[Any] to an Option[T].
- */
- implicit def toAnyOptionAsTypedOption(anyOption: Option[Any]) = new AnyOptionAsTypedOption(anyOption)
-
- /** Implicitly converts the given Future[_] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>
- * to convert an Option[Any] to an Option[T].
- * This means that the following code is equivalent:
- * (actor !! "foo").as[Int] (Deprecated)
- * and
- * (actor !!! "foo").as[Int] (Recommended)
- */
- implicit def futureToAnyOptionAsTypedOption(anyFuture: Future[_]) = new AnyOptionAsTypedOption({
- try { anyFuture.await } catch { case t: FutureTimeoutException => }
- anyFuture.resultOrException
- })
-}
-
-/** Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
- * <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
- * <p/>
- * An actor has a well-defined (non-cyclic) life-cycle.
- * <pre>
- * => NEW (newly created actor) - can't receive messages (yet)
- * => STARTED (when 'start' is invoked) - can receive messages
- * => SHUT DOWN (when 'exit' is invoked) - can't do anything
- * </pre>
- *
- * <p/>
- * The Actor's API is available in the 'self' member variable.
- *
- * <p/>
- * Here you find functions like:
- * - !, !!, !!! and forward
- * - link, unlink, startLink, spawnLink etc
- * - makeRemote etc.
- * - start, stop
- * - etc.
- *
- * <p/>
- * Here you also find fields like
- * - dispatcher = ...
- * - id = ...
- * - lifeCycle = ...
- * - faultHandler = ...
- * - trapExit = ...
- * - etc.
- *
- * <p/>
- * This means that to use them you have to prefix them with 'self', like this: <tt>self ! Message</tt>
- *
- * However, for convenience you can import these functions and fields like below, which will allow you do
- * drop the 'self' prefix:
- * <pre>
- * class MyActor extends Actor {
- * import self._
- * id = ...
- * dispatcher = ...
- * spawnLink[OtherActor]
- * ...
- * }
- * </pre>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-trait Actor {
-
- /** Type alias because traits cannot have companion objects.
- */
- type Receive = Actor.Receive
-
- /*
- * Some[ActorRef] representation of the 'self' ActorRef reference.
- * <p/>
- * Mainly for internal use, functions as the implicit sender references when invoking
- * the 'forward' function.
- */
- @transient
- implicit val someSelf: Some[ActorRef] = {
- val optRef = Actor.actorRefInCreation.get
- if (optRef.isEmpty) throw new ActorInitializationException(
- "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." +
- "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." +
- "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
- "\n\tEither use:" +
- "\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
- "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'")
- Actor.actorRefInCreation.set(None)
- optRef.asInstanceOf[Some[ActorRef]].get.id = getClass.getName //FIXME: Is this needed?
- optRef.asInstanceOf[Some[ActorRef]]
- }
-
- /*
- * Option[ActorRef] representation of the 'self' ActorRef reference.
- * <p/>
- * Mainly for internal use, functions as the implicit sender references when invoking
- * one of the message send functions ('!', '!!' and '!!!').
- */
- implicit def optionSelf: Option[ActorRef] = someSelf
-
- /** The 'self' field holds the ActorRef for this actor.
- * <p/>
- * Can be used to send messages to itself:
- * <pre>
- * self ! message
- * </pre>
- * Here you also find most of the Actor API.
- * <p/>
- * For example fields like:
- * <pre>
- * self.dispatcher = ...
- * self.trapExit = ...
- * self.faultHandler = ...
- * self.lifeCycle = ...
- * self.sender
- * </pre>
- * <p/>
- * Here you also find methods like:
- * <pre>
- * self.reply(..)
- * self.link(..)
- * self.unlink(..)
- * self.start(..)
- * self.stop(..)
- * </pre>
- */
- @transient
- val self: ScalaActorRef = someSelf.get
-
- /** User overridable callback/setting.
- * <p/>
- * Partial function implementing the actor logic.
- * To be implemented by concrete actor class.
- * <p/>
- * Example code:
- * <pre>
- * def receive = {
- * case Ping =&gt;
- * println("got a 'Ping' message")
- * self.reply("pong")
- *
- * case OneWay =&gt;
- * println("got a 'OneWay' message")
- *
- * case unknown =&gt;
- * println("unknown message: " + unknown)
- * }
- * </pre>
- */
- protected def receive: Receive
-
- /** User overridable callback.
- * <p/>
- * Is called when an Actor is started by invoking 'actor.start()'.
- */
- def preStart() {}
-
- /** User overridable callback.
- * <p/>
- * Is called when 'actor.stop()' is invoked.
- */
- def postStop() {}
-
- /** User overridable callback.
- * <p/>
- * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
- */
- def preRestart(reason: Throwable) {}
-
- /** User overridable callback.
- * <p/>
- * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
- */
- def postRestart(reason: Throwable) {}
-
- /** User overridable callback.
- * <p/>
- * Is called when a message isn't handled by the current behavior of the actor
- * by default it throws an UnhandledMessageException
- */
- def unhandled(msg: Any) {
- throw new UnhandledMessageException(msg, self)
- }
-
- /** Is the actor able to handle the message passed in as arguments?
- */
- def isDefinedAt(message: Any): Boolean = {
- val behaviorStack = self.hotswap
- message match { //Same logic as apply(msg) but without the unhandled catch-all
- case l: AutoReceivedMessage => true
- case msg if behaviorStack.nonEmpty &&
- behaviorStack.head.isDefinedAt(msg) => true
- case msg if behaviorStack.isEmpty &&
- processingBehavior.isDefinedAt(msg) => true
- case _ => false
- }
- }
-
- /** Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
- * Puts the behavior on top of the hotswap stack.
- * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
- */
- def become(behavior: Receive, discardOld: Boolean = true) {
- if (discardOld) unbecome()
- self.hotswap = self.hotswap.push(behavior)
- }
-
- /** Reverts the Actor behavior to the previous one in the hotswap stack.
- */
- def unbecome(): Unit = {
- val h = self.hotswap
- if (h.nonEmpty) self.hotswap = h.pop
- }
-
- // =========================================
- // ==== INTERNAL IMPLEMENTATION DETAILS ====
- // =========================================
-
- private[akka] final def apply(msg: Any) = {
- if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
- throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
- val behaviorStack = self.hotswap
- msg match {
- case l: AutoReceivedMessage => autoReceiveMessage(l)
- case msg if behaviorStack.nonEmpty &&
- behaviorStack.head.isDefinedAt(msg) => behaviorStack.head.apply(msg)
- case msg if behaviorStack.isEmpty &&
- processingBehavior.isDefinedAt(msg) => processingBehavior.apply(msg)
- case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior
- }
- }
-
- private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match {
- case HotSwap(code, discardOld) => become(code(self), discardOld)
- case RevertHotSwap => unbecome()
- case Exit(dead, reason) => self.handleTrapExit(dead, reason)
- case Link(child) => self.link(child)
- case Unlink(child) => self.unlink(child)
- case UnlinkAndStop(child) => self.unlink(child); child.stop()
- case Restart(reason) => throw reason
- case Kill => throw new ActorKilledException("Kill")
- case PoisonPill =>
- val f = self.senderFuture
- self.stop()
- if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
- }
-
- private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior
-}
-
-private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
-
- /** Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException
- * if the actual type is not assignable from the given one.
- */
- def as[T]: Option[T] = narrow[T](anyOption)
-
- /** Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible
- * ClassCastException and return None in that case.
- */
- def asSilently[T: ClassTag]: Option[T] = narrowSilently[T](anyOption)
-}
-
-/** Marker interface for proxyable actors (such as typed actor).
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-trait Proxyable {
- private[actor] def swapProxiedActor(newInstance: Actor)
-}
-
-/** Represents the different Actor types.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-sealed trait ActorType
-object ActorType {
- case object ScalaActor extends ActorType
- case object TypedActor extends ActorType
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala b/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala
deleted file mode 100644
index 97bb710e29..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/ActorRef.scala
+++ /dev/null
@@ -1,1433 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.actor
-
-import akka.event.EventHandler
-import akka.dispatch._
-import akka.config.Supervision._
-import akka.util._
-import ReflectiveAccess._
-
-import java.net.InetSocketAddress
-import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
-import java.util.{ Map => JMap }
-
-import scala.beans.BeanProperty
-import scala.collection.immutable.Stack
-import scala.annotation.tailrec
-
-private[akka] object ActorRefInternals {
-
- /**
- * LifeCycles for ActorRefs.
- */
- private[akka] sealed trait StatusType
- object UNSTARTED extends StatusType
- object RUNNING extends StatusType
- object BEING_RESTARTED extends StatusType
- object SHUTDOWN extends StatusType
-}
-
-/**
- * Abstraction for unification of sender and senderFuture for later reply.
- * Can be stored away and used at a later point in time.
- */
-abstract class Channel[T] {
-
- /**
- * Scala API. <p/>
- * Sends the specified message to the channel.
- */
- def !(msg: T): Unit
-
- /**
- * Java API. <p/>
- * Sends the specified message to the channel.
- */
- def sendOneWay(msg: T): Unit = this.!(msg)
-}
-
-/**
- * ActorRef is an immutable and serializable handle to an Actor.
- * <p/>
- * Create an ActorRef for an Actor by using the factory method on the Actor object.
- * <p/>
- * Here is an example on how to create an actor with a default constructor.
- * <pre>
- * import Actor._
- *
- * val actor = actorOf[MyActor]
- * actor.start()
- * actor ! message
- * actor.stop()
- * </pre>
- *
- * You can also create and start actors like this:
- * <pre>
- * val actor = actorOf[MyActor].start()
- * </pre>
- *
- * Here is an example on how to create an actor with a non-default constructor.
- * <pre>
- * import Actor._
- *
- * val actor = actorOf(new MyActor(...))
- * actor.start()
- * actor ! message
- * actor.stop()
- * </pre>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scalaRef: ScalaActorRef =>
- // Only mutable for RemoteServer in order to maintain identity across nodes
- @volatile
- protected[akka] var _uuid = newUuid
- @volatile
- protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED
-
- /**
- * User overridable callback/setting.
- * <p/>
- * Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
- * <p/>
- * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
- * actor in RemoteServer etc.But also as the identifier for persistence, which means
- * that you can use a custom name to be able to retrieve the "correct" persisted state
- * upon restart, remote restart etc.
- */
- @BeanProperty
- @volatile
- var id: String = _uuid.toString
-
- /**
- * User overridable callback/setting.
- * <p/>
- * Defines the default timeout for '!!' and '!!!' invocations,
- * e.g. the timeout for the future returned by the call to '!!' and '!!!'.
- */
- @deprecated("Will be replaced by implicit-scoped timeout on all methods that needs it, will default to timeout specified in config", "1.1")
- @BeanProperty
- @volatile
- var timeout: Long = Actor.TIMEOUT
-
- /**
- * User overridable callback/setting.
- * <p/>
- * Defines the default timeout for an initial receive invocation.
- * When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
- */
- @volatile
- var receiveTimeout: Option[Long] = None
-
- /**
- * Akka Java API. <p/>
- * Defines the default timeout for an initial receive invocation.
- * When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
- */
- def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout)
- def getReceiveTimeout(): Option[Long] = receiveTimeout
-
- /**
- * Akka Java API. <p/>
- * A faultHandler defines what should be done when a linked actor signals an error.
- * <p/>
- * Can be one of:
- * <pre>
- * getContext().setFaultHandler(new AllForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange));
- * </pre>
- * Or:
- * <pre>
- * getContext().setFaultHandler(new OneForOneStrategy(new Class[]{Throwable.class},maxNrOfRetries, withinTimeRange));
- * </pre>
- */
- def setFaultHandler(handler: FaultHandlingStrategy)
- def getFaultHandler(): FaultHandlingStrategy
-
- /**
- * Akka Java API. <p/>
- * A lifeCycle defines whether the actor will be stopped on error (Temporary) or if it can be restarted (Permanent)
- * <p/>
- * Can be one of:
- *
- * import static akka.config.Supervision.*;
- * <pre>
- * getContext().setLifeCycle(permanent());
- * </pre>
- * Or:
- * <pre>
- * getContext().setLifeCycle(temporary());
- * </pre>
- */
- def setLifeCycle(lifeCycle: LifeCycle): Unit
- def getLifeCycle(): LifeCycle
-
- /**
- * Akka Java API. <p/>
- * The default dispatcher is the <tt>Dispatchers.globalExecutorBasedEventDrivenDispatcher</tt>.
- * This means that all actors will share the same event-driven executor based dispatcher.
- * <p/>
- * You can override it so it fits the specific use-case that the actor is used for.
- * See the <tt>akka.dispatch.Dispatchers</tt> class for the different
- * dispatchers available.
- * <p/>
- * The default is also that all actors that are created and spawned from within this actor
- * is sharing the same dispatcher as its creator.
- */
- def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher
- def getDispatcher(): MessageDispatcher = dispatcher
-
- /**
- * Returns on which node this actor lives if None it lives in the local ActorRegistry
- */
- @deprecated("Remoting will become fully transparent in the future", "1.1")
- def homeAddress: Option[InetSocketAddress]
-
- /**
- * Java API. <p/>
- */
- @deprecated("Remoting will become fully transparent in the future", "1.1")
- def getHomeAddress(): InetSocketAddress = homeAddress getOrElse null
-
- /**
- * Holds the hot swapped partial function.
- */
- @volatile
- protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]()
-
- /**
- * This is a reference to the message currently being processed by the actor
- */
- @volatile
- protected[akka] var currentMessage: MessageInvocation = null
-
- /**
- * Comparison only takes uuid into account.
- */
- def compareTo(other: ActorRef) = this.uuid compareTo other.uuid
-
- /**
- * Returns the uuid for the actor.
- */
- def getUuid() = _uuid
- def uuid = _uuid
-
- /**
- * Akka Java API. <p/>
- * The reference sender Actor of the last received message.
- * Is defined if the message was sent from another Actor, else None.
- */
- def getSender(): Option[ActorRef] = sender
-
- /**
- * Akka Java API. <p/>
- * The reference sender future of the last received message.
- * Is defined if the message was sent with sent with '!!' or '!!!', else None.
- */
- def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture
-
- /**
- * Is the actor being restarted?
- */
- def isBeingRestarted: Boolean = _status == ActorRefInternals.BEING_RESTARTED
-
- /**
- * Is the actor running?
- */
- def isRunning: Boolean = _status match {
- case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING => true
- case _ => false
- }
-
- /**
- * Is the actor shut down?
- */
- def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN
-
- /**
- * Is the actor ever started?
- */
- def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED
-
- /**
- * Is the actor able to handle the message passed in as arguments?
- */
- @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1")
- def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message)
-
- /**
- * Only for internal use. UUID is effectively final.
- */
- protected[akka] def uuid_=(uid: Uuid) = _uuid = uid
-
- /**
- * Akka Java API. <p/>
- * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
- * <p/>
- * <pre>
- * actor.sendOneWay(message);
- * </pre>
- * <p/>
- */
- def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null)
-
- /**
- * Akka Java API. <p/>
- * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
- * <p/>
- * Allows you to pass along the sender of the message.
- * <p/>
- * <pre>
- * actor.sendOneWay(message, context);
- * </pre>
- * <p/>
- */
- def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender))
-
- /**
- * Akka Java API. <p/>
- * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
- * Uses the default timeout of the Actor (setTimeout()) and omits the sender reference
- */
- def sendRequestReply(message: AnyRef): AnyRef = sendRequestReply(message, timeout, null)
-
- /**
- * Akka Java API. <p/>
- * @see sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef)
- * Uses the default timeout of the Actor (setTimeout())
- */
- def sendRequestReply(message: AnyRef, sender: ActorRef): AnyRef = sendRequestReply(message, timeout, sender)
-
- /**
- * Akka Java API. <p/>
- * Sends a message asynchronously and waits on a future for a reply message under the hood.
- * <p/>
- * It waits on the reply either until it receives it or until the timeout expires
- * (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
- * <p/>
- * <b>NOTE:</b>
- * Use this method with care. In most cases it is better to use 'sendOneWay' together with 'getContext().getSender()' to
- * implement request/response message exchanges.
- * <p/>
- * If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
- def sendRequestReply(message: AnyRef, timeout: Long, sender: ActorRef): AnyRef = {
- !!(message, timeout)(Option(sender)).getOrElse(throw new ActorTimeoutException(
- "Message [" + message +
- "]\n\tsent to [" + actorClassName +
- "]\n\tfrom [" + (if (sender ne null) sender.actorClassName else "nowhere") +
- "]\n\twith timeout [" + timeout +
- "]\n\ttimed out."))
- .asInstanceOf[AnyRef]
- }
-
- /**
- * Akka Java API. <p/>
- * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
- * Uses the Actors default timeout (setTimeout()) and omits the sender
- */
- def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
-
- /**
- * Akka Java API. <p/>
- * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
- * Uses the Actors default timeout (setTimeout())
- */
- def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]]
-
- /**
- * Akka Java API. <p/>
- * Sends a message asynchronously returns a future holding the eventual reply message.
- * <p/>
- * <b>NOTE:</b>
- * Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
- * implement request/response message exchanges.
- * <p/>
- * If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
- def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
-
- /**
- * Akka Java API. <p/>
- * Forwards the message specified to this actor and preserves the original sender of the message
- */
- def forward(message: AnyRef, sender: ActorRef): Unit =
- if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
- else forward(message)(Some(sender))
-
- /**
- * Akka Java API. <p/>
- * Use <code>getContext().replyUnsafe(..)</code> to reply with a message to the original sender of the message currently
- * being processed.
- * <p/>
- * Throws an IllegalStateException if unable to determine what to reply to.
- */
- def replyUnsafe(message: AnyRef) = reply(message)
-
- /**
- * Akka Java API. <p/>
- * Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently
- * being processed.
- * <p/>
- * Returns true if reply was sent, and false if unable to determine what to reply to.
- */
- def replySafe(message: AnyRef): Boolean = reply_?(message)
-
- /**
- * Returns the class for the Actor instance that is managed by the ActorRef.
- */
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def actorClass: Class[_ <: Actor]
-
- /**
- * Akka Java API. <p/>
- * Returns the class for the Actor instance that is managed by the ActorRef.
- */
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def getActorClass(): Class[_ <: Actor] = actorClass
-
- /**
- * Returns the class name for the Actor instance that is managed by the ActorRef.
- */
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def actorClassName: String
-
- /**
- * Akka Java API. <p/>
- * Returns the class name for the Actor instance that is managed by the ActorRef.
- */
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def getActorClassName(): String = actorClassName
-
- /**
- * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
- */
- def dispatcher_=(md: MessageDispatcher): Unit
-
- /**
- * Get the dispatcher for this actor.
- */
- def dispatcher: MessageDispatcher
-
- /**
- * Starts up the actor and its message queue.
- */
- def start(): ActorRef
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- * Alias for 'stop'.
- */
- def exit() = stop()
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- */
- def stop(): Unit
-
- /**
- * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
- * receive a notification if the linked actor has crashed.
- * <p/>
- * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will
- * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
- * defined by the 'faultHandler'.
- */
- def link(actorRef: ActorRef): Unit
-
- /**
- * Unlink the actor.
- */
- def unlink(actorRef: ActorRef): Unit
-
- /**
- * Atomically start and link an actor.
- */
- def startLink(actorRef: ActorRef): Unit
-
- /**
- * Atomically create (from actor class) and start an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- @deprecated("Will be removed after 1.1, use Actor.actorOf instead", "1.1")
- def spawn(clazz: Class[_ <: Actor]): ActorRef
-
- /**
- * Atomically create (from actor class), make it remote and start an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1")
- def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
-
- /**
- * Atomically create (from actor class), link and start an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- @deprecated("Will be removed after 1.1, use Actor.remote.actorOf instead and then link on success", "1.1")
- def spawnLink(clazz: Class[_ <: Actor]): ActorRef
-
- /**
- * Atomically create (from actor class), make it remote, link and start an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- @deprecated("Will be removed after 1.1, client managed actors will be removed", "1.1")
- def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef
-
- /**
- * Returns the mailbox size.
- */
- def mailboxSize = dispatcher.mailboxSize(this)
-
- /**
- * Akka Java API. <p/>
- * Returns the mailbox size.
- */
- def getMailboxSize(): Int = mailboxSize
-
- /**
- * Returns the supervisor, if there is one.
- */
- def supervisor: Option[ActorRef]
-
- /**
- * Akka Java API. <p/>
- * Returns the supervisor, if there is one.
- */
- def getSupervisor(): ActorRef = supervisor getOrElse null
-
- /**
- * Returns an unmodifiable Java Map containing the linked actors,
- * please note that the backing map is thread-safe but not immutable
- */
- def linkedActors: JMap[Uuid, ActorRef]
-
- /**
- * Java API. <p/>
- * Returns an unmodifiable Java Map containing the linked actors,
- * please note that the backing map is thread-safe but not immutable
- */
- def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors
-
- /**
- * Abstraction for unification of sender and senderFuture for later reply
- */
- def channel: Channel[Any] = {
- if (senderFuture.isDefined) {
- new Channel[Any] {
- val future = senderFuture.get
- def !(msg: Any) = future completeWithResult msg
- }
- } else if (sender.isDefined) {
- val someSelf = Some(this)
- new Channel[Any] {
- val client = sender.get
- def !(msg: Any) = client.!(msg)(someSelf)
- }
- } else throw new IllegalActorStateException("No channel available")
- }
-
- /**
- * Java API. <p/>
- * Abstraction for unification of sender and senderFuture for later reply
- */
- def getChannel: Channel[Any] = channel
-
- protected[akka] def invoke(messageHandle: MessageInvocation): Unit
-
- protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
-
- protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
- message: Any,
- timeout: Long,
- senderOption: Option[ActorRef],
- senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
-
- protected[akka] def actorInstance: AtomicReference[Actor]
-
- protected[akka] def actor: Actor = actorInstance.get
-
- protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
-
- protected[akka] def mailbox: AnyRef
- protected[akka] def mailbox_=(value: AnyRef): AnyRef
-
- protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
-
- protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
-
- protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
-
- protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid]
-
- override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
-
- override def equals(that: Any): Boolean = {
- that.isInstanceOf[ActorRef] &&
- that.asInstanceOf[ActorRef].uuid == uuid
- }
-
- override def toString = "Actor[" + id + ":" + uuid + "]"
-}
-
-/**
- * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-class LocalActorRef private[akka] (
- private[this] val actorFactory: () => Actor,
- val homeAddress: Option[InetSocketAddress],
- val clientManaged: Boolean = false)
- extends ActorRef with ScalaActorRef {
- protected[akka] val guard = new ReentrantGuard
-
- @volatile
- protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
- @volatile
- private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
- @volatile
- private[akka] var _supervisor: Option[ActorRef] = None
- @volatile
- private var maxNrOfRetriesCount: Int = 0
- @volatile
- private var restartsWithinTimeRangeTimestamp: Long = 0L
- @volatile
- private var _mailbox: AnyRef = _
- @volatile
- private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
-
- protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
-
- //If it was started inside "newActor", initialize it
- if (isRunning) initializeActorInstance
-
- // used only for deserialization
- private[akka] def this(
- __uuid: Uuid,
- __id: String,
- __timeout: Long,
- __receiveTimeout: Option[Long],
- __lifeCycle: LifeCycle,
- __supervisor: Option[ActorRef],
- __hotswap: Stack[PartialFunction[Any, Unit]],
- __factory: () => Actor,
- __homeAddress: Option[InetSocketAddress]) = {
- this(__factory, __homeAddress)
- _uuid = __uuid
- id = __id
- timeout = __timeout
- receiveTimeout = __receiveTimeout
- lifeCycle = __lifeCycle
- _supervisor = __supervisor
- hotswap = __hotswap
- setActorSelfFields(actor, this)
- start
- }
-
- /**
- * Returns whether this actor ref is client-managed remote or not
- */
- private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled
-
- // ========= PUBLIC FUNCTIONS =========
-
- /**
- * Returns the class for the Actor instance that is managed by the ActorRef.
- */
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]]
-
- /**
- * Returns the class name for the Actor instance that is managed by the ActorRef.
- */
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def actorClassName: String = actorClass.getName
-
- /**
- * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
- */
- def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
- if (!isBeingRestarted) {
- if (!isRunning) _dispatcher = md
- else throw new ActorInitializationException(
- "Can not swap dispatcher for " + toString + " after it has been started")
- }
- }
-
- /**
- * Get the dispatcher for this actor.
- */
- def dispatcher: MessageDispatcher = _dispatcher
-
- /**
- * Starts up the actor and its message queue.
- */
- def start(): ActorRef = guard.withGuard {
- if (isShutdown) throw new ActorStartException(
- "Can't restart an actor that has been shut down with 'stop' or 'exit'")
- if (!isRunning) {
- dispatcher.attach(this)
-
- _status = ActorRefInternals.RUNNING
-
- // If we are not currently creating this ActorRef instance
- if ((actorInstance ne null) && (actorInstance.get ne null))
- initializeActorInstance
-
- if (isClientManaged_?)
- Actor.remote.registerClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
-
- checkReceiveTimeout //Schedule the initial Receive timeout
- }
- this
- }
-
- /**
- * Shuts down the actor its dispatcher and message queue.
- */
- def stop() = guard.withGuard {
- if (isRunning) {
- receiveTimeout = None
- cancelReceiveTimeout
- dispatcher.detach(this)
- _status = ActorRefInternals.SHUTDOWN
- try {
- actor.postStop
- } finally {
- currentMessage = null
- Actor.registry.unregister(this)
- if (isRemotingEnabled) {
- if (isClientManaged_?)
- Actor.remote.unregisterClientManagedActor(homeAddress.get.getAddress.getHostAddress, homeAddress.get.getPort, uuid)
- Actor.remote.unregister(this)
- }
- setActorSelfFields(actorInstance.get, null)
- }
- } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
- }
-
- /**
- * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
- * receive a notification if the linked actor has crashed.
- * <p/>
- * If the 'trapExit' member field of the 'faultHandler' has been set to at contain at least one exception class then it will
- * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
- * defined by the 'faultHandler'.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def link(actorRef: ActorRef): Unit = guard.withGuard {
- val actorRefSupervisor = actorRef.supervisor
- val hasSupervisorAlready = actorRefSupervisor.isDefined
- if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy
- else if (hasSupervisorAlready) throw new IllegalActorStateException(
- "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
- else {
- _linkedActors.put(actorRef.uuid, actorRef)
- actorRef.supervisor = Some(this)
- }
- }
-
- /**
- * Unlink the actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def unlink(actorRef: ActorRef) = guard.withGuard {
- if (_linkedActors.remove(actorRef.uuid) eq null)
- throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
-
- actorRef.supervisor = None
- }
-
- /**
- * Atomically start and link an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def startLink(actorRef: ActorRef): Unit = guard.withGuard {
- link(actorRef)
- actorRef.start()
- }
-
- /**
- * Atomically create (from actor class) and start an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def spawn(clazz: Class[_ <: Actor]): ActorRef =
- Actor.actorOf(clazz).start()
-
- /**
- * Atomically create (from actor class), start and make an actor remote.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
- ensureRemotingEnabled
- val ref = Actor.remote.actorOf(clazz, hostname, port)
- ref.timeout = timeout
- ref.start()
- }
-
- /**
- * Atomically create (from actor class), start and link an actor.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def spawnLink(clazz: Class[_ <: Actor]): ActorRef = {
- val actor = spawn(clazz)
- link(actor)
- actor.start()
- actor
- }
-
- /**
- * Atomically create (from actor class), start, link and make an actor remote.
- * <p/>
- * To be invoked from within the actor itself.
- */
- def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef = {
- ensureRemotingEnabled
- val actor = Actor.remote.actorOf(clazz, hostname, port)
- actor.timeout = timeout
- link(actor)
- actor.start()
- actor
- }
-
- /**
- * Returns the mailbox.
- */
- def mailbox: AnyRef = _mailbox
-
- protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value }
-
- /**
- * Returns the supervisor, if there is one.
- */
- def supervisor: Option[ActorRef] = _supervisor
-
- // ========= AKKA PROTECTED FUNCTIONS =========
-
- protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
-
- protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
- if (isClientManaged_?) {
- Actor.remote.send[Any](
- message, senderOption, None, homeAddress.get, timeout, true, this, None, ActorType.ScalaActor, None)
- } else
- dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
-
- protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
- message: Any,
- timeout: Long,
- senderOption: Option[ActorRef],
- senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
- if (isClientManaged_?) {
- val future = Actor.remote.send[T](
- message, senderOption, senderFuture, homeAddress.get, timeout, false, this, None, ActorType.ScalaActor, None)
- if (future.isDefined) future.get
- else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
- } else {
- val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout))
- dispatcher dispatchMessage new MessageInvocation(
- this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]])
- future.get
- }
- }
-
- /**
- * Callback for the dispatcher. This is the single entry point to the user Actor implementation.
- */
- protected[akka] def invoke(messageHandle: MessageInvocation): Unit = {
- guard.lock.lock
- try {
- if (!isShutdown) {
- currentMessage = messageHandle
- try {
- try {
- cancelReceiveTimeout // FIXME: leave this here?
- actor(messageHandle.message)
- currentMessage = null // reset current message after successful invocation
- } catch {
- case e: InterruptedException =>
- currentMessage = null // received message while actor is shutting down, ignore
- case e =>
- handleExceptionInDispatch(e, messageHandle.message)
- }
- finally {
- checkReceiveTimeout // Reschedule receive timeout
- }
- } catch {
- case e =>
- EventHandler.error(e, this, messageHandle.message.toString)
- throw e
- }
- }
- } finally { guard.lock.unlock }
- }
-
- protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
- faultHandler match {
- case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) =>
- restartLinkedActors(reason, maxRetries, within)
-
- case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) =>
- dead.restart(reason, maxRetries, within)
-
- case _ =>
- if (_supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
- else dead.stop()
- }
- }
-
- private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
- val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
- false
- } else if (withinTimeRange.isEmpty) { // restrict number of restarts
- maxNrOfRetriesCount += 1 //Increment number of retries
- maxNrOfRetriesCount > maxNrOfRetries.get
- } else { // cannot restart more than N within M timerange
- maxNrOfRetriesCount += 1 //Increment number of retries
- val windowStart = restartsWithinTimeRangeTimestamp
- val now = System.currentTimeMillis
- val retries = maxNrOfRetriesCount
- //We are within the time window if it isn't the first restart, or if the window hasn't closed
- val insideWindow = if (windowStart == 0) false
- else (now - windowStart) <= withinTimeRange.get
-
- //The actor is dead if it dies X times within the window of restart
- val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1)
-
- if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window
- restartsWithinTimeRangeTimestamp = now
-
- if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired
- maxNrOfRetriesCount = 1
-
- unrestartable
- }
-
- denied == false //If we weren't denied, we have a go
- }
-
- protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
- def performRestart() {
- val failedActor = actorInstance.get
-
- failedActor match {
- case p: Proxyable =>
- failedActor.preRestart(reason)
- failedActor.postRestart(reason)
- case _ =>
- failedActor.preRestart(reason)
- val freshActor = newActor
- setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
- actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
- freshActor.preStart
- freshActor.postRestart(reason)
- }
- }
-
- def tooManyRestarts() {
- _supervisor.foreach { sup =>
- // can supervisor handle the notification?
- val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
- if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
- }
- stop
- }
-
- @tailrec
- def attemptRestart() {
- val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) {
- guard.withGuard[Boolean] {
- _status = ActorRefInternals.BEING_RESTARTED
-
- lifeCycle match {
- case Temporary =>
- shutDownTemporaryActor(this)
- true
-
- case _ => // either permanent or none where default is permanent
- val success = try {
- performRestart()
- true
- } catch {
- case e =>
- EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
- false // an error or exception here should trigger a retry
- }
- finally {
- currentMessage = null
- }
- if (success) {
- _status = ActorRefInternals.RUNNING
- dispatcher.resume(this)
- restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
- }
- success
- }
- }
- } else {
- tooManyRestarts()
- true // done
- }
-
- if (success) () // alles gut
- else attemptRestart()
- }
-
- attemptRestart() // recur
- }
-
- protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
- val i = _linkedActors.values.iterator
- while (i.hasNext) {
- val actorRef = i.next
- actorRef.lifeCycle match {
- // either permanent or none where default is permanent
- case Temporary => shutDownTemporaryActor(actorRef)
- case _ => actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
- }
- }
- }
-
- protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
- ensureRemotingEnabled
- if (_supervisor.isDefined) {
- if (homeAddress.isDefined) Actor.remote.registerSupervisorForActor(this)
- Some(_supervisor.get.uuid)
- } else None
- }
-
- def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors)
-
- // ========= PRIVATE FUNCTIONS =========
-
- private[this] def newActor: Actor = {
- try {
- Actor.actorRefInCreation.set(Some(this))
- val a = actorFactory()
- if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
- a
- } finally {
- Actor.actorRefInCreation.set(None)
- }
- }
-
- private def shutDownTemporaryActor(temporaryActor: ActorRef) {
- temporaryActor.stop()
- _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
- // if last temporary actor is gone, then unlink me from supervisor
- if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this))
- true
- }
-
- private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
- EventHandler.error(reason, this, message.toString)
-
- //Prevent any further messages to be processed until the actor has been restarted
- dispatcher.suspend(this)
-
- senderFuture.foreach(_.completeWithException(reason))
-
- if (supervisor.isDefined) notifySupervisorWithMessage(Exit(this, reason))
- else {
- lifeCycle match {
- case Temporary => shutDownTemporaryActor(this)
- case _ => dispatcher.resume(this) //Resume processing for this actor
- }
- }
- }
-
- private def notifySupervisorWithMessage(notification: LifeCycleMessage) = {
- // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
- _supervisor.foreach { sup =>
- if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
- //Scoped stop all linked actors, to avoid leaking the 'i' val
- {
- val i = _linkedActors.values.iterator
- while (i.hasNext) {
- i.next.stop()
- i.remove
- }
- }
- //Stop the actor itself
- stop
- } else sup ! notification // else notify supervisor
- }
- }
-
- private def setActorSelfFields(actor: Actor, value: ActorRef) {
-
- @tailrec
- def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = {
- val success = try {
- val selfField = clazz.getDeclaredField("self")
- val someSelfField = clazz.getDeclaredField("someSelf")
- selfField.setAccessible(true)
- someSelfField.setAccessible(true)
- selfField.set(actor, value)
- someSelfField.set(actor, if (value ne null) Some(value) else null)
- true
- } catch {
- case e: NoSuchFieldException => false
- }
-
- if (success) true
- else {
- val parent = clazz.getSuperclass
- if (parent eq null)
- throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait")
- lookupAndSetSelfFields(parent, actor, value)
- }
- }
-
- lookupAndSetSelfFields(actor.getClass, actor, value)
- }
-
- private def initializeActorInstance = {
- actor.preStart // run actor preStart
- Actor.registry.register(this)
- }
-
- protected[akka] def checkReceiveTimeout = {
- cancelReceiveTimeout
- if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed
- _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
- }
- }
-
- protected[akka] def cancelReceiveTimeout = {
- if (_futureTimeout.isDefined) {
- _futureTimeout.get.cancel(true)
- _futureTimeout = None
- }
- }
-}
-
-/**
- * System messages for RemoteActorRef.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-object RemoteActorSystemMessage {
- val Stop = "RemoteActorRef:stop".intern
-}
-
-/**
- * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
- * This reference is network-aware (remembers its origin) and immutable.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-private[akka] case class RemoteActorRef private[akka] (
- classOrServiceName: String,
- val actorClassName: String,
- val hostname: String,
- val port: Int,
- _timeout: Long,
- loader: Option[ClassLoader],
- val actorType: ActorType = ActorType.ScalaActor)
- extends ActorRef with ScalaActorRef {
-
- ensureRemotingEnabled
-
- val homeAddress = Some(new InetSocketAddress(hostname, port))
-
- //protected def clientManaged = classOrServiceName.isEmpty //If no class or service name, it's client managed
- id = classOrServiceName
- //id = classOrServiceName.getOrElse("uuid:" + uuid) //If we're a server-managed we want to have classOrServiceName as id, or else, we're a client-managed and we want to have our uuid as id
-
- timeout = _timeout
-
- start
-
- def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
- Actor.remote.send[Any](message, senderOption, None, homeAddress.get, timeout, true, this, None, actorType, loader)
-
- def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
- message: Any,
- timeout: Long,
- senderOption: Option[ActorRef],
- senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
- val future = Actor.remote.send[T](
- message, senderOption, senderFuture,
- homeAddress.get, timeout,
- false, this, None,
- actorType, loader)
- if (future.isDefined) future.get
- else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
- }
-
- def start: ActorRef = synchronized {
- _status = ActorRefInternals.RUNNING
- this
- }
-
- def stop: Unit = synchronized {
- if (_status == ActorRefInternals.RUNNING) {
- _status = ActorRefInternals.SHUTDOWN
- postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
- }
- }
-
- protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = None
-
- // ==== NOT SUPPORTED ====
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def actorClass: Class[_ <: Actor] = unsupported
- def dispatcher_=(md: MessageDispatcher): Unit = unsupported
- def dispatcher: MessageDispatcher = unsupported
- def link(actorRef: ActorRef): Unit = unsupported
- def unlink(actorRef: ActorRef): Unit = unsupported
- def startLink(actorRef: ActorRef): Unit = unsupported
- def spawn(clazz: Class[_ <: Actor]): ActorRef = unsupported
- def spawnRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
- def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported
- def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported
- def supervisor: Option[ActorRef] = unsupported
- def linkedActors: JMap[Uuid, ActorRef] = unsupported
- protected[akka] def mailbox: AnyRef = unsupported
- protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
- protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
- protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
- protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
- protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
- protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
- protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
- private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
-}
-
-/**
- * This trait represents the common (external) methods for all ActorRefs
- * Needed because implicit conversions aren't applied when instance imports are used
- *
- * i.e.
- * var self: ScalaActorRef = ...
- * import self._
- * //can't call ActorRef methods here unless they are declared in a common
- * //superclass, which ActorRefShared is.
- */
-trait ActorRefShared {
- /**
- * Returns the uuid for the actor.
- */
- def uuid: Uuid
-}
-
-/**
- * This trait represents the Scala Actor API
- * There are implicit conversions in ../actor/Implicits.scala
- * from ActorRef -> ScalaActorRef and back
- */
-trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
-
- /**
- * Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
- * <p/>
- * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
- * actor in RemoteServer etc.But also as the identifier for persistence, which means
- * that you can use a custom name to be able to retrieve the "correct" persisted state
- * upon restart, remote restart etc.
- */
- def id: String
-
- def id_=(id: String): Unit
-
- /**
- * User overridable callback/setting.
- * <p/>
- * Defines the life-cycle for a supervised actor.
- */
- @volatile
- @BeanProperty
- var lifeCycle: LifeCycle = UndefinedLifeCycle
-
- /**
- * User overridable callback/setting.
- * <p/>
- * Don't forget to supply a List of exception types to intercept (trapExit)
- * <p/>
- * Can be one of:
- * <pre>
- * faultHandler = AllForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
- * </pre>
- * Or:
- * <pre>
- * faultHandler = OneForOneStrategy(trapExit = List(classOf[Exception]), maxNrOfRetries, withinTimeRange)
- * </pre>
- */
- @volatile
- @BeanProperty
- var faultHandler: FaultHandlingStrategy = NoFaultHandlingStrategy
-
- /**
- * The reference sender Actor of the last received message.
- * Is defined if the message was sent from another Actor, else None.
- */
- def sender: Option[ActorRef] = {
- val msg = currentMessage
- if (msg eq null) None
- else msg.sender
- }
-
- /**
- * The reference sender future of the last received message.
- * Is defined if the message was sent with sent with '!!' or '!!!', else None.
- */
- def senderFuture(): Option[CompletableFuture[Any]] = {
- val msg = currentMessage
- if (msg eq null) None
- else msg.senderFuture
- }
-
- /**
- * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
- * <p/>
- *
- * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
- * <p/>
- *
- * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable,
- * if invoked from within an Actor. If not then no sender is available.
- * <pre>
- * actor ! message
- * </pre>
- * <p/>
- */
- def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = {
- if (isRunning) postMessageToMailbox(message, sender)
- else throw new ActorInitializationException(
- "Actor has not been started, you need to invoke 'actor.start()' before using it")
- }
-
- /**
- * Sends a message asynchronously and waits on a future for a reply message.
- * <p/>
- * It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
- * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
- * <p/>
- * <b>NOTE:</b>
- * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
- * implement request/response message exchanges.
- * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
- def !!(message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Option[Any] = {
- if (isRunning) {
- val future = postMessageToMailboxAndCreateFutureResultWithTimeout[Any](message, timeout, sender, None)
- val isMessageJoinPoint = if (isTypedActorEnabled) TypedActorModule.resolveFutureIfMessageIsJoinPoint(message, future)
- else false
- try {
- future.await
- } catch {
- case e: FutureTimeoutException =>
- if (isMessageJoinPoint) {
- EventHandler.error(e, this, e.getMessage)
- throw e
- } else None
- }
- future.resultOrException
- } else throw new ActorInitializationException(
- "Actor has not been started, you need to invoke 'actor.start()' before using it")
- }
-
- /**
- * Sends a message asynchronously returns a future holding the eventual reply message.
- * <p/>
- * <b>NOTE:</b>
- * Use this method with care. In most cases it is better to use '!' together with the 'sender' member field to
- * implement request/response message exchanges.
- * If you are sending messages using <code>!!!</code> then you <b>have to</b> use <code>self.reply(..)</code>
- * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
- */
- def !!![T](message: Any, timeout: Long = this.timeout)(implicit sender: Option[ActorRef] = None): Future[T] = {
- if (isRunning) postMessageToMailboxAndCreateFutureResultWithTimeout[T](message, timeout, sender, None)
- else throw new ActorInitializationException(
- "Actor has not been started, you need to invoke 'actor.start()' before using it")
- }
-
- /**
- * Forwards the message and passes the original sender actor as the sender.
- * <p/>
- * Works with '!', '!!' and '!!!'.
- */
- def forward(message: Any)(implicit sender: Some[ActorRef]) = {
- if (isRunning) {
- if (sender.get.senderFuture.isDefined)
- postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, sender.get.sender, sender.get.senderFuture)
- else
- postMessageToMailbox(message, sender.get.sender)
- } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor.start()' before using it")
- }
-
- /**
- * Use <code>self.reply(..)</code> to reply with a message to the original sender of the message currently
- * being processed.
- * <p/>
- * Throws an IllegalStateException if unable to determine what to reply to.
- */
- def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException(
- "\n\tNo sender in scope, can't reply. " +
- "\n\tYou have probably: " +
- "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
- "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
- "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if success and Boolean(false) if no sender in scope")
-
- /**
- * Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
- * being processed.
- * <p/>
- * Returns true if reply was sent, and false if unable to determine what to reply to.
- */
- def reply_?(message: Any): Boolean = {
- if (senderFuture.isDefined) {
- senderFuture.get completeWithResult message
- true
- } else if (sender.isDefined) {
- //TODO: optimize away this allocation, perhaps by having implicit self: Option[ActorRef] in signature
- sender.get.!(message)(Some(this))
- true
- } else false
- }
-
- /**
- * Atomically create (from actor class) and start an actor.
- */
- def spawn[T <: Actor: ClassTag]: ActorRef =
- spawn(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]])
-
- /**
- * Atomically create (from actor class), start and make an actor remote.
- */
- def spawnRemote[T <: Actor: ClassTag](hostname: String, port: Int, timeout: Long): ActorRef = {
- ensureRemotingEnabled
- spawnRemote(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
- }
-
- /**
- * Atomically create (from actor class), start and link an actor.
- */
- def spawnLink[T <: Actor: ClassTag]: ActorRef =
- spawnLink(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]])
-
- /**
- * Atomically create (from actor class), start, link and make an actor remote.
- */
- def spawnLinkRemote[T <: Actor: ClassTag](hostname: String, port: Int, timeout: Long): ActorRef = {
- ensureRemotingEnabled
- spawnLinkRemote(classTag[T].erasure.asInstanceOf[Class[_ <: Actor]], hostname, port, timeout)
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala b/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala
deleted file mode 100644
index 5d649fcd36..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala
+++ /dev/null
@@ -1,389 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.actor
-
-import scala.collection.mutable.{ ListBuffer, Map }
-import scala.reflect.ArrayTag
-
-import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
-import java.util.{ Set => JSet }
-
-import annotation.tailrec
-import akka.util.ReflectiveAccess._
-import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement }
-
-/**
- * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-sealed trait ActorRegistryEvent
-case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
-case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
-
-/**
- * Registry holding all Actor instances in the whole system.
- * Mapped by:
- * <ul>
- * <li>the Actor's UUID</li>
- * <li>the Actor's id field (which can be set by user-code)</li>
- * <li>the Actor's class</li>
- * <li>all Actors that are subtypes of a specific type</li>
- * <ul>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-
-final class ActorRegistry private[actor] () extends ListenerManagement {
-
- private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
- private val actorsById = new Index[String, ActorRef]
- private val guard = new ReadWriteGuard
-
- /**
- * Returns all actors in the system.
- */
- def actors: Array[ActorRef] = filter(_ => true)
-
- /**
- * Returns the number of actors in the system.
- */
- def size: Int = actorsByUUID.size
-
- /**
- * Invokes a function for all actors.
- */
- def foreach(f: (ActorRef) => Unit) = {
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) f(elements.nextElement)
- }
-
- /**
- * Invokes the function on all known actors until it returns Some
- * Returns None if the function never returns Some
- */
- def find[T](f: PartialFunction[ActorRef, T]): Option[T] = {
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val element = elements.nextElement
- if (f isDefinedAt element) return Some(f(element))
- }
- None
- }
-
- /**
- * Finds all actors that are subtypes of the class passed in as the ClassTag argument and supporting passed message.
- */
- def actorsFor[T <: Actor](message: Any)(implicit classTag: ClassTag[T]): Array[ActorRef] =
- filter(a => classTag.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message))
-
- /**
- * Finds all actors that satisfy a predicate.
- */
- def filter(p: ActorRef => Boolean): Array[ActorRef] = {
- val all = new ListBuffer[ActorRef]
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val actorId = elements.nextElement
- if (p(actorId)) all += actorId
- }
- all.toArray
- }
-
- /**
- * Finds all actors that are subtypes of the class passed in as the ClassTag argument.
- */
- def actorsFor[T <: Actor](implicit classTag: ClassTag[T]): Array[ActorRef] =
- actorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
-
- /**
- * Finds any actor that matches T. Very expensive, traverses ALL alive actors.
- */
- def actorFor[T <: Actor](implicit classTag: ClassTag[T]): Option[ActorRef] =
- find({ case a: ActorRef if classTag.erasure.isAssignableFrom(a.actor.getClass) => a })
-
- /**
- * Finds all actors of type or sub-type specified by the class passed in as the Class argument.
- */
- def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] =
- filter(a => clazz.isAssignableFrom(a.actor.getClass))
-
- /**
- * Finds all actors that has a specific id.
- */
- def actorsFor(id: String): Array[ActorRef] = actorsById values id
-
- /**
- * Finds the actor that has a specific UUID.
- */
- def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid)
-
- /**
- * Returns all typed actors in the system.
- */
- def typedActors: Array[AnyRef] = filterTypedActors(_ => true)
-
- /**
- * Invokes a function for all typed actors.
- */
- def foreachTypedActor(f: (AnyRef) => Unit) = {
- TypedActorModule.ensureEnabled
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val proxy = typedActorFor(elements.nextElement)
- if (proxy.isDefined) f(proxy.get)
- }
- }
-
- /**
- * Invokes the function on all known typed actors until it returns Some
- * Returns None if the function never returns Some
- */
- def findTypedActor[T](f: PartialFunction[AnyRef, T]): Option[T] = {
- TypedActorModule.ensureEnabled
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val proxy = typedActorFor(elements.nextElement)
- if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy))
- }
- None
- }
-
- /**
- * Finds all typed actors that satisfy a predicate.
- */
- def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- val all = new ListBuffer[AnyRef]
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val proxy = typedActorFor(elements.nextElement)
- if (proxy.isDefined && p(proxy.get)) all += proxy.get
- }
- all.toArray
- }
-
- /**
- * Finds all typed actors that are subtypes of the class passed in as the ClassTag argument.
- */
- def typedActorsFor[T <: AnyRef](implicit classTag: ClassTag[T]): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- typedActorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
- }
-
- /**
- * Finds any typed actor that matches T.
- */
- def typedActorFor[T <: AnyRef](implicit classTag: ClassTag[T]): Option[AnyRef] = {
- TypedActorModule.ensureEnabled
- def predicate(proxy: AnyRef): Boolean = {
- val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
- actorRef.isDefined && classTag.erasure.isAssignableFrom(actorRef.get.actor.getClass)
- }
- findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a })
- }
-
- /**
- * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
- */
- def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- def predicate(proxy: AnyRef): Boolean = {
- val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
- actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
- }
- filterTypedActors(predicate)
- }
-
- /**
- * Finds all typed actors that have a specific id.
- */
- def typedActorsFor(id: String): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- val actorRefs = actorsById values id
- actorRefs.flatMap(typedActorFor(_))
- }
-
- /**
- * Finds the typed actor that has a specific UUID.
- */
- def typedActorFor(uuid: Uuid): Option[AnyRef] = {
- TypedActorModule.ensureEnabled
- val actorRef = actorsByUUID get uuid
- if (actorRef eq null) None
- else typedActorFor(actorRef)
- }
-
- /**
- * Get the typed actor proxy for a given typed actor ref.
- */
- private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = {
- TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
- }
-
- /**
- * Registers an actor in the ActorRegistry.
- */
- private[akka] def register(actor: ActorRef) {
- val id = actor.id
- val uuid = actor.uuid
-
- actorsById.put(id, actor)
- actorsByUUID.put(uuid, actor)
-
- // notify listeners
- notifyListeners(ActorRegistered(actor))
- }
-
- /**
- * Unregisters an actor in the ActorRegistry.
- */
- private[akka] def unregister(actor: ActorRef) {
- val id = actor.id
- val uuid = actor.uuid
-
- actorsByUUID remove uuid
- actorsById.remove(id, actor)
-
- // notify listeners
- notifyListeners(ActorUnregistered(actor))
- }
-
- /**
- * Shuts down and unregisters all actors in the system.
- */
- def shutdownAll() {
- if (TypedActorModule.isEnabled) {
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val actorRef = elements.nextElement
- val proxy = typedActorFor(actorRef)
- if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
- else actorRef.stop()
- }
- } else foreach(_.stop())
- if (Remote.isEnabled) {
- Actor.remote.clear //TODO: REVISIT: Should this be here?
- }
- actorsByUUID.clear
- actorsById.clear
- }
-}
-
-/**
- * An implementation of a ConcurrentMultiMap
- * Adds/remove is serialized over the specified key
- * Reads are fully concurrent <-- el-cheapo
- *
- * @author Viktor Klang
- */
-class Index[K <: AnyRef, V <: AnyRef: ArrayTag] {
- private val Naught = Array[V]() //Nil for Arrays
- private val container = new ConcurrentHashMap[K, JSet[V]]
- private val emptySet = new ConcurrentSkipListSet[V]
-
- /**
- * Associates the value of type V with the key of type K
- * @return true if the value didn't exist for the key previously, and false otherwise
- */
- def put(key: K, value: V): Boolean = {
- //Tailrecursive spin-locking put
- @tailrec
- def spinPut(k: K, v: V): Boolean = {
- var retry = false
- var added = false
- val set = container get k
-
- if (set ne null) {
- set.synchronized {
- if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
- else { //Else add the value to the set and signal that retry is not needed
- added = set add v
- retry = false
- }
- }
- } else {
- val newSet = new ConcurrentSkipListSet[V]
- newSet add v
-
- // Parry for two simultaneous putIfAbsent(id,newSet)
- val oldSet = container.putIfAbsent(k, newSet)
- if (oldSet ne null) {
- oldSet.synchronized {
- if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
- else { //Else try to add the value to the set and signal that retry is not needed
- added = oldSet add v
- retry = false
- }
- }
- } else added = true
- }
-
- if (retry) spinPut(k, v)
- else added
- }
-
- spinPut(key, value)
- }
-
- /**
- * @return a _new_ array of all existing values for the given key at the time of the call
- */
- def values(key: K): Array[V] = {
- val set: JSet[V] = container get key
- val result = if (set ne null) set toArray Naught else Naught
- result.asInstanceOf[Array[V]]
- }
-
- /**
- * @return Some(value) for the first matching value where the supplied function returns true for the given key,
- * if no matches it returns None
- */
- def findValue(key: K)(f: (V) => Boolean): Option[V] = {
- import scala.collection.JavaConversions._
- val set = container get key
- if (set ne null) set.iterator.find(f)
- else None
- }
-
- /**
- * Applies the supplied function to all keys and their values
- */
- def foreach(fun: (K, V) => Unit) {
- import scala.collection.JavaConversions._
- container.entrySet foreach { (e) =>
- e.getValue.foreach(fun(e.getKey, _))
- }
- }
-
- /**
- * Disassociates the value of type V from the key of type K
- * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key
- */
- def remove(key: K, value: V): Boolean = {
- val set = container get key
-
- if (set ne null) {
- set.synchronized {
- if (set.remove(value)) { //If we can remove the value
- if (set.isEmpty) //and the set becomes empty
- container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set
-
- true //Remove succeeded
- } else false //Remove failed
- }
- } else false //Remove failed
- }
-
- /**
- * @return true if the underlying containers is empty, may report false negatives when the last remove is underway
- */
- def isEmpty: Boolean = container.isEmpty
-
- /**
- * Removes all keys and all values
- */
- def clear = foreach { case (k, v) => remove(k, v) }
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/Actors.java b/test/disabled/presentation/akka/src/akka/actor/Actors.java
deleted file mode 100644
index a5ec9f37dc..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/Actors.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package akka.actor;
-
-import akka.japi.Creator;
-import akka.remoteinterface.RemoteSupport;
-
-/**
- * JAVA API for
- * - creating actors,
- * - creating remote actors,
- * - locating actors
- */
-public class Actors {
- /**
- *
- * @return The actor registry
- */
- public static ActorRegistry registry() {
- return Actor$.MODULE$.registry();
- }
-
- /**
- *
- * @return
- * @throws UnsupportedOperationException If remoting isn't configured
- * @throws ModuleNotAvailableException If the class for the remote support cannot be loaded
- */
- public static RemoteSupport remote() {
- return Actor$.MODULE$.remote();
- }
-
- /**
- * NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
- * UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
- * <p/>
- * Creates an ActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor.
- * Only use this method when you need to pass in constructor arguments into the 'UntypedActor'.
- * <p/>
- * You use it by implementing the UntypedActorFactory interface.
- * Example in Java:
- * <pre>
- * ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
- * public UntypedActor create() {
- * return new MyUntypedActor("service:name", 5);
- * }
- * });
- * actor.start();
- * actor.sendOneWay(message, context);
- * actor.stop();
- * </pre>
- */
- public static ActorRef actorOf(final Creator<Actor> factory) {
- return Actor$.MODULE$.actorOf(factory);
- }
-
- /**
- * Creates an ActorRef out of the Actor type represented by the class provided.
- * Example in Java:
- * <pre>
- * ActorRef actor = Actors.actorOf(MyUntypedActor.class);
- * actor.start();
- * actor.sendOneWay(message, context);
- * actor.stop();
- * </pre>
- * You can create and start the actor in one statement like this:
- * <pre>
- * val actor = Actors.actorOf(MyActor.class).start();
- * </pre>
- */
- public static ActorRef actorOf(final Class<? extends Actor> type) {
- return Actor$.MODULE$.actorOf(type);
- }
-
- /**
- * The message that is sent when an Actor gets a receive timeout.
- * <pre>
- * if( message == receiveTimeout() ) {
- * //Timed out
- * }
- * </pre>
- * @return the single instance of ReceiveTimeout
- */
- public final static ReceiveTimeout$ receiveTimeout() {
- return ReceiveTimeout$.MODULE$;
- }
-
- /**
- * The message that when sent to an Actor kills it by throwing an exception.
- * <pre>
- * actor.sendOneWay(kill());
- * </pre>
- * @return the single instance of Kill
- */
- public final static Kill$ kill() {
- return Kill$.MODULE$;
- }
-
-
- /**
- * The message that when sent to an Actor shuts it down by calling 'stop'.
- * <pre>
- * actor.sendOneWay(poisonPill());
- * </pre>
- * @return the single instance of PoisonPill
- */
- public final static PoisonPill$ poisonPill() {
- return PoisonPill$.MODULE$;
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala b/test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala
deleted file mode 100644
index a54fca9ac7..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/BootableActorLoaderService.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.actor
-
-import java.io.File
-import java.net.{ URL, URLClassLoader }
-import java.util.jar.JarFile
-
-import akka.util.{ Bootable }
-import akka.config.Config._
-
-/**
- * Handles all modules in the deploy directory (load and unload)
- */
-trait BootableActorLoaderService extends Bootable {
-
- val BOOT_CLASSES = config.getList("akka.boot")
- lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader
-
- protected def createApplicationClassLoader: Option[ClassLoader] = Some({
- if (HOME.isDefined) {
- val DEPLOY = HOME.get + "/deploy"
- val DEPLOY_DIR = new File(DEPLOY)
- if (!DEPLOY_DIR.exists) {
- System.exit(-1)
- }
- val filesToDeploy = DEPLOY_DIR.listFiles.toArray.toList
- .asInstanceOf[List[File]].filter(_.getName.endsWith(".jar"))
- var dependencyJars: List[URL] = Nil
- filesToDeploy.map { file =>
- val jarFile = new JarFile(file)
- val en = jarFile.entries
- while (en.hasMoreElements) {
- val name = en.nextElement.getName
- if (name.endsWith(".jar")) dependencyJars ::= new File(
- String.format("jar:file:%s!/%s", jarFile.getName, name)).toURI.toURL
- }
- }
- val toDeploy = filesToDeploy.map(_.toURI.toURL)
- val allJars = toDeploy ::: dependencyJars
-
- new URLClassLoader(allJars.toArray, Thread.currentThread.getContextClassLoader)
- } else Thread.currentThread.getContextClassLoader
- })
-
- abstract override def onLoad = {
- super.onLoad
-
- for (loader ← applicationLoader; clazz ← BOOT_CLASSES) {
- loader.loadClass(clazz).newInstance
- }
- }
-
- abstract override def onUnload = {
- super.onUnload
- Actor.registry.shutdownAll()
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/FSM.scala b/test/disabled/presentation/akka/src/akka/actor/FSM.scala
deleted file mode 100644
index d9cd9a9ca2..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/FSM.scala
+++ /dev/null
@@ -1,527 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-package akka.actor
-
-import akka.util._
-import akka.event.EventHandler
-
-import scala.collection.mutable
-import java.util.concurrent.ScheduledFuture
-
-object FSM {
-
- object NullFunction extends PartialFunction[Any, Nothing] {
- def isDefinedAt(o: Any) = false
- def apply(o: Any) = sys.error("undefined")
- }
-
- case class CurrentState[S](fsmRef: ActorRef, state: S)
- case class Transition[S](fsmRef: ActorRef, from: S, to: S)
- case class SubscribeTransitionCallBack(actorRef: ActorRef)
- case class UnsubscribeTransitionCallBack(actorRef: ActorRef)
-
- sealed trait Reason
- case object Normal extends Reason
- case object Shutdown extends Reason
- case class Failure(cause: Any) extends Reason
-
- case object StateTimeout
- case class TimeoutMarker(generation: Long)
-
- case class Timer(name: String, msg: AnyRef, repeat: Boolean, generation: Int) {
- private var ref: Option[ScheduledFuture[AnyRef]] = _
-
- def schedule(actor: ActorRef, timeout: Duration) {
- if (repeat) {
- ref = Some(Scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit))
- } else {
- ref = Some(Scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit))
- }
- }
-
- def cancel {
- if (ref.isDefined) {
- ref.get.cancel(true)
- ref = None
- }
- }
- }
-
- /*
- * This extractor is just convenience for matching a (S, S) pair, including a
- * reminder what the new state is.
- */
- object -> {
- def unapply[S](in: (S, S)) = Some(in)
- }
-
- /*
- * With these implicits in scope, you can write "5 seconds" anywhere a
- * Duration or Option[Duration] is expected. This is conveniently true
- * for derived classes.
- */
- implicit def d2od(d: Duration): Option[Duration] = Some(d)
-}
-
-/**
- * Finite State Machine actor trait. Use as follows:
- *
- * <pre>
- * object A {
- * trait State
- * case class One extends State
- * case class Two extends State
- *
- * case class Data(i : Int)
- * }
- *
- * class A extends Actor with FSM[A.State, A.Data] {
- * import A._
- *
- * startWith(One, Data(42))
- * when(One) {
- * case Event(SomeMsg, Data(x)) => ...
- * case Ev(SomeMsg) => ... // convenience when data not needed
- * }
- * when(Two, stateTimeout = 5 seconds) { ... }
- * initialize
- * }
- * </pre>
- *
- * Within the partial function the following values are returned for effecting
- * state transitions:
- *
- * - <code>stay</code> for staying in the same state
- * - <code>stay using Data(...)</code> for staying in the same state, but with
- * different data
- * - <code>stay forMax 5.millis</code> for staying with a state timeout; can be
- * combined with <code>using</code>
- * - <code>goto(...)</code> for changing into a different state; also supports
- * <code>using</code> and <code>forMax</code>
- * - <code>stop</code> for terminating this FSM actor
- *
- * Each of the above also supports the method <code>replying(AnyRef)</code> for
- * sending a reply before changing state.
- *
- * While changing state, custom handlers may be invoked which are registered
- * using <code>onTransition</code>. This is meant to enable concentrating
- * different concerns in different places; you may choose to use
- * <code>when</code> for describing the properties of a state, including of
- * course initiating transitions, but you can describe the transitions using
- * <code>onTransition</code> to avoid having to duplicate that code among
- * multiple paths which lead to a transition:
- *
- * <pre>
- * onTransition {
- * case Active -&gt; _ =&gt; cancelTimer("activeTimer")
- * }
- * </pre>
- *
- * Multiple such blocks are supported and all of them will be called, not only
- * the first matching one.
- *
- * Another feature is that other actors may subscribe for transition events by
- * sending a <code>SubscribeTransitionCallback</code> message to this actor;
- * use <code>UnsubscribeTransitionCallback</code> before stopping the other
- * actor.
- *
- * State timeouts set an upper bound to the time which may pass before another
- * message is received in the current state. If no external message is
- * available, then upon expiry of the timeout a StateTimeout message is sent.
- * Note that this message will only be received in the state for which the
- * timeout was set and that any message received will cancel the timeout
- * (possibly to be started again by the next transition).
- *
- * Another feature is the ability to install and cancel single-shot as well as
- * repeated timers which arrange for the sending of a user-specified message:
- *
- * <pre>
- * setTimer("tock", TockMsg, 1 second, true) // repeating
- * setTimer("lifetime", TerminateMsg, 1 hour, false) // single-shot
- * cancelTimer("tock")
- * timerActive_? ("tock")
- * </pre>
- */
-trait FSM[S, D] extends ListenerManagement {
- this: Actor =>
-
- import FSM._
-
- type StateFunction = scala.PartialFunction[Event[D], State]
- type Timeout = Option[Duration]
- type TransitionHandler = PartialFunction[(S, S), Unit]
-
- /**
- * ****************************************
- * DSL
- * ****************************************
- */
-
- /**
- * Insert a new StateFunction at the end of the processing chain for the
- * given state. If the stateTimeout parameter is set, entering this state
- * without a differing explicit timeout setting will trigger a StateTimeout
- * event; the same is true when using #stay.
- *
- * @param stateName designator for the state
- * @param stateTimeout default state timeout for this state
- * @param stateFunction partial function describing response to input
- */
- protected final def when(stateName: S, stateTimeout: Timeout = None)(stateFunction: StateFunction) = {
- register(stateName, stateFunction, stateTimeout)
- }
-
- /**
- * Set initial state. Call this method from the constructor before the #initialize method.
- *
- * @param stateName initial state designator
- * @param stateData initial state data
- * @param timeout state timeout for the initial state, overriding the default timeout for that state
- */
- protected final def startWith(stateName: S,
- stateData: D,
- timeout: Timeout = None) = {
- currentState = State(stateName, stateData, timeout)
- }
-
- /**
- * Produce transition to other state. Return this from a state function in
- * order to effect the transition.
- *
- * @param nextStateName state designator for the next state
- * @return state transition descriptor
- */
- protected final def goto(nextStateName: S): State = {
- State(nextStateName, currentState.stateData)
- }
-
- /**
- * Produce "empty" transition descriptor. Return this from a state function
- * when no state change is to be effected.
- *
- * @return descriptor for staying in current state
- */
- protected final def stay(): State = {
- // cannot directly use currentState because of the timeout field
- goto(currentState.stateName)
- }
-
- /**
- * Produce change descriptor to stop this FSM actor with reason "Normal".
- */
- protected final def stop(): State = {
- stop(Normal)
- }
-
- /**
- * Produce change descriptor to stop this FSM actor including specified reason.
- */
- protected final def stop(reason: Reason): State = {
- stop(reason, currentState.stateData)
- }
-
- /**
- * Produce change descriptor to stop this FSM actor including specified reason.
- */
- protected final def stop(reason: Reason, stateData: D): State = {
- stay using stateData withStopReason (reason)
- }
-
- /**
- * Schedule named timer to deliver message after given delay, possibly repeating.
- * @param name identifier to be used with cancelTimer()
- * @param msg message to be delivered
- * @param timeout delay of first message delivery and between subsequent messages
- * @param repeat send once if false, scheduleAtFixedRate if true
- * @return current state descriptor
- */
- protected final def setTimer(name: String, msg: AnyRef, timeout: Duration, repeat: Boolean): State = {
- if (timers contains name) {
- timers(name).cancel
- }
- val timer = Timer(name, msg, repeat, timerGen.next)
- timer.schedule(self, timeout)
- timers(name) = timer
- stay
- }
-
- /**
- * Cancel named timer, ensuring that the message is not subsequently delivered (no race).
- * @param name of the timer to cancel
- */
- protected final def cancelTimer(name: String) = {
- if (timers contains name) {
- timers(name).cancel
- timers -= name
- }
- }
-
- /**
- * Inquire whether the named timer is still active. Returns true unless the
- * timer does not exist, has previously been canceled or if it was a
- * single-shot timer whose message was already received.
- */
- protected final def timerActive_?(name: String) = timers contains name
-
- /**
- * Set state timeout explicitly. This method can safely be used from within a
- * state handler.
- */
- protected final def setStateTimeout(state: S, timeout: Timeout) {
- stateTimeouts(state) = timeout
- }
-
- /**
- * Set handler which is called upon each state transition, i.e. not when
- * staying in the same state. This may use the pair extractor defined in the
- * FSM companion object like so:
- *
- * <pre>
- * onTransition {
- * case Old -&gt; New =&gt; doSomething
- * }
- * </pre>
- *
- * It is also possible to supply a 2-ary function object:
- *
- * <pre>
- * onTransition(handler _)
- *
- * private def handler(from: S, to: S) { ... }
- * </pre>
- *
- * The underscore is unfortunately necessary to enable the nicer syntax shown
- * above (it uses the implicit conversion total2pf under the hood).
- *
- * <b>Multiple handlers may be installed, and every one of them will be
- * called, not only the first one matching.</b>
- */
- protected final def onTransition(transitionHandler: TransitionHandler) {
- transitionEvent :+= transitionHandler
- }
-
- /**
- * Convenience wrapper for using a total function instead of a partial
- * function literal. To be used with onTransition.
- */
- implicit protected final def total2pf(transitionHandler: (S, S) => Unit) =
- new PartialFunction[(S, S), Unit] {
- def isDefinedAt(in: (S, S)) = true
- def apply(in: (S, S)) { transitionHandler(in._1, in._2) }
- }
-
- /**
- * Set handler which is called upon termination of this FSM actor.
- */
- protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]) = {
- terminateEvent = terminationHandler
- }
-
- /**
- * Set handler which is called upon reception of unhandled messages.
- */
- protected final def whenUnhandled(stateFunction: StateFunction) = {
- handleEvent = stateFunction orElse handleEventDefault
- }
-
- /**
- * Verify existence of initial state and setup timers. This should be the
- * last call within the constructor.
- */
- def initialize {
- makeTransition(currentState)
- }
-
- /**
- * ****************************************************************
- * PRIVATE IMPLEMENTATION DETAILS
- * ****************************************************************
- */
-
- /*
- * FSM State data and current timeout handling
- */
- private var currentState: State = _
- private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
- private var generation: Long = 0L
-
- /*
- * Timer handling
- */
- private val timers = mutable.Map[String, Timer]()
- private val timerGen = Iterator from 0
-
- /*
- * State definitions
- */
- private val stateFunctions = mutable.Map[S, StateFunction]()
- private val stateTimeouts = mutable.Map[S, Timeout]()
-
- private def register(name: S, function: StateFunction, timeout: Timeout) {
- if (stateFunctions contains name) {
- stateFunctions(name) = stateFunctions(name) orElse function
- stateTimeouts(name) = timeout orElse stateTimeouts(name)
- } else {
- stateFunctions(name) = function
- stateTimeouts(name) = timeout
- }
- }
-
- /*
- * unhandled event handler
- */
- private val handleEventDefault: StateFunction = {
- case Event(value, stateData) =>
- stay
- }
- private var handleEvent: StateFunction = handleEventDefault
-
- /*
- * termination handling
- */
- private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = {
- case StopEvent(Failure(cause), _, _) =>
- case StopEvent(reason, _, _) =>
- }
-
- /*
- * transition handling
- */
- private var transitionEvent: List[TransitionHandler] = Nil
- private def handleTransition(prev: S, next: S) {
- val tuple = (prev, next)
- for (te ← transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) }
- }
-
- // ListenerManagement shall not start() or stop() listener actors
- override protected val manageLifeCycleOfListeners = false
-
- /**
- * *******************************************
- * Main actor receive() method
- * *******************************************
- */
- override final protected def receive: Receive = {
- case TimeoutMarker(gen) =>
- if (generation == gen) {
- processEvent(StateTimeout)
- }
- case t@Timer(name, msg, repeat, generation) =>
- if ((timers contains name) && (timers(name).generation == generation)) {
- processEvent(msg)
- if (!repeat) {
- timers -= name
- }
- }
- case SubscribeTransitionCallBack(actorRef) =>
- addListener(actorRef)
- // send current state back as reference point
- try {
- actorRef ! CurrentState(self, currentState.stateName)
- } catch {
- case e: ActorInitializationException =>
- EventHandler.warning(this, "trying to register not running listener")
- }
- case UnsubscribeTransitionCallBack(actorRef) =>
- removeListener(actorRef)
- case value => {
- if (timeoutFuture.isDefined) {
- timeoutFuture.get.cancel(true)
- timeoutFuture = None
- }
- generation += 1
- processEvent(value)
- }
- }
-
- private def processEvent(value: Any) = {
- val event = Event(value, currentState.stateData)
- val stateFunc = stateFunctions(currentState.stateName)
- val nextState = if (stateFunc isDefinedAt event) {
- stateFunc(event)
- } else {
- // handleEventDefault ensures that this is always defined
- handleEvent(event)
- }
- nextState.stopReason match {
- case Some(reason) => terminate(reason)
- case None => makeTransition(nextState)
- }
- }
-
- private def makeTransition(nextState: State) = {
- if (!stateFunctions.contains(nextState.stateName)) {
- terminate(Failure("Next state %s does not exist".format(nextState.stateName)))
- } else {
- if (currentState.stateName != nextState.stateName) {
- handleTransition(currentState.stateName, nextState.stateName)
- notifyListeners(Transition(self, currentState.stateName, nextState.stateName))
- }
- applyState(nextState)
- }
- }
-
- private def applyState(nextState: State) = {
- currentState = nextState
- val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName)
- if (timeout.isDefined) {
- val t = timeout.get
- if (t.finite_? && t.length >= 0) {
- timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit))
- }
- }
- }
-
- private def terminate(reason: Reason) = {
- terminateEvent.apply(StopEvent(reason, currentState.stateName, currentState.stateData))
- self.stop()
- }
-
- case class Event[D](event: Any, stateData: D)
- object Ev {
- def unapply[D](e: Event[D]): Option[Any] = Some(e.event)
- }
-
- case class State(stateName: S, stateData: D, timeout: Timeout = None) {
-
- /**
- * Modify state transition descriptor to include a state timeout for the
- * next state. This timeout overrides any default timeout set for the next
- * state.
- */
- def forMax(timeout: Duration): State = {
- copy(timeout = Some(timeout))
- }
-
- /**
- * Send reply to sender of the current message, if available.
- *
- * @return this state transition descriptor
- */
- def replying(replyValue: Any): State = {
- self.sender match {
- case Some(sender) => sender ! replyValue
- case None =>
- }
- this
- }
-
- /**
- * Modify state transition descriptor with new state data. The data will be
- * set when transitioning to the new state.
- */
- def using(nextStateDate: D): State = {
- copy(stateData = nextStateDate)
- }
-
- private[akka] var stopReason: Option[Reason] = None
-
- private[akka] def withStopReason(reason: Reason): State = {
- stopReason = Some(reason)
- this
- }
- }
-
- case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D)
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/Scheduler.scala b/test/disabled/presentation/akka/src/akka/actor/Scheduler.scala
deleted file mode 100644
index 128584f3c5..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/Scheduler.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2007 WorldWide Conferencing, LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Rework of David Pollak's ActorPing class in the Lift Project
- * which is licensed under the Apache 2 License.
- */
-package akka.actor
-
-import scala.collection.JavaConversions
-
-import java.util.concurrent._
-
-import akka.event.EventHandler
-import akka.AkkaException
-
-object Scheduler {
- import Actor._
-
- case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
-
- @volatile
- private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
-
- /**
- * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay
- */
- def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
- try {
- service.scheduleAtFixedRate(
- new Runnable { def run = receiver ! message },
- initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
- } catch {
- case e: Exception =>
- val error = SchedulerException(message + " could not be scheduled on " + receiver, e)
- EventHandler.error(error, this, "%s @ %s".format(receiver, message))
- throw error
- }
- }
-
- /**
- * Schedules to run specified function to the receiver after initialDelay and then repeated after delay,
- * avoid blocking operations since this is executed in the schedulers thread
- */
- def schedule(f: () => Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] =
- schedule(new Runnable { def run = f() }, initialDelay, delay, timeUnit)
-
- /**
- * Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay,
- * avoid blocking operations since this is executed in the schedulers thread
- */
- def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
- try {
- service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
- } catch {
- case e: Exception =>
- val error = SchedulerException("Failed to schedule a Runnable", e)
- EventHandler.error(error, this, error.getMessage)
- throw error
- }
- }
-
- /**
- * Schedules to send the specified message to the receiver after delay
- */
- def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
- try {
- service.schedule(
- new Runnable { def run = receiver ! message },
- delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
- } catch {
- case e: Exception =>
- val error = SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e)
- EventHandler.error(e, this, receiver + " @ " + message)
- throw error
- }
- }
-
- /**
- * Schedules a function to be run after delay,
- * avoid blocking operations since the runnable is executed in the schedulers thread
- */
- def scheduleOnce(f: () => Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] =
- scheduleOnce(new Runnable { def run = f() }, delay, timeUnit)
-
- /**
- * Schedules a runnable to be run after delay,
- * avoid blocking operations since the runnable is executed in the schedulers thread
- */
- def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
- try {
- service.schedule(runnable, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
- } catch {
- case e: Exception =>
- val error = SchedulerException("Failed to scheduleOnce a Runnable", e)
- EventHandler.error(e, this, error.getMessage)
- throw error
- }
- }
-
- def shutdown() {
- synchronized {
- service.shutdown()
- }
- }
-
- def restart() {
- synchronized {
- shutdown()
- service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
- }
- }
-}
-
-private object SchedulerThreadFactory extends ThreadFactory {
- private var count = 0
- val threadFactory = Executors.defaultThreadFactory()
-
- def newThread(r: Runnable): Thread = {
- val thread = threadFactory.newThread(r)
- thread.setName("akka:scheduler-" + count)
- thread.setDaemon(true)
- thread
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/Supervisor.scala b/test/disabled/presentation/akka/src/akka/actor/Supervisor.scala
deleted file mode 100644
index bec3c83f1a..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/Supervisor.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.actor
-
-import akka.AkkaException
-import akka.util._
-import ReflectiveAccess._
-import Actor._
-
-import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap }
-import java.net.InetSocketAddress
-import akka.config.Supervision._
-
-class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-
-/**
- * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class.
- * These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor'
- * factory object.
- * <p/>
- *
- * Here is a sample on how to use it:
- * <pre>
- * val supervisor = Supervisor(
- * SupervisorConfig(
- * RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
- * Supervise(
- * myFirstActor,
- * Permanent) ::
- * Supervise(
- * mySecondActor,
- * Permanent) ::
- * Nil))
- * </pre>
- *
- * You dynamically link and unlink child children using the 'link' and 'unlink' methods.
- * <pre>
- * supervisor.link(child)
- * supervisor.unlink(child)
- * </pre>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-object Supervisor {
- def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start
-}
-
-/**
- * Use this factory instead of the Supervisor factory object if you want to control
- * instantiation and starting of the Supervisor, if not then it is easier and better
- * to use the Supervisor factory object.
- * <p>
- * Example usage:
- * <pre>
- * val factory = SupervisorFactory(
- * SupervisorConfig(
- * RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
- * Supervise(
- * myFirstActor,
- * Permanent) ::
- * Supervise(
- * mySecondActor,
- * Permanent) ::
- * Nil))
- * </pre>
- *
- * Then create a new Supervisor tree with the concrete Services we have defined.
- *
- * <pre>
- * val supervisor = factory.newInstance
- * supervisor.start // start up all managed servers
- * </pre>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-case class SupervisorFactory(val config: SupervisorConfig) {
-
- def newInstance: Supervisor = newInstanceFor(config)
-
- def newInstanceFor(config: SupervisorConfig): Supervisor = {
- val supervisor = new Supervisor(config.restartStrategy, config.maxRestartsHandler)
- supervisor.configure(config)
- supervisor.start
- supervisor
- }
-}
-
-/**
- * <b>NOTE:</b>
- * <p/>
- * The supervisor class is only used for the configuration system when configuring supervisor
- * hierarchies declaratively. Should not be used as part of the regular programming API. Instead
- * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the
- * children that should trap error signals and trigger restart.
- * <p/>
- * See the Scaladoc for the SupervisorFactory for an example on how to declaratively wire up children.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit) {
- import Supervisor._
-
- private val _childActors = new ConcurrentHashMap[String, List[ActorRef]]
- private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
-
- private[akka] val supervisor = actorOf(new SupervisorActor(handler, maxRestartsHandler)).start()
-
- def uuid = supervisor.uuid
-
- def start: Supervisor = {
- this
- }
-
- def shutdown(): Unit = supervisor.stop()
-
- def link(child: ActorRef) = supervisor.link(child)
-
- def unlink(child: ActorRef) = supervisor.unlink(child)
-
- def children: List[ActorRef] =
- _childActors.values.toArray.toList.asInstanceOf[List[List[ActorRef]]].flatten
-
- def childSupervisors: List[Supervisor] =
- _childActors.values.toArray.toList.asInstanceOf[List[Supervisor]]
-
- def configure(config: SupervisorConfig): Unit = config match {
- case SupervisorConfig(_, servers, _) =>
-
- servers.map(server =>
- server match {
- case Supervise(actorRef, lifeCycle, registerAsRemoteService) =>
- actorRef.start()
- val className = actorRef.actor.getClass.getName
- val currentActors = {
- val list = _childActors.get(className)
- if (list eq null) List[ActorRef]()
- else list
- }
- _childActors.put(className, actorRef :: currentActors)
- actorRef.lifeCycle = lifeCycle
- supervisor.link(actorRef)
- if (registerAsRemoteService)
- Actor.remote.register(actorRef)
- case supervisorConfig@SupervisorConfig(_, _, _) => // recursive supervisor configuration
- val childSupervisor = Supervisor(supervisorConfig)
- supervisor.link(childSupervisor.supervisor)
- _childSupervisors.add(childSupervisor)
- })
- }
-}
-
-/**
- * For internal use only.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-final class SupervisorActor private[akka] (handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit) extends Actor {
- self.faultHandler = handler
-
- override def postStop(): Unit = {
- val i = self.linkedActors.values.iterator
- while (i.hasNext) {
- val ref = i.next
- ref.stop()
- self.unlink(ref)
- }
- }
-
- def receive = {
- case max@MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) => maxRestartsHandler(self, max)
- case unknown => throw new SupervisorException(
- "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]")
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala b/test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala
deleted file mode 100644
index cbc43f22f8..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/UntypedActor.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.actor
-
-import akka.japi.{ Creator, Procedure }
-
-/**
- * Subclass this abstract class to create a MDB-style untyped actor.
- * <p/>
- * This class is meant to be used from Java.
- * <p/>
- * Here is an example on how to create and use an UntypedActor:
- * <pre>
- * public class SampleUntypedActor extends UntypedActor {
- * public void onReceive(Object message) throws Exception {
- * if (message instanceof String) {
- * String msg = (String)message;
- *
- * if (msg.equals("UseReply")) {
- * // Reply to original sender of message using the 'replyUnsafe' method
- * getContext().replyUnsafe(msg + ":" + getContext().getUuid());
- *
- * } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
- * // Reply to original sender of message using the sender reference
- * // also passing along my own reference (the context)
- * getContext().getSender().get().sendOneWay(msg, context);
- *
- * } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
- * // Reply to original sender of message using the sender future reference
- * getContext().getSenderFuture().get().completeWithResult(msg);
- *
- * } else if (msg.equals("SendToSelf")) {
- * // Send message to the actor itself recursively
- * getContext().sendOneWay(msg)
- *
- * } else if (msg.equals("ForwardMessage")) {
- * // Retrieve an actor from the ActorRegistry by ID and get an ActorRef back
- * ActorRef actorRef = Actor.registry.actorsFor("some-actor-id").head();
- *
- * } else throw new IllegalArgumentException("Unknown message: " + message);
- * } else throw new IllegalArgumentException("Unknown message: " + message);
- * }
- *
- * public static void main(String[] args) {
- * ActorRef actor = Actors.actorOf(SampleUntypedActor.class);
- * actor.start();
- * actor.sendOneWay("SendToSelf");
- * actor.stop();
- * }
- * }
- * </pre>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-abstract class UntypedActor extends Actor {
-
- /**
- * To be implemented by concrete UntypedActor. Defines the message handler.
- */
- @throws(classOf[Exception])
- def onReceive(message: Any): Unit
-
- /**
- * Returns the 'self' reference with the API.
- */
- def getContext(): ActorRef = self
-
- /**
- * Returns the 'self' reference with the API.
- */
- def context(): ActorRef = self
-
- /**
- * Java API for become
- */
- def become(behavior: Procedure[Any]): Unit = become(behavior, false)
-
- /*
- * Java API for become with optional discardOld
- */
- def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
- super.become({ case msg => behavior.apply(msg) }, discardOld)
-
- /**
- * User overridable callback.
- * <p/>
- * Is called when an Actor is started by invoking 'actor.start()'.
- */
- override def preStart() {}
-
- /**
- * User overridable callback.
- * <p/>
- * Is called when 'actor.stop()' is invoked.
- */
- override def postStop() {}
-
- /**
- * User overridable callback.
- * <p/>
- * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
- */
- override def preRestart(reason: Throwable) {}
-
- /**
- * User overridable callback.
- * <p/>
- * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
- */
- override def postRestart(reason: Throwable) {}
-
- /**
- * User overridable callback.
- * <p/>
- * Is called when a message isn't handled by the current behavior of the actor
- * by default it throws an UnhandledMessageException
- */
- override def unhandled(msg: Any) {
- throw new UnhandledMessageException(msg, self)
- }
-
- final protected def receive = {
- case msg => onReceive(msg)
- }
-}
-
-/**
- * Factory closure for an UntypedActor, to be used with 'Actors.actorOf(factory)'.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-trait UntypedActorFactory extends Creator[Actor]
diff --git a/test/disabled/presentation/akka/src/akka/actor/package.scala b/test/disabled/presentation/akka/src/akka/actor/package.scala
deleted file mode 100644
index fbeeed49cb..0000000000
--- a/test/disabled/presentation/akka/src/akka/actor/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka
-
-import actor.{ ScalaActorRef, ActorRef }
-
-package object actor {
- implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef =
- ref.asInstanceOf[ScalaActorRef]
-
- implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef =
- ref.asInstanceOf[ActorRef]
-
- type Uuid = com.eaio.uuid.UUID
-
- def newUuid(): Uuid = new Uuid()
-
- def uuidFrom(time: Long, clockSeqAndNode: Long): Uuid = new Uuid(time, clockSeqAndNode)
-
- def uuidFrom(uuid: String): Uuid = new Uuid(uuid)
-}