summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
blob: 4cba8eec8b58b8b9fd6fa1d1330b3b4b28d8c7cd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**
 *    Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.dispatch

import akka.actor.{ ActorRef, Actor, IllegalActorStateException }
import akka.util.{ ReflectiveAccess, Switch }

import java.util.Queue
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue }
import util.DynamicVariable

/**
 * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
 * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
 * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
 * <p/>
 * Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
 * best described as "work donating" because the actor of which work is being stolen takes the initiative.
 * <p/>
 * The preferred way of creating dispatchers is to use
 * the {@link akka.dispatch.Dispatchers} factory object.
 *
 * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
 * @see akka.dispatch.Dispatchers
 *
 * @author Viktor Klang
 */
class ExecutorBasedEventDrivenWorkStealingDispatcher(
  _name: String,
  throughput: Int = Dispatchers.THROUGHPUT,
  throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
  mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
  config: ThreadPoolConfig = ThreadPoolConfig())
  extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {

  def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
    this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage

  def this(_name: String, throughput: Int, mailboxType: MailboxType) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage

  def this(_name: String, throughput: Int) =
    this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  def this(_name: String, _config: ThreadPoolConfig) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)

  def this(_name: String, memberType: Class[_ <: Actor]) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage

  def this(_name: String, mailboxType: MailboxType) =
    this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage

  @volatile
  private var actorType: Option[Class[_]] = None
  @volatile
  private var members = Vector[ActorRef]()
  private val donationInProgress = new DynamicVariable(false)

  private[akka] override def register(actorRef: ActorRef) = {
    //Verify actor type conformity
    actorType match {
      case None => actorType = Some(actorRef.actor.getClass)
      case Some(aType) =>
        if (aType != actorRef.actor.getClass)
          throw new IllegalActorStateException(String.format(
            "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
            actorRef, aType))
    }

    synchronized { members :+= actorRef } //Update members
    super.register(actorRef)
  }

  private[akka] override def unregister(actorRef: ActorRef) = {
    synchronized { members = members.filterNot(actorRef eq) } //Update members
    super.unregister(actorRef)
  }

  override private[akka] def dispatch(invocation: MessageInvocation) = {
    val mbox = getMailbox(invocation.receiver)
    if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
      //We were busy and we got to donate the message to some other lucky guy, we're done here
    } else {
      mbox enqueue invocation
      registerForExecution(mbox)
    }
  }

  override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
    try {
      donationInProgress.value = true
      while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
    } finally { donationInProgress.value = false }

    if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
      super.reRegisterForExecution(mbox)
  }

  /**
   * Returns true if it successfully donated a message
   */
  protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
    val actors = members // copy to prevent concurrent modifications having any impact

    // we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
    // the dispatcher is being shut down...
    // Starts at is seeded by current time
    doFindDonorRecipient(donorMbox, actors, (System.currentTimeMillis % actors.size).asInstanceOf[Int]) match {
      case null      => false
      case recipient => donate(donorMbox.dequeue, recipient)
    }
  }

  /**
   * Returns true if the donation succeeded or false otherwise
   */
  protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
    donationInProgress.value = true
    val actors = members // copy to prevent concurrent modifications having any impact
    doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
      case null      => false
      case recipient => donate(message, recipient)
    }
  } finally { donationInProgress.value = false }

  /**
   * Rewrites the message and adds that message to the recipients mailbox
   * returns true if the message is non-null
   */
  protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
    if (organ ne null) {
      if (organ.senderFuture.isDefined) recipient.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
        organ.message, recipient.timeout, organ.sender, organ.senderFuture)
      else if (organ.sender.isDefined) recipient.postMessageToMailbox(organ.message, organ.sender)
      else recipient.postMessageToMailbox(organ.message, None)
      true
    } else false
  }

  /**
   * Returns an available recipient for the message, if any
   */
  protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorRef], startIndex: Int): ActorRef = {
    val prSz = potentialRecipients.size
    var i = 0
    var recipient: ActorRef = null

    while ((i < prSz) && (recipient eq null)) {
      val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
      val mbox = getMailbox(actor)

      if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
        recipient = actor //Found!
      }

      i += 1
    }

    recipient // nothing found, reuse same start index next time
  }
}