aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-09-10 18:35:25 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-09-10 18:35:25 -0300
commitd8305b781aaf454cec558bfb86a366682e9f5eab (patch)
treed2ae762332faef33aedfaf68b1732611c4f1542d /kamon-core/src/main/scala/kamon/instrumentation
parent218b8896f2987c43068646cea8eca597ceaf0843 (diff)
downloadKamon-d8305b781aaf454cec558bfb86a366682e9f5eab.tar.gz
Kamon-d8305b781aaf454cec558bfb86a366682e9f5eab.tar.bz2
Kamon-d8305b781aaf454cec558bfb86a366682e9f5eab.zip
Simple instrumentation just for keeping the uow.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation')
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala41
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala59
2 files changed, 58 insertions, 42 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 00000000..4e47c2a4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,41 @@
+package kamon.instrumentation
+
+import akka.actor.{Props, ActorSystem, ActorRef}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import kamon.{Tracer, TraceContext}
+import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
+
+trait ActorInstrumentationConfiguration {
+ def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any
+ def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation
+}
+
+
+trait ActorReceiveInvokeInstrumentation {
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext])
+}
+
+object ActorReceiveInvokeInstrumentation {
+ val noopPreReceive = new ActorReceiveInvokeInstrumentation{
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None)
+ }
+}
+
+class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration {
+ def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context)
+
+ def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = {
+ new ActorReceiveInvokeInstrumentation {
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match {
+ case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx)
+ case anyOther => (anyOther, None)
+ }
+ }
+ }
+}
+
+object SimpleContextPassingInstrumentation {
+ case class SimpleTraceMessage(message: Any, context: Option[TraceContext])
+}
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 7d3e36ca..84498cb8 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -3,11 +3,12 @@ package kamon.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
-import kamon.{Tracer, TraceContext}
+import kamon.{Kamon, Tracer, TraceContext}
import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.Timer
import kamon.metric.{MetricDirectory, Metrics}
import scala.Some
+import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
@@ -16,76 +17,48 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti
class ActorRefTellInstrumentation {
import ProceedingJoinPointPimp._
- val t2 = Metrics.registry.timer("some" + "LATENCY")
-
@Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)")
def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(actor, message, sender)")
def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
+ import kamon.Instrument.instrumentation.sendMessageTransformation
- //val actorName = MetricDirectory.nameForActor(actor)
- //val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]")
- pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender)
-
+ pjp.proceedWithTarget(actor, sendMessageTransformation(sender, actor, message).asInstanceOf[AnyRef], sender)
}
}
-@Aspect("perthis(actorCellCreation(..))")
-class ActorCellInvokeInstrumentation {
- var processingTimeTimer: Timer = _
- var shouldTrack = false
+@Aspect("""perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))""")
+class ActorCellInvokeInstrumentation {
+ var instrumentation = ActorReceiveInvokeInstrumentation.noopPreReceive
// AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
-
@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
@After("actorCellCreation(system, ref, props, dispatcher, parent)")
def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- val actorName = MetricDirectory.nameForActor(ref)
- val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
-
- //println("=====> Created ActorCell for: "+ref.toString())
- /** TODO: Find a better way to filter the things we don't want to measure. */
- //if(system.name != "kamon" && actorName.startsWith("/user")) {
- processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
- shouldTrack = true
- //}
+ instrumentation = kamon.Instrument.instrumentation.receiveInvokeInstrumentation(system, ref, props, dispatcher, parent)
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
@Around("invokingActorBehaviourAtActorCell(envelope)")
def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
import ProceedingJoinPointPimp._
- //println("ENVELOPE --------------------->"+envelope)
- envelope match {
- case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
- //timer.stop()
- val originalEnvelope = envelope.copy(message = msg)
-
- //println("PROCESSING TIME TIMER: "+processingTimeTimer)
- val pt = processingTimeTimer.time()
- ctx match {
- case Some(c) => {
- Tracer.set(c)
- //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
- pjp.proceedWith(originalEnvelope)
- Tracer.clear
- }
- case None => pjp.proceedWith(originalEnvelope)
- }
- pt.stop()
+ val (originalEnvelope, ctx) = instrumentation.preReceive(envelope)
+ ctx match {
+ case Some(c) => {
+ Tracer.set(c)
+ pjp.proceedWith(originalEnvelope)
+ Tracer.clear
}
- case _ => pjp.proceed
+ case None => pjp.proceedWith(originalEnvelope)
}
}
}
@@ -117,3 +90,5 @@ class UnregisteredActorRefInstrumentation {
}
}
}
+
+