aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/executor/eventbus.scala
blob: d83f2ac6dca036c10824ba1d87a2ba6a7bc1e633 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package kamon.executor

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit

import kamon.{Kamon, TraceContext}
import akka.util.Timeout
import scala.util.Success
import scala.util.Failure
import scala.concurrent.Future

/** Marker trait for payloads that can be carried inside a [[MessageEvent]]. */
trait Message

/**
 * Payload carrying the text of a post.
 *
 * @param text the content of the post
 */
case class PostMessage(text: String) extends Message

/**
 * Envelope published on the event bus.
 *
 * @param channel name of the channel the event is published on
 * @param message the payload being delivered
 */
case class MessageEvent(channel: String, message: Message)

/**
 * Actor event bus for [[MessageEvent]]s, classified by their channel name.
 *
 * Each event is classified by its `channel` string and handed to a subscriber
 * by sending the event itself as an actor message.
 */
class AppActorEventBus extends ActorEventBus with LookupClassification {

  type Event = MessageEvent
  type Classifier = String

  /** Size value requested by `LookupClassification`; fixed at 10 here. */
  override protected def mapSize(): Int = 10

  /** The classifier of an event is the channel it was published on. */
  override protected def classify(event: Event): Classifier = event.channel

  /** Delivers the event to a single subscriber with a fire-and-forget send. */
  override protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber ! event
}
/** Message handled by `PongActor`; also what `PingActor` sends back after receiving a [[Pong]]. */
case class Ping()
/** Message handled by `PingActor`; also what `PongActor` sends back after receiving a [[Ping]]. */
case class Pong()

/**
 * Ping side of the ping/pong experiment.
 *
 * On every [[Pong]] it logs the current `Kamon.context`, blocks its thread for
 * one second and answers the sender with a [[Ping]]. Any other message is
 * printed to standard output, followed by the same one second pause.
 *
 * @param target actor reference supplied at construction; it is stored but not
 *               used by the message handling in this class
 */
class PingActor(val target: ActorRef) extends Actor with ActorLogging {
  implicit def executionContext = context.dispatcher
  implicit val timeout: Timeout = Timeout(30, TimeUnit.SECONDS)

  def receive: Receive = {
    case Pong() =>
      log.info(s"pong with context ${Kamon.context}")
      // Deliberate blocking pause before replying.
      Thread.sleep(1000)
      sender ! Ping()

    case a: Any =>
      println(s"Got ${a} in PING")
      Thread.sleep(1000)
  }

  /** Returns the constant `1`, typed as `Any`. */
  def withAny(): Any = 1

  /** Returns a freshly allocated `Object`, typed as `AnyRef`. */
  def withAnyRef(): AnyRef = new Object
}

/**
 * Pong side of the ping/pong experiment.
 *
 * On every [[Ping]] it blocks its thread for three seconds, answers the sender
 * with a [[Pong]] and only then logs the current `Kamon.context`. Any other
 * message is printed to standard output.
 */
class PongActor extends Actor with ActorLogging {
  def receive: Receive = {
    case Ping() =>
      // Deliberate blocking pause before replying.
      Thread.sleep(3000)
      sender ! Pong()
      log.info(s"ping with context ${Kamon.context}")

    case a: Any =>
      println(s"Got ${a} in PONG")
  }
}


/**
 * Manual experiment entry point: creates an actor system and prints the value
 * of `Kamon.context` from the main thread and from inside a `Future` body.
 *
 * The event-bus and ping/pong experiments further down are commented out, so
 * `appActorEventBus`, `subscriber`, `timeout` and the `ask` import are not used
 * by the code that is currently active.
 *
 * NOTE(review): no shutdown of `system` is visible in this object, so the
 * process presumably has to be stopped manually — confirm.
 */
object TryAkka extends App{
  val system = ActorSystem("MySystem")
  val appActorEventBus=new AppActorEventBus
  // Channel name used by the (commented-out) subscribe/publish calls at the bottom.
  val NEW_POST_CHANNEL="/posts/new"
  // Anonymous actor that prints every MessageEvent it receives.
  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case d: MessageEvent => println(d)
    }
  }))





  // Prints `body` prefixed with the current thread name and the current Kamon.context.
  def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")

  /*
  val newRelicReporter = new NewRelicReporter(registry)
  newRelicReporter.start(1, TimeUnit.SECONDS)

*/
  import akka.pattern.ask
  // Only needed by the commented-out `ping ? Pong()` ask below.
  implicit val timeout = Timeout(10, TimeUnit.SECONDS)
  // Execution context picked up implicitly by the Future created below.
  implicit def execContext = system.dispatcher
  //for(i <- 1 to 8) {
/*  val i = 1
    TraceContext.start
    val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-${i}"))), s"pong-${i}")
    val f = ping ? Pong()

  f.map({
    a => threadPrintln(s"In the map body, with the context: ${TraceContext.current}")
  })
  .flatMap({
    (a: Any) => {
      threadPrintln(s"Executing the flatMap, with the context: ${TraceContext.current}")
      Future { s"In the flatMap body, with the context: ${TraceContext.current}" }
    }
  })
    .onComplete({
    case Success(p) => threadPrintln(s"On my main success, with String [$p] and the context: ${TraceContext.current}")
    case Failure(t) => threadPrintln(s"Something went wrong in the main, with the context: ${TraceContext.current}")
  })*/
  //}

  // NOTE(review): presumably sets up the context that is closed below — confirm against Kamon.
  Kamon.start
  threadPrintln("Before doing it")
  // The body runs on system.dispatcher; nothing here waits for it to finish
  // before the context is closed on the next statement.
  val f = Future { threadPrintln("This is happening inside the future body") }

  // NOTE(review): `.get` assumes a context is present at this point; what happens
  // when none is set depends on the type returned by Kamon.context — confirm.
  Kamon.context.get.close



/*  appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
  appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
}