diff options
Diffstat (limited to 'test/files/presentation/akka/src/akka/remoteinterface/RemoteInterface.scala')
-rw-r--r-- | test/files/presentation/akka/src/akka/remoteinterface/RemoteInterface.scala | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/test/files/presentation/akka/src/akka/remoteinterface/RemoteInterface.scala b/test/files/presentation/akka/src/akka/remoteinterface/RemoteInterface.scala new file mode 100644 index 0000000000..6366a4158c --- /dev/null +++ b/test/files/presentation/akka/src/akka/remoteinterface/RemoteInterface.scala @@ -0,0 +1,493 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.remoteinterface + +import akka.japi.Creator +import akka.actor._ +import akka.util._ +import akka.dispatch.CompletableFuture +import akka.AkkaException + +import scala.reflect.BeanProperty + +import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentHashMap +import java.io.{ PrintWriter, PrintStream } +import java.lang.reflect.InvocationTargetException + +trait RemoteModule { + val UUID_PREFIX = "uuid:".intern + + def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope + protected[akka] def notifyListeners(message: => Any): Unit + + private[akka] def actors: ConcurrentHashMap[String, ActorRef] + private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] + private[akka] def actorsFactories: ConcurrentHashMap[String, () => ActorRef] + private[akka] def typedActors: ConcurrentHashMap[String, AnyRef] + private[akka] def typedActorsByUuid: ConcurrentHashMap[String, AnyRef] + private[akka] def typedActorsFactories: ConcurrentHashMap[String, () => AnyRef] + + /** Lookup methods **/ + + private[akka] def findActorById(id: String): ActorRef = actors.get(id) + + private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid) + + private[akka] def findActorFactory(id: String): () => ActorRef = actorsFactories.get(id) + + private[akka] def findTypedActorById(id: String): AnyRef = typedActors.get(id) + + private[akka] def findTypedActorFactory(id: String): () => AnyRef = typedActorsFactories.get(id) + + private[akka] def findTypedActorByUuid(uuid: String): AnyRef = typedActorsByUuid.get(uuid) + + private[akka] def findActorByIdOrUuid(id: String, uuid: String): ActorRef = { + var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findActorByUuid(id.substring(UUID_PREFIX.length)) + else findActorById(id) + if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) + actorRefOrNull + } + + private[akka] def findTypedActorByIdOrUuid(id: String, uuid: String): AnyRef = { + var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) findTypedActorByUuid(id.substring(UUID_PREFIX.length)) + else findTypedActorById(id) + if (actorRefOrNull eq null) actorRefOrNull = findTypedActorByUuid(uuid) + actorRefOrNull + } +} + +/** + * Life-cycle events for RemoteClient. + */ +sealed trait RemoteClientLifeCycleEvent +case class RemoteClientError( + @BeanProperty cause: Throwable, + @BeanProperty client: RemoteClientModule, + @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected( + @BeanProperty client: RemoteClientModule, + @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected( + @BeanProperty client: RemoteClientModule, + @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientStarted( + @BeanProperty client: RemoteClientModule, + @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientShutdown( + @BeanProperty client: RemoteClientModule, + @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent +case class RemoteClientWriteFailed( + @BeanProperty request: AnyRef, + @BeanProperty cause: Throwable, + @BeanProperty client: RemoteClientModule, + @BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent + +/** + * Life-cycle events for RemoteServer. + */ +sealed trait RemoteServerLifeCycleEvent +case class RemoteServerStarted( + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent +case class RemoteServerShutdown( + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent +case class RemoteServerError( + @BeanProperty val cause: Throwable, + @BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent +case class RemoteServerClientConnected( + @BeanProperty val server: RemoteServerModule, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientDisconnected( + @BeanProperty val server: RemoteServerModule, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerClientClosed( + @BeanProperty val server: RemoteServerModule, + @BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent +case class RemoteServerWriteFailed( + @BeanProperty request: AnyRef, + @BeanProperty cause: Throwable, + @BeanProperty server: RemoteServerModule, + @BeanProperty clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent + +/** + * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. + */ +class RemoteClientException private[akka] ( + message: String, + @BeanProperty val client: RemoteClientModule, + val remoteAddress: InetSocketAddress, cause: Throwable = null) extends AkkaException(message, cause) + +/** + * Thrown when the remote server actor dispatching fails for some reason. + */ +class RemoteServerException private[akka] (message: String) extends AkkaException(message) + +/** + * Thrown when a remote exception sent over the wire cannot be loaded and instantiated + */ +case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException private[akka] (cause: Throwable, originalClassName: String, originalMessage: String) + extends AkkaException("\nParsingError[%s]\nOriginalException[%s]\nOriginalMessage[%s]" + .format(cause.toString, originalClassName, originalMessage)) { + override def printStackTrace = cause.printStackTrace + override def printStackTrace(printStream: PrintStream) = cause.printStackTrace(printStream) + override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter) +} + +abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { + + lazy val eventHandler: ActorRef = { + val handler = Actor.actorOf[RemoteEventHandler].start() + // add the remote client and server listener that pipes the events to the event handler system + addListener(handler) + handler + } + + def shutdown() { + eventHandler.stop() + removeListener(eventHandler) + this.shutdownClientModule() + this.shutdownServerModule() + clear + } + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + * <pre> + * import Actor._ + * val actor = actorOf(classOf[MyActor],"www.akka.io", 2552) + * actor.start() + * actor ! message + * actor.stop() + * </pre> + * You can create and start the actor in one statement like this: + * <pre> + * val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start() + * </pre> + */ + @deprecated("Will be removed after 1.1", "1.1") + def actorOf(factory: => Actor, host: String, port: Int): ActorRef = + Actor.remote.clientManagedActorOf(() => factory, host, port) + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + * <pre> + * import Actor._ + * val actor = actorOf(classOf[MyActor],"www.akka.io",2552) + * actor.start() + * actor ! message + * actor.stop() + * </pre> + * You can create and start the actor in one statement like this: + * <pre> + * val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start() + * </pre> + */ + @deprecated("Will be removed after 1.1", "1.1") + def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = + clientManagedActorOf(() => createActorFromClass(clazz), host, port) + + /** + * Creates a Client-managed ActorRef out of the Actor of the specified Class. + * If the supplied host and port is identical of the configured local node, it will be a local actor + * <pre> + * import Actor._ + * val actor = actorOf[MyActor]("www.akka.io",2552) + * actor.start() + * actor ! message + * actor.stop() + * </pre> + * You can create and start the actor in one statement like this: + * <pre> + * val actor = actorOf[MyActor]("www.akka.io",2552).start() + * </pre> + */ + @deprecated("Will be removed after 1.1", "1.1") + def actorOf[T <: Actor: Manifest](host: String, port: Int): ActorRef = + clientManagedActorOf(() => createActorFromClass(manifest.erasure), host, port) + + protected def createActorFromClass(clazz: Class[_]): Actor = { + import ReflectiveAccess.{ createInstance, noParams, noArgs } + createInstance[Actor](clazz, noParams, noArgs) match { + case Right(actor) => actor + case Left(exception) => + val cause = exception match { + case i: InvocationTargetException => i.getTargetException + case _ => exception + } + + throw new ActorInitializationException( + "Could not instantiate Actor of " + clazz + + "\nMake sure Actor is NOT defined inside a class/trait," + + "\nif so put it outside the class/trait, f.e. in a companion object," + + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause) + } + } + + protected override def manageLifeCycleOfListeners = false + protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) + + private[akka] val actors = new ConcurrentHashMap[String, ActorRef] + private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private[akka] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef] + private[akka] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[akka] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] + private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] + + def clear { + actors.clear + actorsByUuid.clear + typedActors.clear + typedActorsByUuid.clear + actorsFactories.clear + typedActorsFactories.clear + } +} + +/** + * This is the interface for the RemoteServer functionality, it's used in Actor.remote + */ +trait RemoteServerModule extends RemoteModule { + protected val guard = new ReentrantGuard + + /** + * Signals whether the server is up and running or not + */ + def isRunning: Boolean + + /** + * Gets the name of the server instance + */ + def name: String + + /** + * Gets the address of the server instance + */ + def address: InetSocketAddress + + /** + * Starts the server up + */ + def start(): RemoteServerModule = + start(ReflectiveAccess.Remote.configDefaultAddress.getAddress.getHostAddress, + ReflectiveAccess.Remote.configDefaultAddress.getPort, + None) + + /** + * Starts the server up + */ + def start(loader: ClassLoader): RemoteServerModule = + start(ReflectiveAccess.Remote.configDefaultAddress.getAddress.getHostAddress, + ReflectiveAccess.Remote.configDefaultAddress.getPort, + Option(loader)) + + /** + * Starts the server up + */ + def start(host: String, port: Int): RemoteServerModule = + start(host, port, None) + + /** + * Starts the server up + */ + def start(host: String, port: Int, loader: ClassLoader): RemoteServerModule = + start(host, port, Option(loader)) + + /** + * Starts the server up + */ + def start(host: String, port: Int, loader: Option[ClassLoader]): RemoteServerModule + + /** + * Shuts the server down + */ + def shutdownServerModule(): Unit + + /** + * Register typed actor by interface name. + */ + def registerTypedActor(intfClass: Class[_], typedActor: AnyRef): Unit = registerTypedActor(intfClass.getName, typedActor) + + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + */ + def registerTypedActor(id: String, typedActor: AnyRef): Unit + + /** + * Register typed actor by interface name. + */ + def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef): Unit = registerTypedActor(intfClass.getName, factory) + + /** + * Register typed actor by interface name. + * Java API + */ + def registerTypedPerSessionActor(intfClass: Class[_], factory: Creator[AnyRef]): Unit = registerTypedActor(intfClass.getName, factory) + + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + */ + def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit + + /** + * Register remote typed actor by a specific id. + * @param id custom actor id + * @param typedActor typed actor to register + * Java API + */ + def registerTypedPerSessionActor(id: String, factory: Creator[AnyRef]): Unit = registerTypedPerSessionActor(id, factory.create) + + /** + * Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already. + */ + def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef) + + /** + * Register Remote Actor by the Actor's uuid field. It starts the Actor if it is not started already. + */ + def registerByUuid(actorRef: ActorRef): Unit + + /** + * Register Remote Actor by a specific 'id' passed as argument. The actor is registered by UUID rather than ID + * when prefixing the handle with the “uuid:” protocol. + * <p/> + * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. + */ + def register(id: String, actorRef: ActorRef): Unit + + /** + * Register Remote Session Actor by a specific 'id' passed as argument. + * <p/> + * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. + */ + def registerPerSession(id: String, factory: => ActorRef): Unit + + /** + * Register Remote Session Actor by a specific 'id' passed as argument. + * <p/> + * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. + * Java API + */ + def registerPerSession(id: String, factory: Creator[ActorRef]): Unit = registerPerSession(id, factory.create) + + /** + * Unregister Remote Actor that is registered using its 'id' field (not custom ID). + */ + def unregister(actorRef: ActorRef): Unit + + /** + * Unregister Remote Actor by specific 'id'. + * <p/> + * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregister(id: String): Unit + + /** + * Unregister Remote Actor by specific 'id'. + * <p/> + * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterPerSession(id: String): Unit + + /** + * Unregister Remote Typed Actor by specific 'id'. + * <p/> + * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedActor(id: String): Unit + + /** + * Unregister Remote Typed Actor by specific 'id'. + * <p/> + * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregisterTypedPerSessionActor(id: String): Unit +} + +trait RemoteClientModule extends RemoteModule { self: RemoteModule => + + def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, None) + + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, Some(loader)) + + def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, None) + + def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, Some(loader)) + + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None) + + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader)) + + def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(serviceId, className, timeout, hostname, port, None) + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None) + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T = + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None) + + def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = + typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader)) + + def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = + typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) + + @deprecated("Will be removed after 1.1", "1.1") + def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef + + /** + * Clean-up all open connections. + */ + def shutdownClientModule(): Unit + + /** + * Shuts down a specific client connected to the supplied remote address returns true if successful + */ + def shutdownClientConnection(address: InetSocketAddress): Boolean + + /** + * Restarts a specific client connected to the supplied remote address, but only if the client is not shut down + */ + def restartClientConnection(address: InetSocketAddress): Boolean + + /** Methods that needs to be implemented by a transport **/ + + protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): T + + protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef + + protected[akka] def send[T](message: Any, + senderOption: Option[ActorRef], + senderFuture: Option[CompletableFuture[T]], + remoteAddress: InetSocketAddress, + timeout: Long, + isOneWay: Boolean, + actorRef: ActorRef, + typedActorInfo: Option[Tuple2[String, String]], + actorType: ActorType, + loader: Option[ClassLoader]): Option[CompletableFuture[T]] + + private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef + + private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef + + @deprecated("Will be removed after 1.1", "1.1") + private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit + + @deprecated("Will be removed after 1.1", "1.1") + private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit +} |