diff options
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala')
-rw-r--r-- | test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala | 227 |
1 files changed, 0 insertions, 227 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala b/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala deleted file mode 100644 index a567d0bcb0..0000000000 --- a/test/disabled/presentation/akka/src/akka/dispatch/Dispatchers.scala +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se> - */ - -package akka.dispatch - -import akka.actor.{ Actor, ActorRef } -import akka.actor.newUuid -import akka.config.Config._ -import akka.util.{ Duration, ReflectiveAccess } - -import akka.config.Configuration - -import java.util.concurrent.TimeUnit - -/** - * Scala API. Dispatcher factory. - * <p/> - * Example usage: - * <pre/> - * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name") - * dispatcher - * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy) - * .build - * </pre> - * <p/> - * Java API. Dispatcher factory. - * <p/> - * Example usage: - * <pre/> - * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name"); - * dispatcher - * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) - * .setCorePoolSize(16) - * .setMaxPoolSize(128) - * .setKeepAliveTimeInMillis(60000) - * .setRejectionPolicy(new CallerRunsPolicy()) - * .build(); - * </pre> - * <p/> - * - * @author <a href="http://jonasboner.com">Jonas Bonér</a> - */ -object Dispatchers { - val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). - map(time => Duration(time, TIME_UNIT)). - getOrElse(Duration(1000, TimeUnit.MILLISECONDS)) - val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) - val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) - val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT) - val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt - val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox() - - lazy val defaultGlobalDispatcher = { - config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) - } - - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * Uses the default timeout - * <p/> - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * Uses the default timeout - * If capacity is negative, it's Integer.MAX_VALUE - * <p/> - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * If capacity is negative, it's Integer.MAX_VALUE - * <p/> - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = - new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut) - - /** - * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) - - /** - * Creates an executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. - * <p/> - * Has a fluent builder interface for configuring its semantics. - */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = - ThreadPoolConfigDispatcherBuilder(config => - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) - /** - * Utility function that tries to load the specified dispatcher config from the akka.conf - * or else use the supplied default dispatcher - */ - def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = - config getSection key flatMap from getOrElse default - - /* - * Creates of obtains a dispatcher from a ConfigMap according to the format below - * - * default-dispatcher { - * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - * # GlobalExecutorBasedEventDriven - * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor - * keep-alive-time = 60 # Keep alive time for threads - * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) - * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) - * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded - * allow-core-timeout = on # Allow core threads to time out - * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher - * } - * ex: from(config.getConfigMap(identifier).get) - * - * Gotcha: Only configures the dispatcher if possible - * Returns: None if "type" isn't specified in the config - * Throws: IllegalArgumentException if the value of "type" is not valid - * IllegalArgumentException if it cannot - */ - def from(cfg: Configuration): Option[MessageDispatcher] = { - cfg.getString("type") map { - case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator() - case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator() - case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator - case fqn => - ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { - case r: Right[_, Class[MessageDispatcherConfigurator]] => - ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match { - case r: Right[Exception, MessageDispatcherConfigurator] => r.b - case l: Left[Exception, MessageDispatcherConfigurator] => - throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a) - } - case l: Left[Exception, _] => - throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a) - } - } map { - _ configure cfg - } - } -} - -object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher -} - -class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( - config.getString("name", newUuid.toString), - config.getInt("throughput", Dispatchers.THROUGHPUT), - config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), - mailboxType(config), - threadPoolConfig)).build - } -} - -class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher( - config.getString("name", newUuid.toString), - config.getInt("throughput", Dispatchers.THROUGHPUT), - config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), - mailboxType(config), - threadPoolConfig)).build - } -} |