diff options
Diffstat (limited to 'src/main/scala')
-rw-r--r-- | src/main/scala/akka/ActorInstrumentation.scala | 46 | ||||
-rw-r--r-- | src/main/scala/akka/ActorSystemAspect.scala | 18 | ||||
-rw-r--r-- | src/main/scala/akka/MailboxAspect.scala | 16 | ||||
-rw-r--r-- | src/main/scala/akka/MailboxMetrics.scala | 35 | ||||
-rw-r--r-- | src/main/scala/akka/PoolMetrics.scala | 29 | ||||
-rw-r--r-- | src/main/scala/akka/PoolMonitorInstrumentation.scala | 30 | ||||
-rw-r--r-- | src/main/scala/akka/Tracer.scala | 24 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala (renamed from src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala) | 44 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/AspectJPimps.scala | 19 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala (renamed from src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala) | 9 | ||||
-rw-r--r-- | src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala (renamed from src/main/scala/akka/instrumentation/MessageQueueMetrics.scala) | 32 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 19 |
12 files changed, 74 insertions, 247 deletions
diff --git a/src/main/scala/akka/ActorInstrumentation.scala b/src/main/scala/akka/ActorInstrumentation.scala deleted file mode 100644 index aa14f237..00000000 --- a/src/main/scala/akka/ActorInstrumentation.scala +++ /dev/null @@ -1,46 +0,0 @@ -package akka - -import actor.ActorCell -import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics.{ registry => meterRegistry } -import com.codahale.metrics.Meter -import kamon.metric.MetricsUtils._ - -@Aspect("perthis(actorCellCreation(*))") -class ActorInstrumentation { - - /** - * Aspect members - */ - - private val actorMeter:Meter = new Meter - - /** - * Pointcuts - */ - @Pointcut("execution(akka.actor.ActorCell+.new(..)) && this(actor)") - def actorCellCreation(actor:ActorCell):Unit = {} - - @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") - def actorReceive():Unit = {} - - /** - * Advices - */ - @After("actorCellCreation(actor)") - def afterCellCreation(actor:ActorCell):Unit ={ - val actorName:String = actor.self.path.toString - - meterRegistry.register(s"meter-for-${actorName}", actorMeter) - } - - @Around("actorReceive()") - def around(pjp: ProceedingJoinPoint) = { - import pjp._ - - markMeter(actorMeter) { - proceed - } - } - }
\ No newline at end of file diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala deleted file mode 100644 index 9d1d515d..00000000 --- a/src/main/scala/akka/ActorSystemAspect.scala +++ /dev/null @@ -1,18 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import actor.ActorSystemImpl - -@Aspect -class ActorSystemAspect { - println("Created ActorSystemAspect") - - @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)") - protected def actorSystem():Unit = {} - - @After("actorSystem() && args(system)") - def collectActorSystem(system: ActorSystemImpl):Unit = { - Tracer.collectActorSystem(system) - Tracer.start() - } -} diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala deleted file mode 100644 index 5ca6d6ab..00000000 --- a/src/main/scala/akka/MailboxAspect.scala +++ /dev/null @@ -1,16 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ - -@Aspect("perthis(mailboxMonitor())") -class MailboxAspect { - println("Created MailboxAspect") - - @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)") - protected def mailboxMonitor():Unit = {} - - @After("mailboxMonitor() && this(mb)") - def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = { - Tracer.collectMailbox(mb) - } -}
\ No newline at end of file diff --git a/src/main/scala/akka/MailboxMetrics.scala b/src/main/scala/akka/MailboxMetrics.scala deleted file mode 100644 index 6bf65cc7..00000000 --- a/src/main/scala/akka/MailboxMetrics.scala +++ /dev/null @@ -1,35 +0,0 @@ -package akka - -import akka.dispatch.Mailbox -import com.newrelic.api.agent.NewRelic - -case class MailboxMetrics(mailboxes:Map[String,Mailbox]) - - -object MailboxMetrics { - def apply(mailboxes: List[Mailbox]) = { - new MailboxMetrics(mailboxes.take(mailboxes.length - 1).map{m => (m.actor.self.path.toString -> m)}.toMap) //TODO:research why collect an ActorSystemImpl - } - - def toMap(mb: Mailbox):Map[String,Int] = Map[String,Int]( - "NumberOfMessages" -> mb.numberOfMessages, - "MailboxDispatcherThroughput" -> mb.dispatcher.throughput, - "SuspendCount" -> mb.suspendCount - ) -} - -class MailboxSenderMetrics(mailboxes:List[Mailbox]) extends Runnable { - def run() { - val mbm = MailboxMetrics(mailboxes) - mbm.mailboxes.map { case(actorName,mb) => { - println(s"Sending metrics to Newrelic MailBoxMonitor for Actor -> ${actorName}") - - MailboxMetrics.toMap(mb).map {case(property, value) => - NewRelic.recordMetric(s"${actorName}:Mailbox:${property}", value) - } - } - } - } -} - - diff --git a/src/main/scala/akka/PoolMetrics.scala b/src/main/scala/akka/PoolMetrics.scala deleted file mode 100644 index 422e34fd..00000000 --- a/src/main/scala/akka/PoolMetrics.scala +++ /dev/null @@ -1,29 +0,0 @@ -package akka - -import scala.concurrent.forkjoin.ForkJoinPool -import com.newrelic.api.agent.NewRelic - -case class PoolMetrics(poolName:String, data:Map[String,Int]) - -object PoolMetrics { - def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool)) - - def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( - "ActiveThreadCount" -> pool.getActiveThreadCount, - "Parallelism" -> pool.getParallelism, - "PoolSize" -> pool.getPoolSize, - "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, - "StealCount" -> pool.getStealCount.toInt, - "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, - "RunningThreadCount" -> pool.getRunningThreadCount - ) -} - -class PoolMetricsSender(forkJoinPool:ForkJoinPool) extends Runnable { - def run() { - val pool = PoolMetrics(forkJoinPool) - println(s"Sending Metrics to NewRelic -> ${pool}") - pool.data.map{case(k,v) => NewRelic.recordMetric(s"${pool.poolName}:${k}",v)} - } -} - diff --git a/src/main/scala/akka/PoolMonitorInstrumentation.scala b/src/main/scala/akka/PoolMonitorInstrumentation.scala deleted file mode 100644 index e78e0d7e..00000000 --- a/src/main/scala/akka/PoolMonitorInstrumentation.scala +++ /dev/null @@ -1,30 +0,0 @@ -package akka - -import org.aspectj.lang.annotation._ -import akka.dispatch.MonitorableThreadFactory -import kamon.metric.Metrics -import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.Gauge - -@Aspect("perthis(poolMonitor(scala.concurrent.forkjoin.ForkJoinPool))") -class PoolMonitorAspect { - println("Created PoolMonitorAspect") - - - @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)") - protected def poolMonitor(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {} - - @After("poolMonitor(pool)") - def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = { - pool.getFactory match { - case m: MonitorableThreadFactory => registerForMonitoring(pool, m.name) - } - } - - def registerForMonitoring(fjp: ForkJoinPool, name: String) { - Metrics.registry.register(s"/metrics/actorsystem/{actorsystem-name}/dispatcher/$name", - new Gauge[Long] { - def getValue: Long = fjp.getPoolSize - }) - } -} diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala deleted file mode 100644 index 3b301247..00000000 --- a/src/main/scala/akka/Tracer.scala +++ /dev/null @@ -1,24 +0,0 @@ -package akka - -import actor.ActorSystemImpl -import scala.concurrent.forkjoin.ForkJoinPool -import scala.concurrent.duration._ -import akka.dispatch.Mailbox -import scala._ - -object Tracer { - protected[this] var mailboxes:List[Mailbox] = Nil - protected[this] var tracerActorSystem: ActorSystemImpl = _ - protected[this] var forkJoinPool:ForkJoinPool = _ - - def collectPool(pool: ForkJoinPool) = forkJoinPool = pool - def collectActorSystem(actorSystem: ActorSystemImpl) = tracerActorSystem = actorSystem - def collectMailbox(mb: akka.dispatch.Mailbox) = mailboxes ::= mb - - def start():Unit ={ - implicit val dispatcher = tracerActorSystem.dispatcher - - tracerActorSystem.scheduler.schedule(6 seconds, 5 second, new MailboxSenderMetrics(mailboxes)) - tracerActorSystem.scheduler.schedule(7 seconds, 5 second, new PoolMetricsSender(forkJoinPool)) - } -}
\ No newline at end of file diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 218c09cc..b345eaae 100644 --- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -1,4 +1,4 @@ -package akka.instrumentation +package kamon.instrumentation import org.aspectj.lang.annotation.{Before, Around, Pointcut, Aspect} import org.aspectj.lang.ProceedingJoinPoint @@ -8,32 +8,18 @@ import akka.dispatch.Envelope import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} import kamon.metric.{MetricDirectory, Metrics} -case class TraceableEnvelope(traceContext: TraceContext, message: Any, timeStamp: Long = System.nanoTime()) +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timeStamp: Long = System.nanoTime()) @Aspect class ActorRefTellInstrumentation { - println("Created ActorAspect") + import ProceedingJoinPointPimp._ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.pattern.PromiseActorRef) && args(message, sender)") def sendingMessageToActorRef(message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(message, sender)") - def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { - import pjp._ - - Kamon.context() match { - case Some(ctx) => { - val traceableMessage = TraceableEnvelope(ctx, message) - - // update the args with the new message - val args = getArgs - args.update(0, traceableMessage) - proceed(args) - } - case None => proceed - } - } + def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = pjp.proceedWith(TraceableMessage(Kamon.context, message)) } @@ -41,8 +27,7 @@ class ActorRefTellInstrumentation { class ActorCellInvokeInstrumentation { val latencyHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) - val messagesPer - @volatile var shouldTrack = false + var shouldTrack = false @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} @@ -67,20 +52,23 @@ class ActorCellInvokeInstrumentation { @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { - import pjp._ + import ProceedingJoinPointPimp._ envelope match { - case Envelope(TraceableEnvelope(ctx, msg, timeStamp), sender) => { + case Envelope(TraceableMessage(ctx, msg, timeStamp), sender) => { latencyHistogram.update(System.nanoTime() - timeStamp) - Kamon.set(ctx) - val originalEnvelope = envelope.copy(message = msg) - proceed(getArgs.updated(0, originalEnvelope)) - - Kamon.clear + ctx match { + case Some(c) => { + Kamon.set(c) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } } - case _ => proceed + case _ => pjp.proceed } } } diff --git a/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..0663e801 --- /dev/null +++ b/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,19 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } +} diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index 35e06b5d..f3ee4ee7 100644 --- a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -1,12 +1,11 @@ -package akka.dispatch +package kamon.instrumentation import org.aspectj.lang.annotation._ import java.util.concurrent._ -import scala.concurrent.forkjoin.ForkJoinPool import org.aspectj.lang.ProceedingJoinPoint import java.util -import akka.dispatch.NamedExecutorServiceFactoryDelegate import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { @@ -21,7 +20,7 @@ class ExecutorServiceFactoryProviderInstrumentation { @Around("factoryMethodCall(id, threadFactory)") def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { - val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast val actorSystemName = threadFactory match { case m: MonitorableThreadFactory => m.name @@ -42,7 +41,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { - val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService] + val delegate = pjp.proceed().asInstanceOf[ExecutorService] val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) ExecutorServiceMetricCollector.register(executorFullName, delegate) diff --git a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index a7f5cdc8..75d6189c 100644 --- a/src/main/scala/akka/instrumentation/MessageQueueMetrics.scala +++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -1,4 +1,4 @@ -package akka.instrumentation +package kamon.instrumentation import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} import akka.dispatch.{Envelope, MessageQueue} @@ -25,40 +25,42 @@ class MessageQueueInstrumentation { val delegate = pjp.proceed.asInstanceOf[MessageQueue] // We are not interested in monitoring mailboxes if we don't know where they belong to. - val monitoredMailbox = for(own <- owner; sys <- system) yield new MonitoredMessageQueue(delegate, own, sys) + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } monitoredMailbox match { case None => delegate case Some(mmb) => mmb } - } } -class MonitoredMessageQueue(val delegate: MessageQueue, owner: ActorRef, system: ActorSystem) extends MessageQueue { - val queueSizeHistogram: Histogram = new Histogram(new ExponentiallyDecayingReservoir) - - val fullName = MetricDirectory.nameForMailbox(system.name, MetricDirectory.nameForActor(owner)) - Metrics.registry.register(fullName, queueSizeHistogram) +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue { def enqueue(receiver: ActorRef, handle: Envelope) = { - queueSizeHistogram.update(numberOfMessages) delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) } def dequeue(): Envelope = { + val envelope = delegate.dequeue() queueSizeHistogram.update(numberOfMessages) - delegate.dequeue() + + envelope } def numberOfMessages: Int = delegate.numberOfMessages def hasMessages: Boolean = delegate.hasMessages - def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = { - Metrics.deregister(fullName) - - delegate.cleanUp(owner, deadLetters) - } + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index ebf4fd2b..30b8bda9 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -4,7 +4,14 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics._ import akka.actor.ActorRef -object Metrics { +trait MetricDepot { + def include(name: String, metric: Metric): Unit + def exclude(name: String): Unit +} + + + +object Metrics extends MetricDepot { val registry: MetricRegistry = new MetricRegistry val consoleReporter = ConsoleReporter.forRegistry(registry) @@ -14,6 +21,16 @@ object Metrics { consoleReporter.build().start(60, TimeUnit.SECONDS) + def include(name: String, metric: Metric) = registry.register(name, metric) + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + def deregister(fullName: String) = { registry.removeMatching(new MetricFilter { def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) |