aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-06-11 18:29:17 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-06-11 18:29:17 -0300
commit84c9ae342ea4a280b0033d9d78239b19b01b728f (patch)
tree7cc9730bc8f0c89410f4655988922e2b679549b5 /src/main/scala/kamon/instrumentation
parent197746563e47783ed4b5f43e94c9aa63734081f6 (diff)
downloadKamon-84c9ae342ea4a280b0033d9d78239b19b01b728f.tar.gz
Kamon-84c9ae342ea4a280b0033d9d78239b19b01b728f.tar.bz2
Kamon-84c9ae342ea4a280b0033d9d78239b19b01b728f.zip
wip
Diffstat (limited to 'src/main/scala/kamon/instrumentation')
-rw-r--r--src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala71
-rw-r--r--src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala6
2 files changed, 77 insertions, 0 deletions
diff --git a/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala
new file mode 100644
index 00000000..35e06b5d
--- /dev/null
+++ b/src/main/scala/kamon/instrumentation/DispatcherInstrumentation.scala
@@ -0,0 +1,71 @@
+package akka.dispatch
+
+import org.aspectj.lang.annotation._
+import java.util.concurrent._
+import scala.concurrent.forkjoin.ForkJoinPool
+import org.aspectj.lang.ProceedingJoinPoint
+import java.util
+import akka.dispatch.NamedExecutorServiceFactoryDelegate
+import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector}
+
+
+case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
+ def createExecutorService: ExecutorService = delegate.createExecutorService
+}
+
+@Aspect
+class ExecutorServiceFactoryProviderInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)")
+ def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {}
+
+ @Around("factoryMethodCall(id, threadFactory)")
+ def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
+ val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast
+
+ val actorSystemName = threadFactory match {
+ case m: MonitorableThreadFactory => m.name
+ case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
+ }
+
+ new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate)
+ }
+
+}
+
+
+@Aspect
+class NamedExecutorServiceFactoryDelegateInstrumentation {
+
+ @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
+ def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}
+
+ @Around("factoryMethodCall(namedFactory)")
+ def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
+ val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService]
+ val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)
+
+ ExecutorServiceMetricCollector.register(executorFullName, delegate)
+
+ new NamedExecutorServiceDelegate(executorFullName, delegate)
+ }
+}
+
+case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
+ def shutdown() = {
+ ExecutorServiceMetricCollector.deregister(fullName)
+ delegate.shutdown()
+ }
+ def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+ def isShutdown: Boolean = delegate.isShutdown
+ def isTerminated: Boolean = delegate.isTerminated
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
+ def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
+ def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
+ def submit(task: Runnable): Future[_] = delegate.submit(task)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
+ def execute(command: Runnable) = delegate.execute(command)
+} \ No newline at end of file
diff --git a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index ef908625..e75a638f 100644
--- a/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
+++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -45,6 +45,12 @@ class RunnableInstrumentation {
*/
import kamon.TraceContextSwap.withContext
+ @Before("instrumentedRunnableCreation()")
+ def beforeCreation = {
+ //println((new Throwable).getStackTraceString)
+ }
+
+
@Around("runnableExecution()")
def around(pjp: ProceedingJoinPoint) = {
import pjp._