diff options
author | Paul Phillips <paulp@improving.org> | 2011-12-05 10:05:01 -0800 |
---|---|---|
committer | Paul Phillips <paulp@improving.org> | 2011-12-05 10:05:01 -0800 |
commit | 09ba583b1e08b96d9b1d703a1c0c6bdaa55ae7f7 (patch) | |
tree | fc8661a15dc929c43a815445507a35e05f266185 /test/disabled/presentation/akka/src/akka/actor/FSM.scala | |
parent | 8b1e0225fdde17be06d064dece1f1851bd5bde03 (diff) | |
download | scala-09ba583b1e08b96d9b1d703a1c0c6bdaa55ae7f7.tar.gz scala-09ba583b1e08b96d9b1d703a1c0c6bdaa55ae7f7.tar.bz2 scala-09ba583b1e08b96d9b1d703a1c0c6bdaa55ae7f7.zip |
Disabled non-deterministic tests.
Everyone's favorite "will they or won't they" tests, akka and timeofday.
They will be welcomed back into the fold once they can stick to a
decision on whether to pass or fail.
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/actor/FSM.scala')
-rw-r--r-- | test/disabled/presentation/akka/src/akka/actor/FSM.scala | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/test/disabled/presentation/akka/src/akka/actor/FSM.scala b/test/disabled/presentation/akka/src/akka/actor/FSM.scala new file mode 100644 index 0000000000..d9cd9a9ca2 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/actor/FSM.scala @@ -0,0 +1,527 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ +package akka.actor + +import akka.util._ +import akka.event.EventHandler + +import scala.collection.mutable +import java.util.concurrent.ScheduledFuture + +object FSM { + + object NullFunction extends PartialFunction[Any, Nothing] { + def isDefinedAt(o: Any) = false + def apply(o: Any) = sys.error("undefined") + } + + case class CurrentState[S](fsmRef: ActorRef, state: S) + case class Transition[S](fsmRef: ActorRef, from: S, to: S) + case class SubscribeTransitionCallBack(actorRef: ActorRef) + case class UnsubscribeTransitionCallBack(actorRef: ActorRef) + + sealed trait Reason + case object Normal extends Reason + case object Shutdown extends Reason + case class Failure(cause: Any) extends Reason + + case object StateTimeout + case class TimeoutMarker(generation: Long) + + case class Timer(name: String, msg: AnyRef, repeat: Boolean, generation: Int) { + private var ref: Option[ScheduledFuture[AnyRef]] = _ + + def schedule(actor: ActorRef, timeout: Duration) { + if (repeat) { + ref = Some(Scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) + } else { + ref = Some(Scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) + } + } + + def cancel { + if (ref.isDefined) { + ref.get.cancel(true) + ref = None + } + } + } + + /* + * This extractor is just convenience for matching a (S, S) pair, including a + * reminder what the new state is. + */ + object -> { + def unapply[S](in: (S, S)) = Some(in) + } + + /* + * With these implicits in scope, you can write "5 seconds" anywhere a + * Duration or Option[Duration] is expected. This is conveniently true + * for derived classes. + */ + implicit def d2od(d: Duration): Option[Duration] = Some(d) +} + +/** + * Finite State Machine actor trait. Use as follows: + * + * <pre> + * object A { + * trait State + * case class One extends State + * case class Two extends State + * + * case class Data(i : Int) + * } + * + * class A extends Actor with FSM[A.State, A.Data] { + * import A._ + * + * startWith(One, Data(42)) + * when(One) { + * case Event(SomeMsg, Data(x)) => ... + * case Ev(SomeMsg) => ... // convenience when data not needed + * } + * when(Two, stateTimeout = 5 seconds) { ... } + * initialize + * } + * </pre> + * + * Within the partial function the following values are returned for effecting + * state transitions: + * + * - <code>stay</code> for staying in the same state + * - <code>stay using Data(...)</code> for staying in the same state, but with + * different data + * - <code>stay forMax 5.millis</code> for staying with a state timeout; can be + * combined with <code>using</code> + * - <code>goto(...)</code> for changing into a different state; also supports + * <code>using</code> and <code>forMax</code> + * - <code>stop</code> for terminating this FSM actor + * + * Each of the above also supports the method <code>replying(AnyRef)</code> for + * sending a reply before changing state. + * + * While changing state, custom handlers may be invoked which are registered + * using <code>onTransition</code>. This is meant to enable concentrating + * different concerns in different places; you may choose to use + * <code>when</code> for describing the properties of a state, including of + * course initiating transitions, but you can describe the transitions using + * <code>onTransition</code> to avoid having to duplicate that code among + * multiple paths which lead to a transition: + * + * <pre> + * onTransition { + * case Active -> _ => cancelTimer("activeTimer") + * } + * </pre> + * + * Multiple such blocks are supported and all of them will be called, not only + * the first matching one. + * + * Another feature is that other actors may subscribe for transition events by + * sending a <code>SubscribeTransitionCallback</code> message to this actor; + * use <code>UnsubscribeTransitionCallback</code> before stopping the other + * actor. + * + * State timeouts set an upper bound to the time which may pass before another + * message is received in the current state. If no external message is + * available, then upon expiry of the timeout a StateTimeout message is sent. + * Note that this message will only be received in the state for which the + * timeout was set and that any message received will cancel the timeout + * (possibly to be started again by the next transition). + * + * Another feature is the ability to install and cancel single-shot as well as + * repeated timers which arrange for the sending of a user-specified message: + * + * <pre> + * setTimer("tock", TockMsg, 1 second, true) // repeating + * setTimer("lifetime", TerminateMsg, 1 hour, false) // single-shot + * cancelTimer("tock") + * timerActive_? ("tock") + * </pre> + */ +trait FSM[S, D] extends ListenerManagement { + this: Actor => + + import FSM._ + + type StateFunction = scala.PartialFunction[Event[D], State] + type Timeout = Option[Duration] + type TransitionHandler = PartialFunction[(S, S), Unit] + + /** + * **************************************** + * DSL + * **************************************** + */ + + /** + * Insert a new StateFunction at the end of the processing chain for the + * given state. If the stateTimeout parameter is set, entering this state + * without a differing explicit timeout setting will trigger a StateTimeout + * event; the same is true when using #stay. + * + * @param stateName designator for the state + * @param stateTimeout default state timeout for this state + * @param stateFunction partial function describing response to input + */ + protected final def when(stateName: S, stateTimeout: Timeout = None)(stateFunction: StateFunction) = { + register(stateName, stateFunction, stateTimeout) + } + + /** + * Set initial state. Call this method from the constructor before the #initialize method. + * + * @param stateName initial state designator + * @param stateData initial state data + * @param timeout state timeout for the initial state, overriding the default timeout for that state + */ + protected final def startWith(stateName: S, + stateData: D, + timeout: Timeout = None) = { + currentState = State(stateName, stateData, timeout) + } + + /** + * Produce transition to other state. Return this from a state function in + * order to effect the transition. + * + * @param nextStateName state designator for the next state + * @return state transition descriptor + */ + protected final def goto(nextStateName: S): State = { + State(nextStateName, currentState.stateData) + } + + /** + * Produce "empty" transition descriptor. Return this from a state function + * when no state change is to be effected. + * + * @return descriptor for staying in current state + */ + protected final def stay(): State = { + // cannot directly use currentState because of the timeout field + goto(currentState.stateName) + } + + /** + * Produce change descriptor to stop this FSM actor with reason "Normal". + */ + protected final def stop(): State = { + stop(Normal) + } + + /** + * Produce change descriptor to stop this FSM actor including specified reason. + */ + protected final def stop(reason: Reason): State = { + stop(reason, currentState.stateData) + } + + /** + * Produce change descriptor to stop this FSM actor including specified reason. + */ + protected final def stop(reason: Reason, stateData: D): State = { + stay using stateData withStopReason (reason) + } + + /** + * Schedule named timer to deliver message after given delay, possibly repeating. + * @param name identifier to be used with cancelTimer() + * @param msg message to be delivered + * @param timeout delay of first message delivery and between subsequent messages + * @param repeat send once if false, scheduleAtFixedRate if true + * @return current state descriptor + */ + protected final def setTimer(name: String, msg: AnyRef, timeout: Duration, repeat: Boolean): State = { + if (timers contains name) { + timers(name).cancel + } + val timer = Timer(name, msg, repeat, timerGen.next) + timer.schedule(self, timeout) + timers(name) = timer + stay + } + + /** + * Cancel named timer, ensuring that the message is not subsequently delivered (no race). + * @param name of the timer to cancel + */ + protected final def cancelTimer(name: String) = { + if (timers contains name) { + timers(name).cancel + timers -= name + } + } + + /** + * Inquire whether the named timer is still active. Returns true unless the + * timer does not exist, has previously been canceled or if it was a + * single-shot timer whose message was already received. + */ + protected final def timerActive_?(name: String) = timers contains name + + /** + * Set state timeout explicitly. This method can safely be used from within a + * state handler. + */ + protected final def setStateTimeout(state: S, timeout: Timeout) { + stateTimeouts(state) = timeout + } + + /** + * Set handler which is called upon each state transition, i.e. not when + * staying in the same state. This may use the pair extractor defined in the + * FSM companion object like so: + * + * <pre> + * onTransition { + * case Old -> New => doSomething + * } + * </pre> + * + * It is also possible to supply a 2-ary function object: + * + * <pre> + * onTransition(handler _) + * + * private def handler(from: S, to: S) { ... } + * </pre> + * + * The underscore is unfortunately necessary to enable the nicer syntax shown + * above (it uses the implicit conversion total2pf under the hood). + * + * <b>Multiple handlers may be installed, and every one of them will be + * called, not only the first one matching.</b> + */ + protected final def onTransition(transitionHandler: TransitionHandler) { + transitionEvent :+= transitionHandler + } + + /** + * Convenience wrapper for using a total function instead of a partial + * function literal. To be used with onTransition. + */ + implicit protected final def total2pf(transitionHandler: (S, S) => Unit) = + new PartialFunction[(S, S), Unit] { + def isDefinedAt(in: (S, S)) = true + def apply(in: (S, S)) { transitionHandler(in._1, in._2) } + } + + /** + * Set handler which is called upon termination of this FSM actor. + */ + protected final def onTermination(terminationHandler: PartialFunction[StopEvent[S, D], Unit]) = { + terminateEvent = terminationHandler + } + + /** + * Set handler which is called upon reception of unhandled messages. + */ + protected final def whenUnhandled(stateFunction: StateFunction) = { + handleEvent = stateFunction orElse handleEventDefault + } + + /** + * Verify existence of initial state and setup timers. This should be the + * last call within the constructor. + */ + def initialize { + makeTransition(currentState) + } + + /** + * **************************************************************** + * PRIVATE IMPLEMENTATION DETAILS + * **************************************************************** + */ + + /* + * FSM State data and current timeout handling + */ + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + private var generation: Long = 0L + + /* + * Timer handling + */ + private val timers = mutable.Map[String, Timer]() + private val timerGen = Iterator from 0 + + /* + * State definitions + */ + private val stateFunctions = mutable.Map[S, StateFunction]() + private val stateTimeouts = mutable.Map[S, Timeout]() + + private def register(name: S, function: StateFunction, timeout: Timeout) { + if (stateFunctions contains name) { + stateFunctions(name) = stateFunctions(name) orElse function + stateTimeouts(name) = timeout orElse stateTimeouts(name) + } else { + stateFunctions(name) = function + stateTimeouts(name) = timeout + } + } + + /* + * unhandled event handler + */ + private val handleEventDefault: StateFunction = { + case Event(value, stateData) => + stay + } + private var handleEvent: StateFunction = handleEventDefault + + /* + * termination handling + */ + private var terminateEvent: PartialFunction[StopEvent[S, D], Unit] = { + case StopEvent(Failure(cause), _, _) => + case StopEvent(reason, _, _) => + } + + /* + * transition handling + */ + private var transitionEvent: List[TransitionHandler] = Nil + private def handleTransition(prev: S, next: S) { + val tuple = (prev, next) + for (te ← transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) } + } + + // ListenerManagement shall not start() or stop() listener actors + override protected val manageLifeCycleOfListeners = false + + /** + * ******************************************* + * Main actor receive() method + * ******************************************* + */ + override final protected def receive: Receive = { + case TimeoutMarker(gen) => + if (generation == gen) { + processEvent(StateTimeout) + } + case t@Timer(name, msg, repeat, generation) => + if ((timers contains name) && (timers(name).generation == generation)) { + processEvent(msg) + if (!repeat) { + timers -= name + } + } + case SubscribeTransitionCallBack(actorRef) => + addListener(actorRef) + // send current state back as reference point + try { + actorRef ! CurrentState(self, currentState.stateName) + } catch { + case e: ActorInitializationException => + EventHandler.warning(this, "trying to register not running listener") + } + case UnsubscribeTransitionCallBack(actorRef) => + removeListener(actorRef) + case value => { + if (timeoutFuture.isDefined) { + timeoutFuture.get.cancel(true) + timeoutFuture = None + } + generation += 1 + processEvent(value) + } + } + + private def processEvent(value: Any) = { + val event = Event(value, currentState.stateData) + val stateFunc = stateFunctions(currentState.stateName) + val nextState = if (stateFunc isDefinedAt event) { + stateFunc(event) + } else { + // handleEventDefault ensures that this is always defined + handleEvent(event) + } + nextState.stopReason match { + case Some(reason) => terminate(reason) + case None => makeTransition(nextState) + } + } + + private def makeTransition(nextState: State) = { + if (!stateFunctions.contains(nextState.stateName)) { + terminate(Failure("Next state %s does not exist".format(nextState.stateName))) + } else { + if (currentState.stateName != nextState.stateName) { + handleTransition(currentState.stateName, nextState.stateName) + notifyListeners(Transition(self, currentState.stateName, nextState.stateName)) + } + applyState(nextState) + } + } + + private def applyState(nextState: State) = { + currentState = nextState + val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName) + if (timeout.isDefined) { + val t = timeout.get + if (t.finite_? && t.length >= 0) { + timeoutFuture = Some(Scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) + } + } + } + + private def terminate(reason: Reason) = { + terminateEvent.apply(StopEvent(reason, currentState.stateName, currentState.stateData)) + self.stop() + } + + case class Event[D](event: Any, stateData: D) + object Ev { + def unapply[D](e: Event[D]): Option[Any] = Some(e.event) + } + + case class State(stateName: S, stateData: D, timeout: Timeout = None) { + + /** + * Modify state transition descriptor to include a state timeout for the + * next state. This timeout overrides any default timeout set for the next + * state. + */ + def forMax(timeout: Duration): State = { + copy(timeout = Some(timeout)) + } + + /** + * Send reply to sender of the current message, if available. + * + * @return this state transition descriptor + */ + def replying(replyValue: Any): State = { + self.sender match { + case Some(sender) => sender ! replyValue + case None => + } + this + } + + /** + * Modify state transition descriptor with new state data. The data will be + * set when transitioning to the new state. + */ + def using(nextStateDate: D): State = { + copy(stateData = nextStateDate) + } + + private[akka] var stopReason: Option[Reason] = None + + private[akka] def withStopReason(reason: Reason): State = { + stopReason = Some(reason) + this + } + } + + case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) +} |