1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package akka.dispatch
import org.aspectj.lang.annotation._
import java.util.concurrent._
import scala.concurrent.forkjoin.ForkJoinPool
import org.aspectj.lang.ProceedingJoinPoint
import java.util
import akka.dispatch.NamedExecutorServiceFactoryDelegate
import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector}
/**
 * An [[ExecutorServiceFactory]] that carries the names it was created under and hands the actual
 * executor creation over to the wrapped `delegate`.
 *
 * @param actorSystemName name of the actor system recorded for this factory
 * @param dispatcherName  name of the dispatcher recorded for this factory
 * @param delegate        the factory that really builds the executor service
 */
case class NamedExecutorServiceFactoryDelegate(
  actorSystemName: String,
  dispatcherName: String,
  delegate: ExecutorServiceFactory
) extends ExecutorServiceFactory {

  /** Builds an executor service by asking the wrapped factory for one. */
  def createExecutorService: ExecutorService = {
    delegate.createExecutorService
  }
}
/**
 * Aspect that intercepts `createExecutorServiceFactory` on `ExecutorServiceFactoryProvider`
 * implementations and wraps the factory they return in a [[NamedExecutorServiceFactoryDelegate]],
 * so the factory carries an actor system name and the dispatcher id.
 */
@Aspect
class ExecutorServiceFactoryProviderInstrumentation {

  /**
   * Pointcut selecting the execution of `createExecutorServiceFactory` and binding its two
   * arguments by name. The body is intentionally empty; only the annotation matters.
   */
  @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)")
  def factoryMethodCall(id: String, threadFactory: ThreadFactory): Unit = {}

  /**
   * Lets the intercepted call run with its original arguments and wraps the resulting factory.
   *
   * @param pjp           join point used to proceed with the intercepted call
   * @param id            first argument of the intercepted call; stored as the dispatcher name
   * @param threadFactory second argument of the intercepted call; its `name` is used as the actor
   *                      system name when it is a `MonitorableThreadFactory`
   * @return a [[NamedExecutorServiceFactoryDelegate]] around the originally created factory
   */
  @Around("factoryMethodCall(id, threadFactory)")
  def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
    // Safe cast: the intercepted method is the factory-creating one selected by the pointcut.
    val originalFactory =
      pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory]

    // Find an alternative way to find the actor system name in case we start seeing "Unknown"
    // as the AS name.
    val systemName = Option(threadFactory)
      .collect { case monitorable: MonitorableThreadFactory => monitorable.name }
      .getOrElse("Unknown")

    new NamedExecutorServiceFactoryDelegate(systemName, id, originalFactory)
  }
}
/**
 * Aspect that intercepts `NamedExecutorServiceFactoryDelegate.createExecutorService()`, registers
 * the freshly created executor with `ExecutorServiceMetricCollector` and returns it wrapped in a
 * [[NamedExecutorServiceDelegate]].
 */
@Aspect
class NamedExecutorServiceFactoryDelegateInstrumentation {

  /**
   * Pointcut selecting the execution of `createExecutorService()` on the named factory and binding
   * the executing instance by name. The body is intentionally empty; only the annotation matters.
   */
  @Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
  def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate): Unit = {}

  /**
   * Lets the intercepted creation run, registers the result under the dispatcher's full name and
   * wraps it.
   *
   * @param pjp          join point used to proceed with the intercepted call
   * @param namedFactory the factory instance whose `createExecutorService()` is executing
   * @return a [[NamedExecutorServiceDelegate]] around the created executor service
   */
  @Around("factoryMethodCall(namedFactory)")
  def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
    // The bound `this` instance has to be handed back to proceed().
    val createdExecutor =
      pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService]

    // Name derived from the actor system and dispatcher names; used for both the registration
    // and the wrapper.
    val metricRoot = MetricDirectory.nameForDispatcher(
      namedFactory.actorSystemName,
      namedFactory.dispatcherName
    )

    ExecutorServiceMetricCollector.register(metricRoot, createdExecutor)
    new NamedExecutorServiceDelegate(metricRoot, createdExecutor)
  }
}
/**
 * An `ExecutorService` that forwards every operation to `delegate` and additionally deregisters
 * the metrics registered under `fullName` when the executor is shut down.
 *
 * @param fullName name passed to `ExecutorServiceMetricCollector.deregister` on shutdown
 * @param delegate the executor service that performs the actual work
 */
case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {

  /** Deregisters the metrics for `fullName`, then starts an orderly shutdown of the delegate. */
  def shutdown(): Unit = {
    ExecutorServiceMetricCollector.deregister(fullName)
    delegate.shutdown()
  }

  /**
   * Deregisters the metrics for `fullName`, then forwards to the delegate's `shutdownNow()`.
   *
   * Deregistration happens here as well as in `shutdown()` so that an executor stopped through
   * this path does not leave its registration behind.
   * NOTE(review): assumes `deregister` tolerates being called more than once for the same name
   * (e.g. `shutdown()` followed by `shutdownNow()`) — confirm against the collector.
   *
   * @return whatever the delegate's `shutdownNow()` returns
   */
  def shutdownNow(): util.List[Runnable] = {
    ExecutorServiceMetricCollector.deregister(fullName)
    delegate.shutdownNow()
  }

  // Everything below is plain forwarding to the wrapped executor service.

  def isShutdown: Boolean = delegate.isShutdown

  def isTerminated: Boolean = delegate.isTerminated

  def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)

  def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)

  def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)

  def submit(task: Runnable): Future[_] = delegate.submit(task)

  def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)

  def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] =
    delegate.invokeAll(tasks, timeout, unit)

  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)

  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
    delegate.invokeAny(tasks, timeout, unit)

  def execute(command: Runnable): Unit = delegate.execute(command)
}
|