/** * Copyright (C) 2009-2011 Scalable Solutions AB */ package akka.dispatch import akka.actor.{ Actor, ActorRef } import akka.actor.newUuid import akka.config.Config._ import akka.util.{ Duration, ReflectiveAccess } import akka.config.Configuration import java.util.concurrent.TimeUnit /** * Scala API. Dispatcher factory. *

 * <p/>
 * Example usage:
 * <pre>
 *   val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name")
 *   dispatcher
 *     .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
 *     .setCorePoolSize(16)
 *     .setMaxPoolSize(128)
 *     .setKeepAliveTimeInMillis(60000)
 *     .setRejectionPolicy(new CallerRunsPolicy)
 *     .build
 * </pre>
 * <p/>
 * Java API. Dispatcher factory.
 * <p/>
 * Example usage:
 * <pre>
 *   MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name");
 *   dispatcher
 *     .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
 *     .setCorePoolSize(16)
 *     .setMaxPoolSize(128)
 *     .setKeepAliveTimeInMillis(60000)
 *     .setRejectionPolicy(new CallerRunsPolicy())
 *     .build();
 * </pre>
 * <p/>
*
 * @author Jonas Bonér
 */
object Dispatchers {

  /** Default throughput, read from `akka.actor.throughput` (falls back to 5). */
  val THROUGHPUT = config.getInt("akka.actor.throughput", 5)

  /**
   * Dispatcher shutdown timeout, read from `akka.actor.dispatcher-shutdown-timeout`
   * in the configured `TIME_UNIT`; falls back to 1000 milliseconds when the key is absent.
   */
  val DEFAULT_SHUTDOWN_TIMEOUT =
    config.getLong("akka.actor.dispatcher-shutdown-timeout")
      .map(time => Duration(time, TIME_UNIT))
      .getOrElse(Duration(1000, TimeUnit.MILLISECONDS))

  /** Default mailbox capacity, read from `akka.actor.default-dispatcher.mailbox-capacity` (falls back to -1). */
  val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)

  /** Default mailbox push timeout, read from `akka.actor.default-dispatcher.mailbox-push-timeout-time` (falls back to 10). */
  val MAILBOX_PUSH_TIME_OUT =
    Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)

  /** Default throughput deadline, read from `akka.actor.throughput-deadline-time` (falls back to -1). */
  val THROUGHPUT_DEADLINE_TIME =
    Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT)

  /** [[THROUGHPUT_DEADLINE_TIME]] converted to whole milliseconds. */
  val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt

  /** Default mailbox type: unbounded when the configured capacity is below 1, bounded otherwise. */
  val MAILBOX_TYPE: MailboxType =
    if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()

  /**
   * The dispatcher described by the `akka.actor.default-dispatcher` config section, or
   * [[globalExecutorBasedEventDrivenDispatcher]] when that section is missing or has no `type`.
   */
  lazy val defaultGlobalDispatcher = {
    config.getSection("akka.actor.default-dispatcher")
      .flatMap(from)
      .getOrElse(globalExecutorBasedEventDrivenDispatcher)
  }

  /** The shared "global" dispatcher, built from the default throughput, deadline and mailbox settings above. */
  object globalExecutorBasedEventDrivenDispatcher
    extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)

  /**
   * Creates a thread-based dispatcher serving a single actor through the same single thread.
   * Uses the default timeout.
   * <p/>
   * E.g. each actor consumes its own thread.
   */
  def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)

  /**
   * Creates a thread-based dispatcher serving a single actor through the same single thread.
   * Uses the default timeout.
   * If capacity is negative, it's Integer.MAX_VALUE.
   * <p/>
   * E.g. each actor consumes its own thread.
   */
  def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) =
    new ThreadBasedDispatcher(actor, mailboxCapacity)

  /**
   * Creates a thread-based dispatcher serving a single actor through the same single thread.
   * If capacity is negative, it's Integer.MAX_VALUE.
   * <p/>
   * E.g. each actor consumes its own thread.
   */
  def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) =
    new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut)

  /**
   * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenDispatcher(name: String) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenDispatcher(name, config),
      ThreadPoolConfig())

  /**
   * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
   * Uses the default throughput deadline ([[THROUGHPUT_DEADLINE_TIME_MILLIS]]).
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config),
      ThreadPoolConfig())

  /**
   * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),
      ThreadPoolConfig())

  /**
   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config),
      ThreadPoolConfig())

  /**
   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
   * Uses the default throughput deadline and the default mailbox type ([[MAILBOX_TYPE]]).
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config),
      ThreadPoolConfig())

  /**
   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
   * Uses the default throughput deadline ([[THROUGHPUT_DEADLINE_TIME_MILLIS]]).
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config),
      ThreadPoolConfig())

  /**
   * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
   * <p/>
   * Has a fluent builder interface for configuring its semantics.
   */
  def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
    ThreadPoolConfigDispatcherBuilder(
      config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),
      ThreadPoolConfig())

  /**
   * Utility function that tries to load the specified dispatcher config from the akka.conf
   * or else use the supplied default dispatcher.
   *
   * @param key     config path of the dispatcher section to look up
   * @param default dispatcher to fall back on (evaluated lazily); defaults to [[defaultGlobalDispatcher]]
   */
  def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
    config.getSection(key).flatMap(from).getOrElse(default)

  /**
   * Creates or obtains a dispatcher from a ConfigMap according to the format below.
   *
   * <pre>
   * default-dispatcher {
   *   type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
   *                               # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
   *                               # GlobalExecutorBasedEventDriven
   *                               # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
   *   keep-alive-time = 60             # Keep alive time for threads
   *   core-pool-size-factor = 1.0      # No of core threads ... ceil(available processors * factor)
   *   max-pool-size-factor  = 4.0      # Max no of threads ... ceil(available processors * factor)
   *   executor-bounds = -1             # Makes the Executor bounded, -1 is unbounded
   *   allow-core-timeout = on          # Allow core threads to time out
   *   rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
   *   throughput = 5                   # Throughput for ExecutorBasedEventDrivenDispatcher
   * }
   * </pre>
   * ex: from(config.getConfigMap(identifier).get)
   *
   * Gotcha: Only configures the dispatcher if possible
   *
   * @param cfg the configuration section describing the dispatcher
   * @return None if "type" isn't specified in the config, otherwise the configured dispatcher
   * @throws IllegalArgumentException if the value of "type" is neither a known type nor a loadable class name,
   *                                  or if the named configurator class cannot be instantiated
   */
  def from(cfg: Configuration): Option[MessageDispatcher] = {
    val configurator = cfg.getString("type") map {
      case "ExecutorBasedEventDriven"             => new ExecutorBasedEventDrivenDispatcherConfigurator()
      case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
      case "GlobalExecutorBasedEventDriven"       => GlobalExecutorBasedEventDrivenDispatcherConfigurator
      case fqn =>
        // Anything else is treated as the fully qualified name of a custom configurator class.
        // Extractor patterns are used instead of type patterns, whose type arguments are erased.
        ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
          case Right(clazz) =>
            ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
              case Right(instance) => instance
              case Left(exception) =>
                throw new IllegalArgumentException(
                  "Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor".format(fqn),
                  exception)
            }
          case Left(exception) =>
            throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]".format(fqn), exception)
        }
    }
    configurator map { _ configure cfg }
  }
}

/**
 * Configurator for the non-configurable global dispatcher: ignores the supplied
 * configuration and always returns `Dispatchers.globalExecutorBasedEventDrivenDispatcher`.
 */
object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
  def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
}

/**
 * Builds an `ExecutorBasedEventDrivenDispatcher` from a configuration section.
 * Reads `name` (defaults to a fresh UUID), `throughput` and `throughput-deadline-time`
 * (both defaulting to the values in `Dispatchers`), plus the mailbox type and thread pool settings.
 */
class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
  def configure(config: Configuration): MessageDispatcher = {
    // NOTE(review): the configured "throughput-deadline-time" is passed through unconverted while its
    // default is in milliseconds — confirm the key is meant to be given in milliseconds.
    configureThreadPool(config, threadPoolConfig =>
      new ExecutorBasedEventDrivenDispatcher(
        config.getString("name", newUuid.toString),
        config.getInt("throughput", Dispatchers.THROUGHPUT),
        config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
        mailboxType(config),
        threadPoolConfig)).build
  }
}

/**
 * Builds an `ExecutorBasedEventDrivenWorkStealingDispatcher` from a configuration section.
 * Reads `name` (defaults to a fresh UUID), `throughput` and `throughput-deadline-time`
 * (both defaulting to the values in `Dispatchers`), plus the mailbox type and thread pool settings.
 */
class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
  def configure(config: Configuration): MessageDispatcher = {
    // NOTE(review): the configured "throughput-deadline-time" is passed through unconverted while its
    // default is in milliseconds — confirm the key is meant to be given in milliseconds.
    configureThreadPool(config, threadPoolConfig =>
      new ExecutorBasedEventDrivenWorkStealingDispatcher(
        config.getString("name", newUuid.toString),
        config.getInt("throughput", Dispatchers.THROUGHPUT),
        config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
        mailboxType(config),
        threadPoolConfig)).build
  }
}