From 613cf9a2b326558ac7819720e18aecd869c8f35c Mon Sep 17 00:00:00 2001 From: Diego Date: Thu, 5 Jun 2014 22:49:22 -0300 Subject: = core: fixes #37 --- .../akka/instrumentation/DispatcherTracing.scala | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala index 0747f0d3..4918fd04 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala @@ -17,11 +17,11 @@ package akka.instrumentation import org.aspectj.lang.annotation._ -import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher } -import kamon.metrics.{ Metrics, DispatcherMetrics } +import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher} +import kamon.metrics.{Metrics, DispatcherMetrics} import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder import kamon.Kamon -import akka.actor.{ Cancellable, ActorSystemImpl } +import akka.actor.{Cancellable, ActorSystemImpl} import scala.concurrent.forkjoin.ForkJoinPool import java.util.concurrent.ThreadPoolExecutor import java.lang.reflect.Method @@ -82,9 +82,14 @@ class DispatcherTracing { def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] - dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() - Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity) + dispatcherWithMetrics.dispatcherMetricsRecorder.map { + dispatcher => + dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() + Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity) + } } + + } @Aspect @@ -108,6 +113,7 @@ object DispatcherMetricsCollector { DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) } + private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) } @@ -124,13 +130,13 @@ object DispatcherMetricsCollector { case x: Dispatcher ⇒ { val executor = executorServiceMethod.invoke(x) match { case delegate: ExecutorServiceDelegate ⇒ delegate.executor - case other ⇒ other + case other ⇒ other } executor match { - case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) + case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) - case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) } } case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) -- cgit v1.2.3