diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation')
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala (renamed from kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala) | 27 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala | 18 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala | 47 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala | 4 |
4 files changed, 71 insertions, 25 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala index 252e5220..297017cf 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/AspectJPimps.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala @@ -15,24 +15,23 @@ * ========================================================== */ package kamon.instrumentation +import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{ ContextAware, Trace } -trait ProceedingJoinPointPimp { - import language.implicitConversions +@Aspect +class ActorLoggingTracing { - implicit def pimpProceedingJointPoint(pjp: ProceedingJoinPoint) = RichProceedingJointPoint(pjp) -} - -object ProceedingJoinPointPimp extends ProceedingJoinPointPimp + @DeclareMixin("akka.event.Logging.LogEvent+") + def mixin: ContextAware = ContextAware.default -case class RichProceedingJointPoint(pjp: ProceedingJoinPoint) { - def proceedWith(newUniqueArg: AnyRef) = { - val args = pjp.getArgs - args.update(0, newUniqueArg) - pjp.proceed(args) - } + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {} - def proceedWithTarget(args: AnyRef*) = { - pjp.proceed(args.toArray) + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = { + Trace.withContext(logEvent.traceContext) { + pjp.proceed() + } } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index a3da76f7..90d2b270 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -19,10 +19,8 @@ import org.aspectj.lang.annotation._ import java.util.concurrent._ import org.aspectj.lang.ProceedingJoinPoint import java.util -import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector } import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory } import com.typesafe.config.Config -import kamon.Kamon import scala.concurrent.forkjoin.ForkJoinPool import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool @@ -41,8 +39,8 @@ class ActorSystemInstrumentation { @Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") class ForkJoinPoolInstrumentation { - var activeThreadsHistogram: Histogram = _ - var poolSizeHistogram: Histogram = _ + /* var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _*/ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} @@ -71,8 +69,8 @@ class ForkJoinPoolInstrumentation { @After("forkJoinScan(fjp)") def updateMetrics(fjp: AkkaForkJoinPool): Unit = { - activeThreadsHistogram.update(fjp.getActiveThreadCount) - poolSizeHistogram.update(fjp.getPoolSize) + /*activeThreadsHistogram.update(fjp.getActiveThreadCount) + poolSizeHistogram.update(fjp.getPoolSize)*/ } } @@ -90,6 +88,7 @@ trait WatchedExecutorService { def collector: ExecutorServiceCollector } +/* trait ExecutorServiceMonitoring { def dispatcherMetrics: DispatcherMetricCollector } @@ -97,6 +96,7 @@ trait ExecutorServiceMonitoring { class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { @volatile var dispatcherMetrics: DispatcherMetricCollector = _ } +*/ case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = delegate.createExecutorService @@ -133,9 +133,9 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { @Around("factoryMethodCall(namedFactory)") def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = { val delegate = pjp.proceed().asInstanceOf[ExecutorService] - val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) + val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName) - ExecutorServiceMetricCollector.register(executorFullName, delegate) + //ExecutorServiceMetricCollector.register(executorFullName, delegate) new NamedExecutorServiceDelegate(executorFullName, delegate) } @@ -143,7 +143,7 @@ class NamedExecutorServiceFactoryDelegateInstrumentation { case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService { def shutdown() = { - ExecutorServiceMetricCollector.deregister(fullName) + //ExecutorServiceMetricCollector.deregister(fullName) delegate.shutdown() } def shutdownNow(): util.List[Runnable] = delegate.shutdownNow() diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala new file mode 100644 index 00000000..5600d582 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala @@ -0,0 +1,47 @@ +/* =================================================== + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{ ContextAware, TraceContext, Trace } + +@Aspect +class FutureTracing { + + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def mixin: ContextAware = ContextAware.default + + @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") + def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {} + + @After("futureRelatedRunnableCreation(runnable)") + def afterCreation(runnable: ContextAware): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") + def futureRelatedRunnableExecution(runnable: ContextAware) = {} + + @Around("futureRelatedRunnableExecution(runnable)") + def aroundExecution(pjp: ProceedingJoinPoint, runnable: ContextAware): Any = { + Trace.withContext(runnable.traceContext) { + pjp.proceed() + } + } + +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index da797fa1..44eb8c43 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -15,17 +15,16 @@ * ========================================================== */ package kamon.instrumentation -import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram } import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue } import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import akka.actor.{ ActorSystem, ActorRef } -import kamon.metric.{ Metrics, MetricDirectory } import org.aspectj.lang.ProceedingJoinPoint /** * For Mailboxes we would like to track the queue size and message latency. Currently the latency * will be gathered from the ActorCellMetrics. */ +/* @Aspect class MessageQueueInstrumentation { @@ -74,4 +73,5 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def hasMessages: Boolean = delegate.hasMessages def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } +*/ |