diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala')
-rw-r--r-- | test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala | 305 |
1 files changed, 0 insertions, 305 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala deleted file mode 100644 index bc3f29ac68..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.event.EventHandler -import akka.actor.{ ActorRef, IllegalActorStateException } -import akka.util.{ ReflectiveAccess, Switch } - -import java.util.Queue -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } - -/** - * Default settings are: - * <pre/> - * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - * - NR_START_THREADS = 16 - * - NR_MAX_THREADS = 128 - * - KEEP_ALIVE_TIME = 60000L // one minute - * </pre> - * <p/> - * - * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case. - * There is a default thread pool defined but make use of the builder if you need it. Here are some examples. - * <p/> - * - * Scala API. - * <p/> - * Example usage: - * <pre/> - * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name") - * dispatcher - * .withNewThreadPoolWithBoundedBlockingQueue(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy) - * .buildThreadPool - * </pre> - * <p/> - * - * Java API. - * <p/> - * Example usage: - * <pre/> - * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name"); - * dispatcher - * .withNewThreadPoolWithBoundedBlockingQueue(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy()) - * .buildThreadPool(); - * </pre> - * <p/> - * - * But the preferred way of creating dispatchers is to use - * the {@link akka.dispatch.Dispatchers} factory object. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the - * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher - * always continues until the mailbox is empty. - * Larger values (or zero or negative) increase throughput, smaller values increase fairness - */ -class ExecutorBasedEventDrivenDispatcher( - _name: String, - val throughput: Int = Dispatchers.THROUGHPUT, - val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, - val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - val config: ThreadPoolConfig = ThreadPoolConfig()) - extends MessageDispatcher { - - def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = - this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage - - def this(_name: String, throughput: Int, mailboxType: MailboxType) = - this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - - def this(_name: String, throughput: Int) = - this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - def this(_name: String, _config: ThreadPoolConfig) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) - - def this(_name: String) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - val name = "akka:event-driven:dispatcher:" + _name - - private[akka] val threadFactory = new MonitorableThreadFactory(name) - private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) - - private[akka] def dispatch(invocation: MessageInvocation) = { - val mbox = getMailbox(invocation.receiver) - mbox enqueue invocation - registerForExecution(mbox) - } - - private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) { - try executorService.get() execute invocation - catch { - case e: RejectedExecutionException => - EventHandler.warning(this, e.toString) - throw e - } - } - - /** - * @return the mailbox associated with the actor - */ - protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] - - override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - - def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { - case b: UnboundedMailbox => - new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { - @inline - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this - @inline - final def enqueue(m: MessageInvocation) = this.add(m) - @inline - final def dequeue(): MessageInvocation = this.poll() - } - case b: BoundedMailbox => - new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { - @inline - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this - } - } - - private[akka] def start {} - - private[akka] def shutdown { - val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) - if (old ne null) { - old.shutdownNow() - } - } - - private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { - if (mbox.dispatcherLock.tryLock()) { - if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended - try { - executorService.get() execute mbox - } catch { - case e: RejectedExecutionException => - EventHandler.warning(this, e.toString) - mbox.dispatcherLock.unlock() - throw e - } - } else { - mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock - } - } - } - - private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = - registerForExecution(mbox) - - override val toString = getClass.getSimpleName + "[" + name + "]" - - def suspend(actorRef: ActorRef) { - getMailbox(actorRef).suspended.tryLock - } - - def resume(actorRef: ActorRef) { - val mbox = getMailbox(actorRef) - mbox.suspended.tryUnlock - reRegisterForExecution(mbox) - } -} - -/** - * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. - */ -trait ExecutableMailbox extends Runnable { self: MessageQueue => - - def dispatcher: ExecutorBasedEventDrivenDispatcher - - final def run = { - try { - processMailbox() - } catch { - case ie: InterruptedException => - } - finally { - dispatcherLock.unlock() - } - if (!self.isEmpty) - dispatcher.reRegisterForExecution(this) - } - - /** - * Process the messages in the mailbox - * - * @return true if the processing finished before the mailbox was empty, due to the throughput constraint - */ - final def processMailbox() { - if (!self.suspended.locked) { - var nextMessage = self.dequeue - if (nextMessage ne null) { //If we have a message - if (dispatcher.throughput <= 1) //If we only run one message per process - nextMessage.invoke //Just run it - else { //But otherwise, if we are throttled, we need to do some book-keeping - var processedMessages = 0 - val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 - val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) - else 0 - do { - nextMessage.invoke - nextMessage = - if (self.suspended.locked) { - null // If we are suspended, abort - } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out - null //We reached our boundaries, abort - else self.dequeue //Dequeue the next message - } - } while (nextMessage ne null) - } - } - } - } -} - -object PriorityGenerator { - /** - * Creates a PriorityGenerator that uses the supplied function as priority generator - */ - def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { - def gen(message: Any): Int = priorityFunction(message) - } -} - -/** - * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a - * PriorityExecutorBasedEventDrivenDispatcher - */ -abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { - def gen(message: Any): Int - - final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = - gen(thisMessage.message) - gen(thatMessage.message) -} - -/** - * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, - * prioritized according to the supplied comparator. - * - * The dispatcher will process the messages with the _lowest_ priority first. - */ -class PriorityExecutorBasedEventDrivenDispatcher( - name: String, - val comparator: java.util.Comparator[MessageInvocation], - throughput: Int = Dispatchers.THROUGHPUT, - throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, - mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = - this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) = - this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = - this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) - - def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage -} - -/** - * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes - * - * Usage: - * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { - * val comparator = ...comparator that determines mailbox priority ordering... - * } - */ -trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => - def comparator: java.util.Comparator[MessageInvocation] - - override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { - case b: UnboundedMailbox => - new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { - @inline - final def dispatcher = self - } - - case b: BoundedMailbox => - new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox { - @inline - final def dispatcher = self - } - } -} |