/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import scala.collection.mutable.{ ListBuffer, Map }
import scala.reflect.ArrayTag
import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
import java.util.{ Set => JSet }
import annotation.tailrec
import akka.util.ReflectiveAccess._
import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement }
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
sealed trait ActorRegistryEvent
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
/**
* Registry holding all Actor instances in the whole system.
* Mapped by:
* <ul>
* <li>the Actor's UUID</li>
* <li>the Actor's id field (which can be set by user-code)</li>
* <li>the Actor's class</li>
* <li>all Actors that are subtypes of a specific type</li>
* <ul>
*
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
final class ActorRegistry private[actor] () extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
private val actorsById = new Index[String, ActorRef]
private val guard = new ReadWriteGuard
/**
* Returns all actors in the system.
*/
def actors: Array[ActorRef] = filter(_ => true)
/**
* Returns the number of actors in the system.
*/
def size: Int = actorsByUUID.size
/**
* Invokes a function for all actors.
*/
def foreach(f: (ActorRef) => Unit) = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) f(elements.nextElement)
}
/**
* Invokes the function on all known actors until it returns Some
* Returns None if the function never returns Some
*/
def find[T](f: PartialFunction[ActorRef, T]): Option[T] = {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val element = elements.nextElement
if (f isDefinedAt element) return Some(f(element))
}
None
}
/**
* Finds all actors that are subtypes of the class passed in as the ClassTag argument and supporting passed message.
*/
def actorsFor[T <: Actor](message: Any)(implicit classTag: ClassTag[T]): Array[ActorRef] =
filter(a => classTag.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message))
/**
* Finds all actors that satisfy a predicate.
*/
def filter(p: ActorRef => Boolean): Array[ActorRef] = {
val all = new ListBuffer[ActorRef]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actorId = elements.nextElement
if (p(actorId)) all += actorId
}
all.toArray
}
/**
* Finds all actors that are subtypes of the class passed in as the ClassTag argument.
*/
def actorsFor[T <: Actor](implicit classTag: ClassTag[T]): Array[ActorRef] =
actorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
/**
* Finds any actor that matches T. Very expensive, traverses ALL alive actors.
*/
def actorFor[T <: Actor](implicit classTag: ClassTag[T]): Option[ActorRef] =
find({ case a: ActorRef if classTag.erasure.isAssignableFrom(a.actor.getClass) => a })
/**
* Finds all actors of type or sub-type specified by the class passed in as the Class argument.
*/
def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] =
filter(a => clazz.isAssignableFrom(a.actor.getClass))
/**
* Finds all actors that has a specific id.
*/
def actorsFor(id: String): Array[ActorRef] = actorsById values id
/**
* Finds the actor that has a specific UUID.
*/
def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid)
/**
* Returns all typed actors in the system.
*/
def typedActors: Array[AnyRef] = filterTypedActors(_ => true)
/**
* Invokes a function for all typed actors.
*/
def foreachTypedActor(f: (AnyRef) => Unit) = {
TypedActorModule.ensureEnabled
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined) f(proxy.get)
}
}
/**
* Invokes the function on all known typed actors until it returns Some
* Returns None if the function never returns Some
*/
def findTypedActor[T](f: PartialFunction[AnyRef, T]): Option[T] = {
TypedActorModule.ensureEnabled
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy))
}
None
}
/**
* Finds all typed actors that satisfy a predicate.
*/
def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
TypedActorModule.ensureEnabled
val all = new ListBuffer[AnyRef]
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val proxy = typedActorFor(elements.nextElement)
if (proxy.isDefined && p(proxy.get)) all += proxy.get
}
all.toArray
}
/**
* Finds all typed actors that are subtypes of the class passed in as the ClassTag argument.
*/
def typedActorsFor[T <: AnyRef](implicit classTag: ClassTag[T]): Array[AnyRef] = {
TypedActorModule.ensureEnabled
typedActorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
}
/**
* Finds any typed actor that matches T.
*/
def typedActorFor[T <: AnyRef](implicit classTag: ClassTag[T]): Option[AnyRef] = {
TypedActorModule.ensureEnabled
def predicate(proxy: AnyRef): Boolean = {
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && classTag.erasure.isAssignableFrom(actorRef.get.actor.getClass)
}
findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a })
}
/**
* Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
*/
def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
TypedActorModule.ensureEnabled
def predicate(proxy: AnyRef): Boolean = {
val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
}
filterTypedActors(predicate)
}
/**
* Finds all typed actors that have a specific id.
*/
def typedActorsFor(id: String): Array[AnyRef] = {
TypedActorModule.ensureEnabled
val actorRefs = actorsById values id
actorRefs.flatMap(typedActorFor(_))
}
/**
* Finds the typed actor that has a specific UUID.
*/
def typedActorFor(uuid: Uuid): Option[AnyRef] = {
TypedActorModule.ensureEnabled
val actorRef = actorsByUUID get uuid
if (actorRef eq null) None
else typedActorFor(actorRef)
}
/**
* Get the typed actor proxy for a given typed actor ref.
*/
private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = {
TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
}
/**
* Registers an actor in the ActorRegistry.
*/
private[akka] def register(actor: ActorRef) {
val id = actor.id
val uuid = actor.uuid
actorsById.put(id, actor)
actorsByUUID.put(uuid, actor)
// notify listeners
notifyListeners(ActorRegistered(actor))
}
/**
* Unregisters an actor in the ActorRegistry.
*/
private[akka] def unregister(actor: ActorRef) {
val id = actor.id
val uuid = actor.uuid
actorsByUUID remove uuid
actorsById.remove(id, actor)
// notify listeners
notifyListeners(ActorUnregistered(actor))
}
/**
* Shuts down and unregisters all actors in the system.
*/
def shutdownAll() {
if (TypedActorModule.isEnabled) {
val elements = actorsByUUID.elements
while (elements.hasMoreElements) {
val actorRef = elements.nextElement
val proxy = typedActorFor(actorRef)
if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
else actorRef.stop()
}
} else foreach(_.stop())
if (Remote.isEnabled) {
Actor.remote.clear //TODO: REVISIT: Should this be here?
}
actorsByUUID.clear
actorsById.clear
}
}
/**
* An implementation of a ConcurrentMultiMap
* Adds/remove is serialized over the specified key
* Reads are fully concurrent <-- el-cheapo
*
* @author Viktor Klang
*/
class Index[K <: AnyRef, V <: AnyRef: ArrayTag] {
private val Naught = Array[V]() //Nil for Arrays
private val container = new ConcurrentHashMap[K, JSet[V]]
private val emptySet = new ConcurrentSkipListSet[V]
/**
* Associates the value of type V with the key of type K
* @return true if the value didn't exist for the key previously, and false otherwise
*/
def put(key: K, value: V): Boolean = {
//Tailrecursive spin-locking put
@tailrec
def spinPut(k: K, v: V): Boolean = {
var retry = false
var added = false
val set = container get k
if (set ne null) {
set.synchronized {
if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
else { //Else add the value to the set and signal that retry is not needed
added = set add v
retry = false
}
}
} else {
val newSet = new ConcurrentSkipListSet[V]
newSet add v
// Parry for two simultaneous putIfAbsent(id,newSet)
val oldSet = container.putIfAbsent(k, newSet)
if (oldSet ne null) {
oldSet.synchronized {
if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
else { //Else try to add the value to the set and signal that retry is not needed
added = oldSet add v
retry = false
}
}
} else added = true
}
if (retry) spinPut(k, v)
else added
}
spinPut(key, value)
}
/**
* @return a _new_ array of all existing values for the given key at the time of the call
*/
def values(key: K): Array[V] = {
val set: JSet[V] = container get key
val result = if (set ne null) set toArray Naught else Naught
result.asInstanceOf[Array[V]]
}
/**
* @return Some(value) for the first matching value where the supplied function returns true for the given key,
* if no matches it returns None
*/
def findValue(key: K)(f: (V) => Boolean): Option[V] = {
import scala.collection.JavaConversions._
val set = container get key
if (set ne null) set.iterator.find(f)
else None
}
/**
* Applies the supplied function to all keys and their values
*/
def foreach(fun: (K, V) => Unit) {
import scala.collection.JavaConversions._
container.entrySet foreach { (e) =>
e.getValue.foreach(fun(e.getKey, _))
}
}
/**
* Disassociates the value of type V from the key of type K
* @return true if the value was disassociated from the key and false if it wasn't previously associated with the key
*/
def remove(key: K, value: V): Boolean = {
val set = container get key
if (set ne null) {
set.synchronized {
if (set.remove(value)) { //If we can remove the value
if (set.isEmpty) //and the set becomes empty
container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set
true //Remove succeeded
} else false //Remove failed
}
} else false //Remove failed
}
/**
* @return true if the underlying containers is empty, may report false negatives when the last remove is underway
*/
def isEmpty: Boolean = container.isEmpty
/**
* Removes all keys and all values
*/
def clear = foreach { case (k, v) => remove(k, v) }
}