From 9ca0bffb230baab4d79f30ceabfcb453387fe952 Mon Sep 17 00:00:00 2001 From: Diego Date: Sat, 7 Jun 2014 17:03:56 -0300 Subject: = core : fixes #38 * I've changed the way to get the Actorsystem inside of DispatcerTracing, passing the actorSystem across the Dispatchers and then to each Dispatcher in the same ActorSystem. --- .../akka/instrumentation/DispatcherTracing.scala | 53 ++++++++++++++-------- 1 file changed, 35 insertions(+), 18 deletions(-) (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala index 4918fd04..69b78e5e 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala @@ -17,11 +17,11 @@ package akka.instrumentation import org.aspectj.lang.annotation._ -import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher} -import kamon.metrics.{Metrics, DispatcherMetrics} +import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher } +import kamon.metrics.{ Metrics, DispatcherMetrics } import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder import kamon.Kamon -import akka.actor.{Cancellable, ActorSystemImpl} +import akka.actor.{ Cancellable, ActorSystemImpl } import scala.concurrent.forkjoin.ForkJoinPool import java.util.concurrent.ThreadPoolExecutor import java.lang.reflect.Method @@ -30,14 +30,24 @@ import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurem @Aspect class DispatcherTracing { - private[this] var actorSystem: ActorSystemImpl = _ + @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))") + def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {} - @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && this(system)") - def onActorSystemStartup(system: ActorSystemImpl) = {} + @Before("onActorSystemStartup(dispatchers, system)") + def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = { + val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem] + currentDispatchers.actorSystem = system + } + + @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)") + def onDispatchersLookup(dispatchers: Dispatchers) = {} + + @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher") + def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = { + val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem] + val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - @Before("onActorSystemStartup(system)") - def beforeActorSystemStartup(system: ActorSystemImpl): Unit = { - actorSystem = system + dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem } @Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))") @@ -52,9 +62,9 @@ class DispatcherTracing { @After("onDispatcherStartup(dispatcher)") def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = { - val metricsExtension = Kamon(Metrics)(actorSystem) - val metricIdentity = DispatcherMetrics(dispatcher.id) val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] + val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem) + val metricIdentity = DispatcherMetrics(dispatcher.id) dispatcherWithMetrics.metricIdentity = metricIdentity dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory) @@ -83,13 +93,12 @@ class DispatcherTracing { val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] dispatcherWithMetrics.dispatcherMetricsRecorder.map { - dispatcher => + dispatcher ⇒ + println("dispatcher down => " + dispatcher) dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity) + Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity) } } - - } @Aspect @@ -97,12 +106,20 @@ class DispatcherMetricsMixin { @DeclareMixin("akka.dispatch.Dispatcher") def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {} + + @DeclareMixin("akka.dispatch.Dispatchers") + def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {} } trait DispatcherMessageMetrics { var metricIdentity: DispatcherMetrics = _ var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _ var dispatcherCollectorCancellable: Cancellable = _ + var actorSystem: ActorSystemImpl = _ +} + +trait DispatchersWithActorSystem { + var actorSystem: ActorSystemImpl = _ } object DispatcherMetricsCollector { @@ -130,13 +147,13 @@ object DispatcherMetricsCollector { case x: Dispatcher ⇒ { val executor = executorServiceMethod.invoke(x) match { case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other + case other ⇒ other } executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) + case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) } } case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) -- cgit v1.2.3