aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-06-07 17:03:56 -0300
committerDiego <diegolparra@gmail.com>2014-06-07 17:03:56 -0300
commit9ca0bffb230baab4d79f30ceabfcb453387fe952 (patch)
tree19df3bf9fc393d6b027fddec22a1990ced07ce45
parentc985fe9b26d7fd91404b887a8c1a851fbf478208 (diff)
downloadKamon-9ca0bffb230baab4d79f30ceabfcb453387fe952.tar.gz
Kamon-9ca0bffb230baab4d79f30ceabfcb453387fe952.tar.bz2
Kamon-9ca0bffb230baab4d79f30ceabfcb453387fe952.zip
= core : fixes #38
* I've changed the way to get the Actorsystem inside of DispatcerTracing, passing the actorSystem across the Dispatchers and then to each Dispatcher in the same ActorSystem.
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala53
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala12
2 files changed, 41 insertions, 24 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
index 4918fd04..69b78e5e 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/DispatcherTracing.scala
@@ -17,11 +17,11 @@
package akka.instrumentation
import org.aspectj.lang.annotation._
-import akka.dispatch.{ExecutorServiceDelegate, Dispatcher, MessageDispatcher}
-import kamon.metrics.{Metrics, DispatcherMetrics}
+import akka.dispatch.{ Dispatchers, ExecutorServiceDelegate, Dispatcher, MessageDispatcher }
+import kamon.metrics.{ Metrics, DispatcherMetrics }
import kamon.metrics.DispatcherMetrics.DispatcherMetricRecorder
import kamon.Kamon
-import akka.actor.{Cancellable, ActorSystemImpl}
+import akka.actor.{ Cancellable, ActorSystemImpl }
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.ThreadPoolExecutor
import java.lang.reflect.Method
@@ -30,14 +30,24 @@ import akka.instrumentation.DispatcherMetricsCollector.DispatcherMetricsMeasurem
@Aspect
class DispatcherTracing {
- private[this] var actorSystem: ActorSystemImpl = _
+ @Pointcut("execution(akka.dispatch.Dispatchers.new(..)) && this(dispatchers) && cflow(execution(akka.actor.ActorSystemImpl.new(..)) && this(system))")
+ def onActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl) = {}
- @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && this(system)")
- def onActorSystemStartup(system: ActorSystemImpl) = {}
+ @Before("onActorSystemStartup(dispatchers, system)")
+ def beforeActorSystemStartup(dispatchers: Dispatchers, system: ActorSystemImpl): Unit = {
+ val currentDispatchers = dispatchers.asInstanceOf[DispatchersWithActorSystem]
+ currentDispatchers.actorSystem = system
+ }
+
+ @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers)")
+ def onDispatchersLookup(dispatchers: Dispatchers) = {}
+
+ @AfterReturning(pointcut = "onDispatchersLookup(dispatchers)", returning = "dispatcher")
+ def afterReturningLookup(dispatchers: Dispatchers, dispatcher: Dispatcher): Unit = {
+ val dispatchersWithActorSystem = dispatchers.asInstanceOf[DispatchersWithActorSystem]
+ val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
- @Before("onActorSystemStartup(system)")
- def beforeActorSystemStartup(system: ActorSystemImpl): Unit = {
- actorSystem = system
+ dispatcherWithMetrics.actorSystem = dispatchersWithActorSystem.actorSystem
}
@Pointcut("call(* akka.dispatch.ExecutorServiceFactory.createExecutorService(..))")
@@ -52,9 +62,9 @@ class DispatcherTracing {
@After("onDispatcherStartup(dispatcher)")
def afterDispatcherStartup(dispatcher: MessageDispatcher): Unit = {
- val metricsExtension = Kamon(Metrics)(actorSystem)
- val metricIdentity = DispatcherMetrics(dispatcher.id)
val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
+ val metricsExtension = Kamon(Metrics)(dispatcherWithMetrics.actorSystem)
+ val metricIdentity = DispatcherMetrics(dispatcher.id)
dispatcherWithMetrics.metricIdentity = metricIdentity
dispatcherWithMetrics.dispatcherMetricsRecorder = metricsExtension.register(metricIdentity, DispatcherMetrics.Factory)
@@ -83,13 +93,12 @@ class DispatcherTracing {
val dispatcherWithMetrics = dispatcher.asInstanceOf[DispatcherMessageMetrics]
dispatcherWithMetrics.dispatcherMetricsRecorder.map {
- dispatcher =>
+ dispatcher ⇒
+ println("dispatcher down => " + dispatcher)
dispatcherWithMetrics.dispatcherCollectorCancellable.cancel()
- Kamon(Metrics)(actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
+ Kamon(Metrics)(dispatcherWithMetrics.actorSystem).unregister(dispatcherWithMetrics.metricIdentity)
}
}
-
-
}
@Aspect
@@ -97,12 +106,20 @@ class DispatcherMetricsMixin {
@DeclareMixin("akka.dispatch.Dispatcher")
def mixinDispatcherMetricsToMessageDispatcher: DispatcherMessageMetrics = new DispatcherMessageMetrics {}
+
+ @DeclareMixin("akka.dispatch.Dispatchers")
+ def mixinDispatchersToDispatchersWithActorSystem: DispatchersWithActorSystem = new DispatchersWithActorSystem {}
}
trait DispatcherMessageMetrics {
var metricIdentity: DispatcherMetrics = _
var dispatcherMetricsRecorder: Option[DispatcherMetricRecorder] = _
var dispatcherCollectorCancellable: Cancellable = _
+ var actorSystem: ActorSystemImpl = _
+}
+
+trait DispatchersWithActorSystem {
+ var actorSystem: ActorSystemImpl = _
}
object DispatcherMetricsCollector {
@@ -130,13 +147,13 @@ object DispatcherMetricsCollector {
case x: Dispatcher ⇒ {
val executor = executorServiceMethod.invoke(x) match {
case delegate: ExecutorServiceDelegate ⇒ delegate.executor
- case other ⇒ other
+ case other ⇒ other
}
executor match {
- case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
+ case fjp: ForkJoinPool ⇒ collectForkJoinMetrics(fjp)
case tpe: ThreadPoolExecutor ⇒ collectExecutorMetrics(tpe)
- case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
+ case anything ⇒ DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
}
}
case _ ⇒ new DispatcherMetricsMeasurement(0L, 0L, 0L, 0L)
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index de4dc140..5b7ca471 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -16,15 +16,15 @@
package kamon.datadog
-import akka.actor.{ActorSystem, Props, ActorRef, Actor}
-import akka.io.{Udp, IO}
+import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
+import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType}
+import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
import java.text.DecimalFormat
-import kamon.metrics.{MetricIdentity, MetricGroupIdentity}
+import kamon.metrics.{ MetricIdentity, MetricGroupIdentity }
class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
@@ -77,8 +77,8 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
instrumentType match {
case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
+ case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
+ case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
}
}