From 658bdd03a3b549cf7225197388e1e18b01723f1f Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Fri, 14 Jun 2013 18:33:23 -0300 Subject: minor cleanup, still working in metrics --- src/main/resources/META-INF/aop.xml | 22 ++---- src/main/scala/akka/ActorInstrumentation.scala | 46 ------------ src/main/scala/akka/ActorSystemAspect.scala | 18 ----- src/main/scala/akka/MailboxAspect.scala | 16 ---- src/main/scala/akka/MailboxMetrics.scala | 35 --------- src/main/scala/akka/PoolMetrics.scala | 29 -------- .../scala/akka/PoolMonitorInstrumentation.scala | 30 -------- src/main/scala/akka/Tracer.scala | 24 ------ .../ActorRefTellInstrumentation.scala | 86 ---------------------- .../akka/instrumentation/MessageQueueMetrics.scala | 71 ------------------ .../ActorRefTellInstrumentation.scala | 74 +++++++++++++++++++ .../scala/kamon/instrumentation/AspectJPimps.scala | 19 +++++ .../DispatcherInstrumentation.scala | 71 ------------------ .../instrumentation/ExecutorServiceMetrics.scala | 70 ++++++++++++++++++ .../instrumentation/MessageQueueMetrics.scala | 73 ++++++++++++++++++ src/main/scala/kamon/metric/Metrics.scala | 19 ++++- 16 files changed, 260 insertions(+), 443 deletions(-) delete mode 100644 src/main/scala/akka/ActorInstrumentation.scala delete mode 100644 src/main/scala/akka/ActorSystemAspect.scala delete mode 100644 src/main/scala/akka/MailboxAspect.scala delete mode 100644 src/main/scala/akka/MailboxMetrics.scala delete mode 100644 src/main/scala/akka/PoolMetrics.scala delete mode 100644 src/main/scala/akka/PoolMonitorInstrumentation.scala delete mode 100644 src/main/scala/akka/Tracer.scala delete mode 100644 src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala delete mode 100644 src/main/scala/akka/instrumentation/MessageQueueMetrics.scala create mode 100644 src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala create mode 100644 src/main/scala/kamon/instrumentation/AspectJPimps.scala delete mode 100644 src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala create mode 100644 src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala create mode 100644 src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala (limited to 'src/main') diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml index b4b3d879..8619b2c8 100644 --- a/src/main/resources/META-INF/aop.xml +++ b/src/main/resources/META-INF/aop.xml @@ -2,29 +2,19 @@ - + - - - - - + + - - - - + - - - - - + + diff --git a/src/main/scala/akka/ActorInstrumentation.scala b/src/main/scala/akka/ActorInstrumentation.scala deleted file mode 100644 index aa14f237..00000000 --- a/src/main/scala/akka/ActorInstrumentation.scala +++ /dev/null @@ -1,46 +0,0 @@ -package akka - -import actor.ActorCell -import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics.{ registry => meterRegistry } -import com.codahale.metrics.Meter -import kamon.metric.MetricsUtils._ - -@Aspect("perthis(actorCellCreation(*))") -class ActorInstrumentation { - - /** - * Aspect members - */ - - private val actorMeter:Meter = new Meter - - /** - * Pointcuts - */ - @Pointcut("execution(akka.actor.ActorCell+.new(..)) && this(actor)") - def actorCellCreation(actor:ActorCell):Unit = {} - - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - def actorReceive():Unit = {} - - /** - * Advices - */ - @After("actorCellCreation(actor)") - def afterCellCreation(actor:ActorCell):Unit ={ - val actorName:String = actor.self.path.toString - - meterRegistry.register(s"meter-for-${actorName}", actorMeter) - } - - @Around("actorReceive()") - def around(pjp: ProceedingJoinPoint) = { - import pjp._ - - markMeter(actorMeter) { - proceed - } - } - } \ No newline at end of file diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala deleted file mode 100644 index 9d1d515d..00000000 --- a/src/main/scala/akka/ActorSystemAspect.scala +++ /dev/null @@ -1,18 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import actor.ActorSystemImpl - -@Aspect -class ActorSystemAspect { - println("Created ActorSystemAspect") - - @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)") - protected def actorSystem():Unit = {} - - @After("actorSystem() && args(system)") - def collectActorSystem(system: ActorSystemImpl):Unit = { - Tracer.collectActorSystem(system) - Tracer.start() - } -} diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala deleted file mode 100644 index 5ca6d6ab..00000000 --- a/src/main/scala/akka/MailboxAspect.scala +++ /dev/null @@ -1,16 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ - -@Aspect("perthis(mailboxMonitor())") -class MailboxAspect { - println("Created MailboxAspect") - - @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)") - protected def mailboxMonitor():Unit = {} - - @After("mailboxMonitor() && this(mb)") - def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = { - Tracer.collectMailbox(mb) - } -} \ No newline at end of file diff --git a/src/main/scala/akka/MailboxMetrics.scala b/src/main/scala/akka/MailboxMetrics.scala deleted file mode 100644 index 6bf65cc7..00000000 --- a/src/main/scala/akka/MailboxMetrics.scala +++ /dev/null @@ -1,35 +0,0 @@ -package akka - -import akka.dispatch.Mailbox -import com.newrelic.api.agent.NewRelic - -case class MailboxMetrics(mailboxes:Map[String,Mailbox]) - - -object MailboxMetrics { - def apply(mailboxes: List[Mailbox]) = { - new MailboxMetrics(mailboxes.take(mailboxes.length - 1).map{m => (m.actor.self.path.toString -> m)}.toMap) //TODO:research why collect an ActorSystemImpl - } - - def toMap(mb: Mailbox):Map[String,Int] = Map[String,Int]( - "NumberOfMessages" -> mb.numberOfMessages, - "MailboxDispatcherThroughput" -> mb.dispatcher.throughput, - "SuspendCount" -> mb.suspendCount - ) -} - -class MailboxSenderMetrics(mailboxes:List[Mailbox]) extends Runnable { - def run() { - val mbm = MailboxMetrics(mailboxes) - mbm.mailboxes.map { case(actorName,mb) => { - println(s"Sending metrics to Newrelic MailBoxMonitor for Actor -> ${actorName}") - - MailboxMetrics.toMap(mb).map {case(property, value) => - NewRelic.recordMetric(s"${actorName}:Mailbox:${property}", value) - } - } - } - } -} - - diff --git a/src/main/scala/akka/PoolMetrics.scala b/src/main/scala/akka/PoolMetrics.scala deleted file mode 100644 index 422e34fd..00000000 --- a/src/main/scala/akka/PoolMetrics.scala +++ /dev/null @@ -1,29 +0,0 @@ -package akka - -import scala.concurrent.forkjoin.ForkJoinPool -import com.newrelic.api.agent.NewRelic - -case class PoolMetrics(poolName:String, data:Map[String,Int]) - -object PoolMetrics { - def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool)) - - def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( - "ActiveThreadCount" -> pool.getActiveThreadCount, - "Parallelism" -> pool.getParallelism, - "PoolSize" -> pool.getPoolSize, - "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, - "StealCount" -> pool.getStealCount.toInt, - "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, - "RunningThreadCount" -> pool.getRunningThreadCount - ) -} - -class PoolMetricsSender(forkJoinPool:ForkJoinPool) extends Runnable { - def run() { - val pool = PoolMetrics(forkJoinPool) - println(s"Sending Metrics to NewRelic -> ${pool}") - pool.data.map{case(k,v) => NewRelic.recordMetric(s"${pool.poolName}:${k}",v)} - } -} - diff --git a/src/main/scala/akka/PoolMonitorInstrumentation.scala b/src/main/scala/akka/PoolMonitorInstrumentation.scala deleted file mode 100644 index e78e0d7e..00000000 --- a/src/main/scala/akka/PoolMonitorInstrumentation.scala +++ /dev/null @@ -1,30 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import akka.dispatch.MonitorableThreadFactory -import kamon.metric.Metrics -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.Gauge - -@Aspect("perthis(poolMonitor(scala.concurrent.forkjoin.ForkJoinPool))") -class PoolMonitorAspect { - println("Created PoolMonitorAspect") - - - @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)") - protected def poolMonitor(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {} - - @After("poolMonitor(pool)") - def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = { - pool.getFactory match { - case m: MonitorableThreadFactory => registerForMonitoring(pool, m.name) - } - } - - def registerForMonitoring(fjp: ForkJoinPool, name: String) { - Metrics.registry.register(s"/metrics/actorsystem/{actorsystem-name}/dispatcher/$name", - new Gauge[Long] { - def getValue: Long = fjp.getPoolSize - }) - } -} diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala deleted file mode 100644 index 3b301247..00000000 --- a/src/main/scala/akka/Tracer.scala +++ /dev/null @@ -1,24 +0,0 @@ -package akka - -import actor.ActorSystemImpl -import scala.concurrent.forkjoin.ForkJoinPool -import scala.concurrent.duration._ -import akka.dispatch.Mailbox -import scala._ - -object Tracer { - protected[this] var mailboxes:List[Mailbox] = Nil - protected[this] var tracerActorSystem: ActorSystemImpl = _ - protected[this] var forkJoinPool:ForkJoinPool = _ - - def collectPool(pool: ForkJoinPool) = forkJoinPool = pool - def collectActorSystem(actorSystem: ActorSystemImpl) = tracerActorSystem = actorSystem - def collectMailbox(mb: akka.dispatch.Mailbox) = mailboxes ::= mb - - def start():Unit ={ - implicit val dispatcher = tracerActorSystem.dispatcher - - tracerActorSystem.scheduler.schedule(6 seconds, 5 second, new MailboxSenderMetrics(mailboxes)) - tracerActorSystem.scheduler.schedule(7 seconds, 5 second, new PoolMetricsSender(forkJoinPool)) - } -} \ No newline at end of file diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala deleted file mode 100644 index 218c09cc..00000000 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ /dev/null @@ -1,86 +0,0 @@ -package akka.instrumentation - -import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} -import kamon.{Kamon, TraceContext} -import akka.dispatch.Envelope -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import kamon.metric.{MetricDirectory, Metrics} - -case class TraceableEnvelope(traceContext: TraceContext, message: Any, timeStamp: Long = System.nanoTime()) - - -@Aspect -class ActorRefTellInstrumentation { - println("Created ActorAspect") - - @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") - def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} - - @Around("sendingMessageToActorRef(message, sender)") - def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import pjp._ - - Kamon.context() match { - case Some(ctx) => { - val traceableMessage = TraceableEnvelope(ctx, message) - - // update the args with the new message - val args = getArgs - args.update(0, traceableMessage) - proceed(args) - } - case None => proceed - } - } -} - - -@Aspect("perthis(actorCellCreation(..))") -class ActorCellInvokeInstrumentation { - - val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) - val messagesPer - @volatile var shouldTrack = false - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") - def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} - - @Before("actorCellCreation(system, ref, props, parent)") - def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { - val actorName = MetricDirectory.nameForActor(ref) - val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - - // TODO: Find a better way to filter the thins we don't want to measure. - if(system.name != "kamon" && actorName.startsWith("/user")) { - Metrics.registry.register(histogramName + "/cell", latencyHistogram) - shouldTrack = true - } - } - - - - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} - - - @Around("invokingActorBehaviourAtActorCell(envelope)") - def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import pjp._ - - envelope match { - case Envelope(TraceableEnvelope(ctx, msg, timeStamp), sender) => { - latencyHistogram.update(System.nanoTime() - timeStamp) - - Kamon.set(ctx) - - val originalEnvelope = envelope.copy(message = msg) - proceed(getArgs.updated(0, originalEnvelope)) - - Kamon.clear - } - case _ => proceed - } - } -} diff --git a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala b/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala deleted file mode 100644 index a7f5cdc8..00000000 --- a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala +++ /dev/null @@ -1,71 +0,0 @@ -package akka.instrumentation - -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import akka.dispatch.{Envelope, MessageQueue} -import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import akka.actor.{ActorSystem, ActorRef} -import kamon.metric.{Metrics, MetricDirectory} -import org.aspectj.lang.ProceedingJoinPoint - - -/** - * For Mailboxes we would like to track the queue size and message latency. Currently the latency - * will be gathered from the ActorCellMetrics. - */ - - -@Aspect -class MessageQueueInstrumentation { - - @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") - def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} - - @Around("messageQueueCreation(owner, system)") - def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { - val delegate = pjp.proceed.asInstanceOf[MessageQueue] - - // We are not interested in monitoring mailboxes if we don't know where they belong to. - val monitoredMailbox = for(own <- owner; sys <- system) yield new MonitoredMessageQueue(delegate, own, sys) - - monitoredMailbox match { - case None => delegate - case Some(mmb) => mmb - } - - } -} - - -class MonitoredMessageQueue(val delegate: MessageQueue, owner: ActorRef, system: ActorSystem) extends MessageQueue { - val queueSizeHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) - - val fullName = MetricDirectory.nameForMailbox(system.name, MetricDirectory.nameForActor(owner)) - Metrics.registry.register(fullName, queueSizeHistogram) - - def enqueue(receiver: ActorRef, handle: Envelope) = { - queueSizeHistogram.update(numberOfMessages) - delegate.enqueue(receiver, handle) - } - - def dequeue(): Envelope = { - queueSizeHistogram.update(numberOfMessages) - delegate.dequeue() - } - - def numberOfMessages: Int = delegate.numberOfMessages - def hasMessages: Boolean = delegate.hasMessages - def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = { - Metrics.deregister(fullName) - - delegate.cleanUp(owner, deadLetters) - } -} - - - - - - - - - diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..b345eaae --- /dev/null +++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,74 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.Envelope +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timeStamp: Long = System.nanoTime()) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") + def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(message, sender)") + def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = pjp.proceedWith(TraceableMessage(Kamon.context, message)) +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) + var shouldTrack = false + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} + + @Before("actorCellCreation(system, ref, props, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + // TODO: Find a better way to filter the thins we don't want to measure. + if(system.name != "kamon" && actorName.startsWith("/user")) { + Metrics.registry.register(histogramName + "/cell", latencyHistogram) + shouldTrack = true + } + } + + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + + envelope match { + case Envelope(TraceableMessage(ctx, msg, timeStamp), sender) => { + latencyHistogram.update(System.nanoTime() - timeStamp) + + val originalEnvelope = envelope.copy(message = msg) + ctx match { + case Some(c) => { + Kamon.set(c) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + } + case _ => pjp.proceed + } + } +} diff --git a/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..0663e801 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,19 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } +} diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala deleted file mode 100644 index 35e06b5d..00000000 --- a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala +++ /dev/null @@ -1,71 +0,0 @@ -package akka.dispatch - -import org.aspectj.lang.annotation._ -import java.util.concurrent._ -import scala.concurrent.forkjoin.ForkJoinPool -import org.aspectj.lang.ProceedingJoinPoint -import java.util -import akka.dispatch.NamedExecutorServiceFactoryDelegate -import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} - - -case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { - def createExecutorService: ExecutorService = delegate.createExecutorService -} - -@Aspect -class ExecutorServiceFactoryProviderInstrumentation { - - @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)") - def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {} - - @Around("factoryMethodCall(id, threadFactory)") - def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast - - val actorSystemName = threadFactory match { - case m: MonitorableThreadFactory => m.name - case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. - } - - new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate) - } - -} - - -@Aspect -class NamedExecutorServiceFactoryDelegateInstrumentation { - - @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") - def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} - - @Around("factoryMethodCall(namedFactory)") - def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { - val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService] - val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - - ExecutorServiceMetricCollector.register(executorFullName, delegate) - - new NamedExecutorServiceDelegate(executorFullName, delegate) - } -} - -case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { - def shutdown() = { - ExecutorServiceMetricCollector.deregister(fullName) - delegate.shutdown() - } - def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() - def isShutdown: Boolean = delegate.isShutdown - def isTerminated: Boolean = delegate.isTerminated - def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) - def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) - def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) - def submit(task: Runnable): Future[_] = delegate.submit(task) - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) - def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) - def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) - def execute(command: Runnable) = delegate.execute(command) -} \ No newline at end of file diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..f3ee4ee7 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -0,0 +1,70 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)") + def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {} + + @Around("factoryMethodCall(id, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed().asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +} \ No newline at end of file diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..75d6189c --- /dev/null +++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,73 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue { + + def enqueue(receiver: ActorRef, handle: Envelope) = { + delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) + } + + def dequeue(): Envelope = { + val envelope = delegate.dequeue() + queueSizeHistogram.update(numberOfMessages) + + envelope + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) +} + + + + + + + + + diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index ebf4fd2b..30b8bda9 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -4,7 +4,14 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics._ import akka.actor.ActorRef -object Metrics { +trait MetricDepot { + def include(name: String, metric: Metric): Unit + def exclude(name: String): Unit +} + + + +object Metrics extends MetricDepot { val registry: MetricRegistry = new MetricRegistry val consoleReporter = ConsoleReporter.forRegistry(registry) @@ -14,6 +21,16 @@ object Metrics { consoleReporter.build().start(60, TimeUnit.SECONDS) + def include(name: String, metric: Metric) = registry.register(name, metric) + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + def deregister(fullName: String) = { registry.removeMatching(new MetricFilter { def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) -- cgit v1.2.3