blob: a4c34c5c67d0552bbd86d7303a19c1c006b548cd (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.routing
import akka.actor.{ UntypedActor, Actor, ActorRef }
/**
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.
*/
trait Dispatcher { this: Actor =>
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, ActorRef]
protected def broadcast(message: Any) {}
protected def dispatch: Receive = {
case Routing.Broadcast(message) =>
broadcast(message)
case a if routes.isDefinedAt(a) =>
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None)
}
def receive = dispatch
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
}
/**
* An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors.
*/
abstract class UntypedDispatcher extends UntypedActor {
protected def transform(msg: Any): Any = msg
protected def route(msg: Any): ActorRef
protected def broadcast(message: Any) {}
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
@throws(classOf[Exception])
def onReceive(msg: Any): Unit = {
if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
else {
val r = route(msg)
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
if (isSenderDefined) r.forward(transform(msg))(someSelf)
else r.!(transform(msg))(None)
}
}
}
/**
* A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[ActorRef]
protected def routes = {
case x if seq.hasNext => seq.next
}
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
/**
* A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
abstract class UntypedLoadBalancer extends UntypedDispatcher {
protected def seq: InfiniteIterator[ActorRef]
protected def route(msg: Any) =
if (seq.hasNext) seq.next
else null
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
|