summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/scheduler/ActorGC.scala
blob: a27799d13217ef64b88b34db446631fa05cc80c3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors
package scheduler

import java.lang.ref.{Reference, WeakReference, ReferenceQueue}
import scala.collection.mutable

/**
 * ActorGC keeps track of the number of live actors being managed by a
 * a scheduler so that it can shutdown when all of the actors it manages have
 * either been explicitly terminated or garbage collected.
 *
 * When an actor is started, it is registered with the ActorGC via the
 * `newActor` method, and when an actor is knowingly terminated
 * (e.g. act method finishes, exit explicitly called, an exception is thrown),
 * the ActorGC is informed via the `terminated` method.
 */
@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
trait ActorGC extends TerminationMonitor {
  self: IScheduler =>

  /** Actors are added to refQ in newActor. */
  private val refQ = new ReferenceQueue[TrackedReactor]

  /**
   * This is a set of references to all the actors registered with
   * this ActorGC. It is maintained so that the WeakReferences will
   * not be GC'd before the actors to which they point.
   */
  private val refSet = new mutable.HashSet[Reference[t] forSome { type t <: TrackedReactor }]

  /** newActor is invoked whenever a new actor is started. */
  override def newActor(a: TrackedReactor) = synchronized {
    // registers a reference to the actor with the ReferenceQueue
    val wr = new WeakReference[TrackedReactor](a, refQ)
    refSet += wr
    activeActors += 1
  }

  /** Checks for actors that have become garbage. */
  protected override def gc() = synchronized {
    // check for unreachable actors
    def drainRefQ() {
      val wr = refQ.poll
      if (wr != null) {
        activeActors -= 1
        refSet -= wr
        // continue draining
        drainRefQ()
      }
    }
    drainRefQ()
  }

  /** Prints some status information on currently managed actors. */
  protected def status() {
    println(this+": size of refSet: "+refSet.size)
  }

  /** Checks whether all actors have terminated. */
  override private[actors] def allActorsTerminated: Boolean = synchronized {
    activeActors <= 0
  }

  override def onTerminate(a: TrackedReactor)(f: => Unit): Unit = synchronized {
    terminationHandlers += (a -> (() => f))
  }

  override def terminated(a: TrackedReactor) = {
    super.terminated(a)

    synchronized {
      // find the weak reference that points to the terminated actor, if any
      refSet.find((ref: Reference[t] forSome { type t <: TrackedReactor }) => ref.get() == a) match {
        case Some(r) =>
          // invoking clear will not cause r to be enqueued
          r.clear()
          refSet -= r.asInstanceOf[Reference[t] forSome { type t <: TrackedReactor }]
        case None =>
          // do nothing
      }
    }
  }

  private[actors] def getPendingCount = synchronized {
    activeActors
  }

  private[actors] def setPendingCount(cnt: Int) = synchronized {
    activeActors = cnt
  }

}