/** * Copyright (C) 2009-2011 Scalable Solutions AB */ package akka.actor import akka.AkkaException import akka.util._ import ReflectiveAccess._ import Actor._ import java.util.concurrent.{ CopyOnWriteArrayList, ConcurrentHashMap } import java.net.InetSocketAddress import akka.config.Supervision._ class SupervisorException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) /** * Factory object for creating supervisors declarative. It creates instances of the 'Supervisor' class. * These are not actors, if you need a supervisor that is an Actor then you have to use the 'SupervisorActor' * factory object. *

* * Here is a sample on how to use it: *

 *  val supervisor = Supervisor(
 *    SupervisorConfig(
 *      RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
 *      Supervise(
 *        myFirstActor,
 *        Permanent) ::
 *      Supervise(
 *        mySecondActor,
 *        Permanent) ::
 *      Nil))
 * 
* * You dynamically link and unlink child children using the 'link' and 'unlink' methods. *
 * supervisor.link(child)
 * supervisor.unlink(child)
 * 
* * @author Jonas Bonér */ object Supervisor { def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start } /** * Use this factory instead of the Supervisor factory object if you want to control * instantiation and starting of the Supervisor, if not then it is easier and better * to use the Supervisor factory object. *

* Example usage: *

 *  val factory = SupervisorFactory(
 *    SupervisorConfig(
 *      RestartStrategy(OneForOne, 3, 10, List(classOf[Exception]),
 *      Supervise(
 *        myFirstActor,
 *        Permanent) ::
 *      Supervise(
 *        mySecondActor,
 *        Permanent) ::
 *      Nil))
 * 
* * Then create a new Supervisor tree with the concrete Services we have defined. * *
 * val supervisor = factory.newInstance
 * supervisor.start // start up all managed servers
 * 
* * @author Jonas Bonér */ case class SupervisorFactory(val config: SupervisorConfig) { def newInstance: Supervisor = newInstanceFor(config) def newInstanceFor(config: SupervisorConfig): Supervisor = { val supervisor = new Supervisor(config.restartStrategy, config.maxRestartsHandler) supervisor.configure(config) supervisor.start supervisor } } /** * NOTE: *

* The supervisor class is only used for the configuration system when configuring supervisor * hierarchies declaratively. Should not be used as part of the regular programming API. Instead * wire the children together using 'link', 'spawnLink' etc. and set the 'trapExit' flag in the * children that should trap error signals and trigger restart. *

* See the Scaladoc for the SupervisorFactory for an example on how to declaratively wire up children. * * @author Jonas Bonér */ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit) { import Supervisor._ private val _childActors = new ConcurrentHashMap[String, List[ActorRef]] private val _childSupervisors = new CopyOnWriteArrayList[Supervisor] private[akka] val supervisor = actorOf(new SupervisorActor(handler, maxRestartsHandler)).start() def uuid = supervisor.uuid def start: Supervisor = { this } def shutdown(): Unit = supervisor.stop() def link(child: ActorRef) = supervisor.link(child) def unlink(child: ActorRef) = supervisor.unlink(child) def children: List[ActorRef] = _childActors.values.toArray.toList.asInstanceOf[List[List[ActorRef]]].flatten def childSupervisors: List[Supervisor] = _childActors.values.toArray.toList.asInstanceOf[List[Supervisor]] def configure(config: SupervisorConfig): Unit = config match { case SupervisorConfig(_, servers, _) => servers.map(server => server match { case Supervise(actorRef, lifeCycle, registerAsRemoteService) => actorRef.start() val className = actorRef.actor.getClass.getName val currentActors = { val list = _childActors.get(className) if (list eq null) List[ActorRef]() else list } _childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = lifeCycle supervisor.link(actorRef) if (registerAsRemoteService) Actor.remote.register(actorRef) case supervisorConfig@SupervisorConfig(_, _, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) supervisor.link(childSupervisor.supervisor) _childSupervisors.add(childSupervisor) }) } } /** * For internal use only. * * @author Jonas Bonér */ final class SupervisorActor private[akka] (handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) => Unit) extends Actor { self.faultHandler = handler override def postStop(): Unit = { val i = self.linkedActors.values.iterator while (i.hasNext) { val ref = i.next ref.stop() self.unlink(ref) } } def receive = { case max@MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) => maxRestartsHandler(self, max) case unknown => throw new SupervisorException( "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") } }