diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-14 18:33:23 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-06-14 18:33:23 -0300 |
commit | 658bdd03a3b549cf7225197388e1e18b01723f1f (patch) | |
tree | c88228d013324519d29aad6edebaa1fd65145c28 /src/main/scala/kamon/instrumentation | |
parent | 80725fd14a728c6afcc9d8b3ac7c4bd10e8bd05e (diff) | |
download | Kamon-658bdd03a3b549cf7225197388e1e18b01723f1f.tar.gz Kamon-658bdd03a3b549cf7225197388e1e18b01723f1f.tar.bz2 Kamon-658bdd03a3b549cf7225197388e1e18b01723f1f.zip |
minor cleanup, still working in metrics
Diffstat (limited to 'src/main/scala/kamon/instrumentation')
-rw-r--r-- | src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala | 74 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/AspectJPimps.scala | 19 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala (renamed from src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala) | 9 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala | 73 |
4 files changed, 170 insertions, 5 deletions
diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..b345eaae --- /dev/null +++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,74 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.Envelope +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timeStamp: Long = System.nanoTime()) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") + def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(message, sender)") + def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = pjp.proceedWith(TraceableMessage(Kamon.context, message)) +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + var shouldTrack = false + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} + + @Before("actorCellCreation(system, ref, props, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + // TODO: Find a better way to filter the thins we don't want to measure. + if(system.name != "kamon" && actorName.startsWith("/user")) { + Metrics.registry.register(histogramName + "/cell", latencyHistogram) + shouldTrack = true + } + } + + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + + envelope match { + case Envelope(TraceableMessage(ctx, msg, timeStamp), sender) => { + latencyHistogram.update(System.nanoTime() - timeStamp) + + val originalEnvelope = envelope.copy(message = msg) + ctx match { + case Some(c) => { + Kamon.set(c) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + } + case _ => pjp.proceed + } + } +} diff --git a/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..0663e801 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,19 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } +} diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index 35e06b5d..f3ee4ee7 100644 --- a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -1,12 +1,11 @@ -package akka.dispatch +package kamon.instrumentation import org.aspectj.lang.annotation._ import java.util.concurrent._ -import scala.concurrent.forkjoin.ForkJoinPool import org.aspectj.lang.ProceedingJoinPoint import java.util -import akka.dispatch.NamedExecutorServiceFactoryDelegate import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { @@ -21,7 +20,7 @@ class ExecutorServiceFactoryProviderInstrumentation { @Around("factoryMethodCall(id, threadFactory)") def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast val actorSystemName = threadFactory match { case m: MonitorableThreadFactory => m.name @@ -42,7 +41,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { - val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService] + val delegate = pjp.proceed().asInstanceOf[ExecutorService] val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) ExecutorServiceMetricCollector.register(executorFullName, delegate) diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..75d6189c --- /dev/null +++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,73 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue { + + def enqueue(receiver: ActorRef, handle: Envelope) = { + delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) + } + + def dequeue(): Envelope = { + val envelope = delegate.dequeue() + queueSizeHistogram.update(numberOfMessages) + + envelope + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) +} + + + + + + + + + |