Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch')
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala                                     227
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala              305
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala  165
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/Future.scala                                          832
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala                                  68
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala                                 260
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala                            52
-rw-r--r--  test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala                               259
8 files changed, 2168 insertions, 0 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala
new file mode 100644
index 0000000000..7dd1bf6218
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala
@@ -0,0 +1,227 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import akka.actor.{ Actor, ActorRef }
+import akka.actor.newUuid
+import akka.config.Config._
+import akka.util.{ Duration, ReflectiveAccess }
+
+import akka.config.Configuration
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Scala API. Dispatcher factory.
+ * <p/>
+ * Example usage:
+ * <pre/>
+ * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
+ * dispatcher
+ * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+ * .setCorePoolSize(16)
+ * .setMaxPoolSize(128)
+ * .setKeepAliveTimeInMillis(60000)
+ * .setRejectionPolicy(new CallerRunsPolicy)
+ * .build
+ * </pre>
+ * <p/>
+ * Java API. Dispatcher factory.
+ * <p/>
+ * Example usage:
+ * <pre/>
+ * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
+ * dispatcher
+ * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+ * .setCorePoolSize(16)
+ * .setMaxPoolSize(128)
+ * .setKeepAliveTimeInMillis(60000)
+ * .setRejectionPolicy(new CallerRunsPolicy())
+ * .build();
+ * </pre>
+ * <p/>
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+object Dispatchers {
+ val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
+ val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
+ map(time => Duration(time, TIME_UNIT)).
+ getOrElse(Duration(1000, TimeUnit.MILLISECONDS))
+ val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
+ val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
+ val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT)
+ val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
+ val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
+
+ lazy val defaultGlobalDispatcher = {
+ config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
+ }
+
+ object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
+
+ /**
+   * Creates a thread-based dispatcher serving a single actor through the same single thread.
+   * Uses the default timeout.
+ * <p/>
+ * E.g. each actor consumes its own thread.
+ */
+ def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
+
+ /**
+   * Creates a thread-based dispatcher serving a single actor through the same single thread.
+   * Uses the default timeout.
+   * If capacity is negative, it's Integer.MAX_VALUE.
+ * <p/>
+ * E.g. each actor consumes its own thread.
+ */
+ def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity)
+
+ /**
+   * Creates a thread-based dispatcher serving a single actor through the same single thread.
+   * If capacity is negative, it's Integer.MAX_VALUE.
+ * <p/>
+ * E.g. each actor consumes its own thread.
+ */
+ def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) =
+ new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut)
+
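+  // A minimal usage sketch, assuming an actor class `MyActor`: each actor gets its own thread by
+  // assigning it a thread-based dispatcher before it is started.
+  //
+  //   val ref = Actor.actorOf[MyActor]
+  //   ref.dispatcher = Dispatchers.newThreadBasedDispatcher(ref)
+  //   ref.start()
+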
+ /**
+   * Creates an executor-based event-driven dispatcher serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenDispatcher(name: String) =
+ ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig())
+
+ /**
+   * Creates an executor-based event-driven dispatcher serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
+ ThreadPoolConfigDispatcherBuilder(config =>
+ new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
+
+ /**
+   * Creates an executor-based event-driven dispatcher serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
+ ThreadPoolConfigDispatcherBuilder(config =>
+ new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
+
+ /**
+   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) =
+ ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig())
+
+ /**
+   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) =
+ ThreadPoolConfigDispatcherBuilder(config =>
+ new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig())
+
+ /**
+   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
+ ThreadPoolConfigDispatcherBuilder(config =>
+ new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
+
+ /**
+   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (potentially millions of) actors through a thread pool.
+ * <p/>
+ * Has a fluent builder interface for configuring its semantics.
+ */
+ def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
+ ThreadPoolConfigDispatcherBuilder(config =>
+ new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
+ /**
+ * Utility function that tries to load the specified dispatcher config from the akka.conf
+   * or else uses the supplied default dispatcher.
+ */
+ def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
+ config getSection key flatMap from getOrElse default
+
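+  // A minimal sketch of how `fromConfig` might be used; "my-dispatcher" is an assumed key that is
+  // expected to point at a configuration section of the format documented below.
+  //
+  //   val dispatcher = Dispatchers.fromConfig("akka.actor.my-dispatcher")
+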
+ /*
+   * Creates or obtains a dispatcher from a ConfigMap according to the format below
+ *
+ * default-dispatcher {
+ * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
+ * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
+ * # GlobalExecutorBasedEventDriven
+ * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
+ * keep-alive-time = 60 # Keep alive time for threads
+ * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
+ * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
+ * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
+ * allow-core-timeout = on # Allow core threads to time out
+ * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
+ * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
+ * }
+ * ex: from(config.getConfigMap(identifier).get)
+ *
+ * Gotcha: Only configures the dispatcher if possible
+ * Returns: None if "type" isn't specified in the config
+ * Throws: IllegalArgumentException if the value of "type" is not valid
+   *          IllegalArgumentException if the specified configurator class cannot be instantiated
+ */
+ def from(cfg: Configuration): Option[MessageDispatcher] = {
+ cfg.getString("type") map {
+ case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator()
+ case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
+ case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator
+ case fqn =>
+ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
+ case r: Right[_, Class[MessageDispatcherConfigurator]] =>
+ ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match {
+ case r: Right[Exception, MessageDispatcherConfigurator] => r.b
+ case l: Left[Exception, MessageDispatcherConfigurator] =>
+ throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a)
+ }
+ case l: Left[Exception, _] =>
+ throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a)
+ }
+ } map {
+ _ configure cfg
+ }
+ }
+}
+
+object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
+ def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
+}
+
+class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
+ def configure(config: Configuration): MessageDispatcher = {
+ configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
+ config.getString("name", newUuid.toString),
+ config.getInt("throughput", Dispatchers.THROUGHPUT),
+ config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
+ mailboxType(config),
+ threadPoolConfig)).build
+ }
+}
+
+class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
+ def configure(config: Configuration): MessageDispatcher = {
+ configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
+ config.getString("name", newUuid.toString),
+ config.getInt("throughput", Dispatchers.THROUGHPUT),
+ config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
+ mailboxType(config),
+ threadPoolConfig)).build
+ }
+}
\ No newline at end of file
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
new file mode 100644
index 0000000000..bc3f29ac68
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala
@@ -0,0 +1,305 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import akka.event.EventHandler
+import akka.actor.{ ActorRef, IllegalActorStateException }
+import akka.util.{ ReflectiveAccess, Switch }
+
+import java.util.Queue
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
+
+/**
+ * Default settings are:
+ * <pre/>
+ * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
+ * - NR_START_THREADS = 16
+ * - NR_MAX_THREADS = 128
+ * - KEEP_ALIVE_TIME = 60000L // one minute
+ * </pre>
+ * <p/>
+ *
+ * The dispatcher has a fluent builder interface to build up a thread pool to suit your use-case.
+ * There is a default thread pool defined but make use of the builder if you need it. Here are some examples.
+ * <p/>
+ *
+ * Scala API.
+ * <p/>
+ * Example usage:
+ * <pre/>
+ * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
+ * dispatcher
+ * .withNewThreadPoolWithBoundedBlockingQueue(100)
+ * .setCorePoolSize(16)
+ * .setMaxPoolSize(128)
+ * .setKeepAliveTimeInMillis(60000)
+ * .setRejectionPolicy(new CallerRunsPolicy)
+ * .buildThreadPool
+ * </pre>
+ * <p/>
+ *
+ * Java API.
+ * <p/>
+ * Example usage:
+ * <pre/>
+ * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name");
+ * dispatcher
+ * .withNewThreadPoolWithBoundedBlockingQueue(100)
+ * .setCorePoolSize(16)
+ * .setMaxPoolSize(128)
+ * .setKeepAliveTimeInMillis(60000)
+ * .setRejectionPolicy(new CallerRunsPolicy())
+ * .buildThreadPool();
+ * </pre>
+ * <p/>
+ *
+ * But the preferred way of creating dispatchers is to use
+ * the {@link akka.dispatch.Dispatchers} factory object.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ * @param throughput positive integer indicates the dispatcher will only process so many messages at a time from the
+ * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
+ * always continues until the mailbox is empty.
+ * Larger values (or zero or negative) increase throughput, smaller values increase fairness
+ */
+class ExecutorBasedEventDrivenDispatcher(
+ _name: String,
+ val throughput: Int = Dispatchers.THROUGHPUT,
+ val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
+ val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
+ val config: ThreadPoolConfig = ThreadPoolConfig())
+ extends MessageDispatcher {
+
+ def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
+ this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
+
+ def this(_name: String, throughput: Int, mailboxType: MailboxType) =
+ this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
+
+ def this(_name: String, throughput: Int) =
+ this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+
+ def this(_name: String, _config: ThreadPoolConfig) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
+
+ def this(_name: String) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+
+ val name = "akka:event-driven:dispatcher:" + _name
+
+ private[akka] val threadFactory = new MonitorableThreadFactory(name)
+ private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
+
+ private[akka] def dispatch(invocation: MessageInvocation) = {
+ val mbox = getMailbox(invocation.receiver)
+ mbox enqueue invocation
+ registerForExecution(mbox)
+ }
+
+ private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) {
+ try executorService.get() execute invocation
+ catch {
+ case e: RejectedExecutionException =>
+ EventHandler.warning(this, e.toString)
+ throw e
+ }
+ }
+
+ /**
+ * @return the mailbox associated with the actor
+ */
+ protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
+
+ override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
+
+ def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
+ case b: UnboundedMailbox =>
+ new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
+ @inline
+ final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
+ @inline
+ final def enqueue(m: MessageInvocation) = this.add(m)
+ @inline
+ final def dequeue(): MessageInvocation = this.poll()
+ }
+ case b: BoundedMailbox =>
+ new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
+ @inline
+ final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
+ }
+ }
+
+ private[akka] def start {}
+
+ private[akka] def shutdown {
+ val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
+ if (old ne null) {
+ old.shutdownNow()
+ }
+ }
+
+ private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
+ if (mbox.dispatcherLock.tryLock()) {
+ if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
+ try {
+ executorService.get() execute mbox
+ } catch {
+ case e: RejectedExecutionException =>
+ EventHandler.warning(this, e.toString)
+ mbox.dispatcherLock.unlock()
+ throw e
+ }
+ } else {
+ mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock
+ }
+ }
+ }
+
+ private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
+ registerForExecution(mbox)
+
+ override val toString = getClass.getSimpleName + "[" + name + "]"
+
+ def suspend(actorRef: ActorRef) {
+ getMailbox(actorRef).suspended.tryLock
+ }
+
+ def resume(actorRef: ActorRef) {
+ val mbox = getMailbox(actorRef)
+ mbox.suspended.tryUnlock
+ reRegisterForExecution(mbox)
+ }
+}
+
+/**
+ * This is the behavior of an ExecutorBasedEventDrivenDispatcher's mailbox.
+ */
+trait ExecutableMailbox extends Runnable { self: MessageQueue =>
+
+ def dispatcher: ExecutorBasedEventDrivenDispatcher
+
+ final def run = {
+ try {
+ processMailbox()
+ } catch {
+ case ie: InterruptedException =>
+ }
+ finally {
+ dispatcherLock.unlock()
+ }
+ if (!self.isEmpty)
+ dispatcher.reRegisterForExecution(this)
+ }
+
+ /**
+ * Process the messages in the mailbox
+ *
+ * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
+ */
+ final def processMailbox() {
+ if (!self.suspended.locked) {
+ var nextMessage = self.dequeue
+ if (nextMessage ne null) { //If we have a message
+ if (dispatcher.throughput <= 1) //If we only run one message per process
+ nextMessage.invoke //Just run it
+ else { //But otherwise, if we are throttled, we need to do some book-keeping
+ var processedMessages = 0
+ val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
+ val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
+ else 0
+ do {
+ nextMessage.invoke
+ nextMessage =
+ if (self.suspended.locked) {
+ null // If we are suspended, abort
+ } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
+ processedMessages += 1
+ if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
+ null //We reached our boundaries, abort
+ else self.dequeue //Dequeue the next message
+ }
+ } while (nextMessage ne null)
+ }
+ }
+ }
+ }
+}
+
+object PriorityGenerator {
+ /**
+ * Creates a PriorityGenerator that uses the supplied function as priority generator
+ */
+ def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator {
+ def gen(message: Any): Int = priorityFunction(message)
+ }
+}
+
+/**
+ * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
+ * PriorityExecutorBasedEventDrivenDispatcher
+ */
+abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
+ def gen(message: Any): Int
+
+ final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int =
+ gen(thisMessage.message) - gen(thatMessage.message)
+}
+
+/**
+ * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
+ * prioritized according to the supplied comparator.
+ *
+ * The dispatcher will process the messages with the _lowest_ priority first.
+ */
+class PriorityExecutorBasedEventDrivenDispatcher(
+ name: String,
+ val comparator: java.util.Comparator[MessageInvocation],
+ throughput: Int = Dispatchers.THROUGHPUT,
+ throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
+ mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
+ config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
+
+ def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
+ this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
+
+ def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
+ this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
+
+ def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
+ this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+
+ def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
+ this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
+
+ def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
+ this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+}
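+
+// A minimal sketch, assuming messages tagged with the symbols below: a dispatcher whose mailboxes
+// order messages by the supplied priority, lowest value first.
+//
+//   val priority = PriorityGenerator {
+//     case 'highPriority => 0
+//     case 'lowPriority  => 100
+//     case _             => 50
+//   }
+//   val dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("prio", priority)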
+
+/**
+ * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes
+ *
+ * Usage:
+ * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox {
+ * val comparator = ...comparator that determines mailbox priority ordering...
+ * }
+ */
+trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
+ def comparator: java.util.Comparator[MessageInvocation]
+
+ override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
+ case b: UnboundedMailbox =>
+ new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
+ @inline
+ final def dispatcher = self
+ }
+
+ case b: BoundedMailbox =>
+ new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox {
+ @inline
+ final def dispatcher = self
+ }
+ }
+}
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
new file mode 100644
index 0000000000..4cba8eec8b
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import akka.actor.{ ActorRef, Actor, IllegalActorStateException }
+import akka.util.{ ReflectiveAccess, Switch }
+
+import java.util.Queue
+import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
+import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
+import util.DynamicVariable
+
+/**
+ * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
+ * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
+ * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
+ * <p/>
+ * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
+ * best described as "work donating" because the actor from which work is being stolen takes the initiative.
+ * <p/>
+ * The preferred way of creating dispatchers is to use
+ * the {@link akka.dispatch.Dispatchers} factory object.
+ *
+ * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
+ * @see akka.dispatch.Dispatchers
+ *
+ * @author Viktor Klang
+ */
+class ExecutorBasedEventDrivenWorkStealingDispatcher(
+ _name: String,
+ throughput: Int = Dispatchers.THROUGHPUT,
+ throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
+ mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
+ config: ThreadPoolConfig = ThreadPoolConfig())
+ extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
+
+ def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
+ this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
+
+ def this(_name: String, throughput: Int, mailboxType: MailboxType) =
+ this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
+
+ def this(_name: String, throughput: Int) =
+ this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+
+ def this(_name: String, _config: ThreadPoolConfig) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
+
+ def this(_name: String, memberType: Class[_ <: Actor]) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
+
+ def this(_name: String, mailboxType: MailboxType) =
+ this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
+
+ @volatile
+ private var actorType: Option[Class[_]] = None
+ @volatile
+ private var members = Vector[ActorRef]()
+ private val donationInProgress = new DynamicVariable(false)
+
+ private[akka] override def register(actorRef: ActorRef) = {
+ //Verify actor type conformity
+ actorType match {
+ case None => actorType = Some(actorRef.actor.getClass)
+ case Some(aType) =>
+ if (aType != actorRef.actor.getClass)
+ throw new IllegalActorStateException(String.format(
+ "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
+ actorRef, aType))
+ }
+
+ synchronized { members :+= actorRef } //Update members
+ super.register(actorRef)
+ }
+
+ private[akka] override def unregister(actorRef: ActorRef) = {
+ synchronized { members = members.filterNot(actorRef eq) } //Update members
+ super.unregister(actorRef)
+ }
+
+ override private[akka] def dispatch(invocation: MessageInvocation) = {
+ val mbox = getMailbox(invocation.receiver)
+ if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
+ //We were busy and we got to donate the message to some other lucky guy, we're done here
+ } else {
+ mbox enqueue invocation
+ registerForExecution(mbox)
+ }
+ }
+
+ override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
+ try {
+ donationInProgress.value = true
+ while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
+ } finally { donationInProgress.value = false }
+
+ if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
+ super.reRegisterForExecution(mbox)
+ }
+
+ /**
+ * Returns true if it successfully donated a message
+ */
+ protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
+ val actors = members // copy to prevent concurrent modifications having any impact
+
+    // we risk picking a thief which has been unregistered from the dispatcher in the meantime, but that typically means
+    // the dispatcher is being shut down...
+    // the start index is seeded by the current time
+ doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match {
+ case null => false
+ case recipient => donate(donorMbox.dequeue, recipient)
+ }
+ }
+
+ /**
+ * Returns true if the donation succeeded or false otherwise
+ */
+ protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
+ donationInProgress.value = true
+ val actors = members // copy to prevent concurrent modifications having any impact
+ doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
+ case null => false
+ case recipient => donate(message, recipient)
+ }
+ } finally { donationInProgress.value = false }
+
+ /**
+   * Rewrites the message and adds it to the recipient's mailbox.
+   * Returns true if the message is non-null.
+ */
+ protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
+ if (organ ne null) {
+ if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
+ organ.message, recipient.timeout, organ.sender, organ.senderFuture)
+ else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
+ else recipient.postMessageToMailbox(organ.message, None)
+ true
+ } else false
+ }
+
+ /**
+ * Returns an available recipient for the message, if any
+ */
+ protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
+ val prSz = potentialRecipients.size
+ var i = 0
+ var recipient: ActorRef = null
+
+ while ((i < prSz) && (recipient eq null)) {
+ val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
+ val mbox = getMailbox(actor)
+
+ if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
+ recipient = actor //Found!
+ }
+
+ i += 1
+ }
+
+ recipient // nothing found, reuse same start index next time
+ }
+}
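+
+// A minimal sketch, assuming a `Worker` actor class: a pool of identical actors sharing one
+// work-stealing dispatcher instance, so a busy member can donate queued messages to an idle one.
+//
+//   val stealer = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("worker-pool").build
+//   val workers = Vector.fill(4) {
+//     val ref = Actor.actorOf[Worker]
+//     ref.dispatcher = stealer
+//     ref.start()
+//   }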
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Future.scala b/test/disabled/presentation/akka/src/akka/dispatch/Future.scala
new file mode 100644
index 0000000000..1ad304d726
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/Future.scala
@@ -0,0 +1,832 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import akka.AkkaException
+import akka.event.EventHandler
+import akka.actor.{ Actor, Channel }
+import akka.util.Duration
+import akka.japi.{ Procedure, Function => JFunc }
+
+import scala.util.continuations._
+
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS => MILLIS }
+import java.util.concurrent.atomic.{ AtomicBoolean }
+import java.lang.{ Iterable => JIterable }
+import java.util.{ LinkedList => JLinkedList }
+import scala.collection.mutable.Stack
+import annotation.tailrec
+
+class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
+
+object Futures {
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T]): Future[T] =
+ Future(body.call)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Long): Future[T] =
+ Future(body.call, timeout)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call)(dispatcher)
+
+ /**
+ * Java API, equivalent to Future.apply
+ */
+ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
+ Future(body.call, timeout)(dispatcher)
+
+ /**
+ * Returns a Future to the result of the first future in the list that is completed
+ */
+ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
+ val futureResult = new DefaultCompletableFuture[T](timeout)
+
+ val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _)
+ for (f ← futures) f onComplete completeFirst
+
+ futureResult
+ }
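+
+  // A minimal sketch, assuming two futures `f1` and `f2` are in scope: the returned future is
+  // completed by whichever of them finishes first, with its result or failure.
+  //
+  //   val fastest = Futures.firstCompletedOf(List(f1, f2))
+  //   fastest.await.resultOrException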
+
+ /**
+ * Java API.
+ * Returns a Future to the result of the first future in the list that is completed
+ */
+ def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
+ firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
+
+ /**
+ * A non-blocking fold over the specified futures.
+   * The fold is performed on the thread where the last future is completed;
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ * Example:
+ * <pre>
+ * val result = Futures.fold(0)(futures)(_ + _).await.result
+ * </pre>
+ */
+ def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = {
+ if (futures.isEmpty) {
+ new AlreadyCompletedFuture[R](Right(zero))
+ } else {
+ val result = new DefaultCompletableFuture[R](timeout)
+ val results = new ConcurrentLinkedQueue[T]()
+ val allDone = futures.size
+
+ val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature?
+ f.value.get match {
+ case r: Right[Throwable, T] =>
+ results add r.b
+ if (results.size == allDone) { //Only one thread can get here
+ try {
+ result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun)
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ result completeWithException e
+ }
+ finally {
+ results.clear
+ }
+ }
+ case l: Left[Throwable, T] =>
+ result completeWithException l.a
+ results.clear
+ }
+ }
+
+ futures foreach { _ onComplete aggregate }
+ result
+ }
+ }
+
+ /**
+ * Java API
+ * A non-blocking fold over the specified futures.
+   * The fold is performed on the thread where the last future is completed;
+ * the result will be the first failure of any of the futures, or any failure in the actual fold,
+ * or the result of the fold.
+ */
+ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
+ fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
+
+ /**
+ * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
+ * Example:
+ * <pre>
+ * val result = Futures.reduce(futures)(_ + _).await.result
+ * </pre>
+ */
+ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) => T): Future[R] = {
+ if (futures.isEmpty)
+ new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left")))
+ else {
+ val result = new DefaultCompletableFuture[R](timeout)
+ val seedFound = new AtomicBoolean(false)
+ val seedFold: Future[T] => Unit = f => {
+ if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
+ f.value.get match {
+ case r: Right[Throwable, T] =>
+ result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op))
+ case l: Left[Throwable, T] =>
+ result.completeWithException(l.a)
+ }
+ }
+ }
+ for (f ← futures) f onComplete seedFold //Attach the listener to the Futures
+ result
+ }
+ }
+
+ /**
+ * Java API.
+ * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
+ */
+ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
+ reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
+
+ /**
+ * Java API.
+ * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JLinkedList[A]] =
+ scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) =>
+ for (r ← fr; a ← fa) yield {
+ r add a
+ r
+ })
+
+ /**
+ * Java API.
+ * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A](in: JIterable[Future[A]]): Future[JLinkedList[A]] = sequence(in, Actor.TIMEOUT)
+
+ /**
+ * Java API.
+ * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel.
+ */
+ def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] =
+ scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) =>
+ val fb = fn(a)
+ for (r ← fr; b ← fb) yield {
+ r add b
+ r
+ }
+ }
+
+ /**
+ * Java API.
+ * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel.
+ */
+ def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn)
+
+ // =====================================
+ // Deprecations
+ // =====================================
+
+ /**
+ * (Blocking!)
+ */
+ @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)", "1.1")
+ def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
+
+ /**
+   * Returns the first Future that is completed (blocking!)
+ */
+ @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await", "1.1")
+ def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await
+
+ /**
+ * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
+ */
+ @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }", "1.1")
+ def awaitMap[A, B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
+ in map { f => fun(f.await) }
+
+ /**
+ * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
+ */
+ @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException", "1.1")
+ def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1, f2)).await.resultOrException
+}
+
+object Future {
+ /**
+ * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
+ * The execution is performed by the specified Dispatcher.
+ */
+ def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] =
+ dispatcher.dispatchFuture(() => body, timeout)
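+
+  // A minimal sketch, assuming an implicit MessageDispatcher is in scope (here the default global
+  // one): the body runs on the dispatcher and the caller immediately gets a Future for its result.
+  //
+  //   implicit val dispatcher = Dispatchers.defaultGlobalDispatcher
+  //   val answer = Future(21 * 2, 5000)
+  //   answer.await.result // Some(42)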
+
+ /**
+ * Construct a completable channel
+ */
+ def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] {
+ val future = empty[Any](timeout)
+ def !(msg: Any) = future completeWithResult msg
+ }
+
+ /**
+ * Create an empty Future with default timeout
+ */
+ def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout)
+
+ import scala.collection.mutable.Builder
+ import scala.collection.generic.CanBuildFrom
+
+ /**
+ * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
+ * Useful for reducing many Futures into a single Future.
+ */
+ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
+ in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
+
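+  // A minimal sketch, assuming an implicit MessageDispatcher is in scope: collapsing a list of
+  // futures into a single future holding all of the results.
+  //
+  //   val all: Future[List[Int]] = Future.sequence(List(Future(1), Future(2), Future(3)))
+  //   all.await.result // Some(List(1, 2, 3))
+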
+ /**
+ * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
+ * This is useful for performing a parallel map. For example, to apply a function to all items of a list
+ * in parallel:
+ * <pre>
+ * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
+ * </pre>
+ */
+ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
+ in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) =>
+ val fb = fn(a.asInstanceOf[A])
+ for (r ← fr; b ← fb) yield (r += b)
+ }.map(_.result)
+
+ /**
+ * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
+ * Continuations plugin.
+ *
+ * Within the block, the result of a Future may be accessed by calling Future.apply. At that point
+ * execution is suspended with the rest of the block being stored in a continuation until the result
+ * of the Future is available. If an Exception is thrown while processing, it will be contained
+ * within the resulting Future.
+ *
+ * This allows working with Futures in an imperative style without blocking for each result.
+ *
+ * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the
+ * value of the other Future is available.
+ *
+ * The Delimited Continuations compiler plugin must be enabled in order to use this method.
+ */
+ def flow[A](body: => A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = {
+ val future = Promise[A](timeout)
+ (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f =>
+ val opte = f.exception
+ if (opte.isDefined) future completeWithException (opte.get)
+ }
+ future
+ }
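+
+  // A minimal sketch of the dataflow style `flow` enables (requires the continuations compiler
+  // plugin); `x` and `y` are promises completed after the flow block is declared.
+  //
+  //   val x, y = Promise[Int]()
+  //   val sum = Future.flow { x() + y() }
+  //   x completeWithResult 40
+  //   y completeWithResult 2
+  //   sum.await.result // Some(42)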
+
+ private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() => Unit]]]() {
+ override def initialValue = None
+ }
+}
+
+sealed trait Future[+T] {
+
+ /**
+ * For use only within a Future.flow block or another compatible Delimited Continuations reset block.
+ *
+ * Returns the result of this Future without blocking, by suspending execution and storing it as a
+ * continuation until the result is available.
+ *
+ * If this Future is untyped (a Future[Nothing]), a type parameter must be explicitly provided or
+ * execution will fail. The normal result of getting a Future from an ActorRef using !!! will return
+ * an untyped Future.
+ */
+ def apply[A >: T](): A @cps[Future[Any]] = shift(this flatMap (_: A => Future[Any]))
+
+ /**
+ * Blocks awaiting completion of this Future, then returns the resulting value,
+ * or throws the completed exception
+ *
+ * Scala & Java API
+ *
+ * throws FutureTimeoutException if this Future times out when waiting for completion
+ */
+ def get: T = this.await.resultOrException.get
+
+ /**
+ * Blocks the current thread until the Future has been completed or the
+ * timeout has expired. In the case of the timeout expiring a
+ * FutureTimeoutException will be thrown.
+ */
+ def await: Future[T]
+
+ /**
+ * Blocks the current thread until the Future has been completed or the
+ * timeout has expired. The timeout will be the least value of 'atMost' and the timeout
+   * supplied at the construction of this Future.
+ * In the case of the timeout expiring a FutureTimeoutException will be thrown.
+ */
+ def await(atMost: Duration): Future[T]
+
+ /**
+ * Blocks the current thread until the Future has been completed. Use
+ * caution with this method as it ignores the timeout and will block
+ * indefinitely if the Future is never completed.
+ */
+ @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1")
+ def awaitBlocking: Future[T]
+
+ /**
+ * Tests whether this Future has been completed.
+ */
+ final def isCompleted: Boolean = value.isDefined
+
+ /**
+ * Tests whether this Future's timeout has expired.
+ *
+ * Note that an expired Future may still contain a value, or it may be
+ * completed with a value.
+ */
+ def isExpired: Boolean
+
+ /**
+ * This Future's timeout in nanoseconds.
+ */
+ def timeoutInNanos: Long
+
+ /**
+ * The contained value of this Future. Before this Future is completed
+ * the value will be None. After completion the value will be Some(Right(t))
+ * if it contains a valid result, or Some(Left(error)) if it contains
+ * an exception.
+ */
+ def value: Option[Either[Throwable, T]]
+
+ /**
+ * Returns the successful result of this Future if it exists.
+ */
+ final def result: Option[T] = {
+ val v = value
+ if (v.isDefined) v.get.right.toOption
+ else None
+ }
+
+ /**
+ * Returns the contained exception of this Future if it exists.
+ */
+ final def exception: Option[Throwable] = {
+ val v = value
+ if (v.isDefined) v.get.left.toOption
+ else None
+ }
+
+ /**
+ * When this Future is completed, apply the provided function to the
+ * Future. If the Future has already been completed, this will apply
+ * immediately.
+ */
+ def onComplete(func: Future[T] => Unit): Future[T]
+
+ /**
+ * When the future is completed with a valid result, apply the provided
+ * PartialFunction to the result.
+ * <pre>
+ * val result = future receive {
+ * case Foo => "foo"
+ * case Bar => "bar"
+ * }.await.result
+ * </pre>
+ */
+ final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f =>
+ val optr = f.result
+ if (optr.isDefined) {
+ val r = optr.get
+ if (pf.isDefinedAt(r)) pf(r)
+ }
+ }
+
+ /**
+ * Creates a new Future by applying a PartialFunction to the successful
+ * result of this Future if a match is found, or else return a MatchError.
+ * If this Future is completed with an exception then the new Future will
+ * also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
+ * b <- actor !!! Req(a) collect { case Res(x: String) => x }
+ * c <- actor !!! Req(7) collect { case Res(x: String) => x }
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
+ val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
+ onComplete { ft =>
+ val v = ft.value.get
+ fa complete {
+ if (v.isLeft) v.asInstanceOf[Either[Throwable, A]]
+ else {
+ try {
+ val r = v.right.get
+ if (pf isDefinedAt r) Right(pf(r))
+ else Left(new MatchError(r))
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ Left(e)
+ }
+ }
+ }
+ }
+ fa
+ }
+
+ /**
+ * Creates a new Future that will handle any matching Throwable that this
+ * Future might contain. If there is no match, or if this Future contains
+ * a valid result then the new Future will contain the same.
+ * Example:
+ * <pre>
+ * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
+ * Future(6 / 0) failure { case e: NotFoundException => 0 } // result: exception
+ * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
+ * </pre>
+ */
+ final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
+ val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
+ onComplete { ft =>
+ val opte = ft.exception
+ fa complete {
+ if (opte.isDefined) {
+ val e = opte.get
+ try {
+ if (pf isDefinedAt e) Right(pf(e))
+ else Left(e)
+ } catch {
+ case x: Exception => Left(x)
+ }
+ } else ft.value.get
+ }
+ }
+ fa
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future. If this Future is completed with an exception then the new
+ * Future will also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor !!! "Hello" // returns 5
+ * b: String <- actor !!! a // returns "10"
+ * c: String <- actor !!! 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def map[A](f: T => A): Future[A] = {
+ val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
+ onComplete { ft =>
+ val optv = ft.value
+ if (optv.isDefined) {
+ val v = optv.get
+ if (v.isLeft)
+ fa complete v.asInstanceOf[Either[Throwable, A]]
+ else {
+ fa complete (try {
+ Right(f(v.right.get))
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ Left(e)
+ })
+ }
+ }
+ }
+ fa
+ }
+
+ /**
+ * Creates a new Future by applying a function to the successful result of
+ * this Future, and returns the result of the function as the new Future.
+ * If this Future is completed with an exception then the new Future will
+ * also contain this exception.
+ * Example:
+ * <pre>
+ * val future1 = for {
+ * a: Int <- actor !!! "Hello" // returns 5
+ * b: String <- actor !!! a // returns "10"
+ * c: String <- actor !!! 7 // returns "14"
+ * } yield b + "-" + c
+ * </pre>
+ */
+ final def flatMap[A](f: T => Future[A]): Future[A] = {
+ val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
+ onComplete { ft =>
+ val optv = ft.value
+ if (optv.isDefined) {
+ val v = optv.get
+ if (v.isLeft)
+ fa complete v.asInstanceOf[Either[Throwable, A]]
+ else {
+ try {
+ fa.completeWith(f(v.right.get))
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ fa completeWithException e
+ }
+ }
+ }
+ }
+ fa
+ }
+
+ final def foreach(f: T => Unit): Unit = onComplete { ft =>
+ val optr = ft.result
+ if (optr.isDefined)
+ f(optr.get)
+ }
+
+ final def filter(p: Any => Boolean): Future[Any] = {
+ val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS)
+ onComplete { ft =>
+ val optv = ft.value
+ if (optv.isDefined) {
+ val v = optv.get
+ if (v.isLeft)
+ f complete v
+ else {
+ val r = v.right.get
+ f complete (try {
+ if (p(r)) Right(r)
+ else Left(new MatchError(r))
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ Left(e)
+ })
+ }
+ }
+ }
+ f
+ }
+
+ /**
+   * Returns the current result; throws the exception if one has been raised, else returns None
+ */
+ final def resultOrException: Option[T] = {
+ val v = value
+ if (v.isDefined) {
+ val r = v.get
+ if (r.isLeft) throw r.left.get
+ else r.right.toOption
+ } else None
+ }
+
+ /* Java API */
+ final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_))
+
+ final def map[A >: T, B](f: JFunc[A, B]): Future[B] = map(f(_))
+
+ final def flatMap[A >: T, B](f: JFunc[A, Future[B]]): Future[B] = flatMap(f(_))
+
+ final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_))
+
+ final def filter(p: JFunc[Any, Boolean]): Future[Any] = filter(p(_))
+
+}
+
+object Promise {
+
+ def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout)
+
+ def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT)
+
+}
+
+/**
+ * Essentially this is the Promise (or write-side) of a Future (read-side).
+ */
+trait CompletableFuture[T] extends Future[T] {
+ /**
+ * Completes this Future with the specified result, if not already completed.
+ * @return this
+ */
+ def complete(value: Either[Throwable, T]): Future[T]
+
+ /**
+ * Completes this Future with the specified result, if not already completed.
+ * @return this
+ */
+ final def completeWithResult(result: T): Future[T] = complete(Right(result))
+
+ /**
+ * Completes this Future with the specified exception, if not already completed.
+ * @return this
+ */
+ final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception))
+
+ /**
+ * Completes this Future with the specified other Future, when that Future is completed,
+ * unless this Future has already been completed.
+ * @return this.
+ */
+ final def completeWith(other: Future[T]): Future[T] = {
+ other onComplete { f => complete(f.value.get) }
+ this
+ }
+
+ final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => cont(complete(Right(value))) }
+
+ final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) =>
+ val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT)
+ this completeWith other onComplete { f =>
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e: Exception =>
+ EventHandler.error(e, this, e.getMessage)
+ fr completeWithException e
+ }
+ }
+ fr
+ }
+
+}
+
+/**
+ * The default concrete Future implementation.
+ */
+class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] {
+
+ def this() = this(0, MILLIS)
+
+ def this(timeout: Long) = this(timeout, MILLIS)
+
+ val timeoutInNanos = timeunit.toNanos(timeout)
+ private val _startTimeInNanos = currentTimeInNanos
+ private val _lock = new ReentrantLock
+ private val _signal = _lock.newCondition
+ private var _value: Option[Either[Throwable, T]] = None
+ private var _listeners: List[Future[T] => Unit] = Nil
+
+ /**
+ * Must be called inside _lock.lock<->_lock.unlock
+ */
+ @tailrec
+ private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
+ if (_value.isEmpty && waitTimeNanos > 0) {
+ val start = currentTimeInNanos
+ val remainingNanos = try {
+ _signal.awaitNanos(waitTimeNanos)
+ } catch {
+ case e: InterruptedException =>
+ waitTimeNanos - (currentTimeInNanos - start)
+ }
+ awaitUnsafe(remainingNanos)
+ } else {
+ _value.isDefined
+ }
+ }
+
+ def await(atMost: Duration) = {
+ _lock.lock
+ if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this
+ else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
+ }
+
+ def await = {
+ _lock.lock
+ if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this
+ else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
+ }
+
+ def awaitBlocking = {
+ _lock.lock
+ try {
+ while (_value.isEmpty) {
+ _signal.await
+ }
+ this
+ } finally {
+ _lock.unlock
+ }
+ }
+
+ def isExpired: Boolean = timeLeft() <= 0
+
+ def value: Option[Either[Throwable, T]] = {
+ _lock.lock
+ try {
+ _value
+ } finally {
+ _lock.unlock
+ }
+ }
+
+ def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = {
+ _lock.lock
+ val notifyTheseListeners = try {
+ if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired
+ _value = Some(value)
+ val existingListeners = _listeners
+ _listeners = Nil
+ existingListeners
+ } else Nil
+ } finally {
+ _signal.signalAll
+ _lock.unlock
+ }
+
+ if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation
+ @tailrec
+ def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) {
+ if (rest.nonEmpty) {
+ notifyCompleted(rest.head)
+ while (callbacks.nonEmpty) { callbacks.pop().apply() }
+ runCallbacks(rest.tail, callbacks)
+ }
+ }
+
+ val pending = Future.callbacksPendingExecution.get
+ if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow)
+ pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute
+ val doNotify = notifyCompleted _ //Hoist closure to avoid garbage
+ notifyTheseListeners foreach doNotify
+ })
+ } else {
+ try {
+ val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks
+ Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator
+ runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated
+ } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup
+ }
+ }
+
+ this
+ }
+
+ def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
+ _lock.lock
+ val notifyNow = try {
+ if (_value.isEmpty) {
+ if (!isExpired) { //Only add the listener if the future isn't expired
+ _listeners ::= func
+ false
+ } else false //Will never run the callback since the future is expired
+ } else true
+ } finally {
+ _lock.unlock
+ }
+
+ if (notifyNow) notifyCompleted(func)
+
+ this
+ }
+
+ private def notifyCompleted(func: Future[T] => Unit) {
+ try {
+ func(this)
+ } catch {
+ case e => EventHandler notify EventHandler.Error(e, this)
+ }
+ }
+
+ @inline
+ private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis)
+ @inline
+ private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
+}
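+
+/*
+ * A small usage sketch of DefaultCompletableFuture (names and timeout below are illustrative):
+ * <pre>
+ * val future = new DefaultCompletableFuture[String](5000)   // times out after 5000 ms
+ * future onComplete { f => println("got " + f.value) }      // callback registered before completion
+ * future.complete(Right("done"))                            // runs the callback above
+ * future.await.value                                        // Some(Right("done"))
+ * </pre>
+ */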
+
+/**
+ * An already completed Future is seeded with its result at creation. It is useful when you are participating in
+ * a Future composition but already have a value to contribute.
+ */
+sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] {
+ val value = Some(suppliedValue)
+
+ def complete(value: Either[Throwable, T]): CompletableFuture[T] = this
+ def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this }
+ def await(atMost: Duration): Future[T] = this
+ def await: Future[T] = this
+ def awaitBlocking: Future[T] = this
+ def isExpired: Boolean = true
+ def timeoutInNanos: Long = 0
+}
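+
+/*
+ * A minimal sketch of contributing a known value to a composition via AlreadyCompletedFuture
+ * (the value is illustrative):
+ * <pre>
+ * val cached: Future[Int] = new AlreadyCompletedFuture[Int](Right(42))
+ * cached onComplete { f => println(f.value) }   // invoked immediately: Some(Right(42))
+ * </pre>
+ */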
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala
new file mode 100644
index 0000000000..4c00577157
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import akka.AkkaException
+
+import java.util.{ Comparator, PriorityQueue }
+import java.util.concurrent._
+import akka.util._
+
+class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+trait MessageQueue {
+ val dispatcherLock = new SimpleLock
+ val suspended = new SimpleLock
+ def enqueue(handle: MessageInvocation)
+ def dequeue(): MessageInvocation
+ def size: Int
+ def isEmpty: Boolean
+}
+
+/**
+ * Mailbox configuration.
+ */
+sealed trait MailboxType
+
+case class UnboundedMailbox() extends MailboxType
+case class BoundedMailbox(
+ val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
+ val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
+  if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox cannot be negative")
+  if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox cannot be null")
+}
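+
+/*
+ * A configuration sketch (capacity and push time-out values are illustrative):
+ * <pre>
+ * val unbounded = UnboundedMailbox()
+ * val bounded   = BoundedMailbox(capacity = 1000,
+ *                                pushTimeOut = Duration(10, TimeUnit.MILLISECONDS))
+ * </pre>
+ */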
+
+trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
+ @inline
+ final def enqueue(handle: MessageInvocation): Unit = this add handle
+ @inline
+ final def dequeue(): MessageInvocation = this.poll()
+}
+
+trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
+ def pushTimeOut: Duration
+
+ final def enqueue(handle: MessageInvocation) {
+ if (pushTimeOut.length > 0) {
+ this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
+ throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
+ }
+ } else this put handle
+ }
+
+ @inline
+ final def dequeue(): MessageInvocation = this.poll()
+}
+
+class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics
+
+class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics
+
+class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics
+
+class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics
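+
+/*
+ * A sketch of a bounded priority mailbox backed by a hypothetical comparator that orders invocations by
+ * message hash (the ordering and sizes are illustrative only):
+ * <pre>
+ * val byHash = new Comparator[MessageInvocation] {
+ *   def compare(a: MessageInvocation, b: MessageInvocation) =
+ *     a.message.hashCode compare b.message.hashCode
+ * }
+ * val queue = new BoundedPriorityMessageQueue(1000, Duration(10, TimeUnit.MILLISECONDS), byHash)
+ * </pre>
+ */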
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala
new file mode 100644
index 0000000000..20887c3867
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala
@@ -0,0 +1,260 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+import akka.event.EventHandler
+import akka.config.Configuration
+import akka.config.Config.TIME_UNIT
+import akka.util.{ Duration, Switch, ReentrantGuard }
+import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
+import akka.actor._
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+final case class MessageInvocation(val receiver: ActorRef,
+ val message: Any,
+ val sender: Option[ActorRef],
+ val senderFuture: Option[CompletableFuture[Any]]) {
+ if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
+
+ def invoke = try {
+ receiver.invoke(this)
+ } catch {
+ case e: NullPointerException => throw new ActorInitializationException(
+ "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).")
+ }
+}
+
+final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable {
+ def run = {
+ future complete (try {
+ Right(function())
+ } catch {
+ case e =>
+ EventHandler.error(e, this, e.getMessage)
+ Left(e)
+ }
+ finally {
+ cleanup()
+ })
+ }
+}
+
+object MessageDispatcher {
+ val UNSCHEDULED = 0
+ val SCHEDULED = 1
+ val RESCHEDULED = 2
+
+ implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+trait MessageDispatcher {
+ import MessageDispatcher._
+
+ protected val uuids = new ConcurrentSkipListSet[Uuid]
+ protected val futures = new AtomicLong(0L)
+ protected val guard = new ReentrantGuard
+ protected val active = new Switch(false)
+
+ private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
+
+ /**
+ * Creates and returns a mailbox for the given actor.
+ */
+ private[akka] def createMailbox(actorRef: ActorRef): AnyRef
+
+ /**
+ * Attaches the specified actorRef to this dispatcher
+ */
+ final def attach(actorRef: ActorRef): Unit = guard withGuard {
+ register(actorRef)
+ }
+
+ /**
+ * Detaches the specified actorRef from this dispatcher
+ */
+ final def detach(actorRef: ActorRef): Unit = guard withGuard {
+ unregister(actorRef)
+ }
+
+ private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
+
+ private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = {
+ futures.getAndIncrement()
+ try {
+ val future = new DefaultCompletableFuture[T](timeout)
+
+ if (active.isOff)
+ guard withGuard { active.switchOn { start } }
+
+ executeFuture(FutureInvocation[T](future, block, futureCleanup))
+ future
+ } catch {
+ case e =>
+ futures.decrementAndGet
+ throw e
+ }
+ }
+
+ private val futureCleanup: () => Unit =
+ () => if (futures.decrementAndGet() == 0) {
+ guard withGuard {
+ if (futures.get == 0 && uuids.isEmpty) {
+ shutdownSchedule match {
+ case UNSCHEDULED =>
+ shutdownSchedule = SCHEDULED
+ Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
+ case SCHEDULED =>
+ shutdownSchedule = RESCHEDULED
+ case RESCHEDULED => //Already marked for reschedule
+ }
+ }
+ }
+ }
+
+ private[akka] def register(actorRef: ActorRef) {
+ if (actorRef.mailbox eq null)
+ actorRef.mailbox = createMailbox(actorRef)
+
+ uuids add actorRef.uuid
+ if (active.isOff) {
+ active.switchOn {
+ start
+ }
+ }
+ }
+
+ private[akka] def unregister(actorRef: ActorRef) = {
+ if (uuids remove actorRef.uuid) {
+ actorRef.mailbox = null
+ if (uuids.isEmpty && futures.get == 0) {
+ shutdownSchedule match {
+ case UNSCHEDULED =>
+ shutdownSchedule = SCHEDULED
+ Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
+ case SCHEDULED =>
+ shutdownSchedule = RESCHEDULED
+ case RESCHEDULED => //Already marked for reschedule
+ }
+ }
+ }
+ }
+
+ /**
+   * Traverses the list of actors (uuids) currently attached to this dispatcher and stops those actors
+ */
+ def stopAllAttachedActors {
+ val i = uuids.iterator
+ while (i.hasNext()) {
+ val uuid = i.next()
+ Actor.registry.actorFor(uuid) match {
+ case Some(actor) => actor.stop()
+ case None => {}
+ }
+ }
+ }
+
+ private val shutdownAction = new Runnable {
+ def run = guard withGuard {
+ shutdownSchedule match {
+ case RESCHEDULED =>
+ shutdownSchedule = SCHEDULED
+ Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
+ case SCHEDULED =>
+ if (uuids.isEmpty && futures.get == 0) {
+ active switchOff {
+              shutdown // shut down when the dispatcher's reference count is zero
+ }
+ }
+ shutdownSchedule = UNSCHEDULED
+ case UNSCHEDULED => //Do nothing
+ }
+ }
+ }
+
+ /**
+   * How long (in milliseconds) the dispatcher waits, after it no longer has any actors registered, before it shuts
+   * itself down. Defaults to the "akka.actor.dispatcher-shutdown-timeout" setting in your Akka config, or otherwise 1 second.
+ */
+ private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
+
+ /**
+ * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
+ */
+ def suspend(actorRef: ActorRef): Unit
+
+  /**
+   * After the call to this method, the dispatcher must resume message processing for the specified reference
+   */
+ def resume(actorRef: ActorRef): Unit
+
+ /**
+ * Will be called when the dispatcher is to queue an invocation for execution
+ */
+ private[akka] def dispatch(invocation: MessageInvocation): Unit
+
+ private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit
+
+ /**
+   * Called once whenever an actor is attached to this dispatcher and the dispatcher was previously shut down
+ */
+ private[akka] def start(): Unit
+
+ /**
+   * Called once whenever an actor is detached from this dispatcher and no actors are left attached to it
+ */
+ private[akka] def shutdown(): Unit
+
+ /**
+ * Returns the size of the mailbox for the specified actor
+ */
+ def mailboxSize(actorRef: ActorRef): Int
+
+ /**
+   * Returns the number of futures queued for execution
+ */
+ def pendingFutures: Long = futures.get
+}
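+
+/*
+ * A lifecycle sketch (actorRef below is illustrative): attaching the first actor switches the dispatcher on,
+ * and detaching the last one schedules shutdown after timeoutMs:
+ * <pre>
+ * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("work").build
+ * dispatcher.attach(actorRef)   // registers the actor's mailbox and starts the dispatcher if it was off
+ * dispatcher.detach(actorRef)   // once no actors or futures remain, shutdown is scheduled
+ * </pre>
+ */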
+
+/**
+ * Trait to be used for hooking new dispatchers into Dispatchers.fromConfig
+ */
+abstract class MessageDispatcherConfigurator {
+ /**
+ * Returns an instance of MessageDispatcher given a Configuration
+ */
+ def configure(config: Configuration): MessageDispatcher
+
+ def mailboxType(config: Configuration): MailboxType = {
+ val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
+ if (capacity < 1) UnboundedMailbox()
+ else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
+ }
+
+ def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
+ import ThreadPoolConfigDispatcherBuilder.conf_?
+
+ //Apply the following options to the config if they are present in the config
+ ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
+ conf_?(config getInt "keep-alive-time")(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
+ conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
+ conf_?(config getDouble "max-pool-size-factor")(factor => _.setMaxPoolSizeFromFactor(factor)),
+ conf_?(config getInt "executor-bounds")(bounds => _.setExecutorBounds(bounds)),
+ conf_?(config getBool "allow-core-timeout")(allow => _.setAllowCoreThreadTimeout(allow)),
+ conf_?(config getString "rejection-policy" map {
+ case "abort" => new AbortPolicy()
+ case "caller-runs" => new CallerRunsPolicy()
+ case "discard-oldest" => new DiscardOldestPolicy()
+ case "discard" => new DiscardPolicy()
+ case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
+ })(policy => _.setRejectionPolicy(policy)))
+ }
+}
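+
+/*
+ * The keys read by mailboxType and configureThreadPool correspond to entries of a dispatcher section in
+ * akka.conf, roughly like the sketch below (the values are illustrative):
+ * <pre>
+ * keep-alive-time = 60
+ * core-pool-size-factor = 1.0
+ * max-pool-size-factor = 4.0
+ * executor-bounds = -1
+ * allow-core-timeout = true
+ * rejection-policy = "caller-runs"   # abort, caller-runs, discard-oldest, discard
+ * mailbox-capacity = -1              # < 1 means an unbounded mailbox
+ * mailbox-push-timeout-time = 10
+ * </pre>
+ */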
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala
new file mode 100644
index 0000000000..3169c70ef9
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import akka.actor.{ Actor, ActorRef }
+import akka.config.Config.config
+import akka.util.Duration
+
+import java.util.Queue
+import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue }
+import akka.actor
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * Dedicates a unique thread to the actor passed in as reference, serving it through its message queue.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType)
+ extends ExecutorBasedEventDrivenDispatcher(
+ _actor.uuid.toString, Dispatchers.THROUGHPUT, -1, _mailboxType, ThreadBasedDispatcher.oneThread) {
+
+ private[akka] val owner = new AtomicReference[ActorRef](_actor)
+
+ def this(actor: ActorRef) =
+ this(actor, UnboundedMailbox()) // For Java API
+
+ def this(actor: ActorRef, capacity: Int) =
+ this(actor, BoundedMailbox(capacity)) //For Java API
+
+ def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API
+ this(actor, BoundedMailbox(capacity, pushTimeOut))
+
+ override def register(actorRef: ActorRef) = {
+ val actor = owner.get()
+ if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
+ owner.compareAndSet(null, actorRef) //Register if unregistered
+ super.register(actorRef)
+ }
+
+ override def unregister(actorRef: ActorRef) = {
+ super.unregister(actorRef)
+ owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak)
+ }
+}
+
+object ThreadBasedDispatcher {
+ val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
+}
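+
+/*
+ * A wiring sketch (MyActor and the explicit dispatcher assignment are illustrative; dispatchers are
+ * normally obtained through the Dispatchers factory):
+ * <pre>
+ * val worker = Actor.actorOf[MyActor]   // MyActor is a hypothetical actor class
+ * worker.dispatcher = new ThreadBasedDispatcher(worker, BoundedMailbox(100))
+ * worker.start()
+ * </pre>
+ */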
+
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala
new file mode 100644
index 0000000000..e847610c4c
--- /dev/null
+++ b/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala
@@ -0,0 +1,259 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
+ */
+
+package akka.dispatch
+
+import java.util.Collection
+import java.util.concurrent._
+import atomic.{ AtomicLong, AtomicInteger }
+import ThreadPoolExecutor.CallerRunsPolicy
+
+import akka.util.Duration
+import akka.event.EventHandler
+
+object ThreadPoolConfig {
+ type Bounds = Int
+ type FlowHandler = Either[RejectedExecutionHandler, Bounds]
+ type QueueFactory = () => BlockingQueue[Runnable]
+
+ val defaultAllowCoreThreadTimeout: Boolean = false
+ val defaultCorePoolSize: Int = 16
+ val defaultMaxPoolSize: Int = 128
+ val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
+ def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy)
+
+ def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)
+ def flowHandler(bounds: Int): FlowHandler = Right(bounds)
+
+ def fixedPoolSize(size: Int): Int = size
+ def scaledPoolSize(multiplier: Double): Int =
+ (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
+
+ def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
+ () => new ArrayBlockingQueue[Runnable](capacity, fair)
+
+ def synchronousQueue(fair: Boolean): QueueFactory =
+ () => new SynchronousQueue[Runnable](fair)
+
+ def linkedBlockingQueue(): QueueFactory =
+ () => new LinkedBlockingQueue[Runnable]()
+
+ def linkedBlockingQueue(capacity: Int): QueueFactory =
+ () => new LinkedBlockingQueue[Runnable](capacity)
+
+ def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
+ () => queue
+
+ def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
+ val queue = queueFactory()
+ () => queue
+ }
+}
+
+case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
+ corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
+ maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
+ threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
+ flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
+ queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
+
+ final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService =
+ new LazyExecutorServiceWrapper(createExecutorService(threadFactory))
+
+ final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
+ flowHandler match {
+ case Left(rejectHandler) =>
+ val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
+ service.allowCoreThreadTimeOut(allowCorePoolTimeout)
+ service
+ case Right(bounds) =>
+ val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
+ service.allowCoreThreadTimeOut(allowCorePoolTimeout)
+ new BoundedExecutorDecorator(service, bounds)
+ }
+ }
+}
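+
+/*
+ * A minimal sketch of building an executor directly from a ThreadPoolConfig (pool name and sizes are illustrative):
+ * <pre>
+ * val config = ThreadPoolConfig(
+ *   corePoolSize = ThreadPoolConfig.scaledPoolSize(2.0),
+ *   maxPoolSize  = 64,
+ *   queueFactory = ThreadPoolConfig.arrayBlockingQueue(1000, fair = true))
+ * val service = config.createExecutorService(new MonitorableThreadFactory("my-pool"))
+ * </pre>
+ */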
+
+trait DispatcherBuilder {
+ def build: MessageDispatcher
+}
+
+object ThreadPoolConfigDispatcherBuilder {
+ def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun
+}
+
+case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
+ import ThreadPoolConfig._
+ def build = dispatcherFactory(config)
+
+ //TODO remove this, for backwards compat only
+ @deprecated("Use .build instead", "1.1")
+ def buildThreadPool = build
+
+ def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))
+
+ def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))
+
+ def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
+ withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
+
+ def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))
+
+ def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))
+
+ def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))
+
+ def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler))
+
+ def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(corePoolSize = size))
+
+ def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(maxPoolSize = size))
+
+ def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
+ setCorePoolSize(scaledPoolSize(multiplier))
+
+ def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
+ setMaxPoolSize(scaledPoolSize(multiplier))
+
+ def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(flowHandler = flowHandler(bounds)))
+
+ def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
+ setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))
+
+ def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(threadTimeout = time))
+
+ def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
+ setFlowHandler(flowHandler(policy))
+
+ def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(flowHandler = newFlowHandler))
+
+ def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
+ this.copy(config = config.copy(allowCorePoolTimeout = allow))
+
+ def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
+}
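+
+/*
+ * A sketch of sharing one queue between two dispatchers via reusableQueue (names and sizes are illustrative).
+ * reusableQueue evaluates the wrapped factory once, so every dispatcher built this way uses the same queue instance:
+ * <pre>
+ * import ThreadPoolConfig._
+ * val sharedQueue = reusableQueue(linkedBlockingQueue(1000))
+ * def newDispatcher(name: String) =
+ *   ThreadPoolConfigDispatcherBuilder(
+ *     cfg => new ExecutorBasedEventDrivenDispatcher(name, 5, -1, UnboundedMailbox(), cfg),
+ *     ThreadPoolConfig())
+ *     .withNewThreadPoolWithCustomBlockingQueue(sharedQueue)
+ *     .setCorePoolSize(4)
+ *     .build
+ * </pre>
+ */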
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+class MonitorableThreadFactory(val name: String) extends ThreadFactory {
+ protected val counter = new AtomicLong
+
+ def newThread(runnable: Runnable) = new MonitorableThread(runnable, name)
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+object MonitorableThread {
+ val DEFAULT_NAME = "MonitorableThread"
+
+ // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
+ val created = new AtomicInteger
+ val alive = new AtomicInteger
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+class MonitorableThread(runnable: Runnable, name: String)
+ extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {
+
+ setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ def uncaughtException(thread: Thread, cause: Throwable) = {}
+ })
+
+ override def run = {
+ try {
+ MonitorableThread.alive.incrementAndGet
+ super.run
+ } finally {
+ MonitorableThread.alive.decrementAndGet
+ }
+ }
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
+ */
+class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
+ protected val semaphore = new Semaphore(bound)
+
+ override def execute(command: Runnable) = {
+ semaphore.acquire
+ try {
+ executor.execute(new Runnable() {
+ def run = {
+ try {
+ command.run
+ } finally {
+ semaphore.release
+ }
+ }
+ })
+ } catch {
+ case e: RejectedExecutionException =>
+ EventHandler.warning(this, e.toString)
+ semaphore.release
+ case e: Throwable =>
+ EventHandler.error(e, this, e.getMessage)
+ throw e
+ }
+ }
+}
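+
+/*
+ * A usage sketch: cap the number of in-flight tasks handed to an underlying pool (the bounds are illustrative):
+ * <pre>
+ * val pool    = Executors.newFixedThreadPool(4)
+ * val bounded = new BoundedExecutorDecorator(pool, 16)   // execute() blocks once 16 tasks are in flight
+ * bounded.execute(new Runnable { def run() = println("work") })
+ * </pre>
+ */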
+
+trait ExecutorServiceDelegate extends ExecutorService {
+
+ def executor: ExecutorService
+
+ def execute(command: Runnable) = executor.execute(command)
+
+ def shutdown() { executor.shutdown() }
+
+ def shutdownNow() = executor.shutdownNow()
+
+ def isShutdown = executor.isShutdown
+
+ def isTerminated = executor.isTerminated
+
+ def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
+
+ def submit[T](callable: Callable[T]) = executor.submit(callable)
+
+ def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
+
+ def submit(runnable: Runnable) = executor.submit(runnable)
+
+ def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
+
+ def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
+
+ def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
+
+ def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
+}
+
+trait LazyExecutorService extends ExecutorServiceDelegate {
+
+ def createExecutor: ExecutorService
+
+ lazy val executor = {
+ createExecutor
+ }
+}
+
+class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService {
+ def createExecutor = executorFactory
+}
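+
+/*
+ * A minimal sketch: the wrapped factory is passed by name, so no threads are created until the first call
+ * reaches the lazy executor (the pool name is illustrative):
+ * <pre>
+ * val lazyPool = ThreadPoolConfig().createLazyExecutorService(new MonitorableThreadFactory("lazy-pool"))
+ * lazyPool.execute(new Runnable { def run() = println("first task forces pool creation") })
+ * </pre>
+ */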