summaryrefslogtreecommitdiff
path: root/test/disabled/presentation/akka/src/akka/routing/Routing.scala
blob: befc124248e1bcb5a5a6d73c63314e8bd095ff3d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */

package akka.routing

import akka.actor.{ Actor, ActorRef }
import akka.actor.Actor._

object Routing {

  sealed trait RoutingMessage
  case class Broadcast(message: Any) extends RoutingMessage

  type PF[A, B] = PartialFunction[A, B]

  /**
   * Creates a new PartialFunction whose isDefinedAt is a combination
   * of the two parameters, and whose apply is first to call filter.apply
   * and then filtered.apply.
   */
  def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
    case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
      filter(a)
      filtered(a)
  }

  /**
   * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true.
   */
  def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
    filter({ case a if a.isInstanceOf[A] => interceptor(a) }, interceptee)

  /**
   * Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
   */
  def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef =
    actorOf(new Actor with LoadBalancer {
      val seq = actors
    }).start()

  /**
   * Creates a Dispatcher given a routing and a message-transforming function.
   */
  def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
    actorOf(new Actor with Dispatcher {
      override def transform(msg: Any) = msgTransformer(msg)
      def routes = routing
    }).start()

  /**
   * Creates a Dispatcher given a routing.
   */
  def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher {
    def routes = routing
  }).start()

  /**
   * Creates an actor that pipes all incoming messages to
   * both another actor and through the supplied function
   */
  def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef =
    dispatcherActor({ case _ => actorToLog }, logger)
}