aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core/src/main/resources/application.conf2
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala41
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala59
-rw-r--r--kamon-core/src/main/scala/test/PingPong.scala7
5 files changed, 73 insertions, 48 deletions
diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf
index 2f8d8d87..647939f8 100644
--- a/kamon-core/src/main/resources/application.conf
+++ b/kamon-core/src/main/resources/application.conf
@@ -2,7 +2,7 @@ akka {
loglevel = DEBUG
stdout-loglevel = DEBUG
- extensions = ["kamon.dashboard.DashboardExtension"]
+ #extensions = ["kamon.dashboard.DashboardExtension"]
actor {
default-dispatcher {
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 298f43eb..118239f7 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -5,6 +5,12 @@ import kamon.metric.{HistogramSnapshot, ActorSystemMetrics}
import scala.concurrent.duration.FiniteDuration
import com.newrelic.api.agent.NewRelic
import scala.collection.concurrent.TrieMap
+import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration}
+
+
+object Instrument {
+ val instrumentation: ActorInstrumentationConfiguration = new SimpleContextPassingInstrumentation
+}
object Kamon {
implicit lazy val actorSystem = ActorSystem("kamon")
@@ -19,8 +25,8 @@ object Kamon {
def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name)
}
- val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
- val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
+ //val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager")
+ //val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter")
}
@@ -79,7 +85,7 @@ case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThre
class NewrelicReporterActor extends Actor {
import scala.concurrent.duration._
- Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
+ //Kamon.metricManager ! RegisterForAllDispatchers(5 seconds)
def receive = {
case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => {
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 00000000..4e47c2a4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,41 @@
+package kamon.instrumentation
+
+import akka.actor.{Props, ActorSystem, ActorRef}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import kamon.{Tracer, TraceContext}
+import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
+
+trait ActorInstrumentationConfiguration {
+ def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any
+ def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation
+}
+
+
+trait ActorReceiveInvokeInstrumentation {
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext])
+}
+
+object ActorReceiveInvokeInstrumentation {
+ val noopPreReceive = new ActorReceiveInvokeInstrumentation{
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = (envelope, None)
+ }
+}
+
+class SimpleContextPassingInstrumentation extends ActorInstrumentationConfiguration {
+ def sendMessageTransformation(from: ActorRef, to: ActorRef, message: Any): Any = SimpleTraceMessage(message, Tracer.context)
+
+ def receiveInvokeInstrumentation(system: ActorSystem, self: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): ActorReceiveInvokeInstrumentation = {
+ new ActorReceiveInvokeInstrumentation {
+ def preReceive(envelope: Envelope): (Envelope, Option[TraceContext]) = envelope match {
+ case env @ Envelope(SimpleTraceMessage(msg, ctx), _) => (env.copy(message = msg), ctx)
+ case anyOther => (anyOther, None)
+ }
+ }
+ }
+}
+
+object SimpleContextPassingInstrumentation {
+ case class SimpleTraceMessage(message: Any, context: Option[TraceContext])
+}
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
index 7d3e36ca..84498cb8 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -3,11 +3,12 @@ package kamon.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
-import kamon.{Tracer, TraceContext}
+import kamon.{Kamon, Tracer, TraceContext}
import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.Timer
import kamon.metric.{MetricDirectory, Metrics}
import scala.Some
+import kamon.instrumentation.SimpleContextPassingInstrumentation.SimpleTraceMessage
case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
@@ -16,76 +17,48 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti
class ActorRefTellInstrumentation {
import ProceedingJoinPointPimp._
- val t2 = Metrics.registry.timer("some" + "LATENCY")
-
@Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && !within(akka.actor.DeadLetterActorRef) && target(actor) && args(message, sender)")
def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
@Around("sendingMessageToActorRef(actor, message, sender)")
def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
+ import kamon.Instrument.instrumentation.sendMessageTransformation
- //val actorName = MetricDirectory.nameForActor(actor)
- //val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"Wrapped message from [$sender] to [$actor] with content: [$message]")
- pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t2.time()), sender)
-
+ pjp.proceedWithTarget(actor, sendMessageTransformation(sender, actor, message).asInstanceOf[AnyRef], sender)
}
}
-@Aspect("perthis(actorCellCreation(..))")
-class ActorCellInvokeInstrumentation {
- var processingTimeTimer: Timer = _
- var shouldTrack = false
+@Aspect("""perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))""")
+class ActorCellInvokeInstrumentation {
+ var instrumentation = ActorReceiveInvokeInstrumentation.noopPreReceive
// AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
-
@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
@After("actorCellCreation(system, ref, props, dispatcher, parent)")
def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- val actorName = MetricDirectory.nameForActor(ref)
- val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
-
- //println("=====> Created ActorCell for: "+ref.toString())
- /** TODO: Find a better way to filter the things we don't want to measure. */
- //if(system.name != "kamon" && actorName.startsWith("/user")) {
- processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
- shouldTrack = true
- //}
+ instrumentation = kamon.Instrument.instrumentation.receiveInvokeInstrumentation(system, ref, props, dispatcher, parent)
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
@Around("invokingActorBehaviourAtActorCell(envelope)")
def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
import ProceedingJoinPointPimp._
- //println("ENVELOPE --------------------->"+envelope)
- envelope match {
- case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
- //timer.stop()
- val originalEnvelope = envelope.copy(message = msg)
-
- //println("PROCESSING TIME TIMER: "+processingTimeTimer)
- val pt = processingTimeTimer.time()
- ctx match {
- case Some(c) => {
- Tracer.set(c)
- //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
- pjp.proceedWith(originalEnvelope)
- Tracer.clear
- }
- case None => pjp.proceedWith(originalEnvelope)
- }
- pt.stop()
+ val (originalEnvelope, ctx) = instrumentation.preReceive(envelope)
+ ctx match {
+ case Some(c) => {
+ Tracer.set(c)
+ pjp.proceedWith(originalEnvelope)
+ Tracer.clear
}
- case _ => pjp.proceed
+ case None => pjp.proceedWith(originalEnvelope)
}
}
}
@@ -117,3 +90,5 @@ class UnregisteredActorRefInstrumentation {
}
}
}
+
+
diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala
index f9d6869c..6ed17ec6 100644
--- a/kamon-core/src/main/scala/test/PingPong.scala
+++ b/kamon-core/src/main/scala/test/PingPong.scala
@@ -1,6 +1,6 @@
package test
-import akka.actor.{Props, Actor, ActorSystem}
+import akka.actor.{Deploy, Props, Actor, ActorSystem}
object PingPong extends App {
@@ -22,8 +22,11 @@ case object Ping
case object Pong
class Pinger extends Actor {
+ val ponger = context.actorOf(Props[Ponger], "ponger#")
+ val ponger2 = context.actorOf(Props[Ponger], "ponger#")
+
def receive = {
- case Pong => sender ! Ping
+ case Pong => ponger ! Ping
}
}