aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/executor/eventbus.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/executor/eventbus.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/executor/eventbus.scala103
1 files changed, 103 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
new file mode 100644
index 00000000..599f2a7a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala
@@ -0,0 +1,103 @@
+package kamon.executor
+
+import akka.event.ActorEventBus
+import akka.event.LookupClassification
+import akka.actor._
+import java.util.concurrent.TimeUnit
+
+import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
+import akka.util.Timeout
+import scala.util.{Random, Success, Failure}
+import scala.concurrent.Future
+
+trait Message
+
+case class PostMessage(text:String) extends Message
+
+case class MessageEvent(val channel:String, val message:Message)
+
+class AppActorEventBus extends ActorEventBus with LookupClassification{
+ type Event = MessageEvent
+ type Classifier=String
+ protected def mapSize(): Int={
+ 10
+ }
+
+ protected def classify(event: Event): Classifier={
+ event.channel
+ }
+
+ protected def publish(event: Event, subscriber: Subscriber): Unit={
+ subscriber ! event
+ }
+}
+case class Ping()
+case class Pong()
+
+class PingActor extends Actor with ActorLogging {
+
+ val pong = context.actorOf(Props[PongActor])
+ val random = new Random()
+ def receive = {
+ case Pong() => {
+ //Thread.sleep(random.nextInt(2000))
+ //log.info("Message from Ping")
+ pong ! Ping()
+ }
+ }
+}
+
+class PongActor extends Actor with ActorLogging {
+ def receive = {
+ case Ping() => {
+ sender ! Pong()
+ }
+ }
+}
+
+
+object TryAkka extends App{
+ val system = ActorSystem("MySystem")
+ val appActorEventBus=new AppActorEventBus
+ val NEW_POST_CHANNEL="/posts/new"
+ val subscriber = system.actorOf(Props(new Actor {
+ def receive = {
+ case d: MessageEvent => println(d)
+ }
+ }))
+
+ Kamon.start
+ for(i <- 1 to 4) {
+ val ping = system.actorOf(Props[PingActor])
+ ping ! Pong()
+ }
+
+
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
+
+ /*
+ val newRelicReporter = new NewRelicReporter(registry)
+ newRelicReporter.start(1, TimeUnit.SECONDS)
+
+*/
+ import akka.pattern.ask
+ implicit val timeout = Timeout(10, TimeUnit.SECONDS)
+ implicit def execContext = system.dispatcher
+
+
+
+ Kamon.start
+
+ Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime()))
+ threadPrintln("Before doing it")
+ val f = Future { threadPrintln("This is happening inside the future body") }
+
+ Kamon.stop
+
+
+ //Thread.sleep(3000)
+ //system.shutdown()
+
+/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
+ appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
+} \ No newline at end of file