From 9382ce9d66b5d6bfef515cee56f40aa178920335 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 10 Sep 2013 18:35:25 -0300 Subject: Simple instrumentation just for keeping the uow. --- kamon-core/src/main/resources/application.conf | 2 +- kamon-core/src/main/scala/kamon/Kamon.scala | 12 +++-- .../instrumentation/ActorInstrumentation.scala | 41 +++++++++++++++ .../ActorRefTellInstrumentation.scala | 59 +++++++--------------- kamon-core/src/main/scala/test/PingPong.scala | 7 ++- 5 files changed, 73 insertions(+), 48 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index 2f8d8d87..647939f8 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -2,7 +2,7 @@ akka { loglevel = DEBUG stdout-loglevel = DEBUG - extensions = ["kamon.dashboard.DashboardExtension"] + #extensions = ["kamon.dashboard.DashboardExtension"] actor { default-dispatcher { diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 298f43eb..118239f7 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -5,6 +5,12 @@ import kamon.metric.{HistogramSnapshot, ActorSystemMetrics} import scala.concurrent.duration.FiniteDuration import com.newrelic.api.agent.NewRelic import scala.collection.concurrent.TrieMap +import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} + + +object Instrument { + val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation +} object Kamon { implicit lazy val actorSystem = ActorSystem("kamon") @@ -19,8 +25,8 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") - val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") + //val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") + //val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") } @@ -79,7 +85,7 @@ case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThre class NewrelicReporterActor extends Actor { import scala.concurrent.duration._ - Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) + //Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) def receive = { case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala new file mode 100644 index 00000000..4e47c2a4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala @@ -0,0 +1,41 @@ +package kamon.instrumentation + +import akka.actor.{Props, ActorSystem, ActorRef} +import akka.dispatch.{MessageDispatcher, Envelope} +import kamon.{Tracer, TraceContext} +import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage + +trait ActorInstrumentationConfiguration { + def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any + def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation +} + + +trait ActorReceiveInvokeInstrumentation { + def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) +} + +object ActorReceiveInvokeInstrumentation { + val noopPreReceive = new ActorReceiveInvokeInstrumentation{ + def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None) + } +} + +class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration { + def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context) + + def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = { + new ActorReceiveInvokeInstrumentation { + def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match { + case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx) + case anyOther => (anyOther, None) + } + } + } +} + +object SimpleContextPassingInstrumentation { + case class SimpleTraceMessage(message: Any, context: Option[TraceContext]) +} + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 7d3e36ca..84498cb8 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -3,11 +3,12 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Tracer, TraceContext} +import kamon.{Kamon, Tracer, TraceContext} import akka.dispatch.{MessageDispatcher, Envelope} import com.codahale.metrics.Timer import kamon.metric.{MetricDirectory, Metrics} import scala.Some +import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) @@ -16,76 +17,48 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - val t2 = Metrics.registry.timer("some" + "LATENCY") - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + import kamon.Instrument.instrumentation.sendMessageTransformation - //val actorName = MetricDirectory.nameForActor(actor) - //val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender) - + pjp.proceedWithTarget(actor, sendMessageTransformation(sender, actor, message).asInstanceOf[AnyRef], sender) } } -@Aspect("perthis(actorCellCreation(..))") -class ActorCellInvokeInstrumentation { - var processingTimeTimer: Timer = _ - var shouldTrack = false +@Aspect("""perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))""") +class ActorCellInvokeInstrumentation { + var instrumentation = ActorReceiveInvokeInstrumentation.noopPreReceive // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} @After("actorCellCreation(system, ref, props, dispatcher, parent)") def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(ref) - val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - - //println("=====> Created ActorCell for: "+ref.toString()) - /** TODO: Find a better way to filter the things we don't want to measure. */ - //if(system.name != "kamon" && actorName.startsWith("/user")) { - processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") - shouldTrack = true - //} + instrumentation = kamon.Instrument.instrumentation.receiveInvokeInstrumentation(system, ref, props, dispatcher, parent) } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - //println("ENVELOPE --------------------->"+envelope) - envelope match { - case Envelope(TraceableMessage(ctx, msg, timer), sender) => { - //timer.stop() - val originalEnvelope = envelope.copy(message = msg) - - //println("PROCESSING TIME TIMER: "+processingTimeTimer) - val pt = processingTimeTimer.time() - ctx match { - case Some(c) => { - Tracer.set(c) - //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) - pjp.proceedWith(originalEnvelope) - Tracer.clear - } - case None => pjp.proceedWith(originalEnvelope) - } - pt.stop() + val (originalEnvelope, ctx) = instrumentation.preReceive(envelope) + ctx match { + case Some(c) => { + Tracer.set(c) + pjp.proceedWith(originalEnvelope) + Tracer.clear } - case _ => pjp.proceed + case None => pjp.proceedWith(originalEnvelope) } } } @@ -117,3 +90,5 @@ class UnregisteredActorRefInstrumentation { } } } + + diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala index f9d6869c..6ed17ec6 100644 --- a/kamon-core/src/main/scala/test/PingPong.scala +++ b/kamon-core/src/main/scala/test/PingPong.scala @@ -1,6 +1,6 @@ package test -import akka.actor.{Props, Actor, ActorSystem} +import akka.actor.{Deploy, Props, Actor, ActorSystem} object PingPong extends App { @@ -22,8 +22,11 @@ case object Ping case object Pong class Pinger extends Actor { + val ponger = context.actorOf(Props[Ponger], "ponger#") + val ponger2 = context.actorOf(Props[Ponger], "ponger#") + def receive = { - case Pong => sender ! Ping + case Pong => ponger ! Ping } } -- cgit v1.2.3