aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
commit923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (patch)
treed555199f0c63b690ec51805b496ee2d54eb014da /kamon-core/src/main/scala/kamon/instrumentation
parent1e6665e30d96772eab92aca4d23e176adcd88dc5 (diff)
downloadKamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.gz
Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.bz2
Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.zip
upgrading to akka 2.2
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation')
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala89
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala23
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala245
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala73
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala61
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala49
6 files changed, 540 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..82915ce9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,89 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{Props, ActorSystem, ActorRef}
+import kamon.{Kamon, TraceContext}
+import akka.dispatch.{MessageDispatcher, Envelope}
+import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
+import kamon.metric.{MetricDirectory, Metrics}
+import com.codahale.metrics
+import kamon.instrumentation.TraceableMessage
+import scala.Some
+
+case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
+
+
+@Aspect
+class ActorRefTellInstrumentation {
+ import ProceedingJoinPointPimp._
+
+ @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)")
+ def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
+
+ @Around("sendingMessageToActorRef(actor, message, sender)")
+ def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
+
+ val actorName = MetricDirectory.nameForActor(actor)
+ val t = Metrics.registry.timer(actorName + "LATENCY")
+ //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
+ pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender)
+ }
+}
+
+
+@Aspect("perthis(actorCellCreation(..))")
+class ActorCellInvokeInstrumentation {
+
+ var processingTimeTimer: Timer = _
+ var shouldTrack = false
+
+ // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @After("actorCellCreation(system, ref, props, dispatcher, parent)")
+ def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
+ val actorName = MetricDirectory.nameForActor(ref)
+ val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
+
+ /** TODO: Find a better way to filter the things we don't want to measure. */
+ //if(system.name != "kamon" && actorName.startsWith("/user")) {
+ processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
+ shouldTrack = true
+ //}
+ }
+
+
+ @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ import ProceedingJoinPointPimp._
+ println("ENVELOPE --------------------->"+envelope)
+ envelope match {
+ case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
+ timer.stop()
+
+ val originalEnvelope = envelope.copy(message = msg)
+
+ //println("PROCESSING TIME TIMER: "+processingTimeTimer)
+ val pt = processingTimeTimer.time()
+ ctx match {
+ case Some(c) => {
+ Kamon.set(c)
+ println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
+ pjp.proceedWith(originalEnvelope)
+ Kamon.clear
+ }
+ case None => pjp.proceedWith(originalEnvelope)
+ }
+ pt.stop()
+ }
+ case _ => pjp.proceed
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
new file mode 100644
index 00000000..84c20c52
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala
@@ -0,0 +1,23 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+
+trait ProceedingJoinPointPimp {
+ import language.implicitConversions
+
+ implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
+}
+
+object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
+
+case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
+ def proceedWith(newUniqueArg: AnyRef) = {
+ val args = pjp.getArgs
+ args.update(0, newUniqueArg)
+ pjp.proceed(args)
+ }
+
+ def proceedWithTarget(args: AnyRef*) = {
+ pjp.proceed(args.toArray)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
new file mode 100644
index 00000000..b4f8a475
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
@@ -0,0 +1,245 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent._
+import org.aspectj.lang.ProceedingJoinPoint
+import java.util
+import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector}
+import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
+import com.typesafe.config.Config
+import kamon.Kamon
+import scala.concurrent.forkjoin.ForkJoinPool
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+
+
+@Aspect
+class ActorSystemInstrumentation {
+
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
+ def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
+
+ @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
+ def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
+
+ Kamon.Metric.registerActorSystem(name)
+ }
+}
+
+@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
+class ForkJoinPoolInstrumentation {
+ var activeThreadsHistogram: Histogram = _
+ var poolSizeHistogram: Histogram = _
+
+ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
+ def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
+
+ @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
+ def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
+ val (actorSystemName, dispatcherName) = threadFactory match {
+ case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
+ case _ => ("Unknown", "Unknown")
+ }
+
+ val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
+ for(m <- metrics) {
+ activeThreadsHistogram = m.activeThreadCount
+ poolSizeHistogram = m.poolSize
+ println(s"Registered $dispatcherName for actor system $actorSystemName")
+ }
+ }
+
+ def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
+ knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown"))
+ }
+
+
+
+
+ @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
+ def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
+
+ @After("forkJoinScan(fjp)")
+ def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
+ activeThreadsHistogram.update(fjp.getActiveThreadCount)
+ poolSizeHistogram.update(fjp.getPoolSize)
+ }
+
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+/**
+ * ExecutorService monitoring base:
+ */
+trait ExecutorServiceCollector {
+ def updateActiveThreadCount(diff: Int): Unit
+ def updateTotalThreadCount(diff: Int): Unit
+ def updateQueueSize(diff: Int): Unit
+}
+
+trait WatchedExecutorService {
+ def collector: ExecutorServiceCollector
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+trait ExecutorServiceMonitoring {
+ def dispatcherMetrics: DispatcherMetricCollector
+}
+
+class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
+ @volatile var dispatcherMetrics: DispatcherMetricCollector = _
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = delegate.createExecutorService
+}
+
+@Aspect
+class ExecutorServiceFactoryProviderInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
+ def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
+ true
+ }
+
+ @Around("factoryMethodCall(dispatcherName, threadFactory)")
+ def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
+
+ val actorSystemName = threadFactory match {
+ case m: MonitorableThreadFactory => m.name
+ case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
+ }
+
+ new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
+ }
+
+}
+
+
+@Aspect
+class NamedExecutorServiceFactoryDelegateInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
+ def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
+
+ @Around("factoryMethodCall(namedFactory)")
+ def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
+ val delegate = pjp.proceed().asInstanceOf[ExecutorService]
+ val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+
+ ExecutorServiceMetricCollector.register(executorFullName, delegate)
+
+ new NamedExecutorServiceDelegate(executorFullName, delegate)
+ }
+}
+
+case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
+ def shutdown() = {
+ ExecutorServiceMetricCollector.deregister(fullName)
+ delegate.shutdown()
+ }
+ def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+ def isShutdown: Boolean = delegate.isShutdown
+ def isTerminated: Boolean = delegate.isTerminated
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
+ def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
+ def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
+ def submit(task: Runnable): Future[_] = delegate.submit(task)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
+ def execute(command: Runnable) = delegate.execute(command)
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
new file mode 100644
index 00000000..c21502ac
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
@@ -0,0 +1,73 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
+import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
+import akka.actor.{ActorSystem, ActorRef}
+import kamon.metric.{Metrics, MetricDirectory}
+import org.aspectj.lang.ProceedingJoinPoint
+
+
+/**
+ * For Mailboxes we would like to track the queue size and message latency. Currently the latency
+ * will be gathered from the ActorCellMetrics.
+ */
+
+
+@Aspect
+class MessageQueueInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)")
+ def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {}
+
+ @Around("messageQueueCreation(owner, system)")
+ def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
+ val delegate = pjp.proceed.asInstanceOf[MessageQueue]
+
+ // We are not interested in monitoring mailboxes if we don't know where they belong to.
+ val monitoredMailbox = for(own <- owner; sys <- system) yield {
+ val systemName = sys.name
+ val ownerName = MetricDirectory.nameForActor(own)
+ val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName)
+
+ val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir())
+ Metrics.include(mailBoxName, queueSizeHistogram)
+
+ new MonitoredMessageQueue(delegate, queueSizeHistogram)
+ }
+
+ monitoredMailbox match {
+ case None => delegate
+ case Some(mmb) => mmb
+ }
+ }
+}
+
+
+class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
+
+ def enqueue(receiver: ActorRef, handle: Envelope) = {
+ delegate.enqueue(receiver, handle)
+ queueSizeHistogram.update(numberOfMessages)
+ }
+
+ def dequeue(): Envelope = {
+ val envelope = delegate.dequeue()
+ queueSizeHistogram.update(numberOfMessages)
+
+ envelope
+ }
+
+ def numberOfMessages: Int = delegate.numberOfMessages
+ def hasMessages: Boolean = delegate.hasMessages
+ def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
+}
+
+
+
+
+
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
new file mode 100644
index 00000000..e75a638f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -0,0 +1,61 @@
+package kamon.instrumentation
+
+import org.aspectj.lang.annotation._
+import kamon.{Kamon, TraceContext}
+import org.aspectj.lang.ProceedingJoinPoint
+import scala.Some
+
+/**
+ * Marker interface, just to make sure we don't instrument all the Runnables in the classpath.
+ */
+trait TraceContextAwareRunnable extends Runnable {}
+
+
+@Aspect("perthis(instrumentedRunnableCreation())")
+class RunnableInstrumentation {
+
+ /**
+ * These are the Runnables that need to be instrumented and make the TraceContext available
+ * while their run method is executed.
+ */
+ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
+ def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null
+
+
+ /**
+ * Pointcuts
+ */
+
+ @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))")
+ def instrumentedRunnableCreation(): Unit = {}
+
+ @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())")
+ def runnableExecution() = {}
+
+
+ /**
+ * Aspect members
+ */
+
+ private val traceContext = Kamon.context
+
+
+ /**
+ * Advices
+ */
+ import kamon.TraceContextSwap.withContext
+
+ @Before("instrumentedRunnableCreation()")
+ def beforeCreation = {
+ //println((new Throwable).getStackTraceString)
+ }
+
+
+ @Around("runnableExecution()")
+ def around(pjp: ProceedingJoinPoint) = {
+ import pjp._
+
+ withContext(traceContext, proceed())
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
new file mode 100644
index 00000000..74261403
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
@@ -0,0 +1,49 @@
+package kamon.instrumentation
+
+import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
+import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
+
+class ActorCage(val name: String, val size: Int) {
+
+ def doIt: Unit = println("name")
+}
+
+trait CageMonitoring {
+ def histogram: Histogram
+ def count(value: Int): Unit
+}
+
+class CageMonitoringImp extends CageMonitoring{
+ final val histogram = new Histogram(new ExponentiallyDecayingReservoir())
+
+ def count(value: Int) = histogram.update(value)
+
+}
+
+
+@Aspect
+class InceptionAspect {
+
+ @DeclareMixin("kamon.instrumentation.ActorCage")
+ def mixin: CageMonitoring = new CageMonitoringImp
+
+
+ @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)")
+ def theActorCageDidIt(actorCage: CageMonitoring) = {}
+
+ @After("theActorCageDidIt(actorCage)")
+ def afterDoingIt(actorCage: CageMonitoring) = {
+ actorCage.count(1)
+ actorCage.histogram.getSnapshot.dump(System.out)
+ }
+
+
+
+}
+
+
+object Runner extends App {
+ val cage = new ActorCage("ivan", 10)
+
+ cage.doIt
+}