summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/routing
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/routing')
-rw-r--r--test/disabled/presentation/akka/src/akka/routing/Iterators.scala49
-rw-r--r--test/disabled/presentation/akka/src/akka/routing/Listeners.scala37
-rw-r--r--test/disabled/presentation/akka/src/akka/routing/Pool.scala292
-rw-r--r--test/disabled/presentation/akka/src/akka/routing/Routers.scala87
-rw-r--r--test/disabled/presentation/akka/src/akka/routing/Routing.scala64
5 files changed, 0 insertions, 529 deletions
diff --git a/test/disabled/presentation/akka/src/akka/routing/Iterators.scala b/test/disabled/presentation/akka/src/akka/routing/Iterators.scala
deleted file mode 100644
index 315e7bea51..0000000000
--- a/test/disabled/presentation/akka/src/akka/routing/Iterators.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.routing
-
-import akka.actor.ActorRef
-import scala.collection.JavaConversions._
-import scala.collection.immutable.Seq
-
-/**
- * An Iterator that is either always empty or yields an infinite number of Ts.
- */
-trait InfiniteIterator[T] extends Iterator[T] {
- val items: Seq[T]
-}
-
-/**
- * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
- */
-case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
- def this(items: java.util.List[T]) = this(items.toList)
-
- @volatile
- private[this] var current: Seq[T] = items
-
- def hasNext = items != Nil
-
- def next = {
- val nc = if (current == Nil) items else current
- current = nc.tail
- nc.head
- }
-
- override def exists(f: T => Boolean): Boolean = items.exists(f)
-}
-
-/**
- * This InfiniteIterator always returns the Actor that has the currently smallest mailbox
- * useful for work-stealing.
- */
-case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
- def this(items: java.util.List[ActorRef]) = this(items.toList)
- def hasNext = items != Nil
-
- def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
-
- override def exists(f: ActorRef => Boolean): Boolean = items.exists(f)
-}
diff --git a/test/disabled/presentation/akka/src/akka/routing/Listeners.scala b/test/disabled/presentation/akka/src/akka/routing/Listeners.scala
deleted file mode 100644
index 04f6c1259f..0000000000
--- a/test/disabled/presentation/akka/src/akka/routing/Listeners.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.routing
-
-import akka.actor.{ Actor, ActorRef }
-import java.util.concurrent.ConcurrentSkipListSet
-import scala.collection.JavaConversions._
-
-sealed trait ListenerMessage
-case class Listen(listener: ActorRef) extends ListenerMessage
-case class Deafen(listener: ActorRef) extends ListenerMessage
-case class WithListeners(f: (ActorRef) => Unit) extends ListenerMessage
-
-/**
- * Listeners is a generic trait to implement listening capability on an Actor.
- * <p/>
- * Use the <code>gossip(msg)</code> method to have it sent to the listeners.
- * <p/>
- * Send <code>Listen(self)</code> to start listening.
- * <p/>
- * Send <code>Deafen(self)</code> to stop listening.
- * <p/>
- * Send <code>WithListeners(fun)</code> to traverse the current listeners.
- */
-trait Listeners { self: Actor =>
- private val listeners = new ConcurrentSkipListSet[ActorRef]
-
- protected def listenerManagement: Receive = {
- case Listen(l) => listeners add l
- case Deafen(l) => listeners remove l
- case WithListeners(f) => listeners foreach f
- }
-
- protected def gossip(msg: Any) = listeners foreach (_ ! msg)
-}
diff --git a/test/disabled/presentation/akka/src/akka/routing/Pool.scala b/test/disabled/presentation/akka/src/akka/routing/Pool.scala
deleted file mode 100644
index d972bb84c8..0000000000
--- a/test/disabled/presentation/akka/src/akka/routing/Pool.scala
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.routing
-
-import akka.actor.{ Actor, ActorRef, PoisonPill }
-import java.util.concurrent.TimeUnit
-
-/**
- * Actor pooling
- *
- * An actor pool is an message router for a set of delegate actors. The pool is an actor itself.
- * There are a handful of basic concepts that need to be understood when working with and defining your pool.
- *
- * Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message.
- * Capacitors - A capacitor is a trait that influences the size of pool. There are effectively two types.
- * The first determines the size itself - either fixed or bounded.
- * The second determines how to adjust of the pool according to some internal pressure characteristic.
- * Filters - A filter can be used to refine the raw pressure value returned from a capacitor.
- *
- * It should be pointed out that all actors in the pool are treated as essentially equivalent. This is not to say
- * that one couldn't instance different classes within the pool, only that the pool, when selecting and routing,
- * will not take any type information into consideration.
- *
- * @author Garrick Evans
- */
-
-object ActorPool {
- case object Stat
- case class Stats(size: Int)
-}
-
-/**
- * Defines the nature of an actor pool.
- */
-trait ActorPool {
- def instance(): ActorRef //Question, Instance of what?
- def capacity(delegates: Seq[ActorRef]): Int //Question, What is the semantics of this return value?
- def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] //Question, Why does select return this instead of an ordered Set?
-}
-
-/**
- * A default implementation of a pool, on each message to route,
- * - checks the current capacity and adjusts accordingly if needed
- * - routes the incoming message to a selection set of delegate actors
- */
-trait DefaultActorPool extends ActorPool { this: Actor =>
- import ActorPool._
- import collection.mutable.LinkedList
- import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
-
- protected var _delegates = Vector[ActorRef]()
- private var _lastCapacityChange = 0
- private var _lastSelectorCount = 0
-
- override def postStop() = _delegates foreach { delegate =>
- try {
- delegate ! PoisonPill
- } catch { case e: Exception => } //Ignore any exceptions here
- }
-
- protected def _route(): Receive = {
- // for testing...
- case Stat =>
- self reply_? Stats(_delegates length)
- case max: MaximumNumberOfRestartsWithinTimeRangeReached =>
- _delegates = _delegates filterNot { _.uuid == max.victim.uuid }
- case msg =>
- resizeIfAppropriate()
-
- select(_delegates) match {
- case (selectedDelegates, count) =>
- _lastSelectorCount = count
- selectedDelegates foreach { _ forward msg } //Should we really send the same message to several actors?
- }
- }
-
- private def resizeIfAppropriate() {
- val requestedCapacity = capacity(_delegates)
- val newDelegates = requestedCapacity match {
- case qty if qty > 0 =>
- _delegates ++ {
- for (i ← 0 until requestedCapacity) yield {
- val delegate = instance()
- self startLink delegate
- delegate
- }
- }
- case qty if qty < 0 =>
- _delegates.splitAt(_delegates.length + requestedCapacity) match {
- case (keep, abandon) =>
- abandon foreach { _ ! PoisonPill }
- keep
- }
- case _ => _delegates //No change
- }
-
- _lastCapacityChange = requestedCapacity
- _delegates = newDelegates
- }
-}
-
-/**
- * Selectors
- * These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
- */
-
-/**
- * Returns the set of delegates with the least amount of message backlog.
- */
-trait SmallestMailboxSelector {
- def selectionCount: Int
- def partialFill: Boolean
-
- def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
- var set: Seq[ActorRef] = Nil
- var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
-
- while (take > 0) {
- set = delegates.sortWith(_.mailboxSize < _.mailboxSize).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times?
- take -= set.size
- }
-
- (set.iterator, set.size)
- }
-}
-
-/**
- * Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection
- */
-trait RoundRobinSelector {
- private var _last: Int = -1;
-
- def selectionCount: Int
- def partialFill: Boolean
-
- def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
- val length = delegates.length
- val take = if (partialFill) math.min(selectionCount, length)
- else selectionCount
-
- val set =
- for (i ← 0 until take) yield {
- _last = (_last + 1) % length
- delegates(_last)
- }
-
- (set.iterator, set.size)
- }
-}
-
-/**
- * Capacitors
- * These traits define how to alter the size of the pool
- */
-
-/**
- * Ensures a fixed number of delegates in the pool
- */
-trait FixedSizeCapacitor {
- def limit: Int
- def capacity(delegates: Seq[ActorRef]): Int = (limit - delegates.size) max 0
-}
-
-/**
- * Constrains the pool capacity to a bounded range
- */
-trait BoundedCapacitor {
- def lowerBound: Int
- def upperBound: Int
-
- def capacity(delegates: Seq[ActorRef]): Int = {
- val current = delegates length
- val delta = _eval(delegates)
- val proposed = current + delta
-
- if (proposed < lowerBound) delta + (lowerBound - proposed)
- else if (proposed > upperBound) delta - (proposed - upperBound)
- else delta
- }
-
- protected def _eval(delegates: Seq[ActorRef]): Int
-}
-
-/**
- * Returns the number of delegates required to manage the current message backlogs
- */
-trait MailboxPressureCapacitor {
- def pressureThreshold: Int
- def pressure(delegates: Seq[ActorRef]): Int =
- delegates count { _.mailboxSize > pressureThreshold }
-}
-
-/**
- * Returns the number of delegates required to respond to the number of pending futures
- */
-trait ActiveFuturesPressureCapacitor {
- def pressure(delegates: Seq[ActorRef]): Int =
- delegates count { _.senderFuture.isDefined }
-}
-
-/**
- */
-trait CapacityStrategy {
- import ActorPool._
-
- def pressure(delegates: Seq[ActorRef]): Int
- def filter(pressure: Int, capacity: Int): Int
-
- protected def _eval(delegates: Seq[ActorRef]): Int = filter(pressure(delegates), delegates.size)
-}
-
-trait FixedCapacityStrategy extends FixedSizeCapacitor
-trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor
-
-/**
- * Filters
- * These traits refine the raw pressure reading into a more appropriate capacity delta.
- */
-
-/**
- * The basic filter trait that composes ramp-up and back-off subfiltering.
- */
-trait Filter {
- def rampup(pressure: Int, capacity: Int): Int
- def backoff(pressure: Int, capacity: Int): Int
-
- // pass through both filters just to be sure any internal counters
- // are updated consistently. ramping up is always + and backing off
- // is always - and each should return 0 otherwise...
- def filter(pressure: Int, capacity: Int): Int =
- rampup(pressure, capacity) + backoff(pressure, capacity)
-}
-
-trait BasicFilter extends Filter with BasicRampup with BasicBackoff
-
-/**
- * Filter performs steady incremental growth using only the basic ramp-up subfilter
- */
-trait BasicNoBackoffFilter extends BasicRampup {
- def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity)
-}
-
-/**
- * Basic incremental growth as a percentage of the current pool capacity
- */
-trait BasicRampup {
- def rampupRate: Double
-
- def rampup(pressure: Int, capacity: Int): Int =
- if (pressure < capacity) 0 else math.ceil(rampupRate * capacity) toInt
-}
-
-/**
- * Basic decrement as a percentage of the current pool capacity
- */
-trait BasicBackoff {
- def backoffThreshold: Double
- def backoffRate: Double
-
- def backoff(pressure: Int, capacity: Int): Int =
- if (capacity > 0 && pressure / capacity < backoffThreshold) math.ceil(-1.0 * backoffRate * capacity) toInt else 0
-}
-/**
- * This filter tracks the average pressure over the lifetime of the pool (or since last reset) and
- * will begin to reduce capacity once this value drops below the provided threshold. The number of
- * delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied
- * by the difference in capacity and pressure.
- */
-trait RunningMeanBackoff {
- def backoffThreshold: Double
- def backoffRate: Double
-
- private var _pressure: Double = 0.0
- private var _capacity: Double = 0.0
-
- def backoff(pressure: Int, capacity: Int): Int = {
- _pressure += pressure
- _capacity += capacity
-
- if (capacity > 0 && pressure / capacity < backoffThreshold
- && _capacity > 0 && _pressure / _capacity < backoffThreshold) //Why does the entire clause need to be true?
- math.floor(-1.0 * backoffRate * (capacity - pressure)).toInt
- else 0
- }
-
- def backoffReset {
- _pressure = 0.0
- _capacity = 0.0
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/routing/Routers.scala b/test/disabled/presentation/akka/src/akka/routing/Routers.scala
deleted file mode 100644
index a4c34c5c67..0000000000
--- a/test/disabled/presentation/akka/src/akka/routing/Routers.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.routing
-
-import akka.actor.{ UntypedActor, Actor, ActorRef }
-
-/**
- * A Dispatcher is a trait whose purpose is to route incoming messages to actors.
- */
-trait Dispatcher { this: Actor =>
-
- protected def transform(msg: Any): Any = msg
-
- protected def routes: PartialFunction[Any, ActorRef]
-
- protected def broadcast(message: Any) {}
-
- protected def dispatch: Receive = {
- case Routing.Broadcast(message) =>
- broadcast(message)
- case a if routes.isDefinedAt(a) =>
- if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
- else routes(a).!(transform(a))(None)
- }
-
- def receive = dispatch
-
- private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
-}
-
-/**
- * An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors.
- */
-abstract class UntypedDispatcher extends UntypedActor {
- protected def transform(msg: Any): Any = msg
-
- protected def route(msg: Any): ActorRef
-
- protected def broadcast(message: Any) {}
-
- private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
-
- @throws(classOf[Exception])
- def onReceive(msg: Any): Unit = {
- if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
- else {
- val r = route(msg)
- if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
- if (isSenderDefined) r.forward(transform(msg))(someSelf)
- else r.!(transform(msg))(None)
- }
- }
-}
-
-/**
- * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
- * to dispatch incoming messages to.
- */
-trait LoadBalancer extends Dispatcher { self: Actor =>
- protected def seq: InfiniteIterator[ActorRef]
-
- protected def routes = {
- case x if seq.hasNext => seq.next
- }
-
- override def broadcast(message: Any) = seq.items.foreach(_ ! message)
-
- override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
-}
-
-/**
- * A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets
- * to dispatch incoming messages to.
- */
-abstract class UntypedLoadBalancer extends UntypedDispatcher {
- protected def seq: InfiniteIterator[ActorRef]
-
- protected def route(msg: Any) =
- if (seq.hasNext) seq.next
- else null
-
- override def broadcast(message: Any) = seq.items.foreach(_ ! message)
-
- override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
-}
diff --git a/test/disabled/presentation/akka/src/akka/routing/Routing.scala b/test/disabled/presentation/akka/src/akka/routing/Routing.scala
deleted file mode 100644
index befc124248..0000000000
--- a/test/disabled/presentation/akka/src/akka/routing/Routing.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.routing
-
-import akka.actor.{ Actor, ActorRef }
-import akka.actor.Actor._
-
-object Routing {
-
- sealed trait RoutingMessage
- case class Broadcast(message: Any) extends RoutingMessage
-
- type PF[A, B] = PartialFunction[A, B]
-
- /**
- * Creates a new PartialFunction whose isDefinedAt is a combination
- * of the two parameters, and whose apply is first to call filter.apply
- * and then filtered.apply.
- */
- def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
- case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
- filter(a)
- filtered(a)
- }
-
- /**
- * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true.
- */
- def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
- filter({ case a if a.isInstanceOf[A] => interceptor(a) }, interceptee)
-
- /**
- * Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
- */
- def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
- actorOf(new Actor with LoadBalancer {
- val seq = actors
- }).start()
-
- /**
- * Creates a Dispatcher given a routing and a message-transforming function.
- */
- def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
- actorOf(new Actor with Dispatcher {
- override def transform(msg: Any) = msgTransformer(msg)
- def routes = routing
- }).start()
-
- /**
- * Creates a Dispatcher given a routing.
- */
- def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
- def routes = routing
- }).start()
-
- /**
- * Creates an actor that pipes all incoming messages to
- * both another actor and through the supplied function
- */
- def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef =
- dispatcherActor({ case _ => actorToLog }, logger)
-}