diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-07 19:06:33 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-07 19:06:33 -0300 |
commit | 923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (patch) | |
tree | d555199f0c63b690ec51805b496ee2d54eb014da /kamon-core/src/main/scala/kamon/executor/eventbus.scala | |
parent | 1e6665e30d96772eab92aca4d23e176adcd88dc5 (diff) | |
download | Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.gz Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.bz2 Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.zip |
upgrading to akka 2.2
Diffstat (limited to 'kamon-core/src/main/scala/kamon/executor/eventbus.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/executor/eventbus.scala | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala new file mode 100644 index 00000000..599f2a7a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -0,0 +1,103 @@ +package kamon.executor + +import akka.event.ActorEventBus +import akka.event.LookupClassification +import akka.actor._ +import java.util.concurrent.TimeUnit + +import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import akka.util.Timeout +import scala.util.{Random, Success, Failure} +import scala.concurrent.Future + +trait Message + +case class PostMessage(text:String) extends Message + +case class MessageEvent(val channel:String, val message:Message) + +class AppActorEventBus extends ActorEventBus with LookupClassification{ + type Event = MessageEvent + type Classifier=String + protected def mapSize(): Int={ + 10 + } + + protected def classify(event: Event): Classifier={ + event.channel + } + + protected def publish(event: Event, subscriber: Subscriber): Unit={ + subscriber ! event + } +} +case class Ping() +case class Pong() + +class PingActor extends Actor with ActorLogging { + + val pong = context.actorOf(Props[PongActor]) + val random = new Random() + def receive = { + case Pong() => { + //Thread.sleep(random.nextInt(2000)) + //log.info("Message from Ping") + pong ! Ping() + } + } +} + +class PongActor extends Actor with ActorLogging { + def receive = { + case Ping() => { + sender ! Pong() + } + } +} + + +object TryAkka extends App{ + val system = ActorSystem("MySystem") + val appActorEventBus=new AppActorEventBus + val NEW_POST_CHANNEL="/posts/new" + val subscriber = system.actorOf(Props(new Actor { + def receive = { + case d: MessageEvent => println(d) + } + })) + + Kamon.start + for(i <- 1 to 4) { + val ping = system.actorOf(Props[PingActor]) + ping ! Pong() + } + + + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + + /* + val newRelicReporter = new NewRelicReporter(registry) + newRelicReporter.start(1, TimeUnit.SECONDS) + +*/ + import akka.pattern.ask + implicit val timeout = Timeout(10, TimeUnit.SECONDS) + implicit def execContext = system.dispatcher + + + + Kamon.start + + Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + threadPrintln("Before doing it") + val f = Future { threadPrintln("This is happening inside the future body") } + + Kamon.stop + + + //Thread.sleep(3000) + //system.shutdown() + +/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) + appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ +}
\ No newline at end of file |