diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-07 19:06:33 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-08-07 19:06:33 -0300 |
commit | cd1a9dd25fb550a515e7a7408b88233773268c38 (patch) | |
tree | 98c16e292c533cc9aa51bb0f073864b1f9e2b68a /kamon-core/src/main/scala/kamon/instrumentation | |
parent | 6566e1c41510e54dd987d3e34e40f1031169d592 (diff) | |
download | Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.tar.gz Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.tar.bz2 Kamon-cd1a9dd25fb550a515e7a7408b88233773268c38.zip |
upgrading to akka 2.2
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation')
6 files changed, 540 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..82915ce9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,89 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import kamon.{Kamon, TraceContext} +import akka.dispatch.{MessageDispatcher, Envelope} +import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} +import kamon.metric.{MetricDirectory, Metrics} +import com.codahale.metrics +import kamon.instrumentation.TraceableMessage +import scala.Some + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) + + +@Aspect +class ActorRefTellInstrumentation { + import ProceedingJoinPointPimp._ + + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && target(actor) && args(message, sender)") + def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} + + @Around("sendingMessageToActorRef(actor, message, sender)") + def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit = { + + val actorName = MetricDirectory.nameForActor(actor) + val t = Metrics.registry.timer(actorName + "LATENCY") + //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") + pjp.proceedWithTarget(actor, TraceableMessage(Kamon.context, message, t.time()), sender) + } +} + + +@Aspect("perthis(actorCellCreation(..))") +class ActorCellInvokeInstrumentation { + + var processingTimeTimer: Timer = _ + var shouldTrack = false + + // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { + val actorName = MetricDirectory.nameForActor(ref) + val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) + + /** TODO: Find a better way to filter the things we don't want to measure. */ + //if(system.name != "kamon" && actorName.startsWith("/user")) { + processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") + shouldTrack = true + //} + } + + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + import ProceedingJoinPointPimp._ + println("ENVELOPE --------------------->"+envelope) + envelope match { + case Envelope(TraceableMessage(ctx, msg, timer), sender) => { + timer.stop() + + val originalEnvelope = envelope.copy(message = msg) + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Kamon.set(c) + println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Kamon.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala new file mode 100644 index 00000000..84c20c52 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala @@ -0,0 +1,23 @@ +package kamon.instrumentation + +import org.aspectj.lang.ProceedingJoinPoint + +trait ProceedingJoinPointPimp { + import language.implicitConversions + + implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) +} + +object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + +case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { + def proceedWith(newUniqueArg: AnyRef) = { + val args = pjp.getArgs + args.update(0, newUniqueArg) + pjp.proceed(args) + } + + def proceedWithTarget(args: AnyRef*) = { + pjp.proceed(args.toArray) + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..b4f8a475 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -0,0 +1,245 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import java.util.concurrent._ +import org.aspectj.lang.ProceedingJoinPoint +import java.util +import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} +import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +import com.typesafe.config.Config +import kamon.Kamon +import scala.concurrent.forkjoin.ForkJoinPool +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool + + +@Aspect +class ActorSystemInstrumentation { + + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)") + def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader) = {} + + @After("actorSystemInstantiation(name, applicationConfig, classLoader)") + def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = { + + Kamon.Metric.registerActorSystem(name) + } +} + +@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") +class ForkJoinPoolInstrumentation { + var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _ + + @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") + def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} + + @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)") + def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = { + val (actorSystemName, dispatcherName) = threadFactory match { + case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames) + case _ => ("Unknown", "Unknown") + } + + val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) + for(m <- metrics) { + activeThreadsHistogram = m.activeThreadCount + poolSizeHistogram = m.poolSize + println(s"Registered $dispatcherName for actor system $actorSystemName") + } + } + + def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { + knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown")) + } + + + + + @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") + def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} + + @After("forkJoinScan(fjp)") + def updateMetrics(fjp: AkkaForkJoinPool): Unit = { + activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize) + } + + + +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/** + * ExecutorService monitoring base: + */ +trait ExecutorServiceCollector { + def updateActiveThreadCount(diff: Int): Unit + def updateTotalThreadCount(diff: Int): Unit + def updateQueueSize(diff: Int): Unit +} + +trait WatchedExecutorService { + def collector: ExecutorServiceCollector +} + + + + + + + + + + + + + + +trait ExecutorServiceMonitoring { + def dispatcherMetrics: DispatcherMetricCollector +} + +class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { + @volatile var dispatcherMetrics: DispatcherMetricCollector = _ +} + + + + + + + + + + + + + + + + +case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { + def createExecutorService: ExecutorService = delegate.createExecutorService +} + +@Aspect +class ExecutorServiceFactoryProviderInstrumentation { + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()") + def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = { + true + } + + @Around("factoryMethodCall(dispatcherName, threadFactory)") + def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast + + val actorSystemName = threadFactory match { + case m: MonitorableThreadFactory => m.name + case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + } + + new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) + } + +} + + +@Aspect +class NamedExecutorServiceFactoryDelegateInstrumentation { + + @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)") + def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {} + + @Around("factoryMethodCall(namedFactory)") + def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { + val delegate = pjp.proceed().asInstanceOf[ExecutorService] + val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + + ExecutorServiceMetricCollector.register(executorFullName, delegate) + + new NamedExecutorServiceDelegate(executorFullName, delegate) + } +} + +case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { + def shutdown() = { + ExecutorServiceMetricCollector.deregister(fullName) + delegate.shutdown() + } + def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() + def isShutdown: Boolean = delegate.isShutdown + def isTerminated: Boolean = delegate.isTerminated + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit) + def submit[T](task: Callable[T]): Future[T] = delegate.submit(task) + def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result) + def submit(task: Runnable): Future[_] = delegate.submit(task) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks) + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks) + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit) + def execute(command: Runnable) = delegate.execute(command) +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala new file mode 100644 index 00000000..c21502ac --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -0,0 +1,73 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import akka.actor.{ActorSystem, ActorRef} +import kamon.metric.{Metrics, MetricDirectory} +import org.aspectj.lang.ProceedingJoinPoint + + +/** + * For Mailboxes we would like to track the queue size and message latency. Currently the latency + * will be gathered from the ActorCellMetrics. + */ + + +@Aspect +class MessageQueueInstrumentation { + + @Pointcut("execution(* akka.dispatch.MailboxType+.create(..)) && args(owner, system)") + def messageQueueCreation(owner: Option[ActorRef], system: Option[ActorSystem]) = {} + + @Around("messageQueueCreation(owner, system)") + def wrapMessageQueue(pjp: ProceedingJoinPoint, owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { + val delegate = pjp.proceed.asInstanceOf[MessageQueue] + + // We are not interested in monitoring mailboxes if we don't know where they belong to. + val monitoredMailbox = for(own <- owner; sys <- system) yield { + val systemName = sys.name + val ownerName = MetricDirectory.nameForActor(own) + val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) + + val queueSizeHistogram = new Histogram(new ExponentiallyDecayingReservoir()) + Metrics.include(mailBoxName, queueSizeHistogram) + + new MonitoredMessageQueue(delegate, queueSizeHistogram) + } + + monitoredMailbox match { + case None => delegate + case Some(mmb) => mmb + } + } +} + + +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ + + def enqueue(receiver: ActorRef, handle: Envelope) = { + delegate.enqueue(receiver, handle) + queueSizeHistogram.update(numberOfMessages) + } + + def dequeue(): Envelope = { + val envelope = delegate.dequeue() + queueSizeHistogram.update(numberOfMessages) + + envelope + } + + def numberOfMessages: Int = delegate.numberOfMessages + def hasMessages: Boolean = delegate.hasMessages + def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) +} + + + + + + + + + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala new file mode 100644 index 00000000..e75a638f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -0,0 +1,61 @@ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import kamon.{Kamon, TraceContext} +import org.aspectj.lang.ProceedingJoinPoint +import scala.Some + +/** + * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. + */ +trait TraceContextAwareRunnable extends Runnable {} + + +@Aspect("perthis(instrumentedRunnableCreation())") +class RunnableInstrumentation { + + /** + * These are the Runnables that need to be instrumented and make the TraceContext available + * while their run method is executed. + */ + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def onCompleteCallbacksRunnable: TraceContextAwareRunnable = null + + + /** + * Pointcuts + */ + + @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..))") + def instrumentedRunnableCreation(): Unit = {} + + @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable.run())") + def runnableExecution() = {} + + + /** + * Aspect members + */ + + private val traceContext = Kamon.context + + + /** + * Advices + */ + import kamon.TraceContextSwap.withContext + + @Before("instrumentedRunnableCreation()") + def beforeCreation = { + //println((new Throwable).getStackTraceString) + } + + + @Around("runnableExecution()") + def around(pjp: ProceedingJoinPoint) = { + import pjp._ + + withContext(traceContext, proceed()) + } + +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala new file mode 100644 index 00000000..74261403 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala @@ -0,0 +1,49 @@ +package kamon.instrumentation + +import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} +import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} + +class ActorCage(val name: String, val size: Int) { + + def doIt: Unit = println("name") +} + +trait CageMonitoring { + def histogram: Histogram + def count(value: Int): Unit +} + +class CageMonitoringImp extends CageMonitoring{ + final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) + + def count(value: Int) = histogram.update(value) + +} + + +@Aspect +class InceptionAspect { + + @DeclareMixin("kamon.instrumentation.ActorCage") + def mixin: CageMonitoring = new CageMonitoringImp + + + @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") + def theActorCageDidIt(actorCage: CageMonitoring) = {} + + @After("theActorCageDidIt(actorCage)") + def afterDoingIt(actorCage: CageMonitoring) = { + actorCage.count(1) + actorCage.histogram.getSnapshot.dump(System.out) + } + + + +} + + +object Runner extends App { + val cage = new ActorCage("ivan", 10) + + cage.doIt +} |