summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/routing/Pool.scala
blob: d972bb84c855c11a501fa281e4989d3eaa12483d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.routing

import akka.actor.{ Actor, ActorRef, PoisonPill }
import java.util.concurrent.TimeUnit

/**
 * Actor pooling
 *
 * An actor pool is a message router for a set of delegate actors. The pool is an actor itself.
 * There are a handful of basic concepts that need to be understood when working with and defining your pool.
 *
 * Selectors - A selector is a trait that determines how and how many pooled actors will receive an incoming message.
 * Capacitors - A capacitor is a trait that influences the size of the pool.  There are effectively two types.
 *                              The first determines the size itself - either fixed or bounded.
 *                              The second determines how to adjust the size of the pool according to some internal pressure characteristic.
 * Filters - A filter can be used to refine the raw pressure value returned from a capacitor.
 *
 * It should be pointed out that all actors in the pool are treated as essentially equivalent.  This is not to say
 * that one couldn't instance different classes within the pool, only that the pool, when selecting and routing,
 * will not take any type information into consideration.
 *
 * @author Garrick Evans
 */

object ActorPool {
  /** Message asking a pool for its current size; the default pool answers it with a [[Stats]]. */
  case object Stat
  /** Reply to [[Stat]]: `size` is the number of delegates the pool currently holds. */
  case class Stats(size: Int)
}

/**
 * Defines the nature of an actor pool: how delegates are created, how the pool
 * is sized, and how delegates are picked for an incoming message.
 */
trait ActorPool {
  /**
   * Creates a new delegate. The default pool calls this once for every
   * delegate it adds and then start-links the returned reference.
   */
  def instance(): ActorRef

  /**
   * Returns a capacity delta for the given delegates. The default pool adds
   * that many delegates when the value is positive, drops that many when it is
   * negative, and leaves the pool unchanged when it is zero.
   */
  def capacity(delegates: Seq[ActorRef]): Int

  /**
   * Picks the delegates that should receive the current message, paired with
   * the number of delegates picked.
   *
   * Open question kept from the original author: why does select return this
   * pair instead of an ordered Set?
   */
  def select(delegates: Seq[ActorRef]): (Iterator[ActorRef], Int)
}

/**
 * A default implementation of a pool. For every message that is not handled
 * specially it
 *      - checks the current capacity and adjusts accordingly if needed
 *      - routes the incoming message to a selection set of delegate actors
 */
trait DefaultActorPool extends ActorPool { this: Actor =>
  import ActorPool._
  import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached

  /** The delegates currently held by this pool. */
  protected var _delegates = Vector[ActorRef]()
  // Bookkeeping of the most recent capacity delta and selection count. Both are
  // written on every routed message; nothing in this trait reads them back.
  private var _lastCapacityChange = 0
  private var _lastSelectorCount = 0

  /** Sends a PoisonPill to every delegate when the pool itself is stopped. */
  override def postStop() = _delegates foreach { delegate =>
    try {
      delegate ! PoisonPill
    } catch { case e: Exception => } //Ignore any exceptions here (deliberate best effort)
  }

  /**
   * The routing behaviour of the pool:
   *  - `Stat` is answered with the current number of delegates (for testing)
   *  - a `MaximumNumberOfRestartsWithinTimeRangeReached` notification removes
   *    the delegate whose uuid matches the reported victim
   *  - any other message triggers a resize check and is then forwarded to
   *    every delegate returned by `select`
   */
  protected def _route(): Receive = {
    // for testing...
    case Stat =>
      self reply_? Stats(_delegates.length)
    case max: MaximumNumberOfRestartsWithinTimeRangeReached =>
      _delegates = _delegates filterNot { _.uuid == max.victim.uuid }
    case msg =>
      resizeIfAppropriate()

      select(_delegates) match {
        case (selectedDelegates, count) =>
          _lastSelectorCount = count
          // Open question kept from the original author: should we really
          // send the same message to several actors?
          selectedDelegates foreach { _ forward msg }
      }
  }

  /**
   * Applies the delta returned by `capacity`: a positive value creates and
   * start-links that many new delegates, a negative value poisons that many
   * delegates from the end of the pool, zero leaves the pool untouched.
   */
  private def resizeIfAppropriate() {
    val requestedCapacity = capacity(_delegates)
    val newDelegates = requestedCapacity match {
      case qty if qty > 0 =>
        _delegates ++ {
          // The generator arrow was missing here, which made the file unparsable.
          for (i <- 0 until requestedCapacity) yield {
            val delegate = instance()
            self startLink delegate
            delegate
          }
        }
      case qty if qty < 0 =>
        // requestedCapacity is negative, so the split point is "length minus
        // the number of delegates to drop"; everything after it is abandoned.
        _delegates.splitAt(_delegates.length + requestedCapacity) match {
          case (keep, abandon) =>
            abandon foreach { _ ! PoisonPill }
            keep
        }
      case _ => _delegates //No change
    }

    _lastCapacityChange = requestedCapacity
    _delegates = newDelegates
  }
}

/**
 * Selectors
 *      These traits define how, when a message needs to be routed, delegate(s) are chosen from the pool
 */

/**
 * Returns the set of delegates with the least amount of message backlog.
 *
 * When `partialFill` is true at most `delegates.length` delegates are returned.
 * When it is false exactly `selectionCount` entries are returned as long as the
 * pool is not empty; if the pool is smaller than `selectionCount` the same
 * delegate is selected more than once to make up the count.
 */
trait SmallestMailboxSelector {
  /** How many delegates each selection should contain. */
  def selectionCount: Int
  /** Whether a selection may contain fewer than `selectionCount` delegates. */
  def partialFill: Boolean

  /**
   * @param delegates the delegates currently in the pool
   * @return the selected delegates and how many were selected; an empty
   *         selection when there are no delegates or nothing is requested
   */
  def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
    val wanted = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount

    // With no delegates there is nothing to pick from. Without this guard the
    // fill loop below could never make progress when partialFill is false.
    if (delegates.isEmpty || wanted <= 0) (Iterator.empty, 0)
    else {
      // Rank once: the ordering is the same for every pass of the fill loop.
      val ranked = delegates.sortWith(_.mailboxSize < _.mailboxSize)
      var chosen: Seq[ActorRef] = Nil
      var remaining = wanted

      while (remaining > 0) {
        val batch = ranked.take(remaining)
        chosen = batch ++ chosen
        // Subtract only what this pass added (not the accumulated size), so the
        // final selection holds exactly `wanted` entries.
        remaining -= batch.size
      }

      (chosen.iterator, chosen.size)
    }
  }
}

/**
 * Returns the set of delegates that occur sequentially 'after' the last delegate from the previous selection.
 *
 * The position of the last selected delegate is remembered between calls and
 * wraps around at the end of the delegate sequence.
 */
trait RoundRobinSelector {
  // Index of the most recently selected delegate; -1 until the first selection.
  private var _last: Int = -1

  /** How many delegates each selection should contain. */
  def selectionCount: Int
  /** Whether a selection may contain fewer than `selectionCount` delegates. */
  def partialFill: Boolean

  /**
   * @param delegates the delegates currently in the pool
   * @return the selected delegates and how many were selected; an empty
   *         selection when there are no delegates
   */
  def select(delegates: Seq[ActorRef]): Tuple2[Iterator[ActorRef], Int] = {
    val length = delegates.length
    val take = if (partialFill) math.min(selectionCount, length) else selectionCount

    // An empty pool has nothing to rotate over; this also keeps the modulo
    // below from dividing by zero when partialFill is false.
    if (length == 0) (Iterator.empty, 0)
    else {
      val set =
        // The generator arrow was missing here, which made the file unparsable.
        for (i <- 0 until take) yield {
          _last = (_last + 1) % length
          delegates(_last)
        }

      (set.iterator, set.size)
    }
  }
}

/**
 * Capacitors
 *      These traits define how to alter the size of the pool
 */

/**
 * Ensures a fixed number of delegates in the pool.
 *
 * The reported delta is the shortfall between `limit` and the current number
 * of delegates and is never negative, so this capacitor only ever asks the
 * pool to grow.
 */
trait FixedSizeCapacitor {
  /** The number of delegates the pool should hold. */
  def limit: Int

  /** Returns how many delegates are missing to reach `limit`, or 0 if none are. */
  def capacity(delegates: Seq[ActorRef]): Int =
    math.max(limit - delegates.size, 0)
}

/**
 * Constrains the pool capacity to a bounded range.
 *
 * The delta produced by `_eval` is adjusted so that the resulting pool size
 * does not fall below `lowerBound` or rise above `upperBound`.
 */
trait BoundedCapacitor {
  /** Smallest pool size this capacitor will steer towards. */
  def lowerBound: Int
  /** Largest pool size this capacitor will steer towards. */
  def upperBound: Int

  /**
   * Returns the capacity delta for the given delegates, corrected so that
   * applying it keeps the pool size within the bounds.
   */
  def capacity(delegates: Seq[ActorRef]): Int = {
    val size = delegates.length
    val change = _eval(delegates)
    val target = size + change

    // The lower bound is checked first, exactly as before.
    if (target < lowerBound) lowerBound - size
    else if (target > upperBound) upperBound - size
    else change
  }

  /** Produces the unbounded capacity delta for the given delegates. */
  protected def _eval(delegates: Seq[ActorRef]): Int
}

/**
 * Measures pressure as the number of delegates whose message backlog exceeds
 * a threshold.
 */
trait MailboxPressureCapacitor {
  /** Mailbox size above which a delegate counts as being under pressure. */
  def pressureThreshold: Int

  /** Counts the delegates whose mailbox size is greater than `pressureThreshold`. */
  def pressure(delegates: Seq[ActorRef]): Int =
    delegates.count(delegate => delegate.mailboxSize > pressureThreshold)
}

/**
 * Measures pressure as the number of delegates that currently have a sender
 * future defined.
 */
trait ActiveFuturesPressureCapacitor {
  /** Counts the delegates whose `senderFuture` is defined. */
  def pressure(delegates: Seq[ActorRef]): Int =
    delegates.count(delegate => delegate.senderFuture.isDefined)
}

/**
 * Combines a pressure reading with a filter: the raw pressure of the delegates
 * and the current pool size are passed through `filter` to obtain a capacity
 * delta.
 */
trait CapacityStrategy {
  import ActorPool._

  /** Raw pressure reading for the given delegates. */
  def pressure(delegates: Seq[ActorRef]): Int
  /** Turns a pressure reading and the current capacity into a capacity delta. */
  def filter(pressure: Int, capacity: Int): Int

  /** Reads the pressure first, then the pool size, and feeds both to `filter`. */
  protected def _eval(delegates: Seq[ActorRef]): Int = {
    val rawPressure = pressure(delegates)
    val currentSize = delegates.size
    filter(rawPressure, currentSize)
  }
}

/** A FixedSizeCapacitor under a strategy-style name; adds no members of its own. */
trait FixedCapacityStrategy extends FixedSizeCapacitor
/** A CapacityStrategy whose evaluated delta is constrained by a BoundedCapacitor. */
trait BoundedCapacityStrategy extends CapacityStrategy with BoundedCapacitor

/**
 * Filters
 *  These traits refine the raw pressure reading into a more appropriate capacity delta.
 */

/**
 * The basic filter trait that composes ramp-up and back-off subfiltering.
 */
trait Filter {
  /** Growth component of the capacity delta. */
  def rampup(pressure: Int, capacity: Int): Int
  /** Shrink component of the capacity delta. */
  def backoff(pressure: Int, capacity: Int): Int

  /**
   * Sums the ramp-up and back-off results.
   *
   * Both subfilters are always invoked (ramp-up first) so that any internal
   * counters they keep are updated consistently. Ramping up is expected to be
   * positive, backing off negative, and each should return 0 otherwise.
   */
  def filter(pressure: Int, capacity: Int): Int = {
    val growth = rampup(pressure, capacity)
    val shrink = backoff(pressure, capacity)
    growth + shrink
  }
}

/** A Filter whose ramp-up and back-off are supplied by BasicRampup and BasicBackoff. */
trait BasicFilter extends Filter with BasicRampup with BasicBackoff

/**
 * Filter performs steady incremental growth using only the basic ramp-up subfilter.
 * No back-off is applied, so the resulting delta is whatever `rampup` returns.
 */
trait BasicNoBackoffFilter extends BasicRampup {
  /** Delegates straight to `rampup`. */
  def filter(pressure: Int, capacity: Int): Int = {
    val growth = rampup(pressure, capacity)
    growth
  }
}

/**
 * Basic incremental growth as a percentage of the current pool capacity.
 */
trait BasicRampup {
  /** Fraction of the current capacity to add when ramping up. */
  def rampupRate: Double

  /**
   * Returns `rampupRate * capacity`, rounded up, once the pressure has reached
   * the capacity; returns 0 while the pressure is below the capacity.
   */
  def rampup(pressure: Int, capacity: Int): Int =
    if (pressure >= capacity) math.ceil(rampupRate * capacity).toInt
    else 0
}

/**
 * Basic decrement as a percentage of the current pool capacity.
 *
 * Back-off happens when the ratio of pressure to capacity drops below
 * `backoffThreshold`.
 */
trait BasicBackoff {
  /** Pressure-to-capacity ratio below which the pool starts to shrink. */
  def backoffThreshold: Double
  /** Fraction of the current capacity to remove when backing off. */
  def backoffRate: Double

  /**
   * @return a delta that is zero or negative: `-backoffRate * capacity` rounded
   *         towards zero when the pressure ratio is below the threshold,
   *         otherwise 0 (also 0 for an empty pool)
   */
  def backoff(pressure: Int, capacity: Int): Int =
    // The ratio must be computed in floating point. Int division truncated it
    // to 0 whenever pressure < capacity, so the threshold was never honoured.
    if (capacity > 0 && pressure.toDouble / capacity < backoffThreshold)
      // ceil on a negative value rounds towards zero, so a removal of less
      // than one whole delegate results in no change.
      math.ceil(-1.0 * backoffRate * capacity).toInt
    else 0
}
/**
 * This filter tracks the average pressure over the lifetime of the pool (or since last reset) and
 * will begin to reduce capacity once this value drops below the provided threshold.  The number of
 * delegates to cull from the pool is determined by some scaling factor (the backoffRate) multiplied
 * by the difference in capacity and pressure.
 */
trait RunningMeanBackoff {
  /** Pressure-to-capacity ratio below which the pool starts to shrink. */
  def backoffThreshold: Double
  /** Scaling factor applied to (capacity - pressure) when backing off. */
  def backoffRate: Double

  // Running totals of every pressure and capacity reading since the last reset.
  private var _pressure: Double = 0.0
  private var _capacity: Double = 0.0

  /**
   * Records the reading and returns a delta that is zero or negative.
   *
   * Back-off only happens when both the current ratio and the running mean
   * ratio are below `backoffThreshold`. (Open question kept from the original
   * author: why does the entire clause need to be true?)
   */
  def backoff(pressure: Int, capacity: Int): Int = {
    _pressure += pressure
    _capacity += capacity

    // The current ratio must be computed in floating point. Int division
    // truncated it to 0 whenever pressure < capacity, so only the running mean
    // was effectively compared against the threshold.
    if (capacity > 0 && pressure.toDouble / capacity < backoffThreshold
      && _capacity > 0 && _pressure / _capacity < backoffThreshold)
      math.floor(-1.0 * backoffRate * (capacity - pressure)).toInt
    else 0
  }

  /** Clears the running totals so the mean starts afresh. */
  def backoffReset: Unit = {
    _pressure = 0.0
    _capacity = 0.0
  }
}