summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
diff options
context:
space:
mode:
Diffstat (limited to 'test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala')
-rw-r--r--test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala165
1 files changed, 0 insertions, 165 deletions
diff --git a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
deleted file mode 100644
index 4cba8eec8b..0000000000
--- a/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
- */
-
-package akka.dispatch
-
-import akka.actor.{ ActorRef, Actor, IllegalActorStateException }
-import akka.util.{ ReflectiveAccess, Switch }
-
-import java.util.Queue
-import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
-import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
-import util.DynamicVariable
-
-/**
- * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
- * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
- * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
- * <p/>
- * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
- * best described as "work donating" because the actor of which work is being stolen takes the initiative.
- * <p/>
- * The preferred way of creating dispatchers is to use
- * the {@link akka.dispatch.Dispatchers} factory object.
- *
- * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
- * @see akka.dispatch.Dispatchers
- *
- * @author Viktor Klang
- */
-class ExecutorBasedEventDrivenWorkStealingDispatcher(
- _name: String,
- throughput: Int = Dispatchers.THROUGHPUT,
- throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
- mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
- config: ThreadPoolConfig = ThreadPoolConfig())
- extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
-
- def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
- this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
-
- def this(_name: String, throughput: Int, mailboxType: MailboxType) =
- this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
-
- def this(_name: String, throughput: Int) =
- this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- def this(_name: String, _config: ThreadPoolConfig) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
-
- def this(_name: String, memberType: Class[_ <: Actor]) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-
- def this(_name: String, mailboxType: MailboxType) =
- this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
-
- @volatile
- private var actorType: Option[Class[_]] = None
- @volatile
- private var members = Vector[ActorRef]()
- private val donationInProgress = new DynamicVariable(false)
-
- private[akka] override def register(actorRef: ActorRef) = {
- //Verify actor type conformity
- actorType match {
- case None => actorType = Some(actorRef.actor.getClass)
- case Some(aType) =>
- if (aType != actorRef.actor.getClass)
- throw new IllegalActorStateException(String.format(
- "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
- actorRef, aType))
- }
-
- synchronized { members :+= actorRef } //Update members
- super.register(actorRef)
- }
-
- private[akka] override def unregister(actorRef: ActorRef) = {
- synchronized { members = members.filterNot(actorRef eq) } //Update members
- super.unregister(actorRef)
- }
-
- override private[akka] def dispatch(invocation: MessageInvocation) = {
- val mbox = getMailbox(invocation.receiver)
- if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
- //We were busy and we got to donate the message to some other lucky guy, we're done here
- } else {
- mbox enqueue invocation
- registerForExecution(mbox)
- }
- }
-
- override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
- try {
- donationInProgress.value = true
- while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
- } finally { donationInProgress.value = false }
-
- if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
- super.reRegisterForExecution(mbox)
- }
-
- /**
- * Returns true if it successfully donated a message
- */
- protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
- val actors = members // copy to prevent concurrent modifications having any impact
-
- // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
- // the dispatcher is being shut down...
- // Starts at is seeded by current time
- doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match {
- case null => false
- case recipient => donate(donorMbox.dequeue, recipient)
- }
- }
-
- /**
- * Returns true if the donation succeeded or false otherwise
- */
- protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
- donationInProgress.value = true
- val actors = members // copy to prevent concurrent modifications having any impact
- doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
- case null => false
- case recipient => donate(message, recipient)
- }
- } finally { donationInProgress.value = false }
-
- /**
- * Rewrites the message and adds that message to the recipients mailbox
- * returns true if the message is non-null
- */
- protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
- if (organ ne null) {
- if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
- organ.message, recipient.timeout, organ.sender, organ.senderFuture)
- else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
- else recipient.postMessageToMailbox(organ.message, None)
- true
- } else false
- }
-
- /**
- * Returns an available recipient for the message, if any
- */
- protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
- val prSz = potentialRecipients.size
- var i = 0
- var recipient: ActorRef = null
-
- while ((i < prSz) && (recipient eq null)) {
- val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
- val mbox = getMailbox(actor)
-
- if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
- recipient = actor //Found!
- }
-
- i += 1
- }
-
- recipient // nothing found, reuse same start index next time
- }
-}