// Source path: root/test/disabled/presentation/akka/src/akka/actor/ActorRegistry.scala
// Blob: 5d649fcd36f18558307e6de578ed2663162d417a
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.actor

import scala.collection.mutable.{ ListBuffer, Map }
import scala.reflect.ArrayTag

import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
import java.util.{ Set => JSet }

import annotation.tailrec
import akka.util.ReflectiveAccess._
import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement }

/**
 * Base trait for ActorRegistry events; lets listeners observe when an actor is added to or removed from the ActorRegistry.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
sealed trait ActorRegistryEvent
/** Event sent to registry listeners by `ActorRegistry.register` once `actor` has been added to the registry. */
case class ActorRegistered(actor: ActorRef) extends ActorRegistryEvent
/** Event sent to registry listeners by `ActorRegistry.unregister` once `actor` has been removed from the registry. */
case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent

/**
 * Registry holding all Actor instances in the whole system.
 * Mapped by:
 * <ul>
 * <li>the Actor's UUID</li>
 * <li>the Actor's id field (which can be set by user-code)</li>
 * <li>the Actor's class</li>
 * <li>all Actors that are subtypes of a specific type</li>
 * </ul>
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */

final class ActorRegistry private[actor] () extends ListenerManagement {

  // Primary index: every registered actor keyed by its UUID.
  private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
  // Secondary index: registered actors grouped by their id; one id may map to several actors.
  private val actorsById = new Index[String, ActorRef]
  // NOTE(review): not referenced anywhere else in this class — candidate for removal.
  private val guard = new ReadWriteGuard

  /**
   * Returns all actors in the system.
   *
   * @return a freshly built array of every actor currently held in the UUID index
   */
  def actors: Array[ActorRef] = filter(_ => true)

  /**
   * Returns the number of actors in the system.
   */
  def size: Int = actorsByUUID.size

  /**
   * Invokes a function for all actors.
   *
   * @param f the function applied to each registered actor
   */
  def foreach(f: (ActorRef) => Unit): Unit = {
    val elements = actorsByUUID.elements
    while (elements.hasMoreElements) f(elements.nextElement)
  }

  /**
   * Invokes the function on all known actors until it returns Some.
   * Returns None if the function never returns Some.
   *
   * @param f partial function tried against each registered actor in turn
   * @return the result of `f` for the first actor at which it is defined, or None
   */
  def find[T](f: PartialFunction[ActorRef, T]): Option[T] = {
    val elements = actorsByUUID.elements
    while (elements.hasMoreElements) {
      val element = elements.nextElement
      if (f isDefinedAt element) return Some(f(element))
    }
    None
  }

  /**
   * Finds all actors that are subtypes of the class passed in as the ClassTag argument and supporting passed message.
   *
   * @param message the message each returned actor must be defined at
   */
  def actorsFor[T <: Actor](message: Any)(implicit classTag: ClassTag[T]): Array[ActorRef] =
    filter(a => classTag.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message))

  /**
   * Finds all actors that satisfy a predicate.
   *
   * @param p the predicate evaluated against every registered actor
   * @return a new array holding the actors for which `p` returned true
   */
  def filter(p: ActorRef => Boolean): Array[ActorRef] = {
    val all = new ListBuffer[ActorRef]
    val elements = actorsByUUID.elements
    while (elements.hasMoreElements) {
      val actorRef = elements.nextElement
      if (p(actorRef)) all += actorRef
    }
    all.toArray
  }

  /**
   * Finds all actors that are subtypes of the class passed in as the ClassTag argument.
   */
  def actorsFor[T <: Actor](implicit classTag: ClassTag[T]): Array[ActorRef] =
    actorsFor[T](classTag.erasure.asInstanceOf[Class[T]])

  /**
   * Finds any actor that matches T. Very expensive, traverses ALL alive actors.
   */
  def actorFor[T <: Actor](implicit classTag: ClassTag[T]): Option[ActorRef] =
    find({ case a: ActorRef if classTag.erasure.isAssignableFrom(a.actor.getClass) => a })

  /**
   * Finds all actors of type or sub-type specified by the class passed in as the Class argument.
   */
  def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] =
    filter(a => clazz.isAssignableFrom(a.actor.getClass))

  /**
   * Finds all actors that have a specific id.
   */
  def actorsFor(id: String): Array[ActorRef] = actorsById values id

  /**
   * Finds the actor that has a specific UUID.
   *
   * @return Some(actor) if an actor is registered under `uuid`, otherwise None
   */
  def actorFor(uuid: Uuid): Option[ActorRef] = Option(actorsByUUID get uuid)

  /**
   * Returns all typed actors in the system.
   */
  def typedActors: Array[AnyRef] = filterTypedActors(_ => true)

  /**
   * Invokes a function for all typed actors.
   *
   * @param f the function applied to the proxy of every registered actor that has one
   */
  def foreachTypedActor(f: (AnyRef) => Unit): Unit = {
    // presumably fails fast when the typed actor module is unavailable — confirm in TypedActorModule
    TypedActorModule.ensureEnabled
    val elements = actorsByUUID.elements
    while (elements.hasMoreElements) {
      typedActorFor(elements.nextElement).foreach(f)
    }
  }

  /**
   * Invokes the function on all known typed actors until it returns Some.
   * Returns None if the function never returns Some.
   *
   * The partial function receives the typed actor proxy itself, not the
   * `Option` it was looked up in.
   *
   * @param f partial function tried against each typed actor proxy in turn
   * @return the result of `f` for the first proxy at which it is defined, or None
   */
  def findTypedActor[T](f: PartialFunction[AnyRef, T]): Option[T] = {
    TypedActorModule.ensureEnabled
    val elements = actorsByUUID.elements
    while (elements.hasMoreElements) {
      typedActorFor(elements.nextElement) match {
        // Unwrap before applying f: handing f the Option wrapper would make any
        // caller pattern written against the real proxy type never match.
        case Some(proxy) if f.isDefinedAt(proxy) => return Some(f(proxy))
        case _                                   => // no proxy for this actor, or f not defined at it
      }
    }
    None
  }

  /**
   * Finds all typed actors that satisfy a predicate.
   *
   * @param p the predicate evaluated against every typed actor proxy
   * @return a new array holding the proxies for which `p` returned true
   */
  def filterTypedActors(p: AnyRef => Boolean): Array[AnyRef] = {
    TypedActorModule.ensureEnabled
    val all = new ListBuffer[AnyRef]
    val elements = actorsByUUID.elements
    while (elements.hasMoreElements) {
      typedActorFor(elements.nextElement) match {
        case Some(proxy) if p(proxy) => all += proxy
        case _                       => // not a typed actor, or rejected by the predicate
      }
    }
    all.toArray
  }

  /**
   * Finds all typed actors that are subtypes of the class passed in as the ClassTag argument.
   */
  def typedActorsFor[T <: AnyRef](implicit classTag: ClassTag[T]): Array[AnyRef] = {
    TypedActorModule.ensureEnabled
    typedActorsFor[T](classTag.erasure.asInstanceOf[Class[T]])
  }

  /**
   * Finds any typed actor that matches T.
   *
   * @return Some(proxy) for the first typed actor whose underlying actor class is
   *         assignable to T, otherwise None
   */
  def typedActorFor[T <: AnyRef](implicit classTag: ClassTag[T]): Option[AnyRef] = {
    TypedActorModule.ensureEnabled
    def predicate(proxy: AnyRef): Boolean = {
      val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
      actorRef.isDefined && classTag.erasure.isAssignableFrom(actorRef.get.actor.getClass)
    }
    // Match on the proxy itself so the result is Some(proxy), not a nested Some(Some(proxy)).
    findTypedActor[AnyRef]({ case proxy if predicate(proxy) => proxy })
  }

  /**
   * Finds all typed actors of type or sub-type specified by the class passed in as the Class argument.
   */
  def typedActorsFor[T <: AnyRef](clazz: Class[T]): Array[AnyRef] = {
    TypedActorModule.ensureEnabled
    def predicate(proxy: AnyRef): Boolean = {
      val actorRef = TypedActorModule.typedActorObjectInstance.get.actorFor(proxy)
      actorRef.isDefined && clazz.isAssignableFrom(actorRef.get.actor.getClass)
    }
    filterTypedActors(predicate)
  }

  /**
   * Finds all typed actors that have a specific id.
   */
  def typedActorsFor(id: String): Array[AnyRef] = {
    TypedActorModule.ensureEnabled
    val actorRefs = actorsById values id
    actorRefs.flatMap(typedActorFor(_))
  }

  /**
   * Finds the typed actor that has a specific UUID.
   *
   * @return None if no actor is registered under `uuid`; otherwise whatever
   *         proxy lookup yields for that actor
   */
  def typedActorFor(uuid: Uuid): Option[AnyRef] = {
    TypedActorModule.ensureEnabled
    val actorRef = actorsByUUID get uuid
    if (actorRef eq null) None
    else typedActorFor(actorRef)
  }

  /**
   * Get the typed actor proxy for a given typed actor ref.
   */
  private def typedActorFor(actorRef: ActorRef): Option[AnyRef] = {
    TypedActorModule.typedActorObjectInstance.get.proxyFor(actorRef)
  }

  /**
   * Registers an actor in the ActorRegistry and notifies listeners with
   * an ActorRegistered event.
   */
  private[akka] def register(actor: ActorRef): Unit = {
    val id = actor.id
    val uuid = actor.uuid

    actorsById.put(id, actor)
    actorsByUUID.put(uuid, actor)

    // notify listeners
    notifyListeners(ActorRegistered(actor))
  }

  /**
   * Unregisters an actor in the ActorRegistry and notifies listeners with
   * an ActorUnregistered event.
   */
  private[akka] def unregister(actor: ActorRef): Unit = {
    val id = actor.id
    val uuid = actor.uuid

    actorsByUUID remove uuid
    actorsById.remove(id, actor)

    // notify listeners
    notifyListeners(ActorUnregistered(actor))
  }

  /**
   * Shuts down and unregisters all actors in the system.
   *
   * Both indexes are cleared directly afterwards; this method itself does not
   * send ActorUnregistered events.
   */
  def shutdownAll(): Unit = {
    if (TypedActorModule.isEnabled) {
      val elements = actorsByUUID.elements
      while (elements.hasMoreElements) {
        val actorRef = elements.nextElement
        // Typed actors are stopped through their proxy; plain actors are stopped directly.
        typedActorFor(actorRef) match {
          case Some(proxy) => TypedActorModule.typedActorObjectInstance.get.stop(proxy)
          case None        => actorRef.stop()
        }
      }
    } else foreach(_.stop())
    if (Remote.isEnabled) {
      Actor.remote.clear //TODO: REVISIT: Should this be here?
    }
    actorsByUUID.clear
    actorsById.clear
  }
}

/**
 * An implementation of a ConcurrentMultiMap
 * Adds/remove is serialized over the specified key
 * Reads are fully concurrent <-- el-cheapo
 *
 * @author Viktor Klang
 */
class Index[K <: AnyRef, V <: AnyRef: ArrayTag] {
  private val Naught = Array[V]() //Nil for Arrays
  private val container = new ConcurrentHashMap[K, JSet[V]]
  private val emptySet = new ConcurrentSkipListSet[V]

  /**
   * Associates the value of type V with the key of type K
   * @return true if the value didn't exist for the key previously, and false otherwise
   */
  def put(key: K, value: V): Boolean = {
    // Adds the value while holding the set's monitor.
    // None means the set was found empty, i.e. a concurrent remove has retired it,
    // so the caller has to start over; Some(added) carries the result of the add.
    def addUnlessRetired(target: JSet[V]): Option[Boolean] = target.synchronized {
      if (target.isEmpty) None
      else Some(target add value)
    }

    // Spins until the value has either been added or found to be present already
    @tailrec
    def attempt(): Boolean = {
      val existing = container get key
      val outcome =
        if (existing ne null) addUnlessRetired(existing)
        else {
          val fresh = new ConcurrentSkipListSet[V]
          fresh add value

          // Another thread may have installed a set for this key in the meantime
          val raced = container.putIfAbsent(key, fresh)
          if (raced ne null) addUnlessRetired(raced)
          else Some(true) // our fresh set, already holding the value, was installed
        }

      outcome match {
        case Some(added) => added
        case None        => attempt()
      }
    }

    attempt()
  }

  /**
   * @return an array of all values associated with the given key at the time of the call,
   * empty if the key is not present
   */
  def values(key: K): Array[V] = {
    val set: JSet[V] = container get key
    val snapshot = if (set eq null) Naught else set toArray Naught
    snapshot.asInstanceOf[Array[V]]
  }

  /**
   * @return Some(value) for the first value under the given key for which the supplied
   * function returns true, None if there is no such value or the key is not present
   */
  def findValue(key: K)(f: (V) => Boolean): Option[V] = {
    val set = container get key
    if (set eq null) None
    else {
      val candidates = set.iterator
      var found: Option[V] = None
      while (found.isEmpty && candidates.hasNext) {
        val candidate = candidates.next()
        if (f(candidate)) found = Some(candidate)
      }
      found
    }
  }

  /**
   * Applies the supplied function to every (key, value) pair held by this index
   */
  def foreach(fun: (K, V) => Unit): Unit = {
    val entries = container.entrySet.iterator
    while (entries.hasNext) {
      val entry = entries.next()
      val members = entry.getValue.iterator
      while (members.hasNext) fun(entry.getKey, members.next())
    }
  }

  /**
   * Disassociates the value of type V from the key of type K
   * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key
   */
  def remove(key: K, value: V): Boolean = {
    val set = container get key

    if (set eq null) false //Nothing is stored under the key
    else set.synchronized {
      val removed = set.remove(value)
      //When the last value is gone, drop the key as long as it still maps to an empty set
      if (removed && set.isEmpty) container.remove(key, emptySet)
      removed
    }
  }

  /**
   * @return true if the underlying containers is empty, may report false negatives when the last remove is underway
   */
  def isEmpty: Boolean = container.isEmpty

  /**
   *  Removes all keys and all values
   */
  def clear = foreach((k, v) => remove(k, v))
}