aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala245
1 files changed, 0 insertions, 245 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala
deleted file mode 100644
index cd0d55e9..00000000
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package akka.kamon.instrumentation
-
-import akka.actor._
-import akka.dispatch.{ Envelope, MessageDispatcher }
-import akka.routing.RoutedActorCell
-import kamon.Kamon
-import kamon.akka.{ ActorMetrics, RouterMetrics }
-import kamon.metric.{ Entity, MetricsModule }
-import kamon.trace._
-import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation._
-
-@Aspect
-class ActorCellInstrumentation {
-
- @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)")
- def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
-
- @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
- def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- def isRootSupervisor(path: String): Boolean = path.length == 0 || path == "user" || path == "system"
-
- val pathString = ref.path.elements.mkString("/")
- val actorEntity = Entity(system.name + "/" + pathString, ActorMetrics.category)
-
- if (!isRootSupervisor(pathString) && Kamon.metrics.shouldTrack(actorEntity)) {
- val actorMetricsRecorder = Kamon.metrics.entity(ActorMetrics, actorEntity)
- val cellMetrics = cell.asInstanceOf[ActorCellMetrics]
-
- cellMetrics.entity = actorEntity
- cellMetrics.recorder = Some(actorMetricsRecorder)
- cellMetrics.metrics = Kamon.metrics
- }
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)")
- def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
-
- @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
- def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
- val cellMetrics = cell.asInstanceOf[ActorCellMetrics]
- val timestampBeforeProcessing = System.nanoTime()
- val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware]
-
- try {
- Tracer.withContext(contextAndTimestamp.traceContext) {
- pjp.proceed()
- }
- } finally {
- val processingTime = System.nanoTime() - timestampBeforeProcessing
- val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
-
- cellMetrics.recorder.foreach { am ⇒
- am.processingTime.record(processingTime)
- am.timeInMailbox.record(timeInMailbox)
- am.mailboxSize.decrement()
- }
-
- // In case that this actor is behind a router, record the metrics for the router.
- envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.foreach { rm ⇒
- rm.processingTime.record(processingTime)
- rm.timeInMailbox.record(timeInMailbox)
- }
- }
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)")
- def sendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {}
-
- @After("sendMessageInActorCell(cell, envelope)")
- def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {
- val cellMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellMetrics.recorder.foreach { am ⇒
- am.mailboxSize.increment()
- }
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
- def actorStop(cell: ActorCell): Unit = {}
-
- @After("actorStop(cell)")
- def afterStop(cell: ActorCell): Unit = {
- val cellMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellMetrics.unsubscribe()
-
- // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here.
- if (cell.isInstanceOf[RoutedActorCell]) {
- val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics]
- routedCellMetrics.unsubscribe()
- }
- }
-
- @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
- def actorInvokeFailure(cell: ActorCell): Unit = {}
-
- @Before("actorInvokeFailure(cell)")
- def beforeInvokeFailure(cell: ActorCell): Unit = {
- val cellMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellMetrics.recorder.foreach { am ⇒
- am.errors.increment()
- }
-
- // In case that this actor is behind a router, count the errors for the router as well.
- val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope]
- if (envelope ne null) {
- // The ActorCell.handleInvokeFailure(..) method is also called when a failure occurs
- // while processing a system message, in which case ActorCell.currentMessage is always
- // null.
- envelope.routerMetricsRecorder.foreach { rm ⇒
- rm.errors.increment()
- }
- }
- }
-}
-
-@Aspect
-class RoutedActorCellInstrumentation {
-
- @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)")
- def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {}
-
- @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)")
- def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {
- val routerEntity = Entity(system.name + "/" + ref.path.elements.mkString("/"), RouterMetrics.category)
-
- if (Kamon.metrics.shouldTrack(routerEntity)) {
- val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics]
-
- cellMetrics.metrics = Kamon.metrics
- cellMetrics.routerEntity = routerEntity
- cellMetrics.routerRecorder = Some(Kamon.metrics.entity(RouterMetrics, routerEntity))
- }
- }
-
- @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)")
- def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {}
-
- @Around("sendMessageInRouterActorCell(cell, envelope)")
- def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = {
- val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics]
- val timestampBeforeProcessing = System.nanoTime()
- val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware]
-
- try {
- Tracer.withContext(contextAndTimestamp.traceContext) {
-
- // The router metrics recorder will only be picked up if the message is sent from a tracked router.
- RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerRecorder) {
- pjp.proceed()
- }
- }
- } finally {
- cellMetrics.routerRecorder.foreach { routerRecorder ⇒
- routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing)
- }
- }
- }
-}
-
-trait WithMetricModule {
- var metrics: MetricsModule = _
-}
-
-trait ActorCellMetrics extends WithMetricModule {
-
- var entity: Entity = _
- var recorder: Option[ActorMetrics] = None
-
- def unsubscribe() = {
- recorder.foreach { _ ⇒
- metrics.removeEntity(entity)
- }
- }
-}
-
-trait RoutedActorCellMetrics extends WithMetricModule {
- var routerEntity: Entity = _
- var routerRecorder: Option[RouterMetrics] = None
-
- def unsubscribe() = {
- routerRecorder.foreach { _ ⇒
- metrics.removeEntity(routerEntity)
- }
- }
-}
-
-trait RouterAwareEnvelope {
- def routerMetricsRecorder: Option[RouterMetrics]
-}
-
-object RouterAwareEnvelope {
- import scala.util.DynamicVariable
- private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetrics]](None)
-
- def default: RouterAwareEnvelope = new RouterAwareEnvelope {
- val routerMetricsRecorder: Option[RouterMetrics] = dynamicRouterMetricsRecorder.value
- }
-}
-
-@Aspect
-class MetricsIntoActorCellsMixin {
-
- @DeclareMixin("akka.actor.ActorCell")
- def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {}
-
- @DeclareMixin("akka.routing.RoutedActorCell")
- def mixinActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {}
-
-}
-
-@Aspect
-class TraceContextIntoEnvelopeMixin {
-
- @DeclareMixin("akka.dispatch.Envelope")
- def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default
-
- @DeclareMixin("akka.dispatch.Envelope")
- def mixinRouterAwareToEnvelope: RouterAwareEnvelope = RouterAwareEnvelope.default
-
- @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)")
- def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {}
-
- @After("envelopeCreation(ctx)")
- def afterEnvelopeCreation(ctx: TimestampedTraceContextAware with RouterAwareEnvelope): Unit = {
- // Necessary to force the initialization of ContextAware at the moment of creation.
- ctx.traceContext
- ctx.routerMetricsRecorder
- }
-} \ No newline at end of file