aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-08-29 02:33:04 -0300
committerDiego <diegolparra@gmail.com>2014-08-29 02:33:04 -0300
commitdc5c3fb65085af225464768e0e672075713135d6 (patch)
tree21ef78c2aeb0601f93a8d868e4f2a810845c173e /kamon-core
parent2b56ce068e928de6ecae993e5038ef33394712aa (diff)
downloadKamon-dc5c3fb65085af225464768e0e672075713135d6.tar.gz
Kamon-dc5c3fb65085af225464768e0e672075713135d6.tar.bz2
Kamon-dc5c3fb65085af225464768e0e672075713135d6.zip
+ core: provide metrics for routers
* processing-time * errors * time-in-mailbox closes #62
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/reference.conf11
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala62
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala143
-rw-r--r--kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala85
-rw-r--r--kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala163
5 files changed, 457 insertions, 7 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index b3df73bf..ace05e87 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -50,6 +50,12 @@ kamon {
}
},
{
+ router {
+ includes = []
+ excludes = [ "system/*", "user/IO-*" ]
+ }
+ },
+ {
trace {
includes = [ "*" ]
excludes = []
@@ -88,6 +94,11 @@ kamon {
mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision}
}
+ router {
+ processing-time = ${kamon.metrics.precision.default-histogram-precision}
+ time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision}
+ }
+
trace {
elapsed-time = ${kamon.metrics.precision.default-histogram-precision}
segment = ${kamon.metrics.precision.default-histogram-precision}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
index 235f5143..f6b68617 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
@@ -18,9 +18,11 @@ package akka.instrumentation
import akka.actor._
import akka.dispatch.{ Envelope, MessageDispatcher }
+import akka.routing.RoutedActorCell
import kamon.Kamon
import kamon.metric.ActorMetrics.ActorMetricsRecorder
-import kamon.metric.{ ActorMetrics, Metrics }
+import kamon.metric.RouterMetrics.RouterMetricsRecorder
+import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics }
import kamon.trace._
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
@@ -28,6 +30,8 @@ import org.aspectj.lang.annotation._
@Aspect
class ActorCellInstrumentation {
+ import ActorCellInstrumentation.PimpedActorCellMetrics
+
@Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
@@ -38,8 +42,14 @@ class ActorCellInstrumentation {
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.metricIdentity = metricIdentity
+ cellWithMetrics.actorMetricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}")
+ routedActorCell.routerMetricIdentity = routerMetricIdentity
+ routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory)
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -47,9 +57,9 @@ class ActorCellInstrumentation {
@Around("invokingActorBehaviourAtActorCell(cell, envelope)")
def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+ val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
val timestampBeforeProcessing = System.nanoTime()
val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
try {
TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
@@ -58,9 +68,23 @@ class ActorCellInstrumentation {
} finally {
cellWithMetrics.actorMetricsRecorder.map {
am ⇒
- am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
- am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
+ val processingTime = System.nanoTime() - timestampBeforeProcessing
+ val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
+
+ am.processingTime.record(processingTime)
+ am.timeInMailbox.record(timeInMailbox)
am.mailboxSize.decrement()
+
+ (processingTime, timeInMailbox)
+ } map {
+ case (processingTime, timeInMailbox) ⇒
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map {
+ rm ⇒
+ rm.processingTime.record(processingTime)
+ rm.timeInMailbox.record(timeInMailbox)
+ }
+ }
}
}
}
@@ -82,7 +106,13 @@ class ActorCellInstrumentation {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.actorMetricsRecorder.map { p ⇒
- Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity)
+ }
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map { rm ⇒
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity)
+ }
}
}
@@ -96,12 +126,21 @@ class ActorCellInstrumentation {
cellWithMetrics.actorMetricsRecorder.map {
am ⇒ am.errors.increment()
}
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map {
+ rm ⇒ rm.errors.increment()
+ }
+ }
}
+
}
trait ActorCellMetrics {
- var metricIdentity: ActorMetrics = _
+ var actorMetricIdentity: ActorMetrics = _
+ var routerMetricIdentity: RouterMetrics = _
var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
+ var routerMetricsRecorder: Option[RouterMetricsRecorder] = _
}
@Aspect
@@ -125,4 +164,13 @@ class TraceContextIntoEnvelopeMixin {
// Necessary to force the initialization of ContextAware at the moment of creation.
ctx.traceContext
}
+}
+
+object ActorCellInstrumentation {
+ implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) {
+ def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match {
+ case routedActorCell: RoutedActorCell ⇒ block(cell)
+ case everyThingElse ⇒
+ }
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala
new file mode 100644
index 00000000..f75080db
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala
@@ -0,0 +1,143 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package akka.instrumentation
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.dispatch.MessageDispatcher
+import akka.routing.RoutedActorCell
+import kamon.metric.RouterMetrics
+import kamon.metric.RouterMetrics.RouterMetricsRecorder
+import org.aspectj.lang.annotation._
+
+@Aspect
+class RoutedActorCellInstrumentation {
+
+ @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, routerProps, routerDispatcher, supervisor)")
+ def actorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef ): Unit = {}
+
+// @After("actorCellCreation(cell, system, ref, routerProps, routerDispatcher, supervisor)")
+ @After("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(*, ref, *, *, *, *)")
+// def afterCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef): Unit = {
+ def a(cell: RoutedActorCell, ref: ActorRef) = {
+
+ print("adf;kjaskadjlfaj"+ ref)
+// cell.router.routees
+// val metricsExtension = Kamon(Metrics)(system)
+// val metricIdentity = RouterMetrics(ref.path.elements.mkString("/"))
+// val cellWithMetrics = cell.asInstanceOf[RoutedActorCellMetrics]
+//
+// cellWithMetrics.metricIdentity = metricIdentity
+// cellWithMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory)
+ }
+}
+
+trait RoutedActorCellMetrics {
+ var metricIdentity: RouterMetrics = _
+ var routerMetricsRecorder: Option[RouterMetricsRecorder] = _
+}
+
+@Aspect
+class RoutedActorCellMetricsIntoRoutedActorCellMixin {
+
+ @DeclareMixin("akka.routing.RoutedActorCell")
+ def mixinRoutedActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {}
+}
+
+// @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
+// def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
+//
+// @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
+// def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+// val timestampBeforeProcessing = System.nanoTime()
+// val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+//
+// try {
+// TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
+// pjp.proceed()
+// }
+// } finally {
+// cellWithMetrics.actorMetricsRecorder.map {
+// am ⇒
+// am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
+// am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
+// am.mailboxSize.decrement()
+// }
+// }
+// }
+//
+// @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)")
+// def sendingMessageToActorCell(cell: ActorCell): Unit = {}
+//
+// @After("sendingMessageToActorCell(cell)")
+// def afterSendMessageToActorCell(cell: ActorCell): Unit = {
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+// cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment())
+// }
+//
+// @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+// def actorStop(cell: ActorCell): Unit = {}
+//
+// @After("actorStop(cell)")
+// def afterStop(cell: ActorCell): Unit = {
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+//
+// cellWithMetrics.actorMetricsRecorder.map { p ⇒
+// Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+// }
+// }
+//
+// @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
+// def actorInvokeFailure(cell: ActorCell): Unit = {}
+//
+// @Before("actorInvokeFailure(cell)")
+// def beforeInvokeFailure(cell: ActorCell): Unit = {
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+//
+// cellWithMetrics.actorMetricsRecorder.map {
+// am ⇒ am.errors.increment()
+// }
+// }
+//}
+//
+//trait ActorCellMetrics {
+// var metricIdentity: ActorMetrics = _
+// var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
+//}
+//
+//@Aspect
+//class ActorCellMetricsIntoActorCellMixin {
+//
+// @DeclareMixin("akka.actor.ActorCell")
+// def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {}
+//}
+//
+//@Aspect
+//class TraceContextIntoEnvelopeMixin {
+//
+// @DeclareMixin("akka.dispatch.Envelope")
+// def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
+//
+// @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
+// def envelopeCreation(ctx: TraceContextAware): Unit = {}
+//
+// @After("envelopeCreation(ctx)")
+// def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
+// // Necessary to force the initialization of ContextAware at the moment of creation.
+// ctx.traceContext
+// }
+//} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala
new file mode 100644
index 00000000..adb2a18b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala
@@ -0,0 +1,85 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package kamon.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+<<<<<<< Updated upstream
+import kamon.metric.instrument.{ Counter, Histogram }
+=======
+import kamon.metric.instrument.{Counter, Histogram}
+>>>>>>> Stashed changes
+
+case class RouterMetrics(name: String) extends MetricGroupIdentity {
+ val category = RouterMetrics
+}
+
+object RouterMetrics extends MetricGroupCategory {
+ val name = "router"
+
+ case object ProcessingTime extends MetricIdentity { val name = "processing-time" }
+ case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" }
+ case object Errors extends MetricIdentity { val name = "errors" }
+
+ case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): RouterMetricSnapshot =
+ RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context))
+
+ def cleanup: Unit = {
+ processingTime.cleanup
+ timeInMailbox.cleanup
+ errors.cleanup
+ }
+ }
+
+ case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot,
+<<<<<<< Updated upstream
+ errors: Counter.Snapshot) extends MetricGroupSnapshot {
+=======
+ errors: Counter.Snapshot) extends MetricGroupSnapshot {
+>>>>>>> Stashed changes
+
+ type GroupSnapshotType = RouterMetricSnapshot
+
+ def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot =
+ RouterMetricSnapshot(
+ processingTime.merge(that.processingTime, context),
+ timeInMailbox.merge(that.timeInMailbox, context),
+ errors.merge(that.errors, context))
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ ProcessingTime -> processingTime,
+ TimeInMailbox -> timeInMailbox,
+ Errors -> errors)
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = RouterMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): RouterMetricsRecorder = {
+ val settings = config.getConfig("precision.router")
+
+ val processingTimeConfig = settings.getConfig("processing-time")
+ val timeInMailboxConfig = settings.getConfig("time-in-mailbox")
+
+ new RouterMetricsRecorder(
+ Histogram.fromConfig(processingTimeConfig),
+ Histogram.fromConfig(timeInMailboxConfig),
+ Counter())
+ }
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala
new file mode 100644
index 00000000..37b3d169
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala
@@ -0,0 +1,163 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import java.nio.LongBuffer
+
+import akka.actor._
+import akka.routing.RoundRobinPool
+import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.RouterMetrics._
+import kamon.metric.RouterMetricsTestActor._
+import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.instrument.{ Counter, Histogram }
+import org.scalatest.{ Matchers, WordSpecLike }
+
+import scala.concurrent.duration._
+
+class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 second
+ | default-collection-context-buffer-size = 10
+ |
+ | filters = [
+ | {
+ | router {
+ | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ]
+ | excludes = [ "user/tracked-explicitly-excluded"]
+ | }
+ | }
+ | ]
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 3600000000000
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ """.stripMargin))
+
+ "the Kamon router metrics" should {
+ "respect the configured include and exclude filters" in new RouterMetricsFixtures {
+ createTestRouter("tracked-router")
+ createTestRouter("non-tracked-router")
+ createTestRouter("tracked-explicitly-excluded")
+
+ Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true)
+ expectMsgType[TickMetricSnapshot]
+
+ within(2 seconds) {
+ val tickSnapshot = expectMsgType[TickMetricSnapshot]
+ tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router"))
+ tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router"))
+ tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded"))
+ }
+ }
+
+ "record the processing-time of the receive function" in new RouterMetricsFixtures {
+ val metricsListener = TestProbe()
+ val trackedRouter = createTestRouter("measuring-processing-time")
+
+ trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref)
+ val timings = metricsListener.expectMsgType[RouterTrackedTimings]
+
+ val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics
+ tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L)
+ tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L)
+ // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos)
+ }
+
+ "record the number of errors" in new RouterMetricsFixtures {
+ val metricsListener = TestProbe()
+ val trackedRouter = createTestRouter("measuring-errors")
+
+ for (i ← 1 to 10) {
+ trackedRouter.tell(Fail, metricsListener.ref)
+ }
+ val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics
+ tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L)
+ }
+
+ "record the time-in-mailbox" in new RouterMetricsFixtures {
+ val metricsListener = TestProbe()
+ val trackedRouter = createTestRouter("measuring-time-in-mailbox")
+
+ trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref)
+ val timings = metricsListener.expectMsgType[RouterTrackedTimings]
+ val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics
+
+ tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L)
+ tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L)
+ tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos)
+ }
+
+ "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures {
+ val trackedRouter = createTestRouter("stop")
+ trackedRouter ! Ping
+ Kamon(Metrics).storage.toString() // force to be initialized
+ Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty
+
+ val deathWatcher = TestProbe()
+ deathWatcher.watch(trackedRouter)
+ trackedRouter ! PoisonPill
+ deathWatcher.expectTerminated(trackedRouter)
+
+ Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty
+ }
+ }
+
+ trait RouterMetricsFixtures {
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
+ }
+
+ def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name)
+
+ def takeSnapshotOf(amr: RouterMetricsRecorder): RouterMetricSnapshot = amr.collect(collectionContext)
+ }
+}
+
+class RouterMetricsTestActor extends Actor {
+ def receive = {
+ case Discard ⇒
+ case Fail ⇒ 1 / 0
+ case Ping ⇒ sender ! Pong
+ case RouterTrackTimings(sendTimestamp, sleep) ⇒ {
+ val dequeueTimestamp = System.nanoTime()
+ sleep.map(s ⇒ Thread.sleep(s.toMillis))
+ val afterReceiveTimestamp = System.nanoTime()
+
+ sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp)
+ }
+ }
+}
+
+object RouterMetricsTestActor {
+ case object Ping
+ case object Pong
+ case object Fail
+ case object Discard
+
+ case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None)
+ case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) {
+ def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp
+ def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp
+ }
+}