summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/dispatch
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch')
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala227
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala305
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala165
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala68
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala260
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala52
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala259
7 files changed, 0 insertions, 1336 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala
deleted file mode 100644
index a567d0bcb0..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import akka.actor.{ Actor, ActorRef }
-import akka.actor.newUuid
-import akka.config.Config._
-import akka.util.{ Duration, ReflectiveAccess }
-
-import akka.config.Configuration
-
-import java.util.concurrent.TimeUnit
-
-/**
- * Scala API. Dispatcher factory.
- * <p/>
- * Example usage:
- * <pre/>
- * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
- * dispatcher
- * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
- * .setCorePoolSize(16)
- * .setMaxPoolSize(128)
- * .setKeepAliveTimeInMillis(60000)
- * .setRejectionPolicy(new CallerRunsPolicy)
- * .build
- * </pre>
- * <p/>
- * Java API. Dispatcher factory.
- * <p/>
- * Example usage:
- * <pre/>
- * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
- * dispatcher
- * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
- * .setCorePoolSize(16)
- * .setMaxPoolSize(128)
- * .setKeepAliveTimeInMillis(60000)
- * .setRejectionPolicy(new CallerRunsPolicy())
- * .build();
- * </pre>
- * <p/>
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-object Dispatchers {
- val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
- val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
- map(time => Duration(time, TIME_UNIT)).
- getOrElse(Duration(1000, TimeUnit.MILLISECONDS))
- val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
- val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
- val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT)
- val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
- val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
-
- lazy val defaultGlobalDispatcher = {
- config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
- }
-
- object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
-
- /**
- * Creates an thread based dispatcher serving a single actor through the same single thread.
- * Uses the default timeout
- * <p/>
- * E.g. each actor consumes its own thread.
- */
- def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
-
- /**
- * Creates an thread based dispatcher serving a single actor through the same single thread.
- * Uses the default timeout
- * If capacity is negative, it's Integer.MAX_VALUE
- * <p/>
- * E.g. each actor consumes its own thread.
- */
- def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity)
-
- /**
- * Creates an thread based dispatcher serving a single actor through the same single thread.
- * If capacity is negative, it's Integer.MAX_VALUE
- * <p/>
- * E.g. each actor consumes its own thread.
- */
- def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) =
- new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut)
-
- /**
- * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenDispatcher(name: String) =
- ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig())
-
- /**
- * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
- ThreadPoolConfigDispatcherBuilder(config =>
- new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
-
- /**
- * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
- ThreadPoolConfigDispatcherBuilder(config =>
- new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
-
- /**
- * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) =
- ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig())
-
- /**
- * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) =
- ThreadPoolConfigDispatcherBuilder(config =>
- new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig())
-
- /**
- * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
- ThreadPoolConfigDispatcherBuilder(config =>
- new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
-
- /**
- * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
- * <p/>
- * Has a fluent builder interface for configuring its semantics.
- */
- def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
- ThreadPoolConfigDispatcherBuilder(config =>
- new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
- /**
- * Utility function that tries to load the specified dispatcher config from the akka.conf
- * or else use the supplied default dispatcher
- */
- def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
- config getSection key flatMap from getOrElse default
-
- /*
- * Creates of obtains a dispatcher from a ConfigMap according to the format below
- *
- * default-dispatcher {
- * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
- * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
- * # GlobalExecutorBasedEventDriven
- * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
- * keep-alive-time = 60 # Keep alive time for threads
- * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
- * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
- * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
- * allow-core-timeout = on # Allow core threads to time out
- * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
- * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
- * }
- * ex: from(config.getConfigMap(identifier).get)
- *
- * Gotcha: Only configures the dispatcher if possible
- * Returns: None if "type" isn't specified in the config
- * Throws: IllegalArgumentException if the value of "type" is not valid
- * IllegalArgumentException if it cannot
- */
- def from(cfg: Configuration): Option[MessageDispatcher] = {
- cfg.getString("type") map {
- case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator()
- case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
- case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator
- case fqn =>
- ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
- case r: Right[_, Class[MessageDispatcherConfigurator]] =>
- ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match {
- case r: Right[Exception, MessageDispatcherConfigurator] => r.b
- case l: Left[Exception, MessageDispatcherConfigurator] =>
- throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a)
- }
- case l: Left[Exception, _] =>
- throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a)
- }
- } map {
- _ configure cfg
- }
- }
-}
-
-object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
- def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
-}
-
-class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
- def configure(config: Configuration): MessageDispatcher = {
- configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
- config.getString("name", newUuid.toString),
- config.getInt("throughput", Dispatchers.THROUGHPUT),
- config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
- mailboxType(config),
- threadPoolConfig)).build
- }
-}
-
-class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
- def configure(config: Configuration): MessageDispatcher = {
- configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
- config.getString("name", newUuid.toString),
- config.getInt("throughput", Dispatchers.THROUGHPUT),
- config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
- mailboxType(config),
- threadPoolConfig)).build
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
deleted file mode 100644
index bc3f29ac68..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import akka.event.EventHandler
-import akka.actor.{ ActorRef, IllegalActorStateException }
-import akka.util.{ ReflectiveAccess, Switch }
-
-import java.util.Queue
-import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
-
-/**
- * Default settings are:
- * <pre/>
- * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
- * - NR_START_THREADS = 16
- * - NR_MAX_THREADS = 128
- * - KEEP_ALIVE_TIME = 60000L // one minute
- * </pre>
- * <p/>
- *
- * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case.
- * There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
- * <p/>
- *
- * Scala API.
- * <p/>
- * Example usage:
- * <pre/>
- * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
- * dispatcher
- * .withNewThreadPoolWithBoundedBlockingQueue(100)
- * .setCorePoolSize(16)
- * .setMaxPoolSize(128)
- * .setKeepAliveTimeInMillis(60000)
- * .setRejectionPolicy(new CallerRunsPolicy)
- * .buildThreadPool
- * </pre>
- * <p/>
- *
- * Java API.
- * <p/>
- * Example usage:
- * <pre/>
- * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
- * dispatcher
- * .withNewThreadPoolWithBoundedBlockingQueue(100)
- * .setCorePoolSize(16)
- * .setMaxPoolSize(128)
- * .setKeepAliveTimeInMillis(60000)
- * .setRejectionPolicy(new CallerRunsPolicy())
- * .buildThreadPool();
- * </pre>
- * <p/>
- *
- * But the preferred way of creating dispatchers is to use
- * the {@link akka.dispatch.Dispatchers} factory object.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
- * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
- * always continues until the mailbox is empty.
- * Larger values (or zero or negative) increase throughput, smaller values increase fairness
- */
-class ExecutorBasedEventDrivenDispatcher(
- _name: String,
- val throughput: Int = Dispatchers.THROUGHPUT,
- val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
- val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
- val config: ThreadPoolConfig = ThreadPoolConfig())
- extends MessageDispatcher {
-
- def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
- this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
-
- def this(_name: String, throughput: Int, mailboxType: MailboxType) =
- this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
-
- def this(_name: String, throughput: Int) =
- this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- def this(_name: String, _config: ThreadPoolConfig) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
-
- def this(_name: String) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- val name = "akka:event-driven:dispatcher:" + _name
-
- private[akka] val threadFactory = new MonitorableThreadFactory(name)
- private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
-
- private[akka] def dispatch(invocation: MessageInvocation) = {
- val mbox = getMailbox(invocation.receiver)
- mbox enqueue invocation
- registerForExecution(mbox)
- }
-
- private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) {
- try executorService.get() execute invocation
- catch {
- case e: RejectedExecutionException =>
- EventHandler.warning(this, e.toString)
- throw e
- }
- }
-
- /**
- * @return the mailbox associated with the actor
- */
- protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
-
- override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
-
- def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
- case b: UnboundedMailbox =>
- new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
- @inline
- final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
- @inline
- final def enqueue(m: MessageInvocation) = this.add(m)
- @inline
- final def dequeue(): MessageInvocation = this.poll()
- }
- case b: BoundedMailbox =>
- new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
- @inline
- final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
- }
- }
-
- private[akka] def start {}
-
- private[akka] def shutdown {
- val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
- if (old ne null) {
- old.shutdownNow()
- }
- }
-
- private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
- if (mbox.dispatcherLock.tryLock()) {
- if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
- try {
- executorService.get() execute mbox
- } catch {
- case e: RejectedExecutionException =>
- EventHandler.warning(this, e.toString)
- mbox.dispatcherLock.unlock()
- throw e
- }
- } else {
- mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock
- }
- }
- }
-
- private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
- registerForExecution(mbox)
-
- override val toString = getClass.getSimpleName + "[" + name + "]"
-
- def suspend(actorRef: ActorRef) {
- getMailbox(actorRef).suspended.tryLock
- }
-
- def resume(actorRef: ActorRef) {
- val mbox = getMailbox(actorRef)
- mbox.suspended.tryUnlock
- reRegisterForExecution(mbox)
- }
-}
-
-/**
- * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox.
- */
-trait ExecutableMailbox extends Runnable { self: MessageQueue =>
-
- def dispatcher: ExecutorBasedEventDrivenDispatcher
-
- final def run = {
- try {
- processMailbox()
- } catch {
- case ie: InterruptedException =>
- }
- finally {
- dispatcherLock.unlock()
- }
- if (!self.isEmpty)
- dispatcher.reRegisterForExecution(this)
- }
-
- /**
- * Process the messages in the mailbox
- *
- * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
- */
- final def processMailbox() {
- if (!self.suspended.locked) {
- var nextMessage = self.dequeue
- if (nextMessage ne null) { //If we have a message
- if (dispatcher.throughput <= 1) //If we only run one message per process
- nextMessage.invoke //Just run it
- else { //But otherwise, if we are throttled, we need to do some book-keeping
- var processedMessages = 0
- val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
- val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
- else 0
- do {
- nextMessage.invoke
- nextMessage =
- if (self.suspended.locked) {
- null // If we are suspended, abort
- } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
- processedMessages += 1
- if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
- null //We reached our boundaries, abort
- else self.dequeue //Dequeue the next message
- }
- } while (nextMessage ne null)
- }
- }
- }
- }
-}
-
-object PriorityGenerator {
- /**
- * Creates a PriorityGenerator that uses the supplied function as priority generator
- */
- def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator {
- def gen(message: Any): Int = priorityFunction(message)
- }
-}
-
-/**
- * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
- * PriorityExecutorBasedEventDrivenDispatcher
- */
-abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
- def gen(message: Any): Int
-
- final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int =
- gen(thisMessage.message) - gen(thatMessage.message)
-}
-
-/**
- * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
- * prioritized according to the supplied comparator.
- *
- * The dispatcher will process the messages with the _lowest_ priority first.
- */
-class PriorityExecutorBasedEventDrivenDispatcher(
- name: String,
- val comparator: java.util.Comparator[MessageInvocation],
- throughput: Int = Dispatchers.THROUGHPUT,
- throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
- mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
- config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
-
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
- this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
-
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
- this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
-
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
- this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
- this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
-
- def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
- this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-}
-
-/**
- * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes
- *
- * Usage:
- * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox {
- * val comparator = ...comparator that determines mailbox priority ordering...
- * }
- */
-trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
- def comparator: java.util.Comparator[MessageInvocation]
-
- override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
- case b: UnboundedMailbox =>
- new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
- @inline
- final def dispatcher = self
- }
-
- case b: BoundedMailbox =>
- new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox {
- @inline
- final def dispatcher = self
- }
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
deleted file mode 100644
index 4cba8eec8b..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import akka.actor.{ ActorRef, Actor, IllegalActorStateException }
-import akka.util.{ ReflectiveAccess, Switch }
-
-import java.util.Queue
-import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
-import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
-import util.DynamicVariable
-
-/**
- * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
- * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
- * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
- * <p/>
- * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
- * best described as "work donating" because the actor of which work is being stolen takes the initiative.
- * <p/>
- * The preferred way of creating dispatchers is to use
- * the {@link akka.dispatch.Dispatchers} factory object.
- *
- * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
- * @see akka.dispatch.Dispatchers
- *
- * @author Viktor Klang
- */
-class ExecutorBasedEventDrivenWorkStealingDispatcher(
- _name: String,
- throughput: Int = Dispatchers.THROUGHPUT,
- throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
- mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
- config: ThreadPoolConfig = ThreadPoolConfig())
- extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
-
- def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
- this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
-
- def this(_name: String, throughput: Int, mailboxType: MailboxType) =
- this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
-
- def this(_name: String, throughput: Int) =
- this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- def this(_name: String, _config: ThreadPoolConfig) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
-
- def this(_name: String, memberType: Class[_ <: Actor]) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- def this(_name: String, mailboxType: MailboxType) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
-
- @volatile
- private var actorType: Option[Class[_]] = None
- @volatile
- private var members = Vector[ActorRef]()
- private val donationInProgress = new DynamicVariable(false)
-
- private[akka] override def register(actorRef: ActorRef) = {
- //Verify actor type conformity
- actorType match {
- case None => actorType = Some(actorRef.actor.getClass)
- case Some(aType) =>
- if (aType != actorRef.actor.getClass)
- throw new IllegalActorStateException(String.format(
- "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
- actorRef, aType))
- }
-
- synchronized { members :+= actorRef } //Update members
- super.register(actorRef)
- }
-
- private[akka] override def unregister(actorRef: ActorRef) = {
- synchronized { members = members.filterNot(actorRef eq) } //Update members
- super.unregister(actorRef)
- }
-
- override private[akka] def dispatch(invocation: MessageInvocation) = {
- val mbox = getMailbox(invocation.receiver)
- if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
- //We were busy and we got to donate the message to some other lucky guy, we're done here
- } else {
- mbox enqueue invocation
- registerForExecution(mbox)
- }
- }
-
- override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
- try {
- donationInProgress.value = true
- while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
- } finally { donationInProgress.value = false }
-
- if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
- super.reRegisterForExecution(mbox)
- }
-
- /**
- * Returns true if it successfully donated a message
- */
- protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
- val actors = members // copy to prevent concurrent modifications having any impact
-
- // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
- // the dispatcher is being shut down...
- // Starts at is seeded by current time
- doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match {
- case null => false
- case recipient => donate(donorMbox.dequeue, recipient)
- }
- }
-
- /**
- * Returns true if the donation succeeded or false otherwise
- */
- protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
- donationInProgress.value = true
- val actors = members // copy to prevent concurrent modifications having any impact
- doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
- case null => false
- case recipient => donate(message, recipient)
- }
- } finally { donationInProgress.value = false }
-
- /**
- * Rewrites the message and adds that message to the recipients mailbox
- * returns true if the message is non-null
- */
- protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
- if (organ ne null) {
- if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
- organ.message, recipient.timeout, organ.sender, organ.senderFuture)
- else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
- else recipient.postMessageToMailbox(organ.message, None)
- true
- } else false
- }
-
- /**
- * Returns an available recipient for the message, if any
- */
- protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
- val prSz = potentialRecipients.size
- var i = 0
- var recipient: ActorRef = null
-
- while ((i < prSz) && (recipient eq null)) {
- val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
- val mbox = getMailbox(actor)
-
- if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
- recipient = actor //Found!
- }
-
- i += 1
- }
-
- recipient // nothing found, reuse same start index next time
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala
deleted file mode 100644
index 4c00577157..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import akka.AkkaException
-
-import java.util.{ Comparator, PriorityQueue }
-import java.util.concurrent._
-import akka.util._
-
-class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-trait MessageQueue {
- val dispatcherLock = new SimpleLock
- val suspended = new SimpleLock
- def enqueue(handle: MessageInvocation)
- def dequeue(): MessageInvocation
- def size: Int
- def isEmpty: Boolean
-}
-
-/**
- * Mailbox configuration.
- */
-sealed trait MailboxType
-
-case class UnboundedMailbox() extends MailboxType
-case class BoundedMailbox(
- val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
- val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
- if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
- if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
-}
-
-trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
- @inline
- final def enqueue(handle: MessageInvocation): Unit = this add handle
- @inline
- final def dequeue(): MessageInvocation = this.poll()
-}
-
-trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
- def pushTimeOut: Duration
-
- final def enqueue(handle: MessageInvocation) {
- if (pushTimeOut.length > 0) {
- this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
- throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
- }
- } else this put handle
- }
-
- @inline
- final def dequeue(): MessageInvocation = this.poll()
-}
-
-class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics
-
-class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics
-
-class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics
-
-class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala
deleted file mode 100644
index 20887c3867..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicLong
-import akka.event.EventHandler
-import akka.config.Configuration
-import akka.config.Config.TIME_UNIT
-import akka.util.{ Duration, Switch, ReentrantGuard }
-import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
-import akka.actor._
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-final case class MessageInvocation(val receiver: ActorRef,
- val message: Any,
- val sender: Option[ActorRef],
- val senderFuture: Option[CompletableFuture[Any]]) {
- if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
-
- def invoke = try {
- receiver.invoke(this)
- } catch {
- case e: NullPointerException => throw new ActorInitializationException(
- "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).")
- }
-}
-
-final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable {
- def run = {
- future complete (try {
- Right(function())
- } catch {
- case e =>
- EventHandler.error(e, this, e.getMessage)
- Left(e)
- }
- finally {
- cleanup()
- })
- }
-}
-
-object MessageDispatcher {
- val UNSCHEDULED = 0
- val SCHEDULED = 1
- val RESCHEDULED = 2
-
- implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher
-}
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-trait MessageDispatcher {
- import MessageDispatcher._
-
- protected val uuids = new ConcurrentSkipListSet[Uuid]
- protected val futures = new AtomicLong(0L)
- protected val guard = new ReentrantGuard
- protected val active = new Switch(false)
-
- private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
-
- /**
- * Creates and returns a mailbox for the given actor.
- */
- private[akka] def createMailbox(actorRef: ActorRef): AnyRef
-
- /**
- * Attaches the specified actorRef to this dispatcher
- */
- final def attach(actorRef: ActorRef): Unit = guard withGuard {
- register(actorRef)
- }
-
- /**
- * Detaches the specified actorRef from this dispatcher
- */
- final def detach(actorRef: ActorRef): Unit = guard withGuard {
- unregister(actorRef)
- }
-
- private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
-
- private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = {
- futures.getAndIncrement()
- try {
- val future = new DefaultCompletableFuture[T](timeout)
-
- if (active.isOff)
- guard withGuard { active.switchOn { start } }
-
- executeFuture(FutureInvocation[T](future, block, futureCleanup))
- future
- } catch {
- case e =>
- futures.decrementAndGet
- throw e
- }
- }
-
- private val futureCleanup: () => Unit =
- () => if (futures.decrementAndGet() == 0) {
- guard withGuard {
- if (futures.get == 0 && uuids.isEmpty) {
- shutdownSchedule match {
- case UNSCHEDULED =>
- shutdownSchedule = SCHEDULED
- Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
- case SCHEDULED =>
- shutdownSchedule = RESCHEDULED
- case RESCHEDULED => //Already marked for reschedule
- }
- }
- }
- }
-
- private[akka] def register(actorRef: ActorRef) {
- if (actorRef.mailbox eq null)
- actorRef.mailbox = createMailbox(actorRef)
-
- uuids add actorRef.uuid
- if (active.isOff) {
- active.switchOn {
- start
- }
- }
- }
-
- private[akka] def unregister(actorRef: ActorRef) = {
- if (uuids remove actorRef.uuid) {
- actorRef.mailbox = null
- if (uuids.isEmpty && futures.get == 0) {
- shutdownSchedule match {
- case UNSCHEDULED =>
- shutdownSchedule = SCHEDULED
- Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
- case SCHEDULED =>
- shutdownSchedule = RESCHEDULED
- case RESCHEDULED => //Already marked for reschedule
- }
- }
- }
- }
-
- /**
- * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
- */
- def stopAllAttachedActors {
- val i = uuids.iterator
- while (i.hasNext()) {
- val uuid = i.next()
- Actor.registry.actorFor(uuid) match {
- case Some(actor) => actor.stop()
- case None => {}
- }
- }
- }
-
- private val shutdownAction = new Runnable {
- def run = guard withGuard {
- shutdownSchedule match {
- case RESCHEDULED =>
- shutdownSchedule = SCHEDULED
- Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
- case SCHEDULED =>
- if (uuids.isEmpty && futures.get == 0) {
- active switchOff {
- shutdown // shut down in the dispatcher's references is zero
- }
- }
- shutdownSchedule = UNSCHEDULED
- case UNSCHEDULED => //Do nothing
- }
- }
- }
-
- /**
- * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
- * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
- */
- private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
-
- /**
- * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
- */
- def suspend(actorRef: ActorRef): Unit
-
- /*
- * After the call to this method, the dispatcher must begin any new message processing for the specified reference
- */
- def resume(actorRef: ActorRef): Unit
-
- /**
- * Will be called when the dispatcher is to queue an invocation for execution
- */
- private[akka] def dispatch(invocation: MessageInvocation): Unit
-
- private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit
-
- /**
- * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
- */
- private[akka] def start(): Unit
-
- /**
- * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
- */
- private[akka] def shutdown(): Unit
-
- /**
- * Returns the size of the mailbox for the specified actor
- */
- def mailboxSize(actorRef: ActorRef): Int
-
- /**
- * Returns the amount of futures queued for execution
- */
- def pendingFutures: Long = futures.get
-}
-
-/**
- * Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
- */
-abstract class MessageDispatcherConfigurator {
- /**
- * Returns an instance of MessageDispatcher given a Configuration
- */
- def configure(config: Configuration): MessageDispatcher
-
- def mailboxType(config: Configuration): MailboxType = {
- val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
- if (capacity < 1) UnboundedMailbox()
- else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
- }
-
- def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
- import ThreadPoolConfigDispatcherBuilder.conf_?
-
- //Apply the following options to the config if they are present in the config
- ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
- conf_?(config getInt "keep-alive-time")(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
- conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
- conf_?(config getDouble "max-pool-size-factor")(factor => _.setMaxPoolSizeFromFactor(factor)),
- conf_?(config getInt "executor-bounds")(bounds => _.setExecutorBounds(bounds)),
- conf_?(config getBool "allow-core-timeout")(allow => _.setAllowCoreThreadTimeout(allow)),
- conf_?(config getString "rejection-policy" map {
- case "abort" => new AbortPolicy()
- case "caller-runs" => new CallerRunsPolicy()
- case "discard-oldest" => new DiscardOldestPolicy()
- case "discard" => new DiscardPolicy()
- case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
- })(policy => _.setRejectionPolicy(policy)))
- }
-}
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala
deleted file mode 100644
index 3169c70ef9..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import akka.actor.{ Actor, ActorRef }
-import akka.config.Config.config
-import akka.util.Duration
-
-import java.util.Queue
-import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue }
-import akka.actor
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
- *
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType)
- extends ExecutorBasedEventDrivenDispatcher(
- _actor.uuid.toString, Dispatchers.THROUGHPUT, -1, _mailboxType, ThreadBasedDispatcher.oneThread) {
-
- private[akka] val owner = new AtomicReference[ActorRef](_actor)
-
- def this(actor: ActorRef) =
- this(actor, UnboundedMailbox()) // For Java API
-
- def this(actor: ActorRef, capacity: Int) =
- this(actor, BoundedMailbox(capacity)) //For Java API
-
- def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API
- this(actor, BoundedMailbox(capacity, pushTimeOut))
-
- override def register(actorRef: ActorRef) = {
- val actor = owner.get()
- if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
- owner.compareAndSet(null, actorRef) //Register if unregistered
- super.register(actorRef)
- }
-
- override def unregister(actorRef: ActorRef) = {
- super.unregister(actorRef)
- owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak)
- }
-}
-
-object ThreadBasedDispatcher {
- val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
-}
-
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala
deleted file mode 100644
index e847610c4c..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import java.util.Collection
-import java.util.concurrent._
-import atomic.{ AtomicLong, AtomicInteger }
-import ThreadPoolExecutor.CallerRunsPolicy
-
-import akka.util.Duration
-import akka.event.EventHandler
-
-object ThreadPoolConfig {
- type Bounds = Int
- type FlowHandler = Either[RejectedExecutionHandler, Bounds]
- type QueueFactory = () => BlockingQueue[Runnable]
-
- val defaultAllowCoreThreadTimeout: Boolean = false
- val defaultCorePoolSize: Int = 16
- val defaultMaxPoolSize: Int = 128
- val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
- def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy)
-
- def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)
- def flowHandler(bounds: Int): FlowHandler = Right(bounds)
-
- def fixedPoolSize(size: Int): Int = size
- def scaledPoolSize(multiplier: Double): Int =
- (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
-
- def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
- () => new ArrayBlockingQueue[Runnable](capacity, fair)
-
- def synchronousQueue(fair: Boolean): QueueFactory =
- () => new SynchronousQueue[Runnable](fair)
-
- def linkedBlockingQueue(): QueueFactory =
- () => new LinkedBlockingQueue[Runnable]()
-
- def linkedBlockingQueue(capacity: Int): QueueFactory =
- () => new LinkedBlockingQueue[Runnable](capacity)
-
- def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
- () => queue
-
- def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
- val queue = queueFactory()
- () => queue
- }
-}
-
-case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
- corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
- maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
- threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
- flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
- queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
-
- final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService =
- new LazyExecutorServiceWrapper(createExecutorService(threadFactory))
-
- final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
- flowHandler match {
- case Left(rejectHandler) =>
- val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
- service.allowCoreThreadTimeOut(allowCorePoolTimeout)
- service
- case Right(bounds) =>
- val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
- service.allowCoreThreadTimeOut(allowCorePoolTimeout)
- new BoundedExecutorDecorator(service, bounds)
- }
- }
-}
-
-trait DispatcherBuilder {
- def build: MessageDispatcher
-}
-
-object ThreadPoolConfigDispatcherBuilder {
- def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun
-}
-
-case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
- import ThreadPoolConfig._
- def build = dispatcherFactory(config)
-
- //TODO remove this, for backwards compat only
- @deprecated("Use .build instead", "1.1")
- def buildThreadPool = build
-
- def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))
-
- def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))
-
- def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
- withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
-
- def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))
-
- def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))
-
- def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))
-
- def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler))
-
- def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(corePoolSize = size))
-
- def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(maxPoolSize = size))
-
- def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
- setCorePoolSize(scaledPoolSize(multiplier))
-
- def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
- setMaxPoolSize(scaledPoolSize(multiplier))
-
- def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(flowHandler = flowHandler(bounds)))
-
- def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
- setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))
-
- def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(threadTimeout = time))
-
- def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
- setFlowHandler(flowHandler(policy))
-
- def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(flowHandler = newFlowHandler))
-
- def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
- this.copy(config = config.copy(allowCorePoolTimeout = allow))
-
- def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
-}
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-class MonitorableThreadFactory(val name: String) extends ThreadFactory {
- protected val counter = new AtomicLong
-
- def newThread(runnable: Runnable) = new MonitorableThread(runnable, name)
-}
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-object MonitorableThread {
- val DEFAULT_NAME = "MonitorableThread"
-
- // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
- val created = new AtomicInteger
- val alive = new AtomicInteger
-}
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-class MonitorableThread(runnable: Runnable, name: String)
- extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {
-
- setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- def uncaughtException(thread: Thread, cause: Throwable) = {}
- })
-
- override def run = {
- try {
- MonitorableThread.alive.incrementAndGet
- super.run
- } finally {
- MonitorableThread.alive.decrementAndGet
- }
- }
-}
-
-/**
- * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
- */
-class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
- protected val semaphore = new Semaphore(bound)
-
- override def execute(command: Runnable) = {
- semaphore.acquire
- try {
- executor.execute(new Runnable() {
- def run = {
- try {
- command.run
- } finally {
- semaphore.release
- }
- }
- })
- } catch {
- case e: RejectedExecutionException =>
- EventHandler.warning(this, e.toString)
- semaphore.release
- case e: Throwable =>
- EventHandler.error(e, this, e.getMessage)
- throw e
- }
- }
-}
-
-trait ExecutorServiceDelegate extends ExecutorService {
-
- def executor: ExecutorService
-
- def execute(command: Runnable) = executor.execute(command)
-
- def shutdown() { executor.shutdown() }
-
- def shutdownNow() = executor.shutdownNow()
-
- def isShutdown = executor.isShutdown
-
- def isTerminated = executor.isTerminated
-
- def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
-
- def submit[T](callable: Callable[T]) = executor.submit(callable)
-
- def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
-
- def submit(runnable: Runnable) = executor.submit(runnable)
-
- def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
-
- def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
-
- def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
-
- def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
-}
-
-trait LazyExecutorService extends ExecutorServiceDelegate {
-
- def createExecutor: ExecutorService
-
- lazy val executor = {
- createExecutor
- }
-}
-
-class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService {
- def createExecutor = executorFactory
-}