diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-13 18:24:04 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-13 18:24:04 -0300 |
commit | 80725fd14a728c6afcc9d8b3ac7c4bd10e8bd05e (patch) | |
tree | 3a576d68682ba76b2296ceccd18a2b077197fbb2 /src/main/scala/akka/instrumentation | |
parent | 84c9ae342ea4a280b0033d9d78239b19b01b728f (diff) | |
download | Kamon-80725fd14a728c6afcc9d8b3ac7c4bd10e8bd05e.tar.gz Kamon-80725fd14a728c6afcc9d8b3ac7c4bd10e8bd05e.tar.bz2 Kamon-80725fd14a728c6afcc9d8b3ac7c4bd10e8bd05e.zip |
wip
Diffstat (limited to 'src/main/scala/akka/instrumentation')
-rw-r--r-- | src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala | 47 | ||||
-rw-r--r-- | src/main/scala/akka/instrumentation/MessageQueueMetrics.scala | 71 |
2 files changed, 109 insertions, 9 deletions
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala index f631b79a..218c09cc 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala @@ -1,12 +1,14 @@ package akka.instrumentation -import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} +import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ActorRef} +import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Kamon, TraceContext} import akka.dispatch.Envelope +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} -case class TraceableMessage(traceContext: TraceContext, message: Any) +case class TraceableEnvelope(traceContext: TraceContext, message: Any, timeStamp: Long = System.nanoTime()) @Aspect @@ -22,8 +24,12 @@ class ActorRefTellInstrumentation { Kamon.context() match { case Some(ctx) => { - val traceableMessage = TraceableMessage(ctx, message) - proceed(getArgs.updated(0, traceableMessage)) + val traceableMessage = TraceableEnvelope(ctx, message) + + // update the args with the new message + val args = getArgs + args.update(0, traceableMessage) + proceed(args) } case None => proceed } @@ -31,19 +37,42 @@ class ActorRefTellInstrumentation { } -@Aspect +@Aspect("perthis(actorCellCreation(..))") class ActorCellInvokeInstrumentation { + val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + val messagesPer + @volatile var shouldTrack = false + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} + + @Before("actorCellCreation(system, ref, props, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + // TODO: Find a better way to filter the thins we don't want to measure. + if(system.name != "kamon" && actorName.startsWith("/user")) { + Metrics.registry.register(histogramName + "/cell", latencyHistogram) + shouldTrack = true + } + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope) = { + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import pjp._ envelope match { - case Envelope(TraceableMessage(ctx, msg), sender) => { + case Envelope(TraceableEnvelope(ctx, msg, timeStamp), sender) => { + latencyHistogram.update(System.nanoTime() - timeStamp) + Kamon.set(ctx) val originalEnvelope = envelope.copy(message = msg) @@ -54,4 +83,4 @@ class ActorCellInvokeInstrumentation { case _ => proceed } } -}
\ No newline at end of file +} diff --git a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala b/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..a7f5cdc8 --- /dev/null +++ b/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,71 @@ +package akka.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield new MonitoredMessageQueue(delegate, own, sys) + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, owner: ActorRef, system: ActorSystem) extends MessageQueue { + val queueSizeHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + + val fullName = MetricDirectory.nameForMailbox(system.name, MetricDirectory.nameForActor(owner)) + Metrics.registry.register(fullName, queueSizeHistogram) + + def enqueue(receiver: ActorRef, handle: Envelope) = { + queueSizeHistogram.update(numberOfMessages) + delegate.enqueue(receiver, handle) + } + + def dequeue(): Envelope = { + queueSizeHistogram.update(numberOfMessages) + delegate.dequeue() + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = { + Metrics.deregister(fullName) + + delegate.cleanUp(owner, deadLetters) + } +} + + + + + + + + + |