aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-08-29 02:33:04 -0300
committerDiego <diegolparra@gmail.com>2014-08-29 02:33:04 -0300
commit50d89e2a25b331e953a03ad8d91a18b9e8c0b121 (patch)
tree417637c3ac48d7afc754e5eeadd3a7067275bde0 /kamon-core/src/main
parent5ead4817f1bdfbfe6c46d9c70fd08a69623d90ac (diff)
downloadKamon-50d89e2a25b331e953a03ad8d91a18b9e8c0b121.tar.gz
Kamon-50d89e2a25b331e953a03ad8d91a18b9e8c0b121.tar.bz2
Kamon-50d89e2a25b331e953a03ad8d91a18b9e8c0b121.zip
+ core: provide metrics for routers
* processing-time * errors * time-in-mailbox closes #62
Diffstat (limited to 'kamon-core/src/main')
-rw-r--r--kamon-core/src/main/resources/reference.conf11
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala62
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala143
-rw-r--r--kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala85
4 files changed, 294 insertions, 7 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index b3df73bf..ace05e87 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -50,6 +50,12 @@ kamon {
}
},
{
+ router {
+ includes = []
+ excludes = [ "system/*", "user/IO-*" ]
+ }
+ },
+ {
trace {
includes = [ "*" ]
excludes = []
@@ -88,6 +94,11 @@ kamon {
mailbox-size = ${kamon.metrics.precision.default-min-max-counter-precision}
}
+ router {
+ processing-time = ${kamon.metrics.precision.default-histogram-precision}
+ time-in-mailbox = ${kamon.metrics.precision.default-histogram-precision}
+ }
+
trace {
elapsed-time = ${kamon.metrics.precision.default-histogram-precision}
segment = ${kamon.metrics.precision.default-histogram-precision}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
index 235f5143..f6b68617 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
@@ -18,9 +18,11 @@ package akka.instrumentation
import akka.actor._
import akka.dispatch.{ Envelope, MessageDispatcher }
+import akka.routing.RoutedActorCell
import kamon.Kamon
import kamon.metric.ActorMetrics.ActorMetricsRecorder
-import kamon.metric.{ ActorMetrics, Metrics }
+import kamon.metric.RouterMetrics.RouterMetricsRecorder
+import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics }
import kamon.trace._
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
@@ -28,6 +30,8 @@ import org.aspectj.lang.annotation._
@Aspect
class ActorCellInstrumentation {
+ import ActorCellInstrumentation.PimpedActorCellMetrics
+
@Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
@@ -38,8 +42,14 @@ class ActorCellInstrumentation {
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.metricIdentity = metricIdentity
+ cellWithMetrics.actorMetricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}")
+ routedActorCell.routerMetricIdentity = routerMetricIdentity
+ routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory)
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -47,9 +57,9 @@ class ActorCellInstrumentation {
@Around("invokingActorBehaviourAtActorCell(cell, envelope)")
def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+ val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
val timestampBeforeProcessing = System.nanoTime()
val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
try {
TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
@@ -58,9 +68,23 @@ class ActorCellInstrumentation {
} finally {
cellWithMetrics.actorMetricsRecorder.map {
am ⇒
- am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
- am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
+ val processingTime = System.nanoTime() - timestampBeforeProcessing
+ val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
+
+ am.processingTime.record(processingTime)
+ am.timeInMailbox.record(timeInMailbox)
am.mailboxSize.decrement()
+
+ (processingTime, timeInMailbox)
+ } map {
+ case (processingTime, timeInMailbox) ⇒
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map {
+ rm ⇒
+ rm.processingTime.record(processingTime)
+ rm.timeInMailbox.record(timeInMailbox)
+ }
+ }
}
}
}
@@ -82,7 +106,13 @@ class ActorCellInstrumentation {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.actorMetricsRecorder.map { p ⇒
- Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity)
+ }
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map { rm ⇒
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity)
+ }
}
}
@@ -96,12 +126,21 @@ class ActorCellInstrumentation {
cellWithMetrics.actorMetricsRecorder.map {
am ⇒ am.errors.increment()
}
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map {
+ rm ⇒ rm.errors.increment()
+ }
+ }
}
+
}
trait ActorCellMetrics {
- var metricIdentity: ActorMetrics = _
+ var actorMetricIdentity: ActorMetrics = _
+ var routerMetricIdentity: RouterMetrics = _
var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
+ var routerMetricsRecorder: Option[RouterMetricsRecorder] = _
}
@Aspect
@@ -125,4 +164,13 @@ class TraceContextIntoEnvelopeMixin {
// Necessary to force the initialization of ContextAware at the moment of creation.
ctx.traceContext
}
+}
+
+object ActorCellInstrumentation {
+ implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) {
+ def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match {
+ case routedActorCell: RoutedActorCell ⇒ block(cell)
+ case everyThingElse ⇒
+ }
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala
new file mode 100644
index 00000000..f75080db
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/RoutedActorCellInstrumentation.scala
@@ -0,0 +1,143 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package akka.instrumentation
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.dispatch.MessageDispatcher
+import akka.routing.RoutedActorCell
+import kamon.metric.RouterMetrics
+import kamon.metric.RouterMetrics.RouterMetricsRecorder
+import org.aspectj.lang.annotation._
+
+@Aspect
+class RoutedActorCellInstrumentation {
+
+ @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, routerProps, routerDispatcher, supervisor)")
+ def actorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef ): Unit = {}
+
+// @After("actorCellCreation(cell, system, ref, routerProps, routerDispatcher, supervisor)")
+ @After("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(*, ref, *, *, *, *)")
+// def afterCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, routerProps: Props, routerDispatcher: MessageDispatcher, supervisor: ActorRef): Unit = {
+ def a(cell: RoutedActorCell, ref: ActorRef) = {
+
+ print("adf;kjaskadjlfaj"+ ref)
+// cell.router.routees
+// val metricsExtension = Kamon(Metrics)(system)
+// val metricIdentity = RouterMetrics(ref.path.elements.mkString("/"))
+// val cellWithMetrics = cell.asInstanceOf[RoutedActorCellMetrics]
+//
+// cellWithMetrics.metricIdentity = metricIdentity
+// cellWithMetrics.routerMetricsRecorder = metricsExtension.register(metricIdentity, RouterMetrics.Factory)
+ }
+}
+
+trait RoutedActorCellMetrics {
+ var metricIdentity: RouterMetrics = _
+ var routerMetricsRecorder: Option[RouterMetricsRecorder] = _
+}
+
+@Aspect
+class RoutedActorCellMetricsIntoRoutedActorCellMixin {
+
+ @DeclareMixin("akka.routing.RoutedActorCell")
+ def mixinRoutedActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {}
+}
+
+// @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
+// def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
+//
+// @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
+// def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+// val timestampBeforeProcessing = System.nanoTime()
+// val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+//
+// try {
+// TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
+// pjp.proceed()
+// }
+// } finally {
+// cellWithMetrics.actorMetricsRecorder.map {
+// am ⇒
+// am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
+// am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
+// am.mailboxSize.decrement()
+// }
+// }
+// }
+//
+// @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)")
+// def sendingMessageToActorCell(cell: ActorCell): Unit = {}
+//
+// @After("sendingMessageToActorCell(cell)")
+// def afterSendMessageToActorCell(cell: ActorCell): Unit = {
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+// cellWithMetrics.actorMetricsRecorder.map(am ⇒ am.mailboxSize.increment())
+// }
+//
+// @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+// def actorStop(cell: ActorCell): Unit = {}
+//
+// @After("actorStop(cell)")
+// def afterStop(cell: ActorCell): Unit = {
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+//
+// cellWithMetrics.actorMetricsRecorder.map { p ⇒
+// Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+// }
+// }
+//
+// @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
+// def actorInvokeFailure(cell: ActorCell): Unit = {}
+//
+// @Before("actorInvokeFailure(cell)")
+// def beforeInvokeFailure(cell: ActorCell): Unit = {
+// val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+//
+// cellWithMetrics.actorMetricsRecorder.map {
+// am ⇒ am.errors.increment()
+// }
+// }
+//}
+//
+//trait ActorCellMetrics {
+// var metricIdentity: ActorMetrics = _
+// var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
+//}
+//
+//@Aspect
+//class ActorCellMetricsIntoActorCellMixin {
+//
+// @DeclareMixin("akka.actor.ActorCell")
+// def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {}
+//}
+//
+//@Aspect
+//class TraceContextIntoEnvelopeMixin {
+//
+// @DeclareMixin("akka.dispatch.Envelope")
+// def mixinTraceContextAwareToEnvelope: TraceContextAware = TraceContextAware.default
+//
+// @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
+// def envelopeCreation(ctx: TraceContextAware): Unit = {}
+//
+// @After("envelopeCreation(ctx)")
+// def afterEnvelopeCreation(ctx: TraceContextAware): Unit = {
+// // Necessary to force the initialization of ContextAware at the moment of creation.
+// ctx.traceContext
+// }
+//} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala
new file mode 100644
index 00000000..adb2a18b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/RouterMetrics.scala
@@ -0,0 +1,85 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package kamon.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+<<<<<<< Updated upstream
+import kamon.metric.instrument.{ Counter, Histogram }
+=======
+import kamon.metric.instrument.{Counter, Histogram}
+>>>>>>> Stashed changes
+
+case class RouterMetrics(name: String) extends MetricGroupIdentity {
+ val category = RouterMetrics
+}
+
+object RouterMetrics extends MetricGroupCategory {
+ val name = "router"
+
+ case object ProcessingTime extends MetricIdentity { val name = "processing-time" }
+ case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" }
+ case object Errors extends MetricIdentity { val name = "errors" }
+
+ case class RouterMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, errors: Counter) extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): RouterMetricSnapshot =
+ RouterMetricSnapshot(processingTime.collect(context), timeInMailbox.collect(context), errors.collect(context))
+
+ def cleanup: Unit = {
+ processingTime.cleanup
+ timeInMailbox.cleanup
+ errors.cleanup
+ }
+ }
+
+ case class RouterMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot,
+<<<<<<< Updated upstream
+ errors: Counter.Snapshot) extends MetricGroupSnapshot {
+=======
+ errors: Counter.Snapshot) extends MetricGroupSnapshot {
+>>>>>>> Stashed changes
+
+ type GroupSnapshotType = RouterMetricSnapshot
+
+ def merge(that: RouterMetricSnapshot, context: CollectionContext): RouterMetricSnapshot =
+ RouterMetricSnapshot(
+ processingTime.merge(that.processingTime, context),
+ timeInMailbox.merge(that.timeInMailbox, context),
+ errors.merge(that.errors, context))
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ ProcessingTime -> processingTime,
+ TimeInMailbox -> timeInMailbox,
+ Errors -> errors)
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = RouterMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): RouterMetricsRecorder = {
+ val settings = config.getConfig("precision.router")
+
+ val processingTimeConfig = settings.getConfig("processing-time")
+ val timeInMailboxConfig = settings.getConfig("time-in-mailbox")
+
+ new RouterMetricsRecorder(
+ Histogram.fromConfig(processingTimeConfig),
+ Histogram.fromConfig(timeInMailboxConfig),
+ Counter())
+ }
+ }
+}