From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../DispatcherInstrumentation.scala | 168 +++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala') diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala new file mode 100644 index 00000000..f4bc31c4 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -0,0 +1,168 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.kamon.instrumentation + +import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } + +import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import akka.dispatch._ +import akka.kamon.instrumentation.LookupDataAware.LookupData +import kamon.akka.{ AkkaDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics, ForkJoinPoolDispatcherMetrics } +import kamon.metric.{ Metrics, Entity } +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class DispatcherInstrumentation { + + @Pointcut("execution(* akka.actor.ActorSystemImpl.start(..)) && this(system)") + def actorSystemInitialization(system: ActorSystemImpl): Unit = {} + + @Before("actorSystemInitialization(system)") + def afterActorSystemInitialization(system: ActorSystemImpl): Unit = { + system.dispatchers.asInstanceOf[ActorSystemAware].actorSystem = system + + // The default dispatcher for the actor system is looked up in the ActorSystemImpl's initialization code and we + // can't get the Metrics extension there since the ActorSystem is not yet fully constructed. To workaround that + // we are manually selecting and registering the default dispatcher with the Metrics extension. All other dispatchers + // will by registered by the instrumentation bellow. + + // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup. + val defaultDispatcher = system.dispatcher + val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate") + executorServiceDelegateField.setAccessible(true) + + val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher) + val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor") + executorField.setAccessible(true) + + val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService] + registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + } + + private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = + executorService match { + case fjp: AkkaForkJoinPool ⇒ + Metrics.get(system).register(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName) + + case tpe: ThreadPoolExecutor ⇒ + Metrics.get(system).register(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName) + + case others ⇒ // Currently not interested in other kinds of dispatchers. + } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers) && args(dispatcherName)") + def dispatchersLookup(dispatchers: ActorSystemAware, dispatcherName: String) = {} + + @Around("dispatchersLookup(dispatchers, dispatcherName)") + def aroundDispatchersLookup(pjp: ProceedingJoinPoint, dispatchers: ActorSystemAware, dispatcherName: String): Any = + LookupDataAware.withLookupData(LookupData(dispatcherName, dispatchers.actorSystem)) { + pjp.proceed() + } + + @Pointcut("initialization(akka.dispatch.ExecutorServiceFactory.new(..)) && target(factory)") + def executorServiceFactoryInitialization(factory: LookupDataAware): Unit = {} + + @After("executorServiceFactoryInitialization(factory)") + def afterExecutorServiceFactoryInitialization(factory: LookupDataAware): Unit = + factory.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.ExecutorServiceFactory+.createExecutorService()) && this(factory) && !cflow(execution(* akka.dispatch.Dispatcher.shutdown()))") + def createExecutorService(factory: LookupDataAware): Unit = {} + + @AfterReturning(pointcut = "createExecutorService(factory)", returning = "executorService") + def afterCreateExecutorService(factory: LookupDataAware, executorService: ExecutorService): Unit = { + val lookupData = factory.lookupData + + // lookupData.actorSystem will be null only during the first lookup of the default dispatcher during the + // ActorSystemImpl's initialization. + if (lookupData.actorSystem != null) + registerDispatcher(lookupData.dispatcherName, executorService, lookupData.actorSystem) + } + + @Pointcut("initialization(akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.new(..)) && this(lazyExecutor)") + def lazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorInitialization(lazyExecutor)") + def afterLazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = + lazyExecutor.lookupData = LookupDataAware.currentLookupData + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.copy()) && this(lazyExecutor)") + def lazyExecutorCopy(lazyExecutor: LookupDataAware): Unit = {} + + @Around("lazyExecutorCopy(lazyExecutor)") + def aroundLazyExecutorCopy(pjp: ProceedingJoinPoint, lazyExecutor: LookupDataAware): Any = + LookupDataAware.withLookupData(lazyExecutor.lookupData) { + pjp.proceed() + } + + @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.shutdown()) && this(lazyExecutor)") + def lazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {} + + @After("lazyExecutorShutdown(lazyExecutor)") + def afterLazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = { + import lazyExecutor.lookupData + + if (lookupData.actorSystem != null) + Metrics.get(lookupData.actorSystem).unregister(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category)) + } + +} + +@Aspect +class DispatcherMetricCollectionInfoIntoDispatcherMixin { + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinActorSystemAwareToDispatchers: ActorSystemAware = ActorSystemAware() + + @DeclareMixin("akka.dispatch.Dispatcher.LazyExecutorServiceDelegate") + def mixinLookupDataAwareToExecutors: LookupDataAware = LookupDataAware() + + @DeclareMixin("akka.dispatch.ExecutorServiceFactory+") + def mixinActorSystemAwareToDispatcher: LookupDataAware = LookupDataAware() +} + +trait ActorSystemAware { + @volatile var actorSystem: ActorSystem = _ +} + +object ActorSystemAware { + def apply(): ActorSystemAware = new ActorSystemAware {} +} + +trait LookupDataAware { + @volatile var lookupData: LookupData = _ +} + +object LookupDataAware { + case class LookupData(dispatcherName: String, actorSystem: ActorSystem) + + private val _currentDispatcherLookupData = new ThreadLocal[LookupData] + + def apply() = new LookupDataAware {} + + def currentLookupData: LookupData = _currentDispatcherLookupData.get() + + def withLookupData[T](lookupData: LookupData)(thunk: ⇒ T): T = { + _currentDispatcherLookupData.set(lookupData) + val result = thunk + _currentDispatcherLookupData.remove() + + result + } +} \ No newline at end of file -- cgit v1.2.3