aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-08 18:25:52 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-08 18:25:52 -0300
commit6ee6ea75f1f230b5156a546c1e0f16f6952f99a0 (patch)
tree59c34ff6a4dca2b9e0b889f033ee5ea32cfb2a87 /src
parent50cc3a5a6508e390746f590ea83e5473367dd59e (diff)
downloadKamon-6ee6ea75f1f230b5156a546c1e0f16f6952f99a0.tar.gz
Kamon-6ee6ea75f1f230b5156a546c1e0f16f6952f99a0.tar.bz2
Kamon-6ee6ea75f1f230b5156a546c1e0f16f6952f99a0.zip
wip in passing some kind of context along with actor messages
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/kamon/actor/EnhancedActor.scala45
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala36
2 files changed, 69 insertions, 12 deletions
diff --git a/src/main/scala/kamon/actor/EnhancedActor.scala b/src/main/scala/kamon/actor/EnhancedActor.scala
new file mode 100644
index 00000000..ad879505
--- /dev/null
+++ b/src/main/scala/kamon/actor/EnhancedActor.scala
@@ -0,0 +1,45 @@
+package kamon.actor
+
+import akka.actor.{ActorRef, Actor}
+
+trait EnhancedActor extends Actor {
+ protected[this] var transactionContext: TransactionContext = _
+
+ final def receive = {
+ case a: Any => {
+ a match {
+ case ContextAwareMessage(ctx, message) => {
+ transactionContext = ctx
+ println(s"Actor ${self.path.toString}. Current context: ${transactionContext}")
+ wrappedReceive(message)
+ }
+ case message: Any => wrappedReceive(message)
+ }
+ }
+ }
+
+
+
+
+ def wrappedReceive: Receive
+
+
+ def superTell(target: ActorRef, message: Any) = {
+ target.tell(ContextAwareMessage(transactionContext, message), self)
+ }
+
+}
+
+
+case class ContextAwareMessage(context: TransactionContext, message: Any)
+
+
+case class TransactionContext(id: Long, entries: List[ContextEntry]) {
+ def append(entry: ContextEntry) = this.copy(entries = entry :: this.entries)
+}
+sealed trait ContextEntry
+
+case class DeveloperComment(comment: String) extends ContextEntry
+
+case class MessageExecutionTime(actorPath: String, begin: Long, end: Long) extends ContextEntry
+
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index d64ff444..09f28b69 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -8,6 +8,8 @@ import kamon.metric.NewRelicReporter
import com.yammer.metrics.core.{MetricName, MetricsRegistry}
import com.yammer.metrics.reporting.ConsoleReporter
+import kamon.actor.{DeveloperComment, TransactionContext, ContextAwareMessage, EnhancedActor}
+import scala.concurrent.Future
trait Message
@@ -48,18 +50,28 @@ object TryAkka extends App{
case class Ping()
case class Pong()
- class PingActor(val target: ActorRef) extends Actor {
- def receive = {
- case Pong() => target ! Ping()
+ class PingActor(val target: ActorRef) extends EnhancedActor {
+ import akka.pattern.pipe
+ implicit def executionContext = context.dispatcher
+
+ def wrappedReceive = {
+ case Pong() => {
+ transactionContext = transactionContext.append(DeveloperComment("In PONG"))
+
+
+ Future {
+ Thread.sleep(1000) // Doing something really expensive
+ ContextAwareMessage(transactionContext, Ping())
+ } pipeTo target
+
+ }
}
}
- class PongActor extends Actor {
- var i = 0
- def receive = {
+ class PongActor extends EnhancedActor {
+ def wrappedReceive = {
case Ping() => {
- i=i+1
- sender ! Pong()
+ superTell(sender, Pong())
}
}
}
@@ -71,10 +83,10 @@ object TryAkka extends App{
*/
- for(i <- 1 to 8) {
- val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-actor-${i}"))), s"pong-actor-${i}")
- ping ! Pong()
- }
+ /*for(i <- 1 to 8) {*/
+ val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], "ping"))), "pong")
+ ping ! ContextAwareMessage(TransactionContext(1707, Nil), Pong())
+ //}
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)