diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch')
7 files changed, 0 insertions, 1336 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala deleted file mode 100644 index a567d0bcb0..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.actor.{ Actor, ActorRef } -import akka.actor.newUuid -import akka.config.Config._ -import akka.util.{ Duration, ReflectiveAccess } - -import akka.config.Configuration - -import java.util.concurrent.TimeUnit - -/** - * Scala API. Dispatcher factory. - * <p/> - * Example usage: - * <pre/> - * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name") - * dispatcher - * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy) - * .build - * </pre> - * <p/> - * Java API. Dispatcher factory. - * <p/> - * Example usage: - * <pre/> - * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name"); - * dispatcher - * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy()) - * .build(); - * </pre> - * <p/> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object Dispatchers { - val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). - map(time => Duration(time, TIME_UNIT)). - getOrElse(Duration(1000, TimeUnit.MILLISECONDS)) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) - val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) - val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT) - val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt - val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox() - - lazy val defaultGlobalDispatcher = { - config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) - } - - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * Uses the default timeout - * <p/> - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * Uses the default timeout - * If capacity is negative, it's Integer.MAX_VALUE - * <p/> - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * If capacity is negative, it's Integer.MAX_VALUE - * <p/> - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = - new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut) - - /** - * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) - /** - * Utility function that tries to load the specified dispatcher config from the akka.conf - * or else use the supplied default dispatcher - */ - def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = - config getSection key flatMap from getOrElse default - - /* - * Creates of obtains a dispatcher from a ConfigMap according to the format below - * - * default-dispatcher { - * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - * # GlobalExecutorBasedEventDriven - * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor - * keep-alive-time = 60 # Keep alive time for threads - * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) - * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) - * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded - * allow-core-timeout = on # Allow core threads to time out - * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher - * } - * ex: from(config.getConfigMap(identifier).get) - * - * Gotcha: Only configures the dispatcher if possible - * Returns: None if "type" isn't specified in the config - * Throws: IllegalArgumentException if the value of "type" is not valid - * IllegalArgumentException if it cannot - */ - def from(cfg: Configuration): Option[MessageDispatcher] = { - cfg.getString("type") map { - case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator() - case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator() - case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator - case fqn => - ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { - case r: Right[_, Class[MessageDispatcherConfigurator]] => - ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match { - case r: Right[Exception, MessageDispatcherConfigurator] => r.b - case l: Left[Exception, MessageDispatcherConfigurator] => - throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a) - } - case l: Left[Exception, _] => - throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a) - } - } map { - _ configure cfg - } - } -} - -object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher -} - -class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( - config.getString("name", newUuid.toString), - config.getInt("throughput", Dispatchers.THROUGHPUT), - config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), - mailboxType(config), - threadPoolConfig)).build - } -} - -class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher( - config.getString("name", newUuid.toString), - config.getInt("throughput", Dispatchers.THROUGHPUT), - config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), - mailboxType(config), - threadPoolConfig)).build - } -} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala deleted file mode 100644 index bc3f29ac68..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.event.EventHandler -import akka.actor.{ ActorRef, IllegalActorStateException } -import akka.util.{ ReflectiveAccess, Switch } - -import java.util.Queue -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } - -/** - * Default settings are: - * <pre/> - * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - * - NR_START_THREADS = 16 - * - NR_MAX_THREADS = 128 - * - KEEP_ALIVE_TIME = 60000L // one minute - * </pre> - * <p/> - * - * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case. - * There is a default thread pool defined but make use of the builder if you need it. Here are some examples. - * <p/> - * - * Scala API. - * <p/> - * Example usage: - * <pre/> - * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name") - * dispatcher - * .withNewThreadPoolWithBoundedBlockingQueue(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy) - * .buildThreadPool - * </pre> - * <p/> - * - * Java API. - * <p/> - * Example usage: - * <pre/> - * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name"); - * dispatcher - * .withNewThreadPoolWithBoundedBlockingQueue(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy()) - * .buildThreadPool(); - * </pre> - * <p/> - * - * But the preferred way of creating dispatchers is to use - * the {@link akka.dispatch.Dispatchers} factory object. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the - * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher - * always continues until the mailbox is empty. - * Larger values (or zero or negative) increase throughput, smaller values increase fairness - */ -class ExecutorBasedEventDrivenDispatcher( - _name: String, - val throughput: Int = Dispatchers.THROUGHPUT, - val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, - val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - val config: ThreadPoolConfig = ThreadPoolConfig()) - extends MessageDispatcher { - - def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = - this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage - - def this(_name: String, throughput: Int, mailboxType: MailboxType) = - this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - - def this(_name: String, throughput: Int) = - this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - def this(_name: String, _config: ThreadPoolConfig) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) - - def this(_name: String) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - val name = "akka:event-driven:dispatcher:" + _name - - private[akka] val threadFactory = new MonitorableThreadFactory(name) - private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) - - private[akka] def dispatch(invocation: MessageInvocation) = { - val mbox = getMailbox(invocation.receiver) - mbox enqueue invocation - registerForExecution(mbox) - } - - private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) { - try executorService.get() execute invocation - catch { - case e: RejectedExecutionException => - EventHandler.warning(this, e.toString) - throw e - } - } - - /** - * @return the mailbox associated with the actor - */ - protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] - - override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - - def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { - case b: UnboundedMailbox => - new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { - @inline - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this - @inline - final def enqueue(m: MessageInvocation) = this.add(m) - @inline - final def dequeue(): MessageInvocation = this.poll() - } - case b: BoundedMailbox => - new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { - @inline - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this - } - } - - private[akka] def start {} - - private[akka] def shutdown { - val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) - if (old ne null) { - old.shutdownNow() - } - } - - private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { - if (mbox.dispatcherLock.tryLock()) { - if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended - try { - executorService.get() execute mbox - } catch { - case e: RejectedExecutionException => - EventHandler.warning(this, e.toString) - mbox.dispatcherLock.unlock() - throw e - } - } else { - mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock - } - } - } - - private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = - registerForExecution(mbox) - - override val toString = getClass.getSimpleName + "[" + name + "]" - - def suspend(actorRef: ActorRef) { - getMailbox(actorRef).suspended.tryLock - } - - def resume(actorRef: ActorRef) { - val mbox = getMailbox(actorRef) - mbox.suspended.tryUnlock - reRegisterForExecution(mbox) - } -} - -/** - * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. - */ -trait ExecutableMailbox extends Runnable { self: MessageQueue => - - def dispatcher: ExecutorBasedEventDrivenDispatcher - - final def run = { - try { - processMailbox() - } catch { - case ie: InterruptedException => - } - finally { - dispatcherLock.unlock() - } - if (!self.isEmpty) - dispatcher.reRegisterForExecution(this) - } - - /** - * Process the messages in the mailbox - * - * @return true if the processing finished before the mailbox was empty, due to the throughput constraint - */ - final def processMailbox() { - if (!self.suspended.locked) { - var nextMessage = self.dequeue - if (nextMessage ne null) { //If we have a message - if (dispatcher.throughput <= 1) //If we only run one message per process - nextMessage.invoke //Just run it - else { //But otherwise, if we are throttled, we need to do some book-keeping - var processedMessages = 0 - val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 - val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) - else 0 - do { - nextMessage.invoke - nextMessage = - if (self.suspended.locked) { - null // If we are suspended, abort - } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out - null //We reached our boundaries, abort - else self.dequeue //Dequeue the next message - } - } while (nextMessage ne null) - } - } - } - } -} - -object PriorityGenerator { - /** - * Creates a PriorityGenerator that uses the supplied function as priority generator - */ - def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { - def gen(message: Any): Int = priorityFunction(message) - } -} - -/** - * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a - * PriorityExecutorBasedEventDrivenDispatcher - */ -abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { - def gen(message: Any): Int - - final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = - gen(thisMessage.message) - gen(thatMessage.message) -} - -/** - * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, - * prioritized according to the supplied comparator. - * - * The dispatcher will process the messages with the _lowest_ priority first. - */ -class PriorityExecutorBasedEventDrivenDispatcher( - name: String, - val comparator: java.util.Comparator[MessageInvocation], - throughput: Int = Dispatchers.THROUGHPUT, - throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, - mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = - this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) = - this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = - this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) - - def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage -} - -/** - * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes - * - * Usage: - * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { - * val comparator = ...comparator that determines mailbox priority ordering... - * } - */ -trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => - def comparator: java.util.Comparator[MessageInvocation] - - override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { - case b: UnboundedMailbox => - new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { - @inline - final def dispatcher = self - } - - case b: BoundedMailbox => - new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox { - @inline - final def dispatcher = self - } - } -} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala deleted file mode 100644 index 4cba8eec8b..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.actor.{ ActorRef, Actor, IllegalActorStateException } -import akka.util.{ ReflectiveAccess, Switch } - -import java.util.Queue -import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } -import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } -import util.DynamicVariable - -/** - * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed - * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the - * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. - * <p/> - * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably - * best described as "work donating" because the actor of which work is being stolen takes the initiative. - * <p/> - * The preferred way of creating dispatchers is to use - * the {@link akka.dispatch.Dispatchers} factory object. - * - * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher - * @see akka.dispatch.Dispatchers - * - * @author Viktor Klang - */ -class ExecutorBasedEventDrivenWorkStealingDispatcher( - _name: String, - throughput: Int = Dispatchers.THROUGHPUT, - throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, - mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) - extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) { - - def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = - this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage - - def this(_name: String, throughput: Int, mailboxType: MailboxType) = - this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - - def this(_name: String, throughput: Int) = - this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - def this(_name: String, _config: ThreadPoolConfig) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) - - def this(_name: String, memberType: Class[_ <: Actor]) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage - - def this(_name: String, mailboxType: MailboxType) = - this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage - - @volatile - private var actorType: Option[Class[_]] = None - @volatile - private var members = Vector[ActorRef]() - private val donationInProgress = new DynamicVariable(false) - - private[akka] override def register(actorRef: ActorRef) = { - //Verify actor type conformity - actorType match { - case None => actorType = Some(actorRef.actor.getClass) - case Some(aType) => - if (aType != actorRef.actor.getClass) - throw new IllegalActorStateException(String.format( - "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", - actorRef, aType)) - } - - synchronized { members :+= actorRef } //Update members - super.register(actorRef) - } - - private[akka] override def unregister(actorRef: ActorRef) = { - synchronized { members = members.filterNot(actorRef eq) } //Update members - super.unregister(actorRef) - } - - override private[akka] def dispatch(invocation: MessageInvocation) = { - val mbox = getMailbox(invocation.receiver) - if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) { - //We were busy and we got to donate the message to some other lucky guy, we're done here - } else { - mbox enqueue invocation - registerForExecution(mbox) - } - } - - override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { - try { - donationInProgress.value = true - while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor - } finally { donationInProgress.value = false } - - if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution - super.reRegisterForExecution(mbox) - } - - /** - * Returns true if it successfully donated a message - */ - protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = { - val actors = members // copy to prevent concurrent modifications having any impact - - // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means - // the dispatcher is being shut down... - // Starts at is seeded by current time - doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match { - case null => false - case recipient => donate(donorMbox.dequeue, recipient) - } - } - - /** - * Returns true if the donation succeeded or false otherwise - */ - protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try { - donationInProgress.value = true - val actors = members // copy to prevent concurrent modifications having any impact - doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match { - case null => false - case recipient => donate(message, recipient) - } - } finally { donationInProgress.value = false } - - /** - * Rewrites the message and adds that message to the recipients mailbox - * returns true if the message is non-null - */ - protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = { - if (organ ne null) { - if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( - organ.message, recipient.timeout, organ.sender, organ.senderFuture) - else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender) - else recipient.postMessageToMailbox(organ.message, None) - true - } else false - } - - /** - * Returns an available recipient for the message, if any - */ - protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = { - val prSz = potentialRecipients.size - var i = 0 - var recipient: ActorRef = null - - while ((i < prSz) && (recipient eq null)) { - val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap - val mbox = getMailbox(actor) - - if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself - recipient = actor //Found! - } - - i += 1 - } - - recipient // nothing found, reuse same start index next time - } -} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala deleted file mode 100644 index 4c00577157..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.AkkaException - -import java.util.{ Comparator, PriorityQueue } -import java.util.concurrent._ -import akka.util._ - -class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait MessageQueue { - val dispatcherLock = new SimpleLock - val suspended = new SimpleLock - def enqueue(handle: MessageInvocation) - def dequeue(): MessageInvocation - def size: Int - def isEmpty: Boolean -} - -/** - * Mailbox configuration. - */ -sealed trait MailboxType - -case class UnboundedMailbox() extends MailboxType -case class BoundedMailbox( - val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, - val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { - if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") - if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") -} - -trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => - @inline - final def enqueue(handle: MessageInvocation): Unit = this add handle - @inline - final def dequeue(): MessageInvocation = this.poll() -} - -trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => - def pushTimeOut: Duration - - final def enqueue(handle: MessageInvocation) { - if (pushTimeOut.length > 0) { - this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { - throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) - } - } else this put handle - } - - @inline - final def dequeue(): MessageInvocation = this.poll() -} - -class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics - -class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics - -class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics - -class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala deleted file mode 100644 index 20887c3867..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicLong -import akka.event.EventHandler -import akka.config.Configuration -import akka.config.Config.TIME_UNIT -import akka.util.{ Duration, Switch, ReentrantGuard } -import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } -import akka.actor._ - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -final case class MessageInvocation(val receiver: ActorRef, - val message: Any, - val sender: Option[ActorRef], - val senderFuture: Option[CompletableFuture[Any]]) { - if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") - - def invoke = try { - receiver.invoke(this) - } catch { - case e: NullPointerException => throw new ActorInitializationException( - "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).") - } -} - -final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable { - def run = { - future complete (try { - Right(function()) - } catch { - case e => - EventHandler.error(e, this, e.getMessage) - Left(e) - } - finally { - cleanup() - }) - } -} - -object MessageDispatcher { - val UNSCHEDULED = 0 - val SCHEDULED = 1 - val RESCHEDULED = 2 - - implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher -} - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -trait MessageDispatcher { - import MessageDispatcher._ - - protected val uuids = new ConcurrentSkipListSet[Uuid] - protected val futures = new AtomicLong(0L) - protected val guard = new ReentrantGuard - protected val active = new Switch(false) - - private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard - - /** - * Creates and returns a mailbox for the given actor. - */ - private[akka] def createMailbox(actorRef: ActorRef): AnyRef - - /** - * Attaches the specified actorRef to this dispatcher - */ - final def attach(actorRef: ActorRef): Unit = guard withGuard { - register(actorRef) - } - - /** - * Detaches the specified actorRef from this dispatcher - */ - final def detach(actorRef: ActorRef): Unit = guard withGuard { - unregister(actorRef) - } - - private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation) - - private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = { - futures.getAndIncrement() - try { - val future = new DefaultCompletableFuture[T](timeout) - - if (active.isOff) - guard withGuard { active.switchOn { start } } - - executeFuture(FutureInvocation[T](future, block, futureCleanup)) - future - } catch { - case e => - futures.decrementAndGet - throw e - } - } - - private val futureCleanup: () => Unit = - () => if (futures.decrementAndGet() == 0) { - guard withGuard { - if (futures.get == 0 && uuids.isEmpty) { - shutdownSchedule match { - case UNSCHEDULED => - shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) - case SCHEDULED => - shutdownSchedule = RESCHEDULED - case RESCHEDULED => //Already marked for reschedule - } - } - } - } - - private[akka] def register(actorRef: ActorRef) { - if (actorRef.mailbox eq null) - actorRef.mailbox = createMailbox(actorRef) - - uuids add actorRef.uuid - if (active.isOff) { - active.switchOn { - start - } - } - } - - private[akka] def unregister(actorRef: ActorRef) = { - if (uuids remove actorRef.uuid) { - actorRef.mailbox = null - if (uuids.isEmpty && futures.get == 0) { - shutdownSchedule match { - case UNSCHEDULED => - shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) - case SCHEDULED => - shutdownSchedule = RESCHEDULED - case RESCHEDULED => //Already marked for reschedule - } - } - } - } - - /** - * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors - */ - def stopAllAttachedActors { - val i = uuids.iterator - while (i.hasNext()) { - val uuid = i.next() - Actor.registry.actorFor(uuid) match { - case Some(actor) => actor.stop() - case None => {} - } - } - } - - private val shutdownAction = new Runnable { - def run = guard withGuard { - shutdownSchedule match { - case RESCHEDULED => - shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) - case SCHEDULED => - if (uuids.isEmpty && futures.get == 0) { - active switchOff { - shutdown // shut down in the dispatcher's references is zero - } - } - shutdownSchedule = UNSCHEDULED - case UNSCHEDULED => //Do nothing - } - } - } - - /** - * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms - * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second - */ - private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis - - /** - * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference - */ - def suspend(actorRef: ActorRef): Unit - - /* - * After the call to this method, the dispatcher must begin any new message processing for the specified reference - */ - def resume(actorRef: ActorRef): Unit - - /** - * Will be called when the dispatcher is to queue an invocation for execution - */ - private[akka] def dispatch(invocation: MessageInvocation): Unit - - private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit - - /** - * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown - */ - private[akka] def start(): Unit - - /** - * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached - */ - private[akka] def shutdown(): Unit - - /** - * Returns the size of the mailbox for the specified actor - */ - def mailboxSize(actorRef: ActorRef): Int - - /** - * Returns the amount of futures queued for execution - */ - def pendingFutures: Long = futures.get -} - -/** - * Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig - */ -abstract class MessageDispatcherConfigurator { - /** - * Returns an instance of MessageDispatcher given a Configuration - */ - def configure(config: Configuration): MessageDispatcher - - def mailboxType(config: Configuration): MailboxType = { - val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY) - if (capacity < 1) UnboundedMailbox() - else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) - } - - def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { - import ThreadPoolConfigDispatcherBuilder.conf_? - - //Apply the following options to the config if they are present in the config - ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( - conf_?(config getInt "keep-alive-time")(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), - conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), - conf_?(config getDouble "max-pool-size-factor")(factor => _.setMaxPoolSizeFromFactor(factor)), - conf_?(config getInt "executor-bounds")(bounds => _.setExecutorBounds(bounds)), - conf_?(config getBool "allow-core-timeout")(allow => _.setAllowCoreThreadTimeout(allow)), - conf_?(config getString "rejection-policy" map { - case "abort" => new AbortPolicy() - case "caller-runs" => new CallerRunsPolicy() - case "discard-oldest" => new DiscardOldestPolicy() - case "discard" => new DiscardPolicy() - case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) - })(policy => _.setRejectionPolicy(policy))) - } -} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala deleted file mode 100644 index 3169c70ef9..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.actor.{ Actor, ActorRef } -import akka.config.Config.config -import akka.util.Duration - -import java.util.Queue -import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue } -import akka.actor -import java.util.concurrent.atomic.AtomicReference - -/** - * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType) - extends ExecutorBasedEventDrivenDispatcher( - _actor.uuid.toString, Dispatchers.THROUGHPUT, -1, _mailboxType, ThreadBasedDispatcher.oneThread) { - - private[akka] val owner = new AtomicReference[ActorRef](_actor) - - def this(actor: ActorRef) = - this(actor, UnboundedMailbox()) // For Java API - - def this(actor: ActorRef, capacity: Int) = - this(actor, BoundedMailbox(capacity)) //For Java API - - def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API - this(actor, BoundedMailbox(capacity, pushTimeOut)) - - override def register(actorRef: ActorRef) = { - val actor = owner.get() - if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) - owner.compareAndSet(null, actorRef) //Register if unregistered - super.register(actorRef) - } - - override def unregister(actorRef: ActorRef) = { - super.unregister(actorRef) - owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak) - } -} - -object ThreadBasedDispatcher { - val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) -} - diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala deleted file mode 100644 index e847610c4c..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import java.util.Collection -import java.util.concurrent._ -import atomic.{ AtomicLong, AtomicInteger } -import ThreadPoolExecutor.CallerRunsPolicy - -import akka.util.Duration -import akka.event.EventHandler - -object ThreadPoolConfig { - type Bounds = Int - type FlowHandler = Either[RejectedExecutionHandler, Bounds] - type QueueFactory = () => BlockingQueue[Runnable] - - val defaultAllowCoreThreadTimeout: Boolean = false - val defaultCorePoolSize: Int = 16 - val defaultMaxPoolSize: Int = 128 - val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS) - def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy) - - def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler) - def flowHandler(bounds: Int): FlowHandler = Right(bounds) - - def fixedPoolSize(size: Int): Int = size - def scaledPoolSize(multiplier: Double): Int = - (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt - - def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory = - () => new ArrayBlockingQueue[Runnable](capacity, fair) - - def synchronousQueue(fair: Boolean): QueueFactory = - () => new SynchronousQueue[Runnable](fair) - - def linkedBlockingQueue(): QueueFactory = - () => new LinkedBlockingQueue[Runnable]() - - def linkedBlockingQueue(capacity: Int): QueueFactory = - () => new LinkedBlockingQueue[Runnable](capacity) - - def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = - () => queue - - def reusableQueue(queueFactory: QueueFactory): QueueFactory = { - val queue = queueFactory() - () => queue - } -} - -case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, - corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, - maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, - threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, - flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler, - queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) { - - final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService = - new LazyExecutorServiceWrapper(createExecutorService(threadFactory)) - - final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = { - flowHandler match { - case Left(rejectHandler) => - val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler) - service.allowCoreThreadTimeOut(allowCorePoolTimeout) - service - case Right(bounds) => - val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory) - service.allowCoreThreadTimeOut(allowCorePoolTimeout) - new BoundedExecutorDecorator(service, bounds) - } - } -} - -trait DispatcherBuilder { - def build: MessageDispatcher -} - -object ThreadPoolConfigDispatcherBuilder { - def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun -} - -case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { - import ThreadPoolConfig._ - def build = dispatcherFactory(config) - - //TODO remove this, for backwards compat only - @deprecated("Use .build instead", "1.1") - def buildThreadPool = build - - def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) - - def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory)) - - def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder = - withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue)) - - def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler)) - - def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler)) - - def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler)) - - def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler)) - - def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(corePoolSize = size)) - - def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(maxPoolSize = size)) - - def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = - setCorePoolSize(scaledPoolSize(multiplier)) - - def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = - setMaxPoolSize(scaledPoolSize(multiplier)) - - def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = flowHandler(bounds))) - - def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder = - setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS)) - - def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(threadTimeout = time)) - - def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder = - setFlowHandler(flowHandler(policy)) - - def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(flowHandler = newFlowHandler)) - - def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = - this.copy(config = config.copy(allowCorePoolTimeout = allow)) - - def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c)) -} - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -class MonitorableThreadFactory(val name: String) extends ThreadFactory { - protected val counter = new AtomicLong - - def newThread(runnable: Runnable) = new MonitorableThread(runnable, name) -} - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object MonitorableThread { - val DEFAULT_NAME = "MonitorableThread" - - // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring - val created = new AtomicInteger - val alive = new AtomicInteger -} - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -class MonitorableThread(runnable: Runnable, name: String) - extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) { - - setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable) = {} - }) - - override def run = { - try { - MonitorableThread.alive.incrementAndGet - super.run - } finally { - MonitorableThread.alive.decrementAndGet - } - } -} - -/** - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { - protected val semaphore = new Semaphore(bound) - - override def execute(command: Runnable) = { - semaphore.acquire - try { - executor.execute(new Runnable() { - def run = { - try { - command.run - } finally { - semaphore.release - } - } - }) - } catch { - case e: RejectedExecutionException => - EventHandler.warning(this, e.toString) - semaphore.release - case e: Throwable => - EventHandler.error(e, this, e.getMessage) - throw e - } - } -} - -trait ExecutorServiceDelegate extends ExecutorService { - - def executor: ExecutorService - - def execute(command: Runnable) = executor.execute(command) - - def shutdown() { executor.shutdown() } - - def shutdownNow() = executor.shutdownNow() - - def isShutdown = executor.isShutdown - - def isTerminated = executor.isTerminated - - def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit) - - def submit[T](callable: Callable[T]) = executor.submit(callable) - - def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t) - - def submit(runnable: Runnable) = executor.submit(runnable) - - def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables) - - def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) - - def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables) - - def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) -} - -trait LazyExecutorService extends ExecutorServiceDelegate { - - def createExecutor: ExecutorService - - lazy val executor = { - createExecutor - } -} - -class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService { - def createExecutor = executorFactory -} |