From 3566482d061248ff01882fa9647ae2d65677d5ed Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 14 Mar 2016 21:59:02 +0100 Subject: introduce selective instrumentation for akka actors. --- .../src/main/scala/kamon/akka/ActorMetrics.scala | 41 ---- .../src/main/scala/kamon/akka/AkkaExtension.scala | 33 ++- .../main/scala/kamon/akka/EntityRecorders.scala | 74 +++++++ .../src/main/scala/kamon/akka/RouterMetrics.scala | 40 ---- .../instrumentation/ActorCellInstrumentation.scala | 245 --------------------- .../instrumentation/ActorInstrumentation.scala | 157 +++++++++++++ .../kamon/akka/instrumentation/ActorMonitor.scala | 135 ++++++++++++ .../kamon/akka/instrumentation/CellInfo.scala | 31 +++ .../instrumentation/EnvelopeInstrumentation.scala | 32 +++ .../instrumentation/RouterInstrumentation.scala | 53 +++++ .../kamon/akka/instrumentation/RouterMonitor.scala | 61 +++++ 11 files changed, 569 insertions(+), 333 deletions(-) delete mode 100644 kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/EntityRecorders.scala delete mode 100644 kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala delete mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/CellInfo.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/EnvelopeInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterInstrumentation.scala create mode 100644 kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala (limited to 'kamon-akka/src/main/scala/kamon') diff --git a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala deleted file mode 100644 index c99df586..00000000 --- a/kamon-akka/src/main/scala/kamon/akka/ActorMetrics.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.akka - -import kamon.metric.{ EntityRecorderFactory, GenericEntityRecorder } -import kamon.metric.instrument.{ Time, InstrumentFactory } - -/** - * Entity recorder for Akka Actors. The metrics being tracked are: - * - * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when - * that message is dequeued for processing. - * - processing-time: Time taken for the actor to process the receive function. - * - mailbox-size: Size of the actor's mailbox. - * - errors: Number or errors seen by the actor's supervision mechanism. - */ -class ActorMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { - val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) - val processingTime = histogram("processing-time", Time.Nanoseconds) - val mailboxSize = minMaxCounter("mailbox-size") - val errors = counter("errors") -} - -object ActorMetrics extends EntityRecorderFactory[ActorMetrics] { - def category: String = "akka-actor" - def createRecorder(instrumentFactory: InstrumentFactory): ActorMetrics = new ActorMetrics(instrumentFactory) -} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index 2fe2a42f..95bfc64e 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -20,7 +20,10 @@ import com.typesafe.config.Config import kamon.Kamon object AkkaExtension { - val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(Kamon.config.getConfig("kamon.akka")) + private val akkaConfig = Kamon.config.getConfig("kamon.akka") + + val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(akkaConfig) + val traceContextPropagation = TraceContextPropagationSettings.fromConfig(akkaConfig) } sealed trait AskPatternTimeoutWarningSetting @@ -29,11 +32,27 @@ object AskPatternTimeoutWarningSettings { case object Lightweight extends AskPatternTimeoutWarningSetting case object Heavyweight extends AskPatternTimeoutWarningSetting - def fromConfig(config: Config): AskPatternTimeoutWarningSetting = config.getString("ask-pattern-timeout-warning") match { - case "off" ⇒ Off - case "lightweight" ⇒ Lightweight - case "heavyweight" ⇒ Heavyweight - case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.") - } + def fromConfig(config: Config): AskPatternTimeoutWarningSetting = + config.getString("ask-pattern-timeout-warning") match { + case "off" ⇒ Off + case "lightweight" ⇒ Lightweight + case "heavyweight" ⇒ Heavyweight + case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.") + } +} + +sealed trait TraceContextPropagationSetting +object TraceContextPropagationSettings { + case object Off extends TraceContextPropagationSetting + case object MonitoredActorsOnly extends TraceContextPropagationSetting + case object Always extends TraceContextPropagationSetting + + def fromConfig(config: Config): TraceContextPropagationSetting = + config.getString("automatic-trace-context-propagation") match { + case "off" ⇒ Off + case "monitored-actors-only" ⇒ MonitoredActorsOnly + case "always" ⇒ Always + case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.automatic-trace-context-propagation config.") + } } diff --git a/kamon-akka/src/main/scala/kamon/akka/EntityRecorders.scala b/kamon-akka/src/main/scala/kamon/akka/EntityRecorders.scala new file mode 100644 index 00000000..35a5b80b --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/EntityRecorders.scala @@ -0,0 +1,74 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.akka + +import kamon.metric._ +import kamon.metric.instrument.{ Time, InstrumentFactory } + +/** + * + * Entity recorder for Akka Actors. The metrics being tracked are: + * + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - mailbox-size: Size of the actor's mailbox. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class ActorMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val mailboxSize = minMaxCounter("mailbox-size") + val errors = counter("errors") +} + +object ActorMetrics extends EntityRecorderFactoryCompanion[ActorMetrics]("akka-actor", new ActorMetrics(_)) + +/** + * + * Entity recorder for Akka Routers. The metrics being tracked are: + * + * - routing-time: Time taken for the router to process the routing logic. + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class RouterMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val routingTime = histogram("routing-time", Time.Nanoseconds) + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val errors = counter("errors") +} + +object RouterMetrics extends EntityRecorderFactoryCompanion[RouterMetrics]("akka-router", new RouterMetrics(_)) + +/** + * + * Entity recorder for Actor Groups. The metrics being tracked are: + * + * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when + * that message is dequeued for processing. + * - processing-time: Time taken for the actor to process the receive function. + * - errors: Number or errors seen by the actor's supervision mechanism. + */ +class ActorGroupMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) + val processingTime = histogram("processing-time", Time.Nanoseconds) + val errors = counter("errors") +} + +object ActorGroupMetrics extends EntityRecorderFactoryCompanion[ActorGroupMetrics]("akka-actor-group", new ActorGroupMetrics(_)) diff --git a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala b/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala deleted file mode 100644 index 5c5bb05a..00000000 --- a/kamon-akka/src/main/scala/kamon/akka/RouterMetrics.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.akka - -import kamon.metric._ -import kamon.metric.instrument.{ Time, InstrumentFactory } - -/** - * Entity recorder for Akka Routers. The metrics being tracked are: - * - * - routing-time: Time taken for the router to process the routing logic. - * - time-in-mailbox: Time spent from the instant when a message is enqueued in a actor's mailbox to the instant when - * that message is dequeued for processing. - * - processing-time: Time taken for the actor to process the receive function. - * - errors: Number or errors seen by the actor's supervision mechanism. - */ -class RouterMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { - val routingTime = histogram("routing-time", Time.Nanoseconds) - val timeInMailbox = histogram("time-in-mailbox", Time.Nanoseconds) - val processingTime = histogram("processing-time", Time.Nanoseconds) - val errors = counter("errors") -} - -object RouterMetrics extends EntityRecorderFactory[RouterMetrics] { - def category: String = "akka-router" - def createRecorder(instrumentFactory: InstrumentFactory): RouterMetrics = new RouterMetrics(instrumentFactory) -} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala deleted file mode 100644 index cd0d55e9..00000000 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala +++ /dev/null @@ -1,245 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package akka.kamon.instrumentation - -import akka.actor._ -import akka.dispatch.{ Envelope, MessageDispatcher } -import akka.routing.RoutedActorCell -import kamon.Kamon -import kamon.akka.{ ActorMetrics, RouterMetrics } -import kamon.metric.{ Entity, MetricsModule } -import kamon.trace._ -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation._ - -@Aspect -class ActorCellInstrumentation { - - @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") - def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} - - @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") - def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - def isRootSupervisor(path: String): Boolean = path.length == 0 || path == "user" || path == "system" - - val pathString = ref.path.elements.mkString("/") - val actorEntity = Entity(system.name + "/" + pathString, ActorMetrics.category) - - if (!isRootSupervisor(pathString) && Kamon.metrics.shouldTrack(actorEntity)) { - val actorMetricsRecorder = Kamon.metrics.entity(ActorMetrics, actorEntity) - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - - cellMetrics.entity = actorEntity - cellMetrics.recorder = Some(actorMetricsRecorder) - cellMetrics.metrics = Kamon.metrics - } - } - - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") - def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - - @Around("invokingActorBehaviourAtActorCell(cell, envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - Tracer.withContext(contextAndTimestamp.traceContext) { - pjp.proceed() - } - } finally { - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime - - cellMetrics.recorder.foreach { am ⇒ - am.processingTime.record(processingTime) - am.timeInMailbox.record(timeInMailbox) - am.mailboxSize.decrement() - } - - // In case that this actor is behind a router, record the metrics for the router. - envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.foreach { rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } - } - } - - @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)") - def sendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = {} - - @After("sendMessageInActorCell(cell, envelope)") - def afterSendMessageInActorCell(cell: ActorCell, envelope: Envelope): Unit = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.recorder.foreach { am ⇒ - am.mailboxSize.increment() - } - } - - @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") - def actorStop(cell: ActorCell): Unit = {} - - @After("actorStop(cell)") - def afterStop(cell: ActorCell): Unit = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.unsubscribe() - - // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. - if (cell.isInstanceOf[RoutedActorCell]) { - val routedCellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - routedCellMetrics.unsubscribe() - } - } - - @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") - def actorInvokeFailure(cell: ActorCell): Unit = {} - - @Before("actorInvokeFailure(cell)") - def beforeInvokeFailure(cell: ActorCell): Unit = { - val cellMetrics = cell.asInstanceOf[ActorCellMetrics] - cellMetrics.recorder.foreach { am ⇒ - am.errors.increment() - } - - // In case that this actor is behind a router, count the errors for the router as well. - val envelope = cell.currentMessage.asInstanceOf[RouterAwareEnvelope] - if (envelope ne null) { - // The ActorCell.handleInvokeFailure(..) method is also called when a failure occurs - // while processing a system message, in which case ActorCell.currentMessage is always - // null. - envelope.routerMetricsRecorder.foreach { rm ⇒ - rm.errors.increment() - } - } - } -} - -@Aspect -class RoutedActorCellInstrumentation { - - @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)") - def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {} - - @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") - def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { - val routerEntity = Entity(system.name + "/" + ref.path.elements.mkString("/"), RouterMetrics.category) - - if (Kamon.metrics.shouldTrack(routerEntity)) { - val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - - cellMetrics.metrics = Kamon.metrics - cellMetrics.routerEntity = routerEntity - cellMetrics.routerRecorder = Some(Kamon.metrics.entity(RouterMetrics, routerEntity)) - } - } - - @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") - def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {} - - @Around("sendMessageInRouterActorCell(cell, envelope)") - def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = { - val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] - val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[TimestampedTraceContextAware] - - try { - Tracer.withContext(contextAndTimestamp.traceContext) { - - // The router metrics recorder will only be picked up if the message is sent from a tracked router. - RouterAwareEnvelope.dynamicRouterMetricsRecorder.withValue(cellMetrics.routerRecorder) { - pjp.proceed() - } - } - } finally { - cellMetrics.routerRecorder.foreach { routerRecorder ⇒ - routerRecorder.routingTime.record(System.nanoTime() - timestampBeforeProcessing) - } - } - } -} - -trait WithMetricModule { - var metrics: MetricsModule = _ -} - -trait ActorCellMetrics extends WithMetricModule { - - var entity: Entity = _ - var recorder: Option[ActorMetrics] = None - - def unsubscribe() = { - recorder.foreach { _ ⇒ - metrics.removeEntity(entity) - } - } -} - -trait RoutedActorCellMetrics extends WithMetricModule { - var routerEntity: Entity = _ - var routerRecorder: Option[RouterMetrics] = None - - def unsubscribe() = { - routerRecorder.foreach { _ ⇒ - metrics.removeEntity(routerEntity) - } - } -} - -trait RouterAwareEnvelope { - def routerMetricsRecorder: Option[RouterMetrics] -} - -object RouterAwareEnvelope { - import scala.util.DynamicVariable - private[kamon] val dynamicRouterMetricsRecorder = new DynamicVariable[Option[RouterMetrics]](None) - - def default: RouterAwareEnvelope = new RouterAwareEnvelope { - val routerMetricsRecorder: Option[RouterMetrics] = dynamicRouterMetricsRecorder.value - } -} - -@Aspect -class MetricsIntoActorCellsMixin { - - @DeclareMixin("akka.actor.ActorCell") - def mixinActorCellMetricsToActorCell: ActorCellMetrics = new ActorCellMetrics {} - - @DeclareMixin("akka.routing.RoutedActorCell") - def mixinActorCellMetricsToRoutedActorCell: RoutedActorCellMetrics = new RoutedActorCellMetrics {} - -} - -@Aspect -class TraceContextIntoEnvelopeMixin { - - @DeclareMixin("akka.dispatch.Envelope") - def mixinTraceContextAwareToEnvelope: TimestampedTraceContextAware = TimestampedTraceContextAware.default - - @DeclareMixin("akka.dispatch.Envelope") - def mixinRouterAwareToEnvelope: RouterAwareEnvelope = RouterAwareEnvelope.default - - @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: TimestampedTraceContextAware): Unit = {} - - @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: TimestampedTraceContextAware with RouterAwareEnvelope): Unit = { - // Necessary to force the initialization of ContextAware at the moment of creation. - ctx.traceContext - ctx.routerMetricsRecorder - } -} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala new file mode 100644 index 00000000..7b4664f8 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorInstrumentation.scala @@ -0,0 +1,157 @@ +package akka.kamon.instrumentation + +import java.util.concurrent.locks.ReentrantLock + +import akka.actor._ +import akka.dispatch.Envelope +import akka.dispatch.sysmsg.SystemMessage +import akka.routing.RoutedActorCell +import kamon.trace.Tracer +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +import scala.collection.immutable + +@Aspect +class ActorCellInstrumentation { + + def actorInstrumentation(cell: Cell): ActorMonitor = + cell.asInstanceOf[ActorInstrumentationAware].actorInstrumentation + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, *, *, parent)") + def actorCellCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: InternalActorRef): Unit = {} + + @Pointcut("execution(akka.actor.UnstartedCell.new(..)) && this(cell) && args(system, ref, *, parent)") + def repointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: InternalActorRef): Unit = {} + + @After("actorCellCreation(cell, system, ref, parent)") + def afterCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = { + cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation( + ActorMonitor.createActorMonitor(cell, system, ref, parent)) + } + + @After("repointableActorRefCreation(cell, system, ref, parent)") + def afterRepointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = { + cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation( + ActorMonitor.createActorMonitor(cell, system, ref, parent)) + } + + @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + actorInstrumentation(cell).processMessage(pjp, envelope.asInstanceOf[InstrumentedEnvelope].envelopeContext()) + } + + /** + * + * + */ + + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInActorCell(cell: Cell, envelope: Envelope): Unit = {} + + @Pointcut("execution(* akka.actor.UnstartedCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInUnstartedActorCell(cell: Cell, envelope: Envelope): Unit = {} + + @Before("sendMessageInActorCell(cell, envelope)") + def afterSendMessageInActorCell(cell: Cell, envelope: Envelope): Unit = { + envelope.asInstanceOf[InstrumentedEnvelope].setEnvelopeContext( + actorInstrumentation(cell).captureEnvelopeContext()) + } + + @Before("sendMessageInUnstartedActorCell(cell, envelope)") + def afterSendMessageInUnstartedActorCell(cell: Cell, envelope: Envelope): Unit = { + envelope.asInstanceOf[InstrumentedEnvelope].setEnvelopeContext( + actorInstrumentation(cell).captureEnvelopeContext()) + } + + @Pointcut("execution(* akka.actor.UnstartedCell.replaceWith(*)) && this(unStartedCell) && args(cell)") + def replaceWithInRepointableActorRef(unStartedCell: UnstartedCell, cell: Cell): Unit = {} + + @Around("replaceWithInRepointableActorRef(unStartedCell, cell)") + def aroundReplaceWithInRepointableActorRef(pjp: ProceedingJoinPoint, unStartedCell: UnstartedCell, cell: Cell): Unit = { + // TODO: Find a way to do this without resorting to reflection and, even better, without copy/pasting the Akka Code! + val unstartedCellClass = classOf[UnstartedCell] + val queueField = unstartedCellClass.getDeclaredField("akka$actor$UnstartedCell$$queue") + queueField.setAccessible(true) + + val lockField = unstartedCellClass.getDeclaredField("lock") + lockField.setAccessible(true) + + val queue = queueField.get(unStartedCell).asInstanceOf[java.util.LinkedList[_]] + val lock = lockField.get(unStartedCell).asInstanceOf[ReentrantLock] + + def locked[T](body: ⇒ T): T = { + lock.lock() + try body finally lock.unlock() + } + + locked { + try { + while (!queue.isEmpty) { + queue.poll() match { + case s: SystemMessage ⇒ cell.sendSystemMessage(s) // TODO: ============= CHECK SYSTEM MESSAGESSSSS ========= + case e: Envelope with InstrumentedEnvelope ⇒ + Tracer.withContext(e.envelopeContext().context) { + cell.sendMessage(e) + } + } + } + } finally { + unStartedCell.self.swapCell(cell) + } + } + } + + /** + * + */ + + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") + def actorStop(cell: ActorCell): Unit = {} + + @After("actorStop(cell)") + def afterStop(cell: ActorCell): Unit = { + actorInstrumentation(cell).cleanup() + + // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here. + if (cell.isInstanceOf[RoutedActorCell]) { + cell.asInstanceOf[RouterInstrumentationAware].routerInstrumentation.cleanup() + } + } + + @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell) && args(childrenNotToSuspend, failure)") + def actorInvokeFailure(cell: ActorCell, childrenNotToSuspend: immutable.Iterable[ActorRef], failure: Throwable): Unit = {} + + @Before("actorInvokeFailure(cell, childrenNotToSuspend, failure)") + def beforeInvokeFailure(cell: ActorCell, childrenNotToSuspend: immutable.Iterable[ActorRef], failure: Throwable): Unit = { + actorInstrumentation(cell).processFailure(failure) + } +} + +trait ActorInstrumentationAware { + def actorInstrumentation: ActorMonitor + def setActorInstrumentation(ai: ActorMonitor): Unit +} + +object ActorInstrumentationAware { + def apply(): ActorInstrumentationAware = new ActorInstrumentationAware { + private var _ai: ActorMonitor = _ + + def setActorInstrumentation(ai: ActorMonitor): Unit = _ai = ai + def actorInstrumentation: ActorMonitor = _ai + } +} + +@Aspect +class MetricsIntoActorCellsMixin { + + @DeclareMixin("akka.actor.ActorCell") + def mixinActorCellMetricsToActorCell: ActorInstrumentationAware = ActorInstrumentationAware() + + @DeclareMixin("akka.actor.UnstartedCell") + def mixinActorCellMetricsToUnstartedActorCell: ActorInstrumentationAware = ActorInstrumentationAware() + +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala new file mode 100644 index 00000000..a2b920a2 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala @@ -0,0 +1,135 @@ +package akka.kamon.instrumentation + +import akka.actor.{ Cell, ActorRef, ActorSystem } +import akka.kamon.instrumentation.ActorMonitors.{ TrackedRouteeWithContextPropagation, TrackedRoutee, ContextPropagation, TrackedActor } +import kamon.Kamon +import kamon.akka.TraceContextPropagationSettings.{ Always, MonitoredActorsOnly, Off } +import kamon.akka.{ AkkaExtension, RouterMetrics, ActorMetrics } +import kamon.metric.Entity +import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer } +import kamon.util.RelativeNanoTimestamp +import org.aspectj.lang.ProceedingJoinPoint + +trait ActorMonitor { + def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(captureTimestamp, captureTraceContext) + def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed() + def processFailure(failure: Throwable): Unit = {} + def cleanup(): Unit = {} + + protected def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.zero + protected def captureTraceContext: TraceContext = EmptyTraceContext +} + +object ActorMonitor { + + def createActorMonitor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): ActorMonitor = { + val cellInfo = CellInfo.cellInfoFor(cell, system, ref, parent) + + if (cellInfo.isRouter) + ActorMonitors.NoOp + else { + if (cellInfo.isRoutee) + createRouteeMonitor(cellInfo) + else + createRegularActorMonitor(cellInfo) + } + } + + def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = { + def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity) + + AkkaExtension.traceContextPropagation match { + case Off if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) + case Off ⇒ ActorMonitors.NoOp + case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation + case MonitoredActorsOnly ⇒ ActorMonitors.NoOp + case Always if cellInfo.isTracked ⇒ new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation + case Always ⇒ ActorMonitors.ContextPropagationOnly + } + } + + def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = { + def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity) + + AkkaExtension.traceContextPropagation match { + case Off if cellInfo.isTracked ⇒ new TrackedRoutee(cellInfo.entity, routerMetrics) + case Off ⇒ ActorMonitors.NoOp + case MonitoredActorsOnly if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics) + case MonitoredActorsOnly ⇒ ActorMonitors.NoOp + case Always if cellInfo.isTracked ⇒ new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics) + case Always ⇒ ActorMonitors.ContextPropagationOnly + } + } +} + +object ActorMonitors { + val NoOp = new ActorMonitor {} + val ContextPropagationOnly = new ActorMonitor with ContextPropagation + + class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor { + override def captureEnvelopeContext(): EnvelopeContext = { + actorMetrics.mailboxSize.increment() + super.captureEnvelopeContext() + } + + override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { + val timestampBeforeProcessing = RelativeNanoTimestamp.now + + try { + super.processMessage(pjp, envelopeContext) + + } finally { + val timestampAfterProcessing = RelativeNanoTimestamp.now + val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime + val processingTime = timestampAfterProcessing - timestampBeforeProcessing + + actorMetrics.processingTime.record(processingTime.nanos) + actorMetrics.timeInMailbox.record(timeInMailbox.nanos) + actorMetrics.mailboxSize.decrement() + } + } + + override def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment() + override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now + override def cleanup(): Unit = Kamon.metrics.removeEntity(entity) + } + + class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor { + + override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { + val timestampBeforeProcessing = RelativeNanoTimestamp.now + + try { + super.processMessage(pjp, envelopeContext) + + } finally { + val timestampAfterProcessing = RelativeNanoTimestamp.now + val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime + val processingTime = timestampAfterProcessing - timestampBeforeProcessing + + routerMetrics.processingTime.record(processingTime.nanos) + routerMetrics.timeInMailbox.record(timeInMailbox.nanos) + } + } + + override def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment() + override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now + override def cleanup(): Unit = {} + } + + trait ContextPropagation extends ActorMonitor { + override protected def captureTraceContext: TraceContext = Tracer.currentContext + + override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = { + Tracer.withContext(envelopeContext.context) { + super.processMessage(pjp, envelopeContext) + } + } + } + + class TrackedActorWithContextPropagation(entity: Entity, actorMetrics: ActorMetrics) + extends TrackedActor(entity, actorMetrics) with ContextPropagation + + class TrackedRouteeWithContextPropagation(entity: Entity, routerMetrics: RouterMetrics) + extends TrackedRoutee(entity, routerMetrics) with ContextPropagation +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/CellInfo.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/CellInfo.scala new file mode 100644 index 00000000..e144e605 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/CellInfo.scala @@ -0,0 +1,31 @@ +package akka.kamon.instrumentation + +import akka.actor.{ Cell, ActorRef, ActorSystem } +import akka.routing.{ RoutedActorRef, RoutedActorCell } +import kamon.Kamon +import kamon.akka.{ ActorMetrics, RouterMetrics } +import kamon.metric.Entity + +case class CellInfo(entity: Entity, isRouter: Boolean, isRoutee: Boolean, isTracked: Boolean) + +object CellInfo { + + def cellName(system: ActorSystem, ref: ActorRef): String = + system.name + "/" + ref.path.elements.mkString("/") + + def cellInfoFor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): CellInfo = { + import kamon.metric.Entity + + val pathString = ref.path.elements.mkString("/") + val isRootSupervisor = pathString.length == 0 || pathString == "user" || pathString == "system" + val isRouter = cell.isInstanceOf[RoutedActorCell] + val isRoutee = parent.isInstanceOf[RoutedActorRef] + + val name = if (isRoutee) cellName(system, parent) else cellName(system, ref) + val category = if (isRouter || isRoutee) RouterMetrics.category else ActorMetrics.category + val entity = Entity(name, category) + val isTracked = !isRootSupervisor && Kamon.metrics.shouldTrack(entity) + + CellInfo(entity, isRouter, isRoutee, isTracked) + } +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/EnvelopeInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/EnvelopeInstrumentation.scala new file mode 100644 index 00000000..0bb50dc2 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/EnvelopeInstrumentation.scala @@ -0,0 +1,32 @@ +package akka.kamon.instrumentation + +import kamon.trace.{ EmptyTraceContext, TraceContext } +import kamon.util.RelativeNanoTimestamp +import org.aspectj.lang.annotation.{ DeclareMixin, Aspect } + +case class EnvelopeContext(nanoTime: RelativeNanoTimestamp, context: TraceContext) + +object EnvelopeContext { + val Empty = EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext) +} + +trait InstrumentedEnvelope { + def envelopeContext(): EnvelopeContext + def setEnvelopeContext(envelopeContext: EnvelopeContext): Unit +} + +object InstrumentedEnvelope { + def apply(): InstrumentedEnvelope = new InstrumentedEnvelope { + var envelopeContext: EnvelopeContext = _ + + def setEnvelopeContext(envelopeContext: EnvelopeContext): Unit = + this.envelopeContext = envelopeContext + } +} + +@Aspect +class EnvelopeContextIntoEnvelopeMixin { + + @DeclareMixin("akka.dispatch.Envelope") + def mixinInstrumentationToEnvelope: InstrumentedEnvelope = InstrumentedEnvelope() +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterInstrumentation.scala new file mode 100644 index 00000000..c11abc34 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterInstrumentation.scala @@ -0,0 +1,53 @@ +package akka.kamon.instrumentation + +import akka.actor.{ Props, ActorRef, ActorSystem, Cell } +import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +@Aspect +class RoutedActorCellInstrumentation { + + def routerInstrumentation(cell: Cell): RouterMonitor = + cell.asInstanceOf[RouterInstrumentationAware].routerInstrumentation + + @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)") + def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {} + + @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") + def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { + cell.asInstanceOf[RouterInstrumentationAware].setRouterInstrumentation( + RouterMonitor.createRouterInstrumentation(cell)) + } + + @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)") + def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {} + + @Around("sendMessageInRouterActorCell(cell, envelope)") + def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = { + routerInstrumentation(cell).processMessage(pjp) + } +} + +trait RouterInstrumentationAware { + def routerInstrumentation: RouterMonitor + def setRouterInstrumentation(ai: RouterMonitor): Unit +} + +object RouterInstrumentationAware { + def apply(): RouterInstrumentationAware = new RouterInstrumentationAware { + private var _ri: RouterMonitor = _ + + def setRouterInstrumentation(ai: RouterMonitor): Unit = _ri = ai + def routerInstrumentation: RouterMonitor = _ri + } +} + +@Aspect +class MetricsIntoRouterCellsMixin { + + @DeclareMixin("akka.routing.RoutedActorCell") + def mixinActorCellMetricsToRoutedActorCell: RouterInstrumentationAware = RouterInstrumentationAware() + +} \ No newline at end of file diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala new file mode 100644 index 00000000..5c4c7aa3 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala @@ -0,0 +1,61 @@ +package akka.kamon.instrumentation + +import akka.actor.{ Cell, Props, ActorRef, ActorSystem } +import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell +import kamon.Kamon +import kamon.akka.RouterMetrics +import kamon.metric.Entity +import kamon.util.RelativeNanoTimestamp +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +trait RouterMonitor { + def processMessage(pjp: ProceedingJoinPoint): AnyRef + def processFailure(failure: Throwable): Unit + def cleanup(): Unit + + def routeeAdded(): Unit + def routeeRemoved(): Unit +} + +object RouterMonitor { + + def createRouterInstrumentation(cell: Cell): RouterMonitor = { + val cellInfo = CellInfo.cellInfoFor(cell, cell.system, cell.self, cell.parent) + def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity) + + if (cellInfo.isTracked) + new MetricsOnlyRouterMonitor(cellInfo.entity, routerMetrics) + else NoOpRouterMonitor + } +} + +object NoOpRouterMonitor extends RouterMonitor { + def processMessage(pjp: ProceedingJoinPoint): AnyRef = pjp.proceed() + def processFailure(failure: Throwable): Unit = {} + def routeeAdded(): Unit = {} + def routeeRemoved(): Unit = {} + def cleanup(): Unit = {} +} + +class MetricsOnlyRouterMonitor(entity: Entity, routerMetrics: RouterMetrics) extends RouterMonitor { + + def processMessage(pjp: ProceedingJoinPoint): AnyRef = { + val timestampBeforeProcessing = RelativeNanoTimestamp.now + + try { + pjp.proceed() + } finally { + val timestampAfterProcessing = RelativeNanoTimestamp.now + val routingTime = timestampAfterProcessing - timestampBeforeProcessing + + routerMetrics.routingTime.record(routingTime.nanos) + } + } + + def processFailure(failure: Throwable): Unit = {} + def routeeAdded(): Unit = {} + def routeeRemoved(): Unit = {} + def cleanup(): Unit = Kamon.metrics.removeEntity(entity) +} \ No newline at end of file -- cgit v1.2.3