/** * Copyright (C) 2009-2011 Scalable Solutions AB */ package akka.dispatch import akka.event.EventHandler import akka.actor.{ ActorRef, IllegalActorStateException } import akka.util.{ ReflectiveAccess, Switch } import java.util.Queue import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } /** * Default settings are: *
 *   - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
 *   - NR_START_THREADS = 16
 *   - NR_MAX_THREADS = 128
 *   - KEEP_ALIVE_TIME = 60000L // one minute
 * 
*

* * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case. * There is a default thread pool defined but make use of the builder if you need it. Here are some examples. *

* * Scala API. *

* Example usage: *

 *   val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
 *   dispatcher
 *     .withNewThreadPoolWithBoundedBlockingQueue(100)
 *     .setCorePoolSize(16)
 *     .setMaxPoolSize(128)
 *     .setKeepAliveTimeInMillis(60000)
 *     .setRejectionPolicy(new CallerRunsPolicy)
 *     .buildThreadPool
 * 
*

* * Java API. *

* Example usage: *

 *   ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
 *   dispatcher
 *     .withNewThreadPoolWithBoundedBlockingQueue(100)
 *     .setCorePoolSize(16)
 *     .setMaxPoolSize(128)
 *     .setKeepAliveTimeInMillis(60000)
 *     .setRejectionPolicy(new CallerRunsPolicy())
 *     .buildThreadPool();
 * 
*

* * But the preferred way of creating dispatchers is to use * the {@link akka.dispatch.Dispatchers} factory object. * * @author Jonas Bonér * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher * always continues until the mailbox is empty. * Larger values (or zero or negative) increase throughput, smaller values increase fairness */ class ExecutorBasedEventDrivenDispatcher( _name: String, val throughput: Int = Dispatchers.THROUGHPUT, val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, val config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher { def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage def this(_name: String, throughput: Int, mailboxType: MailboxType) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage def this(_name: String, _config: ThreadPoolConfig) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) def this(_name: String) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage val name = "akka:event-driven:dispatcher:" + _name private[akka] val threadFactory = new MonitorableThreadFactory(name) private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) private[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation registerForExecution(mbox) } private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) { try executorService.get() execute invocation catch { case e: RejectedExecutionException => EventHandler.warning(this, e.toString) throw e } } /** * @return the mailbox associated with the actor */ protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { case b: UnboundedMailbox => new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { @inline final def dispatcher = ExecutorBasedEventDrivenDispatcher.this @inline final def enqueue(m: MessageInvocation) = this.add(m) @inline final def dequeue(): MessageInvocation = this.poll() } case b: BoundedMailbox => new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { @inline final def dispatcher = ExecutorBasedEventDrivenDispatcher.this } } private[akka] def start {} private[akka] def shutdown { val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { old.shutdownNow() } } private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { if (mbox.dispatcherLock.tryLock()) { if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended try { executorService.get() execute mbox } catch { case e: RejectedExecutionException => EventHandler.warning(this, e.toString) mbox.dispatcherLock.unlock() throw e } } else { mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock } } } private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = registerForExecution(mbox) override val toString = getClass.getSimpleName + "[" + name + "]" def suspend(actorRef: ActorRef) { getMailbox(actorRef).suspended.tryLock } def resume(actorRef: ActorRef) { val mbox = getMailbox(actorRef) mbox.suspended.tryUnlock reRegisterForExecution(mbox) } } /** * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. */ trait ExecutableMailbox extends Runnable { self: MessageQueue => def dispatcher: ExecutorBasedEventDrivenDispatcher final def run = { try { processMailbox() } catch { case ie: InterruptedException => } finally { dispatcherLock.unlock() } if (!self.isEmpty) dispatcher.reRegisterForExecution(this) } /** * Process the messages in the mailbox * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox() { if (!self.suspended.locked) { var nextMessage = self.dequeue if (nextMessage ne null) { //If we have a message if (dispatcher.throughput <= 1) //If we only run one message per process nextMessage.invoke //Just run it else { //But otherwise, if we are throttled, we need to do some book-keeping var processedMessages = 0 val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 do { nextMessage.invoke nextMessage = if (self.suspended.locked) { null // If we are suspended, abort } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort else self.dequeue //Dequeue the next message } } while (nextMessage ne null) } } } } } object PriorityGenerator { /** * Creates a PriorityGenerator that uses the supplied function as priority generator */ def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { def gen(message: Any): Int = priorityFunction(message) } } /** * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a * PriorityExecutorBasedEventDrivenDispatcher */ abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { def gen(message: Any): Int final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = gen(thisMessage.message) - gen(thatMessage.message) } /** * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, * prioritized according to the supplied comparator. * * The dispatcher will process the messages with the _lowest_ priority first. */ class PriorityExecutorBasedEventDrivenDispatcher( name: String, val comparator: java.util.Comparator[MessageInvocation], throughput: Int = Dispatchers.THROUGHPUT, throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) = this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage } /** * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes * * Usage: * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { * val comparator = ...comparator that determines mailbox priority ordering... * } */ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => def comparator: java.util.Comparator[MessageInvocation] override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { case b: UnboundedMailbox => new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { @inline final def dispatcher = self } case b: BoundedMailbox => new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox { @inline final def dispatcher = self } } }