summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
blob: bc3f29ac68c2d4f670d8ac8b595d143365c69523 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.dispatch

import akka.event.EventHandler
import akka.actor.{ ActorRef, IllegalActorStateException }
import akka.util.{ ReflectiveAccess, Switch }

import java.util.Queue
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }

/**
 * Default settings are:
 * <pre>
 *   - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
 *   - NR_START_THREADS = 16
 *   - NR_MAX_THREADS = 128
 *   - KEEP_ALIVE_TIME = 60000L // one minute
 * </pre>
 * <p/>
 *
 * The dispatcher has a fluent builder interface to build up a thread pool to suit your use case.
 * There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
 * <p/>
 *
 * Scala API.
 * <p/>
 * Example usage:
 * <pre>
 *   val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
 *   dispatcher
 *     .withNewThreadPoolWithBoundedBlockingQueue(100)
 *     .setCorePoolSize(16)
 *     .setMaxPoolSize(128)
 *     .setKeepAliveTimeInMillis(60000)
 *     .setRejectionPolicy(new CallerRunsPolicy)
 *     .buildThreadPool
 * </pre>
 * <p/>
 *
 * Java API.
 * <p/>
 * Example usage:
 * <pre>
 *   ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
 *   dispatcher
 *     .withNewThreadPoolWithBoundedBlockingQueue(100)
 *     .setCorePoolSize(16)
 *     .setMaxPoolSize(128)
 *     .setKeepAliveTimeInMillis(60000)
 *     .setRejectionPolicy(new CallerRunsPolicy())
 *     .buildThreadPool();
 * </pre>
 * <p/>
 *
 * But the preferred way of creating dispatchers is to use
 * the {@link akka.dispatch.Dispatchers} factory object.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 * @param throughput positive integer indicates the dispatcher will only process so many messages at a time from the
 *                   mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
 *                   always continues until the mailbox is empty.
 *                   Larger values (or zero or negative) increase throughput, smaller values increase fairness
 */
class ExecutorBasedEventDrivenDispatcher(
  _name: String,
  val throughput: Int = Dispatchers.THROUGHPUT,
  val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
  val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
  val config: ThreadPoolConfig = ThreadPoolConfig())
  extends MessageDispatcher {

  // The auxiliary constructors below are needed for Java API usage; each one fills in the
  // remaining settings from the Dispatchers defaults and a ThreadPoolConfig().

  def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
    this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig())

  def this(_name: String, throughput: Int, mailboxType: MailboxType) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, ThreadPoolConfig())

  def this(_name: String, throughput: Int) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, ThreadPoolConfig())

  def this(_name: String, _config: ThreadPoolConfig) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)

  def this(_name: String) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, ThreadPoolConfig())

  /** Full dispatcher name: the fixed prefix followed by the name given to the constructor. */
  val name = "akka:event-driven:dispatcher:" + _name

  private[akka] val threadFactory = new MonitorableThreadFactory(name)

  /** Holds the current executor service; `shutdown` swaps a new one in atomically. */
  private[akka] val executorService =
    new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))

  /** Simple class name followed by the dispatcher name in square brackets. */
  override val toString = getClass.getSimpleName + "[" + name + "]"

  /**
   * Enqueues the invocation on the receiver's mailbox and then registers that mailbox
   * for execution.
   */
  private[akka] def dispatch(invocation: MessageInvocation) = {
    val targetMailbox = getMailbox(invocation.receiver)
    targetMailbox.enqueue(invocation)
    registerForExecution(targetMailbox)
  }

  /**
   * Hands the future invocation to the current executor service, but only while the
   * dispatcher is active. A rejection is logged as a warning and rethrown.
   */
  private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = {
    if (active.isOn) {
      try {
        executorService.get().execute(invocation)
      } catch {
        case rejected: RejectedExecutionException =>
          EventHandler.warning(this, rejected.toString)
          throw rejected
      }
    }
  }

  /**
   * @return the mailbox associated with the actor
   */
  protected def getMailbox(receiver: ActorRef) =
    receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]

  /** Number of entries reported by the actor's mailbox. */
  override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size

  /**
   * Builds a mailbox matching `mailboxType`: a ConcurrentLinkedQueue-backed one for
   * UnboundedMailbox, a DefaultBoundedMessageQueue for BoundedMailbox. In both cases the
   * mailbox's `dispatcher` is this instance.
   */
  def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
    case _: UnboundedMailbox =>
      new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
        @inline
        final def enqueue(invocation: MessageInvocation) = this.add(invocation)
        @inline
        final def dequeue(): MessageInvocation = this.poll()
        @inline
        final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
      }
    case bounded: BoundedMailbox =>
      new DefaultBoundedMessageQueue(bounded.capacity, bounded.pushTimeOut) with ExecutableMailbox {
        @inline
        final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
      }
  }

  /** Nothing to do on start. */
  private[akka] def start: Unit = ()

  /**
   * Replaces the current executor service with a new one from
   * `config.createLazyExecutorService` and calls `shutdownNow()` on the previous one
   * when it is non-null.
   */
  private[akka] def shutdown: Unit = {
    val previous = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
    Option(previous).foreach(_.shutdownNow())
  }

  /**
   * Tries to take the mailbox's dispatcher lock and, when successful, submits the mailbox
   * to the executor service. The lock is released again right away when the dispatcher is
   * not active, the actor is suspended, or the executor rejects the mailbox (the rejection
   * is logged and rethrown). Does nothing when the lock cannot be taken.
   */
  private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
    if (mbox.dispatcherLock.tryLock()) {
      val eligible = active.isOn && !mbox.suspended.locked
      if (!eligible) {
        // Dispatcher inactive or actor suspended: give back the lock taken above
        mbox.dispatcherLock.unlock()
      } else {
        try {
          executorService.get().execute(mbox)
        } catch {
          case rejected: RejectedExecutionException =>
            EventHandler.warning(this, rejected.toString)
            mbox.dispatcherLock.unlock()
            throw rejected
        }
      }
    }
  }

  /** Same as `registerForExecution`. */
  private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
    registerForExecution(mbox)

  /** Attempts to lock the `suspended` lock of the actor's mailbox. */
  def suspend(actorRef: ActorRef): Unit = {
    getMailbox(actorRef).suspended.tryLock
  }

  /** Attempts to unlock the mailbox's `suspended` lock, then re-registers the mailbox for execution. */
  def resume(actorRef: ActorRef): Unit = {
    val resumedMailbox = getMailbox(actorRef)
    resumedMailbox.suspended.tryUnlock
    reRegisterForExecution(resumedMailbox)
  }
}

/**
 * This is the behavior of an ExecutorBasedEventDrivenDispatcher's mailbox.
 */
trait ExecutableMailbox extends Runnable { self: MessageQueue =>

  /** The dispatcher whose throughput settings are used and with which this mailbox re-registers. */
  def dispatcher: ExecutorBasedEventDrivenDispatcher

  /**
   * Processes the mailbox once, always releases the dispatcher lock afterwards, and asks
   * the dispatcher to schedule this mailbox again if messages are still queued.
   *
   * An InterruptedException raised while processing does not propagate; instead the
   * interrupt status of the current thread is restored so that the interruption is not lost.
   */
  final def run: Unit = {
    try {
      processMailbox()
    } catch {
      case _: InterruptedException =>
        // Catching InterruptedException clears the flag; set it again rather than swallowing it
        Thread.currentThread().interrupt()
    } finally {
      dispatcherLock.unlock()
    }
    if (!self.isEmpty)
      dispatcher.reRegisterForExecution(this)
  }

  /**
   * Process the messages in the mailbox.
   *
   * Does nothing while the mailbox is suspended or when it is empty. When the dispatcher's
   * throughput is 1 or less, exactly one message is invoked. Otherwise messages are invoked
   * until the mailbox is empty, the mailbox becomes suspended, `throughput` messages have
   * been processed, or (if `throughputDeadlineTime` is positive) the deadline has passed.
   */
  final def processMailbox(): Unit = {
    if (!self.suspended.locked) {
      var nextMessage = self.dequeue
      if (nextMessage ne null) { //If we have a message
        if (dispatcher.throughput <= 1) //If we only run one message per process
          nextMessage.invoke //Just run it
        else { //But otherwise, if we are throttled, we need to do some book-keeping
          var processedMessages = 0
          val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
          val deadlineNs =
            if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
            else 0L
          do {
            nextMessage.invoke
            nextMessage =
              if (self.suspended.locked) {
                null // If we are suspended, abort
              } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
                processedMessages += 1
                // nanoTime values may wrap around, so compare via the difference rather than with >= directly
                val deadlinePassed = isDeadlineEnabled && (System.nanoTime - deadlineNs) >= 0
                if ((processedMessages >= dispatcher.throughput) || deadlinePassed) // If we're throttled, break out
                  null //We reached our boundaries, abort
                else self.dequeue //Dequeue the next message
              }
          } while (nextMessage ne null)
        }
      }
    }
  }
}

object PriorityGenerator {
  /**
   * Wraps the given function in a PriorityGenerator whose `gen` delegates to it.
   */
  def apply(priorityFunction: Any => Int): PriorityGenerator =
    new PriorityGenerator {
      def gen(message: Any): Int = priorityFunction.apply(message)
    }
}

/**
 * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
 * PriorityExecutorBasedEventDrivenDispatcher
 */
abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
  /**
   * Computes the integer priority of a message; `compare` orders invocations by this value.
   */
  def gen(message: Any): Int

  /**
   * Compares two invocations by the priorities of their messages.
   *
   * The priorities are compared explicitly instead of being subtracted, because the
   * difference of two Ints can overflow and flip the sign of the result.
   *
   * @return -1, 0 or 1 when the priority of thisMessage is lower than, equal to or
   *         higher than the priority of thatMessage
   */
  final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = {
    val thisPriority = gen(thisMessage.message)
    val thatPriority = gen(thatMessage.message)
    if (thisPriority < thatPriority) -1
    else if (thisPriority > thatPriority) 1
    else 0
  }
}

/**
 * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
 * prioritized according to the supplied comparator.
 *
 * The dispatcher will process the messages with the _lowest_ priority first.
 */
class PriorityExecutorBasedEventDrivenDispatcher(
  name: String,
  val comparator: java.util.Comparator[MessageInvocation],
  throughput: Int = Dispatchers.THROUGHPUT,
  throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
  mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
  config: ThreadPoolConfig = ThreadPoolConfig())
  extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config)
  with PriorityMailbox {

  // The auxiliary constructors below are needed for Java API usage; each one fills in the
  // remaining settings from the Dispatchers defaults and a ThreadPoolConfig().

  def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
    this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig())

  def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
    this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, ThreadPoolConfig())

  def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
    this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, ThreadPoolConfig())

  def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
    this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)

  def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
    this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, ThreadPoolConfig())
}

/**
 * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes
 *
 * Usage:
 * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox {
 *   val comparator = ...comparator that determines mailbox priority ordering...
 * }
 */
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>

  /** Comparator handed to every priority mailbox created by this dispatcher. */
  def comparator: java.util.Comparator[MessageInvocation]

  /**
   * Creates the actor's mailbox according to the dispatcher's `mailboxType`: an
   * UnboundedPriorityMessageQueue for UnboundedMailbox, or a BoundedPriorityMessageQueue
   * (with the configured capacity and push timeout) for BoundedMailbox. Both receive
   * `comparator` and report this dispatcher as their `dispatcher`.
   */
  override def createMailbox(actorRef: ActorRef): AnyRef =
    self.mailboxType match {
      case _: UnboundedMailbox =>
        new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
          @inline
          final def dispatcher = self
        }

      case bounded: BoundedMailbox =>
        new BoundedPriorityMessageQueue(bounded.capacity, bounded.pushTimeOut, comparator) with ExecutableMailbox {
          @inline
          final def dispatcher = self
        }
    }
}