aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
index 0747f0d3..4918fd04 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
@@ -17,11 +17,11 @@
package akka.instrumentation
import org.aspectj.lang.annotation._
-import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
-import kamon.metrics.{ Metrics, DispatcherMetrics }
+import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher}
+import kamon.metrics.{Metrics, DispatcherMetrics}
import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
import kamon.Kamon
-import akka.actor.{ Cancellable, ActorSystemImpl }
+import akka.actor.{Cancellable, ActorSystemImpl}
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.ThreadPoolExecutor
import java.lang.reflect.Method
@@ -82,9 +82,14 @@ class DispatcherTracing {
def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
- dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
- Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ dispatcherWithMetrics.dispatcherMetricsRecorder.map {
+ dispatcher =>
+ dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
+ Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ }
}
+
+
}
@Aspect
@@ -108,6 +113,7 @@ object DispatcherMetricsCollector {
DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
(pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
}
+
private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
}
@@ -124,13 +130,13 @@ object DispatcherMetricsCollector {
case x: Dispatcher ⇒ {
val executor = executorServiceMethod.invoke(x) match {
case delegate: ExecutorServiceDelegate ⇒ delegate.executor
- case other ⇒ other
+ case other ⇒ other
}
executor match {
- case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
- case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
}
}
case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)