From dc5c3fb65085af225464768e0e672075713135d6 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 29 Aug 2014 02:33:04 -0300 Subject: + core: provide metrics for routers * processing-time * errors * time-in-mailbox closes #62 --- kamon-core/src/main/resources/reference.conf | 11 ++ .../akka/ActorCellInstrumentation.scala | 62 +++++++- .../akka/RoutedActorCellInstrumentation.scala | 143 ++++++++++++++++++ .../main/scala/kamon/metric/RouterMetrics.scala | 85 +++++++++++ .../scala/kamon/metric/RouterMetricsSpec.scala | 163 +++++++++++++++++++++ .../src/main/resources/application.conf | 13 +- .../main/scala/test/SimpleRequestProcessor.scala | 40 ++++- 7 files changed, 507 insertions(+), 10 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala create mode 100644 kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index b3df73bf..ace05e87 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -49,6 +49,12 @@ kamon { excludes = [ "system/*", "user/IO-*" ] } }, + { + router { + includes = [] + excludes = [ "system/*", "user/IO-*" ] + } + }, { trace { includes = [ "*" ] @@ -88,6 +94,11 @@ kamon { mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} } + router { + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + } + trace { elapsed-time = ${kamon.metrics.precision.default-histogram-precision} segment = ${kamon.metrics.precision.default-histogram-precision} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 235f5143..f6b68617 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -18,9 +18,11 @@ package akka.instrumentation import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell import kamon.Kamon import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics } +import kamon.metric.RouterMetrics.RouterMetricsRecorder +import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -28,6 +30,8 @@ import org.aspectj.lang.annotation._ @Aspect class ActorCellInstrumentation { + import ActorCellInstrumentation.PimpedActorCellMetrics + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} @@ -38,8 +42,14 @@ class ActorCellInstrumentation { val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.metricIdentity = metricIdentity + cellWithMetrics.actorMetricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") + routedActorCell.routerMetricIdentity = routerMetricIdentity + routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) + } } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -47,9 +57,9 @@ class ActorCellInstrumentation { @Around("invokingActorBehaviourAtActorCell(cell, envelope)") def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] val timestampBeforeProcessing = System.nanoTime() val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] try { TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { @@ -58,9 +68,23 @@ class ActorCellInstrumentation { } finally { cellWithMetrics.actorMetricsRecorder.map { am ⇒ - am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + + am.processingTime.record(processingTime) + am.timeInMailbox.record(timeInMailbox) am.mailboxSize.decrement() + + (processingTime, timeInMailbox) + } map { + case (processingTime, timeInMailbox) ⇒ + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { + rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) + } + } } } } @@ -82,7 +106,13 @@ class ActorCellInstrumentation { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) + } + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { rm ⇒ + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) + } } } @@ -96,12 +126,21 @@ class ActorCellInstrumentation { cellWithMetrics.actorMetricsRecorder.map { am ⇒ am.errors.increment() } + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { + rm ⇒ rm.errors.increment() + } + } } + } trait ActorCellMetrics { - var metricIdentity: ActorMetrics = _ + var actorMetricIdentity: ActorMetrics = _ + var routerMetricIdentity: RouterMetrics = _ var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ } @Aspect @@ -125,4 +164,13 @@ class TraceContextIntoEnvelopeMixin { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } +} + +object ActorCellInstrumentation { + implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { + def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match { + case routedActorCell: RoutedActorCell ⇒ block(cell) + case everyThingElse ⇒ + } + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala new file mode 100644 index 00000000..f75080db --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala @@ -0,0 +1,143 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.dispatch.MessageDispatcher +import akka.routing.RoutedActorCell +import kamon.metric.RouterMetrics +import kamon.metric.RouterMetrics.RouterMetricsRecorder +import org.aspectj.lang.annotation._ + +@Aspect +class RoutedActorCellInstrumentation { + + @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, routerProps, routerDispatcher, supervisor)") + def actorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef ): Unit = {} + +// @After("actorCellCreation(cell, system, ref, routerProps, routerDispatcher, supervisor)") + @After("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(*, ref, *, *, *, *)") +// def afterCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef): Unit = { + def a(cell: RoutedActorCell, ref: ActorRef) = { + + print("adf;kjaskadjlfaj"+ ref) +// cell.router.routees +// val metricsExtension = Kamon(Metrics)(system) +// val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) +// val cellWithMetrics = cell.asInstanceOf[RoutedActorCellMetrics] +// +// cellWithMetrics.metricIdentity = metricIdentity +// cellWithMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) + } +} + +trait RoutedActorCellMetrics { + var metricIdentity: RouterMetrics = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ +} + +@Aspect +class RoutedActorCellMetricsIntoRoutedActorCellMixin { + + @DeclareMixin("akka.routing.RoutedActorCell") + def mixinRoutedActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} +} + +// @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") +// def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} +// +// @Around("invokingActorBehaviourAtActorCell(cell, envelope)") +// def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { +// val timestampBeforeProcessing = System.nanoTime() +// val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// +// try { +// TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { +// pjp.proceed() +// } +// } finally { +// cellWithMetrics.actorMetricsRecorder.map { +// am ⇒ +// am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) +// am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) +// am.mailboxSize.decrement() +// } +// } +// } +// +// @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") +// def sendingMessageToActorCell(cell: ActorCell): Unit = {} +// +// @After("sendingMessageToActorCell(cell)") +// def afterSendMessageToActorCell(cell: ActorCell): Unit = { +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) +// } +// +// @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") +// def actorStop(cell: ActorCell): Unit = {} +// +// @After("actorStop(cell)") +// def afterStop(cell: ActorCell): Unit = { +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// +// cellWithMetrics.actorMetricsRecorder.map { p ⇒ +// Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) +// } +// } +// +// @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") +// def actorInvokeFailure(cell: ActorCell): Unit = {} +// +// @Before("actorInvokeFailure(cell)") +// def beforeInvokeFailure(cell: ActorCell): Unit = { +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// +// cellWithMetrics.actorMetricsRecorder.map { +// am ⇒ am.errors.increment() +// } +// } +//} +// +//trait ActorCellMetrics { +// var metricIdentity: ActorMetrics = _ +// var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ +//} +// +//@Aspect +//class ActorCellMetricsIntoActorCellMixin { +// +// @DeclareMixin("akka.actor.ActorCell") +// def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} +//} +// +//@Aspect +//class TraceContextIntoEnvelopeMixin { +// +// @DeclareMixin("akka.dispatch.Envelope") +// def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default +// +// @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") +// def envelopeCreation(ctx: TraceContextAware): Unit = {} +// +// @After("envelopeCreation(ctx)") +// def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { +// // Necessary to force the initialization of ContextAware at the moment of creation. +// ctx.traceContext +// } +//} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala new file mode 100644 index 00000000..adb2a18b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala @@ -0,0 +1,85 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +<<<<<<< Updated upstream +import kamon.metric.instrument.{ Counter, Histogram } +======= +import kamon.metric.instrument.{Counter, Histogram} +>>>>>>> Stashed changes + +case class RouterMetrics(name: String) extends MetricGroupIdentity { + val category = RouterMetrics +} + +object RouterMetrics extends MetricGroupCategory { + val name = "router" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, +<<<<<<< Updated upstream + errors: Counter.Snapshot) extends MetricGroupSnapshot { +======= + errors: Counter.Snapshot) extends MetricGroupSnapshot { +>>>>>>> Stashed changes + + type GroupSnapshotType = RouterMetricSnapshot + + def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + ProcessingTime -> processingTime, + TimeInMailbox -> timeInMailbox, + Errors -> errors) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = RouterMetricsRecorder + + def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { + val settings = config.getConfig("precision.router") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + + new RouterMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + Counter()) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala new file mode 100644 index 00000000..37b3d169 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala @@ -0,0 +1,163 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.LongBuffer + +import akka.actor._ +import akka.routing.RoundRobinPool +import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.RouterMetrics._ +import kamon.metric.RouterMetricsTestActor._ +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.instrument.{ Counter, Histogram } +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 second + | default-collection-context-buffer-size = 10 + | + | filters = [ + | { + | router { + | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ] + | excludes = [ "user/tracked-explicitly-excluded"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the Kamon router metrics" should { + "respect the configured include and exclude filters" in new RouterMetricsFixtures { + createTestRouter("tracked-router") + createTestRouter("non-tracked-router") + createTestRouter("tracked-explicitly-excluded") + + Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true) + expectMsgType[TickMetricSnapshot] + + within(2 seconds) { + val tickSnapshot = expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router")) + tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router")) + tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded")) + } + } + + "record the processing-time of the receive function" in new RouterMetricsFixtures { + val metricsListener = TestProbe() + val trackedRouter = createTestRouter("measuring-processing-time") + + trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) + val timings = metricsListener.expectMsgType[RouterTrackedTimings] + + val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics + tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) + tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) + // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the number of errors" in new RouterMetricsFixtures { + val metricsListener = TestProbe() + val trackedRouter = createTestRouter("measuring-errors") + + for (i ← 1 to 10) { + trackedRouter.tell(Fail, metricsListener.ref) + } + val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics + tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L) + } + + "record the time-in-mailbox" in new RouterMetricsFixtures { + val metricsListener = TestProbe() + val trackedRouter = createTestRouter("measuring-time-in-mailbox") + + trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) + val timings = metricsListener.expectMsgType[RouterTrackedTimings] + val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics + + tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) + tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) + tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + + "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures { + val trackedRouter = createTestRouter("stop") + trackedRouter ! Ping + Kamon(Metrics).storage.toString() // force to be initialized + Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty + + val deathWatcher = TestProbe() + deathWatcher.watch(trackedRouter) + trackedRouter ! PoisonPill + deathWatcher.expectTerminated(trackedRouter) + + Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty + } + } + + trait RouterMetricsFixtures { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name) + + def takeSnapshotOf(amr: RouterMetricsRecorder): RouterMetricSnapshot = amr.collect(collectionContext) + } +} + +class RouterMetricsTestActor extends Actor { + def receive = { + case Discard ⇒ + case Fail ⇒ 1 / 0 + case Ping ⇒ sender ! Pong + case RouterTrackTimings(sendTimestamp, sleep) ⇒ { + val dequeueTimestamp = System.nanoTime() + sleep.map(s ⇒ Thread.sleep(s.toMillis)) + val afterReceiveTimestamp = System.nanoTime() + + sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) + } + } +} + +object RouterMetricsTestActor { + case object Ping + case object Pong + case object Fail + case object Discard + + case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) + case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { + def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp + def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp + } +} diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 5d96c7f6..a870dc6b 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,8 +1,11 @@ akka { loglevel = INFO - extensions = ["kamon.logreporter.LogReporter", "kamon.system.SystemMetrics"] +<<<<<<< Updated upstream +======= + extensions = ["kamon.logreporter.LogReporter"] +>>>>>>> Stashed changes actor { debug { unhandled = on @@ -51,6 +54,12 @@ kamon { includes = [ "*" ] excludes = [ ] } + }, + { + router { + includes = [ "user/replier" ] + excludes = [ "system/*", "user/IO-*" ] + } } ] @@ -86,7 +95,7 @@ kamon { } weaver { - showWeaveInfo = off + showWeaveInfo = on verbose = off debug = off showWarn = off diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 301a9bbd..58a1478c 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -23,7 +23,11 @@ import spray.httpx.RequestBuilding import scala.concurrent.{ Await, Future } import kamon.spray.KamonTraceDirectives import scala.util.Random -import akka.routing.RoundRobinPool +<<<<<<< Updated upstream +import akka.routing.{ RoundRobinRoutingLogic, Router, ActorRefRoutee, RoundRobinPool } +======= +import akka.routing.{RoundRobinRoutingLogic, Router, ActorRefRoutee, RoundRobinPool} +>>>>>>> Stashed changes import kamon.trace.TraceRecorder import kamon.Kamon import kamon.metric._ @@ -65,7 +69,15 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil //Kamon(UserMetrics).registerGauge("test-gauge")(() => 10L) val pipeline = sendReceive +<<<<<<< Updated upstream + val replier = system.actorOf(Props[Replier].withRouter(RoundRobinPool(nrOfInstances = 4)), "replier") + val master = system.actorOf(Props[Replier].withRouter(RoundRobinPool(nrOfInstances = 7)), "master") + +======= val replier = system.actorOf(Props[Replier].withRouter(RoundRobinPool(nrOfInstances = 2)), "replier") + val master = system.actorOf(Props[Master], "Master") + master ! Work() +>>>>>>> Stashed changes val random = new Random() startServer(interface = "localhost", port = 9090) { @@ -98,7 +110,12 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil path("ok") { traceName("OK") { complete { +<<<<<<< Updated upstream + (master ? "Ok").mapTo[String] +======= + master ! Work() "ok" +>>>>>>> Stashed changes } } } ~ @@ -127,6 +144,27 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil } +case class Work() +class Master extends Actor { + var router = { + val routees = Vector.fill(5) { + val r = context.actorOf(Props[PrintWhatever]) + context.watch(r) + ActorRefRoutee(r) + } + Router(RoundRobinRoutingLogic(), routees) + } + def receive = { + case w: Work => + router.route(w, sender()) + case Terminated(a) => + router = router.removeRoutee(a) + val r = context.actorOf(Props[PrintWhatever]) + context watch r + router = router.addRoutee(r) + } +} + class PrintWhatever extends Actor { def receive = { case TickMetricSnapshot(from, to, metrics) ⇒ -- cgit v1.2.3