aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/akka/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-05-21 18:17:30 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-05-21 18:17:30 -0300
commitd3976b2b46f8fed2d748f4b7539ee466e18b597b (patch)
tree1ff6bbcbdd52d255f321a3df7491822a2e6385c7 /kamon-core/src/main/scala/akka/instrumentation
parentd072dbf8df995840f071e08b9daeefbed6285857 (diff)
downloadKamon-d3976b2b46f8fed2d748f4b7539ee466e18b597b.tar.gz
Kamon-d3976b2b46f8fed2d748f4b7539ee466e18b597b.tar.bz2
Kamon-d3976b2b46f8fed2d748f4b7539ee466e18b597b.zip
= core: avoid using tuples in favor of case classes
Diffstat (limited to 'kamon-core/src/main/scala/akka/instrumentation')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala7
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala48
2 files changed, 49 insertions, 6 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 20bfe564..6db86828 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -24,7 +24,8 @@ import kamon.trace._
import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
-import kamon.metrics.instruments.counter.MinMaxCounter
+import kamon.metrics.instruments.MinMaxCounter
+import kamon.metrics.instruments.MinMaxCounter.CounterMeasurement
@Aspect
class BehaviourInvokeTracing {
@@ -46,11 +47,11 @@ class BehaviourInvokeTracing {
cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
cellWithMetrics.actorMetricsRecorder.map { am ⇒
import am.mailboxSize._
- val (min, max, sum) = cellWithMetrics.queueSize.collect()
+ val CounterMeasurement(min, max, current) = cellWithMetrics.queueSize.collect()
record(min)
record(max)
- record(sum)
+ record(current)
}
}
}
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
index 63e96fa0..0747f0d3 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
@@ -17,12 +17,15 @@
package akka.instrumentation
import org.aspectj.lang.annotation._
-import akka.dispatch.{ Dispatcher, MessageDispatcher }
+import akka.dispatch.{ ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
import kamon.metrics.{ Metrics, DispatcherMetrics }
import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
import kamon.Kamon
import akka.actor.{ Cancellable, ActorSystemImpl }
-import kamon.metrics.dispatcher.DispatcherMetricsCollector
+import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.ThreadPoolExecutor
+import java.lang.reflect.Method
+import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurement
@Aspect
class DispatcherTracing {
@@ -60,7 +63,8 @@ class DispatcherTracing {
dispatcherWithMetrics.dispatcherCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
dispatcherWithMetrics.dispatcherMetricsRecorder.map {
dm ⇒
- val (maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) = DispatcherMetricsCollector.collect(dispatcher)
+ val DispatcherMetricsMeasurement(maximumPoolSize, runningThreadCount, queueTaskCount, poolSize) =
+ DispatcherMetricsCollector.collect(dispatcher)
dm.maximumPoolSize.record(maximumPoolSize)
dm.runningThreadCount.record(runningThreadCount)
@@ -77,6 +81,7 @@ class DispatcherTracing {
@After("onDispatcherShutdown(dispatcher)")
def afterDispatcherShutdown(dispatcher: MessageDispatcher): Unit = {
val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+
dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
}
@@ -95,3 +100,40 @@ trait DispatcherMessageMetrics {
var dispatcherCollectorCancellable: Cancellable = _
}
+object DispatcherMetricsCollector {
+
+ case class DispatcherMetricsMeasurement(maximumPoolSize: Long, runningThreadCount: Long, queueTaskCount: Long, poolSize: Long)
+
+ private def collectForkJoinMetrics(pool: ForkJoinPool): DispatcherMetricsMeasurement = {
+ DispatcherMetricsMeasurement(pool.getParallelism, pool.getActiveThreadCount,
+ (pool.getQueuedTaskCount + pool.getQueuedSubmissionCount), pool.getPoolSize)
+ }
+ private def collectExecutorMetrics(pool: ThreadPoolExecutor): DispatcherMetricsMeasurement = {
+ DispatcherMetricsMeasurement(pool.getMaximumPoolSize, pool.getActiveCount, pool.getQueue.size(), pool.getPoolSize)
+ }
+
+ private val executorServiceMethod: Method = {
+ // executorService is protected
+ val method = classOf[Dispatcher].getDeclaredMethod("executorService")
+ method.setAccessible(true)
+ method
+ }
+
+ def collect(dispatcher: MessageDispatcher): DispatcherMetricsMeasurement = {
+ dispatcher match {
+ case x: Dispatcher ⇒ {
+ val executor = executorServiceMethod.invoke(x) match {
+ case delegate: ExecutorServiceDelegate ⇒ delegate.executor
+ case other ⇒ other
+ }
+
+ executor match {
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
+ case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ }
+ }
+ case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ }
+ }
+}