aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala
blob: 90d2b2701421c3c5f7532985f10f7623d432e2d6 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15














                                                                           





                                           
                                                                         
                                 


                                                                  








                                                                                                          
                                            




                                                                                                                                                                 

                                               





                                                                                                                                                                  
                                                                   








                                                                                                  
       


                                                                                                 
                                                                                                                                                                    

   




                                                                                        

                                                             

   

 












                                              
  






                                                                       
  
 
















                                                                                                                                                                  

                                                                                                                                                               






                                                                                      








                                                                                                                                               
                                                                                                                            
 
                                                                         






                                                                                                              
                                                         















                                                                                                                                                            
/* ===================================================
 * Copyright © 2013 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ========================================================== */
package kamon.instrumentation

import org.aspectj.lang.annotation._
import java.util.concurrent._
import org.aspectj.lang.ProceedingJoinPoint
import java.util
import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory }
import com.typesafe.config.Config
import scala.concurrent.forkjoin.ForkJoinPool
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool

/**
 * AspectJ aspect hooked on the construction of `akka.actor.ActorSystemImpl`.
 *
 * The advice body is currently empty: the registration call it was written for is commented out.
 */
@Aspect
class ActorSystemInstrumentation {

  /** Matches execution of an `ActorSystemImpl` constructor whose arguments bind to `(name, applicationConfig, classLoader)`. */
  @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && args(name, applicationConfig, classLoader)")
  def actorSystemInstantiation(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = ()

  /** Runs after the matched constructor; does nothing while the registration call stays disabled. */
  @After("actorSystemInstantiation(name, applicationConfig, classLoader)")
  def registerActorSystem(name: String, applicationConfig: Config, classLoader: ClassLoader): Unit = {
    // Disabled: Kamon.Metric.registerActorSystem(name)
    ()
  }
}

/**
 * AspectJ aspect (declared `perthis` on the pool-instantiation pointcut) attached to the
 * construction of `AkkaForkJoinPool` and to executions of `ForkJoinPool.scan`.
 *
 * Both advices are currently no-ops: the histogram bookkeeping they were written for is
 * commented out and kept below for reference.
 */
@Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))")
class ForkJoinPoolInstrumentation {
  /*  var activeThreadsHistogram: Histogram = _
  var poolSizeHistogram: Histogram = _*/

  /** Matches execution of an `AkkaForkJoinPool` constructor, binding its three arguments. */
  @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)")
  def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {}

  /** Runs after the pool constructor; empty while the metric registration stays disabled. */
  @After("forkJoinPoolInstantiation(parallelism, threadFactory, exceptionHandler)")
  def initializeMetrics(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler): Unit = {
    /*val (actorSystemName, dispatcherName) = threadFactory match {
      case mtf: MonitorableThreadFactory => splitName(mtf.name, Kamon.Metric.actorSystemNames)
      case _ => ("Unknown", "Unknown")
    }

    val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName)
    for(m <- metrics) {
      activeThreadsHistogram = m.activeThreadCount
      poolSizeHistogram = m.poolSize
      println(s"Registered $dispatcherName for actor system $actorSystemName")
    }*/
  }

  /**
   * Splits a thread factory name into the actor system name that prefixes it and the remainder.
   *
   * @param threadFactoryName full thread factory name
   * @param knownActorSystems candidate actor system names; the first one that is a prefix of
   *                          `threadFactoryName` is used
   * @return `(actorSystemName, remainder)`, where the remainder is whatever follows the prefix
   *         plus one separator character (empty when nothing follows), or
   *         `("Unknown", "Unknown")` when no candidate is a prefix
   */
  def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) =
    knownActorSystems
      .find(asName => threadFactoryName.startsWith(asName))
      // `drop` instead of `substring`: a name that is exactly the actor system name (nothing
      // after the prefix) yields an empty remainder rather than an out-of-bounds exception.
      .map(asName => (asName, threadFactoryName.drop(asName.length + 1)))
      .getOrElse(("Unknown", "Unknown"))

  /** Matches execution of `ForkJoinPool.scan` when the executing object is an `AkkaForkJoinPool`. */
  @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)")
  def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {}

  /** Runs after each matched `scan`; empty while the histogram updates stay disabled. */
  @After("forkJoinScan(fjp)")
  def updateMetrics(fjp: AkkaForkJoinPool): Unit = {
    /*activeThreadsHistogram.update(fjp.getActiveThreadCount)
    poolSizeHistogram.update(fjp.getPoolSize)*/
  }

}

/**
 * ExecutorService monitoring base: the callback interface through which thread-count and
 * queue-size measurements of an executor service are reported.
 *
 * NOTE(review): no implementer or caller is visible in this part of the file. Judging only by
 * the parameter name, each `diff` is presumably a signed change rather than an absolute
 * value — confirm against the implementations.
 */
trait ExecutorServiceCollector {
  /** Reports a change to the active thread count. */
  def updateActiveThreadCount(diff: Int): Unit
  /** Reports a change to the total thread count. */
  def updateTotalThreadCount(diff: Int): Unit
  /** Reports a change to the queue size. */
  def updateQueueSize(diff: Int): Unit
}

/**
 * Exposes the [[ExecutorServiceCollector]] associated with the implementing type.
 *
 * NOTE(review): presumably meant to be mixed into instrumented executor services; no
 * implementer is visible here — confirm.
 */
trait WatchedExecutorService {
  /** The collector that receives this instance's measurements. */
  def collector: ExecutorServiceCollector
}

/*
trait ExecutorServiceMonitoring {
  def dispatcherMetrics: DispatcherMetricCollector
}

class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring {
  @volatile var dispatcherMetrics: DispatcherMetricCollector = _
}
*/

/**
 * [[ExecutorServiceFactory]] wrapper that carries an actor system name and a dispatcher name
 * alongside the factory it wraps. Executor service creation is forwarded untouched.
 *
 * @param actorSystemName name recorded for the actor system
 * @param dispatcherName  name recorded for the dispatcher
 * @param delegate        factory that actually builds the executor service
 */
case class NamedExecutorServiceFactoryDelegate(
  actorSystemName: String,
  dispatcherName: String,
  delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {

  /** Builds the executor service by asking the wrapped factory for one. */
  def createExecutorService: ExecutorService = {
    val underlying = delegate.createExecutorService
    underlying
  }
}

/**
 * AspectJ aspect that wraps every `ExecutorServiceFactory` produced by an
 * `akka.dispatch.ExecutorServiceFactoryProvider` (or subtype) in a
 * [[NamedExecutorServiceFactoryDelegate]], so the names known at creation time travel
 * with the factory.
 */
@Aspect
class ExecutorServiceFactoryProviderInstrumentation {

  /**
   * Matches execution of `createExecutorServiceFactory` on any `ExecutorServiceFactoryProvider`
   * subtype, binding its two arguments. The `if()` condition is this method's result, which is
   * always `true`.
   */
  @Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(dispatcherName, threadFactory) && if()")
  def factoryMethodCall(dispatcherName: String, threadFactory: ThreadFactory): Boolean = {
    true
  }

  /**
   * Lets the original factory method run, then wraps its result together with the names.
   *
   * @param pjp            join point of the intercepted factory method
   * @param dispatcherName dispatcher name passed to the intercepted method
   * @param threadFactory  thread factory passed to the intercepted method; its `name` is used as
   *                       the actor system name when it is a `MonitorableThreadFactory`
   * @return the original factory wrapped in a [[NamedExecutorServiceFactoryDelegate]]
   */
  @Around("factoryMethodCall(dispatcherName, threadFactory)")
  def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, dispatcherName: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
    val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast

    val actorSystemName = threadFactory match {
      case m: MonitorableThreadFactory => m.name
      case _                           => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
    }

    new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate)
  }

}

/**
 * AspectJ aspect that wraps every `ExecutorService` created through a
 * [[NamedExecutorServiceFactoryDelegate]] in a [[NamedExecutorServiceDelegate]].
 */
@Aspect
class NamedExecutorServiceFactoryDelegateInstrumentation {

  /**
   * Matches execution of `NamedExecutorServiceFactoryDelegate.createExecutorService()`, binding
   * the factory instance.
   *
   * The type pattern names `kamon.instrumentation`, the package this file declares; the previous
   * `akka.dispatch` prefix pointed at a package where this file defines no such class, so the
   * advice could never apply.
   */
  @Pointcut("execution(* kamon.instrumentation.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
  def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate): Unit = {}

  /**
   * Lets the factory build the executor service, then wraps the result.
   *
   * @param pjp          join point of the intercepted `createExecutorService` call
   * @param namedFactory factory instance on which the call executes
   * @return the created executor service wrapped in a [[NamedExecutorServiceDelegate]]
   */
  @Around("factoryMethodCall(namedFactory)")
  def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
    val delegate = pjp.proceed().asInstanceOf[ExecutorService]
    // The name stays empty while the metric directory lookup is disabled.
    val executorFullName = "" //MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)

    //ExecutorServiceMetricCollector.register(executorFullName, delegate)

    new NamedExecutorServiceDelegate(executorFullName, delegate)
  }
}

/**
 * [[ExecutorService]] decorator that attaches a name to an existing executor service and
 * forwards every operation to it unchanged.
 *
 * @param fullName name attached to the wrapped executor service
 * @param delegate executor service that performs all the work
 */
case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {

  // ---- running and submitting work ----

  def execute(command: Runnable): Unit = delegate.execute(command)

  def submit(task: Runnable): Future[_] = delegate.submit(task)

  def submit[A](task: Runnable, result: A): Future[A] = delegate.submit(task, result)

  def submit[A](task: Callable[A]): Future[A] = delegate.submit(task)

  // ---- bulk invocation ----

  def invokeAll[A](tasks: util.Collection[_ <: Callable[A]]): util.List[Future[A]] =
    delegate.invokeAll(tasks)

  def invokeAll[A](tasks: util.Collection[_ <: Callable[A]], timeout: Long, unit: TimeUnit): util.List[Future[A]] =
    delegate.invokeAll(tasks, timeout, unit)

  def invokeAny[A](tasks: util.Collection[_ <: Callable[A]]): A =
    delegate.invokeAny(tasks)

  def invokeAny[A](tasks: util.Collection[_ <: Callable[A]], timeout: Long, unit: TimeUnit): A =
    delegate.invokeAny(tasks, timeout, unit)

  // ---- lifecycle ----

  def isShutdown: Boolean = delegate.isShutdown

  def isTerminated: Boolean = delegate.isTerminated

  def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
    delegate.awaitTermination(timeout, unit)

  def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()

  def shutdown(): Unit = {
    // Deregistration hook is disabled for now:
    //ExecutorServiceMetricCollector.deregister(fullName)
    delegate.shutdown()
  }
}