diff options
Diffstat (limited to 'test/files/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala')
-rw-r--r-- | test/files/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/test/files/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/test/files/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala new file mode 100644 index 0000000000..bc3f29ac68 --- /dev/null +++ b/test/files/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -0,0 +1,305 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.event.EventHandler +import akka.actor.{ ActorRef, IllegalActorStateException } +import akka.util.{ ReflectiveAccess, Switch } + +import java.util.Queue +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } + +/** + * Default settings are: + * <pre/> + * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + * - NR_START_THREADS = 16 + * - NR_MAX_THREADS = 128 + * - KEEP_ALIVE_TIME = 60000L // one minute + * </pre> + * <p/> + * + * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case. + * There is a default thread pool defined but make use of the builder if you need it. Here are some examples. + * <p/> + * + * Scala API. + * <p/> + * Example usage: + * <pre/> + * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name") + * dispatcher + * .withNewThreadPoolWithBoundedBlockingQueue(100) + * .setCorePoolSize(16) + * .setMaxPoolSize(128) + * .setKeepAliveTimeInMillis(60000) + * .setRejectionPolicy(new CallerRunsPolicy) + * .buildThreadPool + * </pre> + * <p/> + * + * Java API. + * <p/> + * Example usage: + * <pre/> + * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name"); + * dispatcher + * .withNewThreadPoolWithBoundedBlockingQueue(100) + * .setCorePoolSize(16) + * .setMaxPoolSize(128) + * .setKeepAliveTimeInMillis(60000) + * .setRejectionPolicy(new CallerRunsPolicy()) + * .buildThreadPool(); + * </pre> + * <p/> + * + * But the preferred way of creating dispatchers is to use + * the {@link akka.dispatch.Dispatchers} factory object. + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the + * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher + * always continues until the mailbox is empty. + * Larger values (or zero or negative) increase throughput, smaller values increase fairness + */ +class ExecutorBasedEventDrivenDispatcher( + _name: String, + val throughput: Int = Dispatchers.THROUGHPUT, + val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + val config: ThreadPoolConfig = ThreadPoolConfig()) + extends MessageDispatcher { + + def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage + + def this(_name: String, throughput: Int, mailboxType: MailboxType) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(_name: String, throughput: Int) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(_name: String, _config: ThreadPoolConfig) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + + def this(_name: String) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + val name = "akka:event-driven:dispatcher:" + _name + + private[akka] val threadFactory = new MonitorableThreadFactory(name) + private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) + + private[akka] def dispatch(invocation: MessageInvocation) = { + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + registerForExecution(mbox) + } + + private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) { + try executorService.get() execute invocation + catch { + case e: RejectedExecutionException => + EventHandler.warning(this, e.toString) + throw e + } + } + + /** + * @return the mailbox associated with the actor + */ + protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + + override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size + + def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { + case b: UnboundedMailbox => + new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { + @inline + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + @inline + final def enqueue(m: MessageInvocation) = this.add(m) + @inline + final def dequeue(): MessageInvocation = this.poll() + } + case b: BoundedMailbox => + new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { + @inline + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + } + } + + private[akka] def start {} + + private[akka] def shutdown { + val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) + if (old ne null) { + old.shutdownNow() + } + } + + private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { + if (mbox.dispatcherLock.tryLock()) { + if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended + try { + executorService.get() execute mbox + } catch { + case e: RejectedExecutionException => + EventHandler.warning(this, e.toString) + mbox.dispatcherLock.unlock() + throw e + } + } else { + mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock + } + } + } + + private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = + registerForExecution(mbox) + + override val toString = getClass.getSimpleName + "[" + name + "]" + + def suspend(actorRef: ActorRef) { + getMailbox(actorRef).suspended.tryLock + } + + def resume(actorRef: ActorRef) { + val mbox = getMailbox(actorRef) + mbox.suspended.tryUnlock + reRegisterForExecution(mbox) + } +} + +/** + * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. + */ +trait ExecutableMailbox extends Runnable { self: MessageQueue => + + def dispatcher: ExecutorBasedEventDrivenDispatcher + + final def run = { + try { + processMailbox() + } catch { + case ie: InterruptedException => + } + finally { + dispatcherLock.unlock() + } + if (!self.isEmpty) + dispatcher.reRegisterForExecution(this) + } + + /** + * Process the messages in the mailbox + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + final def processMailbox() { + if (!self.suspended.locked) { + var nextMessage = self.dequeue + if (nextMessage ne null) { //If we have a message + if (dispatcher.throughput <= 1) //If we only run one message per process + nextMessage.invoke //Just run it + else { //But otherwise, if we are throttled, we need to do some book-keeping + var processedMessages = 0 + val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 + val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) + else 0 + do { + nextMessage.invoke + nextMessage = + if (self.suspended.locked) { + null // If we are suspended, abort + } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out + null //We reached our boundaries, abort + else self.dequeue //Dequeue the next message + } + } while (nextMessage ne null) + } + } + } + } +} + +object PriorityGenerator { + /** + * Creates a PriorityGenerator that uses the supplied function as priority generator + */ + def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { + def gen(message: Any): Int = priorityFunction(message) + } +} + +/** + * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a + * PriorityExecutorBasedEventDrivenDispatcher + */ +abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { + def gen(message: Any): Int + + final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = + gen(thisMessage.message) - gen(thatMessage.message) +} + +/** + * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, + * prioritized according to the supplied comparator. + * + * The dispatcher will process the messages with the _lowest_ priority first. + */ +class PriorityExecutorBasedEventDrivenDispatcher( + name: String, + val comparator: java.util.Comparator[MessageInvocation], + throughput: Int = Dispatchers.THROUGHPUT, + throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) = + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) + + def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage +} + +/** + * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes + * + * Usage: + * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { + * val comparator = ...comparator that determines mailbox priority ordering... + * } + */ +trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => + def comparator: java.util.Comparator[MessageInvocation] + + override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { + case b: UnboundedMailbox => + new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { + @inline + final def dispatcher = self + } + + case b: BoundedMailbox => + new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox { + @inline + final def dispatcher = self + } + } +} |