From 6ee6ea75f1f230b5156a546c1e0f16f6952f99a0 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 8 May 2013 18:25:52 -0300 Subject: wip in passing some kind of context along with actor messages --- src/main/scala/kamon/actor/EnhancedActor.scala | 45 ++++++++++++++++++++++++++ src/main/scala/kamon/executor/eventbus.scala | 36 ++++++++++++++------- 2 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 src/main/scala/kamon/actor/EnhancedActor.scala (limited to 'src') diff --git a/src/main/scala/kamon/actor/EnhancedActor.scala b/src/main/scala/kamon/actor/EnhancedActor.scala new file mode 100644 index 00000000..ad879505 --- /dev/null +++ b/src/main/scala/kamon/actor/EnhancedActor.scala @@ -0,0 +1,45 @@ +package kamon.actor + +import akka.actor.{ActorRef, Actor} + +trait EnhancedActor extends Actor { + protected[this] var transactionContext: TransactionContext = _ + + final def receive = { + case a: Any => { + a match { + case ContextAwareMessage(ctx, message) => { + transactionContext = ctx + println(s"Actor ${self.path.toString}. Current context: ${transactionContext}") + wrappedReceive(message) + } + case message: Any => wrappedReceive(message) + } + } + } + + + + + def wrappedReceive: Receive + + + def superTell(target: ActorRef, message: Any) = { + target.tell(ContextAwareMessage(transactionContext, message), self) + } + +} + + +case class ContextAwareMessage(context: TransactionContext, message: Any) + + +case class TransactionContext(id: Long, entries: List[ContextEntry]) { + def append(entry: ContextEntry) = this.copy(entries = entry :: this.entries) +} +sealed trait ContextEntry + +case class DeveloperComment(comment: String) extends ContextEntry + +case class MessageExecutionTime(actorPath: String, begin: Long, end: Long) extends ContextEntry + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala index d64ff444..09f28b69 100644 --- a/src/main/scala/kamon/executor/eventbus.scala +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -8,6 +8,8 @@ import kamon.metric.NewRelicReporter import com.yammer.metrics.core.{MetricName, MetricsRegistry} import com.yammer.metrics.reporting.ConsoleReporter +import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor} +import scala.concurrent.Future trait Message @@ -48,18 +50,28 @@ object TryAkka extends App{ case class Ping() case class Pong() - class PingActor(val target: ActorRef) extends Actor { - def receive = { - case Pong() => target ! Ping() + class PingActor(val target: ActorRef) extends EnhancedActor { + import akka.pattern.pipe + implicit def executionContext = context.dispatcher + + def wrappedReceive = { + case Pong() => { + transactionContext = transactionContext.append(DeveloperComment("In PONG")) + + + Future { + Thread.sleep(1000) // Doing something really expensive + ContextAwareMessage(transactionContext, Ping()) + } pipeTo target + + } } } - class PongActor extends Actor { - var i = 0 - def receive = { + class PongActor extends EnhancedActor { + def wrappedReceive = { case Ping() => { - i=i+1 - sender ! Pong() + superTell(sender, Pong()) } } } @@ -71,10 +83,10 @@ object TryAkka extends App{ */ - for(i <- 1 to 8) { - val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-actor-${i}"))), s"pong-actor-${i}") - ping ! Pong() - } + /*for(i <- 1 to 8) {*/ + val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong") + ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong()) + //} /* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) -- cgit v1.2.3