diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/routing')
5 files changed, 0 insertions, 529 deletions
diff --git a/test/disabled/presentation/akka/src/akka/routing/Iterators.scala b/test/disabled/presentation/akka/src/akka/routing/Iterators.scala deleted file mode 100644 index 315e7bea51..0000000000 --- a/test/disabled/presentation/akka/src/akka/routing/Iterators.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.routing - -import akka.actor.ActorRef -import scala.collection.JavaConversions._ -import scala.collection.immutable.Seq - -/** - * An Iterator that is either always empty or yields an infinite number of Ts. - */ -trait InfiniteIterator[T] extends Iterator[T] { - val items: Seq[T] -} - -/** - * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List. - */ -case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { - def this(items: java.util.List[T]) = this(items.toList) - - @volatile - private[this] var current: Seq[T] = items - - def hasNext = items != Nil - - def next = { - val nc = if (current == Nil) items else current - current = nc.tail - nc.head - } - - override def exists(f: T => Boolean): Boolean = items.exists(f) -} - -/** - * This InfiniteIterator always returns the Actor that has the currently smallest mailbox - * useful for work-stealing. - */ -case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] { - def this(items: java.util.List[ActorRef]) = this(items.toList) - def hasNext = items != Nil - - def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) - - override def exists(f: ActorRef => Boolean): Boolean = items.exists(f) -} diff --git a/test/disabled/presentation/akka/src/akka/routing/Listeners.scala b/test/disabled/presentation/akka/src/akka/routing/Listeners.scala deleted file mode 100644 index 04f6c1259f..0000000000 --- a/test/disabled/presentation/akka/src/akka/routing/Listeners.scala +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.routing - -import akka.actor.{ Actor, ActorRef } -import java.util.concurrent.ConcurrentSkipListSet -import scala.collection.JavaConversions._ - -sealed trait ListenerMessage -case class Listen(listener: ActorRef) extends ListenerMessage -case class Deafen(listener: ActorRef) extends ListenerMessage -case class WithListeners(f: (ActorRef) => Unit) extends ListenerMessage - -/** - * Listeners is a generic trait to implement listening capability on an Actor. - * <p/> - * Use the <code>gossip(msg)</code> method to have it sent to the listeners. - * <p/> - * Send <code>Listen(self)</code> to start listening. - * <p/> - * Send <code>Deafen(self)</code> to stop listening. - * <p/> - * Send <code>WithListeners(fun)</code> to traverse the current listeners. - */ -trait Listeners { self: Actor => - private val listeners = new ConcurrentSkipListSet[ActorRef] - - protected def listenerManagement: Receive = { - case Listen(l) => listeners add l - case Deafen(l) => listeners remove l - case WithListeners(f) => listeners foreach f - } - - protected def gossip(msg: Any) = listeners foreach (_ ! msg) -} diff --git a/test/disabled/presentation/akka/src/akka/routing/Pool.scala b/test/disabled/presentation/akka/src/akka/routing/Pool.scala deleted file mode 100644 index d972bb84c8..0000000000 --- a/test/disabled/presentation/akka/src/akka/routing/Pool.scala +++ /dev/null @@ -1,292 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.routing - -import akka.actor.{ Actor, ActorRef, PoisonPill } -import java.util.concurrent.TimeUnit - -/** - * Actor pooling - * - * An actor pool is an message router for a set of delegate actors. The pool is an actor itself. - * There are a handful of basic concepts that need to be understood when working with and defining your pool. - * - * Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message. - * Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types. - * The first determines the size itself - either fixed or bounded. - * The second determines how to adjust of the pool according to some internal pressure characteristic. - * Filters - A filter can be used to refine the raw pressure value returned from a capacitor. - * - * It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say - * that one couldn't instance different classes within the pool, only that the pool, when selecting and routing, - * will not take any type information into consideration. - * - * @author Garrick Evans - */ - -object ActorPool { - case object Stat - case class Stats(size: Int) -} - -/** - * Defines the nature of an actor pool. - */ -trait ActorPool { - def instance(): ActorRef //Question, Instance of what? - def capacity(delegates: Seq[ActorRef]): Int //Question, What is the semantics of this return value? - def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] //Question, Why does select return this instead of an ordered Set? -} - -/** - * A default implementation of a pool, on each message to route, - * - checks the current capacity and adjusts accordingly if needed - * - routes the incoming message to a selection set of delegate actors - */ -trait DefaultActorPool extends ActorPool { this: Actor => - import ActorPool._ - import collection.mutable.LinkedList - import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached - - protected var _delegates = Vector[ActorRef]() - private var _lastCapacityChange = 0 - private var _lastSelectorCount = 0 - - override def postStop() = _delegates foreach { delegate => - try { - delegate ! PoisonPill - } catch { case e: Exception => } //Ignore any exceptions here - } - - protected def _route(): Receive = { - // for testing... - case Stat => - self reply_? Stats(_delegates length) - case max: MaximumNumberOfRestartsWithinTimeRangeReached => - _delegates = _delegates filterNot { _.uuid == max.victim.uuid } - case msg => - resizeIfAppropriate() - - select(_delegates) match { - case (selectedDelegates, count) => - _lastSelectorCount = count - selectedDelegates foreach { _ forward msg } //Should we really send the same message to several actors? - } - } - - private def resizeIfAppropriate() { - val requestedCapacity = capacity(_delegates) - val newDelegates = requestedCapacity match { - case qty if qty > 0 => - _delegates ++ { - for (i ← 0 until requestedCapacity) yield { - val delegate = instance() - self startLink delegate - delegate - } - } - case qty if qty < 0 => - _delegates.splitAt(_delegates.length + requestedCapacity) match { - case (keep, abandon) => - abandon foreach { _ ! PoisonPill } - keep - } - case _ => _delegates //No change - } - - _lastCapacityChange = requestedCapacity - _delegates = newDelegates - } -} - -/** - * Selectors - * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool - */ - -/** - * Returns the set of delegates with the least amount of message backlog. - */ -trait SmallestMailboxSelector { - def selectionCount: Int - def partialFill: Boolean - - def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = { - var set: Seq[ActorRef] = Nil - var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount - - while (take > 0) { - set = delegates.sortWith(_.mailboxSize < _.mailboxSize).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times? - take -= set.size - } - - (set.iterator, set.size) - } -} - -/** - * Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection - */ -trait RoundRobinSelector { - private var _last: Int = -1; - - def selectionCount: Int - def partialFill: Boolean - - def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = { - val length = delegates.length - val take = if (partialFill) math.min(selectionCount, length) - else selectionCount - - val set = - for (i ← 0 until take) yield { - _last = (_last + 1) % length - delegates(_last) - } - - (set.iterator, set.size) - } -} - -/** - * Capacitors - * These traits define how to alter the size of the pool - */ - -/** - * Ensures a fixed number of delegates in the pool - */ -trait FixedSizeCapacitor { - def limit: Int - def capacity(delegates: Seq[ActorRef]): Int = (limit - delegates.size) max 0 -} - -/** - * Constrains the pool capacity to a bounded range - */ -trait BoundedCapacitor { - def lowerBound: Int - def upperBound: Int - - def capacity(delegates: Seq[ActorRef]): Int = { - val current = delegates length - val delta = _eval(delegates) - val proposed = current + delta - - if (proposed < lowerBound) delta + (lowerBound - proposed) - else if (proposed > upperBound) delta - (proposed - upperBound) - else delta - } - - protected def _eval(delegates: Seq[ActorRef]): Int -} - -/** - * Returns the number of delegates required to manage the current message backlogs - */ -trait MailboxPressureCapacitor { - def pressureThreshold: Int - def pressure(delegates: Seq[ActorRef]): Int = - delegates count { _.mailboxSize > pressureThreshold } -} - -/** - * Returns the number of delegates required to respond to the number of pending futures - */ -trait ActiveFuturesPressureCapacitor { - def pressure(delegates: Seq[ActorRef]): Int = - delegates count { _.senderFuture.isDefined } -} - -/** - */ -trait CapacityStrategy { - import ActorPool._ - - def pressure(delegates: Seq[ActorRef]): Int - def filter(pressure: Int, capacity: Int): Int - - protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size) -} - -trait FixedCapacityStrategy extends FixedSizeCapacitor -trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor - -/** - * Filters - * These traits refine the raw pressure reading into a more appropriate capacity delta. - */ - -/** - * The basic filter trait that composes ramp-up and back-off subfiltering. - */ -trait Filter { - def rampup(pressure: Int, capacity: Int): Int - def backoff(pressure: Int, capacity: Int): Int - - // pass through both filters just to be sure any internal counters - // are updated consistently. ramping up is always + and backing off - // is always - and each should return 0 otherwise... - def filter(pressure: Int, capacity: Int): Int = - rampup(pressure, capacity) + backoff(pressure, capacity) -} - -trait BasicFilter extends Filter with BasicRampup with BasicBackoff - -/** - * Filter performs steady incremental growth using only the basic ramp-up subfilter - */ -trait BasicNoBackoffFilter extends BasicRampup { - def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity) -} - -/** - * Basic incremental growth as a percentage of the current pool capacity - */ -trait BasicRampup { - def rampupRate: Double - - def rampup(pressure: Int, capacity: Int): Int = - if (pressure < capacity) 0 else math.ceil(rampupRate * capacity) toInt -} - -/** - * Basic decrement as a percentage of the current pool capacity - */ -trait BasicBackoff { - def backoffThreshold: Double - def backoffRate: Double - - def backoff(pressure: Int, capacity: Int): Int = - if (capacity > 0 && pressure / capacity < backoffThreshold) math.ceil(-1.0 * backoffRate * capacity) toInt else 0 -} -/** - * This filter tracks the average pressure over the lifetime of the pool (or since last reset) and - * will begin to reduce capacity once this value drops below the provided threshold. The number of - * delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied - * by the difference in capacity and pressure. - */ -trait RunningMeanBackoff { - def backoffThreshold: Double - def backoffRate: Double - - private var _pressure: Double = 0.0 - private var _capacity: Double = 0.0 - - def backoff(pressure: Int, capacity: Int): Int = { - _pressure += pressure - _capacity += capacity - - if (capacity > 0 && pressure / capacity < backoffThreshold - && _capacity > 0 && _pressure / _capacity < backoffThreshold) //Why does the entire clause need to be true? - math.floor(-1.0 * backoffRate * (capacity - pressure)).toInt - else 0 - } - - def backoffReset { - _pressure = 0.0 - _capacity = 0.0 - } -} diff --git a/test/disabled/presentation/akka/src/akka/routing/Routers.scala b/test/disabled/presentation/akka/src/akka/routing/Routers.scala deleted file mode 100644 index a4c34c5c67..0000000000 --- a/test/disabled/presentation/akka/src/akka/routing/Routers.scala +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.routing - -import akka.actor.{ UntypedActor, Actor, ActorRef } - -/** - * A Dispatcher is a trait whose purpose is to route incoming messages to actors. - */ -trait Dispatcher { this: Actor => - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, ActorRef] - - protected def broadcast(message: Any) {} - - protected def dispatch: Receive = { - case Routing.Broadcast(message) => - broadcast(message) - case a if routes.isDefinedAt(a) => - if (isSenderDefined) routes(a).forward(transform(a))(someSelf) - else routes(a).!(transform(a))(None) - } - - def receive = dispatch - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined -} - -/** - * An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors. - */ -abstract class UntypedDispatcher extends UntypedActor { - protected def transform(msg: Any): Any = msg - - protected def route(msg: Any): ActorRef - - protected def broadcast(message: Any) {} - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined - - @throws(classOf[Exception]) - def onReceive(msg: Any): Unit = { - if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message) - else { - val r = route(msg) - if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") - if (isSenderDefined) r.forward(transform(msg))(someSelf) - else r.!(transform(msg))(None) - } - } -} - -/** - * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[ActorRef] - - protected def routes = { - case x if seq.hasNext => seq.next - } - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) -} - -/** - * A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -abstract class UntypedLoadBalancer extends UntypedDispatcher { - protected def seq: InfiniteIterator[ActorRef] - - protected def route(msg: Any) = - if (seq.hasNext) seq.next - else null - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) -} diff --git a/test/disabled/presentation/akka/src/akka/routing/Routing.scala b/test/disabled/presentation/akka/src/akka/routing/Routing.scala deleted file mode 100644 index befc124248..0000000000 --- a/test/disabled/presentation/akka/src/akka/routing/Routing.scala +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.routing - -import akka.actor.{ Actor, ActorRef } -import akka.actor.Actor._ - -object Routing { - - sealed trait RoutingMessage - case class Broadcast(message: Any) extends RoutingMessage - - type PF[A, B] = PartialFunction[A, B] - - /** - * Creates a new PartialFunction whose isDefinedAt is a combination - * of the two parameters, and whose apply is first to call filter.apply - * and then filtered.apply. - */ - def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { - case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) => - filter(a) - filtered(a) - } - - /** - * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true. - */ - def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = - filter({ case a if a.isInstanceOf[A] => interceptor(a) }, interceptee) - - /** - * Creates a LoadBalancer from the thunk-supplied InfiniteIterator. - */ - def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef = - actorOf(new Actor with LoadBalancer { - val seq = actors - }).start() - - /** - * Creates a Dispatcher given a routing and a message-transforming function. - */ - def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef = - actorOf(new Actor with Dispatcher { - override def transform(msg: Any) = msgTransformer(msg) - def routes = routing - }).start() - - /** - * Creates a Dispatcher given a routing. - */ - def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher { - def routes = routing - }).start() - - /** - * Creates an actor that pipes all incoming messages to - * both another actor and through the supplied function - */ - def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef = - dispatcherActor({ case _ => actorToLog }, logger) -} |