authorSeth Tisue <>2017-03-20 17:13:56 -0700
committerSeth Tisue <>2017-03-20 17:24:33 -0700
commit25048bc73741846107c18ed01e0e9f6f07785379 (patch)
treec1c9d60002fec74fc13af354e51bb3d688b33902 /test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala
parent0563c4b23cdc7ed6c05e9defe2a675df4d838347 (diff)
rm -r test/{flaky,disabled*,checker-tests,support,debug}
keeping this stuff, somewhere, forever and ever and ever is what version control is for. who dares disturb the ancient and accursed tomb of all this code...?
- * Copyright (C) 2009-2011 Scalable Solutions AB <>
- */
-import scala.collection.mutable.{ ListBuffer, Map }
-import scala.reflect.ArrayTag
-import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
-import java.util.{ Set => JSet }
-import annotation.tailrec
-import akka.util.ReflectiveAccess._
-import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement }
- * Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
- *
- * @author <a href="">Jonas Bon&#233;r</a>
- */
-sealed trait ActorRegistryEvent
-case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
-case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
- * Registry holding all Actor instances in the whole system.
- * Mapped by:
- * <ul>
- * <li>the Actor's UUID</li>
- * <li>the Actor's id field (which can be set by user-code)</li>
- * <li>the Actor's class</li>
- * <li>all Actors that are subtypes of a specific type</li>
- * <ul>
- *
- * @author <a href="">Jonas Bon&#233;r</a>
- */
-final class ActorRegistry private[actor] () extends ListenerManagement {
- private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
- private val actorsById = new Index[String, ActorRef]
- private val guard = new ReadWriteGuard
- /**
- * Returns all actors in the system.
- */
- def actors: Array[ActorRef] = filter(_ => true)
- /**
- * Returns the number of actors in the system.
- */
- def size: Int = actorsByUUID.size
- /**
- * Invokes a function for all actors.
- */
- def foreach(f: (ActorRef) => Unit) = {
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) f(elements.nextElement)
- }
- /**
- * Invokes the function on all known actors until it returns Some
- * Returns None if the function never returns Some
- */
- def find[T](f: PartialFunction[ActorRef, T]): Option[T] = {
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val element = elements.nextElement
- if (f isDefinedAt element) return Some(f(element))
- }
- None
- }
- /**
- * Finds all actors that are subtypes of the class passed in as the ClassTag argument and supporting passed message.
- */
- def actorsFor[T <: Actor](message: Any)(implicit classTag: ClassTag[T]): Array[ActorRef] =
- filter(a => classTag.erasure.isAssignableFrom( && a.isDefinedAt(message))
- /**
- * Finds all actors that satisfy a predicate.
- */
- def filter(p: ActorRef => Boolean): Array[ActorRef] = {
- val all = new ListBuffer[ActorRef]
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val actorId = elements.nextElement
- if (p(actorId)) all += actorId
- }
- all.toArray
- }
- /**
- * Finds all actors that are subtypes of the class passed in as the ClassTag argument.
- */
- def actorsFor[T <: Actor](implicit classTag: ClassTag[T]): Array[ActorRef] =
- actorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
- /**
- * Finds any actor that matches T. Very expensive, traverses ALL alive actors.
- */
- def actorFor[T <: Actor](implicit classTag: ClassTag[T]): Option[ActorRef] =
- find({ case a: ActorRef if classTag.erasure.isAssignableFrom( => a })
- /**
- * Finds all actors of type or sub-type specified by the class passed in as the Class argument.
- */
- def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] =
- filter(a => clazz.isAssignableFrom(
- /**
- * Finds all actors that has a specific id.
- */
- def actorsFor(id: String): Array[ActorRef] = actorsById values id
- /**
- * Finds the actor that has a specific UUID.
- */
- def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid)
- /**
- * Returns all typed actors in the system.
- */
- def typedActors: Array[AnyRef] = filterTypedActors(_ => true)
- /**
- * Invokes a function for all typed actors.
- */
- def foreachTypedActor(f: (AnyRef) => Unit) = {
- TypedActorModule.ensureEnabled
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val proxy = typedActorFor(elements.nextElement)
- if (proxy.isDefined) f(proxy.get)
- }
- }
- /**
- * Invokes the function on all known typed actors until it returns Some
- * Returns None if the function never returns Some
- */
- def findTypedActor[T](f: PartialFunction[AnyRef, T]): Option[T] = {
- TypedActorModule.ensureEnabled
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val proxy = typedActorFor(elements.nextElement)
- if (proxy.isDefined && (f isDefinedAt proxy)) return Some(f(proxy))
- }
- None
- }
- /**
- * Finds all typed actors that satisfy a predicate.
- */
- def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- val all = new ListBuffer[AnyRef]
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val proxy = typedActorFor(elements.nextElement)
- if (proxy.isDefined && p(proxy.get)) all += proxy.get
- }
- all.toArray
- }
- /**
- * Finds all typed actors that are subtypes of the class passed in as the ClassTag argument.
- */
- def typedActorsFor[T <: AnyRef](implicit classTag: ClassTag[T]): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- typedActorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
- }
- /**
- * Finds any typed actor that matches T.
- */
- def typedActorFor[T <: AnyRef](implicit classTag: ClassTag[T]): Option[AnyRef] = {
- TypedActorModule.ensureEnabled
- def predicate(proxy: AnyRef): Boolean = {
- val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
- actorRef.isDefined && classTag.erasure.isAssignableFrom(
- }
- findTypedActor({ case a: Some[AnyRef] if predicate(a.get) => a })
- }
- /**
- * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
- */
- def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- def predicate(proxy: AnyRef): Boolean = {
- val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
- actorRef.isDefined && clazz.isAssignableFrom(
- }
- filterTypedActors(predicate)
- }
- /**
- * Finds all typed actors that have a specific id.
- */
- def typedActorsFor(id: String): Array[AnyRef] = {
- TypedActorModule.ensureEnabled
- val actorRefs = actorsById values id
- actorRefs.flatMap(typedActorFor(_))
- }
- /**
- * Finds the typed actor that has a specific UUID.
- */
- def typedActorFor(uuid: Uuid): Option[AnyRef] = {
- TypedActorModule.ensureEnabled
- val actorRef = actorsByUUID get uuid
- if (actorRef eq null) None
- else typedActorFor(actorRef)
- }
- /**
- * Get the typed actor proxy for a given typed actor ref.
- */
- private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = {
- TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
- }
- /**
- * Registers an actor in the ActorRegistry.
- */
- private[akka] def register(actor: ActorRef) {
- val id =
- val uuid = actor.uuid
- actorsById.put(id, actor)
- actorsByUUID.put(uuid, actor)
- // notify listeners
- notifyListeners(ActorRegistered(actor))
- }
- /**
- * Unregisters an actor in the ActorRegistry.
- */
- private[akka] def unregister(actor: ActorRef) {
- val id =
- val uuid = actor.uuid
- actorsByUUID remove uuid
- actorsById.remove(id, actor)
- // notify listeners
- notifyListeners(ActorUnregistered(actor))
- }
- /**
- * Shuts down and unregisters all actors in the system.
- */
- def shutdownAll() {
- if (TypedActorModule.isEnabled) {
- val elements = actorsByUUID.elements
- while (elements.hasMoreElements) {
- val actorRef = elements.nextElement
- val proxy = typedActorFor(actorRef)
- if (proxy.isDefined) TypedActorModule.typedActorObjectInstance.get.stop(proxy.get)
- else actorRef.stop()
- }
- } else foreach(_.stop())
- if (Remote.isEnabled) {
- Actor.remote.clear //TODO: REVISIT: Should this be here?
- }
- actorsByUUID.clear
- actorsById.clear
- }
- * An implementation of a ConcurrentMultiMap
- * Adds/remove is serialized over the specified key
- * Reads are fully concurrent <-- el-cheapo
- *
- * @author Viktor Klang
- */
-class Index[K <: AnyRef, V <: AnyRef: ArrayTag] {
- private val Naught = Array[V]() //Nil for Arrays
- private val container = new ConcurrentHashMap[K, JSet[V]]
- private val emptySet = new ConcurrentSkipListSet[V]
- /**
- * Associates the value of type V with the key of type K
- * @return true if the value didn't exist for the key previously, and false otherwise
- */
- def put(key: K, value: V): Boolean = {
- //Tailrecursive spin-locking put
- @tailrec
- def spinPut(k: K, v: V): Boolean = {
- var retry = false
- var added = false
- val set = container get k
- if (set ne null) {
- set.synchronized {
- if (set.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
- else { //Else add the value to the set and signal that retry is not needed
- added = set add v
- retry = false
- }
- }
- } else {
- val newSet = new ConcurrentSkipListSet[V]
- newSet add v
- // Parry for two simultaneous putIfAbsent(id,newSet)
- val oldSet = container.putIfAbsent(k, newSet)
- if (oldSet ne null) {
- oldSet.synchronized {
- if (oldSet.isEmpty) retry = true //IF the set is empty then it has been removed, so signal retry
- else { //Else try to add the value to the set and signal that retry is not needed
- added = oldSet add v
- retry = false
- }
- }
- } else added = true
- }
- if (retry) spinPut(k, v)
- else added
- }
- spinPut(key, value)
- }
- /**
- * @return a _new_ array of all existing values for the given key at the time of the call
- */
- def values(key: K): Array[V] = {
- val set: JSet[V] = container get key
- val result = if (set ne null) set toArray Naught else Naught
- result.asInstanceOf[Array[V]]
- }
- /**
- * @return Some(value) for the first matching value where the supplied function returns true for the given key,
- * if no matches it returns None
- */
- def findValue(key: K)(f: (V) => Boolean): Option[V] = {
- import scala.collection.convert.wrapAsScala._
- val set = container get key
- if (set ne null) set.iterator.find(f)
- else None
- }
- /**
- * Applies the supplied function to all keys and their values
- */
- def foreach(fun: (K, V) => Unit) {
- import scala.collection.convert.wrapAsScala._
- container.entrySet foreach { (e) =>
- e.getValue.foreach(fun(e.getKey, _))
- }
- }
- /**
- * Disassociates the value of type V from the key of type K
- * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key
- */
- def remove(key: K, value: V): Boolean = {
- val set = container get key
- if (set ne null) {
- set.synchronized {
- if (set.remove(value)) { //If we can remove the value
- if (set.isEmpty) //and the set becomes empty
- container.remove(key, emptySet) //We try to remove the key if it's mapped to an empty set
- true //Remove succeeded
- } else false //Remove failed
- }
- } else false //Remove failed
- }
- /**
- * @return true if the underlying containers is empty, may report false negatives when the last remove is underway
- */
- def isEmpty: Boolean = container.isEmpty
- /**
- * Removes all keys and all values
- */
- def clear = foreach { case (k, v) => remove(k, v) }