aboutsummaryrefslogblamecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
blob: eb6b5293d8a2415b19a4a79a106ab09974806afa (plain) (tree)























                                                                                             
                  
                                                                                                               
                          

































                                                                                                                         



                                                                                          

                                      



                                                                                                
























































                                                                                                                                                                      
                                                                                                   













































                                                                               
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package akka.kamon.instrumentation

import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor }

import akka.actor.{ ActorSystem, ActorSystemImpl }
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
import akka.dispatch._
import akka.kamon.instrumentation.LookupDataAware.LookupData
import kamon.Kamon
import kamon.akka.{ AkkaDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics, ForkJoinPoolDispatcherMetrics }
import kamon.metric.Entity
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._

/**
 *  AspectJ instrumentation that registers Akka dispatchers with Kamon's metrics module and removes them again
 *  when their executor is shut down.
 *
 *  The dispatcher name and owning ActorSystem are captured as [[LookupDataAware.LookupData]] while
 *  `Dispatchers.lookup` runs and are then copied onto the `ExecutorServiceFactory` and
 *  `LazyExecutorServiceDelegate` instances created during that lookup, so that later advice on those instances
 *  knows which dispatcher they belong to.
 */
@Aspect
class DispatcherInstrumentation {

  @Pointcut("execution(* akka.actor.ActorSystemImpl.start(..)) && this(system)")
  def actorSystemInitialization(system: ActorSystemImpl): Unit = {}

  /**
   *  Runs before `ActorSystemImpl.start(..)`: stores the system on its `Dispatchers` instance and registers the
   *  default dispatcher by hand.
   */
  @Before("actorSystemInitialization(system)")
  def afterActorSystemInitialization(system: ActorSystemImpl): Unit = {
    system.dispatchers.asInstanceOf[ActorSystemAware].actorSystem = system

    // The default dispatcher for the actor system is looked up in the ActorSystemImpl's initialization code and we
    // can't get the Metrics extension there since the ActorSystem is not yet fully constructed. To work around that
    // we are manually selecting and registering the default dispatcher with the Metrics extension. All other
    // dispatchers will be registered by the instrumentation below.

    // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup.
    val defaultDispatcher = system.dispatcher
    val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate")
    executorServiceDelegateField.setAccessible(true)

    val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher)
    val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor")
    executorField.setAccessible(true)

    val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService]
    registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system)
  }

  /**
   *  Registers `executorService` under `dispatcherName` with `Kamon.metrics`, provided the corresponding entity
   *  passes `Kamon.metrics.shouldTrack`. Only `AkkaForkJoinPool` and `ThreadPoolExecutor` are handled; any other
   *  executor type is ignored.
   */
  private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit =
    executorService match {
      case fjp: AkkaForkJoinPool =>
        val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category)

        if (Kamon.metrics.shouldTrack(dispatcherEntity))
          Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName)

      case tpe: ThreadPoolExecutor =>
        val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category)

        if (Kamon.metrics.shouldTrack(dispatcherEntity))
          Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName)

      case _ => // Currently not interested in other kinds of dispatchers.
    }

  @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers) && args(dispatcherName)")
  def dispatchersLookup(dispatchers: ActorSystemAware, dispatcherName: String) = {}

  /** Makes the dispatcher name and ActorSystem available as the current lookup data while the lookup proceeds. */
  @Around("dispatchersLookup(dispatchers, dispatcherName)")
  def aroundDispatchersLookup(pjp: ProceedingJoinPoint, dispatchers: ActorSystemAware, dispatcherName: String): Any =
    LookupDataAware.withLookupData(LookupData(dispatcherName, dispatchers.actorSystem)) {
      pjp.proceed()
    }

  @Pointcut("initialization(akka.dispatch.ExecutorServiceFactory.new(..)) && target(factory)")
  def executorServiceFactoryInitialization(factory: LookupDataAware): Unit = {}

  /** Copies the current lookup data (possibly null) onto the newly initialized factory. */
  @After("executorServiceFactoryInitialization(factory)")
  def afterExecutorServiceFactoryInitialization(factory: LookupDataAware): Unit =
    factory.lookupData = LookupDataAware.currentLookupData

  @Pointcut("execution(* akka.dispatch.ExecutorServiceFactory+.createExecutorService()) && this(factory) && !cflow(execution(* akka.dispatch.Dispatcher.shutdown()))")
  def createExecutorService(factory: LookupDataAware): Unit = {}

  /** Registers the freshly created executor service using the lookup data stored on its factory. */
  @AfterReturning(pointcut = "createExecutorService(factory)", returning = "executorService")
  def afterCreateExecutorService(factory: LookupDataAware, executorService: ExecutorService): Unit = {
    val lookupData = factory.lookupData

    // lookupData itself is null when the factory was initialized while no lookup data was set on the current
    // thread. lookupData.actorSystem will be null only during the first lookup of the default dispatcher during
    // the ActorSystemImpl's initialization. In both cases there is nothing to register here.
    if (lookupData != null && lookupData.actorSystem != null)
      registerDispatcher(lookupData.dispatcherName, executorService, lookupData.actorSystem)
  }

  @Pointcut("initialization(akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.new(..)) && this(lazyExecutor)")
  def lazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = {}

  /** Copies the current lookup data (possibly null) onto the newly initialized lazy executor delegate. */
  @After("lazyExecutorInitialization(lazyExecutor)")
  def afterLazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit =
    lazyExecutor.lookupData = LookupDataAware.currentLookupData

  @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.copy()) && this(lazyExecutor)")
  def lazyExecutorCopy(lazyExecutor: LookupDataAware): Unit = {}

  /** Re-establishes the delegate's lookup data as current while `copy()` runs. */
  @Around("lazyExecutorCopy(lazyExecutor)")
  def aroundLazyExecutorCopy(pjp: ProceedingJoinPoint, lazyExecutor: LookupDataAware): Any =
    LookupDataAware.withLookupData(lazyExecutor.lookupData) {
      pjp.proceed()
    }

  @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.shutdown()) && this(lazyExecutor)")
  def lazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {}

  /** Removes the dispatcher's entity from `Kamon.metrics` once its lazy executor delegate has been shut down. */
  @After("lazyExecutorShutdown(lazyExecutor)")
  def afterLazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {
    val lookupData = lazyExecutor.lookupData

    // Same guards as in afterCreateExecutorService: without lookup data or an ActorSystem there is no entity
    // that this instrumentation could have registered.
    if (lookupData != null && lookupData.actorSystem != null)
      Kamon.metrics.removeEntity(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category))
  }

}

/**
 *  Declares the AspectJ mixins that attach the [[ActorSystemAware]] and [[LookupDataAware]] holders to the
 *  Akka types named in each `@DeclareMixin` pattern.
 */
@Aspect
class DispatcherMetricCollectionInfoIntoDispatcherMixin {

  /** Supplies the [[ActorSystemAware]] instance mixed into `akka.dispatch.Dispatchers`. */
  @DeclareMixin("akka.dispatch.Dispatchers")
  def mixinActorSystemAwareToDispatchers: ActorSystemAware =
    ActorSystemAware.apply()

  /** Supplies the [[LookupDataAware]] instance mixed into `Dispatcher.LazyExecutorServiceDelegate`. */
  @DeclareMixin("akka.dispatch.Dispatcher.LazyExecutorServiceDelegate")
  def mixinLookupDataAwareToExecutors: LookupDataAware =
    LookupDataAware.apply()

  /** Supplies the [[LookupDataAware]] instance mixed into `ExecutorServiceFactory` and its subtypes. */
  @DeclareMixin("akka.dispatch.ExecutorServiceFactory+")
  def mixinActorSystemAwareToDispatcher: LookupDataAware =
    LookupDataAware.apply()
}

/**
 *  Mutable holder for a reference to an [[akka.actor.ActorSystem]]. In this file it is mixed into
 *  `akka.dispatch.Dispatchers` and assigned by `DispatcherInstrumentation.afterActorSystemInitialization`.
 */
trait ActorSystemAware {
  // Starts out as null (default initializer) until the instrumentation assigns it.
  @volatile var actorSystem: ActorSystem = _
}

/** Factory for standalone [[ActorSystemAware]] holders. */
object ActorSystemAware {

  /** Creates a new holder whose `actorSystem` has not been assigned yet. */
  def apply(): ActorSystemAware = {
    val holder = new ActorSystemAware {}
    holder
  }
}

/**
 *  Mutable holder for the [[LookupDataAware.LookupData]] associated with an instance. In this file it is mixed
 *  into `ExecutorServiceFactory` subtypes and `Dispatcher.LazyExecutorServiceDelegate`.
 */
trait LookupDataAware {
  // Starts out as null (default initializer); the instrumentation copies LookupDataAware.currentLookupData
  // into it, which may itself be null.
  @volatile var lookupData: LookupData = _
}

/**
 *  Companion of [[LookupDataAware]]: defines the [[LookupData]] payload, a factory for standalone holders and a
 *  thread-local slot carrying the lookup data that is current on the calling thread.
 */
object LookupDataAware {

  /** Name of the dispatcher being looked up together with the ActorSystem it belongs to (may be null). */
  case class LookupData(dispatcherName: String, actorSystem: ActorSystem)

  private val _currentDispatcherLookupData = new ThreadLocal[LookupData]

  /** Creates a new holder whose `lookupData` has not been assigned yet. */
  def apply(): LookupDataAware = new LookupDataAware {}

  /** Returns the lookup data set on the calling thread, or null when none is set. */
  def currentLookupData: LookupData = _currentDispatcherLookupData.get()

  /**
   *  Evaluates `thunk` with `lookupData` installed as the calling thread's current lookup data.
   *
   *  The thread-local slot is cleared once `thunk` completes, whether it returns normally or throws, so that
   *  stale lookup data is never left behind on the thread.
   *
   *  @param lookupData data to expose through [[currentLookupData]] while `thunk` runs
   *  @param thunk      by-name computation, evaluated exactly once after the data has been installed
   *  @return the value produced by `thunk`
   */
  def withLookupData[T](lookupData: LookupData)(thunk: => T): T = {
    _currentDispatcherLookupData.set(lookupData)
    try thunk
    finally _currentDispatcherLookupData.remove()
  }
}