blob: 4c00577157874a7a2d20df2e50047847c9ccc16e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue }
import java.util.concurrent._
import akka.util._
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
/**
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
*/
trait MessageQueue {
val dispatcherLock = new SimpleLock
val suspended = new SimpleLock
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def size: Int
def isEmpty: Boolean
}
/**
* Mailbox configuration.
*/
sealed trait MailboxType
case class UnboundedMailbox() extends MailboxType
case class BoundedMailbox(
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
@inline
final def enqueue(handle: MessageInvocation): Unit = this add handle
@inline
final def dequeue(): MessageInvocation = this.poll()
}
trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def pushTimeOut: Duration
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.length > 0) {
this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
}
} else this put handle
}
@inline
final def dequeue(): MessageInvocation = this.poll()
}
class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics
class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics
|