diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch')
8 files changed, 2168 insertions, 0 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala new file mode 100644 index 0000000000..7dd1bf6218 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala @@ -0,0 +1,227 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.actor.{ Actor, ActorRef } +import akka.actor.newUuid +import akka.config.Config._ +import akka.util.{ Duration, ReflectiveAccess } + +import akka.config.Configuration + +import java.util.concurrent.TimeUnit + +/** + * Scala API. Dispatcher factory. + * <p/> + * Example usage: + * <pre/> + * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name") + * dispatcher + * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) + * .setCorePoolSize(16) + * .setMaxPoolSize(128) + * .setKeepAliveTimeInMillis(60000) + * .setRejectionPolicy(new CallerRunsPolicy) + * .build + * </pre> + * <p/> + * Java API. Dispatcher factory. + * <p/> + * Example usage: + * <pre/> + * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name"); + * dispatcher + * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) + * .setCorePoolSize(16) + * .setMaxPoolSize(128) + * .setKeepAliveTimeInMillis(60000) + * .setRejectionPolicy(new CallerRunsPolicy()) + * .build(); + * </pre> + * <p/> + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +object Dispatchers { + val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). + map(time => Duration(time, TIME_UNIT)). + getOrElse(Duration(1000, TimeUnit.MILLISECONDS)) + val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) + val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) + val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT) + val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt + val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox() + + lazy val defaultGlobalDispatcher = { + config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) + } + + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) + + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + * Uses the default timeout + * <p/> + * E.g. each actor consumes its own thread. + */ + def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) + + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + * Uses the default timeout + * If capacity is negative, it's Integer.MAX_VALUE + * <p/> + * E.g. each actor consumes its own thread. + */ + def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity) + + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + * If capacity is negative, it's Integer.MAX_VALUE + * <p/> + * E.g. each actor consumes its own thread. + */ + def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = + new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut) + + /** + * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenDispatcher(name: String) = + ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig()) + + /** + * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) + + /** + * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) + + /** + * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = + ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig()) + + /** + * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) = + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig()) + + /** + * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) + + /** + * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. + * <p/> + * Has a fluent builder interface for configuring its semantics. + */ + def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) + /** + * Utility function that tries to load the specified dispatcher config from the akka.conf + * or else use the supplied default dispatcher + */ + def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = + config getSection key flatMap from getOrElse default + + /* + * Creates of obtains a dispatcher from a ConfigMap according to the format below + * + * default-dispatcher { + * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable + * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, + * # GlobalExecutorBasedEventDriven + * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor + * keep-alive-time = 60 # Keep alive time for threads + * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) + * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) + * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + * allow-core-timeout = on # Allow core threads to time out + * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard + * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + * } + * ex: from(config.getConfigMap(identifier).get) + * + * Gotcha: Only configures the dispatcher if possible + * Returns: None if "type" isn't specified in the config + * Throws: IllegalArgumentException if the value of "type" is not valid + * IllegalArgumentException if it cannot + */ + def from(cfg: Configuration): Option[MessageDispatcher] = { + cfg.getString("type") map { + case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator() + case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator() + case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator + case fqn => + ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { + case r: Right[_, Class[MessageDispatcherConfigurator]] => + ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match { + case r: Right[Exception, MessageDispatcherConfigurator] => r.b + case l: Left[Exception, MessageDispatcherConfigurator] => + throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a) + } + case l: Left[Exception, _] => + throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a) + } + } map { + _ configure cfg + } + } +} + +object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher +} + +class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: Configuration): MessageDispatcher = { + configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( + config.getString("name", newUuid.toString), + config.getInt("throughput", Dispatchers.THROUGHPUT), + config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), + mailboxType(config), + threadPoolConfig)).build + } +} + +class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: Configuration): MessageDispatcher = { + configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher( + config.getString("name", newUuid.toString), + config.getInt("throughput", Dispatchers.THROUGHPUT), + config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), + mailboxType(config), + threadPoolConfig)).build + } +}
\ No newline at end of file diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala new file mode 100644 index 0000000000..bc3f29ac68 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -0,0 +1,305 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.event.EventHandler +import akka.actor.{ ActorRef, IllegalActorStateException } +import akka.util.{ ReflectiveAccess, Switch } + +import java.util.Queue +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } + +/** + * Default settings are: + * <pre/> + * - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + * - NR_START_THREADS = 16 + * - NR_MAX_THREADS = 128 + * - KEEP_ALIVE_TIME = 60000L // one minute + * </pre> + * <p/> + * + * The dispatcher has a fluent builder interface to build up a thread pool to suite your use-case. + * There is a default thread pool defined but make use of the builder if you need it. Here are some examples. + * <p/> + * + * Scala API. + * <p/> + * Example usage: + * <pre/> + * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name") + * dispatcher + * .withNewThreadPoolWithBoundedBlockingQueue(100) + * .setCorePoolSize(16) + * .setMaxPoolSize(128) + * .setKeepAliveTimeInMillis(60000) + * .setRejectionPolicy(new CallerRunsPolicy) + * .buildThreadPool + * </pre> + * <p/> + * + * Java API. + * <p/> + * Example usage: + * <pre/> + * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name"); + * dispatcher + * .withNewThreadPoolWithBoundedBlockingQueue(100) + * .setCorePoolSize(16) + * .setMaxPoolSize(128) + * .setKeepAliveTimeInMillis(60000) + * .setRejectionPolicy(new CallerRunsPolicy()) + * .buildThreadPool(); + * </pre> + * <p/> + * + * But the preferred way of creating dispatchers is to use + * the {@link akka.dispatch.Dispatchers} factory object. + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the + * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher + * always continues until the mailbox is empty. + * Larger values (or zero or negative) increase throughput, smaller values increase fairness + */ +class ExecutorBasedEventDrivenDispatcher( + _name: String, + val throughput: Int = Dispatchers.THROUGHPUT, + val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + val config: ThreadPoolConfig = ThreadPoolConfig()) + extends MessageDispatcher { + + def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage + + def this(_name: String, throughput: Int, mailboxType: MailboxType) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(_name: String, throughput: Int) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(_name: String, _config: ThreadPoolConfig) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + + def this(_name: String) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + val name = "akka:event-driven:dispatcher:" + _name + + private[akka] val threadFactory = new MonitorableThreadFactory(name) + private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) + + private[akka] def dispatch(invocation: MessageInvocation) = { + val mbox = getMailbox(invocation.receiver) + mbox enqueue invocation + registerForExecution(mbox) + } + + private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) { + try executorService.get() execute invocation + catch { + case e: RejectedExecutionException => + EventHandler.warning(this, e.toString) + throw e + } + } + + /** + * @return the mailbox associated with the actor + */ + protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + + override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size + + def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { + case b: UnboundedMailbox => + new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { + @inline + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + @inline + final def enqueue(m: MessageInvocation) = this.add(m) + @inline + final def dequeue(): MessageInvocation = this.poll() + } + case b: BoundedMailbox => + new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { + @inline + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + } + } + + private[akka] def start {} + + private[akka] def shutdown { + val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) + if (old ne null) { + old.shutdownNow() + } + } + + private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { + if (mbox.dispatcherLock.tryLock()) { + if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended + try { + executorService.get() execute mbox + } catch { + case e: RejectedExecutionException => + EventHandler.warning(this, e.toString) + mbox.dispatcherLock.unlock() + throw e + } + } else { + mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock + } + } + } + + private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = + registerForExecution(mbox) + + override val toString = getClass.getSimpleName + "[" + name + "]" + + def suspend(actorRef: ActorRef) { + getMailbox(actorRef).suspended.tryLock + } + + def resume(actorRef: ActorRef) { + val mbox = getMailbox(actorRef) + mbox.suspended.tryUnlock + reRegisterForExecution(mbox) + } +} + +/** + * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. + */ +trait ExecutableMailbox extends Runnable { self: MessageQueue => + + def dispatcher: ExecutorBasedEventDrivenDispatcher + + final def run = { + try { + processMailbox() + } catch { + case ie: InterruptedException => + } + finally { + dispatcherLock.unlock() + } + if (!self.isEmpty) + dispatcher.reRegisterForExecution(this) + } + + /** + * Process the messages in the mailbox + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + final def processMailbox() { + if (!self.suspended.locked) { + var nextMessage = self.dequeue + if (nextMessage ne null) { //If we have a message + if (dispatcher.throughput <= 1) //If we only run one message per process + nextMessage.invoke //Just run it + else { //But otherwise, if we are throttled, we need to do some book-keeping + var processedMessages = 0 + val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 + val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) + else 0 + do { + nextMessage.invoke + nextMessage = + if (self.suspended.locked) { + null // If we are suspended, abort + } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out + null //We reached our boundaries, abort + else self.dequeue //Dequeue the next message + } + } while (nextMessage ne null) + } + } + } + } +} + +object PriorityGenerator { + /** + * Creates a PriorityGenerator that uses the supplied function as priority generator + */ + def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { + def gen(message: Any): Int = priorityFunction(message) + } +} + +/** + * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a + * PriorityExecutorBasedEventDrivenDispatcher + */ +abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { + def gen(message: Any): Int + + final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = + gen(thisMessage.message) - gen(thatMessage.message) +} + +/** + * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, + * prioritized according to the supplied comparator. + * + * The dispatcher will process the messages with the _lowest_ priority first. + */ +class PriorityExecutorBasedEventDrivenDispatcher( + name: String, + val comparator: java.util.Comparator[MessageInvocation], + throughput: Int = Dispatchers.THROUGHPUT, + throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) = + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) + + def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage +} + +/** + * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes + * + * Usage: + * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { + * val comparator = ...comparator that determines mailbox priority ordering... + * } + */ +trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => + def comparator: java.util.Comparator[MessageInvocation] + + override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { + case b: UnboundedMailbox => + new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { + @inline + final def dispatcher = self + } + + case b: BoundedMailbox => + new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox { + @inline + final def dispatcher = self + } + } +} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala new file mode 100644 index 0000000000..4cba8eec8b --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -0,0 +1,165 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.actor.{ ActorRef, Actor, IllegalActorStateException } +import akka.util.{ ReflectiveAccess, Switch } + +import java.util.Queue +import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } +import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue } +import util.DynamicVariable + +/** + * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed + * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the + * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. + * <p/> + * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably + * best described as "work donating" because the actor of which work is being stolen takes the initiative. + * <p/> + * The preferred way of creating dispatchers is to use + * the {@link akka.dispatch.Dispatchers} factory object. + * + * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher + * @see akka.dispatch.Dispatchers + * + * @author Viktor Klang + */ +class ExecutorBasedEventDrivenWorkStealingDispatcher( + _name: String, + throughput: Int = Dispatchers.THROUGHPUT, + throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + config: ThreadPoolConfig = ThreadPoolConfig()) + extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) { + + def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = + this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage + + def this(_name: String, throughput: Int, mailboxType: MailboxType) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(_name: String, throughput: Int) = + this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(_name: String, _config: ThreadPoolConfig) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + + def this(_name: String, memberType: Class[_ <: Actor]) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(_name: String, mailboxType: MailboxType) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + @volatile + private var actorType: Option[Class[_]] = None + @volatile + private var members = Vector[ActorRef]() + private val donationInProgress = new DynamicVariable(false) + + private[akka] override def register(actorRef: ActorRef) = { + //Verify actor type conformity + actorType match { + case None => actorType = Some(actorRef.actor.getClass) + case Some(aType) => + if (aType != actorRef.actor.getClass) + throw new IllegalActorStateException(String.format( + "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", + actorRef, aType)) + } + + synchronized { members :+= actorRef } //Update members + super.register(actorRef) + } + + private[akka] override def unregister(actorRef: ActorRef) = { + synchronized { members = members.filterNot(actorRef eq) } //Update members + super.unregister(actorRef) + } + + override private[akka] def dispatch(invocation: MessageInvocation) = { + val mbox = getMailbox(invocation.receiver) + if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) { + //We were busy and we got to donate the message to some other lucky guy, we're done here + } else { + mbox enqueue invocation + registerForExecution(mbox) + } + } + + override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { + try { + donationInProgress.value = true + while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor + } finally { donationInProgress.value = false } + + if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution + super.reRegisterForExecution(mbox) + } + + /** + * Returns true if it successfully donated a message + */ + protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = { + val actors = members // copy to prevent concurrent modifications having any impact + + // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means + // the dispatcher is being shut down... + // Starts at is seeded by current time + doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match { + case null => false + case recipient => donate(donorMbox.dequeue, recipient) + } + } + + /** + * Returns true if the donation succeeded or false otherwise + */ + protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try { + donationInProgress.value = true + val actors = members // copy to prevent concurrent modifications having any impact + doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match { + case null => false + case recipient => donate(message, recipient) + } + } finally { donationInProgress.value = false } + + /** + * Rewrites the message and adds that message to the recipients mailbox + * returns true if the message is non-null + */ + protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = { + if (organ ne null) { + if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( + organ.message, recipient.timeout, organ.sender, organ.senderFuture) + else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender) + else recipient.postMessageToMailbox(organ.message, None) + true + } else false + } + + /** + * Returns an available recipient for the message, if any + */ + protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = { + val prSz = potentialRecipients.size + var i = 0 + var recipient: ActorRef = null + + while ((i < prSz) && (recipient eq null)) { + val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap + val mbox = getMailbox(actor) + + if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself + recipient = actor //Found! + } + + i += 1 + } + + recipient // nothing found, reuse same start index next time + } +} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Future.scala b/test/disabled/presentation/akka/src/akka/dispatch/Future.scala new file mode 100644 index 0000000000..1ad304d726 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/Future.scala @@ -0,0 +1,832 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.AkkaException +import akka.event.EventHandler +import akka.actor.{ Actor, Channel } +import akka.util.Duration +import akka.japi.{ Procedure, Function => JFunc } + +import scala.util.continuations._ + +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } +import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS => MILLIS } +import java.util.concurrent.atomic.{ AtomicBoolean } +import java.lang.{ Iterable => JIterable } +import java.util.{ LinkedList => JLinkedList } +import scala.collection.mutable.Stack +import annotation.tailrec + +class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) + +object Futures { + + /** + * Java API, equivalent to Future.apply + */ + def future[T](body: Callable[T]): Future[T] = + Future(body.call) + + /** + * Java API, equivalent to Future.apply + */ + def future[T](body: Callable[T], timeout: Long): Future[T] = + Future(body.call, timeout) + + /** + * Java API, equivalent to Future.apply + */ + def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = + Future(body.call)(dispatcher) + + /** + * Java API, equivalent to Future.apply + */ + def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = + Future(body.call, timeout)(dispatcher) + + /** + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { + val futureResult = new DefaultCompletableFuture[T](timeout) + + val completeFirst: Future[T] => Unit = _.value.foreach(futureResult complete _) + for (f ← futures) f onComplete completeFirst + + futureResult + } + + /** + * Java API. + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = + firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) + + /** + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * Example: + * <pre> + * val result = Futures.fold(0)(futures)(_ + _).await.result + * </pre> + */ + def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) => R): Future[R] = { + if (futures.isEmpty) { + new AlreadyCompletedFuture[R](Right(zero)) + } else { + val result = new DefaultCompletableFuture[R](timeout) + val results = new ConcurrentLinkedQueue[T]() + val allDone = futures.size + + val aggregate: Future[T] => Unit = f => if (!result.isCompleted) { //TODO: This is an optimization, is it premature? + f.value.get match { + case r: Right[Throwable, T] => + results add r.b + if (results.size == allDone) { //Only one thread can get here + try { + result completeWithResult scala.collection.JavaConversions.collectionAsScalaIterable(results).foldLeft(zero)(foldFun) + } catch { + case e: Exception => + EventHandler.error(e, this, e.getMessage) + result completeWithException e + } + finally { + results.clear + } + } + case l: Left[Throwable, T] => + result completeWithException l.a + results.clear + } + } + + futures foreach { _ onComplete aggregate } + result + } + } + + /** + * Java API + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + */ + def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = + fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) + + /** + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Example: + * <pre> + * val result = Futures.reduce(futures)(_ + _).await.result + * </pre> + */ + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) => T): Future[R] = { + if (futures.isEmpty) + new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) + else { + val result = new DefaultCompletableFuture[R](timeout) + val seedFound = new AtomicBoolean(false) + val seedFold: Future[T] => Unit = f => { + if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold + f.value.get match { + case r: Right[Throwable, T] => + result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op)) + case l: Left[Throwable, T] => + result.completeWithException(l.a) + } + } + } + for (f ← futures) f onComplete seedFold //Attach the listener to the Futures + result + } + } + + /** + * Java API. + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + */ + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = + reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) + + /** + * Java API. + * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]]. + * Useful for reducing many Futures into a single Future. + */ + def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JLinkedList[A]] = + scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) => + for (r ← fr; a ← fa) yield { + r add a + r + }) + + /** + * Java API. + * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.util.LinkedList[A]]. + * Useful for reducing many Futures into a single Future. + */ + def sequence[A](in: JIterable[Future[A]]): Future[JLinkedList[A]] = sequence(in, Actor.TIMEOUT) + + /** + * Java API. + * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B]. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel. + */ + def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] = + scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) => + val fb = fn(a) + for (r ← fr; b ← fb) yield { + r add b + r + } + } + + /** + * Java API. + * Transforms a java.lang.Iterable[A] into a Future[java.util.LinkedList[B]] using the provided Function A => Future[B]. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel. + */ + def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JLinkedList[B]] = traverse(in, Actor.TIMEOUT, fn) + + // ===================================== + // Deprecations + // ===================================== + + /** + * (Blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures.foreach(_.await)", "1.1") + def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) + + /** + * Returns the First Future that is completed (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(futures).await", "1.1") + def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf[Any](futures, timeout).await + + /** + * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + */ + @deprecated("Will be removed after 1.1, if you must block, use: futures map { f => fun(f.await) }", "1.1") + def awaitMap[A, B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = + in map { f => fun(f.await) } + + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + @deprecated("Will be removed after 1.1, if you must block, use: firstCompletedOf(List(f1,f2)).await.resultOrException", "1.1") + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = firstCompletedOf[T](List(f1, f2)).await.resultOrException +} + +object Future { + /** + * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body + * The execution is performed by the specified Dispatcher. + */ + def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = + dispatcher.dispatchFuture(() => body, timeout) + + /** + * Construct a completable channel + */ + def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { + val future = empty[Any](timeout) + def !(msg: Any) = future completeWithResult msg + } + + /** + * Create an empty Future with default timeout + */ + def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout) + + import scala.collection.mutable.Builder + import scala.collection.generic.CanBuildFrom + + /** + * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. + * Useful for reducing many Futures into a single Future. + */ + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = + in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) => for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + + /** + * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. + * This is useful for performing a parallel map. For example, to apply a function to all items of a list + * in parallel: + * <pre> + * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x))) + * </pre> + */ + def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = + in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => + val fb = fn(a.asInstanceOf[A]) + for (r ← fr; b ← fb) yield (r += b) + }.map(_.result) + + /** + * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited + * Continuations plugin. + * + * Within the block, the result of a Future may be accessed by calling Future.apply. At that point + * execution is suspended with the rest of the block being stored in a continuation until the result + * of the Future is available. If an Exception is thrown while processing, it will be contained + * within the resulting Future. + * + * This allows working with Futures in an imperative style without blocking for each result. + * + * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the + * value of the other Future is available. + * + * The Delimited Continuations compiler plugin must be enabled in order to use this method. + */ + def flow[A](body: => A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { + val future = Promise[A](timeout) + (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f => + val opte = f.exception + if (opte.isDefined) future completeWithException (opte.get) + } + future + } + + private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() => Unit]]]() { + override def initialValue = None + } +} + +sealed trait Future[+T] { + + /** + * For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. + * + * If this Future is untyped (a Future[Nothing]), a type parameter must be explicitly provided or + * execution will fail. The normal result of getting a Future from an ActorRef using !!! will return + * an untyped Future. + */ + def apply[A >: T](): A @cps[Future[Any]] = shift(this flatMap (_: A => Future[Any])) + + /** + * Blocks awaiting completion of this Future, then returns the resulting value, + * or throws the completed exception + * + * Scala & Java API + * + * throws FutureTimeoutException if this Future times out when waiting for completion + */ + def get: T = this.await.resultOrException.get + + /** + * Blocks the current thread until the Future has been completed or the + * timeout has expired. In the case of the timeout expiring a + * FutureTimeoutException will be thrown. + */ + def await: Future[T] + + /** + * Blocks the current thread until the Future has been completed or the + * timeout has expired. The timeout will be the least value of 'atMost' and the timeout + * supplied at the constructuion of this Future. + * In the case of the timeout expiring a FutureTimeoutException will be thrown. + */ + def await(atMost: Duration): Future[T] + + /** + * Blocks the current thread until the Future has been completed. Use + * caution with this method as it ignores the timeout and will block + * indefinitely if the Future is never completed. + */ + @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1") + def awaitBlocking: Future[T] + + /** + * Tests whether this Future has been completed. + */ + final def isCompleted: Boolean = value.isDefined + + /** + * Tests whether this Future's timeout has expired. + * + * Note that an expired Future may still contain a value, or it may be + * completed with a value. + */ + def isExpired: Boolean + + /** + * This Future's timeout in nanoseconds. + */ + def timeoutInNanos: Long + + /** + * The contained value of this Future. Before this Future is completed + * the value will be None. After completion the value will be Some(Right(t)) + * if it contains a valid result, or Some(Left(error)) if it contains + * an exception. + */ + def value: Option[Either[Throwable, T]] + + /** + * Returns the successful result of this Future if it exists. + */ + final def result: Option[T] = { + val v = value + if (v.isDefined) v.get.right.toOption + else None + } + + /** + * Returns the contained exception of this Future if it exists. + */ + final def exception: Option[Throwable] = { + val v = value + if (v.isDefined) v.get.left.toOption + else None + } + + /** + * When this Future is completed, apply the provided function to the + * Future. If the Future has already been completed, this will apply + * immediately. + */ + def onComplete(func: Future[T] => Unit): Future[T] + + /** + * When the future is completed with a valid result, apply the provided + * PartialFunction to the result. + * <pre> + * val result = future receive { + * case Foo => "foo" + * case Bar => "bar" + * }.await.result + * </pre> + */ + final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f => + val optr = f.result + if (optr.isDefined) { + val r = optr.get + if (pf.isDefinedAt(r)) pf(r) + } + } + + /** + * Creates a new Future by applying a PartialFunction to the successful + * result of this Future if a match is found, or else return a MatchError. + * If this Future is completed with an exception then the new Future will + * also contain this exception. + * Example: + * <pre> + * val future1 = for { + * a <- actor !!! Req("Hello") collect { case Res(x: Int) => x } + * b <- actor !!! Req(a) collect { case Res(x: String) => x } + * c <- actor !!! Req(7) collect { case Res(x: String) => x } + * } yield b + "-" + c + * </pre> + */ + final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { + val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + onComplete { ft => + val v = ft.value.get + fa complete { + if (v.isLeft) v.asInstanceOf[Either[Throwable, A]] + else { + try { + val r = v.right.get + if (pf isDefinedAt r) Right(pf(r)) + else Left(new MatchError(r)) + } catch { + case e: Exception => + EventHandler.error(e, this, e.getMessage) + Left(e) + } + } + } + } + fa + } + + /** + * Creates a new Future that will handle any matching Throwable that this + * Future might contain. If there is no match, or if this Future contains + * a valid result then the new Future will contain the same. + * Example: + * <pre> + * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0 + * Future(6 / 0) failure { case e: NotFoundException => 0 } // result: exception + * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3 + * </pre> + */ + final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { + val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + onComplete { ft => + val opte = ft.exception + fa complete { + if (opte.isDefined) { + val e = opte.get + try { + if (pf isDefinedAt e) Right(pf(e)) + else Left(e) + } catch { + case x: Exception => Left(x) + } + } else ft.value.get + } + } + fa + } + + /** + * Creates a new Future by applying a function to the successful result of + * this Future. If this Future is completed with an exception then the new + * Future will also contain this exception. + * Example: + * <pre> + * val future1 = for { + * a: Int <- actor !!! "Hello" // returns 5 + * b: String <- actor !!! a // returns "10" + * c: String <- actor !!! 7 // returns "14" + * } yield b + "-" + c + * </pre> + */ + final def map[A](f: T => A): Future[A] = { + val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + onComplete { ft => + val optv = ft.value + if (optv.isDefined) { + val v = optv.get + if (v.isLeft) + fa complete v.asInstanceOf[Either[Throwable, A]] + else { + fa complete (try { + Right(f(v.right.get)) + } catch { + case e: Exception => + EventHandler.error(e, this, e.getMessage) + Left(e) + }) + } + } + } + fa + } + + /** + * Creates a new Future by applying a function to the successful result of + * this Future, and returns the result of the function as the new Future. + * If this Future is completed with an exception then the new Future will + * also contain this exception. + * Example: + * <pre> + * val future1 = for { + * a: Int <- actor !!! "Hello" // returns 5 + * b: String <- actor !!! a // returns "10" + * c: String <- actor !!! 7 // returns "14" + * } yield b + "-" + c + * </pre> + */ + final def flatMap[A](f: T => Future[A]): Future[A] = { + val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + onComplete { ft => + val optv = ft.value + if (optv.isDefined) { + val v = optv.get + if (v.isLeft) + fa complete v.asInstanceOf[Either[Throwable, A]] + else { + try { + fa.completeWith(f(v.right.get)) + } catch { + case e: Exception => + EventHandler.error(e, this, e.getMessage) + fa completeWithException e + } + } + } + } + fa + } + + final def foreach(f: T => Unit): Unit = onComplete { ft => + val optr = ft.result + if (optr.isDefined) + f(optr.get) + } + + final def filter(p: Any => Boolean): Future[Any] = { + val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS) + onComplete { ft => + val optv = ft.value + if (optv.isDefined) { + val v = optv.get + if (v.isLeft) + f complete v + else { + val r = v.right.get + f complete (try { + if (p(r)) Right(r) + else Left(new MatchError(r)) + } catch { + case e: Exception => + EventHandler.error(e, this, e.getMessage) + Left(e) + }) + } + } + } + f + } + + /** + * Returns the current result, throws the exception is one has been raised, else returns None + */ + final def resultOrException: Option[T] = { + val v = value + if (v.isDefined) { + val r = v.get + if (r.isLeft) throw r.left.get + else r.right.toOption + } else None + } + + /* Java API */ + final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_)) + + final def map[A >: T, B](f: JFunc[A, B]): Future[B] = map(f(_)) + + final def flatMap[A >: T, B](f: JFunc[A, Future[B]]): Future[B] = flatMap(f(_)) + + final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_)) + + final def filter(p: JFunc[Any, Boolean]): Future[Any] = filter(p(_)) + +} + +object Promise { + + def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout) + + def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT) + +} + +/** + * Essentially this is the Promise (or write-side) of a Future (read-side). + */ +trait CompletableFuture[T] extends Future[T] { + /** + * Completes this Future with the specified result, if not already completed. + * @return this + */ + def complete(value: Either[Throwable, T]): Future[T] + + /** + * Completes this Future with the specified result, if not already completed. + * @return this + */ + final def completeWithResult(result: T): Future[T] = complete(Right(result)) + + /** + * Completes this Future with the specified exception, if not already completed. + * @return this + */ + final def completeWithException(exception: Throwable): Future[T] = complete(Left(exception)) + + /** + * Completes this Future with the specified other Future, when that Future is completed, + * unless this Future has already been completed. + * @return this. + */ + final def completeWith(other: Future[T]): Future[T] = { + other onComplete { f => complete(f.value.get) } + this + } + + final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => cont(complete(Right(value))) } + + final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => + val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT) + this completeWith other onComplete { f => + try { + fr completeWith cont(f) + } catch { + case e: Exception => + EventHandler.error(e, this, e.getMessage) + fr completeWithException e + } + } + fr + } + +} + +/** + * The default concrete Future implementation. + */ +class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { + + def this() = this(0, MILLIS) + + def this(timeout: Long) = this(timeout, MILLIS) + + val timeoutInNanos = timeunit.toNanos(timeout) + private val _startTimeInNanos = currentTimeInNanos + private val _lock = new ReentrantLock + private val _signal = _lock.newCondition + private var _value: Option[Either[Throwable, T]] = None + private var _listeners: List[Future[T] => Unit] = Nil + + /** + * Must be called inside _lock.lock<->_lock.unlock + */ + @tailrec + private def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (_value.isEmpty && waitTimeNanos > 0) { + val start = currentTimeInNanos + val remainingNanos = try { + _signal.awaitNanos(waitTimeNanos) + } catch { + case e: InterruptedException => + waitTimeNanos - (currentTimeInNanos - start) + } + awaitUnsafe(remainingNanos) + } else { + _value.isDefined + } + } + + def await(atMost: Duration) = { + _lock.lock + if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this + else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") + } + + def await = { + _lock.lock + if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this + else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") + } + + def awaitBlocking = { + _lock.lock + try { + while (_value.isEmpty) { + _signal.await + } + this + } finally { + _lock.unlock + } + } + + def isExpired: Boolean = timeLeft() <= 0 + + def value: Option[Either[Throwable, T]] = { + _lock.lock + try { + _value + } finally { + _lock.unlock + } + } + + def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { + _lock.lock + val notifyTheseListeners = try { + if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired + _value = Some(value) + val existingListeners = _listeners + _listeners = Nil + existingListeners + } else Nil + } finally { + _signal.signalAll + _lock.unlock + } + + if (notifyTheseListeners.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation + @tailrec + def runCallbacks(rest: List[Future[T] => Unit], callbacks: Stack[() => Unit]) { + if (rest.nonEmpty) { + notifyCompleted(rest.head) + while (callbacks.nonEmpty) { callbacks.pop().apply() } + runCallbacks(rest.tail, callbacks) + } + } + + val pending = Future.callbacksPendingExecution.get + if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) + pending.get.push(() => { // Linearize/aggregate callbacks at top level and then execute + val doNotify = notifyCompleted _ //Hoist closure to avoid garbage + notifyTheseListeners foreach doNotify + }) + } else { + try { + val callbacks = Stack[() => Unit]() // Allocate new aggregator for pending callbacks + Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator + runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated + } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup + } + } + + this + } + + def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { + _lock.lock + val notifyNow = try { + if (_value.isEmpty) { + if (!isExpired) { //Only add the listener if the future isn't expired + _listeners ::= func + false + } else false //Will never run the callback since the future is expired + } else true + } finally { + _lock.unlock + } + + if (notifyNow) notifyCompleted(func) + + this + } + + private def notifyCompleted(func: Future[T] => Unit) { + try { + func(this) + } catch { + case e => EventHandler notify EventHandler.Error(e, this) + } + } + + @inline + private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) + @inline + private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) +} + +/** + * An already completed Future is seeded with it's result at creation, is useful for when you are participating in + * a Future-composition but you already have a value to contribute. + */ +sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { + val value = Some(suppliedValue) + + def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def onComplete(func: Future[T] => Unit): Future[T] = { func(this); this } + def await(atMost: Duration): Future[T] = this + def await: Future[T] = this + def awaitBlocking: Future[T] = this + def isExpired: Boolean = true + def timeoutInNanos: Long = 0 +} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala new file mode 100644 index 0000000000..4c00577157 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/MailboxHandling.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.AkkaException + +import java.util.{ Comparator, PriorityQueue } +import java.util.concurrent._ +import akka.util._ + +class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +trait MessageQueue { + val dispatcherLock = new SimpleLock + val suspended = new SimpleLock + def enqueue(handle: MessageInvocation) + def dequeue(): MessageInvocation + def size: Int + def isEmpty: Boolean +} + +/** + * Mailbox configuration. + */ +sealed trait MailboxType + +case class UnboundedMailbox() extends MailboxType +case class BoundedMailbox( + val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, + val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { + if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") + if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") +} + +trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => + @inline + final def enqueue(handle: MessageInvocation): Unit = this add handle + @inline + final def dequeue(): MessageInvocation = this.poll() +} + +trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => + def pushTimeOut: Duration + + final def enqueue(handle: MessageInvocation) { + if (pushTimeOut.length > 0) { + this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { + throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) + } + } else this put handle + } + + @inline + final def dequeue(): MessageInvocation = this.poll() +} + +class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics + +class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics + +class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics + +class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics diff --git a/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala b/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala new file mode 100644 index 0000000000..20887c3867 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/MessageHandling.scala @@ -0,0 +1,260 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong +import akka.event.EventHandler +import akka.config.Configuration +import akka.config.Config.TIME_UNIT +import akka.util.{ Duration, Switch, ReentrantGuard } +import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } +import akka.actor._ + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +final case class MessageInvocation(val receiver: ActorRef, + val message: Any, + val sender: Option[ActorRef], + val senderFuture: Option[CompletableFuture[Any]]) { + if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") + + def invoke = try { + receiver.invoke(this) + } catch { + case e: NullPointerException => throw new ActorInitializationException( + "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).") + } +} + +final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable { + def run = { + future complete (try { + Right(function()) + } catch { + case e => + EventHandler.error(e, this, e.getMessage) + Left(e) + } + finally { + cleanup() + }) + } +} + +object MessageDispatcher { + val UNSCHEDULED = 0 + val SCHEDULED = 1 + val RESCHEDULED = 2 + + implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher +} + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +trait MessageDispatcher { + import MessageDispatcher._ + + protected val uuids = new ConcurrentSkipListSet[Uuid] + protected val futures = new AtomicLong(0L) + protected val guard = new ReentrantGuard + protected val active = new Switch(false) + + private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard + + /** + * Creates and returns a mailbox for the given actor. + */ + private[akka] def createMailbox(actorRef: ActorRef): AnyRef + + /** + * Attaches the specified actorRef to this dispatcher + */ + final def attach(actorRef: ActorRef): Unit = guard withGuard { + register(actorRef) + } + + /** + * Detaches the specified actorRef from this dispatcher + */ + final def detach(actorRef: ActorRef): Unit = guard withGuard { + unregister(actorRef) + } + + private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation) + + private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = { + futures.getAndIncrement() + try { + val future = new DefaultCompletableFuture[T](timeout) + + if (active.isOff) + guard withGuard { active.switchOn { start } } + + executeFuture(FutureInvocation[T](future, block, futureCleanup)) + future + } catch { + case e => + futures.decrementAndGet + throw e + } + } + + private val futureCleanup: () => Unit = + () => if (futures.decrementAndGet() == 0) { + guard withGuard { + if (futures.get == 0 && uuids.isEmpty) { + shutdownSchedule match { + case UNSCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + shutdownSchedule = RESCHEDULED + case RESCHEDULED => //Already marked for reschedule + } + } + } + } + + private[akka] def register(actorRef: ActorRef) { + if (actorRef.mailbox eq null) + actorRef.mailbox = createMailbox(actorRef) + + uuids add actorRef.uuid + if (active.isOff) { + active.switchOn { + start + } + } + } + + private[akka] def unregister(actorRef: ActorRef) = { + if (uuids remove actorRef.uuid) { + actorRef.mailbox = null + if (uuids.isEmpty && futures.get == 0) { + shutdownSchedule match { + case UNSCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + shutdownSchedule = RESCHEDULED + case RESCHEDULED => //Already marked for reschedule + } + } + } + } + + /** + * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors + */ + def stopAllAttachedActors { + val i = uuids.iterator + while (i.hasNext()) { + val uuid = i.next() + Actor.registry.actorFor(uuid) match { + case Some(actor) => actor.stop() + case None => {} + } + } + } + + private val shutdownAction = new Runnable { + def run = guard withGuard { + shutdownSchedule match { + case RESCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + if (uuids.isEmpty && futures.get == 0) { + active switchOff { + shutdown // shut down in the dispatcher's references is zero + } + } + shutdownSchedule = UNSCHEDULED + case UNSCHEDULED => //Do nothing + } + } + } + + /** + * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms + * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second + */ + private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis + + /** + * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference + */ + def suspend(actorRef: ActorRef): Unit + + /* + * After the call to this method, the dispatcher must begin any new message processing for the specified reference + */ + def resume(actorRef: ActorRef): Unit + + /** + * Will be called when the dispatcher is to queue an invocation for execution + */ + private[akka] def dispatch(invocation: MessageInvocation): Unit + + private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit + + /** + * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown + */ + private[akka] def start(): Unit + + /** + * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached + */ + private[akka] def shutdown(): Unit + + /** + * Returns the size of the mailbox for the specified actor + */ + def mailboxSize(actorRef: ActorRef): Int + + /** + * Returns the amount of futures queued for execution + */ + def pendingFutures: Long = futures.get +} + +/** + * Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig + */ +abstract class MessageDispatcherConfigurator { + /** + * Returns an instance of MessageDispatcher given a Configuration + */ + def configure(config: Configuration): MessageDispatcher + + def mailboxType(config: Configuration): MailboxType = { + val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY) + if (capacity < 1) UnboundedMailbox() + else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) + } + + def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { + import ThreadPoolConfigDispatcherBuilder.conf_? + + //Apply the following options to the config if they are present in the config + ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( + conf_?(config getInt "keep-alive-time")(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), + conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), + conf_?(config getDouble "max-pool-size-factor")(factor => _.setMaxPoolSizeFromFactor(factor)), + conf_?(config getInt "executor-bounds")(bounds => _.setExecutorBounds(bounds)), + conf_?(config getBool "allow-core-timeout")(allow => _.setAllowCoreThreadTimeout(allow)), + conf_?(config getString "rejection-policy" map { + case "abort" => new AbortPolicy() + case "caller-runs" => new CallerRunsPolicy() + case "discard-oldest" => new DiscardOldestPolicy() + case "discard" => new DiscardPolicy() + case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + })(policy => _.setRejectionPolicy(policy))) + } +} diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala new file mode 100644 index 0000000000..3169c70ef9 --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import akka.actor.{ Actor, ActorRef } +import akka.config.Config.config +import akka.util.Duration + +import java.util.Queue +import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue } +import akka.actor +import java.util.concurrent.atomic.AtomicReference + +/** + * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. + * + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType) + extends ExecutorBasedEventDrivenDispatcher( + _actor.uuid.toString, Dispatchers.THROUGHPUT, -1, _mailboxType, ThreadBasedDispatcher.oneThread) { + + private[akka] val owner = new AtomicReference[ActorRef](_actor) + + def this(actor: ActorRef) = + this(actor, UnboundedMailbox()) // For Java API + + def this(actor: ActorRef, capacity: Int) = + this(actor, BoundedMailbox(capacity)) //For Java API + + def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API + this(actor, BoundedMailbox(capacity, pushTimeOut)) + + override def register(actorRef: ActorRef) = { + val actor = owner.get() + if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) + owner.compareAndSet(null, actorRef) //Register if unregistered + super.register(actorRef) + } + + override def unregister(actorRef: ActorRef) = { + super.unregister(actorRef) + owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak) + } +} + +object ThreadBasedDispatcher { + val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) +} + diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala b/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala new file mode 100644 index 0000000000..e847610c4c --- /dev/null +++ b/test/disabled/presentation/akka/src/akka/dispatch/ThreadPoolBuilder.scala @@ -0,0 +1,259 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> + */ + +package akka.dispatch + +import java.util.Collection +import java.util.concurrent._ +import atomic.{ AtomicLong, AtomicInteger } +import ThreadPoolExecutor.CallerRunsPolicy + +import akka.util.Duration +import akka.event.EventHandler + +object ThreadPoolConfig { + type Bounds = Int + type FlowHandler = Either[RejectedExecutionHandler, Bounds] + type QueueFactory = () => BlockingQueue[Runnable] + + val defaultAllowCoreThreadTimeout: Boolean = false + val defaultCorePoolSize: Int = 16 + val defaultMaxPoolSize: Int = 128 + val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS) + def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy) + + def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler) + def flowHandler(bounds: Int): FlowHandler = Right(bounds) + + def fixedPoolSize(size: Int): Int = size + def scaledPoolSize(multiplier: Double): Int = + (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt + + def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory = + () => new ArrayBlockingQueue[Runnable](capacity, fair) + + def synchronousQueue(fair: Boolean): QueueFactory = + () => new SynchronousQueue[Runnable](fair) + + def linkedBlockingQueue(): QueueFactory = + () => new LinkedBlockingQueue[Runnable]() + + def linkedBlockingQueue(capacity: Int): QueueFactory = + () => new LinkedBlockingQueue[Runnable](capacity) + + def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = + () => queue + + def reusableQueue(queueFactory: QueueFactory): QueueFactory = { + val queue = queueFactory() + () => queue + } +} + +case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, + corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, + maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, + threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, + flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler, + queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) { + + final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService = + new LazyExecutorServiceWrapper(createExecutorService(threadFactory)) + + final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = { + flowHandler match { + case Left(rejectHandler) => + val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler) + service.allowCoreThreadTimeOut(allowCorePoolTimeout) + service + case Right(bounds) => + val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory) + service.allowCoreThreadTimeOut(allowCorePoolTimeout) + new BoundedExecutorDecorator(service, bounds) + } + } +} + +trait DispatcherBuilder { + def build: MessageDispatcher +} + +object ThreadPoolConfigDispatcherBuilder { + def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun +} + +case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { + import ThreadPoolConfig._ + def build = dispatcherFactory(config) + + //TODO remove this, for backwards compat only + @deprecated("Use .build instead", "1.1") + def buildThreadPool = build + + def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) + + def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory)) + + def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder = + withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue)) + + def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler)) + + def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler)) + + def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler)) + + def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler)) + + def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(corePoolSize = size)) + + def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(maxPoolSize = size)) + + def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = + setCorePoolSize(scaledPoolSize(multiplier)) + + def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = + setMaxPoolSize(scaledPoolSize(multiplier)) + + def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = flowHandler(bounds))) + + def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder = + setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS)) + + def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(threadTimeout = time)) + + def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder = + setFlowHandler(flowHandler(policy)) + + def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = newFlowHandler)) + + def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(allowCorePoolTimeout = allow)) + + def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c)) +} + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +class MonitorableThreadFactory(val name: String) extends ThreadFactory { + protected val counter = new AtomicLong + + def newThread(runnable: Runnable) = new MonitorableThread(runnable, name) +} + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +object MonitorableThread { + val DEFAULT_NAME = "MonitorableThread" + + // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring + val created = new AtomicInteger + val alive = new AtomicInteger +} + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +class MonitorableThread(runnable: Runnable, name: String) + extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) { + + setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + def uncaughtException(thread: Thread, cause: Throwable) = {} + }) + + override def run = { + try { + MonitorableThread.alive.incrementAndGet + super.run + } finally { + MonitorableThread.alive.decrementAndGet + } + } +} + +/** + * @author <a href="http://jonasboner.com">Jonas Bonér</a> + */ +class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { + protected val semaphore = new Semaphore(bound) + + override def execute(command: Runnable) = { + semaphore.acquire + try { + executor.execute(new Runnable() { + def run = { + try { + command.run + } finally { + semaphore.release + } + } + }) + } catch { + case e: RejectedExecutionException => + EventHandler.warning(this, e.toString) + semaphore.release + case e: Throwable => + EventHandler.error(e, this, e.getMessage) + throw e + } + } +} + +trait ExecutorServiceDelegate extends ExecutorService { + + def executor: ExecutorService + + def execute(command: Runnable) = executor.execute(command) + + def shutdown() { executor.shutdown() } + + def shutdownNow() = executor.shutdownNow() + + def isShutdown = executor.isShutdown + + def isTerminated = executor.isTerminated + + def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit) + + def submit[T](callable: Callable[T]) = executor.submit(callable) + + def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t) + + def submit(runnable: Runnable) = executor.submit(runnable) + + def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables) + + def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) + + def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables) + + def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) +} + +trait LazyExecutorService extends ExecutorServiceDelegate { + + def createExecutor: ExecutorService + + lazy val executor = { + createExecutor + } +} + +class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService { + def createExecutor = executorFactory +} |