aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
commit923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (patch)
treed555199f0c63b690ec51805b496ee2d54eb014da /src/main/scala/kamon/instrumentation
parent1e6665e30d96772eab92aca4d23e176adcd88dc5 (diff)
downloadKamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.gz
Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.bz2
Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.zip
upgrading to akka 2.2
Diffstat (limited to 'src/main/scala/kamon/instrumentation')
-rw-r--r--src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala89
-rw-r--r--src/main/scala/kamon/instrumentation/AspectJPimps.scala23
-rw-r--r--src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala245
-rw-r--r--src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala73
-rw-r--r--src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala61
-rw-r--r--src/main/scala/kamon/instrumentation/SampleInstrumentation.scala49
6 files changed, 0 insertions, 540 deletions
diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
deleted file mode 100644
index 7398a2bd..00000000
--- a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation._
-import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{Props, ActorSystem, ActorRef}
-import kamon.{Kamon, TraceContext}
-import akka.dispatch.{MessageDispatcher, Envelope}
-import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
-import kamon.metric.{MetricDirectory, Metrics}
-import com.codahale.metrics
-import kamon.instrumentation.TraceableMessage
-import scala.Some
-
-case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
-
-
-@Aspect
-class ActorRefTellInstrumentation {
- import ProceedingJoinPointPimp._
-
- @Pointcut("execution(* akka.actor.LocalActorRef+.$bang(..)) && target(actor) && args(message, sender)")
- def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}
-
- @Around("sendingMessageToActorRef(actor, message, sender)")
- def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = {
-
- val actorName = MetricDirectory.nameForActor(actor)
- val t = Metrics.registry.timer(actorName + "LATENCY")
- //println(s"About to proceed with: $actor $message $sender")
- pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender)
- }
-}
-
-
-@Aspect("perthis(actorCellCreation(..))")
-class ActorCellInvokeInstrumentation {
-
- var processingTimeTimer: Timer = _
- var shouldTrack = false
-
- // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.
-
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
- def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
-
- @After("actorCellCreation(system, ref, props, dispatcher, parent)")
- def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- val actorName = MetricDirectory.nameForActor(ref)
- val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)
-
- /** TODO: Find a better way to filter the things we don't want to measure. */
- //if(system.name != "kamon" && actorName.startsWith("/user")) {
- processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
- shouldTrack = true
- //}
- }
-
-
- @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
- def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
-
-
- @Around("invokingActorBehaviourAtActorCell(envelope)")
- def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
- import ProceedingJoinPointPimp._
- //println("ENVELOPE --------------------->"+envelope)
- envelope match {
- case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
- timer.stop()
-
- val originalEnvelope = envelope.copy(message = msg)
-
- //println("PROCESSING TIME TIMER: "+processingTimeTimer)
- val pt = processingTimeTimer.time()
- ctx match {
- case Some(c) => {
- Kamon.set(c)
- //println("ENVELOPE ORIGINAL:---------------->"+originalEnvelope)
- pjp.proceedWith(originalEnvelope)
- Kamon.clear
- }
- case None => pjp.proceedWith(originalEnvelope)
- }
- pt.stop()
- }
- case _ => pjp.proceed
- }
- }
-}
diff --git a/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/src/main/scala/kamon/instrumentation/AspectJPimps.scala
deleted file mode 100644
index 84c20c52..00000000
--- a/src/main/scala/kamon/instrumentation/AspectJPimps.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.ProceedingJoinPoint
-
-trait ProceedingJoinPointPimp {
- import language.implicitConversions
-
- implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp)
-}
-
-object ProceedingJoinPointPimp extends ProceedingJoinPointPimp
-
-case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) {
- def proceedWith(newUniqueArg: AnyRef) = {
- val args = pjp.getArgs
- args.update(0, newUniqueArg)
- pjp.proceed(args)
- }
-
- def proceedWithTarget(args: AnyRef*) = {
- pjp.proceed(args.toArray)
- }
-}
diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
deleted file mode 100644
index b4f8a475..00000000
--- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation._
-import java.util.concurrent._
-import org.aspectj.lang.ProceedingJoinPoint
-import java.util
-import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector}
-import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory}
-import com.typesafe.config.Config
-import kamon.Kamon
-import scala.concurrent.forkjoin.ForkJoinPool
-import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
-
-
-@Aspect
-class ActorSystemInstrumentation {
-
- @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
- def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {}
-
- @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
- def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
-
- Kamon.Metric.registerActorSystem(name)
- }
-}
-
-@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
-class ForkJoinPoolInstrumentation {
- var activeThreadsHistogram: Histogram = _
- var poolSizeHistogram: Histogram = _
-
- @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
- def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {}
-
- @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
- def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
- val (actorSystemName, dispatcherName) = threadFactory match {
- case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
- case _ => ("Unknown", "Unknown")
- }
-
- val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
- for(m <- metrics) {
- activeThreadsHistogram = m.activeThreadCount
- poolSizeHistogram = m.poolSize
- println(s"Registered $dispatcherName for actor system $actorSystemName")
- }
- }
-
- def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = {
- knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown"))
- }
-
-
-
-
- @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
- def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}
-
- @After("forkJoinScan(fjp)")
- def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
- activeThreadsHistogram.update(fjp.getActiveThreadCount)
- poolSizeHistogram.update(fjp.getPoolSize)
- }
-
-
-
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-/**
- * ExecutorService monitoring base:
- */
-trait ExecutorServiceCollector {
- def updateActiveThreadCount(diff: Int): Unit
- def updateTotalThreadCount(diff: Int): Unit
- def updateQueueSize(diff: Int): Unit
-}
-
-trait WatchedExecutorService {
- def collector: ExecutorServiceCollector
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-trait ExecutorServiceMonitoring {
- def dispatcherMetrics: DispatcherMetricCollector
-}
-
-class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
- @volatile var dispatcherMetrics: DispatcherMetricCollector = _
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
- def createExecutorService: ExecutorService = delegate.createExecutorService
-}
-
-@Aspect
-class ExecutorServiceFactoryProviderInstrumentation {
-
- @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
- def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
- true
- }
-
- @Around("factoryMethodCall(dispatcherName, threadFactory)")
- def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
- val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast
-
- val actorSystemName = threadFactory match {
- case m: MonitorableThreadFactory => m.name
- case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
- }
-
- new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
- }
-
-}
-
-
-@Aspect
-class NamedExecutorServiceFactoryDelegateInstrumentation {
-
- @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
- def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
-
- @Around("factoryMethodCall(namedFactory)")
- def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
- val delegate = pjp.proceed().asInstanceOf[ExecutorService]
- val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
-
- ExecutorServiceMetricCollector.register(executorFullName, delegate)
-
- new NamedExecutorServiceDelegate(executorFullName, delegate)
- }
-}
-
-case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
- def shutdown() = {
- ExecutorServiceMetricCollector.deregister(fullName)
- delegate.shutdown()
- }
- def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
- def isShutdown: Boolean = delegate.isShutdown
- def isTerminated: Boolean = delegate.isTerminated
- def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
- def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
- def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
- def submit(task: Runnable): Future[_] = delegate.submit(task)
- def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
- def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
- def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
- def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
- def execute(command: Runnable) = delegate.execute(command)
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
deleted file mode 100644
index c21502ac..00000000
--- a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-package kamon.instrumentation
-
-import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
-import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue}
-import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
-import akka.actor.{ActorSystem, ActorRef}
-import kamon.metric.{Metrics, MetricDirectory}
-import org.aspectj.lang.ProceedingJoinPoint
-
-
-/**
- * For Mailboxes we would like to track the queue size and message latency. Currently the latency
- * will be gathered from the ActorCellMetrics.
- */
-
-
-@Aspect
-class MessageQueueInstrumentation {
-
- @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)")
- def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {}
-
- @Around("messageQueueCreation(owner, system)")
- def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
- val delegate = pjp.proceed.asInstanceOf[MessageQueue]
-
- // We are not interested in monitoring mailboxes if we don't know where they belong to.
- val monitoredMailbox = for(own <- owner; sys <- system) yield {
- val systemName = sys.name
- val ownerName = MetricDirectory.nameForActor(own)
- val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName)
-
- val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir())
- Metrics.include(mailBoxName, queueSizeHistogram)
-
- new MonitoredMessageQueue(delegate, queueSizeHistogram)
- }
-
- monitoredMailbox match {
- case None => delegate
- case Some(mmb) => mmb
- }
- }
-}
-
-
-class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{
-
- def enqueue(receiver: ActorRef, handle: Envelope) = {
- delegate.enqueue(receiver, handle)
- queueSizeHistogram.update(numberOfMessages)
- }
-
- def dequeue(): Envelope = {
- val envelope = delegate.dequeue()
- queueSizeHistogram.update(numberOfMessages)
-
- envelope
- }
-
- def numberOfMessages: Int = delegate.numberOfMessages
- def hasMessages: Boolean = delegate.hasMessages
- def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters)
-}
-
-
-
-
-
-
-
-
-
diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
deleted file mode 100644
index e75a638f..00000000
--- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-package kamon.instrumentation
-
-import org.aspectj.lang.annotation._
-import kamon.{Kamon, TraceContext}
-import org.aspectj.lang.ProceedingJoinPoint
-import scala.Some
-
-/**
- * Marker interface, just to make sure we don't instrument all the Runnables in the classpath.
- */
-trait TraceContextAwareRunnable extends Runnable {}
-
-
-@Aspect("perthis(instrumentedRunnableCreation())")
-class RunnableInstrumentation {
-
- /**
- * These are the Runnables that need to be instrumented and make the TraceContext available
- * while their run method is executed.
- */
- @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
- def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null
-
-
- /**
- * Pointcuts
- */
-
- @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))")
- def instrumentedRunnableCreation(): Unit = {}
-
- @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())")
- def runnableExecution() = {}
-
-
- /**
- * Aspect members
- */
-
- private val traceContext = Kamon.context
-
-
- /**
- * Advices
- */
- import kamon.TraceContextSwap.withContext
-
- @Before("instrumentedRunnableCreation()")
- def beforeCreation = {
- //println((new Throwable).getStackTraceString)
- }
-
-
- @Around("runnableExecution()")
- def around(pjp: ProceedingJoinPoint) = {
- import pjp._
-
- withContext(traceContext, proceed())
- }
-
-}
diff --git a/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
deleted file mode 100644
index 74261403..00000000
--- a/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package kamon.instrumentation
-
-import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram}
-import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect}
-
-class ActorCage(val name: String, val size: Int) {
-
- def doIt: Unit = println("name")
-}
-
-trait CageMonitoring {
- def histogram: Histogram
- def count(value: Int): Unit
-}
-
-class CageMonitoringImp extends CageMonitoring{
- final val histogram = new Histogram(new ExponentiallyDecayingReservoir())
-
- def count(value: Int) = histogram.update(value)
-
-}
-
-
-@Aspect
-class InceptionAspect {
-
- @DeclareMixin("kamon.instrumentation.ActorCage")
- def mixin: CageMonitoring = new CageMonitoringImp
-
-
- @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)")
- def theActorCageDidIt(actorCage: CageMonitoring) = {}
-
- @After("theActorCageDidIt(actorCage)")
- def afterDoingIt(actorCage: CageMonitoring) = {
- actorCage.count(1)
- actorCage.histogram.getSnapshot.dump(System.out)
- }
-
-
-
-}
-
-
-object Runner extends App {
- val cage = new ActorCage("ivan", 10)
-
- cage.doIt
-}