From d3976b2b46f8fed2d748f4b7539ee466e18b597b Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 21 May 2014 18:17:30 -0300 Subject: = core: avoid using tuples in favor of case classes --- .../ActorMessagePassingTracing.scala | 7 ++-- .../akka/instrumentation/DispatcherTracing.scala | 48 ++++++++++++++++++++-- 2 files changed, 49 insertions(+), 6 deletions(-) (limited to 'kamon-core/src/main/scala/akka') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 20bfe564..6db86828 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -24,7 +24,8 @@ import kamon.trace._ import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder -import kamon.metrics.instruments.counter.MinMaxCounter +import kamon.metrics.instruments.MinMaxCounter +import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement @Aspect class BehaviourInvokeTracing { @@ -46,11 +47,11 @@ class BehaviourInvokeTracing { cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder { cellWithMetrics.actorMetricsRecorder.map { am ⇒ import am.mailboxSize._ - val (min, max, sum) = cellWithMetrics.queueSize.collect() + val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect() record(min) record(max) - record(sum) + record(current) } } } diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala index 63e96fa0..0747f0d3 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala @@ -17,12 +17,15 @@ package akka.instrumentation import org.aspectj.lang.annotation._ -import akka.dispatch.{ Dispatcher, MessageDispatcher } +import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher } import kamon.metrics.{ Metrics, DispatcherMetrics } import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder import kamon.Kamon import akka.actor.{ Cancellable, ActorSystemImpl } -import kamon.metrics.dispatcher.DispatcherMetricsCollector +import scala.concurrent.forkjoin.ForkJoinPool +import java.util.concurrent.ThreadPoolExecutor +import java.lang.reflect.Method +import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement @Aspect class DispatcherTracing { @@ -60,7 +63,8 @@ class DispatcherTracing { dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder { dispatcherWithMetrics.dispatcherMetricsRecorder.map { dm ⇒ - val (maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = DispatcherMetricsCollector.collect(dispatcher) + val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = + DispatcherMetricsCollector.collect(dispatcher) dm.maximumPoolSize.record(maximumPoolSize) dm.runningThreadCount.record(runningThreadCount) @@ -77,6 +81,7 @@ class DispatcherTracing { @After("onDispatcherShutdown(dispatcher)") def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = { val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics] + dispatcherWithMetrics.dispatcherCollectorCancellable.cancel() Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity) } @@ -95,3 +100,40 @@ trait DispatcherMessageMetrics { var dispatcherCollectorCancellable: Cancellable = _ } +object DispatcherMetricsCollector { + + case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long) + + private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = { + DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount, + (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize) + } + private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = { + DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize) + } + + private val executorServiceMethod: Method = { + // executorService is protected + val method = classOf[Dispatcher].getDeclaredMethod("executorService") + method.setAccessible(true) + method + } + + def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = { + dispatcher match { + case x: Dispatcher ⇒ { + val executor = executorServiceMethod.invoke(x) match { + case delegate: ExecutorServiceDelegate ⇒ delegate.executor + case other ⇒ other + } + + executor match { + case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp) + case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe) + case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + } + } + case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L) + } + } +} -- cgit v1.2.3