aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-06-05 22:49:22 -0300
committerDiego <diegolparra@gmail.com>2014-06-05 22:49:22 -0300
commitc985fe9b26d7fd91404b887a8c1a851fbf478208 (patch)
treeb014757bbf0d05ea914923adc64068b0cae5bba1
parentd211435ab2602e2fda0be4f5ea85614feec02b85 (diff)
downloadKamon-c985fe9b26d7fd91404b887a8c1a851fbf478208.tar.gz
Kamon-c985fe9b26d7fd91404b887a8c1a851fbf478208.tar.bz2
Kamon-c985fe9b26d7fd91404b887a8c1a851fbf478208.zip
= core: fixes #37
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
index 0747f0d3..4918fd04 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
@@ -17,11 +17,11 @@
package akka.instrumentation
import org.aspectj.lang.annotation._
-import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
-import kamon.metrics.{ Metrics, DispatcherMetrics }
+import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher}
+import kamon.metrics.{Metrics, DispatcherMetrics}
import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
import kamon.Kamon
-import akka.actor.{ Cancellable, ActorSystemImpl }
+import akka.actor.{Cancellable, ActorSystemImpl}
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.ThreadPoolExecutor
import java.lang.reflect.Method
@@ -82,9 +82,14 @@ class DispatcherTracing {
def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
- dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
- Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ dispatcherWithMetrics.dispatcherMetricsRecorder.map {
+ dispatcher =>
+ dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
+ Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ }
}
+
+
}
@Aspect
@@ -108,6 +113,7 @@ object DispatcherMetricsCollector {
DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
(pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
}
+
private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
}
@@ -124,13 +130,13 @@ object DispatcherMetricsCollector {
case x: Dispatcher ⇒ {
val executor = executorServiceMethod.invoke(x) match {
case delegate: ExecutorServiceDelegate ⇒ delegate.executor
- case other ⇒ other
+ case other ⇒ other
}
executor match {
- case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
- case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
}
}
case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)