aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/executor/eventbus.scala
blob: 33ff4a4ed021d9feffed8877b03006a17c4567df (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package kamon.executor

import akka.event.ActorEventBus
import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit

import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
import scala.util.{Random, Success, Failure}
import scala.concurrent.Future

trait Message

case class PostMessage(text:String) extends Message

case class MessageEvent(val channel:String, val message:Message)

class AppActorEventBus extends ActorEventBus with LookupClassification{
  type Event = MessageEvent
  type Classifier=String
  protected def mapSize(): Int={
    10
  }

  protected def classify(event: Event): Classifier={
    event.channel
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit={
    subscriber ! event
  }
}
case class Ping()
case class Pong()

class PingActor extends Actor with ActorLogging {

  val pong = context.actorOf(Props[PongActor])
  val random = new Random()
  def receive = {
    case Pong() => {
      //Thread.sleep(random.nextInt(2000))
      //log.info("Message from Ping")
      pong ! Ping()
    }
  }
}

class PongActor extends Actor with ActorLogging {
  def receive = {
    case Ping() => {
      sender ! Pong()
    }
  }
}


object TryAkka extends App{
  val system = ActorSystem("MySystem")
  val appActorEventBus=new AppActorEventBus
  val NEW_POST_CHANNEL="/posts/new"
  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case d: MessageEvent => println(d)
    }
  }))

  Tracer.start
  for(i <- 1 to 4) {
    val ping = system.actorOf(Props[PingActor])
    ping ! Pong()
  }


  def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body")

  /*
  val newRelicReporter = new NewRelicReporter(registry)
  newRelicReporter.start(1, TimeUnit.SECONDS)

*/
  import akka.pattern.ask
  implicit val timeout = Timeout(10, TimeUnit.SECONDS)
  implicit def execContext = system.dispatcher



  Tracer.start

  Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
  threadPrintln("Before doing it")
  val f = Future { threadPrintln("This is happening inside the future body") }

  Tracer.stop


  //Thread.sleep(3000)
  //system.shutdown()

/*  appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
  appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
}