From 50d89e2a25b331e953a03ad8d91a18b9e8c0b121 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 29 Aug 2014 02:33:04 -0300 Subject: + core: provide metrics for routers * processing-time * errors * time-in-mailbox closes #62 --- kamon-core/src/main/resources/reference.conf | 11 ++ .../akka/ActorCellInstrumentation.scala | 62 ++++++++- .../akka/RoutedActorCellInstrumentation.scala | 143 +++++++++++++++++++++ .../main/scala/kamon/metric/RouterMetrics.scala | 85 ++++++++++++ 4 files changed, 294 insertions(+), 7 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index b3df73bf..ace05e87 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -49,6 +49,12 @@ kamon { excludes = [ "system/*", "user/IO-*" ] } }, + { + router { + includes = [] + excludes = [ "system/*", "user/IO-*" ] + } + }, { trace { includes = [ "*" ] @@ -88,6 +94,11 @@ kamon { mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision} } + router { + processing-time = ${kamon.metrics.precision.default-histogram-precision} + time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision} + } + trace { elapsed-time = ${kamon.metrics.precision.default-histogram-precision} segment = ${kamon.metrics.precision.default-histogram-precision} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 235f5143..f6b68617 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -18,9 +18,11 @@ package akka.instrumentation import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell import kamon.Kamon import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics } +import kamon.metric.RouterMetrics.RouterMetricsRecorder +import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -28,6 +30,8 @@ import org.aspectj.lang.annotation._ @Aspect class ActorCellInstrumentation { + import ActorCellInstrumentation.PimpedActorCellMetrics + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} @@ -38,8 +42,14 @@ class ActorCellInstrumentation { val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.metricIdentity = metricIdentity + cellWithMetrics.actorMetricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") + routedActorCell.routerMetricIdentity = routerMetricIdentity + routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) + } } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -47,9 +57,9 @@ class ActorCellInstrumentation { @Around("invokingActorBehaviourAtActorCell(cell, envelope)") def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] val timestampBeforeProcessing = System.nanoTime() val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] try { TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { @@ -58,9 +68,23 @@ class ActorCellInstrumentation { } finally { cellWithMetrics.actorMetricsRecorder.map { am ⇒ - am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + + am.processingTime.record(processingTime) + am.timeInMailbox.record(timeInMailbox) am.mailboxSize.decrement() + + (processingTime, timeInMailbox) + } map { + case (processingTime, timeInMailbox) ⇒ + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { + rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) + } + } } } } @@ -82,7 +106,13 @@ class ActorCellInstrumentation { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) + } + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { rm ⇒ + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) + } } } @@ -96,12 +126,21 @@ class ActorCellInstrumentation { cellWithMetrics.actorMetricsRecorder.map { am ⇒ am.errors.increment() } + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { + rm ⇒ rm.errors.increment() + } + } } + } trait ActorCellMetrics { - var metricIdentity: ActorMetrics = _ + var actorMetricIdentity: ActorMetrics = _ + var routerMetricIdentity: RouterMetrics = _ var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ } @Aspect @@ -125,4 +164,13 @@ class TraceContextIntoEnvelopeMixin { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } +} + +object ActorCellInstrumentation { + implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { + def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match { + case routedActorCell: RoutedActorCell ⇒ block(cell) + case everyThingElse ⇒ + } + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala new file mode 100644 index 00000000..f75080db --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala @@ -0,0 +1,143 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package akka.instrumentation + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.dispatch.MessageDispatcher +import akka.routing.RoutedActorCell +import kamon.metric.RouterMetrics +import kamon.metric.RouterMetrics.RouterMetricsRecorder +import org.aspectj.lang.annotation._ + +@Aspect +class RoutedActorCellInstrumentation { + + @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, routerProps, routerDispatcher, supervisor)") + def actorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef ): Unit = {} + +// @After("actorCellCreation(cell, system, ref, routerProps, routerDispatcher, supervisor)") + @After("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(*, ref, *, *, *, *)") +// def afterCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef): Unit = { + def a(cell: RoutedActorCell, ref: ActorRef) = { + + print("adf;kjaskadjlfaj"+ ref) +// cell.router.routees +// val metricsExtension = Kamon(Metrics)(system) +// val metricIdentity = RouterMetrics(ref.path.elements.mkString("/")) +// val cellWithMetrics = cell.asInstanceOf[RoutedActorCellMetrics] +// +// cellWithMetrics.metricIdentity = metricIdentity +// cellWithMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory) + } +} + +trait RoutedActorCellMetrics { + var metricIdentity: RouterMetrics = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ +} + +@Aspect +class RoutedActorCellMetricsIntoRoutedActorCellMixin { + + @DeclareMixin("akka.routing.RoutedActorCell") + def mixinRoutedActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} +} + +// @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") +// def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} +// +// @Around("invokingActorBehaviourAtActorCell(cell, envelope)") +// def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { +// val timestampBeforeProcessing = System.nanoTime() +// val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// +// try { +// TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { +// pjp.proceed() +// } +// } finally { +// cellWithMetrics.actorMetricsRecorder.map { +// am ⇒ +// am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) +// am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) +// am.mailboxSize.decrement() +// } +// } +// } +// +// @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") +// def sendingMessageToActorCell(cell: ActorCell): Unit = {} +// +// @After("sendingMessageToActorCell(cell)") +// def afterSendMessageToActorCell(cell: ActorCell): Unit = { +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment()) +// } +// +// @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") +// def actorStop(cell: ActorCell): Unit = {} +// +// @After("actorStop(cell)") +// def afterStop(cell: ActorCell): Unit = { +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// +// cellWithMetrics.actorMetricsRecorder.map { p ⇒ +// Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) +// } +// } +// +// @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") +// def actorInvokeFailure(cell: ActorCell): Unit = {} +// +// @Before("actorInvokeFailure(cell)") +// def beforeInvokeFailure(cell: ActorCell): Unit = { +// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] +// +// cellWithMetrics.actorMetricsRecorder.map { +// am ⇒ am.errors.increment() +// } +// } +//} +// +//trait ActorCellMetrics { +// var metricIdentity: ActorMetrics = _ +// var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ +//} +// +//@Aspect +//class ActorCellMetricsIntoActorCellMixin { +// +// @DeclareMixin("akka.actor.ActorCell") +// def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} +//} +// +//@Aspect +//class TraceContextIntoEnvelopeMixin { +// +// @DeclareMixin("akka.dispatch.Envelope") +// def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default +// +// @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") +// def envelopeCreation(ctx: TraceContextAware): Unit = {} +// +// @After("envelopeCreation(ctx)") +// def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { +// // Necessary to force the initialization of ContextAware at the moment of creation. +// ctx.traceContext +// } +//} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala new file mode 100644 index 00000000..adb2a18b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala @@ -0,0 +1,85 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +<<<<<<< Updated upstream +import kamon.metric.instrument.{ Counter, Histogram } +======= +import kamon.metric.instrument.{Counter, Histogram} +>>>>>>> Stashed changes + +case class RouterMetrics(name: String) extends MetricGroupIdentity { + val category = RouterMetrics +} + +object RouterMetrics extends MetricGroupCategory { + val name = "router" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, +<<<<<<< Updated upstream + errors: Counter.Snapshot) extends MetricGroupSnapshot { +======= + errors: Counter.Snapshot) extends MetricGroupSnapshot { +>>>>>>> Stashed changes + + type GroupSnapshotType = RouterMetricSnapshot + + def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot = + RouterMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + ProcessingTime -> processingTime, + TimeInMailbox -> timeInMailbox, + Errors -> errors) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = RouterMetricsRecorder + + def create(config: Config, system: ActorSystem): RouterMetricsRecorder = { + val settings = config.getConfig("precision.router") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + + new RouterMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + Counter()) + } + } +} -- cgit v1.2.3