summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/dispatch/ThreadBasedDispatcher.scala
blob: 3169c70ef9005846cfbd0a4ef4508b14f6915b4f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.dispatch

import akka.actor.{ Actor, ActorRef }
import akka.config.Config.config
import akka.util.Duration

import java.util.Queue
import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue }
import akka.actor
import java.util.concurrent.atomic.AtomicReference

/**
 * Dispatcher that is tied to a single owning actor and runs on the one-thread pool
 * configuration defined by `ThreadBasedDispatcher.oneThread`.
 *
 * The owning actor is tracked in an atomic reference: while an owner is set, attempts to
 * register any other actor are rejected; once the owner unregisters, the slot is cleared
 * and may be claimed by the next actor that registers.
 *
 * NOTE(review): the first superclass argument (the actor's uuid) presumably serves as the
 * dispatcher name, and the literal `-1` presumably disables a throughput deadline -- confirm
 * against ExecutorBasedEventDrivenDispatcher.
 *
 * @param _actor       the actor this dispatcher is initially owned by
 * @param _mailboxType the mailbox type handed on to the underlying dispatcher
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType)
  extends ExecutorBasedEventDrivenDispatcher(
    _actor.uuid.toString,
    Dispatchers.THROUGHPUT,
    -1,
    _mailboxType,
    ThreadBasedDispatcher.oneThread) {

  /** The actor currently owning this dispatcher, or `null` when nobody is registered. */
  private[akka] val owner = new AtomicReference[ActorRef](_actor)

  /** Java API: creates a dispatcher for `actor` backed by an `UnboundedMailbox`. */
  def this(actor: ActorRef) =
    this(actor, UnboundedMailbox())

  /** Java API: creates a dispatcher for `actor` backed by a `BoundedMailbox` of `capacity`. */
  def this(actor: ActorRef, capacity: Int) =
    this(actor, BoundedMailbox(capacity))

  /**
   * Java API: creates a dispatcher for `actor` backed by a `BoundedMailbox` built from
   * `capacity` and `pushTimeOut`.
   */
  def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) =
    this(actor, BoundedMailbox(capacity, pushTimeOut))

  /**
   * Registers `actorRef` with this dispatcher, claiming ownership if the dispatcher is
   * currently unowned.
   *
   * @throws IllegalArgumentException if a different actor already owns this dispatcher
   */
  override def register(actorRef: ActorRef) = {
    owner.get() match {
      case null =>
        () // nobody owns the dispatcher yet; ownership is claimed below
      case current if actorRef != current =>
        throw new IllegalArgumentException("Cannot register to anyone but " + current)
      case _ =>
        () // re-registration of the current owner is allowed
    }
    // Only takes effect when the owner slot is still empty.
    owner.compareAndSet(null, actorRef)
    super.register(actorRef)
  }

  /**
   * Unregisters `actorRef` and, if it is the current owner, clears the owner slot so the
   * dispatcher does not keep the actor reachable (prevents a memory leak).
   */
  override def unregister(actorRef: ActorRef) = {
    super.unregister(actorRef)
    // Release ownership only if `actorRef` is the actor that holds it.
    owner.compareAndSet(actorRef, null)
  }
}

object ThreadBasedDispatcher {
  /**
   * Thread pool configuration shared by every `ThreadBasedDispatcher`: a pool whose core
   * and maximum size are both one thread, with core-thread timeout enabled.
   */
  val oneThread: ThreadPoolConfig =
    ThreadPoolConfig(
      corePoolSize = 1,
      maxPoolSize = 1,
      allowCorePoolTimeout = true)
}